diff --git a/e2e/whole_cluster_test.go b/e2e/whole_cluster_test.go index bff4054c..d22c3b1f 100644 --- a/e2e/whole_cluster_test.go +++ b/e2e/whole_cluster_test.go @@ -9,8 +9,10 @@ import ( "time" "github.com/jackc/pgx/v5" - controlplane "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane" "github.com/stretchr/testify/require" + + controlplane "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane" + "github.com/pgEdge/control-plane/client" ) // TestWholeCluster deploys one instance to each host in the cluster. @@ -81,30 +83,58 @@ func TestWholeCluster(t *testing.T) { continue } - for instance := range db.GetInstances(WithNode(read.Name)) { - t.Logf("validating table on node %s, instance %s with role %s", read.Name, instance.ID, *instance.Postgres.Role) + primary := db.GetInstance(And(WithNode(read.Name), WithRole(client.RolePrimary))) + require.NotNil(t, primary) + + t.Logf("validating table on node %s, instance %s with role %s", read.Name, primary.ID, *primary.Postgres.Role) + + readOpts := ConnectionOptions{ + Instance: primary, + Username: username, + Password: password, + } + db.WithConnection(ctx, readOpts, t, func(conn *pgx.Conn) { + t.Log("waiting for replication to finish") + + var synced bool + row := conn.QueryRow(ctx, "CALL spock.wait_for_sync_event(true, $1, $2::pg_lsn, 30);", write.Name, syncLSN) + + require.NoError(t, row.Scan(&synced)) + require.True(t, synced) + + t.Log("selecting test data") + + var actual string + row = conn.QueryRow(ctx, fmt.Sprintf(`SELECT data FROM %s WHERE id = 1;`, write.Name)) + + require.NoError(t, row.Scan(&actual)) + require.Equal(t, "test", actual) + }) + + for replica := range db.GetInstances(And(WithNode(read.Name), WithRole(client.RoleReplica))) { + t.Logf("validating table on node %s, instance %s with role %s", read.Name, replica.ID, *replica.Postgres.Role) readOpts := ConnectionOptions{ - Instance: instance, + Instance: replica, Username: username, Password: password, } db.WithConnection(ctx, readOpts, t, func(conn *pgx.Conn) { - t.Log("waiting for replication to finish") + t.Log("polling until replica syncs to primary") + deadline := time.Now().Add(5 * time.Second) var synced bool - row := conn.QueryRow(ctx, "CALL spock.wait_for_sync_event(true, $1, $2::pg_lsn, 30);", write.Name, syncLSN) - - require.NoError(t, row.Scan(&synced)) + for !synced && time.Now().Before(deadline) { + var actual string + row := conn.QueryRow(ctx, fmt.Sprintf(`SELECT data FROM %s WHERE id = 1;`, write.Name)) + err := row.Scan(&actual) + if err != nil || actual != "test" { + time.Sleep(500 * time.Millisecond) + } else { + synced = true + } + } require.True(t, synced) - - t.Log("selecting test data") - - var actual string - row = conn.QueryRow(ctx, fmt.Sprintf(`SELECT data FROM %s WHERE id = 1;`, write.Name)) - - require.NoError(t, row.Scan(&actual)) - require.Equal(t, "test", actual) }) } }