Commit d105ac30ac935fa88f7940b978d31ccbe58fbdd8

Authored by Wiebe Cazemier
1 parent 4bfef9db

Use separate thread for writing log

The file IO is blocking, so we don't want that in our thread loops.
logger.cpp
... ... @@ -20,15 +20,65 @@ License along with FlashMQ. If not, see <https://www.gnu.org/licenses/>.
20 20 #include <sstream>
21 21 #include <iomanip>
22 22 #include <string.h>
  23 +#include <functional>
23 24  
24 25 #include "exceptions.h"
  26 +#include "utils.h"
25 27  
26 28 Logger *Logger::instance = nullptr;
27 29 std::string Logger::logPath = "";
28 30  
  31 +LogLine::LogLine(std::string &&line, bool alsoToStdOut) :
  32 + line(line),
  33 + alsoToStdOut(alsoToStdOut)
  34 +{
  35 +
  36 +}
  37 +
  38 +LogLine::LogLine(const char *s, size_t len, bool alsoToStdOut) :
  39 + line(s, len),
  40 + alsoToStdOut(alsoToStdOut)
  41 +{
  42 +
  43 +}
  44 +
  45 +LogLine::LogLine() :
  46 + alsoToStdOut(true)
  47 +{
  48 +
  49 +}
  50 +
  51 +const char *LogLine::c_str() const
  52 +{
  53 + return line.c_str();
  54 +}
  55 +
  56 +bool LogLine::alsoLogToStdOut() const
  57 +{
  58 + return alsoToStdOut;
  59 +}
  60 +
29 61 Logger::Logger()
30 62 {
  63 + memset(&linesPending, 1, sizeof(sem_t));
  64 + sem_init(&linesPending, 0, 0);
  65 +
  66 + auto f = std::bind(&Logger::writeLog, this);
  67 + this->writerThread = std::thread(f, this);
  68 +
  69 + pthread_t native = this->writerThread.native_handle();
  70 + pthread_setname_np(native, "LogWriter");
  71 +}
  72 +
  73 +Logger::~Logger()
  74 +{
  75 + if (file)
  76 + {
  77 + fclose(file);
  78 + file = nullptr;
  79 + }
31 80  
  81 + sem_close(&linesPending);
32 82 }
33 83  
34 84 std::string Logger::getLogLevelString(int level) const
... ... @@ -71,10 +121,14 @@ void Logger::logf(int level, const char *str, ...)
71 121 va_end(valist);
72 122 }
73 123  
74   -void Logger::reOpen()
  124 +void Logger::queueReOpen()
75 125 {
76   - std::lock_guard<std::mutex> locker(logMutex);
  126 + reload = true;
  127 + sem_post(&linesPending);
  128 +}
