diff --git a/handwritten/spanner/google-cloud-spanner-executor/src/cloud-client-executor.ts b/handwritten/spanner/google-cloud-spanner-executor/src/cloud-client-executor.ts index d2eff204104..1de88269d24 100644 --- a/handwritten/spanner/google-cloud-spanner-executor/src/cloud-client-executor.ts +++ b/handwritten/spanner/google-cloud-spanner-executor/src/cloud-client-executor.ts @@ -14,18 +14,27 @@ * limitations under the License. */ -import {ServerDuplexStream, status} from '@grpc/grpc-js'; -import {Spanner} from '../../src'; -import {trace, context, Tracer} from '@opentelemetry/api'; +import { ServerDuplexStream, status } from '@grpc/grpc-js'; +import { Spanner } from '../../src'; +import { trace, context, Tracer } from '@opentelemetry/api'; import * as protos from '../../protos/protos'; -import {CloudUtil} from './cloud-util'; -import {OutcomeSender, ExecutionFlowContextInterface} from './cloud-executor'; +import { CloudUtil } from './cloud-util'; +import { + OutcomeSender, + ExecutionFlowContextInterface, + CloudExecutor, +} from './cloud-executor'; import spanner = protos.google.spanner; import SpannerAsyncActionRequest = spanner.executor.v1.SpannerAsyncActionRequest; import SpannerAsyncActionResponse = spanner.executor.v1.SpannerAsyncActionResponse; +import SpannerActionOutcome = spanner.executor.v1.SpannerActionOutcome; import ISpannerAction = spanner.executor.v1.ISpannerAction; import IAdminAction = spanner.executor.v1.IAdminAction; import ICreateCloudInstanceAction = spanner.executor.v1.ICreateCloudInstanceAction; +import IUpdateCloudInstanceAction = spanner.executor.v1.IUpdateCloudInstanceAction; +import IDeleteCloudInstanceAction = spanner.executor.v1.IDeleteCloudInstanceAction; +import IListCloudInstancesAction = spanner.executor.v1.IListCloudInstancesAction; +import IGetCloudInstanceAction = spanner.executor.v1.IGetCloudInstanceAction; /** * Context for a single stream connection. @@ -66,9 +75,11 @@ export class ExecutionFlowContext implements ExecutionFlowContextInterface { * Sends an error back to the client. */ public onError(error: Error): void { - const stream = this.call as any; - - if (this.call.cancelled || stream.destroyed || stream.writable === false) { + if ( + this.call.cancelled || + this.call.destroyed || + this.call.writable === false + ) { console.warn( 'Attempted to emit error to a closed or cancelled stream.', error, @@ -99,6 +110,23 @@ export class CloudClientExecutor { action as ICreateCloudInstanceAction, sender, ), + updateCloudInstance: (action, sender) => + this.executeUpdateCloudInstance( + action as IUpdateCloudInstanceAction, + sender, + ), + deleteCloudInstance: (action, sender) => + this.executeDeleteCloudInstance( + action as IDeleteCloudInstanceAction, + sender, + ), + listCloudInstances: (action, sender) => + this.executeListCloudInstances( + action as IListCloudInstancesAction, + sender, + ), + getCloudInstance: (action, sender) => + this.executeGetCloudInstance(action as IGetCloudInstanceAction, sender), }; private readonly actionRegistry: Record = { @@ -130,7 +158,7 @@ export class CloudClientExecutor { public startHandlingRequest( req: SpannerAsyncActionRequest, executionContext: ExecutionFlowContext, - ): {code: number; details: string} { + ): { code: number; details: string } { const outcomeSender = new OutcomeSender(req.actionId!, executionContext); if (!req.action) { @@ -144,7 +172,7 @@ export class CloudClientExecutor { outcomeSender.finishWithError(err); }); - return {code: status.OK, details: ''}; + return { code: status.OK, details: '' }; } /** @@ -155,10 +183,8 @@ export class CloudClientExecutor { action: ISpannerAction, ): Promise { const actionType = - Object.keys(action).find( - k => - action[k as keyof typeof action] !== undefined && - !!this.actionRegistry[k], + Object.keys(this.actionRegistry).find( + k => action[k as keyof typeof action] !== undefined, ) || 'unknown'; const span = this.tracer.startSpan(`performaction_${actionType}`); @@ -195,10 +221,8 @@ export class CloudClientExecutor { sender: OutcomeSender, ): Promise { try { - const adminType = Object.keys(action).find( - k => - action[k as keyof typeof action] !== undefined && - !!this.adminActionRegistry[k], + const adminType = Object.keys(this.adminActionRegistry).find( + k => action[k as keyof typeof action] !== undefined, ); if (adminType && this.adminActionRegistry[adminType]) { @@ -226,21 +250,28 @@ export class CloudClientExecutor { console.log(`Creating instance: \n${JSON.stringify(action, null, 2)}`); const instanceId = action.instanceId!; - const projectId = action.projectId!; + const projectId = action.projectId || CloudExecutor.PROJECT_ID; const configId = action.instanceConfigId!; const instanceAdminClient = this.spanner.getInstanceAdminClient(); + const instancePayload: any = { + config: instanceAdminClient.instanceConfigPath(projectId, configId), + displayName: instanceId, + labels: action.labels || {}, + }; + + if (action.nodeCount !== undefined) { + instancePayload.nodeCount = action.nodeCount; + } + if (action.processingUnits !== undefined) { + instancePayload.processingUnits = action.processingUnits; + } + const [operation] = await instanceAdminClient.createInstance({ parent: instanceAdminClient.projectPath(projectId), instanceId: instanceId, - instance: { - config: instanceAdminClient.instanceConfigPath(projectId, configId), - displayName: instanceId, - nodeCount: action.nodeCount || 1, - processingUnits: action.processingUnits, - labels: action.labels || {}, - }, + instance: instancePayload, }); console.log('Waiting for instance creation operation to complete...'); @@ -259,4 +290,144 @@ export class CloudClientExecutor { sender.finishWithError(err); } } + + private async executeUpdateCloudInstance( + action: IUpdateCloudInstanceAction, + sender: OutcomeSender, + ): Promise { + try { + console.log(`Updating instance: \n${JSON.stringify(action, null, 2)}`); + + const instanceId = action.instanceId!; + const projectId = action.projectId || CloudExecutor.PROJECT_ID; + + const instanceAdminClient = this.spanner.getInstanceAdminClient(); + + const paths: string[] = []; + if (action.displayName !== undefined) paths.push('display_name'); + if (action.nodeCount !== undefined) paths.push('node_count'); + if (action.processingUnits !== undefined) paths.push('processing_units'); + if (action.labels && Object.keys(action.labels).length > 0) + paths.push('labels'); + + const [operation] = await instanceAdminClient.updateInstance({ + instance: { + name: instanceAdminClient.instancePath(projectId, instanceId), + displayName: action.displayName, + nodeCount: action.nodeCount, + processingUnits: action.processingUnits, + labels: action.labels, + }, + fieldMask: { paths: paths }, + }); + + console.log('Waiting for instance update operation to complete...'); + await operation.promise(); + + console.log(`Instance ${instanceId} updated successfully.`); + + sender.finishWithOK(); + } catch (err: any) { + console.error('Failed to update instance:', err); + sender.finishWithError(err); + } + } + + private async executeDeleteCloudInstance( + action: IDeleteCloudInstanceAction, + sender: OutcomeSender, + ): Promise { + try { + console.log(`Deleting instance: \n${JSON.stringify(action, null, 2)}`); + + const instanceId = action.instanceId!; + const projectId = action.projectId || CloudExecutor.PROJECT_ID; + + const instanceAdminClient = this.spanner.getInstanceAdminClient(); + + await instanceAdminClient.deleteInstance({ + name: instanceAdminClient.instancePath(projectId, instanceId), + }); + + console.log(`Instance ${instanceId} deleted successfully.`); + + sender.finishWithOK(); + } catch (err: any) { + console.error('Failed to delete instance:', err); + sender.finishWithError(err); + } + } + + private async executeListCloudInstances( + action: IListCloudInstancesAction, + sender: OutcomeSender, + ): Promise { + try { + console.log(`Listing instances: \n${JSON.stringify(action, null, 2)}`); + + const projectId = action.projectId || CloudExecutor.PROJECT_ID; + + const instanceAdminClient = this.spanner.getInstanceAdminClient(); + + const [instances, , response] = await instanceAdminClient.listInstances({ + parent: instanceAdminClient.projectPath(projectId), + filter: action.filter, + pageSize: action.pageSize, + pageToken: action.pageToken, + }); + + console.log(`Found ${instances.length} instances.`); + + const outcome = SpannerActionOutcome.create({ + status: CloudExecutor.toProto(status.OK), + commitTime: { seconds: 0, nanos: 0 }, + adminResult: { + instanceResponse: { + listedInstances: instances, + nextPageToken: response?.nextPageToken || '', + }, + }, + }); + + sender.sendOutcome(outcome); + } catch (err: any) { + console.error('Failed to list instances:', err); + sender.finishWithError(err); + } + } + + private async executeGetCloudInstance( + action: IGetCloudInstanceAction, + sender: OutcomeSender, + ): Promise { + try { + console.log(`Getting instance: \n${JSON.stringify(action, null, 2)}`); + + const instanceId = action.instanceId!; + const projectId = action.projectId || CloudExecutor.PROJECT_ID; + + const instanceAdminClient = this.spanner.getInstanceAdminClient(); + + const [instance] = await instanceAdminClient.getInstance({ + name: instanceAdminClient.instancePath(projectId, instanceId), + }); + + console.log(`Found instance: ${instance.name}`); + + const outcome = SpannerActionOutcome.create({ + status: CloudExecutor.toProto(status.OK), + commitTime: { seconds: 0, nanos: 0 }, + adminResult: { + instanceResponse: { + instance: instance, + }, + }, + }); + + sender.sendOutcome(outcome); + } catch (err: any) { + console.error('Failed to get instance:', err); + sender.finishWithError(err); + } + } } diff --git a/handwritten/spanner/google-cloud-spanner-executor/src/cloud-executor-impl.ts b/handwritten/spanner/google-cloud-spanner-executor/src/cloud-executor-impl.ts index 09028bef447..980a9375858 100644 --- a/handwritten/spanner/google-cloud-spanner-executor/src/cloud-executor-impl.ts +++ b/handwritten/spanner/google-cloud-spanner-executor/src/cloud-executor-impl.ts @@ -15,7 +15,7 @@ */ import {ServerDuplexStream, status} from '@grpc/grpc-js'; -import {trace, context, Tracer} from '@opentelemetry/api'; +import {trace, context, Tracer, SpanStatusCode} from '@opentelemetry/api'; import {CloudClientExecutor} from './cloud-client-executor'; import * as protos from '../../protos/protos'; import spanner = protos.google.spanner; @@ -86,6 +86,7 @@ export class CloudExecutorImpl { context.with(streamContext, () => { console.error('Client ends the stream with error.', err); span.recordException(err); + span.setStatus({code: SpanStatusCode.ERROR, message: err.message}); span.end(); executionContext.cleanup(); }); diff --git a/handwritten/spanner/google-cloud-spanner-executor/src/cloud-executor.ts b/handwritten/spanner/google-cloud-spanner-executor/src/cloud-executor.ts index f494946562b..78ad2d7793e 100644 --- a/handwritten/spanner/google-cloud-spanner-executor/src/cloud-executor.ts +++ b/handwritten/spanner/google-cloud-spanner-executor/src/cloud-executor.ts @@ -46,6 +46,7 @@ export class OutcomeSender { public finishWithOK(): {code: number; details: string} { const outcome = SpannerActionOutcome.create({ status: CloudExecutor.toProto(status.OK), + commitTime: {seconds: 0, nanos: 0}, }); return this.sendOutcome(outcome); } @@ -54,11 +55,12 @@ export class OutcomeSender { const s = CloudExecutor.toStatus(err); const outcome = SpannerActionOutcome.create({ status: CloudExecutor.toProto(s.code, s.message), + commitTime: {seconds: 0, nanos: 0}, }); return this.sendOutcome(outcome); } - private sendOutcome(outcome: SpannerActionOutcome): { + public sendOutcome(outcome: SpannerActionOutcome): { code: number; details: string; } {