Commit d4b8e77b767ae034296a1b2772dd0492fcb74e89

Authored by Philippe Lagadec
Committed by GitHub
2 parents 281a2e3c be6bdfa6

Merge pull request #208 from christian-intra2net/doc-in-msodde

Doc in msodde (and json output)
oletools/msodde.py
@@ -6,7 +6,7 @@ msodde is a script to parse MS Office documents @@ -6,7 +6,7 @@ msodde is a script to parse MS Office documents
6 (e.g. Word, Excel), to detect and extract DDE links. 6 (e.g. Word, Excel), to detect and extract DDE links.
7 7
8 Supported formats: 8 Supported formats:
9 -- Word 2007+ (.docx, .dotx, .docm, .dotm) 9 +- Word 97-2003 (.doc, .dot), Word 2007+ (.docx, .dotx, .docm, .dotm)
10 10
11 Author: Philippe Lagadec - http://www.decalage.info 11 Author: Philippe Lagadec - http://www.decalage.info
12 License: BSD, see source code or documentation 12 License: BSD, see source code or documentation
@@ -46,8 +46,10 @@ from __future__ import print_function @@ -46,8 +46,10 @@ from __future__ import print_function
46 # CHANGELOG: 46 # CHANGELOG:
47 # 2017-10-18 v0.52 PL: - first version 47 # 2017-10-18 v0.52 PL: - first version
48 # 2017-10-20 PL: - fixed issue #202 (handling empty xml tags) 48 # 2017-10-20 PL: - fixed issue #202 (handling empty xml tags)
  49 +# 2017-10-25 CH: - add json output
  50 +# 2017-10-25 CH: - parse doc
49 51
50 -__version__ = '0.52dev2' 52 +__version__ = '0.52dev3'
51 53
52 #------------------------------------------------------------------------------ 54 #------------------------------------------------------------------------------
53 # TODO: detect beginning/end of fields, to separate each field 55 # TODO: detect beginning/end of fields, to separate each field
@@ -71,7 +73,14 @@ import argparse @@ -71,7 +73,14 @@ import argparse
71 import zipfile 73 import zipfile
72 import os 74 import os
73 import sys 75 import sys
  76 +import json
74 77
  78 +from oletools.thirdparty import olefile
  79 +
  80 +# === PYTHON 2+3 SUPPORT ======================================================
  81 +
  82 +if sys.version_info[0] >= 3:
  83 + unichr = chr
75 84
76 # === CONSTANTS ============================================================== 85 # === CONSTANTS ==============================================================
77 86
@@ -83,23 +92,195 @@ TAG_W_INSTRTEXT = '{%s}instrText' % NS_WORD @@ -83,23 +92,195 @@ TAG_W_INSTRTEXT = '{%s}instrText' % NS_WORD
83 TAG_W_FLDSIMPLE = '{%s}fldSimple' % NS_WORD 92 TAG_W_FLDSIMPLE = '{%s}fldSimple' % NS_WORD
84 TAG_W_INSTRATTR= '{%s}instr' % NS_WORD 93 TAG_W_INSTRATTR= '{%s}instr' % NS_WORD
85 94
86 -# === FUNCTIONS ============================================================== 95 +# banner to be printed at program start
  96 +BANNER = """msodde %s - http://decalage.info/python/oletools
  97 +THIS IS WORK IN PROGRESS - Check updates regularly!
  98 +Please report any issue at https://github.com/decalage2/oletools/issues
  99 +""" % __version__
87 100
88 -def process_args():  
89 - parser = argparse.ArgumentParser(description='A python tool to detect and extract DDE links in MS Office files')  
90 - parser.add_argument("filepath", help="path of the file to be analyzed") 101 +BANNER_JSON = dict(type='meta', version=__version__, name='msodde',
  102 + link='http://decalage.info/python/oletools',
  103 + message='THIS IS WORK IN PROGRESS - Check updates regularly! '
  104 + 'Please report any issue at '
  105 + 'https://github.com/decalage2/oletools/issues')
91 106
92 - args = parser.parse_args() 107 +# === ARGUMENT PARSING =======================================================
93 108
94 - if not os.path.exists(args.filepath):  
95 - print('File {} does not exist.'.format(args.filepath))  
class ArgParserWithBanner(argparse.ArgumentParser):
    """ Argument parser that shows the program banner ahead of any error """

    def error(self, message):
        """ print BANNER, then hand over to the regular argparse handling

        message is forwarded unchanged to ArgumentParser.error
        """
        print(BANNER)
        parent = super(ArgParserWithBanner, self)
        parent.error(message)
