Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ jobs:
# tests under LLVM (slow compile, ~5-15x runtime overhead). Single
# job hit the 60min ceiling; per-shard fits in 30min. fail-fast:
# false so one flaky shard doesn't mask the rest.
#
# Serialize test binaries within each shard with `-j1`. The shards
# themselves still run in parallel, but running multiple TSan-
# instrumented scheduler tests concurrently inside one constrained
# GitHub runner can starve/wedge progress and has hung shard 3/5 in
# `spsc-scheduler-test` while the same binary passes repeatedly alone.
name: Zig TSan (shard ${{ matrix.shard }}/5)
runs-on: ubuntu-latest
timeout-minutes: 30
Expand All @@ -192,7 +198,7 @@ jobs:
with:
version: ${{ env.ZIG_VERSION }}
- working-directory: zig
run: zig build test-tsan -Dshard-index=${{ matrix.shard }} -Dshard-count=5
run: zig build test-tsan -Dshard-index=${{ matrix.shard }} -Dshard-count=5 -j1

zig-hammer:
# Hammer stress tests (`*-hammer-test.zig`): high-iteration
Expand Down
56 changes: 56 additions & 0 deletions examples/parallel_du/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# pdu

Parallel recursive directory size calculator in CLEAR.

`du.cht` scans the current directory, maps entries through a concurrent
pipeline, and sums apparent file sizes for files and directories. It is
intended as a direct, filesystem-heavy comparison point for Rust's `diskus`.

`bench.rb` is only a harness: it builds the CLEAR binary and times external
commands. The directory traversal being measured is `du.cht` versus `diskus`,
not Ruby. The harness runs both tools with the same thread count, verifies that
the reported byte totals match, and reports wall-clock time plus max RSS.

## Build

```bash
BUNDLE_WITHOUT=development ./clear build --optimized examples/parallel_du/du.cht -o examples/parallel_du/pdu
```

## Run

```bash
cd path/to/scan
/path/to/easy-vm/examples/parallel_du/pdu
```

## Benchmark

```bash
ruby examples/parallel_du/bench.rb src 5
```

The benchmark runs `pdu` and `diskus --apparent-size` from the target directory.
By default it uses `nproc` worker threads for both tools. Override with
`BENCH_CORES=N` or `CLEAR_THREADS=N` (`CLEAR_THREADS` takes precedence when
both are set).
If no target is supplied, the harness scans `src/`.

Known limitations:

- `pdu` currently scans one root path: the benchmark runs from the target
directory and scans `.`.
- `pdu` reports apparent file size only. `diskus` also supports disk-usage
blocks, multiple input paths, human-readable formatting, and verbose error
reporting.
- `diskus` deduplicates hard-linked files by device/inode. `pdu` does not.
The benchmark aborts on a size mismatch, so trees with hardlinks are expected
to fail until `pdu` grows the same deduplication behavior.
- `pdu` ignores non-file/non-directory entries returned by `listAll`.
- The benchmark is a workload comparison, not a full semantic replacement
claim for `diskus`.

If `diskus` is not installed on this machine:

```bash
cargo install diskus --version 0.7.0 --locked
```
135 changes: 135 additions & 0 deletions examples/parallel_du/bench.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#!/usr/bin/env ruby

require 'benchmark'
require 'open3'

# Repository root: two levels above this example directory.
ROOT = File.expand_path('../..', __dir__)
EXAMPLE_DIR = File.expand_path(__dir__)
# CLEAR source for the parallel-du example and the binary it compiles to.
SOURCE = File.join(EXAMPLE_DIR, 'du.cht')
BINARY = File.join(EXAMPLE_DIR, 'pdu')
# Path to the Rust `diskus` binary; overridable via the DISKUS env var.
DISKUS = ENV['DISKUS'] || File.join(Dir.home, '.cargo', 'bin', 'diskus')

# Print invocation help to stderr and terminate with a failure status.
def usage!
  warn "usage: ruby examples/parallel_du/bench.rb [path] [runs]"
  exit 1
end

