From d18280711d6b7bcffe0ff85b86a7978a4f5c05e0 Mon Sep 17 00:00:00 2001 From: "jiand.yuan001" Date: Fri, 27 Mar 2026 15:50:53 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E4=BB=BB=E5=8A=A1=E8=A2=AB=E5=93=AA=E4=B8=AA?= =?UTF-8?q?ak=E6=8B=BF=E8=B5=B0=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/sql/update.20260327.sql | 2 + .../ke/bella/batch/tables/QueueMetadata.java | 13 +- .../batch/tables/pojos/QueueMetadataDB.java | 19 +++ .../tables/records/QueueMetadataRecord.java | 130 +++++++++++------- .../BellaQueueAuthorizationInterceptor.java | 2 +- .../ke/bella/batch/service/QueueService.java | 11 ++ 6 files changed, 126 insertions(+), 51 deletions(-) create mode 100644 api/sql/update.20260327.sql diff --git a/api/sql/update.20260327.sql b/api/sql/update.20260327.sql new file mode 100644 index 0000000..747ac95 --- /dev/null +++ b/api/sql/update.20260327.sql @@ -0,0 +1,2 @@ +ALTER TABLE `queue_metadata` + ADD COLUMN `enable_take_log` TINYINT NOT NULL DEFAULT 0 AFTER `endpoint` COMMENT '是否开启打印take日志'; diff --git a/api/src/codegen/java/com/ke/bella/batch/tables/QueueMetadata.java b/api/src/codegen/java/com/ke/bella/batch/tables/QueueMetadata.java index 68c11a1..e89268b 100644 --- a/api/src/codegen/java/com/ke/bella/batch/tables/QueueMetadata.java +++ b/api/src/codegen/java/com/ke/bella/batch/tables/QueueMetadata.java @@ -17,7 +17,7 @@ import org.jooq.Identity; import org.jooq.Name; import org.jooq.Record; -import org.jooq.Row9; +import org.jooq.Row10; import org.jooq.Schema; import org.jooq.Table; import org.jooq.TableField; @@ -64,6 +64,11 @@ public Class getRecordType() { */ public final TableField ENDPOINT = createField(DSL.name("endpoint"), SQLDataType.VARCHAR(256).nullable(false).defaultValue(DSL.inline("", SQLDataType.VARCHAR)), this, ""); + /** + * The column queue_metadata.enable_take_log. 是否开启打印take日志 + */ + public final TableField ENABLE_TAKE_LOG = createField(DSL.name("enable_take_log"), SQLDataType.INTEGER.nullable(false).defaultValue(DSL.inline("0", SQLDataType.INTEGER)), this, "是否开启打印take日志"); + /** * The column queue_metadata.cuid. */ @@ -174,11 +179,11 @@ public QueueMetadata rename(Name name) { } // ------------------------------------------------------------------------- - // Row9 type methods + // Row10 type methods // ------------------------------------------------------------------------- @Override - public Row9 fieldsRow() { - return (Row9) super.fieldsRow(); + public Row10 fieldsRow() { + return (Row10) super.fieldsRow(); } } diff --git a/api/src/codegen/java/com/ke/bella/batch/tables/pojos/QueueMetadataDB.java b/api/src/codegen/java/com/ke/bella/batch/tables/pojos/QueueMetadataDB.java index 0d8ebef..42fb6b3 100644 --- a/api/src/codegen/java/com/ke/bella/batch/tables/pojos/QueueMetadataDB.java +++ b/api/src/codegen/java/com/ke/bella/batch/tables/pojos/QueueMetadataDB.java @@ -21,6 +21,7 @@ public class QueueMetadataDB implements Operator, Serializable { private Long id; private String queue; private String endpoint; + private Integer enableTakeLog; private Long cuid; private Long muid; private String cuName; @@ -34,6 +35,7 @@ public QueueMetadataDB(QueueMetadataDB value) { this.id = value.id; this.queue = value.queue; this.endpoint = value.endpoint; + this.enableTakeLog = value.enableTakeLog; this.cuid = value.cuid; this.muid = value.muid; this.cuName = value.cuName; @@ -46,6 +48,7 @@ public QueueMetadataDB( Long id, String queue, String endpoint, + Integer enableTakeLog, Long cuid, Long muid, String cuName, @@ -56,6 +59,7 @@ public QueueMetadataDB( this.id = id; this.queue = queue; this.endpoint = endpoint; + this.enableTakeLog = enableTakeLog; this.cuid = cuid; this.muid = muid; this.cuName = cuName; @@ -106,6 +110,20 @@ public void setEndpoint(String endpoint) { this.endpoint = endpoint; } + /** + * Getter for queue_metadata.enable_take_log. 是否开启打印take日志 + */ + public Integer getEnableTakeLog() { + return this.enableTakeLog; + } + + /** + * Setter for queue_metadata.enable_take_log. 是否开启打印take日志 + */ + public void setEnableTakeLog(Integer enableTakeLog) { + this.enableTakeLog = enableTakeLog; + } + /** * Getter for queue_metadata.cuid. */ @@ -197,6 +215,7 @@ public String toString() { sb.append(id); sb.append(", ").append(queue); sb.append(", ").append(endpoint); + sb.append(", ").append(enableTakeLog); sb.append(", ").append(cuid); sb.append(", ").append(muid); sb.append(", ").append(cuName); diff --git a/api/src/codegen/java/com/ke/bella/batch/tables/records/QueueMetadataRecord.java b/api/src/codegen/java/com/ke/bella/batch/tables/records/QueueMetadataRecord.java index d754678..3419487 100644 --- a/api/src/codegen/java/com/ke/bella/batch/tables/records/QueueMetadataRecord.java +++ b/api/src/codegen/java/com/ke/bella/batch/tables/records/QueueMetadataRecord.java @@ -12,8 +12,8 @@ import org.jooq.Field; import org.jooq.Record1; -import org.jooq.Record9; -import org.jooq.Row9; +import org.jooq.Record10; +import org.jooq.Row10; import org.jooq.impl.UpdatableRecordImpl; @@ -21,7 +21,7 @@ * This class is generated by jOOQ. */ @SuppressWarnings({ "all", "unchecked", "rawtypes" }) -public class QueueMetadataRecord extends UpdatableRecordImpl implements Operator, Record9 { +public class QueueMetadataRecord extends UpdatableRecordImpl implements Operator, Record10 { private static final long serialVersionUID = 1L; @@ -67,88 +67,102 @@ public String getEndpoint() { return (String) get(2); } + /** + * Setter for queue_metadata.enable_take_log. 是否开启打印take日志 + */ + public void setEnableTakeLog(Integer value) { + set(3, value); + } + + /** + * Getter for queue_metadata.enable_take_log. 是否开启打印take日志 + */ + public Integer getEnableTakeLog() { + return (Integer) get(3); + } + /** * Setter for queue_metadata.cuid. */ public void setCuid(Long value) { - set(3, value); + set(4, value); } /** * Getter for queue_metadata.cuid. */ public Long getCuid() { - return (Long) get(3); + return (Long) get(4); } /** * Setter for queue_metadata.muid. */ public void setMuid(Long value) { - set(4, value); + set(5, value); } /** * Getter for queue_metadata.muid. */ public Long getMuid() { - return (Long) get(4); + return (Long) get(5); } /** * Setter for queue_metadata.cu_name. */ public void setCuName(String value) { - set(5, value); + set(6, value); } /** * Getter for queue_metadata.cu_name. */ public String getCuName() { - return (String) get(5); + return (String) get(6); } /** * Setter for queue_metadata.mu_name. */ public void setMuName(String value) { - set(6, value); + set(7, value); } /** * Getter for queue_metadata.mu_name. */ public String getMuName() { - return (String) get(6); + return (String) get(7); } /** * Setter for queue_metadata.ctime. */ public void setCtime(LocalDateTime value) { - set(7, value); + set(8, value); } /** * Getter for queue_metadata.ctime. */ public LocalDateTime getCtime() { - return (LocalDateTime) get(7); + return (LocalDateTime) get(8); } /** * Setter for queue_metadata.mtime. */ public void setMtime(LocalDateTime value) { - set(8, value); + set(9, value); } /** * Getter for queue_metadata.mtime. */ public LocalDateTime getMtime() { - return (LocalDateTime) get(8); + return (LocalDateTime) get(9); } // ------------------------------------------------------------------------- @@ -161,17 +175,17 @@ public Record1 key() { } // ------------------------------------------------------------------------- - // Record9 type implementation + // Record10 type implementation // ------------------------------------------------------------------------- @Override - public Row9 fieldsRow() { - return (Row9) super.fieldsRow(); + public Row10 fieldsRow() { + return (Row10) super.fieldsRow(); } @Override - public Row9 valuesRow() { - return (Row9) super.valuesRow(); + public Row10 valuesRow() { + return (Row10) super.valuesRow(); } @Override @@ -190,32 +204,37 @@ public Field field3() { } @Override - public Field field4() { - return QueueMetadata.QUEUE_METADATA.CUID; + public Field field4() { + return QueueMetadata.QUEUE_METADATA.ENABLE_TAKE_LOG; } @Override public Field field5() { + return QueueMetadata.QUEUE_METADATA.CUID; + } + + @Override + public Field field6() { return QueueMetadata.QUEUE_METADATA.MUID; } @Override - public Field field6() { + public Field field7() { return QueueMetadata.QUEUE_METADATA.CU_NAME; } @Override - public Field field7() { + public Field field8() { return QueueMetadata.QUEUE_METADATA.MU_NAME; } @Override - public Field field8() { + public Field field9() { return QueueMetadata.QUEUE_METADATA.CTIME; } @Override - public Field field9() { + public Field field10() { return QueueMetadata.QUEUE_METADATA.MTIME; } @@ -235,32 +254,37 @@ public String component3() { } @Override - public Long component4() { - return getCuid(); + public Integer component4() { + return getEnableTakeLog(); } @Override public Long component5() { + return getCuid(); + } + + @Override + public Long component6() { return getMuid(); } @Override - public String component6() { + public String component7() { return getCuName(); } @Override - public String component7() { + public String component8() { return getMuName(); } @Override - public LocalDateTime component8() { + public LocalDateTime component9() { return getCtime(); } @Override - public LocalDateTime component9() { + public LocalDateTime component10() { return getMtime(); } @@ -280,32 +304,37 @@ public String value3() { } @Override - public Long value4() { - return getCuid(); + public Integer value4() { + return getEnableTakeLog(); } @Override public Long value5() { + return getCuid(); + } + + @Override + public Long value6() { return getMuid(); } @Override - public String value6() { + public String value7() { return getCuName(); } @Override - public String value7() { + public String value8() { return getMuName(); } @Override - public LocalDateTime value8() { + public LocalDateTime value9() { return getCtime(); } @Override - public LocalDateTime value9() { + public LocalDateTime value10() { return getMtime(); } @@ -328,43 +357,49 @@ public QueueMetadataRecord value3(String value) { } @Override - public QueueMetadataRecord value4(Long value) { - setCuid(value); + public QueueMetadataRecord value4(Integer value) { + setEnableTakeLog(value); return this; } @Override public QueueMetadataRecord value5(Long value) { + setCuid(value); + return this; + } + + @Override + public QueueMetadataRecord value6(Long value) { setMuid(value); return this; } @Override - public QueueMetadataRecord value6(String value) { + public QueueMetadataRecord value7(String value) { setCuName(value); return this; } @Override - public QueueMetadataRecord value7(String value) { + public QueueMetadataRecord value8(String value) { setMuName(value); return this; } @Override - public QueueMetadataRecord value8(LocalDateTime value) { + public QueueMetadataRecord value9(LocalDateTime value) { setCtime(value); return this; } @Override - public QueueMetadataRecord value9(LocalDateTime value) { + public QueueMetadataRecord value10(LocalDateTime value) { setMtime(value); return this; } @Override - public QueueMetadataRecord values(Long value1, String value2, String value3, Long value4, Long value5, String value6, String value7, LocalDateTime value8, LocalDateTime value9) { + public QueueMetadataRecord values(Long value1, String value2, String value3, Integer value4, Long value5, Long value6, String value7, String value8, LocalDateTime value9, LocalDateTime value10) { value1(value1); value2(value2); value3(value3); @@ -374,6 +409,7 @@ public QueueMetadataRecord values(Long value1, String value2, String value3, Lon value7(value7); value8(value8); value9(value9); + value10(value10); return this; } @@ -391,12 +427,13 @@ public QueueMetadataRecord() { /** * Create a detached, initialised QueueMetadataRecord */ - public QueueMetadataRecord(Long id, String queue, String endpoint, Long cuid, Long muid, String cuName, String muName, LocalDateTime ctime, LocalDateTime mtime) { + public QueueMetadataRecord(Long id, String queue, String endpoint, Integer enableTakeLog, Long cuid, Long muid, String cuName, String muName, LocalDateTime ctime, LocalDateTime mtime) { super(QueueMetadata.QUEUE_METADATA); setId(id); setQueue(queue); setEndpoint(endpoint); + setEnableTakeLog(enableTakeLog); setCuid(cuid); setMuid(muid); setCuName(cuName); @@ -415,6 +452,7 @@ public QueueMetadataRecord(QueueMetadataDB value) { setId(value.getId()); setQueue(value.getQueue()); setEndpoint(value.getEndpoint()); + setEnableTakeLog(value.getEnableTakeLog()); setCuid(value.getCuid()); setMuid(value.getMuid()); setCuName(value.getCuName()); diff --git a/api/src/main/java/com/ke/bella/batch/api/interceptor/BellaQueueAuthorizationInterceptor.java b/api/src/main/java/com/ke/bella/batch/api/interceptor/BellaQueueAuthorizationInterceptor.java index c035ce8..1f4d1da 100644 --- a/api/src/main/java/com/ke/bella/batch/api/interceptor/BellaQueueAuthorizationInterceptor.java +++ b/api/src/main/java/com/ke/bella/batch/api/interceptor/BellaQueueAuthorizationInterceptor.java @@ -36,7 +36,7 @@ public boolean preHandle(@NotNull HttpServletRequest request, @NotNull HttpServl return false; } - BellaContext.setApikey(ApikeyInfo.builder().apikey(ak).build()); + BellaContext.setApikey(ApikeyInfo.builder().apikey(ak).code(apikeyInfo.getCode()).build()); BellaContext.setOperator(Operator.builder() .userId(apikeyInfo.getUserId()) .userName(apikeyInfo.getOwnerName()) diff --git a/api/src/main/java/com/ke/bella/batch/service/QueueService.java b/api/src/main/java/com/ke/bella/batch/service/QueueService.java index 905f6ec..9102c76 100644 --- a/api/src/main/java/com/ke/bella/batch/service/QueueService.java +++ b/api/src/main/java/com/ke/bella/batch/service/QueueService.java @@ -236,6 +236,17 @@ public Map> take(Take take) { tasksByQueue.forEach((queue, tasks) -> meterRegistry.counter("queue.task.take.total", "queue", queue).increment(tasks.size())); + + for (Map.Entry> entry : tasksByQueue.entrySet()) { + QueueMetadataDB meta = queueRepo.findMetadataByName(FullQueueName.valueOf(entry.getKey()).getQueueName()); + if(!Integer.valueOf(1).equals(meta.getEnableTakeLog())) { + continue; + } + List taskIds = entry.getValue().stream().map(Task::getTaskId).collect(Collectors.toList()); + log.info("Tasks taken, queue={}, akCode={}, taskIds={}, takedSize:{}", entry.getKey() + , BellaContext.getApikey().getCode(), taskIds, taskIds.size()); + } + return tasksByQueue; }