Skip to content
Empty file modified brain/config.go
100644 → 100755
Empty file.
Empty file modified brain/guard.go
100644 → 100755
Empty file.
Empty file modified brain/llm/metrics.go
100644 → 100755
Empty file.
99 changes: 99 additions & 0 deletions brain/visual.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package brain

import (
"fmt"

"github.com/hrygo/hotplex/provider"
)

// Visualizer translates provider events into human-readable messages for UI display.
type Visualizer struct{}

// NewVisualizer creates a new Visualizer.
func NewVisualizer() *Visualizer {
return &Visualizer{}
}

// TranslateEvent translates a provider event into a human-readable message.
// It uses session context to provide more accurate translations.
func (v *Visualizer) TranslateEvent(evt *provider.ProviderEvent, platform, taskType string) string {
switch evt.Type {
case provider.EventTypeToolUse:
return v.translateToolUse(evt, platform)
case provider.EventTypeThinking:
return v.translateThinking(evt, platform)
case provider.EventTypeResult:
return v.translateResult(evt)
case provider.EventTypeError:
return v.translateError(evt)
default:
return ""
}
}

// translateToolUse translates a tool use event.
func (v *Visualizer) translateToolUse(evt *provider.ProviderEvent, platform string) string {
toolName := evt.ToolName
if toolName == "" {
toolName = "unknown tool"
}

switch platform {
case "slack":
return fmt.Sprintf("🔧 Executing %s...", toolName)
case "feishu":
return fmt.Sprintf("🔧 正在执行 %s...", toolName)
default:
return fmt.Sprintf("Executing %s...", toolName)
}
}

// translateThinking translates a thinking event.
func (v *Visualizer) translateThinking(evt *provider.ProviderEvent, platform string) string {
switch platform {
case "slack":
return "🤔 Thinking..."
case "feishu":
return "🤔 思考中..."
default:
return "Thinking..."
}
}

// translateResult translates a result event.
func (v *Visualizer) translateResult(evt *provider.ProviderEvent) string {
if evt.Metadata != nil && evt.Metadata.TotalDurationMs > 0 {
return fmt.Sprintf("✓ Completed in %dms", evt.Metadata.TotalDurationMs)
}
return "✓ Completed"
}

// translateError translates an error event.
func (v *Visualizer) translateError(evt *provider.ProviderEvent) string {
errMsg := evt.Error
if errMsg == "" {
return "❌ An error occurred"
}
if len(errMsg) > 100 {
errMsg = errMsg[:100] + "..."
}
return fmt.Sprintf("❌ Error: %s", errMsg)
}

// GetTaskTypeSummary returns a brief summary of the task type for display.
func (v *Visualizer) GetTaskTypeSummary(taskType string) string {
switch taskType {
case "code":
return "💻 Code"
case "chat":
return "💬 Chat"
case "analysis":
return "📊 Analysis"
case "debug":
return "🐛 Debug"
case "git":
return "🔀 Git"
default:
return "❓ Unknown"
}
}
58 changes: 58 additions & 0 deletions chatapps/engine_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"regexp"
"strings"
"sync"
"time"
Expand All @@ -15,6 +16,7 @@ import (
intengine "github.com/hrygo/hotplex/internal/engine"
"github.com/hrygo/hotplex/provider"
"github.com/hrygo/hotplex/types"
"github.com/hrygo/hotplex/internal/trace"
)

const (
Expand Down Expand Up @@ -1422,6 +1424,26 @@ func (h *EngineMessageHandler) Handle(ctx context.Context, msg *ChatMessage) err
TaskInstructions: fullInstructions,
}

// Fill observability fields from message context
cfg.Platform = msg.Platform
cfg.UserID = msg.UserID
if channelID, ok := msg.Metadata["channel_id"].(string); ok {
cfg.ChannelID = channelID
}
if teamID, ok := msg.Metadata["team_id"].(string); ok {
cfg.TeamID = teamID
}
// Task type detection based on prompt content
cfg.TaskType = detectTaskType(msg.Content)
// Generate trace ID for distributed tracing
cfg.TraceID = trace.GenerateSimple()
// Summarize first prompt for debugging
if len(msg.Content) > 100 {
cfg.PromptSummary = msg.Content[:100] + "..."
} else {
cfg.PromptSummary = msg.Content
}

// Get platform-specific operations from AdapterManager
messageOps := h.adapters.GetMessageOperations(msg.Platform)
sessionOps := h.adapters.GetSessionOperations(msg.Platform)
Expand Down Expand Up @@ -1737,3 +1759,39 @@ func formatDataLength(bytes int64) string {
}
return fmt.Sprintf("%d bytes", bytes)
}

