diff --git a/acceptance/bundle/dms/read/databricks.yml b/acceptance/bundle/dms/read/databricks.yml new file mode 100644 index 00000000000..1d741011abb --- /dev/null +++ b/acceptance/bundle/dms/read/databricks.yml @@ -0,0 +1,18 @@ +# pipelines.delete_me is recorded in DMS (see script) but intentionally absent +# here, so the plan deletes it. Pipelines are used (not jobs) because their ids +# are strings: the `databricks api` command rounds large int64 job ids in its +# output, which would break the id round-trip this test relies on. +bundle: + name: dms-read + +experimental: + record_deployment_history: true + +resources: + pipelines: + # Not recorded in DMS -> plan should create it. + create_me: + name: create-me + # Recorded in DMS with a different name -> plan should update it. + update_me: + name: update-me-new diff --git a/acceptance/bundle/dms/read/out.test.toml b/acceptance/bundle/dms/read/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/bundle/dms/read/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/dms/read/output.txt b/acceptance/bundle/dms/read/output.txt new file mode 100644 index 00000000000..bdf0c5b4d4f --- /dev/null +++ b/acceptance/bundle/dms/read/output.txt @@ -0,0 +1,119 @@ + +=== bundle summary reflects resources read from DMS (update_me resolves its id; create_me is not deployed) +>>> [CLI] bundle summary +Name: dms-read +Target: default +Workspace: + User: [USERNAME] + Path: /Workspace/Users/[USERNAME]/.bundle/dms-read/default +Resources: + Pipelines: + create_me: + Name: create-me + URL: (not deployed) + delete_me: + Name: + URL: [DATABRICKS_URL]/pipelines/[UUID]?w=[NUMID] + update_me: + Name: update-me-new + URL: [DATABRICKS_URL]/pipelines/[UUID]?w=[NUMID] + +=== bundle plan derives actions from DMS state: create create_me, update update_me, delete delete_me +>>> [CLI] bundle plan +create pipelines.create_me +delete pipelines.delete_me +update pipelines.update_me + +Plan: 1 to add, 1 to change, 1 to delete, 0 unchanged + +=== full plan as JSON +>>> [CLI] bundle plan -o json +{ + "plan_version": 2, + "cli_version": "[DEV_VERSION]", + "lineage": "synthetic-dep", + "serial": 1, + "plan": { + "resources.pipelines.create_me": { + "action": "create", + "new_state": { + "value": { + "channel": "CURRENT", + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-read/default/state/metadata.json" + }, + "edition": "ADVANCED", + "name": "create-me" + } + } + }, + "resources.pipelines.delete_me": { + "action": "delete", + "remote_state": { + "creator_user_name": "[USERNAME]", + "effective_publishing_mode": "DEFAULT_PUBLISHING_MODE", + "id": "[UUID]", + "last_modified": [UNIX_TIME_MILLIS][0], + "name": "delete-me", + "pipeline_id": "[UUID]", + "run_as_user_name": "[USERNAME]", + "state": "IDLE", + "storage": "dbfs:/pipelines/[UUID]" + } + }, + "resources.pipelines.update_me": { + "action": "update", + "new_state": { + "value": { + "channel": "CURRENT", + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-read/default/state/metadata.json" + }, + "edition": "ADVANCED", + "name": "update-me-new" + } + }, + "remote_state": { + "creator_user_name": "[USERNAME]", + "effective_publishing_mode": "DEFAULT_PUBLISHING_MODE", + "id": "[UUID]", + "last_modified": [UNIX_TIME_MILLIS][1], + "name": "update-me-old", + "pipeline_id": "[UUID]", + "run_as_user_name": "[USERNAME]", + "state": "IDLE", + "storage": "dbfs:/pipelines/[UUID]" + }, + "changes": { + "channel": { + "action": "update", + "new": "CURRENT" + }, + "deployment": { + "action": "update", + "new": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-read/default/state/metadata.json" + } + }, + "edition": { + "action": "update", + "new": "ADVANCED" + }, + "name": { + "action": "update", + "old": "update-me-old", + "new": "update-me-new", + "remote": "update-me-old" + }, + "storage": { + "action": "skip", + "reason": "backend_default", + "remote": "dbfs:/pipelines/[UUID]" + } + } + } + } +} diff --git a/acceptance/bundle/dms/read/script b/acceptance/bundle/dms/read/script new file mode 100644 index 00000000000..1354f5a008f --- /dev/null +++ b/acceptance/bundle/dms/read/script @@ -0,0 +1,28 @@ +# Read deployment state from DMS: build synthetic DMS state for a deployment +# whose resources, compared with databricks.yml, exercise each plan action +# - create_me: in config, not in DMS -> create +# - update_me: in both, names differ -> update +# - delete_me: in DMS, not in config -> delete +# then verify both `bundle summary` and `bundle plan` read it from DMS. + +update_id=$($CLI pipelines create -o json --json '{"name":"update-me-old"}' | jq -r .pipeline_id) +delete_id=$($CLI pipelines create -o json --json '{"name":"delete-me"}' | jq -r .pipeline_id) + +# DMS owns the state only once a version has completed successfully, so record one +# for the deployment before seeding its resources. +MSYS_NO_PATHCONV=1 $CLI api post "/api/2.0/bundle/deployments/synthetic-dep/versions?version_id=1" --json '{"version_type":"VERSION_TYPE_DEPLOY"}' > /dev/null +MSYS_NO_PATHCONV=1 $CLI api post "/api/2.0/bundle/deployments/synthetic-dep/versions/1/complete" --json '{"completion_reason":"VERSION_COMPLETE_SUCCESS"}' > /dev/null + +MSYS_NO_PATHCONV=1 $CLI api post "/api/2.0/bundle/deployments/synthetic-dep/versions/1/operations" --json "{\"resource_key\":\"pipelines.update_me\",\"resource_id\":\"$update_id\",\"action_type\":\"OPERATION_ACTION_TYPE_CREATE\",\"status\":\"OPERATION_STATUS_SUCCEEDED\",\"state\":{\"name\":\"update-me-old\"}}" > /dev/null +MSYS_NO_PATHCONV=1 $CLI api post "/api/2.0/bundle/deployments/synthetic-dep/versions/1/operations" --json "{\"resource_key\":\"pipelines.delete_me\",\"resource_id\":\"$delete_id\",\"action_type\":\"OPERATION_ACTION_TYPE_CREATE\",\"status\":\"OPERATION_STATUS_SUCCEEDED\",\"state\":{\"name\":\"delete-me\"}}" > /dev/null + +MSYS_NO_PATHCONV=1 $CLI api post "/api/2.0/workspace-files/import-file/Workspace/Users/$CURRENT_USER_NAME/.bundle/dms-read/default/state/resources.json?overwrite=true" --json '{"state_version":2,"lineage":"synthetic-dep","serial":1,"state":{}}' > /dev/null + +title "bundle summary reflects resources read from DMS (update_me resolves its id; create_me is not deployed)" +trace $CLI bundle summary + +title "bundle plan derives actions from DMS state: create create_me, update update_me, delete delete_me" +trace $CLI bundle plan + +title "full plan as JSON" +trace $CLI bundle plan -o json diff --git a/acceptance/bundle/dms/read/test.toml b/acceptance/bundle/dms/read/test.toml new file mode 100644 index 00000000000..cfc54980cbb --- /dev/null +++ b/acceptance/bundle/dms/read/test.toml @@ -0,0 +1,2 @@ +# DMS state reading is a direct-engine feature; pin the engine for stable output. +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/bundle/configsync/diff.go b/bundle/configsync/diff.go index 4a0b4f01ca3..69d6c6fad87 100644 --- a/bundle/configsync/diff.go +++ b/bundle/configsync/diff.go @@ -140,7 +140,7 @@ func DetectChanges(ctx context.Context, b *bundle.Bundle, engine engine.EngineTy } else { deployBundle = &direct.DeploymentBundle{} _, statePath := b.StateFilenameConfigSnapshot(ctx) - if err := deployBundle.StateDB.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(false)); err != nil { + if err := deployBundle.StateDB.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(false), nil); err != nil { return nil, fmt.Errorf("failed to open state: %w", err) } } diff --git a/bundle/configsync/variables.go b/bundle/configsync/variables.go index 0745bfba43b..a759d0f85d3 100644 --- a/bundle/configsync/variables.go +++ b/bundle/configsync/variables.go @@ -144,7 +144,7 @@ func resourceIDLookup(ctx context.Context, b *bundle.Bundle) func(string) string } _, statePath := b.StateFilenameConfigSnapshot(ctx) db := &dstate.DeploymentState{} - if err := db.Open(ctx, statePath, dstate.WithRecovery(false), dstate.WithWrite(false)); err != nil { + if err := db.Open(ctx, statePath, dstate.WithRecovery(false), dstate.WithWrite(false), nil); err != nil { log.Debugf(ctx, "variable restoration: failed to open state DB at %s: %v", statePath, err) return nil } diff --git a/bundle/direct/bind.go b/bundle/direct/bind.go index f1c534bea9d..2477b9a0a95 100644 --- a/bundle/direct/bind.go +++ b/bundle/direct/bind.go @@ -62,7 +62,7 @@ type BindResult struct { func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.WorkspaceClient, configRoot *config.Root, statePath, resourceKey, resourceID string) (*BindResult, error) { // Check if the resource is already managed (bound to a different ID) var checkStateDB dstate.DeploymentState - if err := checkStateDB.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(false)); err == nil { + if err := checkStateDB.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(false), nil); err == nil { existingID := checkStateDB.GetResourceID(resourceKey) if _, err := checkStateDB.Finalize(ctx); err != nil { log.Warnf(ctx, "failed to finalize state: %v", err) @@ -86,7 +86,7 @@ func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.Workspac } // Open temp state - err := b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(false), dstate.WithWrite(true)) + err := b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(false), dstate.WithWrite(true), nil) if err != nil { os.Remove(tmpStatePath) return nil, err @@ -109,7 +109,7 @@ func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.Workspac log.Infof(ctx, "Bound %s to id=%s (in temp state)", resourceKey, resourceID) // First plan + update: populate state with resolved config - err = b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(false)) + err = b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(false), nil) if err != nil { os.Remove(tmpStatePath) return nil, err @@ -144,7 +144,7 @@ func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.Workspac } } - err = b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(true)) + err = b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(true), nil) if err != nil { os.Remove(tmpStatePath) return nil, err @@ -164,7 +164,7 @@ func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.Workspac } // Second plan: this is the plan to present to the user (change between remote resource and config) - err = b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(false)) + err = b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(false), nil) if err != nil { os.Remove(tmpStatePath) return nil, err @@ -214,7 +214,7 @@ func (result *BindResult) Cancel() { // Unbind removes a resource from direct engine state without deleting // the workspace resource. Also removes associated permissions/grants entries. func (b *DeploymentBundle) Unbind(ctx context.Context, statePath, resourceKey string) error { - err := b.StateDB.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(true)) + err := b.StateDB.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(true), nil) if err != nil { return err } diff --git a/bundle/direct/dstate/dms.go b/bundle/direct/dstate/dms.go new file mode 100644 index 00000000000..b24a4656833 --- /dev/null +++ b/bundle/direct/dstate/dms.go @@ -0,0 +1,104 @@ +package dstate + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/databricks/databricks-sdk-go/apierr" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// overlayDMSState replaces the file-derived resource state with the state +// recorded in the deployment metadata service, when DMS owns this deployment. +// Once DMS is authoritative its resource set is trusted even when empty (a +// successful deploy with no resources); the file's resources are only used when +// DMS has no successful version, or when the user opts out of recording +// deployment history. The caller holds db.mu and has already populated db.Data +// from the file. +func (db *DeploymentState) overlayDMSState(ctx context.Context, client sdkbundle.BundleInterface) error { + authoritative, err := deploymentHasSuccessfulVersion(ctx, client, db.Data.Lineage) + if err != nil { + return err + } + if !authoritative { + // DMS has no completed version for this lineage: a prior direct deployment + // that has not yet successfully recorded to DMS. Keep the file state. + return nil + } + + resources, err := fetchDeploymentResources(ctx, client, db.Data.Lineage) + if err != nil { + return err + } + + db.Data.State = resources + db.stateIDs = make(map[string]string, len(resources)) + for key, entry := range resources { + db.stateIDs[key] = entry.ID + } + return nil +} + +// deploymentHasSuccessfulVersion reports whether DMS holds a successfully +// completed version for the deployment. It is the signal that DMS owns the +// state: if the deployment was never recorded to DMS, or its initial DMS deploy +// did not complete successfully, DMS state is absent or partial and Open keeps +// the local file's resources instead. +func deploymentHasSuccessfulVersion(ctx context.Context, client sdkbundle.BundleInterface, deploymentID string) (bool, error) { + // Versions are listed newest-first and fetched page by page, and we stop at + // the first successful one, so a deployment with a long version history does + // not require reading the whole list (typically just the first page). + it := client.ListVersions(ctx, sdkbundle.ListVersionsRequest{ + Parent: "deployments/" + deploymentID, + }) + for it.HasNext(ctx) { + v, err := it.Next(ctx) + if err != nil { + // A deployment that was never recorded to DMS is not an error here: it just + // means DMS is not (yet) the source of truth. + if errors.Is(err, apierr.ErrNotFound) { + return false, nil + } + return false, fmt.Errorf("listing versions from deployment metadata service: %w", err) + } + if v.Status == sdkbundle.VersionStatusVersionStatusCompleted && + v.CompletionReason == sdkbundle.VersionCompleteVersionCompleteSuccess { + return true, nil + } + } + return false, nil +} + +// fetchDeploymentResources lists every resource recorded for the deployment in +// DMS and maps them into state entries keyed by the fully-qualified resource key. +func fetchDeploymentResources(ctx context.Context, client sdkbundle.BundleInterface, deploymentID string) (map[string]ResourceEntry, error) { + it := client.ListResources(ctx, sdkbundle.ListResourcesRequest{ + Parent: "deployments/" + deploymentID, + }) + + out := make(map[string]ResourceEntry) + for it.HasNext(ctx) { + res, err := it.Next(ctx) + if err != nil { + return nil, fmt.Errorf("listing resources from deployment metadata service: %w", err) + } + + // DMS reports resource keys without the "resources." prefix (e.g. + // "jobs.foo"), but the state DB keys are fully qualified + // ("resources.jobs.foo"), so prepend it here. + key := "resources." + res.ResourceKey + + var state json.RawMessage + if res.State != nil { + state = *res.State + } + + out[key] = ResourceEntry{ + ID: res.ResourceId, + State: state, + } + } + return out, nil +} diff --git a/bundle/direct/dstate/dms_test.go b/bundle/direct/dstate/dms_test.go new file mode 100644 index 00000000000..c28ec1d0d05 --- /dev/null +++ b/bundle/direct/dstate/dms_test.go @@ -0,0 +1,195 @@ +package dstate + +import ( + "context" + "encoding/json" + "maps" + "os" + "path/filepath" + "testing" + + "github.com/databricks/databricks-sdk-go/listing" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type fakeBundleClient struct { + sdkbundle.BundleInterface + resources []sdkbundle.Resource + versions []sdkbundle.Version + + // Number of versions consumed from ListVersions iterators, to observe how + // far a scan read into the (paginated) list. + versionNexts int +} + +func (c *fakeBundleClient) ListResources(context.Context, sdkbundle.ListResourcesRequest) listing.Iterator[sdkbundle.Resource] { + it := listing.SliceIterator[sdkbundle.Resource](c.resources) + return &it +} + +func (c *fakeBundleClient) ListVersions(context.Context, sdkbundle.ListVersionsRequest) listing.Iterator[sdkbundle.Version] { + it := listing.SliceIterator[sdkbundle.Version](c.versions) + return &countingIterator{Iterator: &it, nexts: &c.versionNexts} +} + +// countingIterator counts Next calls on behalf of fakeBundleClient. +type countingIterator struct { + listing.Iterator[sdkbundle.Version] + nexts *int +} + +func (c *countingIterator) Next(ctx context.Context) (sdkbundle.Version, error) { + *c.nexts++ + return c.Iterator.Next(ctx) +} + +func raw(s string) *json.RawMessage { + msg := json.RawMessage(s) + return &msg +} + +func completed(reason sdkbundle.VersionComplete) sdkbundle.Version { + return sdkbundle.Version{Status: sdkbundle.VersionStatusVersionStatusCompleted, CompletionReason: reason} +} + +var succeeded = completed(sdkbundle.VersionCompleteVersionCompleteSuccess) + +// writeStateFile writes a resources.json with the given lineage and resources, +// standing in for a prior local (direct) deployment, and returns its path. +func writeStateFile(t *testing.T, lineage string, state map[string]ResourceEntry) string { + t.Helper() + db := NewDatabase(lineage, 1) + maps.Copy(db.State, state) + content, err := json.Marshal(db) + require.NoError(t, err) + path := filepath.Join(t.TempDir(), "resources.json") + require.NoError(t, os.WriteFile(path, content, 0o600)) + return path +} + +// TestOpenWithDMS covers how Open chooses between the local file and the +// deployment metadata service. Identity (lineage) always comes from the file; +// only the resource set is overlaid from DMS, and only when DMS owns the +// deployment (a version has completed successfully). +func TestOpenWithDMS(t *testing.T) { + fileState := map[string]ResourceEntry{"resources.jobs.from_file": {ID: "file-1"}} + dmsResources := []sdkbundle.Resource{ + {ResourceKey: "jobs.foo", ResourceId: "job-1", State: raw(`{"name":"foo"}`)}, + } + + tests := []struct { + name string + lineage string + client sdkbundle.BundleInterface + want map[string]string // expected resource key -> ID after Open + }{ + { + name: "DMS owns the deployment: resources come from DMS, identity from the file", + lineage: "dep-1", + client: &fakeBundleClient{resources: dmsResources, versions: []sdkbundle.Version{succeeded}}, + want: map[string]string{"resources.jobs.foo": "job-1"}, + }, + { + name: "no successful version yet: fall back to the direct file-based state", + lineage: "dep-1", + client: &fakeBundleClient{resources: dmsResources}, + want: map[string]string{"resources.jobs.from_file": "file-1"}, + }, + { + // The client reports a successful version and resources; if Open consulted + // DMS despite the missing lineage, jobs.foo would appear in state. + name: "no lineage in the file (nothing deployed yet): never consult DMS", + lineage: "", + client: &fakeBundleClient{resources: dmsResources, versions: []sdkbundle.Version{succeeded}}, + want: map[string]string{"resources.jobs.from_file": "file-1"}, + }, + { + name: "nil client (record_deployment_history off): file-based state only", + lineage: "dep-1", + client: nil, + want: map[string]string{"resources.jobs.from_file": "file-1"}, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + path := writeStateFile(t, tc.lineage, fileState) + var db DeploymentState + require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(false), tc.client)) + + assert.Equal(t, tc.lineage, db.Data.Lineage) + ids := make(map[string]string) + for key := range db.Data.State { + // GetResourceID reads the stateIDs cache, so this also checks it was + // rebuilt in sync with the overlaid state. + ids[key] = db.GetResourceID(key) + } + assert.Equal(t, tc.want, ids) + }) + } +} + +// TestDeploymentHasSuccessfulVersion is the gate that decides whether DMS owns +// a deployment's state: only once a version has completed successfully. +// wantNexts pins the pagination contract: versions are listed newest-first and +// the scan stops at the first success, so deployments with long version +// histories don't fetch the whole list. +func TestDeploymentHasSuccessfulVersion(t *testing.T) { + failed := completed(sdkbundle.VersionCompleteVersionCompleteFailure) + inProgress := sdkbundle.Version{Status: sdkbundle.VersionStatusVersionStatusInProgress} + tests := []struct { + name string + versions []sdkbundle.Version + want bool + wantNexts int + }{ + {"no versions", nil, false, 0}, + {"in progress", []sdkbundle.Version{inProgress}, false, 1}, + {"failed", []sdkbundle.Version{failed}, false, 1}, + {"succeeded", []sdkbundle.Version{succeeded}, true, 1}, + {"failed then succeeded", []sdkbundle.Version{failed, succeeded}, true, 2}, + {"stops at first success", []sdkbundle.Version{succeeded, inProgress}, true, 1}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + client := &fakeBundleClient{versions: tc.versions} + got, err := deploymentHasSuccessfulVersion(t.Context(), client, "dep-1") + require.NoError(t, err) + assert.Equal(t, tc.want, got) + assert.Equal(t, tc.wantNexts, client.versionNexts) + }) + } +} + +// TestFetchDeploymentResources covers mapping DMS resources to state entries. +func TestFetchDeploymentResources(t *testing.T) { + tests := []struct { + name string + resources []sdkbundle.Resource + want map[string]ResourceEntry + }{ + { + name: "keys are prefixed with resources. and id/state are copied", + resources: []sdkbundle.Resource{{ResourceKey: "jobs.foo", ResourceId: "job-1", State: raw(`{"name":"foo"}`)}}, + want: map[string]ResourceEntry{"resources.jobs.foo": {ID: "job-1", State: json.RawMessage(`{"name":"foo"}`)}}, + }, + { + name: "missing state becomes nil", + resources: []sdkbundle.Resource{{ResourceKey: "jobs.foo", ResourceId: "job-1"}}, + want: map[string]ResourceEntry{"resources.jobs.foo": {ID: "job-1"}}, + }, + { + name: "empty list yields no entries", + resources: nil, + want: map[string]ResourceEntry{}, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got, err := fetchDeploymentResources(t.Context(), &fakeBundleClient{resources: tc.resources}, "dep-1") + require.NoError(t, err) + assert.Equal(t, tc.want, got) + }) + } +} diff --git a/bundle/direct/dstate/state.go b/bundle/direct/dstate/state.go index 5b2a70adbb3..0167733475f 100644 --- a/bundle/direct/dstate/state.go +++ b/bundle/direct/dstate/state.go @@ -17,6 +17,7 @@ import ( "github.com/databricks/cli/bundle/statemgmt/resourcestate" "github.com/databricks/cli/internal/build" "github.com/databricks/cli/libs/log" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" "github.com/google/uuid" ) @@ -156,7 +157,14 @@ type ( WithWrite bool ) -func (db *DeploymentState) Open(ctx context.Context, path string, withRecovery WithRecovery, withWrite WithWrite) error { +// Open reads the deployment state from disk (and recovers the WAL when +// withRecovery is set). When dmsClient is non-nil, the deployment metadata +// service is the source of truth for resource state: if DMS holds a +// successfully completed version for this deployment's lineage, the resources +// read from the file are replaced with the ones recorded in DMS. The local +// identity (lineage and serial) always comes from the file, since that is what +// the write path increments. A nil dmsClient keeps the behavior file-only. +func (db *DeploymentState) Open(ctx context.Context, path string, withRecovery WithRecovery, withWrite WithWrite, dmsClient sdkbundle.BundleInterface) error { db.mu.Lock() defer db.mu.Unlock() @@ -204,6 +212,12 @@ func (db *DeploymentState) Open(ctx context.Context, path string, withRecovery W return fmt.Errorf("migrating state %s: %w", path, err) } + if dmsClient != nil && db.Data.Lineage != "" { + if err := db.overlayDMSState(ctx, dmsClient); err != nil { + return err + } + } + if withWrite { if err := os.MkdirAll(filepath.Dir(walPath), 0o755); err != nil { return fmt.Errorf("failed to create state directory: %w", err) diff --git a/bundle/direct/dstate/state_test.go b/bundle/direct/dstate/state_test.go index bbfd2559951..89fdd7ca561 100644 --- a/bundle/direct/dstate/state_test.go +++ b/bundle/direct/dstate/state_test.go @@ -19,14 +19,14 @@ func TestOpenSaveFinalizeRoundTrip(t *testing.T) { path := filepath.Join(t.TempDir(), "state.json") var db DeploymentState - require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true))) + require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true), nil)) require.NoError(t, db.SaveState("jobs.my_job", "123", map[string]string{"key": "val"}, nil)) mustFinalize(t, &db) // Re-open and verify persisted data. var db2 DeploymentState - require.NoError(t, db2.Open(t.Context(), path, WithRecovery(false), WithWrite(false))) + require.NoError(t, db2.Open(t.Context(), path, WithRecovery(false), WithWrite(false), nil)) assert.Equal(t, 1, db2.Data.Serial) assert.Equal(t, "123", db2.GetResourceID("jobs.my_job")) mustFinalize(t, &db2) @@ -36,7 +36,7 @@ func TestFinalizeWithNoEntriesDoesNotWriteStateFile(t *testing.T) { path := filepath.Join(t.TempDir(), "state.json") var db DeploymentState - require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true))) + require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true), nil)) mustFinalize(t, &db) _, err := os.Stat(path) @@ -47,10 +47,10 @@ func TestPanicOnDoubleOpen(t *testing.T) { path := filepath.Join(t.TempDir(), "state.json") var db DeploymentState - require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true))) + require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true), nil)) assert.Panics(t, func() { - _ = db.Open(t.Context(), path, WithRecovery(true), WithWrite(true)) + _ = db.Open(t.Context(), path, WithRecovery(true), WithWrite(true), nil) }) mustFinalize(t, &db) } @@ -59,17 +59,17 @@ func TestDeleteState(t *testing.T) { path := filepath.Join(t.TempDir(), "state.json") var db DeploymentState - require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true))) + require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true), nil)) require.NoError(t, db.SaveState("jobs.my_job", "123", map[string]string{}, nil)) mustFinalize(t, &db) var db2 DeploymentState - require.NoError(t, db2.Open(t.Context(), path, WithRecovery(true), WithWrite(true))) + require.NoError(t, db2.Open(t.Context(), path, WithRecovery(true), WithWrite(true), nil)) require.NoError(t, db2.DeleteState("jobs.my_job")) mustFinalize(t, &db2) var db3 DeploymentState - require.NoError(t, db3.Open(t.Context(), path, WithRecovery(false), WithWrite(false))) + require.NoError(t, db3.Open(t.Context(), path, WithRecovery(false), WithWrite(false), nil)) assert.Equal(t, 2, db3.Data.Serial) assert.Empty(t, db3.GetResourceID("jobs.my_job")) mustFinalize(t, &db3) diff --git a/cmd/bundle/generate/dashboard.go b/cmd/bundle/generate/dashboard.go index 7af4e01e92f..e73bc9c15cc 100644 --- a/cmd/bundle/generate/dashboard.go +++ b/cmd/bundle/generate/dashboard.go @@ -394,7 +394,7 @@ func (d *dashboard) runForResource(ctx context.Context, b *bundle.Bundle) { var state statemgmt.ExportedResourcesMap if stateDesc.Engine.IsDirect() { _, localPath := b.StateFilenameDirect(ctx) - if err := b.DeploymentBundle.StateDB.Open(ctx, localPath, dstate.WithRecovery(true), dstate.WithWrite(false)); err != nil { + if err := b.DeploymentBundle.StateDB.Open(ctx, localPath, dstate.WithRecovery(true), dstate.WithWrite(false), nil); err != nil { logdiag.LogError(ctx, err) return } diff --git a/cmd/bundle/utils/process.go b/cmd/bundle/utils/process.go index 5f43cff6acd..f2d88f3053d 100644 --- a/cmd/bundle/utils/process.go +++ b/cmd/bundle/utils/process.go @@ -25,6 +25,7 @@ import ( "github.com/databricks/cli/libs/logdiag" "github.com/databricks/cli/libs/sync" "github.com/databricks/cli/libs/telemetry/protos" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" "github.com/spf13/cobra" ) @@ -189,7 +190,16 @@ func ProcessBundleRet(cmd *cobra.Command, opts ProcessOptions) (b *bundle.Bundle needDirectState := stateDesc.Engine.IsDirect() && (opts.InitIDs || opts.ErrorOnEmptyState || opts.Deploy || opts.ReadPlanPath != "" || opts.PreDeployChecks || opts.PostStateFunc != nil) if needDirectState { _, localPath := b.StateFilenameDirect(ctx) - if err := b.DeploymentBundle.StateDB.Open(ctx, localPath, dstate.WithRecovery(true), dstate.WithWrite(false)); err != nil { + + // When the bundle records deployment history, the deployment metadata + // service owns resource state, so hand Open its client to overlay DMS + // state on top of the local identity (lineage/serial). Reads open the + // state write-disabled, so no lineage is minted here. + var dmsClient sdkbundle.BundleInterface + if b.Config.Experimental != nil && b.Config.Experimental.RecordDeploymentHistory { + dmsClient = b.WorkspaceClient(ctx).Bundle + } + if err := b.DeploymentBundle.StateDB.Open(ctx, localPath, dstate.WithRecovery(true), dstate.WithWrite(false), dmsClient); err != nil { logdiag.LogError(ctx, err) return b, stateDesc, root.ErrAlreadyPrinted } diff --git a/libs/testserver/bundle.go b/libs/testserver/bundle.go new file mode 100644 index 00000000000..089004c6f0d --- /dev/null +++ b/libs/testserver/bundle.go @@ -0,0 +1,108 @@ +package testserver + +import ( + "encoding/json" + "slices" + + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// Minimal Deployment Metadata Service (DMS) handlers under /api/2.0/bundle, +// sufficient to seed deployment state and read it back: CreateVersion / +// CompleteVersion / ListVersions track a deployment's versions, CreateOperation +// upserts (or deletes) a deployment-level resource, and ListResources returns +// the recorded resources for a deployment. + +// BundleCreateVersion records a new in-progress version for a deployment. +func (s *FakeWorkspace) BundleCreateVersion(req Request, deploymentID string) Response { + var version sdkbundle.Version + if err := json.Unmarshal(req.Body, &version); err != nil { + return Response{StatusCode: 400, Body: map[string]string{"message": err.Error()}} + } + + defer s.LockUnlock()() + + version.VersionId = req.URL.Query().Get("version_id") + version.Name = "deployments/" + deploymentID + "/versions/" + version.VersionId + version.Status = sdkbundle.VersionStatusVersionStatusInProgress + s.BundleVersions[deploymentID] = append(s.BundleVersions[deploymentID], version) + + return Response{Body: version} +} + +// BundleCompleteVersion marks a deployment version completed with the given reason. +func (s *FakeWorkspace) BundleCompleteVersion(req Request, deploymentID, versionID string) Response { + var completeReq sdkbundle.CompleteVersionRequest + if err := json.Unmarshal(req.Body, &completeReq); err != nil { + return Response{StatusCode: 400, Body: map[string]string{"message": err.Error()}} + } + + defer s.LockUnlock()() + + versions := s.BundleVersions[deploymentID] + for i := range versions { + if versions[i].VersionId == versionID { + versions[i].Status = sdkbundle.VersionStatusVersionStatusCompleted + versions[i].CompletionReason = completeReq.CompletionReason + return Response{Body: versions[i]} + } + } + return Response{StatusCode: 404, Body: map[string]string{"message": "version not found"}} +} + +// BundleListVersions returns the versions recorded for a deployment. +func (s *FakeWorkspace) BundleListVersions(deploymentID string) Response { + defer s.LockUnlock()() + + return Response{Body: sdkbundle.ListVersionsResponse{Versions: s.BundleVersions[deploymentID]}} +} + +// BundleCreateOperation records a resource operation against a deployment. A +// create/update operation upserts the deployment-level resource; a delete +// operation removes it. +func (s *FakeWorkspace) BundleCreateOperation(req Request, deploymentID string) Response { + var op sdkbundle.Operation + if err := json.Unmarshal(req.Body, &op); err != nil { + return Response{StatusCode: 400, Body: map[string]string{"message": err.Error()}} + } + + defer s.LockUnlock()() + + if s.BundleResources[deploymentID] == nil { + s.BundleResources[deploymentID] = map[string]sdkbundle.Resource{} + } + + if op.ActionType == sdkbundle.OperationActionTypeOperationActionTypeDelete { + delete(s.BundleResources[deploymentID], op.ResourceKey) + } else { + s.BundleResources[deploymentID][op.ResourceKey] = sdkbundle.Resource{ + Name: "deployments/" + deploymentID + "/resources/" + op.ResourceKey, + ResourceKey: op.ResourceKey, + ResourceId: op.ResourceId, + LastActionType: op.ActionType, + State: op.State, + } + } + + return Response{Body: op} +} + +// BundleListResources returns the resources recorded for a deployment, sorted by +// resource key for deterministic output. +func (s *FakeWorkspace) BundleListResources(deploymentID string) Response { + defer s.LockUnlock()() + + resources := s.BundleResources[deploymentID] + keys := make([]string, 0, len(resources)) + for k := range resources { + keys = append(keys, k) + } + slices.Sort(keys) + + out := make([]sdkbundle.Resource, 0, len(keys)) + for _, k := range keys { + out = append(out, resources[k]) + } + + return Response{Body: sdkbundle.ListResourcesResponse{Resources: out}} +} diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go index ff70f6b0505..3afa1ff8fec 100644 --- a/libs/testserver/fake_workspace.go +++ b/libs/testserver/fake_workspace.go @@ -19,6 +19,7 @@ import ( "github.com/google/uuid" "github.com/databricks/databricks-sdk-go/service/apps" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" "github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/iam" "github.com/databricks/databricks-sdk-go/service/jobs" @@ -176,6 +177,15 @@ type FakeWorkspace struct { PostgresSyncedTables map[string]postgres.SyncedTable PostgresOperations map[string]postgres.Operation + // Deployment Metadata Service (DMS) resources recorded per deployment, keyed + // by deployment ID then by resource key. Populated from CreateOperation and + // served by ListResources. + BundleResources map[string]map[string]sdkbundle.Resource + + // DMS versions recorded per deployment, keyed by deployment ID. Populated from + // CreateVersion/CompleteVersion and served by ListVersions. + BundleVersions map[string][]sdkbundle.Version + // Branches and endpoints that the server provisioned implicitly together // with their parent (e.g. the production branch on a new project, or the // primary endpoint on a new branch). The real backend rejects independent @@ -313,6 +323,8 @@ func NewFakeWorkspace(url, token string) *FakeWorkspace { PostgresCatalogs: map[string]postgres.Catalog{}, PostgresSyncedTables: map[string]postgres.SyncedTable{}, PostgresOperations: map[string]postgres.Operation{}, + BundleResources: map[string]map[string]sdkbundle.Resource{}, + BundleVersions: map[string][]sdkbundle.Version{}, postgresImplicitBranches: map[string]bool{}, postgresImplicitEndpoints: map[string]bool{}, clusterVenvs: map[string]*clusterEnv{}, diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index 659f6e010ff..4669baea46f 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -218,6 +218,23 @@ func AddDefaultHandlers(server *Server) { return TestMetastore }) + // Deployment Metadata Service (DMS): minimal resource recording + listing. + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/operations", func(req Request) any { + return req.Workspace.BundleCreateOperation(req, req.Vars["deployment_id"]) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/resources", func(req Request) any { + return req.Workspace.BundleListResources(req.Vars["deployment_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions", func(req Request) any { + return req.Workspace.BundleCreateVersion(req, req.Vars["deployment_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete", func(req Request) any { + return req.Workspace.BundleCompleteVersion(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/versions", func(req Request) any { + return req.Workspace.BundleListVersions(req.Vars["deployment_id"]) + }) + server.Handle("POST", "/api/2.2/jobs/create", func(req Request) any { return req.Workspace.JobsCreate(req) })