Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ pub struct AppState {
pub linear_client: Option<LinearClient>,
pub lark_verification_token: Option<String>,
pub update_debounce: DebounceMap,
pub debounce_delay_ms: u64,
}
40 changes: 21 additions & 19 deletions src/debounce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ use tokio::sync::{oneshot, Mutex};

use crate::models::Issue;

/// How long to wait after the last update before firing the notification.
/// Burst duplicates from Linear arrive within ~100ms of each other; 500ms
/// is comfortably above that while keeping notifications responsive.
pub const DEBOUNCE_MS: u64 = 500;

pub struct PendingUpdate {
/// Whether this debounce batch originated from a create event.
pub is_create: bool,
/// Latest issue state (replaced on every new update for this issue).
pub issue: Issue,
pub url: String,
Expand Down Expand Up @@ -43,29 +40,34 @@ impl DebounceMap {
url: String,
changes: Vec<String>,
dm_email: Option<String>,
is_create: bool,
) -> oneshot::Receiver<()> {
let mut map = self.0.lock().await;

let (merged_changes, merged_dm_email) = if let Some(existing) = map.remove(&issue_id) {
// Cancel the old timer task.
let _ = existing.cancel_tx.send(());
// Accumulate change descriptions; skip exact duplicates.
let mut all = existing.changes;
for c in &changes {
if !all.contains(c) {
all.push(c.clone());
let (merged_is_create, merged_changes, merged_dm_email) =
if let Some(existing) = map.remove(&issue_id) {
// Cancel the old timer task.
let _ = existing.cancel_tx.send(());
// A create followed by updates is still a "create".
let merged_create = existing.is_create || is_create;
// Accumulate change descriptions; skip exact duplicates.
let mut all = existing.changes;
for c in &changes {
if !all.contains(c) {
all.push(c.clone());
}
}
}
// Prefer the latest DM email if present.
(all, dm_email.or(existing.dm_email))
} else {
(changes, dm_email)
};
// Prefer the latest DM email if present.
(merged_create, all, dm_email.or(existing.dm_email))
} else {
(is_create, changes, dm_email)
};

let (cancel_tx, cancel_rx) = oneshot::channel();
map.insert(
issue_id,
PendingUpdate {
is_create: merged_is_create,
issue,
url,
changes: merged_changes,
Expand Down
79 changes: 52 additions & 27 deletions src/handlers/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing::{error, info, warn};

use crate::{
config::AppState,
debounce::{PendingUpdate, DEBOUNCE_MS},
debounce::PendingUpdate,
lark::{build_lark_card, CardEvent},
models::{Actor, CommentData, Issue, LinearPayload, UpdatedFrom},
utils::{build_change_fields, verify_signature},
Expand Down Expand Up @@ -63,23 +63,40 @@ pub async fn webhook_handler(
}
};
info!(
"processing Issue create – {} {}",
"queuing debounced Issue create – {} {}",
issue.identifier, issue.title
);

let card_msg = build_lark_card(&CardEvent::IssueCreated {
issue: &issue,
url: &payload.url,
});
let dm_email = issue.assignee.as_ref().and_then(|a| a.email.clone());

let issue_id = issue.id.clone();

// Phase 2: DM assignee on create if assignee is set
if let Some(ref assignee) = issue.assignee {
if let Some(ref email) = assignee.email {
super::try_dm_assignee(&state, &issue, &payload.url, email).await;
let cancel_rx = state
.update_debounce
.upsert(
issue_id.clone(),
issue,
payload.url.clone(),
vec![],
dm_email,
true,
)
.await;

let state2 = Arc::clone(&state);
let delay = state.debounce_delay_ms;
tokio::spawn(async move {
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_millis(delay)) => {
if let Some(p) = state2.update_debounce.take(&issue_id).await {
send_debounced_notification(&state2, p).await;
}
}
_ = cancel_rx => {}
}
}
});

card_msg
return StatusCode::OK;
}
("Issue", "update") => {
let issue: Issue = match serde_json::from_value(payload.data.clone()) {
Expand Down Expand Up @@ -118,7 +135,6 @@ pub async fn webhook_handler(

let issue_id = issue.id.clone();

// Merge with any pending update for this issue and (re)start the timer.
let cancel_rx = state
.update_debounce
.upsert(
Expand All @@ -127,21 +143,20 @@ pub async fn webhook_handler(
payload.url.clone(),
changes,
dm_email,
false,
)
.await;

// Spawn the timer task; whichever fires first wins.
let state2 = Arc::clone(&state);
let delay = state.debounce_delay_ms;
tokio::spawn(async move {
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_millis(DEBOUNCE_MS)) => {
_ = tokio::time::sleep(std::time::Duration::from_millis(delay)) => {
if let Some(p) = state2.update_debounce.take(&issue_id).await {
send_update_notification(&state2, p).await;
send_debounced_notification(&state2, p).await;
}
}
_ = cancel_rx => {
// A newer update cancelled this task; the replacement task will fire.
}
_ = cancel_rx => {}
}
});

Expand Down Expand Up @@ -190,20 +205,22 @@ pub async fn webhook_handler(
}

// ---------------------------------------------------------------------------
// Debounced update sender
// Debounced notification sender (handles both create and update)
// ---------------------------------------------------------------------------

async fn send_update_notification(state: &AppState, pending: PendingUpdate) {
async fn send_debounced_notification(state: &AppState, pending: PendingUpdate) {
let PendingUpdate {
is_create,
issue,
url,
changes,
dm_email,
..
} = pending;

let kind = if is_create { "create" } else { "update" };
info!(
"sending debounced update for {} – changes: {}",
"sending debounced {kind} for {} – changes: {}",
issue.identifier,
if changes.is_empty() {
"none".to_string()
Expand All @@ -212,11 +229,19 @@ async fn send_update_notification(state: &AppState, pending: PendingUpdate) {
}
);

let card_msg = build_lark_card(&CardEvent::IssueUpdated {
issue: &issue,
url: &url,
changes,
});
let card_msg = if is_create {
build_lark_card(&CardEvent::IssueCreated {
issue: &issue,
url: &url,
changes,
})
} else {
build_lark_card(&CardEvent::IssueUpdated {
issue: &issue,
url: &url,
changes,
})
};

send_lark_card(state, &card_msg).await;

Expand Down
21 changes: 19 additions & 2 deletions src/lark/cards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub enum CardEvent<'a> {
IssueCreated {
issue: &'a Issue,
url: &'a str,
changes: Vec<String>,
},
IssueUpdated {
issue: &'a Issue,
Expand All @@ -80,7 +81,11 @@ pub enum CardEvent<'a> {

pub fn build_lark_card(event: &CardEvent) -> LarkMessage {
match event {
CardEvent::IssueCreated { issue, url } => build_issue_created_card(issue, url),
CardEvent::IssueCreated {
issue,
url,
changes,
} => build_issue_created_card(issue, url, changes),
CardEvent::IssueUpdated {
issue,
url,
Expand All @@ -94,7 +99,7 @@ pub fn build_lark_card(event: &CardEvent) -> LarkMessage {
}
}

fn build_issue_created_card(issue: &Issue, url: &str) -> LarkMessage {
fn build_issue_created_card(issue: &Issue, url: &str, changes: &[String]) -> LarkMessage {
let color = priority_color(issue.priority);
let assignee_name = issue
.assignee
Expand Down Expand Up @@ -127,6 +132,18 @@ fn build_issue_created_card(issue: &Issue, url: &str) -> LarkMessage {
}
}

// Change lines (populated when a create is merged with subsequent updates)
if !changes.is_empty() {
let change_text = changes.join("\n");
elements.push(json!({
"tag": "div",
"text": {
"tag": "lark_md",
"content": change_text,
}
}));
}

elements.push(build_fields(
&issue.state.name,
&priority_display(issue.priority),
Expand Down
7 changes: 7 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ async fn main() {
info!("LARK_VERIFICATION_TOKEN set – event verification enabled");
}

let debounce_delay_ms: u64 = env::var("DEBOUNCE_DELAY_MS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(5000);
info!("debounce delay: {debounce_delay_ms}ms");

let state = Arc::new(AppState {
webhook_secret,
lark_webhook_url,
Expand All @@ -66,6 +72,7 @@ async fn main() {
linear_client,
lark_verification_token,
update_debounce: DebounceMap::new(),
debounce_delay_ms,
});

let app = Router::new()
Expand Down