Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions grpc_p2p_client/cmd/multi-publish/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0")
endIdx = flag.Int("end-index", 10000, "index-1")
output = flag.String("output", "", "file to write the outgoing data hashes")
batch = flag.Int("batch", 1, "number of messages to send per batch before sleeping")
)

func main() {
Expand Down Expand Up @@ -130,6 +131,18 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data
return fmt.Errorf("[%s] ListenCommands failed: %w", ip, err)
}

// Drain stream.Recv() to prevent gRPC flow control buffers from filling up.
// If we don't drain this, any pushed traces/responses from the server
// will eventually block the server's stream sending, which in turn blocks us.
go func() {
for {
_, err := stream.Recv()
if err != nil {
return
}
}
}()

println(fmt.Sprintf("Connected to node at: %s…", ip))

for i := 0; i < *count; i++ {
Expand Down Expand Up @@ -166,13 +179,15 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data
}
fmt.Printf("[%s] published %d bytes to %q (took %v)\n", ip, len(data), *topic, elapsed)

if *poisson {
lambda := 1.0 / (*sleep).Seconds()
interval := mathrand.ExpFloat64() / lambda
waitTime := time.Duration(interval * float64(time.Second))
time.Sleep(waitTime)
} else {
time.Sleep(*sleep)
if (i+1)%(*batch) == 0 || i == *count-1 {
if *poisson {
lambda := 1.0 / (*sleep).Seconds()
interval := mathrand.ExpFloat64() / lambda
waitTime := time.Duration(interval * float64(time.Second))
time.Sleep(waitTime)
} else {
time.Sleep(*sleep)
}
}
}

Expand Down
38 changes: 38 additions & 0 deletions grpc_p2p_client/k8_cron_job/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# ==============================================================================
# Stage 1: Build the Go binaries
# ==============================================================================
# $BUILDPLATFORM / TARGETOS / TARGETARCH are injected automatically by
# `docker buildx` for multi-platform builds (cross-compile on the build host).
FROM --platform=$BUILDPLATFORM golang:latest AS builder

ARG TARGETOS
ARG TARGETARCH

WORKDIR /src

# Copy go mod and sum files first to leverage Docker cache
# (dependency downloads are re-run only when go.mod/go.sum change)
COPY go.mod go.sum ./
RUN go mod download

COPY . .

# Build the Linux binaries dynamically based on the target platform.
# CGO_ENABLED=0 produces static binaries so they run in the slim runtime image
# below without needing the Go toolchain or glibc from the builder stage.
RUN CGO_ENABLED=0 GOOS=$TARGETOS GOARCH=$TARGETARCH go build -o /p2p-multi-subscribe ./cmd/multi-subscribe
RUN CGO_ENABLED=0 GOOS=$TARGETOS GOARCH=$TARGETARCH go build -o /p2p-multi-publish ./cmd/multi-publish

# ==============================================================================
# Stage 2: Build the final Python image
# ==============================================================================
# Python is needed at runtime for analyze_latency.py; the Go binaries are
# copied in as standalone executables.
FROM python:3.10-slim

WORKDIR /app

COPY k8_cron_job/entrypoint.sh k8_cron_job/analyze_latency.py ./
RUN chmod +x entrypoint.sh

# The P2P node IP list will be mounted dynamically via K8s ConfigMap at runtime
# (Do NOT copy it here so you don't have to rebuild the image when IPs change)

# Copy the compiled Go binaries from the builder stage
COPY --from=builder /p2p-multi-subscribe /p2p-multi-publish ./

# Set the default command when the container starts
ENTRYPOINT ["./entrypoint.sh"]
150 changes: 150 additions & 0 deletions grpc_p2p_client/k8_cron_job/analyze_latency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import sys
import argparse
import os
from collections import defaultdict

def percentile(data, p):
    """Return the p-th percentile of `data` using linear interpolation.

    Args:
        data (list): A list of numerical values (latencies).
        p (float): The target percentile (e.g., 99 for P99).
    Returns:
        float: The interpolated percentile value (0.0 for empty input).
    """
    if not data:
        return 0.0
    ordered = sorted(data)
    n = len(ordered)

    # Fractional rank of the requested percentile within the sorted samples
    rank = (n - 1) * (p / 100.0)
    lo = int(rank)
    hi = lo + 1 if lo + 1 < n else lo

    # Rank landed exactly on a sample (or on the last element): no interpolation
    if lo == hi:
        return ordered[lo]

    # Linearly interpolate between the two neighbouring samples
    return ordered[lo] * (hi - rank) + ordered[hi] * (rank - lo)

