Skip to content

Cocos44/Broker-Server

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

16 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Message Broker Server

A high-performance, event-driven message broker implemented in C, following a publish/subscribe architecture. The system facilitates communication between UDP publishers and TCP subscribers through a central server that routes messages based on topic matching, including wildcard support.


Architecture Overview

The system comprises three components:

  • Broker Server — accepts connections from both UDP publishers and TCP subscribers, maintains a client database, and routes messages to the appropriate subscribers.
  • TCP Subscriber — connects to the broker, issues subscribe/unsubscribe commands, and receives messages for its registered topics.
  • UDP Publisher — sends topic-tagged data payloads to the broker for distribution.

Protocol Design

Application Layer

TCP is a stream-based protocol and may fragment packets into multiple segments. To address this, all TCP communication is framed using a custom application layer that guarantees complete message delivery before processing.

int recv_message(int sockfd, void *buf, int len) {
    int total = 0;
    while (total < len) {
        int r = recv(sockfd, (char *)buf + total, len - total, 0);
        if (r <= 0) return -1;
        total += r;
    }
    return 0;
}

int send_message(int sockfd, void *buf, int len) {
    int total = 0;
    while (total < len) {
        int sent = send(sockfd, buf + total, len - total, 0);
        if (sent <= 0) return -1;
        total += sent;
    }
    return 0;
}

All messages conform to one of two fixed-size structures:

// Server -> TCP Subscriber
struct __attribute__((packed)) server_message {
    char udp_address[50];          // Source UDP client address (IP:port).
    char topic[MAX_TOPIC_LEN + 1]; // Message topic.
    uint8_t type;                  // Data type identifier.
    char data[1501];               // Payload data.
};

// TCP Subscriber -> Server
struct __attribute__((packed)) client_message {
    uint8_t type;                  // Command type: 0 = subscribe, 1 = unsubscribe, 2 = disconnect.
    char topic[MAX_TOPIC_LEN + 1]; // Target topic (ignored for disconnect).
};

Server

I/O Multiplexing with epoll

The server monitors multiple file descriptors concurrently using epoll, which provides O(1) event detection via an efficient kernel-managed data structure. This makes the server suitable for handling a large number of simultaneous connections without performance degradation — unlike poll, which has O(n) complexity as it must iterate over all monitored descriptors on each call.

The server monitors:

  • The TCP listening socket — for incoming subscriber connections.
  • The UDP socket — for incoming publisher messages.
  • Standard input — for operator commands (currently: exit).
  • Each connected TCP client socket — for subscribe/unsubscribe/disconnect commands.

Core Data Structures

struct __attribute__((packed)) server_status {
    int epoll_fd;                      // epoll instance file descriptor.
    int tcp_socket;                    // TCP listening socket.
    int udp_socket;                    // UDP socket.
    struct epoll_event ev;             // Reusable event structure for epoll_ctl calls.
    struct epoll_event events[MAX_EVENTS]; // Buffer for events returned by epoll_wait.
};

struct __attribute__((packed)) server_database {
    struct client_t *clients;          // Dynamically allocated array of client records.
    int number_of_clients;             // Current number of tracked clients.
    int capacity;                      // Current allocated capacity (doubles on overflow).
};

Client Lifecycle

When a TCP subscriber connects, the server:

  1. Accepts the connection and reads the client's ID.
  2. Checks the database for an existing record with that ID:
    • If found and currently connected: rejects the duplicate connection and closes the socket.
    • If found and disconnected: restores the session (retaining subscriptions) with the new file descriptor.
    • If not found: creates a new client record and inserts it into the database.
  3. Adds the new socket to the epoll instance for monitoring.

Client records are never deleted — they persist in the database to support reconnection with state restoration. The database vector doubles in capacity when full, ensuring amortized O(1) insertion.

UDP Message Routing

When a UDP datagram is received:

  1. The server constructs a server_message containing the topic, payload, data type, and source address.
  2. It iterates over all tracked clients and, for each client, checks whether any of their subscriptions match the incoming topic.
  3. All matching connected clients receive the message.

Topic Matching and Wildcard Support

Topic matching is implemented using dynamic programming. Topics follow a hierarchical structure, with levels separated by /.

Example: precis/ec101/temperature has three levels: precis, ec101, temperature.

The boolean table dp[i][j] is defined as:

dp[i][j] is true if the first i levels of the client's subscription pattern match the first j levels of the incoming topic.

Base Cases

Condition Value
dp[0][0] true — empty pattern matches empty topic
dp[i][0] true if levels 1..i are all *

Transition Rules

Pattern Level Rule
* dp[i][j] = dp[i-1][j] || dp[i][j-1] — matches zero or more levels
+ dp[i][j] = dp[i-1][j-1] — matches exactly one level
Literal dp[i][j] = dp[i-1][j-1] && (subscription[i-1] == topic[j-1])

The final result is read from dp[num_subscription_levels][num_topic_levels].


Subscriber Client

The subscriber uses epoll to monitor two file descriptors simultaneously:

  • The TCP socket connected to the broker.
  • Standard input for user commands.

Supported Commands

Command Description
subscribe <topic> Registers the client for the specified topic on the server.
unsubscribe <topic> Deregisters the client from the specified topic on the server.
exit Sends a disconnect notification to the server and closes the connection.

On receiving a message from the server, the client decodes and prints it to standard output.


Implementation Notes

  • Nagle's Algorithm is disabled (TCP_NODELAY) on all TCP sockets to minimise latency for small, frequent messages.
  • Buffer overflows are prevented throughout by using strncpy with explicit size bounds, followed by manual null-termination of all string buffers.
  • Standard input is set to non-blocking mode on both the server and client to integrate cleanly with the epoll event loop without stalling.
  • All header file documentation follows Doxygen conventions.

Building and Running

Server

./server <port>

Subscriber

./subscriber <client_id> <server_ip> <server_port>

Dependencies

  • Linux kernel ≥ 2.5.44 (epoll support required)
  • POSIX-compliant C standard library
  • GCC or compatible C compiler

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors