1 change: 1 addition & 0 deletions benchmarks/concurrent/06_dynamic_spawn/TIMEOUT
@@ -0,0 +1 @@
5
19 changes: 19 additions & 0 deletions benchmarks/concurrent/11_parallel_aggregation/README.md
@@ -41,6 +41,25 @@ merge is a single sequential pass over 1,000 entries.
SHARD amortizes well when per-item work is expensive (parsing,
transformation, I/O). For simple counting it is over-engineered.

## Current profile note

`SHARD(...) s> CONCURRENT EACH` now runs as real shard-parallel work: one
producer routes keys into per-shard bounded queues, and one worker fiber drains
each shard. `clear profile` shows the shard workers distributed across
schedulers, so the old failure mode (a single serial SHARD loop) is no longer
the limiting factor.

The remaining cost is per-item transport. Every event still pays for a channel
push, channel pop, scheduler wake/drain work, and a tiny map increment. The
profile is therefore dominated by worker dispatch and scheduler/channel
overhead rather than the histogram update itself.
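
For readers coming from the Go baseline, the transport shape described above looks roughly like the sketch below. It is an illustrative Go analogue, not the benchmark's code; `shardedCount`, `fnv32`, the shard count, and the queue depth are all assumptions.

```go
// Illustrative Go analogue of the shard transport described above (assumed
// names and sizes, not the benchmark's actual code): one producer routes keys
// into per-shard bounded queues, one worker per shard owns its local map, and
// a single sequential pass merges the shards at the end.
package main

import (
    "fmt"
    "sync"
)

func fnv32(s string) uint32 {
    h := uint32(2166136261)
    for i := 0; i < len(s); i++ {
        h ^= uint32(s[i])
        h *= 16777619
    }
    return h
}

func shardedCount(keys []string, shardCount int) map[string]int64 {
    queues := make([]chan string, shardCount)
    locals := make([]map[string]int64, shardCount)
    var wg sync.WaitGroup

    for s := 0; s < shardCount; s++ {
        queues[s] = make(chan string, 1024) // bounded per-shard queue
        locals[s] = map[string]int64{}
        wg.Add(1)
        go func(s int) { // one worker drains exactly one shard
            defer wg.Done()
            for k := range queues[s] {
                locals[s][k]++ // the tiny per-item map increment
            }
        }(s)
    }

    for _, k := range keys { // producer: per-item channel push (the hot cost)
        queues[fnv32(k)%uint32(shardCount)] <- k
    }
    for _, q := range queues {
        close(q)
    }
    wg.Wait()

    merged := map[string]int64{} // single sequential merge over the shard maps
    for _, m := range locals {
        for k, v := range m {
            merged[k] += v
        }
    }
    return merged
}

func main() {
    fmt.Println(shardedCount([]string{"a", "b", "a", "c", "a"}, 4))
}
```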

Runtime batching would reduce this overhead by pushing groups of keys through
each shard queue while preserving per-item CLEAR semantics. That is deferred
to v0.2. `WINDOW(size: N)` is not a replacement here because it changes `_`
into a batch array; the optimization needed for SHARD is an internal transport
batch, not a user-visible pipeline batch.
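
Extending the same sketch (the per-shard queues become `chan []string`; `fnv32` and the batch size of 64 are still assumptions), an internal transport batch would look roughly like this. The worker still handles every key individually, so per-item semantics are preserved and only the channel traffic is amortized.

```go
// Hypothetical batched transport for the same shape: the producer pushes
// small slices of keys through each shard queue instead of single keys.
const batchSize = 64

func routeBatched(keys []string, queues []chan []string, shardCount int) {
    pending := make([][]string, shardCount)
    for _, k := range keys {
        s := fnv32(k) % uint32(shardCount)
        pending[s] = append(pending[s], k)
        if len(pending[s]) == batchSize {
            queues[s] <- pending[s] // one push now moves 64 keys
            pending[s] = nil
        }
    }
    for s, b := range pending { // flush the partial batches
        if len(b) > 0 {
            queues[s] <- b
        }
    }
}

// Worker side: one pop per batch, but still one map increment per key.
func drainBatched(queue <-chan []string, local map[string]int64) {
    for batch := range queue {
        for _, k := range batch {
            local[k]++
        }
    }
}
```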

## TODO: PARALLEL FOLD primitive

The remaining gap is structural. SHARD routes each item to its owning fiber
38 changes: 38 additions & 0 deletions benchmarks/concurrent/12_false_sharing/README.md
@@ -24,6 +24,44 @@ Tests whether CLEAR's `@shared:locked` eliminates false sharing by construction.
| Go heap-alloc (racy) | ~3ms | n/a - no mutex |
| C padded (racy) | ~3ms | n/a - no mutex |

## CLEAR scheduler mode

This benchmark should use stackful CLEAR workers, for example
`BG { @standard:@parallel -> ... }`.

The worker task is deliberately tiny:

```clear
FOR j IN (0_i64 ..< increments) DO
WITH EXCLUSIVE ref AS inner {
inner.value = inner.value + 1;
}
END
```

Each worker repeats an uncontended lock, a single integer increment, and an unlock roughly
1.25M times. There is no I/O, no meaningful blocking, and only `threadCount()`
long-running workers, so per-task memory is not the limiting factor. The hot cost
is per-iteration dispatch.
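
For reference, the mechanism the CLEAR workers exercise (one heap-allocated counter per worker, each behind its own lock) looks roughly like this in Go. The struct layout, padding, and counts are illustrative assumptions, not the repository's baselines.

```go
// Illustrative Go rendering of the mechanism the CLEAR workers exercise
// (assumed layout and counts, not one of the listed baselines): each worker
// owns a separately heap-allocated, padded counter guarded by its own mutex,
// so every lock is uncontended and no two counters share a cache line.
package main

import (
    "fmt"
    "runtime"
    "sync"
)

type counter struct {
    mu    sync.Mutex
    value int64
    _     [48]byte // pad the struct out to a full 64-byte cache line
}

func main() {
    workers := runtime.NumCPU()
    const increments = 1_250_000
    counters := make([]*counter, workers)
    for i := range counters {
        counters[i] = new(counter) // one heap allocation per worker
    }

    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func(c *counter) {
            defer wg.Done()
            for j := 0; j < increments; j++ {
                c.mu.Lock()  // uncontended lock
                c.value += 1 // tiny critical section
                c.mu.Unlock()
            }
        }(counters[i])
    }
    wg.Wait()

    var total int64
    for _, c := range counters {
        total += c.value
    }
    fmt.Println("total:", total)
}
```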

FSM workers are correct here, but they are the wrong tradeoff for this shape. The
FSM lowering must preserve resumable lock semantics, so each `WITH EXCLUSIVE`
goes through the FSM lock protocol, state dispatch, body segment, unlock segment,
and cleanup bookkeeping. Stackful workers lower to a tight acquire/body/release
loop. On the 32-thread benchmark, the fixed FSM path was about 2x slower than the
stackful path for this specific workload.
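
A deliberately simplified Go caricature of the two lowerings (not CLEAR's actual generated code) shows where the extra per-iteration cost comes from:

```go
// Simplified caricature, not CLEAR's actual lowering: the stackful form is a
// straight acquire/body/release loop, while the FSM form re-enters a dispatch
// switch for every lock, body, and unlock segment of each iteration.
package main

import (
    "fmt"
    "sync"
)

type fsmState int

const (
    stLock fsmState = iota
    stBody
    stUnlock
    stDone
)

// Stackful-style worker: tight acquire/body/release loop.
func runStackful(mu *sync.Mutex, value *int64, increments int) {
    for j := 0; j < increments; j++ {
        mu.Lock()
        *value += 1
        mu.Unlock()
    }
}

// FSM-style worker: the same work as a resumable state machine; every
// iteration walks lock -> body -> unlock through the dispatch switch.
func runFSM(mu *sync.Mutex, value *int64, increments int) {
    j, state := 0, stLock
    for state != stDone {
        switch state {
        case stLock:
            mu.Lock()
            state = stBody
        case stBody:
            *value += 1
            state = stUnlock
        case stUnlock:
            mu.Unlock()
            j++
            if j < increments {
                state = stLock
            } else {
                state = stDone
            }
        }
    }
}

func main() {
    var mu sync.Mutex
    var a, b int64
    runStackful(&mu, &a, 1000)
    runFSM(&mu, &b, 1000)
    fmt.Println(a, b) // 1000 1000
}
```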

The memory tradeoff goes in the other direction. FSM tasks avoid per-fiber stacks;
the runtime benchmark reports a compact `FsmTask` plus small state storage versus
a stackful task with `Task`, `Fiber`, and a reserved stack. That is the right trade
for huge numbers of parked, blocked, or lightly suspended tasks. It is not the
right trade for a small number of CPU-bound workers executing millions of tiny
critical sections.

This benchmark is therefore an example of why CLEAR supports both models:
use FSMs when task count and memory footprint dominate, and use stackful fibers
when hot-loop compute throughput dominates.

## Interpretation

**CLEAR vs Rust Arc<Mutex>**: same mechanism (heap alloc + mutex), CLEAR is ~2x faster.
Binary file removed benchmarks/concurrent/12_false_sharing/bench
Binary file not shown.
2 changes: 1 addition & 1 deletion benchmarks/concurrent/12_false_sharing/bench.cht
@@ -31,7 +31,7 @@ FN main() RETURNS Void ->
MUTABLE wi = 0_i64;
WHILE wi < workers DO
ref = counters[wi];
futures.append(BG { @parallel ->
futures.append(BG { @standard:@parallel ->
FOR j IN (0_i64 ..< increments) DO
WITH EXCLUSIVE ref AS inner {
inner.value = inner.value + 1;
2 changes: 1 addition & 1 deletion benchmarks/concurrent/14_nested_lock/TIMEOUT
@@ -1 +1 @@
5
10
25 changes: 0 additions & 25 deletions benchmarks/concurrent/15_fsm_vs_stackful/bench_stackful.cht

This file was deleted.

68 changes: 21 additions & 47 deletions benchmarks/concurrent/16_observables/README.md
@@ -1,8 +1,7 @@
# Concurrent Observables — CLEAR vs Go vs Rust

Benchmarks the lock-free `@observable` runtime backing CLEAR's
`~T@observable` types against equivalents in Go and Rust, plus
`@locked` baselines in each.
Benchmarks the CLEAR-language `~T@observable` pipeline-terminal
form against matching Go and Rust stream/channel implementations.

Two distinct measurements live in this directory:

@@ -12,11 +11,10 @@ Two distinct measurements live in this directory:
join via `NEXT`. Measures end-to-end cost of the language form,
including stream yield/resume overhead.

2. **Cross-language atomic-counter comparison** (`bench_clear.zig`,
`bench.go`, `bench.rs`): 1 writer thread + K reader threads
hammer a shared atomic accumulator. Measures the underlying
lock-free runtime (`obs.AtomicSum` for CLEAR; `atomic.Int64` for
Go; `AtomicI64` for Rust) head-to-head with no language overhead.
2. **Runtime-level helper** (`bench_clear.zig`): hand-written Zig
for isolating `obs.AtomicSum` itself. This is not used by the
benchmark runner's CLEAR headline result because it is not `.cht`
code.

```clear
-- bench.cht (the canonical CLEAR form)
@@ -25,36 +23,29 @@ final = NEXT running;
```

The compiler heap-allocates an `*ObservableSum(i64)` plus a
WaitGroup, **spawns a CONSUMER fiber cross-scheduler** that pulls
from `gen` and calls `.add(item)` per emit, and `NEXT` parks main
on the WG until the consumer's `defer ctx.acc.finish()` issues
`wg.done()`. Producer (BG STREAM gen), consumer fiber, and main
all run on different worker threads in the default multi-threaded
runtime, so the fold genuinely overlaps with the joiner.
WaitGroup, spawns a consumer fiber that pulls from `gen` and calls
`.add(item)` per emit, and `NEXT` parks main on the WG until the
consumer's `defer ctx.acc.finish()` issues `wg.done()`.

## Workload

- 1 writer producing 5,000,000 increments
- K reader threads each calling `view()` until the writer finishes
(K ∈ {1, 4, 8})
- 1 producer emitting `0..2,000,000`
- 1 consumer summing the stream
- 1 joiner waiting for the final sum
- deterministic checksum: `sum(0..N-1) + N * 131`
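
(With the current N = 2,000,000, the expected sum is 1,999,999,000,000 and the checksum is 1,999,999,000,000 + 262,000,000 = 2,000,261,000,000.)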

## Results (this box, ReleaseFast / `-O` / `--release`)

### CLEAR-language pipeline form (`bench.cht` → `./clear build --optimized`)

```
CLEAR observable: 12499997500000 (sum 0..N-1) in 61 ms (~12 ns/item)
```

5M values produced by a `BG STREAM`, folded via `s> SUM _` (which
auto-produces a `~Int64@observable`), and joined via `NEXT`. The
producer (BG STREAM gen), consumer fiber (spawned by the SUM emit
cross-scheduler), and main (parked on the observable's WaitGroup)
all run on different worker threads concurrently:
2M values are produced by a `BG STREAM`, folded via `s> SUM _`
(which auto-produces a `~Int64@observable`), and joined via `NEXT`.
`bench.go` and `bench.rs` mirror this shape with bounded channels:
producer -> consumer sum -> join.

```clear
FN main() RETURNS Void ->
n_writes: Int64 = 5_000_000_i64;
n_writes: Int64 = 2_000_000_i64;
gen: ~?Int64[] = BG STREAM {
MUTABLE i: Int64 = 0_i64;
WHILE i < n_writes DO YIELD i; i = i + 1_i64; END
@@ -70,25 +61,8 @@ FN main() RETURNS Void ->
END
```

The 12 ns/item is the language-form cost: BG STREAM yield/resume
overhead per item + cross-scheduler atomic add + WaitGroup join.
The pure atomic-add-only number (no stream fiber, no consumer
fiber spawn) lives in the cross-language reader-stress table below.

### Concurrent reader stress (`bench_clear.zig` / `bench.go` / `bench.rs`)

1 writer thread + K reader threads hammering the shared atomic
counter. Same workload across all four implementations. (Median of
3 runs; high variance on hot CPUs, especially at 8 readers.)

| Variant | 1 reader | 4 readers | 8 readers |
|--------------------------------------|------------------------|------------------------|------------------------|
| **CLEAR `obs.AtomicSum`** | 37 ns/inc, **142 M r/s** | 67 ns/inc, **466 M r/s** | 88 ns/inc, **831 M r/s** |
| Go `atomic.Int64` | 30 ns/inc, 950 M r/s | 50 ns/inc, 2.97 G r/s | 53 ns/inc, 4.31 G r/s |
| Rust `AtomicI64` | 35 ns/inc, 400 M r/s | 50 ns/inc, 1.78 G r/s | 58 ns/inc, 2.40 G r/s |
| **CLEAR `compat.Mutex`** (@locked) | 130 ns/inc, 7.4 M r/s | 416 ns/inc, 9.5 M r/s | 797 ns/inc, 10.2 M r/s |
| Go `sync.Mutex` | 63 ns/inc, 25 M r/s | 384 ns/inc, 11 M r/s | 1220 ns/inc, 7.7 M r/s |
| Rust `Mutex<i64>` | 157 ns/inc, 6.9 M r/s | 288 ns/inc, 4.8 M r/s | 430 ns/inc, 6.6 M r/s |
The measured cost is the language-form cost: BG STREAM yield/resume
overhead per item + observable accumulator add + WaitGroup join.
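
The bounded-channel shape that `bench.go` and `bench.rs` mirror is roughly the following. This is an illustrative sketch, not the literal bench.go; the 1024-slot buffer is an assumption.

```go
// Illustrative Go sketch of the mirrored channel shape (not the literal
// bench.go): one producer streams 0..N-1 into a bounded channel, one
// consumer folds it into a sum, and main joins on the result.
package main

import "fmt"

func main() {
    const n = 2_000_000
    ch := make(chan int64, 1024) // bounded channel stands in for the stream
    done := make(chan int64, 1)

    go func() { // producer: emit 0..N-1
        for i := int64(0); i < n; i++ {
            ch <- i
        }
        close(ch)
    }()

    go func() { // consumer: fold the stream into a running sum
        var sum int64
        for v := range ch {
            sum += v
        }
        done <- sum
    }()

    final := <-done // join, analogous to NEXT parking on the WaitGroup
    fmt.Println(final, final == n*(n-1)/2)
}
```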

### Perf optimization round (`obs.AtomicSum` vs raw `std.atomic.Value`)

@@ -152,7 +126,7 @@ const __obs_acc = CheatLib.obs.ObservableSum(i64).new(rt.heapAlloc()) catch unreachable;
const __obs_wg = rt.heapAlloc().create(CheatHeader.WaitGroup) catch unreachable;
__obs_wg.* = CheatHeader.WaitGroup.init(rt.getSched()); __obs_wg.add(1);
__obs_acc.setCompletion(@ptrCast(__obs_wg), CheatHeader.obsWgDone, CheatHeader.obsWgWait, CheatHeader.obsWgDestroy);
// Spawn consumer fiber cross-scheduler:
// Spawn consumer fiber:
const ConsumerCtx = struct { acc: *..., gen: ..., fn run(...) {
defer ctx.acc.finish(); // wg.done() via callback
while (try ctx.gen.next()) |it| ctx.acc.add(it);
1 change: 1 addition & 0 deletions benchmarks/concurrent/16_observables/TIMEOUT
@@ -0,0 +1 @@
10
25 changes: 12 additions & 13 deletions benchmarks/concurrent/16_observables/bench.cht
@@ -2,24 +2,18 @@
--
-- The pipeline `gen s> SUM _` heap-allocates an `*ObservableSum(i64)`
-- and spawns a CONSUMER fiber that pulls from `gen` and calls
-- `.add(item)` per emit. The consumer fiber is unpinned, so in the
-- default multi-threaded runtime it runs on a sibling worker
-- thread concurrently with the BG STREAM gen producer fiber.
-- `.add(item)` per emit. The consumer stays on the source scheduler
-- so stream wakeups remain local; cross-scheduler stream wakeups are
-- covered separately by runtime tests.
-- `NEXT running` parks main on the observable's WaitGroup and
-- wakes when the consumer publishes `.finish()`.
--
-- The metric is wall time for 5M concurrent producer→consumer→
-- accumulator emits, the same workload bench.go and bench.rs
-- measure. The comparable read-side hot-poll (`WHILE current <
-- expected DO WITH VIEW running AS s ... END`) is currently
-- gated by a scheduler limitation -- pinned-vs-ready pickNext
-- (scheduler.zig:826-830) starves an unpinned producer when the
-- joiner is the pinned main fiber. The cross-language
-- many-readers comparison lives in `bench_clear.zig` (real OS
-- threads + obs.AtomicSum directly).
-- The metric is wall time for 2M producer->consumer->accumulator
-- emits. bench.go and bench.rs mirror this shape with channels:
-- one producer, one consumer sum, and a join.

FN main() RETURNS Void ->
n_writes: Int64 = 5_000_000_i64;
n_writes: Int64 = 2_000_000_i64;
expected: Int64 = (n_writes * (n_writes - 1_i64)) / 2_i64;

gen: ~?Int64[] = BG STREAM {
@@ -34,8 +28,13 @@ FN main() RETURNS Void ->
running: ~Int64@observable = gen s> SUM _;
final = NEXT running;
elapsed = timestampMs() - t0;
checksum: Int64 = final + (n_writes * 131_i64);
expected_checksum: Int64 = expected + (n_writes * 131_i64);

ASSERT final == expected, "final mismatch";
ASSERT checksum == expected_checksum, "checksum mismatch";
print("CLEAR observable: ", final, " (sum 0..N-1) in ", elapsed, " ms");
print("BENCH_INFO: CLEAR stream_sum final=", final, " checksum=", checksum, " n=", n_writes);
print("BENCH_RESULT: ", elapsed, " ms");
RETURN;
END