Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/setup-build-env/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ runs:
run: |
test -e ~/.cargo/bin/cargo-zigbuild || cargo install cargo-zigbuild
test -e ~/.cargo/bin/rnr || cargo install rnr
test -e ~/.cargo/bin/cargo-nextest || cargo install cargo-nextest
test -e ~/.cargo/bin/cargo-nextest || cargo install --locked cargo-nextest
test -e ~/.cargo/bin/cargo-binstall || cargo install cargo-binstall
test -e ~/.cargo/bin/dx || cargo binstall dioxus-cli@0.7.0 -y
test -e ~/.cargo/bin/trunk || cargo install trunk --locked
Expand Down
4 changes: 2 additions & 2 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Documentation dependencies for Probing
# Auto-generated from pyproject.toml

mkdocs>=1.5.0
mkdocs-material>=9.4.0
mkdocs==1.6.1
mkdocs-material>=9.7.2
mkdocs-material-extensions>=1.3.0
mkdocs-static-i18n>=1.0.0
mkdocstrings>=0.24.0
Expand Down
1 change: 1 addition & 0 deletions docs/src/design/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ Probing's core mission is simple: **make distributed systems feel Pythonic again
| [Profiling](profiling.md) | Performance data collection |
| [Debugging](debugging.md) | Debugging capabilities |
| [Distributed](distributed.md) | Multi-node support |
| [Cluster with Pulsing](cluster-pulsing.md) | Using Pulsing for membership and failure detection |
| [Extensibility](extensibility.md) | Custom tables and metrics |
1 change: 1 addition & 0 deletions docs/src/design/index.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ Probing 的核心使命很简单:**让分布式系统重新变得 Pythonic**
| [性能分析](profiling.md) | 性能数据收集 |
| [调试](debugging.md) | 调试能力 |
| [分布式](distributed.md) | 多节点支持 |
| [基于 Pulsing 的集群](cluster-pulsing.md) | 使用 Pulsing 做成员发现与故障检测 |
| [扩展机制](extensibility.md) | 自定义表和指标 |
15 changes: 13 additions & 2 deletions probing/extensions/python/src/extensions/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl PythonExt {
})
}

