Commit 0fc6c503df0e3c8b4de9d4f1e85de2edc87ee1c1

Authored by Charles Otto
1 parent 14026845

Add support for multi-threaded execution of time invariant stages of a stream

Showing 1 changed file with 142 additions and 30 deletions
sdk/plugins/stream.cpp
... ... @@ -2,6 +2,8 @@
2 2 #include <QReadWriteLock>
3 3 #include <QWaitCondition>
4 4 #include <QThreadPool>
  5 +#include <QSemaphore>
  6 +#include <QMap>
5 7  
6 8 #include "core/common.h"
7 9 #include "core/opencvutils.h"
... ... @@ -38,7 +40,7 @@ public:
38 40 virtual void startInput() = 0;
39 41 };
40 42  
41   -// For 1 - 1 boundaries, a buffer class with a single shared buffer, a mutex
  43 +// For 1 - n boundaries, a buffer class with a single shared buffer, a mutex
42 44 // is used to serialize all access to the buffer.
43 45 class SingleBuffer : public SharedBuffer
44 46 {
... ... @@ -98,6 +100,85 @@ private:
98 100 QList<FrameData *> buffer;
99 101 };
100 102  
  103 +// for n - 1 boundaries, multiple threads call addItem, the frames are
  104 +// sequenced based on FrameData::sequence_number, and calls to getItem
  105 +// receive them in that order
  106 +class SequencingBuffer : public SharedBuffer
  107 +{
  108 +public:
  109 + SequencingBuffer()
  110 + {
  111 + no_input = false;
  112 + next_target = 0;
  113 + }
  114 +
  115 + void stoppedInput()
  116 + {
  117 + QMutexLocker bufferLock(&bufferGuard);
  118 + no_input = true;
  119 + // Release anything waiting for input items.
  120 + availableInput.wakeAll();
  121 + }
  122 +
  123 + // There will be more input
  124 + void startInput()
  125 + {
  126 + QMutexLocker bufferLock(&bufferGuard);
  127 + no_input = false;
  128 + }
  129 +
  130 + void addItem(FrameData * input)
  131 + {
  132 + QMutexLocker bufferLock(&bufferGuard);
  133 +
  134 + buffer.insert(input->sequenceNumber, input);
  135 +
  136 + if (input->sequenceNumber == next_target) {
  137 + availableInput.wakeOne();
  138 + }
  139 + }
  140 +
  141 + FrameData * getItem()
  142 + {
  143 + QMutexLocker bufferLock(&bufferGuard);
  144 +
  145 + if (buffer.empty() || buffer.begin().key() != this->next_target) {
  146 + if (buffer.empty() && no_input) {
  147 + next_target = 0;
  148 + return NULL;
  149 + }
  150 + availableInput.wait(&bufferGuard);
  151 + }
  152 +
  153 + // availableInput was signalled, but the buffer is empty? We're done here.
  154 + if (buffer.empty()) {
  155 + next_target = 0;
  156 + return NULL;
  157 + }
  158 +
  159 + QMap<int, FrameData *>::Iterator result = buffer.begin();
  160 + //next_target++;
  161 + if (next_target != result.value()->sequenceNumber) {
  162 + qWarning("mismatched targets!");
  163 + }
  164 +
  165 + next_target = next_target + 1;
  166 +
  167 + FrameData * output = result.value();
  168 + buffer.erase(result);
  169 + return output;
  170 + }
  171 +
  172 +private:
  173 + QMutex bufferGuard;
  174 + QWaitCondition availableInput;
  175 + bool no_input;
  176 +
  177 + int next_target;
  178 +
  179 + QMap<int, FrameData *> buffer;
  180 +};
  181 +
