Commit d11fd3645257f7af1ea649cbe698ec621cf4603c

Authored by Wiebe Cazemier
1 parent f3110e90

Reading client data works

And I had to fix a threading bug.
client.cpp
1 #include "client.h" 1 #include "client.h"
2 2
3 -Client::Client(int fd, ThreadData &threadData) : 3 +Client::Client(int fd, ThreadData_p threadData) :
4 fd(fd), 4 fd(fd),
5 threadData(threadData) 5 threadData(threadData)
6 { 6 {
7 int flags = fcntl(fd, F_GETFL); 7 int flags = fcntl(fd, F_GETFL);
8 fcntl(fd, F_SETFL, flags | O_NONBLOCK); 8 fcntl(fd, F_SETFL, flags | O_NONBLOCK);
  9 + readbuf = (char*)malloc(CLIENT_BUFFER_SIZE);
9 } 10 }
10 11
11 Client::~Client() 12 Client::~Client()
12 { 13 {
13 - epoll_ctl(threadData.epollfd, EPOLL_CTL_DEL, fd, NULL); // NOTE: the last NULL can cause crash on old kernels 14 + epoll_ctl(threadData->epollfd, EPOLL_CTL_DEL, fd, NULL); // NOTE: the last NULL can cause crash on old kernels
14 close(fd); 15 close(fd);
  16 + free(readbuf);
15 } 17 }
  18 +
  19 +// false means any kind of error we want to get rid of the client for.
  20 +bool Client::readFdIntoBuffer()
  21 +{
  22 + int read_size = getMaxWriteSize();
  23 +
  24 + int n;
  25 + while ((n = read(fd, &readbuf[wi], read_size)) != 0)
  26 + {
  27 + if (n < 0)
  28 + {
  29 + if (errno == EINTR)
  30 + continue;
  31 + if (errno == EAGAIN || errno == EWOULDBLOCK)
  32 + break;
  33 + else
  34 + return false;
  35 + }
  36 +
  37 + wi += n;
  38 + size_t bytesUsed = getBufBytesUsed();
  39 +
  40 + // TODO: we need a buffer to keep partial frames in, so/and can we reduce the size of this buffer again periodically?
  41 + if (bytesUsed >= bufsize)
  42 + {
  43 + const size_t newBufSize = bufsize * 2;
  44 + readbuf = (char*)realloc(readbuf, newBufSize);
  45 + bufsize = newBufSize;
  46 + }
  47 +
  48 + wi = wi % bufsize;
  49 + read_size = getMaxWriteSize();
  50 + }
  51 +
  52 + if (n == 0) // client disconnected.
  53 + {
  54 + return false;
  55 + }
  56 +
  57 + return true;
  58 +}
  59 +
  60 +void Client::writeTest()
  61 +{
  62 + char *p = &readbuf[ri];
  63 + size_t max_read = getMaxReadSize();
  64 + ri = (ri + max_read) % bufsize;
  65 + write(fd, p, max_read);
  66 +}
  67 +
  68 +
  69 +
  70 +
  71 +
  72 +
  73 +
  74 +
  75 +
  76 +
  77 +
client.h
@@ -6,21 +6,54 @@ @@ -6,21 +6,54 @@
6 6
7 #include "threaddata.h" 7 #include "threaddata.h"
8 8
  9 +#define CLIENT_BUFFER_SIZE 16
  10 +
9 class ThreadData; 11 class ThreadData;
  12 +typedef std::shared_ptr<ThreadData> ThreadData_p;
