Commit 216c861

waleedlatif1 and claude committed
fix(data-drains): address PR review (update schema, S3 date partition, self-hosted gate)
- update body schema: drop the discriminated-union-with-.optional() that silently required destinationType for any non-undefined body. The route already validates destination payloads against the typed configSchema/credentialsSchema for the existing drain, so the contract is now a flat partial; clients can send {enabled:false} without supplying destinationType
- S3 buildKey: partition by run startedAt instead of new Date() per chunk so a single run that crosses midnight still lands under one YYYY/MM/DD prefix
- self-hosted gate: wire DATA_DRAINS_ENABLED into authorizeDrainAccess and the cron dispatcher route so the docs claim ("reserved for server-side feature gating") is actually enforced: mutating endpoints 404 and the dispatcher no-ops when unset

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 0fd2c7f commit 216c861

8 files changed

Lines changed: 77 additions & 26 deletions

apps/docs/content/docs/en/enterprise/data-drains.mdx

Lines changed: 1 addition & 1 deletion
@@ -162,6 +162,6 @@ DATA_DRAINS_ENABLED=true
 NEXT_PUBLIC_DATA_DRAINS_ENABLED=true
 ```
 
-`NEXT_PUBLIC_DATA_DRAINS_ENABLED` shows the **Settings → Enterprise → Data Drains** page in the UI. `DATA_DRAINS_ENABLED` is reserved for server-side feature gating on self-hosted deployments. Both should be set to `true` together.
+`NEXT_PUBLIC_DATA_DRAINS_ENABLED` shows the **Settings → Enterprise → Data Drains** page in the UI. `DATA_DRAINS_ENABLED` gates the server-side mutating endpoints and the cron dispatcher — when unset on a self-hosted deployment, drain create/update/delete/run requests return `404` and the dispatcher is a no-op. Both should be set to `true` together.
 
 Data Drains otherwise rely on the standard Trigger.dev background job infrastructure used elsewhere in Sim — no additional setup is required. The cron dispatcher runs hourly and fans out due drains as background jobs.

apps/sim/app/api/cron/run-data-drains/route.ts

Lines changed: 8 additions & 0 deletions
@@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger'
 import { toError } from '@sim/utils/errors'
 import { type NextRequest, NextResponse } from 'next/server'
 import { verifyCronAuth } from '@/lib/auth/internal'
+import { isBillingEnabled, isDataDrainsEnabled } from '@/lib/core/config/feature-flags'
 import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
 import { dispatchDueDrains } from '@/lib/data-drains/dispatcher'
 
@@ -11,6 +12,13 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
   const authError = verifyCronAuth(request, 'Data drain dispatcher')
   if (authError) return authError
 
+  // Self-hosted opt-in: skip dispatch entirely when the deployment hasn't
+  // enabled drains. Sim Cloud (billing enabled) gates per-org by enterprise
+  // plan inside the dispatcher's join.
+  if (!isBillingEnabled && !isDataDrainsEnabled) {
+    return NextResponse.json({ success: true, dispatched: 0, skipped: 'disabled' })
+  }
+
   try {
     const result = await dispatchDueDrains()
     logger.info('Data drain dispatcher run complete', result)

apps/sim/lib/api/contracts/data-drains.ts

Lines changed: 14 additions & 17 deletions
@@ -73,23 +73,20 @@ export const createDataDrainBodySchema = z.intersection(
   dataDrainDestinationBodySchema
 )
 
-export const updateDataDrainBodySchema = z.intersection(
-  drainCommonBodyFieldsSchema.partial(),
-  z
-    .discriminatedUnion('destinationType', [
-      z.object({
-        destinationType: z.literal('s3'),
-        destinationConfig: s3ConfigBodySchema.optional(),
-        destinationCredentials: s3CredentialsBodySchema.optional(),
-      }),
-      z.object({
-        destinationType: z.literal('webhook'),
-        destinationConfig: webhookConfigBodySchema.optional(),
-        destinationCredentials: webhookCredentialsBodySchema.optional(),
-      }),
-    ])
-    .optional()
-)
+/**
+ * Update bodies are partial — every field is optional. We deliberately don't
+ * use a discriminated union here: clients sending `{ enabled: false }` should
+ * not be forced to also send `destinationType`. The route validates the
+ * destination payloads against the typed `configSchema` / `credentialsSchema`
+ * for the existing drain's destination type before persisting, so the
+ * structural shape is still enforced — just at the route layer rather than at
+ * the contract boundary.
+ */
+export const updateDataDrainBodySchema = drainCommonBodyFieldsSchema.partial().extend({
+  destinationType: dataDrainDestinationTypeSchema.optional(),
+  destinationConfig: z.record(z.string(), z.unknown()).optional(),
+  destinationCredentials: z.record(z.string(), z.unknown()).optional(),
+})
 
 const drainDestinationResponseSchema = z.discriminatedUnion('destinationType', [
   z.object({
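
Note on the schema change above: the following is a minimal sketch, not code from this commit, of what "validate at the route layer against the existing drain's destination type" could look like. It deliberately avoids zod so it runs standalone. The names `UpdateBody`, `configValidators`, and `validateUpdate`, and the `bucket` / `url` fields, are hypothetical stand-ins for the typed `configSchema` the commit message refers to. How the real route treats a `destinationType` that differs from the stored one is not shown in this diff, so the sketch ignores that field.

```typescript
type DestinationType = 's3' | 'webhook'

// Flat partial body: every field is optional, as in the new contract.
interface UpdateBody {
  enabled?: boolean
  destinationType?: DestinationType
  destinationConfig?: Record<string, unknown>
  destinationCredentials?: Record<string, unknown>
}

// Hypothetical per-destination checks; the field names are assumptions.
const configValidators: Record<DestinationType, (c: Record<string, unknown>) => boolean> = {
  s3: (c) => typeof c.bucket === 'string',
  webhook: (c) => typeof c.url === 'string',
}

type ValidationResult = { ok: true } | { ok: false; error: string }

// Validate only what the client sent, using the type of the drain already stored.
function validateUpdate(existingType: DestinationType, body: UpdateBody): ValidationResult {
  if (
    body.destinationConfig !== undefined &&
    !configValidators[existingType](body.destinationConfig)
  ) {
    return { ok: false, error: `Invalid ${existingType} destinationConfig` }
  }
  return { ok: true }
}

// A toggle-only update passes without any destination fields.
const toggleOnly = validateUpdate('s3', { enabled: false })
// A webhook-shaped config sent to an existing S3 drain is rejected.
const wrongShape = validateUpdate('s3', { destinationConfig: { url: 'https://example.com' } })
// The same config is accepted when the existing drain is a webhook.
const rightShape = validateUpdate('webhook', { destinationConfig: { url: 'https://example.com' } })
```

Under these assumptions the contract stays permissive while a mismatched payload is still rejected before persisting.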

apps/sim/lib/data-drains/access.ts

Lines changed: 17 additions & 1 deletion
@@ -4,7 +4,14 @@ import { and, eq } from 'drizzle-orm'
 import { NextResponse } from 'next/server'
 import { getSession } from '@/lib/auth'
 import { isOrganizationOnEnterprisePlan } from '@/lib/billing/core/subscription'
-import { isBillingEnabled } from '@/lib/core/config/feature-flags'
+import { isBillingEnabled, isDataDrainsEnabled } from '@/lib/core/config/feature-flags'
+
+/**
+ * On Sim Cloud (billing enabled), enterprise plan is the gate. On self-hosted
+ * deployments, owners opt in by setting `DATA_DRAINS_ENABLED=true` — and
+ * routes 404 until they do, so enterprise customers don't accidentally expose
+ * mutating endpoints just by deploying a newer image.
+ */
 
 export interface DrainAccessSession {
   user: {
@@ -53,6 +60,15 @@ export async function authorizeDrainAccess(
   }
 
   if (options.requireMutating) {
+    if (!isBillingEnabled && !isDataDrainsEnabled) {
+      return {
+        ok: false,
+        response: NextResponse.json(
+          { error: 'Data Drains are not enabled on this deployment' },
+          { status: 404 }
+        ),
+      }
+    }
     if (memberEntry.role !== 'owner' && memberEntry.role !== 'admin') {
       return {
         ok: false,
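
Note on the gate above: the same condition, `!isBillingEnabled && !isDataDrainsEnabled`, appears in both `authorizeDrainAccess` and the cron route. The sketch below restates it as pure functions so the three deployment cases can be read off directly. It is illustrative only. In the commit the flags are module-level values imported from `@/lib/core/config/feature-flags`; the `DeploymentFlags` interface and the function names here are hypothetical.

```typescript
interface DeploymentFlags {
  isBillingEnabled: boolean
  isDataDrainsEnabled: boolean
}

// True when neither the Sim Cloud path nor the self-hosted opt-in is active.
function isDrainsDisabledForDeployment(flags: DeploymentFlags): boolean {
  return !flags.isBillingEnabled && !flags.isDataDrainsEnabled
}

// Status a mutating endpoint would return from this gate alone. null means the
// request continues to the later checks (such as the role check), omitted here.
function mutatingGateStatus(flags: DeploymentFlags): 404 | null {
  return isDrainsDisabledForDeployment(flags) ? 404 : null
}

// Early-exit payload of the cron dispatcher, or null when dispatch should run.
function dispatcherSkipPayload(flags: DeploymentFlags) {
  return isDrainsDisabledForDeployment(flags)
    ? { success: true, dispatched: 0, skipped: 'disabled' }
    : null
}

const selfHostedUnset: DeploymentFlags = { isBillingEnabled: false, isDataDrainsEnabled: false }
const selfHostedOptIn: DeploymentFlags = { isBillingEnabled: false, isDataDrainsEnabled: true }
const simCloud: DeploymentFlags = { isBillingEnabled: true, isDataDrainsEnabled: false }
```

Only the self-hosted deployment with the flag unset is blocked. On Sim Cloud the deployment-level gate passes and the enterprise-plan check applies instead.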

apps/sim/lib/data-drains/destinations/s3.test.ts

Lines changed: 17 additions & 2 deletions
@@ -47,6 +47,7 @@ describe('s3Destination openSession', () => {
     source: 'workflow_logs' as const,
     sequence,
     rowCount: 1,
+    runStartedAt: new Date('2025-06-15T12:00:00Z'),
   })
   const signal = new AbortController().signal
 
@@ -90,7 +91,14 @@ describe('s3Destination openSession', () => {
     const result = await session.deliver({
       body: Buffer.from('x'),
       contentType: 'application/x-ndjson',
-      metadata: { drainId: 'd', runId: 'r', source: 'audit_logs', sequence: 0, rowCount: 1 },
+      metadata: {
+        drainId: 'd',
+        runId: 'r',
+        source: 'audit_logs',
+        sequence: 0,
+        rowCount: 1,
+        runStartedAt: new Date('2025-06-15T12:00:00Z'),
+      },
       signal: new AbortController().signal,
     })
     expect(result.locator).toMatch(
@@ -111,7 +119,14 @@ describe('s3Destination openSession', () => {
       session.deliver({
         body: Buffer.from('x'),
         contentType: 'application/x-ndjson',
-        metadata: { drainId: 'd', runId: 'r', source: 'audit_logs', sequence: 0, rowCount: 1 },
+        metadata: {
+          drainId: 'd',
+          runId: 'r',
+          source: 'audit_logs',
+          sequence: 0,
+          rowCount: 1,
+          runStartedAt: new Date('2025-06-15T12:00:00Z'),
+        },
         signal: new AbortController().signal,
       })
     ).rejects.toThrow(/AccessDenied 403/)

apps/sim/lib/data-drains/destinations/s3.ts

Lines changed: 13 additions & 5 deletions
@@ -68,12 +68,20 @@ function normalizePrefix(raw: string | undefined): string {
 
 function buildKey(
   config: S3DestinationConfig,
-  metadata: { drainId: string; runId: string; source: string; sequence: number }
+  metadata: {
+    drainId: string
+    runId: string
+    source: string
+    sequence: number
+    runStartedAt: Date
+  }
 ): string {
-  const now = new Date()
-  const yyyy = now.getUTCFullYear().toString().padStart(4, '0')
-  const mm = (now.getUTCMonth() + 1).toString().padStart(2, '0')
-  const dd = now.getUTCDate().toString().padStart(2, '0')
+  // Partition by the run's start time so all chunks from one run share a
+  // single date prefix even if delivery crosses a midnight boundary.
+  const partition = metadata.runStartedAt
+  const yyyy = partition.getUTCFullYear().toString().padStart(4, '0')
+  const mm = (partition.getUTCMonth() + 1).toString().padStart(2, '0')
+  const dd = partition.getUTCDate().toString().padStart(2, '0')
   const seq = metadata.sequence.toString().padStart(5, '0')
   const prefix = normalizePrefix(config.prefix)
   return `${prefix}${metadata.source}/${metadata.drainId}/${yyyy}/${mm}/${dd}/${metadata.runId}-${seq}.ndjson`
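
Note on the partition change above: a small standalone sketch of why keying on `runStartedAt` keeps one run under one date prefix. `sketchBuildKey` and `datePartition` are hypothetical simplifications of `buildKey`. Prefix normalization and the `config` argument are omitted, and the timestamps are made up for illustration.

```typescript
interface KeyMetadata {
  drainId: string
  runId: string
  source: string
  sequence: number
  runStartedAt: Date
}

// Format a UTC date as YYYY/MM/DD, mirroring the padding used in the diff.
function datePartition(at: Date): string {
  const yyyy = at.getUTCFullYear().toString().padStart(4, '0')
  const mm = (at.getUTCMonth() + 1).toString().padStart(2, '0')
  const dd = at.getUTCDate().toString().padStart(2, '0')
  return `${yyyy}/${mm}/${dd}`
}

// Simplified key builder: the date comes from the run start, not the delivery time.
function sketchBuildKey(metadata: KeyMetadata): string {
  const seq = metadata.sequence.toString().padStart(5, '0')
  const date = datePartition(metadata.runStartedAt)
  return `${metadata.source}/${metadata.drainId}/${date}/${metadata.runId}-${seq}.ndjson`
}

// A run that starts 30 seconds before midnight UTC.
const runStartedAt = new Date('2025-06-15T23:59:30Z')
const base = { drainId: 'd', runId: 'r', source: 'audit_logs', runStartedAt }

const firstKey = sketchBuildKey({ ...base, sequence: 0 })
// The second chunk is delivered after midnight but reuses the same run start.
const secondKey = sketchBuildKey({ ...base, sequence: 1 })

// What a per-chunk `new Date()` taken after midnight would have produced instead.
const deliveryTimePartition = datePartition(new Date('2025-06-16T00:00:10Z'))
```

Both keys share the `2025/06/15` prefix, whereas a partition derived from the delivery time of the second chunk would have been `2025/06/16`.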

apps/sim/lib/data-drains/service.ts

Lines changed: 1 addition & 0 deletions
@@ -101,6 +101,7 @@ export async function runDrain(
           source: drain.source,
           sequence,
           rowCount: chunk.length,
+          runStartedAt: startedAt,
         },
         signal,
       })

apps/sim/lib/data-drains/types.ts

Lines changed: 6 additions & 0 deletions
@@ -57,6 +57,12 @@ export interface DeliveryMetadata {
   /** 0-based chunk index within the run. */
   sequence: number
   rowCount: number
+  /**
+   * Wall-clock start of the run. Destinations that partition by date (e.g. S3
+   * `YYYY/MM/DD` keys) should derive the partition from this so a single run
+   * lands under one prefix even when delivery crosses a midnight boundary.
+   */
+  runStartedAt: Date
 }
 
 export interface DeliveryResult {
