diff --git a/openbr/plugins/process.cpp b/openbr/plugins/process.cpp index 548fc11..eae69fa 100644 --- a/openbr/plugins/process.cpp +++ b/openbr/plugins/process.cpp @@ -349,6 +349,7 @@ public: QByteArray data = readArray; QDataStream deserializer(data); Transform *res = Transform::deserialize(deserializer); + sendSignal(SignalType::OUTPUT_AVAILABLE); return res; } @@ -534,6 +535,8 @@ struct ProcessData class ProcessWrapperTransform : public WrapperTransform { Q_OBJECT + Q_PROPERTY(int concurrentCount READ get_concurrentCount WRITE set_concurrentCount RESET reset_concurrentCount STORED false) + BR_PROPERTY(int, concurrentCount, 2) QString baseKey; @@ -579,17 +582,33 @@ class ProcessWrapperTransform : public WrapperTransform if (transform) { QDataStream out(&serialized, QFile::WriteOnly); transform->serialize(out); + tcount = Globals->parallelism; + counter.acquire(counter.available()); + counter.release(this->concurrentCount); } } - QByteArray serialized; + static QSemaphore counter; + mutable int tcount; + mutable QByteArray serialized; void transmitTForm(CommunicationManager *localComm) const { if (serialized.isEmpty() ) qFatal("Trying to transmit empty transform!"); + counter.acquire(1); + static QMutex transmission; + QMutexLocker lock(&transmission); + tcount--; + localComm->writeArray = serialized; + if (tcount == 0) + serialized.clear(); + lock.unlock(); + emit localComm->pulseSendSerialized(); + localComm->getSignal(); + counter.release(1); } void activateProcess(ProcessData *data) const @@ -633,6 +652,7 @@ public: bool processActive; ProcessWrapperTransform() : WrapperTransform(false) { processActive = false; } }; +QSemaphore ProcessWrapperTransform::counter; BR_REGISTER(Transform, ProcessWrapperTransform)