Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
93117a0
bundle/statemgmt: add StateReader abstraction for file and DMS state
shreyas-goenka May 31, 2026
b06e4d1
cmd/bundle: read state from DMS when record_deployment_history is set
shreyas-goenka Jun 1, 2026
a358c74
bundle/statemgmt: preserve deployment lineage when reading state from…
shreyas-goenka Jun 1, 2026
52e9a93
libs/testserver: minimal DMS resource recording for tests
shreyas-goenka Jun 1, 2026
52cb43d
acceptance/bundle/dms: state-read reads resource state from DMS
shreyas-goenka Jun 1, 2026
4cb6109
acceptance/bundle/dms: plan shows create/update/delete from DMS state
shreyas-goenka Jun 1, 2026
b174c65
acceptance/bundle/dms/plan: fix CI (yamlfmt comment, jq-free id capture)
shreyas-goenka Jun 1, 2026
a71d17b
bundle/statemgmt: simplify statereader tests
shreyas-goenka Jun 1, 2026
94ce558
acceptance/bundle/dms: fix plan test on CI; log full plan JSON
shreyas-goenka Jun 1, 2026
fbcde65
acceptance/bundle/dms/plan: create pipelines via the pipelines command
shreyas-goenka Jun 1, 2026
05ffbf7
bundle/statemgmt: trim redundant statereader tests
shreyas-goenka Jun 1, 2026
322c498
bundle/statemgmt: clarify TestDMSStateReader comment
shreyas-goenka Jun 1, 2026
4c1a319
bundle/statemgmt: clarify DMS read keeps resources.json for backward …
shreyas-goenka Jun 1, 2026
7c92471
libs/testserver: alias bundle SDK as sdkbundle
shreyas-goenka Jun 1, 2026
4e585e7
acceptance/bundle/dms: combine state-read and plan into one read test
shreyas-goenka Jun 1, 2026
6c1f1b8
bundle/statemgmt: use record-deployment-history terminology, not mana…
shreyas-goenka Jun 1, 2026
ea94cde
bundle/statemgmt: document statereader identity-vs-resources model an…
shreyas-goenka Jun 1, 2026
0af053f
bundle/statemgmt: keep resources.json resources when DMS has none yet
shreyas-goenka Jun 1, 2026
121192b
bundle/statemgmt: read from DMS only after a version completed succes…
shreyas-goenka Jun 1, 2026
dc7d772
libs/testserver: track DMS versions; acceptance records a completed v…
shreyas-goenka Jun 1, 2026
fc879a6
bundle/statemgmt: make reader-selection test names state the direct f…
shreyas-goenka Jun 1, 2026
8e3a46d
bundle/statemgmt: clarify dmsStateReader does not fall back to local …
shreyas-goenka Jun 1, 2026
eafc9e5
bundle/statemgmt: note resources.json fallback only on opting out of DMS
shreyas-goenka Jun 1, 2026
b2fa5b3
bundle/statemgmt: note where the lineage is minted (first write-mode …
shreyas-goenka Jun 1, 2026
434628c
bundle/direct/dstate: fold DMS state read into StateDB.Open
shreyas-goenka Jun 9, 2026
6206d44
bundle/direct/dstate: page through DMS list APIs instead of fetching all
shreyas-goenka Jun 9, 2026
8110fbf
acceptance/bundle/dms: fix Windows CI with MSYS_NO_PATHCONV on api calls
shreyas-goenka Jun 10, 2026
28cf10f
bundle/direct/dstate: pin DMS overlay semantics and early-exit scan
shreyas-goenka Jun 10, 2026
f50dfc0
bundle/direct/dstate: table-drive the DMS open and version-gate tests
shreyas-goenka Jun 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions acceptance/bundle/dms/read/databricks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# pipelines.delete_me is recorded in DMS (see script) but intentionally absent
# here, so the plan deletes it. Pipelines are used (not jobs) because their ids
# are strings: the `databricks api` command rounds large int64 job ids in its
# output, which would break the id round-trip this test relies on.
bundle:
name: dms-read

experimental:
record_deployment_history: true

resources:
pipelines:
# Not recorded in DMS -> plan should create it.
create_me:
name: create-me
# Recorded in DMS with a different name -> plan should update it.
update_me:
name: update-me-new
3 changes: 3 additions & 0 deletions acceptance/bundle/dms/read/out.test.toml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

119 changes: 119 additions & 0 deletions acceptance/bundle/dms/read/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@

=== bundle summary reflects resources read from DMS (update_me resolves its id; create_me is not deployed)
>>> [CLI] bundle summary
Name: dms-read
Target: default
Workspace:
User: [USERNAME]
Path: /Workspace/Users/[USERNAME]/.bundle/dms-read/default
Resources:
Pipelines:
create_me:
Name: create-me
URL: (not deployed)
delete_me:
Name:
URL: [DATABRICKS_URL]/pipelines/[UUID]?w=[NUMID]
update_me:
Name: update-me-new
URL: [DATABRICKS_URL]/pipelines/[UUID]?w=[NUMID]

=== bundle plan derives actions from DMS state: create create_me, update update_me, delete delete_me
>>> [CLI] bundle plan
create pipelines.create_me
delete pipelines.delete_me
update pipelines.update_me

Plan: 1 to add, 1 to change, 1 to delete, 0 unchanged

=== full plan as JSON
>>> [CLI] bundle plan -o json
{
"plan_version": 2,
"cli_version": "[DEV_VERSION]",
"lineage": "synthetic-dep",
"serial": 1,
"plan": {
"resources.pipelines.create_me": {
"action": "create",
"new_state": {
"value": {
"channel": "CURRENT",
"deployment": {
"kind": "BUNDLE",
"metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-read/default/state/metadata.json"
},
"edition": "ADVANCED",
"name": "create-me"
}
}
},
"resources.pipelines.delete_me": {
"action": "delete",
"remote_state": {
"creator_user_name": "[USERNAME]",
"effective_publishing_mode": "DEFAULT_PUBLISHING_MODE",
"id": "[UUID]",
"last_modified": [UNIX_TIME_MILLIS][0],
"name": "delete-me",
"pipeline_id": "[UUID]",
"run_as_user_name": "[USERNAME]",
"state": "IDLE",
"storage": "dbfs:/pipelines/[UUID]"
}
},
"resources.pipelines.update_me": {
"action": "update",
"new_state": {
"value": {
"channel": "CURRENT",
"deployment": {
"kind": "BUNDLE",
"metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-read/default/state/metadata.json"
},
"edition": "ADVANCED",
"name": "update-me-new"
}
},
"remote_state": {
"creator_user_name": "[USERNAME]",
"effective_publishing_mode": "DEFAULT_PUBLISHING_MODE",
"id": "[UUID]",
"last_modified": [UNIX_TIME_MILLIS][1],
"name": "update-me-old",
"pipeline_id": "[UUID]",
"run_as_user_name": "[USERNAME]",
"state": "IDLE",
"storage": "dbfs:/pipelines/[UUID]"
},
"changes": {
"channel": {
"action": "update",
"new": "CURRENT"
},
"deployment": {
"action": "update",
"new": {
"kind": "BUNDLE",
"metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-read/default/state/metadata.json"
}
},
"edition": {
"action": "update",
"new": "ADVANCED"
},
"name": {
"action": "update",
"old": "update-me-old",
"new": "update-me-new",
"remote": "update-me-old"
},
"storage": {
"action": "skip",
"reason": "backend_default",
"remote": "dbfs:/pipelines/[UUID]"
}
}
}
}
}
28 changes: 28 additions & 0 deletions acceptance/bundle/dms/read/script
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Read deployment state from DMS: build synthetic DMS state for a deployment
# whose resources, compared with databricks.yml, exercise each plan action
# - create_me: in config, not in DMS -> create
# - update_me: in both, names differ -> update
# - delete_me: in DMS, not in config -> delete
# then verify both `bundle summary` and `bundle plan` read it from DMS.

update_id=$($CLI pipelines create -o json --json '{"name":"update-me-old"}' | jq -r .pipeline_id)
delete_id=$($CLI pipelines create -o json --json '{"name":"delete-me"}' | jq -r .pipeline_id)

# DMS owns the state only once a version has completed successfully, so record one
# for the deployment before seeding its resources.
MSYS_NO_PATHCONV=1 $CLI api post "/api/2.0/bundle/deployments/synthetic-dep/versions?version_id=1" --json '{"version_type":"VERSION_TYPE_DEPLOY"}' > /dev/null
MSYS_NO_PATHCONV=1 $CLI api post "/api/2.0/bundle/deployments/synthetic-dep/versions/1/complete" --json '{"completion_reason":"VERSION_COMPLETE_SUCCESS"}' > /dev/null

MSYS_NO_PATHCONV=1 $CLI api post "/api/2.0/bundle/deployments/synthetic-dep/versions/1/operations" --json "{\"resource_key\":\"pipelines.update_me\",\"resource_id\":\"$update_id\",\"action_type\":\"OPERATION_ACTION_TYPE_CREATE\",\"status\":\"OPERATION_STATUS_SUCCEEDED\",\"state\":{\"name\":\"update-me-old\"}}" > /dev/null
MSYS_NO_PATHCONV=1 $CLI api post "/api/2.0/bundle/deployments/synthetic-dep/versions/1/operations" --json "{\"resource_key\":\"pipelines.delete_me\",\"resource_id\":\"$delete_id\",\"action_type\":\"OPERATION_ACTION_TYPE_CREATE\",\"status\":\"OPERATION_STATUS_SUCCEEDED\",\"state\":{\"name\":\"delete-me\"}}" > /dev/null

MSYS_NO_PATHCONV=1 $CLI api post "/api/2.0/workspace-files/import-file/Workspace/Users/$CURRENT_USER_NAME/.bundle/dms-read/default/state/resources.json?overwrite=true" --json '{"state_version":2,"lineage":"synthetic-dep","serial":1,"state":{}}' > /dev/null

title "bundle summary reflects resources read from DMS (update_me resolves its id; create_me is not deployed)"
trace $CLI bundle summary

title "bundle plan derives actions from DMS state: create create_me, update update_me, delete delete_me"
trace $CLI bundle plan

title "full plan as JSON"
trace $CLI bundle plan -o json
2 changes: 2 additions & 0 deletions acceptance/bundle/dms/read/test.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# DMS state reading is a direct-engine feature; pin the engine for stable output.
EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"]
2 changes: 1 addition & 1 deletion bundle/configsync/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func DetectChanges(ctx context.Context, b *bundle.Bundle, engine engine.EngineTy
} else {
deployBundle = &direct.DeploymentBundle{}
_, statePath := b.StateFilenameConfigSnapshot(ctx)
if err := deployBundle.StateDB.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(false)); err != nil {
if err := deployBundle.StateDB.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(false), nil); err != nil {
return nil, fmt.Errorf("failed to open state: %w", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion bundle/configsync/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func resourceIDLookup(ctx context.Context, b *bundle.Bundle) func(string) string
}
_, statePath := b.StateFilenameConfigSnapshot(ctx)
db := &dstate.DeploymentState{}
if err := db.Open(ctx, statePath, dstate.WithRecovery(false), dstate.WithWrite(false)); err != nil {
if err := db.Open(ctx, statePath, dstate.WithRecovery(false), dstate.WithWrite(false), nil); err != nil {
log.Debugf(ctx, "variable restoration: failed to open state DB at %s: %v", statePath, err)
return nil
}
Expand Down
12 changes: 6 additions & 6 deletions bundle/direct/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type BindResult struct {
func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.WorkspaceClient, configRoot *config.Root, statePath, resourceKey, resourceID string) (*BindResult, error) {
// Check if the resource is already managed (bound to a different ID)
var checkStateDB dstate.DeploymentState
if err := checkStateDB.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(false)); err == nil {
if err := checkStateDB.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(false), nil); err == nil {
existingID := checkStateDB.GetResourceID(resourceKey)
if _, err := checkStateDB.Finalize(ctx); err != nil {
log.Warnf(ctx, "failed to finalize state: %v", err)
Expand All @@ -86,7 +86,7 @@ func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.Workspac
}

// Open temp state
err := b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(false), dstate.WithWrite(true))
err := b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(false), dstate.WithWrite(true), nil)
if err != nil {
os.Remove(tmpStatePath)
return nil, err
Expand All @@ -109,7 +109,7 @@ func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.Workspac
log.Infof(ctx, "Bound %s to id=%s (in temp state)", resourceKey, resourceID)

// First plan + update: populate state with resolved config
err = b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(false))
err = b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(false), nil)
if err != nil {
os.Remove(tmpStatePath)
return nil, err
Expand Down Expand Up @@ -144,7 +144,7 @@ func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.Workspac
}
}