// Task type detection patterns
var (
codePatterns = regexp.MustCompile(`(?i)(write|create|fix|debug|implement|refactor|add|remove|update|code|function|class|method|script|api|endpoint|query|sql|test|spec|deploy|build|compile|run|execute)`)
gitPatterns = regexp.MustCompile(`(?i)(git|commit|push|pull|merge|branch|checkout|rebase|clone|fetch|status|log|diff|reset|cherry|tag|stash|init)`)
debugPatterns = regexp.MustCompile(`(?i)(debug|error|bug|exception|stack|trace|issue|problem|fail|crash|broken|fix|repair)`)
analysisPatterns = regexp.MustCompile(`(?i)(analyze|analyse|review|explain|describe|what|how|why|compare|summary|extract|parse|crawl|fetch|fetch|get|list|search|find|look)`)
)

// detectTaskType detects the task type based on prompt content
func detectTaskType(prompt string) string {
lowerPrompt := strings.ToLower(prompt)

// Check for code-related tasks
if codePatterns.MatchString(lowerPrompt) && !gitPatterns.MatchString(lowerPrompt) {
return "code"
}

// Check for git tasks
if gitPatterns.MatchString(lowerPrompt) {
return "git"
}

// Check for debug tasks
if debugPatterns.MatchString(lowerPrompt) {
return "debug"
}

// Check for analysis/review tasks
if analysisPatterns.MatchString(lowerPrompt) {
return "analysis"
}

// Default to chat
return "chat"
}
Empty file modified chatapps/slack/builder.go
100644 → 100755
Empty file.
Empty file modified chatapps/slack/streaming_writer.go
100644 → 100755
Empty file.
12 changes: 10 additions & 2 deletions engine/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,12 @@ func (r *Engine) Execute(ctx context.Context, cfg *types.Config, prompt string,

r.logger.Info("Engine: starting execution pipeline",
"namespace", r.opts.Namespace,
"session_id", cfg.SessionID)
"session_id", cfg.SessionID,
"platform", cfg.Platform,
"user_id", cfg.UserID,
"channel_id", cfg.ChannelID,
"task_type", cfg.TaskType,
"trace_id", cfg.TraceID)

