Commit c3c7c41145586dd71cd27200601a8f5671d827d3

Authored by Charles Otto
1 parent 737babda

Some improvements and refactoring of stream

Start readstage threads by getting a frame from the datasource first, rather
than starting with a null item, and getting a frame in run. This is slightly
more awkward at startup, but is more consistent with how other single thread
stages work, and we don't end up queueing unnecessary threads (since we only
start a thread if we have a frame). We still do reads in the first stage,
but they are for the next frame, not the one we are currently working on.

Reset SequencingBuffers as part of resetting the processing stages after a
stream::project ends. This is of course necessary since sequencing buffers wait
to get specific frames before reporting new frames as available.

Use stage numbers directly as priorities for new jobs since qt 5.1 fixed the
bug where priorities were backwards

Surface the active frame count as a parameter of stream, and lower the default
from 500 to 100, though that may still be unnecessarily high for alot of cases.

Add some debug methods, improve some comments.
Showing 1 changed file with 221 additions and 56 deletions
openbr/plugins/stream.cpp
... ... @@ -31,8 +31,10 @@ public:
31 31 virtual ~SharedBuffer() {}
32 32  
33 33 virtual void addItem(FrameData * input)=0;
  34 + virtual void reset()=0;
34 35  
35 36 virtual FrameData * tryGetItem()=0;
  37 + virtual int size()=0;
36 38 };
37 39  
38 40 // for n - 1 boundaries, multiple threads call addItem, the frames are
... ... @@ -74,6 +76,21 @@ public:
74 76 return output;
75 77 }
76 78  
  79 + virtual int size()
  80 + {
  81 + QMutexLocker lock(&bufferGuard);
  82 + return buffer.size();
  83 + }
  84 + virtual void reset()
  85 + {
  86 + if (size() != 0)
  87 + qDebug("Sequencing buffer has non-zero size during reset!");
  88 +
  89 + QMutexLocker lock(&bufferGuard);
  90 + next_target = 0;
  91 + }
  92 +
  93 +
77 94 private:
78 95 QMutex bufferGuard;
79 96 int next_target;
... ... @@ -95,6 +112,11 @@ public:
95 112 outputBuffer = &buffer2;
96 113 }
97 114  
  115 + int size()
  116 + {
  117 + QReadLocker readLock(&bufferGuard);
  118 + return inputBuffer->size() + outputBuffer->size();
  119 + }
98 120  
99 121 // called from the producer thread
100 122 void addItem(FrameData * input)
... ... @@ -133,6 +155,13 @@ public:
133 155 return output;
134 156 }
135 157  
  158 + virtual void reset()
  159 + {
  160 + if (this->size() != 0)
  161 + qDebug("Shared buffer has non-zero size during reset!");
  162 + }
  163 +
  164 +
136 165 private:
137 166 // The read-write lock. The thread adding to this buffer can add
138 167 // to the current input buffer if it has a read lock. The thread
... ... @@ -194,14 +223,13 @@ public:
194 223 // Try to get a FrameData from the pool, if we can't it means too many
195 224 // frames are already out, and we will return NULL to indicate failure
196 225 FrameData * aFrame = allFrames.tryGetItem();
197   - if (aFrame == NULL) {
  226 + if (aFrame == NULL)
198 227 return NULL;
199   - }
200 228  
201 229 aFrame->data.clear();
202 230 aFrame->sequenceNumber = -1;
203 231  
204   - // Try to read a frame, if this returns false the data source is broken
  232 + // Try to actually read a frame, if this returns false the data source is broken
205 233 bool res = getNext(*aFrame);
206 234  
207 235 // The datasource broke, update final_frame
... ... @@ -211,12 +239,15 @@ public:
211 239 final_frame = lookAhead.back()->sequenceNumber;
212 240 allFrames.addItem(aFrame);
213 241 }
214   - else lookAhead.push_back(aFrame);
  242 + else {
  243 + lookAhead.push_back(aFrame);
  244 + }
215 245  
  246 + // we will return the first frame on the lookAhead buffer
216 247 FrameData * rVal = lookAhead.first();
217 248 lookAhead.pop_front();
218 249  
219   -
  250 + // If this is the last frame, say so
