Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 175 additions & 43 deletions daemon/ccotel_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"encoding/json"
"log/slog"
"os"
"time"
"strconv"

"github.com/google/uuid"
"github.com/malamtime/cli/model"
Expand Down Expand Up @@ -54,14 +54,15 @@ func (p *CCOtelProcessor) ProcessMetrics(ctx context.Context, req *collmetricsv1
continue
}

session := extractSessionFromResource(resource)
// Extract resource attributes once for all metrics in this resource
resourceAttrs := extractResourceAttributes(resource)
project := p.detectProject(resource)

var metrics []model.CCOtelMetric

for _, sm := range rm.GetScopeMetrics() {
for _, m := range sm.GetMetrics() {
parsedMetrics := p.parseMetric(m)
parsedMetrics := p.parseMetric(m, resourceAttrs)
metrics = append(metrics, parsedMetrics...)
}
}
Expand All @@ -70,11 +71,10 @@ func (p *CCOtelProcessor) ProcessMetrics(ctx context.Context, req *collmetricsv1
continue
}

// Build and send request immediately
// Build and send request immediately - flat structure without session
ccReq := &model.CCOtelRequest{
Host: p.hostname,
Project: project,
Session: session,
Metrics: metrics,
}

Expand Down Expand Up @@ -103,14 +103,15 @@ func (p *CCOtelProcessor) ProcessLogs(ctx context.Context, req *collogsv1.Export
continue
}

session := extractSessionFromResource(resource)
// Extract resource attributes once for all events in this resource
resourceAttrs := extractResourceAttributes(resource)
project := p.detectProject(resource)

var events []model.CCOtelEvent

