diff --git a/openbr/openbr.cpp b/openbr/openbr.cpp index 008a4b1..0a0ccc7 100644 --- a/openbr/openbr.cpp +++ b/openbr/openbr.cpp @@ -283,8 +283,9 @@ const char *br_version() void br_slave_process(const char * baseName) { - WorkerProcess worker; - worker.transform = Globals->algorithm; - worker.baseName = baseName; - worker.mainLoop(); + WorkerProcess * worker = new WorkerProcess; + worker->transform = Globals->algorithm; + worker->baseName = baseName; + worker->mainLoop(); + delete worker; } diff --git a/openbr/openbr_plugin.cpp b/openbr/openbr_plugin.cpp index a504642..5ae2006 100644 --- a/openbr/openbr_plugin.cpp +++ b/openbr/openbr_plugin.cpp @@ -42,6 +42,8 @@ using namespace br; using namespace cv; +Q_DECLARE_METATYPE(QLocalSocket::LocalSocketState) + // Some globals used to transfer data to Context::messageHandler so that // we can restart the process if we try and fail to create a QApplication. static bool creating_qapp = false; @@ -938,7 +940,7 @@ void br::Context::initialize(int &argc, char *argv[], QString sdkPath, bool use_ qRegisterMetaType< QList >(); qRegisterMetaType< QList >(); qRegisterMetaType< QAbstractSocket::SocketState> (); - + qRegisterMetaType< QLocalSocket::LocalSocketState> (); Globals = new Context(); Globals->init(File()); diff --git a/openbr/plugins/process.cpp b/openbr/plugins/process.cpp index af9c63c..aefc82e 100644 --- a/openbr/plugins/process.cpp +++ b/openbr/plugins/process.cpp @@ -1,12 +1,13 @@ + #include +#include #include #include +#include #include #include - -#include -#include +#include #include "openbr_internal.h" #include "openbr/core/opencvutils.h" @@ -16,93 +17,375 @@ using namespace cv; namespace br { -enum SignalType +class CommunicationManager : public QObject { - INPUT_AVAILABLE, - OUTPUT_AVAILABLE, - SHOULD_END + Q_OBJECT +public: + CommunicationManager() + { + moveToThread(QCoreApplication::instance()->thread()); + server.moveToThread(QCoreApplication::instance()->thread() ); + outbound.moveToThread(QCoreApplication::instance()->thread() ); + + // signals for our sever + connect(&server, SIGNAL(newConnection()), this, SLOT(receivedConnection() )); + connect(this, SIGNAL(pulseStartServer(QString)), this, SLOT(startServerInternal(QString)), Qt::BlockingQueuedConnection); + + // internals, cause work to be done by the main thread because reasons. + connect(this, SIGNAL(pulseSignal()), this, SLOT(sendSignalInternal()), Qt::BlockingQueuedConnection); + connect(this, SIGNAL(pulseReadSignal() ), this, SLOT(readSignalInternal()), Qt::BlockingQueuedConnection); + connect(this, SIGNAL(pulseReadSerialized() ), this, SLOT(readSerializedInternal()), Qt::BlockingQueuedConnection); + connect(this, SIGNAL(pulseSendSerialized() ), this, SLOT(sendSerializedInternal() ), Qt::BlockingQueuedConnection); + connect(this, SIGNAL(pulseShutdown() ), this, SLOT(shutdownInternal() ), Qt::BlockingQueuedConnection); + + // signals for our outbound connection + connect(&outbound, SIGNAL(connected() ), this, SLOT(outboundConnected() )); + connect(&outbound, SIGNAL(disconnected() ), this, SLOT(outboundDisconnected() )); + + connect(&outbound, SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(outboundConnectionError(QLocalSocket::LocalSocketError) ) ); + connect(&outbound, SIGNAL(stateChanged(QLocalSocket::LocalSocketState)), this, SLOT(outboundStateChanged(QLocalSocket::LocalSocketState) ) ); + + inbound = NULL; + } + + enum SignalType + { + INPUT_AVAILABLE, + OUTPUT_AVAILABLE, + SHOULD_END + }; + + +public slots: + // matching server signals + void receivedConnection() + { + inbound = server.nextPendingConnection(); + connect(inbound, SIGNAL(disconnected() ), this, SLOT(inboundDisconnected() )); + connect(inbound, SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(inboundConnectionError(QLocalSocket::LocalSocketError) ) ); + connect(inbound, SIGNAL(stateChanged(QLocalSocket::LocalSocketState)), this, SLOT(inboundStateChanged(QLocalSocket::LocalSocketState) ) ); + + receivedWait.wakeAll(); + } + + // matching outbound socket signals + + // Oh boy. + void outboundConnected() + { + outboundWait.wakeAll(); + } + + // Oh no! + void outboundDisconnected() + { + //qDebug() << key << " outbound socket has disconnected"; + } + + // informative. + void outboundConnectionError(QLocalSocket::LocalSocketError socketError) + { + //qDebug() << key << " outbound socket error " << socketError; + } + + void outboundStateChanged(QLocalSocket::LocalSocketState socketState) + { + //qDebug() << key << " outbound socket state changed to " << socketState; + } + + // matching inbound socket signals + void inboundDisconnected() + { + //qDebug() << key << " inbound socket has disconnected"; + } + + void inboundConnectionError(QLocalSocket::LocalSocketError socketError) + { + //qDebug() << key << " inbound socket error " << socketError; + } + + void inboundStateChanged(QLocalSocket::LocalSocketState socketState) + { + //qDebug() << key << " inbound socket state changed to " << socketState; + } + + void startServerInternal(QString serverName) + { + if (!server.isListening()) { + bool listen_res = server.listen(serverName); + if (!listen_res) + qDebug() << key << " Failed to start server at " << serverName; + } + } + + void sendSignalInternal() + { + SignalType signal= sendType; + qint64 signal_wrote = outbound.write((char *) &signal, sizeof(signal)); + if (signal_wrote != sizeof(signal)) + qDebug() << key << " inconsistent signal size"; + + bool res = outbound.waitForBytesWritten(-1); + if (!res) + qDebug() << key << " failed to wait for bytes written in signal size"; + } + + void readSignalInternal() + { + while (inbound->bytesAvailable() < sizeof(readSignal) ) { + bool size_ready = inbound->waitForReadyRead(-1); + if (!size_ready) + { + qDebug("Failed to received object size in signal!"); + } + } + + qint64 signalBytesRead = inbound->read((char *) &readSignal, sizeof(readSignal)); + if (signalBytesRead != sizeof(readSignal)) + qDebug("Inconsistent signal size read!"); + + if (readSignal == SHOULD_END) + { + server.close(); + outbound.abort(); + inbound->abort(); + } + } + + void sendSerializedInternal() + { + qint64 serializedSize = writeArray.size(); + qint64 size_wrote = outbound.write((char *) &serializedSize, sizeof(serializedSize)); + if (size_wrote != sizeof(serializedSize)) { + qDebug() << key << "inconsistent size sent in send data!"; + return; + } + bool res = outbound.waitForBytesWritten(-1); + + if (!res) { + qDebug() << key << " wait for bytes failed!"; + return; + } + + qint64 data_wrote = outbound.write(writeArray.data(), serializedSize); + if (data_wrote != serializedSize) + qDebug() << key << " inconsistent data written!"; + + while (outbound.bytesToWrite() > 0) { + bool write_res = outbound.waitForBytesWritten(-1); + if (!write_res) { + qDebug() << key << " wait for bytes failed!"; + return; + } + } + + return; + + } + + void readSerializedInternal() + { + qint64 bufferSize; + while (inbound->bytesAvailable() < qint64(sizeof(bufferSize))) { + bool size_ready = inbound->waitForReadyRead(-1); + if (!size_ready) + { + qDebug() << key << " Failed to received object size in read data!"; + qDebug() << key << "inbound status: " << inbound->state() << " error: " << inbound->errorString(); + return; + } + } + qint64 sizeBytesRead = inbound->read((char *) &bufferSize, sizeof(bufferSize)); + if (sizeBytesRead != sizeof(bufferSize)) { + qDebug("failed to read size of buffer!"); + return; + } + + // allocate the input buffer + readArray.resize(bufferSize); + + // read the data, we may get it in serveral bursts + qint64 arrayPosition = 0; + while (arrayPosition < bufferSize) { + if (!inbound->bytesAvailable()) { + bool ready_res = inbound->waitForReadyRead(-1); + + if (!ready_res) { + qDebug() << key << "failed to wait for data!"; + return; + } + } + + // how many bytes do we still need? + qint64 bytes_remaining = bufferSize - arrayPosition; + + if (bytes_remaining < inbound->bytesAvailable() ) + { + qDebug() << key << "!!!excessive bytes received"; + } + arrayPosition += inbound->read(readArray.data()+arrayPosition, qMin(inbound->bytesAvailable(), bytes_remaining)); + } + if (arrayPosition != bufferSize) + qDebug() << key << "Read wrong size object!"; + + } + + void shutdownInternal() + { + outbound.abort(); + inbound->abort(); + server.close(); + } + +signals: + void pulseStartServer(QString serverName); + void pulseSignal(); + void pulseReadSignal(); + void pulseReadSerialized(); + void pulseSendSerialized(); + void pulseShutdown(); + + +public: + QByteArray readArray; + QByteArray writeArray; + + SignalType readSignal; + QMutex receivedLock; + QWaitCondition receivedWait; + + QMutex outboundLock; + QWaitCondition outboundWait; + + QString key; + QString serverName; + QString remoteName; + + QLocalSocket * inbound; + QLocalSocket outbound; + QLocalServer server; + + + void waitForInbound() + { + QMutexLocker locker(&receivedLock); + while (!inbound || inbound->state() != QLocalSocket::ConnectedState) { + bool res = receivedWait.wait(&receivedLock,30*1000); + if (!res) + { + qDebug() << key << " " << QThread::currentThread() << " waiting timed out, server thread is " << server.thread() << " application thread " << QCoreApplication::instance()->thread(); + } + } + } + + void connectToRemote(const QString & remoteName) + { + outbound.connectToServer(remoteName); + + QMutexLocker locker(&outboundLock); + while (outbound.state() != QLocalSocket::ConnectedState) { + outboundWait.wait(&outboundLock,30*1000); + } + } + + SignalType getSignal() + { + emit pulseReadSignal(); + return readSignal; + } + + + template + bool readData(T & input) + { + emit pulseReadSerialized(); + QDataStream deserializer(readArray); + deserializer >> input; + return true; + } + + template + bool sendData(const T & output) + { + QBuffer buffer; + buffer.open(QBuffer::ReadWrite); + + QDataStream serializer(&buffer); + serializer << output; + writeArray = buffer.data(); + emit pulseSendSerialized(); + return true; + } + + SignalType sendType; + void sendSignal(SignalType signal) + { + sendType = signal; + if (QThread::currentThread() == this->thread() ) + this->sendSignalInternal(); + else + emit pulseSignal(); + } + + void startServer(QString server) + { + emit pulseStartServer(server); + } + + void shutdown() + { + emit pulseShutdown(); + } + + }; -class EnrollmentWorker +class EnrollmentWorker : public QObject { + Q_OBJECT public: - QLocalServer inbound; - QLocalSocket outbound; - QLocalSocket * receiver; + CommunicationManager * comm; + QString name; ~EnrollmentWorker() { delete transform; + delete comm; } br::Transform * transform; +public: void connections(const QString & baseName) { - inbound.listen(baseName+"_worker"); - outbound.connectToServer(baseName+"_master"); - inbound.waitForNewConnection(-1); - receiver = inbound.nextPendingConnection(); - outbound.waitForConnected(-1); + comm = new CommunicationManager(); + name = baseName; + comm->key = "worker_"+baseName.mid(1,5); + comm->startServer(baseName+"_worker"); + comm->connectToRemote(baseName+"_master"); + + comm->waitForInbound(); } void workerLoop() { - SignalType signal; - + QString sign = "worker " + name; + CommunicationManager::SignalType signal; forever { - while (receiver->bytesAvailable() < qint64(sizeof(signal))) { - receiver->waitForReadyRead(-1); - } - receiver->read((char *) &signal, sizeof(signal)); + signal= comm->getSignal(); - if (signal == SHOULD_END) { - outbound.close(); - inbound.close(); + if (signal == CommunicationManager::SHOULD_END) { break; } - - qint64 inBufferSize; - while (receiver->bytesAvailable() < qint64(sizeof(inBufferSize))) { - receiver->waitForReadyRead(-1); - } - receiver->read((char *) &inBufferSize, sizeof(inBufferSize)); - - QByteArray inArray(inBufferSize,'0'); - - qint64 arrayPosition = 0; - while (arrayPosition < inBufferSize) { - if (!receiver->bytesAvailable()) - receiver->waitForReadyRead(-1); - arrayPosition += receiver->read(inArray.data()+arrayPosition, receiver->bytesAvailable()); - } - TemplateList inList; TemplateList outList; - // deserialize the template list - QDataStream deserializer(inArray); - deserializer >> inList; - // and project it + comm->readData(inList); transform->projectUpdate(inList,outList); - - // serialize the output list - QBuffer outBuff; - outBuff.open(QBuffer::ReadWrite); - QDataStream serializer(&outBuff); - serializer << outList; - - // send the size of the buffer - //qint64 bufferSize = outBuff.size(); - qint64 bufferSize = outBuff.data().size(); - outbound.write((char *) &bufferSize, sizeof(bufferSize)); - - outbound.write(outBuff.data().data(), bufferSize); - while (outbound.bytesToWrite() > 0) { - outbound.waitForBytesWritten(-1); - } + comm->sendData(outList); } + comm->shutdown(); } }; @@ -115,6 +398,15 @@ void WorkerProcess::mainLoop() delete processInterface; } +void shutUp(QtMsgType type, const QMessageLogContext &context, const QString &msg) +{ + // Qt you have no idea how much I don't care about you. + // Please tell me more about how you want every single god damn thing to be created from and used by exactly one thread. + // It does not matter, so shut up already. + // p.s. I hope you die. +} + + /*! * \ingroup transforms * \brief Interface to a separate process @@ -128,53 +420,22 @@ class ProcessWrapperTransform : public TimeVaryingTransform BR_PROPERTY(QString, transform, "") QString baseKey; - QProcess workerProcess; - QLocalServer inbound; - QLocalSocket outbound; - QLocalSocket * receiver; + QProcess * workerProcess; + CommunicationManager * comm; void projectUpdate(const TemplateList &src, TemplateList &dst) { + if (!processActive) { activateProcess(); } + comm->sendSignal(CommunicationManager::INPUT_AVAILABLE); - SignalType signal = INPUT_AVAILABLE; - outbound.write((char *) &signal, sizeof(SignalType)); - - QBuffer inBuffer; - inBuffer.open(QBuffer::ReadWrite); - QDataStream serializer(&inBuffer); - serializer << src; + comm->sendData(src); - qint64 in_size = inBuffer.size(); - outbound.write((char *) &in_size, sizeof(in_size)); - - outbound.write(inBuffer.data(), in_size); - - while (outbound.bytesToWrite() > 0) { - outbound.waitForBytesWritten(-1); - } - - qint64 out_size; - - // read the size - receiver->waitForReadyRead(-1); - receiver->read((char *) &out_size, sizeof(out_size)); - QByteArray outBuffer(out_size,'0'); - - // read the (serialized) output templatelist - qint64 arrayPosition = 0; - while (arrayPosition < out_size) { - if (!receiver->bytesAvailable()) - receiver->waitForReadyRead(-1); - arrayPosition += receiver->read(outBuffer.data()+arrayPosition, receiver->bytesAvailable()); - } - // and deserialize it. - QDataStream deserialize(outBuffer); - deserialize >> dst; + comm->readData(dst); } @@ -191,6 +452,7 @@ class ProcessWrapperTransform : public TimeVaryingTransform void activateProcess() { + comm = new CommunicationManager(); processActive = true; // generate a uuid for our local servers @@ -209,20 +471,17 @@ class ProcessWrapperTransform : public TimeVaryingTransform argumentList.append("-slave"); argumentList.append(baseKey); - // start listening - inbound.listen(baseKey+"_master"); + comm->key = "master_"+baseKey.mid(1,5); - workerProcess.setProcessChannelMode(QProcess::ForwardedChannels); - workerProcess.start("br", argumentList); - workerProcess.waitForStarted(-1); + comm->startServer(baseKey+"_master"); - // blocking wait for the connection from the worker process - inbound.waitForNewConnection(-1); - receiver = inbound.nextPendingConnection(); + workerProcess = new QProcess(); + workerProcess->setProcessChannelMode(QProcess::ForwardedChannels); + workerProcess->start("br", argumentList); + workerProcess->waitForStarted(-1); - // Now, create our connection to the worker process. - outbound.connectToServer(baseKey+"_worker"); - outbound.waitForConnected(-1); + comm->waitForInbound(); + comm->connectToRemote(baseKey+"_worker"); } bool timeVarying() const { @@ -233,15 +492,19 @@ class ProcessWrapperTransform : public TimeVaryingTransform { // end the process if (this->processActive) { + comm->sendSignal(CommunicationManager::SHOULD_END); - SignalType signal = SHOULD_END; - outbound.write((char *) &signal, sizeof(SignalType)); - outbound.waitForBytesWritten(-1); - outbound.close(); + // I don't even want to talk about it. + qInstallMessageHandler(shutUp); + workerProcess->waitForFinished(-1); + delete workerProcess; + qInstallMessageHandler(0); - workerProcess.waitForFinished(-1); - inbound.close(); processActive = false; + comm->inbound->abort(); + comm->outbound.abort(); + comm->server.close(); + delete comm; } }