Commit 845605d8c010c28f81ab0148a213d27c474b2884

Authored by Josh Klontz
2 parents a16a89e2 f2944b14

Merge branch 'master' of https://github.com/biometrics/openbr

openbr/plugins/ct8.cpp
... ... @@ -11,7 +11,7 @@
11 11 #include <exception>
12 12 #include <string>
13 13 #include <vector>
14   -#include <openbr_plugin.h>
  14 +#include <openbr/openbr_plugin.h>
15 15  
16 16 #include "core/resource.h"
17 17  
... ...
openbr/plugins/gui.cpp
1 1 #include <QApplication>
2 2 #include <QLabel>
  3 +#include <QElapsedTimer>
3 4 #include <opencv2/imgproc/imgproc.hpp>
4 5 #include <openbr/openbr_plugin.h>
5 6  
... ... @@ -66,13 +67,13 @@ public slots:
66 67 {
67 68 window->setPixmap(input);
68 69 window->setFixedSize(input.size());
69   - window->setVisible(true);
70 70 }
71 71  
72 72 void createWindow()
73 73 {
74 74 delete window;
75 75 window = new QLabel();
  76 + window->setVisible(true);
76 77 }
77 78 };
78 79  
... ... @@ -87,6 +88,9 @@ class Show2Transform : public TimeVaryingTransform
87 88 {
88 89 Q_OBJECT
89 90 public:
  91 + Q_PROPERTY(QStringList keys READ get_keys WRITE set_keys RESET reset_keys STORED false)
  92 + BR_PROPERTY(QStringList, keys, QStringList("FrameNumber"))
  93 +
90 94 Show2Transform() : TimeVaryingTransform(false, false)
91 95 {
92 96 // Create our GUI proxy
... ... @@ -120,9 +124,22 @@ public:
120 124 return;
121 125  
122 126 foreach (const Template & t, src) {
  127 + // build label
  128 + QString newTitle;
  129 + foreach (const QString & s, keys) {
  130 +
  131 + if (t.file.contains(s)) {
  132 + QString out = t.file.get<QString>(s);
  133 + newTitle = newTitle + s + ": " + out + " ";
  134 + }
  135 +
  136 + }
  137 + emit this->changeTitle(newTitle);
  138 +
123 139 foreach(const cv::Mat & m, t) {
124 140 qImageBuffer = toQImage(m);
125 141 displayBuffer.convertFromImage(qImageBuffer);
  142 +
126 143 // Emit an explicit copy of our pixmap so that the pixmap used
127 144 // by the main thread isn't damaged when we update displayBuffer
128 145 // later.
... ... @@ -140,6 +157,7 @@ public:
140 157 void init()
141 158 {
142 159 emit needWindow();
  160 + connect(this, SIGNAL(changeTitle(QString)), gui->window, SLOT(setWindowTitle(QString)));
143 161 }
144 162  
145 163 protected:
... ... @@ -150,10 +168,114 @@ protected:
150 168 signals:
151 169 void needWindow();
152 170 void updateImage(const QPixmap & input);
  171 + void changeTitle(const QString & input);
153 172 };
154 173  
155 174 BR_REGISTER(Transform, Show2Transform)
156 175  
  176 +class FPSSynch : public TimeVaryingTransform
  177 +{
  178 + Q_OBJECT
  179 + Q_PROPERTY(int targetFPS READ get_targetFPS WRITE set_targetFPS RESET reset_targetFPS STORED false)
  180 + BR_PROPERTY(int, targetFPS, 30)
  181 +
  182 +public:
  183 + FPSSynch() : TimeVaryingTransform(false, false) {}
  184 +
  185 + ~FPSSynch() {}
  186 +
  187 + void train(const TemplateList &data) { (void) data; }
  188 +
  189 +
  190 + void projectUpdate(const TemplateList &src, TemplateList &dst)
  191 + {
  192 + dst = src;
  193 + qint64 time_delta = timer.elapsed();
  194 +
  195 + qint64 wait_time = target_wait - time_delta;
  196 + timer.start();
  197 +
  198 + if (wait_time < 0) {
  199 + return;
  200 + }
  201 +
  202 + QThread::msleep(wait_time);
  203 + }
  204 +
  205 + void finalize(TemplateList & output)
  206 + {
  207 + (void) output;
  208 + }
  209 +
  210 + void init()
  211 + {
  212 + target_wait = 1000 / targetFPS;
  213 + timer.start();
  214 + }
  215 +
  216 +protected:
  217 + QElapsedTimer timer;
  218 + qint64 target_wait;
  219 +};
  220 +BR_REGISTER(Transform, FPSSynch)
  221 +
  222 +class FPSCalc : public TimeVaryingTransform
  223 +{
  224 + Q_OBJECT
  225 + Q_PROPERTY(int targetFPS READ get_targetFPS WRITE set_targetFPS RESET reset_targetFPS)
  226 + BR_PROPERTY(int, targetFPS, 30)
  227 +
  228 +
  229 +public:
  230 + FPSCalc() : TimeVaryingTransform(false, false) { initialized = false; }
  231 +
  232 + ~FPSCalc() {}
  233 +
  234 + void train(const TemplateList &data) { (void) data; }
  235 +
  236 +
  237 + void projectUpdate(const TemplateList &src, TemplateList &dst)
  238 + {
  239 + dst = src;
  240 +
  241 + if (!initialized) {
  242 + initialized = true;
  243 + timer.start();
  244 + }
  245 + framesSeen++;
  246 +
  247 + if (dst.empty())
  248 + return;
  249 +
  250 + qint64 elapsed = timer.elapsed();
  251 + if (elapsed > 1000) {
  252 + double fps = 1000 * framesSeen / elapsed;
  253 + //output.data.last().file.set("FrameNumber", output.sequenceNumber);
  254 + dst.first().file.set("AvgFPS", fps);
  255 + }
  256 + }
  257 +
  258 + void finalize(TemplateList & output)
  259 + {
  260 + (void) output;
  261 + }
  262 +
  263 + void init()
  264 + {
  265 + initialized = false;
  266 + framesSeen = 0;
  267 + }
  268 +
  269 +protected:
  270 + bool initialized;
  271 + QElapsedTimer timer;
  272 + qint64 framesSeen;
  273 +};
  274 +BR_REGISTER(Transform, FPSCalc)
  275 +
  276 +
  277 +
  278 +
157 279 } // namespace br
158 280  
159 281 #include "gui.moc"
... ...
openbr/plugins/ipc2013.cpp
1   -#include <openbr_plugin.h>
  1 +#include <openbr/openbr_plugin.h>
2 2 #include <pxcaccelerator.h>
3 3 #include <pxcface.h>
4 4 #include <pxcimage.h>
... ...
openbr/plugins/nec3.cpp
... ... @@ -4,7 +4,7 @@
4 4  
5 5 #include <NeoFacePro.h>
6 6  
7   -#include <openbr_plugin.h>
  7 +#include <openbr/openbr_plugin.h>
8 8 #include "core/resource.h"
9 9  
10 10 using namespace br;
... ...
openbr/plugins/nt4.cpp
... ... @@ -10,7 +10,7 @@
10 10 #include <NMatcherParams.h>
11 11 #include <NTemplate.h>
12 12 #include <NLicensing.h>
13   -#include <openbr_plugin.h>
  13 +#include <openbr/openbr_plugin.h>
14 14  
15 15 //IRIS
16 16 #include <NEExtractor.h>
... ...
openbr/plugins/pbd.cpp
... ... @@ -2,7 +2,7 @@
2 2  
3 3 #include <opencv2/highgui/highgui.hpp>
4 4  
5   -#include <openbr_plugin.h>
  5 +#include <openbr/openbr_plugin.h>
6 6  
7 7 #include "core/opencvutils.h"
8 8  
... ...
openbr/plugins/pp5.cpp
... ... @@ -11,7 +11,7 @@
11 11 #include <pittpatt_raw_image_io.h>
12 12 #include <pittpatt_sdk.h>
13 13 #include <pittpatt_license.h>
14   -#include <openbr_plugin.h>
  14 +#include <openbr/openbr_plugin.h>
15 15  
16 16 #include "openbr/core/resource.h"
17 17  
... ...
openbr/plugins/stasm.cpp
1 1 #include <stasm_dll.hpp>
2 2 #include <opencv2/highgui/highgui.hpp>
3   -#include <openbr_plugin.h>
  3 +#include <openbr/openbr_plugin.h>
4 4  
5 5 using namespace cv;
6 6  
... ...
openbr/plugins/stream.cpp
... ... @@ -4,6 +4,7 @@
4 4 #include <QSemaphore>
5 5 #include <QMap>
6 6 #include <opencv/highgui.h>
  7 +#include <QtConcurrent>
7 8 #include <openbr/openbr_plugin.h>
8 9  
9 10 #include "openbr/core/common.h"
... ... @@ -33,70 +34,7 @@ public:
33 34  
34 35 virtual void addItem(FrameData * input)=0;
35 36  
36   - virtual FrameData * getItem()=0;
37   -
38   - virtual void stoppedInput() =0;
39   - virtual void startInput() = 0;
40   -};
41   -
42   -// For 1 - n boundaries, a buffer class with a single shared buffer, a mutex
43   -// is used to serialize all access to the buffer.
44   -class SingleBuffer : public SharedBuffer
45   -{
46   -public:
47   - SingleBuffer() { no_input = false; }
48   -
49   - void stoppedInput()
50   - {
51   - QMutexLocker bufferLock(&bufferGuard);
52   - no_input = true;
53   - // Release anything waiting for input items.
54   - availableInput.wakeAll();
55   - }
56   -
57   - // There will be more input
58   - void startInput()
59   - {
60   - QMutexLocker bufferLock(&bufferGuard);
61   - no_input = false;
62   - }
63   -
64   - void addItem(FrameData * input)
65   - {
66   - QMutexLocker bufferLock(&bufferGuard);
67   -
68   - buffer.append(input);
69   -
70   - availableInput.wakeOne();
71   - }
72   -
73   - FrameData * getItem()
74   - {
75   - QMutexLocker bufferLock(&bufferGuard);
76   -
77   - if (buffer.empty()) {
78   - // If no further items will come we are done here
79   - if (no_input)
80   - return NULL;
81   - // Wait for an item
82   - availableInput.wait(&bufferGuard);
83   - }
84   -
85   - // availableInput was signalled, but the buffer is still empty? We're done here.
86   - if (buffer.empty())
87   - return NULL;
88   -
89   - FrameData * output = buffer.first();
90   - buffer.removeFirst();
91   - return output;
92   - }
93   -
94   -private:
95   - QMutex bufferGuard;
96   - QWaitCondition availableInput;
97   - bool no_input;
98   -
99   - QList<FrameData *> buffer;
  37 + virtual FrameData * tryGetItem()=0;
100 38 };
101 39  
102 40 // for n - 1 boundaries, multiple threads call addItem, the frames are
... ... @@ -107,56 +45,26 @@ class SequencingBuffer : public SharedBuffer
107 45 public:
108 46 SequencingBuffer()
109 47 {
110   - no_input = false;
111 48 next_target = 0;
112 49 }
113 50  
114   - void stoppedInput()
115   - {
116   - QMutexLocker bufferLock(&bufferGuard);
117   - no_input = true;
118   - // Release anything waiting for input items.
119   - availableInput.wakeAll();
120   - }
121   -
122   - // There will be more input
123   - void startInput()
124   - {
125   - QMutexLocker bufferLock(&bufferGuard);
126   - no_input = false;
127   - }
128   -
129 51 void addItem(FrameData * input)
130 52 {
131 53 QMutexLocker bufferLock(&bufferGuard);
132 54  
133 55 buffer.insert(input->sequenceNumber, input);
134   -
135   - if (input->sequenceNumber == next_target) {
136   - availableInput.wakeOne();
137   - }
138 56 }
139 57  
140   - FrameData * getItem()
  58 + FrameData * tryGetItem()
141 59 {
142 60 QMutexLocker bufferLock(&bufferGuard);
143 61  
144 62 if (buffer.empty() || buffer.begin().key() != this->next_target) {
145   - if (buffer.empty() && no_input) {
146   - next_target = 0;
147   - return NULL;
148   - }
149   - availableInput.wait(&bufferGuard);
150   - }
151   -
152   - // availableInput was signalled, but the buffer is empty? We're done here.
153   - if (buffer.empty()) {
154   - next_target = 0;
155 63 return NULL;
156 64 }
157 65  
158 66 QMap<int, FrameData *>::Iterator result = buffer.begin();
159   - //next_target++;
  67 +
160 68 if (next_target != result.value()->sequenceNumber) {
161 69 qWarning("mismatched targets!");
162 70 }
... ... @@ -170,9 +78,6 @@ public:
170 78  
171 79 private:
172 80 QMutex bufferGuard;
173   - QWaitCondition availableInput;
174   - bool no_input;
175   -
176 81 int next_target;
177 82  
178 83 QMap<int, FrameData *> buffer;
... ... @@ -192,31 +97,16 @@ public:
192 97 outputBuffer = &buffer2;
193 98 }
194 99  
195   - void stoppedInput()
196   - {
197   - QWriteLocker bufferLock(&bufferGuard);
198   - no_input = true;
199   - // Release anything waiting for input items.
200   - availableInput.wakeAll();
201   - }
202   -
203   - // There will be more input
204   - void startInput()
205   - {
206   - QWriteLocker bufferLock(&bufferGuard);
207   - no_input = false;
208   - }
209 100  
210 101 // called from the producer thread
211 102 void addItem(FrameData * input)
212 103 {
213 104 QReadLocker readLock(&bufferGuard);
214 105 inputBuffer->append(input);
215   - availableInput.wakeOne();
216 106 }
217 107  
218   - // Called from the consumer thread
219   - FrameData * getItem() {
  108 + FrameData * tryGetItem()
  109 + {
220 110 QReadLocker readLock(&bufferGuard);
221 111  
222 112 // There is something for us to get
... ... @@ -233,15 +123,7 @@ public:
233 123  
234 124 // Nothing on the input buffer either?
235 125 if (inputBuffer->empty()) {
236   - // If nothing else is coming, return null
237   - if (no_input)
238   - return NULL;
239   - //otherwise, wait on the input buffer
240   - availableInput.wait(&bufferGuard);
241   - // Did we get woken up because no more input is coming? if so
242   - // we're done here
243   - if (no_input && inputBuffer->empty())
244   - return NULL;
  126 + return NULL;
245 127 }
246 128  
247 129 // input buffer is non-empty, so swap the buffers
... ... @@ -259,10 +141,7 @@ private:
259 141 // removing from this buffer can remove things from the current
260 142 // output buffer if it has a read lock, or swap the buffers if it
261 143 // has a write lock.
262   - // Checking/modifying no_input requires a write lock.
263 144 QReadWriteLock bufferGuard;
264   - QWaitCondition availableInput;
265   - bool no_input;
266 145  
267 146 // The buffer that is currently being added to
268 147 QList<FrameData *> * inputBuffer;
... ... @@ -281,44 +160,71 @@ private:
281 160 class DataSource
282 161 {
283 162 public:
284   - DataSource(int maxFrames=100)
  163 + DataSource(int maxFrames=Globals->parallelism + 1)
285 164 {
  165 + final_frame = -1;
  166 + last_issued = -2;
286 167 for (int i=0; i < maxFrames;i++)
287 168 {
288 169 allFrames.addItem(new FrameData());
289 170 }
290   - allFrames.startInput();
291 171 }
292 172  
293 173 virtual ~DataSource()
294 174 {
295   - allFrames.stoppedInput();
296 175 while (true)
297 176 {
298   - FrameData * frame = allFrames.getItem();
  177 + FrameData * frame = allFrames.tryGetItem();
299 178 if (frame == NULL)
300 179 break;
301 180 delete frame;
302 181 }
303 182 }
304 183  
305   - FrameData * getFrame()
  184 + // non-blocking version of getFrame
  185 + FrameData * tryGetFrame()
306 186 {
307   - FrameData * aFrame = allFrames.getItem();
  187 + FrameData * aFrame = allFrames.tryGetItem();
  188 + if (aFrame == NULL)
  189 + return NULL;
  190 +
308 191 aFrame->data.clear();
309 192 aFrame->sequenceNumber = -1;
310 193  
311 194 bool res = getNext(*aFrame);
312 195 if (!res) {
313 196 allFrames.addItem(aFrame);
  197 + // Datasource broke?
  198 + QMutexLocker lock(&last_frame_update);
  199 +
  200 + final_frame = last_issued;
  201 + if (final_frame == last_received)
  202 + lastReturned.wakeAll();
  203 + else if (final_frame < last_received)
  204 + std::cout << "Bad last frame " << final_frame << " but received " << last_received << std::endl;
314 205 return NULL;
315 206 }
  207 + last_issued = aFrame->sequenceNumber;
316 208 return aFrame;
317 209 }
318 210  
319   - void returnFrame(FrameData * inputFrame)
  211 + bool returnFrame(FrameData * inputFrame)
320 212 {
321 213 allFrames.addItem(inputFrame);
  214 +
  215 + QMutexLocker lock(&last_frame_update);
  216 + last_received = inputFrame->sequenceNumber;
  217 + if (inputFrame->sequenceNumber == final_frame) {
  218 + lastReturned.wakeAll();
  219 + }
  220 +
  221 + return this->final_frame != -1;
  222 + }
  223 +
  224 + void waitLast()
  225 + {
  226 + QMutexLocker lock(&last_frame_update);
  227 + lastReturned.wait(&last_frame_update);
322 228 }
323 229  
324 230 virtual void close() = 0;
... ... @@ -329,6 +235,12 @@ public:
329 235  
330 236 protected:
331 237 DoubleBuffer allFrames;
  238 + int final_frame;
  239 + int last_issued;
  240 + int last_received;
  241 +
  242 + QWaitCondition lastReturned;
  243 + QMutex last_frame_update;
332 244 };
333 245  
334 246 // Read a video frame by frame using cv::VideoCapture
... ... @@ -339,6 +251,9 @@ public:
339 251  
340 252 bool open(Template &input)
341 253 {
  254 + final_frame = -1;
  255 + last_issued = -2;
  256 +
342 257 next_idx = 0;
343 258 basis = input;
344 259 video.open(input.file.name.toStdString());
... ... @@ -365,6 +280,7 @@ private:
365 280 if (!res) {
366 281 return false;
367 282 }
  283 + output.data.last().file.set("FrameNumber", output.sequenceNumber);
368 284 return true;
369 285 }
370 286  
... ... @@ -388,6 +304,9 @@ public:
388 304 basis = input;
389 305 current_idx = 0;
390 306 next_sequence = 0;
  307 + final_frame = -1;
  308 + last_issued = -2;
  309 +
391 310 return isOpen();
392 311 }
393 312  
... ... @@ -448,6 +367,9 @@ public:
448 367 {
449 368 close();
450 369 bool open_res = false;
  370 + final_frame = -1;
  371 + last_issued = -2;
  372 +
451 373 // Input has no matrices? Its probably a video that hasn't been loaded yet
452 374 if (input.empty()) {
453 375 actualSource = new VideoDataSource(0);
... ... @@ -479,84 +401,194 @@ protected:
479 401  
480 402 class ProcessingStage : public QRunnable
481 403 {
  404 +public:
482 405 friend class StreamTransform;
483 406 public:
484 407 ProcessingStage(int nThreads = 1)
485 408 {
486 409 thread_count = nThreads;
487   - activeThreads.release(thread_count);
488 410 setAutoDelete(false);
489 411 }
490 412  
491   - void markStart()
492   - {
493   - activeThreads.acquire();
494   - }
  413 + virtual void run()=0;
495 414  
496   - void waitStop()
497   - {
498   - // Wait until all threads have stopped
499   - activeThreads.acquire(thread_count);
500   - activeThreads.release(thread_count);
501   - }
  415 + virtual void nextStageRun(FrameData * input)=0;
502 416  
503 417 protected:
504   - void markStop()
505   - {
506   - activeThreads.release();
507   - }
508   - QSemaphore activeThreads;
509 418 int thread_count;
510 419  
511 420 SharedBuffer * inputBuffer;
512   - SharedBuffer * outputBuffer;
  421 + ProcessingStage * nextStage;
513 422 Transform * transform;
514 423 int stage_id;
515 424  
  425 +};
  426 +
  427 +class MultiThreadStage;
  428 +
  429 +void multistage_run(MultiThreadStage * basis, FrameData * input);
  430 +
  431 +class MultiThreadStage : public ProcessingStage
  432 +{
  433 +public:
  434 + MultiThreadStage(int _input) : ProcessingStage(_input) {}
  435 +
  436 + friend void multistage_run(MultiThreadStage * basis, FrameData * input);
  437 +
  438 + void run()
  439 + {
  440 + qFatal("no don't do it!");
  441 + }
  442 +
  443 + // Called from a different thread than run
  444 + virtual void nextStageRun(FrameData * input)
  445 + {
  446 + QtConcurrent::run(multistage_run, this, input);
  447 + }
  448 +};
  449 +
  450 +void multistage_run(MultiThreadStage * basis, FrameData * input)
  451 +{
  452 + if (input == NULL)
  453 + qFatal("null input to multi-thread stage");
  454 + // Project the input we got
  455 + basis->transform->projectUpdate(input->data);
  456 +
  457 + basis->nextStage->nextStageRun(input);
  458 +}
  459 +
  460 +class SingleThreadStage : public ProcessingStage
  461 +{
516 462 public:
  463 + SingleThreadStage(bool input_variance) : ProcessingStage(1)
  464 + {
  465 + currentStatus = STOPPING;
  466 + next_target = 0;
  467 + if (input_variance)
  468 + this->inputBuffer = new DoubleBuffer();
  469 + else
  470 + this->inputBuffer = new SequencingBuffer();
  471 + }
  472 + ~SingleThreadStage()
  473 + {
  474 + delete inputBuffer;
  475 + }
  476 +
  477 + int next_target;
  478 + enum Status
  479 + {
  480 + STARTING,
  481 + STOPPING
  482 + };
  483 + QReadWriteLock statusLock;
  484 + Status currentStatus;
  485 +
517 486 // We should start, and enter a wait on input data
518 487 void run()
519 488 {
520   - markStart();
  489 + FrameData * currentItem;
521 490 forever
522 491 {
523   - FrameData * currentItem = inputBuffer->getItem();
  492 + // Whether or not we get a valid item controls whether or not we
  493 + QWriteLocker lock(&statusLock);
  494 + currentItem = inputBuffer->tryGetItem();
524 495 if (currentItem == NULL)
525   - break;
  496 + {
  497 + this->currentStatus = STOPPING;
  498 + return;
  499 + }
  500 + lock.unlock();
  501 + if (currentItem->sequenceNumber != next_target)
  502 + {
  503 + qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target);
  504 + }
  505 + next_target = currentItem->sequenceNumber + 1;
  506 +
526 507 // Project the input we got
527 508 transform->projectUpdate(currentItem->data);
528   - // Add the result to the ouptut buffer
529   - outputBuffer->addItem(currentItem);
  509 +
  510 + this->nextStage->nextStageRun(currentItem);
530 511 }
531   - markStop();
532 512 }
533   -};
534 513  
  514 + // Calledfrom a different thread than run.
  515 + void nextStageRun(FrameData * input)
  516 + {
  517 + // add to our input buffer
  518 + inputBuffer->addItem(input);
  519 + QReadLocker lock(&statusLock);
  520 + if (currentStatus == STARTING)
  521 + return;
  522 +
  523 + // Have to change to a write lock to modify currentStatus
  524 + lock.unlock();
  525 + QWriteLocker writeLock(&statusLock);
  526 + // But someone might have changed it between locks
  527 + if (currentStatus == STARTING)
  528 + return;
  529 + // Ok we can start a thread
  530 + QThreadPool::globalInstance()->start(this);
  531 + currentStatus = STARTING;
  532 + }
  533 +};
535 534  
536 535 // No input buffer, instead we draw templates from some data source
537 536 // Will be operated by the main thread for the stream
538   -class FirstStage : public ProcessingStage
  537 +class FirstStage : public SingleThreadStage
539 538 {
540 539 public:
  540 + FirstStage() : SingleThreadStage(true) {}
  541 +
541 542 DataSourceManager dataSource;
542 543 // Start drawing frames from the datasource.
543 544 void run()
544 545 {
  546 + FrameData * currentItem;
545 547 forever
546 548 {
547   - //FrameData * aFrame = dataSource.getNext();
548   - FrameData * aFrame = dataSource.getFrame();
549   - if (aFrame == NULL)
550   - break;
551   - outputBuffer->addItem(aFrame);
  549 + // Whether or not we get a valid item controls whether or not we
  550 + QWriteLocker lock(&statusLock);
  551 +
  552 + currentItem = this->dataSource.tryGetFrame();
  553 + if (currentItem == NULL)
  554 + {
  555 + this->currentStatus = STOPPING;
  556 + return;
  557 + }
  558 + lock.unlock();
  559 + if (currentItem->sequenceNumber != next_target)
  560 + {
  561 + qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target);
  562 + }
  563 + next_target = currentItem->sequenceNumber + 1;
  564 +
  565 + this->nextStage->nextStageRun(currentItem);
552 566 }
553   - this->markStop();
554 567 }
  568 +
  569 + void nextStageRun(FrameData * input)
  570 + {
  571 + QWriteLocker lock(&statusLock);
  572 +
  573 + // Return the frame to the frame buffer
  574 + bool res = dataSource.returnFrame(input);
  575 + // If the data source broke already, we're done.
  576 + if (res)
  577 + return;
  578 +
  579 + if (currentStatus == STARTING)
  580 + return;
  581 +
  582 + currentStatus = STARTING;
  583 + QThreadPool::globalInstance()->start(this, this->next_target);
  584 + }
  585 +
555 586 };
556 587  
557   -class LastStage : public ProcessingStage
  588 +class LastStage : public SingleThreadStage
558 589 {
559 590 public:
  591 + LastStage(bool _prev_stage_variance) : SingleThreadStage(_prev_stage_variance) {}
560 592 TemplateList getOutput()
561 593 {
562 594 return collectedOutput;
... ... @@ -565,28 +597,35 @@ public:
565 597 private:
566 598 TemplateList collectedOutput;
567 599 public:
568   - DataSource * data;
569 600 void run()
570 601 {
571 602 forever
572 603 {
573   - // Wait for input
574   - FrameData * frame = inputBuffer->getItem();
575   - if (frame == NULL)
  604 + QWriteLocker lock(&statusLock);
  605 + FrameData * currentItem = inputBuffer->tryGetItem();
  606 + if (currentItem == NULL)
  607 + {
  608 + currentStatus = STOPPING;
576 609 break;
  610 + }
  611 + lock.unlock();
  612 +
  613 + if (currentItem->sequenceNumber != next_target)
  614 + {
  615 + qFatal("out of order frames for collection stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target);
  616 + }
  617 + next_target = currentItem->sequenceNumber + 1;
  618 +
577 619 // Just put the item on collectedOutput
578   - collectedOutput.append(frame->data);
579   - // Return the frame to the input frame buffer
580   - data->returnFrame(frame);
  620 + collectedOutput.append(currentItem->data);
  621 + this->nextStage->nextStageRun(currentItem);
581 622 }
582   - this->markStop();
583 623 }
584 624 };
585 625  
586 626 class StreamTransform : public CompositeTransform
587 627 {
588 628 Q_OBJECT
589   - int threads_per_multi_stage;
590 629 public:
591 630 void train(const TemplateList & data)
592 631 {
... ... @@ -627,40 +666,15 @@ public:
627 666 return;
628 667 }
629 668  
630   - // Tell all buffers to expect input
631   - for (int i=0; i < sharedBuffers.size(); i++) {
632   - sharedBuffers[i]->startInput();
633   - }
634   -
635   - // Start our processing stages
636   - for (int i=0; i < this->processingStages.size(); i++) {
637   - int count = stage_variance[i] ? 1 : threads_per_multi_stage;
638   - for (int j =0; j < count; j ++) processingThreads.start(processingStages[i]);
639   - }
640   -
641   - // Start the final stage
642   - processingThreads.start(&collectionStage);
643   -
644   - // Run the read stage ourselves
645   - readStage.run();
646   -
647   - // The read stage has stopped (since we ran the read stage).
648   - // Step over the buffers, and call stoppedInput to tell the stage
649   - // reading from each buffer that no more frames will be added after
650   - // the current ones run out, then wait for the thread to finish.
651   - for (int i =0; i < (sharedBuffers.size() - 1); i++) {
652   - // Indicate that no more input will be available
653   - sharedBuffers[i]->stoppedInput();
654   -
655   - // Wait for the thread to finish.
656   - this->processingStages[i]->waitStop();
657   - }
658   - // Wait for the collection stage to finish
659   - sharedBuffers.last()->stoppedInput();
660   - collectionStage.waitStop();
  669 + QThreadPool::globalInstance()->releaseThread();
  670 + readStage.currentStatus = SingleThreadStage::STARTING;
  671 + QThreadPool::globalInstance()->start(&readStage, 0);
  672 + // Wait for the end.
  673 + readStage.dataSource.waitLast();
  674 + QThreadPool::globalInstance()->reserveThread();
661 675  
662 676 // dst is set to all output received by the final stage
663   - dst = collectionStage.getOutput();
  677 + dst = collectionStage->getOutput();
664 678 }
665 679  
666 680 virtual void finalize(TemplateList & output)
... ... @@ -672,78 +686,47 @@ public:
672 686 // Create and link stages
673 687 void init()
674 688 {
675   - int thread_count = 0;
676   - threads_per_multi_stage = 4;
  689 + if (transforms.isEmpty()) return;
  690 +
677 691 stage_variance.reserve(transforms.size());
678 692 foreach (const br::Transform *transform, transforms) {
679 693 stage_variance.append(transform->timeVarying());
680   - thread_count += transform->timeVarying() ? 1 : threads_per_multi_stage;
681 694 }
682   - if (transforms.isEmpty()) return;
683   -
684   - // Set up the thread pool, 1 stage for each transform, as well as first
685   - // and last stages, but the first stage is operated by the thread that
686   - // calls project so the pool only needs nTransforms+1 total.
687   - processingThreads.setMaxThreadCount(thread_count + 1);
688   -
689   -
690   - // buffer 0 -- output buffer for the read stage, input buffer for
691   - // first transform. Is that transform time-varying?
692   - if (stage_variance[0])
693   - sharedBuffers.append(new DoubleBuffer());
694   - // If not, we can run multiple threads
695   - else
696   - sharedBuffers.append(new SingleBuffer());
697 695  
698   - readStage.outputBuffer = sharedBuffers.last();
699 696 readStage.stage_id = 0;
700 697  
701 698 int next_stage_id = 1;
702   -
703 699 int lastBufferIdx = 0;
  700 + bool prev_stage_variance = true;
704 701 for (int i =0; i < transforms.size(); i++)
705 702 {
706   - // Set up this stage
707   - processingStages.append(new ProcessingStage(stage_variance[i] ? 1 : threads_per_multi_stage));
  703 + if (stage_variance[i])
  704 + {
  705 + processingStages.append(new SingleThreadStage(prev_stage_variance));
  706 + }
  707 + else
  708 + processingStages.append(new MultiThreadStage(Globals->parallelism));
708 709  
709 710 processingStages.last()->stage_id = next_stage_id++;
710   - processingStages.last()->inputBuffer = sharedBuffers[lastBufferIdx];
711   - lastBufferIdx++;
712 711  
713   - // This stage's output buffer, next stage's input buffer. If this is
714   - // the last transform, the next stage is the (time varying) collection
715   - // stage
716   - bool next_variance = (i+1) < transforms.size() ? stage_variance[i+1] : true;
717   - bool current_variance = stage_variance[i];
718   - // if this is a single threaded stage
719   - if (current_variance)
720   - {
721   - // 1 - 1 case
722   - if (next_variance)
723   - sharedBuffers.append(new DoubleBuffer());
724   - // 1 - n case
725   - else
726   - sharedBuffers.append(new SingleBuffer());
727   - }
728   - // This is a multi-threaded stage
  712 + // link nextStage pointers
  713 + if (i == 0)
  714 + this->readStage.nextStage = processingStages[i];
729 715 else
730   - {
731   - // If the next stage is single threaded, we need to sequence our
732   - // output (n - 1 case)
733   - if (next_variance)
734   - sharedBuffers.append(new SequencingBuffer());
735   - // Otherwise, this is an n-n boundary and we don't need to
736   - // adhere to any particular sequence
737   - else
738   - sharedBuffers.append(new SingleBuffer());
739   - }
740   - processingStages.last()->outputBuffer = sharedBuffers.last();
  716 + processingStages[i-1]->nextStage = processingStages[i];
  717 +
  718 + lastBufferIdx++;
  719 +
741 720 processingStages.last()->transform = transforms[i];
  721 + prev_stage_variance = stage_variance[i];
742 722 }
743 723  
744   - collectionStage.inputBuffer = sharedBuffers.last();
745   - collectionStage.data = &readStage.dataSource;
746   - collectionStage.stage_id = next_stage_id;
  724 + collectionStage = new LastStage(prev_stage_variance);
  725 + collectionStage->stage_id = next_stage_id;
  726 +
  727 + // It's a ring buffer, get it?
  728 + processingStages.last()->nextStage = collectionStage;
  729 + collectionStage->nextStage = &readStage;
747 730 }
748 731  
749 732 ~StreamTransform()
... ... @@ -751,22 +734,16 @@ public:
751 734 for (int i = 0; i < processingStages.size(); i++) {
752 735 delete processingStages[i];
753 736 }
754   - for (int i = 0; i < sharedBuffers.size(); i++) {
755   - delete sharedBuffers[i];
756   - }
757   -
  737 + delete collectionStage;
758 738 }
759 739  
760 740 protected:
761 741 QList<bool> stage_variance;
762 742  
763 743 FirstStage readStage;
764   - LastStage collectionStage;
  744 + LastStage * collectionStage;
765 745  
766 746 QList<ProcessingStage *> processingStages;
767   - QList<SharedBuffer *> sharedBuffers;
768   -
769   - QThreadPool processingThreads;
770 747  
771 748 void _project(const Template &src, Template &dst) const
772 749 {
... ...