Commit 7a47d58c67046fe65fd7978dc8a1dae7041f343e

Authored by Charles Otto
1 parent 50752fe9

Some code cleanup, also changes to thread pooling

Don't use the global thread pool for streams. Micro-managing the global thread
pool's active thread count has proven infeasible, therefore in order to avoid
thread based deadlocks, we don't use the global thread pool. Instead, we share
thread pools across sibling Stream transforms.

Misc. code cleanup and better last frame detection.
Showing 1 changed file with 226 additions and 139 deletions
openbr/plugins/stream.cpp
@@ -160,9 +160,8 @@ class DataSource @@ -160,9 +160,8 @@ class DataSource
160 public: 160 public:
161 DataSource(int maxFrames=500) 161 DataSource(int maxFrames=500)
162 { 162 {
  163 + // The sequence number of the last frame
163 final_frame = -1; 164 final_frame = -1;
164 - last_issued = -2;  
165 - last_received = -3;  
166 for (int i=0; i < maxFrames;i++) 165 for (int i=0; i < maxFrames;i++)
167 { 166 {
168 allFrames.addItem(new FrameData()); 167 allFrames.addItem(new FrameData());
@@ -181,57 +180,64 @@ public: @@ -181,57 +180,64 @@ public:
181 } 180 }
182 181
183 // non-blocking version of getFrame 182 // non-blocking version of getFrame
184 - FrameData * tryGetFrame() 183 + // Returns a NULL FrameData if too many frames are out, or the
  184 + // data source is broken. Sets last_frame to true iff the FrameData
  185 + // returned is the last valid frame, and the data source is now broken.
  186 + FrameData * tryGetFrame(bool & last_frame)
185 { 187 {
  188 + last_frame = false;
  189 +
  190 + if (is_broken) {
  191 + return NULL;
  192 + }
  193 +
  194 + // Try to get a FrameData from the pool, if we can't it means too many
  195 + // frames are already out, and we will return NULL to indicate failure
186 FrameData * aFrame = allFrames.tryGetItem(); 196 FrameData * aFrame = allFrames.tryGetItem();
187 - if (aFrame == NULL) 197 + if (aFrame == NULL) {
188 return NULL; 198 return NULL;
  199 + }
189 200
190 aFrame->data.clear(); 201 aFrame->data.clear();
191 aFrame->sequenceNumber = -1; 202 aFrame->sequenceNumber = -1;
192 203
  204 + // Try to read a frame, if this returns false the data source is broken
193 bool res = getNext(*aFrame); 205 bool res = getNext(*aFrame);
194 206
195 - // The datasource broke.  
196 - if (!res) { 207 + // The datasource broke, update final_frame
  208 + if (!res)
  209 + {
  210 + QMutexLocker lock(&last_frame_update);
  211 + final_frame = lookAhead.back()->sequenceNumber;
197 allFrames.addItem(aFrame); 212 allFrames.addItem(aFrame);
  213 + }
  214 + else lookAhead.push_back(aFrame);
198 215
199 - QMutexLocker lock(&last_frame_update);  
200 - // Did we already receive the last frame?  
201 - final_frame = last_issued; 216 + FrameData * rVal = lookAhead.first();
  217 + lookAhead.pop_front();
202 218
203 - // We got the last frame before the data source broke,  
204 - // better pulse lastReturned  
205 - if (final_frame == last_received) {  
206 - lastReturned.wakeAll();  
207 - }  
208 - else if (final_frame < last_received)  
209 - std::cout << "Bad last frame " << final_frame << " but received " << last_received << std::endl;  
210 219
211 - return NULL; 220 + if (rVal->sequenceNumber == final_frame) {
  221 + last_frame = true;
  222 + is_broken = true;
212 } 223 }
213 - last_issued = aFrame->sequenceNumber;  
214 - return aFrame; 224 +
  225 + return rVal;
215 } 226 }
216 227
217 - // Returns true if the frame returned was the last 228 + // Return a frame to the pool, returns true if the frame returned was the last
218 // frame issued, false otherwise 229 // frame issued, false otherwise
219 bool returnFrame(FrameData * inputFrame) 230 bool returnFrame(FrameData * inputFrame)
220 { 231 {
  232 + int frameNumber = inputFrame->sequenceNumber;
  233 +
221 allFrames.addItem(inputFrame); 234 allFrames.addItem(inputFrame);
222 235
223 bool rval = false; 236 bool rval = false;
224 237
225 QMutexLocker lock(&last_frame_update); 238 QMutexLocker lock(&last_frame_update);
226 - last_received = inputFrame->sequenceNumber;  
227 -  
228 - if (inputFrame->sequenceNumber == final_frame) {  
229 - // This is the reserveThread that matches the releaseThread in  
230 - // Stream::projectUpdate, we do it here to prevent a gap where the  
231 - // thread count is still increased, but a stream worker thread is not  
232 - // running, which might allow something to start that shouldn't  
233 - // start yet.  
234 - QThreadPool::globalInstance()->reserveThread(); 239 +
  240 + if (frameNumber == final_frame) {
235 // We just received the last frame, better pulse 241 // We just received the last frame, better pulse
236 lastReturned.wakeAll(); 242 lastReturned.wakeAll();
237 rval = true; 243 rval = true;
@@ -246,17 +252,57 @@ public: @@ -246,17 +252,57 @@ public:
246 lastReturned.wait(&last_frame_update); 252 lastReturned.wait(&last_frame_update);
247 } 253 }
248 254
249 - virtual void close() = 0;  
250 - virtual bool open(Template & output, int start_index=0) = 0;  
251 - virtual bool isOpen() = 0; 255 + bool open(Template & output, int start_index = 0)
  256 + {
  257 + is_broken = false;
  258 + // The last frame isn't initialized yet
  259 + final_frame = -1;
  260 + // Start our sequence numbers from the input index
  261 + next_sequence_number = start_index;
  262 +
  263 + // Actually open the data source
  264 + bool open_res = concreteOpen(output);
  265 +
  266 + // We couldn't open the data source
  267 + if (!open_res) {
  268 + is_broken = true;
  269 + return false;
  270 + }
  271 +
  272 + // Try to get a frame from the global pool
  273 + FrameData * firstFrame = allFrames.tryGetItem();
  274 +
  275 + // If this fails, things have gone pretty badly.
  276 + if (firstFrame == NULL) {
  277 + is_broken = true;
  278 + return false;
  279 + }
  280 +
  281 + // Read a frame from the video source
  282 + bool res = getNext(*firstFrame);
  283 +
  284 + // the data source broke already, we couldn't even get one frame
  285 + // from it.
  286 + if (!res) {
  287 + is_broken = true;
  288 + return false;
  289 + }
  290 +
  291 + lookAhead.append(firstFrame);
  292 + return true;
  293 + }
252 294
  295 + virtual bool isOpen()=0;
  296 + virtual bool concreteOpen(Template & output) = 0;
253 virtual bool getNext(FrameData & input) = 0; 297 virtual bool getNext(FrameData & input) = 0;
  298 + virtual void close() = 0;
254 299
  300 + int next_sequence_number;
255 protected: 301 protected:
256 DoubleBuffer allFrames; 302 DoubleBuffer allFrames;
257 int final_frame; 303 int final_frame;
258 - int last_issued;  
259 - int last_received; 304 + bool is_broken;
  305 + QList<FrameData *> lookAhead;
260 306
261 QWaitCondition lastReturned; 307 QWaitCondition lastReturned;
262 QMutex last_frame_update; 308 QMutex last_frame_update;
@@ -268,13 +314,8 @@ class VideoDataSource : public DataSource @@ -268,13 +314,8 @@ class VideoDataSource : public DataSource
268 public: 314 public:
269 VideoDataSource(int maxFrames) : DataSource(maxFrames) {} 315 VideoDataSource(int maxFrames) : DataSource(maxFrames) {}
270 316
271 - bool open(Template &input, int start_index=0) 317 + bool concreteOpen(Template &input)
272 { 318 {
273 - final_frame = -1;  
274 - last_issued = -2;  
275 - last_received = -3;  
276 -  
277 - next_idx = start_index;  
278 basis = input; 319 basis = input;
279 bool is_int = false; 320 bool is_int = false;
280 int anInt = input.file.name.toInt(&is_int); 321 int anInt = input.file.name.toInt(&is_int);
@@ -309,25 +350,31 @@ private: @@ -309,25 +350,31 @@ private:
309 return false; 350 return false;
310 351
311 output.data.append(Template(basis.file)); 352 output.data.append(Template(basis.file));
312 - output.data.last().append(cv::Mat()); 353 + output.data.last().m() = cv::Mat();
313 354
314 - output.sequenceNumber = next_idx;  
315 - next_idx++; 355 + output.sequenceNumber = next_sequence_number;
  356 + next_sequence_number++;
316 357
317 - bool res = video.read(output.data.last().last());  
318 - output.data.last().last() = output.data.last().last().clone(); 358 + cv::Mat temp;
  359 + bool res = video.read(temp);
319 360
320 if (!res) { 361 if (!res) {
  362 + output.data.last().m() = cv::Mat();
321 close(); 363 close();
322 return false; 364 return false;
323 } 365 }
  366 +
  367 + // This clone is critical, if we don't do it then the matrix will
  368 + // be an alias of an internal buffer of the video source, leading
  369 + // to various problems later.
  370 + output.data.last().m() = temp.clone();
  371 +
324 output.data.last().file.set("FrameNumber", output.sequenceNumber); 372 output.data.last().file.set("FrameNumber", output.sequenceNumber);
325 return true; 373 return true;
326 } 374 }
327 375
328 cv::VideoCapture video; 376 cv::VideoCapture video;
329 Template basis; 377 Template basis;
330 - int next_idx;  
331 }; 378 };
332 379
333 // Given a template as input, return its matrices one by one on subsequent calls 380 // Given a template as input, return its matrices one by one on subsequent calls
@@ -337,21 +384,16 @@ class TemplateDataSource : public DataSource @@ -337,21 +384,16 @@ class TemplateDataSource : public DataSource
337 public: 384 public:
338 TemplateDataSource(int maxFrames) : DataSource(maxFrames) 385 TemplateDataSource(int maxFrames) : DataSource(maxFrames)
339 { 386 {
340 - current_idx = INT_MAX; 387 + current_matrix_idx = INT_MAX;
341 data_ok = false; 388 data_ok = false;
342 } 389 }
343 - bool data_ok;  
344 390
345 - bool open(Template &input, int start_index=0) 391 + bool concreteOpen(Template &input)
346 { 392 {
347 basis = input; 393 basis = input;
348 - current_idx = 0;  
349 - next_sequence = start_index;  
350 - final_frame = -1;  
351 - last_issued = -2;  
352 - last_received = -3; 394 + current_matrix_idx = 0;
353 395
354 - data_ok = current_idx < basis.size(); 396 + data_ok = current_matrix_idx < basis.size();
355 return data_ok; 397 return data_ok;
356 } 398 }
357 399
@@ -361,39 +403,41 @@ public: @@ -361,39 +403,41 @@ public:
361 403
362 void close() 404 void close()
363 { 405 {
364 - current_idx = INT_MAX; 406 + current_matrix_idx = INT_MAX;
365 basis.clear(); 407 basis.clear();
366 } 408 }
367 409
368 private: 410 private:
369 bool getNext(FrameData & output) 411 bool getNext(FrameData & output)
370 { 412 {
371 - data_ok = current_idx < basis.size(); 413 + data_ok = current_matrix_idx < basis.size();
372 if (!data_ok) 414 if (!data_ok)
373 return false; 415 return false;
374 416
375 - output.data.append(basis[current_idx]);  
376 - current_idx++; 417 + output.data.append(basis[current_matrix_idx]);
  418 + current_matrix_idx++;
377 419
378 - output.sequenceNumber = next_sequence;  
379 - next_sequence++; 420 + output.sequenceNumber = next_sequence_number;
  421 + next_sequence_number++;
380 422
381 output.data.last().file.set("FrameNumber", output.sequenceNumber); 423 output.data.last().file.set("FrameNumber", output.sequenceNumber);
382 return true; 424 return true;
383 } 425 }
384 426
385 Template basis; 427 Template basis;
386 - int current_idx;  
387 - int next_sequence; 428 + // Index of the next matrix to output from the template
  429 + int current_matrix_idx;
  430 +
  431 + // is current_matrix_idx in bounds?
  432 + bool data_ok;
388 }; 433 };
389 434
390 -// Given a template as input, create a VideoDataSource or a TemplateDataSource  
391 -// depending on whether or not it looks like the input template has already  
392 -// loaded frames into memory. 435 +// Given a templatelist as input, create appropriate data source for each
  436 +// individual template
