Commit 1aceddc3cc411d3d567968a0cb1834c14e7f46c0

Authored by Charles Otto
1 parent 8dea5a3e

Add an explicit open mode parameter to stream

Allows users to directly specify how stream handles input data (i.e. reads from
a video source, or just distributes the templates)
Showing 1 changed file with 381 additions and 361 deletions
openbr/plugins/stream.cpp
@@ -16,6 +16,7 @@ using namespace cv; @@ -16,6 +16,7 @@ using namespace cv;
16 namespace br 16 namespace br
17 { 17 {
18 18
  19 +
19 class FrameData 20 class FrameData
20 { 21 {
21 public: 22 public:
@@ -505,132 +506,6 @@ private: @@ -505,132 +506,6 @@ private:
505 bool data_ok; 506 bool data_ok;
506 }; 507 };
507 508
508 -// Given a templatelist as input, create appropriate data source for each  
509 -// individual template  
510 -class DataSourceManager : public DataSource  
511 -{  
512 -public:  
513 - DataSourceManager(int activeFrames=100) : DataSource(activeFrames)  
514 - {  
515 - actualSource = NULL;  
516 - }  
517 -  
518 - ~DataSourceManager()  
519 - {  
520 - close();  
521 - }  
522 -  
523 - int size()  
524 - {  
525 - return this->allFrames.size();  
526 - }  
527 -  
528 - void close()  
529 - {  
530 - if (actualSource) {  
531 - actualSource->close();  
532 - delete actualSource;  
533 - actualSource = NULL;  
534 - }  
535 - }  
536 -  
537 - // We are used through a call to open(TemplateList)  
538 - bool open(TemplateList & input)  
539 - {  
540 - // Set up variables specific to us  
541 - current_template_idx = 0;  
542 - templates = input;  
543 -  
544 - // Call datasourece::open on the first template to set up  
545 - // state variables  
546 - return DataSource::open(templates[current_template_idx]);  
547 - }  
548 -  
549 - // Create an actual data source of appropriate type for this template  
550 - // (initially called via the call to DataSource::open, called later  
551 - // as we run out of frames on our templates).  
552 - bool concreteOpen(Template & input)  
553 - {  
554 - close();  
555 -  
556 - bool open_res = false;  
557 - // Input has no matrices? Its probably a video that hasn't been loaded yet  
558 - if (input.empty()) {  
559 - actualSource = new VideoDataSource(0);  
560 - open_res = actualSource->concreteOpen(input);  
561 - }  
562 - // If the input is not empty, we assume it is a set of frames already  
563 - // in memory.  
564 - else {  
565 - actualSource = new TemplateDataSource(0);  
566 - open_res = actualSource->concreteOpen(input);  
567 - }  
568 -  
569 - // The data source failed to open  
570 - if (!open_res) {  
571 - delete actualSource;  
572 - actualSource = NULL;  
573 - return false;  
574 - }  
575 - return true;  
576 - }  
577 -  
578 - bool isOpen() { return !actualSource ? false : actualSource->isOpen(); }  
579 -  
580 -protected:  
581 - // Index of the template in the templatelist we are currently reading from  
582 - int current_template_idx;  
583 -  
584 - TemplateList templates;  
585 - DataSource * actualSource;  
586 - // Get the next frame, if we run out of frames on the current template  
587 - // move on to the next one.  
588 - bool getNext(FrameData & output)  
589 - {  
590 - bool res = actualSource->getNext(output);  
591 - output.sequenceNumber = next_sequence_number;  
592 -  
593 - // OK we got a frame  
594 - if (res) {  
595 - // Override the sequence number set by actualSource  
596 - output.data.last().file.set("FrameNumber", output.sequenceNumber);  
597 - next_sequence_number++;  
598 - if (output.data.last().last().empty())  
599 - qDebug("broken matrix");  
600 - return true;  
601 - }  
602 -  
603 - // We didn't get a frame, try to move on to the next template.  
604 - while(!res) {  
605 - output.data.clear();  
606 - current_template_idx++;  
607 -  
608 - // No more templates? We're done  
609 - if (current_template_idx >= templates.size())  
610 - return false;  
611 -  
612 - // open the next data source  
613 - bool open_res = concreteOpen(templates[current_template_idx]);  
614 - // We couldn't open it, give up? We could maybe continue here  
615 - // but don't currently.  
616 - if (!open_res)  
617 - return false;  
618 -  
619 - // get a frame from the newly opened data source, if that fails  
620 - // we continue to open the next one.  
621 - res = actualSource->getNext(output);  
622 - }  
623 - // Finally, set the sequence number for the frame we actually return.  
624 - output.sequenceNumber = next_sequence_number++;  
625 - output.data.last().file.set("FrameNumber", output.sequenceNumber);  
626 -  
627 - if (output.data.last().last().empty())  
628 - qDebug("broken matrix");  
629 -  
630 - return res;  
631 - }  
632 -  
633 -};  
634 509
635 class ProcessingStage; 510 class ProcessingStage;
636 511
@@ -861,100 +736,6 @@ public: @@ -861,100 +736,6 @@ public:
861 736
862 }; 737 };
863 738
864 -// This stage reads new frames from the data source.  
865 -class FirstStage : public SingleThreadStage  
866 -{  
867 -public:  
868 - FirstStage(int activeFrames = 100) : SingleThreadStage(true), dataSource(activeFrames){ }  
869 -  
870 - DataSourceManager dataSource;  
871 -  
872 - void reset()  
873 - {  
874 - dataSource.close();  
875 - SingleThreadStage::reset();  
876 - }  
877 -  
878 - FrameData * run(FrameData * input, bool & should_continue)  
879 - {  
880 - if (input == NULL)  
881 - qFatal("NULL frame in input stage");  
882 -  
883 - // Can we enter the next stage?  
884 - should_continue = nextStage->tryAcquireNextStage(input);  
885 -  
886 - // Try to get a frame from the datasource, we keep working on  
887 - // the frame we have, but we will queue another job for the next  
888 - // frame if a frame is currently available.  
889 - QWriteLocker lock(&statusLock);  
890 - bool last_frame = false;  
891 - FrameData * newFrame = dataSource.tryGetFrame(last_frame);  
892 -  
893 - // Were we able to get a frame?  
894 - if (newFrame) startThread(newFrame);  
895 - // If not this stage will enter a stopped state.  
896 - else {  
897 - currentStatus = STOPPING;  
898 - }  
899 -  
900 - lock.unlock();  
901 -  
902 - return input;  
903 - }  
904 -  
905 - // The last stage, trying to access the first stage  
906 - bool tryAcquireNextStage(FrameData *& input)  
907 - {  
908 - // Return the frame, was it the last one?  
909 - bool was_last = dataSource.returnFrame(input);  
910 - input = NULL;  
911 -  
912 - // OK we won't continue.  
913 - if (was_last) {  
914 - return false;  
915 - }  
916 -  
917 - QReadLocker lock(&statusLock);  
918 - // If the first stage is already active we will just end.  
919 - if (currentStatus == STARTING)  
920 - {  
921 - return false;  
922 - }  
923 -  
924 - // Otherwise we will try to continue, but to do so we have to  
925 - // escalate the lock, and sadly there is no way to do so without  
926 - // releasing the read-mode lock, and getting a new write-mode lock.  
927 - lock.unlock();  
928 -  
929 - QWriteLocker writeLock(&statusLock);  
930 - // currentStatus might have changed in the gap between releasing the read  
931 - // lock and getting the write lock.  
932 - if (currentStatus == STARTING)  
933 - {  
934 - return false;  
935 - }  
936 -  
937 - bool last_frame = false;  
938 - // Try to get a frame from the data source, if we get one we will  
939 - // continue to the first stage.  
940 - input = dataSource.tryGetFrame(last_frame);  
941 -  
942 - if (!input) {  
943 - return false;  
944 - }  
945 -  
946 - currentStatus = STARTING;  
947 -  
948 - return true;  
949 - }  
950 -  
951 - void status(){  
952 - qDebug("Read stage %d, status starting? %d, next frame %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->dataSource.size());  
953 - }  
954 -  
955 -  
956 -};  
957 -  
958 // Appened to the end of a Stream's transform sequence. Collects the output 739 // Appened to the end of a Stream's transform sequence. Collects the output
959 // from each frame on a single templatelist 740 // from each frame on a single templatelist
960 class LastStage : public SingleThreadStage 741 class LastStage : public SingleThreadStage
@@ -1038,12 +819,23 @@ public: @@ -1038,12 +819,23 @@ public:
1038 819
1039 }; 820 };
1040 821
  822 +class FirstStage;
  823 +
