Commit 01576467bcd448f598956d950d0d1647423118bd

Authored by Josh Klontz
2 parents 4f43a6d3 5d77f0f1

Merge branch 'master' of https://github.com/biometrics/openbr

app/br/br.cpp
@@ -236,7 +236,6 @@ int main(int argc, char *argv[]) @@ -236,7 +236,6 @@ int main(int argc, char *argv[])
236 // the parallel work can make use of all available CPU threads. 236 // the parallel work can make use of all available CPU threads.
237 FakeMain *fakeMain = new FakeMain(argc, argv); 237 FakeMain *fakeMain = new FakeMain(argc, argv);
238 QThreadPool::globalInstance()->start(fakeMain); 238 QThreadPool::globalInstance()->start(fakeMain);
239 - QThreadPool::globalInstance()->setMaxThreadCount(QThreadPool::globalInstance()->maxThreadCount()+1);  
240 QCoreApplication::exec(); 239 QCoreApplication::exec();
241 240
242 br_finalize(); 241 br_finalize();
openbr/core/qtutils.h
@@ -22,9 +22,11 @@ @@ -22,9 +22,11 @@
22 #include <QFile> 22 #include <QFile>
23 #include <QFileInfo> 23 #include <QFileInfo>
24 #include <QFuture> 24 #include <QFuture>
  25 +#include <QFutureSynchronizer>
25 #include <QMap> 26 #include <QMap>
26 #include <QString> 27 #include <QString>
27 #include <QStringList> 28 #include <QStringList>
  29 +#include <QThreadPool>
28 #include <string> 30 #include <string>
29 #include <vector> 31 #include <vector>
30 32
@@ -64,6 +66,13 @@ namespace QtUtils @@ -64,6 +66,13 @@ namespace QtUtils
64 bool runRScript(const QString &file); 66 bool runRScript(const QString &file);
65 bool runDot(const QString &file); 67 bool runDot(const QString &file);
66 void showFile(const QString &file); 68 void showFile(const QString &file);
  69 +
  70 + inline void releaseAndWait(QFutureSynchronizer<void> & futures)
  71 + {
  72 + QThreadPool::globalInstance()->releaseThread();
  73 + futures.waitForFinished();
  74 + QThreadPool::globalInstance()->reserveThread();
  75 + }
