Commit b2eb42a0b959c5abbbc1af7400adfd0bd67983e7

Authored by Charles Otto
1 parent e11b68d6

Some long overdue cleanup/reorganization in stream.cpp

Merge Datasource/DataSourceManager, leaving DataSource as the primary interface
for opening a template list. Introduce an alternate hieararchy for things that
are done to individual templates.

Rename FirstStage->ReadStage. Drop LastStage as a unique class, it can be
represented as a single threaded stage with a transform that just collects the
templates it receives.

Introduce a separate class contaiing the stream mode enum. This avoids a lot of
circular dependencies, giving us a more sane file layout.
Showing 1 changed file with 536 additions and 692 deletions
openbr/plugins/stream.cpp
... ... @@ -16,6 +16,16 @@ using namespace cv;
16 16 namespace br
17 17 {
18 18  
  19 +class Idiocy : public QObject
  20 +{
  21 + Q_OBJECT
  22 +public:
  23 + enum StreamModes { StreamVideo,
  24 + DistributeFrames,
  25 + Auto};
  26 +
  27 + Q_ENUMS(StreamModes)
  28 +};
19 29  
20 30 class FrameData
21 31 {
... ... @@ -181,10 +191,138 @@ private:
181 191 QList<FrameData *> buffer2;
182 192 };
183 193  
  194 +// Given a template as input, return N templates as output, one at a time on subsequent
  195 +// calls to getNext
  196 +class TemplateProcessor
  197 +{
  198 +public:
  199 + virtual ~TemplateProcessor() {}
  200 + virtual bool open(Template & input)=0;
  201 + virtual bool isOpen()=0;
  202 + virtual void close()=0;
  203 + virtual bool getNextTemplate(Template & output)=0;
  204 +protected:
  205 + Template basis;
  206 +};
  207 +
  208 +static QMutex openLock;
  209 +
  210 +// Read a video frame by frame using cv::VideoCapture
  211 +class VideoReader : public TemplateProcessor
  212 +{
  213 +public:
  214 + VideoReader() {}
  215 +
  216 + bool open(Template &input)
  217 + {
  218 + basis = input;
  219 +
  220 + // We can open either files (well actually this includes addresses of ip cameras
  221 + // through ffmpeg), or webcams. Webcam VideoCaptures are created through a separate
  222 + // overload of open that takes an integer, not a string.
  223 + // So, does this look like an integer?
  224 + bool is_int = false;
  225 + int anInt = input.file.name.toInt(&is_int);
  226 + if (is_int)
  227 + {
  228 + bool rc = video.open(anInt);
  229 +
  230 + if (!rc)
  231 + {
  232 + qDebug("open failed!");
  233 + }
  234 + if (!video.isOpened())
  235 + {
  236 + qDebug("Video not open!");
  237 + }
  238 + } else {
  239 + // Yes, we should specify absolute path:
  240 + // http://stackoverflow.com/questions/9396459/loading-a-video-in-opencv-in-python
  241 + QString fileName = (Globals->path.isEmpty() ? "" : Globals->path + "/") + input.file.name;
  242 + // On windows, this appears to not be thread-safe
  243 + QMutexLocker lock(&openLock);
  244 + video.open(QFileInfo(fileName).absoluteFilePath().toStdString());
  245 + }
  246 +
  247 + return video.isOpened();
  248 + }
  249 +
  250 + bool isOpen() { return video.isOpened(); }
  251 +
  252 + void close() { video.release(); }
  253 +
  254 + bool getNextTemplate(Template & output)
  255 + {
  256 + if (!isOpen()) {
  257 + qDebug("video source is not open");
  258 + return false;
  259 + }
  260 + output.file = basis.file;
  261 + output.m() = cv::Mat();
  262 +
  263 + cv::Mat temp;
  264 + bool res = video.read(temp);
  265 +
  266 + if (!res) {
  267 + // The video capture broke, return false.
  268 + output.m() = cv::Mat();
  269 + close();
  270 + return false;
  271 + }
  272 +
  273 + // This clone is critical, if we don't do it then the matrix will
  274 + // be an alias of an internal buffer of the video source, leading
  275 + // to various problems later.
  276 + output.m() = temp.clone();
  277 + return true;
  278 + }
  279 +protected:
  280 + cv::VideoCapture video;
  281 +};
  282 +
  283 +
  284 +class DirectReturn : public TemplateProcessor
  285 +{
  286 +public:
  287 + DirectReturn()
  288 + {
  289 + data_ok = false;
  290 + }
  291 +
  292 + // We don't do anything, just prepare to return input when getNext is called.
  293 + bool open(Template &input)
  294 + {
  295 + basis = input;
  296 + data_ok =true;
  297 + return data_ok;
  298 + }
  299 +
  300 + bool isOpen() { return data_ok; }
  301 +
  302 + void close()
  303 + {
  304 + data_ok = false;
  305 + basis.clear();
  306 + }
  307 +
  308 + bool getNextTemplate(Template & output)
  309 + {
  310 + if (!data_ok)
  311 + return false;
  312 + output = basis;
  313 + data_ok = false;
  314 + return true;
  315 + }
  316 +
  317 +protected:
  318 + // Have we sent our template yet?
  319 + bool data_ok;
  320 +};
  321 +
184 322  
185 323 // Interface for sequentially getting data from some data source.
186   -// Initialized off of a template, can represent a video file (stored in the template's filename)
187   -// or a set of images already loaded into memory stored as multiple matrices in an input template.
  324 +// Given a TemplateList, return single template frames sequentially by applying a TemplateProcessor
  325 +// to each individual template.
188 326 class DataSource
189 327 {
190 328 public:
... ... @@ -196,6 +334,7 @@ public:
196 334 {
197 335 allFrames.addItem(new FrameData());
198 336 }
  337 + frameSource = NULL;
199 338 }
200 339  
201 340 virtual ~DataSource()
... ... @@ -209,6 +348,71 @@ public:
209 348 }
210 349 }
211 350  
  351 + void close()
  352 + {
  353 + if (this->frameSource)
  354 + {
  355 + frameSource->close();
  356 + delete frameSource;
  357 + frameSource = NULL;
  358 + }
  359 + }
  360 +
  361 + int size()
  362 + {
  363 + return this->templates.size();
  364 + }
  365 +
  366 + bool open(TemplateList & input, br::Idiocy::StreamModes _mode)
  367 + {
  368 + // Set up variables specific to us
  369 + current_template_idx = 0;
  370 + templates = input;
  371 + mode = _mode;
  372 +
  373 + is_broken = false;
  374 + allReturned = false;
  375 +
  376 + // The last frame isn't initialized yet
  377 + final_frame = -1;
  378 + // Start our sequence numbers from the input index
  379 + next_sequence_number = 0;
  380 +
  381 + // Actually open the data source
  382 + bool open_res = openNextTemplate();
  383 +
  384 + // We couldn't open the data source
  385 + if (!open_res) {
  386 + is_broken = true;
  387 + return false;
  388 + }
  389 +
  390 + // Try to get a frame from the global pool
  391 + FrameData * firstFrame = allFrames.tryGetItem();
  392 +
  393 + // If this fails, things have gone pretty badly.
  394 + if (firstFrame == NULL) {
  395 + is_broken = true;
  396 + return false;
  397 + }
  398 +
  399 + // Read a frame from the video source
  400 + bool res = getNextFrame(*firstFrame);
  401 +
  402 + // the data source broke already, we couldn't even get one frame
  403 + // from it even though it claimed to have opened successfully.
  404 + if (!res) {
  405 + is_broken = true;
  406 + return false;
  407 + }
  408 +
  409 + // We read one frame ahead of the last one returned, this allows
  410 + // us to know which frame is the final frame when we return it.
  411 + lookAhead.append(firstFrame);
  412 + return true;
  413 + }
  414 +
  415 +