// Execute via multiplexed persistent session
if err := r.executeWithMultiplex(ctx, cfg, prompt, callback); err != nil {
Expand All @@ -168,7 +173,10 @@ func (r *Engine) Execute(ctx context.Context, cfg *types.Config, prompt string,

r.logger.Info("Engine: Session completed",
"namespace", r.opts.Namespace,
"session_id", cfg.SessionID)
"session_id", cfg.SessionID,
"platform", cfg.Platform,
"task_type", cfg.TaskType,
"trace_id", cfg.TraceID)

return nil
}
Expand Down
Empty file modified internal/secrets/provider.go
100644 → 100755
Empty file.
73 changes: 72 additions & 1 deletion internal/telemetry/metrics.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ type Metrics struct {
dangersBlocked int64
requestDuration time.Duration

// Dimensioned metrics (platform, task_type)
sessionDurationBuckets map[sessionKey]durationBucket
sessionTurns map[sessionKey]int64
sessionErrors map[sessionKey]int64
toolsInvokedByType map[sessionKey]int64
sessionTokens map[sessionKey]int64

// Slack permission metrics
slackPermissionAllowed int64
slackPermissionBlockedUser int64
Expand All @@ -23,6 +30,17 @@ type Metrics struct {
mu sync.RWMutex
}

type sessionKey struct {
platform string
taskType string
direction string // input/output for tokens
}

type durationBucket struct {
sum time.Duration
count int64
}

var (
globalMetrics *Metrics
globalMetricsMu sync.Once
Expand All @@ -32,7 +50,14 @@ func NewMetrics(logger *slog.Logger) *Metrics {
if logger == nil {
logger = slog.Default()
}
return &Metrics{logger: logger}
return &Metrics{
logger: logger,
sessionDurationBuckets: make(map[sessionKey]durationBucket),
sessionTurns: make(map[sessionKey]int64),
sessionErrors: make(map[sessionKey]int64),
toolsInvokedByType: make(map[sessionKey]int64),
sessionTokens: make(map[sessionKey]int64),
}
}

func (m *Metrics) IncSessionsActive() {
Expand Down Expand Up @@ -134,6 +159,52 @@ func (m *Metrics) IncSlackPermissionBlockedMention() {
m.mu.Unlock()
}

// Dimensioned Metrics Methods

// RecordSessionDuration records session duration with platform and task_type dimensions.
func (m *Metrics) RecordSessionDuration(platform, taskType string, duration time.Duration) {
m.mu.Lock()
defer m.mu.Unlock()
key := sessionKey{platform: platform, taskType: taskType}
bucket := m.sessionDurationBuckets[key]
bucket.sum += duration
bucket.count++
m.sessionDurationBuckets[key] = bucket
}

// IncSessionTurns increments turn count with platform and task_type dimensions.
func (m *Metrics) IncSessionTurns(platform, taskType string) {
m.mu.Lock()
defer m.mu.Unlock()
key := sessionKey{platform: platform, taskType: taskType}
m.sessionTurns[key]++
}

// IncSessionErrorsByType increments error count with platform and task_type dimensions.
func (m *Metrics) IncSessionErrorsByType(platform, taskType string) {
m.mu.Lock()
defer m.mu.Unlock()
key := sessionKey{platform: platform, taskType: taskType}
m.sessionErrors[key]++
}

// IncToolsInvokedByType increments tools invoked with platform and task_type dimensions.
func (m *Metrics) IncToolsInvokedByType(platform, taskType string) {
m.mu.Lock()
defer m.mu.Unlock()
key := sessionKey{platform: platform, taskType: taskType}
m.toolsInvokedByType[key]++
}

// RecordTokens records token consumption with platform and task_type dimensions.
// direction should be "input" or "output".
func (m *Metrics) RecordTokens(platform, taskType, direction string, tokens int64) {
m.mu.Lock()
defer m.mu.Unlock()
key := sessionKey{platform: platform, taskType: taskType, direction: direction}
m.sessionTokens[key] += tokens
}

func InitMetrics(logger *slog.Logger) {
globalMetrics = NewMetrics(logger)
}
Expand Down
44 changes: 44 additions & 0 deletions internal/trace/traceid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package trace

import (
"crypto/rand"
"encoding/hex"
"strings"
"time"
)

// Generator generates trace IDs for distributed tracing.
type Generator struct {
prefix string
}

// New creates a new trace ID generator with optional prefix.
func New(prefix string) *Generator {
if prefix == "" {
prefix = "hotplex"
}
return &Generator{prefix: prefix}
}

// Generate generates a new trace ID.
// Format: {prefix}-{timestamp}-{random}
// Example: hotplex-6076b8cb-8a2f4d3e
func (g *Generator) Generate() string {
// Get current timestamp in hex (8 chars)
timestamp := time.Now().UnixNano() >> 20 // Shift to get ~8 hex chars

// Generate 8 random bytes (16 hex chars)
randomBytes := make([]byte, 8)
rand.Read(randomBytes)
randomHex := hex.EncodeToString(randomBytes)[:8]

ts := strings.TrimLeft(hex.EncodeToString([]byte{byte(timestamp >> 24), byte(timestamp >> 16), byte(timestamp >> 8), byte(timestamp)}), "0")
return strings.Join([]string{g.prefix, ts, randomHex}, "-")
}

// GenerateSimple generates a simple 16-character hex trace ID.
func GenerateSimple() string {
bytes := make([]byte, 8)
rand.Read(bytes)
return hex.EncodeToString(bytes)[:16]
}
10 changes: 8 additions & 2 deletions plugins/storage/config.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ func ExportToJSON(store ChatAppMessageStore, outputPath string, query *MessageQu
return fmt.Errorf("failed to marshal messages: %w", err)
}

return os.WriteFile(outputPath, data, 0644)
if err := os.WriteFile(outputPath, data, 0644); err != nil {
return fmt.Errorf("failed to write export file: %w", err)
}
return nil
}

// ImportFromJSON 从 JSON 导入消息
Expand Down Expand Up @@ -129,5 +132,8 @@ func BackupStorage(store ChatAppMessageStore, backupPath string) error {
return fmt.Errorf("failed to marshal backup: %w", err)
}

return os.WriteFile(backupPath, data, 0644)
if err := os.WriteFile(backupPath, data, 0644); err != nil {
return fmt.Errorf("failed to write backup file: %w", err)
}
return nil
}
Empty file modified plugins/storage/interface.go
100644 → 100755
Empty file.
Empty file modified provider/claude_provider.go
100644 → 100755
Empty file.
Empty file modified provider/pi_provider.go
100644 → 100755
Empty file.
Empty file modified provider/pi_provider_test.go
100644 → 100755
Empty file.
Empty file modified provider/provider.go
100644 → 100755
Empty file.
9 changes: 9 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,13 @@ type Config struct {
SessionID string // Unique identifier used to route the request to a persistent process in the pool
TaskInstructions string // Per-task instructions or objective prepended to the user prompt
WAFApproved bool // When true, Engine skips WAF check (already approved by chatapps layer)

// SessionContext for observability
Platform string // slack/feishu/discord/telegram
UserID string // User identifier
ChannelID string // Source channel/group
TeamID string // Team/workspace identifier
TaskType string // code/chat/analysis/debug/git
TraceID string // OpenTelemetry TraceID
PromptSummary string // First prompt summary
}
Loading