Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions handwritten/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@
"p-defer": "^3.0.0"
},
"devDependencies": {
"@google-cloud/opentelemetry-cloud-trace-exporter": "^2.4.1",
"@grpc/proto-loader": "^0.8.0",
"@opentelemetry/sdk-trace-base": "^1.17.0",
"@opentelemetry/sdk-trace-node": "^2.7.0",
"@types/chai": "^5.2.3",
"@types/duplexify": "^3.6.4",
"@types/extend": "^3.0.4",
"@types/lodash.snakecase": "^4.1.9",
Expand All @@ -84,7 +87,9 @@
"@types/sinon": "^21.0.0",
"@types/tmp": "^0.2.6",
"@types/uuid": "^11.0.0",
"avro-js": "^1.12.1",
"c8": "^10.1.3",
"chai": "^6.2.2",
"codecov": "^3.8.3",
"execa": "~5.1.0",
"gapic-tools": "^1.0.1",
Expand Down
10 changes: 10 additions & 0 deletions handwritten/pubsub/system-test/avro-js.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
declare module 'avro-js' {
function parse(def: string): Parser;

class Parser {
fromBuffer<T>(buf: Buffer): T;
fromString<T>(str: string): T;
toBuffer<T>(item: T): Buffer;
toString<T>(item: T): string;
}
}
128 changes: 128 additions & 0 deletions handwritten/pubsub/system-test/avro-samples.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import {Message, PubSub, Schema} from '../src';
import * as assert from 'assert';
import {describe, it, after, before} from 'mocha';
import {TestResources} from './testResources';
import * as avro from 'avro-js';
import * as fs from 'fs';

describe('Avro Samples System Tests', () => {
const pubsub = new PubSub();
const resources = new TestResources('ps-sys-avro');

let topicName: string;
let subName: string;
let schemaId: string;

before(async () => {
topicName = resources.generateName('topic');
subName = resources.generateName('sub');
schemaId = resources.generateName('schema');

const definition = fs.readFileSync('system-test/fixtures/provinces.avsc').toString();
await pubsub.createSchema(schemaId, 'AVRO', definition);
await pubsub.createTopic({
name: topicName,
schemaSettings: {
schema: await pubsub.schema(schemaId).getName(),
encoding: 'BINARY',
},
});

const [topic] = await pubsub.topic(topicName).get();
await topic.createSubscription(subName);
});

after(async () => {
const [subscriptions] = await pubsub.getSubscriptions();
await Promise.all(
resources.filterForCleanup(subscriptions).map(x => x.delete?.())
);

const [topics] = await pubsub.getTopics();
await Promise.all(
resources.filterForCleanup(topics).map((x: any) => x.delete?.())
);

const schemas: any[] = [];
for await (const s of pubsub.listSchemas()) {
schemas.push(pubsub.schema(s.name!));
}
await Promise.all(
resources.filterForCleanup(schemas).map(x => x.delete?.())
);
});

it('should publish and listen for avro records', async () => {
const definition = fs.readFileSync('system-test/fixtures/provinces.avsc').toString();
const [topic] = await pubsub.topic(topicName).get();
const [subscription] = await pubsub.subscription(subName).get();

const type = avro.parse(definition);

const province = {
name: 'Ontario',
post_abbr: 'ON',
};

const dataBuffer = type.toBuffer(province);
const messageId = await topic.publish(dataBuffer);
assert.ok(messageId);

const message = await new Promise<Message>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('Timeout waiting for Avro record')), 15000);
subscription.once('message', (m: Message) => {
clearTimeout(timeout);
m.ack();
resolve(m);
});
});

const schemaMetadata = Schema.metadataFromMessage(message.attributes);
assert.strictEqual(schemaMetadata.encoding, 'BINARY');

const result = type.fromBuffer(message.data) as any;
assert.strictEqual(result.name, 'Ontario');
assert.strictEqual(result.post_abbr, 'ON');
});

it('should listen for avro records with revisions', async () => {
const definition = fs.readFileSync('system-test/fixtures/provinces.avsc').toString();

const schemaClient = await pubsub.getSchemaClient();

const [topic] = await pubsub.topic(topicName).get();
const [subscription] = await pubsub.subscription(subName).get();

const type = avro.parse(definition);
const province = {
name: 'Ontario',
post_abbr: 'ON',
};

const dataBuffer = type.toBuffer(province);
await topic.publish(dataBuffer);

const message = await new Promise<Message>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('Timeout waiting for Avro revision')), 15000);
subscription.once('message', (m: Message) => {
clearTimeout(timeout);
m.ack();
resolve(m);
});
});

