Commit bb99fd8afdb08083c90876ab5f0d6d0e32f0c6e8

Authored by caotto
2 parents e9a774f1 d43c8217

Merge pull request #30 from biometrics/stream_processing

some updates to buffer management
Showing 1 changed file with 194 additions and 68 deletions
sdk/plugins/stream.cpp
1 -#define QT_NO_DEBUG_OUTPUT  
2 -  
3 #include <openbr_plugin.h> 1 #include <openbr_plugin.h>
4 #include <QReadWriteLock> 2 #include <QReadWriteLock>
5 #include <QWaitCondition> 3 #include <QWaitCondition>
@@ -29,19 +27,15 @@ public: @@ -29,19 +27,15 @@ public:
29 class SharedBuffer 27 class SharedBuffer
30 { 28 {
31 public: 29 public:
32 - SharedBuffer(int _maxItems = 200) : maxItems(_maxItems) {} 30 + SharedBuffer() {}
33 virtual ~SharedBuffer() {} 31 virtual ~SharedBuffer() {}
34 32
35 virtual void addItem(FrameData * input)=0; 33 virtual void addItem(FrameData * input)=0;
36 34
37 virtual FrameData * getItem()=0; 35 virtual FrameData * getItem()=0;
38 36
39 - int getMaxItems() { return maxItems; }  
40 -  
41 virtual void stoppedInput() =0; 37 virtual void stoppedInput() =0;
42 virtual void startInput() = 0; 38 virtual void startInput() = 0;
43 -protected:  
44 - int maxItems;  
45 }; 39 };
46 40
47 // For 1 - 1 boundaries, a buffer class with a single shared buffer, a mutex 41 // For 1 - 1 boundaries, a buffer class with a single shared buffer, a mutex
@@ -49,7 +43,7 @@ protected: @@ -49,7 +43,7 @@ protected:
49 class SingleBuffer : public SharedBuffer 43 class SingleBuffer : public SharedBuffer
50 { 44 {
51 public: 45 public:
52 - SingleBuffer(unsigned _maxItems = 20) : SharedBuffer(_maxItems) { no_input = false; } 46 + SingleBuffer() { no_input = false; }
53 47
54 void stoppedInput() 48 void stoppedInput()
55 { 49 {
@@ -70,14 +64,8 @@ public: @@ -70,14 +64,8 @@ public:
70 { 64 {
71 QMutexLocker bufferLock(&bufferGuard); 65 QMutexLocker bufferLock(&bufferGuard);
72 66
73 - // If the buffer is too full, wait for space to become available  
74 - if (buffer.size() >= maxItems) {  
75 - availableOutputSpace.wait(&bufferGuard);  
76 - }  
77 -  
78 buffer.append(input); 67 buffer.append(input);
79 68
80 - // Wait for certain # of items?  
81 availableInput.wakeOne(); 69 availableInput.wakeOne();
82 } 70 }
83 71
@@ -99,61 +87,176 @@ public: @@ -99,61 +87,176 @@ public:
99 87
100 FrameData * output = buffer.first(); 88 FrameData * output = buffer.first();
101 buffer.removeFirst(); 89 buffer.removeFirst();
102 - if (buffer.size() < maxItems / 2)  
103 - availableOutputSpace.wakeAll();  
104 return output; 90 return output;
105 } 91 }
106 92
107 private: 93 private:
108 QMutex bufferGuard; 94 QMutex bufferGuard;
109 QWaitCondition availableInput; 95 QWaitCondition availableInput;
110 - QWaitCondition availableOutputSpace;  
111 bool no_input; 96 bool no_input;
112 97
113 QList<FrameData *> buffer; 98 QList<FrameData *> buffer;
114 }; 99 };
115 100
116 -// Interface for sequentially getting data from some data source.  
117 -// Initialized off of a template, can represent a video file (stored in the template's filename)  
118 -// or a set of images already loaded into memory stored as multiple matrices in an input template.  
119 -class DataSource 101 +// For 1 - 1 boundaries, a double buffering scheme
  102 +// Producer/consumer read/write from separate buffers, and switch if their
  103 +// buffer runs out/overflows. Synchronization is handled by a read/write lock
  104 +// threads are "reading" if they are adding to/removing from their individual
  105 +// buffer, and writing if they access or swap with the other buffer.
  106 +class DoubleBuffer : public SharedBuffer