97 114
98 - return args  
99 115
def existing_file(filename):
    """ called by argument parser to validate the given file path

    Returns filename unchanged if it exists and is not a directory.

    Raises argparse.ArgumentTypeError if the path does not exist or names a
    directory, so the parser can report a regular usage error.
    """
    if not os.path.exists(filename):
        raise argparse.ArgumentTypeError('File {0} does not exist.'
                                         .format(filename))
    # a directory passes the existence check but cannot be read as a
    # document; reject it here instead of failing later when it is opened
    if os.path.isdir(filename):
        raise argparse.ArgumentTypeError('{0} is a directory, not a file.'
                                         .format(filename))
    return filename
100 122
101 123
def process_args(cmd_line_args=None):
    """ build the command line parser and parse the arguments

    cmd_line_args is handed to parse_args as-is; the result of parse_args is
    returned, with attributes `filepath` and `json`.
    """
    arg_parser = ArgParserWithBanner(
        description='A python tool to detect and extract DDE links in '
                    'MS Office files')

    # positional argument: must name an existing file (see existing_file)
    arg_parser.add_argument("filepath", metavar='FILE', type=existing_file,
                            help="path of the file to be analyzed")

    # optional switch selecting json output
    arg_parser.add_argument("--json", '-j', action='store_true',
                            help="Output in json format")

    parsed = arg_parser.parse_args(cmd_line_args)
    return parsed
  132 +
  133 +
  134 +# === FUNCTIONS ==============================================================
  135 +
  136 +# from [MS-DOC], section 2.8.25 (PlcFld):
  137 +# A field consists of two parts: field instructions and, optionally, a result. All fields MUST begin with
  138 +# Unicode character 0x0013 with sprmCFSpec applied with a value of 1. This is the field begin
  139 +# character. All fields MUST end with a Unicode character 0x0015 with sprmCFSpec applied with a value
  140 +# of 1. This is the field end character. If the field has a result, then there MUST be a Unicode character
  141 +# 0x0014 with sprmCFSpec applied with a value of 1 somewhere between the field begin character and
  142 +# the field end character. This is the field separator. The field result is the content between the field
  143 +# separator and the field end character. The field instructions are the content between the field begin
  144 +# character and the field separator, if one is present, or between the field begin character and the field
  145 +# end character if no separator is present. The field begin character, field end character, and field
  146 +# separator are collectively referred to as field characters.
  147 +
  148 +
def process_ole_field(data):
    """ return the field instructions if they start with DDE

    Leading whitespace is skipped and case is ignored for the comparison.
    Expects unicode input; returns the input unchanged for a DDE field and an
    empty unicode string otherwise.
    """
    normalized = data.lstrip().lower()
    is_dde = normalized.startswith(u'dde')
    return data if is_dde else u''
  160 +
  161 +
  162 +OLE_FIELD_START = 0x13
  163 +OLE_FIELD_SEP = 0x14
  164 +OLE_FIELD_END = 0x15
  165 +OLE_FIELD_MAX_SIZE = 1000 # max field size to analyze, rest is ignored
  166 +
  167 +
