Commit 535886af716a45f7d2b675d9eeb28326725ef03f

Authored by jklontz
2 parents 0d513134 115566b0

Merge pull request #119 from biometrics/shadow_process

Add a wrapper transform that runs a child transform in a separate process
app/br/br.cpp
... ... @@ -171,7 +171,11 @@ public:
171 171 check(parc == 1, "Incorrect parameter count for 'daemon'.");
172 172 daemon = true;
173 173 daemon_pipe = parv[0];
174   - } else if (!strcmp(fun, "exit")) {
  174 + } else if (!strcmp(fun,"slave")) {
  175 + check(parc == 1, "Incorrect parameter count for 'slave'");
  176 + br_slave_process(parv[0]);
  177 + }
  178 + else if (!strcmp(fun, "exit")) {
175 179 check(parc == 0, "No parameters expected for 'exit'.");
176 180 daemon = false;
177 181 } else if (!strcmp(fun, "getHeader")) {
... ...
openbr/openbr.cpp
... ... @@ -22,6 +22,7 @@
22 22 #include "core/fuse.h"
23 23 #include "core/plot.h"
24 24 #include "core/qtutils.h"
  25 +#include "plugins/openbr_internal.h"
25 26  
26 27 using namespace br;
27 28  
... ... @@ -279,3 +280,11 @@ const char *br_version()
279 280 static QByteArray version = Context::version().toLocal8Bit();
280 281 return version.data();
281 282 }
  283 +
  284 +void br_slave_process(const char * baseName)
  285 +{
  286 + WorkerProcess worker;
  287 + worker.transform = Globals->algorithm;
  288 + worker.baseName = baseName;
  289 + worker.mainLoop();
  290 +}
