Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion docs/designs/RFE284-flmrun/runpy-enhancement-summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ return TaskOutput(data=result)
```python
# Put the result into cache and return ObjectRef
logger.debug("Putting result into cache")
object_ref = put_object(context.session_id, result)
application_id = self._ssn_ctx.application.name
object_ref = put_object(application_id, context.session_id, result)
logger.info(f"Result cached with ObjectRef: {object_ref}")

# Return the ObjectRef as TaskOutput
Expand Down
3 changes: 2 additions & 1 deletion docs/designs/RFE284-flmrun/runpy-enhancements.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ This document describes the enhancements made to the `FlameRunpyService` to impr
```python
# Put the result into cache and return ObjectRef
logger.debug("Putting result into cache")
object_ref = put_object(context.session_id, result)
application_id = self._ssn_ctx.application.name
object_ref = put_object(application_id, context.session_id, result)
logger.info(f"Result cached with ObjectRef: {object_ref}")

# Return the ObjectRef as TaskOutput
Expand Down
66 changes: 35 additions & 31 deletions docs/designs/RFE318-cache/FS.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ The cache server implements the Arrow Flight protocol with the following operati
- Behavior: Reads data from disk using Arrow IPC

3. **get_flight_info**: Get metadata about a flight
- Request: FlightDescriptor with path (`{session_id}/{object_id}`)
- Request: FlightDescriptor with path (`{application_id}/{session_id}/{object_id}`)
- Response: FlightInfo with schema information

4. **list_flights**: List all cached objects
Expand Down Expand Up @@ -143,15 +143,18 @@ class ObjectRef:

**Python SDK Interface:**

1. **put_object(session_id: str, obj: Any) -> ObjectRef**
1. **put_object(application_id: str, session_id: str, obj: Any) -> ObjectRef**
- `application_id`: Required application ID for organizing objects (first parameter)
- `session_id`: The session ID for the object
- `obj`: The object to cache (will be pickled)
- If `cache.storage` is set:
- Write data to local storage using Arrow IPC
- Get flight info to construct an ObjectRef with cache's remote endpoint
- If `cache.storage` is not set:
- Connect to remote cache server via endpoint
- Use Arrow Flight do_put to upload object
- If endpoint is not set: raise exception
- Returns ObjectRef built from FlightInfo
- Returns ObjectRef with key format "application_id/session_id/object_id"

