Commit 0d7cfeeff9b9405d488ec351a2758910b51c682f

Authored by jklontz
2 parents 9d6685d3 f2165737

Merge pull request #79 from biometrics/distribute_priority

A way to avoid deadlocks when using stream inside a distribute transform
openbr/core/resource.h
@@ -24,6 +24,7 @@ @@ -24,6 +24,7 @@
24 #include <QSharedPointer> 24 #include <QSharedPointer>
25 #include <QString> 25 #include <QString>
26 #include <QThread> 26 #include <QThread>
  27 +#include <openbr/openbr_plugin.h>
27 28
28 template <typename T> 29 template <typename T>
29 class ResourceMaker 30 class ResourceMaker
@@ -52,7 +53,7 @@ public: @@ -52,7 +53,7 @@ public:
52 : resourceMaker(rm) 53 : resourceMaker(rm)
53 , availableResources(new QList<T*>()) 54 , availableResources(new QList<T*>())
54 , lock(new QMutex()) 55 , lock(new QMutex())
55 - , totalResources(new QSemaphore(QThread::idealThreadCount())) 56 + , totalResources(new QSemaphore(br::Globals->parallelism))
56 {} 57 {}
57 58
58 ~Resource() 59 ~Resource()
openbr/plugins/meta.cpp
@@ -645,17 +645,28 @@ public: @@ -645,17 +645,28 @@ public:
645 QList<TemplateList> input_buffer; 645 QList<TemplateList> input_buffer;
646 input_buffer.reserve(src.size()); 646 input_buffer.reserve(src.size());
647 647
  648 + QFutureSynchronizer<void> futures;
  649 +
648 for (int i =0; i < src.size();i++) { 650 for (int i =0; i < src.size();i++) {
649 input_buffer.append(TemplateList()); 651 input_buffer.append(TemplateList());
650 output_buffer.append(TemplateList()); 652 output_buffer.append(TemplateList());
651 } 653 }
652 -  
653 - QFutureSynchronizer<void> futures; 654 + QList<QFuture<void> > temp;
  655 + temp.reserve(src.size());
654 for (int i=0; i<src.size(); i++) { 656 for (int i=0; i<src.size(); i++) {
655 input_buffer[i].append(src[i]); 657 input_buffer[i].append(src[i]);
656 - if (Globals->parallelism) futures.addFuture(QtConcurrent::run(_projectList, transform, &input_buffer[i], &output_buffer[i]));  
657 - else _projectList( transform, &input_buffer[i], &output_buffer[i]); 658 +
  659 + if (Globals->parallelism > 1) temp.append(QtConcurrent::run(_projectList, transform, &input_buffer[i], &output_buffer[i]));
  660 + else _projectList(transform, &input_buffer[i], &output_buffer[i]);
  661 + }
  662 + // We add the futures in reverse order, since in Qt 5.1 at least the
  663 + // waiting thread will wait on them in the order added (which for uniform priority
  664 + // threads is the order of execution), and we want the waiting thread to go in the opposite order
  665 + // so that it can steal runnables and do something besides wait.
  666 + for (int i = temp.size() - 1; i > 0; i--) {
  667 + futures.addFuture(temp[i]);
658 } 668 }
  669 +
