From b76e2de12c007ee74bde1d379edc3ca71e330e40 Mon Sep 17 00:00:00 2001 From: Saurav Suman Date: Thu, 12 Feb 2026 23:02:35 +0530 Subject: [PATCH] feat: added ConfigChanged webhook event for config mutation notifications --- .../src/api/context/handlers.rs | 157 ++++++++++--- .../src/api/default_config/handlers.rs | 63 +++++- .../src/api/dimension/handlers.rs | 72 +++++- crates/context_aware_config/src/helpers.rs | 2 + .../src/api/experiments/cac_api.rs | 43 ++-- .../src/api/experiments/handlers.rs | 209 +++++------------- .../src/api/experiments/helpers.rs | 64 +----- crates/service_utils/src/helpers.rs | 17 +- crates/service_utils/src/service/types.rs | 2 +- .../src/database/models/others.rs | 1 + 10 files changed, 346 insertions(+), 284 deletions(-) diff --git a/crates/context_aware_config/src/api/context/handlers.rs b/crates/context_aware_config/src/api/context/handlers.rs index 0776c6cf4..1db8c984f 100644 --- a/crates/context_aware_config/src/api/context/handlers.rs +++ b/crates/context_aware_config/src/api/context/handlers.rs @@ -13,7 +13,7 @@ use diesel::{ }; use serde_json::{Map, Value}; use service_utils::{ - helpers::{fetch_dimensions_info_map, parse_config_tags}, + helpers::{execute_webhook_call, fetch_dimensions_info_map, parse_config_tags}, service::types::{ AppHeader, AppState, CustomHeaders, DbConnection, WorkspaceContext, }, @@ -36,7 +36,7 @@ use superposition_types::{ QueryMap, }, database::{ - models::{ChangeReason, Description, cac::Context}, + models::{ChangeReason, Description, cac::Context, others::WebhookEvent}, schema::contexts::{self, id}, }, logic::evaluate_local_cohorts_skip_unresolved, @@ -135,7 +135,25 @@ async fn create_handler( Ok((put_response, version_id)) })?; - let mut http_resp = HttpResponse::Ok(); + let DbConnection(mut conn) = db_conn; + let webhook_status = execute_webhook_call( + &put_response, + &Some(version_id.to_string()), + &workspace_context, + WebhookEvent::ConfigChanged, + &state, + &mut conn, + ) + .await; + + let mut http_resp = if webhook_status { + HttpResponse::Ok() + } else { + HttpResponse::build( + actix_web::http::StatusCode::from_u16(512) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR), + ) + }; http_resp.insert_header(( AppHeader::XConfigVersion.to_string(), @@ -143,11 +161,8 @@ async fn create_handler( )); #[cfg(feature = "high-performance-mode")] - { - let DbConnection(mut conn) = db_conn; - put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn) - .await?; - } + put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn) + .await?; Ok(http_resp.json(put_response)) } @@ -197,7 +212,26 @@ async fn update_handler( )?; Ok((override_resp, version_id)) })?; - let mut http_resp = HttpResponse::Ok(); + + let DbConnection(mut conn) = db_conn; + let webhook_status = execute_webhook_call( + &override_resp, + &Some(version_id.to_string()), + &workspace_context, + WebhookEvent::ConfigChanged, + &state, + &mut conn, + ) + .await; + + let mut http_resp = if webhook_status { + HttpResponse::Ok() + } else { + HttpResponse::build( + actix_web::http::StatusCode::from_u16(512) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR), + ) + }; http_resp.insert_header(( AppHeader::XConfigVersion.to_string(), @@ -205,11 +239,8 @@ async fn update_handler( )); #[cfg(feature = "high-performance-mode")] - { - let DbConnection(mut conn) = db_conn; - put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn) - .await?; - } + put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn) + .await?; Ok(http_resp.json(override_resp)) } @@ -282,7 +313,26 @@ async fn move_handler( Ok((move_response, version_id)) })?; - let mut http_resp = HttpResponse::Ok(); + + let DbConnection(mut conn) = db_conn; + let webhook_status = execute_webhook_call( + &move_response, + &Some(version_id.to_string()), + &workspace_context, + WebhookEvent::ConfigChanged, + &state, + &mut conn, + ) + .await; + + let mut http_resp = if webhook_status { + HttpResponse::Ok() + } else { + HttpResponse::build( + actix_web::http::StatusCode::from_u16(512) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR), + ) + }; http_resp.insert_header(( AppHeader::XConfigVersion.to_string(), @@ -290,11 +340,8 @@ async fn move_handler( )); #[cfg(feature = "high-performance-mode")] - { - let DbConnection(mut conn) = db_conn; - put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn) - .await?; - } + put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn) + .await?; Ok(http_resp.json(move_response)) } @@ -524,14 +571,32 @@ async fn delete_handler( Ok(version_id) })?; + let DbConnection(mut conn) = db_conn; + let webhook_change_reason = format!("Deleted context by {}", user.username); + let webhook_status = execute_webhook_call( + &webhook_change_reason, + &Some(version_id.to_string()), + &workspace_context, + WebhookEvent::ConfigChanged, + &state, + &mut conn, + ) + .await; + #[cfg(feature = "high-performance-mode")] - { - let DbConnection(mut conn) = db_conn; - put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn) - .await?; - } + put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn) + .await?; + + let mut http_resp = if webhook_status { + HttpResponse::NoContent() + } else { + HttpResponse::build( + actix_web::http::StatusCode::from_u16(512) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR), + ) + }; - Ok(HttpResponse::NoContent() + Ok(http_resp .insert_header(( AppHeader::XConfigVersion.to_string().as_str(), version_id.to_string().as_str(), @@ -728,13 +793,29 @@ async fn bulk_operations_handler( )?; Ok((response, version_id)) })?; - let mut resp_builder = HttpResponse::Ok(); + let webhook_status = execute_webhook_call( + &response, + &Some(version_id.to_string()), + &workspace_context, + WebhookEvent::ConfigChanged, + &state, + &mut conn, + ) + .await; + + let mut resp_builder = if webhook_status { + HttpResponse::Ok() + } else { + HttpResponse::build( + actix_web::http::StatusCode::from_u16(512) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR), + ) + }; resp_builder.insert_header(( AppHeader::XConfigVersion.to_string(), version_id.to_string(), )); - // Commit the transaction #[cfg(feature = "high-performance-mode")] put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn) .await?; @@ -824,6 +905,17 @@ async fn weight_recompute_handler( let version_id = add_config_version(&state, tags, config_version_desc, transaction_conn, &workspace_context.schema_name)?; Ok(version_id) })?; + + let webhook_status = execute_webhook_call( + &response, + &Some(config_version_id.to_string()), + &workspace_context, + WebhookEvent::ConfigChanged, + &state, + &mut conn, + ) + .await; + #[cfg(feature = "high-performance-mode")] put_config_in_redis( config_version_id, @@ -833,7 +925,14 @@ async fn weight_recompute_handler( ) .await?; - let mut http_resp = HttpResponse::Ok(); + let mut http_resp = if webhook_status { + HttpResponse::Ok() + } else { + HttpResponse::build( + actix_web::http::StatusCode::from_u16(512) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR), + ) + }; http_resp.insert_header(( AppHeader::XConfigVersion.to_string(), config_version_id.to_string(), diff --git a/crates/context_aware_config/src/api/default_config/handlers.rs b/crates/context_aware_config/src/api/default_config/handlers.rs index 169e19495..c77e56609 100644 --- a/crates/context_aware_config/src/api/default_config/handlers.rs +++ b/crates/context_aware_config/src/api/default_config/handlers.rs @@ -10,7 +10,7 @@ use diesel::{ use jsonschema::ValidationError; use serde_json::Value; use service_utils::{ - helpers::parse_config_tags, + helpers::{execute_webhook_call, parse_config_tags}, service::types::{ AppHeader, AppState, CustomHeaders, DbConnection, EncryptionKey, SchemaName, WorkspaceContext, @@ -35,6 +35,7 @@ use superposition_types::{ models::{ Description, cac::{self as models, Context, DefaultConfig, FunctionType}, + others::WebhookEvent, }, schema::{self, contexts::dsl::contexts, default_configs::dsl}, }, @@ -164,10 +165,28 @@ async fn create_handler( Ok(version_id) })?; + let webhook_status = execute_webhook_call( + &default_config, + &Some(version_id.to_string()), + &workspace_context, + WebhookEvent::ConfigChanged, + &state, + &mut conn, + ) + .await; + #[cfg(feature = "high-performance-mode")] put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn) .await?; - let mut http_resp = HttpResponse::Ok(); + + let mut http_resp = if webhook_status { + HttpResponse::Ok() + } else { + HttpResponse::build( + actix_web::http::StatusCode::from_u16(512) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR), + ) + }; http_resp.insert_header(( AppHeader::XConfigVersion.to_string(), @@ -296,11 +315,28 @@ async fn update_handler( Ok((val, version_id)) })?; + let webhook_status = execute_webhook_call( + &db_row, + &Some(version_id.to_string()), + &workspace_context, + WebhookEvent::ConfigChanged, + &state, + &mut conn, + ) + .await; + #[cfg(feature = "high-performance-mode")] put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn) .await?; - let mut http_resp = HttpResponse::Ok(); + let mut http_resp = if webhook_status { + HttpResponse::Ok() + } else { + HttpResponse::build( + actix_web::http::StatusCode::from_u16(512) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR), + ) + }; http_resp.insert_header(( AppHeader::XConfigVersion.to_string(), version_id.to_string(), @@ -518,6 +554,19 @@ async fn delete_handler( }); if resp.is_ok() { + let webhook_payload = serde_json::json!({ + "key": key, + }); // passing the deleted key here + let webhook_status = execute_webhook_call( + &webhook_payload, + &Some(version_id.to_string()), + &workspace_context, + WebhookEvent::ConfigChanged, + &state, + &mut conn, + ) + .await; + #[cfg(feature = "high-performance-mode")] put_config_in_redis( version_id, @@ -526,6 +575,14 @@ async fn delete_handler( &mut conn, ) .await?; + + if !webhook_status { + return Ok(HttpResponse::build( + actix_web::http::StatusCode::from_u16(512) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR), + ) + .finish()); + } } resp } else { diff --git a/crates/context_aware_config/src/api/dimension/handlers.rs b/crates/context_aware_config/src/api/dimension/handlers.rs index be972df62..4ad716cb3 100644 --- a/crates/context_aware_config/src/api/dimension/handlers.rs +++ b/crates/context_aware_config/src/api/dimension/handlers.rs @@ -6,7 +6,7 @@ use chrono::Utc; use diesel::{Connection, ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper}; use serde_json::Value; use service_utils::{ - helpers::parse_config_tags, + helpers::{execute_webhook_call, parse_config_tags}, service::types::{ AppHeader, AppState, CustomHeaders, DbConnection, WorkspaceContext, }, @@ -24,6 +24,7 @@ use superposition_types::{ models::{ Description, cac::{DependencyGraph, Dimension, DimensionType}, + others::WebhookEvent, }, schema::dimensions::{self, dsl::*}, }, @@ -199,6 +200,7 @@ async fn create_handler( match insert_resp { Ok(inserted_dimension) => { let is_mandatory = workspace_context + .clone() .settings .mandatory_dimensions .unwrap_or_default() @@ -234,11 +236,28 @@ async fn create_handler( } })?; + let webhook_status = execute_webhook_call( + &inserted_dimension, + &Some(version_id.to_string()), + &workspace_context, + WebhookEvent::ConfigChanged, + &state, + &mut conn, + ) + .await; + #[cfg(feature = "high-performance-mode")] put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn) .await?; - let mut http_resp = HttpResponse::Created(); + let mut http_resp = if webhook_status { + HttpResponse::Created() + } else { + HttpResponse::build( + actix_web::http::StatusCode::from_u16(512) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR), + ) + }; http_resp.insert_header(( AppHeader::XConfigVersion.to_string(), version_id.to_string(), @@ -362,6 +381,8 @@ async fn update_handler( )?; } + let update_change_reason = update_req.change_reason.clone(); + let (result, is_mandatory, version_id) = conn .transaction::<_, superposition::AppError, _>(|transaction_conn| { if let Some(position_val) = update_req.position { @@ -430,6 +451,7 @@ async fn update_handler( .map_err(|err| db_error!(err))?; let is_mandatory = workspace_context + .clone() .settings .mandatory_dimensions .unwrap_or_default() @@ -438,7 +460,7 @@ async fn update_handler( let version_id = add_config_version( &state, tags, - dimension_data.change_reason.into(), + update_change_reason.into(), transaction_conn, &workspace_context.schema_name, )?; @@ -446,11 +468,28 @@ async fn update_handler( Ok((result, is_mandatory, version_id)) })?; + let webhook_status = execute_webhook_call( + &result, + &Some(version_id.to_string()), + &workspace_context, + WebhookEvent::ConfigChanged, + &state, + &mut conn, + ) + .await; + #[cfg(feature = "high-performance-mode")] put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn) .await?; - let mut http_resp = HttpResponse::Ok(); + let mut http_resp = if webhook_status { + HttpResponse::Ok() + } else { + HttpResponse::build( + actix_web::http::StatusCode::from_u16(512) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR), + ) + }; http_resp.insert_header(( AppHeader::XConfigVersion.to_string(), version_id.to_string(), @@ -542,7 +581,7 @@ async fn delete_handler( )?; if context_ids.is_empty() { - let (resp, _version_id) = conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { + let (resp, version_id) = conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { use dimensions::dsl; if !dimension_data.dependency_graph.is_empty() { @@ -621,12 +660,31 @@ async fn delete_handler( #[cfg(feature = "high-performance-mode")] put_config_in_redis( - _version_id, - state, + version_id, + state.clone(), &workspace_context.schema_name, &mut conn, ) .await?; + + let webhook_status = execute_webhook_call( + &dimension_data, + &Some(version_id.to_string()), + &workspace_context, + WebhookEvent::ConfigChanged, + &state, + &mut conn, + ) + .await; + + if !webhook_status { + return Ok(HttpResponse::build( + actix_web::http::StatusCode::from_u16(512) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR), + ) + .finish()); + } + Ok(resp) } else { Err(bad_argument!( diff --git a/crates/context_aware_config/src/helpers.rs b/crates/context_aware_config/src/helpers.rs index d6c3aadc4..e4f898e57 100644 --- a/crates/context_aware_config/src/helpers.rs +++ b/crates/context_aware_config/src/helpers.rs @@ -208,6 +208,7 @@ pub fn add_config_version( let config = generate_cac(db_conn, schema_name)?; let json_config = json!(config); let config_hash = blake3::hash(json_config.to_string().as_bytes()).to_string(); + let config_version = ConfigVersion { id: version_id, config: json_config, @@ -221,6 +222,7 @@ pub fn add_config_version( .returning(ConfigVersion::as_returning()) .schema_name(schema_name) .execute(db_conn)?; + Ok(version_id) } diff --git a/crates/experimentation_platform/src/api/experiments/cac_api.rs b/crates/experimentation_platform/src/api/experiments/cac_api.rs index 3d26e84dd..c993e1816 100644 --- a/crates/experimentation_platform/src/api/experiments/cac_api.rs +++ b/crates/experimentation_platform/src/api/experiments/cac_api.rs @@ -5,9 +5,7 @@ use actix_web::web::Data; use reqwest::{Response, StatusCode}; use serde::de::DeserializeOwned; use serde_json::{Map, Value}; -use service_utils::service::types::{ - AppState, OrganisationId, WorkspaceContext, WorkspaceId, -}; +use service_utils::service::types::{AppState, WorkspaceContext}; use superposition_macros::{bad_argument, response_error, unexpected_error}; use superposition_types::{ Cac, Condition, User, @@ -21,21 +19,22 @@ use superposition_types::{ }; pub fn construct_header_map( - workspace_id: &WorkspaceId, - organisation_id: &OrganisationId, + workspace_context: &WorkspaceContext, other_headers: Vec<(&str, String)>, ) -> superposition::Result { let mut headers = HeaderMap::new(); - let workspace_val = HeaderValue::from_str(workspace_id).map_err(|err| { - log::error!("failed to set header: {}", err); - unexpected_error!("Something went wrong") - })?; + let workspace_val = + HeaderValue::from_str(&workspace_context.workspace_id).map_err(|err| { + log::error!("failed to set header: {}", err); + unexpected_error!("Something went wrong") + })?; headers.insert(HeaderName::from_static("x-tenant"), workspace_val); - let org_val = HeaderValue::from_str(organisation_id).map_err(|err| { - log::error!("failed to set header: {}", err); - unexpected_error!("Something went wrong") - })?; + let org_val = + HeaderValue::from_str(&workspace_context.organisation_id).map_err(|err| { + log::error!("failed to set header: {}", err); + unexpected_error!("Something went wrong") + })?; headers.insert(HeaderName::from_static("x-org-id"), org_val); for (header, value) in other_headers { @@ -191,11 +190,7 @@ pub async fn get_resolved_config( let extra_headers = vec![("x-user", user_str)]; - let headers_map = construct_header_map( - &workspace_context.workspace_id, - &workspace_context.organisation_id, - extra_headers, - )?; + let headers_map = construct_header_map(workspace_context, extra_headers)?; let response = http_client .get(&url) .headers(headers_map.into()) @@ -227,11 +222,7 @@ pub async fn get_context_override( let extra_headers = vec![("x-user", user_str)]; - let headers_map = construct_header_map( - &workspace_context.workspace_id, - &workspace_context.organisation_id, - extra_headers, - )?; + let headers_map = construct_header_map(workspace_context, extra_headers)?; let response = http_client .get(&url) .headers(headers_map.into()) @@ -271,11 +262,7 @@ pub async fn validate_context( let extra_headers = vec![("x-user", user_str)]; - let headers_map = construct_header_map( - &workspace_context.workspace_id, - &workspace_context.organisation_id, - extra_headers, - )?; + let headers_map = construct_header_map(workspace_context, extra_headers)?; let payload = Cac::::try_from((**condition).clone()).map_err(|err| { log::error!("failed to decode condition with error : {}", err); bad_argument!(err) diff --git a/crates/experimentation_platform/src/api/experiments/handlers.rs b/crates/experimentation_platform/src/api/experiments/handlers.rs index 2e0b864e1..d5d9db5a9 100644 --- a/crates/experimentation_platform/src/api/experiments/handlers.rs +++ b/crates/experimentation_platform/src/api/experiments/handlers.rs @@ -80,8 +80,8 @@ use crate::api::{ }, experiments::{ helpers::{ - fetch_and_validate_change_reason_with_function, fetch_webhook_by_event, - validate_control_overrides, validate_delete_experiment_variants, + fetch_and_validate_change_reason_with_function, validate_control_overrides, + validate_delete_experiment_variants, }, types::StartedByChangeSet, }, @@ -284,11 +284,7 @@ async fn create_handler( .filter_map(|(key, val)| val.map(|v| (key, v))) .collect::>(); - let headers_map = construct_header_map( - &workspace_context.workspace_id, - &workspace_context.organisation_id, - extra_headers, - )?; + let headers_map = construct_header_map(&workspace_context, extra_headers)?; // Step 1: Perform the HTTP request and handle errors let response = http_client @@ -378,27 +374,15 @@ async fn create_handler( })?; let response = ExperimentResponse::from(inserted_experiment); - let webhook_status = if let Ok(webhook) = fetch_webhook_by_event( - &state, - &user, - &WebhookEvent::ExperimentCreated, + let webhook_status = execute_webhook_call( + &response, + &config_version_id, &workspace_context, + WebhookEvent::ExperimentCreated, + &state, + &mut conn, ) - .await - { - execute_webhook_call( - &webhook, - &response, - &config_version_id, - &workspace_context, - WebhookEvent::ExperimentCreated, - &state, - &mut conn, - ) - .await - } else { - true - }; + .await; let mut http_resp = if webhook_status { HttpResponse::Ok() @@ -445,27 +429,15 @@ async fn conclude_handler( let experiment_response = ExperimentResponse::from(response); - let webhook_status = if let Ok(webhook) = fetch_webhook_by_event( - &state, - &user, - &WebhookEvent::ExperimentConcluded, + let webhook_status = execute_webhook_call( + &experiment_response, + &config_version_id, &workspace_context, + WebhookEvent::ExperimentConcluded, + &state, + &mut conn, ) - .await - { - execute_webhook_call( - &webhook, - &experiment_response, - &config_version_id, - &workspace_context, - WebhookEvent::ExperimentConcluded, - &state, - &mut conn, - ) - .await - } else { - true - }; + .await; let mut http_resp = if webhook_status { HttpResponse::Ok() @@ -642,11 +614,7 @@ pub async fn conclude( .filter_map(|(key, val)| val.map(|v| (key, v))) .collect::>(); - let headers_map = construct_header_map( - &workspace_context.workspace_id, - &workspace_context.organisation_id, - extra_headers, - )?; + let headers_map = construct_header_map(workspace_context, extra_headers)?; let response = http_client .put(&url) @@ -727,27 +695,15 @@ async fn discard_handler( let experiment_response = ExperimentResponse::from(response); - let webhook_status = if let Ok(webhook) = fetch_webhook_by_event( - &state, - &user, - &WebhookEvent::ExperimentDiscarded, + let webhook_status = execute_webhook_call( + &experiment_response, + &config_version_id, &workspace_context, + WebhookEvent::ExperimentDiscarded, + &state, + &mut conn, ) - .await - { - execute_webhook_call( - &webhook, - &experiment_response, - &config_version_id, - &workspace_context, - WebhookEvent::ExperimentDiscarded, - &state, - &mut conn, - ) - .await - } else { - true - }; + .await; let mut http_resp = if webhook_status { HttpResponse::Ok() @@ -817,11 +773,7 @@ pub async fn discard( .filter_map(|(key, val)| val.map(|v| (key, v))) .collect::>(); - let headers_map = construct_header_map( - &workspace_context.workspace_id, - &workspace_context.organisation_id, - extra_headers, - )?; + let headers_map = construct_header_map(workspace_context, extra_headers)?; let response = http_client .put(&url) @@ -1339,22 +1291,15 @@ async fn ramp_handler( } else { WebhookEvent::ExperimentInprogress }; - let webhook_status = if let Ok(webhook) = - fetch_webhook_by_event(&state, &user, &webhook_event, &workspace_context).await - { - execute_webhook_call( - &webhook, - &experiment_response, - &config_version_id, - &workspace_context, - webhook_event, - &state, - &mut conn, - ) - .await - } else { - true - }; + let webhook_status = execute_webhook_call( + &experiment_response, + &config_version_id, + &workspace_context, + webhook_event, + &state, + &mut conn, + ) + .await; let mut http_resp = if webhook_status { HttpResponse::Ok() @@ -1596,11 +1541,7 @@ async fn update_handler( .filter_map(|(key, val)| val.map(|v| (key, v))) .collect::>(); - let headers_map = construct_header_map( - &workspace_context.workspace_id, - &workspace_context.organisation_id, - extra_headers, - )?; + let headers_map = construct_header_map(&workspace_context, extra_headers)?; let response = http_client .put(&url) @@ -1687,27 +1628,15 @@ async fn update_handler( let experiment_response = ExperimentResponse::from(updated_experiment); - let webhook_status = if let Ok(webhook) = fetch_webhook_by_event( - &state, - &user, - &WebhookEvent::ExperimentUpdated, + let webhook_status = execute_webhook_call( + &experiment_response, + &config_version_id, &workspace_context, + WebhookEvent::ExperimentUpdated, + &state, + &mut conn, ) - .await - { - execute_webhook_call( - &webhook, - &experiment_response, - &config_version_id, - &workspace_context, - WebhookEvent::ExperimentUpdated, - &state, - &mut conn, - ) - .await - } else { - true - }; + .await; let mut http_resp = if webhook_status { HttpResponse::Ok() @@ -1750,27 +1679,15 @@ async fn pause_handler( let experiment_response = ExperimentResponse::from(response); - let webhook_status = if let Ok(webhook) = fetch_webhook_by_event( - &state, - &user, - &WebhookEvent::ExperimentPaused, + let webhook_status = execute_webhook_call( + &experiment_response, + &None, &workspace_context, + WebhookEvent::ExperimentPaused, + &state, + &mut conn, ) - .await - { - execute_webhook_call( - &webhook, - &experiment_response, - &None, - &workspace_context, - WebhookEvent::ExperimentPaused, - &state, - &mut conn, - ) - .await - } else { - true - }; + .await; let mut http_resp = if webhook_status { HttpResponse::Ok() @@ -1849,27 +1766,15 @@ async fn resume_handler( let experiment_response = ExperimentResponse::from(response); - let webhook_status = if let Ok(webhook) = fetch_webhook_by_event( - &state, - &user, - &WebhookEvent::ExperimentInprogress, + let webhook_status = execute_webhook_call( + &experiment_response, + &None, &workspace_context, + WebhookEvent::ExperimentInprogress, + &state, + &mut conn, ) - .await - { - execute_webhook_call( - &webhook, - &experiment_response, - &None, - &workspace_context, - WebhookEvent::ExperimentInprogress, - &state, - &mut conn, - ) - .await - } else { - true - }; + .await; let mut http_resp = if webhook_status { HttpResponse::Ok() diff --git a/crates/experimentation_platform/src/api/experiments/helpers.rs b/crates/experimentation_platform/src/api/experiments/helpers.rs index d31d7df8b..0f3ff8a24 100644 --- a/crates/experimentation_platform/src/api/experiments/helpers.rs +++ b/crates/experimentation_platform/src/api/experiments/helpers.rs @@ -32,7 +32,6 @@ use superposition_types::{ experimentation::{ Experiment, ExperimentStatusType, GroupType, Variant, VariantType, }, - others::{Webhook, WebhookEvent}, }, schema::experiments::dsl as experiments, }, @@ -405,11 +404,7 @@ pub async fn fetch_cac_config( state.cac_host, query_params.to_query_param(), ); - let headers_map = construct_header_map( - &workspace_context.workspace_id, - &workspace_context.organisation_id, - vec![], - )?; + let headers_map = construct_header_map(workspace_context, vec![])?; let response = http_client .get(&url) @@ -440,57 +435,6 @@ pub async fn fetch_cac_config( } } -pub async fn fetch_webhook_by_event( - state: &Data, - user: &User, - event: &WebhookEvent, - workspace_context: &WorkspaceContext, -) -> superposition::Result { - let http_client = reqwest::Client::new(); - let url = format!("{}/webhook/event/{event}", state.cac_host); - let user_str = serde_json::to_string(user).map_err(|err| { - log::error!("Something went wrong, failed to stringify user data {err}"); - unexpected_error!( - "Something went wrong, failed to stringify user data {}", - err - ) - })?; - - let headers_map = construct_header_map( - &workspace_context.workspace_id, - &workspace_context.organisation_id, - vec![("x-user", user_str)], - )?; - - let response = http_client - .get(&url) - .headers(headers_map.into()) - .header( - header::AUTHORIZATION, - format!("Internal {}", state.superposition_token), - ) - .send() - .await; - - match response { - Ok(res) => { - if res.status() == 404 { - log::info!("No Webhook found for event: {}", event); - return Ok(Webhook::default()); - } - let webhook = res.json::().await.map_err(|err| { - log::error!("failed to parse Webhook response with error: {}", err); - unexpected_error!("Failed to parse Webhook.") - })?; - Ok(webhook) - } - Err(error) => { - log::error!("Failed to fetch Webhook with error: {:?}", error); - Err(unexpected_error!(error)) - } - } -} - pub fn handle_experiment_group_membership( experiment: &Experiment, new_group_id: &Option, @@ -818,11 +762,7 @@ pub async fn fetch_and_validate_change_reason_with_function( change_reason: change_reason.clone(), }; - let headers_map = construct_header_map( - &workspace_context.workspace_id, - &workspace_context.organisation_id, - vec![], - )?; + let headers_map = construct_header_map(workspace_context, vec![])?; let response = http_client .post(&url) diff --git a/crates/service_utils/src/helpers.rs b/crates/service_utils/src/helpers.rs index 74de9af24..fed8bb3b0 100644 --- a/crates/service_utils/src/helpers.rs +++ b/crates/service_utils/src/helpers.rs @@ -8,7 +8,9 @@ use std::{ use actix_web::{Error, error::ErrorInternalServerError, web::Data}; use anyhow::anyhow; use chrono::Utc; -use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper}; +use diesel::{ + ExpressionMethods, PgArrayExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper, +}; use log::info; use once_cell::sync::Lazy; @@ -32,6 +34,7 @@ use superposition_types::{ dimensions::{self, dimension}, secrets::dsl as secrets_dsl, variables::dsl as variables_dsl, + webhooks::{self, dsl::webhooks as webhook_dsl}, }, superposition_schema::superposition::workspaces, }, @@ -315,7 +318,6 @@ fn fetch_secrets( } pub async fn execute_webhook_call( - webhook: &Webhook, payload: &T, config_version_opt: &Option, workspace_context: &WorkspaceContext, @@ -326,6 +328,17 @@ pub async fn execute_webhook_call( where T: Serialize, { + let webhook = match webhook_dsl + .filter(webhooks::events.contains(vec![event])) + .schema_name(&workspace_context.schema_name) + .first::(conn) + { + Ok(webhook) => webhook, + Err(_) => { + log::info!("No webhook found for this event, skipping event: {}", event); + return true; + } + }; if !webhook.enabled { log::info!("Webhook is disabled, skipping call"); return true; diff --git a/crates/service_utils/src/service/types.rs b/crates/service_utils/src/service/types.rs index 2261d3489..a684e0753 100644 --- a/crates/service_utils/src/service/types.rs +++ b/crates/service_utils/src/service/types.rs @@ -149,7 +149,7 @@ impl FromRequest for OrganisationId { } } -#[derive(Deref, DerefMut, Clone, Debug)] +#[derive(Deref, DerefMut, Clone, Debug, Serialize, Deserialize)] pub struct SchemaName(pub String); impl Default for SchemaName { diff --git a/crates/superposition_types/src/database/models/others.rs b/crates/superposition_types/src/database/models/others.rs index b72fe8a07..df412789e 100644 --- a/crates/superposition_types/src/database/models/others.rs +++ b/crates/superposition_types/src/database/models/others.rs @@ -131,6 +131,7 @@ pub enum WebhookEvent { ExperimentConcluded, ExperimentDiscarded, ExperimentPaused, + ConfigChanged, } impl TryFrom for WebhookEvent {