Commit 19f21bf21e1e1a99b4709aacc54d62bc79c256a9

Authored by Wiebe Cazemier
0 parents

bare start

.gitignore 0 → 100644
  1 +++ a/.gitignore
  1 +*.user
CMakeLists.txt 0 → 100644
  1 +++ a/CMakeLists.txt
  1 +cmake_minimum_required(VERSION 3.5)
  2 +
  3 +project(FlashMQ LANGUAGES CXX)
  4 +
  5 +set(CMAKE_CXX_STANDARD 11)
  6 +set(CMAKE_CXX_STANDARD_REQUIRED ON)
  7 +
  8 +
  9 +add_executable(FlashMQ main.cpp utils.cpp threaddata.cpp)
  10 +
  11 +target_link_libraries(FlashMQ pthread)
main.cpp 0 → 100644
  1 +++ a/main.cpp
  1 +#include <iostream>
  2 +#include <sys/socket.h>
  3 +#include <stdexcept>
  4 +#include <netinet/in.h>
  5 +#include <sys/epoll.h>
  6 +#include <fcntl.h>
  7 +#include <thread>
  8 +#include <vector>
  9 +
  10 +#include "utils.h"
  11 +#include "threaddata.h"
  12 +
  13 +#define MAX_EVENTS 1024
  14 +#define NR_OF_THREADS 4
  15 +
  16 +
  17 +
  18 +void do_thread_work(ThreadData &threadData)
  19 +{
  20 +
  21 +}
  22 +
  23 +int main()
  24 +{
  25 + int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  26 +
  27 + int optval = 1;
  28 + check<std::runtime_error>(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &optval, sizeof(optval)) < 0);
  29 +
  30 + int flags = fcntl(listen_fd, F_GETFL);
  31 + check<std::runtime_error>(fcntl(listen_fd, F_SETFL, flags | O_NONBLOCK ) < 0);
  32 +
  33 + struct sockaddr_in in_addr;
  34 + in_addr.sin_family = AF_INET;
  35 + in_addr.sin_addr.s_addr = INADDR_ANY;
  36 + in_addr.sin_port = htons(1883);
  37 +
  38 + check<std::runtime_error>(bind(listen_fd, (struct sockaddr *)(&in_addr), sizeof(struct sockaddr_in)) < 0);
  39 + check<std::runtime_error>(listen(listen_fd, 1024) < 0);
  40 +
  41 + int epoll_fd_accept = check<std::runtime_error>(epoll_create(999));
  42 +
  43 + struct epoll_event events[MAX_EVENTS];
  44 + struct epoll_event ev;
  45 + memset(&ev, 0, sizeof (struct epoll_event));
  46 + memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS);
  47 +
  48 + ev.data.fd = listen_fd;
  49 + ev.events = EPOLLIN;
  50 + check<std::runtime_error>(epoll_ctl(epoll_fd_accept, EPOLL_CTL_ADD, listen_fd, &ev) < 0);
  51 +
  52 + std::vector<ThreadData> threads;
  53 +
  54 + for (int i = 0; i < NR_OF_THREADS; i++)
  55 + {
  56 + ThreadData t(i);
  57 + std::thread thread(do_thread_work, std::ref(t));
  58 + t.thread = std::move(thread);
  59 + threads.push_back(std::move(t));
  60 + }
  61 +
  62 + std::cout << "Listening..." << std::endl;
  63 +
  64 + uint next_thread_index = 0;
  65 +
  66 + while (1)
  67 + {
  68 + int num_fds = epoll_wait(epoll_fd_accept, events, MAX_EVENTS, 100);
  69 +
  70 + for (int i = 0; i < num_fds; i++)
  71 + {
  72 + int cur_fd = events[i].data.fd;
  73 + if (cur_fd == listen_fd)
  74 + {
  75 + ThreadData &thread_data = threads[next_thread_index++ % NR_OF_THREADS];
  76 +
  77 + std::cout << "Accepting connection on thread " << thread_data.threadnr << std::endl;
  78 +
  79 + struct sockaddr addr;
  80 + memset(&addr, 0, sizeof(struct sockaddr));
  81 + socklen_t len = sizeof(struct sockaddr);
  82 + int fd = check<std::runtime_error>(accept(cur_fd, &addr, &len));
  83 +
  84 + // TODO: make client
  85 +
  86 + thread_data.giveFdToEpoll(fd);
  87 + }
  88 + else
  89 + {
  90 + throw std::runtime_error("The main thread had activity on an accepted socket?");
  91 + }
  92 +
  93 + }
  94 +
  95 + }
  96 +
  97 + return 0;
  98 +}
threaddata.cpp 0 → 100644
  1 +++ a/threaddata.cpp
  1 +#include "threaddata.h"
  2 +
  3 +
  4 +ThreadData::ThreadData(int threadnr) :
  5 + threadnr(threadnr)
  6 +{
  7 + epollfd = check<std::runtime_error>(epoll_create(999));
  8 + event_fd = eventfd(0, EFD_NONBLOCK);
  9 +}
  10 +
  11 +void ThreadData::giveFdToEpoll(int fd)
  12 +{
  13 + struct epoll_event ev;
  14 + memset(&ev, 0, sizeof (struct epoll_event));
  15 + ev.data.fd = fd;
  16 + ev.events = EPOLLIN;
  17 + check<std::runtime_error>(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev));
  18 +}
threaddata.h 0 → 100644
  1 +++ a/threaddata.h
  1 +#ifndef THREADDATA_H
  2 +#define THREADDATA_H
  3 +
  4 +#include <thread>
  5 +#include "utils.h"
  6 +#include <sys/epoll.h>
  7 +#include <sys/eventfd.h>
  8 +
  9 +class ThreadData
  10 +{
  11 +public:
  12 + std::thread thread;
  13 + int threadnr = 0;
  14 + int epollfd = 0;
  15 + int event_fd = 0;
  16 +
  17 + ThreadData(int threadnr);
  18 +
  19 + void giveFdToEpoll(int fd);
  20 +};
  21 +
  22 +#endif // THREADDATA_H
utils.cpp 0 → 100644
  1 +++ a/utils.cpp
  1 +#include "utils.h"
  2 +
  3 +
utils.h 0 → 100644
  1 +++ a/utils.h
  1 +#ifndef UTILS_H
  2 +#define UTILS_H
  3 +
  4 +#include <string.h>
  5 +#include <errno.h>
  6 +#include <string>
  7 +
  8 +template<typename T> int check(int rc)
  9 +{
  10 + if (rc < 0)
  11 + {
  12 + char *err = strerror(errno);
  13 + std::string msg(err);
  14 + throw T(msg);
  15 + }
  16 +
  17 + return rc;
  18 +}
  19 +
  20 +#endif // UTILS_H