Skip to content

Commit cefabe0

Browse files
bundle/direct: conditionally upgrade state version when opted into DMS
Normal deploys keep writing state_version 2. When the bundle opts into experimental.record_deployment_history, the deploy upgrades the local state to dmsStateVersion (3) and stamps a new CurrentDmsVersion header field, so future DMS breaking changes or minimum-CLI requirements can be gated on what the state was written with. migrateState now reads up to dmsStateVersion but still auto-migrates only up to the baseline currentStateVersion, so a v3 state is neither auto-applied nor downgraded. The upgrade is applied explicitly in deployCore, never as an automatic migration. Co-authored-by: Isaac
1 parent 30db14f commit cefabe0

11 files changed

Lines changed: 172 additions & 12 deletions

File tree

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
bundle:
2+
name: test-rdh-state-upgrade
3+
4+
experimental:
5+
record_deployment_history: true
6+
7+
resources:
8+
jobs:
9+
foo:
10+
name: foo-job-renamed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
bundle:
2+
name: test-rdh-state-upgrade
3+
4+
resources:
5+
jobs:
6+
foo:
7+
name: foo-job

acceptance/bundle/deploy/record-deployment-history/state-upgrade/out.test.toml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
2+
=== without the flag: state stays at the baseline schema version
3+
>>> [CLI] bundle deploy
4+
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-rdh-state-upgrade/default/files...
5+
Deploying resources...
6+
Updating deployment state...
7+
Deployment complete!
8+
9+
>>> print_state.py
10+
{
11+
"state_version": 2,
12+
"current_dms_version": null
13+
}
14+
15+
=== with experimental.record_deployment_history: state upgrades and records the DMS version
16+
>>> [CLI] bundle deploy
17+
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-rdh-state-upgrade/default/files...
18+
Deploying resources...
19+
Updating deployment state...
20+
Deployment complete!
21+
22+
>>> print_state.py
23+
{
24+
"state_version": 3,
25+
"current_dms_version": 1
26+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
title "without the flag: state stays at the baseline schema version"
2+
trace $CLI bundle deploy
3+
trace print_state.py | jq '{state_version, current_dms_version}'
4+
5+
title "with experimental.record_deployment_history: state upgrades and records the DMS version"
6+
# Also renames the job so the deploy writes state (a no-op deploy would not
7+
# rewrite the state file and the upgrade would not be observable yet).
8+
cp databricks.dms.yml databricks.yml
9+
trace $CLI bundle deploy
10+
trace print_state.py | jq '{state_version, current_dms_version}'
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
Local = true
2+
Cloud = false
3+
4+
# databricks.yml is rewritten in-place by the script (flag added in the second step).
5+
Ignore = [".databricks", "databricks.yml"]
6+
7+
# The state-version upgrade only applies to the direct engine; terraform stores
8+
# state differently and would diverge.
9+
[EnvMatrix]
10+
DATABRICKS_BUNDLE_ENGINE = ["direct"]

bundle/direct/dstate/migrate.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@ import (
1313
// migrateState runs all necessary migrations on the database.
1414
// It is called after loading state from disk.
1515
func migrateState(db *Database) error {
16-
if db.StateVersion == currentStateVersion {
17-
return nil
18-
}
19-
if db.StateVersion > currentStateVersion {
20-
return fmt.Errorf("state version %d is newer than supported version %d; upgrade the CLI", db.StateVersion, currentStateVersion)
16+
if db.StateVersion > dmsStateVersion {
17+
return fmt.Errorf("state version %d is newer than supported version %d; upgrade the CLI", db.StateVersion, dmsStateVersion)
2118
}
2219

20+
// Migrate legacy states forward to the baseline current version. A state at
21+
// dmsStateVersion is already current and skipped here; opting into DMS is an
22+
// explicit upgrade applied during deploy (see UpgradeToDMS), not an automatic
23+
// migration.
2324
for version := db.StateVersion; version < currentStateVersion; version++ {
2425
fn, ok := migrations[version]
2526
if !ok {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package dstate
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestMigrateStateLeavesCurrentUntouched(t *testing.T) {
11+
db := &Database{Header: Header{StateVersion: currentStateVersion}}
12+
require.NoError(t, migrateState(db))
13+
assert.Equal(t, currentStateVersion, db.StateVersion)
14+
}
15+
16+
func TestMigrateStateLeavesDMSStateUntouched(t *testing.T) {
17+
// A DMS-upgraded state is already current; it must not be downgraded to the
18+
// baseline version, and the recorded DMS version must be preserved.
19+
db := &Database{Header: Header{StateVersion: dmsStateVersion, CurrentDmsVersion: currentDmsVersion}}
20+
require.NoError(t, migrateState(db))
21+
assert.Equal(t, dmsStateVersion, db.StateVersion)
22+
assert.Equal(t, currentDmsVersion, db.CurrentDmsVersion)
23+
}
24+
25+
func TestMigrateStateRejectsNewerThanSupported(t *testing.T) {
26+
db := &Database{Header: Header{StateVersion: dmsStateVersion + 1}}
27+
err := migrateState(db)
28+
require.Error(t, err)
29+
assert.Contains(t, err.Error(), "upgrade the CLI")
30+
}

bundle/direct/dstate/state.go

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,32 @@ import (
2121
)
2222

2323
const (
24+
// currentStateVersion is the schema version written for normal deployments
25+
// and the target that older states are migrated up to on load.
26+
//
27+
// NOTE: the next bump to the baseline schema must go to 4, not 3 (which is
28+
// reserved by dmsStateVersion below), and should delete dmsStateVersion,
29+
// folding DMS state handling back into normal versioning.
2430
currentStateVersion = 2
25-
initialBufferSize = 64 * 1024
26-
maxWalEntrySize = 10 * 1024 * 1024
27-
walSuffix = ".wal"
31+
32+
// dmsStateVersion is the schema version written when the bundle opts into the
33+
// deployment metadata service via experimental.record_deployment_history. It
34+
// is also the newest version this CLI understands; newer states are rejected.
35+
//
36+
// It is kept separate from currentStateVersion on purpose: opting into DMS
37+
// must not force a state upgrade on everyone else. Non-DMS deploys stay at
38+
// currentStateVersion while only DMS opt-in bumps the state to this version.
39+
// Remove it once currentStateVersion is bumped (to 4) and the two reconcile.
40+
dmsStateVersion = 3
41+
42+
// currentDmsVersion is stamped into DMS-upgraded state. Bump it when the DMS
43+
// protocol changes in a way the CLI must record (e.g. a breaking change or a
44+
// new minimum CLI version) so future deploys can gate behavior on it.
45+
currentDmsVersion = 1
46+
47+
initialBufferSize = 64 * 1024
48+
maxWalEntrySize = 10 * 1024 * 1024
49+
walSuffix = ".wal"
2850
)
2951

3052
// errStaleWAL is returned when the WAL serial is behind the expected serial.
@@ -42,10 +64,16 @@ type DeploymentState struct {
4264
}
4365

4466
type Header struct {
45-
StateVersion int `json:"state_version"`
46-
CLIVersion string `json:"cli_version"`
47-
Lineage string `json:"lineage"`
48-
Serial int `json:"serial"`
67+
StateVersion int `json:"state_version"`
68+
69+
// CurrentDmsVersion records the deployment metadata service (DMS) protocol
70+
// version this state was written with. Set only for states opted into DMS
71+
// (see dmsStateVersion) and omitted otherwise.
72+
CurrentDmsVersion int `json:"current_dms_version,omitempty"`
73+
74+
CLIVersion string `json:"cli_version"`
75+
Lineage string `json:"lineage"`
76+
Serial int `json:"serial"`
4977
}
5078

5179
type Database struct {
@@ -414,6 +442,17 @@ func (db *DeploymentState) UpgradeToWrite() error {
414442
return appendJSONLine(db.walFile, walHead)
415443
}
416444

445+
// UpgradeToDMS marks the state as opted into the deployment metadata service
446+
// (DMS): it bumps the schema to dmsStateVersion and stamps the current DMS
447+
// version. The change is persisted on the next save. State must be open for write.
448+
func (db *DeploymentState) UpgradeToDMS() {
449+
db.AssertOpenedForWrite()
450+
db.mu.Lock()
451+
defer db.mu.Unlock()
452+
db.Data.StateVersion = dmsStateVersion
453+
db.Data.CurrentDmsVersion = currentDmsVersion
454+
}
455+
417456
func (db *DeploymentState) AssertOpenedForReadOrWrite() {
418457
if db.Path == "" {
419458
panic("internal error: DeploymentState must be opened first")

bundle/direct/dstate/state_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,24 @@ func TestOpenSaveFinalizeRoundTrip(t *testing.T) {
3232
mustFinalize(t, &db2)
3333
}
3434

35+
func TestUpgradeToDMSPersistsVersions(t *testing.T) {
36+
path := filepath.Join(t.TempDir(), "state.json")
37+
38+
var db DeploymentState
39+
require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true)))
40+
db.UpgradeToDMS()
41+
require.NoError(t, db.SaveState("jobs.my_job", "123", map[string]string{"key": "val"}, nil))
42+
mustFinalize(t, &db)
43+
44+
// Re-open and verify the upgraded schema version and DMS version persisted,
45+
// and that loading the upgraded state does not error or downgrade it.
46+
var db2 DeploymentState
47+
require.NoError(t, db2.Open(t.Context(), path, WithRecovery(false), WithWrite(false)))
48+
assert.Equal(t, dmsStateVersion, db2.Data.StateVersion)
49+
assert.Equal(t, currentDmsVersion, db2.Data.CurrentDmsVersion)
50+
mustFinalize(t, &db2)
51+
}
52+
3553
func TestFinalizeWithNoEntriesDoesNotWriteStateFile(t *testing.T) {
3654
path := filepath.Join(t.TempDir(), "state.json")
3755

0 commit comments

Comments
 (0)