/// Handle eval request
/// Handle eval request. Catches panics from REPL so a single bad request cannot crash the server.
fn handle_eval(&self, body: &[u8]) -> Result<Vec<u8>, EngineError> {
let code = String::from_utf8(body.to_vec()).map_err(|e| {
log::error!("Failed to convert body to UTF-8 string: {e}");
Expand All @@ -175,7 +175,18 @@ impl PythonExt {
log::debug!("Python eval code: {code}");

let mut repl = PythonRepl::default();
Ok(repl.process(code.as_str()).unwrap_or_default().into_bytes())
let out =
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| repl.process(code.as_str())));
match out {
Ok(Some(s)) => Ok(s.into_bytes()),
Ok(None) => Ok(Vec::new()),
Err(_) => {
log::error!("Python REPL process panicked; returning error response");
Ok(serde_json::json!({"error": "REPL execution panicked"})
.to_string()
.into_bytes())
}
}
}

/// Set up a Python crash handler
Expand Down
14 changes: 11 additions & 3 deletions probing/extensions/python/src/features/pprof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ impl PprofHolder {
let _ = self.0.lock().map(|mut holder| {
match ProfilerGuardBuilder::default().frequency(freq).build() {
Ok(ph) => holder.replace(ph),
Err(_) => todo!(),
Err(e) => {
log::error!("pprof ProfilerGuard build failed (freq={freq}): {e}; profiling unavailable");
None
}
};
});
}
Expand All @@ -38,12 +41,17 @@ impl PprofHolder {
// }

pub fn flamegraph(&self) -> Result<String> {
let holder = self.0.lock().unwrap();
let holder = self
.0
.lock()
.map_err(|e| anyhow::anyhow!("pprof lock poisoned: {e}"))?;

if let Some(pp) = holder.as_ref() {
let report = pp.report().build()?;
let mut graph: Vec<u8> = vec![];
report.flamegraph(&mut graph).unwrap();
report
.flamegraph(&mut graph)
.map_err(|e| anyhow::anyhow!("pprof flamegraph write failed: {e}"))?;
let graph = String::from_utf8(graph)?;
Ok(graph)
} else {
Expand Down
77 changes: 41 additions & 36 deletions probing/extensions/python/src/features/torch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,42 @@ struct Frame {
module: String,
}

/// SQL used to summarise torch profiling data.
///
/// Reads `python.torch_trace`, drops rows whose `module` is the literal string
/// `'None'`, and returns one row per `(module, stage)` pair with the median of
/// `duration` (cast to `DOUBLE` before aggregation), ordered by stage and then
/// module.
const TORCH_QUERY: &str = r#"
select module, stage, median(CAST(duration AS DOUBLE))
from python.torch_trace
where module <> 'None'
group by module, stage
order by (stage, module);
"#;

/// Run [`TORCH_QUERY`] against the global `probing_core::ENGINE`.
///
/// The global engine is the one the server uses, so with
/// `PROBING_TORCH_PROFILING=on` the flamegraph is built from the same data the
/// UI shows.
///
/// A fresh current-thread tokio runtime is built for the call and the query is
/// driven to completion on it with `block_on`.
///
/// # Errors
///
/// Returns an error if the runtime cannot be created or if the engine reports
/// a query failure. A successful query that yields no frame is mapped to the
/// frame type's `Default` value rather than an error.
fn query_profiling_impl() -> Result<probing_proto::types::DataFrame> {
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(e) => return Err(anyhow::anyhow!("Failed to create runtime: {e}")),
    };

    let run_query = async {
        // Read guard on the shared engine; held until the query has finished.
        let engine = probing_core::ENGINE.read().await;
        match engine.async_query(TORCH_QUERY).await {
            Ok(frame) => Ok(frame.unwrap_or_default()),
            Err(e) => Err(anyhow::anyhow!("Torch query failed: {e}")),
        }
    };

    runtime.block_on(run_query)
}

pub fn query_profiling() -> Result<Vec<String>> {
let data = thread::spawn(|| -> Result<probing_proto::types::DataFrame> {
// Use global ENGINE first (server's engine with python.torch_trace data)
match query_profiling_impl() {
Ok(df) => return Ok(df),
Err(e) => {
log::debug!("Global engine torch query failed ({e}), trying minimal engine");
}
}
// Fallback: build a minimal engine (e.g. when not running inside server)
let engine = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
Expand All @@ -25,42 +59,13 @@ pub fn query_profiling() -> Result<Vec<String>> {
.build()
.await
})?;

let query = r#"
select module, stage, median(duration)
from python.torch_trace
where module <> 'None'
group by module, stage
order by (stage, module);
"#;

// Check if we're already inside a tokio runtime to avoid nested runtime panic
match tokio::runtime::Handle::try_current() {
Ok(_handle) => {
// Inside a runtime, spawn a new thread
std::thread::spawn(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { engine.async_query(query).await })
})
.join()
.map_err(|_| anyhow::anyhow!("Thread panicked"))?
.map_err(|e| anyhow::anyhow!(e))?
.ok_or_else(|| anyhow::anyhow!("Query returned no data"))
}
Err(_) => {
// Not in a runtime, create a new one
tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.unwrap()
.block_on(async { engine.async_query(query).await })?
.ok_or_else(|| anyhow::anyhow!("Query returned no data"))
}
}
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
Ok(rt
.block_on(async { engine.async_query(TORCH_QUERY).await })?
.unwrap_or_default())
})
.join()
.map_err(|_| anyhow::anyhow!("error joining thread"))??;
Expand Down
38 changes: 21 additions & 17 deletions probing/extensions/python/src/repl/console.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,42 @@
use pyo3::ffi::c_str;
use pyo3::{
types::{PyAnyMethods, PyDict},
Bound, Py, PyAny, Python,
Py, PyAny, Python,
};

use crate::repl::python_repl::PythonConsole;

pub struct NativePythonConsole {
console: Py<PyAny>,
/// None if import or debug_console lookup failed (avoids panic in Default).
console: Option<Py<PyAny>>,
}

impl Default for NativePythonConsole {
#[inline(never)]
fn default() -> Self {
Self {
console: Python::with_gil(|py| {
let global = PyDict::new(py);
let code = c_str!("from probing.repl import debug_console");
let _ = py.run(code, Some(&global), Some(&global));
let ret: Bound<'_, PyAny> = global
.get_item("debug_console")
.map_err(|err| {
eprintln!("error initializing console: {err}");
})
.unwrap();
ret.unbind()
}),
}
let console = Python::with_gil(|py| {
let global = PyDict::new(py);
let code = c_str!("from probing.repl import debug_console");
if py.run(code, Some(&global), Some(&global)).is_err() {
log::warn!("probing.repl import failed; REPL will be unavailable");
return None;
}
match global.get_item("debug_console") {
Ok(ret) => Some(ret.unbind()),
Err(e) => {
log::warn!("error initializing console (debug_console not found or failed): {e}; REPL will be unavailable");
None
}
}
});
Self { console }
}
}

impl PythonConsole for NativePythonConsole {
fn try_execute(&mut self, cmd: String) -> Option<String> {
Python::with_gil(|py| match self.console.call_method1(py, "push", (cmd,)) {
let console = self.console.as_ref()?;
Python::with_gil(|py| match console.call_method1(py, "push", (cmd,)) {
Ok(obj) => {
if obj.is_none(py) {
None
Expand Down
4 changes: 2 additions & 2 deletions python/probing/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
>>> import probing
>>> # Execute a simple SQL query
>>> df = probing.query("SHOW TABLES")
>>> type(df)
<class 'pandas.core.frame.DataFrame'>
>>> type(df) # doctest: +ELLIPSIS
<class '...DataFrame'>

>>> # Load a custom extension
>>> mod = probing.load_extension("probing.ext.example")
Expand Down
8 changes: 8 additions & 0 deletions python/probing/ext/torch.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,15 @@ def collective_hook():
trace_all_collectives(verbose=is_true(trace_verbose))


_hook_registered = False


def init():
global _hook_registered
if _hook_registered:
return
_hook_registered = True

from torch.optim.optimizer import register_optimizer_step_post_hook

register_optimizer_step_post_hook(optimizer_step_post_hook)
Expand Down
36 changes: 35 additions & 1 deletion python/probing/handlers/pythonext.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ def get_chrome_tracing(limit: int = 1000) -> str:
# Query trace events from the database
# IMPORTANT: Order by timestamp ASC to process events in chronological order
# This ensures span_start events are processed before their corresponding span_end events
if limit is None:
limit = 1000
limit_clause = f" LIMIT {limit}" if limit > 0 else ""
query = f"""
SELECT
Expand Down Expand Up @@ -491,7 +493,10 @@ def start_trace(
watch_list = []
silent_watch_list = watch or []

trace(function, watch=watch_list, silent_watch=silent_watch_list, depth=depth)
depth_val = 1 if depth is None else depth
trace(
function, watch=watch_list, silent_watch=silent_watch_list, depth=depth_val
)
return json.dumps({"success": True, "message": f"Started tracing {function}"})
except Exception as e:
return json.dumps({"success": False, "error": str(e)})
Expand All @@ -516,6 +521,33 @@ def stop_trace(function: str) -> str:
return json.dumps({"success": False, "error": str(e)})


@ext_handler(
    "pythonext",
    ["magics", "pythonext/magics", "python/magics"],
)
def get_magics_list() -> str:
    """Return the available magic commands as JSON for the UI quick actions.

    Returns:
        A JSON string. On success it is the serialized result of
        ``get_magics_for_ui`` for the debug console's kernel shell, shaped like
        ``[{"group": "Trace", "items": [{"label": "...", "command": "..."}, ...]}, ...]``.
        When the debug console or its ``code_executor`` is missing or falsy,
        the literal ``"[]"`` is returned. If anything raises, the result is a
        JSON object with ``error`` and ``traceback`` keys instead.
    """
    try:
        from probing.repl import debug_console
        from probing.repl.help_magic import get_magics_for_ui

        # Guard clause: without a console and a live code executor there is
        # no shell to inspect, so report an empty list.
        if (
            not debug_console
            or not getattr(debug_console, "code_executor", None)
            or not debug_console.code_executor
        ):
            return "[]"

        kernel_shell = debug_console.code_executor.km.kernel.shell
        magics = get_magics_for_ui(kernel_shell)
        return json.dumps(magics)
    except Exception as exc:
        return json.dumps({"error": str(exc), "traceback": traceback.format_exc()})


@ext_handler("pythonext", "trace/variables")
def get_trace_variables(function: Optional[str] = None, limit: int = 100) -> str:
"""Get trace variables from database.
Expand All @@ -530,6 +562,8 @@ def get_trace_variables(function: Optional[str] = None, limit: int = 100) -> str
try:
import probing

if limit is None:
limit = 100
# Try with python namespace first, fallback to direct table name
if function:
queries = [
Expand Down
42 changes: 27 additions & 15 deletions python/probing/inspect/torch.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,43 @@

_last_full_refresh_time = 0
FULL_REFRESH_INTERVAL_SECONDS = 5 * 60
# Limit number of objects to scan to avoid DoS in large processes
MAX_OBJECTS_TO_SCAN = 200_000


def update_cache(x):
"""Add a single object to the appropriate cache if it is a Tensor/Module/Optimizer."""
import torch

idx = id(x)
if isinstance(x, torch.Tensor):
if idx not in tensor_cache:
tensor_cache[idx] = weakref.ref(x)
return tensor_cache[idx]
if isinstance(x, torch.nn.Module):
if idx not in module_cache:
module_cache[idx] = weakref.ref(x)
return module_cache[idx]
if isinstance(x, torch.optim.Optimizer):
if idx not in optim_cache:
optim_cache[idx] = weakref.ref(x)
return optim_cache[idx]
try:
idx = id(x)
if isinstance(x, torch.Tensor):
if idx not in tensor_cache:
tensor_cache[idx] = weakref.ref(x)
return tensor_cache[idx]
if isinstance(x, torch.nn.Module):
if idx not in module_cache:
module_cache[idx] = weakref.ref(x)
return module_cache[idx]
if isinstance(x, torch.optim.Optimizer):
if idx not in optim_cache:
optim_cache[idx] = weakref.ref(x)
return optim_cache[idx]
except (ReferenceError, TypeError, AttributeError, RuntimeError):
pass
return None


def refresh_cache():
import gc

for obj in gc.get_objects():
update_cache(obj)
objects = gc.get_objects()
n = min(len(objects), MAX_OBJECTS_TO_SCAN) if MAX_OBJECTS_TO_SCAN else len(objects)
for i in range(n):
try:
update_cache(objects[i])
except (ReferenceError, TypeError, AttributeError, RuntimeError):
continue
global _last_full_refresh_time
_last_full_refresh_time = time.time()

Expand Down
Loading
Loading