220 251 if (rVal->sequenceNumber == final_frame) {
221 252 last_frame = true;
222 253 is_broken = true;
... ... @@ -239,6 +270,7 @@ public:
239 270  
240 271 if (frameNumber == final_frame) {
241 272 // We just received the last frame, better pulse
  273 + allReturned = true;
242 274 lastReturned.wakeAll();
243 275 rval = true;
244 276 }
... ... @@ -246,15 +278,24 @@ public:
246 278 return rval;
247 279 }
248 280  
249   - void waitLast()
  281 + bool waitLast()
250 282 {
251 283 QMutexLocker lock(&last_frame_update);
252   - lastReturned.wait(&last_frame_update);
  284 +
  285 + while (!allReturned)
  286 + {
  287 + // This would be a safer wait if we used a timeout, but
  288 + // theoretically that should never matter.
  289 + lastReturned.wait(&last_frame_update);
  290 + }
  291 + return true;
253 292 }
254 293  
255 294 bool open(Template & output, int start_index = 0)
256 295 {
257 296 is_broken = false;
  297 + allReturned = false;
  298 +
258 299 // The last frame isn't initialized yet
259 300 final_frame = -1;
260 301 // Start our sequence numbers from the input index
... ... @@ -282,19 +323,34 @@ public:
282 323 bool res = getNext(*firstFrame);
283 324  
284 325 // the data source broke already, we couldn't even get one frame
285   - // from it.
  326 + // from it even though it claimed to have opened successfully.
286 327 if (!res) {
287 328 is_broken = true;
288 329 return false;
289 330 }
290 331  
  332 + // We read one frame ahead of the last one returned, this allows
  333 + // us to know which frame is the final frame when we return it.
291 334 lookAhead.append(firstFrame);
292 335 return true;
293 336 }
294 337  
  338 + /*
  339 + * Pure virtual methods
  340 + */
  341 +
  342 + // isOpen doesn't appear to particularly work when used on opencv
  343 + // VideoCaptures, so we don't use it for anything important.
295 344 virtual bool isOpen()=0;
  345 + // Called from open, open the data source specified by the input
  346 + // template, don't worry about setting any of the state variables
  347 + // set in open.
296 348 virtual bool concreteOpen(Template & output) = 0;
  349 + // Get the next frame from the data source, store the results in
  350 + // FrameData (including the actual frame and appropriate sequence
  351 + // number).
297 352 virtual bool getNext(FrameData & input) = 0;
  353 + // close the currently open data source.
298 354 virtual void close() = 0;
299 355  
300 356 int next_sequence_number;
... ... @@ -302,6 +358,7 @@ protected:
302 358 DoubleBuffer allFrames;
303 359 int final_frame;
304 360 bool is_broken;
  361 + bool allReturned;
