diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e0c44f..f719d7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Fixes - Fixed multi-workspace desktop/API sync scoping, fail-closed workspace authentication, workspace-qualified purges, and explicit watch workspace selection. Thanks @zm2231. +- Prevented unchanged desktop refreshes from duplicating message events and added preview-first retained-history compaction with `purge --keep-message-events`. Thanks @barbieri. ## 0.7.3 - 2026-06-19 diff --git a/README.md b/README.md index 082175a..6449fe3 100644 --- a/README.md +++ b/README.md @@ -193,7 +193,7 @@ Choose the path that matches your setup: - `update` pulls and imports the latest git snapshot, or restores a historical tag/ref without moving the share checkout - `sync` performs a one-shot crawl from bot/API, MCP connector, wiretap/desktop, or both - `import` imports a Slack export ZIP or extracted export directory -- `purge` previews or deletes messages and message-owned records older than a cutoff +- `purge` previews or deletes messages and message-owned records older than a cutoff, with optional retained-event compaction - `tail` listens for live events through Socket Mode, including one tail per configured workspace - `watch` refreshes desktop-local state on a schedule, optionally scoped with `--workspace ` - `search` runs safe local text search with FTS and substring fallback, optionally filtered by workspace diff --git a/SPEC.md b/SPEC.md index eea2f8e..4fcfe49 100644 --- a/SPEC.md +++ b/SPEC.md @@ -192,6 +192,7 @@ Expected flags: - optional `--workspace ` - `--force` to execute; omission is a preview - `--keep-media` to retain cached media no longer referenced by stored messages +- optional `--keep-message-events ` to retain the newest events per message, event type, and source - `--vacuum` to compact SQLite after deletion Behavior: @@ -203,6 +204,7 @@ Behavior: - preserve workspaces, channels, users, and sync state - record per-channel retention floors so incremental API/MCP repair overlap does not restore purged history - delete only cached media paths with no remaining database references +- preview and compact retained event history only when `--keep-message-events` is provided - do not compact the SQLite file unless `--vacuum` is set ### `status` diff --git a/docs/desktop-mode.md b/docs/desktop-mode.md index 01c73b9..9247587 100644 --- a/docs/desktop-mode.md +++ b/docs/desktop-mode.md @@ -87,7 +87,7 @@ Use `watch` to keep refreshing the DB from local desktop state: slacrawl watch --desktop-every 5m ``` -This loop does not truncate the database. It repeatedly upserts and appends event history so the local DB stays current as Slack Desktop changes. It refreshes every workspace in the signed-in desktop profile by default; pass `--workspace T01234567` to restrict each refresh to one workspace. +This loop does not truncate the database. It repeatedly upserts desktop state and appends event history when a message snapshot changes, so unchanged refreshes do not amplify the archive. It refreshes every workspace in the signed-in desktop profile by default; pass `--workspace T01234567` to restrict each refresh to one workspace. ## Validation Commands diff --git a/docs/retention.md b/docs/retention.md index b96395a..7aabd31 100644 --- a/docs/retention.md +++ b/docs/retention.md @@ -43,6 +43,19 @@ Pass `--force` to execute: slacrawl purge --workspace T01234567 --older-than 90d --force ``` +Retained messages can also accumulate historical snapshots. Add +`--keep-message-events N` to preview keeping only the newest `N` events for +each message, event type, and source: + +```bash +slacrawl --json purge --older-than 90d --keep-message-events 5 +slacrawl purge --older-than 90d --keep-message-events 5 --force --vacuum +``` + +The preview reports `compacted_message_events` without changing the database. +Compaction never removes the canonical current row in `messages`, does not mix +event sources or event types, and follows `--workspace` when supplied. + The SQLite transaction deletes: - messages @@ -52,6 +65,10 @@ The SQLite transaction deletes: - embedding jobs - FTS entries +Consecutive identical message snapshots are suppressed during normal ingest; +real transitions, including a value changing and later reverting, remain in +event history. + Workspaces, channels, users, and sync state remain. Executed purges also record a per-channel retention floor so ordinary incremental API and MCP syncs do not restore deleted history through their repair overlap. New replies to expired diff --git a/internal/cli/app_test.go b/internal/cli/app_test.go index e40fce5..874fe5d 100644 --- a/internal/cli/app_test.go +++ b/internal/cli/app_test.go @@ -710,6 +710,7 @@ func TestCompletionBashOutput(t *testing.T) { require.Contains(t, out, "--ref") require.Contains(t, out, "--max-bytes") require.Contains(t, out, "--desktop-every --workspace") + require.Contains(t, out, "--keep-message-events") } func TestCompletionZshOutput(t *testing.T) { @@ -736,6 +737,7 @@ func TestCompletionZshOutput(t *testing.T) { require.Contains(t, out, "connector") require.Contains(t, out, "purge") require.Contains(t, out, "--keep-media") + require.Contains(t, out, "--keep-message-events[events retained per message, source, and type]") require.Contains(t, out, "--tag[immutable snapshot tag]") require.Contains(t, out, "--ref[historical Git ref to import]") require.Contains(t, out, "--workspace[workspace id]") diff --git a/internal/cli/completion.go b/internal/cli/completion.go index a5a7fd4..ddab394 100644 --- a/internal/cli/completion.go +++ b/internal/cli/completion.go @@ -182,7 +182,7 @@ _slacrawl() COMPREPLY=( $(compgen -W "--workspace --dry-run --force --format --help -h ${global_flags}" -- "${cur}") ) ;; purge) - COMPREPLY=( $(compgen -W "--before --older-than --workspace --force --keep-media --vacuum --help -h ${global_flags}" -- "${cur}") ) + COMPREPLY=( $(compgen -W "--before --older-than --workspace --force --keep-media --keep-message-events --vacuum --help -h ${global_flags}" -- "${cur}") ) ;; tail) COMPREPLY=( $(compgen -W "--workspace --repair-every --help -h ${global_flags}" -- "${cur}") ) @@ -320,7 +320,7 @@ _slacrawl() { _arguments '--workspace[workspace id]:workspace id:' '--dry-run[walk and count without writing]' '--force[overwrite existing slack-export rows at the same rank]' '--format[output format]:format:(text json log)' ;; purge) - _arguments '--before[absolute cutoff]:time:' '--older-than[relative cutoff]:duration:' '--workspace[workspace id]:workspace id:' '--force[execute deletion]' '--keep-media[retain unreferenced cached media]' '--vacuum[compact database after deletion]' + _arguments '--before[absolute cutoff]:time:' '--older-than[relative cutoff]:duration:' '--workspace[workspace id]:workspace id:' '--force[execute deletion]' '--keep-media[retain unreferenced cached media]' '--keep-message-events[events retained per message, source, and type]:count:' '--vacuum[compact database after deletion]' ;; tail) _arguments '--workspace[workspace id]:workspace id:' '--repair-every[repair interval]:duration:' diff --git a/internal/cli/purge.go b/internal/cli/purge.go index 26ec499..70d0346 100644 --- a/internal/cli/purge.go +++ b/internal/cli/purge.go @@ -15,23 +15,25 @@ import ( ) type purgeResponse struct { - Cutoff time.Time `json:"cutoff"` - WorkspaceID string `json:"workspace_id,omitempty"` - DryRun bool `json:"dry_run"` - Messages int64 `json:"messages"` - MessageEvents int64 `json:"message_events"` - MessageFiles int64 `json:"message_files"` - Mentions int64 `json:"mentions"` - EmbeddingJobs int64 `json:"embedding_jobs"` - FTSEntries int64 `json:"fts_entries"` - CachedMediaFiles int64 `json:"cached_media_files"` - CachedMediaBytes int64 `json:"cached_media_bytes"` - CachedMediaDeleted int64 `json:"cached_media_deleted"` - CachedMediaMissing int64 `json:"cached_media_missing"` - CachedMediaRetained int64 `json:"cached_media_retained"` - CachedMediaFailures []string `json:"cached_media_failures,omitempty"` - KeepMedia bool `json:"keep_media"` - Vacuumed bool `json:"vacuumed"` + Cutoff time.Time `json:"cutoff"` + WorkspaceID string `json:"workspace_id,omitempty"` + DryRun bool `json:"dry_run"` + Messages int64 `json:"messages"` + MessageEvents int64 `json:"message_events"` + CompactedMessageEvents int64 `json:"compacted_message_events"` + KeepMessageEvents int `json:"keep_message_events,omitempty"` + MessageFiles int64 `json:"message_files"` + Mentions int64 `json:"mentions"` + EmbeddingJobs int64 `json:"embedding_jobs"` + FTSEntries int64 `json:"fts_entries"` + CachedMediaFiles int64 `json:"cached_media_files"` + CachedMediaBytes int64 `json:"cached_media_bytes"` + CachedMediaDeleted int64 `json:"cached_media_deleted"` + CachedMediaMissing int64 `json:"cached_media_missing"` + CachedMediaRetained int64 `json:"cached_media_retained"` + CachedMediaFailures []string `json:"cached_media_failures,omitempty"` + KeepMedia bool `json:"keep_media"` + Vacuumed bool `json:"vacuumed"` } func (a *App) runPurge(ctx context.Context, configPath string, args []string, format OutputFormat) error { @@ -46,6 +48,7 @@ func (a *App) runPurge(ctx context.Context, configPath string, args []string, fo workspaceID := fs.String("workspace", "", "limit purge to workspace id") force := fs.Bool("force", false, "execute deletion instead of previewing") keepMedia := fs.Bool("keep-media", false, "retain unreferenced cached media files") + keepMessageEvents := fs.Int("keep-message-events", 0, "keep newest message events per source and type") vacuum := fs.Bool("vacuum", false, "compact the SQLite database after deletion") if err := fs.Parse(args); err != nil { return err @@ -60,15 +63,22 @@ func (a *App) runPurge(ctx context.Context, configPath string, args []string, fo return errors.New("--vacuum requires --force") } workspaceSet := false + keepMessageEventsSet := false fs.Visit(func(item *flag.Flag) { - if item.Name == "workspace" { + switch item.Name { + case "workspace": workspaceSet = true + case "keep-message-events": + keepMessageEventsSet = true } }) workspace := strings.TrimSpace(*workspaceID) if workspaceSet && workspace == "" { return errors.New("--workspace cannot be empty") } + if keepMessageEventsSet && *keepMessageEvents <= 0 { + return errors.New("--keep-message-events must be greater than zero") + } now := a.nowUTC() cutoff, err := resolvePurgeCutoff(now, *before, *olderThan) @@ -90,9 +100,10 @@ func (a *App) runPurge(ctx context.Context, configPath string, args []string, fo defer func() { _ = st.Close() }() opts := store.PurgeOptions{ - Before: cutoff, - WorkspaceID: workspace, - Delete: *force, + Before: cutoff, + WorkspaceID: workspace, + Delete: *force, + KeepMessageEvents: *keepMessageEvents, } var report store.PurgeReport var mediaDeleted, mediaMissing, mediaRetained int64 @@ -103,8 +114,9 @@ func (a *App) runPurge(ctx context.Context, configPath string, args []string, fo if *force && !*keepMedia { if strings.TrimSpace(cfg.CacheDir) == "" { preview, err := st.PurgeMessages(ctx, store.PurgeOptions{ - Before: cutoff, - WorkspaceID: opts.WorkspaceID, + Before: cutoff, + WorkspaceID: opts.WorkspaceID, + KeepMessageEvents: opts.KeepMessageEvents, }) if err != nil { return err @@ -121,8 +133,9 @@ func (a *App) runPurge(ctx context.Context, configPath string, args []string, fo } else { err := media.WithCacheLock(ctx, cfg.CacheDir, func() error { preview, err := st.PurgeMessages(ctx, store.PurgeOptions{ - Before: cutoff, - WorkspaceID: opts.WorkspaceID, + Before: cutoff, + WorkspaceID: opts.WorkspaceID, + KeepMessageEvents: opts.KeepMessageEvents, }) if err != nil { return err @@ -190,7 +203,7 @@ func (a *App) runPurge(ctx context.Context, configPath string, args []string, fo } } } - response := purgeResponseFromStore(cutoff, opts.WorkspaceID, !*force, *keepMedia, report) + response := purgeResponseFromStore(cutoff, opts.WorkspaceID, !*force, *keepMedia, opts.KeepMessageEvents, report) response.CachedMediaDeleted = mediaDeleted response.CachedMediaMissing = mediaMissing response.CachedMediaRetained = mediaRetained @@ -279,19 +292,21 @@ func resolvePurgeCutoff(now time.Time, before, olderThan string) (time.Time, err return now.Add(-duration), nil } -func purgeResponseFromStore(cutoff time.Time, workspaceID string, dryRun, keepMedia bool, report store.PurgeReport) purgeResponse { +func purgeResponseFromStore(cutoff time.Time, workspaceID string, dryRun, keepMedia bool, keepMessageEvents int, report store.PurgeReport) purgeResponse { response := purgeResponse{ - Cutoff: cutoff, - WorkspaceID: workspaceID, - DryRun: dryRun, - Messages: report.Messages, - MessageEvents: report.MessageEvents, - MessageFiles: report.MessageFiles, - Mentions: report.Mentions, - EmbeddingJobs: report.EmbeddingJobs, - FTSEntries: report.FTSEntries, - CachedMediaFiles: int64(len(report.Media)), - KeepMedia: keepMedia, + Cutoff: cutoff, + WorkspaceID: workspaceID, + DryRun: dryRun, + Messages: report.Messages, + MessageEvents: report.MessageEvents, + CompactedMessageEvents: report.CompactedMessageEvents, + KeepMessageEvents: keepMessageEvents, + MessageFiles: report.MessageFiles, + Mentions: report.Mentions, + EmbeddingJobs: report.EmbeddingJobs, + FTSEntries: report.FTSEntries, + CachedMediaFiles: int64(len(report.Media)), + KeepMedia: keepMedia, } for _, item := range report.Media { response.CachedMediaBytes += item.Size @@ -312,6 +327,8 @@ Flags: -workspace string limit purge to one workspace -force execute deletion -keep-media retain unreferenced cached media files + -keep-message-events int + keep newest events per message, source, and type -vacuum compact the database after deletion; requires --force `) } diff --git a/internal/cli/purge_test.go b/internal/cli/purge_test.go index 1212c17..ccb5677 100644 --- a/internal/cli/purge_test.go +++ b/internal/cli/purge_test.go @@ -120,6 +120,60 @@ func TestPurgeCommandValidatesSafetyFlags(t *testing.T) { require.ErrorContains(t, err, "future") err = app.Run(context.Background(), []string{"purge", "--before", "2026-01-01", "--workspace", " "}) require.ErrorContains(t, err, "--workspace cannot be empty") + err = app.Run(context.Background(), []string{"purge", "--before", "2026-01-01", "--keep-message-events", "0"}) + require.ErrorContains(t, err, "--keep-message-events must be greater than zero") +} + +func TestPurgeCommandPreviewsAndCompactsRetainedMessageEvents(t *testing.T) { + dir := t.TempDir() + configPath := filepath.Join(dir, "config.toml") + dbPath := filepath.Join(dir, "slacrawl.db") + cfg := config.Default() + cfg.DBPath = dbPath + cfg.CacheDir = "" + require.NoError(t, cfg.Save(configPath)) + + now := time.Date(2026, 6, 10, 12, 0, 0, 0, time.UTC) + messageTime := now.Add(-24 * time.Hour) + st, err := store.Open(dbPath) + require.NoError(t, err) + require.NoError(t, st.UpsertMessage(context.Background(), store.Message{ + WorkspaceID: "T1", ChannelID: "C1", TS: purgeTestSlackTS(messageTime), + Text: "retained", NormalizedText: "retained", SourceRank: 3, + SourceName: "desktop-indexeddb", RawJSON: `{"version":0}`, UpdatedAt: messageTime, + }, nil)) + for i := 1; i <= 4; i++ { + _, err := st.DB().ExecContext(context.Background(), ` +insert into message_events (channel_id, ts, event_type, source_name, payload_json, created_at) +values ('C1', ?, 'message', 'desktop-indexeddb', ?, ?) +`, purgeTestSlackTS(messageTime), fmt.Sprintf(`{"version":%d}`, i), messageTime.Add(time.Duration(i)*time.Second).Format(time.RFC3339)) + require.NoError(t, err) + } + require.NoError(t, st.Close()) + + var stdout bytes.Buffer + app := &App{Stdout: &stdout, Stderr: &stdout, now: func() time.Time { return now }} + args := []string{ + "--config", configPath, "--json", "purge", "--before", "2026-01-01", + "--keep-message-events", "2", "--keep-media", + } + require.NoError(t, app.Run(context.Background(), args)) + var preview purgeResponse + require.NoError(t, json.Unmarshal(stdout.Bytes(), &preview)) + require.True(t, preview.DryRun) + require.Equal(t, 2, preview.KeepMessageEvents) + require.Equal(t, int64(3), preview.CompactedMessageEvents) + require.Equal(t, int64(5), purgeTestTableCount(t, dbPath, "message_events")) + + stdout.Reset() + args = append(args, "--force") + require.NoError(t, app.Run(context.Background(), args)) + var executed purgeResponse + require.NoError(t, json.Unmarshal(stdout.Bytes(), &executed)) + require.False(t, executed.DryRun) + require.Equal(t, int64(3), executed.CompactedMessageEvents) + require.Equal(t, int64(2), purgeTestTableCount(t, dbPath, "message_events")) + require.Equal(t, int64(1), purgeTestMessageCount(t, dbPath)) } func TestPurgeCommandKeepMedia(t *testing.T) { @@ -368,11 +422,15 @@ func TestRemovePurgeMediaContinuesAfterFailure(t *testing.T) { } func purgeTestMessageCount(t *testing.T, dbPath string) int64 { + return purgeTestTableCount(t, dbPath, "messages") +} + +func purgeTestTableCount(t *testing.T, dbPath, table string) int64 { t.Helper() st, err := store.Open(dbPath) require.NoError(t, err) defer func() { require.NoError(t, st.Close()) }() - rows, err := st.QueryReadOnly(context.Background(), "select count(*) as n from messages") + rows, err := st.QueryReadOnly(context.Background(), "select count(*) as n from "+table) require.NoError(t, err) return rows[0]["n"].(int64) } diff --git a/internal/share/share.go b/internal/share/share.go index 7cb6468..c1701f7 100644 --- a/internal/share/share.go +++ b/internal/share/share.go @@ -246,6 +246,9 @@ func importLocked(ctx context.Context, s *store.Store, opts Options) (Manifest, if _, err := tx.ExecContext(ctx, `delete from message_fts`); err != nil { return Manifest{}, fmt.Errorf("clear message_fts: %w", err) } + if _, err := tx.ExecContext(ctx, `delete from message_event_heads`); err != nil { + return Manifest{}, fmt.Errorf("clear message event heads: %w", err) + } for i := len(SnapshotTables) - 1; i >= 0; i-- { table := SnapshotTables[i] if _, err := tx.ExecContext(ctx, "delete from "+quoteIdent(table)); err != nil { //nolint:gosec // Snapshot table names are quoted identifiers from the fixed schema list. diff --git a/internal/share/share_test.go b/internal/share/share_test.go index c7af599..3e4b028 100644 --- a/internal/share/share_test.go +++ b/internal/share/share_test.go @@ -33,8 +33,7 @@ func TestExportImportRoundTrip(t *testing.T) { require.NotEmpty(t, manifest.Tables) require.FileExists(t, filepath.Join(opts.RepoPath, ManifestName)) - reader, err := store.Open(filepath.Join(dir, "reader.db")) - require.NoError(t, err) + reader := seedStore(t, filepath.Join(dir, "reader.db")) defer func() { require.NoError(t, reader.Close()) }() imported, err := Import(ctx, reader, opts) @@ -45,6 +44,9 @@ func TestExportImportRoundTrip(t *testing.T) { require.NoError(t, err) require.Len(t, rows, 1) require.Equal(t, "git backed archive works", rows[0].Text) + heads, err := reader.QueryReadOnly(ctx, `select count(*) as count from message_event_heads`) + require.NoError(t, err) + require.Equal(t, int64(0), heads[0]["count"]) } func TestImportRejectsIncompleteManifestBeforeClearing(t *testing.T) { diff --git a/internal/store/purge.go b/internal/store/purge.go index cf2c931..d940cc9 100644 --- a/internal/store/purge.go +++ b/internal/store/purge.go @@ -11,7 +11,10 @@ import ( "github.com/openclaw/slacrawl/internal/store/storedb" ) -const purgeMessageKeysTable = "temp.slacrawl_purge_message_keys" +const ( + purgeMessageKeysTable = "temp.slacrawl_purge_message_keys" + purgeMessageEventIDsTable = "temp.slacrawl_purge_message_event_ids" +) const ( retentionFloorSource = "retention" @@ -25,10 +28,11 @@ and cast(substr(ts, 7, instr(substr(ts, 7), ':') - 1) as real) >= 1000000000` ) type PurgeOptions struct { - Before time.Time - WorkspaceID string - Delete bool - RequireNoMedia bool + Before time.Time + WorkspaceID string + Delete bool + RequireNoMedia bool + KeepMessageEvents int } type PurgeMedia struct { @@ -37,19 +41,23 @@ type PurgeMedia struct { } type PurgeReport struct { - Messages int64 - MessageEvents int64 - MessageFiles int64 - Mentions int64 - EmbeddingJobs int64 - FTSEntries int64 - Media []PurgeMedia + Messages int64 + MessageEvents int64 + CompactedMessageEvents int64 + MessageFiles int64 + Mentions int64 + EmbeddingJobs int64 + FTSEntries int64 + Media []PurgeMedia } func (s *Store) PurgeMessages(ctx context.Context, opts PurgeOptions) (PurgeReport, error) { if opts.Before.IsZero() { return PurgeReport{}, errors.New("purge cutoff is required") } + if opts.KeepMessageEvents < 0 { + return PurgeReport{}, errors.New("message event retention cannot be negative") + } tx, err := s.db.BeginTx(ctx, nil) if err != nil { return PurgeReport{}, err @@ -102,8 +110,11 @@ where ( if err := selectUntimestampedPurgeDrafts(ctx, tx, before, opts.WorkspaceID); err != nil { return PurgeReport{}, err } + if err := selectMessageEventsForCompaction(ctx, tx, opts.WorkspaceID, opts.KeepMessageEvents); err != nil { + return PurgeReport{}, err + } - report, err := readPurgeReport(ctx, tx) + report, err := readPurgeReport(ctx, tx, opts.KeepMessageEvents > 0) if err != nil { return PurgeReport{}, err } @@ -123,6 +134,14 @@ where ( if err := deletePurgeSelection(ctx, tx); err != nil { return PurgeReport{}, err } + if err := deleteMessageEventCompactionSelection(ctx, tx, opts.KeepMessageEvents > 0); err != nil { + return PurgeReport{}, err + } + } + if opts.KeepMessageEvents > 0 { + if _, err := tx.ExecContext(ctx, `drop table `+purgeMessageEventIDsTable); err != nil { + return PurgeReport{}, fmt.Errorf("drop message event compaction selection: %w", err) + } } if _, err := tx.ExecContext(ctx, `drop table `+purgeMessageKeysTable); err != nil { return PurgeReport{}, fmt.Errorf("drop purge selection: %w", err) @@ -287,7 +306,48 @@ values (?, ?, ?) return nil } -func readPurgeReport(ctx context.Context, tx *sql.Tx) (PurgeReport, error) { +func selectMessageEventsForCompaction(ctx context.Context, tx *sql.Tx, workspaceID string, keep int) error { + if keep == 0 { + return nil + } + if _, err := tx.ExecContext(ctx, `drop table if exists `+purgeMessageEventIDsTable); err != nil { + return fmt.Errorf("reset message event compaction selection: %w", err) + } + if _, err := tx.ExecContext(ctx, ` +create temporary table slacrawl_purge_message_event_ids ( + id integer primary key +) +`); err != nil { + return fmt.Errorf("create message event compaction selection: %w", err) + } + workspaceID = strings.TrimSpace(workspaceID) + if _, err := tx.ExecContext(ctx, ` +insert into `+purgeMessageEventIDsTable+` (id) +select id +from ( + select + e.id, + row_number() over ( + partition by e.channel_id, e.ts, e.event_type, e.source_name + order by e.id desc + ) as retention_rank + from message_events e + join messages m on m.channel_id = e.channel_id and m.ts = e.ts + where (? = '' or m.workspace_id = ?) + and not exists ( + select 1 + from `+purgeMessageKeysTable+` p + where p.workspace_id = m.workspace_id and p.channel_id = e.channel_id and p.ts = e.ts + ) +) +where retention_rank > ? +`, workspaceID, workspaceID, keep); err != nil { + return fmt.Errorf("select message events for compaction: %w", err) + } + return nil +} + +func readPurgeReport(ctx context.Context, tx *sql.Tx, compactMessageEvents bool) (PurgeReport, error) { report := PurgeReport{} counts := []struct { label string @@ -329,6 +389,11 @@ join ` + purgeMessageKeysTable + ` p on f.message_key = p.channel_id || '|' || p return PurgeReport{}, fmt.Errorf("count purge %s: %w", count.label, err) } } + if compactMessageEvents { + if err := tx.QueryRowContext(ctx, `select count(*) from `+purgeMessageEventIDsTable).Scan(&report.CompactedMessageEvents); err != nil { + return PurgeReport{}, fmt.Errorf("count compacted message events: %w", err) + } + } rows, err := tx.QueryContext(ctx, ` select f.media_path, max(f.content_size) @@ -376,6 +441,13 @@ where exists ( select 1 from ` + purgeMessageKeysTable + ` p join messages m on m.workspace_id = p.workspace_id and m.channel_id = message_events.channel_id and m.ts = message_events.ts where p.channel_id = message_events.channel_id and p.ts = message_events.ts +)`}, + {"message event heads", ` +delete from message_event_heads +where exists ( + select 1 from ` + purgeMessageKeysTable + ` p + join messages m on m.workspace_id = p.workspace_id and m.channel_id = message_event_heads.channel_id and m.ts = message_event_heads.ts + where p.channel_id = message_event_heads.channel_id and p.ts = message_event_heads.ts )`}, {"message files", ` delete from message_files @@ -418,6 +490,19 @@ where exists ( return nil } +func deleteMessageEventCompactionSelection(ctx context.Context, tx *sql.Tx, selected bool) error { + if !selected { + return nil + } + if _, err := tx.ExecContext(ctx, ` +delete from message_events +where id in (select id from `+purgeMessageEventIDsTable+`) +`); err != nil { + return fmt.Errorf("compact retained message events: %w", err) + } + return nil +} + func (s *Store) Vacuum(ctx context.Context) error { _, err := s.db.ExecContext(ctx, "vacuum") return err diff --git a/internal/store/purge_test.go b/internal/store/purge_test.go index b3b068f..40e98c5 100644 --- a/internal/store/purge_test.go +++ b/internal/store/purge_test.go @@ -2,6 +2,7 @@ package store import ( "context" + "fmt" "path/filepath" "testing" "time" @@ -50,6 +51,7 @@ values (?, ?, 'pending', ?), (?, ?, 'pending', ?) require.Equal(t, preview, executed) requireTableCount(t, st, "messages", 2) requireTableCount(t, st, "message_events", 2) + requireTableCount(t, st, "message_event_heads", 2) requireTableCount(t, st, "message_files", 2) requireTableCount(t, st, "message_mentions", 2) requireTableCount(t, st, "embedding_jobs", 1) @@ -68,6 +70,73 @@ values (?, ?, 'pending', ?), (?, ?, 'pending', ?) require.Empty(t, empty.Media) } +func TestPurgeMessagesCompactsRetainedEventHistoryBySourceAndType(t *testing.T) { + st, err := Open(filepath.Join(t.TempDir(), "slacrawl.db")) + require.NoError(t, err) + defer func() { require.NoError(t, st.Close()) }() + + ctx := context.Background() + oldTime := time.Date(2025, 12, 1, 12, 0, 0, 0, time.UTC) + newTime := time.Date(2026, 2, 1, 12, 0, 0, 0, time.UTC) + cutoff := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + upsertPurgeTestMessage(t, st, "T1", "C-old", oldTime, "old", "F-old", "", 0) + upsertPurgeTestMessage(t, st, "T1", "C-keep", newTime, "keep", "F-keep", "", 0) + upsertPurgeTestMessage(t, st, "T2", "C-other", newTime, "other", "F-other", "", 0) + + retainedTS := slackTSFromTime(newTime) + for i := 1; i <= 4; i++ { + _, err := st.DB().ExecContext(ctx, ` +insert into message_events (channel_id, ts, event_type, source_name, payload_json, created_at) +values (?, ?, 'message', 'api-bot', ?, ?) +`, "C-keep", retainedTS, fmt.Sprintf(`{"api":%d}`, i), formatDBTime(newTime.Add(time.Duration(i)*time.Second))) + require.NoError(t, err) + } + for i := 1; i <= 3; i++ { + _, err := st.DB().ExecContext(ctx, ` +insert into message_events (channel_id, ts, event_type, source_name, payload_json, created_at) +values (?, ?, 'message_changed', 'desktop-indexeddb', ?, ?) +`, "C-keep", retainedTS, fmt.Sprintf(`{"desktop":%d}`, i), formatDBTime(newTime.Add(time.Duration(i)*time.Second))) + require.NoError(t, err) + } + for i := 1; i <= 2; i++ { + _, err := st.DB().ExecContext(ctx, ` +insert into message_events (channel_id, ts, event_type, source_name, payload_json, created_at) +values (?, ?, 'message', 'api-bot', ?, ?) +`, "C-other", retainedTS, fmt.Sprintf(`{"other":%d}`, i), formatDBTime(newTime.Add(time.Duration(i)*time.Second))) + require.NoError(t, err) + } + + opts := PurgeOptions{Before: cutoff, WorkspaceID: "T1", KeepMessageEvents: 2} + preview, err := st.PurgeMessages(ctx, opts) + require.NoError(t, err) + require.Equal(t, int64(1), preview.Messages) + require.Equal(t, int64(1), preview.MessageEvents) + require.Equal(t, int64(4), preview.CompactedMessageEvents) + requireTableCount(t, st, "message_events", 12) + + opts.Delete = true + executed, err := st.PurgeMessages(ctx, opts) + require.NoError(t, err) + require.Equal(t, preview, executed) + requireTableCount(t, st, "message_events", 7) + + rows, err := st.QueryReadOnly(ctx, ` +select channel_id, event_type, source_name, payload_json +from message_events +order by channel_id, event_type, source_name, id +`) + require.NoError(t, err) + require.Equal(t, []map[string]any{ + {"channel_id": "C-keep", "event_type": "message", "source_name": "api-bot", "payload_json": `{"api":3}`}, + {"channel_id": "C-keep", "event_type": "message", "source_name": "api-bot", "payload_json": `{"api":4}`}, + {"channel_id": "C-keep", "event_type": "message_changed", "source_name": "desktop-indexeddb", "payload_json": `{"desktop":2}`}, + {"channel_id": "C-keep", "event_type": "message_changed", "source_name": "desktop-indexeddb", "payload_json": `{"desktop":3}`}, + {"channel_id": "C-other", "event_type": "message", "source_name": "api-bot", "payload_json": `{}`}, + {"channel_id": "C-other", "event_type": "message", "source_name": "api-bot", "payload_json": `{"other":1}`}, + {"channel_id": "C-other", "event_type": "message", "source_name": "api-bot", "payload_json": `{"other":2}`}, + }, rows) +} + func TestPurgeMessagesWorkspaceScopeKeepsOtherWorkspaceRows(t *testing.T) { st, err := Open(filepath.Join(t.TempDir(), "slacrawl.db")) require.NoError(t, err) diff --git a/internal/store/sqlc/queries.sql b/internal/store/sqlc/queries.sql index 948aeb4..89e9373 100644 --- a/internal/store/sqlc/queries.sql +++ b/internal/store/sqlc/queries.sql @@ -187,6 +187,17 @@ insert into message_fts (message_key, content) values (?, ?); insert into message_events (channel_id, ts, event_type, source_name, payload_json, created_at) values (?, ?, ?, ?, ?, ?); +-- name: GetMessageEventHead :one +select payload_json +from message_event_heads +where channel_id = ? and ts = ? and event_type = ? and source_name = ?; + +-- name: UpsertMessageEventHead :exec +insert into message_event_heads (channel_id, ts, event_type, source_name, payload_json) +values (?, ?, ?, ?, ?) +on conflict(channel_id, ts, event_type, source_name) do update set + payload_json = excluded.payload_json; + -- name: MarkMessageDeleted :execrows update messages set deleted_ts = ?, diff --git a/internal/store/sqlc/schema.sql b/internal/store/sqlc/schema.sql index 87d3e5c..0b861c1 100644 --- a/internal/store/sqlc/schema.sql +++ b/internal/store/sqlc/schema.sql @@ -107,6 +107,15 @@ create table message_events ( created_at text not null ); +create table message_event_heads ( + channel_id text not null, + ts text not null, + event_type text not null, + source_name text not null, + payload_json text not null, + primary key (channel_id, ts, event_type, source_name) +); + create table sync_state ( source_name text not null, entity_type text not null, diff --git a/internal/store/store.go b/internal/store/store.go index fd978ce..4a84f2d 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -17,7 +17,7 @@ import ( "github.com/openclaw/slacrawl/internal/store/storedb" ) -const schemaVersion = 3 +const schemaVersion = 4 const schemaPragmas = ` pragma foreign_keys = on; @@ -138,6 +138,15 @@ create table if not exists message_events ( created_at text not null ); +create table if not exists message_event_heads ( + channel_id text not null, + ts text not null, + event_type text not null, + source_name text not null, + payload_json text not null, + primary key (channel_id, ts, event_type, source_name) +); + create table if not exists sync_state ( source_name text not null, entity_type text not null, @@ -214,6 +223,34 @@ create index if not exists idx_message_files_file_id on message_files(file_id); create index if not exists idx_message_files_name on message_files(name); ` +const schemaV4Migration = ` +create table if not exists message_event_heads ( + channel_id text not null, + ts text not null, + event_type text not null, + source_name text not null, + payload_json text not null, + primary key (channel_id, ts, event_type, source_name) +); + +-- Seed canonical snapshots without scanning potentially multi-gigabyte event +-- history during database open. Non-canonical source/type partitions seed +-- lazily on their next event, bounding upgrade overhead to one row per active +-- partition instead of an unbounded migration. +insert or ignore into message_event_heads (channel_id, ts, event_type, source_name, payload_json) +select + channel_id, + ts, + case + when trim(coalesce(deleted_ts, '')) <> '' then 'message_deleted' + when trim(coalesce(edited_ts, '')) <> '' then 'message_changed' + else 'message' + end, + source_name, + raw_json +from messages; +` + type Store struct { db *sql.DB q *storedb.Queries @@ -886,14 +923,7 @@ func (s *Store) upsertMessage(ctx context.Context, message Message, mentions []M return false, err } - if err := qtx.InsertMessageEvent(ctx, storedb.InsertMessageEventParams{ - ChannelID: message.ChannelID, - Ts: message.TS, - EventType: eventType(message), - SourceName: message.SourceName, - PayloadJson: message.RawJSON, - CreatedAt: formatDBTime(message.UpdatedAt), - }); err != nil { + if err := appendMessageEvent(ctx, qtx, message, formatDBTime(message.UpdatedAt)); err != nil { return false, err } @@ -937,6 +967,7 @@ select exists ( } for _, query := range []string{ `delete from message_events where channel_id = ? and ts = ?`, + `delete from message_event_heads where channel_id = ? and ts = ?`, `delete from message_files where channel_id = ? and ts = ?`, `delete from message_mentions where channel_id = ? and ts = ?`, `delete from embedding_jobs where channel_id = ? and ts = ?`, @@ -1048,14 +1079,7 @@ func (s *Store) markMessageDeleted(ctx context.Context, message Message, mention return false, err } } - if err := qtx.InsertMessageEvent(ctx, storedb.InsertMessageEventParams{ - ChannelID: message.ChannelID, - Ts: message.TS, - EventType: eventType(message), - SourceName: message.SourceName, - PayloadJson: message.RawJSON, - CreatedAt: updatedAt, - }); err != nil { + if err := appendMessageEvent(ctx, qtx, message, updatedAt); err != nil { return false, err } if err := commit(); err != nil { @@ -2305,6 +2329,41 @@ func eventType(message Message) string { } } +func appendMessageEvent(ctx context.Context, qtx *storedb.Queries, message Message, createdAt string) error { + typeName := eventType(message) + head, err := qtx.GetMessageEventHead(ctx, storedb.GetMessageEventHeadParams{ + ChannelID: message.ChannelID, + Ts: message.TS, + EventType: typeName, + SourceName: message.SourceName, + }) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("read message event head: %w", err) + } + if errors.Is(err, sql.ErrNoRows) || head != message.RawJSON { + if err := qtx.InsertMessageEvent(ctx, storedb.InsertMessageEventParams{ + ChannelID: message.ChannelID, + Ts: message.TS, + EventType: typeName, + SourceName: message.SourceName, + PayloadJson: message.RawJSON, + CreatedAt: createdAt, + }); err != nil { + return err + } + } + if err := qtx.UpsertMessageEventHead(ctx, storedb.UpsertMessageEventHeadParams{ + ChannelID: message.ChannelID, + Ts: message.TS, + EventType: typeName, + SourceName: message.SourceName, + PayloadJson: message.RawJSON, + }); err != nil { + return fmt.Errorf("update message event head: %w", err) + } + return nil +} + func messageKey(channelID string, ts string) string { return strings.TrimSpace(channelID) + "|" + strings.TrimSpace(ts) } @@ -2384,6 +2443,12 @@ func migrateSchema(db *sql.DB, currentVersion int) error { } currentVersion = 3 } + if currentVersion < 4 { + if _, err := tx.Exec(schemaV4Migration); err != nil { + return fmt.Errorf("migrate sqlite schema to v4: %w", err) + } + currentVersion = 4 + } if currentVersion != schemaVersion { return fmt.Errorf("no migration path from sqlite schema version %d to %d", currentVersion, schemaVersion) } @@ -2405,16 +2470,17 @@ type schemaQueryer interface { func validateCurrentSchema(q schemaQueryer) error { required := map[string][]string{ - "workspaces": {"id", "name", "domain", "enterprise_id", "raw_json", "updated_at"}, - "channels": {"id", "workspace_id", "name", "kind", "topic", "purpose", "is_private", "is_archived", "is_shared", "is_general", "raw_json", "updated_at"}, - "users": {"id", "workspace_id", "name", "real_name", "display_name", "title", "is_bot", "is_deleted", "raw_json", "updated_at"}, - "messages": {"channel_id", "ts", "workspace_id", "user_id", "subtype", "client_msg_id", "thread_ts", "parent_user_id", "text", "normalized_text", "reply_count", "latest_reply", "edited_ts", "deleted_ts", "source_rank", "source_name", "raw_json", "updated_at"}, - "message_files": {"workspace_id", "channel_id", "ts", "file_id", "user_id", "name", "title", "mimetype", "filetype", "pretty_type", "mode", "size", "url_private", "url_private_download", "permalink", "is_public", "plain_text", "preview_plain_text", "media_path", "content_sha256", "content_size", "fetched_at", "fetch_status", "fetch_error", "raw_json", "updated_at"}, - "message_events": {"id", "channel_id", "ts", "event_type", "source_name", "payload_json", "created_at"}, - "sync_state": {"source_name", "entity_type", "entity_id", "value", "updated_at"}, - "message_mentions": {"channel_id", "ts", "mention_type", "target_id", "display_text"}, - "embedding_jobs": {"id", "channel_id", "ts", "state", "created_at"}, - "message_fts": {"message_key", "content"}, + "workspaces": {"id", "name", "domain", "enterprise_id", "raw_json", "updated_at"}, + "channels": {"id", "workspace_id", "name", "kind", "topic", "purpose", "is_private", "is_archived", "is_shared", "is_general", "raw_json", "updated_at"}, + "users": {"id", "workspace_id", "name", "real_name", "display_name", "title", "is_bot", "is_deleted", "raw_json", "updated_at"}, + "messages": {"channel_id", "ts", "workspace_id", "user_id", "subtype", "client_msg_id", "thread_ts", "parent_user_id", "text", "normalized_text", "reply_count", "latest_reply", "edited_ts", "deleted_ts", "source_rank", "source_name", "raw_json", "updated_at"}, + "message_files": {"workspace_id", "channel_id", "ts", "file_id", "user_id", "name", "title", "mimetype", "filetype", "pretty_type", "mode", "size", "url_private", "url_private_download", "permalink", "is_public", "plain_text", "preview_plain_text", "media_path", "content_sha256", "content_size", "fetched_at", "fetch_status", "fetch_error", "raw_json", "updated_at"}, + "message_events": {"id", "channel_id", "ts", "event_type", "source_name", "payload_json", "created_at"}, + "message_event_heads": {"channel_id", "ts", "event_type", "source_name", "payload_json"}, + "sync_state": {"source_name", "entity_type", "entity_id", "value", "updated_at"}, + "message_mentions": {"channel_id", "ts", "mention_type", "target_id", "display_text"}, + "embedding_jobs": {"id", "channel_id", "ts", "state", "created_at"}, + "message_fts": {"message_key", "content"}, } for table, columns := range required { if err := requireSchemaColumns(q, table, columns); err != nil { diff --git a/internal/store/store_test.go b/internal/store/store_test.go index 862ab4e..4dc5d5a 100644 --- a/internal/store/store_test.go +++ b/internal/store/store_test.go @@ -112,6 +112,77 @@ func TestUpsertMessageDeduplicatesMentions(t *testing.T) { require.Equal(t, int64(1), rows[0]["n"]) } +func TestUpsertMessageSuppressesConsecutiveDuplicateEventsAndPreservesReversions(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "test.db") + s, err := Open(dbPath) + require.NoError(t, err) + defer func() { require.NoError(t, s.Close()) }() + + ctx := context.Background() + now := time.Now().UTC() + message := Message{ + ChannelID: "C1", TS: "123.45", WorkspaceID: "T1", + Text: "alpha", NormalizedText: "alpha", SourceRank: 3, + SourceName: "desktop-indexeddb", RawJSON: `{"text":"alpha"}`, UpdatedAt: now, + } + require.NoError(t, s.UpsertMessage(ctx, message, nil)) + message.UpdatedAt = now.Add(time.Second) + require.NoError(t, s.UpsertMessage(ctx, message, nil)) + + message.Text = "beta" + message.NormalizedText = "beta" + message.RawJSON = `{"text":"beta"}` + message.UpdatedAt = now.Add(2 * time.Second) + require.NoError(t, s.UpsertMessage(ctx, message, nil)) + + message.Text = "alpha" + message.NormalizedText = "alpha" + message.RawJSON = `{"text":"alpha"}` + message.UpdatedAt = now.Add(3 * time.Second) + require.NoError(t, s.UpsertMessage(ctx, message, nil)) + + rows, err := s.QueryReadOnly(ctx, `select payload_json from message_events order by id`) + require.NoError(t, err) + require.Equal(t, []map[string]any{ + {"payload_json": `{"text":"alpha"}`}, + {"payload_json": `{"text":"beta"}`}, + {"payload_json": `{"text":"alpha"}`}, + }, rows) +} + +func TestUpsertMessageDeduplicatesLowerPrioritySourceEvents(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "test.db") + s, err := Open(dbPath) + require.NoError(t, err) + defer func() { require.NoError(t, s.Close()) }() + + ctx := context.Background() + now := time.Now().UTC() + apiMessage := Message{ + ChannelID: "C1", TS: "123.45", WorkspaceID: "T1", + Text: "api", NormalizedText: "api", SourceRank: 1, + SourceName: "api-user", RawJSON: `{"source":"api"}`, UpdatedAt: now, + } + require.NoError(t, s.UpsertMessage(ctx, apiMessage, nil)) + desktopMessage := apiMessage + desktopMessage.Text = "desktop" + desktopMessage.NormalizedText = "desktop" + desktopMessage.SourceRank = 3 + desktopMessage.SourceName = "desktop-indexeddb" + desktopMessage.RawJSON = `{"source":"desktop"}` + desktopMessage.UpdatedAt = now.Add(time.Second) + require.NoError(t, s.UpsertMessage(ctx, desktopMessage, nil)) + desktopMessage.UpdatedAt = now.Add(2 * time.Second) + require.NoError(t, s.UpsertMessage(ctx, desktopMessage, nil)) + + rows, err := s.QueryReadOnly(ctx, `select source_name, payload_json from message_events order by id`) + require.NoError(t, err) + require.Equal(t, []map[string]any{ + {"source_name": "api-user", "payload_json": `{"source":"api"}`}, + {"source_name": "desktop-indexeddb", "payload_json": `{"source":"desktop"}`}, + }, rows) +} + func TestUpsertMessagePreservesSourcePrecedenceAndRefreshesSearch(t *testing.T) { dbPath := filepath.Join(t.TempDir(), "test.db") s, err := Open(dbPath) @@ -549,6 +620,36 @@ func TestOpenMigratesVersion2Schema(t *testing.T) { matches, err := s.Search(ctx, "", "appendix", 10) require.NoError(t, err) require.Len(t, matches, 1) + requireTableCount(t, s, "message_event_heads", 1) +} + +func TestOpenMigratesVersion3AndSeedsCanonicalEventHeads(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "test.db") + s, err := Open(dbPath) + require.NoError(t, err) + ctx := context.Background() + now := time.Now().UTC() + message := Message{ + ChannelID: "C1", TS: "123.45", WorkspaceID: "T1", + Text: "unchanged", NormalizedText: "unchanged", SourceRank: 3, + SourceName: "desktop-indexeddb", RawJSON: `{"text":"unchanged"}`, UpdatedAt: now, + } + require.NoError(t, s.UpsertMessage(ctx, message, nil)) + require.NoError(t, s.Close()) + + db, err := sql.Open("sqlite", dbPath) + require.NoError(t, err) + _, err = db.Exec(`drop table message_event_heads; pragma user_version = 3;`) + require.NoError(t, err) + require.NoError(t, db.Close()) + + s, err = Open(dbPath) + require.NoError(t, err) + defer func() { require.NoError(t, s.Close()) }() + requireTableCount(t, s, "message_event_heads", 1) + message.UpdatedAt = now.Add(time.Second) + require.NoError(t, s.UpsertMessage(ctx, message, nil)) + requireTableCount(t, s, "message_events", 1) } func TestOpenDoesNotStampInvalidOldSchema(t *testing.T) { diff --git a/internal/store/storedb/models.go b/internal/store/storedb/models.go index b7c3097..9a288ce 100644 --- a/internal/store/storedb/models.go +++ b/internal/store/storedb/models.go @@ -62,6 +62,14 @@ type MessageEvent struct { CreatedAt string `json:"created_at"` } +type MessageEventHead struct { + ChannelID string `json:"channel_id"` + Ts string `json:"ts"` + EventType string `json:"event_type"` + SourceName string `json:"source_name"` + PayloadJson string `json:"payload_json"` +} + type MessageFile struct { WorkspaceID string `json:"workspace_id"` ChannelID string `json:"channel_id"` diff --git a/internal/store/storedb/queries.sql.go b/internal/store/storedb/queries.sql.go index 5892f4b..5ab83a0 100644 --- a/internal/store/storedb/queries.sql.go +++ b/internal/store/storedb/queries.sql.go @@ -232,6 +232,31 @@ func (q *Queries) GetChannelWorkspace(ctx context.Context, id string) (string, e return workspace_id, err } +const getMessageEventHead = `-- name: GetMessageEventHead :one +select payload_json +from message_event_heads +where channel_id = ? and ts = ? and event_type = ? and source_name = ? +` + +type GetMessageEventHeadParams struct { + ChannelID string `json:"channel_id"` + Ts string `json:"ts"` + EventType string `json:"event_type"` + SourceName string `json:"source_name"` +} + +func (q *Queries) GetMessageEventHead(ctx context.Context, arg GetMessageEventHeadParams) (string, error) { + row := q.db.QueryRowContext(ctx, getMessageEventHead, + arg.ChannelID, + arg.Ts, + arg.EventType, + arg.SourceName, + ) + var payload_json string + err := row.Scan(&payload_json) + return payload_json, err +} + const getMessageSearchText = `-- name: GetMessageSearchText :one select normalized_text from messages @@ -1574,6 +1599,32 @@ func (q *Queries) UpsertMessageByPriority(ctx context.Context, arg UpsertMessage return result.RowsAffected() } +const upsertMessageEventHead = `-- name: UpsertMessageEventHead :exec +insert into message_event_heads (channel_id, ts, event_type, source_name, payload_json) +values (?, ?, ?, ?, ?) +on conflict(channel_id, ts, event_type, source_name) do update set + payload_json = excluded.payload_json +` + +type UpsertMessageEventHeadParams struct { + ChannelID string `json:"channel_id"` + Ts string `json:"ts"` + EventType string `json:"event_type"` + SourceName string `json:"source_name"` + PayloadJson string `json:"payload_json"` +} + +func (q *Queries) UpsertMessageEventHead(ctx context.Context, arg UpsertMessageEventHeadParams) error { + _, err := q.db.ExecContext(ctx, upsertMessageEventHead, + arg.ChannelID, + arg.Ts, + arg.EventType, + arg.SourceName, + arg.PayloadJson, + ) + return err +} + const upsertMessageMention = `-- name: UpsertMessageMention :exec insert into message_mentions (channel_id, ts, mention_type, target_id, display_text) values (?, ?, ?, ?, ?)