diff --git a/sdk/plugins/stream.cpp b/sdk/plugins/stream.cpp index a61f9fb..9617c38 100644 --- a/sdk/plugins/stream.cpp +++ b/sdk/plugins/stream.cpp @@ -1,5 +1,3 @@ -#define QT_NO_DEBUG_OUTPUT - #include #include #include @@ -29,19 +27,15 @@ public: class SharedBuffer { public: - SharedBuffer(int _maxItems = 200) : maxItems(_maxItems) {} + SharedBuffer() {} virtual ~SharedBuffer() {} virtual void addItem(FrameData * input)=0; virtual FrameData * getItem()=0; - int getMaxItems() { return maxItems; } - virtual void stoppedInput() =0; virtual void startInput() = 0; -protected: - int maxItems; }; // For 1 - 1 boundaries, a buffer class with a single shared buffer, a mutex @@ -49,7 +43,7 @@ protected: class SingleBuffer : public SharedBuffer { public: - SingleBuffer(unsigned _maxItems = 20) : SharedBuffer(_maxItems) { no_input = false; } + SingleBuffer() { no_input = false; } void stoppedInput() { @@ -70,14 +64,8 @@ public: { QMutexLocker bufferLock(&bufferGuard); - // If the buffer is too full, wait for space to become available - if (buffer.size() >= maxItems) { - availableOutputSpace.wait(&bufferGuard); - } - buffer.append(input); - // Wait for certain # of items? availableInput.wakeOne(); } @@ -99,61 +87,176 @@ public: FrameData * output = buffer.first(); buffer.removeFirst(); - if (buffer.size() < maxItems / 2) - availableOutputSpace.wakeAll(); return output; } private: QMutex bufferGuard; QWaitCondition availableInput; - QWaitCondition availableOutputSpace; bool no_input; QList buffer; }; -// Interface for sequentially getting data from some data source. -// Initialized off of a template, can represent a video file (stored in the template's filename) -// or a set of images already loaded into memory stored as multiple matrices in an input template. -class DataSource +// For 1 - 1 boundaries, a double buffering scheme +// Producer/consumer read/write from separate buffers, and switch if their +// buffer runs out/overflows. Synchronization is handled by a read/write lock +// threads are "reading" if they are adding to/removing from their individual +// buffer, and writing if they access or swap with the other buffer. +class DoubleBuffer : public SharedBuffer { public: - DataSource() {} - virtual ~DataSource() {} + DoubleBuffer() + { + inputBuffer = &buffer1; + outputBuffer = &buffer2; + } - virtual FrameData * getNext() = 0; - virtual void close() = 0; - virtual bool open(Template & input) = 0; - virtual bool isOpen() = 0; + void stoppedInput() + { + QWriteLocker bufferLock(&bufferGuard); + no_input = true; + // Release anything waiting for input items. + availableInput.wakeAll(); + } + + // There will be more input + void startInput() + { + QWriteLocker bufferLock(&bufferGuard); + no_input = false; + } + + // called from the producer thread + void addItem(FrameData * input) + { + QReadLocker readLock(&bufferGuard); + inputBuffer->append(input); + availableInput.wakeOne(); + } + + // Called from the consumer thread + FrameData * getItem() { + QReadLocker readLock(&bufferGuard); + + // There is something for us to get + if (!outputBuffer->empty()) { + FrameData * output = outputBuffer->first(); + outputBuffer->removeFirst(); + return output; + } + + // Outputbuffer is empty, try to swap with the input buffer, we need a + // write lock to do that. + readLock.unlock(); + QWriteLocker writeLock(&bufferGuard); + + // Nothing on the input buffer either? + if (inputBuffer->empty()) { + // If nothing else is coming, return null + if (no_input) + return NULL; + //otherwise, wait on the input buffer + availableInput.wait(&bufferGuard); + // Did we get woken up because no more input is coming? if so + // we're done here + if (no_input && inputBuffer->empty()) + return NULL; + } + + // input buffer is non-empty, so swap the buffers + std::swap(inputBuffer, outputBuffer); + + // Return a frame + FrameData * output = outputBuffer->first(); + outputBuffer->removeFirst(); + return output; + } + +private: + // The read-write lock. The thread adding to this buffer can add + // to the current input buffer if it has a read lock. The thread + // removing from this buffer can remove things from the current + // output buffer if it has a read lock, or swap the buffers if it + // has a write lock. + // Checking/modifying no_input requires a write lock. + QReadWriteLock bufferGuard; + QWaitCondition availableInput; + bool no_input; + + // The buffer that is currently being added to + QList * inputBuffer; + // The buffer that is currently being removed from + QList * outputBuffer; + + // The buffers pointed at by inputBuffer/outputBuffer + QList buffer1; + QList buffer2; }; -// Read a video frame by frame using cv::VideoCapture -class VideoDataSource : public DataSource + +// Interface for sequentially getting data from some data source. +// Initialized off of a template, can represent a video file (stored in the template's filename) +// or a set of images already loaded into memory stored as multiple matrices in an input template. +class DataSource { public: - VideoDataSource() {} - - FrameData * getNext() + DataSource(int maxFrames=100) { - if (!isOpen()) - return NULL; + for (int i=0; i < maxFrames;i++) + { + allFrames.addItem(new FrameData()); + } + allFrames.startInput(); + } - FrameData * output = new FrameData(); - output->data.append(Template(basis.file)); - output->data.last().append(cv::Mat()); + virtual ~DataSource() + { + allFrames.stoppedInput(); + while (true) + { + FrameData * frame = allFrames.getItem(); + if (frame == NULL) + break; + delete frame; + } + } - output->sequenceNumber = next_idx; - next_idx++; + FrameData * getFrame() + { + FrameData * aFrame = allFrames.getItem(); + aFrame->data.clear(); + aFrame->sequenceNumber = -1; - bool res = video.read(output->data.last().last()); + bool res = getNext(*aFrame); if (!res) { - delete output; + allFrames.addItem(aFrame); return NULL; } - return output; + return aFrame; } + void returnFrame(FrameData * inputFrame) + { + allFrames.addItem(inputFrame); + } + + virtual void close() = 0; + virtual bool open(Template & output) = 0; + virtual bool isOpen() = 0; + + virtual bool getNext(FrameData & input) = 0; + +protected: + DoubleBuffer allFrames; +}; + +// Read a video frame by frame using cv::VideoCapture +class VideoDataSource : public DataSource +{ +public: + VideoDataSource(int maxFrames) : DataSource(maxFrames) {} + bool open(Template &input) { next_idx = 0; @@ -167,6 +270,24 @@ public: void close() { video.release(); } private: + bool getNext(FrameData & output) + { + if (!isOpen()) + return false; + + output.data.append(Template(basis.file)); + output.data.last().append(cv::Mat()); + + output.sequenceNumber = next_idx; + next_idx++; + + bool res = video.read(output.data.last().last()); + if (!res) { + return false; + } + return true; + } + cv::VideoCapture video; Template basis; int next_idx; @@ -177,21 +298,9 @@ private: class TemplateDataSource : public DataSource { public: - TemplateDataSource() { current_idx = INT_MAX; } - - FrameData * getNext() + TemplateDataSource(int maxFrames) : DataSource(maxFrames) { - if (!isOpen()) - return NULL; - - FrameData * output = new FrameData(); - output->data.append(basis[current_idx]); - current_idx++; - - output->sequenceNumber = next_sequence; - next_sequence++; - - return output; + current_idx = INT_MAX; } bool open(Template &input) @@ -211,6 +320,20 @@ public: } private: + bool getNext(FrameData & output) + { + if (!isOpen()) + return false; + + output.data.append(basis[current_idx]); + current_idx++; + + output.sequenceNumber = next_sequence; + next_sequence++; + + return true; + } + Template basis; int current_idx; int next_sequence; @@ -232,12 +355,6 @@ public: close(); } - FrameData * getNext() - { - if (!isOpen()) return NULL; - return actualSource->getNext(); - } - void close() { if (actualSource) { @@ -253,13 +370,13 @@ public: bool open_res = false; // Input has no matrices? Its probably a video that hasn't been loaded yet if (input.empty()) { - actualSource = new VideoDataSource(); + actualSource = new VideoDataSource(0); open_res = actualSource->open(input); qDebug("created video resource status %d", open_res); } else { // create frame dealer - actualSource = new TemplateDataSource(); + actualSource = new TemplateDataSource(0); open_res = actualSource->open(input); } if (!isOpen()) { @@ -274,6 +391,11 @@ public: protected: DataSource * actualSource; + bool getNext(FrameData & output) + { + return actualSource->getNext(output); + } + }; class ProcessingStage : public QRunnable @@ -347,7 +469,8 @@ public: { forever { - FrameData * aFrame = dataSource.getNext(); + //FrameData * aFrame = dataSource.getNext(); + FrameData * aFrame = dataSource.getFrame(); if (aFrame == NULL) break; outputBuffer->addItem(aFrame); @@ -367,6 +490,7 @@ public: private: TemplateList collectedOutput; public: + DataSource * data; void run() { forever @@ -377,7 +501,8 @@ public: break; // Just put the item on collectedOutput collectedOutput.append(frame->data); - delete frame; + // Return the frame to the input frame buffer + data->returnFrame(frame); } this->markStop(); } @@ -483,7 +608,7 @@ public: } // buffer 0 -- output buffer for the read stage - sharedBuffers.append(new SingleBuffer()); + sharedBuffers.append(new DoubleBuffer()); readStage.outputBuffer = sharedBuffers.last(); readStage.stage_id = 0; @@ -499,12 +624,13 @@ public: processingStages.last()->inputBuffer = sharedBuffers[lastBufferIdx]; lastBufferIdx++; - sharedBuffers.append(new SingleBuffer()); + sharedBuffers.append(new DoubleBuffer()); processingStages.last()->outputBuffer = sharedBuffers.last(); processingStages.last()->transform = transforms[i]; } collectionStage.inputBuffer = sharedBuffers.last(); + collectionStage.data = &readStage.dataSource; collectionStage.stage_id = next_stage_id; }