From a3817bf0b78bcc63d8366c62eab79928ebd2f060 Mon Sep 17 00:00:00 2001 From: Yaossg Date: Sun, 19 Jan 2025 17:06:28 +0800 Subject: [PATCH] use channel --- Makefile | 2 +- hisat_3n_table.cpp | 61 +++++++-------------- position_3n_table.h | 113 ++++++++++++++------------------------- utility_3n_table.h | 125 +++++++++++++++++++++++--------------------- 4 files changed, 124 insertions(+), 177 deletions(-) diff --git a/Makefile b/Makefile index 0abfad9..4f5c347 100644 --- a/Makefile +++ b/Makefile @@ -300,7 +300,7 @@ repeat: hisat2-repeat repeat-debug: hisat2-repeat-debug DEFS :=-fno-strict-aliasing \ - -DHISAT2_VERSION="\"`cat HISAT2_VERSION`\"" \ + -DHISAT2_VERSION="\"\"" \ -DBUILD_HOST="\"`hostname`\"" \ -DBUILD_TIME="\"`date`\"" \ -DCOMPILER_VERSION="\"`$(CXX) -v 2>&1 | tail -1`\"" \ diff --git a/hisat_3n_table.cpp b/hisat_3n_table.cpp index 0168af2..f639677 100644 --- a/hisat_3n_table.cpp +++ b/hisat_3n_table.cpp @@ -246,9 +246,9 @@ int hisat_3n_table() positions = new Positions(refFileName, nThreads, addedChrName, removedChrName); // open #nThreads workers - vector workers; + vector workers; for (int i = 0; i < nThreads; i++) { - workers.push_back(new thread(&Positions::append, positions, i)); + workers.emplace_back(&Positions::append, positions, i); } // open a output thread @@ -265,50 +265,37 @@ int hisat_3n_table() alignmentFile = &inputFile; } - string* line; // temporary string to get SAM line. - string samChromosome; // the chromosome name of current SAM line. long long int samPos; // the position of current SAM line. long long int reloadPos; // the position in reference that we need to reload. long long int lastPos = 0; // the position on last SAM line. compare lastPos with samPos to make sure the SAM is sorted. while (alignmentFile->good()) { - positions->getFreeStringPointer(line); - if (!getline(*alignmentFile, *line)) { - positions->returnLine(line); + string line; + string samChromosome; + + if (!getline(*alignmentFile, line)) { break; } - if (line->empty() || line->front() == '@') { - positions->returnLine(line); + if (line.empty() || line.front() == '@') { continue; } - // limit the linePool size to save memory - while(positions->linePool.size() > 1000 * nThreads) { - this_thread::sleep_for (std::chrono::microseconds(1)); - } + // if the SAM line is empty or unmapped, get the next SAM line. - if (!getSAMChromosomePos(line, samChromosome, samPos)) { - positions->returnLine(line); + if (!getSAMChromosomePos(&line, samChromosome, samPos)) { continue; } // if the samChromosome is different than current positions' chromosome, finish all SAM line. // then load a new reference chromosome. - if (samChromosome != positions->chromosome) { - // wait all line is processed - while (!positions->linePool.empty() || positions->outputPositionPool.size() > 100000) { - this_thread::sleep_for (std::chrono::microseconds(1)); - } + if (samChromosome != *positions->chromosome) { positions->appendingFinished(); positions->moveAllToOutput(); - positions->loadNewChromosome(samChromosome); + positions->loadNewChromosome(std::move(samChromosome)); reloadPos = loadingBlockSize; lastPos = 0; } // if the samPos is larger than reloadPos, load 1 loadingBlockSize bp in from reference. while (samPos > reloadPos) { - while (!positions->linePool.empty() || positions->outputPositionPool.size() > 100000) { - this_thread::sleep_for (std::chrono::microseconds(1)); - } positions->appendingFinished(); positions->moveBlockToOutput(); positions->loadMore(); @@ -318,7 +305,7 @@ int hisat_3n_table() cerr << "The input alignment file is not sorted. Please use sorted SAM file as alignment file." << endl; throw 1; } - positions->linePool.push(line); + positions->linePool.send(std::move(line)); lastPos = samPos; } //} @@ -326,30 +313,18 @@ int hisat_3n_table() inputFile.close(); } - - // prepare to close everything. - - // make sure linePool is empty - while (!positions->linePool.empty()) { - this_thread::sleep_for (std::chrono::microseconds(100)); + positions->linePool.close(); + for (int i = 0; i < nThreads; i++){ + workers[i].join(); } + // make sure all workers finished their appending work. positions->appendingFinished(); // move all position to outputPool positions->moveAllToOutput(); // wait until outputPool is empty - while (!positions->outputPositionPool.empty()) { - this_thread::sleep_for (std::chrono::microseconds(100)); - } - // stop all thread and clean - while(positions->freeLinePool.popFront(line)) { - delete line; - } - positions->working = false; - for (int i = 0; i < nThreads; i++){ - workers[i]->join(); - delete workers[i]; - } + + positions->outputPositionPool.close(); outputThread.join(); delete positions; return 0; diff --git a/position_3n_table.h b/position_3n_table.h index 2a0c99d..da1fbd3 100644 --- a/position_3n_table.h +++ b/position_3n_table.h @@ -26,6 +26,9 @@ #include #include #include +#include +#include + #include "alignment_3n_table.h" using namespace std; @@ -60,21 +63,21 @@ public: class Position{ mutex mutex_; public: - string chromosome; // reference chromosome name + shared_ptr chromosome; // reference chromosome name long long int location; // 1-based position char strand; // +(REF) or -(REF-RC) string convertedQualities; // each char is a mapping quality on this position for converted base. string unconvertedQualities; // each char is a mapping quality on this position for unconverted base. - vector uniqueIDs; // each value represent a readName which contributed the base information. + deque uniqueIDs; // each value represent a readName which contributed the base information. // readNameIDs is to make sure no read contribute 2 times in same position. void initialize() { - chromosome.clear(); + chromosome.reset(); location = -1; strand = '?'; convertedQualities.clear(); unconvertedQualities.clear(); - vector().swap(uniqueIDs); + uniqueIDs.clear(); } Position(){ @@ -92,8 +95,8 @@ public: * set the chromosome, location (position), and strand information. */ - void set (string& inputChr, long long int inputLoc) { - chromosome = inputChr; + void set (shared_ptr inputChr, long long int inputLoc) { + chromosome = std::move(inputChr); location = inputLoc + 1; } @@ -187,15 +190,13 @@ public: */ class Positions{ public: - vector refPositions; // the pool of all current reference position. - string chromosome; // current reference chromosome name. + deque refPositions; // the pool of all current reference position. + shared_ptr chromosome; // current reference chromosome name. long long int location; // current location (position) in reference chromosome. char lastBase = 'X'; // the last base of reference line. this is for CG_only mode. - SafeQueue linePool; // pool to store unprocessed SAM line. - SafeQueue freeLinePool; // pool to store free string pointer for SAM line. - SafeQueue freePositionPool; // pool to store free position pointer for reference position. - SafeQueue outputPositionPool; // pool to store the reference position which is loaded and ready to output. - bool working; + Channel linePool; // pool to store unprocessed SAM line. + Channel freePositionPool; // pool to store free position pointer for reference position. + Channel outputPositionPool; // pool to store the reference position which is loaded and ready to output. mutex mutex_; long long int refCoveredPosition; // this is the last position in reference chromosome we loaded in refPositions. ifstream refFile; @@ -206,7 +207,6 @@ public: bool removedChrName = false; Positions(string inputRefFileName, int inputNThreads, bool inputAddedChrName, bool inputRemovedChrName) { - working = true; nThreads = inputNThreads; addedChrName = inputAddedChrName; removedChrName = inputRemovedChrName; @@ -222,7 +222,7 @@ public: delete workerLock[i]; } Position* pos; - while(freePositionPool.popFront(pos)) { + while(freePositionPool.recv(pos)) { delete pos; } } @@ -271,13 +271,13 @@ public: while (refFile.good()) { getline(refFile, line); if (line.front() == '>') { // this line is chromosome name - chromosome = getChrName(line); + chromosome = make_shared(getChrName(line)); streampos currentPos = refFile.tellg(); - chromosomePos.append(chromosome, currentPos); + chromosomePos.append(*chromosome, currentPos); } } chromosomePos.sort(); - chromosome.clear(); + chromosome.reset(); } /** @@ -333,19 +333,15 @@ public: *out_ << "ref\tpos\tstrand\tconvertedBaseQualities\tconvertedBaseCount\tunconvertedBaseQualities\tunconvertedBaseCount\n"; Position* pos; - while (working) { - if (outputPositionPool.popFront(pos)) { - *out_ << pos->chromosome << '\t' - << to_string(pos->location) << '\t' - << pos->strand << '\t' - << pos->convertedQualities << '\t' - << to_string(pos->convertedQualities.size()) << '\t' - << pos->unconvertedQualities << '\t' - << to_string(pos->unconvertedQualities.size()) << '\n'; - returnPosition(pos); - } else { - this_thread::sleep_for (std::chrono::microseconds(1)); - } + while (outputPositionPool.recv(pos)) { + *out_ << pos->chromosome << '\t' + << to_string(pos->location) << '\t' + << pos->strand << '\t' + << pos->convertedQualities << '\t' + << to_string(pos->convertedQualities.size()) << '\t' + << pos->unconvertedQualities << '\t' + << to_string(pos->unconvertedQualities.size()) << '\n'; + returnPosition(pos); } tableFile.close(); } @@ -363,15 +359,13 @@ public: if (refPositions[index]->empty() || refPositions[index]->strand == '?') { returnPosition(refPositions[index]); } else { - outputPositionPool.push(refPositions[index]); + outputPositionPool.send(refPositions[index]); } } else { break; } } - if (index != 0) { - refPositions.erase(refPositions.begin(), refPositions.begin()+index); - } + refPositions.erase(refPositions.begin(), refPositions.begin() + index); } /** @@ -385,8 +379,8 @@ public: if (refPositions[index]->empty() || refPositions[index]->strand == '?') { returnPosition(refPositions[index]); } else { - vector().swap(refPositions[index]->uniqueIDs); - outputPositionPool.push(refPositions[index]); + refPositions[index]->uniqueIDs.clear(); + outputPositionPool.send(refPositions[index]); } } refPositions.clear(); @@ -399,7 +393,7 @@ public: refFile.clear(); // find the start position in file based on chromosome name. streampos startPos = chromosomePos.getChromosomePosInRefFile(targetChromosome); - chromosome = targetChromosome; + chromosome = make_shared(move(targetChromosome)); refFile.seekg(startPos, ios::beg); refCoveredPosition = 2 * loadingBlockSize; string line; @@ -479,45 +473,23 @@ public: } } - /** - * get a string pointer from freeLinePool, if freeLinePool is empty, make a new string pointer. - */ - void getFreeStringPointer(string*& newLine) { - if (freeLinePool.popFront(newLine)) { - return; - } else { - newLine = new string(); - } - } - /** * get a Position pointer from freePositionPool, if freePositionPool is empty, make a new Position pointer. */ void getFreePosition(Position*& newPosition) { - while (outputPositionPool.size() >= 10000) { - this_thread::sleep_for (std::chrono::microseconds(1)); - } - if (freePositionPool.popFront(newPosition)) { + if (freePositionPool.recv(newPosition)) { return; } else { newPosition = new Position(); } } - /** - * return the line to freeLinePool - */ - void returnLine(string* line) { - line->clear(); - freeLinePool.push(line); - } - /** * return the position to freePositionPool. */ void returnPosition(Position* pos) { pos->initialize(); - freePositionPool.push(pos); + freePositionPool.send(pos); } /** @@ -525,23 +497,18 @@ public: * it take the SAM line from linePool, parse it. */ void append(int threadID) { - string* line; + string line; Alignment newAlignment; - while (working) { - workerLock[threadID]->lock(); - if(!linePool.popFront(line)) { - workerLock[threadID]->unlock(); - this_thread::sleep_for (std::chrono::nanoseconds(1)); - continue; - } + + while (true) { + std::unique_lock lk{*workerLock[threadID]}; + if (!linePool.recv(line)) break; while (refPositions.empty()) { this_thread::sleep_for (std::chrono::microseconds(1)); } - newAlignment.parse(line); - returnLine(line); + newAlignment.parse(&line); appendPositions(newAlignment); - workerLock[threadID]->unlock(); } } }; diff --git a/utility_3n_table.h b/utility_3n_table.h index 33a086e..a4b0812 100644 --- a/utility_3n_table.h +++ b/utility_3n_table.h @@ -20,9 +20,11 @@ #ifndef UTILITY_3N_TABLE_H #define UTILITY_3N_TABLE_H +#include +#include #include #include -#include +#include using namespace std; @@ -185,74 +187,77 @@ public: } }; -/** - * simple safe queue - */ -template -class SafeQueue { -private: - mutex mutex_; - queue queue_; +template +struct Channel { + Channel() = default; + Channel(const Channel &) = delete; + Channel &operator=(const Channel &) = delete; + Channel(Channel &&) = delete; + Channel &operator=(Channel &&) = delete; + ~Channel() = default; - string getReadName(string* line){ - int startPosition = 0; - int endPosition; - - endPosition = line->find("\t", startPosition); - string readName = line->substr(startPosition, endPosition - startPosition); - return readName; - } - -public: - void pop() { - mutex_.lock(); - queue_.pop(); - mutex_.unlock(); - } - - T front() { - mutex_.lock(); - T value = queue_.front(); - mutex_.unlock(); - return value; - } - - int size() { - mutex_.lock(); - int s = queue_.size(); - mutex_.unlock(); - return s; - } - - /** - * return true if the queue is not empty and pop front and get value. - * return false if the queue is empty. - */ - bool popFront(T& value) { - mutex_.lock(); - bool isEmpty = queue_.empty(); - if (!isEmpty) { - value = queue_.front(); - queue_.pop(); + void send(T in) { + if (closed()) { + abort(); } - mutex_.unlock(); - return !isEmpty; + { + std::unique_lock lock{mtx_}; + queue_.push(std::move(in)); + ++size_; + } + cnd_.notify_one(); } - void push(T value) { - mutex_.lock(); - queue_.push(value); - mutex_.unlock(); + bool recv(T& out) { + if (closed() && empty()) { + return false; + } + { + std::unique_lock lock{mtx_}; + cnd_.wait(lock, [this] { return !empty() || closed(); }); + if (empty()) { + return false; + } + out = std::move(queue_.front()); + queue_.pop(); + --size_; + } + if (closed()) { + cnd_.notify_all(); + } else { + cnd_.notify_one(); + } + return true; } - bool empty() { - mutex_.lock(); - bool check = queue_.empty(); - mutex_.unlock(); - return check; + size_t constexpr size() const noexcept { + return size_; } + bool constexpr empty() const noexcept { + return size_ == 0; + } + + void close() noexcept { + { + std::unique_lock lock{mtx_}; + is_closed_.store(true); + } + cnd_.notify_all(); + } + + bool closed() const noexcept { + return is_closed_.load(); + } + +private: + std::queue queue_; + std::atomic size_{0}; + std::mutex mtx_; + std::condition_variable cnd_; + std::atomic is_closed_{false}; }; + /** * store one chromosome and it's stream position */