Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/pulsing-actor/src/cluster/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,13 @@ impl GossipCluster {

pub async fn unregister_named_actor(&self, path: &ActorPath) {
let key = path.as_str();
let actor_id = {
let named = self.state.named_actors.read().await;
named
.get(&key)
.and_then(|info| info.instances.get(&self.state.local_node))
.map(|inst| inst.actor_id)
};
{
let mut named = self.state.named_actors.write().await;
if let Some(info) = named.get_mut(&key) {
Expand All @@ -680,6 +687,9 @@ impl GossipCluster {
}
}
}
if let Some(aid) = actor_id {
self.unregister_actor(&aid).await;
}
let msg = GossipMessage::NamedActorUnregistered {
path: path.clone(),
node_id: self.state.local_node,
Expand Down
2 changes: 2 additions & 0 deletions crates/pulsing-actor/src/system/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ impl ActorSystem {
.register_named_actor_full(path.clone(), actor_id, metadata)
.await;
}
// So refer(actor_id) / lookup_actor can resolve this actor on any node
cluster.register_actor(actor_id).await;
}
}
} else {
Expand Down
157 changes: 52 additions & 105 deletions crates/pulsing-py/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1878,19 +1878,18 @@ impl PyActorSystem {

pyo3_async_runtimes::tokio::future_into_py(py, async move {
let members = system.members().await;
// Return all fields as strings for safe JSON serialization
let result: Vec<std::collections::HashMap<String, String>> = members
.into_iter()
.map(|m| {
let mut map = std::collections::HashMap::new();
// Use string representation to avoid JSON integer overflow
map.insert("node_id".to_string(), m.node_id.0.to_string());
map.insert("addr".to_string(), m.addr.to_string());
map.insert("status".to_string(), format!("{:?}", m.status));
map
})
.collect();
Ok(result)
Python::with_gil(|py| -> PyResult<PyObject> {
use pyo3::types::PyDict;
let list = pyo3::types::PyList::empty(py);
for m in members {
let dict = PyDict::new(py);
dict.set_item("node_id", m.node_id.0)?;
dict.set_item("addr", m.addr.to_string())?;
dict.set_item("status", format!("{:?}", m.status))?;
list.append(dict)?;
}
Ok(list.into_pyobject(py)?.into())
})
})
}

Expand Down Expand Up @@ -1920,45 +1919,24 @@ impl PyActorSystem {
ActorPath::new(&name).map_err(to_py_value_err)?
};
let instances = system.get_named_instances_detailed(&path).await;
let result: Vec<std::collections::HashMap<String, serde_json::Value>> = instances
.into_iter()
.map(|(member, instance_opt)| {
let mut map = std::collections::HashMap::new();
// Use decimal string for node_id to match members() format
map.insert(
"node_id".to_string(),
serde_json::Value::String(member.node_id.0.to_string()),
);
map.insert(
"addr".to_string(),
serde_json::Value::String(member.addr.to_string()),
);
map.insert(
"status".to_string(),
serde_json::Value::String(format!("{:?}", member.status)),
);

// Add detailed instance info if available

Python::with_gil(|py| -> PyResult<PyObject> {
use pyo3::types::PyDict;
let list = pyo3::types::PyList::empty(py);
for (member, instance_opt) in instances {
let dict = PyDict::new(py);
dict.set_item("node_id", member.node_id.0)?;
dict.set_item("addr", member.addr.to_string())?;
dict.set_item("status", format!("{:?}", member.status))?;
if let Some(inst) = instance_opt {
// Use decimal string for actor_id to match other APIs
map.insert(
"actor_id".to_string(),
serde_json::Value::String(inst.actor_id.0.to_string()),
);
// Add metadata fields
dict.set_item("actor_id", inst.actor_id.0)?;
for (k, v) in inst.metadata {
map.insert(k, serde_json::Value::String(v));
dict.set_item(k, v)?;
}
}

map
})
.collect();

Python::with_gil(|py| -> PyResult<PyObject> {
use pythonize::pythonize;
let pyobj = pythonize(py, &result)?;
Ok(pyobj.into())
list.append(dict)?;
}
Ok(list.into_pyobject(py)?.into())
})
})
}
Expand All @@ -1971,64 +1949,33 @@ impl PyActorSystem {
let all_named = system.all_named_actors().await;

Python::with_gil(|py| -> PyResult<PyObject> {
use pythonize::pythonize;
let result: Vec<std::collections::HashMap<String, serde_json::Value>> = all_named
.into_iter()
.map(|info| {
let mut map = std::collections::HashMap::new();
map.insert(
"path".to_string(),
serde_json::Value::String(info.path.as_str().to_string()),
);
map.insert(
"instance_count".to_string(),
serde_json::Value::Number(serde_json::Number::from(
info.instance_count(),
)),
);
// Convert instance_nodes (HashSet<NodeId>) to list of node IDs as decimal strings
let instances: Vec<serde_json::Value> = info
.instance_nodes
.iter()
.map(|id| serde_json::Value::String(id.0.to_string()))
.collect();
map.insert("instances".to_string(), serde_json::Value::Array(instances));

// Add detailed instance info if available
let detailed: Vec<serde_json::Value> = info
.instances
.iter()
.map(|(node_id, inst)| {
let mut inst_map = serde_json::Map::new();
// Use decimal string to match members() format
inst_map.insert(
"node_id".to_string(),
serde_json::Value::String(node_id.0.to_string()),
);
inst_map.insert(
"actor_id".to_string(),
serde_json::Value::String(inst.actor_id.0.to_string()),
);
// Add metadata
for (k, v) in &inst.metadata {
inst_map
.insert(k.clone(), serde_json::Value::String(v.clone()));
}
serde_json::Value::Object(inst_map)
})
.collect();
if !detailed.is_empty() {
map.insert(
"detailed_instances".to_string(),
serde_json::Value::Array(detailed),
);
use pyo3::types::PyDict;
let list = pyo3::types::PyList::empty(py);
for info in all_named {
let dict = PyDict::new(py);
dict.set_item("path", info.path.as_str())?;
dict.set_item("instance_count", info.instance_count())?;
let instances_list = pyo3::types::PyList::empty(py);
for id in &info.instance_nodes {
instances_list.append(id.0)?;
}
dict.set_item("instances", instances_list)?;
if !info.instances.is_empty() {
let detailed_list = pyo3::types::PyList::empty(py);
for (node_id, inst) in &info.instances {
let inst_dict = PyDict::new(py);
inst_dict.set_item("node_id", node_id.0)?;
inst_dict.set_item("actor_id", inst.actor_id.0)?;
for (k, v) in &inst.metadata {
inst_dict.set_item(k, v)?;
}
detailed_list.append(inst_dict)?;
}

map
})
.collect();
let pyobj = pythonize(py, &result)?;
Ok(pyobj.into())
dict.set_item("detailed_instances", detailed_list)?;
}
list.append(dict)?;
}
Ok(list.into_pyobject(py)?.into())
})
})
}
Expand Down
4 changes: 3 additions & 1 deletion docs/src/design/cluster-networking.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ In Rust, `ActorSystem::new(config)` builds a `NamingBackend`: if `config.head_ad

---

## Init in Ray: how it works
## Init in Ray / Bootstrap: how it works

The recommended Python API for Ray or torchrun is **`pulsing.bootstrap(ray=..., torchrun=..., on_ready=..., wait_timeout=...)`**; it runs `init_in_ray` and/or `init_in_torchrun` in the background.

- Pulsing runs inside a **Ray** cluster. Each process that uses Pulsing calls `init_in_ray()` (or `async_init_in_ray()`).
- **Seed discovery** uses Ray’s **internal KV store**:
Expand Down
4 changes: 3 additions & 1 deletion docs/src/design/cluster-networking.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ Gossip 的节奏与行为由 `GossipConfig` 控制:`gossip_interval`、`fanout

