Commit d9adaaadcab3a608aa87c9a41f882722dc40a83b

Authored by Charles Otto
1 parent b52a58b6

Move to a non-blocking model for StreamTransform

Rather than having threads at each stage block, and wait for input, use the
global thread pool to only execute stages when they have input.
Showing 1 changed file with 247 additions and 271 deletions
openbr/plugins/stream.cpp
... ... @@ -3,7 +3,7 @@
3 3 #include <QThreadPool>
4 4 #include <QSemaphore>
5 5 #include <QMap>
6   -#include <opencv/highgui.h>
  6 +#include <QtConcurrent>
7 7 #include <openbr/openbr_plugin.h>
8 8  
9 9 #include "openbr/core/common.h"
... ... @@ -33,70 +33,7 @@ public:
33 33  
34 34 virtual void addItem(FrameData * input)=0;
35 35  
36   - virtual FrameData * getItem()=0;
37   -
38   - virtual void stoppedInput() =0;
39   - virtual void startInput() = 0;
40   -};
41   -
42   -// For 1 - n boundaries, a buffer class with a single shared buffer, a mutex
43   -// is used to serialize all access to the buffer.
44   -class SingleBuffer : public SharedBuffer
45   -{
46   -public:
47   - SingleBuffer() { no_input = false; }
48   -
49   - void stoppedInput()
50   - {
51   - QMutexLocker bufferLock(&bufferGuard);
52   - no_input = true;
53   - // Release anything waiting for input items.
54   - availableInput.wakeAll();
55   - }
56   -
57   - // There will be more input
58   - void startInput()
59   - {
60   - QMutexLocker bufferLock(&bufferGuard);
61   - no_input = false;
62   - }
63   -
64   - void addItem(FrameData * input)
65   - {
66   - QMutexLocker bufferLock(&bufferGuard);
67   -
68   - buffer.append(input);
69   -
70   - availableInput.wakeOne();
71   - }
72   -
73   - FrameData * getItem()
74   - {
75   - QMutexLocker bufferLock(&bufferGuard);
76   -
77   - if (buffer.empty()) {
78   - // If no further items will come we are done here
79   - if (no_input)
80   - return NULL;
81   - // Wait for an item
82   - availableInput.wait(&bufferGuard);
83   - }
84   -
85   - // availableInput was signalled, but the buffer is still empty? We're done here.
86   - if (buffer.empty())
87   - return NULL;
88   -
89   - FrameData * output = buffer.first();
90   - buffer.removeFirst();
91   - return output;
92   - }
93   -
94   -private:
95   - QMutex bufferGuard;
96   - QWaitCondition availableInput;
97   - bool no_input;
98   -
99   - QList<FrameData *> buffer;
  36 + virtual FrameData * tryGetItem()=0;
100 37 };
101 38  
102 39 // for n - 1 boundaries, multiple threads call addItem, the frames are
... ... @@ -107,56 +44,26 @@ class SequencingBuffer : public SharedBuffer
107 44 public:
108 45 SequencingBuffer()
109 46 {
110   - no_input = false;
111 47 next_target = 0;
112 48 }
113 49  
114   - void stoppedInput()
115   - {
116   - QMutexLocker bufferLock(&bufferGuard);
117   - no_input = true;
118   - // Release anything waiting for input items.
119   - availableInput.wakeAll();
120   - }
121   -
122   - // There will be more input
123   - void startInput()
124   - {
125   - QMutexLocker bufferLock(&bufferGuard);
126   - no_input = false;
127   - }
128   -
129 50 void addItem(FrameData * input)
130 51 {
131 52 QMutexLocker bufferLock(&bufferGuard);
132 53  
133 54 buffer.insert(input->sequenceNumber, input);
134   -
135   - if (input->sequenceNumber == next_target) {
136   - availableInput.wakeOne();
137   - }
138 55 }
139 56  
140   - FrameData * getItem()
  57 + FrameData * tryGetItem()
