diff --git a/app/br/br.cpp b/app/br/br.cpp index 1dbf14d..e8742eb 100644 --- a/app/br/br.cpp +++ b/app/br/br.cpp @@ -236,7 +236,6 @@ int main(int argc, char *argv[]) // the parallel work can make use of all available CPU threads. FakeMain *fakeMain = new FakeMain(argc, argv); QThreadPool::globalInstance()->start(fakeMain); - QThreadPool::globalInstance()->setMaxThreadCount(QThreadPool::globalInstance()->maxThreadCount()+1); QCoreApplication::exec(); br_finalize(); diff --git a/openbr/core/qtutils.h b/openbr/core/qtutils.h index cd150a1..b774119 100644 --- a/openbr/core/qtutils.h +++ b/openbr/core/qtutils.h @@ -22,9 +22,11 @@ #include #include #include +#include #include #include #include +#include #include #include @@ -64,6 +66,13 @@ namespace QtUtils bool runRScript(const QString &file); bool runDot(const QString &file); void showFile(const QString &file); + + inline void releaseAndWait(QFutureSynchronizer & futures) + { + QThreadPool::globalInstance()->releaseThread(); + futures.waitForFinished(); + QThreadPool::globalInstance()->reserveThread(); + } } #endif // __QTUTILS_H diff --git a/openbr/openbr_plugin.cpp b/openbr/openbr_plugin.cpp index 0ea9f36..8854257 100644 --- a/openbr/openbr_plugin.cpp +++ b/openbr/openbr_plugin.cpp @@ -1204,7 +1204,7 @@ private: if (Globals->parallelism) futures.addFuture(QtConcurrent::run(_train, transforms[i], &templatesList[i])); else _train (transforms[i], &templatesList[i]); } - futures.waitForFinished(); + QtUtils::releaseAndWait(futures); } void project(const Template &src, Template &dst) const @@ -1332,7 +1332,7 @@ void Transform::project(const TemplateList &src, TemplateList &dst) const QFutureSynchronizer futures; for (int i=0; iparallelism) futures.addFuture(QtConcurrent::run(_backProject, this, &dst[i], &src[i])); else _backProject (this, &dst[i], &src[i]); - futures.waitForFinished(); + QtUtils::releaseAndWait(futures); } /* Distance - public methods */ @@ -1393,7 +1393,7 @@ void Distance::compare(const TemplateList &target, const TemplateList &query, Ou if (Globals->parallelism) futures.addFuture(QtConcurrent::run(this, &Distance::compareBlock, targets, queries, output, targetOffset, queryOffset)); else compareBlock (targets, queries, output, targetOffset, queryOffset); } - futures.waitForFinished(); + QtUtils::releaseAndWait(futures); } QList Distance::compare(const TemplateList &targets, const Template &query) const diff --git a/openbr/plugins/distance.cpp b/openbr/plugins/distance.cpp index c3ef877..ae7bf22 100644 --- a/openbr/plugins/distance.cpp +++ b/openbr/plugins/distance.cpp @@ -160,7 +160,7 @@ class PipeDistance : public Distance foreach (br::Distance *distance, distances) if (Globals->parallelism) futures.addFuture(QtConcurrent::run(distance, &Distance::train, data)); else distance->train(data); - futures.waitForFinished(); + QtUtils::releaseAndWait(futures); } float compare(const Template &a, const Template &b) const diff --git a/openbr/plugins/meta.cpp b/openbr/plugins/meta.cpp index 4c7eee1..34f5f0d 100644 --- a/openbr/plugins/meta.cpp +++ b/openbr/plugins/meta.cpp @@ -111,7 +111,7 @@ class PipeTransform : public CompositeTransform for (int j=0; jparallelism) futures.addFuture(QtConcurrent::run(this, &PipeTransform::_projectPartial, ©[j], i, nextTrainableTransform)); else _projectPartial( ©[j], i, nextTrainableTransform); - futures.waitForFinished(); + QtUtils::releaseAndWait(futures); i = nextTrainableTransform; } } @@ -293,7 +293,7 @@ class ForkTransform : public CompositeTransform if (Globals->parallelism) futures.addFuture(QtConcurrent::run(_train, transforms[i], &data)); else _train (transforms[i], &data); } - futures.waitForFinished(); + QtUtils::releaseAndWait(futures); } void backProject(const Template &dst, Template &src) const {Transform::backProject(dst, src);} @@ -648,7 +648,7 @@ public: if (Globals->parallelism) futures.addFuture(QtConcurrent::run(_projectList, transform, &input_buffer[i], &output_buffer[i])); else _projectList( transform, &input_buffer[i], &output_buffer[i]); } - futures.waitForFinished(); + QtUtils::releaseAndWait(futures); for (int i=0; iparallelism) futures.addFuture(QtConcurrent::run(this, &ProductQuantizationTransform::_train, subdata[i], labels, &subluts[i], ¢ers[i])); else _train (subdata[i], labels, &subluts[i], ¢ers[i]); } - futures.waitForFinished(); + QtUtils::releaseAndWait(futures); } int getIndex(const Mat &m, const Mat ¢er) const diff --git a/openbr/plugins/stream.cpp b/openbr/plugins/stream.cpp index 00efb9b..ff37e34 100644 --- a/openbr/plugins/stream.cpp +++ b/openbr/plugins/stream.cpp @@ -66,7 +66,7 @@ public: QMap::Iterator result = buffer.begin(); if (next_target != result.value()->sequenceNumber) { - qWarning("mismatched targets!"); + qFatal("mismatched targets!"); } next_target = next_target + 1; @@ -160,7 +160,7 @@ private: class DataSource { public: - DataSource(int maxFrames=Globals->parallelism + 1) + DataSource(int maxFrames=500) { final_frame = -1; last_issued = -2; @@ -399,7 +399,19 @@ protected: }; -class ProcessingStage : public QRunnable +class ProcessingStage; + +class BasicLoop : public QRunnable +{ +public: + void run(); + + QList * stages; + int start_idx; + FrameData * startItem; +}; + +class ProcessingStage { public: friend class StreamTransform; @@ -407,55 +419,68 @@ public: ProcessingStage(int nThreads = 1) { thread_count = nThreads; - setAutoDelete(false); } + virtual ~ProcessingStage() {} + + virtual FrameData* run(FrameData * input, bool & should_continue)=0; - virtual void run()=0; + virtual bool tryAcquireNextStage(FrameData *& input)=0; - virtual void nextStageRun(FrameData * input)=0; + int stage_id; protected: int thread_count; SharedBuffer * inputBuffer; ProcessingStage * nextStage; + QList * stages; Transform * transform; - int stage_id; }; -class MultiThreadStage; - -void multistage_run(MultiThreadStage * basis, FrameData * input); +void BasicLoop::run() +{ + int current_idx = start_idx; + FrameData * target_item = startItem; + bool should_continue = true; + forever + { + target_item = stages->at(current_idx)->run(target_item, should_continue); + if (!should_continue) { + break; + } + current_idx++; + current_idx = current_idx % stages->size(); + } +} class MultiThreadStage : public ProcessingStage { public: MultiThreadStage(int _input) : ProcessingStage(_input) {} - friend void multistage_run(MultiThreadStage * basis, FrameData * input); - void run() + FrameData * run(FrameData * input, bool & should_continue) { - qFatal("no don't do it!"); + if (input == NULL) { + qFatal("null input to multi-thread stage"); + } + // Project the input we got + transform->projectUpdate(input->data); + + should_continue = nextStage->tryAcquireNextStage(input); + + return input; } // Called from a different thread than run - virtual void nextStageRun(FrameData * input) + virtual bool tryAcquireNextStage(FrameData *& input) { - QtConcurrent::run(multistage_run, this, input); + (void) input; + return true; } }; -void multistage_run(MultiThreadStage * basis, FrameData * input) -{ - if (input == NULL) - qFatal("null input to multi-thread stage"); - // Project the input we got - basis->transform->projectUpdate(input->data); - - basis->nextStage->nextStageRun(input); -} class SingleThreadStage : public ProcessingStage { @@ -464,10 +489,12 @@ public: { currentStatus = STOPPING; next_target = 0; - if (input_variance) + if (input_variance) { this->inputBuffer = new DoubleBuffer(); - else + } + else { this->inputBuffer = new SequencingBuffer(); + } } ~SingleThreadStage() { @@ -483,52 +510,75 @@ public: QReadWriteLock statusLock; Status currentStatus; - // We should start, and enter a wait on input data - void run() + FrameData * run(FrameData * input, bool & should_continue) { - FrameData * currentItem; - forever + if (input == NULL) + qFatal("NULL input to stage %d", this->stage_id); + + if (input->sequenceNumber != next_target) { - // Whether or not we get a valid item controls whether or not we - QWriteLocker lock(&statusLock); - currentItem = inputBuffer->tryGetItem(); - if (currentItem == NULL) - { - this->currentStatus = STOPPING; - return; - } - lock.unlock(); - if (currentItem->sequenceNumber != next_target) - { - qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target); - } - next_target = currentItem->sequenceNumber + 1; + qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, input->sequenceNumber, this->next_target); + } + next_target = input->sequenceNumber + 1; + + // Project the input we got + transform->projectUpdate(input->data); + + should_continue = nextStage->tryAcquireNextStage(input); + + // Is there anything on our input buffer? If so we should start a thread with that. + QWriteLocker lock(&statusLock); + FrameData * newItem = inputBuffer->tryGetItem(); + if (!newItem) + { + this->currentStatus = STOPPING; + } + lock.unlock(); - // Project the input we got - transform->projectUpdate(currentItem->data); + if (newItem) + { + BasicLoop * next = new BasicLoop(); + next->stages = stages; + next->start_idx = this->stage_id; + next->startItem = newItem; - this->nextStage->nextStageRun(currentItem); + QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id); } + + return input; } + // Calledfrom a different thread than run. - void nextStageRun(FrameData * input) + bool tryAcquireNextStage(FrameData *& input) { - // add to our input buffer inputBuffer->addItem(input); + QReadLocker lock(&statusLock); + // Thread is already running, we should just return if (currentStatus == STARTING) - return; - + { + return false; + } // Have to change to a write lock to modify currentStatus lock.unlock(); + QWriteLocker writeLock(&statusLock); - // But someone might have changed it between locks + // But someone else might have started a thread in the meantime if (currentStatus == STARTING) - return; - // Ok we can start a thread - QThreadPool::globalInstance()->start(this); + { + return false; + } + // Ok we might start a thread, as long as we can get something back + // from the input buffer + input = inputBuffer->tryGetItem(); + + if (!input) + return false; + currentStatus = STARTING; + + return true; } }; @@ -540,47 +590,63 @@ public: FirstStage() : SingleThreadStage(true) {} DataSourceManager dataSource; - // Start drawing frames from the datasource. - void run() + + FrameData * run(FrameData * input, bool & should_continue) { - FrameData * currentItem; - forever + if (input != NULL) { + dataSource.returnFrame(input); + } + + // Is there anything on our input buffer? If so we should start a thread with that. + QWriteLocker lock(&statusLock); + input = dataSource.tryGetFrame(); + // Datasource broke? + if (!input) { - // Whether or not we get a valid item controls whether or not we - QWriteLocker lock(&statusLock); + currentStatus = STOPPING; + should_continue = false; + return NULL; + } + lock.unlock(); - currentItem = this->dataSource.tryGetFrame(); - if (currentItem == NULL) - { - this->currentStatus = STOPPING; - return; - } - lock.unlock(); - if (currentItem->sequenceNumber != next_target) - { - qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target); - } - next_target = currentItem->sequenceNumber + 1; + should_continue = nextStage->tryAcquireNextStage(input); - this->nextStage->nextStageRun(currentItem); - } + BasicLoop * next = new BasicLoop(); + next->stages = stages; + next->start_idx = this->stage_id; + next->startItem = NULL; + + QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id); + + return input; } - void nextStageRun(FrameData * input) + // Calledfrom a different thread than run. + bool tryAcquireNextStage(FrameData *& input) { - QWriteLocker lock(&statusLock); - - // Return the frame to the frame buffer - bool res = dataSource.returnFrame(input); - // If the data source broke already, we're done. - if (res) - return; + dataSource.returnFrame(input); + input = NULL; + QReadLocker lock(&statusLock); + // Thread is already running, we should just return if (currentStatus == STARTING) - return; + { + return false; + } + // Have to change to a write lock to modify currentStatus + lock.unlock(); + QWriteLocker writeLock(&statusLock); + // But someone else might have started a thread in the meantime + if (currentStatus == STARTING) + { + return false; + } + // Ok we'll start a thread currentStatus = STARTING; - QThreadPool::globalInstance()->start(this, this->next_target); + + // We always start a readstage thread with null input, so nothing to do here + return true; } }; @@ -597,29 +663,42 @@ public: private: TemplateList collectedOutput; public: - void run() + FrameData * run(FrameData * input, bool & should_continue) { - forever + if (input == NULL) { + qFatal("NULL input to stage %d", this->stage_id); + } + + if (input->sequenceNumber != next_target) { - QWriteLocker lock(&statusLock); - FrameData * currentItem = inputBuffer->tryGetItem(); - if (currentItem == NULL) - { - currentStatus = STOPPING; - break; - } - lock.unlock(); + qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, input->sequenceNumber, this->next_target); + } + next_target = input->sequenceNumber + 1; - if (currentItem->sequenceNumber != next_target) - { - qFatal("out of order frames for collection stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target); - } - next_target = currentItem->sequenceNumber + 1; + collectedOutput.append(input->data); - // Just put the item on collectedOutput - collectedOutput.append(currentItem->data); - this->nextStage->nextStageRun(currentItem); + should_continue = nextStage->tryAcquireNextStage(input); + + // Is there anything on our input buffer? If so we should start a thread with that. + QWriteLocker lock(&statusLock); + FrameData * newItem = inputBuffer->tryGetItem(); + if (!newItem) + { + this->currentStatus = STOPPING; } + lock.unlock(); + + if (newItem) + { + BasicLoop * next = new BasicLoop(); + next->stages = stages; + next->start_idx = this->stage_id; + next->startItem = newItem; + + QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id); + } + + return input; } }; @@ -660,17 +739,22 @@ public: qFatal("Expected single template input to stream"); dst = src; - bool res = readStage.dataSource.open(dst[0]); - if (!res) { - qWarning("failed to stream template %s", qPrintable(dst[0].file.name)); - return; - } + bool res = readStage->dataSource.open(dst[0]); + if (!res) return; QThreadPool::globalInstance()->releaseThread(); - readStage.currentStatus = SingleThreadStage::STARTING; - QThreadPool::globalInstance()->start(&readStage, 0); + readStage->currentStatus = SingleThreadStage::STARTING; + + BasicLoop loop; + loop.stages = &this->processingStages; + loop.start_idx = 0; + loop.startItem = NULL; + loop.setAutoDelete(false); + + QThreadPool::globalInstance()->start(&loop, processingStages.size() - processingStages[0]->stage_id); + // Wait for the end. - readStage.dataSource.waitLast(); + readStage->dataSource.waitLast(); QThreadPool::globalInstance()->reserveThread(); // dst is set to all output received by the final stage @@ -693,10 +777,14 @@ public: stage_variance.append(transform->timeVarying()); } - readStage.stage_id = 0; + readStage = new FirstStage(); + + processingStages.push_back(readStage); + readStage->stage_id = 0; + readStage->stages = &this->processingStages; int next_stage_id = 1; - int lastBufferIdx = 0; + bool prev_stage_variance = true; for (int i =0; i < transforms.size(); i++) { @@ -709,24 +797,25 @@ public: processingStages.last()->stage_id = next_stage_id++; - // link nextStage pointers - if (i == 0) - this->readStage.nextStage = processingStages[i]; - else - processingStages[i-1]->nextStage = processingStages[i]; + // link nextStage pointers, the stage we just appeneded is i+1 since + // the read stage was added before this loop + processingStages[i]->nextStage = processingStages[i+1]; - lastBufferIdx++; + processingStages.last()->stages = &this->processingStages; processingStages.last()->transform = transforms[i]; prev_stage_variance = stage_variance[i]; } collectionStage = new LastStage(prev_stage_variance); + processingStages.append(collectionStage); collectionStage->stage_id = next_stage_id; + collectionStage->stages = &this->processingStages; + + processingStages[processingStages.size() - 2]->nextStage = collectionStage; // It's a ring buffer, get it? - processingStages.last()->nextStage = collectionStage; - collectionStage->nextStage = &readStage; + collectionStage->nextStage = readStage; } ~StreamTransform() @@ -734,13 +823,12 @@ public: for (int i = 0; i < processingStages.size(); i++) { delete processingStages[i]; } - delete collectionStage; } protected: QList stage_variance; - FirstStage readStage; + FirstStage * readStage; LastStage * collectionStage; QList processingStages; diff --git a/openbr/plugins/validate.cpp b/openbr/plugins/validate.cpp index a4cc615..4db2a93 100644 --- a/openbr/plugins/validate.cpp +++ b/openbr/plugins/validate.cpp @@ -1,6 +1,7 @@ #include #include #include +#include namespace br { @@ -44,7 +45,7 @@ class CrossValidateTransform : public MetaTransform if (Globals->parallelism) futures.addFuture(QtConcurrent::run(transforms[i], &Transform::train, partitionedData)); else transforms[i]->train(partitionedData); } - futures.waitForFinished(); + QtUtils::releaseAndWait(futures); } void project(const Template &src, Template &dst) const