diff --git a/openbr/plugins/stream.cpp b/openbr/plugins/stream.cpp index 14e3ab5..964e5d1 100644 --- a/openbr/plugins/stream.cpp +++ b/openbr/plugins/stream.cpp @@ -16,6 +16,7 @@ using namespace cv; namespace br { + class FrameData { public: @@ -505,132 +506,6 @@ private: bool data_ok; }; -// Given a templatelist as input, create appropriate data source for each -// individual template -class DataSourceManager : public DataSource -{ -public: - DataSourceManager(int activeFrames=100) : DataSource(activeFrames) - { - actualSource = NULL; - } - - ~DataSourceManager() - { - close(); - } - - int size() - { - return this->allFrames.size(); - } - - void close() - { - if (actualSource) { - actualSource->close(); - delete actualSource; - actualSource = NULL; - } - } - - // We are used through a call to open(TemplateList) - bool open(TemplateList & input) - { - // Set up variables specific to us - current_template_idx = 0; - templates = input; - - // Call datasourece::open on the first template to set up - // state variables - return DataSource::open(templates[current_template_idx]); - } - - // Create an actual data source of appropriate type for this template - // (initially called via the call to DataSource::open, called later - // as we run out of frames on our templates). - bool concreteOpen(Template & input) - { - close(); - - bool open_res = false; - // Input has no matrices? Its probably a video that hasn't been loaded yet - if (input.empty()) { - actualSource = new VideoDataSource(0); - open_res = actualSource->concreteOpen(input); - } - // If the input is not empty, we assume it is a set of frames already - // in memory. - else { - actualSource = new TemplateDataSource(0); - open_res = actualSource->concreteOpen(input); - } - - // The data source failed to open - if (!open_res) { - delete actualSource; - actualSource = NULL; - return false; - } - return true; - } - - bool isOpen() { return !actualSource ? false : actualSource->isOpen(); } - -protected: - // Index of the template in the templatelist we are currently reading from - int current_template_idx; - - TemplateList templates; - DataSource * actualSource; - // Get the next frame, if we run out of frames on the current template - // move on to the next one. - bool getNext(FrameData & output) - { - bool res = actualSource->getNext(output); - output.sequenceNumber = next_sequence_number; - - // OK we got a frame - if (res) { - // Override the sequence number set by actualSource - output.data.last().file.set("FrameNumber", output.sequenceNumber); - next_sequence_number++; - if (output.data.last().last().empty()) - qDebug("broken matrix"); - return true; - } - - // We didn't get a frame, try to move on to the next template. - while(!res) { - output.data.clear(); - current_template_idx++; - - // No more templates? We're done - if (current_template_idx >= templates.size()) - return false; - - // open the next data source - bool open_res = concreteOpen(templates[current_template_idx]); - // We couldn't open it, give up? We could maybe continue here - // but don't currently. - if (!open_res) - return false; - - // get a frame from the newly opened data source, if that fails - // we continue to open the next one. - res = actualSource->getNext(output); - } - // Finally, set the sequence number for the frame we actually return. - output.sequenceNumber = next_sequence_number++; - output.data.last().file.set("FrameNumber", output.sequenceNumber); - - if (output.data.last().last().empty()) - qDebug("broken matrix"); - - return res; - } - -}; class ProcessingStage; @@ -861,100 +736,6 @@ public: }; -// This stage reads new frames from the data source. -class FirstStage : public SingleThreadStage -{ -public: - FirstStage(int activeFrames = 100) : SingleThreadStage(true), dataSource(activeFrames){ } - - DataSourceManager dataSource; - - void reset() - { - dataSource.close(); - SingleThreadStage::reset(); - } - - FrameData * run(FrameData * input, bool & should_continue) - { - if (input == NULL) - qFatal("NULL frame in input stage"); - - // Can we enter the next stage? - should_continue = nextStage->tryAcquireNextStage(input); - - // Try to get a frame from the datasource, we keep working on - // the frame we have, but we will queue another job for the next - // frame if a frame is currently available. - QWriteLocker lock(&statusLock); - bool last_frame = false; - FrameData * newFrame = dataSource.tryGetFrame(last_frame); - - // Were we able to get a frame? - if (newFrame) startThread(newFrame); - // If not this stage will enter a stopped state. - else { - currentStatus = STOPPING; - } - - lock.unlock(); - - return input; - } - - // The last stage, trying to access the first stage - bool tryAcquireNextStage(FrameData *& input) - { - // Return the frame, was it the last one? - bool was_last = dataSource.returnFrame(input); - input = NULL; - - // OK we won't continue. - if (was_last) { - return false; - } - - QReadLocker lock(&statusLock); - // If the first stage is already active we will just end. - if (currentStatus == STARTING) - { - return false; - } - - // Otherwise we will try to continue, but to do so we have to - // escalate the lock, and sadly there is no way to do so without - // releasing the read-mode lock, and getting a new write-mode lock. - lock.unlock(); - - QWriteLocker writeLock(&statusLock); - // currentStatus might have changed in the gap between releasing the read - // lock and getting the write lock. - if (currentStatus == STARTING) - { - return false; - } - - bool last_frame = false; - // Try to get a frame from the data source, if we get one we will - // continue to the first stage. - input = dataSource.tryGetFrame(last_frame); - - if (!input) { - return false; - } - - currentStatus = STARTING; - - return true; - } - - void status(){ - qDebug("Read stage %d, status starting? %d, next frame %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->dataSource.size()); - } - - -}; - // Appened to the end of a Stream's transform sequence. Collects the output // from each frame on a single templatelist class LastStage : public SingleThreadStage @@ -1038,12 +819,23 @@ public: }; +class FirstStage; + class DirectStreamTransform : public CompositeTransform { Q_OBJECT public: + + enum StreamModes { StreamVideo, + DistributeFrames, + Auto}; + + Q_ENUMS(StreamModes) + Q_PROPERTY(int activeFrames READ get_activeFrames WRITE set_activeFrames RESET reset_activeFrames) + Q_PROPERTY(StreamModes readMode READ get_readMode WRITE set_readMode RESET reset_readMode) BR_PROPERTY(int, activeFrames, 100) + BR_PROPERTY(StreamModes, readMode, Auto) friend class StreamTransfrom; @@ -1109,63 +901,6 @@ public: qFatal("whatever"); } - // start processing, consider all templates in src a continuous - // 'video' - void projectUpdate(const TemplateList & src, TemplateList & dst) - { - dst = src; - - bool res = readStage->dataSource.open(dst); - if (!res) { - qDebug("stream failed to open %s", qPrintable(dst[0].file.name)); - return; - } - - // Start the first thread in the stream. - QWriteLocker lock(&readStage->statusLock); - readStage->currentStatus = SingleThreadStage::STARTING; - - // We have to get a frame before starting the thread - bool last_frame = false; - FrameData * firstFrame = readStage->dataSource.tryGetFrame(last_frame); - if (firstFrame == NULL) - qFatal("Failed to read first frame of video"); - - readStage->startThread(firstFrame); - lock.unlock(); - - // Wait for the stream to process the last frame available from - // the data source. - bool wait_res = false; - wait_res = readStage->dataSource.waitLast(); - - // Now that there are no more incoming frames, call finalize - // on each transform in turn to collect any last templates - // they wish to issue. - TemplateList final_output; - - // Push finalize through the stages - for (int i=0; i < this->transforms.size(); i++) - { - TemplateList output_set; - transforms[i]->finalize(output_set); - - for (int j=i+1; j < transforms.size();j++) - { - transforms[j]->projectUpdate(output_set); - } - final_output.append(output_set); - } - - // dst is set to all output received by the final stage, along - // with anything output via the calls to finalize. - dst = collectionStage->getOutput(); - dst.append(final_output); - - foreach(ProcessingStage * stage, processingStages) { - stage->reset(); - } - } virtual void finalize(TemplateList & output) { @@ -1174,90 +909,8 @@ public: // on all child transforms as part of projectUpdate } - // Create and link stages - void init() - { - if (transforms.isEmpty()) return; - - for (int i=0; i < processingStages.size();i++) - delete processingStages[i]; - processingStages.clear(); - - // call CompositeTransform::init so that trainable is set - // correctly. - CompositeTransform::init(); - - // We share a thread pool across streams attached to the same - // parent tranform, retrieve or create a thread pool based - // on our parent transform. - QMutexLocker poolLock(&poolsAccess); - QHash::Iterator it; - if (!pools.contains(this->parent())) { - it = pools.insert(this->parent(), new QThreadPool(this)); - it.value()->setMaxThreadCount(Globals->parallelism); - } - else it = pools.find(this->parent()); - threads = it.value(); - poolLock.unlock(); - - // Are our children time varying or not? This decides whether - // we run them in single threaded or multi threaded stages - stage_variance.reserve(transforms.size()); - foreach (const br::Transform *transform, transforms) { - stage_variance.append(transform->timeVarying()); - } - - // Additionally, we have a separate stage responsible for reading - // frames from the data source - readStage = new FirstStage(activeFrames); - - processingStages.push_back(readStage); - readStage->stage_id = 0; - readStage->stages = &this->processingStages; - readStage->threads = this->threads; - - // Initialize and link a processing stage for each of our child - // transforms. - int next_stage_id = 1; - bool prev_stage_variance = true; - for (int i =0; i < transforms.size(); i++) - { - if (stage_variance[i]) - // Whether or not the previous stage is multi-threaded controls - // the type of input buffer we need in a single threaded stage. - processingStages.append(new SingleThreadStage(prev_stage_variance)); - else - processingStages.append(new MultiThreadStage(Globals->parallelism)); - - processingStages.last()->stage_id = next_stage_id++; - - // link nextStage pointers, the stage we just appeneded is i+1 since - // the read stage was added before this loop - processingStages[i]->nextStage = processingStages[i+1]; - - processingStages.last()->stages = &this->processingStages; - processingStages.last()->threads = this->threads; - - processingStages.last()->transform = transforms[i]; - prev_stage_variance = stage_variance[i]; - } - - // We also have the last stage, which just puts the output of the - // previous stages on a template list. - collectionStage = new LastStage(prev_stage_variance); - processingStages.append(collectionStage); - collectionStage->stage_id = next_stage_id; - collectionStage->stages = &this->processingStages; - collectionStage->threads = this->threads; - - // the last transform stage points to collection stage - processingStages[processingStages.size() - 2]->nextStage = collectionStage; - - // And the collection stage points to the read stage, because this is - // a ring buffer. - collectionStage->nextStage = readStage; - } - + void projectUpdate(const TemplateList & src, TemplateList & dst); + void init(); ~DirectStreamTransform() { // Delete all the stages @@ -1448,6 +1101,373 @@ private: BR_REGISTER(Transform, StreamTransform) +// Given a templatelist as input, create appropriate data source for each +// individual template +class DataSourceManager : public DataSource +{ +public: + DataSourceManager(int activeFrames=100) : DataSource(activeFrames) + { + actualSource = NULL; + } + + ~DataSourceManager() + { + close(); + } + + int size() + { + return this->allFrames.size(); + } + + void close() + { + if (actualSource) { + actualSource->close(); + delete actualSource; + actualSource = NULL; + } + } + + // We are used through a call to open(TemplateList) + bool open(TemplateList & input, DirectStreamTransform::StreamModes _mode) + { + // Set up variables specific to us + current_template_idx = 0; + templates = input; + mode = _mode; + + // Call datasourece::open on the first template to set up + // state variables + return DataSource::open(templates[current_template_idx]); + } + void projectUpdate(const TemplateList & src, TemplateList & dst); + + // Create an actual data source of appropriate type for this template + // (initially called via the call to DataSource::open, called later + // as we run out of frames on our templates). + bool concreteOpen(Template & input) + { + close(); + + bool open_res = false; + + // Input has no matrices? Its probably a video that hasn't been loaded yet + if (mode == DirectStreamTransform::StreamVideo || mode ~= DirectStreamTransform::DistributeFrames && input.empty()) { + actualSource = new VideoDataSource(0); + open_res = actualSource->concreteOpen(input); + } + // If the input is not empty, we assume it is a set of frames already + // in memory. + else { + actualSource = new TemplateDataSource(0); + open_res = actualSource->concreteOpen(input); + } + + // The data source failed to open + if (!open_res) { + delete actualSource; + actualSource = NULL; + return false; + } + return true; + } + + bool isOpen() { return !actualSource ? false : actualSource->isOpen(); } + +protected: + // Index of the template in the templatelist we are currently reading from + int current_template_idx; + DirectStreamTransform::StreamModes mode; + TemplateList templates; + DataSource * actualSource; + // Get the next frame, if we run out of frames on the current template + // move on to the next one. + bool getNext(FrameData & output) + { + bool res = actualSource->getNext(output); + output.sequenceNumber = next_sequence_number; + + // OK we got a frame + if (res) { + // Override the sequence number set by actualSource + output.data.last().file.set("FrameNumber", output.sequenceNumber); + next_sequence_number++; + if (output.data.last().last().empty()) + qDebug("broken matrix"); + return true; + } + + // We didn't get a frame, try to move on to the next template. + while(!res) { + output.data.clear(); + current_template_idx++; + + // No more templates? We're done + if (current_template_idx >= templates.size()) + return false; + + // open the next data source + bool open_res = concreteOpen(templates[current_template_idx]); + // We couldn't open it, give up? We could maybe continue here + // but don't currently. + if (!open_res) + return false; + + // get a frame from the newly opened data source, if that fails + // we continue to open the next one. + res = actualSource->getNext(output); + } + // Finally, set the sequence number for the frame we actually return. + output.sequenceNumber = next_sequence_number++; + output.data.last().file.set("FrameNumber", output.sequenceNumber); + + if (output.data.last().last().empty()) + qDebug("broken matrix"); + + return res; + } + +}; + +// This stage reads new frames from the data source. +class FirstStage : public SingleThreadStage +{ +public: + FirstStage(int activeFrames = 100) : SingleThreadStage(true), dataSource(activeFrames){ } + + DataSourceManager dataSource; + + void reset() + { + dataSource.close(); + SingleThreadStage::reset(); + } + + FrameData * run(FrameData * input, bool & should_continue) + { + if (input == NULL) + qFatal("NULL frame in input stage"); + + // Can we enter the next stage? + should_continue = nextStage->tryAcquireNextStage(input); + + // Try to get a frame from the datasource, we keep working on + // the frame we have, but we will queue another job for the next + // frame if a frame is currently available. + QWriteLocker lock(&statusLock); + bool last_frame = false; + FrameData * newFrame = dataSource.tryGetFrame(last_frame); + + // Were we able to get a frame? + if (newFrame) startThread(newFrame); + // If not this stage will enter a stopped state. + else { + currentStatus = STOPPING; + } + + lock.unlock(); + + return input; + } + + // The last stage, trying to access the first stage + bool tryAcquireNextStage(FrameData *& input) + { + // Return the frame, was it the last one? + bool was_last = dataSource.returnFrame(input); + input = NULL; + + // OK we won't continue. + if (was_last) { + return false; + } + + QReadLocker lock(&statusLock); + // If the first stage is already active we will just end. + if (currentStatus == STARTING) + { + return false; + } + + // Otherwise we will try to continue, but to do so we have to + // escalate the lock, and sadly there is no way to do so without + // releasing the read-mode lock, and getting a new write-mode lock. + lock.unlock(); + + QWriteLocker writeLock(&statusLock); + // currentStatus might have changed in the gap between releasing the read + // lock and getting the write lock. + if (currentStatus == STARTING) + { + return false; + } + + bool last_frame = false; + // Try to get a frame from the data source, if we get one we will + // continue to the first stage. + input = dataSource.tryGetFrame(last_frame); + + if (!input) { + return false; + } + + currentStatus = STARTING; + + return true; + } + + void status(){ + qDebug("Read stage %d, status starting? %d, next frame %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->dataSource.size()); + } + + +}; + + +// start processing, consider all templates in src a continuous +// 'video' +void DirectStreamTransform::projectUpdate(const TemplateList & src, TemplateList & dst) +{ + dst = src; + + bool res = readStage->dataSource.open(dst,readMode); + if (!res) { + qDebug("stream failed to open %s", qPrintable(dst[0].file.name)); + return; + } + + // Start the first thread in the stream. + QWriteLocker lock(&readStage->statusLock); + readStage->currentStatus = SingleThreadStage::STARTING; + + // We have to get a frame before starting the thread + bool last_frame = false; + FrameData * firstFrame = readStage->dataSource.tryGetFrame(last_frame); + if (firstFrame == NULL) + qFatal("Failed to read first frame of video"); + + readStage->startThread(firstFrame); + lock.unlock(); + + // Wait for the stream to process the last frame available from + // the data source. + bool wait_res = false; + wait_res = readStage->dataSource.waitLast(); + + // Now that there are no more incoming frames, call finalize + // on each transform in turn to collect any last templates + // they wish to issue. + TemplateList final_output; + + // Push finalize through the stages + for (int i=0; i < this->transforms.size(); i++) + { + TemplateList output_set; + transforms[i]->finalize(output_set); + + for (int j=i+1; j < transforms.size();j++) + { + transforms[j]->projectUpdate(output_set); + } + final_output.append(output_set); + } + + // dst is set to all output received by the final stage, along + // with anything output via the calls to finalize. + dst = collectionStage->getOutput(); + dst.append(final_output); + + foreach(ProcessingStage * stage, processingStages) { + stage->reset(); + } +} + + +// Create and link stages +void DirectStreamTransform::init() +{ + if (transforms.isEmpty()) return; + + for (int i=0; i < processingStages.size();i++) + delete processingStages[i]; + processingStages.clear(); + + // call CompositeTransform::init so that trainable is set + // correctly. + CompositeTransform::init(); + + // We share a thread pool across streams attached to the same + // parent tranform, retrieve or create a thread pool based + // on our parent transform. + QMutexLocker poolLock(&poolsAccess); + QHash::Iterator it; + if (!pools.contains(this->parent())) { + it = pools.insert(this->parent(), new QThreadPool(this->parent())); + it.value()->setMaxThreadCount(Globals->parallelism); + } + else it = pools.find(this->parent()); + threads = it.value(); + poolLock.unlock(); + + // Are our children time varying or not? This decides whether + // we run them in single threaded or multi threaded stages + stage_variance.reserve(transforms.size()); + foreach (const br::Transform *transform, transforms) { + stage_variance.append(transform->timeVarying()); + } + + // Additionally, we have a separate stage responsible for reading + // frames from the data source + readStage = new FirstStage(activeFrames); + + processingStages.push_back(readStage); + readStage->stage_id = 0; + readStage->stages = &this->processingStages; + readStage->threads = this->threads; + + // Initialize and link a processing stage for each of our child + // transforms. + int next_stage_id = 1; + bool prev_stage_variance = true; + for (int i =0; i < transforms.size(); i++) + { + if (stage_variance[i]) + // Whether or not the previous stage is multi-threaded controls + // the type of input buffer we need in a single threaded stage. + processingStages.append(new SingleThreadStage(prev_stage_variance)); + else + processingStages.append(new MultiThreadStage(Globals->parallelism)); + + processingStages.last()->stage_id = next_stage_id++; + + // link nextStage pointers, the stage we just appeneded is i+1 since + // the read stage was added before this loop + processingStages[i]->nextStage = processingStages[i+1]; + + processingStages.last()->stages = &this->processingStages; + processingStages.last()->threads = this->threads; + + processingStages.last()->transform = transforms[i]; + prev_stage_variance = stage_variance[i]; + } + + // We also have the last stage, which just puts the output of the + // previous stages on a template list. + collectionStage = new LastStage(prev_stage_variance); + processingStages.append(collectionStage); + collectionStage->stage_id = next_stage_id; + collectionStage->stages = &this->processingStages; + collectionStage->threads = this->threads; + + // the last transform stage points to collection stage + processingStages[processingStages.size() - 2]->nextStage = collectionStage; + + // And the collection stage points to the read stage, because this is + // a ring buffer. + collectionStage->nextStage = readStage; +} } // namespace br