120 { 107 {
121 public: 108 public:
122 - DataSource() {}  
123 - virtual ~DataSource() {} 109 + DoubleBuffer()
  110 + {
  111 + inputBuffer = &buffer1;
  112 + outputBuffer = &buffer2;
  113 + }
124 114
125 - virtual FrameData * getNext() = 0;  
126 - virtual void close() = 0;  
127 - virtual bool open(Template & input) = 0;  
128 - virtual bool isOpen() = 0; 115 + void stoppedInput()
  116 + {
  117 + QWriteLocker bufferLock(&bufferGuard);
  118 + no_input = true;
  119 + // Release anything waiting for input items.
  120 + availableInput.wakeAll();
  121 + }
  122 +
  123 + // There will be more input
  124 + void startInput()
  125 + {
  126 + QWriteLocker bufferLock(&bufferGuard);
  127 + no_input = false;
  128 + }
  129 +
  130 + // called from the producer thread
  131 + void addItem(FrameData * input)
  132 + {
  133 + QReadLocker readLock(&bufferGuard);
  134 + inputBuffer->append(input);
  135 + availableInput.wakeOne();
  136 + }
  137 +
  138 + // Called from the consumer thread
  139 + FrameData * getItem() {
  140 + QReadLocker readLock(&bufferGuard);
  141 +
  142 + // There is something for us to get
  143 + if (!outputBuffer->empty()) {
  144 + FrameData * output = outputBuffer->first();
  145 + outputBuffer->removeFirst();
  146 + return output;
  147 + }
  148 +
  149 + // Outputbuffer is empty, try to swap with the input buffer, we need a
  150 + // write lock to do that.
  151 + readLock.unlock();
  152 + QWriteLocker writeLock(&bufferGuard);
  153 +
  154 + // Nothing on the input buffer either?
  155 + if (inputBuffer->empty()) {
  156 + // If nothing else is coming, return null
  157 + if (no_input)
  158 + return NULL;
  159 + //otherwise, wait on the input buffer
  160 + availableInput.wait(&bufferGuard);
  161 + // Did we get woken up because no more input is coming? if so
  162 + // we're done here
  163 + if (no_input && inputBuffer->empty())
  164 + return NULL;
  165 + }
  166 +
  167 + // input buffer is non-empty, so swap the buffers
  168 + std::swap(inputBuffer, outputBuffer);
  169 +
  170 + // Return a frame
  171 + FrameData * output = outputBuffer->first();
  172 + outputBuffer->removeFirst();
  173 + return output;
  174 + }
  175 +
  176 +private:
  177 + // The read-write lock. The thread adding to this buffer can add
  178 + // to the current input buffer if it has a read lock. The thread
  179 + // removing from this buffer can remove things from the current
  180 + // output buffer if it has a read lock, or swap the buffers if it
  181 + // has a write lock.
  182 + // Checking/modifying no_input requires a write lock.
  183 + QReadWriteLock bufferGuard;
  184 + QWaitCondition availableInput;
  185 + bool no_input;
  186 +
  187 + // The buffer that is currently being added to
  188 + QList<FrameData *> * inputBuffer;
  189 + // The buffer that is currently being removed from
  190 + QList<FrameData *> * outputBuffer;
  191 +
  192 + // The buffers pointed at by inputBuffer/outputBuffer
  193 + QList<FrameData *> buffer1;
  194 + QList<FrameData *> buffer2;
129 }; 195 };
130 196
131 -// Read a video frame by frame using cv::VideoCapture  
132 -class VideoDataSource : public DataSource 197 +
  198 +// Interface for sequentially getting data from some data source.
  199 +// Initialized off of a template, can represent a video file (stored in the template's filename)
  200 +// or a set of images already loaded into memory stored as multiple matrices in an input template.
  201 +class DataSource
