diff --git a/Cargo.lock b/Cargo.lock index 19f9918..3485212 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -72,6 +72,14 @@ dependencies = [ "thiserror", ] +[[package]] +name = "spanker-scheduler" +version = "0.1.0" +dependencies = [ + "spanker-runtime", + "thiserror", +] + [[package]] name = "syn" version = "2.0.117" diff --git a/Cargo.toml b/Cargo.toml index 5a53e5f..6157506 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ resolver = "2" members = [ "src/runtime", "src/backends/ggml", + "src/scheduler", ] [workspace.package] diff --git a/src/scheduler/Cargo.toml b/src/scheduler/Cargo.toml new file mode 100644 index 0000000..e21d0bc --- /dev/null +++ b/src/scheduler/Cargo.toml @@ -0,0 +1,23 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2026 PopSolutions Cooperative + +[package] +name = "spanker-scheduler" +version = "0.1.0" +description = "Distributed scheduler for the PopSolutions Sails — multi-card topology, collective ops." +keywords = ["scheduler", "distributed", "popsolutions", "fpga", "collective"] +categories = ["hardware-support", "concurrency"] + +edition.workspace = true +license.workspace = true +authors.workspace = true +repository.workspace = true +rust-version.workspace = true + +[lib] +name = "spanker_scheduler" +path = "src/lib.rs" + +[dependencies] +spanker-runtime = { path = "../runtime" } +thiserror = { workspace = true } diff --git a/src/scheduler/src/collective.rs b/src/scheduler/src/collective.rs new file mode 100644 index 0000000..5212f6e --- /dev/null +++ b/src/scheduler/src/collective.rs @@ -0,0 +1,229 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright (c) 2026 PopSolutions Cooperative + +//! Collective-ops trait surfaces and host-side mock impls. +//! +//! [`AllReduce`] and [`AllGather`] are the data-path collective +//! ops the scheduler exposes to the runtime. Real device impls +//! land alongside the inter-card link protocol (ADR-014, MAST +//! cross-stream issue filed alongside this PR). +//! +//! [`TensorParallel`] and [`ModelParallel`] are minimal marker +//! traits today — they expose `shard_count` so the runtime can +//! plan partitioning without committing to a tensor type. Bodies +//! grow when the GGML matmul (PR #5b) needs concrete shard +//! geometry. + +use spanker_runtime::SpankerControl; + +use crate::topology::{MockSail, Topology}; +use crate::{Error, Result}; + +/// Reduction operation for [`AllReduce`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ReduceOp { + /// Element-wise sum across cards. + Sum, + /// Element-wise maximum across cards. + Max, + /// Element-wise minimum across cards. + Min, + /// Element-wise average across cards. + Avg, +} + +/// Reduce a per-card array across all sails using `op`. After +/// the call every per-card buffer contains the same reduced +/// result. +pub trait AllReduce { + /// Reduce `per_card[i]` across all `i` using `op`. All + /// buffers must have the same length; mismatched shapes + /// produce [`Error::ShapeMismatch`]. The number of buffers + /// must equal the topology size. + fn all_reduce_f32(&self, per_card: &mut [Vec], op: ReduceOp) -> Result<()>; +} + +/// Gather per-card buffers into a single flat result. +pub trait AllGather { + /// Concatenate `per_card[0..n]` in topology-index order and + /// return the flat buffer. The number of buffers must equal + /// the topology size. + fn all_gather_f32(&self, per_card: &[Vec]) -> Result>; +} + +/// Tensor-parallel partitioning interface. +/// +/// Skeleton today: only exposes the shard count. Real geometry +/// arrives with PR #5b. +pub trait TensorParallel { + /// Number of shards a tensor would be split into. + fn shard_count(&self) -> usize; +} + +/// Model-parallel (layer-sharded) partitioning interface. +/// +/// Skeleton today: only exposes the shard count. +pub trait ModelParallel { + /// Number of shards a model would be split across. + fn shard_count(&self) -> usize; +} + +impl TensorParallel for Topology { + fn shard_count(&self) -> usize { + self.n_sails() + } +} + +impl ModelParallel for Topology { + fn shard_count(&self) -> usize { + self.n_sails() + } +} + +// -- mock impls (host-side simulation) -- + +fn validate_uniform(per_card: &[Vec], n_sails: usize) -> Result { + if per_card.len() != n_sails { + return Err(Error::TopologyMismatch { + expected: n_sails, + actual: per_card.len(), + }); + } + let stride = per_card.first().map(|v| v.len()).unwrap_or(0); + for (i, v) in per_card.iter().enumerate() { + if v.len() != stride { + return Err(Error::ShapeMismatch { + sail: i, + expected: stride, + actual: v.len(), + }); + } + } + Ok(stride) +} + +impl AllReduce for Topology { + fn all_reduce_f32(&self, per_card: &mut [Vec], op: ReduceOp) -> Result<()> { + let stride = validate_uniform(per_card, self.n_sails())?; + if stride == 0 { + return Ok(()); + } + + let n = per_card.len() as f32; + let mut reduced = vec![0.0f32; stride]; + for i in 0..stride { + let initial = per_card[0][i]; + let acc = match op { + ReduceOp::Sum | ReduceOp::Avg => { + let mut s = 0.0f32; + for v in per_card.iter() { + s += v[i]; + } + if matches!(op, ReduceOp::Avg) { + s / n + } else { + s + } + } + ReduceOp::Max => { + let mut m = initial; + for v in per_card.iter().skip(1) { + if v[i] > m { + m = v[i]; + } + } + m + } + ReduceOp::Min => { + let mut m = initial; + for v in per_card.iter().skip(1) { + if v[i] < m { + m = v[i]; + } + } + m + } + }; + reduced[i] = acc; + } + + for v in per_card.iter_mut() { + v.clone_from(&reduced); + } + Ok(()) + } +} + +impl AllGather for Topology { + fn all_gather_f32(&self, per_card: &[Vec]) -> Result> { + let stride = validate_uniform(per_card, self.n_sails())?; + let mut out = Vec::with_capacity(stride * self.n_sails()); + for v in per_card { + out.extend_from_slice(v); + } + Ok(out) + } +} + +// -- real-device impls (deferred) -- + +impl AllReduce for Topology { + fn all_reduce_f32(&self, _per_card: &mut [Vec], _op: ReduceOp) -> Result<()> { + Err(Error::NotImplemented( + "AllReduce on real device requires SPANKER_IOC_WORK_SUBMIT (PR #6b)", + )) + } +} + +impl AllGather for Topology { + fn all_gather_f32(&self, _per_card: &[Vec]) -> Result> { + Err(Error::NotImplemented( + "AllGather on real device requires SPANKER_IOC_WORK_SUBMIT (PR #6b)", + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn topology_shard_count_matches_n_sails() { + let t = Topology::::with_mock(3); + assert_eq!( as TensorParallel>::shard_count(&t), 3); + assert_eq!( as ModelParallel>::shard_count(&t), 3); + } + + #[test] + fn all_reduce_sum_topology_mismatch() { + let t = Topology::::with_mock(2); + let mut per_card = vec![vec![1.0f32, 2.0], vec![3.0, 4.0], vec![5.0, 6.0]]; + let err = t + .all_reduce_f32(&mut per_card, ReduceOp::Sum) + .expect_err("expected TopologyMismatch"); + assert!(matches!( + err, + Error::TopologyMismatch { + expected: 2, + actual: 3 + } + )); + } + + #[test] + fn all_reduce_sum_shape_mismatch() { + let t = Topology::::with_mock(2); + let mut per_card = vec![vec![1.0f32, 2.0], vec![3.0]]; + let err = t + .all_reduce_f32(&mut per_card, ReduceOp::Sum) + .expect_err("expected ShapeMismatch"); + assert!(matches!( + err, + Error::ShapeMismatch { + sail: 1, + expected: 2, + actual: 1 + } + )); + } +} diff --git a/src/scheduler/src/intercard.rs b/src/scheduler/src/intercard.rs new file mode 100644 index 0000000..82f3514 --- /dev/null +++ b/src/scheduler/src/intercard.rs @@ -0,0 +1,71 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright (c) 2026 PopSolutions Cooperative + +//! Inter-card link contract — Rust mirror of the constants and +//! `link_state_t` enum landed by MAST #14 (intercard skeleton). +//! +//! Until ADR-014 (inter-card link architecture choice) lands the +//! actual protocol, this module exposes only the *shape* of the +//! link surface so the scheduler can reason about topology +//! without committing to a wire protocol. Real bandwidth and +//! latency numbers come from the cross-stream issue against +//! MAST filed alongside this PR. + +/// Number of high-speed lanes per inter-card link. Default per +/// MAST #14 is 4; PCB rev-A may parametrise per Sail variant. +pub const INTERCARD_LANES: usize = 4; + +/// Width of one lane, in bits. Mirrors MAST #14's +/// `INTERCARD_LANE_WIDTH = 32`. +pub const INTERCARD_LANE_WIDTH: usize = 32; + +/// Aggregate bus width seen by the link MAC, in bits. Mirrors +/// MAST #14's `INTERCARD_BUS_WIDTH = 128` (default). +pub const INTERCARD_BUS_WIDTH: usize = 128; + +/// State of a single inter-card link, mirroring `link_state_t` +/// in MAST #14. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LinkState { + /// Link is down; no traffic. + Down, + /// Lane training in progress. + Training, + /// Link is up and ready for traffic. + Up, + /// Hard error; link must be retrained or replaced. + Error, +} + +/// A single point-to-point link between two sails in a topology. +/// +/// `local_sail` and `remote_sail` are indices into +/// [`crate::Topology::sails`]; the protocol that flows over the +/// link is opaque to this crate and lands in ADR-014. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct Link { + /// Topology index of the originating sail. + pub local_sail: usize, + /// Topology index of the peer sail. + pub remote_sail: usize, + /// Current state of this link. + pub state: LinkState, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn intercard_constants_match_mast_14() { + // MAST #14 contract: 4 lanes × 32 bits = 128-bit bus. + assert_eq!(INTERCARD_LANES * INTERCARD_LANE_WIDTH, INTERCARD_BUS_WIDTH); + } + + #[test] + fn link_state_is_copy() { + let s = LinkState::Up; + let _ = s; + let _ = s; // would fail to compile if !Copy + } +} diff --git a/src/scheduler/src/lib.rs b/src/scheduler/src/lib.rs new file mode 100644 index 0000000..06314ad --- /dev/null +++ b/src/scheduler/src/lib.rs @@ -0,0 +1,82 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright (c) 2026 PopSolutions Cooperative + +//! # spanker-scheduler — distributed scheduler for the PopSolutions Sails +//! +//! Per `project_multicard_parallelism.md` (multi-card parallelism is +//! a first-class architectural requirement) and the cross-stream +//! contract with `popsolutions/MAST` (intercard skeleton, MAST #14) +//! and `popsolutions/Stays` (PCB connector pinout). +//! +//! ## What this crate exposes +//! +//! - [`Topology`]: opaque handle over the connected Sails plus +//! their inter-card link graph. Generic over the per-sail +//! handle type so unit tests can drive a [`MockSail`] vector +//! without `/dev/spanker*` present. +//! - [`AllReduce`], [`AllGather`]: collective-ops trait surfaces +//! for the multi-card data path. Implemented host-side on +//! [`Topology`] for now (real implementations land +//! when the kernel ABI gains work-submission ioctls and the +//! inter-card link protocol is specified per ADR-014). +//! - [`TensorParallel`], [`ModelParallel`]: marker / shape traits +//! the runtime consumes when partitioning workloads. Bodies +//! land alongside real-device matmul (PR #5b) and inter-card +//! link bandwidth characterisation (cross-stream issue against +//! MAST filed alongside this PR). +//! - Inter-card constants imported from MAST #14: see +//! [`intercard`] module. + +#![warn(missing_docs)] +#![deny(unsafe_op_in_unsafe_fn)] + +pub mod collective; +pub mod intercard; +pub mod topology; + +pub use collective::{AllGather, AllReduce, ModelParallel, ReduceOp, TensorParallel}; +pub use intercard::{Link, LinkState, INTERCARD_BUS_WIDTH, INTERCARD_LANES, INTERCARD_LANE_WIDTH}; +pub use topology::{MockSail, Topology}; + +/// Errors returned by this crate. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// `Topology::enumerate()` found no `/dev/spanker*` device + /// nodes (typical cause: `spanker.ko` is not loaded, or the + /// driver's PCIe probe path has not yet been wired up to + /// create per-Sail nodes). + #[error("no Sails enumerated; spanker.ko may not be loaded or no devices probed yet")] + NoSails, + + /// Per-card buffer count differs from the topology size. + #[error("topology mismatch: expected {expected} sails, got {actual}")] + TopologyMismatch { + /// Number of sails the topology was built with. + expected: usize, + /// Number of per-card buffers the caller passed. + actual: usize, + }, + + /// Per-card buffers have inconsistent shapes (collective ops + /// require uniform shape across cards). + #[error("buffer shape mismatch on sail {sail}: expected {expected} elems, got {actual}")] + ShapeMismatch { + /// Which sail's buffer disagrees. + sail: usize, + /// Shape of sail 0's buffer (the reference). + expected: usize, + /// Shape of the offending buffer. + actual: usize, + }, + + /// Real-device collective op is not yet wired up. + #[error("not implemented yet: {0}")] + NotImplemented(&'static str), + + /// Underlying runtime error (open, ioctl). + #[error(transparent)] + Runtime(#[from] spanker_runtime::Error), +} + +/// Convenience alias for results returned by this crate. +pub type Result = std::result::Result; diff --git a/src/scheduler/src/topology.rs b/src/scheduler/src/topology.rs new file mode 100644 index 0000000..443e7b7 --- /dev/null +++ b/src/scheduler/src/topology.rs @@ -0,0 +1,181 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright (c) 2026 PopSolutions Cooperative + +//! Topology over connected Sails plus their inter-card links. +//! +//! [`Topology`] is generic over the per-sail handle type so unit +//! tests can drive a [`MockSail`] vector without `/dev/spanker*` +//! being present. The real-device path is implemented on +//! `Topology`; the mock path is on +//! `Topology`. + +use std::path::PathBuf; + +use spanker_runtime::SpankerControl; + +use crate::intercard::{Link, LinkState}; +use crate::{Error, Result}; + +/// In-process mock for one Sail. Carries an `id` so collective-op +/// implementations on `Topology` can disambiguate cards +/// when reducing host-side. Tests should construct mock +/// topologies via [`Topology::with_mock`] rather than this type +/// directly. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct MockSail { + id: usize, +} + +impl MockSail { + /// Construct a new mock sail with the given index. + pub fn new(id: usize) -> Self { + Self { id } + } + + /// Topology index of this mock sail. + pub fn id(&self) -> usize { + self.id + } +} + +/// Topology over connected Sails plus their inter-card link graph. +/// +/// Generic over the per-sail handle type; defaults to +/// [`SpankerControl`] for the real-device path. Tests use +/// [`Topology`] via [`Topology::with_mock`]. +pub struct Topology { + sails: Vec, + links: Vec, +} + +impl Topology { + /// Walk `/dev/spanker0..N` opening each as a [`SpankerControl`] + /// per ADR-002. Stops at the first index where `/dev/spankerN` + /// does not exist; returns [`Error::NoSails`] if none were + /// found. + /// + /// Inter-card link discovery is a follow-up — until ADR-014 + /// pins the link protocol, the returned `Topology` has an + /// empty `links` vector even when multiple sails are present. + pub fn enumerate() -> Result { + let mut sails = Vec::new(); + for index in 0..256 { + let path = PathBuf::from(format!("/dev/spanker{index}")); + if !path.exists() { + break; + } + sails.push(SpankerControl::open_path(&path)?); + } + if sails.is_empty() { + return Err(Error::NoSails); + } + Ok(Self { + sails, + links: Vec::new(), + }) + } +} + +impl Topology { + /// Construct a fully-meshed mock topology with `n_sails` + /// cards, all inter-card links in [`LinkState::Up`]. + pub fn with_mock(n_sails: usize) -> Self { + let sails = (0..n_sails).map(MockSail::new).collect(); + let mut links = Vec::with_capacity(n_sails.saturating_sub(1) * n_sails); + for local in 0..n_sails { + for remote in 0..n_sails { + if local == remote { + continue; + } + links.push(Link { + local_sail: local, + remote_sail: remote, + state: LinkState::Up, + }); + } + } + Self { sails, links } + } +} + +impl Topology { + /// Number of sails in this topology. + pub fn n_sails(&self) -> usize { + self.sails.len() + } + + /// Inter-card links in this topology. + pub fn links(&self) -> &[Link] { + &self.links + } +} + +#[cfg(test)] +impl Topology { + /// Per-sail handles in topology-index order. Test-only + /// accessor used by the topology's own unit tests; the + /// public surface deliberately keeps the handle vector + /// opaque so future refactors can change its representation. + pub(crate) fn sails_for_tests(&self) -> &[H] { + &self.sails + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn enumerate_reports_no_sails_when_dev_missing() { + // /dev/spanker0 won't exist on a host without spanker.ko + // loaded; the cargo test runner is not root, so we cannot + // create test nodes. This guards the empty-iteration path. + // Use match instead of expect_err to avoid requiring Debug + // on Topology's success type. + let path = std::path::Path::new("/dev/spanker0"); + if path.exists() { + // CI host happens to have the module loaded — skip. + return; + } + match Topology::enumerate() { + Err(Error::NoSails) => {} + Err(other) => panic!("expected NoSails, got {other:?}"), + Ok(_) => panic!("expected NoSails, got Ok"), + } + } + + #[test] + fn with_mock_zero_sails_has_no_links() { + let t = Topology::::with_mock(0); + assert_eq!(t.n_sails(), 0); + assert!(t.links().is_empty()); + } + + #[test] + fn with_mock_two_sails_has_two_links() { + let t = Topology::::with_mock(2); + assert_eq!(t.n_sails(), 2); + // Fully-meshed directed: (0->1) and (1->0). + assert_eq!(t.links().len(), 2); + for link in t.links() { + assert_eq!(link.state, LinkState::Up); + assert_ne!(link.local_sail, link.remote_sail); + } + } + + #[test] + fn with_mock_four_sails_has_twelve_links() { + let t = Topology::::with_mock(4); + assert_eq!(t.n_sails(), 4); + // n*(n-1) = 12 directed links in a fully-meshed topology. + assert_eq!(t.links().len(), 12); + } + + #[test] + fn mock_sail_carries_id() { + let t = Topology::::with_mock(3); + for (i, sail) in t.sails_for_tests().iter().enumerate() { + assert_eq!(sail.id(), i); + } + } +} diff --git a/src/scheduler/tests/topology_mock.rs b/src/scheduler/tests/topology_mock.rs new file mode 100644 index 0000000..ba22612 --- /dev/null +++ b/src/scheduler/tests/topology_mock.rs @@ -0,0 +1,133 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright (c) 2026 PopSolutions Cooperative + +//! Integration tests — `Topology` exercises the +//! collective-ops contract end-to-end. +//! +//! The real-device path on `Topology` returns +//! `Error::NotImplemented` until the kernel ABI gains +//! `SPANKER_IOC_WORK_SUBMIT` (PR #6b). Inter-card link bandwidth / +//! latency characterisation comes from the cross-stream issue +//! against MAST filed alongside this PR. + +use spanker_scheduler::{ + AllGather, AllReduce, Error, MockSail, ReduceOp, Topology, INTERCARD_BUS_WIDTH, + INTERCARD_LANES, INTERCARD_LANE_WIDTH, +}; + +#[test] +fn all_reduce_sum_two_cards_yields_sum() { + let t = Topology::::with_mock(2); + let mut per_card = vec![vec![1.0f32, 2.0, 3.0], vec![10.0, 20.0, 30.0]]; + t.all_reduce_f32(&mut per_card, ReduceOp::Sum) + .expect("AllReduce sum on 2 cards"); + for v in &per_card { + assert_eq!(v, &[11.0, 22.0, 33.0]); + } +} + +#[test] +fn all_reduce_sum_four_cards_yields_total() { + let t = Topology::::with_mock(4); + let mut per_card = vec![vec![1.0f32], vec![2.0], vec![3.0], vec![4.0]]; + t.all_reduce_f32(&mut per_card, ReduceOp::Sum) + .expect("AllReduce sum on 4 cards"); + for v in &per_card { + assert_eq!(v, &[10.0]); + } +} + +#[test] +fn all_reduce_avg_four_cards_yields_mean() { + let t = Topology::::with_mock(4); + let mut per_card = vec![vec![1.0f32], vec![2.0], vec![3.0], vec![4.0]]; + t.all_reduce_f32(&mut per_card, ReduceOp::Avg) + .expect("AllReduce avg on 4 cards"); + for v in &per_card { + assert_eq!(v, &[2.5]); + } +} + +#[test] +fn all_reduce_max_picks_per_index_max() { + let t = Topology::::with_mock(3); + let mut per_card = vec![ + vec![1.0f32, 5.0, 9.0], + vec![7.0, 2.0, 8.0], + vec![3.0, 6.0, 4.0], + ]; + t.all_reduce_f32(&mut per_card, ReduceOp::Max) + .expect("AllReduce max on 3 cards"); + for v in &per_card { + assert_eq!(v, &[7.0, 6.0, 9.0]); + } +} + +#[test] +fn all_reduce_min_picks_per_index_min() { + let t = Topology::::with_mock(3); + let mut per_card = vec![ + vec![1.0f32, 5.0, 9.0], + vec![7.0, 2.0, 8.0], + vec![3.0, 6.0, 4.0], + ]; + t.all_reduce_f32(&mut per_card, ReduceOp::Min) + .expect("AllReduce min on 3 cards"); + for v in &per_card { + assert_eq!(v, &[1.0, 2.0, 4.0]); + } +} + +#[test] +fn all_gather_four_cards_concatenates_in_order() { + let t = Topology::::with_mock(4); + let per_card = vec![ + vec![1.0f32, 2.0], + vec![3.0, 4.0], + vec![5.0, 6.0], + vec![7.0, 8.0], + ]; + let gathered = t.all_gather_f32(&per_card).expect("AllGather on 4 cards"); + assert_eq!(gathered, vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]); +} + +#[test] +fn all_gather_topology_mismatch() { + let t = Topology::::with_mock(2); + let per_card = vec![vec![1.0f32, 2.0], vec![3.0, 4.0], vec![5.0, 6.0]]; + let err = t + .all_gather_f32(&per_card) + .expect_err("expected TopologyMismatch"); + assert!(matches!( + err, + Error::TopologyMismatch { + expected: 2, + actual: 3 + } + )); +} + +#[test] +fn all_gather_shape_mismatch() { + let t = Topology::::with_mock(2); + let per_card = vec![vec![1.0f32, 2.0], vec![3.0]]; + let err = t + .all_gather_f32(&per_card) + .expect_err("expected ShapeMismatch"); + assert!(matches!( + err, + Error::ShapeMismatch { + sail: 1, + expected: 2, + actual: 1 + } + )); +} + +#[test] +fn intercard_constants_match_mast_14_contract() { + assert_eq!(INTERCARD_LANES, 4); + assert_eq!(INTERCARD_LANE_WIDTH, 32); + assert_eq!(INTERCARD_BUS_WIDTH, 128); + assert_eq!(INTERCARD_LANES * INTERCARD_LANE_WIDTH, INTERCARD_BUS_WIDTH); +}