Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cachekeys/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,7 @@ const (
FeedInstanceActionPendingH = "oc3:h:feed_instance_action_pending"

FeedSysreportQ = "oc3:q:feed_sysreport"

FeedChecksH = "oc3:h:feed_checks"
FeedChecksQ = "oc3:q:feed_checks"
)
14 changes: 7 additions & 7 deletions cdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type (
Session *Session

dbPool *sql.DB
hasTx bool
HasTx bool
}

// DBLocker combines a database connection and a sync.Locker
Expand Down Expand Up @@ -65,14 +65,14 @@ func New(dbPool *sql.DB) *DB {
}

func (oDb *DB) CreateTx(ctx context.Context, opts *sql.TxOptions) error {
if oDb.hasTx {
if oDb.HasTx {
return fmt.Errorf("already in a transaction")
}
if tx, err := oDb.dbPool.BeginTx(ctx, opts); err != nil {
return err
} else {
oDb.DB = tx
oDb.hasTx = true
oDb.HasTx = true
return nil
}
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func (oDb *DB) CreateSession(ev eventPublisher) {
}

func (oDb *DB) Commit() error {
if !oDb.hasTx {
if !oDb.HasTx {
return nil
}
tx, ok := oDb.DB.(DBTxer)
Expand All @@ -122,15 +122,15 @@ func (oDb *DB) Commit() error {
if err := tx.Commit(); err != nil {
return err
}
oDb.hasTx = false
oDb.HasTx = false
return nil
}

func (oDb *DB) Rollback() error {
if !oDb.hasTx {
if !oDb.HasTx {
return nil
}
defer func() { oDb.hasTx = false }()
defer func() { oDb.HasTx = false }()
tx, ok := oDb.DB.(DBTxer)
if !ok {
return nil
Expand Down
77 changes: 77 additions & 0 deletions cdb/db_checks_live.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package cdb

import (
"context"
"fmt"

"strings"
"time"
)

func (oDb *DB) PurgeChecksLive(ctx context.Context, nodeID string) error {
defer logDuration("PurgeChecksLive", time.Now())
query := `DELETE FROM checks_live WHERE node_id = ? AND chk_type NOT IN ("netdev_err", "save") AND chk_updated < DATE_SUB(NOW(), INTERVAL 20 SECOND)`
if _, err := oDb.DB.ExecContext(ctx, query, nodeID); err != nil {
return fmt.Errorf("PurgeChecksLive: %w", err)
}
return nil
}

func (oDb *DB) InsertChecksLive(ctx context.Context, vars []string, vals [][]any) error {
defer logDuration("InsertChecksLive", time.Now())
if len(vals) == 0 {
return nil
}

var placeHolders []string
var values []any

placeHolder := "(" + strings.Repeat("?,", len(vars)-1) + "?)"

for _, val := range vals {
placeHolders = append(placeHolders, placeHolder)
values = append(values, val...)
}

query := fmt.Sprintf("INSERT INTO checks_live (%s) VALUES %s", strings.Join(vars, ","), strings.Join(placeHolders, ","))

if _, err := oDb.DB.ExecContext(ctx, query, values...); err != nil {
return fmt.Errorf("InsertChecksLive: %w", err)
}
return nil
}

type CheckLive struct {
NodeID string
SvcID string
ChkType string
ChkInstance string
ChkValue float64
}

func (oDb *DB) GetChecksLiveForNode(ctx context.Context, nodeID string) ([]CheckLive, error) {
defer logDuration("GetChecksLiveForNode", time.Now())
query := `SELECT node_id, svc_id, chk_type, chk_instance, chk_value FROM checks_live WHERE node_id = ?`
rows, err := oDb.DB.QueryContext(ctx, query, nodeID)
if err != nil {
return nil, fmt.Errorf("GetChecksLiveForNode: %w", err)
}
defer rows.Close()

var result []CheckLive
for rows.Next() {
var r CheckLive
var svcId, chkInstance *string
if err := rows.Scan(&r.NodeID, &svcId, &r.ChkType, &chkInstance, &r.ChkValue); err != nil {
return nil, fmt.Errorf("scan: %w", err)
}
if svcId != nil {
r.SvcID = *svcId
}
if chkInstance != nil {
r.ChkInstance = *chkInstance
}
result = append(result, r)
}
return result, nil
}
2 changes: 1 addition & 1 deletion cmd/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func setDefaultServerConfig() {
func setDefaultSchedulerConfig() {
s := sectionScheduler
viper.SetDefault(s+".addr", "127.0.0.1:8082")
viper.SetDefault(s+".directories.uploads", "/oc3/uploads")
viper.SetDefault(s+".pprof.net.enable", false)
viper.SetDefault(s+".pprof.ux.enable", false)
viper.SetDefault(s+".pprof.ux.socket", "/var/run/oc3_scheduler_pprof.sock")
Expand Down Expand Up @@ -139,7 +140,6 @@ func initConfig() error {
setDefaultRunnerConfig()

viper.SetDefault("git.user_email", "nobody@localhost.localdomain")
viper.SetDefault("sysreport.dir", "uploads/sysreport")

// config file
viper.SetConfigName("config")
Expand Down
37 changes: 37 additions & 0 deletions feeder/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,26 @@ info:
version: 3.0.1

paths:
/checks:
post:
description: |
Push checks
operationId: PostChecks
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/PostChecks'
responses:
200:
description: OK
security:
- basicAuth: [ ]
- bearerAuth: [ ]
tags:
- checks

/daemon/ping:
post:
description: |
Expand Down Expand Up @@ -588,6 +608,23 @@ components:
type: string
format: binary

PostChecks:
type: object
required:
- vars
- vals
properties:
vars:
type: array
items:
type: string
vals:
type: array
items:
type: array
items: {}


parameters:
ObjectPathHeader:
name: OC3-ObjectPath
Expand Down
86 changes: 51 additions & 35 deletions feeder/codegen_server_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions feeder/codegen_type_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 54 additions & 0 deletions feeder/handlers/post_checks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package feederhandlers

import (
"encoding/json"
"fmt"
"net/http"

"github.com/labstack/echo/v4"

"github.com/opensvc/oc3/cachekeys"
"github.com/opensvc/oc3/feeder"
"github.com/opensvc/oc3/util/logkey"
)

func (a *Api) PostChecks(c echo.Context) error {
nodeID, log := getNodeIDAndLogger(c, "PostChecks")
if nodeID == "" {
return JSONNodeAuthProblem(c)
}

var payload feeder.PostChecks
if err := c.Bind(&payload); err != nil {
return JSONProblemf(c, http.StatusBadRequest, "Unable to parse body: %s", err)
}

val := []any{payload.Vars, payload.Vals}
valBytes, err := json.Marshal(val)
if err != nil {
log.Error("json encode body", logkey.Error, err)
return JSONProblemf(c, http.StatusInternalServerError, "Unable to marshal val: %s", err)
}

ctx := c.Request().Context()
if err := a.Redis.HSet(ctx, cachekeys.FeedChecksH, nodeID, string(valBytes)).Err(); err != nil {
log.Error("HSet FeedChecksH", logkey.Error, err)
return JSONProblemf(c, http.StatusInternalServerError, "Unable to HSet check: %s", err)
}

if err := a.Redis.LRem(ctx, cachekeys.FeedChecksQ, 0, nodeID).Err(); err != nil {
log.Error("LRem FeedChecksQ", logkey.Error, err)
return JSONProblemf(c, http.StatusInternalServerError, "Unable to LRem check: %s", err)
}

if err := a.Redis.LPush(ctx, cachekeys.FeedChecksQ, nodeID).Err(); err != nil {
log.Error("LPush FeedChecksQ", logkey.Error, err)
return JSONProblemf(c, http.StatusInternalServerError, "Unable to LPush check: %s", err)
}

msg := fmt.Sprintf("Checks Vars: %v Vals: %v", payload.Vars, payload.Vals)
log.Info(msg)
fmt.Println(msg)

return c.NoContent(http.StatusOK)
}
Loading
Loading