{@code
+ * ICustomUploader custom = new AzureBlobRestCustomUploader(containerUrl);
+ * IUploader uploader = CustomUploaderHelper.asUploader(custom);
+ *
+ * // Use Java-friendly methods directly on IUploader:
+ * CompletableFuture result = uploader.uploadAsyncJava(localSource);
+ * CompletableFuture batchResult = uploader.uploadManyAsyncJava(localSources);
+ *
+ * // Or pass to a client builder:
+ * QueuedIngestClient client = QueuedIngestClientBuilder.create(dmUrl)
+ * .withAuthentication(credential)
+ * .withUploader(uploader, true)
+ * .build();
+ * client.ingestAsyncJava(database, table, fileSource, properties).join();
+ * }
+ */
+public class AzureBlobRestCustomUploader implements ICustomUploader {
+
+ private final String containerUrlWithSas; // Container URL with SAS token from DM config
+ private final ExecutorService executor;
+ private boolean ignoreSizeLimit = false;
+
+ /**
+ * Creates a custom uploader from a container URL that includes a SAS token.
+ *
+ * The container URL is obtained from the DM cluster's configuration API:
+ * - ConfigurationResponse.containerSettings.containers[0].path
+ *
+ * @param containerUrlWithSas Full container URL including SAS token
+ * Example: https://account.blob.core.windows.net/container?sv=...&sig=...
+ */
+ public AzureBlobRestCustomUploader(String containerUrlWithSas) {
+ this.containerUrlWithSas = containerUrlWithSas;
+ this.executor = Executors.newFixedThreadPool(4);
+ }
+
+ @Override
+ public boolean getIgnoreSizeLimit() {
+ return ignoreSizeLimit;
+ }
+
+ @Override
+ public void setIgnoreSizeLimit(boolean value) {
+ this.ignoreSizeLimit = value;
+ }
+
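+ // Hypothetical helper, not part of the original sample: the upload implementation
+ // could delegate to an Azure Blob "Put Blob" REST call along these lines. Assumes
+ // Java 11+ java.net.http (imports for HttpClient, HttpRequest, HttpResponse, URI,
+ // IOException); the helper name and signature are illustrative only.
+ private void putBlockBlob(String blobName, byte[] data) throws IOException, InterruptedException {
+ // Insert the blob name between the container path and the SAS query string.
+ String[] parts = containerUrlWithSas.split("\\?", 2);
+ String blobUrl = parts[0] + "/" + blobName + (parts.length > 1 ? "?" + parts[1] : "");
+ HttpRequest request = HttpRequest.newBuilder(URI.create(blobUrl))
+ .header("x-ms-blob-type", "BlockBlob") // required by the Put Blob REST API
+ .PUT(HttpRequest.BodyPublishers.ofByteArray(data))
+ .build();
+ HttpResponse<Void> response = HttpClient.newHttpClient()
+ .send(request, HttpResponse.BodyHandlers.discarding());
+ if (response.statusCode() != 201) { // Put Blob returns 201 Created on success
+ throw new IOException("Put Blob failed with HTTP " + response.statusCode());
+ }
+ }
+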
+ @Override
+ CompletableFuture
+
+/**
+ * This sample shows simple, blocking examples that are easy to understand.
+ * For production use, consider using the async API with CompletableFuture to avoid
+ * blocking threads - see {@link #advancedAsyncIngestionExample()} for an example.
+ *
+ * Queued ingestion is asynchronous on the server side and provides reliable,
+ * high-throughput data ingestion with operation tracking capabilities.
+ */
+public class QueuedIngestV2 {
+
+ private static String database;
+ private static String table;
+ private static String mappingName;
+ private static QueuedIngestClient queuedIngestClient;
+
+ public static void main(String[] args) {
+ try {
+ // Get configuration from system properties
+ String engineEndpoint = System.getProperty("clusterPath"); // "https://
+
+ /**
+ * These examples use blocking .get() calls for simplicity. In production,
+ * consider using the async API with CompletableFuture composition instead.
+ */
+ static void ingestFromStream() throws Exception {
+ System.out.println("\n=== Queued Ingestion from Streams ===");
+
+ String resourcesDirectory = System.getProperty("user.dir") + "/samples/src/main/resources/";
+
+ // Example 1: Ingest from in-memory CSV string
+ String csvData =
+ "0,00000000-0000-0000-0001-020304050607,0,0,0,0,0,0,0,0,0,0,2014-01-01T01:01:01.0000000Z,Zero,\"Zero\",0,00:00:00,,null";
+ InputStream csvInputStream =
+ new ByteArrayInputStream(csvData.getBytes(StandardCharsets.UTF_8));
+
+ try {
+ StreamSource csvStreamSource = new StreamSource(csvInputStream, Format.csv);
+
+ IngestRequestProperties csvProperties = IngestRequestPropertiesBuilder.create()
+ .withEnableTracking(true)
+ .build();
+
+ System.out.println("Ingesting CSV data from string...");
+ // Note: ingestAsync returns a CompletableFuture. We call .get() to block and wait for completion.
+ // In production, you might compose futures instead of blocking - see advancedAsyncIngestionExample().
+ ExtendedIngestResponse response = queuedIngestClient
+ .ingestAsyncJava(database, table, csvStreamSource, csvProperties)
+ .get(2, TimeUnit.MINUTES);
+
+ System.out.println("CSV ingestion queued successfully. Operation ID: "
+ + response.getIngestResponse().getIngestionOperationId());
+ trackIngestionOperation(response, "CSV Stream");
+ } finally {
+ closeQuietly(csvInputStream);
+ }
+
+ // Example 2: Ingest from compressed CSV file
+ InputStream compressedCsvStream = new ByteArrayInputStream(
+ readResourceBytes(resourcesDirectory, "dataset.csv.gz"));
+
+ try {
+ StreamSource compressedStreamSource = new StreamSource(
+ compressedCsvStream,
+ Format.csv,
+ CompressionType.GZIP,
+ UUID.randomUUID(),
+ false);
+
+ IngestRequestProperties csvProperties = IngestRequestPropertiesBuilder.create()
+ .withEnableTracking(true)
+ .build();
+
+ System.out.println("Ingesting compressed CSV file...");
+ ExtendedIngestResponse response = queuedIngestClient
+ .ingestAsyncJava(database, table, compressedStreamSource, csvProperties)
+ .get(2, TimeUnit.MINUTES);
+
+ System.out.println("Compressed CSV ingestion queued successfully. Operation ID: "
+ + response.getIngestResponse().getIngestionOperationId());
+ trackIngestionOperation(response, "Compressed CSV Stream");
+ } finally {
+ closeQuietly(compressedCsvStream);
+ }
+
+ // Example 3: Ingest JSON with mapping
+ InputStream jsonStream = new ByteArrayInputStream(
+ readResourceBytes(resourcesDirectory, "dataset.json"));
+
+ try {
+ StreamSource jsonStreamSource = new StreamSource(jsonStream, Format.json);
+
+ IngestionMapping mapping = new IngestionMapping(
+ mappingName, IngestionMapping.IngestionMappingType.JSON);
+ IngestRequestProperties jsonProperties = IngestRequestPropertiesBuilder.create()
+ .withIngestionMapping(mapping)
+ .withEnableTracking(true)
+ .build();
+
+ System.out.println("Ingesting JSON file with mapping...");
+ ExtendedIngestResponse response = queuedIngestClient
+ .ingestAsyncJava(database, table, jsonStreamSource, jsonProperties)
+ .get(2, TimeUnit.MINUTES);
+
+ System.out.println("JSON ingestion queued successfully. Operation ID: "
+ + response.getIngestResponse().getIngestionOperationId());
+ trackIngestionOperation(response, "JSON Stream");
+ } finally {
+ closeQuietly(jsonStream);
+ }
+ }
+
+ /**
+ * Demonstrates ingestion from file sources including:
+ * - CSV file
+ * - Compressed JSON file with mapping
+ *
+ * These examples use blocking .get() calls for simplicity.
+ */
+ static void ingestFromFile() throws Exception {
+ System.out.println("\n=== Queued Ingestion from Files ===");
+
+ String resourcesDirectory = System.getProperty("user.dir") + "/samples/src/main/resources/";
+
+ // Example 1: Ingest CSV file
+ FileSource csvFileSource = new FileSource(
+ Paths.get(resourcesDirectory + "dataset.csv"), Format.csv);
+
+ IngestRequestProperties csvProperties = IngestRequestPropertiesBuilder.create()
+ .withEnableTracking(true)
+ .build();
+
+ System.out.println("Ingesting CSV file...");
+ ExtendedIngestResponse csvResponse = queuedIngestClient
+ .ingestAsyncJava(database, table, csvFileSource, csvProperties)
+ .get(2, TimeUnit.MINUTES);
+
+ System.out.println("CSV file ingestion queued successfully. Operation ID: "
+ + csvResponse.getIngestResponse().getIngestionOperationId());
+ trackIngestionOperation(csvResponse, "CSV File");
+
+ // Example 2: Ingest compressed JSON file with mapping
+ FileSource jsonFileSource = new FileSource(
+ Paths.get(resourcesDirectory + "dataset.jsonz.gz"),
+ Format.json,
+ UUID.randomUUID(),
+ CompressionType.GZIP);
+
+ IngestionMapping mapping = new IngestionMapping(
+ mappingName, IngestionMapping.IngestionMappingType.JSON);
+ IngestRequestProperties jsonProperties = IngestRequestPropertiesBuilder.create()
+ .withIngestionMapping(mapping)
+ .withEnableTracking(true)
+ .build();
+
+ System.out.println("Ingesting compressed JSON file with mapping...");
+ ExtendedIngestResponse jsonResponse = queuedIngestClient
+ .ingestAsyncJava(database, table, jsonFileSource, jsonProperties)
+ .get(2, TimeUnit.MINUTES);
+
+ System.out.println("Compressed JSON file ingestion queued successfully. Operation ID: "
+ + jsonResponse.getIngestResponse().getIngestionOperationId());
+ trackIngestionOperation(jsonResponse, "Compressed JSON File");
+ }
+
+ /**
+ * Demonstrates batch ingestion from multiple blob sources in a single operation.
+ *
+ * IMPORTANT: Multi-source ingestion only accepts BlobSource. Local sources
+ * (FileSource, StreamSource) must either be uploaded to blob storage first
+ * (see ingestMultipleLocalFilesViaBlobUpload) or ingested one at a time.
+ *
+ * This example uses public blob URLs from the Kusto sample files.
+ */
+ static void ingestMultipleSources() throws Exception {
+ System.out.println("\n=== Queued Ingestion from Multiple Blob Sources (Batch) ===");
+
+ // IMPORTANT: All sources in a batch must have the same format!
+ BlobSource blob1 = new BlobSource(
+ "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json",
+ Format.json,
+ UUID.randomUUID(),
+ CompressionType.NONE);
+
+ BlobSource blob2 = new BlobSource(
+ "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json",
+ Format.json,
+ UUID.randomUUID(),
+ CompressionType.NONE);
+
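+ // Hypothetical sketch of the batch submission: the exact multi-source method on
+ // QueuedIngestClient is an assumption here, named by analogy with ingestAsyncJava;
+ // verify against the actual client API before using.
+ // List<BlobSource> blobSources = Arrays.asList(blob1, blob2);
+ // IngestRequestProperties batchProperties = IngestRequestPropertiesBuilder.create()
+ //     .withEnableTracking(true)
+ //     .build();
+ // ExtendedIngestResponse batchResponse = queuedIngestClient
+ //     .ingestManyAsyncJava(database, table, blobSources, batchProperties)
+ //     .get(2, TimeUnit.MINUTES);
+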
+ /**
+ * This pattern is recommended for production use when you need to:
+ *
+ * The key difference from the simple examples is that we compose futures
+ * instead of calling .get() immediately, allowing the operations to run concurrently.
+ */
+ static void advancedAsyncIngestionExample() throws Exception {
+ System.out.println("\n=== Advanced Async Ingestion Example ===");
+
+ String resourcesDirectory = System.getProperty("user.dir") + "/samples/src/main/resources/";
+
+ // Create multiple sources to ingest concurrently
+ FileSource csvFile = new FileSource(
+ Paths.get(resourcesDirectory + "dataset.csv"), Format.csv);
+ FileSource jsonFile = new FileSource(
+ Paths.get(resourcesDirectory + "dataset.jsonz.gz"),
+ Format.json,
+ UUID.randomUUID(),
+ CompressionType.GZIP);
+
+ IngestRequestProperties csvProperties = IngestRequestPropertiesBuilder.create()
+ .withEnableTracking(true)
+ .build();
+
+ IngestionMapping mapping = new IngestionMapping(
+ mappingName, IngestionMapping.IngestionMappingType.JSON);
+ IngestRequestProperties jsonProperties = IngestRequestPropertiesBuilder.create()
+ .withIngestionMapping(mapping)
+ .withEnableTracking(true)
+ .build();
+
+ // Start both ingestions concurrently - don't call .get() yet!
+ System.out.println("Starting concurrent ingestion of CSV and JSON files...");
+
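+ // A minimal sketch of the composition pattern described above (illustrative; it uses
+ // only APIs already shown in this sample).
+ CompletableFuture<ExtendedIngestResponse> csvFuture =
+ queuedIngestClient.ingestAsyncJava(database, table, csvFile, csvProperties);
+ CompletableFuture<ExtendedIngestResponse> jsonFuture =
+ queuedIngestClient.ingestAsyncJava(database, table, jsonFile, jsonProperties);
+
+ // Both operations run concurrently; wait for both to be queued, then report each ID.
+ CompletableFuture.allOf(csvFuture, jsonFuture).join();
+ System.out.println("CSV operation ID: "
+ + csvFuture.join().getIngestResponse().getIngestionOperationId());
+ System.out.println("JSON operation ID: "
+ + jsonFuture.join().getIngestResponse().getIngestionOperationId());
+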
+ /**
+ * NOTE: This example shows both source creation with defaults and source creation with full control.
+ * StreamSource defaults: compression=NONE, sourceId=auto-generated, baseName=null, leaveOpen=false
+ */
+ static void ingestFromStream() throws Exception {
+ System.out.println("\n=== Ingesting from Streams ===");
+
+ // Example 1: Ingest from in-memory CSV string
+ String csvData = "0,00000000-0000-0000-0001-020304050607,0,0,0,0,0,0,0,0,0,0,2014-01-01T01:01:01.0000000Z,Zero,\"Zero\",0,00:00:00,,null";
+ InputStream csvInputStream = new ByteArrayInputStream(csvData.getBytes(StandardCharsets.UTF_8));
+
+ StreamSource csvStreamSource = new StreamSource(
+ csvInputStream,
+ Format.csv,
+ CompressionType.NONE,
+ UUID.randomUUID(),
+ false);
+
+ IngestRequestProperties csvProperties = IngestRequestPropertiesBuilder.create()
+ .withEnableTracking(true)
+ .build();
+
+ System.out.println("Ingesting CSV data from string...");
+ ExtendedIngestResponse ingestResponse = streamingIngestClient.ingestAsyncJava(database, table, csvStreamSource, csvProperties).get();
+ System.out.println(
+ "CSV ingestion completed. Operation ID: "
+ + ingestResponse.getIngestResponse().getIngestionOperationId());
+
+ // Example 2: Ingest from compressed CSV file
+ String resourcesDirectory = System.getProperty("user.dir") + "/samples/src/main/resources/";
+ FileInputStream compressedCsvStream = new FileInputStream(resourcesDirectory + "dataset.csv.gz");
+
+ StreamSource compressedStreamSource = new StreamSource(
+ compressedCsvStream,
+ Format.csv,
+ CompressionType.GZIP,
+ UUID.randomUUID(),
+ false);
+ System.out.println("Ingesting compressed CSV file...");
+ ExtendedIngestResponse compressedResponse = streamingIngestClient.ingestAsyncJava(database, table, compressedStreamSource, csvProperties).get();
+ System.out.println(
+ "Compressed CSV ingestion completed. Operation ID: "
+ + compressedResponse.getIngestResponse().getIngestionOperationId());
+ compressedCsvStream.close();
+
+ // Example 3: Ingest JSON with mapping
+ // Demonstrating minimal parameters for quick prototyping
+ FileInputStream jsonStream = new FileInputStream(resourcesDirectory + "dataset.json");
+
+ StreamSource jsonStreamSource = new StreamSource(
+ jsonStream,
+ Format.json);
+
+ IngestionMapping mapping = new IngestionMapping(mappingName, IngestionMapping.IngestionMappingType.JSON);
+ IngestRequestProperties jsonProperties = IngestRequestPropertiesBuilder.create()
+ .withIngestionMapping(mapping)
+ .withEnableTracking(true)
+ .build();
+
+ System.out.println("Ingesting JSON file with mapping...");
+ ExtendedIngestResponse jsonResponse = streamingIngestClient.ingestAsyncJava(database, table, jsonStreamSource, jsonProperties).get();
+ System.out.println(
+ "JSON ingestion completed. Operation ID: "
+ + jsonResponse.getIngestResponse().getIngestionOperationId());
+ jsonStream.close();
+ }
+
+ /**
+ * Demonstrates ingestion from file sources including:
+ * - CSV file
+ * - Compressed JSON file with mapping
+ *
+ * NOTE: This example shows both source creation with defaults and source creation with full control.
+ * FileSource defaults: sourceId=auto-generated, compression=auto-detected from extension, baseName=from-filename
+ */
+ static void ingestFromFile() throws Exception {
+ System.out.println("\n=== Ingesting from Files ===");
+
+ String resourcesDirectory = System.getProperty("user.dir") + "/samples/src/main/resources/";
+
+ // Example 1: Ingest CSV file using defaults
+ // Only providing required parameters: path and format
+ // Defaults: sourceId=auto-generated, compression=auto-detected (NONE for .csv), baseName="dataset.csv"
+ FileSource csvFileSource =
+ new FileSource(Paths.get(resourcesDirectory + "dataset.csv"), Format.csv);
+
+ IngestRequestProperties csvProperties = IngestRequestPropertiesBuilder.create()
+ .withEnableTracking(true)
+ .build();
+
+ System.out.println("Ingesting CSV file...");
+ ExtendedIngestResponse csvResponse = streamingIngestClient.ingestAsyncJava(database, table, csvFileSource, csvProperties).get();
+ System.out.println(
+ "CSV file ingestion completed. Operation ID: "
+ + csvResponse.getIngestResponse().getIngestionOperationId());
+
+ // Example 2: Ingest compressed JSON file with mapping
+ FileSource jsonFileSource = new FileSource(
+ Paths.get(resourcesDirectory + "dataset.jsonz.gz"),
+ Format.json,
+ UUID.randomUUID(),
+ CompressionType.GZIP);
+
+ IngestRequestProperties jsonProperties = IngestRequestPropertiesBuilder.create()
+ .withIngestionMapping(new IngestionMapping(mappingName, IngestionMapping.IngestionMappingType.JSON))
+ .withEnableTracking(true)
+ .build();
+
+ System.out.println("Ingesting compressed JSON file with mapping...");
+ ExtendedIngestResponse jsonResponse = streamingIngestClient.ingestAsyncJava(database, table, jsonFileSource, jsonProperties).get();
+ System.out.println(
+ "Compressed JSON file ingestion completed. Operation ID: "
+ + jsonResponse.getIngestResponse().getIngestionOperationId());
+ }
+}
diff --git a/samples/src/main/resources/create-table.kql b/samples/src/main/resources/create-table.kql
new file mode 100644
index 000000000..891f650fb
--- /dev/null
+++ b/samples/src/main/resources/create-table.kql
@@ -0,0 +1,88 @@
+// KQL Table Creation Script for the sample dataset
+// This script creates a table matching the schema of dataset.csv
+
+.create table SampleData (
+ rownumber: int,
+ rowguid: guid,
+ xdouble: real,
+ xfloat: real,
+ xbool: bool,
+ xint16: int,
+ xint32: int,
+ xint64: long,
+ xuint8: int,
+ xuint16: int,
+ xuint32: long,
+ xuint64: long,
+ xdate: datetime,
+ xsmalltext: string,
+ xtext: string,
+ xnumberAsText: string,
+ xtime: timespan,
+ xtextWithNulls: string,
+ xdynamicWithNulls: dynamic
+)
+
+// Enable the streaming ingestion policy (required for the streaming ingestion samples; not needed for queued ingestion)
+.alter table SampleData policy streamingingestion enable
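+// Verify the policy (optional):
+// .show table SampleData policy streamingingestion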
+
+// Create an ingestion mapping for CSV
+.create table SampleData ingestion csv mapping 'SampleDataMapping'
+```
+[
+ {"column": "rownumber", "Properties": {"Ordinal": "0"}},
+ {"column": "rowguid", "Properties": {"Ordinal": "1"}},
+ {"column": "xdouble", "Properties": {"Ordinal": "2"}},
+ {"column": "xfloat", "Properties": {"Ordinal": "3"}},
+ {"column": "xbool", "Properties": {"Ordinal": "4"}},
+ {"column": "xint16", "Properties": {"Ordinal": "5"}},
+ {"column": "xint32", "Properties": {"Ordinal": "6"}},
+ {"column": "xint64", "Properties": {"Ordinal": "7"}},
+ {"column": "xuint8", "Properties": {"Ordinal": "8"}},
+ {"column": "xuint16", "Properties": {"Ordinal": "9"}},
+ {"column": "xuint32", "Properties": {"Ordinal": "10"}},
+ {"column": "xuint64", "Properties": {"Ordinal": "11"}},
+ {"column": "xdate", "Properties": {"Ordinal": "12"}},
+ {"column": "xsmalltext", "Properties": {"Ordinal": "13"}},
+ {"column": "xtext", "Properties": {"Ordinal": "14"}},
+ {"column": "xnumberAsText", "Properties": {"Ordinal": "15"}},
+ {"column": "xtime", "Properties": {"Ordinal": "16"}},
+ {"column": "xtextWithNulls", "Properties": {"Ordinal": "17"}},
+ {"column": "xdynamicWithNulls", "Properties": {"Ordinal": "18"}}
+]
+```
+
+// Create an ingestion mapping for JSON (identity mapping - JSON property names match column names)
+.create table SampleData ingestion json mapping 'SampleDataMapping'
+```
+[
+ {"column": "rownumber", "Properties": {"Path": "$.rownumber"}},
+ {"column": "rowguid", "Properties": {"Path": "$.rowguid"}},
+ {"column": "xdouble", "Properties": {"Path": "$.xdouble"}},
+ {"column": "xfloat", "Properties": {"Path": "$.xfloat"}},
+ {"column": "xbool", "Properties": {"Path": "$.xbool"}},
+ {"column": "xint16", "Properties": {"Path": "$.xint16"}},
+ {"column": "xint32", "Properties": {"Path": "$.xint32"}},
+ {"column": "xint64", "Properties": {"Path": "$.xint64"}},
+ {"column": "xuint8", "Properties": {"Path": "$.xuint8"}},
+ {"column": "xuint16", "Properties": {"Path": "$.xuint16"}},
+ {"column": "xuint32", "Properties": {"Path": "$.xuint32"}},
+ {"column": "xuint64", "Properties": {"Path": "$.xuint64"}},
+ {"column": "xdate", "Properties": {"Path": "$.xdate"}},
+ {"column": "xsmalltext", "Properties": {"Path": "$.xsmalltext"}},
+ {"column": "xtext", "Properties": {"Path": "$.xtext"}},
+ {"column": "xnumberAsText", "Properties": {"Path": "$.xnumberAsText"}},
+ {"column": "xtime", "Properties": {"Path": "$.xtime"}},
+ {"column": "xtextWithNulls", "Properties": {"Path": "$.xtextWithNulls"}},
+ {"column": "xdynamicWithNulls", "Properties": {"Path": "$.xdynamicWithNulls"}}
+]
+```
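+// Note: the mapping names above must match the mapping name the Java samples are
+// configured with (the mappingName value read at startup).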
+
+// Alternative: Create a table with retention policy (example: 90 days)
+// .alter-merge table SampleData policy retention softdelete = 90d
+
+// Query examples after data ingestion:
+// SampleData | take 10
+// SampleData | where xbool == true | project rownumber, xtext, xdate
+// SampleData | extend ParsedJson = parse_json(xdynamicWithNulls) | project rownumber, ParsedJson.rowId, ParsedJson.arr
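+// Ingestion monitoring examples:
+// SampleData | count
+// .show ingestion failures | top 10 by FailedOn desc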
+
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ */
+ static void ingestMultipleLocalFilesViaBlobUpload(
+ String engineEndpoint, ChainedTokenCredential credential) throws Exception {
+ System.out.println("\n=== Queued Ingestion: Upload Local Files to Blob, Then Ingest ===");
+
+ // Step 1: Create configuration cache (needed for ManagedUploader)
+ String dmUrl = IngestClientBase.getIngestionEndpoint(engineEndpoint);
+ if (dmUrl == null) {
+ dmUrl = engineEndpoint;
+ }
+
+ ConfigurationCache configCache = DefaultConfigurationCache.create(
+ dmUrl,
+ credential,
+ new ClientDetails("QueuedIngestV2Sample", "1.0", "ingest-v2-sample"));
+
+ // Step 2: Create ManagedUploader
+ ManagedUploader uploader = ManagedUploader.builder()
+ .withConfigurationCache(configCache)
+ .withRetryPolicy(new SimpleRetryPolicy())
+ .withMaxConcurrency(10)
+ .withMaxDataSize(4L * 1024 * 1024 * 1024) // 4GB max size
+ .withUploadMethod(UploadMethod.STORAGE)
+ .withTokenCredential(credential)
+ .build();
+
+ try {
+ System.out.println("ManagedUploader created for batch upload");
+
+ // Step 3: Prepare local files (all same format - CSV)
+ String resourcesDirectory = System.getProperty("user.dir") + "/src/main/resources/";
+ FileSource file1 = new FileSource(Paths.get(resourcesDirectory + "dataset.csv"), Format.csv);
+ FileSource file2 = new FileSource(Paths.get(resourcesDirectory + "dataset.csv.gz"), Format.csv);
+ List