Commit e22a452f10e64f20dcc6ae32fb964c552accbbdb

Authored by Samir Aguiar
Committed by Christian Herdtweck
1 parent fffedc77

log helper: add unit tests

oletools/util/log_helper/_json_formatter.py
@@ -3,6 +3,9 @@ import json @@ -3,6 +3,9 @@ import json
3 3
4 4
5 class JsonFormatter(logging.Formatter): 5 class JsonFormatter(logging.Formatter):
  6 + """
  7 + Format every message to be logged as a JSON object
  8 + """
6 def __init__(self, fmt=None, datefmt=None): 9 def __init__(self, fmt=None, datefmt=None):
7 super(JsonFormatter, self).__init__(fmt, datefmt) 10 super(JsonFormatter, self).__init__(fmt, datefmt)
8 self._is_first_line = True 11 self._is_first_line = True
@@ -19,7 +22,7 @@ class JsonFormatter(logging.Formatter): @@ -19,7 +22,7 @@ class JsonFormatter(logging.Formatter):
19 self._is_first_line = False 22 self._is_first_line = False
20 23
21 json_dict = record.msg \ 24 json_dict = record.msg \
22 - if type(record.msg) is dict \ 25 + if isinstance(record.msg, dict) \
23 else dict(msg=record.msg) 26 else dict(msg=record.msg)
24 json_dict['level'] = record.levelname 27 json_dict['level'] = record.levelname
25 28
oletools/util/log_helper/log_helper.py
@@ -125,7 +125,10 @@ class LogHelper: @@ -125,7 +125,10 @@ class LogHelper:
125 print('[') 125 print('[')
126 126
127 def end_logging(self): 127 def end_logging(self):
128 - """ called at end of main """ 128 + """
  129 + Must be called at the end of the main function
  130 + if the caller wants json-compatible ouptut
  131 + """
