Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions api/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strconv"
"strings"

"github.com/canonical/lxd/lxd/response"
"github.com/canonical/lxd/shared"
"github.com/canonical/lxd/shared/api"
"github.com/canonical/microcluster/v3/microcluster/types"
Expand Down Expand Up @@ -49,7 +50,12 @@ func clusterManagerGet(state types.State, r *http.Request) types.Response {
return types.SmartError(err)
}

clusterManager, clusterManagerConfig, err := database.LoadClusterManager(state, r.Context(), name)
clusterManager, err := database.LoadClusterManager(state, r.Context(), name)
if err != nil {
return types.SmartError(err)
}

clusterManagerConfig, err := database.LoadClusterManagerConfigs(state, r.Context(), clusterManager.ID)
if err != nil {
return types.SmartError(err)
}
Expand Down Expand Up @@ -97,7 +103,7 @@ func clusterManagerPost(sh *service.Handler) func(state types.State, r *http.Req
}

// ensure cluster manager is not already configured
existingClusterManager, _, err := database.LoadClusterManager(state, r.Context(), args.Name)
existingClusterManager, err := database.LoadClusterManager(state, r.Context(), args.Name)
if err != nil {
if api.StatusErrorCheck(err, http.StatusNotFound) {
// ignore, this is the expected path
Expand Down Expand Up @@ -163,7 +169,7 @@ func clusterManagerPut(state types.State, r *http.Request) types.Response {
return types.SmartError(err)
}

clusterManager, _, err := database.LoadClusterManager(state, r.Context(), name)
clusterManager, err := database.LoadClusterManager(state, r.Context(), name)
if err != nil {
return types.SmartError(err)
}
Expand Down Expand Up @@ -191,13 +197,21 @@ func clusterManagerPut(state types.State, r *http.Request) types.Response {
}
}

if args.UpdateInterval != nil {
err = database.StoreClusterManagerConfig(state, r.Context(), name, database.UpdateIntervalSecondsKey, *args.UpdateInterval)
if args.UpdateIntervalSeconds != nil {
err = database.StoreClusterManagerConfig(state, r.Context(), name, database.UpdateIntervalSecondsKey, *args.UpdateIntervalSeconds)
if err != nil {
return types.SmartError(err)
}
}

if args.ReverseTunnel != nil {
reverseTunnelValue := strconv.FormatBool(*args.ReverseTunnel)
err = database.StoreClusterManagerConfig(state, r.Context(), name, database.ReverseTunnelKey, reverseTunnelValue)
if err != nil {
return response.SmartError(err)
}
}

return types.SyncResponse(true, nil)
}

Expand All @@ -215,7 +229,7 @@ func clusterManagerDelete(sh *service.Handler) func(state types.State, r *http.R
return types.SmartError(err)
}

clusterManager, _, err := database.LoadClusterManager(state, r.Context(), name)
clusterManager, err := database.LoadClusterManager(state, r.Context(), name)
if err != nil {
return types.SmartError(err)
}
Expand Down
35 changes: 34 additions & 1 deletion api/types/cluster_manager.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package types

import (
"net/http"
"sync"
"time"

"github.com/gorilla/websocket"
)

// ClusterManagersPost represents the cluster manager configuration when receiving a POST request in MicroCloud.
Expand Down Expand Up @@ -54,7 +58,11 @@ type ClusterManagerPut struct {

// Interval in seconds to send status messages to the cluster manager
// Example: 60
UpdateInterval *string `json:"update_interval" yaml:"update_interval"`
UpdateIntervalSeconds *string `json:"update_interval_seconds" yaml:"update_interval_seconds"`

// Enables or disables the reverse tunnel to the cluster manager
// Example: true, false
ReverseTunnel *bool `json:"reverse_tunnel" yaml:"reverse_tunnel"`
}

// StatusDistribution represents the distribution of items.
Expand Down Expand Up @@ -100,3 +108,28 @@ type ClusterManagerJoin struct {
ClusterCertificate string `json:"cluster_certificate" yaml:"cluster_certificate"`
Token string `json:"token" yaml:"token"`
}

// ClusterManagerTunnel represents the tunnel connection to the cluster manager.
type ClusterManagerTunnel struct {
Mu sync.RWMutex
WsConn *websocket.Conn
Wg sync.WaitGroup
}

// ClusterManagerTunnelRequest represents the request received through the tunnel.
type ClusterManagerTunnelRequest struct {
UUID string `json:"uuid"`
Method string `json:"method"`
Path string `json:"path"`
Headers http.Header `json:"headers"`
Body []byte `json:"body"`
}

// ClusterManagerTunnelResponse represents the response sent through the tunnel.
type ClusterManagerTunnelResponse struct {
UUID string `json:"uuid"`
Status int `json:"status"`
Headers http.Header `json:"headers"`
Cookies []*http.Cookie `json:"cookies"`
Body []byte `json:"body"`
}
40 changes: 35 additions & 5 deletions client/cluster_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"strings"

"github.com/canonical/lxd/shared"
"github.com/canonical/lxd/shared/version"
"github.com/gorilla/websocket"

"github.com/canonical/microcloud/microcloud/api/types"
"github.com/canonical/microcloud/microcloud/database"
Expand Down Expand Up @@ -84,6 +86,26 @@ func (c *ClusterManagerClient) Delete(clusterCert *shared.CertInfo) error {
return err
}

// ConnectTunnel establishes a WebSocket connection to the cluster manager for reverse tunneling.
func (c *ClusterManagerClient) ConnectTunnel(clusterCert *shared.CertInfo) (*websocket.Conn, error) {
tlsConfig, address, err := c.getTlsConfig(clusterCert)
if err != nil {
return nil, fmt.Errorf("Failed to get TLS config: %w", err)
}

dialer := websocket.Dialer{
TLSClientConfig: tlsConfig,
}

u := url.URL{Scheme: "wss", Host: address, Path: "/1.0/remote-cluster/ws"}
conn, _, err := dialer.Dial(u.String(), nil)
if err != nil {
return nil, err
}

return conn, nil
}

func (c *ClusterManagerClient) craftRequest(method string, path string, reqBody io.Reader) (*http.Request, error) {
url := "https://remote" + path // remote is a placeholder, real address will be set in sendRequest
req, err := http.NewRequest(method, url, reqBody)
Expand Down Expand Up @@ -122,7 +144,19 @@ func (c *ClusterManagerClient) sendRequest(clusterCert *shared.CertInfo, req *ht

func (c *ClusterManagerClient) getHTTPClient(clusterCert *shared.CertInfo) (*http.Client, string, error) {
client := &http.Client{}
tlsConfig, address, err := c.getTlsConfig(clusterCert)
if err != nil {
return nil, "", fmt.Errorf("Failed to get TLS config: %w", err)
}

client.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}

return client, address, nil
}

func (c *ClusterManagerClient) getTlsConfig(clusterCert *shared.CertInfo) (*tls.Config, string, error) {
var address string
var remoteCert *x509.Certificate
var err error
Expand Down Expand Up @@ -171,9 +205,5 @@ func (c *ClusterManagerClient) getHTTPClient(clusterCert *shared.CertInfo) (*htt
return &cert, nil
}

client.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}

return client, address, nil
return tlsConfig, address, nil
}
52 changes: 33 additions & 19 deletions cmd/microcloud/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,11 @@ func (c *cmdClusterManagerGet) command() *cobra.Command {
cmd.Use = "get"
cmd.Short = "Get specific cluster manager configuration by key."
cmd.Example = cli.FormatSection("", `microcloud cluster-manager get addresses
microcloud cluster-manager get certificate-fingerprint
microcloud cluster-manager get update-interval-seconds
microcloud cluster-manager get status-last-success-time
microcloud cluster-manager get status-last-error-time
microcloud cluster-manager get status-last-error-response`)
microcloud cluster-manager get certificate_fingerprint
microcloud cluster-manager get update_interval_seconds
microcloud cluster-manager get status_last_success_time
microcloud cluster-manager get status_last_error_time
microcloud cluster-manager get status_last_error_response`)

cmd.RunE = c.run

Expand Down Expand Up @@ -233,20 +233,26 @@ func (c *cmdClusterManagerGet) run(_ *cobra.Command, args []string) error {
switch key {
case "addresses":
fmt.Printf("%s\n", strings.Join(clusterManager.Addresses, ", "))
case "certificate-fingerprint":
case "certificate_fingerprint":
fmt.Printf("%s\n", clusterManager.CertificateFingerprint)
case "update-interval-seconds":
case "update_interval_seconds":
value, ok := clusterManager.Config[database.UpdateIntervalSecondsKey]
if ok {
fmt.Printf("%s\n", value)
}

case "status-last-success-time":
case "status_last_success_time":
fmt.Printf("%s\n", clusterManager.StatusLastSuccessTime)
case "status-last-error-time":
case "status_last_error_time":
fmt.Printf("%s\n", clusterManager.StatusLastErrorTime)
case "status-last-error-response":
case "status_last_error_response":
fmt.Printf("%s\n", clusterManager.StatusLastErrorResponse)
case "reverse_tunnel":
value, ok := clusterManager.Config[database.ReverseTunnelKey]
if ok {
fmt.Printf("%s\n", value)
}

default:
return errors.New("Invalid key")
}
Expand All @@ -265,8 +271,9 @@ func (c *cmdClusterManagerSet) command() *cobra.Command {
cmd.Use = "set"
cmd.Short = "Set specific cluster manager configuration key."
cmd.Example = cli.FormatSection("", `microcloud cluster-manager set addresses example.com:8443
microcloud cluster-manager set certificate-fingerprint abababababababababababababababababababababababababababababababab
microcloud cluster-manager set update-interval-seconds 50`)
microcloud cluster-manager set certificate_fingerprint abababababababababababababababababababababababababababababababab
microcloud cluster-manager set update_interval_seconds 50
microcloud cluster-manager set reverse_tunnel true`)

cmd.RunE = c.run

Expand All @@ -291,10 +298,16 @@ func (c *cmdClusterManagerSet) run(_ *cobra.Command, args []string) error {
switch key {
case "addresses":
payload.Addresses = []string{value}
case "certificate-fingerprint":
case "certificate_fingerprint":
payload.CertificateFingerprint = &value
case "update-interval-seconds":
payload.UpdateInterval = &value
case "update_interval_seconds":
payload.UpdateIntervalSeconds = &value
case "reverse_tunnel":
if value != "true" && value != "false" {
return errors.New("Invalid value for reverse_tunnel, expected 'true' or 'false'")
}

payload.ReverseTunnel = new(value == "true")
default:
return errors.New("Invalid key")
}
Expand All @@ -317,7 +330,7 @@ func (c *cmdClusterManagerUnset) command() *cobra.Command {
cmd := &cobra.Command{}
cmd.Use = "unset"
cmd.Short = "Unset specific cluster manager configuration key."
cmd.Example = cli.FormatSection("", `microcloud cluster-manager unset update-interval-seconds`)
cmd.Example = cli.FormatSection("", `microcloud cluster-manager unset update_interval_seconds`)

cmd.RunE = c.run

Expand All @@ -339,9 +352,10 @@ func (c *cmdClusterManagerUnset) run(_ *cobra.Command, args []string) error {
payload := types.ClusterManagerPut{}

switch key {
case "update-interval-seconds":
emptyString := ""
payload.UpdateInterval = &emptyString
case "update_interval_seconds":
payload.UpdateIntervalSeconds = new("")
case "reverse_tunnel":
payload.ReverseTunnel = new(false)
default:
return errors.New("Invalid key")
}
Expand Down
36 changes: 19 additions & 17 deletions cmd/microcloudd/cluster_manager_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/canonical/lxd/shared/api"
"github.com/canonical/lxd/shared/logger"
microTypes "github.com/canonical/microcluster/v3/microcluster/types"
"golang.org/x/sync/errgroup"

"github.com/canonical/microcloud/microcloud/api/types"
"github.com/canonical/microcloud/microcloud/client"
Expand All @@ -20,7 +21,8 @@ import (

// SendClusterManagerStatusMessageTask starts a go routine, that sends periodic status messages to cluster manager.
func SendClusterManagerStatusMessageTask(ctx context.Context, sh *service.Handler, s microTypes.State) {
go func(ctx context.Context, sh *service.Handler, s microTypes.State) {
g, _ := errgroup.WithContext(ctx)
g.Go(func() error {
ticker := time.NewTicker(database.UpdateIntervalDefaultSeconds * time.Second)
defer ticker.Stop()

Expand All @@ -33,15 +35,22 @@ func SendClusterManagerStatusMessageTask(ctx context.Context, sh *service.Handle
}

case <-ctx.Done():
return // exit the loop and close the go routine
return nil // exit the loop and close the go routine
}
}
}(ctx, sh, s)
})

go func() {
err := g.Wait()
if err != nil {
logger.Error("Failed to wait for send cluster manager status routine", logger.Ctx{"err": err})
}
}()
}

func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s microTypes.State) time.Duration {
logger.Debug("Starting sendClusterManagerStatusMessage")
var nextUpdate time.Duration = 0
var nextUpdate = time.Duration(database.UpdateIntervalDefaultSeconds) * time.Second

cloud := sh.Services[types.MicroCloud].(*service.CloudService)
isInitialized, err := cloud.IsInitialized(ctx)
Expand All @@ -55,7 +64,7 @@ func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s
return nextUpdate
}

clusterManager, clusterManagerConfig, err := database.LoadClusterManager(s, ctx, database.ClusterManagerDefaultName)
clusterManager, err := database.LoadClusterManager(s, ctx, database.ClusterManagerDefaultName)
if err != nil {
if err.Error() == "Cluster manager not found" {
logger.Debug("Cluster manager not configured, skipping status message")
Expand All @@ -66,17 +75,10 @@ func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s
return nextUpdate
}

for _, config := range clusterManagerConfig {
if config.Key == database.UpdateIntervalSecondsKey {
interval, err := time.ParseDuration(config.Value + "s")
if err != nil {
logger.Error("Failed to parse update interval", logger.Ctx{"err": err})
return nextUpdate
}

nextUpdate = interval
break
}
nextUpdate, err = database.LoadClusterManagerUpdateIntervalSeconds(s, ctx, clusterManager.ID)
if err != nil {
logger.Error("Failed to fetch cluster manager update interval", logger.Ctx{"err": err})
return nextUpdate
}

leaderClient, err := s.Database().Leader(ctx)
Expand All @@ -99,7 +101,7 @@ func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s
payload := types.ClusterManagerPostStatus{}

lxdService := sh.Services[types.LXD].(*service.LXDService)
lxdClient, err := lxdService.Client(context.Background())
lxdClient, err := lxdService.Client(ctx)
if err != nil {
logger.Error("Failed to get LXD client", logger.Ctx{"err": err})
return nextUpdate
Expand Down
Loading
Loading