---

## Init in Ray:实现原理
## Init in Ray / Bootstrap:实现原理

推荐在 Ray 或 torchrun 下使用统一入口 **`pulsing.bootstrap(ray=..., torchrun=..., on_ready=..., wait_timeout=...)`**;其后台会执行 `init_in_ray` 和/或 `init_in_torchrun`。

- Pulsing 运行在 **Ray** 集群内。每个使用 Pulsing 的进程调用 `init_in_ray()`(或 `async_init_in_ray()`)。
- **Seed 发现**使用 Ray 的 **internal KV**:
Expand Down
32 changes: 17 additions & 15 deletions docs/src/examples/llm_inference.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ The router needs an **actor system address** so workers can join the same cluste
```bash
pulsing actor pulsing.serving.Router \
--addr 0.0.0.0:8000 \
--http_host 0.0.0.0 \
--name my-llm \
-- \
--http_port 8080 \
--model_name my-llm \
--model_name gpt2 \
--worker_name worker
```

Expand All @@ -34,22 +35,23 @@ You can run **one or more** workers. Each worker should join the router node via
### Option A: Transformers worker (Terminal B)

```bash
pulsing actor pulsing.serving.worker.TransformersWorker \
--model_name gpt2 \
--device cpu \
pulsing actor pulsing.serving.TransformersWorker \
--addr 0.0.0.0:8001 \
--seeds 127.0.0.1:8000 \
--name worker
--name worker \
-- \
--model_name gpt2
```