# Run `argv` under the external `timeout` command in `chdir`.
# Returns stdout on success (falling back to stderr when stdout is empty).
# On any failure the error is reported and the harness terminates with the
# child's exit status (or 1 when killed by a signal).
def run_checked(env, argv, chdir:, timeout:)
  stdout, stderr, status = Open3.capture3(env, 'timeout', "#{timeout}s", *argv, chdir: chdir)
  if status.success?
    return stdout.empty? ? stderr : stdout
  end

  warn "command failed: #{argv.join(' ')}"
  # GNU timeout exits 124 when the deadline fires.
  warn "timed out after #{timeout}s" if status.exitstatus == 124
  warn stderr unless stderr.empty?
  warn stdout unless stdout.empty?
  exit status.exitstatus || 1
end

# Run `argv` wrapped in `/usr/bin/time` (for max-RSS capture) and the external
# `timeout` command. Returns [output, max_rss_kb] on success, where output is
# stdout (or the RSS-stripped stderr when stdout is empty) and max_rss_kb may
# be nil if the marker line was not found. On failure, reports the error and
# terminates the harness.
def run_measured(env, argv, chdir:, timeout:)
  wrapped = ['/usr/bin/time', '-f', '__MAX_RSS_KB=%M', 'timeout', "#{timeout}s", *argv]
  stdout, stderr, status = Open3.capture3(env, *wrapped, chdir: chdir)

  # /usr/bin/time writes the marker to stderr; peel it off so genuine
  # diagnostics can still be surfaced below.
  markers, diagnostics = stderr.lines.partition { |line| line.start_with?('__MAX_RSS_KB=') }
  rss_kb = markers.first&.split('=', 2)&.last&.to_i
  clean_err = diagnostics.join

  return [stdout.empty? ? clean_err : stdout, rss_kb] if status.success?

  warn "command failed: #{argv.join(' ')}"
  warn "timed out after #{timeout}s" if status.exitstatus == 124
  warn clean_err unless clean_err.empty?
  warn stdout unless stdout.empty?
  exit status.exitstatus || 1
end

# True when `path` names a runnable command: an executable file when the
# argument contains a path separator, otherwise a name resolvable through
# the PATH environment variable.
def command_available?(path)
  if path.include?(File::SEPARATOR)
    File.executable?(path)
  else
    ENV.fetch('PATH', '')
       .split(File::PATH_SEPARATOR)
       .any? { |dir| File.executable?(File.join(dir, path)) }
  end
end

# Return [minimum, median] of the numeric samples. For an even number of
# samples this is the upper median (index length/2 of the sorted list).
#
# Returns [0, 0] for an empty sample set so callers can format the values
# without crashing — rss_samples in the reporting loop is legitimately empty
# when /usr/bin/time produced no `__MAX_RSS_KB=` marker, and the original
# [nil, nil] result made the `%d` format string raise TypeError.
def best_and_median(samples)
  return [0, 0] if samples.empty?

  sorted = samples.sort
  [sorted.first, sorted[sorted.length / 2]]
end

# Extract the leading integer byte count from a tool's first output line.
# Aborts the harness when the line does not begin with digits.
def parse_size(output)
  head = output.lines.first.to_s.strip
  digits = head[/\A[0-9]+/]
  abort "could not parse byte count from output: #{head.inspect}" unless digits
  digits.to_i
end

# --- CLI parsing: [path] [runs], with env-var fallbacks ---
target = File.expand_path(ARGV[0] || File.join(ROOT, 'src'))
runs = (ARGV[1] || ENV['RUNS'] || '5').to_i
timeout = (ENV['BENCH_TIMEOUT'] || '30').to_i
usage! if runs <= 0
abort "BENCH_TIMEOUT must be positive" if timeout <= 0
abort "target does not exist: #{target}" unless File.exist?(target)

# Default worker-thread count: `nproc`, clamped to at least 1 when the
# command is unavailable. CLEAR_THREADS takes precedence over BENCH_CORES.
nproc = `nproc 2>/dev/null`.strip.to_i
nproc = 1 if nproc <= 0
default_threads = nproc.to_s
threads = ENV['CLEAR_THREADS'] || ENV['BENCH_CORES'] || default_threads