212 416 // non-blocking version of getFrame
213 417 // Returns a NULL FrameData if too many frames are out, or the
214 418 // data source is broken. Sets last_frame to true iff the FrameData
... ... @@ -228,7 +432,7 @@ public:
228 432 return NULL;
229 433  
230 434 // Try to actually read a frame, if this returns false the data source is broken
231   - bool res = getNext(*aFrame);
  435 + bool res = getNextFrame(*aFrame);
232 436  
233 437 // The datasource broke, update final_frame
234 438 if (!res)
... ... @@ -291,282 +495,119 @@ public:
291 495 return true;
292 496 }
293 497  
294   - bool open(Template & output, int start_index = 0)
  498 +protected:
  499 +
  500 + bool openNextTemplate()
295 501 {
296   - is_broken = false;
297   - allReturned = false;
  502 + if (this->current_template_idx >= this->templates.size())
  503 + return false;
298 504  
299   - // The last frame isn't initialized yet
300   - final_frame = -1;
301   - // Start our sequence numbers from the input index
302   - next_sequence_number = start_index;
  505 + bool open_res = false;
  506 + while (!open_res)
  507 + {
  508 + if (frameSource)
  509 + frameSource->close();
303 510  
304   - // Actually open the data source
305   - bool open_res = concreteOpen(output);
  511 + if (mode == br::Idiocy::Auto)
  512 + {
  513 + delete frameSource;
  514 + if (this->templates[this->current_template_idx].empty())
  515 + frameSource = new VideoReader();
  516 + else
  517 + frameSource = new DirectReturn();
  518 + }
  519 + else if (mode == br::Idiocy::DistributeFrames)
  520 + {
  521 + if (!frameSource)
  522 + frameSource = new DirectReturn();
  523 + }
  524 + else if (mode == br::Idiocy::StreamVideo)
  525 + {
  526 + if (!frameSource)
  527 + frameSource = new VideoReader();
  528 + }
306 529  
307   - // We couldn't open the data source
308   - if (!open_res) {
309   - is_broken = true;
310   - return false;
  530 + open_res = frameSource->open(this->templates[current_template_idx]);
  531 + if (!open_res)
  532 + {
  533 + current_template_idx++;
  534 + if (current_template_idx >= this->templates.size())
  535 + return false;
  536 + }
311 537 }
  538 + return true;
  539 + }
312 540  
313   - // Try to get a frame from the global pool
314   - FrameData * firstFrame = allFrames.tryGetItem();
  541 + bool getNextFrame(FrameData & output)
  542 + {
  543 + bool got_frame = false;
315 544  
316   - // If this fails, things have gone pretty badly.
317   - if (firstFrame == NULL) {
318   - is_broken = true;
319   - return false;
320   - }
  545 + Template aTemplate;
321 546  
322   - // Read a frame from the video source
323   - bool res = getNext(*firstFrame);
  547 + while (!got_frame)
  548 + {
  549 + got_frame = frameSource->getNextTemplate(aTemplate);
  550 +
  551 + // OK we got a frame
  552 + if (got_frame) {
  553 + // set the sequence number and tempalte of this frame
  554 + output.sequenceNumber = next_sequence_number;
  555 + output.data.append(aTemplate);
  556 + // set the frame number in the template's metadata
  557 + output.data.last().file.set("FrameNumber", output.sequenceNumber);
  558 + next_sequence_number++;
  559 + return true;
  560 + }
324 561  
325   - // the data source broke already, we couldn't even get one frame
326   - // from it even though it claimed to have opened successfully.
327   - if (!res) {
328   - is_broken = true;
329   - return false;
  562 + // advance to the next tempalte in our list
  563 + this->current_template_idx++;
  564 + bool open_res = this->openNextTemplate();
  565 +
  566 + // couldn't get the next template? nothing to do, otherwise we try to read
  567 + // a frame at the top of this loop.
  568 + if (!open_res) {
  569 + return false;
  570 + }
330 571 }
331 572  
332   - // We read one frame ahead of the last one returned, this allows
333   - // us to know which frame is the final frame when we return it.
334   - lookAhead.append(firstFrame);
335   - return true;
  573 + return false;
336 574 }
337 575  
338   - /*
339   - * Pure virtual methods
340   - */
  576 + // Index of the template in the templatelist we are currently reading from
  577 + int current_template_idx;
341 578  
342   - // isOpen doesn't appear to particularly work when used on opencv
343   - // VideoCaptures, so we don't use it for anything important.
344   - virtual bool isOpen()=0;
345   - // Called from open, open the data source specified by the input
346   - // template, don't worry about setting any of the state variables
347   - // set in open.
348   - virtual bool concreteOpen(Template & output) = 0;
349   - // Get the next frame from the data source, store the results in
350   - // FrameData (including the actual frame and appropriate sequence
351   - // number).
352   - virtual bool getNext(FrameData & input) = 0;
353   - // close the currently open data source.
354   - virtual void close() = 0;
  579 + // What do we do to each template
  580 + br::Idiocy::StreamModes mode;
  581 +
  582 + // list of templates we are workign from
  583 + TemplateList templates;
  584 +
  585 + // processor for the current template
  586 + TemplateProcessor * frameSource;
355 587  
356 588 int next_sequence_number;
357   -protected:
358   - DoubleBuffer allFrames;
359 589 int final_frame;
360 590 bool is_broken;
361 591 bool allReturned;
  592 +
  593 + DoubleBuffer allFrames;
362 594 QList<FrameData *> lookAhead;
363 595  
364 596 QWaitCondition lastReturned;
365 597 QMutex last_frame_update;
366 598 };
367 599  
368   -static QMutex openLock;
369   -// Read a video frame by frame using cv::VideoCapture
370   -class VideoDataSource : public DataSource
  600 +class ProcessingStage;
  601 +
  602 +class BasicLoop : public QRunnable, public QFutureInterface<void>
371 603 {
372 604 public:
373   - VideoDataSource(int maxFrames) : DataSource(maxFrames) {}
374   -
375   - bool concreteOpen(Template &input)
  605 + BasicLoop()
376 606 {
377   - basis = input;
  607 + this->reportStarted();
  608 + }
378 609  
379   - // We can open either files (well actually this includes addresses of ip cameras
380   - // through ffmpeg), or webcams. Webcam VideoCaptures are created through a separate
381   - // overload of open that takes an integer, not a string.
382   - // So, does this look like an integer?
383   - bool is_int = false;
384   - int anInt = input.file.name.toInt(&is_int);
385   - if (is_int)
386   - {
387   - bool rc = video.open(anInt);
388   -
389   - if (!rc)
390   - {
391   - qDebug("open failed!");
392   - }
393   - if (!video.isOpened())
394   - {
395   - qDebug("Video not open!");
396   - }
397   - } else {
398   - // Yes, we should specify absolute path:
399   - // http://stackoverflow.com/questions/9396459/loading-a-video-in-opencv-in-python
400   - QString fileName = (Globals->path.isEmpty() ? "" : Globals->path + "/") + input.file.name;
401   - // On windows, this appears to not be thread-safe
402   - QMutexLocker lock(&openLock);
403   - video.open(QFileInfo(fileName).absoluteFilePath().toStdString());
404   - }
405   -
406   - return video.isOpened();
407   - }
408   -
409   - bool isOpen() { return video.isOpened(); }
410   -
411   - void close() {
412   - video.release();
413   - }
414   -
415   -private:
416   - bool getNext(FrameData & output)
417   - {
418   - if (!isOpen()) {
419   - qDebug("video source is not open");
420   - return false;
421   - }
422   -
423   - output.data.append(Template(basis.file));
424   - output.data.last().m() = cv::Mat();
425   -
426   - output.sequenceNumber = next_sequence_number;
427   - next_sequence_number++;
428   -
429   - cv::Mat temp;
430   - bool res = video.read(temp);
431   -
432   - if (!res) {
433   - // The video capture broke, return false.
434   - output.data.last().m() = cv::Mat();
435   - close();
436   - return false;
437   - }
438   -
439   - // This clone is critical, if we don't do it then the matrix will
440   - // be an alias of an internal buffer of the video source, leading
441   - // to various problems later.
442   - output.data.last().m() = temp.clone();
443   -
444   - output.data.last().file.set("FrameNumber", output.sequenceNumber);
445   - return true;
446   - }
447   -
448   - cv::VideoCapture video;
449   - Template basis;
450   -};
451   -
452   -// Given a template as input, return its matrices one by one on subsequent calls
453   -// to getNext
454   -class TemplateDataSource : public DataSource
455   -{
456   -public:
457   - TemplateDataSource(int maxFrames) : DataSource(maxFrames)
458   - {
459   - current_matrix_idx = INT_MAX;
460   - data_ok = false;
461   - }
462   -
463   - // To "open" it we just set appropriate indices, we assume that if this
464   - // is an image, it is already loaded into memory.
465   - bool concreteOpen(Template &input)
466   - {
467   - basis = input;
468   - current_matrix_idx = 0;
469   -
470   - data_ok = current_matrix_idx < basis.size();
471   - qDebug("concrete open res is %d %d %d", data_ok, current_matrix_idx, basis.size());
472   - return data_ok;
473   - }
474   -
475   - bool isOpen() {
476   - return data_ok;
477   - }
478   -
479   - void close()
480   - {
481   - current_matrix_idx = INT_MAX;
482   - basis.clear();
483   - }
484   -
485   -private:
486   - bool getNext(FrameData & output)
487   - {
488   - data_ok = current_matrix_idx < basis.size();
489   - if (!data_ok)
490   - return false;
491   -
492   -
493   - output.data.append(basis[current_matrix_idx]);
494   - output.data.last().file = basis.file;
495   - current_matrix_idx++;
496   -
497   - output.sequenceNumber = next_sequence_number;
498   - next_sequence_number++;
499   -
500   - output.data.last().file.set("FrameNumber", output.sequenceNumber);
501   - return true;
502   - }
503   -
504   - Template basis;
505   - // Index of the next matrix to output from the template
506   - int current_matrix_idx;
507   -
508   - // is current_matrix_idx in bounds?
509   - bool data_ok;
510   -};
511   -
512   -class SingleDataSource : public DataSource
513   -{
514   -public:
515   - SingleDataSource(int maxFrames) : DataSource(maxFrames)
516   - {
517   - data_ok = false;
518   - }
519   -
520   - // To "open" it we just set appropriate indices, we assume that if this
521   - // is an image, it is already loaded into memory.
522   - bool concreteOpen(Template &input)
523   - {
524   - basis = input;
525   -// basis.file.name = (Globals->path.isEmpty() ? "" : Globals->path + "/") + basis.file.name;
526   -
527   - data_ok =true;
528   - return data_ok;
529   - }
530   -
531   - bool isOpen() {
532   - return data_ok;
533   - }
534   -
535   - void close()
536   - {
537   - data_ok = false;
538   - basis.clear();
539   - }
540   -
541   -private:
542   - bool getNext(FrameData & output)
543   - {
544   - if (!data_ok)
545   - return false;
546   -
547   - output.data.append(basis);
548   - data_ok = false;
549   - return true;
550   - }
551   -
552   - Template basis;
553   -
554   - // Have we sent our template yet?
555   - bool data_ok;
556   -};
557   -
558   -
559   -class ProcessingStage;
560   -
561   -class BasicLoop : public QRunnable, public QFutureInterface<void>
562   -{
563   -public:
564   - BasicLoop()
565   - {
566   - this->reportStarted();
567   - }
568   -
569   - void run();
  610 + void run();
570 611  
571 612 QList<ProcessingStage *> * stages;
572 613 int start_idx;
... ... @@ -785,106 +826,129 @@ public:
785 826  
786 827 };
787 828  
788   -// Appened to the end of a Stream's transform sequence. Collects the output
789   -// from each frame on a single templatelist
790   -class LastStage : public SingleThreadStage
  829 +// Semi-functional, doesn't do anything productive outside of stream::train
  830 +class CollectSets : public TimeVaryingTransform
