Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ The challenges will be released weekly and come in different formats: head-to-he

## 🆕 Latest Challenge

**[Week 22 - Ye Ole PubSub Pub](challenges/week22_the_ole_pubsub_pub)**
**[Week 23 - The Queue Cafe](challenges/week23_cafe_queue)**

#### ✅ Completed Challenges

| | | | |
|---|---|---|---|
| Week21 -> [Dennis Ritchie](challenges/week21_dennis_ritchie) | | | |
| Week20 -> [FN(solid)](challenges/week20_fn_solid) | Week19 -> [Shunting Yard](challenges/week19_shunting_yard_algo) | Week18 -> [Pascals Pascal Triangle](challenges/week18_pascals_pascal_triangle) | Week17 -> [Page Peelers](challenges/week17_page_peelers) |
| Week16 -> [Pen and Paper](challenges/week16_pen_and_paper) | Week15 -> [Terminal Exorcism](challenges/week15_terminal_exorcism) | Week14 -> [Now gimme my money](challenges/week14_now_gimme_my_money) | Week13 -> [The Greatest Programming Book Written. Ever.](challenges/week13_greatest_programming_book_ever_written) |
| Week12 -> [Access Granted](challenges/week12_access_granted) | Week11 -> [We're in!](challenges/week11_we_are_in) | Week10 -> [XOR FTW](challenges/week10_xor_ftw) | Week9 -> [We Need To Go Deeper](challenges/week9_inception) |
| Week8 -> [Red Solo Cups](challenges/week8_red_solo_cups) | Week7 -> [A Variable Says What?](challenges/week7_a_variable_says_what) | Week6 -> [Stay Solid Friends](challenges/week6_stay_solid_friends) | Week5 -> [Swappin Aint Easy](challenges/week5_swappin_aint_easy) |
| Week4 -> [Wilt Chamberlain's 100 Point Game](challenges/week4_wiltchamberlains_100_point_game) | Week3 -> [This Math Aint Math'n](challenges/week3_this_math_aint_mathin) | Week2 -> [Join The Conga Line](challenges/week2_the_conga_line) | Week1 -> [Gossip Spreads](challenges/week1_gossip_spreads) |
| [#22 - Ye Ole PubSub Pub](challenges/week22_the_ole_pubsub_pub) | [#21 - Dennis Ritchie](challenges/week21_dennis_ritchie) | | |
| [#20 - FN(solid)](challenges/week20_fn_solid) | [#19 - Shunting Yard](challenges/week19_shunting_yard_algo) | [#18 - Pascals Pascal Triangle](challenges/week18_pascals_pascal_triangle) | [#17 - Page Peelers](challenges/week17_page_peelers) |
| [#16 - Pen and Paper](challenges/week16_pen_and_paper) | [#15 - Terminal Exorcism](challenges/week15_terminal_exorcism) | [#14 - Now gimme my money](challenges/week14_now_gimme_my_money) | [#13 - The Greatest Programming Book Written. Ever.](challenges/week13_greatest_programming_book_ever_written) |
| [#12 - Access Granted](challenges/week12_access_granted) | [#11 - We're in!](challenges/week11_we_are_in) | [#10 - XOR FTW](challenges/week10_xor_ftw) | [#9 - We Need To Go Deeper](challenges/week9_inception) |
| [#8 - Red Solo Cups](challenges/week8_red_solo_cups) | [#7 - A Variable Says What?](challenges/week7_a_variable_says_what) | [#6 - Stay Solid Friends](challenges/week6_stay_solid_friends) | [#5 - Swappin Aint Easy](challenges/week5_swappin_aint_easy) |
| [#4 - Wilt Chamberlain's 100 Point Game](challenges/week4_wiltchamberlains_100_point_game) | [#3 - This Math Aint Math'n](challenges/week3_this_math_aint_mathin) | [#2 - Join The Conga Line](challenges/week2_the_conga_line) | [#1 - Gossip Spreads](challenges/week1_gossip_spreads) |

---

Expand Down
58 changes: 58 additions & 0 deletions challenges/week23_cafe_queue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Week 23: The "Queue Cafe"

Two weeks ago we revisited some easy yet historically significant functions in C (shoutout to Dennis Ritchie). One of them —
```c
while ((c = getchar()) != EOF) putchar(c);
```
---

## 📝 Background

* In a Pub/Sub model:
* Messages are broadcast to all subscribers.
* Every subscriber sees every message.

* In a Queue model:
* Messages are consumed by exactly one subscriber.
* Multiple subscribers may exist, but each message should only be delivered once.

Your job is to make that change.

---

## 🎯 The Challenge

Modify the code so that:

1) Single consumer per message
* When a publisher sends a message, only one subscriber should receive it.
* If multiple subscribers exist, they should fairly share the work (round-robin or first-come-first-served).

---

## ✅ Example

With 2 subscribers and 4 published messages:

* Pub/Sub (current)
* Sub A receives: 1,2,3,4
* Sub B receives: 1,2,3,4
* Queue (goal)
* Sub A receives: 1,3
* Sub B receives: 2,4

🔥 Bonus Ideas

1) Persistence (optional extension)
* Store published messages in a file so the queue can survive restarts.
* Use the line number (or file offset) to track consumer progress.
2) Offset management (optional extension)
* Each subscriber should keep track of where it is in the queue.
* On reconnect, a subscriber should resume from its last offset.

---

## 🛠 Sample Implementations
Inside the `solutions/` directory:

[![C](https://img.shields.io/badge/C-17-blue?logo=c)](solutions)
102 changes: 102 additions & 0 deletions challenges/week23_cafe_queue/solutions/broker.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// broker.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/select.h>

#define PORT 9000
#define MAX_CLIENTS 10
#define LOG "stream.txt"

/**
* Working off the C solution in week1's challenge this function was added
* to append messages to a log for streaming/persistance
*/
int writeToFile(const char *filename, const char *content) {
FILE *filePointer;
filePointer = fopen(filename, "a");

if (filePointer == NULL) {
perror("Error opening file");
return -1;
}

fprintf(filePointer, "%s", content);
fclose(filePointer);
return 0;
}

int main() {
int server_fd, new_socket, client_socks[MAX_CLIENTS];
struct sockaddr_in address;
fd_set readfds;
char buffer[1024];
int max_sd, sd, activity, valread, addrlen = sizeof(address);

// init client sockets list
for (int i = 0; i < MAX_CLIENTS; i++) client_socks[i] = 0;

server_fd = socket(AF_INET, SOCK_STREAM, 0);
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);

bind(server_fd, (struct sockaddr*)&address, sizeof(address));
listen(server_fd, 3);

printf("Broker listening on port %d\n", PORT);

int current_worker = 0;
while (1) {
FD_ZERO(&readfds);
FD_SET(server_fd, &readfds);
max_sd = server_fd;

for (int i = 0; i < MAX_CLIENTS; i++) {
sd = client_socks[i];
if (sd > 0) FD_SET(sd, &readfds);
if (sd > max_sd) max_sd = sd;
}

activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);

if (FD_ISSET(server_fd, &readfds)) {
new_socket = accept(server_fd, (struct sockaddr*)&address, (socklen_t*)&addrlen);
for (int i = 0; i < MAX_CLIENTS; i++) {
if (client_socks[i] == 0) {
client_socks[i] = new_socket;
break;
}
}
printf("New connection\n");
}

for (int i = 0; i < MAX_CLIENTS; i++) {
sd = client_socks[i];
if (FD_ISSET(sd, &readfds)) {
valread = read(sd, buffer, sizeof(buffer));
if (valread <= 0) {
close(sd);
client_socks[i] = 0;
} else {
buffer[valread] = '\0';
// This is the main change ... instead of fan out, send to 1 worker
int attempts = 0; // gotta send to someone
while ( attempts < MAX_CLIENTS){
if (client_socks[current_worker] > 0 && client_socks[current_worker] != sd) {
send(client_socks[current_worker], buffer, valread, 0);
writeToFile(LOG, buffer);
break;
}
current_worker = (current_worker + 1) % MAX_CLIENTS;
attempts++;
}
current_worker = (current_worker + 1) % MAX_CLIENTS;
}
}
}
}
return 0;
}
42 changes: 42 additions & 0 deletions challenges/week23_cafe_queue/solutions/pub.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// producer.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>

#define PORT 9000

int main() {
int sock;
struct sockaddr_in server_addr;
char buffer[1024];

// create socket
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket failed");
exit(1);
}

server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(PORT);
server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

// connect to broker
if (connect(sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
perror("connect failed");
exit(1);
}

printf("Connected to broker at 127.0.0.1:%d\n", PORT);
printf("Type messages, Ctrl+D to quit\n");

// read stdin, send lines
while (fgets(buffer, sizeof(buffer), stdin) != NULL) {
send(sock, buffer, strlen(buffer), 0);
}

close(sock);
return 0;
}

68 changes: 68 additions & 0 deletions challenges/week23_cafe_queue/solutions/sub.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// consumer.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>

#define PORT 9000
#define LOG "stream.txt"

int replayPastMessages(const char *filename) {
FILE *filePointer;
char buffer[256];
filePointer = fopen(filename, "r");

if (filePointer == NULL) {
perror("Error opening file");
return -1;
}

while (fgets(buffer, sizeof(buffer), filePointer) != NULL) {
printf("%s", buffer);
}

fclose(filePointer);
}

int main() {
int sock;
struct sockaddr_in server_addr;
char buffer[1024];
int valread;

// create socket
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket failed");
exit(1);
}

server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(PORT);
server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

// connect to broker
if (connect(sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
perror("connect failed");
exit(1);
}

printf("Connected to broker at 127.0.0.1:%d\n", PORT);


printf("Replaying past messages...\n");
replayPastMessages(LOG);

printf("Waiting for messages...\n");

// read from broker until connection closes
while ((valread = read(sock, buffer, sizeof(buffer)-1)) > 0) {
buffer[valread] = '\0';
printf("Message: %s", buffer);
fflush(stdout);
}

printf("\nDisconnected from broker\n");
close(sock);
return 0;
}