Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ resolver = "2"
members = [
"src/runtime",
"src/backends/ggml",
"src/scheduler",
]

[workspace.package]
Expand Down
23 changes: 23 additions & 0 deletions src/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2026 PopSolutions Cooperative

[package]
name = "spanker-scheduler"
version = "0.1.0"
description = "Distributed scheduler for the PopSolutions Sails — multi-card topology, collective ops."
keywords = ["scheduler", "distributed", "popsolutions", "fpga", "collective"]
categories = ["hardware-support", "concurrency"]

edition.workspace = true
license.workspace = true
authors.workspace = true
repository.workspace = true
rust-version.workspace = true

[lib]
name = "spanker_scheduler"
path = "src/lib.rs"

[dependencies]
spanker-runtime = { path = "../runtime" }
thiserror = { workspace = true }
229 changes: 229 additions & 0 deletions src/scheduler/src/collective.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2026 PopSolutions Cooperative

//! Collective-ops trait surfaces and host-side mock impls.
//!
//! [`AllReduce`] and [`AllGather`] are the data-path collective
//! ops the scheduler exposes to the runtime. Real device impls
//! land alongside the inter-card link protocol (ADR-014, MAST
//! cross-stream issue filed alongside this PR).
//!
//! [`TensorParallel`] and [`ModelParallel`] are minimal marker
//! traits today — they expose `shard_count` so the runtime can
//! plan partitioning without committing to a tensor type. Bodies
//! grow when the GGML matmul (PR #5b) needs concrete shard
//! geometry.

use spanker_runtime::SpankerControl;

use crate::topology::{MockSail, Topology};
use crate::{Error, Result};

/// Reduction operation for [`AllReduce`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReduceOp {
/// Element-wise sum across cards.
Sum,
/// Element-wise maximum across cards.
Max,
/// Element-wise minimum across cards.
Min,
/// Element-wise average across cards.
Avg,
}

/// Reduce a per-card array across all sails using `op`. After
/// the call every per-card buffer contains the same reduced
/// result.
pub trait AllReduce {
/// Reduce `per_card[i]` across all `i` using `op`. All
/// buffers must have the same length; mismatched shapes
/// produce [`Error::ShapeMismatch`]. The number of buffers
/// must equal the topology size.
fn all_reduce_f32(&self, per_card: &mut [Vec<f32>], op: ReduceOp) -> Result<()>;
}

/// Gather per-card buffers into a single flat result.
pub trait AllGather {
/// Concatenate `per_card[0..n]` in topology-index order and
/// return the flat buffer. The number of buffers must equal
/// the topology size.
fn all_gather_f32(&self, per_card: &[Vec<f32>]) -> Result<Vec<f32>>;
}

/// Tensor-parallel partitioning interface.
///
/// Skeleton today: only exposes the shard count. Real geometry
/// arrives with PR #5b.
pub trait TensorParallel {
/// Number of shards a tensor would be split into.
fn shard_count(&self) -> usize;
}

/// Model-parallel (layer-sharded) partitioning interface.
///
/// Skeleton today: only exposes the shard count.
pub trait ModelParallel {
/// Number of shards a model would be split across.
fn shard_count(&self) -> usize;
}

impl<H> TensorParallel for Topology<H> {
fn shard_count(&self) -> usize {
self.n_sails()
}
}

impl<H> ModelParallel for Topology<H> {
fn shard_count(&self) -> usize {
self.n_sails()
}
}

// -- mock impls (host-side simulation) --

fn validate_uniform<T>(per_card: &[Vec<T>], n_sails: usize) -> Result<usize> {
if per_card.len() != n_sails {
return Err(Error::TopologyMismatch {
expected: n_sails,
actual: per_card.len(),
});
}
let stride = per_card.first().map(|v| v.len()).unwrap_or(0);
for (i, v) in per_card.iter().enumerate() {
if v.len() != stride {
return Err(Error::ShapeMismatch {
sail: i,
expected: stride,
actual: v.len(),
});
}
}
Ok(stride)
}

impl AllReduce for Topology<MockSail> {
fn all_reduce_f32(&self, per_card: &mut [Vec<f32>], op: ReduceOp) -> Result<()> {
let stride = validate_uniform(per_card, self.n_sails())?;
if stride == 0 {
return Ok(());
}

let n = per_card.len() as f32;
let mut reduced = vec![0.0f32; stride];
for i in 0..stride {
let initial = per_card[0][i];
let acc = match op {
ReduceOp::Sum | ReduceOp::Avg => {
let mut s = 0.0f32;
for v in per_card.iter() {
s += v[i];
}
if matches!(op, ReduceOp::Avg) {
s / n
} else {
s
}
}
ReduceOp::Max => {
let mut m = initial;
for v in per_card.iter().skip(1) {
if v[i] > m {
m = v[i];
}
}
m
}
ReduceOp::Min => {
let mut m = initial;
for v in per_card.iter().skip(1) {
if v[i] < m {
m = v[i];
}
}
m
}
};
reduced[i] = acc;
}

for v in per_card.iter_mut() {
v.clone_from(&reduced);
}
Ok(())
}
}

