From 72ac5c3571ca35f78ef113f144340360378933a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 17 Apr 2026 14:09:59 +0200 Subject: [PATCH 1/4] fix: ignore results that have no activity data MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../apps/data_sink_worker/src/service/activity.service.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 2c87ad2ea5..35e72f6fe3 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -271,11 +271,8 @@ export default class ActivityService extends LoggerBase { // propagates out of prepareMemberData and crashes the entire batch, marking all other // results in the batch with the same error even though they are valid. if (!activity) { - this.log.error({ platform }, 'Activity data is missing.') - results.set(resultId, { - success: false, - err: new UnrepeatableError('Activity data is missing.'), - }) + this.log.warn({ platform }, 'Activity data is missing, skipping and marking as processed.') + results.set(resultId, { success: true }) continue } From bfda5ecb7fed8981234983efe12c86e444895cea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 17 Apr 2026 14:33:06 +0200 Subject: [PATCH 2/4] fix: bugfix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../data_sink_worker/src/service/activity.service.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 35e72f6fe3..b202b20621 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -432,7 +432,14 @@ export default class ActivityService extends LoggerBase { ): Promise> { const resultMap = new Map() - let relevantPayloads = payloads + let relevantPayloads = payloads.filter((p) => { + if (!p.activity) { + this.log.warn({ platform: p.platform }, 'Activity data is missing, skipping and marking as processed.') + resultMap.set(p.resultId, { success: true }) + return false + } + return true + }) this.log.trace(`[ACTIVITY] Processing ${relevantPayloads.length} activities!`) const prepareMemberResults = this.prepareMemberData(relevantPayloads) From 9be56e29a4eaf897cb0071a49e5457ea15625840 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 17 Apr 2026 14:51:34 +0200 Subject: [PATCH 3/4] fix: bugfix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../apps/data_sink_worker/src/service/activity.service.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index b202b20621..3c99569c5c 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -434,7 +434,10 @@ export default class ActivityService extends LoggerBase { let relevantPayloads = payloads.filter((p) => { if (!p.activity) { - this.log.warn({ platform: p.platform }, 'Activity data is missing, skipping and marking as processed.') + this.log.warn( + { platform: p.platform }, + 'Activity data is missing, skipping and marking as processed.', + ) resultMap.set(p.resultId, { success: true }) return false } From e7f3f57c1fab2b0fa516af420979a797694b4081 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 17 Apr 2026 15:32:11 +0200 Subject: [PATCH 4/4] fix: bugfix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../apps/data_sink_worker/src/service/activity.service.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 3c99569c5c..9c0ba07d2f 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -435,7 +435,12 @@ export default class ActivityService extends LoggerBase { let relevantPayloads = payloads.filter((p) => { if (!p.activity) { this.log.warn( - { platform: p.platform }, + { + resultId: p.resultId, + integrationId: p.integrationId, + segmentId: p.segmentId, + platform: p.platform, + }, 'Activity data is missing, skipping and marking as processed.', ) resultMap.set(p.resultId, { success: true })