Commit 5173e85b authored by peguerin

the whole workflow is in one single snakefile

parent 2ee63776
#===============================================================================
#HEADER
#===============================================================================
__author__ = "Pierre-Edouard Guerin"
__credits__ = ["Pierre-Edouard Guerin", "Virginie Marques"]
__license__ = "MIT"
__version__ = "1.2"
__maintainer__ = "Pierre-Edouard Guerin"
__email__ = "pierre-edouard.guerin@cefe.cnrs.fr"
__status__ = "Production"
"""
Codes for scientific papers related to metabarcoding studies
AUTHORS
=======
* Pierre-Edouard Guerin | pierre-edouard.guerin@cefe.cnrs.fr
* Virginie Marques | virginie.marques@cefe.cnrs.fr
* CNRS/CEFE, CNRS/MARBEC | Montpellier, France
* 2018-2021
DESCRIPTION
===========
This is a Snakefile using SNAKEMAKE workflow management system
From sample description files .dat files, config.yaml, rapidrun.tsv
it will return a demultiplexing.csv file. The output contains all required
wildcards en related information necessary to run the next workflow steps.
"""
###############################################################################
# MODULES
###############################################################################
import pandas
import glob
import os
import shutil
from Bio.Seq import Seq
###############################################################################
# FUNCTIONS
###############################################################################
def mkdir_results(results_subfolders):
    ## create results/ and each of its subfolders if they do not exist yet
    for subfolder in results_subfolders:
        os.makedirs(os.path.join("results", subfolder), exist_ok=True)
## read a sample description .dat file and return a dataframe object
def read_dat(filedat):
    dfdat = pandas.read_csv(filedat, sep="\t", header=None)
    dfdat.columns = ['experiment', 'plaque', 'barcode', 'primer5', 'primer3', 'F']
    return dfdat
## join the elements of the columns named "*cols" of the dataframe "df" into
## strings separated by "sep"; returns a Series of the joined values
def str_join(df, sep, *cols):
    from functools import reduce
    return reduce(lambda x, y: x.astype(str).str.cat(y.astype(str), sep=sep),
                  [df[col] for col in cols])
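## a minimal usage sketch (hypothetical values): for a dataframe row with
## projet='projet01' and marker='teleo', str_join(df, '/', 'projet', 'marker')
## yields the string 'projet01/teleo' for that row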
###############################################################################
# GLOBAL VARIABLES
###############################################################################
results_subfolders = ['00_flags',
                      '01_settings',
                      '02_illuminapairedend',
                      '02b_scaterred',
                      '03_remove_unaligned',
                      '04_demultiplex_dat',
                      '05_demultiplex_flags',
                      '06_assign_marker_sample_to_sequence',
                      '07_split_fastq_by_sample',
                      '08_samples',
                      '09_dereplicate_samples',
                      '10_goodlength_samples',
                      '11_clean_pcrerr_samples',
                      '12_rm_internal_samples',
                      '13_cat_samples_into_runs',
                      '14_dereplicate_runs',
                      '15_taxonomic_assignment',
                      '16_remove_annotations',
                      '17_sort_abundance_assigned_sequences',
                      '18_table_assigned_sequences'
                      ]
mkdir_results(results_subfolders)
## check format (CLASSIC or RAPIDRUN)
if config['format'] == "CLASSIC":
    print("CLASSIC data: one single marker for each run")
    dfrClassic = pandas.DataFrame(columns=['plaque', 'run', 'sample', 'projet', 'marker'])
    for run in config['fichiers']['dat']:
        thisRunDatfile = config['fichiers']['dat'][run]
        thisDat = read_dat(thisRunDatfile)
        for index, datRow in thisDat.iterrows():
            ## in CLASSIC format the plaque name doubles as the sample name
            ## and the run name doubles as the marker name
            thisRow = {
                "plaque": datRow['plaque'],
                "run": run,
                "sample": datRow['plaque'],
                "projet": datRow['experiment'],
                "marker": run
            }
            dfrClassic = dfrClassic.append(thisRow, ignore_index=True)
    print(dfrClassic)
    dfrClassic.to_csv('results/01_settings/all_samples_classic.csv', index=False, header=False, sep=';')
    rapidrunfile = "results/01_settings/all_samples_classic.csv"
else:
    print("RAPIDRUN data: many markers for many runs")
    rapidrunfile = config['fichiers']['rapidrun']
## read the 'rapidrun' file, one row per plaque/run/sample/projet/marker
dfr = pandas.read_csv(rapidrunfile, sep=";")
dfr.columns = ['plaque', 'run', 'sample', 'projet', 'marker']
## remove blacklisted projets
blacklistedProjets = config['blacklist']['projet']
dfrp = dfr[~dfr.projet.isin(blacklistedProjets)]
## remove blacklisted runs
blacklistedRuns = config['blacklist']['run']
dfrm = dfrp[~dfrp.run.isin(blacklistedRuns)]
## get the lists of `run`, `projet` and `marker` wildcards
uniqRuns = dfrm.run.unique()
uniqProjets = dfrm.projet.unique()
uniqMarkers = dfrm.marker.unique()
###############################################################################
# MAIN
###############################################################################
## dictionary mapping each `marker` to its sample description .dat dataframe
all_dat = {}
for marker in uniqMarkers:
    all_dat[marker] = read_dat(config['fichiers']['dat'][marker])
## init dataframe with all columns we want to output
dfMulti = pandas.DataFrame(columns=["demultiplex", "projmarkrun", "projet", "marker", "run", "plaque", "sample", "barcode5", "barcode3", "primer5", "primer3", "min_f", "min_r", "lenBarcode5", "lenBarcode3"])
## fill the dataframe; each row belongs to one `projet`/`marker`/`run`/`sample` wildcards combination
for run in uniqRuns:
    for marker in uniqMarkers:
        runMarkerDfrm = dfrm[(dfrm.run == run) & (dfrm.marker == marker)]
        for plaque in runMarkerDfrm.plaque:
            selectedRow = runMarkerDfrm[(runMarkerDfrm.plaque == plaque)]
            projet = selectedRow['projet'].values[0]
            sample = selectedRow['sample'].values[0]
            plaque_dat = all_dat[marker][(all_dat[marker].plaque == plaque)]
            barcode5 = plaque_dat["barcode"].values[0]
            barcode3 = str(Seq(barcode5).reverse_complement())
            primer5 = plaque_dat["primer5"].values[0]
            ## note: here we deliberately keep primer3 exactly as written in
            ## the ngsfilter .dat file (no reverse complement)
            primer3 = plaque_dat["primer3"].values[0]
            lenBarcode5 = len(barcode5)
            lenBarcode3 = len(barcode3)
            ## minimum alignment scores: two thirds of each primer length
            min_f = int(len(primer5) * 2 / 3)
            min_r = int(len(primer3) * 2 / 3)
            demultiplex = projet + "/" + marker + "/" + run + "/" + sample
            projmarkrun = projet + "/" + marker + "/" + run
            thisRow = {
                "demultiplex": demultiplex,
                "projmarkrun": projmarkrun,
                "projet": projet,
                "marker": marker,
                "run": run,
                "plaque": plaque,
                "sample": sample,
                "barcode5": barcode5,
                "barcode3": barcode3,
                "primer5": primer5,
                "primer3": primer3,
                "min_f": min_f,
                "min_r": min_r,
                "lenBarcode5": lenBarcode5,
                "lenBarcode3": lenBarcode3,
            }
            dfMulti = dfMulti.append(thisRow, ignore_index=True)
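## for illustration, one resulting row (hypothetical values): a plaque of
## projet 'projet01', marker 'teleo', run 'run01', sample 'SAMPLE01' with
## barcode5 'aattgcc' gets demultiplex 'projet01/teleo/run01/SAMPLE01',
## projmarkrun 'projet01/teleo/run01' and barcode3 'ggcaatt' (the reverse
## complement of barcode5)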
## write the demultiplexing .csv file
dfMulti.to_csv('results/01_settings/all_demultiplex.csv', index=False, header=True)
## print an overview of the dataframe we wrote
print(dfMulti)
if config['format'] != "CLASSIC":
    rapidrunfile = config['fichiers']['rapidrun']
else:
    rapidrunfile = "results/01_settings/all_samples_classic.csv"
    if not os.path.isfile(rapidrunfile):
        raise Exception("ERROR: " + rapidrunfile + " is not a file. You must run step 01_settings first in order to generate this file for the CLASSIC format.")
## read the rapidrun file as a dataframe
dfr = pandas.read_csv(rapidrunfile, sep=";")
dfr.columns = ['plaque', 'run', 'sample', 'projet', 'marker']
## remove blacklisted projets
blacklistedProjets = config['blacklist']['projet']
dfrp = dfr[~dfr.projet.isin(blacklistedProjets)]
## remove blacklisted runs
blacklistedRuns = config['blacklist']['run']
dfrm = dfrp[~dfrp.run.isin(blacklistedRuns)]
## names of `run` wildcards
uniqRuns = dfrm.run.unique()
## set the number of chunks for paired-end read assembly
listChunks = [*range(1, config['illuminapairedend']['nb_chunk'] + 1)]
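## e.g. with config['illuminapairedend']['nb_chunk'] set to 10 this yields
## [1, 2, ..., 10]; a nb_chunk of 0 disables chunking (see the WORKFLOW
## section below)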
## keep only marker and run information for demultiplexing
dfRunMarker = dfrm[['marker', 'run']].copy()
dfRunMarker['runMarker'] = str_join(dfrm, '/', 'marker', 'run')
dfRunMarker['projMarker'] = str_join(dfrm, '/', 'projet', 'marker')
dfRunMarker = dfRunMarker.drop_duplicates()
listRunMarker = list(dfRunMarker.runMarker)
dfRunMarker['projMarkRun'] = dfRunMarker['projMarker'] + "/" + dfRunMarker['run']
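## e.g. (hypothetical names) a row with projet='projet01', marker='teleo' and
## run='run01' yields runMarker 'teleo/run01', projMarker 'projet01/teleo' and
## projMarkRun 'projet01/teleo/run01'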
## write .dat files for ngsfilter for each projet/marker/run
for projmarkerrun in dfRunMarker['projMarkRun'].unique():
    thisProj, thisMarker, thisRun = projmarkerrun.split("/")
    fileName = "results/04_demultiplex_dat/" + thisProj + "_" + thisMarker + "_" + thisRun + ".tsv"
    dfprojmarkerrunMulti = dfMulti[(dfMulti.projet == thisProj) & (dfMulti.marker == thisMarker) & (dfMulti.run == thisRun)]
    if not dfprojmarkerrunMulti.empty:
        print("Writing sample description file for project, marker and run", thisProj, thisMarker, thisRun, ":", fileName)
        thisExp = list(dfprojmarkerrunMulti['projet'])
        thisSample = list(dfprojmarkerrunMulti['sample'])
        thisTags = list(dfprojmarkerrunMulti['barcode5'])
        thisForwardPrimer = list(dfprojmarkerrunMulti['primer5'])
        thisReversePrimer = list(dfprojmarkerrunMulti['primer3'])
        thisExtraInfo = ["F"] * len(thisExp)
        thisDat = {'#exp': thisExp, 'sample': thisSample, 'tags': thisTags, 'forward_primer': thisForwardPrimer, 'reverse_primer': thisReversePrimer, 'extra_information': thisExtraInfo}
        dfDat = pandas.DataFrame(thisDat, columns=['#exp', 'sample', 'tags', 'forward_primer', 'reverse_primer', 'extra_information'])
        dfDat = dfDat.drop_duplicates()
        ## overwrite the file if it already exists
        if os.path.isfile(fileName):
            os.remove(fileName)
        dfDat.to_csv(fileName, index=False, sep="\t")
    else:
        print("WARNING: no information about project", thisProj, "marker", thisMarker, "run", thisRun, "; check the file", rapidrunfile, "or blacklist this project")
dfRunMarker['dat'] = ["results/04_demultiplex_dat/" + ele.replace('/', '_') + ".tsv" for ele in dfRunMarker['projMarkRun']]
## unique `projet`/`marker`/`run` combinations with related information
dfpmr = dfMulti[['projmarkrun', 'marker', 'projet']].drop_duplicates()
## attribute the taxonomic reference database and fasta files to each row
## according to its `marker`
if config['format'] == "CLASSIC":
    ## CLASSIC format: a single marker, so map every run onto the first
    ## marker declared in config["assign_taxon"]["bdr"]
    thisMarker = str(list(config["assign_taxon"]["bdr"])[0])
    markerDic = {}
    for dmarker in dfpmr['marker']:
        markerDic[dmarker] = thisMarker
    dfpmr['bdr'] = dfpmr['marker'].map(markerDic).map(config["assign_taxon"]["bdr"])
    dfpmr['fasta'] = dfpmr['marker'].map(markerDic).map(config["assign_taxon"]["fasta"])
else:
    dfpmr['bdr'] = dfpmr['marker'].map(config["assign_taxon"]["bdr"])
    dfpmr['fasta'] = dfpmr['marker'].map(config["assign_taxon"]["fasta"])
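## this assumes a config.yaml layout along these lines (hypothetical paths):
## assign_taxon:
##   bdr:
##     teleo: "reference_database/embl_std"
##   fasta:
##     teleo: "reference_database/db_embl_std.fasta"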
## display selected `projet`/`marker`/`run` with related information
print(dfpmr)
###############################################################################
# WORKFLOW
###############################################################################
rule all:
    input:
        'results/00_flags/table_assigned_sequences.flag'

## choose chunked or single-job paired-end read assembly
if config['illuminapairedend']['nb_chunk'] != 0:
    include: "rules/split_fastq.smk"
    include: "rules/chunk_illuminapairedend.smk"
else:
    include: "rules/illuminapairedend.smk"
include: "rules/remove_unaligned.smk"
rule flag_assembly:
    input:
        expand('results/03_remove_unaligned/{run}.ali.fastq', run=uniqRuns)
    output:
        touch('results/00_flags/assembly.flag')

rule flag_demultiplex_init:
    input:
        'results/00_flags/assembly.flag'
    output:
        expand('results/05_demultiplex_flags/{demultiplex}.flag', demultiplex=dfRunMarker.projMarkRun)
    run:
        for dem in dfRunMarker.projMarkRun:
            fileFlag = "results/05_demultiplex_flags/" + dem + ".flag"
            open(fileFlag, 'a').close()
include: "rules/assign_marker_sample_to_sequence.smk"
include: "rules/split_fastq_by_sample.smk"
rule flag_demultiplex_done:
input:
expand('results/07_split_fastq_by_sample/flags/{demultiplex}.done', demultiplex=dfRunMarker.projMarkRun)
output:
touch('results/00_flags/demultiplex.flag')
rule check_samples:
input:
'results/00_flags/demultiplex.flag'
output:
expand('results/08_samples/{sample}.fasta', sample=dfMulti['demultiplex'])
run:
for sample in dfMulti['demultiplex']:
file_sample = "results/07_split_fastq_by_sample/"+sample+".fasta"
if not os.path.exists(file_sample):
print("WARNING: results/07_split_fastq_by_sample/"+sample+".fasta not found. We removed it from this analysis.")
open("results/08_samples/"+str(sample)+".fasta",'a').close()
else:
os.replace("results/07_split_fastq_by_sample/"+str(sample)+".fasta", "results/08_samples/"+str(sample)+".fasta")
include: "rules/dereplicate_samples.smk"
include: "rules/goodlength_samples.smk"
include: "rules/clean_pcrerr_samples.smk"
include: "rules/rm_internal_samples.smk"
rule flag_filtering_done:
input:
expand('results/12_rm_internal_samples/{sample}.c.r.l.u.fasta',sample=dfMulti['demultiplex'])
output:
touch('results/00_flags/filtering.flag')
rule cat_samples_into_runs:
input:
'results/00_flags/filtering.flag'
output:
expand('results/13_cat_samples_into_runs/{projmarkrun}.fasta',projmarkrun=dfpmr['projmarkrun'])
shell:
'''
bash scripts/cat_samples_into_runs.sh
'''
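## the helper script itself is not shown here; given the rule's declared input
## and output, it presumably concatenates the filtered per-sample fasta files
## of each projet/marker/run into a single fasta under
## results/13_cat_samples_into_runs/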
include: "rules/dereplicate_runs.smk"
include: "rules/taxonomic_assignment.smk"
include: "rules/remove_annotations.smk"
include: "rules/sort_abundance_assigned_sequences.smk"
include: "rules/table_assigned_sequences.smk"
## copy and rename final results
rule flag_table_assigned_sequences:
input:
expand('results/18_table_assigned_sequences/{projmarkrun}.csv', projmarkrun=dfpmr['projmarkrun'])
output:
touch('results/00_flags/table_assigned_sequences.flag')
run:
print("Final results...", end='')
for pmr in dfpmr['projmarkrun']:
thisProjet = pmr.split('/')[0]
thisMarker = pmr.split('/')[1]
thisRun = pmr.split('/')[2]
tableNameFile = "results/18_table_assigned_sequences/"+pmr+".csv"
newTableNameFile = "results/"+thisProjet+"_"+thisMarker+"_"+thisRun+"_ecotag_ncbi.csv"
print(thisProjet+"_"+thisMarker+"_"+thisRun+", ", end='')
os.system("cp {0} {1}".format(tableNameFile, newTableNameFile))
print("done")