From 7a47d58c67046fe65fd7978dc8a1dae7041f343e Mon Sep 17 00:00:00 2001 From: Charles Otto Date: Wed, 31 Jul 2013 13:29:32 -0400 Subject: [PATCH] Some code cleanup, also changes to thread pooling --- openbr/plugins/stream.cpp | 365 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------------------------------------------------------------------------------------------- 1 file changed, 226 insertions(+), 139 deletions(-) diff --git a/openbr/plugins/stream.cpp b/openbr/plugins/stream.cpp index 609823c..8efe6e9 100644 --- a/openbr/plugins/stream.cpp +++ b/openbr/plugins/stream.cpp @@ -160,9 +160,8 @@ class DataSource public: DataSource(int maxFrames=500) { + // The sequence number of the last frame final_frame = -1; - last_issued = -2; - last_received = -3; for (int i=0; i < maxFrames;i++) { allFrames.addItem(new FrameData()); @@ -181,57 +180,64 @@ public: } // non-blocking version of getFrame - FrameData * tryGetFrame() + // Returns a NULL FrameData if too many frames are out, or the + // data source is broken. Sets last_frame to true iff the FrameData + // returned is the last valid frame, and the data source is now broken. + FrameData * tryGetFrame(bool & last_frame) { + last_frame = false; + + if (is_broken) { + return NULL; + } + + // Try to get a FrameData from the pool, if we can't it means too many + // frames are already out, and we will return NULL to indicate failure FrameData * aFrame = allFrames.tryGetItem(); - if (aFrame == NULL) + if (aFrame == NULL) { return NULL; + } aFrame->data.clear(); aFrame->sequenceNumber = -1; + // Try to read a frame, if this returns false the data source is broken bool res = getNext(*aFrame); - // The datasource broke. - if (!res) { + // The datasource broke, update final_frame + if (!res) + { + QMutexLocker lock(&last_frame_update); + final_frame = lookAhead.back()->sequenceNumber; allFrames.addItem(aFrame); + } + else lookAhead.push_back(aFrame); - QMutexLocker lock(&last_frame_update); - // Did we already receive the last frame? - final_frame = last_issued; + FrameData * rVal = lookAhead.first(); + lookAhead.pop_front(); - // We got the last frame before the data source broke, - // better pulse lastReturned - if (final_frame == last_received) { - lastReturned.wakeAll(); - } - else if (final_frame < last_received) - std::cout << "Bad last frame " << final_frame << " but received " << last_received << std::endl; - return NULL; + if (rVal->sequenceNumber == final_frame) { + last_frame = true; + is_broken = true; } - last_issued = aFrame->sequenceNumber; - return aFrame; + + return rVal; } - // Returns true if the frame returned was the last + // Return a frame to the pool, returns true if the frame returned was the last // frame issued, false otherwise bool returnFrame(FrameData * inputFrame) { + int frameNumber = inputFrame->sequenceNumber; + allFrames.addItem(inputFrame); bool rval = false; QMutexLocker lock(&last_frame_update); - last_received = inputFrame->sequenceNumber; - - if (inputFrame->sequenceNumber == final_frame) { - // This is the reserveThread that matches the releaseThread in - // Stream::projectUpdate, we do it here to prevent a gap where the - // thread count is still increased, but a stream worker thread is not - // running, which might allow something to start that shouldn't - // start yet. - QThreadPool::globalInstance()->reserveThread(); + + if (frameNumber == final_frame) { // We just received the last frame, better pulse lastReturned.wakeAll(); rval = true; @@ -246,17 +252,57 @@ public: lastReturned.wait(&last_frame_update); } - virtual void close() = 0; - virtual bool open(Template & output, int start_index=0) = 0; - virtual bool isOpen() = 0; + bool open(Template & output, int start_index = 0) + { + is_broken = false; + // The last frame isn't initialized yet + final_frame = -1; + // Start our sequence numbers from the input index + next_sequence_number = start_index; + + // Actually open the data source + bool open_res = concreteOpen(output); + + // We couldn't open the data source + if (!open_res) { + is_broken = true; + return false; + } + + // Try to get a frame from the global pool + FrameData * firstFrame = allFrames.tryGetItem(); + + // If this fails, things have gone pretty badly. + if (firstFrame == NULL) { + is_broken = true; + return false; + } + + // Read a frame from the video source + bool res = getNext(*firstFrame); + + // the data source broke already, we couldn't even get one frame + // from it. + if (!res) { + is_broken = true; + return false; + } + + lookAhead.append(firstFrame); + return true; + } + virtual bool isOpen()=0; + virtual bool concreteOpen(Template & output) = 0; virtual bool getNext(FrameData & input) = 0; + virtual void close() = 0; + int next_sequence_number; protected: DoubleBuffer allFrames; int final_frame; - int last_issued; - int last_received; + bool is_broken; + QList lookAhead; QWaitCondition lastReturned; QMutex last_frame_update; @@ -268,13 +314,8 @@ class VideoDataSource : public DataSource public: VideoDataSource(int maxFrames) : DataSource(maxFrames) {} - bool open(Template &input, int start_index=0) + bool concreteOpen(Template &input) { - final_frame = -1; - last_issued = -2; - last_received = -3; - - next_idx = start_index; basis = input; bool is_int = false; int anInt = input.file.name.toInt(&is_int); @@ -309,25 +350,31 @@ private: return false; output.data.append(Template(basis.file)); - output.data.last().append(cv::Mat()); + output.data.last().m() = cv::Mat(); - output.sequenceNumber = next_idx; - next_idx++; + output.sequenceNumber = next_sequence_number; + next_sequence_number++; - bool res = video.read(output.data.last().last()); - output.data.last().last() = output.data.last().last().clone(); + cv::Mat temp; + bool res = video.read(temp); if (!res) { + output.data.last().m() = cv::Mat(); close(); return false; } + + // This clone is critical, if we don't do it then the matrix will + // be an alias of an internal buffer of the video source, leading + // to various problems later. + output.data.last().m() = temp.clone(); + output.data.last().file.set("FrameNumber", output.sequenceNumber); return true; } cv::VideoCapture video; Template basis; - int next_idx; }; // Given a template as input, return its matrices one by one on subsequent calls @@ -337,21 +384,16 @@ class TemplateDataSource : public DataSource public: TemplateDataSource(int maxFrames) : DataSource(maxFrames) { - current_idx = INT_MAX; + current_matrix_idx = INT_MAX; data_ok = false; } - bool data_ok; - bool open(Template &input, int start_index=0) + bool concreteOpen(Template &input) { basis = input; - current_idx = 0; - next_sequence = start_index; - final_frame = -1; - last_issued = -2; - last_received = -3; + current_matrix_idx = 0; - data_ok = current_idx < basis.size(); + data_ok = current_matrix_idx < basis.size(); return data_ok; } @@ -361,39 +403,41 @@ public: void close() { - current_idx = INT_MAX; + current_matrix_idx = INT_MAX; basis.clear(); } private: bool getNext(FrameData & output) { - data_ok = current_idx < basis.size(); + data_ok = current_matrix_idx < basis.size(); if (!data_ok) return false; - output.data.append(basis[current_idx]); - current_idx++; + output.data.append(basis[current_matrix_idx]); + current_matrix_idx++; - output.sequenceNumber = next_sequence; - next_sequence++; + output.sequenceNumber = next_sequence_number; + next_sequence_number++; output.data.last().file.set("FrameNumber", output.sequenceNumber); return true; } Template basis; - int current_idx; - int next_sequence; + // Index of the next matrix to output from the template + int current_matrix_idx; + + // is current_matrix_idx in bounds? + bool data_ok; }; -// Given a template as input, create a VideoDataSource or a TemplateDataSource -// depending on whether or not it looks like the input template has already -// loaded frames into memory. +// Given a templatelist as input, create appropriate data source for each +// individual template class DataSourceManager : public DataSource { public: - DataSourceManager() + DataSourceManager() : DataSource(500) { actualSource = NULL; } @@ -414,29 +458,25 @@ public: bool open(TemplateList & input) { - currentIdx = 0; + current_template_idx = 0; templates = input; - return open(templates[currentIdx]); + return DataSource::open(templates[current_template_idx]); } - bool open(Template & input, int start_index=0) + bool concreteOpen(Template & input) { close(); - final_frame = -1; - last_issued = -2; - last_received = -3; - next_frame = start_index; // Input has no matrices? Its probably a video that hasn't been loaded yet if (input.empty()) { actualSource = new VideoDataSource(0); - actualSource->open(input, next_frame); + actualSource->concreteOpen(input); } else { // create frame dealer actualSource = new TemplateDataSource(0); - actualSource->open(input, next_frame); + actualSource->concreteOpen(input); } if (!isOpen()) { delete actualSource; @@ -449,30 +489,47 @@ public: bool isOpen() { return !actualSource ? false : actualSource->isOpen(); } protected: - int currentIdx; - int next_frame; + // Index of the template in the templatelist we are currently reading from + int current_template_idx; + TemplateList templates; DataSource * actualSource; bool getNext(FrameData & output) { bool res = actualSource->getNext(output); + output.sequenceNumber = next_sequence_number; + if (res) { - next_frame = output.sequenceNumber+1; + output.data.last().file.set("FrameNumber", output.sequenceNumber); + next_sequence_number++; + if (output.data.last().last().empty()) + qDebug("broken matrix"); return true; } + while(!res) { - currentIdx++; + output.data.clear(); + current_template_idx++; - if (currentIdx >= templates.size()) + // No more templates? We're done + if (current_template_idx >= templates.size()) return false; - bool open_res = open(templates[currentIdx], next_frame); + + // open the next data source + bool open_res = concreteOpen(templates[current_template_idx]); if (!open_res) return false; + + // get a frame from it res = actualSource->getNext(output); } + output.sequenceNumber = next_sequence_number++; + output.data.last().file.set("FrameNumber", output.sequenceNumber); + + if (output.data.last().last().empty()) + qDebug("broken matrix"); - next_frame = output.sequenceNumber+1; return res; } @@ -480,9 +537,14 @@ protected: class ProcessingStage; -class BasicLoop : public QRunnable +class BasicLoop : public QRunnable, public QFutureInterface { public: + BasicLoop() + { + this->reportStarted(); + } + void run(); QList * stages; @@ -508,13 +570,13 @@ public: int stage_id; virtual void reset()=0; - protected: int thread_count; SharedBuffer * inputBuffer; ProcessingStage * nextStage; QList * stages; + QThreadPool * threads; Transform * transform; }; @@ -533,6 +595,7 @@ void BasicLoop::run() current_idx++; current_idx = current_idx % stages->size(); } + this->reportFinished(); } class MultiThreadStage : public ProcessingStage @@ -567,7 +630,6 @@ public: } }; - class SingleThreadStage : public ProcessingStage { public: @@ -630,18 +692,20 @@ public: lock.unlock(); if (newItem) - { - BasicLoop * next = new BasicLoop(); - next->stages = stages; - next->start_idx = this->stage_id; - next->startItem = newItem; - - QThreadPool::globalInstance()->start(next, stages->size() - stage_id); - } + startThread(newItem); return input; } + void startThread(br::FrameData * newItem) + { + BasicLoop * next = new BasicLoop(); + next->stages = stages; + next->start_idx = this->stage_id; + next->startItem = newItem; + this->threads->start(next, stages->size() - stage_id); + } + // Calledfrom a different thread than run. bool tryAcquireNextStage(FrameData *& input) @@ -677,7 +741,7 @@ public: }; // No input buffer, instead we draw templates from some data source -// Will be operated by the main thread for the stream +// Will be operated by the main thread for the stream. starts threads class FirstStage : public SingleThreadStage { public: @@ -687,44 +751,51 @@ public: FrameData * run(FrameData * input, bool & should_continue) { - // Is there anything on our input buffer? If so we should start a thread with that. + // Try to get a frame from the datasource QWriteLocker lock(&statusLock); - input = dataSource.tryGetFrame(); - // Datasource broke? - if (!input) + bool last_frame = false; + input = dataSource.tryGetFrame(last_frame); + + // Datasource broke, or is currently out of frames? + if (!input || last_frame) { + // We will just stop and not continue. currentStatus = STOPPING; - should_continue = false; - return NULL; + if (!input) { + should_continue = false; + return NULL; + } } lock.unlock(); - + // Can we enter the next stage? should_continue = nextStage->tryAcquireNextStage(input); - BasicLoop * next = new BasicLoop(); - next->stages = stages; - next->start_idx = this->stage_id; - next->startItem = NULL; - - QThreadPool::globalInstance()->start(next, stages->size() - stage_id); + // We are exiting leaving this stage, should we start another + // thread here? Normally we will always re-queue a thread on + // the first stage, but if we received the last frame there is + // no need to. + if (!last_frame) { + startThread(NULL); + } return input; } - // Calledfrom a different thread than run. + // The last stage, trying to access the first stage bool tryAcquireNextStage(FrameData *& input) { + // Return the frame, was it the last one? bool was_last = dataSource.returnFrame(input); input = NULL; + + // OK we won't continue. if (was_last) { return false; } - if (!dataSource.isOpen()) - return false; - QReadLocker lock(&statusLock); - // Thread is already running, we should just return + // A thread is already in the first stage, + // we should just return if (currentStatus == STARTING) { return false; @@ -747,6 +818,7 @@ public: }; +// starts threads class LastStage : public SingleThreadStage { public: @@ -777,11 +849,14 @@ public: } next_target = input->sequenceNumber + 1; + // add the item to our output buffer collectedOutput.append(input->data); + // Can we enter the read stage? should_continue = nextStage->tryAcquireNextStage(input); - // Is there anything on our input buffer? If so we should start a thread with that. + // Is there anything on our input buffer? If so we should start a thread + // in this stage to process that frame. QWriteLocker lock(&statusLock); FrameData * newItem = inputBuffer->tryGetItem(); if (!newItem) @@ -791,23 +866,18 @@ public: lock.unlock(); if (newItem) - { - BasicLoop * next = new BasicLoop(); - next->stages = stages; - next->start_idx = this->stage_id; - next->startItem = newItem; - - QThreadPool::globalInstance()->start(next, stages->size() - stage_id); - } + startThread(newItem); return input; } }; + class StreamTransform : public CompositeTransform { Q_OBJECT public: + void train(const TemplateList & data) { foreach(Transform * transform, transforms) { @@ -837,23 +907,17 @@ public: bool res = readStage->dataSource.open(dst); if (!res) return; + // Start the first thread in the stream. readStage->currentStatus = SingleThreadStage::STARTING; + readStage->startThread(NULL); - BasicLoop loop; - loop.stages = &this->processingStages; - loop.start_idx = 0; - loop.startItem = NULL; - loop.setAutoDelete(false); - - // Create a thread, then allow it to start by releasing. We queue the thread - // first so that any lower priority threads that are already queued don't start - // instead of the new one. - QThreadPool::globalInstance()->start(&loop, processingStages.size() - processingStages[0]->stage_id); - QThreadPool::globalInstance()->releaseThread(); - - // Wait for the end. + // Wait for the stream to reach the last frame available from + // the data source. readStage->dataSource.waitLast(); + // Now that there are no more incoming frames, call finalize + // on each transform in turn to collect any last templates + // they wish to issue. TemplateList final_output; // Push finalize through the stages @@ -869,7 +933,8 @@ public: final_output.append(output_set); } - // dst is set to all output received by the final stage + // dst is set to all output received by the final stage, along + // with anything output via the calls to finalize. dst = collectionStage->getOutput(); dst.append(final_output); @@ -881,7 +946,8 @@ public: virtual void finalize(TemplateList & output) { (void) output; - // Not handling this yet -cao + // Nothing in particular to do here, stream calls finalize + // on all child transforms as part of projectUpdate } // Create and link stages @@ -889,6 +955,19 @@ public: { if (transforms.isEmpty()) return; + // We share a thread pool across streams attached to the same + // parent tranform, retrieve or create a thread pool based + // on our parent transform. + QMutexLocker poolLock(&poolsAccess); + QHash::Iterator it; + if (!pools.contains(this->parent())) { + it = pools.insert(this->parent(), new QThreadPool(this)); + it.value()->setMaxThreadCount(Globals->parallelism); + } + else it = pools.find(this->parent()); + threads = it.value(); + poolLock.unlock(); + stage_variance.reserve(transforms.size()); foreach (const br::Transform *transform, transforms) { stage_variance.append(transform->timeVarying()); @@ -899,6 +978,7 @@ public: processingStages.push_back(readStage); readStage->stage_id = 0; readStage->stages = &this->processingStages; + readStage->threads = this->threads; int next_stage_id = 1; @@ -906,9 +986,7 @@ public: for (int i =0; i < transforms.size(); i++) { if (stage_variance[i]) - { processingStages.append(new SingleThreadStage(prev_stage_variance)); - } else processingStages.append(new MultiThreadStage(Globals->parallelism)); @@ -919,6 +997,7 @@ public: processingStages[i]->nextStage = processingStages[i+1]; processingStages.last()->stages = &this->processingStages; + processingStages.last()->threads = this->threads; processingStages.last()->transform = transforms[i]; prev_stage_variance = stage_variance[i]; @@ -928,6 +1007,7 @@ public: processingStages.append(collectionStage); collectionStage->stage_id = next_stage_id; collectionStage->stages = &this->processingStages; + collectionStage->threads = this->threads; processingStages[processingStages.size() - 2]->nextStage = collectionStage; @@ -950,6 +1030,10 @@ protected: QList processingStages; + static QHash pools; + static QMutex poolsAccess; + QThreadPool * threads; + void _project(const Template &src, Template &dst) const { (void) src; (void) dst; @@ -962,6 +1046,9 @@ protected: } }; +QHash StreamTransform::pools; +QMutex StreamTransform::poolsAccess; + BR_REGISTER(Transform, StreamTransform) -- libgit2 0.21.4