67 } 76 }
68 77
69 #endif // __QTUTILS_H 78 #endif // __QTUTILS_H
openbr/openbr_plugin.cpp
@@ -1204,7 +1204,7 @@ private: @@ -1204,7 +1204,7 @@ private:
1204 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(_train, transforms[i], &templatesList[i])); 1204 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(_train, transforms[i], &templatesList[i]));
1205 else _train (transforms[i], &templatesList[i]); 1205 else _train (transforms[i], &templatesList[i]);
1206 } 1206 }
1207 - futures.waitForFinished(); 1207 + QtUtils::releaseAndWait(futures);
1208 } 1208 }
1209 1209
1210 void project(const Template &src, Template &dst) const 1210 void project(const Template &src, Template &dst) const
@@ -1332,7 +1332,7 @@ void Transform::project(const TemplateList &amp;src, TemplateList &amp;dst) const @@ -1332,7 +1332,7 @@ void Transform::project(const TemplateList &amp;src, TemplateList &amp;dst) const
1332 QFutureSynchronizer<void> futures; 1332 QFutureSynchronizer<void> futures;
1333 for (int i=0; i<dst.size(); i++) 1333 for (int i=0; i<dst.size(); i++)
1334 futures.addFuture(QtConcurrent::run(_project, this, &src[i], &dst[i])); 1334 futures.addFuture(QtConcurrent::run(_project, this, &src[i], &dst[i]));
1335 - futures.waitForFinished(); 1335 + QtUtils::releaseAndWait(futures);
1336 } 1336 }
1337 } 1337 }
1338 1338
@@ -1356,7 +1356,7 @@ void Transform::backProject(const TemplateList &amp;dst, TemplateList &amp;src) const @@ -1356,7 +1356,7 @@ void Transform::backProject(const TemplateList &amp;dst, TemplateList &amp;src) const
1356 for (int i=0; i<dst.size(); i++) 1356 for (int i=0; i<dst.size(); i++)
1357 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(_backProject, this, &dst[i], &src[i])); 1357 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(_backProject, this, &dst[i], &src[i]));
1358 else _backProject (this, &dst[i], &src[i]); 1358 else _backProject (this, &dst[i], &src[i]);
1359 - futures.waitForFinished(); 1359 + QtUtils::releaseAndWait(futures);
1360 } 1360 }
1361 1361
1362 /* Distance - public methods */ 1362 /* Distance - public methods */
@@ -1393,7 +1393,7 @@ void Distance::compare(const TemplateList &amp;target, const TemplateList &amp;query, Ou @@ -1393,7 +1393,7 @@ void Distance::compare(const TemplateList &amp;target, const TemplateList &amp;query, Ou
1393 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(this, &Distance::compareBlock, targets, queries, output, targetOffset, queryOffset)); 1393 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(this, &Distance::compareBlock, targets, queries, output, targetOffset, queryOffset));
1394 else compareBlock (targets, queries, output, targetOffset, queryOffset); 1394 else compareBlock (targets, queries, output, targetOffset, queryOffset);
1395 } 1395 }
1396 - futures.waitForFinished(); 1396 + QtUtils::releaseAndWait(futures);
1397 } 1397 }
1398 1398
1399 QList<float> Distance::compare(const TemplateList &targets, const Template &query) const 1399 QList<float> Distance::compare(const TemplateList &targets, const Template &query) const
openbr/plugins/distance.cpp
@@ -160,7 +160,7 @@ class PipeDistance : public Distance @@ -160,7 +160,7 @@ class PipeDistance : public Distance
160 foreach (br::Distance *distance, distances) 160 foreach (br::Distance *distance, distances)
161 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(distance, &Distance::train, data)); 161 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(distance, &Distance::train, data));
162 else distance->train(data); 162 else distance->train(data);
163 - futures.waitForFinished(); 163 + QtUtils::releaseAndWait(futures);
164 } 164 }
165 165
166 float compare(const Template &a, const Template &b) const 166 float compare(const Template &a, const Template &b) const
openbr/plugins/meta.cpp
@@ -111,7 +111,7 @@ class PipeTransform : public CompositeTransform @@ -111,7 +111,7 @@ class PipeTransform : public CompositeTransform
111 for (int j=0; j<copy.size(); j++) 111 for (int j=0; j<copy.size(); j++)
112 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(this, &PipeTransform::_projectPartial, &copy[j], i, nextTrainableTransform)); 112 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(this, &PipeTransform::_projectPartial, &copy[j], i, nextTrainableTransform));
113 else _projectPartial( &copy[j], i, nextTrainableTransform); 113 else _projectPartial( &copy[j], i, nextTrainableTransform);
114 - futures.waitForFinished(); 114 + QtUtils::releaseAndWait(futures);
115 i = nextTrainableTransform; 115 i = nextTrainableTransform;
116 } 116 }
117 } 117 }
@@ -293,7 +293,7 @@ class ForkTransform : public CompositeTransform @@ -293,7 +293,7 @@ class ForkTransform : public CompositeTransform
293 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(_train, transforms[i], &data)); 293 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(_train, transforms[i], &data));
294 else _train (transforms[i], &data); 294 else _train (transforms[i], &data);
295 } 295 }
296 - futures.waitForFinished(); 296 + QtUtils::releaseAndWait(futures);
297 } 297 }
298 298
299 void backProject(const Template &dst, Template &src) const {Transform::backProject(dst, src);} 299 void backProject(const Template &dst, Template &src) const {Transform::backProject(dst, src);}
@@ -648,7 +648,7 @@ public: @@ -648,7 +648,7 @@ public:
648 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(_projectList, transform, &input_buffer[i], &output_buffer[i])); 648 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(_projectList, transform, &input_buffer[i], &output_buffer[i]));
649 else _projectList( transform, &input_buffer[i], &output_buffer[i]); 649 else _projectList( transform, &input_buffer[i], &output_buffer[i]);
650 } 650 }
651 - futures.waitForFinished(); 651 + QtUtils::releaseAndWait(futures);
652 652
653 for (int i=0; i<src.size(); i++) dst.append(output_buffer[i]); 653 for (int i=0; i<src.size(); i++) dst.append(output_buffer[i]);
654 } 654 }
openbr/plugins/normalize.cpp
@@ -130,7 +130,7 @@ private: @@ -130,7 +130,7 @@ private:
130 av[c] = av[c].reshape(1, data.first().m().rows); 130 av[c] = av[c].reshape(1, data.first().m().rows);
131 bv[c] = bv[c].reshape(1, data.first().m().rows); 131 bv[c] = bv[c].reshape(1, data.first().m().rows);
132 } 132 }
133 - futures.waitForFinished(); 133 + QtUtils::releaseAndWait(futures);
134 134
135 merge(av, a); 135 merge(av, a);
136 merge(bv, b); 136 merge(bv, b);
openbr/plugins/quantize.cpp
@@ -20,6 +20,7 @@ @@ -20,6 +20,7 @@
20 20
21 #include "openbr/core/common.h" 21 #include "openbr/core/common.h"
22 #include "openbr/core/opencvutils.h" 22 #include "openbr/core/opencvutils.h"
  23 +#include "openbr/core/qtutils.h"