# Build the CLEAR binary before timing anything (build time is not measured).
puts "Building pdu..."
run_checked(
{ 'BUNDLE_WITHOUT' => 'development' },
['./clear', 'build', '--optimized', SOURCE, '-o', BINARY],
chdir: ROOT,
timeout: timeout
)

unless command_available?(DISKUS)
warn "diskus not found at #{DISKUS}"
warn "Install with: cargo install diskus --version 0.7.0 --locked"
exit 1
end

# Both tools scan `.` from inside the target directory with the same
# thread count so the traversal workload is identical.
tools = [
['pdu', { 'CLEAR_THREADS' => threads }, [BINARY]],
['diskus', {}, [DISKUS, '--threads', threads, '--apparent-size', '.']]
]

puts "Target: #{target}"
puts "Runs: #{runs}"
puts "Timeout: #{timeout}s"
puts "Threads: #{threads}"
puts

# First tool's byte total becomes the reference; any later mismatch aborts.
expected_size = nil
tools.each do |name, env, argv|
times = []
rss_samples = []
last_output = nil
last_size = nil

runs.times do
elapsed = Benchmark.realtime do
last_output, rss_kb = run_measured(env, argv, chdir: target, timeout: timeout)
rss_samples << rss_kb if rss_kb
end
# Parse outside the timed block so it does not inflate the measurement.
last_size = parse_size(last_output)
times << elapsed
end

expected_size ||= last_size
abort "size mismatch: #{name} returned #{last_size}, expected #{expected_size}" if last_size != expected_size

best, median = best_and_median(times)
# NOTE(review): rss_samples can be empty when /usr/bin/time emitted no
# `__MAX_RSS_KB=` marker — verify best_and_median handles that before the
# %d format below is applied.
rss_best, rss_median = best_and_median(rss_samples)
first_line = last_output.lines.first&.strip || ''
puts "%-8s best %.4fs median %.4fs rss_best %dKB rss_median %dKB %s" %
[name, best, median, rss_best, rss_median, first_line]
end
63 changes: 31 additions & 32 deletions examples/parallel_du/du.cht
Original file line number Diff line number Diff line change
@@ -1,44 +1,43 @@
-- Parallel du — recursive directory size calculator
-- pdu: parallel recursive directory size calculator
--
-- Scans a directory tree, summing file sizes.
-- Spawns BG fibers for subdirectories to scan in parallel.
-- Exercises: File I/O (fileSize, listAll), string parsing, BG fibers, promise lists.
-- Scans the current directory tree, summing apparent file sizes.
-- Uses a concurrent pipeline so independent entries can run in parallel.
--
-- Usage: ./du
-- Compare: du -sb .
-- Usage: ./pdu
-- Compare: du -sb . or diskus --apparent-size .

FN scanDir(path: String) RETURNS !Int64 EFFECTS REENTRANT ->
entries = listAll(path);
MUTABLE total: Int64 = 0;
FN entrySize(entry: String)
RETURNS !Int64
EFFECTS REENTRANT
->
fullPath = substr(entry, 2, entry.length() - 2);
prefix = substr(entry, 0, 2);

-- Collect subdirectory scan promises for parallel execution.
futures: ~Int64[]@list = [];
IF prefix == "f:" THEN
RETURN fileSize(fullPath);
ELSE_IF prefix == "d:" THEN
RETURN scanDir(fullPath);
END

FOR i IN (0_i64 ..< entries.length()) DO
entry = entries[i];
prefix = substr(entry, 0, 2);
name = substr(entry, 2, entry.length() - 2);
fullPath = "${path}/${name}";
RETURN 0;
END

IF prefix == "f:" THEN
size = fileSize(fullPath);
IF size > 0 -> total += size;
ELSE_IF prefix == "d:" THEN
append(futures, BG { @service ->
scanDir(fullPath);
});
END
END
FN scanDir(path: String)
RETURNS !Int64
EFFECTS REENTRANT
->
entries = listAll(path)
s> SELECT "${substr(_, 0, 2)}${path}/${substr(_, 2, _.length() - 2)}";

