Commit 3e6a01a7 authored by RomainFeron's avatar RomainFeron
Browse files

Implemented max size for markers queue, thread waits if queue reaches this size

parent f41a5035
......@@ -11,11 +11,17 @@ inline std::string failure_message(const CLI::App* parser, const CLI::Error& err
std::string message = "";
if (error.what() == std::string("A subcommand is required")) {
message = "\nSubcommand error: missing or invalid subcommand\n\n" + parser->help();
} else if (error.get_exit_code() == 106) { // 106 corresponds to wrong argument type
message = "\nArgument error: " + std::string(error.what()) + "\n\n" + parser->help();
} else {
message = "\nError: " + std::string(error.what()) + "\n\n" + parser->help();
}
return message;
......
......@@ -17,7 +17,7 @@ void depth(Parameters& parameters) {
std::mutex queue_mutex;
std::thread parsing_thread(table_parser, std::ref(parameters), std::ref(popmap), std::ref(markers_queue), std::ref(queue_mutex), std::ref(header), std::ref(parsing_ended), true, true);
std::thread processing_thread(processor, std::ref(markers_queue), std::ref(parameters), std::ref(queue_mutex), std::ref(depths), std::ref(n_markers), std::ref(parsing_ended), 100, popmap.n_individuals);
std::thread processing_thread(processor, std::ref(markers_queue), std::ref(parameters), std::ref(queue_mutex), std::ref(depths), std::ref(n_markers), std::ref(parsing_ended), BATCH_SIZE, popmap.n_individuals);
parsing_thread.join();
processing_thread.join();
......
......@@ -2,8 +2,6 @@
void table_parser(Parameters& parameters, const Popmap& popmap, MarkersQueue& markers_queue, std::mutex& queue_mutex, Header& header, bool& parsing_ended, bool no_seq, bool no_group) {
const uint tmp_queue_size = 1000;
std::ifstream input_file;
input_file.open(parameters.markers_table_path);
......@@ -38,8 +36,9 @@ void table_parser(Parameters& parameters, const Popmap& popmap, MarkersQueue& ma
uint k = 0, field_n = 0, marker_n = 0;
Marker marker;
marker.individuals.resize(header.size() - 2);
std::vector<Marker> tmp_queue(tmp_queue_size); // Temporary block queue to avoid locking the shared blocks queue too often
std::vector<Marker> tmp_queue(TMP_QUEUE_SIZE); // Temporary block queue to avoid locking the shared blocks queue too often
uint tmp_queue_real_size = 0;
uint marker_queue_size = 0;
do {
......@@ -81,10 +80,14 @@ void table_parser(Parameters& parameters, const Popmap& popmap, MarkersQueue& ma
++marker.n_individuals;
}
// Add marker to the queue
tmp_queue[marker_n % tmp_queue_size] = marker; // Empty line means end of a block, we add it to the queue
tmp_queue[marker_n % TMP_QUEUE_SIZE] = marker; // Empty line means end of a block, we add it to the queue
++tmp_queue_real_size;
++marker_n;
if (marker_n % tmp_queue_size == 0) { // Merge temporary queue with shared queue after 1000 blocks
if (marker_n % TMP_QUEUE_SIZE == 0) { // Merge temporary queue with shared queue after 1000 blocks
do {
marker_queue_size = markers_queue.markers.size();
if (marker_queue_size > MAX_QUEUE_SIZE) std::this_thread::sleep_for(std::chrono::microseconds(10));
} while (marker_queue_size > MAX_QUEUE_SIZE);
queue_mutex.lock();
for (auto& tmp_marker: tmp_queue) {
markers_queue.markers.push(tmp_marker);
......
......@@ -21,7 +21,7 @@ void distrib(Parameters& parameters) {
std::mutex queue_mutex;
std::thread parsing_thread(table_parser, std::ref(parameters), std::ref(popmap), std::ref(markers_queue), std::ref(queue_mutex), std::ref(header), std::ref(parsing_ended), true, false);
std::thread processing_thread(processor, std::ref(markers_queue), std::ref(parameters), std::ref(queue_mutex), std::ref(results), std::ref(parsing_ended), 100);
std::thread processing_thread(processor, std::ref(markers_queue), std::ref(parameters), std::ref(queue_mutex), std::ref(results), std::ref(parsing_ended), BATCH_SIZE);
parsing_thread.join();
......
......@@ -22,7 +22,7 @@ void freq(Parameters& parameters) {
std::vector<uint32_t> frequencies(n_individuals, 0);
std::thread parsing_thread(table_parser, std::ref(parameters), std::ref(popmap), std::ref(markers_queue), std::ref(queue_mutex), std::ref(header), std::ref(parsing_ended), true, true);
std::thread processing_thread(processor, std::ref(markers_queue), std::ref(queue_mutex), std::ref(frequencies), std::ref(parsing_ended), 100);
std::thread processing_thread(processor, std::ref(markers_queue), std::ref(queue_mutex), std::ref(frequencies), std::ref(parsing_ended), BATCH_SIZE);
parsing_thread.join();
processing_thread.join();
......
......@@ -38,7 +38,7 @@ void map(Parameters& parameters) {
build_bwa_index(parameters);
std::thread parsing_thread(table_parser, std::ref(parameters), std::ref(popmap), std::ref(markers_queue), std::ref(queue_mutex), std::ref(header), std::ref(parsing_ended), false, false);
std::thread processing_thread(processor, std::ref(markers_queue), std::ref(parameters), std::ref(popmap), std::ref(queue_mutex), std::ref(aligned_markers), std::ref(parsing_ended), 1000);
std::thread processing_thread(processor, std::ref(markers_queue), std::ref(parameters), std::ref(popmap), std::ref(queue_mutex), std::ref(aligned_markers), std::ref(parsing_ended), BATCH_SIZE);
parsing_thread.join();
processing_thread.join();
......
......@@ -22,7 +22,7 @@ void signif(Parameters& parameters) {
std::mutex queue_mutex;
std::thread parsing_thread(table_parser, std::ref(parameters), std::ref(popmap), std::ref(markers_queue), std::ref(queue_mutex), std::ref(header), std::ref(parsing_ended), true, false);
std::thread processing_thread(processor, std::ref(markers_queue), std::ref(popmap), std::ref(parameters), std::ref(queue_mutex), std::ref(candidate_markers), std::ref(n_markers), std::ref(parsing_ended), 100);
std::thread processing_thread(processor, std::ref(markers_queue), std::ref(popmap), std::ref(parameters), std::ref(queue_mutex), std::ref(candidate_markers), std::ref(n_markers), std::ref(parsing_ended), BATCH_SIZE);
parsing_thread.join();
processing_thread.join();
......
......@@ -15,7 +15,7 @@ void subset(Parameters& parameters) {
std::ofstream output_file = open_output(parameters.output_file_path);
std::thread parsing_thread(table_parser, std::ref(parameters), std::ref(popmap), std::ref(markers_queue), std::ref(queue_mutex), std::ref(header), std::ref(parsing_ended), true, false);
std::thread processing_thread(processor, std::ref(markers_queue), std::ref(popmap), std::ref(parameters), std::ref(queue_mutex), std::ref(output_file), std::ref(parsing_ended), 100);
std::thread processing_thread(processor, std::ref(markers_queue), std::ref(popmap), std::ref(parameters), std::ref(queue_mutex), std::ref(output_file), std::ref(parsing_ended), BATCH_SIZE);
parsing_thread.join();
processing_thread.join();
......
......@@ -12,10 +12,12 @@
#include <vector>
#define DTTMFMT "%Y-%m-%d %H:%M:%S"
#define DTTMSZ 21
#define LOG_ERROR "ERROR"
#define LOG_WARNING "WARNING"
#define LOG_INFO "INFO"
#define BATCH_SIZE 100
#define TMP_QUEUE_SIZE 1000
#define MAX_QUEUE_SIZE 10000
// Store sex distribution results
typedef std::unordered_map<uint, std::unordered_map<uint, std::pair<uint64_t, double>>> sd_table;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment