diff --git a/.travis.yml b/.travis.yml index de2b1f7..a0a8f50 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,5 +17,8 @@ matrix: - python: pypy - python: pypy3 +install: + - pip install msoffcrypto-tool + script: - python setup.py test diff --git a/oletools/common/errors.py b/oletools/common/errors.py index 4ee4cb1..d16cd55 100644 --- a/oletools/common/errors.py +++ b/oletools/common/errors.py @@ -4,10 +4,42 @@ Errors used in several tools to avoid duplication .. codeauthor:: Intra2net AG """ -class FileIsEncryptedError(ValueError): +class CryptoErrorBase(ValueError): + """Base class for crypto-based exceptions.""" + pass + + +class CryptoLibNotImported(CryptoErrorBase, ImportError): + """Exception thrown if msoffcrypto is needed but could not be imported.""" + + def __init__(self): + super(CryptoLibNotImported, self).__init__( + 'msoffcrypto-tools could not be imported') + + +class UnsupportedEncryptionError(CryptoErrorBase): """Exception thrown if file is encrypted and cannot deal with it.""" - # see also: same class in olevba[3] and record_base def __init__(self, filename=None): - super(FileIsEncryptedError, self).__init__( + super(UnsupportedEncryptionError, self).__init__( 'Office file {}is encrypted, not yet supported' .format('' if filename is None else filename + ' ')) + + +class WrongEncryptionPassword(CryptoErrorBase): + """Exception thrown if encryption could be handled but passwords wrong.""" + def __init__(self, filename=None): + super(WrongEncryptionPassword, self).__init__( + 'Given passwords could not decrypt office file{}' + .format('' if filename is None else ' ' + filename)) + + +class MaxCryptoNestingReached(CryptoErrorBase): + """ + Exception thrown if decryption is too deeply layered. 
+ + (...or decrypt code creates inf loop) + """ + def __init__(self, n_layers, filename=None): + super(MaxCryptoNestingReached, self).__init__( + 'Encountered more than {} layers of encryption for office file{}' + .format(n_layers, '' if filename is None else ' ' + filename)) diff --git a/oletools/crypto.py b/oletools/crypto.py new file mode 100644 index 0000000..c265bcf --- /dev/null +++ b/oletools/crypto.py @@ -0,0 +1,306 @@ +#!/usr/bin/env python +""" +crypto.py + +Module to be used by other scripts and modules in oletools, that provides +information on encryption in OLE files. + +Uses :py:mod:`msoffcrypto-tool` to decrypt if it is available. Otherwise +decryption will fail with an ImportError. + +Encryption/Write-Protection can be realized in many different ways. They range +from setting a single flag in an otherwise unprotected file to embedding a +regular file (e.g. xlsx) in an EncryptedStream inside an OLE file. That means +that (1) lots of bad things are accessible even if no encryption password +is known, and (2) even basic attributes like the file type can change by +decryption. Therefore I suggest the following general routine to deal with +potentially encrypted files:: + + def script_main_function(input_file, passwords, crypto_nesting=0, args): + '''Wrapper around main function to deal with encrypted files.''' + initial_stuff(input_file, args) + result = None + try: + result = do_your_thing_assuming_no_encryption(input_file) + if not crypto.is_encrypted(input_file): + return result + except Exception: + if not crypto.is_encrypted(input_file): + raise + # we reach this point only if file is encrypted + # check if this is an encrypted file in an encrypted file in an ... 
+ if crypto_nesting >= crypto.MAX_NESTING_DEPTH: + raise crypto.MaxCryptoNestingReached(crypto_nesting, filename) + decrypted_file = None + try: + decrypted_file = crypto.decrypt(input_file, passwords) + # might still be encrypted, so call this again recursively + result = script_main_function(decrypted_file, passwords, + crypto_nesting+1, args) + except Exception: + raise + finally: # clean up + try: # (maybe file was not yet created) + os.unlink(decrypted_file) + except Exception: + pass + +(Realized e.g. in :py:mod:`oletools.msodde`). +That means that caller code needs another wrapper around its main function. I +did try it another way first (a transparent on-demand unencrypt) but for the +above reasons I believe this is the better way. Also, non-top-level-code can +just assume that it works on unencrypted data and fail with an exception if +encrypted data makes its work impossible. No need to check `if is_encrypted()` +at the start of functions. + +.. seealso:: [MS-OFFCRYPTO] +.. seealso:: https://github.com/nolze/msoffcrypto-tool + +crypto is part of the python-oletools package: +http://www.decalage.info/python/oletools +""" + +# === LICENSE ================================================================= + +# crypto is copyright (c) 2014-2019 Philippe Lagadec (http://www.decalage.info) +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. 
+# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +# ----------------------------------------------------------------------------- +# CHANGELOG: +# 2019-02-14 v0.01 CH: - first version with encryption check from oleid + +__version__ = '0.01' + +import sys +import struct +import os +from os.path import splitext, isfile +from tempfile import mkstemp +import zipfile +from oletools.common.errors import CryptoErrorBase, WrongEncryptionPassword, \ + UnsupportedEncryptionError, MaxCryptoNestingReached, CryptoLibNotImported +from olefile import OleFileIO + +try: + import msoffcrypto +except ImportError: + msoffcrypto = None + + +#: if there is an encrypted file embedded in an encrypted file, +#: how deep down do we go +MAX_NESTING_DEPTH = 10 + + +def is_encrypted(some_file): + """ + Determine whether document contains encrypted content. + + This should return False for documents that are just write-protected or + signed or finalized. It should return True if ANY content of the file is + encrypted and can therefore not be analyzed by other oletools modules + without given a password. 
+ + Exception: there are ways to write-protect an office document by embedding + it as encrypted stream with hard-coded standard password into an otherwise + empty OLE file. From an office user point of view, this is no encryption, + but regarding file structure this is encryption, so we return `True` for + these. + + This should not raise exceptions needlessly. + + This implementation is rather simple: it returns True if the file contains + streams with typical encryption names (c.f. [MS-OFFCRYPTO]). It does not + test whether these streams actually contain data or whether the ole file + structure contains the necessary references to these. It also checks the + "well-known property" PIDSI_DOC_SECURITY if the SummaryInformation stream + is accessible (c.f. [MS-OLEPS] 2.25.1) + + :param some_file: File name or an opened OleFileIO + :type some_file: :py:class:`olefile.OleFileIO` or `str` + :returns: True if (and only if) the file contains encrypted content + """ + if not isinstance(some_file, str): + return is_encrypted_ole(some_file) # assume it is OleFileIO + if zipfile.is_zipfile(some_file): + return is_encrypted_zip(some_file) + # otherwise assume it is the name of an ole file + return is_encrypted_ole(OleFileIO(some_file)) + + +def is_encrypted_zip(filename): + """Specialization of :py:func:`is_encrypted` for zip-based files.""" + # try to decrypt a few bytes from first entry + with zipfile.ZipFile(filename, 'r') as zipper: + first_entry = zipper.infolist()[0] + try: + with zipper.open(first_entry, 'r') as reader: + reader.read(min(16, first_entry.file_size)) + return False + except RuntimeError as rt_err: + return 'crypt' in str(rt_err) + + +def is_encrypted_ole(ole): + """Specialization of :py:func:`is_encrypted` for ole files.""" + # check well known property for password protection + # (this field may be missing for Powerpoint2000, for example) + # TODO: check whether password protection always implies encryption. 
Could + # write-protection or signing with password trigger this as well? + if ole.exists("\x05SummaryInformation"): + suminfo_data = ole.getproperties("\x05SummaryInformation") + if 0x13 in suminfo_data and (suminfo_data[0x13] & 1): + return True + + # check a few stream names + # TODO: check whether these actually contain data and whether other + # necessary properties exist / are set + elif ole.exists('EncryptionInfo'): + return True + # or an encrypted ppt file + elif ole.exists('EncryptedSummary') and \ + not ole.exists('SummaryInformation'): + return True + + # Word-specific old encryption: + if ole.exists('WordDocument'): + # check for Word-specific encryption flag: + stream = None + try: + stream = ole.openstream(["WordDocument"]) + # pass header 10 bytes + stream.read(10) + # read flag structure: + temp16 = struct.unpack("H", stream.read(2))[0] + f_encrypted = (temp16 & 0x0100) >> 8 + if f_encrypted: + return True + except Exception: + raise + finally: + if stream is not None: + stream.close() + + # no indication of encryption + return False + + +#: one way to achieve "write protection" in office files is to encrypt the file +#: using this password +WRITE_PROTECT_ENCRYPTION_PASSWORD = 'VelvetSweatshop' + + +def _check_msoffcrypto(): + """Raise a :py:class:`CryptoLibNotImported` if msoffcrypto not imported.""" + if msoffcrypto is None: + raise CryptoLibNotImported() + + +def check_msoffcrypto(): + """Return `True` iff :py:mod:`msoffcrypto` could be imported.""" + return msoffcrypto is not None + + +def decrypt(filename, passwords=None, **temp_file_args): + """ + Try to decrypt an encrypted file + + This function tries to decrypt the given file using a given set of + passwords. If no password is given, tries the standard password for write + protection. Creates a file with decrypted data whose file name is returned. + If the decryption fails, None is returned. + + :param str filename: path to an ole file on disc + :param passwords: list/set/tuple/... 
of passwords or a single password or + None + :type passwords: iterable or str or None + :param temp_file_args: arguments for :py:func:`tempfile.mkstemp` e.g., + `dir` or `prefix`. `suffix` will default to + suffix of input `filename`, `prefix` defaults to + `oletools-decrypt-`; `text` will be ignored + :returns: name of the decrypted temporary file, or None if no password worked. + :raises: :py:class:`ImportError` if :py:mod:`msoffcrypto-tool` not found + :raises: :py:class:`ValueError` if the given file is not encrypted + """ + _check_msoffcrypto() + + # normalize password so we always have a list/tuple + if isinstance(passwords, str): + passwords = (passwords, ) + elif not passwords: + passwords = (WRITE_PROTECT_ENCRYPTION_PASSWORD, ) + + # check temp file args + if 'prefix' not in temp_file_args: + temp_file_args['prefix'] = 'oletools-decrypt-' + if 'suffix' not in temp_file_args: + temp_file_args['suffix'] = splitext(filename)[1] + temp_file_args['text'] = False + + decrypt_file = None + with open(filename, 'rb') as reader: + try: + crypto_file = msoffcrypto.OfficeFile(reader) + except Exception as exc: # e.g. ppt, not yet supported by msoffcrypto + if 'Unrecognized file format' in str(exc): + # raise different exception without stack trace of original exc + if sys.version_info.major == 2: + raise UnsupportedEncryptionError(filename) + else: + # this is a syntax error in python 2, so wrap it in exec() + exec('raise UnsupportedEncryptionError(filename) from None') + else: + raise + if not crypto_file.is_encrypted(): + raise ValueError('Given input file {} is not encrypted!' 
+ .format(filename)) + + for password in passwords: + write_descriptor = None + write_handle = None + decrypt_file = None + try: + crypto_file.load_key(password=password) + + # create temp file + write_descriptor, decrypt_file = mkstemp(**temp_file_args) + write_handle = os.fdopen(write_descriptor, 'wb') + write_descriptor = None # is now handled via write_handle + crypto_file.decrypt(write_handle) + + # decryption was successful; clean up and return + write_handle.close() + write_handle = None + break + except Exception: + # error-clean up: close everything and del temp file + if write_handle: + write_handle.close() + elif write_descriptor: + os.close(write_descriptor) + if decrypt_file and isfile(decrypt_file): + os.unlink(decrypt_file) + decrypt_file = None + # temp file name on success; None if all passwords were tried without success + return decrypt_file diff --git a/oletools/msodde.py b/oletools/msodde.py index b59b77e..10673c1 100644 --- a/oletools/msodde.py +++ b/oletools/msodde.py @@ -11,7 +11,6 @@ Supported formats: - RTF - CSV (exported from / imported into Excel) - XML (exported from Word 2003, Word 2007+, Excel 2003, (Excel 2007+?) 
-- raises an error if run with files encrypted using MS Crypto API RC4 Author: Philippe Lagadec - http://www.decalage.info License: BSD, see source code or documentation @@ -52,7 +51,6 @@ from __future__ import print_function import argparse import os -from os.path import abspath, dirname import sys import re import csv @@ -62,9 +60,9 @@ import olefile from oletools import ooxml from oletools import xls_parser from oletools import rtfobj -from oletools import oleid +from oletools.ppt_record_parser import is_ppt +from oletools import crypto from oletools.common.log_helper import log_helper -from oletools.common.errors import FileIsEncryptedError # ----------------------------------------------------------------------------- # CHANGELOG: @@ -305,6 +303,9 @@ def process_args(cmd_line_args=None): default=DEFAULT_LOG_LEVEL, help="logging level debug/info/warning/error/critical " "(default=%(default)s)") + parser.add_argument("-p", "--password", type=str, action='append', + help='if encrypted office files are encountered, try ' + 'decryption with this password. May be repeated.') filter_group = parser.add_argument_group( title='Filter which OpenXML field commands are returned', description='Only applies to OpenXML (e.g. docx) and rtf, not to OLE ' @@ -352,10 +353,9 @@ def process_doc_field(data): if data.lstrip().lower().startswith(u'dde'): return data - elif data.lstrip().lower().startswith(u'\x00d\x00d\x00e\x00'): + if data.lstrip().lower().startswith(u'\x00d\x00d\x00e\x00'): return data - else: - return u'' + return u'' OLE_FIELD_START = 0x13 @@ -379,7 +379,7 @@ def process_doc_stream(stream): while True: idx += 1 char = stream.read(1) # loop over every single byte - if len(char) == 0: + if len(char) == 0: # pylint: disable=len-as-condition break else: char = ord(char) @@ -417,7 +417,7 @@ def process_doc_stream(stream): pass elif len(field_contents) > OLE_FIELD_MAX_SIZE: logger.debug('field exceeds max size of {0}. 
Ignore rest' - .format(OLE_FIELD_MAX_SIZE)) + .format(OLE_FIELD_MAX_SIZE)) max_size_exceeded = True # appending a raw byte to a unicode string here. Not clean but @@ -437,7 +437,7 @@ def process_doc_stream(stream): logger.debug('big field was not a field after all') logger.debug('Checked {0} characters, found {1} fields' - .format(idx, len(result_parts))) + .format(idx, len(result_parts))) return result_parts @@ -462,11 +462,10 @@ def process_doc(ole): direntry = ole._load_direntry(sid) is_stream = direntry.entry_type == olefile.STGTY_STREAM logger.debug('direntry {:2d} {}: {}' - .format(sid, '[orphan]' if is_orphan else direntry.name, - 'is stream of size {}'.format(direntry.size) - if is_stream else - 'no stream ({})' - .format(direntry.entry_type))) + .format(sid, '[orphan]' if is_orphan else direntry.name, + 'is stream of size {}'.format(direntry.size) + if is_stream else + 'no stream ({})'.format(direntry.entry_type))) if is_stream: new_parts = process_doc_stream( ole._open(direntry.isectStart, direntry.size)) @@ -525,7 +524,8 @@ def process_docx(filepath, field_filter_mode=None): else: elem = curr_elem if elem is None: - raise BadOOXML(filepath, 'Got "None"-Element from iter_xml') + raise ooxml.BadOOXML(filepath, + 'Got "None"-Element from iter_xml') # check if FLDCHARTYPE and whether "begin" or "end" tag attrib_type = elem.attrib.get(ATTR_W_FLDCHARTYPE[0]) or \ @@ -535,7 +535,7 @@ def process_docx(filepath, field_filter_mode=None): level += 1 if attrib_type == "end": level -= 1 - if level == 0 or level == -1: # edge-case; level gets -1 + if level in (0, -1): # edge-case; level gets -1 all_fields.append(ddetext) ddetext = u'' level = 0 # reset edge-case @@ -564,6 +564,7 @@ def process_docx(filepath, field_filter_mode=None): def unquote(field): + """TODO: document what exactly is happening here...""" if "QUOTE" not in field or NO_QUOTES: return field # split into components @@ -606,7 +607,7 @@ def field_is_blacklisted(contents): except ValueError: # first word 
is no blacklisted command return False logger.debug('trying to match "{0}" to blacklist command {1}' - .format(contents, FIELD_BLACKLIST[index])) + .format(contents, FIELD_BLACKLIST[index])) _, nargs_required, nargs_optional, sw_with_arg, sw_solo, sw_format \ = FIELD_BLACKLIST[index] @@ -618,11 +619,12 @@ def field_is_blacklisted(contents): nargs += 1 if nargs < nargs_required: logger.debug('too few args: found {0}, but need at least {1} in "{2}"' - .format(nargs, nargs_required, contents)) + .format(nargs, nargs_required, contents)) return False - elif nargs > nargs_required + nargs_optional: - logger.debug('too many args: found {0}, but need at most {1}+{2} in "{3}"' - .format(nargs, nargs_required, nargs_optional, contents)) + if nargs > nargs_required + nargs_optional: + logger.debug('too many args: found {0}, but need at most {1}+{2} in ' + '"{3}"' + .format(nargs, nargs_required, nargs_optional, contents)) return False # check switches @@ -632,14 +634,14 @@ def field_is_blacklisted(contents): if expect_arg: # this is an argument for the last switch if arg_choices and (word not in arg_choices): logger.debug('Found invalid switch argument "{0}" in "{1}"' - .format(word, contents)) + .format(word, contents)) return False expect_arg = False arg_choices = [] # in general, do not enforce choices continue # "no further questions, your honor" elif not FIELD_SWITCH_REGEX.match(word): logger.debug('expected switch, found "{0}" in "{1}"' - .format(word, contents)) + .format(word, contents)) return False # we want a switch and we got a valid one switch = word[1] @@ -661,7 +663,7 @@ def field_is_blacklisted(contents): arg_choices = [] # too many choices to list them here else: logger.debug('unexpected switch {0} in "{1}"' - .format(switch, contents)) + .format(switch, contents)) return False # if nothing went wrong sofar, the contents seems to match the blacklist @@ -676,7 +678,7 @@ def process_xlsx(filepath): tag = elem.tag.lower() if tag == 'ddelink' or 
tag.endswith('}ddelink'): # we have found a dde link. Try to get more info about it - link_info = ['DDE-Link'] + link_info = [] if 'ddeService' in elem.attrib: link_info.append(elem.attrib['ddeService']) if 'ddeTopic' in elem.attrib: @@ -687,16 +689,15 @@ def process_xlsx(filepath): for subfile, content_type, handle in parser.iter_non_xml(): try: logger.info('Parsing non-xml subfile {0} with content type {1}' - .format(subfile, content_type)) + .format(subfile, content_type)) for record in xls_parser.parse_xlsb_part(handle, content_type, subfile): logger.debug('{0}: {1}'.format(subfile, record)) if isinstance(record, xls_parser.XlsbBeginSupBook) and \ record.link_type == \ xls_parser.XlsbBeginSupBook.LINK_TYPE_DDE: - dde_links.append('DDE-Link ' + record.string1 + ' ' + - record.string2) - except Exception: + dde_links.append(record.string1 + ' ' + record.string2) + except Exception as exc: if content_type.startswith('application/vnd.ms-excel.') or \ content_type.startswith('application/vnd.ms-office.'): # pylint: disable=bad-indentation # should really be able to parse these either as xml or records @@ -727,7 +728,8 @@ class RtfFieldParser(rtfobj.RtfParser): def open_destination(self, destination): if destination.cword == b'fldinst': - logger.debug('*** Start field data at index %Xh' % destination.start) + logger.debug('*** Start field data at index %Xh' + % destination.start) def close_destination(self, destination): if destination.cword == b'fldinst': @@ -758,7 +760,7 @@ def process_rtf(file_handle, field_filter_mode=None): all_fields = [field.decode('ascii') for field in rtfparser.fields] # apply field command filter logger.debug('found {1} fields, filtering with mode "{0}"' - .format(field_filter_mode, len(all_fields))) + .format(field_filter_mode, len(all_fields))) if field_filter_mode in (FIELD_FILTER_ALL, None): clean_fields = all_fields elif field_filter_mode == FIELD_FILTER_DDE: @@ -815,11 +817,12 @@ def process_csv(filepath): results, _ = 
process_csv_dialect(file_handle, delim) except csv.Error: # e.g. sniffing fails logger.debug('failed to csv-parse with delimiter {0!r}' - .format(delim)) + .format(delim)) if is_small and not results: # try whole file as single cell, since sniffing fails in this case - logger.debug('last attempt: take whole file as single unquoted cell') + logger.debug('last attempt: take whole file as single unquoted ' + 'cell') file_handle.seek(0) match = CSV_DDE_FORMAT.match(file_handle.read(CSV_SMALL_THRESH)) if match: @@ -836,8 +839,8 @@ def process_csv_dialect(file_handle, delimiters): delimiters=delimiters) dialect.strict = False # microsoft is never strict logger.debug('sniffed csv dialect with delimiter {0!r} ' - 'and quote char {1!r}' - .format(dialect.delimiter, dialect.quotechar)) + 'and quote char {1!r}' + .format(dialect.delimiter, dialect.quotechar)) # rewind file handle to start file_handle.seek(0) @@ -892,19 +895,12 @@ def process_file(filepath, field_filter_mode=None): logger.debug('Process file as excel 2003 (xls)') return process_xls(filepath) - # encrypted files also look like ole, even if office 2007+ (xml-based) - # so check for encryption, first ole = olefile.OleFileIO(filepath, path_encoding=None) - oid = oleid.OleID(ole) - if oid.check_encrypted().value: - log.debug('is encrypted - raise error') - raise FileIsEncryptedError(filepath) - elif oid.check_powerpoint().value: - log.debug('is ppt - cannot have DDE') + if is_ppt(ole): + logger.debug('is ppt - cannot have DDE') return u'' - else: - logger.debug('Process file as word 2003 (doc)') - return process_doc(ole) + logger.debug('Process file as word 2003 (doc)') + return process_doc(ole) with open(filepath, 'rb') as file_handle: if file_handle.read(4) == RTF_START: @@ -921,22 +917,73 @@ def process_file(filepath, field_filter_mode=None): if doctype == ooxml.DOCTYPE_EXCEL: logger.debug('Process file as excel 2007+ (xlsx)') return process_xlsx(filepath) - elif doctype in (ooxml.DOCTYPE_EXCEL_XML, 
ooxml.DOCTYPE_EXCEL_XML2003): + if doctype in (ooxml.DOCTYPE_EXCEL_XML, ooxml.DOCTYPE_EXCEL_XML2003): logger.debug('Process file as xml from excel 2003/2007+') return process_excel_xml(filepath) - elif doctype in (ooxml.DOCTYPE_WORD_XML, ooxml.DOCTYPE_WORD_XML2003): + if doctype in (ooxml.DOCTYPE_WORD_XML, ooxml.DOCTYPE_WORD_XML2003): logger.debug('Process file as xml from word 2003/2007+') return process_docx(filepath) - elif doctype is None: + if doctype is None: logger.debug('Process file as csv') return process_csv(filepath) - else: # could be docx; if not: this is the old default code path - logger.debug('Process file as word 2007+ (docx)') - return process_docx(filepath, field_filter_mode) + # could be docx; if not: this is the old default code path + logger.debug('Process file as word 2007+ (docx)') + return process_docx(filepath, field_filter_mode) # === MAIN ================================================================= + +def process_maybe_encrypted(filepath, passwords=None, crypto_nesting=0, + **kwargs): + """ + Process a file that might be encrypted. + + Calls :py:func:`process_file` and if that fails tries to decrypt and + process the result. Based on recommendation in module doc string of + :py:mod:`oletools.crypto`. + + :param str filepath: path to file on disc. + :param passwords: list of passwords (str) to try for decryption or None + :param int crypto_nesting: How many decryption layers were already used to + get the given file. + :param kwargs: same as :py:func:`process_file` + :returns: same as :py:func:`process_file` + """ + result = u'' + try: + result = process_file(filepath, **kwargs) + if not crypto.is_encrypted(filepath): + return result + except Exception: + if not crypto.is_encrypted(filepath): + raise + + # we reach this point only if file is encrypted + # check if this is an encrypted file in an encrypted file in an ... 
+ if crypto_nesting >= crypto.MAX_NESTING_DEPTH: + raise crypto.MaxCryptoNestingReached(crypto_nesting, filepath) + + decrypted_file = None + if passwords is None: + passwords = [crypto.WRITE_PROTECT_ENCRYPTION_PASSWORD, ] + else: + passwords = list(passwords) + \ + [crypto.WRITE_PROTECT_ENCRYPTION_PASSWORD, ] + try: + logger.debug('Trying to decrypt file') + decrypted_file = crypto.decrypt(filepath, passwords) + logger.info('Analyze decrypted file') + result = process_maybe_encrypted(decrypted_file, passwords, + crypto_nesting+1, **kwargs) + finally: # clean up + try: # (maybe file was not yet created) + os.unlink(decrypted_file) + except Exception: + pass + return result + + def main(cmd_line_args=None): """ Main function, called if this file is called as a script @@ -961,10 +1008,12 @@ def main(cmd_line_args=None): text = '' return_code = 1 try: - text = process_file(args.filepath, args.field_filter_mode) + text = process_maybe_encrypted( + args.filepath, args.password, + field_filter_mode=args.field_filter_mode) return_code = 0 except Exception as exc: - logger.exception(exc.message) + logger.exception(str(exc)) logger.print_str('DDE Links:') logger.print_str(text) diff --git a/oletools/oleid.py b/oletools/oleid.py index ec0e237..fd9eff9 100644 --- a/oletools/oleid.py +++ b/oletools/oleid.py @@ -93,6 +93,7 @@ except ImportError: sys.path.insert(0, PARENT_DIR) del PARENT_DIR from oletools.thirdparty.prettytable import prettytable +from oletools import crypto import olefile @@ -279,20 +280,7 @@ class OleID(object): self.indicators.append(encrypted) if not self.ole: return None - # check if bit 1 of security field = 1: - # (this field may be missing for Powerpoint2000, for example) - if self.suminfo_data is None: - self.check_properties() - if 0x13 in self.suminfo_data: - if self.suminfo_data[0x13] & 1: - encrypted.value = True - # check if this is an OpenXML encrypted file - elif self.ole.exists('EncryptionInfo'): - encrypted.value = True - # or an encrypted ppt 
file - if self.ole.exists('EncryptedSummary') and \ - not self.ole.exists('SummaryInformation'): - encrypted.value = True + encrypted.value = crypto.is_encrypted(self.ole) return encrypted def check_word(self): @@ -316,27 +304,7 @@ class OleID(object): return None, None if self.ole.exists('WordDocument'): word.value = True - # check for Word-specific encryption flag: - stream = None - try: - stream = self.ole.openstream(["WordDocument"]) - # pass header 10 bytes - stream.read(10) - # read flag structure: - temp16 = struct.unpack("H", stream.read(2))[0] - f_encrypted = (temp16 & 0x0100) >> 8 - if f_encrypted: - # correct encrypted indicator if present or add one - encrypt_ind = self.get_indicator('encrypted') - if encrypt_ind: - encrypt_ind.value = True - else: - self.indicators.append('encrypted', True, name='Encrypted') - except Exception: - raise - finally: - if stream is not None: - stream.close() + # check for VBA macros: if self.ole.exists('Macros'): macros.value = True diff --git a/oletools/olevba.py b/oletools/olevba.py index ef82189..2877662 100644 --- a/oletools/olevba.py +++ b/oletools/olevba.py @@ -312,8 +312,7 @@ from pyparsing import \ from oletools import ppt_parser from oletools import oleform from oletools import rtfobj -from oletools import oleid -from oletools.common.errors import FileIsEncryptedError +from oletools import crypto from oletools.common import codepages # monkeypatch email to fix issue #32: @@ -2585,12 +2584,6 @@ class VBA_Parser(object): # This looks like an OLE file self.open_ole(_file) - # check whether file is encrypted (need to do this before try ppt) - log.debug('Check encryption of ole file') - crypt_indicator = oleid.OleID(self.ole_file).check_encrypted() - if crypt_indicator.value: - raise FileIsEncryptedError(filename) - # if this worked, try whether it is a ppt file (special ole file) self.open_ppt() if self.type is None and zipfile.is_zipfile(_file): @@ -3741,6 +3734,10 @@ def parse_args(cmd_line_args=None): help='find 
files recursively in subdirectories.') parser.add_option("-z", "--zip", dest='zip_password', type='str', default=None, help='if the file is a zip archive, open all files from it, using the provided password.') + parser.add_option("-p", "--password", type='str', action='append', + default=[], + help='if encrypted office files are encountered, try ' + 'decryption with this password. May be repeated.') parser.add_option("-f", "--zipfname", dest='zip_fname', type='str', default='*', help='if the file is a zip archive, file(s) to be opened within the zip. Wildcards * and ? are supported. (default:*)') # output mode; could make this even simpler with add_option(type='choice') but that would make @@ -3790,6 +3787,106 @@ def parse_args(cmd_line_args=None): return options, args +def process_file(filename, data, container, options, crypto_nesting=0): + """ + Part of main function that processes a single file. + + This handles exceptions and encryption. + + Returns a single code summarizing the status of processing of this file + """ + try: + # Open the file + vba_parser = VBA_Parser_CLI(filename, data=data, container=container, + relaxed=options.relaxed) + + if options.output_mode == 'detailed': + # fully detailed output + vba_parser.process_file(show_decoded_strings=options.show_decoded_strings, + display_code=options.display_code, + hide_attributes=options.hide_attributes, vba_code_only=options.vba_code_only, + show_deobfuscated_code=options.show_deobfuscated_code, + deobfuscate=options.deobfuscate) + elif options.output_mode == 'triage': + # summarized output for triage: + vba_parser.process_file_triage(show_decoded_strings=options.show_decoded_strings, + deobfuscate=options.deobfuscate) + elif options.output_mode == 'json': + print_json( + vba_parser.process_file_json(show_decoded_strings=options.show_decoded_strings, + display_code=options.display_code, + hide_attributes=options.hide_attributes, vba_code_only=options.vba_code_only, + 
show_deobfuscated_code=options.show_deobfuscated_code, + deobfuscate=options.deobfuscate)) + else: # (should be impossible) + raise ValueError('unexpected output mode: "{0}"!'.format(options.output_mode)) + + # even if processing succeeds, file might still be encrypted + log.debug('Checking for encryption') + if not crypto.is_encrypted(filename): + return RETURN_OK + except Exception as exc: + log.debug('Checking for encryption') + if crypto.is_encrypted(filename): + pass # deal with this below + else: + if isinstance(exc, (SubstreamOpenError, UnexpectedDataError)): + if options.output_mode in ('triage', 'unspecified'): + print('%-12s %s - Error opening substream or uenxpected ' \ + 'content' % ('?', filename)) + elif options.output_mode == 'json': + print_json(file=filename, type='error', + error=type(exc).__name__, message=str(exc)) + else: + log.exception('Error opening substream or unexpected ' + 'content in %s' % filename) + return RETURN_OPEN_ERROR + elif isinstance(exc, FileOpenError): + if options.output_mode in ('triage', 'unspecified'): + print('%-12s %s - File format not supported' % ('?', filename)) + elif options.output_mode == 'json': + print_json(file=filename, type='error', + error=type(exc).__name__, message=str(exc)) + else: + log.exception('Failed to open %s -- probably not supported!' % filename) + return RETURN_OPEN_ERROR + elif isinstance(exc, ProcessingError): + if options.output_mode in ('triage', 'unspecified'): + print('%-12s %s - %s' % ('!ERROR', filename, exc.orig_exc)) + elif options.output_mode == 'json': + print_json(file=filename, type='error', + error=type(exc).__name__, + message=str(exc.orig_exc)) + else: + log.exception('Error processing file %s (%s)!' + % (filename, exc.orig_exc)) + return RETURN_PARSE_ERROR + else: + raise # let caller deal with this + + # we reach this point only if file is encrypted + # check if this is an encrypted file in an encrypted file in an ... 
+ if crypto_nesting >= crypto.MAX_NESTING_DEPTH: + raise crypto.MaxCryptoNestingReached(crypto_nesting, filename) + + decrypted_file = None + try: + log.debug('Checking encryption passwords {}'.format(options.password)) + passwords = options.password + \ + [crypto.WRITE_PROTECT_ENCRYPTION_PASSWORD, ] + decrypted_file = crypto.decrypt(filename, passwords) + if not decrypted_file: + raise crypto.WrongEncryptionPassword(filename) + log.info('Working on decrypted file') + return process_file(decrypted_file, data, container or filename, + options, crypto_nesting+1) + except Exception: + raise + finally: # clean up + if decrypted_file is not None and os.path.isfile(decrypted_file): + os.unlink(decrypted_file) + + def main(cmd_line_args=None): """ Main function, called when olevba is run from the command line @@ -3824,35 +3921,44 @@ def main(cmd_line_args=None): if options.output_mode == 'triage' and options.show_deobfuscated_code: log.info('ignoring option --reveal in triage output mode') - # Column headers (do not know how many files there will be yet, so if no output_mode - # was specified, we will print triage for first file --> need these headers) - if options.output_mode in ('triage', 'unspecified'): + # gather info on all files that must be processed + # ignore directory names stored in zip files: + all_input_info = tuple((container, filename, data) for + container, filename, data in xglob.iter_files( + args, recursive=options.recursive, + zip_password=options.zip_password, + zip_fname=options.zip_fname) + if not (container and filename.endswith('/'))) + + # specify output mode if options -t, -d and -j were not specified + if options.output_mode == 'unspecified': + if len(all_input_info) == 1: + options.output_mode = 'detailed' + else: + options.output_mode = 'triage' + + # Column headers for triage mode + if options.output_mode == 'triage': print('%-12s %-65s' % ('Flags', 'Filename')) print('%-12s %-65s' % ('-' * 11, '-' * 65)) previous_container = None count = 0 
container = filename = data = None - vba_parser = None return_code = RETURN_OK try: - for container, filename, data in xglob.iter_files(args, recursive=options.recursive, - zip_password=options.zip_password, zip_fname=options.zip_fname): - # ignore directory names stored in zip files: - if container and filename.endswith('/'): - continue - + for container, filename, data in all_input_info: # handle errors from xglob if isinstance(data, Exception): if isinstance(data, PathNotFoundException): - if options.output_mode in ('triage', 'unspecified'): + if options.output_mode == 'triage': print('%-12s %s - File not found' % ('?', filename)) elif options.output_mode != 'json': log.error('Given path %r does not exist!' % filename) return_code = RETURN_FILE_NOT_FOUND if return_code == 0 \ else RETURN_SEVERAL_ERRS else: - if options.output_mode in ('triage', 'unspecified'): + if options.output_mode == 'triage': print('%-12s %s - Failed to read from zip file %s' % ('?', filename, container)) elif options.output_mode != 'json': log.error('Exception opening/reading %r from zip file %r: %s' @@ -3864,107 +3970,42 @@ def main(cmd_line_args=None): error=type(data).__name__, message=str(data)) continue - try: - # close the previous file if analyzing several: - # (this must be done here to avoid closing the file if there is only 1, - # to fix issue #219) - if vba_parser is not None: - vba_parser.close() - # Open the file - vba_parser = VBA_Parser_CLI(filename, data=data, container=container, - relaxed=options.relaxed) - - if options.output_mode == 'detailed': - # fully detailed output - vba_parser.process_file(show_decoded_strings=options.show_decoded_strings, - display_code=options.display_code, - hide_attributes=options.hide_attributes, vba_code_only=options.vba_code_only, - show_deobfuscated_code=options.show_deobfuscated_code, - deobfuscate=options.deobfuscate) - elif options.output_mode in ('triage', 'unspecified'): - # print container name when it changes: - if container != 
previous_container: - if container is not None: - print('\nFiles in %s:' % container) - previous_container = container - # summarized output for triage: - vba_parser.process_file_triage(show_decoded_strings=options.show_decoded_strings, - deobfuscate=options.deobfuscate) - elif options.output_mode == 'json': - print_json( - vba_parser.process_file_json(show_decoded_strings=options.show_decoded_strings, - display_code=options.display_code, - hide_attributes=options.hide_attributes, vba_code_only=options.vba_code_only, - show_deobfuscated_code=options.show_deobfuscated_code, - deobfuscate=options.deobfuscate)) - else: # (should be impossible) - raise ValueError('unexpected output mode: "{0}"!'.format(options.output_mode)) - count += 1 - - except (SubstreamOpenError, UnexpectedDataError) as exc: - if options.output_mode in ('triage', 'unspecified'): - print('%-12s %s - Error opening substream or uenxpected ' \ - 'content' % ('?', filename)) - elif options.output_mode == 'json': - print_json(file=filename, type='error', - error=type(exc).__name__, message=str(exc)) - else: - log.exception('Error opening substream or unexpected ' - 'content in %s' % filename) - return_code = RETURN_OPEN_ERROR if return_code == 0 \ - else RETURN_SEVERAL_ERRS - except FileOpenError as exc: - if options.output_mode in ('triage', 'unspecified'): - print('%-12s %s - File format not supported' % ('?', filename)) - elif options.output_mode == 'json': - print_json(file=filename, type='error', - error=type(exc).__name__, message=str(exc)) - else: - log.exception('Failed to open %s -- probably not supported!' 
% filename) - return_code = RETURN_OPEN_ERROR if return_code == 0 \ - else RETURN_SEVERAL_ERRS - except ProcessingError as exc: - if options.output_mode in ('triage', 'unspecified'): - print('%-12s %s - %s' % ('!ERROR', filename, exc.orig_exc)) - elif options.output_mode == 'json': - print_json(file=filename, type='error', - error=type(exc).__name__, - message=str(exc.orig_exc)) - else: - log.exception('Error processing file %s (%s)!' - % (filename, exc.orig_exc)) - return_code = RETURN_PARSE_ERROR if return_code == 0 \ - else RETURN_SEVERAL_ERRS - except FileIsEncryptedError as exc: - if options.output_mode in ('triage', 'unspecified'): - print('%-12s %s - File is encrypted' % ('!ERROR', filename)) - elif options.output_mode == 'json': - print_json(file=filename, type='error', - error=type(exc).__name__, message=str(exc)) - else: - log.exception('File %s is encrypted!' % (filename)) - return_code = RETURN_ENCRYPTED if return_code == 0 \ - else RETURN_SEVERAL_ERRS - # Here we do not close the vba_parser, because process_file may need it below. 
+ if options.output_mode == 'triage': + # print container name when it changes: + if container != previous_container: + if container is not None: + print('\nFiles in %s:' % container) + previous_container = container + + # process the file, handling errors and encryption + curr_return_code = process_file(filename, data, container, options) + count += 1 + + # adjust overall return code + if curr_return_code == RETURN_OK: + continue # do not modify overall return code + if return_code == RETURN_OK: + return_code = curr_return_code # first error return code + else: + return_code = RETURN_SEVERAL_ERRS # several errors if options.output_mode == 'triage': print('\n(Flags: OpX=OpenXML, XML=Word2003XML, FlX=FlatOPC XML, MHT=MHTML, TXT=Text, M=Macros, ' \ 'A=Auto-executable, S=Suspicious keywords, I=IOCs, H=Hex strings, ' \ 'B=Base64 strings, D=Dridex strings, V=VBA strings, ?=Unknown)\n') - if count == 1 and options.output_mode == 'unspecified': - # if options -t, -d and -j were not specified and it's a single file, print details: - vba_parser.process_file(show_decoded_strings=options.show_decoded_strings, - display_code=options.display_code, - hide_attributes=options.hide_attributes, vba_code_only=options.vba_code_only, - show_deobfuscated_code=options.show_deobfuscated_code, - deobfuscate=options.deobfuscate) - if options.output_mode == 'json': # print last json entry (a last one without a comma) and closing ] print_json(type='MetaInformation', return_code=return_code, n_processed=count, _json_is_last=True) + except crypto.CryptoErrorBase as exc: + log.exception('Problems with encryption in main: {}'.format(exc), + exc_info=True) + if return_code == RETURN_OK: + return_code = RETURN_ENCRYPTED + else: + return_code = RETURN_SEVERAL_ERRS except Exception as exc: # some unexpected error, maybe some of the types caught in except clauses # above were not sufficient. 
This is very bad, so log complete trace at exception level diff --git a/oletools/ppt_record_parser.py b/oletools/ppt_record_parser.py index acdc0dd..430e766 100644 --- a/oletools/ppt_record_parser.py +++ b/oletools/ppt_record_parser.py @@ -63,7 +63,7 @@ except ImportError: sys.path.insert(0, PARENT_DIR) del PARENT_DIR from oletools import record_base -from oletools.common.errors import FileIsEncryptedError +from oletools.common.errors import CryptoErrorBase # types of relevant records (there are much more than listed here) @@ -149,6 +149,10 @@ def is_ppt(filename): Param filename can be anything that OleFileIO constructor accepts: name of file or file data or data stream. + Will not try to decrypt the file, nor even try to determine whether it is + encrypted. If the file is encrypted, will either raise an error or just + return `False`. + see also: oleid.OleID.check_powerpoint """ have_current_user = False @@ -181,11 +185,8 @@ def is_ppt(filename): return True else: # ignore other streams/storages since they are optional continue - except FileIsEncryptedError: - assert ppt_file is not None, \ - 'Encryption error should not be raised from just opening OLE file.' 
- # just rely on stream names, copied from oleid - return ppt_file.exists('PowerPoint Document') + except CryptoErrorBase: + raise except Exception: pass return False diff --git a/oletools/record_base.py b/oletools/record_base.py index f30ef7f..6a41ae8 100644 --- a/oletools/record_base.py +++ b/oletools/record_base.py @@ -74,7 +74,6 @@ PARENT_DIR = os.path.normpath(os.path.dirname(os.path.dirname( if PARENT_DIR not in sys.path: sys.path.insert(0, PARENT_DIR) del PARENT_DIR -from oletools.common.errors import FileIsEncryptedError from oletools import oleid @@ -127,10 +126,9 @@ class OleRecordFile(olefile.OleFileIO): """ def open(self, filename, *args, **kwargs): - """Call OleFileIO.open, raise error if is encrypted.""" + """Call OleFileIO.open.""" #super(OleRecordFile, self).open(filename, *args, **kwargs) OleFileIO.open(self, filename, *args, **kwargs) - self.is_encrypted = oleid.OleID(self).check_encrypted().value @classmethod def stream_class_for_name(cls, stream_name): @@ -163,8 +161,7 @@ class OleRecordFile(olefile.OleFileIO): stream = clz(self._open(direntry.isectStart, direntry.size), direntry.size, None if is_orphan else direntry.name, - direntry.entry_type, - self.is_encrypted) + direntry.entry_type) yield stream stream.close() @@ -177,14 +174,13 @@ class OleRecordStream(object): abstract base class """ - def __init__(self, stream, size, name, stream_type, is_encrypted=False): + def __init__(self, stream, size, name, stream_type): self.stream = stream self.size = size self.name = name if stream_type not in ENTRY_TYPE2STR: raise ValueError('Unknown stream type: {0}'.format(stream_type)) self.stream_type = stream_type - self.is_encrypted = is_encrypted def read_record_head(self): """ read first few bytes of record to determine size and type @@ -213,9 +209,6 @@ class OleRecordStream(object): Stream must be positioned at start of records (e.g. start of stream). 
""" - if self.is_encrypted: - raise FileIsEncryptedError() - while True: # unpacking as in olevba._extract_vba pos = self.stream.tell() diff --git a/oletools/xls_parser.py b/oletools/xls_parser.py index e29aadd..6e6c20d 100644 --- a/oletools/xls_parser.py +++ b/oletools/xls_parser.py @@ -101,7 +101,7 @@ def read_unicode(data, start_idx, n_chars): """ read a unicode string from a XLUnicodeStringNoCch structure """ # first bit 0x0 --> only low-bytes are saved, all high bytes are 0 # first bit 0x1 --> 2 bytes per character - low_bytes_only = (ord(data[start_idx]) == 0) + low_bytes_only = (ord(data[start_idx:start_idx+1]) == 0) if low_bytes_only: end_idx = start_idx + 1 + n_chars return data[start_idx+1:end_idx].decode('ascii'), end_idx @@ -349,6 +349,7 @@ class XlsRecordSupBook(XlsRecord): LINK_TYPE_EXTERNAL = 'external workbook' def finish_constructing(self, _): + """Finish constructing this record; called at end of constructor.""" # set defaults self.ctab = None self.cch = None diff --git a/setup.py b/setup.py index ca22f2f..9e513f3 100644 --- a/setup.py +++ b/setup.py @@ -28,6 +28,7 @@ to install this package. 
# 2018-09-15 PL: - easygui is now a dependency # 2018-09-22 PL: - colorclass is now a dependency # 2018-10-27 PL: - fixed issue #359 (bug when importing log_helper) +# 2019-02-26 CH: - add optional dependency msoffcrypto-tool for decryption #--- TODO --------------------------------------------------------------------- @@ -317,6 +318,10 @@ def main(): "easygui", 'colorclass', ], + extras_require = { + # msoffcrypto-tool by nolze can be used to decrypt some office files + 'decrypt': ['msoffcrypto-tool'] + } ) diff --git a/tests/common/log_helper/test_log_helper.py b/tests/common/log_helper/test_log_helper.py index 03dee68..87f579d 100644 --- a/tests/common/log_helper/test_log_helper.py +++ b/tests/common/log_helper/test_log_helper.py @@ -13,9 +13,11 @@ from tests.common.log_helper import log_helper_test_main from tests.common.log_helper import log_helper_test_imported from os.path import dirname, join, relpath, abspath +from tests.test_utils import PROJECT_ROOT + # this is the common base of "tests" and "oletools" dirs -ROOT_DIRECTORY = abspath(join(__file__, '..', '..', '..', '..')) -TEST_FILE = relpath(join(dirname(__file__), 'log_helper_test_main.py'), ROOT_DIRECTORY) +TEST_FILE = relpath(join(dirname(abspath(__file__)), 'log_helper_test_main.py'), + PROJECT_ROOT) PYTHON_EXECUTABLE = sys.executable MAIN_LOG_MESSAGES = [ @@ -90,9 +92,9 @@ class TestLogHelper(unittest.TestCase): child = subprocess.Popen( [PYTHON_EXECUTABLE, TEST_FILE] + args, shell=False, - env={'PYTHONPATH': ROOT_DIRECTORY}, + env={'PYTHONPATH': PROJECT_ROOT}, universal_newlines=True, - cwd=ROOT_DIRECTORY, + cwd=PROJECT_ROOT, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE diff --git a/tests/msodde/test_basic.py b/tests/msodde/test_basic.py index 3386462..af767cc 100644 --- a/tests/msodde/test_basic.py +++ b/tests/msodde/test_basic.py @@ -123,7 +123,7 @@ class TestDdeLinks(unittest.TestCase): def test_excel(self): """ check that dde links are found in excel 2007+ files """ - expect = ['DDE-Link 
cmd /c calc.exe', ] + expect = ['cmd /c calc.exe', ] for extn in 'xlsx', 'xlsm', 'xlsb': output = msodde.process_file( join(BASE_DIR, 'msodde', 'dde-test.' + extn), msodde.FIELD_FILTER_BLACKLIST) diff --git a/tests/msodde/test_crypto.py b/tests/msodde/test_crypto.py new file mode 100644 index 0000000..2eb0273 --- /dev/null +++ b/tests/msodde/test_crypto.py @@ -0,0 +1,30 @@ +"""Check decryption of files from msodde works.""" + +import sys +import unittest +from os.path import join as pjoin + +from tests.test_utils import DATA_BASE_DIR + +from oletools import crypto +from oletools import msodde + + +@unittest.skipIf(not crypto.check_msoffcrypto(), + 'Module msoffcrypto not installed for python{}.{}' + .format(sys.version_info.major, sys.version_info.minor)) +class MsoddeCryptoTest(unittest.TestCase): + """Test integration of decryption in msodde.""" + def test_standard_password(self): + """Check dde-link is found in xls[mb] sample files.""" + for suffix in 'xls', 'xlsx', 'xlsm', 'xlsb': + example_file = pjoin(DATA_BASE_DIR, 'encrypted', + 'dde-test-encrypt-standardpassword.' + suffix) + link_text = msodde.process_maybe_encrypted(example_file) + self.assertEqual(link_text, 'cmd /c calc.exe', + msg='Unexpected output {!r} for {}' + .format(link_text, suffix)) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/olevba/test_basic.py b/tests/olevba/test_basic.py index d319a12..798fba5 100644 --- a/tests/olevba/test_basic.py +++ b/tests/olevba/test_basic.py @@ -28,7 +28,15 @@ class TestOlevbaBasic(unittest.TestCase): CRYPT_DIR = join(DATA_BASE_DIR, 'encrypted') CRYPT_RETURN_CODE = 9 ADD_ARGS = [], ['-d', ], ['-a', ], ['-j', ], ['-t', ] + EXCEPTIONS = ['autostart-encrypt-standardpassword.xlsm', # These ... + 'autostart-encrypt-standardpassword.xlsb', # files ... + 'dde-test-encrypt-standardpassword.xls', # are ... + 'dde-test-encrypt-standardpassword.xlsx', # decrypted + 'dde-test-encrypt-standardpassword.xlsm', # per ... 
+ 'dde-test-encrypt-standardpassword.xlsb'] # default. for filename in os.listdir(CRYPT_DIR): + if filename in EXCEPTIONS: + continue full_name = join(CRYPT_DIR, filename) for args in ADD_ARGS: try: diff --git a/tests/olevba/test_crypto.py b/tests/olevba/test_crypto.py new file mode 100644 index 0000000..b2dc84d --- /dev/null +++ b/tests/olevba/test_crypto.py @@ -0,0 +1,81 @@ +"""Check decryption of files from olevba works.""" + +import sys +import unittest +import os +from os.path import join as pjoin +from subprocess import check_output, CalledProcessError +import json +from collections import OrderedDict + +from tests.test_utils import DATA_BASE_DIR, SOURCE_BASE_DIR + +from oletools import crypto + + +@unittest.skipIf(not crypto.check_msoffcrypto(), + 'Module msoffcrypto not installed for python{}.{}' + .format(sys.version_info.major, sys.version_info.minor)) +class OlevbaCryptoWriteProtectTest(unittest.TestCase): + """ + Test documents that are 'write-protected' through encryption. + + Excel has a way to 'write-protect' documents by encrypting them with a + hard-coded standard password. When looking at the file-structure you see + an OLE-file with streams `EncryptedPackage`, `StrongEncryptionSpace`, and + `EncryptionInfo`. Contained in the first is the actual file. When opening + such a file in excel, it is decrypted without the user noticing. + + Olevba should detect such encryption, try to decrypt with the standard + password and look for VBA code in the decrypted file. + + All these tests are skipped if the module `msoffcrypto-tools` is not + installed. 
+ """ + def test_autostart(self): + """Check that autostart macro is found in xls[mb] sample file.""" + # create a PYTHONPATH environment var to prefer our olevba + env = os.environ.copy() + try: + env['PYTHONPATH'] = SOURCE_BASE_DIR + os.pathsep + \ + os.environ['PYTHONPATH'] + except KeyError: + env['PYTHONPATH'] = SOURCE_BASE_DIR + + for suffix in 'xlsm', 'xlsb': + example_file = pjoin( + DATA_BASE_DIR, 'encrypted', + 'autostart-encrypt-standardpassword.' + suffix) + try: + output = check_output([sys.executable, '-m', 'olevba', '-j', + example_file], + universal_newlines=True, env=env) + except CalledProcessError as err: + print(err.output) + raise + data = json.loads(output, object_pairs_hook=OrderedDict) + # debug: json.dump(data, sys.stdout, indent=4) + self.assertEqual(len(data), 4) + self.assertIn('script_name', data[0]) + self.assertIn('version', data[0]) + self.assertEqual(data[0]['type'], 'MetaInformation') + self.assertIn('return_code', data[-1]) + self.assertEqual(data[-1]['type'], 'MetaInformation') + self.assertEqual(data[1]['container'], None) + self.assertEqual(data[1]['file'], example_file) + self.assertEqual(data[1]['analysis'], None) + self.assertEqual(data[1]['macros'], []) + self.assertEqual(data[1]['type'], 'OLE') + self.assertEqual(data[2]['container'], example_file) + self.assertNotEqual(data[2]['file'], example_file) + self.assertEqual(data[2]['type'], "OpenXML") + analysis = data[2]['analysis'] + self.assertEqual(analysis[0]['type'], 'AutoExec') + self.assertEqual(analysis[0]['keyword'], 'Auto_Open') + macros = data[2]['macros'] + self.assertEqual(macros[0]['vba_filename'], 'Modul1.bas') + self.assertIn('Sub Auto_Open()', macros[0]['code']) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/ppt_parser/test_basic.py b/tests/ppt_parser/test_basic.py index 9772e96..b653257 100644 --- a/tests/ppt_parser/test_basic.py +++ b/tests/ppt_parser/test_basic.py @@ -16,7 +16,7 @@ class TestBasic(unittest.TestCase): def test_is_ppt(self): 
""" test ppt_record_parser.is_ppt(filename) """ - exceptions = [] + exceptions = ['encrypted.ppt', ] # actually is ppt but embedded for base_dir, _, files in os.walk(DATA_BASE_DIR): for filename in files: if filename in exceptions: diff --git a/tests/test-data/encrypted/autostart-encrypt-standardpassword.xlsb b/tests/test-data/encrypted/autostart-encrypt-standardpassword.xlsb new file mode 100755 index 0000000..b905d7c --- /dev/null +++ b/tests/test-data/encrypted/autostart-encrypt-standardpassword.xlsb diff --git a/tests/test-data/encrypted/autostart-encrypt-standardpassword.xlsm b/tests/test-data/encrypted/autostart-encrypt-standardpassword.xlsm new file mode 100755 index 0000000..2b2e113 --- /dev/null +++ b/tests/test-data/encrypted/autostart-encrypt-standardpassword.xlsm diff --git a/tests/test-data/encrypted/dde-test-encrypt-standardpassword.xls b/tests/test-data/encrypted/dde-test-encrypt-standardpassword.xls new file mode 100755 index 0000000..c61f12b --- /dev/null +++ b/tests/test-data/encrypted/dde-test-encrypt-standardpassword.xls diff --git a/tests/test-data/encrypted/dde-test-encrypt-standardpassword.xlsb b/tests/test-data/encrypted/dde-test-encrypt-standardpassword.xlsb new file mode 100755 index 0000000..3518a20 --- /dev/null +++ b/tests/test-data/encrypted/dde-test-encrypt-standardpassword.xlsb diff --git a/tests/test-data/encrypted/dde-test-encrypt-standardpassword.xlsm b/tests/test-data/encrypted/dde-test-encrypt-standardpassword.xlsm new file mode 100755 index 0000000..b9cce05 --- /dev/null +++ b/tests/test-data/encrypted/dde-test-encrypt-standardpassword.xlsm diff --git a/tests/test-data/encrypted/dde-test-encrypt-standardpassword.xlsx b/tests/test-data/encrypted/dde-test-encrypt-standardpassword.xlsx new file mode 100755 index 0000000..c677227 --- /dev/null +++ b/tests/test-data/encrypted/dde-test-encrypt-standardpassword.xlsx diff --git a/tests/test_utils/__init__.py b/tests/test_utils/__init__.py index c6671c7..5eda62a 100644 --- 
a/tests/test_utils/__init__.py +++ b/tests/test_utils/__init__.py @@ -1,4 +1,10 @@ -from os.path import dirname, join +from os.path import dirname, join, abspath + +# Base dir of project, contains subdirs "tests" and "oletools" and README.md +PROJECT_ROOT = dirname(dirname(dirname(abspath(__file__)))) # Directory with test data, independent of current working directory -DATA_BASE_DIR = join(dirname(dirname(__file__)), 'test-data') +DATA_BASE_DIR = join(PROJECT_ROOT, 'tests', 'test-data') + +# Directory with source code +SOURCE_BASE_DIR = join(PROJECT_ROOT, 'oletools')