diff --git a/libraries/java/src/main/java/com/bhf/aeroncache/client/AeronCacheClient.java b/libraries/java/src/main/java/com/bhf/aeroncache/client/AeronCacheClient.java index 791ccab..a8aa795 100644 --- a/libraries/java/src/main/java/com/bhf/aeroncache/client/AeronCacheClient.java +++ b/libraries/java/src/main/java/com/bhf/aeroncache/client/AeronCacheClient.java @@ -101,6 +101,18 @@ public DeleteCacheResponse deleteCache(String cacheId) throws Exception { return objectMapper.readValue(response.body(), DeleteCacheResponse.class); } + public BulkCacheOpsResponse bulkOps(BulkCacheOpsRequest bulkOpsRequest) throws Exception { + String json = objectMapper.writeValueAsString(bulkOpsRequest); + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(baseUrl + "/api/v1/cache/bulkops")) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(json)) + .build(); + HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + checkStatus(response); + return objectMapper.readValue(response.body(), BulkCacheOpsResponse.class); + } + // --- Async Operations --- public CompletableFuture createCacheAsync(String cacheId) { @@ -240,6 +252,28 @@ public CompletableFuture deleteCacheAsync(String cacheId) { }); } + public CompletableFuture bulkOpsAsync(BulkCacheOpsRequest bulkOpsRequest) { + try { + String json = objectMapper.writeValueAsString(bulkOpsRequest); + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(baseUrl + "/api/v1/cache/bulkops")) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(json)) + .build(); + return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .thenApply(resp -> { + checkStatusAsync(resp); + try { + return objectMapper.readValue(resp.body(), BulkCacheOpsResponse.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + } + // --- WebSocket --- public ReconnectingWebSocket subscribe(String cacheId, WebSocket.Listener listener) { diff --git a/libraries/java/src/main/java/com/bhf/aeroncache/models/BulkCacheOpsRequest.java b/libraries/java/src/main/java/com/bhf/aeroncache/models/BulkCacheOpsRequest.java new file mode 100644 index 0000000..a08d5ab --- /dev/null +++ b/libraries/java/src/main/java/com/bhf/aeroncache/models/BulkCacheOpsRequest.java @@ -0,0 +1,30 @@ +package com.bhf.aeroncache.models; + +import java.util.List; + +public class BulkCacheOpsRequest { + private String requestId; + private List operations; + + public String getRequestId() { return requestId; } + public void setRequestId(String requestId) { this.requestId = requestId; } + + public List getOperations() { return operations; } + public void setOperations(List operations) { this.operations = operations; } + + public static Builder builder() { return new Builder(); } + + public static class Builder { + private BulkCacheOpsRequest request = new BulkCacheOpsRequest(); + private java.util.List operations = new java.util.ArrayList<>(); + + public Builder requestId(String id) { request.setRequestId(id); return this; } + public Builder operations(java.util.List ops) { this.operations = ops; return this; } + public Builder addOperation(CacheOperationRequest op) { this.operations.add(op); return this; } + + public BulkCacheOpsRequest build() { + request.setOperations(operations); + return request; + } + } +} diff --git a/libraries/java/src/main/java/com/bhf/aeroncache/models/BulkCacheOpsResponse.java b/libraries/java/src/main/java/com/bhf/aeroncache/models/BulkCacheOpsResponse.java new file mode 100644 index 0000000..5df781c --- /dev/null +++ b/libraries/java/src/main/java/com/bhf/aeroncache/models/BulkCacheOpsResponse.java @@ -0,0 +1,14 @@ +package com.bhf.aeroncache.models; + +import java.util.List; + +public class BulkCacheOpsResponse { + private String requestId; + private List operationResponses; + + public String getRequestId() { return requestId; } + public void setRequestId(String requestId) { this.requestId = requestId; } + + public List getOperationResponses() { return operationResponses; } + public void setOperationResponses(List operationResponses) { this.operationResponses = operationResponses; } +} diff --git a/libraries/java/src/main/java/com/bhf/aeroncache/models/BulkOperationType.java b/libraries/java/src/main/java/com/bhf/aeroncache/models/BulkOperationType.java new file mode 100644 index 0000000..9e226d5 --- /dev/null +++ b/libraries/java/src/main/java/com/bhf/aeroncache/models/BulkOperationType.java @@ -0,0 +1,11 @@ +package com.bhf.aeroncache.models; + +public enum BulkOperationType { + NONE, + CREATE_CACHE, + ADD_ITEM, + REMOVE_ITEM, + CLEAR_CACHE, + GET_ITEM, + DELETE_CACHE +} diff --git a/libraries/java/src/main/java/com/bhf/aeroncache/models/CacheOperationRequest.java b/libraries/java/src/main/java/com/bhf/aeroncache/models/CacheOperationRequest.java new file mode 100644 index 0000000..240fa70 --- /dev/null +++ b/libraries/java/src/main/java/com/bhf/aeroncache/models/CacheOperationRequest.java @@ -0,0 +1,41 @@ +package com.bhf.aeroncache.models; + +public class CacheOperationRequest { + private BulkOperationType operationType; + private Long ttl; + private String requestId; + private String cacheId; + private String key; + private String value; + + public BulkOperationType getOperationType() { return operationType; } + public void setOperationType(BulkOperationType operationType) { this.operationType = operationType; } + + public Long getTtl() { return ttl; } + public void setTtl(Long ttl) { this.ttl = ttl; } + + public String getRequestId() { return requestId; } + public void setRequestId(String requestId) { this.requestId = requestId; } + + public String getCacheId() { return cacheId; } + public void setCacheId(String cacheId) { this.cacheId = cacheId; } + + public String getKey() { return key; } + public void setKey(String key) { this.key = key; } + + public String getValue() { return value; } + public void setValue(String value) { this.value = value; } + + public static Builder builder() { return new Builder(); } + + public static class Builder { + private CacheOperationRequest request = new CacheOperationRequest(); + public Builder operationType(BulkOperationType type) { request.setOperationType(type); return this; } + public Builder ttl(Long ttl) { request.setTtl(ttl); return this; } + public Builder requestId(String id) { request.setRequestId(id); return this; } + public Builder cacheId(String id) { request.setCacheId(id); return this; } + public Builder key(String key) { request.setKey(key); return this; } + public Builder value(String value) { request.setValue(value); return this; } + public CacheOperationRequest build() { return request; } + } +} diff --git a/libraries/java/src/main/java/com/bhf/aeroncache/models/CacheOperationResponse.java b/libraries/java/src/main/java/com/bhf/aeroncache/models/CacheOperationResponse.java new file mode 100644 index 0000000..abc6957 --- /dev/null +++ b/libraries/java/src/main/java/com/bhf/aeroncache/models/CacheOperationResponse.java @@ -0,0 +1,24 @@ +package com.bhf.aeroncache.models; + +public class CacheOperationResponse { + private String requestId; + private String status; + private String cacheId; + private String key; + private String value; + + public String getRequestId() { return requestId; } + public void setRequestId(String requestId) { this.requestId = requestId; } + + public String getStatus() { return status; } + public void setStatus(String status) { this.status = status; } + + public String getCacheId() { return cacheId; } + public void setCacheId(String cacheId) { this.cacheId = cacheId; } + + public String getKey() { return key; } + public void setKey(String key) { this.key = key; } + + public String getValue() { return value; } + public void setValue(String value) { this.value = value; } +} diff --git a/libraries/java/src/test/java/com/bhf/aeroncache/client/AeronCacheClientIntegrationTest.java b/libraries/java/src/test/java/com/bhf/aeroncache/client/AeronCacheClientIntegrationTest.java index 2d0a9df..6c29da9 100644 --- a/libraries/java/src/test/java/com/bhf/aeroncache/client/AeronCacheClientIntegrationTest.java +++ b/libraries/java/src/test/java/com/bhf/aeroncache/client/AeronCacheClientIntegrationTest.java @@ -128,4 +128,39 @@ public void testGetAndClearCache() throws Exception { assertEquals(0, getResp2.getItems().size()); } + @Test + public void testBulkOperations() throws Exception { + String cacheId = "it-bulk-" + System.currentTimeMillis(); + String requestId = "req-" + System.currentTimeMillis(); + + BulkCacheOpsRequest request = new BulkCacheOpsRequest.Builder() + .requestId(requestId) + .addOperation(new CacheOperationRequest.Builder() + .operationType(BulkOperationType.CREATE_CACHE) + .requestId("op-1") + .cacheId(cacheId) + .build()) + .addOperation(new CacheOperationRequest.Builder() + .operationType(BulkOperationType.ADD_ITEM) + .requestId("op-2") + .cacheId(cacheId) + .key("bulk-key") + .value("bulk-val") + .build()) + .addOperation(new CacheOperationRequest.Builder() + .operationType(BulkOperationType.GET_ITEM) + .requestId("op-3") + .cacheId(cacheId) + .key("bulk-key") + .build()) + .build(); + + BulkCacheOpsResponse response = client.bulkOps(request); + assertNotNull(response); + assertEquals(requestId, response.getRequestId()); + assertEquals(3, response.getOperationResponses().size()); + + assertEquals("op-3", response.getOperationResponses().get(2).getRequestId()); + assertEquals("bulk-val", response.getOperationResponses().get(2).getValue()); + } } diff --git a/libraries/java/src/test/java/com/bhf/aeroncache/client/AeronCacheClientTest.java b/libraries/java/src/test/java/com/bhf/aeroncache/client/AeronCacheClientTest.java index 0f9eb16..144b76a 100644 --- a/libraries/java/src/test/java/com/bhf/aeroncache/client/AeronCacheClientTest.java +++ b/libraries/java/src/test/java/com/bhf/aeroncache/client/AeronCacheClientTest.java @@ -5,6 +5,8 @@ import com.bhf.aeroncache.models.PutItemResponse; import com.bhf.aeroncache.models.ClearCacheResponse; import com.bhf.aeroncache.models.GetCacheResponse; +import com.bhf.aeroncache.models.BulkCacheOpsRequest; +import com.bhf.aeroncache.models.BulkCacheOpsResponse; import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.client.WireMock; import com.github.tomakehurst.wiremock.core.WireMockConfiguration; @@ -97,6 +99,21 @@ public void testHttpErrorThrowsException() { assertTrue(exception.getMessage().contains("Http Error: 500")); } + @Test + public void testBulkOps() throws Exception { + stubFor(post(urlEqualTo("/api/v1/cache/bulkops")) + .willReturn(aResponse() + .withHeader("Content-Type", "application/json") + .withBody("{\"requestId\":\"req-1\",\"operationResponses\":[]}"))); + + BulkCacheOpsResponse response = client.bulkOps(BulkCacheOpsRequest.builder() + .requestId("req-1") + .operations(java.util.Collections.emptyList()) + .build()); + assertNotNull(response); + assertEquals("req-1", response.getRequestId()); + } + @Test public void testAllowBusinessLogicHttpErrors() throws Exception { stubFor(get(urlEqualTo("/api/v1/cache/test-cache/non-existent-key")) diff --git a/libraries/python/aeron_cache/client.py b/libraries/python/aeron_cache/client.py index 0a23227..99b8c77 100644 --- a/libraries/python/aeron_cache/client.py +++ b/libraries/python/aeron_cache/client.py @@ -12,7 +12,10 @@ DeleteCacheResponse, GetCacheResponse, ClearCacheResponse, - CacheUpdateEvent + CacheUpdateEvent, + BulkCacheOpsRequest, + BulkCacheOpsResponse, + BulkOperationType ) class AeronCacheClient: @@ -93,6 +96,22 @@ def delete_cache(self, cache_id) -> DeleteCacheResponse: operationStatus=data.get('operationStatus') ) + def bulk_ops(self, request: BulkCacheOpsRequest) -> BulkCacheOpsResponse: + url = f"{self.base_url}/api/v1/cache/bulkops" + # Convert request to dict, handling Enum and Optional fields + payload = { + "requestId": request.requestId, + "operations": [ + {k: (v.value if isinstance(v, BulkOperationType) else v) + for k, v in op.__dict__.items() if v is not None} + for op in request.operations + ] + } + response = requests.post(url, json=payload) + if response.status_code >= 400 and response.status_code not in [400, 401, 404]: + response.raise_for_status() + return BulkCacheOpsResponse.from_dict(response.json()) + # --- Async Operations --- async def create_cache_async(self, cache_id) -> CreateResponse: @@ -194,6 +213,23 @@ async def delete_cache_async(self, cache_id) -> DeleteCacheResponse: operationStatus=data.get('operationStatus') ) + async def bulk_ops_async(self, request: BulkCacheOpsRequest) -> BulkCacheOpsResponse: + url = f"{self.base_url}/api/v1/cache/bulkops" + payload = { + "requestId": request.requestId, + "operations": [ + {k: (v.value if isinstance(v, BulkOperationType) else v) + for k, v in op.__dict__.items() if v is not None} + for op in request.operations + ] + } + async with aiohttp.ClientSession() as session: + async with session.post(url, json=payload) as response: + if response.status >= 400 and response.status not in [400, 401, 404]: + response.raise_for_status() + data = await response.json() + return BulkCacheOpsResponse.from_dict(data) + # --- WebSocket --- def get_cache(self, cache_id: str) -> EmbeddedAeronCache: diff --git a/libraries/python/aeron_cache/models.py b/libraries/python/aeron_cache/models.py index d0d1a9e..9273db2 100644 --- a/libraries/python/aeron_cache/models.py +++ b/libraries/python/aeron_cache/models.py @@ -89,3 +89,49 @@ def from_dict(cls, data: dict): class ClearCacheResponse: cacheId: str operationStatus: str + +from enum import Enum + +class BulkOperationType(str, Enum): + NONE = "NONE" + CREATE_CACHE = "CREATE_CACHE" + ADD_ITEM = "ADD_ITEM" + REMOVE_ITEM = "REMOVE_ITEM" + CLEAR_CACHE = "CLEAR_CACHE" + GET_ITEM = "GET_ITEM" + DELETE_CACHE = "DELETE_CACHE" + +@dataclass +class CacheOperationRequest: + operationType: BulkOperationType + requestId: str + cacheId: str + key: Optional[str] = None + value: Optional[str] = None + ttl: Optional[int] = None + +@dataclass +class BulkCacheOpsRequest: + requestId: str + operations: list[CacheOperationRequest] + +@dataclass +class CacheOperationResponse: + requestId: str + status: str + cacheId: str + key: Optional[str] = None + value: Optional[str] = None + +@dataclass +class BulkCacheOpsResponse: + requestId: str + operationResponses: list[CacheOperationResponse] + + @classmethod + def from_dict(cls, data: dict): + ops = [CacheOperationResponse(**op) for op in data.get('operationResponses', [])] + return cls( + requestId=data.get('requestId'), + operationResponses=ops + ) diff --git a/libraries/python/tests/test_client.py b/libraries/python/tests/test_client.py index 60602b5..22275c4 100644 --- a/libraries/python/tests/test_client.py +++ b/libraries/python/tests/test_client.py @@ -70,6 +70,40 @@ def test_delete_cache(client): response = client.delete_cache("test-cache") assert response.cacheId == "test-cache" +@responses.activate +def test_bulk_ops(client): + from aeron_cache.models import BulkCacheOpsRequest, CacheOperationRequest, BulkOperationType + + responses.add( + responses.POST, + "http://localhost:7070/api/v1/cache/bulkops", + json={ + "requestId": "req-1", + "operationResponses": [ + {"requestId": "op-1", "status": "SUCCESS", "cacheId": "test-cache", "key": "k1"} + ] + }, + status=200 + ) + + request = BulkCacheOpsRequest( + requestId="req-1", + operations=[ + CacheOperationRequest( + operationType=BulkOperationType.ADD_ITEM, + requestId="op-1", + cacheId="test-cache", + key="k1", + value="v1" + ) + ] + ) + + response = client.bulk_ops(request) + assert response.requestId == "req-1" + assert len(response.operationResponses) == 1 + assert response.operationResponses[0].status == "SUCCESS" + @responses.activate def test_http_error_throws_exception(client): responses.add( diff --git a/libraries/python/tests/test_integration.py b/libraries/python/tests/test_integration.py index f0aa471..c12ac9d 100644 --- a/libraries/python/tests/test_integration.py +++ b/libraries/python/tests/test_integration.py @@ -99,5 +99,51 @@ def test_get_and_clear_cache(client): clear_resp = client.clear_cache(cache_id) assert clear_resp.operationStatus == "SUCCESS" +def test_bulk_operations(client): + from aeron_cache.models import BulkCacheOpsRequest, CacheOperationRequest, BulkOperationType + import uuid + + cache_id = f"it-bulk-{uuid.uuid4().hex[:8]}" + req_id = str(uuid.uuid4()) + + request = BulkCacheOpsRequest( + requestId=req_id, + operations=[ + CacheOperationRequest( + operationType=BulkOperationType.CREATE_CACHE, + requestId="op-1", + cacheId=cache_id + ), + CacheOperationRequest( + operationType=BulkOperationType.ADD_ITEM, + requestId="op-2", + cacheId=cache_id, + key="bulk-key", + value="bulk-val" + ), + CacheOperationRequest( + operationType=BulkOperationType.GET_ITEM, + requestId="op-3", + cacheId=cache_id, + key="bulk-key" + ) + ] + ) + + response = client.bulk_ops(request) + assert response.requestId == req_id + assert len(response.operationResponses) == 3 + + # Verify CREATE_CACHE + assert response.operationResponses[0].requestId == "op-1" + + # Verify ADD_ITEM + assert response.operationResponses[1].requestId == "op-2" + + # Verify GET_ITEM + assert response.operationResponses[2].requestId == "op-3" + assert response.operationResponses[2].value == "bulk-val" + get_resp2 = client.get_cache_items(cache_id) - assert len(get_resp2.items) == 0 + assert len(get_resp2.items) == 1 + assert get_resp2.items[0].key == "bulk-key" diff --git a/libraries/rust/src/lib.rs b/libraries/rust/src/lib.rs index 0192051..e513eb8 100644 --- a/libraries/rust/src/lib.rs +++ b/libraries/rust/src/lib.rs @@ -112,6 +112,58 @@ pub struct CacheUpdateEvent { pub request_id: String, } +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum BulkOperationType { + None, + CreateCache, + AddItem, + RemoveItem, + ClearCache, + GetItem, + DeleteCache, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct CacheOperationRequest { + pub operation_type: BulkOperationType, + pub request_id: String, + pub cache_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub key: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub value: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub ttl: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct BulkCacheOpsRequest { + pub request_id: String, + pub operations: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct CacheOperationResponse { + pub request_id: String, + pub status: String, + pub cache_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub key: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub value: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct BulkCacheOpsResponse { + pub request_id: String, + pub operation_responses: Vec, +} + pub struct AeronCacheClient { pub base_url: String, pub ws_url: String, @@ -204,6 +256,18 @@ impl AeronCacheClient { Ok(data) } + pub fn bulk_ops(&self, req: &BulkCacheOpsRequest) -> Result> { + let url = format!("{}/api/v1/cache/bulkops", self.base_url); + let resp = self.get_sync_client().post(&url) + .json(req) + .send()?; + if !resp.status().is_success() && resp.status() != 400 && resp.status() != 401 && resp.status() != 404 { + return Err(format!("HTTP Error: {} - {}", resp.status(), resp.text()?).into()); + } + let data = resp.json::()?; + Ok(data) + } + // --- Async Operations --- pub async fn create_cache_async(&self, cache_id: &str) -> Result> { @@ -278,6 +342,19 @@ impl AeronCacheClient { Ok(data) } + pub async fn bulk_ops_async(&self, req: &BulkCacheOpsRequest) -> Result> { + let url = format!("{}/api/v1/cache/bulkops", self.base_url); + let resp = self.async_client.post(&url) + .json(req) + .send() + .await?; + if !resp.status().is_success() && resp.status() != 400 && resp.status() != 401 && resp.status() != 404 { + return Err(format!("HTTP Error: {} - {}", resp.status(), resp.text().await?).into()); + } + let data = resp.json::().await?; + Ok(data) + } + pub fn get_cache_items(&self, cache_id: &str) -> Result> { let url = format!("{}/api/v1/cache/{}", self.base_url, cache_id); diff --git a/libraries/rust/tests/client_tests.rs b/libraries/rust/tests/client_tests.rs index 1013e40..2790bdb 100644 --- a/libraries/rust/tests/client_tests.rs +++ b/libraries/rust/tests/client_tests.rs @@ -106,3 +106,38 @@ fn test_clear_cache() { assert_eq!(response.operation_status, "SUCCESS"); } + +#[test] +fn test_bulk_ops() { + use aeron_cache_embedded_client::{BulkCacheOpsRequest, CacheOperationRequest, BulkOperationType}; + + let mut server = mockito::Server::new(); + let url = server.url(); + + let _m = server.mock("POST", "/api/v1/cache/bulkops") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(r#"{"requestId": "req-1", "operationResponses": [{"requestId": "op-1", "status": "SUCCESS", "cacheId": "test-cache", "key": "k1"}]}"#) + .create(); + + let client = AeronCacheClient::new(url, "ws://localhost".into()); + + let request = BulkCacheOpsRequest { + request_id: "req-1".to_string(), + operations: vec![ + CacheOperationRequest { + operation_type: BulkOperationType::AddItem, + request_id: "op-1".to_string(), + cache_id: "test-cache".to_string(), + key: Some("k1".to_string()), + value: Some("v1".to_string()), + ttl: None, + } + ], + }; + + let response = client.bulk_ops(&request).unwrap(); + assert_eq!(response.request_id, "req-1"); + assert_eq!(response.operation_responses.len(), 1); + assert_eq!(response.operation_responses[0].status, "SUCCESS"); +} diff --git a/libraries/rust/tests/integration_tests.rs b/libraries/rust/tests/integration_tests.rs index 5a55618..9028ee8 100644 --- a/libraries/rust/tests/integration_tests.rs +++ b/libraries/rust/tests/integration_tests.rs @@ -107,3 +107,55 @@ fn test_get_and_clear_cache() { let get_resp2 = client.get_cache_items(&cache_id).unwrap(); assert_eq!(get_resp2.items.len(), 0); } + +#[test] +fn test_integration_bulk_operations() { + use aeron_cache_embedded_client::{BulkCacheOpsRequest, CacheOperationRequest, BulkOperationType}; + + let Some((base_url, ws_url)) = get_urls() else { + println!("Skipping test_integration_bulk_operations: AERON_CACHE_BASE_URL not set"); + return; + }; + + let client = AeronCacheClient::new(base_url, ws_url); + let cache_id = generate_id("it-bulk"); + let req_id = generate_id("req"); + + let request = BulkCacheOpsRequest { + request_id: req_id.clone(), + operations: vec![ + CacheOperationRequest { + operation_type: BulkOperationType::CreateCache, + request_id: "op-1".to_string(), + cache_id: cache_id.clone(), + key: None, + value: None, + ttl: None, + }, + CacheOperationRequest { + operation_type: BulkOperationType::AddItem, + request_id: "op-2".to_string(), + cache_id: cache_id.clone(), + key: Some("bulk-key".to_string()), + value: Some("bulk-val".to_string()), + ttl: None, + }, + CacheOperationRequest { + operation_type: BulkOperationType::GetItem, + request_id: "op-3".to_string(), + cache_id: cache_id.clone(), + key: Some("bulk-key".to_string()), + value: None, + ttl: None, + }, + ], + }; + + let response = client.bulk_ops(&request).expect("Failed bulk operations"); + assert_eq!(response.request_id, req_id); + assert_eq!(response.operation_responses.len(), 3); + + // Verify GET_ITEM result + assert_eq!(response.operation_responses[2].request_id, "op-3"); + assert_eq!(response.operation_responses[2].value, Some("bulk-val".to_string())); +} diff --git a/libraries/typescript/src/index.test.ts b/libraries/typescript/src/index.test.ts index 0657ac6..6c38220 100644 --- a/libraries/typescript/src/index.test.ts +++ b/libraries/typescript/src/index.test.ts @@ -91,6 +91,25 @@ describe('AeronCacheClient', () => { expect(response).toEqual(mockResponse); }); + it('should perform bulk operations', async () => { + const mockResponse = { requestId: 'req-1', operationResponses: [] }; + (global.fetch as jest.Mock).mockResolvedValue({ + ok: true, + status: 200, + json: async () => mockResponse + }); + + const request = { requestId: 'req-1', operations: [] }; + const response = await client.bulkOps(request); + + expect(global.fetch).toHaveBeenCalledWith('http://localhost:7070/api/v1/cache/bulkops', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(request) + }); + expect(response).toEqual(mockResponse); + }); + it('should throw error on 500 status', async () => { (global.fetch as jest.Mock).mockResolvedValue({ ok: false, diff --git a/libraries/typescript/src/index.ts b/libraries/typescript/src/index.ts index 9156c90..dcd82af 100644 --- a/libraries/typescript/src/index.ts +++ b/libraries/typescript/src/index.ts @@ -7,7 +7,9 @@ import { DeleteCacheResponse, CacheUpdateEvent, GetCacheResponse, - ClearCacheResponse + ClearCacheResponse, + BulkCacheOpsRequest, + BulkCacheOpsResponse } from './models'; export { EmbeddedAeronCache }; @@ -95,6 +97,15 @@ export class AeronCacheClient { return this.handleResponse(response); } + async bulkOps(request: BulkCacheOpsRequest): Promise { + const response = await fetch(`${this.baseUrl}/api/v1/cache/bulkops`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(request) + }); + return this.handleResponse(response); + } + getCache(cacheId: string): EmbeddedAeronCache { return new EmbeddedAeronCache(this, cacheId); } diff --git a/libraries/typescript/src/integration.test.ts b/libraries/typescript/src/integration.test.ts index 4164df4..20ccd63 100644 --- a/libraries/typescript/src/integration.test.ts +++ b/libraries/typescript/src/integration.test.ts @@ -109,4 +109,40 @@ if (shouldRun && !wsUrl) { const getResp2 = await client.getCacheItems(cacheId); expect(getResp2.items.length).toBe(0); }); + + test('bulk_operations behaves correctly', async () => { + const cacheId = `it-bulk-${Date.now()}`; + const requestId = `req-${Date.now()}`; + + const request = { + requestId, + operations: [ + { + operationType: 'CREATE_CACHE' as const, + requestId: 'op-1', + cacheId + }, + { + operationType: 'ADD_ITEM' as const, + requestId: 'op-2', + cacheId, + key: 'bulk-key', + value: 'bulk-val' + }, + { + operationType: 'GET_ITEM' as const, + requestId: 'op-3', + cacheId, + key: 'bulk-key' + } + ] + }; + + const response = await client.bulkOps(request); + expect(response.requestId).toBe(requestId); + expect(response.operationResponses.length).toBe(3); + + expect(response.operationResponses[2].requestId).toBe('op-3'); + expect(response.operationResponses[2].value).toBe('bulk-val'); + }); }); diff --git a/libraries/typescript/src/models.ts b/libraries/typescript/src/models.ts index 8b92512..0597d14 100644 --- a/libraries/typescript/src/models.ts +++ b/libraries/typescript/src/models.ts @@ -65,3 +65,39 @@ export interface ClearCacheResponse { cacheId: string; operationStatus: string; } + +export type BulkOperationType = + | 'NONE' + | 'CREATE_CACHE' + | 'ADD_ITEM' + | 'REMOVE_ITEM' + | 'CLEAR_CACHE' + | 'GET_ITEM' + | 'DELETE_CACHE'; + +export interface CacheOperationRequest { + operationType: BulkOperationType; + requestId: string; + cacheId: string; + key?: string; + value?: string; + ttl?: number; +} + +export interface BulkCacheOpsRequest { + requestId: string; + operations: CacheOperationRequest[]; +} + +export interface CacheOperationResponse { + requestId: string; + status: string; + cacheId: string; + key?: string; + value?: string; +} + +export interface BulkCacheOpsResponse { + requestId: string; + operationResponses: CacheOperationResponse[]; +} diff --git a/samples/README.md b/samples/README.md index 234e3eb..c116642 100644 --- a/samples/README.md +++ b/samples/README.md @@ -2,10 +2,11 @@ This folder contains example applications demonstrating how to use the Aeron Cache Embedded Clients in Java, TypeScript, Python, and Rust. -The samples for each language are split into three distinct categories: +The samples for each language are split into distinct categories: 1. **Sync Sample**: Demonstrates sequential, synchronous-style operations. 2. **Async Sample**: Demonstrates non-blocking, asynchronous operations. 3. **Streaming Sample**: Demonstrates how to listen for real-time updates from the server. +4. **Bulk Sample**: Demonstrates executing multiple operations in a single request. ## Structure diff --git a/samples/java/async-sample/build.gradle.kts b/samples/java/async-sample/build.gradle.kts index 6fffd20..7bed6a8 100644 --- a/samples/java/async-sample/build.gradle.kts +++ b/samples/java/async-sample/build.gradle.kts @@ -9,7 +9,7 @@ repositories { } dependencies { - implementation("com.aeron.cache:aeron-cache-embedded-client:1.0.0") + implementation("com.bhf.aeroncache:aeron-cache-embedded-client:1.0.1") implementation("org.slf4j:slf4j-simple:2.0.13") } diff --git a/samples/java/bulk-sample/build.gradle.kts b/samples/java/bulk-sample/build.gradle.kts new file mode 100644 index 0000000..382755c --- /dev/null +++ b/samples/java/bulk-sample/build.gradle.kts @@ -0,0 +1,18 @@ +plugins { + application + java +} + +repositories { + mavenLocal() + mavenCentral() +} + +dependencies { + implementation("com.bhf.aeroncache:aeron-cache-embedded-client:1.0.1") + implementation("org.slf4j:slf4j-simple:2.0.13") +} + +application { + mainClass.set("com.bhf.aeroncache.samples.BulkSample") +} diff --git a/samples/java/bulk-sample/src/main/java/com/bhf/aeroncache/samples/BulkSample.java b/samples/java/bulk-sample/src/main/java/com/bhf/aeroncache/samples/BulkSample.java new file mode 100644 index 0000000..45d4d3b --- /dev/null +++ b/samples/java/bulk-sample/src/main/java/com/bhf/aeroncache/samples/BulkSample.java @@ -0,0 +1,52 @@ +package com.bhf.aeroncache.samples; + +import com.bhf.aeroncache.client.AeronCacheClient; +import com.bhf.aeroncache.models.*; + +import java.util.Arrays; +import java.util.UUID; + +public class BulkSample { + public static void main(String[] args) throws Exception { + String baseUrl = args.length > 0 ? args[0] : "http://localhost:7070"; + String wsUrl = args.length > 1 ? args[1] : "ws://localhost:7071"; + + System.out.println("Starting Java Bulk Operations Sample against " + baseUrl); + + AeronCacheClient client = new AeronCacheClient(baseUrl, wsUrl); + String cacheId = "bulk-java-sample"; + + BulkCacheOpsRequest bulkRequest = BulkCacheOpsRequest.builder() + .requestId(UUID.randomUUID().toString()) + .operations(Arrays.asList( + CacheOperationRequest.builder() + .operationType(BulkOperationType.CREATE_CACHE) + .requestId("op-1") + .cacheId(cacheId) + .build(), + CacheOperationRequest.builder() + .operationType(BulkOperationType.ADD_ITEM) + .requestId("op-2") + .cacheId(cacheId) + .key("java-bulk-1") + .value("value-1") + .build(), + CacheOperationRequest.builder() + .operationType(BulkOperationType.GET_ITEM) + .requestId("op-3") + .cacheId(cacheId) + .key("java-bulk-1") + .build() + )) + .build(); + + System.out.println("Executing bulk operations..."); + BulkCacheOpsResponse response = client.bulkOps(bulkRequest); + + System.out.println("Bulk Response ID: " + response.getRequestId()); + for (CacheOperationResponse opResp : response.getOperationResponses()) { + System.out.printf(" Op %s: status=%s, cache=%s, key=%s, value=%s%n", + opResp.getRequestId(), opResp.getStatus(), opResp.getCacheId(), opResp.getKey(), opResp.getValue()); + } + } +} diff --git a/samples/java/streaming-sample/build.gradle.kts b/samples/java/streaming-sample/build.gradle.kts index bebb3df..be304d8 100644 --- a/samples/java/streaming-sample/build.gradle.kts +++ b/samples/java/streaming-sample/build.gradle.kts @@ -9,7 +9,7 @@ repositories { } dependencies { - implementation("com.aeron.cache:aeron-cache-embedded-client:1.0.0") + implementation("com.bhf.aeroncache:aeron-cache-embedded-client:1.0.1") implementation("org.slf4j:slf4j-simple:2.0.13") } diff --git a/samples/java/sync-sample/build.gradle.kts b/samples/java/sync-sample/build.gradle.kts index c7dc763..b7b3b34 100644 --- a/samples/java/sync-sample/build.gradle.kts +++ b/samples/java/sync-sample/build.gradle.kts @@ -9,7 +9,7 @@ repositories { } dependencies { - implementation("com.aeron.cache:aeron-cache-embedded-client:1.0.0") + implementation("com.bhf.aeroncache:aeron-cache-embedded-client:1.0.1") implementation("org.slf4j:slf4j-simple:2.0.13") } diff --git a/samples/python/bulk-sample/bulk_sample.py b/samples/python/bulk-sample/bulk_sample.py new file mode 100644 index 0000000..d3b0ba6 --- /dev/null +++ b/samples/python/bulk-sample/bulk_sample.py @@ -0,0 +1,62 @@ +import sys +import uuid +from aeron_cache.client import AeronCacheClient +from aeron_cache.models import BulkCacheOpsRequest, CacheOperationRequest, BulkOperationType + + +def main(): + base_url = "http://localhost:7070" + ws_url = "http://localhost:7071" + if len(sys.argv) > 1: + base_url = sys.argv[1] + ws_url = sys.argv[2] + + print(f"Starting Bulk Operations Sample against {base_url}") + + client = AeronCacheClient(base_url, ws_url) + cache_id = "bulk-sample-cache" + + # Define a series of operations to be executed in bulk + operations = [ + CacheOperationRequest( + operationType=BulkOperationType.CREATE_CACHE, + requestId="op-1", + cacheId=cache_id + ), + CacheOperationRequest( + operationType=BulkOperationType.ADD_ITEM, + requestId="op-2", + cacheId=cache_id, + key="bulk-key-1", + value="bulk-value-1" + ), + CacheOperationRequest( + operationType=BulkOperationType.ADD_ITEM, + requestId="op-3", + cacheId=cache_id, + key="bulk-key-2", + value="bulk-value-2" + ), + CacheOperationRequest( + operationType=BulkOperationType.GET_ITEM, + requestId="op-4", + cacheId=cache_id, + key="bulk-key-1" + ) + ] + + bulk_request = BulkCacheOpsRequest( + requestId=str(uuid.uuid4()), + operations=operations + ) + + print(f"Executing {len(operations)} operations in bulk...") + response = client.bulk_ops(bulk_request) + + print(f"Bulk Request ID: {response.requestId}") + for op_resp in response.operationResponses: + print(f" Operation {op_resp.requestId}: status={op_resp.status}, cache={op_resp.cacheId}, key={op_resp.key}, value={op_resp.value}") + + +if __name__ == "__main__": + main() diff --git a/samples/python/bulk-sample/pyproject.toml b/samples/python/bulk-sample/pyproject.toml new file mode 100644 index 0000000..67ac71b --- /dev/null +++ b/samples/python/bulk-sample/pyproject.toml @@ -0,0 +1,13 @@ +[project] +name = "bulk-sample" +version = "0.1.0" +description = "Aeron Cache Bulk Operations Sample" +dependencies = [ + "aeron-cache-embedded-client", +] + +[tool.setuptools] +py-modules = ["bulk_sample"] + +[tool.setuptools.package-dir] +"" = "." diff --git a/samples/rust/bulk-sample/Cargo.toml b/samples/rust/bulk-sample/Cargo.toml new file mode 100644 index 0000000..87f031b --- /dev/null +++ b/samples/rust/bulk-sample/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "bulk-sample" +version = "0.1.0" +edition = "2021" + +[dependencies] +aeron-cache-embedded-client = { path = "../../../libraries/rust" } +tokio = { version = "1.0", features = ["full"] } +uuid = { version = "1.0", features = ["v4"] } diff --git a/samples/rust/bulk-sample/src/main.rs b/samples/rust/bulk-sample/src/main.rs new file mode 100644 index 0000000..99fd9ff --- /dev/null +++ b/samples/rust/bulk-sample/src/main.rs @@ -0,0 +1,60 @@ +use aeron_cache_embedded_client::{ + AeronCacheClient, BulkCacheOpsRequest, CacheOperationRequest, BulkOperationType +}; +use std::env; +use uuid::Uuid; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args: Vec = env::args().collect(); + let base_url = args.get(1).map(|s| s.as_str()).unwrap_or("http://localhost:7070"); + let ws_url = args.get(2).map(|s| s.as_str()).unwrap_or("ws://localhost:7071"); + + println!("Starting Rust Bulk Operations Sample against {}", base_url); + + let client = AeronCacheClient::new(base_url.to_string(), ws_url.to_string()); + let cache_id = "bulk-rust-sample"; + + let bulk_request = BulkCacheOpsRequest { + request_id: Uuid::new_v4().to_string(), + operations: vec![ + CacheOperationRequest { + operation_type: BulkOperationType::CreateCache, + request_id: "op-1".to_string(), + cache_id: cache_id.to_string(), + key: None, + value: None, + ttl: None, + }, + CacheOperationRequest { + operation_type: BulkOperationType::AddItem, + request_id: "op-2".to_string(), + cache_id: cache_id.to_string(), + key: Some("rust-bulk-1".to_string()), + value: Some("value-1".to_string()), + ttl: None, + }, + CacheOperationRequest { + operation_type: BulkOperationType::GetItem, + request_id: "op-3".to_string(), + cache_id: cache_id.to_string(), + key: Some("rust-bulk-1".to_string()), + value: None, + ttl: None, + }, + ], + }; + + println!("Executing bulk operations..."); + let response = client.bulk_ops_async(&bulk_request).await?; + + println!("Bulk Response ID: {}", response.request_id); + for op_resp in response.operation_responses { + println!( + " Op {}: status={}, cache={}, key={:?}, value={:?}", + op_resp.request_id, op_resp.status, op_resp.cache_id, op_resp.key, op_resp.value + ); + } + + Ok(()) +} diff --git a/samples/typescript/bulk-sample/package.json b/samples/typescript/bulk-sample/package.json new file mode 100644 index 0000000..8dfe77c --- /dev/null +++ b/samples/typescript/bulk-sample/package.json @@ -0,0 +1,16 @@ +{ + "name": "bulk-sample", + "version": "1.0.0", + "description": "Bulk Operations sample for Aeron Cache Embedded Client", + "main": "dist/index.js", + "scripts": { + "build": "tsc", + "start": "node dist/index.js" + }, + "dependencies": { + "aeron-cache-embedded-client": "file:../../../libraries/typescript" + }, + "devDependencies": { + "typescript": "^5.0.0" + } +} diff --git a/samples/typescript/bulk-sample/src/index.ts b/samples/typescript/bulk-sample/src/index.ts new file mode 100644 index 0000000..f432da0 --- /dev/null +++ b/samples/typescript/bulk-sample/src/index.ts @@ -0,0 +1,50 @@ +import { AeronCacheClient } from 'aeron-cache-embedded-client'; + +async function main() { + const args = process.argv.slice(2); + const baseUrl = args[0] || 'http://localhost:7070'; + const wsUrl = args[1] || 'ws://localhost:7071'; + + console.log(`Starting TypeScript Bulk Operations Sample against ${baseUrl}`); + + const client = new AeronCacheClient(baseUrl, wsUrl); + const cacheId = 'bulk-ts-sample'; + + const bulkRequest = { + requestId: Math.random().toString(36).substring(7), + operations: [ + { + operationType: 'CREATE_CACHE' as const, + requestId: 'op-1', + cacheId: cacheId + }, + { + operationType: 'ADD_ITEM' as const, + requestId: 'op-2', + cacheId: cacheId, + key: 'ts-bulk-1', + value: 'value-1' + }, + { + operationType: 'GET_ITEM' as const, + requestId: 'op-3', + cacheId: cacheId, + key: 'ts-bulk-1' + } + ] + }; + + try { + console.log('Executing bulk operations...'); + const response = await client.bulkOps(bulkRequest); + + console.log(`Bulk Response ID: ${response.requestId}`); + for (const opResp of response.operationResponses) { + console.log(` Op ${opResp.requestId}: status=${opResp.status}, cache=${opResp.cacheId}, key=${opResp.key}, value=${opResp.value}`); + } + } catch (error) { + console.error('Error executing bulk operations:', error); + } +} + +main(); diff --git a/samples/typescript/bulk-sample/tsconfig.json b/samples/typescript/bulk-sample/tsconfig.json new file mode 100644 index 0000000..c771420 --- /dev/null +++ b/samples/typescript/bulk-sample/tsconfig.json @@ -0,0 +1,13 @@ +{ + "compilerOptions": { + "target": "ESNext", + "module": "CommonJS", + "outDir": "./dist", + "rootDir": "./src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true + }, + "include": ["src/**/*"] +}