Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

224 changes: 224 additions & 0 deletions crates/but-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,227 @@ pub mod platform;
pub mod schema;

pub mod panic_capture;

/// A module for proof-of-concepts
pub mod poc {
use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
mpsc,
},
thread,
time::{Duration, Instant},
};

/// Either the actual data that is more and more complete, or increments that can be merged
/// into the actual data by the receiver.
/// Sending all data whenever it changes is probably better.
pub struct Data(pub usize);

/// Set `duration` to decide how long the function should run, without blocking the caller.
/// IRL the duration would be determined by the amount of work to be done.
///
/// Use `progress` to see fine-grained progress information, either flat or as [`gix::NestedProgress`]
/// so that a progress tree can be built for complex progress ['visualisations'](https://asciinema.org/a/315956).
/// Please note that the underlying implementation, [`Prodash`](https://github.com/GitoxideLabs/prodash?tab=readme-ov-file),
/// also provides renderers. However, there are other crates as well and there is no reason these shouldn't be used if
/// these seem fitter.
///
/// Use `should_interrupt` which should become `true` if the function should stop processing.
/// It must live outside of `scope`, like any other value borrowed by a scoped thread.
///
/// Return a receiver for the data (incremental or increasingly complete, depending on what's more suitable).
///
/// # Cancellation
///
/// The task can be stopped in two ways:
///
/// - by dropping the receiver
/// - this is easy, but has the disadvantage that the sender only stops once it tries to send the next result and fails
/// doing so.
/// - by setting `should_interrupt` to `true`
/// - this mechanism is fine-grained, and the callee is expected to pull the value often, so it will respond
/// swiftly to cancellation requests.
///
/// # `[but_api]` Integration
///
/// This function can't be `[but_api]` annotated until it learns how to deal with `duration` and more importantly,
/// `progress`, the return channel and how to wire up `should_interrupt`. If we want, any of these could serve as markers
/// to indicate long-runnning functions
///
/// # Why not `async`?
///
/// Our computations are not IO bound but compute bound, so there is no benefit to `async` or `tokio`.
/// And I really, really want to avoid all the issues we will be getting when `async` is used, `but-ctx` and `gix::Repository`
/// do not like `await` and require workarounds.
///
/// At first, the integration should be implemented by hand (i.e. for NAPI) before it's generalised.
pub fn long_running_non_blocking_scoped_thread<'scope, 'env>(
scope: &'scope thread::Scope<'scope, 'env>,
duration: Duration,
progres: impl gix::Progress + 'env,
should_interrupt: &'env AtomicBool,
) -> std::sync::mpsc::Receiver<anyhow::Result<Data>> {
let (tx, rx) = mpsc::channel();
scope.spawn(move || run_long_running_worker(duration, progres, should_interrupt, tx));
rx
}

/// Like [`long_running_non_blocking_scoped_thread()`], but uses a regular thread and an owned
/// cancellation flag so the task can outlive the current stack frame.
pub fn long_running_non_blocking_thread(
duration: Duration,
progres: impl gix::Progress + 'static,
should_interrupt: Arc<AtomicBool>,
) -> std::sync::mpsc::Receiver<anyhow::Result<Data>> {
let (tx, rx) = mpsc::channel();
thread::spawn(move || run_long_running_worker(duration, progres, &should_interrupt, tx));
rx
}

fn run_long_running_worker(
duration: Duration,
mut progres: impl gix::Progress,
should_interrupt: &AtomicBool,
tx: mpsc::Sender<anyhow::Result<Data>>,
) {
const UPDATE_INTERVAL: Duration = Duration::from_millis(100);
const INTERRUPT_POLL_INTERVAL: Duration = Duration::from_millis(20);

let total_steps = usize::max(
1,
duration
.as_millis()
.div_ceil(UPDATE_INTERVAL.as_millis())
.try_into()
.unwrap_or(usize::MAX),
);
let start = Instant::now();

progres.init(Some(total_steps), gix::progress::steps());
progres.set_name("proof of concept task".into());

for step in 1..=total_steps {
let scheduled_at = start + duration.mul_f64(step as f64 / total_steps as f64);
while let Some(remaining) = scheduled_at.checked_duration_since(Instant::now()) {
if should_interrupt.load(Ordering::Relaxed) {
progres.fail(format!("interrupted at step {}/{}", step - 1, total_steps));
return;
}
thread::sleep(remaining.min(INTERRUPT_POLL_INTERVAL));
}

progres.set(step);
if tx.send(Ok(Data(step))).is_err() {
progres.info("receiver dropped".into());
return;
}
}

progres.done("completed".into());
progres.show_throughput(start);
}
}