10 13
11 class Client 14 class Client
12 { 15 {
13 int fd; 16 int fd;
14 17
15 - // maybe read buffer?  
16 -  
17 - ThreadData &threadData; 18 + char *readbuf = NULL; // With many clients, it may not be smart to keep a (big) buffer around.
  19 + size_t bufsize = CLIENT_BUFFER_SIZE;
  20 + int wi = 0;
  21 + int ri = 0;
  22 +
  23 + ThreadData_p threadData;
  24 +
  25 + size_t getBufBytesUsed()
  26 + {
  27 + size_t result = 0;
  28 + if (wi >= ri)
  29 + result = wi - ri;
  30 + else
  31 + result = (bufsize + wi) - ri;
  32 + };
  33 +
  34 + size_t getMaxWriteSize()
  35 + {
  36 + size_t available = bufsize - getBufBytesUsed();
  37 + size_t space_at_end = bufsize - wi;
  38 + size_t answer = std::min<int>(available, space_at_end);
  39 + return answer;
  40 + }
  41 +
  42 + size_t getMaxReadSize()
  43 + {
  44 + size_t available = getBufBytesUsed();
  45 + size_t space_to_end = bufsize - ri;
  46 + size_t answer = std::min<int>(available, space_to_end);
  47 + return answer;
  48 + }
18 49
19 public: 50 public:
20 - Client(int fd, ThreadData &threadData); 51 + Client(int fd, ThreadData_p threadData);
21 ~Client(); 52 ~Client();
22 53
23 int getFd() { return fd;} 54 int getFd() { return fd;}
  55 + bool readFdIntoBuffer();
  56 + void writeTest();
24 57
25 }; 58 };
26 59
main.cpp
@@ -16,9 +16,9 @@ @@ -16,9 +16,9 @@
16 16
17 17
18 18
19 -void do_thread_work(ThreadData &threadData) 19 +void do_thread_work(ThreadData *threadData)
20 { 20 {
21 - int epoll_fd = threadData.epollfd; 21 + int epoll_fd = threadData->epollfd;
22 22
23 struct epoll_event events[MAX_EVENTS]; 23 struct epoll_event events[MAX_EVENTS];
24 memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS); 24 memset(&events, 0, sizeof (struct epoll_event)*MAX_EVENTS);
@@ -34,11 +34,14 @@ void do_thread_work(ThreadData &amp;threadData) @@ -34,11 +34,14 @@ void do_thread_work(ThreadData &amp;threadData)
34 struct epoll_event cur_ev = events[i]; 34 struct epoll_event cur_ev = events[i];
35 int fd = cur_ev.data.fd; 35 int fd = cur_ev.data.fd;
36 36
37 - Client_p client = threadData.getClient(fd); 37 + Client_p client = threadData->getClient(fd);
38 38
39 if (client) // TODO: is this check necessary? 39 if (client) // TODO: is this check necessary?
40 { 40 {
41 - // TODO left here 41 + if (!client->readFdIntoBuffer())
  42 + threadData->removeClient(client);
  43 + client->writeTest();
  44 +
42 } 45 }
43 } 46 }
44 } 47 }
@@ -74,14 +77,14 @@ int main() @@ -74,14 +77,14 @@ int main()
74 ev.events = EPOLLIN; 77 ev.events = EPOLLIN;
75 check<std::runtime_error>(epoll_ctl(epoll_fd_accept, EPOLL_CTL_ADD, listen_fd, &ev) < 0); 78 check<std::runtime_error>(epoll_ctl(epoll_fd_accept, EPOLL_CTL_ADD, listen_fd, &ev) < 0);
76 79
77 - std::vector<ThreadData> threads; 80 + std::vector<std::shared_ptr<ThreadData>> threads;
78 81
79 for (int i = 0; i < NR_OF_THREADS; i++) 82 for (int i = 0; i < NR_OF_THREADS; i++)
80 { 83 {
81 - ThreadData t(i);  
82 - std::thread thread(do_thread_work, std::ref(t));  
83 - t.thread = std::move(thread);  
84 - threads.push_back(std::move(t)); 84 + std::shared_ptr<ThreadData> t(new ThreadData(i));
  85 + std::thread thread(do_thread_work, t.get());
  86 + t->thread = std::move(thread);
  87 + threads.push_back(t);
85 } 88 }
86 89
87 std::cout << "Listening..." << std::endl; 90 std::cout << "Listening..." << std::endl;
@@ -97,9 +100,9 @@ int main() @@ -97,9 +100,9 @@ int main()
97 int cur_fd = events[i].data.fd; 100 int cur_fd = events[i].data.fd;
98 if (cur_fd == listen_fd) 101 if (cur_fd == listen_fd)
99 { 102 {
100 - ThreadData &thread_data = threads[next_thread_index++ % NR_OF_THREADS]; 103 + std::shared_ptr<ThreadData> thread_data = threads[next_thread_index++ % NR_OF_THREADS];
101 104
102 - std::cout << "Accepting connection on thread " << thread_data.threadnr << std::endl; 105 + std::cout << "Accepting connection on thread " << thread_data->threadnr << std::endl;
103 106
104 struct sockaddr addr; 107 struct sockaddr addr;
105 memset(&addr, 0, sizeof(struct sockaddr)); 108 memset(&addr, 0, sizeof(struct sockaddr));
@@ -107,7 +110,7 @@ int main() @@ -107,7 +110,7 @@ int main()
107 int fd = check<std::runtime_error>(accept(cur_fd, &addr, &len)); 110 int fd = check<std::runtime_error>(accept(cur_fd, &addr, &len));
108 111
109 Client_p client(new Client(fd, thread_data)); 112 Client_p client(new Client(fd, thread_data));
110 - thread_data.giveClient(client); 113 + thread_data->giveClient(client);
111 } 114 }
112 else 115 else
113 { 116 {
threaddata.h
@@ -28,4 +28,6 @@ public: @@ -28,4 +28,6 @@ public:
28 void removeClient(Client_p client); 28 void removeClient(Client_p client);
29 }; 29 };
30 30
  31 +typedef std::shared_ptr<ThreadData> ThreadData_p;
  32 +
31 #endif // THREADDATA_H 33 #endif // THREADDATA_H