diff --git a/oletools/crypto.py b/oletools/crypto.py index f98a4d7..ae80589 100644 --- a/oletools/crypto.py +++ b/oletools/crypto.py @@ -236,7 +236,8 @@ def is_encrypted(some_file): if zipfile.is_zipfile(some_file): return _is_encrypted_zip(some_file) # otherwise assume it is the name of an ole file - return _is_encrypted_ole(OleFileIO(some_file)) + with OleFileIO(some_file) as ole: + return _is_encrypted_ole(ole) except Exception as exc: log.warning('Failed to check {} for encryption ({}); assume it is not ' 'encrypted.'.format(some_file, exc)) diff --git a/oletools/msodde.py b/oletools/msodde.py index 84a9b7d..1a3a275 100644 --- a/oletools/msodde.py +++ b/oletools/msodde.py @@ -493,17 +493,23 @@ def process_xls(filepath): """ find dde links in excel ole file """ result = [] - for stream in xls_parser.XlsFile(filepath).iter_streams(): - if not isinstance(stream, xls_parser.WorkbookStream): - continue - for record in stream.iter_records(): - if not isinstance(record, xls_parser.XlsRecordSupBook): + xls_file = None + try: + xls_file = xls_parser.XlsFile(filepath) + for stream in xls_file.iter_streams(): + if not isinstance(stream, xls_parser.WorkbookStream): continue - if record.support_link_type in ( - xls_parser.XlsRecordSupBook.LINK_TYPE_OLE_DDE, - xls_parser.XlsRecordSupBook.LINK_TYPE_EXTERNAL): - result.append(record.virt_path.replace(u'\u0003', u' ')) - return u'\n'.join(result) + for record in stream.iter_records(): + if not isinstance(record, xls_parser.XlsRecordSupBook): + continue + if record.support_link_type in ( + xls_parser.XlsRecordSupBook.LINK_TYPE_OLE_DDE, + xls_parser.XlsRecordSupBook.LINK_TYPE_EXTERNAL): + result.append(record.virt_path.replace(u'\u0003', u' ')) + return u'\n'.join(result) + finally: + if xls_file is not None: + xls_file.close() def process_docx(filepath, field_filter_mode=None): @@ -908,13 +914,12 @@ def process_file(filepath, field_filter_mode=None): if xls_parser.is_xls(filepath): logger.debug('Process file as excel 2003 (xls)') return process_xls(filepath) - - ole = olefile.OleFileIO(filepath, path_encoding=None) - if is_ppt(ole): + if is_ppt(filepath): logger.debug('is ppt - cannot have DDE') return u'' logger.debug('Process file as word 2003 (doc)') - return process_doc(ole) + with olefile.OleFileIO(filepath, path_encoding=None) as ole: + return process_doc(ole) with open(filepath, 'rb') as file_handle: if file_handle.read(4) == RTF_START: @@ -970,6 +975,7 @@ def process_maybe_encrypted(filepath, passwords=None, crypto_nesting=0, if not crypto.is_encrypted(filepath): return result except Exception: + logger.debug('Ignoring exception:', exc_info=True) if not crypto.is_encrypted(filepath): raise @@ -997,7 +1003,8 @@ def process_maybe_encrypted(filepath, passwords=None, crypto_nesting=0, try: # (maybe file was not yet created) os.unlink(decrypted_file) except Exception: - pass + logger.debug('Ignoring exception closing decrypted file:', + exc_info=True) return result diff --git a/oletools/oleobj.py b/oletools/oleobj.py index 9c152f3..d9cf876 100644 --- a/oletools/oleobj.py +++ b/oletools/oleobj.py @@ -526,29 +526,35 @@ def find_ole_in_ppt(filename): can contain the actual embedded file we are looking for (caller will check for these). """ - for stream in PptFile(filename).iter_streams(): - for record_idx, record in enumerate(stream.iter_records()): - if isinstance(record, PptRecordExOleVbaActiveXAtom): - ole = None - try: - data_start = next(record.iter_uncompressed()) - if data_start[:len(olefile.MAGIC)] != olefile.MAGIC: - continue # could be an ActiveX control or VBA Storage - - # otherwise, this should be an OLE object - log.debug('Found record with embedded ole object in ppt ' - '(stream "{0}", record no {1})' - .format(stream.name, record_idx)) - ole = record.get_data_as_olefile() - yield ole - except IOError: - log.warning('Error reading data from {0} stream or ' - 'interpreting it as OLE object' - .format(stream.name)) - log.debug('', exc_info=True) - finally: - if ole is not None: - ole.close() + ppt_file = None + try: + ppt_file = PptFile(filename) + for stream in ppt_file.iter_streams(): + for record_idx, record in enumerate(stream.iter_records()): + if isinstance(record, PptRecordExOleVbaActiveXAtom): + ole = None + try: + data_start = next(record.iter_uncompressed()) + if data_start[:len(olefile.MAGIC)] != olefile.MAGIC: + continue # could be ActiveX control / VBA Storage + + # otherwise, this should be an OLE object + log.debug('Found record with embedded ole object in ' + 'ppt (stream "{0}", record no {1})' + .format(stream.name, record_idx)) + ole = record.get_data_as_olefile() + yield ole + except IOError: + log.warning('Error reading data from {0} stream or ' + 'interpreting it as OLE object' + .format(stream.name)) + log.debug('', exc_info=True) + finally: + if ole is not None: + ole.close() + finally: + if ppt_file is not None: + ppt_file.close() class FakeFile(io.RawIOBase): diff --git a/oletools/ppt_record_parser.py b/oletools/ppt_record_parser.py index 430e766..f8d54ea 100644 --- a/oletools/ppt_record_parser.py +++ b/oletools/ppt_record_parser.py @@ -63,7 +63,6 @@ except ImportError: sys.path.insert(0, PARENT_DIR) del PARENT_DIR from oletools import record_base -from oletools.common.errors import CryptoErrorBase # types of relevant records (there are much more than listed here) @@ -109,10 +108,11 @@ RECORD_TYPES = dict([ ]) -# record types where version is not 0x0 or 0xf +# record types where version is not 0x0 or 0x1 or 0xf VERSION_EXCEPTIONS = dict([ (0x0400, 2), # rt_vbainfoatom (0x03ef, 2), # rt_slideatom + (0xe9c7, 7), # tests/test-data/encrypted/encrypted.ppt, not investigated ]) @@ -174,7 +174,7 @@ def is_ppt(filename): for record in stream.iter_records(): if record.type == 0x0ff5: # UserEditAtom have_user_edit = True - elif record.type == 0x1772: # PersisDirectoryAtom + elif record.type == 0x1772: # PersistDirectoryAtom have_persist_dir = True elif record.type == 0x03e8: # DocumentContainer have_document_container = True @@ -185,10 +185,12 @@ def is_ppt(filename): return True else: # ignore other streams/storages since they are optional continue - except CryptoErrorBase: - raise - except Exception: - pass + except Exception as exc: + logging.debug('Ignoring exception in is_ppt, assume is not ppt', + exc_info=True) + finally: + if ppt_file is not None: + ppt_file.close() return False diff --git a/oletools/xls_parser.py b/oletools/xls_parser.py index 2d501e9..2f0bdad 100644 --- a/oletools/xls_parser.py +++ b/oletools/xls_parser.py @@ -88,12 +88,18 @@ def is_xls(filename): substream. See also: oleid.OleID.check_excel """ + xls_file = None try: - for stream in XlsFile(filename).iter_streams(): + xls_file = XlsFile(filename) + for stream in xls_file.iter_streams(): if isinstance(stream, WorkbookStream): return True except Exception: - pass + logging.debug('Ignoring exception in is_xls, assume is not xls', + exc_info=True) + finally: + if xls_file is not None: + xls_file.close() return False diff --git a/tests/msodde/test_basic.py b/tests/msodde/test_basic.py index af767cc..6c48e86 100644 --- a/tests/msodde/test_basic.py +++ b/tests/msodde/test_basic.py @@ -9,11 +9,14 @@ Ensure that from __future__ import print_function import unittest -from oletools import msodde -from tests.test_utils import DATA_BASE_DIR as BASE_DIR +import sys import os from os.path import join from traceback import print_exc +from oletools import msodde +from oletools.crypto import \ + WrongEncryptionPassword, CryptoLibNotImported, check_msoffcrypto +from tests.test_utils import DATA_BASE_DIR as BASE_DIR class TestReturnCode(unittest.TestCase): @@ -46,15 +49,21 @@ class TestReturnCode(unittest.TestCase): def test_invalid_none(self): """ check that no file argument leads to non-zero exit status """ - self.do_test_validity('', True) + if sys.hexversion > 0x03030000: # version 3.3 and higher + # different errors probably depending on whether msoffcryto is + # available or not + expect_error = (AttributeError, FileNotFoundError) + else: + expect_error = (AttributeError, IOError) + self.do_test_validity('', expect_error) def test_invalid_empty(self): """ check that empty file argument leads to non-zero exit status """ - self.do_test_validity(join(BASE_DIR, 'basic/empty'), True) + self.do_test_validity(join(BASE_DIR, 'basic/empty'), Exception) def test_invalid_text(self): """ check that text file argument leads to non-zero exit status """ - self.do_test_validity(join(BASE_DIR, 'basic/text'), True) + self.do_test_validity(join(BASE_DIR, 'basic/text'), Exception) def test_encrypted(self): """ @@ -64,28 +73,38 @@ class TestReturnCode(unittest.TestCase): Encryption) is tested. """ CRYPT_DIR = join(BASE_DIR, 'encrypted') - ADD_ARGS = '', '-j', '-d', '-f', '-a' + have_crypto = check_msoffcrypto() for filename in os.listdir(CRYPT_DIR): - full_name = join(CRYPT_DIR, filename) - for args in ADD_ARGS: - self.do_test_validity(args + ' ' + full_name, True) - - def do_test_validity(self, args, expect_error=False): - """ helper for test_valid_doc[x] """ - have_exception = False + if have_crypto and 'standardpassword' in filename: + # these are automagically decrypted + self.do_test_validity(join(CRYPT_DIR, filename)) + elif have_crypto: + self.do_test_validity(join(CRYPT_DIR, filename), + WrongEncryptionPassword) + else: + self.do_test_validity(join(CRYPT_DIR, filename), + CryptoLibNotImported) + + def do_test_validity(self, filename, expect_error=None): + """ helper for test_[in]valid_* """ + found_error = None + # DEBUG: print('Testing file {}'.format(filename)) try: - msodde.process_file(args, msodde.FIELD_FILTER_BLACKLIST) - except Exception: - have_exception = True - print_exc() - except SystemExit as exc: # sys.exit() was called - have_exception = True - if exc.code is None: - have_exception = False - - self.assertEqual(expect_error, have_exception, - msg='Args={0}, expect={1}, exc={2}' - .format(args, expect_error, have_exception)) + msodde.process_maybe_encrypted(filename, + field_filter_mode=msodde.FIELD_FILTER_BLACKLIST) + except Exception as exc: + found_error = exc + # DEBUG: print_exc() + + if expect_error and not found_error: + self.fail('Expected {} but msodde finished without errors for {}' + .format(expect_error, filename)) + elif not expect_error and found_error: + self.fail('Unexpected error {} from msodde for {}' + .format(found_error, filename)) + elif expect_error and not isinstance(found_error, expect_error): + self.fail('Wrong kind of error {} from msodde for {}, expected {}' + .format(type(found_error), filename, expect_error)) class TestDdeLinks(unittest.TestCase): @@ -100,24 +119,27 @@ class TestDdeLinks(unittest.TestCase): def test_with_dde(self): """ check that dde links appear on stdout """ filename = 'dde-test-from-office2003.doc' - output = msodde.process_file( - join(BASE_DIR, 'msodde', filename), msodde.FIELD_FILTER_BLACKLIST) + output = msodde.process_maybe_encrypted( + join(BASE_DIR, 'msodde', filename), + field_filter_mode=msodde.FIELD_FILTER_BLACKLIST) self.assertNotEqual(len(self.get_dde_from_output(output)), 0, msg='Found no dde links in output of ' + filename) def test_no_dde(self): """ check that no dde links appear on stdout """ filename = 'harmless-clean.doc' - output = msodde.process_file( - join(BASE_DIR, 'msodde', filename), msodde.FIELD_FILTER_BLACKLIST) + output = msodde.process_maybe_encrypted( + join(BASE_DIR, 'msodde', filename), + field_filter_mode=msodde.FIELD_FILTER_BLACKLIST) self.assertEqual(len(self.get_dde_from_output(output)), 0, msg='Found dde links in output of ' + filename) def test_with_dde_utf16le(self): """ check that dde links appear on stdout """ filename = 'dde-test-from-office2013-utf_16le-korean.doc' - output = msodde.process_file( - join(BASE_DIR, 'msodde', filename), msodde.FIELD_FILTER_BLACKLIST) + output = msodde.process_maybe_encrypted( + join(BASE_DIR, 'msodde', filename), + field_filter_mode=msodde.FIELD_FILTER_BLACKLIST) self.assertNotEqual(len(self.get_dde_from_output(output)), 0, msg='Found no dde links in output of ' + filename) @@ -125,8 +147,9 @@ class TestDdeLinks(unittest.TestCase): """ check that dde links are found in excel 2007+ files """ expect = ['cmd /c calc.exe', ] for extn in 'xlsx', 'xlsm', 'xlsb': - output = msodde.process_file( - join(BASE_DIR, 'msodde', 'dde-test.' + extn), msodde.FIELD_FILTER_BLACKLIST) + output = msodde.process_maybe_encrypted( + join(BASE_DIR, 'msodde', 'dde-test.' + extn), + field_filter_mode=msodde.FIELD_FILTER_BLACKLIST) self.assertEqual(expect, self.get_dde_from_output(output), msg='unexpected output for dde-test.{0}: {1}' @@ -136,8 +159,9 @@ class TestDdeLinks(unittest.TestCase): """ check that dde in xml from word / excel is found """ for name_part in 'excel2003', 'word2003', 'word2007': filename = 'dde-in-' + name_part + '.xml' - output = msodde.process_file( - join(BASE_DIR, 'msodde', filename), msodde.FIELD_FILTER_BLACKLIST) + output = msodde.process_maybe_encrypted( + join(BASE_DIR, 'msodde', filename), + field_filter_mode=msodde.FIELD_FILTER_BLACKLIST) links = self.get_dde_from_output(output) self.assertEqual(len(links), 1, 'found {0} dde-links in {1}' .format(len(links), filename)) @@ -149,15 +173,17 @@ class TestDdeLinks(unittest.TestCase): def test_clean_rtf_blacklist(self): """ find a lot of hyperlinks in rtf spec """ filename = 'RTF-Spec-1.7.rtf' - output = msodde.process_file( - join(BASE_DIR, 'msodde', filename), msodde.FIELD_FILTER_BLACKLIST) + output = msodde.process_maybe_encrypted( + join(BASE_DIR, 'msodde', filename), + field_filter_mode=msodde.FIELD_FILTER_BLACKLIST) self.assertEqual(len(self.get_dde_from_output(output)), 1413) def test_clean_rtf_ddeonly(self): """ find no dde links in rtf spec """ filename = 'RTF-Spec-1.7.rtf' - output = msodde.process_file( - join(BASE_DIR, 'msodde', filename), msodde.FIELD_FILTER_DDE) + output = msodde.process_maybe_encrypted( + join(BASE_DIR, 'msodde', filename), + field_filter_mode=msodde.FIELD_FILTER_DDE) self.assertEqual(len(self.get_dde_from_output(output)), 0, msg='Found dde links in output of ' + filename) diff --git a/tests/msodde/test_crypto.py b/tests/msodde/test_crypto.py index 187b60b..38b2f06 100644 --- a/tests/msodde/test_crypto.py +++ b/tests/msodde/test_crypto.py @@ -4,10 +4,9 @@ import sys import unittest from os.path import basename, join as pjoin -from tests.test_utils import DATA_BASE_DIR +from tests.test_utils import DATA_BASE_DIR, call_and_capture from oletools import crypto -from oletools import msodde @unittest.skipIf(not crypto.check_msoffcrypto(), @@ -15,15 +14,18 @@ from oletools import msodde .format(basename(sys.executable))) class MsoddeCryptoTest(unittest.TestCase): """Test integration of decryption in msodde.""" + def test_standard_password(self): """Check dde-link is found in xls[mb] sample files.""" for suffix in 'xls', 'xlsx', 'xlsm', 'xlsb': example_file = pjoin(DATA_BASE_DIR, 'encrypted', 'dde-test-encrypt-standardpassword.' + suffix) - link_text = msodde.process_maybe_encrypted(example_file) - self.assertEqual(link_text, 'cmd /c calc.exe', - msg='Unexpected output {!r} for {}' - .format(link_text, suffix)) + output, _ = call_and_capture('msodde', [example_file, ]) + self.assertIn('\nDDE Links:\ncmd /c calc.exe\n', output, + msg='Unexpected output {!r} for {}' + .format(output, suffix)) + + # TODO: add more, in particular a sample with a "proper" password if __name__ == '__main__': diff --git a/tests/oleobj/test_basic.py b/tests/oleobj/test_basic.py index 8154e54..8ad0ef5 100644 --- a/tests/oleobj/test_basic.py +++ b/tests/oleobj/test_basic.py @@ -8,7 +8,7 @@ from hashlib import md5 from glob import glob # Directory with test data, independent of current working directory -from tests.test_utils import DATA_BASE_DIR +from tests.test_utils import DATA_BASE_DIR, call_and_capture from oletools import oleobj @@ -81,10 +81,6 @@ class TestOleObj(unittest.TestCase): """ fixture start: create temp dir """ self.temp_dir = mkdtemp(prefix='oletools-oleobj-') self.did_fail = False - if DEBUG: - import logging - logging.basicConfig(level=logging.DEBUG if DEBUG else logging.INFO) - oleobj.log.setLevel(logging.NOTSET) def tearDown(self): """ fixture end: remove temp dir """ @@ -101,7 +97,8 @@ class TestOleObj(unittest.TestCase): """ test that oleobj can be called with -i and -v - this is the way that amavisd calls oleobj, thinking it is ripOLE + This is how ripOLE used to be often called (e.g. by amavisd-new); + ensure oleobj is a compatible replacement. """ self.do_test_md5(['-d', self.temp_dir, '-v', '-i']) @@ -112,35 +109,52 @@ class TestOleObj(unittest.TestCase): 'embedded-simple-2007.xml', 'embedded-simple-2007-as2003.xml'): full_name = join(DATA_BASE_DIR, 'oleobj', sample_name) - ret_val = oleobj.main(args + [full_name, ]) + output, ret_val = call_and_capture('oleobj', args + [full_name, ], + accept_nonzero_exit=True) if glob(self.temp_dir + 'ole-object-*'): - self.fail('found embedded data in {0}'.format(sample_name)) - self.assertEqual(ret_val, oleobj.RETURN_NO_DUMP) + self.fail('found embedded data in {0}. Output:\n{1}' + .format(sample_name, output)) + self.assertEqual(ret_val, oleobj.RETURN_NO_DUMP, + msg='Wrong return value {} for {}. Output:\n{}' + .format(ret_val, sample_name, output)) - def do_test_md5(self, args, test_fun=oleobj.main): + def do_test_md5(self, args, test_fun=None, only_run_every=1): """ helper for test_md5 and test_md5_args """ - # name of sample, extension of embedded file, md5 hash of embedded file data_dir = join(DATA_BASE_DIR, 'oleobj') - for sample_name, embedded_name, expect_hash in SAMPLES: - ret_val = test_fun(args + [join(data_dir, sample_name), ]) - self.assertEqual(ret_val, oleobj.RETURN_DID_DUMP) + + # name of sample, extension of embedded file, md5 hash of embedded file + for sample_index, (sample_name, embedded_name, expect_hash) \ + in enumerate(SAMPLES): + if sample_index % only_run_every != 0: + continue + args_with_path = args + [join(data_dir, sample_name), ] + if test_fun is None: + output, ret_val = call_and_capture('oleobj', args_with_path, + accept_nonzero_exit=True) + else: + ret_val = test_fun(args_with_path) + output = '[output: see above]' + self.assertEqual(ret_val, oleobj.RETURN_DID_DUMP, + msg='Wrong return value {} for {}. Output:\n{}' + .format(ret_val, sample_name, output)) expect_name = join(self.temp_dir, sample_name + '_' + embedded_name) if not isfile(expect_name): self.did_fail = True - self.fail('{0} not created from {1}'.format(expect_name, - sample_name)) + self.fail('{0} not created from {1}. Output:\n{2}' + .format(expect_name, sample_name, output)) continue md5_hash = calc_md5(expect_name) if md5_hash != expect_hash: self.did_fail = True - self.fail('Wrong md5 {0} of {1} from {2}' - .format(md5_hash, expect_name, sample_name)) + self.fail('Wrong md5 {0} of {1} from {2}. Output:\n{3}' + .format(md5_hash, expect_name, sample_name, output)) continue def test_non_streamed(self): """ Ensure old oleobj behaviour still works: pre-read whole file """ - return self.do_test_md5(['-d', self.temp_dir], test_fun=preread_file) + return self.do_test_md5(['-d', self.temp_dir], test_fun=preread_file, + only_run_every=4) # just in case somebody calls this file as a script diff --git a/tests/oleobj/test_external_links.py b/tests/oleobj/test_external_links.py index 9c7e632..2b7fc5b 100644 --- a/tests/oleobj/test_external_links.py +++ b/tests/oleobj/test_external_links.py @@ -6,7 +6,7 @@ import os from os import path # Directory with test data, independent of current working directory -from tests.test_utils import DATA_BASE_DIR +from tests.test_utils import DATA_BASE_DIR, call_and_capture from oletools import oleobj BASE_DIR = path.join(DATA_BASE_DIR, 'oleobj', 'external_link') @@ -22,8 +22,11 @@ class TestExternalLinks(unittest.TestCase): for filename in filenames: file_path = path.join(dirpath, filename) - ret_val = oleobj.main([file_path]) - self.assertEqual(ret_val, oleobj.RETURN_DID_DUMP) + output, ret_val = call_and_capture('oleobj', [file_path, ], + accept_nonzero_exit=True) + self.assertEqual(ret_val, oleobj.RETURN_DID_DUMP, + msg='Wrong return value {} for {}. Output:\n{}' + .format(ret_val, filename, output)) # just in case somebody calls this file as a script diff --git a/tests/ooxml/test_zip_sub_file.py b/tests/ooxml/test_zip_sub_file.py index ac49fb5..6e6085b 100644 --- a/tests/ooxml/test_zip_sub_file.py +++ b/tests/ooxml/test_zip_sub_file.py @@ -144,15 +144,15 @@ class TestZipSubFile(unittest.TestCase): self.subfile.seek(0, os.SEEK_END) self.compare.seek(0, os.SEEK_END) - self.assertEquals(self.compare.read(10), self.subfile.read(10)) - self.assertEquals(self.compare.tell(), self.subfile.tell()) + self.assertEqual(self.compare.read(10), self.subfile.read(10)) + self.assertEqual(self.compare.tell(), self.subfile.tell()) self.subfile.seek(0) self.compare.seek(0) self.subfile.seek(len(FILE_CONTENTS) - 1) self.compare.seek(len(FILE_CONTENTS) - 1) - self.assertEquals(self.compare.read(10), self.subfile.read(10)) - self.assertEquals(self.compare.tell(), self.subfile.tell()) + self.assertEqual(self.compare.read(10), self.subfile.read(10)) + self.assertEqual(self.compare.tell(), self.subfile.tell()) def test_error_seek(self): """ test correct behaviour if seek beyond end (no exception) """ diff --git a/tests/test_utils/utils.py b/tests/test_utils/utils.py index 2b27131..45cedc8 100644 --- a/tests/test_utils/utils.py +++ b/tests/test_utils/utils.py @@ -47,6 +47,13 @@ def call_and_capture(module, args=None, accept_nonzero_exit=False, except KeyError: env['PYTHONPATH'] = SOURCE_BASE_DIR + # hack: in python2 output encoding (sys.stdout.encoding) was None + # although sys.getdefaultencoding() and sys.getfilesystemencoding were ok + # TODO: maybe can remove this once branch + # "encoding-for-non-unicode-environments" is merged + if 'PYTHONIOENCODING' not in env: + env['PYTHONIOENCODING'] = 'utf8' + # ensure args is a tuple my_args = tuple(args) if args else ()