Commit c9ad1390d2d6078c22bdcc9cf0ae58eef7aa3fcf

Authored by Josh Klontz
2 parents dd58874a a3434aad

Merge branch 'master' of https://github.com/biometrics/openbr

Showing 1 changed file with 70 additions and 46 deletions
openbr/plugins/process.cpp
... ... @@ -21,17 +21,18 @@ class CommunicationManager : public QObject
21 21 {
22 22 Q_OBJECT
23 23 public:
  24 + int timeout_ms;
24 25 QThread *basis;
25 26 CommunicationManager()
26 27 {
  28 + timeout_ms = 30000;
  29 +
27 30 basis = new QThread;
28   - basis->moveToThread(QCoreApplication::instance()->thread());
29 31 moveToThread(basis);
30 32 server.moveToThread(basis);
31 33 outbound.moveToThread(basis);
32 34  
33 35 // signals for our sever
34   - connect(this, SIGNAL(pulseEndThread()), basis, SLOT(quit() ));
35 36 connect(&server, SIGNAL(newConnection()), this, SLOT(receivedConnection() ));
36 37 connect(this, SIGNAL(pulseStartServer(QString)), this, SLOT(startServerInternal(QString)), Qt::BlockingQueuedConnection);
37 38 connect(this, SIGNAL(pulseOutboundConnect(QString)), this, SLOT(startConnectInternal(QString) ), Qt::BlockingQueuedConnection);
... ... @@ -62,7 +63,6 @@ public:
62 63  
63 64 void shutDownThread()
64 65 {
65   - emit pulseEndThread();
66 66 basis->quit();
67 67 basis->wait();
68 68 delete basis;
... ... @@ -144,23 +144,34 @@ public slots:
144 144  
145 145 void sendSignalInternal()
146 146 {
147   - SignalType signal= sendType;
  147 + SignalType signal = sendType;
148 148 qint64 signal_wrote = outbound.write((char *) &signal, sizeof(signal));
  149 +
149 150 if (signal_wrote != sizeof(signal))
150 151 qDebug() << key << " inconsistent signal size";
151 152  
152   - bool res = outbound.waitForBytesWritten(-1);
153   - if (!res)
154   - qDebug() << key << " failed to wait for bytes written in signal size";
  153 + while (outbound.bytesToWrite() > 0) {
  154 + bool written = outbound.waitForBytesWritten(timeout_ms);
  155 +
  156 + if (!written && (!outbound.isValid() || outbound.state() == QLocalSocket::UnconnectedState)) {
  157 + qDebug() << key << " failed to wait for bytes written in signal size";
  158 + return;
  159 + }
  160 + }
155 161 }
156 162  
157 163 void readSignalInternal()
158 164 {
159 165 while (inbound->bytesAvailable() < qint64(sizeof(readSignal)) ) {
160   - bool size_ready = inbound->waitForReadyRead(-1);
161   - if (!size_ready)
162   - {
163   - qDebug("Failed to received object size in signal!");
  166 + bool size_ready = inbound->waitForReadyRead(timeout_ms);
  167 +
  168 + // wait timed out, now what?
  169 + if (!size_ready && (!inbound->isValid() || inbound->state() == QLocalSocket::UnconnectedState)) {
  170 + readSignal = SHOULD_END;
  171 + server.close();
  172 + outbound.abort();
  173 + inbound->abort();
  174 + return;
164 175 }
165 176 }
166 177  
... ... @@ -184,11 +195,14 @@ public slots:
184 195 qDebug() << key << "inconsistent size sent in send data!";
185 196 return;
186 197 }
187   - bool res = outbound.waitForBytesWritten(-1);
  198 +
  199 + while (outbound.bytesToWrite() > 0) {
  200 + bool written = outbound.waitForBytesWritten(timeout_ms);
188 201  
189   - if (!res) {
190   - qDebug() << key << " wait for bytes failed!";
191   - return;
  202 + if (!written && (!outbound.isValid() || outbound.state() == QLocalSocket::UnconnectedState)) {
  203 + qDebug() << key << " wait for bytes failed!";
  204 + return;
  205 + }
192 206 }
193 207  
194 208 qint64 data_wrote = outbound.write(writeArray.data(), serializedSize);
... ... @@ -196,8 +210,8 @@ public slots:
196 210 qDebug() << key << " inconsistent data written!";
197 211  
198 212 while (outbound.bytesToWrite() > 0) {
199   - bool write_res = outbound.waitForBytesWritten(-1);
200   - if (!write_res) {
  213 + bool write_res = outbound.waitForBytesWritten();
  214 + if (!write_res && (!outbound.isValid() || outbound.state() == QLocalSocket::UnconnectedState)) {
201 215 qDebug() << key << " wait for bytes failed!";
202 216 return;
203 217 }
... ... @@ -211,14 +225,14 @@ public slots:
211 225 {
212 226 qint64 bufferSize;
213 227 while (inbound->bytesAvailable() < qint64(sizeof(bufferSize))) {
214   - bool size_ready = inbound->waitForReadyRead(-1);
215   - if (!size_ready)
216   - {
217   - qDebug() << key << " Failed to received object size in read data!";
218   - qDebug() << key << "inbound status: " << inbound->state() << " error: " << inbound->errorString();
219   - return;
  228 + bool size_ready = inbound->waitForReadyRead(timeout_ms);
  229 + if (!size_ready && (!inbound->isValid() || inbound->state() == QLocalSocket::UnconnectedState)) {
  230 + qDebug() << key << " Failed to received object size in read data!";
  231 + qDebug() << key << "inbound status: " << inbound->state() << " error: " << inbound->errorString();
  232 + return;
220 233 }
221 234 }
  235 +
222 236 qint64 sizeBytesRead = inbound->read((char *) &bufferSize, sizeof(bufferSize));
223 237 if (sizeBytesRead != sizeof(bufferSize)) {
224 238 qDebug("failed to read size of buffer!");
... ... @@ -231,10 +245,10 @@ public slots:
231 245 // read the data, we may get it in serveral bursts
232 246 qint64 arrayPosition = 0;
233 247 while (arrayPosition < bufferSize) {
234   - if (!inbound->bytesAvailable()) {
235   - bool ready_res = inbound->waitForReadyRead(-1);
  248 + while (!inbound->bytesAvailable()) {
  249 + bool ready_res = inbound->waitForReadyRead(timeout_ms);
236 250  
237   - if (!ready_res) {
  251 + if (!ready_res && (!inbound->isValid() || inbound->state() == QLocalSocket::UnconnectedState)) {
238 252 qDebug() << key << "failed to wait for data!";
239 253 return;
240 254 }
... ... @@ -251,7 +265,6 @@ public slots:
251 265 }
252 266 if (arrayPosition != bufferSize)
253 267 qDebug() << key << "Read wrong size object!";
254   -
255 268 }
256 269  
257 270 void shutdownInternal()
... ... @@ -276,8 +289,6 @@ signals:
276 289 void pulseSendSerialized();
277 290 void pulseShutdown();
278 291 void pulseOutboundConnect(QString serverName);
279   - void pulseEndThread();
280   -
281 292  
282 293 public:
283 294 QByteArray readArray;
... ... @@ -417,8 +428,7 @@ public:
417 428 {
418 429 QString sign = "worker " + name;
419 430 CommunicationManager::SignalType signal;
420   - forever
421   - {
  431 + forever {
422 432 signal= comm->getSignal();
423 433  
424 434 if (signal == CommunicationManager::SHOULD_END) {
... ... @@ -448,28 +458,39 @@ class ProcessInterface : public QObject
448 458 {
449 459 Q_OBJECT
450 460 public:
  461 + QThread *basis;
  462 +
  463 + ~ProcessInterface()
  464 + {
  465 + basis->quit();
  466 + basis->wait();
  467 + delete basis;
  468 + }
  469 +
451 470 ProcessInterface()
452 471 {
453   - this->moveToThread(QCoreApplication::instance()->thread());
454   - workerProcess.moveToThread(QCoreApplication::instance()->thread());
  472 + basis = new QThread();
  473 +
  474 + moveToThread(basis);
  475 + workerProcess.moveToThread(basis);
  476 +
455 477 connect(this, SIGNAL(pulseEnd()), this, SLOT(endProcessInternal()), Qt::BlockingQueuedConnection);
456 478 connect(this, SIGNAL(pulseStart(QStringList)), this, SLOT(startProcessInternal(QStringList)), Qt::BlockingQueuedConnection);
  479 +
  480 + basis->start();
457 481 }
  482 +
458 483 QProcess workerProcess;
459 484 void endProcess()
460 485 {
461   - if (QThread::currentThread() != QCoreApplication::instance()->thread())
462   - emit pulseEnd();
463   - else
464   - endProcessInternal();
  486 + emit pulseEnd();
465 487 }
  488 +
466 489 void startProcess(QStringList arguments)
467 490 {
468   - if (QThread::currentThread() != QCoreApplication::instance()->thread() )
469   - emit pulseStart(arguments);
470   - else
471   - startProcessInternal(arguments);
  491 + emit pulseStart(arguments);
472 492 }
  493 +
473 494 signals:
474 495 void pulseEnd();
475 496 void pulseStart(QStringList);
... ... @@ -500,10 +521,12 @@ struct ProcessData
500 521  
501 522 ~ProcessData()
502 523 {
503   - comm.sendSignal(CommunicationManager::SHOULD_END);
504   - proc.endProcess();
505   - comm.shutdown();
506   - comm.shutDownThread();
  524 + if (initialized) {
  525 + comm.sendSignal(CommunicationManager::SHOULD_END);
  526 + proc.endProcess();
  527 + comm.shutdown();
  528 + comm.shutDownThread();
  529 + }
507 530 }
508 531 };
509 532  
... ... @@ -541,9 +564,10 @@ class ProcessWrapperTransform : public WrapperTransform
541 564 activateProcess(data);
542 565  
543 566 CommunicationManager *localComm = &(data->comm);
544   - localComm->sendSignal(CommunicationManager::INPUT_AVAILABLE);
545 567  
  568 + localComm->sendSignal(CommunicationManager::INPUT_AVAILABLE);
546 569 localComm->sendData(src);
  570 +
547 571 localComm->readData(dst);
548 572 processes.release(data);
549 573 }
... ...