From a65e3367e0e8f33186f1f4a5318648a81b397e6d Mon Sep 17 00:00:00 2001 From: AnnatarHe Date: Wed, 24 Dec 2025 02:07:45 +0800 Subject: [PATCH 1/2] feat(daemon): add coding heartbeat tracking with offline persistence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a new socket handler type for coding activity heartbeats (like wakatime). When heartbeats fail to send to the server, they are saved locally and retried every 30 minutes by a background resync service. Changes: - Add HeartbeatPayload/HeartbeatData types mirroring server API - Add SendHeartbeatsToServer() API function - Add SocketMessageTypeHeartbeat handler in daemon - Add HeartbeatResyncService for periodic retry of failed heartbeats - Add CodeTracking config struct (codeTracking.enabled) - Add HEARTBEAT_LOG_FILE path variable Feature is gated by codeTracking.enabled config option. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- cmd/daemon/main.go | 11 +++ daemon/handlers.go | 26 +++-- daemon/handlers.heartbeat.go | 80 ++++++++++++++++ daemon/heartbeat_resync.go | 181 +++++++++++++++++++++++++++++++++++ daemon/socket.go | 19 +++- model/api_heartbeat.go | 35 +++++++ model/db.go | 2 + model/heartbeat.go | 52 ++++++++++ model/types.go | 9 ++ 9 files changed, 406 insertions(+), 9 deletions(-) create mode 100644 daemon/handlers.heartbeat.go create mode 100644 daemon/heartbeat_resync.go create mode 100644 model/api_heartbeat.go create mode 100644 model/heartbeat.go diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go index 7f7bff9..cf4c02f 100644 --- a/cmd/daemon/main.go +++ b/cmd/daemon/main.go @@ -105,6 +105,17 @@ func main() { } } + // Start heartbeat resync service if codeTracking is enabled + if cfg.CodeTracking != nil && cfg.CodeTracking.Enabled != nil && *cfg.CodeTracking.Enabled { + heartbeatResyncService := daemon.NewHeartbeatResyncService(cfg) + if err := heartbeatResyncService.Start(ctx); err != nil { + slog.Error("Failed to start heartbeat resync service", slog.Any("err", err)) + } else { + slog.Info("Heartbeat resync service started") + defer heartbeatResyncService.Stop() + } + } + // Create processor instance processor := daemon.NewSocketHandler(&cfg, pubsub) diff --git a/daemon/handlers.go b/daemon/handlers.go index a8310a7..0f24f11 100644 --- a/daemon/handlers.go +++ b/daemon/handlers.go @@ -17,16 +17,26 @@ func SocketTopicProccessor(messages <-chan *message.Message) { if err := json.Unmarshal(msg.Payload, &socketMsg); err != nil { slog.ErrorContext(ctx, "failed to parse socket message", slog.Any("err", err)) msg.Nack() + continue } - if socketMsg.Type == SocketMessageTypeSync { - err := handlePubSubSync(ctx, socketMsg.Payload) - if err != nil { - slog.ErrorContext(ctx, "failed to parse socket message", slog.Any("err", err)) - msg.Nack() - } else { - msg.Ack() - } + var err error + switch socketMsg.Type { + case SocketMessageTypeSync: + err = handlePubSubSync(ctx, socketMsg.Payload) + case SocketMessageTypeHeartbeat: + err = handlePubSubHeartbeat(ctx, socketMsg.Payload) + default: + slog.ErrorContext(ctx, "unknown socket message type", slog.String("type", string(socketMsg.Type))) + msg.Nack() + continue + } + + if err != nil { + slog.ErrorContext(ctx, "failed to handle socket message", slog.Any("err", err), slog.String("type", string(socketMsg.Type))) + msg.Nack() + } else { + msg.Ack() } } } diff --git a/daemon/handlers.heartbeat.go b/daemon/handlers.heartbeat.go new file mode 100644 index 0000000..a5ed501 --- /dev/null +++ b/daemon/handlers.heartbeat.go @@ -0,0 +1,80 @@ +package daemon + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "os" + + "github.com/malamtime/cli/model" +) + +func handlePubSubHeartbeat(ctx context.Context, socketMsgPayload interface{}) error { + pb, err := json.Marshal(socketMsgPayload) + if err != nil { + slog.Error("Failed to marshal the heartbeat payload again for unmarshal", slog.Any("payload", socketMsgPayload)) + return err + } + + var heartbeatPayload model.HeartbeatPayload + err = json.Unmarshal(pb, &heartbeatPayload) + if err != nil { + slog.Error("Failed to parse heartbeat payload", slog.Any("payload", socketMsgPayload)) + return err + } + + if len(heartbeatPayload.Heartbeats) == 0 { + slog.Debug("Empty heartbeat payload, skipping") + return nil + } + + cfg, err := stConfig.ReadConfigFile(ctx) + if err != nil { + slog.Error("Failed to read config file", slog.Any("err", err)) + return err + } + + // Try to send to server + err = model.SendHeartbeatsToServer(ctx, cfg, heartbeatPayload) + if err != nil { + slog.Warn("Failed to send heartbeats to server, saving to local file", slog.Any("err", err)) + // On failure, save to local file + if saveErr := saveHeartbeatToFile(heartbeatPayload); saveErr != nil { + slog.Error("Failed to save heartbeat to local file", slog.Any("err", saveErr)) + return saveErr + } + // Return nil because we saved the data locally - don't nack the message + return nil + } + + slog.Info("Successfully sent heartbeats to server", slog.Int("count", len(heartbeatPayload.Heartbeats))) + return nil +} + +// saveHeartbeatToFile appends a heartbeat payload as a single JSON line to the log file +func saveHeartbeatToFile(payload model.HeartbeatPayload) error { + logFilePath := os.ExpandEnv(fmt.Sprintf("%s/%s", "$HOME", model.HEARTBEAT_LOG_FILE)) + + // Open file for appending, create if not exists + file, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("failed to open heartbeat log file: %w", err) + } + defer file.Close() + + // Marshal payload to JSON + data, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("failed to marshal heartbeat payload: %w", err) + } + + // Write as single line with newline + _, err = file.Write(append(data, '\n')) + if err != nil { + return fmt.Errorf("failed to write heartbeat to file: %w", err) + } + + slog.Debug("Saved heartbeat to local file", slog.String("path", logFilePath), slog.Int("count", len(payload.Heartbeats))) + return nil +} diff --git a/daemon/heartbeat_resync.go b/daemon/heartbeat_resync.go new file mode 100644 index 0000000..c94d5fa --- /dev/null +++ b/daemon/heartbeat_resync.go @@ -0,0 +1,181 @@ +package daemon + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "log/slog" + "os" + "sync" + "time" + + "github.com/malamtime/cli/model" +) + +const ( + // HeartbeatResyncInterval is the interval for retrying failed heartbeats + HeartbeatResyncInterval = 30 * time.Minute +) + +// HeartbeatResyncService handles periodic resync of failed heartbeats +type HeartbeatResyncService struct { + config model.ShellTimeConfig + ticker *time.Ticker + stopChan chan struct{} + wg sync.WaitGroup +} + +// NewHeartbeatResyncService creates a new heartbeat resync service +func NewHeartbeatResyncService(config model.ShellTimeConfig) *HeartbeatResyncService { + return &HeartbeatResyncService{ + config: config, + stopChan: make(chan struct{}), + } +} + +// Start begins the periodic resync job +func (s *HeartbeatResyncService) Start(ctx context.Context) error { + s.ticker = time.NewTicker(HeartbeatResyncInterval) + s.wg.Add(1) + + go func() { + defer s.wg.Done() + + // Run once at startup + s.resync(ctx) + + for { + select { + case <-s.ticker.C: + s.resync(ctx) + case <-s.stopChan: + return + case <-ctx.Done(): + return + } + } + }() + + slog.Info("Heartbeat resync service started", slog.Duration("interval", HeartbeatResyncInterval)) + return nil +} + +// Stop stops the resync service +func (s *HeartbeatResyncService) Stop() { + if s.ticker != nil { + s.ticker.Stop() + } + close(s.stopChan) + s.wg.Wait() + slog.Info("Heartbeat resync service stopped") +} + +// resync reads failed heartbeats from the log file and attempts to send them +func (s *HeartbeatResyncService) resync(ctx context.Context) { + logFilePath := os.ExpandEnv(fmt.Sprintf("%s/%s", "$HOME", model.HEARTBEAT_LOG_FILE)) + + // Check if file exists + if _, err := os.Stat(logFilePath); os.IsNotExist(err) { + slog.Debug("No heartbeat log file found, nothing to resync") + return + } + + // Read the file + file, err := os.Open(logFilePath) + if err != nil { + slog.Error("Failed to open heartbeat log file for resync", slog.Any("err", err)) + return + } + + var lines []string + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + if line != "" { + lines = append(lines, line) + } + } + file.Close() + + if err := scanner.Err(); err != nil { + slog.Error("Error reading heartbeat log file", slog.Any("err", err)) + return + } + + if len(lines) == 0 { + slog.Debug("No failed heartbeats to resync") + return + } + + slog.Info("Starting heartbeat resync", slog.Int("pendingCount", len(lines))) + + // Process each line + var failedLines []string + successCount := 0 + + for _, line := range lines { + var payload model.HeartbeatPayload + if err := json.Unmarshal([]byte(line), &payload); err != nil { + slog.Error("Failed to parse heartbeat line, discarding", slog.Any("err", err), slog.String("line", line)) + continue + } + + // Try to send to server + if err := model.SendHeartbeatsToServer(ctx, s.config, payload); err != nil { + slog.Warn("Failed to resync heartbeat, keeping for next retry", slog.Any("err", err)) + failedLines = append(failedLines, line) + } else { + successCount++ + } + } + + // Rewrite the file with only failed lines + if err := s.rewriteLogFile(logFilePath, failedLines); err != nil { + slog.Error("Failed to update heartbeat log file", slog.Any("err", err)) + return + } + + slog.Info("Heartbeat resync completed", + slog.Int("success", successCount), + slog.Int("remaining", len(failedLines))) +} + +// rewriteLogFile atomically rewrites the log file with the given lines +func (s *HeartbeatResyncService) rewriteLogFile(logFilePath string, lines []string) error { + // If no lines remaining, remove the file + if len(lines) == 0 { + if err := os.Remove(logFilePath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to remove empty log file: %w", err) + } + return nil + } + + // Write to temp file first + tempFile := logFilePath + ".tmp" + file, err := os.OpenFile(tempFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + return fmt.Errorf("failed to create temp file: %w", err) + } + + for _, line := range lines { + if _, err := file.WriteString(line + "\n"); err != nil { + file.Close() + os.Remove(tempFile) + return fmt.Errorf("failed to write to temp file: %w", err) + } + } + + if err := file.Close(); err != nil { + os.Remove(tempFile) + return fmt.Errorf("failed to close temp file: %w", err) + } + + // Atomic rename + if err := os.Rename(tempFile, logFilePath); err != nil { + os.Remove(tempFile) + return fmt.Errorf("failed to rename temp file: %w", err) + } + + return nil +} diff --git a/daemon/socket.go b/daemon/socket.go index c717d7c..eec5c00 100644 --- a/daemon/socket.go +++ b/daemon/socket.go @@ -14,7 +14,8 @@ import ( type SocketMessageType string const ( - SocketMessageTypeSync SocketMessageType = "sync" + SocketMessageTypeSync SocketMessageType = "sync" + SocketMessageTypeHeartbeat SocketMessageType = "heartbeat" ) type SocketMessage struct { @@ -112,6 +113,22 @@ func (p *SocketHandler) handleConnection(conn net.Conn) { if err := p.channel.Publish(PubSubTopic, chMsg); err != nil { slog.Error("Error to publish topic", slog.Any("err", err)) } + case SocketMessageTypeHeartbeat: + // Only process heartbeat if codeTracking is enabled + if p.config.CodeTracking == nil || p.config.CodeTracking.Enabled == nil || !*p.config.CodeTracking.Enabled { + slog.Debug("Heartbeat message received but codeTracking is disabled, ignoring") + return + } + buf, err := json.Marshal(msg) + if err != nil { + slog.Error("Error encoding heartbeat message", slog.Any("err", err)) + return + } + + chMsg := message.NewMessage(watermill.NewUUID(), buf) + if err := p.channel.Publish(PubSubTopic, chMsg); err != nil { + slog.Error("Error publishing heartbeat topic", slog.Any("err", err)) + } default: slog.Error("Unknown message type:", slog.String("messageType", string(msg.Type))) } diff --git a/model/api_heartbeat.go b/model/api_heartbeat.go new file mode 100644 index 0000000..93622b3 --- /dev/null +++ b/model/api_heartbeat.go @@ -0,0 +1,35 @@ +package model + +import ( + "context" + "net/http" + "time" +) + +// SendHeartbeatsToServer sends heartbeat data to the server +func SendHeartbeatsToServer(ctx context.Context, cfg ShellTimeConfig, payload HeartbeatPayload) error { + ctx, span := modelTracer.Start(ctx, "api.sendHeartbeats") + defer span.End() + + endpoint := Endpoint{ + Token: cfg.Token, + APIEndpoint: cfg.APIEndpoint, + } + + var response HeartbeatResponse + err := SendHTTPRequestJSON(HTTPRequestOptions[HeartbeatPayload, HeartbeatResponse]{ + Context: ctx, + Endpoint: endpoint, + Method: http.MethodPost, + Path: "/api/v1/heartbeats", + Payload: payload, + Response: &response, + Timeout: 10 * time.Second, + }) + + if err != nil { + return err + } + + return nil +} diff --git a/model/db.go b/model/db.go index ad1fd8e..eedf778 100644 --- a/model/db.go +++ b/model/db.go @@ -23,6 +23,7 @@ var ( COMMAND_PRE_STORAGE_FILE = COMMAND_STORAGE_FOLDER + "/pre.txt" COMMAND_POST_STORAGE_FILE = COMMAND_STORAGE_FOLDER + "/post.txt" COMMAND_CURSOR_STORAGE_FILE = COMMAND_STORAGE_FOLDER + "/cursor.txt" + HEARTBEAT_LOG_FILE = COMMAND_BASE_STORAGE_FOLDER + "/coding-heartbeat.data.log" ) func InitFolder(baseFolder string) { @@ -34,6 +35,7 @@ func InitFolder(baseFolder string) { COMMAND_PRE_STORAGE_FILE = COMMAND_STORAGE_FOLDER + "/pre.txt" COMMAND_POST_STORAGE_FILE = COMMAND_STORAGE_FOLDER + "/post.txt" COMMAND_CURSOR_STORAGE_FILE = COMMAND_STORAGE_FOLDER + "/cursor.txt" + HEARTBEAT_LOG_FILE = COMMAND_BASE_STORAGE_FOLDER + "/coding-heartbeat.data.log" } // key: ${shell}|${sessionID}|${command}|${username} diff --git a/model/heartbeat.go b/model/heartbeat.go new file mode 100644 index 0000000..8ae2922 --- /dev/null +++ b/model/heartbeat.go @@ -0,0 +1,52 @@ +package model + +// HeartbeatPayload represents the payload for heartbeat ingestion +// Sent by client daemon in batches +type HeartbeatPayload struct { + Heartbeats []HeartbeatData `json:"heartbeats"` +} + +// HeartbeatData represents a single heartbeat event +type HeartbeatData struct { + // Unique identifier for idempotency (client-generated UUID) + HeartbeatID string `json:"heartbeatId"` + + // Core activity data + Entity string `json:"entity"` // File path, URL, or app name + EntityType string `json:"entityType,omitempty"` // "file", "app", "domain" - defaults to "file" + Category string `json:"category,omitempty"` // "coding", "debugging", etc. - defaults to "coding" + Time int64 `json:"time"` // Unix timestamp in seconds + + // Project context + Project string `json:"project,omitempty"` + ProjectRootPath string `json:"projectRootPath,omitempty"` + Branch string `json:"branch,omitempty"` + + // File details + Language string `json:"language,omitempty"` + Lines *int `json:"lines,omitempty"` + LineNumber *int `json:"lineNumber,omitempty"` + CursorPosition *int `json:"cursorPosition,omitempty"` + + // Editor/IDE information + Editor string `json:"editor,omitempty"` + EditorVersion string `json:"editorVersion,omitempty"` + Plugin string `json:"plugin,omitempty"` + PluginVersion string `json:"pluginVersion,omitempty"` + + // Machine context + Machine string `json:"machine,omitempty"` + OS string `json:"os,omitempty"` + OSVersion string `json:"osVersion,omitempty"` + + // Write tracking + IsWrite bool `json:"isWrite,omitempty"` +} + +// HeartbeatResponse represents the response for heartbeat ingestion +type HeartbeatResponse struct { + Success bool `json:"success"` + Processed int `json:"processed"` + Errors int `json:"errors"` + Message string `json:"message,omitempty"` +} diff --git a/model/types.go b/model/types.go index c293f72..daf2e1a 100644 --- a/model/types.go +++ b/model/types.go @@ -31,6 +31,11 @@ type CCOtel struct { GRPCPort int `toml:"grpcPort"` // default: 4317 } +// CodeTracking configuration for coding activity heartbeat tracking +type CodeTracking struct { + Enabled *bool `toml:"enabled"` +} + type ShellTimeConfig struct { Token string APIEndpoint string @@ -69,6 +74,9 @@ type ShellTimeConfig struct { // CCOtel configuration for OTEL-based Claude Code tracking (v2 - gRPC passthrough) CCOtel *CCOtel `toml:"ccotel"` + // CodeTracking configuration for coding activity heartbeat tracking + CodeTracking *CodeTracking `toml:"codeTracking"` + // SocketPath is the path to the Unix domain socket used for communication // between the CLI and the daemon. SocketPath string `toml:"socketPath"` @@ -98,6 +106,7 @@ var DefaultConfig = ShellTimeConfig{ Exclude: []string{}, CCUsage: nil, CCOtel: nil, + CodeTracking: nil, SocketPath: DefaultSocketPath, } From b86fa51d5d9efd5556747d280646f2a456043e77 Mon Sep 17 00:00:00 2001 From: AnnatarHe Date: Thu, 25 Dec 2025 15:06:37 +0800 Subject: [PATCH 2/2] feat(model): add cache support to ConfigService with skip option MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add caching mechanism to ReadConfigFile using functional options pattern. Cache is stored in-memory with RWMutex for thread safety. Use WithSkipCache() option to bypass cache and read from disk. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- model/config.go | 44 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/model/config.go b/model/config.go index f7a2fd5..087ca63 100644 --- a/model/config.go +++ b/model/config.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "strings" + "sync" "github.com/pelletier/go-toml/v2" ) @@ -13,11 +14,28 @@ import ( var UserShellTimeConfig ShellTimeConfig type ConfigService interface { - ReadConfigFile(ctx context.Context) (ShellTimeConfig, error) + ReadConfigFile(ctx context.Context, opts ...ReadConfigOption) (ShellTimeConfig, error) +} + +// readConfigOptions holds configuration for ReadConfigFile behavior +type readConfigOptions struct { + skipCache bool +} + +// ReadConfigOption is a functional option for ReadConfigFile +type ReadConfigOption func(*readConfigOptions) + +// WithSkipCache returns an option that skips the cache and reads from disk +func WithSkipCache() ReadConfigOption { + return func(o *readConfigOptions) { + o.skipCache = true + } } type configService struct { configFilePath string + cachedConfig *ShellTimeConfig + mu sync.RWMutex } func NewConfigService(configFilePath string) ConfigService { @@ -73,10 +91,27 @@ func mergeConfig(base, local *ShellTimeConfig) { } } -func (cs *configService) ReadConfigFile(ctx context.Context) (config ShellTimeConfig, err error) { +func (cs *configService) ReadConfigFile(ctx context.Context, opts ...ReadConfigOption) (config ShellTimeConfig, err error) { ctx, span := modelTracer.Start(ctx, "config.read") defer span.End() + // Apply options + options := &readConfigOptions{} + for _, opt := range opts { + opt(options) + } + + // Check cache first (unless skipCache is set) + if !options.skipCache { + cs.mu.RLock() + if cs.cachedConfig != nil { + config = *cs.cachedConfig + cs.mu.RUnlock() + return config, nil + } + cs.mu.RUnlock() + } + configFile := cs.configFilePath existingConfig, err := os.ReadFile(configFile) if err != nil { @@ -142,6 +177,11 @@ func (cs *configService) ReadConfigFile(ctx context.Context) (config ShellTimeCo config.SocketPath = DefaultSocketPath } + // Save to cache + cs.mu.Lock() + cs.cachedConfig = &config + cs.mu.Unlock() + UserShellTimeConfig = config return }