diff --git a/sdk/plugins/stream.cpp b/sdk/plugins/stream.cpp index 5f136e3..9617c38 100644 --- a/sdk/plugins/stream.cpp +++ b/sdk/plugins/stream.cpp @@ -1,5 +1,3 @@ -#define QT_NO_DEBUG_OUTPUT - #include #include #include @@ -29,19 +27,15 @@ public: class SharedBuffer { public: - SharedBuffer(int _maxItems = 200000) : maxItems(_maxItems) {} + SharedBuffer() {} virtual ~SharedBuffer() {} virtual void addItem(FrameData * input)=0; virtual FrameData * getItem()=0; - int getMaxItems() { return maxItems; } - virtual void stoppedInput() =0; virtual void startInput() = 0; -protected: - int maxItems; }; // For 1 - 1 boundaries, a buffer class with a single shared buffer, a mutex @@ -49,7 +43,7 @@ protected: class SingleBuffer : public SharedBuffer { public: - SingleBuffer(unsigned _maxItems = 20000) : SharedBuffer(_maxItems) { no_input = false; } + SingleBuffer() { no_input = false; } void stoppedInput() { @@ -70,14 +64,8 @@ public: { QMutexLocker bufferLock(&bufferGuard); - // If the buffer is too full, wait for space to become available - if (buffer.size() >= maxItems) { - availableOutputSpace.wait(&bufferGuard); - } - buffer.append(input); - // Wait for certain # of items? availableInput.wakeOne(); } @@ -99,20 +87,114 @@ public: FrameData * output = buffer.first(); buffer.removeFirst(); - if (buffer.size() < maxItems / 2) - availableOutputSpace.wakeAll(); return output; } private: QMutex bufferGuard; QWaitCondition availableInput; - QWaitCondition availableOutputSpace; bool no_input; QList buffer; }; +// For 1 - 1 boundaries, a double buffering scheme +// Producer/consumer read/write from separate buffers, and switch if their +// buffer runs out/overflows. Synchronization is handled by a read/write lock +// threads are "reading" if they are adding to/removing from their individual +// buffer, and writing if they access or swap with the other buffer. +class DoubleBuffer : public SharedBuffer +{ +public: + DoubleBuffer() + { + inputBuffer = &buffer1; + outputBuffer = &buffer2; + } + + void stoppedInput() + { + QWriteLocker bufferLock(&bufferGuard); + no_input = true; + // Release anything waiting for input items. + availableInput.wakeAll(); + } + + // There will be more input + void startInput() + { + QWriteLocker bufferLock(&bufferGuard); + no_input = false; + } + + // called from the producer thread + void addItem(FrameData * input) + { + QReadLocker readLock(&bufferGuard); + inputBuffer->append(input); + availableInput.wakeOne(); + } + + // Called from the consumer thread + FrameData * getItem() { + QReadLocker readLock(&bufferGuard); + + // There is something for us to get + if (!outputBuffer->empty()) { + FrameData * output = outputBuffer->first(); + outputBuffer->removeFirst(); + return output; + } + + // Outputbuffer is empty, try to swap with the input buffer, we need a + // write lock to do that. + readLock.unlock(); + QWriteLocker writeLock(&bufferGuard); + + // Nothing on the input buffer either? + if (inputBuffer->empty()) { + // If nothing else is coming, return null + if (no_input) + return NULL; + //otherwise, wait on the input buffer + availableInput.wait(&bufferGuard); + // Did we get woken up because no more input is coming? if so + // we're done here + if (no_input && inputBuffer->empty()) + return NULL; + } + + // input buffer is non-empty, so swap the buffers + std::swap(inputBuffer, outputBuffer); + + // Return a frame + FrameData * output = outputBuffer->first(); + outputBuffer->removeFirst(); + return output; + } + +private: + // The read-write lock. The thread adding to this buffer can add + // to the current input buffer if it has a read lock. The thread + // removing from this buffer can remove things from the current + // output buffer if it has a read lock, or swap the buffers if it + // has a write lock. + // Checking/modifying no_input requires a write lock. + QReadWriteLock bufferGuard; + QWaitCondition availableInput; + bool no_input; + + // The buffer that is currently being added to + QList * inputBuffer; + // The buffer that is currently being removed from + QList * outputBuffer; + + // The buffers pointed at by inputBuffer/outputBuffer + QList buffer1; + QList buffer2; +}; + + // Interface for sequentially getting data from some data source. // Initialized off of a template, can represent a video file (stored in the template's filename) // or a set of images already loaded into memory stored as multiple matrices in an input template. @@ -166,7 +248,7 @@ public: virtual bool getNext(FrameData & input) = 0; protected: - SingleBuffer allFrames; + DoubleBuffer allFrames; }; // Read a video frame by frame using cv::VideoCapture @@ -526,7 +608,7 @@ public: } // buffer 0 -- output buffer for the read stage - sharedBuffers.append(new SingleBuffer()); + sharedBuffers.append(new DoubleBuffer()); readStage.outputBuffer = sharedBuffers.last(); readStage.stage_id = 0; @@ -542,7 +624,7 @@ public: processingStages.last()->inputBuffer = sharedBuffers[lastBufferIdx]; lastBufferIdx++; - sharedBuffers.append(new SingleBuffer()); + sharedBuffers.append(new DoubleBuffer()); processingStages.last()->outputBuffer = sharedBuffers.last(); processingStages.last()->transform = transforms[i]; }