diff --git a/services/monitor/executor.go b/services/monitor/executor.go index f612546..931c6f2 100644 --- a/services/monitor/executor.go +++ b/services/monitor/executor.go @@ -3,10 +3,14 @@ package monitor import ( "context" "fmt" + "io" "os" "os/exec" "path/filepath" "strings" + "time" + + "github.com/TrueBlocks/trueblocks-chifra/v6/pkg/colors" ) type CommandExecutor interface { @@ -34,6 +38,7 @@ func (e *ChifraExecutor) Execute(ctx context.Context, cmd Command, vars Template chifraCmd := expandedArgs[0] chifraArgs := expandedArgs[1:] + fmt.Println(colors.BrightGreen, "output", output, "cmd", strings.Join(expandedArgs, " "), colors.Off) if output != "" { dir := filepath.Dir(output) if dir != "" && dir != "." { @@ -51,6 +56,7 @@ func (e *ChifraExecutor) Execute(ctx context.Context, cmd Command, vars Template cmdStr := fmt.Sprintf("chifra %s %s", chifraCmd, strings.Join(chifraArgs, " ")) _ = cmdStr + fmt.Println(colors.BrightGreen, cmdStr, colors.Off) return nil } @@ -62,16 +68,29 @@ func NewShellExecutor() *ShellExecutor { } func (e *ShellExecutor) Execute(ctx context.Context, cmd Command, vars TemplateVars) error { + start := time.Now() + defer func() { + duration := time.Since(start) + if duration > 30*time.Second { + fmt.Printf("[MONITOR] SLOW: Command %s for %s took %v\n", cmd.ID, vars.Address, duration) + } + }() + expandedArgs := make([]string, len(cmd.Arguments)) for i, arg := range cmd.Arguments { expandedArgs[i] = ExpandTemplate(arg, vars) } output := ExpandTemplate(cmd.Output, vars) + fmt.Println(colors.BrightGreen, cmd, strings.Join(expandedArgs, " "), colors.Off) - shellCmd := exec.CommandContext(ctx, cmd.Command, expandedArgs...) + // Create timeout context for this specific command (5 minutes max per monitor) + cmdCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() - if output != "" { + shellCmd := exec.CommandContext(cmdCtx, cmd.Command, expandedArgs...) 
+ + if output != "" && output != "/dev/null" { dir := filepath.Dir(output) if dir != "" && dir != "." { if err := os.MkdirAll(dir, 0o755); err != nil { @@ -88,8 +107,9 @@ func (e *ShellExecutor) Execute(ctx context.Context, cmd Command, vars TemplateV shellCmd.Stdout = file shellCmd.Stderr = file } else { - shellCmd.Stdout = nil - shellCmd.Stderr = nil + // Discard all output - don't set to nil as that can cause blocking + shellCmd.Stdout = io.Discard + shellCmd.Stderr = io.Discard } if err := shellCmd.Run(); err != nil { diff --git a/services/monitor/executor_test.go b/services/monitor/executor_test.go index 233e2b3..86db11b 100644 --- a/services/monitor/executor_test.go +++ b/services/monitor/executor_test.go @@ -217,7 +217,7 @@ func TestShellExecutor_Execute_TemplateExpansion(t *testing.T) { vars := TemplateVars{ Address: "0xabc123", - Chain: "sepolia", + Chain: "gnosis", FirstBlock: 100, LastBlock: 200, BlockCount: 101, @@ -239,7 +239,7 @@ func TestShellExecutor_Execute_TemplateExpansion(t *testing.T) { t.Fatalf("Execute() unexpected error = %v", err) } - expectedPath := filepath.Join(tmpDir, "sepolia", "0xabc123.txt") + expectedPath := filepath.Join(tmpDir, "gnosis", "0xabc123.txt") if _, err := os.Stat(expectedPath); os.IsNotExist(err) { t.Fatalf("Output file not created at expected path: %s", expectedPath) } @@ -249,7 +249,7 @@ func TestShellExecutor_Execute_TemplateExpansion(t *testing.T) { t.Fatalf("Failed to read output file: %v", err) } - expectedContent := "Address: 0xabc123, Chain: sepolia, Blocks: 100-200" + expectedContent := "Address: 0xabc123, Chain: gnosis, Blocks: 100-200" actualContent := strings.TrimSpace(string(content)) if actualContent != expectedContent { t.Errorf("Expected output '%s', got '%s'", expectedContent, actualContent) diff --git a/services/monitor/parser.go b/services/monitor/parser.go index 63a93fe..6a0f09f 100644 --- a/services/monitor/parser.go +++ b/services/monitor/parser.go @@ -8,6 +8,10 @@ import ( "strings" 
"gopkg.in/yaml.v3" + + "github.com/TrueBlocks/trueblocks-chifra/v6/pkg/base" + chifraMonitor "github.com/TrueBlocks/trueblocks-chifra/v6/pkg/monitor" + "github.com/TrueBlocks/trueblocks-chifra/v6/pkg/rpc" ) type MonitorEntry struct { @@ -37,6 +41,13 @@ func ParseWatchlist(path string) ([]MonitorEntry, error) { scanner := bufio.NewScanner(file) lineNum := 0 + chain := "mainnet" + filename := filepath.Base(path) + if strings.HasPrefix(filename, "watchlist-") && strings.HasSuffix(filename, ".txt") { + chain = strings.TrimSuffix(strings.TrimPrefix(filename, "watchlist-"), ".txt") + } + conn := rpc.TempConnection(chain) + for scanner.Scan() { lineNum++ line := strings.TrimSpace(scanner.Text()) @@ -51,10 +62,23 @@ func ParseWatchlist(path string) ([]MonitorEntry, error) { } parts := strings.Split(line, ",") - address := strings.TrimSpace(parts[0]) + addressOrEns := strings.TrimSpace(parts[0]) - if !isValidAddress(address) { - return nil, NewWatchlistError(path, lineNum, fmt.Sprintf("invalid address: %s", address)) + if !base.IsValidAddress(addressOrEns) { + return nil, NewWatchlistError(path, lineNum, fmt.Sprintf("invalid address: %s", addressOrEns)) + } + + address := addressOrEns + if strings.HasSuffix(addressOrEns, ".eth") { + if aa, ok := conn.GetEnsAddress(addressOrEns); !ok { + if bb, okok := conn.GetEnsAddress(addressOrEns); !okok { + return []MonitorEntry{}, fmt.Errorf("failed to resolve ENS name: %s", addressOrEns) + } else { + address = bb + } + } else { + address = aa + } } entry := MonitorEntry{ @@ -86,6 +110,27 @@ func ParseWatchlist(path string) ([]MonitorEntry, error) { return entries, nil } +// readMonitorState reads the LastScanned value from a monitor file using chifra's monitor package +func readMonitorState(chain, address string) (uint64, error) { + // Create monitor instance + mon, err := chifraMonitor.NewMonitor(chain, base.HexToAddress(address), false) + if err != nil { + return 0, err + } + defer mon.Close() + + // Read the monitor header to get 
LastScanned + if err := mon.ReadMonitorHeader(); err != nil { + // Monitor file doesn't exist yet, start from 0 + if os.IsNotExist(err) { + return 0, nil + } + return 0, fmt.Errorf("failed to read monitor header: %v", err) + } + + return uint64(mon.LastScanned), nil +} + func DiscoverMonitors(chain string) ([]MonitorEntry, error) { homeDir, err := os.UserHomeDir() if err != nil { @@ -115,9 +160,16 @@ func DiscoverMonitors(chain string) ([]MonitorEntry, error) { continue } + // Read the actual LastScanned value from the monitor file + lastScanned, err := readMonitorState(chain, address) + if err != nil { + // fmt.Printf("[MONITOR] Warning: Could not read state for %s: %v, using 0\n", address, err) + lastScanned = 0 + } + monitors = append(monitors, MonitorEntry{ Address: address, - StartingBlock: 0, + StartingBlock: lastScanned, }) } diff --git a/services/monitor/service.go b/services/monitor/service.go index ff62244..a834ae7 100644 --- a/services/monitor/service.go +++ b/services/monitor/service.go @@ -3,27 +3,40 @@ package monitor import ( "context" "fmt" + "io" "log/slog" - "os/exec" + "os" "path/filepath" "strings" + "sync/atomic" "time" + + "github.com/TrueBlocks/trueblocks-chifra/v6/pkg/logger" + "github.com/TrueBlocks/trueblocks-chifra/v6/pkg/rpc" ) type ChainState struct { - Chain string - Monitors []MonitorEntry - Commands []Command + Chain string + Monitors []MonitorEntry + Commands []Command + WatchlistModTime time.Time + CommandsModTime time.Time + WatchlistPath string + CommandsPath string } type MonitorService struct { - logger *slog.Logger - chains []string - config MonitorConfig - chainStates map[string]*ChainState - paused bool - ctx context.Context - cancel context.CancelFunc + logger *slog.Logger + chains []string + config MonitorConfig + chainStates map[string]*ChainState + paused bool + ctx context.Context + cancel context.CancelFunc + processedCount atomic.Int64 // Atomic counter for current round + monitorAddressMap map[string]string // 
address -> monitor mapping for logging + processing atomic.Bool // True if a round is currently in progress + currentAddress atomic.Value // string: Address currently being processed } func NewMonitorService(logger *slog.Logger, chains []string, config MonitorConfig) (*MonitorService, error) { @@ -34,13 +47,14 @@ func NewMonitorService(logger *slog.Logger, chains []string, config MonitorConfi ctx, cancel := context.WithCancel(context.Background()) return &MonitorService{ - logger: logger, - chains: chains, - config: config, - chainStates: make(map[string]*ChainState), - paused: false, - ctx: ctx, - cancel: cancel, + logger: logger, + chains: chains, + config: config, + chainStates: make(map[string]*ChainState), + monitorAddressMap: make(map[string]string), + paused: false, + ctx: ctx, + cancel: cancel, }, nil } @@ -52,6 +66,7 @@ func (s *MonitorService) Initialize() error { for _, chain := range s.chains { var monitors []MonitorEntry var err error + var watchlistPath string if s.config.WatchlistDir == "all" { monitors, err = DiscoverMonitors(chain) @@ -59,7 +74,7 @@ func (s *MonitorService) Initialize() error { return fmt.Errorf("failed to discover monitors for chain %s: %v", chain, err) } } else { - watchlistPath := filepath.Join(s.config.WatchlistDir, fmt.Sprintf("watchlist-%s.txt", chain)) + watchlistPath = filepath.Join(s.config.WatchlistDir, fmt.Sprintf("watchlist-%s.txt", chain)) monitors, err = ParseWatchlist(watchlistPath) if err != nil { return fmt.Errorf("failed to parse watchlist for chain %s: %v", chain, err) @@ -72,10 +87,18 @@ func (s *MonitorService) Initialize() error { return fmt.Errorf("failed to parse commands for chain %s: %v", chain, err) } + // Get initial modification times + watchlistModTime := getFileModTime(watchlistPath) + commandsModTime := getFileModTime(commandsPath) + s.chainStates[chain] = &ChainState{ - Chain: chain, - Monitors: monitors, - Commands: commands, + Chain: chain, + Monitors: monitors, + Commands: commands, + 
WatchlistPath: watchlistPath, + CommandsPath: commandsPath, + WatchlistModTime: watchlistModTime, + CommandsModTime: commandsModTime, } s.logger.Info("Initialized chain", "chain", chain, "monitors", len(monitors), "commands", len(commands)) @@ -87,159 +110,14 @@ func (s *MonitorService) Initialize() error { func (s *MonitorService) Process(ready chan bool) error { ready <- true - for { - if s.paused { - time.Sleep(time.Second) - continue - } - - select { - case <-s.ctx.Done(): - return nil - default: - } - - anyProgress := false - metrics := IterationMetrics{ - StartTime: time.Now(), - } - - for _, chain := range s.chains { - state := s.chainStates[chain] - if state == nil { - continue - } - - chainProgress := s.processChain(state, &metrics) - if chainProgress { - anyProgress = true - } - } - - metrics.Duration = time.Since(metrics.StartTime) - s.logMetrics(metrics) - - if s.shouldFailEarly(metrics) { - return fmt.Errorf("fail-early triggered: high failure rate (%d/%d failed)", metrics.MonitorsFailed, metrics.MonitorsTotal) - } - - if !anyProgress { - time.Sleep(time.Duration(s.config.Sleep) * time.Second) - } - } -} - -func (s *MonitorService) processChain(state *ChainState, metrics *IterationMetrics) bool { - if len(state.Monitors) == 0 { - return false - } - - chainHead, err := s.getChainHead(state.Chain) - if err != nil { - s.logger.Warn("Failed to get chain head", "chain", state.Chain, "error", err) - return false - } - - batchCount := (len(state.Monitors) + s.config.BatchSize - 1) / s.config.BatchSize - metrics.BatchesTotal += batchCount - - for i := 0; i < len(state.Monitors); i += s.config.BatchSize { - end := i + s.config.BatchSize - if end > len(state.Monitors) { - end = len(state.Monitors) - } - batch := state.Monitors[i:end] - - if err := s.batchFreshen(state.Chain, batch); err != nil { - s.logger.Warn("Batch freshen failed", "chain", state.Chain, "batch", i/s.config.BatchSize+1, "error", err) - metrics.BatchesFailed++ - } - } - - pool := 
NewWorkerPool(s.config.Concurrency, NewShellExecutor()) - pool.Start() - - for _, entry := range state.Monitors { - firstBlock := entry.StartingBlock - lastBlock := chainHead - - if s.config.MaxBlocksPerRun > 0 && lastBlock-firstBlock > uint64(s.config.MaxBlocksPerRun) { - lastBlock = firstBlock + uint64(s.config.MaxBlocksPerRun) - } - - if lastBlock <= firstBlock { - continue - } - - vars := TemplateVars{ - Address: entry.Address, - Chain: state.Chain, - FirstBlock: firstBlock, - LastBlock: lastBlock, - BlockCount: lastBlock - firstBlock + 1, - } - - job := MonitorJob{ - Entry: entry, - Commands: state.Commands, - Vars: vars, - } - - pool.Submit(job) - metrics.MonitorsTotal++ - } - - results := pool.Wait() - - for _, result := range results { - if result.Success { - metrics.MonitorsSuccess++ - } else { - metrics.MonitorsFailed++ - s.logger.Warn("Monitor processing failed", "chain", state.Chain, "address", result.Address, "error", result.Error) - } - } - - return metrics.MonitorsTotal > 0 -} - -func (s *MonitorService) batchFreshen(chain string, batch []MonitorEntry) error { - if len(batch) == 0 { - return nil - } - - addresses := make([]string, len(batch)) - for i, entry := range batch { - addresses[i] = entry.Address - } - - args := []string{"export", "--freshen"} - args = append(args, addresses...) - args = append(args, "--chain", chain) - - cmd := exec.CommandContext(s.ctx, "chifra", args...) 
-	output, err := cmd.CombinedOutput()
-	if err != nil {
-		return fmt.Errorf("chifra export failed: %v: %s", err, string(output))
-	}
-
+	// Event-driven: processing happens via ProcessBlockRange() called by coordinator
+	<-s.ctx.Done()
 	return nil
 }

 func (s *MonitorService) getChainHead(chain string) (uint64, error) {
-	cmd := exec.CommandContext(s.ctx, "chifra", "blocks", "--count", "--chain", chain)
-	output, err := cmd.Output()
-	if err != nil {
-		return 0, fmt.Errorf("failed to get chain head: %v", err)
-	}
-
-	var head uint64
-	_, err = fmt.Sscanf(strings.TrimSpace(string(output)), "%d", &head)
-	if err != nil {
-		return 0, fmt.Errorf("failed to parse chain head: %v", err)
-	}
-
-	return head, nil
+	conn := rpc.TempConnection(chain)
+	return uint64(conn.GetLatestBlockNumber()), nil
 }

 func (s *MonitorService) shouldFailEarly(metrics IterationMetrics) bool {
@@ -296,3 +174,297 @@ type IterationMetrics struct {
 	MonitorsSuccess int
 	MonitorsFailed  int
 }
+
+// ProcessMonitors triggers the monitor to process newly scraped blocks for a chain
+// This is called by the coordinator when the scraper completes a batch
+// Monitors track their own state and determine which blocks to process
+func (s *MonitorService) ProcessMonitors(chain string) error {
+	defer func() {
+		logger.SetLoggerWriter(io.Discard)
+	}()
+	logger.SetLoggerWriter(os.Stderr)
+
+	fmt.Println("[MONITOR] ProcessMonitors called for chain:", chain)
+
+	// Skip if already processing a round
+	if s.processing.Load() {
+		processed := s.processedCount.Load()
+		state := s.chainStates[chain]
+		total := 0
+		if state != nil {
+			total = len(state.Monitors)
+		}
+		currentAddr := "(starting)"
+		if addr := s.currentAddress.Load(); addr != nil {
+			currentAddr = addr.(string)[:10] + "..."
+		}
+		fmt.Printf("[MONITOR] Skipping - previous round %d/%d (%s) still in progress for chain: %s\n", processed, total, currentAddr, chain)
+		return nil
+	}
+	s.processing.Store(true)
+	defer s.processing.Store(false)
+
+	// Check if config files
have been modified and reload if necessary + if err := s.checkAndReloadConfig(chain); err != nil { + fmt.Printf("[MONITOR] Warning: Failed to reload config for %s: %v\n", chain, err) + } + + if s.IsPaused() { + // fmt.Println("[MONITOR] ProcessMonitors skipped - service is paused for chain:", chain) + return nil + } + + state := s.chainStates[chain] + if state == nil { + // fmt.Println("[MONITOR] ERROR: ProcessMonitors failed - no state for chain:", chain) + return fmt.Errorf("no state for chain %s", chain) + } + + if len(state.Monitors) == 0 { + // fmt.Println("[MONITOR] ProcessMonitors skipped - no monitors configured for chain:", chain) + return nil + } + + metrics := IterationMetrics{ + StartTime: time.Now(), + } + + // Get current head of chain to determine processing range + latestBlock, err := s.getChainHead(chain) + if err != nil { + // fmt.Println("[MONITOR] ERROR: ProcessMonitors failed - could not get chain head for", chain, "error:", err) + return fmt.Errorf("failed to get chain head: %w", err) + } + + fmt.Printf("[MONITOR] Starting monitor processing for chain: %s, latestBlock: %d, monitorCount: %d\n", chain, latestBlock, len(state.Monitors)) + + // Reset progress counter for this round + s.processedCount.Store(0) + + // Build address map for progress logging + s.monitorAddressMap = make(map[string]string) + for _, m := range state.Monitors { + s.monitorAddressMap[m.Address] = m.Address + } + + // Refresh monitor states to get current LastScanned values + // fmt.Println("[MONITOR] Refreshing monitor states from .mon.bin files...") + for i := range state.Monitors { + lastScanned, err := readMonitorState(chain, state.Monitors[i].Address) + if err != nil { + fmt.Printf("[MONITOR] Warning: Could not read state for %s: %v, keeping previous value %d\n", + state.Monitors[i].Address, err, state.Monitors[i].StartingBlock) + } else { + if lastScanned != state.Monitors[i].StartingBlock { + // fmt.Printf("[MONITOR] Updated %s: StartingBlock %d -> %d\n", + // 
state.Monitors[i].Address, state.Monitors[i].StartingBlock, lastScanned) + state.Monitors[i].StartingBlock = lastScanned + } + } + } + + // TEMPORARY: Sequential processing for debugging (bypasses worker pool) + useSequential := true // Set to false to re-enable worker pool + + if useSequential { + // Sequential processing with batching + executor := NewShellExecutor() + batchSize := s.config.BatchSize + + // Process monitors in batches + for i := 0; i < len(state.Monitors); i += batchSize { + end := i + batchSize + if end > len(state.Monitors) { + end = len(state.Monitors) + } + batch := state.Monitors[i:end] + + // Collect addresses and check if any need processing + var addresses []string + var firstAddr string + skippedAll := true + + for _, entry := range batch { + startBlock := entry.StartingBlock + endBlock := latestBlock + + if s.config.MaxBlocksPerRun > 0 && endBlock-startBlock > uint64(s.config.MaxBlocksPerRun) { + endBlock = startBlock + uint64(s.config.MaxBlocksPerRun) + } + + if endBlock <= startBlock { + fmt.Printf("[MONITOR] Monitor skipped - no new blocks: chain=%s address=%s startBlock=%d endBlock=%d\n", chain, entry.Address, startBlock, endBlock) + continue + } + + addresses = append(addresses, entry.Address) + if firstAddr == "" { + firstAddr = entry.Address + } + skippedAll = false + metrics.MonitorsTotal++ + } + + if skippedAll || len(addresses) == 0 { + continue + } + + // Update progress for this batch + s.currentAddress.Store(firstAddr) + processed := s.processedCount.Add(int64(len(addresses))) + fmt.Printf("[MONITOR] Processing batch %d/%d (%d addresses, starting with %s)\n", + processed, len(state.Monitors), len(addresses), firstAddr[:10]+"...") + + // Build template vars with multiple addresses + vars := TemplateVars{ + Addresses: addresses, + Address: firstAddr, // Keep for backwards compatibility + Chain: state.Chain, + FirstBlock: batch[0].StartingBlock, + LastBlock: latestBlock, + BlockCount: latestBlock - batch[0].StartingBlock + 1, 
+ } + + // Execute commands for this batch + var executeErr error + for _, cmd := range state.Commands { + if err := executor.Execute(context.Background(), cmd, vars); err != nil { + executeErr = err + break + } + } + + if executeErr != nil { + metrics.MonitorsFailed += len(addresses) + s.logger.Warn("Batch processing failed", "chain", state.Chain, "addresses", len(addresses), "error", executeErr) + } else { + metrics.MonitorsSuccess += len(addresses) + } + } + fmt.Println() // Newline after processing + } else { + // Process monitors with worker pool (original code) + pool := NewWorkerPool(s.config.Concurrency, NewShellExecutor()) + + // Set up real-time progress callback + pool.onProgressUpdate = func(result JobResult) { + s.currentAddress.Store(result.Address) + processed := s.processedCount.Add(1) + fmt.Printf("\r[MONITOR] Processing monitors %d/%d (%s)%s", + processed, metrics.MonitorsTotal, result.Address[:10]+"...", strings.Repeat(" ", 30)) + } + + pool.Start() + + for _, entry := range state.Monitors { + startBlock := entry.StartingBlock + endBlock := latestBlock + + if s.config.MaxBlocksPerRun > 0 && endBlock-startBlock > uint64(s.config.MaxBlocksPerRun) { + endBlock = startBlock + uint64(s.config.MaxBlocksPerRun) + } + + if endBlock <= startBlock { + fmt.Printf("[MONITOR] Monitor skipped - no new blocks: chain=%s address=%s startBlock=%d endBlock=%d\n", chain, entry.Address, startBlock, endBlock) + continue + } + + vars := TemplateVars{ + Address: entry.Address, + Chain: state.Chain, + FirstBlock: startBlock, + LastBlock: endBlock, + BlockCount: endBlock - startBlock + 1, + } + + job := MonitorJob{ + Entry: entry, + Commands: state.Commands, + Vars: vars, + } + + pool.Submit(job) + metrics.MonitorsTotal++ + } + + fmt.Printf("[MONITOR] %d monitors submitted to worker pool for chain: %s\n", metrics.MonitorsTotal, chain) + + results := pool.Wait() + fmt.Println() // Newline after progress updates + + for _, result := range results { + if result.Success { + 
metrics.MonitorsSuccess++ + } else { + metrics.MonitorsFailed++ + s.logger.Warn("Monitor processing failed", "chain", state.Chain, "address", result.Address, "error", result.Error) + } + } + } + + metrics.Duration = time.Since(metrics.StartTime) + s.logMetrics(metrics) + + if s.shouldFailEarly(metrics) { + return fmt.Errorf("fail-early triggered: high failure rate (%d/%d failed)", metrics.MonitorsFailed, metrics.MonitorsTotal) + } + + return nil +} + +// getFileModTime returns the modification time of a file, or zero time if file doesn't exist +func getFileModTime(path string) time.Time { + info, err := os.Stat(path) + if err != nil { + return time.Time{} + } + return info.ModTime() +} + +// checkAndReloadConfig checks if watchlist or commands files have been modified and reloads them +func (s *MonitorService) checkAndReloadConfig(chain string) error { + state := s.chainStates[chain] + if state == nil { + return nil + } + + reloadNeeded := false + + // Check watchlist file + if state.WatchlistPath != "" { + currentModTime := getFileModTime(state.WatchlistPath) + if !currentModTime.IsZero() && currentModTime.After(state.WatchlistModTime) { + fmt.Printf("[MONITOR] Watchlist file changed for %s, reloading...\n", chain) + monitors, err := ParseWatchlist(state.WatchlistPath) + if err != nil { + return fmt.Errorf("failed to reload watchlist: %v", err) + } + state.Monitors = monitors + state.WatchlistModTime = currentModTime + reloadNeeded = true + fmt.Printf("[MONITOR] Reloaded watchlist for %s: %d monitors\n", chain, len(monitors)) + } + } + + // Check commands file + if state.CommandsPath != "" { + currentModTime := getFileModTime(state.CommandsPath) + if !currentModTime.IsZero() && currentModTime.After(state.CommandsModTime) { + fmt.Printf("[MONITOR] Commands file changed for %s, reloading...\n", chain) + commands, err := ParseCommands(state.CommandsPath) + if err != nil { + return fmt.Errorf("failed to reload commands: %v", err) + } + state.Commands = commands + 
state.CommandsModTime = currentModTime + reloadNeeded = true + fmt.Printf("[MONITOR] Reloaded commands for %s: %d commands\n", chain, len(commands)) + } + } + + if reloadNeeded { + s.logger.Info("Configuration reloaded", "chain", chain, "monitors", len(state.Monitors), "commands", len(state.Commands)) + } + + return nil +} diff --git a/services/monitor/service_test.go b/services/monitor/service_test.go index b72bf10..5e043aa 100644 --- a/services/monitor/service_test.go +++ b/services/monitor/service_test.go @@ -9,7 +9,7 @@ import ( func TestNewMonitorService(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) - chains := []string{"mainnet", "sepolia"} + chains := []string{"mainnet", "gnosis"} config := MonitorConfig{ WatchlistDir: "/tmp/watchlists", CommandsDir: "/tmp/commands", diff --git a/services/monitor/template.go b/services/monitor/template.go index b2c3e3b..9f8c4ac 100644 --- a/services/monitor/template.go +++ b/services/monitor/template.go @@ -5,7 +5,8 @@ import ( ) type TemplateVars struct { - Address string + Address string // Single address (for backwards compatibility) + Addresses []string // Multiple addresses for batching Chain string FirstBlock uint64 LastBlock uint64 @@ -14,7 +15,12 @@ type TemplateVars struct { func ExpandTemplate(input string, vars TemplateVars) string { result := input - result = strings.ReplaceAll(result, "{address}", vars.Address) + // For multiple addresses, join them with spaces + if len(vars.Addresses) > 0 { + result = strings.ReplaceAll(result, "{address}", strings.Join(vars.Addresses, " ")) + } else { + result = strings.ReplaceAll(result, "{address}", vars.Address) + } result = strings.ReplaceAll(result, "{chain}", vars.Chain) result = strings.ReplaceAll(result, "{first_block}", formatUint64(vars.FirstBlock)) result = strings.ReplaceAll(result, "{last_block}", formatUint64(vars.LastBlock)) diff --git a/services/monitor/worker.go b/services/monitor/worker.go index 3fccea8..9554f07 100644 --- 
a/services/monitor/worker.go +++ b/services/monitor/worker.go @@ -18,21 +18,22 @@ type JobResult struct { } type WorkerPool struct { - concurrency int - jobs chan MonitorJob - results chan JobResult - wg sync.WaitGroup - ctx context.Context - cancel context.CancelFunc - executor CommandExecutor + concurrency int + jobs chan MonitorJob + results chan JobResult + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + executor CommandExecutor + onProgressUpdate func(result JobResult) // Callback for real-time progress } func NewWorkerPool(concurrency int, executor CommandExecutor) *WorkerPool { ctx, cancel := context.WithCancel(context.Background()) return &WorkerPool{ concurrency: concurrency, - jobs: make(chan MonitorJob, concurrency*2), - results: make(chan JobResult, concurrency*2), + jobs: make(chan MonitorJob, 10000), + results: make(chan JobResult, 10000), ctx: ctx, cancel: cancel, executor: executor, @@ -76,6 +77,11 @@ func (wp *WorkerPool) processJob(job MonitorJob) { } } + // Call progress callback if set (before sending to results channel) + if wp.onProgressUpdate != nil { + wp.onProgressUpdate(result) + } + select { case wp.results <- result: case <-wp.ctx.Done(): diff --git a/services/service_scraper.go b/services/service_scraper.go index ef807e7..51a5b7c 100644 --- a/services/service_scraper.go +++ b/services/service_scraper.go @@ -9,20 +9,29 @@ import ( "time" "github.com/TrueBlocks/trueblocks-chifra/v6/pkg/logger" + "github.com/TrueBlocks/trueblocks-chifra/v6/pkg/output" "github.com/TrueBlocks/trueblocks-chifra/v6/pkg/types" sdk "github.com/TrueBlocks/trueblocks-sdk/v6" ) +// ScrapeCompletedEvent is sent when scraper completes a batch +// This is a "wake up" signal - monitors track their own block state +type ScrapeCompletedEvent struct { + Chain string + Meta interface{} // *types.MetaData from core +} + // ScrapeService implements Servicer, Pauser, and Restarter interfaces type ScrapeService struct { - paused bool - logger *slog.Logger 
- initMode string - configTargets []string - sleep int - blockCnt int - ctx context.Context - cancel context.CancelFunc + paused bool + logger *slog.Logger + initMode string + configTargets []string + sleep int + blockCnt int + ctx context.Context + cancel context.CancelFunc + onScrapeComplete func(ScrapeCompletedEvent) } func NewScrapeService(logger *slog.Logger, initMode string, configTargets []string, sleep int, blockCnt int) *ScrapeService { @@ -139,6 +148,11 @@ func (s *ScrapeService) Logger() *slog.Logger { return s.logger } +// SetScrapeCompleteCallback configures the scraper to call a function when scraping completes +func (s *ScrapeService) SetScrapeCompleteCallback(fn func(ScrapeCompletedEvent)) { + s.onScrapeComplete = fn +} + func (s *ScrapeService) initOneChain(chain string) (*scraperReport, error) { defer func() { logger.SetLoggerWriter(io.Discard) @@ -187,20 +201,77 @@ func (s *ScrapeService) scrapeOneChain(chain string) (*scraperReport, error) { return nil, nil } + // Create streaming context for event handling + rCtx := output.NewStreamingContext() + done := make(chan struct{}) + + // Start listener goroutine for streaming events + go func() { + defer close(done) + for { + select { + case event, ok := <-rCtx.ModelChan: + if !ok { + return + } + s.handleStreamEvent(event) + case err, ok := <-rCtx.ErrorChan: + if !ok { + return + } + s.logger.Error("Scrape error", "error", err) + case <-rCtx.Ctx.Done(): + return + } + } + }() + opts := sdk.ScrapeOptions{ BlockCnt: uint64(s.blockCnt), Globals: sdk.Globals{ Chain: chain, }, + RenderCtx: rCtx, } - if msg, meta, err := opts.ScrapeRunOnce(); err != nil { + msg, meta, err := opts.ScrapeRunOnce() + + // Cleanup: close channels and wait for goroutine + close(rCtx.ModelChan) + close(rCtx.ErrorChan) + <-done + + if err != nil { return nil, err - } else { - if len(msg) > 0 { - s.logger.Info(msg[0].String()) + } + + if len(msg) > 0 { + s.logger.Info(msg[0].String()) + } + return reportScrapeRun(meta, chain, 
s.blockCnt), nil +} + +// handleStreamEvent processes events from the streaming context +func (s *ScrapeService) handleStreamEvent(event types.Modeler) { + switch e := event.(type) { + case *types.Message: + s.logger.Info(e.Msg) + default: + if s.onScrapeComplete != nil { + m := event.Model("", "", false, nil) + if chainVal, ok := m.Data["chain"]; ok { + if chainStr, ok := chainVal.(string); ok { + go func(chain string) { + defer func() { + if r := recover(); r != nil { + s.logger.Error("Callback panic", "error", r) + } + }() + s.onScrapeComplete(ScrapeCompletedEvent{Chain: chain}) + }(chainStr) + } + } } - return reportScrapeRun(meta, chain, s.blockCnt), nil } }