Skip to content

Commit c97d246

Browse files
isshaddadmatt-aitkendevin-ai-integration[bot]
authored
feat(webapp): sync new orgs + users to Attio CRM on signup (#3896)
Pushes new organizations and users into the Attio CRM at signup time, for Customer Success (TRI-10431). - Orgs → Attio `workspaces`, users → Attio `users`, keyed on Attio's built-in unique `workspace_id` / `user_id` so writes are idempotent upserts. - Runs on the common Redis worker (not inline), so a slow or unavailable Attio never blocks the signup path; failures retry (3 attempts). - Hooks: user-created (alongside the existing Loops call) and org-created (`createOrganization`). - Gated behind `ATTIO_API_KEY`, no key means the sync is skipped entirely, so OSS / self-hosted installs are unaffected. Only creation is covered here (the record "shell"); spend, runs, plan changes, churn, and role/relationship linking are populated by the scheduled full sync, tracked separately. **Deploy note:** requires an Attio API key set as `ATTIO_API_KEY` in the webapp env, with scopes **Records (read-write)** + **Object Configuration (read)**, the assert/upsert endpoint reads object config to resolve the matching attribute. Without the key the sync no-ops. --------- Co-authored-by: Matt Aitken <matt@mattaitken.com> Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 3fdfe21 commit c97d246

5 files changed

Lines changed: 194 additions & 19 deletions

File tree

apps/webapp/app/env.server.ts

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,13 @@ import { isValidDuration } from "./services/realtime/duration.server";
88
// `z.string()` constrained to a `parseDuration`-parseable string (e.g.
99
// `7d`, `1h`). Validated at boot so a typo'd duration fails fast.
1010
function durationString() {
11-
return z
12-
.string()
13-
.refine(isValidDuration, "must be a duration like 7d, 30d, 365d, 1h, 1y");
11+
return z.string().refine(isValidDuration, "must be a duration like 7d, 30d, 365d, 1h, 1y");
1412
}
1513

1614
// Parses a CSV of machine preset names (e.g. "small-1x,small-2x") into a
1715
// non-empty array of MachinePresetName. Used by COMPUTE_TEMPLATE_MACHINE_PRESETS
1816
// and its _REQUIRED variant. Adds zod issues for empty input or unknown names.
19-
const parseMachinePresetCsv = (
20-
raw: string,
21-
ctx: z.RefinementCtx
22-
): MachinePresetName[] => {
17+
const parseMachinePresetCsv = (raw: string, ctx: z.RefinementCtx): MachinePresetName[] => {
2318
const names = raw
2419
.split(",")
2520
.map((s) => s.trim())
@@ -521,10 +516,7 @@ const EnvironmentSchema = z
521516
.string()
522517
.optional()
523518
.transform((v, ctx) =>
524-
parseMachinePresetCsv(
525-
v ?? process.env.COMPUTE_TEMPLATE_MACHINE_PRESETS ?? "small-1x",
526-
ctx
527-
)
519+
parseMachinePresetCsv(v ?? process.env.COMPUTE_TEMPLATE_MACHINE_PRESETS ?? "small-1x", ctx)
528520
),
529521

530522
DEPLOY_IMAGE_PLATFORM: z.string().default("linux/amd64"),
@@ -696,6 +688,7 @@ const EnvironmentSchema = z
696688
ALERT_RATE_LIMITER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
697689

698690
LOOPS_API_KEY: z.string().optional(),
691+
ATTIO_API_KEY: z.string().optional(),
699692
MARQS_DISABLE_REBALANCING: BoolEnv.default(false),
700693
MARQS_VISIBILITY_TIMEOUT_MS: z.coerce
701694
.number()
@@ -1186,7 +1179,9 @@ const EnvironmentSchema = z
11861179
// setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
11871180
// no-op because the gate-side singleton refuses to construct a buffer
11881181
// when the system is off.
1189-
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
1182+
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z
1183+
.string()
1184+
.default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
11901185
TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"),
11911186
TRIGGER_MOLLIFIER_REDIS_HOST: z
11921187
.string()
@@ -1196,7 +1191,7 @@ const EnvironmentSchema = z
11961191
.number()
11971192
.optional()
11981193
.transform(
1199-
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined),
1194+
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)
12001195
),
12011196
TRIGGER_MOLLIFIER_REDIS_USERNAME: z
12021197
.string()
@@ -1206,7 +1201,9 @@ const EnvironmentSchema = z
12061201
.string()
12071202
.optional()
12081203
.transform((v) => v ?? process.env.REDIS_PASSWORD),
1209-
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
1204+
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z
1205+
.string()
1206+
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
12101207
TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
12111208
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
12121209
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
@@ -1270,11 +1267,7 @@ const EnvironmentSchema = z
12701267
// (retrieve, trace) have a safety net while PG replica lag settles.
12711268
TRIGGER_MOLLIFIER_ACK_GRACE_TTL_SECONDS: z.coerce.number().int().positive().default(30),
12721269
// ioredis per-request retry limit on the buffer's Redis client.
1273-
TRIGGER_MOLLIFIER_REDIS_MAX_RETRIES_PER_REQUEST: z.coerce
1274-
.number()
1275-
.int()
1276-
.positive()
1277-
.default(20),
1270+
TRIGGER_MOLLIFIER_REDIS_MAX_RETRIES_PER_REQUEST: z.coerce.number().int().positive().default(20),
12781271
// ioredis reconnect backoff envelope for the buffer client: the base
12791272
// grows by `STEP_MS` per attempt, capped at `MAX_MS`, then equal-jittered.
12801273
TRIGGER_MOLLIFIER_REDIS_RECONNECT_STEP_MS: z.coerce.number().int().positive().default(50),

apps/webapp/app/models/organization.server.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { env } from "~/env.server";
1414
import { featuresForUrl } from "~/features.server";
1515
import { createApiKeyForEnv, createPkApiKeyForEnv, envSlug } from "./api-key.server";
1616
import { getDefaultEnvironmentConcurrencyLimit } from "~/services/platform.v3.server";
17+
import { enqueueAttioWorkspaceSync } from "~/services/attio.server";
1718
export type { Organization };
1819

1920
const nanoid = customAlphabet("1234567890abcdef", 4);
@@ -104,6 +105,16 @@ export async function createOrganization(
104105
},
105106
});
106107

