diff --git a/items.go b/items.go index 894ea48..d614bf5 100644 --- a/items.go +++ b/items.go @@ -12,10 +12,12 @@ import ( // Common items are the creation of a task, area or checklist, as well as modifying attributes // or marking things as done. type Item struct { - UUID string `json:"-"` - P json.RawMessage `json:"p"` - Kind ItemKind `json:"e"` - Action ItemAction `json:"t"` + UUID string `json:"-"` + P json.RawMessage `json:"p"` + Kind ItemKind `json:"e"` + Action ItemAction `json:"t"` + ServerIndex int `json:"-"` + HasServerIndex bool `json:"-"` } type itemsResponse struct { @@ -65,9 +67,12 @@ func (h *History) Items(opts ItemsOptions) ([]Item, bool, error) { return nil, false, err } var items = []Item{} - for _, m := range v.Items { + for offset, m := range v.Items { + serverIndex := opts.StartIndex + offset for id, item := range m { item.UUID = id + item.ServerIndex = serverIndex + item.HasServerIndex = true items = append(items, item) } } diff --git a/items_test.go b/items_test.go index 7f366c8..1c70b64 100644 --- a/items_test.go +++ b/items_test.go @@ -2,6 +2,8 @@ package thingscloud import ( "fmt" + "net/http" + "net/http/httptest" "testing" ) @@ -25,4 +27,44 @@ func TestHistory_Items(t *testing.T) { t.Fatalf("Expected items, but got none: %#v", items) } }) + + t.Run("SetsServerIndexFromOuterItems", func(t *testing.T) { + t.Parallel() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"items":[{"task-a":{"e":"Task6","t":0,"p":{"tt":"Task A"}},"task-b":{"e":"Task6","t":0,"p":{"tt":"Task B"}}},{"task-c":{"e":"Task6","t":0,"p":{"tt":"Task C"}}}],"current-item-index":12,"schema":301}`) + })) + defer server.Close() + + c := New(server.URL, "martin@example.com", "") + h := &History{ + Client: c, + ID: "33333abb-bfe4-4b03-a5c9-106d42220c72", + } + + items, _, err := h.Items(ItemsOptions{StartIndex: 10}) + if err != nil { + t.Fatalf("Items failed: %v", err) + } + if len(items) != 3 { + t.Fatalf("expected 3 flattened items, got %d", len(items)) + } + + indexByUUID := make(map[string]int) + for _, item := range items { + if !item.HasServerIndex { + t.Fatalf("item %s is missing server index metadata", item.UUID) + } + indexByUUID[item.UUID] = item.ServerIndex + } + + for _, uuid := range []string{"task-a", "task-b"} { + if indexByUUID[uuid] != 10 { + t.Errorf("%s ServerIndex = %d, want 10", uuid, indexByUUID[uuid]) + } + } + if indexByUUID["task-c"] != 11 { + t.Errorf("task-c ServerIndex = %d, want 11", indexByUUID["task-c"]) + } + }) } diff --git a/sync/changes.go b/sync/changes.go index 7d2b957..656e96f 100644 --- a/sync/changes.go +++ b/sync/changes.go @@ -621,6 +621,37 @@ func (c UnknownChange) EntityUUID() string { return c.entityUUID } +// LoggedChange represents a semantic change restored from the persisted change log. +// It preserves the stored change type and metadata, but not type-specific fields +// that are not persisted in the current schema. +type LoggedChange struct { + baseChange + changeType string + entityType string + entityUUID string + payload string +} + +// ChangeType returns the persisted semantic change type. +func (c LoggedChange) ChangeType() string { + return c.changeType +} + +// EntityType returns the persisted entity type. +func (c LoggedChange) EntityType() string { + return c.entityType +} + +// EntityUUID returns the persisted entity UUID. +func (c LoggedChange) EntityUUID() string { + return c.entityUUID +} + +// Payload returns the raw item payload stored with the change log entry. +func (c LoggedChange) Payload() string { + return c.payload +} + // Compile-time interface implementation checks var ( _ Change = (*TaskCreated)(nil) @@ -669,4 +700,5 @@ var ( _ Change = (*ChecklistItemTitleChanged)(nil) _ Change = (*UnknownChange)(nil) + _ Change = (*LoggedChange)(nil) ) diff --git a/sync/integration_test.go b/sync/integration_test.go index 14076cb..4026bb7 100644 --- a/sync/integration_test.go +++ b/sync/integration_test.go @@ -144,9 +144,97 @@ func TestIntegration(t *testing.T) { if len(allChanges) != 3 { t.Errorf("expected 3 total changes, got %d", len(allChanges)) } + if allChanges[0].ChangeType() != "TaskCreated" { + t.Errorf("expected first logged change type TaskCreated, got %s", allChanges[0].ChangeType()) + } + if _, ok := allChanges[0].(LoggedChange); !ok { + t.Errorf("expected persisted change to be LoggedChange, got %T", allChanges[0]) + } }) } +func TestProcessItemsUsesItemServerIndex(t *testing.T) { + t.Parallel() + dbPath := filepath.Join(t.TempDir(), "test.db") + + syncer, err := Open(dbPath, nil) + if err != nil { + t.Fatalf("Open failed: %v", err) + } + defer syncer.Close() + + payload := things.TaskActionItemPayload{} + title := "Indexed task" + payload.Title = &title + tp := things.TaskTypeTask + payload.Type = &tp + payloadBytes, _ := json.Marshal(payload) + + items := []things.Item{ + { + UUID: "task-a", + Kind: things.ItemKindTask, + Action: things.ItemActionCreated, + P: payloadBytes, + ServerIndex: 7, + HasServerIndex: true, + }, + { + UUID: "task-b", + Kind: things.ItemKindTask, + Action: things.ItemActionCreated, + P: payloadBytes, + ServerIndex: 7, + HasServerIndex: true, + }, + { + UUID: "task-c", + Kind: things.ItemKindTask, + Action: things.ItemActionCreated, + P: payloadBytes, + ServerIndex: 8, + HasServerIndex: true, + }, + } + + changes, err := syncer.processItems(items, 100) + if err != nil { + t.Fatalf("processItems failed: %v", err) + } + if len(changes) != 3 { + t.Fatalf("expected 3 changes, got %d", len(changes)) + } + + rows, err := syncer.db.Query(`SELECT entity_uuid, server_index FROM change_log`) + if err != nil { + t.Fatalf("querying change_log failed: %v", err) + } + defer rows.Close() + + indexByUUID := make(map[string]int) + for rows.Next() { + var uuid string + var serverIndex int + if err := rows.Scan(&uuid, &serverIndex); err != nil { + t.Fatalf("scanning change_log failed: %v", err) + } + indexByUUID[uuid] = serverIndex + } + if err := rows.Err(); err != nil { + t.Fatalf("reading change_log failed: %v", err) + } + + if indexByUUID["task-a"] != 7 { + t.Errorf("task-a server_index = %d, want 7", indexByUUID["task-a"]) + } + if indexByUUID["task-b"] != 7 { + t.Errorf("task-b server_index = %d, want 7", indexByUUID["task-b"]) + } + if indexByUUID["task-c"] != 8 { + t.Errorf("task-c server_index = %d, want 8", indexByUUID["task-c"]) + } +} + func TestStateQueries(t *testing.T) { t.Parallel() dbPath := filepath.Join(t.TempDir(), "test.db") diff --git a/sync/process.go b/sync/process.go index 3f01e2b..aea0ea3 100644 --- a/sync/process.go +++ b/sync/process.go @@ -33,6 +33,9 @@ func (s *Syncer) processItems(items []things.Item, baseIndex int) ([]Change, err for i, item := range items { serverIndex := baseIndex + i + if item.HasServerIndex { + serverIndex = item.ServerIndex + } ts := time.Now() changes, err := s.processItem(item, serverIndex, ts) diff --git a/sync/sync.go b/sync/sync.go index f209004..b67dfb7 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -25,8 +25,8 @@ type dbExecutor interface { // Syncer manages persistent sync with Things Cloud type Syncer struct { - rawDB *sql.DB // underlying connection for Close() and Begin() - db dbExecutor // current executor (db or tx) + rawDB *sql.DB // underlying connection for Close() and Begin() + db dbExecutor // current executor (db or tx) client *things.Client history *things.History } @@ -250,16 +250,24 @@ func (s *Syncer) scanChangeLog(rows *sql.Rows) ([]Change, error) { timestamp: time.Unix(syncedAt, 0), } - // Return UnknownChange with the change type as details - // A more complete implementation would reconstruct full typed changes - changes = append(changes, UnknownChange{ + var rawPayload string + if payload.Valid { + rawPayload = payload.String + } + + changes = append(changes, LoggedChange{ baseChange: base, + changeType: changeType, entityType: entityType, entityUUID: entityUUID, - Details: changeType, + payload: rawPayload, }) } + if err := rows.Err(); err != nil { + return nil, err + } + return changes, nil }