Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/docs/reference/project-files/connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ _[boolean]_ - Indicates whether a secured SSL connection is required

_[string]_ - Cluster name, required for running distributed queries

### `sync_replicas`

_[boolean]_ - Controls whether to run `SYSTEM SYNC REPLICA` before replacing partitions on a replicated table in a cluster, ensuring all inserted parts are visible across replicas before the partition swap. Defaults to true

### `write_dsn`

_[string]_ - Separate connection string for write operations
Expand Down
4 changes: 4 additions & 0 deletions runtime/drivers/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ type configProperties struct {
SSL bool `mapstructure:"ssl"`
// Cluster name. If a cluster is configured, Rill will create all models in the cluster as distributed tables.
Cluster string `mapstructure:"cluster"`
// SyncReplicas controls whether to run `SYSTEM SYNC REPLICA` before replacing partitions on a replicated table in a cluster.
// This ensures all inserted parts are visible across replicas before the partition swap. Defaults to true.
SyncReplicas bool `mapstructure:"sync_replicas"`
// LogQueries controls whether to log the raw SQL passed to OLAP.Execute.
LogQueries bool `mapstructure:"log_queries"`
// QuerySettingsOverride overrides the default query settings used for OLAP SELECT queries.
Expand Down Expand Up @@ -226,6 +229,7 @@ func (d driver) Open(connectorName, instanceID string, config map[string]any, st
// Parse config properties
conf := &configProperties{
CanScaleToZero: true,
SyncReplicas: true,
MaxOpenConns: 20,
MaxIdleConns: 5,
}
Expand Down
34 changes: 32 additions & 2 deletions runtime/drivers/clickhouse/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,14 +710,44 @@ func (c *Connection) replacePartition(ctx context.Context, src, dest, part strin

// syncReplica syncs the local replicated table across the cluster
func (c *Connection) syncReplica(ctx context.Context, tableName string) error {
if c.config.Cluster == "" {
if c.config.Cluster == "" || !c.config.SyncReplicas {
return nil
}
// get current database
database, err := c.currentDatabase(ctx)
if err != nil {
return err
}
onClusterClause := "ON CLUSTER " + safeSQLName(c.config.Cluster)
return c.Exec(ctx, &drivers.Statement{
Query: fmt.Sprintf("SYSTEM SYNC REPLICA %s %s", onClusterClause, safeSQLName(localTableName(tableName))),
Query: fmt.Sprintf("SYSTEM SYNC REPLICA %s %s.%s", onClusterClause, safeSQLName(database), safeSQLName(localTableName(tableName))),
Priority: 1,
})
}

func (c *Connection) currentDatabase(ctx context.Context) (string, error) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is being done at a couple of other places as well. As a follow up, I will query this once and set it in config to save these DB calls.

if c.config.Database != "" {
return c.config.Database, nil
}
var database string
rows, err := c.Query(ctx, &drivers.Statement{
Query: "SELECT currentDatabase()",
Priority: 1,
})
if err != nil {
return "", err
}
defer rows.Close()
for rows.Next() {
if err := rows.Scan(&database); err != nil {
return "", err
}
}
err = rows.Err()
if err != nil {
return "", err
}
return database, nil
}

func isReplicatedEngine(engine string) bool {
Expand Down
60 changes: 60 additions & 0 deletions runtime/drivers/clickhouse/olap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func TestClickhouseCluster(t *testing.T) {
t.Run("InsertTableAsSelect_WithMerge", func(t *testing.T) { testInsertTableAsSelect_WithMerge(t, c, olap) })
t.Run("InsertTableAsSelect_WithPartitionOverwrite", func(t *testing.T) { testInsertTableAsSelect_WithPartitionOverwrite(t, c, olap) })
t.Run("InsertTableAsSelect_WithPartitionOverwrite_DatePartition", func(t *testing.T) { testInsertTableAsSelect_WithPartitionOverwrite_DatePartition(t, c, olap) })
t.Run("SyncReplica_NonDefaultDatabase", func(t *testing.T) { testSyncReplicaNonDefaultDatabase(t, olap, dsn, cluster) })
t.Run("TestDictionary", func(t *testing.T) { testDictionary(t, c, olap) })
t.Run("QueryAttributesAsSettings", func(t *testing.T) { testQueryAttributesAsSettings(t, olap) })
}
Expand Down Expand Up @@ -388,6 +389,65 @@ func testInsertTableAsSelect_WithPartitionOverwrite_DatePartition(t *testing.T,
}
}

func testSyncReplicaNonDefaultDatabase(t *testing.T, olap drivers.OLAPStore, dsn, cluster string) {
ctx := context.Background()

// create a non-default database on the cluster
err := olap.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("CREATE DATABASE IF NOT EXISTS sync_repl_db ON CLUSTER %s", cluster)})
require.NoError(t, err)
t.Cleanup(func() {
_ = olap.Exec(context.Background(), &drivers.Statement{Query: fmt.Sprintf("DROP DATABASE IF EXISTS sync_repl_db ON CLUSTER %s", cluster)})
})

// open a connection scoped to the non-default database (the database is taken from the DSN path)
conn, err := drivers.Open("clickhouse", "", "default", map[string]any{"dsn": dsn + "/sync_repl_db", "cluster": cluster, "mode": "readwrite"}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)
defer conn.Close()
c := conn.(*Connection)
dbOLAP, ok := conn.AsOLAP("default")
require.True(t, ok)

props := &ModelOutputProperties{
Engine: "ReplicatedMergeTree",
Table: "repl_tbl",
DistributedShardingKey: "rand()",
IncrementalStrategy: drivers.IncrementalStrategyPartitionOverwrite,
OrderBy: "id",
PartitionBy: "id",
PrimaryKey: "id",
}
_, err = c.createTableAsSelect(ctx, "repl_tbl", "SELECT generate_series AS id, 'insert' AS value FROM generate_series(0, 4)", props, "", "")
require.NoError(t, err)
t.Cleanup(func() { _ = c.dropTable(context.Background(), "repl_tbl") })

insertOpts := &InsertTableOptions{Strategy: drivers.IncrementalStrategyPartitionOverwrite}
_, err = c.insertTableAsSelect(ctx, "repl_tbl", "SELECT generate_series AS id, 'replace' AS value FROM generate_series(2, 5)", insertOpts, props)
require.NoError(t, err)

res, err := dbOLAP.Query(ctx, &drivers.Statement{Query: "SELECT id, value FROM repl_tbl ORDER BY id"})
require.NoError(t, err)

resultSet := make(map[int]string)
for res.Next() {
var id int
var value string
require.NoError(t, res.Scan(&id, &value))
resultSet[id] = value
}
require.NoError(t, res.Err())
require.NoError(t, res.Close())

expected := map[int]string{
0: "insert",
1: "insert",
2: "replace",
3: "replace",
4: "replace",
5: "replace",
}
require.Equal(t, expected, resultSet)
}

func testDictionary(t *testing.T, c *Connection, olap drivers.OLAPStore) {
props := &ModelOutputProperties{
Typ: "DICTIONARY",
Expand Down
3 changes: 3 additions & 0 deletions runtime/parser/schema/project.schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ definitions:
cluster:
type: string
description: Cluster name, required for running distributed queries
sync_replicas:
type: boolean
description: "Controls whether to run `SYSTEM SYNC REPLICA` before replacing partitions on a replicated table in a cluster, ensuring all inserted parts are visible across replicas before the partition swap. Defaults to true"
write_dsn:
type: string
description: Separate connection string for write operations
Expand Down
Loading