Commit 6188a9ccf0d310ff89768e9f3a3215d19bbbdd07

Authored by Charles Otto
1 parent ff98581b

Update threading model for stream (again)

Change the threading model used in stream slightly, generally as a given frame
is processed a single thread will attempt to continue working on it as it
progresses through the processing stages. Frames will be queued if a single
thread stage is already occupied when the thread gets there.

This update can be used with a large total number of frames being processed
without, while still maintaining relatively smooth playback by giving later
stages higher priority in the qt thread pool.
Showing 1 changed file with 211 additions and 123 deletions
openbr/plugins/stream.cpp
@@ -66,7 +66,7 @@ public: @@ -66,7 +66,7 @@ public:
66 QMap<int, FrameData *>::Iterator result = buffer.begin(); 66 QMap<int, FrameData *>::Iterator result = buffer.begin();
67 67
68 if (next_target != result.value()->sequenceNumber) { 68 if (next_target != result.value()->sequenceNumber) {
69 - qWarning("mismatched targets!"); 69 + qFatal("mismatched targets!");
70 } 70 }
71 71
72 next_target = next_target + 1; 72 next_target = next_target + 1;
@@ -160,7 +160,7 @@ private: @@ -160,7 +160,7 @@ private:
160 class DataSource 160 class DataSource
161 { 161 {
162 public: 162 public:
163 - DataSource(int maxFrames=Globals->parallelism + 1) 163 + DataSource(int maxFrames=500)
164 { 164 {
165 final_frame = -1; 165 final_frame = -1;
166 last_issued = -2; 166 last_issued = -2;
@@ -399,7 +399,19 @@ protected: @@ -399,7 +399,19 @@ protected:
399 399
400 }; 400 };
401 401
402 -class ProcessingStage : public QRunnable 402 +class ProcessingStage;
  403 +
  404 +class BasicLoop : public QRunnable
  405 +{
  406 +public:
  407 + void run();
  408 +
  409 + QList<ProcessingStage *> * stages;
  410 + int start_idx;
  411 + FrameData * startItem;
  412 +};
  413 +
  414 +class ProcessingStage
403 { 415 {
404 public: 416 public:
405 friend class StreamTransform; 417 friend class StreamTransform;
@@ -407,55 +419,68 @@ public: @@ -407,55 +419,68 @@ public:
407 ProcessingStage(int nThreads = 1) 419 ProcessingStage(int nThreads = 1)
408 { 420 {
409 thread_count = nThreads; 421 thread_count = nThreads;
410 - setAutoDelete(false);  
411 } 422 }
  423 + virtual ~ProcessingStage() {}
  424 +
  425 + virtual FrameData* run(FrameData * input, bool & should_continue)=0;
412 426
413 - virtual void run()=0; 427 + virtual bool tryAcquireNextStage(FrameData *& input)=0;
414 428
415 - virtual void nextStageRun(FrameData * input)=0; 429 + int stage_id;
416 430
417 protected: 431 protected:
418 int thread_count; 432 int thread_count;
419 433
420 SharedBuffer * inputBuffer; 434 SharedBuffer * inputBuffer;
421 ProcessingStage * nextStage; 435 ProcessingStage * nextStage;
  436 + QList<ProcessingStage *> * stages;
422 Transform * transform; 437 Transform * transform;
423 - int stage_id;  
424 438
425 }; 439 };
426 440
427 -class MultiThreadStage;  
428 -  
429 -void multistage_run(MultiThreadStage * basis, FrameData * input); 441 +void BasicLoop::run()
  442 +{
  443 + int current_idx = start_idx;
  444 + FrameData * target_item = startItem;
  445 + bool should_continue = true;
  446 + forever
  447 + {
  448 + target_item = stages->at(current_idx)->run(target_item, should_continue);
  449 + if (!should_continue) {
  450 + break;
  451 + }
  452 + current_idx++;
  453 + current_idx = current_idx % stages->size();
  454 + }
  455 +}
430 456
431 class MultiThreadStage : public ProcessingStage 457 class MultiThreadStage : public ProcessingStage
432 { 458 {
433 public: 459 public:
434 MultiThreadStage(int _input) : ProcessingStage(_input) {} 460 MultiThreadStage(int _input) : ProcessingStage(_input) {}
435 461
436 - friend void multistage_run(MultiThreadStage * basis, FrameData * input);  
437 462
438 - void run() 463 + FrameData * run(FrameData * input, bool & should_continue)
439 { 464 {
440 - qFatal("no don't do it!"); 465 + if (input == NULL) {
  466 + qFatal("null input to multi-thread stage");
  467 + }
  468 + // Project the input we got
  469 + transform->projectUpdate(input->data);
  470 +
  471 + should_continue = nextStage->tryAcquireNextStage(input);
  472 +
  473 + return input;
441 } 474 }
442 475
443 // Called from a different thread than run 476 // Called from a different thread than run
444 - virtual void nextStageRun(FrameData * input) 477 + virtual bool tryAcquireNextStage(FrameData *& input)
445 { 478 {
446 - QtConcurrent::run(multistage_run, this, input); 479 + (void) input;
  480 + return true;
447 } 481 }
448 }; 482 };
449 483
450 -void multistage_run(MultiThreadStage * basis, FrameData * input)  
451 -{  
452 - if (input == NULL)  
453 - qFatal("null input to multi-thread stage");  
454 - // Project the input we got  
455 - basis->transform->projectUpdate(input->data);  
456 -  
457 - basis->nextStage->nextStageRun(input);  
458 -}  
459 484
460 class SingleThreadStage : public ProcessingStage 485 class SingleThreadStage : public ProcessingStage
461 { 486 {
@@ -464,10 +489,12 @@ public: @@ -464,10 +489,12 @@ public:
464 { 489 {
465 currentStatus = STOPPING; 490 currentStatus = STOPPING;
466 next_target = 0; 491 next_target = 0;
467 - if (input_variance) 492 + if (input_variance) {
468 this->inputBuffer = new DoubleBuffer(); 493 this->inputBuffer = new DoubleBuffer();
469 - else 494 + }
  495 + else {
470 this->inputBuffer = new SequencingBuffer(); 496 this->inputBuffer = new SequencingBuffer();
  497 + }
471 } 498 }
472 ~SingleThreadStage() 499 ~SingleThreadStage()
473 { 500 {
@@ -483,52 +510,75 @@ public: @@ -483,52 +510,75 @@ public:
483 QReadWriteLock statusLock; 510 QReadWriteLock statusLock;
484 Status currentStatus; 511 Status currentStatus;
485 512
486 - // We should start, and enter a wait on input data  
487 - void run() 513 + FrameData * run(FrameData * input, bool & should_continue)
488 { 514 {
489 - FrameData * currentItem;  
490 - forever 515 + if (input == NULL)
  516 + qFatal("NULL input to stage %d", this->stage_id);
  517 +
  518 + if (input->sequenceNumber != next_target)
491 { 519 {
492 - // Whether or not we get a valid item controls whether or not we  
493 - QWriteLocker lock(&statusLock);  
494 - currentItem = inputBuffer->tryGetItem();  
495 - if (currentItem == NULL)  
496 - {  
497 - this->currentStatus = STOPPING;  
498 - return;  
499 - }  
500 - lock.unlock();  
501 - if (currentItem->sequenceNumber != next_target)  
502 - {  
503 - qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target);  
504 - }  
505 - next_target = currentItem->sequenceNumber + 1; 520 + qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, input->sequenceNumber, this->next_target);
  521 + }
  522 + next_target = input->sequenceNumber + 1;
  523 +
  524 + // Project the input we got
  525 + transform->projectUpdate(input->data);
  526 +
  527 + should_continue = nextStage->tryAcquireNextStage(input);
  528 +
  529 + // Is there anything on our input buffer? If so we should start a thread with that.
  530 + QWriteLocker lock(&statusLock);
  531 + FrameData * newItem = inputBuffer->tryGetItem();
  532 + if (!newItem)
  533 + {
  534 + this->currentStatus = STOPPING;
  535 + }
  536 + lock.unlock();
506 537
507 - // Project the input we got  
508 - transform->projectUpdate(currentItem->data); 538 + if (newItem)
  539 + {
  540 + BasicLoop * next = new BasicLoop();
  541 + next->stages = stages;
  542 + next->start_idx = this->stage_id;
  543 + next->startItem = newItem;
509 544
510 - this->nextStage->nextStageRun(currentItem); 545 + QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id);
511 } 546 }
  547 +
  548 + return input;
512 } 549 }
513 550
  551 +
514 // Calledfrom a different thread than run. 552 // Calledfrom a different thread than run.
515 - void nextStageRun(FrameData * input) 553 + bool tryAcquireNextStage(FrameData *& input)
516 { 554 {
517 - // add to our input buffer  
518 inputBuffer->addItem(input); 555 inputBuffer->addItem(input);
  556 +
519 QReadLocker lock(&statusLock); 557 QReadLocker lock(&statusLock);
  558 + // Thread is already running, we should just return
520 if (currentStatus == STARTING) 559 if (currentStatus == STARTING)
521 - return;  
522 - 560 + {
  561 + return false;
  562 + }
523 // Have to change to a write lock to modify currentStatus 563 // Have to change to a write lock to modify currentStatus
524 lock.unlock(); 564 lock.unlock();
  565 +
525 QWriteLocker writeLock(&statusLock); 566 QWriteLocker writeLock(&statusLock);
526 - // But someone might have changed it between locks 567 + // But someone else might have started a thread in the meantime
527 if (currentStatus == STARTING) 568 if (currentStatus == STARTING)
528 - return;  
529 - // Ok we can start a thread  
530 - QThreadPool::globalInstance()->start(this); 569 + {
  570 + return false;
  571 + }
  572 + // Ok we might start a thread, as long as we can get something back
  573 + // from the input buffer
  574 + input = inputBuffer->tryGetItem();
  575 +
  576 + if (!input)
  577 + return false;
  578 +
531 currentStatus = STARTING; 579 currentStatus = STARTING;
  580 +
  581 + return true;
532 } 582 }
533 }; 583 };
534 584
@@ -540,47 +590,63 @@ public: @@ -540,47 +590,63 @@ public:
540 FirstStage() : SingleThreadStage(true) {} 590 FirstStage() : SingleThreadStage(true) {}
541 591
542 DataSourceManager dataSource; 592 DataSourceManager dataSource;
543 - // Start drawing frames from the datasource.  
544 - void run() 593 +
  594 + FrameData * run(FrameData * input, bool & should_continue)
545 { 595 {
546 - FrameData * currentItem;  
547 - forever 596 + if (input != NULL) {
  597 + dataSource.returnFrame(input);
  598 + }
  599 +
  600 + // Is there anything on our input buffer? If so we should start a thread with that.
  601 + QWriteLocker lock(&statusLock);
  602 + input = dataSource.tryGetFrame();
  603 + // Datasource broke?
  604 + if (!input)
548 { 605 {
549 - // Whether or not we get a valid item controls whether or not we  
550 - QWriteLocker lock(&statusLock); 606 + currentStatus = STOPPING;
  607 + should_continue = false;
  608 + return NULL;
  609 + }
  610 + lock.unlock();
551 611
552 - currentItem = this->dataSource.tryGetFrame();  
553 - if (currentItem == NULL)  
554 - {  
555 - this->currentStatus = STOPPING;  
556 - return;  
557 - }  
558 - lock.unlock();  
559 - if (currentItem->sequenceNumber != next_target)  
560 - {  
561 - qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target);  
562 - }  
563 - next_target = currentItem->sequenceNumber + 1; 612 + should_continue = nextStage->tryAcquireNextStage(input);
564 613
565 - this->nextStage->nextStageRun(currentItem);  
566 - } 614 + BasicLoop * next = new BasicLoop();
  615 + next->stages = stages;
  616 + next->start_idx = this->stage_id;
  617 + next->startItem = NULL;
  618 +
  619 + QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id);
  620 +
  621 + return input;
567 } 622 }
568 623
569 - void nextStageRun(FrameData * input) 624 + // Calledfrom a different thread than run.
  625 + bool tryAcquireNextStage(FrameData *& input)