... ...
openbr/openbr.h
... ... @@ -408,6 +408,12 @@ BR_EXPORT void br_train_n(int num_inputs, const char *inputs[], const char *mode
408 408 */
409 409 BR_EXPORT const char *br_version();
410 410  
  411 +
  412 +/*!
  413 + * \brief For internal use via ProcessWrapperTransform
  414 + */
  415 +BR_EXPORT void br_slave_process(const char * baseKey);
  416 +
411 417 /*! @}*/
412 418  
413 419 #ifdef __cplusplus
... ...
openbr/openbr_plugin.cpp
... ... @@ -17,6 +17,7 @@
17 17 #include <QCoreApplication>
18 18 #include <QCryptographicHash>
19 19 #include <QFutureSynchronizer>
  20 +#include <QLocalSocket>
20 21 #include <QMetaProperty>
21 22 #include <QPointF>
22 23 #include <QProcess>
... ... @@ -936,6 +937,8 @@ void br::Context::initialize(int &amp;argc, char *argv[], QString sdkPath, bool use_
936 937 qRegisterMetaType< QList<float> >();
937 938 qRegisterMetaType< QList<br::Transform*> >();
938 939 qRegisterMetaType< QList<br::Distance*> >();
  940 + qRegisterMetaType< QAbstractSocket::SocketState> ();
  941 +
939 942  
940 943 Globals = new Context();
941 944 Globals->init(File());
... ...
openbr/plugins/openbr_internal.h
... ... @@ -240,6 +240,20 @@ protected:
240 240 CompositeTransform() : TimeVaryingTransform(false) {}
241 241 };
242 242  
  243 +class EnrollmentWorker;
  244 +
  245 +class WorkerProcess
  246 +{
  247 +public:
  248 +
  249 + QString transform;
  250 + QString baseName;
  251 + EnrollmentWorker * processInterface;
  252 +
  253 + void mainLoop();
  254 +};
  255 +
  256 +
243 257 }
244 258  
245 259 #endif // OPENBR_INTERNAL_H
... ...
openbr/plugins/process.cpp 0 → 100644
  1 +
  2 +#include <QBuffer>
  3 +#include <QLocalServer>
  4 +#include <QLocalSocket>
  5 +#include <QProcess>
  6 +#include <QUuid>
  7 +
  8 +#include <iostream>
  9 +#include <fstream>
  10 +
  11 +#include "openbr_internal.h"
  12 +#include "openbr/core/opencvutils.h"
  13 +
  14 +using namespace cv;
  15 +
  16 +namespace br
  17 +{
  18 +
  19 +enum SignalType
  20 +{
  21 + INPUT_AVAILABLE,
  22 + OUTPUT_AVAILABLE,
  23 + SHOULD_END
  24 +};
  25 +
  26 +class EnrollmentWorker
  27 +{
  28 +public:
  29 + QLocalServer inbound;
  30 + QLocalSocket outbound;
  31 + QLocalSocket * receiver;
  32 +
  33 + ~EnrollmentWorker()
  34 + {
  35 + delete transform;
  36 + }
  37 +
  38 + br::Transform * transform;
  39 +
  40 + void connections(const QString & baseName)
  41 + {
  42 + inbound.listen(baseName+"_worker");
  43 + outbound.connectToServer(baseName+"_master");
  44 + inbound.waitForNewConnection(-1);
  45 + receiver = inbound.nextPendingConnection();
  46 + outbound.waitForConnected(-1);
  47 + }
  48 +
  49 + void workerLoop()
  50 + {
  51 + SignalType signal;
  52 +
  53 + forever
  54 + {
  55 + while (receiver->bytesAvailable() < qint64(sizeof(signal))) {
  56 + receiver->waitForReadyRead(-1);
  57 + }
  58 + receiver->read((char *) &signal, sizeof(signal));
  59 +
  60 + if (signal == SHOULD_END) {
  61 + outbound.close();
  62 + inbound.close();
  63 + break;
  64 + }
  65 +
  66 + qint64 inBufferSize;
  67 + while (receiver->bytesAvailable() < qint64(sizeof(inBufferSize))) {
  68 + receiver->waitForReadyRead(-1);
  69 + }
  70 + receiver->read((char *) &inBufferSize, sizeof(inBufferSize));
  71 +
  72 + QByteArray inArray(inBufferSize,'0');
  73 +
  74 + qint64 arrayPosition = 0;
  75 + while (arrayPosition < inBufferSize) {
  76 + if (!receiver->bytesAvailable())
  77 + receiver->waitForReadyRead(-1);
  78 + arrayPosition += receiver->read(inArray.data()+arrayPosition, receiver->bytesAvailable());
  79 + }
  80 +
  81 + TemplateList inList;
  82 + TemplateList outList;
  83 + // deserialize the template list
  84 + QDataStream deserializer(inArray);
  85 + deserializer >> inList;
  86 +
  87 + // and project it
  88 + transform->projectUpdate(inList,outList);
  89 +
  90 + // serialize the output list
  91 + QBuffer outBuff;
  92 + outBuff.open(QBuffer::ReadWrite);
  93 + QDataStream serializer(&outBuff);
  94 + serializer << outList;
  95 +
  96 + // send the size of the buffer
  97 + //qint64 bufferSize = outBuff.size();
  98 + qint64 bufferSize = outBuff.data().size();
  99 + outbound.write((char *) &bufferSize, sizeof(bufferSize));
  100 +
  101 + outbound.write(outBuff.data().data(), bufferSize);
  102 + while (outbound.bytesToWrite() > 0) {
  103 + outbound.waitForBytesWritten(-1);
  104 + }
  105 + }
  106 + }
  107 +
  108 +
  109 +};
  110 +
  111 +void WorkerProcess::mainLoop()
  112 +{
  113 + processInterface = new EnrollmentWorker();
  114 + processInterface->transform = Transform::make(this->transform,NULL);
  115 + processInterface->connections(baseName);
  116 + processInterface->workerLoop();
  117 + delete processInterface;
  118 +}
  119 +
  120 +/*!
  121 + * \ingroup transforms
  122 + * \brief Interface to a separate process
  123 + * \author Charles Otto \cite caotto
  124 + */
  125 +class ProcessWrapperTransform : public TimeVaryingTransform
  126 +{
  127 + Q_OBJECT
  128 +
  129 + Q_PROPERTY(QString transform READ get_transform WRITE set_transform RESET reset_transform)
  130 + BR_PROPERTY(QString, transform, "")
  131 +
  132 + QString baseKey;
  133 + QProcess workerProcess;
  134 +
  135 + QLocalServer inbound;
  136 + QLocalSocket outbound;
  137 + QLocalSocket * receiver;
  138 +
  139 + void projectUpdate(const TemplateList &src, TemplateList &dst)
  140 + {
  141 + if (!processActive)
  142 + {
  143 + activateProcess();
  144 + }
  145 +
  146 + SignalType signal = INPUT_AVAILABLE;
  147 + outbound.write((char *) &signal, sizeof(SignalType));
  148 +
  149 + QBuffer inBuffer;
  150 + inBuffer.open(QBuffer::ReadWrite);
  151 + QDataStream serializer(&inBuffer);
  152 + serializer << src;
  153 +
  154 + qint64 in_size = inBuffer.size();
  155 + outbound.write((char *) &in_size, sizeof(in_size));
  156 +
  157 + outbound.write(inBuffer.data(), in_size);
  158 +
  159 + while (outbound.bytesToWrite() > 0) {
  160 + outbound.waitForBytesWritten(-1);
  161 + }
  162 +
  163 + qint64 out_size;
  164 +
  165 + // read the size
  166 + receiver->waitForReadyRead(-1);
  167 + receiver->read((char *) &out_size, sizeof(out_size));
  168 + QByteArray outBuffer(out_size,'0');
  169 +
  170 + // read the (serialized) output templatelist
  171 + qint64 arrayPosition = 0;
  172 + while (arrayPosition < out_size) {
  173 + if (!receiver->bytesAvailable())
  174 + receiver->waitForReadyRead(-1);
  175 + arrayPosition += receiver->read(outBuffer.data()+arrayPosition, receiver->bytesAvailable());
  176 + }
  177 + // and deserialize it.
  178 + QDataStream deserialize(outBuffer);
  179 + deserialize >> dst;
  180 + }
  181 +
  182 +
  183 + void train(const TemplateList& data)
  184 + {
  185 + (void) data;
  186 + }
  187 +
  188 + // create the process
  189 + void init()
  190 + {
  191 + processActive = false;
  192 + }
  193 +
  194 + void activateProcess()
  195 + {
  196 + processActive = true;
  197 +
  198 + // generate a uuid for our local servers
  199 + QUuid id = QUuid::createUuid();
  200 + baseKey = id.toString();
  201 +
  202 + QStringList argumentList;
  203 + argumentList.append("-useGui");
  204 + argumentList.append("0");
  205 + argumentList.append("-algorithm");
  206 + argumentList.append(transform);
  207 + argumentList.append("-path");
  208 + argumentList.append(Globals->path);
  209 + argumentList.append("-parallelism");
  210 + argumentList.append(QString::number(0));
  211 + argumentList.append("-slave");
  212 + argumentList.append(baseKey);
  213 +
  214 + // start listening
  215 + inbound.listen(baseKey+"_master");
  216 +
  217 + workerProcess.setProcessChannelMode(QProcess::ForwardedChannels);
  218 + workerProcess.start("br", argumentList);
  219 + workerProcess.waitForStarted(-1);
  220 +
  221 + // blocking wait for the connection from the worker process
  222 + inbound.waitForNewConnection(-1);
  223 + receiver = inbound.nextPendingConnection();
  224 +
  225 + // Now, create our connection to the worker process.
  226 + outbound.connectToServer(baseKey+"_worker");
  227 + outbound.waitForConnected(-1);
  228 + }
  229 +
  230 + bool timeVarying() const {
  231 + return false;
  232 + }
  233 +
  234 + ~ProcessWrapperTransform()
  235 + {
  236 + // end the process
  237 + if (this->processActive) {
  238 +
  239 + SignalType signal = SHOULD_END;
  240 + outbound.write((char *) &signal, sizeof(SignalType));
  241 + outbound.waitForBytesWritten(-1);
  242 + outbound.close();
  243 +
  244 + workerProcess.waitForFinished(-1);
  245 + inbound.close();
  246 + processActive = false;
  247 + }
  248 + }
  249 +
  250 +public:
  251 + bool processActive;
  252 + ProcessWrapperTransform() : TimeVaryingTransform(false,false) { processActive = false; }
  253 +};
  254 +
  255 +BR_REGISTER(Transform, ProcessWrapperTransform)
  256 +
  257 +
  258 +}
  259 +
  260 +
  261 +#include "process.moc"
  262 +
  263 +
  264 +
  265 +
... ...