Commit ec52d848 authored by peguerin's avatar peguerin
Browse files

mkbdr as a module

parent 985d7488
#!/usr/bin/env python3
#===============================================================================
#INFORMATION
#===============================================================================
# Codes for mkbdr
#
# CEFE - EPHE - RESERVEBENEFIT 2021
# Guerin Pierre-Edouard
#
# mkbdr is a toolkit to build a custom metabarcoding reference database
# mkbdr is a python3 program.
#
# git repository :
#
#==============================================================================
#MODULES
#==============================================================================
## Standard library imports
import re
import sys
import argparse
import csv
from os import path
import sys
## Third party imports
from ete3 import Tree, TreeStyle, NodeStyle, TextFace, NCBITaxa
import numpy as np
from Bio.Seq import Seq
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
import pytaxize
from pytaxize import gn
## Local applications import
from arguments import parse_args
from load import load_taxdump, taxdump2dic
from validate import validate_fasta, report_validate_results
#==============================================================================
#MAIN
#==============================================================================
def main():
args = parse_args()
## load taxonomy ncbi
ncbi = load_taxdump(args.ncbi_taxdump)
taxDic = taxdump2dic(ncbi, rootTaxon=7742)
results = validate_fasta(args.fasta, taxDic, ncbi)
## write valide format records fasta
SeqIO.write(results['valide'], str(args.output_prefix)+'_valide.fasta', 'fasta')
## write faulty format records fasta
SeqIO.write(results['faultyFormat'], str(args.output_prefix)+'_faulty_format.fasta', 'fasta')
## write faulty format records fasta
SeqIO.write(results['faultyTaxon'], str(args.output_prefix)+'_faulty_taxon.fasta', 'fasta')
print(report_validate_results(results))
if __name__ == '__main__':
main()
\ No newline at end of file
import argparse
import sys
HELPER_TEXT ="""
.___ ___. __ ___ .______ _______ .______
| \/ | | |/ / | _ \ | \ | _ \
| \ / | | ' / | |_) | | .--. || |_) |
| |\/| | | < | _ < | | | || /
| | | | | . \ | |_) | | '--' || |\ \----.
|__| |__| |__|\__\ |______/ |_______/ | _| `._____|
_______________________________________________________________________________
Pierre-Edouard GUERIN, Laetitia MATHON, Virginie MARQUES, Stephanie MANEL
CNRS, EPHE, Sorbonne University, Montpellier, France
version 0.1 "" March 2021
Usage:
> python3 mkbdr [options]
For help:
> python3 mkbdr --help
"""
def parse_args(usage=HELPER_TEXT):
parser = argparse.ArgumentParser(description='mkbdr - to build a custom metabarcoding reference database.')
subprasers = parser.add_subparsers(dest='command')
validate = subprasers.add_parser('validate', help='check format and taxonomy')
validate.add_argument("-f","--fasta", type=str, help='path of the barcodes sequences FASTA file', required=True)
validate.add_argument("-c","--curate", type=str, help='path of the curated taxonomy CSV file', required=False, default="NA")
validate.add_argument("-n","--ncbi_taxdump", type=str, help='path of NCBI taxdump.tar.gz file', required=False, default="NA")
validate.add_argument("-o","--output_prefix", type=str, help='prefix of the output FASTA such as [PREFIX].fasta')
curate = subprasers.add_parser('curate', help='try to correct wrong taxonomy')
curate.add_argument("-f","--fasta", type=str, help='path of the barcodes sequences FASTA file', required=True)
curate.add_argument("-o","--output_prefix", type=str, help='prefix of the output curated taxonomy CSV such as [PREFIX].csv')
args = parser.parse_args()
if args.command not in ['validate', 'curate']:
print(usage)
sys.exit(0)
return args
from ete3 import Tree, TreeStyle, NodeStyle, TextFace, NCBITaxa
def load_taxdump(localTaxdumpArchive):
"""
load taxonomy ncbi
"""
if localTaxdumpArchive == "NA":
ncbi = NCBITaxa()
else:
ncbi = NCBITaxa(taxdump_file=path.abspath(localTaxdumpArchive))
return ncbi
def taxdump2dic(ncbiTaxaElem, rootTaxon=7742):
"""
convert ncbiTaxa object into dictionnary with taxids and species names as keys and values
"""
descTaxId=ncbiTaxaElem.get_descendant_taxa(rootTaxon, intermediate_nodes=True, rank_limit='species', collapse_subspecies=False)
descNames=ncbiTaxaElem.translate_to_names(descTaxId)
zip_iterator = zip(descTaxId, descNames)
taxDic = dict(zip_iterator)
return taxDic
\ No newline at end of file
class Sample:
def __init__(self, sampleid, species_name, genus_name, family, family_name, scientific_name, taxid, rank):
self.sampleid=sampleid
self.species_name=species_name
self.genus_name=genus_name
self.family=family
self.family_name=family_name
self.scientific_name=scientific_name
self.taxid=taxid
self.rank=rank
def description(self):
return "taxid={6}; species_name={0}; genus_name={1}; family={2}; family_name={3}; scientific_name={4}; rank={5}; valide".format(self.species_name, self.genus_name, self.family, self.family_name, self.scientific_name, self.rank, self.taxid)
class DescriptionRecord:
def __init__(self, sampleid, species_name):
self.sampleid=sampleid
self.species_name = species_name
## base
import re
import sys
import argparse
import csv
from os import path
import sys
## external
from ete3 import Tree, TreeStyle, NodeStyle, TextFace, NCBITaxa
import numpy as np
from Bio.Seq import Seq
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
## local
from objects import Sample, DescriptionRecord
def check_record_description(record_description):
'''
Check fasta record description format
accepted format is
> sampleid ; species_name=Genus species
ATTATG
return False when the format is faulty
return DescriptionRecord object when the format is ok
'''
splitted = record_description.split(';')
if len(splitted) > 1:
idSample = splitted[0]
description = splitted[1]
if '=' in idSample:
## L'ID de la sequence ne doit pas comporter de caractere '=' car c'est attendu dans la description
return False
else:
if "species_name=" in description:
speciesName = str(description.split('=')[1])
descriptionRecord = DescriptionRecord(sampleid=idSample, species_name=speciesName)
return descriptionRecord
else:
## no species_name element in description
return False
else:
## the header doesn't have at least 2 colons separeted by ;
return False
def check_format_species_name(species_name):
'''
Check species name format
Excepted format "Genus species" or "Genus_species"
'''
patternSpeciesPerfect = "^[A-Z][a-z]+ [a-z]+$"
if re.match(patternSpeciesPerfect, species_name):
return species_name
elif re.match("^[A-Z][a-z]+\_[a-z]+$", species_name):
return species_name.replace('_', ' ')
else:
return False
def check_dna_sequence(dna_sequence, code="ATGC"):
'''
Check DNA sequence format (iuapc ambiguity, gaps)
'''
for base in dna_sequence:
if base not in code:
return False
return True
def check_taxonomy_species_name(species_name, reference, ncbi):
matching = { k:v for k,v in reference.items() if species_name == v }
if len(matching) == 1:
rank = list(ncbi.get_rank(list(matching.keys())).values())[0]
if rank == "species":
return matching
else:
## the rank in not "species"
return False
elif len(matching) > 1:
## they are many matches so the species name is ambiguous
return False
else:
## this species is unknown in ncbi
return False
def full_taxonomy_sample(sampleid, taxonid, ncbi):
lineage=ncbi.get_lineage(taxonid)
if len(lineage) > 1:
ranks=ncbi.get_rank(lineage)
if len(ranks) > 1:
idGenus = False
idFamily = False
for k, v in ranks.items():
if v == "genus":
idGenus = k
if v == "family":
idFamily = k
if idGenus and idFamily:
break
if idGenus is False or idFamily is False:
## Genus or Family is unknown in ncbi
return False
else:
genusName = str(ncbi.translate_to_names([idGenus])[0])
familyName = str(ncbi.translate_to_names([idFamily])[0])
speciesName = str(ncbi.translate_to_names([taxonid])[0])
sample = Sample(sampleid=sampleid,
species_name=speciesName,
genus_name=genusName,
family=str(idFamily),
family_name = familyName,
scientific_name = speciesName,
taxid = taxonid,
rank = "species")
return sample
else:
## The taxon taxonomy is unknown or empty
return False
else:
## This taxon have no taxonomy in ncbi
return False
def validate_fasta(rawFastaFile, taxDic, ncbi):
results = {}
results['valide'] = []
results['faultyFormat'] = []
results['faultyTaxon'] = []
for record in SeqIO.parse(rawFastaFile, "fasta"):
#print(record.id)
#print(record.description)
#print(record.seq)
checkedRecord = check_record_description(record.description)
if checkedRecord is not False:
checkedFormat = check_format_species_name(checkedRecord.species_name)
if checkedFormat is not False:
if check_dna_sequence(record.seq):
checkedRecord.species_name = checkedFormat
checkedTaxon = check_taxonomy_species_name(checkedRecord.species_name, taxDic, ncbi)
if checkedTaxon is not False:
#print(checkedTaxon)
sample = full_taxonomy_sample(checkedRecord.sampleid , list(checkedTaxon.keys())[0], ncbi)
if sample is not False:
thisValideRecord = SeqRecord(id = sample.sampleid, description = sample.description(), seq = record.seq)
results["valide"].append(thisValideRecord)
else:
## faulty rank taxonomy
faultydescription = str(record.description)+'; faulty rank taxonomy: family or genera not found'
thisFaultyRecord = SeqRecord(id = record.id, description = faultydescription, seq=record.seq)
results["faultyTaxon"].append(thisFaultyRecord)
else:
## faulty taxonomy
faultydescription = str(record.description)+'; faulty taxonomy: species name '+ str(checkedRecord.species_name)+ ' not found in NCBI'
thisFaultyRecord = SeqRecord(id = record.id, description = faultydescription, seq=record.seq)
results["faultyTaxon"].append(thisFaultyRecord)
else:
faultydescription = str(record.description)+' ; faulty DNA sequence'
thisFaultyRecord = SeqRecord(id = record.id, description = faultydescription, seq=record.seq)
results["faultyFormat"].append(thisFaultyRecord)
else:
## faulty species name format
faultydescription = str(record.description)+' ; faulty species name format ' + str(checkedRecord.species_name)
thisFaultyRecord = SeqRecord(id = record.id, description = faultydescription, seq=record.seq)
results["faultyFormat"].append(thisFaultyRecord)
else:
## faulty record description format
print("ERROR FORMAT FASTA {0}: {1} {2}.\n Excepted format is\n\n>sampleid ; species_names=Genus Species\nACTAG\n".format(rawFastaFile,record.id, record.description))
return results
def report_validate_results(results):
nValide=len(results['valide'])
nFaultyFormat=len(results['faultyFormat'])
nFaultyTaxon=len(results['faultyTaxon'])
nRecords=nValide+nFaultyFormat+nFaultyTaxon
reportstring ="{0} processed records.\nOn these records, {1} are valid, {2} are faulty format and {3} are faulty taxon.".format(nRecords, nValide,nFaultyFormat, nFaultyTaxon)
return reportstring
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment