-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver_poll.cpp
More file actions
163 lines (129 loc) · 5.31 KB
/
server_poll.cpp
File metadata and controls
163 lines (129 loc) · 5.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#include <iostream>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <thread>
#include <cstring>
#include <sys/select.h>
#include <poll.h>
#include <unordered_set>
#include <vector>
const int CLIENT_LIMIT = 100;
const size_t MAX_BUFFER_SIZE = 4096;
char read_buffer[MAX_BUFFER_SIZE];
char write_buffer[MAX_BUFFER_SIZE];
std::vector<pollfd> v_fds;
int server_socket = -1;
std::queue<std::pair<std::string,int>> message_queue;
std::mutex message_queue_mtx;
std::condition_variable message_notifier;
void server_sender(){
std::unique_lock<std::mutex> queue_lock(message_queue_mtx, std::defer_lock);
while(1){
queue_lock.lock();
if (!message_queue.empty()){
//Since we have a single thread that pops, its safe to assume if we are non empty here, we would be non-empty throughout this thread before we perform pop()
std::string message;
if (message_queue.front().second == 0){
message = "Server: ";
}
message += std::move(message_queue.front().first);
message_queue.pop();
queue_lock.unlock();
for (auto fd: v_fds){
if (fd.events == 0) continue;
if (fd.fd == server_socket || fd.fd == STDIN_FILENO) continue;
send(fd.fd, message.data(), message.size(),0);
}
}else{
message_notifier.wait(queue_lock);
//I'll have the lock aquired again, unlock it
queue_lock.unlock();
}
}
}
int main(){
std::unique_lock<std::mutex> queue_lock(message_queue_mtx, std::defer_lock);
server_socket = socket(AF_INET, SOCK_STREAM, 0);
if (server_socket < 0){
perror("socket failed");
return -1;
}
v_fds.push_back({server_socket,POLLIN, 0});
v_fds.push_back({STDIN_FILENO,POLLIN, 0});
sockaddr_in server_address;
server_address.sin_family = AF_INET;
server_address.sin_port = htons(1100);
if (inet_pton(AF_INET, "0.0.0.0", &server_address.sin_addr) <=0 ){
perror("Failed to convert the given address to a usable binary format");
close(server_socket);
return 0;
}
size_t server_address_size = sizeof(server_address);
//Bind the Socket to the given port
if (bind(server_socket, reinterpret_cast<const sockaddr*>(&server_address), server_address_size) < 0){
perror("Failed to bind the given IP and Port to the server socket");
close(server_socket);
return 0;
}
if (listen(server_socket,CLIENT_LIMIT) < 0){
perror("Failed to listen from the server socket");
close(server_socket);
return 0;
}
std::cout<<"Server Started Listening to Incoming Connections in 0.0.0.0::1100 \n";
//Good Time To Get our server_worker to work
std::thread message_broadcaster(server_sender);
message_broadcaster.detach();
while(1){
if (poll(v_fds.data(), v_fds.size(), 0) < 0){
perror("Select Failed, shutting down the server! \n");
return -1;
}
for (auto &fd_poll_struct: v_fds){
int fd = fd_poll_struct.fd;
if (fd_poll_struct.revents & POLLIN){
if (fd == STDIN_FILENO){
ssize_t bytes_read = read(STDIN_FILENO, write_buffer, MAX_BUFFER_SIZE);
std::string server_message(write_buffer,bytes_read);
//Broadcast this message to all clients
queue_lock.lock();
message_queue.push({server_message,0});
queue_lock.unlock();
message_notifier.notify_all();
continue;
}else if(fd == server_socket){
// Ready for Accepting a Connection!
int client_fd = accept(server_socket, nullptr, nullptr); //Make this Asynchronous!
if (client_fd < 0){
std::cout<<"Failed to Accept a Connection.. Client might have terminated the connection \n";
continue;
}
std::cout<<"Connected to a New Client \n";
v_fds.push_back({client_fd,POLLIN,0});
}else{
size_t bytes_received = read(fd, read_buffer, 1024);
if (bytes_received <= 0){
fd_poll_struct.events = 0;
std::cout<<"Connection Closed by Client";
continue;
}
std::string client_message(read_buffer,bytes_received);
if (client_message == "end"){
//Remove the client from the FD_SET and SET of clients
fd_poll_struct.events = 0;
std::cout<<"Connection Closed by Client";
continue;
}
//Broadcast this message to all clients
queue_lock.lock();
message_queue.push({client_message,1});
queue_lock.unlock();
message_notifier.notify_all();
std::cout<<client_message<<"\n";
}
}
}
}
close(server_socket);
}