129 if not self._is_enabled: 132 if not self._is_enabled:
130 return 133 return
131 self._is_enabled = False 134 self._is_enabled = False
@@ -171,7 +174,9 @@ class LogHelper: @@ -171,7 +174,9 @@ class LogHelper:
171 return logger 174 return logger
172 175
173 def _make_json(self, logger): 176 def _make_json(self, logger):
174 - # remove handlers (sometimes there are multiple by default) 177 + """
  178 + Replace handlers of every logger by a handler that uses the JSON formatter
  179 + """
175 for handler in logger.handlers: 180 for handler in logger.handlers:
176 logger.removeHandler(handler) 181 logger.removeHandler(handler)
177 new_handler = logging.StreamHandler(sys.stdout) 182 new_handler = logging.StreamHandler(sys.stdout)
tests/util/__init__.py 0 → 100644
tests/util/log_helper/__init__.py 0 → 100644
tests/util/log_helper/log_helper_test_imported.py 0 → 100644
  1 +"""
  2 +Dummy file that logs messages, meant to be imported
  3 +by the main test file
  4 +"""
  5 +
  6 +from oletools.util.log_helper import log_helper
  7 +import logging
  8 +
  9 +DEBUG_MESSAGE = 'imported: debug log'
  10 +INFO_MESSAGE = 'imported: info log'
  11 +WARNING_MESSAGE = 'imported: warning log'
  12 +ERROR_MESSAGE = 'imported: error log'
  13 +CRITICAL_MESSAGE = 'imported: critical log'
  14 +
  15 +logger = log_helper.get_or_create_logger('test_imported', logging.ERROR)
  16 +
  17 +
  18 +def log():
  19 + logger.debug(DEBUG_MESSAGE)
  20 + logger.info(INFO_MESSAGE)
  21 + logger.warning(WARNING_MESSAGE)
  22 + logger.error(ERROR_MESSAGE)
  23 + logger.critical(CRITICAL_MESSAGE)
tests/util/log_helper/log_helper_test_main.py 0 → 100644
  1 +""" Test log_helpers """
  2 +
  3 +import sys
  4 +import logging
  5 +from tests.util.log_helper import log_helper_test_imported
  6 +from oletools.util.log_helper import log_helper
  7 +
  8 +DEBUG_MESSAGE = 'main: debug log'
  9 +INFO_MESSAGE = 'main: info log'
  10 +WARNING_MESSAGE = 'main: warning log'
  11 +ERROR_MESSAGE = 'main: error log'
  12 +CRITICAL_MESSAGE = 'main: critical log'
  13 +
  14 +logger = log_helper.get_or_create_logger('test_main')
  15 +
  16 +
  17 +def init_logging_and_log(cmd_line_args=None):
  18 + args = cmd_line_args if cmd_line_args else sys.argv
  19 +
  20 + if 'silent' in args:
  21 + return _log_silently()
  22 + elif 'dictionary' in args:
  23 + return _log_dictionary(args)
  24 + elif 'current_level' in args:
  25 + _enable_logging()
  26 + return _log_at_current_level()
  27 + elif 'default' in args:
  28 + _enable_logging()
  29 + return _log(logger)
  30 +
  31 + use_json = '-j' in args
  32 + throw_exception = 'throw' in args
  33 +
  34 + level = _parse_log_level(args)
  35 +
  36 + _enable_logging(use_json, level)
  37 + _log(logger)
  38 + log_helper_test_imported.log()
  39 +
  40 + if throw_exception:
  41 + raise Exception('An exception occurred before ending the logging')
  42 +
  43 + _end_logging()
  44 +
  45 +
  46 +def _parse_log_level(args):
  47 + if 'debug' in args:
  48 + return 'debug'
  49 + elif 'info' in args:
  50 + return 'info'
  51 + elif 'warning' in args:
  52 + return 'warning'
  53 + elif 'error' in args:
  54 + return 'error'
  55 + else:
  56 + return 'critical'
  57 +
  58 +
  59 +def _log_dictionary(args):
  60 + level = _parse_log_level(args)
  61 + _enable_logging(True, level)
  62 +
  63 + logger.log_at_current_level({
  64 + 'msg': DEBUG_MESSAGE
  65 + })
  66 + log_helper.end_logging()
  67 +
  68 +
  69 +def _enable_logging(use_json=False, level='warning'):
  70 + log_helper.enable_logging(use_json, level, stream=sys.stdout)
  71 +
  72 +
  73 +def _log_at_current_level():
  74 + logger.log_at_current_level(DEBUG_MESSAGE)
  75 +
  76 +
  77 +def _log_silently():
  78 + silent_logger = log_helper.get_or_create_silent_logger('test_main_silent', logging.DEBUG - 1)
  79 + _log(silent_logger)
  80 +
  81 +
  82 +def _log(current_logger):
  83 + current_logger.debug(DEBUG_MESSAGE)
  84 + current_logger.info(INFO_MESSAGE)
  85 + current_logger.warning(WARNING_MESSAGE)
  86 + current_logger.error(ERROR_MESSAGE)
  87 + current_logger.critical(CRITICAL_MESSAGE)
  88 +
  89 +
  90 +def _end_logging():
  91 + log_helper.end_logging()
  92 +
  93 +
  94 +if __name__ == '__main__':
  95 + # since we are using subprocess, add delimiters so we can easily extract
  96 + # the output that matters (when debugging tests we get extra output,
  97 + # so we need to ignore it)
  98 + print('<#')
  99 + init_logging_and_log(sys.argv)
  100 + print('#>')
tests/util/log_helper/test_log_helper.py 0 → 100644
  1 +""" Test the log helper
  2 +
  3 +This tests the generic log helper.
  4 +Check if it handles imported modules correctly
  5 +and that the default silent logger won't log when nothing is enabled
  6 +"""
  7 +
  8 +import unittest
  9 +import sys
  10 +import json
  11 +import re
  12 +from tests.util.log_helper import log_helper_test_main
  13 +from tests.util.log_helper import log_helper_test_imported
  14 +from os.path import dirname, join
  15 +from subprocess import check_output, STDOUT, CalledProcessError
  16 +
  17 +ROOT_DIRECTORY = dirname(dirname(dirname(__file__)))
  18 +TEST_FILE = join(dirname(__file__), 'log_helper_test_main.py')
  19 +PYTHON_EXECUTABLE = sys.executable
  20 +REGEX = re.compile('<#(.*)(:?#>|Traceback)', re.MULTILINE | re.DOTALL)
  21 +
  22 +MAIN_LOG_MESSAGES = [
  23 + log_helper_test_main.DEBUG_MESSAGE,
  24 + log_helper_test_main.INFO_MESSAGE,
  25 + log_helper_test_main.WARNING_MESSAGE,
  26 + log_helper_test_main.ERROR_MESSAGE,
  27 + log_helper_test_main.CRITICAL_MESSAGE
  28 +]
  29 +
  30 +
  31 +class TestLogHelper(unittest.TestCase):
  32 + def test_default_logging_to_stderr(self):
  33 + """
  34 + Basic test for simple logging
  35 + """
  36 + output = self._run_test(['default'])
  37 + self.assertIn(log_helper_test_main.WARNING_MESSAGE, output)
  38 +
  39 + def test_logging_silently(self):
  40 + """
  41 + Test that nothing will be logged when logging is not enabled
  42 + and we are using a silent logger (uses the NullHandler)
  43 + """
  44 + output = self._run_test(['silent'])
  45 + self.assertTrue(len(output) == 0)
  46 +
  47 + def test_setting_level_in_main_module(self):
  48 + """
  49 + Make sure that the level set in the main module is kept when
  50 + logging from imported modules.
  51 + """
  52 + output = self._run_test(['debug'])
  53 +
  54 + expected_messages = MAIN_LOG_MESSAGES + [
  55 + log_helper_test_imported.DEBUG_MESSAGE,
  56 + log_helper_test_imported.INFO_MESSAGE,
  57 + log_helper_test_imported.WARNING_MESSAGE,
  58 + log_helper_test_imported.ERROR_MESSAGE,
  59 + log_helper_test_imported.CRITICAL_MESSAGE
  60 + ]
  61 +
  62 + for msg in expected_messages:
  63 + self.assertIn(msg, output)
  64 +
  65 + def test_logging_at_current_level(self):
  66 + """
  67 + Test that logging at current level will always print a message
  68 + """
  69 + output = self._run_test(['current_level'])
  70 + self.assertIn(log_helper_test_main.DEBUG_MESSAGE, output)
  71 +
  72 + def test_logging_as_json(self):
  73 + """
  74 + Basic test for json logging
  75 + """
  76 + output = self._run_test(['critical', '-j'])
  77 +
  78 + try:
  79 + json_data = json.loads(output)
  80 + self._assert_json_messages(json_data, [
  81 + log_helper_test_main.CRITICAL_MESSAGE,
  82 + log_helper_test_imported.CRITICAL_MESSAGE
  83 + ])
  84 + except ValueError:
  85 + self.fail('Invalid json:\n' + output)
  86 + self.assertNotEqual(len(json_data), 0, msg='Output was empty')
  87 +
  88 + def test_logging_dictionary_as_json(self):
  89 + """
  90 + Test support for passing a dictionary to the logger
  91 + and have it logged as JSON
  92 + """
  93 + output = self._run_test(['dictionary'])
  94 +
  95 + try:
  96 + json_data = json.loads(output)
  97 + self._assert_json_messages(json_data, [
  98 + log_helper_test_main.DEBUG_MESSAGE
  99 + ])
  100 + except ValueError:
  101 + self.fail('Invalid json:\n' + output)
  102 + self.assertNotEqual(len(json_data), 0, msg='Output was empty')
  103 +
  104 + def test_json_correct_on_exceptions(self):
  105 + """
  106 + Test that even on unhandled exceptions our JSON is always correct
  107 + """
  108 + output = self._run_test(['critical', 'throw', '-j'], True)
  109 +
  110 + try:
  111 + json_data = json.loads(output)
  112 + self._assert_json_messages(json_data, [
  113 + log_helper_test_main.CRITICAL_MESSAGE,
  114 + log_helper_test_imported.CRITICAL_MESSAGE
  115 + ])
  116 + except ValueError:
  117 + self.fail('Invalid json:\n' + output)
  118 + self.assertNotEqual(len(json_data), 0, msg='Output was empty')
  119 +
  120 + def _assert_json_messages(self, json_data, messages):
  121 + self.assertEquals(len(json_data), len(messages))
  122 +
  123 + for i in range(len(messages)):
  124 + self.assertEquals(messages[i], json_data[i]['msg'])
  125 +
  126 + @staticmethod
  127 + def _run_test(args, ignore_exceptions=False):
  128 + """
  129 + Use subprocess to better simulate the real scenario and avoid
  130 + logging conflicts when running multiple tests (since logging depends on singletons,
  131 + we might get errors or false positives between sequential tests runs)
  132 + """
  133 + try:
  134 + output = check_output(
  135 + [PYTHON_EXECUTABLE, TEST_FILE] + args,
  136 + shell=False,
  137 + cwd=ROOT_DIRECTORY,
  138 + stderr=STDOUT,
  139 + universal_newlines=True
  140 + )
  141 +
  142 + if not isinstance(output, str):
  143 + output = output.decode('utf-8')
  144 + except CalledProcessError as ex:
  145 + if ignore_exceptions:
  146 + output = ex.output
  147 + else:
  148 + # we want tests to fail if an exception occur
  149 + raise ex
  150 +
  151 + return REGEX.search(output).group(1).strip()
  152 +
  153 +
  154 +# just in case somebody calls this file as a script
  155 +if __name__ == '__main__':
  156 + unittest.main()