Commit 5d5bca6a394916c65d1fa386a5b0b44d4d20f76b

Authored by Charles Otto
1 parent 8057fb57

Rearrange the release/reserve threads in stream::projectUpdate

In stream projectUpdate, queue the intial worker thread before calling
releaseThread, and waiting. Also, call reserveThread before the last worker
thread ends.

The goal here is to avoid cases where the threadCount is increased, before the
extra thread is used up by the current stream, as part of a scheme to avoid
deadlocks related to using stream within an active distribute transform.
Showing 1 changed file with 13 additions and 5 deletions
openbr/plugins/stream.cpp
@@ -226,6 +226,12 @@ public: @@ -226,6 +226,12 @@ public:
226 last_received = inputFrame->sequenceNumber; 226 last_received = inputFrame->sequenceNumber;
227 227
228 if (inputFrame->sequenceNumber == final_frame) { 228 if (inputFrame->sequenceNumber == final_frame) {
  229 + // This is the reserveThread that matches the releaseThread in
  230 + // Stream::projectUpdate, we do it here to prevent a gap where the
  231 + // thread count is still increased, but a stream worker thread is not
  232 + // running, which might allow something to start that shouldn't
  233 + // start yet.
  234 + QThreadPool::globalInstance()->reserveThread();
229 // We just received the last frame, better pulse 235 // We just received the last frame, better pulse
230 lastReturned.wakeAll(); 236 lastReturned.wakeAll();
231 rval = true; 237 rval = true;
@@ -630,7 +636,7 @@ public: @@ -630,7 +636,7 @@ public:
630 next->start_idx = this->stage_id; 636 next->start_idx = this->stage_id;
631 next->startItem = newItem; 637 next->startItem = newItem;
632 638
633 - QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id); 639 + QThreadPool::globalInstance()->start(next, stages->size() - stage_id);
634 } 640 }
635 641
636 return input; 642 return input;
@@ -700,7 +706,7 @@ public: @@ -700,7 +706,7 @@ public:
700 next->start_idx = this->stage_id; 706 next->start_idx = this->stage_id;
701 next->startItem = NULL; 707 next->startItem = NULL;
702 708
703 - QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id); 709 + QThreadPool::globalInstance()->start(next, stages->size() - stage_id);
704 710
705 return input; 711 return input;
706 } 712 }
@@ -791,7 +797,7 @@ public: @@ -791,7 +797,7 @@ public:
791 next->start_idx = this->stage_id; 797 next->start_idx = this->stage_id;
792 next->startItem = newItem; 798 next->startItem = newItem;
793 799
794 - QThreadPool::globalInstance()->start(next, stages->size() - this->stage_id); 800 + QThreadPool::globalInstance()->start(next, stages->size() - stage_id);
795 } 801 }
796 802
797 return input; 803 return input;
@@ -831,7 +837,6 @@ public: @@ -831,7 +837,6 @@ public:
831 bool res = readStage->dataSource.open(dst); 837 bool res = readStage->dataSource.open(dst);
832 if (!res) return; 838 if (!res) return;
833 839
834 - QThreadPool::globalInstance()->releaseThread();  
835 readStage->currentStatus = SingleThreadStage::STARTING; 840 readStage->currentStatus = SingleThreadStage::STARTING;
836 841
837 BasicLoop loop; 842 BasicLoop loop;
@@ -840,11 +845,14 @@ public: @@ -840,11 +845,14 @@ public:
840 loop.startItem = NULL; 845 loop.startItem = NULL;
841 loop.setAutoDelete(false); 846 loop.setAutoDelete(false);
842 847
  848 + // Create a thread, then allow it to start by releasing. We queue the thread
  849 + // first so that any lower priority threads that are already queued don't start
  850 + // instead of the new one.
843 QThreadPool::globalInstance()->start(&loop, processingStages.size() - processingStages[0]->stage_id); 851 QThreadPool::globalInstance()->start(&loop, processingStages.size() - processingStages[0]->stage_id);
  852 + QThreadPool::globalInstance()->releaseThread();
844 853
845 // Wait for the end. 854 // Wait for the end.
846 readStage->dataSource.waitLast(); 855 readStage->dataSource.waitLast();
847 - QThreadPool::globalInstance()->reserveThread();  
848 856
849 TemplateList final_output; 857 TemplateList final_output;
850 858