def _load_delivery_timestamps(path):
    """Parse a subscriber trace file into {msg_id: [delivery timestamps (ns)]}.

    Only 'DELIVER_MESSAGE' events are collected. Returns None if the file
    could not be read (the error is printed before returning).
    """
    msg_timestamps = defaultdict(list)
    try:
        # Parse the trace file line by line
        with open(path, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue

                parts = line.split('\t')
                # Skip rows that do not have the expected number of columns
                if len(parts) < 6:
                    continue
                # We are only interested in 'DELIVER_MESSAGE' events for latency calculation
                if parts[0] != "DELIVER_MESSAGE":
                    continue
                try:
                    # The delivery timestamp (in nanoseconds) is expected in the 6th column
                    msg_timestamps[parts[3]].append(int(parts[5]))
                except ValueError:
                    # Ignore rows where the timestamp is not a valid integer
                    pass
    except Exception as e:
        print(f"Error reading file: {e}")
        return None
    return msg_timestamps


def _build_results(msg_timestamps):
    """Compute per-message propagation latency, sorted by first-delivery time.

    Latency is defined as the latest delivery timestamp minus the earliest
    delivery timestamp across all reporting nodes for a given message.
    """
    results = []
    for msg_id, t_list in msg_timestamps.items():
        if not t_list:
            continue

        t_start = min(t_list)
        t_end = max(t_list)
        results.append({
            "msg_id": msg_id,
            "t_start": t_start,
            # Convert nanoseconds to milliseconds
            "latency_ms": (t_end - t_start) / 1_000_000.0,
            # Number of delivery events observed for this message
            "node_count": len(t_list),
        })

    # Sort the results chronologically based on when the message was first seen
    return sorted(results, key=lambda x: x["t_start"])


def _print_report(valid_msgs, msg_size):
    """Print the latency summary statistics and the top-5 outlier messages."""
    latencies = [x["latency_ms"] for x in valid_msgs]

    mean_latency = sum(latencies) / len(latencies)
    max_latency = max(latencies)
    p50 = percentile(latencies, 50)
    p90 = percentile(latencies, 90)
    p95 = percentile(latencies, 95)
    p99 = percentile(latencies, 99)

    print("\n=== P2P Network Propagation Latency Summary ===")
    if msg_size:
        print(f"Message Size (Payload) : {msg_size} bytes")
    print(f"Sample Size (after filter) : {len(latencies)} messages")
    print(f"Mean Latency : {mean_latency:.2f} ms")
    print(f"P50 Latency (Median) : {p50:.2f} ms")
    print(f"P90 Latency : {p90:.2f} ms")
    print(f"P95 Latency : {p95:.2f} ms")
    print(f"P99 Latency : {p99:.2f} ms")
    print(f"Maximum Latency : {max_latency:.2f} ms")
    print("===============================================")

    # Display the top 5 worst-performing messages
    print("\n--- Top 5 Outlier Messages (Highest Latency) ---")
    outliers = sorted(valid_msgs, key=lambda x: x['latency_ms'], reverse=True)[:5]
    for i, m in enumerate(outliers):
        print(f"{i+1}. Latency: {m['latency_ms']:.2f} ms | MsgID: {m['msg_id']}")


def main():
    """Entry point: parse CLI args, load the trace file, and report latency stats."""
    # Setup command-line argument parsing
    parser = argparse.ArgumentParser(description="Calculate P2P Propagation Latency (K8s version)")
    parser.add_argument("--file", default="incoming-trace.tsv", help="Path to the trace file generated by the subscriber")
    parser.add_argument("--data-file", help="Path to data file (ignored in this stripped-down K8s version)")
    parser.add_argument("--skip", type=int, default=5, help="Number of initial messages to discard to account for warm-up time")
    parser.add_argument("--msg-size", type=str, help="Message payload size used for labeling the output report")
    args = parser.parse_args()

    target_file = args.file

    # Ensure the trace file exists before attempting to read
    if not os.path.exists(target_file):
        print(f"Error: File '{target_file}' not found.")
        sys.exit(1)

    print(f"Reading data from {target_file}...\n")

    msg_timestamps = _load_delivery_timestamps(target_file)
    if msg_timestamps is None:
        # File read failed; the helper already printed the error
        return

    if not msg_timestamps:
        print("No DELIVER_MESSAGE events found (or file is empty/invalid).")
        return

    # Process timestamps to calculate latency for each distinct message
    results = _build_results(msg_timestamps)
    total_msgs = len(results)

    print(f"Discovered a total of {total_msgs} distinct test messages.")

    # Discard warm-up messages to avoid skewed results from initial network discovery
    if total_msgs > args.skip:
        valid_msgs = results[args.skip:]
        print(f"Discarding the first {args.skip} warm-up messages. Using the remaining {len(valid_msgs)} messages for statistical analysis.\n")
    else:
        print(f"Warning: Total messages ({total_msgs}) is less than or equal to the number of skipped warm-up messages ({args.skip}). Using all messages.")
        valid_msgs = results

    if not valid_msgs:
        return

    _print_report(valid_msgs, args.msg_size)

if __name__ == "__main__":
    main()
24 changes: 24 additions & 0 deletions grpc_p2p_client/k8_cron_job/build_docker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/bin/bash

# Build the P2P load-test Docker image used by the Kubernetes cron job.
# Must be run from anywhere; it cd's to the repo subdir that holds the sources.

# Exit immediately if a command exits with a non-zero status
set -e

# Navigate to the parent directory (grpc_p2p_client) where the source files are located
cd "$(dirname "$0")/.."

echo "========================================================="
echo "Building P2P Load Test Docker Image for Kubernetes"
echo "========================================================="

# Fixed: the build-status message was printed twice (leftover duplicate line).
echo ">>> Building Docker image (mump2p-load-test:latest) using Multi-Stage Build..."
# Build the docker image using the parent directory as the build context
docker build -f k8_cron_job/Dockerfile -t mump2p-load-test:latest .

echo ">>> Build completed successfully!"
echo ">>> You can now tag and push the image to your container registry."
echo "Example:"
echo " docker tag mump2p-load-test:latest your-registry.com/mump2p-load-test:latest"
echo " docker push your-registry.com/mump2p-load-test:latest"
echo "========================================================="
45 changes: 45 additions & 0 deletions grpc_p2p_client/k8_cron_job/cronjob.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
apiVersion: batch/v1
kind: CronJob
metadata:
  name: mump2p-load-test
  namespace: default
spec:
  # Schedule to run, e.g., every day at midnight. Adjust as needed.
  schedule: "0 0 * * *"
  # Skip a new run while a previous job is still active (no overlapping tests).
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: mump2p-load-test
              # NOTE(review): bare image name with ':latest' assumes the cluster
              # can resolve it (local registry or default registry) — confirm,
              # or replace with a fully-qualified registry path after pushing.
              image: mump2p-load-test:latest
              # Always pull the latest image if tag is latest
              imagePullPolicy: Always
              # Environment variables configuration for entrypoint.sh
              # (presumably consumed as publisher/subscriber flags — verify
              # against entrypoint.sh)
              env:
                - name: TOPIC
                  value: "bnb-bench" # Required!
                - name: BATCH
                  value: "1" # messages per batch before sleeping
                - name: SLEEP
                  value: "450ms" # delay between batches
                - name: DATASIZE
                  value: "80000" # payload size in bytes
                - name: COUNT
                  value: "4000" # total messages to publish
                - name: START_IDX
                  value: "0" # first node index (inclusive)
                - name: END_IDX
                  value: "1" # last node index
                - name: IPFILE
                  # Provided at runtime by the ConfigMap mounted below
                  value: "/app/config/ip.p2pnode.bnb.tsv"
              volumeMounts:
                - name: p2p-ips
                  mountPath: /app/config
                  readOnly: true
          restartPolicy: OnFailure
          volumes:
            - name: p2p-ips
              configMap:
                # ConfigMap must exist beforehand and contain the node IP list
                name: p2p-ip-config
Loading
Loading