def process_ole_stream(stream):
    """ find dde links in single ole stream

    Scans the raw bytes of the stream for fields delimited by the field
    characters OLE_FIELD_START, OLE_FIELD_SEP and OLE_FIELD_END. Only the
    field instructions (everything between the start character and the
    separator or, without separator, the end character) are collected; at
    most OLE_FIELD_MAX_SIZE characters per field are kept, the rest is
    ignored. Each complete field is handed to process_ole_field.

    stream: binary file-like object, only its read(size) method is used

    Returns a unicode string: the concatenated results of process_ole_field
    for all complete fields (empty if no field starts with DDE).
    """
    read_size = 4096          # read in chunks instead of byte by byte
    in_field = False          # start char seen, end char not yet seen
    in_result = False         # separator seen: now inside the field result
    instructions = []         # characters of the current field instructions
    result_parts = []

    while True:
        chunk = stream.read(read_size)
        if not chunk:
            break

        # bytearray yields integers on python 2 and python 3 alike
        for char in bytearray(chunk):
            if char == OLE_FIELD_START:
                # a new start inside an unfinished field dismisses
                # whatever has been collected so far
                in_field = True
                in_result = False
                instructions = []
            elif not in_field:
                continue
            elif char == OLE_FIELD_SEP:
                in_result = True
            elif char == OLE_FIELD_END:
                # have complete field now, process it
                field = u''.join(instructions)
                result_parts.append(process_ole_field(field))

                # re-set variables for next field
                in_field = False
                in_result = False
                instructions = []
            elif in_result or len(instructions) >= OLE_FIELD_MAX_SIZE:
                # field result and overlong instructions are not analyzed
                continue
            elif char < 128:
                # treating a raw byte as a character here. Not clean but all
                # we do later is check for the ascii-sequence 'DDE'
                instructions.append(unichr(char))
            else:
                instructions.append(u'?')

    # copy behaviour of process_xml: Just concatenate unicode strings
    return u''.join(result_parts)
  237 +
  238 +
def process_ole_storage(ole):
    """ find dde links in all streams of an open ole file

    ole: open ole file object; its listdir and openstream methods are used

    Returns a list with one non-empty unicode string (as returned by
    process_ole_stream) for every stream that contains dde links.
    """
    results = []

    # listdir returns the full paths of nested entries as well, so there is
    # no need to recurse into storages; storages hold no data of their own
    for entry in ole.listdir(streams=True, storages=False):
        #print('Checking stream {0}'.format(entry))
        stream = ole.openstream(entry)
        try:
            links = process_ole_stream(stream)
        finally:
            stream.close()
        if links:
            results.append(links)
    return results
  268 +
  269 +
def process_ole(filepath):
    """ find dde links in ole file

    like process_xml, returns a concatenated unicode string of dde links or
    empty if none were found. dde-links will still begin with the dde[auto]
    key word (possibly after some whitespace). Results from different streams
    are separated by newlines.
    """
    #print('Looks like ole')
    ole = olefile.OleFileIO(filepath, path_encoding=None)
    try:
        text_parts = process_ole_storage(ole)
    finally:
        # release the underlying file even if parsing fails
        ole.close()
    return u'\n'.join(text_parts)
  281 +
  282 +
  283 +def process_xml(filepath):
103 z = zipfile.ZipFile(filepath) 284 z = zipfile.ZipFile(filepath)
104 data = z.read('word/document.xml') 285 data = z.read('word/document.xml')
105 z.close() 286 z.close()
@@ -117,26 +298,63 @@ def process_file(filepath): @@ -117,26 +298,63 @@ def process_file(filepath):
117 # concatenate the attribute of the field, if present: 298 # concatenate the attribute of the field, if present:
118 if elem.attrib is not None: 299 if elem.attrib is not None:
119 text += elem.attrib[TAG_W_INSTRATTR] 300 text += elem.attrib[TAG_W_INSTRATTR]
120 -  
121 301
122 return text 302 return text
123 303
124 304
def process_file(filepath):
    """ dispatch to process_ole for ole files, to process_xml otherwise

    returns whatever the selected function returns
    """
    if not olefile.isOleFile(filepath):
        return process_xml(filepath)
    return process_ole(filepath)
  311 +
126 312
127 -def main():  
128 - # print banner with version  
129 - print ('msodde %s - http://decalage.info/python/oletools' % __version__)  
130 - print ('THIS IS WORK IN PROGRESS - Check updates regularly!')  
131 - print ('Please report any issue at https://github.com/decalage2/oletools/issues')  
132 - print ('') 313 +#=== MAIN =================================================================
133 314
134 - args = process_args()  
135 - print('Opening file: %s' % args.filepath)  
136 - text = process_file(args.filepath)  
137 - print ('DDE Links:')  
def main(cmd_line_args=None):
    """ Main function, called if this file is called as a script

    Optional argument: command line arguments to be forwarded to ArgumentParser
    in process_args. Per default (cmd_line_args=None), sys.argv is used. Option
    mainly added for unit-testing

    Returns 0 if the file could be processed, 1 otherwise. In text mode an
    exception from process_file is re-raised; in json mode it is reported as
    an entry of type 'error' in the output instead.
    """
    args = process_args(cmd_line_args)
    as_json = args.json

    if as_json:
        report = [BANNER_JSON]
    else:
        # print banner with version
        print(BANNER)
        print('Opening file: %s' % args.filepath)

    text = ''
    return_code = 1
    try:
        text = process_file(args.filepath)
    except Exception as exc:
        if not as_json:
            raise
        report.append(dict(type='error', error=type(exc).__name__,
                           message=str(exc)))
    else:
        return_code = 0

    if not as_json:
        print('DDE Links:')
        print(text)
        return return_code

    report.extend(dict(type='dde-link', link=line.strip())
                  for line in text.splitlines())
    json.dump(report, sys.stdout, check_circular=False, indent=4)
    print()   # add a newline after closing "]"
    return return_code