133 { 202 {
134 public: 203 public:
135 - VideoDataSource() {}  
136 -  
137 - FrameData * getNext() 204 + DataSource(int maxFrames=100)
138 { 205 {
139 - if (!isOpen())  
140 - return NULL; 206 + for (int i=0; i < maxFrames;i++)
  207 + {
  208 + allFrames.addItem(new FrameData());
  209 + }
  210 + allFrames.startInput();
  211 + }
141 212
142 - FrameData * output = new FrameData();  
143 - output->data.append(Template(basis.file));  
144 - output->data.last().append(cv::Mat()); 213 + virtual ~DataSource()
  214 + {
  215 + allFrames.stoppedInput();
  216 + while (true)
  217 + {
  218 + FrameData * frame = allFrames.getItem();
  219 + if (frame == NULL)
  220 + break;
  221 + delete frame;
  222 + }
  223 + }
145 224
146 - output->sequenceNumber = next_idx;  
147 - next_idx++; 225 + FrameData * getFrame()
  226 + {
  227 + FrameData * aFrame = allFrames.getItem();
  228 + aFrame->data.clear();
  229 + aFrame->sequenceNumber = -1;
148 230
149 - bool res = video.read(output->data.last().last()); 231 + bool res = getNext(*aFrame);
150 if (!res) { 232 if (!res) {
151 - delete output; 233 + allFrames.addItem(aFrame);
152 return NULL; 234 return NULL;
153 } 235 }
154 - return output; 236 + return aFrame;
155 } 237 }
156 238
  239 + void returnFrame(FrameData * inputFrame)
  240 + {
  241 + allFrames.addItem(inputFrame);
  242 + }
  243 +
  244 + virtual void close() = 0;
  245 + virtual bool open(Template & output) = 0;
  246 + virtual bool isOpen() = 0;
  247 +
  248 + virtual bool getNext(FrameData & input) = 0;
  249 +
  250 +protected:
  251 + DoubleBuffer allFrames;
  252 +};
  253 +
  254 +// Read a video frame by frame using cv::VideoCapture
  255 +class VideoDataSource : public DataSource
  256 +{
  257 +public:
  258 + VideoDataSource(int maxFrames) : DataSource(maxFrames) {}
  259 +
157 bool open(Template &input) 260 bool open(Template &input)
158 { 261 {
159 next_idx = 0; 262 next_idx = 0;
@@ -167,6 +270,24 @@ public: @@ -167,6 +270,24 @@ public:
167 void close() { video.release(); } 270 void close() { video.release(); }
168 271
169 private: 272 private:
  273 + bool getNext(FrameData & output)
  274 + {
  275 + if (!isOpen())
  276 + return false;
  277 +
  278 + output.data.append(Template(basis.file));
  279 + output.data.last().append(cv::Mat());
  280 +
  281 + output.sequenceNumber = next_idx;
  282 + next_idx++;
  283 +
  284 + bool res = video.read(output.data.last().last());
  285 + if (!res) {
  286 + return false;
  287 + }
  288 + return true;
  289 + }
  290 +
170 cv::VideoCapture video; 291 cv::VideoCapture video;
171 Template basis; 292 Template basis;
172 int next_idx; 293 int next_idx;
@@ -177,21 +298,9 @@ private: @@ -177,21 +298,9 @@ private:
177 class TemplateDataSource : public DataSource 298 class TemplateDataSource : public DataSource
178 { 299 {
179 public: 300 public:
180 - TemplateDataSource() { current_idx = INT_MAX; }  
181 -  
182 - FrameData * getNext() 301 + TemplateDataSource(int maxFrames) : DataSource(maxFrames)
183 { 302 {
184 - if (!isOpen())  
185 - return NULL;  
186 -  
187 - FrameData * output = new FrameData();  
188 - output->data.append(basis[current_idx]);  
189 - current_idx++;  
190 -  
191 - output->sequenceNumber = next_sequence;  
192 - next_sequence++;  
193 -  
194 - return output; 303 + current_idx = INT_MAX;
195 } 304 }
196 305
197 bool open(Template &input) 306 bool open(Template &input)
@@ -211,6 +320,20 @@ public: @@ -211,6 +320,20 @@ public:
211 } 320 }
212 321
213 private: 322 private:
  323 + bool getNext(FrameData & output)
  324 + {
  325 + if (!isOpen())
  326 + return false;
  327 +
  328 + output.data.append(basis[current_idx]);
  329 + current_idx++;
  330 +
  331 + output.sequenceNumber = next_sequence;
  332 + next_sequence++;
  333 +
  334 + return true;
  335 + }
  336 +
214 Template basis; 337 Template basis;
215 int current_idx; 338 int current_idx;
216 int next_sequence; 339 int next_sequence;
@@ -232,12 +355,6 @@ public: @@ -232,12 +355,6 @@ public:
232 close(); 355 close();
233 } 356 }
234 357
235 - FrameData * getNext()  
236 - {  
237 - if (!isOpen()) return NULL;  
238 - return actualSource->getNext();  
239 - }  
240 -  
241 void close() 358 void close()
242 { 359 {
243 if (actualSource) { 360 if (actualSource) {
@@ -253,13 +370,13 @@ public: @@ -253,13 +370,13 @@ public:
253 bool open_res = false; 370 bool open_res = false;
254 // Input has no matrices? Its probably a video that hasn't been loaded yet 371 // Input has no matrices? Its probably a video that hasn't been loaded yet
255 if (input.empty()) { 372 if (input.empty()) {
256 - actualSource = new VideoDataSource(); 373 + actualSource = new VideoDataSource(0);
257 open_res = actualSource->open(input); 374 open_res = actualSource->open(input);
258 qDebug("created video resource status %d", open_res); 375 qDebug("created video resource status %d", open_res);
259 } 376 }
260 else { 377 else {
261 // create frame dealer 378 // create frame dealer
262 - actualSource = new TemplateDataSource(); 379 + actualSource = new TemplateDataSource(0);
263 open_res = actualSource->open(input); 380 open_res = actualSource->open(input);
264 } 381 }
265 if (!isOpen()) { 382 if (!isOpen()) {
@@ -274,6 +391,11 @@ public: @@ -274,6 +391,11 @@ public:
274 391
275 protected: 392 protected:
276 DataSource * actualSource; 393 DataSource * actualSource;
  394 + bool getNext(FrameData & output)
  395 + {
  396 + return actualSource->getNext(output);
  397 + }
  398 +
277 }; 399 };
278 400
279 class ProcessingStage : public QRunnable 401 class ProcessingStage : public QRunnable
@@ -347,7 +469,8 @@ public: @@ -347,7 +469,8 @@ public:
347 { 469 {
348 forever 470 forever
349 { 471 {
350 - FrameData * aFrame = dataSource.getNext(); 472 + //FrameData * aFrame = dataSource.getNext();
  473 + FrameData * aFrame = dataSource.getFrame();
351 if (aFrame == NULL) 474 if (aFrame == NULL)
352 break; 475 break;
353 outputBuffer->addItem(aFrame); 476 outputBuffer->addItem(aFrame);
@@ -367,6 +490,7 @@ public: @@ -367,6 +490,7 @@ public:
367 private: 490 private:
368 TemplateList collectedOutput; 491 TemplateList collectedOutput;
369 public: 492 public:
  493 + DataSource * data;
370 void run() 494 void run()
371 { 495 {
372 forever 496 forever
@@ -377,7 +501,8 @@ public: @@ -377,7 +501,8 @@ public:
377 break; 501 break;
378 // Just put the item on collectedOutput 502 // Just put the item on collectedOutput
379 collectedOutput.append(frame->data); 503 collectedOutput.append(frame->data);
380 - delete frame; 504 + // Return the frame to the input frame buffer
  505 + data->returnFrame(frame);
381 } 506 }
382 this->markStop(); 507 this->markStop();
383 } 508 }
@@ -483,7 +608,7 @@ public: @@ -483,7 +608,7 @@ public:
483 } 608 }
484 609
485 // buffer 0 -- output buffer for the read stage 610 // buffer 0 -- output buffer for the read stage
486 - sharedBuffers.append(new SingleBuffer()); 611 + sharedBuffers.append(new DoubleBuffer());
487 readStage.outputBuffer = sharedBuffers.last(); 612 readStage.outputBuffer = sharedBuffers.last();
488 readStage.stage_id = 0; 613 readStage.stage_id = 0;
489 614
@@ -499,12 +624,13 @@ public: @@ -499,12 +624,13 @@ public:
499 processingStages.last()->inputBuffer = sharedBuffers[lastBufferIdx]; 624 processingStages.last()->inputBuffer = sharedBuffers[lastBufferIdx];
500 lastBufferIdx++; 625 lastBufferIdx++;
501 626
502 - sharedBuffers.append(new SingleBuffer()); 627 + sharedBuffers.append(new DoubleBuffer());
503 processingStages.last()->outputBuffer = sharedBuffers.last(); 628 processingStages.last()->outputBuffer = sharedBuffers.last();
504 processingStages.last()->transform = transforms[i]; 629 processingStages.last()->transform = transforms[i];
505 } 630 }
506 631
507 collectionStage.inputBuffer = sharedBuffers.last(); 632 collectionStage.inputBuffer = sharedBuffers.last();
  633 + collectionStage.data = &readStage.dataSource;
508 collectionStage.stage_id = next_stage_id; 634 collectionStage.stage_id = next_stage_id;
509 } 635 }
510 636