Commit 662695ae authored by khalid's avatar khalid
Browse files

Add processTagged option to create locus from indexed reads with variable tags size

parent 841442d1
......@@ -28,6 +28,7 @@
#include "map.h"
#include "parameters.h"
#include "process.h"
#include "processTagged.h"
#include "signif.h"
#include "subset.h"
......@@ -97,6 +98,15 @@ inline Parameters parse_args(int& argc, char** argv) {
process->add_option("-T,--threads", parameters.n_threads, "Number of threads to use", true)->check(CLI::Range(1, 9999));
process->add_option("-d,--min-depth", parameters.min_depth, "Minimum depth in at least one individual to retain a marker", true)->check(CLI::Range(1, 9999));
// ProcessTagged subcommand parser
CLI::App* processTagged = parser.add_subcommand("processTagged", "Compute a table of marker depths from a set of demultiplexed reads files containing tags");
processTagged->add_option("-i,--input-dir", parameters.input_dir_path, "Path to a directory contains demultiplexed sequence files")->required()->check(CLI::ExistingDirectory);
processTagged->add_option("-o,--output-file", parameters.output_file_path, "Path to the output file (table of marker depths in each individual)")->required();
processTagged->add_option("-p,--popmap", parameters.popmap_file_path, "Path to a tabulated file specifying groups for all individuals (population map)")->required()->check(CLI::ExistingFile);
processTagged->add_option("-T,--threads", parameters.n_threads, "Number of threads to use", true)->check(CLI::Range(1, 9999));
processTagged->add_option("-d,--min-depth", parameters.min_depth, "Minimum depth in at least one individual to retain a marker", true)->check(CLI::Range(1, 9999));
// Signif subcommand parser
CLI::App* signif = parser.add_subcommand("signif", "Extract markers significantly associated with phenotypic group from a marker depths table");
signif->add_option("-t,--markers-table", parameters.markers_table_path, "Path to a marker depths table generated by \"process\"")->required()->check(CLI::ExistingFile);
......@@ -228,6 +238,10 @@ int main(int argc, char* argv[]) {
process(parameters);
} else if (parameters.command == "processTagged") {
processTagged(parameters);
}
return 0;
......
......@@ -31,6 +31,7 @@ Popmap::Popmap(Parameters& parameters, bool check_groups) {
std::string line;
uint line_number = 1;
uint16_t taglength = 0;
std::vector<std::string> fields;
// Parse popmap file
......@@ -46,8 +47,18 @@ Popmap::Popmap(Parameters& parameters, bool check_groups) {
++this->n_individuals; // Increase total individuals count
} else {
if (fields.size() == 3) {
this->individual_groups[fields[0]] = fields[1]; // Store group for this individual
++this->group_counts[fields[1]]; // Increase count for this group
this->individual_tags[fields[0]] = fields[2]; // Store tag for this individual
taglength = fields[2].length();
this->tagmax_size = std::max(this->tagmax_size, taglength);
++this->n_individuals; // Increase total individuals count
}
else {
log("Could not process line " + std::to_string(line_number) + " in popmap file <" + this->file + "> (expected format: \"individual\tgroup\")", LOG_WARNING);
}
}
......@@ -111,6 +122,23 @@ std::string Popmap::get_group(const std::string& individual) const {
}
std::string Popmap::get_tag(const std::string& individual) const {
std::string tag = "";
if (this->individual_tags.find(individual) != this->individual_tags.end()) {
tag = this->individual_tags.at(individual);
} else {
log("Trying to get tag for individual <" + individual + "> which was not found in popmap <" + this->file + ">", LOG_WARNING);
}
return tag;
}
......
......@@ -38,6 +38,7 @@ class Popmap {
public:
uint16_t n_individuals = 0; ///< Total number of individuals in the popmap
uint16_t tagmax_size = 0; ///< max length of tags if present for individuals in the popmap
/*!
* \brief Default Popmap constructor
......@@ -49,7 +50,7 @@ class Popmap {
/*!
* \brief Standard Popmap constructor
*
* Read a popmap file and store the group for each individual as well as individual counts for each group.
* Read a popmap file and store the group and eventually a tag for each individual as well as individual counts for each group.
*
* \param parameters Parameters instance storing the value of all RADSex parameters
* \param check_groups If true, performs checks on the groups obtained from the popmap
......@@ -69,6 +70,18 @@ class Popmap {
std::string get_group(const std::string& individual) const;
/*!
* \brief Get the tag of individual if present in popmap
*
* \param individual Name of the individual
*
* \return A string containing the tag for the individual or "" if the individual or the tag was not in the popmap
*/
std::string get_tag(const std::string& individual) const;
/*!
* \brief Get individual count for a group
*
......@@ -77,6 +90,7 @@ class Popmap {
* \return The number of individuals for this group
*/
uint get_count(const std::string& group) const;
......@@ -103,6 +117,9 @@ class Popmap {
std::string file; ///< Path to the popmap file
std::unordered_map<std::string, std::string> individual_groups; ///< Map storing group for each individual: {individual -> group}
//halid add a sample tag
std::unordered_map<std::string, std::string> individual_tags; ///< Map storing tag for each individual: {individual -> tag}
std::unordered_map<std::string, uint> group_counts; ///< Map storing the number of individuals in each group: {group -> number of individuals}
};
/*
* Copyright (C) 2020 Romain Feron
* This file is part of RADSex.
* RADSex is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* RADSex is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with RADSex. If not, see <https://www.gnu.org/licenses/>.
*/
/*!
* @file process.cpp
* @brief Implements functions for the "process" command.
*/
#include "processTagged.h"
#include "popmap.h"
KSEQ_INIT(gzFile, gzread); ///< Macro to initalize reading sequences from file with Kseq
std::vector<taggedInputFile> get_input_files(const std::string& input_dir_path, Popmap& popmap) {
/* Gets all relevant files (from supported formats) from the input directory.
* Files are stored as InputFile object which contains all relevant information about the file.
* The function returns a vector of InputFiles.
*/
DIR* dir = opendir(input_dir_path.c_str());
if(!dir) {
log("Could not read from the directory <" + input_dir_path + ">", LOG_ERROR);
exit(0);
}
struct dirent* dir_content;
std::vector<taggedInputFile> files;
std::string current_file, extension, individual_name, tag;
std::vector<std::string> split_name;
taggedInputFile temp;
std::cout << "Individual \t tag "<< "\n";
while ((dir_content = readdir(dir))) {
current_file = dir_content->d_name;
split_name = split(current_file, ".");
size_t s = split_name.size();
extension = "";
individual_name = split_name[0];
// Get file name and extension (even when there is "." in the file name)
if (s > 1) {
if (split_name[s - 1] == "gz" and s > 2) {
extension = "." + split_name[s - 2] + "." + split_name[s - 1];
for (uint i=1; i<split_name.size() - 2; ++i) individual_name += "." + split_name[i];
} else {
extension = "." + split_name[s - 1];
for (uint i=1; i<split_name.size() - 1; ++i) individual_name += "." + split_name[i];
}
}
if(std::find(fextensions.begin(), fextensions.end(), extension) != fextensions.end()) {
tag = popmap.get_tag(individual_name);
std::cout << individual_name << "\t " << tag << "\n";
temp.individual_name = individual_name;
temp.tag = tag;
temp.maxTagLength = popmap.tagmax_size;
temp.path = input_dir_path + current_file;
temp.extension = extension;
temp.processed = false;
files.push_back(temp);
}
}
if (files.size() == 0) {
log("No valid input file found in input directory \"" + input_dir_path + ">");
std::string valid_formats_message = "Input files are detected based on extensions <";
for (auto& extension: fextensions) valid_formats_message += extension;
if (extension != fextensions.back()) valid_formats_message += ", ";
valid_formats_message += ">";
log(valid_formats_message);
exit(1);
}
log("Found <" + std::to_string(files.size()) + "> reads files");
return files;
}
void processTagged(Parameters& parameters) {
/* The process_reads function does the following:
* - retrieve all sequence files from the input directory
* - create file processors according to the value of n_threads (allows to process files in parallel)
* - once all files are processed, output the results
*/
Popmap popmap; ///< Popmap instance storing group for each individual
std::chrono::steady_clock::time_point t_begin = std::chrono::steady_clock::now();
log("RADSex <processTagged> started");
if (parameters.input_dir_path.back() != '/') parameters.input_dir_path += "/"; // Append "/" to the end of the path if it's missing
(parameters.popmap_file_path != "") ? popmap = Popmap(parameters, false) : popmap = Popmap(); // Load popmap
std::vector<taggedInputFile> input_files = get_input_files(parameters.input_dir_path, popmap);
std::unordered_map<std::string, std::unordered_map<std::string, uint16_t>> results;
std::vector<std::thread> threads;
std::mutex results_mutex, files_mutex;
// Create and run all file processors
for (uint i=0; i<parameters.n_threads; ++i) {
threads.push_back(std::thread(file_processor, std::ref(input_files), std::ref(results), std::ref(results_mutex), std::ref(files_mutex)));
}
for (auto &t : threads) t.join();
// Create list of individuals
std::vector<std::string> individuals;
for (auto i: input_files) individuals.push_back(i.individual_name);
std::ofstream output_file = open_output(parameters.output_file_path);
output_file << "#Number of markers : " << results.size() << "\n";
output_file << "id\tsequence";
for (auto& i: individuals) output_file << "\t" << i;
output_file << "\n";
log("Writing marker depths to output file");
uint id = 0;
bool print = true;
uint marker_processed_tick = static_cast<uint>(results.size() / 100);
uint64_t n_processed_markers = 0;
// Fill line by line
for (auto marker: results) {
if (parameters.min_depth > 1) {
print = false;
for (auto i: marker.second) {
if (i.second >= parameters.min_depth) {
print = true;
break;
}
}
}
if (print) {
output_file << id << "\t" << marker.first;
for (auto i: individuals) output_file << "\t" << marker.second[i];
output_file << "\n";
++id;
}
log_progress_bar(++n_processed_markers, marker_processed_tick);
}
log("RADSex processTagged ended (total runtime: " + get_runtime(t_begin) + ")");
}
inline void file_processor(std::vector<taggedInputFile>& input_files, std::unordered_map<std::string, std::unordered_map<std::string, uint16_t>>& results, std::mutex& results_mutex, std::mutex& files_mutex) {
/* The file processor looks for files to process and process them
* It is meant to be used in parallel to increase throughput
* A mutex protects the list of input files and another mutex protects the results structure
*/
bool remaining_files = true;
while (remaining_files) {
files_mutex.lock();
for (std::vector<taggedInputFile>::iterator it = input_files.begin(); it != input_files.end(); ++it) {
if (not it->processed) {
it->processed = true;
remaining_files = true;
files_mutex.unlock();
process_file(*it, results, results_mutex);
break;
} else {
remaining_files = false;
}
}
files_mutex.unlock();
}
}
inline void process_file(taggedInputFile& input_file, std::unordered_map<std::string, std::unordered_map<std::string, uint16_t>>& results, std::mutex& results_mutex) {
/* This function uses the Kseq generic file stream from Heng Li to read input files.
* The coverage of each sequence is computed without allowing for any mismatches.
* This is basically a read counter.
*/
std::unordered_map<std::string, uint16_t> temp_results; // Temporary result storage to avoid constantly locking/unlocking the mutex
gzFile file;
kseq_t* sequence; // Create the seq object
int line_n; // Stores the length of a line
file = gzopen(input_file.path.c_str(), "r");
std::string tag = input_file.tag;
int tagl = tag.length();
int maxtagl = input_file.maxTagLength;
if (not file) {
log("Could not open reads file <" + input_file.path + ">", LOG_ERROR);
exit(1);
}
sequence = kseq_init(file); // Initialize the seq object
// Read through the file and store the results
while ((line_n = kseq_read(sequence)) >= 0)
{
std::string aString(sequence->seq.s);
std::string str3;
std::size_t pos = aString.find(tag); // position of tag in sequence str
if(pos == 0)
{
if( tagl == maxtagl)
str3 = aString.substr (tagl); // get after tag to the end
else //tagl < maxtagl
{
uint seql = aString.length();
str3 = aString.substr (tagl, seql - ( maxtagl -tagl ));
}
}
else {
str3 = aString;
log("Could not find individual tag <" + tag + ">" + " in the seq : "+ str3, LOG_ERROR);
exit(1);
}
++temp_results[str3.c_str()];
// std::cout << aString << std::endl;
// std::cout << std::string(tagl, '*') << str3 << std::endl;
}
kseq_destroy(sequence); // Destroy the seq object
gzclose(file);
// Transfer the results from the temp data structure to the full data structure
results_mutex.lock();
for (auto marker : temp_results) results[marker.first][input_file.individual_name] += marker.second;
results_mutex.unlock();
log("Finished processing individual " + input_file.individual_name + " with tag " + tag + "" );
log("Trimmed "+ std::to_string(maxtagl - tagl)+ " bases in the tail for reads in file " + input_file.path + "");
}
/*
* Copyright (C) 2020 Romain Feron
* This file is part of RADSex.
* RADSex is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* RADSex is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with RADSex. If not, see <https://www.gnu.org/licenses/>.
*/
/*!
* @file process.h
* @brief Defines function for the "process" command generating a markers depth table from individual reads files .
*/
#pragma once
#include <dirent.h>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <zlib.h>
#include "kseq/kseq.h"
#include "parameters.h"
#include "utils.h"
#include "popmap.h"
const std::vector<std::string> fextensions {".fq.gz", ".fq", ".fastq.gz", ".fastq", ".fasta.gz", ".fasta", ".fa.gz", ".fa", ".fna.gz", ".fna"}; ///< List of extensions for currently supported file types
/*!
* \brief InputFile struct
*
* Store information about an input reads file.
*/
struct taggedInputFile {
std::string individual_name; ///< Name of the individual, inferred from the file name
std::string tag; // a tag associated with this sample
uint maxTagLength; // this info will help to get seq. of the same size between individuals with different tag size
std::string path; ///< File path
std::string extension; ///< File extension (supported extensions are defined in \link extensions \endlink
bool processed = false; ///< If true, the file has already been processed
};
/*!
* \brief Get input files
*
* Detect all input files in the input directory based on extensions and store file information in a InputFile vector.
*
* \param input_dir_path Path to the input directory
*
* \return A vector of InputFile objects, each containing information about an input file
*/
std::vector<taggedInputFile> get_input_files(const std::string& input_dir_path, Popmap& popmap);
/*!
* \brief Main function implementing the "process" command
*
* Generate a markers depth table from all individual reads files detected in an input directory.
* The function spans multiple file_processor() threads to process input files in parallel.
*
* \param parameters A Parameters object storing the value of all RADSex parameters
*/
void processTagged(Parameters& parameters);
/*!
* \brief File processor
*
* Look for an unprocessed input file in the vector of input files generated by get_input_files() and process this file.
*
* \param input_files Vector of InputFile generated by get_input_files()
* \param results A map of marker depth for each individual: {Marker -> {Individual -> depth}}
* \param results_mutex A mutex protecting the marker depth map when writing from process_file()
* \param files_mutex A mutex to lock the InputFile vector while looking for a file to process
*/
void file_processor(std::vector<taggedInputFile>& input_files, std::unordered_map<std::string, std::unordered_map<std::string, uint16_t>>& results, std::mutex& results_mutex, std::mutex& files_mutex);
/*!
* \brief Process a single file
*
* Read all sequences from an input file with kseq_read() and increment the marker depth table accordingly.
*
* \param input_file Path to the reads file
* \param results A map of marker depth for each individual: {Marker -> {Individual -> depth}}
* \param results_mutex A mutex protecting the marker depth map when writing from multiple processing threads
*/
inline void process_file(taggedInputFile& input_file, std::unordered_map<std::string, std::unordered_map<std::string, uint16_t>>& results, std::mutex& results_mutex);
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment