use channel

This commit is contained in:
Yaossg 2025-01-19 17:06:28 +08:00
parent 5e601d0401
commit a3817bf0b7
4 changed files with 124 additions and 177 deletions

View File

@ -300,7 +300,7 @@ repeat: hisat2-repeat
repeat-debug: hisat2-repeat-debug
DEFS :=-fno-strict-aliasing \
-DHISAT2_VERSION="\"`cat HISAT2_VERSION`\"" \
-DHISAT2_VERSION="\"\"" \
-DBUILD_HOST="\"`hostname`\"" \
-DBUILD_TIME="\"`date`\"" \
-DCOMPILER_VERSION="\"`$(CXX) -v 2>&1 | tail -1`\"" \

View File

@ -246,9 +246,9 @@ int hisat_3n_table()
positions = new Positions(refFileName, nThreads, addedChrName, removedChrName);
// open #nThreads workers
vector<thread*> workers;
vector<thread> workers;
for (int i = 0; i < nThreads; i++) {
workers.push_back(new thread(&Positions::append, positions, i));
workers.emplace_back(&Positions::append, positions, i);
}
// open a output thread
@ -265,50 +265,37 @@ int hisat_3n_table()
alignmentFile = &inputFile;
}
string* line; // temporary string to get SAM line.
string samChromosome; // the chromosome name of current SAM line.
long long int samPos; // the position of current SAM line.
long long int reloadPos; // the position in reference that we need to reload.
long long int lastPos = 0; // the position on last SAM line. compare lastPos with samPos to make sure the SAM is sorted.
while (alignmentFile->good()) {
positions->getFreeStringPointer(line);
if (!getline(*alignmentFile, *line)) {
positions->returnLine(line);
string line;
string samChromosome;
if (!getline(*alignmentFile, line)) {
break;
}
if (line->empty() || line->front() == '@') {
positions->returnLine(line);
if (line.empty() || line.front() == '@') {
continue;
}
// limit the linePool size to save memory
while(positions->linePool.size() > 1000 * nThreads) {
this_thread::sleep_for (std::chrono::microseconds(1));
}
// if the SAM line is empty or unmapped, get the next SAM line.
if (!getSAMChromosomePos(line, samChromosome, samPos)) {
positions->returnLine(line);
if (!getSAMChromosomePos(&line, samChromosome, samPos)) {
continue;
}
// if the samChromosome is different than current positions' chromosome, finish all SAM line.
// then load a new reference chromosome.
if (samChromosome != positions->chromosome) {
// wait all line is processed
while (!positions->linePool.empty() || positions->outputPositionPool.size() > 100000) {
this_thread::sleep_for (std::chrono::microseconds(1));
}
if (samChromosome != *positions->chromosome) {
positions->appendingFinished();
positions->moveAllToOutput();
positions->loadNewChromosome(samChromosome);
positions->loadNewChromosome(std::move(samChromosome));
reloadPos = loadingBlockSize;
lastPos = 0;
}
// if the samPos is larger than reloadPos, load 1 loadingBlockSize bp in from reference.
while (samPos > reloadPos) {
while (!positions->linePool.empty() || positions->outputPositionPool.size() > 100000) {
this_thread::sleep_for (std::chrono::microseconds(1));
}
positions->appendingFinished();
positions->moveBlockToOutput();
positions->loadMore();
@ -318,7 +305,7 @@ int hisat_3n_table()
cerr << "The input alignment file is not sorted. Please use sorted SAM file as alignment file." << endl;
throw 1;
}
positions->linePool.push(line);
positions->linePool.send(std::move(line));
lastPos = samPos;
}
//}
@ -326,30 +313,18 @@ int hisat_3n_table()
inputFile.close();
}
// prepare to close everything.
// make sure linePool is empty
while (!positions->linePool.empty()) {
this_thread::sleep_for (std::chrono::microseconds(100));
positions->linePool.close();
for (int i = 0; i < nThreads; i++){
workers[i].join();
}
// make sure all workers finished their appending work.
positions->appendingFinished();
// move all position to outputPool
positions->moveAllToOutput();
// wait until outputPool is empty
while (!positions->outputPositionPool.empty()) {
this_thread::sleep_for (std::chrono::microseconds(100));
}
// stop all thread and clean
while(positions->freeLinePool.popFront(line)) {
delete line;
}
positions->working = false;
for (int i = 0; i < nThreads; i++){
workers[i]->join();
delete workers[i];
}
positions->outputPositionPool.close();
outputThread.join();
delete positions;
return 0;

View File

@ -26,6 +26,9 @@
#include <mutex>
#include <thread>
#include <cassert>
#include <memory>
#include <deque>
#include "alignment_3n_table.h"
using namespace std;
@ -60,21 +63,21 @@ public:
class Position{
mutex mutex_;
public:
string chromosome; // reference chromosome name
shared_ptr<string> chromosome; // reference chromosome name
long long int location; // 1-based position
char strand; // +(REF) or -(REF-RC)
string convertedQualities; // each char is a mapping quality on this position for converted base.
string unconvertedQualities; // each char is a mapping quality on this position for unconverted base.
vector<uniqueID> uniqueIDs; // each value represent a readName which contributed the base information.
deque<uniqueID> uniqueIDs; // each value represent a readName which contributed the base information.
// readNameIDs is to make sure no read contribute 2 times in same position.
void initialize() {
chromosome.clear();
chromosome.reset();
location = -1;
strand = '?';
convertedQualities.clear();
unconvertedQualities.clear();
vector<uniqueID>().swap(uniqueIDs);
uniqueIDs.clear();
}
Position(){
@ -92,8 +95,8 @@ public:
* set the chromosome, location (position), and strand information.
*/
void set (string& inputChr, long long int inputLoc) {
chromosome = inputChr;
void set (shared_ptr<string> inputChr, long long int inputLoc) {
chromosome = std::move(inputChr);
location = inputLoc + 1;
}
@ -187,15 +190,13 @@ public:
*/
class Positions{
public:
vector<Position*> refPositions; // the pool of all current reference position.
string chromosome; // current reference chromosome name.
deque<Position*> refPositions; // the pool of all current reference position.
shared_ptr<string> chromosome; // current reference chromosome name.
long long int location; // current location (position) in reference chromosome.
char lastBase = 'X'; // the last base of reference line. this is for CG_only mode.
SafeQueue<string*> linePool; // pool to store unprocessed SAM line.
SafeQueue<string*> freeLinePool; // pool to store free string pointer for SAM line.
SafeQueue<Position*> freePositionPool; // pool to store free position pointer for reference position.
SafeQueue<Position*> outputPositionPool; // pool to store the reference position which is loaded and ready to output.
bool working;
Channel<string> linePool; // pool to store unprocessed SAM line.
Channel<Position*> freePositionPool; // pool to store free position pointer for reference position.
Channel<Position*> outputPositionPool; // pool to store the reference position which is loaded and ready to output.
mutex mutex_;
long long int refCoveredPosition; // this is the last position in reference chromosome we loaded in refPositions.
ifstream refFile;
@ -206,7 +207,6 @@ public:
bool removedChrName = false;
Positions(string inputRefFileName, int inputNThreads, bool inputAddedChrName, bool inputRemovedChrName) {
working = true;
nThreads = inputNThreads;
addedChrName = inputAddedChrName;
removedChrName = inputRemovedChrName;
@ -222,7 +222,7 @@ public:
delete workerLock[i];
}
Position* pos;
while(freePositionPool.popFront(pos)) {
while(freePositionPool.recv(pos)) {
delete pos;
}
}
@ -271,13 +271,13 @@ public:
while (refFile.good()) {
getline(refFile, line);
if (line.front() == '>') { // this line is chromosome name
chromosome = getChrName(line);
chromosome = make_shared<string>(getChrName(line));
streampos currentPos = refFile.tellg();
chromosomePos.append(chromosome, currentPos);
chromosomePos.append(*chromosome, currentPos);
}
}
chromosomePos.sort();
chromosome.clear();
chromosome.reset();
}
/**
@ -333,19 +333,15 @@ public:
*out_ << "ref\tpos\tstrand\tconvertedBaseQualities\tconvertedBaseCount\tunconvertedBaseQualities\tunconvertedBaseCount\n";
Position* pos;
while (working) {
if (outputPositionPool.popFront(pos)) {
*out_ << pos->chromosome << '\t'
<< to_string(pos->location) << '\t'
<< pos->strand << '\t'
<< pos->convertedQualities << '\t'
<< to_string(pos->convertedQualities.size()) << '\t'
<< pos->unconvertedQualities << '\t'
<< to_string(pos->unconvertedQualities.size()) << '\n';
returnPosition(pos);
} else {
this_thread::sleep_for (std::chrono::microseconds(1));
}
while (outputPositionPool.recv(pos)) {
*out_ << pos->chromosome << '\t'
<< to_string(pos->location) << '\t'
<< pos->strand << '\t'
<< pos->convertedQualities << '\t'
<< to_string(pos->convertedQualities.size()) << '\t'
<< pos->unconvertedQualities << '\t'
<< to_string(pos->unconvertedQualities.size()) << '\n';
returnPosition(pos);
}
tableFile.close();
}
@ -363,15 +359,13 @@ public:
if (refPositions[index]->empty() || refPositions[index]->strand == '?') {
returnPosition(refPositions[index]);
} else {
outputPositionPool.push(refPositions[index]);
outputPositionPool.send(refPositions[index]);
}
} else {
break;
}
}
if (index != 0) {
refPositions.erase(refPositions.begin(), refPositions.begin()+index);
}
refPositions.erase(refPositions.begin(), refPositions.begin() + index);
}
/**
@ -385,8 +379,8 @@ public:
if (refPositions[index]->empty() || refPositions[index]->strand == '?') {
returnPosition(refPositions[index]);
} else {
vector<uniqueID>().swap(refPositions[index]->uniqueIDs);
outputPositionPool.push(refPositions[index]);
refPositions[index]->uniqueIDs.clear();
outputPositionPool.send(refPositions[index]);
}
}
refPositions.clear();
@ -399,7 +393,7 @@ public:
refFile.clear();
// find the start position in file based on chromosome name.
streampos startPos = chromosomePos.getChromosomePosInRefFile(targetChromosome);
chromosome = targetChromosome;
chromosome = make_shared<string>(move(targetChromosome));
refFile.seekg(startPos, ios::beg);
refCoveredPosition = 2 * loadingBlockSize;
string line;
@ -479,45 +473,23 @@ public:
}
}
/**
* get a string pointer from freeLinePool, if freeLinePool is empty, make a new string pointer.
*/
void getFreeStringPointer(string*& newLine) {
if (freeLinePool.popFront(newLine)) {
return;
} else {
newLine = new string();
}
}
/**
* get a Position pointer from freePositionPool, if freePositionPool is empty, make a new Position pointer.
*/
void getFreePosition(Position*& newPosition) {
while (outputPositionPool.size() >= 10000) {
this_thread::sleep_for (std::chrono::microseconds(1));
}
if (freePositionPool.popFront(newPosition)) {
if (freePositionPool.recv(newPosition)) {
return;
} else {
newPosition = new Position();
}
}
/**
* return the line to freeLinePool
*/
void returnLine(string* line) {
line->clear();
freeLinePool.push(line);
}
/**
* return the position to freePositionPool.
*/
void returnPosition(Position* pos) {
pos->initialize();
freePositionPool.push(pos);
freePositionPool.send(pos);
}
/**
@ -525,23 +497,18 @@ public:
* it take the SAM line from linePool, parse it.
*/
void append(int threadID) {
string* line;
string line;
Alignment newAlignment;
while (working) {
workerLock[threadID]->lock();
if(!linePool.popFront(line)) {
workerLock[threadID]->unlock();
this_thread::sleep_for (std::chrono::nanoseconds(1));
continue;
}
while (true) {
std::unique_lock<std::mutex> lk{*workerLock[threadID]};
if (!linePool.recv(line)) break;
while (refPositions.empty()) {
this_thread::sleep_for (std::chrono::microseconds(1));
}
newAlignment.parse(line);
returnLine(line);
newAlignment.parse(&line);
appendPositions(newAlignment);
workerLock[threadID]->unlock();
}
}
};

View File

@ -20,9 +20,11 @@
#ifndef UTILITY_3N_TABLE_H
#define UTILITY_3N_TABLE_H
#include <algorithm>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <algorithm>
#include <atomic>
using namespace std;
@ -185,74 +187,77 @@ public:
}
};
/**
* simple safe queue
*/
template <typename T>
class SafeQueue {
private:
mutex mutex_;
queue<T> queue_;
template<typename T>
struct Channel {
Channel() = default;
Channel(const Channel &) = delete;
Channel &operator=(const Channel &) = delete;
Channel(Channel &&) = delete;
Channel &operator=(Channel &&) = delete;
~Channel() = default;
string getReadName(string* line){
int startPosition = 0;
int endPosition;
endPosition = line->find("\t", startPosition);
string readName = line->substr(startPosition, endPosition - startPosition);
return readName;
}
public:
void pop() {
mutex_.lock();
queue_.pop();
mutex_.unlock();
}
T front() {
mutex_.lock();
T value = queue_.front();
mutex_.unlock();
return value;
}
int size() {
mutex_.lock();
int s = queue_.size();
mutex_.unlock();
return s;
}
/**
* return true if the queue is not empty and pop front and get value.
* return false if the queue is empty.
*/
bool popFront(T& value) {
mutex_.lock();
bool isEmpty = queue_.empty();
if (!isEmpty) {
value = queue_.front();
queue_.pop();
void send(T in) {
if (closed()) {
abort();
}
mutex_.unlock();
return !isEmpty;
{
std::unique_lock<std::mutex> lock{mtx_};
queue_.push(std::move(in));
++size_;
}
cnd_.notify_one();
}
void push(T value) {
mutex_.lock();
queue_.push(value);
mutex_.unlock();
bool recv(T& out) {
if (closed() && empty()) {
return false;
}
{
std::unique_lock<std::mutex> lock{mtx_};
cnd_.wait(lock, [this] { return !empty() || closed(); });
if (empty()) {
return false;
}
out = std::move(queue_.front());
queue_.pop();
--size_;
}
if (closed()) {
cnd_.notify_all();
} else {
cnd_.notify_one();
}
return true;
}
bool empty() {
mutex_.lock();
bool check = queue_.empty();
mutex_.unlock();
return check;
size_t constexpr size() const noexcept {
return size_;
}
bool constexpr empty() const noexcept {
return size_ == 0;
}
void close() noexcept {
{
std::unique_lock<std::mutex> lock{mtx_};
is_closed_.store(true);
}
cnd_.notify_all();
}
bool closed() const noexcept {
return is_closed_.load();
}
private:
std::queue<T> queue_;
std::atomic<std::size_t> size_{0};
std::mutex mtx_;
std::condition_variable cnd_;
std::atomic<bool> is_closed_{false};
};
/**
* store one chromosome and it's stream position
*/