Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions backends/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Package backends is the aggregator: a single import the apiserver pulls
// in to register every in-tree storage backend against a *registry.Backends.
//
// Pattern mirrors upstream kubeapiserver/options/plugins.go: the apiserver
// imports this one package; the package imports each backend; each backend
// self-describes via its Options struct. Adding a backend (postgres, kine,
// etc.) means one new import + one Register() call here — no apiserver
// change required.
package backends

import (
"github.com/kplane-dev/storage/backends/spanner"
"github.com/kplane-dev/storage/registry"
)

// RegisterBuiltin installs the in-tree backends into b. The apiserver
// constructs a *registry.Backends in main, calls RegisterBuiltin, then
// hands the registry to its options chain. External backends (out-of-tree)
// would call b.Register(<their>.NewOptions()) directly from a custom
// apiserver main, after RegisterBuiltin.
func RegisterBuiltin(b *registry.Backends) {
b.Register(spanner.NewOptions())
// Future backends:
// b.Register(postgres.NewOptions())
// b.Register(kine.NewOptions())
}
119 changes: 119 additions & 0 deletions backends/spanner/broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package spanner

import (
"sync"

"k8s.io/klog/v2"
)

// watchEvent is the internal representation of a storage mutation,
// published through the broadcaster to all active watchers.
type watchEvent struct {
key string
value []byte
prevValue []byte
rev int64

isCreated bool
isDeleted bool
isProgress bool
}

// subscription is a channel-based subscription to broadcast events.
type subscription struct {
ch chan watchEvent
id uint64
closed bool
}

// Broadcaster fans out storage mutation events to all subscribed watchers.
// The write path (Create/Delete/GuaranteedUpdate) calls Publish() after each
// successful Spanner commit. This gives ~microsecond notification latency
// for in-process consumers (the cacher), avoiding the need for Change Streams
// on the hot path.
type Broadcaster struct {
mu sync.RWMutex
subscribers map[uint64]*subscription
nextID uint64

// highWaterMark tracks the highest revision seen, used for
// progress/bookmark events.
highWaterMark int64
}

// NewBroadcaster creates a new Broadcaster.
func NewBroadcaster() *Broadcaster {
return &Broadcaster{
subscribers: make(map[uint64]*subscription),
}
}

// Publish sends an event to all subscribers. Non-blocking: if a subscriber's
// channel is full, the subscription is closed so the watcher detects the gap
// and the cacher can re-list (same behavior as etcd closing a slow watch).
func (b *Broadcaster) Publish(e watchEvent) {
b.mu.Lock()
defer b.mu.Unlock()

if e.rev > b.highWaterMark {
b.highWaterMark = e.rev
}

klog.V(4).Infof("broadcaster.Publish: key=%q rev=%d isCreated=%v isDeleted=%v isProgress=%v subscribers=%d",
e.key, e.rev, e.isCreated, e.isDeleted, e.isProgress, len(b.subscribers))

for id, sub := range b.subscribers {
if sub.closed {
continue
}
select {
case sub.ch <- e:
default:
// Subscriber can't keep up — close it so the watcher
// exits cleanly and the cacher re-establishes the watch.
sub.closed = true
close(sub.ch)
delete(b.subscribers, id)
}
}
}

// Subscribe creates a new subscription. The returned channel receives all
// events published after the subscription is created. bufSize controls the
// channel buffer depth.
func (b *Broadcaster) Subscribe(bufSize int) *subscription {
b.mu.Lock()
defer b.mu.Unlock()

if bufSize < 64 {
bufSize = 64
}

sub := &subscription{
ch: make(chan watchEvent, bufSize),
id: b.nextID,
}
b.nextID++
b.subscribers[sub.id] = sub
return sub
}

// Unsubscribe removes a subscription and closes its channel.
func (b *Broadcaster) Unsubscribe(sub *subscription) {
b.mu.Lock()
defer b.mu.Unlock()

if sub.closed {
return
}
sub.closed = true
close(sub.ch)
delete(b.subscribers, sub.id)
}

// HighWaterMark returns the highest revision seen by the broadcaster.
func (b *Broadcaster) HighWaterMark() int64 {
b.mu.RLock()
defer b.mu.RUnlock()
return b.highWaterMark
}
204 changes: 204 additions & 0 deletions backends/spanner/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package spanner

import (
"context"
"fmt"
"time"

"cloud.google.com/go/spanner"
database "cloud.google.com/go/spanner/admin/database/apiv1"
databasepb "cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
instance "cloud.google.com/go/spanner/admin/instance/apiv1"
instancepb "cloud.google.com/go/spanner/admin/instance/apiv1/instancepb"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"k8s.io/klog/v2"
)

// SpannerConfig holds configuration for connecting to a Spanner instance.
type SpannerConfig struct {
// Project is the GCP project ID.
Project string

// Instance is the Spanner instance ID.
Instance string

// Database is the Spanner database name.
Database string

// EmulatorHost overrides the Spanner endpoint for local development.
// When set, TLS is disabled and authentication is skipped.
// Format: "host:port" (e.g. "localhost:9010").
EmulatorHost string
}

// DatabasePath returns the fully qualified Spanner database path.
func (c SpannerConfig) DatabasePath() string {
return fmt.Sprintf("projects/%s/instances/%s/databases/%s", c.Project, c.Instance, c.Database)
}

