diff --git a/Cargo.lock b/Cargo.lock index 517d6423..07995dd1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1525,7 +1525,9 @@ dependencies = [ "serde", "serde_derive", "serde_json", + "serde_yaml", "stdng", + "tempfile", "tokio", "tonic", "tracing", diff --git a/docs/designs/RFE284-flmrun/runpy-enhancement-summary.md b/docs/designs/RFE284-flmrun/runpy-enhancement-summary.md index 2ed3608a..33f76fc6 100644 --- a/docs/designs/RFE284-flmrun/runpy-enhancement-summary.md +++ b/docs/designs/RFE284-flmrun/runpy-enhancement-summary.md @@ -22,7 +22,8 @@ return TaskOutput(data=result) ```python # Put the result into cache and return ObjectRef logger.debug("Putting result into cache") -object_ref = put_object(context.session_id, result) +application_id = self._ssn_ctx.application.name +object_ref = put_object(application_id, context.session_id, result) logger.info(f"Result cached with ObjectRef: {object_ref}") # Return the ObjectRef as TaskOutput diff --git a/docs/designs/RFE284-flmrun/runpy-enhancements.md b/docs/designs/RFE284-flmrun/runpy-enhancements.md index b6f7765d..bdece713 100644 --- a/docs/designs/RFE284-flmrun/runpy-enhancements.md +++ b/docs/designs/RFE284-flmrun/runpy-enhancements.md @@ -22,7 +22,8 @@ This document describes the enhancements made to the `FlameRunpyService` to impr ```python # Put the result into cache and return ObjectRef logger.debug("Putting result into cache") -object_ref = put_object(context.session_id, result) +application_id = self._ssn_ctx.application.name +object_ref = put_object(application_id, context.session_id, result) logger.info(f"Result cached with ObjectRef: {object_ref}") # Return the ObjectRef as TaskOutput diff --git a/docs/designs/RFE318-cache/FS.md b/docs/designs/RFE318-cache/FS.md index 2caa979b..dccc0ecc 100644 --- a/docs/designs/RFE318-cache/FS.md +++ b/docs/designs/RFE318-cache/FS.md @@ -98,7 +98,7 @@ The cache server implements the Arrow Flight protocol with the following operati - Behavior: Reads data from disk using Arrow IPC 3. **get_flight_info**: Get metadata about a flight - - Request: FlightDescriptor with path (`{session_id}/{object_id}`) + - Request: FlightDescriptor with path (`{application_id}/{session_id}/{object_id}`) - Response: FlightInfo with schema information 4. **list_flights**: List all cached objects @@ -143,7 +143,10 @@ class ObjectRef: **Python SDK Interface:** -1. **put_object(session_id: str, obj: Any) -> ObjectRef** +1. **put_object(application_id: str, session_id: str, obj: Any) -> ObjectRef** + - `application_id`: Required application ID for organizing objects (first parameter) + - `session_id`: The session ID for the object + - `obj`: The object to cache (will be pickled) - If `cache.storage` is set: - Write data to local storage using Arrow IPC - Get flight info to construct an ObjectRef with cache's remote endpoint @@ -151,7 +154,7 @@ class ObjectRef: - Connect to remote cache server via endpoint - Use Arrow Flight do_put to upload object - If endpoint is not set: raise exception - - Returns ObjectRef built from FlightInfo + - Returns ObjectRef with key format "application_id/session_id/object_id" 2. **get_object(ref: ObjectRef) -> Any** - Connect to cache server using ref.endpoint @@ -320,9 +323,10 @@ pub struct ObjectMetadata { **Storage Organization:** - Root: `{storage_path}/` -- Session directory: `{storage_path}/{session_id}/` -- Object file: `{storage_path}/{session_id}/{object_id}.arrow` -- Key format: `{session_id}/{object_id}` +- Application directory: `{storage_path}/{application_id}/` +- Session directory: `{storage_path}/{application_id}/{session_id}/` +- Object file: `{storage_path}/{application_id}/{session_id}/{object_id}.arrow` +- Key format: `{application_id}/{session_id}/{object_id}` **Arrow IPC File Format:** - Each object stored as a single RecordBatch in Arrow IPC file @@ -341,11 +345,11 @@ pub struct ObjectMetadata { **do_put Algorithm:** 1. Receive FlightData stream with RecordBatch -2. Extract session_id from app_metadata -3. Generate unique object_id (UUID) -4. Construct key: `{session_id}/{object_id}` -5. Create session directory if it doesn't exist -6. Write RecordBatch to Arrow IPC file: `{storage_path}/{session_id}/{object_id}.arrow` +2. Extract application_id, session_id, and object_id from FlightDescriptor path or app_metadata +3. Generate unique object_id (UUID) if not provided +4. Construct key: `{application_id}/{session_id}/{object_id}` +5. Create application/session directory structure if it doesn't exist +6. Write RecordBatch to Arrow IPC file: `{storage_path}/{application_id}/{session_id}/{object_id}.arrow` 7. Update in-memory index: `HashMap` 8. Construct ObjectRef: `{endpoint, key, version: 0}` (using public endpoint from ObjectCache) 9. Serialize ObjectRef to BSON @@ -353,9 +357,9 @@ pub struct ObjectMetadata { **do_get Algorithm:** 1. Extract key from Ticket -2. Parse key to get session_id and object_id +2. Parse key to get application_id, session_id, and object_id 3. Check in-memory index for key -4. If found, read Arrow IPC file: `{storage_path}/{session_id}/{object_id}.arrow` +4. If found, read Arrow IPC file: `{storage_path}/{application_id}/{session_id}/{object_id}.arrow` 5. Deserialize RecordBatch from file 6. Convert RecordBatch to FlightData 7. Stream FlightData to client @@ -366,8 +370,8 @@ pub struct ObjectMetadata { 3. For each session directory: - List all `.arrow` files - For each file: - - Extract object_id from filename - - Construct key: `{session_id}/{object_id}` + - Extract application_id, session_id, and object_id from directory structure and filename + - Construct key: `{application_id}/{session_id}/{object_id}` - Create FlightInfo with key as ticket and cache service's public endpoint - Stream FlightInfo to client @@ -376,19 +380,19 @@ pub struct ObjectMetadata { 2. If set: - Serialize object to RecordBatch - Generate object_id (UUID) - - Write to local storage: `{cache.storage}/{session_id}/{object_id}.arrow` + - Write to local storage: `{cache.storage}/{application_id}/{session_id}/{object_id}.arrow` - Connect to cache server using `cache.endpoint` - - Get flight info using FlightDescriptor with path `{session_id}/{object_id}` - - Construct ObjectRef with cache server's endpoint from FlightInfo, key `{session_id}/{object_id}`, and version 0 + - Get flight info using FlightDescriptor with path `{application_id}/{session_id}/{object_id}` + - Construct ObjectRef with cache server's endpoint from FlightInfo, key `{application_id}/{session_id}/{object_id}`, and version 0 3. If not set: - Check if `cache.endpoint` is set, else raise exception - Connect to remote cache server via endpoint - - Call do_put to upload object + - Call do_put with path `{application_id}/{session_id}/{object_id}` to upload object - Extract ObjectRef from PutResult app_metadata 4. Return ObjectRef **Key Construction:** -- Format: `{session_id}/{object_id}` +- Format: `{application_id}/{session_id}/{object_id}` - Session directory creation: Automatically created when first object is stored - Object ID generation: UUID v4 @@ -428,7 +432,7 @@ pub struct ObjectMetadata { - No authentication/authorization in this version - File permissions: Cache files should be readable/writable by cache server process only - Network: Server binds to both public IP and localhost (consider firewall rules) -- Input validation: Validate session_id and object_id formats +- Input validation: Validate application_id, session_id, and object_id formats **Observability:** - Logging: Use tracing for structured logging @@ -471,28 +475,28 @@ pub struct ObjectMetadata { **Example 1: Python SDK Client Uploading Object to Remote Cache** - Description: A Python SDK client uploads an object to a remote cache server - Step-by-step workflow: - 1. Client calls `put_object(session_id="sess123", obj=my_data)` + 1. Client calls `put_object(application_id="my-app", session_id="sess123", obj=my_data)` 2. SDK checks `cache.storage` - not set 3. SDK checks `cache.endpoint` - set to "grpc://cache.example.com:9090" 4. SDK serializes object to RecordBatch 5. SDK connects to cache server via Arrow Flight - 6. SDK calls do_put with RecordBatch and session_id in metadata - 7. Cache server generates object_id, creates session directory, writes Arrow IPC file - 8. Cache server returns ObjectRef in PutResult + 6. SDK calls do_put with RecordBatch and path "my-app/sess123/{object_id}" + 7. Cache server generates object_id, creates application/session directory, writes Arrow IPC file + 8. Cache server returns ObjectRef in PutResult with key "my-app/sess123/{object_id}" 9. SDK deserializes ObjectRef and returns to client - Expected outcome: Object is stored on cache server, client receives ObjectRef **Example 2: Python SDK Client Uploading Object to Local Cache** - Description: A Python SDK client uploads an object using local storage - Step-by-step workflow: - 1. Client calls `put_object(session_id="sess123", obj=my_data)` + 1. Client calls `put_object(application_id="my-app", session_id="sess123", obj=my_data)` 2. SDK checks `cache.storage` - set to "/tmp/flame_cache" 3. SDK serializes object to RecordBatch 4. SDK generates object_id (UUID) - 5. SDK writes RecordBatch to `/tmp/flame_cache/sess123/{object_id}.arrow` + 5. SDK writes RecordBatch to `/tmp/flame_cache/my-app/sess123/{object_id}.arrow` 6. SDK connects to cache server using `cache.endpoint` - 7. SDK gets flight info using FlightDescriptor with path `sess123/{object_id}` - 8. SDK constructs ObjectRef with cache server's endpoint from FlightInfo, key `sess123/{object_id}`, and version 0 + 7. SDK gets flight info using FlightDescriptor with path `my-app/sess123/{object_id}` + 8. SDK constructs ObjectRef with cache server's endpoint from FlightInfo, key `my-app/sess123/{object_id}`, and version 0 9. SDK returns ObjectRef to client - Expected outcome: Object is stored locally, client receives ObjectRef with remote endpoint from FlightInfo @@ -542,8 +546,8 @@ pub struct ObjectMetadata { - Step-by-step workflow: 1. RL module creates RunnerService with execution object 2. RunnerService serializes RunnerContext using cloudpickle - 3. RunnerService calls put_object with session_id and serialized context - 4. Cache stores context and returns ObjectRef + 3. RunnerService calls put_object with application_id, session_id, and serialized context + 4. Cache stores context and returns ObjectRef with key "application_id/session_id/object_id" 5. RunnerService encodes ObjectRef to bytes for core API 6. Core API stores ObjectRef bytes as common_data 7. Remote executor retrieves common_data, decodes ObjectRef diff --git a/docs/designs/RFE318-cache/IMPLEMENTATION.md b/docs/designs/RFE318-cache/IMPLEMENTATION.md index a0054640..b161c747 100644 --- a/docs/designs/RFE318-cache/IMPLEMENTATION.md +++ b/docs/designs/RFE318-cache/IMPLEMENTATION.md @@ -99,7 +99,7 @@ class ObjectRef: @dataclass class ObjectRef: endpoint: str # e.g., "grpc://127.0.0.1:9090" - key: str # e.g., "session_id/object_id" + key: str # e.g., "application_id/session_id/object_id" version: int # Always 0 for now ``` @@ -126,7 +126,7 @@ cache: - `do_action`: Legacy operations (PUT, UPDATE, DELETE) ### Python SDK -- `put_object(session_id, obj)`: Store object, returns ObjectRef +- `put_object(application_id, session_id, obj)`: Store object, returns ObjectRef (application_id is required and is the first parameter) - `get_object(ref)`: Retrieve object from cache - `update_object(ref, new_obj)`: Update existing object @@ -134,7 +134,7 @@ cache: Each object is stored as an Arrow IPC file with: - **Schema**: `{version: UInt64, data: Binary}` -- **Filename**: `{session_id}/{object_id}.arrow` +- **Filename**: `{application_id}/{session_id}/{object_id}.arrow` - **Format**: Arrow IPC File format (version 4) ## Testing @@ -159,9 +159,9 @@ make e2e-py import flamepy from flamepy.core import put_object, get_object -# Put an object +# Put an object (application_id is required and is the first parameter) data = {"test": "data", "value": 123} -ref = put_object("test-session", data) +ref = put_object("test-app", "test-session", data) print(f"Stored: {ref.key} at {ref.endpoint}") # Get the object diff --git a/docs/designs/RFE323-runner-v2/FS.md b/docs/designs/RFE323-runner-v2/FS.md index 43ee4aef..c301789e 100644 --- a/docs/designs/RFE323-runner-v2/FS.md +++ b/docs/designs/RFE323-runner-v2/FS.md @@ -918,7 +918,7 @@ def service(self, execution_object, stateful=None, autoscale=None): # Step 6: Serialize and store in cache serialized = cloudpickle.dumps(runner_context) - object_ref = put_object(session_id, serialized) + object_ref = put_object(self._name, session_id, serialized) # Step 7: Create session and RunnerService return RunnerService(self._name, execution_object, stateful, autoscale) @@ -994,7 +994,8 @@ def on_task_invoke(self, context: TaskContext) -> Optional[TaskOutput]: update_object(object_ref, serialized) # Step 6: Return result - result_ref = put_object(context.session_id, result) + application_id = self._ssn_ctx.application.name + result_ref = put_object(application_id, context.session_id, result) return TaskOutput(result_ref.encode()) ``` diff --git a/e2e/src/e2e/helpers.py b/e2e/src/e2e/helpers.py index 989c27ee..f98bb9a1 100644 --- a/e2e/src/e2e/helpers.py +++ b/e2e/src/e2e/helpers.py @@ -111,8 +111,8 @@ def serialize_runner_context(runner_context: RunnerContext, app_name: str) -> by ) # Generate a temporary session_id for caching (will be regenerated by create_session if needed) temp_session_id = short_name(app_name) - # Put in cache to get ObjectRef - object_ref = put_object(temp_session_id, serialized_ctx) + # Put in cache to get ObjectRef (use app_name as application_id) + object_ref = put_object(app_name, temp_session_id, serialized_ctx) # Encode ObjectRef to bytes for core API return object_ref.encode() @@ -152,9 +152,9 @@ def serialize_common_data( # Serialize with JSON serialized_ctx = json.dumps(asdict(common_data)).encode("utf-8") - # Put in cache to get ObjectRef + # Put in cache to get ObjectRef (use app_name as application_id) temp_session_id = short_name(app_name) - object_ref = put_object(temp_session_id, serialized_ctx) + object_ref = put_object(app_name, temp_session_id, serialized_ctx) # Encode ObjectRef to bytes for core API return object_ref.encode() diff --git a/e2e/tests/test_cache.py b/e2e/tests/test_cache.py index 3d78578c..f085db04 100644 --- a/e2e/tests/test_cache.py +++ b/e2e/tests/test_cache.py @@ -17,17 +17,18 @@ def test_cache_put_and_get(): """Test basic put and get operations.""" + application_id = "test-app" session_id = "test-session-001" test_data = {"message": "Hello, Flame!", "value": 42} # Put object - ref = put_object(session_id, test_data) + ref = put_object(application_id, session_id, test_data) # Verify ObjectRef structure assert isinstance(ref, ObjectRef) assert ref.endpoint is not None assert ref.key is not None - assert ref.key.startswith(session_id + "/") + assert ref.key.startswith(f"{application_id}/{session_id}/") assert ref.version == 0 # Get object @@ -39,12 +40,13 @@ def test_cache_put_and_get(): def test_cache_update(): """Test update operation.""" + application_id = "test-app" session_id = "test-session-002" original_data = {"count": 0} updated_data = {"count": 1} # Put original object - ref = put_object(session_id, original_data) + ref = put_object(application_id, session_id, original_data) # Update object new_ref = update_object(ref, updated_data) @@ -62,6 +64,7 @@ def test_cache_update(): def test_cache_with_complex_objects(): """Test caching complex Python objects.""" + application_id = "test-app" session_id = "test-session-003" class ComplexObject: @@ -75,7 +78,7 @@ def __eq__(self, other): test_obj = ComplexObject("test", [1, 2, 3, {"nested": "value"}]) # Put object - ref = put_object(session_id, test_obj) + ref = put_object(application_id, session_id, test_obj) # Get object retrieved_obj = get_object(ref) @@ -88,7 +91,7 @@ def __eq__(self, other): def test_objectref_encode_decode(): """Test ObjectRef serialization and deserialization.""" - ref = ObjectRef(endpoint="grpc://127.0.0.1:9090", key="session123/obj456", version=0) + ref = ObjectRef(endpoint="grpc://127.0.0.1:9090", key="app123/session123/obj456", version=0) # Encode encoded = ref.encode() diff --git a/object_cache/Cargo.toml b/object_cache/Cargo.toml index 76821fa7..62c9bf81 100644 --- a/object_cache/Cargo.toml +++ b/object_cache/Cargo.toml @@ -7,6 +7,10 @@ edition = "2021" name = "flame_cache" path = "src/lib.rs" +[[bin]] +name = "flame-object-cache" +path = "src/main.rs" + [dependencies] common = { path = "../common" } stdng = { path = "../stdng" } @@ -21,6 +25,7 @@ uuid = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_derive = { workspace = true } +serde_yaml = { workspace = true } bytes = { workspace = true } url = { workspace = true } network-interface = "2" @@ -34,3 +39,6 @@ prost-types = { workspace = true } base64 = "0.22" bson = "2" clap = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } diff --git a/object_cache/README.md b/object_cache/README.md index d8e31423..77feb177 100644 --- a/object_cache/README.md +++ b/object_cache/README.md @@ -10,7 +10,7 @@ The flame-object-cache is an embedded library that provides persistent object st - **Persistent Storage**: Objects are stored on disk using Arrow IPC format and survive server restarts - **Arrow Flight Protocol**: High-performance gRPC-based protocol for data transfer -- **Key-based Organization**: Objects organized by session ID (`session_id/object_id`) +- **Key-based Organization**: Objects organized by application and session ID (`application_id/session_id/object_id`) - **In-memory Index**: Fast O(1) lookups with disk-backed persistence - **Zero-copy Operations**: Leverages Arrow's efficient columnar format @@ -40,6 +40,10 @@ clusters: - `FLAME_CACHE_STORAGE`: Override cache storage path - `FLAME_CACHE_ENDPOINT`: Override cache endpoint +- `RUST_LOG`: Control log level (e.g., `debug`, `info`, `warn`, `error`) + - Set to `debug` to see detailed debug logs: `RUST_LOG=flame_cache=debug` + - Set to `trace` for even more verbose output: `RUST_LOG=flame_cache=trace` + - Set globally: `RUST_LOG=debug` (shows debug logs from all modules) ## Usage @@ -50,10 +54,10 @@ The cache server is automatically started when the executor-manager starts. No s ### Python SDK ```python -from flamepy import put_object, get_object, ObjectRef +from flamepy import put_object, get_object, update_object, ObjectRef -# Put an object -ref = put_object("session123", my_data) +# Put an object (application_id is required and is the first parameter) +ref = put_object("my-app", "session123", my_data) print(f"Stored at: {ref.endpoint}/{ref.key}") # Get an object @@ -67,10 +71,11 @@ new_ref = update_object(ref, new_data) ``` /var/lib/flame/cache/ -└── session_id/ - ├── object1.arrow - ├── object2.arrow - └── object3.arrow +└── application_id/ + └── session_id/ + ├── object1.arrow + ├── object2.arrow + └── object3.arrow ``` Each object is stored as an Arrow IPC file with schema: `{version: UInt64, data: Binary}` @@ -97,12 +102,94 @@ cargo build --package object_cache --release cargo build --package executor_manager --release ``` +## Logging Configuration + +The object cache uses the `tracing` crate for logging. Log levels are controlled via the `RUST_LOG` environment variable. + +### Enable Debug Logs + +To see debug logs from object_cache: + +```bash +# Enable debug logs for object_cache only +export RUST_LOG=flame_cache=debug + +# Or enable debug logs for all modules +export RUST_LOG=debug + +# For even more verbose output (trace level) +export RUST_LOG=flame_cache=trace + +# Output ALL logs from all modules (maximum verbosity) +export RUST_LOG=trace + +# Output all logs including third-party libraries (override default filters) +export RUST_LOG=trace,h2=trace,hyper_util=trace,tower=trace +``` + +### Log Level Examples + +```bash +# Debug level (recommended for development) +RUST_LOG=flame_cache=debug flame-object-cache --endpoint grpc://127.0.0.1:9090 + +# Info level (default) +RUST_LOG=flame_cache=info flame-object-cache --endpoint grpc://127.0.0.1:9090 + +# Trace level (very verbose, for deep debugging) +RUST_LOG=flame_cache=trace flame-object-cache --endpoint grpc://127.0.0.1:9090 + +# Output ALL logs from all modules (maximum verbosity) +RUST_LOG=trace flame-object-cache --endpoint grpc://127.0.0.1:9090 + +# Output all logs including network libraries (override default filters) +RUST_LOG=trace,h2=trace,hyper_util=trace,tower=trace flame-object-cache --endpoint grpc://127.0.0.1:9090 +``` + +### Available Log Levels + +- `error`: Only error messages +- `warn`: Warnings and errors +- `info`: Informational messages (default) +- `debug`: Debug information including object operations +- `trace`: Very detailed trace information + +### Debug Log Output + +When debug logging is enabled, you'll see logs for: +- Object put/get/update/delete operations +- Disk I/O operations +- Object loading from disk +- Key format parsing +- Flight protocol operations + +### Output All Logs + +To output **all logs** including third-party libraries: + +```bash +# Maximum verbosity - all modules at trace level +RUST_LOG=trace flame-object-cache --endpoint grpc://127.0.0.1:9090 + +# Override default filters to see network library logs too +RUST_LOG=trace,h2=trace,hyper_util=trace,tower=trace,sqlx=trace flame-object-cache --endpoint grpc://127.0.0.1:9090 + +# Or set globally before running +export RUST_LOG=trace +flame-object-cache --endpoint grpc://127.0.0.1:9090 +``` + +**Note**: By default, some third-party libraries (h2, hyper_util, tower, sqlx) have their log levels restricted to reduce noise. To see their logs, explicitly set them to `trace` as shown above. + ## Running with Docker Compose ```bash # Start all services (cache runs embedded in executor-manager) docker compose up -d +# View cache logs with debug level +RUST_LOG=flame_cache=debug docker compose up + # View cache logs (part of executor-manager logs) docker compose logs flame-executor-manager | grep cache diff --git a/object_cache/src/cache.rs b/object_cache/src/cache.rs index 8ad498d1..3f15a0e0 100644 --- a/object_cache/src/cache.rs +++ b/object_cache/src/cache.rs @@ -164,6 +164,7 @@ impl ObjectCache { fn load_session_objects( &self, + application_id: &str, session_path: &Path, objects: &mut HashMap, ) -> Result<(), FlameError> { @@ -187,7 +188,7 @@ impl ObjectCache { .and_then(|n| n.to_str()) .ok_or_else(|| FlameError::Internal("Invalid object file name".to_string()))?; - let key = format!("{}/{}", session_id, object_id); + let key = format!("{}/{}/{}", application_id, session_id, object_id); let size = fs::metadata(&object_path)?.len(); let metadata = self.create_metadata(key.clone(), size); @@ -208,15 +209,31 @@ impl ObjectCache { tracing::info!("Loading objects from disk: {:?}", storage_path); let mut objects = lock_ptr!(self.objects)?; - for session_entry in fs::read_dir(storage_path)? { - let session_entry = session_entry?; - let session_path = session_entry.path(); + // Iterate over application directories + for application_entry in fs::read_dir(storage_path)? { + let application_entry = application_entry?; + let application_path = application_entry.path(); - if !session_path.is_dir() { + if !application_path.is_dir() { continue; } - self.load_session_objects(&session_path, &mut objects)?; + let application_id = application_path + .file_name() + .and_then(|n| n.to_str()) + .ok_or_else(|| FlameError::Internal("Invalid application directory name".to_string()))?; + + // Iterate over session directories within application directory + for session_entry in fs::read_dir(&application_path)? { + let session_entry = session_entry?; + let session_path = session_entry.path(); + + if !session_path.is_dir() { + continue; + } + + self.load_session_objects(application_id, &session_path, &mut objects)?; + } } tracing::info!("Loaded {} objects from disk", objects.len()); @@ -225,25 +242,28 @@ impl ObjectCache { async fn put( &self, + application_id: String, session_id: SessionID, object: Object, ) -> Result { - self.put_with_id(session_id, None, object).await + self.put_with_id(application_id, session_id, None, object).await } async fn put_with_id( &self, + application_id: String, session_id: SessionID, object_id: Option, object: Object, ) -> Result { let object_id = object_id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); - let key = format!("{}/{}", session_id, object_id); + let key = format!("{}/{}/{}", application_id, session_id, object_id); // Write to disk if storage is configured if let Some(storage_path) = &self.storage_path { - // Create session directory - let session_dir = storage_path.join(&session_id); + // Create application/session directory structure + let application_dir = storage_path.join(&application_id); + let session_dir = application_dir.join(&session_id); fs::create_dir_all(&session_dir)?; // Write object to Arrow IPC file @@ -272,7 +292,24 @@ impl ObjectCache { .as_ref() .ok_or_else(|| FlameError::InvalidConfig("Storage path not configured".to_string()))?; - let object_path = storage_path.join(format!("{}.arrow", key)); + // Parse key: application_id/session_id/object_id + let parts: Vec<&str> = key.split('/').collect(); + if parts.len() != 3 { + return Err(FlameError::InvalidConfig(format!( + "Invalid key format (expected application_id/session_id/object_id): {}", + key + ))); + } + + let application_id = parts[0]; + let session_id = parts[1]; + let object_id = parts[2]; + + // Build path: storage_path/application_id/session_id/object_id.arrow + let object_path = storage_path + .join(application_id) + .join(session_id) + .join(format!("{}.arrow", object_id)); let file = fs::File::open(&object_path) .map_err(|e| FlameError::NotFound(format!("Object file not found: {}", e)))?; @@ -297,7 +334,22 @@ impl ObjectCache { None => return Ok(None), }; - let object_path = storage_path.join(format!("{}.arrow", key)); + // Parse key: application_id/session_id/object_id + let parts: Vec<&str> = key.split('/').collect(); + if parts.len() != 3 { + return Ok(None); + } + + let application_id = parts[0]; + let session_id = parts[1]; + let object_id = parts[2]; + + // Build path: storage_path/application_id/session_id/object_id.arrow + let object_path = storage_path + .join(application_id) + .join(session_id) + .join(format!("{}.arrow", object_id)); + if !object_path.exists() { return Ok(None); } @@ -335,21 +387,24 @@ impl ObjectCache { } async fn update(&self, key: String, new_object: Object) -> Result { - // For now, update is the same as put (overwrites the file) - // Parse key to get session_id + // Parse key: application_id/session_id/object_id let parts: Vec<&str> = key.split('/').collect(); - if parts.len() != 2 { + if parts.len() != 3 { return Err(FlameError::InvalidConfig(format!( - "Invalid key format: {}", + "Invalid key format (expected application_id/session_id/object_id): {}", key ))); } - let _session_id = parts[0].to_string(); - let _object_id = parts[1].to_string(); + let application_id = parts[0]; + let session_id = parts[1]; + let object_id = parts[2]; // Write to disk if storage is configured if let Some(storage_path) = &self.storage_path { - let object_path = storage_path.join(format!("{}.arrow", key)); + let object_path = storage_path + .join(application_id) + .join(session_id) + .join(format!("{}.arrow", object_id)); let batch = object_to_batch(&new_object) .map_err(|e| FlameError::Internal(format!("Failed to create batch: {}", e)))?; @@ -368,10 +423,10 @@ impl ObjectCache { Ok(metadata) } - async fn delete(&self, session_id: SessionID) -> Result<(), FlameError> { + async fn delete(&self, application_id: String, session_id: SessionID) -> Result<(), FlameError> { // Delete session directory and all objects if let Some(storage_path) = &self.storage_path { - let session_dir = storage_path.join(&session_id); + let session_dir = storage_path.join(&application_id).join(&session_id); if session_dir.exists() { fs::remove_dir_all(&session_dir)?; tracing::debug!("Deleted session directory: {:?}", session_dir); @@ -380,9 +435,10 @@ impl ObjectCache { // Remove from in-memory index let mut objects = lock_ptr!(self.objects)?; - objects.retain(|key, _| !key.starts_with(&format!("{}/", session_id))); + let prefix = format!("{}/{}/", application_id, session_id); + objects.retain(|key, _| !key.starts_with(&prefix)); - tracing::debug!("Session deleted: <{}>", session_id); + tracing::debug!("Session deleted: <{}/{}>", application_id, session_id); Ok(()) } @@ -402,8 +458,9 @@ impl FlightCacheServer { Self { cache } } - fn extract_session_and_object_id( + fn extract_application_session_and_object_id( flight_data: &FlightData, + application_id: &mut Option, session_id: &mut Option, object_id: &mut Option, ) { @@ -416,15 +473,34 @@ impl FlightCacheServer { let path_str = &desc.path[0]; if path_str.contains('/') { let parts: Vec<&str> = path_str.split('/').collect(); - if parts.len() == 2 { + if parts.len() == 3 { + // Format: application_id/session_id/object_id + *application_id = Some(parts[0].to_string()); + *session_id = Some(parts[1].to_string()); + *object_id = Some(parts[2].to_string()); + } else if parts.len() == 2 { + // Legacy format: session_id/object_id (for backward compatibility) *session_id = Some(parts[0].to_string()); *object_id = Some(parts[1].to_string()); } } else { + // Only session_id provided *session_id = Some(path_str.clone()); } } } + + // Try to extract application_id from app_metadata if not found in path + if application_id.is_none() && !flight_data.app_metadata.is_empty() { + if let Ok(metadata_str) = String::from_utf8(flight_data.app_metadata.to_vec()) { + for line in metadata_str.lines() { + if let Some(id) = line.strip_prefix("application_id:") { + *application_id = Some(id.trim().to_string()); + break; + } + } + } + } } fn extract_schema_from_flight_data( @@ -457,8 +533,9 @@ impl FlightCacheServer { async fn collect_batches_from_stream( mut stream: Streaming, - ) -> Result<(String, Option, Vec), FlameError> { + ) -> Result<(String, String, Option, Vec), FlameError> { let mut batches = Vec::new(); + let mut application_id: Option = None; let mut session_id: Option = None; let mut object_id: Option = None; let mut schema: Option> = None; @@ -468,7 +545,12 @@ impl FlightCacheServer { .await .map_err(|e| FlameError::Internal(format!("Stream error: {}", e)))? { - Self::extract_session_and_object_id(&flight_data, &mut session_id, &mut object_id); + Self::extract_application_session_and_object_id( + &flight_data, + &mut application_id, + &mut session_id, + &mut object_id, + ); // Extract schema from data_header in first message if schema.is_none() && !flight_data.data_header.is_empty() { @@ -488,13 +570,19 @@ impl FlightCacheServer { return Err(FlameError::InvalidState("No data received".to_string())); } + let application_id = application_id.ok_or_else(|| { + FlameError::InvalidState( + "application_id must be provided in flight descriptor path or app_metadata".to_string(), + ) + })?; + let session_id = session_id.ok_or_else(|| { FlameError::InvalidState( - "session_id must be provided in app_metadata as 'session_id:{id}'".to_string(), + "session_id must be provided in flight descriptor path or app_metadata".to_string(), ) })?; - Ok((session_id, object_id, batches)) + Ok((application_id, session_id, object_id, batches)) } fn combine_batches(batches: Vec) -> Result { @@ -525,17 +613,33 @@ impl FlightCacheServer { } async fn handle_put_action(&self, action_body: &str) -> Result { - let (session_id_str, data_b64) = action_body - .split_once(':') - .ok_or_else(|| FlameError::InvalidState("Invalid PUT action format".to_string()))?; + // Format: "application_id:session_id:data_b64" or "application_id/session_id:data_b64" + let parts: Vec<&str> = action_body.splitn(3, ':').collect(); + if parts.len() < 2 { + return Err(FlameError::InvalidState("Invalid PUT action format".to_string())); + } - let session_id = session_id_str.to_string(); + let (application_id, session_id) = if parts[0].contains('/') { + // Format: "application_id/session_id:data_b64" + let path_parts: Vec<&str> = parts[0].split('/').collect(); + if path_parts.len() != 2 { + return Err(FlameError::InvalidState("Invalid PUT action format".to_string())); + } + (path_parts[0].to_string(), path_parts[1].to_string()) + } else if parts.len() == 3 { + // Format: "application_id:session_id:data_b64" + (parts[0].to_string(), parts[1].to_string()) + } else { + return Err(FlameError::InvalidState("Invalid PUT action format".to_string())); + }; + + let data_b64 = parts[parts.len() - 1]; let data = base64::engine::general_purpose::STANDARD .decode(data_b64) .map_err(|e| FlameError::InvalidState(format!("Invalid base64: {}", e)))?; let object = Object { version: 0, data }; - let metadata = self.cache.put(session_id, object).await?; + let metadata = self.cache.put(application_id, session_id, object).await?; serde_json::to_string(&metadata) .map_err(|e| FlameError::Internal(format!("Failed to serialize: {}", e))) @@ -558,8 +662,29 @@ impl FlightCacheServer { .map_err(|e| FlameError::Internal(format!("Failed to serialize: {}", e))) } - async fn handle_delete_action(&self, session_id: String) -> Result { - self.cache.delete(session_id).await?; + async fn handle_delete_action(&self, action_body: String) -> Result { + // Format: "application_id/session_id" or "application_id:session_id" + let (application_id, session_id) = if action_body.contains('/') { + let parts: Vec<&str> = action_body.split('/').collect(); + if parts.len() != 2 { + return Err(FlameError::InvalidState("Invalid DELETE action format".to_string())); + } + (parts[0].to_string(), parts[1].to_string()) + } else if action_body.contains(':') { + let parts: Vec<&str> = action_body.split(':').collect(); + if parts.len() != 2 { + return Err(FlameError::InvalidState("Invalid DELETE action format".to_string())); + } + (parts[0].to_string(), parts[1].to_string()) + } else { + // Legacy format: only session_id (for backward compatibility) + // Try to infer application_id from existing objects + return Err(FlameError::InvalidState( + "DELETE action requires application_id/session_id format".to_string(), + )); + }; + + self.cache.delete(application_id, session_id).await?; Ok("OK".to_string()) } } @@ -727,7 +852,7 @@ impl FlightService for FlightCacheServer { return Err(Status::invalid_argument("Empty descriptor path")); }; - // Key format: "session_id/object_id" + // Key format: "application_id/session_id/object_id" // Create endpoint with cache server's public endpoint let endpoint_uri = self.cache.endpoint.to_uri(); @@ -770,7 +895,7 @@ impl FlightService for FlightCacheServer { let key = String::from_utf8(ticket.ticket.to_vec()) .map_err(|e| Status::invalid_argument(format!("Invalid ticket: {}", e)))?; - // Key format: "session_id/object_id" + // Key format: "application_id/session_id/object_id" let object = self.cache.get(key.clone()).await?; let batch = object_to_batch(&object)?; @@ -796,13 +921,13 @@ impl FlightService for FlightCacheServer { ) -> Result, Status> { let stream = request.into_inner(); - let (session_id, object_id, batches) = Self::collect_batches_from_stream(stream).await?; + let (application_id, session_id, object_id, batches) = Self::collect_batches_from_stream(stream).await?; let combined_batch = Self::combine_batches(batches)?; let object = batch_to_object(&combined_batch)?; let metadata = self .cache - .put_with_id(session_id, object_id, object) + .put_with_id(application_id, session_id, object_id, object) .await?; let result = Self::create_put_result(&metadata)?; diff --git a/sdk/python/src/flamepy/agent/client.py b/sdk/python/src/flamepy/agent/client.py index 7b6422b9..c964ea69 100644 --- a/sdk/python/src/flamepy/agent/client.py +++ b/sdk/python/src/flamepy/agent/client.py @@ -86,8 +86,8 @@ def __init__( temp_session_id = short_name(name) # Serialize the context using cloudpickle serialized_ctx = cloudpickle.dumps(ctx, protocol=cloudpickle.DEFAULT_PROTOCOL) - # Put in cache to get ObjectRef - object_ref = put_object(temp_session_id, serialized_ctx) + # Put in cache to get ObjectRef (use name as application_id) + object_ref = put_object(name, temp_session_id, serialized_ctx) # Encode ObjectRef to bytes for core API common_data_bytes = object_ref.encode() diff --git a/sdk/python/src/flamepy/core/cache.py b/sdk/python/src/flamepy/core/cache.py index 82271dbd..76fa9078 100644 --- a/sdk/python/src/flamepy/core/cache.py +++ b/sdk/python/src/flamepy/core/cache.py @@ -29,7 +29,7 @@ class ObjectRef: """Object reference for remote cached objects.""" endpoint: str # Cache server endpoint (e.g., "grpc://127.0.0.1:9090") - key: str # Object key in format "session_id/object_id" + key: str # Object key in format "application_id/session_id/object_id" version: int = 0 def encode(self) -> bytes: @@ -149,10 +149,11 @@ def _do_put_remote(client: flight.FlightClient, descriptor: flight.FlightDescrip raise ValueError("No result metadata received from cache server") -def put_object(session_id: str, obj: Any) -> "ObjectRef": +def put_object(application_id: str, session_id: str, obj: Any) -> "ObjectRef": """Put an object into the cache. Args: + application_id: Application ID (required). Uses format "application_id/session_id/object_id". session_id: The session ID for the object obj: The object to cache (will be pickled) @@ -184,6 +185,9 @@ def put_object(session_id: str, obj: Any) -> "ObjectRef": # Serialize object to Arrow RecordBatch batch = _serialize_object(obj) + # Generate object_id + object_id = str(uuid.uuid4()) + # Check if local storage is configured and accessible if cache_storage: storage_path = Path(cache_storage) @@ -199,11 +203,11 @@ def put_object(session_id: str, obj: Any) -> "ObjectRef": if use_local_storage: # Write to local storage (optimization when client has access to cache filesystem) - object_id = str(uuid.uuid4()) - key = f"{session_id}/{object_id}" - - # Create session directory - session_dir = storage_path / session_id + # Format: application_id/session_id/object_id + key = f"{application_id}/{session_id}/{object_id}" + # Create application/session directory structure + application_dir = storage_path / application_id + session_dir = application_dir / session_id session_dir.mkdir(parents=True, exist_ok=True) # Write Arrow IPC file @@ -230,8 +234,10 @@ def put_object(session_id: str, obj: Any) -> "ObjectRef": # Use remote cache via Arrow Flight client = _get_flight_client(cache_endpoint) - # Encode session_id in FlightDescriptor path - upload_descriptor = flight.FlightDescriptor.for_path(session_id) + # Format: application_id/session_id/object_id + key_path = f"{application_id}/{session_id}/{object_id}" + upload_descriptor = flight.FlightDescriptor.for_path(key_path) + return _do_put_remote(client, upload_descriptor, batch) @@ -287,10 +293,11 @@ def update_object(ref: ObjectRef, new_obj: Any) -> "ObjectRef": client = _get_flight_client(ref.endpoint) # Parse key to validate format + # Format: application_id/session_id/object_id parts = ref.key.split("/") - if len(parts) != 2: - raise ValueError(f"Invalid key format: {ref.key}") + if len(parts) != 3: + raise ValueError(f"Invalid key format (expected application_id/session_id/object_id): {ref.key}") - # Use full key (session_id/object_id) in FlightDescriptor to update existing object + # Use full key in FlightDescriptor to update existing object upload_descriptor = flight.FlightDescriptor.for_path(ref.key) return _do_put_remote(client, upload_descriptor, batch) diff --git a/sdk/python/src/flamepy/runner/runner.py b/sdk/python/src/flamepy/runner/runner.py index 2fa45aca..025d08d2 100644 --- a/sdk/python/src/flamepy/runner/runner.py +++ b/sdk/python/src/flamepy/runner/runner.py @@ -151,8 +151,8 @@ def __init__(self, app: str, execution_object: Any, stateful: bool = False, auto serialized_ctx = cloudpickle.dumps(runner_context, protocol=cloudpickle.DEFAULT_PROTOCOL) # Generate a temporary session_id for caching (will be regenerated by create_session if needed) temp_session_id = short_name(app) - # Put in cache to get ObjectRef - object_ref = put_object(temp_session_id, serialized_ctx) + # Put in cache to get ObjectRef (use app as application_id) + object_ref = put_object(app, temp_session_id, serialized_ctx) # Encode ObjectRef to bytes for core API common_data_bytes = object_ref.encode() # Pass min_instances and max_instances from RunnerContext to create_session diff --git a/sdk/python/src/flamepy/runner/runpy.py b/sdk/python/src/flamepy/runner/runpy.py index 3df89fa7..e9956815 100644 --- a/sdk/python/src/flamepy/runner/runpy.py +++ b/sdk/python/src/flamepy/runner/runpy.py @@ -455,7 +455,8 @@ def on_task_invoke(self, context: TaskContext) -> Optional[TaskOutput]: # Step 6: Put the result into cache and return ObjectRef encoded as bytes # This enables efficient data transfer for large objects logger.debug("Putting result into cache") - result_object_ref = put_object(context.session_id, result) + application_id = self._ssn_ctx.application.name + result_object_ref = put_object(application_id, context.session_id, result) logger.info(f"Result cached with ObjectRef: {result_object_ref}") # For RL module: encode ObjectRef to bytes for core API