Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions cmd/ingestor/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ func applySchema(db *sql.DB) error {
battery_mv INTEGER,
uptime_secs INTEGER,
noise_floor REAL,
inactive INTEGER DEFAULT 0
inactive INTEGER DEFAULT 0,
last_packet_at TEXT DEFAULT NULL
);

CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen);
Expand Down Expand Up @@ -421,6 +422,28 @@ func applySchema(db *sql.DB) error {
log.Println("[migration] observations.raw_hex column added")
}

// Migration: add last_packet_at column to observers (#last-packet-at)
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'observers_last_packet_at_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Adding last_packet_at column to observers...")
_, alterErr := db.Exec(`ALTER TABLE observers ADD COLUMN last_packet_at TEXT DEFAULT NULL`)
if alterErr != nil && !strings.Contains(alterErr.Error(), "duplicate column") {
return fmt.Errorf("observers last_packet_at ALTER: %w", alterErr)
}
// Backfill: set last_packet_at = last_seen only for observers that actually have
// observation rows (packet_count alone is unreliable — UpsertObserver sets it to 1
// on INSERT even for status-only observers).
res, err := db.Exec(`UPDATE observers SET last_packet_at = last_seen
WHERE last_packet_at IS NULL
AND rowid IN (SELECT DISTINCT observer_idx FROM observations WHERE observer_idx IS NOT NULL)`)
if err == nil {
n, _ := res.RowsAffected()
log.Printf("[migration] Backfilled last_packet_at for %d observers with packets", n)
}
db.Exec(`INSERT INTO _migrations (name) VALUES ('observers_last_packet_at_v1')`)
log.Println("[migration] observers.last_packet_at column added")
}

return nil
}

Expand Down Expand Up @@ -504,7 +527,7 @@ func (s *Store) prepareStatements() error {
return err
}

s.stmtUpdateObserverLastSeen, err = s.db.Prepare("UPDATE observers SET last_seen = ? WHERE rowid = ?")
s.stmtUpdateObserverLastSeen, err = s.db.Prepare("UPDATE observers SET last_seen = ?, last_packet_at = ? WHERE rowid = ?")
if err != nil {
return err
}
Expand Down Expand Up @@ -583,9 +606,9 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) {
err := s.stmtGetObserverRowid.QueryRow(data.ObserverID).Scan(&rowid)
if err == nil {
observerIdx = &rowid
// Update observer last_seen on every packet to prevent
// Update observer last_seen and last_packet_at on every packet to prevent
// low-traffic observers from appearing offline (#463)
_, _ = s.stmtUpdateObserverLastSeen.Exec(now, rowid)
_, _ = s.stmtUpdateObserverLastSeen.Exec(now, now, rowid)
}
}

Expand Down
55 changes: 55 additions & 0 deletions cmd/ingestor/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,61 @@ func TestInsertTransmissionUpdatesObserverLastSeen(t *testing.T) {
}
}

func TestLastPacketAtUpdatedOnPacketOnly(t *testing.T) {
s, err := OpenStore(tempDBPath(t))
if err != nil {
t.Fatal(err)
}
defer s.Close()

// Insert observer via status path — last_packet_at should be NULL
if err := s.UpsertObserver("obs1", "Observer1", "SJC", nil); err != nil {
t.Fatal(err)
}

var lastPacketAt sql.NullString
s.db.QueryRow("SELECT last_packet_at FROM observers WHERE id = ?", "obs1").Scan(&lastPacketAt)
if lastPacketAt.Valid {
t.Fatalf("expected last_packet_at to be NULL after UpsertObserver, got %s", lastPacketAt.String)
}

// Insert a packet from this observer — last_packet_at should be set
data := &PacketData{
RawHex: "0A00D69F",
Timestamp: "2026-04-24T12:00:00Z",
ObserverID: "obs1",
Hash: "lastpackettest123456",
RouteType: 2,
PayloadType: 2,
PathJSON: "[]",
DecodedJSON: `{"type":"TXT_MSG"}`,
}
if _, err := s.InsertTransmission(data); err != nil {
t.Fatal(err)
}

s.db.QueryRow("SELECT last_packet_at FROM observers WHERE id = ?", "obs1").Scan(&lastPacketAt)
if !lastPacketAt.Valid {
t.Fatal("expected last_packet_at to be non-NULL after InsertTransmission")
}
// InsertTransmission uses `now = data.Timestamp || time.Now()`, so last_packet_at
// should match the packet's Timestamp when provided (same source-of-truth as last_seen).
if lastPacketAt.String != "2026-04-24T12:00:00Z" {
t.Errorf("expected last_packet_at=2026-04-24T12:00:00Z, got %s", lastPacketAt.String)
}

// UpsertObserver again (status path) — last_packet_at should NOT change
if err := s.UpsertObserver("obs1", "Observer1", "SJC", nil); err != nil {
t.Fatal(err)
}

var lastPacketAtAfterStatus sql.NullString
s.db.QueryRow("SELECT last_packet_at FROM observers WHERE id = ?", "obs1").Scan(&lastPacketAtAfterStatus)
if !lastPacketAtAfterStatus.Valid || lastPacketAtAfterStatus.String != lastPacketAt.String {
t.Errorf("UpsertObserver should not change last_packet_at; expected %s, got %v", lastPacketAt.String, lastPacketAtAfterStatus)
}
}

