diff --git a/openbr/plugins/process.cpp b/openbr/plugins/process.cpp index 7044068..98da122 100644 --- a/openbr/plugins/process.cpp +++ b/openbr/plugins/process.cpp @@ -21,17 +21,18 @@ class CommunicationManager : public QObject { Q_OBJECT public: + int timeout_ms; QThread *basis; CommunicationManager() { + timeout_ms = 30000; + basis = new QThread; - basis->moveToThread(QCoreApplication::instance()->thread()); moveToThread(basis); server.moveToThread(basis); outbound.moveToThread(basis); // signals for our sever - connect(this, SIGNAL(pulseEndThread()), basis, SLOT(quit() )); connect(&server, SIGNAL(newConnection()), this, SLOT(receivedConnection() )); connect(this, SIGNAL(pulseStartServer(QString)), this, SLOT(startServerInternal(QString)), Qt::BlockingQueuedConnection); connect(this, SIGNAL(pulseOutboundConnect(QString)), this, SLOT(startConnectInternal(QString) ), Qt::BlockingQueuedConnection); @@ -62,7 +63,6 @@ public: void shutDownThread() { - emit pulseEndThread(); basis->quit(); basis->wait(); delete basis; @@ -144,23 +144,34 @@ public slots: void sendSignalInternal() { - SignalType signal= sendType; + SignalType signal = sendType; qint64 signal_wrote = outbound.write((char *) &signal, sizeof(signal)); + if (signal_wrote != sizeof(signal)) qDebug() << key << " inconsistent signal size"; - bool res = outbound.waitForBytesWritten(-1); - if (!res) - qDebug() << key << " failed to wait for bytes written in signal size"; + while (outbound.bytesToWrite() > 0) { + bool written = outbound.waitForBytesWritten(timeout_ms); + + if (!written && (!outbound.isValid() || outbound.state() == QLocalSocket::UnconnectedState)) { + qDebug() << key << " failed to wait for bytes written in signal size"; + return; + } + } } void readSignalInternal() { while (inbound->bytesAvailable() < qint64(sizeof(readSignal)) ) { - bool size_ready = inbound->waitForReadyRead(-1); - if (!size_ready) - { - qDebug("Failed to received object size in signal!"); + bool size_ready = inbound->waitForReadyRead(timeout_ms); + + // wait timed out, now what? + if (!size_ready && (!inbound->isValid() || inbound->state() == QLocalSocket::UnconnectedState)) { + readSignal = SHOULD_END; + server.close(); + outbound.abort(); + inbound->abort(); + return; } } @@ -184,11 +195,14 @@ public slots: qDebug() << key << "inconsistent size sent in send data!"; return; } - bool res = outbound.waitForBytesWritten(-1); + + while (outbound.bytesToWrite() > 0) { + bool written = outbound.waitForBytesWritten(timeout_ms); - if (!res) { - qDebug() << key << " wait for bytes failed!"; - return; + if (!written && (!outbound.isValid() || outbound.state() == QLocalSocket::UnconnectedState)) { + qDebug() << key << " wait for bytes failed!"; + return; + } } qint64 data_wrote = outbound.write(writeArray.data(), serializedSize); @@ -196,8 +210,8 @@ public slots: qDebug() << key << " inconsistent data written!"; while (outbound.bytesToWrite() > 0) { - bool write_res = outbound.waitForBytesWritten(-1); - if (!write_res) { + bool write_res = outbound.waitForBytesWritten(); + if (!write_res && (!outbound.isValid() || outbound.state() == QLocalSocket::UnconnectedState)) { qDebug() << key << " wait for bytes failed!"; return; } @@ -211,14 +225,14 @@ public slots: { qint64 bufferSize; while (inbound->bytesAvailable() < qint64(sizeof(bufferSize))) { - bool size_ready = inbound->waitForReadyRead(-1); - if (!size_ready) - { - qDebug() << key << " Failed to received object size in read data!"; - qDebug() << key << "inbound status: " << inbound->state() << " error: " << inbound->errorString(); - return; + bool size_ready = inbound->waitForReadyRead(timeout_ms); + if (!size_ready && (!inbound->isValid() || inbound->state() == QLocalSocket::UnconnectedState)) { + qDebug() << key << " Failed to received object size in read data!"; + qDebug() << key << "inbound status: " << inbound->state() << " error: " << inbound->errorString(); + return; } } + qint64 sizeBytesRead = inbound->read((char *) &bufferSize, sizeof(bufferSize)); if (sizeBytesRead != sizeof(bufferSize)) { qDebug("failed to read size of buffer!"); @@ -231,10 +245,10 @@ public slots: // read the data, we may get it in serveral bursts qint64 arrayPosition = 0; while (arrayPosition < bufferSize) { - if (!inbound->bytesAvailable()) { - bool ready_res = inbound->waitForReadyRead(-1); + while (!inbound->bytesAvailable()) { + bool ready_res = inbound->waitForReadyRead(timeout_ms); - if (!ready_res) { + if (!ready_res && (!inbound->isValid() || inbound->state() == QLocalSocket::UnconnectedState)) { qDebug() << key << "failed to wait for data!"; return; } @@ -251,7 +265,6 @@ public slots: } if (arrayPosition != bufferSize) qDebug() << key << "Read wrong size object!"; - } void shutdownInternal() @@ -276,8 +289,6 @@ signals: void pulseSendSerialized(); void pulseShutdown(); void pulseOutboundConnect(QString serverName); - void pulseEndThread(); - public: QByteArray readArray; @@ -417,8 +428,7 @@ public: { QString sign = "worker " + name; CommunicationManager::SignalType signal; - forever - { + forever { signal= comm->getSignal(); if (signal == CommunicationManager::SHOULD_END) { @@ -448,28 +458,39 @@ class ProcessInterface : public QObject { Q_OBJECT public: + QThread *basis; + + ~ProcessInterface() + { + basis->quit(); + basis->wait(); + delete basis; + } + ProcessInterface() { - this->moveToThread(QCoreApplication::instance()->thread()); - workerProcess.moveToThread(QCoreApplication::instance()->thread()); + basis = new QThread(); + + moveToThread(basis); + workerProcess.moveToThread(basis); + connect(this, SIGNAL(pulseEnd()), this, SLOT(endProcessInternal()), Qt::BlockingQueuedConnection); connect(this, SIGNAL(pulseStart(QStringList)), this, SLOT(startProcessInternal(QStringList)), Qt::BlockingQueuedConnection); + + basis->start(); } + QProcess workerProcess; void endProcess() { - if (QThread::currentThread() != QCoreApplication::instance()->thread()) - emit pulseEnd(); - else - endProcessInternal(); + emit pulseEnd(); } + void startProcess(QStringList arguments) { - if (QThread::currentThread() != QCoreApplication::instance()->thread() ) - emit pulseStart(arguments); - else - startProcessInternal(arguments); + emit pulseStart(arguments); } + signals: void pulseEnd(); void pulseStart(QStringList); @@ -500,10 +521,12 @@ struct ProcessData ~ProcessData() { - comm.sendSignal(CommunicationManager::SHOULD_END); - proc.endProcess(); - comm.shutdown(); - comm.shutDownThread(); + if (initialized) { + comm.sendSignal(CommunicationManager::SHOULD_END); + proc.endProcess(); + comm.shutdown(); + comm.shutDownThread(); + } } }; @@ -541,9 +564,10 @@ class ProcessWrapperTransform : public WrapperTransform activateProcess(data); CommunicationManager *localComm = &(data->comm); - localComm->sendSignal(CommunicationManager::INPUT_AVAILABLE); + localComm->sendSignal(CommunicationManager::INPUT_AVAILABLE); localComm->sendData(src); + localComm->readData(dst); processes.release(data); }