Commit 25d5b9804d00223e46b525b0b19ca1b6395c4354

Authored by Charles Otto
1 parent 5e02aadf

Add basic support for online video processing

Add StreamTransform which represents applying a series of transforms to some
sequential input (e.g. a video). Handles reading video frames in and processing
them as they become available (rather than loading the entire video into memory
before processing it). The basic idea is a pipeline model, every child
transform of StreamTransform is run by a separate thread, (as is reading in
video frames). A thread waits for a frame as input, when input becomes
available performs its operation, and then places the frame on the following
stage's input buffer. Single threaded processing is not yet implemented.

Moved CompositeTransform to openbr_plugin.h
Add ContractTransform -- combines all input templates into a single template
Preliminary handling of time varying transforms in DistributeTemplateTransform,
the current behavior is not actually correct but will work for the single input
case.
sdk/openbr_plugin.h
@@ -1113,6 +1113,7 @@ class BR_EXPORT TimeVaryingTransform : public Transform @@ -1113,6 +1113,7 @@ class BR_EXPORT TimeVaryingTransform : public Transform
1113 { 1113 {
1114 Q_OBJECT 1114 Q_OBJECT
1115 1115
  1116 +public:
1116 virtual bool timeVarying() const { return true; } 1117 virtual bool timeVarying() const { return true; }
1117 1118
1118 virtual void project(const Template &src, Template &dst) const 1119 virtual void project(const Template &src, Template &dst) const
@@ -1127,6 +1128,23 @@ class BR_EXPORT TimeVaryingTransform : public Transform @@ -1127,6 +1128,23 @@ class BR_EXPORT TimeVaryingTransform : public Transform
1127 (void) dst; (void) src; 1128 (void) dst; (void) src;
1128 } 1129 }
1129 1130
  1131 + // Get a compile failure if this isn't here to go along with the other
  1132 + // projectUpdate, no idea why
  1133 + virtual void projectUpdate(const Template & src, Template & dst)
  1134 + {
  1135 + (void) src; (void) dst;
  1136 + qFatal("do something useful");
  1137 + }
  1138 +
  1139 + virtual void projectUpdate(const TemplateList &src, TemplateList &dst)
  1140 + {
  1141 + foreach (const Template & src_part, src) {
  1142 + Template out;
  1143 + projectUpdate(src_part, out);
  1144 + dst.append(out);
  1145 + }
  1146 + }
  1147 +
1130 protected: 1148 protected:
1131 TimeVaryingTransform(bool independent = true, bool trainable = true) : Transform(independent, trainable) {} 1149 TimeVaryingTransform(bool independent = true, bool trainable = true) : Transform(independent, trainable) {}
1132 }; 1150 };
@@ -1170,6 +1188,51 @@ protected: @@ -1170,6 +1188,51 @@ protected:
1170 UntrainableMetaTransform() : UntrainableTransform(false) {} 1188 UntrainableMetaTransform() : UntrainableTransform(false) {}
1171 }; 1189 };
1172 1190
  1191 +/*!
  1192 + * \brief A MetaTransform that aggregates some sub-transforms
  1193 + */
  1194 +class BR_EXPORT CompositeTransform : public TimeVaryingTransform
  1195 +{
  1196 + Q_OBJECT
  1197 +
  1198 +public:
  1199 + Q_PROPERTY(QList<br::Transform*> transforms READ get_transforms WRITE set_transforms RESET reset_transforms)
  1200 + BR_PROPERTY(QList<br::Transform*>, transforms, QList<br::Transform*>())
  1201 +
  1202 + virtual void project(const Template &src, Template &dst) const
  1203 + {
  1204 + if (timeVarying()) qFatal("No const project defined for time-varying transform");
  1205 + _project(src, dst);
  1206 + }
  1207 +
  1208 + virtual void project(const TemplateList &src, TemplateList &dst) const
  1209 + {
  1210 + if (timeVarying()) qFatal("No const project defined for time-varying transform");
  1211 + _project(src, dst);
  1212 + }
  1213 +
  1214 + bool timeVarying() const { return isTimeVarying; }
  1215 +
  1216 + void init()
  1217 + {
  1218 + isTimeVarying = false;
  1219 + foreach (const br::Transform *transform, transforms) {
  1220 + if (transform->timeVarying()) {
  1221 + isTimeVarying = true;
  1222 + break;
  1223 + }
  1224 + }
  1225 + }
  1226 +
  1227 +protected:
  1228 + bool isTimeVarying;
  1229 +
  1230 + virtual void _project(const Template & src, Template & dst) const = 0;
  1231 + virtual void _project(const TemplateList & src, TemplateList & dst) const = 0;
  1232 +
  1233 + CompositeTransform() : TimeVaryingTransform(false) {}
  1234 +};
  1235 +
