Commit 7be0fcf4fd3e47fc3d50401753fd4086c146feeb

Authored by Wiebe Cazemier
1 parent b724f979

Quit threads by using task queue

This fixes the 100ms wait time requirement.
authplugin.cpp
@@ -97,6 +97,9 @@ void AuthPlugin::init() @@ -97,6 +97,9 @@ void AuthPlugin::init()
97 if (settings.authPluginSerializeInit) 97 if (settings.authPluginSerializeInit)
98 lock.lock(); 98 lock.lock();
99 99
  100 + if (quitting)
  101 + return;
  102 +
100 AuthOptCompatWrap &authOpts = settings.getAuthOptsCompat(); 103 AuthOptCompatWrap &authOpts = settings.getAuthOptsCompat();
101 int result = init_v2(&pluginData, authOpts.head(), authOpts.size()); 104 int result = init_v2(&pluginData, authOpts.head(), authOpts.size());
102 if (result != 0) 105 if (result != 0)
@@ -125,6 +128,9 @@ void AuthPlugin::securityInit(bool reloading) @@ -125,6 +128,9 @@ void AuthPlugin::securityInit(bool reloading)
125 if (settings.authPluginSerializeInit) 128 if (settings.authPluginSerializeInit)
126 lock.lock(); 129 lock.lock();
127 130
  131 + if (quitting)
  132 + return;
  133 +
128 AuthOptCompatWrap &authOpts = settings.getAuthOptsCompat(); 134 AuthOptCompatWrap &authOpts = settings.getAuthOptsCompat();
129 int result = security_init_v2(pluginData, authOpts.head(), authOpts.size(), reloading); 135 int result = security_init_v2(pluginData, authOpts.head(), authOpts.size(), reloading);
130 if (result != 0) 136 if (result != 0)
@@ -201,6 +207,11 @@ AuthResult AuthPlugin::unPwdCheck(const std::string &username, const std::string @@ -201,6 +207,11 @@ AuthResult AuthPlugin::unPwdCheck(const std::string &username, const std::string
201 return r; 207 return r;
202 } 208 }
203 209
  210 +void AuthPlugin::setQuitting()
  211 +{
  212 + this->quitting = true;
  213 +}
  214 +
204 std::string AuthResultToString(AuthResult r) 215 std::string AuthResultToString(AuthResult r)
205 { 216 {
206 { 217 {
authplugin.h
@@ -63,6 +63,7 @@ class AuthPlugin @@ -63,6 +63,7 @@ class AuthPlugin
63 Logger *logger = nullptr; 63 Logger *logger = nullptr;
64 bool initialized = false; 64 bool initialized = false;
65 bool wanted = false; 65 bool wanted = false;
  66 + bool quitting = false;
66 67
67 void *loadSymbol(void *handle, const char *symbol) const; 68 void *loadSymbol(void *handle, const char *symbol) const;
68 public: 69 public:
@@ -79,6 +80,8 @@ public: @@ -79,6 +80,8 @@ public:
79 AuthResult aclCheck(const std::string &clientid, const std::string &username, const std::string &topic, AclAccess access); 80 AuthResult aclCheck(const std::string &clientid, const std::string &username, const std::string &topic, AclAccess access);
80 AuthResult unPwdCheck(const std::string &username, const std::string &password); 81 AuthResult unPwdCheck(const std::string &username, const std::string &password);
81 82
  83 + void setQuitting();
  84 +
82 }; 85 };
83 86
84 #endif // AUTHPLUGIN_H 87 #endif // AUTHPLUGIN_H
mainapp.cpp
@@ -440,7 +440,12 @@ void MainApp::start() @@ -440,7 +440,12 @@ void MainApp::start()
440 440
441 for(std::shared_ptr<ThreadData> &thread : threads) 441 for(std::shared_ptr<ThreadData> &thread : threads)
442 { 442 {
443 - thread->quit(); 443 + thread->queueQuit();
  444 + }
  445 +
  446 + for(std::shared_ptr<ThreadData> &thread : threads)
  447 + {
  448 + thread->waitForQuit();
444 } 449 }
445 450
446 for(auto pair : listenerMap) 451 for(auto pair : listenerMap)
threaddata.cpp
@@ -52,7 +52,6 @@ void ThreadData::start(thread_f f) @@ -52,7 +52,6 @@ void ThreadData::start(thread_f f)
52 void ThreadData::quit() 52 void ThreadData::quit()
53 { 53 {
54 running = false; 54 running = false;
55 - thread.join();  
56 } 55 }
57 56
58 void ThreadData::giveClient(Client_p client) 57 void ThreadData::giveClient(Client_p client)
@@ -109,6 +108,23 @@ void ThreadData::queueDoKeepAliveCheck() @@ -109,6 +108,23 @@ void ThreadData::queueDoKeepAliveCheck()
109 wakeUpThread(); 108 wakeUpThread();
110 } 109 }
111 110
  111 +void ThreadData::queueQuit()
  112 +{
  113 + std::lock_guard<std::mutex> locker(taskQueueMutex);
  114 +
  115 + auto f = std::bind(&ThreadData::quit, this);
  116 + taskQueue.push_front(f);
  117 +
  118 + authPlugin.setQuitting();
  119 +
  120 + wakeUpThread();
  121 +}
  122 +
  123 +void ThreadData::waitForQuit()
  124 +{
  125 + thread.join();
  126 +}
  127 +
112 // TODO: profile how fast hash iteration is. Perhaps having a second list/vector is beneficial? 128 // TODO: profile how fast hash iteration is. Perhaps having a second list/vector is beneficial?
113 void ThreadData::doKeepAliveCheck() 129 void ThreadData::doKeepAliveCheck()
114 { 130 {
threaddata.h
@@ -33,6 +33,7 @@ class ThreadData @@ -33,6 +33,7 @@ class ThreadData
33 void reload(std::shared_ptr<Settings> settings); 33 void reload(std::shared_ptr<Settings> settings);
34 void wakeUpThread(); 34 void wakeUpThread();
35 void doKeepAliveCheck(); 35 void doKeepAliveCheck();
  36 + void quit();
36 37
37 public: 38 public:
38 Settings settingsLocalCopy; // Is updated on reload, within the thread loop. 39 Settings settingsLocalCopy; // Is updated on reload, within the thread loop.
@@ -50,7 +51,7 @@ public: @@ -50,7 +51,7 @@ public:
50 ThreadData(ThreadData &&other) = delete; 51 ThreadData(ThreadData &&other) = delete;
51 52
52 void start(thread_f f); 53 void start(thread_f f);
53 - void quit(); 54 +
54 void giveClient(Client_p client); 55 void giveClient(Client_p client);
55 Client_p getClient(int fd); 56 Client_p getClient(int fd);
56 void removeClient(Client_p client); 57 void removeClient(Client_p client);
@@ -60,6 +61,8 @@ public: @@ -60,6 +61,8 @@ public:
60 void initAuthPlugin(); 61 void initAuthPlugin();
61 void queueReload(std::shared_ptr<Settings> settings); 62 void queueReload(std::shared_ptr<Settings> settings);
62 void queueDoKeepAliveCheck(); 63 void queueDoKeepAliveCheck();
  64 + void queueQuit();
  65 + void waitForQuit();
63 66
64 }; 67 };
65 68