Commit 50752fe9dd1d1dd0008d0bb3ccc92ab9414fe3c2

Authored by Charles Otto
1 parent 5d5bca6a

Avoid a deadlock related to using stream inside distribute transforms

Start the individual project jobs in distribute as qrunnables rather than
through qtconcurrent::run. This is necessary since run does not allow us to
assign a priority to a task. Run the distribute tasks with a lower priority
than stream tasks.
This prevents a deadlock where a thread in stream::projectUpdate would try to
start a job and wait for it to finish, but in the meantime a distribute job
would use up the thread the stream thread released. Due to the use of
TimeVaryingTransformWrapper in distribute, stream transforms are a limited
resource, and the previously discussed situation would cause a deadlock where
stream transforms (in stream::projectUpdate) were waiting for their job to
finish, but the job never started because distribute transform threads were in
resource waiting for stream transforms to become available.

The current implementation has some important drawbacks. Not using
qtconcurrent::run means we cannot use the QFutureSynchronizer API to wait for
threads to finish. I am (temporarily) starting FakeMain in (in br) on a
local thread pool, and in distribute waiting for the complete global thread
pool to become available (this would be a self-blocking wait without starting
FakeMain in a different thread pool).

This is quite restrictive, we cannot currently ever nest distribute transforms
for example.

Also, using priority in this way is not general, nesting a distribute transform
inside a stream is infeasible under this scheme (well it already wouldn't work
due to the previously discussed problem, but this issue is a little more
persistent).
app/br/br.cpp
@@ -235,8 +235,10 @@ int main(int argc, char *argv[]) @@ -235,8 +235,10 @@ int main(int argc, char *argv[])
235 { 235 {
236 br_initialize(argc, argv); 236 br_initialize(argc, argv);
237 237
  238 + QThreadPool separate;
  239 + separate.setMaxThreadCount(1);
238 FakeMain *fakeMain = new FakeMain(argc, argv); 240 FakeMain *fakeMain = new FakeMain(argc, argv);
239 - QThreadPool::globalInstance()->start(fakeMain); 241 + separate.start(fakeMain);
240 QCoreApplication::exec(); 242 QCoreApplication::exec();
241 243
242 br_finalize(); 244 br_finalize();
openbr/plugins/meta.cpp
@@ -597,6 +597,26 @@ static void _projectList(const Transform *transform, const TemplateList *src, Te @@ -597,6 +597,26 @@ static void _projectList(const Transform *transform, const TemplateList *src, Te
597 transform->project(*src, *dst); 597 transform->project(*src, *dst);
598 } 598 }
599 599
  600 +class ProjectListJob : public QRunnable
  601 +{
  602 +public:
  603 + ProjectListJob(Transform * _transform, const TemplateList * _src, TemplateList * _dst)
  604 + {
  605 + transform = _transform;
  606 + src = _src;
  607 + dst = _dst;
  608 + this->setAutoDelete(true);
  609 + }
  610 +
  611 + Transform * transform;
  612 + const TemplateList * src;
  613 + TemplateList * dst;
  614 + void run()
  615 + {
  616 + _projectList(transform, src, dst);
  617 + }
  618 +};
  619 +
600 class DistributeTemplateTransform : public MetaTransform 620 class DistributeTemplateTransform : public MetaTransform
601 { 621 {
602 Q_OBJECT 622 Q_OBJECT
@@ -650,13 +670,17 @@ public: @@ -650,13 +670,17 @@ public:
650 output_buffer.append(TemplateList()); 670 output_buffer.append(TemplateList());
651 } 671 }
652 672
653 - QFutureSynchronizer<void> futures;  
654 for (int i=0; i<src.size(); i++) { 673 for (int i=0; i<src.size(); i++) {
655 input_buffer[i].append(src[i]); 674 input_buffer[i].append(src[i]);
656 - if (Globals->parallelism) futures.addFuture(QtConcurrent::run(_projectList, transform, &input_buffer[i], &output_buffer[i]));  
657 - else _projectList( transform, &input_buffer[i], &output_buffer[i]); 675 +
  676 + if (Globals->parallelism)
  677 + QThreadPool::globalInstance()->start(new ProjectListJob(transform, &input_buffer[i], &output_buffer[i]), 0);
  678 + else _projectList(transform, &input_buffer[i], &output_buffer[i]);
658 } 679 }
659 - futures.waitForFinished(); 680 +
  681 + bool wait_res = QThreadPool::globalInstance()->waitForDone();
  682 + if (!wait_res)
  683 + qDebug("global thread pool wait failed!");
660 684
661 for (int i=0; i<src.size(); i++) dst.append(output_buffer[i]); 685 for (int i=0; i<src.size(); i++) dst.append(output_buffer[i]);
662 } 686 }