Commit ba5f51af86ddded5345fe1b3226ffaaba8ae62d5

Authored by Charles Otto
1 parent 885c3ec1

ProcessWrapper round 2

In a shocking turn of events, Qt is annoying, and not particularly
consistent across platforms.

Lot of not particularly intelligent business with signals to get things to
execute on the main thread.
I would rather not even use the event driven interfaces to
QLocalServer/QLocalSocket but sadly it seems that on windows even if we
try and use the blocking calls, we are still required to use using exactly one
thread per object. Of course that makes sense since obviously we are writing
a simple event-driven GUI application.
openbr/openbr.cpp
... ... @@ -283,8 +283,9 @@ const char *br_version()
283 283  
284 284 void br_slave_process(const char * baseName)
285 285 {
286   - WorkerProcess worker;
287   - worker.transform = Globals->algorithm;
288   - worker.baseName = baseName;
289   - worker.mainLoop();
  286 + WorkerProcess * worker = new WorkerProcess;
  287 + worker->transform = Globals->algorithm;
  288 + worker->baseName = baseName;
  289 + worker->mainLoop();
  290 + delete worker;
290 291 }
... ...
openbr/openbr_plugin.cpp
... ... @@ -42,6 +42,8 @@
42 42 using namespace br;
43 43 using namespace cv;
44 44  
  45 +Q_DECLARE_METATYPE(QLocalSocket::LocalSocketState)
  46 +
