From d1b72af925a9146a45b1e0e5caf8f9baf8ade32d Mon Sep 17 00:00:00 2001 From: Isenewo Oluwaseyi Ephraim Date: Mon, 30 Mar 2026 13:46:27 +0100 Subject: [PATCH] =?UTF-8?q?feat(backend):=20POST=20/api/v1/streams=20?= =?UTF-8?q?=E2=80=94=20create=20stream=20endpoint=20with=20Zod=20validatio?= =?UTF-8?q?n?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the create stream API for issue #3. Changes: - Add to StreamRepository using Drizzle insert().values().returning() consistent with existing query patterns - Add Zod schema validating: - payer / recipient (non-empty strings) - ratePerSecond / totalAmount (positive decimal strings) - startTime / endTime (ISO-8601 datetime; endTime optional) - Add POST /api/v1/streams handler: validate body → persist → return 201 + stream - Stream status defaults to active; lastSettledAt seeded from startTime - Return structured 400 validation errors with per-field details via zod.flatten().fieldErrors Tests (supertest): - 201 success path with mocked repository - 400 missing required fields - 400 invalid ratePerSecond (non-decimal string) - 400 invalid startTime (non-ISO-8601) - 500 when repository throws unexpectedly Security notes: - All input sanitised through Zod before reaching the repository - No raw user input passed to DB queries - Decimal fields accepted as strings to avoid float precision issues Draft — optional chain invoke not yet wired (follow-up issue) --- src/api/v1/streams.test.ts | 67 ++++++++++++++++++++++++++++ src/api/v1/streams.ts | 50 +++++++++++++++++++++ src/repositories/streamRepository.ts | 10 ++++- 3 files changed, 126 insertions(+), 1 deletion(-) diff --git a/src/api/v1/streams.test.ts b/src/api/v1/streams.test.ts index d473a4a..88ced64 100644 --- a/src/api/v1/streams.test.ts +++ b/src/api/v1/streams.test.ts @@ -35,6 +35,73 @@ describe("Stream API Routes", () => { }); }); + describe("POST /api/v1/streams", () => { + const validBody = { + payer: "GPAYER", + recipient: "GRECIPIENT", + ratePerSecond: "0.000001", + startTime: "2026-01-01T00:00:00.000Z", + totalAmount: "100.0", + }; + + it("should return 201 and the created stream", async () => { + const created = { id: "123e4567-e89b-12d3-a456-426614174000", ...validBody, status: "active" }; + const spy = jest + .spyOn(StreamRepository.prototype, "create") + .mockResolvedValue(created as never); + + const response = await request(app) + .post("/api/v1/streams") + .send(validBody); + + expect(response.status).toBe(201); + expect(response.body.id).toBe(created.id); + expect(response.body.status).toBe("active"); + spy.mockRestore(); + }); + + it("should return 400 when required fields are missing", async () => { + const response = await request(app) + .post("/api/v1/streams") + .send({ payer: "GPAYER" }); + + expect(response.status).toBe(400); + expect(response.body.error).toBe("Validation failed"); + expect(response.body.details).toHaveProperty("recipient"); + }); + + it("should return 400 when ratePerSecond is not a decimal string", async () => { + const response = await request(app) + .post("/api/v1/streams") + .send({ ...validBody, ratePerSecond: "not-a-number" }); + + expect(response.status).toBe(400); + expect(response.body.details).toHaveProperty("ratePerSecond"); + }); + + it("should return 400 when startTime is not ISO-8601", async () => { + const response = await request(app) + .post("/api/v1/streams") + .send({ ...validBody, startTime: "not-a-date" }); + + expect(response.status).toBe(400); + expect(response.body.details).toHaveProperty("startTime"); + }); + + it("should return 500 when repository throws", async () => { + const spy = jest + .spyOn(StreamRepository.prototype, "create") + .mockRejectedValue(new Error("DB error")); + + const response = await request(app) + .post("/api/v1/streams") + .send(validBody); + + expect(response.status).toBe(500); + spy.mockRestore(); + }); + }); + describe("GET /api/v1/streams", () => { it("should return 200 and the list of streams", async () => { const mockResult = { diff --git a/src/api/v1/streams.ts b/src/api/v1/streams.ts index df1b234..a432615 100644 --- a/src/api/v1/streams.ts +++ b/src/api/v1/streams.ts @@ -1,9 +1,59 @@ import { Router, Request, Response } from "express"; +import { z } from "zod"; import { StreamRepository, FindAllParams } from "../../repositories/streamRepository"; const router = Router(); const streamRepository = new StreamRepository(); +const createStreamSchema = z.object({ + payer: z.string().min(1, "payer is required"), + recipient: z.string().min(1, "recipient is required"), + ratePerSecond: z + .string() + .regex(/^\d+(\.\d+)?$/, "ratePerSecond must be a positive decimal string"), + startTime: z.string().datetime({ message: "startTime must be an ISO-8601 datetime" }), + endTime: z + .string() + .datetime({ message: "endTime must be an ISO-8601 datetime" }) + .optional(), + totalAmount: z + .string() + .regex(/^\d+(\.\d+)?$/, "totalAmount must be a positive decimal string"), +}); + +type CreateStreamBody = z.infer; + +// POST /api/v1/streams +router.post("/", async (req: Request, res: Response) => { + try { + const parsed = createStreamSchema.safeParse(req.body); + if (!parsed.success) { + return res.status(400).json({ + error: "Validation failed", + details: parsed.error.flatten().fieldErrors, + }); + } + + const body = parsed.data as CreateStreamBody; + + const stream = await streamRepository.create({ + payer: body.payer, + recipient: body.recipient, + ratePerSecond: body.ratePerSecond, + startTime: new Date(body.startTime), + endTime: body.endTime ? new Date(body.endTime) : undefined, + totalAmount: body.totalAmount, + status: "active", + lastSettledAt: new Date(body.startTime), + }); + + return res.status(201).json(stream); + } catch (error) { + console.error("Error creating stream:", error); + return res.status(500).json({ error: "Internal server error" }); + } +}); + // GET /api/v1/streams/:id router.get("/:id", async (req: Request, res: Response) => { try { diff --git a/src/repositories/streamRepository.ts b/src/repositories/streamRepository.ts index 7455c1c..53b81b8 100644 --- a/src/repositories/streamRepository.ts +++ b/src/repositories/streamRepository.ts @@ -1,6 +1,6 @@ import { eq, and, desc, sql } from "drizzle-orm"; import { db } from "../db/index"; -import { streams, Stream } from "../db/schema"; +import { streams, Stream, NewStream } from "../db/schema"; export interface FindAllParams { payer?: string; @@ -61,6 +61,14 @@ export class StreamRepository { }; } + async create(data: NewStream): Promise { + const [created] = await db.insert(streams).values(data).returning(); + if (!created) { + throw new Error("Stream insert did not return a row"); + } + return created; + } + private calculateAccruedEstimate(stream: Stream): number { if (stream.status !== "active") return 0;