diff --git a/oletools/crypto.py b/oletools/crypto.py index e78a7f5..bdcb66f 100644 --- a/oletools/crypto.py +++ b/oletools/crypto.py @@ -168,6 +168,7 @@ def enable_logging(): """ log.setLevel(logging.NOTSET) + def is_encrypted(some_file): """ Determine whether document contains encrypted content. @@ -197,17 +198,55 @@ def is_encrypted(some_file): :returns: True if (and only if) the file contains encrypted content """ log.debug('is_encrypted') - if isinstance(some_file, OleFileIO): - return is_encrypted_ole(some_file) # assume it is OleFileIO - if zipfile.is_zipfile(some_file): - return is_encrypted_zip(some_file) - # otherwise assume it is the name of an ole file - return is_encrypted_ole(OleFileIO(some_file)) + # ask msoffcrypto if possible + if check_msoffcrypto(): + log.debug('Checking for encryption using msoffcrypto') + file_handle = None + file_pos = None + try: + if isinstance(some_file, OleFileIO): + # TODO: hacky, replace once msoffcrypto-tools accepts OleFileIO + file_handle = some_file.fp + file_pos = file_handle.tell() + file_handle.seek(0) + else: + file_handle = open(some_file, 'rb') + + return msoffcrypto.OfficeFile(file_handle).is_encrypted() + + except Exception as exc: + log.warning('msoffcrypto failed to interpret file {} or determine ' + 'whether it is encrypted: {}' + .format(file_handle.name, exc)) + + finally: + try: + if file_pos is not None: # input was OleFileIO + file_handle.seek(file_pos) + else: # input was file name + file_handle.close() + except Exception as exc: + log.warning('Ignoring error during clean up: {}'.format(exc)) + + # if that failed, try ourselves with older and less accurate code + try: + if isinstance(some_file, OleFileIO): + return _is_encrypted_ole(some_file) + if zipfile.is_zipfile(some_file): + return _is_encrypted_zip(some_file) + # otherwise assume it is the name of an ole file + return _is_encrypted_ole(OleFileIO(some_file)) + except Exception as exc: + log.warning('Failed to check {} for encryption ({}); assume it is not ' + 'encrypted.'.format(some_file, exc)) -def is_encrypted_zip(filename): + return False + + +def _is_encrypted_zip(filename): """Specialization of :py:func:`is_encrypted` for zip-based files.""" - log.debug('is_encrypted_zip') + log.debug('Checking for encryption in zip file') # TODO: distinguish OpenXML from normal zip files # try to decrypt a few bytes from first entry with zipfile.ZipFile(filename, 'r') as zipper: @@ -220,9 +259,9 @@ def is_encrypted_zip(filename): return 'crypt' in str(rt_err) -def is_encrypted_ole(ole): +def _is_encrypted_ole(ole): """Specialization of :py:func:`is_encrypted` for ole files.""" - log.debug('is_encrypted_ole') + log.debug('Checking for encryption in OLE file') # check well known property for password protection # (this field may be missing for Powerpoint2000, for example) # TODO: check whether password protection always implies encryption. Could @@ -256,8 +295,6 @@ def is_encrypted_ole(ole): f_encrypted = (temp16 & 0x0100) >> 8 if f_encrypted: return True - except Exception: - raise finally: if stream is not None: stream.close() @@ -324,6 +361,8 @@ def decrypt(filename, passwords=None, **temp_file_args): crypto_file = msoffcrypto.OfficeFile(reader) except Exception as exc: # e.g. ppt, not yet supported by msoffcrypto if 'Unrecognized file format' in str(exc): + log.debug('Caught exception', exc_info=True) + # raise different exception without stack trace of original exc if sys.version_info.major == 2: raise UnsupportedEncryptionError(filename) @@ -337,6 +376,7 @@ def decrypt(filename, passwords=None, **temp_file_args): .format(filename)) for password in passwords: + log.debug('Trying to decrypt with password {!r}'.format(password)) write_descriptor = None write_handle = None decrypt_file = None @@ -354,6 +394,8 @@ def decrypt(filename, passwords=None, **temp_file_args): write_handle = None break except Exception: + log.debug('Failed to decrypt', exc_info=True) + # error-clean up: close everything and del temp file if write_handle: write_handle.close() @@ -363,4 +405,5 @@ def decrypt(filename, passwords=None, **temp_file_args): os.unlink(decrypt_file) decrypt_file = None # if we reach this, all passwords were tried without success + log.debug('All passwords failed') return decrypt_file diff --git a/oletools/msodde.py b/oletools/msodde.py index 0d5bf35..dd40c80 100644 --- a/oletools/msodde.py +++ b/oletools/msodde.py @@ -987,6 +987,9 @@ def process_maybe_encrypted(filepath, passwords=None, crypto_nesting=0, try: logger.debug('Trying to decrypt file') decrypted_file = crypto.decrypt(filepath, passwords) + if not decrypted_file: + logger.error('Decrypt failed, run with debug output to get details') + raise crypto.WrongEncryptionPassword(filepath) logger.info('Analyze decrypted file') result = process_maybe_encrypted(decrypted_file, passwords, crypto_nesting+1, **kwargs) diff --git a/oletools/olevba.py b/oletools/olevba.py index 3cf86f3..a8ed874 100644 --- a/oletools/olevba.py +++ b/oletools/olevba.py @@ -3893,6 +3893,7 @@ def process_file(filename, data, container, options, crypto_nesting=0): [crypto.WRITE_PROTECT_ENCRYPTION_PASSWORD, ] decrypted_file = crypto.decrypt(filename, passwords) if not decrypted_file: + log.error('Decrypt failed, run with debug output to get details') raise crypto.WrongEncryptionPassword(filename) log.info('Working on decrypted file') return process_file(decrypted_file, data, container or filename, diff --git a/tests/common/log_helper/log_helper_test_main.py b/tests/common/log_helper/log_helper_test_main.py index 98375fb..fb0ccca 100644 --- a/tests/common/log_helper/log_helper_test_main.py +++ b/tests/common/log_helper/log_helper_test_main.py @@ -34,12 +34,16 @@ def init_logging_and_log(args): level = args[-1] use_json = 'as-json' in args throw = 'throw' in args + percent_autoformat = '%-autoformat' in args if 'enable' in args: log_helper.enable_logging(use_json, level, stream=sys.stdout) _log() + if percent_autoformat: + logger.info('The %s is %d.', 'answer', 47) + if throw: raise Exception('An exception occurred before ending the logging') diff --git a/tests/common/log_helper/test_log_helper.py b/tests/common/log_helper/test_log_helper.py index d7034db..bcd0de0 100644 --- a/tests/common/log_helper/test_log_helper.py +++ b/tests/common/log_helper/test_log_helper.py @@ -112,6 +112,11 @@ class TestLogHelper(unittest.TestCase): ] self.assertEqual(jout, jexpect) + def test_percent_autoformat(self): + """Test that auto-formatting of log strings with `%` works.""" + output = self._run_test(['enable', '%-autoformat', 'info']) + self.assertIn('The answer is 47.', output) + def test_json_correct_on_exceptions(self): """ Test that even on unhandled exceptions our JSON is always correct diff --git a/tests/msodde/test_crypto.py b/tests/msodde/test_crypto.py index 2eb0273..187b60b 100644 --- a/tests/msodde/test_crypto.py +++ b/tests/msodde/test_crypto.py @@ -2,7 +2,7 @@ import sys import unittest -from os.path import join as pjoin +from os.path import basename, join as pjoin from tests.test_utils import DATA_BASE_DIR @@ -11,8 +11,8 @@ from oletools import msodde @unittest.skipIf(not crypto.check_msoffcrypto(), - 'Module msoffcrypto not installed for python{}.{}' - .format(sys.version_info.major, sys.version_info.minor)) + 'Module msoffcrypto not installed for {}' + .format(basename(sys.executable))) class MsoddeCryptoTest(unittest.TestCase): """Test integration of decryption in msodde.""" def test_standard_password(self): diff --git a/tests/oleid/test_basic.py b/tests/oleid/test_basic.py index e527fa2..ce4187a 100644 --- a/tests/oleid/test_basic.py +++ b/tests/oleid/test_basic.py @@ -20,7 +20,7 @@ class TestOleIDBasic(unittest.TestCase): """Run all file in test-data through oleid and compare to known ouput""" # this relies on order of indicators being constant, could relax that # Also requires that files have the correct suffixes (no rtf in doc) - NON_OLE_SUFFIXES = ('.xml', '.csv', '.rtf', '') + NON_OLE_SUFFIXES = ('.xml', '.csv', '.rtf', '', '.odt', '.ods', '.odp') NON_OLE_VALUES = (False, ) WORD = b'Microsoft Office Word' PPT = b'Microsoft Office PowerPoint' @@ -121,6 +121,33 @@ class TestOleIDBasic(unittest.TestCase): 'msodde/harmless-clean.docx': (False,), 'oleform/oleform-PR314.docm': (False,), 'basic/encrypted.docx': CRYPT, + 'oleobj/external_link/sample_with_external_link_to_doc.docx': (False,), + 'oleobj/external_link/sample_with_external_link_to_doc.xlsb': (False,), + 'oleobj/external_link/sample_with_external_link_to_doc.dotm': (False,), + 'oleobj/external_link/sample_with_external_link_to_doc.xlsm': (False,), + 'oleobj/external_link/sample_with_external_link_to_doc.pptx': (False,), + 'oleobj/external_link/sample_with_external_link_to_doc.dotx': (False,), + 'oleobj/external_link/sample_with_external_link_to_doc.docm': (False,), + 'oleobj/external_link/sample_with_external_link_to_doc.potm': (False,), + 'oleobj/external_link/sample_with_external_link_to_doc.xlsx': (False,), + 'oleobj/external_link/sample_with_external_link_to_doc.potx': (False,), + 'oleobj/external_link/sample_with_external_link_to_doc.ppsm': (False,), + 'oleobj/external_link/sample_with_external_link_to_doc.pptm': (False,), + 'oleobj/external_link/sample_with_external_link_to_doc.ppsx': (False,), + 'encrypted/autostart-encrypt-standardpassword.xlsm': + (True, False, 'unknown', True, False, False, False, False, False, False, 0), + 'encrypted/autostart-encrypt-standardpassword.xls': + (True, True, EXCEL, True, False, True, True, False, False, False, 0), + 'encrypted/dde-test-encrypt-standardpassword.xlsx': + (True, False, 'unknown', True, False, False, False, False, False, False, 0), + 'encrypted/dde-test-encrypt-standardpassword.xlsm': + (True, False, 'unknown', True, False, False, False, False, False, False, 0), + 'encrypted/autostart-encrypt-standardpassword.xlsb': + (True, False, 'unknown', True, False, False, False, False, False, False, 0), + 'encrypted/dde-test-encrypt-standardpassword.xls': + (True, True, EXCEL, True, False, False, True, False, False, False, 0), + 'encrypted/dde-test-encrypt-standardpassword.xlsb': + (True, False, 'unknown', True, False, False, False, False, False, False, 0), } indicator_names = [] @@ -148,7 +175,8 @@ class TestOleIDBasic(unittest.TestCase): OLE_VALUES[name])) except KeyError: print('Should add oleid output for {} to {} ({})' - .format(name, __name__, values[3:])) + .format(name, __name__, values)) + # just in case somebody calls this file as a script if __name__ == '__main__': diff --git a/tests/olevba/test_basic.py b/tests/olevba/test_basic.py index 798fba5..28238fc 100644 --- a/tests/olevba/test_basic.py +++ b/tests/olevba/test_basic.py @@ -3,21 +3,71 @@ Test basic functionality of olevba[3] """ import unittest -import sys -if sys.version_info.major <= 2: - from oletools import olevba -else: - from oletools import olevba3 as olevba import os from os.path import join +import re # Directory with test data, independent of current working directory -from tests.test_utils import DATA_BASE_DIR +from tests.test_utils import DATA_BASE_DIR, call_and_capture class TestOlevbaBasic(unittest.TestCase): """Tests olevba basic functionality""" + def test_text_behaviour(self): + """Test behaviour of olevba when presented with pure text file.""" + self.do_test_behaviour('text') + + def test_empty_behaviour(self): + """Test behaviour of olevba when presented with pure text file.""" + self.do_test_behaviour('empty') + + def do_test_behaviour(self, filename): + """Helper for test_{text,empty}_behaviour.""" + input_file = join(DATA_BASE_DIR, 'basic', filename) + output, _ = call_and_capture('olevba', args=(input_file, )) + + # check output + self.assertTrue(re.search(r'^Type:\s+Text\s*$', output, re.MULTILINE), + msg='"Type: Text" not found in output:\n' + output) + self.assertTrue(re.search(r'^No suspicious .+ found.$', output, + re.MULTILINE), + msg='"No suspicous...found" not found in output:\n' + \ + output) + self.assertNotIn('error', output.lower()) + + # check warnings + for line in output.splitlines(): + if line.startswith('WARNING ') and 'encrypted' in line: + continue # encryption warnings are ok + elif 'warn' in line.lower(): + raise self.fail('Found "warn" in output line: "{}"' + .format(line.rstrip())) + self.assertIn('not encrypted', output) + + def test_rtf_behaviour(self): + """Test behaviour of olevba when presented with an rtf file.""" + input_file = join(DATA_BASE_DIR, 'msodde', 'RTF-Spec-1.7.rtf') + output, ret_code = call_and_capture('olevba', args=(input_file, ), + accept_nonzero_exit=True) + + # check that return code is olevba.RETURN_OPEN_ERROR + self.assertEqual(ret_code, 5) + + # check output: + self.assertIn('FileOpenError', output) + self.assertIn('is RTF', output) + self.assertIn('rtfobj.py', output) + self.assertIn('not encrypted', output) + + # check warnings + for line in output.splitlines(): + if line.startswith('WARNING ') and 'encrypted' in line: + continue # encryption warnings are ok + elif 'warn' in line.lower(): + raise self.fail('Found "warn" in output line: "{}"' + .format(line.rstrip())) + def test_crypt_return(self): """ Tests that encrypted files give a certain return code. @@ -28,23 +78,23 @@ class TestOlevbaBasic(unittest.TestCase): CRYPT_DIR = join(DATA_BASE_DIR, 'encrypted') CRYPT_RETURN_CODE = 9 ADD_ARGS = [], ['-d', ], ['-a', ], ['-j', ], ['-t', ] - EXCEPTIONS = ['autostart-encrypt-standardpassword.xlsm', # These ... - 'autostart-encrypt-standardpassword.xlsb', # files ... - 'dde-test-encrypt-standardpassword.xls', # are ... - 'dde-test-encrypt-standardpassword.xlsx', # decrypted - 'dde-test-encrypt-standardpassword.xlsm', # per ... - 'dde-test-encrypt-standardpassword.xlsb'] # default. + EXCEPTIONS = ['autostart-encrypt-standardpassword.xls', # These ... + 'autostart-encrypt-standardpassword.xlsm', # files ... + 'autostart-encrypt-standardpassword.xlsb', # are ... + 'dde-test-encrypt-standardpassword.xls', # automati... + 'dde-test-encrypt-standardpassword.xlsx', # ...cally... + 'dde-test-encrypt-standardpassword.xlsm', # decrypted. + 'dde-test-encrypt-standardpassword.xlsb'] for filename in os.listdir(CRYPT_DIR): if filename in EXCEPTIONS: continue full_name = join(CRYPT_DIR, filename) for args in ADD_ARGS: - try: - ret_code = olevba.main(args + [full_name, ]) - except SystemExit as se: - ret_code = se.code or 0 # se.code can be None + _, ret_code = call_and_capture('olevba', + args=[full_name, ] + args, + accept_nonzero_exit=True) self.assertEqual(ret_code, CRYPT_RETURN_CODE, - msg='Wrong return code {} for args {}' + msg='Wrong return code {} for args {}'\ .format(ret_code, args + [filename, ])) diff --git a/tests/olevba/test_crypto.py b/tests/olevba/test_crypto.py index b2dc84d..aad78df 100644 --- a/tests/olevba/test_crypto.py +++ b/tests/olevba/test_crypto.py @@ -2,20 +2,18 @@ import sys import unittest -import os -from os.path import join as pjoin -from subprocess import check_output, CalledProcessError +from os.path import basename, join as pjoin import json from collections import OrderedDict -from tests.test_utils import DATA_BASE_DIR, SOURCE_BASE_DIR +from tests.test_utils import DATA_BASE_DIR, call_and_capture from oletools import crypto @unittest.skipIf(not crypto.check_msoffcrypto(), - 'Module msoffcrypto not installed for python{}.{}' - .format(sys.version_info.major, sys.version_info.minor)) + 'Module msoffcrypto not installed for {}' + .format(basename(sys.executable))) class OlevbaCryptoWriteProtectTest(unittest.TestCase): """ Test documents that are 'write-protected' through encryption. @@ -34,25 +32,12 @@ class OlevbaCryptoWriteProtectTest(unittest.TestCase): """ def test_autostart(self): """Check that autostart macro is found in xls[mb] sample file.""" - # create a PYTHONPATH environment var to prefer our olevba - env = os.environ - try: - env['PYTHONPATH'] = SOURCE_BASE_DIR + os.pathsep + \ - os.environ['PYTHONPATH'] - except KeyError: - env['PYTHONPATH'] = SOURCE_BASE_DIR - for suffix in 'xlsm', 'xlsb': example_file = pjoin( DATA_BASE_DIR, 'encrypted', 'autostart-encrypt-standardpassword.' + suffix) - try: - output = check_output([sys.executable, '-m', 'olevba', '-j', - example_file], - universal_newlines=True, env=env) - except CalledProcessError as err: - print(err.output) - raise + output, _ = call_and_capture('olevba', args=('-j', example_file), + exclude_stderr=True) data = json.loads(output, object_pairs_hook=OrderedDict) # debug: json.dump(data, sys.stdout, indent=4) self.assertEqual(len(data), 4) diff --git a/tests/test-data/encrypted/autostart-encrypt-standardpassword.xls b/tests/test-data/encrypted/autostart-encrypt-standardpassword.xls new file mode 100755 index 0000000..65c2ac7 --- /dev/null +++ b/tests/test-data/encrypted/autostart-encrypt-standardpassword.xls diff --git a/tests/test_utils/__init__.py b/tests/test_utils/__init__.py index 5eda62a..16281fe 100644 --- a/tests/test_utils/__init__.py +++ b/tests/test_utils/__init__.py @@ -1,10 +1 @@ -from os.path import dirname, join, abspath - -# Base dir of project, contains subdirs "tests" and "oletools" and README.md -PROJECT_ROOT = dirname(dirname(dirname(abspath(__file__)))) - -# Directory with test data, independent of current working directory -DATA_BASE_DIR = join(PROJECT_ROOT, 'tests', 'test-data') - -# Directory with source code -SOURCE_BASE_DIR = join(PROJECT_ROOT, 'oletools') +from .utils import * diff --git a/tests/test_utils/utils.py b/tests/test_utils/utils.py new file mode 100644 index 0000000..2b27131 --- /dev/null +++ b/tests/test_utils/utils.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 + +"""Utils generally useful for unittests.""" + +import sys +import os +from os.path import dirname, join, abspath +from subprocess import check_output, PIPE, STDOUT, CalledProcessError + + +# Base dir of project, contains subdirs "tests" and "oletools" and README.md +PROJECT_ROOT = dirname(dirname(dirname(abspath(__file__)))) + +# Directory with test data, independent of current working directory +DATA_BASE_DIR = join(PROJECT_ROOT, 'tests', 'test-data') + +# Directory with source code +SOURCE_BASE_DIR = join(PROJECT_ROOT, 'oletools') + + +def call_and_capture(module, args=None, accept_nonzero_exit=False, + exclude_stderr=False): + """ + Run module as script, capturing and returning output and return code. + + This is the best way to capture a module's stdout and stderr; trying to + modify sys.stdout/sys.stderr to StringIO-Buffers frequently causes trouble. + + Only drawback sofar: stdout and stderr are merged into one (which is + what users see on their shell as well). When testing for json-compatible + output you should `exclude_stderr` to `False` since logging ignores stderr, + so unforseen warnings (e.g. issued by pypy) would mess up your json. + + :param str module: name of module to test, e.g. `olevba` + :param args: arguments for module's main function + :param bool fail_nonzero: Raise error if command returns non-0 return code + :param bool exclude_stderr: Exclude output to `sys.stderr` from output + (e.g. if parsing output through json) + :returns: ret_code, output + :rtype: int, str + """ + # create a PYTHONPATH environment var to prefer our current code + env = os.environ.copy() + try: + env['PYTHONPATH'] = SOURCE_BASE_DIR + os.pathsep + \ + os.environ['PYTHONPATH'] + except KeyError: + env['PYTHONPATH'] = SOURCE_BASE_DIR + + # ensure args is a tuple + my_args = tuple(args) if args else () + + ret_code = -1 + try: + output = check_output((sys.executable, '-m', module) + my_args, + universal_newlines=True, env=env, + stderr=PIPE if exclude_stderr else STDOUT) + ret_code = 0 + + except CalledProcessError as err: + if accept_nonzero_exit: + ret_code = err.returncode + output = err.output + else: + print(err.output) + raise + + return output, ret_code