659 futures.waitForFinished(); 670 futures.waitForFinished();
660 671
661 for (int i=0; i<src.size(); i++) dst.append(output_buffer[i]); 672 for (int i=0; i<src.size(); i++) dst.append(output_buffer[i]);
openbr/plugins/stream.cpp
@@ -160,9 +160,8 @@ class DataSource @@ -160,9 +160,8 @@ class DataSource
160 public: 160 public:
161 DataSource(int maxFrames=500) 161 DataSource(int maxFrames=500)
162 { 162 {
  163 + // The sequence number of the last frame
163 final_frame = -1; 164 final_frame = -1;
164 - last_issued = -2;  
165 - last_received = -3;  
166 for (int i=0; i < maxFrames;i++) 165 for (int i=0; i < maxFrames;i++)
167 { 166 {
168 allFrames.addItem(new FrameData()); 167 allFrames.addItem(new FrameData());
@@ -181,51 +180,64 @@ public: @@ -181,51 +180,64 @@ public:
181 } 180 }
182 181
183 // non-blocking version of getFrame 182 // non-blocking version of getFrame
184 - FrameData * tryGetFrame() 183 + // Returns a NULL FrameData if too many frames are out, or the
  184 + // data source is broken. Sets last_frame to true iff the FrameData
  185 + // returned is the last valid frame, and the data source is now broken.
  186 + FrameData * tryGetFrame(bool & last_frame)
185 { 187 {
  188 + last_frame = false;
  189 +
  190 + if (is_broken) {
  191 + return NULL;
  192 + }
  193 +
  194 + // Try to get a FrameData from the pool, if we can't it means too many
  195 + // frames are already out, and we will return NULL to indicate failure
186 FrameData * aFrame = allFrames.tryGetItem(); 196 FrameData * aFrame = allFrames.tryGetItem();
187 - if (aFrame == NULL) 197 + if (aFrame == NULL) {
188 return NULL; 198 return NULL;
  199 + }
189 200
190 aFrame->data.clear(); 201 aFrame->data.clear();
191 aFrame->sequenceNumber = -1; 202 aFrame->sequenceNumber = -1;
192 203
  204 + // Try to read a frame, if this returns false the data source is broken
193 bool res = getNext(*aFrame); 205 bool res = getNext(*aFrame);
194 206
195 - // The datasource broke.  
196 - if (!res) { 207 + // The datasource broke, update final_frame
  208 + if (!res)
  209 + {
  210 + QMutexLocker lock(&last_frame_update);
  211 + final_frame = lookAhead.back()->sequenceNumber;
197 allFrames.addItem(aFrame); 212 allFrames.addItem(aFrame);
  213 + }
  214 + else lookAhead.push_back(aFrame);
198 215
199 - QMutexLocker lock(&last_frame_update);  
200 - // Did we already receive the last frame?  
201 - final_frame = last_issued; 216 + FrameData * rVal = lookAhead.first();
  217 + lookAhead.pop_front();
202 218
203 - // We got the last frame before the data source broke,  
204 - // better pulse lastReturned  
205 - if (final_frame == last_received) {  
206 - lastReturned.wakeAll();  
207 - }  
208 - else if (final_frame < last_received)  
209 - std::cout << "Bad last frame " << final_frame << " but received " << last_received << std::endl;  
210 219
211 - return NULL; 220 + if (rVal->sequenceNumber == final_frame) {
  221 + last_frame = true;
  222 + is_broken = true;
212 } 223 }
213 - last_issued = aFrame->sequenceNumber;  
214 - return aFrame; 224 +
  225 + return rVal;
215 } 226 }
216 227
217 - // Returns true if the frame returned was the last 228 + // Return a frame to the pool, returns true if the frame returned was the last
218 // frame issued, false otherwise 229 // frame issued, false otherwise
219 bool returnFrame(FrameData * inputFrame) 230 bool returnFrame(FrameData * inputFrame)
220 { 231 {
  232 + int frameNumber = inputFrame->sequenceNumber;
  233 +
221 allFrames.addItem(inputFrame); 234 allFrames.addItem(inputFrame);
222 235
223 bool rval = false; 236 bool rval = false;
224 237
225 QMutexLocker lock(&last_frame_update); 238 QMutexLocker lock(&last_frame_update);
226 - last_received = inputFrame->sequenceNumber;  
227 239
228 - if (inputFrame->sequenceNumber == final_frame) { 240 + if (frameNumber == final_frame) {
229 // We just received the last frame, better pulse 241 // We just received the last frame, better pulse
230 lastReturned.wakeAll(); 242 lastReturned.wakeAll();
231 rval = true; 243 rval = true;
@@ -240,17 +252,57 @@ public: @@ -240,17 +252,57 @@ public:
240 lastReturned.wait(&last_frame_update); 252 lastReturned.wait(&last_frame_update);
241 } 253 }
242 254
243 - virtual void close() = 0;  
244 - virtual bool open(Template & output, int start_index=0) = 0;  
245 - virtual bool isOpen() = 0; 255 + bool open(Template & output, int start_index = 0)
  256 + {
  257 + is_broken = false;
  258 + // The last frame isn't initialized yet
  259 + final_frame = -1;
  260 + // Start our sequence numbers from the input index
  261 + next_sequence_number = start_index;
  262 +
  263 + // Actually open the data source
  264 + bool open_res = concreteOpen(output);
  265 +
  266 + // We couldn't open the data source
  267 + if (!open_res) {
  268 + is_broken = true;
  269 + return false;
  270 + }
  271 +
  272 + // Try to get a frame from the global pool
  273 + FrameData * firstFrame = allFrames.tryGetItem();
  274 +
  275 + // If this fails, things have gone pretty badly.
  276 + if (firstFrame == NULL) {
  277 + is_broken = true;
  278 + return false;
  279 + }
  280 +
  281 + // Read a frame from the video source
  282 + bool res = getNext(*firstFrame);
  283 +
  284 + // the data source broke already, we couldn't even get one frame
  285 + // from it.
  286 + if (!res) {
  287 + is_broken = true;
  288 + return false;
  289 + }
246 290
  291 + lookAhead.append(firstFrame);
  292 + return true;
  293 + }
  294 +
  295 + virtual bool isOpen()=0;
  296 + virtual bool concreteOpen(Template & output) = 0;
247 virtual bool getNext(FrameData & input) = 0; 297 virtual bool getNext(FrameData & input) = 0;
  298 + virtual void close() = 0;
248 299
  300 + int next_sequence_number;
249 protected: 301 protected:
250 DoubleBuffer allFrames; 302 DoubleBuffer allFrames;
251 int final_frame; 303 int final_frame;
252 - int last_issued;  
253 - int last_received; 304 + bool is_broken;
  305 + QList<FrameData *> lookAhead;
254 306
255 QWaitCondition lastReturned; 307 QWaitCondition lastReturned;
256 QMutex last_frame_update; 308 QMutex last_frame_update;
@@ -262,13 +314,8 @@ class VideoDataSource : public DataSource @@ -262,13 +314,8 @@ class VideoDataSource : public DataSource
262 public: 314 public:
263 VideoDataSource(int maxFrames) : DataSource(maxFrames) {} 315 VideoDataSource(int maxFrames) : DataSource(maxFrames) {}
264 316
265 - bool open(Template &input, int start_index=0) 317 + bool concreteOpen(Template &input)
266 { 318 {
267 - final_frame = -1;  
268 - last_issued = -2;  
269 - last_received = -3;  
270 -  
271 - next_idx = start_index;  
272 basis = input; 319 basis = input;
273 bool is_int = false; 320 bool is_int = false;
274 int anInt = input.file.name.toInt(&is_int); 321 int anInt = input.file.name.toInt(&is_int);
@@ -306,25 +353,31 @@ private: @@ -306,25 +353,31 @@ private:
306 return false; 353 return false;
307 354
308 output.data.append(Template(basis.file)); 355 output.data.append(Template(basis.file));
309 - output.data.last().append(cv::Mat()); 356 + output.data.last().m() = cv::Mat();
310 357
311 - output.sequenceNumber = next_idx;  
312 - next_idx++; 358 + output.sequenceNumber = next_sequence_number;
  359 + next_sequence_number++;
313 360
314 - bool res = video.read(output.data.last().last());  
315 - output.data.last().last() = output.data.last().last().clone(); 361 + cv::Mat temp;
  362 + bool res = video.read(temp);
316 363
317 if (!res) { 364 if (!res) {
  365 + output.data.last().m() = cv::Mat();
318 close(); 366 close();
319 return false; 367 return false;
320 } 368 }
  369 +
  370 + // This clone is critical, if we don't do it then the matrix will
  371 + // be an alias of an internal buffer of the video source, leading
  372 + // to various problems later.
  373 + output.data.last().m() = temp.clone();
  374 +
321 output.data.last().file.set("FrameNumber", output.sequenceNumber); 375 output.data.last().file.set("FrameNumber", output.sequenceNumber);
322 return true; 376 return true;
323 } 377 }
324 378
325 cv::VideoCapture video; 379 cv::VideoCapture video;
326 Template basis; 380 Template basis;
327 - int next_idx;  
328 }; 381 };
329 382
330 // Given a template as input, return its matrices one by one on subsequent calls 383 // Given a template as input, return its matrices one by one on subsequent calls
@@ -334,21 +387,16 @@ class TemplateDataSource : public DataSource @@ -334,21 +387,16 @@ class TemplateDataSource : public DataSource
334 public: 387 public:
335 TemplateDataSource(int maxFrames) : DataSource(maxFrames) 388 TemplateDataSource(int maxFrames) : DataSource(maxFrames)
336 { 389 {
337 - current_idx = INT_MAX; 390 + current_matrix_idx = INT_MAX;
338 data_ok = false; 391 data_ok = false;
339 } 392 }
340 - bool data_ok;  
341 393
342 - bool open(Template &input, int start_index=0) 394 + bool concreteOpen(Template &input)
343 { 395 {
344 basis = input; 396 basis = input;
345 - current_idx = 0;  
346 - next_sequence = start_index;  
347 - final_frame = -1;  
348 - last_issued = -2;  
349 - last_received = -3; 397 + current_matrix_idx = 0;
350 398
351 - data_ok = current_idx < basis.size(); 399 + data_ok = current_matrix_idx < basis.size();
352 return data_ok; 400 return data_ok;
353 } 401 }
354 402
@@ -358,39 +406,41 @@ public: @@ -358,39 +406,41 @@ public:
358 406
359 void close() 407 void close()
360 { 408 {
361 - current_idx = INT_MAX; 409 + current_matrix_idx = INT_MAX;
362 basis.clear(); 410 basis.clear();
363 } 411 }
364 412
365 private: 413 private:
366 bool getNext(FrameData & output) 414 bool getNext(FrameData & output)
367 { 415 {
368 - data_ok = current_idx < basis.size(); 416 + data_ok = current_matrix_idx < basis.size();
369 if (!data_ok) 417 if (!data_ok)
370 return false; 418 return false;
371 419
372 - output.data.append(basis[current_idx]);  
373 - current_idx++; 420 + output.data.append(basis[current_matrix_idx]);
  421 + current_matrix_idx++;
374 422
375 - output.sequenceNumber = next_sequence;  
376 - next_sequence++; 423 + output.sequenceNumber = next_sequence_number;
  424 + next_sequence_number++;
377 425
378 output.data.last().file.set("FrameNumber", output.sequenceNumber); 426 output.data.last().file.set("FrameNumber", output.sequenceNumber);
379 return true; 427 return true;
380 } 428 }
381 429
382 Template basis; 430 Template basis;
383 - int current_idx;  
384 - int next_sequence; 431 + // Index of the next matrix to output from the template
  432 + int current_matrix_idx;
  433 +
  434 + // is current_matrix_idx in bounds?
  435 + bool data_ok;
385 }; 436 };
386 437
387 -// Given a template as input, create a VideoDataSource or a TemplateDataSource  
388 -// depending on whether or not it looks like the input template has already  
389 -// loaded frames into memory. 438 +// Given a templatelist as input, create appropriate data source for each
  439 +// individual template
