Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions internal/agent/auditnl/emit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package auditnl

import (
"fmt"
"syscall"

libaudit "github.com/elastic/go-libaudit/v2"
"github.com/elastic/go-libaudit/v2/auparse"
)

// Emitter writes Kensa transaction-phase records into the local auditd via
// an AUDIT_USER (type 1005) netlink message — the observability the FedRAMP
// reviewer flagged: every transaction phase produces an event in the
// host's audit log, not just in Kensa's own evidence file. It holds one
// netlink socket for its lifetime.
//
// The engine runs on the controller, so events land in the controller
// (operator) host's auditd — the host kensa's actions originate from.
// Opening the socket needs CAP_AUDIT_WRITE (root); when it can't be opened
// the Emitter degrades to a silent no-op (client == nil) so emission is
// always safe to call and NEVER affects a transaction outcome.
type Emitter struct {
client *libaudit.AuditClient
}

// NewEmitter opens an AUDIT netlink socket for emitting user records. It
// NEVER returns an error: on failure (no privilege / no audit) it returns
// a no-op Emitter, so the engine can always hold a usable, non-nil emitter
// and emission can never fail a transaction.
func NewEmitter() *Emitter {
c, err := libaudit.NewAuditClient(nil)
if err != nil {
return &Emitter{}
}
return &Emitter{client: c}
}

// EmitPhase writes one transaction-phase record. Best-effort and
// non-blocking: it uses SendNoWait (no ACK wait) and swallows every error,
// so a slow or unavailable audit subsystem can never delay or fail a
// transaction. A no-op when the socket could not be opened.
func (e *Emitter) EmitPhase(txnID, phase string, ok bool) {
if e == nil || e.client == nil {
return
}
msg := syscall.NetlinkMessage{
Header: syscall.NlMsghdr{
Type: uint16(auparse.AUDIT_USER),
Flags: syscall.NLM_F_REQUEST | syscall.NLM_F_ACK,
},
Data: []byte(formatPhaseMessage(txnID, phase, ok)),
}
_, _ = e.client.Netlink.SendNoWait(msg)
}

// Close releases the netlink socket. Safe on a no-op emitter.
func (e *Emitter) Close() error {
if e == nil || e.client == nil {
return nil
}
return e.client.Close()
}

// formatPhaseMessage renders the audit record body as auditd-style
// key=value text. Pure (no IO) so it is unit-testable without a socket.
func formatPhaseMessage(txnID, phase string, ok bool) string {
result := "fail"
if ok {
result = "ok"
}
return fmt.Sprintf("op=kensa_transaction phase=%s txn=%s result=%s", phase, txnID, result)
}
48 changes: 48 additions & 0 deletions internal/agent/auditnl/emit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package auditnl

import (
"strings"
"testing"
)

// formatPhaseMessage renders auditd-style key=value text with the result
// mapped from the ok flag.
//
// @spec engine-audit-emission
// @ac AC-04
func TestFormatPhaseMessage(t *testing.T) {
t.Run("engine-audit-emission/AC-04", func(t *testing.T) {})
ok := formatPhaseMessage("abc-123", "apply", true)
if !strings.Contains(ok, "phase=apply") || !strings.Contains(ok, "txn=abc-123") || !strings.Contains(ok, "result=ok") {
t.Errorf("ok message = %q", ok)
}
fail := formatPhaseMessage("abc-123", "rolled_back", false)
if !strings.Contains(fail, "result=fail") {
t.Errorf("fail message = %q", fail)
}
if !strings.HasPrefix(ok, "op=kensa_transaction") {
t.Errorf("message should start with the op tag; got %q", ok)
}
}

// NewEmitter never errors, and EmitPhase/Close are safe to call on the
// no-op emitter that results when the socket cannot be opened (the
// non-root CI case) — emission must never panic or block a transaction.
//
// @spec engine-audit-emission
// @ac AC-03
func TestNewEmitter_NoopSafe(t *testing.T) {
t.Run("engine-audit-emission/AC-03", func(t *testing.T) {})
em := NewEmitter() // non-root in CI → no-op emitter
// Must not panic regardless of whether the socket opened.
em.EmitPhase("txn-1", "apply", true)
if err := em.Close(); err != nil {
t.Errorf("Close: %v", err)
}
// A zero-value emitter is also safe.
var zero *Emitter
zero.EmitPhase("txn-2", "capture", true)
if err := zero.Close(); err != nil {
t.Errorf("zero Close: %v", err)
}
}
108 changes: 108 additions & 0 deletions internal/engine/audit_emit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package engine_test

import (
"context"
"errors"
"reflect"
"testing"
"time"

"github.com/google/uuid"

"github.com/Hanalyx/kensa/api"
"github.com/Hanalyx/kensa/internal/engine"
"github.com/Hanalyx/kensa/internal/handler"
)

// recordingEmitter records the phase names the engine emits.
type recordingEmitter struct{ phases []string }

func (r *recordingEmitter) EmitPhase(_, phase string, _ bool) {
r.phases = append(r.phases, phase)
}

func txnFor(mech string) *api.Transaction {
return &api.Transaction{
ID: uuid.New(),
HostID: "test-host",
Severity: "medium",
Steps: []api.Step{{Index: 0, Mechanism: mech}},
StartedAt: time.Now().UTC(),
Deadline: time.Now().Add(time.Minute),
Transactional: true,
}
}

// A committed transaction emits started → capture → apply → validate →
// committed, in order.
//
// @spec engine-audit-emission
// @ac AC-01
func TestEmit_CommittedSequence(t *testing.T) {
t.Run("engine-audit-emission/AC-01", func(t *testing.T) {})
r := handler.NewRegistry()
r.Register(&engine.FakeHandler{HandlerName: "fake_ok", IsCapturable: true})
em := &recordingEmitter{}
e := engine.New(engine.WithRegistry(r), engine.WithAuditEmitter(em))

res, err := e.Run(context.Background(), engine.NewFakeTransport(), txnFor("fake_ok"), false)
if err != nil {
t.Fatalf("Run: %v", err)
}
if res.Status != api.StatusCommitted {
t.Fatalf("status = %s, want committed", res.Status)
}
want := []string{"started", "capture", "apply", "validate", "committed"}
if !reflect.DeepEqual(em.phases, want) {
t.Errorf("emitted phases = %v, want %v", em.phases, want)
}
}

// A rolled-back transaction (apply fails) emits started → capture → apply
// → rolled_back, with no validate phase.
//
// @spec engine-audit-emission
// @ac AC-02
func TestEmit_RolledBackSequence(t *testing.T) {
t.Run("engine-audit-emission/AC-02", func(t *testing.T) {})
r := handler.NewRegistry()
r.Register(&engine.FakeHandler{
HandlerName: "fake_fail",
IsCapturable: true,
ApplyErr: errors.New("induced apply failure"),
})
em := &recordingEmitter{}
e := engine.New(engine.WithRegistry(r), engine.WithAuditEmitter(em))

res, err := e.Run(context.Background(), engine.NewFakeTransport(), txnFor("fake_fail"), false)
if err != nil {
t.Fatalf("Run: %v", err)
}
if res.Status != api.StatusRolledBack {
t.Fatalf("status = %s, want rolled_back", res.Status)
}
want := []string{"started", "capture", "apply", "rolled_back"}
if !reflect.DeepEqual(em.phases, want) {
t.Errorf("emitted phases = %v, want %v", em.phases, want)
}
}

// The default engine (no emitter wired) runs to completion without
// panicking — emission is off by default and never affects the outcome.
//
// @spec engine-audit-emission
// @ac AC-03
func TestEmit_NoopDefaultIsSafe(t *testing.T) {
t.Run("engine-audit-emission/AC-03", func(t *testing.T) {})
r := handler.NewRegistry()
r.Register(&engine.FakeHandler{HandlerName: "fake_ok", IsCapturable: true})
e := engine.New(engine.WithRegistry(r)) // no WithAuditEmitter

res, err := e.Run(context.Background(), engine.NewFakeTransport(), txnFor("fake_ok"), false)
if err != nil {
t.Fatalf("Run: %v", err)
}
if res.Status != api.StatusCommitted {
t.Errorf("status = %s, want committed", res.Status)
}
}
7 changes: 7 additions & 0 deletions internal/engine/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ func (e *Engine) finalize(
validators []api.ValidatorResult,
rollbacks []api.RollbackResult,
) *api.TransactionResult {
// Terminal transaction-phase audit record (best-effort; never affects
// the outcome). The status string is the phase name (committed /
// rolled_back / partially_applied).
e.emitter.EmitPhase(txn.ID.String(), string(status), status == api.StatusCommitted)

now := time.Now().UTC()
result := &api.TransactionResult{
TransactionID: txn.ID,
Expand Down Expand Up @@ -121,6 +126,8 @@ func (e *Engine) finalize(
// [api.StatusErrored] outcome. The phase argument identifies which
// phase failed for diagnostics.
func (e *Engine) errored(ctx context.Context, txn *api.Transaction, startedAt time.Time, phase api.Phase, err error) *api.TransactionResult {
e.emitter.EmitPhase(txn.ID.String(), "errored", false)

now := time.Now().UTC()
result := &api.TransactionResult{
TransactionID: txn.ID,
Expand Down
41 changes: 41 additions & 0 deletions internal/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ type Engine struct {
validators []Validator
forceValidateFail bool

// emitter writes a transaction-phase record into the host's auditd
// at each phase boundary (the AUDIT_NETLINK observability surface). It is
// strictly best-effort and non-blocking — an audit-log failure can
// NEVER fail or delay a transaction. Defaults to a no-op; the
// production path wires auditnl.NewEmitter via WithAuditEmitter.
emitter PhaseEmitter

// agentClient is set via WithAgentClient. When non-nil,
// every handler lookup returns a RemoteHandler wrapping
// this client (the original handler's Capturable() value
Expand Down Expand Up @@ -107,6 +114,24 @@ type AgentAwareDeadmanArmer interface {
UseAgentClient(c DeadmanAgentClient)
}

// PhaseEmitter receives a transaction-phase record at each phase
// boundary. Implementations MUST be non-blocking and MUST NOT error —
// the engine ignores any failure, and audit emission can never affect a
// transaction's outcome. The interface lives here (not in auditnl) so the
// engine core does not import the netlink stack; auditnl.Emitter
// satisfies it structurally and is injected via WithAuditEmitter.
type PhaseEmitter interface {
// EmitPhase records that transaction txnID reached phase with the
// given success state.
EmitPhase(txnID, phase string, ok bool)
}

// noopPhaseEmitter is the default — emission is off unless a real emitter
// is wired in.
type noopPhaseEmitter struct{}

func (noopPhaseEmitter) EmitPhase(_, _ string, _ bool) {}

// Option configures [Engine] at construction. The default zero-config
// engine uses an in-memory store, a no-op signer, a no-op deadman
// armer, and a no-op event bus — sufficient for unit tests but not for
Expand All @@ -131,6 +156,17 @@ func WithDeadman(d DeadmanArmer) Option { return func(e *Engine) { e.deadman = d
// WithEvents overrides the event bus.
func WithEvents(b EventBus) Option { return func(e *Engine) { e.events = b } }

// WithAuditEmitter wires a transaction-phase auditd emitter.
// The production path passes auditnl.NewEmitter(); tests pass a recorder.
// Emission is best-effort and never affects a transaction.
func WithAuditEmitter(em PhaseEmitter) Option {
return func(e *Engine) {
if em != nil {
e.emitter = em
}
}
}

// WithForceValidateFail forces the validate phase to return false for
// every transaction. Used by kensa-fuzz to test the
// apply→validate-fail→rollback path without requiring a real rule check
Expand Down Expand Up @@ -180,6 +216,7 @@ func New(opts ...Option) *Engine {
deadman: noopDeadman{},
events: noopEventBus{},
locks: newHostLocks(),
emitter: noopPhaseEmitter{},
}
for _, opt := range opts {
opt(e)
Expand Down Expand Up @@ -225,6 +262,7 @@ func (e *Engine) Run(ctx context.Context, transport api.Transport, txn *api.Tran
defer release()

e.publishStarted(ctx, txn)
e.emitter.EmitPhase(txn.ID.String(), "started", true)

// Phase 1: PRE-FLIGHT.
if err := e.preflight(txn); err != nil {
Expand All @@ -241,6 +279,7 @@ func (e *Engine) Run(ctx context.Context, transport api.Transport, txn *api.Tran
return e.errored(ctx, txn, startedAt, api.PhaseCapture, err), nil
}
e.publishPhaseCompleted(ctx, txn, api.PhaseCapture, true, time.Since(startedAt))
e.emitter.EmitPhase(txn.ID.String(), "capture", true)

// Arm deadman timer for control-channel-sensitive transactions
// (engine-transaction spec AC-06, C-04).
Expand All @@ -255,13 +294,15 @@ func (e *Engine) Run(ctx context.Context, transport api.Transport, txn *api.Tran
// Phase 3: APPLY.
applyResults, applyOK := e.apply(ctx, transport, txn, preStates)
e.publishPhaseCompleted(ctx, txn, api.PhaseApply, applyOK, time.Since(startedAt))
e.emitter.EmitPhase(txn.ID.String(), "apply", applyOK)

// Phase 4: VALIDATE (only if APPLY succeeded).
var validators []api.ValidatorResult
validateOK := applyOK
if applyOK {
validators, validateOK = e.validate(ctx, transport, txn)
e.publishPhaseCompleted(ctx, txn, api.PhaseValidate, validateOK, time.Since(startedAt))
e.emitter.EmitPhase(txn.ID.String(), "validate", validateOK)
}

// Phase 5: COMMIT or ROLLBACK.
Expand Down
6 changes: 6 additions & 0 deletions pkg/kensa/kensa.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/google/uuid"

"github.com/Hanalyx/kensa/api"
"github.com/Hanalyx/kensa/internal/agent/auditnl"
"github.com/Hanalyx/kensa/internal/engine"
"github.com/Hanalyx/kensa/internal/engine/deadman"
"github.com/Hanalyx/kensa/internal/evidence"
Expand Down Expand Up @@ -183,6 +184,11 @@ func defaultService(ctx context.Context, storePath string, tf api.TransportFacto
engine.WithDeadman(deadman.New(0, nil)),
engine.WithSigner(signer),
engine.WithEvents(bus),
// Emit a transaction-phase record into the host's auditd
// at each phase boundary. Best-effort — NewEmitter degrades to a
// no-op when the AUDIT netlink socket can't be opened (no
// privilege), so this never affects a transaction.
engine.WithAuditEmitter(auditnl.NewEmitter()),
}
allOpts := make([]engine.Option, 0, len(stdOpts)+len(engineOpts))
allOpts = append(allOpts, stdOpts...)
Expand Down
Loading