impl AllGather for Topology<MockSail> {
fn all_gather_f32(&self, per_card: &[Vec<f32>]) -> Result<Vec<f32>> {
let stride = validate_uniform(per_card, self.n_sails())?;
let mut out = Vec::with_capacity(stride * self.n_sails());
for v in per_card {
out.extend_from_slice(v);
}
Ok(out)
}
}

// -- real-device impls (deferred) --

impl AllReduce for Topology<SpankerControl> {
fn all_reduce_f32(&self, _per_card: &mut [Vec<f32>], _op: ReduceOp) -> Result<()> {
Err(Error::NotImplemented(
"AllReduce on real device requires SPANKER_IOC_WORK_SUBMIT (PR #6b)",
))
}
}

impl AllGather for Topology<SpankerControl> {
fn all_gather_f32(&self, _per_card: &[Vec<f32>]) -> Result<Vec<f32>> {
Err(Error::NotImplemented(
"AllGather on real device requires SPANKER_IOC_WORK_SUBMIT (PR #6b)",
))
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn topology_shard_count_matches_n_sails() {
let t = Topology::<MockSail>::with_mock(3);
assert_eq!(<Topology<MockSail> as TensorParallel>::shard_count(&t), 3);
assert_eq!(<Topology<MockSail> as ModelParallel>::shard_count(&t), 3);
}

#[test]
fn all_reduce_sum_topology_mismatch() {
let t = Topology::<MockSail>::with_mock(2);
let mut per_card = vec![vec![1.0f32, 2.0], vec![3.0, 4.0], vec![5.0, 6.0]];
let err = t
.all_reduce_f32(&mut per_card, ReduceOp::Sum)
.expect_err("expected TopologyMismatch");
assert!(matches!(
err,
Error::TopologyMismatch {
expected: 2,
actual: 3
}
));
}

#[test]
fn all_reduce_sum_shape_mismatch() {
let t = Topology::<MockSail>::with_mock(2);
let mut per_card = vec![vec![1.0f32, 2.0], vec![3.0]];
let err = t
.all_reduce_f32(&mut per_card, ReduceOp::Sum)
.expect_err("expected ShapeMismatch");
assert!(matches!(
err,
Error::ShapeMismatch {
sail: 1,
expected: 2,
actual: 1
}
));
}
}
71 changes: 71 additions & 0 deletions src/scheduler/src/intercard.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2026 PopSolutions Cooperative

//! Inter-card link contract — Rust mirror of the constants and
//! `link_state_t` enum landed by MAST #14 (intercard skeleton).
//!
//! Until ADR-014 (inter-card link architecture choice) lands the
//! actual protocol, this module exposes only the *shape* of the
//! link surface so the scheduler can reason about topology
//! without committing to a wire protocol. Real bandwidth and
//! latency numbers come from the cross-stream issue against
//! MAST filed alongside this PR.

/// Number of high-speed lanes per inter-card link. Default per
/// MAST #14 is 4; PCB rev-A may parametrise per Sail variant.
pub const INTERCARD_LANES: usize = 4;

/// Width of one lane, in bits. Mirrors MAST #14's
/// `INTERCARD_LANE_WIDTH = 32`.
pub const INTERCARD_LANE_WIDTH: usize = 32;

/// Aggregate bus width seen by the link MAC, in bits. Mirrors
/// MAST #14's `INTERCARD_BUS_WIDTH = 128` (default).
pub const INTERCARD_BUS_WIDTH: usize = 128;

/// State of a single inter-card link, mirroring `link_state_t`
/// in MAST #14.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LinkState {
/// Link is down; no traffic.
Down,
/// Lane training in progress.
Training,
/// Link is up and ready for traffic.
Up,
/// Hard error; link must be retrained or replaced.
Error,
}

/// A single point-to-point link between two sails in a topology.
///
/// `local_sail` and `remote_sail` are indices into
/// [`crate::Topology::sails`]; the protocol that flows over the
/// link is opaque to this crate and lands in ADR-014.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Link {
/// Topology index of the originating sail.
pub local_sail: usize,
/// Topology index of the peer sail.
pub remote_sail: usize,
/// Current state of this link.
pub state: LinkState,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn intercard_constants_match_mast_14() {
// MAST #14 contract: 4 lanes × 32 bits = 128-bit bus.
assert_eq!(INTERCARD_LANES * INTERCARD_LANE_WIDTH, INTERCARD_BUS_WIDTH);
}

#[test]
fn link_state_is_copy() {
let s = LinkState::Up;
let _ = s;
let _ = s; // would fail to compile if !Copy
}
}
Loading
Loading