diff --git a/openbr/plugins/stream.cpp b/openbr/plugins/stream.cpp index fd0cc92..17c5a27 100644 --- a/openbr/plugins/stream.cpp +++ b/openbr/plugins/stream.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include #include "openbr/core/common.h" @@ -33,70 +33,7 @@ public: virtual void addItem(FrameData * input)=0; - virtual FrameData * getItem()=0; - - virtual void stoppedInput() =0; - virtual void startInput() = 0; -}; - -// For 1 - n boundaries, a buffer class with a single shared buffer, a mutex -// is used to serialize all access to the buffer. -class SingleBuffer : public SharedBuffer -{ -public: - SingleBuffer() { no_input = false; } - - void stoppedInput() - { - QMutexLocker bufferLock(&bufferGuard); - no_input = true; - // Release anything waiting for input items. - availableInput.wakeAll(); - } - - // There will be more input - void startInput() - { - QMutexLocker bufferLock(&bufferGuard); - no_input = false; - } - - void addItem(FrameData * input) - { - QMutexLocker bufferLock(&bufferGuard); - - buffer.append(input); - - availableInput.wakeOne(); - } - - FrameData * getItem() - { - QMutexLocker bufferLock(&bufferGuard); - - if (buffer.empty()) { - // If no further items will come we are done here - if (no_input) - return NULL; - // Wait for an item - availableInput.wait(&bufferGuard); - } - - // availableInput was signalled, but the buffer is still empty? We're done here. - if (buffer.empty()) - return NULL; - - FrameData * output = buffer.first(); - buffer.removeFirst(); - return output; - } - -private: - QMutex bufferGuard; - QWaitCondition availableInput; - bool no_input; - - QList buffer; + virtual FrameData * tryGetItem()=0; }; // for n - 1 boundaries, multiple threads call addItem, the frames are @@ -107,56 +44,26 @@ class SequencingBuffer : public SharedBuffer public: SequencingBuffer() { - no_input = false; next_target = 0; } - void stoppedInput() - { - QMutexLocker bufferLock(&bufferGuard); - no_input = true; - // Release anything waiting for input items. - availableInput.wakeAll(); - } - - // There will be more input - void startInput() - { - QMutexLocker bufferLock(&bufferGuard); - no_input = false; - } - void addItem(FrameData * input) { QMutexLocker bufferLock(&bufferGuard); buffer.insert(input->sequenceNumber, input); - - if (input->sequenceNumber == next_target) { - availableInput.wakeOne(); - } } - FrameData * getItem() + FrameData * tryGetItem() { QMutexLocker bufferLock(&bufferGuard); if (buffer.empty() || buffer.begin().key() != this->next_target) { - if (buffer.empty() && no_input) { - next_target = 0; - return NULL; - } - availableInput.wait(&bufferGuard); - } - - // availableInput was signalled, but the buffer is empty? We're done here. - if (buffer.empty()) { - next_target = 0; return NULL; } QMap::Iterator result = buffer.begin(); - //next_target++; + if (next_target != result.value()->sequenceNumber) { qWarning("mismatched targets!"); } @@ -170,9 +77,6 @@ public: private: QMutex bufferGuard; - QWaitCondition availableInput; - bool no_input; - int next_target; QMap buffer; @@ -192,31 +96,16 @@ public: outputBuffer = &buffer2; } - void stoppedInput() - { - QWriteLocker bufferLock(&bufferGuard); - no_input = true; - // Release anything waiting for input items. - availableInput.wakeAll(); - } - - // There will be more input - void startInput() - { - QWriteLocker bufferLock(&bufferGuard); - no_input = false; - } // called from the producer thread void addItem(FrameData * input) { QReadLocker readLock(&bufferGuard); inputBuffer->append(input); - availableInput.wakeOne(); } - // Called from the consumer thread - FrameData * getItem() { + FrameData * tryGetItem() + { QReadLocker readLock(&bufferGuard); // There is something for us to get @@ -233,15 +122,7 @@ public: // Nothing on the input buffer either? if (inputBuffer->empty()) { - // If nothing else is coming, return null - if (no_input) - return NULL; - //otherwise, wait on the input buffer - availableInput.wait(&bufferGuard); - // Did we get woken up because no more input is coming? if so - // we're done here - if (no_input && inputBuffer->empty()) - return NULL; + return NULL; } // input buffer is non-empty, so swap the buffers @@ -259,10 +140,7 @@ private: // removing from this buffer can remove things from the current // output buffer if it has a read lock, or swap the buffers if it // has a write lock. - // Checking/modifying no_input requires a write lock. QReadWriteLock bufferGuard; - QWaitCondition availableInput; - bool no_input; // The buffer that is currently being added to QList * inputBuffer; @@ -281,44 +159,71 @@ private: class DataSource { public: - DataSource(int maxFrames=100) + DataSource(int maxFrames=Globals->parallelism + 1) { + final_frame = -1; + last_issued = -2; for (int i=0; i < maxFrames;i++) { allFrames.addItem(new FrameData()); } - allFrames.startInput(); } virtual ~DataSource() { - allFrames.stoppedInput(); while (true) { - FrameData * frame = allFrames.getItem(); + FrameData * frame = allFrames.tryGetItem(); if (frame == NULL) break; delete frame; } } - FrameData * getFrame() + // non-blocking version of getFrame + FrameData * tryGetFrame() { - FrameData * aFrame = allFrames.getItem(); + FrameData * aFrame = allFrames.tryGetItem(); + if (aFrame == NULL) + return NULL; + aFrame->data.clear(); aFrame->sequenceNumber = -1; bool res = getNext(*aFrame); if (!res) { allFrames.addItem(aFrame); + // Datasource broke? + QMutexLocker lock(&last_frame_update); + + final_frame = last_issued; + if (final_frame == last_received) + lastReturned.wakeAll(); + else if (final_frame < last_received) + std::cout << "Bad last frame " << final_frame << " but received " << last_received << std::endl; return NULL; } + last_issued = aFrame->sequenceNumber; return aFrame; } - void returnFrame(FrameData * inputFrame) + bool returnFrame(FrameData * inputFrame) { allFrames.addItem(inputFrame); + + QMutexLocker lock(&last_frame_update); + last_received = inputFrame->sequenceNumber; + if (inputFrame->sequenceNumber == final_frame) { + lastReturned.wakeAll(); + } + + return this->final_frame != -1; + } + + void waitLast() + { + QMutexLocker lock(&last_frame_update); + lastReturned.wait(&last_frame_update); } virtual void close() = 0; @@ -329,6 +234,12 @@ public: protected: DoubleBuffer allFrames; + int final_frame; + int last_issued; + int last_received; + + QWaitCondition lastReturned; + QMutex last_frame_update; }; // Read a video frame by frame using cv::VideoCapture @@ -339,6 +250,9 @@ public: bool open(Template &input) { + final_frame = -1; + last_issued = -2; + next_idx = 0; basis = input; video.open(input.file.name.toStdString()); @@ -365,6 +279,7 @@ private: if (!res) { return false; } + output.data.last().file.set("FrameNumber", output.sequenceNumber); return true; } @@ -388,6 +303,9 @@ public: basis = input; current_idx = 0; next_sequence = 0; + final_frame = -1; + last_issued = -2; + return isOpen(); } @@ -448,6 +366,9 @@ public: { close(); bool open_res = false; + final_frame = -1; + last_issued = -2; + // Input has no matrices? Its probably a video that hasn't been loaded yet if (input.empty()) { actualSource = new VideoDataSource(0); @@ -479,84 +400,194 @@ protected: class ProcessingStage : public QRunnable { +public: friend class StreamTransform; public: ProcessingStage(int nThreads = 1) { thread_count = nThreads; - activeThreads.release(thread_count); setAutoDelete(false); } - void markStart() - { - activeThreads.acquire(); - } + virtual void run()=0; - void waitStop() - { - // Wait until all threads have stopped - activeThreads.acquire(thread_count); - activeThreads.release(thread_count); - } + virtual void nextStageRun(FrameData * input)=0; protected: - void markStop() - { - activeThreads.release(); - } - QSemaphore activeThreads; int thread_count; SharedBuffer * inputBuffer; - SharedBuffer * outputBuffer; + ProcessingStage * nextStage; Transform * transform; int stage_id; +}; + +class MultiThreadStage; + +void multistage_run(MultiThreadStage * basis, FrameData * input); + +class MultiThreadStage : public ProcessingStage +{ +public: + MultiThreadStage(int _input) : ProcessingStage(_input) {} + + friend void multistage_run(MultiThreadStage * basis, FrameData * input); + + void run() + { + qFatal("no don't do it!"); + } + + // Called from a different thread than run + virtual void nextStageRun(FrameData * input) + { + QtConcurrent::run(multistage_run, this, input); + } +}; + +void multistage_run(MultiThreadStage * basis, FrameData * input) +{ + if (input == NULL) + qFatal("null input to multi-thread stage"); + // Project the input we got + basis->transform->projectUpdate(input->data); + + basis->nextStage->nextStageRun(input); +} + +class SingleThreadStage : public ProcessingStage +{ public: + SingleThreadStage(bool input_variance) : ProcessingStage(1) + { + currentStatus = STOPPING; + next_target = 0; + if (input_variance) + this->inputBuffer = new DoubleBuffer(); + else + this->inputBuffer = new SequencingBuffer(); + } + ~SingleThreadStage() + { + delete inputBuffer; + } + + int next_target; + enum Status + { + STARTING, + STOPPING + }; + QReadWriteLock statusLock; + Status currentStatus; + // We should start, and enter a wait on input data void run() { - markStart(); + FrameData * currentItem; forever { - FrameData * currentItem = inputBuffer->getItem(); + // Whether or not we get a valid item controls whether or not we + QWriteLocker lock(&statusLock); + currentItem = inputBuffer->tryGetItem(); if (currentItem == NULL) - break; + { + this->currentStatus = STOPPING; + return; + } + lock.unlock(); + if (currentItem->sequenceNumber != next_target) + { + qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target); + } + next_target = currentItem->sequenceNumber + 1; + // Project the input we got transform->projectUpdate(currentItem->data); - // Add the result to the ouptut buffer - outputBuffer->addItem(currentItem); + + this->nextStage->nextStageRun(currentItem); } - markStop(); } -}; + // Calledfrom a different thread than run. + void nextStageRun(FrameData * input) + { + // add to our input buffer + inputBuffer->addItem(input); + QReadLocker lock(&statusLock); + if (currentStatus == STARTING) + return; + + // Have to change to a write lock to modify currentStatus + lock.unlock(); + QWriteLocker writeLock(&statusLock); + // But someone might have changed it between locks + if (currentStatus == STARTING) + return; + // Ok we can start a thread + QThreadPool::globalInstance()->start(this); + currentStatus = STARTING; + } +}; // No input buffer, instead we draw templates from some data source // Will be operated by the main thread for the stream -class FirstStage : public ProcessingStage +class FirstStage : public SingleThreadStage { public: + FirstStage() : SingleThreadStage(true) {} + DataSourceManager dataSource; // Start drawing frames from the datasource. void run() { + FrameData * currentItem; forever { - //FrameData * aFrame = dataSource.getNext(); - FrameData * aFrame = dataSource.getFrame(); - if (aFrame == NULL) - break; - outputBuffer->addItem(aFrame); + // Whether or not we get a valid item controls whether or not we + QWriteLocker lock(&statusLock); + + currentItem = this->dataSource.tryGetFrame(); + if (currentItem == NULL) + { + this->currentStatus = STOPPING; + return; + } + lock.unlock(); + if (currentItem->sequenceNumber != next_target) + { + qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target); + } + next_target = currentItem->sequenceNumber + 1; + + this->nextStage->nextStageRun(currentItem); } - this->markStop(); } + + void nextStageRun(FrameData * input) + { + QWriteLocker lock(&statusLock); + + // Return the frame to the frame buffer + bool res = dataSource.returnFrame(input); + // If the data source broke already, we're done. + if (res) + return; + + if (currentStatus == STARTING) + return; + + currentStatus = STARTING; + QThreadPool::globalInstance()->start(this, this->next_target); + } + }; -class LastStage : public ProcessingStage +class LastStage : public SingleThreadStage { public: + LastStage(bool _prev_stage_variance) : SingleThreadStage(_prev_stage_variance) {} TemplateList getOutput() { return collectedOutput; @@ -565,28 +596,35 @@ public: private: TemplateList collectedOutput; public: - DataSource * data; void run() { forever { - // Wait for input - FrameData * frame = inputBuffer->getItem(); - if (frame == NULL) + QWriteLocker lock(&statusLock); + FrameData * currentItem = inputBuffer->tryGetItem(); + if (currentItem == NULL) + { + currentStatus = STOPPING; break; + } + lock.unlock(); + + if (currentItem->sequenceNumber != next_target) + { + qFatal("out of order frames for collection stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target); + } + next_target = currentItem->sequenceNumber + 1; + // Just put the item on collectedOutput - collectedOutput.append(frame->data); - // Return the frame to the input frame buffer - data->returnFrame(frame); + collectedOutput.append(currentItem->data); + this->nextStage->nextStageRun(currentItem); } - this->markStop(); } }; class StreamTransform : public CompositeTransform { Q_OBJECT - int threads_per_multi_stage; public: void train(const TemplateList & data) { @@ -627,40 +665,15 @@ public: return; } - // Tell all buffers to expect input - for (int i=0; i < sharedBuffers.size(); i++) { - sharedBuffers[i]->startInput(); - } - - // Start our processing stages - for (int i=0; i < this->processingStages.size(); i++) { - int count = stage_variance[i] ? 1 : threads_per_multi_stage; - for (int j =0; j < count; j ++) processingThreads.start(processingStages[i]); - } - - // Start the final stage - processingThreads.start(&collectionStage); - - // Run the read stage ourselves - readStage.run(); - - // The read stage has stopped (since we ran the read stage). - // Step over the buffers, and call stoppedInput to tell the stage - // reading from each buffer that no more frames will be added after - // the current ones run out, then wait for the thread to finish. - for (int i =0; i < (sharedBuffers.size() - 1); i++) { - // Indicate that no more input will be available - sharedBuffers[i]->stoppedInput(); - - // Wait for the thread to finish. - this->processingStages[i]->waitStop(); - } - // Wait for the collection stage to finish - sharedBuffers.last()->stoppedInput(); - collectionStage.waitStop(); + QThreadPool::globalInstance()->releaseThread(); + readStage.currentStatus = SingleThreadStage::STARTING; + QThreadPool::globalInstance()->start(&readStage, 0); + // Wait for the end. + readStage.dataSource.waitLast(); + QThreadPool::globalInstance()->reserveThread(); // dst is set to all output received by the final stage - dst = collectionStage.getOutput(); + dst = collectionStage->getOutput(); } virtual void finalize(TemplateList & output) @@ -672,78 +685,47 @@ public: // Create and link stages void init() { - int thread_count = 0; - threads_per_multi_stage = 4; + if (transforms.isEmpty()) return; + stage_variance.reserve(transforms.size()); foreach (const br::Transform *transform, transforms) { stage_variance.append(transform->timeVarying()); - thread_count += transform->timeVarying() ? 1 : threads_per_multi_stage; } - if (transforms.isEmpty()) return; - - // Set up the thread pool, 1 stage for each transform, as well as first - // and last stages, but the first stage is operated by the thread that - // calls project so the pool only needs nTransforms+1 total. - processingThreads.setMaxThreadCount(thread_count + 1); - - - // buffer 0 -- output buffer for the read stage, input buffer for - // first transform. Is that transform time-varying? - if (stage_variance[0]) - sharedBuffers.append(new DoubleBuffer()); - // If not, we can run multiple threads - else - sharedBuffers.append(new SingleBuffer()); - readStage.outputBuffer = sharedBuffers.last(); readStage.stage_id = 0; int next_stage_id = 1; - int lastBufferIdx = 0; + bool prev_stage_variance = true; for (int i =0; i < transforms.size(); i++) { - // Set up this stage - processingStages.append(new ProcessingStage(stage_variance[i] ? 1 : threads_per_multi_stage)); + if (stage_variance[i]) + { + processingStages.append(new SingleThreadStage(prev_stage_variance)); + } + else + processingStages.append(new MultiThreadStage(Globals->parallelism)); processingStages.last()->stage_id = next_stage_id++; - processingStages.last()->inputBuffer = sharedBuffers[lastBufferIdx]; - lastBufferIdx++; - // This stage's output buffer, next stage's input buffer. If this is - // the last transform, the next stage is the (time varying) collection - // stage - bool next_variance = (i+1) < transforms.size() ? stage_variance[i+1] : true; - bool current_variance = stage_variance[i]; - // if this is a single threaded stage - if (current_variance) - { - // 1 - 1 case - if (next_variance) - sharedBuffers.append(new DoubleBuffer()); - // 1 - n case - else - sharedBuffers.append(new SingleBuffer()); - } - // This is a multi-threaded stage + // link nextStage pointers + if (i == 0) + this->readStage.nextStage = processingStages[i]; else - { - // If the next stage is single threaded, we need to sequence our - // output (n - 1 case) - if (next_variance) - sharedBuffers.append(new SequencingBuffer()); - // Otherwise, this is an n-n boundary and we don't need to - // adhere to any particular sequence - else - sharedBuffers.append(new SingleBuffer()); - } - processingStages.last()->outputBuffer = sharedBuffers.last(); + processingStages[i-1]->nextStage = processingStages[i]; + + lastBufferIdx++; + processingStages.last()->transform = transforms[i]; + prev_stage_variance = stage_variance[i]; } - collectionStage.inputBuffer = sharedBuffers.last(); - collectionStage.data = &readStage.dataSource; - collectionStage.stage_id = next_stage_id; + collectionStage = new LastStage(prev_stage_variance); + collectionStage->stage_id = next_stage_id; + + // It's a ring buffer, get it? + processingStages.last()->nextStage = collectionStage; + collectionStage->nextStage = &readStage; } ~StreamTransform() @@ -751,22 +733,16 @@ public: for (int i = 0; i < processingStages.size(); i++) { delete processingStages[i]; } - for (int i = 0; i < sharedBuffers.size(); i++) { - delete sharedBuffers[i]; - } - + delete collectionStage; } protected: QList stage_variance; FirstStage readStage; - LastStage collectionStage; + LastStage * collectionStage; QList processingStages; - QList sharedBuffers; - - QThreadPool processingThreads; void _project(const Template &src, Template &dst) const {