diff --git a/brain/config.go b/brain/config.go old mode 100644 new mode 100755 diff --git a/brain/guard.go b/brain/guard.go old mode 100644 new mode 100755 diff --git a/brain/llm/metrics.go b/brain/llm/metrics.go old mode 100644 new mode 100755 diff --git a/brain/visual.go b/brain/visual.go new file mode 100644 index 00000000..ba60e327 --- /dev/null +++ b/brain/visual.go @@ -0,0 +1,99 @@ +package brain + +import ( + "fmt" + + "github.com/hrygo/hotplex/provider" +) + +// Visualizer translates provider events into human-readable messages for UI display. +type Visualizer struct{} + +// NewVisualizer creates a new Visualizer. +func NewVisualizer() *Visualizer { + return &Visualizer{} +} + +// TranslateEvent translates a provider event into a human-readable message. +// It uses session context to provide more accurate translations. +func (v *Visualizer) TranslateEvent(evt *provider.ProviderEvent, platform, taskType string) string { + switch evt.Type { + case provider.EventTypeToolUse: + return v.translateToolUse(evt, platform) + case provider.EventTypeThinking: + return v.translateThinking(evt, platform) + case provider.EventTypeResult: + return v.translateResult(evt) + case provider.EventTypeError: + return v.translateError(evt) + default: + return "" + } +} + +// translateToolUse translates a tool use event. +func (v *Visualizer) translateToolUse(evt *provider.ProviderEvent, platform string) string { + toolName := evt.ToolName + if toolName == "" { + toolName = "unknown tool" + } + + switch platform { + case "slack": + return fmt.Sprintf("πŸ”§ Executing %s...", toolName) + case "feishu": + return fmt.Sprintf("πŸ”§ ζ­£εœ¨ζ‰§θ‘Œ %s...", toolName) + default: + return fmt.Sprintf("Executing %s...", toolName) + } +} + +// translateThinking translates a thinking event. +func (v *Visualizer) translateThinking(evt *provider.ProviderEvent, platform string) string { + switch platform { + case "slack": + return "πŸ€” Thinking..." + case "feishu": + return "πŸ€” 思考中..." + default: + return "Thinking..." + } +} + +// translateResult translates a result event. +func (v *Visualizer) translateResult(evt *provider.ProviderEvent) string { + if evt.Metadata != nil && evt.Metadata.TotalDurationMs > 0 { + return fmt.Sprintf("βœ“ Completed in %dms", evt.Metadata.TotalDurationMs) + } + return "βœ“ Completed" +} + +// translateError translates an error event. +func (v *Visualizer) translateError(evt *provider.ProviderEvent) string { + errMsg := evt.Error + if errMsg == "" { + return "❌ An error occurred" + } + if len(errMsg) > 100 { + errMsg = errMsg[:100] + "..." + } + return fmt.Sprintf("❌ Error: %s", errMsg) +} + +// GetTaskTypeSummary returns a brief summary of the task type for display. +func (v *Visualizer) GetTaskTypeSummary(taskType string) string { + switch taskType { + case "code": + return "πŸ’» Code" + case "chat": + return "πŸ’¬ Chat" + case "analysis": + return "πŸ“Š Analysis" + case "debug": + return "πŸ› Debug" + case "git": + return "πŸ”€ Git" + default: + return "❓ Unknown" + } +} diff --git a/chatapps/engine_handler.go b/chatapps/engine_handler.go index 1b6b61c1..f7dd30ae 100644 --- a/chatapps/engine_handler.go +++ b/chatapps/engine_handler.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "regexp" "strings" "sync" "time" @@ -15,6 +16,7 @@ import ( intengine "github.com/hrygo/hotplex/internal/engine" "github.com/hrygo/hotplex/provider" "github.com/hrygo/hotplex/types" + "github.com/hrygo/hotplex/internal/trace" ) const ( @@ -1422,6 +1424,26 @@ func (h *EngineMessageHandler) Handle(ctx context.Context, msg *ChatMessage) err TaskInstructions: fullInstructions, } + // Fill observability fields from message context + cfg.Platform = msg.Platform + cfg.UserID = msg.UserID + if channelID, ok := msg.Metadata["channel_id"].(string); ok { + cfg.ChannelID = channelID + } + if teamID, ok := msg.Metadata["team_id"].(string); ok { + cfg.TeamID = teamID + } + // Task type detection based on prompt content + cfg.TaskType = detectTaskType(msg.Content) + // Generate trace ID for distributed tracing + cfg.TraceID = trace.GenerateSimple() + // Summarize first prompt for debugging + if len(msg.Content) > 100 { + cfg.PromptSummary = msg.Content[:100] + "..." + } else { + cfg.PromptSummary = msg.Content + } + // Get platform-specific operations from AdapterManager messageOps := h.adapters.GetMessageOperations(msg.Platform) sessionOps := h.adapters.GetSessionOperations(msg.Platform) @@ -1737,3 +1759,39 @@ func formatDataLength(bytes int64) string { } return fmt.Sprintf("%d bytes", bytes) } + +// Task type detection patterns +var ( + codePatterns = regexp.MustCompile(`(?i)(write|create|fix|debug|implement|refactor|add|remove|update|code|function|class|method|script|api|endpoint|query|sql|test|spec|deploy|build|compile|run|execute)`) + gitPatterns = regexp.MustCompile(`(?i)(git|commit|push|pull|merge|branch|checkout|rebase|clone|fetch|status|log|diff|reset|cherry|tag|stash|init)`) + debugPatterns = regexp.MustCompile(`(?i)(debug|error|bug|exception|stack|trace|issue|problem|fail|crash|broken|fix|repair)`) + analysisPatterns = regexp.MustCompile(`(?i)(analyze|analyse|review|explain|describe|what|how|why|compare|summary|extract|parse|crawl|fetch|fetch|get|list|search|find|look)`) +) + +// detectTaskType detects the task type based on prompt content +func detectTaskType(prompt string) string { + lowerPrompt := strings.ToLower(prompt) + + // Check for code-related tasks + if codePatterns.MatchString(lowerPrompt) && !gitPatterns.MatchString(lowerPrompt) { + return "code" + } + + // Check for git tasks + if gitPatterns.MatchString(lowerPrompt) { + return "git" + } + + // Check for debug tasks + if debugPatterns.MatchString(lowerPrompt) { + return "debug" + } + + // Check for analysis/review tasks + if analysisPatterns.MatchString(lowerPrompt) { + return "analysis" + } + + // Default to chat + return "chat" +} diff --git a/chatapps/slack/builder.go b/chatapps/slack/builder.go old mode 100644 new mode 100755 diff --git a/chatapps/slack/streaming_writer.go b/chatapps/slack/streaming_writer.go old mode 100644 new mode 100755 diff --git a/engine/runner.go b/engine/runner.go index 86e9af73..ea008f50 100644 --- a/engine/runner.go +++ b/engine/runner.go @@ -155,7 +155,12 @@ func (r *Engine) Execute(ctx context.Context, cfg *types.Config, prompt string, r.logger.Info("Engine: starting execution pipeline", "namespace", r.opts.Namespace, - "session_id", cfg.SessionID) + "session_id", cfg.SessionID, + "platform", cfg.Platform, + "user_id", cfg.UserID, + "channel_id", cfg.ChannelID, + "task_type", cfg.TaskType, + "trace_id", cfg.TraceID) // Execute via multiplexed persistent session if err := r.executeWithMultiplex(ctx, cfg, prompt, callback); err != nil { @@ -168,7 +173,10 @@ func (r *Engine) Execute(ctx context.Context, cfg *types.Config, prompt string, r.logger.Info("Engine: Session completed", "namespace", r.opts.Namespace, - "session_id", cfg.SessionID) + "session_id", cfg.SessionID, + "platform", cfg.Platform, + "task_type", cfg.TaskType, + "trace_id", cfg.TraceID) return nil } diff --git a/internal/secrets/provider.go b/internal/secrets/provider.go old mode 100644 new mode 100755 diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go old mode 100644 new mode 100755 index ba37e770..49d9e2e7 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -15,6 +15,13 @@ type Metrics struct { dangersBlocked int64 requestDuration time.Duration + // Dimensioned metrics (platform, task_type) + sessionDurationBuckets map[sessionKey]durationBucket + sessionTurns map[sessionKey]int64 + sessionErrors map[sessionKey]int64 + toolsInvokedByType map[sessionKey]int64 + sessionTokens map[sessionKey]int64 + // Slack permission metrics slackPermissionAllowed int64 slackPermissionBlockedUser int64 @@ -23,6 +30,17 @@ type Metrics struct { mu sync.RWMutex } +type sessionKey struct { + platform string + taskType string + direction string // input/output for tokens +} + +type durationBucket struct { + sum time.Duration + count int64 +} + var ( globalMetrics *Metrics globalMetricsMu sync.Once @@ -32,7 +50,14 @@ func NewMetrics(logger *slog.Logger) *Metrics { if logger == nil { logger = slog.Default() } - return &Metrics{logger: logger} + return &Metrics{ + logger: logger, + sessionDurationBuckets: make(map[sessionKey]durationBucket), + sessionTurns: make(map[sessionKey]int64), + sessionErrors: make(map[sessionKey]int64), + toolsInvokedByType: make(map[sessionKey]int64), + sessionTokens: make(map[sessionKey]int64), + } } func (m *Metrics) IncSessionsActive() { @@ -134,6 +159,52 @@ func (m *Metrics) IncSlackPermissionBlockedMention() { m.mu.Unlock() } +// Dimensioned Metrics Methods + +// RecordSessionDuration records session duration with platform and task_type dimensions. +func (m *Metrics) RecordSessionDuration(platform, taskType string, duration time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + key := sessionKey{platform: platform, taskType: taskType} + bucket := m.sessionDurationBuckets[key] + bucket.sum += duration + bucket.count++ + m.sessionDurationBuckets[key] = bucket +} + +// IncSessionTurns increments turn count with platform and task_type dimensions. +func (m *Metrics) IncSessionTurns(platform, taskType string) { + m.mu.Lock() + defer m.mu.Unlock() + key := sessionKey{platform: platform, taskType: taskType} + m.sessionTurns[key]++ +} + +// IncSessionErrorsByType increments error count with platform and task_type dimensions. +func (m *Metrics) IncSessionErrorsByType(platform, taskType string) { + m.mu.Lock() + defer m.mu.Unlock() + key := sessionKey{platform: platform, taskType: taskType} + m.sessionErrors[key]++ +} + +// IncToolsInvokedByType increments tools invoked with platform and task_type dimensions. +func (m *Metrics) IncToolsInvokedByType(platform, taskType string) { + m.mu.Lock() + defer m.mu.Unlock() + key := sessionKey{platform: platform, taskType: taskType} + m.toolsInvokedByType[key]++ +} + +// RecordTokens records token consumption with platform and task_type dimensions. +// direction should be "input" or "output". +func (m *Metrics) RecordTokens(platform, taskType, direction string, tokens int64) { + m.mu.Lock() + defer m.mu.Unlock() + key := sessionKey{platform: platform, taskType: taskType, direction: direction} + m.sessionTokens[key] += tokens +} + func InitMetrics(logger *slog.Logger) { globalMetrics = NewMetrics(logger) } diff --git a/internal/trace/traceid.go b/internal/trace/traceid.go new file mode 100644 index 00000000..56a30ed1 --- /dev/null +++ b/internal/trace/traceid.go @@ -0,0 +1,44 @@ +package trace + +import ( + "crypto/rand" + "encoding/hex" + "strings" + "time" +) + +// Generator generates trace IDs for distributed tracing. +type Generator struct { + prefix string +} + +// New creates a new trace ID generator with optional prefix. +func New(prefix string) *Generator { + if prefix == "" { + prefix = "hotplex" + } + return &Generator{prefix: prefix} +} + +// Generate generates a new trace ID. +// Format: {prefix}-{timestamp}-{random} +// Example: hotplex-6076b8cb-8a2f4d3e +func (g *Generator) Generate() string { + // Get current timestamp in hex (8 chars) + timestamp := time.Now().UnixNano() >> 20 // Shift to get ~8 hex chars + + // Generate 8 random bytes (16 hex chars) + randomBytes := make([]byte, 8) + rand.Read(randomBytes) + randomHex := hex.EncodeToString(randomBytes)[:8] + + ts := strings.TrimLeft(hex.EncodeToString([]byte{byte(timestamp >> 24), byte(timestamp >> 16), byte(timestamp >> 8), byte(timestamp)}), "0") + return strings.Join([]string{g.prefix, ts, randomHex}, "-") +} + +// GenerateSimple generates a simple 16-character hex trace ID. +func GenerateSimple() string { + bytes := make([]byte, 8) + rand.Read(bytes) + return hex.EncodeToString(bytes)[:16] +} diff --git a/plugins/storage/config.go b/plugins/storage/config.go old mode 100644 new mode 100755 index cc713aeb..506566bb --- a/plugins/storage/config.go +++ b/plugins/storage/config.go @@ -70,7 +70,10 @@ func ExportToJSON(store ChatAppMessageStore, outputPath string, query *MessageQu return fmt.Errorf("failed to marshal messages: %w", err) } - return os.WriteFile(outputPath, data, 0644) + if err := os.WriteFile(outputPath, data, 0644); err != nil { + return fmt.Errorf("failed to write export file: %w", err) + } + return nil } // ImportFromJSON 从 JSON ε―Όε…₯梈息 @@ -129,5 +132,8 @@ func BackupStorage(store ChatAppMessageStore, backupPath string) error { return fmt.Errorf("failed to marshal backup: %w", err) } - return os.WriteFile(backupPath, data, 0644) + if err := os.WriteFile(backupPath, data, 0644); err != nil { + return fmt.Errorf("failed to write backup file: %w", err) + } + return nil } diff --git a/plugins/storage/interface.go b/plugins/storage/interface.go old mode 100644 new mode 100755 diff --git a/provider/claude_provider.go b/provider/claude_provider.go old mode 100644 new mode 100755 diff --git a/provider/pi_provider.go b/provider/pi_provider.go old mode 100644 new mode 100755 diff --git a/provider/pi_provider_test.go b/provider/pi_provider_test.go old mode 100644 new mode 100755 diff --git a/provider/provider.go b/provider/provider.go old mode 100644 new mode 100755 diff --git a/types/types.go b/types/types.go index c3702eeb..aee78d5b 100644 --- a/types/types.go +++ b/types/types.go @@ -109,4 +109,13 @@ type Config struct { SessionID string // Unique identifier used to route the request to a persistent process in the pool TaskInstructions string // Per-task instructions or objective prepended to the user prompt WAFApproved bool // When true, Engine skips WAF check (already approved by chatapps layer) + + // SessionContext for observability + Platform string // slack/feishu/discord/telegram + UserID string // User identifier + ChannelID string // Source channel/group + TeamID string // Team/workspace identifier + TaskType string // code/chat/analysis/debug/git + TraceID string // OpenTelemetry TraceID + PromptSummary string // First prompt summary }