23 24
24 using namespace cv; 25 using namespace cv;
25 26
@@ -328,7 +329,7 @@ private: @@ -328,7 +329,7 @@ private:
328 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(this, &ProductQuantizationTransform::_train, subdata[i], labels, &subluts[i], &centers[i])); 329 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(this, &ProductQuantizationTransform::_train, subdata[i], labels, &subluts[i], &centers[i]));
329 else _train (subdata[i], labels, &subluts[i], &centers[i]); 330 else _train (subdata[i], labels, &subluts[i], &centers[i]);
330 } 331 }
331 - futures.waitForFinished(); 332 + QtUtils::releaseAndWait(futures);
332 } 333 }
333 334
334 int getIndex(const Mat &m, const Mat &center) const 335 int getIndex(const Mat &m, const Mat &center) const
openbr/plugins/stream.cpp
@@ -66,7 +66,7 @@ public: @@ -66,7 +66,7 @@ public:
66 QMap<int, FrameData *>::Iterator result = buffer.begin(); 66 QMap<int, FrameData *>::Iterator result = buffer.begin();
67 67
68 if (next_target != result.value()->sequenceNumber) { 68 if (next_target != result.value()->sequenceNumber) {
69 - qWarning("mismatched targets!"); 69 + qFatal("mismatched targets!");
70 } 70 }
71 71
72 next_target = next_target + 1; 72 next_target = next_target + 1;
@@ -160,7 +160,7 @@ private: @@ -160,7 +160,7 @@ private:
160 class DataSource 160 class DataSource
161 { 161 {
162 public: 162 public:
163 - DataSource(int maxFrames=Globals->parallelism + 1) 163 + DataSource(int maxFrames=500)
164 { 164 {
165 final_frame = -1; 165 final_frame = -1;
166 last_issued = -2; 166 last_issued = -2;
@@ -399,7 +399,19 @@ protected: @@ -399,7 +399,19 @@ protected:
399 399
400 }; 400 };
401 401
402 -class ProcessingStage : public QRunnable 402 +class ProcessingStage;
  403 +
  404 +class BasicLoop : public QRunnable
  405 +{
  406 +public:
  407 + void run();
  408 +
  409 + QList<ProcessingStage *> * stages;
  410 + int start_idx;
  411 + FrameData * startItem;
  412 +};
  413 +
  414 +class ProcessingStage
403 { 415 {
404 public: 416 public:
405 friend class StreamTransform; 417 friend class StreamTransform;
@@ -407,55 +419,68 @@ public: @@ -407,55 +419,68 @@ public:
407 ProcessingStage(int nThreads = 1) 419 ProcessingStage(int nThreads = 1)
408 { 420 {
409 thread_count = nThreads; 421 thread_count = nThreads;
410 - setAutoDelete(false);  
411 } 422 }
  423 + virtual ~ProcessingStage() {}
  424 +
  425 + virtual FrameData* run(FrameData * input, bool & should_continue)=0;
412 426
413 - virtual void run()=0; 427 + virtual bool tryAcquireNextStage(FrameData *& input)=0;
414 428
415 - virtual void nextStageRun(FrameData * input)=0; 429 + int stage_id;
416 430
417 protected: 431 protected:
418 int thread_count; 432 int thread_count;
419 433
420 SharedBuffer * inputBuffer; 434 SharedBuffer * inputBuffer;
421 ProcessingStage * nextStage; 435 ProcessingStage * nextStage;
  436 + QList<ProcessingStage *> * stages;
422 Transform * transform; 437 Transform * transform;
423 - int stage_id;  
424 438
425 }; 439 };
426 440
427 -class MultiThreadStage;  
428 -  
429 -void multistage_run(MultiThreadStage * basis, FrameData * input); 441 +void BasicLoop::run()
  442 +{
  443 + int current_idx = start_idx;
  444 + FrameData * target_item = startItem;
  445 + bool should_continue = true;
  446 + forever
  447 + {
  448 + target_item = stages->at(current_idx)->run(target_item, should_continue);
  449 + if (!should_continue) {
  450 + break;
  451 + }
  452 + current_idx++;
  453 + current_idx = current_idx % stages->size();
  454 + }
  455 +}
430 456
431 class MultiThreadStage : public ProcessingStage 457 class MultiThreadStage : public ProcessingStage
432 { 458 {
433 public: 459 public:
434 MultiThreadStage(int _input) : ProcessingStage(_input) {} 460 MultiThreadStage(int _input) : ProcessingStage(_input) {}
435 461
436 - friend void multistage_run(MultiThreadStage * basis, FrameData * input);  
437 462
438 - void run() 463 + FrameData * run(FrameData * input, bool & should_continue)
439 { 464 {
440 - qFatal("no don't do it!"); 465 + if (input == NULL) {
  466 + qFatal("null input to multi-thread stage");
  467 + }
  468 + // Project the input we got
  469 + transform->projectUpdate(input->data);
  470 +
  471 + should_continue = nextStage->tryAcquireNextStage(input);
  472 +
  473 + return input;
441 } 474 }
442 475
443 // Called from a different thread than run 476 // Called from a different thread than run
444 - virtual void nextStageRun(FrameData * input) 477 + virtual bool tryAcquireNextStage(FrameData *& input)
445 { 478 {
446 - QtConcurrent::run(multistage_run, this, input); 479 + (void) input;
  480 + return true;
447 } 481 }
448 }; 482 };
449 483
450 -void multistage_run(MultiThreadStage * basis, FrameData * input)  
451 -{  
452 - if (input == NULL)  
453 - qFatal("null input to multi-thread stage");  
454 - // Project the input we got  
455 - basis->transform->projectUpdate(input->data);  
456 -  
457 - basis->nextStage->nextStageRun(input);  
458 -}  
459 484
460 class SingleThreadStage : public ProcessingStage 485 class SingleThreadStage : public ProcessingStage
461 { 486 {
@@ -464,10 +489,12 @@ public: @@ -464,10 +489,12 @@ public:
464 { 489 {
465 currentStatus = STOPPING; 490 currentStatus = STOPPING;
466 next_target = 0; 491 next_target = 0;
467 - if (input_variance) 492 + if (input_variance) {
468 this->inputBuffer = new DoubleBuffer(); 493 this->inputBuffer = new DoubleBuffer();
469 - else 494 + }
  495 + else {
470 this->inputBuffer = new SequencingBuffer(); 496 this->inputBuffer = new SequencingBuffer();
  497 + }
471 } 498 }
472 ~SingleThreadStage() 499 ~SingleThreadStage()
473 { 500 {
@@ -483,52 +510,75 @@ public: @@ -483,52 +510,75 @@ public:
483 QReadWriteLock statusLock; 510 QReadWriteLock statusLock;
484 Status currentStatus; 511 Status currentStatus;
485 512
486 - // We should start, and enter a wait on input data  
487 - void run() 513 + FrameData * run(FrameData * input, bool & should_continue)
488 { 514 {
489 - FrameData * currentItem;  
490 - forever 515 + if (input == NULL)
  516 + qFatal("NULL input to stage %d", this->stage_id);
  517 +
  518 + if (input->sequenceNumber != next_target)
491 { 519 {
492 - // Whether or not we get a valid item controls whether or not we  
493 - QWriteLocker lock(&statusLock);  
494 - currentItem = inputBuffer->tryGetItem();  
495 - if (currentItem == NULL)  
496 - {  
497 - this->currentStatus = STOPPING;  
498 - return;  
499 - }  
500 - lock.unlock();  
501 - if (currentItem->sequenceNumber != next_target)  
502 - {  
503 - qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target);  
504 - }  
505 - next_target = currentItem->sequenceNumber + 1; 520 + qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, input->sequenceNumber, this->next_target);
  521 + }
  522 + next_target = input->sequenceNumber + 1;
  523 +
  524 + // Project the input we got
  525 + transform->projectUpdate(input->data);
  526 +
  527 + should_continue = nextStage->tryAcquireNextStage(input);
  528 +
  529 + // Is there anything on our input buffer? If so we should start a thread with that.
  530 + QWriteLocker lock(&statusLock);
  531 + FrameData * newItem = inputBuffer->tryGetItem();
  532 + if (!newItem)
  533 + {
  534 + this->currentStatus = STOPPING;
  535 + }
  536 + lock.unlock();
506 537
507 - // Project the input we got  
508 - transform->projectUpdate(currentItem->data); 538 + if (newItem)
  539 + {
  540 + BasicLoop * next = new BasicLoop();
  541 + next->stages = stages;
  542 + next->start_idx = this->stage_id;
  543 + next->startItem = newItem;
509 544
510 - this->nextStage->nextStageRun(currentItem); 545 + QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id);
511 } 546 }
  547 +
  548 + return input;
512 } 549 }
513 550
  551 +
514 // Calledfrom a different thread than run. 552 // Calledfrom a different thread than run.
515 - void nextStageRun(FrameData * input) 553 + bool tryAcquireNextStage(FrameData *& input)
516 { 554 {
517 - // add to our input buffer  
518 inputBuffer->addItem(input); 555 inputBuffer->addItem(input);
  556 +
519 QReadLocker lock(&statusLock); 557 QReadLocker lock(&statusLock);
  558 + // Thread is already running, we should just return
520 if (currentStatus == STARTING) 559 if (currentStatus == STARTING)
521 - return;  
522 - 560 + {
  561 + return false;
  562 + }
523 // Have to change to a write lock to modify currentStatus 563 // Have to change to a write lock to modify currentStatus
524 lock.unlock(); 564 lock.unlock();
  565 +
525 QWriteLocker writeLock(&statusLock); 566 QWriteLocker writeLock(&statusLock);
526 - // But someone might have changed it between locks 567 + // But someone else might have started a thread in the meantime
527 if (currentStatus == STARTING) 568 if (currentStatus == STARTING)
528 - return;  
529 - // Ok we can start a thread  
530 - QThreadPool::globalInstance()->start(this); 569 + {
  570 + return false;
  571 + }
  572 + // Ok we might start a thread, as long as we can get something back
  573 + // from the input buffer
  574 + input = inputBuffer->tryGetItem();
  575 +
  576 + if (!input)
  577 + return false;
  578 +
531 currentStatus = STARTING; 579 currentStatus = STARTING;
  580 +
  581 + return true;
532 } 582 }
533 }; 583 };
534 584
@@ -540,47 +590,63 @@ public: @@ -540,47 +590,63 @@ public:
540 FirstStage() : SingleThreadStage(true) {} 590 FirstStage() : SingleThreadStage(true) {}
541 591
542 DataSourceManager dataSource; 592 DataSourceManager dataSource;
543 - // Start drawing frames from the datasource.  
544 - void run() 593 +
  594 + FrameData * run(FrameData * input, bool & should_continue)
545 { 595 {
546 - FrameData * currentItem;  
547 - forever 596 + if (input != NULL) {
  597 + dataSource.returnFrame(input);
  598 + }
  599 +
  600 + // Is there anything on our input buffer? If so we should start a thread with that.
  601 + QWriteLocker lock(&statusLock);
  602 + input = dataSource.tryGetFrame();
  603 + // Datasource broke?
  604 + if (!input)
548 { 605 {
549 - // Whether or not we get a valid item controls whether or not we  
550 - QWriteLocker lock(&statusLock); 606 + currentStatus = STOPPING;
  607 + should_continue = false;
  608 + return NULL;
  609 + }
  610 + lock.unlock();
551 611
552 - currentItem = this->dataSource.tryGetFrame();  
553 - if (currentItem == NULL)  
554 - {  
555 - this->currentStatus = STOPPING;  
556 - return;  
557 - }  
558 - lock.unlock();  
559 - if (currentItem->sequenceNumber != next_target)  
560 - {  
561 - qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target);  
562 - }  
563 - next_target = currentItem->sequenceNumber + 1; 612 + should_continue = nextStage->tryAcquireNextStage(input);
564 613
565 - this->nextStage->nextStageRun(currentItem);  
566 - } 614 + BasicLoop * next = new BasicLoop();
  615 + next->stages = stages;
  616 + next->start_idx = this->stage_id;
  617 + next->startItem = NULL;
  618 +
  619 + QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id);
  620 +
  621 + return input;
567 } 622 }
568 623
569 - void nextStageRun(FrameData * input) 624 + // Calledfrom a different thread than run.
  625 + bool tryAcquireNextStage(FrameData *& input)
