Skip to content

Commit cf06706

Browse files
committed
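Refactor checkpoint acknowledgment handling in the ClickHouse, PostgreSQL, and Trino source connectors to use the shared AssignCompositeSourceAck function for improved consistency and clarity. Add unit tests for order-by-only checkpoint acknowledgment covering the helper itself and the ClickHouse and Trino connectors, and update the existing PostgreSQL test to call the helper.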
1 parent 8b0bbc7 commit cf06706

8 files changed

Lines changed: 71 additions & 20 deletions

File tree

internal/connectors/clickhouse.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -200,12 +200,7 @@ func (c *ClickHouseSourceConnector) readRows(ctx context.Context, msgChan chan *
200200
if idIndex >= 0 && len(values) > idIndex {
201201
SetSourceRowIDMetadata(msg, values[idIndex])
202202
}
203-
if changeTime != nil {
204-
ct := *changeTime
205-
msg.Ack = c.cp.MakeAck(&ct, orderByVal, true)
206-
} else if orderByVal != nil {
207-
msg.Ack = c.cp.MakeAck(nil, orderByVal, false)
208-
}
203+
AssignCompositeSourceAck(msg, &c.cp, changeTime, orderByVal)
209204

210205
select {
211206
case msgChan <- msg:

internal/connectors/clickhouse_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
v1 "github.com/dataflow-operator/dataflow/api/v1"
2626
"github.com/dataflow-operator/dataflow/internal/checkpoint"
27+
"github.com/dataflow-operator/dataflow/internal/types"
2728
"github.com/go-logr/logr"
2829
"github.com/stretchr/testify/assert"
2930
"github.com/stretchr/testify/require"
@@ -285,6 +286,25 @@ func TestClickHouseSourceConnector_buildReadQuery(t *testing.T) {
285286
})
286287
}
287288

289+
func TestClickHouseSourceConnector_orderByOnlyCheckpointAck(t *testing.T) {
290+
c := NewClickHouseSourceConnector(&v1.ClickHouseSourceSpec{
291+
ConnectionString: "clickhouse://localhost:9000",
292+
Table: "mv_one_p_prices_migration",
293+
ChangeTrackingColumn: "material_id",
294+
OrderByColumn: "material_id",
295+
})
296+
msg := types.NewMessage([]byte(`{"material_id":200019}`))
297+
AssignCompositeSourceAck(msg, &c.cp, nil, int64(200019))
298+
require.NotNil(t, msg.Ack)
299+
msg.Ack()
300+
snap := c.cp.Snapshot()
301+
require.Equal(t, int64(200019), snap.OrderByValue)
302+
assert.Nil(t, snap.ChangeTime)
303+
304+
got := c.buildReadQuery()
305+
assert.Contains(t, got, "WHERE `material_id` > 200019")
306+
}
307+
288308
func TestClickHouseSourceConnector_applyInitialCheckpoint_legacy(t *testing.T) {
289309
opts := &SourceConnectorOptions{
290310
InitialCheckpoint: []byte(`{"lastReadID":100,"lastReadTime":"2024-06-01 12:00:00"}`),

internal/connectors/composite_source.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"time"
2626

2727
"github.com/dataflow-operator/dataflow/internal/checkpoint"
28+
"github.com/dataflow-operator/dataflow/internal/types"
2829
)
2930

3031
// SQLDialect formats SQL literals and identifiers for a database engine.
@@ -292,6 +293,20 @@ func (h *CompositeCheckpointHolder) Advance(next checkpoint.Composite, requireTi
292293
}
293294
}
294295

296+
// AssignCompositeSourceAck sets msg.Ack so the composite checkpoint advances after a successful sink write.
297+
// When changeTime is nil but orderByVal is set (e.g. non-timestamp changeTrackingColumn), orderBy-only ack is used.
298+
func AssignCompositeSourceAck(msg *types.Message, cp *CompositeCheckpointHolder, changeTime *time.Time, orderByVal interface{}) {
299+
if msg == nil || cp == nil {
300+
return
301+
}
302+
if changeTime != nil {
303+
ct := *changeTime
304+
msg.Ack = cp.MakeAck(&ct, orderByVal, true)
305+
} else if orderByVal != nil {
306+
msg.Ack = cp.MakeAck(nil, orderByVal, false)
307+
}
308+
}
309+
295310
// MakeAck returns a callback that advances the checkpoint after sink write.
296311
func (h *CompositeCheckpointHolder) MakeAck(changeTime *time.Time, orderBy interface{}, requireTime bool) func() {
297312
return func() {

internal/connectors/composite_source_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"k8s.io/client-go/kubernetes/fake"
2727

2828
"github.com/dataflow-operator/dataflow/internal/checkpoint"
29+
"github.com/dataflow-operator/dataflow/internal/types"
2930
"github.com/stretchr/testify/assert"
3031
"github.com/stretchr/testify/require"
3132
)
@@ -374,6 +375,17 @@ func TestCompositeCheckpointHolder_AdvanceAndSyncFlush(t *testing.T) {
374375
assert.Contains(t, string(loaded), "5042")
375376
}
376377

378+
func TestAssignCompositeSourceAck_orderByOnly(t *testing.T) {
379+
h := &CompositeCheckpointHolder{}
380+
msg := types.NewMessage([]byte(`{}`))
381+
AssignCompositeSourceAck(msg, h, nil, int64(200019))
382+
require.NotNil(t, msg.Ack)
383+
msg.Ack()
384+
snap := h.Snapshot()
385+
require.Equal(t, int64(200019), snap.OrderByValue)
386+
assert.Nil(t, snap.ChangeTime)
387+
}
388+
377389
func TestCompositeCheckpointHolder_orderOnlyLegacy(t *testing.T) {
378390
h := &CompositeCheckpointHolder{}
379391
h.ApplyInitial([]byte(`{"lastReadOrderByValue":50}`))

internal/connectors/postgresql.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -255,12 +255,7 @@ func (p *PostgreSQLSourceConnector) readRows(ctx context.Context, msgChan chan *
255255
SetSourceRowIDMetadata(msg, values[idIndex])
256256
}
257257
msg.Metadata["operation"] = operation
258-
if changeTime != nil {
259-
ct := *changeTime
260-
msg.Ack = p.cp.MakeAck(&ct, orderByVal, true)
261-
} else if orderByVal != nil {
262-
msg.Ack = p.cp.MakeAck(nil, orderByVal, false)
263-
}
258+
AssignCompositeSourceAck(msg, &p.cp, changeTime, orderByVal)
264259

265260
select {
266261
case msgChan <- msg:

internal/connectors/postgresql_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,7 @@ func TestPostgreSQLSourceConnector_orderByOnlyCheckpointAck(t *testing.T) {
176176
OrderByColumn: "material_id",
177177
})
178178
msg := types.NewMessage([]byte(`{"material_id":200019}`))
179-
orderByVal := int64(200019)
180-
msg.Ack = p.cp.MakeAck(nil, orderByVal, false)
179+
AssignCompositeSourceAck(msg, &p.cp, nil, int64(200019))
181180
require.NotNil(t, msg.Ack)
182181
msg.Ack()
183182
snap := p.cp.Snapshot()

internal/connectors/trino.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -165,12 +165,7 @@ func (t *TrinoSourceConnector) readRows(ctx context.Context, msgChan chan *types
165165
msg.Metadata["table"] = t.config.Table
166166
SetSourceRowIDMetadata(msg, orderByVal)
167167

168-
if changeTime != nil {
169-
ct := *changeTime
170-
msg.Ack = t.cp.MakeAck(&ct, orderByVal, true)
171-
} else if orderByVal != nil {
172-
msg.Ack = t.cp.MakeAck(nil, orderByVal, false)
173-
}
168+
AssignCompositeSourceAck(msg, &t.cp, changeTime, orderByVal)
174169

175170
select {
176171
case msgChan <- msg:

internal/connectors/trino_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,26 @@ func TestTrinoSourceConnector_buildReadQuery(t *testing.T) {
611611
})
612612
}
613613

614+
func TestTrinoSourceConnector_orderByOnlyCheckpointAck(t *testing.T) {
615+
c := NewTrinoSourceConnector(&v1.TrinoSourceSpec{
616+
Catalog: "cat",
617+
Schema: "sch",
618+
Table: "mv_one_p_prices_migration",
619+
ChangeTrackingColumn: "material_id",
620+
OrderByColumn: "material_id",
621+
})
622+
msg := types.NewMessage([]byte(`{"material_id":200019}`))
623+
AssignCompositeSourceAck(msg, &c.cp, nil, int64(200019))
624+
require.NotNil(t, msg.Ack)
625+
msg.Ack()
626+
snap := c.cp.Snapshot()
627+
require.Equal(t, int64(200019), snap.OrderByValue)
628+
assert.Nil(t, snap.ChangeTime)
629+
630+
got := c.buildReadQuery()
631+
assert.Contains(t, got, "WHERE material_id > 200019")
632+
}
633+
614634
func TestTrinoSourceConnector_wrapQueryWithStableOrder(t *testing.T) {
615635
c := NewTrinoSourceConnector(&v1.TrinoSourceSpec{
616636
Catalog: "c",

0 commit comments

Comments
 (0)