From 86ba02eec86d8386e4eaeb5c9db2de97397173d2 Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Fri, 29 Aug 2025 13:34:14 +0530 Subject: [PATCH 1/3] SK-2258 synchronized errors --- .../vault/controller/VaultController.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/v3/src/main/java/com/skyflow/vault/controller/VaultController.java b/v3/src/main/java/com/skyflow/vault/controller/VaultController.java index 08d38aba..733be6e4 100644 --- a/v3/src/main/java/com/skyflow/vault/controller/VaultController.java +++ b/v3/src/main/java/com/skyflow/vault/controller/VaultController.java @@ -77,13 +77,14 @@ public CompletableFuture bulkInsertAsync( setBearerToken(); configureInsertConcurrencyAndBatchSize(insertRequest.getValues().size()); com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest request = super.getBulkInsertRequestBody(insertRequest, super.getVaultConfig()); + ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit); - List> futures = this.insertBatchFutures(request); + List errorRecords = new ArrayList<>(); + List> futures = this.insertBatchFutures(request, errorRecords, executor); return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .thenApply(v -> { List successRecords = new ArrayList<>(); - List errorRecords = new ArrayList<>(); for (CompletableFuture future : futures) { com.skyflow.vault.data.InsertResponse futureResponse = future.join(); @@ -96,6 +97,7 @@ public CompletableFuture bulkInsertAsync( } } } + executor.shutdown(); // Shutdown the executor after all tasks are completed return new com.skyflow.vault.data.InsertResponse(successRecords, errorRecords, insertRequest.getValues()); }); @@ -113,9 +115,9 @@ private com.skyflow.vault.data.InsertResponse processSync( LogUtil.printInfoLog(InfoLogs.PROCESSING_BATCHES.getLog()); List errorRecords = new ArrayList<>(); List successRecords = new ArrayList<>(); + ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit); - List> futures = this.insertBatchFutures(insertRequest); - + List> futures = this.insertBatchFutures(insertRequest, errorRecords, executor); CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); allFutures.join(); @@ -129,6 +131,7 @@ private com.skyflow.vault.data.InsertResponse processSync( errorRecords.addAll(futureResponse.getErrors()); } } + executor.shutdown(); // Shutdown the executor after all tasks are completed } com.skyflow.vault.data.InsertResponse response = new com.skyflow.vault.data.InsertResponse(successRecords, errorRecords, originalPayload); LogUtil.printInfoLog(InfoLogs.INSERT_REQUEST_RESOLVED.getLog()); @@ -137,11 +140,9 @@ private com.skyflow.vault.data.InsertResponse processSync( private List> insertBatchFutures( - com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest insertRequest - ) { + com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest insertRequest, List errorRecords, ExecutorService executor) { List records = insertRequest.getRecords().get(); - ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit); List> batches = Utils.createBatches(records, insertBatchSize); List> futures = new ArrayList<>(); @@ -152,7 +153,12 @@ private List> insertBat CompletableFuture future = CompletableFuture .supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().get()), executor) .thenApply(response -> formatResponse(response, batchNumber, insertBatchSize)) - .exceptionally(ex -> new com.skyflow.vault.data.InsertResponse(null, handleBatchException(ex, batch, batchNumber, batches))); + .exceptionally(ex -> { + synchronized (errorRecords){ + errorRecords.addAll(handleBatchException(ex, batch, batchNumber, batches)); + } + return null; + }); futures.add(future); } } finally { From a675ae20f86f896ed7b635a5dfc1c93de638c961 Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Fri, 29 Aug 2025 14:05:19 +0530 Subject: [PATCH 2/3] SK-2258 fixed the index --- v3/src/main/java/com/skyflow/utils/Utils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v3/src/main/java/com/skyflow/utils/Utils.java b/v3/src/main/java/com/skyflow/utils/Utils.java index e44395db..76edb9cd 100644 --- a/v3/src/main/java/com/skyflow/utils/Utils.java +++ b/v3/src/main/java/com/skyflow/utils/Utils.java @@ -125,7 +125,7 @@ public static com.skyflow.vault.data.InsertResponse formatResponse(InsertRespons tokensMap.put(key, tokenList); } } - Success success = new Success(index, record.get(index).getSkyflowId().get(), tokensMap, null); + Success success = new Success(indexNumber, record.get(index).getSkyflowId().get(), tokensMap, null); successRecords.add(success); } indexNumber++; From e5d77336abc32e7172b526def01a9e3499a92ecc Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Fri, 29 Aug 2025 14:12:09 +0530 Subject: [PATCH 3/3] SK-2258 fixed the index --- .../vault/controller/VaultController.java | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/v3/src/main/java/com/skyflow/vault/controller/VaultController.java b/v3/src/main/java/com/skyflow/vault/controller/VaultController.java index 733be6e4..40093fb0 100644 --- a/v3/src/main/java/com/skyflow/vault/controller/VaultController.java +++ b/v3/src/main/java/com/skyflow/vault/controller/VaultController.java @@ -77,14 +77,13 @@ public CompletableFuture bulkInsertAsync( setBearerToken(); configureInsertConcurrencyAndBatchSize(insertRequest.getValues().size()); com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest request = super.getBulkInsertRequestBody(insertRequest, super.getVaultConfig()); - ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit); - List errorRecords = new ArrayList<>(); - List> futures = this.insertBatchFutures(request, errorRecords, executor); + List> futures = this.insertBatchFutures(request); return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .thenApply(v -> { List successRecords = new ArrayList<>(); + List errorRecords = new ArrayList<>(); for (CompletableFuture future : futures) { com.skyflow.vault.data.InsertResponse futureResponse = future.join(); @@ -97,7 +96,6 @@ public CompletableFuture bulkInsertAsync( } } } - executor.shutdown(); // Shutdown the executor after all tasks are completed return new com.skyflow.vault.data.InsertResponse(successRecords, errorRecords, insertRequest.getValues()); }); @@ -115,9 +113,9 @@ private com.skyflow.vault.data.InsertResponse processSync( LogUtil.printInfoLog(InfoLogs.PROCESSING_BATCHES.getLog()); List errorRecords = new ArrayList<>(); List successRecords = new ArrayList<>(); - ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit); - List> futures = this.insertBatchFutures(insertRequest, errorRecords, executor); + List> futures = this.insertBatchFutures(insertRequest); + CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); allFutures.join(); @@ -131,7 +129,6 @@ private com.skyflow.vault.data.InsertResponse processSync( errorRecords.addAll(futureResponse.getErrors()); } } - executor.shutdown(); // Shutdown the executor after all tasks are completed } com.skyflow.vault.data.InsertResponse response = new com.skyflow.vault.data.InsertResponse(successRecords, errorRecords, originalPayload); LogUtil.printInfoLog(InfoLogs.INSERT_REQUEST_RESOLVED.getLog()); @@ -140,9 +137,11 @@ private com.skyflow.vault.data.InsertResponse processSync( private List> insertBatchFutures( - com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest insertRequest, List errorRecords, ExecutorService executor) { + com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest insertRequest + ) { List records = insertRequest.getRecords().get(); + ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit); List> batches = Utils.createBatches(records, insertBatchSize); List> futures = new ArrayList<>(); @@ -153,12 +152,7 @@ private List> insertBat CompletableFuture future = CompletableFuture .supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().get()), executor) .thenApply(response -> formatResponse(response, batchNumber, insertBatchSize)) - .exceptionally(ex -> { - synchronized (errorRecords){ - errorRecords.addAll(handleBatchException(ex, batch, batchNumber, batches)); - } - return null; - }); + .exceptionally(ex -> new com.skyflow.vault.data.InsertResponse(null, handleBatchException(ex, batch, batchNumber, batches))); futures.add(future); } } finally { @@ -225,4 +219,4 @@ private void configureInsertConcurrencyAndBatchSize(int totalRequests) { this.insertConcurrencyLimit = Math.min(Constants.INSERT_CONCURRENCY_LIMIT, maxConcurrencyNeeded); } } -} +} \ No newline at end of file