diff --git a/api/cluster_manager.go b/api/cluster_manager.go
index 236290fdb..291128425 100644
--- a/api/cluster_manager.go
+++ b/api/cluster_manager.go
@@ -49,7 +49,12 @@ func clusterManagerGet(state types.State, r *http.Request) types.Response {
 		return types.SmartError(err)
 	}
 
-	clusterManager, clusterManagerConfig, err := database.LoadClusterManager(state, r.Context(), name)
+	clusterManager, err := database.LoadClusterManager(state, r.Context(), name)
+	if err != nil {
+		return types.SmartError(err)
+	}
+
+	clusterManagerConfig, err := database.LoadClusterManagerConfigs(state, r.Context(), clusterManager.ID)
 	if err != nil {
 		return types.SmartError(err)
 	}
@@ -97,7 +102,7 @@ func clusterManagerPost(sh *service.Handler) func(state types.State, r *http.Req
 		}
 
 		// ensure cluster manager is not already configured
-		existingClusterManager, _, err := database.LoadClusterManager(state, r.Context(), args.Name)
+		existingClusterManager, err := database.LoadClusterManager(state, r.Context(), args.Name)
 		if err != nil {
 			if api.StatusErrorCheck(err, http.StatusNotFound) {
 				// ignore, this is the expected path
@@ -163,7 +168,7 @@ func clusterManagerPut(state types.State, r *http.Request) types.Response {
 		return types.SmartError(err)
 	}
 
-	clusterManager, _, err := database.LoadClusterManager(state, r.Context(), name)
+	clusterManager, err := database.LoadClusterManager(state, r.Context(), name)
 	if err != nil {
 		return types.SmartError(err)
 	}
@@ -191,13 +196,21 @@ func clusterManagerPut(state types.State, r *http.Request) types.Response {
 		}
 	}
 
-	if args.UpdateInterval != nil {
-		err = database.StoreClusterManagerConfig(state, r.Context(), name, database.UpdateIntervalSecondsKey, *args.UpdateInterval)
+	if args.UpdateIntervalSeconds != nil {
+		err = database.StoreClusterManagerConfig(state, r.Context(), name, database.UpdateIntervalSecondsKey, *args.UpdateIntervalSeconds)
 		if err != nil {
 			return types.SmartError(err)
 		}
 	}
 
+	if args.ReverseTunnel != nil {
+		reverseTunnelValue := strconv.FormatBool(*args.ReverseTunnel)
+		err = database.StoreClusterManagerConfig(state, r.Context(), name, database.ReverseTunnelKey, reverseTunnelValue)
+		if err != nil {
+			return types.SmartError(err)
+		}
+	}
+
 	return types.SyncResponse(true, nil)
 }
 
@@ -215,7 +228,7 @@ func clusterManagerDelete(sh *service.Handler) func(state types.State, r *http.R
 			return types.SmartError(err)
 		}
 
-		clusterManager, _, err := database.LoadClusterManager(state, r.Context(), name)
+		clusterManager, err := database.LoadClusterManager(state, r.Context(), name)
 		if err != nil {
 			return types.SmartError(err)
 		}
diff --git a/api/types/cluster_manager.go b/api/types/cluster_manager.go
index 53d6592ec..05e605ee8 100644
--- a/api/types/cluster_manager.go
+++ b/api/types/cluster_manager.go
@@ -1,7 +1,11 @@
 package types
 
 import (
+	"net/http"
+	"sync"
 	"time"
+
+	"github.com/gorilla/websocket"
 )
 
 // ClusterManagersPost represents the cluster manager configuration when receiving a POST request in MicroCloud.
@@ -54,7 +58,11 @@ type ClusterManagerPut struct {
 
 	// Interval in seconds to send status messages to the cluster manager
 	// Example: 60
-	UpdateInterval *string `json:"update_interval" yaml:"update_interval"`
+	UpdateIntervalSeconds *string `json:"update_interval_seconds" yaml:"update_interval_seconds"`
+
+	// Enables or disables the reverse tunnel to the cluster manager
+	// Example: true, false
+	ReverseTunnel *bool `json:"reverse_tunnel" yaml:"reverse_tunnel"`
 }
 
 // StatusDistribution represents the distribution of items.
@@ -100,3 +108,28 @@ type ClusterManagerJoin struct {
 	ClusterCertificate string `json:"cluster_certificate" yaml:"cluster_certificate"`
 	Token              string `json:"token" yaml:"token"`
 }
+
+// ClusterManagerTunnel represents the tunnel connection to the cluster manager.
+type ClusterManagerTunnel struct {
+	Mu     sync.RWMutex
+	WsConn *websocket.Conn
+	Wg     sync.WaitGroup
+}
+
+// ClusterManagerTunnelRequest represents the request received through the tunnel.
+type ClusterManagerTunnelRequest struct {
+	UUID    string      `json:"uuid"`
+	Method  string      `json:"method"`
+	Path    string      `json:"path"`
+	Headers http.Header `json:"headers"`
+	Body    []byte      `json:"body"`
+}
+
+// ClusterManagerTunnelResponse represents the response sent through the tunnel.
+type ClusterManagerTunnelResponse struct {
+	UUID    string         `json:"uuid"`
+	Status  int            `json:"status"`
+	Headers http.Header    `json:"headers"`
+	Cookies []*http.Cookie `json:"cookies"`
+	Body    []byte         `json:"body"`
+}
diff --git a/client/cluster_manager_client.go b/client/cluster_manager_client.go
index 183ac3a59..febfafd95 100644
--- a/client/cluster_manager_client.go
+++ b/client/cluster_manager_client.go
@@ -10,10 +10,13 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"net/url"
 	"strings"
+	"time"
 
 	"github.com/canonical/lxd/shared"
 	"github.com/canonical/lxd/shared/version"
+	"github.com/gorilla/websocket"
 
 	"github.com/canonical/microcloud/microcloud/api/types"
 	"github.com/canonical/microcloud/microcloud/database"
@@ -84,6 +87,29 @@ func (c *ClusterManagerClient) Delete(clusterCert *shared.CertInfo) error {
 	return err
 }
 
+// ConnectTunnel establishes a WebSocket connection to the cluster manager for reverse tunneling.
+func (c *ClusterManagerClient) ConnectTunnel(clusterCert *shared.CertInfo) (*websocket.Conn, error) {
+	tlsConfig, address, err := c.getTlsConfig(clusterCert)
+	if err != nil {
+		return nil, fmt.Errorf("Failed to get TLS config: %w", err)
+	}
+
+	// A zero-value Dialer has no handshake timeout, so a peer that accepts the TCP
+	// connection but never answers the upgrade request would block this call forever.
+	dialer := websocket.Dialer{
+		TLSClientConfig:  tlsConfig,
+		HandshakeTimeout: 45 * time.Second,
+	}
+
+	u := url.URL{Scheme: "wss", Host: address, Path: "/1.0/remote-cluster/ws"}
+	conn, _, err := dialer.Dial(u.String(), nil)
+	if err != nil {
+		return nil, fmt.Errorf("Failed to dial cluster manager tunnel: %w", err)
+	}
+
+	return conn, nil
+}
+
 func (c *ClusterManagerClient) craftRequest(method string, path string, reqBody io.Reader) (*http.Request, error) {
 	url := "https://remote" + path // remote is a placeholder, real address will be set in sendRequest
 	req, err := http.NewRequest(method, url, reqBody)
@@ -122,7 +148,19 @@ func (c *ClusterManagerClient) sendRequest(clusterCert *shared.CertInfo, req *ht
 
 func (c *ClusterManagerClient) getHTTPClient(clusterCert *shared.CertInfo) (*http.Client, string, error) {
 	client := &http.Client{}
+	tlsConfig, address, err := c.getTlsConfig(clusterCert)
+	if err != nil {
+		return nil, "", fmt.Errorf("Failed to get TLS config: %w", err)
+	}
 
+	client.Transport = &http.Transport{
+		TLSClientConfig: tlsConfig,
+	}
+
+	return client, address, nil
+}
+
+func (c *ClusterManagerClient) getTlsConfig(clusterCert *shared.CertInfo) (*tls.Config, string, error) {
 	var address string
 	var remoteCert *x509.Certificate
 	var err error
@@ -171,9 +209,5 @@ func (c *ClusterManagerClient) getHTTPClient(clusterCert *shared.CertInfo) (*htt
 		return &cert, nil
 	}
 
-	client.Transport = &http.Transport{
-		TLSClientConfig: tlsConfig,
-	}
-
-	return client, address, nil
+	return tlsConfig, address, nil
 }
diff --git a/cmd/microcloud/cluster_manager.go b/cmd/microcloud/cluster_manager.go
index 78544708f..0acf57181 100644
--- a/cmd/microcloud/cluster_manager.go
+++ b/cmd/microcloud/cluster_manager.go
@@ -201,11 +201,11 @@ func (c *cmdClusterManagerGet) command() *cobra.Command {
 	cmd.Use = "get"
 	cmd.Short = "Get specific cluster manager configuration by key."
 	cmd.Example = cli.FormatSection("", `microcloud cluster-manager get addresses
-microcloud cluster-manager get certificate-fingerprint
-microcloud cluster-manager get update-interval-seconds
-microcloud cluster-manager get status-last-success-time
-microcloud cluster-manager get status-last-error-time
-microcloud cluster-manager get status-last-error-response`)
+microcloud cluster-manager get certificate_fingerprint
+microcloud cluster-manager get update_interval_seconds
+microcloud cluster-manager get status_last_success_time
+microcloud cluster-manager get status_last_error_time
+microcloud cluster-manager get status_last_error_response`)
 
 	cmd.RunE = c.run
 
@@ -233,20 +233,26 @@ func (c *cmdClusterManagerGet) run(_ *cobra.Command, args []string) error {
 	switch key {
 	case "addresses":
 		fmt.Printf("%s\n", strings.Join(clusterManager.Addresses, ", "))
-	case "certificate-fingerprint":
+	case "certificate_fingerprint":
 		fmt.Printf("%s\n", clusterManager.CertificateFingerprint)
-	case "update-interval-seconds":
+	case "update_interval_seconds":
 		value, ok := clusterManager.Config[database.UpdateIntervalSecondsKey]
 		if ok {
 			fmt.Printf("%s\n", value)
 		}
 
-	case "status-last-success-time":
+	case "status_last_success_time":
 		fmt.Printf("%s\n", clusterManager.StatusLastSuccessTime)
-	case "status-last-error-time":
+	case "status_last_error_time":
 		fmt.Printf("%s\n", clusterManager.StatusLastErrorTime)
-	case "status-last-error-response":
+	case "status_last_error_response":
 		fmt.Printf("%s\n", clusterManager.StatusLastErrorResponse)
+	case "reverse_tunnel":
+		value, ok := clusterManager.Config[database.ReverseTunnelKey]
+		if ok {
+			fmt.Printf("%s\n", value)
+		}
+
 	default:
 		return errors.New("Invalid key")
 	}
@@ -265,8 +271,9 @@ func (c *cmdClusterManagerSet) command() *cobra.Command {
 	cmd.Use = "set"
 	cmd.Short = "Set specific cluster manager configuration key."
cmd.Example = cli.FormatSection("", `microcloud cluster-manager set addresses example.com:8443 -microcloud cluster-manager set certificate-fingerprint abababababababababababababababababababababababababababababababab -microcloud cluster-manager set update-interval-seconds 50`) +microcloud cluster-manager set certificate_fingerprint abababababababababababababababababababababababababababababababab +microcloud cluster-manager set update_interval_seconds 50 +microcloud cluster-manager set reverse_tunnel true`) cmd.RunE = c.run @@ -291,10 +298,16 @@ func (c *cmdClusterManagerSet) run(_ *cobra.Command, args []string) error { switch key { case "addresses": payload.Addresses = []string{value} - case "certificate-fingerprint": + case "certificate_fingerprint": payload.CertificateFingerprint = &value - case "update-interval-seconds": - payload.UpdateInterval = &value + case "update_interval_seconds": + payload.UpdateIntervalSeconds = &value + case "reverse_tunnel": + if value != "true" && value != "false" { + return errors.New("Invalid value for reverse_tunnel, expected 'true' or 'false'") + } + + payload.ReverseTunnel = new(value == "true") default: return errors.New("Invalid key") } @@ -317,7 +330,7 @@ func (c *cmdClusterManagerUnset) command() *cobra.Command { cmd := &cobra.Command{} cmd.Use = "unset" cmd.Short = "Unset specific cluster manager configuration key." 
-	cmd.Example = cli.FormatSection("", `microcloud cluster-manager unset update-interval-seconds`)
+	cmd.Example = cli.FormatSection("", `microcloud cluster-manager unset update_interval_seconds`)
 
 	cmd.RunE = c.run
 
@@ -339,9 +352,10 @@ func (c *cmdClusterManagerUnset) run(_ *cobra.Command, args []string) error {
 	payload := types.ClusterManagerPut{}
 
 	switch key {
-	case "update-interval-seconds":
-		emptyString := ""
-		payload.UpdateInterval = &emptyString
+	case "update_interval_seconds":
+		payload.UpdateIntervalSeconds = new("")
+	case "reverse_tunnel":
+		payload.ReverseTunnel = new(false)
 	default:
 		return errors.New("Invalid key")
 	}
diff --git a/cmd/microcloudd/cluster_manager_task.go b/cmd/microcloudd/cluster_manager_task.go
index feb03065b..a1e3929b9 100644
--- a/cmd/microcloudd/cluster_manager_task.go
+++ b/cmd/microcloudd/cluster_manager_task.go
@@ -11,6 +11,7 @@ import (
 	"github.com/canonical/lxd/shared/api"
 	"github.com/canonical/lxd/shared/logger"
 	microTypes "github.com/canonical/microcluster/v3/microcluster/types"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/canonical/microcloud/microcloud/api/types"
 	"github.com/canonical/microcloud/microcloud/client"
@@ -20,7 +21,8 @@ import (
 
 // SendClusterManagerStatusMessageTask starts a go routine, that sends periodic status messages to cluster manager.
 func SendClusterManagerStatusMessageTask(ctx context.Context, sh *service.Handler, s microTypes.State) {
-	go func(ctx context.Context, sh *service.Handler, s microTypes.State) {
+	g, _ := errgroup.WithContext(ctx)
+	g.Go(func() error {
 		ticker := time.NewTicker(database.UpdateIntervalDefaultSeconds * time.Second)
 		defer ticker.Stop()
 
@@ -33,15 +35,22 @@ func SendClusterManagerStatusMessageTask(ctx context.Context, sh *service.Handle
 				}
 
 			case <-ctx.Done():
-				return // exit the loop and close the go routine
+				return nil // exit the loop and close the go routine
 			}
 		}
-	}(ctx, sh, s)
+	})
+
+	go func() {
+		err := g.Wait()
+		if err != nil {
+			logger.Error("Failed to wait for send cluster manager status routine", logger.Ctx{"err": err})
+		}
+	}()
 }
 
 func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s microTypes.State) time.Duration {
 	logger.Debug("Starting sendClusterManagerStatusMessage")
-	var nextUpdate time.Duration = 0
+	var nextUpdate = time.Duration(database.UpdateIntervalDefaultSeconds) * time.Second
 
 	cloud := sh.Services[types.MicroCloud].(*service.CloudService)
 	isInitialized, err := cloud.IsInitialized(ctx)
@@ -55,7 +64,7 @@ func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s
 		return nextUpdate
 	}
 
-	clusterManager, clusterManagerConfig, err := database.LoadClusterManager(s, ctx, database.ClusterManagerDefaultName)
+	clusterManager, err := database.LoadClusterManager(s, ctx, database.ClusterManagerDefaultName)
 	if err != nil {
 		if err.Error() == "Cluster manager not found" {
 			logger.Debug("Cluster manager not configured, skipping status message")
@@ -66,17 +75,10 @@ func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s
 		return nextUpdate
 	}
 
-	for _, config := range clusterManagerConfig {
-		if config.Key == database.UpdateIntervalSecondsKey {
-			interval, err := time.ParseDuration(config.Value + "s")
-			if err != nil {
-				logger.Error("Failed to parse update interval", logger.Ctx{"err": err})
-				return nextUpdate
-			}
-
-			nextUpdate = interval
-			break
-		}
+	nextUpdate, err = database.LoadClusterManagerUpdateIntervalSeconds(s, ctx, clusterManager.ID)
+	if err != nil {
+		logger.Error("Failed to fetch cluster manager update interval", logger.Ctx{"err": err})
+		return nextUpdate
 	}
 
 	leaderClient, err := s.Database().Leader(ctx)
@@ -99,7 +101,7 @@ func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s
 	payload := types.ClusterManagerPostStatus{}
 
 	lxdService := sh.Services[types.LXD].(*service.LXDService)
-	lxdClient, err := lxdService.Client(context.Background())
+	lxdClient, err := lxdService.Client(ctx)
 	if err != nil {
 		logger.Error("Failed to get LXD client", logger.Ctx{"err": err})
 		return nextUpdate
diff --git a/cmd/microcloudd/cluster_manager_tunnel.go b/cmd/microcloudd/cluster_manager_tunnel.go
new file mode 100644
index 000000000..79b7f7656
--- /dev/null
+++ b/cmd/microcloudd/cluster_manager_tunnel.go
@@ -0,0 +1,333 @@
+package main
+
+import (
+	"bytes"
+	"context"
+	"crypto/x509"
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+	"time"
+
+	"github.com/canonical/lxd/shared"
+	"github.com/canonical/lxd/shared/api"
+	"github.com/canonical/lxd/shared/logger"
+	"github.com/canonical/lxd/shared/version"
+	microTypes "github.com/canonical/microcluster/v3/microcluster/types"
+	"golang.org/x/sync/errgroup"
+
+	"github.com/canonical/microcloud/microcloud/api/types"
+	"github.com/canonical/microcloud/microcloud/client"
+	"github.com/canonical/microcloud/microcloud/database"
+	"github.com/canonical/microcloud/microcloud/service"
+)
+
+// TunnelCheckIntervalSeconds is the number of seconds between two tunnel reconciliation passes.
+const TunnelCheckIntervalSeconds = 10
+
+// ReconcileClusterManagerTunnel starts a go routine, that ensures the tunnel to cluster manager is in the right state.
+func ReconcileClusterManagerTunnel(ctx context.Context, sh *service.Handler, s microTypes.State) {
+	g, _ := errgroup.WithContext(ctx)
+	g.Go(func() error {
+		// The zero value is ready to use: no connection yet, unlocked mutex, empty wait group.
+		tunnel := &types.ClusterManagerTunnel{}
+
+		ticker := time.NewTicker(TunnelCheckIntervalSeconds * time.Second)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ticker.C:
+				ensureTunnel(ctx, sh, s, tunnel)
+
+			case <-ctx.Done():
+				// Close the connection, then wait for the inner routine to finish if it's running.
+				ensureTunnelClosed(tunnel)
+				tunnel.Wg.Wait()
+
+				// exit the loop and close the go routine
+				return nil
+			}
+		}
+	})
+
+	go func() {
+		err := g.Wait()
+		if err != nil {
+			logger.Error("Failed to wait for reconcile cluster manager tunnel routine", logger.Ctx{"err": err})
+		}
+	}()
+}
+
+// ensureTunnel runs a single reconciliation pass: the tunnel is opened when this member is the
+// database leader and the reverse tunnel is enabled, and closed when it should not be running.
+func ensureTunnel(ctx context.Context, sh *service.Handler, s microTypes.State, tunnel *types.ClusterManagerTunnel) {
+	logger.Debug("Starting ensureTunnel")
+
+	cloud := sh.Services[types.MicroCloud].(*service.CloudService)
+	isInitialized, err := cloud.IsInitialized(ctx)
+	if err != nil {
+		logger.Error("Failed to check if MicroCloud is initialized", logger.Ctx{"err": err})
+		return
+	}
+
+	if !isInitialized {
+		logger.Debug("MicroCloud not initialized")
+		return
+	}
+
+	leaderClient, err := s.Database().Leader(ctx)
+	if err != nil {
+		logger.Error("Failed to get database leader client", logger.Ctx{"err": err})
+		return
+	}
+
+	leaderInfo, err := leaderClient.Leader(ctx)
+	if err != nil {
+		logger.Error("Failed to get database leader info", logger.Ctx{"err": err})
+		return
+	}
+
+	// Only the database leader keeps a tunnel open.
+	if leaderInfo.Address != s.Address().Host {
+		ensureTunnelClosed(tunnel)
+		return
+	}
+
+	clusterManager, err := database.LoadClusterManager(s, ctx, database.ClusterManagerDefaultName)
+	if err != nil {
+		// An unconfigured (or deleted) cluster manager is an expected state, not an error,
+		// and a tunnel must not outlive its cluster manager entry.
+		if api.StatusErrorCheck(err, http.StatusNotFound) {
+			logger.Debug("Cluster manager not configured, closing tunnel if any")
+			ensureTunnelClosed(tunnel)
+			return
+		}
+
+		logger.Error("Failed to load cluster manager config", logger.Ctx{"err": err})
+		return
+	}
+
+	needsTunnel, err := database.LoadClusterManagerReverseTunnel(s, ctx, clusterManager.ID)
+	if err != nil {
+		logger.Error("Failed to load reverse tunnel", logger.Ctx{"err": err})
+		return
+	}
+
+	tunnel.Mu.RLock()
+	hasConnection := tunnel.WsConn != nil
+	tunnel.Mu.RUnlock()
+
+	switch {
+	case needsTunnel && hasConnection:
+		logger.Debug("Tunnel already connected")
+
+	case needsTunnel:
+		clusterCert, err := cloud.ClusterCert()
+		if err != nil {
+			logger.Error("Failed to get cluster certificate", logger.Ctx{"err": err})
+			return
+		}
+
+		logger.Debug("Tunnel not connected, opening")
+		clusterManagerClient := client.NewClusterManagerClient(clusterManager)
+		tunnel.Wg.Add(1)
+		go func() {
+			defer tunnel.Wg.Done()
+			openTunnel(ctx, sh, tunnel, clusterManagerClient, clusterCert)
+		}()
+
+	case hasConnection:
+		logger.Debug("Tunnel connected but should be disabled, closing")
+		ensureTunnelClosed(tunnel)
+
+	default:
+		logger.Debug("Tunnel disabled, finished ensure tunnel")
+	}
+}
+
+// openTunnel dials the cluster manager, registers the connection on tunnel and serves tunnel
+// requests until the connection fails or is closed. It blocks for the lifetime of the connection.
+func openTunnel(ctx context.Context, sh *service.Handler, tunnel *types.ClusterManagerTunnel, clusterManagerClient *client.ClusterManagerClient, clusterCert *shared.CertInfo) {
+	conn, err := clusterManagerClient.ConnectTunnel(clusterCert)
+	if err != nil {
+		logger.Error("Failed to connect cluster manager tunnel", logger.Ctx{"err": err})
+		return
+	}
+
+	// Release the connection exactly once on exit. If ensureTunnelClosed already closed it (and
+	// cleared or replaced tunnel.WsConn), closing again would only log a spurious error.
+	registered := false
+	defer func() {
+		tunnel.Mu.Lock()
+		closedElsewhere := registered && tunnel.WsConn != conn
+		if tunnel.WsConn == conn {
+			tunnel.WsConn = nil
+		}
+
+		tunnel.Mu.Unlock()
+
+		if closedElsewhere {
+			return
+		}
+
+		err := conn.Close()
+		if err != nil {
+			logger.Error("Failed to close cluster manager tunnel", logger.Ctx{"err": err})
+		}
+	}()
+
+	// Get the server certificate
+	lxdService := sh.Services[types.LXD].(*service.LXDService)
+	lxdClient, err := lxdService.Client(ctx)
+	if err != nil {
+		logger.Error("Failed to connect to LXD service", logger.Ctx{"err": err})
+		return
+	}
+
+	server, _, err := lxdClient.GetServer()
+	if err != nil {
+		logger.Error("Failed to get LXD server info", logger.Ctx{"err": err})
+		return
+	}
+
+	lxdServerCert := server.Environment.Certificate
+	lxdHttpsAddress := fmt.Sprint(server.Config["core.https_address"])
+
+	tunnel.Mu.Lock()
+	if tunnel.WsConn != nil {
+		// A dial started by an earlier reconciliation pass finished first; keep that one.
+		tunnel.Mu.Unlock()
+		logger.Debug("Tunnel already connected, dropping duplicate connection")
+		return
+	}
+
+	tunnel.WsConn = conn
+	registered = true
+	tunnel.Mu.Unlock()
+
+	// Shutdown may have started while dialing: ensureTunnelClosed then ran before the connection
+	// was registered, so nothing else would close it and the ReadJSON below could block forever.
+	if ctx.Err() != nil {
+		return
+	}
+
+	logger.Debug("Tunnel with cluster manager connected")
+
+	for {
+		var req types.ClusterManagerTunnelRequest
+		err = conn.ReadJSON(&req)
+		if err != nil {
+			logger.Error("Cluster manager tunnel read error:", logger.Ctx{"err": err})
+			return
+		}
+
+		logger.Debug("Cluster manager tunnel request received:", logger.Ctx{"path": req.Path})
+		resp := handleTunnelRequest(req, lxdServerCert, lxdHttpsAddress)
+
+		// Send back the response
+		err = conn.WriteJSON(resp)
+		if err != nil {
+			logger.Error("Cluster manager tunnel write error:", logger.Ctx{"err": err})
+			return
+		}
+	}
+}
+
+// handleTunnelRequest replays a request received through the tunnel against the local LXD API
+// and returns LXD's answer. Failures are reported through the Status field of the response.
+func handleTunnelRequest(req types.ClusterManagerTunnelRequest, lxdServerCert string, lxdHttpsAddress string) types.ClusterManagerTunnelResponse {
+	if !strings.HasPrefix(req.Path, "/1.0/") {
+		logger.Warn("Received tunnel request with invalid path prefix", logger.Ctx{"path": req.Path})
+		return types.ClusterManagerTunnelResponse{UUID: req.UUID, Status: http.StatusBadRequest}
+	}
+
+	if req.Method != http.MethodGet && req.Method != http.MethodPost && req.Method != http.MethodPut && req.Method != http.MethodDelete {
+		logger.Warn("Received tunnel request with unsupported HTTP method", logger.Ctx{"method": req.Method})
+		return types.ClusterManagerTunnelResponse{UUID: req.UUID, Status: http.StatusMethodNotAllowed}
+	}
+
+	if lxdHttpsAddress == "[::]:8443" || lxdHttpsAddress == ":8443" {
+		lxdHttpsAddress = "127.0.0.1:8443"
+	}
+
+	targetURL := "https://" + lxdHttpsAddress + req.Path
+	newReq, err := http.NewRequest(req.Method, targetURL, bytes.NewReader(req.Body))
+	if err != nil {
+		logger.Error("Failed to create new HTTP request for tunnel", logger.Ctx{"err": err})
+		return types.ClusterManagerTunnelResponse{UUID: req.UUID, Status: http.StatusInternalServerError}
+	}
+
+	// Forward credentials only when the caller sent them; setting them unconditionally would
+	// send the header with an empty value.
+	for _, name := range []string{"Cookie", "Authorization"} {
+		value := req.Headers.Get(name)
+		if value != "" {
+			newReq.Header.Set(name, value)
+		}
+	}
+
+	newReq.Header.Set("User-Agent", version.UserAgent+" (cookiejar)")
+
+	tlsConfig := shared.InitTLSConfig()
+	tlsConfig.RootCAs = x509.NewCertPool()
+	ok := tlsConfig.RootCAs.AppendCertsFromPEM([]byte(lxdServerCert))
+	if !ok {
+		logger.Error("Failed to parse LXD server certificate")
+		return types.ClusterManagerTunnelResponse{UUID: req.UUID, Status: http.StatusUnauthorized}
+	}
+
+	// The transport is used for this single request only, so keep-alives are disabled: its idle
+	// connection would otherwise stay open after the transport is dropped.
+	lxdHttpClient := &http.Client{
+		Transport: &http.Transport{
+			TLSClientConfig:   tlsConfig,
+			DisableKeepAlives: true,
+		},
+		Timeout: 30 * time.Second,
+	}
+
+	lxdResponse, err := lxdHttpClient.Do(newReq)
+	if err != nil {
+		logger.Error("Error from LXD client query", logger.Ctx{"err": err, "path": req.Path, "method": req.Method})
+		return types.ClusterManagerTunnelResponse{UUID: req.UUID, Status: http.StatusInternalServerError}
+	}
+
+	defer func() {
+		err := lxdResponse.Body.Close()
+		if err != nil {
+			logger.Error("Failed to close LXD response body", logger.Ctx{"err": err})
+		}
+	}()
+
+	responseBody, err := io.ReadAll(lxdResponse.Body)
+	if err != nil {
+		logger.Error("Failed to read LXD response body", logger.Ctx{"err": err})
+		return types.ClusterManagerTunnelResponse{UUID: req.UUID, Status: http.StatusInternalServerError}
+	}
+
+	return types.ClusterManagerTunnelResponse{
+		UUID:    req.UUID,
+		Status:  lxdResponse.StatusCode,
+		Body:    responseBody,
+		Cookies: lxdResponse.Cookies(),
+		Headers: lxdResponse.Header,
+	}
+}
+
+// ensureTunnelClosed closes the registered websocket connection, if any, and clears it from tunnel.
+func ensureTunnelClosed(tunnel *types.ClusterManagerTunnel) {
+	tunnel.Mu.Lock()
+	defer tunnel.Mu.Unlock()
+	if tunnel.WsConn == nil {
+		return
+	}
+
+	logger.Debug("Closing cluster manager tunnel")
+	err := tunnel.WsConn.Close()
+	tunnel.WsConn = nil
+	if err != nil {
+		logger.Error("Failed to close cluster manager tunnel", logger.Ctx{"err": err})
+	}
+}
diff --git a/cmd/microcloudd/main.go b/cmd/microcloudd/main.go
index dff579afa..8a94539dd 100644
--- a/cmd/microcloudd/main.go
+++ b/cmd/microcloudd/main.go
@@ -184,6 +184,7 @@ func (c *cmdDaemon) run(cmd *cobra.Command, args []string) error {
 			return setHandlerAddress(state.Address().Host)
 		},
 		OnStart: func(ctx context.Context, state microTypes.State) error {
+			ReconcileClusterManagerTunnel(ctx, s, state)
 			SendClusterManagerStatusMessageTask(ctx, s, state)
 
 			// If we are already initialized, there's nothing to do.
diff --git a/database/cluster_manager_config.go b/database/cluster_manager_config.go
index 34b26278b..ebe46928c 100644
--- a/database/cluster_manager_config.go
+++ b/database/cluster_manager_config.go
@@ -3,7 +3,9 @@ package database
 import (
 	"context"
 	"database/sql"
+	"errors"
 	"net/http"
+	"strconv"
 	"time"
 
 	"github.com/canonical/lxd/shared/api"
@@ -14,31 +16,106 @@ import (
 const ClusterManagerDefaultName = "default"
 
 // UpdateIntervalSecondsKey is the key for the update interval configuration.
-const UpdateIntervalSecondsKey = "update-interval-seconds"
+const UpdateIntervalSecondsKey = "update_interval_seconds"
+
+// ReverseTunnelKey is the key for enabling or disabling the websocket in configuration.
+const ReverseTunnelKey = "reverse_tunnel"
 
 // UpdateIntervalDefaultSeconds is the interval for the status update task if none is defined in the database.
 const UpdateIntervalDefaultSeconds = 60
 
 // LoadClusterManager loads the cluster manager configuration from the database.
-func LoadClusterManager(state types.State, ctx context.Context, name string) (*ClusterManager, []ClusterManagerConfig, error) {
+func LoadClusterManager(state types.State, ctx context.Context, name string) (*ClusterManager, error) {
 	clusterManager, err := loadClusterManagerFromDb(ctx, state, name)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
+	}
+
+	return clusterManager, nil
+}
+
+// LoadClusterManagerConfigs loads all cluster manager configurations from the database.
+func LoadClusterManagerConfigs(state types.State, ctx context.Context, clusterManagerId int64) ([]ClusterManagerConfig, error) { + var clusterManagerConfig []ClusterManagerConfig + var err error + err = state.Database().Transaction(ctx, func(ctx context.Context, tx *sql.Tx) error { + clusterManagerConfig, err = GetClusterManagerConfig(ctx, tx, ClusterManagerConfigFilter{ + ClusterManagerID: &clusterManagerId, + }) + + return err + }) + if err != nil { + return nil, err } + return clusterManagerConfig, nil +} + +// LoadClusterManagerSingleConfig loads a single cluster manager configuration by key from the database. +func LoadClusterManagerSingleConfig(state types.State, ctx context.Context, clusterManagerId int64, configKey string) (*ClusterManagerConfig, error) { var clusterManagerConfig []ClusterManagerConfig + var err error err = state.Database().Transaction(ctx, func(ctx context.Context, tx *sql.Tx) error { clusterManagerConfig, err = GetClusterManagerConfig(ctx, tx, ClusterManagerConfigFilter{ - ClusterManagerID: &clusterManager.ID, + ClusterManagerID: &clusterManagerId, + Key: &configKey, }) return err }) if err != nil { - return nil, nil, err + return nil, err + } + + if len(clusterManagerConfig) == 0 { + return nil, nil } - return clusterManager, clusterManagerConfig, nil + if len(clusterManagerConfig) > 1 { + return nil, errors.New("Multiple cluster manager configs found") + } + + return &clusterManagerConfig[0], nil +} + +// LoadClusterManagerUpdateIntervalSeconds loads the cluster manager update interval configuration from the database. 
+func LoadClusterManagerUpdateIntervalSeconds(state types.State, ctx context.Context, clusterManagerId int64) (time.Duration, error) { + defaultInterval := time.Duration(UpdateIntervalDefaultSeconds) * time.Second + updateIntervalConfig, err := LoadClusterManagerSingleConfig(state, ctx, clusterManagerId, UpdateIntervalSecondsKey) + if err != nil { + return defaultInterval, err + } + + if updateIntervalConfig == nil { + return defaultInterval, nil + } + + updateInterval, err := time.ParseDuration(updateIntervalConfig.Value + "s") + if err != nil { + return defaultInterval, err + } + + return updateInterval, nil +} + +// LoadClusterManagerReverseTunnel loads the cluster manager reverse tunnel configuration from the database. +func LoadClusterManagerReverseTunnel(state types.State, ctx context.Context, clusterManagerId int64) (bool, error) { + reverseTunnelConfig, err := LoadClusterManagerSingleConfig(state, ctx, clusterManagerId, ReverseTunnelKey) + if err != nil { + return false, err + } + + if reverseTunnelConfig == nil { + return false, nil + } + + reverseTunnel, err := strconv.ParseBool(reverseTunnelConfig.Value) + if err != nil { + return false, err + } + + return reverseTunnel, nil } // StoreClusterManager stores the cluster manager configuration in the database. @@ -61,7 +138,12 @@ func RemoveClusterManager(state types.State, ctx context.Context, clusterManager // StoreClusterManagerConfig stores the cluster manager configuration in the database. 
func StoreClusterManagerConfig(state types.State, ctx context.Context, name string, key string, value string) error { - clusterManager, clusterManagerConfig, err := LoadClusterManager(state, ctx, name) + clusterManager, err := LoadClusterManager(state, ctx, name) + if err != nil { + return err + } + + clusterManagerConfig, err := LoadClusterManagerConfigs(state, ctx, clusterManager.ID) if err != nil { return err } diff --git a/test/suites/cluster-manager.sh b/test/suites/cluster-manager.sh index d681f9513..759438a97 100644 --- a/test/suites/cluster-manager.sh +++ b/test/suites/cluster-manager.sh @@ -15,19 +15,60 @@ test_cluster_manager() { echo "==> Test cluster manager without previous setup" lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager show 2>&1 | grep "Error: Cluster manager not found" -q lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager delete 2>&1 | grep "Error: Cluster manager not found" -q - lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager get update-interval-seconds 2>&1 | grep "Error: Cluster manager not found" -q - lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager set update-interval-seconds 10 2>&1 | grep "Error: Cluster manager not found" -q - lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager unset update-interval-seconds 2>&1 | grep "Error: Cluster manager not found" -q + lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager get update_interval_seconds 2>&1 | grep "Error: Cluster manager not found" -q + lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager set update_interval_seconds 10 2>&1 | grep "Error: Cluster manager not found" -q + lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager unset update_interval_seconds 2>&1 | grep "Error: Cluster manager not found" -q echo "==> Create a cert for dummy cluster manager" lxc exec micro01 -- openssl req -x509 -newkey rsa:2048 -nodes -keyout key.pem -out cert.pem -days 1 
-subj "/CN=localhost" -addext "subjectAltName = DNS:localhost" echo "==> Start dummy cluster manager server" - lxc exec micro01 -- sh -c "nohup sh -c '( - while true; do - printf \"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n\" - sleep 1 - done - ) | openssl s_server -accept 3000 -key key.pem -cert cert.pem -quiet > /tmp/openssl_server.log 2>&1 &'" & + lxc exec micro01 -- sh -c "cat > ws_server.py << 'PYEOF' +import ssl, socket, hashlib, base64 + +WS_MAGIC = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11' +HIT_FILE = 'ws_hit' + +ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) +ctx.load_cert_chain('cert.pem', 'key.pem') + +srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +srv.bind(('127.0.0.1', 3000)) +srv.listen(5) + +while True: + raw, _ = srv.accept() + conn = ctx.wrap_socket(raw, server_side=True) + data = b'' + while b'\r\n\r\n' not in data: + data += conn.recv(4096) + headers = {} + lines = data.split(b'\r\n') + request_line = lines[0].decode() + for line in lines[1:]: + if b':' in line: + k, v = line.split(b':', 1) + headers[k.strip().lower()] = v.strip() + path = request_line.split(' ')[1] if ' ' in request_line else '' + if headers.get(b'upgrade', b'').lower() == b'websocket' and path == '/1.0/remote-cluster/ws': + key = headers.get(b'sec-websocket-key', b'').decode() + accept = base64.b64encode(hashlib.sha1((key + WS_MAGIC).encode()).digest()).decode() + response = ( + 'HTTP/1.1 101 Switching Protocols\r\n' + 'Upgrade: websocket\r\n' + 'Connection: Upgrade\r\n' + f'Sec-WebSocket-Accept: {accept}\r\n\r\n' + ) + conn.sendall(response.encode()) + open(HIT_FILE, 'w').write('hit') + import time; time.sleep(30) + else: + conn.sendall(b'HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n') + import time; time.sleep(1) + conn.close() +PYEOF" + lxc exec micro01 -- sh -c "nohup sh -c 'nohup python3 ws_server.py > ws_server.log 2>&1'" & + echo "==> Create a token for connecting to cluster manager" fingerprint=$(lxc exec 
micro01 -- openssl x509 -in cert.pem -noout -fingerprint -sha256 | cut -d'=' -f2 | tr -d ':' | tr 'A-F' 'a-f') token=$(echo '{"secret":"not_so_secret","expires_at":"2125-04-10T12:32:00Z","addresses":["localhost:3000"],"server_name":"localhost","fingerprint":"'"$fingerprint"'"}' | base64 -w0) @@ -39,11 +80,11 @@ test_cluster_manager() { lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager join "$token" | grep "Successfully joined cluster manager" -q echo "==> Run cluster manager commands" - lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager get update-interval-seconds | grep "60" -q - lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager set update-interval-seconds 15 - lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager get update-interval-seconds | grep "15" -q - lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager unset update-interval-seconds - lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager set update-interval-seconds 60 + lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager get update_interval_seconds | grep "60" -q + lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager set update_interval_seconds 15 + lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager get update_interval_seconds | grep "15" -q + lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager unset update_interval_seconds + lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager set update_interval_seconds 60 lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager show | grep "certificate_fingerprint:" -q echo "==> Delete cluster manager" @@ -51,8 +92,30 @@ test_cluster_manager() { lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager show 2>&1 | grep "Error: Cluster manager not found" -q lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager join "$token" | grep "Successfully joined cluster 
manager" -q - echo "==> Stop dummy cluster manager" - lxc exec micro01 -- sh -c "pgrep -f 'openssl s_server -accept 3000' | xargs kill" || true + echo "==> Tunnel config for cluster manager" + lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager set reverse_tunnel true + lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager get reverse_tunnel | grep "true" -q + + echo "==> Wait for daemon to open WebSocket tunnel to /1.0/remote-cluster/ws" + # The reconcile ticker fires every 10 s; wait up to 25 s + for _i in $(seq 1 25); do + if lxc exec micro01 -- test -f ws_hit 2>/dev/null; then + break + fi + sleep 1 + done + if ! lxc exec micro01 -- test -f ws_hit; then + echo "ERROR: WebSocket tunnel was never opened on /1.0/remote-cluster/ws" + exit 1 + fi + + echo "==> WebSocket tunnel confirmed on /1.0/remote-cluster/ws" + + lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager unset reverse_tunnel + lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager get reverse_tunnel | grep "false" -q + + echo "==> Stop WebSocket dummy cluster manager server" + lxc exec micro01 -- sh -c "pgrep -f ws_server.py | xargs kill" || true echo "==> Delete cluster manager with force flag" lxc exec micro01 --env TEST_CONSOLE=0 -- microcloud cluster-manager delete 2>&1 | grep "Cannot connect to: localhost:3000" -q