// InstancePath returns the fully qualified Spanner instance path.
func (c SpannerConfig) InstancePath() string {
return fmt.Sprintf("projects/%s/instances/%s", c.Project, c.Instance)
}

// NewClient creates a Spanner client from the config.
func (c SpannerConfig) NewClient(ctx context.Context) (*spanner.Client, error) {
var opts []option.ClientOption
if c.EmulatorHost != "" {
opts = append(opts,
option.WithEndpoint(c.EmulatorHost),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
option.WithoutAuthentication(),
)
}
return spanner.NewClient(ctx, c.DatabasePath(), opts...)
}

// schemaDDL returns the DDL statements for the kv table and change stream.
var schemaDDL = []string{
`CREATE TABLE kv (
key STRING(MAX) NOT NULL,
value BYTES(MAX) NOT NULL,
mod_ts TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp = true),
create_ts TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp = true),
lease_ttl INT64,
) PRIMARY KEY (key)`,
`CREATE CHANGE STREAM kv_changes FOR kv
OPTIONS (value_capture_type = 'OLD_AND_NEW_VALUES')`,
}

// EnsureInstance creates the Spanner instance if it doesn't already exist.
// Intended for development / testing with the Spanner emulator.
func EnsureInstance(ctx context.Context, cfg SpannerConfig) error {
var opts []option.ClientOption
if cfg.EmulatorHost != "" {
opts = append(opts,
option.WithEndpoint(cfg.EmulatorHost),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
option.WithoutAuthentication(),
)
}
adminClient, err := instance.NewInstanceAdminClient(ctx, opts...)
if err != nil {
return fmt.Errorf("creating instance admin client: %w", err)
}
defer adminClient.Close()

op, err := adminClient.CreateInstance(ctx, &instancepb.CreateInstanceRequest{
Parent: fmt.Sprintf("projects/%s", cfg.Project),
InstanceId: cfg.Instance,
Instance: &instancepb.Instance{
Config: "emulator-config",
DisplayName: cfg.Instance,
NodeCount: 1,
},
})
if err != nil {
// Instance may already exist — not an error.
return nil
}
_, _ = op.Wait(ctx)
return nil
}

// EnsureSchema creates the database and applies the KV schema if it doesn't
// exist. Idempotent: a pre-existing database returns nil instead of an
// error, so the function is safe to call on every apiserver startup. This
// is the seam Options.Build() uses so a fresh Spanner backend doesn't
// require a separate schema-apply step out of band.
func EnsureSchema(ctx context.Context, cfg SpannerConfig) error {
var opts []option.ClientOption
if cfg.EmulatorHost != "" {
opts = append(opts,
option.WithEndpoint(cfg.EmulatorHost),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
option.WithoutAuthentication(),
)
}

adminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return fmt.Errorf("creating database admin client: %w", err)
}
defer adminClient.Close()

// Try to create the database with schema in one shot.
op, err := adminClient.CreateDatabase(ctx, &databasepb.CreateDatabaseRequest{
Parent: cfg.InstancePath(),
CreateStatement: fmt.Sprintf("CREATE DATABASE `%s`", cfg.Database),
ExtraStatements: schemaDDL,
})
if err != nil {
if isAlreadyExists(err) {
klog.V(2).InfoS("Spanner database already exists, skipping schema apply", "database", cfg.DatabasePath())
return nil
}
return fmt.Errorf("creating database: %w", err)
}

if _, err := op.Wait(ctx); err != nil {
if isAlreadyExists(err) {
klog.V(2).InfoS("Spanner database raced to exist, skipping schema apply", "database", cfg.DatabasePath())
return nil
}
return fmt.Errorf("waiting for database creation: %w", err)
}

klog.V(2).InfoS("Spanner database created with schema", "database", cfg.DatabasePath())
return nil
}

// isAlreadyExists returns true if err carries a gRPC AlreadyExists status —
// the signal the Spanner database admin API uses when CreateDatabase is
// called against a database that already exists. Extracted so EnsureSchema
// can treat re-runs as a no-op without parsing string messages.
func isAlreadyExists(err error) bool {
if err == nil {
return false
}
if s, ok := status.FromError(err); ok && s.Code() == codes.AlreadyExists {
return true
}
return false
}

// DropDatabase drops the specified Spanner database.
// Intended for test cleanup to avoid hitting emulator database limits.
func DropDatabase(ctx context.Context, cfg SpannerConfig) error {
var opts []option.ClientOption
if cfg.EmulatorHost != "" {
opts = append(opts,
option.WithEndpoint(cfg.EmulatorHost),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
option.WithoutAuthentication(),
)
}

adminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return fmt.Errorf("creating database admin client: %w", err)
}
defer adminClient.Close()

return adminClient.DropDatabase(ctx, &databasepb.DropDatabaseRequest{
Database: cfg.DatabasePath(),
})
}

// revisionFromCommitTimestamp converts a Spanner commit timestamp to a
// resource version. We use UnixNano which gives us ~292 years of headroom
// from epoch and natural monotonicity via TrueTime.
func revisionFromCommitTimestamp(ts time.Time) uint64 {
return uint64(ts.UnixNano())
}

// timestampFromRevision converts a resource version back to a time.Time
// for use in Spanner stale reads.
func timestampFromRevision(rv int64) time.Time {
return time.Unix(0, rv)
}
Loading