From acbdeebbb4cdc8f0f31e43746d414997973f0b3f Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Wed, 22 Apr 2026 09:38:11 -0400 Subject: [PATCH] fix: move patroni reload operation Patroni has an activity loop that they refer to as a 'run cycle'. Each run cycle is `loop_wait` seconds long. When a cycle takes less than `loop_wait` seconds, which is the typical case, Patroni will sleep until that time has elapsed. If the system is degraded and the cycle takes longer than `loop_wait`, it'll sleep for 1ms and log a warning. When we signal Patroni to reload its configuration, it will perform the reload at the beginning of the next run cycle. This means that under non-degraded conditions, it can take up to `loop_wait` seconds to reload its configuration. This commit fixes a bug where we were not waiting for the reload to complete. It was working most of the time because we signal Patroni to reload in the `UnitResource`, so the timing worked out that in a lot of cases we would get into `InstanceResource.Create` before the new config took effect, and it would take effect before we evaluated whether a restart was needed. Rather than doing the wait in the `InstanceResource`, which would happen on every update, I've moved the reload operation to each orchestrator's `PatroniConfig` resource. I've also added a sleep operation here to ensure that `loop_wait` seconds have elapsed before we proceed. This way, we can assume that the new configuration has taken effect in the `InstanceResource` lifecycle methods. This additional sleep does slow down some operations - especially in Swarm databases because the Patroni Config resource contains the entire instance spec, so there are plenty of conditions where there will be a resource diff even though there isn't a Patroni config change. We'll improve this condition when we adopt the new `common.PatroniConfigGenerator` in the `swarm.patroni_config` resource. PLAT-548 --- lima/roles/stop_dbs/tasks/main.yaml | 4 ++ server/internal/database/instance_resource.go | 42 +++++++------------ server/internal/database/utils.go | 7 ++++ .../common/patroni_config_generator.go | 2 +- .../orchestrator/swarm/patroni_config.go | 30 ++++++++++++- .../internal/orchestrator/systemd/client.go | 32 +++++++++++--- .../orchestrator/systemd/patroni_config.go | 32 +++++++++++++- server/internal/orchestrator/systemd/unit.go | 4 +- server/internal/patroni/config.go | 2 + server/internal/utils/utils.go | 9 ++++ 10 files changed, 125 insertions(+), 39 deletions(-) diff --git a/lima/roles/stop_dbs/tasks/main.yaml b/lima/roles/stop_dbs/tasks/main.yaml index dd6e8dc5..ff96a3b3 100644 --- a/lima/roles/stop_dbs/tasks/main.yaml +++ b/lima/roles/stop_dbs/tasks/main.yaml @@ -37,3 +37,7 @@ - name: Reload systemd ansible.builtin.command: systemctl daemon-reload changed_when: false + +- name: Clear journalctl + ansible.builtin.command: journalctl --flush --rotate --vacuum-time=1s + changed_when: false diff --git a/server/internal/database/instance_resource.go b/server/internal/database/instance_resource.go index ed3bf8fc..12f7a978 100644 --- a/server/internal/database/instance_resource.go +++ b/server/internal/database/instance_resource.go @@ -93,6 +93,13 @@ func (r *InstanceResource) Refresh(ctx context.Context, rc *resource.Context) er } func (r *InstanceResource) Create(ctx context.Context, rc *resource.Context) error { + if err := r.updateConnectionInfo(ctx, rc); err != nil { + return r.recordError(ctx, rc, err) + } + if err := WaitForPatroniRunning(ctx, r.patroniClient(), 0); err != nil { + err = fmt.Errorf("failed to wait for patroni to enter running state: %w", err) + return r.recordError(ctx, rc, err) + } if err := r.initializeInstance(ctx, rc); err != nil { return r.recordError(ctx, rc, err) } @@ -101,25 +108,17 @@ func (r *InstanceResource) Create(ctx context.Context, rc *resource.Context) err } func (r *InstanceResource) Update(ctx context.Context, rc *resource.Context) error { - // Get connection info from previous instance state in case the ports have - // changed. - previous, err := resource.FromContext[*InstanceResource](rc, r.Identifier()) - if err != nil { - return r.recordError(ctx, rc, fmt.Errorf("failed to get previous instance state: %w", err)) + if err := r.updateConnectionInfo(ctx, rc); err != nil { + return r.recordError(ctx, rc, err) } - // We fallback to computing the connection info from the spec if the - // previous instance state is malformed. - if previous.ConnectionInfo != nil { - r.ConnectionInfo = previous.ConnectionInfo - } else if err := r.updateConnectionInfo(ctx, rc); err != nil { + patroniClient := r.patroniClient() + if err := WaitForPatroniRunning(ctx, patroniClient, 0); err != nil { + err = fmt.Errorf("failed to wait for patroni to enter running state: %w", err) return r.recordError(ctx, rc, err) } - - if err := r.patroniClient().Reload(ctx); err != nil { - err = fmt.Errorf("failed to reload patroni conf: %w", err) + if err := r.restartIfNeeded(ctx, patroniClient); err != nil { return r.recordError(ctx, rc, err) } - if err := r.initializeInstance(ctx, rc); err != nil { return r.recordError(ctx, rc, err) } @@ -164,20 +163,7 @@ func (r *InstanceResource) Connection(ctx context.Context, rc *resource.Context, } func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource.Context) error { - if err := r.updateConnectionInfo(ctx, rc); err != nil { - return err - } - - patroniClient := r.patroniClient() - - if err := WaitForPatroniRunning(ctx, patroniClient, 0); err != nil { - return fmt.Errorf("failed to wait for patroni to enter running state: %w", err) - } - if err := r.restartIfNeeded(ctx, patroniClient); err != nil { - return err - } - - primaryInstanceID, err := GetPrimaryInstanceID(ctx, patroniClient, time.Minute) + primaryInstanceID, err := GetPrimaryInstanceID(ctx, r.patroniClient(), time.Minute) if err != nil { return err } diff --git a/server/internal/database/utils.go b/server/internal/database/utils.go index 7d278363..b89cd9f3 100644 --- a/server/internal/database/utils.go +++ b/server/internal/database/utils.go @@ -9,6 +9,13 @@ import ( "github.com/pgEdge/control-plane/server/internal/utils" ) +// WaitForPatroniRunning polls the Patroni instance status endpoint until one of +// the following is true: +// - Patroni reports a running state +// - The context is canceled +// - The timeout has elapsed +// - We encounter more than 3 connection errors +// Giving a timeout of 0 will disable the timeout condition. func WaitForPatroniRunning(ctx context.Context, patroniClient *patroni.Client, timeout time.Duration) error { var cancel context.CancelFunc if timeout > 0 { diff --git a/server/internal/orchestrator/common/patroni_config_generator.go b/server/internal/orchestrator/common/patroni_config_generator.go index 69895c14..99ce4576 100644 --- a/server/internal/orchestrator/common/patroni_config_generator.go +++ b/server/internal/orchestrator/common/patroni_config_generator.go @@ -215,7 +215,7 @@ func (p *PatroniConfigGenerator) bootstrap(dcsParameters map[string]any) *patron {Plugin: utils.PointerTo("spock_output")}, }, TTL: utils.PointerTo(30), - LoopWait: utils.PointerTo(10), + LoopWait: utils.PointerTo(int(patroni.DefaultLoopWaitSeconds)), RetryTimeout: utils.PointerTo(10), }, InitDB: utils.PointerTo([]string{"data-checksums"}), diff --git a/server/internal/orchestrator/swarm/patroni_config.go b/server/internal/orchestrator/swarm/patroni_config.go index 1ca7b073..b045cd8a 100644 --- a/server/internal/orchestrator/swarm/patroni_config.go +++ b/server/internal/orchestrator/swarm/patroni_config.go @@ -8,6 +8,7 @@ import ( "maps" "net/url" "path/filepath" + "time" "github.com/alessio/shellescape" "github.com/samber/do" @@ -178,7 +179,11 @@ func (c *PatroniConfig) Create(ctx context.Context, rc *resource.Context) error } func (c *PatroniConfig) Update(ctx context.Context, rc *resource.Context) error { - return c.Create(ctx, rc) + if err := c.Create(ctx, rc); err != nil { + return err + } + + return c.signalReload(ctx, rc) } func (c *PatroniConfig) Delete(ctx context.Context, rc *resource.Context) error { @@ -214,6 +219,27 @@ func (c *PatroniConfig) isNewNode(rc *resource.Context) (bool, error) { } } +func (c *PatroniConfig) signalReload(ctx context.Context, rc *resource.Context) error { + client, err := do.Invoke[*docker.Docker](rc.Injector) + if err != nil { + return err + } + // Signal the container if it exists + container, err := GetPostgresContainer(ctx, client, c.Spec.InstanceID) + if errors.Is(err, ErrNoPostgresContainer) { + return nil + } else if err != nil { + return fmt.Errorf("failed to check if postgres container exists: %w", err) + } + if err := client.ContainerSignal(ctx, container.ID, "SIGHUP"); err != nil { + return fmt.Errorf("failed to signal patroni to reload: %w", err) + } + // It can take up to loop_wait seconds for Patroni to reload the config. + // We'll want to update this code to read loop_wait from c.Spec if we make + // loop_wait configurable. + return utils.SleepContext(ctx, patroni.DefaultLoopWaitSeconds*time.Second) +} + func generatePatroniConfig( spec *database.InstanceSpec, instanceHostname string, @@ -305,7 +331,7 @@ func generatePatroniConfig( {Plugin: utils.PointerTo("spock_output")}, }, TTL: utils.PointerTo(30), - LoopWait: utils.PointerTo(10), + LoopWait: utils.PointerTo(int(patroni.DefaultLoopWaitSeconds)), RetryTimeout: utils.PointerTo(10), }, InitDB: utils.PointerTo([]string{"data-checksums"}), diff --git a/server/internal/orchestrator/systemd/client.go b/server/internal/orchestrator/systemd/client.go index 1f938fec..617bfcf1 100644 --- a/server/internal/orchestrator/systemd/client.go +++ b/server/internal/orchestrator/systemd/client.go @@ -114,24 +114,46 @@ func (c *Client) StopUnit(ctx context.Context, name string, wait bool) error { return nil } -func (c *Client) RestartUnit(ctx context.Context, name string) error { +func (c *Client) ReloadUnit(ctx context.Context, name string) error { logger := c.logger.With().Str("unit", name).Logger() - logger.Debug().Msg("restarting unit") + logger.Debug().Msg("reloading unit") + + resCh := make(chan string, 1) + pid, err := c.conn.ReloadUnitContext(ctx, name, "replace", resCh) + if err != nil { + return fmt.Errorf("failed to reload unit '%s': %w", name, err) + } + res, err := awaitJob(ctx, resCh) + if err != nil { + return fmt.Errorf("failed to reload unit '%s': %w", name, err) + } + + c.logger.Debug(). + Str("response", res). + Int("pid", pid). + Msg("reloaded unit") + + return nil +} + +func (c *Client) ReloadOrRestartUnit(ctx context.Context, name string) error { + logger := c.logger.With().Str("unit", name).Logger() + logger.Debug().Msg("reloading or restarting unit") resCh := make(chan string, 1) pid, err := c.conn.ReloadOrRestartUnitContext(ctx, name, "replace", resCh) if err != nil { - return fmt.Errorf("failed to restart unit '%s': %w", name, err) + return fmt.Errorf("failed to reload or restart unit '%s': %w", name, err) } res, err := awaitJob(ctx, resCh) if err != nil { - return fmt.Errorf("failed to restart unit '%s': %w", name, err) + return fmt.Errorf("failed to reload or restart unit '%s': %w", name, err) } c.logger.Debug(). Str("response", res). Int("pid", pid). - Msg("restarted unit") + Msg("reloaded or restarted unit") return nil } diff --git a/server/internal/orchestrator/systemd/patroni_config.go b/server/internal/orchestrator/systemd/patroni_config.go index b06b73fb..0ac8a1b8 100644 --- a/server/internal/orchestrator/systemd/patroni_config.go +++ b/server/internal/orchestrator/systemd/patroni_config.go @@ -2,13 +2,17 @@ package systemd import ( "context" + "errors" "fmt" "strings" + "time" "github.com/pgEdge/control-plane/server/internal/ds" "github.com/pgEdge/control-plane/server/internal/host" "github.com/pgEdge/control-plane/server/internal/orchestrator/common" + "github.com/pgEdge/control-plane/server/internal/patroni" "github.com/pgEdge/control-plane/server/internal/resource" + "github.com/pgEdge/control-plane/server/internal/utils" "github.com/samber/do" ) @@ -79,9 +83,35 @@ func (c *PatroniConfig) Create(ctx context.Context, rc *resource.Context) error } func (c *PatroniConfig) Update(ctx context.Context, rc *resource.Context) error { - return c.Create(ctx, rc) + if err := c.Create(ctx, rc); err != nil { + return err + } + + return c.signalReload(ctx, rc) } func (c *PatroniConfig) Delete(ctx context.Context, rc *resource.Context) error { return c.Base.Delete(ctx, rc) } + +func (c *PatroniConfig) signalReload(ctx context.Context, rc *resource.Context) error { + client, err := do.Invoke[*Client](rc.Injector) + if err != nil { + return err + } + // Reload patroni unit if it exists + name := patroniServiceName(c.Base.InstanceID) + err = client.UnitExists(ctx, name) + if errors.Is(err, ErrUnitNotFound) { + return nil + } else if err != nil { + return fmt.Errorf("failed to check if patroni unit exists: %w", err) + } + if err := client.ReloadUnit(ctx, name); err != nil { + return fmt.Errorf("failed to reload patroni: %w", err) + } + // It can take up to loop_wait seconds for Patroni to reload the config. + // We'll want to update this code to read loop_wait from c.Base if we make + // loop_wait configurable. + return utils.SleepContext(ctx, patroni.DefaultLoopWaitSeconds*time.Second) +} diff --git a/server/internal/orchestrator/systemd/unit.go b/server/internal/orchestrator/systemd/unit.go index 134f0988..bb21dbbc 100644 --- a/server/internal/orchestrator/systemd/unit.go +++ b/server/internal/orchestrator/systemd/unit.go @@ -95,8 +95,8 @@ func (r *UnitResource) Create(ctx context.Context, rc *resource.Context) error { if err := client.EnableUnit(ctx, r.Name); err != nil { return fmt.Errorf("failed to enable unit '%s': %w", path, err) } - if err := client.RestartUnit(ctx, r.Name); err != nil { - return fmt.Errorf("failed to restart unit '%s': %w", path, err) + if err := client.ReloadOrRestartUnit(ctx, r.Name); err != nil { + return fmt.Errorf("failed to reload or restart unit '%s': %w", path, err) } return nil diff --git a/server/internal/patroni/config.go b/server/internal/patroni/config.go index 49b9db3c..0a63bde3 100644 --- a/server/internal/patroni/config.go +++ b/server/internal/patroni/config.go @@ -4,6 +4,8 @@ import ( "github.com/pgEdge/control-plane/server/internal/storage" ) +const DefaultLoopWaitSeconds = 10 + func Namespace() string { return storage.Prefix("/", "patroni") } diff --git a/server/internal/utils/utils.go b/server/internal/utils/utils.go index 80eacf74..b1539953 100644 --- a/server/internal/utils/utils.go +++ b/server/internal/utils/utils.go @@ -57,6 +57,15 @@ func Retry(maxAttempts int, initialDelay time.Duration, f func() error) error { return nil } +func SleepContext(ctx context.Context, duration time.Duration) error { + select { + case <-time.After(duration): + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + func PointerTo[T any](v T) *T { return &v }