### Option B: vLLM worker (Terminal C)

```bash
pulsing actor pulsing.serving.vllm.VllmWorker \
--model Qwen/Qwen2.5-0.5B \
--addr 0.0.0.0:8002 \
--seeds 127.0.0.1:8000 \
--name worker
--name worker \
-- \
--model Qwen/Qwen2.5-0.5B
```

## 3) Verify cluster + workers
Expand All @@ -71,25 +73,25 @@ pulsing inspect cluster --seeds 127.0.0.1:8000
### Non-streaming

```bash
curl -s http://localhost:8080/v1/chat/completions \
curl -X POST http://localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"model":"my-llm","messages":[{"role":"user","content":"Hello"}],"stream":false}'
-d '{"model": "gpt2", "messages": [{"role": "user", "content": "Hello"}], "stream": false}'
```

### Streaming (SSE)

```bash
curl -N http://localhost:8080/v1/chat/completions \
curl -X POST http://localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"model":"my-llm","messages":[{"role":"user","content":"Tell me a joke"}],"stream":true}'
-d '{"model": "gpt2", "messages": [{"role": "user", "content": "Tell me a joke"}], "stream": true}'
```

## Troubleshooting

- If you see `No available workers`, ensure:
- router is started with `--addr`
- workers join via `--seeds <router_addr>`
- the worker actor name is `worker` (default)
- router is started with `--addr` and workers join via `--seeds <router_addr>`
- the worker actor **name** matches: workers started with `--name worker` (before `--`), or start the router with `--worker_name <name>` (after `--`) to match your worker name
- check: `pulsing inspect actors --seeds 127.0.0.1:8000` — you should see an actor with the name the router is looking for (default `worker`)

See also:

Expand Down
32 changes: 17 additions & 15 deletions docs/src/examples/llm_inference.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ Router 需要指定 **actor system 地址**,以便其它进程启动的 worker
```bash
pulsing actor pulsing.serving.Router \
--addr 0.0.0.0:8000 \
--http_host 0.0.0.0 \
--name my-llm \
-- \
--http_port 8080 \
--model_name my-llm \
--model_name gpt2 \
--worker_name worker
```

Expand All @@ -34,22 +35,23 @@ pulsing actor pulsing.serving.Router \
### 方案 A:Transformers Worker(终端 B)

```bash
pulsing actor pulsing.serving.worker.TransformersWorker \
--model_name gpt2 \
--device cpu \
pulsing actor pulsing.serving.TransformersWorker \
--addr 0.0.0.0:8001 \
--seeds 127.0.0.1:8000 \
--name worker
--name worker \
-- \
--model_name gpt2
```

### 方案 B:vLLM Worker(终端 C)

```bash
pulsing actor pulsing.serving.vllm.VllmWorker \
--model Qwen/Qwen2.5-0.5B \
--addr 0.0.0.0:8002 \
--seeds 127.0.0.1:8000 \
--name worker
--name worker \
-- \
--model Qwen/Qwen2.5-0.5B
```

## 3)验证集群与 worker
Expand All @@ -71,25 +73,25 @@ pulsing inspect cluster --seeds 127.0.0.1:8000
### 非流式

```bash
curl -s http://localhost:8080/v1/chat/completions \
curl -X POST http://localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"model":"my-llm","messages":[{"role":"user","content":"Hello"}],"stream":false}'
-d '{"model": "gpt2", "messages": [{"role": "user", "content": "Hello"}], "stream": false}'
```

### 流式(SSE)

```bash
curl -N http://localhost:8080/v1/chat/completions \
curl -X POST http://localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"model":"my-llm","messages":[{"role":"user","content":"Tell me a joke"}],"stream":true}'
-d '{"model": "gpt2", "messages": [{"role": "user", "content": "讲个笑话"}], "stream": true}'
```

## 排障

- 如果出现 `No available workers`,请检查:
- router 是否带了 `--addr`
- worker 是否通过 `--seeds <router_addr>` 加入
- worker actor 名称是否为 `worker`(默认)
- Router 已用 `--addr` 启动,Worker 已用 `--seeds <router_addr>` 加入
- **名字一致**:Worker 用 `--name worker`(`--` 前)启动,或 Router 用 `--worker_name <名字>`(`--` 后)与 Worker 一致
- 执行 `pulsing inspect actors --seeds 127.0.0.1:8000`,确认能看到 Router 在找的名字(默认 `worker`

更多:

Expand Down
Loading
Loading