570 { 626 {
571 - QWriteLocker lock(&statusLock);  
572 -  
573 - // Return the frame to the frame buffer  
574 - bool res = dataSource.returnFrame(input);  
575 - // If the data source broke already, we're done.  
576 - if (res)  
577 - return; 627 + dataSource.returnFrame(input);
  628 + input = NULL;
578 629
  630 + QReadLocker lock(&statusLock);
  631 + // Thread is already running, we should just return
579 if (currentStatus == STARTING) 632 if (currentStatus == STARTING)
580 - return; 633 + {
  634 + return false;
  635 + }
  636 + // Have to change to a write lock to modify currentStatus
  637 + lock.unlock();
581 638
  639 + QWriteLocker writeLock(&statusLock);
  640 + // But someone else might have started a thread in the meantime
  641 + if (currentStatus == STARTING)
  642 + {
  643 + return false;
  644 + }
  645 + // Ok we'll start a thread
582 currentStatus = STARTING; 646 currentStatus = STARTING;
583 - QThreadPool::globalInstance()->start(this, this->next_target); 647 +
  648 + // We always start a readstage thread with null input, so nothing to do here
  649 + return true;
584 } 650 }
585 651
586 }; 652 };
@@ -597,29 +663,42 @@ public: @@ -597,29 +663,42 @@ public:
597 private: 663 private:
598 TemplateList collectedOutput; 664 TemplateList collectedOutput;
599 public: 665 public:
600 - void run() 666 + FrameData * run(FrameData * input, bool & should_continue)
601 { 667 {
602 - forever 668 + if (input == NULL) {
  669 + qFatal("NULL input to stage %d", this->stage_id);
  670 + }
  671 +
  672 + if (input->sequenceNumber != next_target)
603 { 673 {
604 - QWriteLocker lock(&statusLock);  
605 - FrameData * currentItem = inputBuffer->tryGetItem();  
606 - if (currentItem == NULL)  
607 - {  
608 - currentStatus = STOPPING;  
609 - break;  
610 - }  
611 - lock.unlock(); 674 + qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, input->sequenceNumber, this->next_target);
  675 + }
  676 + next_target = input->sequenceNumber + 1;
612 677
613 - if (currentItem->sequenceNumber != next_target)  
614 - {  
615 - qFatal("out of order frames for collection stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target);  
616 - }  
617 - next_target = currentItem->sequenceNumber + 1; 678 + collectedOutput.append(input->data);
618 679
619 - // Just put the item on collectedOutput  
620 - collectedOutput.append(currentItem->data);  
621 - this->nextStage->nextStageRun(currentItem); 680 + should_continue = nextStage->tryAcquireNextStage(input);
  681 +
  682 + // Is there anything on our input buffer? If so we should start a thread with that.
  683 + QWriteLocker lock(&statusLock);
  684 + FrameData * newItem = inputBuffer->tryGetItem();
  685 + if (!newItem)
  686 + {
  687 + this->currentStatus = STOPPING;
622 } 688 }
  689 + lock.unlock();
  690 +
  691 + if (newItem)
  692 + {
  693 + BasicLoop * next = new BasicLoop();
  694 + next->stages = stages;
  695 + next->start_idx = this->stage_id;
  696 + next->startItem = newItem;
  697 +
  698 + QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id);
  699 + }
  700 +
  701 + return input;
623 } 702 }
624 }; 703 };
625 704
@@ -660,17 +739,22 @@ public: @@ -660,17 +739,22 @@ public:
660 qFatal("Expected single template input to stream"); 739 qFatal("Expected single template input to stream");
661 740
662 dst = src; 741 dst = src;
663 - bool res = readStage.dataSource.open(dst[0]);  
664 - if (!res) {  
665 - qWarning("failed to stream template %s", qPrintable(dst[0].file.name));  
666 - return;  
667 - } 742 + bool res = readStage->dataSource.open(dst[0]);
  743 + if (!res) return;
