diff --git a/crates/pulsing-actor/src/cluster/gossip.rs b/crates/pulsing-actor/src/cluster/gossip.rs index 4a4fc5963..a6fa72ae5 100644 --- a/crates/pulsing-actor/src/cluster/gossip.rs +++ b/crates/pulsing-actor/src/cluster/gossip.rs @@ -671,6 +671,13 @@ impl GossipCluster { pub async fn unregister_named_actor(&self, path: &ActorPath) { let key = path.as_str(); + let actor_id = { + let named = self.state.named_actors.read().await; + named + .get(&key) + .and_then(|info| info.instances.get(&self.state.local_node)) + .map(|inst| inst.actor_id) + }; { let mut named = self.state.named_actors.write().await; if let Some(info) = named.get_mut(&key) { @@ -680,6 +687,9 @@ impl GossipCluster { } } } + if let Some(aid) = actor_id { + self.unregister_actor(&aid).await; + } let msg = GossipMessage::NamedActorUnregistered { path: path.clone(), node_id: self.state.local_node, diff --git a/crates/pulsing-actor/src/system/spawn.rs b/crates/pulsing-actor/src/system/spawn.rs index 1bca313a4..83961fcf5 100644 --- a/crates/pulsing-actor/src/system/spawn.rs +++ b/crates/pulsing-actor/src/system/spawn.rs @@ -99,6 +99,8 @@ impl ActorSystem { .register_named_actor_full(path.clone(), actor_id, metadata) .await; } + // So refer(actor_id) / lookup_actor can resolve this actor on any node + cluster.register_actor(actor_id).await; } } } else { diff --git a/crates/pulsing-py/src/actor.rs b/crates/pulsing-py/src/actor.rs index e0a8d782a..b4c16e579 100644 --- a/crates/pulsing-py/src/actor.rs +++ b/crates/pulsing-py/src/actor.rs @@ -1878,19 +1878,18 @@ impl PyActorSystem { pyo3_async_runtimes::tokio::future_into_py(py, async move { let members = system.members().await; - // Return all fields as strings for safe JSON serialization - let result: Vec> = members - .into_iter() - .map(|m| { - let mut map = std::collections::HashMap::new(); - // Use string representation to avoid JSON integer overflow - map.insert("node_id".to_string(), m.node_id.0.to_string()); - map.insert("addr".to_string(), 
m.addr.to_string()); - map.insert("status".to_string(), format!("{:?}", m.status)); - map - }) - .collect(); - Ok(result) + Python::with_gil(|py| -> PyResult { + use pyo3::types::PyDict; + let list = pyo3::types::PyList::empty(py); + for m in members { + let dict = PyDict::new(py); + dict.set_item("node_id", m.node_id.0)?; + dict.set_item("addr", m.addr.to_string())?; + dict.set_item("status", format!("{:?}", m.status))?; + list.append(dict)?; + } + Ok(list.into_pyobject(py)?.into()) + }) }) } @@ -1920,45 +1919,24 @@ impl PyActorSystem { ActorPath::new(&name).map_err(to_py_value_err)? }; let instances = system.get_named_instances_detailed(&path).await; - let result: Vec> = instances - .into_iter() - .map(|(member, instance_opt)| { - let mut map = std::collections::HashMap::new(); - // Use decimal string for node_id to match members() format - map.insert( - "node_id".to_string(), - serde_json::Value::String(member.node_id.0.to_string()), - ); - map.insert( - "addr".to_string(), - serde_json::Value::String(member.addr.to_string()), - ); - map.insert( - "status".to_string(), - serde_json::Value::String(format!("{:?}", member.status)), - ); - - // Add detailed instance info if available + + Python::with_gil(|py| -> PyResult { + use pyo3::types::PyDict; + let list = pyo3::types::PyList::empty(py); + for (member, instance_opt) in instances { + let dict = PyDict::new(py); + dict.set_item("node_id", member.node_id.0)?; + dict.set_item("addr", member.addr.to_string())?; + dict.set_item("status", format!("{:?}", member.status))?; if let Some(inst) = instance_opt { - // Use decimal string for actor_id to match other APIs - map.insert( - "actor_id".to_string(), - serde_json::Value::String(inst.actor_id.0.to_string()), - ); - // Add metadata fields + dict.set_item("actor_id", inst.actor_id.0)?; for (k, v) in inst.metadata { - map.insert(k, serde_json::Value::String(v)); + dict.set_item(k, v)?; } } - - map - }) - .collect(); - - Python::with_gil(|py| -> PyResult { - use 
pythonize::pythonize; - let pyobj = pythonize(py, &result)?; - Ok(pyobj.into()) + list.append(dict)?; + } + Ok(list.into_pyobject(py)?.into()) }) }) } @@ -1971,64 +1949,33 @@ impl PyActorSystem { let all_named = system.all_named_actors().await; Python::with_gil(|py| -> PyResult { - use pythonize::pythonize; - let result: Vec> = all_named - .into_iter() - .map(|info| { - let mut map = std::collections::HashMap::new(); - map.insert( - "path".to_string(), - serde_json::Value::String(info.path.as_str().to_string()), - ); - map.insert( - "instance_count".to_string(), - serde_json::Value::Number(serde_json::Number::from( - info.instance_count(), - )), - ); - // Convert instance_nodes (HashSet) to list of node IDs as decimal strings - let instances: Vec = info - .instance_nodes - .iter() - .map(|id| serde_json::Value::String(id.0.to_string())) - .collect(); - map.insert("instances".to_string(), serde_json::Value::Array(instances)); - - // Add detailed instance info if available - let detailed: Vec = info - .instances - .iter() - .map(|(node_id, inst)| { - let mut inst_map = serde_json::Map::new(); - // Use decimal string to match members() format - inst_map.insert( - "node_id".to_string(), - serde_json::Value::String(node_id.0.to_string()), - ); - inst_map.insert( - "actor_id".to_string(), - serde_json::Value::String(inst.actor_id.0.to_string()), - ); - // Add metadata - for (k, v) in &inst.metadata { - inst_map - .insert(k.clone(), serde_json::Value::String(v.clone())); - } - serde_json::Value::Object(inst_map) - }) - .collect(); - if !detailed.is_empty() { - map.insert( - "detailed_instances".to_string(), - serde_json::Value::Array(detailed), - ); + use pyo3::types::PyDict; + let list = pyo3::types::PyList::empty(py); + for info in all_named { + let dict = PyDict::new(py); + dict.set_item("path", info.path.as_str())?; + dict.set_item("instance_count", info.instance_count())?; + let instances_list = pyo3::types::PyList::empty(py); + for id in &info.instance_nodes { + 
instances_list.append(id.0)?; + } + dict.set_item("instances", instances_list)?; + if !info.instances.is_empty() { + let detailed_list = pyo3::types::PyList::empty(py); + for (node_id, inst) in &info.instances { + let inst_dict = PyDict::new(py); + inst_dict.set_item("node_id", node_id.0)?; + inst_dict.set_item("actor_id", inst.actor_id.0)?; + for (k, v) in &inst.metadata { + inst_dict.set_item(k, v)?; + } + detailed_list.append(inst_dict)?; } - - map - }) - .collect(); - let pyobj = pythonize(py, &result)?; - Ok(pyobj.into()) + dict.set_item("detailed_instances", detailed_list)?; + } + list.append(dict)?; + } + Ok(list.into_pyobject(py)?.into()) }) }) } diff --git a/docs/src/design/cluster-networking.md b/docs/src/design/cluster-networking.md index 8ae2775ab..11b2445ea 100644 --- a/docs/src/design/cluster-networking.md +++ b/docs/src/design/cluster-networking.md @@ -67,7 +67,9 @@ In Rust, `ActorSystem::new(config)` builds a `NamingBackend`: if `config.head_ad --- -## Init in Ray: how it works +## Init in Ray / Bootstrap: how it works + +The recommended Python API for Ray or torchrun is **`pulsing.bootstrap(ray=..., torchrun=..., on_ready=..., wait_timeout=...)`**; it runs `init_in_ray` and/or `init_in_torchrun` in the background. - Pulsing runs inside a **Ray** cluster. Each process that uses Pulsing calls `init_in_ray()` (or `async_init_in_ray()`). 
- **Seed discovery** uses Ray’s **internal KV store**: diff --git a/docs/src/design/cluster-networking.zh.md b/docs/src/design/cluster-networking.zh.md index f3fdef1a5..0bcd7574f 100644 --- a/docs/src/design/cluster-networking.zh.md +++ b/docs/src/design/cluster-networking.zh.md @@ -67,7 +67,9 @@ Gossip 的节奏与行为由 `GossipConfig` 控制:`gossip_interval`、`fanout --- -## Init in Ray:实现原理 +## Init in Ray / Bootstrap:实现原理 + +推荐在 Ray 或 torchrun 下使用统一入口 **`pulsing.bootstrap(ray=..., torchrun=..., on_ready=..., wait_timeout=...)`**;其后台会执行 `init_in_ray` 和/或 `init_in_torchrun`。 - Pulsing 运行在 **Ray** 集群内。每个使用 Pulsing 的进程调用 `init_in_ray()`(或 `async_init_in_ray()`)。 - **Seed 发现**使用 Ray 的 **internal KV**: diff --git a/docs/src/examples/llm_inference.md b/docs/src/examples/llm_inference.md index bf03eb7f9..4a7ce687f 100644 --- a/docs/src/examples/llm_inference.md +++ b/docs/src/examples/llm_inference.md @@ -21,9 +21,10 @@ The router needs an **actor system address** so workers can join the same cluste ```bash pulsing actor pulsing.serving.Router \ --addr 0.0.0.0:8000 \ - --http_host 0.0.0.0 \ + --name my-llm \ + -- \ --http_port 8080 \ - --model_name my-llm \ + --model_name gpt2 \ --worker_name worker ``` @@ -34,22 +35,23 @@ You can run **one or more** workers. 
Each worker should join the router node via ### Option A: Transformers worker (Terminal B) ```bash -pulsing actor pulsing.serving.worker.TransformersWorker \ - --model_name gpt2 \ - --device cpu \ +pulsing actor pulsing.serving.TransformersWorker \ --addr 0.0.0.0:8001 \ --seeds 127.0.0.1:8000 \ - --name worker + --name worker \ + -- \ + --model_name gpt2 ``` ### Option B: vLLM worker (Terminal C) ```bash pulsing actor pulsing.serving.vllm.VllmWorker \ - --model Qwen/Qwen2.5-0.5B \ --addr 0.0.0.0:8002 \ --seeds 127.0.0.1:8000 \ - --name worker + --name worker \ + -- \ + --model Qwen/Qwen2.5-0.5B ``` ## 3) Verify cluster + workers @@ -71,25 +73,25 @@ pulsing inspect cluster --seeds 127.0.0.1:8000 ### Non-streaming ```bash -curl -s http://localhost:8080/v1/chat/completions \ +curl -X POST http://localhost:8080/v1/chat/completions \ -H "Content-Type: application/json" \ - -d '{"model":"my-llm","messages":[{"role":"user","content":"Hello"}],"stream":false}' + -d '{"model": "gpt2", "messages": [{"role": "user", "content": "Hello"}], "stream": false}' ``` ### Streaming (SSE) ```bash -curl -N http://localhost:8080/v1/chat/completions \ +curl -X POST http://localhost:8080/v1/chat/completions \ -H "Content-Type: application/json" \ - -d '{"model":"my-llm","messages":[{"role":"user","content":"Tell me a joke"}],"stream":true}' + -d '{"model": "gpt2", "messages": [{"role": "user", "content": "Tell me a joke"}], "stream": true}' ``` ## Troubleshooting - If you see `No available workers`, ensure: - - router is started with `--addr` - - workers join via `--seeds ` - - the worker actor name is `worker` (default) + - router is started with `--addr` and workers join via `--seeds ` + - the worker actor **name** matches: workers started with `--name worker` (before `--`), or start the router with `--worker_name ` (after `--`) to match your worker name + - check: `pulsing inspect actors --seeds 127.0.0.1:8000` — you should see an actor with the name the router is looking for (default 
`worker`) See also: diff --git a/docs/src/examples/llm_inference.zh.md b/docs/src/examples/llm_inference.zh.md index 58732d297..59e6f08ad 100644 --- a/docs/src/examples/llm_inference.zh.md +++ b/docs/src/examples/llm_inference.zh.md @@ -21,9 +21,10 @@ Router 需要指定 **actor system 地址**,以便其它进程启动的 worker ```bash pulsing actor pulsing.serving.Router \ --addr 0.0.0.0:8000 \ - --http_host 0.0.0.0 \ + --name my-llm \ + -- \ --http_port 8080 \ - --model_name my-llm \ + --model_name gpt2 \ --worker_name worker ``` @@ -34,22 +35,23 @@ pulsing actor pulsing.serving.Router \ ### 方案 A:Transformers Worker(终端 B) ```bash -pulsing actor pulsing.serving.worker.TransformersWorker \ - --model_name gpt2 \ - --device cpu \ +pulsing actor pulsing.serving.TransformersWorker \ --addr 0.0.0.0:8001 \ --seeds 127.0.0.1:8000 \ - --name worker + --name worker \ + -- \ + --model_name gpt2 ``` ### 方案 B:vLLM Worker(终端 C) ```bash pulsing actor pulsing.serving.vllm.VllmWorker \ - --model Qwen/Qwen2.5-0.5B \ --addr 0.0.0.0:8002 \ --seeds 127.0.0.1:8000 \ - --name worker + --name worker \ + -- \ + --model Qwen/Qwen2.5-0.5B ``` ## 3)验证集群与 worker @@ -71,25 +73,25 @@ pulsing inspect cluster --seeds 127.0.0.1:8000 ### 非流式 ```bash -curl -s http://localhost:8080/v1/chat/completions \ +curl -X POST http://localhost:8080/v1/chat/completions \ -H "Content-Type: application/json" \ - -d '{"model":"my-llm","messages":[{"role":"user","content":"Hello"}],"stream":false}' + -d '{"model": "gpt2", "messages": [{"role": "user", "content": "Hello"}], "stream": false}' ``` ### 流式(SSE) ```bash -curl -N http://localhost:8080/v1/chat/completions \ +curl -X POST http://localhost:8080/v1/chat/completions \ -H "Content-Type: application/json" \ - -d '{"model":"my-llm","messages":[{"role":"user","content":"Tell me a joke"}],"stream":true}' + -d '{"model": "gpt2", "messages": [{"role": "user", "content": "讲个笑话"}], "stream": true}' ``` ## 排障 - 如果出现 `No available workers`,请检查: - - router 是否带了 `--addr` - - worker 是否通过 `--seeds ` 加入 
- - worker actor 名称是否为 `worker`(默认) + - Router 已用 `--addr` 启动,Worker 已用 `--seeds ` 加入 + - **名字一致**:Worker 用 `--name worker`(`--` 前)启动,或 Router 用 `--worker_name <名字>`(`--` 后)与 Worker 一致 + - 执行 `pulsing inspect actors --seeds 127.0.0.1:8000`,确认能看到 Router 在找的名字(默认 `worker`) 更多: diff --git a/docs/src/guide/operations.md b/docs/src/guide/operations.md index baf6dfd10..93a732ba5 100644 --- a/docs/src/guide/operations.md +++ b/docs/src/guide/operations.md @@ -6,7 +6,14 @@ Pulsing ships with built-in CLI tools for starting actors, inspecting systems, a ## Starting Actors -The `pulsing actor` command starts actors by providing their full class path. The CLI automatically matches command-line arguments to the Actor's constructor parameters. +The `pulsing actor` command starts actors by providing their full class path. Arguments are split by `--` so that **actor-level options** (e.g. `--addr`, `--seeds`, `--name`) and **Actor constructor arguments** never collide. + +### Parameter separation (`--`) + +- **Before `--`**: All arguments are passed to the `actor` subcommand as-is (positional actor type + any options such as `--addr`, `--seeds`, `--name`). +- **After `--`**: All `--key value` pairs are collected and passed to the Actor's constructor. Use this to avoid name collision with actor-level options. + +If you omit `--`, only the arguments recognized by the `actor` subcommand are used; constructor args must then be passed via `-D actor.extra_kwargs='{"key":"value"}'` or by using `--` when you need to pass both. 
### Format @@ -22,34 +29,36 @@ Actor type must be a full class path: #### Router (OpenAI-compatible HTTP API) ```bash +# Actor-level (addr, name) before --; Router constructor args after -- pulsing actor pulsing.serving.Router \ --addr 0.0.0.0:8000 \ - --http_host 0.0.0.0 \ + --name my-llm \ + -- \ --http_port 8080 \ - --model_name my-llm \ - --worker_name worker \ - --scheduler_type stream_load + --model_name gpt2 \ + --worker_name worker ``` #### Transformers Worker ```bash -pulsing actor pulsing.serving.worker.TransformersWorker \ - --model_name gpt2 \ - --device cpu \ +pulsing actor pulsing.serving.TransformersWorker \ --addr 0.0.0.0:8001 \ --seeds 127.0.0.1:8000 \ - --name worker + --name worker \ + -- \ + --model_name gpt2 ``` #### vLLM Worker ```bash pulsing actor pulsing.serving.vllm.VllmWorker \ - --model Qwen/Qwen2 \ --addr 0.0.0.0:8002 \ --seeds 127.0.0.1:8000 \ --name worker \ + -- \ + --model Qwen/Qwen2 \ --role aggregated \ --max_new_tokens 512 ``` @@ -58,62 +67,37 @@ pulsing actor pulsing.serving.vllm.VllmWorker \ ```bash # Start multiple workers with different names -pulsing actor pulsing.serving.worker.TransformersWorker \ - --model_name gpt2 \ +pulsing actor pulsing.serving.TransformersWorker \ --name worker-1 \ - --seeds 127.0.0.1:8000 + --seeds 127.0.0.1:8000 \ + -- --model_name gpt2 -pulsing actor pulsing.serving.worker.TransformersWorker \ - --model_name gpt2 \ +pulsing actor pulsing.serving.TransformersWorker \ --name worker-2 \ - --seeds 127.0.0.1:8000 + --seeds 127.0.0.1:8000 \ + -- --model_name gpt2 # Router targeting specific worker name pulsing actor pulsing.serving.Router \ - --worker_name worker-1 \ - --seeds 127.0.0.1:8000 + --addr 0.0.0.0:8000 \ + --seeds 127.0.0.1:8000 \ + -- --worker_name worker-1 ``` -### Common Options +### Common options (before `--`) - `--name NAME`: Actor name (default: "worker") - `--addr ADDR`: Actor System bind address - `--seeds SEEDS`: Comma-separated list of seed nodes -- Any other `--param value` pairs 
matching the Actor's constructor signature - -### How It Works - -```bash -# Pass parameters directly as command-line arguments -pulsing actor pulsing.serving.worker.TransformersWorker \ - --model_name gpt2 \ - --device cpu \ - --preload true \ - --name my-worker \ - --seeds 127.0.0.1:8000 - -# Start vLLM worker with all parameters -pulsing actor pulsing.serving.vllm.VllmWorker \ - --model Qwen/Qwen2 \ - --role aggregated \ - --max_new_tokens 512 \ - --name vllm-worker \ - --seeds 127.0.0.1:8000 -``` -Options: -- `--name NAME`: Actor name (default: "worker") -- `--addr ADDR`: Actor System bind address -- `--seeds SEEDS`: Comma-separated list of seed nodes -- Any other `--param value` pairs matching the Actor's constructor signature +Arguments after `--` are passed to the Actor's constructor as `--key value` pairs. The Actor class must: - Be importable from the specified module path -- Inherit from `pulsing.core.Actor` -- Have a constructor with named parameters (the CLI automatically matches arguments to constructor parameters) +- Be a `pulsing.core.Actor` subclass or a `@pulsing.remote` class +- Have a constructor with named parameters (arguments after `--` are matched to constructor parameters) -**How it works:** -The CLI inspects the Actor class constructor signature and automatically extracts matching parameters from command-line arguments. You can use `--help` to see available parameters, or check the Actor class documentation. +**How it works:** The CLI passes everything before `--` to the actor subcommand, and collects every `--key value` after `--` into the Actor constructor. Use `pulsing actor --help` to see actor-level options; for constructor parameters, see the Actor class documentation.
--- @@ -227,10 +211,11 @@ pulsing bench gpt2 --url http://localhost:8080 | Task | Command | |------|---------| -| Start router | `pulsing actor pulsing.serving.Router --addr 0.0.0.0:8000 --http_port 8080` | -| Start worker | `pulsing actor pulsing.serving.TransformersWorker --model_name gpt2 --seeds ...` | -| Start multiple workers | `pulsing actor pulsing.serving.TransformersWorker --model_name gpt2 --name worker-1 --seeds ...` | -| Router with custom worker | `pulsing actor pulsing.serving.Router --worker_name worker-1 --seeds ...` | +| Start router | `pulsing actor pulsing.serving.Router --addr 0.0.0.0:8000 --name my-llm -- --http_port 8080 --model_name gpt2 --worker_name worker` | +| Start worker | `pulsing actor pulsing.serving.TransformersWorker --addr 0.0.0.0:8001 --seeds 127.0.0.1:8000 --name worker -- --model_name gpt2` | +| Chat completions | `curl -X POST http://localhost:8080/v1/chat/completions -H "Content-Type: application/json" -d '{"model":"gpt2","messages":[{"role":"user","content":"Hello"}],"stream":false}'` | +| Start multiple workers | `pulsing actor ... --name worker-1 --seeds ... 
-- --model_name gpt2` | +| Router with custom worker | `pulsing actor pulsing.serving.Router --addr 0.0.0.0:8000 -- --worker_name worker-1` | | List actors | `pulsing inspect actors --endpoint 127.0.0.1:8000` | | Inspect cluster | `pulsing inspect cluster --seeds 127.0.0.1:8000` | | Inspect actors | `pulsing inspect actors --seeds 127.0.0.1:8000 --top 10` | diff --git a/docs/src/guide/operations.zh.md b/docs/src/guide/operations.zh.md index bb148c9fb..6dd8870fb 100644 --- a/docs/src/guide/operations.zh.md +++ b/docs/src/guide/operations.zh.md @@ -6,7 +6,14 @@ Pulsing 内置 CLI 工具,用于启动 actors、检查系统和基准测试分 ## 启动 Actor -`pulsing actor` 命令通过提供完整的类路径来启动 actors。CLI 会自动将命令行参数匹配到 Actor 的构造函数参数。 +`pulsing actor` 通过完整类路径启动 actor。参数以 `--` 分隔,避免 **actor 级选项**(如 `--addr`、`--seeds`、`--name`)与 **Actor 构造参数** 重名。 + +### 参数分隔(`--`) + +- **`--` 之前**:整段原样传给 `actor` 子命令(位置参数:actor 类型 + 任意选项如 `--addr`、`--seeds`、`--name`)。 +- **`--` 之后**:所有 `--key value` 会收集并传给 Actor 的构造函数。用于传入构造参数,避免与 actor 级选项冲突。 + +若不写 `--`,则只会使用 `actor` 子命令能识别的参数;若要同时传 actor 级与构造参数,请使用 `--` 分隔,或通过 `-D actor.extra_kwargs='{"key":"value"}'` 传构造参数。 ### 格式 @@ -22,8 +29,11 @@ Actor 类型必须是完整的类路径: #### Router(OpenAI 兼容 HTTP API) ```bash +# actor 级(addr、name)在 -- 前;Router 构造参数在 -- 后 pulsing actor pulsing.serving.Router \ --addr 0.0.0.0:8000 \ + --name my-llm \ + -- \ --http_host 0.0.0.0 \ --http_port 8080 \ --model_name my-llm \ @@ -35,21 +45,23 @@ pulsing actor pulsing.serving.Router \ ```bash pulsing actor pulsing.serving.worker.TransformersWorker \ - --model_name gpt2 \ - --device cpu \ --addr 0.0.0.0:8001 \ --seeds 127.0.0.1:8000 \ - --name worker + --name worker \ + -- \ + --model_name gpt2 \ + --device cpu ``` #### vLLM Worker ```bash pulsing actor pulsing.serving.vllm.VllmWorker \ - --model Qwen/Qwen2 \ --addr 0.0.0.0:8002 \ --seeds 127.0.0.1:8000 \ --name worker \ + -- \ + --model Qwen/Qwen2 \ --role aggregated \ --max_new_tokens 512 ``` @@ -59,36 +71,36 @@ pulsing actor pulsing.serving.vllm.VllmWorker \ ```bash # 
启动多个不同名称的 worker pulsing actor pulsing.serving.worker.TransformersWorker \ - --model_name gpt2 \ --name worker-1 \ - --seeds 127.0.0.1:8000 + --seeds 127.0.0.1:8000 \ + -- --model_name gpt2 pulsing actor pulsing.serving.worker.TransformersWorker \ - --model_name gpt2 \ --name worker-2 \ - --seeds 127.0.0.1:8000 + --seeds 127.0.0.1:8000 \ + -- --model_name gpt2 # Router 路由到特定 worker 名称 pulsing actor pulsing.serving.Router \ - --worker_name worker-1 \ - --seeds 127.0.0.1:8000 + --addr 0.0.0.0:8000 \ + --seeds 127.0.0.1:8000 \ + -- --worker_name worker-1 ``` -### 通用选项 +### 通用选项(`--` 之前) - `--name NAME`: Actor 名称(默认: "worker") - `--addr ADDR`: Actor System 绑定地址 - `--seeds SEEDS`: 逗号分隔的种子节点列表 -- 任何其他 `--param value` 参数对,匹配 Actor 的构造函数签名 -### 工作原理 - -CLI 会检查 Actor 类的构造函数签名,并自动从命令行参数中提取匹配的参数。可以使用 `--help` 查看可用参数,或查看 Actor 类的文档。 +`--` 之后的参数以 `--key value` 形式传入 Actor 构造函数。 Actor 类必须: -- 可以从指定的模块路径导入 +- 可从指定模块路径导入 - 继承自 `pulsing.core.Actor` -- 具有带命名参数的构造函数(CLI 会自动将参数匹配到构造函数参数) +- 构造函数为命名参数(`--` 后的参数会匹配到构造参数) + +**工作原理**:`--` 之前的整段原样传给 actor 子命令;`--` 之后的每个 `--key value` 会收集并传入 Actor 构造函数。可用 `pulsing actor --help` 查看 actor 级选项;构造参数见各 Actor 类文档。 --- @@ -209,10 +221,10 @@ pulsing bench gpt2 --url http://localhost:8080 | 任务 | 命令 | |------|------| -| 启动 router | `pulsing actor pulsing.serving.Router --addr 0.0.0.0:8000 --http_port 8080` | -| 启动 worker | `pulsing actor pulsing.serving.TransformersWorker --model_name gpt2 --seeds ...` | -| 启动多个 worker | `pulsing actor pulsing.serving.TransformersWorker --model_name gpt2 --name worker-1 --seeds ...` | -| Router 指定 worker | `pulsing actor pulsing.serving.Router --worker_name worker-1 --seeds ...` | +| 启动 router | `pulsing actor pulsing.serving.Router --addr 0.0.0.0:8000 -- --http_port 8080 --model_name my-llm` | +| 启动 worker | `pulsing actor pulsing.serving.TransformersWorker --addr 0.0.0.0:8001 --seeds ... -- --model_name gpt2` | +| 启动多个 worker | `pulsing actor ... --name worker-1 --seeds ... 
-- --model_name gpt2` | +| Router 指定 worker | `pulsing actor pulsing.serving.Router --addr 0.0.0.0:8000 -- --worker_name worker-1` | | 列出 actors | `pulsing inspect actors --endpoint 127.0.0.1:8000` | | 检查集群 | `pulsing inspect cluster --seeds 127.0.0.1:8000` | | 检查 actors | `pulsing inspect actors --seeds 127.0.0.1:8000 --top 10` | diff --git a/docs/src/guide/style.md b/docs/src/guide/style.md index 046652375..3d333fe95 100644 --- a/docs/src/guide/style.md +++ b/docs/src/guide/style.md @@ -27,12 +27,14 @@ This page defines terminology and style conventions for Pulsing documentation an ### Starting Actors +Arguments are split by `--`: before `--` go to the actor subcommand (e.g. `--addr`, `--seeds`, `--name`); after `--` are passed to the Actor constructor. + ```bash -pulsing actor [options] +pulsing actor [options] [-- constructor-args] -# Examples -pulsing actor pulsing.serving.Router --http_port 8080 --model_name my-llm -pulsing actor pulsing.serving.TransformersWorker --model_name gpt2 --device cpu +# Examples (use -- to separate actor-level options from constructor args) +pulsing actor pulsing.serving.Router --addr 0.0.0.0:8000 -- --http_port 8080 --model_name my-llm +pulsing actor pulsing.serving.TransformersWorker --addr 0.0.0.0:8001 -- --model_name gpt2 --device cpu ``` ### Inspect Commands (Observer Mode) diff --git a/docs/src/guide/style.zh.md b/docs/src/guide/style.zh.md index 694b87e29..2cda8cb2a 100644 --- a/docs/src/guide/style.zh.md +++ b/docs/src/guide/style.zh.md @@ -27,12 +27,14 @@ ### 启动 Actor +参数以 `--` 分隔:`--` 前为 actor 子命令选项(如 `--addr`、`--seeds`、`--name`),`--` 后为 Actor 构造参数。 + ```bash -pulsing actor <完整类路径> [选项] +pulsing actor <完整类路径> [选项] [-- 构造参数] -# 示例 -pulsing actor pulsing.serving.Router --http_port 8080 --model_name my-llm -pulsing actor pulsing.serving.TransformersWorker --model_name gpt2 --device cpu +# 示例(用 -- 区分 actor 级选项与构造参数) +pulsing actor pulsing.serving.Router --addr 0.0.0.0:8000 -- --http_port 8080 --model_name my-llm +pulsing 
actor pulsing.serving.TransformersWorker --addr 0.0.0.0:8001 -- --model_name gpt2 --device cpu ``` ### 检查命令(观察者模式) diff --git a/docs/src/quickstart/cluster_networking.md b/docs/src/quickstart/cluster_networking.md index 256b755e9..d66a57044 100644 --- a/docs/src/quickstart/cluster_networking.md +++ b/docs/src/quickstart/cluster_networking.md @@ -10,10 +10,17 @@ This page describes **how to form and use** a Pulsing cluster. For protocol and |------|--------------------|----------| | **Gossip + seed** | Bind address + optional seed addresses to join | Kubernetes, VMs, bare metal; no single point of failure | | **Head node** | One node as head, others with head address | Simple ops; one fixed coordinator address | -| **Init in Ray** | `init_in_ray()` in each process; no seeds | Already using Ray; automatic seed discovery | +| **Bootstrap (Ray/torchrun)** | `bootstrap(ray=..., torchrun=..., ...)`; no seeds | Already using Ray or torchrun; auto-detect or specify backend | All modes use a **single HTTP/2 port** per node. No etcd, NATS, or Redis. +### When to use `init()` vs `bootstrap()` + +- **`await pul.init(addr=..., seeds=...)`** or **`await pul.init()`** (standalone): use when **you** have the config (bind address, seed list, or no cluster). You control how the process joins. +- **`pul.bootstrap(ray=..., torchrun=..., ...)`**: use when the process is **already inside** a Ray or torchrun job and you want Pulsing to **auto-join** that environment (no seeds to pass; bootstrap discovers or uses Ray/torch.distributed). +- **Do not mix for the same “first init”**: pick one. If you already called `init()`, you do **not** need `bootstrap()`. Calling `bootstrap(on_ready=...)` after `init()` is safe (on_ready runs immediately because the system is already initialized). +- **Using only init (no bootstrap)** is fine: use `await pul.init(...)` for explicit config, `init_in_ray()` in a Ray process, and `init_in_torchrun()` in torchrun. All set the global system the same way. 
You give up a single "auto-detect" entry point (bootstrap tries both backends) and the optional "init in background then wait" pattern; otherwise behavior is the same. + --- ## Mode 1: Gossip + seed @@ -116,32 +123,47 @@ You can also use `SystemConfig.with_head_node()` / `.with_head_addr(addr)` and p --- -## Mode 3: Init in Ray +## Mode 3: Bootstrap (Ray / torchrun) -### Requirements - -- Ray installed and `ray.init()` called before `init_in_ray()` -- Every process that uses Pulsing (driver and workers) must call `init_in_ray()` in that process +Use the **bootstrap** API to form a Pulsing cluster when you are already in a Ray or torchrun environment. Omit `ray`/`torchrun` or pass both `True` to auto-detect (try Ray first, then torchrun). ### Usage +**Recommended: unified bootstrap** + +```python +import pulsing as pul + +# Auto-detect: try Ray then torchrun (default) +pul.bootstrap() +# Or block until ready +if pul.bootstrap(wait_timeout=30): + system = pul.get_system() + +# Only Ray +pul.bootstrap(ray=True, torchrun=False, wait_timeout=10) + +# Only torchrun (e.g. launched with torchrun) +pul.bootstrap(ray=False, torchrun=True, on_ready=lambda s: print("ready:", s)) +``` + +**Ray cluster: driver + workers** + +In a Ray cluster, the driver can use `bootstrap(ray=True, ...)`. 
Every worker process must also initialize Pulsing; use `worker_process_setup_hook` so each worker runs `init_in_ray` on startup: + ```python import ray from pulsing.integrations.ray import init_in_ray -# Recommended: hook so every worker runs init_in_ray at startup ray.init(runtime_env={"worker_process_setup_hook": init_in_ray}) -# Driver must also init -init_in_ray() - -# Use Pulsing as usual +# Driver: bootstrap (calls init_in_ray under the hood when Ray is available) import pulsing as pul -@pul.remote -class MyActor: - def run(self): return "ok" - -actor = await MyActor.spawn(name="my_actor") +if pul.bootstrap(ray=True, torchrun=False, wait_timeout=30): + @pul.remote + class MyActor: + def run(self): return "ok" + actor = await MyActor.spawn(name="my_actor") ``` **Async** (e.g. async Ray actors): @@ -160,31 +182,31 @@ cleanup() ### When to use -- You already run Ray and want Pulsing on the same nodes as one cluster -- You want one-line cluster formation per process without managing seeds or head address -- You are okay depending on Ray’s KV only for bootstrap; after that Pulsing uses its own gossip +- You already run Ray or torchrun and want Pulsing on the same nodes as one cluster +- You want one API (`bootstrap`) to auto-detect or choose backend without managing seeds or head address +- You are okay depending on Ray KV or torch.distributed only for bootstrap; after that Pulsing uses its own gossip ### Limitations -- Requires Ray and its internal KV -- Every process must call `init_in_ray()` (driver explicitly; workers via hook) +- Ray path requires Ray and its internal KV; torchrun path requires `torch.distributed.init_process_group()` (e.g. 
torchrun) +- In a Ray cluster, every process must run Pulsing init (driver via `bootstrap(ray=True)` or `init_in_ray()`; workers via `worker_process_setup_hook`) - One Pulsing cluster per Ray cluster (one KV key) --- ## Comparison and choice -| Criterion | Gossip + seed | Head node | Init in Ray | -|-----------|----------------|-----------|-------------| -| External deps | None | None | Ray | +| Criterion | Gossip + seed | Head node | Bootstrap (Ray/torchrun) | +|-----------|----------------|-----------|--------------------------| +| External deps | None | None | Ray and/or PyTorch | | Single point of failure | No | Yes (head) | No | -| Config | addr + optional seeds | addr + head addr or head role | None (Ray KV) | -| Best environment | K8s, VMs, bare metal | One coordinator OK | Existing Ray cluster | -| Python `init()` | `addr`, `seeds` | Via SystemConfig if exposed | `init_in_ray()` | +| Config | addr + optional seeds | addr + head addr or head role | `bootstrap(ray=..., torchrun=...)` | +| Best environment | K8s, VMs, bare metal | One coordinator OK | Existing Ray or torchrun | +| Python init | `addr`, `seeds` | Via SystemConfig if exposed | `bootstrap()` or `init_in_ray` / `init_in_torchrun` | **Suggested choice:** -- **Already on Ray** → **Init in Ray** +- **Already on Ray or torchrun** → **Bootstrap** (`pul.bootstrap(ray=..., torchrun=...)`) - **No SPOF, no Ray** → **Gossip + seed** (use a K8s Service as seed when on K8s) - **One fixed coordinator, simple ops** → **Head node** @@ -194,7 +216,7 @@ cleanup() 1. **Gossip + seed**: In K8s use a Service as seed; keep one port open for all nodes (actor + gossip). 2. **Head node**: Run head on a stable host/port; tune heartbeat timeout under load. -3. **Init in Ray**: Call `init_in_ray()` in the driver and set `worker_process_setup_hook`; use `cleanup()` in tests if needed. +3. 
**Bootstrap**: Prefer `pul.bootstrap(ray=..., torchrun=..., on_ready=..., wait_timeout=...)`; in a Ray cluster set `worker_process_setup_hook=init_in_ray` for workers. Use `cleanup()` in tests if needed. 4. **Security**: For any mode, enable TLS (e.g. passphrase) for cluster traffic — see [Security](../guide/security.md). --- diff --git a/docs/src/quickstart/cluster_networking.zh.md b/docs/src/quickstart/cluster_networking.zh.md index d3cfe5174..12bc3e375 100644 --- a/docs/src/quickstart/cluster_networking.zh.md +++ b/docs/src/quickstart/cluster_networking.zh.md @@ -10,10 +10,17 @@ |------|----------------|----------| | **Gossip + seed** | 绑定地址 + 可选 seed 地址以加入 | Kubernetes、VM、裸机;无单点故障 | | **Head 节点** | 一个节点作 Head,其余填 Head 地址 | 运维简单;一个固定协调地址 | -| **Init in Ray** | 每个进程调用 `init_in_ray()`,无需 seeds | 已在用 Ray;自动发现 seed | +| **Bootstrap (Ray/torchrun)** | `bootstrap(ray=..., torchrun=..., ...)`,无需 seeds | 已在用 Ray 或 torchrun;自动检测或指定后端 | 所有方式每节点**单一 HTTP/2 端口**,不依赖 etcd、NATS、Redis。 +### 何时用 `init()`、何时用 `bootstrap()` + +- **`await pul.init(addr=..., seeds=...)`** 或 **`await pul.init()`**(单机):当你**自己**有配置(绑定地址、seed 列表或不做集群)时用,由你决定进程如何加入。 +- **`pul.bootstrap(ray=..., torchrun=..., ...)`**:当进程**已经**在 Ray 或 torchrun 任务里、希望 Pulsing **自动加入**该环境时用(无需传 seeds,由 bootstrap 通过 Ray/torch.distributed 发现或组网)。 +- **同一次「首次初始化」只选其一**:二选一。若已经调过 `init()`,就**不需要**再调 `bootstrap()`。在 `init()` 之后调 `bootstrap(on_ready=...)` 是安全的(系统已就绪,on_ready 会立刻执行)。 +- **坚持都走 init(不用 bootstrap)** 也可以:显式配置用 `await pul.init(...)`,Ray 里用 `init_in_ray()`,torchrun 里用 `init_in_torchrun()`,效果相同,都会设置全局 system。只是没有「一个入口自动试两个后端」的便利,也没有「后台 init 再在主线程 wait」的用法;其它行为一致。 + --- ## 方式一:Gossip + seed @@ -116,32 +123,47 @@ await pul.init(addr="0.0.0.0:8001", head_addr="192.168.1.10:8000") --- -## 方式三:Init in Ray +## 方式三:Bootstrap(Ray / torchrun) -### 前置条件 - -- 已安装 Ray,且先执行 `ray.init()` 再调用 `init_in_ray()` -- 每个使用 Pulsing 的进程(driver 与 worker)都必须在该进程中调用 `init_in_ray()` +在 Ray 或 torchrun 环境下组建 Pulsing 集群时,使用 **bootstrap** 接口。不传 
`ray`/`torchrun` 或两者都传 `True` 时会自动检测(先试 Ray,再试 torchrun)。 ### 用法 +**推荐:统一使用 bootstrap** + +```python +import pulsing as pul + +# 自动检测:先试 Ray 再试 torchrun(默认) +pul.bootstrap() +# 或阻塞直到就绪 +if pul.bootstrap(wait_timeout=30): + system = pul.get_system() + +# 仅 Ray +pul.bootstrap(ray=True, torchrun=False, wait_timeout=10) + +# 仅 torchrun(例如用 torchrun 启动) +pul.bootstrap(ray=False, torchrun=True, on_ready=lambda s: print("ready:", s)) +``` + +**Ray 集群:driver + workers** + +在 Ray 集群中,driver 可调用 `bootstrap(ray=True, ...)`。每个 worker 进程也必须初始化 Pulsing,用 `worker_process_setup_hook` 让各 worker 启动时执行 `init_in_ray`: + ```python import ray from pulsing.integrations.ray import init_in_ray -# 推荐:用 hook 让每个 worker 启动时执行 init_in_ray ray.init(runtime_env={"worker_process_setup_hook": init_in_ray}) -# driver 也必须初始化 -init_in_ray() - -# 按常规使用 Pulsing +# Driver:bootstrap(在 Ray 可用时内部会调 init_in_ray) import pulsing as pul -@pul.remote -class MyActor: - def run(self): return "ok" - -actor = await MyActor.spawn(name="my_actor") +if pul.bootstrap(ray=True, torchrun=False, wait_timeout=30): + @pul.remote + class MyActor: + def run(self): return "ok" + actor = await MyActor.spawn(name="my_actor") ``` **异步**(如 async Ray actor): @@ -160,31 +182,31 @@ cleanup() ### 何时选用 -- 已在用 Ray,希望 Pulsing 在同一批节点上组成一个集群 -- 希望每个进程一行代码完成组网,无需自己维护 seed 或 Head 地址 -- 能接受仅在启动阶段依赖 Ray 的 KV;之后仅用 Pulsing 自己的 gossip +- 已在用 Ray 或 torchrun,希望 Pulsing 在同一批节点上组成一个集群 +- 希望用统一 API(`bootstrap`)自动检测或指定后端,无需维护 seed 或 Head 地址 +- 能接受仅在启动阶段依赖 Ray KV 或 torch.distributed;之后仅用 Pulsing 自己的 gossip ### 限制 -- 依赖 Ray 及其 internal KV -- 每个进程都必须调用 `init_in_ray()`(driver 显式;worker 通过 hook) +- Ray 路径依赖 Ray 及其 internal KV;torchrun 路径需先调用 `torch.distributed.init_process_group()`(如用 torchrun 启动) +- Ray 集群下每个进程都需完成 Pulsing 初始化(driver 通过 `bootstrap(ray=True)` 或 `init_in_ray()`;worker 通过 `worker_process_setup_hook`) - 一个 Ray 集群对应一个 Pulsing 集群(一个 KV key) --- ## 对比与选型 -| 维度 | Gossip + seed | Head 节点 | Init in Ray | 
-|------|----------------|-----------|-------------| -| 外部依赖 | 无 | 无 | Ray | +| 维度 | Gossip + seed | Head 节点 | Bootstrap (Ray/torchrun) | +|------|----------------|-----------|--------------------------| +| 外部依赖 | 无 | 无 | Ray 和/或 PyTorch | | 单点故障 | 无 | 有(Head) | 无 | -| 配置 | addr + 可选 seeds | addr + Head 地址或 Head 角色 | 无(Ray KV) | -| 适用环境 | K8s、VM、裸机 | 可接受单一协调节点 | 已有 Ray 集群 | -| Python init() | `addr`、`seeds` | 通过 SystemConfig(若暴露) | `init_in_ray()` | +| 配置 | addr + 可选 seeds | addr + Head 地址或 Head 角色 | `bootstrap(ray=..., torchrun=...)` | +| 适用环境 | K8s、VM、裸机 | 可接受单一协调节点 | 已有 Ray 或 torchrun | +| Python init() | `addr`、`seeds` | 通过 SystemConfig(若暴露) | `bootstrap()` 或 `init_in_ray` / `init_in_torchrun` | **选型建议:** -- **已有 Ray** → **Init in Ray** +- **已有 Ray 或 torchrun** → **Bootstrap**(`pul.bootstrap(ray=..., torchrun=...)`) - **不要单点且不用 Ray** → **Gossip + seed**(K8s 下用 Service 作 seed) - **一个固定协调节点、运维简单** → **Head 节点** @@ -194,7 +216,7 @@ cleanup() 1. **Gossip + seed**:K8s 下用 Service 作 seed;各节点开放同一端口(Actor + Gossip)。 2. **Head 节点**:Head 部署在稳定主机/端口;根据负载调整心跳超时。 -3. **Init in Ray**:Driver 中调用 `init_in_ray()` 并设置 `worker_process_setup_hook`;测试中如需可调用 `cleanup()`。 +3. **Bootstrap**:优先使用 `pul.bootstrap(ray=..., torchrun=..., on_ready=..., wait_timeout=...)`;Ray 集群下为 worker 设置 `worker_process_setup_hook=init_in_ray`。测试中如需可调用 `cleanup()`。 4. 
**安全**:任意方式均可为集群流量开启 TLS(如 passphrase),见 [安全](../guide/security.zh.md)。 --- diff --git a/docs/src/quickstart/llm_inference.md b/docs/src/quickstart/llm_inference.md index 2b2536ba9..2e6e66712 100644 --- a/docs/src/quickstart/llm_inference.md +++ b/docs/src/quickstart/llm_inference.md @@ -55,15 +55,15 @@ Open **Terminal A**: ```bash pulsing actor pulsing.serving.Router \ --addr 0.0.0.0:8000 \ + -- \ --http_port 8080 \ --model_name my-llm ``` | Flag | Description | |------|-------------| -| `--addr` | Actor system address (workers join here) | -| `--http_port` | OpenAI-compatible HTTP endpoint | -| `--model_name` | Model name in API responses | +| `--addr` (before `--`) | Actor system address (workers join here) | +| `--http_port`, `--model_name` (after `--`) | Router constructor: HTTP port, model name in API responses | --- @@ -75,25 +75,27 @@ Open **Terminal B**: ```bash pulsing actor pulsing.serving.TransformersWorker \ - --model_name gpt2 \ - --device cpu \ --addr 0.0.0.0:8001 \ - --seeds 127.0.0.1:8000 + --seeds 127.0.0.1:8000 \ + -- \ + --model_name gpt2 \ + --device cpu ``` === "vLLM (GPU)" ```bash pulsing actor pulsing.serving.VllmWorker \ - --model Qwen/Qwen2.5-0.5B \ --addr 0.0.0.0:8002 \ - --seeds 127.0.0.1:8000 + --seeds 127.0.0.1:8000 \ + -- \ + --model Qwen/Qwen2.5-0.5B ``` | Flag | Description | |------|-------------| -| `--model` / `--model_name` | Model name/path (TransformersWorker uses `--model_name`, VllmWorker uses `--model`) | -| `--seeds` | Router address to join cluster | +| `--addr`, `--seeds` (before `--`) | Actor-level: bind address, seed nodes | +| `--model` / `--model_name` (after `--`) | Constructor: model name/path | --- @@ -145,10 +147,10 @@ Add more workers to handle more load: ```bash # Terminal C -pulsing actor pulsing.serving.TransformersWorker --model_name gpt2 --addr 0.0.0.0:8003 --seeds 127.0.0.1:8000 +pulsing actor pulsing.serving.TransformersWorker --addr 0.0.0.0:8003 --seeds 127.0.0.1:8000 -- --model_name gpt2 # Terminal D 
-pulsing actor pulsing.serving.TransformersWorker --model_name gpt2 --addr 0.0.0.0:8004 --seeds 127.0.0.1:8000 +pulsing actor pulsing.serving.TransformersWorker --addr 0.0.0.0:8004 --seeds 127.0.0.1:8000 -- --model_name gpt2 ``` The Router automatically load-balances across all workers. @@ -159,7 +161,7 @@ The Router automatically load-balances across all workers. | Problem | Solution | |---------|----------| -| `No available workers` | Ensure workers use `--seeds ` | +| `No available workers` | Router looks for actors named `worker` by default. (1) Start workers with `--name worker` (before `--`). (2) Or start Router with `--worker_name ` (after `--`) to match. (3) Workers must use `--seeds `. Check: `pulsing inspect actors --seeds 127.0.0.1:8000` and ensure a `worker` (or your custom name) appears. | | Connection refused | Check router started with `--addr` | | Slow startup | First request loads model weights | diff --git a/docs/src/quickstart/llm_inference.zh.md b/docs/src/quickstart/llm_inference.zh.md index f38a56efb..2d4651602 100644 --- a/docs/src/quickstart/llm_inference.zh.md +++ b/docs/src/quickstart/llm_inference.zh.md @@ -55,15 +55,17 @@ pip install pulsing ```bash pulsing actor pulsing.serving.Router \ --addr 0.0.0.0:8000 \ + --name my-llm \ + -- \ --http_port 8080 \ - --model_name my-llm + --model_name gpt2 \ + --worker_name worker ``` | 参数 | 说明 | |------|------| -| `--addr` | Actor 系统地址(Worker 加入此地址) | -| `--http_port` | OpenAI 兼容 HTTP 端点 | -| `--model_name` | API 响应中的模型名称 | +| `--addr`、`--name`(`--` 前) | Actor 系统地址、Router 名称 | +| `--http_port`、`--model_name`、`--worker_name`(`--` 后) | Router 构造参数:HTTP 端口、API 模型名、目标 worker 名 | --- @@ -75,25 +77,28 @@ pulsing actor pulsing.serving.Router \ ```bash pulsing actor pulsing.serving.TransformersWorker \ - --model_name gpt2 \ - --device cpu \ --addr 0.0.0.0:8001 \ - --seeds 127.0.0.1:8000 + --seeds 127.0.0.1:8000 \ + --name worker \ + -- \ + --model_name gpt2 ``` === "vLLM (GPU)" ```bash pulsing actor 
pulsing.serving.VllmWorker \ - --model Qwen/Qwen2.5-0.5B \ --addr 0.0.0.0:8002 \ - --seeds 127.0.0.1:8000 + --seeds 127.0.0.1:8000 \ + --name worker \ + -- \ + --model Qwen/Qwen2.5-0.5B ``` | 参数 | 说明 | |------|------| -| `--model` / `--model_name` | 模型名称/路径(TransformersWorker 用 `--model_name`,VllmWorker 用 `--model`) | -| `--seeds` | 加入集群的 Router 地址 | +| `--addr`、`--seeds`(`--` 前) | actor 级:绑定地址、种子节点 | +| `--model` / `--model_name`(`--` 后) | 构造参数:模型名称/路径 | --- @@ -116,25 +121,17 @@ pulsing inspect cluster --seeds 127.0.0.1:8000 ### 非流式 ```bash -curl -s http://localhost:8080/v1/chat/completions \ +curl -X POST http://localhost:8080/v1/chat/completions \ -H "Content-Type: application/json" \ - -d '{ - "model": "my-llm", - "messages": [{"role": "user", "content": "Hello"}], - "stream": false - }' + -d '{"model": "gpt2", "messages": [{"role": "user", "content": "Hello"}], "stream": false}' ``` ### 流式 (SSE) ```bash -curl -N http://localhost:8080/v1/chat/completions \ +curl -N -X POST http://localhost:8080/v1/chat/completions \ -H "Content-Type: application/json" \ - -d '{ - "model": "my-llm", - "messages": [{"role": "user", "content": "讲个笑话"}], - "stream": true - }' + -d '{"model": "gpt2", "messages": [{"role": "user", "content": "讲个笑话"}], "stream": true}' ``` --- @@ -145,10 +142,10 @@ curl -N http://localhost:8080/v1/chat/completions \ ### 扩展:增加 Worker 添加更多 Worker 处理更多负载: ```bash # 终端 C -pulsing actor pulsing.serving.TransformersWorker --model_name gpt2 --addr 0.0.0.0:8003 --seeds 127.0.0.1:8000 +pulsing actor pulsing.serving.TransformersWorker --addr 0.0.0.0:8003 --seeds 127.0.0.1:8000 -- --model_name gpt2 # 终端 D -pulsing actor pulsing.serving.TransformersWorker --model_name gpt2 --addr 0.0.0.0:8004 --seeds 127.0.0.1:8000 +pulsing actor pulsing.serving.TransformersWorker --addr 0.0.0.0:8004 --seeds 127.0.0.1:8000 -- --model_name gpt2 ``` Router 会自动在所有 Worker 间负载均衡。 @@ -159,7 +156,7 @@ Router 会自动在所有 Worker 间负载均衡。 ## 故障排除 | 问题 | 解决方案 | |------|----------| -| `No available workers` | 确保 Worker 使用 `--seeds ` | +| `No 
available workers` | Router 默认按名字 `worker` 查找。需 (1) Worker 用 `--name worker`(在 `--` 前)启动,或 (2) Router 用 `--worker_name <名字>`(在 `--` 后)与 Worker 一致;(3) Worker 必须 `--seeds `。可执行 `pulsing inspect actors --seeds 127.0.0.1:8000` 确认是否有名为 `worker` 的 actor。 | | 连接被拒绝 | 检查 Router 是否以 `--addr` 启动 | | 启动慢 | 首次请求需要加载模型权重 | diff --git a/llms.binding.md b/llms.binding.md index a2c22ac19..f74c36560 100644 --- a/llms.binding.md +++ b/llms.binding.md @@ -91,7 +91,7 @@ result = await proxy.any_method(args) `pul.mount` registers any Python object as a Pulsing actor, enabling tight integration between Ray actors and Pulsing. -**Running Pulsing in a Ray cluster:** Every process (driver and workers) must initialize Pulsing. Use `pulsing.ray.init_in_ray()` and pass it in `ray.init(runtime_env={"worker_process_setup_hook": init_in_ray})` so workers call it on startup; the driver must call `init_in_ray()` once in code. See the `pulsing.ray` module for details. +**Running Pulsing in a Ray cluster:** Use `pulsing.bootstrap(ray=True, torchrun=False, wait_timeout=...)` in the driver; set `ray.init(runtime_env={"worker_process_setup_hook": init_in_ray})` so every worker runs `init_in_ray` on startup. See [Cluster Networking](docs/src/quickstart/cluster_networking.md) and `pulsing.bootstrap` for details. ```python import pulsing as pul diff --git a/python/pulsing/__init__.py b/python/pulsing/__init__.py index 55a410fc0..7513aed5f 100644 --- a/python/pulsing/__init__.py +++ b/python/pulsing/__init__.py @@ -72,6 +72,29 @@ def cleanup_ray(): return cleanup() +# torchrun / torch.distributed integration (lazy import) +def init_inside_torchrun(): + """Initialize Pulsing in current process and join cluster via torch.distributed. + + Rank 0 becomes the seed; others join with seeds=[rank0_addr]. Call after + torch.distributed.init_process_group() (e.g. when launched with torchrun). + + Usage:: + + import torch.distributed as dist + dist.init_process_group(...) 
+ system = pul.init_inside_torchrun() + """ + from pulsing.integrations.torchrun import init_in_torchrun + + return init_in_torchrun() + + +# Bootstrap: single API — pulsing.bootstrap(ray=..., torchrun=..., on_ready=..., wait_timeout=...) +from pulsing.bootstrap import bootstrap, stop as bootstrap_stop # noqa: E402 + +bootstrap.stop = bootstrap_stop + # Import exceptions from pulsing.exceptions import ( PulsingError, @@ -248,6 +271,8 @@ async def refer(actorid: ActorId | str) -> ActorRef: if isinstance(actorid, str): # Parse string to ActorId actorid = ActorId.from_str(actorid) + if isinstance(actorid, int): + actorid = ActorId(actorid) return await system.refer(actorid) @@ -313,6 +338,10 @@ async def read(self, topic, **kwargs): # Ray integration "init_inside_ray", "cleanup_ray", + # torchrun integration + "init_inside_torchrun", + # Bootstrap (auto cluster in background, wait_ready() for callers) + "bootstrap", # Types "Actor", "ActorSystem", diff --git a/python/pulsing/bootstrap.py b/python/pulsing/bootstrap.py new file mode 100644 index 000000000..5e64ca074 --- /dev/null +++ b/python/pulsing/bootstrap.py @@ -0,0 +1,174 @@ +""" +pulsing.bootstrap - Auto cluster formation; single API. + +Background thread tries to form/join the Pulsing cluster using the chosen backend(s). +Call bootstrap(ray=..., torchrun=..., on_ready=callback, wait_timeout=...) — one standard API. + +若不传 ray / torchrun,默认两者都为 True,即先试 Ray 再试 torchrun,相当于自动检测当前环境。 + +Usage: + from pulsing import bootstrap + + bootstrap() # 不传则自动检测:两个都试(默认) + bootstrap(ray=True, torchrun=False) # only try Ray + bootstrap(ray=False, torchrun=True) # only try torchrun + bootstrap(on_ready=lambda system: ...) # start + callback when ready + if bootstrap(wait_timeout=30): ... 
# start + block until ready +""" + +from __future__ import annotations + +import logging +import threading +import time +from typing import Callable + +logger = logging.getLogger(__name__) + +_bootstrap_thread: threading.Thread | None = None +_stop = threading.Event() +_on_ready_callbacks: list[Callable[..., None]] = [] +_lock = threading.Lock() + + +def _try_init_once(*, ray: bool, torchrun: bool) -> bool: + """Try to initialize Pulsing with the given backends. Return True if now initialized.""" + from pulsing.core import is_initialized + + if is_initialized(): + return True + + if ray: + try: + from pulsing.integrations.ray import init_in_ray + + init_in_ray() + if is_initialized(): + logger.debug("Pulsing bootstrap: initialized via init_in_ray") + return True + except Exception as e: + logger.debug("Pulsing bootstrap: init_in_ray skipped: %s", e) + + if torchrun: + try: + from pulsing.integrations.torchrun import init_in_torchrun + + init_in_torchrun() + if is_initialized(): + logger.debug("Pulsing bootstrap: initialized via init_in_torchrun") + return True + except Exception as e: + logger.debug("Pulsing bootstrap: init_in_torchrun skipped: %s", e) + + return False + + +def _fire_on_ready() -> None: + from pulsing.core import get_system, is_initialized + + with _lock: + callbacks = _on_ready_callbacks.copy() + _on_ready_callbacks.clear() + if not is_initialized(): + return + system = get_system() + for cb in callbacks: + try: + try: + cb(system) + except TypeError: + cb() + except Exception as e: + logger.warning("bootstrap on_ready callback failed: %s", e) + + +def _bootstrap_loop( + interval_sec: float, + *, + ray: bool, + torchrun: bool, +) -> None: + while not _stop.wait(timeout=interval_sec): + if _try_init_once(ray=ray, torchrun=torchrun): + _fire_on_ready() + return + + +def _start( + interval_sec: float = 2.0, + *, + ray: bool = True, + torchrun: bool = True, +) -> None: + global _bootstrap_thread + if _bootstrap_thread is not None and 
_bootstrap_thread.is_alive(): + return + _stop.clear() + _bootstrap_thread = threading.Thread( + target=_bootstrap_loop, + args=(interval_sec,), + kwargs={"ray": ray, "torchrun": torchrun}, + name="pulsing-bootstrap", + daemon=True, + ) + _bootstrap_thread.start() + logger.debug( + "Pulsing bootstrap thread started (ray=%s, torchrun=%s)", ray, torchrun + ) + + +def bootstrap( + *, + ray: bool = True, + torchrun: bool = True, + on_ready: Callable[..., None] | None = None, + interval_sec: float = 2.0, + wait_timeout: float | None = None, +) -> bool | None: + """ + Start auto cluster formation in a Ray or torchrun environment. + + Use init(addr=..., seeds=...) (or init() for standalone) when you have explicit + config; use bootstrap() only when this process is inside Ray/torchrun and you + want Pulsing to auto-join. If the system is already initialized (e.g. you + called init() earlier), bootstrap() only runs on_ready if provided. + + - ray: if True, try init_in_ray (default True). 不传则与 torchrun 一起默认 True,即自动检测(两个都试)。 + - torchrun: if True, try init_in_torchrun (default True). + - bootstrap() — 不传 ray/torchrun 时默认两个都试,相当于自动检测环境;start background only. + - bootstrap(ray=False, torchrun=True) — only try torchrun. + - bootstrap(on_ready=callback) — call callback when cluster is ready. + Callback can be () -> None or (system: ActorSystem) -> None. + - bootstrap(wait_timeout=30) — block until ready or timeout; returns True/False. + + Returns True if wait_timeout was set and cluster became ready in time; + False if wait_timeout was set and timed out; None if wait_timeout was not set. 
+ """ + from pulsing.core import is_initialized + + if on_ready is not None: + with _lock: + _on_ready_callbacks.append(on_ready) + if is_initialized(): + _fire_on_ready() + + _start(interval_sec, ray=ray, torchrun=torchrun) + + if wait_timeout is None: + return None + + deadline = time.monotonic() + wait_timeout + while True: + if is_initialized(): + return True + if time.monotonic() >= deadline: + return False + time.sleep(0.2) + + +def stop() -> None: + """Stop the bootstrap background loop.""" + _stop.set() + + +__all__ = ["bootstrap", "stop"] diff --git a/python/pulsing/cli/__main__.py b/python/pulsing/cli/__main__.py index 3baf04f48..0bcbb1743 100644 --- a/python/pulsing/cli/__main__.py +++ b/python/pulsing/cli/__main__.py @@ -9,7 +9,8 @@ def actor( addr: str | None = None, seeds: str | None = None, name: str = "worker", # Actor name (default: "worker") - **kwargs, # Additional arguments for Actor constructor + extra_kwargs: dict + | None = None, # Additional arguments for Actor constructor (--key value from CLI) ): r""" Start an Actor-based service. @@ -23,32 +24,32 @@ def actor( - Example: 'pulsing.serving.VllmWorker' - Example: 'my_module.my_actor.MyCustomActor' - Pass constructor parameters directly as command-line arguments. - The CLI will automatically match parameters to the Actor's constructor signature. + Parameter separation (avoids name collision): + - Actor-level (process/cluster): --addr, --seeds, --name. Pass before \"--\". + - Actor constructor args: pass after \"--\" so they never collide with --addr/--seeds/--name. Note: To list actors, use 'pulsing inspect actors' instead. Args: - actor_type: Full class path (positional argument), e.g., 'pulsing.serving.worker.TransformersWorker' - addr: Actor System bind address (e.g., '0.0.0.0:8000') - seeds: Comma-separated list of seed nodes (e.g., '192.168.1.1:8000,192.168.1.2:8000') - name: Actor name. Default: 'worker'. Use different names to run multiple workers in the same cluster. 
- **kwargs: Additional arguments matching the Actor's constructor parameters. - Pass parameters directly as command-line arguments, e.g., --model_name gpt2 --device cpu + actor_type: Full class path (positional), e.g. pulsing.serving.Router + addr: Bind address (e.g. 0.0.0.0:8000) + seeds: Comma-separated seed nodes (e.g. 192.168.1.1:8000,192.168.1.2:8000) + name: Actor name (default: worker). Use different names for multiple workers. + extra_kwargs: Constructor arguments. Pass after \"--\" or via -D actor.extra_kwargs='{...}'. Examples: - # Start a Transformers worker - pulsing actor pulsing.serving.TransformersWorker --model_name gpt2 --device cpu --name my-worker + # Actor-level before \"--\", constructor args after (no collision) + pulsing actor pulsing.serving.Router --addr 0.0.0.0:8000 --name my-llm -- --model_name my-llm --http_port 8080 - # Start a vLLM worker - pulsing actor pulsing.serving.VllmWorker --model Qwen/Qwen2 --role aggregated --max_new_tokens 512 --name vllm-worker + # Start a Transformers worker + pulsing actor pulsing.serving.TransformersWorker --addr 0.0.0.0:8000 -- --model_name gpt2 --device cpu - # Start a Router with OpenAI-compatible API - pulsing actor pulsing.serving.Router --http_host 0.0.0.0 --http_port 8080 --model_name my-llm --worker_name worker + # Start a Router (constructor args after --) + pulsing actor pulsing.serving.Router --addr 0.0.0.0:8000 --name my-llm -- --http_host 0.0.0.0 --http_port 8080 --model_name my-llm --worker_name worker - # Start multiple workers with different names - pulsing actor pulsing.serving.TransformersWorker --model_name gpt2 --name worker-1 --seeds 127.0.0.1:8000 - pulsing actor pulsing.serving.TransformersWorker --model_name gpt2 --name worker-2 --seeds 127.0.0.1:8000 + # Multiple workers + pulsing actor pulsing.serving.TransformersWorker --name worker-1 --seeds 127.0.0.1:8000 -- --model_name gpt2 + pulsing actor pulsing.serving.TransformersWorker --name worker-2 --seeds 127.0.0.1:8000 -- 
--model_name gpt2 """ from .actors import start_generic_actor @@ -73,13 +74,19 @@ def actor( if seeds: seed_list = [s.strip() for s in seeds.split(",") if s.strip()] + # extra_kwargs may be str when passed via -D actor.extra_kwargs='{...}' + import json + + kwargs = extra_kwargs or {} + if isinstance(kwargs, str): + kwargs = json.loads(kwargs) if kwargs.strip() else {} # Start generic Actor class start_generic_actor( actor_type=actor_type, addr=addr, seeds=seed_list, name=name, - extra_kwargs=kwargs, # All additional CLI arguments + extra_kwargs=kwargs, ) @@ -306,11 +313,50 @@ def examples(name: str | None = None): print(f"Quick run:\n python -m pulsing.examples.{name}") +def _collect_key_value_pairs(tokens: list[str]) -> dict: + """Collect --key value pairs from token list into a dict. Keys normalized to snake_case.""" + extra = {} + i = 0 + while i < len(tokens): + a = tokens[i] + if a.startswith("--") and not a.startswith("---") and len(a) > 2: + key = a[2:].replace("-", "_") + if i + 1 < len(tokens) and not tokens[i + 1].startswith("-"): + extra[key] = tokens[i + 1] + i += 2 + continue + i += 1 + return extra + + +def _actor_argv_rewrite(argv: list[str]) -> list[str]: + """Pre-parse 'pulsing actor ...' argv: split on \"--\" only. + + - Before \"--\": entire token list is passed through to the actor subcommand (no name-based parsing). + - After \"--\": collected as --key value and injected as actor.extra_kwargs (constructor args). + - If there is no \"--\", argv is left unchanged. 
+ """ + import json + + if len(argv) < 2 or argv[1] != "actor": + return argv + rest = argv[2:] + if "--" not in rest: + return argv + dash_idx = rest.index("--") + before, after = rest[:dash_idx], rest[dash_idx + 1 :] + extra = _collect_key_value_pairs(after) + if not extra: + return argv + return ( + [argv[0], "actor"] + before + ["-D", f"actor.extra_kwargs={json.dumps(extra)}"] + ) + + def main(): import sys # Make `pulsing examples ` work with positional arguments - # hp framework treats params with default values as --name options, so we convert here if ( len(sys.argv) >= 3 and sys.argv[1] == "examples" @@ -318,6 +364,9 @@ def main(): ): sys.argv = [sys.argv[0], "examples", "--name", sys.argv[2]] + sys.argv[3:] + # Pre-parse 'actor' so --model_name my-llm etc. become actor.extra_kwargs (avoids required kwargs positional) + sys.argv = _actor_argv_rewrite(sys.argv) + hp.launch() diff --git a/python/pulsing/cli/actor_loader.py b/python/pulsing/cli/actor_loader.py index 001073951..ffb85f461 100644 --- a/python/pulsing/cli/actor_loader.py +++ b/python/pulsing/cli/actor_loader.py @@ -1,24 +1,23 @@ -"""Generic Actor loader - dynamically load and instantiate Actor classes""" +"""Generic Actor loader - dynamically load Actor or @remote classes""" import importlib -import json -from typing import Any +from typing import Any, Union from pulsing.core import Actor -def load_actor_class(class_path: str) -> type[Actor]: - """Load Actor class from module path +def load_actor_class(class_path: str) -> Union[type[Actor], Any]: + """Load Actor 或 @remote 类 Args: - class_path: Full class path, e.g., 'pulsing.serving.worker.TransformersWorker' + class_path: 完整类路径,如 'pulsing.serving.worker.TransformersWorker' Returns: - Actor class + Actor 子类或带 .spawn 的 @remote 类 Raises: - ImportError: If module or class cannot be imported - ValueError: If class is not an Actor subclass + ImportError: 模块/类无法导入 + ValueError: 既非 Actor 子类也非 @remote 类(无 .spawn) """ if "." 
not in class_path: raise ValueError( @@ -26,7 +25,6 @@ def load_actor_class(class_path: str) -> type[Actor]: f"Example: pulsing.serving.worker.TransformersWorker" ) - # Split module path and class name parts = class_path.rsplit(".", 1) if len(parts) != 2: raise ValueError( @@ -36,7 +34,6 @@ def load_actor_class(class_path: str) -> type[Actor]: module_path, class_name = parts try: - # Import module module = importlib.import_module(module_path) except ImportError as e: raise ImportError( @@ -44,7 +41,6 @@ def load_actor_class(class_path: str) -> type[Actor]: f"Make sure the module is installed and the path is correct." ) from e - # Get class from module if not hasattr(module, class_name): raise AttributeError( f"Class '{class_name}' not found in module '{module_path}'.\n" @@ -53,11 +49,13 @@ def load_actor_class(class_path: str) -> type[Actor]: actor_class = getattr(module, class_name) - # Verify it's an Actor subclass - if not isinstance(actor_class, type) or not issubclass(actor_class, Actor): - raise ValueError( - f"'{class_name}' is not an Actor subclass.\n" - f"Expected a class that inherits from pulsing.core.Actor" - ) + # Actor 子类 或 @remote 类(有 .spawn) + if isinstance(actor_class, type) and issubclass(actor_class, Actor): + return actor_class + if hasattr(actor_class, "spawn") and callable(getattr(actor_class, "spawn")): + return actor_class - return actor_class + raise ValueError( + f"'{class_name}' is not an Actor subclass nor a @remote class (no .spawn).\n" + f"Use pulsing.core.Actor or @pulsing.remote." 
+ ) diff --git a/python/pulsing/cli/actors.py b/python/pulsing/cli/actors.py index 1278ddc98..d48ec343e 100644 --- a/python/pulsing/cli/actors.py +++ b/python/pulsing/cli/actors.py @@ -33,12 +33,15 @@ def start_generic_actor( print(f"Error: {e}") return - print(f" Class: {actor_class.__name__}") - print(f" Module: {actor_class.__module__}") + # @remote 返回的是 ActorClass 包装,取原始类用于 __name__ / __module__ / signature + cls_for_display = getattr(actor_class, "_cls", actor_class) - # Get Actor constructor signature + print(f" Class: {cls_for_display.__name__}") + print(f" Module: {cls_for_display.__module__}") + + # Get Actor constructor signature(用原始类) try: - actor_sig = inspect.signature(actor_class.__init__) + actor_sig = inspect.signature(cls_for_display.__init__) except Exception as e: print(f"Error: Cannot inspect Actor constructor: {e}") return @@ -104,38 +107,71 @@ def start_generic_actor( if addr: print(f" Address: {addr}") + is_remote = hasattr(actor_class, "spawn") and callable( + getattr(actor_class, "spawn") + ) + async def run(): - try: - # Instantiate Actor - actor_instance = actor_class(**constructor_kwargs) - except TypeError as e: - print(f"\nError: Failed to instantiate {actor_class.__name__}") - print(f" {e}") - print("\nHint: Check the constructor signature and provided parameters.") - required_params = [ - name - for name, param in actor_sig.parameters.items() - if name != "self" and param.default == inspect.Parameter.empty - ] - if required_params: - print(f" Required parameters: {', '.join(required_params)}") - print(f" All constructor parameters: {', '.join(actor_params)}") - print(f" Provided parameters: {', '.join(constructor_kwargs.keys())}") - return - except Exception as e: - print(f"\nError: Failed to create Actor instance: {e}") - import traceback - - traceback.print_exc() - return - - # Spawn and run - await spawn_and_run( - actor_instance, - name=name, - addr=addr, - seeds=seeds if seeds else None, - public=True, - ) + if is_remote: + # 
@remote 类:init 后直接 spawn(name=..., **constructor_kwargs) + from pulsing.core import init + from pulsing.core.helpers import run_until_signal + + system = await init(addr=addr, seeds=seeds if seeds else None) + try: + proxy = await actor_class.spawn( + name=name, + public=True, + **constructor_kwargs, + ) + except TypeError as e: + print(f"\nError: Failed to spawn {cls_for_display.__name__}") + print(f" {e}") + print( + "\nHint: Check the constructor signature and provided parameters." + ) + required_params = [ + n + for n, p in actor_sig.parameters.items() + if n != "self" and p.default == inspect.Parameter.empty + ] + if required_params: + print(f" Required parameters: {', '.join(required_params)}") + return + print(f"[{name}] Started at {system.addr}") + await run_until_signal(name) + else: + # Actor 子类:实例化后 spawn_and_run + try: + actor_instance = actor_class(**constructor_kwargs) + except TypeError as e: + print(f"\nError: Failed to instantiate {cls_for_display.__name__}") + print(f" {e}") + print( + "\nHint: Check the constructor signature and provided parameters." 
+ ) + required_params = [ + name + for name, param in actor_sig.parameters.items() + if name != "self" and param.default == inspect.Parameter.empty + ] + if required_params: + print(f" Required parameters: {', '.join(required_params)}") + print(f" All constructor parameters: {', '.join(actor_params)}") + print(f" Provided parameters: {', '.join(constructor_kwargs.keys())}") + return + except Exception as e: + print(f"\nError: Failed to create Actor instance: {e}") + import traceback + + traceback.print_exc() + return + await spawn_and_run( + actor_instance, + name=name, + addr=addr, + seeds=seeds if seeds else None, + public=True, + ) uvloop.run(run()) diff --git a/python/pulsing/core/remote.py b/python/pulsing/core/remote.py index de8fe3fb3..16f22457f 100644 --- a/python/pulsing/core/remote.py +++ b/python/pulsing/core/remote.py @@ -594,13 +594,21 @@ def _inject_delayed(self, actor_ref: ActorRef) -> None: actor_ref, delay_sec ) - def on_start(self, actor_id) -> None: + def on_start(self, actor_id): + """调用用户 on_start;若为 async 则返回 coroutine 供 Rust 端 run_coroutine_threadsafe 执行。""" if hasattr(self._instance, "on_start"): - self._instance.on_start(actor_id) + r = self._instance.on_start(actor_id) + if asyncio.iscoroutine(r): + return r + return None - def on_stop(self) -> None: + def on_stop(self): + """调用用户 on_stop;若为 async 则返回 coroutine 供 Rust 端执行。""" if hasattr(self._instance, "on_stop"): - self._instance.on_stop() + r = self._instance.on_stop() + if asyncio.iscoroutine(r): + return r + return None def metadata(self) -> dict[str, str]: if hasattr(self._instance, "metadata") and callable(self._instance.metadata): diff --git a/python/pulsing/integrations/ray.py b/python/pulsing/integrations/ray.py index 4978ea131..693b2e325 100644 --- a/python/pulsing/integrations/ray.py +++ b/python/pulsing/integrations/ray.py @@ -1,15 +1,19 @@ """ pulsing.ray - Initialize Pulsing in Ray cluster -Each Ray worker process can call init_in_ray() to start Pulsing and auto-join the 
cluster. +For automatic cluster formation, use pulsing.bootstrap(ray=True, torchrun=False) (or +pul.bootstrap() to try both Ray and torchrun). This module provides init_in_ray(), +used by bootstrap and as worker_process_setup_hook so each Ray worker initializes Pulsing. + Uses Ray's internal KV store to coordinate seed node discovery. Recommended usage: import ray from pulsing.integrations.ray import init_in_ray + import pulsing as pul ray.init(runtime_env={"worker_process_setup_hook": init_in_ray}) - init_in_ray() # driver process also needs initialization + pul.bootstrap(ray=True, torchrun=False, wait_timeout=30) # driver """ try: diff --git a/python/pulsing/integrations/torchrun.py b/python/pulsing/integrations/torchrun.py new file mode 100644 index 000000000..bc58b7deb --- /dev/null +++ b/python/pulsing/integrations/torchrun.py @@ -0,0 +1,150 @@ +""" +pulsing.torchrun - Initialize Pulsing in torchrun / torch.distributed + +For automatic cluster formation, use pulsing.bootstrap(ray=False, torchrun=True). +This module provides init_in_torchrun(), used by bootstrap. + +Rank 0 starts Pulsing and broadcasts its listen address to other ranks via +torch.distributed.broadcast_object_list(); others join with seeds=[rank0_addr]. + +Requires torch.distributed.init_process_group() to be called first (e.g. by torchrun). + +Usage: + # Recommended: use bootstrap in your script (launched with torchrun) + import pulsing as pul + pul.bootstrap(ray=False, torchrun=True, wait_timeout=30) + + # Or call init_in_torchrun directly after init_process_group + import torch.distributed as dist + from pulsing.integrations.torchrun import init_in_torchrun + dist.init_process_group(...) + init_in_torchrun() # rank 0 becomes seed, others join +""" + +from __future__ import annotations + +import os +import threading +from typing import TYPE_CHECKING + +try: + import torch.distributed as dist +except ImportError: + raise ImportError( + "pulsing.integrations.torchrun requires PyTorch. 
Install with: pip install torch" + ) + +import asyncio + +if TYPE_CHECKING: + from pulsing.core import ActorSystem + + +# Reuse async init helpers (same pattern as ray integration) +def _start_background_loop(): + """Start background event loop for sync init.""" + global _loop, _thread + if _thread is not None: + return + + ready = threading.Event() + + def _run(): + global _loop + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + _loop = loop + ready.set() + loop.run_forever() + + _thread = threading.Thread(target=_run, daemon=True, name="pulsing-torchrun-loop") + _thread.start() + ready.wait() + + +_loop = None +_thread = None + + +def _run_sync(coro): + """Run async init in background loop.""" + _start_background_loop() + fut = asyncio.run_coroutine_threadsafe(coro, _loop) + return fut.result(timeout=60) + + +async def _do_init(addr: str, seeds: list[str] | None = None): + from pulsing.core import init + + return await init(addr=addr, seeds=seeds) + + +def _get_master_addr() -> str: + return os.environ.get("MASTER_ADDR", "127.0.0.1") + + +def init_in_torchrun() -> ActorSystem: + """Initialize Pulsing in current process and join cluster using torch.distributed. + + Rank 0 binds to 0.0.0.0:0, then broadcasts MASTER_ADDR:port to all ranks. + Other ranks receive the seed address and init with seeds=[seed_addr]. + + Must be called after torch.distributed.init_process_group() (e.g. under torchrun). + """ + if not dist.is_initialized(): + raise RuntimeError( + "torch.distributed not initialized. Call torch.distributed.init_process_group() first (e.g. use torchrun)." + ) + + rank = dist.get_rank() + master_addr = _get_master_addr() + + if rank == 0: + # Rank 0: start Pulsing, get bound port, advertise MASTER_ADDR:port + system = _run_sync(_do_init("0.0.0.0:0")) + bound = str(system.addr) + # bound is e.g. 
"0.0.0.0:12345"; advertise as MASTER_ADDR:12345 + port = bound.split(":")[-1] + seed_addr = f"{master_addr}:{port}" + object_list = [seed_addr] + else: + object_list = [None] + + dist.broadcast_object_list(object_list, src=0) + seed_addr = object_list[0] + + if rank == 0: + return system + + # Non-rank0: join with seed + return _run_sync(_do_init("0.0.0.0:0", seeds=[seed_addr])) + + +async def async_init_in_torchrun() -> ActorSystem: + """Initialize Pulsing under torch.distributed (async version).""" + if not dist.is_initialized(): + raise RuntimeError( + "torch.distributed not initialized. Call torch.distributed.init_process_group() first." + ) + + rank = dist.get_rank() + master_addr = _get_master_addr() + + if rank == 0: + system = await _do_init("0.0.0.0:0") + bound = str(system.addr) + port = bound.split(":")[-1] + seed_addr = f"{master_addr}:{port}" + object_list = [seed_addr] + else: + object_list = [None] + + dist.broadcast_object_list(object_list, src=0) + seed_addr = object_list[0] + + if rank == 0: + return system + return await _do_init("0.0.0.0:0", seeds=[seed_addr]) + + +__all__ = ["init_in_torchrun", "async_init_in_torchrun"] diff --git a/python/pulsing/serving/router.py b/python/pulsing/serving/router.py index d17dfcc1d..dfba1a920 100644 --- a/python/pulsing/serving/router.py +++ b/python/pulsing/serving/router.py @@ -8,7 +8,7 @@ from aiohttp import web -from pulsing.core import Actor, ActorId, ActorSystem, Message, get_system +from pulsing.core import ActorId, ActorSystem, get_system, remote @dataclass @@ -196,13 +196,15 @@ async def _sync_generate( created = int(time.time()) try: - msg = Message.from_json( - "GenerateRequest", {"prompt": prompt, "max_new_tokens": max_tokens} - ) - result = (await worker_ref.ask(msg)).to_json() - text = result.get("text", "") - prompt_tokens = result.get("prompt_tokens", 0) - completion_tokens = result.get("completion_tokens", 0) + worker = worker_ref.as_any() + result = await worker.generate(prompt=prompt, 
max_new_tokens=max_tokens) + if isinstance(result, dict) and "error" in result: + text = f"[Error: {result['error']}]" + prompt_tokens = completion_tokens = 0 + else: + text = result.get("text", "") + prompt_tokens = result.get("prompt_tokens", 0) + completion_tokens = result.get("completion_tokens", 0) except Exception as e: text = f"[Error: {e}]" prompt_tokens = completion_tokens = 0 @@ -244,26 +246,16 @@ async def _stream_generate( obj_type = "chat.completion.chunk" if is_chat else "text_completion" try: - req_msg = Message.from_json( - "GenerateStreamRequest", - {"prompt": prompt, "max_new_tokens": max_tokens}, - ) - stream_message = await worker_ref.ask(req_msg) - - # Check if returned message is a stream message - if not stream_message.is_stream: - # If not stream message, might be error message - error_data = stream_message.to_json() - error_msg = error_data.get("error", "Unknown error") - await stream_response.write( - f"data: {json.dumps({'error': error_msg})}\n\n".encode() - ) - await stream_response.write(b"data: [DONE]\n\n") - return stream_response - - reader = stream_message.stream_reader() - - async for chunk in reader: + worker = worker_ref.as_any() + stream = worker.generate_stream(prompt=prompt, max_new_tokens=max_tokens) + + async for chunk in stream: + if isinstance(chunk, dict) and chunk.get("error"): + await stream_response.write( + f"data: {json.dumps({'error': chunk['error']})}\n\n".encode() + ) + await stream_response.write(b"data: [DONE]\n\n") + return stream_response try: finish_reason = chunk.get("finish_reason") text = chunk.get("text", "") @@ -427,35 +419,18 @@ async def stop_router(runner: web.AppRunner): print("[Router] HTTP server stopped") -class Router(Actor): - """Router Actor - OpenAI-compatible HTTP API router as an Actor +@remote +class Router: + """Router - OpenAI 兼容 HTTP API 路由,通过 pulsing.remote 暴露 health_check / get_config。 - This actor wraps the start_router/stop_router functions to provide - a CLI-compatible entry point 
via `pulsing actor pulsing.serving.Router`. + 包装 start_router/stop_router,支持 CLI:pulsing actor pulsing.serving.Router。 Args: - http_host: HTTP listen address (default: "0.0.0.0") - http_port: HTTP listen port (default: 8080) - model_name: Model name for API responses (default: "pulsing-model") - worker_name: Worker actor name to route requests to (default: "worker") - scheduler_type: Scheduler type, supports: - - "stream_load": Stream load-aware (default, recommended) - - "random": Random - - "round_robin": Round robin - - "power_of_two": Power-of-Two Choices - - "cache_aware": Cache-aware - - Example: - # Start via CLI - pulsing actor pulsing.serving.Router \\ - --http_host 0.0.0.0 \\ - --http_port 8080 \\ - --model_name my-llm \\ - --worker_name worker - - # Or programmatically - router = Router(http_port=8080, model_name="my-llm") - await system.spawn(router, name="router", public=True) + http_host: HTTP 监听地址 (default: "0.0.0.0") + http_port: HTTP 监听端口 (default: 8080) + model_name: API 响应中的模型名 (default: "pulsing-model") + worker_name: 路由目标 worker 名称 (default: "worker") + scheduler_type: 调度策略,支持 stream_load / random / round_robin / power_of_two / cache_aware """ def __init__( @@ -464,7 +439,7 @@ def __init__( http_port: int = 8080, model_name: str = "pulsing-model", worker_name: str = "worker", - scheduler_type: str = "stream_load", + scheduler_type: str = "round_robin", ): self.http_host = http_host self.http_port = http_port @@ -517,27 +492,20 @@ def metadata(self) -> dict[str, str]: "scheduler_type": self.scheduler_type, } - async def receive(self, msg: Message) -> Message | None: - """Handle diagnostic messages""" - if msg.msg_type == "HealthCheck": - return Message.from_json( - "Ok", - { - "status": "healthy", - "http_port": self.http_port, - "model_name": self.model_name, - }, - ) - elif msg.msg_type == "GetConfig": - return Message.from_json( - "Config", - { - "http_host": self.http_host, - "http_port": self.http_port, - "model_name": self.model_name, - 
"worker_name": self.worker_name, - "scheduler_type": self.scheduler_type, - }, - ) - else: - return Message.from_json("Error", {"error": f"Unknown: {msg.msg_type}"}) + def health_check(self) -> dict: + """健康检查。""" + return { + "status": "healthy", + "http_port": self.http_port, + "model_name": self.model_name, + } + + def get_config(self) -> dict: + """路由配置。""" + return { + "http_host": self.http_host, + "http_port": self.http_port, + "model_name": self.model_name, + "worker_name": self.worker_name, + "scheduler_type": self.scheduler_type, + } diff --git a/python/pulsing/serving/scheduler.py b/python/pulsing/serving/scheduler.py index 751cb2a53..b5672e50b 100644 --- a/python/pulsing/serving/scheduler.py +++ b/python/pulsing/serving/scheduler.py @@ -17,6 +17,8 @@ from abc import ABC, abstractmethod from typing import Any +import pulsing + # Import Rust policies if available try: from pulsing._core import ( @@ -53,7 +55,7 @@ def __init__(self, actor_system, worker_name: str = "worker"): async def get_available_workers(self): try: return await self._system.get_named_instances(self._worker_name) - except Exception: + except Exception as e: return [] async def get_worker_count(self) -> int: @@ -109,8 +111,7 @@ async def select_worker( async with self._lock: self._index = (self._index + 1) % len(workers) selected_worker = workers[self._index] - - return await self._resolve_worker(node_id=selected_worker.get("node_id")) + return await pulsing.refer(selected_worker.get("actor_id")) class RandomScheduler(Scheduler): @@ -128,7 +129,7 @@ async def select_worker( return None selected_worker = random.choice(workers) - return await self._resolve_worker(node_id=selected_worker.get("node_id")) + return await pulsing.refer(selected_worker.get("actor_id")) class LeastConnectionScheduler(Scheduler): @@ -154,7 +155,7 @@ async def select_worker( node_id = selected_worker.get("node_id") self._request_counts[node_id] = self._request_counts.get(node_id, 0) + 1 - return await 
self._resolve_worker(node_id=node_id) + return await pulsing.refer(selected_worker.get("actor_id")) # ============================================================================ @@ -224,7 +225,7 @@ async def select_worker( return None selected_worker = workers[selected_idx] - return await self._resolve_worker(node_id=selected_worker.get("node_id")) + return await pulsing.refer(selected_worker.get("actor_id")) class RustRoundRobinScheduler(RustSchedulerBase): @@ -255,7 +256,7 @@ async def select_worker( return None selected_worker = workers[selected_idx] - return await self._resolve_worker(node_id=selected_worker.get("node_id")) + return await pulsing.refer(selected_worker.get("actor_id")) def reset(self): """Reset round-robin counter""" @@ -294,7 +295,7 @@ async def select_worker( return None selected_worker = workers[selected_idx] - return await self._resolve_worker(node_id=selected_worker.get("node_id")) + return await pulsing.refer(selected_worker.get("actor_id")) def update_loads(self, loads: dict[str, int]): """Update cached load information @@ -351,7 +352,7 @@ async def select_worker( return None selected_worker = workers[selected_idx] - return await self._resolve_worker(node_id=selected_worker.get("node_id")) + return await pulsing.refer(selected_worker.get("actor_id")) def reset(self): """Reset hash ring""" @@ -421,7 +422,7 @@ async def select_worker( return None selected_worker = workers[selected_idx] - return await self._resolve_worker(node_id=selected_worker.get("node_id")) + return await pulsing.refer(selected_worker.get("actor_id")) def add_worker(self, url: str, model_id: str = "default"): """Add worker to cache tree""" diff --git a/python/pulsing/serving/vllm/worker.py b/python/pulsing/serving/vllm/worker.py index 16a70fa77..5268d80c3 100644 --- a/python/pulsing/serving/vllm/worker.py +++ b/python/pulsing/serving/vllm/worker.py @@ -1,4 +1,4 @@ -"""vLLM Worker Actor - High-performance inference Worker based on vLLM V1 engine +"""vLLM Worker - 
High-performance inference Worker based on vLLM V1 engine (pulsing.remote) Referencing Dynamo implementation, supports: 1. Prefill/Decode separation (PD Disaggregation) @@ -9,22 +9,19 @@ 6. Engine monitoring and health checks """ -# VllmWorker Actor - Main Actor class - import asyncio import logging import os import uuid from typing import Any -from pulsing.core import Actor, ActorId, Message, StreamMessage +from pulsing.core import ActorId, StreamMessage, remote from .handlers import BaseWorkerHandler, DecodeWorkerHandler, PrefillWorkerHandler from .utils import _is_macos, _setup_macos_metal_env try: from vllm.engine.arg_utils import AsyncEngineArgs - from vllm.sampling_params import SamplingParams from vllm.usage.usage_lib import UsageContext from vllm.v1.engine.async_llm import AsyncLLM @@ -35,15 +32,16 @@ logger = logging.getLogger(__name__) -class VllmWorker(Actor): - """vLLM inference Worker Actor +@remote +class VllmWorker: + """vLLM inference Worker,通过 pulsing.remote 暴露方法。 - Supports vLLM V1 engine, features aligned with Dynamo: - 1. Supports PD separation (Prefill / Decode roles) - 2. Supports multimodal input (Image) - 3. Supports KV Cache cross-node transfer parameters - 4. Supports LoRA dynamic loading/unloading - 5. Supports OpenAI-compatible text input/output + 支持 vLLM V1 engine,功能对齐 Dynamo: + 1. PD 分离(Prefill / Decode 角色) + 2. 多模态输入(Image) + 3. KV Cache 管理与清理 + 4. LoRA 动态加载/卸载 + 5. 
OpenAI 兼容文本输入/输出 """ def __init__( @@ -428,162 +426,142 @@ def metadata(self) -> dict[str, str]: return meta - async def receive(self, msg: Message) -> Message | StreamMessage: - # If engine not ready, wait for initialization to complete + async def _ensure_ready(self) -> None: + """等待 engine 就绪,超时则抛出异常。""" + if not VLLM_AVAILABLE: + raise RuntimeError("vLLM not installed or version incompatible") + max_wait = 60.0 + wait_interval = 0.5 + waited = 0.0 + while not self._is_ready and waited < max_wait: + await asyncio.sleep(wait_interval) + waited += wait_interval if not self._is_ready: - if not VLLM_AVAILABLE: - error_msg = "vLLM not installed or version incompatible" - if msg.msg_type.endswith("StreamRequest"): - stream_msg, writer = StreamMessage.create("Error") - asyncio.create_task(writer.error(error_msg)) - writer.close() - return stream_msg - return Message.from_json("Error", {"error": error_msg}) - - # Wait for engine initialization to complete - max_wait = 60.0 - wait_interval = 0.5 - waited = 0.0 - - while not self._is_ready and waited < max_wait: - await asyncio.sleep(wait_interval) - waited += wait_interval - - if not self._is_ready: - error_msg = f"vLLM engine initialization timeout after {max_wait}s" - logger.error(error_msg) - if msg.msg_type.endswith("StreamRequest"): - stream_msg, writer = StreamMessage.create("Error") - asyncio.create_task(writer.error(error_msg)) - writer.close() - return stream_msg - return Message.from_json("Error", {"error": error_msg}) - - try: - if msg.msg_type in ("GenerateRequest", "ChatCompletionRequest"): - return await self._handle_generate(msg) - elif msg.msg_type in ( - "GenerateStreamRequest", - "ChatCompletionStreamRequest", - ): - return await self._handle_generate_stream(msg) - elif msg.msg_type == "HealthCheck": - # Detailed health check - health_status = self._handler.engine_monitor.get_health_status() - health_status["role"] = self.role - health_status["worker_id"] = self.worker_id - return 
Message.from_json("Ok", health_status) - elif msg.msg_type == "ClearKVCache": - result = await self._handler.clear_kv_cache() - return Message.from_json("Ok", result) - elif msg.msg_type == "LoadLoRA": - # LoRA loading support - data = msg.to_json() - lora_name = data.get("lora_name") - lora_path = data.get("lora_path") - if not lora_name or not lora_path: - return Message.from_json( - "Error", - {"error": "Missing required fields: lora_name and lora_path"}, - ) - result = await self._handler.load_lora(lora_name, lora_path) - return Message.from_json("Ok", result) - elif msg.msg_type == "UnloadLoRA": - # LoRA unloading support - data = msg.to_json() - lora_name = data.get("lora_name") - if not lora_name: - return Message.from_json( - "Error", {"error": "Missing required field: lora_name"} - ) - result = await self._handler.unload_lora(lora_name) - return Message.from_json("Ok", result) - elif msg.msg_type == "ListLoRAs": - # LoRA listing support - result = await self._handler.list_loras() - return Message.from_json("Ok", result) - else: - if msg.msg_type.endswith("StreamRequest"): - stream_msg, writer = StreamMessage.create("Error") - asyncio.create_task( - writer.error(f"Unsupported type: {msg.msg_type}") - ) - writer.close() - return stream_msg - return Message.from_json( - "Error", {"error": f"Unsupported type: {msg.msg_type}"} - ) - except Exception as e: - logger.exception(f"Error handling {msg.msg_type}: {e}") - if msg.msg_type.endswith("StreamRequest"): - stream_msg, writer = StreamMessage.create("Error") - asyncio.create_task(writer.error(str(e))) - writer.close() - return stream_msg - return Message.from_json("Error", {"error": str(e)}) + raise RuntimeError(f"vLLM engine initialization timeout after {max_wait}s") + + def _build_request( + self, prompt: str = "", max_new_tokens: int | None = None, **kwargs + ) -> dict: + data = kwargs.copy() + data.setdefault("prompt", prompt) + data.setdefault("max_new_tokens", max_new_tokens or 
self.default_max_new_tokens) + return data + + async def _collect_generate_result(self, data: dict) -> dict: + """从 handler.generate 的迭代结果聚合成单次响应 dict。""" + accumulated_text = "" + finish_reason = None + result_count = 0 + async for result in self._handler.generate(data): + result_count += 1 + if "choices" in result and len(result["choices"]) > 0: + choice = result["choices"][0] + if "delta" in choice and "content" in choice["delta"]: + accumulated_text += choice["delta"]["content"] + elif "message" in choice and "content" in choice["message"]: + accumulated_text = choice["message"]["content"] + elif "text" in choice: + accumulated_text = choice["text"] + if "finish_reason" in choice and choice["finish_reason"]: + finish_reason = choice["finish_reason"] + if accumulated_text or result_count > 0: + return { + "text": accumulated_text, + "finish_reason": finish_reason or "stop", + "completion_tokens": ( + len(accumulated_text.split()) if accumulated_text else 0 + ), + "prompt_tokens": 0, + } + return {"error": "No output"} - async def _handle_generate(self, msg: Message) -> Message: - data = msg.to_json() + # ------------------------------------------------------------------------- + # 对外方法(替代原 receive 的消息类型分支) + # ------------------------------------------------------------------------- + async def generate( + self, + prompt: str = "", + max_new_tokens: int | None = None, + **kwargs, + ) -> dict: + """同步生成,返回 {text, finish_reason, prompt_tokens, completion_tokens} 或 {error}。""" + await self._ensure_ready() try: - # Use handler to generate results - # Accumulate complete text and information - accumulated_text = "" - finish_reason = None - result_count = 0 - - async for result in self._handler.generate(data): - result_count += 1 - - # Extract text content (support different formats) - if "choices" in result and len(result["choices"]) > 0: - choice = result["choices"][0] - - # Streaming format: extract from delta.content - if "delta" in choice and "content" in 
choice["delta"]: - accumulated_text += choice["delta"]["content"] - # Non-streaming format: extract from message.content - elif "message" in choice and "content" in choice["message"]: - accumulated_text = choice["message"]["content"] - # Or extract directly from text - elif "text" in choice: - accumulated_text = choice["text"] - - # Extract finish_reason - if "finish_reason" in choice and choice["finish_reason"]: - finish_reason = choice["finish_reason"] - - # Return complete response (OpenAI format) - if accumulated_text or result_count > 0: - response = { - "text": accumulated_text, - "finish_reason": finish_reason or "stop", - "completion_tokens": ( - len(accumulated_text.split()) if accumulated_text else 0 - ), - "prompt_tokens": 0, # TODO: Calculate actual prompt tokens - } - return Message.from_json("GenerateResponse", response) - return Message.from_json("Error", {"error": "No output"}) + data = self._build_request( + prompt=prompt, max_new_tokens=max_new_tokens, **kwargs + ) + return await self._collect_generate_result(data) except Exception as e: - logger.exception(f"Error in generate: {e}") - return Message.from_json("Error", {"error": str(e)}) + logger.exception("Error in generate: %s", e) + return {"error": str(e)} - async def _handle_generate_stream(self, msg: Message) -> StreamMessage: + async def generate_stream( + self, + prompt: str = "", + max_new_tokens: int | None = None, + **kwargs, + ): + """流式生成,async generator 逐条 yield chunk。""" + await self._ensure_ready() stream_msg, writer = StreamMessage.create("GenerateStream") + data = self._build_request( + prompt=prompt, max_new_tokens=max_new_tokens, **kwargs + ) async def produce(): try: - data = msg.to_json() async for chunk in self._handler.generate(data): await writer.write(chunk) if chunk.get("finish_reason"): break except Exception as e: - logger.exception(f"Error in stream generation: {e}") + logger.exception("Error in stream generation: %s", e) await writer.error(str(e)) finally: 
writer.close() asyncio.create_task(produce()) - return stream_msg + async for chunk in stream_msg.stream_reader(): + yield chunk + + def health_check(self) -> dict: + """健康检查,含 engine 状态。""" + if not self._is_ready or not self._handler: + return { + "role": self.role, + "worker_id": self.worker_id, + "ready": False, + "error": "Engine not ready", + } + status = self._handler.engine_monitor.get_health_status() + status["role"] = self.role + status["worker_id"] = self.worker_id + return status + + async def clear_kv_cache(self) -> dict: + """清理 KV Cache。""" + await self._ensure_ready() + result = await self._handler.clear_kv_cache() + return result if isinstance(result, dict) else {"ok": True} + + async def load_lora(self, lora_name: str, lora_path: str) -> dict: + """加载 LoRA。""" + await self._ensure_ready() + if not lora_name or not lora_path: + return {"error": "Missing required fields: lora_name and lora_path"} + result = await self._handler.load_lora(lora_name, lora_path) + return result if isinstance(result, dict) else {"ok": True} + + async def unload_lora(self, lora_name: str) -> dict: + """卸载 LoRA。""" + await self._ensure_ready() + if not lora_name: + return {"error": "Missing required field: lora_name"} + result = await self._handler.unload_lora(lora_name) + return result if isinstance(result, dict) else {"ok": True} + + async def list_loras(self) -> Any: + """列出已加载的 LoRA。""" + await self._ensure_ready() + return await self._handler.list_loras() diff --git a/python/pulsing/serving/vllm_worker.py b/python/pulsing/serving/vllm_worker.py index ae0c66476..54a9cbb3f 100644 --- a/python/pulsing/serving/vllm_worker.py +++ b/python/pulsing/serving/vllm_worker.py @@ -1,4 +1,4 @@ -"""vLLM Worker Actor - High-Performance Inference Worker Based on vLLM V1 Engine +"""vLLM Worker - High-Performance Inference Worker Based on vLLM V1 Engine (pulsing.remote) Referencing Dynamo implementation, supports: 1. Prefill/Decode separation (PD Disaggregation) @@ -9,22 +9,19 @@ 6. 
Engine monitoring and health checks """ -# VllmWorker Actor - Main Actor Class - import asyncio import logging import os import uuid from typing import Any -from pulsing.core import Actor, ActorId, Message, StreamMessage +from pulsing.core import ActorId, StreamMessage, remote from .vllm_handlers import BaseWorkerHandler, DecodeWorkerHandler, PrefillWorkerHandler from .vllm_utils import _is_macos, _setup_macos_metal_env try: from vllm.engine.arg_utils import AsyncEngineArgs - from vllm.sampling_params import SamplingParams from vllm.usage.usage_lib import UsageContext from vllm.v1.engine.async_llm import AsyncLLM @@ -35,15 +32,16 @@ logger = logging.getLogger(__name__) -class VllmWorker(Actor): - """vLLM Inference Worker Actor +@remote +class VllmWorker: + """vLLM Inference Worker,通过 pulsing.remote 暴露方法。 - Supports vLLM V1 engine, features aligned with Dynamo: - 1. Supports PD separation (Prefill / Decode roles) - 2. Supports multimodal input (Image) - 3. Supports KV Cache cross-node transmission parameters - 4. Supports LoRA dynamic loading/unloading - 5. Supports OpenAI-compatible text input/output + 支持 vLLM V1 engine,功能对齐 Dynamo: + 1. PD 分离(Prefill / Decode 角色) + 2. 多模态输入(Image) + 3. KV Cache 管理与清理 + 4. LoRA 动态加载/卸载 + 5. 
OpenAI 兼容文本输入/输出 """ def __init__( @@ -277,162 +275,142 @@ def metadata(self) -> dict[str, str]: return meta - async def receive(self, msg: Message) -> Message | StreamMessage: - # If engine is not ready, wait for initialization to complete + async def _ensure_ready(self) -> None: + """等待 engine 就绪,超时则抛出异常。""" + if not VLLM_AVAILABLE: + raise RuntimeError("vLLM not installed or version incompatible") + max_wait = 60.0 + wait_interval = 0.5 + waited = 0.0 + while not self._is_ready and waited < max_wait: + await asyncio.sleep(wait_interval) + waited += wait_interval if not self._is_ready: - if not VLLM_AVAILABLE: - error_msg = "vLLM not installed or version incompatible" - if msg.msg_type.endswith("StreamRequest"): - stream_msg, writer = StreamMessage.create("Error") - asyncio.create_task(writer.error(error_msg)) - writer.close() - return stream_msg - return Message.from_json("Error", {"error": error_msg}) - - # Wait for engine initialization to complete - max_wait = 60.0 - wait_interval = 0.5 - waited = 0.0 - - while not self._is_ready and waited < max_wait: - await asyncio.sleep(wait_interval) - waited += wait_interval - - if not self._is_ready: - error_msg = f"vLLM engine initialization timeout after {max_wait}s" - logger.error(error_msg) - if msg.msg_type.endswith("StreamRequest"): - stream_msg, writer = StreamMessage.create("Error") - asyncio.create_task(writer.error(error_msg)) - writer.close() - return stream_msg - return Message.from_json("Error", {"error": error_msg}) - - try: - if msg.msg_type in ("GenerateRequest", "ChatCompletionRequest"): - return await self._handle_generate(msg) - elif msg.msg_type in ( - "GenerateStreamRequest", - "ChatCompletionStreamRequest", - ): - return await self._handle_generate_stream(msg) - elif msg.msg_type == "HealthCheck": - # Detailed health check - health_status = self._handler.engine_monitor.get_health_status() - health_status["role"] = self.role - health_status["worker_id"] = self.worker_id - return 
Message.from_json("Ok", health_status) - elif msg.msg_type == "ClearKVCache": - result = await self._handler.clear_kv_cache() - return Message.from_json("Ok", result) - elif msg.msg_type == "LoadLoRA": - # LoRA loading support - data = msg.to_json() - lora_name = data.get("lora_name") - lora_path = data.get("lora_path") - if not lora_name or not lora_path: - return Message.from_json( - "Error", - {"error": "Missing required fields: lora_name and lora_path"}, - ) - result = await self._handler.load_lora(lora_name, lora_path) - return Message.from_json("Ok", result) - elif msg.msg_type == "UnloadLoRA": - # LoRA unloading support - data = msg.to_json() - lora_name = data.get("lora_name") - if not lora_name: - return Message.from_json( - "Error", {"error": "Missing required field: lora_name"} - ) - result = await self._handler.unload_lora(lora_name) - return Message.from_json("Ok", result) - elif msg.msg_type == "ListLoRAs": - # LoRA list support - result = await self._handler.list_loras() - return Message.from_json("Ok", result) - else: - if msg.msg_type.endswith("StreamRequest"): - stream_msg, writer = StreamMessage.create("Error") - asyncio.create_task( - writer.error(f"Unsupported type: {msg.msg_type}") - ) - writer.close() - return stream_msg - return Message.from_json( - "Error", {"error": f"Unsupported type: {msg.msg_type}"} - ) - except Exception as e: - logger.exception(f"Error handling {msg.msg_type}: {e}") - if msg.msg_type.endswith("StreamRequest"): - stream_msg, writer = StreamMessage.create("Error") - asyncio.create_task(writer.error(str(e))) - writer.close() - return stream_msg - return Message.from_json("Error", {"error": str(e)}) + raise RuntimeError(f"vLLM engine initialization timeout after {max_wait}s") + + def _build_request( + self, prompt: str = "", max_new_tokens: int | None = None, **kwargs + ) -> dict: + data = kwargs.copy() + data.setdefault("prompt", prompt) + data.setdefault("max_new_tokens", max_new_tokens or self.default_max_new_tokens) + 
return data + + async def _collect_generate_result(self, data: dict) -> dict: + """从 handler.generate 的迭代结果聚合成单次响应 dict。""" + accumulated_text = "" + finish_reason = None + result_count = 0 + async for result in self._handler.generate(data): + result_count += 1 + if "choices" in result and len(result["choices"]) > 0: + choice = result["choices"][0] + if "delta" in choice and "content" in choice["delta"]: + accumulated_text += choice["delta"]["content"] + elif "message" in choice and "content" in choice["message"]: + accumulated_text = choice["message"]["content"] + elif "text" in choice: + accumulated_text = choice["text"] + if "finish_reason" in choice and choice["finish_reason"]: + finish_reason = choice["finish_reason"] + if accumulated_text or result_count > 0: + return { + "text": accumulated_text, + "finish_reason": finish_reason or "stop", + "completion_tokens": ( + len(accumulated_text.split()) if accumulated_text else 0 + ), + "prompt_tokens": 0, + } + return {"error": "No output"} - async def _handle_generate(self, msg: Message) -> Message: - data = msg.to_json() + # ------------------------------------------------------------------------- + # 对外方法(替代原 receive 的消息类型分支) + # ------------------------------------------------------------------------- + async def generate( + self, + prompt: str = "", + max_new_tokens: int | None = None, + **kwargs, + ) -> dict: + """同步生成,返回 {text, finish_reason, prompt_tokens, completion_tokens} 或 {error}。""" + await self._ensure_ready() try: - # Use handler to generate results - # Accumulate complete text and information - accumulated_text = "" - finish_reason = None - result_count = 0 - - async for result in self._handler.generate(data): - result_count += 1 - - # Extract text content (supports different formats) - if "choices" in result and len(result["choices"]) > 0: - choice = result["choices"][0] - - # Streaming format: extract from delta.content - if "delta" in choice and "content" in choice["delta"]: - accumulated_text 
+= choice["delta"]["content"] - # Non-streaming format: extract from message.content - elif "message" in choice and "content" in choice["message"]: - accumulated_text = choice["message"]["content"] - # Or extract directly from text - elif "text" in choice: - accumulated_text = choice["text"] - - # Extract finish_reason - if "finish_reason" in choice and choice["finish_reason"]: - finish_reason = choice["finish_reason"] - - # Return complete response (OpenAI format) - if accumulated_text or result_count > 0: - response = { - "text": accumulated_text, - "finish_reason": finish_reason or "stop", - "completion_tokens": ( - len(accumulated_text.split()) if accumulated_text else 0 - ), - "prompt_tokens": 0, # TODO: Calculate actual prompt tokens - } - return Message.from_json("GenerateResponse", response) - return Message.from_json("Error", {"error": "No output"}) + data = self._build_request( + prompt=prompt, max_new_tokens=max_new_tokens, **kwargs + ) + return await self._collect_generate_result(data) except Exception as e: - logger.exception(f"Error in generate: {e}") - return Message.from_json("Error", {"error": str(e)}) + logger.exception("Error in generate: %s", e) + return {"error": str(e)} - async def _handle_generate_stream(self, msg: Message) -> StreamMessage: + async def generate_stream( + self, + prompt: str = "", + max_new_tokens: int | None = None, + **kwargs, + ): + """流式生成,async generator 逐条 yield chunk。""" + await self._ensure_ready() stream_msg, writer = StreamMessage.create("GenerateStream") + data = self._build_request( + prompt=prompt, max_new_tokens=max_new_tokens, **kwargs + ) async def produce(): try: - data = msg.to_json() async for chunk in self._handler.generate(data): await writer.write(chunk) if chunk.get("finish_reason"): break except Exception as e: - logger.exception(f"Error in stream generation: {e}") + logger.exception("Error in stream generation: %s", e) await writer.error(str(e)) finally: writer.close() asyncio.create_task(produce()) - 
return stream_msg + async for chunk in stream_msg.stream_reader(): + yield chunk + + def health_check(self) -> dict: + """健康检查,含 engine 状态。""" + if not self._is_ready or not self._handler: + return { + "role": self.role, + "worker_id": self.worker_id, + "ready": False, + "error": "Engine not ready", + } + status = self._handler.engine_monitor.get_health_status() + status["role"] = self.role + status["worker_id"] = self.worker_id + return status + + async def clear_kv_cache(self) -> dict: + """清理 KV Cache。""" + await self._ensure_ready() + result = await self._handler.clear_kv_cache() + return result if isinstance(result, dict) else {"ok": True} + + async def load_lora(self, lora_name: str, lora_path: str) -> dict: + """加载 LoRA。""" + await self._ensure_ready() + if not lora_name or not lora_path: + return {"error": "Missing required fields: lora_name and lora_path"} + result = await self._handler.load_lora(lora_name, lora_path) + return result if isinstance(result, dict) else {"ok": True} + + async def unload_lora(self, lora_name: str) -> dict: + """卸载 LoRA。""" + await self._ensure_ready() + if not lora_name: + return {"error": "Missing required field: lora_name"} + result = await self._handler.unload_lora(lora_name) + return result if isinstance(result, dict) else {"ok": True} + + async def list_loras(self) -> Any: + """列出已加载的 LoRA。""" + await self._ensure_ready() + return await self._handler.list_loras() diff --git a/python/pulsing/serving/worker.py b/python/pulsing/serving/worker.py index 992c2e1af..b39277069 100644 --- a/python/pulsing/serving/worker.py +++ b/python/pulsing/serving/worker.py @@ -1,11 +1,12 @@ -"""Transformers Worker Actor - LLM Inference Worker""" +"""Transformers Worker - LLM Inference Worker (pulsing.remote)""" import asyncio import time import uuid from dataclasses import dataclass +from threading import Thread -from pulsing.core import Actor, ActorId, Message, StreamMessage +from pulsing.core import ActorId, StreamMessage, remote @dataclass 
@@ -18,10 +19,11 @@ class GenerationConfig: do_sample: bool = False -class TransformersWorker(Actor): - """Transformers LLM Inference Worker, supports synchronous and streaming generation +@remote +class TransformersWorker: + """Transformers LLM Inference Worker,支持同步/流式生成与负载订阅。 - Supports streaming load subscription (SubscribeLoad), Router can subscribe and receive load updates in real-time. + 通过 pulsing.remote 暴露方法:generate、generate_stream、subscribe_load、health_check、get_load。 """ def __init__( @@ -45,11 +47,8 @@ def __init__( self._tokenizer = None self._is_loaded = False - # Load tracking self._current_load = 0 self._request_count = 0 - - # Load subscribers (streaming push) self._load_subscribers: list = [] async def on_start(self, actor_id: ActorId) -> None: @@ -62,7 +61,6 @@ async def on_start(self, actor_id: ActorId) -> None: def on_stop(self) -> None: self._model = None self._tokenizer = None - # Close all subscription streams for writer in self._load_subscribers: try: writer.close() @@ -71,7 +69,6 @@ def on_stop(self) -> None: self._load_subscribers.clear() def metadata(self) -> dict[str, str]: - """Returns worker metadata""" return { "type": "worker", "model": self.model_name, @@ -92,7 +89,6 @@ def load_ratio(self) -> float: return self._current_load / max(1, self.capacity) def _get_load_snapshot(self) -> dict: - """Get load snapshot""" return { "worker_id": self.worker_id, "node_id": self._node_id or self.worker_id, @@ -103,27 +99,21 @@ def _get_load_snapshot(self) -> dict: } async def _push_load_update(self): - """Push load update to all subscribers""" if not self._load_subscribers: return - snapshot = self._get_load_snapshot() - dead_writers = [] - + dead = [] for writer in self._load_subscribers: try: await writer.write(snapshot) except Exception: - dead_writers.append(writer) - - # Clean up disconnected connections - for w in dead_writers: + dead.append(writer) + for w in dead: self._load_subscribers.remove(w) async def load_model(self): if 
self._is_loaded: return - try: import torch from transformers import AutoModelForCausalLM, AutoTokenizer @@ -132,81 +122,32 @@ async def load_model(self): print(f"[Worker] Loading {self.model_name}...") self._tokenizer = AutoTokenizer.from_pretrained(self.model_name) - torch_dtype = torch.float16 if self.device in ("cuda", "mps") else torch.float32 model_kwargs = {"device_map": "auto"} if self.device == "cuda" else {} - self._model = AutoModelForCausalLM.from_pretrained( self.model_name, torch_dtype=torch_dtype, **model_kwargs ) - if self.device != "cuda": self._model.to(self.device) - self._model.eval() self._is_loaded = True print(f"[Worker] Model ready on {self.device}") - async def receive(self, msg: Message) -> Message | StreamMessage: - try: - if msg.msg_type == "GenerateRequest": - return await self._handle_generate(msg) - elif msg.msg_type == "GenerateStreamRequest": - return await self._handle_generate_stream(msg) - elif msg.msg_type == "SubscribeLoad": - return self._handle_subscribe_load() - elif msg.msg_type == "HealthCheck": - return Message.from_json( - "Ok", - { - "status": "healthy", - "worker_id": self.worker_id, - "is_loaded": self._is_loaded, - }, - ) - elif msg.msg_type == "GetLoad": - return Message.from_json("LoadInfo", self._get_load_snapshot()) - else: - return Message.from_json("Error", {"error": f"Unknown: {msg.msg_type}"}) - except Exception as e: - print(f"[Worker] Error: {e}") - return Message.from_json("Error", {"error": str(e)}) - - def _handle_subscribe_load(self) -> StreamMessage: - """Handle load subscription request, returns a stream that continuously pushes load updates""" - stream_msg, writer = StreamMessage.create("LoadStream") - self._load_subscribers.append(writer) - - worker = self - - async def produce(): - try: - # Immediately send current state - await writer.write(worker._get_load_snapshot()) - - # Periodic push (every second) - while True: - await asyncio.sleep(1.0) - await writer.write(worker._get_load_snapshot()) - 
except Exception: - pass - finally: - if writer in worker._load_subscribers: - worker._load_subscribers.remove(writer) - writer.close() - - asyncio.create_task(produce()) - return stream_msg + # ------------------------------------------------------------------------- + # 对外方法(替代原 receive 的消息类型分支) + # ------------------------------------------------------------------------- - async def _handle_generate(self, msg: Message) -> Message: + async def generate( + self, + prompt: str, + max_new_tokens: int | None = None, + ) -> dict: + """同步生成,返回 {text, prompt_tokens, completion_tokens} 或 {error}。""" + if max_new_tokens is None: + max_new_tokens = self.gen_config.max_new_tokens if not self._is_loaded: await self.load_model() - data = msg.to_json() - prompt = data.get("prompt", "") - max_new_tokens = data.get("max_new_tokens", self.gen_config.max_new_tokens) - - # Start request - increase load self._current_load += 1 self._request_count += 1 asyncio.create_task(self._push_load_update()) @@ -214,7 +155,7 @@ async def _handle_generate(self, msg: Message) -> Message: try: loop = asyncio.get_running_loop() - def _generate_sync(): + def _run(): inputs = self._tokenizer(prompt, return_tensors="pt").to( self._model.device ) @@ -224,85 +165,73 @@ def _generate_sync(): pad_token_id=self._tokenizer.eos_token_id, do_sample=self.gen_config.do_sample, ) - input_len = inputs["input_ids"].shape[1] new_tokens = outputs[0][input_len:] text = self._tokenizer.decode(new_tokens, skip_special_tokens=True) return text, input_len, len(new_tokens) text, prompt_tokens, completion_tokens = await loop.run_in_executor( - None, _generate_sync - ) - - return Message.from_json( - "GenerateResponse", - { - "text": text, - "worker_id": self.worker_id, - "prompt_tokens": prompt_tokens, - "completion_tokens": completion_tokens, - }, + None, _run ) + return { + "text": text, + "worker_id": self.worker_id, + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + } + except Exception as e: 
+ print(f"[Worker] Error: {e}") + return {"error": str(e)} finally: - # Request completed - decrease load self._current_load -= 1 asyncio.create_task(self._push_load_update()) - async def _handle_generate_stream(self, msg: Message) -> StreamMessage: - from threading import Thread - + async def generate_stream( + self, + prompt: str, + max_new_tokens: int | None = None, + ): + """流式生成,async generator 逐条 yield {text, worker_id} 或最终 {finish_reason, prompt_tokens, completion_tokens}。""" + if max_new_tokens is None: + max_new_tokens = self.gen_config.max_new_tokens if not self._is_loaded: await self.load_model() - data = msg.to_json() - prompt = data.get("prompt", "") - max_new_tokens = data.get("max_new_tokens", self.gen_config.max_new_tokens) - - # Start request - increase load self._current_load += 1 self._request_count += 1 asyncio.create_task(self._push_load_update()) stream_msg, writer = StreamMessage.create("GenerateStream") - - # Save reference for decreasing load in produce worker = self async def produce(): try: + from transformers import TextIteratorStreamer + inputs = worker._tokenizer(prompt, return_tensors="pt").to( worker._model.device ) input_len = inputs["input_ids"].shape[1] - - from transformers import TextIteratorStreamer - streamer = TextIteratorStreamer( worker._tokenizer, skip_prompt=True, skip_special_tokens=True ) - generation_kwargs = { + gen_kwargs = { **inputs, "max_new_tokens": max_new_tokens, "pad_token_id": worker._tokenizer.eos_token_id, "do_sample": worker.gen_config.do_sample, "streamer": streamer, } - - thread = Thread(target=worker._model.generate, kwargs=generation_kwargs) + thread = Thread(target=worker._model.generate, kwargs=gen_kwargs) thread.start() - token_count = 0 for text in streamer: if text: token_count += 1 await writer.write( - { - "text": text, - "worker_id": worker.worker_id, - } + {"text": text, "worker_id": worker.worker_id} ) thread.join() - await writer.write( { "text": "", @@ -318,10 +247,45 @@ async def 
produce(): except Exception: pass finally: - # Request completed - decrease load worker._current_load -= 1 asyncio.create_task(worker._push_load_update()) writer.close() asyncio.create_task(produce()) - return stream_msg + async for chunk in stream_msg.stream_reader(): + yield chunk + + async def subscribe_load(self): + """订阅负载更新,async generator 每秒 yield 一次负载快照。""" + stream_msg, writer = StreamMessage.create("LoadStream") + self._load_subscribers.append(writer) + worker = self + + async def produce(): + try: + await writer.write(worker._get_load_snapshot()) + while True: + await asyncio.sleep(1.0) + await writer.write(worker._get_load_snapshot()) + except Exception: + pass + finally: + if writer in worker._load_subscribers: + worker._load_subscribers.remove(writer) + writer.close() + + asyncio.create_task(produce()) + async for snapshot in stream_msg.stream_reader(): + yield snapshot + + def health_check(self) -> dict: + """健康检查。""" + return { + "status": "healthy", + "worker_id": self.worker_id, + "is_loaded": self._is_loaded, + } + + def get_load(self) -> dict: + """当前负载快照。""" + return self._get_load_snapshot()