Skip to content

Turbowarp Integration and Event-sourcing #1097

@mikebarkmin

Description

@mikebarkmin

We can embed the TurboWarp editor in an iframe, save its current state to IndexedDB, and restore it afterwards.

// Ask the editor's VM (exposed as `window.vm`) to serialize the current project.
// NOTE(review): the result is not captured here; presumably it is the value that
// gets stored in IndexedDB — confirm.
await window.vm.saveProjectSb3()
// Load a project back into the VM.
// NOTE(review): `p` is not defined in this snippet; presumably it is the data
// previously read back from IndexedDB — confirm.
await window.vm.loadProject(p)

But I think we have to restructure the hyperbook cloud to make the sync more efficient. Therefore I suggest moving to an event-sourcing system.

Here is a reference implementation produced by Claude:

Claude Reference Implementation

Yes! Event sourcing would be an excellent fit for your use case. Here's how you can implement it:

Event Sourcing Architecture

Frontend: Emit Events

/**
 * Queues local change events and packages them for upload.
 *
 * Relies on names provided elsewhere in the application: `isLoadingFromCloud`,
 * `isReadOnlyMode()`, `getAuthUser()`, and a `markDirty(storeName)` method
 * supplied by the elided part of this class.
 */
class SyncManager {
  constructor(options = {}) {
    // ... existing code ...
    this.eventQueue = [];
    this.eventSequence = 0;
  }

  /**
   * Appends one change event to the queue and marks the store dirty.
   * Does nothing while data is being loaded from the cloud or in read-only mode.
   *
   * @param {string} storeName - Name of the table the change belongs to.
   * @param {string} eventType - 'RECORD_CREATED', 'RECORD_UPDATED' or 'RECORD_DELETED'.
   * @param {object} data - Event payload (key, values, modifications).
   */
  logEvent(storeName, eventType, data) {
    if (isLoadingFromCloud || isReadOnlyMode()) return;

    const now = Date.now();
    const event = {
      // `eventSequence` restarts at 0 on every page load and is not shared
      // between clients, so the client id is part of the id to keep it unique
      // across clients.
      id: `${this.getClientId()}-${now}-${this.eventSequence++}`,
      storeName,
      eventType, // 'RECORD_CREATED', 'RECORD_UPDATED', 'RECORD_DELETED'
      data,
      timestamp: now,
      userId: getAuthUser()?.id,
      origin: window.location.origin
    };

    this.eventQueue.push(event);
    this.markDirty(storeName);
  }

  /**
   * Builds the upload payload from the currently queued events.
   * The payload holds a copy of the queue, so events logged afterwards do not
   * change a payload that is already being sent.
   *
   * @returns {Promise<object>} Payload with `version`, `type`, `events` and `metadata`.
   */
  async exportStores() {
    // Send events, not state
    const pendingEvents = [...this.eventQueue];

    return {
      version: 1,
      type: 'events',
      events: pendingEvents,
      metadata: {
        clientId: this.getClientId(),
        sequenceStart: pendingEvents[0]?.id,
        sequenceEnd: pendingEvents[pendingEvents.length - 1]?.id
      }
    };
  }

  /**
   * Removes events from the queue.
   *
   * @param {number} [sentCount] - Number of events, counted from the front of
   *   the queue, that were uploaded. When omitted the whole queue is cleared.
   *   Passing the length of the exported payload keeps events that were logged
   *   while the upload was in flight.
   */
  clearEventQueue(sentCount) {
    if (sentCount === undefined) {
      this.eventQueue = [];
      return;
    }
    this.eventQueue = this.eventQueue.slice(Math.max(0, sentCount));
  }

