diff --git a/src/check.rs b/src/check.rs
index 7eef206..17a62ea 100644
--- a/src/check.rs
+++ b/src/check.rs
@@ -14,64 +14,87 @@ use crate::{
     Context,
 };
 
-/// Fetches image data from other Cup instances
-async fn get_remote_updates(ctx: &Context, client: &Client, refresh: bool) -> Vec<Update> {
-    let mut remote_images = Vec::new();
-
-    let handles: Vec<_> = ctx.config.servers
-        .iter()
-        .map(|(name, url)| async move {
-            let base_url = if url.starts_with("http://") || url.starts_with("https://") {
-                format!("{}/api/v3/", url.trim_end_matches('/'))
-            } else {
-                format!("https://{}/api/v3/", url.trim_end_matches('/'))
-            };
-            let json_url = base_url.clone() + "json";
-            if refresh {
-                let refresh_url = base_url + "refresh";
-                match client.get(&refresh_url, &[], false).await {
-                    Ok(response) => {
-                        if response.status() != 200 {
-                            ctx.logger.warn(format!("GET {}: Failed to refresh server. Server returned invalid response code: {}", refresh_url, response.status()));
-                            return Vec::new();
-                        }
-                    },
-                    Err(e) => {
-                        ctx.logger.warn(format!("GET {}: Failed to refresh server. {}", refresh_url, e));
-                        return Vec::new();
-                    },
+/// Fetches image data from a single remote Cup instance
+pub async fn get_single_server_updates(
+    name: &str,
+    url: &str,
+    refresh: bool,
+    ctx: &Context,
+) -> Vec<Update> {
+    let client = Client::new(ctx);
+    let base_url = if url.starts_with("http://") || url.starts_with("https://") {
+        format!("{}/api/v3/", url.trim_end_matches('/'))
+    } else {
+        format!("https://{}/api/v3/", url.trim_end_matches('/'))
+    };
+    let json_url = base_url.clone() + "json";
+    if refresh {
+        let refresh_url = base_url + "refresh";
+        match client.get(&refresh_url, &[], false).await {
+            Ok(response) => {
+                if response.status() != 200 {
+                    ctx.logger.warn(format!("GET {}: Failed to refresh server. Server returned invalid response code: {}", refresh_url, response.status()));
+                    return Vec::new();
                 }
-            }
-            match client.get(&json_url, &[], false).await {
-                Ok(response) => {
-                    if response.status() != 200 {
-                        ctx.logger.warn(format!("GET {}: Failed to fetch updates from server. Server returned invalid response code: {}", json_url, response.status()));
-                        return Vec::new();
-                    }
-                    let json = parse_json(&get_response_body(response).await);
-                    ctx.logger.debug(format!("JSON response for {}: {}", name, json));
-                    if let Some(updates) = json["images"].as_array() {
-                        let mut server_updates: Vec<Update> = updates
-                            .iter()
-                            .filter_map(|img| serde_json::from_value(img.clone()).ok())
-                            .collect();
-                        // Add server origin to each image
-                        for update in &mut server_updates {
-                            update.server = Some(name.clone());
-                            update.status = update.get_status();
-                        }
-                        ctx.logger.debug(format!("Updates for {}: {:#?}", name, server_updates));
-                        return server_updates;
-                    }
-
-                    Vec::new()
+            Err(e) => {
+                ctx.logger.warn(format!(
+                    "GET {}: Failed to refresh server. {}",
+                    refresh_url, e
+                ));
+                return Vec::new();
+            }
+        }
+    }
+    match client.get(&json_url, &[], false).await {
+        Ok(response) => {
+            if response.status() != 200 {
+                ctx.logger.warn(format!("GET {}: Failed to fetch updates from server. Server returned invalid response code: {}", json_url, response.status()));
+                return Vec::new();
+            }
+            let json = parse_json(&get_response_body(response).await);
+            ctx.logger
+                .debug(format!("JSON response for {}: {}", name, json));
+            if let Some(updates) = json["images"].as_array() {
+                let mut server_updates: Vec<Update> = updates
+                    .iter()
+                    .filter_map(|img| serde_json::from_value(img.clone()).ok())
+                    .collect();
+                // Add server origin to each image
+                for update in &mut server_updates {
+                    update.server = Some(name.to_string());
+                    update.status = update.get_status();
                 }
-                Err(e) => {
-                    ctx.logger.warn(format!("GET {}: Failed to fetch updates from server. {}", json_url, e));
-                    Vec::new()
-                },
+                ctx.logger
+                    .debug(format!("Updates for {}: {:#?}", name, server_updates));
+                return server_updates;
             }
+
+            Vec::new()
+        }
+        Err(e) => {
+            ctx.logger.warn(format!(
+                "GET {}: Failed to fetch updates from server. {}",
+                json_url, e
+            ));
+            Vec::new()
+        }
+    }
+}
+
+/// Fetches image data from other Cup instances
+async fn get_remote_updates(ctx: &Context, refresh: bool) -> Vec<Update> {
+    let mut remote_images = Vec::new();
+
+    let handles: Vec<_> = ctx
+        .config
+        .servers
+        .iter()
+        .map(|(name, url)| {
+            let name = name.clone();
+            let url = url.clone();
+            let ctx = ctx.clone();
+            async move { get_single_server_updates(&name, &url, refresh, &ctx).await }
         })
         .collect();
@@ -97,12 +120,8 @@ fn get_excluded_tags(image: &Image, ctx: &Context) -> Vec<String> {
         .collect()
 }
 
-/// Returns a list of updates for all images passed in.
-pub async fn get_updates(
-    references: &Option<Vec<String>>, // If a user requested _specific_ references to be checked, this will have a value
-    refresh: bool,
-    ctx: &Context,
-) -> Vec<Update> {
+/// Returns a list of updates for local images only (no remote servers).
+pub async fn get_local_updates(references: &Option<Vec<String>>, ctx: &Context) -> Vec<Update> {
     let client = Client::new(ctx);
 
     // Merge references argument with references from config
@@ -141,14 +160,6 @@ pub async fn get_updates(
         images.extend(extra);
     }
 
-    // Get remote images from other servers
-    let remote_updates = if !ctx.config.servers.is_empty() {
-        ctx.logger.debug("Fetching updates from remote servers");
-        get_remote_updates(ctx, &client, refresh).await
-    } else {
-        Vec::new()
-    };
-
     ctx.logger.debug(format!(
         "Checking {:?}",
         images.iter().map(|image| &image.reference).collect_vec()
@@ -165,10 +176,6 @@ pub async fn get_updates(
         })
         .collect::<Vec<_>>();
 
-    // Create request client. All network requests share the same client for better performance.
-    // This client is also configured to retry a failed request up to 3 times with exponential backoff in between.
-    let client = Client::new(ctx);
-
     // Create a map of images indexed by registry. This solution seems quite inefficient, since each iteration causes a key to be looked up. I can't find anything better at the moment.
     let mut image_map: FxHashMap<&String, Vec<&Image>> = FxHashMap::default();
@@ -226,7 +233,23 @@ pub async fn get_updates(
     }
     // Await all the futures
     let images = join_all(handles).await;
-    let mut updates: Vec<Update> = images.iter().map(|image| image.to_update()).collect();
-    updates.extend_from_slice(&remote_updates);
+    images.iter().map(|image| image.to_update()).collect()
+}
+
+/// Returns a list of updates for all images passed in.
+pub async fn get_updates(
+    references: &Option<Vec<String>>,
+    refresh: bool,
+    ctx: &Context,
+) -> Vec<Update> {
+    let mut updates = get_local_updates(references, ctx).await;
+
+    // Get remote images from other servers
+    if !ctx.config.servers.is_empty() {
+        ctx.logger.debug("Fetching updates from remote servers");
+        let remote_updates = get_remote_updates(ctx, refresh).await;
+        updates.extend(remote_updates);
+    }
+
     updates
 }
diff --git a/src/registry.rs b/src/registry.rs
index a1d74be..aaa50ca 100644
--- a/src/registry.rs
+++ b/src/registry.rs
@@ -129,7 +129,7 @@ pub async fn get_latest_tag(
     let start = now();
     let protocol = get_protocol(&image.parts.registry, &ctx.config.registries);
     let url = format!(
-        "{}://{}/v2/{}/tags/list",
+        "{}://{}/v2/{}/tags/list?n=10000",
         protocol, &image.parts.registry, &image.parts.repository,
     );
     let authorization = to_bearer_string(&token);
diff --git a/src/server.rs b/src/server.rs
index 03d2063..267cf25 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -51,9 +51,13 @@ const SORT_ORDER: [&str; 8] = [
 pub async fn serve(port: &u16, ctx: &Context) -> std::io::Result<()> {
     ctx.logger.info("Starting server, please wait...");
-    let data = ServerData::new(ctx).await;
-    let scheduler = JobScheduler::new().await.unwrap();
+    let data = ServerData::new(ctx);
     let data = Arc::new(Mutex::new(data));
+
+    // Blocking initial refresh — populates data before we start serving
+    background_refresh(data.clone()).await;
+
+    let scheduler = JobScheduler::new().await.unwrap();
     let data_copy = data.clone();
     let tz = env::var("TZ")
         .map(|tz| tz.parse().unwrap_or(Tz::UTC))
@@ -61,16 +65,12 @@ pub async fn serve(port: &u16, ctx: &Context) -> std::io::Result<()> {
     if let Some(interval) = &ctx.config.refresh_interval {
         scheduler
             .add(
-                match Job::new_async_tz(
-                    interval,
-                    tz,
-                    move |_uuid, _lock| {
-                        let data_copy = data_copy.clone();
-                        Box::pin(async move {
-                            data_copy.lock().await.refresh().await;
-                        })
-                    },
-                ) {
+                match Job::new_async_tz(interval, tz, move |_uuid, _lock| {
+                    let data_copy = data_copy.clone();
+                    Box::pin(async move {
+                        background_refresh(data_copy).await;
+                    })
+                }) {
                     Ok(job) => job,
                     Err(e) => match e {
                         tokio_cron_scheduler::JobSchedulerError::ParseSchedule => error!(
@@ -167,7 +167,10 @@ async fn api_full(data: StateRef<'_, Arc<Mutex<ServerData>>>) -> WebResponse {
 }
 
 async fn refresh(data: StateRef<'_, Arc<Mutex<ServerData>>>) -> WebResponse {
-    data.lock().await.refresh().await;
+    let data = data.clone();
+    tokio::spawn(async move {
+        background_refresh(data).await;
+    });
     WebResponse::new(ResponseBody::from("OK"))
 }
@@ -181,86 +184,111 @@ struct ServerData {
 }
 
 impl ServerData {
-    async fn new(ctx: &Context) -> Self {
-        let mut s = Self {
-            ctx: ctx.clone(),
-            template: String::new(),
-            simple_json: Value::Null,
-            full_json: Value::Null,
-            raw_updates: Vec::new(),
-            theme: "neutral",
+    fn new(ctx: &Context) -> Self {
+        let theme = match &ctx.config.theme {
+            Theme::Default => "neutral",
+            Theme::Blue => "gray",
         };
-        s.refresh().await;
-        s
-    }
-    async fn refresh(&mut self) {
-        let start = now();
-        if !self.raw_updates.is_empty() {
-            self.ctx.logger.info("Refreshing data");
-        }
-        let updates = sort_update_vec(&get_updates(&None, true, &self.ctx).await);
-        self.ctx.logger.info(format!(
-            "✨ Checked {} images in {}ms",
-            updates.len(),
-            elapsed(start)
-        ));
-        self.raw_updates = updates;
-        let template = liquid::ParserBuilder::with_stdlib()
+        // Pre-render the template with empty data so the fallback HTML is valid
+        // (prevents Liquid tag leaks if the lock can't be acquired during a refresh)
+        let fallback_template = liquid::ParserBuilder::with_stdlib()
             .build()
             .unwrap()
             .parse(HTML)
-            .unwrap();
-        self.simple_json = to_simple_json(&self.raw_updates);
-        self.full_json = to_full_json(&self.raw_updates);
-        let last_updated = Local::now();
-        self.simple_json["last_updated"] = last_updated
-            .to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
-            .to_string()
-            .into();
-        self.full_json["last_updated"] = self.simple_json["last_updated"].clone();
-        self.theme = match &self.ctx.config.theme {
-            Theme::Default => "neutral",
-            Theme::Blue => "gray",
-        };
-        let mut metrics = self.simple_json["metrics"]
-            .as_object()
             .unwrap()
+            .render(&object!({
+                "metrics": [],
+                "servers": liquid::object!({}),
+                "server_ids": Vec::<&str>::new(),
+                "last_updated": "",
+                "theme": theme
+            }))
+            .unwrap();
+        Self {
+            ctx: ctx.clone(),
+            template: fallback_template,
+            simple_json: Value::Null,
+            full_json: Value::Null,
+            raw_updates: Vec::new(),
+            theme,
+        }
+    }
+}
+
+/// Refresh all data without blocking the API. Fetches everything outside the lock,
+/// then briefly locks to swap in the new data.
+async fn background_refresh(data: Arc<Mutex<ServerData>>) {
+    let (ctx, is_refresh) = {
+        let d = data.lock().await;
+        (d.ctx.clone(), !d.raw_updates.is_empty())
+    };
+    let start = now();
+    if is_refresh {
+        ctx.logger.info("Refreshing data");
+    }
+    let updates = sort_update_vec(&get_updates(&None, true, &ctx).await);
+    ctx.logger.info(format!(
+        "✨ Checked {} images in {}ms",
+        updates.len(),
+        elapsed(start)
+    ));
+    let mut d = data.lock().await;
+    d.raw_updates = updates;
+    let template = liquid::ParserBuilder::with_stdlib()
+        .build()
+        .unwrap()
+        .parse(HTML)
+        .unwrap();
+    d.simple_json = to_simple_json(&d.raw_updates);
+    d.full_json = to_full_json(&d.raw_updates);
+    let last_updated = Local::now();
+    d.simple_json["last_updated"] = last_updated
+        .to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
+        .to_string()
+        .into();
+    d.full_json["last_updated"] = d.simple_json["last_updated"].clone();
+    d.theme = match &d.ctx.config.theme {
+        Theme::Default => "neutral",
+        Theme::Blue => "gray",
+    };
+    let mut metrics = d.simple_json["metrics"]
+        .as_object()
+        .unwrap()
+        .iter()
+        .map(|(key, value)| liquid::object!({ "name": key, "value": value }))
+        .collect::<Vec<_>>();
+    metrics.sort_unstable_by(|a, b| {
+        SORT_ORDER
             .iter()
-            .map(|(key, value)| liquid::object!({ "name": key, "value": value }))
-            .collect::<Vec<_>>();
-        metrics.sort_unstable_by(|a, b| {
-            SORT_ORDER
-                .iter()
-                .position(|i| i == &a["name"].to_kstr().as_str())
-                .unwrap()
-                .cmp(
-                    &SORT_ORDER
-                        .iter()
-                        .position(|i| i == &b["name"].to_kstr().as_str())
-                        .unwrap(),
-                )
-        });
-        let mut servers: FxHashMap<&str, Vec<Object>> = FxHashMap::default();
-        self.raw_updates.iter().for_each(|update| {
-            let key = update.server.as_deref().unwrap_or("");
-            match servers.get_mut(&key) {
-                Some(server) => server.push(
-                    object!({"name": update.reference, "status": update.get_status().to_string()}),
-                ),
-                None => {
-                    let _ = servers.insert(key, vec![object!({"name": update.reference, "status": update.get_status().to_string()})]);
-                }
+            .position(|i| i == &a["name"].to_kstr().as_str())
+            .unwrap()
+            .cmp(
+                &SORT_ORDER
+                    .iter()
+                    .position(|i| i == &b["name"].to_kstr().as_str())
+                    .unwrap(),
+            )
+    });
+    let mut servers: FxHashMap<&str, Vec<Object>> = FxHashMap::default();
+    d.raw_updates.iter().for_each(|update| {
+        let key = update.server.as_deref().unwrap_or("");
+        match servers.get_mut(&key) {
+            Some(server) => server.push(
+                object!({"name": update.reference, "status": update.get_status().to_string()}),
+            ),
+            None => {
+                let _ = servers.insert(key, vec![object!({"name": update.reference, "status": update.get_status().to_string()})]);
             }
-        });
-        let globals = object!({
-            "metrics": metrics,
-            "servers": servers,
-            "server_ids": servers.into_keys().collect::<Vec<&str>>(),
-            "last_updated": last_updated.format("%Y-%m-%d %H:%M:%S").to_string(),
-            "theme": &self.theme
-        });
-        self.template = template.render(&globals).unwrap();
-    }
+        }
+    });
+    let globals = object!({
+        "metrics": metrics,
+        "servers": servers,
+        "server_ids": servers.into_keys().collect::<Vec<&str>>(),
+        "last_updated": last_updated.format("%Y-%m-%d %H:%M:%S").to_string(),
+        "theme": &d.theme
+    });
+    d.template = template.render(&globals).unwrap();
 }
 
 async fn logger<S, C, B>(next: &S, ctx: WebContext<'_, C, B>) -> Result<WebResponse>