#[cfg(test)]
mod tests {
use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
time::{Duration, Instant},
};

use super::poc;

#[test]
fn long_running_non_blocking_scoped_thread_returns_before_work_completes() {
let should_interrupt = AtomicBool::new(false);

thread::scope(|scope| {
let start = Instant::now();
let rx = poc::long_running_non_blocking_scoped_thread(
scope,
Duration::from_millis(50),
gix::progress::Discard,
&should_interrupt,
);

assert!(start.elapsed() < Duration::from_millis(25));
assert!(matches!(
rx.try_recv(),
Err(std::sync::mpsc::TryRecvError::Empty)
));

let values = rx
.into_iter()
.collect::<anyhow::Result<Vec<_>>>()
.expect("proof-of-concept task should complete");

assert_eq!(values.last().map(|data| data.0), Some(1));
});
}

#[test]
fn long_running_non_blocking_scoped_thread_stops_when_interrupted() {
let should_interrupt = AtomicBool::new(false);

thread::scope(|scope| {
let rx = poc::long_running_non_blocking_scoped_thread(
scope,
Duration::from_millis(200),
gix::progress::Discard,
&should_interrupt,
);
should_interrupt.store(true, Ordering::Relaxed);

assert!(matches!(
rx.recv_timeout(Duration::from_secs(1)),
Err(std::sync::mpsc::RecvTimeoutError::Disconnected)
));
});
}

#[test]
fn long_running_non_blocking_thread_returns_before_work_completes() {
let should_interrupt = Arc::new(AtomicBool::new(false));
let start = Instant::now();
let rx = poc::long_running_non_blocking_thread(
Duration::from_millis(50),
gix::progress::Discard,
should_interrupt,
);

assert!(start.elapsed() < Duration::from_millis(25));
assert!(matches!(
rx.try_recv(),
Err(std::sync::mpsc::TryRecvError::Empty)
));

let values = rx
.into_iter()
.collect::<anyhow::Result<Vec<_>>>()
.expect("proof-of-concept task should complete");

assert_eq!(values.last().map(|data| data.0), Some(1));
}

#[test]
fn long_running_non_blocking_thread_stops_when_interrupted() {
let should_interrupt = Arc::new(AtomicBool::new(false));
let rx = poc::long_running_non_blocking_thread(
Duration::from_millis(200),
gix::progress::Discard,
should_interrupt.clone(),
);
should_interrupt.store(true, Ordering::Relaxed);

assert!(matches!(
rx.recv_timeout(Duration::from_secs(1)),
Err(std::sync::mpsc::RecvTimeoutError::Disconnected)
));
}
}
5 changes: 4 additions & 1 deletion crates/but-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,18 @@ gitbutler-reference.workspace = true
gitbutler-commit = { workspace = true, optional = true, features = ["testing"] }

clap = { workspace = true, features = ["env"] }
gix.workspace = true
gix = { workspace = true, features = ["interrupt"] }
anyhow.workspace = true
itertools.workspace = true
humantime = "2.3.0"
tracing-forest.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
dirs-next = "2.0.0"
serde_json.workspace = true
tokio.workspace = true
but-api = { path = "../but-api" }
prodash = { version = "31.0.0", default-features = false, features = ["progress-tree", "render-line", "render-line-autoconfigure", "render-line-crossterm"] }