390 class DataSourceManager : public DataSource 440 class DataSourceManager : public DataSource
391 { 441 {
392 public: 442 public:
393 - DataSourceManager() 443 + DataSourceManager() : DataSource(500)
394 { 444 {
395 actualSource = NULL; 445 actualSource = NULL;
396 } 446 }
@@ -411,29 +461,25 @@ public: @@ -411,29 +461,25 @@ public:
411 461
412 bool open(TemplateList & input) 462 bool open(TemplateList & input)
413 { 463 {
414 - currentIdx = 0; 464 + current_template_idx = 0;
415 templates = input; 465 templates = input;
416 466
417 - return open(templates[currentIdx]); 467 + return DataSource::open(templates[current_template_idx]);
418 } 468 }
419 469
420 - bool open(Template & input, int start_index=0) 470 + bool concreteOpen(Template & input)
421 { 471 {
422 close(); 472 close();
423 - final_frame = -1;  
424 - last_issued = -2;  
425 - last_received = -3;  
426 - next_frame = start_index;  
427 473
428 // Input has no matrices? Its probably a video that hasn't been loaded yet 474 // Input has no matrices? Its probably a video that hasn't been loaded yet
429 if (input.empty()) { 475 if (input.empty()) {
430 actualSource = new VideoDataSource(0); 476 actualSource = new VideoDataSource(0);
431 - actualSource->open(input, next_frame); 477 + actualSource->concreteOpen(input);
432 } 478 }
433 else { 479 else {
434 // create frame dealer 480 // create frame dealer
435 actualSource = new TemplateDataSource(0); 481 actualSource = new TemplateDataSource(0);
436 - actualSource->open(input, next_frame); 482 + actualSource->concreteOpen(input);
437 } 483 }
438 if (!isOpen()) { 484 if (!isOpen()) {
439 delete actualSource; 485 delete actualSource;
@@ -446,30 +492,47 @@ public: @@ -446,30 +492,47 @@ public:
446 bool isOpen() { return !actualSource ? false : actualSource->isOpen(); } 492 bool isOpen() { return !actualSource ? false : actualSource->isOpen(); }
447 493
448 protected: 494 protected:
449 - int currentIdx;  
450 - int next_frame; 495 + // Index of the template in the templatelist we are currently reading from
  496 + int current_template_idx;
  497 +
451 TemplateList templates; 498 TemplateList templates;
452 DataSource * actualSource; 499 DataSource * actualSource;
453 bool getNext(FrameData & output) 500 bool getNext(FrameData & output)
454 { 501 {
455 bool res = actualSource->getNext(output); 502 bool res = actualSource->getNext(output);
  503 + output.sequenceNumber = next_sequence_number;
  504 +
456 if (res) { 505 if (res) {
457 - next_frame = output.sequenceNumber+1; 506 + output.data.last().file.set("FrameNumber", output.sequenceNumber);
  507 + next_sequence_number++;
  508 + if (output.data.last().last().empty())
  509 + qDebug("broken matrix");
458 return true; 510 return true;
459 } 511 }
460 512
  513 +
461 while(!res) { 514 while(!res) {
462 - currentIdx++; 515 + output.data.clear();
  516 + current_template_idx++;
463 517
464 - if (currentIdx >= templates.size()) 518 + // No more templates? We're done
  519 + if (current_template_idx >= templates.size())
465 return false; 520 return false;
466 - bool open_res = open(templates[currentIdx], next_frame); 521 +
  522 + // open the next data source
  523 + bool open_res = concreteOpen(templates[current_template_idx]);
467 if (!open_res) 524 if (!open_res)
468 return false; 525 return false;
  526 +
  527 + // get a frame from it
469 res = actualSource->getNext(output); 528 res = actualSource->getNext(output);
470 } 529 }
  530 + output.sequenceNumber = next_sequence_number++;
  531 + output.data.last().file.set("FrameNumber", output.sequenceNumber);
  532 +
  533 + if (output.data.last().last().empty())
  534 + qDebug("broken matrix");
471 535
472 - next_frame = output.sequenceNumber+1;  
473 return res; 536 return res;
474 } 537 }
475 538
@@ -477,9 +540,14 @@ protected: @@ -477,9 +540,14 @@ protected:
477 540
478 class ProcessingStage; 541 class ProcessingStage;
479 542
480 -class BasicLoop : public QRunnable 543 +class BasicLoop : public QRunnable, public QFutureInterface<void>
481 { 544 {
482 public: 545 public:
  546 + BasicLoop()
  547 + {
  548 + this->reportStarted();
  549 + }
  550 +
483 void run(); 551 void run();
484 552
485 QList<ProcessingStage *> * stages; 553 QList<ProcessingStage *> * stages;
@@ -505,13 +573,13 @@ public: @@ -505,13 +573,13 @@ public:
505 int stage_id; 573 int stage_id;
506 574
507 virtual void reset()=0; 575 virtual void reset()=0;
508 -  
509 protected: 576 protected:
510 int thread_count; 577 int thread_count;
511 578
512 SharedBuffer * inputBuffer; 579 SharedBuffer * inputBuffer;
513 ProcessingStage * nextStage; 580 ProcessingStage * nextStage;
514 QList<ProcessingStage *> * stages; 581 QList<ProcessingStage *> * stages;
  582 + QThreadPool * threads;
515 Transform * transform; 583 Transform * transform;
516 584
517 }; 585 };
@@ -530,6 +598,7 @@ void BasicLoop::run() @@ -530,6 +598,7 @@ void BasicLoop::run()
530 current_idx++; 598 current_idx++;
531 current_idx = current_idx % stages->size(); 599 current_idx = current_idx % stages->size();
532 } 600 }
  601 + this->reportFinished();
533 } 602 }
534 603
535 class MultiThreadStage : public ProcessingStage 604 class MultiThreadStage : public ProcessingStage
@@ -564,7 +633,6 @@ public: @@ -564,7 +633,6 @@ public:
564 } 633 }
565 }; 634 };
566 635
567 -  
568 class SingleThreadStage : public ProcessingStage 636 class SingleThreadStage : public ProcessingStage
569 { 637 {
570 public: 638 public:
@@ -627,18 +695,20 @@ public: @@ -627,18 +695,20 @@ public:
627 lock.unlock(); 695 lock.unlock();
628 696
629 if (newItem) 697 if (newItem)
630 - {  
631 - BasicLoop * next = new BasicLoop();  
632 - next->stages = stages;  
633 - next->start_idx = this->stage_id;  
634 - next->startItem = newItem;  
635 -  
636 - QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id);  
637 - } 698 + startThread(newItem);
638 699
639 return input; 700 return input;
640 } 701 }
641 702
  703 + void startThread(br::FrameData * newItem)
  704 + {
  705 + BasicLoop * next = new BasicLoop();
  706 + next->stages = stages;
  707 + next->start_idx = this->stage_id;
  708 + next->startItem = newItem;
  709 + this->threads->start(next, stages->size() - stage_id);
  710 + }
  711 +