77 129  
  130 +void Logger::reOpen()
  131 +{
78 132 if (file)
79 133 {
80 134 fclose(file);
... ... @@ -86,8 +140,7 @@ void Logger::reOpen()
86 140  
87 141 if ((file = fopen(logPath.c_str(), "a")) == nullptr)
88 142 {
89   - std::string msg(strerror(errno));
90   - throw ConfigFileException("Error opening logfile: " + msg);
  143 + logf(LOG_ERR, "(Re)opening log file '%s' error: %s. Logging to stdout.", logPath.c_str(), strerror(errno));
91 144 }
92 145 }
93 146  
... ... @@ -106,8 +159,6 @@ void Logger::setLogPath(const std::string &amp;path)
106 159  
107 160 void Logger::setFlags(bool logDebug, bool logSubscriptions)
108 161 {
109   - std::lock_guard<std::mutex> locker(logMutex);
110   -
111 162 if (logDebug)
112 163 curLogLevel |= LOG_DEBUG;
113 164 else
... ... @@ -119,13 +170,64 @@ void Logger::setFlags(bool logDebug, bool logSubscriptions)
119 170 curLogLevel &= ~(LOG_UNSUBSCRIBE & LOG_SUBSCRIBE);
120 171 }
121 172  
  173 +void Logger::quit()
  174 +{
  175 + running = false;
  176 + sem_post(&linesPending);
  177 + writerThread.join();
  178 +}
  179 +
  180 +void Logger::writeLog()
  181 +{
  182 + LogLine line;
  183 + while(running)
  184 + {
  185 + sem_wait(&linesPending);
  186 +
  187 + if (reload)
  188 + {
  189 + reOpen();
  190 + }
  191 +
  192 + {
  193 + std::lock_guard<std::mutex> locker(logMutex);
  194 +
  195 + if (lines.empty())
  196 + continue;
  197 +
  198 + line = std::move(lines.front());
  199 + lines.pop();
  200 + }
  201 +
  202 + if (this->file)
  203 + {
  204 + if (fputs(line.c_str(), this->file) < 0 ||
  205 + fputs("\n", this->file) < 0 ||
  206 + fflush(this->file) != 0)
  207 + {
  208 + alsoLogToStd = true;
  209 + fputs("Writing to log failed. Enabling stdout logger.", stderr);
  210 + }
  211 + }
  212 +
  213 + if (!this->file || line.alsoLogToStdOut())
  214 + {
  215 + FILE *output = stdout;
  216 +#ifdef TESTING
  217 + output = stderr; // the stdout interfers with Qt test XML output, so using stderr.
  218 +#endif
  219 + fputs(line.c_str(), output);
  220 + fputs("\n", output);
  221 + fflush(output);
  222 + }
  223 + }
  224 +}
  225 +
122 226 void Logger::logf(int level, const char *str, va_list valist)
123 227 {
124 228 if ((level & curLogLevel) == 0)
125 229 return;
126 230  
127   - std::lock_guard<std::mutex> locker(logMutex);
128   -
129 231 time_t time = std::time(nullptr);
130 232 struct tm tm = *std::localtime(&time);
131 233 std::ostringstream oss;
... ... @@ -136,28 +238,21 @@ void Logger::logf(int level, const char *str, va_list valist)
136 238 const std::string s = oss.str();
137 239 const char *logfmtstring = s.c_str();
138 240  
139   - if (this->file)
140   - {
141   - va_list valist2;
142   - va_copy(valist2, valist);
143   - vfprintf(this->file, logfmtstring, valist2);
144   - fprintf(this->file, "\n");
145   - fflush(this->file);
146   - va_end(valist2);
147   - }
  241 + char buf[512];
  242 +
  243 + va_list valist2;
  244 + va_copy(valist2, valist);
  245 + vsnprintf(buf, 512, logfmtstring, valist);
  246 + size_t len = strlen(buf);
  247 + LogLine line(buf, len, alsoLogToStd);
  248 + va_end(valist2);
148 249  
149   - if (!this->file || alsoLogToStd)
150 250 {
151   -#ifdef TESTING
152   - vfprintf(stderr, logfmtstring, valist); // the stdout interfers with Qt test XML output, so using stderr.
153   - fprintf(stderr, "\n");
154   - fflush(stderr);
155   -#else
156   - vfprintf(stdout, logfmtstring, valist);
157   - fprintf(stdout, "\n");
158   - fflush(stdout);
159   -#endif
  251 + std::lock_guard<std::mutex> locker(logMutex);
  252 + lines.push(std::move(line));
160 253 }
  254 +
  255 + sem_post(&linesPending);
161 256 }
162 257  
163 258 int logSslError(const char *str, size_t len, void *u)
... ...
logger.h
... ... @@ -21,33 +21,64 @@ License along with FlashMQ. If not, see &lt;https://www.gnu.org/licenses/&gt;.
21 21 #include <stdio.h>
22 22 #include <stdarg.h>
23 23 #include <mutex>
  24 +#include <queue>
  25 +#include <thread>
  26 +#include "semaphore.h"
24 27  
25 28 #include "flashmq_plugin.h"
26 29  
27 30 int logSslError(const char *str, size_t len, void *u);
28 31  
  32 +class LogLine
  33 +{
  34 + std::string line;
  35 + bool alsoToStdOut;
  36 +
  37 +public:
  38 + LogLine(std::string &&line, bool alsoToStdOut);
  39 + LogLine(const char *s, size_t len, bool alsoToStdOut);
  40 + LogLine();
  41 + LogLine(const LogLine &other) = delete;
  42 + LogLine(LogLine &&other) = default;
  43 + LogLine &operator=(LogLine &&other) = default;
  44 +
  45 + const char *c_str() const;
  46 + bool alsoLogToStdOut() const;
  47 +};
  48 +
29 49 class Logger
30 50 {
31 51 static Logger *instance;
32 52 static std::string logPath;
33 53 int curLogLevel = LOG_ERR | LOG_WARNING | LOG_NOTICE | LOG_INFO | LOG_SUBSCRIBE | LOG_UNSUBSCRIBE ;
34 54 std::mutex logMutex;
  55 + std::queue<LogLine> lines;
  56 + sem_t linesPending;
  57 + std::thread writerThread;
  58 + bool running = true;
35 59 FILE *file = nullptr;
36 60 bool alsoLogToStd = true;
  61 + bool reload = false;
37 62  
38 63 Logger();
  64 + ~Logger();
39 65 std::string getLogLevelString(int level) const;
  66 + void reOpen();
  67 + void writeLog();
40 68  
41 69 public:
42 70 static Logger *getInstance();
43 71 void logf(int level, const char *str, va_list args);
44 72 void logf(int level, const char *str, ...);
45   - void reOpen();
  73 +
  74 + void queueReOpen();
46 75 void noLongerLogToStd();
47 76  
48 77 void setLogPath(const std::string &path);
49 78 void setFlags(bool logDebug, bool logSubscriptions);
50 79  
  80 + void quit();
  81 +
51 82 };
52 83  
53 84 #endif // LOGGER_H
... ...
main.cpp
... ... @@ -93,6 +93,7 @@ int main(int argc, char *argv[])
93 93 logger->logf(LOG_NOTICE, "Starting FlashMQ version %s, debug build %s.", FLASHMQ_VERSION, sse.c_str());
94 94 #endif
95 95 mainApp->start();
  96 + logger->quit();
96 97 }
97 98 catch (ConfigFileException &ex)
98 99 {
... ...
mainapp.cpp
... ... @@ -643,7 +643,7 @@ void MainApp::loadConfig()
643 643 listeners = settings->listeners;
644 644  
645 645 logger->setLogPath(settings->logPath);
646   - logger->reOpen();
  646 + logger->queueReOpen();
647 647 logger->setFlags(settings->logDebug, settings->logSubscriptions);
648 648  
649 649 setlimits();
... ...