diff --git a/Cargo.lock b/Cargo.lock index 538958ef..70d6b11b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2350,6 +2350,7 @@ version = "0.1.0" dependencies = [ "bitfield 0.17.0", "bitflags 2.9.4", + "cfu-service", "critical-section", "defmt 0.3.100", "embassy-futures", diff --git a/Cargo.toml b/Cargo.toml index f9c4e650..2e634339 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,7 @@ embassy-sync = "0.7.2" embassy-time = "0.5.0" embassy-time-driver = "0.2.1" embedded-batteries-async = "0.3" +cfu-service = { path = "./cfu-service" } embedded-cfu-protocol = { git = "https://github.com/OpenDevicePartnership/embedded-cfu" } embedded-hal = "1.0" embedded-hal-async = "1.0" diff --git a/cfu-service/src/buffer.rs b/cfu-service/src/buffer.rs index 3494c87f..d0300e00 100644 --- a/cfu-service/src/buffer.rs +++ b/cfu-service/src/buffer.rs @@ -10,14 +10,9 @@ use embassy_sync::{ }; use embassy_time::{Duration, TimeoutError, with_timeout}; use embedded_cfu_protocol::protocol_definitions::*; -use embedded_services::{ - GlobalRawMutex, - cfu::{ - self, - component::{CfuDevice, InternalResponseData, RequestData}, - }, - error, intrusive_list, trace, -}; +use embedded_services::{GlobalRawMutex, error, intrusive_list, trace}; + +use crate::component::{CfuDevice, InternalResponseData, RequestData}; /// Internal state for [`Buffer`] #[derive(Copy, Clone, Default)] @@ -115,9 +110,11 @@ impl<'a> Buffer<'a> { } /// Process a fw version request - async fn process_get_fw_version(&self) -> InternalResponseData { - if let Ok(InternalResponseData::FwVersionResponse(mut response)) = - cfu::route_request(self.buffered_id, RequestData::FwVersionRequest).await + async fn process_get_fw_version(&self, cfu_client: &crate::CfuClient) -> InternalResponseData { + if let Ok(InternalResponseData::FwVersionResponse(mut response)) = cfu_client + .context + .route_request(self.buffered_id, RequestData::FwVersionRequest) + .await { // Update the component ID in the response to match our external ID response.component_info[0].component_id = self.cfu_device.component_id(); @@ -128,8 +125,12 @@ impl<'a> Buffer<'a> { } } - async fn process_abort_update(&self) -> InternalResponseData { - match cfu::route_request(self.buffered_id, RequestData::AbortUpdate).await { + async fn process_abort_update(&self, cfu_client: &crate::CfuClient) -> InternalResponseData { + match cfu_client + .context + .route_request(self.buffered_id, RequestData::AbortUpdate) + .await + { Ok(response) => response, Err(e) => { error!("Failed to abort update for device {}: {:?}", self.buffered_id, e); @@ -139,11 +140,13 @@ impl<'a> Buffer<'a> { } /// Process a give offer request - async fn process_give_offer(&self, offer: &FwUpdateOffer) -> InternalResponseData { + async fn process_give_offer(&self, offer: &FwUpdateOffer, cfu_client: &crate::CfuClient) -> InternalResponseData { let mut offer = *offer; offer.component_info.component_id = self.buffered_id; - if let Ok(response @ InternalResponseData::OfferResponse(_)) = - cfu::route_request(self.buffered_id, RequestData::GiveOffer(offer)).await + if let Ok(response @ InternalResponseData::OfferResponse(_)) = cfu_client + .context + .route_request(self.buffered_id, RequestData::GiveOffer(offer)) + .await { response } else { @@ -157,7 +160,12 @@ impl<'a> Buffer<'a> { } /// Process update content - async fn process_give_content(&self, state: &mut State, content: &FwUpdateContentCommand) -> InternalResponseData { + async fn process_give_content( + &self, + state: &mut State, + content: &FwUpdateContentCommand, + cfu_client: &crate::CfuClient, + ) -> InternalResponseData { // Clear out any pending response if this is a new FW update if content.header.flags & FW_UPDATE_FLAG_FIRST_BLOCK != 0 { state.pending_response = None; @@ -171,7 +179,11 @@ impl<'a> Buffer<'a> { trace!("Content successfully buffered"); } else { // Buffered component can accept new content, send it - if let Err(e) = cfu::send_device_request(self.buffered_id, RequestData::GiveContent(*content)).await { + if let Err(e) = cfu_client + .context + .send_device_request(self.buffered_id, RequestData::GiveContent(*content)) + .await + { error!( "Failed to send content to buffered component {:?}: {:?}", self.buffered_id, e @@ -181,7 +193,12 @@ impl<'a> Buffer<'a> { } // Wait for a response from the buffered component - match with_timeout(self.config.buffer_timeout, cfu::wait_device_response(self.buffered_id)).await { + match with_timeout( + self.config.buffer_timeout, + cfu_client.context.wait_device_response(self.buffered_id), + ) + .await + { Err(TimeoutError) => { // Component didn't respond in time state.component_busy = true; @@ -242,7 +259,7 @@ impl<'a> Buffer<'a> { } /// Wait for an event - pub async fn wait_event(&self) -> Event { + pub async fn wait_event(&self, cfu_client: &crate::CfuClient) -> Event { let is_busy = self.state.lock().await.component_busy; match select3( // Wait for a buffered content request @@ -250,7 +267,7 @@ impl<'a> Buffer<'a> { // Wait for a request from the host self.cfu_device.wait_request(), // Wait for response from the buffered component - cfu::wait_device_response(self.buffered_id), + cfu_client.context.wait_device_response(self.buffered_id), ) .await { @@ -275,14 +292,18 @@ impl<'a> Buffer<'a> { } /// Top-level event processing function - pub async fn process(&self, event: Event) -> Option { + pub async fn process(&self, event: Event, cfu_client: &crate::CfuClient) -> Option { let mut state = self.state.lock().await; match event { - Event::CfuRequest(request) => Some(self.process_request(&mut state, request).await), + Event::CfuRequest(request) => Some(self.process_request(&mut state, request, cfu_client).await), Event::BufferedContent(content) => { // Send the buffered content to the component // Don't need to wait for a response here, the response will be caught later by either [`wait_event`] or [`process_give_content`] - if let Err(e) = cfu::send_device_request(self.buffered_id, RequestData::GiveContent(content)).await { + if let Err(e) = cfu_client + .context + .send_device_request(self.buffered_id, RequestData::GiveContent(content)) + .await + { error!( "Failed to send content to buffered component {:?}: {:?}", self.buffered_id, e @@ -303,23 +324,28 @@ impl<'a> Buffer<'a> { } /// Process a CFU message and produce a response - async fn process_request(&self, state: &mut State, request: RequestData) -> InternalResponseData { + async fn process_request( + &self, + state: &mut State, + request: RequestData, + cfu_client: &crate::CfuClient, + ) -> InternalResponseData { match request { RequestData::FwVersionRequest => { trace!("Got FwVersionRequest"); - self.process_get_fw_version().await + self.process_get_fw_version(cfu_client).await } RequestData::GiveOffer(offer) => { trace!("Got GiveOffer"); - self.process_give_offer(&offer).await + self.process_give_offer(&offer, cfu_client).await } RequestData::GiveContent(content) => { trace!("Got GiveContent"); - self.process_give_content(state, &content).await + self.process_give_content(state, &content, cfu_client).await } RequestData::AbortUpdate => { trace!("Got AbortUpdate"); - self.process_abort_update().await + self.process_abort_update(cfu_client).await } RequestData::FinalizeUpdate => { trace!("Got FinalizeUpdate"); @@ -356,7 +382,7 @@ impl<'a> Buffer<'a> { } /// Register the buffer with all relevant services - pub async fn register(&'static self) -> Result<(), intrusive_list::Error> { - cfu::register_device(&self.cfu_device).await + pub fn register(&'static self, cfu_client: &crate::CfuClient) -> Result<(), intrusive_list::Error> { + cfu_client.context.register_device(&self.cfu_device) } } diff --git a/embedded-service/src/cfu/component.rs b/cfu-service/src/component.rs similarity index 97% rename from embedded-service/src/cfu/component.rs rename to cfu-service/src/component.rs index d89773b3..6cac8120 100644 --- a/embedded-service/src/cfu/component.rs +++ b/cfu-service/src/component.rs @@ -9,9 +9,8 @@ use embedded_cfu_protocol::writer::{CfuWriterAsync, CfuWriterError}; use heapless::Vec; use super::CfuError; -use crate::GlobalRawMutex; -use crate::cfu::route_request; -use crate::intrusive_list; +use embedded_services::GlobalRawMutex; +use embedded_services::intrusive_list; /// Component internal update state #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -222,7 +221,7 @@ impl CfuComponentDefault { } } /// wait for a request and process it - pub async fn process_request(&self) -> Result<(), CfuError> { + pub async fn process_request(&self, cfu_client: &'static crate::CfuClient) -> Result<(), CfuError> { match self.device.wait_request().await { RequestData::FwVersionRequest => { let fwv = self.get_fw_version().await.map_err(CfuError::ProtocolError)?; @@ -247,8 +246,10 @@ impl CfuComponentDefault { // panic safety: adding 1 here is safe because MAX_CMPT_COUNT is 1 more than MAX_SUBCMPT_COUNT for (index, id) in arr.iter().enumerate() { //info!("Forwarding GetFwVersion command to sub-component: {}", id); - if let InternalResponseData::FwVersionResponse(fwv) = - route_request(*id, RequestData::FwVersionRequest).await? + if let InternalResponseData::FwVersionResponse(fwv) = cfu_client + .context + .route_request(*id, RequestData::FwVersionRequest) + .await? { comp_info[index + 1] = fwv .component_info diff --git a/cfu-service/src/lib.rs b/cfu-service/src/lib.rs index 9e6b4948..9e2ff403 100644 --- a/cfu-service/src/lib.rs +++ b/cfu-service/src/lib.rs @@ -1,20 +1,20 @@ #![no_std] +use embassy_sync::channel::Channel; use embedded_cfu_protocol::client::CfuReceiveContent; use embedded_cfu_protocol::components::CfuComponentTraits; use embedded_cfu_protocol::protocol_definitions::*; -use embedded_services::cfu::component::*; -use embedded_services::cfu::{CfuError, ContextToken}; -use embedded_services::{comms, error, info, trace}; +use embedded_services::{GlobalRawMutex, comms, error, info, intrusive_list, trace}; pub mod buffer; +pub mod component; pub mod host; pub mod splitter; pub mod task; pub struct CfuClient { /// Cfu Client context - context: ContextToken, + context: ClientContext, /// Comms endpoint tp: comms::Endpoint, } @@ -37,21 +37,32 @@ impl CfuReceiveContent for CfuClient { impl CfuClient { /// Create a new Cfu Client - pub fn create() -> Option { - Some(Self { - context: ContextToken::create()?, + pub async fn new(service_storage: &'static embassy_sync::once_lock::OnceLock) -> &'static Self { + let service_storage = service_storage.get_or_init(|| Self { + context: ClientContext::new(), tp: comms::Endpoint::uninit(comms::EndpointID::Internal(comms::Internal::Nonvol)), - }) + }); + + service_storage.init().await; + + service_storage } + + async fn init(&'static self) { + if comms::register_endpoint(self, &self.tp).await.is_err() { + error!("Failed to register cfu endpoint"); + } + } + pub async fn process_request(&self) -> Result<(), CfuError> { let request = self.context.wait_request().await; //let device = self.context.get_device(request.id).await?; let comp = request.id; match request.data { - RequestData::FwVersionRequest => { + component::RequestData::FwVersionRequest => { info!("Received FwVersionRequest, comp {}", comp); - if let Ok(device) = self.context.get_device(comp).await { + if let Ok(device) = self.context.get_device(comp) { let resp = device .execute_device_request(request.data) .await @@ -60,7 +71,7 @@ impl CfuClient { // TODO replace with signal to component to get its own fw version //cfu::send_request(comp, RequestData::FwVersionRequest).await?; match resp { - InternalResponseData::FwVersionResponse(r) => { + component::InternalResponseData::FwVersionResponse(r) => { let ver = r.component_info[0].fw_version; info!("got fw version {:?} for comp {}", ver, comp); } @@ -74,15 +85,15 @@ impl CfuClient { } Err(CfuError::InvalidComponent) } - RequestData::GiveContent(_content_cmd) => Ok(()), - RequestData::GiveOffer(_offer_cmd) => Ok(()), - RequestData::PrepareComponentForUpdate => Ok(()), - RequestData::AbortUpdate => Ok(()), - RequestData::FinalizeUpdate => Ok(()), - RequestData::GiveOfferExtended(_) => { + component::RequestData::GiveContent(_content_cmd) => Ok(()), + component::RequestData::GiveOffer(_offer_cmd) => Ok(()), + component::RequestData::PrepareComponentForUpdate => Ok(()), + component::RequestData::AbortUpdate => Ok(()), + component::RequestData::FinalizeUpdate => Ok(()), + component::RequestData::GiveOfferExtended(_) => { // Don't currently support extended offers self.context - .send_response(InternalResponseData::OfferResponse( + .send_response(component::InternalResponseData::OfferResponse( FwUpdateOfferResponse::new_with_failure( HostToken::Driver, OfferRejectReason::InvalidComponent, @@ -92,10 +103,10 @@ impl CfuClient { .await; Ok(()) } - RequestData::GiveOfferInformation(_) => { + component::RequestData::GiveOfferInformation(_) => { // Don't currently support information offers self.context - .send_response(InternalResponseData::OfferResponse( + .send_response(component::InternalResponseData::OfferResponse( FwUpdateOfferResponse::new_with_failure( HostToken::Driver, OfferRejectReason::InvalidComponent, @@ -107,6 +118,155 @@ impl CfuClient { } } } + + pub fn register_device( + &self, + device: &'static impl component::CfuDeviceContainer, + ) -> Result<(), intrusive_list::Error> { + self.context.register_device(device) + } + + pub async fn route_request( + &self, + to: ComponentId, + request: component::RequestData, + ) -> Result { + self.context.route_request(to, request).await + } } impl comms::MailboxDelegate for CfuClient {} + +/// Error type +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum CfuError { + /// Image did not pass validation + BadImage, + /// Component either doesn't exist + InvalidComponent, + /// Component is busy + ComponentBusy, + /// Component encountered a protocol error during execution + ProtocolError(CfuProtocolError), +} + +/// Request to the power policy service +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub struct Request { + /// Component that sent this request + pub id: ComponentId, + /// Request data + pub data: component::RequestData, +} + +/// Cfu context +pub struct ClientContext { + /// Registered devices + devices: embedded_services::intrusive_list::IntrusiveList, + /// Request to components + request: Channel, + /// Response from components + response: Channel, +} + +impl Default for ClientContext { + fn default() -> Self { + Self::new() + } +} + +impl ClientContext { + pub fn new() -> Self { + Self { + devices: embedded_services::intrusive_list::IntrusiveList::new(), + request: Channel::new(), + response: Channel::new(), + } + } + + /// Register a device with the Cfu Client service + fn register_device( + &self, + device: &'static impl component::CfuDeviceContainer, + ) -> Result<(), intrusive_list::Error> { + let device = device.get_cfu_component_device(); + if self.get_device(device.component_id()).is_ok() { + return Err(intrusive_list::Error::NodeAlreadyInList); + } + + self.devices.push(device) + } + + /// Convenience function to send a request to the Cfu service + pub async fn send_request( + &self, + from: ComponentId, + request: component::RequestData, + ) -> Result { + self.request + .send(Request { + id: from, + data: request, + }) + .await; + Ok(self.response.receive().await) + } + + /// Convenience function to route a request to a specific component + pub async fn route_request( + &self, + to: ComponentId, + request: component::RequestData, + ) -> Result { + let device = self.get_device(to)?; + device + .execute_device_request(request) + .await + .map_err(CfuError::ProtocolError) + } + + /// Send a request to the specific CFU device, but don't wait for a response + pub async fn send_device_request(&self, to: ComponentId, request: component::RequestData) -> Result<(), CfuError> { + let device = self.get_device(to)?; + device.send_request(request).await; + Ok(()) + } + + /// Wait for a response from the specific CFU device + pub async fn wait_device_response(&self, to: ComponentId) -> Result { + let device = self.get_device(to)?; + Ok(device.wait_response().await) + } + + /// Wait for a cfu request + pub async fn wait_request(&self) -> Request { + self.request.receive().await + } + + /// Send a response to a cfu request + pub async fn send_response(&self, response: component::InternalResponseData) { + self.response.send(response).await + } + + /// Get a device by its ID + pub fn get_device(&self, id: ComponentId) -> Result<&'static component::CfuDevice, CfuError> { + for device in &self.devices { + if let Some(data) = device.data::() { + if data.component_id() == id { + return Ok(data); + } + } else { + error!("Non-device located in devices list"); + } + } + + Err(CfuError::InvalidComponent) + } + + /// Provides access to the device list + pub fn devices(&self) -> &intrusive_list::IntrusiveList { + &self.devices + } +} diff --git a/cfu-service/src/splitter.rs b/cfu-service/src/splitter.rs index 055d2936..5210619d 100644 --- a/cfu-service/src/splitter.rs +++ b/cfu-service/src/splitter.rs @@ -3,15 +3,10 @@ use core::{future::Future, iter::zip}; +use crate::component; use embassy_futures::join::{join, join3, join4}; use embedded_cfu_protocol::protocol_definitions::*; -use embedded_services::{ - cfu::{ - self, - component::{CfuDevice, InternalResponseData, RequestData}, - }, - error, intrusive_list, trace, -}; +use embedded_services::{error, intrusive_list, trace}; /// Trait containing customization functionality for [`Splitter`] pub trait Customization { @@ -28,7 +23,7 @@ pub trait Customization { /// Splitter struct pub struct Splitter<'a, C: Customization> { /// CFU device - cfu_device: CfuDevice, + cfu_device: component::CfuDevice, /// Component ID for each individual device devices: &'a [ComponentId], /// Customization for the Splitter @@ -45,7 +40,7 @@ impl<'a, C: Customization> Splitter<'a, C> { None } else { Some(Self { - cfu_device: CfuDevice::new(component_id), + cfu_device: component::CfuDevice::new(component_id), devices, customization, }) @@ -53,33 +48,35 @@ impl<'a, C: Customization> Splitter<'a, C> { } /// Create a new invalid FW version response - fn create_invalid_fw_version_response(&self) -> InternalResponseData { + fn create_invalid_fw_version_response(&self) -> component::InternalResponseData { let dev_inf = FwVerComponentInfo::new(FwVersion::new(0xffffffff), self.cfu_device.component_id()); let comp_info: [FwVerComponentInfo; MAX_CMPT_COUNT] = [dev_inf; MAX_CMPT_COUNT]; - InternalResponseData::FwVersionResponse(GetFwVersionResponse { + component::InternalResponseData::FwVersionResponse(GetFwVersionResponse { header: GetFwVersionResponseHeader::new(1, GetFwVerRespHeaderByte3::NoSpecialFlags), component_info: comp_info, }) } /// Create a content rejection response - fn create_content_rejection(sequence: u16) -> InternalResponseData { - InternalResponseData::ContentResponse(FwUpdateContentResponse::new( + fn create_content_rejection(sequence: u16) -> component::InternalResponseData { + component::InternalResponseData::ContentResponse(FwUpdateContentResponse::new( sequence, CfuUpdateContentResponseStatus::ErrorInvalid, )) } /// Process a fw version request - async fn process_get_fw_version(&self) -> InternalResponseData { + async fn process_get_fw_version(&self, cfu_client: &crate::CfuClient) -> component::InternalResponseData { let mut versions = [GetFwVersionResponse { header: Default::default(), component_info: Default::default(), }; MAX_SUPPORTED_DEVICES]; let success = map_slice_join(self.devices, &mut versions, |device_id| async move { - if let Ok(InternalResponseData::FwVersionResponse(version_info)) = - cfu::route_request(*device_id, RequestData::FwVersionRequest).await + if let Ok(component::InternalResponseData::FwVersionResponse(version_info)) = cfu_client + .context + .route_request(*device_id, component::RequestData::FwVersionRequest) + .await { Some(version_info) } else { @@ -94,14 +91,18 @@ impl<'a, C: Customization> Splitter<'a, C> { // The overall component version comes first overall_version.component_info[0].component_id = self.cfu_device.component_id(); - InternalResponseData::FwVersionResponse(overall_version) + component::InternalResponseData::FwVersionResponse(overall_version) } else { self.create_invalid_fw_version_response() } } /// Process a give offer request - async fn process_give_offer(&self, offer: &FwUpdateOffer) -> InternalResponseData { + async fn process_give_offer( + &self, + offer: &FwUpdateOffer, + cfu_client: &crate::CfuClient, + ) -> component::InternalResponseData { let mut offer_responses = [FwUpdateOfferResponse::default(); MAX_SUPPORTED_DEVICES]; let success = map_slice_join(self.devices, &mut offer_responses, |device_id| async move { @@ -109,8 +110,10 @@ impl<'a, C: Customization> Splitter<'a, C> { // Override with the correct component ID for the device offer.component_info.component_id = *device_id; - if let Ok(InternalResponseData::OfferResponse(response)) = - cfu::route_request(*device_id, RequestData::GiveOffer(offer)).await + if let Ok(component::InternalResponseData::OfferResponse(response)) = cfu_client + .context + .route_request(*device_id, component::RequestData::GiveOffer(offer)) + .await { Some(response) } else { @@ -121,19 +124,27 @@ impl<'a, C: Customization> Splitter<'a, C> { .await; if success && let Some(offer_responses_slice) = offer_responses.get(..self.devices.len()) { - InternalResponseData::OfferResponse(self.customization.resolve_offer_response(offer_responses_slice)) + component::InternalResponseData::OfferResponse( + self.customization.resolve_offer_response(offer_responses_slice), + ) } else { self.create_invalid_fw_version_response() } } /// Process update content - async fn process_give_content(&self, content: &FwUpdateContentCommand) -> InternalResponseData { + async fn process_give_content( + &self, + content: &FwUpdateContentCommand, + cfu_client: &crate::CfuClient, + ) -> component::InternalResponseData { let mut content_responses = [FwUpdateContentResponse::default(); MAX_SUPPORTED_DEVICES]; let success = map_slice_join(self.devices, &mut content_responses, |device_id| async move { - if let Ok(InternalResponseData::ContentResponse(response)) = - cfu::route_request(*device_id, RequestData::GiveContent(*content)).await + if let Ok(component::InternalResponseData::ContentResponse(response)) = cfu_client + .context + .route_request(*device_id, component::RequestData::GiveContent(*content)) + .await { Some(response) } else { @@ -144,57 +155,63 @@ impl<'a, C: Customization> Splitter<'a, C> { .await; if success && let Some(content_responses_slice) = content_responses.get(..self.devices.len()) { - InternalResponseData::ContentResponse(self.customization.resolve_content_response(content_responses_slice)) + component::InternalResponseData::ContentResponse( + self.customization.resolve_content_response(content_responses_slice), + ) } else { Self::create_content_rejection(content.header.sequence_num) } } /// Wait for a CFU message - pub async fn wait_request(&self) -> RequestData { + pub async fn wait_request(&self) -> component::RequestData { self.cfu_device.wait_request().await } /// Process a CFU message and produce a response - pub async fn process_request(&self, request: RequestData) -> InternalResponseData { + pub async fn process_request( + &self, + request: component::RequestData, + cfu_client: &crate::CfuClient, + ) -> component::InternalResponseData { match request { - RequestData::FwVersionRequest => { + component::RequestData::FwVersionRequest => { trace!("Got FwVersionRequest"); - self.process_get_fw_version().await + self.process_get_fw_version(cfu_client).await } - RequestData::GiveOffer(offer) => { + component::RequestData::GiveOffer(offer) => { trace!("Got GiveOffer"); - self.process_give_offer(&offer).await + self.process_give_offer(&offer, cfu_client).await } - RequestData::GiveContent(content) => { + component::RequestData::GiveContent(content) => { trace!("Got GiveContent"); - self.process_give_content(&content).await + self.process_give_content(&content, cfu_client).await } - RequestData::AbortUpdate => { + component::RequestData::AbortUpdate => { trace!("Got AbortUpdate"); - InternalResponseData::ComponentPrepared + component::InternalResponseData::ComponentPrepared } - RequestData::FinalizeUpdate => { + component::RequestData::FinalizeUpdate => { trace!("Got FinalizeUpdate"); - InternalResponseData::ComponentPrepared + component::InternalResponseData::ComponentPrepared } - RequestData::PrepareComponentForUpdate => { + component::RequestData::PrepareComponentForUpdate => { trace!("Got PrepareComponentForUpdate"); - InternalResponseData::ComponentPrepared + component::InternalResponseData::ComponentPrepared } - RequestData::GiveOfferExtended(_) => { + component::RequestData::GiveOfferExtended(_) => { trace!("Got GiveExtendedOffer"); // Extended offers are not currently supported - InternalResponseData::OfferResponse(FwUpdateOfferResponse::new_with_failure( + component::InternalResponseData::OfferResponse(FwUpdateOfferResponse::new_with_failure( HostToken::Driver, OfferRejectReason::InvalidComponent, OfferStatus::Reject, )) } - RequestData::GiveOfferInformation(_) => { + component::RequestData::GiveOfferInformation(_) => { trace!("Got GiveOfferInformation"); // Offer information is not currently supported - InternalResponseData::OfferResponse(FwUpdateOfferResponse::new_with_failure( + component::InternalResponseData::OfferResponse(FwUpdateOfferResponse::new_with_failure( HostToken::Driver, OfferRejectReason::InvalidComponent, OfferStatus::Reject, @@ -204,12 +221,12 @@ impl<'a, C: Customization> Splitter<'a, C> { } /// Send a response to the CFU message - pub async fn send_response(&self, response: InternalResponseData) { + pub async fn send_response(&self, response: component::InternalResponseData) { self.cfu_device.send_response(response).await; } - pub async fn register(&'static self) -> Result<(), intrusive_list::Error> { - cfu::register_device(&self.cfu_device).await + pub fn register(&'static self, cfu_client: &crate::CfuClient) -> Result<(), intrusive_list::Error> { + cfu_client.context.register_device(&self.cfu_device) } } diff --git a/cfu-service/src/task.rs b/cfu-service/src/task.rs index ef71d8cf..f4dd1187 100644 --- a/cfu-service/src/task.rs +++ b/cfu-service/src/task.rs @@ -1,21 +1,12 @@ -use embassy_sync::once_lock::OnceLock; -use embedded_services::{comms, error, info}; +use embedded_services::{error, info}; use crate::CfuClient; -pub async fn task() { +pub async fn task(cfu_client: &'static CfuClient) { info!("Starting cfu client task"); - static CLIENT: OnceLock = OnceLock::new(); - #[allow(clippy::expect_used)] // panic safety: singleton panic on initialization - let cfuclient = CLIENT.get_or_init(|| CfuClient::create().expect("cfu client singleton already initialized")); - - if comms::register_endpoint(cfuclient, &cfuclient.tp).await.is_err() { - error!("Failed to register cfu endpoint"); - return; - } loop { - if let Err(e) = cfuclient.process_request().await { + if let Err(e) = cfu_client.process_request().await { error!("Error processing request: {:?}", e); } } diff --git a/embedded-service/src/cfu/mod.rs b/embedded-service/src/cfu/mod.rs deleted file mode 100644 index 3c3f97ea..00000000 --- a/embedded-service/src/cfu/mod.rs +++ /dev/null @@ -1,169 +0,0 @@ -//! Cfu Service related data structures and messages -//pub mod action; -pub mod component; - -use core::sync::atomic::{AtomicBool, Ordering}; - -use crate::GlobalRawMutex; -use embassy_sync::channel::Channel; -use embassy_sync::once_lock::OnceLock; -use embedded_cfu_protocol::protocol_definitions::{CfuProtocolError, ComponentId}; - -use crate::cfu::component::{CfuDevice, CfuDeviceContainer, DEVICE_CHANNEL_SIZE, InternalResponseData, RequestData}; -use crate::{error, intrusive_list}; - -/// Error type -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum CfuError { - /// Image did not pass validation - BadImage, - /// Component either doesn't exist - InvalidComponent, - /// Component is busy - ComponentBusy, - /// Component encountered a protocol error during execution - ProtocolError(CfuProtocolError), -} - -/// Request to the power policy service -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub struct Request { - /// Component that sent this request - pub id: ComponentId, - /// Request data - pub data: RequestData, -} - -/// Cfu context -struct ClientContext { - /// Registered devices - devices: intrusive_list::IntrusiveList, - /// Request to components - request: Channel, - /// Response from components - response: Channel, -} - -impl ClientContext { - fn new() -> Self { - Self { - devices: intrusive_list::IntrusiveList::new(), - request: Channel::new(), - response: Channel::new(), - } - } -} - -static CONTEXT: OnceLock = OnceLock::new(); - -/// Init Cfu Client service -pub fn init() { - CONTEXT.get_or_init(ClientContext::new); -} - -/// Register a device with the Cfu Client service -pub async fn register_device(device: &'static impl CfuDeviceContainer) -> Result<(), intrusive_list::Error> { - let device = device.get_cfu_component_device(); - if get_device(device.component_id()).await.is_some() { - return Err(intrusive_list::Error::NodeAlreadyInList); - } - - CONTEXT.get().await.devices.push(device) -} - -/// Find a device by its ID -async fn get_device(id: ComponentId) -> Option<&'static CfuDevice> { - for device in &CONTEXT.get().await.devices { - if let Some(data) = device.data::() { - if data.component_id() == id { - return Some(data); - } - } else { - error!("Non-device located in devices list"); - } - } - - None -} - -/// Convenience function to send a request to the Cfu service -pub async fn send_request(from: ComponentId, request: RequestData) -> Result { - let context = CONTEXT.get().await; - context - .request - .send(Request { - id: from, - data: request, - }) - .await; - Ok(context.response.receive().await) -} - -/// Convenience function to route a request to a specific component -pub async fn route_request(to: ComponentId, request: RequestData) -> Result { - if let Some(device) = get_device(to).await { - device - .execute_device_request(request) - .await - .map_err(CfuError::ProtocolError) - } else { - Err(CfuError::InvalidComponent) - } -} - -/// Send a request to the specific CFU device, but don't wait for a response -pub async fn send_device_request(to: ComponentId, request: RequestData) -> Result<(), CfuError> { - if let Some(device) = get_device(to).await { - device.send_request(request).await; - Ok(()) - } else { - Err(CfuError::InvalidComponent) - } -} - -/// Wait for a response from the specific CFU device -pub async fn wait_device_response(to: ComponentId) -> Result { - if let Some(device) = get_device(to).await { - Ok(device.wait_response().await) - } else { - Err(CfuError::InvalidComponent) - } -} - -/// Singleton struct to give access to the cfu client context -pub struct ContextToken(()); - -impl ContextToken { - /// Create a new context token, returning None if this function has been called before - pub fn create() -> Option { - static INIT: AtomicBool = AtomicBool::new(false); - if INIT.load(Ordering::SeqCst) { - return None; - } - - INIT.store(true, Ordering::SeqCst); - Some(ContextToken(())) - } - - /// Wait for a cfu request - pub async fn wait_request(&self) -> Request { - CONTEXT.get().await.request.receive().await - } - - /// Send a response to a cfu request - pub async fn send_response(&self, response: InternalResponseData) { - CONTEXT.get().await.response.send(response).await - } - - /// Get a device by its ID - pub async fn get_device(&self, id: ComponentId) -> Result<&'static CfuDevice, CfuError> { - get_device(id).await.ok_or(CfuError::InvalidComponent) - } - - /// Provides access to the device list - pub async fn devices(&self) -> &intrusive_list::IntrusiveList { - &CONTEXT.get().await.devices - } -} diff --git a/embedded-service/src/lib.rs b/embedded-service/src/lib.rs index 7facbbe0..1ba82192 100644 --- a/embedded-service/src/lib.rs +++ b/embedded-service/src/lib.rs @@ -14,7 +14,6 @@ pub mod thread_mode_cell; pub mod activity; pub mod broadcaster; pub mod buffer; -pub mod cfu; pub mod comms; pub mod ec_type; pub mod fmt; @@ -76,7 +75,6 @@ pub type Never = core::convert::Infallible; pub async fn init() { comms::init(); activity::init(); - cfu::init(); keyboard::init(); power::policy::init(); } diff --git a/examples/rt685s-evk/Cargo.lock b/examples/rt685s-evk/Cargo.lock index 6a4f7f14..d01c0dd6 100644 --- a/examples/rt685s-evk/Cargo.lock +++ b/examples/rt685s-evk/Cargo.lock @@ -216,6 +216,19 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9" +[[package]] +name = "cfu-service" +version = "0.1.0" +dependencies = [ + "defmt 0.3.100", + "embassy-futures", + "embassy-sync", + "embassy-time", + "embedded-cfu-protocol", + "embedded-services", + "heapless", +] + [[package]] name = "convert_case" version = "0.6.0" @@ -1356,6 +1369,7 @@ checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" name = "rt685s-evk-example" version = "0.1.0" dependencies = [ + "cfu-service", "cortex-m", "cortex-m-rt", "crc", @@ -1635,6 +1649,7 @@ version = "0.1.0" dependencies = [ "bitfield 0.17.0", "bitflags 2.9.4", + "cfu-service", "defmt 0.3.100", "embassy-futures", "embassy-sync", diff --git a/examples/rt685s-evk/Cargo.toml b/examples/rt685s-evk/Cargo.toml index ca667b92..2f3cd630 100644 --- a/examples/rt685s-evk/Cargo.toml +++ b/examples/rt685s-evk/Cargo.toml @@ -80,6 +80,8 @@ platform-service = { path = "../../platform-service", features = [ ] } embedded-mcu-hal = { git = "https://github.com/OpenDevicePartnership/embedded-mcu" } +cfu-service = { path = "../../cfu-service", features = ["defmt"] } + # Needed otherwise cargo will pull from git [patch."https://github.com/OpenDevicePartnership/embedded-services"] embedded-services = { path = "../../embedded-service" } diff --git a/examples/rt685s-evk/src/bin/type_c.rs b/examples/rt685s-evk/src/bin/type_c.rs index 8264a995..a287cab7 100644 --- a/examples/rt685s-evk/src/bin/type_c.rs +++ b/examples/rt685s-evk/src/bin/type_c.rs @@ -2,6 +2,7 @@ #![no_main] use ::tps6699x::{ADDR1, TPS66994_NUM_PORTS}; +use cfu_service::CfuClient; use embassy_embedded_hal::shared_bus::asynch::i2c::I2cDevice; use embassy_executor::Spawner; use embassy_imxrt::gpio::{Input, Inverter, Pull}; @@ -9,6 +10,7 @@ use embassy_imxrt::i2c::Async; use embassy_imxrt::i2c::master::{Config, I2cMaster}; use embassy_imxrt::{bind_interrupts, peripherals}; use embassy_sync::mutex::Mutex; +use embassy_sync::once_lock::OnceLock; use embassy_sync::pubsub::PubSubChannel; use embassy_time::{self as _, Delay}; use embedded_cfu_protocol::protocol_definitions::{FwUpdateOffer, FwUpdateOfferResponse, FwVersion, HostToken}; @@ -88,6 +90,10 @@ async fn service_task( ) { info!("Starting type-c task"); + // Spin up CFU service + static CFU_CLIENT: OnceLock = OnceLock::new(); + let cfu_client = CfuClient::new(&CFU_CLIENT).await; + // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot static POWER_POLICY_CHANNEL: StaticCell> = StaticCell::new(); @@ -107,7 +113,7 @@ async fn service_task( static SERVICE: StaticCell = StaticCell::new(); let service = SERVICE.init(service); - type_c_service::task::task(service, wrappers, power_policy_context).await; + type_c_service::task::task(service, wrappers, power_policy_context, cfu_client).await; } #[embassy_executor::main] diff --git a/examples/rt685s-evk/src/bin/type_c_cfu.rs b/examples/rt685s-evk/src/bin/type_c_cfu.rs index 66612e85..f91cd32e 100644 --- a/examples/rt685s-evk/src/bin/type_c_cfu.rs +++ b/examples/rt685s-evk/src/bin/type_c_cfu.rs @@ -2,6 +2,8 @@ #![no_main] use ::tps6699x::{ADDR1, TPS66994_NUM_PORTS}; +use cfu_service::CfuClient; +use cfu_service::component::{InternalResponseData, RequestData}; use embassy_embedded_hal::shared_bus::asynch::i2c::I2cDevice; use embassy_executor::Spawner; use embassy_imxrt::gpio::{Input, Inverter, Pull}; @@ -9,17 +11,16 @@ use embassy_imxrt::i2c::Async; use embassy_imxrt::i2c::master::{Config, I2cMaster}; use embassy_imxrt::{bind_interrupts, peripherals}; use embassy_sync::mutex::Mutex; +use embassy_sync::once_lock::OnceLock; use embassy_sync::pubsub::PubSubChannel; use embassy_time::Timer; use embassy_time::{self as _, Delay}; use embedded_cfu_protocol::protocol_definitions::*; use embedded_cfu_protocol::protocol_definitions::{FwUpdateOffer, FwUpdateOfferResponse, FwVersion}; -use embedded_services::cfu::component::InternalResponseData; -use embedded_services::cfu::component::RequestData; use embedded_services::power::policy::{CommsMessage, DeviceId as PowerId}; use embedded_services::type_c::ControllerId; use embedded_services::type_c::controller::Context; -use embedded_services::{GlobalRawMutex, IntrusiveList, cfu}; +use embedded_services::{GlobalRawMutex, IntrusiveList}; use embedded_services::{error, info}; use embedded_usb_pd::GlobalPortId; use static_cell::StaticCell; @@ -78,8 +79,8 @@ async fn interrupt_task(mut int_in: Input<'static>, mut interrupt: Interrupt<'st #[embassy_executor::task] async fn fw_update_task() { Timer::after_millis(1000).await; - let context = cfu::ContextToken::create().unwrap(); - let device = context.get_device(CONTROLLER0_CFU_ID).await.unwrap(); + let context = cfu_service::ClientContext::new(); + let device = context.get_device(CONTROLLER0_CFU_ID).unwrap(); info!("Getting FW version"); let response = device @@ -175,6 +176,10 @@ async fn service_task( ) -> ! { info!("Starting type-c task"); + // Spin up CFU service + static CFU_CLIENT: OnceLock = OnceLock::new(); + let cfu_client = CfuClient::new(&CFU_CLIENT).await; + // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot static POWER_POLICY_CHANNEL: StaticCell> = StaticCell::new(); @@ -194,7 +199,7 @@ async fn service_task( static SERVICE: StaticCell = StaticCell::new(); let service = SERVICE.init(service); - type_c_service::task::task(service, wrappers, power_policy_context).await; + type_c_service::task::task(service, wrappers, power_policy_context, cfu_client).await; unreachable!() } diff --git a/examples/std/Cargo.lock b/examples/std/Cargo.lock index 7bd330cb..9e83d88a 100644 --- a/examples/std/Cargo.lock +++ b/examples/std/Cargo.lock @@ -1777,6 +1777,7 @@ version = "0.1.0" dependencies = [ "bitfield 0.17.0", "bitflags 2.9.4", + "cfu-service", "embassy-futures", "embassy-sync", "embassy-time", diff --git a/examples/std/src/bin/cfu_buffer.rs b/examples/std/src/bin/cfu_buffer.rs index c2c546b5..4186ec84 100644 --- a/examples/std/src/bin/cfu_buffer.rs +++ b/examples/std/src/bin/cfu_buffer.rs @@ -4,16 +4,13 @@ use embassy_time::{Duration, Timer}; use log::*; use static_cell::StaticCell; +use cfu_service::component::{InternalResponseData, RequestData}; use embedded_cfu_protocol::protocol_definitions::*; -use embedded_services::{ - GlobalRawMutex, - cfu::{self, component::InternalResponseData, route_request}, -}; +use embedded_services::GlobalRawMutex; +use cfu_service::CfuClient; use cfu_service::buffer; -use crate::cfu::component::RequestData; - /// Component ID for the CFU buffer const CFU_BUFFER_ID: ComponentId = 0x06; @@ -23,7 +20,7 @@ const CFU_COMPONENT0_ID: ComponentId = 0x20; mod mock { use std::sync::atomic::AtomicBool; - use embedded_services::cfu::component::{CfuDevice, CfuDeviceContainer, InternalResponseData}; + use cfu_service::component::{CfuDevice, CfuDeviceContainer, InternalResponseData}; use super::*; @@ -147,10 +144,10 @@ async fn device_task(device: &'static mock::Device) { } #[embassy_executor::task] -async fn buffer_task(buffer: &'static buffer::Buffer<'static>) { +async fn buffer_task(buffer: &'static buffer::Buffer<'static>, cfu_client: &'static CfuClient) { loop { - let request = buffer.wait_event().await; - if let Some(response) = buffer.process(request).await { + let request = buffer.wait_event(cfu_client).await; + if let Some(response) = buffer.process(request, cfu_client).await { buffer.send_response(response).await; } } @@ -160,6 +157,11 @@ async fn buffer_task(buffer: &'static buffer::Buffer<'static>) { async fn run(spawner: Spawner) { embedded_services::init().await; + static CFU_CLIENT: OnceLock = OnceLock::new(); + let cfu_client = CfuClient::new(&CFU_CLIENT).await; + + spawner.must_spawn(cfu_service_task(cfu_client)); + info!("Creating device 0"); static DEVICE0: OnceLock = OnceLock::new(); let device0 = DEVICE0.get_or_init(|| { @@ -172,7 +174,7 @@ async fn run(spawner: Spawner) { }, ) }); - cfu::register_device(device0).await.unwrap(); + cfu_client.register_device(device0).unwrap(); spawner.must_spawn(device_task(device0)); info!("Creating buffer"); @@ -189,11 +191,12 @@ async fn run(spawner: Spawner) { buffer::Config::with_timeout(Duration::from_millis(75)), ) }); - buffer.register().await.unwrap(); - spawner.must_spawn(buffer_task(buffer)); + buffer.register(cfu_client).unwrap(); + spawner.must_spawn(buffer_task(buffer, cfu_client)); info!("Getting FW version"); - let response = route_request(CFU_BUFFER_ID, RequestData::FwVersionRequest) + let response = cfu_client + .route_request(CFU_BUFFER_ID, RequestData::FwVersionRequest) .await .unwrap(); let prev_version = match response { @@ -206,18 +209,19 @@ async fn run(spawner: Spawner) { info!("Got version: {prev_version:#x}"); info!("Giving offer"); - let offer = route_request( - CFU_BUFFER_ID, - RequestData::GiveOffer(FwUpdateOffer::new( - HostToken::Driver, + let offer = cfu_client + .route_request( CFU_BUFFER_ID, - FwVersion::new(0x211), - 0, - 0, - )), - ) - .await - .unwrap(); + RequestData::GiveOffer(FwUpdateOffer::new( + HostToken::Driver, + CFU_BUFFER_ID, + FwVersion::new(0x211), + 0, + 0, + )), + ) + .await + .unwrap(); info!("Got response: {offer:?}"); for i in 0..10 { @@ -235,7 +239,8 @@ async fn run(spawner: Spawner) { info!("Giving content"); let now = embassy_time::Instant::now(); - let response = route_request(CFU_BUFFER_ID, RequestData::GiveContent(request)) + let response = cfu_client + .route_request(CFU_BUFFER_ID, RequestData::GiveContent(request)) .await .unwrap(); info!("Got response in {:?} ms: {:?}", now.elapsed().as_millis(), response); @@ -246,8 +251,8 @@ async fn run(spawner: Spawner) { } #[embassy_executor::task] -async fn cfu_service_task() -> ! { - cfu_service::task::task().await; +async fn cfu_service_task(cfu_client: &'static CfuClient) -> ! { + cfu_service::task::task(cfu_client).await; unreachable!() } @@ -256,7 +261,6 @@ fn main() { static EXECUTOR: StaticCell = StaticCell::new(); let executor = EXECUTOR.init(Executor::new()); executor.run(|spawner| { - spawner.must_spawn(cfu_service_task()); spawner.must_spawn(run(spawner)); }); } diff --git a/examples/std/src/bin/cfu_client.rs b/examples/std/src/bin/cfu_client.rs index 1a7a4a9b..1c51f2b4 100644 --- a/examples/std/src/bin/cfu_client.rs +++ b/examples/std/src/bin/cfu_client.rs @@ -7,24 +7,25 @@ use static_cell::StaticCell; use embedded_cfu_protocol::protocol_definitions::{ ComponentId, FwUpdateOffer, FwVersion, HostToken, MAX_SUBCMPT_COUNT, }; -use embedded_services::cfu; -use embedded_services::cfu::component::CfuComponentDefault; -use crate::cfu::component::RequestData; +use cfu_service::{ + CfuClient, + component::{CfuComponentDefault, RequestData}, +}; #[embassy_executor::task] -async fn device_task0(component: &'static CfuComponentDefault) { +async fn device_task0(component: &'static CfuComponentDefault, cfu_client: &'static CfuClient) { loop { - if let Err(e) = component.process_request().await { + if let Err(e) = component.process_request(cfu_client).await { error!("Error processing request: {e:?}"); } } } #[embassy_executor::task] -async fn device_task1(component: &'static CfuComponentDefault) { +async fn device_task1(component: &'static CfuComponentDefault, cfu_client: &'static CfuClient) { loop { - if let Err(e) = component.process_request().await { + if let Err(e) = component.process_request(cfu_client).await { error!("Error processing request: {e:?}"); } } @@ -34,20 +35,25 @@ async fn device_task1(component: &'static CfuComponentDefault) { async fn run(spawner: Spawner) { embedded_services::init().await; + static CFU_CLIENT: OnceLock = OnceLock::new(); + let cfu_client = CfuClient::new(&CFU_CLIENT).await; + + spawner.must_spawn(cfu_service_task(cfu_client)); + info!("Creating device 0"); static DEVICE0: OnceLock> = OnceLock::new(); let mut subs: [Option; MAX_SUBCMPT_COUNT] = [None; MAX_SUBCMPT_COUNT]; subs[0] = Some(2); let device0 = DEVICE0.get_or_init(|| CfuComponentDefault::new(1, true, subs, CfuWriterNop {})); - cfu::register_device(device0).await.unwrap(); - spawner.must_spawn(device_task0(device0)); + cfu_client.register_device(device0).unwrap(); + spawner.must_spawn(device_task0(device0, cfu_client)); info!("Creating device 1"); static DEVICE1: OnceLock> = OnceLock::new(); let device1 = DEVICE1.get_or_init(|| CfuComponentDefault::new(2, false, [None; MAX_SUBCMPT_COUNT], CfuWriterNop {})); - cfu::register_device(device1).await.unwrap(); - spawner.must_spawn(device_task1(device1)); + cfu_client.register_device(device1).unwrap(); + spawner.must_spawn(device_task1(device1, cfu_client)); let dummy_offer0 = FwUpdateOffer::new( HostToken::Driver, @@ -72,7 +78,7 @@ async fn run(spawner: Spawner) { 0, ); - match cfu::route_request(1, RequestData::GiveOffer(dummy_offer0)).await { + match cfu_client.route_request(1, RequestData::GiveOffer(dummy_offer0)).await { Ok(resp) => { info!("got okay response to device0 update {resp:?}"); } @@ -80,7 +86,7 @@ async fn run(spawner: Spawner) { error!("offer failed with error {e:?}"); } } - match cfu::route_request(2, RequestData::GiveOffer(dummy_offer1)).await { + match cfu_client.route_request(2, RequestData::GiveOffer(dummy_offer1)).await { Ok(resp) => { info!("got okay response to device1 update {resp:?}"); } @@ -91,8 +97,8 @@ async fn run(spawner: Spawner) { } #[embassy_executor::task] -async fn cfu_service_task() -> ! { - cfu_service::task::task().await; +async fn cfu_service_task(cfu_client: &'static CfuClient) -> ! { + cfu_service::task::task(cfu_client).await; unreachable!() } @@ -102,7 +108,6 @@ fn main() { static EXECUTOR: StaticCell = StaticCell::new(); let executor = EXECUTOR.init(Executor::new()); executor.run(|spawner| { - spawner.must_spawn(cfu_service_task()); spawner.must_spawn(run(spawner)); }); } diff --git a/examples/std/src/bin/cfu_splitter.rs b/examples/std/src/bin/cfu_splitter.rs index b7122dac..506e4942 100644 --- a/examples/std/src/bin/cfu_splitter.rs +++ b/examples/std/src/bin/cfu_splitter.rs @@ -3,12 +3,10 @@ use embassy_sync::once_lock::OnceLock; use log::*; use static_cell::StaticCell; +use cfu_service::component::{InternalResponseData, RequestData}; use embedded_cfu_protocol::protocol_definitions::*; -use embedded_services::cfu::{self, component::InternalResponseData, route_request}; -use cfu_service::splitter; - -use crate::cfu::component::RequestData; +use cfu_service::{CfuClient, splitter}; /// Component ID for the CFU Splitter const CFU_SPLITTER_ID: ComponentId = 0x06; @@ -19,7 +17,7 @@ const CFU_COMPONENT0_ID: ComponentId = 0x20; const CFU_COMPONENT1_ID: ComponentId = 0x21; mod mock { - use embedded_services::cfu::component::{CfuDevice, CfuDeviceContainer, InternalResponseData}; + use cfu_service::component::{CfuDevice, CfuDeviceContainer, InternalResponseData}; use super::*; @@ -166,10 +164,13 @@ async fn device_task(device: &'static mock::Device) { } #[embassy_executor::task] -async fn splitter_task(splitter: &'static splitter::Splitter<'static, mock::Customization>) { +async fn splitter_task( + splitter: &'static splitter::Splitter<'static, mock::Customization>, + cfu_client: &'static CfuClient, +) { loop { let request = splitter.wait_request().await; - let response = splitter.process_request(request).await; + let response = splitter.process_request(request, cfu_client).await; splitter.send_response(response).await; } } @@ -178,6 +179,11 @@ async fn splitter_task(splitter: &'static splitter::Splitter<'static, mock::Cust async fn run(spawner: Spawner) { embedded_services::init().await; + static CFU_CLIENT: OnceLock = OnceLock::new(); + let cfu_client = CfuClient::new(&CFU_CLIENT).await; + + spawner.must_spawn(cfu_service_task(cfu_client)); + info!("Creating device 0"); static DEVICE0: OnceLock = OnceLock::new(); let device0 = DEVICE0.get_or_init(|| { @@ -190,7 +196,7 @@ async fn run(spawner: Spawner) { }, ) }); - cfu::register_device(device0).await.unwrap(); + cfu_client.register_device(device0).unwrap(); spawner.must_spawn(device_task(device0)); info!("Creating device 1"); @@ -205,7 +211,7 @@ async fn run(spawner: Spawner) { }, ) }); - cfu::register_device(device1).await.unwrap(); + cfu_client.register_device(device1).unwrap(); spawner.must_spawn(device_task(device1)); info!("Creating splitter"); @@ -213,11 +219,12 @@ async fn run(spawner: Spawner) { static DEVICES: [ComponentId; 2] = [CFU_COMPONENT0_ID, CFU_COMPONENT1_ID]; let customization = mock::Customization {}; let splitter = SPLITTER.get_or_init(|| splitter::Splitter::new(CFU_SPLITTER_ID, &DEVICES, customization).unwrap()); - splitter.register().await.unwrap(); - spawner.must_spawn(splitter_task(splitter)); + splitter.register(cfu_client).unwrap(); + spawner.must_spawn(splitter_task(splitter, cfu_client)); info!("Getting FW version"); - let response = route_request(CFU_SPLITTER_ID, RequestData::FwVersionRequest) + let response = cfu_client + .route_request(CFU_SPLITTER_ID, RequestData::FwVersionRequest) .await .unwrap(); let prev_version = match response { @@ -230,18 +237,19 @@ async fn run(spawner: Spawner) { info!("Got version: {prev_version:#x}"); info!("Giving offer"); - let offer = route_request( - CFU_SPLITTER_ID, - RequestData::GiveOffer(FwUpdateOffer::new( - HostToken::Driver, + let offer = cfu_client + .route_request( CFU_SPLITTER_ID, - FwVersion::new(0x211), - 0, - 0, - )), - ) - .await - .unwrap(); + RequestData::GiveOffer(FwUpdateOffer::new( + HostToken::Driver, + CFU_SPLITTER_ID, + FwVersion::new(0x211), + 0, + 0, + )), + ) + .await + .unwrap(); info!("Got response: {offer:?}"); let header = FwUpdateContentHeader { @@ -256,15 +264,16 @@ async fn run(spawner: Spawner) { data: [0u8; DEFAULT_DATA_LENGTH], }; - let response = route_request(CFU_SPLITTER_ID, RequestData::GiveContent(request)) + let response = cfu_client + .route_request(CFU_SPLITTER_ID, RequestData::GiveContent(request)) .await .unwrap(); info!("Got response: {response:?}"); } #[embassy_executor::task] -async fn cfu_service_task() -> ! { - cfu_service::task::task().await; +async fn cfu_service_task(cfu_client: &'static CfuClient) -> ! { + cfu_service::task::task(cfu_client).await; unreachable!() } @@ -274,7 +283,6 @@ fn main() { static EXECUTOR: StaticCell = StaticCell::new(); let executor = EXECUTOR.init(Executor::new()); executor.run(|spawner| { - spawner.must_spawn(cfu_service_task()); spawner.must_spawn(run(spawner)); }); } diff --git a/examples/std/src/bin/type_c/external.rs b/examples/std/src/bin/type_c/external.rs index 1b4cbcc8..943d7b3f 100644 --- a/examples/std/src/bin/type_c/external.rs +++ b/examples/std/src/bin/type_c/external.rs @@ -1,6 +1,8 @@ //! Low-level example of external messaging with a simple type-C service +use cfu_service::CfuClient; use embassy_executor::{Executor, Spawner}; use embassy_sync::mutex::Mutex; +use embassy_sync::once_lock::OnceLock; use embassy_sync::pubsub::PubSubChannel; use embassy_time::Timer; use embedded_services::power::policy::*; @@ -104,6 +106,10 @@ async fn service_task( ) { info!("Starting type-c task"); + // Spin up CFU service + static CFU_CLIENT: OnceLock = OnceLock::new(); + let cfu_client = CfuClient::new(&CFU_CLIENT).await; + // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot static POWER_POLICY_CHANNEL: StaticCell> = StaticCell::new(); @@ -124,7 +130,7 @@ async fn service_task( static SERVICE: StaticCell = StaticCell::new(); let service = SERVICE.init(service); - type_c_service::task::task(service, wrappers, power_context).await; + type_c_service::task::task(service, wrappers, power_context, cfu_client).await; } fn create_wrapper( diff --git a/examples/std/src/bin/type_c/service.rs b/examples/std/src/bin/type_c/service.rs index d47b6e2d..1307c283 100644 --- a/examples/std/src/bin/type_c/service.rs +++ b/examples/std/src/bin/type_c/service.rs @@ -1,3 +1,4 @@ +use cfu_service::CfuClient; use embassy_executor::{Executor, Spawner}; use embassy_sync::mutex::Mutex; use embassy_sync::once_lock::OnceLock; @@ -151,6 +152,10 @@ async fn service_task( ) { info!("Starting type-c task"); + // Spin up CFU service + static CFU_CLIENT: OnceLock = OnceLock::new(); + let cfu_client = CfuClient::new(&CFU_CLIENT).await; + // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot static POWER_POLICY_CHANNEL: StaticCell> = StaticCell::new(); @@ -171,7 +176,7 @@ async fn service_task( static SERVICE: StaticCell = StaticCell::new(); let service = SERVICE.init(service); - type_c_service::task::task(service, wrappers, power_policy_context).await; + type_c_service::task::task(service, wrappers, power_policy_context, cfu_client).await; } fn create_wrapper( diff --git a/examples/std/src/bin/type_c/ucsi.rs b/examples/std/src/bin/type_c/ucsi.rs index 1600ae2b..eb8f4ffc 100644 --- a/examples/std/src/bin/type_c/ucsi.rs +++ b/examples/std/src/bin/type_c/ucsi.rs @@ -1,6 +1,8 @@ use crate::mock_controller::Wrapper; +use cfu_service::CfuClient; use embassy_executor::{Executor, Spawner}; use embassy_sync::mutex::Mutex; +use embassy_sync::once_lock::OnceLock; use embassy_sync::pubsub::PubSubChannel; use embedded_services::GlobalRawMutex; use embedded_services::IntrusiveList; @@ -193,6 +195,10 @@ async fn service_task( ) -> ! { info!("Starting type-c task"); + // Spin up CFU service + static CFU_CLIENT: OnceLock = OnceLock::new(); + let cfu_client = CfuClient::new(&CFU_CLIENT).await; + // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot static POWER_POLICY_CHANNEL: StaticCell< PubSubChannel, @@ -214,7 +220,7 @@ async fn service_task( static SERVICE: StaticCell = StaticCell::new(); let service = SERVICE.init(service); - type_c_service::task::task(service, wrappers, power_policy_context).await; + type_c_service::task::task(service, wrappers, power_policy_context, cfu_client).await; unreachable!() } diff --git a/examples/std/src/bin/type_c/unconstrained.rs b/examples/std/src/bin/type_c/unconstrained.rs index 8b896bbb..bb2039a7 100644 --- a/examples/std/src/bin/type_c/unconstrained.rs +++ b/examples/std/src/bin/type_c/unconstrained.rs @@ -1,6 +1,8 @@ use crate::mock_controller::Wrapper; +use cfu_service::CfuClient; use embassy_executor::Executor; use embassy_sync::mutex::Mutex; +use embassy_sync::once_lock::OnceLock; use embassy_sync::pubsub::PubSubChannel; use embassy_time::Timer; use embedded_services::power::policy::PowerCapability; @@ -119,6 +121,10 @@ async fn service_task( ) -> ! { info!("Starting type-c task"); + // Spin up CFU service + static CFU_CLIENT: OnceLock = OnceLock::new(); + let cfu_client = CfuClient::new(&CFU_CLIENT).await; + // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot static POWER_POLICY_CHANNEL: StaticCell> = StaticCell::new(); @@ -139,7 +145,7 @@ async fn service_task( static SERVICE: StaticCell = StaticCell::new(); let service = SERVICE.init(service); - type_c_service::task::task(service, wrappers, power_policy_context).await; + type_c_service::task::task(service, wrappers, power_policy_context, cfu_client).await; unreachable!() } diff --git a/type-c-service/Cargo.toml b/type-c-service/Cargo.toml index d52f8291..75f6e1eb 100644 --- a/type-c-service/Cargo.toml +++ b/type-c-service/Cargo.toml @@ -26,6 +26,7 @@ embedded-usb-pd.workspace = true heapless.workspace = true log = { workspace = true, optional = true } tps6699x = { workspace = true, features = ["embassy"] } +cfu-service.workspace = true [dev-dependencies] embassy-time = { workspace = true, features = ["std", "generic-queue-8"] } @@ -45,6 +46,7 @@ defmt = [ "embassy-sync/defmt", "tps6699x/defmt", "embedded-usb-pd/defmt", + "cfu-service/defmt", ] log = [ "dep:log", @@ -52,4 +54,5 @@ log = [ "embassy-time/log", "embassy-sync/log", "tps6699x/log", + "cfu-service/log", ] diff --git a/type-c-service/src/task.rs b/type-c-service/src/task.rs index b328003f..e0afb818 100644 --- a/type-c-service/src/task.rs +++ b/type-c-service/src/task.rs @@ -17,6 +17,7 @@ pub async fn task_closure< service: &'static Service<'a>, wrappers: [&'a ControllerWrapper<'a, M, C, V, POLICY_CHANNEL_SIZE>; N], power_policy_context: &'a embedded_services::power::policy::policy::Context, + cfu_client: &'a cfu_service::CfuClient, f: F, ) where M: embassy_sync::blocking_mutex::raw::RawMutex, @@ -33,8 +34,7 @@ pub async fn task_closure< for controller_wrapper in wrappers { if controller_wrapper - .register(service.controllers(), power_policy_context) - .await + .register(service.controllers(), power_policy_context, cfu_client) .is_err() { error!("Failed to register a controller"); @@ -52,16 +52,23 @@ pub async fn task<'a, M, C, V, const N: usize, const POLICY_CHANNEL_SIZE: usize> service: &'static Service<'a>, wrappers: [&'a ControllerWrapper<'a, M, C, V, POLICY_CHANNEL_SIZE>; N], power_policy_context: &'a embedded_services::power::policy::policy::Context, + cfu_client: &'a cfu_service::CfuClient, ) where M: embassy_sync::blocking_mutex::raw::RawMutex, C: embedded_services::sync::Lockable, V: crate::wrapper::FwOfferValidator, ::Inner: embedded_services::type_c::controller::Controller, { - task_closure(service, wrappers, power_policy_context, |service: &Service| async { - if let Err(e) = service.process_next_event().await { - error!("Type-C service processing error: {:#?}", e); - } - }) + task_closure( + service, + wrappers, + power_policy_context, + cfu_client, + |service: &Service| async { + if let Err(e) = service.process_next_event().await { + error!("Type-C service processing error: {:#?}", e); + } + }, + ) .await; } diff --git a/type-c-service/src/wrapper/backing.rs b/type-c-service/src/wrapper/backing.rs index 6c5d2db6..ec076bf8 100644 --- a/type-c-service/src/wrapper/backing.rs +++ b/type-c-service/src/wrapper/backing.rs @@ -46,6 +46,7 @@ //! ``` use core::cell::{RefCell, RefMut}; +use cfu_service::component::CfuDevice; use embassy_sync::{ blocking_mutex::raw::RawMutex, pubsub::{DynImmediatePublisher, DynSubscriber, PubSubChannel}, @@ -168,7 +169,7 @@ pub trait DynPortState<'a> { pub struct Registration<'a, const POLICY_CHANNEL_SIZE: usize> { pub context: &'a embedded_services::type_c::controller::Context, pub pd_controller: &'a embedded_services::type_c::controller::Device<'a>, - pub cfu_device: &'a embedded_services::cfu::component::CfuDevice, + pub cfu_device: &'a CfuDevice, pub power_devices: &'a [embedded_services::power::policy::device::Device], } @@ -187,7 +188,7 @@ pub struct Storage<'a, const N: usize, M: RawMutex, const POLICY_CHANNEL_SIZE: u context: &'a embedded_services::type_c::controller::Context, controller_id: ControllerId, pd_ports: [GlobalPortId; N], - cfu_device: embedded_services::cfu::component::CfuDevice, + cfu_device: CfuDevice, power_devices: [embedded_services::power::policy::device::Device; N], // State-related @@ -206,7 +207,7 @@ impl<'a, const N: usize, M: RawMutex, const POLICY_CHANNEL_SIZE: usize> Storage< context, controller_id, pd_ports: ports.map(|(port, _)| port), - cfu_device: embedded_services::cfu::component::CfuDevice::new(cfu_id), + cfu_device: CfuDevice::new(cfu_id), power_devices: ports .map(|(_, device)| embedded_services::power::policy::device::Device::new(device, power_policy_context)), pd_alerts: [const { PubSubChannel::new() }; N], diff --git a/type-c-service/src/wrapper/cfu.rs b/type-c-service/src/wrapper/cfu.rs index 209ffbe2..69464894 100644 --- a/type-c-service/src/wrapper/cfu.rs +++ b/type-c-service/src/wrapper/cfu.rs @@ -1,8 +1,8 @@ //! CFU message bridge //! TODO: remove this once we have a more generic FW update implementation +use cfu_service::component::{InternalResponseData, RequestData}; use embassy_futures::select::{Either, select}; use embedded_cfu_protocol::protocol_definitions::*; -use embedded_services::cfu::component::{InternalResponseData, RequestData}; use embedded_services::power; use embedded_services::type_c::controller::Controller; use embedded_services::{debug, error}; diff --git a/type-c-service/src/wrapper/message.rs b/type-c-service/src/wrapper/message.rs index f8416b3a..d6efd461 100644 --- a/type-c-service/src/wrapper/message.rs +++ b/type-c-service/src/wrapper/message.rs @@ -44,7 +44,7 @@ pub struct EventPowerPolicyCommand<'a> { #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum EventCfu { /// CFU request - Request(embedded_services::cfu::component::RequestData), + Request(cfu_service::component::RequestData), /// Recovery tick /// /// Occurs when the FW update has timed out to abort the update and return hardware to its normal state @@ -164,7 +164,7 @@ pub enum Output<'a> { /// CFU recovery tick CfuRecovery, /// CFU response - CfuResponse(embedded_services::cfu::component::InternalResponseData), + CfuResponse(cfu_service::component::InternalResponseData), /// Dp status update DpStatusUpdate(OutputDpStatusChanged), } diff --git a/type-c-service/src/wrapper/mod.rs b/type-c-service/src/wrapper/mod.rs index 9cb42a8d..aef64ffb 100644 --- a/type-c-service/src/wrapper/mod.rs +++ b/type-c-service/src/wrapper/mod.rs @@ -4,7 +4,7 @@ //! This struct current currently supports messages from the following services: //! * Type-C: [`embedded_services::type_c::controller::Command`] //! * Power policy: [`embedded_services::power::policy::device::Command`] -//! * CFU: [`embedded_services::cfu::Request`] +//! * CFU: [`cfu_service::Request`] //! # Event loop //! This struct follows a standard wait/process/finalize event loop. //! @@ -608,10 +608,11 @@ where } /// Register all devices with their respective services - pub async fn register( + pub fn register( &'static self, controllers: &intrusive_list::IntrusiveList, power_policy_context: &embedded_services::power::policy::policy::Context, + cfu_client: &cfu_service::CfuClient, ) -> Result<(), Error<::BusError>> { // TODO: Unify these devices? for device in self.registration.power_devices { @@ -634,15 +635,13 @@ where })?; //TODO: Remove when we have a more general framework in place - embedded_services::cfu::register_device(self.registration.cfu_device) - .await - .map_err(|_| { - error!( - "Controller{}: Failed to register CFU device", - self.registration.pd_controller.id().0 - ); - Error::Pd(PdError::Failed) - })?; + cfu_client.register_device(self.registration.cfu_device).map_err(|_| { + error!( + "Controller{}: Failed to register CFU device", + self.registration.pd_controller.id().0 + ); + Error::Pd(PdError::Failed) + })?; Ok(()) } }