Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 128 additions & 29 deletions crates/context_aware_config/src/api/context/handlers.rs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let version_id = add_config_version(
&state,
tags,
req_change_reason.into(),
transaction_conn,
&workspace_context.schema_name,
)?;
Ok((put_response, version_id))
})?;

Can you add the trigger for the webhook in add_config_version?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No , add_config_version is synchronous and the trigger is async , we cannot call async function inside transaction.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But you can return future which can be awaited on outside the synchronous block of the transaction

Anyways, on failure of webhook we dont even need to revert the queries so response awaiting outside transaction is safe as well

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a slight problem with this approach - I would be using execute_webhook_call which needs &mut DBConnection, but the transaction still holds the mutable borrow when we create the future. The future can't capture db_conn — the borrow checker won't allow it.
You can't return a future that captures &mut DBConnection from inside a transaction because the connection's lifetime is scoped to the transaction closure.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explanation:
Modified add_config_version:

pub fn add_config_version<'a, T>(
    state: &'a Data<AppState>,
    tags: Option<Vec<String>>,
    description: Description,
    db_conn: &'a mut DBConnection,
    workspace_context: &'a WorkspaceContext,
    payload: &'a T,
) -> superposition::Result<(i64, Pin<Box<dyn Future<Output = bool> + Send + 'a>>)>
where
    T: Serialize + Send + Sync,
{
    use config_versions::dsl::config_versions;
    let version_id = generate_snowflake_id(state)?;
    let config = generate_cac(db_conn, &workspace_context.schema_name)?;
    let json_config = json!(config);
    let config_hash = blake3::hash(json_config.to_string().as_bytes()).to_string();

    let config_version = ConfigVersion {
        id: version_id,
        config: json_config,
        config_hash,
        tags,
        created_at: Utc::now(),
        description,
    };
    diesel::insert_into(config_versions)
        .values(&config_version)
        .returning(ConfigVersion::as_returning())
        .schema_name(&workspace_context.schema_name)
        .execute(db_conn)?;

    let webhook_future = Box::pin(async move {
        execute_webhook_call(
            payload,
            &Some(version_id.to_string()),
            workspace_context,
            WebhookEvent::ConfigChanged,
            state,
            db_conn,
        )
        .await
    });

    Ok((version_id, webhook_future))
}

Sample Usage:

#[authorized]
#[put("")]
async fn create_handler(
    workspace_context: WorkspaceContext,
    state: Data<AppState>,
    custom_headers: CustomHeaders,
    req: Json<PutRequest>,
    mut db_conn: DbConnection,
    user: User,
) -> superposition::Result<HttpResponse> {
    let tags = parse_config_tags(custom_headers.config_tags)?;
    let description = match req.description.clone() {
        Some(val) => val,
        None => {
            // TODO: get rid of `query_description` function altogether
            let resp = query_description(
                Value::Object(req.context.clone().into_inner().into()),
                &mut db_conn,
                &workspace_context.schema_name,
            );
            match resp {
                Err(AppError::DbError(diesel::result::Error::NotFound)) => {
                    return Err(bad_argument!(
                        "Description is required when context does not exist"
                    ));
                }
                Ok(desc) => desc,
                Err(e) => return Err(e),
            }
        }
    };
    let req_change_reason = req.change_reason.clone();

    validate_change_reason(
        &workspace_context,
        &req_change_reason,
        &mut db_conn,
        &state.master_encryption_key,
    )?;

    let (put_response, version_id, webhook_future) = db_conn
        .transaction::<_, superposition::AppError, _>(|transaction_conn| {
            let put_response = operations::upsert(
                req.into_inner(),
                description,
                transaction_conn,
                true,
                &user,
                &workspace_context,
                false,
                &state.master_encryption_key,
            )
            .map_err(|err: superposition::AppError| {
                log::error!("context put failed with error: {:?}", err);
                err
            })?;

            let (version_id, future_webhook) = add_config_version(
                &state,
                tags,
                req_change_reason.into(),
                transaction_conn,
                &workspace_context,
                &put_response,
            )?;
            Ok((put_response, version_id, future_webhook))
        })?;

    let webhook_status = webhook_future.await;

    let mut http_resp = if webhook_status {
        HttpResponse::Ok()
    } else {
        HttpResponse::build(
            actix_web::http::StatusCode::from_u16(512)
                .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR),
        )
    };

    http_resp.insert_header((
        AppHeader::XConfigVersion.to_string(),
        version_id.to_string(),
    ));

    #[cfg(feature = "high-performance-mode")]
    put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn)
        .await?;

    Ok(http_resp.json(put_response))
}

You will get an error saying :

