diff --git a/app/br/br.cpp b/app/br/br.cpp index 53040f8..83640e9 100644 --- a/app/br/br.cpp +++ b/app/br/br.cpp @@ -171,7 +171,11 @@ public: check(parc == 1, "Incorrect parameter count for 'daemon'."); daemon = true; daemon_pipe = parv[0]; - } else if (!strcmp(fun, "exit")) { + } else if (!strcmp(fun,"slave")) { + check(parc == 1, "Incorrect parameter count for 'slave'"); + br_slave_process(parv[0]); + } + else if (!strcmp(fun, "exit")) { check(parc == 0, "No parameters expected for 'exit'."); daemon = false; } else if (!strcmp(fun, "getHeader")) { diff --git a/openbr/openbr.cpp b/openbr/openbr.cpp index 0950fa4..008a4b1 100644 --- a/openbr/openbr.cpp +++ b/openbr/openbr.cpp @@ -22,6 +22,7 @@ #include "core/fuse.h" #include "core/plot.h" #include "core/qtutils.h" +#include "plugins/openbr_internal.h" using namespace br; @@ -279,3 +280,11 @@ const char *br_version() static QByteArray version = Context::version().toLocal8Bit(); return version.data(); } + +void br_slave_process(const char * baseName) +{ + WorkerProcess worker; + worker.transform = Globals->algorithm; + worker.baseName = baseName; + worker.mainLoop(); +} diff --git a/openbr/openbr.h b/openbr/openbr.h index 9a115ce..93d0d8f 100644 --- a/openbr/openbr.h +++ b/openbr/openbr.h @@ -408,6 +408,12 @@ BR_EXPORT void br_train_n(int num_inputs, const char *inputs[], const char *mode */ BR_EXPORT const char *br_version(); + +/*! + * \brief For internal use via ProcessWrapperTransform + */ +BR_EXPORT void br_slave_process(const char * baseKey); + /*! @}*/ #ifdef __cplusplus diff --git a/openbr/openbr_plugin.cpp b/openbr/openbr_plugin.cpp index 02a1caa..a504642 100644 --- a/openbr/openbr_plugin.cpp +++ b/openbr/openbr_plugin.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -936,6 +937,8 @@ void br::Context::initialize(int &argc, char *argv[], QString sdkPath, bool use_ qRegisterMetaType< QList >(); qRegisterMetaType< QList >(); qRegisterMetaType< QList >(); + qRegisterMetaType< QAbstractSocket::SocketState> (); + Globals = new Context(); Globals->init(File()); diff --git a/openbr/plugins/openbr_internal.h b/openbr/plugins/openbr_internal.h index 3348b8c..e95a72d 100644 --- a/openbr/plugins/openbr_internal.h +++ b/openbr/plugins/openbr_internal.h @@ -240,6 +240,20 @@ protected: CompositeTransform() : TimeVaryingTransform(false) {} }; +class EnrollmentWorker; + +class WorkerProcess +{ +public: + + QString transform; + QString baseName; + EnrollmentWorker * processInterface; + + void mainLoop(); +}; + + } #endif // OPENBR_INTERNAL_H diff --git a/openbr/plugins/process.cpp b/openbr/plugins/process.cpp new file mode 100644 index 0000000..f7a4ff1 --- /dev/null +++ b/openbr/plugins/process.cpp @@ -0,0 +1,265 @@ + +#include +#include +#include +#include +#include + +#include +#include + +#include "openbr_internal.h" +#include "openbr/core/opencvutils.h" + +using namespace cv; + +namespace br +{ + +enum SignalType +{ + INPUT_AVAILABLE, + OUTPUT_AVAILABLE, + SHOULD_END +}; + +class EnrollmentWorker +{ +public: + QLocalServer inbound; + QLocalSocket outbound; + QLocalSocket * receiver; + + ~EnrollmentWorker() + { + delete transform; + } + + br::Transform * transform; + + void connections(const QString & baseName) + { + inbound.listen(baseName+"_worker"); + outbound.connectToServer(baseName+"_master"); + inbound.waitForNewConnection(-1); + receiver = inbound.nextPendingConnection(); + outbound.waitForConnected(-1); + } + + void workerLoop() + { + SignalType signal; + + forever + { + while (receiver->bytesAvailable() < qint64(sizeof(signal))) { + receiver->waitForReadyRead(-1); + } + receiver->read((char *) &signal, sizeof(signal)); + + if (signal == SHOULD_END) { + outbound.close(); + inbound.close(); + break; + } + + qint64 inBufferSize; + while (receiver->bytesAvailable() < qint64(sizeof(inBufferSize))) { + receiver->waitForReadyRead(-1); + } + receiver->read((char *) &inBufferSize, sizeof(inBufferSize)); + + QByteArray inArray(inBufferSize,'0'); + + qint64 arrayPosition = 0; + while (arrayPosition < inBufferSize) { + if (!receiver->bytesAvailable()) + receiver->waitForReadyRead(-1); + arrayPosition += receiver->read(inArray.data()+arrayPosition, receiver->bytesAvailable()); + } + + TemplateList inList; + TemplateList outList; + // deserialize the template list + QDataStream deserializer(inArray); + deserializer >> inList; + + // and project it + transform->projectUpdate(inList,outList); + + // serialize the output list + QBuffer outBuff; + outBuff.open(QBuffer::ReadWrite); + QDataStream serializer(&outBuff); + serializer << outList; + + // send the size of the buffer + //qint64 bufferSize = outBuff.size(); + qint64 bufferSize = outBuff.data().size(); + outbound.write((char *) &bufferSize, sizeof(bufferSize)); + + outbound.write(outBuff.data().data(), bufferSize); + while (outbound.bytesToWrite() > 0) { + outbound.waitForBytesWritten(-1); + } + } + } + + +}; + +void WorkerProcess::mainLoop() +{ + processInterface = new EnrollmentWorker(); + processInterface->transform = Transform::make(this->transform,NULL); + processInterface->connections(baseName); + processInterface->workerLoop(); + delete processInterface; +} + +/*! + * \ingroup transforms + * \brief Interface to a separate process + * \author Charles Otto \cite caotto + */ +class ProcessWrapperTransform : public TimeVaryingTransform +{ + Q_OBJECT + + Q_PROPERTY(QString transform READ get_transform WRITE set_transform RESET reset_transform) + BR_PROPERTY(QString, transform, "") + + QString baseKey; + QProcess workerProcess; + + QLocalServer inbound; + QLocalSocket outbound; + QLocalSocket * receiver; + + void projectUpdate(const TemplateList &src, TemplateList &dst) + { + if (!processActive) + { + activateProcess(); + } + + SignalType signal = INPUT_AVAILABLE; + outbound.write((char *) &signal, sizeof(SignalType)); + + QBuffer inBuffer; + inBuffer.open(QBuffer::ReadWrite); + QDataStream serializer(&inBuffer); + serializer << src; + + qint64 in_size = inBuffer.size(); + outbound.write((char *) &in_size, sizeof(in_size)); + + outbound.write(inBuffer.data(), in_size); + + while (outbound.bytesToWrite() > 0) { + outbound.waitForBytesWritten(-1); + } + + qint64 out_size; + + // read the size + receiver->waitForReadyRead(-1); + receiver->read((char *) &out_size, sizeof(out_size)); + QByteArray outBuffer(out_size,'0'); + + // read the (serialized) output templatelist + qint64 arrayPosition = 0; + while (arrayPosition < out_size) { + if (!receiver->bytesAvailable()) + receiver->waitForReadyRead(-1); + arrayPosition += receiver->read(outBuffer.data()+arrayPosition, receiver->bytesAvailable()); + } + // and deserialize it. + QDataStream deserialize(outBuffer); + deserialize >> dst; + } + + + void train(const TemplateList& data) + { + (void) data; + } + + // create the process + void init() + { + processActive = false; + } + + void activateProcess() + { + processActive = true; + + // generate a uuid for our local servers + QUuid id = QUuid::createUuid(); + baseKey = id.toString(); + + QStringList argumentList; + argumentList.append("-useGui"); + argumentList.append("0"); + argumentList.append("-algorithm"); + argumentList.append(transform); + argumentList.append("-path"); + argumentList.append(Globals->path); + argumentList.append("-parallelism"); + argumentList.append(QString::number(0)); + argumentList.append("-slave"); + argumentList.append(baseKey); + + // start listening + inbound.listen(baseKey+"_master"); + + workerProcess.setProcessChannelMode(QProcess::ForwardedChannels); + workerProcess.start("br", argumentList); + workerProcess.waitForStarted(-1); + + // blocking wait for the connection from the worker process + inbound.waitForNewConnection(-1); + receiver = inbound.nextPendingConnection(); + + // Now, create our connection to the worker process. + outbound.connectToServer(baseKey+"_worker"); + outbound.waitForConnected(-1); + } + + bool timeVarying() const { + return false; + } + + ~ProcessWrapperTransform() + { + // end the process + if (this->processActive) { + + SignalType signal = SHOULD_END; + outbound.write((char *) &signal, sizeof(SignalType)); + outbound.waitForBytesWritten(-1); + outbound.close(); + + workerProcess.waitForFinished(-1); + inbound.close(); + processActive = false; + } + } + +public: + bool processActive; + ProcessWrapperTransform() : TimeVaryingTransform(false,false) { processActive = false; } +}; + +BR_REGISTER(Transform, ProcessWrapperTransform) + + +} + + +#include "process.moc" + + + +