diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 2c87ad2ea5..9c0ba07d2f 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -271,11 +271,8 @@ export default class ActivityService extends LoggerBase { // propagates out of prepareMemberData and crashes the entire batch, marking all other // results in the batch with the same error even though they are valid. if (!activity) { - this.log.error({ platform }, 'Activity data is missing.') - results.set(resultId, { - success: false, - err: new UnrepeatableError('Activity data is missing.'), - }) + this.log.warn({ platform }, 'Activity data is missing, skipping and marking as processed.') + results.set(resultId, { success: true }) continue } @@ -435,7 +432,22 @@ export default class ActivityService extends LoggerBase { ): Promise> { const resultMap = new Map() - let relevantPayloads = payloads + let relevantPayloads = payloads.filter((p) => { + if (!p.activity) { + this.log.warn( + { + resultId: p.resultId, + integrationId: p.integrationId, + segmentId: p.segmentId, + platform: p.platform, + }, + 'Activity data is missing, skipping and marking as processed.', + ) + resultMap.set(p.resultId, { success: true }) + return false + } + return true + }) this.log.trace(`[ACTIVITY] Processing ${relevantPayloads.length} activities!`) const prepareMemberResults = this.prepareMemberData(relevantPayloads)