570 { 626 {
571 - QWriteLocker lock(&statusLock);  
572 -  
573 - // Return the frame to the frame buffer  
574 - bool res = dataSource.returnFrame(input);  
575 - // If the data source broke already, we're done.  
576 - if (res)  
577 - return; 627 + dataSource.returnFrame(input);
  628 + input = NULL;
578 629
  630 + QReadLocker lock(&statusLock);
  631 + // Thread is already running, we should just return
579 if (currentStatus == STARTING) 632 if (currentStatus == STARTING)
580 - return; 633 + {
  634 + return false;
  635 + }
  636 + // Have to change to a write lock to modify currentStatus
  637 + lock.unlock();
581 638
  639 + QWriteLocker writeLock(&statusLock);
  640 + // But someone else might have started a thread in the meantime
  641 + if (currentStatus == STARTING)
  642 + {
  643 + return false;
  644 + }
  645 + // Ok we'll start a thread
582 currentStatus = STARTING; 646 currentStatus = STARTING;
583 - QThreadPool::globalInstance()->start(this, this->next_target); 647 +
  648 + // We always start a readstage thread with null input, so nothing to do here
  649 + return true;
584 } 650 }
585 651
586 }; 652 };
@@ -597,29 +663,42 @@ public: @@ -597,29 +663,42 @@ public:
597 private: 663 private:
598 TemplateList collectedOutput; 664 TemplateList collectedOutput;
599 public: 665 public:
600 - void run() 666 + FrameData * run(FrameData * input, bool & should_continue)
601 { 667 {
602 - forever 668 + if (input == NULL) {
  669 + qFatal("NULL input to stage %d", this->stage_id);
  670 + }
  671 +
  672 + if (input->sequenceNumber != next_target)
603 { 673 {
604 - QWriteLocker lock(&statusLock);  
605 - FrameData * currentItem = inputBuffer->tryGetItem();  
606 - if (currentItem == NULL)  
607 - {  
608 - currentStatus = STOPPING;  
609 - break;  
610 - }  
611 - lock.unlock(); 674 + qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, input->sequenceNumber, this->next_target);
  675 + }
  676 + next_target = input->sequenceNumber + 1;
612 677
613 - if (currentItem->sequenceNumber != next_target)  
614 - {  
615 - qFatal("out of order frames for collection stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target);  
616 - }  
617 - next_target = currentItem->sequenceNumber + 1; 678 + collectedOutput.append(input->data);
618 679
619 - // Just put the item on collectedOutput  
620 - collectedOutput.append(currentItem->data);  
621 - this->nextStage->nextStageRun(currentItem); 680 + should_continue = nextStage->tryAcquireNextStage(input);
  681 +
  682 + // Is there anything on our input buffer? If so we should start a thread with that.
  683 + QWriteLocker lock(&statusLock);
  684 + FrameData * newItem = inputBuffer->tryGetItem();
  685 + if (!newItem)
  686 + {
  687 + this->currentStatus = STOPPING;
622 } 688 }
  689 + lock.unlock();
  690 +
  691 + if (newItem)
  692 + {
  693 + BasicLoop * next = new BasicLoop();
  694 + next->stages = stages;
  695 + next->start_idx = this->stage_id;
  696 + next->startItem = newItem;
  697 +
  698 + QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id);
  699 + }
  700 +
  701 + return input;
623 } 702 }
624 }; 703 };
625 704
@@ -660,17 +739,22 @@ public: @@ -660,17 +739,22 @@ public:
660 qFatal("Expected single template input to stream"); 739 qFatal("Expected single template input to stream");
661 740
662 dst = src; 741 dst = src;
663 - bool res = readStage.dataSource.open(dst[0]);  
664 - if (!res) {  
665 - qWarning("failed to stream template %s", qPrintable(dst[0].file.name));  
666 - return;  
667 - } 742 + bool res = readStage->dataSource.open(dst[0]);
  743 + if (!res) return;