const schemaMetadata = Schema.metadataFromMessage(message.attributes);
const revision = schemaMetadata.revision!;
assert.ok(revision);

const [fetchedSchema] = await schemaClient.getSchema({
name: `${schemaMetadata.name}@${schemaMetadata.revision}`,
});

const reader = avro.parse(fetchedSchema.definition!);
const result = reader.fromBuffer(message.data) as any;

assert.strictEqual(result.name, 'Ontario');
assert.strictEqual(result.post_abbr, 'ON');
});
});
111 changes: 111 additions & 0 deletions handwritten/pubsub/system-test/batch-flow-samples.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import {Message, PubSub, PublishOptions} from '../src';
import * as assert from 'assert';
import {describe, it, after, before} from 'mocha';
import {TestResources} from './testResources';

describe('Batch and Flow Control Samples System Tests', () => {
const pubsub = new PubSub();
const resources = new TestResources('ps-sys-batch');

let topicName: string;
let subName: string;

before(async () => {
topicName = resources.generateName('topic');
subName = resources.generateName('sub');
});

after(async () => {
const [subscriptions] = await pubsub.getSubscriptions();
await Promise.all(
resources.filterForCleanup(subscriptions).map(x => x.delete?.())
);

const [topics] = await pubsub.getTopics();
await Promise.all(
resources.filterForCleanup(topics).map((x: any) => x.delete?.())
);
});

it('should publish batched messages', async () => {
const [topic] = await pubsub.createTopic(topicName);
const [subscription] = await topic.createSubscription(subName);

const publishOptions: PublishOptions = {
batching: {
maxMessages: 10,
maxMilliseconds: 2000,
},
};
const batchPublisher = pubsub.topic(topicName, publishOptions);

const promises: Promise<string>[] = [];
for (let i = 0; i < 10; i++) {
promises.push(batchPublisher.publishMessage({data: Buffer.from(`message ${i}`)}));
}

const messageIds = await Promise.all(promises);
assert.strictEqual(messageIds.length, 10);

const messages: Message[] = [];
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('Timeout waiting for batched messages')), 15000);
subscription.on('message', (m: Message) => {
m.ack();
messages.push(m);
if (messages.length === 10) {
clearTimeout(timeout);
subscription.removeAllListeners('message');
resolve();
}
});
});

assert.strictEqual(messages.length, 10);
});

it('should publish with flow control', async () => {
const flowTopicName = resources.generateName('flow');
const flowSubName = resources.generateName('flowsub');

const [topic] = await pubsub.createTopic(flowTopicName);
const [subscription] = await topic.createSubscription(flowSubName);

const options = {
flowControlOptions: {
maxOutstandingMessages: 5,
maxOutstandingBytes: 1024,
},
};

const topicWithFlow = pubsub.topic(flowTopicName, options);
const flow = topicWithFlow.flowControlled();

const promises = [];
for (let i = 0; i < 10; i++) {
const wait = flow.publish({data: Buffer.from('flow control message')});
if (wait) {
await wait;
}
}

const messageIds = await flow.all();
assert.strictEqual(messageIds.length, 10);

const messages: Message[] = [];
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('Timeout waiting for flow control messages')), 15000);
subscription.on('message', (m: Message) => {
m.ack();
messages.push(m);
if (messages.length === 10) {
clearTimeout(timeout);
subscription.removeAllListeners('message');
resolve();
}
});
});

assert.strictEqual(messages.length, 10);
});
});
36 changes: 36 additions & 0 deletions handwritten/pubsub/system-test/common.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {assert} from 'chai';
import {describe, it} from 'mocha';
import {commandFor} from './common';
import * as path from 'path';

describe('common (unit)', () => {
it('commandFor finds TS samples', () => {
const result = commandFor('createAvroSchema');
assert.strictEqual(
result,
`node ${path.join('build', 'createAvroSchema.js')}`
);
});

it('commandFor finds JS samples', () => {
const result = commandFor('createSubscription');
assert.strictEqual(
result,
`node ${path.join('build', 'createSubscription.js')}`
);
});
});
26 changes: 26 additions & 0 deletions handwritten/pubsub/system-test/common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import * as cp from 'child_process';
import * as path from 'path';

export const execSync = (cmd: string): string =>
cp.execSync(cmd, {encoding: 'utf-8'});

// Processed versions of TS samples go to the same build location
// as the rest of the JS samples.
export function commandFor(action: string): string {
const jsPath = path.join('build', `${action}.js`);
return `node ${jsPath}`;
}
8 changes: 8 additions & 0 deletions handwritten/pubsub/system-test/fixtures/provinces.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
syntax = "proto3";

package utilities;

message Province {
string name = 1;
string post_abbr = 2;
}
Loading
Loading