Commit d43c8217fc072cc0e34aae2894902473be314898
1 parent
46985ff5
Update buffer classes slightly
Simplify SingleBuffer since controlling the total amount of frames being processed works quite well. Add a double-buffer scheme, which leads to a really marginal performance improvement (but I think it's cool).
Showing
1 changed file
with
102 additions
and
20 deletions
sdk/plugins/stream.cpp
| 1 | -#define QT_NO_DEBUG_OUTPUT | ||
| 2 | - | ||
| 3 | #include <openbr_plugin.h> | 1 | #include <openbr_plugin.h> |
| 4 | #include <QReadWriteLock> | 2 | #include <QReadWriteLock> |
| 5 | #include <QWaitCondition> | 3 | #include <QWaitCondition> |
| @@ -29,19 +27,15 @@ public: | @@ -29,19 +27,15 @@ public: | ||
| 29 | class SharedBuffer | 27 | class SharedBuffer |
| 30 | { | 28 | { |
| 31 | public: | 29 | public: |
| 32 | - SharedBuffer(int _maxItems = 200000) : maxItems(_maxItems) {} | 30 | + SharedBuffer() {} |
| 33 | virtual ~SharedBuffer() {} | 31 | virtual ~SharedBuffer() {} |
| 34 | 32 | ||
| 35 | virtual void addItem(FrameData * input)=0; | 33 | virtual void addItem(FrameData * input)=0; |
| 36 | 34 | ||
| 37 | virtual FrameData * getItem()=0; | 35 | virtual FrameData * getItem()=0; |
| 38 | 36 | ||
| 39 | - int getMaxItems() { return maxItems; } | ||
| 40 | - | ||
| 41 | virtual void stoppedInput() =0; | 37 | virtual void stoppedInput() =0; |
| 42 | virtual void startInput() = 0; | 38 | virtual void startInput() = 0; |
| 43 | -protected: | ||
| 44 | - int maxItems; | ||
| 45 | }; | 39 | }; |
| 46 | 40 | ||
| 47 | // For 1 - 1 boundaries, a buffer class with a single shared buffer, a mutex | 41 | // For 1 - 1 boundaries, a buffer class with a single shared buffer, a mutex |
| @@ -49,7 +43,7 @@ protected: | @@ -49,7 +43,7 @@ protected: | ||
| 49 | class SingleBuffer : public SharedBuffer | 43 | class SingleBuffer : public SharedBuffer |
| 50 | { | 44 | { |
| 51 | public: | 45 | public: |
| 52 | - SingleBuffer(unsigned _maxItems = 20000) : SharedBuffer(_maxItems) { no_input = false; } | 46 | + SingleBuffer() { no_input = false; } |
| 53 | 47 | ||
| 54 | void stoppedInput() | 48 | void stoppedInput() |
| 55 | { | 49 | { |
| @@ -70,14 +64,8 @@ public: | @@ -70,14 +64,8 @@ public: | ||
| 70 | { | 64 | { |
| 71 | QMutexLocker bufferLock(&bufferGuard); | 65 | QMutexLocker bufferLock(&bufferGuard); |
| 72 | 66 | ||
| 73 | - // If the buffer is too full, wait for space to become available | ||
| 74 | - if (buffer.size() >= maxItems) { | ||
| 75 | - availableOutputSpace.wait(&bufferGuard); | ||
| 76 | - } | ||
| 77 | - | ||
| 78 | buffer.append(input); | 67 | buffer.append(input); |
| 79 | 68 | ||
| 80 | - // Wait for certain # of items? | ||
| 81 | availableInput.wakeOne(); | 69 | availableInput.wakeOne(); |
| 82 | } | 70 | } |
| 83 | 71 | ||
| @@ -99,20 +87,114 @@ public: | @@ -99,20 +87,114 @@ public: | ||
| 99 | 87 | ||
| 100 | FrameData * output = buffer.first(); | 88 | FrameData * output = buffer.first(); |
| 101 | buffer.removeFirst(); | 89 | buffer.removeFirst(); |
| 102 | - if (buffer.size() < maxItems / 2) | ||
| 103 | - availableOutputSpace.wakeAll(); | ||
| 104 | return output; | 90 | return output; |
| 105 | } | 91 | } |
| 106 | 92 | ||
| 107 | private: | 93 | private: |
| 108 | QMutex bufferGuard; | 94 | QMutex bufferGuard; |
| 109 | QWaitCondition availableInput; | 95 | QWaitCondition availableInput; |
| 110 | - QWaitCondition availableOutputSpace; | ||
| 111 | bool no_input; | 96 | bool no_input; |
| 112 | 97 | ||
| 113 | QList<FrameData *> buffer; | 98 | QList<FrameData *> buffer; |
| 114 | }; | 99 | }; |
| 115 | 100 | ||
| 101 | +// For 1 - 1 boundaries, a double buffering scheme | ||
| 102 | +// Producer/consumer read/write from separate buffers, and switch if their | ||
| 103 | +// buffer runs out/overflows. Synchronization is handled by a read/write lock | ||
| 104 | +// threads are "reading" if they are adding to/removing from their individual | ||
| 105 | +// buffer, and writing if they access or swap with the other buffer. | ||
| 106 | +class DoubleBuffer : public SharedBuffer | ||
| 107 | +{ | ||
| 108 | +public: | ||
| 109 | + DoubleBuffer() | ||
| 110 | + { | ||
| 111 | + inputBuffer = &buffer1; | ||
| 112 | + outputBuffer = &buffer2; | ||
| 113 | + } | ||
| 114 | + | ||
| 115 | + void stoppedInput() | ||
| 116 | + { | ||
| 117 | + QWriteLocker bufferLock(&bufferGuard); | ||
| 118 | + no_input = true; | ||
| 119 | + // Release anything waiting for input items. | ||
| 120 | + availableInput.wakeAll(); | ||
| 121 | + } | ||
| 122 | + | ||
| 123 | + // There will be more input | ||
| 124 | + void startInput() | ||
| 125 | + { | ||
| 126 | + QWriteLocker bufferLock(&bufferGuard); | ||
| 127 | + no_input = false; | ||
| 128 | + } | ||
| 129 | + | ||
| 130 | + // called from the producer thread | ||
| 131 | + void addItem(FrameData * input) | ||
| 132 | + { | ||
| 133 | + QReadLocker readLock(&bufferGuard); | ||
| 134 | + inputBuffer->append(input); | ||
| 135 | + availableInput.wakeOne(); | ||
| 136 | + } | ||
| 137 | + | ||
| 138 | + // Called from the consumer thread | ||
| 139 | + FrameData * getItem() { | ||
| 140 | + QReadLocker readLock(&bufferGuard); | ||
| 141 | + | ||
| 142 | + // There is something for us to get | ||
| 143 | + if (!outputBuffer->empty()) { | ||
| 144 | + FrameData * output = outputBuffer->first(); | ||
| 145 | + outputBuffer->removeFirst(); | ||
| 146 | + return output; | ||
| 147 | + } | ||
| 148 | + | ||
| 149 | + // Outputbuffer is empty, try to swap with the input buffer, we need a | ||
| 150 | + // write lock to do that. | ||
| 151 | + readLock.unlock(); | ||
| 152 | + QWriteLocker writeLock(&bufferGuard); | ||
| 153 | + | ||
| 154 | + // Nothing on the input buffer either? | ||
| 155 | + if (inputBuffer->empty()) { | ||
| 156 | + // If nothing else is coming, return null | ||
| 157 | + if (no_input) | ||
| 158 | + return NULL; | ||
| 159 | + //otherwise, wait on the input buffer | ||
| 160 | + availableInput.wait(&bufferGuard); | ||
| 161 | + // Did we get woken up because no more input is coming? if so | ||
| 162 | + // we're done here | ||
| 163 | + if (no_input && inputBuffer->empty()) | ||
| 164 | + return NULL; | ||
| 165 | + } | ||
| 166 | + | ||
| 167 | + // input buffer is non-empty, so swap the buffers | ||
| 168 | + std::swap(inputBuffer, outputBuffer); | ||
| 169 | + | ||
| 170 | + // Return a frame | ||
| 171 | + FrameData * output = outputBuffer->first(); | ||
| 172 | + outputBuffer->removeFirst(); | ||
| 173 | + return output; | ||
| 174 | + } | ||
| 175 | + | ||
| 176 | +private: | ||
| 177 | + // The read-write lock. The thread adding to this buffer can add | ||
| 178 | + // to the current input buffer if it has a read lock. The thread | ||
| 179 | + // removing from this buffer can remove things from the current | ||
| 180 | + // output buffer if it has a read lock, or swap the buffers if it | ||
| 181 | + // has a write lock. | ||
| 182 | + // Checking/modifying no_input requires a write lock. | ||
| 183 | + QReadWriteLock bufferGuard; | ||
| 184 | + QWaitCondition availableInput; | ||
| 185 | + bool no_input; | ||
| 186 | + | ||
| 187 | + // The buffer that is currently being added to | ||
| 188 | + QList<FrameData *> * inputBuffer; | ||
| 189 | + // The buffer that is currently being removed from | ||
| 190 | + QList<FrameData *> * outputBuffer; | ||
| 191 | + | ||
| 192 | + // The buffers pointed at by inputBuffer/outputBuffer | ||
| 193 | + QList<FrameData *> buffer1; | ||
| 194 | + QList<FrameData *> buffer2; | ||
| 195 | +}; | ||
| 196 | + | ||
| 197 | + | ||
| 116 | // Interface for sequentially getting data from some data source. | 198 | // Interface for sequentially getting data from some data source. |
| 117 | // Initialized off of a template, can represent a video file (stored in the template's filename) | 199 | // Initialized off of a template, can represent a video file (stored in the template's filename) |
| 118 | // or a set of images already loaded into memory stored as multiple matrices in an input template. | 200 | // or a set of images already loaded into memory stored as multiple matrices in an input template. |
| @@ -166,7 +248,7 @@ public: | @@ -166,7 +248,7 @@ public: | ||
| 166 | virtual bool getNext(FrameData & input) = 0; | 248 | virtual bool getNext(FrameData & input) = 0; |
| 167 | 249 | ||
| 168 | protected: | 250 | protected: |
| 169 | - SingleBuffer allFrames; | 251 | + DoubleBuffer allFrames; |
| 170 | }; | 252 | }; |
| 171 | 253 | ||
| 172 | // Read a video frame by frame using cv::VideoCapture | 254 | // Read a video frame by frame using cv::VideoCapture |
| @@ -526,7 +608,7 @@ public: | @@ -526,7 +608,7 @@ public: | ||
| 526 | } | 608 | } |
| 527 | 609 | ||
| 528 | // buffer 0 -- output buffer for the read stage | 610 | // buffer 0 -- output buffer for the read stage |
| 529 | - sharedBuffers.append(new SingleBuffer()); | 611 | + sharedBuffers.append(new DoubleBuffer()); |
| 530 | readStage.outputBuffer = sharedBuffers.last(); | 612 | readStage.outputBuffer = sharedBuffers.last(); |
| 531 | readStage.stage_id = 0; | 613 | readStage.stage_id = 0; |
| 532 | 614 | ||
| @@ -542,7 +624,7 @@ public: | @@ -542,7 +624,7 @@ public: | ||
| 542 | processingStages.last()->inputBuffer = sharedBuffers[lastBufferIdx]; | 624 | processingStages.last()->inputBuffer = sharedBuffers[lastBufferIdx]; |
| 543 | lastBufferIdx++; | 625 | lastBufferIdx++; |
| 544 | 626 | ||
| 545 | - sharedBuffers.append(new SingleBuffer()); | 627 | + sharedBuffers.append(new DoubleBuffer()); |
| 546 | processingStages.last()->outputBuffer = sharedBuffers.last(); | 628 | processingStages.last()->outputBuffer = sharedBuffers.last(); |
| 547 | processingStages.last()->transform = transforms[i]; | 629 | processingStages.last()->transform = transforms[i]; |
| 548 | } | 630 | } |