393 class DataSourceManager : public DataSource 437 class DataSourceManager : public DataSource
394 { 438 {
395 public: 439 public:
396 - DataSourceManager() 440 + DataSourceManager() : DataSource(500)
397 { 441 {
398 actualSource = NULL; 442 actualSource = NULL;
399 } 443 }
@@ -414,29 +458,25 @@ public: @@ -414,29 +458,25 @@ public:
414 458
415 bool open(TemplateList & input) 459 bool open(TemplateList & input)
416 { 460 {
417 - currentIdx = 0; 461 + current_template_idx = 0;
418 templates = input; 462 templates = input;
419 463
420 - return open(templates[currentIdx]); 464 + return DataSource::open(templates[current_template_idx]);
421 } 465 }
422 466
423 - bool open(Template & input, int start_index=0) 467 + bool concreteOpen(Template & input)
424 { 468 {
425 close(); 469 close();
426 - final_frame = -1;  
427 - last_issued = -2;  
428 - last_received = -3;  
429 - next_frame = start_index;  
430 470
431 // Input has no matrices? Its probably a video that hasn't been loaded yet 471 // Input has no matrices? Its probably a video that hasn't been loaded yet
432 if (input.empty()) { 472 if (input.empty()) {
433 actualSource = new VideoDataSource(0); 473 actualSource = new VideoDataSource(0);
434 - actualSource->open(input, next_frame); 474 + actualSource->concreteOpen(input);
435 } 475 }
436 else { 476 else {
437 // create frame dealer 477 // create frame dealer
438 actualSource = new TemplateDataSource(0); 478 actualSource = new TemplateDataSource(0);
439 - actualSource->open(input, next_frame); 479 + actualSource->concreteOpen(input);
440 } 480 }
441 if (!isOpen()) { 481 if (!isOpen()) {
442 delete actualSource; 482 delete actualSource;
@@ -449,30 +489,47 @@ public: @@ -449,30 +489,47 @@ public:
449 bool isOpen() { return !actualSource ? false : actualSource->isOpen(); } 489 bool isOpen() { return !actualSource ? false : actualSource->isOpen(); }
450 490
451 protected: 491 protected:
452 - int currentIdx;  
453 - int next_frame; 492 + // Index of the template in the templatelist we are currently reading from
  493 + int current_template_idx;
  494 +
454 TemplateList templates; 495 TemplateList templates;
455 DataSource * actualSource; 496 DataSource * actualSource;
456 bool getNext(FrameData & output) 497 bool getNext(FrameData & output)
457 { 498 {
458 bool res = actualSource->getNext(output); 499 bool res = actualSource->getNext(output);
  500 + output.sequenceNumber = next_sequence_number;
  501 +
459 if (res) { 502 if (res) {
460 - next_frame = output.sequenceNumber+1; 503 + output.data.last().file.set("FrameNumber", output.sequenceNumber);
  504 + next_sequence_number++;
  505 + if (output.data.last().last().empty())
  506 + qDebug("broken matrix");
461 return true; 507 return true;
462 } 508 }
463 509
  510 +
464 while(!res) { 511 while(!res) {
465 - currentIdx++; 512 + output.data.clear();
  513 + current_template_idx++;
466 514
467 - if (currentIdx >= templates.size()) 515 + // No more templates? We're done
  516 + if (current_template_idx >= templates.size())
468 return false; 517 return false;
469 - bool open_res = open(templates[currentIdx], next_frame); 518 +
  519 + // open the next data source
  520 + bool open_res = concreteOpen(templates[current_template_idx]);
470 if (!open_res) 521 if (!open_res)
471 return false; 522 return false;
  523 +
  524 + // get a frame from it
472 res = actualSource->getNext(output); 525 res = actualSource->getNext(output);
473 } 526 }
  527 + output.sequenceNumber = next_sequence_number++;
  528 + output.data.last().file.set("FrameNumber", output.sequenceNumber);
  529 +
  530 + if (output.data.last().last().empty())
  531 + qDebug("broken matrix");
474 532
475 - next_frame = output.sequenceNumber+1;  
476 return res; 533 return res;
477 } 534 }
478 535
@@ -480,9 +537,14 @@ protected: @@ -480,9 +537,14 @@ protected:
480 537
481 class ProcessingStage; 538 class ProcessingStage;
482 539
483 -class BasicLoop : public QRunnable 540 +class BasicLoop : public QRunnable, public QFutureInterface<void>
484 { 541 {
485 public: 542 public:
  543 + BasicLoop()
  544 + {
  545 + this->reportStarted();
  546 + }
  547 +
486 void run(); 548 void run();
487 549
488 QList<ProcessingStage *> * stages; 550 QList<ProcessingStage *> * stages;
@@ -508,13 +570,13 @@ public: @@ -508,13 +570,13 @@ public:
508 int stage_id; 570 int stage_id;
509 571
510 virtual void reset()=0; 572 virtual void reset()=0;
511 -  
512 protected: 573 protected:
513 int thread_count; 574 int thread_count;
514 575
515 SharedBuffer * inputBuffer; 576 SharedBuffer * inputBuffer;
516 ProcessingStage * nextStage; 577 ProcessingStage * nextStage;
517 QList<ProcessingStage *> * stages; 578 QList<ProcessingStage *> * stages;
  579 + QThreadPool * threads;
518 Transform * transform; 580 Transform * transform;
519 581
520 }; 582 };
@@ -533,6 +595,7 @@ void BasicLoop::run() @@ -533,6 +595,7 @@ void BasicLoop::run()
533 current_idx++; 595 current_idx++;
534 current_idx = current_idx % stages->size(); 596 current_idx = current_idx % stages->size();
535 } 597 }
  598 + this->reportFinished();
536 } 599 }
537 600
538 class MultiThreadStage : public ProcessingStage 601 class MultiThreadStage : public ProcessingStage
@@ -567,7 +630,6 @@ public: @@ -567,7 +630,6 @@ public:
567 } 630 }
568 }; 631 };
569 632
570 -  
571 class SingleThreadStage : public ProcessingStage 633 class SingleThreadStage : public ProcessingStage
572 { 634 {
573 public: 635 public:
@@ -630,18 +692,20 @@ public: @@ -630,18 +692,20 @@ public:
630 lock.unlock(); 692 lock.unlock();
631 693
632 if (newItem) 694 if (newItem)
633 - {  
634 - BasicLoop * next = new BasicLoop();  
635 - next->stages = stages;  
636 - next->start_idx = this->stage_id;  
637 - next->startItem = newItem;  
638 -  
639 - QThreadPool::globalInstance()->start(next, stages->size() - stage_id);  
640 - } 695 + startThread(newItem);
641 696
642 return input; 697 return input;
643 } 698 }
644 699
  700 + void startThread(br::FrameData * newItem)
  701 + {
  702 + BasicLoop * next = new BasicLoop();
  703 + next->stages = stages;
  704 + next->start_idx = this->stage_id;
  705 + next->startItem = newItem;
  706 + this->threads->start(next, stages->size() - stage_id);
  707 + }
  708 +
