From d0473da1b93c0603fe1be110c3fe1b66181665b1 Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Sun, 7 Feb 2021 16:54:02 +0100 Subject: [PATCH] Auto-detect number of threads --- mainapp.cpp | 14 +++++++++++--- mainapp.h | 1 + threaddata.cpp | 17 ++++++++++++++++- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/mainapp.cpp b/mainapp.cpp index 36afe2a..5d49326 100644 --- a/mainapp.cpp +++ b/mainapp.cpp @@ -4,6 +4,7 @@ #include "getopt.h" #include #include +#include #include #include @@ -11,7 +12,6 @@ #include "logger.h" #define MAX_EVENTS 1024 -#define NR_OF_THREADS 4 #define VERSION "0.1" @@ -148,12 +148,20 @@ void do_thread_work(ThreadData *threadData) MainApp::MainApp(const std::string &configFilePath) : subscriptionStore(new SubscriptionStore()) { + this->num_threads = get_nprocs(); + + if (num_threads <= 0) + throw std::runtime_error("Invalid number of CPUs: " + std::to_string(num_threads)); + epollFdAccept = check(epoll_create(999)); taskEventFd = eventfd(0, EFD_NONBLOCK); confFileParser.reset(new ConfigFileParser(configFilePath)); loadConfig(); + // TODO: override in conf possibility. + logger->logf(LOG_NOTICE, "%d CPUs are detected, making as many threads.", num_threads); + auto f = std::bind(&MainApp::queueCleanup, this); timer.addCallback(f, 86400000, "session expiration"); } @@ -342,7 +350,7 @@ void MainApp::start() ev.events = EPOLLIN; check(epoll_ctl(this->epollFdAccept, EPOLL_CTL_ADD, taskEventFd, &ev)); - for (int i = 0; i < NR_OF_THREADS; i++) + for (int i = 0; i < num_threads; i++) { std::shared_ptr t(new ThreadData(i, subscriptionStore, *confFileParser.get())); t->start(&do_thread_work); @@ -373,7 +381,7 @@ void MainApp::start() { if (cur_fd == listen_fd_plain || cur_fd == listen_fd_ssl) { - std::shared_ptr thread_data = threads[next_thread_index++ % NR_OF_THREADS]; + std::shared_ptr thread_data = threads[next_thread_index++ % num_threads]; logger->logf(LOG_INFO, "Accepting connection on thread %d", thread_data->threadnr); diff --git a/mainapp.h b/mainapp.h index 6b1077a..413ca7d 100644 --- a/mainapp.h +++ b/mainapp.h @@ -24,6 +24,7 @@ class MainApp { static MainApp *instance; + int num_threads = 0; bool started = false; bool running = true; diff --git a/threaddata.cpp b/threaddata.cpp index d9d3a5d..d4d7c9c 100644 --- a/threaddata.cpp +++ b/threaddata.cpp @@ -21,7 +21,22 @@ void ThreadData::start(thread_f f) std::ostringstream threadName; threadName << "FlashMQ T " << threadnr; threadName.flush(); - pthread_setname_np(native, threadName.str().c_str()); + const char *c_str = threadName.str().c_str(); + pthread_setname_np(native, c_str); + + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(threadnr, &cpuset); + check(pthread_setaffinity_np(native, sizeof(cpuset), &cpuset)); + + // It's not really necessary to get affinity again, but now I'm logging truth instead assumption. + check(pthread_getaffinity_np(native, sizeof(cpuset), &cpuset)); + int pinned_cpu = -1; + for (int j = 0; j < CPU_SETSIZE; j++) + if (CPU_ISSET(j, &cpuset)) + pinned_cpu = j; + + logger->logf(LOG_NOTICE, "Thread '%s' pinned to CPU %d", c_str, pinned_cpu); } void ThreadData::quit() -- libgit2 0.21.4