err = b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(true))
err = b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(true), nil)
if err != nil {
os.Remove(tmpStatePath)
return nil, err
Expand All @@ -164,7 +164,7 @@ func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.Workspac
}

// Second plan: this is the plan to present to the user (change between remote resource and config)
err = b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(false))
err = b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(false), nil)
if err != nil {
os.Remove(tmpStatePath)
return nil, err
Expand Down Expand Up @@ -214,7 +214,7 @@ func (result *BindResult) Cancel() {
// Unbind removes a resource from direct engine state without deleting
// the workspace resource. Also removes associated permissions/grants entries.
func (b *DeploymentBundle) Unbind(ctx context.Context, statePath, resourceKey string) error {
err := b.StateDB.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(true))
err := b.StateDB.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(true), nil)
if err != nil {
return err
}
Expand Down
104 changes: 104 additions & 0 deletions bundle/direct/dstate/dms.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package dstate

import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/databricks/databricks-sdk-go/apierr"
sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle"
)

// overlayDMSState replaces the file-derived resource state with the state
// recorded in the deployment metadata service, when DMS owns this deployment.
// Once DMS is authoritative its resource set is trusted even when empty (a
// successful deploy with no resources); the file's resources are only used when
// DMS has no successful version, or when the user opts out of recording
// deployment history. The caller holds db.mu and has already populated db.Data
// from the file.
func (db *DeploymentState) overlayDMSState(ctx context.Context, client sdkbundle.BundleInterface) error {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add a serial based comparision and override here as a followup.

authoritative, err := deploymentHasSuccessfulVersion(ctx, client, db.Data.Lineage)
if err != nil {
return err
}
if !authoritative {
// DMS has no completed version for this lineage: a prior direct deployment
// that has not yet successfully recorded to DMS. Keep the file state.
return nil
}

resources, err := fetchDeploymentResources(ctx, client, db.Data.Lineage)
if err != nil {
return err
}

db.Data.State = resources
db.stateIDs = make(map[string]string, len(resources))
for key, entry := range resources {
db.stateIDs[key] = entry.ID
}
return nil
}

