Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 180 additions & 126 deletions cmd/confd/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,105 +376,175 @@ func (i *IMDSCmd) Run(cli *CLI) error {

// run is the shared execution function for all backends
func run(cli *CLI, backendCfg backends.Config) error {
// Load TOML config file if it exists (for defaults)
if err := loadConfigFile(cli, &backendCfg); err != nil {
resolvedBackendCfg, err := buildBackendConfig(cli, backendCfg)
if err != nil {
return err
}
applyBackendDefaults(&backendCfg)

// Process environment variables
configureLogging(cli)

// Check-config mode: validate configuration and exit (no backend needed)
if cli.CheckConfig {
return template.ValidateConfig(cli.ConfDir, cli.Resource)
}

// Validate mode: validate templates and exit (no backend needed)
if cli.Validate {
return template.ValidateTemplates(cli.ConfDir, cli.Resource, cli.MockData)
}

if err := applySRVDiscovery(cli, &resolvedBackendCfg); err != nil {
return err
}

log.Info("Starting confd")
log.Info("Backend set to %s", resolvedBackendCfg.Backend)

template.InitTemplateCache(cli.TemplateCache, cli.StatCacheTTL)

storeClient, err := backends.New(resolvedBackendCfg)
if err != nil {
return err
}

storeClient, metricsServer := startObservability(cli.MetricsAddr, resolvedBackendCfg.Backend, storeClient)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

tmplCfg, err := buildTemplateConfig(cli, storeClient, ctx)
if err != nil {
return err
}

// Preflight mode: run connectivity checks and exit
if cli.Preflight {
return template.Preflight(tmplCfg)
}

// One-time mode
if cli.Onetime {
if err := template.Process(tmplCfg); err != nil {
return err
}
return nil
}

// Continuous mode with processor
channels := newProcessorChannels()

// Create reload manager for SIGHUP handling
reloadMgr := service.NewReloadManager()
reloadChan := reloadMgr.Subscribe()

processor := buildProcessor(cli, tmplCfg, channels, reloadChan)
go processor.Process()

// Create shutdown manager for graceful shutdown coordination
shutdownMgr := service.NewShutdownManager(cli.ShutdownTimeout, metricsServer, storeClient)

// Create systemd notifier and start watchdog if enabled
systemdNotifier := service.NewSystemdNotifier(cli.SystemdNotify, cli.WatchdogInterval)
systemdNotifier.StartWatchdog(ctx)

// Notify systemd that we're ready
if err := systemdNotifier.NotifyReady(); err != nil {
log.Warning("Failed to notify systemd ready: %v", err)
}

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
defer signal.Stop(signalChan)

return superviseRuntime(cancel, channels, signalChan, reloadMgr, shutdownMgr, systemdNotifier)
}

// buildBackendConfig resolves the backend configuration used by run.
//
// It works on its own copy of the backendCfg struct and applies, in order:
//
//  1. loadConfigFile(cli, &backendCfg)
//  2. applyBackendDefaults
//  3. processEnv
//  4. applyConnectionSettings, which copies the CLI timeout and retry values
//
// If loadConfigFile fails, its error is returned unchanged together with a
// zero backends.Config, so callers never receive a half-resolved value. This
// matches buildTemplateConfig, which also returns a zero value on failure.
func buildBackendConfig(cli *CLI, backendCfg backends.Config) (backends.Config, error) {
	if err := loadConfigFile(cli, &backendCfg); err != nil {
		return backends.Config{}, err
	}
	applyBackendDefaults(&backendCfg)
	processEnv(&backendCfg)
	applyConnectionSettings(cli, &backendCfg)
	return backendCfg, nil
}

// applyConnectionSettings copies the CLI timeout and retry settings onto the
// backend config. Every field is assigned unconditionally, so whatever value
// backendCfg held for these fields beforehand is replaced.
func applyConnectionSettings(cli *CLI, backendCfg *backends.Config) {
	// Retry policy.
	backendCfg.RetryMaxAttempts, backendCfg.RetryBaseDelay, backendCfg.RetryMaxDelay =
		cli.RetryMaxAttempts, cli.RetryBaseDelay, cli.RetryMaxDelay

	// Connection timeouts.
	backendCfg.DialTimeout, backendCfg.ReadTimeout, backendCfg.WriteTimeout =
		cli.DialTimeout, cli.ReadTimeout, cli.WriteTimeout
}

// configureLogging applies the log level and log format given on the CLI.
// An empty value leaves the corresponding logger setting untouched.
func configureLogging(cli *CLI) {
	if level := cli.LogLevel; level != "" {
		log.SetLevel(level)
	}
	if format := cli.LogFormat; format != "" {
		log.SetFormat(format)
	}
}

// Check-config mode: validate configuration and exit (no backend needed)
if cli.CheckConfig {
return template.ValidateConfig(cli.ConfDir, cli.Resource)
func applySRVDiscovery(cli *CLI, backendCfg *backends.Config) error {
if cli.SRVDomain != "" && cli.SRVRecord == "" {
cli.SRVRecord = fmt.Sprintf("_%s._tcp.%s.", backendCfg.Backend, cli.SRVDomain)
}

// Validate mode: validate templates and exit (no backend needed)
if cli.Validate {
return template.ValidateTemplates(cli.ConfDir, cli.Resource, cli.MockData)
if backendCfg.Backend == "env" || cli.SRVRecord == "" {
return nil
}

// Handle SRV record discovery
if cli.SRVDomain != "" && cli.SRVRecord == "" {
cli.SRVRecord = fmt.Sprintf("_%s._tcp.%s.", backendCfg.Backend, cli.SRVDomain)
log.Info("SRV record set to %s", cli.SRVRecord)
srvNodes, err := getBackendNodesFromSRV(cli.SRVRecord)
if err != nil {
return fmt.Errorf("cannot get nodes from SRV records: %w", err)
}
if backendCfg.Backend != "env" && cli.SRVRecord != "" {
log.Info("SRV record set to %s", cli.SRVRecord)
srvNodes, err := getBackendNodesFromSRV(cli.SRVRecord)
if err != nil {
return fmt.Errorf("cannot get nodes from SRV records: %w", err)
if backendCfg.Backend == "etcd" {
for i, v := range srvNodes {
srvNodes[i] = backendCfg.Scheme + "://" + v
}
if backendCfg.Backend == "etcd" {
for i, v := range srvNodes {
srvNodes[i] = backendCfg.Scheme + "://" + v
}
}
backendCfg.BackendNodes = srvNodes
}
backendCfg.BackendNodes = srvNodes
return nil
}

log.Info("Starting confd")
log.Info("Backend set to %s", backendCfg.Backend)

// Initialize template cache
template.InitTemplateCache(cli.TemplateCache, cli.StatCacheTTL)

// Create store client
storeClient, err := backends.New(backendCfg)
if err != nil {
return err
func startObservability(addr, backendName string, storeClient backends.StoreClient) (backends.StoreClient, *http.Server) {
if addr == "" {
return storeClient, nil
}

// Start metrics server if configured
var metricsServer *http.Server
if cli.MetricsAddr != "" {
metrics.Initialize()
storeClient = metrics.WrapStoreClient(storeClient, backendCfg.Backend)
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{}))
mux.HandleFunc("/health", metrics.HealthHandler(storeClient))
mux.HandleFunc("/ready", metrics.ReadyHandler(storeClient))
mux.HandleFunc("/ready/detailed", metrics.ReadyDetailedHandler(storeClient))
metricsServer = &http.Server{
Addr: cli.MetricsAddr,
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
metrics.Initialize()
wrappedClient := metrics.WrapStoreClient(storeClient, backendName)
metricsServer := buildMetricsServer(addr, wrappedClient)
go func() {
log.Info("Starting metrics server on %s", addr)
if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Error("Metrics server error: %v", err)
}
go func() {
log.Info("Starting metrics server on %s", cli.MetricsAddr)
if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Error("Metrics server error: %v", err)
}
}()
}
}()
return wrappedClient, metricsServer
}

// Create root context for cancellation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// buildMetricsServer assembles, but does not start, the HTTP server that
// listens on addr and exposes the metrics endpoint plus the health and
// readiness probes backed by storeClient.
func buildMetricsServer(addr string, storeClient backends.StoreClient) *http.Server {
	router := http.NewServeMux()
	router.Handle("/metrics", promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{}))

	// Probe endpoints, registered in the order listed.
	probes := []struct {
		pattern string
		handler func(http.ResponseWriter, *http.Request)
	}{
		{"/health", metrics.HealthHandler(storeClient)},
		{"/ready", metrics.ReadyHandler(storeClient)},
		{"/ready/detailed", metrics.ReadyDetailedHandler(storeClient)},
	}
	for _, probe := range probes {
		router.HandleFunc(probe.pattern, probe.handler)
	}

	srv := new(http.Server)
	srv.Addr = addr
	srv.Handler = router
	// Limit how long a client may take to send its request headers.
	srv.ReadHeaderTimeout = 10 * time.Second
	return srv
}

// Parse failure mode
func buildTemplateConfig(cli *CLI, storeClient backends.StoreClient, ctx context.Context) (template.Config, error) {
failureMode, err := template.ParseFailureMode(cli.FailureMode)
if err != nil {
return err
return template.Config{}, err
}

// Build template config
tmplCfg := template.Config{
ConfDir: cli.ConfDir,
ConfigDir: filepath.Join(cli.ConfDir, "conf.d"),
Expand All @@ -496,88 +566,61 @@ func run(cli *CLI, backendCfg backends.Config) error {
FailureMode: failureMode,
}

// Parse watch mode duration flags
if cli.DebounceStr != "" {
d, err := time.ParseDuration(cli.DebounceStr)
if err != nil {
return fmt.Errorf("invalid debounce duration %q: %w", cli.DebounceStr, err)
return template.Config{}, fmt.Errorf("invalid debounce duration %q: %w", cli.DebounceStr, err)
}
tmplCfg.Debounce = d
}
if cli.BatchIntervalStr != "" {
d, err := time.ParseDuration(cli.BatchIntervalStr)
if err != nil {
return fmt.Errorf("invalid batch-interval duration %q: %w", cli.BatchIntervalStr, err)
return template.Config{}, fmt.Errorf("invalid batch-interval duration %q: %w", cli.BatchIntervalStr, err)
}
tmplCfg.BatchInterval = d
}

// Preflight mode: run connectivity checks and exit
if cli.Preflight {
return template.Preflight(tmplCfg)
}

// One-time mode
if cli.Onetime {
if err := template.Process(tmplCfg); err != nil {
return err
}
return nil
}
return tmplCfg, nil
}

// Continuous mode with processor
stopChan := make(chan bool)
doneChan := make(chan bool)
errChan := make(chan error, 10)
// processorChannels groups the channels shared between a template processor
// and the loop that supervises it.
type processorChannels struct {
	// stop is closed by the supervisor to ask the processor to stop; done is
	// what the supervisor waits on before shutting down.
	stop, done chan bool
	// err carries processor errors, which the supervisor logs.
	err chan error
}

// Create reload manager for SIGHUP handling
reloadMgr := service.NewReloadManager()
reloadChan := reloadMgr.Subscribe()
// newProcessorChannels allocates a fresh set of processor channels: stop and
// done are unbuffered, while err can hold up to 10 pending errors.
func newProcessorChannels() processorChannels {
	var channels processorChannels
	channels.stop = make(chan bool)
	channels.done = make(chan bool)
	channels.err = make(chan error, 10)
	return channels
}

var processor template.Processor
// buildProcessor selects the template processor for continuous mode:
//
//   - cli.Watch with tmplCfg.BatchInterval > 0: template.BatchWatchProcessor
//   - cli.Watch otherwise: template.WatchProcessor
//   - no cli.Watch: template.IntervalProcessor, driven by cli.Interval
//
// Every processor is wired to the same stop/done/err channels and to
// reloadChan. The processor is only constructed here; starting it is left to
// the caller.
func buildProcessor(cli *CLI, tmplCfg template.Config, channels processorChannels, reloadChan <-chan struct{}) template.Processor {
	if cli.Watch {
		if tmplCfg.BatchInterval > 0 {
			// Use batch processor when --batch-interval is specified
			log.Info("Batch processing enabled with interval %v", tmplCfg.BatchInterval)
			return template.BatchWatchProcessor(tmplCfg, channels.stop, channels.done, channels.err, reloadChan)
		}
		return template.WatchProcessor(tmplCfg, channels.stop, channels.done, channels.err, reloadChan)
	}
	return template.IntervalProcessor(tmplCfg, channels.stop, channels.done, channels.err, cli.Interval, reloadChan)
}

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
func superviseRuntime(
cancel context.CancelFunc,
channels processorChannels,
signalChan <-chan os.Signal,
reloadMgr *service.ReloadManager,
shutdownMgr *service.ShutdownManager,
systemdNotifier *service.SystemdNotifier,
) error {
var stopOnce sync.Once
for {
select {
case err := <-errChan:
case err := <-channels.err:
log.Error("%s", err.Error())
case s := <-signalChan:
switch s {
Expand All @@ -599,16 +642,27 @@ func run(cli *CLI, backendCfg backends.Config) error {
log.Warning("Failed to notify systemd stopping: %v", err)
}
cancel() // Cancel context to signal all goroutines
stopOnce.Do(func() { close(stopChan) })
<-doneChan
return shutdown()
stopOnce.Do(func() { close(channels.stop) })
<-channels.done
return shutdownRuntime(shutdownMgr)
}
case <-doneChan:
return shutdown()
case <-channels.done:
return shutdownRuntime(shutdownMgr)
}
}
}

// shutdownRuntime runs the shutdown manager and then closes the cached
// per-resource backend clients.
//
// A shutdown manager failure is logged and returned immediately; the cached
// clients are not closed in that case. A failure while closing the cached
// clients is only logged as a warning and does not affect the result.
func shutdownRuntime(shutdownMgr *service.ShutdownManager) error {
	shutdownErr := shutdownMgr.Shutdown(context.Background())
	if shutdownErr != nil {
		log.Error("Shutdown error: %v", shutdownErr)
		return shutdownErr
	}

	closeErr := template.CloseAllCachedClients()
	if closeErr != nil {
		log.Warning("Error closing per-resource backend clients: %v", closeErr)
	}
	return nil
}

func applyBackendDefaults(cfg *backends.Config) {
if len(cfg.BackendNodes) > 0 {
return
Expand Down
Loading
Loading