Skip to content

Commit 624b89d

Browse files
committed
fix(hubspot): freeze cursor on failure + request full OAuth scopes
1 parent 1515ff9 commit 624b89d

2 files changed

Lines changed: 82 additions & 61 deletions

File tree

apps/sim/lib/webhooks/polling/hubspot.ts

Lines changed: 80 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -382,13 +382,12 @@ async function pollListMembership(
382382

383383
let processedCount = 0
384384
let failedCount = 0
385+
// Memberships are pre-sorted ASC by membershipTimestamp; freeze the cursor at the first
386+
// failure so the failed item and everything after it retries on the next poll.
385387
let highestTs = watermark
388+
let cursorFrozen = false
386389

387390
for (const member of memberships) {
388-
if (compareIsoTimestamps(member.membershipTimestamp, highestTs) > 0) {
389-
highestTs = member.membershipTimestamp
390-
}
391-
392391
try {
393392
await pollingIdempotency.executeWithIdempotency(
394393
'hubspot',
@@ -417,8 +416,12 @@ async function pollListMembership(
417416
}
418417
)
419418
processedCount++
419+
if (!cursorFrozen && compareIsoTimestamps(member.membershipTimestamp, highestTs) > 0) {
420+
highestTs = member.membershipTimestamp
421+
}
420422
} catch (error) {
421423
failedCount++
424+
cursorFrozen = true
422425
logger.error(
423426
`[${requestId}] Error processing HubSpot list membership ${member.recordId}:`,
424427
getErrorMessage(error, 'Unknown error')
@@ -710,24 +713,17 @@ async function processRecords(
710713
let skippedCount = 0
711714
let highestSeenMs = 0
712715
let maxIdAtHighestTimestamp = ''
716+
// Stop advancing the cursor at the first failure so that the failed record and all later
717+
// records (sorted ASC) get re-fetched on the next poll. Without this gate, a transient
718+
// failure on a record at a high timestamp would advance the cursor past it permanently.
719+
let cursorFrozen = false
713720

714721
for (const record of records) {
715722
const occurredAtMs = extractPropertyTimestampMs(record, filterProperty)
716-
if (Number.isFinite(occurredAtMs)) {
717-
if (occurredAtMs > highestSeenMs) {
718-
highestSeenMs = occurredAtMs
719-
maxIdAtHighestTimestamp = record.id
720-
} else if (occurredAtMs === highestSeenMs) {
721-
if (compareObjectIds(record.id, maxIdAtHighestTimestamp) > 0) {
722-
maxIdAtHighestTimestamp = record.id
723-
}
724-
}
725-
}
726723

727-
// property_changed semantics — diff against the per-record snapshot of the watched property.
728-
// First time we see a record, treat it as a change (matches Zapier's "Updated Property" behavior).
729724
let previousValue: string | null | undefined
730725
let propertyValue: string | null | undefined
726+
let handledBySkip = false
731727
if (eventType === 'property_changed' && targetProperty && snapshot) {
732728
propertyValue = record.properties?.[targetProperty] ?? null
733729
const had = Object.hasOwn(snapshot.values, record.id)
@@ -737,58 +733,82 @@ async function processRecords(
737733
delete snapshot.values[record.id]
738734
snapshot.values[record.id] = propertyValue ?? null
739735
skippedCount++
740-
continue
736+
handledBySkip = true
741737
}
742-
// Update snapshot now so subsequent records in this loop see it; persisted at end.
743-
snapshot.values[record.id] = propertyValue ?? null
738+
// Note: we do NOT pre-update the snapshot before processing. If emission fails the
739+
// record must re-fetch on the next poll AND still appear as a change vs. the prior
740+
// snapshot — otherwise we'd silently skip it on retry.
744741
}
745742

746-
try {
747-
await pollingIdempotency.executeWithIdempotency(
748-
'hubspot',
749-
`${webhookData.id}:${objectType}:${eventType}:${record.id}:${Number.isFinite(occurredAtMs) ? occurredAtMs : record.updatedAt}`,
750-
async () => {
751-
const payload: Record<string, unknown> = {
752-
objectType,
753-
eventType,
754-
objectId: record.id,
755-
occurredAt: Number.isFinite(occurredAtMs)
756-
? new Date(occurredAtMs).toISOString()
757-
: record.updatedAt,
758-
properties: record.properties,
759-
createdAt: record.createdAt,
760-
updatedAt: record.updatedAt,
761-
archived: record.archived,
762-
timestamp: new Date().toISOString(),
763-
}
764-
if (eventType === 'property_changed' && targetProperty) {
765-
payload.propertyName = targetProperty
766-
payload.propertyValue = propertyValue ?? null
767-
payload.previousValue = previousValue ?? null
768-
}
743+
let handledSuccessfully = handledBySkip
744+
if (!handledBySkip) {
745+
try {
746+
await pollingIdempotency.executeWithIdempotency(
747+
'hubspot',
748+
`${webhookData.id}:${objectType}:${eventType}:${record.id}:${Number.isFinite(occurredAtMs) ? occurredAtMs : record.updatedAt}`,
749+
async () => {
750+
const payload: Record<string, unknown> = {
751+
objectType,
752+
eventType,
753+
objectId: record.id,
754+
occurredAt: Number.isFinite(occurredAtMs)
755+
? new Date(occurredAtMs).toISOString()
756+
: record.updatedAt,
757+
properties: record.properties,
758+
createdAt: record.createdAt,
759+
updatedAt: record.updatedAt,
760+
archived: record.archived,
761+
timestamp: new Date().toISOString(),
762+
}
763+
if (eventType === 'property_changed' && targetProperty) {
764+
payload.propertyName = targetProperty
765+
payload.propertyValue = propertyValue ?? null
766+
payload.previousValue = previousValue ?? null
767+
}
769768

770-
const result = await processPolledWebhookEvent(
771-
webhookData,
772-
workflowData,
773-
payload,
774-
requestId
775-
)
776-
if (!result.success) {
777-
throw new Error(
778-
`Webhook processing failed (${result.statusCode}): ${result.error ?? 'unknown'}`
769+
const result = await processPolledWebhookEvent(
770+
webhookData,
771+
workflowData,
772+
payload,
773+
requestId
779774
)
775+
if (!result.success) {
776+
throw new Error(
777+
`Webhook processing failed (${result.statusCode}): ${result.error ?? 'unknown'}`
778+
)
779+
}
780+
return { recordId: record.id, processed: true }
780781
}
781-
return { recordId: record.id, processed: true }
782+
)
783+
784+
processedCount++
785+
handledSuccessfully = true
786+
if (eventType === 'property_changed' && targetProperty && snapshot) {
787+
snapshot.values[record.id] = propertyValue ?? null
782788
}
783-
)
789+
} catch (error) {
790+
failedCount++
791+
cursorFrozen = true
792+
logger.error(
793+
`[${requestId}] Error processing HubSpot ${objectType} ${record.id}:`,
794+
getErrorMessage(error, 'Unknown error')
795+
)
796+
}
797+
}
784798

785-
processedCount++
786-
} catch (error) {
787-
failedCount++
788-
logger.error(
789-
`[${requestId}] Error processing HubSpot ${objectType} ${record.id}:`,
790-
getErrorMessage(error, 'Unknown error')
791-
)
799+
// Advance the cursor only for records handled (emitted or intentionally skipped) WITHOUT
800+
// any prior failure in this batch. Records are pre-sorted (timestamp ASC, id ASC), so
801+
// the watermark we persist is the highest contiguously-successful (timestamp, id) pair.
802+
// Anything after the first failure stays unfrozen so it gets re-fetched next poll.
803+
if (handledSuccessfully && !cursorFrozen && Number.isFinite(occurredAtMs)) {
804+
if (occurredAtMs > highestSeenMs) {
805+
highestSeenMs = occurredAtMs
806+
maxIdAtHighestTimestamp = record.id
807+
} else if (occurredAtMs === highestSeenMs) {
808+
if (compareObjectIds(record.id, maxIdAtHighestTimestamp) > 0) {
809+
maxIdAtHighestTimestamp = record.id
810+
}
811+
}
792812
}
793813
}
794814

apps/sim/triggers/hubspot/poller.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
hubspotPipelinesSelectorContract,
88
hubspotPropertiesSelectorContract,
99
} from '@/lib/api/contracts/selectors/hubspot'
10+
import { getScopesForService } from '@/lib/oauth/utils'
1011
import { isCredentialSetValue } from '@/executor/constants'
1112
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
1213
import type { TriggerConfig } from '@/triggers/types'
@@ -42,7 +43,7 @@ export const hubspotPollingTrigger: TriggerConfig = {
4243
type: 'oauth-input',
4344
description: 'Connect a HubSpot account so Sim can poll your CRM on your behalf.',
4445
serviceId: 'hubspot',
45-
requiredScopes: [],
46+
requiredScopes: getScopesForService('hubspot'),
4647
required: true,
4748
mode: 'trigger',
4849
supportsCredentialSets: true,

0 commit comments

Comments
 (0)