668 744
669 QThreadPool::globalInstance()->releaseThread(); 745 QThreadPool::globalInstance()->releaseThread();
670 - readStage.currentStatus = SingleThreadStage::STARTING;  
671 - QThreadPool::globalInstance()->start(&readStage, 0); 746 + readStage->currentStatus = SingleThreadStage::STARTING;
  747 +
  748 + BasicLoop loop;
  749 + loop.stages = &this->processingStages;
  750 + loop.start_idx = 0;
  751 + loop.startItem = NULL;
  752 + loop.setAutoDelete(false);
  753 +
  754 + QThreadPool::globalInstance()->start(&loop, processingStages.size() - processingStages[0]->stage_id);
  755 +
672 // Wait for the end. 756 // Wait for the end.
673 - readStage.dataSource.waitLast(); 757 + readStage->dataSource.waitLast();
674 QThreadPool::globalInstance()->reserveThread(); 758 QThreadPool::globalInstance()->reserveThread();
675 759
676 // dst is set to all output received by the final stage 760 // dst is set to all output received by the final stage
@@ -693,10 +777,14 @@ public: @@ -693,10 +777,14 @@ public:
693 stage_variance.append(transform->timeVarying()); 777 stage_variance.append(transform->timeVarying());
694 } 778 }
695 779
696 - readStage.stage_id = 0; 780 + readStage = new FirstStage();
  781 +
  782 + processingStages.push_back(readStage);
  783 + readStage->stage_id = 0;
  784 + readStage->stages = &this->processingStages;
