diff --git a/client.cpp b/client.cpp index 100a64f..7452dd0 100644 --- a/client.cpp +++ b/client.cpp @@ -1,15 +1,77 @@ #include "client.h" -Client::Client(int fd, ThreadData &threadData) : +Client::Client(int fd, ThreadData_p threadData) : fd(fd), threadData(threadData) { int flags = fcntl(fd, F_GETFL); fcntl(fd, F_SETFL, flags | O_NONBLOCK); + readbuf = (char*)malloc(CLIENT_BUFFER_SIZE); } Client::~Client() { - epoll_ctl(threadData.epollfd, EPOLL_CTL_DEL, fd, NULL); // NOTE: the last NULL can cause crash on old kernels + epoll_ctl(threadData->epollfd, EPOLL_CTL_DEL, fd, NULL); // NOTE: the last NULL can cause crash on old kernels close(fd); + free(readbuf); } + +// false means any kind of error we want to get rid of the client for. +bool Client::readFdIntoBuffer() +{ + int read_size = getMaxWriteSize(); + + int n; + while ((n = read(fd, &readbuf[wi], read_size)) != 0) + { + if (n < 0) + { + if (errno == EINTR) + continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + else + return false; + } + + wi += n; + size_t bytesUsed = getBufBytesUsed(); + + // TODO: we need a buffer to keep partial frames in, so/and can we reduce the size of this buffer again periodically? + if (bytesUsed >= bufsize) + { + const size_t newBufSize = bufsize * 2; + readbuf = (char*)realloc(readbuf, newBufSize); + bufsize = newBufSize; + } + + wi = wi % bufsize; + read_size = getMaxWriteSize(); + } + + if (n == 0) // client disconnected. + { + return false; + } + + return true; +} + +void Client::writeTest() +{ + char *p = &readbuf[ri]; + size_t max_read = getMaxReadSize(); + ri = (ri + max_read) % bufsize; + write(fd, p, max_read); +} + + + + + + + + + + + diff --git a/client.h b/client.h index 132bfa7..925c900 100644 --- a/client.h +++ b/client.h @@ -6,21 +6,54 @@ #include "threaddata.h" +#define CLIENT_BUFFER_SIZE 16 + class ThreadData; +typedef std::shared_ptr ThreadData_p; class Client { int fd; - // maybe read buffer? - - ThreadData &threadData; + char *readbuf = NULL; // With many clients, it may not be smart to keep a (big) buffer around. + size_t bufsize = CLIENT_BUFFER_SIZE; + int wi = 0; + int ri = 0; + + ThreadData_p threadData; + + size_t getBufBytesUsed() + { + size_t result = 0; + if (wi >= ri) + result = wi - ri; + else + result = (bufsize + wi) - ri; + }; + + size_t getMaxWriteSize() + { + size_t available = bufsize - getBufBytesUsed(); + size_t space_at_end = bufsize - wi; + size_t answer = std::min(available, space_at_end); + return answer; + } + + size_t getMaxReadSize() + { + size_t available = getBufBytesUsed(); + size_t space_to_end = bufsize - ri; + size_t answer = std::min(available, space_to_end); + return answer; + } public: - Client(int fd, ThreadData &threadData); + Client(int fd, ThreadData_p threadData); ~Client(); int getFd() { return fd;} + bool readFdIntoBuffer(); + void writeTest(); }; diff --git a/main.cpp b/main.cpp index cb32107..4851ea9 100644 --- a/main.cpp +++ b/main.cpp @@ -16,9 +16,9 @@ -void do_thread_work(ThreadData &threadData) +void do_thread_work(ThreadData *threadData) { - int epoll_fd = threadData.epollfd; + int epoll_fd = threadData->epollfd; struct epoll_event events[MAX_EVENTS]; memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS); @@ -34,11 +34,14 @@ void do_thread_work(ThreadData &threadData) struct epoll_event cur_ev = events[i]; int fd = cur_ev.data.fd; - Client_p client = threadData.getClient(fd); + Client_p client = threadData->getClient(fd); if (client) // TODO: is this check necessary? { - // TODO left here + if (!client->readFdIntoBuffer()) + threadData->removeClient(client); + client->writeTest(); + } } } @@ -74,14 +77,14 @@ int main() ev.events = EPOLLIN; check(epoll_ctl(epoll_fd_accept, EPOLL_CTL_ADD, listen_fd, &ev) < 0); - std::vector threads; + std::vector> threads; for (int i = 0; i < NR_OF_THREADS; i++) { - ThreadData t(i); - std::thread thread(do_thread_work, std::ref(t)); - t.thread = std::move(thread); - threads.push_back(std::move(t)); + std::shared_ptr t(new ThreadData(i)); + std::thread thread(do_thread_work, t.get()); + t->thread = std::move(thread); + threads.push_back(t); } std::cout << "Listening..." << std::endl; @@ -97,9 +100,9 @@ int main() int cur_fd = events[i].data.fd; if (cur_fd == listen_fd) { - ThreadData &thread_data = threads[next_thread_index++ % NR_OF_THREADS]; + std::shared_ptr thread_data = threads[next_thread_index++ % NR_OF_THREADS]; - std::cout << "Accepting connection on thread " << thread_data.threadnr << std::endl; + std::cout << "Accepting connection on thread " << thread_data->threadnr << std::endl; struct sockaddr addr; memset(&addr, 0, sizeof(struct sockaddr)); @@ -107,7 +110,7 @@ int main() int fd = check(accept(cur_fd, &addr, &len)); Client_p client(new Client(fd, thread_data)); - thread_data.giveClient(client); + thread_data->giveClient(client); } else { diff --git a/threaddata.h b/threaddata.h index 90149c6..8e3f771 100644 --- a/threaddata.h +++ b/threaddata.h @@ -28,4 +28,6 @@ public: void removeClient(Client_p client); }; +typedef std::shared_ptr ThreadData_p; + #endif // THREADDATA_H