From adfb856102956af87382551183d124acc22129bc Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 19 Jun 2026 17:15:32 +0200 Subject: [PATCH] bundle: record deployment history in DMS after approval MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Records each approved deploy/destroy as a version with the Deployment Metadata Service (DMS), gated by experimental.record_deployment_history and the direct engine. The version is created only after the plan is approved — a cancelled or declined deploy/destroy records nothing, so there are no empty/abandoned versions for operations that never ran. - libs/dms: Recorder with CreateVersion / CompleteVersion. The deployment ID is the state lineage (from GetOrInitLineage), so a bundle deployment maps one-to-one to a DMS deployment record. GetDeployment first, CreateDeployment only when missing, then create the next version; heartbeat keeps the version's lease alive; CompleteVersion records success/failure and, for destroy, deletes the deployment record on success. Independent of bundle/lock. - phases: newDeploymentRecorder builds the recorder from the bundle (nil unless the flag is set and the engine is direct); deploy/destroy create the version inside the approved branch (after UpgradeToWrite, so the lineage is already in the WAL) and complete it in the deferred lock release. - libs/testserver: in-memory DMS handlers under /api/2.0/bundle/... - acceptance/bundle/dms: deploy/redeploy/destroy record versions and hold the file lock; redeploy after deleting .databricks recovers the lineage from remote state; enabling the flag after a plain deploy creates a new deployment; a declined destroy records no version and does not delete the deployment. 
Co-authored-by: Shreyas Goenka --- .../dms/destroy-declined/databricks.yml | 10 + .../bundle/dms/destroy-declined/out.test.toml | 3 + .../bundle/dms/destroy-declined/output.txt | 56 +++++ acceptance/bundle/dms/destroy-declined/script | 14 ++ .../dms/enable-after-deploy/databricks.yml | 7 + .../dms/enable-after-deploy/out.test.toml | 3 + .../bundle/dms/enable-after-deploy/output.txt | 88 ++++++++ .../bundle/dms/enable-after-deploy/script | 25 +++ acceptance/bundle/dms/record/databricks.yml | 10 + acceptance/bundle/dms/record/out.test.toml | 3 + acceptance/bundle/dms/record/output.txt | 148 ++++++++++++ acceptance/bundle/dms/record/script | 23 ++ acceptance/bundle/dms/test.toml | 5 + bundle/phases/deploy.go | 20 +- bundle/phases/destroy.go | 14 ++ bundle/phases/dms.go | 32 +++ libs/dms/recorder.go | 212 ++++++++++++++++++ libs/testserver/bundle.go | 106 +++++++++ libs/testserver/fake_workspace.go | 5 + libs/testserver/handlers.go | 20 ++ 20 files changed, 803 insertions(+), 1 deletion(-) create mode 100644 acceptance/bundle/dms/destroy-declined/databricks.yml create mode 100644 acceptance/bundle/dms/destroy-declined/out.test.toml create mode 100644 acceptance/bundle/dms/destroy-declined/output.txt create mode 100644 acceptance/bundle/dms/destroy-declined/script create mode 100644 acceptance/bundle/dms/enable-after-deploy/databricks.yml create mode 100644 acceptance/bundle/dms/enable-after-deploy/out.test.toml create mode 100644 acceptance/bundle/dms/enable-after-deploy/output.txt create mode 100644 acceptance/bundle/dms/enable-after-deploy/script create mode 100644 acceptance/bundle/dms/record/databricks.yml create mode 100644 acceptance/bundle/dms/record/out.test.toml create mode 100644 acceptance/bundle/dms/record/output.txt create mode 100644 acceptance/bundle/dms/record/script create mode 100644 acceptance/bundle/dms/test.toml create mode 100644 bundle/phases/dms.go create mode 100644 libs/dms/recorder.go create mode 100644 libs/testserver/bundle.go diff --git 
a/acceptance/bundle/dms/destroy-declined/databricks.yml b/acceptance/bundle/dms/destroy-declined/databricks.yml new file mode 100644 index 00000000000..98cdf3c451f --- /dev/null +++ b/acceptance/bundle/dms/destroy-declined/databricks.yml @@ -0,0 +1,10 @@ +bundle: + name: dms-destroy-declined + +experimental: + record_deployment_history: true + +resources: + jobs: + foo: + name: foo diff --git a/acceptance/bundle/dms/destroy-declined/out.test.toml b/acceptance/bundle/dms/destroy-declined/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/bundle/dms/destroy-declined/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/dms/destroy-declined/output.txt b/acceptance/bundle/dms/destroy-declined/output.txt new file mode 100644 index 00000000000..1b7308fa2ff --- /dev/null +++ b/acceptance/bundle/dms/destroy-declined/output.txt @@ -0,0 +1,56 @@ + +=== Deploy creates the DMS deployment + version 1 +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-destroy-declined/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +=== Destroy without --auto-approve is declined: it must not create a DMS version or delete the deployment +>>> [CLI] bundle destroy +Error: this command will destroy all resources deployed by this bundle, including workspace files in the deployment directory. +Deleting data assets such as schemas, pipelines, or volumes may cause permanent data loss and should be carefully reviewed. +To proceed, use --auto-approve. 
+ +Exit code: 1 + +=== Only the deploy's DMS calls were recorded — no destroy version, no DELETE from the declined destroy +>>> print_requests.py //api/2.0/bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +>>> [CLI] bundle destroy --auto-approve +The following resources will be deleted: + delete resources.jobs.foo + +All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/dms-destroy-declined/default + +Deleting files... +Destroy complete! diff --git a/acceptance/bundle/dms/destroy-declined/script b/acceptance/bundle/dms/destroy-declined/script new file mode 100644 index 00000000000..9d791d212c8 --- /dev/null +++ b/acceptance/bundle/dms/destroy-declined/script @@ -0,0 +1,14 @@ +cleanup() { + trace $CLI bundle destroy --auto-approve + rm -f out.requests.txt +} +trap cleanup EXIT + +title "Deploy creates the DMS deployment + version 1" +trace $CLI bundle deploy + +title "Destroy without --auto-approve is declined: it must not create a DMS version or delete the deployment" +errcode trace $CLI bundle destroy + +title "Only the deploy's DMS calls were recorded — no destroy version, no DELETE from the declined destroy" +trace print_requests.py //api/2.0/bundle diff --git a/acceptance/bundle/dms/enable-after-deploy/databricks.yml b/acceptance/bundle/dms/enable-after-deploy/databricks.yml new file mode 100644 index 00000000000..cd8ff90b266 --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/databricks.yml @@ -0,0 +1,7 
@@ +bundle: + name: dms-enable-after-deploy + +resources: + jobs: + foo: + name: foo diff --git a/acceptance/bundle/dms/enable-after-deploy/out.test.toml b/acceptance/bundle/dms/enable-after-deploy/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/dms/enable-after-deploy/output.txt b/acceptance/bundle/dms/enable-after-deploy/output.txt new file mode 100644 index 00000000000..0d9addaa6ee --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/output.txt @@ -0,0 +1,88 @@ + +=== Deploy without the feature: no DMS calls are made +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle --keep + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +=== Enable experimental.record_deployment_history +=== Delete local cache (.databricks) so the next deploy cannot rely on it +=== Next deploy creates a DMS deployment + version 1 (lineage recovered from remote state, not local cache) +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! 
+ +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +>>> [CLI] bundle destroy --auto-approve +The following resources will be deleted: + delete resources.jobs.foo + +All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default + +Deleting files... +Destroy complete! 
diff --git a/acceptance/bundle/dms/enable-after-deploy/script b/acceptance/bundle/dms/enable-after-deploy/script new file mode 100644 index 00000000000..c17b3b859dc --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/script @@ -0,0 +1,25 @@ +cleanup() { + trace $CLI bundle destroy --auto-approve + rm -f out.requests.txt +} +trap cleanup EXIT + +title "Deploy without the feature: no DMS calls are made" +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle --keep +trace print_requests.py //deploy.lock + +title "Enable experimental.record_deployment_history" +cat >> databricks.yml <<'YAML' + +experimental: + record_deployment_history: true +YAML + +title "Delete local cache (.databricks) so the next deploy cannot rely on it" +rm -rf .databricks + +title "Next deploy creates a DMS deployment + version 1 (lineage recovered from remote state, not local cache)" +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle --keep +trace print_requests.py //deploy.lock diff --git a/acceptance/bundle/dms/record/databricks.yml b/acceptance/bundle/dms/record/databricks.yml new file mode 100644 index 00000000000..b20e6274310 --- /dev/null +++ b/acceptance/bundle/dms/record/databricks.yml @@ -0,0 +1,10 @@ +bundle: + name: dms-record + +experimental: + record_deployment_history: true + +resources: + jobs: + foo: + name: foo diff --git a/acceptance/bundle/dms/record/out.test.toml b/acceptance/bundle/dms/record/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/bundle/dms/record/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/dms/record/output.txt b/acceptance/bundle/dms/record/output.txt new file mode 100644 index 00000000000..ea51648031d --- /dev/null +++ b/acceptance/bundle/dms/record/output.txt @@ -0,0 +1,148 @@ + +=== Deploy: a DMS deployment + version are recorded while the file lock is held +>>> [CLI] bundle 
deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-record/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +=== DMS API calls made during deploy (create deployment, create + complete version) +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +=== The workspace-filesystem lock is applied alongside DMS (deploy.lock written) +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-record/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +=== Redeploy after deleting local cache (.databricks): the lineage is recovered from remote state, so the same deployment is reused and the version increments (no new CreateDeployment) +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-record/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! 
+ +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "2" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-record/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +=== Destroy: a DMS version is recorded while the file lock is held +>>> [CLI] bundle destroy --auto-approve +The following resources will be deleted: + delete resources.jobs.foo + +All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/dms-record/default + +Deleting files... +Destroy complete! 
+ +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "3" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DESTROY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/3/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "DELETE", + "path": "/api/2.0/bundle/deployments/[UUID]" +} + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-record/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} diff --git a/acceptance/bundle/dms/record/script b/acceptance/bundle/dms/record/script new file mode 100644 index 00000000000..afb9b05bdc3 --- /dev/null +++ b/acceptance/bundle/dms/record/script @@ -0,0 +1,23 @@ +cleanup() { + title "Destroy: a DMS version is recorded while the file lock is held" + trace $CLI bundle destroy --auto-approve + trace print_requests.py //api/2.0/bundle --keep + trace print_requests.py //deploy.lock + rm -f out.requests.txt +} +trap cleanup EXIT + +title "Deploy: a DMS deployment + version are recorded while the file lock is held" +trace $CLI bundle deploy + +title "DMS API calls made during deploy (create deployment, create + complete version)" +trace print_requests.py //api/2.0/bundle --keep + +title "The workspace-filesystem lock is applied alongside DMS (deploy.lock written)" +trace print_requests.py //deploy.lock + +title "Redeploy after deleting local cache (.databricks): the lineage is recovered from remote state, so the same deployment is reused and the version increments (no new CreateDeployment)" +rm -rf .databricks +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle 
--keep +trace print_requests.py //deploy.lock diff --git a/acceptance/bundle/dms/test.toml b/acceptance/bundle/dms/test.toml new file mode 100644 index 00000000000..84b939a7da2 --- /dev/null +++ b/acceptance/bundle/dms/test.toml @@ -0,0 +1,5 @@ +# Deployment Metadata Service (DMS) recording is only meaningful in the direct +# engine, where the deployment ID is derived from the state lineage. +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] + +RecordRequests = true diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index fd76151483c..d4766af79c6 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -22,6 +22,7 @@ import ( "github.com/databricks/cli/bundle/statemgmt" "github.com/databricks/cli/libs/agent" "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/cli/libs/dms" "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/logdiag" "github.com/databricks/cli/libs/sync" @@ -142,8 +143,16 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand return } - // lock is acquired here + // lock is acquired here. + // + // Set up DMS recording of this deployment as a version. The version is not + // created until the plan is approved (below), so a cancelled deploy records + // nothing; the deferred CompleteVersion is a no-op until CreateVersion runs. + recorder := newDeploymentRecorder(ctx, b, engine, dms.VersionTypeDeploy) defer func() { + if err := recorder.CompleteVersion(ctx, !logdiag.HasError(ctx)); err != nil { + logdiag.LogError(ctx, err) + } bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDeploy)) }() @@ -208,6 +217,15 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand return } if haveApproval { + // Record the DMS version now that the plan is approved and the state WAL + // (with the lineage) has been opened. 
CreateVersion requests + // version_id == last_version_id + 1; the server returns ABORTED if a + // concurrent deploy advanced the deployment since the plan was computed, + // so a stale plan is not applied. + if err := recorder.CreateVersion(ctx); err != nil { + logdiag.LogError(ctx, err) + return + } deployCore(ctx, b, plan, engine) } else { cmdio.LogString(ctx, "Deployment cancelled!") diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index 74049f26f42..1fdf898b425 100644 --- a/bundle/phases/destroy.go +++ b/bundle/phases/destroy.go @@ -15,6 +15,7 @@ import ( "github.com/databricks/cli/bundle/direct" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dms" "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/logdiag" "github.com/databricks/databricks-sdk-go/apierr" @@ -127,7 +128,14 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { return } + // Set up DMS recording of this destroy as a version. The version is not + // created until the destroy is approved (below), so a cancelled destroy + // records nothing; the deferred CompleteVersion is a no-op until then. + recorder := newDeploymentRecorder(ctx, b, engine, dms.VersionTypeDestroy) defer func() { + if err := recorder.CompleteVersion(ctx, !logdiag.HasError(ctx)); err != nil { + logdiag.LogError(ctx, err) + } bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDestroy)) }() @@ -184,6 +192,12 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { return } } + // Record the DMS version now that the destroy is approved and the state + // WAL (with the lineage) has been opened. 
+ if err := recorder.CreateVersion(ctx); err != nil { + logdiag.LogError(ctx, err) + return + } destroyCore(ctx, b, plan, engine) } else { cmdio.LogString(ctx, "Destroy cancelled!") diff --git a/bundle/phases/dms.go b/bundle/phases/dms.go new file mode 100644 index 00000000000..1427be1c606 --- /dev/null +++ b/bundle/phases/dms.go @@ -0,0 +1,32 @@ +package phases + +import ( + "context" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config/engine" + "github.com/databricks/cli/libs/dms" +) + +// newDeploymentRecorder returns a dms.Recorder for the current deployment, or +// nil when DMS recording does not apply. A nil recorder is a no-op, so callers +// do not need to branch on it. +// +// Recording is enabled only when experimental.record_deployment_history is set +// AND the engine is direct: the deployment ID is the state lineage, which is +// only populated for the direct engine (the terraform engine never opens the +// state DB). Returning nil for terraform leaves those deployments untouched. +func newDeploymentRecorder(ctx context.Context, b *bundle.Bundle, eng engine.EngineType, versionType dms.VersionType) *dms.Recorder { + if b.Config.Experimental == nil || !b.Config.Experimental.RecordDeploymentHistory { + return nil + } + if !eng.IsDirect() { + return nil + } + return dms.NewRecorder( + b.WorkspaceClient(ctx).BundleDeployments, + b.DeploymentBundle.StateDB.GetOrInitLineage(), + b.Config.Bundle.Target, + versionType, + ) +} diff --git a/libs/dms/recorder.go b/libs/dms/recorder.go new file mode 100644 index 00000000000..4ad203b7f3c --- /dev/null +++ b/libs/dms/recorder.go @@ -0,0 +1,212 @@ +// Package dms records bundle deployments as versions with the Deployment +// Metadata Service (DMS). +// +// It is intentionally independent of the deployment lock: a Recorder does not +// acquire or hold any lock. Callers are responsible for serializing concurrent +// deployments (today via the workspace-filesystem lock). 
The server-side +// version counter — CreateVersion only succeeds when the requested version is +// last_version_id + 1 — provides the concurrency control for the records +// themselves. +package dms + +import ( + "context" + "errors" + "fmt" + "net/http" + "strconv" + "time" + + "github.com/databricks/cli/internal/build" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/service/bundledeployments" +) + +// The server expires a version's lease if it does not receive a heartbeat +// within a 2-minute TTL; we heartbeat well inside that window. +const defaultHeartbeatInterval = 30 * time.Second + +// VersionType identifies the kind of deployment a version records. +type VersionType = bundledeployments.VersionType + +const ( + VersionTypeDeploy VersionType = bundledeployments.VersionTypeVersionTypeDeploy + VersionTypeDestroy VersionType = bundledeployments.VersionTypeVersionTypeDestroy +) + +// Recorder records a single deploy/destroy as a version with DMS. The +// deployment ID is the bundle's state lineage, so a bundle deployment maps +// one-to-one to a DMS deployment record. +type Recorder struct { + svc bundledeployments.BundleDeploymentsInterface + deploymentID string + targetName string + versionType VersionType + + // populated by CreateVersion + versionNum int64 + stopHeartbeat context.CancelFunc +} + +// NewRecorder returns a Recorder for the given deployment. +func NewRecorder(svc bundledeployments.BundleDeploymentsInterface, deploymentID, targetName string, versionType VersionType) *Recorder { + return &Recorder{ + svc: svc, + deploymentID: deploymentID, + targetName: targetName, + versionType: versionType, + } +} + +// CreateVersion registers a new version with DMS, claiming it for the duration +// of the deployment. A nil Recorder is a no-op, so callers can leave it nil +// when recording is disabled. 
+func (r *Recorder) CreateVersion(ctx context.Context) error { + if r == nil { + return nil + } + + versionID, err := r.createDeploymentVersion(ctx) + if err != nil { + return err + } + + versionNum, err := strconv.ParseInt(versionID, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse version ID %q: %w", versionID, err) + } + r.versionNum = versionNum + r.stopHeartbeat = startHeartbeat(ctx, r.svc, r.deploymentID, versionID) + return nil +} + +// CompleteVersion finalizes the version created by CreateVersion. A nil +// Recorder, or one whose CreateVersion never ran, is a no-op. +func (r *Recorder) CompleteVersion(ctx context.Context, success bool) error { + if r == nil || r.stopHeartbeat == nil { + return nil + } + + r.stopHeartbeat() + + versionIDStr := strconv.FormatInt(r.versionNum, 10) + versionName := fmt.Sprintf("deployments/%s/versions/%s", r.deploymentID, versionIDStr) + + reason := bundledeployments.VersionCompleteVersionCompleteSuccess + if !success { + reason = bundledeployments.VersionCompleteVersionCompleteFailure + } + + _, err := r.svc.CompleteVersion(ctx, bundledeployments.CompleteVersionRequest{ + Name: versionName, + CompletionReason: reason, + }) + if err != nil { + return err + } + log.Infof(ctx, "Completed deployment version: deployment=%s version=%s reason=%s", r.deploymentID, versionIDStr, reason) + + // For destroy operations, delete the deployment record after the version + // completes successfully. + if success && r.versionType == VersionTypeDestroy { + err = r.svc.DeleteDeployment(ctx, bundledeployments.DeleteDeploymentRequest{ + Name: "deployments/" + r.deploymentID, + }) + if err != nil { + return fmt.Errorf("failed to delete deployment: %w", err) + } + } + + return nil +} + +// createDeploymentVersion ensures the deployment record exists, then creates a +// new version. We GetDeployment first and only CreateDeployment when it does +// not exist yet. 
+func (r *Recorder) createDeploymentVersion(ctx context.Context) (versionID string, err error) { + dep, getErr := r.svc.GetDeployment(ctx, bundledeployments.GetDeploymentRequest{ + Name: "deployments/" + r.deploymentID, + }) + switch { + case errors.Is(getErr, apierr.ErrNotFound): + // Fresh deployment: create the record and start at version 1. + _, createErr := r.svc.CreateDeployment(ctx, bundledeployments.CreateDeploymentRequest{ + DeploymentId: r.deploymentID, + Deployment: bundledeployments.Deployment{ + TargetName: r.targetName, + }, + }) + if createErr != nil { + return "", fmt.Errorf("failed to create deployment: %w", createErr) + } + versionID = "1" + case getErr != nil: + return "", fmt.Errorf("failed to get deployment: %w", getErr) + default: + // Existing deployment: increment the last version to get the next one. + lastVersion, parseErr := strconv.ParseInt(dep.LastVersionId, 10, 64) + if parseErr != nil { + return "", fmt.Errorf("failed to parse last_version_id %q: %w", dep.LastVersionId, parseErr) + } + versionID = strconv.FormatInt(lastVersion+1, 10) + } + + // The server validates that versionID equals last_version_id + 1 and returns + // ABORTED otherwise (e.g. a concurrent deploy already created this version). + version, versionErr := r.svc.CreateVersion(ctx, bundledeployments.CreateVersionRequest{ + Parent: "deployments/" + r.deploymentID, + VersionId: versionID, + Version: bundledeployments.Version{ + CliVersion: build.GetInfo().Version, + VersionType: r.versionType, + TargetName: r.targetName, + }, + }) + if versionErr != nil { + return "", fmt.Errorf("failed to create deployment version: %w", versionErr) + } + + log.Infof(ctx, "Created deployment version: deployment=%s version=%s", r.deploymentID, version.VersionId) + return versionID, nil +} + +// startHeartbeat starts a background goroutine that sends heartbeats to keep +// the deployment version's lease alive. Returns a cancel function to stop it. 
+func startHeartbeat(ctx context.Context, svc bundledeployments.BundleDeploymentsInterface, deploymentID, versionID string) context.CancelFunc {
+	ctx, cancel := context.WithCancel(ctx)
+	versionName := fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID)
+
+	go func() {
+		ticker := time.NewTicker(defaultHeartbeatInterval)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				_, err := svc.Heartbeat(ctx, bundledeployments.HeartbeatRequest{Name: versionName})
+				if err == nil {
+					log.Debugf(ctx, "Deployment heartbeat sent: deployment=%s version=%s", deploymentID, versionID)
+					continue
+				}
+				// Stop quietly if we were cancelled mid-request, or on the 409 ABORTED that is
+				// expected when the version was completed between the tick and the heartbeat.
+				if ctx.Err() != nil || isAbortedErr(err) {
+					log.Debugf(ctx, "Heartbeat stopped: %v", err)
+					return
+				}
+				log.Warnf(ctx, "Failed to send deployment heartbeat: %v", err)
+			}
+		}
+	}()
+
+	return cancel
+}
+
+// isAbortedErr reports whether err is an HTTP 409 ABORTED from the DMS API.
+func isAbortedErr(err error) bool {
+	apiErr, ok := errors.AsType[*apierr.APIError](err)
+	return ok && apiErr.StatusCode == http.StatusConflict && apiErr.ErrorCode == "ABORTED"
+}
diff --git a/libs/testserver/bundle.go b/libs/testserver/bundle.go
new file mode 100644
index 00000000000..5b173c1484a
--- /dev/null
+++ b/libs/testserver/bundle.go
@@ -0,0 +1,122 @@
+package testserver
+
+import (
+	"encoding/json"
+	"strconv"
+
+	"github.com/databricks/databricks-sdk-go/service/bundledeployments"
+)
+
+// Handlers for the Deployment Metadata Service (DMS) API under /api/2.0/bundle.
+// State is kept in FakeWorkspace.Deployments, keyed by deployment ID.
+
+// CreateDeployment stores the deployment from the request body under the
+// deployment_id query parameter, replacing any record already stored for that
+// ID, and returns it with its name and active status filled in.
+func (s *FakeWorkspace) CreateDeployment(req Request) Response {
+	deploymentID := req.URL.Query().Get("deployment_id")
+
+	var dep bundledeployments.Deployment
+	if err := json.Unmarshal(req.Body, &dep); err != nil {
+		return Response{StatusCode: 400, Body: map[string]string{"message": err.Error()}}
+	}
+
+	defer s.LockUnlock()()
+
+	dep.Name = "deployments/" + deploymentID
+	dep.Status = bundledeployments.DeploymentStatusDeploymentStatusActive
+	s.Deployments[deploymentID] = &dep
+	return Response{Body: dep}
+}
+
+// GetDeployment returns the stored deployment record, or a 404
+// RESOURCE_DOES_NOT_EXIST error when the ID is unknown.
+func (s *FakeWorkspace) GetDeployment(deploymentID string) Response {
+	defer s.LockUnlock()()
+
+	dep, ok := s.Deployments[deploymentID]
+	if !ok {
+		return Response{
+			StatusCode: 404,
+			Body: map[string]string{
+				"error_code": "RESOURCE_DOES_NOT_EXIST",
+				"message":    "deployment " + deploymentID + " does not exist",
+			},
+		}
+	}
+	return Response{Body: *dep}
+}
+
+// DeleteDeployment removes the deployment record. It succeeds even when the
+// ID is unknown.
+func (s *FakeWorkspace) DeleteDeployment(deploymentID string) Response {
+	defer s.LockUnlock()()
+
+	delete(s.Deployments, deploymentID)
+	return Response{Body: map[string]any{}}
+}
+
+// CreateVersion registers the version_id query parameter as the next version of
+// an existing deployment and returns it as in progress. It answers 404 when the
+// deployment is unknown and 409 ABORTED when version_id is not the expected next
+// version. Only the deployment's LastVersionId is updated; no version is stored.
+func (s *FakeWorkspace) CreateVersion(req Request, deploymentID string) Response {
+	versionID := req.URL.Query().Get("version_id")
+
+	var version bundledeployments.Version
+	if err := json.Unmarshal(req.Body, &version); err != nil {
+		return Response{StatusCode: 400, Body: map[string]string{"message": err.Error()}}
+	}
+
+	defer s.LockUnlock()()
+
+	dep, ok := s.Deployments[deploymentID]
+	if !ok {
+		return Response{
+			StatusCode: 404,
+			Body:       map[string]string{"error_code": "RESOURCE_DOES_NOT_EXIST", "message": "deployment does not exist"},
+		}
+	}
+
+	// Mirror the server-side optimistic concurrency check: the new version must
+	// be exactly last_version_id + 1. An unset last_version_id fails to parse and
+	// leaves last at 0, so the first version is "1".
+	last, _ := strconv.ParseInt(dep.LastVersionId, 10, 64)
+	want := strconv.FormatInt(last+1, 10)
+	if versionID != want {
+		return Response{
+			StatusCode: 409,
+			Headers:    map[string][]string{"Content-Type": {"application/json"}},
+			Body:       map[string]string{"error_code": "ABORTED", "message": "expected version " + want + ", got " + versionID},
+		}
+	}
+
+	dep.LastVersionId = versionID
+	version.Name = "deployments/" + deploymentID + "/versions/" + versionID
+	version.VersionId = versionID
+	version.Status = bundledeployments.VersionStatusVersionStatusInProgress
+	return Response{Body: version}
+}
+
+// CompleteVersion returns a completed version echoing the request's completion
+// reason. It keeps no state and does not check that the deployment or version
+// exists.
+func (s *FakeWorkspace) CompleteVersion(req Request, deploymentID, versionID string) Response {
+	var completeReq bundledeployments.CompleteVersionRequest
+	// An empty body is tolerated; a malformed one is rejected with 400 like the other handlers.
+	if err := json.Unmarshal(req.Body, &completeReq); err != nil && len(req.Body) > 0 {
+		return Response{StatusCode: 400, Body: map[string]string{"message": err.Error()}}
+	}
+
+	return Response{Body: bundledeployments.Version{
+		Name:             "deployments/" + deploymentID + "/versions/" + versionID,
+		VersionId:        versionID,
+		Status:           bundledeployments.VersionStatusVersionStatusCompleted,
+		CompletionReason: completeReq.CompletionReason,
+	}}
+}
+
+// Heartbeat always succeeds with an empty response.
+func (s *FakeWorkspace) Heartbeat() Response {
+	return Response{Body: bundledeployments.HeartbeatResponse{}}
+}
diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go
index d2925fb1e38..2740fa6cbe7 100644
--- a/libs/testserver/fake_workspace.go
+++ b/libs/testserver/fake_workspace.go
@@ -19,6 +19,7 @@ import (
 	"github.com/google/uuid"
 
 	"github.com/databricks/databricks-sdk-go/service/apps"
+	"github.com/databricks/databricks-sdk-go/service/bundledeployments"
 	"github.com/databricks/databricks-sdk-go/service/catalog"
 	"github.com/databricks/databricks-sdk-go/service/iam"
 	"github.com/databricks/databricks-sdk-go/service/jobs"
@@ -179,6 +180,9 @@ type FakeWorkspace struct {
 	PostgresSyncedTables map[string]postgres.SyncedTable
 	PostgresOperations   map[string]postgres.Operation
 
+	// Deployment Metadata Service (DMS) deployment records, keyed by
deployment ID. + Deployments map[string]*bundledeployments.Deployment + // Branches and endpoints that the server provisioned implicitly together // with their parent (e.g. the production branch on a new project, or the // primary endpoint on a new branch). The real backend rejects independent @@ -319,6 +323,7 @@ func NewFakeWorkspace(url, token string) *FakeWorkspace { PostgresRoles: map[string]postgres.Role{}, PostgresSyncedTables: map[string]postgres.SyncedTable{}, PostgresOperations: map[string]postgres.Operation{}, + Deployments: map[string]*bundledeployments.Deployment{}, postgresImplicitBranches: map[string]bool{}, postgresImplicitEndpoints: map[string]bool{}, clusterVenvs: map[string]*clusterEnv{}, diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index 8f611a7c7c9..6c028056220 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -222,6 +222,26 @@ func AddDefaultHandlers(server *Server) { return req.Workspace.JobsCreate(req) }) + // Deployment Metadata Service (DMS) endpoints. 
+ server.Handle("POST", "/api/2.0/bundle/deployments", func(req Request) any { + return req.Workspace.CreateDeployment(req) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.GetDeployment(req.Vars["deployment_id"]) + }) + server.Handle("DELETE", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeleteDeployment(req.Vars["deployment_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions", func(req Request) any { + return req.Workspace.CreateVersion(req, req.Vars["deployment_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete", func(req Request) any { + return req.Workspace.CompleteVersion(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/heartbeat", func(req Request) any { + return req.Workspace.Heartbeat() + }) + server.Handle("POST", "/api/2.2/jobs/delete", func(req Request) any { var request jobs.DeleteJob if err := json.Unmarshal(req.Body, &request); err != nil {