lifetime may not live long enough
returning this value requires that `'1` must outlive `'2`

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that the future borrows transaction_conn, which only lives for the duration of the transaction closure — but we're trying to return the future out of the closure and .await it after the transaction completes.
We can't borrow transaction_conn in a future that outlives the transaction.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need db conn with async stuff ?
The only async part is API call, which does not even require db conn

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use diesel::{
};
use serde_json::{Map, Value};
use service_utils::{
helpers::{fetch_dimensions_info_map, parse_config_tags},
helpers::{execute_webhook_call, fetch_dimensions_info_map, parse_config_tags},
service::types::{
AppHeader, AppState, CustomHeaders, DbConnection, WorkspaceContext,
},
Expand All @@ -36,7 +36,7 @@ use superposition_types::{
QueryMap,
},
database::{
models::{ChangeReason, Description, cac::Context},
models::{ChangeReason, Description, cac::Context, others::WebhookEvent},
schema::contexts::{self, id},
},
logic::evaluate_local_cohorts_skip_unresolved,
Expand Down Expand Up @@ -135,19 +135,34 @@ async fn create_handler(
Ok((put_response, version_id))
})?;

let mut http_resp = HttpResponse::Ok();
let DbConnection(mut conn) = db_conn;
let webhook_status = execute_webhook_call(
&put_response,
&Some(version_id.to_string()),
&workspace_context,
WebhookEvent::ConfigChanged,
&state,
&mut conn,
)
.await;

let mut http_resp = if webhook_status {
HttpResponse::Ok()
} else {
HttpResponse::build(
actix_web::http::StatusCode::from_u16(512)
.unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR),
)
};

http_resp.insert_header((
AppHeader::XConfigVersion.to_string(),
version_id.to_string(),
));

#[cfg(feature = "high-performance-mode")]
{
let DbConnection(mut conn) = db_conn;
put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn)
.await?;
}
put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn)
.await?;

Ok(http_resp.json(put_response))
}
Expand Down Expand Up @@ -197,19 +212,35 @@ async fn update_handler(
)?;
Ok((override_resp, version_id))
})?;
let mut http_resp = HttpResponse::Ok();

let DbConnection(mut conn) = db_conn;
let webhook_status = execute_webhook_call(
&override_resp,
&Some(version_id.to_string()),
&workspace_context,
WebhookEvent::ConfigChanged,
&state,
&mut conn,
)
.await;

let mut http_resp = if webhook_status {
HttpResponse::Ok()
} else {
HttpResponse::build(
actix_web::http::StatusCode::from_u16(512)
.unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR),
)
};

http_resp.insert_header((
AppHeader::XConfigVersion.to_string(),
version_id.to_string(),
));

#[cfg(feature = "high-performance-mode")]
{
let DbConnection(mut conn) = db_conn;
put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn)
.await?;
}
put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn)
.await?;

Ok(http_resp.json(override_resp))
}
Expand Down Expand Up @@ -282,19 +313,35 @@ async fn move_handler(

Ok((move_response, version_id))
})?;
let mut http_resp = HttpResponse::Ok();

let DbConnection(mut conn) = db_conn;
let webhook_status = execute_webhook_call(
&move_response,
&Some(version_id.to_string()),
&workspace_context,
WebhookEvent::ConfigChanged,
&state,
&mut conn,
)
.await;

let mut http_resp = if webhook_status {
HttpResponse::Ok()
} else {
HttpResponse::build(
actix_web::http::StatusCode::from_u16(512)
.unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR),
)
};

http_resp.insert_header((
AppHeader::XConfigVersion.to_string(),
version_id.to_string(),
));

#[cfg(feature = "high-performance-mode")]
{
let DbConnection(mut conn) = db_conn;
put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn)
.await?;
}
put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn)
.await?;

Ok(http_resp.json(move_response))
}
Expand Down Expand Up @@ -524,14 +571,32 @@ async fn delete_handler(
Ok(version_id)
})?;

let DbConnection(mut conn) = db_conn;
let webhook_change_reason = format!("Deleted context by {}", user.username);
let webhook_status = execute_webhook_call(
&webhook_change_reason,
&Some(version_id.to_string()),
&workspace_context,
WebhookEvent::ConfigChanged,
&state,
&mut conn,
)
.await;

#[cfg(feature = "high-performance-mode")]
{
let DbConnection(mut conn) = db_conn;
put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn)
.await?;
}
put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn)
.await?;

let mut http_resp = if webhook_status {
HttpResponse::NoContent()
} else {
HttpResponse::build(
actix_web::http::StatusCode::from_u16(512)
.unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR),
)
};

