diff --git a/cachekeys/main.go b/cachekeys/main.go index 02e327d..0b8c157 100644 --- a/cachekeys/main.go +++ b/cachekeys/main.go @@ -40,4 +40,7 @@ const ( FeedInstanceActionPendingH = "oc3:h:feed_instance_action_pending" FeedSysreportQ = "oc3:q:feed_sysreport" + + FeedChecksH = "oc3:h:feed_checks" + FeedChecksQ = "oc3:q:feed_checks" ) diff --git a/cdb/db.go b/cdb/db.go index c878add..ae80fa9 100644 --- a/cdb/db.go +++ b/cdb/db.go @@ -25,7 +25,7 @@ type ( Session *Session dbPool *sql.DB - hasTx bool + HasTx bool } // DBLocker combines a database connection and a sync.Locker @@ -65,14 +65,14 @@ func New(dbPool *sql.DB) *DB { } func (oDb *DB) CreateTx(ctx context.Context, opts *sql.TxOptions) error { - if oDb.hasTx { + if oDb.HasTx { return fmt.Errorf("already in a transaction") } if tx, err := oDb.dbPool.BeginTx(ctx, opts); err != nil { return err } else { oDb.DB = tx - oDb.hasTx = true + oDb.HasTx = true return nil } } @@ -112,7 +112,7 @@ func (oDb *DB) CreateSession(ev eventPublisher) { } func (oDb *DB) Commit() error { - if !oDb.hasTx { + if !oDb.HasTx { return nil } tx, ok := oDb.DB.(DBTxer) @@ -122,15 +122,15 @@ func (oDb *DB) Commit() error { if err := tx.Commit(); err != nil { return err } - oDb.hasTx = false + oDb.HasTx = false return nil } func (oDb *DB) Rollback() error { - if !oDb.hasTx { + if !oDb.HasTx { return nil } - defer func() { oDb.hasTx = false }() + defer func() { oDb.HasTx = false }() tx, ok := oDb.DB.(DBTxer) if !ok { return nil diff --git a/cdb/db_checks_live.go b/cdb/db_checks_live.go new file mode 100644 index 0000000..e483b2a --- /dev/null +++ b/cdb/db_checks_live.go @@ -0,0 +1,77 @@ +package cdb + +import ( + "context" + "fmt" + + "strings" + "time" +) + +func (oDb *DB) PurgeChecksLive(ctx context.Context, nodeID string) error { + defer logDuration("PurgeChecksLive", time.Now()) + query := `DELETE FROM checks_live WHERE node_id = ? AND chk_type NOT IN ("netdev_err", "save") AND chk_updated < DATE_SUB(NOW(), INTERVAL 20 SECOND)` + if _, err := oDb.DB.ExecContext(ctx, query, nodeID); err != nil { + return fmt.Errorf("PurgeChecksLive: %w", err) + } + return nil +} + +func (oDb *DB) InsertChecksLive(ctx context.Context, vars []string, vals [][]any) error { + defer logDuration("InsertChecksLive", time.Now()) + if len(vals) == 0 { + return nil + } + + var placeHolders []string + var values []any + + placeHolder := "(" + strings.Repeat("?,", len(vars)-1) + "?)" + + for _, val := range vals { + placeHolders = append(placeHolders, placeHolder) + values = append(values, val...) + } + + query := fmt.Sprintf("INSERT INTO checks_live (%s) VALUES %s", strings.Join(vars, ","), strings.Join(placeHolders, ",")) + + if _, err := oDb.DB.ExecContext(ctx, query, values...); err != nil { + return fmt.Errorf("InsertChecksLive: %w", err) + } + return nil +} + +type CheckLive struct { + NodeID string + SvcID string + ChkType string + ChkInstance string + ChkValue float64 +} + +func (oDb *DB) GetChecksLiveForNode(ctx context.Context, nodeID string) ([]CheckLive, error) { + defer logDuration("GetChecksLiveForNode", time.Now()) + query := `SELECT node_id, svc_id, chk_type, chk_instance, chk_value FROM checks_live WHERE node_id = ?` + rows, err := oDb.DB.QueryContext(ctx, query, nodeID) + if err != nil { + return nil, fmt.Errorf("GetChecksLiveForNode: %w", err) + } + defer rows.Close() + + var result []CheckLive + for rows.Next() { + var r CheckLive + var svcId, chkInstance *string + if err := rows.Scan(&r.NodeID, &svcId, &r.ChkType, &chkInstance, &r.ChkValue); err != nil { + return nil, fmt.Errorf("scan: %w", err) + } + if svcId != nil { + r.SvcID = *svcId + } + if chkInstance != nil { + r.ChkInstance = *chkInstance + } + result = append(result, r) + } + return result, nil +} diff --git a/cmd/conf.go b/cmd/conf.go index 2ff7895..b75a04a 100644 --- a/cmd/conf.go +++ b/cmd/conf.go @@ -67,6 +67,7 @@ func setDefaultServerConfig() { func setDefaultSchedulerConfig() { s := sectionScheduler viper.SetDefault(s+".addr", "127.0.0.1:8082") + viper.SetDefault(s+".directories.uploads", "/oc3/uploads") viper.SetDefault(s+".pprof.net.enable", false) viper.SetDefault(s+".pprof.ux.enable", false) viper.SetDefault(s+".pprof.ux.socket", "/var/run/oc3_scheduler_pprof.sock") @@ -139,7 +140,6 @@ func initConfig() error { setDefaultRunnerConfig() viper.SetDefault("git.user_email", "nobody@localhost.localdomain") - viper.SetDefault("sysreport.dir", "uploads/sysreport") // config file viper.SetConfigName("config") diff --git a/feeder/api.yaml b/feeder/api.yaml index 6b44de7..1c5d4d1 100644 --- a/feeder/api.yaml +++ b/feeder/api.yaml @@ -8,6 +8,26 @@ info: version: 3.0.1 paths: + /checks: + post: + description: | + Push checks + operationId: PostChecks + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/PostChecks' + responses: + 200: + description: OK + security: + - basicAuth: [ ] + - bearerAuth: [ ] + tags: + - checks + /daemon/ping: post: description: | @@ -588,6 +608,23 @@ components: type: string format: binary + PostChecks: + type: object + required: + - vars + - vals + properties: + vars: + type: array + items: + type: string + vals: + type: array + items: + type: array + items: {} + + parameters: ObjectPathHeader: name: OC3-ObjectPath diff --git a/feeder/codegen_server_gen.go b/feeder/codegen_server_gen.go index 1503962..790902f 100644 --- a/feeder/codegen_server_gen.go +++ b/feeder/codegen_server_gen.go @@ -21,6 +21,9 @@ import ( // ServerInterface represents all server handlers. type ServerInterface interface { + // (POST /checks) + PostChecks(ctx echo.Context) error + // (POST /daemon/ping) PostDaemonPing(ctx echo.Context) error @@ -63,6 +66,19 @@ type ServerInterfaceWrapper struct { Handler ServerInterface } +// PostChecks converts echo context to params. +func (w *ServerInterfaceWrapper) PostChecks(ctx echo.Context) error { + var err error + + ctx.Set(BasicAuthScopes, []string{}) + + ctx.Set(BearerAuthScopes, []string{}) + + // Invoke the callback with all the unmarshaled arguments + err = w.Handler.PostChecks(ctx) + return err +} + // PostDaemonPing converts echo context to params. func (w *ServerInterfaceWrapper) PostDaemonPing(ctx echo.Context) error { var err error @@ -248,6 +264,7 @@ func RegisterHandlersWithBaseURL(router EchoRouter, si ServerInterface, baseURL Handler: si, } + router.POST(baseURL+"/checks", wrapper.PostChecks) router.POST(baseURL+"/daemon/ping", wrapper.PostDaemonPing) router.POST(baseURL+"/daemon/status", wrapper.PostDaemonStatus) router.POST(baseURL+"/instance/action", wrapper.PostInstanceAction) @@ -266,41 +283,40 @@ func RegisterHandlersWithBaseURL(router EchoRouter, si ServerInterface, baseURL // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/+xaW3PbvNH+Kxh+30Uyw0iKnbZT9co59XXbiV3LaS9sjwYCVxTekAACgIqVjP57Byce", - "JFCSHSvzziRXiQlwF/vswweLhb4lhJeCM2BaJeNvicASl6BB2r8o+3cFcjVZMeL+TMbJZ/MkSROGS0jG", - "iTJjaaLIAkpsJumVMM9nnBeAWbJer9NEghKcKbBGX41G5h/CmQamzX+xEAUlWFPOhr8rzsyzxuD/S5gn", - "4+T/hs1Kh25UDS8lnxVQOi8ZKCKpMGaScfIaZ+gKPlegdLJOk1ejlz/C60eGK73gkn6FzLk9/RFu33M5", - "o1kGzPj8048B+JxpkAwXaAJyCRK9k5JL4/8thpKzS8ryM0JAaMgetBwhuQCpqWMLn/0ORE+/UL3glZ4S", - "zuY0NwPdxRRUacTnyE1HhpwK6QXWSMLnikpQ6PJico2GWNChmzT0ttKEaijVts2WrSQNvFZaUpabMP0D", - "LCVeJevmgXsthlhmgUFKY10ppGkJSuNSKPSFFgWaAZIwl6AWkKE5l4jxDBo8J/atX4juQHQXjOugUXZZ", - "Z8QZ2MQG18+3FodlvrQiGELbs/w0mUFO2TYI9jFynhoSIMrQ1fs3p6enf/2AGTcrL7GOoURkZ4W10KYJ", - "sGzbHbDsO5wJrBfRYBUoRTmbVhXN4hNsTnYMTQueR4d7TS5BKp+ebox6AYgLYGpJECkoMI0yrDEKL2wF", - "Zvcky+IsGd+4KNOQ/cZRyKEHfSNqzwm/YAd/MFLwfE4Lw3KPw90Wm1PPQr9Jtb/sLil78NiIwc6KeXlL", - "1adto1kc+x7kS55BER3xatLLEwl53xel6FcwA55944QyfXrSJIsyDTnYLaVS0F5Ya2QJLONyPzw2Q+3F", - "ev/edm0oxJoahGJwnjOlMSNwBYpXksA5m/NteKl/WqtFd/gTrNTu4fgHgIsK9sdqXg+TYyFsCpU8hF/S", - "ImgXfojNXkJoLnjB89V+jz5NFspdmZjUSrNBcazbFWnzYu/aji0wdkWNm1hQH3gG5ovdEU9Nml2Fm/3q", - "D9hT0+TC/u9NXQZs7IhCRKEivCx98bE1lkkxtbvurkH1sM0U2DI6b17A/bTE93F1cKOU7RjVWOag4xNK", - "zqjmErKp9F/7lPCK9czmkixAaYl1PPJ+jcRfWmVYLYezlY6WSYpwAQ9D74EfXYyYl1zpprLfJkqd03gZ", - "SYpKafDVWG+B2J51YJkYVP0A3+1S9omK1CNqhgO0CW+3bjTp6ZNDssAs3/jsopFziaiX1kMA6JdZCUvK", - "KzWtRIY1ZFOsO/w2D1+YmjTm5THvHC0VXrcDgnsy4Y/M2/oNGtNie3VnaFGVmL2QgDM8KwDBvSgws6c5", - "pAQQOqcEaY70girECamkBEbAUFsv4JYJ53Fwy6JyUfOh6/Z6Aei36+vLcHoi5pt7dnP1/s1fTk5f3qVo", - "Au7Y8OfnKAcGRtcyNFs5n1xSc4xR7uxvzlnx1aHY4lqiqakuIIaJWnCp001oVFWWWK42jCNjd4DQuUaT", - "3y4+/uvtLftwcY1cvtBc8rK9MM37l5kiuDd1+C0zIYlKCq5AmUkFJ7igX11WnsEgH6SoUpTl5lVT9C8B", - "+RP5LWOQc03t3L8hBYAisJ4OXj2PpmyTfI42dSIDZjHuTVbqCgSXOsa+Avzx4vB9w55hOnsSZdi2//at", - "2TnzFmJLFZh8wjlEKg5J4tukFaWieKAouD5l9PiR78Bgd3W4R7nd1tGoiw3JW3aOY4ioldIx3WghdVDp", - "F+YfVP21gup6bQ3APS6FIUIyGowGL/dmv18dbeOAVJLq1cSs1rmaYUXJWeWqIxuFbWyYp42vhdbCdVWw", - "BBlmu7/eByb847/XoRltTdjRTRvrdVqfz7z8JGF3mANkIBEWtJXAcXI6GA1GttYQwMxgeOSqOhvF0DWj", - "hiKUR1zpbWG7cr2puibp6wlaYTD5sCJyniXjzQrMYQ5Kv+bZ6ukavl0n625utaxgs59/MjrpM1rPG0Z6", - "wus0ORm92kaopMrKaheZ0MlLfdfRg+1HqUL1ItskS8Y3HXrd3K3Tbx0K3dytDU1xrgxzcW4AvDMmuh4e", - "mdCdWZwEPT9uHr2bp83kRjfa3nSM9r9sJjWXMfvmvmzdoOybe9q6+dg910x6Ko6EOnnYNI7jLHntmr51", - "39fQGaOcLoGFY4kRkh6+hE7HWWhQHoMx3vjhPHlCr5s90Eir3wMXuuc/MfHSRFQRir1j2eMIVm3w653t", - "Y/8hKBalgLvR+KU8TnnqzlQoaeIC9NGep83Znkgw/wsGUDCAjIE9AtRpeh+HI1FXj2VMPEhTLBoibd0X", - "Pn129lUPE81l05s4Gbxs8uJLmwxrPNiTlrqSaP984yaOczNl2P55h4nkmOl8SBUy2pFIj4qybVmkKkJA", - "qXlVFCv0TK0YWUjOeKWeu/LyZL+l5rI4aAp6hjct/ZQSY8rtYRZuDw9TFdu4Ne8o5DralaNsD4HDbceR", - "tKS5THmsgPTFc0TtsLCrlZJNIyeKvcHP4V1P3oFy0xnaBXVZFZoKLPXQqOSL0No9DO3GxWPRrgP5tb8H", - "FoSOUD8F3CRDUsKXIFc9JJg4W8f50PxCvyPvNgSs8Z7fQv2UXOj+uupwMfZ1/yFC3LkEPg5HOi6+u6D7", - "UWrsG36DEKu/K+4i+HfQky84z22vMVbMHAzg3l+8XfzzD0PtAJeoZgUlHq9W29hD1V2/BF1JhrCgrdu3", - "LTT/Uw99F5q76Bi892L8KCwIFnhGC2q76Hdrx0K5DAV5JYtknAyGyfpu/b8AAAD//wUJXAZ7LQAA", + "H4sIAAAAAAAC/+xaW2/bOBb+KwR3H2YBN06T7sP6Le20i+4A02w8s/uQCQyaOpY5lUiWpNx4Cv/3AW+6", + "2JTkXFwM0D4lJqlz+c7Hw6NDfcFUlFJw4Ebj2RcsiSIlGFDuF+P/rUBt51tO/U88w5/sCJ5gTkrAM6zt", + "3ARruoaS2EVmK+34UogCCMe73W6CFWgpuAYn9NX5uf1DBTfAjf2XSFkwSgwTfPq7FtyONQL/rmCFZ/hv", + "08bSqZ/V02sllgWUXksGmiomrRg8w69Jhm7gUwXa4N0Evzp/+TW0/spJZdZCsT8g82ovv4bad0ItWZYB", + "tzr/+XUAfs8NKE4KNAe1AYXeKiWU1f8jgVLwa8bzK0pBGsgeZI5UQoIyzLNFLH8HahafmVmLyiyo4CuW", + "24muMQXTBokV8suRJadGZk0MUvCpYgo0uv4w/wVNiWRTv2gaZE0wM1DqQ5ktWXgSea2NYjy3boYBohTZ", + "4l0z4B9LIZY5YJA2xFQaGVaCNqSUGn1mRYGWgBSsFOg1ZGglFOIigwbPuXvqO6IDiA7BuIs5ypl1Rb2A", + "fWxIPX5gHFH5xiXB6NqI+RO8hJzxQxDcMPKaGhIgxtHNuzeXl5f/+plwYS0viUmhRFXHwjrRTjDw7FAd", + "8OwJyiQx66SzGrRmgi+qimXpBS4mA1OLQuTJ6V6RG1A6hKfro1kDEhK43lBECwbcoIwYguIDB465M8mx", + "OMOzW+/lJEa/URRjGEDf8zpwIhjs4Y9CCpGvWGFZHnC4O2DzJLAwHFLtnd0lZQ8eez64VSktPzL98VBo", + "lsa+B/lSZFAkZ0I26eWJgrxvR2n2B9iJwL4ZZtxcXjTBYtxADu5IqTS0DWvNbIBnQo3D4yLUNjboD7Jr", + "QdHXiUUoBed7rg3hFG5Ai0pReM9X4hBeFkbrbNGd/ghbPTyd3gCkqGDcV/t4XJxyYT9RqWP4pRyCzvBj", + "ZPYSwggpCpFvxzWGMDkohyIxrzPNHsWJaVekzYO9tp06wTiLGjUpp34WGdgdO+BPTZqhws3t+iPO1An+", + "4P57U5cBeyeilEmoqCjLUHwczGVKLtypOzSpH3aYAt8k160KuF+U5D6dHfws4wOzhqgcTHpBKTgzQkG2", + "UGG3L6ioeM9qoegatFHEpD3vz5Hkc6sMq9PhcmuSZZKmQsLD0HvgpksR81po82YNNMXMDSm64Yz/pEzZ", + "+70h6kFU2LPXPT7xFvSZ3byQHJpeUzFd/dKi0gZCEdlb17ZXHVndxsPoCN3tCvyZausTpjoPaOPecLpr", + "wtOXxema8HwvWyQ9FwqxcCIcA0D/6aBgw0SlF5XMiIFsQUxnW9rBF7aUTml5zDMnC0U4biKCI5EIb/oH", + "ATBwbw5tu0LrqiT8hQKSkWUBCO5lQbh7BUVaAmUrRpERyKyZRoLSSingFCyxzRp+49LrO/tt3A9nQcrm", + "+VbfgBTKJA5LKCBU08enSVeyd1Iw48R1u0aADsqChJSpktCPJIfEAato+lRwZC6KB5LJt+WS1XY+gMFw", + "MTSy433KaVjpXAqSveIUInqrTYpvLaSOqnTi+qOKnZZTe0dYMwH3pJSWCPj87Pzs5Wj0+3eVe0+mlWJm", + "O7fWelVLohm9qnwx4Lxw7/F2tNG1Nkb6JgJRoOJq/+tdZMJ//v9L7L06EW52X8ZuN6lfRwwzzrGYVVYA", + "GShEJGsFcIYvnd/2jJLA7aQfOse+iHFeTGlTDAidyBDXlV4jv8jtcYu2Sw/vMzvblBMeTdDmtci2z9e5", + "bBTsuhEzqoL9pvSF75l2PfjwUyeEeHbbCd7t3W7ypROg27udJQHJteVFAOjOypj6TtVUxiIkCdmNb1zV", + "J39fw7AHz1adczpMW0qOwvWiT2i9bppoGO8m+OL81SFCJdOa8XwPmdjmm4SWZAA7zDKNaiOfEk+SWwA7", + "4WyaXI8I6GAUQzl06jgGNc8byb1WtbsGOR9/2C5qbmrG1r5sXa+Mrb1sXYsMr7WLnosjsRqdNl3lNEte", + "+45w3RS2dCYoZxvgsfi3abeHL7ENchW7l6dgTBB+PE+eUet+gzRxDxCAi631b5h4EyyrBMXe8uxxBKv2", + "+PXWNbn/EhRLUsBfd3zPPD7z1G2rWACmE9Cv7q3VvkFTBfa/KABFAcgKGElAnY74aTiSVPVYxqSdtKW1", + "JdLBZeLzR2esepgboZoOwMXZyyYuobSxb/hnI2GpK4n2tx23aZybJdP2tx/Wk1OG8yFVyPlAIAMq2vVs", + "ka4oBa1XVVFs0Q96y+laCS4q/Q9fXl6MS2pukmNOQT+QfUnfZIqx5fY0i1eLx2UV1x61z2jk292Vp2wP", + "geNVyIlySXPT8tgE0ufPCXOHg11vtWraXumXb6GNx7tePIBy00cbgrqsCsMkUWZqs+SL2EA9Du1GxWPR", + "rh35fr5HFsT+WT8F/CJLUio2oLY9JJh7WafZaMHQJ8TduUAMGflQ6pvkQvfTq+OTcaj7j0nEnRvi03Ck", + "o+LJBd3XysahPXoWfQ0XyV0E/w1m/pnkuevMpoqZowEc/Rzuw09/GWpHuGS1LBgNeLWa7AGqrv0KTKU4", + "IpK17rgO0PxfPfUkNIfoGLX3YvwoLCiRZMkK5u4c7naehWoTC/JKFXiGz6Z4d7f7MwAA//8uCkmKmC0A", + "AA==", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/feeder/codegen_type_gen.go b/feeder/codegen_type_gen.go index 63d5a80..2f0de67 100644 --- a/feeder/codegen_type_gen.go +++ b/feeder/codegen_type_gen.go @@ -97,6 +97,12 @@ type ObjectConfig struct { Topology *string `json:"topology,omitempty"` } +// PostChecks defines model for PostChecks. +type PostChecks struct { + Vals [][]interface{} `json:"vals"` + Vars []string `json:"vars"` +} + // PostDaemonPing defines model for PostDaemonPing. type PostDaemonPing struct { // Nodes list of cluster node @@ -185,6 +191,9 @@ type PostInstanceStatusParams struct { Sync *InQuerySync `form:"sync,omitempty" json:"sync,omitempty"` } +// PostChecksJSONRequestBody defines body for PostChecks for application/json ContentType. +type PostChecksJSONRequestBody = PostChecks + // PostDaemonPingJSONRequestBody defines body for PostDaemonPing for application/json ContentType. type PostDaemonPingJSONRequestBody = PostDaemonPing diff --git a/feeder/handlers/post_checks.go b/feeder/handlers/post_checks.go new file mode 100644 index 0000000..4f37dd6 --- /dev/null +++ b/feeder/handlers/post_checks.go @@ -0,0 +1,54 @@ +package feederhandlers + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/labstack/echo/v4" + + "github.com/opensvc/oc3/cachekeys" + "github.com/opensvc/oc3/feeder" + "github.com/opensvc/oc3/util/logkey" +) + +func (a *Api) PostChecks(c echo.Context) error { + nodeID, log := getNodeIDAndLogger(c, "PostChecks") + if nodeID == "" { + return JSONNodeAuthProblem(c) + } + + var payload feeder.PostChecks + if err := c.Bind(&payload); err != nil { + return JSONProblemf(c, http.StatusBadRequest, "Unable to parse body: %s", err) + } + + val := []any{payload.Vars, payload.Vals} + valBytes, err := json.Marshal(val) + if err != nil { + log.Error("json encode body", logkey.Error, err) + return JSONProblemf(c, http.StatusInternalServerError, "Unable to marshal val: %s", err) + } + + ctx := c.Request().Context() + if err := a.Redis.HSet(ctx, cachekeys.FeedChecksH, nodeID, string(valBytes)).Err(); err != nil { + log.Error("HSet FeedChecksH", logkey.Error, err) + return JSONProblemf(c, http.StatusInternalServerError, "Unable to HSet check: %s", err) + } + + if err := a.Redis.LRem(ctx, cachekeys.FeedChecksQ, 0, nodeID).Err(); err != nil { + log.Error("LRem FeedChecksQ", logkey.Error, err) + return JSONProblemf(c, http.StatusInternalServerError, "Unable to LRem check: %s", err) + } + + if err := a.Redis.LPush(ctx, cachekeys.FeedChecksQ, nodeID).Err(); err != nil { + log.Error("LPush FeedChecksQ", logkey.Error, err) + return JSONProblemf(c, http.StatusInternalServerError, "Unable to LPush check: %s", err) + } + + msg := fmt.Sprintf("Checks Vars: %v Vals: %v", payload.Vars, payload.Vals) + log.Info(msg) + fmt.Println(msg) + + return c.NoContent(http.StatusOK) +} diff --git a/feeder/handlers/post_feed_sysreport.go b/feeder/handlers/post_feed_sysreport.go index c54b1e4..102c5d4 100644 --- a/feeder/handlers/post_feed_sysreport.go +++ b/feeder/handlers/post_feed_sysreport.go @@ -40,7 +40,8 @@ func (a *Api) PostNodeSysReport(ctx echo.Context) error { } payload.File.InitFromMultipart(file) - sysreportDir := viper.GetString("sysreport.dir") + uploadDir := viper.GetString("scheduler.directories.uploads") + sysreportDir := filepath.Join(uploadDir, "sysreport") if err := os.MkdirAll(sysreportDir, 0755); err != nil { log.Error("can't create sysreport dir", logkey.Error, err) return JSONProblem(ctx, http.StatusInternalServerError, "can't create sysreport dir") diff --git a/scheduler/task.go b/scheduler/task.go index 27e38c2..7807db3 100644 --- a/scheduler/task.go +++ b/scheduler/task.go @@ -48,6 +48,7 @@ const ( var ( Tasks = TaskList{ + TaskChecks, TaskSysreport, TaskRefreshBActionErrors, TaskAlertUpdateActionErrors, @@ -154,6 +155,7 @@ func (t *Task) DBX(ctx context.Context) (*cdb.DB, error) { Session: cdb.NewSession(tx, t.ev), DB: tx, DBLck: cdb.InitDbLocker(t.db), + HasTx: true, }, nil } diff --git a/scheduler/task_checks.go b/scheduler/task_checks.go new file mode 100644 index 0000000..6c07d13 --- /dev/null +++ b/scheduler/task_checks.go @@ -0,0 +1,207 @@ +package scheduler + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "time" + + "github.com/go-graphite/go-whisper" + "github.com/go-redis/redis/v8" + + "github.com/opensvc/oc3/cachekeys" + "github.com/opensvc/oc3/timeseries" +) + +var TaskChecks = Task{ + name: "checks", + fn: taskChecks, + period: time.Minute, + timeout: 10 * time.Minute, +} + +func taskChecks(ctx context.Context, task *Task) error { + if task.Redis == nil { + return fmt.Errorf("redis client is nil") + } + + for { + if ctx.Err() != nil { + return ctx.Err() + } + + result, err := task.Redis.LPop(ctx, cachekeys.FeedChecksQ).Result() + if err == redis.Nil { + task.Infof("checks: queue is empty") + break + } + if err != nil { + return fmt.Errorf("lpop: %w", err) + } + + task.Infof("checks: processing %s", result) + err = processCheck(ctx, task, result) + if err != nil { + task.Errorf("process check %s: %s", result, err) + } + } + return nil +} + +func processCheck(ctx context.Context, task *Task, keyStr string) error { + nodeID := keyStr + if nodeID == "" { + return fmt.Errorf("empty key") + } + + valStr, err := task.Redis.HGet(ctx, cachekeys.FeedChecksH, keyStr).Result() + if err != nil { + return fmt.Errorf("hget %s: %w", keyStr, err) + } + + var data []json.RawMessage + if err := json.Unmarshal([]byte(valStr), &data); err != nil { + return fmt.Errorf("unmarshal val: %w", err) + } + if len(data) != 2 { + return fmt.Errorf("unexpected data length: %d", len(data)) + } + + var vars []string + if err := json.Unmarshal(data[0], &vars); err != nil { + return fmt.Errorf("unmarshal vars: %w", err) + } + var vals [][]any + if err := json.Unmarshal(data[1], &vals); err != nil { + return fmt.Errorf("unmarshal vals: %w", err) + } + + odb, err := task.DBX(ctx) + if err != nil { + return err + } + defer odb.Rollback() + + // initial purge + task.Infof("checks: purging live checks for node %s", nodeID) + if err := odb.PurgeChecksLive(ctx, nodeID); err != nil { + return err + } + + if len(vals) == 0 { + task.Infof("checks: no values to insert for node %s", nodeID) + return odb.Commit() + } + + // Column indices + idxMap := make(map[string]int) + for i, v := range vars { + idxMap[v] = i + } + + now := time.Now() + + idxUpdated, hasUpdated := idxMap["chk_updated"] + idxNodeID, hasNodeID := idxMap["node_id"] + + if !hasUpdated { + vars = append(vars, "chk_updated") + for i := range vals { + vals[i] = append(vals[i], now) + } + } + if !hasNodeID { + vars = append(vars, "node_id") + for i := range vals { + vals[i] = append(vals[i], nodeID) + } + } + // refresh map + for i, v := range vars { + idxMap[v] = i + } + idxUpdated, hasUpdated = idxMap["chk_updated"] + idxNodeID, hasNodeID = idxMap["node_id"] + + // Filter and update vals + validVals := make([][]any, 0, len(vals)) + + for i := range vals { + row := vals[i] + if hasUpdated && idxUpdated < len(row) { + row[idxUpdated] = now + } + if hasNodeID && idxNodeID < len(row) { + row[idxNodeID] = nodeID + } + validVals = append(validVals, row) + } + + task.Infof("checks: inserting %d rows for node %s", len(validVals), nodeID) + // Insert in batches + batchSize := 100 + for i := 0; i < len(validVals); i += batchSize { + end := i + batchSize + if end > len(validVals) { + end = len(validVals) + } + if err := odb.InsertChecksLive(ctx, vars, validVals[i:end]); err != nil { + task.Errorf("insert checks batch %d-%d: %s", i, end, err) + continue + } + } + + if err := odb.PurgeChecksLive(ctx, nodeID); err != nil { + return err + } + + if err := odb.Commit(); err != nil { + return err + } + + // Update timeseries + rodb, err := task.DBXRO(ctx) + if err != nil { + return err + } + defer rodb.Rollback() + + checks, err := rodb.GetChecksLiveForNode(ctx, nodeID) + if err != nil { + return fmt.Errorf("get checks: %w", err) + } + + for _, check := range checks { + instance := check.ChkInstance + if instance != "" { + instance = base64.RawURLEncoding.EncodeToString([]byte(instance)) + } + + path, err := MakeWSPFilename("nodes/%s/checks/%s:%s:%s", + nodeID, + check.SvcID, + check.ChkType, + instance) + + if err != nil { + task.Errorf("make wsp filename: %s", err) + continue + } + + if err := timeseries.Update(path, check.ChkValue, int(now.Unix()), timeseries.DefaultRetentions, whisper.Average, 0.5); err != nil { + task.Errorf("timeseries update %s: %s", path, err) + } else { + task.Infof("timeseries updated for %s", path) + } + } + + // Update dashboard alerts + if task.ev != nil { + if err := task.ev.EventPublish("checks_change", map[string]any{"node_id": nodeID}); err != nil { + task.Errorf("event publish: %s", err) + } + } + + return nil +} diff --git a/scheduler/task_sysreport.go b/scheduler/task_sysreport.go index 7a77356..41892f8 100644 --- a/scheduler/task_sysreport.go +++ b/scheduler/task_sysreport.go @@ -62,8 +62,8 @@ func taskSysreport(ctx context.Context, task *Task) error { continue } - sysreportDir := viper.GetString("sysreport.dir") - nodeDir := filepath.Join(sysreportDir, data.NodeID) + sysreportDir := viper.GetString("scheduler.directories.uploads") + nodeDir := filepath.Join(sysreportDir, "sysreport", data.NodeID) if err := git.Commit(nodeDir); err != nil { task.Errorf("git commit: %s", err) }