2. **get_object(ref: ObjectRef) -> Any**
- Connect to cache server using ref.endpoint
Expand Down Expand Up @@ -320,9 +323,10 @@ pub struct ObjectMetadata {

**Storage Organization:**
- Root: `{storage_path}/`
- Session directory: `{storage_path}/{session_id}/`
- Object file: `{storage_path}/{session_id}/{object_id}.arrow`
- Key format: `{session_id}/{object_id}`
- Application directory: `{storage_path}/{application_id}/`
- Session directory: `{storage_path}/{application_id}/{session_id}/`
- Object file: `{storage_path}/{application_id}/{session_id}/{object_id}.arrow`
- Key format: `{application_id}/{session_id}/{object_id}`

**Arrow IPC File Format:**
- Each object stored as a single RecordBatch in Arrow IPC file
Expand All @@ -341,21 +345,21 @@ pub struct ObjectMetadata {

**do_put Algorithm:**
1. Receive FlightData stream with RecordBatch
2. Extract session_id from app_metadata
3. Generate unique object_id (UUID)
4. Construct key: `{session_id}/{object_id}`
5. Create session directory if it doesn't exist
6. Write RecordBatch to Arrow IPC file: `{storage_path}/{session_id}/{object_id}.arrow`
2. Extract application_id, session_id, and object_id from FlightDescriptor path or app_metadata
3. Generate unique object_id (UUID) if not provided
4. Construct key: `{application_id}/{session_id}/{object_id}`
5. Create application/session directory structure if it doesn't exist
6. Write RecordBatch to Arrow IPC file: `{storage_path}/{application_id}/{session_id}/{object_id}.arrow`
7. Update in-memory index: `HashMap<key, ObjectMetadata>`
8. Construct ObjectRef: `{endpoint, key, version: 0}` (using public endpoint from ObjectCache)
9. Serialize ObjectRef to BSON
10. Return PutResult with ObjectRef in app_metadata

**do_get Algorithm:**
1. Extract key from Ticket
2. Parse key to get session_id and object_id
2. Parse key to get application_id, session_id, and object_id
3. Check in-memory index for key
4. If found, read Arrow IPC file: `{storage_path}/{session_id}/{object_id}.arrow`
4. If found, read Arrow IPC file: `{storage_path}/{application_id}/{session_id}/{object_id}.arrow`
5. Deserialize RecordBatch from file
6. Convert RecordBatch to FlightData
7. Stream FlightData to client
Expand All @@ -366,8 +370,8 @@ pub struct ObjectMetadata {
3. For each session directory:
- List all `.arrow` files
- For each file:
- Extract object_id from filename
- Construct key: `{session_id}/{object_id}`
- Extract application_id, session_id, and object_id from directory structure and filename
- Construct key: `{application_id}/{session_id}/{object_id}`
- Create FlightInfo with key as ticket and cache service's public endpoint
- Stream FlightInfo to client

Expand All @@ -376,19 +380,19 @@ pub struct ObjectMetadata {
2. If set:
- Serialize object to RecordBatch
- Generate object_id (UUID)
- Write to local storage: `{cache.storage}/{session_id}/{object_id}.arrow`
- Write to local storage: `{cache.storage}/{application_id}/{session_id}/{object_id}.arrow`
- Connect to cache server using `cache.endpoint`
- Get flight info using FlightDescriptor with path `{session_id}/{object_id}`
- Construct ObjectRef with cache server's endpoint from FlightInfo, key `{session_id}/{object_id}`, and version 0
- Get flight info using FlightDescriptor with path `{application_id}/{session_id}/{object_id}`
- Construct ObjectRef with cache server's endpoint from FlightInfo, key `{application_id}/{session_id}/{object_id}`, and version 0
3. If not set:
- Check if `cache.endpoint` is set, else raise exception
- Connect to remote cache server via endpoint
- Call do_put to upload object
- Call do_put with path `{application_id}/{session_id}/{object_id}` to upload object
- Extract ObjectRef from PutResult app_metadata
4. Return ObjectRef

**Key Construction:**
- Format: `{session_id}/{object_id}`
- Format: `{application_id}/{session_id}/{object_id}`
- Session directory creation: Automatically created when first object is stored
- Object ID generation: UUID v4

Expand Down Expand Up @@ -428,7 +432,7 @@ pub struct ObjectMetadata {
- No authentication/authorization in this version
- File permissions: Cache files should be readable/writable by cache server process only
- Network: Server binds to both public IP and localhost (consider firewall rules)
- Input validation: Validate session_id and object_id formats
- Input validation: Validate application_id, session_id, and object_id formats

**Observability:**
- Logging: Use tracing for structured logging
Expand Down Expand Up @@ -471,28 +475,28 @@ pub struct ObjectMetadata {
**Example 1: Python SDK Client Uploading Object to Remote Cache**
- Description: A Python SDK client uploads an object to a remote cache server
- Step-by-step workflow:
1. Client calls `put_object(session_id="sess123", obj=my_data)`
1. Client calls `put_object(application_id="my-app", session_id="sess123", obj=my_data)`
2. SDK checks `cache.storage` - not set
3. SDK checks `cache.endpoint` - set to "grpc://cache.example.com:9090"
4. SDK serializes object to RecordBatch
5. SDK connects to cache server via Arrow Flight
6. SDK calls do_put with RecordBatch and session_id in metadata
7. Cache server generates object_id, creates session directory, writes Arrow IPC file
8. Cache server returns ObjectRef in PutResult
6. SDK calls do_put with RecordBatch and path "my-app/sess123/{object_id}"
7. Cache server generates object_id, creates application/session directory, writes Arrow IPC file
8. Cache server returns ObjectRef in PutResult with key "my-app/sess123/{object_id}"
9. SDK deserializes ObjectRef and returns to client
- Expected outcome: Object is stored on cache server, client receives ObjectRef

**Example 2: Python SDK Client Uploading Object to Local Cache**
- Description: A Python SDK client uploads an object using local storage
- Step-by-step workflow:
1. Client calls `put_object(session_id="sess123", obj=my_data)`
1. Client calls `put_object(application_id="my-app", session_id="sess123", obj=my_data)`
2. SDK checks `cache.storage` - set to "/tmp/flame_cache"
3. SDK serializes object to RecordBatch
4. SDK generates object_id (UUID)
5. SDK writes RecordBatch to `/tmp/flame_cache/sess123/{object_id}.arrow`
5. SDK writes RecordBatch to `/tmp/flame_cache/my-app/sess123/{object_id}.arrow`
6. SDK connects to cache server using `cache.endpoint`
7. SDK gets flight info using FlightDescriptor with path `sess123/{object_id}`
8. SDK constructs ObjectRef with cache server's endpoint from FlightInfo, key `sess123/{object_id}`, and version 0
7. SDK gets flight info using FlightDescriptor with path `my-app/sess123/{object_id}`
8. SDK constructs ObjectRef with cache server's endpoint from FlightInfo, key `my-app/sess123/{object_id}`, and version 0
9. SDK returns ObjectRef to client
- Expected outcome: Object is stored locally, client receives ObjectRef with remote endpoint from FlightInfo

Expand Down Expand Up @@ -542,8 +546,8 @@ pub struct ObjectMetadata {
- Step-by-step workflow:
1. RL module creates RunnerService with execution object
2. RunnerService serializes RunnerContext using cloudpickle
3. RunnerService calls put_object with session_id and serialized context
4. Cache stores context and returns ObjectRef
3. RunnerService calls put_object with application_id, session_id, and serialized context
4. Cache stores context and returns ObjectRef with key "application_id/session_id/object_id"
5. RunnerService encodes ObjectRef to bytes for core API
6. Core API stores ObjectRef bytes as common_data
7. Remote executor retrieves common_data, decodes ObjectRef
Expand Down
10 changes: 5 additions & 5 deletions docs/designs/RFE318-cache/IMPLEMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class ObjectRef:
@dataclass
class ObjectRef:
endpoint: str # e.g., "grpc://127.0.0.1:9090"
key: str # e.g., "session_id/object_id"
key: str # e.g., "application_id/session_id/object_id"
version: int # Always 0 for now
```

Expand All @@ -126,15 +126,15 @@ cache:
- `do_action`: Legacy operations (PUT, UPDATE, DELETE)

### Python SDK
- `put_object(session_id, obj)`: Store object, returns ObjectRef
- `put_object(application_id, session_id, obj)`: Store object, returns ObjectRef (application_id is required and is the first parameter)
- `get_object(ref)`: Retrieve object from cache
- `update_object(ref, new_obj)`: Update existing object

## Storage Format

Each object is stored as an Arrow IPC file with:
- **Schema**: `{version: UInt64, data: Binary}`
- **Filename**: `{session_id}/{object_id}.arrow`
- **Filename**: `{application_id}/{session_id}/{object_id}.arrow`
- **Format**: Arrow IPC File format (version 4)

## Testing
Expand All @@ -159,9 +159,9 @@ make e2e-py
import flamepy
from flamepy.core import put_object, get_object

# Put an object
# Put an object (application_id is required and is the first parameter)
data = {"test": "data", "value": 123}
ref = put_object("test-session", data)
ref = put_object("test-app", "test-session", data)
print(f"Stored: {ref.key} at {ref.endpoint}")

# Get the object
Expand Down
5 changes: 3 additions & 2 deletions docs/designs/RFE323-runner-v2/FS.md
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ def service(self, execution_object, stateful=None, autoscale=None):

# Step 6: Serialize and store in cache
serialized = cloudpickle.dumps(runner_context)
object_ref = put_object(session_id, serialized)
object_ref = put_object(self._name, session_id, serialized)

# Step 7: Create session and RunnerService
return RunnerService(self._name, execution_object, stateful, autoscale)
Expand Down Expand Up @@ -994,7 +994,8 @@ def on_task_invoke(self, context: TaskContext) -> Optional[TaskOutput]:
update_object(object_ref, serialized)

# Step 6: Return result
result_ref = put_object(context.session_id, result)
application_id = self._ssn_ctx.application.name
result_ref = put_object(application_id, context.session_id, result)
return TaskOutput(result_ref.encode())
```

Expand Down
8 changes: 4 additions & 4 deletions e2e/src/e2e/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ def serialize_runner_context(runner_context: RunnerContext, app_name: str) -> by
)
# Generate a temporary session_id for caching (will be regenerated by create_session if needed)
temp_session_id = short_name(app_name)
# Put in cache to get ObjectRef
object_ref = put_object(temp_session_id, serialized_ctx)
# Put in cache to get ObjectRef (use app_name as application_id)
object_ref = put_object(app_name, temp_session_id, serialized_ctx)
# Encode ObjectRef to bytes for core API
return object_ref.encode()

Expand Down Expand Up @@ -152,9 +152,9 @@ def serialize_common_data(

# Serialize with JSON
serialized_ctx = json.dumps(asdict(common_data)).encode("utf-8")
# Put in cache to get ObjectRef
# Put in cache to get ObjectRef (use app_name as application_id)
temp_session_id = short_name(app_name)
object_ref = put_object(temp_session_id, serialized_ctx)
object_ref = put_object(app_name, temp_session_id, serialized_ctx)
# Encode ObjectRef to bytes for core API
return object_ref.encode()

Expand Down
13 changes: 8 additions & 5 deletions e2e/tests/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@

def test_cache_put_and_get():
"""Test basic put and get operations."""
application_id = "test-app"
session_id = "test-session-001"
test_data = {"message": "Hello, Flame!", "value": 42}

# Put object
ref = put_object(session_id, test_data)
ref = put_object(application_id, session_id, test_data)

# Verify ObjectRef structure
assert isinstance(ref, ObjectRef)
assert ref.endpoint is not None
assert ref.key is not None
assert ref.key.startswith(session_id + "/")
assert ref.key.startswith(f"{application_id}/{session_id}/")
assert ref.version == 0

# Get object
Expand All @@ -39,12 +40,13 @@ def test_cache_put_and_get():

def test_cache_update():
"""Test update operation."""
application_id = "test-app"
session_id = "test-session-002"
original_data = {"count": 0}
updated_data = {"count": 1}

# Put original object
ref = put_object(session_id, original_data)
ref = put_object(application_id, session_id, original_data)

# Update object
new_ref = update_object(ref, updated_data)
Expand All @@ -62,6 +64,7 @@ def test_cache_update():

def test_cache_with_complex_objects():
"""Test caching complex Python objects."""
application_id = "test-app"
session_id = "test-session-003"

class ComplexObject:
Expand All @@ -75,7 +78,7 @@ def __eq__(self, other):
test_obj = ComplexObject("test", [1, 2, 3, {"nested": "value"}])

# Put object
ref = put_object(session_id, test_obj)
ref = put_object(application_id, session_id, test_obj)

# Get object
retrieved_obj = get_object(ref)
Expand All @@ -88,7 +91,7 @@ def __eq__(self, other):

def test_objectref_encode_decode():
"""Test ObjectRef serialization and deserialization."""
ref = ObjectRef(endpoint="grpc://127.0.0.1:9090", key="session123/obj456", version=0)
ref = ObjectRef(endpoint="grpc://127.0.0.1:9090", key="app123/session123/obj456", version=0)

# Encode
encoded = ref.encode()
Expand Down
8 changes: 8 additions & 0 deletions object_cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ edition = "2021"
name = "flame_cache"
path = "src/lib.rs"

[[bin]]
name = "flame-object-cache"
path = "src/main.rs"

[dependencies]
common = { path = "../common" }
stdng = { path = "../stdng" }
Expand All @@ -21,6 +25,7 @@ uuid = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_derive = { workspace = true }
serde_yaml = { workspace = true }
bytes = { workspace = true }
url = { workspace = true }
network-interface = "2"
Expand All @@ -34,3 +39,6 @@ prost-types = { workspace = true }
base64 = "0.22"
bson = "2"
clap = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
Loading