diff --git a/openbr/openbr_export.cpp b/openbr/openbr_export.cpp index 0eccdb5..dac8526 100644 --- a/openbr/openbr_export.cpp +++ b/openbr/openbr_export.cpp @@ -69,7 +69,6 @@ $ cd bin $ export LD_LIBRARY_PATH=../lib:${LD_LIBRARY_PATH} $ sudo ldconfig -$ sudo cp ../share/openbr/70-yubikey.rules /etc/udev/rules.d # Only needed if you were given a license dongle. \endverbatim * \par OS X \verbatim @@ -80,10 +79,6 @@ $ export DYLD_FRAMEWORK_PATH=../lib:${DYLD_FRAMEWORK_PATH} * \par Windows * No configuration is necessary! * - * \section installation_license_dongle License Dongle - * In the unlikely event that you were given a USB License Dongle, then dongle must be in the computer in order to use the SDK. - * No configuration of the dongle is needed. - * * \section installation_done Start Working * To test for successful installation: \verbatim diff --git a/openbr/plugins/stream.cpp b/openbr/plugins/stream.cpp index 195ee80..5944216 100644 --- a/openbr/plugins/stream.cpp +++ b/openbr/plugins/stream.cpp @@ -31,8 +31,10 @@ public: virtual ~SharedBuffer() {} virtual void addItem(FrameData * input)=0; + virtual void reset()=0; virtual FrameData * tryGetItem()=0; + virtual int size()=0; }; // for n - 1 boundaries, multiple threads call addItem, the frames are @@ -74,6 +76,21 @@ public: return output; } + virtual int size() + { + QMutexLocker lock(&bufferGuard); + return buffer.size(); + } + virtual void reset() + { + if (size() != 0) + qDebug("Sequencing buffer has non-zero size during reset!"); + + QMutexLocker lock(&bufferGuard); + next_target = 0; + } + + private: QMutex bufferGuard; int next_target; @@ -95,6 +112,11 @@ public: outputBuffer = &buffer2; } + int size() + { + QReadLocker readLock(&bufferGuard); + return inputBuffer->size() + outputBuffer->size(); + } // called from the producer thread void addItem(FrameData * input) @@ -133,6 +155,13 @@ public: return output; } + virtual void reset() + { + if (this->size() != 0) + qDebug("Shared buffer has non-zero size during reset!"); + } + + private: // The read-write lock. The thread adding to this buffer can add // to the current input buffer if it has a read lock. The thread @@ -194,14 +223,13 @@ public: // Try to get a FrameData from the pool, if we can't it means too many // frames are already out, and we will return NULL to indicate failure FrameData * aFrame = allFrames.tryGetItem(); - if (aFrame == NULL) { + if (aFrame == NULL) return NULL; - } aFrame->data.clear(); aFrame->sequenceNumber = -1; - // Try to read a frame, if this returns false the data source is broken + // Try to actually read a frame, if this returns false the data source is broken bool res = getNext(*aFrame); // The datasource broke, update final_frame @@ -211,12 +239,15 @@ public: final_frame = lookAhead.back()->sequenceNumber; allFrames.addItem(aFrame); } - else lookAhead.push_back(aFrame); + else { + lookAhead.push_back(aFrame); + } + // we will return the first frame on the lookAhead buffer FrameData * rVal = lookAhead.first(); lookAhead.pop_front(); - + // If this is the last frame, say so if (rVal->sequenceNumber == final_frame) { last_frame = true; is_broken = true; @@ -239,6 +270,7 @@ public: if (frameNumber == final_frame) { // We just received the last frame, better pulse + allReturned = true; lastReturned.wakeAll(); rval = true; } @@ -246,15 +278,24 @@ public: return rval; } - void waitLast() + bool waitLast() { QMutexLocker lock(&last_frame_update); - lastReturned.wait(&last_frame_update); + + while (!allReturned) + { + // This would be a safer wait if we used a timeout, but + // theoretically that should never matter. + lastReturned.wait(&last_frame_update); + } + return true; } bool open(Template & output, int start_index = 0) { is_broken = false; + allReturned = false; + // The last frame isn't initialized yet final_frame = -1; // Start our sequence numbers from the input index @@ -282,19 +323,34 @@ public: bool res = getNext(*firstFrame); // the data source broke already, we couldn't even get one frame - // from it. + // from it even though it claimed to have opened successfully. if (!res) { is_broken = true; return false; } + // We read one frame ahead of the last one returned, this allows + // us to know which frame is the final frame when we return it. lookAhead.append(firstFrame); return true; } + /* + * Pure virtual methods + */ + + // isOpen doesn't appear to particularly work when used on opencv + // VideoCaptures, so we don't use it for anything important. virtual bool isOpen()=0; + // Called from open, open the data source specified by the input + // template, don't worry about setting any of the state variables + // set in open. virtual bool concreteOpen(Template & output) = 0; + // Get the next frame from the data source, store the results in + // FrameData (including the actual frame and appropriate sequence + // number). virtual bool getNext(FrameData & input) = 0; + // close the currently open data source. virtual void close() = 0; int next_sequence_number; @@ -302,6 +358,7 @@ protected: DoubleBuffer allFrames; int final_frame; bool is_broken; + bool allReturned; QList lookAhead; QWaitCondition lastReturned; @@ -317,6 +374,11 @@ public: bool concreteOpen(Template &input) { basis = input; + + // We can open either files (well actually this includes addresses of ip cameras + // through ffmpeg), or webcams. Webcam VideoCaptures are created through a separate + // overload of open that takes an integer, not a string. + // So, does this look like an integer? bool is_int = false; int anInt = input.file.name.toInt(&is_int); if (is_int) @@ -349,8 +411,10 @@ public: private: bool getNext(FrameData & output) { - if (!isOpen()) + if (!isOpen()) { + qDebug("video source is not open"); return false; + } output.data.append(Template(basis.file)); output.data.last().m() = cv::Mat(); @@ -362,6 +426,7 @@ private: bool res = video.read(temp); if (!res) { + // The video capture broke, return false. output.data.last().m() = cv::Mat(); close(); return false; @@ -391,6 +456,8 @@ public: data_ok = false; } + // To "open" it we just set appropriate indices, we assume that if this + // is an image, it is already loaded into memory. bool concreteOpen(Template &input) { basis = input; @@ -440,7 +507,7 @@ private: class DataSourceManager : public DataSource { public: - DataSourceManager() : DataSource(500) + DataSourceManager(int activeFrames=100) : DataSource(activeFrames) { actualSource = NULL; } @@ -450,6 +517,11 @@ public: close(); } + int size() + { + return this->allFrames.size(); + } + void close() { if (actualSource) { @@ -459,29 +531,40 @@ public: } } + // We are used through a call to open(TemplateList) bool open(TemplateList & input) { + // Set up variables specific to us current_template_idx = 0; templates = input; + // Call datasourece::open on the first template to set up + // state variables return DataSource::open(templates[current_template_idx]); } + // Create an actual data source of appropriate type for this template + // (initially called via the call to DataSource::open, called later + // as we run out of frames on our templates). bool concreteOpen(Template & input) { close(); + bool open_res = false; // Input has no matrices? Its probably a video that hasn't been loaded yet if (input.empty()) { actualSource = new VideoDataSource(0); - actualSource->concreteOpen(input); + open_res = actualSource->concreteOpen(input); } + // If the input is not empty, we assume it is a set of frames already + // in memory. else { - // create frame dealer actualSource = new TemplateDataSource(0); - actualSource->concreteOpen(input); + open_res = actualSource->concreteOpen(input); } - if (!isOpen()) { + + // The data source failed to open + if (!open_res) { delete actualSource; actualSource = NULL; return false; @@ -497,12 +580,16 @@ protected: TemplateList templates; DataSource * actualSource; + // Get the next frame, if we run out of frames on the current template + // move on to the next one. bool getNext(FrameData & output) { bool res = actualSource->getNext(output); output.sequenceNumber = next_sequence_number; + // OK we got a frame if (res) { + // Override the sequence number set by actualSource output.data.last().file.set("FrameNumber", output.sequenceNumber); next_sequence_number++; if (output.data.last().last().empty()) @@ -510,7 +597,7 @@ protected: return true; } - + // We didn't get a frame, try to move on to the next template. while(!res) { output.data.clear(); current_template_idx++; @@ -521,12 +608,16 @@ protected: // open the next data source bool open_res = concreteOpen(templates[current_template_idx]); + // We couldn't open it, give up? We could maybe continue here + // but don't currently. if (!open_res) return false; - // get a frame from it + // get a frame from the newly opened data source, if that fails + // we continue to open the next one. res = actualSource->getNext(output); } + // Finally, set the sequence number for the frame we actually return. output.sequenceNumber = next_sequence_number++; output.data.last().file.set("FrameNumber", output.sequenceNumber); @@ -573,6 +664,9 @@ public: int stage_id; virtual void reset()=0; + + virtual void status()=0; + protected: int thread_count; @@ -606,7 +700,8 @@ class MultiThreadStage : public ProcessingStage public: MultiThreadStage(int _input) : ProcessingStage(_input) {} - + // Not much to worry about here, we will project the input + // and try to continue to the next stage. FrameData * run(FrameData * input, bool & should_continue) { if (input == NULL) { @@ -620,7 +715,8 @@ public: return input; } - // Called from a different thread than run + // Called from a different thread than run. Nothing to worry about + // we offer no restrictions on when loops may enter this stage. virtual bool tryAcquireNextStage(FrameData *& input) { (void) input; @@ -631,6 +727,9 @@ public: { // nothing to do. } + void status(){ + qDebug("multi thread stage %d, nothing to worry about", this->stage_id); + } }; class SingleThreadStage : public ProcessingStage @@ -640,13 +739,18 @@ public: { currentStatus = STOPPING; next_target = 0; + // If the previous stage is single-threaded, queued inputs + // are stored in a double buffer if (input_variance) { this->inputBuffer = new DoubleBuffer(); } + // If it's multi-threaded we need to put the inputs back in order + // before we can use them, so we use a sequencing buffer. else { this->inputBuffer = new SequencingBuffer(); } } + ~SingleThreadStage() { delete inputBuffer; @@ -657,6 +761,7 @@ public: QWriteLocker writeLock(&statusLock); currentStatus = STOPPING; next_target = 0; + inputBuffer->reset(); } @@ -706,7 +811,13 @@ public: next->stages = stages; next->start_idx = this->stage_id; next->startItem = newItem; - this->threads->start(next, stages->size() - stage_id); + + // We start threads with priority equal to their stage id + // This is intended to ensure progression, we do queued late stage + // jobs before queued early stage jobs, and so tend to finish frames + // rather than go stage by stage. In Qt 5.1, priorities are priorities + // so we use the stage_id directly. + this->threads->start(next, stage_id); } @@ -741,45 +852,50 @@ public: return true; } + + void status(){ + qDebug("single thread stage %d, status starting? %d, next %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->inputBuffer->size()); + } + }; -// No input buffer, instead we draw templates from some data source -// Will be operated by the main thread for the stream. starts threads +// This stage reads new frames from the data source. class FirstStage : public SingleThreadStage { public: - FirstStage() : SingleThreadStage(true) {} + FirstStage(int activeFrames = 100) : SingleThreadStage(true), dataSource(activeFrames){ } DataSourceManager dataSource; + void reset() + { + dataSource.close(); + SingleThreadStage::reset(); + } + FrameData * run(FrameData * input, bool & should_continue) { - // Try to get a frame from the datasource + if (input == NULL) + qFatal("NULL frame in input stage"); + + // Can we enter the next stage? + should_continue = nextStage->tryAcquireNextStage(input); + + // Try to get a frame from the datasource, we keep working on + // the frame we have, but we will queue another job for the next + // frame if a frame is currently available. QWriteLocker lock(&statusLock); bool last_frame = false; - input = dataSource.tryGetFrame(last_frame); + FrameData * newFrame = dataSource.tryGetFrame(last_frame); - // Datasource broke, or is currently out of frames? - if (!input || last_frame) - { - // We will just stop and not continue. + // Were we able to get a frame? + if (newFrame) startThread(newFrame); + // If not this stage will enter a stopped state. + else { currentStatus = STOPPING; - if (!input) { - should_continue = false; - return NULL; - } } - lock.unlock(); - // Can we enter the next stage? - should_continue = nextStage->tryAcquireNextStage(input); - // We are exiting leaving this stage, should we start another - // thread here? Normally we will always re-queue a thread on - // the first stage, but if we received the last frame there is - // no need to. - if (!last_frame) { - startThread(NULL); - } + lock.unlock(); return input; } @@ -797,31 +913,48 @@ public: } QReadLocker lock(&statusLock); - // A thread is already in the first stage, - // we should just return + // If the first stage is already active we will just end. if (currentStatus == STARTING) { return false; } - // Have to change to a write lock to modify currentStatus + + // Otherwise we will try to continue, but to do so we have to + // escalate the lock, and sadly there is no way to do so without + // releasing the read-mode lock, and getting a new write-mode lock. lock.unlock(); QWriteLocker writeLock(&statusLock); - // But someone else might have started a thread in the meantime + // currentStatus might have changed in the gap between releasing the read + // lock and getting the write lock. if (currentStatus == STARTING) { return false; } - // Ok we'll start a thread + + bool last_frame = false; + // Try to get a frame from the data source, if we get one we will + // continue to the first stage. + input = dataSource.tryGetFrame(last_frame); + + if (!input) { + return false; + } + currentStatus = STARTING; - // We always start a readstage thread with null input, so nothing to do here return true; } + void status(){ + qDebug("Read stage %d, status starting? %d, next frame %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->dataSource.size()); + } + + }; -// starts threads +// Appened to the end of a Stream's transform sequence. Collects the output +// from each frame on a single templatelist class LastStage : public SingleThreadStage { public: @@ -834,6 +967,7 @@ public: private: TemplateList collectedOutput; public: + void reset() { collectedOutput.clear(); @@ -873,6 +1007,11 @@ public: return input; } + + void status(){ + qDebug("Collection stage %d, status starting? %d, next %d buffer size %d", this->stage_id, this->currentStatus == SingleThreadStage::STARTING, this->next_target, this->inputBuffer->size()); + } + }; @@ -880,6 +1019,8 @@ class StreamTransform : public CompositeTransform { Q_OBJECT public: + Q_PROPERTY(int activeFrames READ get_activeFrames WRITE set_activeFrames RESET reset_activeFrames) + BR_PROPERTY(int, activeFrames, 100) void train(const TemplateList & data) { @@ -902,7 +1043,8 @@ public: qFatal("whatever"); } - // start processing + // start processing, consider all templates in src a continuous + // 'video' void projectUpdate(const TemplateList & src, TemplateList & dst) { dst = src; @@ -911,12 +1053,21 @@ public: if (!res) return; // Start the first thread in the stream. + QWriteLocker lock(&readStage->statusLock); readStage->currentStatus = SingleThreadStage::STARTING; - readStage->startThread(NULL); - // Wait for the stream to reach the last frame available from + // We have to get a frame before starting the thread + bool last_frame = false; + FrameData * firstFrame = readStage->dataSource.tryGetFrame(last_frame); + if (firstFrame == NULL) + qFatal("Failed to read first frame of video"); + readStage->startThread(firstFrame); + lock.unlock(); + + // Wait for the stream to process the last frame available from // the data source. - readStage->dataSource.waitLast(); + bool wait_res = false; + wait_res = readStage->dataSource.waitLast(); // Now that there are no more incoming frames, call finalize // on each transform in turn to collect any last templates @@ -958,6 +1109,8 @@ public: { if (transforms.isEmpty()) return; + // call CompositeTransform::init so that trainable is set + // correctly. CompositeTransform::init(); // We share a thread pool across streams attached to the same @@ -973,24 +1126,31 @@ public: threads = it.value(); poolLock.unlock(); + // Are our children time varying or not? This decides whether + // we run them in single threaded or multi threaded stages stage_variance.reserve(transforms.size()); foreach (const br::Transform *transform, transforms) { stage_variance.append(transform->timeVarying()); } - readStage = new FirstStage(); + // Additionally, we have a separate stage responsible for reading + // frames from the data source + readStage = new FirstStage(activeFrames); processingStages.push_back(readStage); readStage->stage_id = 0; readStage->stages = &this->processingStages; readStage->threads = this->threads; + // Initialize and link a processing stage for each of our child + // transforms. int next_stage_id = 1; - bool prev_stage_variance = true; for (int i =0; i < transforms.size(); i++) { if (stage_variance[i]) + // Whether or not the previous stage is multi-threaded controls + // the type of input buffer we need in a single threaded stage. processingStages.append(new SingleThreadStage(prev_stage_variance)); else processingStages.append(new MultiThreadStage(Globals->parallelism)); @@ -1008,20 +1168,25 @@ public: prev_stage_variance = stage_variance[i]; } + // We also have the last stage, which just puts the output of the + // previous stages on a template list. collectionStage = new LastStage(prev_stage_variance); processingStages.append(collectionStage); collectionStage->stage_id = next_stage_id; collectionStage->stages = &this->processingStages; collectionStage->threads = this->threads; + // the last transform stage points to collection stage processingStages[processingStages.size() - 2]->nextStage = collectionStage; - // It's a ring buffer, get it? + // And the collection stage points to the read stage, because this is + // a ring buffer. collectionStage->nextStage = readStage; } ~StreamTransform() { + // Delete all the stages for (int i = 0; i < processingStages.size(); i++) { delete processingStages[i]; }