697 785
698 int next_stage_id = 1; 786 int next_stage_id = 1;
699 - int lastBufferIdx = 0; 787 +
700 bool prev_stage_variance = true; 788 bool prev_stage_variance = true;
701 for (int i =0; i < transforms.size(); i++) 789 for (int i =0; i < transforms.size(); i++)
702 { 790 {
@@ -709,24 +797,25 @@ public: @@ -709,24 +797,25 @@ public:
709 797
710 processingStages.last()->stage_id = next_stage_id++; 798 processingStages.last()->stage_id = next_stage_id++;
711 799
712 - // link nextStage pointers  
713 - if (i == 0)  
714 - this->readStage.nextStage = processingStages[i];  
715 - else  
716 - processingStages[i-1]->nextStage = processingStages[i]; 800 + // link nextStage pointers, the stage we just appeneded is i+1 since
  801 + // the read stage was added before this loop
  802 + processingStages[i]->nextStage = processingStages[i+1];
717 803
718 - lastBufferIdx++; 804 + processingStages.last()->stages = &this->processingStages;
719 805
720 processingStages.last()->transform = transforms[i]; 806 processingStages.last()->transform = transforms[i];
721 prev_stage_variance = stage_variance[i]; 807 prev_stage_variance = stage_variance[i];
722 } 808 }
723 809
724 collectionStage = new LastStage(prev_stage_variance); 810 collectionStage = new LastStage(prev_stage_variance);
  811 + processingStages.append(collectionStage);