305 362 QList<FrameData *> lookAhead;
306 363  
307 364 QWaitCondition lastReturned;
... ... @@ -317,6 +374,11 @@ public:
317 374 bool concreteOpen(Template &input)
318 375 {
319 376 basis = input;
  377 +
  378 + // We can open either files (well actually this includes addresses of ip cameras
  379 + // through ffmpeg), or webcams. Webcam VideoCaptures are created through a separate
  380 + // overload of open that takes an integer, not a string.
  381 + // So, does this look like an integer?
320 382 bool is_int = false;
321 383 int anInt = input.file.name.toInt(&is_int);
322 384 if (is_int)
... ... @@ -349,8 +411,10 @@ public:
349 411 private:
350 412 bool getNext(FrameData & output)
351 413 {
352   - if (!isOpen())
  414 + if (!isOpen()) {
  415 + qDebug("video source is not open");
353 416 return false;
  417 + }
354 418  
355 419 output.data.append(Template(basis.file));
356 420 output.data.last().m() = cv::Mat();
... ... @@ -362,6 +426,7 @@ private:
362 426 bool res = video.read(temp);
363 427  
364 428 if (!res) {
  429 + // The video capture broke, return false.
365 430 output.data.last().m() = cv::Mat();
366 431 close();
367 432 return false;
... ... @@ -391,6 +456,8 @@ public:
391 456 data_ok = false;
392 457 }
393 458  
  459 + // To "open" it we just set appropriate indices, we assume that if this
  460 + // is an image, it is already loaded into memory.
394 461 bool concreteOpen(Template &input)
395 462 {
396 463 basis = input;
... ... @@ -440,7 +507,7 @@ private:
440 507 class DataSourceManager : public DataSource
441 508 {
442 509 public:
443   - DataSourceManager() : DataSource(500)
  510 + DataSourceManager(int activeFrames=100) : DataSource(activeFrames)
444 511 {
445 512 actualSource = NULL;
446 513 }
... ... @@ -450,6 +517,11 @@ public:
450 517 close();
451 518 }
452 519  
  520 + int size()
  521 + {
  522 + return this->allFrames.size();
  523 + }
  524 +
453 525 void close()
454 526 {
455 527 if (actualSource) {
... ... @@ -459,29 +531,40 @@ public:
459 531 }
460 532 }
461 533  
  534 + // We are used through a call to open(TemplateList)
462 535 bool open(TemplateList & input)
463 536 {
  537 + // Set up variables specific to us
464 538 current_template_idx = 0;
465 539 templates = input;
466 540  
  541 + // Call datasourece::open on the first template to set up
  542 + // state variables
467 543 return DataSource::open(templates[current_template_idx]);
468 544 }
469 545  
  546 + // Create an actual data source of appropriate type for this template
  547 + // (initially called via the call to DataSource::open, called later
  548 + // as we run out of frames on our templates).
470 549 bool concreteOpen(Template & input)
471 550 {
472 551 close();
473 552  
  553 + bool open_res = false;
474 554 // Input has no matrices? Its probably a video that hasn't been loaded yet
475 555 if (input.empty()) {
476 556 actualSource = new VideoDataSource(0);
477   - actualSource->concreteOpen(input);
  557 + open_res = actualSource->concreteOpen(input);
478 558 }
  559 + // If the input is not empty, we assume it is a set of frames already
  560 + // in memory.
479 561 else {
480   - // create frame dealer
481 562 actualSource = new TemplateDataSource(0);
482   - actualSource->concreteOpen(input);
  563 + open_res = actualSource->concreteOpen(input);
483 564 }
484   - if (!isOpen()) {
  565 +
  566 + // The data source failed to open
  567 + if (!open_res) {
485 568 delete actualSource;
486 569 actualSource = NULL;
487 570 return false;
... ... @@ -497,12 +580,16 @@ protected:
497 580  
498 581 TemplateList templates;
499 582 DataSource * actualSource;
  583 + // Get the next frame, if we run out of frames on the current template
  584 + // move on to the next one.
500 585 bool getNext(FrameData & output)
501 586 {
502 587 bool res = actualSource->getNext(output);
503 588 output.sequenceNumber = next_sequence_number;
504 589  
  590 + // OK we got a frame
505 591 if (res) {
  592 + // Override the sequence number set by actualSource
506 593 output.data.last().file.set("FrameNumber", output.sequenceNumber);
507 594 next_sequence_number++;
508 595 if (output.data.last().last().empty())
... ... @@ -510,7 +597,7 @@ protected:
510 597 return true;
511 598 }
512 599  
513   -
  600 + // We didn't get a frame, try to move on to the next template.
514 601 while(!res) {
515 602 output.data.clear();
516 603 current_template_idx++;
... ... @@ -521,12 +608,16 @@ protected:
521 608  
522 609 // open the next data source
523 610 bool open_res = concreteOpen(templates[current_template_idx]);
  611 + // We couldn't open it, give up? We could maybe continue here
  612 + // but don't currently.
524 613 if (!open_res)
525 614 return false;
526 615  
527   - // get a frame from it
  616 + // get a frame from the newly opened data source, if that fails
  617 + // we continue to open the next one.
528 618 res = actualSource->getNext(output);
529 619 }
  620 + // Finally, set the sequence number for the frame we actually return.
530 621 output.sequenceNumber = next_sequence_number++;
531 622 output.data.last().file.set("FrameNumber", output.sequenceNumber);
532 623  
... ... @@ -573,6 +664,9 @@ public:
573 664 int stage_id;
574 665  
575 666 virtual void reset()=0;
  667 +
  668 + virtual void status()=0;
  669 +
576 670 protected:
577 671 int thread_count;
578 672  
... ... @@ -606,7 +700,8 @@ class MultiThreadStage : public ProcessingStage
606 700 public:
607 701 MultiThreadStage(int _input) : ProcessingStage(_input) {}
608 702  
609   -
  703 + // Not much to worry about here, we will project the input
  704 + // and try to continue to the next stage.
610 705 FrameData * run(FrameData * input, bool & should_continue)
611 706 {
612 707 if (input == NULL) {
... ... @@ -620,7 +715,8 @@ public:
620 715 return input;
621 716 }
622 717  
623   - // Called from a different thread than run
  718 + // Called from a different thread than run. Nothing to worry about
  719 + // we offer no restrictions on when loops may enter this stage.
624 720 virtual bool tryAcquireNextStage(FrameData *& input)
625 721 {
626 722 (void) input;
... ... @@ -631,6 +727,9 @@ public:
631 727 {
632 728 // nothing to do.
633 729 }
  730 + void status(){
  731 + qDebug("multi thread stage %d, nothing to worry about", this->stage_id);
  732 + }
634 733 };
635 734  
636 735 class SingleThreadStage : public ProcessingStage
... ... @@ -640,13 +739,18 @@ public:
640 739 {
641 740 currentStatus = STOPPING;
642 741 next_target = 0;
  742 + // If the previous stage is single-threaded, queued inputs
  743 + // are stored in a double buffer
643 744 if (input_variance) {
644 745 this->inputBuffer = new DoubleBuffer();
645 746 }
  747 + // If it's multi-threaded we need to put the inputs back in order
  748 + // before we can use them, so we use a sequencing buffer.
646 749 else {
647 750 this->inputBuffer = new SequencingBuffer();
648 751 }
649 752 }
  753 +
650 754 ~SingleThreadStage()
651 755 {
652 756 delete inputBuffer;
... ... @@ -657,6 +761,7 @@ public:
657 761 QWriteLocker writeLock(&statusLock);
658 762 currentStatus = STOPPING;
659 763 next_target = 0;
  764 + inputBuffer->reset();
660 765 }
661 766  
662 767  
... ... @@ -706,7 +811,13 @@ public:
706 811 next->stages = stages;
707 812 next->start_idx = this->stage_id;
708 813 next->startItem = newItem;
709   - this->threads->start(next, stages->size() - stage_id);
  814 +
  815 + // We start threads with priority equal to their stage id
  816 + // This is intended to ensure progression, we do queued late stage
  817 + // jobs before queued early stage jobs, and so tend to finish frames
  818 + // rather than go stage by stage. In Qt 5.1, priorities are priorities
  819 + // so we use the stage_id directly.
  820 + this->threads->start(next, stage_id);
710 821 }
711 822  
712 823  
... ... @@ -741,45 +852,50 @@ public:
741 852  
742 853 return true;
743 854 }
  855 +
  856 + void status(){
  857 + qDebug("single thread stage %d, status starting? %d, next %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->inputBuffer->size());
  858 + }
  859 +
744 860 };
745 861  
746   -// No input buffer, instead we draw templates from some data source
747   -// Will be operated by the main thread for the stream. starts threads
  862 +// This stage reads new frames from the data source.
748 863 class FirstStage : public SingleThreadStage
749 864 {
750 865 public:
751   - FirstStage() : SingleThreadStage(true) {}
  866 + FirstStage(int activeFrames = 100) : SingleThreadStage(true), dataSource(activeFrames){ }
752 867  
753 868 DataSourceManager dataSource;
754 869  
  870 + void reset()
  871 + {
  872 + dataSource.close();
  873 + SingleThreadStage::reset();
  874 + }
  875 +
755 876 FrameData * run(FrameData * input, bool & should_continue)
756 877 {
757   - // Try to get a frame from the datasource
  878 + if (input == NULL)
  879 + qFatal("NULL frame in input stage");
  880 +
  881 + // Can we enter the next stage?
  882 + should_continue = nextStage->tryAcquireNextStage(input);
  883 +
  884 + // Try to get a frame from the datasource, we keep working on
  885 + // the frame we have, but we will queue another job for the next
  886 + // frame if a frame is currently available.
758 887 QWriteLocker lock(&statusLock);
759 888 bool last_frame = false;
760   - input = dataSource.tryGetFrame(last_frame);
  889 + FrameData * newFrame = dataSource.tryGetFrame(last_frame);
761 890  
762   - // Datasource broke, or is currently out of frames?
763   - if (!input || last_frame)
764   - {
765   - // We will just stop and not continue.
  891 + // Were we able to get a frame?
  892 + if (newFrame) startThread(newFrame);
  893 + // If not this stage will enter a stopped state.
  894 + else {
766 895 currentStatus = STOPPING;
767   - if (!input) {
768   - should_continue = false;
769   - return NULL;
770   - }
771 896 }
772   - lock.unlock();
773   - // Can we enter the next stage?
774   - should_continue = nextStage->tryAcquireNextStage(input);
775 897  
776   - // We are exiting leaving this stage, should we start another
777   - // thread here? Normally we will always re-queue a thread on
778   - // the first stage, but if we received the last frame there is
779   - // no need to.
780   - if (!last_frame) {
781   - startThread(NULL);
782   - }
  898 + lock.unlock();
783 899  
784 900 return input;
785 901 }
... ... @@ -797,31 +913,48 @@ public:
797 913 }
798 914  
799 915 QReadLocker lock(&statusLock);
800   - // A thread is already in the first stage,
801   - // we should just return
  916 + // If the first stage is already active we will just end.
802 917 if (currentStatus == STARTING)
803 918 {
804 919 return false;
805 920 }
806   - // Have to change to a write lock to modify currentStatus
  921 +
  922 + // Otherwise we will try to continue, but to do so we have to
  923 + // escalate the lock, and sadly there is no way to do so without
  924 + // releasing the read-mode lock, and getting a new write-mode lock.
807 925 lock.unlock();
808 926  
809 927 QWriteLocker writeLock(&statusLock);
810   - // But someone else might have started a thread in the meantime
  928 + // currentStatus might have changed in the gap between releasing the read
  929 + // lock and getting the write lock.
811 930 if (currentStatus == STARTING)
812 931 {
813 932 return false;
814 933 }
815   - // Ok we'll start a thread
  934 +
  935 + bool last_frame = false;
  936 + // Try to get a frame from the data source, if we get one we will
  937 + // continue to the first stage.
  938 + input = dataSource.tryGetFrame(last_frame);
  939 +
  940 + if (!input) {
  941 + return false;
  942 + }
  943 +
816 944 currentStatus = STARTING;
817 945  
818   - // We always start a readstage thread with null input, so nothing to do here
819 946 return true;
820 947 }
821 948  
  949 + void status(){
  950 + qDebug("Read stage %d, status starting? %d, next frame %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->dataSource.size());
  951 + }
  952 +
  953 +
822 954 };
823 955  
824   -// starts threads
  956 +// Appened to the end of a Stream's transform sequence. Collects the output
  957 +// from each frame on a single templatelist
825 958 class LastStage : public SingleThreadStage
826 959 {
827 960 public:
... ... @@ -834,6 +967,7 @@ public:
834 967 private:
835 968 TemplateList collectedOutput;
836 969 public:
  970 +
837 971 void reset()
838 972 {
839 973 collectedOutput.clear();
... ... @@ -873,6 +1007,11 @@ public:
873 1007  
874 1008 return input;
875 1009 }
  1010 +
  1011 + void status(){
  1012 + qDebug("Collection stage %d, status starting? %d, next %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->inputBuffer->size());
  1013 + }
  1014 +
876 1015 };
877 1016  
878 1017  
... ... @@ -880,6 +1019,8 @@ class StreamTransform : public CompositeTransform
880 1019 {
881 1020 Q_OBJECT
882 1021 public:
  1022 + Q_PROPERTY(int activeFrames READ get_activeFrames WRITE set_activeFrames RESET reset_activeFrames)
  1023 + BR_PROPERTY(int, activeFrames, 100)
883 1024  
884 1025 void train(const TemplateList & data)
885 1026 {
... ... @@ -902,7 +1043,8 @@ public:
902 1043 qFatal("whatever");
903 1044 }
904 1045  
905   - // start processing
  1046 + // start processing, consider all templates in src a continuous
  1047 + // 'video'
906 1048 void projectUpdate(const TemplateList & src, TemplateList & dst)
907 1049 {
908 1050 dst = src;
... ... @@ -911,12 +1053,21 @@ public:
911 1053 if (!res) return;
912 1054  
913 1055 // Start the first thread in the stream.
  1056 + QWriteLocker lock(&readStage->statusLock);
914 1057 readStage->currentStatus = SingleThreadStage::STARTING;
915   - readStage->startThread(NULL);
916 1058  
917   - // Wait for the stream to reach the last frame available from
  1059 + // We have to get a frame before starting the thread
  1060 + bool last_frame = false;
  1061 + FrameData * firstFrame = readStage->dataSource.tryGetFrame(last_frame);
  1062 + if (firstFrame == NULL)
  1063 + qFatal("Failed to read first frame of video");
  1064 + readStage->startThread(firstFrame);
  1065 + lock.unlock();
  1066 +
  1067 + // Wait for the stream to process the last frame available from
918 1068 // the data source.
919   - readStage->dataSource.waitLast();
  1069 + bool wait_res = false;
  1070 + wait_res = readStage->dataSource.waitLast();
920 1071  
921 1072 // Now that there are no more incoming frames, call finalize
922 1073 // on each transform in turn to collect any last templates
... ... @@ -958,6 +1109,8 @@ public:
958 1109 {
959 1110 if (transforms.isEmpty()) return;
960 1111  
  1112 + // call CompositeTransform::init so that trainable is set
  1113 + // correctly.
961 1114 CompositeTransform::init();
962 1115  
963 1116 // We share a thread pool across streams attached to the same
... ... @@ -973,24 +1126,31 @@ public:
973 1126 threads = it.value();
974 1127 poolLock.unlock();
975 1128  
  1129 + // Are our children time varying or not? This decides whether
  1130 + // we run them in single threaded or multi threaded stages
976 1131 stage_variance.reserve(transforms.size());
977 1132 foreach (const br::Transform *transform, transforms) {
978 1133 stage_variance.append(transform->timeVarying());
979 1134 }
980 1135  
981   - readStage = new FirstStage();
  1136 + // Additionally, we have a separate stage responsible for reading
  1137 + // frames from the data source
  1138 + readStage = new FirstStage(activeFrames);
982 1139  
983 1140 processingStages.push_back(readStage);
984 1141 readStage->stage_id = 0;
985 1142 readStage->stages = &this->processingStages;
986 1143 readStage->threads = this->threads;
987 1144  
  1145 + // Initialize and link a processing stage for each of our child
  1146 + // transforms.
988 1147 int next_stage_id = 1;
989   -
990 1148 bool prev_stage_variance = true;
991 1149 for (int i =0; i < transforms.size(); i++)
992 1150 {
993 1151 if (stage_variance[i])
  1152 + // Whether or not the previous stage is multi-threaded controls
  1153 + // the type of input buffer we need in a single threaded stage.
994 1154 processingStages.append(new SingleThreadStage(prev_stage_variance));
995 1155 else
996 1156 processingStages.append(new MultiThreadStage(Globals->parallelism));
... ... @@ -1008,20 +1168,25 @@ public:
1008 1168 prev_stage_variance = stage_variance[i];
1009 1169 }
1010 1170  
  1171 + // We also have the last stage, which just puts the output of the
  1172 + // previous stages on a template list.
1011 1173 collectionStage = new LastStage(prev_stage_variance);
1012 1174 processingStages.append(collectionStage);
1013 1175 collectionStage->stage_id = next_stage_id;
1014 1176 collectionStage->stages = &this->processingStages;
1015 1177 collectionStage->threads = this->threads;
1016 1178  
  1179 + // the last transform stage points to collection stage
1017 1180 processingStages[processingStages.size() - 2]->nextStage = collectionStage;
1018 1181  
1019   - // It's a ring buffer, get it?
  1182 + // And the collection stage points to the read stage, because this is
  1183 + // a ring buffer.
1020 1184 collectionStage->nextStage = readStage;
1021 1185 }
1022 1186  
1023 1187 ~StreamTransform()
1024 1188 {
  1189 + // Delete all the stages
1025 1190 for (int i = 0; i < processingStages.size(); i++) {
1026 1191 delete processingStages[i];
1027 1192 }
... ...