Commit 7211c76b authored by RomainFeron's avatar RomainFeron
Browse files

Added logging to file and cleaner logs

parent fdf57118
......@@ -19,13 +19,7 @@ void depth(Parameters& parameters) {
parsing_thread.join();
processing_thread.join();
std::ofstream output_file;
output_file.open(parameters.output_file_path);
if (not output_file.is_open()) {
std::cerr << "**Error: could not open output file <" << parameters.output_file_path << ">" << std::endl;
exit(1);
}
std::ofstream output_file = open_output(parameters.output_file_path);
output_file << "Individual\tGroup\tMarkers\tRetained\tMin_depth\tMax_depth\tMedian_depth\tAverage_depth\n";
......@@ -72,6 +66,9 @@ void depth(Parameters& parameters) {
void processor(MarkersQueue& markers_queue, Parameters& parameters, std::mutex& queue_mutex, Depths& depths, std::vector<uint32_t>& n_markers, bool& parsing_ended, ulong batch_size, uint n_individuals) {
// Give 100ms headstart to table parser thread (to get number of individuals from header)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::vector<Marker> batch;
bool keep_going = true;
......@@ -94,7 +91,7 @@ void processor(MarkersQueue& markers_queue, Parameters& parameters, std::mutex&
}
if (++n_processed_markers % (10 * marker_processed_tick) == 0) std::cerr << "Processed " << n_processed_markers << " markers (" << n_processed_markers / (marker_processed_tick) << " %)" << std::endl;
log_progress(n_processed_markers, marker_processed_tick);
}
......
......@@ -16,5 +16,4 @@ typedef std::vector<std::vector<uint16_t>> Depths;
// Calculate the number of reads retained in each individual
void depth(Parameters& parameters);
void processor(MarkersQueue& markers_queue, Parameters& parameters, std::mutex& queue_mutex, Depths& results, std::vector<uint32_t>& n_markers,
bool& parsing_ended, ulong batch_size, uint n_individuals);
void processor(MarkersQueue& markers_queue, Parameters& parameters, std::mutex& queue_mutex, Depths& results, std::vector<uint32_t>& n_markers, bool& parsing_ended, ulong batch_size, uint n_individuals);
......@@ -8,8 +8,10 @@ void table_parser(Parameters& parameters, const Popmap& popmap, MarkersQueue& ma
input_file.open(parameters.markers_table_path);
if (not input_file) {
std::cerr << "**Error: could not open marker depths table <" << parameters.markers_table_path << ">" << std::endl;
log("Could not open marker depths table <" + parameters.markers_table_path + ">", LOG_ERROR);
exit(1);
}
std::string temp = "";
......
......@@ -21,6 +21,9 @@ void distrib(Parameters& parameters) {
std::thread processing_thread(processor, std::ref(markers_queue), std::ref(parameters), std::ref(queue_mutex), std::ref(results), std::ref(parsing_ended), 100);
parsing_thread.join();
processing_thread.join();
// Calculate p-values for association with sex for each combination of males and females
......@@ -52,6 +55,9 @@ void distrib(Parameters& parameters) {
void processor(MarkersQueue& markers_queue, Parameters& parameters, std::mutex& queue_mutex, sd_table& results, bool& parsing_ended, ulong batch_size) {
// Give 100ms headstart to table parser thread (to get number of individuals from header)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::vector<Marker> batch;
bool keep_going = true;
......@@ -68,7 +74,7 @@ void processor(MarkersQueue& markers_queue, Parameters& parameters, std::mutex&
for (auto marker: batch) {
++results[marker.groups[parameters.group1]][marker.groups[parameters.group2]].first;
if (++n_processed_markers % (10 * marker_processed_tick) == 0) std::cerr << "Processed " << n_processed_markers << " markers (" << n_processed_markers / (marker_processed_tick) << " %)" << std::endl;
log_progress(n_processed_markers, marker_processed_tick);
}
......
......@@ -24,12 +24,7 @@ void freq(Parameters& parameters) {
parsing_thread.join();
processing_thread.join();
std::ofstream output_file;
output_file.open(parameters.output_file_path);
if (not output_file.is_open()) {
std::cerr << "**Error: could not open output file <" << parameters.output_file_path << ">" << std::endl;
exit(1);
}
std::ofstream output_file = open_output(parameters.output_file_path);
output_file << "Frequency" << "\t" << "Count" << "\n";
......@@ -42,6 +37,9 @@ void freq(Parameters& parameters) {
void processor(MarkersQueue& markers_queue, std::mutex& queue_mutex, std::vector<uint32_t>& frequencies, bool& parsing_ended, ulong batch_size) {
// Give 100ms headstart to table parser thread (to get number of individuals from header)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::vector<Marker> batch;
bool keep_going = true;
......@@ -58,7 +56,7 @@ void processor(MarkersQueue& markers_queue, std::mutex& queue_mutex, std::vector
for (auto& marker: batch) {
++frequencies[marker.n_individuals];
if (++n_processed_markers % (10 * marker_processed_tick) == 0) std::cerr << "Processed " << n_processed_markers << " markers (" << n_processed_markers / (marker_processed_tick) << " %)" << std::endl;
log_progress(n_processed_markers, marker_processed_tick);
}
} else {
......
......@@ -40,12 +40,7 @@ void map(Parameters& parameters) {
parsing_thread.join();
processing_thread.join();
std::ofstream output_file;
output_file.open(parameters.output_file_path);
if (not output_file.is_open()) {
std::cerr << "**Error: could not open output file <" << parameters.output_file_path << ">";
exit(1);
}
std::ofstream output_file = open_output(parameters.output_file_path);
output_file << "Contig\tPosition\tLength\tMarker_id\tBias\tP\tSignif\n";
......@@ -65,6 +60,9 @@ void map(Parameters& parameters) {
void processor(MarkersQueue& markers_queue, Parameters& parameters, Popmap& popmap, std::mutex& queue_mutex, std::vector<AlignedMarker>& aligned_markers, bool& parsing_ended, ulong batch_size) {
// Give 100ms headstart to table parser thread (to get number of individuals from header)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Batch reading
std::vector<Marker> batch;
bool keep_going = true;
......@@ -76,7 +74,7 @@ void processor(MarkersQueue& markers_queue, Parameters& parameters, Popmap& popm
bwaidx_t* index = bwa_idx_load(parameters.genome_file_path.c_str(), BWA_IDX_ALL);
if (index == nullptr) {
std::cerr << "**Error: failed to load index for reference \"" + parameters.genome_file_path + "\"." << std::endl;
log("Failed to load index for genome file <" + parameters.genome_file_path + ">", LOG_ERROR);
exit(1);
}
......@@ -148,7 +146,7 @@ void processor(MarkersQueue& markers_queue, Parameters& parameters, Popmap& popm
best_alignment[1] = -1;
best_alignment[2] = 0;
if (++n_processed_markers % (10 * marker_processed_tick) == 0) std::cerr << "Processed " << n_processed_markers << " markers (" << n_processed_markers / (marker_processed_tick) << " %)" << std::endl;
log_progress(n_processed_markers, marker_processed_tick);
}
} else {
......@@ -170,6 +168,14 @@ void processor(MarkersQueue& markers_queue, Parameters& parameters, Popmap& popm
std::unordered_map<std::string, uint64_t> get_contig_lengths(const std::string& genome_file_path) {
std::ifstream genome_file(genome_file_path);
if (not genome_file.is_open()) {
log("Could not open genome file <" + genome_file_path + "> in get_contig_lengths", LOG_ERROR);
exit(1);
}
std::string line = "", contig_name = "";
std::unordered_map<std::string, uint64_t> contig_lengths;
......@@ -211,14 +217,17 @@ void build_bwa_index(Parameters& parameters) {
// Checking that all index files are present
for (auto i=0; i<5; ++i) {
bwa_index_temp.open(parameters.genome_file_path + "." + extensions[i]);
if (bwa_index_temp.is_open()) bwa_index_temp.close(); else indexed = false;
}
if (not indexed) {
std::cerr << "**Info: bwa index file not found for the reference, creating bwa index file";
log("BWA index file not found for the reference, creating bwa index file");
bwa_idx_build(parameters.genome_file_path.c_str(), parameters.genome_file_path.c_str(), 0, 10000000); // Arguments: genome file, prefix, algorithm (default 0), block size (default 10000000)
}
std::cerr << "**Info: loading BWA index file" << std::endl;
}
#include "popmap.h"
Popmap load_popmap(Parameters& parameters) {
/* Load a popmap file.
......@@ -35,18 +33,18 @@ Popmap load_popmap(Parameters& parameters) {
} else {
std::cout << "**Error: cannot open popmap file \"" << parameters.popmap_file_path << "\"";
log("Could not open popmap file <" + parameters.popmap_file_path + ">", LOG_ERROR);
exit(1);
}
if (popmap.counts.size() < 2) {
std::cerr << "**Error: found only " << popmap.counts.size() << " groups in the popmap, at least two are required" << std::endl;
log("Found <" + std::to_string(popmap.counts.size()) + "> groups in the popmap file, but at least two are required", LOG_ERROR);
exit(1);
} else if (popmap.counts.size() > 2 and (parameters.group1 == "" or parameters.group2 == "")) {
std::cerr << "**Error: found " << popmap.counts.size() << " groups in the popmap but groups to compare were not defined (use --groups group1,group2)" << std::endl;
log("Found <" + std::to_string(popmap.counts.size()) + "> groups in the popmap file, but groups to compare were not defined (use --groups group1,group2)", LOG_ERROR);
exit(1);
} else {
......@@ -56,14 +54,16 @@ Popmap load_popmap(Parameters& parameters) {
parameters.group2 = (++i)->first;
}
std::cerr << "Successfully loaded popmap (";
std::string popmap_success_message = "Loaded popmap (";
uint n = 1;
for (auto group: popmap.counts) {
std::cerr << group.first << ": " << group.second;
if (n < popmap.counts.size()) std::cerr << ", ";
popmap_success_message += group.first + ": " + std::to_string(group.second);
if (n < popmap.counts.size()) popmap_success_message += ", ";
++n;
}
std::cerr << ")" << std::endl;
popmap_success_message += ")";
log(popmap_success_message);
return popmap;
}
......
......@@ -12,7 +12,7 @@ std::vector<InputFile> get_input_files(const std::string& input_dir_path) {
DIR* dir = opendir(input_dir_path.c_str());
if(!dir) {
std::cout << std::endl << "** Error: could not read the directory \"" << input_dir_path << "\"" << std::endl;
log("Could not read from the directory <" + input_dir_path + ">", LOG_ERROR);
exit(0);
}
......@@ -52,10 +52,18 @@ std::vector<InputFile> get_input_files(const std::string& input_dir_path) {
if (files.size() == 0) {
std::cout << " ** Error: no valid input file found in input directory \"" << input_dir_path <<"\"." << std::endl;
log("No valid input file found in input directory \"" + input_dir_path + ">");
std::string valid_formats_message = "Input files are detected based on extensions <";
for (auto& extension: extensions) valid_formats_message += extension;
if (extension != extensions.back()) valid_formats_message += ", ";
valid_formats_message += ">";
log(valid_formats_message);
exit(1);
}
log("Found <" + std::to_string(files.size()) + "> reads files");
return files;
}
......@@ -72,6 +80,7 @@ void process(Parameters& parameters) {
if (parameters.input_dir_path.back() != '/') parameters.input_dir_path += "/"; // Append "/" to the end of the path if it's missing
std::vector<InputFile> input_files = get_input_files(parameters.input_dir_path);
std::unordered_map<std::string, std::unordered_map<std::string, uint16_t>> results;
std::vector<std::thread> threads;
......@@ -88,25 +97,26 @@ void process(Parameters& parameters) {
std::vector<std::string> individuals;
for (auto i: input_files) individuals.push_back(i.individual_name);
std::ofstream output_file;
output_file.open(parameters.output_file_path);
std::ofstream output_file = open_output(parameters.output_file_path);
output_file << "#Number of markers : " << results.size() << "\n";
output_file << "id\tsequence";
for (auto& i: individuals) output_file << "\t" << i;
output_file << "\n";
std::cerr << "**Info: writing marker depths to output file" << std::endl;
log("**Writing marker depths to output file");
uint id = 0;
uint64_t n_markers = results.size();
uint64_t log_tick = static_cast<uint64_t>(n_markers / 10);
bool print = true;
uint marker_processed_tick = static_cast<uint>(results.size() / 100);
uint64_t n_processed_markers = 0;
// Fill line by line
for (auto r: results) {
for (auto marker: results) {
if (parameters.min_depth > 1) {
print = false;
for (auto i: r.second) {
for (auto i: marker.second) {
if (i.second >= parameters.min_depth) {
print = true;
break;
......@@ -115,13 +125,13 @@ void process(Parameters& parameters) {
}
if (print) {
output_file << id << "\t" << r.first;
for (auto i: individuals) output_file << "\t" << r.second[i];
output_file << id << "\t" << marker.first;
for (auto i: individuals) output_file << "\t" << marker.second[i];
output_file << "\n";
++id;
}
if (id % log_tick == 0) std::cerr << "**Info: wrote " << id / 1000 << " K. markers (" << 10 * id / log_tick << " %)" << std::endl;
log_progress(n_processed_markers, marker_processed_tick);
}
}
......@@ -139,19 +149,28 @@ inline void file_processor(std::vector<InputFile>& input_files, std::unordered_m
bool remaining_files = true;
while (remaining_files) {
files_mutex.lock();
for (std::vector<InputFile>::iterator it = input_files.begin(); it != input_files.end(); ++it) {
if (not it->processed) {
it->processed = true;
remaining_files = true;
files_mutex.unlock();
process_file(*it, results, results_mutex);
break;
} else {
remaining_files = false;
}
}
files_mutex.unlock();
}
}
......@@ -172,25 +191,24 @@ inline void process_file(InputFile& input_file, std::unordered_map<std::string,
file = gzopen(input_file.path.c_str(), "r");
if (not file) {
std::cerr << " ** Error: cannot open input file <" << input_file.path << ">" << std::endl;
log("Could not open reads file <" + input_file.path + ">", LOG_ERROR);
exit(1);
}
sequence = kseq_init(file); // Initialize the seq object
// Read through the file and store the results
while ((line_n = kseq_read(sequence)) >= 0) {
++temp_results[sequence->seq.s];
}
while ((line_n = kseq_read(sequence)) >= 0) ++temp_results[sequence->seq.s];
kseq_destroy(sequence); // Destroy the seq object
gzclose(file);
// Transfer the results from the temp data structure to the full data structure
results_mutex.lock();
for (auto sequence : temp_results) {
results[sequence.first][input_file.individual_name] += sequence.second;
}
std::cout << " - Finished processing individual <" + input_file.individual_name + ">" << std::endl;
for (auto marker : temp_results) results[marker.first][input_file.individual_name] += marker.second;
results_mutex.unlock();
log("Finished processing individual <" + input_file.individual_name + ">");
}
......@@ -24,12 +24,7 @@ void signif(Parameters& parameters) {
parsing_thread.join();
processing_thread.join();
std::ofstream output_file;
output_file.open(parameters.output_file_path);
if (not output_file.is_open()) {
std::cerr << "**Error: could not open output file <" << parameters.output_file_path << ">";
exit(1);
}
std::ofstream output_file = open_output(parameters.output_file_path);
if (not parameters.disable_correction) parameters.signif_threshold /= n_markers; // Bonferroni correction: divide threshold by number of tests
......@@ -50,6 +45,9 @@ void signif(Parameters& parameters) {
void processor(MarkersQueue& markers_queue, Popmap& popmap, Parameters& parameters, std::mutex& queue_mutex, std::vector<Marker>& candidate_markers, uint& n_markers, bool& parsing_ended, ulong batch_size) {
// Give 100ms headstart to table parser thread (to get number of individuals from header)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::vector<Marker> batch;
bool keep_going = true;
......@@ -77,7 +75,7 @@ void processor(MarkersQueue& markers_queue, Popmap& popmap, Parameters& paramete
}
if (++n_processed_markers % (10 * marker_processed_tick) == 0) std::cerr << "Processed " << n_processed_markers << " markers (" << n_processed_markers / (marker_processed_tick) << " %)" << std::endl;
log_progress(n_processed_markers, marker_processed_tick);
}
......
......@@ -9,12 +9,7 @@ void subset(Parameters& parameters) {
MarkersQueue markers_queue;
std::mutex queue_mutex;
std::ofstream output_file;
output_file.open(parameters.output_file_path);
if (not output_file.is_open()) {
std::cerr << "**Error: could not open output file <" << parameters.output_file_path << ">";
exit(1);
}
std::ofstream output_file = open_output(parameters.output_file_path);
std::thread parsing_thread(table_parser, std::ref(parameters), std::ref(popmap), std::ref(markers_queue), std::ref(queue_mutex), std::ref(header), std::ref(parsing_ended), true, false);
std::thread processing_thread(processor, std::ref(markers_queue), std::ref(popmap), std::ref(parameters), std::ref(queue_mutex), std::ref(output_file), std::ref(parsing_ended), 100);
......@@ -28,6 +23,9 @@ void subset(Parameters& parameters) {
void processor(MarkersQueue& markers_queue, Popmap& popmap, Parameters& parameters, std::mutex& queue_mutex, std::ofstream& output_file, bool& parsing_ended, ulong batch_size) {
// Give 100ms headstart to table parser thread (to get number of individuals from header)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::vector<Marker> batch;
bool keep_going = true;
......@@ -56,7 +54,7 @@ void processor(MarkersQueue& markers_queue, Popmap& popmap, Parameters& paramete
}
if (++n_processed_markers % (10 * marker_processed_tick) == 0) std::cerr << "Processed " << n_processed_markers << " markers (" << n_processed_markers / (marker_processed_tick) << " %)" << std::endl;
log_progress(n_processed_markers, marker_processed_tick);
}
......
......@@ -13,6 +13,10 @@
#define DTTMFMT "%Y-%m-%d %H:%M:%S"
#define DTTMSZ 21
#define LOG_ERROR "ERROR"
#define LOG_WARNING "WARNING"
#define LOG_INFO "INFO"
// Store sex distribution results
typedef std::unordered_map<uint, std::unordered_map<uint, std::pair<uint64_t, double>>> sd_table;
......@@ -57,10 +61,10 @@ inline int fast_stoi(const char* str) {
// Log output formatting
template<typename T>
inline void log(T line) {
inline void log(T line, const std::string level = LOG_INFO) {
char logtime[DTTMSZ];
std::cerr << "[" << print_time(logtime) << "]" << " ";
std::cerr << "[" << print_time(logtime) << "]::" << level << " ";
std::cerr << std::boolalpha << line << std::endl;
}
......@@ -89,16 +93,16 @@ inline std::vector<std::string> get_column_sex(std::unordered_map<std::string, s
}
inline const std::vector<std::string> get_header(const std::string& input_file_path) {
std::string line = "#";
std::vector<std::string> header;
std::ifstream input_file;
input_file.open(input_file_path);
std::ifstream input_file(input_file_path);
if (not input_file.is_open()) {
std::cerr << "**Error: could not open input file <" << input_file_path << ">" << std::endl;
log("Could not open input file <" + input_file_path + "> in get_header");
exit(1);
}
......@@ -111,3 +115,30 @@ inline const std::vector<std::string> get_header(const std::string& input_file_p
header = split(line, "\t");
return header;
}
inline std::ofstream open_output(const std::string& output_file_path) {
std::ofstream output_file(output_file_path);
if (not output_file.is_open()) {
log("Could not open output file <" + output_file_path + ">", LOG_ERROR);
exit(1);
}
return output_file;
}
inline void log_progress(uint64_t& n_processed_markers, uint32_t marker_processed_tick) {
if (++n_processed_markers % (10 * marker_processed_tick) == 0) {
log("Processed " + std::to_string(n_processed_markers) + " markers (" + std::to_string(n_processed_markers / (marker_processed_tick)) + " %)");
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment