Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public class DataxParameters extends AbstractParameters {
*/
private int xmx;

/**
* writer batch size for DataX
*/
private int batchSize;

private List<ResourceInfo> resourceList;

@Override
Expand Down Expand Up @@ -138,6 +143,7 @@ public String toString() {
", jobChannel=" + jobChannel +
", xms=" + xms +
", xmx=" + xmx +
", batchSize=" + batchSize +
", resourceList=" + JSONUtils.toJsonString(resourceList) +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ private List<ObjectNode> buildDataxJobContentJson() {
}
}

if (dataXParameters.getBatchSize() > 0) {
writerParam.put("batchSize", dataXParameters.getBatchSize());
}

ObjectNode writer = JSONUtils.createObjectNode();
writer.put("name", DataxUtils.getWriterPluginName(dataxTaskExecutionContext.getTargetType()));
writer.set("parameter", writerParam);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,26 @@ public void testToString() {
+ "jobChannel=1, "
+ "xms=0, "
+ "xmx=-100, "
+ "batchSize=0, "
+ "resourceList=[{\"id\":null,\"resourceName\":\"/hdfs.keytab\",\"res\":null}]"
+ "}";

Assertions.assertEquals(expected, dataxParameters.toString());
}

@Test
public void testBatchSize() {
DataxParameters dataxParameters = new DataxParameters();
dataxParameters.setBatchSize(0);
Assertions.assertEquals(0, dataxParameters.getBatchSize());

dataxParameters.setBatchSize(2048);
Assertions.assertEquals(2048, dataxParameters.getBatchSize());

dataxParameters.setBatchSize(65536);
Assertions.assertEquals(65536, dataxParameters.getBatchSize());
}

public String loadJvmEnvTest(DataxParameters dataXParameters) {
int xms = dataXParameters.getXms() < 1 ? 1 : dataXParameters.getXms();
int xmx = dataXParameters.getXmx() < 1 ? 1 : dataXParameters.getXmx();
Expand Down
1 change: 1 addition & 0 deletions dolphinscheduler-ui/src/locales/en_US/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@ export default {
datax_target_database_pre_sql: 'Pre SQL Statement',
datax_target_database_post_sql: 'Post SQL Statement',
datax_non_query_sql_tips: 'Please enter the non-query sql statement',
datax_writer_batch_size: 'Writer Batch Size',
datax_job_speed_byte: 'Speed(Byte count)',
datax_job_speed_byte_info: '(0 means unlimited)',
datax_job_speed_record: 'Speed(Record count)',
Expand Down
1 change: 1 addition & 0 deletions dolphinscheduler-ui/src/locales/zh_CN/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ export default {
datax_target_database_pre_sql: '目标库前置SQL',
datax_target_database_post_sql: '目标库后置SQL',
datax_non_query_sql_tips: '请输入非查询SQL语句',
datax_writer_batch_size: 'Writer 批量大小',
datax_job_speed_byte: '限流(字节数)',
datax_job_speed_byte_info: '(KB,0代表不限制)',
datax_job_speed_record: '限流(记录数)',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,45 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] {
}
]

const batchSizeOptions = [
{
label: `0(${t('project.node.unlimited')})`,
value: 0
},
{
label: '1024',
value: 1024
},
{
label: '2048',
value: 2048
},
{
label: '4096',
value: 4096
},
{
label: '8192',
value: 8192
},
{
label: '16384',
value: 16384
},
{
label: '32768',
value: 32768
},
{
label: '65536',
value: 65536
},
{
label: '131072',
value: 131072
}
]

const sqlEditorSpan = ref(24)
const jsonEditorSpan = ref(0)
const datasourceSpan = ref(12)
Expand Down Expand Up @@ -254,6 +293,14 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] {
autosize: { minRows: 1 }
}
},
{
type: 'select',
field: 'batchSize',
name: t('project.node.datax_writer_batch_size'),
span: otherStatementSpan,
options: batchSizeOptions,
value: 2048
},
{
type: 'select',
field: 'jobSpeedByte',
Expand Down
Loading