645 709
646 // Calledfrom a different thread than run. 710 // Calledfrom a different thread than run.
647 bool tryAcquireNextStage(FrameData *& input) 711 bool tryAcquireNextStage(FrameData *& input)
@@ -677,7 +741,7 @@ public: @@ -677,7 +741,7 @@ public:
677 }; 741 };
678 742
679 // No input buffer, instead we draw templates from some data source 743 // No input buffer, instead we draw templates from some data source
680 -// Will be operated by the main thread for the stream 744 +// Will be operated by the main thread for the stream. starts threads
681 class FirstStage : public SingleThreadStage 745 class FirstStage : public SingleThreadStage
682 { 746 {
683 public: 747 public:
@@ -687,44 +751,51 @@ public: @@ -687,44 +751,51 @@ public:
687 751
688 FrameData * run(FrameData * input, bool & should_continue) 752 FrameData * run(FrameData * input, bool & should_continue)
689 { 753 {
690 - // Is there anything on our input buffer? If so we should start a thread with that. 754 + // Try to get a frame from the datasource
691 QWriteLocker lock(&statusLock); 755 QWriteLocker lock(&statusLock);
692 - input = dataSource.tryGetFrame();  
693 - // Datasource broke?  
694 - if (!input) 756 + bool last_frame = false;
  757 + input = dataSource.tryGetFrame(last_frame);
  758 +
  759 + // Datasource broke, or is currently out of frames?
  760 + if (!input || last_frame)
695 { 761 {
  762 + // We will just stop and not continue.
696 currentStatus = STOPPING; 763 currentStatus = STOPPING;
697 - should_continue = false;  
698 - return NULL; 764 + if (!input) {
  765 + should_continue = false;
  766 + return NULL;
  767 + }
699 } 768 }
700 lock.unlock(); 769 lock.unlock();
701 - 770 + // Can we enter the next stage?
702 should_continue = nextStage->tryAcquireNextStage(input); 771 should_continue = nextStage->tryAcquireNextStage(input);
703 772
704 - BasicLoop * next = new BasicLoop();  
705 - next->stages = stages;  
706 - next->start_idx = this->stage_id;  
707 - next->startItem = NULL;  
708 -  
709 - QThreadPool::globalInstance()->start(next, stages->size() - stage_id); 773 + // We are exiting leaving this stage, should we start another
  774 + // thread here? Normally we will always re-queue a thread on
  775 + // the first stage, but if we received the last frame there is
  776 + // no need to.
  777 + if (!last_frame) {
  778 + startThread(NULL);
  779 + }
710 780
711 return input; 781 return input;
712 } 782 }
713 783
714 - // Calledfrom a different thread than run. 784 + // The last stage, trying to access the first stage
715 bool tryAcquireNextStage(FrameData *& input) 785 bool tryAcquireNextStage(FrameData *& input)
716 { 786 {
  787 + // Return the frame, was it the last one?
717 bool was_last = dataSource.returnFrame(input); 788 bool was_last = dataSource.returnFrame(input);
718 input = NULL; 789 input = NULL;
  790 +
  791 + // OK we won't continue.
719 if (was_last) { 792 if (was_last) {
720 return false; 793 return false;
721 } 794 }
722 795
723 - if (!dataSource.isOpen())  
724 - return false;  
725 -  
726 QReadLocker lock(&statusLock); 796 QReadLocker lock(&statusLock);
727 - // Thread is already running, we should just return 797 + // A thread is already in the first stage,
  798 + // we should just return
728 if (currentStatus == STARTING) 799 if (currentStatus == STARTING)
729 { 800 {
730 return false; 801 return false;
@@ -747,6 +818,7 @@ public: @@ -747,6 +818,7 @@ public:
747 818
748 }; 819 };
749 820
  821 +// starts threads
750 class LastStage : public SingleThreadStage 822 class LastStage : public SingleThreadStage
751 { 823 {
752 public: 824 public:
@@ -777,11 +849,14 @@ public: @@ -777,11 +849,14 @@ public:
777 } 849 }
778 next_target = input->sequenceNumber + 1; 850 next_target = input->sequenceNumber + 1;
779 851
  852 + // add the item to our output buffer
780 collectedOutput.append(input->data); 853 collectedOutput.append(input->data);
781 854
  855 + // Can we enter the read stage?
782 should_continue = nextStage->tryAcquireNextStage(input); 856 should_continue = nextStage->tryAcquireNextStage(input);
783 857
784 - // Is there anything on our input buffer? If so we should start a thread with that. 858 + // Is there anything on our input buffer? If so we should start a thread
  859 + // in this stage to process that frame.
785 QWriteLocker lock(&statusLock); 860 QWriteLocker lock(&statusLock);
786 FrameData * newItem = inputBuffer->tryGetItem(); 861 FrameData * newItem = inputBuffer->tryGetItem();
787 if (!newItem) 862 if (!newItem)
@@ -791,23 +866,18 @@ public: @@ -791,23 +866,18 @@ public:
791 lock.unlock(); 866 lock.unlock();
792 867
793 if (newItem) 868 if (newItem)
794 - {  
795 - BasicLoop * next = new BasicLoop();  
796 - next->stages = stages;  
797 - next->start_idx = this->stage_id;  
798 - next->startItem = newItem;  
799 -  
800 - QThreadPool::globalInstance()->start(next, stages->size() - stage_id);  
801 - } 869 + startThread(newItem);
802 870
803 return input; 871 return input;
804 } 872 }
805 }; 873 };
806 874
  875 +