45 47 // Some globals used to transfer data to Context::messageHandler so that
46 48 // we can restart the process if we try and fail to create a QApplication.
47 49 static bool creating_qapp = false;
... ... @@ -938,7 +940,7 @@ void br::Context::initialize(int &argc, char *argv[], QString sdkPath, bool use_
938 940 qRegisterMetaType< QList<br::Transform*> >();
939 941 qRegisterMetaType< QList<br::Distance*> >();
940 942 qRegisterMetaType< QAbstractSocket::SocketState> ();
941   -
  943 + qRegisterMetaType< QLocalSocket::LocalSocketState> ();
942 944  
943 945 Globals = new Context();
944 946 Globals->init(File());
... ...
openbr/plugins/process.cpp
1 1  
  2 +
2 3 #include <QBuffer>
  4 +#include <QCoreApplication>
3 5 #include <QLocalServer>
4 6 #include <QLocalSocket>
  7 +#include <QMutex>
5 8 #include <QProcess>
6 9 #include <QUuid>
7   -
8   -#include <iostream>
9   -#include <fstream>
  10 +#include <QWaitCondition>
10 11  
11 12 #include "openbr_internal.h"
12 13 #include "openbr/core/opencvutils.h"
... ... @@ -16,93 +17,375 @@ using namespace cv;
16 17 namespace br
17 18 {
18 19  
19   -enum SignalType
  20 +class CommunicationManager : public QObject
20 21 {
21   - INPUT_AVAILABLE,
22   - OUTPUT_AVAILABLE,
23   - SHOULD_END
  22 + Q_OBJECT
  23 +public:
  24 + CommunicationManager()
  25 + {
  26 + moveToThread(QCoreApplication::instance()->thread());
  27 + server.moveToThread(QCoreApplication::instance()->thread() );
  28 + outbound.moveToThread(QCoreApplication::instance()->thread() );
  29 +
  30 + // signals for our sever
  31 + connect(&server, SIGNAL(newConnection()), this, SLOT(receivedConnection() ));
  32 + connect(this, SIGNAL(pulseStartServer(QString)), this, SLOT(startServerInternal(QString)), Qt::BlockingQueuedConnection);
  33 +
  34 + // internals, cause work to be done by the main thread because reasons.
  35 + connect(this, SIGNAL(pulseSignal()), this, SLOT(sendSignalInternal()), Qt::BlockingQueuedConnection);
  36 + connect(this, SIGNAL(pulseReadSignal() ), this, SLOT(readSignalInternal()), Qt::BlockingQueuedConnection);
  37 + connect(this, SIGNAL(pulseReadSerialized() ), this, SLOT(readSerializedInternal()), Qt::BlockingQueuedConnection);
  38 + connect(this, SIGNAL(pulseSendSerialized() ), this, SLOT(sendSerializedInternal() ), Qt::BlockingQueuedConnection);
  39 + connect(this, SIGNAL(pulseShutdown() ), this, SLOT(shutdownInternal() ), Qt::BlockingQueuedConnection);
  40 +
  41 + // signals for our outbound connection
  42 + connect(&outbound, SIGNAL(connected() ), this, SLOT(outboundConnected() ));
  43 + connect(&outbound, SIGNAL(disconnected() ), this, SLOT(outboundDisconnected() ));
  44 +
  45 + connect(&outbound, SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(outboundConnectionError(QLocalSocket::LocalSocketError) ) );
  46 + connect(&outbound, SIGNAL(stateChanged(QLocalSocket::LocalSocketState)), this, SLOT(outboundStateChanged(QLocalSocket::LocalSocketState) ) );
  47 +
  48 + inbound = NULL;
  49 + }
  50 +
  51 + enum SignalType
  52 + {
  53 + INPUT_AVAILABLE,
  54 + OUTPUT_AVAILABLE,
  55 + SHOULD_END
  56 + };
  57 +
  58 +
  59 +public slots:
  60 + // matching server signals
  61 + void receivedConnection()
  62 + {
  63 + inbound = server.nextPendingConnection();
  64 + connect(inbound, SIGNAL(disconnected() ), this, SLOT(inboundDisconnected() ));
  65 + connect(inbound, SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(inboundConnectionError(QLocalSocket::LocalSocketError) ) );
  66 + connect(inbound, SIGNAL(stateChanged(QLocalSocket::LocalSocketState)), this, SLOT(inboundStateChanged(QLocalSocket::LocalSocketState) ) );
  67 +
  68 + receivedWait.wakeAll();
  69 + }
  70 +
  71 + // matching outbound socket signals
  72 +
  73 + // Oh boy.
  74 + void outboundConnected()
  75 + {
  76 + outboundWait.wakeAll();
  77 + }
  78 +
  79 + // Oh no!
  80 + void outboundDisconnected()
  81 + {
  82 + //qDebug() << key << " outbound socket has disconnected";
  83 + }
  84 +
  85 + // informative.
  86 + void outboundConnectionError(QLocalSocket::LocalSocketError socketError)
  87 + {
  88 + //qDebug() << key << " outbound socket error " << socketError;
  89 + }
  90 +
  91 + void outboundStateChanged(QLocalSocket::LocalSocketState socketState)
  92 + {
  93 + //qDebug() << key << " outbound socket state changed to " << socketState;
  94 + }
  95 +
  96 + // matching inbound socket signals
  97 + void inboundDisconnected()
  98 + {
  99 + //qDebug() << key << " inbound socket has disconnected";
  100 + }
  101 +
  102 + void inboundConnectionError(QLocalSocket::LocalSocketError socketError)
  103 + {
  104 + //qDebug() << key << " inbound socket error " << socketError;
  105 + }
  106 +
  107 + void inboundStateChanged(QLocalSocket::LocalSocketState socketState)
  108 + {
  109 + //qDebug() << key << " inbound socket state changed to " << socketState;
  110 + }
  111 +
  112 + void startServerInternal(QString serverName)
  113 + {
  114 + if (!server.isListening()) {
  115 + bool listen_res = server.listen(serverName);
  116 + if (!listen_res)
  117 + qDebug() << key << " Failed to start server at " << serverName;
  118 + }
  119 + }
  120 +
  121 + void sendSignalInternal()
  122 + {
  123 + SignalType signal= sendType;
  124 + qint64 signal_wrote = outbound.write((char *) &signal, sizeof(signal));
  125 + if (signal_wrote != sizeof(signal))
  126 + qDebug() << key << " inconsistent signal size";
  127 +
  128 + bool res = outbound.waitForBytesWritten(-1);
  129 + if (!res)
  130 + qDebug() << key << " failed to wait for bytes written in signal size";
  131 + }
  132 +
  133 + void readSignalInternal()
  134 + {
  135 + while (inbound->bytesAvailable() < sizeof(readSignal) ) {
  136 + bool size_ready = inbound->waitForReadyRead(-1);
  137 + if (!size_ready)
  138 + {
  139 + qDebug("Failed to received object size in signal!");
  140 + }
  141 + }
  142 +
  143 + qint64 signalBytesRead = inbound->read((char *) &readSignal, sizeof(readSignal));
  144 + if (signalBytesRead != sizeof(readSignal))
  145 + qDebug("Inconsistent signal size read!");
  146 +
  147 + if (readSignal == SHOULD_END)
  148 + {
  149 + server.close();
  150 + outbound.abort();
  151 + inbound->abort();
  152 + }
  153 + }
  154 +
  155 + void sendSerializedInternal()
  156 + {
  157 + qint64 serializedSize = writeArray.size();
  158 + qint64 size_wrote = outbound.write((char *) &serializedSize, sizeof(serializedSize));
  159 + if (size_wrote != sizeof(serializedSize)) {
  160 + qDebug() << key << "inconsistent size sent in send data!";
  161 + return;
  162 + }
  163 + bool res = outbound.waitForBytesWritten(-1);
  164 +
  165 + if (!res) {
  166 + qDebug() << key << " wait for bytes failed!";
  167 + return;
  168 + }
  169 +
  170 + qint64 data_wrote = outbound.write(writeArray.data(), serializedSize);
  171 + if (data_wrote != serializedSize)
  172 + qDebug() << key << " inconsistent data written!";
  173 +
  174 + while (outbound.bytesToWrite() > 0) {
  175 + bool write_res = outbound.waitForBytesWritten(-1);
  176 + if (!write_res) {
  177 + qDebug() << key << " wait for bytes failed!";
  178 + return;
  179 + }
  180 + }
  181 +
  182 + return;
  183 +
  184 + }
  185 +
  186 + void readSerializedInternal()
  187 + {
  188 + qint64 bufferSize;
  189 + while (inbound->bytesAvailable() < qint64(sizeof(bufferSize))) {
  190 + bool size_ready = inbound->waitForReadyRead(-1);
  191 + if (!size_ready)
  192 + {
  193 + qDebug() << key << " Failed to received object size in read data!";
  194 + qDebug() << key << "inbound status: " << inbound->state() << " error: " << inbound->errorString();
  195 + return;
  196 + }
  197 + }
  198 + qint64 sizeBytesRead = inbound->read((char *) &bufferSize, sizeof(bufferSize));
  199 + if (sizeBytesRead != sizeof(bufferSize)) {
  200 + qDebug("failed to read size of buffer!");
  201 + return;
  202 + }
  203 +
  204 + // allocate the input buffer
  205 + readArray.resize(bufferSize);
  206 +
  207 + // read the data, we may get it in serveral bursts
  208 + qint64 arrayPosition = 0;
  209 + while (arrayPosition < bufferSize) {
  210 + if (!inbound->bytesAvailable()) {
  211 + bool ready_res = inbound->waitForReadyRead(-1);
  212 +
  213 + if (!ready_res) {
  214 + qDebug() << key << "failed to wait for data!";
  215 + return;
  216 + }
  217 + }
  218 +
  219 + // how many bytes do we still need?
  220 + qint64 bytes_remaining = bufferSize - arrayPosition;
  221 +
  222 + if (bytes_remaining < inbound->bytesAvailable() )
  223 + {
  224 + qDebug() << key << "!!!excessive bytes received";
  225 + }
  226 + arrayPosition += inbound->read(readArray.data()+arrayPosition, qMin(inbound->bytesAvailable(), bytes_remaining));
  227 + }
  228 + if (arrayPosition != bufferSize)
  229 + qDebug() << key << "Read wrong size object!";
  230 +
  231 + }
  232 +
  233 + void shutdownInternal()
  234 + {
  235 + outbound.abort();
  236 + inbound->abort();
  237 + server.close();
  238 + }
  239 +
  240 +signals:
  241 + void pulseStartServer(QString serverName);
  242 + void pulseSignal();
  243 + void pulseReadSignal();
  244 + void pulseReadSerialized();
  245 + void pulseSendSerialized();
  246 + void pulseShutdown();
  247 +
  248 +
  249 +public:
  250 + QByteArray readArray;
  251 + QByteArray writeArray;
  252 +
  253 + SignalType readSignal;
  254 + QMutex receivedLock;
  255 + QWaitCondition receivedWait;
  256 +
  257 + QMutex outboundLock;
  258 + QWaitCondition outboundWait;
  259 +
  260 + QString key;
  261 + QString serverName;
  262 + QString remoteName;
  263 +
  264 + QLocalSocket * inbound;
  265 + QLocalSocket outbound;
  266 + QLocalServer server;
  267 +
  268 +
  269 + void waitForInbound()
  270 + {
  271 + QMutexLocker locker(&receivedLock);
  272 + while (!inbound || inbound->state() != QLocalSocket::ConnectedState) {
  273 + bool res = receivedWait.wait(&receivedLock,30*1000);
  274 + if (!res)
  275 + {
  276 + qDebug() << key << " " << QThread::currentThread() << " waiting timed out, server thread is " << server.thread() << " application thread " << QCoreApplication::instance()->thread();
  277 + }
  278 + }
  279 + }
  280 +
  281 + void connectToRemote(const QString & remoteName)
  282 + {
  283 + outbound.connectToServer(remoteName);
  284 +
  285 + QMutexLocker locker(&outboundLock);
  286 + while (outbound.state() != QLocalSocket::ConnectedState) {
  287 + outboundWait.wait(&outboundLock,30*1000);
  288 + }
  289 + }
  290 +
  291 + SignalType getSignal()
  292 + {
  293 + emit pulseReadSignal();
  294 + return readSignal;
  295 + }
  296 +
  297 +
  298 + template<typename T>
  299 + bool readData(T & input)
  300 + {
  301 + emit pulseReadSerialized();
  302 + QDataStream deserializer(readArray);
  303 + deserializer >> input;
  304 + return true;
  305 + }
  306 +
  307 + template<typename T>
  308 + bool sendData(const T & output)
  309 + {
  310 + QBuffer buffer;
  311 + buffer.open(QBuffer::ReadWrite);
  312 +
  313 + QDataStream serializer(&buffer);
  314 + serializer << output;
  315 + writeArray = buffer.data();
  316 + emit pulseSendSerialized();
  317 + return true;
  318 + }
  319 +
  320 + SignalType sendType;
  321 + void sendSignal(SignalType signal)
  322 + {
  323 + sendType = signal;
  324 + if (QThread::currentThread() == this->thread() )
  325 + this->sendSignalInternal();
  326 + else
  327 + emit pulseSignal();
  328 + }
  329 +
  330 + void startServer(QString server)
  331 + {
  332 + emit pulseStartServer(server);
  333 + }
  334 +
  335 + void shutdown()
  336 + {
  337 + emit pulseShutdown();
  338 + }
  339 +
  340 +
24 341 };
25 342  
26   -class EnrollmentWorker
  343 +class EnrollmentWorker : public QObject
27 344 {
  345 + Q_OBJECT
28 346 public:
29   - QLocalServer inbound;
30   - QLocalSocket outbound;
31   - QLocalSocket * receiver;
  347 + CommunicationManager * comm;
  348 + QString name;
32 349  
33 350 ~EnrollmentWorker()
34 351 {
35 352 delete transform;
  353 + delete comm;
36 354 }
37 355  
38 356 br::Transform * transform;
39 357  
  358 +public:
40 359 void connections(const QString & baseName)
41 360 {
42   - inbound.listen(baseName+"_worker");
43   - outbound.connectToServer(baseName+"_master");
44   - inbound.waitForNewConnection(-1);
45   - receiver = inbound.nextPendingConnection();
46   - outbound.waitForConnected(-1);
  361 + comm = new CommunicationManager();
  362 + name = baseName;
  363 + comm->key = "worker_"+baseName.mid(1,5);
  364 + comm->startServer(baseName+"_worker");
  365 + comm->connectToRemote(baseName+"_master");
  366 +
  367 + comm->waitForInbound();
47 368 }
48 369  
49 370 void workerLoop()
50 371 {
51   - SignalType signal;
52   -
  372 + QString sign = "worker " + name;
  373 + CommunicationManager::SignalType signal;
53 374 forever
54 375 {
55   - while (receiver->bytesAvailable() < qint64(sizeof(signal))) {
56   - receiver->waitForReadyRead(-1);
57   - }
58   - receiver->read((char *) &signal, sizeof(signal));
  376 + signal= comm->getSignal();
59 377  
60   - if (signal == SHOULD_END) {
61   - outbound.close();
62   - inbound.close();
  378 + if (signal == CommunicationManager::SHOULD_END) {
63 379 break;
64 380 }
65   -
66   - qint64 inBufferSize;
67   - while (receiver->bytesAvailable() < qint64(sizeof(inBufferSize))) {
68   - receiver->waitForReadyRead(-1);
69   - }
70   - receiver->read((char *) &inBufferSize, sizeof(inBufferSize));
71   -
72   - QByteArray inArray(inBufferSize,'0');
73   -
74   - qint64 arrayPosition = 0;
75   - while (arrayPosition < inBufferSize) {
76   - if (!receiver->bytesAvailable())
77   - receiver->waitForReadyRead(-1);
78   - arrayPosition += receiver->read(inArray.data()+arrayPosition, receiver->bytesAvailable());
79   - }
80   -
81 381 TemplateList inList;
82 382 TemplateList outList;
83   - // deserialize the template list
84   - QDataStream deserializer(inArray);
85   - deserializer >> inList;
86 383  
87   - // and project it
  384 + comm->readData(inList);
88 385 transform->projectUpdate(inList,outList);
89   -
90   - // serialize the output list
91   - QBuffer outBuff;
92   - outBuff.open(QBuffer::ReadWrite);
93   - QDataStream serializer(&outBuff);
94   - serializer << outList;
95   -
96   - // send the size of the buffer
97   - //qint64 bufferSize = outBuff.size();
98   - qint64 bufferSize = outBuff.data().size();
99   - outbound.write((char *) &bufferSize, sizeof(bufferSize));
100   -
101   - outbound.write(outBuff.data().data(), bufferSize);
102   - while (outbound.bytesToWrite() > 0) {
103   - outbound.waitForBytesWritten(-1);
104   - }
  386 + comm->sendData(outList);
105 387 }
  388 + comm->shutdown();
106 389 }
107 390 };
108 391  
... ... @@ -115,6 +398,15 @@ void WorkerProcess::mainLoop()
115 398 delete processInterface;
116 399 }
117 400  
  401 +void shutUp(QtMsgType type, const QMessageLogContext &context, const QString &msg)
  402 +{
  403 + // Qt you have no idea how much I don't care about you.
  404 + // Please tell me more about how you want every single god damn thing to be created from and used by exactly one thread.
  405 + // It does not matter, so shut up already.
  406 + // p.s. I hope you die.
  407 +}
  408 +
  409 +
118 410 /*!
119 411 * \ingroup transforms
120 412 * \brief Interface to a separate process
... ... @@ -128,53 +420,22 @@ class ProcessWrapperTransform : public TimeVaryingTransform
128 420 BR_PROPERTY(QString, transform, "")
129 421  
130 422 QString baseKey;
131   - QProcess workerProcess;
132 423  
133   - QLocalServer inbound;
134   - QLocalSocket outbound;
135   - QLocalSocket * receiver;
  424 + QProcess * workerProcess;
  425 + CommunicationManager * comm;
136 426  
137 427 void projectUpdate(const TemplateList &src, TemplateList &dst)
138 428 {
  429 +
139 430 if (!processActive)
140 431 {
141 432 activateProcess();
142 433 }
  434 + comm->sendSignal(CommunicationManager::INPUT_AVAILABLE);
143 435  
144   - SignalType signal = INPUT_AVAILABLE;
145   - outbound.write((char *) &signal, sizeof(SignalType));
146   -
147   - QBuffer inBuffer;
148   - inBuffer.open(QBuffer::ReadWrite);
149   - QDataStream serializer(&inBuffer);
150   - serializer << src;
  436 + comm->sendData(src);
151 437  
152   - qint64 in_size = inBuffer.size();
153   - outbound.write((char *) &in_size, sizeof(in_size));
154   -
155   - outbound.write(inBuffer.data(), in_size);
156   -
157   - while (outbound.bytesToWrite() > 0) {
158   - outbound.waitForBytesWritten(-1);
159   - }
160   -
161   - qint64 out_size;
162   -
163   - // read the size
164   - receiver->waitForReadyRead(-1);
165   - receiver->read((char *) &out_size, sizeof(out_size));
166   - QByteArray outBuffer(out_size,'0');
167   -
168   - // read the (serialized) output templatelist
169   - qint64 arrayPosition = 0;
170   - while (arrayPosition < out_size) {
171   - if (!receiver->bytesAvailable())
172   - receiver->waitForReadyRead(-1);
173   - arrayPosition += receiver->read(outBuffer.data()+arrayPosition, receiver->bytesAvailable());
174   - }
175   - // and deserialize it.
176   - QDataStream deserialize(outBuffer);
177   - deserialize >> dst;
  438 + comm->readData(dst);
178 439 }
179 440  
180 441  
... ... @@ -191,6 +452,7 @@ class ProcessWrapperTransform : public TimeVaryingTransform
191 452  
192 453 void activateProcess()
193 454 {
  455 + comm = new CommunicationManager();
194 456 processActive = true;
195 457  
196 458 // generate a uuid for our local servers
... ... @@ -209,20 +471,17 @@ class ProcessWrapperTransform : public TimeVaryingTransform
209 471 argumentList.append("-slave");
210 472 argumentList.append(baseKey);
211 473  
212   - // start listening
213   - inbound.listen(baseKey+"_master");
  474 + comm->key = "master_"+baseKey.mid(1,5);
214 475  
215   - workerProcess.setProcessChannelMode(QProcess::ForwardedChannels);
216   - workerProcess.start("br", argumentList);
217   - workerProcess.waitForStarted(-1);
  476 + comm->startServer(baseKey+"_master");
218 477  
219   - // blocking wait for the connection from the worker process
220   - inbound.waitForNewConnection(-1);
221   - receiver = inbound.nextPendingConnection();
  478 + workerProcess = new QProcess();
  479 + workerProcess->setProcessChannelMode(QProcess::ForwardedChannels);
  480 + workerProcess->start("br", argumentList);
  481 + workerProcess->waitForStarted(-1);
222 482  
223   - // Now, create our connection to the worker process.
224   - outbound.connectToServer(baseKey+"_worker");
225   - outbound.waitForConnected(-1);
  483 + comm->waitForInbound();
  484 + comm->connectToRemote(baseKey+"_worker");
226 485 }
227 486  
228 487 bool timeVarying() const {
... ... @@ -233,15 +492,19 @@ class ProcessWrapperTransform : public TimeVaryingTransform
233 492 {
234 493 // end the process
235 494 if (this->processActive) {
  495 + comm->sendSignal(CommunicationManager::SHOULD_END);
236 496  
237   - SignalType signal = SHOULD_END;
238   - outbound.write((char *) &signal, sizeof(SignalType));
239   - outbound.waitForBytesWritten(-1);
240   - outbound.close();
  497 + // I don't even want to talk about it.
  498 + qInstallMessageHandler(shutUp);
  499 + workerProcess->waitForFinished(-1);
  500 + delete workerProcess;
  501 + qInstallMessageHandler(0);
241 502  
242   - workerProcess.waitForFinished(-1);
243   - inbound.close();
244 503 processActive = false;
  504 + comm->inbound->abort();
  505 + comm->outbound.abort();
  506 + comm->server.close();
  507 + delete comm;
245 508 }
246 509 }
247 510  
... ...