diff --git a/handwritten/pubsub/package.json b/handwritten/pubsub/package.json index 299fa8301546..351d1b788cca 100644 --- a/handwritten/pubsub/package.json +++ b/handwritten/pubsub/package.json @@ -71,8 +71,11 @@ "p-defer": "^3.0.0" }, "devDependencies": { + "@google-cloud/opentelemetry-cloud-trace-exporter": "^2.4.1", "@grpc/proto-loader": "^0.8.0", "@opentelemetry/sdk-trace-base": "^1.17.0", + "@opentelemetry/sdk-trace-node": "^2.7.0", + "@types/chai": "^5.2.3", "@types/duplexify": "^3.6.4", "@types/extend": "^3.0.4", "@types/lodash.snakecase": "^4.1.9", @@ -84,7 +87,9 @@ "@types/sinon": "^21.0.0", "@types/tmp": "^0.2.6", "@types/uuid": "^11.0.0", + "avro-js": "^1.12.1", "c8": "^10.1.3", + "chai": "^6.2.2", "codecov": "^3.8.3", "execa": "~5.1.0", "gapic-tools": "^1.0.1", diff --git a/handwritten/pubsub/system-test/avro-js.d.ts b/handwritten/pubsub/system-test/avro-js.d.ts new file mode 100644 index 000000000000..4a8496b76e69 --- /dev/null +++ b/handwritten/pubsub/system-test/avro-js.d.ts @@ -0,0 +1,10 @@ +declare module 'avro-js' { + function parse(def: string): Parser; + + class Parser { + fromBuffer(buf: Buffer): T; + fromString(str: string): T; + toBuffer(item: T): Buffer; + toString(item: T): string; + } +} diff --git a/handwritten/pubsub/system-test/avro-samples.test.ts b/handwritten/pubsub/system-test/avro-samples.test.ts new file mode 100644 index 000000000000..246dccdf1f57 --- /dev/null +++ b/handwritten/pubsub/system-test/avro-samples.test.ts @@ -0,0 +1,128 @@ +import {Message, PubSub, Schema} from '../src'; +import * as assert from 'assert'; +import {describe, it, after, before} from 'mocha'; +import {TestResources} from './testResources'; +import * as avro from 'avro-js'; +import * as fs from 'fs'; + +describe('Avro Samples System Tests', () => { + const pubsub = new PubSub(); + const resources = new TestResources('ps-sys-avro'); + + let topicName: string; + let subName: string; + let schemaId: string; + + before(async () => { + topicName = resources.generateName('topic'); + subName = resources.generateName('sub'); + schemaId = resources.generateName('schema'); + + const definition = fs.readFileSync('system-test/fixtures/provinces.avsc').toString(); + await pubsub.createSchema(schemaId, 'AVRO', definition); + await pubsub.createTopic({ + name: topicName, + schemaSettings: { + schema: await pubsub.schema(schemaId).getName(), + encoding: 'BINARY', + }, + }); + + const [topic] = await pubsub.topic(topicName).get(); + await topic.createSubscription(subName); + }); + + after(async () => { + const [subscriptions] = await pubsub.getSubscriptions(); + await Promise.all( + resources.filterForCleanup(subscriptions).map(x => x.delete?.()) + ); + + const [topics] = await pubsub.getTopics(); + await Promise.all( + resources.filterForCleanup(topics).map((x: any) => x.delete?.()) + ); + + const schemas: any[] = []; + for await (const s of pubsub.listSchemas()) { + schemas.push(pubsub.schema(s.name!)); + } + await Promise.all( + resources.filterForCleanup(schemas).map(x => x.delete?.()) + ); + }); + + it('should publish and listen for avro records', async () => { + const definition = fs.readFileSync('system-test/fixtures/provinces.avsc').toString(); + const [topic] = await pubsub.topic(topicName).get(); + const [subscription] = await pubsub.subscription(subName).get(); + + const type = avro.parse(definition); + + const province = { + name: 'Ontario', + post_abbr: 'ON', + }; + + const dataBuffer = type.toBuffer(province); + const messageId = await topic.publish(dataBuffer); + assert.ok(messageId); + + const message = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Timeout waiting for Avro record')), 15000); + subscription.once('message', (m: Message) => { + clearTimeout(timeout); + m.ack(); + resolve(m); + }); + }); + + const schemaMetadata = Schema.metadataFromMessage(message.attributes); + assert.strictEqual(schemaMetadata.encoding, 'BINARY'); + + const result = type.fromBuffer(message.data) as any; + assert.strictEqual(result.name, 'Ontario'); + assert.strictEqual(result.post_abbr, 'ON'); + }); + + it('should listen for avro records with revisions', async () => { + const definition = fs.readFileSync('system-test/fixtures/provinces.avsc').toString(); + + const schemaClient = await pubsub.getSchemaClient(); + + const [topic] = await pubsub.topic(topicName).get(); + const [subscription] = await pubsub.subscription(subName).get(); + + const type = avro.parse(definition); + const province = { + name: 'Ontario', + post_abbr: 'ON', + }; + + const dataBuffer = type.toBuffer(province); + await topic.publish(dataBuffer); + + const message = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Timeout waiting for Avro revision')), 15000); + subscription.once('message', (m: Message) => { + clearTimeout(timeout); + m.ack(); + resolve(m); + }); + }); + + const schemaMetadata = Schema.metadataFromMessage(message.attributes); + const revision = schemaMetadata.revision!; + assert.ok(revision); + + const [fetchedSchema] = await schemaClient.getSchema({ + name: `${schemaMetadata.name}@${schemaMetadata.revision}`, + }); + + const reader = avro.parse(fetchedSchema.definition!); + const result = reader.fromBuffer(message.data) as any; + + assert.strictEqual(result.name, 'Ontario'); + assert.strictEqual(result.post_abbr, 'ON'); + }); +}); diff --git a/handwritten/pubsub/system-test/batch-flow-samples.test.ts b/handwritten/pubsub/system-test/batch-flow-samples.test.ts new file mode 100644 index 000000000000..f00dd80c9462 --- /dev/null +++ b/handwritten/pubsub/system-test/batch-flow-samples.test.ts @@ -0,0 +1,111 @@ +import {Message, PubSub, PublishOptions} from '../src'; +import * as assert from 'assert'; +import {describe, it, after, before} from 'mocha'; +import {TestResources} from './testResources'; + +describe('Batch and Flow Control Samples System Tests', () => { + const pubsub = new PubSub(); + const resources = new TestResources('ps-sys-batch'); + + let topicName: string; + let subName: string; + + before(async () => { + topicName = resources.generateName('topic'); + subName = resources.generateName('sub'); + }); + + after(async () => { + const [subscriptions] = await pubsub.getSubscriptions(); + await Promise.all( + resources.filterForCleanup(subscriptions).map(x => x.delete?.()) + ); + + const [topics] = await pubsub.getTopics(); + await Promise.all( + resources.filterForCleanup(topics).map((x: any) => x.delete?.()) + ); + }); + + it('should publish batched messages', async () => { + const [topic] = await pubsub.createTopic(topicName); + const [subscription] = await topic.createSubscription(subName); + + const publishOptions: PublishOptions = { + batching: { + maxMessages: 10, + maxMilliseconds: 2000, + }, + }; + const batchPublisher = pubsub.topic(topicName, publishOptions); + + const promises: Promise[] = []; + for (let i = 0; i < 10; i++) { + promises.push(batchPublisher.publishMessage({data: Buffer.from(`message ${i}`)})); + } + + const messageIds = await Promise.all(promises); + assert.strictEqual(messageIds.length, 10); + + const messages: Message[] = []; + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Timeout waiting for batched messages')), 15000); + subscription.on('message', (m: Message) => { + m.ack(); + messages.push(m); + if (messages.length === 10) { + clearTimeout(timeout); + subscription.removeAllListeners('message'); + resolve(); + } + }); + }); + + assert.strictEqual(messages.length, 10); + }); + + it('should publish with flow control', async () => { + const flowTopicName = resources.generateName('flow'); + const flowSubName = resources.generateName('flowsub'); + + const [topic] = await pubsub.createTopic(flowTopicName); + const [subscription] = await topic.createSubscription(flowSubName); + + const options = { + flowControlOptions: { + maxOutstandingMessages: 5, + maxOutstandingBytes: 1024, + }, + }; + + const topicWithFlow = pubsub.topic(flowTopicName, options); + const flow = topicWithFlow.flowControlled(); + + const promises = []; + for (let i = 0; i < 10; i++) { + const wait = flow.publish({data: Buffer.from('flow control message')}); + if (wait) { + await wait; + } + } + + const messageIds = await flow.all(); + assert.strictEqual(messageIds.length, 10); + + const messages: Message[] = []; + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Timeout waiting for flow control messages')), 15000); + subscription.on('message', (m: Message) => { + m.ack(); + messages.push(m); + if (messages.length === 10) { + clearTimeout(timeout); + subscription.removeAllListeners('message'); + resolve(); + } + }); + }); + + assert.strictEqual(messages.length, 10); + }); +}); diff --git a/handwritten/pubsub/system-test/common.test.ts b/handwritten/pubsub/system-test/common.test.ts new file mode 100644 index 000000000000..77ca6642066b --- /dev/null +++ b/handwritten/pubsub/system-test/common.test.ts @@ -0,0 +1,36 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {assert} from 'chai'; +import {describe, it} from 'mocha'; +import {commandFor} from './common'; +import * as path from 'path'; + +describe('common (unit)', () => { + it('commandFor finds TS samples', () => { + const result = commandFor('createAvroSchema'); + assert.strictEqual( + result, + `node ${path.join('build', 'createAvroSchema.js')}` + ); + }); + + it('commandFor finds JS samples', () => { + const result = commandFor('createSubscription'); + assert.strictEqual( + result, + `node ${path.join('build', 'createSubscription.js')}` + ); + }); +}); diff --git a/handwritten/pubsub/system-test/common.ts b/handwritten/pubsub/system-test/common.ts new file mode 100644 index 000000000000..5529231d30ae --- /dev/null +++ b/handwritten/pubsub/system-test/common.ts @@ -0,0 +1,26 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as cp from 'child_process'; +import * as path from 'path'; + +export const execSync = (cmd: string): string => + cp.execSync(cmd, {encoding: 'utf-8'}); + +// Processed versions of TS samples go to the same build location +// as the rest of the JS samples. +export function commandFor(action: string): string { + const jsPath = path.join('build', `${action}.js`); + return `node ${jsPath}`; +} diff --git a/handwritten/pubsub/system-test/fixtures/provinces.proto b/handwritten/pubsub/system-test/fixtures/provinces.proto new file mode 100644 index 000000000000..08f05488efc0 --- /dev/null +++ b/handwritten/pubsub/system-test/fixtures/provinces.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package utilities; + +message Province { + string name = 1; + string post_abbr = 2; +} diff --git a/handwritten/pubsub/system-test/otel-samples.test.ts b/handwritten/pubsub/system-test/otel-samples.test.ts new file mode 100644 index 000000000000..2ac84b303b82 --- /dev/null +++ b/handwritten/pubsub/system-test/otel-samples.test.ts @@ -0,0 +1,66 @@ +import {Message, PubSub} from '../src'; +import * as assert from 'assert'; +import {describe, it, after, before} from 'mocha'; +import {TestResources} from './testResources'; +import {BasicTracerProvider, SimpleSpanProcessor, InMemorySpanExporter} from '@opentelemetry/sdk-trace-base'; +import {Resource} from '@opentelemetry/resources'; +import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions'; + +describe('OpenTelemetry Samples System Tests', () => { + const pubsub = new PubSub({enableOpenTelemetryTracing: true}); + const resources = new TestResources('ps-sys-otel'); + const exporter = new InMemorySpanExporter(); + + let topicName: string; + let subName: string; + let provider: BasicTracerProvider; + let processor: SimpleSpanProcessor; + + before(async () => { + topicName = resources.generateName('topic'); + subName = resources.generateName('sub'); + + provider = new BasicTracerProvider(); + processor = new SimpleSpanProcessor(exporter); + provider.addSpanProcessor(processor); + provider.register(); + }); + + after(async () => { + const [subscriptions] = await pubsub.getSubscriptions(); + await Promise.all( + resources.filterForCleanup(subscriptions).map(x => x.delete?.()) + ); + + const [topics] = await pubsub.getTopics(); + await Promise.all( + resources.filterForCleanup(topics).map((x: any) => x.delete?.()) + ); + }); + + it('should publish and listen with OpenTelemetry tracing', async () => { + const [topic] = await pubsub.createTopic(topicName); + const [subscription] = await topic.createSubscription(subName); + + const data = 'Hello, world!'; + const dataBuffer = Buffer.from(data); + + const messageId = await topic.publishMessage({data: dataBuffer}); + assert.ok(messageId); + + const message = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Timeout waiting for OTel message')), 15000); + subscription.once('message', (m: Message) => { + clearTimeout(timeout); + m.ack(); + resolve(m); + }); + }); + + assert.strictEqual(message.data.toString(), data); + + await processor.forceFlush(); + const spans = exporter.getFinishedSpans(); + assert.ok(spans.length > 0, 'Should have generated spans'); + }); +}); diff --git a/handwritten/pubsub/system-test/protobuf-samples.test.ts b/handwritten/pubsub/system-test/protobuf-samples.test.ts new file mode 100644 index 000000000000..b096356dd515 --- /dev/null +++ b/handwritten/pubsub/system-test/protobuf-samples.test.ts @@ -0,0 +1,86 @@ +import {Message, PubSub, Schema} from '../src'; +import * as assert from 'assert'; +import {describe, it, after, before} from 'mocha'; +import {TestResources} from './testResources'; +import * as protobuf from 'protobufjs'; +import * as fs from 'fs'; + +describe('Protobuf Samples System Tests', () => { + const pubsub = new PubSub(); + const resources = new TestResources('ps-sys-proto'); + + let topicName: string; + let subName: string; + let schemaId: string; + + before(async () => { + topicName = resources.generateName('topic'); + subName = resources.generateName('sub'); + schemaId = resources.generateName('schema'); + }); + + after(async () => { + const [subscriptions] = await pubsub.getSubscriptions(); + await Promise.all( + resources.filterForCleanup(subscriptions).map(x => x.delete?.()) + ); + + const [topics] = await pubsub.getTopics(); + await Promise.all( + resources.filterForCleanup(topics).map((x: any) => x.delete?.()) + ); + + const schemas: any[] = []; + for await (const s of pubsub.listSchemas()) { + schemas.push(pubsub.schema(s.name!)); + } + await Promise.all( + resources.filterForCleanup(schemas).map(x => x.delete?.()) + ); + }); + + it('should publish and listen for protobuf messages', async () => { + const definition = fs.readFileSync('system-test/fixtures/provinces.proto').toString(); + + await pubsub.createSchema(schemaId, 'PROTOCOL_BUFFER', definition); + const [topic] = await pubsub.createTopic({ + name: topicName, + schemaSettings: { + schema: await pubsub.schema(schemaId).getName(), + encoding: 'BINARY', + }, + }); + + const [subscription] = await topic.createSubscription(subName); + + const root = await protobuf.load('system-test/fixtures/provinces.proto'); + const Province = root.lookupType('utilities.Province'); + const province = { + name: 'Ontario', + post_abbr: 'ON', + }; + + const messageObj = Province.create(province); + (messageObj as any).post_abbr = 'ON'; + const dataBuffer = Buffer.from(Province.encode(messageObj).finish()); + + const messageId = await topic.publishMessage({data: dataBuffer}); + assert.ok(messageId); + + const message = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Timeout waiting for Proto message')), 15000); + subscription.once('message', (m: Message) => { + clearTimeout(timeout); + m.ack(); + resolve(m); + }); + }); + + const schemaMetadata = Schema.metadataFromMessage(message.attributes); + assert.strictEqual(schemaMetadata.encoding, 'BINARY'); + + const result = Province.decode(message.data) as any; + assert.strictEqual(result.name, 'Ontario'); + assert.strictEqual(result.postAbbr || result.post_abbr, 'ON'); + }); +}); diff --git a/handwritten/pubsub/system-test/sample-tests.ts b/handwritten/pubsub/system-test/sample-tests.ts new file mode 100644 index 000000000000..a1679b4ebefb --- /dev/null +++ b/handwritten/pubsub/system-test/sample-tests.ts @@ -0,0 +1,102 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {Message, PubSub} from '../src'; +import * as assert from 'assert'; +import {describe, it, after} from 'mocha'; +import {TestResources} from './testResources'; + +describe('Combined Samples Tests', () => { + const pubsub = new PubSub(); + const resources = new TestResources('pubsub_combined'); + + function topicName(testId: string): string { + return resources.generateName(testId); + } + + function subName(testId: string): string { + return resources.generateName(testId); + } + + async function cleanSubs() { + const [subscriptions] = await pubsub.getSubscriptions(); + await Promise.all( + resources.filterForCleanup(subscriptions).map(x => x.delete?.()) + ); + } + + async function cleanTopics() { + const [topics] = await pubsub.getTopics(); + await Promise.all( + resources.filterForCleanup(topics).map((x: any) => x.delete?.()) + ); + } + + after(async () => { + await cleanSubs(); + await cleanTopics(); + }); + + it('should create a topic', async () => { + const name = topicName('create'); + + // --- From sample (createTopic.js) --- + await pubsub.createTopic(name); + console.log(`Topic ${name} created.`); + + // --- From test (topics.test.ts) --- + const [topics] = await pubsub.getTopics(); + const exists = topics.some((t: any) => t.name.endsWith(name)); + assert.ok(exists, 'Topic was created'); + }); + + it('should publish a message', async () => { + const tname = topicName('publish'); + const sname = subName('publish'); + + const [topic] = await pubsub.topic(tname).get({autoCreate: true}); + const [subscription] = await topic.subscription(sname).get({autoCreate: true}); + + // --- From sample (publishMessage.js) --- + const data = 'Hello, world!'; + const dataBuffer = Buffer.from(data); + const messageId = await topic.publishMessage({data: dataBuffer}); + console.log(`Message ${messageId} published.`); + + // --- From test (topics.test.ts) --- + const message = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Timeout')), 10000); + subscription.once('message', (m: Message) => { + clearTimeout(timeout); + m.ack(); + resolve(m); + }); + }); + + assert.strictEqual(message.data.toString(), data); + }); + + it('should delete a topic', async () => { + const name = topicName('delete'); + await pubsub.topic(name).get({autoCreate: true}); + + // --- From sample (deleteTopic.js) --- + await pubsub.topic(name).delete(); + console.log(`Topic ${name} deleted.`); + + // --- From test (topics.test.ts) --- + const [exists] = await pubsub.topic(name).exists(); + assert.strictEqual(exists, false); + }); +}); diff --git a/handwritten/pubsub/system-test/testResources.test.ts b/handwritten/pubsub/system-test/testResources.test.ts new file mode 100644 index 000000000000..0ddd84655bd0 --- /dev/null +++ b/handwritten/pubsub/system-test/testResources.test.ts @@ -0,0 +1,84 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {assert} from 'chai'; +import {describe, it, beforeEach} from 'mocha'; +import {TestResources} from './testResources'; + +describe('testResources (unit)', () => { + const fixedId = 'fixed'; + const fixedTime = Date.now(); + const fakeTokenMaker = { + uuid: () => fixedId, + timestamp: () => fixedTime, + }; + + const suiteId = 'someSuite'; + let testResources!: TestResources; + + beforeEach(() => { + testResources = new TestResources(suiteId, fakeTokenMaker); + }); + + it('has predictable prefixes', () => { + const prefix = testResources.getPrefix('testId'); + assert.strictEqual(prefix, `${suiteId}-${fixedTime}-testId`); + + const normalizedPrefix = testResources.getPrefix('test-id-dashes'); + assert.strictEqual( + normalizedPrefix, + `${suiteId}-${fixedTime}-test_id_dashes` + ); + + const suitePrefix = testResources.getPrefix(); + assert.strictEqual(suitePrefix, `${suiteId}-${fixedTime}`); + }); + + it('generates names', () => { + const prefix = testResources.getPrefix('testId'); + const name = testResources.generateName('testId'); + assert.strictEqual(name, `${prefix}-${fixedId}`); + }); + + it('filters for cleanup', () => { + const resources = [ + { + // Not related + name: 'ooga', + }, + { + // For current test run + name: `${suiteId}-${fixedTime}-bob-98719284791`, + }, + { + // For previous test run, but not very old + name: `${suiteId}-${fixedTime - 100}-bob-124897912`, + }, + { + // For previous test run, but old + name: `${suiteId}-${fixedTime - 3000 * 60 * 60}-bob-57823975`, + }, + ]; + const filtered = testResources.filterForCleanup(resources); + assert.strictEqual(filtered.length, 2); + assert.strictEqual( + 1, + filtered.filter(r => r.name?.includes('bob-9871')).length + ); + assert.strictEqual( + 1, + filtered.filter(r => r.name?.includes('bob-5782')).length + ); + }); +}); diff --git a/handwritten/pubsub/system-test/testResources.ts b/handwritten/pubsub/system-test/testResources.ts new file mode 100644 index 000000000000..3f5665096201 --- /dev/null +++ b/handwritten/pubsub/system-test/testResources.ts @@ -0,0 +1,182 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// I don't like that these two files (this plus ".test") are duplicated +// across the two test structures, but because of the tangle of rootDirs +// and package.json "files", it's hard to avoid it. + +import * as uuid from 'uuid'; + +// Returns a shortened UUID that can be used to identify a +// specific run of a specific test. +function shortUUID() { + return uuid.v4().split('-').shift()!; +} + +export interface TokenMaker { + uuid(): string; + timestamp(): number; +} + +export const defaultMaker = { + uuid: shortUUID, + timestamp: () => Date.now(), +}; + +export interface Resource { + name?: string | null | undefined; + delete?(): Promise; +} + +function normalizeId(id: string): string { + return id.replace(/-/g, '_'); +} + +/** + * Manages the names of testing resources during a test run. It's + * easily to accidentally leak resources, and it's easy to accidentally + * have conflicts with tests running concurrently, so this class helps + * you manage them. + * + * Used nomenclature: + * Test - a single test for a single aspect of code; for example, + * "create a topic in pub/sub" + * Test Suite - a collection of tests that are generally run together; + * for example, "test topic operations in pub/sub" + * Test Run - a single run of a test suite (or single test within a suite); + * for example, "run the tests for PR #1234, 5th attempt" + */ +export class TestResources { + testSuiteId: string; + currentTime: string; + tokenMaker: TokenMaker; + + /** + * @param testSuiteId [string] A unique ID for a test suite (e.g. + * pubsub-topics). + */ + constructor(testSuiteId: string, tokenMaker: TokenMaker = defaultMaker) { + this.testSuiteId = normalizeId(testSuiteId); + this.currentTime = `${tokenMaker.timestamp()}`; + this.tokenMaker = tokenMaker; + } + + /** + * Returns the resource prefix for the current run of the test suite. + * Optionally, testId may specify the specific ID of a test in the + * suite. + */ + getPrefix(testId?: string): string { + if (testId) { + return [this.testSuiteId, this.currentTime, normalizeId(testId)].join( + '-', + ); + } else { + return [this.testSuiteId, this.currentTime].join('-'); + } + } + + /** + * Generates a unique resource name for one run of a test within + * a test suite. + */ + generateName(testId: string): string { + return [this.getPrefix(testId), this.tokenMaker.uuid()].join('-'); + } + + /** + * Generates a unique resource name for one run of a test within + * a test suite for BigQuery resources. + */ + generateBigQueryName(testId: string): string { + return [normalizeId(this.getPrefix(testId)), this.tokenMaker.uuid()].join( + '_', + ); + } + + /** + * Generates a unique resource name for one run of a test within + * a test suite for Cloud Storage resources. + */ + generateStorageName(testId: string): string { + return [normalizeId(this.getPrefix(testId)), this.tokenMaker.uuid()].join( + '_', + ); + } + + /** + * Given a list of resource names (and a test ID), this will return + * a list of all resources that should be deleted to clean up for + * the current run of that particular test. + */ + filterForTest(testId: string, allResources: Resource[]): Resource[] { + const prefix = this.getPrefix(testId); + return allResources.filter(n => n.name?.includes(prefix)); + } + + /** + * Given a list of resource names, this will return a list of all + * resources that should be deleted to clean up after the current + * run of a test suite. + */ + filterForCurrentRun(allResources: Resource[]): Resource[] { + const prefix = this.getPrefix(); + return allResources.filter(n => n.name?.includes(prefix)); + } + + /** + * Given a list of resource names, this will return a list of all + * resources that should be deleted to clean up after any run + * of the current test suite. Note that some of the names may + * still be in use. + */ + filterForSuite(allResources: Resource[]): Resource[] { + return allResources.filter(n => n.name?.includes(this.testSuiteId)); + } + + /** + * Given a list of resource names, this will return a list of all + * resources that should be deleted to generally clean up after any + * run of the current test suite. This is much like filterForSuite(), + * but it also filters by age - items that are less than 2 hours + * old will not be cleaned. + */ + filterForCleanup(allResources: Resource[]): Resource[] { + const currentRunPrefix = this.getPrefix(); + return allResources.filter(n => { + let name = n.name || undefined; + if (name === undefined) { + return false; + } + + // We'll always get at least one thing. + name = name.split('/').pop()!; + + if (name.startsWith(currentRunPrefix)) { + return true; + } + + if (name.startsWith(this.testSuiteId)) { + const parts = name.split('-'); + const createdAt = Number(parts[1]); + const timeDiff = (Date.now() - createdAt) / (1000 * 60 * 60); + if (timeDiff >= 2) { + return true; + } + } + + return false; + }); + } +}