642 712
643 // Calledfrom a different thread than run. 713 // Calledfrom a different thread than run.
644 bool tryAcquireNextStage(FrameData *& input) 714 bool tryAcquireNextStage(FrameData *& input)
@@ -674,7 +744,7 @@ public: @@ -674,7 +744,7 @@ public:
674 }; 744 };
675 745
676 // No input buffer, instead we draw templates from some data source 746 // No input buffer, instead we draw templates from some data source
677 -// Will be operated by the main thread for the stream 747 +// Will be operated by the main thread for the stream. starts threads
678 class FirstStage : public SingleThreadStage 748 class FirstStage : public SingleThreadStage
679 { 749 {
680 public: 750 public:
@@ -684,44 +754,51 @@ public: @@ -684,44 +754,51 @@ public:
684 754
685 FrameData * run(FrameData * input, bool & should_continue) 755 FrameData * run(FrameData * input, bool & should_continue)
686 { 756 {
687 - // Is there anything on our input buffer? If so we should start a thread with that. 757 + // Try to get a frame from the datasource
688 QWriteLocker lock(&statusLock); 758 QWriteLocker lock(&statusLock);
689 - input = dataSource.tryGetFrame();  
690 - // Datasource broke?  
691 - if (!input) 759 + bool last_frame = false;
  760 + input = dataSource.tryGetFrame(last_frame);
  761 +
  762 + // Datasource broke, or is currently out of frames?
  763 + if (!input || last_frame)
692 { 764 {
  765 + // We will just stop and not continue.
693 currentStatus = STOPPING; 766 currentStatus = STOPPING;
694 - should_continue = false;  
695 - return NULL; 767 + if (!input) {
  768 + should_continue = false;
  769 + return NULL;
  770 + }
696 } 771 }
697 lock.unlock(); 772 lock.unlock();
698 - 773 + // Can we enter the next stage?
699 should_continue = nextStage->tryAcquireNextStage(input); 774 should_continue = nextStage->tryAcquireNextStage(input);
700 775
701 - BasicLoop * next = new BasicLoop();  
702 - next->stages = stages;  
703 - next->start_idx = this->stage_id;  
704 - next->startItem = NULL;  
705 -  
706 - QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id); 776 + // We are exiting leaving this stage, should we start another
  777 + // thread here? Normally we will always re-queue a thread on
  778 + // the first stage, but if we received the last frame there is
  779 + // no need to.
  780 + if (!last_frame) {
  781 + startThread(NULL);
  782 + }
707 783
708 return input; 784 return input;
709 } 785 }
710 786
711 - // Calledfrom a different thread than run. 787 + // The last stage, trying to access the first stage
712 bool tryAcquireNextStage(FrameData *& input) 788 bool tryAcquireNextStage(FrameData *& input)
713 { 789 {
  790 + // Return the frame, was it the last one?
714 bool was_last = dataSource.returnFrame(input); 791 bool was_last = dataSource.returnFrame(input);
715 input = NULL; 792 input = NULL;
  793 +
  794 + // OK we won't continue.
716 if (was_last) { 795 if (was_last) {
717 return false; 796 return false;
718 } 797 }
719 798
720 - if (!dataSource.isOpen())  
721 - return false;  
722 -  
723 QReadLocker lock(&statusLock); 799 QReadLocker lock(&statusLock);
724 - // Thread is already running, we should just return 800 + // A thread is already in the first stage,
  801 + // we should just return
725 if (currentStatus == STARTING) 802 if (currentStatus == STARTING)
726 { 803 {
727 return false; 804 return false;
@@ -744,6 +821,7 @@ public: @@ -744,6 +821,7 @@ public:
744 821
745 }; 822 };
746 823
  824 +// starts threads
747 class LastStage : public SingleThreadStage 825 class LastStage : public SingleThreadStage
748 { 826 {
749 public: 827 public:
@@ -774,11 +852,14 @@ public: @@ -774,11 +852,14 @@ public:
774 } 852 }
775 next_target = input->sequenceNumber + 1; 853 next_target = input->sequenceNumber + 1;
776 854
  855 + // add the item to our output buffer
777 collectedOutput.append(input->data); 856 collectedOutput.append(input->data);
778 857
  858 + // Can we enter the read stage?
779 should_continue = nextStage->tryAcquireNextStage(input); 859 should_continue = nextStage->tryAcquireNextStage(input);
780 860
781 - // Is there anything on our input buffer? If so we should start a thread with that. 861 + // Is there anything on our input buffer? If so we should start a thread
  862 + // in this stage to process that frame.
782 QWriteLocker lock(&statusLock); 863 QWriteLocker lock(&statusLock);
783 FrameData * newItem = inputBuffer->tryGetItem(); 864 FrameData * newItem = inputBuffer->tryGetItem();
784 if (!newItem) 865 if (!newItem)
@@ -788,23 +869,18 @@ public: @@ -788,23 +869,18 @@ public:
788 lock.unlock(); 869 lock.unlock();
789 870
790 if (newItem) 871 if (newItem)
791 - {  
792 - BasicLoop * next = new BasicLoop();  
793 - next->stages = stages;  
794 - next->start_idx = this->stage_id;  
795 - next->startItem = newItem;  
796 -  
797 - QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id);  
798 - } 872 + startThread(newItem);
799 873
800 return input; 874 return input;
801 } 875 }
802 }; 876 };
803 877
  878 +
