diff --git a/oletools/msodde.py b/oletools/msodde.py index fd121b2..a425513 100644 --- a/oletools/msodde.py +++ b/oletools/msodde.py @@ -6,7 +6,7 @@ msodde is a script to parse MS Office documents (e.g. Word, Excel), to detect and extract DDE links. Supported formats: -- Word 2007+ (.docx, .dotx, .docm, .dotm) +- Word 97-2003 (.doc, .dot), Word 2007+ (.docx, .dotx, .docm, .dotm) Author: Philippe Lagadec - http://www.decalage.info License: BSD, see source code or documentation @@ -46,8 +46,10 @@ from __future__ import print_function # CHANGELOG: # 2017-10-18 v0.52 PL: - first version # 2017-10-20 PL: - fixed issue #202 (handling empty xml tags) +# 2017-10-25 CH: - add json output +# 2017-10-25 CH: - parse doc -__version__ = '0.52dev2' +__version__ = '0.52dev3' #------------------------------------------------------------------------------ # TODO: detect beginning/end of fields, to separate each field @@ -71,7 +73,14 @@ import argparse import zipfile import os import sys +import json +from oletools.thirdparty import olefile + +# === PYTHON 2+3 SUPPORT ====================================================== + +if sys.version_info[0] >= 3: + unichr = chr # === CONSTANTS ============================================================== @@ -83,23 +92,195 @@ TAG_W_INSTRTEXT = '{%s}instrText' % NS_WORD TAG_W_FLDSIMPLE = '{%s}fldSimple' % NS_WORD TAG_W_INSTRATTR= '{%s}instr' % NS_WORD -# === FUNCTIONS ============================================================== +# banner to be printed at program start +BANNER = """msodde %s - http://decalage.info/python/oletools +THIS IS WORK IN PROGRESS - Check updates regularly! 
+Please report any issue at https://github.com/decalage2/oletools/issues +""" % __version__ -def process_args(): - parser = argparse.ArgumentParser(description='A python tool to detect and extract DDE links in MS Office files') - parser.add_argument("filepath", help="path of the file to be analyzed") +BANNER_JSON = dict(type='meta', version=__version__, name='msodde', + link='http://decalage.info/python/oletools', + message='THIS IS WORK IN PROGRESS - Check updates regularly! ' + 'Please report any issue at ' + 'https://github.com/decalage2/oletools/issues') - args = parser.parse_args() +# === ARGUMENT PARSING ======================================================= - if not os.path.exists(args.filepath): - print('File {} does not exist.'.format(args.filepath)) - sys.exit(1) +class ArgParserWithBanner(argparse.ArgumentParser): + """ Print banner before showing any error """ + def error(self, message): + print(BANNER) + super(ArgParserWithBanner, self).error(message) - return args +def existing_file(filename): + """ called by argument parser to see whether given file exists """ + if not os.path.exists(filename): + raise argparse.ArgumentTypeError('File {0} does not exist.' + .format(filename)) + return filename -def process_file(filepath): +def process_args(cmd_line_args=None): + parser = ArgParserWithBanner(description='A python tool to detect and extract DDE links in MS Office files') + parser.add_argument("filepath", help="path of the file to be analyzed", + type=existing_file, metavar='FILE') + parser.add_argument("--json", '-j', action='store_true', + help="Output in json format") + + return parser.parse_args(cmd_line_args) + + +# === FUNCTIONS ============================================================== + +# from [MS-DOC], section 2.8.25 (PlcFld): +# A field consists of two parts: field instructions and, optionally, a result. All fields MUST begin with +# Unicode character 0x0013 with sprmCFSpec applied with a value of 1. This is the field begin +# character. 
All fields MUST end with a Unicode character 0x0015 with sprmCFSpec applied with a value +# of 1. This is the field end character. If the field has a result, then there MUST be a Unicode character +# 0x0014 with sprmCFSpec applied with a value of 1 somewhere between the field begin character and +# the field end character. This is the field separator. The field result is the content between the field +# separator and the field end character. The field instructions are the content between the field begin +# character and the field separator, if one is present, or between the field begin character and the field +# end character if no separator is present. The field begin character, field end character, and field +# separator are collectively referred to as field characters. + + +def process_ole_field(data): + """ check if field instructions start with DDE + + expects unicode input, returns unicode output (empty if not dde) """ + #print('processing field \'{0}\''.format(data)) + + if data.lstrip().lower().startswith(u'dde'): + #print('--> is DDE!') + return data + else: + return u'' + + +OLE_FIELD_START = 0x13 +OLE_FIELD_SEP = 0x14 +OLE_FIELD_END = 0x15 +OLE_FIELD_MAX_SIZE = 1000 # max field size to analyze, rest is ignored + + +def process_ole_stream(stream): + """ find dde links in single ole stream + + since ole file stream are subclasses of io.BytesIO, they are buffered, so + reading char-wise is not that bad performanc-wise """ + + have_start = False + have_sep = False + field_contents = None + result_parts = [] + max_size_exceeded = False + idx = -1 + while True: + idx += 1 + char = stream.read(1) # loop over every single byte + if len(char) == 0: + break + else: + char = ord(char) + + if char == OLE_FIELD_START: + #print('DEBUG: have start at {}'.format(idx)) + #if have_start: + # print("DEBUG: dismissing previous contents of length {}" + # .format(len(field_contents))) + have_start = True + have_sep = False + max_size_exceeded = False + field_contents = u'' + 
continue + elif not have_start: + continue + + # now we are after start char but not at end yet + if char == OLE_FIELD_SEP: + #print('DEBUG: have sep at {}'.format(idx)) + have_sep = True + elif char == OLE_FIELD_END: + #print('DEBUG: have end at {}'.format(idx)) + + # have complete field now, process it + result_parts.append(process_ole_field(field_contents)) + + # re-set variables for next field + have_start = False + have_sep = False + field_contents = None + elif not have_sep: + # check that array does not get too long by accident + if max_size_exceeded: + pass + elif len(field_contents) > OLE_FIELD_MAX_SIZE: + #print('DEBUG: exceeded max size') + max_size_exceeded = True + + # appending a raw byte to a unicode string here. Not clean but + # all we do later is check for the ascii-sequence 'DDE' later... + elif char < 128: + field_contents += unichr(char) + #print('DEBUG: at idx {:4d}: add byte {} ({})' + # .format(idx, unichr(char), char)) + else: + field_contents += u'?' + #print('DEBUG: at idx {:4d}: add byte ? ({})' + # .format(idx, char)) + #print('\nstream len = {}'.format(idx)) + + # copy behaviour of process_xml: Just concatenate unicode strings + return u''.join(result_parts) + + +def process_ole_storage(ole): + """ process a "directory" inside an ole stream """ + results = [] + for st in ole.listdir(streams=True, storages=True): + st_type = ole.get_type(st) + if st_type == olefile.STGTY_STREAM: # a stream + stream = None + links = '' + try: + stream = ole.openstream(st) + #print('Checking stream {0}'.format(st)) + links = process_ole_stream(stream) + except: + raise + finally: + if stream: + stream.close() + if links: + results.append(links) + elif st_type == olefile.STGTY_STORAGE: # a storage + #print('Checking storage {0}'.format(st)) + links = process_ole_storage(st) + if links: + results.extend(links) + else: + #print('Warning: unexpected type {0} for entry {1}. 
Ignore it' + # .format(st_type, st)) + continue + return results + + +def process_ole(filepath): + """ find dde links in ole file + + like process_xml, returns a concatenated unicode string of dde links or + empty if none were found. dde-links will still being with the dde[auto] key + word (possibly after some whitespace) + """ + #print('Looks like ole') + ole = olefile.OleFileIO(filepath, path_encoding=None) + text_parts = process_ole_storage(ole) + return u'\n'.join(text_parts) + + +def process_xml(filepath): z = zipfile.ZipFile(filepath) data = z.read('word/document.xml') z.close() @@ -117,26 +298,63 @@ def process_file(filepath): # concatenate the attribute of the field, if present: if elem.attrib is not None: text += elem.attrib[TAG_W_INSTRATTR] - return text -#=== MAIN ================================================================= +def process_file(filepath): + """ decides to either call process_xml or process_ole """ + if olefile.isOleFile(filepath): + return process_ole(filepath) + else: + return process_xml(filepath) + -def main(): - # print banner with version - print ('msodde %s - http://decalage.info/python/oletools' % __version__) - print ('THIS IS WORK IN PROGRESS - Check updates regularly!') - print ('Please report any issue at https://github.com/decalage2/oletools/issues') - print ('') +#=== MAIN ================================================================= - args = process_args() - print('Opening file: %s' % args.filepath) - text = process_file(args.filepath) - print ('DDE Links:') - print(text) +def main(cmd_line_args=None): + """ Main function, called if this file is called as a script + + Optional argument: command line arguments to be forwarded to ArgumentParser + in process_args. Per default (cmd_line_args=None), sys.argv is used. 
Option + mainly added for unit-testing + """ + args = process_args(cmd_line_args) + + if args.json: + jout = [] + jout.append(BANNER_JSON) + else: + # print banner with version + print(BANNER) + + if not args.json: + print('Opening file: %s' % args.filepath) + + text = '' + return_code = 1 + try: + text = process_file(args.filepath) + return_code = 0 + except Exception as exc: + if args.json: + jout.append(dict(type='error', error=type(exc).__name__, + message=str(exc))) # strange: str(exc) is enclosed in "" + else: + raise + + if args.json: + for line in text.splitlines(): + jout.append(dict(type='dde-link', link=line.strip())) + json.dump(jout, sys.stdout, check_circular=False, indent=4) + print() # add a newline after closing "]" + return return_code # required if we catch an exception in json-mode + else: + print ('DDE Links:') + print(text) + + return return_code if __name__ == '__main__': - main() + sys.exit(main()) diff --git a/tests/howto_add_unittests.txt b/tests/howto_add_unittests.txt new file mode 100644 index 0000000..3178741 --- /dev/null +++ b/tests/howto_add_unittests.txt @@ -0,0 +1,37 @@ +Howto: Add unittests +-------------------- + +For helping python's unittest to discover your tests, do the +following: + +* create a subdirectory within oletools/tests/ + - The directory name must be a valid python package name, + so must not include '-', for example + - e.g. oletools/tests/my_feature + +* Create a __init__.py inside that directory + - can be empty but must be there + +* Copy the unittest_template.py into your test directory + +* Rename your copy of the template to fit its purpose + - file name must start with 'test' and end with '.py' + - e.g. 
oletools/tests/my_feature/test_bla.py + +* Create python code inside that directory + - class names must start with Test and must be subclasses + of unittest.TestCase + - test functions inside your test cases must start with test_ + - see unittest_template.py for examples + +* If your unit test requires test files, put them into a subdir + of oletools/tests/test-data with some name that clarifies what + tests it belongs to + - e.g. oletools/tests/test-data/my_feature/example.doc + - Do not add files with actual evil malware macros! Only harmless + test data! + +* Test that unittests work by running from the oletools base dir: + python -m unittest discover -v + +* Re-test with python2 and python3 (if possible) diff --git a/tests/msodde_doc/__init__.py b/tests/msodde_doc/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/msodde_doc/__init__.py diff --git a/tests/msodde_doc/test_basic.py b/tests/msodde_doc/test_basic.py new file mode 100644 index 0000000..0d366b1 --- /dev/null +++ b/tests/msodde_doc/test_basic.py @@ -0,0 +1,119 @@ +""" Test some basic behaviour of msodde.py + +Ensure that +- doc and docx are read without error +- garbage returns error return status +- dde-links are found where appropriate +""" + +from __future__ import print_function + +import unittest +from oletools import msodde +import shlex +from os.path import join, dirname, normpath +import sys + +# python 2/3 version conflict: +if sys.version_info.major <= 2: + from StringIO import StringIO + #from io import BytesIO as StringIO - try if print() gives UnicodeError +else: + from io import StringIO + + +# base directory for test input +BASE_DIR = normpath(join(dirname(__file__), '..', 'test-data')) + + +class TestReturnCode(unittest.TestCase): + + def test_valid_doc(self): + """ check that a valid doc file leads to 0 exit status """ + print(join(BASE_DIR, 'msodde-doc/test_document.doc')) + self.do_test_validity(join(BASE_DIR, 'msodde-doc/test_document.doc')) + + 
def test_valid_docx(self): + """ check that a valid docx file leads to 0 exit status """ + self.do_test_validity(join(BASE_DIR, 'msodde-doc/test_document.docx')) + + def test_invalid_none(self): + """ check that no file argument leads to non-zero exit status """ + self.do_test_validity('', True) + + def test_invalid_empty(self): + """ check that empty file argument leads to non-zero exit status """ + self.do_test_validity(join(BASE_DIR, 'basic/empty'), True) + + def test_invalid_text(self): + """ check that text file argument leads to non-zero exit status """ + self.do_test_validity(join(BASE_DIR, 'basic/text'), True) + + def do_test_validity(self, args, expect_error=False): + """ helper for test_valid_doc[x] """ + args = shlex.split(args) + return_code = -1 + have_exception = False + try: + return_code = msodde.main(args) + except Exception: + have_exception = True + except SystemExit as se: # sys.exit() was called + return_code = se.code + if se.code is None: + return_code = 0 + + self.assertEqual(expect_error, have_exception or (return_code != 0)) + + +class OutputCapture: + """ context manager that captures stdout """ + + def __init__(self): + self.output = StringIO() # in py2, this actually is BytesIO + + def __enter__(self): + sys.stdout = self.output + return self + + def __exit__(self, exc_type, exc_value, traceback): + sys.stdout = sys.__stdout__ # re-set to original + + if exc_type: # there has been an error + print('Got error during output capture!') + print('Print captured output and re-raise:') + for line in self.output.getvalue().splitlines(): + print(line.rstrip()) # print output before re-raising + + def __iter__(self): + for line in self.output.getvalue().splitlines(): + yield line.rstrip() # remove newline at end of line + + +class TestDdeInDoc(unittest.TestCase): + + def test_with_dde(self): + """ check that dde links appear on stdout """ + with OutputCapture() as capturer: + msodde.main([join(BASE_DIR, 'msodde-doc', 'dde-test.doc')]) + + for 
line in capturer: + print(line) + pass # we just want to get the last line + + self.assertNotEqual(len(line.strip()), 0) + + def test_no_dde(self): + """ check that no dde links appear on stdout """ + with OutputCapture() as capturer: + msodde.main([join(BASE_DIR, 'msodde-doc', 'test_document.doc')]) + + for line in capturer: + print(line) + pass # we just want to get the last line + + self.assertEqual(line.strip(), '') + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test-data/basic/empty b/tests/test-data/basic/empty new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/test-data/basic/empty diff --git a/tests/test-data/basic/text b/tests/test-data/basic/text new file mode 100644 index 0000000..a7f8d9e --- /dev/null +++ b/tests/test-data/basic/text @@ -0,0 +1 @@ +bla diff --git a/tests/test-data/msodde-doc/dde-test.doc b/tests/test-data/msodde-doc/dde-test.doc new file mode 100644 index 0000000..da5562c --- /dev/null +++ b/tests/test-data/msodde-doc/dde-test.doc diff --git a/tests/test-data/msodde-doc/test_document.doc b/tests/test-data/msodde-doc/test_document.doc new file mode 100644 index 0000000..2c1768f --- /dev/null +++ b/tests/test-data/msodde-doc/test_document.doc diff --git a/tests/test-data/msodde-doc/test_document.docx b/tests/test-data/msodde-doc/test_document.docx new file mode 100644 index 0000000..4dd2265 --- /dev/null +++ b/tests/test-data/msodde-doc/test_document.docx diff --git a/tests/unittest_template.py b/tests/unittest_template.py new file mode 100644 index 0000000..a5c2cb6 --- /dev/null +++ b/tests/unittest_template.py @@ -0,0 +1,37 @@ +""" Test my new feature + +Some more info if you want + +Should work with python2 and python3! 
+""" + +import unittest + +# if you need data from oletools/test-data/DIR/, uncomment these lines: +#from os.path import join, dirname, normpath +#Directory with test data, independent of current working directory +#DATA_DIR = normpath(join(dirname(__file__), '..', 'test-data', 'DIR')) + + +class TestMyFeature(unittest.TestCase): + """ Tests my cool new feature """ + + def test_this(self): + """ check that this works """ + pass # your code here + + def test_that(self): + """ check that that also works """ + pass # your code here + + def helper_function(self, filename): + """ to be called from other test functions to avoid copy-and-paste + + this is not called by unittest directly, only from your functions """ + pass # your code here + # e.g.: msodde.main(join(DATA_DIR, filename)) + + +# just in case somebody calls this file as a script +if __name__ == '__main__': + unittest.main()