Skip to content

Commit 27ea61a

Browse files
committed
fix(hubspot): cursor-based seed for list_membership polling
1 parent 1181ecc commit 27ea61a

1 file changed

Lines changed: 133 additions & 98 deletions

File tree

apps/sim/lib/webhooks/polling/hubspot.ts

Lines changed: 133 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,19 @@ interface HubSpotWebhookConfig {
4444
maxRecordsPerPoll?: number
4545
lastSeenTimestampMs?: string
4646
lastSeenObjectId?: string
47-
/** List-membership cursor — the ISO joined-at of the last membership we emitted. */
48-
lastSeenMembershipTimestamp?: string
47+
/**
48+
* List-membership cursor — the `after` value to pass to the next
49+
* `/lists/{id}/memberships/join-order` request. The HubSpot endpoint walks
50+
* ASC by default (oldest first); we use cursor-based resume because
51+
* `before`-mode (DESC) has ambiguous bootstrap semantics.
52+
*/
53+
lastSeenMembershipCursor?: string
54+
/**
55+
* True once we've walked to the end of the list once. While this is false we
56+
* are still in the seed pass — we paginate forward without emitting so the
57+
* workflow doesn't see a flood of historical members on activation.
58+
*/
59+
membershipSeedComplete?: boolean
4960
/** Snapshot of the watched property's last-seen value per record (property_changed event). */
5061
propertySnapshot?: {
5162
property: string
@@ -334,60 +345,69 @@ async function pollListMembership(
334345
throw new Error(`HubSpot list_membership trigger ${webhookId} is missing listId`)
335346
}
336347
const nowMs = Date.now()
337-
const watermark = config.lastSeenMembershipTimestamp
338-
339-
// First poll: capture the current head of the list and emit nothing.
340-
if (!watermark) {
341-
const head = await fetchListMembershipHead(listId, accessToken, requestId, logger)
342-
await updateWebhookProviderConfig(
343-
webhookId,
344-
{
345-
lastSeenMembershipTimestamp: head ?? new Date(nowMs).toISOString(),
346-
lastCheckedTimestamp: new Date(nowMs).toISOString(),
347-
},
348-
logger
349-
)
350-
await markWebhookSuccess(webhookId, logger)
351-
logger.info(`[${requestId}] Seeded HubSpot list_membership ${webhookId} watermark to ${head}`)
352-
return 'success'
353-
}
354-
348+
const seedComplete = config.membershipSeedComplete === true
355349
const maxRecords = Math.min(
356350
Math.max(config.maxRecordsPerPoll ?? DEFAULT_MAX_RECORDS, 1),
357351
MAX_MAX_RECORDS
358352
)
359-
const memberships = await fetchListMembershipsSince(
353+
354+
// The HubSpot endpoint walks ASC by default. We resume from a stored `after` cursor —
355+
// empty cursor means "from the beginning of the list". During the seed pass we paginate
356+
// forward without emitting; once we reach the end (no `paging.next.after`) we mark the
357+
// seed complete and re-fetch from the cursor-to-last-page on each normal poll. New
358+
// members appended to the list show up on subsequent fetches; the idempotency layer
359+
// dedups the records we've already seen on the boundary page.
360+
const result = await fetchListMembershipPages({
360361
listId,
361-
watermark,
362-
maxRecords,
363362
accessToken,
363+
initialAfter: config.lastSeenMembershipCursor?.trim() || undefined,
364+
pageLimit: seedComplete ? maxRecords : MAX_PAGES_PER_POLL * HUBSPOT_PAGE_LIMIT,
364365
requestId,
365-
logger
366-
)
366+
logger,
367+
})
368+
369+
if (!seedComplete) {
370+
// Seed phase: don't emit. Just save the cursor and (when reached) flip the flag.
371+
const update: Record<string, unknown> = {
372+
lastCheckedTimestamp: new Date(nowMs).toISOString(),
373+
lastSeenMembershipCursor: result.resumeCursor ?? '',
374+
}
375+
if (result.reachedEnd) {
376+
update.membershipSeedComplete = true
377+
logger.info(
378+
`[${requestId}] HubSpot list_membership ${webhookId} seed complete (list=${listId})`
379+
)
380+
} else {
381+
logger.info(
382+
`[${requestId}] HubSpot list_membership ${webhookId} seed in progress (list=${listId}, scanned ${result.scanned})`
383+
)
384+
}
385+
await updateWebhookProviderConfig(webhookId, update, logger)
386+
await markWebhookSuccess(webhookId, logger)
387+
return 'success'
388+
}
367389

368-
if (memberships.length === 0) {
390+
if (result.records.length === 0) {
369391
await updateWebhookProviderConfig(
370392
webhookId,
371-
{ lastCheckedTimestamp: new Date(nowMs).toISOString() },
393+
{
394+
lastCheckedTimestamp: new Date(nowMs).toISOString(),
395+
lastSeenMembershipCursor: result.resumeCursor ?? '',
396+
},
372397
logger
373398
)
374399
await markWebhookSuccess(webhookId, logger)
375-
logger.info(`[${requestId}] No new HubSpot list_membership for webhook ${webhookId}`)
376400
return 'success'
377401
}
378402

379403
logger.info(
380-
`[${requestId}] Found ${memberships.length} new HubSpot list memberships for webhook ${webhookId}`
404+
`[${requestId}] Found ${result.records.length} HubSpot list memberships for webhook ${webhookId}`
381405
)
382406

383407
let processedCount = 0
384408
let failedCount = 0
385-
// Memberships are pre-sorted ASC by membershipTimestamp; freeze the cursor at the first
386-
// failure so the failed item and everything after it retries on the next poll.
387-
let highestTs = watermark
388-
let cursorFrozen = false
389409

390-
for (const member of memberships) {
410+
for (const member of result.records) {
391411
try {
392412
await pollingIdempotency.executeWithIdempotency(
393413
'hubspot',
@@ -401,38 +421,41 @@ async function pollListMembership(
401421
listId,
402422
timestamp: new Date().toISOString(),
403423
}
404-
const result = await processPolledWebhookEvent(
424+
const wfResult = await processPolledWebhookEvent(
405425
webhookData,
406426
workflowData,
407427
payload,
408428
requestId
409429
)
410-
if (!result.success) {
430+
if (!wfResult.success) {
411431
throw new Error(
412-
`Webhook processing failed (${result.statusCode}): ${result.error ?? 'unknown'}`
432+
`Webhook processing failed (${wfResult.statusCode}): ${wfResult.error ?? 'unknown'}`
413433
)
414434
}
415435
return { recordId: member.recordId, processed: true }
416436
}
417437
)
418438
processedCount++
419-
if (!cursorFrozen && compareIsoTimestamps(member.membershipTimestamp, highestTs) > 0) {
420-
highestTs = member.membershipTimestamp
421-
}
422439
} catch (error) {
423440
failedCount++
424-
cursorFrozen = true
425441
logger.error(
426442
`[${requestId}] Error processing HubSpot list membership ${member.recordId}:`,
427443
getErrorMessage(error, 'Unknown error')
428444
)
429445
}
430446
}
431447

448+
// HubSpot's `paging.next.after` is page-granular — there's no per-record cursor we can
449+
// freeze on. Advance the cursor only when the entire batch succeeded; otherwise replay
450+
// the page next poll and let idempotency dedup the records that already landed.
451+
const advanceCursor = failedCount === 0
452+
const nextCursor = advanceCursor
453+
? (result.resumeCursor ?? '')
454+
: (config.lastSeenMembershipCursor?.trim() ?? '')
432455
await updateWebhookProviderConfig(
433456
webhookId,
434457
{
435-
lastSeenMembershipTimestamp: highestTs,
458+
lastSeenMembershipCursor: nextCursor,
436459
lastCheckedTimestamp: new Date(nowMs).toISOString(),
437460
},
438461
logger
@@ -821,89 +844,101 @@ async function processRecords(
821844
}
822845
}
823846

824-
interface ListMembership {
847+
interface ListMembershipRecord {
825848
recordId: string
826849
membershipTimestamp: string
827850
}
828851

829-
async function fetchListMembershipHead(
830-
listId: string,
831-
accessToken: string,
832-
requestId: string,
852+
interface FetchListMembershipPagesArgs {
853+
listId: string
854+
accessToken: string
855+
initialAfter: string | undefined
856+
/** Soft cap on emitted records (normal mode) or pages × HUBSPOT_PAGE_LIMIT (seed mode). */
857+
pageLimit: number
858+
requestId: string
833859
logger: Logger
834-
): Promise<string | null> {
835-
const url = `https://api.hubapi.com/crm/v3/lists/${encodeURIComponent(listId)}/memberships/join-order?limit=1`
836-
const response = await fetch(url, { headers: { Authorization: `Bearer ${accessToken}` } })
837-
if (!response.ok) {
838-
const errorText = await response.text().catch(() => '')
839-
logger.error(
840-
`[${requestId}] HubSpot list memberships head fetch failed ${response.status}: ${errorText}`
841-
)
842-
throw new Error(`HubSpot list memberships head fetch ${response.status}`)
843-
}
844-
const data = (await response.json()) as { results?: ListMembership[] }
845-
return data.results?.[0]?.membershipTimestamp ?? null
846860
}
847861

848-
async function fetchListMembershipsSince(
849-
listId: string,
850-
watermark: string,
851-
maxRecords: number,
852-
accessToken: string,
853-
requestId: string,
854-
logger: Logger
855-
): Promise<ListMembership[]> {
856-
// HubSpot returns members in join-order ASC. We paginate until either we find a member
857-
// with a join timestamp <= watermark or we hit the per-poll cap.
858-
const collected: ListMembership[] = []
859-
let after: string | undefined
862+
interface FetchListMembershipPagesResult {
863+
records: ListMembershipRecord[]
864+
/** Cursor to persist for the next poll; empty if we are at the very start. */
865+
resumeCursor: string | undefined
866+
/** True when the API stopped returning `paging.next.after` — we've consumed everything. */
867+
reachedEnd: boolean
868+
/** Total records scanned across pages (useful for seed-progress logging). */
869+
scanned: number
870+
}
871+
872+
/**
873+
* Walks `/lists/{listId}/memberships/join-order` ASC from an optional cursor until either
874+
* (a) the per-poll page cap is reached or (b) the API stops returning a next cursor.
875+
*
876+
* The endpoint returns members in ascending join-order by default. We never use `before`
877+
* (DESC mode) because its bootstrap semantics — what value to pass for "newest first" —
878+
* are undocumented in HubSpot's SDK type and behave inconsistently in practice. ASC + a
879+
* stored cursor is provably correct and lets new members appear on subsequent polls as
880+
* they're appended past our cursor's position.
881+
*/
882+
async function fetchListMembershipPages(
883+
args: FetchListMembershipPagesArgs
884+
): Promise<FetchListMembershipPagesResult> {
885+
const { listId, accessToken, initialAfter, pageLimit, requestId, logger } = args
886+
887+
const records: ListMembershipRecord[] = []
888+
let after: string | undefined = initialAfter
860889
let pages = 0
890+
let reachedEnd = false
891+
let scanned = 0
861892

862-
do {
863-
const params = new URLSearchParams({ limit: String(Math.min(maxRecords, 100)) })
893+
while (pages < MAX_PAGES_PER_POLL) {
894+
const params = new URLSearchParams({ limit: String(HUBSPOT_PAGE_LIMIT) })
864895
if (after) params.set('after', after)
865896
const url = `https://api.hubapi.com/crm/v3/lists/${encodeURIComponent(listId)}/memberships/join-order?${params.toString()}`
866897
const response = await fetch(url, { headers: { Authorization: `Bearer ${accessToken}` } })
898+
867899
if (!response.ok) {
868900
const errorText = await response.text().catch(() => '')
869901
logger.error(
870902
`[${requestId}] HubSpot list memberships fetch failed ${response.status}: ${errorText}`
871903
)
872-
throw new Error(`HubSpot list memberships fetch ${response.status}`)
904+
throw new Error(
905+
`HubSpot list memberships fetch ${response.status}: ${errorText.slice(0, 500)}`
906+
)
873907
}
908+
874909
const data = (await response.json()) as {
875-
results?: ListMembership[]
910+
results?: Array<{ recordId: string; membershipTimestamp: string }>
876911
paging?: { next?: { after?: string } }
877912
}
878-
for (const m of data.results ?? []) {
879-
if (compareIsoTimestamps(m.membershipTimestamp, watermark) > 0) {
880-
collected.push(m)
881-
}
913+
const batch = data.results ?? []
914+
scanned += batch.length
915+
const nextAfter = data.paging?.next?.after
916+
917+
for (const m of batch) {
918+
records.push({
919+
recordId: m.recordId,
920+
membershipTimestamp: m.membershipTimestamp,
921+
})
882922
}
883-
after = data.paging?.next?.after
923+
884924
pages++
885-
if (collected.length >= maxRecords) break
886-
if (pages >= MAX_PAGES_PER_POLL) {
887-
logger.warn(
888-
`[${requestId}] HubSpot list-membership poll hit MAX_PAGES_PER_POLL — remaining will roll over`
889-
)
925+
if (!nextAfter) {
926+
reachedEnd = true
890927
break
891928
}
892-
} while (after)
893-
894-
collected.sort((a, b) => compareIsoTimestamps(a.membershipTimestamp, b.membershipTimestamp))
895-
return collected.slice(0, maxRecords)
896-
}
929+
after = nextAfter
930+
if (records.length >= pageLimit) break
931+
}
897932

898-
function compareIsoTimestamps(a: string, b: string): number {
899-
const aMs = Date.parse(a)
900-
const bMs = Date.parse(b)
901-
if (Number.isFinite(aMs) && Number.isFinite(bMs)) {
902-
if (aMs === bMs) return 0
903-
return aMs > bMs ? 1 : -1
933+
return {
934+
records,
935+
// If we walked to the end, hold onto the cursor we LAST used so the next poll re-fetches
936+
// the tail page and picks up any members appended since. If we stopped early, the next
937+
// cursor walks us forward through the rest of the list.
938+
resumeCursor: after,
939+
reachedEnd,
940+
scanned,
904941
}
905-
if (a === b) return 0
906-
return a > b ? 1 : -1
907942
}
908943

909944
/** Numeric compare for HubSpot ids (decimal strings, treated numerically by GT/LT). */

0 commit comments

Comments
 (0)