Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,18 @@ public DeleteCacheResponse deleteCache(String cacheId) throws Exception {
return objectMapper.readValue(response.body(), DeleteCacheResponse.class);
}

public BulkCacheOpsResponse bulkOps(BulkCacheOpsRequest bulkOpsRequest) throws Exception {
String json = objectMapper.writeValueAsString(bulkOpsRequest);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/api/v1/cache/bulkops"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(json))
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
checkStatus(response);
return objectMapper.readValue(response.body(), BulkCacheOpsResponse.class);
}

// --- Async Operations ---

public CompletableFuture<CreateResponse> createCacheAsync(String cacheId) {
Expand Down Expand Up @@ -240,6 +252,28 @@ public CompletableFuture<DeleteCacheResponse> deleteCacheAsync(String cacheId) {
});
}

public CompletableFuture<BulkCacheOpsResponse> bulkOpsAsync(BulkCacheOpsRequest bulkOpsRequest) {
try {
String json = objectMapper.writeValueAsString(bulkOpsRequest);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/api/v1/cache/bulkops"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(json))
.build();
return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenApply(resp -> {
checkStatusAsync(resp);
try {
return objectMapper.readValue(resp.body(), BulkCacheOpsResponse.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}

// --- WebSocket ---

public ReconnectingWebSocket subscribe(String cacheId, WebSocket.Listener listener) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.bhf.aeroncache.models;

import java.util.List;

public class BulkCacheOpsRequest {
private String requestId;
private List<CacheOperationRequest> operations;

public String getRequestId() { return requestId; }
public void setRequestId(String requestId) { this.requestId = requestId; }

public List<CacheOperationRequest> getOperations() { return operations; }
public void setOperations(List<CacheOperationRequest> operations) { this.operations = operations; }

public static Builder builder() { return new Builder(); }

public static class Builder {
private BulkCacheOpsRequest request = new BulkCacheOpsRequest();
private java.util.List<CacheOperationRequest> operations = new java.util.ArrayList<>();

public Builder requestId(String id) { request.setRequestId(id); return this; }
public Builder operations(java.util.List<CacheOperationRequest> ops) { this.operations = ops; return this; }
public Builder addOperation(CacheOperationRequest op) { this.operations.add(op); return this; }

public BulkCacheOpsRequest build() {
request.setOperations(operations);
return request;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.bhf.aeroncache.models;

import java.util.List;

public class BulkCacheOpsResponse {
private String requestId;
private List<CacheOperationResponse> operationResponses;

public String getRequestId() { return requestId; }
public void setRequestId(String requestId) { this.requestId = requestId; }

public List<CacheOperationResponse> getOperationResponses() { return operationResponses; }
public void setOperationResponses(List<CacheOperationResponse> operationResponses) { this.operationResponses = operationResponses; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.bhf.aeroncache.models;

public enum BulkOperationType {
NONE,
CREATE_CACHE,
ADD_ITEM,
REMOVE_ITEM,
CLEAR_CACHE,
GET_ITEM,
DELETE_CACHE
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.bhf.aeroncache.models;

public class CacheOperationRequest {
private BulkOperationType operationType;
private Long ttl;
private String requestId;
private String cacheId;
private String key;
private String value;

public BulkOperationType getOperationType() { return operationType; }
public void setOperationType(BulkOperationType operationType) { this.operationType = operationType; }

public Long getTtl() { return ttl; }
public void setTtl(Long ttl) { this.ttl = ttl; }

public String getRequestId() { return requestId; }
public void setRequestId(String requestId) { this.requestId = requestId; }

public String getCacheId() { return cacheId; }
public void setCacheId(String cacheId) { this.cacheId = cacheId; }

public String getKey() { return key; }
public void setKey(String key) { this.key = key; }

public String getValue() { return value; }
public void setValue(String value) { this.value = value; }

public static Builder builder() { return new Builder(); }

public static class Builder {
private CacheOperationRequest request = new CacheOperationRequest();
public Builder operationType(BulkOperationType type) { request.setOperationType(type); return this; }
public Builder ttl(Long ttl) { request.setTtl(ttl); return this; }
public Builder requestId(String id) { request.setRequestId(id); return this; }
public Builder cacheId(String id) { request.setCacheId(id); return this; }
public Builder key(String key) { request.setKey(key); return this; }
public Builder value(String value) { request.setValue(value); return this; }
public CacheOperationRequest build() { return request; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.bhf.aeroncache.models;

public class CacheOperationResponse {
private String requestId;
private String status;
private String cacheId;
private String key;
private String value;

public String getRequestId() { return requestId; }
public void setRequestId(String requestId) { this.requestId = requestId; }

public String getStatus() { return status; }
public void setStatus(String status) { this.status = status; }

public String getCacheId() { return cacheId; }
public void setCacheId(String cacheId) { this.cacheId = cacheId; }

public String getKey() { return key; }
public void setKey(String key) { this.key = key; }

public String getValue() { return value; }
public void setValue(String value) { this.value = value; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,39 @@ public void testGetAndClearCache() throws Exception {
assertEquals(0, getResp2.getItems().size());
}

@Test
public void testBulkOperations() throws Exception {
String cacheId = "it-bulk-" + System.currentTimeMillis();
String requestId = "req-" + System.currentTimeMillis();

BulkCacheOpsRequest request = new BulkCacheOpsRequest.Builder()
.requestId(requestId)
.addOperation(new CacheOperationRequest.Builder()
.operationType(BulkOperationType.CREATE_CACHE)
.requestId("op-1")
.cacheId(cacheId)
.build())
.addOperation(new CacheOperationRequest.Builder()
.operationType(BulkOperationType.ADD_ITEM)
.requestId("op-2")
.cacheId(cacheId)
.key("bulk-key")
.value("bulk-val")
.build())
.addOperation(new CacheOperationRequest.Builder()
.operationType(BulkOperationType.GET_ITEM)
.requestId("op-3")
.cacheId(cacheId)
.key("bulk-key")
.build())
.build();

BulkCacheOpsResponse response = client.bulkOps(request);
assertNotNull(response);
assertEquals(requestId, response.getRequestId());
assertEquals(3, response.getOperationResponses().size());

assertEquals("op-3", response.getOperationResponses().get(2).getRequestId());
assertEquals("bulk-val", response.getOperationResponses().get(2).getValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.bhf.aeroncache.models.PutItemResponse;
import com.bhf.aeroncache.models.ClearCacheResponse;
import com.bhf.aeroncache.models.GetCacheResponse;
import com.bhf.aeroncache.models.BulkCacheOpsRequest;
import com.bhf.aeroncache.models.BulkCacheOpsResponse;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
Expand Down Expand Up @@ -97,6 +99,21 @@ public void testHttpErrorThrowsException() {
assertTrue(exception.getMessage().contains("Http Error: 500"));
}

@Test
public void testBulkOps() throws Exception {
stubFor(post(urlEqualTo("/api/v1/cache/bulkops"))
.willReturn(aResponse()
.withHeader("Content-Type", "application/json")
.withBody("{\"requestId\":\"req-1\",\"operationResponses\":[]}")));

BulkCacheOpsResponse response = client.bulkOps(BulkCacheOpsRequest.builder()
.requestId("req-1")
.operations(java.util.Collections.emptyList())
.build());
assertNotNull(response);
assertEquals("req-1", response.getRequestId());
}

@Test
public void testAllowBusinessLogicHttpErrors() throws Exception {
stubFor(get(urlEqualTo("/api/v1/cache/test-cache/non-existent-key"))
Expand Down
38 changes: 37 additions & 1 deletion libraries/python/aeron_cache/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
DeleteCacheResponse,
GetCacheResponse,
ClearCacheResponse,
CacheUpdateEvent
CacheUpdateEvent,
BulkCacheOpsRequest,
BulkCacheOpsResponse,
BulkOperationType
)

class AeronCacheClient:
Expand Down Expand Up @@ -93,6 +96,22 @@ def delete_cache(self, cache_id) -> DeleteCacheResponse:
operationStatus=data.get('operationStatus')
)

def bulk_ops(self, request: BulkCacheOpsRequest) -> BulkCacheOpsResponse:
url = f"{self.base_url}/api/v1/cache/bulkops"
# Convert request to dict, handling Enum and Optional fields
payload = {
"requestId": request.requestId,
"operations": [
{k: (v.value if isinstance(v, BulkOperationType) else v)
for k, v in op.__dict__.items() if v is not None}
for op in request.operations
]
}
response = requests.post(url, json=payload)
if response.status_code >= 400 and response.status_code not in [400, 401, 404]:
response.raise_for_status()
return BulkCacheOpsResponse.from_dict(response.json())

# --- Async Operations ---

async def create_cache_async(self, cache_id) -> CreateResponse:
Expand Down Expand Up @@ -194,6 +213,23 @@ async def delete_cache_async(self, cache_id) -> DeleteCacheResponse:
operationStatus=data.get('operationStatus')
)

async def bulk_ops_async(self, request: BulkCacheOpsRequest) -> BulkCacheOpsResponse:
url = f"{self.base_url}/api/v1/cache/bulkops"
payload = {
"requestId": request.requestId,
"operations": [
{k: (v.value if isinstance(v, BulkOperationType) else v)
for k, v in op.__dict__.items() if v is not None}
for op in request.operations
]
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload) as response:
if response.status >= 400 and response.status not in [400, 401, 404]:
response.raise_for_status()
data = await response.json()
return BulkCacheOpsResponse.from_dict(data)

# --- WebSocket ---

def get_cache(self, cache_id: str) -> EmbeddedAeronCache:
Expand Down
46 changes: 46 additions & 0 deletions libraries/python/aeron_cache/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,49 @@ def from_dict(cls, data: dict):
class ClearCacheResponse:
cacheId: str
operationStatus: str

from enum import Enum

class BulkOperationType(str, Enum):
NONE = "NONE"
CREATE_CACHE = "CREATE_CACHE"
ADD_ITEM = "ADD_ITEM"
REMOVE_ITEM = "REMOVE_ITEM"
CLEAR_CACHE = "CLEAR_CACHE"
GET_ITEM = "GET_ITEM"
DELETE_CACHE = "DELETE_CACHE"

@dataclass
class CacheOperationRequest:
operationType: BulkOperationType
requestId: str
cacheId: str
key: Optional[str] = None
value: Optional[str] = None
ttl: Optional[int] = None

@dataclass
class BulkCacheOpsRequest:
requestId: str
operations: list[CacheOperationRequest]

@dataclass
class CacheOperationResponse:
requestId: str
status: str
cacheId: str
key: Optional[str] = None
value: Optional[str] = None

@dataclass
class BulkCacheOpsResponse:
requestId: str
operationResponses: list[CacheOperationResponse]

@classmethod
def from_dict(cls, data: dict):
ops = [CacheOperationResponse(**op) for op in data.get('operationResponses', [])]
return cls(
requestId=data.get('requestId'),
operationResponses=ops
)
34 changes: 34 additions & 0 deletions libraries/python/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,40 @@ def test_delete_cache(client):
response = client.delete_cache("test-cache")
assert response.cacheId == "test-cache"

@responses.activate
def test_bulk_ops(client):
from aeron_cache.models import BulkCacheOpsRequest, CacheOperationRequest, BulkOperationType

responses.add(
responses.POST,
"http://localhost:7070/api/v1/cache/bulkops",
json={
"requestId": "req-1",
"operationResponses": [
{"requestId": "op-1", "status": "SUCCESS", "cacheId": "test-cache", "key": "k1"}
]
},
status=200
)

request = BulkCacheOpsRequest(
requestId="req-1",
operations=[
CacheOperationRequest(
operationType=BulkOperationType.ADD_ITEM,
requestId="op-1",
cacheId="test-cache",
key="k1",
value="v1"
)
]
)

response = client.bulk_ops(request)
assert response.requestId == "req-1"
assert len(response.operationResponses) == 1
assert response.operationResponses[0].status == "SUCCESS"

@responses.activate
def test_http_error_throws_exception(client):
responses.add(
Expand Down
Loading
Loading