804 class StreamTransform : public CompositeTransform 879 class StreamTransform : public CompositeTransform
805 { 880 {
806 Q_OBJECT 881 Q_OBJECT
807 public: 882 public:
  883 +
808 void train(const TemplateList & data) 884 void train(const TemplateList & data)
809 { 885 {
810 foreach(Transform * transform, transforms) { 886 foreach(Transform * transform, transforms) {
@@ -834,21 +910,17 @@ public: @@ -834,21 +910,17 @@ public:
834 bool res = readStage->dataSource.open(dst); 910 bool res = readStage->dataSource.open(dst);
835 if (!res) return; 911 if (!res) return;
836 912
837 - QThreadPool::globalInstance()->releaseThread(); 913 + // Start the first thread in the stream.
838 readStage->currentStatus = SingleThreadStage::STARTING; 914 readStage->currentStatus = SingleThreadStage::STARTING;
  915 + readStage->startThread(NULL);
839 916
840 - BasicLoop loop;  
841 - loop.stages = &this->processingStages;  
842 - loop.start_idx = 0;  
843 - loop.startItem = NULL;  
844 - loop.setAutoDelete(false);  
845 -  
846 - QThreadPool::globalInstance()->start(&loop, processingStages.size() - processingStages[0]->stage_id);  
847 -  
848 - // Wait for the end. 917 + // Wait for the stream to reach the last frame available from
  918 + // the data source.
849 readStage->dataSource.waitLast(); 919 readStage->dataSource.waitLast();
850 - QThreadPool::globalInstance()->reserveThread();  
851 920
  921 + // Now that there are no more incoming frames, call finalize
  922 + // on each transform in turn to collect any last templates
  923 + // they wish to issue.
852 TemplateList final_output; 924 TemplateList final_output;
853 925
854 // Push finalize through the stages 926 // Push finalize through the stages
@@ -864,7 +936,8 @@ public: @@ -864,7 +936,8 @@ public:
864 final_output.append(output_set); 936 final_output.append(output_set);
865 } 937 }
866 938
867 - // dst is set to all output received by the final stage 939 + // dst is set to all output received by the final stage, along
  940 + // with anything output via the calls to finalize.
868 dst = collectionStage->getOutput(); 941 dst = collectionStage->getOutput();
869 dst.append(final_output); 942 dst.append(final_output);
870 943
@@ -876,7 +949,8 @@ public: @@ -876,7 +949,8 @@ public:
876 virtual void finalize(TemplateList & output) 949 virtual void finalize(TemplateList & output)
877 { 950 {
878 (void) output; 951 (void) output;
879 - // Not handling this yet -cao 952 + // Nothing in particular to do here, stream calls finalize
  953 + // on all child transforms as part of projectUpdate
880 } 954 }
881 955
882 // Create and link stages 956 // Create and link stages
@@ -884,6 +958,19 @@ public: @@ -884,6 +958,19 @@ public:
884 { 958 {
885 if (transforms.isEmpty()) return; 959 if (transforms.isEmpty()) return;
886 960
  961 + // We share a thread pool across streams attached to the same
  962 + // parent tranform, retrieve or create a thread pool based
  963 + // on our parent transform.
  964 + QMutexLocker poolLock(&poolsAccess);
  965 + QHash<QObject *, QThreadPool *>::Iterator it;
  966 + if (!pools.contains(this->parent())) {
  967 + it = pools.insert(this->parent(), new QThreadPool(this));
  968 + it.value()->setMaxThreadCount(Globals->parallelism);
  969 + }
  970 + else it = pools.find(this->parent());
  971 + threads = it.value();
  972 + poolLock.unlock();
  973 +
887 stage_variance.reserve(transforms.size()); 974 stage_variance.reserve(transforms.size());
888 foreach (const br::Transform *transform, transforms) { 975 foreach (const br::Transform *transform, transforms) {
889 stage_variance.append(transform->timeVarying()); 976 stage_variance.append(transform->timeVarying());
@@ -894,6 +981,7 @@ public: @@ -894,6 +981,7 @@ public:
894 processingStages.push_back(readStage); 981 processingStages.push_back(readStage);
895 readStage->stage_id = 0; 982 readStage->stage_id = 0;
896 readStage->stages = &this->processingStages; 983 readStage->stages = &this->processingStages;
  984 + readStage->threads = this->threads;
897 985
898 int next_stage_id = 1; 986 int next_stage_id = 1;
899 987
@@ -901,9 +989,7 @@ public: @@ -901,9 +989,7 @@ public:
901 for (int i =0; i < transforms.size(); i++) 989 for (int i =0; i < transforms.size(); i++)
902 { 990 {
903 if (stage_variance[i]) 991 if (stage_variance[i])
904 - {  
905 processingStages.append(new SingleThreadStage(prev_stage_variance)); 992 processingStages.append(new SingleThreadStage(prev_stage_variance));
906 - }  
907 else 993 else
908 processingStages.append(new MultiThreadStage(Globals->parallelism)); 994 processingStages.append(new MultiThreadStage(Globals->parallelism));
909 995
@@ -914,6 +1000,7 @@ public: @@ -914,6 +1000,7 @@ public:
914 processingStages[i]->nextStage = processingStages[i+1]; 1000 processingStages[i]->nextStage = processingStages[i+1];
915 1001
916 processingStages.last()->stages = &this->processingStages; 1002 processingStages.last()->stages = &this->processingStages;
  1003 + processingStages.last()->threads = this->threads;
917 1004
918 processingStages.last()->transform = transforms[i]; 1005 processingStages.last()->transform = transforms[i];
919 prev_stage_variance = stage_variance[i]; 1006 prev_stage_variance = stage_variance[i];
@@ -923,6 +1010,7 @@ public: @@ -923,6 +1010,7 @@ public:
923 processingStages.append(collectionStage); 1010 processingStages.append(collectionStage);
924 collectionStage->stage_id = next_stage_id; 1011 collectionStage->stage_id = next_stage_id;
925 collectionStage->stages = &this->processingStages; 1012 collectionStage->stages = &this->processingStages;
  1013 + collectionStage->threads = this->threads;
926 1014
927 processingStages[processingStages.size() - 2]->nextStage = collectionStage; 1015 processingStages[processingStages.size() - 2]->nextStage = collectionStage;
928 1016
@@ -945,6 +1033,10 @@ protected: @@ -945,6 +1033,10 @@ protected:
945 1033
946 QList<ProcessingStage *> processingStages; 1034 QList<ProcessingStage *> processingStages;
947 1035
  1036 + static QHash<QObject *, QThreadPool *> pools;
  1037 + static QMutex poolsAccess;
  1038 + QThreadPool * threads;
  1039 +
948 void _project(const Template &src, Template &dst) const 1040 void _project(const Template &src, Template &dst) const
949 { 1041 {
950 (void) src; (void) dst; 1042 (void) src; (void) dst;
@@ -957,6 +1049,9 @@ protected: @@ -957,6 +1049,9 @@ protected:
957 } 1049 }
958 }; 1050 };
959 1051
  1052 +QHash<QObject *, QThreadPool *> StreamTransform::pools;
  1053 +QMutex StreamTransform::poolsAccess;
  1054 +
960 BR_REGISTER(Transform, StreamTransform) 1055 BR_REGISTER(Transform, StreamTransform)
961 1056
962 1057