Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 46 additions & 16 deletions e2e/whole_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"time"

"github.com/jackc/pgx/v5"
controlplane "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane"
"github.com/stretchr/testify/require"
Comment thread
jason-lynch marked this conversation as resolved.

controlplane "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane"
"github.com/pgEdge/control-plane/client"
)

// TestWholeCluster deploys one instance to each host in the cluster.
Expand Down Expand Up @@ -81,30 +83,58 @@ func TestWholeCluster(t *testing.T) {
continue
}

for instance := range db.GetInstances(WithNode(read.Name)) {
t.Logf("validating table on node %s, instance %s with role %s", read.Name, instance.ID, *instance.Postgres.Role)
primary := db.GetInstance(And(WithNode(read.Name), WithRole(client.RolePrimary)))
require.NotNil(t, primary)

t.Logf("validating table on node %s, instance %s with role %s", read.Name, primary.ID, *primary.Postgres.Role)

readOpts := ConnectionOptions{
Instance: primary,
Username: username,
Password: password,
}
db.WithConnection(ctx, readOpts, t, func(conn *pgx.Conn) {
t.Log("waiting for replication to finish")

var synced bool
row := conn.QueryRow(ctx, "CALL spock.wait_for_sync_event(true, $1, $2::pg_lsn, 30);", write.Name, syncLSN)

require.NoError(t, row.Scan(&synced))
require.True(t, synced)

t.Log("selecting test data")

var actual string
row = conn.QueryRow(ctx, fmt.Sprintf(`SELECT data FROM %s WHERE id = 1;`, write.Name))

require.NoError(t, row.Scan(&actual))
require.Equal(t, "test", actual)
})

for replica := range db.GetInstances(And(WithNode(read.Name), WithRole(client.RoleReplica))) {
t.Logf("validating table on node %s, instance %s with role %s", read.Name, replica.ID, *replica.Postgres.Role)

readOpts := ConnectionOptions{
Instance: instance,
Instance: replica,
Username: username,
Password: password,
}
db.WithConnection(ctx, readOpts, t, func(conn *pgx.Conn) {
t.Log("waiting for replication to finish")
t.Log("polling until replica syncs to primary")

deadline := time.Now().Add(5 * time.Second)
var synced bool
row := conn.QueryRow(ctx, "CALL spock.wait_for_sync_event(true, $1, $2::pg_lsn, 30);", write.Name, syncLSN)

require.NoError(t, row.Scan(&synced))
for !synced && time.Now().Before(deadline) {
var actual string
row := conn.QueryRow(ctx, fmt.Sprintf(`SELECT data FROM %s WHERE id = 1;`, write.Name))
err := row.Scan(&actual)
if err != nil || actual != "test" {
time.Sleep(500 * time.Millisecond)
} else {
synced = true
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
require.True(t, synced)

t.Log("selecting test data")

var actual string
row = conn.QueryRow(ctx, fmt.Sprintf(`SELECT data FROM %s WHERE id = 1;`, write.Name))

require.NoError(t, row.Scan(&actual))
require.Equal(t, "test", actual)
})
}
}
Expand Down