668 744
669 QThreadPool::globalInstance()->releaseThread(); 745 QThreadPool::globalInstance()->releaseThread();
670 - readStage.currentStatus = SingleThreadStage::STARTING;  
671 - QThreadPool::globalInstance()->start(&readStage, 0); 746 + readStage->currentStatus = SingleThreadStage::STARTING;
  747 +
  748 + BasicLoop loop;
  749 + loop.stages = &this->processingStages;
  750 + loop.start_idx = 0;
  751 + loop.startItem = NULL;
  752 + loop.setAutoDelete(false);
  753 +
  754 + QThreadPool::globalInstance()->start(&loop, processingStages.size() - processingStages[0]->stage_id);
  755 +
672 // Wait for the end. 756 // Wait for the end.
673 - readStage.dataSource.waitLast(); 757 + readStage->dataSource.waitLast();
674 QThreadPool::globalInstance()->reserveThread(); 758 QThreadPool::globalInstance()->reserveThread();
675 759
676 // dst is set to all output received by the final stage 760 // dst is set to all output received by the final stage
@@ -693,10 +777,14 @@ public: @@ -693,10 +777,14 @@ public:
693 stage_variance.append(transform->timeVarying()); 777 stage_variance.append(transform->timeVarying());
694 } 778 }
695 779
696 - readStage.stage_id = 0; 780 + readStage = new FirstStage();
  781 +
  782 + processingStages.push_back(readStage);
  783 + readStage->stage_id = 0;
  784 + readStage->stages = &this->processingStages;