807 class StreamTransform : public CompositeTransform 876 class StreamTransform : public CompositeTransform
808 { 877 {
809 Q_OBJECT 878 Q_OBJECT
810 public: 879 public:
  880 +
811 void train(const TemplateList & data) 881 void train(const TemplateList & data)
812 { 882 {
813 foreach(Transform * transform, transforms) { 883 foreach(Transform * transform, transforms) {
@@ -837,23 +907,17 @@ public: @@ -837,23 +907,17 @@ public:
837 bool res = readStage->dataSource.open(dst); 907 bool res = readStage->dataSource.open(dst);
838 if (!res) return; 908 if (!res) return;
839 909
  910 + // Start the first thread in the stream.
840 readStage->currentStatus = SingleThreadStage::STARTING; 911 readStage->currentStatus = SingleThreadStage::STARTING;
  912 + readStage->startThread(NULL);
841 913
842 - BasicLoop loop;  
843 - loop.stages = &this->processingStages;  
844 - loop.start_idx = 0;  
845 - loop.startItem = NULL;  
846 - loop.setAutoDelete(false);  
847 -  
848 - // Create a thread, then allow it to start by releasing. We queue the thread  
849 - // first so that any lower priority threads that are already queued don't start  
850 - // instead of the new one.  
851 - QThreadPool::globalInstance()->start(&loop, processingStages.size() - processingStages[0]->stage_id);  
852 - QThreadPool::globalInstance()->releaseThread();  
853 -  
854 - // Wait for the end. 914 + // Wait for the stream to reach the last frame available from
  915 + // the data source.
855 readStage->dataSource.waitLast(); 916 readStage->dataSource.waitLast();
856 917
  918 + // Now that there are no more incoming frames, call finalize
  919 + // on each transform in turn to collect any last templates
  920 + // they wish to issue.
857 TemplateList final_output; 921 TemplateList final_output;
858 922
859 // Push finalize through the stages 923 // Push finalize through the stages
@@ -869,7 +933,8 @@ public: @@ -869,7 +933,8 @@ public:
869 final_output.append(output_set); 933 final_output.append(output_set);
870 } 934 }
871 935
872 - // dst is set to all output received by the final stage 936 + // dst is set to all output received by the final stage, along
  937 + // with anything output via the calls to finalize.
873 dst = collectionStage->getOutput(); 938 dst = collectionStage->getOutput();
874 dst.append(final_output); 939 dst.append(final_output);
875 940
@@ -881,7 +946,8 @@ public: @@ -881,7 +946,8 @@ public:
881 virtual void finalize(TemplateList & output) 946 virtual void finalize(TemplateList & output)
882 { 947 {
883 (void) output; 948 (void) output;
884 - // Not handling this yet -cao 949 + // Nothing in particular to do here, stream calls finalize
  950 + // on all child transforms as part of projectUpdate
885 } 951 }
886 952
887 // Create and link stages 953 // Create and link stages
@@ -889,6 +955,19 @@ public: @@ -889,6 +955,19 @@ public:
889 { 955 {
890 if (transforms.isEmpty()) return; 956 if (transforms.isEmpty()) return;
891 957
  958 + // We share a thread pool across streams attached to the same
  959 + // parent tranform, retrieve or create a thread pool based
  960 + // on our parent transform.
  961 + QMutexLocker poolLock(&poolsAccess);
  962 + QHash<QObject *, QThreadPool *>::Iterator it;
  963 + if (!pools.contains(this->parent())) {
  964 + it = pools.insert(this->parent(), new QThreadPool(this));
  965 + it.value()->setMaxThreadCount(Globals->parallelism);
  966 + }
  967 + else it = pools.find(this->parent());
  968 + threads = it.value();
  969 + poolLock.unlock();
  970 +
892 stage_variance.reserve(transforms.size()); 971 stage_variance.reserve(transforms.size());
893 foreach (const br::Transform *transform, transforms) { 972 foreach (const br::Transform *transform, transforms) {
894 stage_variance.append(transform->timeVarying()); 973 stage_variance.append(transform->timeVarying());
@@ -899,6 +978,7 @@ public: @@ -899,6 +978,7 @@ public:
899 processingStages.push_back(readStage); 978 processingStages.push_back(readStage);
900 readStage->stage_id = 0; 979 readStage->stage_id = 0;
901 readStage->stages = &this->processingStages; 980 readStage->stages = &this->processingStages;
  981 + readStage->threads = this->threads;
902 982
903 int next_stage_id = 1; 983 int next_stage_id = 1;
904 984
@@ -906,9 +986,7 @@ public: @@ -906,9 +986,7 @@ public:
906 for (int i =0; i < transforms.size(); i++) 986 for (int i =0; i < transforms.size(); i++)
907 { 987 {
908 if (stage_variance[i]) 988 if (stage_variance[i])
909 - {  
910 processingStages.append(new SingleThreadStage(prev_stage_variance)); 989 processingStages.append(new SingleThreadStage(prev_stage_variance));
911 - }  
912 else 990 else
913 processingStages.append(new MultiThreadStage(Globals->parallelism)); 991 processingStages.append(new MultiThreadStage(Globals->parallelism));
914 992
@@ -919,6 +997,7 @@ public: @@ -919,6 +997,7 @@ public:
919 processingStages[i]->nextStage = processingStages[i+1]; 997 processingStages[i]->nextStage = processingStages[i+1];
920 998
921 processingStages.last()->stages = &this->processingStages; 999 processingStages.last()->stages = &this->processingStages;
  1000 + processingStages.last()->threads = this->threads;
922 1001
923 processingStages.last()->transform = transforms[i]; 1002 processingStages.last()->transform = transforms[i];
924 prev_stage_variance = stage_variance[i]; 1003 prev_stage_variance = stage_variance[i];
@@ -928,6 +1007,7 @@ public: @@ -928,6 +1007,7 @@ public:
928 processingStages.append(collectionStage); 1007 processingStages.append(collectionStage);
929 collectionStage->stage_id = next_stage_id; 1008 collectionStage->stage_id = next_stage_id;
930 collectionStage->stages = &this->processingStages; 1009 collectionStage->stages = &this->processingStages;
  1010 + collectionStage->threads = this->threads;
931 1011
932 processingStages[processingStages.size() - 2]->nextStage = collectionStage; 1012 processingStages[processingStages.size() - 2]->nextStage = collectionStage;
933 1013
@@ -950,6 +1030,10 @@ protected: @@ -950,6 +1030,10 @@ protected:
950 1030
951 QList<ProcessingStage *> processingStages; 1031 QList<ProcessingStage *> processingStages;
952 1032
  1033 + static QHash<QObject *, QThreadPool *> pools;
  1034 + static QMutex poolsAccess;
  1035 + QThreadPool * threads;
  1036 +
953 void _project(const Template &src, Template &dst) const 1037 void _project(const Template &src, Template &dst) const
954 { 1038 {
955 (void) src; (void) dst; 1039 (void) src; (void) dst;
@@ -962,6 +1046,9 @@ protected: @@ -962,6 +1046,9 @@ protected:
962 } 1046 }
963 }; 1047 };
964 1048
  1049 +QHash<QObject *, QThreadPool *> StreamTransform::pools;
  1050 +QMutex StreamTransform::poolsAccess;
  1051 +
965 BR_REGISTER(Transform, StreamTransform) 1052 BR_REGISTER(Transform, StreamTransform)
966 1053
967 1054