diff --git a/openbr/plugins/stream.cpp b/openbr/plugins/stream.cpp index 340712e..6594517 100644 --- a/openbr/plugins/stream.cpp +++ b/openbr/plugins/stream.cpp @@ -162,6 +162,7 @@ public: { final_frame = -1; last_issued = -2; + last_received = -3; for (int i=0; i < maxFrames;i++) { allFrames.addItem(new FrameData()); @@ -194,30 +195,43 @@ public: // The datasource broke. if (!res) { allFrames.addItem(aFrame); - QMutexLocker lock(&last_frame_update); + QMutexLocker lock(&last_frame_update); + // Did we already receive the last frame? final_frame = last_issued; - if (final_frame == last_received) + + // We got the last frame before the data source broke, + // better pulse lastReturned + if (final_frame == last_received) { lastReturned.wakeAll(); + } else if (final_frame < last_received) std::cout << "Bad last frame " << final_frame << " but received " << last_received << std::endl; + return NULL; } last_issued = aFrame->sequenceNumber; return aFrame; } + // Returns true if the frame returned was the last + // frame issued, false otherwise bool returnFrame(FrameData * inputFrame) { allFrames.addItem(inputFrame); + bool rval = false; + QMutexLocker lock(&last_frame_update); last_received = inputFrame->sequenceNumber; + if (inputFrame->sequenceNumber == final_frame) { + // We just received the last frame, better pulse lastReturned.wakeAll(); + rval = true; } - return this->final_frame != -1; + return rval; } void waitLast() @@ -252,6 +266,7 @@ public: { final_frame = -1; last_issued = -2; + last_received = -3; next_idx = 0; basis = input; @@ -312,6 +327,7 @@ public: next_sequence = 0; final_frame = -1; last_issued = -2; + last_received = -3; data_ok = current_idx < basis.size(); return data_ok; @@ -380,6 +396,7 @@ public: bool open_res = false; final_frame = -1; last_issued = -2; + last_received = -3; // Input has no matrices? Its probably a video that hasn't been loaded yet if (input.empty()) { @@ -439,6 +456,8 @@ public: int stage_id; + virtual void reset()=0; + protected: int thread_count; @@ -490,6 +509,11 @@ public: (void) input; return true; } + + void reset() + { + // nothing to do. + } }; @@ -512,6 +536,14 @@ public: delete inputBuffer; } + void reset() + { + QWriteLocker writeLock(&statusLock); + currentStatus = STOPPING; + next_target = 0; + } + + int next_target; enum Status { @@ -631,8 +663,11 @@ public: // Calledfrom a different thread than run. bool tryAcquireNextStage(FrameData *& input) { - dataSource.returnFrame(input); + bool was_last = dataSource.returnFrame(input); input = NULL; + if (was_last) { + return false; + } if (!dataSource.isOpen()) return false; @@ -673,6 +708,12 @@ public: private: TemplateList collectedOutput; public: + void reset() + { + collectedOutput.clear(); + SingleThreadStage::reset(); + } + FrameData * run(FrameData * input, bool & should_continue) { if (input == NULL) { @@ -767,8 +808,28 @@ public: readStage->dataSource.waitLast(); QThreadPool::globalInstance()->reserveThread(); + TemplateList final_output; + + // Push finalize through the stages + for (int i=0; i < this->transforms.size(); i++) + { + TemplateList output_set; + transforms[i]->finalize(output_set); + + for (int j=i+1; j < transforms.size();j++) + { + transforms[j]->projectUpdate(output_set); + } + final_output.append(output_set); + } + // dst is set to all output received by the final stage dst = collectionStage->getOutput(); + dst.append(final_output); + + foreach(ProcessingStage * stage, processingStages) { + stage->reset(); + } } virtual void finalize(TemplateList & output)