791 831 {
  832 + Q_OBJECT
792 833 public:
793   - LastStage(bool _prev_stage_variance) : SingleThreadStage(_prev_stage_variance) {}
794   - TemplateList getOutput()
  834 + CollectSets() : TimeVaryingTransform(false, false) {}
  835 +
  836 + QList<TemplateList> sets;
  837 +
  838 + void projectUpdate(const TemplateList &src, TemplateList &dst)
795 839 {
796   - return collectedOutput;
  840 + (void) dst;
  841 + sets.append(src);
797 842 }
798 843  
799   -private:
800   - TemplateList collectedOutput;
  844 + void train(const TemplateList & data)
  845 + {
  846 + (void) data;
  847 + }
  848 +
  849 +};
  850 +
  851 +// This stage reads new frames from the data source.
  852 +class ReadStage : public SingleThreadStage
  853 +{
801 854 public:
  855 + ReadStage(int activeFrames = 100) : SingleThreadStage(true), dataSource(activeFrames){ }
  856 +
  857 + DataSource dataSource;
802 858  
803 859 void reset()
804 860 {
805   - collectedOutput.clear();
  861 + dataSource.close();
806 862 SingleThreadStage::reset();
807 863 }
808 864  
809 865 FrameData * run(FrameData * input, bool & should_continue)
810 866 {
811   - if (input == NULL) {
812   - qFatal("NULL input to stage %d", this->stage_id);
813   - }
814   -
815   - if (input->sequenceNumber != next_target)
816   - {
817   - qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, input->sequenceNumber, this->next_target);
818   - }
819   - next_target = input->sequenceNumber + 1;
820   -
821   - // add the item to our output buffer
822   - collectedOutput.append(input->data);
  867 + if (input == NULL)
  868 + qFatal("NULL frame in input stage");
823 869  
824   - // Can we enter the read stage?
  870 + // Can we enter the next stage?
825 871 should_continue = nextStage->tryAcquireNextStage(input);
826 872  
827   - // Is there anything on our input buffer? If so we should start a thread
828   - // in this stage to process that frame.
  873 + // Try to get a frame from the datasource, we keep working on
  874 + // the frame we have, but we will queue another job for the next
  875 + // frame if a frame is currently available.
829 876 QWriteLocker lock(&statusLock);
830   - FrameData * newItem = inputBuffer->tryGetItem();
831   - if (!newItem)
832   - {
833   - this->currentStatus = STOPPING;
  877 + bool last_frame = false;
  878 + FrameData * newFrame = dataSource.tryGetFrame(last_frame);
  879 +
  880 + // Were we able to get a frame?
  881 + if (newFrame) startThread(newFrame);
  882 + // If not this stage will enter a stopped state.
  883 + else {
  884 + currentStatus = STOPPING;
834 885 }
835   - lock.unlock();
836 886  
837   - if (newItem)
838   - startThread(newItem);
  887 + lock.unlock();
839 888  
840 889 return input;
841 890 }
842 891  
843   - void status(){
844   - qDebug("Collection stage %d, status starting? %d, next %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->inputBuffer->size());
845   - }
  892 + // The last stage, trying to access the first stage
  893 + bool tryAcquireNextStage(FrameData *& input)
  894 + {
  895 + // Return the frame, was it the last one?
  896 + bool was_last = dataSource.returnFrame(input);
  897 + input = NULL;
846 898  
847   -};
  899 + // OK we won't continue.
  900 + if (was_last) {
  901 + return false;
  902 + }
848 903  
849   -// Semi-functional, doesn't do anything productive outside of stream::train
850   -class CollectSets : public TimeVaryingTransform
851   -{
852   - Q_OBJECT
853   -public:
854   - CollectSets() : TimeVaryingTransform(false, false) {}
  904 + QReadLocker lock(&statusLock);
  905 + // If the first stage is already active we will just end.
  906 + if (currentStatus == STARTING)
  907 + {
  908 + return false;
  909 + }
855 910  
856   - QList<TemplateList> sets;
  911 + // Otherwise we will try to continue, but to do so we have to
  912 + // escalate the lock, and sadly there is no way to do so without
  913 + // releasing the read-mode lock, and getting a new write-mode lock.
  914 + lock.unlock();
857 915  
858   - void projectUpdate(const TemplateList &src, TemplateList &dst)
859   - {
860   - (void) dst;
861   - sets.append(src);
862   - }
  916 + QWriteLocker writeLock(&statusLock);
  917 + // currentStatus might have changed in the gap between releasing the read
  918 + // lock and getting the write lock.
  919 + if (currentStatus == STARTING)
  920 + {
  921 + return false;
  922 + }
863 923  
864   - void train(const TemplateList & data)
865   - {
866   - (void) data;
  924 + bool last_frame = false;
  925 + // Try to get a frame from the data source, if we get one we will
  926 + // continue to the first stage.
  927 + input = dataSource.tryGetFrame(last_frame);
  928 +
  929 + if (!input) {
  930 + return false;
  931 + }
  932 +
  933 + currentStatus = STARTING;
  934 +
  935 + return true;
867 936 }
868 937  
  938 + void status(){
  939 + qDebug("Read stage %d, status starting? %d, next frame %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->dataSource.size());
  940 + }
869 941 };
870 942  
871   -class FirstStage;
872   -
873 943 class DirectStreamTransform : public CompositeTransform
874 944 {
875 945 Q_OBJECT
876 946 public:
877 947  
878   - enum StreamModes { StreamVideo,
879   - DistributeFrames,
880   - Auto};
881   -
882   - Q_ENUMS(StreamModes)
883   -
884 948 Q_PROPERTY(int activeFrames READ get_activeFrames WRITE set_activeFrames RESET reset_activeFrames)
885   - Q_PROPERTY(StreamModes readMode READ get_readMode WRITE set_readMode RESET reset_readMode)
  949 + Q_PROPERTY(br::Idiocy::StreamModes readMode READ get_readMode WRITE set_readMode RESET reset_readMode)
886 950 BR_PROPERTY(int, activeFrames, 100)
887   - BR_PROPERTY(StreamModes, readMode, Auto)
  951 + BR_PROPERTY(br::Idiocy::StreamModes, readMode, br::Idiocy::Auto)
888 952  
889 953 friend class StreamTransfrom;
890 954  
... ... @@ -958,8 +1022,163 @@ public:
958 1022 // on all child transforms as part of projectUpdate
959 1023 }
960 1024  
961   - void projectUpdate(const TemplateList & src, TemplateList & dst);
962   - void init();
  1025 + // start processing, consider all templates in src a continuous
  1026 + // 'video'
  1027 + void projectUpdate(const TemplateList & src, TemplateList & dst)
  1028 + {
  1029 + dst = src;
  1030 +
  1031 + bool res = readStage->dataSource.open(dst,readMode);
  1032 + if (!res) {
  1033 + qDebug("stream failed to open %s", qPrintable(dst[0].file.name));
  1034 + return;
  1035 + }
  1036 +
  1037 + // Start the first thread in the stream.
  1038 + QWriteLocker lock(&readStage->statusLock);
  1039 + readStage->currentStatus = SingleThreadStage::STARTING;
  1040 +
  1041 + // We have to get a frame before starting the thread
  1042 + bool last_frame = false;
  1043 + FrameData * firstFrame = readStage->dataSource.tryGetFrame(last_frame);
  1044 + if (firstFrame == NULL)
  1045 + qFatal("Failed to read first frame of video");
  1046 +
  1047 + readStage->startThread(firstFrame);
  1048 + lock.unlock();
  1049 +
  1050 + // Wait for the stream to process the last frame available from
  1051 + // the data source.
  1052 + bool wait_res = false;
  1053 + wait_res = readStage->dataSource.waitLast();
  1054 +
  1055 + // Now that there are no more incoming frames, call finalize
  1056 + // on each transform in turn to collect any last templates
  1057 + // they wish to issue.
  1058 + TemplateList final_output;
  1059 +
  1060 + // Push finalize through the stages
  1061 + for (int i=0; i < this->transforms.size(); i++)
  1062 + {
  1063 + TemplateList output_set;
  1064 + transforms[i]->finalize(output_set);
  1065 +
  1066 + for (int j=i+1; j < transforms.size();j++)
  1067 + {
  1068 + transforms[j]->projectUpdate(output_set);
  1069 + }
  1070 + final_output.append(output_set);
  1071 + }
  1072 +
  1073 + // dst is set to all output received by the final stage, along
  1074 + // with anything output via the calls to finalize.
  1075 + //dst = collectionStage->getOutput();
  1076 + foreach(const TemplateList & list, collector->sets) {
  1077 + dst.append(list);
  1078 + }
  1079 + collector->sets.clear();
  1080 +
  1081 + dst.append(final_output);
  1082 +
  1083 + foreach(ProcessingStage * stage, processingStages) {
  1084 + stage->reset();
  1085 + }
  1086 + }
  1087 +
  1088 +
  1089 + // Create and link stages
  1090 + void init()
  1091 + {
  1092 + if (transforms.isEmpty()) return;
  1093 +
  1094 + for (int i=0; i < processingStages.size();i++)
  1095 + delete processingStages[i];
  1096 + processingStages.clear();
  1097 +
  1098 + // call CompositeTransform::init so that trainable is set
  1099 + // correctly.
  1100 + CompositeTransform::init();
  1101 +
  1102 + // We share a thread pool across streams attached to the same
  1103 + // parent tranform, retrieve or create a thread pool based
  1104 + // on our parent transform.
  1105 + QMutexLocker poolLock(&poolsAccess);
  1106 + QHash<QObject *, QThreadPool *>::Iterator it;
  1107 + if (!pools.contains(this->parent())) {
  1108 + it = pools.insert(this->parent(), new QThreadPool(this->parent()));
  1109 + it.value()->setMaxThreadCount(Globals->parallelism);
  1110 + }
  1111 + else it = pools.find(this->parent());
  1112 + threads = it.value();
  1113 + poolLock.unlock();
  1114 +
  1115 + // Are our children time varying or not? This decides whether
  1116 + // we run them in single threaded or multi threaded stages
  1117 + stage_variance.clear();
  1118 + stage_variance.reserve(transforms.size());
  1119 + foreach (const br::Transform *transform, transforms) {
  1120 + stage_variance.append(transform->timeVarying());
  1121 + }
  1122 +
  1123 + // Additionally, we have a separate stage responsible for reading
  1124 + // frames from the data source
  1125 + readStage = new ReadStage(activeFrames);
  1126 +
  1127 + processingStages.push_back(readStage);
  1128 + readStage->stage_id = 0;
  1129 + readStage->stages = &this->processingStages;
  1130 + readStage->threads = this->threads;
  1131 +
  1132 + // Initialize and link a processing stage for each of our child
  1133 + // transforms.
  1134 + int next_stage_id = 1;
  1135 + bool prev_stage_variance = true;
  1136 + for (int i =0; i < transforms.size(); i++)
  1137 + {
  1138 + if (stage_variance[i])
  1139 + // Whether or not the previous stage is multi-threaded controls
  1140 + // the type of input buffer we need in a single threaded stage.
  1141 + processingStages.append(new SingleThreadStage(prev_stage_variance));
  1142 + else
  1143 + processingStages.append(new MultiThreadStage(Globals->parallelism));
  1144 +
  1145 + processingStages.last()->stage_id = next_stage_id++;
  1146 +
  1147 + // link nextStage pointers, the stage we just appeneded is i+1 since
  1148 + // the read stage was added before this loop
  1149 + processingStages[i]->nextStage = processingStages[i+1];
  1150 +
  1151 + processingStages.last()->stages = &this->processingStages;
  1152 + processingStages.last()->threads = this->threads;
  1153 +
  1154 + processingStages.last()->transform = transforms[i];
  1155 + prev_stage_variance = stage_variance[i];
  1156 + }
  1157 +
  1158 + // We also have the last stage, which just puts the output of the
  1159 + // previous stages on a template list.
  1160 + collectionStage = new SingleThreadStage(prev_stage_variance);
  1161 + collectionStage->transform = this->collector.data();
  1162 +
  1163 +
  1164 + processingStages.append(collectionStage);
  1165 + collectionStage->stage_id = next_stage_id;
  1166 + collectionStage->stages = &this->processingStages;
  1167 + collectionStage->threads = this->threads;
  1168 +
  1169 + // the last transform stage points to collection stage
  1170 + processingStages[processingStages.size() - 2]->nextStage = collectionStage;
  1171 +
  1172 + // And the collection stage points to the read stage, because this is
  1173 + // a ring buffer.
  1174 + collectionStage->nextStage = readStage;
  1175 + }
  1176 +
  1177 + DirectStreamTransform()
  1178 + {
  1179 + this->collector = QSharedPointer<CollectSets>(new CollectSets());
  1180 + }
  1181 +
963 1182 ~DirectStreamTransform()
964 1183 {
965 1184 // Delete all the stages
... ... @@ -972,8 +1191,9 @@ public:
972 1191 protected:
973 1192 QList<bool> stage_variance;
974 1193  
975   - FirstStage * readStage;
976   - LastStage * collectionStage;
  1194 + ReadStage * readStage;
  1195 + SingleThreadStage * collectionStage;
  1196 + QSharedPointer<CollectSets> collector;
977 1197  
978 1198 QList<ProcessingStage *> processingStages;
979 1199  
... ... @@ -1023,10 +1243,10 @@ public:
1023 1243 }
1024 1244  
1025 1245 Q_PROPERTY(int activeFrames READ get_activeFrames WRITE set_activeFrames RESET reset_activeFrames)
1026   - Q_PROPERTY(br::DirectStreamTransform::StreamModes readMode READ get_readMode WRITE set_readMode RESET reset_readMode)
  1246 + Q_PROPERTY(br::Idiocy::StreamModes readMode READ get_readMode WRITE set_readMode RESET reset_readMode)
1027 1247  
1028 1248 BR_PROPERTY(int, activeFrames, 100)
1029   - BR_PROPERTY(br::DirectStreamTransform::StreamModes, readMode, br::DirectStreamTransform::Auto)
  1249 + BR_PROPERTY(br::Idiocy::StreamModes, readMode, br::Idiocy::Auto)
1030 1250  
1031 1251 bool timeVarying() const { return true; }
1032 1252  
... ... @@ -1150,382 +1370,6 @@ private:
1150 1370  
1151 1371 BR_REGISTER(Transform, StreamTransform)
1152 1372  
1153   -// Given a templatelist as input, create appropriate data source for each
1154   -// individual template
1155   -class DataSourceManager : public DataSource
1156   -{
1157   -public:
1158   - DataSourceManager(int activeFrames=100) : DataSource(activeFrames)
1159   - {
1160   - actualSource = NULL;
1161   - }
1162   -
1163   - ~DataSourceManager()
1164   - {
1165   - close();
1166   - }
1167   -
1168   - int size()
1169   - {
1170   - return this->allFrames.size();
1171   - }
1172   -
1173   - void close()
1174   - {
1175   - if (actualSource) {
1176   - actualSource->close();
1177   - delete actualSource;
1178   - actualSource = NULL;
1179   - }
1180   - }
1181   -
1182   - // We are used through a call to open(TemplateList)
1183   - bool open(TemplateList & input, DirectStreamTransform::StreamModes _mode)
1184   - {
1185   - // Set up variables specific to us
1186   - current_template_idx = 0;
1187   - templates = input;
1188   - mode = _mode;
1189   -
1190   - // Call datasourece::open on the first template to set up
1191   - // state variables
1192   - return DataSource::open(templates[current_template_idx]);
1193   - }
1194   - void projectUpdate(const TemplateList & src, TemplateList & dst);
1195   -
1196   - // Create an actual data source of appropriate type for this template
1197   - // (initially called via the call to DataSource::open, called later
1198   - // as we run out of frames on our templates).
1199   - bool concreteOpen(Template & input)
1200   - {
1201   - close();
1202   -
1203   - bool open_res = false;
1204   -
1205   - if (mode == DirectStreamTransform::DistributeFrames)
1206   - {
1207   - actualSource = new SingleDataSource(0);
1208   - open_res = actualSource->concreteOpen(input);
1209   - }
1210   - // Input has no matrices? Its probably a video that hasn't been loaded yet
1211   - else if (mode == DirectStreamTransform::StreamVideo || (mode == DirectStreamTransform::Auto && input.empty()) ) {
1212   - actualSource = new VideoDataSource(0);
1213   - open_res = actualSource->concreteOpen(input);
1214   - }
1215   - // If the input is not empty, we assume it is a set of frames already
1216   - // in memory.
1217   - else {
1218   - qDebug("in template open");
1219   - actualSource = new TemplateDataSource(0);
1220   - open_res = actualSource->concreteOpen(input);
1221   - }
1222   -
1223   - // The data source failed to open
1224   - if (!open_res) {
1225   - delete actualSource;
1226   - actualSource = NULL;
1227   - return false;
1228   - }
1229   - return true;
1230   - }
1231   -
1232   - bool isOpen() { return !actualSource ? false : actualSource->isOpen(); }
1233   -
1234   -protected:
1235   - // Index of the template in the templatelist we are currently reading from
1236   - int current_template_idx;
1237   - DirectStreamTransform::StreamModes mode;
1238   - TemplateList templates;
1239   - DataSource * actualSource;
1240   - // Get the next frame, if we run out of frames on the current template
1241   - // move on to the next one.
1242   - bool getNext(FrameData & output)
1243   - {
1244   - bool res = actualSource->getNext(output);
1245   - output.sequenceNumber = next_sequence_number;
1246   -
1247   - // OK we got a frame
1248   - if (res) {
1249   - // Override the sequence number set by actualSource
1250   - output.data.last().file.set("FrameNumber", output.sequenceNumber);
1251   - next_sequence_number++;
1252   -// if (output.data.last().last().empty())
1253   -// qDebug("broken matrix");
1254   - return true;
1255   - }
1256   -
1257   - // We didn't get a frame, try to move on to the next template.
1258   - while(!res) {
1259   - output.data.clear();
1260   - current_template_idx++;
1261   -
1262   - // No more templates? We're done
1263   - if (current_template_idx >= templates.size())
1264   - return false;
1265   -
1266   - // open the next data source
1267   - bool open_res = concreteOpen(templates[current_template_idx]);
1268   - // We couldn't open it, give up? We could maybe continue here
1269   - // but don't currently.
1270   - if (!open_res)
1271   - return false;
1272   -
1273   - // get a frame from the newly opened data source, if that fails
1274   - // we continue to open the next one.
1275   - res = actualSource->getNext(output);
1276   - }
1277   - // Finally, set the sequence number for the frame we actually return.
1278   - output.sequenceNumber = next_sequence_number++;
1279   - output.data.last().file.set("FrameNumber", output.sequenceNumber);
1280   -
1281   -// if (output.data.last().last().empty())
1282   -// qDebug("broken matrix");
1283   -
1284   - return res;
1285   - }
1286   -
1287   -};
1288   -
1289   -// This stage reads new frames from the data source.
1290   -class FirstStage : public SingleThreadStage
1291   -{
1292   -public:
1293   - FirstStage(int activeFrames = 100) : SingleThreadStage(true), dataSource(activeFrames){ }
1294   -
1295   - DataSourceManager dataSource;
1296   -
1297   - void reset()
1298   - {
1299   - dataSource.close();
1300   - SingleThreadStage::reset();
1301   - }
1302   -
1303   - FrameData * run(FrameData * input, bool & should_continue)
1304   - {
1305   - if (input == NULL)
1306   - qFatal("NULL frame in input stage");
1307   -
1308   - // Can we enter the next stage?
1309   - should_continue = nextStage->tryAcquireNextStage(input);
1310   -
1311   - // Try to get a frame from the datasource, we keep working on
1312   - // the frame we have, but we will queue another job for the next
1313   - // frame if a frame is currently available.
1314   - QWriteLocker lock(&statusLock);
1315   - bool last_frame = false;
1316   - FrameData * newFrame = dataSource.tryGetFrame(last_frame);
1317   -
1318   - // Were we able to get a frame?
1319   - if (newFrame) startThread(newFrame);
1320   - // If not this stage will enter a stopped state.
1321   - else {
1322   - currentStatus = STOPPING;
1323   - }
1324   -
1325   - lock.unlock();
1326   -
1327   - return input;
1328   - }
1329   -
1330   - // The last stage, trying to access the first stage
1331   - bool tryAcquireNextStage(FrameData *& input)
1332   - {
1333   - // Return the frame, was it the last one?
1334   - bool was_last = dataSource.returnFrame(input);
1335   - input = NULL;
1336   -
1337   - // OK we won't continue.
1338   - if (was_last) {
1339   - return false;
1340   - }
1341   -
1342   - QReadLocker lock(&statusLock);
1343   - // If the first stage is already active we will just end.
1344   - if (currentStatus == STARTING)
1345   - {
1346   - return false;
1347   - }
1348   -
1349   - // Otherwise we will try to continue, but to do so we have to
1350   - // escalate the lock, and sadly there is no way to do so without
1351   - // releasing the read-mode lock, and getting a new write-mode lock.
1352   - lock.unlock();
1353   -
1354   - QWriteLocker writeLock(&statusLock);
1355   - // currentStatus might have changed in the gap between releasing the read
1356   - // lock and getting the write lock.
1357   - if (currentStatus == STARTING)
1358   - {
1359   - return false;
1360   - }
1361   -
1362   - bool last_frame = false;
1363   - // Try to get a frame from the data source, if we get one we will
1364   - // continue to the first stage.
1365   - input = dataSource.tryGetFrame(last_frame);
1366   -
1367   - if (!input) {
1368   - return false;
1369   - }
1370   -
1371   - currentStatus = STARTING;
1372   -
1373   - return true;
1374   - }
1375   -
1376   - void status(){
1377   - qDebug("Read stage %d, status starting? %d, next frame %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->dataSource.size());
1378   - }
1379   -
1380   -
1381   -};
1382   -
1383   -
1384   -// start processing, consider all templates in src a continuous
1385   -// 'video'
1386   -void DirectStreamTransform::projectUpdate(const TemplateList & src, TemplateList & dst)
1387   -{
1388   - dst = src;
1389   -
1390   - bool res = readStage->dataSource.open(dst,readMode);
1391   - if (!res) {
1392   - qDebug("stream failed to open %s", qPrintable(dst[0].file.name));
1393   - return;
1394   - }
1395   -
1396   - // Start the first thread in the stream.
1397   - QWriteLocker lock(&readStage->statusLock);
1398   - readStage->currentStatus = SingleThreadStage::STARTING;
1399   -
1400   - // We have to get a frame before starting the thread
1401   - bool last_frame = false;
1402   - FrameData * firstFrame = readStage->dataSource.tryGetFrame(last_frame);
1403   - if (firstFrame == NULL)
1404   - qFatal("Failed to read first frame of video");
1405   -
1406   - readStage->startThread(firstFrame);
1407   - lock.unlock();
1408   -
1409   - // Wait for the stream to process the last frame available from
1410   - // the data source.
1411   - bool wait_res = false;
1412   - wait_res = readStage->dataSource.waitLast();
1413   -
1414   - // Now that there are no more incoming frames, call finalize
1415   - // on each transform in turn to collect any last templates
1416   - // they wish to issue.
1417   - TemplateList final_output;
1418   -
1419   - // Push finalize through the stages
1420   - for (int i=0; i < this->transforms.size(); i++)
1421   - {
1422   - TemplateList output_set;
1423   - transforms[i]->finalize(output_set);
1424   -
1425   - for (int j=i+1; j < transforms.size();j++)
1426   - {
1427   - transforms[j]->projectUpdate(output_set);
1428   - }
1429   - final_output.append(output_set);
1430   - }
1431   -
1432   - // dst is set to all output received by the final stage, along
1433   - // with anything output via the calls to finalize.
1434   - dst = collectionStage->getOutput();
1435   - dst.append(final_output);
1436   -
1437   - foreach(ProcessingStage * stage, processingStages) {
1438   - stage->reset();
1439   - }
1440   -}
1441   -
1442   -
1443   -// Create and link stages
1444   -void DirectStreamTransform::init()
1445   -{
1446   - if (transforms.isEmpty()) return;
1447   -
1448   - for (int i=0; i < processingStages.size();i++)
1449   - delete processingStages[i];
1450   - processingStages.clear();
1451   -
1452   - // call CompositeTransform::init so that trainable is set
1453   - // correctly.
1454   - CompositeTransform::init();
1455   -
1456   - // We share a thread pool across streams attached to the same
1457   - // parent tranform, retrieve or create a thread pool based
1458   - // on our parent transform.
1459   - QMutexLocker poolLock(&poolsAccess);
1460   - QHash<QObject *, QThreadPool *>::Iterator it;
1461   - if (!pools.contains(this->parent())) {
1462   - it = pools.insert(this->parent(), new QThreadPool(this->parent()));
1463   - it.value()->setMaxThreadCount(Globals->parallelism);
1464   - }
1465   - else it = pools.find(this->parent());
1466   - threads = it.value();
1467   - poolLock.unlock();
1468   -
1469   - // Are our children time varying or not? This decides whether
1470   - // we run them in single threaded or multi threaded stages
1471   - stage_variance.clear();
1472   - stage_variance.reserve(transforms.size());
1473   - foreach (const br::Transform *transform, transforms) {
1474   - stage_variance.append(transform->timeVarying());
1475   - }
1476   -
1477   - // Additionally, we have a separate stage responsible for reading
1478   - // frames from the data source
1479   - readStage = new FirstStage(activeFrames);
1480   -
1481   - processingStages.push_back(readStage);
1482   - readStage->stage_id = 0;
1483   - readStage->stages = &this->processingStages;
1484   - readStage->threads = this->threads;
1485   -
1486   - // Initialize and link a processing stage for each of our child
1487   - // transforms.
1488   - int next_stage_id = 1;
1489   - bool prev_stage_variance = true;
1490   - for (int i =0; i < transforms.size(); i++)
1491   - {
1492   - if (stage_variance[i])
1493   - // Whether or not the previous stage is multi-threaded controls
1494   - // the type of input buffer we need in a single threaded stage.
1495   - processingStages.append(new SingleThreadStage(prev_stage_variance));
1496   - else
1497   - processingStages.append(new MultiThreadStage(Globals->parallelism));
1498   -
1499   - processingStages.last()->stage_id = next_stage_id++;
1500   -
1501   - // link nextStage pointers, the stage we just appeneded is i+1 since
1502   - // the read stage was added before this loop
1503   - processingStages[i]->nextStage = processingStages[i+1];
1504   -
1505   - processingStages.last()->stages = &this->processingStages;
1506   - processingStages.last()->threads = this->threads;
1507   -
1508   - processingStages.last()->transform = transforms[i];
1509   - prev_stage_variance = stage_variance[i];
1510   - }
1511   -
1512   - // We also have the last stage, which just puts the output of the
1513   - // previous stages on a template list.
1514   - collectionStage = new LastStage(prev_stage_variance);
1515   - processingStages.append(collectionStage);
1516   - collectionStage->stage_id = next_stage_id;
1517   - collectionStage->stages = &this->processingStages;
1518   - collectionStage->threads = this->threads;
1519   -
1520   - // the last transform stage points to collection stage
1521   - processingStages[processingStages.size() - 2]->nextStage = collectionStage;
1522   -
1523   - // And the collection stage points to the read stage, because this is
1524   - // a ring buffer.
1525   - collectionStage->nextStage = readStage;
1526   -}
1527   -
1528   -
1529 1373 } // namespace br
1530 1374  
1531 1375 #include "stream.moc"
... ...