diff --git a/grpc_p2p_client/cmd/multi-publish/main.go b/grpc_p2p_client/cmd/multi-publish/main.go index f59e048..e069eac 100644 --- a/grpc_p2p_client/cmd/multi-publish/main.go +++ b/grpc_p2p_client/cmd/multi-publish/main.go @@ -33,6 +33,7 @@ var ( startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0") endIdx = flag.Int("end-index", 10000, "index-1") output = flag.String("output", "", "file to write the outgoing data hashes") + batch = flag.Int("batch", 1, "number of messages to send per batch before sleeping") ) func main() { @@ -130,6 +131,18 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data return fmt.Errorf("[%s] ListenCommands failed: %w", ip, err) } + // Drain stream.Recv() to prevent gRPC flow control buffers from filling up. + // If we don't drain this, any pushed traces/responses from the server + // will eventually block the server's stream sending, which in turn blocks us. + go func() { + for { + _, err := stream.Recv() + if err != nil { + return + } + } + }() + println(fmt.Sprintf("Connected to node at: %s…", ip)) for i := 0; i < *count; i++ { @@ -166,13 +179,15 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data } fmt.Printf("[%s] published %d bytes to %q (took %v)\n", ip, len(data), *topic, elapsed) - if *poisson { - lambda := 1.0 / (*sleep).Seconds() - interval := mathrand.ExpFloat64() / lambda - waitTime := time.Duration(interval * float64(time.Second)) - time.Sleep(waitTime) - } else { - time.Sleep(*sleep) + if (i+1)%(*batch) == 0 || i == *count-1 { + if *poisson { + lambda := 1.0 / (*sleep).Seconds() + interval := mathrand.ExpFloat64() / lambda + waitTime := time.Duration(interval * float64(time.Second)) + time.Sleep(waitTime) + } else { + time.Sleep(*sleep) + } } } diff --git a/grpc_p2p_client/k8_cron_job/Dockerfile b/grpc_p2p_client/k8_cron_job/Dockerfile new file mode 100644 index 0000000..226d876 --- /dev/null +++ b/grpc_p2p_client/k8_cron_job/Dockerfile @@ 
-0,0 +1,38 @@ +# ============================================================================== +# Stage 1: Build the Go binaries +# ============================================================================== +FROM --platform=$BUILDPLATFORM golang:latest AS builder + +ARG TARGETOS +ARG TARGETARCH + +WORKDIR /src + +# Copy go mod and sum files first to leverage Docker cache +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . + +# Build the Linux binaries dynamically based on the target platform +RUN CGO_ENABLED=0 GOOS=$TARGETOS GOARCH=$TARGETARCH go build -o /p2p-multi-subscribe ./cmd/multi-subscribe +RUN CGO_ENABLED=0 GOOS=$TARGETOS GOARCH=$TARGETARCH go build -o /p2p-multi-publish ./cmd/multi-publish + +# ============================================================================== +# Stage 2: Build the final Python image +# ============================================================================== +FROM python:3.10-slim + +WORKDIR /app + +COPY k8_cron_job/entrypoint.sh k8_cron_job/analyze_latency.py ./ +RUN chmod +x entrypoint.sh + +# The P2P node IP list will be mounted dynamically via K8s ConfigMap at runtime +# (Do NOT copy it here so you don't have to rebuild the image when IPs change) + +# Copy the compiled Go binaries from the builder stage +COPY --from=builder /p2p-multi-subscribe /p2p-multi-publish ./ + +# Set the default command when the container starts +ENTRYPOINT ["./entrypoint.sh"] diff --git a/grpc_p2p_client/k8_cron_job/analyze_latency.py b/grpc_p2p_client/k8_cron_job/analyze_latency.py new file mode 100644 index 0000000..b8c9c44 --- /dev/null +++ b/grpc_p2p_client/k8_cron_job/analyze_latency.py @@ -0,0 +1,150 @@ +import sys +import argparse +import os +from collections import defaultdict + +def percentile(data, p): + """ + Calculate the precise percentile of a given data list. + Args: + data (list): A list of numerical values (latencies). + p (float): The target percentile (e.g., 99 for P99). 
+ Returns: + float: The calculated percentile value. + """ + if not data: + return 0.0 + data = sorted(data) + n = len(data) + + # Calculate the ideal fractional index for the target percentile + k = (n - 1) * (p / 100.0) + f = int(k) + c = int(k) + 1 if int(k) + 1 < n else f + + # If the index is exactly an integer, return the value at that index + if f == c: + return data[f] + + # Linear interpolation for values falling between indices + d0 = data[f] * (c - k) + d1 = data[c] * (k - f) + return d0 + d1 + +def main(): + # Setup command-line argument parsing + parser = argparse.ArgumentParser(description="Calculate P2P Propagation Latency (K8s version)") + parser.add_argument("--file", default="incoming-trace.tsv", help="Path to the trace file generated by the subscriber") + parser.add_argument("--data-file", help="Path to data file (ignored in this stripped-down K8s version)") + parser.add_argument("--skip", type=int, default=5, help="Number of initial messages to discard to account for warm-up time") + parser.add_argument("--msg-size", type=str, help="Message payload size used for labeling the output report") + args = parser.parse_args() + + target_file = args.file + + # Ensure the trace file exists before attempting to read + if not os.path.exists(target_file): + print(f"Error: File '{target_file}' not found.") + sys.exit(1) + + print(f"Reading data from {target_file}...\n") + + # msg_timestamps will map a message ID (msg_id) to a list of delivery timestamps + msg_timestamps = defaultdict(list) + + try: + # Parse the trace file line by line + with open(target_file, 'r') as f: + for line in f: + line = line.strip() + if not line: + continue + + parts = line.split('\t') + # Ensure the line has enough columns + if len(parts) >= 6: + event = parts[0] + # We are only interested in 'DELIVER_MESSAGE' events for latency calculation + if event == "DELIVER_MESSAGE": + msg_id = parts[3] + try: + # The delivery timestamp (in nanoseconds) is expected in the 6th column + 
timestamp = int(parts[5]) + msg_timestamps[msg_id].append(timestamp) + except ValueError: + # Ignore rows where the timestamp is not a valid integer + pass + except Exception as e: + print(f"Error reading file: {e}") + return + + if not msg_timestamps: + print("No DELIVER_MESSAGE events found (or file is empty/invalid).") + return + + # Process timestamps to calculate latency for each distinct message + results = [] + for msg_id, t_list in msg_timestamps.items(): + if not t_list: + continue + + # Calculate propagation delay: Latest delivery time minus earliest delivery time + t_start = min(t_list) + t_end = max(t_list) + latency_ns = t_end - t_start + latency_ms = latency_ns / 1_000_000.0 # Convert nanoseconds to milliseconds + node_count = len(t_list) + + results.append({ + "msg_id": msg_id, + "t_start": t_start, + "latency_ms": latency_ms, + "node_count": node_count + }) + + # Sort the results chronologically based on when the message was first seen + results = sorted(results, key=lambda x: x["t_start"]) + total_msgs = len(results) + + print(f"Discovered a total of {total_msgs} distinct test messages.") + + # Discard warm-up messages to avoid skewed results from initial network discovery + if total_msgs > args.skip: + valid_msgs = results[args.skip:] + print(f"Discarding the first {args.skip} warm-up messages. Using the remaining {len(valid_msgs)} messages for statistical analysis.\n") + else: + print(f"Warning: Total messages ({total_msgs}) is less than or equal to the number of skipped warm-up messages ({args.skip}). 
Using all messages.") + valid_msgs = results + + if not valid_msgs: + return + + latencies = [x["latency_ms"] for x in valid_msgs] + + mean_latency = sum(latencies) / len(latencies) + max_latency = max(latencies) + p50 = percentile(latencies, 50) + p90 = percentile(latencies, 90) + p95 = percentile(latencies, 95) + p99 = percentile(latencies, 99) + + print("\n=== P2P Network Propagation Latency Summary ===") + if args.msg_size: + print(f"Message Size (Payload) : {args.msg_size} bytes") + print(f"Sample Size (after filter) : {len(latencies)} messages") + print(f"Mean Latency : {mean_latency:.2f} ms") + print(f"P50 Latency (Median) : {p50:.2f} ms") + print(f"P90 Latency : {p90:.2f} ms") + print(f"P95 Latency : {p95:.2f} ms") + print(f"P99 Latency : {p99:.2f} ms") + print(f"Maximum Latency : {max_latency:.2f} ms") + print("===============================================") + + # Display the top 5 worst-performing messages + print("\n--- Top 5 Outlier Messages (Highest Latency) ---") + outliers = sorted(valid_msgs, key=lambda x: x['latency_ms'], reverse=True)[:5] + for i, m in enumerate(outliers): + print(f"{i+1}. Latency: {m['latency_ms']:.2f} ms | MsgID: {m['msg_id']}") + +if __name__ == "__main__": + main() diff --git a/grpc_p2p_client/k8_cron_job/build_docker.sh b/grpc_p2p_client/k8_cron_job/build_docker.sh new file mode 100755 index 0000000..9926767 --- /dev/null +++ b/grpc_p2p_client/k8_cron_job/build_docker.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +# Exit immediately if a command exits with a non-zero status +set -e + +# Navigate to the parent directory (grpc_p2p_client) where the source files are located +cd "$(dirname "$0")/.." + +echo "=========================================================" +echo "Building P2P Load Test Docker Image for Kubernetes" +echo "=========================================================" + +echo ">>> Building Docker image (mump2p-load-test:latest) using Multi-Stage Build..." 
+ +echo ">>> Building Docker image (mump2p-load-test:latest)..." +# Build the docker image using the parent directory as the build context +docker build -f k8_cron_job/Dockerfile -t mump2p-load-test:latest . + +echo ">>> Build completed successfully!" +echo ">>> You can now tag and push the image to your container registry." +echo "Example:" +echo " docker tag mump2p-load-test:latest your-registry.com/mump2p-load-test:latest" +echo " docker push your-registry.com/mump2p-load-test:latest" +echo "=========================================================" diff --git a/grpc_p2p_client/k8_cron_job/cronjob.yaml b/grpc_p2p_client/k8_cron_job/cronjob.yaml new file mode 100644 index 0000000..c954b3d --- /dev/null +++ b/grpc_p2p_client/k8_cron_job/cronjob.yaml @@ -0,0 +1,45 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: mump2p-load-test + namespace: default +spec: + # Schedule to run, e.g., every day at midnight. Adjust as needed. + schedule: "0 0 * * *" + concurrencyPolicy: Forbid + jobTemplate: + spec: + template: + spec: + containers: + - name: mump2p-load-test + image: mump2p-load-test:latest + # Always pull the latest image if tag is latest + imagePullPolicy: Always + # Environment variables configuration for entrypoint.sh + env: + - name: TOPIC + value: "bnb-bench" # Required! 
+ - name: BATCH + value: "1" + - name: SLEEP + value: "450ms" + - name: DATASIZE + value: "80000" + - name: COUNT + value: "4000" + - name: START_IDX + value: "0" + - name: END_IDX + value: "1" + - name: IPFILE + value: "/app/config/ip.p2pnode.bnb.tsv" + volumeMounts: + - name: p2p-ips + mountPath: /app/config + readOnly: true + restartPolicy: OnFailure + volumes: + - name: p2p-ips + configMap: + name: p2p-ip-config diff --git a/grpc_p2p_client/k8_cron_job/entrypoint.sh b/grpc_p2p_client/k8_cron_job/entrypoint.sh new file mode 100755 index 0000000..b97fa5c --- /dev/null +++ b/grpc_p2p_client/k8_cron_job/entrypoint.sh @@ -0,0 +1,92 @@ +#!/bin/bash + +# ============================================================================== +# P2P Load Test Kubernetes Entrypoint +# ============================================================================== + +# 1. Accept Environment Variables (with defaults) +BATCH=${BATCH:-1} +SLEEP=${SLEEP:-"450ms"} +DATASIZE=${DATASIZE:-80000} +COUNT=${COUNT:-4000} +IPFILE=${IPFILE:-"ip.p2pnode.bnb.tsv"} +START_IDX=${START_IDX:-0} +END_IDX=${END_IDX:-1} + +# 2. Check if TOPIC is provided (required) +if [ -z "$TOPIC" ]; then + echo "Error: TOPIC environment variable is required." + exit 1 +fi + +# 3. Check if IP file exists and is not empty +if [ ! -s "$IPFILE" ]; then + echo "Error: IP file '$IPFILE' is missing or empty." + exit 1 +fi + +echo "=========================================================" +echo "Starting P2P Load Test (Kubernetes Mode)" +echo "Configuration: Topic=$TOPIC, Batch=$BATCH, Sleep=$SLEEP, DataSize=$DATASIZE, Count=$COUNT" +echo "IP File: $IPFILE (Start: $START_IDX, End: $END_IDX)" +echo "=========================================================" + +TRACE_FILE="incoming-trace.tsv" +DATA_FILE="incoming-data.tsv" +SUB_LOG="subscriber.log" + +# --- Start Subscriber --- +echo "[$(date '+%H:%M:%S')] Starting subscriber (Background)..." 
+./p2p-multi-subscribe \ + -topic "$TOPIC" \ + -ipfile "$IPFILE" \ + -output-trace "$TRACE_FILE" \ + -output-data "$DATA_FILE" > "$SUB_LOG" 2>&1 & + +SUB_PID=$! + +# Register cleanup function for interruption +trap 'echo "Terminating..."; kill $SUB_PID 2>/dev/null; wait $SUB_PID 2>/dev/null; exit 1' INT TERM + +# Wait for subscriber to establish connections +echo "[$(date '+%H:%M:%S')] Waiting 5s for subscriber to settle..." +sleep 5 + +# --- Start Publisher --- +echo "[$(date '+%H:%M:%S')] Starting publisher (Foreground)..." +./p2p-multi-publish \ + -batch "$BATCH" \ + -sleep "$SLEEP" \ + -datasize "$DATASIZE" \ + -count "$COUNT" \ + -ipfile "$IPFILE" \ + -start-index "$START_IDX" \ + -end-index "$END_IDX" \ + -topic "$TOPIC" + +# --- Post-Test Propagation --- +echo "[$(date '+%H:%M:%S')] Publisher finished. Waiting 5s for late messages..." +sleep 5 + +# --- Stop Subscriber --- +echo "[$(date '+%H:%M:%S')] Shutting down subscriber (PID: $SUB_PID)..." +kill $SUB_PID 2>/dev/null +wait $SUB_PID 2>/dev/null +trap - INT TERM + +# --- Analysis --- +echo "[$(date '+%H:%M:%S')] Running latency analysis..." +if [ -f "$TRACE_FILE" ]; then + echo -e "\n--- TEST RESULTS ---" + python3 analyze_latency.py --file "$TRACE_FILE" --data-file "$DATA_FILE" --msg-size "$DATASIZE" +else + echo "Error: $TRACE_FILE not found. Testing might have failed." + echo "=== SUBSCRIBER LOGS ===" + cat "$SUB_LOG" 2>/dev/null || echo "No subscriber logs found." + echo "=======================" + exit 1 +fi + +echo "=========================================================" +echo "Test completed successfully." 
+echo "=========================================================" diff --git a/grpc_p2p_client/local_script/analyze_latency.py b/grpc_p2p_client/local_script/analyze_latency.py new file mode 100644 index 0000000..8a3c9c9 --- /dev/null +++ b/grpc_p2p_client/local_script/analyze_latency.py @@ -0,0 +1,206 @@ +import sys +import argparse +import os +import shutil +import re +from datetime import datetime +from collections import defaultdict + +def get_latest_file_by_mtime(directory, base_filename): + """Find the most recently modified file in a directory matching the base pattern.""" + if not os.path.exists(directory): + return None + + filename_part, ext = os.path.splitext(base_filename) + # Match: filename-DATETIME.ext (YYYYMMDD_HHMMSS) + pattern = re.compile(rf"^{re.escape(filename_part)}-(\d{{8}}_\d{{6}}){re.escape(ext)}$") + + latest_file = None + latest_mtime = -1 + + for f in os.listdir(directory): + if pattern.match(f): + full_path = os.path.join(directory, f) + mtime = os.path.getmtime(full_path) + if mtime > latest_mtime: + latest_mtime = mtime + latest_file = full_path + + return latest_file + +def percentile(data, p): + if not data: + return 0.0 + data = sorted(data) + n = len(data) + + # Find the "ideal fractional index" for the target percentile in the array + k = (n - 1) * (p / 100.0) + f = int(k) + c = int(k) + 1 if int(k) + 1 < n else f + + # If the calculated index is exactly an integer, simply return the value at that index + if f == c: + return data[f] + + # 5. 
Linear Interpolation + # If k is calculated as 4.3 (falling between index 4 and index 5) + # Use weighted average: (value at index 4 with 70% weight) + (value at index 5 with 30% weight) + d0 = data[f] * (c - k) + d1 = data[c] * (k - f) + return d0 + d1 + +def main(): + parser = argparse.ArgumentParser(description="Calculate P2P Propagation Latency") + parser.add_argument("--file", default="incoming-trace.tsv", help="Path to trace file") + parser.add_argument("--data-file", help="Path to data file (optional, will be archived along with trace)") + parser.add_argument("--skip", type=int, default=5, help="Number of initial messages to skip (warm-up)") + parser.add_argument("--msg-size", type=str, help="Message size for archiving (e.g., 900)") + args = parser.parse_args() + + target_file = args.file + + # --- Archiving Logic --- + if args.msg_size: + archive_dir = os.path.join("analyze", f"msg_size_{args.msg_size}") + + # Scenario A: Local file exists -> Archive it with timestamp + if os.path.exists(args.file): + if not os.path.exists(archive_dir): + os.makedirs(archive_dir) + print(f"Created archive directory: {archive_dir}") + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename, ext = os.path.splitext(os.path.basename(args.file)) + new_path = os.path.join(archive_dir, f"{filename}-{timestamp}{ext}") + + # Move trace file + shutil.move(args.file, new_path) + print(f"Archived '{args.file}' to '{new_path}'") + target_file = new_path + + # Also move data file if provided + if args.data_file and os.path.exists(args.data_file): + data_filename, data_ext = os.path.splitext(os.path.basename(args.data_file)) + new_data_path = os.path.join(archive_dir, f"{data_filename}-{timestamp}{data_ext}") + shutil.move(args.data_file, new_data_path) + print(f"Archived '{args.data_file}' to '{new_data_path}'") + + # Scenario B: Local file missing -> Look for latest by mtime in archive + else: + latest = get_latest_file_by_mtime(archive_dir, os.path.basename(args.file)) + if
latest: + print(f"Local file '{args.file}' not found. Using most recent archived file: {latest}") + target_file = latest + else: + print(f"Error: Local file '{args.file}' not found and no archived files in '{archive_dir}'.") + sys.exit(1) + # ---------------- + + if not os.path.exists(target_file): + print(f"Error: File '{target_file}' not found.") + sys.exit(1) + + print(f"Reading data from {target_file}...\n") + + # Dictionary to group timestamps by MSG_ID + # msg_id -> list of timestamps + msg_timestamps = defaultdict(list) + + try: + with open(target_file, 'r') as f: + for line in f: + line = line.strip() + if not line: + continue + + parts = line.split('\t') + if len(parts) >= 6: + event = parts[0] + # We only care about DELIVER_MESSAGE + if event == "DELIVER_MESSAGE": + msg_id = parts[3] + try: + # timestamp is the 6th column (index 5) + timestamp = int(parts[5]) + msg_timestamps[msg_id].append(timestamp) + except ValueError: + pass + except Exception as e: + print(f"Error reading file: {e}") + return + + if not msg_timestamps: + print("No DELIVER_MESSAGE events found (or file is empty/invalid).") + return + + # Calculate statistics for each MsgID + results = [] + for msg_id, t_list in msg_timestamps.items(): + if not t_list: + continue + + t_start = min(t_list) + t_end = max(t_list) + latency_ns = t_end - t_start + latency_ms = latency_ns / 1_000_000.0 + node_count = len(t_list) # Simplified, assumes distinct peer trace if node counts are unique + + results.append({ + "msg_id": msg_id, + "t_start": t_start, + "latency_ms": latency_ms, + "node_count": node_count + }) + + # 3. Sort by t_start (chronological order) + results = sorted(results, key=lambda x: x["t_start"]) + total_msgs = len(results) + + print(f"Discovered a total of {total_msgs} distinct test messages.") + + # 4. Skip warm-up messages + if total_msgs > args.skip: + valid_msgs = results[args.skip:] + print(f"Discarding the first {args.skip} warm-up messages. 
Using the remaining {len(valid_msgs)} messages for statistical analysis.\n") + else: + print(f"Warning: Total messages ({total_msgs}) is less than or equal to the number of skipped warm-up messages ({args.skip}). Using all messages.") + valid_msgs = results + + # 5. Calculate overall P2P latency statistics (Temporal Distribution) + if not valid_msgs: + return + + latencies = [x["latency_ms"] for x in valid_msgs] + latencies = [x["latency_ms"] for x in valid_msgs] + + mean_latency = sum(latencies) / len(latencies) + max_latency = max(latencies) + p50 = percentile(latencies, 50) + p90 = percentile(latencies, 90) + p95 = percentile(latencies, 95) + p99 = percentile(latencies, 99) + + print("\n=== P2P Network Propagation Latency Summary ===") + if args.msg_size: + print(f"Message Size (Payload) : {args.msg_size} bytes") + print(f"Sample Size (after filter) : {len(latencies)} messages") + print(f"Mean Latency : {mean_latency:.2f} ms") + print(f"P50 Latency (Median) : {p50:.2f} ms") + print(f"P90 Latency : {p90:.2f} ms") + print(f"P95 Latency : {p95:.2f} ms") + print(f"P99 Latency : {p99:.2f} ms") + print(f"Maximum Latency : {max_latency:.2f} ms") + print("===============================================") + + # Display top 5 outliers + print("\n--- Top 5 Outlier Messages (Highest Latency) ---") + # Sort messages by latency descending + outliers = sorted(valid_msgs, key=lambda x: x['latency_ms'], reverse=True)[:5] + for i, m in enumerate(outliers): + print(f"{i+1}. 
Latency: {m['latency_ms']:.2f} ms | MsgID: {m['msg_id']}") + +if __name__ == "__main__": + main() + +# run it with `python3 analyze_latency.py --file incoming-trace.tsv --skip 5` \ No newline at end of file diff --git a/grpc_p2p_client/local_script/run_test.sh b/grpc_p2p_client/local_script/run_test.sh new file mode 100755 index 0000000..195b0bd --- /dev/null +++ b/grpc_p2p_client/local_script/run_test.sh @@ -0,0 +1,148 @@ +#!/bin/bash + +# ============================================================================== +# P2P Load Test Automation Script +# +# This script automates: +# 1. Cleaning up old data +# 2. Starting the subscriber in the background +# 3. Running the publisher in the foreground +# 4. Terminating the subscriber +# 5. Running the analysis script +# 6. Archiving all results +# ============================================================================== + +# Default Configuration +BATCH=1 +SLEEP="500ms" +DATASIZE=80000 +COUNT=1200 +TOPIC="mybnbtest" +IPFILE="ip.p2pnode.bnb.tsv" +START_IDX=0 +END_IDX=1 +RUNS=1 +WAIT_TIME="5m" + +# Usage helper +usage() { + echo "Usage: ./run_test.sh [options]" + echo "Options:" + echo " -batch (default: $BATCH)" + echo " -sleep (default: $SLEEP)" + echo " -datasize (default: $DATASIZE)" + echo " -count (default: $COUNT)" + echo " -topic (default: $TOPIC)" + echo " -ipfile (default: $IPFILE)" + echo " -start-index (default: $START_IDX)" + echo " -end-index (default: $END_IDX)" + echo " -runs (default: $RUNS)" + echo " -wait (default: $WAIT_TIME)" + exit 1 +} + +# Parse Arguments +while [[ $# -gt 0 ]]; do + case $1 in + -batch) BATCH="$2"; shift 2 ;; + -sleep) SLEEP="$2"; shift 2 ;; + -datasize) DATASIZE="$2"; shift 2 ;; + -count) COUNT="$2"; shift 2 ;; + -topic) TOPIC="$2"; shift 2 ;; + -ipfile) IPFILE="$2"; shift 2 ;; + -start-index) START_IDX="$2"; shift 2 ;; + -end-index) END_IDX="$2"; shift 2 ;; + -runs) RUNS="$2"; shift 2 ;; + -wait) WAIT_TIME="$2"; shift 2 ;; + -h|--help) usage ;; + *) echo "Unknown option: 
$1"; usage ;; + esac +done + +TRACE_FILE="incoming-trace.tsv" +DATA_FILE="incoming-data.tsv" +SUB_LOG="subscriber.log" +SUMMARY_FILE="test_summary.log" + +# Clear summary file at the start +echo "P2P Load Test Summary - $(date)" > "$SUMMARY_FILE" +echo "Configuration: Batch=$BATCH, Sleep=$SLEEP, DataSize=$DATASIZE, Count=$COUNT, Runs=$RUNS, Wait=$WAIT_TIME" >> "$SUMMARY_FILE" +echo "---------------------------------------------------------" >> "$SUMMARY_FILE" + +for (( r=1; r<=RUNS; r++ )); do + echo "=========================================================" + echo ">>> Starting Run $r of $RUNS" + echo "=========================================================" + + # --- Cleanup and Preparation --- + echo "[$(date '+%H:%M:%S')] Cleaning up old test data..." + rm -f "$TRACE_FILE" "$DATA_FILE" "$SUB_LOG" + + # --- Start Subscriber --- + echo "[$(date '+%H:%M:%S')] Starting subscriber (Background)..." + ./p2p-multi-subscribe \ + -topic "$TOPIC" \ + -ipfile "$IPFILE" \ + -output-trace "$TRACE_FILE" \ + -output-data "$DATA_FILE" > "$SUB_LOG" 2>&1 & + + SUB_PID=$! + + # Register cleanup function for interruption + trap 'echo "Terminating..."; kill $SUB_PID 2>/dev/null; wait $SUB_PID 2>/dev/null; exit 1' INT TERM + + # Wait for subscriber to establish connections + echo "[$(date '+%H:%M:%S')] Waiting 5s for subscriber to settle..." + sleep 5 + + # --- Start Publisher --- + echo "[$(date '+%H:%M:%S')] Starting publisher (Foreground)..." + ./p2p-multi-publish \ + -batch "$BATCH" \ + -sleep "$SLEEP" \ + -datasize "$DATASIZE" \ + -count "$COUNT" \ + -ipfile "$IPFILE" \ + -start-index "$START_IDX" \ + -end-index "$END_IDX" \ + -topic "$TOPIC" + + # --- Post-Test Propagation --- + echo "[$(date '+%H:%M:%S')] Publisher finished. Waiting 5s for late messages..." + sleep 5 + + # --- Stop Subscriber for this run --- + echo "[$(date '+%H:%M:%S')] Shutting down subscriber (PID: $SUB_PID)..." 
+ kill $SUB_PID 2>/dev/null + wait $SUB_PID 2>/dev/null + trap - INT TERM + + # --- Analysis --- + echo "[$(date '+%H:%M:%S')] Running latency analysis..." + if [ -f "$TRACE_FILE" ]; then + echo -e "\n--- RUN $r RESULTS ---" >> "$SUMMARY_FILE" + python3 analyze_latency.py --file "$TRACE_FILE" --data-file "$DATA_FILE" --msg-size "$DATASIZE" | tee -a "$SUMMARY_FILE" + else + echo "Error: $TRACE_FILE not found. Testing might have failed." + echo "=== SUBSCRIBER LOGS ===" + cat "$SUB_LOG" 2>/dev/null || echo "No subscriber logs found." + echo "=======================" + echo "Run $r failed - no trace file" >> "$SUMMARY_FILE" + fi + + if [ "$r" -lt "$RUNS" ]; then + echo "[$(date '+%H:%M:%S')] Run $r complete. Waiting $WAIT_TIME before next run..." + sleep "$WAIT_TIME" + fi +done + +# Archive the summary file as well +ARCHIVE_DIR="analyze/msg_size_${DATASIZE}" +TIMESTAMP=$(date +%Y%m%d_%H%M%S) +cp "$SUMMARY_FILE" "$ARCHIVE_DIR/summary-${TIMESTAMP}.log" + +echo "=========================================================" +echo "All $RUNS runs completed successfully." +echo "Full summary available in: $SUMMARY_FILE" +echo "Summary archived to: $ARCHIVE_DIR/summary-${TIMESTAMP}.log" +echo "========================================================="