[lints]
workspace = true
8 changes: 7 additions & 1 deletion crates/but-testing/src/args.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::{path::PathBuf, time::Duration};

use gitbutler_reference::RemoteRefname;
use gitbutler_stack::StackId;
Expand Down Expand Up @@ -269,6 +269,12 @@ pub enum Subcommands {
/// the short-name of the reference to delete.
short_name: String,
},
/// Run the proof-of-concept long-running operation with progress rendering and interrupt handling.
LongRunning {
/// How long the proof-of-concept task should run, like `5s`, `250ms`, or `1m`.
#[arg(short = 'd', long, default_value = "5s", value_parser = humantime::parse_duration)]
duration: Duration,
},
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions crates/but-testing/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use gitbutler_branch_actions::BranchListingFilter;
use crate::command::discard_change::IndicesOrHeaders;

pub mod diff;
pub mod poc;
pub mod project;

pub mod assignment {
Expand Down
63 changes: 63 additions & 0 deletions crates/but-testing/src/command/poc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::{io, sync::Arc, thread, time::Duration};

use anyhow::{Result, bail};

const DEFAULT_FRAME_RATE: f32 = 60.0;

/// Run the proof-of-concept long-running task for `duration`, rendering its progress
/// tree to stderr and honouring interrupts via `gix`'s interrupt machinery.
///
/// When `trace` is set, the progress tree keeps a much larger message buffer
/// (see `progress_tree`).
///
/// # Errors
///
/// Fails if the interrupt handler cannot be installed, if the worker yields an error,
/// or with "Interrupted" when an interrupt was triggered while the task ran.
pub fn long_running(duration: Duration, trace: bool) -> Result<()> {
    // NOTE(review): `auto_deregister` presumably removes the handler when this guard
    // drops at the end of the function — confirm against gix::interrupt docs.
    #[allow(unsafe_code)]
    let _interrupt_handler = unsafe {
        // SAFETY: The closure only calls a function that in turn sets an atomic. No memory handling is triggered.
        gix::interrupt::init_handler(1, gix::interrupt::trigger)?.auto_deregister()
    };

    let progress = progress_tree(trace);
    let renderer = setup_line_renderer(&progress);
    // Borrow the global interrupt flag from a scoped thread and drain all results,
    // remembering only the index of the last completed step.
    let result = thread::scope(|scope| -> Result<Option<usize>> {
        let rx = but_api::poc::long_running_non_blocking_scoped_thread(
            scope,
            duration,
            progress.add_child("long-running"),
            &gix::interrupt::IS_INTERRUPTED,
        );
        let mut last = None;
        for data in rx {
            last = Some(data?.0);
        }
        Ok(last)
    });
    // Shut the renderer down before printing so stdout output doesn't interleave
    // with progress lines on stderr.
    renderer.shutdown_and_wait();

    let last = result?;
    if gix::interrupt::is_triggered() {
        bail!("Interrupted");
    }
    if let Some(last) = last {
        println!("Completed {last} step(s).");
    }
    Ok(())
}

/// Build the root of the progress tree, with a larger message buffer when tracing.
fn progress_tree(trace: bool) -> Arc<prodash::tree::Root> {
    let message_buffer_capacity = if trace { 10_000 } else { 200 };
    let options = prodash::tree::root::Options {
        message_buffer_capacity,
        ..Default::default()
    };
    options.into()
}

/// Start a line renderer on stderr that observes `progress` without keeping it alive.
fn setup_line_renderer(progress: &Arc<prodash::tree::Root>) -> prodash::render::line::JoinHandle {
    let options = prodash::render::line::Options {
        frames_per_second: DEFAULT_FRAME_RATE,
        initial_delay: Some(Duration::from_millis(500)),
        timestamp: true,
        throughput: true,
        hide_cursor: true,
        ..prodash::render::line::Options::default()
    }
    .auto_configure(prodash::render::line::StreamKind::Stderr);
    // A weak handle lets the renderer stop once the tree root is dropped.
    prodash::render::line(io::stderr(), Arc::downgrade(progress), options)
}
Loading
Loading