From 54e5bbf9fabbb793b23c832eb24f6297aae8e662 Mon Sep 17 00:00:00 2001 From: sunjqa1 Date: Wed, 11 Mar 2026 02:43:19 +0000 Subject: [PATCH 1/3] feat(dashboard): add training analysis page and improve communication/inference tables --- dashboard/app.py | 41 ++- dashboard/pages/communication.py | 72 +++-- dashboard/pages/inference.py | 4 +- dashboard/pages/training.py | 524 +++++++++++++++++++++++++++++++ dashboard/utils/data_loader.py | 39 +-- 5 files changed, 622 insertions(+), 58 deletions(-) create mode 100644 dashboard/pages/training.py diff --git a/dashboard/app.py b/dashboard/app.py index 705177e..e5e0006 100644 --- a/dashboard/app.py +++ b/dashboard/app.py @@ -24,7 +24,7 @@ ) # Initialize session state -if "data_loader" not in st.session_state: +if "ni" not in st.session_state: st.session_state.data_loader = InfiniMetricsDataLoader() if "selected_accelerators" not in st.session_state: st.session_state.selected_accelerators = [] @@ -96,19 +96,22 @@ def render_dashboard(run_id_filter: str):
InfiniMetrics Dashboard 用于统一展示 通信(NCCL / 集合通信)、 - 推理(Direct / Service)、 + 训练(Training / 分布式训练)、 + 推理(Direct / Service 推理)算子(核心算子性能) 等 AI 加速卡性能测试结果。
测试框架输出 JSON(环境 / 配置 / 标量指标) + CSV(曲线 / 时序数据), - Dashboard 自动加载并支持多次运行的对比分析与可视化。 + Dashboard 自动加载并支持多次运行的 + 性能对比趋势分析 与 + 可视化展示
""", unsafe_allow_html=True, @@ -150,6 +153,7 @@ def _parse_time(t): # ========== Categorize runs ========== comm_runs = [r for r in runs if r.get("testcase", "").startswith("comm")] infer_runs = [r for r in runs if r.get("testcase", "").startswith("infer")] + train_runs = [r for r in runs if r.get("testcase", "").startswith("train")] ops_runs, hw_runs = [], [] for r in runs: @@ -161,13 +165,14 @@ def _parse_time(t): hw_runs.append(r) # ========== KPI ========== - c1, c2, c3, c4, c5, c6 = st.columns(6) + c1, c2, c3, c4, c5, c6, c7 = st.columns(7) c1.metric("总测试数", total) c2.metric("成功率", f"{(success/total*100):.1f}%") c3.metric("通信测试", len(comm_runs)) c4.metric("推理测试", len(infer_runs)) - c5.metric("算子测试", len(ops_runs)) - c6.metric("硬件检测", len(hw_runs)) + c5.metric("训练测试", len(train_runs)) + c6.metric("算子测试", len(ops_runs)) + c7.metric("硬件检测", len(hw_runs)) st.caption(f"失败测试数:{fail}") st.caption(f"当前筛选:加速卡={','.join(selected_accs) or '全部'}") @@ -181,8 +186,9 @@ def _latest(lst): latest_comm = _latest(comm_runs) latest_infer = _latest(infer_runs) latest_ops = _latest(ops_runs) + latest_train = _latest(train_runs) - colA, colB, colC = st.columns(3) + colA, colB, colC, colD = st.columns(4) with colA: st.markdown("#### 🔗 通信(最新)") @@ -211,6 +217,17 @@ def _latest(lst): st.write(f"- time: {latest_ops.get('time','')}") st.write(f"- status: {'✅' if latest_ops.get('success') else '❌'}") + with colD: + st.markdown("#### 🏋️ 训练(最新)") + if not latest_train: + st.info("暂无训练结果") + else: + framework = latest_train.get("config", {}).get("framework", "unknown") + model = latest_train.get("config", {}).get("model", "unknown") + st.write(f"- 框架/模型: `{framework}/{model}`") + st.write(f"- time: {latest_train.get('time','')}") + st.write(f"- status: {'✅' if latest_train.get('success') else '❌'}") + st.divider() # ========== Recent runs table ========== @@ -267,13 +284,15 @@ def _latest(lst): st.markdown("---") st.markdown("### 🚀 快速导航") - col1, col2, col3 = st.columns(3) + col1, col2, col3, col4 = st.columns(4) if col1.button("🔗 通信测试分析", use_container_width=True): st.switch_page("pages/communication.py") if col2.button("⚡ 算子测试分析", use_container_width=True): st.switch_page("pages/operator.py") - if col3.button("🤖 推理测试分析", use_container_width=True): + if col3.button("🚀 推理测试分析", use_container_width=True): st.switch_page("pages/inference.py") + if col4.button("🏋️ 训练测试分析", use_container_width=True): + st.switch_page("pages/training.py") except Exception as e: st.error(f"Dashboard 加载失败: {e}") diff --git a/dashboard/pages/communication.py b/dashboard/pages/communication.py index e0a9c9f..5ba42fd 100644 --- a/dashboard/pages/communication.py +++ b/dashboard/pages/communication.py @@ -17,7 +17,7 @@ create_summary_table_infer, ) -init_page("推理测试分析 | InfiniMetrics", "🔗") +init_page("通信测试分析 | InfiniMetrics", "🔗") def main(): @@ -58,7 +58,7 @@ def main(): # Status filter show_success = st.checkbox("仅显示成功测试", value=True) - # Apply filters + # Apply filter filtered_runs = [ r for r in comm_runs @@ -119,6 +119,7 @@ def main(): run_info = filtered_runs[idx] result = st.session_state.data_loader.load_test_result(run_info["path"]) run_info["data"] = result + selected_runs.append(run_info) # Tabs for different views @@ -181,26 +182,53 @@ def main(): if len(selected_runs) == 1: st.markdown("#### 📌 核心指标(最新)") run = selected_runs[0] - core = extract_core_metrics(run) + max_bw = None + avg_lat = None + duration = None + + for metric in run.get("data", {}).get("metrics", []): + metric_name = metric.get("name", "") + + # bandwidth + if ( + metric_name == "comm.bandwidth" + and metric.get("data") is not None + ): + df = metric["data"] + if "bandwidth_gbs" in df.columns: + max_bw = df["bandwidth_gbs"].max() + + # latency + elif ( + metric_name == "comm.latency" and metric.get("data") is not None + ): + df = metric["data"] + if "latency_us" in df.columns: + avg_lat = df["latency_us"].mean() + + # duration + elif metric_name == "comm.duration": + duration = metric.get("value") c1, c2, c3 = st.columns(3) - c1.metric( - "峰值带宽", - ( - f"{core['bandwidth_gbps']:.2f} GB/s" - if core["bandwidth_gbps"] - else "-" - ), - ) - c2.metric( - "平均延迟", - f"{core['latency_us']:.2f} μs" if core["latency_us"] else "-", - ) - c3.metric( - "测试耗时", - f"{core['duration_ms']:.2f} ms" if core["duration_ms"] else "-", - ) + with c1: + if max_bw is not None and max_bw > 0: + st.metric("峰值带宽", f"{max_bw:.2f} GB/s") + else: + st.metric("峰值带宽", "-") + + with c2: + if avg_lat is not None and avg_lat > 0: + st.metric("平均延迟", f"{avg_lat:.2f} μs") + else: + st.metric("平均延迟", "-") + + with c3: + if duration is not None and duration > 0: + st.metric("测试耗时", f"{duration:.2f} ms") + else: + st.metric("测试耗时", "-") # Gauge charts for key metrics if len(selected_runs) == 1: st.markdown("#### 关键指标") @@ -221,7 +249,7 @@ def main(): max_bw = df["bandwidth_gbs"].max() fig = create_gauge_chart( max_bw, - 300, # Theoretical max for A100 NVLink + 300, "峰值带宽", "blue", "GB/s", @@ -242,7 +270,7 @@ def main(): avg_lat = df["latency_us"].mean() fig = create_gauge_chart( avg_lat, - 1000, # Reference: 1000 µs + 1000, "平均延迟", "red", "µs", @@ -261,7 +289,7 @@ def main(): if duration > 0: fig = create_gauge_chart( duration, - duration * 2, # Scale to show progress + duration * 2, "测试耗时", "green", "ms", diff --git a/dashboard/pages/inference.py b/dashboard/pages/inference.py index e9db234..12b937c 100644 --- a/dashboard/pages/inference.py +++ b/dashboard/pages/inference.py @@ -12,7 +12,7 @@ create_summary_table_infer, ) -init_page("推理测试分析 | InfiniMetrics", "🤖") +init_page("推理测试分析 | InfiniMetrics", "🚀") def main(): @@ -178,7 +178,7 @@ def _plot_metric(metric_name_contains: str, container): _plot_metric("infer.compute_latency", c1) _plot_metric("infer.ttft", c2) - _plot_metric("infer.direct_throughput", c3) + _plot_metric("infer.direct_throughput_tps", c3) # ---------- Tables ---------- with tab2: diff --git a/dashboard/pages/training.py b/dashboard/pages/training.py new file mode 100644 index 0000000..7702db5 --- /dev/null +++ b/dashboard/pages/training.py @@ -0,0 +1,524 @@ +#!/usr/bin/env python3 +"""Training tests analysis page.""" + +import streamlit as st +import pandas as pd +import plotly.graph_objects as go +import plotly.express as px + +from common import init_page +from components.header import render_header +from utils.visualizations import create_gauge_chart + +init_page("训练测试分析 | InfiniMetrics", "🏋️") + + +def main(): + render_header() + st.markdown("## 🏋️ 训练性能测试分析") + + dl = st.session_state.data_loader + + # Load all training-related tests + runs = load_training_runs(dl) + + if not runs: + st.info("未找到训练测试结果\n请将训练测试结果放在 test_output/train/ 或 test_output/training/ 目录下") + return + + # ---------- Sidebar Filters ---------- + with st.sidebar: + st.markdown("### 🔍 筛选条件") + + frameworks = sorted( + {r.get("config", {}).get("framework", "unknown") for r in runs} + ) + models = sorted({r.get("config", {}).get("model", "unknown") for r in runs}) + device_counts = sorted({r.get("device_used", 1) for r in runs}) + + selected_fw = st.multiselect("框架", frameworks, default=frameworks) + selected_models = st.multiselect("模型", models, default=models) + selected_dev = st.multiselect("设备数", device_counts, default=device_counts) + only_success = st.checkbox("仅显示成功测试", value=True) + + st.markdown("---") + st.markdown("### 📈 图表选项") + y_log = st.checkbox("Y轴对数刻度", value=False) + smoothing = st.slider("平滑窗口", 1, 50, 5, help="对曲线进行移动平均平滑") + + # Apply filters + filtered = filter_runs( + runs, selected_fw, selected_models, selected_dev, only_success + ) + st.caption(f"找到 {len(filtered)} 个训练测试") + + if not filtered: + st.warning("没有符合条件的测试结果") + return + + # ---------- Run Selection ---------- + options = create_run_options(filtered) + selected = st.multiselect( + "选择要分析的测试运行(可多选对比)", + list(options.keys()), + default=list(options.keys())[: min(3, len(options))], + ) + + if not selected: + return + + # Load selected runs + selected_runs = load_selected_runs(dl, filtered, options, selected) + + # Tabs + tab1, tab2, tab3, tab4 = st.tabs(["📈 性能曲线", "📊 吞吐量对比", "📋 数据表格", "🔍 详细配置"]) + + with tab1: + render_performance_curves(selected_runs, smoothing, y_log) + + with tab2: + render_throughput_comparison(selected_runs) + + with tab3: + render_data_tables(selected_runs) + + with tab4: + render_config_details(selected_runs) + + +def load_training_runs(data_loader): + """Load all training-related test runs""" + runs = data_loader.list_test_runs("train") + + if not runs: + all_runs = data_loader.list_test_runs() + runs = [ + r + for r in all_runs + if any( + keyword in str(r.get("path", "")).lower() + or keyword in r.get("testcase", "").lower() + for keyword in [ + "/train/", + "/training/", + "train.", + "megatron", + "lora", + "sft", + ] + ) + ] + + return runs + + +def filter_runs(runs, selected_fw, selected_models, selected_dev, only_success): + """Apply filters to runs""" + return [ + r + for r in runs + if ( + not selected_fw + or r.get("config", {}).get("framework", "unknown") in selected_fw + ) + and ( + not selected_models + or r.get("config", {}).get("model", "unknown") in selected_models + ) + and (not selected_dev or r.get("device_used", 1) in selected_dev) + and (not only_success or r.get("success", False)) + ] + + +def create_run_options(runs): + """Create run selection options""" + options = {} + for i, r in enumerate(runs): + config = r.get("config", {}) + framework = config.get("framework", "unknown") + model = config.get("model", "unknown") + device = r.get("device_used", "?") + time_str = r.get("time", "")[5:16] if r.get("time") else "unknown" + run_id = r.get("run_id", "")[:8] + + label = f"{framework}/{model} | {device}GPU | {time_str} | {run_id}" + options[label] = i + + return options + + +def load_selected_runs(data_loader, filtered_runs, options, selected_labels): + """Load the selected test run""" + selected_runs = [] + for label in selected_labels: + idx = options[label] + run_info = filtered_runs[idx].copy() + run_info["data"] = data_loader.load_test_result(run_info["path"]) + selected_runs.append(run_info) + + return selected_runs + + +def render_performance_curves(selected_runs, smoothing, y_log): + """Render performance curves""" + st.markdown("### 训练指标曲线") + + metrics = [ + ("train.loss", "Loss", "损失值", ""), + ("train.ppl", "Perplexity", "困惑度", ""), + ("train.throughput", "Throughput", "吞吐量", "tokens/s/GPU"), + ] + + cols = st.columns(3) + + for idx, (metric_key, title, ylabel, unit) in enumerate(metrics): + with cols[idx]: + st.markdown(f"**{title}**") + + if len(selected_runs) == 1: + plot_single_metric( + selected_runs[0], metric_key, title, ylabel, unit, smoothing, y_log + ) + else: + plot_multi_metric_comparison( + selected_runs, metric_key, title, ylabel, unit, smoothing, y_log + ) + + # Memory usage (only for single run) + if len(selected_runs) == 1: + render_memory_usage(selected_runs[0]) + + +def plot_single_metric(run, metric_key, title, ylabel, unit, smoothing, y_log): + """Draw a curve for a single indicator""" + metrics = run["data"].get("metrics", []) + target_metric = next( + ( + m + for m in metrics + if metric_key in m.get("name", "") and m.get("data") is not None + ), + None, + ) + + if not target_metric: + st.info(f"无{title}数据") + return + + df = target_metric["data"].copy() + if df.empty or len(df.columns) < 2: + st.info("数据为空") + return + + # Apply smoothing + if smoothing > 1 and len(df) > smoothing: + df.iloc[:, 1] = df.iloc[:, 1].rolling(window=smoothing, min_periods=1).mean() + + # Create a chart + fig = go.Figure() + fig.add_trace( + go.Scatter( + x=df[df.columns[0]], + y=df[df.columns[1]], + mode="lines", + name=title, + line=dict(width=2), + ) + ) + + fig.update_layout( + title=f"{title} - {run.get('config', {}).get('framework', '')}", + xaxis_title="Iteration", + yaxis_title=f"{ylabel} {unit}", + template="plotly_white", + height=350, + margin=dict(l=40, r=20, t=40, b=40), + ) + + if y_log: + fig.update_yaxes(type="log") + + st.plotly_chart(fig, use_container_width=True) + + +def plot_multi_metric_comparison( + runs, metric_key, title, ylabel, unit, smoothing, y_log +): + """Comparison of multiple operation indexes""" + colors = px.colors.qualitative.Set2 + + fig = go.Figure() + found_data = False + + for i, run in enumerate(runs): + metrics = run["data"].get("metrics", []) + target_metric = next( + ( + m + for m in metrics + if metric_key in m.get("name", "") and m.get("data") is not None + ), + None, + ) + + if not target_metric: + continue + + df = target_metric["data"].copy() + if df.empty or len(df.columns) < 2: + continue + + found_data = True + + # Apply smoothing + if smoothing > 1 and len(df) > smoothing: + df.iloc[:, 1] = ( + df.iloc[:, 1].rolling(window=smoothing, min_periods=1).mean() + ) + + # Building legend labels + config = run.get("config", {}) + framework = config.get("framework", "unknown") + model = config.get("model", "unknown") + device = run.get("device_used", "?") + + fig.add_trace( + go.Scatter( + x=df[df.columns[0]], + y=df[df.columns[1]], + mode="lines", + name=f"{framework}/{model} ({device}GPU)", + line=dict(color=colors[i % len(colors)], width=2), + ) + ) + + if not found_data: + st.info(f"无{title}数据") + return + + fig.update_layout( + title=f"{title} 对比", + xaxis_title="Iteration", + yaxis_title=f"{ylabel} {unit}", + template="plotly_white", + height=350, + legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01), + margin=dict(l=40, r=20, t=40, b=40), + ) + + if y_log: + fig.update_yaxes(type="log") + + st.plotly_chart(fig, use_container_width=True) + + +def render_memory_usage(run): + """Render memory usage gauge""" + memory_metric = next( + ( + m + for m in run["data"].get("metrics", []) + if m.get("name") == "train.peak_memory_usage" + ), + None, + ) + + if not memory_metric or not memory_metric.get("value"): + return + + st.markdown("#### 💾 显存使用") + value = memory_metric["value"] + unit = memory_metric.get("unit", "GB") + + # Try to get max memory from environment + max_value = value * 1.5 + try: + env = run["data"].get("environment", {}) + acc = env["cluster"][0]["machine"]["accelerators"][0] + max_value = float(acc.get("memory_gb_per_card", 80)) * int(acc.get("count", 1)) + except: + pass + + col1, col2, col3 = st.columns([1, 2, 1]) + with col2: + fig = create_gauge_chart(value, max_value, "峰值显存使用", "green", unit) + st.plotly_chart(fig, use_container_width=True) + + +def render_throughput_comparison(selected_runs): + """Render throughput comparison""" + st.markdown("### 吞吐量对比") + + throughput_data = [] + for run in selected_runs: + metrics = run["data"].get("metrics", []) + throughput_metric = next( + ( + m + for m in metrics + if "throughput" in m.get("name", "").lower() + and m.get("data") is not None + ), + None, + ) + + if not throughput_metric: + continue + + df = throughput_metric["data"] + if df.empty or len(df.columns) < 2: + continue + + avg_tput = df[df.columns[1]].mean() + peak_tput = df[df.columns[1]].max() + + config = run.get("config", {}) + throughput_data.append( + { + "运行": f"{config.get('framework', 'unknown')}/{config.get('model', 'unknown')} ({run.get('device_used', '?')}GPU)", + "平均吞吐量 (tokens/s/GPU)": round(avg_tput, 2), + "峰值吞吐量 (tokens/s/GPU)": round(peak_tput, 2), + } + ) + + if not throughput_data: + st.info("无可对比的吞吐量数据") + return + + # Display table + df = pd.DataFrame(throughput_data) + st.dataframe(df, width="stretch", hide_index=True) + + # Bar chart + fig = go.Figure() + for i, data in enumerate(throughput_data): + fig.add_trace( + go.Bar( + name=data["运行"], + x=["平均吞吐量", "峰值吞吐量"], + y=[data["平均吞吐量 (tokens/s/GPU)"], data["峰值吞吐量 (tokens/s/GPU)"]], + text=[data["平均吞吐量 (tokens/s/GPU)"], data["峰值吞吐量 (tokens/s/GPU)"]], + textposition="auto", + ) + ) + + fig.update_layout( + title="吞吐量对比", + barmode="group", + template="plotly_white", + height=400, + yaxis_title="tokens/s/GPU", + ) + st.plotly_chart(fig, use_container_width=True) + + +def render_data_tables(selected_runs): + """Render data tables""" + for run in selected_runs: + with st.expander(f"{run.get('run_id', 'Unknown')} - 原始数据"): + for metric in run["data"].get("metrics", []): + if metric.get("data") is not None: + df = metric["data"].copy() + st.markdown(f"**{metric.get('name', 'Unknown')}**") + + if len(df.columns) == 2: + df.columns = ["Iteration", metric.get("name", "Value")] + + st.dataframe(df, width="stretch", hide_index=True) + + +def render_config_details(selected_runs): + """Render configuration details""" + for run in selected_runs: + with st.expander(f"{run.get('run_id', 'Unknown')} - 配置与环境"): + summary_df = create_training_summary(run["data"]) + if not summary_df.empty: + st.markdown("**配置摘要**") + st.dataframe(summary_df, width="stretch", hide_index=True) + + col1, col2 = st.columns(2) + with col1: + st.markdown("**完整配置**") + st.json(run["data"].get("config", {})) + with col2: + st.markdown("**环境信息**") + st.json(run["data"].get("environment", {})) + + if run["data"].get("resolved"): + st.markdown("**解析信息**") + st.json(run["data"].get("resolved", {})) + + +def create_training_summary(test_result: dict) -> pd.DataFrame: + """Create configuration summary for training tests""" + rows = [] + + # Environment info + try: + env = test_result.get("environment", {}) + if "cluster" in env and len(env["cluster"]) > 0: + acc = env["cluster"][0]["machine"]["accelerators"][0] + rows.extend( + [ + {"指标": "加速卡", "数值": str(acc.get("model", "Unknown"))}, + {"指标": "卡数", "数值": str(acc.get("count", "Unknown"))}, + {"指标": "显存/卡", "数值": f"{acc.get('memory_gb_per_card','?')} GB"}, + ] + ) + except: + pass + + # Config info + cfg = test_result.get("config", {}) + train_args = cfg.get("train_args", {}) + + rows.extend( + [ + {"指标": "框架", "数值": str(cfg.get("framework", "unknown"))}, + {"指标": "模型", "数值": str(cfg.get("model", "unknown"))}, + ] + ) + + # Parallel config + parallel = train_args.get("parallel", {}) + rows.append( + { + "指标": "并行配置", + "数值": f"DP={parallel.get('dp', 1)}, TP={parallel.get('tp', 1)}, PP={parallel.get('pp', 1)}", + } + ) + + # Other configs + config_items = [ + ("MBS/GBS", f"{train_args.get('mbs', '?')}/{train_args.get('gbs', '?')}"), + ("序列长度", str(train_args.get("seq_len", "?"))), + ("隐藏层大小", str(train_args.get("hidden_size", "?"))), + ("层数", str(train_args.get("num_layers", "?"))), + ("精度", str(train_args.get("precision", "?"))), + ("预热迭代", str(cfg.get("warmup_iterations", "?"))), + ("训练迭代", str(train_args.get("train_iters", "?"))), + ] + + for label, value in config_items: + rows.append({"指标": label, "数值": value}) + + # Scalar metrics + for m in test_result.get("metrics", []): + if m.get("type") == "scalar": + value = m.get("value", "") + unit = m.get("unit", "") + rows.append( + { + "指标": str(m.get("name")), + "数值": f"{value} {unit}" if unit and value != "" else str(value), + } + ) + + df = pd.DataFrame(rows) + if not df.empty: + df["数值"] = df["数值"].astype(str) + return df + + +if __name__ == "__main__": + main() diff --git a/dashboard/utils/data_loader.py b/dashboard/utils/data_loader.py index f2b9c66..504687a 100644 --- a/dashboard/utils/data_loader.py +++ b/dashboard/utils/data_loader.py @@ -180,38 +180,31 @@ def _get_csv_base_dir(self, json_data: Dict[str, Any], json_path: Path) -> Path: def _resolve_csv_path(self, csv_url: str, base_dir: Path) -> Optional[Path]: """ Resolve CSV path from raw_data_url and base_dir. - - Handles cases like: - - base_dir/output/communication + "./comm/xxx.csv" but file is actually base_dir/"xxx.csv" - - base_dir/output/infer + "./infer/xxx.csv" but file is base_dir/"xxx.csv" """ try: if not csv_url: return None - # strip leading "./" - rel = csv_url[2:] if csv_url.startswith("./") else csv_url - rel_path = Path(rel) - - candidates = [] - - # 1) base_dir / rel - candidates.append(base_dir / rel_path) + # Clean the path + clean_path = csv_url[2:] if csv_url.startswith("./") else csv_url + rel_path = Path(clean_path) - # 2) base_dir / basename (most common fallback for your current layout) - candidates.append(base_dir / rel_path.name) + # Project root + project_root = Path(__file__).parent.parent.parent - # 3) base_dir.parent / rel (just in case) - candidates.append(base_dir.parent / rel_path) + # All possible paths to search for CSV files + # Adjust these based on your actual directory structure + candidates = [ + base_dir / rel_path, + base_dir / rel_path.name, + base_dir.parent / rel_path, + base_dir.parent / rel_path.name, + project_root / "test_output" / rel_path, + ] - # 4) base_dir.parent / basename - candidates.append(base_dir.parent / rel_path.name) + # Returns the first existing path + return next((p for p in candidates if p.exists()), None) - for p in candidates: - if p.exists(): - return p - - return None except Exception: return None From cc804e9dfcd5b3102bb1e39d15d25c641dd678c0 Mon Sep 17 00:00:00 2001 From: sunjqa1 Date: Mon, 16 Mar 2026 02:06:03 +0000 Subject: [PATCH 2/3] feat(dashboard): Optimizing the Streamlit dashboard architecture --- dashboard/app.py | 2 +- dashboard/pages/communication.py | 83 ++---- dashboard/pages/training.py | 462 ++---------------------------- dashboard/utils/data_loader.py | 3 - dashboard/utils/training_plots.py | 261 +++++++++++++++++ dashboard/utils/training_utils.py | 163 +++++++++++ dashboard/utils/visualizations.py | 24 +- 7 files changed, 487 insertions(+), 511 deletions(-) create mode 100644 dashboard/utils/training_plots.py create mode 100644 dashboard/utils/training_utils.py diff --git a/dashboard/app.py b/dashboard/app.py index e5e0006..122b0e0 100644 --- a/dashboard/app.py +++ b/dashboard/app.py @@ -24,7 +24,7 @@ ) # Initialize session state -if "ni" not in st.session_state: +if "data_loader" not in st.session_state: st.session_state.data_loader = InfiniMetricsDataLoader() if "selected_accelerators" not in st.session_state: st.session_state.selected_accelerators = [] diff --git a/dashboard/pages/communication.py b/dashboard/pages/communication.py index 5ba42fd..2f41fe4 100644 --- a/dashboard/pages/communication.py +++ b/dashboard/pages/communication.py @@ -179,64 +179,31 @@ def main(): fig = plot_comparison_matrix(selected_runs, "latency", y_log_scale) st.plotly_chart(fig, use_container_width=True) - if len(selected_runs) == 1: - st.markdown("#### 📌 核心指标(最新)") - run = selected_runs[0] - - max_bw = None - avg_lat = None - duration = None - - for metric in run.get("data", {}).get("metrics", []): - metric_name = metric.get("name", "") - - # bandwidth - if ( - metric_name == "comm.bandwidth" - and metric.get("data") is not None - ): - df = metric["data"] - if "bandwidth_gbs" in df.columns: - max_bw = df["bandwidth_gbs"].max() - - # latency - elif ( - metric_name == "comm.latency" and metric.get("data") is not None - ): - df = metric["data"] - if "latency_us" in df.columns: - avg_lat = df["latency_us"].mean() - - # duration - elif metric_name == "comm.duration": - duration = metric.get("value") - c1, c2, c3 = st.columns(3) - - with c1: - if max_bw is not None and max_bw > 0: - st.metric("峰值带宽", f"{max_bw:.2f} GB/s") - else: - st.metric("峰值带宽", "-") - - with c2: - if avg_lat is not None and avg_lat > 0: - st.metric("平均延迟", f"{avg_lat:.2f} μs") - else: - st.metric("平均延迟", "-") - - with c3: - if duration is not None and duration > 0: - st.metric("测试耗时", f"{duration:.2f} ms") - else: - st.metric("测试耗时", "-") - # Gauge charts for key metrics if len(selected_runs) == 1: st.markdown("#### 关键指标") run = selected_runs[0] + core = extract_core_metrics(run) + + # First Line: numerical indicators + cols = st.columns(3) + cols[0].metric( + "峰值带宽", + f"{core['bandwidth_gbps']:.2f} GB/s" + if core["bandwidth_gbps"] + else "-", + ) + cols[1].metric( + "平均延迟", + f"{core['latency_us']:.2f} μs" if core["latency_us"] else "-", + ) + cols[2].metric( + "测试耗时", + f"{core['duration_ms']:.2f} ms" if core["duration_ms"] else "-", + ) - col1, col2, col3 = st.columns(3) + cols = st.columns(3) - with col1: + with cols[0]: # Find max bandwidth max_bw = 0 for metric in run.get("data", {}).get("metrics", []): @@ -249,7 +216,7 @@ def main(): max_bw = df["bandwidth_gbs"].max() fig = create_gauge_chart( max_bw, - 300, + 300, # Theoretical max for A100 NVLink "峰值带宽", "blue", "GB/s", @@ -257,7 +224,7 @@ def main(): st.plotly_chart(fig, use_container_width=True) break - with col2: + with cols[1]: # Find average latency avg_lat = 0 for metric in run.get("data", {}).get("metrics", []): @@ -270,7 +237,7 @@ def main(): avg_lat = df["latency_us"].mean() fig = create_gauge_chart( avg_lat, - 1000, + 1000, # Reference: 1000 µs "平均延迟", "red", "µs", @@ -278,7 +245,7 @@ def main(): st.plotly_chart(fig, use_container_width=True) break - with col3: + with cols[2]: # Extract duration duration = 0 for metric in run.get("data", {}).get("metrics", []): @@ -289,7 +256,7 @@ def main(): if duration > 0: fig = create_gauge_chart( duration, - duration * 2, + duration * 2, # Scale to show progress "测试耗时", "green", "ms", diff --git a/dashboard/pages/training.py b/dashboard/pages/training.py index 7702db5..c3f713b 100644 --- a/dashboard/pages/training.py +++ b/dashboard/pages/training.py @@ -2,13 +2,22 @@ """Training tests analysis page.""" import streamlit as st -import pandas as pd -import plotly.graph_objects as go -import plotly.express as px from common import init_page from components.header import render_header -from utils.visualizations import create_gauge_chart +from utils.training_utils import ( + load_training_runs, + filter_runs, + create_run_options, + load_selected_runs, + create_training_summary, +) +from utils.training_plots import ( + render_performance_curves, + render_throughput_comparison, + render_data_tables, + render_config_details, +) init_page("训练测试分析 | InfiniMetrics", "🏋️") @@ -18,15 +27,13 @@ def main(): st.markdown("## 🏋️ 训练性能测试分析") dl = st.session_state.data_loader - - # Load all training-related tests runs = load_training_runs(dl) if not runs: st.info("未找到训练测试结果\n请将训练测试结果放在 test_output/train/ 或 test_output/training/ 目录下") return - # ---------- Sidebar Filters ---------- + # Sidebar Filters with st.sidebar: st.markdown("### 🔍 筛选条件") @@ -56,7 +63,7 @@ def main(): st.warning("没有符合条件的测试结果") return - # ---------- Run Selection ---------- + # Run Selection options = create_run_options(filtered) selected = st.multiselect( "选择要分析的测试运行(可多选对比)", @@ -75,449 +82,12 @@ def main(): with tab1: render_performance_curves(selected_runs, smoothing, y_log) - with tab2: render_throughput_comparison(selected_runs) - with tab3: render_data_tables(selected_runs) - with tab4: - render_config_details(selected_runs) - - -def load_training_runs(data_loader): - """Load all training-related test runs""" - runs = data_loader.list_test_runs("train") - - if not runs: - all_runs = data_loader.list_test_runs() - runs = [ - r - for r in all_runs - if any( - keyword in str(r.get("path", "")).lower() - or keyword in r.get("testcase", "").lower() - for keyword in [ - "/train/", - "/training/", - "train.", - "megatron", - "lora", - "sft", - ] - ) - ] - - return runs - - -def filter_runs(runs, selected_fw, selected_models, selected_dev, only_success): - """Apply filters to runs""" - return [ - r - for r in runs - if ( - not selected_fw - or r.get("config", {}).get("framework", "unknown") in selected_fw - ) - and ( - not selected_models - or r.get("config", {}).get("model", "unknown") in selected_models - ) - and (not selected_dev or r.get("device_used", 1) in selected_dev) - and (not only_success or r.get("success", False)) - ] - - -def create_run_options(runs): - """Create run selection options""" - options = {} - for i, r in enumerate(runs): - config = r.get("config", {}) - framework = config.get("framework", "unknown") - model = config.get("model", "unknown") - device = r.get("device_used", "?") - time_str = r.get("time", "")[5:16] if r.get("time") else "unknown" - run_id = r.get("run_id", "")[:8] - - label = f"{framework}/{model} | {device}GPU | {time_str} | {run_id}" - options[label] = i - - return options - - -def load_selected_runs(data_loader, filtered_runs, options, selected_labels): - """Load the selected test run""" - selected_runs = [] - for label in selected_labels: - idx = options[label] - run_info = filtered_runs[idx].copy() - run_info["data"] = data_loader.load_test_result(run_info["path"]) - selected_runs.append(run_info) - - return selected_runs - - -def render_performance_curves(selected_runs, smoothing, y_log): - """Render performance curves""" - st.markdown("### 训练指标曲线") - - metrics = [ - ("train.loss", "Loss", "损失值", ""), - ("train.ppl", "Perplexity", "困惑度", ""), - ("train.throughput", "Throughput", "吞吐量", "tokens/s/GPU"), - ] - - cols = st.columns(3) - - for idx, (metric_key, title, ylabel, unit) in enumerate(metrics): - with cols[idx]: - st.markdown(f"**{title}**") - - if len(selected_runs) == 1: - plot_single_metric( - selected_runs[0], metric_key, title, ylabel, unit, smoothing, y_log - ) - else: - plot_multi_metric_comparison( - selected_runs, metric_key, title, ylabel, unit, smoothing, y_log - ) - - # Memory usage (only for single run) - if len(selected_runs) == 1: - render_memory_usage(selected_runs[0]) - - -def plot_single_metric(run, metric_key, title, ylabel, unit, smoothing, y_log): - """Draw a curve for a single indicator""" - metrics = run["data"].get("metrics", []) - target_metric = next( - ( - m - for m in metrics - if metric_key in m.get("name", "") and m.get("data") is not None - ), - None, - ) - - if not target_metric: - st.info(f"无{title}数据") - return - - df = target_metric["data"].copy() - if df.empty or len(df.columns) < 2: - st.info("数据为空") - return - - # Apply smoothing - if smoothing > 1 and len(df) > smoothing: - df.iloc[:, 1] = df.iloc[:, 1].rolling(window=smoothing, min_periods=1).mean() - - # Create a chart - fig = go.Figure() - fig.add_trace( - go.Scatter( - x=df[df.columns[0]], - y=df[df.columns[1]], - mode="lines", - name=title, - line=dict(width=2), - ) - ) - - fig.update_layout( - title=f"{title} - {run.get('config', {}).get('framework', '')}", - xaxis_title="Iteration", - yaxis_title=f"{ylabel} {unit}", - template="plotly_white", - height=350, - margin=dict(l=40, r=20, t=40, b=40), - ) - - if y_log: - fig.update_yaxes(type="log") - - st.plotly_chart(fig, use_container_width=True) - - -def plot_multi_metric_comparison( - runs, metric_key, title, ylabel, unit, smoothing, y_log -): - """Comparison of multiple operation indexes""" - colors = px.colors.qualitative.Set2 - - fig = go.Figure() - found_data = False - - for i, run in enumerate(runs): - metrics = run["data"].get("metrics", []) - target_metric = next( - ( - m - for m in metrics - if metric_key in m.get("name", "") and m.get("data") is not None - ), - None, - ) - - if not target_metric: - continue - - df = target_metric["data"].copy() - if df.empty or len(df.columns) < 2: - continue - - found_data = True - - # Apply smoothing - if smoothing > 1 and len(df) > smoothing: - df.iloc[:, 1] = ( - df.iloc[:, 1].rolling(window=smoothing, min_periods=1).mean() - ) - - # Building legend labels - config = run.get("config", {}) - framework = config.get("framework", "unknown") - model = config.get("model", "unknown") - device = run.get("device_used", "?") - - fig.add_trace( - go.Scatter( - x=df[df.columns[0]], - y=df[df.columns[1]], - mode="lines", - name=f"{framework}/{model} ({device}GPU)", - line=dict(color=colors[i % len(colors)], width=2), - ) - ) - - if not found_data: - st.info(f"无{title}数据") - return - - fig.update_layout( - title=f"{title} 对比", - xaxis_title="Iteration", - yaxis_title=f"{ylabel} {unit}", - template="plotly_white", - height=350, - legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01), - margin=dict(l=40, r=20, t=40, b=40), - ) - - if y_log: - fig.update_yaxes(type="log") - - st.plotly_chart(fig, use_container_width=True) - - -def render_memory_usage(run): - """Render memory usage gauge""" - memory_metric = next( - ( - m - for m in run["data"].get("metrics", []) - if m.get("name") == "train.peak_memory_usage" - ), - None, - ) - - if not memory_metric or not memory_metric.get("value"): - return - - st.markdown("#### 💾 显存使用") - value = memory_metric["value"] - unit = memory_metric.get("unit", "GB") - - # Try to get max memory from environment - max_value = value * 1.5 - try: - env = run["data"].get("environment", {}) - acc = env["cluster"][0]["machine"]["accelerators"][0] - max_value = float(acc.get("memory_gb_per_card", 80)) * int(acc.get("count", 1)) - except: - pass - - col1, col2, col3 = st.columns([1, 2, 1]) - with col2: - fig = create_gauge_chart(value, max_value, "峰值显存使用", "green", unit) - st.plotly_chart(fig, use_container_width=True) - - -def render_throughput_comparison(selected_runs): - """Render throughput comparison""" - st.markdown("### 吞吐量对比") - - throughput_data = [] - for run in selected_runs: - metrics = run["data"].get("metrics", []) - throughput_metric = next( - ( - m - for m in metrics - if "throughput" in m.get("name", "").lower() - and m.get("data") is not None - ), - None, - ) - - if not throughput_metric: - continue - - df = throughput_metric["data"] - if df.empty or len(df.columns) < 2: - continue - - avg_tput = df[df.columns[1]].mean() - peak_tput = df[df.columns[1]].max() - - config = run.get("config", {}) - throughput_data.append( - { - "运行": f"{config.get('framework', 'unknown')}/{config.get('model', 'unknown')} ({run.get('device_used', '?')}GPU)", - "平均吞吐量 (tokens/s/GPU)": round(avg_tput, 2), - "峰值吞吐量 (tokens/s/GPU)": round(peak_tput, 2), - } - ) - - if not throughput_data: - st.info("无可对比的吞吐量数据") - return - - # Display table - df = pd.DataFrame(throughput_data) - st.dataframe(df, width="stretch", hide_index=True) - - # Bar chart - fig = go.Figure() - for i, data in enumerate(throughput_data): - fig.add_trace( - go.Bar( - name=data["运行"], - x=["平均吞吐量", "峰值吞吐量"], - y=[data["平均吞吐量 (tokens/s/GPU)"], data["峰值吞吐量 (tokens/s/GPU)"]], - text=[data["平均吞吐量 (tokens/s/GPU)"], data["峰值吞吐量 (tokens/s/GPU)"]], - textposition="auto", - ) - ) - - fig.update_layout( - title="吞吐量对比", - barmode="group", - template="plotly_white", - height=400, - yaxis_title="tokens/s/GPU", - ) - st.plotly_chart(fig, use_container_width=True) - - -def render_data_tables(selected_runs): - """Render data tables""" - for run in selected_runs: - with st.expander(f"{run.get('run_id', 'Unknown')} - 原始数据"): - for metric in run["data"].get("metrics", []): - if metric.get("data") is not None: - df = metric["data"].copy() - st.markdown(f"**{metric.get('name', 'Unknown')}**") - - if len(df.columns) == 2: - df.columns = ["Iteration", metric.get("name", "Value")] - - st.dataframe(df, width="stretch", hide_index=True) - - -def render_config_details(selected_runs): - """Render configuration details""" - for run in selected_runs: - with st.expander(f"{run.get('run_id', 'Unknown')} - 配置与环境"): - summary_df = create_training_summary(run["data"]) - if not summary_df.empty: - st.markdown("**配置摘要**") - st.dataframe(summary_df, width="stretch", hide_index=True) - - col1, col2 = st.columns(2) - with col1: - st.markdown("**完整配置**") - st.json(run["data"].get("config", {})) - with col2: - st.markdown("**环境信息**") - st.json(run["data"].get("environment", {})) - - if run["data"].get("resolved"): - st.markdown("**解析信息**") - st.json(run["data"].get("resolved", {})) - - -def create_training_summary(test_result: dict) -> pd.DataFrame: - """Create configuration summary for training tests""" - rows = [] - - # Environment info - try: - env = test_result.get("environment", {}) - if "cluster" in env and len(env["cluster"]) > 0: - acc = env["cluster"][0]["machine"]["accelerators"][0] - rows.extend( - [ - {"指标": "加速卡", "数值": str(acc.get("model", "Unknown"))}, - {"指标": "卡数", "数值": str(acc.get("count", "Unknown"))}, - {"指标": "显存/卡", "数值": f"{acc.get('memory_gb_per_card','?')} GB"}, - ] - ) - except: - pass - - # Config info - cfg = test_result.get("config", {}) - train_args = cfg.get("train_args", {}) - - rows.extend( - [ - {"指标": "框架", "数值": str(cfg.get("framework", "unknown"))}, - {"指标": "模型", "数值": str(cfg.get("model", "unknown"))}, - ] - ) - - # Parallel config - parallel = train_args.get("parallel", {}) - rows.append( - { - "指标": "并行配置", - "数值": f"DP={parallel.get('dp', 1)}, TP={parallel.get('tp', 1)}, PP={parallel.get('pp', 1)}", - } - ) - - # Other configs - config_items = [ - ("MBS/GBS", f"{train_args.get('mbs', '?')}/{train_args.get('gbs', '?')}"), - ("序列长度", str(train_args.get("seq_len", "?"))), - ("隐藏层大小", str(train_args.get("hidden_size", "?"))), - ("层数", str(train_args.get("num_layers", "?"))), - ("精度", str(train_args.get("precision", "?"))), - ("预热迭代", str(cfg.get("warmup_iterations", "?"))), - ("训练迭代", str(train_args.get("train_iters", "?"))), - ] - - for label, value in config_items: - rows.append({"指标": label, "数值": value}) - - # Scalar metrics - for m in test_result.get("metrics", []): - if m.get("type") == "scalar": - value = m.get("value", "") - unit = m.get("unit", "") - rows.append( - { - "指标": str(m.get("name")), - "数值": f"{value} {unit}" if unit and value != "" else str(value), - } - ) - - df = pd.DataFrame(rows) - if not df.empty: - df["数值"] = df["数值"].astype(str) - return df + render_config_details(selected_runs, create_training_summary) if __name__ == "__main__": diff --git a/dashboard/utils/data_loader.py b/dashboard/utils/data_loader.py index 504687a..507ca46 100644 --- a/dashboard/utils/data_loader.py +++ b/dashboard/utils/data_loader.py @@ -195,9 +195,6 @@ def _resolve_csv_path(self, csv_url: str, base_dir: Path) -> Optional[Path]: # All possible paths to search for CSV files # Adjust these based on your actual directory structure candidates = [ - base_dir / rel_path, - base_dir / rel_path.name, - base_dir.parent / rel_path, base_dir.parent / rel_path.name, project_root / "test_output" / rel_path, ] diff --git a/dashboard/utils/training_plots.py b/dashboard/utils/training_plots.py new file mode 100644 index 0000000..a49453b --- /dev/null +++ b/dashboard/utils/training_plots.py @@ -0,0 +1,261 @@ +"""Training plot functions.""" + +import streamlit as st +import pandas as pd +import plotly.graph_objects as go +import plotly.express as px + +from utils.training_utils import get_metric_dataframe, apply_smoothing +from utils.visualizations import create_gauge_chart + + +def render_performance_curves(selected_runs, smoothing, y_log): + """Render performance curves""" + st.markdown("### 训练指标曲线") + + metrics = [ + ("train.loss", "Loss", "损失值", ""), + ("train.ppl", "Perplexity", "困惑度", ""), + ("train.throughput", "Throughput", "吞吐量", "tokens/s/GPU"), + ] + + cols = st.columns(3) + + for idx, (metric_key, title, ylabel, unit) in enumerate(metrics): + with cols[idx]: + st.markdown(f"**{title}**") + + if len(selected_runs) == 1: + plot_single_metric( + selected_runs[0], metric_key, title, ylabel, unit, smoothing, y_log + ) + else: + plot_multi_metric_comparison( + selected_runs, metric_key, title, ylabel, unit, smoothing, y_log + ) + + # Memory usage (only for single run) + if len(selected_runs) == 1: + render_memory_usage(selected_runs[0]) + + +def plot_single_metric(run, metric_key, title, ylabel, unit, smoothing, y_log): + """Draw a curve for a single indicator""" + target_metric = get_metric_dataframe(run, metric_key) + + if not target_metric: + st.info(f"无{title}数据") + return + + df = target_metric["data"].copy() + if df.empty or len(df.columns) < 2: + st.info("数据为空") + return + + df = apply_smoothing(df, smoothing) + + fig = go.Figure() + fig.add_trace( + go.Scatter( + x=df[df.columns[0]], + y=df[df.columns[1]], + mode="lines", + name=title, + line=dict(width=2), + ) + ) + + fig.update_layout( + title=f"{title} - {run.get('config', {}).get('framework', '')}", + xaxis_title="Iteration", + yaxis_title=f"{ylabel} {unit}", + template="plotly_white", + height=350, + margin=dict(l=40, r=20, t=40, b=40), + ) + + if y_log: + fig.update_yaxes(type="log") + + st.plotly_chart(fig, use_container_width=True) + + +def plot_multi_metric_comparison( + runs, metric_key, title, ylabel, unit, smoothing, y_log +): + """Comparison of multiple operation indexes""" + colors = px.colors.qualitative.Set2 + fig = go.Figure() + found_data = False + + for i, run in enumerate(runs): + target_metric = get_metric_dataframe(run, metric_key) + if not target_metric: + continue + + df = target_metric["data"].copy() + if df.empty or len(df.columns) < 2: + continue + + found_data = True + df = apply_smoothing(df, smoothing) + + config = run.get("config", {}) + framework = config.get("framework", "unknown") + model = config.get("model", "unknown") + device = run.get("device_used", "?") + + fig.add_trace( + go.Scatter( + x=df[df.columns[0]], + y=df[df.columns[1]], + mode="lines", + name=f"{framework}/{model} ({device}GPU)", + line=dict(color=colors[i % len(colors)], width=2), + ) + ) + + if not found_data: + st.info(f"无{title}数据") + return + + fig.update_layout( + title=f"{title} 对比", + xaxis_title="Iteration", + yaxis_title=f"{ylabel} {unit}", + template="plotly_white", + height=350, + legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01), + margin=dict(l=40, r=20, t=40, b=40), + ) + + if y_log: + fig.update_yaxes(type="log") + + st.plotly_chart(fig, use_container_width=True) + + +def render_memory_usage(run): + """Render memory usage gauge""" + memory_metric = get_metric_dataframe(run, "train.peak_memory_usage") + + if not memory_metric or not memory_metric.get("value"): + return + + st.markdown("#### 💾 显存使用") + value = memory_metric["value"] + unit = memory_metric.get("unit", "GB") + + # Try to get max memory from environment + max_value = value * 1.5 + try: + env = run["data"].get("environment", {}) + if env and "cluster" in env and len(env["cluster"]) > 0: + acc = env["cluster"][0].get("machine", {}).get("accelerators", []) + if acc and len(acc) > 0: + memory_per_card = float(acc[0].get("memory_gb_per_card", 80)) + card_count = int(acc[0].get("count", 1)) + max_value = memory_per_card * card_count + else: + st.warning("未找到加速卡信息,使用默认显存上限") + else: + st.warning("未找到环境信息,使用默认显存上限") + except Exception as e: + st.warning(f"解析环境信息失败: {e},使用默认显存上限") + + col1, col2, col3 = st.columns([1, 2, 1]) + with col2: + fig = create_gauge_chart(value, max_value, "峰值显存使用", "green", unit) + st.plotly_chart(fig, use_container_width=True) + + +def render_throughput_comparison(selected_runs): + """Render throughput comparison""" + st.markdown("### 吞吐量对比") + + throughput_data = [] + for run in selected_runs: + target_metric = get_metric_dataframe(run, "throughput") + if not target_metric: + continue + + df = target_metric["data"] + if df.empty or len(df.columns) < 2: + continue + + avg_tput = df[df.columns[1]].mean() + peak_tput = df[df.columns[1]].max() + + config = run.get("config", {}) + throughput_data.append( + { + "运行": f"{config.get('framework', 'unknown')}/{config.get('model', 'unknown')} ({run.get('device_used', '?')}GPU)", + "平均吞吐量 (tokens/s/GPU)": round(avg_tput, 2), + "峰值吞吐量 (tokens/s/GPU)": round(peak_tput, 2), + } + ) + + if not throughput_data: + st.info("无可对比的吞吐量数据") + return + + # Display table + df = pd.DataFrame(throughput_data) + st.dataframe(df, width="stretch", hide_index=True) + + # Bar chart + fig = go.Figure() + for i, data in enumerate(throughput_data): + fig.add_trace( + go.Bar( + name=data["运行"], + x=["平均吞吐量", "峰值吞吐量"], + y=[data["平均吞吐量 (tokens/s/GPU)"], data["峰值吞吐量 (tokens/s/GPU)"]], + text=[data["平均吞吐量 (tokens/s/GPU)"], data["峰值吞吐量 (tokens/s/GPU)"]], + textposition="auto", + ) + ) + + fig.update_layout( + title="吞吐量对比", + barmode="group", + template="plotly_white", + height=400, + yaxis_title="tokens/s/GPU", + ) + st.plotly_chart(fig, use_container_width=True) + + +def render_data_tables(selected_runs): + """Render data tables""" + for run in selected_runs: + with st.expander(f"{run.get('run_id', 'Unknown')} - 原始数据"): + for metric in run["data"].get("metrics", []): + if metric.get("data") is not None: + df = metric["data"].copy() + st.markdown(f"**{metric.get('name', 'Unknown')}**") + if len(df.columns) == 2: + df.columns = ["Iteration", metric.get("name", "Value")] + st.dataframe(df, width="stretch", hide_index=True) + + +def render_config_details(selected_runs, summary_func): + """Render configuration details""" + for run in selected_runs: + with st.expander(f"{run.get('run_id', 'Unknown')} - 配置与环境"): + summary_df = summary_func(run["data"]) + if not summary_df.empty: + st.markdown("**配置摘要**") + st.dataframe(summary_df, width="stretch", hide_index=True) + + col1, col2 = st.columns(2) + with col1: + st.markdown("**完整配置**") + st.json(run["data"].get("config", {})) + with col2: + st.markdown("**环境信息**") + st.json(run["data"].get("environment", {})) + + if run["data"].get("resolved"): + st.markdown("**解析信息**") + st.json(run["data"].get("resolved", {})) diff --git a/dashboard/utils/training_utils.py b/dashboard/utils/training_utils.py new file mode 100644 index 0000000..975777f --- /dev/null +++ b/dashboard/utils/training_utils.py @@ -0,0 +1,163 @@ +"""Training page utilities.""" + +import streamlit as st +import pandas as pd +import plotly.graph_objects as go +import plotly.express as px + + +def load_training_runs(data_loader): + """Load all training-related test runs""" + runs = data_loader.list_test_runs("train") + + if not runs: + all_runs = data_loader.list_test_runs() + runs = [ + r + for r in all_runs + if any( + keyword in str(r.get("path", "")).lower() + or keyword in r.get("testcase", "").lower() + for keyword in [ + "/train/", + "/training/", + "train.", + "megatron", + "lora", + "sft", + ] + ) + ] + return runs + + +def filter_runs(runs, selected_fw, selected_models, selected_dev, only_success): + """Apply filters to runs""" + return [ + r + for r in runs + if ( + not selected_fw + or r.get("config", {}).get("framework", "unknown") in selected_fw + ) + and ( + not selected_models + or r.get("config", {}).get("model", "unknown") in selected_models + ) + and (not selected_dev or r.get("device_used", 1) in selected_dev) + and (not only_success or r.get("success", False)) + ] + + +def create_run_options(runs): + """Create run selection options""" + return { + f"{r.get('config', {}).get('framework', 'unknown')}/" + f"{r.get('config', {}).get('model', 'unknown')} | " + f"{r.get('device_used', '?')}GPU | " + f"{r.get('time', '')[:16]}": i + for i, r in enumerate(runs) + } + + +def load_selected_runs(data_loader, filtered_runs, options, selected_labels): + """Load the selected test run""" + selected_runs = [] + for label in selected_labels: + idx = options[label] + run_info = filtered_runs[idx].copy() + run_info["data"] = data_loader.load_test_result(run_info["path"]) + selected_runs.append(run_info) + return selected_runs + + +def get_metric_dataframe(run, metric_key): + """Get metric dataframe""" + metrics = run["data"].get("metrics", []) + return next( + ( + m + for m in metrics + if metric_key in m.get("name", "") and m.get("data") is not None + ), + None, + ) + + +def apply_smoothing(df, smoothing): + """Apply smoothing to dataframe""" + if smoothing > 1 and len(df) > smoothing: + df = df.copy() + df.iloc[:, 1] = df.iloc[:, 1].rolling(window=smoothing, min_periods=1).mean() + return df + + +def create_training_summary(test_result: dict) -> pd.DataFrame: + """Create configuration summary for training tests""" + rows = [] + + # Environment info + try: + env = test_result.get("environment", {}) + if "cluster" in env and len(env["cluster"]) > 0: + acc = env["cluster"][0]["machine"]["accelerators"][0] + rows.extend( + [ + {"指标": "加速卡", "数值": str(acc.get("model", "Unknown"))}, + {"指标": "卡数", "数值": str(acc.get("count", "Unknown"))}, + {"指标": "显存/卡", "数值": f"{acc.get('memory_gb_per_card','?')} GB"}, + ] + ) + except Exception as e: + st.warning(f"解析环境信息失败: {e}") + + # Config info + cfg = test_result.get("config", {}) + train_args = cfg.get("train_args", {}) + + rows.extend( + [ + {"指标": "框架", "数值": str(cfg.get("framework", "unknown"))}, + {"指标": "模型", "数值": str(cfg.get("model", "unknown"))}, + ] + ) + + # Parallel config + parallel = train_args.get("parallel", {}) + rows.append( + { + "指标": "并行配置", + "数值": f"DP={parallel.get('dp', 1)}, TP={parallel.get('tp', 1)}, PP={parallel.get('pp', 1)}", + } + ) + + # Other configs + config_items = [ + ("MBS/GBS", f"{train_args.get('mbs', '?')}/{train_args.get('gbs', '?')}"), + ("序列长度", str(train_args.get("seq_len", "?"))), + ("隐藏层大小", str(train_args.get("hidden_size", "?"))), + ("层数", str(train_args.get("num_layers", "?"))), + ("精度", str(train_args.get("precision", "?"))), + ("预热迭代", str(cfg.get("warmup_iterations", "?"))), + ("训练迭代", str(train_args.get("train_iters", "?"))), + ] + + for label, value in config_items: + rows.append({"指标": label, "数值": value}) + + # Scalar metrics + for m in test_result.get("metrics", []): + if m.get("type") == "scalar": + value = m.get("value", "") + unit = m.get("unit", "") + rows.append( + { + "指标": str(m.get("name")), + "数值": f"{value} {unit}" if unit and value != "" else str(value), + } + ) + + df = pd.DataFrame(rows) + if not df.empty: + df["数值"] = df["数值"].astype(str) + return df diff --git a/dashboard/utils/visualizations.py b/dashboard/utils/visualizations.py index 30c064a..9846809 100644 --- a/dashboard/utils/visualizations.py +++ b/dashboard/utils/visualizations.py @@ -275,18 +275,36 @@ def create_summary_table(test_result: Dict[str, Any]) -> pd.DataFrame: def create_gauge_chart( - value: float, max_value: float, title: str, color: str = "blue", unit: str = "" + value: float, + max_value: float, + title: str, + color: str = "blue", + unit: str = "", + decimals: Optional[int] = None, # optional ) -> go.Figure: """Create a gauge chart for single metric visualization.""" + + if decimals is None: + if value < 10: + decimals = 2 + elif value < 100: + decimals = 1 + else: + decimals = 0 + fig = go.Figure( go.Indicator( mode="gauge+number", value=value, domain={"x": [0, 1], "y": [0.05, 0.85]}, title={"text": title, "font": {"size": 18}}, - number={"suffix": f" {unit}", "font": {"size": 36}}, + number={ + "suffix": f" {unit}", + "font": {"size": 36}, + "valueformat": f".{decimals}f", + }, gauge={ - "axis": {"range": [0, max_value]}, + "axis": {"range": [0, max_value], "tickformat": f".{decimals}f"}, "bar": {"color": color}, "steps": [ {"range": [0, max_value * 0.6], "color": "lightgray"}, From 0cab9331ce4632176b6dc9252846993b7ddb4386 Mon Sep 17 00:00:00 2001 From: sunjqa1 Date: Tue, 17 Mar 2026 06:32:09 +0000 Subject: [PATCH 3/3] Unified Directory of test results --- dashboard/app.py | 2 +- dashboard/pages/training.py | 2 +- dashboard/utils/data_loader.py | 2 +- dashboard/utils/data_sources.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dashboard/app.py b/dashboard/app.py index c6bd8ab..5bbbfb2 100644 --- a/dashboard/app.py +++ b/dashboard/app.py @@ -65,7 +65,7 @@ def main(): st.markdown("---") results_dir = st.text_input( - "测试结果目录", value="../output", help="包含 JSON/CSV 测试结果的目录" + "测试结果目录", value="./output", help="包含 JSON/CSV 测试结果的目录" ) if not use_mongodb and results_dir != str( diff --git a/dashboard/pages/training.py b/dashboard/pages/training.py index c3f713b..1db0a07 100644 --- a/dashboard/pages/training.py +++ b/dashboard/pages/training.py @@ -30,7 +30,7 @@ def main(): runs = load_training_runs(dl) if not runs: - st.info("未找到训练测试结果\n请将训练测试结果放在 test_output/train/ 或 test_output/training/ 目录下") + st.info("未找到训练测试结果\n请将训练测试结果放在 output/train/ 或 output/training/ 目录下") return # Sidebar Filters diff --git a/dashboard/utils/data_loader.py b/dashboard/utils/data_loader.py index b605920..f843617 100644 --- a/dashboard/utils/data_loader.py +++ b/dashboard/utils/data_loader.py @@ -28,7 +28,7 @@ class InfiniMetricsDataLoader: def __init__( self, - results_dir: str = "../output", + results_dir: str = "./output", use_mongodb: bool = False, mongo_config=None, fallback_to_files: bool = True, diff --git a/dashboard/utils/data_sources.py b/dashboard/utils/data_sources.py index f3abf21..7ce2269 100644 --- a/dashboard/utils/data_sources.py +++ b/dashboard/utils/data_sources.py @@ -51,7 +51,7 @@ def source_type(self) -> str: class FileDataSource(DataSource): """File-based data source (reads from JSON/CSV files).""" - def __init__(self, results_dir: str = "../output"): + def __init__(self, results_dir: str = "./output"): self.results_dir = Path(results_dir) @property