141 58 {
142 59 QMutexLocker bufferLock(&bufferGuard);
143 60  
144 61 if (buffer.empty() || buffer.begin().key() != this->next_target) {
145   - if (buffer.empty() && no_input) {
146   - next_target = 0;
147   - return NULL;
148   - }
149   - availableInput.wait(&bufferGuard);
150   - }
151   -
152   - // availableInput was signalled, but the buffer is empty? We're done here.
153   - if (buffer.empty()) {
154   - next_target = 0;
155 62 return NULL;
156 63 }
157 64  
158 65 QMap<int, FrameData *>::Iterator result = buffer.begin();
159   - //next_target++;
  66 +
160 67 if (next_target != result.value()->sequenceNumber) {
161 68 qWarning("mismatched targets!");
162 69 }
... ... @@ -170,9 +77,6 @@ public:
170 77  
171 78 private:
172 79 QMutex bufferGuard;
173   - QWaitCondition availableInput;
174   - bool no_input;
175   -
176 80 int next_target;
177 81  
178 82 QMap<int, FrameData *> buffer;
... ... @@ -192,31 +96,16 @@ public:
192 96 outputBuffer = &buffer2;
193 97 }
194 98  
195   - void stoppedInput()
196   - {
197   - QWriteLocker bufferLock(&bufferGuard);
198   - no_input = true;
199   - // Release anything waiting for input items.
200   - availableInput.wakeAll();
201   - }
202   -
203   - // There will be more input
204   - void startInput()
205   - {
206   - QWriteLocker bufferLock(&bufferGuard);
207   - no_input = false;
208   - }
209 99  
210 100 // called from the producer thread
211 101 void addItem(FrameData * input)
212 102 {
213 103 QReadLocker readLock(&bufferGuard);
214 104 inputBuffer->append(input);
215   - availableInput.wakeOne();
216 105 }
217 106  
218   - // Called from the consumer thread
219   - FrameData * getItem() {
  107 + FrameData * tryGetItem()
  108 + {
220 109 QReadLocker readLock(&bufferGuard);
221 110  
222 111 // There is something for us to get
... ... @@ -233,15 +122,7 @@ public:
233 122  
234 123 // Nothing on the input buffer either?
235 124 if (inputBuffer->empty()) {
236   - // If nothing else is coming, return null
237   - if (no_input)
238   - return NULL;
239   - //otherwise, wait on the input buffer
240   - availableInput.wait(&bufferGuard);
241   - // Did we get woken up because no more input is coming? if so
242   - // we're done here
243   - if (no_input && inputBuffer->empty())
244   - return NULL;
  125 + return NULL;
245 126 }
246 127  
247 128 // input buffer is non-empty, so swap the buffers
... ... @@ -259,10 +140,7 @@ private:
259 140 // removing from this buffer can remove things from the current
260 141 // output buffer if it has a read lock, or swap the buffers if it
261 142 // has a write lock.
262   - // Checking/modifying no_input requires a write lock.
263 143 QReadWriteLock bufferGuard;
264   - QWaitCondition availableInput;
265   - bool no_input;
266 144  
267 145 // The buffer that is currently being added to
268 146 QList<FrameData *> * inputBuffer;
... ... @@ -281,44 +159,71 @@ private:
281 159 class DataSource
282 160 {
283 161 public:
284   - DataSource(int maxFrames=100)
  162 + DataSource(int maxFrames=Globals->parallelism + 1)
285 163 {
  164 + final_frame = -1;
  165 + last_issued = -2;
286 166 for (int i=0; i < maxFrames;i++)
287 167 {
288 168 allFrames.addItem(new FrameData());
289 169 }
290   - allFrames.startInput();
291 170 }
292 171  
293 172 virtual ~DataSource()
294 173 {
295   - allFrames.stoppedInput();
296 174 while (true)
297 175 {
298   - FrameData * frame = allFrames.getItem();
  176 + FrameData * frame = allFrames.tryGetItem();
299 177 if (frame == NULL)
300 178 break;
301 179 delete frame;
302 180 }
303 181 }
304 182  
305   - FrameData * getFrame()
  183 + // non-blocking version of getFrame
  184 + FrameData * tryGetFrame()
306 185 {
307   - FrameData * aFrame = allFrames.getItem();
  186 + FrameData * aFrame = allFrames.tryGetItem();
  187 + if (aFrame == NULL)
  188 + return NULL;
  189 +
308 190 aFrame->data.clear();
309 191 aFrame->sequenceNumber = -1;
310 192  
311 193 bool res = getNext(*aFrame);
312 194 if (!res) {
313 195 allFrames.addItem(aFrame);
  196 + // Datasource broke?
  197 + QMutexLocker lock(&last_frame_update);
  198 +
  199 + final_frame = last_issued;
  200 + if (final_frame == last_received)
  201 + lastReturned.wakeAll();
  202 + else if (final_frame < last_received)
  203 + std::cout << "Bad last frame " << final_frame << " but received " << last_received << std::endl;
314 204 return NULL;
315 205 }
  206 + last_issued = aFrame->sequenceNumber;
316 207 return aFrame;
317 208 }
318 209  
319   - void returnFrame(FrameData * inputFrame)
  210 + bool returnFrame(FrameData * inputFrame)
320 211 {
321 212 allFrames.addItem(inputFrame);
  213 +
  214 + QMutexLocker lock(&last_frame_update);
  215 + last_received = inputFrame->sequenceNumber;
  216 + if (inputFrame->sequenceNumber == final_frame) {
  217 + lastReturned.wakeAll();
  218 + }
  219 +
  220 + return this->final_frame != -1;
  221 + }
  222 +
  223 + void waitLast()
  224 + {
  225 + QMutexLocker lock(&last_frame_update);
  226 + lastReturned.wait(&last_frame_update);
322 227 }
323 228  
324 229 virtual void close() = 0;
... ... @@ -329,6 +234,12 @@ public:
329 234  
330 235 protected:
331 236 DoubleBuffer allFrames;
  237 + int final_frame;
  238 + int last_issued;
  239 + int last_received;
  240 +
  241 + QWaitCondition lastReturned;
  242 + QMutex last_frame_update;
332 243 };
333 244  
334 245 // Read a video frame by frame using cv::VideoCapture
... ... @@ -339,6 +250,9 @@ public:
339 250  
340 251 bool open(Template &input)
341 252 {
  253 + final_frame = -1;
  254 + last_issued = -2;
  255 +
342 256 next_idx = 0;
343 257 basis = input;
344 258 video.open(input.file.name.toStdString());
... ... @@ -365,6 +279,7 @@ private:
365 279 if (!res) {
366 280 return false;
367 281 }
  282 + output.data.last().file.set("FrameNumber", output.sequenceNumber);
368 283 return true;
369 284 }
370 285  
... ... @@ -388,6 +303,9 @@ public:
388 303 basis = input;
389 304 current_idx = 0;
390 305 next_sequence = 0;
  306 + final_frame = -1;
  307 + last_issued = -2;
  308 +
391 309 return isOpen();
392 310 }
393 311  
... ... @@ -448,6 +366,9 @@ public:
448 366 {
449 367 close();
450 368 bool open_res = false;
  369 + final_frame = -1;
  370 + last_issued = -2;
  371 +
451 372 // Input has no matrices? Its probably a video that hasn't been loaded yet
452 373 if (input.empty()) {
453 374 actualSource = new VideoDataSource(0);
... ... @@ -479,84 +400,194 @@ protected:
479 400  
480 401 class ProcessingStage : public QRunnable
481 402 {
  403 +public:
482 404 friend class StreamTransform;
483 405 public:
484 406 ProcessingStage(int nThreads = 1)
485 407 {
486 408 thread_count = nThreads;
487   - activeThreads.release(thread_count);
488 409 setAutoDelete(false);
489 410 }
490 411  
491   - void markStart()
492   - {
493   - activeThreads.acquire();
494   - }
  412 + virtual void run()=0;
495 413  
496   - void waitStop()
497   - {
498   - // Wait until all threads have stopped
499   - activeThreads.acquire(thread_count);
500   - activeThreads.release(thread_count);
501   - }
  414 + virtual void nextStageRun(FrameData * input)=0;
502 415  
503 416 protected:
504   - void markStop()
505   - {
506   - activeThreads.release();
507   - }
508   - QSemaphore activeThreads;
509 417 int thread_count;
510 418  
511 419 SharedBuffer * inputBuffer;
512   - SharedBuffer * outputBuffer;
  420 + ProcessingStage * nextStage;
513 421 Transform * transform;
514 422 int stage_id;
515 423  
  424 +};
  425 +
  426 +class MultiThreadStage;
  427 +
  428 +void multistage_run(MultiThreadStage * basis, FrameData * input);
  429 +
  430 +class MultiThreadStage : public ProcessingStage
  431 +{
  432 +public:
  433 + MultiThreadStage(int _input) : ProcessingStage(_input) {}
  434 +
  435 + friend void multistage_run(MultiThreadStage * basis, FrameData * input);
  436 +
  437 + void run()
  438 + {
  439 + qFatal("no don't do it!");
  440 + }
  441 +
  442 + // Called from a different thread than run
  443 + virtual void nextStageRun(FrameData * input)
  444 + {
  445 + QtConcurrent::run(multistage_run, this, input);
  446 + }
  447 +};
  448 +
  449 +void multistage_run(MultiThreadStage * basis, FrameData * input)
  450 +{
  451 + if (input == NULL)
  452 + qFatal("null input to multi-thread stage");
  453 + // Project the input we got
  454 + basis->transform->projectUpdate(input->data);
  455 +
  456 + basis->nextStage->nextStageRun(input);
  457 +}
  458 +
  459 +class SingleThreadStage : public ProcessingStage
  460 +{
516 461 public:
  462 + SingleThreadStage(bool input_variance) : ProcessingStage(1)
  463 + {
  464 + currentStatus = STOPPING;
  465 + next_target = 0;
  466 + if (input_variance)
  467 + this->inputBuffer = new DoubleBuffer();
  468 + else
  469 + this->inputBuffer = new SequencingBuffer();
  470 + }
  471 + ~SingleThreadStage()
  472 + {
  473 + delete inputBuffer;
  474 + }
  475 +
  476 + int next_target;
  477 + enum Status
  478 + {
  479 + STARTING,
  480 + STOPPING
  481 + };
  482 + QReadWriteLock statusLock;
  483 + Status currentStatus;
  484 +
517 485 // We should start, and enter a wait on input data
518 486 void run()
519 487 {
520   - markStart();
  488 + FrameData * currentItem;
521 489 forever
522 490 {
523   - FrameData * currentItem = inputBuffer->getItem();
  491 + // Whether or not we get a valid item controls whether or not we
  492 + QWriteLocker lock(&statusLock);
  493 + currentItem = inputBuffer->tryGetItem();
524 494 if (currentItem == NULL)
525   - break;
  495 + {
  496 + this->currentStatus = STOPPING;
  497 + return;
  498 + }
  499 + lock.unlock();
  500 + if (currentItem->sequenceNumber != next_target)
  501 + {
  502 + qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target);
  503 + }
  504 + next_target = currentItem->sequenceNumber + 1;
  505 +
526 506 // Project the input we got
527 507 transform->projectUpdate(currentItem->data);
528   - // Add the result to the ouptut buffer
529   - outputBuffer->addItem(currentItem);
  508 +
  509 + this->nextStage->nextStageRun(currentItem);
530 510 }
531   - markStop();
532 511 }
533   -};
534 512  
  513 + // Calledfrom a different thread than run.
  514 + void nextStageRun(FrameData * input)
  515 + {
  516 + // add to our input buffer
  517 + inputBuffer->addItem(input);
  518 + QReadLocker lock(&statusLock);
  519 + if (currentStatus == STARTING)
  520 + return;
  521 +
  522 + // Have to change to a write lock to modify currentStatus
  523 + lock.unlock();
  524 + QWriteLocker writeLock(&statusLock);
  525 + // But someone might have changed it between locks
  526 + if (currentStatus == STARTING)
  527 + return;
  528 + // Ok we can start a thread
  529 + QThreadPool::globalInstance()->start(this);
  530 + currentStatus = STARTING;
  531 + }
  532 +};
535 533  
536 534 // No input buffer, instead we draw templates from some data source
537 535 // Will be operated by the main thread for the stream
538   -class FirstStage : public ProcessingStage
  536 +class FirstStage : public SingleThreadStage
539 537 {
540 538 public:
  539 + FirstStage() : SingleThreadStage(true) {}
  540 +
541 541 DataSourceManager dataSource;
542 542 // Start drawing frames from the datasource.
543 543 void run()
544 544 {
  545 + FrameData * currentItem;
545 546 forever
546 547 {
547   - //FrameData * aFrame = dataSource.getNext();
548   - FrameData * aFrame = dataSource.getFrame();
549   - if (aFrame == NULL)
550   - break;
551   - outputBuffer->addItem(aFrame);
  548 + // Whether or not we get a valid item controls whether or not we
  549 + QWriteLocker lock(&statusLock);
  550 +
  551 + currentItem = this->dataSource.tryGetFrame();
  552 + if (currentItem == NULL)
  553 + {
  554 + this->currentStatus = STOPPING;
  555 + return;
  556 + }
  557 + lock.unlock();
  558 + if (currentItem->sequenceNumber != next_target)
  559 + {
  560 + qFatal("out of order frames for stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target);
  561 + }
  562 + next_target = currentItem->sequenceNumber + 1;
  563 +
  564 + this->nextStage->nextStageRun(currentItem);
552 565 }
553   - this->markStop();
554 566 }
  567 +
  568 + void nextStageRun(FrameData * input)
  569 + {
  570 + QWriteLocker lock(&statusLock);
  571 +
  572 + // Return the frame to the frame buffer
  573 + bool res = dataSource.returnFrame(input);
  574 + // If the data source broke already, we're done.
  575 + if (res)
  576 + return;
  577 +
  578 + if (currentStatus == STARTING)
  579 + return;
  580 +
  581 + currentStatus = STARTING;
  582 + QThreadPool::globalInstance()->start(this, this->next_target);
  583 + }
  584 +
555 585 };
556 586  
557   -class LastStage : public ProcessingStage
  587 +class LastStage : public SingleThreadStage
558 588 {
559 589 public:
  590 + LastStage(bool _prev_stage_variance) : SingleThreadStage(_prev_stage_variance) {}
560 591 TemplateList getOutput()
561 592 {
562 593 return collectedOutput;
... ... @@ -565,28 +596,35 @@ public:
565 596 private:
566 597 TemplateList collectedOutput;
567 598 public:
568   - DataSource * data;
569 599 void run()
570 600 {
571 601 forever
572 602 {
573   - // Wait for input
574   - FrameData * frame = inputBuffer->getItem();
575   - if (frame == NULL)
  603 + QWriteLocker lock(&statusLock);
  604 + FrameData * currentItem = inputBuffer->tryGetItem();
  605 + if (currentItem == NULL)
  606 + {
  607 + currentStatus = STOPPING;
576 608 break;
  609 + }
  610 + lock.unlock();
  611 +
  612 + if (currentItem->sequenceNumber != next_target)
  613 + {
  614 + qFatal("out of order frames for collection stage %d, got %d expected %d", this->stage_id, currentItem->sequenceNumber, this->next_target);
  615 + }
  616 + next_target = currentItem->sequenceNumber + 1;
  617 +
577 618 // Just put the item on collectedOutput
578   - collectedOutput.append(frame->data);
579   - // Return the frame to the input frame buffer
580   - data->returnFrame(frame);
  619 + collectedOutput.append(currentItem->data);
  620 + this->nextStage->nextStageRun(currentItem);
581 621 }
582   - this->markStop();
583 622 }
584 623 };
585 624  
586 625 class StreamTransform : public CompositeTransform
587 626 {
588 627 Q_OBJECT
589   - int threads_per_multi_stage;
590 628 public:
591 629 void train(const TemplateList & data)
592 630 {
... ... @@ -627,40 +665,15 @@ public:
627 665 return;
628 666 }
629 667  
630   - // Tell all buffers to expect input
631   - for (int i=0; i < sharedBuffers.size(); i++) {
632   - sharedBuffers[i]->startInput();
633   - }
634   -
635   - // Start our processing stages
636   - for (int i=0; i < this->processingStages.size(); i++) {
637   - int count = stage_variance[i] ? 1 : threads_per_multi_stage;
638   - for (int j =0; j < count; j ++) processingThreads.start(processingStages[i]);
639   - }
640   -
641   - // Start the final stage
642   - processingThreads.start(&collectionStage);
643   -
644   - // Run the read stage ourselves
645   - readStage.run();
646   -
647   - // The read stage has stopped (since we ran the read stage).
648   - // Step over the buffers, and call stoppedInput to tell the stage
649   - // reading from each buffer that no more frames will be added after
650   - // the current ones run out, then wait for the thread to finish.
651   - for (int i =0; i < (sharedBuffers.size() - 1); i++) {
652   - // Indicate that no more input will be available
653   - sharedBuffers[i]->stoppedInput();
654   -
655   - // Wait for the thread to finish.
656   - this->processingStages[i]->waitStop();
657   - }
658   - // Wait for the collection stage to finish
659   - sharedBuffers.last()->stoppedInput();
660   - collectionStage.waitStop();
  668 + QThreadPool::globalInstance()->releaseThread();
  669 + readStage.currentStatus = SingleThreadStage::STARTING;
  670 + QThreadPool::globalInstance()->start(&readStage, 0);
  671 + // Wait for the end.
  672 + readStage.dataSource.waitLast();
  673 + QThreadPool::globalInstance()->reserveThread();
661 674  
662 675 // dst is set to all output received by the final stage
663   - dst = collectionStage.getOutput();
  676 + dst = collectionStage->getOutput();
664 677 }
665 678  
666 679 virtual void finalize(TemplateList & output)
... ... @@ -672,78 +685,47 @@ public:
672 685 // Create and link stages
673 686 void init()
674 687 {
675   - int thread_count = 0;
676   - threads_per_multi_stage = 4;
  688 + if (transforms.isEmpty()) return;
  689 +
677 690 stage_variance.reserve(transforms.size());
678 691 foreach (const br::Transform *transform, transforms) {
679 692 stage_variance.append(transform->timeVarying());
680   - thread_count += transform->timeVarying() ? 1 : threads_per_multi_stage;
681 693 }
682   - if (transforms.isEmpty()) return;
683   -
684   - // Set up the thread pool, 1 stage for each transform, as well as first
685   - // and last stages, but the first stage is operated by the thread that
686   - // calls project so the pool only needs nTransforms+1 total.
687   - processingThreads.setMaxThreadCount(thread_count + 1);
688   -
689   -
690   - // buffer 0 -- output buffer for the read stage, input buffer for
691   - // first transform. Is that transform time-varying?
692   - if (stage_variance[0])
693   - sharedBuffers.append(new DoubleBuffer());
694   - // If not, we can run multiple threads
695   - else
696   - sharedBuffers.append(new SingleBuffer());
697 694  
698   - readStage.outputBuffer = sharedBuffers.last();
699 695 readStage.stage_id = 0;
700 696  
701 697 int next_stage_id = 1;
702   -
703 698 int lastBufferIdx = 0;
  699 + bool prev_stage_variance = true;
704 700 for (int i =0; i < transforms.size(); i++)
705 701 {
706   - // Set up this stage
707   - processingStages.append(new ProcessingStage(stage_variance[i] ? 1 : threads_per_multi_stage));
  702 + if (stage_variance[i])
  703 + {
  704 + processingStages.append(new SingleThreadStage(prev_stage_variance));
  705 + }
  706 + else
  707 + processingStages.append(new MultiThreadStage(Globals->parallelism));
708 708  
709 709 processingStages.last()->stage_id = next_stage_id++;
710   - processingStages.last()->inputBuffer = sharedBuffers[lastBufferIdx];
711   - lastBufferIdx++;
712 710  
713   - // This stage's output buffer, next stage's input buffer. If this is
714   - // the last transform, the next stage is the (time varying) collection
715   - // stage
716   - bool next_variance = (i+1) < transforms.size() ? stage_variance[i+1] : true;
717   - bool current_variance = stage_variance[i];
718   - // if this is a single threaded stage
719   - if (current_variance)
720   - {
721   - // 1 - 1 case
722   - if (next_variance)
723   - sharedBuffers.append(new DoubleBuffer());
724   - // 1 - n case
725   - else
726   - sharedBuffers.append(new SingleBuffer());
727   - }
728   - // This is a multi-threaded stage
  711 + // link nextStage pointers
  712 + if (i == 0)
  713 + this->readStage.nextStage = processingStages[i];
729 714 else
730   - {
731   - // If the next stage is single threaded, we need to sequence our
732   - // output (n - 1 case)
733   - if (next_variance)
734   - sharedBuffers.append(new SequencingBuffer());
735   - // Otherwise, this is an n-n boundary and we don't need to
736   - // adhere to any particular sequence
737   - else
738   - sharedBuffers.append(new SingleBuffer());
739   - }
740   - processingStages.last()->outputBuffer = sharedBuffers.last();
  715 + processingStages[i-1]->nextStage = processingStages[i];
  716 +
  717 + lastBufferIdx++;
  718 +
741 719 processingStages.last()->transform = transforms[i];
  720 + prev_stage_variance = stage_variance[i];
742 721 }
743 722  
744   - collectionStage.inputBuffer = sharedBuffers.last();
745   - collectionStage.data = &readStage.dataSource;
746   - collectionStage.stage_id = next_stage_id;
  723 + collectionStage = new LastStage(prev_stage_variance);
  724 + collectionStage->stage_id = next_stage_id;
  725 +
  726 + // It's a ring buffer, get it?
  727 + processingStages.last()->nextStage = collectionStage;
  728 + collectionStage->nextStage = &readStage;
747 729 }
748 730  
749 731 ~StreamTransform()
... ... @@ -751,22 +733,16 @@ public:
751 733 for (int i = 0; i < processingStages.size(); i++) {
752 734 delete processingStages[i];
753 735 }
754   - for (int i = 0; i < sharedBuffers.size(); i++) {
755   - delete sharedBuffers[i];
756   - }
757   -
  736 + delete collectionStage;
758 737 }
759 738  
760 739 protected:
761 740 QList<bool> stage_variance;
762 741  
763 742 FirstStage readStage;
764   - LastStage collectionStage;
  743 + LastStage * collectionStage;
765 744  
766 745 QList<ProcessingStage *> processingStages;
767   - QList<SharedBuffer *> sharedBuffers;
768   -
769   - QThreadPool processingThreads;
770 746  
771 747 void _project(const Template &src, Template &dst) const
772 748 {
... ...