Commit 6e505719cf88f70b466da308408eb0075da00506

Authored by Charles Otto
1 parent c4ff790d

Don't use default QByteArray serialization, it seems to be unreliable

openbr/core/qtutils.cpp
@@ -500,14 +500,15 @@ QString getAbsolutePath(const QString &filename) @@ -500,14 +500,15 @@ QString getAbsolutePath(const QString &filename)
500 return QFileInfo(filename).absoluteFilePath(); 500 return QFileInfo(filename).absoluteFilePath();
501 } 501 }
502 502
  503 +const int base_block = 100000000;
  504 +
503 BlockCompression::BlockCompression(QIODevice *_basis) 505 BlockCompression::BlockCompression(QIODevice *_basis)
504 { 506 {
505 - blockSize = 100000000; 507 + blockSize = base_block;
506 setBasis(_basis); 508 setBasis(_basis);
507 } 509 }
508 510
509 -BlockCompression::BlockCompression() { blockSize = 100000000; };  
510 - 511 +BlockCompression::BlockCompression() { blockSize = base_block;};
511 512
512 bool BlockCompression::open(QIODevice::OpenMode mode) 513 bool BlockCompression::open(QIODevice::OpenMode mode)
513 { 514 {
@@ -521,14 +522,20 @@ bool BlockCompression::open(QIODevice::OpenMode mode) @@ -521,14 +522,20 @@ bool BlockCompression::open(QIODevice::OpenMode mode)
521 blockWriter.setDevice(basis); 522 blockWriter.setDevice(basis);
522 523
523 if (mode & QIODevice::WriteOnly) { 524 if (mode & QIODevice::WriteOnly) {
524 - precompressedBlockWriter = new QBuffer;  
525 - precompressedBlockWriter->open(QIODevice::ReadWrite); 525 + precompressedBlockWriter.open(QIODevice::WriteOnly);
526 } 526 }
527 else if (mode & QIODevice::ReadOnly) { 527 else if (mode & QIODevice::ReadOnly) {
  528 +
  529 + // Read an initial compressed block from the underlying QIODevice,
  530 + // decompress, and set up a reader on it
528 QByteArray compressedBlock; 531 QByteArray compressedBlock;
529 - blockReader >> compressedBlock; 532 + quint32 block_size;
  533 + blockReader >> block_size;
  534 + compressedBlock.resize(block_size);
  535 + blockReader.readRawData(compressedBlock.data(), block_size);
530 536
531 decompressedBlock = qUncompress(compressedBlock); 537 decompressedBlock = qUncompress(compressedBlock);
  538 +
532 decompressedBlockReader.setBuffer(&decompressedBlock); 539 decompressedBlockReader.setBuffer(&decompressedBlock);
533 decompressedBlockReader.open(QIODevice::ReadOnly); 540 decompressedBlockReader.open(QIODevice::ReadOnly);
534 } 541 }
@@ -538,11 +545,17 @@ bool BlockCompression::open(QIODevice::OpenMode mode) @@ -538,11 +545,17 @@ bool BlockCompression::open(QIODevice::OpenMode mode)
538 545
539 void BlockCompression::close() 546 void BlockCompression::close()
540 { 547 {
541 - // flush output buffer  
542 - if ((openMode() & QIODevice::WriteOnly) && precompressedBlockWriter) {  
543 - QByteArray compressedBlock = qCompress(precompressedBlockWriter->buffer(), -1);  
544 - blockWriter << compressedBlock; 548 + // flush output buffer, since we may have a partial block which hasn't been
  549 + // written to disk yet.
  550 + if ((openMode() & QIODevice::WriteOnly) && precompressedBlockWriter.isOpen()) {
  551 + QByteArray compressedBlock = qCompress(precompressedBlockWriter.buffer());
  552 + precompressedBlockWriter.close();
  553 +
  554 + quint32 bsize= compressedBlock.size();
  555 + blockWriter << bsize;
  556 + blockWriter.writeRawData(compressedBlock.data(), compressedBlock.size());
545 } 557 }
  558 + // close the underlying device.
546 basis->close(); 559 basis->close();
547 } 560 }
548 561
@@ -557,8 +570,10 @@ void BlockCompression::setBasis(QIODevice *_basis) @@ -557,8 +570,10 @@ void BlockCompression::setBasis(QIODevice *_basis)
557 // block from basis 570 // block from basis
558 qint64 BlockCompression::readData(char *data, qint64 remaining) 571 qint64 BlockCompression::readData(char *data, qint64 remaining)
559 { 572 {
  573 + qint64 initial = remaining;
560 qint64 read = 0; 574 qint64 read = 0;
561 while (remaining > 0) { 575 while (remaining > 0) {
  576 + // attempt to read the target amount of data
562 qint64 single_read = decompressedBlockReader.read(data, remaining); 577 qint64 single_read = decompressedBlockReader.read(data, remaining);
563 if (single_read == -1) 578 if (single_read == -1)
564 qFatal("miss read"); 579 qFatal("miss read");
@@ -567,13 +582,19 @@ qint64 BlockCompression::readData(char *data, qint64 remaining) @@ -567,13 +582,19 @@ qint64 BlockCompression::readData(char *data, qint64 remaining)
567 read += single_read; 582 read += single_read;
568 data += single_read; 583 data += single_read;
569 584
570 - // need a new block 585 + // need a new block if we didn't get enough bytes from the previous read
571 if (remaining > 0) { 586 if (remaining > 0) {
572 QByteArray compressedBlock; 587 QByteArray compressedBlock;
573 - blockReader >> compressedBlock;  
574 - if (compressedBlock.size() == 0) {  
575 - return read;  
576 - } 588 +
  589 + // read the size of the next block
  590 + quint32 block_size;
  591 + blockReader >> block_size;
  592 +
  593 + compressedBlock.resize(block_size);
  594 + int actualRead = blockReader.readRawData(compressedBlock.data(), block_size);
  595 + if (actualRead != block_size)
  596 + qFatal("Bad read on nominal block size: %d, only got %d", block_size, remaining);
  597 +
577 decompressedBlock = qUncompress(compressedBlock); 598 decompressedBlock = qUncompress(compressedBlock);
578 599
579 decompressedBlockReader.close(); 600 decompressedBlockReader.close();
@@ -581,7 +602,14 @@ qint64 BlockCompression::readData(char *data, qint64 remaining) @@ -581,7 +602,14 @@ qint64 BlockCompression::readData(char *data, qint64 remaining)
581 decompressedBlockReader.open(QIODevice::ReadOnly); 602 decompressedBlockReader.open(QIODevice::ReadOnly);
582 } 603 }
583 } 604 }
584 - return blockReader.atEnd() && !basis->isReadable() ? -1 : read; 605 +
  606 + if (read != initial)
  607 + qFatal("Failed to read enough");
  608 + bool condition = blockReader.atEnd() && !basis->isReadable() ;
  609 + if (condition)
  610 + qWarning("Returning -1 from read");
  611 +
  612 + return condition ? -1 : read;
585 } 613 }
586 614
587 bool BlockCompression::isSequential() const 615 bool BlockCompression::isSequential() const
@@ -591,36 +619,57 @@ bool BlockCompression::isSequential() const @@ -591,36 +619,57 @@ bool BlockCompression::isSequential() const
591 619
592 qint64 BlockCompression::writeData(const char *data, qint64 remaining) 620 qint64 BlockCompression::writeData(const char *data, qint64 remaining)
593 { 621 {
  622 + const char * endPoint = data + remaining;
  623 + qint64 initial = remaining;
  624 +
594 qint64 written = 0; 625 qint64 written = 0;
595 626
596 while (remaining > 0) { 627 while (remaining > 0) {
597 // how much more can be put in this buffer? 628 // how much more can be put in this buffer?
598 - qint64 capacity = blockSize - precompressedBlockWriter->pos(); 629 + qint64 capacity = blockSize - precompressedBlockWriter.pos();
  630 + if (capacity < 0)
  631 + qFatal("Negative capacity!!!");
599 632
600 // don't try to write beyond capacity 633 // don't try to write beyond capacity
601 qint64 write_size = qMin(capacity, remaining); 634 qint64 write_size = qMin(capacity, remaining);
602 635
603 - qint64 singleWrite = precompressedBlockWriter->write(data, write_size);  
604 - // ignore the error case here, we consdier basis's failure mode the real  
605 - // end case 636 + qint64 singleWrite = precompressedBlockWriter.write(data, write_size);
  637 +
606 if (singleWrite == -1) 638 if (singleWrite == -1)
607 - singleWrite = 0; 639 + qFatal("matrix write failure?");
608 640
609 remaining -= singleWrite; 641 remaining -= singleWrite;
610 data += singleWrite; 642 data += singleWrite;
611 written += singleWrite; 643 written += singleWrite;
  644 + if (data > endPoint)
  645 + qFatal("Wrote past the end");
612 646
613 if (remaining > 0) { 647 if (remaining > 0) {
614 - QByteArray compressedBlock = qCompress(precompressedBlockWriter->buffer(), -1); 648 + QByteArray compressedBlock = qCompress(precompressedBlockWriter.buffer(), -1);
615 649
616 - if (compressedBlock.size() != 0)  
617 - blockWriter << compressedBlock; 650 + if (precompressedBlockWriter.buffer().size() != 0) {
  651 + quint32 block_size = compressedBlock.size();
  652 + blockWriter << block_size;
618 653
619 - delete precompressedBlockWriter;  
620 - precompressedBlockWriter = new QBuffer;  
621 - precompressedBlockWriter->open(QIODevice::ReadWrite); 654 + int write_count = blockWriter.writeRawData(compressedBlock.data(), block_size);
  655 + if (write_count != block_size)
  656 + qFatal("Didn't write enough data");
  657 + }
  658 + else
  659 + qFatal("serialized empty compressed block (?)");
  660 +
  661 + precompressedBlockWriter.close();
  662 + precompressedBlockWriter.open(QIODevice::WriteOnly);
622 } 663 }
623 } 664 }
  665 +
  666 + if (written != initial)
  667 + qFatal("didn't write enough bytes");
  668 +
  669 + bool condition = basis->isWritable();
  670 + if (!condition)
  671 + qWarning("Returning -1 from write");
  672 +
624 return basis->isWritable() ? written : -1; 673 return basis->isWritable() ? written : -1;
625 } 674 }
626 675
openbr/core/qtutils.h
@@ -122,7 +122,7 @@ namespace QtUtils @@ -122,7 +122,7 @@ namespace QtUtils
122 122
123 // write to a QByteArray, when max block sized is reached, compress and write 123 // write to a QByteArray, when max block sized is reached, compress and write
124 // it to basis 124 // it to basis
125 - QBuffer * precompressedBlockWriter; 125 + QBuffer precompressedBlockWriter;
126 QDataStream blockWriter; 126 QDataStream blockWriter;
127 qint64 writeData(const char *data, qint64 remaining); 127 qint64 writeData(const char *data, qint64 remaining);
128 }; 128 };