diff --git a/core/src/subgraph/provider.rs b/core/src/subgraph/provider.rs index a7122442531..2ea4327838b 100644 --- a/core/src/subgraph/provider.rs +++ b/core/src/subgraph/provider.rs @@ -1,5 +1,5 @@ -use std::collections::HashSet; use std::sync::Mutex; +use std::{collections::HashSet, time::Instant}; use async_trait::async_trait; @@ -67,39 +67,35 @@ impl SubgraphAssignmentProvider { #[async_trait] impl SubgraphAssignmentProviderTrait for SubgraphAssignmentProvider { - async fn start( - &self, - loc: DeploymentLocator, - stop_block: Option, - ) -> Result<(), SubgraphAssignmentProviderError> { + async fn start(&self, loc: DeploymentLocator, stop_block: Option) { let logger = self.logger_factory.subgraph_logger(&loc); // If subgraph ID already in set if !self.deployment_registry.insert(loc.id) { info!(logger, "Subgraph deployment is already running"); - return Err(SubgraphAssignmentProviderError::AlreadyRunning( - loc.hash.clone(), - )); + return; } + let start_time = Instant::now(); + self.instance_manager .cheap_clone() .start_subgraph(loc, stop_block) .await; - Ok(()) + debug!( + logger, + "Subgraph started"; + "start_ms" => start_time.elapsed().as_millis() + ); } - async fn stop( - &self, - deployment: DeploymentLocator, - ) -> Result<(), SubgraphAssignmentProviderError> { + async fn stop(&self, deployment: DeploymentLocator) { // If subgraph ID was in set if self.deployment_registry.remove(&deployment.id) { // Shut down subgraph processing self.instance_manager.stop_subgraph(deployment).await; } - Ok(()) } } diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 87a7ebe4663..b05ccdf4e33 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -1,5 +1,4 @@ use std::collections::HashSet; -use std::time::Instant; use async_trait::async_trait; use graph::blockchain::Blockchain; @@ -11,16 +10,10 @@ use graph::components::subgraph::Settings; use graph::data::subgraph::schema::DeploymentCreate; use graph::data::subgraph::Graft; use graph::data::value::Word; -use graph::futures01; -use graph::futures01::future; -use graph::futures01::stream; -use graph::futures01::Future; -use graph::futures01::Stream; -use graph::futures03::compat::Future01CompatExt; -use graph::futures03::compat::Stream01CompatExt; -use graph::futures03::future::FutureExt; +use graph::futures03; use graph::futures03::future::TryFutureExt; -use graph::futures03::stream::TryStreamExt; +use graph::futures03::Stream; +use graph::futures03::StreamExt; use graph::prelude::{ CreateSubgraphResult, SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait, SubgraphRegistrar as SubgraphRegistrarTrait, *, @@ -80,14 +73,7 @@ where } } - pub fn start(&self) -> impl Future { - let logger_clone1 = self.logger.clone(); - let logger_clone2 = self.logger.clone(); - let provider = self.provider.clone(); - let node_id = self.node_id.clone(); - let assignment_event_stream_cancel_handle = - self.assignment_event_stream_cancel_guard.handle(); - + pub async fn start(self: Arc) -> Result<(), Error> { // The order of the following three steps is important: // - Start assignment event stream // - Read assignments table and start assigned subgraphs @@ -102,154 +88,137 @@ where // // The discrepancy between the start time of the event stream and the table read can result // in some extraneous events on start up. Examples: - // - The event stream sees an Add event for subgraph A, but the table query finds that + // - The event stream sees an 'set' event for subgraph A, but the table query finds that // subgraph A is already in the table. - // - The event stream sees a Remove event for subgraph B, but the table query finds that + // - The event stream sees a 'removed' event for subgraph B, but the table query finds that // subgraph B has already been removed. - // The `handle_assignment_events` function handles these cases by ignoring AlreadyRunning - // (on subgraph start) which makes the operations idempotent. Subgraph stop is already idempotent. + // The `change_assignment` function handles these cases by ignoring + // such cases which makes the operations idempotent // Start event stream - let assignment_event_stream = self.assignment_events(); + let assignment_event_stream = self.cheap_clone().assignment_events().await; // Deploy named subgraphs found in store - self.start_assigned_subgraphs().and_then(move |()| { - // Spawn a task to handle assignment events. - // Blocking due to store interactions. Won't be blocking after #905. - graph::spawn_blocking( - assignment_event_stream - .compat() - .map_err(SubgraphAssignmentProviderError::Unknown) - .cancelable(&assignment_event_stream_cancel_handle) - .compat() - .for_each(move |assignment_event| { - assert_eq!(assignment_event.node_id(), &node_id); - handle_assignment_event( - assignment_event, - provider.clone(), - logger_clone1.clone(), - ) - .boxed() - .compat() - }) - .map_err(move |e| match e { - CancelableError::Cancel => panic!("assignment event stream canceled"), - CancelableError::Error(e) => { - error!(logger_clone2, "Assignment event stream failed: {}", e); - panic!("assignment event stream failed: {}", e); - } - }) - .compat(), - ); + self.start_assigned_subgraphs().await?; + + let cancel_handle = self.assignment_event_stream_cancel_guard.handle(); + + // Spawn a task to handle assignment events. + let fut = assignment_event_stream.for_each({ + move |event| { + // The assignment stream should run forever. If it gets + // cancelled, that probably indicates a serious problem and + // we panic + if cancel_handle.is_canceled() { + panic!("assignment event stream canceled"); + } - Ok(()) - }) + let this = self.cheap_clone(); + async move { + this.change_assignment(event).await; + } + } + }); + + graph::spawn(fut); + Ok(()) } - pub fn assignment_events(&self) -> impl Stream + Send { - let store = self.store.clone(); - let node_id = self.node_id.clone(); - let logger = self.logger.clone(); + /// Start/stop subgraphs as needed, considering the current assignment + /// state in the database, ignoring changes that do not affect this + /// node, do not require anything to change, or for which we can not + /// find the assignment status from the database + async fn change_assignment(&self, change: AssignmentChange) { + let (deployment, operation) = change.into_parts(); - self.subscription_manager - .subscribe() - .map_err(|()| anyhow!("Entity change stream failed")) - .map(|event| { - let changes: Vec<_> = event.changes.iter().cloned().map(AssignmentChange::into_parts).collect(); - stream::iter_ok(changes) - }) - .flatten() - .and_then( - move |(deployment, operation)| -> Result + Send>, _> { - trace!(logger, "Received assignment change"; - "deployment" => %deployment, - "operation" => format!("{:?}", operation), - ); - - match operation { - AssignmentOperation::Set => { - store - .assignment_status(&deployment) - .map_err(|e| { - anyhow!("Failed to get subgraph assignment entity: {}", e) - }) - .map(|assigned| -> Box + Send> { - let logger = logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => node_id.to_string())); - if let Some((assigned,is_paused)) = assigned { - if assigned == node_id { - - if is_paused{ - // Subgraph is paused, so we don't start it - debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "paused" => is_paused, "action" => "ignore"); - return Box::new(stream::empty()); - } - - // Start subgraph on this node - debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "action" => "add"); - Box::new(stream::once(Ok(AssignmentEvent::Add { - deployment, - node_id: node_id.clone(), - }))) - } else { - // Ensure it is removed from this node - debug!(logger, "Deployment assignee is not this node"; "assigned_to" => assigned, "action" => "remove"); - Box::new(stream::once(Ok(AssignmentEvent::Remove { - deployment, - node_id: node_id.clone(), - }))) - } - } else { - // Was added/updated, but is now gone. - debug!(logger, "Deployment assignee not found in database"; "action" => "ignore"); - Box::new(stream::empty()) - } - }) - } - AssignmentOperation::Removed => { - // Send remove event without checking node ID. - // If node ID does not match, then this is a no-op when handled in - // assignment provider. - Ok(Box::new(stream::once(Ok(AssignmentEvent::Remove { - deployment, - node_id: node_id.clone(), - })))) + trace!(self.logger, "Received assignment change"; + "deployment" => %deployment, + "operation" => format!("{:?}", operation), + ); + + match operation { + AssignmentOperation::Set => { + let assigned = match self.store.assignment_status(&deployment).await { + Ok(assigned) => assigned, + Err(e) => { + error!( + self.logger, + "Failed to get subgraph assignment entity"; "deployment" => deployment, "error" => e.to_string() + ); + return; + } + }; + + let logger = self.logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => self.node_id.to_string())); + if let Some((assigned, is_paused)) = assigned { + if &assigned == &self.node_id { + if is_paused { + // Subgraph is paused, so we don't start it + debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "paused" => is_paused, "action" => "ignore"); + return; } + + // Start subgraph on this node + debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "action" => "add"); + self.provider.start(deployment, None).await; + } else { + // Ensure it is removed from this node + debug!(logger, "Deployment assignee is not this node"; "assigned_to" => assigned, "action" => "remove"); + self.provider.stop(deployment).await } - }, - ) + } else { + // Was added/updated, but is now gone. + debug!(self.logger, "Deployment assignee not found in database"; "action" => "ignore"); + } + } + AssignmentOperation::Removed => { + // Send remove event without checking node ID. + // If node ID does not match, then this is a no-op when handled in + // assignment provider. + self.provider.stop(deployment).await; + } + } + } + + pub async fn assignment_events(self: Arc) -> impl Stream + Send { + self.subscription_manager + .subscribe() + .map(|event| futures03::stream::iter(event.changes.clone())) .flatten() } - fn start_assigned_subgraphs(&self) -> impl Future { - let provider = self.provider.clone(); + async fn start_assigned_subgraphs(&self) -> Result<(), Error> { let logger = self.logger.clone(); let node_id = self.node_id.clone(); - future::result(self.store.active_assignments(&self.node_id)) - .map_err(|e| anyhow!("Error querying subgraph assignments: {}", e)) - .and_then(move |deployments| { - // This operation should finish only after all subgraphs are - // started. We wait for the spawned tasks to complete by giving - // each a `sender` and waiting for all of them to be dropped, so - // the receiver terminates without receiving anything. - let deployments = HashSet::::from_iter(deployments); - let deployments_len = deployments.len(); - let (sender, receiver) = futures01::sync::mpsc::channel::<()>(1); - for id in deployments { - let sender = sender.clone(); - let logger = logger.clone(); - - graph::spawn( - start_subgraph(id, provider.clone(), logger).map(move |()| drop(sender)), - ); - } - drop(sender); - receiver.collect().then(move |_| { - info!(logger, "Started all assigned subgraphs"; - "count" => deployments_len, "node_id" => &node_id); - future::ok(()) - }) - }) + let deployments = self + .store + .active_assignments(&self.node_id) + .await + .map_err(|e| anyhow!("Error querying subgraph assignments: {}", e))?; + // This operation should finish only after all subgraphs are + // started. We wait for the spawned tasks to complete by giving + // each a `sender` and waiting for all of them to be dropped, so + // the receiver terminates without receiving anything. + let deployments = HashSet::::from_iter(deployments); + let deployments_len = deployments.len(); + debug!(logger, "Starting all assigned subgraphs"; + "count" => deployments_len, "node_id" => &node_id); + let (sender, receiver) = futures03::channel::mpsc::channel::<()>(1); + for id in deployments { + let sender = sender.clone(); + let provider = self.provider.cheap_clone(); + + graph::spawn(async move { + provider.start(id, None).await; + drop(sender) + }); + } + drop(sender); + let _: Vec<_> = receiver.collect().await; + info!(logger, "Started all assigned subgraphs"; + "count" => deployments_len, "node_id" => &node_id); + Ok(()) } } @@ -442,66 +411,6 @@ where } } -async fn handle_assignment_event( - event: AssignmentEvent, - provider: Arc, - logger: Logger, -) -> Result<(), CancelableError> { - let logger = logger.clone(); - - debug!(logger, "Received assignment event: {:?}", event); - - match event { - AssignmentEvent::Add { - deployment, - node_id: _, - } => { - start_subgraph(deployment, provider.clone(), logger).await; - Ok(()) - } - AssignmentEvent::Remove { - deployment, - node_id: _, - } => match provider.stop(deployment).await { - Ok(()) => Ok(()), - Err(e) => Err(CancelableError::Error(e)), - }, - } -} - -async fn start_subgraph( - deployment: DeploymentLocator, - provider: Arc, - logger: Logger, -) { - let logger = logger - .new(o!("subgraph_id" => deployment.hash.to_string(), "sgd" => deployment.id.to_string())); - - trace!(logger, "Start subgraph"); - - let start_time = Instant::now(); - let result = provider.start(deployment.clone(), None).await; - - debug!( - logger, - "Subgraph started"; - "start_ms" => start_time.elapsed().as_millis() - ); - - match result { - Ok(()) => (), - Err(SubgraphAssignmentProviderError::AlreadyRunning(_)) => (), - Err(e) => { - // Errors here are likely an issue with the subgraph. - error!( - logger, - "Subgraph instance failed to start"; - "error" => e.to_string() - ); - } - } -} - /// Resolves the subgraph's earliest block async fn resolve_start_block( manifest: &SubgraphManifest, diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 585df5945f1..f3872b16580 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -10,6 +10,7 @@ use diesel::sql_types::Integer; use diesel_derives::{AsExpression, FromSqlRow}; pub use entity_cache::{EntityCache, EntityLfuCache, GetScope, ModificationsAndCache}; use slog::Logger; +use tokio_stream::wrappers::ReceiverStream; pub use super::subgraph::Entity; pub use err::{StoreError, StoreResult}; @@ -18,7 +19,6 @@ use strum_macros::Display; pub use traits::*; pub use write::Batch; -use futures01::{Async, Stream}; use serde::{Deserialize, Serialize}; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashSet}; @@ -633,37 +633,8 @@ impl PartialEq for StoreEvent { } } -/// A `StoreEventStream` produces the `StoreEvents`. Various filters can be applied -/// to it to reduce which and how many events are delivered by the stream. -pub struct StoreEventStream { - source: S, -} - /// A boxed `StoreEventStream` -pub type StoreEventStreamBox = - StoreEventStream, Error = ()> + Send>>; - -impl Stream for StoreEventStream -where - S: Stream, Error = ()> + Send, -{ - type Item = Arc; - type Error = (); - - fn poll(&mut self) -> Result>, Self::Error> { - self.source.poll() - } -} - -impl StoreEventStream -where - S: Stream, Error = ()> + Send + 'static, -{ - // Create a new `StoreEventStream` from another such stream - pub fn new(source: S) -> Self { - StoreEventStream { source } - } -} +pub type StoreEventStreamBox = ReceiverStream>; /// An entity operation that can be transacted into the store. #[derive(Clone, Debug, PartialEq)] diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index eae3a1b0b4c..f29c66f4784 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -121,7 +121,7 @@ pub trait SubgraphStore: Send + Sync + 'static { /// the subgraph is assigned to, and `is_paused` is true if the /// subgraph is paused. /// Returns None if the deployment does not exist. - fn assignment_status( + async fn assignment_status( &self, deployment: &DeploymentLocator, ) -> Result, StoreError>; @@ -129,7 +129,8 @@ pub trait SubgraphStore: Send + Sync + 'static { fn assignments(&self, node: &NodeId) -> Result, StoreError>; /// Returns assignments that are not paused - fn active_assignments(&self, node: &NodeId) -> Result, StoreError>; + async fn active_assignments(&self, node: &NodeId) + -> Result, StoreError>; /// Return `true` if a subgraph `name` exists, regardless of whether the /// subgraph has any deployments attached to it diff --git a/graph/src/components/subgraph/provider.rs b/graph/src/components/subgraph/provider.rs index 5edc22391c8..3e33f6fd5bf 100644 --- a/graph/src/components/subgraph/provider.rs +++ b/graph/src/components/subgraph/provider.rs @@ -5,13 +5,6 @@ use crate::{components::store::DeploymentLocator, prelude::*}; /// Common trait for subgraph providers. #[async_trait] pub trait SubgraphAssignmentProvider: Send + Sync + 'static { - async fn start( - &self, - deployment: DeploymentLocator, - stop_block: Option, - ) -> Result<(), SubgraphAssignmentProviderError>; - async fn stop( - &self, - deployment: DeploymentLocator, - ) -> Result<(), SubgraphAssignmentProviderError>; + async fn start(&self, deployment: DeploymentLocator, stop_block: Option); + async fn stop(&self, deployment: DeploymentLocator); } diff --git a/graph/src/data/store/mod.rs b/graph/src/data/store/mod.rs index c8786e9b473..8e5aa8a2f9e 100644 --- a/graph/src/data/store/mod.rs +++ b/graph/src/data/store/mod.rs @@ -1,5 +1,4 @@ use crate::{ - components::store::DeploymentLocator, derive::CacheWeight, prelude::{lazy_static, q, r, s, CacheWeight, QueryExecutionError}, runtime::gas::{Gas, GasSizeOf}, @@ -83,28 +82,6 @@ impl<'de> de::Deserialize<'de> for NodeId { } } -#[derive(Clone, Debug, Deserialize, PartialEq, Eq)] -#[serde(tag = "type")] -pub enum AssignmentEvent { - Add { - deployment: DeploymentLocator, - node_id: NodeId, - }, - Remove { - deployment: DeploymentLocator, - node_id: NodeId, - }, -} - -impl AssignmentEvent { - pub fn node_id(&self) -> &NodeId { - match self { - AssignmentEvent::Add { node_id, .. } => node_id, - AssignmentEvent::Remove { node_id, .. } => node_id, - } - } -} - /// An entity attribute name is represented as a string. pub type Attribute = String; diff --git a/graph/src/lib.rs b/graph/src/lib.rs index ee288c5729a..05407603f48 100644 --- a/graph/src/lib.rs +++ b/graph/src/lib.rs @@ -131,8 +131,8 @@ pub mod prelude { EntityCollection, EntityFilter, EntityLink, EntityOperation, EntityOrder, EntityOrderByChild, EntityOrderByChildInfo, EntityQuery, EntityRange, EntityWindow, EthereumCallCache, ParentLink, PartialBlockPtr, PoolWaitStats, QueryStore, - QueryStoreManager, StoreError, StoreEvent, StoreEventStream, StoreEventStreamBox, - SubgraphStore, UnfailOutcome, WindowAttribute, BLOCK_NUMBER_MAX, + QueryStoreManager, StoreError, StoreEvent, StoreEventStreamBox, SubgraphStore, + UnfailOutcome, WindowAttribute, BLOCK_NUMBER_MAX, }; pub use crate::components::subgraph::{ BlockState, HostMetrics, InstanceDSTemplateInfo, RuntimeHost, RuntimeHostBuilder, @@ -152,7 +152,7 @@ pub mod prelude { Query, QueryError, QueryExecutionError, QueryResult, QueryTarget, QueryVariables, }; pub use crate::data::store::scalar::{BigDecimal, BigInt, BigIntSign}; - pub use crate::data::store::{AssignmentEvent, Attribute, Entity, NodeId, Value, ValueType}; + pub use crate::data::store::{Attribute, Entity, NodeId, Value, ValueType}; pub use crate::data::subgraph::schema::SubgraphDeploymentEntity; pub use crate::data::subgraph::{ CreateSubgraphResult, DataSourceContext, DeploymentHash, DeploymentState, Link, diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 1776e0feba3..8855ef1a954 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -1,8 +1,6 @@ use anyhow::Result; use git_testament::{git_testament, render_testament}; -use graph::futures01::Future as _; -use graph::futures03::compat::Future01CompatExt; use graph::futures03::future::TryFutureExt; use crate::config::Config; @@ -524,9 +522,9 @@ pub async fn run( graph::spawn( subgraph_registrar + .cheap_clone() .start() - .map_err(|e| panic!("failed to initialize subgraph provider {}", e)) - .compat(), + .map_err(|e| panic!("failed to initialize subgraph provider {}", e)), ); // Start admin JSON-RPC server. diff --git a/node/src/manager/commands/listen.rs b/node/src/manager/commands/listen.rs index 69c3ff93cbf..d53dfaae455 100644 --- a/node/src/manager/commands/listen.rs +++ b/node/src/manager/commands/listen.rs @@ -1,8 +1,8 @@ use std::io::Write; use std::sync::Arc; -use graph::futures01::Stream as _; -use graph::futures03::compat::Future01CompatExt; +use graph::futures03::{future, StreamExt}; + use graph::{ components::store::SubscriptionManager as _, prelude::{serde_json, Error}, @@ -12,25 +12,16 @@ use graph_store_postgres::SubscriptionManager; async fn listen(mgr: Arc) -> Result<(), Error> { let events = mgr.subscribe(); println!("press ctrl-c to stop"); - let res = events - .inspect(move |event| { - serde_json::to_writer_pretty(std::io::stdout(), event) + events + .for_each(move |event| { + serde_json::to_writer_pretty(std::io::stdout(), &event) .expect("event can be serialized to JSON"); writeln!(std::io::stdout()).unwrap(); std::io::stdout().flush().unwrap(); + future::ready(()) }) - .collect() - .compat() .await; - match res { - Ok(_) => { - println!("stream finished") - } - Err(()) => { - eprintln!("stream failed") - } - } Ok(()) } diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index 38048c55ba3..060341fb6e0 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -216,8 +216,7 @@ pub async fn run( let locator = locate(subgraph_store.as_ref(), &hash)?; - SubgraphAssignmentProvider::start(subgraph_provider.as_ref(), locator, Some(stop_block)) - .await?; + SubgraphAssignmentProvider::start(subgraph_provider.as_ref(), locator, Some(stop_block)).await; loop { tokio::time::sleep(Duration::from_millis(1000)).await; diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index 4ed8bada0a4..a92652b54aa 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -30,6 +30,7 @@ use diesel::{ Connection as _, }; use graph::{ + cheap_clone::CheapClone, components::store::DeploymentLocator, data::{ store::scalar::ToPrimitive, @@ -1886,8 +1887,9 @@ pub fn is_empty(conn: &mut PgConnection) -> Result { /// a query returns either success or anything but a /// `Err(StoreError::DatabaseUnavailable)`. This only works for tables that /// are mirrored through `refresh_tables` +#[derive(Clone, CheapClone)] pub struct Mirror { - pools: Vec, + pools: Arc>, } impl Mirror { @@ -1917,6 +1919,7 @@ impl Mirror { pools.push(pool.clone()); pools }); + let pools = Arc::new(pools); Mirror { pools } } @@ -1925,7 +1928,7 @@ impl Mirror { /// used for non-critical uses like command line tools pub fn primary_only(primary: ConnectionPool) -> Mirror { Mirror { - pools: vec![primary], + pools: Arc::new(vec![primary]), } } @@ -1940,7 +1943,7 @@ impl Mirror { mut f: impl 'a + FnMut(&mut PooledConnection>) -> Result, ) -> Result { - for pool in &self.pools { + for pool in self.pools.as_ref() { let mut conn = match pool.get() { Ok(conn) => conn, Err(StoreError::DatabaseUnavailable) => continue, @@ -1955,6 +1958,27 @@ impl Mirror { Err(StoreError::DatabaseUnavailable) } + /// An async version of `read` that spawns a blocking task to do the + /// actual work. This is useful when you want to call `read` from an + /// async context + pub(crate) async fn read_async(&self, mut f: F) -> Result + where + T: 'static + Send, + F: 'static + + Send + + FnMut(&mut PooledConnection>) -> Result, + { + let this = self.cheap_clone(); + let res = graph::spawn_blocking(async move { this.read(|conn| f(conn)) }).await; + match res { + Ok(v) => v, + Err(e) => Err(internal_error!( + "spawn_blocking in read_async failed: {}", + e + )), + } + } + /// Refresh the contents of mirrored tables from the primary (through /// the fdw mapping that `ForeignServer` establishes) pub(crate) fn refresh_tables( @@ -2050,8 +2074,10 @@ impl Mirror { self.read(|conn| queries::assignments(conn, node)) } - pub fn active_assignments(&self, node: &NodeId) -> Result, StoreError> { - self.read(|conn| queries::active_assignments(conn, node)) + pub async fn active_assignments(&self, node: &NodeId) -> Result, StoreError> { + let node = node.clone(); + self.read_async(move |conn| queries::active_assignments(conn, &node)) + .await } pub fn assigned_node(&self, site: &Site) -> Result, StoreError> { @@ -2062,8 +2088,12 @@ impl Mirror { /// the subgraph is assigned to, and `is_paused` is true if the /// subgraph is paused. /// Returns None if the deployment does not exist. - pub fn assignment_status(&self, site: &Site) -> Result, StoreError> { - self.read(|conn| queries::assignment_status(conn, site)) + pub async fn assignment_status( + &self, + site: Arc, + ) -> Result, StoreError> { + self.read_async(move |conn| queries::assignment_status(conn, &site)) + .await } pub fn find_active_site(&self, subgraph: &DeploymentHash) -> Result, StoreError> { diff --git a/store/postgres/src/store_events.rs b/store/postgres/src/store_events.rs index b9da04f30d7..300022d200e 100644 --- a/store/postgres/src/store_events.rs +++ b/store/postgres/src/store_events.rs @@ -221,6 +221,6 @@ impl SubscriptionManagerTrait for SubscriptionManager { self.subscriptions.write().unwrap().insert(id, sender); // Return the subscription ID and entity change stream - StoreEventStream::new(Box::new(ReceiverStream::new(receiver).map(Ok).compat())) + ReceiverStream::new(receiver) } } diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 2cb2df8a0d6..7f5993735c2 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -1455,12 +1455,12 @@ impl SubgraphStoreTrait for SubgraphStore { /// the subgraph is assigned to, and `is_paused` is true if the /// subgraph is paused. /// Returns None if the deployment does not exist. - fn assignment_status( + async fn assignment_status( &self, deployment: &DeploymentLocator, ) -> Result, StoreError> { let site = self.find_site(deployment.id.into())?; - self.mirror.assignment_status(site.as_ref()) + self.mirror.assignment_status(site).await } fn assignments(&self, node: &NodeId) -> Result, StoreError> { @@ -1469,9 +1469,13 @@ impl SubgraphStoreTrait for SubgraphStore { .map(|sites| sites.iter().map(|site| site.into()).collect()) } - fn active_assignments(&self, node: &NodeId) -> Result, StoreError> { + async fn active_assignments( + &self, + node: &NodeId, + ) -> Result, StoreError> { self.mirror .active_assignments(node) + .await .map(|sites| sites.iter().map(|site| site.into()).collect()) } diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index c969edb2a78..362cef37f44 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -282,12 +282,11 @@ impl TestContext { pub async fn start_and_sync_to(&self, stop_block: BlockPtr) { // In case the subgraph has been previously started. - self.provider.stop(self.deployment.clone()).await.unwrap(); + self.provider.stop(self.deployment.clone()).await; self.provider .start(self.deployment.clone(), Some(stop_block.number)) - .await - .expect("unable to start subgraph"); + .await; debug!(self.logger, "TEST: syncing to {}", stop_block.number); @@ -303,12 +302,9 @@ impl TestContext { pub async fn start_and_sync_to_error(&self, stop_block: BlockPtr) -> SubgraphError { // In case the subgraph has been previously started. - self.provider.stop(self.deployment.clone()).await.unwrap(); + self.provider.stop(self.deployment.clone()).await; - self.provider - .start(self.deployment.clone(), None) - .await - .expect("unable to start subgraph"); + self.provider.start(self.deployment.clone(), None).await; wait_for_sync( &self.logger, diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs index 880591b8fa3..cd2c059e2dc 100644 --- a/tests/tests/runner_tests.rs +++ b/tests/tests/runner_tests.rs @@ -82,11 +82,7 @@ async fn data_source_revert() -> anyhow::Result<()> { let stop_block = test_ptr(2); base_ctx.start_and_sync_to(stop_block).await; - base_ctx - .provider - .stop(base_ctx.deployment.clone()) - .await - .unwrap(); + base_ctx.provider.stop(base_ctx.deployment.clone()).await; // Test loading data sources from DB. let stop_block = test_ptr(3);