diff --git a/openbr/core/resource.h b/openbr/core/resource.h index ae18d36..a620969 100644 --- a/openbr/core/resource.h +++ b/openbr/core/resource.h @@ -24,6 +24,7 @@ #include #include #include +#include template class ResourceMaker @@ -52,7 +53,7 @@ public: : resourceMaker(rm) , availableResources(new QList()) , lock(new QMutex()) - , totalResources(new QSemaphore(QThread::idealThreadCount())) + , totalResources(new QSemaphore(br::Globals->parallelism)) {} ~Resource() diff --git a/openbr/plugins/meta.cpp b/openbr/plugins/meta.cpp index f00676c..7f827dc 100644 --- a/openbr/plugins/meta.cpp +++ b/openbr/plugins/meta.cpp @@ -645,17 +645,28 @@ public: QList input_buffer; input_buffer.reserve(src.size()); + QFutureSynchronizer futures; + for (int i =0; i < src.size();i++) { input_buffer.append(TemplateList()); output_buffer.append(TemplateList()); } - - QFutureSynchronizer futures; + QList > temp; + temp.reserve(src.size()); for (int i=0; iparallelism) futures.addFuture(QtConcurrent::run(_projectList, transform, &input_buffer[i], &output_buffer[i])); - else _projectList( transform, &input_buffer[i], &output_buffer[i]); + + if (Globals->parallelism > 1) temp.append(QtConcurrent::run(_projectList, transform, &input_buffer[i], &output_buffer[i])); + else _projectList(transform, &input_buffer[i], &output_buffer[i]); + } + // We add the futures in reverse order, since in Qt 5.1 at least the + // waiting thread will wait on them in the order added (which for uniform priority + // threads is the order of execution), and we want the waiting thread to go in the opposite order + // so that it can steal runnables and do something besides wait. + for (int i = temp.size() - 1; i > 0; i--) { + futures.addFuture(temp[i]); } + futures.waitForFinished(); for (int i=0; idata.clear(); aFrame->sequenceNumber = -1; + // Try to read a frame, if this returns false the data source is broken bool res = getNext(*aFrame); - // The datasource broke. - if (!res) { + // The datasource broke, update final_frame + if (!res) + { + QMutexLocker lock(&last_frame_update); + final_frame = lookAhead.back()->sequenceNumber; allFrames.addItem(aFrame); + } + else lookAhead.push_back(aFrame); - QMutexLocker lock(&last_frame_update); - // Did we already receive the last frame? - final_frame = last_issued; + FrameData * rVal = lookAhead.first(); + lookAhead.pop_front(); - // We got the last frame before the data source broke, - // better pulse lastReturned - if (final_frame == last_received) { - lastReturned.wakeAll(); - } - else if (final_frame < last_received) - std::cout << "Bad last frame " << final_frame << " but received " << last_received << std::endl; - return NULL; + if (rVal->sequenceNumber == final_frame) { + last_frame = true; + is_broken = true; } - last_issued = aFrame->sequenceNumber; - return aFrame; + + return rVal; } - // Returns true if the frame returned was the last + // Return a frame to the pool, returns true if the frame returned was the last // frame issued, false otherwise bool returnFrame(FrameData * inputFrame) { + int frameNumber = inputFrame->sequenceNumber; + allFrames.addItem(inputFrame); bool rval = false; QMutexLocker lock(&last_frame_update); - last_received = inputFrame->sequenceNumber; - if (inputFrame->sequenceNumber == final_frame) { + if (frameNumber == final_frame) { // We just received the last frame, better pulse lastReturned.wakeAll(); rval = true; @@ -240,17 +252,57 @@ public: lastReturned.wait(&last_frame_update); } - virtual void close() = 0; - virtual bool open(Template & output, int start_index=0) = 0; - virtual bool isOpen() = 0; + bool open(Template & output, int start_index = 0) + { + is_broken = false; + // The last frame isn't initialized yet + final_frame = -1; + // Start our sequence numbers from the input index + next_sequence_number = start_index; + + // Actually open the data source + bool open_res = concreteOpen(output); + + // We couldn't open the data source + if (!open_res) { + is_broken = true; + return false; + } + + // Try to get a frame from the global pool + FrameData * firstFrame = allFrames.tryGetItem(); + + // If this fails, things have gone pretty badly. + if (firstFrame == NULL) { + is_broken = true; + return false; + } + + // Read a frame from the video source + bool res = getNext(*firstFrame); + + // the data source broke already, we couldn't even get one frame + // from it. + if (!res) { + is_broken = true; + return false; + } + lookAhead.append(firstFrame); + return true; + } + + virtual bool isOpen()=0; + virtual bool concreteOpen(Template & output) = 0; virtual bool getNext(FrameData & input) = 0; + virtual void close() = 0; + int next_sequence_number; protected: DoubleBuffer allFrames; int final_frame; - int last_issued; - int last_received; + bool is_broken; + QList lookAhead; QWaitCondition lastReturned; QMutex last_frame_update; @@ -262,13 +314,8 @@ class VideoDataSource : public DataSource public: VideoDataSource(int maxFrames) : DataSource(maxFrames) {} - bool open(Template &input, int start_index=0) + bool concreteOpen(Template &input) { - final_frame = -1; - last_issued = -2; - last_received = -3; - - next_idx = start_index; basis = input; bool is_int = false; int anInt = input.file.name.toInt(&is_int); @@ -306,25 +353,31 @@ private: return false; output.data.append(Template(basis.file)); - output.data.last().append(cv::Mat()); + output.data.last().m() = cv::Mat(); - output.sequenceNumber = next_idx; - next_idx++; + output.sequenceNumber = next_sequence_number; + next_sequence_number++; - bool res = video.read(output.data.last().last()); - output.data.last().last() = output.data.last().last().clone(); + cv::Mat temp; + bool res = video.read(temp); if (!res) { + output.data.last().m() = cv::Mat(); close(); return false; } + + // This clone is critical, if we don't do it then the matrix will + // be an alias of an internal buffer of the video source, leading + // to various problems later. + output.data.last().m() = temp.clone(); + output.data.last().file.set("FrameNumber", output.sequenceNumber); return true; } cv::VideoCapture video; Template basis; - int next_idx; }; // Given a template as input, return its matrices one by one on subsequent calls @@ -334,21 +387,16 @@ class TemplateDataSource : public DataSource public: TemplateDataSource(int maxFrames) : DataSource(maxFrames) { - current_idx = INT_MAX; + current_matrix_idx = INT_MAX; data_ok = false; } - bool data_ok; - bool open(Template &input, int start_index=0) + bool concreteOpen(Template &input) { basis = input; - current_idx = 0; - next_sequence = start_index; - final_frame = -1; - last_issued = -2; - last_received = -3; + current_matrix_idx = 0; - data_ok = current_idx < basis.size(); + data_ok = current_matrix_idx < basis.size(); return data_ok; } @@ -358,39 +406,41 @@ public: void close() { - current_idx = INT_MAX; + current_matrix_idx = INT_MAX; basis.clear(); } private: bool getNext(FrameData & output) { - data_ok = current_idx < basis.size(); + data_ok = current_matrix_idx < basis.size(); if (!data_ok) return false; - output.data.append(basis[current_idx]); - current_idx++; + output.data.append(basis[current_matrix_idx]); + current_matrix_idx++; - output.sequenceNumber = next_sequence; - next_sequence++; + output.sequenceNumber = next_sequence_number; + next_sequence_number++; output.data.last().file.set("FrameNumber", output.sequenceNumber); return true; } Template basis; - int current_idx; - int next_sequence; + // Index of the next matrix to output from the template + int current_matrix_idx; + + // is current_matrix_idx in bounds? + bool data_ok; }; -// Given a template as input, create a VideoDataSource or a TemplateDataSource -// depending on whether or not it looks like the input template has already -// loaded frames into memory. +// Given a templatelist as input, create appropriate data source for each +// individual template class DataSourceManager : public DataSource { public: - DataSourceManager() + DataSourceManager() : DataSource(500) { actualSource = NULL; } @@ -411,29 +461,25 @@ public: bool open(TemplateList & input) { - currentIdx = 0; + current_template_idx = 0; templates = input; - return open(templates[currentIdx]); + return DataSource::open(templates[current_template_idx]); } - bool open(Template & input, int start_index=0) + bool concreteOpen(Template & input) { close(); - final_frame = -1; - last_issued = -2; - last_received = -3; - next_frame = start_index; // Input has no matrices? Its probably a video that hasn't been loaded yet if (input.empty()) { actualSource = new VideoDataSource(0); - actualSource->open(input, next_frame); + actualSource->concreteOpen(input); } else { // create frame dealer actualSource = new TemplateDataSource(0); - actualSource->open(input, next_frame); + actualSource->concreteOpen(input); } if (!isOpen()) { delete actualSource; @@ -446,30 +492,47 @@ public: bool isOpen() { return !actualSource ? false : actualSource->isOpen(); } protected: - int currentIdx; - int next_frame; + // Index of the template in the templatelist we are currently reading from + int current_template_idx; + TemplateList templates; DataSource * actualSource; bool getNext(FrameData & output) { bool res = actualSource->getNext(output); + output.sequenceNumber = next_sequence_number; + if (res) { - next_frame = output.sequenceNumber+1; + output.data.last().file.set("FrameNumber", output.sequenceNumber); + next_sequence_number++; + if (output.data.last().last().empty()) + qDebug("broken matrix"); return true; } + while(!res) { - currentIdx++; + output.data.clear(); + current_template_idx++; - if (currentIdx >= templates.size()) + // No more templates? We're done + if (current_template_idx >= templates.size()) return false; - bool open_res = open(templates[currentIdx], next_frame); + + // open the next data source + bool open_res = concreteOpen(templates[current_template_idx]); if (!open_res) return false; + + // get a frame from it res = actualSource->getNext(output); } + output.sequenceNumber = next_sequence_number++; + output.data.last().file.set("FrameNumber", output.sequenceNumber); + + if (output.data.last().last().empty()) + qDebug("broken matrix"); - next_frame = output.sequenceNumber+1; return res; } @@ -477,9 +540,14 @@ protected: class ProcessingStage; -class BasicLoop : public QRunnable +class BasicLoop : public QRunnable, public QFutureInterface { public: + BasicLoop() + { + this->reportStarted(); + } + void run(); QList * stages; @@ -505,13 +573,13 @@ public: int stage_id; virtual void reset()=0; - protected: int thread_count; SharedBuffer * inputBuffer; ProcessingStage * nextStage; QList * stages; + QThreadPool * threads; Transform * transform; }; @@ -530,6 +598,7 @@ void BasicLoop::run() current_idx++; current_idx = current_idx % stages->size(); } + this->reportFinished(); } class MultiThreadStage : public ProcessingStage @@ -564,7 +633,6 @@ public: } }; - class SingleThreadStage : public ProcessingStage { public: @@ -627,18 +695,20 @@ public: lock.unlock(); if (newItem) - { - BasicLoop * next = new BasicLoop(); - next->stages = stages; - next->start_idx = this->stage_id; - next->startItem = newItem; - - QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id); - } + startThread(newItem); return input; } + void startThread(br::FrameData * newItem) + { + BasicLoop * next = new BasicLoop(); + next->stages = stages; + next->start_idx = this->stage_id; + next->startItem = newItem; + this->threads->start(next, stages->size() - stage_id); + } + // Calledfrom a different thread than run. bool tryAcquireNextStage(FrameData *& input) @@ -674,7 +744,7 @@ public: }; // No input buffer, instead we draw templates from some data source -// Will be operated by the main thread for the stream +// Will be operated by the main thread for the stream. starts threads class FirstStage : public SingleThreadStage { public: @@ -684,44 +754,51 @@ public: FrameData * run(FrameData * input, bool & should_continue) { - // Is there anything on our input buffer? If so we should start a thread with that. + // Try to get a frame from the datasource QWriteLocker lock(&statusLock); - input = dataSource.tryGetFrame(); - // Datasource broke? - if (!input) + bool last_frame = false; + input = dataSource.tryGetFrame(last_frame); + + // Datasource broke, or is currently out of frames? + if (!input || last_frame) { + // We will just stop and not continue. currentStatus = STOPPING; - should_continue = false; - return NULL; + if (!input) { + should_continue = false; + return NULL; + } } lock.unlock(); - + // Can we enter the next stage? should_continue = nextStage->tryAcquireNextStage(input); - BasicLoop * next = new BasicLoop(); - next->stages = stages; - next->start_idx = this->stage_id; - next->startItem = NULL; - - QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id); + // We are exiting leaving this stage, should we start another + // thread here? Normally we will always re-queue a thread on + // the first stage, but if we received the last frame there is + // no need to. + if (!last_frame) { + startThread(NULL); + } return input; } - // Calledfrom a different thread than run. + // The last stage, trying to access the first stage bool tryAcquireNextStage(FrameData *& input) { + // Return the frame, was it the last one? bool was_last = dataSource.returnFrame(input); input = NULL; + + // OK we won't continue. if (was_last) { return false; } - if (!dataSource.isOpen()) - return false; - QReadLocker lock(&statusLock); - // Thread is already running, we should just return + // A thread is already in the first stage, + // we should just return if (currentStatus == STARTING) { return false; @@ -744,6 +821,7 @@ public: }; +// starts threads class LastStage : public SingleThreadStage { public: @@ -774,11 +852,14 @@ public: } next_target = input->sequenceNumber + 1; + // add the item to our output buffer collectedOutput.append(input->data); + // Can we enter the read stage? should_continue = nextStage->tryAcquireNextStage(input); - // Is there anything on our input buffer? If so we should start a thread with that. + // Is there anything on our input buffer? If so we should start a thread + // in this stage to process that frame. QWriteLocker lock(&statusLock); FrameData * newItem = inputBuffer->tryGetItem(); if (!newItem) @@ -788,23 +869,18 @@ public: lock.unlock(); if (newItem) - { - BasicLoop * next = new BasicLoop(); - next->stages = stages; - next->start_idx = this->stage_id; - next->startItem = newItem; - - QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id); - } + startThread(newItem); return input; } }; + class StreamTransform : public CompositeTransform { Q_OBJECT public: + void train(const TemplateList & data) { foreach(Transform * transform, transforms) { @@ -834,21 +910,17 @@ public: bool res = readStage->dataSource.open(dst); if (!res) return; - QThreadPool::globalInstance()->releaseThread(); + // Start the first thread in the stream. readStage->currentStatus = SingleThreadStage::STARTING; + readStage->startThread(NULL); - BasicLoop loop; - loop.stages = &this->processingStages; - loop.start_idx = 0; - loop.startItem = NULL; - loop.setAutoDelete(false); - - QThreadPool::globalInstance()->start(&loop, processingStages.size() - processingStages[0]->stage_id); - - // Wait for the end. + // Wait for the stream to reach the last frame available from + // the data source. readStage->dataSource.waitLast(); - QThreadPool::globalInstance()->reserveThread(); + // Now that there are no more incoming frames, call finalize + // on each transform in turn to collect any last templates + // they wish to issue. TemplateList final_output; // Push finalize through the stages @@ -864,7 +936,8 @@ public: final_output.append(output_set); } - // dst is set to all output received by the final stage + // dst is set to all output received by the final stage, along + // with anything output via the calls to finalize. dst = collectionStage->getOutput(); dst.append(final_output); @@ -876,7 +949,8 @@ public: virtual void finalize(TemplateList & output) { (void) output; - // Not handling this yet -cao + // Nothing in particular to do here, stream calls finalize + // on all child transforms as part of projectUpdate } // Create and link stages @@ -884,6 +958,19 @@ public: { if (transforms.isEmpty()) return; + // We share a thread pool across streams attached to the same + // parent tranform, retrieve or create a thread pool based + // on our parent transform. + QMutexLocker poolLock(&poolsAccess); + QHash::Iterator it; + if (!pools.contains(this->parent())) { + it = pools.insert(this->parent(), new QThreadPool(this)); + it.value()->setMaxThreadCount(Globals->parallelism); + } + else it = pools.find(this->parent()); + threads = it.value(); + poolLock.unlock(); + stage_variance.reserve(transforms.size()); foreach (const br::Transform *transform, transforms) { stage_variance.append(transform->timeVarying()); @@ -894,6 +981,7 @@ public: processingStages.push_back(readStage); readStage->stage_id = 0; readStage->stages = &this->processingStages; + readStage->threads = this->threads; int next_stage_id = 1; @@ -901,9 +989,7 @@ public: for (int i =0; i < transforms.size(); i++) { if (stage_variance[i]) - { processingStages.append(new SingleThreadStage(prev_stage_variance)); - } else processingStages.append(new MultiThreadStage(Globals->parallelism)); @@ -914,6 +1000,7 @@ public: processingStages[i]->nextStage = processingStages[i+1]; processingStages.last()->stages = &this->processingStages; + processingStages.last()->threads = this->threads; processingStages.last()->transform = transforms[i]; prev_stage_variance = stage_variance[i]; @@ -923,6 +1010,7 @@ public: processingStages.append(collectionStage); collectionStage->stage_id = next_stage_id; collectionStage->stages = &this->processingStages; + collectionStage->threads = this->threads; processingStages[processingStages.size() - 2]->nextStage = collectionStage; @@ -945,6 +1033,10 @@ protected: QList processingStages; + static QHash pools; + static QMutex poolsAccess; + QThreadPool * threads; + void _project(const Template &src, Template &dst) const { (void) src; (void) dst; @@ -957,6 +1049,9 @@ protected: } }; +QHash StreamTransform::pools; +QMutex StreamTransform::poolsAccess; + BR_REGISTER(Transform, StreamTransform)