diff --git a/.bazelignore b/.bazelignore index f41c42fd7..4a93c4add 100644 --- a/.bazelignore +++ b/.bazelignore @@ -1,3 +1,4 @@ # Cargo output directory target/ -bazel/proto/ \ No newline at end of file +bazel/proto/ +examples/large_bes \ No newline at end of file diff --git a/.buildkite/pipeline.yaml b/.buildkite/pipeline.yaml index 2f431218d..ce35edc47 100644 --- a/.buildkite/pipeline.yaml +++ b/.buildkite/pipeline.yaml @@ -1,7 +1,7 @@ steps: - label: ":aspect: Test" agents: - queue: aspect-default + queue: aspect-huge command: | echo "--- :aspect-build: Workflows environment" /etc/aspect/workflows/bin/configure_workflows_env diff --git a/Cargo.lock b/Cargo.lock index 1d7ee3788..402d6c138 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1489,6 +1489,7 @@ dependencies = [ name = "galvanize" version = "0.0.0-dev" dependencies = [ + "libc", "nix 0.30.1", "proc_pidinfo", "procfs", diff --git a/crates/galvanize/BUILD.bazel b/crates/galvanize/BUILD.bazel index bcc5e92ad..8e7b45669 100644 --- a/crates/galvanize/BUILD.bazel +++ b/crates/galvanize/BUILD.bazel @@ -14,7 +14,8 @@ rust_library( name = "galvanize", srcs = glob(["src/**/*.rs"]), deps = [ - "@crates//:nix" + "@crates//:nix", + "@crates//:libc" ] + select({ ":macos": ["@crates//:proc_pidinfo"], ":linux": ["@crates//:procfs"], diff --git a/crates/galvanize/Cargo.toml b/crates/galvanize/Cargo.toml index b64bc83b9..13d461015 100644 --- a/crates/galvanize/Cargo.toml +++ b/crates/galvanize/Cargo.toml @@ -10,7 +10,8 @@ readme.workspace = true rust-version.workspace = true [dependencies] -nix = { version = "0.30.1", features = ["fs"] } +libc = "0.2" +nix = { version = "0.30.1", features = ["fs", "poll"] } [target.'cfg(target_os="linux")'.dependencies] procfs = "0.18.0" diff --git a/crates/galvanize/src/lib.rs b/crates/galvanize/src/lib.rs index d9b39c6de..e4b00b3ef 100644 --- a/crates/galvanize/src/lib.rs +++ b/crates/galvanize/src/lib.rs @@ -1,10 +1,88 @@ use std::fs::File; use std::io::{self, ErrorKind, Read}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use nix::sys::stat::Mode; use nix::unistd::mkfifo; +/// Returns `false` when the process does not exist (ESRCH) or is a zombie. +/// EPERM (process exists but we can't signal it) is treated as alive. +fn is_pid_alive(pid: u32) -> bool { + // SAFETY: kill(pid, 0) is the standard POSIX existence check. Signal 0 is + // never delivered; the call only validates the pid and our permission to + // signal it. + let rc = unsafe { libc::kill(pid as libc::pid_t, 0) }; + if rc != 0 { + return io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH); + } + // kill(pid, 0) succeeds for zombie processes: they still hold a PID slot + // until the parent calls waitpid, but they have already exited and will + // never create new files. Treat them as dead so callers don't spin forever. + !is_pid_zombie(pid) +} + +#[cfg(target_os = "linux")] +fn is_pid_zombie(pid: u32) -> bool { + use procfs::process::Process; + Process::new(pid as i32) + .and_then(|p| p.stat()) + .map(|s| s.state == 'Z') + .unwrap_or(false) +} + +#[cfg(target_os = "macos")] +fn is_pid_zombie(pid: u32) -> bool { + // proc_pidinfo(PROC_PIDTBSDINFO) fills proc_bsdinfo; pbi_status holds the + // process state where SZOMB == 5 per . + use std::mem; + unsafe { + let mut info: libc::proc_bsdinfo = mem::zeroed(); + let ret = libc::proc_pidinfo( + pid as libc::c_int, + libc::PROC_PIDTBSDINFO, + 0, + &mut info as *mut _ as *mut libc::c_void, + mem::size_of::() as libc::c_int, + ); + ret > 0 && info.pbi_status == 5 + } +} + +#[cfg(target_os = "linux")] +fn is_path_open_for_pid(path: &Path, pid: u32) -> io::Result { + use procfs::process::{FDTarget, Process}; + let proc = Process::new(pid as i32).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + for fd in proc.fd().map_err(|err| io::Error::other(err))? { + let fd = fd.map_err(|err| io::Error::other(err))?; + if let FDTarget::Path(fd_path) = &fd.target { + if fd_path == path { + return Ok(true); + } + } + } + Ok(false) +} + +#[cfg(target_os = "macos")] +fn is_path_open_for_pid(path: &Path, pid: u32) -> io::Result { + use proc_pidinfo::*; + let pid_val = Pid(pid); + for fd in proc_pidinfo_list::(pid_val)? { + match proc_pidfdinfo::(pid_val, fd.proc_fd)? { + Some(vnode) => match vnode.path() { + Ok(p) => { + if path == p { + return Ok(true); + } + } + _ => continue, + }, + None => continue, + } + } + Ok(false) +} + pub struct Pipe { path: PathBuf, inner: File, @@ -30,50 +108,13 @@ impl Pipe { }) } - #[cfg(target_os = "linux")] - fn is_path_open(&self, pid: u32) -> io::Result { - use procfs::process::{FDTarget, Process}; - let proc = Process::new(pid as i32).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - - for fd in proc.fd().map_err(|err| io::Error::other(err))? { - let fd = fd.map_err(|err| io::Error::other(err))?; - if let FDTarget::Path(fd_path) = fd.target { - // Resolve the path exactly as the kernel reports it - if fd_path == self.path { - return Ok(true); - } - } - } - Ok(false) - } - - #[cfg(target_os = "macos")] - pub fn is_path_open(&self, pid: u32) -> io::Result { - use proc_pidinfo::*; - let pid = Pid(pid); - for fd in proc_pidinfo_list::(pid)? { - match proc_pidfdinfo::(pid, fd.proc_fd)? { - Some(vnode) => match vnode.path() { - Ok(p) if self.path == p => { - return Ok(true); - } - // ignore vnode entries without a path - _ => continue, - }, - None => continue, - } - } - Ok(false) - } - fn read_with_policy(&mut self, buf: &mut [u8]) -> io::Result { match self.policy { RetryPolicy::Never => self.inner.read(buf).map_err(|err| err.into()), RetryPolicy::IfOpenForPid(pid) => loop { match self.inner.read(buf) { Ok(nr) if nr == 0 => { - // it is okay to return an empty buffer if the FD is still open. - if self.is_path_open(pid)? { + if is_path_open_for_pid(&self.path, pid)? { return Ok(nr); } else { return Err(std::io::Error::new( @@ -82,10 +123,8 @@ impl Pipe { )); } } - // If EOF error was encountered and the path is still open by the PID - // then retry this stream. Err(err) if err.kind() == ErrorKind::UnexpectedEof => { - if self.is_path_open(pid)? { + if is_path_open_for_pid(&self.path, pid)? { continue; } else { return Err(std::io::Error::new( @@ -107,3 +146,54 @@ impl Read for Pipe { self.read_with_policy(buf) } } + +/// A regular file that streams its contents as the writer (identified by `pid`) appends to it. +/// +/// Busy-polls for file existence at open time, then reads with the same retry logic as +/// [`Pipe`] with [`RetryPolicy::IfOpenForPid`]: on EOF, checks whether the writer process +/// still has the file open. Returns `BrokenPipe` when the writer closes the file. +pub struct StreamingFile { + path: PathBuf, + inner: File, + pid: u32, +} + +impl StreamingFile { + /// Polls until `path` exists (10 ms sleep between checks), then opens it. + /// Returns `BrokenPipe` immediately if `pid` exits before the file appears. + /// Path is canonicalized after open for accurate fd matching. + pub fn open(path: PathBuf, pid: u32) -> io::Result { + while !path.exists() { + if !is_pid_alive(pid) { + return Err(io::Error::new( + ErrorKind::BrokenPipe, + "process exited before the file was created", + )); + } + std::thread::sleep(std::time::Duration::from_millis(10)); + } + let inner = File::open(&path)?; + let path = path.canonicalize()?; + Ok(Self { path, inner, pid }) + } +} + +impl Read for StreamingFile { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + match self.inner.read(buf) { + // Ok(0): at the current end of the file. If the writer still has it open, + // return Ok(0) to signal "no data yet, try again later". If the writer + // has closed the file, the stream is done — signal BrokenPipe. + // Callers that cannot tolerate Ok(0) (e.g. a zstd Decoder) should wrap + // this in a blocking retry adapter. + Ok(0) => { + if is_path_open_for_pid(&self.path, self.pid)? { + Ok(0) + } else { + Err(std::io::Error::new(ErrorKind::BrokenPipe, "end of stream")) + } + } + other => other, + } + } +}