725 collectionStage->stage_id = next_stage_id; 812 collectionStage->stage_id = next_stage_id;
  813 + collectionStage->stages = &this->processingStages;
  814 +
  815 + processingStages[processingStages.size() - 2]->nextStage = collectionStage;
726 816
727 // It's a ring buffer, get it? 817 // It's a ring buffer, get it?
728 - processingStages.last()->nextStage = collectionStage;  
729 - collectionStage->nextStage = &readStage; 818 + collectionStage->nextStage = readStage;
730 } 819 }
731 820
732 ~StreamTransform() 821 ~StreamTransform()
@@ -734,13 +823,12 @@ public: @@ -734,13 +823,12 @@ public:
734 for (int i = 0; i < processingStages.size(); i++) { 823 for (int i = 0; i < processingStages.size(); i++) {
735 delete processingStages[i]; 824 delete processingStages[i];
736 } 825 }
737 - delete collectionStage;  
738 } 826 }
739 827
740 protected: 828 protected:
741 QList<bool> stage_variance; 829 QList<bool> stage_variance;
742 830
743 - FirstStage readStage; 831 + FirstStage * readStage;
744 LastStage * collectionStage; 832 LastStage * collectionStage;
745 833
746 QList<ProcessingStage *> processingStages; 834 QList<ProcessingStage *> processingStages;