diff --git a/docs/docs/reference/project-files/connectors.md b/docs/docs/reference/project-files/connectors.md index 05de056dd99..1fc1dc8618d 100644 --- a/docs/docs/reference/project-files/connectors.md +++ b/docs/docs/reference/project-files/connectors.md @@ -262,6 +262,10 @@ _[boolean]_ - Indicates whether a secured SSL connection is required _[string]_ - Cluster name, required for running distributed queries +### `sync_replicas` + +_[boolean]_ - Controls whether to run `SYSTEM SYNC REPLICA` before replacing partitions on a replicated table in a cluster, ensuring all inserted parts are visible across replicas before the partition swap. Defaults to true + ### `write_dsn` _[string]_ - Separate connection string for write operations diff --git a/runtime/drivers/clickhouse/clickhouse.go b/runtime/drivers/clickhouse/clickhouse.go index 07cf1c71c23..d646a5722b0 100644 --- a/runtime/drivers/clickhouse/clickhouse.go +++ b/runtime/drivers/clickhouse/clickhouse.go @@ -173,6 +173,9 @@ type configProperties struct { SSL bool `mapstructure:"ssl"` // Cluster name. If a cluster is configured, Rill will create all models in the cluster as distributed tables. Cluster string `mapstructure:"cluster"` + // SyncReplicas controls whether to run `SYSTEM SYNC REPLICA` before replacing partitions on a replicated table in a cluster. + // This ensures all inserted parts are visible across replicas before the partition swap. Defaults to true. + SyncReplicas bool `mapstructure:"sync_replicas"` // LogQueries controls whether to log the raw SQL passed to OLAP.Execute. LogQueries bool `mapstructure:"log_queries"` // QuerySettingsOverride overrides the default query settings used for OLAP SELECT queries. @@ -226,6 +229,7 @@ func (d driver) Open(connectorName, instanceID string, config map[string]any, st // Parse config properties conf := &configProperties{ CanScaleToZero: true, + SyncReplicas: true, MaxOpenConns: 20, MaxIdleConns: 5, } diff --git a/runtime/drivers/clickhouse/crud.go b/runtime/drivers/clickhouse/crud.go index cc1a9462b58..98841232111 100644 --- a/runtime/drivers/clickhouse/crud.go +++ b/runtime/drivers/clickhouse/crud.go @@ -710,14 +710,44 @@ func (c *Connection) replacePartition(ctx context.Context, src, dest, part strin // syncReplica syncs the local replicated table across the cluster func (c *Connection) syncReplica(ctx context.Context, tableName string) error { - if c.config.Cluster == "" { + if c.config.Cluster == "" || !c.config.SyncReplicas { return nil } + // get current database + database, err := c.currentDatabase(ctx) + if err != nil { + return err + } onClusterClause := "ON CLUSTER " + safeSQLName(c.config.Cluster) return c.Exec(ctx, &drivers.Statement{ - Query: fmt.Sprintf("SYSTEM SYNC REPLICA %s %s", onClusterClause, safeSQLName(localTableName(tableName))), + Query: fmt.Sprintf("SYSTEM SYNC REPLICA %s %s.%s", onClusterClause, safeSQLName(database), safeSQLName(localTableName(tableName))), + Priority: 1, + }) +} + +func (c *Connection) currentDatabase(ctx context.Context) (string, error) { + if c.config.Database != "" { + return c.config.Database, nil + } + var database string + rows, err := c.Query(ctx, &drivers.Statement{ + Query: "SELECT currentDatabase()", Priority: 1, }) + if err != nil { + return "", err + } + defer rows.Close() + for rows.Next() { + if err := rows.Scan(&database); err != nil { + return "", err + } + } + err = rows.Err() + if err != nil { + return "", err + } + return database, nil } func isReplicatedEngine(engine string) bool { diff --git a/runtime/drivers/clickhouse/olap_test.go b/runtime/drivers/clickhouse/olap_test.go index ebf1425543f..7cb20510ffe 100644 --- a/runtime/drivers/clickhouse/olap_test.go +++ b/runtime/drivers/clickhouse/olap_test.go @@ -66,6 +66,7 @@ func TestClickhouseCluster(t *testing.T) { t.Run("InsertTableAsSelect_WithMerge", func(t *testing.T) { testInsertTableAsSelect_WithMerge(t, c, olap) }) t.Run("InsertTableAsSelect_WithPartitionOverwrite", func(t *testing.T) { testInsertTableAsSelect_WithPartitionOverwrite(t, c, olap) }) t.Run("InsertTableAsSelect_WithPartitionOverwrite_DatePartition", func(t *testing.T) { testInsertTableAsSelect_WithPartitionOverwrite_DatePartition(t, c, olap) }) + t.Run("SyncReplica_NonDefaultDatabase", func(t *testing.T) { testSyncReplicaNonDefaultDatabase(t, olap, dsn, cluster) }) t.Run("TestDictionary", func(t *testing.T) { testDictionary(t, c, olap) }) t.Run("QueryAttributesAsSettings", func(t *testing.T) { testQueryAttributesAsSettings(t, olap) }) } @@ -388,6 +389,65 @@ func testInsertTableAsSelect_WithPartitionOverwrite_DatePartition(t *testing.T, } } +func testSyncReplicaNonDefaultDatabase(t *testing.T, olap drivers.OLAPStore, dsn, cluster string) { + ctx := context.Background() + + // create a non-default database on the cluster + err := olap.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("CREATE DATABASE IF NOT EXISTS sync_repl_db ON CLUSTER %s", cluster)}) + require.NoError(t, err) + t.Cleanup(func() { + _ = olap.Exec(context.Background(), &drivers.Statement{Query: fmt.Sprintf("DROP DATABASE IF EXISTS sync_repl_db ON CLUSTER %s", cluster)}) + }) + + // open a connection scoped to the non-default database (the database is taken from the DSN path) + conn, err := drivers.Open("clickhouse", "", "default", map[string]any{"dsn": dsn + "/sync_repl_db", "cluster": cluster, "mode": "readwrite"}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop()) + require.NoError(t, err) + defer conn.Close() + c := conn.(*Connection) + dbOLAP, ok := conn.AsOLAP("default") + require.True(t, ok) + + props := &ModelOutputProperties{ + Engine: "ReplicatedMergeTree", + Table: "repl_tbl", + DistributedShardingKey: "rand()", + IncrementalStrategy: drivers.IncrementalStrategyPartitionOverwrite, + OrderBy: "id", + PartitionBy: "id", + PrimaryKey: "id", + } + _, err = c.createTableAsSelect(ctx, "repl_tbl", "SELECT generate_series AS id, 'insert' AS value FROM generate_series(0, 4)", props, "", "") + require.NoError(t, err) + t.Cleanup(func() { _ = c.dropTable(context.Background(), "repl_tbl") }) + + insertOpts := &InsertTableOptions{Strategy: drivers.IncrementalStrategyPartitionOverwrite} + _, err = c.insertTableAsSelect(ctx, "repl_tbl", "SELECT generate_series AS id, 'replace' AS value FROM generate_series(2, 5)", insertOpts, props) + require.NoError(t, err) + + res, err := dbOLAP.Query(ctx, &drivers.Statement{Query: "SELECT id, value FROM repl_tbl ORDER BY id"}) + require.NoError(t, err) + + resultSet := make(map[int]string) + for res.Next() { + var id int + var value string + require.NoError(t, res.Scan(&id, &value)) + resultSet[id] = value + } + require.NoError(t, res.Err()) + require.NoError(t, res.Close()) + + expected := map[int]string{ + 0: "insert", + 1: "insert", + 2: "replace", + 3: "replace", + 4: "replace", + 5: "replace", + } + require.Equal(t, expected, resultSet) +} + func testDictionary(t *testing.T, c *Connection, olap drivers.OLAPStore) { props := &ModelOutputProperties{ Typ: "DICTIONARY", diff --git a/runtime/parser/schema/project.schema.yaml b/runtime/parser/schema/project.schema.yaml index 89bed113237..803ac675077 100644 --- a/runtime/parser/schema/project.schema.yaml +++ b/runtime/parser/schema/project.schema.yaml @@ -266,6 +266,9 @@ definitions: cluster: type: string description: Cluster name, required for running distributed queries + sync_replicas: + type: boolean + description: "Controls whether to run `SYSTEM SYNC REPLICA` before replacing partitions on a replicated table in a cluster, ensuring all inserted parts are visible across replicas before the partition swap. Defaults to true" write_dsn: type: string description: Separate connection string for write operations