108+
// Fire-and-forget; never blocks org creation.
109+
void enqueueAttioWorkspaceSync({
110+
orgId: organization.id,
111+
title: organization.title,
112+
slug: organization.slug,
113+
companySize: organization.companySize,
114+
createdAt: organization.createdAt,
115+
adminUserId: userId,
116+
});
117+
107118
return { ...organization };
108119
}
109120

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
import { z } from "zod";
2+
import { prisma } from "~/db.server";
3+
import { env } from "~/env.server";
4+
import { logger } from "./logger.server";
5+
6+
// Syncs new orgs/users into Attio (workspaces/users objects) at signup, via the
7+
// common worker so a slow Attio never blocks signup. Ongoing field updates are
8+
// handled by the scheduled sync, not here. No-op without ATTIO_API_KEY.
9+
10+
const ATTIO_API = "https://api.attio.com/v2";
11+
const IS_TEST = env.APP_ENV !== "production";
12+
13+
export const AttioWorkspaceSyncSchema = z.object({
14+
orgId: z.string(),
15+
title: z.string(),
16+
slug: z.string(),
17+
companySize: z.string().nullish(),
18+
createdAt: z.coerce.date(),
19+
adminUserId: z.string(),
20+
});
21+
export type AttioWorkspaceSync = z.infer<typeof AttioWorkspaceSyncSchema>;
22+
23+
export const AttioUserSyncSchema = z.object({
24+
userId: z.string(),
25+
email: z.string(),
26+
referralSource: z.string().nullish(),
27+
marketingEmails: z.boolean(),
28+
createdAt: z.coerce.date(),
29+
});
30+
export type AttioUserSync = z.infer<typeof AttioUserSyncSchema>;
31+
32+
class AttioClient {
33+
constructor(private readonly apiKey: string) {}
34+
35+
// Create-or-update by unique attribute; returns the record id. Throws on failure so the worker retries.
36+
async #assert(object: string, matchingAttribute: string, values: Record<string, unknown>): Promise<string> {
37+
const url = `${ATTIO_API}/objects/${object}/records?matching_attribute=${matchingAttribute}`;
38+
const response = await fetch(url, {
39+
method: "PUT",
40+
headers: { Authorization: `Bearer ${this.apiKey}`, "Content-Type": "application/json" },
41+
body: JSON.stringify({ data: { values } }),
42+
});
43+
44+
if (!response.ok) {
45+
const body = await response.text();
46+
logger.error("Attio assert failed", { object, matchingAttribute, status: response.status, body });
47+
throw new Error(`Attio assert ${object} failed with status ${response.status}`);
48+
}
49+
50+
const recordId = ((await response.json()) as any).data?.id?.record_id;
51+
if (typeof recordId !== "string") {
52+
throw new Error(`Attio assert ${object}: response missing data.id.record_id`);
53+
}
54+
return recordId;
55+
}
56+
57+
async upsertWorkspace(payload: AttioWorkspaceSync, emailDomain?: string) {
58+
// The creating user is an admin of the new org — set their role and link them to the workspace.
59+
const adminRecordId = await this.#assert("users", "user_id", {
60+
user_id: payload.adminUserId,
61+
role: "Admin",
62+
is_test: IS_TEST,
63+
});
64+
65+
await this.#assert("workspaces", "workspace_id", {
66+
workspace_id: payload.orgId,
67+
name: payload.title,
68+
org_slug: payload.slug,
69+
company_size: payload.companySize ?? undefined,
70+
email_domain: emailDomain,
71+
signup_date: toDate(payload.createdAt),
72+
plan: "Free",
73+
account_status: "Active",
74+
is_test: IS_TEST,
75+
users: [{ target_object: "users", target_record_id: adminRecordId }],
76+
});
77+
}
78+
79+
async upsertUser(payload: AttioUserSync) {
80+
await this.#assert("users", "user_id", {
81+
user_id: payload.userId,
82+
primary_email_address: payload.email,
83+
marketing_opt_in: payload.marketingEmails,
84+
referral_source: payload.referralSource ?? undefined,
85+
signup_date: toDate(payload.createdAt),
86+
is_test: IS_TEST,
87+
});
88+
}
89+
}
90+
91+
// Attio `date` attributes want a bare YYYY-MM-DD value.
92+
function toDate(date: Date): string {
93+
return date.toISOString().slice(0, 10);
94+
}
95+
96+
// Domain from an email; the cloud-side matcher normalizes it further.
97+
function domainFromEmail(email: string | undefined): string | undefined {
98+
return email?.split("@")[1]?.toLowerCase().trim() || undefined;
99+
}
100+
101+
export const attioClient = env.ATTIO_API_KEY ? new AttioClient(env.ATTIO_API_KEY) : null;
102+
103+
export async function enqueueAttioWorkspaceSync(payload: AttioWorkspaceSync) {
104+
if (!attioClient) return;
105+
try {
106+
// Lazy import to avoid a circular dependency with commonWorker (which imports this module's schemas).
107+
const { commonWorker } = await import("~/v3/commonWorker.server");
108+
await commonWorker.enqueue({ id: `attio:workspace:${payload.orgId}`, job: "attio.syncWorkspace", payload });
109+
} catch (error) {
110+
logger.error("Failed to enqueue Attio workspace sync", { orgId: payload.orgId, error });
111+
}
112+
}
113+
114+
export async function enqueueAttioUserSync(payload: AttioUserSync) {
115+
if (!attioClient) return;
116+
try {
117+
const { commonWorker } = await import("~/v3/commonWorker.server");
118+
await commonWorker.enqueue({ id: `attio:user:${payload.userId}`, job: "attio.syncUser", payload });
119+
} catch (error) {
120+
logger.error("Failed to enqueue Attio user sync", { userId: payload.userId, error });
121+
}
122+
}
123+
124+
export async function runAttioWorkspaceSync(payload: AttioWorkspaceSync) {
125+
if (!attioClient) return;
126+
const admin = await prisma.user.findFirst({
127+
where: { id: payload.adminUserId },
128+
select: { email: true },
129+
});
130+
await attioClient.upsertWorkspace(payload, domainFromEmail(admin?.email));
131+
}
132+
133+
export async function runAttioUserSync(payload: AttioUserSync) {
134+
if (!attioClient) return;
135+
await attioClient.upsertUser(payload);
136+
}

