Commit 2abd0932 authored by mmassaviol's avatar mmassaviol
Browse files

Add workflow Virus_Assembler_Megahit

parent 9bb5f6ee
......@@ -8,5 +8,6 @@ Ce dépot contient les fichiers générés à partir du dépot WAW ([Workflow Ap
- RADseq_ref (stacks)
- RADseq_denovo (stacks)
- RNAseq (pseudoalign (kallisto, Salmon) + expression différentielle (Edger, DESeq2))
- Virus_Assembler_Megahit (Megahit + Blast + Mapping)
Pour pouvoir construire un conteneur à partir de ce dépot et l'utiliser voir la doc suivante : [Construction et exécution](Documentation/How_to_build_and_run.md)
import os
import re
import snakemake.utils
import csv
#############
# Wildcards #
#############
# Unpack the top-level workflow configuration: sample names, the ordered list
# of workflow steps, report bookkeeping, and the per-tool output catalog.
SAMPLES = config["samples"]
STEPS = config["steps"]
PREPARE_REPORT_OUTPUTS = config["prepare_report_outputs"]
PREPARE_REPORT_SCRIPTS = config["prepare_report_scripts"]
OUTPUTS = config["outputs"]
# NOTE(review): from here on, `config` refers only to the "params" sub-dict of
# the original configuration — every later config[...] lookup is a parameter.
config = config["params"]
##########
# Inputs #
##########
# Generic input functions
## get raw_reads
def raw_reads():
    """Build the {rule-input-name: path-pattern} dict for the raw read files.

    The returned paths keep the '{sample}' wildcard unexpanded so callers can
    either use them directly in a per-sample rule or expand() them over SAMPLES.

    Returns:
        dict: 'read' (and 'read2' for paired-end) mapped to a path pattern
        built from config['sample_dir'] and config['sample_suffix'].

    Raises:
        SystemExit: if config['SeOrPe'] is neither 'SE' nor 'PE'.
    """
    inputs = dict()
    if config["SeOrPe"] == "PE":
        inputs["read"] = config['sample_dir']+'/{sample}_R1'+config["sample_suffix"]
        inputs["read2"] = config['sample_dir']+'/{sample}_R2'+config["sample_suffix"]
    elif config["SeOrPe"] == "SE":
        inputs["read"] = config['sample_dir']+'/{sample}'+config["sample_suffix"]
    else:
        # Bug fix: the original called sys.exit() but `sys` is never imported
        # in this file, so the error path raised NameError instead of exiting.
        # Raising SystemExit directly is equivalent and needs no import.
        raise SystemExit("SeOrPe should be SE or PE")
    return inputs
## get reads (trimmed or raw)
def reads():
    """Reads to feed downstream tools.

    This workflow has no trimming step, so the raw reads are used as-is.
    """
    return raw_reads()
# Tools inputs functions
def fastqc_inputs():
    """Input files for the fastqc rules: every sample's read files, expanded."""
    patterns = raw_reads()
    selected = {"read": expand(patterns["read"], sample=SAMPLES)}
    if config["SeOrPe"] == "PE":
        selected["read2"] = expand(patterns["read2"], sample=SAMPLES)
    return selected
def megahit_inputs():
    """Input files for the megahit rules: all samples' reads, expanded."""
    patterns = raw_reads()
    collected = {"read": expand(patterns["read"], sample=SAMPLES)}
    if config["SeOrPe"] == "PE":
        collected["read2"] = expand(patterns["read2"], sample=SAMPLES)
    return collected
def blast_refseq_inputs():
    """Inputs for blast_refseq: the assembled contigs plus the viral RefSeq db."""
    assembly_dir = config["megahit_"+config["SeOrPe"]+"_output_dir"]
    return {
        "query": config["results_dir"] + "/" + assembly_dir + "/assembly.contigs.fa",
        # .nhr is one of the BLAST db component files; the extension is
        # stripped again in the rule to recover the db prefix.
        "db": "/blast_db_virus/refseq_virus.nhr",
    }
def map_box_index_inputs():
    """Input for bwa indexing: the top contigs selected by blast_refseq."""
    return {"fasta_refseq": rules.blast_refseq.output.top_contigs}
def map_box_align_inputs():
    """Per-sample read inputs for the bwa alignment rule ({sample} kept)."""
    patterns = raw_reads()
    selected = {"read": patterns["read"]}
    if config["SeOrPe"] == "PE":
        selected["read2"] = patterns["read2"]
    return selected
def prepare_report_inputs():
    """All outputs of every step up to (and including) the selected final step."""
    collected = []
    for step in STEPS:
        collected += step_outputs(step["name"])
        # Stop once the user-chosen final step has been included.
        if step["name"] == config["final_step"]:
            break
    return collected
def prepare_report_scripts():
    """R scripts (one per tool) that prepare the MultiQC report inputs.

    Walks the workflow steps in order and stops after the user-selected
    final step, mirroring prepare_report_inputs().
    """
    scripts = list()
    for step in STEPS:
        tool = config[step["name"]]
        script = tool+".prepare.report.R"
        if (script in PREPARE_REPORT_SCRIPTS):
            scripts.append("/workflow/scripts/"+script)
        # Consistency fix: `final_step` holds a *step* name (see
        # prepare_report_inputs); the original compared the *tool* name,
        # so the early break never triggered.
        if (step["name"] == config["final_step"]):
            break
    return scripts
def prepare_report_outputs():
    """Files produced by rule prepare_report: outputs_mqc.csv plus each
    tool's declared report files, truncated at the selected final step."""
    outputs = list()
    outputs.append(config["results_dir"] + "/outputs_mqc.csv")
    for step in STEPS:
        tool = config[step["name"]]
        if (tool in PREPARE_REPORT_OUTPUTS.keys()):
            for output in PREPARE_REPORT_OUTPUTS[tool]:
                outputs.append(config["results_dir"]+"/"+tool+"/"+output)
        # Consistency fix: `final_step` holds a *step* name (see
        # prepare_report_inputs); the original compared the *tool* name,
        # so the early break never triggered.
        if (step["name"] == config["final_step"]):
            break
    return outputs
def multiqc_inputs():
    """Files MultiQC should wait for.

    Uses prepare_report's *outputs* so that multiqc still depends on the
    prepare_report rule even when that rule declares no extra outputs.
    """
    return prepare_report_outputs()
###########
# Outputs #
###########
def step_outputs(step):
    """Return the files produced by *step* ("all" = the final multiqc report)."""
    paired = config["SeOrPe"] == "PE"
    if step == "quality_check":
        if config[step] == "fastqc":
            chosen = rules.fastqc_PE if paired else rules.fastqc_SE
            return expand(chosen.output, sample=SAMPLES)
        # Quality check disabled ("null" tool): nothing to produce.
        return list()
    if step == "assembling":
        return (rules.megahit_PE if paired else rules.megahit_SE).output
    if step == "blasting":
        return rules.blast_refseq.output
    if step == "mapping":
        return expand(rules.map_box_align.output, sample=SAMPLES)
    if step == "all":
        return list(rules.multiqc.output)
    return list()
# get outputs for each choosen tools
def workflow_outputs(step):
    """Entry point used by rule all: everything the chosen *step* produces."""
    return list(step_outputs(step))
#########
# Rules #
#########
# Quality check of single-end reads with FastQC; one HTML + one zip per sample.
# NOTE(review): the output uses the {sample} wildcard but the input expands
# over ALL samples, so each per-sample job depends on every sample's reads —
# confirm this is intended.
rule fastqc_SE:
input:
**fastqc_inputs()
output:
html = config["results_dir"]+'/'+config["fastqc_SE_output_dir"]+'/{sample}'+config["sample_suffix"].split(".")[0]+'_fastqc.html',
zip = config["results_dir"]+'/'+config["fastqc_SE_output_dir"]+'/{sample}'+config["sample_suffix"].split(".")[0]+'_fastqc.zip',
log: config["results_dir"]+'/logs/fastqc/{sample}_fastqc_log.txt'
threads: 1
params:
output_dir = config["results_dir"]+'/'+config["fastqc_SE_output_dir"]
shell:
"fastqc {input.read} -t {threads} -o {params.output_dir} |& tee {log}"
# Quality check of paired-end reads with FastQC; R1 and R2 reports per sample.
# The trailing ruleorder makes snakemake prefer the PE rule when both could
# produce the same target.
rule fastqc_PE:
input:
**fastqc_inputs()
output:
html1 = config["results_dir"]+'/'+config["fastqc_PE_output_dir"]+'/{sample}_R1'+config["sample_suffix"].split(".")[0]+'_fastqc.html',
zip1 = config["results_dir"]+'/'+config["fastqc_PE_output_dir"]+'/{sample}_R1'+config["sample_suffix"].split(".")[0]+'_fastqc.zip',
html2 = config["results_dir"]+'/'+config["fastqc_PE_output_dir"]+'/{sample}_R2'+config["sample_suffix"].split(".")[0]+'_fastqc.html',
zip2 = config["results_dir"]+'/'+config["fastqc_PE_output_dir"]+'/{sample}_R2'+config["sample_suffix"].split(".")[0]+'_fastqc.zip',
log: config["results_dir"]+'/logs/fastqc/{sample}_fastqc_log.txt'
threads: 2
params:
output_dir = config["results_dir"]+'/'+config["fastqc_PE_output_dir"]
shell:
"fastqc {input.read} {input.read2} -t {threads} -o {params.output_dir} |& tee {log}"
ruleorder: fastqc_PE > fastqc_SE
# Assemble single-end reads with MEGAHIT. MEGAHIT refuses to run into an
# existing directory, so the assembly is produced in <output_dir>.tmp and the
# results are moved into place afterwards.
# NOTE(review): "|& tee {log}" is placed after the mv/rm, so megahit's own
# output is likely not captured in the log — confirm intended.
rule megahit_SE:
input:
**megahit_inputs()
output:
contigs = config["results_dir"] + "/" + config["megahit_SE_output_dir"] + "/assembly.contigs.fa"
log:
config["results_dir"]+'/logs/megahit/megahit_log.txt'
threads:
config["megahit_threads"]
params:
output_dir = config["results_dir"]+'/'+config["megahit_SE_output_dir"]
run:
inputs = megahit_inputs()
# One "-r <file>" flag per single-end read file.
readline = ""
for r1 in inputs["read"]:
readline += "-r "+r1+" "
shell(
"megahit "+
readline+
"-t {threads} "+
"-o {params.output_dir}.tmp "+
"--out-prefix assembly "+
#"--k-min 39 "+
#"--k-max 79 "+
"; mv {params.output_dir}.tmp/* {params.output_dir}/ && rm -rf {params.output_dir}.tmp/ "+
"|& tee {log}"
)
# Assemble paired-end reads with MEGAHIT (see megahit_SE for the .tmp-dir
# workaround: MEGAHIT refuses to write into an existing output directory).
# NOTE(review): "|& tee {log}" is placed after the mv/rm, so megahit's own
# output is likely not captured in the log — confirm intended.
rule megahit_PE:
input:
**megahit_inputs()
output:
contigs = config["results_dir"] + "/" + config["megahit_PE_output_dir"] + "/assembly.contigs.fa"
log:
config["results_dir"]+'/logs/megahit/megahit_log.txt'
threads:
config["megahit_threads"]
params:
output_dir = config["results_dir"]+'/'+config["megahit_PE_output_dir"]
run:
inputs = megahit_inputs()
# One "-1 <R1> -2 <R2>" pair per sample, pairing the two read lists.
readline = ""
for r1,r2 in zip(inputs["read"],inputs["read2"]):
readline += "-1 "+r1+" -2 "+r2+" "
shell(
"megahit "+
readline+
"-t {threads} "+
"-o {params.output_dir}.tmp "+
"--out-prefix assembly "+
#"--k-min 39 "+
#"--k-max 79 "+
"; mv {params.output_dir}.tmp/* {params.output_dir}/ && rm -rf {params.output_dir}.tmp/ "+
"|& tee {log}"
)
# Blast the assembled contigs against the viral RefSeq nucleotide database,
# keep hits whose alignment length reaches blast_refseq_min_len, and extract
# the matching contig sequences as "top contigs" for the mapping step.
# Bug fix: the query-id extraction used `head -n +2`, which reads the FIRST
# two lines of blastout — i.e. the blank line and the column header written
# below — so /tmp/qids.txt contained an empty pattern and grep -f matched
# every line of the assembly. `tail -n +3` skips both header lines and reads
# the actual hits instead.
# NOTE(review): /tmp/qids.txt is a fixed path; concurrent runs on one host
# would clash — consider a per-run temporary file.
rule blast_refseq:
input:
**blast_refseq_inputs(),
output:
blastout = config["results_dir"] + "/" + config["blast_refseq_output_dir"] + "/blastout.tsv",
top_contigs = config["results_dir"] + "/" + config["blast_refseq_output_dir"] + "/top_contigs.fasta",
params:
evalue = config["blast_refseq_evalue"],
max_target_sequences = config["blast_refseq_max_target_sequences"],
min_len = config["blast_refseq_min_len"],
db = lambda w,input: os.path.splitext(input.db)[0]
threads:
config["blast_refseq_threads"]
log:
config["results_dir"]+'/logs/blast_refseq/blast_refseq_log.txt'
run:
# Header written first: blastout starts with a blank line, then the
# column names; the filtered hits are appended by the shell pipeline.
head = """
qseqid\tsseqid\tpident\tlength\tmismatch\tgapopen\tqstart\tqend\tsstart\tsend\tevalue\tbitscore\tqlen
"""
with open(output.blastout,"w") as out:
out.write(head)
shell("""
blastn \
-query {input.query} \
-db {params.db} \
-evalue {params.evalue} \
-max_target_seqs {params.max_target_sequences} \
-max_hsps 3 \
-outfmt '6 std qlen' \
-num_threads {threads} | \
awk "{{if (\$4 >= {params.min_len}) print }}"\
>> {output.blastout} \
&& tail -n +3 {output.blastout} | cut -f 1 > /tmp/qids.txt \
&& grep --no-group-separator -f /tmp/qids.txt -A 1 {input.query} > {output.top_contigs} \
"""
)
# Build the BWA index over the top contigs selected by blast_refseq.
# NOTE(review): the "bwtsw" algorithm is meant for large references — confirm
# it behaves on a small top-contigs fasta.
rule map_box_index:
input:
**map_box_index_inputs()
output:
config["results_dir"]+"/"+config["map_box_output_dir"]+"/index/index.amb",
config["results_dir"]+"/"+config["map_box_output_dir"]+"/index/index.ann",
config["results_dir"]+"/"+config["map_box_output_dir"]+"/index/index.bwt",
config["results_dir"]+"/"+config["map_box_output_dir"]+"/index/index.pac",
config["results_dir"]+"/"+config["map_box_output_dir"]+"/index/index.sa"
log:
config["results_dir"]+"/logs/map_box/index.log"
params:
algorithm="bwtsw",
output_prefix = config["results_dir"]+"/"+config["map_box_output_dir"]+"/index/index"
shell:
"bwa index "
"-p {params.output_prefix} "
"-a {params.algorithm} "
"{input.fasta_refseq} |& tee {log}"
# Map each sample's reads back onto the indexed top contigs with bwa mem,
# pipe through samtools to a sorted BAM, and produce samtools stats.
rule map_box_align:
input:
**map_box_align_inputs(),
index = rules.map_box_index.output
output:
bam = config["results_dir"]+"/"+config["map_box_output_dir"]+"/{sample}.bam",
stats = config["results_dir"]+"/"+config["map_box_output_dir"]+"/{sample}_bamstats.txt",
log:
config["results_dir"]+"/logs/map_box/{sample}_map_box_log.txt"
threads:
config["map_box_threads"]
params:
indexPrefix = config["results_dir"]+"/"+config["map_box_output_dir"]+"/index/index",
# For PE runs the R2 path pattern is passed as a param (still holding
# the {sample} wildcard); empty string for SE runs.
pe = map_box_align_inputs()["read2"] if (config["SeOrPe"] == "PE") else ""
shell:
"bwa mem "
"-t {threads} "
"{params.indexPrefix} "
"{input.read} "
"{params.pe} 2> {log} "
"| samtools view -b 2>> {log} "
"| samtools sort --threads {threads} > {output.bam} 2>> {log} "
"&& samtools stats {output.bam} -@ {threads} > {output.stats} 2>> {log}"
# Gather every step's outputs and generate the custom MultiQC content:
# per-tool R scripts, the outputs table (outputs_mqc.csv), the parameters
# table (params_tab_mqc.csv) and the MultiQC YAML config.
rule prepare_report:
input:
*prepare_report_inputs(),
output:
*prepare_report_outputs(),
config_multiqc = config["results_dir"] + "/config_multiqc.yaml",
params_tab = config["results_dir"] + "/params_tab_mqc.csv"
params:
params_file = config["results_dir"]+"/params.yml",
results_dir = config["results_dir"]
log:
config["results_dir"]+"/logs/prepare_report_log.txt"
run:
# Specific scripts for each tool
for script in prepare_report_scripts():
shell("Rscript "+script+" {params.params_file} |& tee {log}")
# Outputs files for Multiqc report
outfile = config["results_dir"] + "/outputs_mqc.csv"
head = """
# description: 'This is the list of the files generated by each step of the workflow'
# section_name: 'Workflow outputs'
"""
with open(outfile,"w") as out:
out.write(head)
out.write("step\ttool\tfile\tdescription\n")#\tname
# One row per declared output file, skipping the commands that do not
# match the current SE/PE mode.
for step in STEPS:
tool = config[step["name"]]
i=1
for command in OUTPUTS[tool]:
if ((config["SeOrPe"] == "SE" and not("_PE" in command)) or (config["SeOrPe"] == "PE" and not("_SE" in command))):
outputs = OUTPUTS[tool][command]
for files in outputs:
path = config[command+"_output_dir"] + "/" + files["file"] #config["results_dir"] +"/"+
out.write(str(i)+"-"+step["title"]+"\t"+tool+"\t"+path+"\t"+files["description"]+"\n")#"\t"+files["name"]+
i+=1
# Params list for Multiqc report
params_list = "params_name\tvalue\n"
head = """
# description: 'This is the list of the parameters for each rule'
# section_name: 'Workflow parameters'
"""
# Keep only parameters belonging to a selected tool, plus the four
# global parameters listed below.
for step in STEPS:
tool = config[step["name"]]
for key, value in config.items():
if (tool in key and tool != "null") or (key in ["results_dir","sample_dir","sample_suffix","SeOrPe"]):
params_list += key + "\t'" + str(value) + "'\n"
with open(output.params_tab,"w") as out:
out.write(head)
out.write(params_list)
# Config for Multiqc report
shell("python3 /workflow/generate_multiqc_config.py {params.params_file} {output.config_multiqc}")
# Aggregate all prepared report files into a single MultiQC report
# using the config generated by prepare_report.
rule multiqc:
input:
multiqc_inputs(),
config_multiqc = config["results_dir"] + "/config_multiqc.yaml"
output:
multiqc_dir = directory(config["results_dir"]+"/multiqc_data")
params:
output_dir = config["results_dir"]
log:
config["results_dir"]+'/logs/multiqc/multiqc_log.txt'
shell:
"multiqc --config {input.config_multiqc} -f {params.output_dir} "
"-o {params.output_dir} |& tee {log}"
# Final Snakemake rule waiting for outputs of the final step choosen by user (default all steps)
# Also archives a copy of the workflow (Snakefile, get_samples.py, scripts,
# params) into the results directory for reproducibility.
rule all:
input:
workflow_outputs(config["final_step"])
output:
Snakefile = config["results_dir"]+"/workflow/Snakefile",
get_samples = config["results_dir"]+"/workflow/get_samples.py",
scripts = directory(config["results_dir"]+"/workflow/scripts"),
params = config["results_dir"]+"/workflow/params.yml"
params:
params_file = config["results_dir"]+"/params.yml",
shell:
"cp /workflow/Snakefile {output.Snakefile} && "
"cp /workflow/get_samples.py {output.get_samples} && "
"cp -r /workflow/scripts {output.scripts} && "
"cp {params.params_file} {output.params}"
# Workflow-level handlers: drop a marker file so callers can detect
# success or failure without parsing the log.
onsuccess:
print("Workflow finished, no error")
shell("touch "+config["results_dir"]+"/logs/workflow_end.ok")
onerror:
print("An error occurred")
shell("cat {log} > "+config["results_dir"]+"/logs/workflow_end.error")
#shell("mail -s "an error occurred" youremail@provider.com < {log}")
import re
import sys
from tools import *
# Load the workflow configuration (params, steps, outputs, multiqc section
# mapping) from the YAML file given as the first CLI argument. `read_yaml`
# comes from the project-local `tools` module.
config = read_yaml(sys.argv[1])
def report_section_order():
    """Build the YAML fragment ordering the MultiQC report sections.

    Sections are ranked from 990 downward: rule graph, params table, outputs
    table, then one entry per tool (for tools with a native MultiQC module)
    or per custom *_mqc output file, in workflow-step order.

    Reads the module-level `config` loaded from the params YAML.
    """
    res = "skip_generalstats: true\n\n"
    res += "report_section_order:\n"
    res += " Rule_graph:\n"
    res += " order: 990\n"
    res += " params_tab:\n"
    res += " order: 980\n"
    res += " outputs:\n"
    res += " order: 970\n"
    cpt = 960
    for step in config["steps"]:
        tool = config["params"][step["name"]]
        # Tools with a native MultiQC module get one section each.
        if (config["multiqc"][tool] != "custom"):
            res += " " + config["multiqc"][tool] + ":\n"
            res += " " + "order: " + str(cpt) + "\n"
            cpt += -10
        for rule in config["outputs"][tool]:
            if ((config["params"]["SeOrPe"] == "SE" and not("_PE" in rule)) or (config["params"]["SeOrPe"] == "PE" and not("_SE" in rule))):
                for output in config["outputs"][tool][rule]:
                    if("mqc" in output["file"] and '{' not in output["file"]): # case of dynamic files ({wildcard}_mqc.png) to deal with
                        # Fix: the pattern was written '\_mqc.*$' — '\_' is an
                        # invalid string escape (SyntaxWarning on modern
                        # Python); a raw string matches the same text.
                        section = re.sub(r'_mqc.*$', '', output["file"])
                        res += " " + section + ":\n"
                        res += " " + "order: " + str(cpt) + "\n"
                        cpt += -10
    return res
def main():
    """Write the MultiQC section-order config to the file named by argv[2]."""
    content = report_section_order()
    with open(sys.argv[2], "w") as handle:
        handle.write(content)
# Entry point: only run when executed directly, not when imported.
if __name__ == "__main__":
# execute only if run as a script
main()
\ No newline at end of file
#!/usr/bin/env python3
# This script will take a directory and a parameter to tell if the reads are paired end or single end and return the sample list and the suffix
# Needs 2 arguments: reads_directory, SeOrPe
# SeOrPe is SE for single end reads and PE for paired end reads
# Usage: ./get_samples.py reads_directory SeOrPe
import os
import re
import csv
import sys
def sample_list(dir, SeOrPe):
    """Scan *dir* and derive the sample basenames plus their shared suffix.

    Args:
        dir: directory containing the read files.
        SeOrPe: 'PE' for paired-end files named <sample>_R1<suffix> /
            <sample>_R2<suffix>; anything else is treated as single-end
            (<sample><suffix>, suffix starting at the first dot).

    Returns:
        dict: {'samples': sorted sample names, 'suffix': the common suffix}.

    Raises:
        SystemExit: if the matched files do not all share a single suffix
            (this also triggers when no file matched at all).
    """
    samples = list()
    suffixes = list()
    files = os.listdir(dir)
    if SeOrPe == "PE":
        regex = re.compile(r"^(.+?)(_R1|_R2)(.+)")
    else:
        regex = re.compile(r"^(.+?)(\..*)")
    for file in files:
        res = re.match(regex, file)
        if res:
            if res.group(1) not in samples:
                samples.append(res.group(1))
            if SeOrPe == "PE":
                suffixes.append(res.group(3))
            else:
                suffixes.append(res.group(2))
    if (len(set(suffixes)) == 1 ):
        return {'samples': sorted(samples), 'suffix': list(set(suffixes))[0]}
    else:
        # Bug fix: the original concatenated a str with a list
        # ("..." + suffixes), raising TypeError instead of the intended
        # error message; join the distinct suffixes into readable text.
        exit("Files have different suffixes: " + ", ".join(sorted(set(suffixes))))
def main():
    """CLI wrapper: validate the argument count and print the sample dict."""
    if len(sys.argv) != 3:
        exit("""Needs 2 arguments: reads_directory, SeOrPe
Usage: ./get_samples.py reads_directory SeOrPe""")
    print(sample_list(sys.argv[1], sys.argv[2]))
# Entry point: only run when executed directly, not when imported.
if __name__ == "__main__":
# execute only if run as a script
main()
pipeline: Virus_Assembler_Megahit
params:
results_dir: /Results
sample_dir: /Data
SeOrPe: PE
quality_check: fastqc
fastqc_SE_output_dir: fastqc_SE
fastqc_PE_output_dir: fastqc_PE
fastqc_threads: 4
null_output_dir: ''
assembling: megahit
megahit_SE_output_dir: megahit_SE
megahit_threads: 4
megahit_PE_output_dir: megahit_PE
blasting: blast_refseq
blast_refseq_output_dir: blast_refseq
blast_refseq_threads: 4
blast_refseq_evalue: 1
blast_refseq_max_target_sequences: 1
blast_refseq_min_len: 800
mapping: map_box
map_box_output_dir: map_box
fasta_ref: ''
map_box_threads: 4
samples: []
groups: []
final_step: all
steps:
- title: Quality check
name: quality_check
tools:
- fastqc
- 'null'
default: fastqc
- title: Assembling
name: assembling
tools:
- megahit
default: megahit
- title: Blasting
name: blasting
tools:
- blast_refseq
default: blast_refseq
- title: Mapping
name: mapping
tools:
- map_box
default: map_box
params_info:
results_dir:
type: output_dir
sample_dir:
type: input_dir
SeOrPe:
type: radio
fastqc_threads:
tool: fastqc
rule: fastqc_PE
type: numeric
megahit_threads:
tool: megahit
rule: megahit_PE
type: numeric
blast_refseq_threads:
tool: blast_refseq
rule: blast_refseq
type: numeric
blast_refseq_evalue:
tool: blast_refseq
rule: blast_refseq
type: numeric
blast_refseq_max_target_sequences:
tool: blast_refseq
rule: blast_refseq
type: numeric
blast_refseq_min_len:
tool: blast_refseq
rule: blast_refseq
type: numeric
map_box_threads:
tool: map_box
rule: map_box
type: numeric
prepare_report_scripts: []
prepare_report_outputs: {}
outputs:
fastqc:
fastqc_SE:
- name: html
file: '{sample}_fastqc.html'
description: Rapport html fastqc
- name: zip
file: '{sample}_fastqc.zip'
description: Dossier zip fastqc
fastqc_PE:
- name: html1
file: '{sample}_R1_fastqc.html'
description: Rapport html fastqc
- name: zip1
file: '{sample}_R1_fastqc.zip'
description: Dossier zip fastqc
- name: html2