697 785
698 int next_stage_id = 1; 786 int next_stage_id = 1;
699 - int lastBufferIdx = 0; 787 +
700 bool prev_stage_variance = true; 788 bool prev_stage_variance = true;
701 for (int i =0; i < transforms.size(); i++) 789 for (int i =0; i < transforms.size(); i++)
702 { 790 {
@@ -709,24 +797,25 @@ public: @@ -709,24 +797,25 @@ public:
709 797
710 processingStages.last()->stage_id = next_stage_id++; 798 processingStages.last()->stage_id = next_stage_id++;
711 799
712 - // link nextStage pointers  
713 - if (i == 0)  
714 - this->readStage.nextStage = processingStages[i];  
715 - else  
716 - processingStages[i-1]->nextStage = processingStages[i]; 800 + // link nextStage pointers, the stage we just appeneded is i+1 since
  801 + // the read stage was added before this loop
  802 + processingStages[i]->nextStage = processingStages[i+1];
717 803
718 - lastBufferIdx++; 804 + processingStages.last()->stages = &this->processingStages;
719 805
720 processingStages.last()->transform = transforms[i]; 806 processingStages.last()->transform = transforms[i];
721 prev_stage_variance = stage_variance[i]; 807 prev_stage_variance = stage_variance[i];
722 } 808 }
723 809
724 collectionStage = new LastStage(prev_stage_variance); 810 collectionStage = new LastStage(prev_stage_variance);
  811 + processingStages.append(collectionStage);
