From b2eb42a0b959c5abbbc1af7400adfd0bd67983e7 Mon Sep 17 00:00:00 2001 From: Charles Otto Date: Fri, 8 Nov 2013 15:50:28 -0500 Subject: [PATCH] Some long overdue cleanup/reorganization in stream.cpp --- openbr/plugins/stream.cpp | 1228 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 1 file changed, 536 insertions(+), 692 deletions(-) diff --git a/openbr/plugins/stream.cpp b/openbr/plugins/stream.cpp index 982d061..2420705 100644 --- a/openbr/plugins/stream.cpp +++ b/openbr/plugins/stream.cpp @@ -16,6 +16,16 @@ using namespace cv; namespace br { +class Idiocy : public QObject +{ + Q_OBJECT +public: + enum StreamModes { StreamVideo, + DistributeFrames, + Auto}; + + Q_ENUMS(StreamModes) +}; class FrameData { @@ -181,10 +191,138 @@ private: QList buffer2; }; +// Given a template as input, return N templates as output, one at a time on subsequent +// calls to getNext +class TemplateProcessor +{ +public: + virtual ~TemplateProcessor() {} + virtual bool open(Template & input)=0; + virtual bool isOpen()=0; + virtual void close()=0; + virtual bool getNextTemplate(Template & output)=0; +protected: + Template basis; +}; + +static QMutex openLock; + +// Read a video frame by frame using cv::VideoCapture +class VideoReader : public TemplateProcessor +{ +public: + VideoReader() {} + + bool open(Template &input) + { + basis = input; + + // We can open either files (well actually this includes addresses of ip cameras + // through ffmpeg), or webcams. Webcam VideoCaptures are created through a separate + // overload of open that takes an integer, not a string. + // So, does this look like an integer? + bool is_int = false; + int anInt = input.file.name.toInt(&is_int); + if (is_int) + { + bool rc = video.open(anInt); + + if (!rc) + { + qDebug("open failed!"); + } + if (!video.isOpened()) + { + qDebug("Video not open!"); + } + } else { + // Yes, we should specify absolute path: + // http://stackoverflow.com/questions/9396459/loading-a-video-in-opencv-in-python + QString fileName = (Globals->path.isEmpty() ? "" : Globals->path + "/") + input.file.name; + // On windows, this appears to not be thread-safe + QMutexLocker lock(&openLock); + video.open(QFileInfo(fileName).absoluteFilePath().toStdString()); + } + + return video.isOpened(); + } + + bool isOpen() { return video.isOpened(); } + + void close() { video.release(); } + + bool getNextTemplate(Template & output) + { + if (!isOpen()) { + qDebug("video source is not open"); + return false; + } + output.file = basis.file; + output.m() = cv::Mat(); + + cv::Mat temp; + bool res = video.read(temp); + + if (!res) { + // The video capture broke, return false. + output.m() = cv::Mat(); + close(); + return false; + } + + // This clone is critical, if we don't do it then the matrix will + // be an alias of an internal buffer of the video source, leading + // to various problems later. + output.m() = temp.clone(); + return true; + } +protected: + cv::VideoCapture video; +}; + + +class DirectReturn : public TemplateProcessor +{ +public: + DirectReturn() + { + data_ok = false; + } + + // We don't do anything, just prepare to return input when getNext is called. + bool open(Template &input) + { + basis = input; + data_ok =true; + return data_ok; + } + + bool isOpen() { return data_ok; } + + void close() + { + data_ok = false; + basis.clear(); + } + + bool getNextTemplate(Template & output) + { + if (!data_ok) + return false; + output = basis; + data_ok = false; + return true; + } + +protected: + // Have we sent our template yet? + bool data_ok; +}; + // Interface for sequentially getting data from some data source. -// Initialized off of a template, can represent a video file (stored in the template's filename) -// or a set of images already loaded into memory stored as multiple matrices in an input template. +// Given a TemplateList, return single template frames sequentially by applying a TemplateProcessor +// to each individual template. class DataSource { public: @@ -196,6 +334,7 @@ public: { allFrames.addItem(new FrameData()); } + frameSource = NULL; } virtual ~DataSource() @@ -209,6 +348,71 @@ public: } } + void close() + { + if (this->frameSource) + { + frameSource->close(); + delete frameSource; + frameSource = NULL; + } + } + + int size() + { + return this->templates.size(); + } + + bool open(TemplateList & input, br::Idiocy::StreamModes _mode) + { + // Set up variables specific to us + current_template_idx = 0; + templates = input; + mode = _mode; + + is_broken = false; + allReturned = false; + + // The last frame isn't initialized yet + final_frame = -1; + // Start our sequence numbers from the input index + next_sequence_number = 0; + + // Actually open the data source + bool open_res = openNextTemplate(); + + // We couldn't open the data source + if (!open_res) { + is_broken = true; + return false; + } + + // Try to get a frame from the global pool + FrameData * firstFrame = allFrames.tryGetItem(); + + // If this fails, things have gone pretty badly. + if (firstFrame == NULL) { + is_broken = true; + return false; + } + + // Read a frame from the video source + bool res = getNextFrame(*firstFrame); + + // the data source broke already, we couldn't even get one frame + // from it even though it claimed to have opened successfully. + if (!res) { + is_broken = true; + return false; + } + + // We read one frame ahead of the last one returned, this allows + // us to know which frame is the final frame when we return it. + lookAhead.append(firstFrame); + return true; + } + + // non-blocking version of getFrame // Returns a NULL FrameData if too many frames are out, or the // data source is broken. Sets last_frame to true iff the FrameData @@ -228,7 +432,7 @@ public: return NULL; // Try to actually read a frame, if this returns false the data source is broken - bool res = getNext(*aFrame); + bool res = getNextFrame(*aFrame); // The datasource broke, update final_frame if (!res) @@ -291,282 +495,119 @@ public: return true; } - bool open(Template & output, int start_index = 0) +protected: + + bool openNextTemplate() { - is_broken = false; - allReturned = false; + if (this->current_template_idx >= this->templates.size()) + return false; - // The last frame isn't initialized yet - final_frame = -1; - // Start our sequence numbers from the input index - next_sequence_number = start_index; + bool open_res = false; + while (!open_res) + { + if (frameSource) + frameSource->close(); - // Actually open the data source - bool open_res = concreteOpen(output); + if (mode == br::Idiocy::Auto) + { + delete frameSource; + if (this->templates[this->current_template_idx].empty()) + frameSource = new VideoReader(); + else + frameSource = new DirectReturn(); + } + else if (mode == br::Idiocy::DistributeFrames) + { + if (!frameSource) + frameSource = new DirectReturn(); + } + else if (mode == br::Idiocy::StreamVideo) + { + if (!frameSource) + frameSource = new VideoReader(); + } - // We couldn't open the data source - if (!open_res) { - is_broken = true; - return false; + open_res = frameSource->open(this->templates[current_template_idx]); + if (!open_res) + { + current_template_idx++; + if (current_template_idx >= this->templates.size()) + return false; + } } + return true; + } - // Try to get a frame from the global pool - FrameData * firstFrame = allFrames.tryGetItem(); + bool getNextFrame(FrameData & output) + { + bool got_frame = false; - // If this fails, things have gone pretty badly. - if (firstFrame == NULL) { - is_broken = true; - return false; - } + Template aTemplate; - // Read a frame from the video source - bool res = getNext(*firstFrame); + while (!got_frame) + { + got_frame = frameSource->getNextTemplate(aTemplate); + + // OK we got a frame + if (got_frame) { + // set the sequence number and tempalte of this frame + output.sequenceNumber = next_sequence_number; + output.data.append(aTemplate); + // set the frame number in the template's metadata + output.data.last().file.set("FrameNumber", output.sequenceNumber); + next_sequence_number++; + return true; + } - // the data source broke already, we couldn't even get one frame - // from it even though it claimed to have opened successfully. - if (!res) { - is_broken = true; - return false; + // advance to the next tempalte in our list + this->current_template_idx++; + bool open_res = this->openNextTemplate(); + + // couldn't get the next template? nothing to do, otherwise we try to read + // a frame at the top of this loop. + if (!open_res) { + return false; + } } - // We read one frame ahead of the last one returned, this allows - // us to know which frame is the final frame when we return it. - lookAhead.append(firstFrame); - return true; + return false; } - /* - * Pure virtual methods - */ + // Index of the template in the templatelist we are currently reading from + int current_template_idx; - // isOpen doesn't appear to particularly work when used on opencv - // VideoCaptures, so we don't use it for anything important. - virtual bool isOpen()=0; - // Called from open, open the data source specified by the input - // template, don't worry about setting any of the state variables - // set in open. - virtual bool concreteOpen(Template & output) = 0; - // Get the next frame from the data source, store the results in - // FrameData (including the actual frame and appropriate sequence - // number). - virtual bool getNext(FrameData & input) = 0; - // close the currently open data source. - virtual void close() = 0; + // What do we do to each template + br::Idiocy::StreamModes mode; + + // list of templates we are workign from + TemplateList templates; + + // processor for the current template + TemplateProcessor * frameSource; int next_sequence_number; -protected: - DoubleBuffer allFrames; int final_frame; bool is_broken; bool allReturned; + + DoubleBuffer allFrames; QList lookAhead; QWaitCondition lastReturned; QMutex last_frame_update; }; -static QMutex openLock; -// Read a video frame by frame using cv::VideoCapture -class VideoDataSource : public DataSource +class ProcessingStage; + +class BasicLoop : public QRunnable, public QFutureInterface { public: - VideoDataSource(int maxFrames) : DataSource(maxFrames) {} - - bool concreteOpen(Template &input) + BasicLoop() { - basis = input; + this->reportStarted(); + } - // We can open either files (well actually this includes addresses of ip cameras - // through ffmpeg), or webcams. Webcam VideoCaptures are created through a separate - // overload of open that takes an integer, not a string. - // So, does this look like an integer? - bool is_int = false; - int anInt = input.file.name.toInt(&is_int); - if (is_int) - { - bool rc = video.open(anInt); - - if (!rc) - { - qDebug("open failed!"); - } - if (!video.isOpened()) - { - qDebug("Video not open!"); - } - } else { - // Yes, we should specify absolute path: - // http://stackoverflow.com/questions/9396459/loading-a-video-in-opencv-in-python - QString fileName = (Globals->path.isEmpty() ? "" : Globals->path + "/") + input.file.name; - // On windows, this appears to not be thread-safe - QMutexLocker lock(&openLock); - video.open(QFileInfo(fileName).absoluteFilePath().toStdString()); - } - - return video.isOpened(); - } - - bool isOpen() { return video.isOpened(); } - - void close() { - video.release(); - } - -private: - bool getNext(FrameData & output) - { - if (!isOpen()) { - qDebug("video source is not open"); - return false; - } - - output.data.append(Template(basis.file)); - output.data.last().m() = cv::Mat(); - - output.sequenceNumber = next_sequence_number; - next_sequence_number++; - - cv::Mat temp; - bool res = video.read(temp); - - if (!res) { - // The video capture broke, return false. - output.data.last().m() = cv::Mat(); - close(); - return false; - } - - // This clone is critical, if we don't do it then the matrix will - // be an alias of an internal buffer of the video source, leading - // to various problems later. - output.data.last().m() = temp.clone(); - - output.data.last().file.set("FrameNumber", output.sequenceNumber); - return true; - } - - cv::VideoCapture video; - Template basis; -}; - -// Given a template as input, return its matrices one by one on subsequent calls -// to getNext -class TemplateDataSource : public DataSource -{ -public: - TemplateDataSource(int maxFrames) : DataSource(maxFrames) - { - current_matrix_idx = INT_MAX; - data_ok = false; - } - - // To "open" it we just set appropriate indices, we assume that if this - // is an image, it is already loaded into memory. - bool concreteOpen(Template &input) - { - basis = input; - current_matrix_idx = 0; - - data_ok = current_matrix_idx < basis.size(); - qDebug("concrete open res is %d %d %d", data_ok, current_matrix_idx, basis.size()); - return data_ok; - } - - bool isOpen() { - return data_ok; - } - - void close() - { - current_matrix_idx = INT_MAX; - basis.clear(); - } - -private: - bool getNext(FrameData & output) - { - data_ok = current_matrix_idx < basis.size(); - if (!data_ok) - return false; - - - output.data.append(basis[current_matrix_idx]); - output.data.last().file = basis.file; - current_matrix_idx++; - - output.sequenceNumber = next_sequence_number; - next_sequence_number++; - - output.data.last().file.set("FrameNumber", output.sequenceNumber); - return true; - } - - Template basis; - // Index of the next matrix to output from the template - int current_matrix_idx; - - // is current_matrix_idx in bounds? - bool data_ok; -}; - -class SingleDataSource : public DataSource -{ -public: - SingleDataSource(int maxFrames) : DataSource(maxFrames) - { - data_ok = false; - } - - // To "open" it we just set appropriate indices, we assume that if this - // is an image, it is already loaded into memory. - bool concreteOpen(Template &input) - { - basis = input; -// basis.file.name = (Globals->path.isEmpty() ? "" : Globals->path + "/") + basis.file.name; - - data_ok =true; - return data_ok; - } - - bool isOpen() { - return data_ok; - } - - void close() - { - data_ok = false; - basis.clear(); - } - -private: - bool getNext(FrameData & output) - { - if (!data_ok) - return false; - - output.data.append(basis); - data_ok = false; - return true; - } - - Template basis; - - // Have we sent our template yet? - bool data_ok; -}; - - -class ProcessingStage; - -class BasicLoop : public QRunnable, public QFutureInterface -{ -public: - BasicLoop() - { - this->reportStarted(); - } - - void run(); + void run(); QList * stages; int start_idx; @@ -785,106 +826,129 @@ public: }; -// Appened to the end of a Stream's transform sequence. Collects the output -// from each frame on a single templatelist -class LastStage : public SingleThreadStage +// Semi-functional, doesn't do anything productive outside of stream::train +class CollectSets : public TimeVaryingTransform { + Q_OBJECT public: - LastStage(bool _prev_stage_variance) : SingleThreadStage(_prev_stage_variance) {} - TemplateList getOutput() + CollectSets() : TimeVaryingTransform(false, false) {} + + QList sets; + + void projectUpdate(const TemplateList &src, TemplateList &dst) { - return collectedOutput; + (void) dst; + sets.append(src); } -private: - TemplateList collectedOutput; + void train(const TemplateList & data) + { + (void) data; + } + +}; + +// This stage reads new frames from the data source. +class ReadStage : public SingleThreadStage +{ public: + ReadStage(int activeFrames = 100) : SingleThreadStage(true), dataSource(activeFrames){ } + + DataSource dataSource; void reset() { - collectedOutput.clear(); + dataSource.close(); SingleThreadStage::reset(); } FrameData * run(FrameData * input, bool & should_continue) { - if (input == NULL) { - qFatal("NULL input to stage %d", this->stage_id); - } - - if (input->sequenceNumber != next_target) - { - qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, input->sequenceNumber, this->next_target); - } - next_target = input->sequenceNumber + 1; - - // add the item to our output buffer - collectedOutput.append(input->data); + if (input == NULL) + qFatal("NULL frame in input stage"); - // Can we enter the read stage? + // Can we enter the next stage? should_continue = nextStage->tryAcquireNextStage(input); - // Is there anything on our input buffer? If so we should start a thread - // in this stage to process that frame. + // Try to get a frame from the datasource, we keep working on + // the frame we have, but we will queue another job for the next + // frame if a frame is currently available. QWriteLocker lock(&statusLock); - FrameData * newItem = inputBuffer->tryGetItem(); - if (!newItem) - { - this->currentStatus = STOPPING; + bool last_frame = false; + FrameData * newFrame = dataSource.tryGetFrame(last_frame); + + // Were we able to get a frame? + if (newFrame) startThread(newFrame); + // If not this stage will enter a stopped state. + else { + currentStatus = STOPPING; } - lock.unlock(); - if (newItem) - startThread(newItem); + lock.unlock(); return input; } - void status(){ - qDebug("Collection stage %d, status starting? %d, next %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->inputBuffer->size()); - } + // The last stage, trying to access the first stage + bool tryAcquireNextStage(FrameData *& input) + { + // Return the frame, was it the last one? + bool was_last = dataSource.returnFrame(input); + input = NULL; -}; + // OK we won't continue. + if (was_last) { + return false; + } -// Semi-functional, doesn't do anything productive outside of stream::train -class CollectSets : public TimeVaryingTransform -{ - Q_OBJECT -public: - CollectSets() : TimeVaryingTransform(false, false) {} + QReadLocker lock(&statusLock); + // If the first stage is already active we will just end. + if (currentStatus == STARTING) + { + return false; + } - QList sets; + // Otherwise we will try to continue, but to do so we have to + // escalate the lock, and sadly there is no way to do so without + // releasing the read-mode lock, and getting a new write-mode lock. + lock.unlock(); - void projectUpdate(const TemplateList &src, TemplateList &dst) - { - (void) dst; - sets.append(src); - } + QWriteLocker writeLock(&statusLock); + // currentStatus might have changed in the gap between releasing the read + // lock and getting the write lock. + if (currentStatus == STARTING) + { + return false; + } - void train(const TemplateList & data) - { - (void) data; + bool last_frame = false; + // Try to get a frame from the data source, if we get one we will + // continue to the first stage. + input = dataSource.tryGetFrame(last_frame); + + if (!input) { + return false; + } + + currentStatus = STARTING; + + return true; } + void status(){ + qDebug("Read stage %d, status starting? %d, next frame %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->dataSource.size()); + } }; -class FirstStage; - class DirectStreamTransform : public CompositeTransform { Q_OBJECT public: - enum StreamModes { StreamVideo, - DistributeFrames, - Auto}; - - Q_ENUMS(StreamModes) - Q_PROPERTY(int activeFrames READ get_activeFrames WRITE set_activeFrames RESET reset_activeFrames) - Q_PROPERTY(StreamModes readMode READ get_readMode WRITE set_readMode RESET reset_readMode) + Q_PROPERTY(br::Idiocy::StreamModes readMode READ get_readMode WRITE set_readMode RESET reset_readMode) BR_PROPERTY(int, activeFrames, 100) - BR_PROPERTY(StreamModes, readMode, Auto) + BR_PROPERTY(br::Idiocy::StreamModes, readMode, br::Idiocy::Auto) friend class StreamTransfrom; @@ -958,8 +1022,163 @@ public: // on all child transforms as part of projectUpdate } - void projectUpdate(const TemplateList & src, TemplateList & dst); - void init(); + // start processing, consider all templates in src a continuous + // 'video' + void projectUpdate(const TemplateList & src, TemplateList & dst) + { + dst = src; + + bool res = readStage->dataSource.open(dst,readMode); + if (!res) { + qDebug("stream failed to open %s", qPrintable(dst[0].file.name)); + return; + } + + // Start the first thread in the stream. + QWriteLocker lock(&readStage->statusLock); + readStage->currentStatus = SingleThreadStage::STARTING; + + // We have to get a frame before starting the thread + bool last_frame = false; + FrameData * firstFrame = readStage->dataSource.tryGetFrame(last_frame); + if (firstFrame == NULL) + qFatal("Failed to read first frame of video"); + + readStage->startThread(firstFrame); + lock.unlock(); + + // Wait for the stream to process the last frame available from + // the data source. + bool wait_res = false; + wait_res = readStage->dataSource.waitLast(); + + // Now that there are no more incoming frames, call finalize + // on each transform in turn to collect any last templates + // they wish to issue. + TemplateList final_output; + + // Push finalize through the stages + for (int i=0; i < this->transforms.size(); i++) + { + TemplateList output_set; + transforms[i]->finalize(output_set); + + for (int j=i+1; j < transforms.size();j++) + { + transforms[j]->projectUpdate(output_set); + } + final_output.append(output_set); + } + + // dst is set to all output received by the final stage, along + // with anything output via the calls to finalize. + //dst = collectionStage->getOutput(); + foreach(const TemplateList & list, collector->sets) { + dst.append(list); + } + collector->sets.clear(); + + dst.append(final_output); + + foreach(ProcessingStage * stage, processingStages) { + stage->reset(); + } + } + + + // Create and link stages + void init() + { + if (transforms.isEmpty()) return; + + for (int i=0; i < processingStages.size();i++) + delete processingStages[i]; + processingStages.clear(); + + // call CompositeTransform::init so that trainable is set + // correctly. + CompositeTransform::init(); + + // We share a thread pool across streams attached to the same + // parent tranform, retrieve or create a thread pool based + // on our parent transform. + QMutexLocker poolLock(&poolsAccess); + QHash::Iterator it; + if (!pools.contains(this->parent())) { + it = pools.insert(this->parent(), new QThreadPool(this->parent())); + it.value()->setMaxThreadCount(Globals->parallelism); + } + else it = pools.find(this->parent()); + threads = it.value(); + poolLock.unlock(); + + // Are our children time varying or not? This decides whether + // we run them in single threaded or multi threaded stages + stage_variance.clear(); + stage_variance.reserve(transforms.size()); + foreach (const br::Transform *transform, transforms) { + stage_variance.append(transform->timeVarying()); + } + + // Additionally, we have a separate stage responsible for reading + // frames from the data source + readStage = new ReadStage(activeFrames); + + processingStages.push_back(readStage); + readStage->stage_id = 0; + readStage->stages = &this->processingStages; + readStage->threads = this->threads; + + // Initialize and link a processing stage for each of our child + // transforms. + int next_stage_id = 1; + bool prev_stage_variance = true; + for (int i =0; i < transforms.size(); i++) + { + if (stage_variance[i]) + // Whether or not the previous stage is multi-threaded controls + // the type of input buffer we need in a single threaded stage. + processingStages.append(new SingleThreadStage(prev_stage_variance)); + else + processingStages.append(new MultiThreadStage(Globals->parallelism)); + + processingStages.last()->stage_id = next_stage_id++; + + // link nextStage pointers, the stage we just appeneded is i+1 since + // the read stage was added before this loop + processingStages[i]->nextStage = processingStages[i+1]; + + processingStages.last()->stages = &this->processingStages; + processingStages.last()->threads = this->threads; + + processingStages.last()->transform = transforms[i]; + prev_stage_variance = stage_variance[i]; + } + + // We also have the last stage, which just puts the output of the + // previous stages on a template list. + collectionStage = new SingleThreadStage(prev_stage_variance); + collectionStage->transform = this->collector.data(); + + + processingStages.append(collectionStage); + collectionStage->stage_id = next_stage_id; + collectionStage->stages = &this->processingStages; + collectionStage->threads = this->threads; + + // the last transform stage points to collection stage + processingStages[processingStages.size() - 2]->nextStage = collectionStage; + + // And the collection stage points to the read stage, because this is + // a ring buffer. + collectionStage->nextStage = readStage; + } + + DirectStreamTransform() + { + this->collector = QSharedPointer(new CollectSets()); + } + ~DirectStreamTransform() { // Delete all the stages @@ -972,8 +1191,9 @@ public: protected: QList stage_variance; - FirstStage * readStage; - LastStage * collectionStage; + ReadStage * readStage; + SingleThreadStage * collectionStage; + QSharedPointer collector; QList processingStages; @@ -1023,10 +1243,10 @@ public: } Q_PROPERTY(int activeFrames READ get_activeFrames WRITE set_activeFrames RESET reset_activeFrames) - Q_PROPERTY(br::DirectStreamTransform::StreamModes readMode READ get_readMode WRITE set_readMode RESET reset_readMode) + Q_PROPERTY(br::Idiocy::StreamModes readMode READ get_readMode WRITE set_readMode RESET reset_readMode) BR_PROPERTY(int, activeFrames, 100) - BR_PROPERTY(br::DirectStreamTransform::StreamModes, readMode, br::DirectStreamTransform::Auto) + BR_PROPERTY(br::Idiocy::StreamModes, readMode, br::Idiocy::Auto) bool timeVarying() const { return true; } @@ -1150,382 +1370,6 @@ private: BR_REGISTER(Transform, StreamTransform) -// Given a templatelist as input, create appropriate data source for each -// individual template -class DataSourceManager : public DataSource -{ -public: - DataSourceManager(int activeFrames=100) : DataSource(activeFrames) - { - actualSource = NULL; - } - - ~DataSourceManager() - { - close(); - } - - int size() - { - return this->allFrames.size(); - } - - void close() - { - if (actualSource) { - actualSource->close(); - delete actualSource; - actualSource = NULL; - } - } - - // We are used through a call to open(TemplateList) - bool open(TemplateList & input, DirectStreamTransform::StreamModes _mode) - { - // Set up variables specific to us - current_template_idx = 0; - templates = input; - mode = _mode; - - // Call datasourece::open on the first template to set up - // state variables - return DataSource::open(templates[current_template_idx]); - } - void projectUpdate(const TemplateList & src, TemplateList & dst); - - // Create an actual data source of appropriate type for this template - // (initially called via the call to DataSource::open, called later - // as we run out of frames on our templates). - bool concreteOpen(Template & input) - { - close(); - - bool open_res = false; - - if (mode == DirectStreamTransform::DistributeFrames) - { - actualSource = new SingleDataSource(0); - open_res = actualSource->concreteOpen(input); - } - // Input has no matrices? Its probably a video that hasn't been loaded yet - else if (mode == DirectStreamTransform::StreamVideo || (mode == DirectStreamTransform::Auto && input.empty()) ) { - actualSource = new VideoDataSource(0); - open_res = actualSource->concreteOpen(input); - } - // If the input is not empty, we assume it is a set of frames already - // in memory. - else { - qDebug("in template open"); - actualSource = new TemplateDataSource(0); - open_res = actualSource->concreteOpen(input); - } - - // The data source failed to open - if (!open_res) { - delete actualSource; - actualSource = NULL; - return false; - } - return true; - } - - bool isOpen() { return !actualSource ? false : actualSource->isOpen(); } - -protected: - // Index of the template in the templatelist we are currently reading from - int current_template_idx; - DirectStreamTransform::StreamModes mode; - TemplateList templates; - DataSource * actualSource; - // Get the next frame, if we run out of frames on the current template - // move on to the next one. - bool getNext(FrameData & output) - { - bool res = actualSource->getNext(output); - output.sequenceNumber = next_sequence_number; - - // OK we got a frame - if (res) { - // Override the sequence number set by actualSource - output.data.last().file.set("FrameNumber", output.sequenceNumber); - next_sequence_number++; -// if (output.data.last().last().empty()) -// qDebug("broken matrix"); - return true; - } - - // We didn't get a frame, try to move on to the next template. - while(!res) { - output.data.clear(); - current_template_idx++; - - // No more templates? We're done - if (current_template_idx >= templates.size()) - return false; - - // open the next data source - bool open_res = concreteOpen(templates[current_template_idx]); - // We couldn't open it, give up? We could maybe continue here - // but don't currently. - if (!open_res) - return false; - - // get a frame from the newly opened data source, if that fails - // we continue to open the next one. - res = actualSource->getNext(output); - } - // Finally, set the sequence number for the frame we actually return. - output.sequenceNumber = next_sequence_number++; - output.data.last().file.set("FrameNumber", output.sequenceNumber); - -// if (output.data.last().last().empty()) -// qDebug("broken matrix"); - - return res; - } - -}; - -// This stage reads new frames from the data source. -class FirstStage : public SingleThreadStage -{ -public: - FirstStage(int activeFrames = 100) : SingleThreadStage(true), dataSource(activeFrames){ } - - DataSourceManager dataSource; - - void reset() - { - dataSource.close(); - SingleThreadStage::reset(); - } - - FrameData * run(FrameData * input, bool & should_continue) - { - if (input == NULL) - qFatal("NULL frame in input stage"); - - // Can we enter the next stage? - should_continue = nextStage->tryAcquireNextStage(input); - - // Try to get a frame from the datasource, we keep working on - // the frame we have, but we will queue another job for the next - // frame if a frame is currently available. - QWriteLocker lock(&statusLock); - bool last_frame = false; - FrameData * newFrame = dataSource.tryGetFrame(last_frame); - - // Were we able to get a frame? - if (newFrame) startThread(newFrame); - // If not this stage will enter a stopped state. - else { - currentStatus = STOPPING; - } - - lock.unlock(); - - return input; - } - - // The last stage, trying to access the first stage - bool tryAcquireNextStage(FrameData *& input) - { - // Return the frame, was it the last one? - bool was_last = dataSource.returnFrame(input); - input = NULL; - - // OK we won't continue. - if (was_last) { - return false; - } - - QReadLocker lock(&statusLock); - // If the first stage is already active we will just end. - if (currentStatus == STARTING) - { - return false; - } - - // Otherwise we will try to continue, but to do so we have to - // escalate the lock, and sadly there is no way to do so without - // releasing the read-mode lock, and getting a new write-mode lock. - lock.unlock(); - - QWriteLocker writeLock(&statusLock); - // currentStatus might have changed in the gap between releasing the read - // lock and getting the write lock. - if (currentStatus == STARTING) - { - return false; - } - - bool last_frame = false; - // Try to get a frame from the data source, if we get one we will - // continue to the first stage. - input = dataSource.tryGetFrame(last_frame); - - if (!input) { - return false; - } - - currentStatus = STARTING; - - return true; - } - - void status(){ - qDebug("Read stage %d, status starting? %d, next frame %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->dataSource.size()); - } - - -}; - - -// start processing, consider all templates in src a continuous -// 'video' -void DirectStreamTransform::projectUpdate(const TemplateList & src, TemplateList & dst) -{ - dst = src; - - bool res = readStage->dataSource.open(dst,readMode); - if (!res) { - qDebug("stream failed to open %s", qPrintable(dst[0].file.name)); - return; - } - - // Start the first thread in the stream. - QWriteLocker lock(&readStage->statusLock); - readStage->currentStatus = SingleThreadStage::STARTING; - - // We have to get a frame before starting the thread - bool last_frame = false; - FrameData * firstFrame = readStage->dataSource.tryGetFrame(last_frame); - if (firstFrame == NULL) - qFatal("Failed to read first frame of video"); - - readStage->startThread(firstFrame); - lock.unlock(); - - // Wait for the stream to process the last frame available from - // the data source. - bool wait_res = false; - wait_res = readStage->dataSource.waitLast(); - - // Now that there are no more incoming frames, call finalize - // on each transform in turn to collect any last templates - // they wish to issue. - TemplateList final_output; - - // Push finalize through the stages - for (int i=0; i < this->transforms.size(); i++) - { - TemplateList output_set; - transforms[i]->finalize(output_set); - - for (int j=i+1; j < transforms.size();j++) - { - transforms[j]->projectUpdate(output_set); - } - final_output.append(output_set); - } - - // dst is set to all output received by the final stage, along - // with anything output via the calls to finalize. - dst = collectionStage->getOutput(); - dst.append(final_output); - - foreach(ProcessingStage * stage, processingStages) { - stage->reset(); - } -} - - -// Create and link stages -void DirectStreamTransform::init() -{ - if (transforms.isEmpty()) return; - - for (int i=0; i < processingStages.size();i++) - delete processingStages[i]; - processingStages.clear(); - - // call CompositeTransform::init so that trainable is set - // correctly. - CompositeTransform::init(); - - // We share a thread pool across streams attached to the same - // parent tranform, retrieve or create a thread pool based - // on our parent transform. - QMutexLocker poolLock(&poolsAccess); - QHash::Iterator it; - if (!pools.contains(this->parent())) { - it = pools.insert(this->parent(), new QThreadPool(this->parent())); - it.value()->setMaxThreadCount(Globals->parallelism); - } - else it = pools.find(this->parent()); - threads = it.value(); - poolLock.unlock(); - - // Are our children time varying or not? This decides whether - // we run them in single threaded or multi threaded stages - stage_variance.clear(); - stage_variance.reserve(transforms.size()); - foreach (const br::Transform *transform, transforms) { - stage_variance.append(transform->timeVarying()); - } - - // Additionally, we have a separate stage responsible for reading - // frames from the data source - readStage = new FirstStage(activeFrames); - - processingStages.push_back(readStage); - readStage->stage_id = 0; - readStage->stages = &this->processingStages; - readStage->threads = this->threads; - - // Initialize and link a processing stage for each of our child - // transforms. - int next_stage_id = 1; - bool prev_stage_variance = true; - for (int i =0; i < transforms.size(); i++) - { - if (stage_variance[i]) - // Whether or not the previous stage is multi-threaded controls - // the type of input buffer we need in a single threaded stage. - processingStages.append(new SingleThreadStage(prev_stage_variance)); - else - processingStages.append(new MultiThreadStage(Globals->parallelism)); - - processingStages.last()->stage_id = next_stage_id++; - - // link nextStage pointers, the stage we just appeneded is i+1 since - // the read stage was added before this loop - processingStages[i]->nextStage = processingStages[i+1]; - - processingStages.last()->stages = &this->processingStages; - processingStages.last()->threads = this->threads; - - processingStages.last()->transform = transforms[i]; - prev_stage_variance = stage_variance[i]; - } - - // We also have the last stage, which just puts the output of the - // previous stages on a template list. - collectionStage = new LastStage(prev_stage_variance); - processingStages.append(collectionStage); - collectionStage->stage_id = next_stage_id; - collectionStage->stages = &this->processingStages; - collectionStage->threads = this->threads; - - // the last transform stage points to collection stage - processingStages[processingStages.size() - 2]->nextStage = collectionStage; - - // And the collection stage points to the read stage, because this is - // a ring buffer. - collectionStage->nextStage = readStage; -} - - } // namespace br #include "stream.moc" -- libgit2 0.21.4