From 493123950a41058b56db7a3fb542f285362485da Mon Sep 17 00:00:00 2001 From: leocook Date: Sat, 25 Apr 2026 22:38:56 +0800 Subject: [PATCH] [Improvement-16754][DataX] Support DataX writer parameter batchSize Add batchSize parameter support for DataX task to control writer batch size. Backend: - Add batchSize field to DataxParameters (lombok @Data generates accessors) - Generate batchSize JSON config in DataxTask only when value > 0 - Add unit tests for batchSize parameter Frontend: - Add batchSize select with a static option list (0/1024/2048/4096/8192/16384/32768/65536/131072) - Default to 2048 to match the DataX upstream default; ClickHouse / Databend users can pick 65536 / 131072 from the dropdown - Add zh_CN / en_US i18n entries Closes #16754 --- .../plugin/task/datax/DataxParameters.java | 6 +++ .../plugin/task/datax/DataxTask.java | 4 ++ .../task/datax/DataxParametersTest.java | 14 ++++++ .../src/locales/en_US/project.ts | 1 + .../src/locales/zh_CN/project.ts | 1 + .../task/components/node/fields/use-datax.ts | 47 +++++++++++++++++++ 6 files changed, 73 insertions(+) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java index 9d9722bc6f64..07c20c320392 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java @@ -101,6 +101,11 @@ public class DataxParameters extends AbstractParameters { */ private int xmx; + /** + * writer batch size for DataX; only a value greater than 0 is written into the writer parameter + */ + private int batchSize; + private List resourceList; @Override @@ -138,6 +143,7 @@ public String toString() { ", jobChannel=" + jobChannel + ", 
xms=" + xms + ", xmx=" + xmx + + ", batchSize=" + batchSize + ", resourceList=" + JSONUtils.toJsonString(resourceList) + '}'; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java index a8c39a3a5027..e82666d53006 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java @@ -282,6 +282,10 @@ private List buildDataxJobContentJson() { } } + if (dataXParameters.getBatchSize() > 0) { + writerParam.put("batchSize", dataXParameters.getBatchSize()); + } + ObjectNode writer = JSONUtils.createObjectNode(); writer.put("name", DataxUtils.getWriterPluginName(dataxTaskExecutionContext.getTargetType())); writer.set("parameter", writerParam); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParametersTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParametersTest.java index a20b5a169514..736d0aab941e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParametersTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParametersTest.java @@ -91,12 +91,26 @@ public void testToString() { + "jobChannel=1, " + "xms=0, " + "xmx=-100, " + + "batchSize=0, " + "resourceList=[{\"id\":null,\"resourceName\":\"/hdfs.keytab\",\"res\":null}]" + "}"; Assertions.assertEquals(expected, dataxParameters.toString()); } + @Test + 
public void testBatchSize() { + DataxParameters dataxParameters = new DataxParameters(); + dataxParameters.setBatchSize(0); + Assertions.assertEquals(0, dataxParameters.getBatchSize()); + + dataxParameters.setBatchSize(2048); + Assertions.assertEquals(2048, dataxParameters.getBatchSize()); + + dataxParameters.setBatchSize(65536); + Assertions.assertEquals(65536, dataxParameters.getBatchSize()); + } + public String loadJvmEnvTest(DataxParameters dataXParameters) { int xms = dataXParameters.getXms() < 1 ? 1 : dataXParameters.getXms(); int xmx = dataXParameters.getXmx() < 1 ? 1 : dataXParameters.getXmx(); diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index 1e8f23bba55f..f01ea2d534b9 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -648,6 +648,7 @@ export default { datax_target_database_pre_sql: 'Pre SQL Statement', datax_target_database_post_sql: 'Post SQL Statement', datax_non_query_sql_tips: 'Please enter the non-query sql statement', + datax_writer_batch_size: 'Writer Batch Size', datax_job_speed_byte: 'Speed(Byte count)', datax_job_speed_byte_info: '(0 means unlimited)', datax_job_speed_record: 'Speed(Record count)', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index 8876969f72d0..e5fa7cfc37f6 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -629,6 +629,7 @@ export default { datax_target_database_pre_sql: '目标库前置SQL', datax_target_database_post_sql: '目标库后置SQL', datax_non_query_sql_tips: '请输入非查询SQL语句', + datax_writer_batch_size: 'Writer 批量大小', datax_job_speed_byte: '限流(字节数)', datax_job_speed_byte_info: '(KB,0代表不限制)', datax_job_speed_record: '限流(记录数)', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts index cc38bca9b4b4..0e56939931cf 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts @@ -119,6 +119,45 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] { } ] + const batchSizeOptions = [ + { + label: `0(${t('project.node.unlimited')})`, + value: 0 + }, + { + label: '1024', + value: 1024 + }, + { + label: '2048', + value: 2048 + }, + { + label: '4096', + value: 4096 + }, + { + label: '8192', + value: 8192 + }, + { + label: '16384', + value: 16384 + }, + { + label: '32768', + value: 32768 + }, + { + label: '65536', + value: 65536 + }, + { + label: '131072', + value: 131072 + } + ] + const sqlEditorSpan = ref(24) const jsonEditorSpan = ref(0) const datasourceSpan = ref(12) @@ -254,6 +293,14 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] { autosize: { minRows: 1 } } }, + { + type: 'select', + field: 'batchSize', + name: t('project.node.datax_writer_batch_size'), + span: otherStatementSpan, + options: batchSizeOptions, + value: 2048 + }, { type: 'select', field: 'jobSpeedByte',