diff --git b/.gitignore a/.gitignore new file mode 100644 index 0000000..8a9d35c --- /dev/null +++ a/.gitignore @@ -0,0 +1 @@ +*.user diff --git b/CMakeLists.txt a/CMakeLists.txt new file mode 100644 index 0000000..9b61a2d --- /dev/null +++ a/CMakeLists.txt @@ -0,0 +1,11 @@ +cmake_minimum_required(VERSION 3.5) + +project(FlashMQ LANGUAGES CXX) + +set(CMAKE_CXX_STANDARD 11) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + + +add_executable(FlashMQ main.cpp utils.cpp threaddata.cpp) + +target_link_libraries(FlashMQ pthread) diff --git b/main.cpp a/main.cpp new file mode 100644 index 0000000..b611ec7 --- /dev/null +++ a/main.cpp @@ -0,0 +1,98 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "utils.h" +#include "threaddata.h" + +#define MAX_EVENTS 1024 +#define NR_OF_THREADS 4 + + + +void do_thread_work(ThreadData &threadData) +{ + +} + +int main() +{ + int listen_fd = socket(AF_INET, SOCK_STREAM, 0); + + int optval = 1; + check(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &optval, sizeof(optval)) < 0); + + int flags = fcntl(listen_fd, F_GETFL); + check(fcntl(listen_fd, F_SETFL, flags | O_NONBLOCK ) < 0); + + struct sockaddr_in in_addr; + in_addr.sin_family = AF_INET; + in_addr.sin_addr.s_addr = INADDR_ANY; + in_addr.sin_port = htons(1883); + + check(bind(listen_fd, (struct sockaddr *)(&in_addr), sizeof(struct sockaddr_in)) < 0); + check(listen(listen_fd, 1024) < 0); + + int epoll_fd_accept = check(epoll_create(999)); + + struct epoll_event events[MAX_EVENTS]; + struct epoll_event ev; + memset(&ev, 0, sizeof (struct epoll_event)); + memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS); + + ev.data.fd = listen_fd; + ev.events = EPOLLIN; + check(epoll_ctl(epoll_fd_accept, EPOLL_CTL_ADD, listen_fd, &ev) < 0); + + std::vector threads; + + for (int i = 0; i < NR_OF_THREADS; i++) + { + ThreadData t(i); + std::thread thread(do_thread_work, std::ref(t)); + t.thread = std::move(thread); + threads.push_back(std::move(t)); + } + + std::cout << "Listening..." << std::endl; + + uint next_thread_index = 0; + + while (1) + { + int num_fds = epoll_wait(epoll_fd_accept, events, MAX_EVENTS, 100); + + for (int i = 0; i < num_fds; i++) + { + int cur_fd = events[i].data.fd; + if (cur_fd == listen_fd) + { + ThreadData &thread_data = threads[next_thread_index++ % NR_OF_THREADS]; + + std::cout << "Accepting connection on thread " << thread_data.threadnr << std::endl; + + struct sockaddr addr; + memset(&addr, 0, sizeof(struct sockaddr)); + socklen_t len = sizeof(struct sockaddr); + int fd = check(accept(cur_fd, &addr, &len)); + + // TODO: make client + + thread_data.giveFdToEpoll(fd); + } + else + { + throw std::runtime_error("The main thread had activity on an accepted socket?"); + } + + } + + } + + return 0; +} diff --git b/threaddata.cpp a/threaddata.cpp new file mode 100644 index 0000000..9e1ce51 --- /dev/null +++ a/threaddata.cpp @@ -0,0 +1,18 @@ +#include "threaddata.h" + + +ThreadData::ThreadData(int threadnr) : + threadnr(threadnr) +{ + epollfd = check(epoll_create(999)); + event_fd = eventfd(0, EFD_NONBLOCK); +} + +void ThreadData::giveFdToEpoll(int fd) +{ + struct epoll_event ev; + memset(&ev, 0, sizeof (struct epoll_event)); + ev.data.fd = fd; + ev.events = EPOLLIN; + check(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev)); +} diff --git b/threaddata.h a/threaddata.h new file mode 100644 index 0000000..bc210f3 --- /dev/null +++ a/threaddata.h @@ -0,0 +1,22 @@ +#ifndef THREADDATA_H +#define THREADDATA_H + +#include +#include "utils.h" +#include +#include + +class ThreadData +{ +public: + std::thread thread; + int threadnr = 0; + int epollfd = 0; + int event_fd = 0; + + ThreadData(int threadnr); + + void giveFdToEpoll(int fd); +}; + +#endif // THREADDATA_H diff --git b/utils.cpp a/utils.cpp new file mode 100644 index 0000000..67cfb5a --- /dev/null +++ a/utils.cpp @@ -0,0 +1,3 @@ +#include "utils.h" + + diff --git b/utils.h a/utils.h new file mode 100644 index 0000000..c6018dc --- /dev/null +++ a/utils.h @@ -0,0 +1,20 @@ +#ifndef UTILS_H +#define UTILS_H + +#include +#include +#include + +template int check(int rc) +{ + if (rc < 0) + { + char *err = strerror(errno); + std::string msg(err); + throw T(msg); + } + + return rc; +} + +#endif // UTILS_H