139 357
140 358
141 if __name__ == '__main__': 359 if __name__ == '__main__':
142 - main() 360 + sys.exit(main())
tests/howto_add_unittests.txt 0 → 100644
  1 +Howto: Add unittests
  2 +--------------------
  3 +
  4 +For helping python's unittest to discover your tests, do the
  5 +following:
  6 +
  7 +* create a subdirectory within oletools/tests/
  8 + - The directory name must be a valid python package name,
  9 + so must not include '-', for example
  10 + - e.g. oletools/tests/my_feature
  11 +
  12 +* Create a __init__.py inside that directory
  13 + - can be empty but must be there
  14 +
  15 +* Copy the unittest_template.py into your test directory
  16 +
  17 +* Rename your copy of the template to fit its purpose
  18 + - file name must start with 'test' and end with '.py'
  19 + - e.g. oletools/tests/my_feature/test_bla.py
  20 +
  21 +* Create python code inside that directory
  22 + - class names must start with Test and must be subclasses
  23 + of unittest.TestCase
  24 + - test functions inside your test cases must start with test_
  25 + - see unittest_template.py for examples
  26 +
  27 +* If your unit test requires test files, put them into a subdir
  28 + of oletools/tests/test-data with some name that clarifies what
  29 + tests it belongs to
  30 + - e.g. oletools/tests/test-data/my_feature/example.doc
  31 + - Do not add files with actual evil malware macros! Only harmless
  32 + test data!
  33 +
  34 +* Test that unittests work by running from the oletools base dir:
  35 + python -m unittest discover -v
  36 +
  37 +* Re-test with python2 and python3 (if possible)
tests/msodde_doc/__init__.py 0 → 100644
tests/msodde_doc/test_basic.py 0 → 100644
  1 +""" Test some basic behaviour of msodde.py
  2 +
  3 +Ensure that
  4 +- doc and docx are read without error
  5 +- garbage returns error return status
  6 +- dde-links are found where appropriate
  7 +"""
  8 +
  9 +from __future__ import print_function
  10 +
  11 +import unittest
  12 +from oletools import msodde
  13 +import shlex
  14 +from os.path import join, dirname, normpath
  15 +import sys
  16 +
  17 +# python 2/3 version conflict:
  18 +if sys.version_info.major <= 2:
  19 + from StringIO import StringIO
  20 + #from io import BytesIO as StringIO - try if print() gives UnicodeError
  21 +else:
  22 + from io import StringIO
  23 +
  24 +
  25 +# base directory for test input
  26 +BASE_DIR = normpath(join(dirname(__file__), '..', 'test-data'))
  27 +
  28 +
class TestReturnCode(unittest.TestCase):
    """ check exit status of msodde.main for valid and invalid input """

    def test_valid_doc(self):
        """ check that a valid doc file leads to 0 exit status """
        self.do_test_validity(
            [join(BASE_DIR, 'msodde-doc', 'test_document.doc')])

    def test_valid_docx(self):
        """ check that a valid docx file leads to 0 exit status """
        self.do_test_validity(
            [join(BASE_DIR, 'msodde-doc', 'test_document.docx')])

    def test_invalid_none(self):
        """ check that no file argument leads to non-zero exit status """
        self.do_test_validity([], True)

    def test_invalid_empty(self):
        """ check that empty file argument leads to non-zero exit status """
        self.do_test_validity([join(BASE_DIR, 'basic', 'empty')], True)

    def test_invalid_text(self):
        """ check that text file argument leads to non-zero exit status """
        self.do_test_validity([join(BASE_DIR, 'basic', 'text')], True)

    def do_test_validity(self, args, expect_error=False):
        """ helper: run msodde.main and compare outcome with expectation

        args is either a ready-made list of command line arguments or a
        single command line string. A list is preferred: splitting a string
        with shlex breaks paths that contain whitespace or backslashes.

        An exception, a call to sys.exit with non-zero code and a non-zero
        return value of main all count as error.
        """
        if not isinstance(args, (list, tuple)):
            args = shlex.split(args)
        return_code = -1
        have_exception = False
        try:
            return_code = msodde.main(args)
        except SystemExit as se:    # sys.exit() was called
            return_code = 0 if se.code is None else se.code
        except Exception:
            have_exception = True

        self.assertEqual(expect_error, have_exception or (return_code != 0))
  67 +
  68 +