725 collectionStage->stage_id = next_stage_id; 812 collectionStage->stage_id = next_stage_id;
  813 + collectionStage->stages = &this->processingStages;
  814 +
  815 + processingStages[processingStages.size() - 2]->nextStage = collectionStage;
726 816
727 // It's a ring buffer, get it? 817 // It's a ring buffer, get it?
728 - processingStages.last()->nextStage = collectionStage;  
729 - collectionStage->nextStage = &readStage; 818 + collectionStage->nextStage = readStage;
730 } 819 }
731 820
732 ~StreamTransform() 821 ~StreamTransform()
@@ -734,13 +823,12 @@ public: @@ -734,13 +823,12 @@ public:
734 for (int i = 0; i < processingStages.size(); i++) { 823 for (int i = 0; i < processingStages.size(); i++) {
735 delete processingStages[i]; 824 delete processingStages[i];
736 } 825 }
737 - delete collectionStage;  
738 } 826 }
739 827
740 protected: 828 protected:
741 QList<bool> stage_variance; 829 QList<bool> stage_variance;
742 830
743 - FirstStage readStage; 831 + FirstStage * readStage;
744 LastStage * collectionStage; 832 LastStage * collectionStage;
745 833
746 QList<ProcessingStage *> processingStages; 834 QList<ProcessingStage *> processingStages;
openbr/plugins/validate.cpp
1 #include <QFutureSynchronizer> 1 #include <QFutureSynchronizer>
2 #include <QtConcurrentRun> 2 #include <QtConcurrentRun>
3 #include <openbr/openbr_plugin.h> 3 #include <openbr/openbr_plugin.h>
  4 +#include <openbr/core/qtutils.h>
4 5
5 namespace br 6 namespace br
6 { 7 {
@@ -44,7 +45,7 @@ class CrossValidateTransform : public MetaTransform @@ -44,7 +45,7 @@ class CrossValidateTransform : public MetaTransform
44 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(transforms[i], &Transform::train, partitionedData)); 45 if (Globals->parallelism) futures.addFuture(QtConcurrent::run(transforms[i], &Transform::train, partitionedData));
45 else transforms[i]->train(partitionedData); 46 else transforms[i]->train(partitionedData);
46 } 47 }
47 - futures.waitForFinished(); 48 + QtUtils::releaseAndWait(futures);
48 } 49 }
49 50
50 void project(const Template &src, Template &dst) const 51 void project(const Template &src, Template &dst) const