Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/server/telemetry/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ go_library(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_prometheus_client_model//go",
],
)

Expand Down
88 changes: 88 additions & 0 deletions pkg/server/telemetry/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
io_prometheus_client "github.com/prometheus/client_model/go"
)

// Bucket10 buckets a number by order of magnitude base 10, eg 637 -> 100.
Expand Down Expand Up @@ -164,6 +166,92 @@ func (c CounterWithMetric) Inspect(f func(interface{})) {
c.metric.Inspect(f)
}

// CounterWithAggMetric combines a telemetry and a agg metric counter.
type CounterWithAggMetric struct {
telemetry Counter
metric *aggmetric.AggCounter
}

// Necessary for metric metadata registration.
var _ metric.Iterable = CounterWithAggMetric{}
var _ metric.PrometheusIterable = CounterWithAggMetric{}

// NewCounterWithAggMetric creates a CounterWithAggMetric.
func NewCounterWithAggMetric(metadata metric.Metadata) CounterWithAggMetric {
return CounterWithAggMetric{
telemetry: GetCounter(metadata.Name),
metric: aggmetric.NewCounterWithCacheStorageType(metadata),
}
}

// Inc increments both counters.
func (c CounterWithAggMetric) Inc(labelValues ...string) {
Inc(c.telemetry)
c.metric.Inc(1, labelValues...)
}

// Count returns the value of the metric, not the telemetry. Note that the
// telemetry value may reset to zero when, for example, GetFeatureCounts() is
// called with ResetCounts to generate a report.
func (c CounterWithAggMetric) Count() int64 {
return c.metric.Count()
}

// Forward the metric.Iterable and PrometheusIterable interface to the metric counter. We
// don't just embed the counter because our Inc() interface is a bit different.

// GetName implements metric.Iterable
func (c CounterWithAggMetric) GetName() string {
return c.metric.GetName()
}

// GetHelp implements metric.Iterable
func (c CounterWithAggMetric) GetHelp() string {
return c.metric.GetHelp()
}

// GetMeasurement implements metric.Iterable
func (c CounterWithAggMetric) GetMeasurement() string {
return c.metric.GetMeasurement()
}

// GetUnit implements metric.Iterable
func (c CounterWithAggMetric) GetUnit() metric.Unit {
return c.metric.GetUnit()
}

// GetMetadata implements metric.Iterable
func (c CounterWithAggMetric) GetMetadata() metric.Metadata {
return c.metric.GetMetadata()
}

// Inspect implements metric.Iterable
func (c CounterWithAggMetric) Inspect(f func(interface{})) {
c.metric.Inspect(f)
}

func (c CounterWithAggMetric) GetType() *io_prometheus_client.MetricType {
return c.metric.GetType()
}

func (c CounterWithAggMetric) GetLabels() []*io_prometheus_client.LabelPair {
return c.metric.GetLabels()
}

func (c CounterWithAggMetric) ToPrometheusMetric() *io_prometheus_client.Metric {
return c.metric.ToPrometheusMetric()
}

func (c CounterWithAggMetric) Each(
pairs []*io_prometheus_client.LabelPair, f func(metric *io_prometheus_client.Metric),
) {
c.metric.Each(pairs, f)
}

func (c CounterWithAggMetric) ReinitialiseChildMetrics(labelVals []string) {
c.metric.ReinitialiseChildMetrics(labelVals)
}