class OutputCapture(object):
    """ context manager that captures stdout

    While the context is active, sys.stdout is replaced by an in-memory
    buffer. On exit, the stream that was active on entry is restored (not
    sys.__stdout__, so nesting and test runners that replace stdout keep
    working). Iterating over the object yields the captured lines.
    """

    def __init__(self):
        self.output = StringIO()    # in py2, this actually is BytesIO
        self._saved_stdout = None   # stream to restore on exit

    def __enter__(self):
        self._saved_stdout = sys.stdout
        sys.stdout = self.output
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        sys.stdout = self._saved_stdout    # re-set to what was active before

        if exc_type:   # there has been an error
            print('Got error during output capture!')
            print('Print captured output and re-raise:')
            for line in self.output.getvalue().splitlines():
                print(line.rstrip())  # print output before re-raising
        # returning None lets the exception propagate

    def __iter__(self):
        for line in self.output.getvalue().splitlines():
            yield line.rstrip()   # remove newline at end of line
  91 +
  92 +
class TestDdeInDoc(unittest.TestCase):
    """ check that msodde reports dde links for doc files on stdout """

    def _last_output_line(self, filename):
        """ helper: run msodde.main on a test file, return last stdout line

        filename is relative to the msodde-doc test data directory. Fails the
        test if nothing was written to stdout at all.
        """
        with OutputCapture() as capturer:
            msodde.main([join(BASE_DIR, 'msodde-doc', filename)])

        lines = list(capturer)
        self.assertTrue(lines, 'msodde.main did not print anything')
        return lines[-1]

    def test_with_dde(self):
        """ check that dde links appear on stdout """
        last_line = self._last_output_line('dde-test.doc')
        self.assertNotEqual(len(last_line.strip()), 0)

    def test_no_dde(self):
        """ check that no dde links appear on stdout """
        last_line = self._last_output_line('test_document.doc')
        self.assertEqual(last_line.strip(), '')
  116 +
  117 +
  118 +if __name__ == '__main__':
  119 + unittest.main()
tests/test-data/basic/empty 0 → 100644
tests/test-data/basic/text 0 → 100644
  1 +bla
tests/test-data/msodde-doc/dde-test.doc 0 → 100644
No preview for this file type
tests/test-data/msodde-doc/test_document.doc 0 → 100644
No preview for this file type
tests/test-data/msodde-doc/test_document.docx 0 → 100644
No preview for this file type
tests/unittest_template.py 0 → 100644
  1 +""" Test my new feature
  2 +
  3 +Some more info if you want
  4 +
  5 +Should work with python2 and python3!
  6 +"""
  7 +
  8 +import unittest
  9 +
  10 +# if you need data from oletools/tests/test-data/DIR/, uncomment these lines:
  11 +#from os.path import join, dirname, normpath
  12 +#Directory with test data, independent of current working directory
  13 +#DATA_DIR = normpath(join(dirname(__file__), '..', 'test-data', 'DIR'))
  14 +
  15 +
class TestMyFeature(unittest.TestCase):
    """ Skeleton test case for my cool new feature """

    def test_this(self):
        """ verify that this works """
        # your code here

    def test_that(self):
        """ verify that that works, too """
        # your code here

    def helper_function(self, filename):
        """ shared code for the test functions, avoids copy-and-paste

        unittest does not run this directly (name does not start with
        test_); call it from your own test functions """
        # your code here
        # e.g.: msodde.main([join(DATA_DIR, filename)])
  33 +
  34 +
  35 +# just in case somebody calls this file as a script
  36 +if __name__ == '__main__':
  37 + unittest.main()