apps/webapp/app/services/telemetry.server.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import type { Organization } from "~/models/organization.server";
55
import type { Project } from "~/models/project.server";
66
import type { User } from "~/models/user.server";
77
import { singleton } from "~/utils/singleton";
8+
import { enqueueAttioUserSync } from "./attio.server";
89
import { loopsClient } from "./loops.server";
910

1011
type Options = {
@@ -74,6 +75,14 @@ class Telemetry {
7475
email: user.email,
7576
name: user.name,
7677
});
78+
79+
enqueueAttioUserSync({
80+
userId: user.id,
81+
email: user.email,
82+
referralSource: referralSource ?? user.referralSource,
83+
marketingEmails: user.marketingEmails,
84+
createdAt: user.createdAt,
85+
});
7786
}
7887
},
7988
};

apps/webapp/app/v3/commonWorker.server.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ import { z } from "zod";
55
import { env } from "~/env.server";
66
import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server";
77
import { sendEmail } from "~/services/email.server";
8+
import {
9+
AttioUserSyncSchema,
10+
AttioWorkspaceSyncSchema,
11+
runAttioUserSync,
12+
runAttioWorkspaceSync,
13+
} from "~/services/attio.server";
814
import { logger } from "~/services/logger.server";
915
import { singleton } from "~/utils/singleton";
1016
import { DeliverAlertService } from "./services/alerts/deliverAlert.server";
@@ -46,6 +52,20 @@ function initializeWorker() {
4652
maxAttempts: 3,
4753
},
4854
},
55+
"attio.syncWorkspace": {
56+
schema: AttioWorkspaceSyncSchema,
57+
visibilityTimeoutMs: 30_000,
58+
retry: {
59+
maxAttempts: 3,
60+
},
61+
},
62+
"attio.syncUser": {
63+
schema: AttioUserSyncSchema,
64+
visibilityTimeoutMs: 30_000,
65+
retry: {
66+
maxAttempts: 3,
67+
},
68+
},
4969
"v3.resumeBatchRun": {
5070
schema: z.object({
5171
batchRunId: z.string(),
@@ -213,6 +233,12 @@ function initializeWorker() {
213233
scheduleEmail: async ({ payload }) => {
214234
await sendEmail(payload);
215235
},
236+
"attio.syncWorkspace": async ({ payload }) => {
237+
await runAttioWorkspaceSync(payload);
238+
},
239+
"attio.syncUser": async ({ payload }) => {
240+
await runAttioUserSync(payload);
241+
},
216242
"v3.resumeBatchRun": async ({ payload }) => {
217243
const service = new ResumeBatchRunService();
218244
await service.call(payload.batchRunId);

0 commit comments

Comments
 (0)