1173 /*! @}*/ 1236 /*! @}*/
1174 1237
1175 /*! 1238 /*!
sdk/plugins/meta.cpp
@@ -86,51 +86,6 @@ static void incrementStep() @@ -86,51 +86,6 @@ static void incrementStep()
86 } 86 }
87 87
88 /*! 88 /*!
89 - * \brief A MetaTransform that aggregates some sub-transforms  
90 - */  
91 -class BR_EXPORT CompositeTransform : public TimeVaryingTransform  
92 -{  
93 - Q_OBJECT  
94 -  
95 -public:  
96 - Q_PROPERTY(QList<br::Transform*> transforms READ get_transforms WRITE set_transforms RESET reset_transforms)  
97 - BR_PROPERTY(QList<br::Transform*>, transforms, QList<br::Transform*>())  
98 -  
99 - virtual void project(const Template &src, Template &dst) const  
100 - {  
101 - if (timeVarying()) qFatal("No const project defined for time-varying transform");  
102 - _project(src, dst);  
103 - }  
104 -  
105 - virtual void project(const TemplateList &src, TemplateList &dst) const  
106 - {  
107 - if (timeVarying()) qFatal("No const project defined for time-varying transform");  
108 - _project(src, dst);  
109 - }  
110 -  
111 - bool timeVarying() const { return isTimeVarying; }  
112 -  
113 - void init()  
114 - {  
115 - isTimeVarying = false;  
116 - foreach (const br::Transform *transform, transforms) {  
117 - if (transform->timeVarying()) {  
118 - isTimeVarying = true;  
119 - break;  
120 - }  
121 - }  
122 - }  
123 -  
124 -protected:  
125 - bool isTimeVarying;  
126 -  
127 - virtual void _project(const Template & src, Template & dst) const = 0;  
128 - virtual void _project(const TemplateList & src, TemplateList & dst) const = 0;  
129 -  
130 - CompositeTransform() : TimeVaryingTransform(false) {}  
131 -};  
132 -  
133 -/*!  
134 * \ingroup Transforms 89 * \ingroup Transforms
135 * \brief Transforms in series. 90 * \brief Transforms in series.
136 * \author Josh Klontz \cite jklontz 91 * \author Josh Klontz \cite jklontz
@@ -284,6 +239,38 @@ BR_REGISTER(Transform, ExpandTransform) @@ -284,6 +239,38 @@ BR_REGISTER(Transform, ExpandTransform)
284 239
285 /*! 240 /*!
286 * \ingroup transforms 241 * \ingroup transforms
  242 + * \brief It's like the opposite of ExpandTransform, but not really
  243 + * \author Charles Otto \cite caotto
  244 + *
  245 + * Given a set of templatelists as input, concatenate them onto a single Template
  246 + */
  247 +class ContractTransform : public UntrainableMetaTransform
  248 +{
  249 + Q_OBJECT
  250 +
  251 + virtual void project(const TemplateList &src, TemplateList &dst) const
  252 + {
  253 + //dst = Expanded(src);
  254 + Template out;
  255 +
  256 + foreach (const Template & t, src) {
  257 + out.merge(t);
  258 + }
  259 + dst.clear();
  260 + dst.append(out);
  261 + }
  262 +
  263 + virtual void project(const Template & src, Template & dst) const
  264 + {
  265 + qFatal("this has gone bad");
  266 + (void) src; (void) dst;
  267 + }
  268 +};
  269 +
  270 +BR_REGISTER(Transform, ContractTransform)
  271 +
  272 +/*!
  273 + * \ingroup transforms
287 * \brief Transforms in parallel. 274 * \brief Transforms in parallel.
288 * \author Josh Klontz \cite jklontz 275 * \author Josh Klontz \cite jklontz
289 * 276 *
@@ -626,6 +613,15 @@ public: @@ -626,6 +613,15 @@ public:
626 // Process the single elemnt templates in parallel if parallelism is enabled. 613 // Process the single elemnt templates in parallel if parallelism is enabled.
627 void project(const TemplateList &src, TemplateList &dst) const 614 void project(const TemplateList &src, TemplateList &dst) const
628 { 615 {
  616 + // Little ugly, but if we own a timeVaryingTransform and this gets called
  617 + // cast off the const modifier and use projectUpdate. This allows us to
  618 + // act as a single point of entry.
  619 + if (transform->timeVarying())
  620 + {
  621 + DistributeTemplateTransform * non_const = (DistributeTemplateTransform *) this;
  622 + non_const->projectUpdate(src,dst);
  623 + return;
  624 + }
629 // Pre-allocate output for each template 625 // Pre-allocate output for each template
630 QList<TemplateList> output_buffer; 626 QList<TemplateList> output_buffer;
631 output_buffer.reserve(src.size()); 627 output_buffer.reserve(src.size());
@@ -655,6 +651,15 @@ public: @@ -655,6 +651,15 @@ public:
655 for (int i=0; i<src.size(); i++) dst.append(output_buffer[i]); 651 for (int i=0; i<src.size(); i++) dst.append(output_buffer[i]);
656 } 652 }
657 653
  654 + void projectUpdate(const TemplateList &src, TemplateList &dst)
  655 + {
  656 + if (!transform->timeVarying()) {
  657 + this->project(src, dst);
  658 + return;
  659 + }
  660 + this->transform->projectUpdate(src, dst);
  661 + }
  662 +
658 663
659 private: 664 private:
660 665
sdk/plugins/stream.cpp 0 โ†’ 100644
  1 +#define QT_NO_DEBUG_OUTPUT
  2 +
  3 +#include <openbr_plugin.h>
  4 +#include <QReadWriteLock>
  5 +#include <QWaitCondition>
  6 +#include <QThreadPool>
  7 +
  8 +#include "core/common.h"
  9 +#include "core/opencvutils.h"
  10 +#include "core/qtutils.h"
  11 +
  12 +#include "opencv/highgui.h"
  13 +
  14 +#include <iostream>
  15 +
  16 +using namespace cv;
  17 +
  18 +namespace br
  19 +{
  20 +
  21 +class FrameData
  22 +{
  23 +public:
  24 + int sequenceNumber;
  25 + TemplateList data;
  26 +};
  27 +
  28 +// A buffer shared between adjacent processing stages in a stream
  29 +class SharedBuffer
  30 +{
  31 +public:
  32 + SharedBuffer(int _maxItems = 200) : maxItems(_maxItems) {}
  33 + virtual ~SharedBuffer() {}
  34 +
  35 + virtual void addItem(FrameData * input)=0;
  36 +
  37 + virtual FrameData * getItem()=0;
  38 +
  39 + int getMaxItems() { return maxItems; }
  40 +
  41 + virtual void stoppedInput() =0;
  42 + virtual void startInput() = 0;
  43 +protected:
  44 + int maxItems;
  45 +};
  46 +
  47 +// For 1 - 1 boundaries, a buffer class with a single shared buffer, a mutex
  48 +// is used to serialize all access to the buffer.
  49 +class SingleBuffer : public SharedBuffer
  50 +{
  51 +public:
  52 + SingleBuffer(unsigned _maxItems = 20) : SharedBuffer(_maxItems) { no_input = false; }
  53 +
  54 + void stoppedInput()
  55 + {
  56 + QMutexLocker bufferLock(&bufferGuard);
  57 + no_input = true;
  58 + // Release anything waiting for input items.
  59 + availableInput.wakeAll();
  60 + }
  61 +
  62 + // There will be more input
  63 + void startInput()
  64 + {
  65 + QMutexLocker bufferLock(&bufferGuard);
  66 + no_input = false;
  67 + }
  68 +
  69 + void addItem(FrameData * input)
  70 + {
  71 + QMutexLocker bufferLock(&bufferGuard);
  72 +
  73 + // If the buffer is too full, wait for space to become available
  74 + if (buffer.size() >= maxItems) {
  75 + availableOutputSpace.wait(&bufferGuard);
  76 + }
  77 +
  78 + buffer.append(input);
  79 +
  80 + // Wait for certain # of items?
  81 + availableInput.wakeOne();
  82 + }
  83 +
  84 + FrameData * getItem()
  85 + {
  86 + QMutexLocker bufferLock(&bufferGuard);
  87 +
  88 + if (buffer.empty()) {
  89 + // If no further items will come we are done here
  90 + if (no_input)
  91 + return NULL;
  92 + // Wait for an item
  93 + availableInput.wait(&bufferGuard);
  94 + }
  95 +
  96 + // availableInput was signalled, but the buffer is still empty? We're done here.
  97 + if (buffer.empty())
  98 + return NULL;
  99 +
  100 + FrameData * output = buffer.first();
  101 + buffer.removeFirst();
  102 + if (buffer.size() < maxItems / 2)
  103 + availableOutputSpace.wakeAll();
  104 + return output;
  105 + }
  106 +
  107 +private:
  108 + QMutex bufferGuard;
  109 + QWaitCondition availableInput;
  110 + QWaitCondition availableOutputSpace;
  111 + bool no_input;
  112 +
  113 + QList<FrameData *> buffer;
  114 +};
  115 +
  116 +// Interface for sequentially getting data from some data source.
  117 +// Initialized off of a template, can represent a video file (stored in the template's filename)
  118 +// or a set of images already loaded into memory stored as multiple matrices in an input template.
  119 +class DataSource
  120 +{
  121 +public:
  122 + DataSource() {}
  123 + virtual ~DataSource() {}
  124 +
  125 + virtual FrameData * getNext() = 0;
  126 + virtual void close() = 0;
  127 + virtual bool open(Template & input) = 0;
  128 + virtual bool isOpen() = 0;
  129 +};
  130 +
  131 +// Read a video frame by frame using cv::VideoCapture
  132 +class VideoDataSource : public DataSource
  133 +{
  134 +public:
  135 + VideoDataSource() {}
  136 +
  137 + FrameData * getNext()
  138 + {
  139 + if (!isOpen())
  140 + return NULL;
  141 +
  142 + FrameData * output = new FrameData();
  143 + output->data.append(Template(basis.file));
  144 + output->data.last().append(cv::Mat());
  145 +
  146 + output->sequenceNumber = next_idx;
  147 + next_idx++;
  148 +
  149 + bool res = video.read(output->data.last().last());
  150 + if (!res) {
  151 + delete output;
  152 + return NULL;
  153 + }
  154 + return output;
  155 + }
  156 +
  157 + bool open(Template &input)
  158 + {
  159 + next_idx = 0;
  160 + basis = input;
  161 + video.open(input.file.name.toStdString());
  162 + return video.isOpened();
  163 + }
  164 +
  165 + bool isOpen() { return video.isOpened(); }
  166 +
  167 + void close() { video.release(); }
  168 +
  169 +private:
  170 + cv::VideoCapture video;
  171 + Template basis;
  172 + int next_idx;
  173 +};
  174 +
  175 +// Given a template as input, return its matrices one by one on subsequent calls
  176 +// to getNext
  177 +class TemplateDataSource : public DataSource
  178 +{
  179 +public:
  180 + TemplateDataSource() { current_idx = INT_MAX; }
  181 +
  182 + FrameData * getNext()
  183 + {
  184 + if (!isOpen())
  185 + return NULL;
  186 +
  187 + FrameData * output = new FrameData();
  188 + output->data.append(basis[current_idx]);
  189 + current_idx++;
  190 +
  191 + output->sequenceNumber = next_sequence;
  192 + next_sequence++;
  193 +
  194 + return output;
  195 + }
  196 +
  197 + bool open(Template &input)
  198 + {
  199 + basis = input;
  200 + current_idx = 0;
  201 + next_sequence = 0;
  202 + return isOpen();
  203 + }
  204 +
  205 + bool isOpen() { return current_idx < basis.size() ; }
  206 +
  207 + void close()
  208 + {
  209 + current_idx = INT_MAX;
  210 + basis.clear();
  211 + }
  212 +
  213 +private:
  214 + Template basis;
  215 + int current_idx;
  216 + int next_sequence;
  217 +};
  218 +
  219 +// Given a template as input, create a VideoDataSource or a TemplateDataSource
  220 +// depending on whether or not it looks like the input template has already
  221 +// loaded frames into memory.
  222 +class DataSourceManager : public DataSource
  223 +{
  224 +public:
  225 + DataSourceManager()
  226 + {
  227 + actualSource = NULL;
  228 + }
  229 +
  230 + ~DataSourceManager()
  231 + {
  232 + close();
  233 + }
  234 +
  235 + FrameData * getNext()
  236 + {
  237 + if (!isOpen()) return NULL;
  238 + return actualSource->getNext();
  239 + }
  240 +
  241 + void close()
  242 + {
  243 + if (actualSource) {
  244 + actualSource->close();
  245 + delete actualSource;
  246 + actualSource = NULL;
  247 + }
  248 + }
  249 +
  250 + bool open(Template & input)
  251 + {
  252 + close();
  253 + bool open_res = false;
  254 + // Input has no matrices? Its probably a video that hasn't been loaded yet
  255 + if (input.empty()) {
  256 + actualSource = new VideoDataSource();
  257 + open_res = actualSource->open(input);
  258 + qDebug("created video resource status %d", open_res);
  259 + }
  260 + else {
  261 + // create frame dealer
  262 + actualSource = new TemplateDataSource();
  263 + open_res = actualSource->open(input);
  264 + }
  265 + if (!isOpen()) {
  266 + delete actualSource;
  267 + actualSource = NULL;
  268 + return false;
  269 + }
  270 + return true;
  271 + }
  272 +
  273 + bool isOpen() { return !actualSource ? false : actualSource->isOpen(); }
  274 +
  275 +protected:
  276 + DataSource * actualSource;
  277 +};
  278 +
  279 +class ProcessingStage : public QRunnable
  280 +{
  281 + friend class StreamTransform;
  282 +public:
  283 + ProcessingStage()
  284 + {
  285 + setAutoDelete(false);
  286 + }
  287 +
  288 + void markStart()
  289 + {
  290 + QMutexLocker lock(&stoppedGuard);
  291 + stopped = false;
  292 + }
  293 +
  294 + void waitStop()
  295 + {
  296 + stoppedGuard.lock();
  297 + while (!stopped)
  298 + {
  299 + waitStopped.wait(&stoppedGuard);
  300 + }
  301 + stoppedGuard.unlock();
  302 + }
  303 +
  304 +protected:
  305 + void markStop()
  306 + {
  307 + QMutexLocker lock(&stoppedGuard);
  308 + stopped = true;
  309 + this->waitStopped.wakeAll();
  310 + }
  311 + QMutex stoppedGuard;
  312 + QWaitCondition waitStopped;
  313 + bool stopped;
  314 +
  315 + SharedBuffer * inputBuffer;
  316 + SharedBuffer * outputBuffer;
  317 + Transform * transform;
  318 + int stage_id;
  319 +
  320 +public:
  321 + // We should start, and enter a wait on input data
  322 + void run()
  323 + {
  324 + forever
  325 + {
  326 + FrameData * currentItem = inputBuffer->getItem();
  327 + if (currentItem == NULL)
  328 + break;
  329 + // Project the input we got
  330 + transform->projectUpdate(currentItem->data);
  331 + // Add the result to the ouptut buffer
  332 + outputBuffer->addItem(currentItem);
  333 + }
  334 + markStop();
  335 + }
  336 +
  337 +};
  338 +
  339 +// No input buffer, instead we draw templates from some data source
  340 +// Will be operated by the main thread for the stream
  341 +class FirstStage : public ProcessingStage
  342 +{
  343 +public:
  344 + DataSourceManager dataSource;
  345 + // Start drawing frames from the datasource.
  346 + void run()
  347 + {
  348 + forever
  349 + {
  350 + FrameData * aFrame = dataSource.getNext();
  351 + if (aFrame == NULL)
  352 + break;
  353 + outputBuffer->addItem(aFrame);
  354 + }
  355 + this->markStop();
  356 + }
  357 +};
  358 +
  359 +class LastStage : public ProcessingStage
  360 +{
  361 +public:
  362 + TemplateList getOutput()
  363 + {
  364 + return collectedOutput;
  365 + }
  366 +
  367 +private:
  368 + TemplateList collectedOutput;
  369 +public:
  370 + void run()
  371 + {
  372 + forever
  373 + {
  374 + // Wait for input
  375 + FrameData * frame = inputBuffer->getItem();
  376 + if (frame == NULL)
  377 + break;
  378 + // Just put the item on collectedOutput
  379 + collectedOutput.append(frame->data);
  380 + delete frame;
  381 + }
  382 + this->markStop();
  383 + }
  384 +};
  385 +
  386 +class StreamTransform : public CompositeTransform
  387 +{
  388 + Q_OBJECT
  389 +public:
  390 + void train(const TemplateList & data)
  391 + {
  392 + foreach(Transform * transform, transforms) {
  393 + transform->train(data);
  394 + }
  395 + }
  396 +
  397 + bool timeVarying() const { return true; }
  398 +
  399 + void project(const Template &src, Template &dst) const
  400 + {
  401 + (void) src; (void) dst;
  402 + qFatal("nope");
  403 + }
  404 + void project(const TemplateList & src, TemplateList & dst) const
  405 + {
  406 + (void) src; (void) dst;
  407 + qFatal("nope");
  408 + }
  409 +
  410 + void projectUpdate(const Template &src, Template &dst)
  411 + {
  412 + (void) src; (void) dst;
  413 + qFatal("whatever");
  414 + }
  415 +
  416 + // start processing
  417 + void projectUpdate(const TemplateList & src, TemplateList & dst)
  418 + {
  419 + if (src.size() != 1)
  420 + qFatal("Expected single template input to stream");
  421 +
  422 + dst = src;
  423 + bool res = readStage.dataSource.open(dst[0]);
  424 + if (!res) {
  425 + qWarning("failed to stream template %s", qPrintable(dst[0].file.name));
  426 + return;
  427 + }
  428 +
  429 + // Tell all buffers to expect input
  430 + for (int i=0; i < sharedBuffers.size(); i++) {
  431 + sharedBuffers[i]->startInput();
  432 + }
  433 +
  434 + // Start our processing stages
  435 + for (int i=0; i < this->processingStages.size(); i++) {
  436 + processingStages[i]->markStart();
  437 + processingThreads.start(processingStages[i]);
  438 + }
  439 +
  440 + // Start the final stage
  441 + collectionStage.markStart();
  442 + processingThreads.start(&collectionStage);
  443 +
  444 + // Run the read stage ourselves
  445 + readStage.run();
  446 +
  447 + // The read stage has stopped (since we ran the read stage).
  448 + // Step over the buffers, and call stoppedInput to tell the stage
  449 + // reading from each buffer that no more frames will be added after
  450 + // the current ones run out, then wait for the thread to finish.
  451 + for (int i =0; i < (sharedBuffers.size() - 1); i++) {
  452 + // Indicate that no more input will be available
  453 + sharedBuffers[i]->stoppedInput();
  454 +
  455 + // Wait for the thread to finish.
  456 + this->processingStages[i]->waitStop();
  457 + }
  458 + // Wait for the collection stage to finish
  459 + sharedBuffers.last()->stoppedInput();
  460 + collectionStage.waitStop();
  461 +
  462 + // dst is set to all output received by the final stage
  463 + dst = collectionStage.getOutput();
  464 + }
  465 +
  466 + virtual void finalize(TemplateList & output)
  467 + {
  468 + (void) output;
  469 + // Not handling this yet -cao
  470 + }
  471 +
  472 + // Create and link stages
  473 + void init()
  474 + {
  475 + // Set up the thread pool, 1 stage for each transform, as well as first
  476 + // and last stages, but the first stage is operated by the thread that
  477 + // calls project so the pool only needs nTransforms+1 total.
  478 + processingThreads.setMaxThreadCount(transforms.size() + 1);
  479 +
  480 + stage_variance.reserve(transforms.size());
  481 + foreach (const br::Transform *transform, transforms) {
  482 + stage_variance.append(transform->timeVarying());
  483 + }
  484 +
  485 + // buffer 0 -- output buffer for the read stage
  486 + sharedBuffers.append(new SingleBuffer());
  487 + readStage.outputBuffer = sharedBuffers.last();
  488 + readStage.stage_id = 0;
  489 +
  490 + int next_stage_id = 1;
  491 +
  492 + int lastBufferIdx = 0;
  493 + for (int i =0; i < transforms.size(); i++)
  494 + {
  495 + // Set up this stage
  496 + processingStages.append(new ProcessingStage());
  497 +
  498 + processingStages.last()->stage_id = next_stage_id++;
  499 + processingStages.last()->inputBuffer = sharedBuffers[lastBufferIdx];
  500 + lastBufferIdx++;
  501 +
  502 + sharedBuffers.append(new SingleBuffer());
  503 + processingStages.last()->outputBuffer = sharedBuffers.last();
  504 + processingStages.last()->transform = transforms[i];
  505 + }
  506 +
  507 + collectionStage.inputBuffer = sharedBuffers.last();
  508 + collectionStage.stage_id = next_stage_id;
  509 + }
  510 +
  511 + ~StreamTransform()
  512 + {
  513 + for (int i = 0; i < processingStages.size(); i++) {
  514 + delete processingStages[i];
  515 + }
  516 + for (int i = 0; i < sharedBuffers.size(); i++) {
  517 + delete sharedBuffers[i];
  518 + }
  519 +
  520 + }
  521 +
  522 +protected:
  523 + QList<bool> stage_variance;
  524 +
  525 + FirstStage readStage;
  526 + LastStage collectionStage;
  527 +
  528 + QList<ProcessingStage *> processingStages;
  529 + QList<SharedBuffer *> sharedBuffers;
  530 +
  531 + QThreadPool processingThreads;
  532 +
  533 + void _project(const Template &src, Template &dst) const
  534 + {
  535 + (void) src; (void) dst;
  536 + qFatal("nope");
  537 + }
  538 + void _project(const TemplateList & src, TemplateList & dst) const
  539 + {
  540 + (void) src; (void) dst;
  541 + qFatal("nope");
  542 + }
  543 +};
  544 +
  545 +BR_REGISTER(Transform, StreamTransform)
  546 +
  547 +
  548 +} // namespace br
  549 +
  550 +#include "stream.moc"
  551 +