-
Notifications
You must be signed in to change notification settings - Fork 0
WIP: Good Sensor Store #1080
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
WIP: Good Sensor Store #1080
Conversation
- quality_barrier: Callable[[Observable[T]], Observable[T]] - sharpness_barrier: Callable[[Observable[Image]], Observable[Image]]
- Implement find_closest(), first_timestamp(), iterate(), iterate_ts(), iterate_realtime() methods using abstract _iter_items/_find_closest_timestamp - Add scheduler-based stream() with absolute time reference to prevent timing drift during long playback (ported from replay.py) - Move imports to top of file, add proper typing throughout - Fix pickledir.py mypy error (pickle.load returns Any)
- Single-file SQLite storage with indexed timestamp queries - BLOB storage for pickled sensor data - INSERT OR REPLACE for duplicate timestamp handling - Supports multiple tables per database (different sensors) - Added to parametrized tests (15 tests across 3 backends)
- PostgresStore implements SensorStore[T] + Resource for lifecycle management - Multiple stores can share same database with different tables - Tables created automatically on first save - Tests are optional - skip gracefully if PostgreSQL not available - Added psycopg2-binary and types-psycopg2 dependencies - Includes reset_db() helper for simple migrations (drop/recreate)
Greptile SummaryThis PR implements a unified sensor storage and replay system with multiple backend implementations (in-memory, SQLite, PostgreSQL, and pickle directory). The core design follows a clean abstract base class pattern where backends only need to implement 4 methods ( Key Changes:
Critical Security Issues:
Thread Safety Concerns:
Architecture Strengths:
Confidence Score: 2/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant Client
participant SensorStore
participant Backend as Backend (SQLite/Postgres/PickleDir)
participant Scheduler
participant Observer
Note over Client,Backend: Save Operation
Client->>SensorStore: save(data, timestamp)
SensorStore->>SensorStore: Get timestamp from data.ts or time.time()
SensorStore->>Backend: _save(timestamp, data)
Backend->>Backend: Serialize data (pickle)
Backend->>Backend: Store in DB/File
Backend-->>SensorStore: Complete
SensorStore-->>Client: Complete
Note over Client,Backend: Query Operation
Client->>SensorStore: find_closest(seek=10.0)
SensorStore->>SensorStore: first_timestamp()
SensorStore->>Backend: _iter_items()
Backend-->>SensorStore: First timestamp
SensorStore->>SensorStore: Calculate target: first + seek
SensorStore->>Backend: _find_closest_timestamp(target)
Backend->>Backend: Binary search / DB query
Backend-->>SensorStore: Closest timestamp
SensorStore->>Backend: _load(closest_timestamp)
Backend->>Backend: Deserialize data (pickle)
Backend-->>SensorStore: Data object
SensorStore-->>Client: Data object
Note over Client,Observer: Stream with Timing Control
Client->>SensorStore: stream(speed=1.0, seek=5.0)
SensorStore->>Observer: create Observable
Observer->>SensorStore: subscribe()
SensorStore->>Backend: iterate_ts(seek=5.0)
Backend-->>SensorStore: Iterator[(ts, data)]
SensorStore->>SensorStore: Get first message
SensorStore->>SensorStore: Establish timing reference
SensorStore->>Observer: on_next(first_data)
loop For each message
SensorStore->>SensorStore: Pre-load next message
SensorStore->>SensorStore: Calculate absolute emission time
SensorStore->>Scheduler: schedule_relative(delay, emit)
Scheduler->>Scheduler: Wait until target time
Scheduler->>Observer: on_next(data)
end
SensorStore->>Observer: on_completed()
Observer-->>Client: Stream complete
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
25 files reviewed, 7 comments
- Validate table/database names in SqliteStore and PostgresStore using regex (alphanumeric/underscore, not starting with digit) - Fix Transform.to_pose() return type using TYPE_CHECKING import - Add return type annotation to TF.get_pose() - Fix ambiguous doclink in transports.md
SqliteStore now accepts a name (e.g. "recordings/lidar") that gets resolved via get_data_dir to data/recordings/lidar.db. Still supports absolute paths and :memory: for backward compatibility.
- Add TypeVar bound: T = TypeVar("T", bound=Timestamped)
- Simplify save() to always use data.ts (no more optional timestamp)
- Update tests to use SampleData(Timestamped) instead of strings
- SqliteStore accepts str | Path for backward compatibility
# Conflicts: # dimos/models/manipulation/contact_graspnet_pytorch/inference.py
Required when cupy/contact_graspnet are installed locally without type stubs.
Summary
Unified
SensorStore[T]abstraction for timestamped sensor data with multiple backend implementations. Designed for robot sensor recording and replay.SensorStore[T]Base Class (dimos/memory/sensor/base.py)_save,_load,_iter_items,_find_closest_timestampfind_closest(),iterate(),iterate_ts(),iterate_realtime(),stream()Backend Implementations
InMemoryStorebase.pyPickleDirStorepickledir.pySqliteStoresqlite.pyPostgresStorepostgres.pyResourceUsage
Misc
removed generic Embedding and made all Embedding returns the same from all models (since they are)