  /**
   * Returns this browser's client id, creating and persisting one in
   * localStorage under 'hyperbook_client_id' on first use.
   *
   * @returns {string} The client id.
   */
  getClientId() {
    let clientId = localStorage.getItem('hyperbook_client_id');
    if (!clientId) {
      // slice(2, 11) yields the same characters as the deprecated substr(2, 9).
      clientId = `client-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
      localStorage.setItem('hyperbook_client_id', clientId);
    }
    return clientId;
  }
}

/**
 * Computes the record that results from applying an "updating" hook's
 * modifications to the current record.
 *
 * Dexie reports modifications keyed by key path: nested properties appear as
 * dotted paths (e.g. "address.city") and a deleted property has the value
 * `undefined`. A plain object spread would therefore create a literal
 * "address.city" property and keep deleted properties around.
 *
 * The input record is not mutated; containers along each path are copied.
 *
 * @param {object} record - Record as it is before the update.
 * @param {object} modifications - Key path -> new value (undefined = delete).
 * @returns {object} The record after the update.
 */
function applyKeyPathModifications(record, modifications) {
  const updated = { ...record };

  for (const [keyPath, value] of Object.entries(modifications)) {
    const segments = keyPath.split(".");
    const leaf = segments.pop();

    let target = updated;
    for (const segment of segments) {
      const child = target[segment];
      target[segment] = Array.isArray(child) ? [...child] : { ...child };
      target = target[segment];
    }

    if (value === undefined) {
      delete target[leaf];
    } else {
      target[leaf] = value;
    }
  }

  return updated;
}

// Update Dexie hooks to emit events
store.tables.forEach((table) => {
  // The "currentState" table is excluded from event logging.
  if (table.name === "currentState") return;

  // NOTE(review): for auto-incremented primary keys Dexie may call this hook
  // with `primKey` undefined; confirm whether any synced table relies on that.
  table.hook("creating", (primKey, obj) => {
    syncManager.logEvent(table.name, 'RECORD_CREATED', {
      key: primKey,
      value: obj
    });
  });

  table.hook("updating", (mods, primKey, obj) => {
    syncManager.logEvent(table.name, 'RECORD_UPDATED', {
      key: primKey,
      previousValue: obj,
      modifications: mods,
      newValue: applyKeyPathModifications(obj, mods)
    });
  });

  table.hook("deleting", (primKey) => {
    syncManager.logEvent(table.name, 'RECORD_DELETED', {
      key: primKey
    });
  });
});

// After successful save
/**
 * Uploads data to the store's events endpoint and then empties the local
 * event queue. (Method fragment: parts of the body are elided.)
 *
 * @param {string} [reason="manual"] - Reason for the save; not used in the
 *   visible part of the body.
 */
async performSave(reason = "manual") {
  // ... existing code ...
  
  // NOTE(review): `dataToSave` is produced by the elided code above; the
  // server-side handler reads `events.events` and `events.metadata`, so it
  // presumably holds the exportStores() payload — confirm.
  await apiRequest(`/api/store/${HYPERBOOK_CLOUD.id}/events`, {
    method: "POST",
    body: JSON.stringify({ events: dataToSave }),
  });

  // NOTE(review): this empties the whole queue, so any event logged while the
  // request above was in flight is dropped without being uploaded.
  this.clearEventQueue(); // Clear after successful sync
  // ... rest of code ...
}

Backend: Event Store

-- Database schema (PostgreSQL: uses JSONB and SERIAL)

-- Append-only log of client change events.
-- sequence_number is assigned by the server on insert and is shared by all
-- stores in this table, so values for a single store are not contiguous.
CREATE TABLE events (
  id VARCHAR(255) PRIMARY KEY,
  store_id VARCHAR(255) NOT NULL,
  user_id VARCHAR(255) NOT NULL,
  client_id VARCHAR(255) NOT NULL,
  store_name VARCHAR(100) NOT NULL,
  event_type VARCHAR(50) NOT NULL,
  event_data JSONB NOT NULL,
  timestamp BIGINT NOT NULL,
  sequence_number SERIAL,
  created_at TIMESTAMP DEFAULT NOW()
);

-- PostgreSQL has no inline INDEX clause in CREATE TABLE; indexes are created
-- with separate statements.
CREATE INDEX idx_store_sequence ON events (store_id, sequence_number);
CREATE INDEX idx_store_timestamp ON events (store_id, timestamp);

-- Materialized state of a store as of a given event.
-- sequence_number / last_event_id identify the last event included in
-- snapshot_data.
CREATE TABLE snapshots (
  id SERIAL PRIMARY KEY,
  store_id VARCHAR(255) NOT NULL,
  snapshot_data JSONB NOT NULL,
  last_event_id VARCHAR(255) NOT NULL,
  sequence_number BIGINT NOT NULL,
  created_at TIMESTAMP DEFAULT NOW()
);

-- PostgreSQL has no inline INDEX clause in CREATE TABLE; the index is created
-- with a separate statement.
CREATE INDEX idx_store_latest ON snapshots (store_id, created_at DESC);

Backend: Event Handler

// routes/events.js
const express = require('express');
const router = express.Router();

/**
 * POST /api/store/:id/events
 *
 * Stores a batch of client events for the store in a single transaction.
 * Expects a body of the form
 * `{ events: { events: [...], metadata: { clientId } } }`.
 *
 * Responses:
 *   400 - the payload is missing the event array or the client id
 *   401 - no authenticated user on the request
 *   500 - the events could not be stored
 *   200 - `{ success: true, eventsStored }`
 */
router.post('/api/store/:id/events', async (req, res) => {
  const { id: storeId } = req.params;
  const { events } = req.body ?? {};
  const batch = events?.events;
  const clientId = events?.metadata?.clientId;

  // Validate events
  if (!Array.isArray(batch) || typeof clientId !== 'string' || clientId === '') {
    return res.status(400).json({ error: 'Invalid event payload' });
  }

  const userId = req.user?.id;
  if (userId == null) {
    return res.status(401).json({ error: 'Authentication required' });
  }

  try {
    // Store events atomically
    await db.transaction(async (trx) => {
      for (const event of batch) {
        await trx('events').insert({
          id: event.id,
          store_id: storeId,
          user_id: userId,
          client_id: clientId,
          store_name: event.storeName,
          event_type: event.eventType,
          event_data: event.data,
          timestamp: event.timestamp
        });
      }
    });
  } catch (error) {
    console.error('Event storage error:', error);
    return res.status(500).json({ error: 'Failed to store events' });
  }

  // Snapshotting is an optimization. The events are already committed, so a
  // snapshot failure must not be reported as a failed upload: the client
  // would retry and hit duplicate primary keys.
  try {
    await maybeCreateSnapshot(storeId);
  } catch (error) {
    console.error('Snapshot creation error:', error);
  }

  return res.json({ success: true, eventsStored: batch.length });
});

// Get current state (replay events or use snapshot)
/**
 * GET /api/store/:id[?fromSequence=N]
 *
 * Without `fromSequence`: returns the latest snapshot with all later events
 * applied, or a full replay when no snapshot exists.
 * With `fromSequence`: replays only events whose sequence_number is >= N.
 *
 * Responses:
 *   400 - `fromSequence` is not a non-negative integer
 *   500 - the state could not be retrieved
 *   200 - `{ data: state }`
 */
router.get('/api/store/:id', async (req, res) => {
  const { id: storeId } = req.params;
  const { fromSequence } = req.query;

  // An empty value is treated like an absent parameter.
  const hasFromSequence = fromSequence !== undefined && fromSequence !== '';
  const startSequence = hasFromSequence ? Number(fromSequence) : 0;
  if (!Number.isInteger(startSequence) || startSequence < 0) {
    // Reject here instead of passing an arbitrary string to the database.
    return res.status(400).json({ error: 'Invalid fromSequence' });
  }

  try {
    let state;

    // Try to use latest snapshot
    const snapshot = await db('snapshots')
      .where({ store_id: storeId })
      .orderBy('created_at', 'desc')
      .first();

    if (snapshot && !hasFromSequence) {
      // Get events after snapshot
      const newEvents = await db('events')
        .where('store_id', storeId)
        .where('sequence_number', '>', snapshot.sequence_number)
        .orderBy('sequence_number', 'asc');

      // Apply new events to snapshot
      state = applyEvents(snapshot.snapshot_data, newEvents);
    } else {
      // Rebuild from events, starting at the requested sequence number
      const allEvents = await db('events')
        .where('store_id', storeId)
        .where('sequence_number', '>=', startSequence)
        .orderBy('sequence_number', 'asc');

      state = replayEvents(allEvents);
    }

    res.json({ data: state });
  } catch (error) {
    console.error('Event replay error:', error);
    res.status(500).json({ error: 'Failed to retrieve state' });
  }
});

// Event replay logic

/**
 * Applies a single stored event row to `state`, in place.
 *
 * Records are matched by comparing their `id` property with the event's key.
 * Events whose table maps to a store key that is missing from `state` are
 * ignored, as are unknown event types.
 *
 * @param {object} state - State keyed by store key, then by table name.
 * @param {object} event - Row with `store_name`, `event_type`, `event_data`.
 * @returns {object} The same `state` object.
 */
function applyEventToState(state, event) {
  const tableName = event.store_name;
  const store = state[getStoreKey(tableName)];
  if (!store) return state;

  switch (event.event_type) {
    case 'RECORD_CREATED': {
      if (!store[tableName]) store[tableName] = [];
      store[tableName].push(event.event_data.value);
      break;
    }

    case 'RECORD_UPDATED': {
      if (!store[tableName]) store[tableName] = [];
      const updateIdx = store[tableName].findIndex(
        (record) => record.id === event.event_data.key
      );
      // An update for a record that is not present is ignored.
      if (updateIdx !== -1) {
        store[tableName][updateIdx] = event.event_data.newValue;
      }
      break;
    }

    case 'RECORD_DELETED': {
      if (store[tableName]) {
        store[tableName] = store[tableName].filter(
          (record) => record.id !== event.event_data.key
        );
      }
      break;
    }
  }

  return state;
}

/**
 * Copies the store objects and table arrays of `state` so that applying
 * events to the copy leaves the original untouched. Records themselves are
 * shared; events replace records rather than mutating them.
 *
 * @param {object} state - State keyed by store key, then by table name.
 * @returns {object} A copy with fresh store objects and table arrays.
 */
function cloneStateContainers(state) {
  const copy = {};
  for (const [storeKey, tables] of Object.entries(state)) {
    if (typeof tables !== 'object' || tables === null) {
      copy[storeKey] = tables;
      continue;
    }
    copy[storeKey] = {};
    for (const [tableName, records] of Object.entries(tables)) {
      copy[storeKey][tableName] = Array.isArray(records) ? [...records] : records;
    }
  }
  return copy;
}

/**
 * Rebuilds state from scratch by applying `events` in the given order.
 *
 * @param {object[]} events - Event rows, oldest first.
 * @returns {object} State with `hyperbook`, `sqlIde` and `learnJ` stores.
 */
function replayEvents(events) {
  const state = {
    hyperbook: {},
    sqlIde: {},
    learnJ: {}
  };

  for (const event of events) {
    applyEventToState(state, event);
  }

  return state;
}

/**
 * Same as replayEvents but starts with existing state (e.g. a snapshot).
 * The given `state` is not modified.
 *
 * @param {object} state - Existing state to start from.
 * @param {object[]} events - Event rows to apply, oldest first.
 * @returns {object} New state with the events applied.
 */
function applyEvents(state, events) {
  return events.reduce(applyEventToState, cloneStateContainers(state));
}

// Snapshot creation
/**
 * Creates a new snapshot for a store once enough events have accumulated
 * since the latest one.
 *
 * Pending events are counted as the store's events whose sequence_number is
 * greater than the latest snapshot's. A row count must not be compared with a
 * sequence number: sequence numbers are not per-store, so subtracting one
 * from the other gives a meaningless (often negative) result.
 *
 * @param {string} storeId - Store to snapshot.
 * @param {number} [threshold=1000] - Minimum number of events since the
 *   latest snapshot that triggers a new snapshot.
 * @returns {Promise<void>}
 */
async function maybeCreateSnapshot(storeId, threshold = 1000) {
  const lastSnapshot = await db('snapshots')
    .where({ store_id: storeId })
    .orderBy('created_at', 'desc')
    .first();

  let pendingQuery = db('events').where({ store_id: storeId });
  if (lastSnapshot) {
    pendingQuery = pendingQuery.where(
      'sequence_number',
      '>',
      lastSnapshot.sequence_number
    );
  }
  const pending = await pendingQuery.count('* as count').first();

  // Some drivers return COUNT(*) as a string; normalise before comparing.
  const eventsSinceSnapshot = Number(pending?.count ?? 0);
  if (eventsSinceSnapshot < threshold) return;

  const allEvents = await db('events')
    .where({ store_id: storeId })
    .orderBy('sequence_number', 'asc');

  const lastEvent = allEvents[allEvents.length - 1];
  if (!lastEvent) return; // nothing to snapshot

  const state = replayEvents(allEvents);

  await db('snapshots').insert({
    store_id: storeId,
    snapshot_data: state,
    last_event_id: lastEvent.id,
    sequence_number: lastEvent.sequence_number
  });

  console.log(`✓ Snapshot created for store ${storeId}`);
}

/**
 * Resolves which top-level store a Dexie table belongs to, based on the
 * table name's prefix.
 *
 * @param {string} tableName - Dexie table name.
 * @returns {string} 'sqlIde' for names starting with "sql", 'learnJ' for
 *   names starting with "learn", otherwise 'hyperbook'.
 */
function getStoreKey(tableName) {
  // Checked in order; the first matching prefix wins.
  const prefixToStoreKey = [
    ['sql', 'sqlIde'],
    ['learn', 'learnJ'],
  ];

  const match = prefixToStoreKey.find(([prefix]) => tableName.startsWith(prefix));
  return match ? match[1] : 'hyperbook';
}

module.exports = router;

Benefits of Event Sourcing

  1. Complete audit trail - Every change is recorded
  2. Time travel - Reconstruct state at any point in time
  3. Debugging - See exactly what changed and when
  4. Conflict resolution - Easier to merge concurrent changes
  5. Bandwidth optimization - Only send deltas
  6. Undo/redo - Trivial to implement
  7. Analytics - Query event stream for usage patterns

Additional Optimizations

// Conflict detection
/**
 * POST /api/store/:id/events with optimistic conflict detection.
 *
 * The client reports the last server sequence number it has seen in
 * `metadata.lastKnownSequence`. Metadata is accepted either at the top level
 * of the body or nested in the `events` payload. When the server holds newer
 * events for the store, the request is answered with 409 and those events so
 * that the client can merge them before retrying.
 */
router.post('/api/store/:id/events', async (req, res) => {
  const { id: storeId } = req.params;
  const { events } = req.body;
  const metadata = req.body.metadata ?? events?.metadata ?? {};
  const { lastKnownSequence } = metadata;

  try {
    // Check for conflicts with other clients
    const lastServerEvent = await db('events')
      .where({ store_id: storeId })
      .orderBy('sequence_number', 'desc')
      .first();

    // `lastKnownSequence` may legitimately be 0, so test for presence rather
    // than truthiness. An empty store (no last event) cannot conflict.
    if (lastKnownSequence != null &&
        lastServerEvent &&
        lastServerEvent.sequence_number > lastKnownSequence) {
      // Conflict! Return new events for client to merge
      const newEvents = await db('events')
        .where('store_id', storeId)
        .where('sequence_number', '>', lastKnownSequence)
        .orderBy('sequence_number', 'asc');

      return res.status(409).json({
        conflict: true,
        newEvents,
        message: 'State has changed, please merge'
      });
    }

    // No conflict, proceed...
  } catch (error) {
    console.error('Event storage error:', error);
    res.status(500).json({ error: 'Failed to store events' });
  }
});

Event sourcing is perfect for your collaborative editing scenario and gives you much more flexibility than simple state snapshots!

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions