Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 38 additions & 23 deletions append_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync/atomic"
"time"

"log/slog"

f_log "github.com/transparency-dev/formats/log"
"github.com/transparency-dev/merkle/rfc6962"
"github.com/transparency-dev/tessera/api/layout"
Expand All @@ -34,7 +36,6 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"golang.org/x/mod/sumdb/note"
"k8s.io/klog/v2"
)

const (
Expand Down Expand Up @@ -89,7 +90,8 @@ func init() {
metric.WithDescription("Number of calls to the appender lifecycle Add function"),
metric.WithUnit("{call}"))
if err != nil {
klog.Exitf("Failed to create appenderAddsTotal metric: %v", err)
slog.Error("Failed to create appenderAddsTotal metric", slog.Any("error", err))
os.Exit(1)
}

appenderAddHistogram, err = meter.Int64Histogram(
Expand All @@ -98,22 +100,25 @@ func init() {
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(histogramBuckets...))
if err != nil {
klog.Exitf("Failed to create appenderAddDuration metric: %v", err)
slog.Error("Failed to create appenderAddDuration metric", slog.Any("error", err))
os.Exit(1)
}

appenderHighestIndex, err = meter.Int64Gauge(
"tessera.appender.index",
metric.WithDescription("Highest index assigned by appender lifecycle Add function"))
if err != nil {
klog.Exitf("Failed to create appenderHighestIndex metric: %v", err)
slog.Error("Failed to create appenderHighestIndex metric", slog.Any("error", err))
os.Exit(1)
}

appenderIntegratedSize, err = meter.Int64Gauge(
"tessera.appender.integrated.size",
metric.WithDescription("Size of the integrated (but not necessarily published) tree"),
metric.WithUnit("{entry}"))
if err != nil {
klog.Exitf("Failed to create appenderIntegratedSize metric: %v", err)
slog.Error("Failed to create appenderIntegratedSize metric", slog.Any("error", err))
os.Exit(1)
}

appenderIntegrateLatency, err = meter.Int64Histogram(
Expand All @@ -122,7 +127,8 @@ func init() {
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(histogramBuckets...))
if err != nil {
klog.Exitf("Failed to create appenderIntegrateLatency metric: %v", err)
slog.Error("Failed to create appenderIntegrateLatency metric", slog.Any("error", err))
os.Exit(1)
}

appenderDeadlineRemaining, err = meter.Int64Histogram(
Expand All @@ -131,54 +137,61 @@ func init() {
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(histogramBuckets...))
if err != nil {
klog.Exitf("Failed to create appenderDeadlineRemaining metric: %v", err)
slog.Error("Failed to create appenderDeadlineRemaining metric", slog.Any("error", err))
os.Exit(1)
}

appenderNextIndex, err = meter.Int64Gauge(
"tessera.appender.next_index",
metric.WithDescription("The next available index to be assigned to entries"))
if err != nil {
klog.Exitf("Failed to create appenderNextIndex metric: %v", err)
slog.Error("Failed to create appenderNextIndex metric", slog.Any("error", err))
os.Exit(1)
}

appenderSignedSize, err = meter.Int64Gauge(
"tessera.appender.signed.size",
metric.WithDescription("Size of the latest signed checkpoint"),
metric.WithUnit("{entry}"))
if err != nil {
klog.Exitf("Failed to create appenderSignedSize metric: %v", err)
slog.Error("Failed to create appenderSignedSize metric", slog.Any("error", err))
os.Exit(1)
}

appenderWitnessedSize, err = meter.Int64Gauge(
"tessera.appender.witnessed.size",
metric.WithDescription("Size of the latest successfully witnessed checkpoint"),
metric.WithUnit("{entry}"))
if err != nil {
klog.Exitf("Failed to create appenderWitnessedSize metric: %v", err)
slog.Error("Failed to create appenderWitnessedSize metric", slog.Any("error", err))
os.Exit(1)
}

followerEntriesProcessed, err = meter.Int64Gauge(
"tessera.follower.processed",
metric.WithDescription("Number of entries processed"),
metric.WithUnit("{entry}"))
if err != nil {
klog.Exitf("Failed to create followerEntriesProcessed metric: %v", err)
slog.Error("Failed to create followerEntriesProcessed metric", slog.Any("error", err))
os.Exit(1)
}

followerLag, err = meter.Int64Gauge(
"tessera.follower.lag",
metric.WithDescription("Number of unprocessed entries in the current integrated tree"),
metric.WithUnit("{entry}"))
if err != nil {
klog.Exitf("Failed to create followerLag metric: %v", err)
slog.Error("Failed to create followerLag metric", slog.Any("error", err))
os.Exit(1)
}

appenderWitnessRequests, err = meter.Int64Counter(
"tessera.appender.witness.requests",
metric.WithDescription("Number of attempts to witness a log checkpoint"),
metric.WithUnit("{call}"))
if err != nil {
klog.Exitf("Failed to create appenderWitnessRequests metric: %v", err)
slog.Error("Failed to create appenderWitnessRequests metric", slog.Any("error", err))
os.Exit(1)
}

appenderWitnessHistogram, err = meter.Int64Histogram(
Expand All @@ -187,7 +200,8 @@ func init() {
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(histogramBuckets...))
if err != nil {
klog.Exitf("Failed to create appenderWitnessHistogram metric: %v", err)
slog.Error("Failed to create appenderWitnessHistogram metric", slog.Any("error", err))
os.Exit(1)
}

}
Expand Down Expand Up @@ -338,12 +352,12 @@ func followerStats(ctx context.Context, f Follower, size func(context.Context) (

n, err := f.EntriesProcessed(ctx)
if err != nil {
klog.Errorf("followerStats: follower %q EntriesProcessed(): %v", name, err)
slog.Error("followerStats: error in EntriesProcessed", slog.String("name", name), slog.Any("error", err))
continue
}
s, err := size(ctx)
if err != nil {
klog.Errorf("followerStats: follower %q size(): %v", name, err)
slog.Error("followerStats: error in follower size", slog.String("name", name), slog.Any("error", err))
}
attrs := metric.WithAttributes(followerNameKey.String(name))
followerEntriesProcessed.Record(ctx, otel.Clamp64(n), attrs)
Expand Down Expand Up @@ -401,7 +415,7 @@ func (i *integrationStats) latency(size uint64) (time.Duration, bool) {
// This is a long running function, exiting only when the provided context is done.
func (i *integrationStats) updateStats(ctx context.Context, r LogReader) {
if r == nil {
klog.Warning("updateStates: nil logreader provided, not updating stats")
slog.Warn("updateStats: nil logreader provided, not updating stats")
return
}
t := time.NewTicker(100 * time.Millisecond)
Expand All @@ -413,7 +427,7 @@ func (i *integrationStats) updateStats(ctx context.Context, r LogReader) {
}
s, err := r.IntegratedSize(ctx)
if err != nil {
klog.Errorf("IntegratedSize: %v", err)
slog.Error("Error calling IntegratedSize", slog.Any("error", err))
continue
}
appenderIntegratedSize.Record(ctx, otel.Clamp64(s))
Expand All @@ -422,7 +436,7 @@ func (i *integrationStats) updateStats(ctx context.Context, r LogReader) {
}
i, err := r.NextIndex(ctx)
if err != nil {
klog.Errorf("NextIndex: %v", err)
slog.Error("Error calling NextIndex", slog.Any("error", err))
}
appenderNextIndex.Record(ctx, otel.Clamp64(i))
}
Expand Down Expand Up @@ -545,7 +559,7 @@ func (t *terminator) Shutdown(ctx context.Context) error {
if err != nil {
return err
}
klog.V(1).Infof("Shutting down, waiting for checkpoint committing to size %d (current checkpoint is %d)", maxIndex, size)
slog.Debug("Shutting down, waiting for checkpoint", slog.Uint64("goal", maxIndex), slog.Uint64("current", size))
if size > maxIndex {
return nil
}
Expand Down Expand Up @@ -660,7 +674,7 @@ func (o AppendOptions) CheckpointPublisher(lr LogReader, httpClient *http.Client
var oldSize uint64
oldCP, err := lr.ReadCheckpoint(ctx)
if err != nil {
klog.Infof("Failed to fetch old checkpoint: %v", err)
slog.Info("Failed to fetch old checkpoint", slog.Any("error", err))
} else {
_, oldSize, _, err = parse.CheckpointUnsafe(oldCP)
if err != nil {
Expand All @@ -682,7 +696,7 @@ func (o AppendOptions) CheckpointPublisher(lr LogReader, httpClient *http.Client
appenderWitnessRequests.Add(ctx, 1, metric.WithAttributes(attribute.String("error.type", "failed")))
return nil, err
}
klog.Warningf("WitnessGateway: failing-open despite error: %v", err)
slog.Warn("WitnessGateway: failing-open despite error", slog.Any("error", err))
witAttr = append(witAttr, attribute.String("error.type", "failed_open"))
}

Expand Down Expand Up @@ -746,7 +760,8 @@ func (o *AppendOptions) WithCheckpointSigner(s note.Signer, additionalSigners ..
origin := s.Name()
for _, signer := range additionalSigners {
if origin != signer.Name() {
klog.Exitf("WithCheckpointSigner: additional signer name (%q) does not match primary signer name (%q)", signer.Name(), origin)
slog.Error("WithCheckpointSigner: additional signer name does not match primary signer name", slog.String("name", signer.Name()), slog.String("origin", origin))
os.Exit(1)
}
}
o.newCP = func(ctx context.Context, size uint64, hash []byte) ([]byte, error) {
Expand Down
5 changes: 3 additions & 2 deletions await.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (
"sync"
"time"

"log/slog"

"github.com/transparency-dev/tessera/internal/otel"
"github.com/transparency-dev/tessera/internal/parse"
"go.opentelemetry.io/otel/trace"
"k8s.io/klog/v2"
)

// NewPublicationAwaiter provides an PublicationAwaiter that can be cancelled
Expand Down Expand Up @@ -123,7 +124,7 @@ func (a *PublicationAwaiter) pollLoop(ctx context.Context, readCheckpoint func(c
select {
case <-ctx.Done():
span.AddEvent("context.done")
klog.V(2).Info("PublicationAwaiter exiting due to context completion")
slog.Debug("PublicationAwaiter exiting due to context completion")
cp, cpSize, cpErr = nil, 0, ctx.Err()
ctxDone = true
case <-time.After(pollPeriod):
Expand Down
5 changes: 3 additions & 2 deletions client/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ import (
"path"
"strings"

"log/slog"

"github.com/transparency-dev/tessera/api/layout"
"github.com/transparency-dev/tessera/internal/fetcher"
"k8s.io/klog/v2"
)

// NewHTTPFetcher creates a new HTTPFetcher for the log rooted at the given URL, using
Expand Down Expand Up @@ -88,7 +89,7 @@ func (h HTTPFetcher) fetch(ctx context.Context, p string) ([]byte, error) {

defer func() {
if err := r.Body.Close(); err != nil {
klog.Errorf("resp.Body.Close(): %v", err)
slog.Error("resp.Body.Close", slog.Any("error", err))
}
}()
return io.ReadAll(r.Body)
Expand Down
9 changes: 5 additions & 4 deletions client/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (
"fmt"
"iter"

"log/slog"

"github.com/transparency-dev/tessera/api/layout"
"k8s.io/klog/v2"
)

// TreeSizeFunc is a function which knows how to return the current tree size of a log.
Expand Down Expand Up @@ -86,7 +87,7 @@ func EntryBundles(ctx context.Context, numWorkers uint, getSize TreeSizeFunc, ge
tokens <- struct{}{}
}

klog.V(1).Infof("stream.EntryBundles: streaming [%d, %d)", fromEntry, fromEntry+N)
slog.Debug("stream.EntryBundles: streaming", slog.Uint64("from", fromEntry), slog.Uint64("N", N))

// For each bundle, pop a future into the bundles channel and kick off an async request
// to resolve it.
Expand Down Expand Up @@ -114,7 +115,7 @@ func EntryBundles(ctx context.Context, numWorkers uint, getSize TreeSizeFunc, ge
bundles <- f
}

klog.V(1).Infof("stream.EntryBundles: exiting")
slog.Debug("stream.EntryBundles: exiting")
}()

return func(yield func(Bundle, error) bool) {
Expand All @@ -131,7 +132,7 @@ func EntryBundles(ctx context.Context, numWorkers uint, getSize TreeSizeFunc, ge
return
}
}
klog.V(1).Infof("stream.EntryBundles: iter done")
slog.Debug("stream.EntryBundles: iter done")
}
}

Expand Down
Loading
Loading