1041 class DirectStreamTransform : public CompositeTransform 824 class DirectStreamTransform : public CompositeTransform
1042 { 825 {
1043 Q_OBJECT 826 Q_OBJECT
1044 public: 827 public:
  828 +
  829 + enum StreamModes { StreamVideo,
  830 + DistributeFrames,
  831 + Auto};
  832 +
  833 + Q_ENUMS(StreamModes)
  834 +
1045 Q_PROPERTY(int activeFrames READ get_activeFrames WRITE set_activeFrames RESET reset_activeFrames) 835 Q_PROPERTY(int activeFrames READ get_activeFrames WRITE set_activeFrames RESET reset_activeFrames)
  836 + Q_PROPERTY(StreamModes readMode READ get_readMode WRITE set_readMode RESET reset_readMode)
1046 BR_PROPERTY(int, activeFrames, 100) 837 BR_PROPERTY(int, activeFrames, 100)
  838 + BR_PROPERTY(StreamModes, readMode, Auto)
1047 839
1048 friend class StreamTransfrom; 840 friend class StreamTransfrom;
1049 841
@@ -1109,63 +901,6 @@ public: @@ -1109,63 +901,6 @@ public:
1109 qFatal("whatever"); 901 qFatal("whatever");
1110 } 902 }
1111 903
1112 - // start processing, consider all templates in src a continuous  
1113 - // 'video'  
1114 - void projectUpdate(const TemplateList & src, TemplateList & dst)  
1115 - {  
1116 - dst = src;  
1117 -  
1118 - bool res = readStage->dataSource.open(dst);  
1119 - if (!res) {  
1120 - qDebug("stream failed to open %s", qPrintable(dst[0].file.name));  
1121 - return;  
1122 - }  
1123 -  
1124 - // Start the first thread in the stream.  
1125 - QWriteLocker lock(&readStage->statusLock);  
1126 - readStage->currentStatus = SingleThreadStage::STARTING;  
1127 -  
1128 - // We have to get a frame before starting the thread  
1129 - bool last_frame = false;  
1130 - FrameData * firstFrame = readStage->dataSource.tryGetFrame(last_frame);  
1131 - if (firstFrame == NULL)  
1132 - qFatal("Failed to read first frame of video");  
1133 -  
1134 - readStage->startThread(firstFrame);  
1135 - lock.unlock();  
1136 -  
1137 - // Wait for the stream to process the last frame available from  
1138 - // the data source.  
1139 - bool wait_res = false;  
1140 - wait_res = readStage->dataSource.waitLast();  
1141 -  
1142 - // Now that there are no more incoming frames, call finalize  
1143 - // on each transform in turn to collect any last templates  
1144 - // they wish to issue.  
1145 - TemplateList final_output;  
1146 -  
1147 - // Push finalize through the stages  
1148 - for (int i=0; i < this->transforms.size(); i++)  
1149 - {  
1150 - TemplateList output_set;  
1151 - transforms[i]->finalize(output_set);  
1152 -  
1153 - for (int j=i+1; j < transforms.size();j++)  
1154 - {  
1155 - transforms[j]->projectUpdate(output_set);  
1156 - }  
1157 - final_output.append(output_set);  
1158 - }  
1159 -  
1160 - // dst is set to all output received by the final stage, along  
1161 - // with anything output via the calls to finalize.  
1162 - dst = collectionStage->getOutput();  
1163 - dst.append(final_output);  
1164 -  
1165 - foreach(ProcessingStage * stage, processingStages) {  
1166 - stage->reset();  
1167 - }  
1168 - }  
1169 904
1170 virtual void finalize(TemplateList & output) 905 virtual void finalize(TemplateList & output)
1171 { 906 {
@@ -1174,90 +909,8 @@ public: @@ -1174,90 +909,8 @@ public:
1174 // on all child transforms as part of projectUpdate 909 // on all child transforms as part of projectUpdate
1175 } 910 }
1176 911
1177 - // Create and link stages  
1178 - void init()  
1179 - {  
1180 - if (transforms.isEmpty()) return;  
1181 -  
1182 - for (int i=0; i < processingStages.size();i++)  
1183 - delete processingStages[i];  
1184 - processingStages.clear();  
1185 -  
1186 - // call CompositeTransform::init so that trainable is set  
1187 - // correctly.  
1188 - CompositeTransform::init();  
1189 -  
1190 - // We share a thread pool across streams attached to the same  
1191 - // parent tranform, retrieve or create a thread pool based  
1192 - // on our parent transform.  
1193 - QMutexLocker poolLock(&poolsAccess);  
1194 - QHash<QObject *, QThreadPool *>::Iterator it;  
1195 - if (!pools.contains(this->parent())) {  
1196 - it = pools.insert(this->parent(), new QThreadPool(this));  
1197 - it.value()->setMaxThreadCount(Globals->parallelism);  
1198 - }  
1199 - else it = pools.find(this->parent());  
1200 - threads = it.value();  
1201 - poolLock.unlock();  
1202 -  
1203 - // Are our children time varying or not? This decides whether  
1204 - // we run them in single threaded or multi threaded stages  
1205 - stage_variance.reserve(transforms.size());  
1206 - foreach (const br::Transform *transform, transforms) {  
1207 - stage_variance.append(transform->timeVarying());  
1208 - }  
1209 -  
1210 - // Additionally, we have a separate stage responsible for reading  
1211 - // frames from the data source  
1212 - readStage = new FirstStage(activeFrames);  
1213 -  
1214 - processingStages.push_back(readStage);  
1215 - readStage->stage_id = 0;  
1216 - readStage->stages = &this->processingStages;  
1217 - readStage->threads = this->threads;  
1218 -  
1219 - // Initialize and link a processing stage for each of our child  
1220 - // transforms.  
1221 - int next_stage_id = 1;  
1222 - bool prev_stage_variance = true;  
1223 - for (int i =0; i < transforms.size(); i++)  
1224 - {  
1225 - if (stage_variance[i])  
1226 - // Whether or not the previous stage is multi-threaded controls  
1227 - // the type of input buffer we need in a single threaded stage.  
1228 - processingStages.append(new SingleThreadStage(prev_stage_variance));  
1229 - else  
1230 - processingStages.append(new MultiThreadStage(Globals->parallelism));  
1231 -  
1232 - processingStages.last()->stage_id = next_stage_id++;  
1233 -  
1234 - // link nextStage pointers, the stage we just appeneded is i+1 since  
1235 - // the read stage was added before this loop  
1236 - processingStages[i]->nextStage = processingStages[i+1];  
1237 -  
1238 - processingStages.last()->stages = &this->processingStages;  
1239 - processingStages.last()->threads = this->threads;  
1240 -  
1241 - processingStages.last()->transform = transforms[i];  
1242 - prev_stage_variance = stage_variance[i];  
1243 - }  
1244 -  
1245 - // We also have the last stage, which just puts the output of the  
1246 - // previous stages on a template list.  
1247 - collectionStage = new LastStage(prev_stage_variance);  
1248 - processingStages.append(collectionStage);  
1249 - collectionStage->stage_id = next_stage_id;  
1250 - collectionStage->stages = &this->processingStages;  
1251 - collectionStage->threads = this->threads;  
1252 -  
1253 - // the last transform stage points to collection stage  
1254 - processingStages[processingStages.size() - 2]->nextStage = collectionStage;  
1255 -  
1256 - // And the collection stage points to the read stage, because this is  
1257 - // a ring buffer.  
1258 - collectionStage->nextStage = readStage;  
1259 - }  
1260 - 912 + void projectUpdate(const TemplateList & src, TemplateList & dst);
  913 + void init();