func init() {
counters.m = make(map[string]Counter, approxFeatureCount)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/server/telemetry/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,17 @@ func TestBucket(t *testing.T) {
// for example, a report is created.
func TestCounterWithMetric(t *testing.T) {
cm := telemetry.NewCounterWithMetric(metric.Metadata{Name: "test-metric"})
cag := telemetry.NewCounterWithAggMetric(metric.Metadata{Name: "test-agg-metric"})

cm.Inc()
cag.Inc()

// Using GetFeatureCounts to read the telemetry value.
m1 := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ReadOnly)
require.Equal(t, int32(1), m1["test-metric"])
require.Equal(t, int64(1), cm.Count())
require.Equal(t, int32(1), m1["test-agg-metric"])
require.Equal(t, int64(1), cm.Count())

// Reset the telemetry.
telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
Expand All @@ -106,4 +111,6 @@ func TestCounterWithMetric(t *testing.T) {
m2 := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ReadOnly)
require.Equal(t, int32(0), m2["test-metric"])
require.Equal(t, int64(1), cm.Count())
require.Equal(t, int32(0), m2["test-agg-metric"])
require.Equal(t, int64(1), cm.Count())
}
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ go_library(
"//pkg/util/memzipper",
"//pkg/util/metamorphic",
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
"//pkg/util/mon",
"//pkg/util/optional",
"//pkg/util/pretty",
Expand Down
100 changes: 54 additions & 46 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/sentryutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -574,27 +575,27 @@ func makeMetrics(internal bool, sv *settings.Values) Metrics {
Duration: 6 * metricsSampleInterval,
BucketConfig: metric.IOLatencyBuckets,
}),
SQLServiceLatency: metric.NewHistogram(metric.HistogramOptions{
SQLServiceLatency: aggmetric.NewHistogramWithCacheStorage(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: getMetricMeta(MetaSQLServiceLatency, internal),
Duration: 6 * metricsSampleInterval,
BucketConfig: metric.IOLatencyBuckets,
}),
SQLTxnLatency: metric.NewHistogram(metric.HistogramOptions{
SQLTxnLatency: aggmetric.NewHistogramWithCacheStorage(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: getMetricMeta(MetaSQLTxnLatency, internal),
Duration: 6 * metricsSampleInterval,
BucketConfig: metric.IOLatencyBuckets,
}),
SQLTxnsOpen: metric.NewGauge(getMetricMeta(MetaSQLTxnsOpen, internal)),
SQLActiveStatements: metric.NewGauge(getMetricMeta(MetaSQLActiveQueries, internal)),
SQLTxnsOpen: aggmetric.NewGaugeWithCacheStorageType(getMetricMeta(MetaSQLTxnsOpen, internal)),
SQLActiveStatements: aggmetric.NewGaugeWithCacheStorageType(getMetricMeta(MetaSQLActiveQueries, internal)),
SQLContendedTxns: metric.NewCounter(getMetricMeta(MetaSQLTxnContended, internal)),

TxnAbortCount: metric.NewCounter(getMetricMeta(MetaTxnAbort, internal)),
FailureCount: metric.NewCounter(getMetricMeta(MetaFailure, internal)),
FailureCount: aggmetric.NewCounterWithCacheStorageType(getMetricMeta(MetaFailure, internal)),
StatementTimeoutCount: metric.NewCounter(getMetricMeta(MetaStatementTimeout, internal)),
TransactionTimeoutCount: metric.NewCounter(getMetricMeta(MetaTransactionTimeout, internal)),
FullTableOrIndexScanCount: metric.NewCounter(getMetricMeta(MetaFullTableOrIndexScan, internal)),
FullTableOrIndexScanCount: aggmetric.NewCounterWithCacheStorageType(getMetricMeta(MetaFullTableOrIndexScan, internal)),
FullTableOrIndexScanRejectedCount: metric.NewCounter(getMetricMeta(MetaFullTableOrIndexScanRejected, internal)),
},
StartedStatementCounters: makeStartedStatementCounters(internal),
Expand Down Expand Up @@ -3015,12 +3016,12 @@ func (ex *connExecutor) execCopyOut(
ctx, cancelQuery = ctxlog.WithCancel(ctx)
queryID := ex.server.cfg.GenerateID()
ex.addActiveQuery(cmd.ParsedStmt, nil /* placeholders */, queryID, cancelQuery)
ex.metrics.EngineMetrics.SQLActiveStatements.Inc(1)
ex.metrics.EngineMetrics.SQLActiveStatements.Inc(1, ex.getLabelValues()...)

defer func() {
ex.removeActiveQuery(queryID, cmd.Stmt)
cancelQuery()
ex.metrics.EngineMetrics.SQLActiveStatements.Dec(1)
ex.metrics.EngineMetrics.SQLActiveStatements.Dec(1, ex.getLabelValues()...)
if !payloadHasError(retPayload) {
ex.incrementExecutedStmtCounter(cmd.Stmt)
}
Expand Down Expand Up @@ -3227,12 +3228,13 @@ func (ex *connExecutor) execCopyIn(
ctx, cancelQuery = ctxlog.WithCancel(ctx)
queryID := ex.server.cfg.GenerateID()
ex.addActiveQuery(cmd.ParsedStmt, nil /* placeholders */, queryID, cancelQuery)
ex.metrics.EngineMetrics.SQLActiveStatements.Inc(1)
ex.metrics.EngineMetrics.SQLActiveStatements.Inc(1, ex.getLabelValues()...)

defer func() {
ex.removeActiveQuery(queryID, cmd.Stmt)
cancelQuery()
ex.metrics.EngineMetrics.SQLActiveStatements.Dec(1)
ex.metrics.EngineMetrics.SQLActiveStatements.Dec(1,
ex.getLabelValues()...)
if !payloadHasError(retPayload) {
ex.incrementExecutedStmtCounter(cmd.Stmt)
}
Expand Down Expand Up @@ -4579,6 +4581,11 @@ func (ex *connExecutor) getDescIDGenerator() eval.DescIDGenerator {
return ex.server.cfg.DescIDGenerator
}

func (ex *connExecutor) getLabelValues() []string {
sessionData := ex.sessionData()
return ex.server.LabelValueConfig.GetLabelValues(sessionData.Database, sessionData.ApplicationName)
}

// StatementCounters groups metrics for counting different types of
// statements.
type StatementCounters struct {
Expand All @@ -4587,17 +4594,17 @@ type StatementCounters struct {
QueryCount telemetry.CounterWithMetric

// Basic CRUD statements.
SelectCount telemetry.CounterWithMetric
UpdateCount telemetry.CounterWithMetric
InsertCount telemetry.CounterWithMetric
DeleteCount telemetry.CounterWithMetric
SelectCount telemetry.CounterWithAggMetric
UpdateCount telemetry.CounterWithAggMetric
InsertCount telemetry.CounterWithAggMetric
DeleteCount telemetry.CounterWithAggMetric
// CRUDQueryCount includes all 4 CRUD statements above.
CRUDQueryCount telemetry.CounterWithMetric
CRUDQueryCount telemetry.CounterWithAggMetric

// Transaction operations.
TxnBeginCount telemetry.CounterWithMetric
TxnCommitCount telemetry.CounterWithMetric
TxnRollbackCount telemetry.CounterWithMetric
TxnBeginCount telemetry.CounterWithAggMetric
TxnCommitCount telemetry.CounterWithAggMetric
TxnRollbackCount telemetry.CounterWithAggMetric
TxnUpgradedCount *metric.Counter

// Transaction XA two-phase commit operations.
Expand Down Expand Up @@ -4638,11 +4645,11 @@ type StatementCounters struct {

func makeStartedStatementCounters(internal bool) StatementCounters {
return StatementCounters{
TxnBeginCount: telemetry.NewCounterWithMetric(
TxnBeginCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaTxnBeginStarted, internal)),
TxnCommitCount: telemetry.NewCounterWithMetric(
TxnCommitCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaTxnCommitStarted, internal)),
TxnRollbackCount: telemetry.NewCounterWithMetric(
TxnRollbackCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaTxnRollbackStarted, internal)),
TxnUpgradedCount: metric.NewCounter(
getMetricMeta(MetaTxnUpgradedFromWeakIsolation, internal)),
Expand All @@ -4664,15 +4671,15 @@ func makeStartedStatementCounters(internal bool) StatementCounters {
getMetricMeta(MetaReleaseSavepointStarted, internal)),
RollbackToSavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaRollbackToSavepointStarted, internal)),
SelectCount: telemetry.NewCounterWithMetric(
SelectCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaSelectStarted, internal)),
UpdateCount: telemetry.NewCounterWithMetric(
UpdateCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaUpdateStarted, internal)),
InsertCount: telemetry.NewCounterWithMetric(
InsertCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaInsertStarted, internal)),
DeleteCount: telemetry.NewCounterWithMetric(
DeleteCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaDeleteStarted, internal)),
CRUDQueryCount: telemetry.NewCounterWithMetric(
CRUDQueryCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaCRUDStarted, internal)),
DdlCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaDdlStarted, internal)),
Expand All @@ -4691,11 +4698,11 @@ func makeStartedStatementCounters(internal bool) StatementCounters {

func makeExecutedStatementCounters(internal bool) StatementCounters {
return StatementCounters{
TxnBeginCount: telemetry.NewCounterWithMetric(
TxnBeginCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaTxnBeginExecuted, internal)),
TxnCommitCount: telemetry.NewCounterWithMetric(
TxnCommitCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaTxnCommitExecuted, internal)),
TxnRollbackCount: telemetry.NewCounterWithMetric(
TxnRollbackCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaTxnRollbackExecuted, internal)),
TxnUpgradedCount: metric.NewCounter(
getMetricMeta(MetaTxnUpgradedFromWeakIsolation, internal)),
Expand All @@ -4717,15 +4724,15 @@ func makeExecutedStatementCounters(internal bool) StatementCounters {
getMetricMeta(MetaReleaseSavepointExecuted, internal)),
RollbackToSavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaRollbackToSavepointExecuted, internal)),
SelectCount: telemetry.NewCounterWithMetric(
SelectCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaSelectExecuted, internal)),
UpdateCount: telemetry.NewCounterWithMetric(
UpdateCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaUpdateExecuted, internal)),
InsertCount: telemetry.NewCounterWithMetric(
InsertCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaInsertExecuted, internal)),
DeleteCount: telemetry.NewCounterWithMetric(
DeleteCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaDeleteExecuted, internal)),
CRUDQueryCount: telemetry.NewCounterWithMetric(
CRUDQueryCount: telemetry.NewCounterWithAggMetric(
getMetricMeta(MetaCRUDExecuted, internal)),
DdlCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaDdlExecuted, internal)),
Expand All @@ -4744,30 +4751,31 @@ func makeExecutedStatementCounters(internal bool) StatementCounters {

func (sc *StatementCounters) incrementCount(ex *connExecutor, stmt tree.Statement) {
sc.QueryCount.Inc()
labelValues := ex.getLabelValues()
switch t := stmt.(type) {
case *tree.BeginTransaction:
sc.TxnBeginCount.Inc()
sc.TxnBeginCount.Inc(labelValues...)
case *tree.Select:
sc.SelectCount.Inc()
sc.CRUDQueryCount.Inc()
sc.SelectCount.Inc(labelValues...)
sc.CRUDQueryCount.Inc(labelValues...)
case *tree.Update:
sc.UpdateCount.Inc()
sc.CRUDQueryCount.Inc()
sc.UpdateCount.Inc(labelValues...)
sc.CRUDQueryCount.Inc(labelValues...)
case *tree.Insert:
sc.InsertCount.Inc()
sc.CRUDQueryCount.Inc()
sc.InsertCount.Inc(labelValues...)
sc.CRUDQueryCount.Inc(labelValues...)
case *tree.Delete:
sc.DeleteCount.Inc()
sc.CRUDQueryCount.Inc()
sc.DeleteCount.Inc(labelValues...)
sc.CRUDQueryCount.Inc(labelValues...)
case *tree.CommitTransaction:
sc.TxnCommitCount.Inc()
sc.TxnCommitCount.Inc(labelValues...)
case *tree.RollbackTransaction:
// The CommitWait state means that the transaction has already committed
// after a specially handled `RELEASE SAVEPOINT cockroach_restart` command.
if ex.getTransactionState() == CommitWaitStateStr {
sc.TxnCommitCount.Inc()
sc.TxnCommitCount.Inc(labelValues...)
} else {
sc.TxnRollbackCount.Inc()
sc.TxnRollbackCount.Inc(labelValues...)
}
case *tree.PrepareTransaction:
sc.TxnPrepareCount.Inc()
Expand Down
Loading