Commit d0473da1b93c0603fe1be110c3fe1b66181665b1

Authored by Wiebe Cazemier
1 parent 64ec702a

Auto-detect number of threads

mainapp.cpp
@@ -4,6 +4,7 @@ @@ -4,6 +4,7 @@
4 #include "getopt.h" 4 #include "getopt.h"
5 #include <unistd.h> 5 #include <unistd.h>
6 #include <stdio.h> 6 #include <stdio.h>
  7 +#include <sys/sysinfo.h>
7 8
8 #include <openssl/ssl.h> 9 #include <openssl/ssl.h>
9 #include <openssl/err.h> 10 #include <openssl/err.h>
@@ -11,7 +12,6 @@ @@ -11,7 +12,6 @@
11 #include "logger.h" 12 #include "logger.h"
12 13
13 #define MAX_EVENTS 1024 14 #define MAX_EVENTS 1024
14 -#define NR_OF_THREADS 4  
15 15
16 #define VERSION "0.1" 16 #define VERSION "0.1"
17 17
@@ -148,12 +148,20 @@ void do_thread_work(ThreadData *threadData) @@ -148,12 +148,20 @@ void do_thread_work(ThreadData *threadData)
148 MainApp::MainApp(const std::string &configFilePath) : 148 MainApp::MainApp(const std::string &configFilePath) :
149 subscriptionStore(new SubscriptionStore()) 149 subscriptionStore(new SubscriptionStore())
150 { 150 {
  151 + this->num_threads = get_nprocs();
  152 +
  153 + if (num_threads <= 0)
  154 + throw std::runtime_error("Invalid number of CPUs: " + std::to_string(num_threads));
  155 +
151 epollFdAccept = check<std::runtime_error>(epoll_create(999)); 156 epollFdAccept = check<std::runtime_error>(epoll_create(999));
152 taskEventFd = eventfd(0, EFD_NONBLOCK); 157 taskEventFd = eventfd(0, EFD_NONBLOCK);
153 158
154 confFileParser.reset(new ConfigFileParser(configFilePath)); 159 confFileParser.reset(new ConfigFileParser(configFilePath));
155 loadConfig(); 160 loadConfig();
156 161
  162 + // TODO: override in conf possibility.
  163 + logger->logf(LOG_NOTICE, "%d CPUs are detected, making as many threads.", num_threads);
  164 +
157 auto f = std::bind(&MainApp::queueCleanup, this); 165 auto f = std::bind(&MainApp::queueCleanup, this);
158 timer.addCallback(f, 86400000, "session expiration"); 166 timer.addCallback(f, 86400000, "session expiration");
159 } 167 }
@@ -342,7 +350,7 @@ void MainApp::start() @@ -342,7 +350,7 @@ void MainApp::start()
342 ev.events = EPOLLIN; 350 ev.events = EPOLLIN;
343 check<std::runtime_error>(epoll_ctl(this->epollFdAccept, EPOLL_CTL_ADD, taskEventFd, &ev)); 351 check<std::runtime_error>(epoll_ctl(this->epollFdAccept, EPOLL_CTL_ADD, taskEventFd, &ev));
344 352
345 - for (int i = 0; i < NR_OF_THREADS; i++) 353 + for (int i = 0; i < num_threads; i++)
346 { 354 {
347 std::shared_ptr<ThreadData> t(new ThreadData(i, subscriptionStore, *confFileParser.get())); 355 std::shared_ptr<ThreadData> t(new ThreadData(i, subscriptionStore, *confFileParser.get()));
348 t->start(&do_thread_work); 356 t->start(&do_thread_work);
@@ -373,7 +381,7 @@ void MainApp::start() @@ -373,7 +381,7 @@ void MainApp::start()
373 { 381 {
374 if (cur_fd == listen_fd_plain || cur_fd == listen_fd_ssl) 382 if (cur_fd == listen_fd_plain || cur_fd == listen_fd_ssl)
375 { 383 {
376 - std::shared_ptr<ThreadData> thread_data = threads[next_thread_index++ % NR_OF_THREADS]; 384 + std::shared_ptr<ThreadData> thread_data = threads[next_thread_index++ % num_threads];
377 385
378 logger->logf(LOG_INFO, "Accepting connection on thread %d", thread_data->threadnr); 386 logger->logf(LOG_INFO, "Accepting connection on thread %d", thread_data->threadnr);
379 387
mainapp.h
@@ -24,6 +24,7 @@ @@ -24,6 +24,7 @@
24 class MainApp 24 class MainApp
25 { 25 {
26 static MainApp *instance; 26 static MainApp *instance;
  27 + int num_threads = 0;
27 28
28 bool started = false; 29 bool started = false;
29 bool running = true; 30 bool running = true;
threaddata.cpp
@@ -21,7 +21,22 @@ void ThreadData::start(thread_f f) @@ -21,7 +21,22 @@ void ThreadData::start(thread_f f)
21 std::ostringstream threadName; 21 std::ostringstream threadName;
22 threadName << "FlashMQ T " << threadnr; 22 threadName << "FlashMQ T " << threadnr;
23 threadName.flush(); 23 threadName.flush();
24 - pthread_setname_np(native, threadName.str().c_str()); 24 + const char *c_str = threadName.str().c_str();
  25 + pthread_setname_np(native, c_str);
  26 +
  27 + cpu_set_t cpuset;
  28 + CPU_ZERO(&cpuset);
  29 + CPU_SET(threadnr, &cpuset);
  30 + check<std::runtime_error>(pthread_setaffinity_np(native, sizeof(cpuset), &cpuset));
  31 +
  32 + // It's not really necessary to get affinity again, but now I'm logging truth instead assumption.
  33 + check<std::runtime_error>(pthread_getaffinity_np(native, sizeof(cpuset), &cpuset));
  34 + int pinned_cpu = -1;
  35 + for (int j = 0; j < CPU_SETSIZE; j++)
  36 + if (CPU_ISSET(j, &cpuset))
  37 + pinned_cpu = j;
  38 +
  39 + logger->logf(LOG_NOTICE, "Thread '%s' pinned to CPU %d", c_str, pinned_cpu);
25 } 40 }
26 41
27 void ThreadData::quit() 42 void ThreadData::quit()