From 10270cc1e38ed86a2898c8f77e6ee8bab52bfce9 Mon Sep 17 00:00:00 2001 From: Isaac T Date: Fri, 24 Apr 2026 03:24:17 -0400 Subject: [PATCH 1/6] fix: drain gRPC receive stream in multi-publish Adds a background goroutine to continuously consume messages from the gRPC receive stream. This prevents flow control buffers from filling up on the server side, ensuring that pushed traces or responses do not eventually block the publisher's Send operations. --- grpc_p2p_client/cmd/multi-publish/main.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/grpc_p2p_client/cmd/multi-publish/main.go b/grpc_p2p_client/cmd/multi-publish/main.go index f59e048..ab895fe 100644 --- a/grpc_p2p_client/cmd/multi-publish/main.go +++ b/grpc_p2p_client/cmd/multi-publish/main.go @@ -130,6 +130,18 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data return fmt.Errorf("[%s] ListenCommands failed: %w", ip, err) } + // Drain stream.Recv() to prevent gRPC flow control buffers from filling up. + // If we don't drain this, any pushed traces/responses from the server + // will eventually block the server's stream sending, which in turn blocks us. + go func() { + for { + _, err := stream.Recv() + if err != nil { + return + } + } + }() + println(fmt.Sprintf("Connected to node at: %s…", ip)) for i := 0; i < *count; i++ { From 0190473fc5dfd5e1a53319520087d399688e9e4c Mon Sep 17 00:00:00 2001 From: Isaac T Date: Mon, 27 Apr 2026 11:11:19 -0400 Subject: [PATCH 2/6] feat: add batch support for controlled message bursts --- grpc_p2p_client/cmd/multi-publish/main.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/grpc_p2p_client/cmd/multi-publish/main.go b/grpc_p2p_client/cmd/multi-publish/main.go index ab895fe..e069eac 100644 --- a/grpc_p2p_client/cmd/multi-publish/main.go +++ b/grpc_p2p_client/cmd/multi-publish/main.go @@ -33,6 +33,7 @@ var ( startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0") endIdx = flag.Int("end-index", 10000, "index-1") output = flag.String("output", "", "file to write the outgoing data hashes") + batch = flag.Int("batch", 1, "number of messages to send per batch before sleeping") ) func main() { @@ -178,13 +179,15 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data } fmt.Printf("[%s] published %d bytes to %q (took %v)\n", ip, len(data), *topic, elapsed) - if *poisson { - lambda := 1.0 / (*sleep).Seconds() - interval := mathrand.ExpFloat64() / lambda - waitTime := time.Duration(interval * float64(time.Second)) - time.Sleep(waitTime) - } else { - time.Sleep(*sleep) + if (i+1)%(*batch) == 0 || i == *count-1 { + if *poisson { + lambda := 1.0 / (*sleep).Seconds() + interval := mathrand.ExpFloat64() / lambda + waitTime := time.Duration(interval * float64(time.Second)) + time.Sleep(waitTime) + } else { + time.Sleep(*sleep) + } } } From 022d783f91b5d010b5313762ff16a0e0552e601c Mon Sep 17 00:00:00 2001 From: Isaac T Date: Tue, 28 Apr 2026 16:49:26 -0400 Subject: [PATCH 3/6] feat: add script to analyze p50/95/99 latency --- grpc_p2p_client/analyze_latency.py | 196 +++++++++++++++++++++++++++++ 1 file changed, 196 insertions(+) create mode 100644 grpc_p2p_client/analyze_latency.py diff --git a/grpc_p2p_client/analyze_latency.py b/grpc_p2p_client/analyze_latency.py new file mode 100644 index 0000000..6b3b753 --- /dev/null +++ b/grpc_p2p_client/analyze_latency.py @@ -0,0 +1,196 @@ +import sys +import argparse +import os +import shutil +import re +from datetime import datetime +from 
collections import defaultdict + +def get_latest_file_by_mtime(directory, base_filename): + """Find the most recently modified file in a directory matching the base pattern.""" + if not os.path.exists(directory): + return None + + filename_part, ext = os.path.splitext(base_filename) + # Match: filename-DATETIME.ext (YYYYMMDD_HHMMSS) + pattern = re.compile(rf"^{re.escape(filename_part)}-(\d{{8}}_\d{{6}}){re.escape(ext)}$") + + latest_file = None + latest_mtime = -1 + + for f in os.listdir(directory): + if pattern.match(f): + full_path = os.path.join(directory, f) + mtime = os.path.getmtime(full_path) + if mtime > latest_mtime: + latest_mtime = mtime + latest_file = full_path + + return latest_file + +def percentile(data, p): + if not data: + return 0.0 + data = sorted(data) + n = len(data) + + # Find the "ideal fractional index" for the target percentile in the array + k = (n - 1) * (p / 100.0) + f = int(k) + c = int(k) + 1 if int(k) + 1 < n else f + + # If the calculated index is exactly an integer, simply return the value at that index + if f == c: + return data[f] + + # 5. Linear Interpolation + # If k is calculated as 4.3 (falling between index 4 and index 5) + # Use weighted average: (value at index 4 with 70% weight) + (value at index 5 with 30% weight) + d0 = data[f] * (c - k) + d1 = data[c] * (k - f) + return d0 + d1 + +def main(): + parser = argparse.ArgumentParser(description="Calculate P2P Propagation Latency") + parser.add_argument("--file", default="incoming-trace.tsv", help="Path to trace file") + parser.add_argument("--skip", type=int, default=5, help="Number of initial messages to skip (warm-up)") + parser.add_argument("--msg-size", type=str, help="Message size for archiving (e.g., 900)") + args = parser.parse_args() + + target_file = args.file + + # --- Archiving Logic --- + if args.msg_size: + archive_dir = os.path.join("analyze", f"msg_size_{args.msg_size}") + + # Scenario A: Local file exists -> Archive it with timestamp + if os.path.exists(args.file): + if not os.path.exists(archive_dir): + os.makedirs(archive_dir) + print(f"Created archive directory: {archive_dir}") + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename, ext = os.path.splitext(os.path.basename(args.file)) + new_path = os.path.join(archive_dir, f"{filename}-{timestamp}{ext}") + + # Move file + shutil.move(args.file, new_path) + print(f"Archived '{args.file}' to '{new_path}'") + target_file = new_path + + # Scenario B: Local file missing -> Look for latest by mtime in archive + else: + latest = get_latest_file_by_mtime(archive_dir, os.path.basename(args.file)) + if latest: + print(f"Local file '{args.file}' not found. 
Using most recent archived file: {latest}")
+                target_file = latest
+            else:
+                print(f"Error: Local file '{args.file}' not found and no archived files in '{archive_dir}'.")
+                sys.exit(1)
+    # ----------------
+
+    if not os.path.exists(target_file):
+        print(f"Error: File '{target_file}' not found.")
+        sys.exit(1)
+
+    print(f"Reading data from {target_file}...\n")
+
+    # Dictionary to group timestamps by MSG_ID
+    # msg_id -> list of timestamps
+    msg_timestamps = defaultdict(list)
+
+    try:
+        with open(target_file, 'r') as f:
+            for line in f:
+                line = line.strip()
+                if not line:
+                    continue
+
+                parts = line.split('\t')
+                if len(parts) >= 6:
+                    event = parts[0]
+                    # We only care about DELIVER_MESSAGE
+                    if event == "DELIVER_MESSAGE":
+                        msg_id = parts[3]
+                        try:
+                            # timestamp is the 6th column (index 5)
+                            timestamp = int(parts[5])
+                            msg_timestamps[msg_id].append(timestamp)
+                        except ValueError:
+                            pass
+    except Exception as e:
+        print(f"Error reading file: {e}")
+        return
+
+    if not msg_timestamps:
+        print("No DELIVER_MESSAGE events found (or file is empty/invalid).")
+        return
+
+    # Calculate statistics for each MsgID
+    results = []
+    for msg_id, t_list in msg_timestamps.items():
+        if not t_list:
+            continue
+
+        t_start = min(t_list)
+        t_end = max(t_list)
+        latency_ns = t_end - t_start
+        latency_ms = latency_ns / 1_000_000.0
+        node_count = len(t_list)  # Number of delivery events observed for this message
+
+        results.append({
+            "msg_id": msg_id,
+            "t_start": t_start,
+            "latency_ms": latency_ms,
+            "node_count": node_count
+        })
+
+    # Sort by t_start (chronological order)
+    results = sorted(results, key=lambda x: x["t_start"])
+    total_msgs = len(results)
+
+    print(f"Discovered a total of {total_msgs} distinct test messages.")
+
+    # Skip warm-up messages
+    if total_msgs > args.skip:
+        valid_msgs = results[args.skip:]
+        print(f"Discarding the first {args.skip} warm-up messages. Using the remaining {len(valid_msgs)} messages for statistical analysis.\n")
+    else:
+        print(f"Warning: Total messages ({total_msgs}) is less than or equal to the number of skipped warm-up messages ({args.skip}). Using all messages.")
+        valid_msgs = results
+
+    # Calculate overall P2P latency statistics (Temporal Distribution)
+    if not valid_msgs:
+        return
+
+    latencies = [x["latency_ms"] for x in valid_msgs]
+
+    mean_latency = sum(latencies) / len(latencies)
+    max_latency = max(latencies)
+    p50 = percentile(latencies, 50)
+    p90 = percentile(latencies, 90)
+    p95 = percentile(latencies, 95)
+    p99 = percentile(latencies, 99)
+
+    print("\n=== P2P Network Propagation Latency Summary ===")
+    print(f"Sample Size (after filter) : {len(latencies)} messages")
+    print(f"Mean Latency               : {mean_latency:.2f} ms")
+    print(f"P50 Latency (Median)       : {p50:.2f} ms")
+    print(f"P90 Latency                : {p90:.2f} ms")
+    print(f"P95 Latency                : {p95:.2f} ms")
+    print(f"P99 Latency                : {p99:.2f} ms")
+    print(f"Maximum Latency            : {max_latency:.2f} ms")
+    print("===============================================")
+
+    # Display top 5 outliers
+    print("\n--- Top 5 Outlier Messages (Highest Latency) ---")
+    # Sort messages by latency descending
+    outliers = sorted(valid_msgs, key=lambda x: x['latency_ms'], reverse=True)[:5]
+    for i, m in enumerate(outliers):
+        print(f"{i+1}. 
Latency: {m['latency_ms']:.2f} ms | MsgID: {m['msg_id']}") + +if __name__ == "__main__": + main() + +# run it with `python3 analyze_latency.py --file incoming-trace.tsv --skip 5` \ No newline at end of file From 787e1b6464bc3e7687f5b772bcb10b1ce5c424f3 Mon Sep 17 00:00:00 2001 From: Isaac T Date: Wed, 6 May 2026 14:31:45 -0400 Subject: [PATCH 4/6] feat(p2p): containerize load test for kubernetes cronjob - Add Dockerfile with multi-stage build to compile Go binaries for Linux - Create entrypoint.sh driven by environment variables with graceful shutdown - cronjob.yaml template using ConfigMap to inject node IPs dynamically --- grpc_p2p_client/k8_cron_job/Dockerfile | 35 ++++ .../k8_cron_job/analyze_latency.py | 150 ++++++++++++++++++ grpc_p2p_client/k8_cron_job/build_docker.sh | 24 +++ grpc_p2p_client/k8_cron_job/cronjob.yaml | 45 ++++++ grpc_p2p_client/k8_cron_job/entrypoint.sh | 92 +++++++++++ 5 files changed, 346 insertions(+) create mode 100644 grpc_p2p_client/k8_cron_job/Dockerfile create mode 100644 grpc_p2p_client/k8_cron_job/analyze_latency.py create mode 100755 grpc_p2p_client/k8_cron_job/build_docker.sh create mode 100644 grpc_p2p_client/k8_cron_job/cronjob.yaml create mode 100755 grpc_p2p_client/k8_cron_job/entrypoint.sh diff --git a/grpc_p2p_client/k8_cron_job/Dockerfile b/grpc_p2p_client/k8_cron_job/Dockerfile new file mode 100644 index 0000000..3c0b2aa --- /dev/null +++ b/grpc_p2p_client/k8_cron_job/Dockerfile @@ -0,0 +1,35 @@ +# ============================================================================== +# Stage 1: Build the Go binaries +# ============================================================================== +FROM golang:latest AS builder + +WORKDIR /src + +# Copy go mod and sum files first to leverage Docker cache +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . + +# Build the Linux binaries +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /p2p-multi-subscribe ./cmd/multi-subscribe +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /p2p-multi-publish ./cmd/multi-publish + +# ============================================================================== +# Stage 2: Build the final Python image +# ============================================================================== +FROM python:3.10-slim + +WORKDIR /app + +COPY k8_cron_job/entrypoint.sh k8_cron_job/analyze_latency.py ./ +RUN chmod +x entrypoint.sh + +# The P2P node IP list will be mounted dynamically via K8s ConfigMap at runtime +# (Do NOT copy it here so you don't have to rebuild the image when IPs change) + +# Copy the compiled Go binaries from the builder stage +COPY --from=builder /p2p-multi-subscribe /p2p-multi-publish ./ + +# Set the default command when the container starts +ENTRYPOINT ["./entrypoint.sh"] diff --git a/grpc_p2p_client/k8_cron_job/analyze_latency.py b/grpc_p2p_client/k8_cron_job/analyze_latency.py new file mode 100644 index 0000000..b8c9c44 --- /dev/null +++ b/grpc_p2p_client/k8_cron_job/analyze_latency.py @@ -0,0 +1,150 @@ +import sys +import argparse +import os +from collections import defaultdict + +def percentile(data, p): + """ + Calculate the precise percentile of a given data list. + Args: + data (list): A list of numerical values (latencies). + p (float): The target percentile (e.g., 99 for P99). + Returns: + float: The calculated percentile value. 
+ """ + if not data: + return 0.0 + data = sorted(data) + n = len(data) + + # Calculate the ideal fractional index for the target percentile + k = (n - 1) * (p / 100.0) + f = int(k) + c = int(k) + 1 if int(k) + 1 < n else f + + # If the index is exactly an integer, return the value at that index + if f == c: + return data[f] + + # Linear interpolation for values falling between indices + d0 = data[f] * (c - k) + d1 = data[c] * (k - f) + return d0 + d1 + +def main(): + # Setup command-line argument parsing + parser = argparse.ArgumentParser(description="Calculate P2P Propagation Latency (K8s version)") + parser.add_argument("--file", default="incoming-trace.tsv", help="Path to the trace file generated by the subscriber") + parser.add_argument("--data-file", help="Path to data file (ignored in this stripped-down K8s version)") + parser.add_argument("--skip", type=int, default=5, help="Number of initial messages to discard to account for warm-up time") + parser.add_argument("--msg-size", type=str, help="Message payload size used for labeling the output report") + args = parser.parse_args() + + target_file = args.file + + # Ensure the trace file exists before attempting to read + if not os.path.exists(target_file): + print(f"Error: File '{target_file}' not found.") + sys.exit(1) + + print(f"Reading data from {target_file}...\n") + + # msg_timestamps will map a message ID (msg_id) to a list of delivery timestamps + msg_timestamps = defaultdict(list) + + try: + # Parse the trace file line by line + with open(target_file, 'r') as f: + for line in f: + line = line.strip() + if not line: + continue + + parts = line.split('\t') + # Ensure the line has enough columns + if len(parts) >= 6: + event = parts[0] + # We are only interested in 'DELIVER_MESSAGE' events for latency calculation + if event == "DELIVER_MESSAGE": + msg_id = parts[3] + try: + # The delivery timestamp (in nanoseconds) is expected in the 6th column + timestamp = int(parts[5]) + msg_timestamps[msg_id].append(timestamp) + except ValueError: + # Ignore rows where the timestamp is not a valid integer + pass + except Exception as e: + print(f"Error reading file: {e}") + return + + if not msg_timestamps: + print("No DELIVER_MESSAGE events found (or file is empty/invalid).") + return + + # Process timestamps to calculate latency for each distinct message + results = [] + for msg_id, t_list in msg_timestamps.items(): + if not t_list: + continue + + # Calculate propagation delay: Latest delivery time minus earliest delivery time + t_start = min(t_list) + t_end = max(t_list) + latency_ns = t_end - t_start + latency_ms = latency_ns / 1_000_000.0 # Convert nanoseconds to milliseconds + node_count = len(t_list) + + results.append({ + "msg_id": msg_id, + "t_start": t_start, + "latency_ms": latency_ms, + "node_count": node_count + }) + + # Sort the results chronologically based on when the message was first seen + results = sorted(results, key=lambda x: x["t_start"]) + total_msgs = len(results) + + print(f"Discovered a total of {total_msgs} distinct test messages.") + + # Discard warm-up messages to avoid skewed results from initial network discovery + if total_msgs > args.skip: + valid_msgs = results[args.skip:] + print(f"Discarding the first {args.skip} warm-up messages. Using the remaining {len(valid_msgs)} messages for statistical analysis.\n") + else: + print(f"Warning: Total messages ({total_msgs}) is less than or equal to the number of skipped warm-up messages ({args.skip}). 
Using all messages.") + valid_msgs = results + + if not valid_msgs: + return + + latencies = [x["latency_ms"] for x in valid_msgs] + + mean_latency = sum(latencies) / len(latencies) + max_latency = max(latencies) + p50 = percentile(latencies, 50) + p90 = percentile(latencies, 90) + p95 = percentile(latencies, 95) + p99 = percentile(latencies, 99) + + print("\n=== P2P Network Propagation Latency Summary ===") + if args.msg_size: + print(f"Message Size (Payload) : {args.msg_size} bytes") + print(f"Sample Size (after filter) : {len(latencies)} messages") + print(f"Mean Latency : {mean_latency:.2f} ms") + print(f"P50 Latency (Median) : {p50:.2f} ms") + print(f"P90 Latency : {p90:.2f} ms") + print(f"P95 Latency : {p95:.2f} ms") + print(f"P99 Latency : {p99:.2f} ms") + print(f"Maximum Latency : {max_latency:.2f} ms") + print("===============================================") + + # Display the top 5 worst-performing messages + print("\n--- Top 5 Outlier Messages (Highest Latency) ---") + outliers = sorted(valid_msgs, key=lambda x: x['latency_ms'], reverse=True)[:5] + for i, m in enumerate(outliers): + print(f"{i+1}. Latency: {m['latency_ms']:.2f} ms | MsgID: {m['msg_id']}") + +if __name__ == "__main__": + main() diff --git a/grpc_p2p_client/k8_cron_job/build_docker.sh b/grpc_p2p_client/k8_cron_job/build_docker.sh new file mode 100755 index 0000000..9926767 --- /dev/null +++ b/grpc_p2p_client/k8_cron_job/build_docker.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +# Exit immediately if a command exits with a non-zero status +set -e + +# Navigate to the parent directory (grpc_p2p_client) where the source files are located +cd "$(dirname "$0")/.." + +echo "=========================================================" +echo "Building P2P Load Test Docker Image for Kubernetes" +echo "=========================================================" + +echo ">>> Building Docker image (mump2p-load-test:latest) using Multi-Stage Build..." + +echo ">>> Building Docker image (mump2p-load-test:latest)..." +# Build the docker image using the parent directory as the build context +docker build -f k8_cron_job/Dockerfile -t mump2p-load-test:latest . + +echo ">>> Build completed successfully!" +echo ">>> You can now tag and push the image to your container registry." +echo "Example:" +echo " docker tag mump2p-load-test:latest your-registry.com/mump2p-load-test:latest" +echo " docker push your-registry.com/mump2p-load-test:latest" +echo "=========================================================" diff --git a/grpc_p2p_client/k8_cron_job/cronjob.yaml b/grpc_p2p_client/k8_cron_job/cronjob.yaml new file mode 100644 index 0000000..c954b3d --- /dev/null +++ b/grpc_p2p_client/k8_cron_job/cronjob.yaml @@ -0,0 +1,45 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: mump2p-load-test + namespace: default +spec: + # Schedule to run, e.g., every day at midnight. Adjust as needed. + schedule: "0 0 * * *" + concurrencyPolicy: Forbid + jobTemplate: + spec: + template: + spec: + containers: + - name: mump2p-load-test + image: mump2p-load-test:latest + # Always pull the latest image if tag is latest + imagePullPolicy: Always + # Environment variables configuration for entrypoint.sh + env: + - name: TOPIC + value: "bnb-bench" # Required! 
+ - name: BATCH + value: "1" + - name: SLEEP + value: "450ms" + - name: DATASIZE + value: "80000" + - name: COUNT + value: "4000" + - name: START_IDX + value: "0" + - name: END_IDX + value: "1" + - name: IPFILE + value: "/app/config/ip.p2pnode.bnb.tsv" + volumeMounts: + - name: p2p-ips + mountPath: /app/config + readOnly: true + restartPolicy: OnFailure + volumes: + - name: p2p-ips + configMap: + name: p2p-ip-config diff --git a/grpc_p2p_client/k8_cron_job/entrypoint.sh b/grpc_p2p_client/k8_cron_job/entrypoint.sh new file mode 100755 index 0000000..b97fa5c --- /dev/null +++ b/grpc_p2p_client/k8_cron_job/entrypoint.sh @@ -0,0 +1,92 @@ +#!/bin/bash + +# ============================================================================== +# P2P Load Test Kubernetes Entrypoint +# ============================================================================== + +# 1. Accept Environment Variables (with defaults) +BATCH=${BATCH:-1} +SLEEP=${SLEEP:-"450ms"} +DATASIZE=${DATASIZE:-80000} +COUNT=${COUNT:-4000} +IPFILE=${IPFILE:-"ip.p2pnode.bnb.tsv"} +START_IDX=${START_IDX:-0} +END_IDX=${END_IDX:-1} + +# 2. Check if TOPIC is provided (required) +if [ -z "$TOPIC" ]; then + echo "Error: TOPIC environment variable is required." + exit 1 +fi + +# 3. Check if IP file exists and is not empty +if [ ! -s "$IPFILE" ]; then + echo "Error: IP file '$IPFILE' is missing or empty." + exit 1 +fi + +echo "=========================================================" +echo "Starting P2P Load Test (Kubernetes Mode)" +echo "Configuration: Topic=$TOPIC, Batch=$BATCH, Sleep=$SLEEP, DataSize=$DATASIZE, Count=$COUNT" +echo "IP File: $IPFILE (Start: $START_IDX, End: $END_IDX)" +echo "=========================================================" + +TRACE_FILE="incoming-trace.tsv" +DATA_FILE="incoming-data.tsv" +SUB_LOG="subscriber.log" + +# --- Start Subscriber --- +echo "[$(date '+%H:%M:%S')] Starting subscriber (Background)..." +./p2p-multi-subscribe \ + -topic "$TOPIC" \ + -ipfile "$IPFILE" \ + -output-trace "$TRACE_FILE" \ + -output-data "$DATA_FILE" > "$SUB_LOG" 2>&1 & + +SUB_PID=$! + +# Register cleanup function for interruption +trap 'echo "Terminating..."; kill $SUB_PID 2>/dev/null; wait $SUB_PID 2>/dev/null; exit 1' INT TERM + +# Wait for subscriber to establish connections +echo "[$(date '+%H:%M:%S')] Waiting 5s for subscriber to settle..." +sleep 5 + +# --- Start Publisher --- +echo "[$(date '+%H:%M:%S')] Starting publisher (Foreground)..." +./p2p-multi-publish \ + -batch "$BATCH" \ + -sleep "$SLEEP" \ + -datasize "$DATASIZE" \ + -count "$COUNT" \ + -ipfile "$IPFILE" \ + -start-index "$START_IDX" \ + -end-index "$END_IDX" \ + -topic "$TOPIC" + +# --- Post-Test Propagation --- +echo "[$(date '+%H:%M:%S')] Publisher finished. Waiting 5s for late messages..." +sleep 5 + +# --- Stop Subscriber --- +echo "[$(date '+%H:%M:%S')] Shutting down subscriber (PID: $SUB_PID)..." +kill $SUB_PID 2>/dev/null +wait $SUB_PID 2>/dev/null +trap - INT TERM + +# --- Analysis --- +echo "[$(date '+%H:%M:%S')] Running latency analysis..." +if [ -f "$TRACE_FILE" ]; then + echo -e "\n--- TEST RESULTS ---" + python3 analyze_latency.py --file "$TRACE_FILE" --data-file "$DATA_FILE" --msg-size "$DATASIZE" +else + echo "Error: $TRACE_FILE not found. Testing might have failed." + echo "=== SUBSCRIBER LOGS ===" + cat "$SUB_LOG" 2>/dev/null || echo "No subscriber logs found." + echo "=======================" + exit 1 +fi + +echo "=========================================================" +echo "Test completed successfully." 
+echo "========================================================="

From 0068d489bce94de6666f1e9dff408e9c1214e405 Mon Sep 17 00:00:00 2001
From: Isaac T
Date: Wed, 6 May 2026 14:43:16 -0400
Subject: [PATCH 5/6] feat(k8s): support multi-architecture Docker builds

- Use TARGETOS and TARGETARCH args in Dockerfile to compile Go binaries dynamically
- Add --platform=$BUILDPLATFORM to the Go builder stage for faster cross-compilation
---
 grpc_p2p_client/k8_cron_job/Dockerfile | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/grpc_p2p_client/k8_cron_job/Dockerfile b/grpc_p2p_client/k8_cron_job/Dockerfile
index 3c0b2aa..226d876 100644
--- a/grpc_p2p_client/k8_cron_job/Dockerfile
+++ b/grpc_p2p_client/k8_cron_job/Dockerfile
@@ -1,7 +1,10 @@
 # ==============================================================================
 # Stage 1: Build the Go binaries
 # ==============================================================================
-FROM golang:latest AS builder
+FROM --platform=$BUILDPLATFORM golang:latest AS builder
+
+ARG TARGETOS
+ARG TARGETARCH
 
 WORKDIR /src
 
@@ -11,9 +14,9 @@ RUN go mod download
 
 COPY . .
 
-# Build the Linux binaries
-RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /p2p-multi-subscribe ./cmd/multi-subscribe
-RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /p2p-multi-publish ./cmd/multi-publish
+# Build the Linux binaries dynamically based on the target platform
+RUN CGO_ENABLED=0 GOOS=$TARGETOS GOARCH=$TARGETARCH go build -o /p2p-multi-subscribe ./cmd/multi-subscribe
+RUN CGO_ENABLED=0 GOOS=$TARGETOS GOARCH=$TARGETARCH go build -o /p2p-multi-publish ./cmd/multi-publish
 
 # ==============================================================================
 # Stage 2: Build the final Python image

From a7632b1a2e79ab06a9f2d9b86186e678233846e2 Mon Sep 17 00:00:00 2001
From: Isaac T
Date: Wed, 6 May 2026 14:51:38 -0400
Subject: [PATCH 6/6] feat: add local test automation script

---
 .../{ => local_script}/analyze_latency.py |  12 +-
 grpc_p2p_client/local_script/run_test.sh  | 148 ++++++++++++++++++
 2 files changed, 159 insertions(+), 1 deletion(-)
 rename grpc_p2p_client/{ => local_script}/analyze_latency.py (91%)
 create mode 100755 grpc_p2p_client/local_script/run_test.sh

diff --git a/grpc_p2p_client/analyze_latency.py b/grpc_p2p_client/local_script/analyze_latency.py
similarity index 91%
rename from grpc_p2p_client/analyze_latency.py
rename to grpc_p2p_client/local_script/analyze_latency.py
index 6b3b753..8a3c9c9 100644
--- a/grpc_p2p_client/analyze_latency.py
+++ b/grpc_p2p_client/local_script/analyze_latency.py
@@ -53,6 +53,7 @@ def percentile(data, p):
 def main():
     parser = argparse.ArgumentParser(description="Calculate P2P Propagation Latency")
     parser.add_argument("--file", default="incoming-trace.tsv", help="Path to trace file")
+    parser.add_argument("--data-file", help="Path to data file (optional, will be archived along with trace)")
     parser.add_argument("--skip", type=int, default=5, help="Number of initial messages to skip (warm-up)")
     parser.add_argument("--msg-size", type=str, help="Message size for archiving (e.g., 900)")
     args = parser.parse_args()
@@ -73,10 +74,17 @@ def main():
             filename, ext = os.path.splitext(os.path.basename(args.file))
             new_path = os.path.join(archive_dir, f"{filename}-{timestamp}{ext}")
 
-            # Move file
+            # Move trace file
             shutil.move(args.file, new_path)
             print(f"Archived '{args.file}' to '{new_path}'")
             target_file = new_path
+
+            # Also move data file if provided
+            if args.data_file and os.path.exists(args.data_file):
+                data_filename, data_ext = 
os.path.splitext(os.path.basename(args.data_file)) + new_data_path = os.path.join(archive_dir, f"{data_filename}-{timestamp}{data_ext}") + shutil.move(args.data_file, new_data_path) + print(f"Archived '{args.data_file}' to '{new_data_path}'") # Scenario B: Local file missing -> Look for latest by mtime in archive else: @@ -174,6 +182,8 @@ def main(): p99 = percentile(latencies, 99) print("\n=== P2P Network Propagation Latency Summary ===") + if args.msg_size: + print(f"Message Size (Payload) : {args.msg_size} bytes") print(f"Sample Size (after filter) : {len(latencies)} messages") print(f"Mean Latency : {mean_latency:.2f} ms") print(f"P50 Latency (Median) : {p50:.2f} ms") diff --git a/grpc_p2p_client/local_script/run_test.sh b/grpc_p2p_client/local_script/run_test.sh new file mode 100755 index 0000000..195b0bd --- /dev/null +++ b/grpc_p2p_client/local_script/run_test.sh @@ -0,0 +1,148 @@ +#!/bin/bash + +# ============================================================================== +# P2P Load Test Automation Script +# +# This script automates: +# 1. Cleaning up old data +# 2. Starting the subscriber in the background +# 3. Running the publisher in the foreground +# 4. Terminating the subscriber +# 5. Running the analysis script +# 6. Archiving all results +# ============================================================================== + +# Default Configuration +BATCH=1 +SLEEP="500ms" +DATASIZE=80000 +COUNT=1200 +TOPIC="mybnbtest" +IPFILE="ip.p2pnode.bnb.tsv" +START_IDX=0 +END_IDX=1 +RUNS=1 +WAIT_TIME="5m" + +# Usage helper +usage() { + echo "Usage: ./run_test.sh [options]" + echo "Options:" + echo " -batch (default: $BATCH)" + echo " -sleep (default: $SLEEP)" + echo " -datasize (default: $DATASIZE)" + echo " -count (default: $COUNT)" + echo " -topic (default: $TOPIC)" + echo " -ipfile (default: $IPFILE)" + echo " -start-index (default: $START_IDX)" + echo " -end-index (default: $END_IDX)" + echo " -runs (default: $RUNS)" + echo " -wait (default: $WAIT_TIME)" + exit 1 +} + +# Parse Arguments +while [[ $# -gt 0 ]]; do + case $1 in + -batch) BATCH="$2"; shift 2 ;; + -sleep) SLEEP="$2"; shift 2 ;; + -datasize) DATASIZE="$2"; shift 2 ;; + -count) COUNT="$2"; shift 2 ;; + -topic) TOPIC="$2"; shift 2 ;; + -ipfile) IPFILE="$2"; shift 2 ;; + -start-index) START_IDX="$2"; shift 2 ;; + -end-index) END_IDX="$2"; shift 2 ;; + -runs) RUNS="$2"; shift 2 ;; + -wait) WAIT_TIME="$2"; shift 2 ;; + -h|--help) usage ;; + *) echo "Unknown option: $1"; usage ;; + esac +done + +TRACE_FILE="incoming-trace.tsv" +DATA_FILE="incoming-data.tsv" +SUB_LOG="subscriber.log" +SUMMARY_FILE="test_summary.log" + +# Clear summary file at the start +echo "P2P Load Test Summary - $(date)" > "$SUMMARY_FILE" +echo "Configuration: Batch=$BATCH, Sleep=$SLEEP, DataSize=$DATASIZE, Count=$COUNT, Runs=$RUNS, Wait=$WAIT_TIME" >> "$SUMMARY_FILE" +echo "---------------------------------------------------------" >> "$SUMMARY_FILE" + +for (( r=1; r<=RUNS; r++ )); do + echo "=========================================================" + echo ">>> Starting Run $r of $RUNS" + echo "=========================================================" + + # --- Cleanup and Preparation --- + echo "[$(date '+%H:%M:%S')] Cleaning up old test data..." + rm -f "$TRACE_FILE" "$DATA_FILE" "$SUB_LOG" + + # --- Start Subscriber --- + echo "[$(date '+%H:%M:%S')] Starting subscriber (Background)..." + ./p2p-multi-subscribe \ + -topic "$TOPIC" \ + -ipfile "$IPFILE" \ + -output-trace "$TRACE_FILE" \ + -output-data "$DATA_FILE" > "$SUB_LOG" 2>&1 & + + SUB_PID=$! 
+
+    # Register cleanup function for interruption
+    trap 'echo "Terminating..."; kill $SUB_PID 2>/dev/null; wait $SUB_PID 2>/dev/null; exit 1' INT TERM
+
+    # Wait for subscriber to establish connections
+    echo "[$(date '+%H:%M:%S')] Waiting 5s for subscriber to settle..."
+    sleep 5
+
+    # --- Start Publisher ---
+    echo "[$(date '+%H:%M:%S')] Starting publisher (Foreground)..."
+    ./p2p-multi-publish \
+        -batch "$BATCH" \
+        -sleep "$SLEEP" \
+        -datasize "$DATASIZE" \
+        -count "$COUNT" \
+        -ipfile "$IPFILE" \
+        -start-index "$START_IDX" \
+        -end-index "$END_IDX" \
+        -topic "$TOPIC"
+
+    # --- Post-Test Propagation ---
+    echo "[$(date '+%H:%M:%S')] Publisher finished. Waiting 5s for late messages..."
+    sleep 5
+
+    # --- Stop Subscriber for this run ---
+    echo "[$(date '+%H:%M:%S')] Shutting down subscriber (PID: $SUB_PID)..."
+    kill $SUB_PID 2>/dev/null
+    wait $SUB_PID 2>/dev/null
+    trap - INT TERM
+
+    # --- Analysis ---
+    echo "[$(date '+%H:%M:%S')] Running latency analysis..."
+    if [ -f "$TRACE_FILE" ]; then
+        echo -e "\n--- RUN $r RESULTS ---" >> "$SUMMARY_FILE"
+        python3 analyze_latency.py --file "$TRACE_FILE" --data-file "$DATA_FILE" --msg-size "$DATASIZE" | tee -a "$SUMMARY_FILE"
+    else
+        echo "Error: $TRACE_FILE not found. Testing might have failed."
+        echo "=== SUBSCRIBER LOGS ==="
+        cat "$SUB_LOG" 2>/dev/null || echo "No subscriber logs found."
+        echo "======================="
+        echo "Run $r failed - no trace file" >> "$SUMMARY_FILE"
+    fi
+
+    if [ "$r" -lt "$RUNS" ]; then
+        echo "[$(date '+%H:%M:%S')] Run $r complete. Waiting $WAIT_TIME before next run..."
+        sleep "$WAIT_TIME"
+    fi
+done
+
+# Archive the summary file as well
+ARCHIVE_DIR="analyze/msg_size_${DATASIZE}"
+TIMESTAMP=$(date +%Y%m%d_%H%M%S)
+# Ensure the archive directory exists even if no run triggered archiving
+mkdir -p "$ARCHIVE_DIR"
+cp "$SUMMARY_FILE" "$ARCHIVE_DIR/summary-${TIMESTAMP}.log"

+echo "========================================================="
+echo "All $RUNS runs completed."
+echo "Full summary available in: $SUMMARY_FILE"
+echo "Summary archived to: $ARCHIVE_DIR/summary-${TIMESTAMP}.log"
+echo "========================================================="
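
Deployment notes (a minimal sketch, not shown anywhere in the series): cronjob.yaml mounts a ConfigMap named p2p-ip-config at /app/config, and PATCH 5/6 wires the Dockerfile for multi-arch builds, but creating that ConfigMap and pushing the image are left to the operator. The commands below assume they run from grpc_p2p_client/, that the node list lives in a local ip.p2pnode.bnb.tsv, and that your-registry.com is a placeholder registry (cronjob.yaml's image: field would then need to point at the pushed tag):

    # Create the ConfigMap consumed by cronjob.yaml. The key (file name)
    # must match the IPFILE environment variable, i.e. ip.p2pnode.bnb.tsv.
    kubectl create configmap p2p-ip-config --from-file=ip.p2pnode.bnb.tsv

    # Build and push a multi-arch image using the TARGETOS/TARGETARCH build
    # args from PATCH 5/6 (requires a docker buildx builder for both platforms).
    docker buildx build \
        --platform linux/amd64,linux/arm64 \
        -f k8_cron_job/Dockerfile \
        -t your-registry.com/mump2p-load-test:latest \
        --push .

    # Register the CronJob in the default namespace.
    kubectl apply -f k8_cron_job/cronjob.yaml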