diff --git a/sdk/plugins/stream.cpp b/sdk/plugins/stream.cpp index 9617c38..5ea157a 100644 --- a/sdk/plugins/stream.cpp +++ b/sdk/plugins/stream.cpp @@ -2,6 +2,8 @@ #include #include #include +#include +#include #include "core/common.h" #include "core/opencvutils.h" @@ -38,7 +40,7 @@ public: virtual void startInput() = 0; }; -// For 1 - 1 boundaries, a buffer class with a single shared buffer, a mutex +// For 1 - n boundaries, a buffer class with a single shared buffer, a mutex // is used to serialize all access to the buffer. class SingleBuffer : public SharedBuffer { @@ -98,6 +100,85 @@ private: QList buffer; }; +// for n - 1 boundaries, multiple threads call addItem, the frames are +// sequenced based on FrameData::sequence_number, and calls to getItem +// receive them in that order +class SequencingBuffer : public SharedBuffer +{ +public: + SequencingBuffer() + { + no_input = false; + next_target = 0; + } + + void stoppedInput() + { + QMutexLocker bufferLock(&bufferGuard); + no_input = true; + // Release anything waiting for input items. + availableInput.wakeAll(); + } + + // There will be more input + void startInput() + { + QMutexLocker bufferLock(&bufferGuard); + no_input = false; + } + + void addItem(FrameData * input) + { + QMutexLocker bufferLock(&bufferGuard); + + buffer.insert(input->sequenceNumber, input); + + if (input->sequenceNumber == next_target) { + availableInput.wakeOne(); + } + } + + FrameData * getItem() + { + QMutexLocker bufferLock(&bufferGuard); + + if (buffer.empty() || buffer.begin().key() != this->next_target) { + if (buffer.empty() && no_input) { + next_target = 0; + return NULL; + } + availableInput.wait(&bufferGuard); + } + + // availableInput was signalled, but the buffer is empty? We're done here. + if (buffer.empty()) { + next_target = 0; + return NULL; + } + + QMap::Iterator result = buffer.begin(); + //next_target++; + if (next_target != result.value()->sequenceNumber) { + qWarning("mismatched targets!"); + } + + next_target = next_target + 1; + + FrameData * output = result.value(); + buffer.erase(result); + return output; + } + +private: + QMutex bufferGuard; + QWaitCondition availableInput; + bool no_input; + + int next_target; + + QMap buffer; +}; + // For 1 - 1 boundaries, a double buffering scheme // Producer/consumer read/write from separate buffers, and switch if their // buffer runs out/overflows. Synchronization is handled by a read/write lock @@ -372,7 +453,6 @@ public: if (input.empty()) { actualSource = new VideoDataSource(0); open_res = actualSource->open(input); - qDebug("created video resource status %d", open_res); } else { // create frame dealer @@ -402,37 +482,32 @@ class ProcessingStage : public QRunnable { friend class StreamTransform; public: - ProcessingStage() + ProcessingStage(int nThreads = 1) { + thread_count = nThreads; + activeThreads.release(thread_count); setAutoDelete(false); } void markStart() { - QMutexLocker lock(&stoppedGuard); - stopped = false; + activeThreads.acquire(); } void waitStop() { - stoppedGuard.lock(); - while (!stopped) - { - waitStopped.wait(&stoppedGuard); - } - stoppedGuard.unlock(); + // Wait until all threads have stopped + activeThreads.acquire(thread_count); + activeThreads.release(thread_count); } protected: void markStop() { - QMutexLocker lock(&stoppedGuard); - stopped = true; - this->waitStopped.wakeAll(); + activeThreads.release(); } - QMutex stoppedGuard; - QWaitCondition waitStopped; - bool stopped; + QSemaphore activeThreads; + int thread_count; SharedBuffer * inputBuffer; SharedBuffer * outputBuffer; @@ -443,6 +518,7 @@ public: // We should start, and enter a wait on input data void run() { + markStart(); forever { FrameData * currentItem = inputBuffer->getItem(); @@ -455,9 +531,9 @@ public: } markStop(); } - }; + // No input buffer, instead we draw templates from some data source // Will be operated by the main thread for the stream class FirstStage : public ProcessingStage @@ -511,6 +587,7 @@ public: class StreamTransform : public CompositeTransform { Q_OBJECT + int threads_per_multi_stage; public: void train(const TemplateList & data) { @@ -558,12 +635,11 @@ public: // Start our processing stages for (int i=0; i < this->processingStages.size(); i++) { - processingStages[i]->markStart(); - processingThreads.start(processingStages[i]); + int count = stage_variance[i] ? 1 : threads_per_multi_stage; + for (int j =0; j < count; j ++) processingThreads.start(processingStages[i]); } // Start the final stage - collectionStage.markStart(); processingThreads.start(&collectionStage); // Run the read stage ourselves @@ -597,18 +673,28 @@ public: // Create and link stages void init() { - // Set up the thread pool, 1 stage for each transform, as well as first - // and last stages, but the first stage is operated by the thread that - // calls project so the pool only needs nTransforms+1 total. - processingThreads.setMaxThreadCount(transforms.size() + 1); - + int thread_count = 0; + threads_per_multi_stage = 4; stage_variance.reserve(transforms.size()); foreach (const br::Transform *transform, transforms) { stage_variance.append(transform->timeVarying()); + thread_count += transform->timeVarying() ? 1 : threads_per_multi_stage; } - // buffer 0 -- output buffer for the read stage - sharedBuffers.append(new DoubleBuffer()); + // Set up the thread pool, 1 stage for each transform, as well as first + // and last stages, but the first stage is operated by the thread that + // calls project so the pool only needs nTransforms+1 total. + processingThreads.setMaxThreadCount(thread_count + 1); + + + // buffer 0 -- output buffer for the read stage, input buffer for + // first transform. Is that transform time-varying? + if (stage_variance[0]) + sharedBuffers.append(new DoubleBuffer()); + // If not, we can run multiple threads + else + sharedBuffers.append(new SingleBuffer()); + readStage.outputBuffer = sharedBuffers.last(); readStage.stage_id = 0; @@ -618,13 +704,39 @@ public: for (int i =0; i < transforms.size(); i++) { // Set up this stage - processingStages.append(new ProcessingStage()); + processingStages.append(new ProcessingStage(stage_variance[i] ? 1 : threads_per_multi_stage)); processingStages.last()->stage_id = next_stage_id++; processingStages.last()->inputBuffer = sharedBuffers[lastBufferIdx]; lastBufferIdx++; - sharedBuffers.append(new DoubleBuffer()); + // This stage's output buffer, next stage's input buffer. If this is + // the last transform, the next stage is the (time varying) collection + // stage + bool next_variance = (i+1) < transforms.size() ? stage_variance[i+1] : true; + bool current_variance = stage_variance[i]; + // if this is a single threaded stage + if (current_variance) + { + // 1 - 1 case + if (next_variance) + sharedBuffers.append(new DoubleBuffer()); + // 1 - n case + else + sharedBuffers.append(new SingleBuffer()); + } + // This is a multi-threaded stage + else + { + // If the next stage is single threaded, we need to sequence our + // output (n - 1 case) + if (next_variance) + sharedBuffers.append(new SequencingBuffer()); + // Otherwise, this is an n-n boundary and we don't need to + // adhere to any particular sequence + else + sharedBuffers.append(new SingleBuffer()); + } processingStages.last()->outputBuffer = sharedBuffers.last(); processingStages.last()->transform = transforms[i]; }