func TestEndToEndIngest(t *testing.T) {
s, err := OpenStore(tempDBPath(t))
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions cmd/server/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ type Observer struct {
BatteryMv *int `json:"battery_mv"`
UptimeSecs *int64 `json:"uptime_secs"`
NoiseFloor *float64 `json:"noise_floor"`
LastPacketAt *string `json:"last_packet_at"`
}

// Transmission represents a row from the transmissions table.
Expand Down Expand Up @@ -972,7 +973,7 @@ func (db *DB) getObservationsForTransmissions(txIDs []int) map[int][]map[string]

// GetObservers returns active observers (not soft-deleted) sorted by last_seen DESC.
func (db *DB) GetObservers() ([]Observer, error) {
rows, err := db.conn.Query("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor FROM observers WHERE inactive IS NULL OR inactive = 0 ORDER BY last_seen DESC")
rows, err := db.conn.Query("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor, last_packet_at FROM observers WHERE inactive IS NULL OR inactive = 0 ORDER BY last_seen DESC")
if err != nil {
return nil, err
}
Expand All @@ -983,7 +984,7 @@ func (db *DB) GetObservers() ([]Observer, error) {
var o Observer
var batteryMv, uptimeSecs sql.NullInt64
var noiseFloor sql.NullFloat64
if err := rows.Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &batteryMv, &uptimeSecs, &noiseFloor); err != nil {
if err := rows.Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &batteryMv, &uptimeSecs, &noiseFloor, &o.LastPacketAt); err != nil {
continue
}
if batteryMv.Valid {
Expand All @@ -1006,8 +1007,8 @@ func (db *DB) GetObserverByID(id string) (*Observer, error) {
var o Observer
var batteryMv, uptimeSecs sql.NullInt64
var noiseFloor sql.NullFloat64
err := db.conn.QueryRow("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor FROM observers WHERE id = ?", id).
Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &batteryMv, &uptimeSecs, &noiseFloor)
err := db.conn.QueryRow("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor, last_packet_at FROM observers WHERE id = ?", id).
Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &batteryMv, &uptimeSecs, &noiseFloor, &o.LastPacketAt)
if err != nil {
return nil, err
}
Expand Down
52 changes: 50 additions & 2 deletions cmd/server/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func setupTestDB(t *testing.T) *DB {
battery_mv INTEGER,
uptime_secs INTEGER,
noise_floor REAL,
inactive INTEGER DEFAULT 0
inactive INTEGER DEFAULT 0,
last_packet_at TEXT DEFAULT NULL
);

CREATE TABLE transmissions (
Expand Down Expand Up @@ -356,6 +357,10 @@ func TestGetObservers(t *testing.T) {
if observers[0].ID != "obs1" {
t.Errorf("expected obs1 first (most recent), got %s", observers[0].ID)
}
// last_packet_at should be nil since seedTestData doesn't set it
if observers[0].LastPacketAt != nil {
t.Errorf("expected nil LastPacketAt for obs1 from seed, got %v", *observers[0].LastPacketAt)
}
}

// Regression: GetObservers must exclude soft-deleted (inactive=1) rows.
Expand Down Expand Up @@ -395,6 +400,48 @@ func TestGetObserverByID(t *testing.T) {
if obs.ID != "obs1" {
t.Errorf("expected obs1, got %s", obs.ID)
}
// Verify last_packet_at is nil by default
if obs.LastPacketAt != nil {
t.Errorf("expected nil LastPacketAt, got %v", *obs.LastPacketAt)
}
}

func TestGetObserverLastPacketAt(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
seedTestData(t, db)

// Set last_packet_at for obs1
ts := "2026-04-24T12:00:00Z"
db.conn.Exec(`UPDATE observers SET last_packet_at = ? WHERE id = ?`, ts, "obs1")

// Verify via GetObservers
observers, err := db.GetObservers()
if err != nil {
t.Fatal(err)
}
var obs1 *Observer
for i := range observers {
if observers[i].ID == "obs1" {
obs1 = &observers[i]
break
}
}
if obs1 == nil {
t.Fatal("obs1 not found")
}
if obs1.LastPacketAt == nil || *obs1.LastPacketAt != ts {
t.Errorf("expected LastPacketAt=%s via GetObservers, got %v", ts, obs1.LastPacketAt)
}

// Verify via GetObserverByID
obs, err := db.GetObserverByID("obs1")
if err != nil {
t.Fatal(err)
}
if obs.LastPacketAt == nil || *obs.LastPacketAt != ts {
t.Errorf("expected LastPacketAt=%s via GetObserverByID, got %v", ts, obs.LastPacketAt)
}
}

func TestGetObserverByIDNotFound(t *testing.T) {
Expand Down Expand Up @@ -1135,7 +1182,8 @@ func setupTestDBV2(t *testing.T) *DB {
iata TEXT,
last_seen TEXT,
first_seen TEXT,
packet_count INTEGER DEFAULT 0
packet_count INTEGER DEFAULT 0,
last_packet_at TEXT DEFAULT NULL
);

CREATE TABLE transmissions (
Expand Down
6 changes: 6 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ func main() {
log.Printf("[store] warning: could not add observers.inactive column: %v", err)
}

// Ensure observers.last_packet_at column exists (PR #905 reads it; ingestor migration
// adds it but server may run against DBs ingestor never touched, e.g. e2e fixture).
if err := ensureLastPacketAtColumn(dbPath); err != nil {
log.Printf("[store] warning: could not add observers.last_packet_at column: %v", err)
}

// Soft-delete observers that are in the blacklist (mark inactive=1) so
// historical data from a prior unblocked window is hidden too.
if len(cfg.ObserverBlacklist) > 0 {
Expand Down
38 changes: 38 additions & 0 deletions cmd/server/neighbor_persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,44 @@ func ensureObserverInactiveColumn(dbPath string) error {
return nil
}

// ensureLastPacketAtColumn adds the last_packet_at column to observers if missing.
// The column was originally added by ingestor migration (observers_last_packet_at_v1)
// to track the most recent packet observation time separately from status updates.
// When the server starts against a DB that was never touched by the ingestor (e.g.
// the e2e fixture), the column is missing and read queries that reference it
// (GetObservers, GetObserverByID) fail with "no such column: last_packet_at".
func ensureLastPacketAtColumn(dbPath string) error {
rw, err := openRW(dbPath)
if err != nil {
return err
}
defer rw.Close()

rows, err := rw.Query("PRAGMA table_info(observers)")
if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var cid int
var colName string
var colType sql.NullString
var notNull, pk int
var dflt sql.NullString
if rows.Scan(&cid, &colName, &colType, &notNull, &dflt, &pk) == nil && colName == "last_packet_at" {
return nil // already exists
}
}

_, err = rw.Exec("ALTER TABLE observers ADD COLUMN last_packet_at TEXT")
if err != nil {
return fmt.Errorf("add last_packet_at column: %w", err)
}
log.Println("[store] Added last_packet_at column to observers")
return nil
}

// softDeleteBlacklistedObservers marks observers matching the blacklist as
// inactive=1 so they are hidden from API responses. Runs once at startup.
func softDeleteBlacklistedObservers(dbPath string, blacklist []string) {
Expand Down
59 changes: 59 additions & 0 deletions cmd/server/neighbor_persist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,3 +538,62 @@ func TestOpenRW_BusyTimeout(t *testing.T) {
t.Errorf("expected busy_timeout=5000, got %d", timeout)
}
}

func TestEnsureLastPacketAtColumn(t *testing.T) {
// Create a temp DB with observers table missing last_packet_at
dir := t.TempDir()
dbPath := dir + "/test.db"
db, err := sql.Open("sqlite", dbPath)
if err != nil {
t.Fatal(err)
}
_, err = db.Exec(`CREATE TABLE observers (
id TEXT PRIMARY KEY,
name TEXT,
last_seen TEXT,
lat REAL,
lon REAL,
inactive INTEGER DEFAULT 0
)`)
if err != nil {
t.Fatal(err)
}
db.Close()

// First call: should add the column
if err := ensureLastPacketAtColumn(dbPath); err != nil {
t.Fatalf("first call failed: %v", err)
}

// Verify column exists
db2, err := sql.Open("sqlite", dbPath)
if err != nil {
t.Fatal(err)
}
defer db2.Close()

var found bool
rows, err := db2.Query("PRAGMA table_info(observers)")
if err != nil {
t.Fatal(err)
}
defer rows.Close()
for rows.Next() {
var cid int
var colName string
var colType sql.NullString
var notNull, pk int
var dflt sql.NullString
if rows.Scan(&cid, &colName, &colType, &notNull, &dflt, &pk) == nil && colName == "last_packet_at" {
found = true
}
}
if !found {
t.Fatal("last_packet_at column not found after migration")
}

// Idempotency: second call should succeed without error
if err := ensureLastPacketAtColumn(dbPath); err != nil {
t.Fatalf("idempotent call failed: %v", err)
}
}
2 changes: 2 additions & 0 deletions cmd/server/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -1955,6 +1955,7 @@ func (s *Server) handleObservers(w http.ResponseWriter, r *http.Request) {
ClientVersion: o.ClientVersion, Radio: o.Radio,
BatteryMv: o.BatteryMv, UptimeSecs: o.UptimeSecs,
NoiseFloor: o.NoiseFloor,
LastPacketAt: o.LastPacketAt,
PacketsLastHour: plh,
Lat: lat, Lon: lon, NodeRole: nodeRole,
})
Expand Down Expand Up @@ -1996,6 +1997,7 @@ func (s *Server) handleObserverDetail(w http.ResponseWriter, r *http.Request) {
ClientVersion: obs.ClientVersion, Radio: obs.Radio,
BatteryMv: obs.BatteryMv, UptimeSecs: obs.UptimeSecs,
NoiseFloor: obs.NoiseFloor,
LastPacketAt: obs.LastPacketAt,
PacketsLastHour: plh,
})
}
Expand Down
1 change: 1 addition & 0 deletions cmd/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,7 @@ type ObserverResp struct {
BatteryMv interface{} `json:"battery_mv"`
UptimeSecs interface{} `json:"uptime_secs"`
NoiseFloor interface{} `json:"noise_floor"`
LastPacketAt interface{} `json:"last_packet_at"`
PacketsLastHour int `json:"packetsLastHour"`
Lat interface{} `json:"lat"`
Lon interface{} `json:"lon"`
Expand Down
8 changes: 8 additions & 0 deletions public/observer-detail.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,14 @@
<div class="stat-label">First Seen</div>
<div class="stat-value" style="font-size:0.85em">${obs.first_seen ? new Date(obs.first_seen).toLocaleDateString() : '—'}</div>
</div>
<div class="stat-card">
<div class="stat-label">Last Status Update</div>
<div class="stat-value" style="font-size:0.85em">${obs.last_seen ? timeAgo(obs.last_seen) + '<br><span style="font-size:0.8em;color:var(--text-muted)">' + new Date(obs.last_seen).toLocaleString() + '</span>' : '—'}</div>
</div>
<div class="stat-card">
<div class="stat-label">Last Packet Observation</div>
<div class="stat-value" style="font-size:0.85em">${obs.last_packet_at ? timeAgo(obs.last_packet_at) + '<br><span style="font-size:0.8em;color:var(--text-muted)">' + new Date(obs.last_packet_at).toLocaleString() + '</span>' : '<span style="color:var(--text-muted)">never</span>'}</div>
</div>
</div>
<div class="mono" style="font-size:0.75em;color:var(--text-muted);margin-bottom:20px;word-break:break-all">
ID: ${obs.id}
Expand Down
Loading