1261 ~DirectStreamTransform() 914 ~DirectStreamTransform()
1262 { 915 {
1263 // Delete all the stages 916 // Delete all the stages
@@ -1448,6 +1101,373 @@ private: @@ -1448,6 +1101,373 @@ private:
1448 1101
1449 BR_REGISTER(Transform, StreamTransform) 1102 BR_REGISTER(Transform, StreamTransform)
1450 1103
  1104 +// Given a templatelist as input, create appropriate data source for each
  1105 +// individual template
  1106 +class DataSourceManager : public DataSource
  1107 +{
  1108 +public:
  1109 + DataSourceManager(int activeFrames=100) : DataSource(activeFrames)
  1110 + {
  1111 + actualSource = NULL;
  1112 + }
  1113 +
  1114 + ~DataSourceManager()
  1115 + {
  1116 + close();
  1117 + }
  1118 +
  1119 + int size()
  1120 + {
  1121 + return this->allFrames.size();
  1122 + }
  1123 +
  1124 + void close()
  1125 + {
  1126 + if (actualSource) {
  1127 + actualSource->close();
  1128 + delete actualSource;
  1129 + actualSource = NULL;
  1130 + }
  1131 + }
  1132 +
  1133 + // We are used through a call to open(TemplateList)
  1134 + bool open(TemplateList & input, DirectStreamTransform::StreamModes _mode)
  1135 + {
  1136 + // Set up variables specific to us
  1137 + current_template_idx = 0;
  1138 + templates = input;
  1139 + mode = _mode;
  1140 +
  1141 + // Call datasourece::open on the first template to set up
  1142 + // state variables
  1143 + return DataSource::open(templates[current_template_idx]);
  1144 + }
  1145 + void projectUpdate(const TemplateList & src, TemplateList & dst);
  1146 +
  1147 + // Create an actual data source of appropriate type for this template
  1148 + // (initially called via the call to DataSource::open, called later
  1149 + // as we run out of frames on our templates).
  1150 + bool concreteOpen(Template & input)
  1151 + {
  1152 + close();
  1153 +
  1154 + bool open_res = false;
  1155 +
  1156 + // Input has no matrices? Its probably a video that hasn't been loaded yet
  1157 + if (mode == DirectStreamTransform::StreamVideo || mode ~= DirectStreamTransform::DistributeFrames && input.empty()) {
  1158 + actualSource = new VideoDataSource(0);
  1159 + open_res = actualSource->concreteOpen(input);
  1160 + }
  1161 + // If the input is not empty, we assume it is a set of frames already
  1162 + // in memory.
  1163 + else {
  1164 + actualSource = new TemplateDataSource(0);
  1165 + open_res = actualSource->concreteOpen(input);
  1166 + }
  1167 +
  1168 + // The data source failed to open
  1169 + if (!open_res) {
  1170 + delete actualSource;
  1171 + actualSource = NULL;
  1172 + return false;
  1173 + }
  1174 + return true;
  1175 + }
  1176 +
  1177 + bool isOpen() { return !actualSource ? false : actualSource->isOpen(); }
  1178 +
  1179 +protected:
  1180 + // Index of the template in the templatelist we are currently reading from
  1181 + int current_template_idx;
  1182 + DirectStreamTransform::StreamModes mode;
  1183 + TemplateList templates;
  1184 + DataSource * actualSource;
  1185 + // Get the next frame, if we run out of frames on the current template
  1186 + // move on to the next one.
  1187 + bool getNext(FrameData & output)
  1188 + {
  1189 + bool res = actualSource->getNext(output);
  1190 + output.sequenceNumber = next_sequence_number;
  1191 +
  1192 + // OK we got a frame
  1193 + if (res) {
  1194 + // Override the sequence number set by actualSource
  1195 + output.data.last().file.set("FrameNumber", output.sequenceNumber);
  1196 + next_sequence_number++;
  1197 + if (output.data.last().last().empty())
  1198 + qDebug("broken matrix");
  1199 + return true;
  1200 + }
  1201 +
  1202 + // We didn't get a frame, try to move on to the next template.
  1203 + while(!res) {
  1204 + output.data.clear();
  1205 + current_template_idx++;
  1206 +
  1207 + // No more templates? We're done
  1208 + if (current_template_idx >= templates.size())
  1209 + return false;
  1210 +
  1211 + // open the next data source
  1212 + bool open_res = concreteOpen(templates[current_template_idx]);
  1213 + // We couldn't open it, give up? We could maybe continue here
  1214 + // but don't currently.
  1215 + if (!open_res)
  1216 + return false;
  1217 +
  1218 + // get a frame from the newly opened data source, if that fails
  1219 + // we continue to open the next one.
  1220 + res = actualSource->getNext(output);
  1221 + }
  1222 + // Finally, set the sequence number for the frame we actually return.
  1223 + output.sequenceNumber = next_sequence_number++;
  1224 + output.data.last().file.set("FrameNumber", output.sequenceNumber);
  1225 +
  1226 + if (output.data.last().last().empty())
  1227 + qDebug("broken matrix");
  1228 +
  1229 + return res;
  1230 + }
  1231 +
  1232 +};
  1233 +
  1234 +// This stage reads new frames from the data source.
  1235 +class FirstStage : public SingleThreadStage
  1236 +{
  1237 +public:
  1238 + FirstStage(int activeFrames = 100) : SingleThreadStage(true), dataSource(activeFrames){ }
  1239 +
  1240 + DataSourceManager dataSource;
  1241 +
  1242 + void reset()
  1243 + {
  1244 + dataSource.close();
  1245 + SingleThreadStage::reset();
  1246 + }
  1247 +
  1248 + FrameData * run(FrameData * input, bool & should_continue)
  1249 + {
  1250 + if (input == NULL)
  1251 + qFatal("NULL frame in input stage");
  1252 +
  1253 + // Can we enter the next stage?
  1254 + should_continue = nextStage->tryAcquireNextStage(input);
  1255 +
  1256 + // Try to get a frame from the datasource, we keep working on
  1257 + // the frame we have, but we will queue another job for the next
  1258 + // frame if a frame is currently available.
  1259 + QWriteLocker lock(&statusLock);
  1260 + bool last_frame = false;
  1261 + FrameData * newFrame = dataSource.tryGetFrame(last_frame);
  1262 +
  1263 + // Were we able to get a frame?
  1264 + if (newFrame) startThread(newFrame);
  1265 + // If not this stage will enter a stopped state.
  1266 + else {
  1267 + currentStatus = STOPPING;
  1268 + }
  1269 +
  1270 + lock.unlock();
  1271 +
  1272 + return input;
  1273 + }
  1274 +
  1275 + // The last stage, trying to access the first stage
  1276 + bool tryAcquireNextStage(FrameData *& input)
  1277 + {
  1278 + // Return the frame, was it the last one?
  1279 + bool was_last = dataSource.returnFrame(input);
  1280 + input = NULL;
  1281 +
  1282 + // OK we won't continue.
  1283 + if (was_last) {
  1284 + return false;
  1285 + }
  1286 +
  1287 + QReadLocker lock(&statusLock);
  1288 + // If the first stage is already active we will just end.
  1289 + if (currentStatus == STARTING)
  1290 + {
  1291 + return false;
  1292 + }
  1293 +
  1294 + // Otherwise we will try to continue, but to do so we have to
  1295 + // escalate the lock, and sadly there is no way to do so without
  1296 + // releasing the read-mode lock, and getting a new write-mode lock.
  1297 + lock.unlock();
  1298 +
  1299 + QWriteLocker writeLock(&statusLock);
  1300 + // currentStatus might have changed in the gap between releasing the read
  1301 + // lock and getting the write lock.
  1302 + if (currentStatus == STARTING)
  1303 + {
  1304 + return false;
  1305 + }
  1306 +
  1307 + bool last_frame = false;
  1308 + // Try to get a frame from the data source, if we get one we will
  1309 + // continue to the first stage.
  1310 + input = dataSource.tryGetFrame(last_frame);
  1311 +
  1312 + if (!input) {
  1313 + return false;
  1314 + }
  1315 +
  1316 + currentStatus = STARTING;
  1317 +
  1318 + return true;
  1319 + }
  1320 +
  1321 + void status(){
  1322 + qDebug("Read stage %d, status starting? %d, next frame %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->dataSource.size());
  1323 + }
  1324 +
  1325 +
  1326 +};
  1327 +
  1328 +
  1329 +// start processing, consider all templates in src a continuous
  1330 +// 'video'
  1331 +void DirectStreamTransform::projectUpdate(const TemplateList & src, TemplateList & dst)
  1332 +{
  1333 + dst = src;
  1334 +
  1335 + bool res = readStage->dataSource.open(dst,readMode);
  1336 + if (!res) {
  1337 + qDebug("stream failed to open %s", qPrintable(dst[0].file.name));
  1338 + return;
  1339 + }
  1340 +
  1341 + // Start the first thread in the stream.
  1342 + QWriteLocker lock(&readStage->statusLock);
  1343 + readStage->currentStatus = SingleThreadStage::STARTING;
  1344 +
  1345 + // We have to get a frame before starting the thread
  1346 + bool last_frame = false;
  1347 + FrameData * firstFrame = readStage->dataSource.tryGetFrame(last_frame);
  1348 + if (firstFrame == NULL)
  1349 + qFatal("Failed to read first frame of video");
  1350 +
  1351 + readStage->startThread(firstFrame);
  1352 + lock.unlock();
  1353 +
  1354 + // Wait for the stream to process the last frame available from
  1355 + // the data source.
  1356 + bool wait_res = false;
  1357 + wait_res = readStage->dataSource.waitLast();
  1358 +
  1359 + // Now that there are no more incoming frames, call finalize
  1360 + // on each transform in turn to collect any last templates
  1361 + // they wish to issue.
  1362 + TemplateList final_output;
  1363 +
  1364 + // Push finalize through the stages
  1365 + for (int i=0; i < this->transforms.size(); i++)
  1366 + {
  1367 + TemplateList output_set;
  1368 + transforms[i]->finalize(output_set);
  1369 +
  1370 + for (int j=i+1; j < transforms.size();j++)
  1371 + {
  1372 + transforms[j]->projectUpdate(output_set);
  1373 + }
  1374 + final_output.append(output_set);
  1375 + }
  1376 +
  1377 + // dst is set to all output received by the final stage, along
  1378 + // with anything output via the calls to finalize.
  1379 + dst = collectionStage->getOutput();
  1380 + dst.append(final_output);
  1381 +
  1382 + foreach(ProcessingStage * stage, processingStages) {
  1383 + stage->reset();
  1384 + }
  1385 +}
  1386 +
  1387 +
  1388 +// Create and link stages
  1389 +void DirectStreamTransform::init()
  1390 +{
  1391 + if (transforms.isEmpty()) return;
  1392 +
  1393 + for (int i=0; i < processingStages.size();i++)
  1394 + delete processingStages[i];
  1395 + processingStages.clear();
  1396 +
  1397 + // call CompositeTransform::init so that trainable is set
  1398 + // correctly.
  1399 + CompositeTransform::init();
  1400 +
  1401 + // We share a thread pool across streams attached to the same
  1402 + // parent tranform, retrieve or create a thread pool based
  1403 + // on our parent transform.
  1404 + QMutexLocker poolLock(&poolsAccess);
  1405 + QHash<QObject *, QThreadPool *>::Iterator it;
  1406 + if (!pools.contains(this->parent())) {
  1407 + it = pools.insert(this->parent(), new QThreadPool(this->parent()));
  1408 + it.value()->setMaxThreadCount(Globals->parallelism);
  1409 + }
  1410 + else it = pools.find(this->parent());
  1411 + threads = it.value();
  1412 + poolLock.unlock();
  1413 +
  1414 + // Are our children time varying or not? This decides whether
  1415 + // we run them in single threaded or multi threaded stages
  1416 + stage_variance.reserve(transforms.size());
  1417 + foreach (const br::Transform *transform, transforms) {
  1418 + stage_variance.append(transform->timeVarying());
  1419 + }
  1420 +
  1421 + // Additionally, we have a separate stage responsible for reading
  1422 + // frames from the data source
  1423 + readStage = new FirstStage(activeFrames);
  1424 +
  1425 + processingStages.push_back(readStage);
  1426 + readStage->stage_id = 0;
  1427 + readStage->stages = &this->processingStages;
  1428 + readStage->threads = this->threads;
  1429 +
  1430 + // Initialize and link a processing stage for each of our child
  1431 + // transforms.
  1432 + int next_stage_id = 1;
  1433 + bool prev_stage_variance = true;
  1434 + for (int i =0; i < transforms.size(); i++)
  1435 + {
  1436 + if (stage_variance[i])
  1437 + // Whether or not the previous stage is multi-threaded controls
  1438 + // the type of input buffer we need in a single threaded stage.
  1439 + processingStages.append(new SingleThreadStage(prev_stage_variance));
  1440 + else
  1441 + processingStages.append(new MultiThreadStage(Globals->parallelism));
  1442 +
  1443 + processingStages.last()->stage_id = next_stage_id++;
  1444 +
  1445 + // link nextStage pointers, the stage we just appeneded is i+1 since
  1446 + // the read stage was added before this loop
  1447 + processingStages[i]->nextStage = processingStages[i+1];
  1448 +
  1449 + processingStages.last()->stages = &this->processingStages;
  1450 + processingStages.last()->threads = this->threads;
  1451 +
  1452 + processingStages.last()->transform = transforms[i];
  1453 + prev_stage_variance = stage_variance[i];
  1454 + }
  1455 +
  1456 + // We also have the last stage, which just puts the output of the
  1457 + // previous stages on a template list.
  1458 + collectionStage = new LastStage(prev_stage_variance);
  1459 + processingStages.append(collectionStage);
  1460 + collectionStage->stage_id = next_stage_id;
  1461 + collectionStage->stages = &this->processingStages;
  1462 + collectionStage->threads = this->threads;
  1463 +
  1464 + // the last transform stage points to collection stage
  1465 + processingStages[processingStages.size() - 2]->nextStage = collectionStage;
  1466 +
  1467 + // And the collection stage points to the read stage, because this is
  1468 + // a ring buffer.
  1469 + collectionStage->nextStage = readStage;
  1470 +}
1451 1471
1452 1472
1453 } // namespace br 1473 } // namespace br