A high-performance, event-driven message broker implemented in C, following a publish/subscribe architecture. The system facilitates communication between UDP publishers and TCP subscribers through a central server that routes messages based on topic matching, including wildcard support.
The system comprises three components:
- Broker Server — accepts connections from both UDP publishers and TCP subscribers, maintains a client database, and routes messages to the appropriate subscribers.
- TCP Subscriber — connects to the broker, issues subscribe/unsubscribe commands, and receives messages for its registered topics.
- UDP Publisher — sends topic-tagged data payloads to the broker for distribution.
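TCP is a byte-stream protocol with no notion of message boundaries: a single application message may arrive split across several `recv` calls, or merged with the start of the next one. To address this, all TCP communication is framed by a custom application-layer protocol that ensures a complete message has been received before it is processed.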
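```c
#include <sys/socket.h>
#include <sys/types.h>

/* Read exactly len bytes into buf, looping over short reads.
 * Returns 0 on success, -1 on error or if the peer closes the connection. */
int recv_message(int sockfd, void *buf, int len) {
    int total = 0;
    while (total < len) {
        ssize_t r = recv(sockfd, (char *)buf + total, len - total, 0);
        if (r <= 0) return -1;
        total += (int)r;
    }
    return 0;
}

/* Write exactly len bytes from buf, looping over short writes.
 * Returns 0 on success, -1 on error. */
int send_message(int sockfd, const void *buf, int len) {
    int total = 0;
    while (total < len) {
        /* Cast before adding: arithmetic on void * is a GNU extension, not standard C. */
        ssize_t sent = send(sockfd, (const char *)buf + total, len - total, 0);
        if (sent <= 0) return -1;
        total += (int)sent;
    }
    return 0;
}
```

All messages conform to one of two fixed-size structures: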
```c
// Server -> TCP Subscriber
struct __attribute__((packed)) server_message {
    char udp_address[50];           // Source UDP client address (IP:port).
    char topic[MAX_TOPIC_LEN + 1];  // Message topic.
    uint8_t type;                   // Data type identifier.
    char data[1501];                // Payload data.
};

// TCP Subscriber -> Server
struct __attribute__((packed)) client_message {
    uint8_t type;                   // Command type: 0 = subscribe, 1 = unsubscribe, 2 = disconnect.
    char topic[MAX_TOPIC_LEN + 1];  // Target topic (ignored for disconnect).
};
```

The server monitors multiple file descriptors concurrently using epoll. The kernel maintains both the interest set and a list of ready descriptors, so the cost of each `epoll_wait` call scales with the number of descriptors that are ready rather than with the total number monitored. This keeps the server efficient with a large number of mostly idle connections, unlike poll, which has O(n) complexity because the full descriptor set must be scanned on every call.
The server monitors:
- The TCP listening socket, for incoming subscriber connections.
- The UDP socket, for incoming publisher messages.
- Standard input, for operator commands (currently: `exit`).
- Each connected TCP client socket, for subscribe/unsubscribe/disconnect commands.
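The monitoring set above could be registered and dispatched roughly as follows. This is only a sketch: `watch_fd`, `next_ready_fd`, `classify_fd` and the `fd_kind` enum are hypothetical names introduced for illustration, not taken from the project sources, and error handling is reduced to return codes.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical classification of a ready descriptor. */
enum fd_kind { FD_TCP_LISTENER, FD_UDP, FD_STDIN, FD_CLIENT };

/* Hypothetical helper: register fd for readability events.
 * Returns 0 on success, -1 on failure (as epoll_ctl does). */
int watch_fd(int epoll_fd, int fd) {
    assert(epoll_fd >= 0 && fd >= 0);
    struct epoll_event ev = {0};
    ev.events = EPOLLIN;
    ev.data.fd = fd;
    return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
}

/* Hypothetical helper: wait up to timeout_ms milliseconds and return the
 * first ready descriptor, or -1 on timeout or error. A real loop would
 * walk every returned event instead of only the first. */
int next_ready_fd(int epoll_fd, int timeout_ms) {
    struct epoll_event events[16];
    int n = epoll_wait(epoll_fd, events, 16, timeout_ms);
    return n > 0 ? events[0].data.fd : -1;
}

/* Decide which handler a ready descriptor belongs to. */
enum fd_kind classify_fd(int fd, int tcp_socket, int udp_socket) {
    if (fd == tcp_socket) return FD_TCP_LISTENER;
    if (fd == udp_socket) return FD_UDP;
    if (fd == STDIN_FILENO) return FD_STDIN;
    return FD_CLIENT;  /* anything else is an accepted subscriber socket */
}
```

In a loop built on these helpers, the listener branch would call `accept`, the UDP branch `recvfrom`, and the client branch `recv_message` with `sizeof(struct client_message)`.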
```c
struct __attribute__((packed)) server_status {
    int epoll_fd;                           // epoll instance file descriptor.
    int tcp_socket;                         // TCP listening socket.
    int udp_socket;                         // UDP socket.
    struct epoll_event ev;                  // Reusable event structure for epoll_ctl calls.
    struct epoll_event events[MAX_EVENTS];  // Buffer for events returned by epoll_wait.
};

struct __attribute__((packed)) server_database {
    struct client_t *clients;  // Dynamically allocated array of client records.
    int number_of_clients;     // Current number of tracked clients.
    int capacity;              // Current allocated capacity (doubles on overflow).
};
```

When a TCP subscriber connects, the server:
- Accepts the connection and reads the client's ID.
- Checks the database for an existing record with that ID:
  - If found and currently connected: rejects the duplicate connection and closes the socket.
  - If found and disconnected: restores the session (retaining subscriptions) with the new file descriptor.
  - If not found: creates a new client record and inserts it into the database.
- Adds the new socket to the epoll instance for monitoring.
Client records are never deleted — they persist in the database to support reconnection with state restoration. The database vector doubles in capacity when full, ensuring amortized O(1) insertion.
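The connection steps and the doubling strategy described above might look like the sketch below. The layout of `struct client_t`, the `MAX_ID_LEN` value, the initial capacity of 4, and the names `register_client` and `connect_result` are all assumptions made for illustration. The real record also holds the client's subscriptions, which are omitted here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#define MAX_ID_LEN 10  /* assumed limit, not taken from the project */

struct client_t {              /* hypothetical, reduced layout */
    char id[MAX_ID_LEN + 1];
    int fd;                    /* socket of the current session */
    bool connected;
};

struct server_database {
    struct client_t *clients;
    int number_of_clients;
    int capacity;
};

enum connect_result { CONNECT_NEW, CONNECT_RESTORED, CONNECT_REJECTED, CONNECT_ERROR };

enum connect_result register_client(struct server_database *db, const char *id, int fd) {
    assert(db != NULL && id != NULL);
    for (int i = 0; i < db->number_of_clients; i++) {
        struct client_t *c = &db->clients[i];
        if (strcmp(c->id, id) != 0) continue;
        if (c->connected) return CONNECT_REJECTED;  /* duplicate ID: caller closes fd */
        c->fd = fd;                                 /* reconnect: record is reused as-is */
        c->connected = true;
        return CONNECT_RESTORED;
    }
    if (db->number_of_clients == db->capacity) {    /* full: double the array */
        int new_cap = db->capacity ? db->capacity * 2 : 4;
        struct client_t *grown = realloc(db->clients, (size_t)new_cap * sizeof *grown);
        if (grown == NULL) return CONNECT_ERROR;
        db->clients = grown;
        db->capacity = new_cap;
    }
    struct client_t *c = &db->clients[db->number_of_clients++];
    strncpy(c->id, id, MAX_ID_LEN);
    c->id[MAX_ID_LEN] = '\0';
    c->fd = fd;
    c->connected = true;
    return CONNECT_NEW;
}
```

Because a disconnect only clears `connected` and never removes the record, the lookup loop is what makes session restoration possible. The linear scan is O(n) per connection, which is a simplification of this sketch.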
When a UDP datagram is received:
- The server constructs a `server_message` containing the topic, payload, data type, and source address.
- It iterates over all tracked clients and, for each client, checks whether any of their subscriptions match the incoming topic.
- All matching connected clients receive the message.
Topic matching is implemented using dynamic programming. Topics follow a hierarchical structure, with levels separated by /.
Example: precis/ec101/temperature has three levels: precis, ec101, temperature.
The boolean table `dp[i][j]` is defined as:

`dp[i][j]` is `true` if the first `i` levels of the client's subscription pattern match the first `j` levels of the incoming topic.
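| Condition | Value |
|---|---|
| `dp[0][0]` | `true`: an empty pattern matches an empty topic |
| `dp[0][j]`, `j > 0` | `false`: an empty pattern cannot match a non-empty topic |
| `dp[i][0]` | `true` if levels 1..i are all `*` |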
| Pattern Level | Rule |
|---|---|
| `*` | `dp[i][j] = dp[i-1][j] \|\| dp[i][j-1]` (matches zero or more levels) |
| `+` | `dp[i][j] = dp[i-1][j-1]` (matches exactly one level) |
| Literal | `dp[i][j] = dp[i-1][j-1] && (subscription[i-1] == topic[j-1])` |
The final result is read from dp[num_subscription_levels][num_topic_levels].
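The recurrence above can be sketched in C as follows. The function names `split_levels` and `topic_matches`, the `MAX_LEVELS` bound, and the `MAX_TOPIC_LEN` value of 50 are assumptions for illustration. Splitting with `strtok` also collapses empty levels (`a//b` is treated as `a/b`), which the project may handle differently.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_TOPIC_LEN 50  /* assumed value */
#define MAX_LEVELS 32     /* assumed upper bound on levels per topic */

/* Copy path into buf and split it on '/'. Returns the number of levels,
 * or -1 if the path is too long or has too many levels. */
static int split_levels(const char *path, char *buf, char *levels[MAX_LEVELS]) {
    if (strlen(path) > MAX_TOPIC_LEN) return -1;
    strcpy(buf, path);
    int n = 0;
    for (char *tok = strtok(buf, "/"); tok != NULL; tok = strtok(NULL, "/")) {
        if (n == MAX_LEVELS) return -1;
        levels[n++] = tok;
    }
    return n;
}

/* Returns true if the subscription pattern matches the topic. */
bool topic_matches(const char *pattern, const char *topic) {
    assert(pattern != NULL && topic != NULL);
    char pbuf[MAX_TOPIC_LEN + 1], tbuf[MAX_TOPIC_LEN + 1];
    char *p[MAX_LEVELS], *t[MAX_LEVELS];
    int np = split_levels(pattern, pbuf, p);
    int nt = split_levels(topic, tbuf, t);
    if (np < 0 || nt < 0) return false;

    bool dp[MAX_LEVELS + 1][MAX_LEVELS + 1] = {{false}};
    dp[0][0] = true;
    /* A leading run of '*' levels matches the empty topic. */
    for (int i = 1; i <= np && strcmp(p[i - 1], "*") == 0; i++)
        dp[i][0] = true;

    for (int i = 1; i <= np; i++) {
        for (int j = 1; j <= nt; j++) {
            if (strcmp(p[i - 1], "*") == 0)
                dp[i][j] = dp[i - 1][j] || dp[i][j - 1];  /* zero levels, or one more */
            else if (strcmp(p[i - 1], "+") == 0)
                dp[i][j] = dp[i - 1][j - 1];              /* exactly one level */
            else
                dp[i][j] = dp[i - 1][j - 1] && strcmp(p[i - 1], t[j - 1]) == 0;
        }
    }
    return dp[np][nt];
}
```

With these rules, `precis/+/temperature` and `precis/*` both match `precis/ec101/temperature`, while `precis/+` does not, because `+` consumes exactly one level.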
The subscriber uses epoll to monitor two file descriptors simultaneously:
- The TCP socket connected to the broker.
- Standard input for user commands.
| Command | Description |
|---|---|
| `subscribe <topic>` | Registers the client for the specified topic on the server. |
| `unsubscribe <topic>` | Deregisters the client from the specified topic on the server. |
| `exit` | Sends a disconnect notification to the server and closes the connection. |
On receiving a message from the server, the client decodes and prints it to standard output.
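One way the subscriber could turn a line of user input into a `client_message` is sketched below. The `parse_command` function, the `CMD_*` constants, and the `MAX_TOPIC_LEN` value of 50 are assumptions for illustration. Only the struct layout and the numeric command types come from the definitions earlier in this document.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define MAX_TOPIC_LEN 50  /* assumed value; must agree with the %50s width below */

struct __attribute__((packed)) client_message {
    uint8_t type;                   /* 0 = subscribe, 1 = unsubscribe, 2 = disconnect */
    char topic[MAX_TOPIC_LEN + 1];
};

enum { CMD_SUBSCRIBE = 0, CMD_UNSUBSCRIBE = 1, CMD_DISCONNECT = 2 };

/* Hypothetical parser: fills *out from one input line.
 * Returns 0 on success, -1 for an unknown or malformed command.
 * Topics longer than MAX_TOPIC_LEN are silently truncated in this sketch. */
int parse_command(const char *line, struct client_message *out) {
    assert(line != NULL && out != NULL);
    char verb[16] = "";
    char topic[MAX_TOPIC_LEN + 1] = "";
    int fields = sscanf(line, "%15s %50s", verb, topic);

    memset(out, 0, sizeof *out);
    if (fields >= 1 && strcmp(verb, "exit") == 0) {
        out->type = CMD_DISCONNECT;  /* topic is ignored for disconnect */
        return 0;
    }
    if (fields != 2) return -1;      /* subscribe/unsubscribe need a topic */

    if (strcmp(verb, "subscribe") == 0)
        out->type = CMD_SUBSCRIBE;
    else if (strcmp(verb, "unsubscribe") == 0)
        out->type = CMD_UNSUBSCRIBE;
    else
        return -1;

    strncpy(out->topic, topic, MAX_TOPIC_LEN);
    out->topic[MAX_TOPIC_LEN] = '\0';
    return 0;
}
```

A successfully parsed message would then be passed to `send_message` with `sizeof(struct client_message)` as the length, so the server always reads a fixed-size frame.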
- Nagle's algorithm is disabled (`TCP_NODELAY`) on all TCP sockets to minimise latency for small, frequent messages.
- Buffer overflows are prevented throughout by using `strncpy` with explicit size bounds, followed by manual null-termination of all string buffers.
- Standard input is set to non-blocking mode on both the server and client to integrate cleanly with the epoll event loop without stalling.
- All header file documentation follows Doxygen conventions.
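The first three points can be illustrated with the small helpers below. The names `disable_nagle`, `set_nonblocking` and `copy_bounded` are hypothetical and the project may organise this differently. The system calls themselves (`setsockopt` with `TCP_NODELAY`, `fcntl` with `O_NONBLOCK`) are standard.

```c
#include <assert.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>
#include <sys/socket.h>

/* Disable Nagle's algorithm so small messages are sent immediately.
 * Returns 0 on success, -1 on failure. */
int disable_nagle(int sockfd) {
    int on = 1;
    return setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
}

/* Put a descriptor (for example STDIN_FILENO) into non-blocking mode,
 * preserving its other status flags. Returns 0 on success, -1 on failure. */
int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* Bounded copy: strncpy does not terminate dst when src is too long,
 * so the final byte is always set explicitly. */
void copy_bounded(char *dst, size_t dst_size, const char *src) {
    assert(dst != NULL && src != NULL && dst_size > 0);
    strncpy(dst, src, dst_size - 1);
    dst[dst_size - 1] = '\0';
}
```

The explicit terminator in `copy_bounded` matters because `strncpy` leaves the destination unterminated whenever the source is at least as long as the bound.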
To run the server and a subscriber:

```sh
./server <port>
./subscriber <client_id> <server_ip> <server_port>
```

Requirements:

- Linux kernel ≥ 2.5.44 (epoll support required)
- POSIX-compliant C standard library
- GCC or compatible C compiler