Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions items.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
// Common items are the creation of a task, area or checklist, as well as modifying attributes
// or marking things as done.
type Item struct {
UUID string `json:"-"`
P json.RawMessage `json:"p"`
Kind ItemKind `json:"e"`
Action ItemAction `json:"t"`
UUID string `json:"-"`
P json.RawMessage `json:"p"`
Kind ItemKind `json:"e"`
Action ItemAction `json:"t"`
ServerIndex int `json:"-"`
HasServerIndex bool `json:"-"`
}

type itemsResponse struct {
Expand Down Expand Up @@ -65,9 +67,12 @@ func (h *History) Items(opts ItemsOptions) ([]Item, bool, error) {
return nil, false, err
}
var items = []Item{}
for _, m := range v.Items {
for offset, m := range v.Items {
serverIndex := opts.StartIndex + offset
for id, item := range m {
item.UUID = id
item.ServerIndex = serverIndex
item.HasServerIndex = true
items = append(items, item)
}
}
Expand Down
42 changes: 42 additions & 0 deletions items_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package thingscloud

import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
)

Expand All @@ -25,4 +27,44 @@ func TestHistory_Items(t *testing.T) {
t.Fatalf("Expected items, but got none: %#v", items)
}
})

t.Run("SetsServerIndexFromOuterItems", func(t *testing.T) {
t.Parallel()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"items":[{"task-a":{"e":"Task6","t":0,"p":{"tt":"Task A"}},"task-b":{"e":"Task6","t":0,"p":{"tt":"Task B"}}},{"task-c":{"e":"Task6","t":0,"p":{"tt":"Task C"}}}],"current-item-index":12,"schema":301}`)
}))
defer server.Close()

c := New(server.URL, "martin@example.com", "")
h := &History{
Client: c,
ID: "33333abb-bfe4-4b03-a5c9-106d42220c72",
}

items, _, err := h.Items(ItemsOptions{StartIndex: 10})
if err != nil {
t.Fatalf("Items failed: %v", err)
}
if len(items) != 3 {
t.Fatalf("expected 3 flattened items, got %d", len(items))
}

indexByUUID := make(map[string]int)
for _, item := range items {
if !item.HasServerIndex {
t.Fatalf("item %s is missing server index metadata", item.UUID)
}
indexByUUID[item.UUID] = item.ServerIndex
}

for _, uuid := range []string{"task-a", "task-b"} {
if indexByUUID[uuid] != 10 {
t.Errorf("%s ServerIndex = %d, want 10", uuid, indexByUUID[uuid])
}
}
if indexByUUID["task-c"] != 11 {
t.Errorf("task-c ServerIndex = %d, want 11", indexByUUID["task-c"])
}
})
}
32 changes: 32 additions & 0 deletions sync/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,37 @@ func (c UnknownChange) EntityUUID() string {
return c.entityUUID
}

// LoggedChange represents a semantic change restored from the persisted change log.
// It preserves the stored change type and metadata, but not type-specific fields
// that are not persisted in the current schema.
type LoggedChange struct {
baseChange
changeType string
entityType string
entityUUID string
payload string
}

// ChangeType returns the persisted semantic change type.
func (c LoggedChange) ChangeType() string {
return c.changeType
}

// EntityType returns the persisted entity type.
func (c LoggedChange) EntityType() string {
return c.entityType
}

// EntityUUID returns the persisted entity UUID.
func (c LoggedChange) EntityUUID() string {
return c.entityUUID
}

// Payload returns the raw item payload stored with the change log entry.
func (c LoggedChange) Payload() string {
return c.payload
}

// Compile-time interface implementation checks
var (
_ Change = (*TaskCreated)(nil)
Expand Down Expand Up @@ -669,4 +700,5 @@ var (
_ Change = (*ChecklistItemTitleChanged)(nil)

_ Change = (*UnknownChange)(nil)
_ Change = (*LoggedChange)(nil)
)
88 changes: 88 additions & 0 deletions sync/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,97 @@ func TestIntegration(t *testing.T) {
if len(allChanges) != 3 {
t.Errorf("expected 3 total changes, got %d", len(allChanges))
}
if allChanges[0].ChangeType() != "TaskCreated" {
t.Errorf("expected first logged change type TaskCreated, got %s", allChanges[0].ChangeType())
}
if _, ok := allChanges[0].(LoggedChange); !ok {
t.Errorf("expected persisted change to be LoggedChange, got %T", allChanges[0])
}
})
}

func TestProcessItemsUsesItemServerIndex(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "test.db")

syncer, err := Open(dbPath, nil)
if err != nil {
t.Fatalf("Open failed: %v", err)
}
defer syncer.Close()

payload := things.TaskActionItemPayload{}
title := "Indexed task"
payload.Title = &title
tp := things.TaskTypeTask
payload.Type = &tp
payloadBytes, _ := json.Marshal(payload)

items := []things.Item{
{
UUID: "task-a",
Kind: things.ItemKindTask,
Action: things.ItemActionCreated,
P: payloadBytes,
ServerIndex: 7,
HasServerIndex: true,
},
{
UUID: "task-b",
Kind: things.ItemKindTask,
Action: things.ItemActionCreated,
P: payloadBytes,
ServerIndex: 7,
HasServerIndex: true,
},
{
UUID: "task-c",
Kind: things.ItemKindTask,
Action: things.ItemActionCreated,
P: payloadBytes,
ServerIndex: 8,
HasServerIndex: true,
},
}

changes, err := syncer.processItems(items, 100)
if err != nil {
t.Fatalf("processItems failed: %v", err)
}
if len(changes) != 3 {
t.Fatalf("expected 3 changes, got %d", len(changes))
}

rows, err := syncer.db.Query(`SELECT entity_uuid, server_index FROM change_log`)
if err != nil {
t.Fatalf("querying change_log failed: %v", err)
}
defer rows.Close()

indexByUUID := make(map[string]int)
for rows.Next() {
var uuid string
var serverIndex int
if err := rows.Scan(&uuid, &serverIndex); err != nil {
t.Fatalf("scanning change_log failed: %v", err)
}
indexByUUID[uuid] = serverIndex
}
if err := rows.Err(); err != nil {
t.Fatalf("reading change_log failed: %v", err)
}

if indexByUUID["task-a"] != 7 {
t.Errorf("task-a server_index = %d, want 7", indexByUUID["task-a"])
}
if indexByUUID["task-b"] != 7 {
t.Errorf("task-b server_index = %d, want 7", indexByUUID["task-b"])
}
if indexByUUID["task-c"] != 8 {
t.Errorf("task-c server_index = %d, want 8", indexByUUID["task-c"])
}
}

func TestStateQueries(t *testing.T) {
t.Parallel()
dbPath := filepath.Join(t.TempDir(), "test.db")
Expand Down
3 changes: 3 additions & 0 deletions sync/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func (s *Syncer) processItems(items []things.Item, baseIndex int) ([]Change, err

for i, item := range items {
serverIndex := baseIndex + i
if item.HasServerIndex {
serverIndex = item.ServerIndex
}
ts := time.Now()

changes, err := s.processItem(item, serverIndex, ts)
Expand Down
20 changes: 14 additions & 6 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type dbExecutor interface {

// Syncer manages persistent sync with Things Cloud
type Syncer struct {
rawDB *sql.DB // underlying connection for Close() and Begin()
db dbExecutor // current executor (db or tx)
rawDB *sql.DB // underlying connection for Close() and Begin()
db dbExecutor // current executor (db or tx)
client *things.Client
history *things.History
}
Expand Down Expand Up @@ -250,16 +250,24 @@ func (s *Syncer) scanChangeLog(rows *sql.Rows) ([]Change, error) {
timestamp: time.Unix(syncedAt, 0),
}

// Return UnknownChange with the change type as details
// A more complete implementation would reconstruct full typed changes
changes = append(changes, UnknownChange{
var rawPayload string
if payload.Valid {
rawPayload = payload.String
}

changes = append(changes, LoggedChange{
baseChange: base,
changeType: changeType,
entityType: entityType,
entityUUID: entityUUID,
Details: changeType,
payload: rawPayload,
})
}

if err := rows.Err(); err != nil {
return nil, err
}

return changes, nil
}

Expand Down