Commit f639f9a4 authored by peguerin's avatar peguerin
Browse files

generate 2 fasta with valide and faulty records

parent 82cd6cf0
#==============================================================================
# MODULES
#==============================================================================
## Standard library imports
import re
import csv
from os import path
import sys
## Third party imports
from ete3 import Tree, TreeStyle, NodeStyle, TextFace, NCBITaxa
import numpy as np
from Bio.Seq import Seq
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
#==============================================================================
# CLASS
#==============================================================================
class Sample:
def __init__(self, sampleid, species_name, genus_name, family, family_name, scientific_name, taxid, rank):
self.sampleid=sampleid
self.species_name=species_name
self.genus_name=genus_name
self.family=family
self.family_name=family_name
self.scientific_name=scientific_name
self.taxid=taxid
self.rank=rank
def description(self):
return "taxid={6}; species_name={0}; genus_name={1}; family={2}; family_name={3}; scientific_name={4}; rank={5}; valide".format(self.species_name, self.genus_name, self.family, self.family_name, self.scientific_name, self.rank, self.taxid)
class DescriptionRecord:
def __init__(self, sampleid, species_name):
self.sampleid=sampleid
self.species_name = species_name
#.update_taxonomy_database(taxdump_file=localTaxdumpArchive)
## all the ncbi tree taxonomy
#ncbi = NCBITaxa()
## get rank information of nodes of the phylogenetic tree "actinopterygii"
#actinodesc=ncbi.get_descendant_taxa('Actinopterygii',intermediate_nodes=True,rank_limit='species',collapse_subspecies=True)
#actino=ncbi.translate_to_names(actinodesc)
#actinorank=ncbi.get_rank(actinodesc)
#==============================================================================
# FUNCTIONS
#==============================================================================
def check_record_description(record_description):
'''
Check fasta record description format
accepted format is
> sampleid ; species_name=Genus species
ATTATG
return False when the format is faulty
return DescriptionRecord object when the format is ok
'''
splitted = record_description.split(';')
if len(splitted) > 1:
idSample = splitted[0]
description = splitted[1]
if '=' in idSample:
return False
else:
if "species_name=" in description:
speciesName = str(description.split('=')[1])
descriptionRecord = DescriptionRecord(sampleid=idSample, species_name=speciesName)
return descriptionRecord
else:
return False
else:
return False
def check_format_species_name(species_name):
'''
Check species name format
'''
patternSpeciesPerfect = "^[A-Z][a-z]+ [a-z]+$"
if re.match(patternSpeciesPerfect, species_name):
return species_name
elif re.match("^[A-Z][a-z]+\_[a-z]+$", species_name):
return species_name.replace('_', ' ')
else:
return False
def check_dna_sequence(dna_sequence, code="ATGC"):
'''
Check DNA sequence format (iuapc ambiguity, gaps)
'''
for base in dna_sequence:
if base not in code:
return False
return True
def check_taxonomy_species_name(species_name, reference):
matching = { k:v for k,v in reference.items() if species_name in v }
if len(matching) == 1:
rank = list(ncbi.get_rank(list(matching.keys())).values())[0]
if rank == "species":
return matching
else:
return "the rank is not species but {0}".format(rank)
elif len(matching) > 1:
return "more than 1 matching species: {0}".format(eval(str(matching)))
else:
return "species name not found in ncbi"
def full_taxonomy_sample(sampleid, taxonid):
lineage=ncbi.get_lineage(taxonid)
if len(lineage) > 1:
ranks=ncbi.get_rank(lineage)
if len(ranks) > 1:
idGenus = False
idFamily = False
for k, v in ranks.items():
if v == "genus":
idGenus = k
if v == "family":
idFamily = k
if idGenus and idFamily:
break
if idGenus is False or idFamily is False:
return False
else:
genusName = str(ncbi.translate_to_names([idGenus])[0])
familyName = str(ncbi.translate_to_names([idFamily])[0])
speciesName = str(ncbi.translate_to_names([taxonid])[0])
sample = Sample(sampleid=sampleid,
species_name=speciesName,
genus_name=genusName,
family=str(idFamily),
family_name = familyName,
scientific_name = speciesName,
taxid = taxonid,
rank = "species")
return sample
else:
return False
else:
return False
#==============================================================================
# MAIN
#==============================================================================
##args
localTaxdumpArchive = "TAXO/taxdump_2021.tar.gz"
rawFastaFile="resources/test/raw.fasta"
ncbi = NCBITaxa()
#ncbi = NCBITaxa(taxdump_file=path.abspath(localTaxdumpArchive))
## vertebrate
vertdesc=ncbi.get_descendant_taxa(7742,intermediate_nodes=True,rank_limit='species',collapse_subspecies=False)
#vertleaf = vertdesc.get_leaves(is_leaf_fn=None)
vert=ncbi.translate_to_names(vertdesc)
zip_iterator = zip(vertdesc, vert)
vertDic = dict(zip_iterator)
valideRecords = []
faultyRecords = []
for record in SeqIO.parse(rawFastaFile, "fasta"):
#print(record.id)
#print(record.description)
#print(record.seq)
checkedRecord = check_record_description(record.description)
if checkedRecord is not False:
checkedFormat = check_format_species_name(checkedRecord.species_name)
if checkedFormat is not False:
if check_dna_sequence(record.seq):
checkedTaxon = check_taxonomy_species_name(checkedFormat, vertDic)
if isinstance(checkedTaxon, dict):
#print(checkedTaxon)
sample = full_taxonomy_sample(checkedRecord.sampleid , list(checkedTaxon.keys())[0])
if sample is not False:
thisValideRecord = SeqRecord(id = sample.sampleid, description = sample.description(), seq = record.seq)
valideRecords.append(thisValideRecord)
else:
## faulty rank taxonomy
faultydescription = str(record.description)+' | faulty rank taxonomy: family or genera not found'
thisFaultyRecord = SeqRecord(id = record.id, description = faultydescription, seq=record.seq)
faultyRecords.append(thisFaultyRecord)
else:
## faulty taxonomy
faultydescription = str(record.description)+' | faulty taxonomy '+str(checkedTaxon)
thisFaultyRecord = SeqRecord(id = record.id, description = faultydescription, seq=record.seq)
faultyRecords.append(thisFaultyRecord)
else:
faultydescription = str(record.description)+' | faulty DNA sequence'
thisFaultyRecord = SeqRecord(id = record.id, description = faultydescription, seq=record.seq)
faultyRecords.append(thisFaultyRecord)
else:
## faulty species name format
faultydescription = str(record.description)+' | faulty species name format ' + str(checkedRecord.species_name)
thisFaultyRecord = SeqRecord(id = record.id, description = faultydescription, seq=record.seq)
faultyRecords.append(thisFaultyRecord)
else:
## faulty record description format
print("ERROR FORMAT FASTA {0}: {1} {2}.\n Excepted format is\n\n>sampleid ; species_names=Genus Species\nACTAG\n".format(rawFastaFile,record.id, record.description))
## write valide records fasta
SeqIO.write(valideRecords, "valide.fasta", "fasta")
## write faulty records fasta
SeqIO.write(faultyRecords, "faulty.fasta", "fasta")
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment