diff --git a/oletools/util/log_helper/_json_formatter.py b/oletools/util/log_helper/_json_formatter.py index fb41525..93e5182 100644 --- a/oletools/util/log_helper/_json_formatter.py +++ b/oletools/util/log_helper/_json_formatter.py @@ -3,6 +3,9 @@ import json class JsonFormatter(logging.Formatter): + """ + Format every message to be logged as a JSON object + """ def __init__(self, fmt=None, datefmt=None): super(JsonFormatter, self).__init__(fmt, datefmt) self._is_first_line = True @@ -19,7 +22,7 @@ class JsonFormatter(logging.Formatter): self._is_first_line = False json_dict = record.msg \ - if type(record.msg) is dict \ + if isinstance(record.msg, dict) \ else dict(msg=record.msg) json_dict['level'] = record.levelname diff --git a/oletools/util/log_helper/log_helper.py b/oletools/util/log_helper/log_helper.py index 57bfb5c..7a44102 100644 --- a/oletools/util/log_helper/log_helper.py +++ b/oletools/util/log_helper/log_helper.py @@ -125,7 +125,10 @@ class LogHelper: print('[') def end_logging(self): - """ called at end of main """ + """ + Must be called at the end of the main function + if the caller wants json-compatible ouptut + """ if not self._is_enabled: return self._is_enabled = False @@ -171,7 +174,9 @@ class LogHelper: return logger def _make_json(self, logger): - # remove handlers (sometimes there are multiple by default) + """ + Replace handlers of every logger by a handler that uses the JSON formatter + """ for handler in logger.handlers: logger.removeHandler(handler) new_handler = logging.StreamHandler(sys.stdout) diff --git a/tests/util/__init__.py b/tests/util/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/util/__init__.py diff --git a/tests/util/log_helper/__init__.py b/tests/util/log_helper/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/util/log_helper/__init__.py diff --git a/tests/util/log_helper/log_helper_test_imported.py b/tests/util/log_helper/log_helper_test_imported.py new file mode 100644 index 0000000..29d088b --- /dev/null +++ b/tests/util/log_helper/log_helper_test_imported.py @@ -0,0 +1,23 @@ +""" +Dummy file that logs messages, meant to be imported +by the main test file +""" + +from oletools.util.log_helper import log_helper +import logging + +DEBUG_MESSAGE = 'imported: debug log' +INFO_MESSAGE = 'imported: info log' +WARNING_MESSAGE = 'imported: warning log' +ERROR_MESSAGE = 'imported: error log' +CRITICAL_MESSAGE = 'imported: critical log' + +logger = log_helper.get_or_create_logger('test_imported', logging.ERROR) + + +def log(): + logger.debug(DEBUG_MESSAGE) + logger.info(INFO_MESSAGE) + logger.warning(WARNING_MESSAGE) + logger.error(ERROR_MESSAGE) + logger.critical(CRITICAL_MESSAGE) diff --git a/tests/util/log_helper/log_helper_test_main.py b/tests/util/log_helper/log_helper_test_main.py new file mode 100644 index 0000000..b2f3f66 --- /dev/null +++ b/tests/util/log_helper/log_helper_test_main.py @@ -0,0 +1,100 @@ +""" Test log_helpers """ + +import sys +import logging +from tests.util.log_helper import log_helper_test_imported +from oletools.util.log_helper import log_helper + +DEBUG_MESSAGE = 'main: debug log' +INFO_MESSAGE = 'main: info log' +WARNING_MESSAGE = 'main: warning log' +ERROR_MESSAGE = 'main: error log' +CRITICAL_MESSAGE = 'main: critical log' + +logger = log_helper.get_or_create_logger('test_main') + + +def init_logging_and_log(cmd_line_args=None): + args = cmd_line_args if cmd_line_args else sys.argv + + if 'silent' in args: + return _log_silently() + elif 'dictionary' in args: + return _log_dictionary(args) + elif 'current_level' in args: + _enable_logging() + return _log_at_current_level() + elif 'default' in args: + _enable_logging() + return _log(logger) + + use_json = '-j' in args + throw_exception = 'throw' in args + + level = _parse_log_level(args) + + _enable_logging(use_json, level) + _log(logger) + log_helper_test_imported.log() + + if throw_exception: + raise Exception('An exception occurred before ending the logging') + + _end_logging() + + +def _parse_log_level(args): + if 'debug' in args: + return 'debug' + elif 'info' in args: + return 'info' + elif 'warning' in args: + return 'warning' + elif 'error' in args: + return 'error' + else: + return 'critical' + + +def _log_dictionary(args): + level = _parse_log_level(args) + _enable_logging(True, level) + + logger.log_at_current_level({ + 'msg': DEBUG_MESSAGE + }) + log_helper.end_logging() + + +def _enable_logging(use_json=False, level='warning'): + log_helper.enable_logging(use_json, level, stream=sys.stdout) + + +def _log_at_current_level(): + logger.log_at_current_level(DEBUG_MESSAGE) + + +def _log_silently(): + silent_logger = log_helper.get_or_create_silent_logger('test_main_silent', logging.DEBUG - 1) + _log(silent_logger) + + +def _log(current_logger): + current_logger.debug(DEBUG_MESSAGE) + current_logger.info(INFO_MESSAGE) + current_logger.warning(WARNING_MESSAGE) + current_logger.error(ERROR_MESSAGE) + current_logger.critical(CRITICAL_MESSAGE) + + +def _end_logging(): + log_helper.end_logging() + + +if __name__ == '__main__': + # since we are using subprocess, add delimiters so we can easily extract + # the output that matters (when debugging tests we get extra output, + # so we need to ignore it) + print('<#') + init_logging_and_log(sys.argv) + print('#>') diff --git a/tests/util/log_helper/test_log_helper.py b/tests/util/log_helper/test_log_helper.py new file mode 100644 index 0000000..0326c08 --- /dev/null +++ b/tests/util/log_helper/test_log_helper.py @@ -0,0 +1,156 @@ +""" Test the log helper + +This tests the generic log helper. +Check if it handles imported modules correctly +and that the default silent logger won't log when nothing is enabled +""" + +import unittest +import sys +import json +import re +from tests.util.log_helper import log_helper_test_main +from tests.util.log_helper import log_helper_test_imported +from os.path import dirname, join +from subprocess import check_output, STDOUT, CalledProcessError + +ROOT_DIRECTORY = dirname(dirname(dirname(__file__))) +TEST_FILE = join(dirname(__file__), 'log_helper_test_main.py') +PYTHON_EXECUTABLE = sys.executable +REGEX = re.compile('<#(.*)(:?#>|Traceback)', re.MULTILINE | re.DOTALL) + +MAIN_LOG_MESSAGES = [ + log_helper_test_main.DEBUG_MESSAGE, + log_helper_test_main.INFO_MESSAGE, + log_helper_test_main.WARNING_MESSAGE, + log_helper_test_main.ERROR_MESSAGE, + log_helper_test_main.CRITICAL_MESSAGE +] + + +class TestLogHelper(unittest.TestCase): + def test_default_logging_to_stderr(self): + """ + Basic test for simple logging + """ + output = self._run_test(['default']) + self.assertIn(log_helper_test_main.WARNING_MESSAGE, output) + + def test_logging_silently(self): + """ + Test that nothing will be logged when logging is not enabled + and we are using a silent logger (uses the NullHandler) + """ + output = self._run_test(['silent']) + self.assertTrue(len(output) == 0) + + def test_setting_level_in_main_module(self): + """ + Make sure that the level set in the main module is kept when + logging from imported modules. + """ + output = self._run_test(['debug']) + + expected_messages = MAIN_LOG_MESSAGES + [ + log_helper_test_imported.DEBUG_MESSAGE, + log_helper_test_imported.INFO_MESSAGE, + log_helper_test_imported.WARNING_MESSAGE, + log_helper_test_imported.ERROR_MESSAGE, + log_helper_test_imported.CRITICAL_MESSAGE + ] + + for msg in expected_messages: + self.assertIn(msg, output) + + def test_logging_at_current_level(self): + """ + Test that logging at current level will always print a message + """ + output = self._run_test(['current_level']) + self.assertIn(log_helper_test_main.DEBUG_MESSAGE, output) + + def test_logging_as_json(self): + """ + Basic test for json logging + """ + output = self._run_test(['critical', '-j']) + + try: + json_data = json.loads(output) + self._assert_json_messages(json_data, [ + log_helper_test_main.CRITICAL_MESSAGE, + log_helper_test_imported.CRITICAL_MESSAGE + ]) + except ValueError: + self.fail('Invalid json:\n' + output) + self.assertNotEqual(len(json_data), 0, msg='Output was empty') + + def test_logging_dictionary_as_json(self): + """ + Test support for passing a dictionary to the logger + and have it logged as JSON + """ + output = self._run_test(['dictionary']) + + try: + json_data = json.loads(output) + self._assert_json_messages(json_data, [ + log_helper_test_main.DEBUG_MESSAGE + ]) + except ValueError: + self.fail('Invalid json:\n' + output) + self.assertNotEqual(len(json_data), 0, msg='Output was empty') + + def test_json_correct_on_exceptions(self): + """ + Test that even on unhandled exceptions our JSON is always correct + """ + output = self._run_test(['critical', 'throw', '-j'], True) + + try: + json_data = json.loads(output) + self._assert_json_messages(json_data, [ + log_helper_test_main.CRITICAL_MESSAGE, + log_helper_test_imported.CRITICAL_MESSAGE + ]) + except ValueError: + self.fail('Invalid json:\n' + output) + self.assertNotEqual(len(json_data), 0, msg='Output was empty') + + def _assert_json_messages(self, json_data, messages): + self.assertEquals(len(json_data), len(messages)) + + for i in range(len(messages)): + self.assertEquals(messages[i], json_data[i]['msg']) + + @staticmethod + def _run_test(args, ignore_exceptions=False): + """ + Use subprocess to better simulate the real scenario and avoid + logging conflicts when running multiple tests (since logging depends on singletons, + we might get errors or false positives between sequential tests runs) + """ + try: + output = check_output( + [PYTHON_EXECUTABLE, TEST_FILE] + args, + shell=False, + cwd=ROOT_DIRECTORY, + stderr=STDOUT, + universal_newlines=True + ) + + if not isinstance(output, str): + output = output.decode('utf-8') + except CalledProcessError as ex: + if ignore_exceptions: + output = ex.output + else: + # we want tests to fail if an exception occur + raise ex + + return REGEX.search(output).group(1).strip() + + +# just in case somebody calls this file as a script +if __name__ == '__main__': + unittest.main()