// deploymentHasSuccessfulVersion reports whether DMS holds a successfully
// completed version for the deployment. It is the signal that DMS owns the
// state: if the deployment was never recorded to DMS, or its initial DMS deploy
// did not complete successfully, DMS state is absent or partial and Open keeps
// the local file's resources instead.
func deploymentHasSuccessfulVersion(ctx context.Context, client sdkbundle.BundleInterface, deploymentID string) (bool, error) {
// Versions are listed newest-first and fetched page by page, and we stop at
// the first successful one, so a deployment with a long version history does
// not require reading the whole list (typically just the first page).
it := client.ListVersions(ctx, sdkbundle.ListVersionsRequest{
Parent: "deployments/" + deploymentID,
})
for it.HasNext(ctx) {
v, err := it.Next(ctx)
if err != nil {
// A deployment that was never recorded to DMS is not an error here: it just
// means DMS is not (yet) the source of truth.
if errors.Is(err, apierr.ErrNotFound) {
return false, nil
}
return false, fmt.Errorf("listing versions from deployment metadata service: %w", err)
}
if v.Status == sdkbundle.VersionStatusVersionStatusCompleted &&
v.CompletionReason == sdkbundle.VersionCompleteVersionCompleteSuccess {
return true, nil
}
}
return false, nil
}

// fetchDeploymentResources lists every resource recorded for the deployment in
// DMS and maps them into state entries keyed by the fully-qualified resource key.
func fetchDeploymentResources(ctx context.Context, client sdkbundle.BundleInterface, deploymentID string) (map[string]ResourceEntry, error) {
it := client.ListResources(ctx, sdkbundle.ListResourcesRequest{
Parent: "deployments/" + deploymentID,
})

out := make(map[string]ResourceEntry)
for it.HasNext(ctx) {
res, err := it.Next(ctx)
if err != nil {
return nil, fmt.Errorf("listing resources from deployment metadata service: %w", err)
}

// DMS reports resource keys without the "resources." prefix (e.g.
// "jobs.foo"), but the state DB keys are fully qualified
// ("resources.jobs.foo"), so prepend it here.
key := "resources." + res.ResourceKey

var state json.RawMessage
if res.State != nil {
state = *res.State
}

out[key] = ResourceEntry{
ID: res.ResourceId,
State: state,
}
}
return out, nil
}
Loading
Loading