Ok(HttpResponse::NoContent()
Ok(http_resp
.insert_header((
AppHeader::XConfigVersion.to_string().as_str(),
version_id.to_string().as_str(),
Expand Down Expand Up @@ -728,13 +793,29 @@ async fn bulk_operations_handler(
)?;
Ok((response, version_id))
})?;
let mut resp_builder = HttpResponse::Ok();
let webhook_status = execute_webhook_call(
&response,
&Some(version_id.to_string()),
&workspace_context,
WebhookEvent::ConfigChanged,
&state,
&mut conn,
)
.await;

let mut resp_builder = if webhook_status {
HttpResponse::Ok()
} else {
HttpResponse::build(
actix_web::http::StatusCode::from_u16(512)
.unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR),
)
};
resp_builder.insert_header((
AppHeader::XConfigVersion.to_string(),
version_id.to_string(),
));

// Commit the transaction
#[cfg(feature = "high-performance-mode")]
put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn)
.await?;
Expand Down Expand Up @@ -824,6 +905,17 @@ async fn weight_recompute_handler(
let version_id = add_config_version(&state, tags, config_version_desc, transaction_conn, &workspace_context.schema_name)?;
Ok(version_id)
})?;

let webhook_status = execute_webhook_call(
&response,
&Some(config_version_id.to_string()),
&workspace_context,
WebhookEvent::ConfigChanged,
&state,
&mut conn,
)
.await;

#[cfg(feature = "high-performance-mode")]
put_config_in_redis(
config_version_id,
Expand All @@ -833,7 +925,14 @@ async fn weight_recompute_handler(
)
.await?;

let mut http_resp = HttpResponse::Ok();
let mut http_resp = if webhook_status {
HttpResponse::Ok()
} else {
HttpResponse::build(
actix_web::http::StatusCode::from_u16(512)
.unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR),
)
};
http_resp.insert_header((
AppHeader::XConfigVersion.to_string(),
config_version_id.to_string(),
Expand Down
63 changes: 60 additions & 3 deletions crates/context_aware_config/src/api/default_config/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use diesel::{
use jsonschema::ValidationError;
use serde_json::Value;
use service_utils::{
helpers::parse_config_tags,
helpers::{execute_webhook_call, parse_config_tags},
service::types::{
AppHeader, AppState, CustomHeaders, DbConnection, EncryptionKey, SchemaName,
WorkspaceContext,
Expand All @@ -35,6 +35,7 @@ use superposition_types::{
models::{
Description,
cac::{self as models, Context, DefaultConfig, FunctionType},
others::WebhookEvent,
},
schema::{self, contexts::dsl::contexts, default_configs::dsl},
},
Expand Down Expand Up @@ -164,10 +165,28 @@ async fn create_handler(
Ok(version_id)
})?;

let webhook_status = execute_webhook_call(
&default_config,
&Some(version_id.to_string()),
&workspace_context,
WebhookEvent::ConfigChanged,
&state,
&mut conn,
)
.await;

#[cfg(feature = "high-performance-mode")]
put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn)
.await?;
let mut http_resp = HttpResponse::Ok();

let mut http_resp = if webhook_status {
HttpResponse::Ok()
} else {
HttpResponse::build(
actix_web::http::StatusCode::from_u16(512)
.unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR),
)
};

http_resp.insert_header((
AppHeader::XConfigVersion.to_string(),
Expand Down Expand Up @@ -296,11 +315,28 @@ async fn update_handler(
Ok((val, version_id))
})?;

let webhook_status = execute_webhook_call(
&db_row,
&Some(version_id.to_string()),
&workspace_context,
WebhookEvent::ConfigChanged,
&state,
&mut conn,
)
.await;

#[cfg(feature = "high-performance-mode")]
put_config_in_redis(version_id, state, &workspace_context.schema_name, &mut conn)
.await?;

let mut http_resp = HttpResponse::Ok();
let mut http_resp = if webhook_status {
HttpResponse::Ok()
} else {
HttpResponse::build(
actix_web::http::StatusCode::from_u16(512)
.unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR),
)
};
http_resp.insert_header((
AppHeader::XConfigVersion.to_string(),
version_id.to_string(),
Expand Down Expand Up @@ -518,6 +554,19 @@ async fn delete_handler(
});

if resp.is_ok() {
let webhook_payload = serde_json::json!({
"key": key,
}); // passing the deleted key here
let webhook_status = execute_webhook_call(
&webhook_payload,
&Some(version_id.to_string()),
&workspace_context,
WebhookEvent::ConfigChanged,
&state,
&mut conn,
)
.await;

#[cfg(feature = "high-performance-mode")]
put_config_in_redis(
version_id,
Expand All @@ -526,6 +575,14 @@ async fn delete_handler(
&mut conn,
)
.await?;

if !webhook_status {
return Ok(HttpResponse::build(
actix_web::http::StatusCode::from_u16(512)
.unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR),
)
.finish());
}
}
resp
} else {
Expand Down
Loading
Loading