for _, sl := range rl.GetScopeLogs() {
for _, lr := range sl.GetLogRecords() {
event := p.parseLogRecord(lr)
event := p.parseLogRecord(lr, resourceAttrs)
if event != nil {
events = append(events, *event)
}
Expand All @@ -121,11 +122,10 @@ func (p *CCOtelProcessor) ProcessLogs(ctx context.Context, req *collogsv1.Export
continue
}

// Build and send request immediately
// Build and send request immediately - flat structure without session
ccReq := &model.CCOtelRequest{
Host: p.hostname,
Project: project,
Session: session,
Events: events,
}

Expand Down Expand Up @@ -155,50 +155,105 @@ func isClaudeCodeResource(resource *resourcev1.Resource) bool {
return false
}

// extractSessionFromResource extracts session info from resource attributes
func extractSessionFromResource(resource *resourcev1.Resource) *model.CCOtelSession {
session := &model.CCOtelSession{
StartedAt: time.Now().Unix(),
}
// extractResourceAttributes extracts resource-level attributes from OTEL resource
// Returns a struct that can be used to populate metrics and events
func extractResourceAttributes(resource *resourcev1.Resource) *model.CCOtelResourceAttributes {
attrs := &model.CCOtelResourceAttributes{}

if resource == nil {
return session
return attrs
}

for _, attr := range resource.GetAttributes() {
key := attr.GetKey()
value := attr.GetValue()

switch key {
// Standard resource attributes
case "session.id":
session.SessionID = value.GetStringValue()
attrs.SessionID = value.GetStringValue()
case "app.version":
session.AppVersion = value.GetStringValue()
attrs.AppVersion = value.GetStringValue()
case "organization.id":
session.OrganizationID = value.GetStringValue()
attrs.OrganizationID = value.GetStringValue()
case "user.account_uuid":
session.UserAccountUUID = value.GetStringValue()
attrs.UserAccountUUID = value.GetStringValue()
case "terminal.type":
session.TerminalType = value.GetStringValue()
attrs.TerminalType = value.GetStringValue()
case "service.version":
session.ServiceVersion = value.GetStringValue()
attrs.ServiceVersion = value.GetStringValue()
case "os.type":
session.OSType = value.GetStringValue()
attrs.OSType = value.GetStringValue()
case "os.version":
session.OSVersion = value.GetStringValue()
attrs.OSVersion = value.GetStringValue()
case "host.arch":
session.HostArch = value.GetStringValue()
attrs.HostArch = value.GetStringValue()
case "wsl.version":
session.WSLVersion = value.GetStringValue()
attrs.WSLVersion = value.GetStringValue()
// Additional identifiers
case "user.id":
attrs.UserID = value.GetStringValue()
case "user.email":
attrs.UserEmail = value.GetStringValue()
// Custom resource attributes (from OTEL_RESOURCE_ATTRIBUTES)
case "user.name":
attrs.UserName = value.GetStringValue()
case "machine.name":
attrs.MachineName = value.GetStringValue()
case "team.id":
attrs.TeamID = value.GetStringValue()
case "pwd":
attrs.Pwd = value.GetStringValue()
}
}

// Generate session ID if not present
if session.SessionID == "" {
session.SessionID = uuid.New().String()
}
return attrs
}
Comment on lines +160 to +211
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The previous extractSessionFromResource function would generate a new UUID for the session ID if one wasn't provided in the resource attributes. This behavior seems to have been lost in the refactoring. If a sessionId is expected to be present on all events and metrics, you might want to reintroduce this fallback logic to prevent potential issues downstream. For example, by adding if attrs.SessionID == "" { attrs.SessionID = uuid.New().String() } before returning from this function.


return session
// applyResourceAttributesToMetric copies resource attributes into a metric
func applyResourceAttributesToMetric(metric *model.CCOtelMetric, attrs *model.CCOtelResourceAttributes) {
// Standard resource attributes
metric.SessionID = attrs.SessionID
metric.UserAccountUUID = attrs.UserAccountUUID
metric.OrganizationID = attrs.OrganizationID
metric.TerminalType = attrs.TerminalType
metric.AppVersion = attrs.AppVersion
metric.OSType = attrs.OSType
metric.OSVersion = attrs.OSVersion
metric.HostArch = attrs.HostArch

// Additional identifiers
metric.UserID = attrs.UserID
metric.UserEmail = attrs.UserEmail

// Custom resource attributes
metric.UserName = attrs.UserName
metric.MachineName = attrs.MachineName
metric.TeamID = attrs.TeamID
metric.Pwd = attrs.Pwd
}

// applyResourceAttributesToEvent copies resource attributes into an event
func applyResourceAttributesToEvent(event *model.CCOtelEvent, attrs *model.CCOtelResourceAttributes) {
// Standard resource attributes
event.SessionID = attrs.SessionID
event.UserAccountUUID = attrs.UserAccountUUID
event.OrganizationID = attrs.OrganizationID
event.TerminalType = attrs.TerminalType
event.AppVersion = attrs.AppVersion
event.OSType = attrs.OSType
event.OSVersion = attrs.OSVersion
event.HostArch = attrs.HostArch

// Additional identifiers
event.UserID = attrs.UserID
event.UserEmail = attrs.UserEmail

// Custom resource attributes
event.UserName = attrs.UserName
event.MachineName = attrs.MachineName
event.TeamID = attrs.TeamID
event.Pwd = attrs.Pwd
}

// detectProject extracts project from resource attributes or environment
Expand All @@ -224,7 +279,7 @@ func (p *CCOtelProcessor) detectProject(resource *resourcev1.Resource) string {
}

// parseMetric parses an OTEL metric into CCOtelMetric(s)
func (p *CCOtelProcessor) parseMetric(m *metricsv1.Metric) []model.CCOtelMetric {
func (p *CCOtelProcessor) parseMetric(m *metricsv1.Metric, resourceAttrs *model.CCOtelResourceAttributes) []model.CCOtelMetric {
var metrics []model.CCOtelMetric

name := m.GetName()
Expand All @@ -243,7 +298,9 @@ func (p *CCOtelProcessor) parseMetric(m *metricsv1.Metric) []model.CCOtelMetric
Timestamp: int64(dp.GetTimeUnixNano() / 1e9), // Convert to seconds
Value: getDataPointValue(dp),
}
// Extract attributes
// Apply resource attributes first
applyResourceAttributesToMetric(&metric, resourceAttrs)
// Then extract data point attributes (can override resource attrs)
for _, attr := range dp.GetAttributes() {
applyMetricAttribute(&metric, attr, metricType)
}
Expand All @@ -257,6 +314,9 @@ func (p *CCOtelProcessor) parseMetric(m *metricsv1.Metric) []model.CCOtelMetric
Timestamp: int64(dp.GetTimeUnixNano() / 1e9),
Value: getDataPointValue(dp),
}
// Apply resource attributes first
applyResourceAttributesToMetric(&metric, resourceAttrs)
// Then extract data point attributes (can override resource attrs)
for _, attr := range dp.GetAttributes() {
applyMetricAttribute(&metric, attr, metricType)
}
Expand All @@ -268,34 +328,39 @@ func (p *CCOtelProcessor) parseMetric(m *metricsv1.Metric) []model.CCOtelMetric
}

// parseLogRecord parses an OTEL log record into a CCOtelEvent
func (p *CCOtelProcessor) parseLogRecord(lr *logsv1.LogRecord) *model.CCOtelEvent {
func (p *CCOtelProcessor) parseLogRecord(lr *logsv1.LogRecord, resourceAttrs *model.CCOtelResourceAttributes) *model.CCOtelEvent {
event := &model.CCOtelEvent{
EventID: uuid.New().String(),
Timestamp: int64(lr.GetTimeUnixNano() / 1e9), // Convert to seconds
}

// Extract event type and other attributes
// Apply resource attributes first
applyResourceAttributesToEvent(event, resourceAttrs)

// Extract event type and other attributes from log record
for _, attr := range lr.GetAttributes() {
key := attr.GetKey()
value := attr.GetValue()

switch key {
case "event.name":
event.EventType = mapEventName(value.GetStringValue())
case "event.timestamp":
event.EventTimestamp = value.GetStringValue()
case "model":
event.Model = value.GetStringValue()
case "cost_usd":
event.CostUSD = value.GetDoubleValue()
event.CostUSD = getFloatFromValue(value)
case "duration_ms":
event.DurationMs = int(value.GetIntValue())
event.DurationMs = getIntFromValue(value)
case "input_tokens":
event.InputTokens = int(value.GetIntValue())
event.InputTokens = getIntFromValue(value)
case "output_tokens":
event.OutputTokens = int(value.GetIntValue())
event.OutputTokens = getIntFromValue(value)
case "cache_read_tokens":
event.CacheReadTokens = int(value.GetIntValue())
event.CacheReadTokens = getIntFromValue(value)
case "cache_creation_tokens":
event.CacheCreationTokens = int(value.GetIntValue())
event.CacheCreationTokens = getIntFromValue(value)
case "tool_name":
event.ToolName = value.GetStringValue()
case "success":
Expand All @@ -307,7 +372,7 @@ func (p *CCOtelProcessor) parseLogRecord(lr *logsv1.LogRecord) *model.CCOtelEven
case "error":
event.Error = value.GetStringValue()
case "prompt_length":
event.PromptLength = int(value.GetIntValue())
event.PromptLength = getIntFromValue(value)
case "prompt":
event.Prompt = value.GetStringValue()
case "tool_parameters":
Expand All @@ -321,11 +386,26 @@ func (p *CCOtelProcessor) parseLogRecord(lr *logsv1.LogRecord) *model.CCOtelEven
}
}
case "status_code":
event.StatusCode = int(value.GetIntValue())
event.StatusCode = getIntFromValue(value)
case "attempt":
event.Attempt = int(value.GetIntValue())
event.Attempt = getIntFromValue(value)
case "language":
event.Language = value.GetStringValue()
// Log record level attributes that override resource attrs
case "user.id":
event.UserID = value.GetStringValue()
case "user.email":
event.UserEmail = value.GetStringValue()
case "session.id":
event.SessionID = value.GetStringValue()
case "app.version":
event.AppVersion = value.GetStringValue()
case "organization.id":
event.OrganizationID = value.GetStringValue()
case "user.account_uuid":
event.UserAccountUUID = value.GetStringValue()
case "terminal.type":
event.TerminalType = value.GetStringValue()
}
}

Expand Down Expand Up @@ -391,6 +471,36 @@ func getDataPointValue(dp *metricsv1.NumberDataPoint) float64 {
}
}

// getIntFromValue extracts an int from an OTEL value, handling both int and string formats
func getIntFromValue(value *commonv1.AnyValue) int {
// First try to get as int
if intVal := value.GetIntValue(); intVal != 0 {
return int(intVal)
}
// Try to parse from string (Claude Code sends some values as strings)
if strVal := value.GetStringValue(); strVal != "" {
if parsed, err := strconv.Atoi(strVal); err == nil {
return parsed
}
}
return 0
}
Comment on lines +475 to +487
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation of getIntFromValue relies on checking for a non-zero value from value.GetIntValue(), which is not a robust way to determine the type of a commonv1.AnyValue. This can be ambiguous if 0 is a valid integer value. The idiomatic and safer approach for handling protobuf oneof fields is to use a type switch on the Value field. This makes the code cleaner, more readable, and less prone to breaking with future library changes. A similar improvement can be applied to getFloatFromValue.

Suggested change
func getIntFromValue(value *commonv1.AnyValue) int {
// First try to get as int
if intVal := value.GetIntValue(); intVal != 0 {
return int(intVal)
}
// Try to parse from string (Claude Code sends some values as strings)
if strVal := value.GetStringValue(); strVal != "" {
if parsed, err := strconv.Atoi(strVal); err == nil {
return parsed
}
}
return 0
}
func getIntFromValue(value *commonv1.AnyValue) int {
if value == nil {
return 0
}
switch v := value.Value.(type) {
case *commonv1.AnyValue_IntValue:
return int(v.IntValue)
case *commonv1.AnyValue_StringValue:
if parsed, err := strconv.Atoi(v.StringValue); err == nil {
return parsed
}
}
return 0
}


// getFloatFromValue extracts a float64 from an OTEL value, handling both double and string formats
func getFloatFromValue(value *commonv1.AnyValue) float64 {
// First try to get as double
if doubleVal := value.GetDoubleValue(); doubleVal != 0 {
return doubleVal
}
// Try to parse from string (Claude Code sends some values as strings)
if strVal := value.GetStringValue(); strVal != "" {
if parsed, err := strconv.ParseFloat(strVal, 64); err == nil {
return parsed
}
}
return 0
}

// applyMetricAttribute applies an attribute to a metric
func applyMetricAttribute(metric *model.CCOtelMetric, attr *commonv1.KeyValue, metricType string) {
key := attr.GetKey()
Expand All @@ -411,5 +521,27 @@ func applyMetricAttribute(metric *model.CCOtelMetric, attr *commonv1.KeyValue, m
metric.Decision = value.GetStringValue()
case "language":
metric.Language = value.GetStringValue()
// Resource attributes at data point level - apply them (override if already set from resource)
case "session.id":
metric.SessionID = value.GetStringValue()
case "user.account_uuid":
metric.UserAccountUUID = value.GetStringValue()
case "organization.id":
metric.OrganizationID = value.GetStringValue()
case "terminal.type":
metric.TerminalType = value.GetStringValue()
case "app.version":
metric.AppVersion = value.GetStringValue()
case "os.type":
metric.OSType = value.GetStringValue()
case "os.version":
metric.OSVersion = value.GetStringValue()
case "host.arch":
metric.HostArch = value.GetStringValue()
// Additional identifiers at data point level
case "user.id":
metric.UserID = value.GetStringValue()
case "user.email":
metric.UserEmail = value.GetStringValue()
}
}
Loading
Loading