101 182 // For 1 - 1 boundaries, a double buffering scheme
102 183 // Producer/consumer read/write from separate buffers, and switch if their
103 184 // buffer runs out/overflows. Synchronization is handled by a read/write lock
... ... @@ -372,7 +453,6 @@ public:
372 453 if (input.empty()) {
373 454 actualSource = new VideoDataSource(0);
374 455 open_res = actualSource->open(input);
375   - qDebug("created video resource status %d", open_res);
376 456 }
377 457 else {
378 458 // create frame dealer
... ... @@ -402,37 +482,32 @@ class ProcessingStage : public QRunnable
402 482 {
403 483 friend class StreamTransform;
404 484 public:
405   - ProcessingStage()
  485 + ProcessingStage(int nThreads = 1)
406 486 {
  487 + thread_count = nThreads;
  488 + activeThreads.release(thread_count);
407 489 setAutoDelete(false);
408 490 }
409 491  
410 492 void markStart()
411 493 {
412   - QMutexLocker lock(&stoppedGuard);
413   - stopped = false;
  494 + activeThreads.acquire();
414 495 }
415 496  
416 497 void waitStop()
417 498 {
418   - stoppedGuard.lock();
419   - while (!stopped)
420   - {
421   - waitStopped.wait(&stoppedGuard);
422   - }
423   - stoppedGuard.unlock();
  499 + // Wait until all threads have stopped
  500 + activeThreads.acquire(thread_count);
  501 + activeThreads.release(thread_count);
424 502 }
425 503  
426 504 protected:
427 505 void markStop()
428 506 {
429   - QMutexLocker lock(&stoppedGuard);
430   - stopped = true;
431   - this->waitStopped.wakeAll();
  507 + activeThreads.release();
432 508 }
433   - QMutex stoppedGuard;
434   - QWaitCondition waitStopped;
435   - bool stopped;
  509 + QSemaphore activeThreads;
  510 + int thread_count;
436 511  
437 512 SharedBuffer * inputBuffer;
438 513 SharedBuffer * outputBuffer;
... ... @@ -443,6 +518,7 @@ public:
443 518 // We should start, and enter a wait on input data
444 519 void run()
445 520 {
  521 + markStart();
446 522 forever
447 523 {
448 524 FrameData * currentItem = inputBuffer->getItem();
... ... @@ -455,9 +531,9 @@ public:
455 531 }
456 532 markStop();
457 533 }
458   -
459 534 };
460 535  
  536 +
461 537 // No input buffer, instead we draw templates from some data source
462 538 // Will be operated by the main thread for the stream
463 539 class FirstStage : public ProcessingStage
... ... @@ -511,6 +587,7 @@ public:
511 587 class StreamTransform : public CompositeTransform
512 588 {
513 589 Q_OBJECT
  590 + int threads_per_multi_stage;
514 591 public:
515 592 void train(const TemplateList & data)
516 593 {
... ... @@ -558,12 +635,11 @@ public:
558 635  
559 636 // Start our processing stages
560 637 for (int i=0; i < this->processingStages.size(); i++) {
561   - processingStages[i]->markStart();
562   - processingThreads.start(processingStages[i]);
  638 + int count = stage_variance[i] ? 1 : threads_per_multi_stage;
  639 + for (int j =0; j < count; j ++) processingThreads.start(processingStages[i]);
563 640 }
564 641  
565 642 // Start the final stage
566   - collectionStage.markStart();
567 643 processingThreads.start(&collectionStage);
568 644  
569 645 // Run the read stage ourselves
... ... @@ -597,18 +673,28 @@ public:
597 673 // Create and link stages
598 674 void init()
599 675 {
600   - // Set up the thread pool, 1 stage for each transform, as well as first
601   - // and last stages, but the first stage is operated by the thread that
602   - // calls project so the pool only needs nTransforms+1 total.
603   - processingThreads.setMaxThreadCount(transforms.size() + 1);
604   -
  676 + int thread_count = 0;
  677 + threads_per_multi_stage = 4;
605 678 stage_variance.reserve(transforms.size());
606 679 foreach (const br::Transform *transform, transforms) {
607 680 stage_variance.append(transform->timeVarying());
  681 + thread_count += transform->timeVarying() ? 1 : threads_per_multi_stage;
608 682 }
609 683  
610   - // buffer 0 -- output buffer for the read stage
611   - sharedBuffers.append(new DoubleBuffer());
  684 + // Set up the thread pool, 1 stage for each transform, as well as first
  685 + // and last stages, but the first stage is operated by the thread that
  686 + // calls project so the pool only needs nTransforms+1 total.
  687 + processingThreads.setMaxThreadCount(thread_count + 1);
  688 +
  689 +
  690 + // buffer 0 -- output buffer for the read stage, input buffer for
  691 + // first transform. Is that transform time-varying?
  692 + if (stage_variance[0])
  693 + sharedBuffers.append(new DoubleBuffer());
  694 + // If not, we can run multiple threads
  695 + else
  696 + sharedBuffers.append(new SingleBuffer());
  697 +
612 698 readStage.outputBuffer = sharedBuffers.last();
613 699 readStage.stage_id = 0;
614 700  
... ... @@ -618,13 +704,39 @@ public:
618 704 for (int i =0; i < transforms.size(); i++)
619 705 {
620 706 // Set up this stage
621   - processingStages.append(new ProcessingStage());
  707 + processingStages.append(new ProcessingStage(stage_variance[i] ? 1 : threads_per_multi_stage));
622 708  
623 709 processingStages.last()->stage_id = next_stage_id++;
624 710 processingStages.last()->inputBuffer = sharedBuffers[lastBufferIdx];
625 711 lastBufferIdx++;
626 712  
627   - sharedBuffers.append(new DoubleBuffer());
  713 + // This stage's output buffer, next stage's input buffer. If this is
  714 + // the last transform, the next stage is the (time varying) collection
  715 + // stage
  716 + bool next_variance = (i+1) < transforms.size() ? stage_variance[i+1] : true;
  717 + bool current_variance = stage_variance[i];
  718 + // if this is a single threaded stage
  719 + if (current_variance)
  720 + {
  721 + // 1 - 1 case
  722 + if (next_variance)
  723 + sharedBuffers.append(new DoubleBuffer());
  724 + // 1 - n case
  725 + else
  726 + sharedBuffers.append(new SingleBuffer());
  727 + }
  728 + // This is a multi-threaded stage
  729 + else
  730 + {
  731 + // If the next stage is single threaded, we need to sequence our
  732 + // output (n - 1 case)
  733 + if (next_variance)
  734 + sharedBuffers.append(new SequencingBuffer());
  735 + // Otherwise, this is an n-n boundary and we don't need to
  736 + // adhere to any particular sequence
  737 + else
  738 + sharedBuffers.append(new SingleBuffer());
  739 + }
628 740 processingStages.last()->outputBuffer = sharedBuffers.last();
629 741 processingStages.last()->transform = transforms[i];
630 742 }
... ...