-- Wait for all subdirectory scans and sum their results.
FOR j IN (0_i64 ..< futures.length()) ->
total += NEXT futures[j];
children = entries
s> CONCURRENT SELECT entrySize(_)
s> REDUCE(0) acc + _;

RETURN total;
RETURN fileSize(path) + children;
END

FN main() RETURNS Void ->
path = ".";
total = scanDir(path);
print("${total.toString()}\t${path}");
path = ".";
total = scanDir(path);
print("${total.toString()}\t${path}");
END
69 changes: 69 additions & 0 deletions spec/clear_fmt_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,75 @@ def write(name, content)
expect(out).to include("\n s> SELECT _ * 2\n s> SUM _")
end

# Regression spec: after an IF/ELSE_IF/END chain inside a function whose
# signature carries metadata (RETURNS/EFFECTS), later top-level FN
# declarations must come out at column 0 rather than inheriting leftover
# indentation from the chain's outdents.
it "keeps top-level FNs at column 0 after ELSE_IF blocks in metadata-wrapped functions" do
src = <<~CLEAR
FN entrySize(entry: String) RETURNS !Int64 EFFECTS REENTRANT ->
IF entry == "f:" THEN
RETURN 1;
ELSE_IF entry == "d:" THEN
RETURN 2;
END

RETURN 0;
END

FN scanDir(path: String) RETURNS !Int64 EFFECTS REENTRANT ->
RETURN entrySize(path);
END

FN main() RETURNS Void ->
RETURN;
END
CLEAR
path = write("elseif_fn.cht", src)
out, _, _ = run_fmt("--stdout", path)
expect(out).to include("ELSE_IF entry == \"d:\" THEN\n RETURN 2;\n END\n\n RETURN 0;\nEND\n\nFN scanDir")
expect(out).to include("\nFN main() RETURNS Void ->\n")
end

# Regression spec: repeated ELSE_IF outdents must not accumulate — the code
# after the chain's closing END and the following top-level FN must keep
# balanced indentation.
it "keeps body indentation balanced after multiple ELSE_IF outdents" do
src = <<~CLEAR
FN classify(x: Int64) RETURNS Int64 ->
IF x == 0 THEN
RETURN 0;
ELSE_IF x == 1 THEN
RETURN 1;
ELSE_IF x == 2 THEN
RETURN 2;
ELSE
RETURN 3;
END

RETURN 4;
END

FN after() RETURNS Void ->
RETURN;
END
CLEAR
path = write("elseif_balance.cht", src)
out, _, _ = run_fmt("--stdout", path)
expect(out).to include(" ELSE_IF x == 1 THEN\n RETURN 1;\n ELSE_IF x == 2 THEN\n")
expect(out).to include(" ELSE\n RETURN 3;\n END\n\n RETURN 4;\nEND\n\nFN after")
end

# Regression spec: an `s>` pipeline continuation hanging off an assignment or
# a RETURN must be indented exactly one level past its receiver line.
it "indents an existing single-stage pipeline continuation one level past the receiver" do
src = <<~CLEAR
FN scanDir(path: String) RETURNS !Int64 EFFECTS REENTRANT ->
entries = listAll(path)
s> SELECT _;

RETURN entries
s> CONCURRENT SELECT _
s> REDUCE(0_i64) acc + _;
END
CLEAR
path = write("pipeline_cont.cht", src)
out, _, _ = run_fmt("--stdout", path)
expect(out).to include(" entries = listAll(path)\n s> SELECT _;\n")
# NOTE(review): the expectation asserts `REDUCE(0)` although the input wrote
# `REDUCE(0_i64)` — presumably the formatter normalizes the integer-literal
# suffix; confirm this is intended and not a typo in the expectation.
expect(out).to include(" RETURN entries\n s> CONCURRENT SELECT _\n s> REDUCE(0) acc + _;\n")
end

it "gives `s> RECOVER(...)` one extra indent relative to sibling stages" do
src = <<~CLEAR
FN main() RETURNS Int64 ->
Expand Down
Loading
Loading