diff --git a/lava-api-mock/Cargo.toml b/lava-api-mock/Cargo.toml index 84c525a..085656b 100644 --- a/lava-api-mock/Cargo.toml +++ b/lava-api-mock/Cargo.toml @@ -12,9 +12,12 @@ readme = "../README.md" [dependencies] boulder = { version="0.3", features = ["persian-rug"] } chrono = { version = "0.4", features = ["serde"] } +clocks = "0.0.1" clone-replace = "0.1" django-query = { version="0.2", features = ["wiremock", "persian-rug", "clone-replace"] } futures = "0.3" +itertools = "0.10" +num = "0.4" persian-rug = { version = "0.1", features = ["clone-replace"] } rust_decimal = "1" rust_decimal_macros = "1" diff --git a/lava-api-mock/src/lib.rs b/lava-api-mock/src/lib.rs index c82e0a1..99bedd3 100644 --- a/lava-api-mock/src/lib.rs +++ b/lava-api-mock/src/lib.rs @@ -28,9 +28,9 @@ //! and [`BuildableWithPersianRug`](boulder::BuildableWithPersianRug) //! which are from the [`boulder`] crate. //! -//! # LavaMock +//! # Server //! -//! Most users will want to base their tests around [`LavaMock`], +//! Most users will want to base their tests around [`Server`], //! which is a [`django-query`](django_query) derived server, which //! provides all of the v0.2 query REST endpoints of a standard Lava //! server. See the documentation for details of its limitations. The @@ -42,14 +42,14 @@ //! Example: //! ```rust //! use futures::stream::TryStreamExt; -//! use lava_api_mock::{LavaMock, PaginationLimits, PopulationParams, SharedState}; +//! use lava_api_mock::{Server, PaginationLimits, PopulationParams, SharedState}; //! use lava_api::Lava; //! //! # tokio_test::block_on( async { //! // Make the mock server //! let limits = PaginationLimits::new(); //! let population = PopulationParams::new(); -//! let mock = LavaMock::new(SharedState::new_populated(population), limits).await; +//! let mock = Server::new(SharedState::new_populated(population), limits).await; //! //! // Make the Lava client for reading back data from the server //! 
let lava = Lava::new(&mock.uri(), None).expect("failed to make lava client"); @@ -69,7 +69,8 @@ mod devices; mod devicetypes; mod jobs; -mod lava_mock; +mod mock; +mod server; mod state; mod tags; mod testcases; @@ -80,7 +81,8 @@ pub use devices::{Device, Health as DeviceHealth, State as DeviceState}; pub use devicetypes::{Alias, Architecture, BitWidth, Core, DeviceType, ProcessorFamily}; pub use jobs::Job; pub use jobs::{Health as JobHealth, State as JobState}; -pub use lava_mock::{LavaMock, PaginationLimits}; +pub use mock::{create_mock, Mock as LavaMock}; +pub use server::{PaginationLimits, Server}; pub use state::{PopulationParams, SharedState, State}; pub use tags::Tag; pub use testcases::{Metadata, PassFail, TestCase, TestSet, TestSuite}; diff --git a/lava-api-mock/src/mock.rs b/lava-api-mock/src/mock.rs new file mode 100644 index 0000000..dc717bc --- /dev/null +++ b/lava-api-mock/src/mock.rs @@ -0,0 +1,774 @@ +use crate::{JobHealth, JobState, Server, SharedState, State}; + +use boulder::{ + GeneratableWithPersianRug, GeneratorWithPersianRug, RepeatFromPersianRug, + SubsetsFromPersianRug, TryRepeatFromPersianRug, +}; +use chrono::{DateTime, Utc}; +use clocks::Clock; +use clone_replace::MutateGuard; +use num::NumCast; +use persian_rug::{Accessor, Mutator, Proxy}; +use std::collections::BTreeMap; + +type Device = crate::Device; +type DeviceType = crate::DeviceType; +type Job = crate::Job; +type Tag = crate::Tag; +type Worker = crate::Worker; + +pub trait Generator { + type Output; + fn generate(&mut self, context: MutateGuard) -> (Self::Output, MutateGuard); +} + +impl Generator for T +where + T: boulder::GeneratorWithPersianRug, +{ + type Output = >::Output; + + fn generate(&mut self, context: MutateGuard) -> (Self::Output, MutateGuard) { + >::generate(self, context) + } +} + +impl Generator for Box + 'static> { + type Output = T; + fn generate(&mut self, context: MutateGuard) -> (Self::Output, MutateGuard) { + self.as_mut().generate(context) + } +} + +pub 
trait GeneratorExt: Generator { + fn take_n(&mut self, context: MutateGuard, count: usize) -> TakeNIterator { + TakeNIterator::new(self, context, count) + } +} + +impl GeneratorExt for T where T: Generator {} + +struct IdGenerator { + _type_marker: core::marker::PhantomData, + _out_marker: core::marker::PhantomData, +} + +impl IdGenerator { + pub fn new() -> Self { + Self { + _type_marker: Default::default(), + _out_marker: Default::default(), + } + } +} + +#[persian_rug::constraints(context=C, access(T))] +impl GeneratorWithPersianRug for IdGenerator +where + U: NumCast, +{ + type Output = U; + + fn generate<'b, B>(&mut self, context: B) -> (U, B) + where + B: 'b + Mutator, + { + (num::cast(context.get_iter::().count()).unwrap(), context) + } +} + +pub async fn create_mock(now: DateTime) -> (Mock, Clock) { + let clock = Clock::new_fake(now); + (Mock::new_with_clock(clock.clone()).await, clock) +} + +pub struct TakeNIterator<'a, G> +where + G: Generator + ?Sized, +{ + count: usize, + context: Option>, + generator: &'a mut G, +} + +impl<'a, G> TakeNIterator<'a, G> +where + G: Generator + ?Sized, +{ + pub fn new(generator: &'a mut G, context: MutateGuard, count: usize) -> Self { + Self { + count, + context: Some(context), + generator, + } + } + + #[allow(dead_code)] + pub fn into_inner(self) -> MutateGuard { + self.context.unwrap() + } +} + +impl<'a, G> Iterator for TakeNIterator<'a, G> +where + G: Generator + ?Sized, +{ + type Item = ::Output; + + fn next(&mut self) -> Option { + if self.count == 0usize { + None + } else { + let (result, context) = self.generator.generate(self.context.take().unwrap()); + self.context = Some(context); + self.count -= 1; + Some(result) + } + } +} + +/// A simplified API for using the mock crate +/// +/// This integrates four major pieces that you can construct +/// separately (and much more flexibly) to make many test cases easier +/// to write. +/// +/// - A [SharedState] that holds the actual data. 
+/// - A [Server] that exposes a REST API on some port. +/// - A [Clock] that you can synchronise to in test cases to +/// reproduce timing-critical issues. +/// - A set of [Generator] instances for producing new +/// jobs, tags, devices, device types and workers. +pub struct Mock { + state: SharedState, + server: Server, + clock: Clock, + + devices_lut: BTreeMap>, + device_types_lut: BTreeMap>, + jobs_lut: BTreeMap>, + tags_lut: BTreeMap>, + workers_lut: BTreeMap>, + + devices: Box>>, + device_types: Box>>, + jobs: Box>>, + tags: Box>>, + workers: Box>>, + + bulk_devices: Box>>, + bulk_device_types: Box>>, + bulk_jobs: Box>>, + bulk_tags: Box>>, + bulk_workers: Box>>, +} + +impl Mock { + /// Create a new mock + /// + /// The mock's clock will be a wall clock. + pub async fn new() -> Self { + Self::new_with_clock(Default::default()).await + } + + /// Create a new mock with the given clock + /// + /// The mock will use the clock given. + pub async fn new_with_clock(clock: Clock) -> Self { + let mut s = SharedState::new(); + let c = clock.clone(); + let c2 = clock.clone(); + + let mut g = Proxy::>::generator(); + for _ in g.take_n(s.mutate(), 10) {} + + Self { + state: s.clone(), + server: Server::new(s, Default::default()).await, + + clock, + + devices_lut: BTreeMap::new(), + device_types_lut: BTreeMap::new(), + jobs_lut: BTreeMap::new(), + tags_lut: BTreeMap::new(), + workers_lut: BTreeMap::new(), + + devices: Box::new( + Proxy::::generator() + .device_type(RepeatFromPersianRug::new()) + .physical_owner(TryRepeatFromPersianRug::new()) + .physical_group(TryRepeatFromPersianRug::new()) + .tags(SubsetsFromPersianRug::new()) + .health(|| crate::DeviceHealth::Good) + .state(|| crate::DeviceState::Idle) + .worker_host(RepeatFromPersianRug::new()), + ), + device_types: Box::new(Proxy::::generator()), + jobs: Box::new( + Proxy::::generator() + .id(IdGenerator::::new()) + .submitter(RepeatFromPersianRug::new()) + .viewing_groups(SubsetsFromPersianRug::new()) + 
.requested_device_type(TryRepeatFromPersianRug::new()) + .tags(SubsetsFromPersianRug::new()) + .submit_time(move || Some(c.now())) + .start_time(|| None) + .end_time(|| None) + .state(|| JobState::Submitted) + .health(|| JobHealth::Unknown) + .actual_device(|| None), + ), + tags: Box::new(Proxy::::generator().id(IdGenerator::::new())), + workers: Box::new(Proxy::::generator()), + + bulk_devices: Box::new(Proxy::::generator()), + bulk_device_types: Box::new(Proxy::::generator()), + bulk_jobs: Box::new( + Proxy::::generator() + .id(IdGenerator::::new()) + .submitter(RepeatFromPersianRug::new()) + .viewing_groups(SubsetsFromPersianRug::new()) + .requested_device_type(TryRepeatFromPersianRug::new()) + .tags(SubsetsFromPersianRug::new()) + .submit_time(move || Some(c2.now())) + .start_time(|| None) + .end_time(|| None) + .state(|| JobState::Submitted) + .health(|| JobHealth::Unknown) + .actual_device(|| None), + ), + bulk_tags: Box::new(Proxy::::generator().id(IdGenerator::::new())), + bulk_workers: Box::new(Proxy::::generator()), + } + } + + /// Get the URI for the mock server + pub fn uri(&self) -> String { + self.server.uri() + } + + /// Execute a function on the data pointed to by the given proxy + #[persian_rug::constraints(context = State, access(T))] + pub fn with_proxy(&self, proxy: &Proxy, f: F) -> R + where + F: FnOnce(&T) -> R, + { + f(self.state.access().get(proxy)) + } + + /// Execute a function on the data pointed to by the given proxy + #[persian_rug::constraints(context = State, access(T))] + pub fn with_option_proxy(&self, proxy: &Option>, f: F) -> R + where + F: FnOnce(Option<&T>) -> R, + { + let a = self.state.access(); + f(proxy.as_ref().map(|p| a.get(p))) + } + + /// Execute a function on the mutable data pointed to by the given + /// proxy + #[persian_rug::constraints(context = State, access(T))] + pub fn with_proxy_mut(&mut self, proxy: &Proxy, f: F) -> R + where + F: FnOnce(&mut T) -> R, + { + f(self.state.mutate().get_mut(proxy)) + } + + /// 
Execute a function on the mutable data pointed to by the given + /// proxy + #[persian_rug::constraints(context = State, access(T))] + pub fn with_option_proxy_mut(&mut self, proxy: &Option>, f: F) -> R + where + F: FnOnce(Option<&mut T>) -> R, + { + let mut m = self.state.mutate(); + f(proxy.as_ref().map(|p| m.get_mut(p))) + } + + /// Get an [Accessor] for the [State] the mock holds + /// + /// This permits you to access an unchanging, read-only view of + /// the data which the mock is currently serving. + pub fn accessor(&self) -> impl Accessor { + self.state.access() + } + + /// Get an [Mutator] for the [State] the mock holds + /// + /// This permits you to mutate a writable copy of the data the + /// mock holds. Note that modifications will only become visible + /// when the mutator is dropped. + pub fn mutator(&mut self) -> impl Mutator { + self.state.mutate() + } + + /// Execute a function on the device with the given hostname + pub fn with_device(&self, hostname: H, f: F) -> Option + where + H: AsRef, + F: FnOnce(&Device) -> T, + { + self.devices_lut + .get(hostname.as_ref()) + .map(|d| f(self.state.access().get(d))) + } + + /// Execute a function on the mutable device with the given hostname + pub fn with_device_mut(&mut self, hostname: H, f: F) -> Option + where + H: AsRef, + F: FnOnce(&mut Device) -> T, + { + self.devices_lut + .get(hostname.as_ref()) + .map(|d| f(self.state.mutate().get_mut(d))) + } + + /// Get the [Proxy] for the device with the given hostname + pub fn get_device_proxy(&self, hostname: H) -> Option> + where + H: AsRef, + { + self.devices_lut.get(hostname.as_ref()).copied() + } + + /// Execute a function on each device + pub fn with_devices(&self, mut f: F) + where + F: FnMut(&Device), + { + for d in self.state.access().get_iter() { + f(d) + } + } + + /// Execute a function on each mutable device + pub fn with_devices_mut(&mut self, mut f: F) + where + F: FnMut(&mut Device), + { + for d in self.state.mutate().get_iter_mut() { + f(d) + } 
+ } + + /// Execute a function on the device type with the given name + pub fn with_device_type(&self, name: N, f: F) -> Option + where + N: AsRef, + F: FnOnce(&DeviceType) -> T, + { + self.device_types_lut + .get(name.as_ref()) + .map(|dt| f(self.state.access().get(dt))) + } + + /// Execute a function on the mutable device type with the given name + pub fn with_device_type_mut(&mut self, name: N, f: F) -> Option + where + N: AsRef, + F: FnOnce(&mut DeviceType) -> T, + { + self.device_types_lut + .get(name.as_ref()) + .map(|dt| f(self.state.mutate().get_mut(dt))) + } + + /// Get the [Proxy] for the device type with the given name + pub fn get_device_type_proxy(&self, name: N) -> Option> + where + N: AsRef, + { + self.device_types_lut.get(name.as_ref()).copied() + } + + /// Execute a function on each device + pub fn with_device_types(&self, mut f: F) + where + F: FnMut(&DeviceType), + { + for d in self.state.access().get_iter() { + f(d) + } + } + + /// Execute a function on each mutable device + pub fn with_device_types_mut(&mut self, mut f: F) + where + F: FnMut(&mut DeviceType), + { + for d in self.state.mutate().get_iter_mut() { + f(d) + } + } + + /// Execute a function on the job with the given id + pub fn with_job(&self, id: i64, f: F) -> Option + where + F: FnOnce(&Job) -> T, + { + self.jobs_lut + .get(&id) + .map(|j| f(self.state.access().get(j))) + } + + /// Execute a function on the mutable job with the given id + pub fn with_job_mut(&mut self, id: i64, f: F) -> Option + where + F: FnOnce(&mut Job) -> T, + { + self.jobs_lut + .get(&id) + .map(|j| f(self.state.mutate().get_mut(j))) + } + + /// Get the [Proxy] for the job with the given id + pub fn get_job_proxy(&self, job: i64) -> Option> { + self.jobs_lut.get(&job).copied() + } + + /// Execute a function on each job + pub fn with_jobs(&self, mut f: F) + where + F: FnMut(&Job), + { + for d in self.state.access().get_iter() { + f(d) + } + } + + /// Execute a function on each mutable job + pub fn 
with_jobs_mut(&mut self, mut f: F) + where + F: FnMut(&mut Job), + { + for d in self.state.mutate().get_iter_mut() { + f(d) + } + } + + /// Execute a function on the tag with the given name + pub fn with_tag(&self, tag: &str, f: F) -> Option + where + F: FnOnce(&Tag) -> T, + { + self.tags_lut + .get(tag) + .map(|t| f(self.state.access().get(t))) + } + + /// Execute a function on the mutable tag with the given name + pub fn with_tag_mut(&mut self, tag: &str, f: F) -> Option + where + F: FnOnce(&mut Tag) -> T, + { + self.tags_lut + .get(tag) + .map(|t| f(self.state.mutate().get_mut(t))) + } + + /// Get the [Proxy] for the tag with the given id + pub fn get_tag_proxy(&self, tag: &str) -> Option> { + self.tags_lut.get(tag).copied() + } + + /// Execute a function on every tag + pub fn with_tags(&self, mut f: F) + where + F: FnMut(&Tag), + { + for d in self.state.access().get_iter() { + f(d) + } + } + + /// Execute a function on every mutable tag + pub fn with_tags_mut(&mut self, mut f: F) + where + F: FnMut(&mut Tag), + { + for d in self.state.mutate().get_iter_mut() { + f(d) + } + } + + /// Execute a function on the worker with the given name + pub fn with_worker(&self, worker: &str, f: F) -> Option + where + F: FnOnce(&Worker) -> T, + { + self.workers_lut + .get(worker) + .map(|t| f(self.state.access().get(t))) + } + + /// Execute a function on the mutable worker with the given name + pub fn with_worker_mut(&mut self, worker: &str, f: F) -> Option + where + F: FnOnce(&mut Worker) -> T, + { + self.workers_lut + .get(worker) + .map(|t| f(self.state.mutate().get_mut(t))) + } + + /// Get the [Proxy] for the worker with the given name + pub fn get_worker_proxy(&self, worker: &str) -> Option> { + self.workers_lut.get(worker).copied() + } + + /// Execute a function on every worker + pub fn with_workers(&self, mut f: F) + where + F: FnMut(&Worker), + { + for d in self.state.access().get_iter() { + f(d) + } + } + + /// Execute a function on every mutable worker + pub fn 
with_workers_mut(&mut self, mut f: F) + where + F: FnMut(&mut Worker), + { + for d in self.state.mutate().get_iter_mut() { + f(d) + } + } + + pub fn add_device( + &mut self, + hostname: H, + device_type: D, + tags: T, + ) -> Option + where + D: AsRef, + H: ToString, + T: AsRef<[T1]>, + T1: AsRef, + { + let device_type = self.device_types_lut.get(device_type.as_ref())?; + + let tags = tags + .as_ref() + .iter() + .filter_map(|t| self.tags_lut.get(t.as_ref())) + .copied() + .collect(); + + let dev = { + let m = self.state.mutate(); + + let (dev, mut m) = self.devices.generate(m); + + let device = m.get_mut(&dev); + device.hostname = hostname.to_string(); + device.tags = tags; + device.device_type = *device_type; + + dev + }; + + self.devices_lut.insert(hostname.to_string(), dev); + Some(self.state.access().get(&dev).hostname.clone()) + } + + pub fn add_device_type(&mut self, name: D) -> String + where + D: ToString, + { + let dt = { + let m = self.state.mutate(); + let (dt, mut m) = self.device_types.generate(m); + + let device_type = m.get_mut(&dt); + device_type.name = name.to_string(); + dt + }; + + self.device_types_lut.insert(name.to_string(), dt); + self.state.access().get(&dt).name.to_string() + } + + pub fn add_job(&mut self, requested_device_type: Option, tags: T) -> i64 + where + D: AsRef, + T: AsRef<[T1]>, + T1: AsRef, + { + let j = { + let m = self.state.mutate(); + + let (j, mut m) = self.jobs.generate(m); + + let job = m.get_mut(&j); + job.requested_device_type = requested_device_type + .as_ref() + .and_then(|dt| self.device_types_lut.get(dt.as_ref())) + .copied(); + job.submit_time = Some(self.clock.now()); + job.tags = tags + .as_ref() + .iter() + .filter_map(|t| self.tags_lut.get(t.as_ref())) + .copied() + .collect(); + + j + }; + + let id = self.state.access().get(&j).id; + self.jobs_lut.insert(id, j); + id + } + + pub fn schedule_job(&mut self, job: i64, device: &str) { + let mut m = self.state.mutate(); + let j = 
m.get_mut(self.jobs_lut.get(&job).expect("invalid job id")); + let d = self.devices_lut.get(device); + j.actual_device = d.copied(); + j.state = JobState::Scheduled; + } + + pub fn start_job(&mut self, job: i64) { + let mut m = self.state.mutate(); + let j = m.get_mut(self.jobs_lut.get(&job).expect("invalid job id")); + j.state = JobState::Running; + j.start_time = Some(self.clock.now()); + } + + pub fn end_job(&mut self, job: i64, health: JobHealth) { + let mut m = self.state.mutate(); + let j = m.get_mut(self.jobs_lut.get(&job).expect("invalid job id")); + j.state = JobState::Finished; + j.end_time = Some(self.clock.now()); + j.health = health; + } + + pub fn add_tag(&mut self, name: N) -> String + where + N: ToString, + { + let tag = { + let m = self.state.mutate(); + let (t, mut m) = self.tags.generate(m); + + let tag = m.get_mut(&t); + tag.name = name.to_string(); + + t + }; + + self.tags_lut.insert(name.to_string(), tag); + + name.to_string() + } + + pub fn add_worker(&mut self, hostname: H) -> String + where + H: ToString, + { + let w = { + let m = self.state.mutate(); + let (w, mut m) = self.workers.generate(m); + + let worker = m.get_mut(&w); + worker.hostname = hostname.to_string(); + + w + }; + + self.workers_lut.insert(hostname.to_string(), w); + + self.state.access().get(&w).hostname.to_string() + } + + //// Add bulk devices + pub fn generate_devices(&mut self, count: usize) -> Vec { + let mut devices = Vec::new(); + for v in self.bulk_devices.take_n(self.state.mutate(), count) { + devices.push(v); + } + + let a = self.state.access(); + devices + .into_iter() + .map(|d| { + let hostname = &a.get(&d).hostname; + self.devices_lut.insert(hostname.clone(), d); + hostname.clone() + }) + .collect() + } + + pub fn generate_device_types(&mut self, count: usize) -> Vec { + let mut device_types = Vec::new(); + for v in self.bulk_device_types.take_n(self.state.mutate(), count) { + device_types.push(v); + } + + let a = self.state.access(); + device_types + 
.into_iter() + .map(|dt| { + let name = &a.get(&dt).name; + self.device_types_lut.insert(name.clone(), dt); + name.clone() + }) + .collect() + } + + pub fn generate_jobs(&mut self, count: usize) -> Vec { + let mut jobs = Vec::new(); + for v in self.bulk_jobs.take_n(self.state.mutate(), count) { + jobs.push(v); + } + + let a = self.state.access(); + jobs.into_iter() + .map(|j| { + let id = a.get(&j).id; + self.jobs_lut.insert(id, j); + id + }) + .collect() + } + + pub fn generate_tags(&mut self, count: usize) -> Vec { + let mut tags = Vec::new(); + for v in self.bulk_tags.take_n(self.state.mutate(), count) { + tags.push(v); + } + + let a = self.state.access(); + tags.into_iter() + .map(|t| { + let name = &a.get(&t).name; + self.tags_lut.insert(name.clone(), t); + name.clone() + }) + .collect() + } + + pub fn generate_workers(&mut self, count: usize) -> Vec { + let mut workers = Vec::new(); + for v in self.bulk_workers.take_n(self.state.mutate(), count) { + workers.push(v); + } + + let a = self.state.access(); + workers + .into_iter() + .map(|w| { + let hostname = &a.get(&w).hostname; + self.workers_lut.insert(hostname.clone(), w); + hostname.clone() + }) + .collect() + } +} diff --git a/lava-api-mock/src/lava_mock.rs b/lava-api-mock/src/server.rs similarity index 95% rename from lava-api-mock/src/lava_mock.rs rename to lava-api-mock/src/server.rs index f03ffea..dfa6462 100644 --- a/lava-api-mock/src/lava_mock.rs +++ b/lava-api-mock/src/server.rs @@ -6,7 +6,7 @@ use clone_replace::MutateGuard; use django_query::mock::{nested_endpoint_matches, NestedEndpointParams}; use std::sync::Arc; -/// Pagination limits for constructing a [`LavaMock`] instance. +/// Pagination limits for constructing a [`Server`] instance. 
/// /// A running Lava instance allows the default pagination of endpoints /// to be customised, and specifying default pagination can be @@ -53,7 +53,7 @@ impl PaginationLimits { /// - `/api/v0.2/jobs//tests/` /// - `/api/v0.2/jobs//suites/` /// -/// You can use [`uri`](LavaMock::uri) to find the initial portion +/// You can use [`uri`](Server::uri) to find the initial portion /// of the URL for your test instance. /// /// The mock object does not support the Lava mutation endpoints, but @@ -61,21 +61,21 @@ impl PaginationLimits { /// There are two ways to do this: /// - You can keep a clone of the [`SharedState`] you pass in and obtain /// a [`MutateGuard`] with [`mutate`](SharedState::mutate). -/// - You can call [`state_mut`](LavaMock::state_mut) to get a [`MutateGuard`] +/// - You can call [`state_mut`](Server::state_mut) to get a [`MutateGuard`] /// for the enclosed [`SharedState`] directly. -pub struct LavaMock { +pub struct Server { server: wiremock::MockServer, state: SharedState, } -impl LavaMock { - /// Create and start a new [`LavaMock`] +impl Server { + /// Create and start a new [`Server`] /// /// Here `p` is the [`SharedState`] becomes the underlying data /// source for the mock, and `limits` are the default pagination /// limits as a [`PaginationLimits`] object, which are applied /// when the client does not give any. - pub async fn new(p: SharedState, limits: PaginationLimits) -> LavaMock { + pub async fn new(p: SharedState, limits: PaginationLimits) -> Server { let s = wiremock::MockServer::start().await; wiremock::Mock::given(wiremock::matchers::method("GET")) @@ -146,13 +146,13 @@ impl LavaMock { .mount(&s) .await; - LavaMock { + Server { server: s, state: p, } } - /// Create and start a default new [`LavaMock`]. + /// Create and start a default new [`Server`]. /// /// This mock will have a default [`SharedState`] and default /// [`PaginationLimits`]. 
This gives a mock object with an empty @@ -266,7 +266,7 @@ mod test { .take(500) .collect::>(); - let mock = LavaMock::new(s, Default::default()).await; + let mock = Server::new(s, Default::default()).await; let devices = make_request(mock.uri(), "devices/") .await diff --git a/lava-api/Cargo.toml b/lava-api/Cargo.toml index 9bfaa09..ec5bb0f 100644 --- a/lava-api/Cargo.toml +++ b/lava-api/Cargo.toml @@ -12,8 +12,10 @@ readme = "../README.md" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -chrono = { version = "0.4", features = ["serde"] } +chrono = { version = "0.4.20", features = ["serde"] } futures = "0.3" +lava-api-mock = { path = "../lava-api-mock" } +persian-rug = "0.1" serde = { version = "^1.0.97", features = ["derive"] } serde_json = "^1" serde_with = "2.0" @@ -34,6 +36,5 @@ lava-api-mock = { path = "../lava-api-mock" } wiremock = "0.5" django-query = "0.2" boulder = "0.3" -persian-rug = "0.1" test-log = "0.2" tokio-test = "0.4" diff --git a/lava-api/src/device.rs b/lava-api/src/device.rs index 7ce8376..1ed87c6 100644 --- a/lava-api/src/device.rs +++ b/lava-api/src/device.rs @@ -5,6 +5,7 @@ use futures::FutureExt; use futures::{stream, stream::Stream, stream::StreamExt}; use serde::Deserialize; use serde_with::DeserializeFromStr; +use std::convert::{Infallible, TryFrom, TryInto}; use std::pin::Pin; use std::task::{Context, Poll}; use strum::{Display, EnumString}; @@ -132,60 +133,58 @@ impl<'a> Stream for Devices<'a> { } } +impl TryFrom for Health { + type Error = Infallible; + fn try_from(dev: lava_api_mock::DeviceHealth) -> Result { + use Health::*; + match dev { + lava_api_mock::DeviceHealth::Unknown => Ok(Unknown), + lava_api_mock::DeviceHealth::Maintenance => Ok(Maintenance), + lava_api_mock::DeviceHealth::Good => Ok(Good), + lava_api_mock::DeviceHealth::Bad => Ok(Bad), + lava_api_mock::DeviceHealth::Looping => Ok(Looping), + lava_api_mock::DeviceHealth::Retired => Ok(Retired), + } + } +} 
+ +impl Device { + #[persian_rug::constraints(context = C, access(lava_api_mock::Tag, lava_api_mock::DeviceType, lava_api_mock::Worker))] + pub fn from_mock<'b, B, C>(dev: &lava_api_mock::Device, context: B) -> Device + where + B: 'b + persian_rug::Accessor, + C: persian_rug::Context + 'static, + { + Self { + hostname: dev.hostname.clone(), + worker_host: context.get(&dev.worker_host).hostname.clone(), + device_type: context.get(&dev.device_type).name.clone(), + description: dev.description.clone(), + health: dev.health.clone().try_into().unwrap(), + tags: dev + .tags + .iter() + .map(|t| Tag::from_mock(context.get(t), context.clone())) + .collect::>(), + } + } +} + #[cfg(test)] mod tests { - use super::{Device, Health, Tag}; use crate::Lava; use boulder::{Buildable, Builder}; + use chrono::Utc; use futures::TryStreamExt; use lava_api_mock::{ - Device as MockDevice, DeviceHealth as MockDeviceHealth, DeviceType as MockDeviceType, - LavaMock, PaginationLimits, PopulationParams, SharedState, State, Tag as MockTag, - Worker as MockWorker, + create_mock, PaginationLimits, PopulationParams, Server, SharedState, State, }; - use persian_rug::{Accessor, Context}; - use std::collections::BTreeMap; - use std::convert::{Infallible, TryFrom, TryInto}; + use persian_rug::Accessor; + use std::collections::{BTreeMap, BTreeSet}; + use std::iter::FromIterator; use test_log::test; - impl TryFrom for Health { - type Error = Infallible; - fn try_from(dev: MockDeviceHealth) -> Result { - use Health::*; - match dev { - MockDeviceHealth::Unknown => Ok(Unknown), - MockDeviceHealth::Maintenance => Ok(Maintenance), - MockDeviceHealth::Good => Ok(Good), - MockDeviceHealth::Bad => Ok(Bad), - MockDeviceHealth::Looping => Ok(Looping), - MockDeviceHealth::Retired => Ok(Retired), - } - } - } - - impl Device { - #[persian_rug::constraints(context = C, access(MockTag, MockDeviceType, MockWorker))] - pub fn from_mock<'b, B, C>(dev: &MockDevice, context: B) -> Device - where - B: 'b + Accessor, - C: 
Context + 'static, - { - Self { - hostname: dev.hostname.clone(), - worker_host: context.get(&dev.worker_host).hostname.clone(), - device_type: context.get(&dev.device_type).name.clone(), - description: dev.description.clone(), - health: dev.health.clone().try_into().unwrap(), - tags: dev - .tags - .iter() - .map(|t| Tag::from_mock(context.get(t), context.clone())) - .collect::>(), - } - } - } - /// Stream 50 devices with a page limit of 5 from the server /// checking that we correctly reconstruct their tags and that /// they are all accounted for (that pagination is handled @@ -194,7 +193,7 @@ mod tests { async fn test_basic() { let state = SharedState::new_populated(PopulationParams::builder().devices(50usize).build()); - let server = LavaMock::new( + let server = Server::new( state.clone(), PaginationLimits::builder().devices(Some(5)).build(), ) @@ -202,6 +201,7 @@ mod tests { let mut map = BTreeMap::new(); let start = state.access(); + for device in start.get_iter::>() { map.insert(device.hostname.clone(), device); } @@ -235,4 +235,45 @@ mod tests { } assert_eq!(seen.len(), 50); } + + #[test(tokio::test)] + async fn test_basic_mock() { + let (mut p, _clock) = create_mock(Utc::now()).await; + + let devices = BTreeSet::from_iter(p.generate_devices(50).into_iter()); + + let lava = Lava::new(&p.uri(), None).expect("failed to make lava server"); + + let mut ld = lava.devices(); + + let mut seen = BTreeSet::new(); + while let Some(device) = ld.try_next().await.expect("failed to get device") { + assert!(!seen.contains(&device.hostname)); + assert!(devices.contains(&device.hostname)); + + p.with_device(&device.hostname, |dev| { + assert_eq!(device.hostname, dev.hostname); + p.with_proxy(&dev.worker_host, |h| { + assert_eq!(device.worker_host, h.hostname); + }); + p.with_proxy(&dev.device_type, |dt| { + assert_eq!(device.device_type, dt.name); + }); + assert_eq!(device.description, dev.description); + assert_eq!(device.health.to_string(), dev.health.to_string()); + + 
assert_eq!(device.tags.len(), dev.tags.len()); + for i in 0..device.tags.len() { + p.with_proxy(&dev.tags[i], |t| { + assert_eq!(device.tags[i].id, t.id); + assert_eq!(device.tags[i].name, t.name); + assert_eq!(device.tags[i].description, t.description); + }); + } + }); + + seen.insert(device.hostname.clone()); + } + assert_eq!(seen.len(), 50); + } } diff --git a/lava-api/src/job.rs b/lava-api/src/job.rs index 2e54e6d..aa3e2c1 100644 --- a/lava-api/src/job.rs +++ b/lava-api/src/job.rs @@ -7,6 +7,7 @@ use futures::FutureExt; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; use serde_with::DeserializeFromStr; +use std::convert::{Infallible, TryFrom, TryInto}; use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; @@ -174,13 +175,13 @@ impl<'a> Jobs<'a> { /// Example: /// ```rust /// use futures::stream::TryStreamExt; -/// # use lava_api_mock::{LavaMock, PaginationLimits, PopulationParams, SharedState}; +/// # use lava_api_mock::{Server, PaginationLimits, PopulationParams, SharedState}; /// use lava_api::{Lava, job::State, job::Ordering}; /// # /// # tokio_test::block_on( async { /// # let limits = PaginationLimits::new(); /// # let population = PopulationParams::new(); -/// # let mock = LavaMock::new(SharedState::new_populated(population), limits).await; +/// # let mock = Server::new(SharedState::new_populated(population), limits).await; /// # let service_uri = mock.uri(); /// # let lava_token = None; /// @@ -538,9 +539,96 @@ pub async fn cancel_job(lava: &Lava, id: i64) -> Result<(), CancellationError> { } } +impl Job { + #[persian_rug::constraints( + context = C, + access( + lava_api_mock::User, + lava_api_mock::Group, + lava_api_mock::Tag, + lava_api_mock::Device, + lava_api_mock::DeviceType + ) + )] + pub fn from_mock<'b, B, C>(job: &lava_api_mock::Job, context: B) -> Job + where + B: 'b + persian_rug::Accessor, + C: persian_rug::Context + 'static, + { + Self { + id: job.id, + submitter: context.get(&job.submitter).username.clone(), + 
viewing_groups: job + .viewing_groups + .iter() + .map(|g| context.get(g).id) + .collect::>(), + description: job.description.clone(), + health_check: job.health_check, + requested_device_type: job + .requested_device_type + .map(|d| context.get(&d).name.to_string()), + tags: job + .tags + .iter() + .map(|t| Tag::from_mock(context.get(t), context.clone())) + .collect::>(), + actual_device: job + .actual_device + .as_ref() + .map(|d| context.get(d).hostname.to_string()), + submit_time: job.submit_time.unwrap(), + start_time: job.start_time, + end_time: job.end_time, + state: job.state.try_into().unwrap(), + health: job.health.try_into().unwrap(), + priority: job.priority, + definition: job.definition.clone(), + original_definition: job.original_definition.clone(), + multinode_definition: job.multinode_definition.clone(), + failure_tags: job + .failure_tags + .iter() + .map(|t| Tag::from_mock(context.get(t), context.clone())) + .collect::>(), + failure_comment: job.failure_comment.clone(), + } + } +} + +impl TryFrom for State { + type Error = Infallible; + fn try_from(state: lava_api_mock::JobState) -> Result { + use State::*; + + match state { + lava_api_mock::JobState::Submitted => Ok(Submitted), + lava_api_mock::JobState::Scheduling => Ok(Scheduling), + lava_api_mock::JobState::Scheduled => Ok(Scheduled), + lava_api_mock::JobState::Running => Ok(Running), + lava_api_mock::JobState::Canceling => Ok(Canceling), + lava_api_mock::JobState::Finished => Ok(Finished), + } + } +} + +impl TryFrom for Health { + type Error = Infallible; + fn try_from(health: lava_api_mock::JobHealth) -> Result { + use Health::*; + + match health { + lava_api_mock::JobHealth::Unknown => Ok(Unknown), + lava_api_mock::JobHealth::Complete => Ok(Complete), + lava_api_mock::JobHealth::Incomplete => Ok(Incomplete), + lava_api_mock::JobHealth::Canceled => Ok(Canceled), + } + } +} + #[cfg(test)] mod tests { - use super::{Health, Job, Ordering, State, Tag}; + use super::{Health, Ordering, State}; use 
crate::Lava; use boulder::{ @@ -549,104 +637,18 @@ mod tests { }; use chrono::{DateTime, Duration, Utc}; use futures::TryStreamExt; + use lava_api_mock::{ - Device as MockDevice, DeviceType as MockDeviceType, Group as MockGroup, Job as MockJob, - JobHealth as MockJobHealth, JobState as MockJobState, LavaMock, PaginationLimits, - PopulationParams, SharedState, Tag as MockTag, User as MockUser, + create_mock, JobHealth as MockJobHealth, JobState as MockJobState, PaginationLimits, + PopulationParams, Server, SharedState, }; - use persian_rug::{Accessor, Context, Proxy}; - use std::collections::BTreeMap; - use std::convert::{Infallible, TryFrom, TryInto}; + + use persian_rug::{Accessor, Proxy}; + use std::collections::{BTreeMap, BTreeSet}; + use std::iter::FromIterator; use std::str::FromStr; use test_log::test; - impl Job { - #[persian_rug::constraints( - context = C, - access( - MockUser, - MockGroup, - MockTag, - MockDevice, - MockDeviceType - ) - )] - pub fn from_mock<'b, B, C>(job: &MockJob, context: B) -> Job - where - B: 'b + Accessor, - C: Context + 'static, - { - Self { - id: job.id, - submitter: context.get(&job.submitter).username.clone(), - viewing_groups: job - .viewing_groups - .iter() - .map(|g| context.get(g).id) - .collect::>(), - description: job.description.clone(), - health_check: job.health_check, - requested_device_type: job - .requested_device_type - .map(|d| context.get(&d).name.to_string()), - tags: job - .tags - .iter() - .map(|t| Tag::from_mock(context.get(t), context.clone())) - .collect::>(), - actual_device: job - .actual_device - .as_ref() - .map(|d| context.get(d).hostname.to_string()), - submit_time: job.submit_time.unwrap(), - start_time: job.start_time, - end_time: job.end_time, - state: job.state.try_into().unwrap(), - health: job.health.try_into().unwrap(), - priority: job.priority, - definition: job.definition.clone(), - original_definition: job.original_definition.clone(), - multinode_definition: job.multinode_definition.clone(), 
- failure_tags: job - .failure_tags - .iter() - .map(|t| Tag::from_mock(context.get(t), context.clone())) - .collect::>(), - failure_comment: job.failure_comment.clone(), - } - } - } - - impl TryFrom for State { - type Error = Infallible; - fn try_from(state: MockJobState) -> Result { - use State::*; - - match state { - MockJobState::Submitted => Ok(Submitted), - MockJobState::Scheduling => Ok(Scheduling), - MockJobState::Scheduled => Ok(Scheduled), - MockJobState::Running => Ok(Running), - MockJobState::Canceling => Ok(Canceling), - MockJobState::Finished => Ok(Finished), - } - } - } - - impl TryFrom for Health { - type Error = Infallible; - fn try_from(health: MockJobHealth) -> Result { - use Health::*; - - match health { - MockJobHealth::Unknown => Ok(Unknown), - MockJobHealth::Complete => Ok(Complete), - MockJobHealth::Incomplete => Ok(Incomplete), - MockJobHealth::Canceled => Ok(Canceled), - } - } - } - #[test] fn test_display() { assert_eq!(State::Submitted.to_string(), "Submitted"); @@ -685,14 +687,10 @@ mod tests { ); } - /// Stream 50 jobs with a page limit of 7 from the server - /// checking that we correctly reconstruct their tags and that - /// they are all accounted for (that pagination is handled - /// properly) #[test(tokio::test)] async fn test_basic() { let state = SharedState::new_populated(PopulationParams::builder().jobs(50usize).build()); - let server = LavaMock::new( + let server = Server::new( state.clone(), PaginationLimits::builder().jobs(Some(7)).build(), ) @@ -747,7 +745,6 @@ mod tests { assert_eq!(job.definition, jj.definition); assert_eq!(job.original_definition, jj.original_definition); assert_eq!(job.multinode_definition, jj.multinode_definition); - assert_eq!(job.failure_tags.len(), jj.failure_tags.len()); for i in 0..job.failure_tags.len() { assert_eq!(job.viewing_groups[i], start.get(&jj.viewing_groups[i]).id); @@ -759,13 +756,96 @@ mod tests { assert_eq!(seen.len(), 50); } + /* Create a datetime that is accurately represented with 
a fractional + part only of microseconds; remove any nanoseconds + */ + fn microsecond_accurate(dt: chrono::DateTime) -> chrono::DateTime { + dt - Duration::nanoseconds(dt.timestamp_nanos() - dt.timestamp_micros() * 1000) + } + + /// Stream 50 jobs with a page limit of 7 from the server + /// checking that we correctly reconstruct their tags and that + /// they are all accounted for (that pagination is handled + /// properly) + #[test(tokio::test)] + async fn test_basic_mock() { + let (mut p, _) = create_mock(microsecond_accurate(Utc::now())).await; + + let jobs = BTreeSet::from_iter(p.generate_jobs(50).into_iter()); + + let lava = Lava::new(&p.uri(), None).expect("failed to make lava server"); + + let mut lj = lava.jobs().query(); + + let mut seen = BTreeSet::new(); + while let Some(job) = lj.try_next().await.expect("failed to get job") { + assert!(!seen.contains(&job.id)); + assert!(jobs.contains(&job.id)); + + p.with_job(job.id, |jj| { + assert_eq!(job.description, jj.description); + + p.with_proxy(&jj.submitter, |u| { + assert_eq!(job.submitter, u.username); + }); + assert_eq!(job.viewing_groups.len(), jj.viewing_groups.len()); + + for i in 0..job.viewing_groups.len() { + p.with_proxy(&jj.viewing_groups[i], |g| { + assert_eq!(job.viewing_groups[i], g.id); + }); + } + + assert_eq!(job.health_check, jj.health_check); + + p.with_option_proxy(&jj.requested_device_type, |dt| { + assert_eq!(job.requested_device_type.as_ref(), dt.map(|dt| &dt.name)); + }); + + assert_eq!(job.tags.len(), jj.tags.len()); + for i in 0..job.tags.len() { + p.with_proxy(&jj.tags[i], |t| { + assert_eq!(job.tags[i].id, t.id); + assert_eq!(job.tags[i].name, t.name); + assert_eq!(job.tags[i].description, t.description); + }); + } + + p.with_option_proxy(&jj.actual_device, |d| { + assert_eq!(job.actual_device.as_ref(), d.map(|h| &h.hostname)); + }); + + assert_eq!(Some(job.submit_time), jj.submit_time); + assert_eq!(job.start_time, jj.start_time); + assert_eq!(job.end_time, jj.end_time); + 
assert_eq!(job.state.to_string(), jj.state.to_string());
+                assert_eq!(job.health.to_string(), jj.health.to_string());
+                assert_eq!(job.priority, jj.priority);
+                assert_eq!(job.definition, jj.definition);
+                assert_eq!(job.original_definition, jj.original_definition);
+                assert_eq!(job.multinode_definition, jj.multinode_definition);
+
+                assert_eq!(job.failure_tags.len(), jj.failure_tags.len());
+                for i in 0..job.failure_tags.len() {
+                    p.with_proxy(&jj.failure_tags[i], |t| {
+                        assert_eq!(job.failure_tags[i].id, t.id);
+                    })
+                }
+                assert_eq!(job.failure_comment, jj.failure_comment);
+            });
+
+            seen.insert(job.id);
+        }
+        assert_eq!(seen.len(), 50);
+    }
+
     /// Stream 50 jobs with a page limit of 7 from the server
     /// checking that we correctly reconstruct their tags and that
     /// they are all accounted for (that pagination is handled
     /// properly)
     #[test(tokio::test)]
     async fn test_jobs_builder() {
-        let mut server = lava_api_mock::LavaMock::new(
+        let mut server = lava_api_mock::Server::new(
             SharedState::new_populated(
                 PopulationParams::builder()
                     .tags(5usize)
diff --git a/lava-api/src/lib.rs b/lava-api/src/lib.rs
index c09ded1..a47919a 100644
--- a/lava-api/src/lib.rs
+++ b/lava-api/src/lib.rs
@@ -20,13 +20,13 @@
 //! Example:
 //! ```rust
 //! use futures::stream::TryStreamExt;
-//! # use lava_api_mock::{LavaMock, PaginationLimits, PopulationParams, SharedState};
+//! # use lava_api_mock::{Server, PaginationLimits, PopulationParams, SharedState};
 //! use lava_api::Lava;
 //! #
 //! # tokio_test::block_on( async {
 //! # let limits = PaginationLimits::new();
 //! # let population = PopulationParams::new();
-//! # let mock = LavaMock::new(SharedState::new_populated(population), limits).await;
+//! # let mock = Server::new(SharedState::new_populated(population), limits).await;
 //! # let service_uri = mock.uri();
 //! # let lava_token = None;
 //!
diff --git a/lava-api/src/tag.rs b/lava-api/src/tag.rs index 1ee475e..dbe9e37 100644 --- a/lava-api/src/tag.rs +++ b/lava-api/src/tag.rs @@ -13,38 +13,39 @@ pub struct Tag { pub description: Option, } +impl Tag { + pub fn from_mock<'b, B, C>(tag: &lava_api_mock::Tag, _context: B) -> Tag + where + B: 'b + persian_rug::Accessor, + C: persian_rug::Context + 'static, + { + Self { + id: tag.id, + name: tag.name.clone(), + description: tag.description.clone(), + } + } +} + #[cfg(test)] mod tests { - use super::Tag; use crate::Lava; use boulder::{Buildable, Builder}; + use chrono::Utc; use lava_api_mock::{ - LavaMock, PaginationLimits, PopulationParams, SharedState, State, Tag as MockTag, + create_mock, PaginationLimits, PopulationParams, Server, SharedState, State, Tag as MockTag, }; - use persian_rug::{Accessor, Context}; - use std::collections::BTreeMap; + use persian_rug::Accessor; + use std::collections::{BTreeMap, BTreeSet}; + use std::iter::FromIterator; use test_log::test; - impl Tag { - pub fn from_mock<'b, B, C>(tag: &MockTag, _context: B) -> Tag - where - B: 'b + Accessor, - C: Context + 'static, - { - Self { - id: tag.id, - name: tag.name.clone(), - description: tag.description.clone(), - } - } - } - /// Stream 49 tags with a page limit of 5 from the server #[test(tokio::test)] async fn test_basic() { let state = SharedState::new_populated(PopulationParams::builder().tags(49usize).build()); - let server = LavaMock::new( + let server = Server::new( state.clone(), PaginationLimits::builder().workers(Some(5)).build(), ) @@ -73,4 +74,30 @@ mod tests { } assert_eq!(seen.len(), 49); } + + #[test(tokio::test)] + async fn test_basic_mock() { + let (mut p, _clock) = create_mock(Utc::now()).await; + + let tag_names = BTreeSet::from_iter(p.generate_tags(49).into_iter()); + + let lava = Lava::new(&p.uri(), None).expect("failed to make lava server"); + + let tags = lava.tags().await.expect("failed to get tags"); + + let mut seen = BTreeSet::new(); + for tag in tags { + 
assert!(!seen.contains(&tag.id)); + assert!(tag_names.contains(&tag.name)); + + p.with_tag(&tag.name, |tk| { + assert_eq!(tag.id, tk.id); + assert_eq!(tag.name, tk.name); + assert_eq!(tag.description, tk.description); + }); + + seen.insert(tag.id); + } + assert_eq!(seen.len(), 49); + } } diff --git a/lava-api/src/test.rs b/lava-api/src/test.rs index 18052b4..bddac9f 100644 --- a/lava-api/src/test.rs +++ b/lava-api/src/test.rs @@ -145,7 +145,7 @@ mod tests { use crate::Lava; use boulder::{Buildable, Builder}; use futures::TryStreamExt; - use lava_api_mock::{Job, LavaMock, PaginationLimits, PopulationParams, SharedState, State}; + use lava_api_mock::{Job, PaginationLimits, PopulationParams, Server, SharedState, State}; use persian_rug::Accessor; use std::collections::BTreeMap; use test_log::test; @@ -252,7 +252,7 @@ result: fail .test_cases(20usize) .build(); let state = SharedState::new_populated(pop); - let server = LavaMock::new( + let server = Server::new( state.clone(), PaginationLimits::builder().test_cases(Some(6)).build(), ) diff --git a/lava-api/src/worker.rs b/lava-api/src/worker.rs index 75c2c70..da66dc8 100644 --- a/lava-api/src/worker.rs +++ b/lava-api/src/worker.rs @@ -31,10 +31,14 @@ pub struct Worker { mod tests { use crate::Lava; use boulder::{Buildable, Builder}; + use chrono::Utc; use futures::TryStreamExt; - use lava_api_mock::{LavaMock, PaginationLimits, PopulationParams, SharedState, State, Worker}; + use lava_api_mock::{ + create_mock, PaginationLimits, PopulationParams, Server, SharedState, State, Worker, + }; use persian_rug::Accessor; - use std::collections::BTreeMap; + use std::collections::{BTreeMap, BTreeSet}; + use std::iter::FromIterator; use test_log::test; /// Stream 51 workers with a page limit of 2 from the server @@ -42,7 +46,7 @@ mod tests { async fn test_basic() { let state = SharedState::new_populated(PopulationParams::builder().workers(51usize).build()); - let server = LavaMock::new( + let server = Server::new( state.clone(), 
PaginationLimits::builder().workers(Some(2)).build(), ) @@ -71,4 +75,30 @@ mod tests { } assert_eq!(seen.len(), 51); } + + /// Stream 51 workers with a page limit of 2 from the server + #[test(tokio::test)] + async fn test_basic_mock() { + let (mut p, _clock) = create_mock(Utc::now()).await; + + let workers = BTreeSet::from_iter(p.generate_workers(51).into_iter()); + + let lava = Lava::new(&p.uri(), None).expect("failed to make lava server"); + + let mut lw = lava.workers(); + + let mut seen = BTreeSet::new(); + while let Some(worker) = lw.try_next().await.expect("failed to get worker") { + assert!(!seen.contains(&worker.hostname)); + assert!(workers.contains(&worker.hostname)); + p.with_worker(&worker.hostname, |wk| { + assert_eq!(worker.hostname, wk.hostname); + assert_eq!(worker.state.to_string(), wk.state.to_string()); + assert_eq!(worker.health.to_string(), wk.health.to_string()); + }); + + seen.insert(worker.hostname.clone()); + } + assert_eq!(seen.len(), 51); + } }