From f6c79b2f17dc2d3c335cd6c1a110a7c3aecf27b8 Mon Sep 17 00:00:00 2001 From: niebayes Date: Sun, 29 Mar 2026 03:07:39 +0000 Subject: [PATCH 1/2] update --- zh_CN/sql-reference/statements/alter.md | 1 + zh_CN/sql-reference/statements/create.md | 48 +++++++++++++++++++----- zh_CN/sql-reference/statements/drop.md | 3 +- zh_CN/sql-reference/statements/show.md | 2 +- zh_CN/streaming/connectors.md | 4 +- zh_CN/streaming/format.md | 38 ++++++++++++++++--- zh_CN/streaming/http.md | 21 ++++++----- zh_CN/streaming/kafka.md | 8 ++-- zh_CN/streaming/model.md | 11 ++++-- zh_CN/streaming/mqtt.md | 8 ++-- zh_CN/streaming/overview.md | 13 +++++-- zh_CN/streaming/quick-start.md | 11 ++++-- zh_CN/streaming/use_cases.md | 28 +++++++------- 13 files changed, 135 insertions(+), 61 deletions(-) diff --git a/zh_CN/sql-reference/statements/alter.md b/zh_CN/sql-reference/statements/alter.md index 104a461..1cfb20e 100644 --- a/zh_CN/sql-reference/statements/alter.md +++ b/zh_CN/sql-reference/statements/alter.md @@ -80,3 +80,4 @@ ALTER PIPELINE pipeline_name RESTART; - 目前不支持 `ALTER SOURCE`。 - 目前不支持修改 pipeline 的 SQL、source 或 sink 定义;如需调整这类内容,请重新创建对象。 +- 如需删除一个正在运行的 pipeline,请先执行 `ALTER PIPELINE ... STOP`。 diff --git a/zh_CN/sql-reference/statements/create.md b/zh_CN/sql-reference/statements/create.md index 9771312..f095127 100644 --- a/zh_CN/sql-reference/statements/create.md +++ b/zh_CN/sql-reference/statements/create.md @@ -74,23 +74,37 @@ with (ttl='10d') ```SQL CREATE SOURCE source_name ( - column_name data_type [NOT NULL], - ... + source_field, + ..., + [WATERMARK FOR event_time_column [AS expr]] ) WITH ( connector='kafka|mqtt|http', - format='json|csv', + format='json|csv|parquet', key='value', ... ) ``` +其中 `source_field` 支持三种写法: + +```SQL +column_name data_type +column_name data_type METADATA FROM 'metadata_key' +column_name data_type AS expr +``` + +- 第一种是普通的 physical field,直接从消息 payload 中解码得到。 +- 第二种是 metadata field,从 connector 读取的消息元信息中提取,常见的 metadata 包括 Kafka 的 topic、partition、offset,HTTP 的 header 等。`METADATA FROM` 的 key 取决于具体 connector,且 metadata 列类型必须与该 key 的类型完全一致。 +- 第三种是 computed field,基于 physical field 和 metadata field 定义的计算。Computed field 只能基于 physical field 和 metadata field 定义,不能基于其他 computed field 定义。 + 示例 ```SQL CREATE SOURCE src_kafka ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, - value FLOAT64 + ts TIMESTAMP(9), + source_topic STRING METADATA FROM 'topic', + value_label STRING AS source_topic, + WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( connector='kafka', brokers='127.0.0.1:9092', @@ -102,14 +116,29 @@ CREATE SOURCE src_kafka ( 说明 -- `WITH (...)` 为必填,且不能为空。 - source 至少需要定义一个列。 +- source field 不支持列选项,例如 `NOT NULL`、`DEFAULT` 等。 +- `WITH (...)` 为必填,且不能为空。 - `CREATE SOURCE` 暂不支持 `IF NOT EXISTS`。 - connector 配置项取决于具体 connector,详见 [Connectors 概述](../../streaming/connectors.md)。 +#### Watermark + +Watermark 用于声明 source 的事件时间语义。它由 `CREATE SOURCE` 中的 `WATERMARK FOR ...` 子句定义,系统会基于该表达式在运行时持续生成 watermark signal,并传递给下游算子。 + +最简单的写法是直接把某个时间列声明为事件时间列:`WATERMARK FOR ts`。也可以显式给出 watermark 表达式,例如 `WATERMARK FOR ts AS ts - INTERVAL '5' SECOND`,表示 watermark 信号比事件时间落后 5 秒。 + +当前版本的约束如下: + +- 一个 source 至多定义一个 watermark +- `WATERMARK FOR ` 中的事件时间列必须来自 physical field 或 metadata field,不能是 computed field +- Watermark 表达式也只能基于 physical field 或 metadata field 构建 +- Watermark 表达式的结果类型必须是 `TIMESTAMP` + ### 创建 Pipeline -`CREATE PIPELINE` 用于创建持续运行的流任务。pipeline 从一个 source 读取数据,执行实时计算,然后把结果写入一个已有的 sink table。 +`CREATE PIPELINE` 用于创建持续运行的流任务。pipeline 从一个 source +读取数据,执行实时计算,然后把结果写入一个已有的 sink table。 语法 @@ -138,7 +167,8 @@ WHERE value >= 2.0; - 当前一个 pipeline 只能引用一个 source。 - 当前仅支持投影与过滤,不支持 join、聚合、窗口、排序、limit、union、子查询等更复杂算子。 - sink table 必须已经存在,且必须是 TimeSeries 表。 -- 查询输出列必须与 sink table 的列名和类型兼容;sink table 中无默认值的非空列必须出现在查询输出中。 +- sink table 中无默认值的非空列必须出现在查询输出中。 +- 查询输出列必须与 sink table 的列名和类型兼容;当类型可转换时,系统会自动补充必要的 cast。如果不兼容,则会报错。 - 执行 `CREATE PIPELINE` 时,当前用户需要对 source 具备 `SELECT` 权限,并对 sink table 具备 `INSERT` 权限。 ### 建表时声明索引(INVERTED / VECTOR) diff --git a/zh_CN/sql-reference/statements/drop.md b/zh_CN/sql-reference/statements/drop.md index 28732bc..42380e4 100644 --- a/zh_CN/sql-reference/statements/drop.md +++ b/zh_CN/sql-reference/statements/drop.md @@ -66,7 +66,8 @@ DROP PIPELINE [IF EXISTS] [db.]pipeline_name 说明 -- 删除 pipeline 时,系统会尝试先停止正在运行的任务,然后再删除元数据。 +- 只有处于 `Stopped` 或 `Failed` 状态的 pipeline 才允许删除。 +- 如果 pipeline 仍在运行,请先执行 `ALTER PIPELINE ... STOP`,再执行 `DROP PIPELINE`。 - `IF EXISTS` 可用于忽略不存在对象的报错。 ### DROP NODE diff --git a/zh_CN/sql-reference/statements/show.md b/zh_CN/sql-reference/statements/show.md index 965cbe8..46e693b 100644 --- a/zh_CN/sql-reference/statements/show.md +++ b/zh_CN/sql-reference/statements/show.md @@ -47,7 +47,7 @@ SHOW SOURCES SHOW PIPELINES ``` -返回结果包含 pipeline 名称、pipeline ID、source、sink、分配节点、运行状态、最后错误和创建时间等信息。 +返回结果包含 pipeline 名称、pipeline ID、source、sink、运行状态、运行时长、创建时间等信息;在集群模式下还会展示分配节点等集群相关列。 说明: diff --git a/zh_CN/streaming/connectors.md b/zh_CN/streaming/connectors.md index fc41464..20fb02d 100644 --- a/zh_CN/streaming/connectors.md +++ b/zh_CN/streaming/connectors.md @@ -29,6 +29,7 @@ Kafka connector 适合持续消费 topic 中的结构化事件流。 - 文档入口:[Kafka Connector](./kafka.md) - format:`json`、`csv` +- metadata:`topic`、`partition`、`offset` ### MQTT @@ -36,10 +37,11 @@ MQTT connector 适合订阅设备、网关或边缘服务上报的主题消息 - 文档入口:[MQTT Connector](./mqtt.md) - format:`json`、`csv` +- metadata:`topic` ### HTTP HTTP connector 适合按固定周期轮询第三方 API 或内部 HTTP 服务。 - 文档入口:[HTTP Connector](./http.md) -- format:`json`、`csv` +- format:`json`、`csv`、`parquet` diff --git a/zh_CN/streaming/format.md b/zh_CN/streaming/format.md index d962da1..37b6a2c 100644 --- a/zh_CN/streaming/format.md +++ b/zh_CN/streaming/format.md @@ -13,6 +13,7 @@ Format 用于将 connector 读取的消息解析为 source 的列结构。 | --- | --- | --- | | JSON | Kafka、MQTT、HTTP | 适合结构化事件消息 | | CSV | Kafka、MQTT、HTTP | 适合简单表格型文本或按行输入 | +| Parquet | HTTP | 适合一次性抓取或轮询返回的完整 Parquet 文件 | ## 通用规则 @@ -26,7 +27,8 @@ Format 用于将 connector 读取的消息解析为 source 的列结构。 说明: - `bad_data` 仅对 source 生效 -- 当前按逐行方式解码消息,适合 newline-delimited JSON 和按行 CSV +- `json` 和 `csv` 默认按逐行方式解码,适合 newline-delimited JSON 和按行 CSV +- `parquet` 按整个 payload 解码为一份完整 Parquet 文件 ## JSON @@ -53,8 +55,8 @@ Format 用于将 connector 读取的消息解析为 source 的列结构。 ```sql CREATE SOURCE src_json ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64 ) WITH ( connector='kafka', @@ -92,8 +94,8 @@ CREATE SOURCE src_json ( ```sql CREATE SOURCE src_csv ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64 ) WITH ( connector='http', @@ -106,12 +108,38 @@ CREATE SOURCE src_csv ( ); ``` +## Parquet + +### Parquet 特点 + +- 适合 HTTP connector 返回的完整 Parquet 文件 +- 更适合批量导入型 source,而不是逐行文本流 +- 压缩信息由 Parquet 文件自身 metadata 描述,无需额外配置 + +### Parquet 配置示例 + +```sql +CREATE SOURCE src_parquet ( + ts TIMESTAMP(9), + sid STRING, + value FLOAT64 +) WITH ( + connector='http', + endpoint='http://127.0.0.1:18080/export.parquet', + method='GET', + poll='once', + format='parquet', + bad_data='fail' +); +``` + ## 选型建议 | 格式 | 推荐场景 | 不足 | | --- | --- | --- | | JSON | 结构化事件、MQTT / Kafka 消息 | 文本体积通常更大 | | CSV | 简单行式数据、HTTP 接口文本返回 | 字段可读性和扩展性较弱 | +| Parquet | HTTP 返回的列式文件、批量抓取 | 当前仅支持 HTTP connector | ## 常见问题 diff --git a/zh_CN/streaming/http.md b/zh_CN/streaming/http.md index b79dd5d..074d541 100644 --- a/zh_CN/streaming/http.md +++ b/zh_CN/streaming/http.md @@ -21,7 +21,7 @@ HTTP connector 通过单次或持续轮询 HTTP endpoint,将返回内容作为 | `endpoint` | STRING | 无 | Yes | 轮询地址,支持 `${...}` 时间变量 | | `method` | STRING | `get` | No | HTTP 方法,支持 `GET` 和 `POST` | | `poll` | STRING | `once` | No | 轮询模式,支持 `once` 或 `interval()` | -| `timeout` | INT | 无 | No | 请求超时时间,单位毫秒 | +| `timeout` | INT | 无 | No | 请求超时时间,单位秒 | | `headers` | STRING | 无 | No | 请求头,格式为 `k1:v1;k2:v2` | | `auth_type` | STRING | `none` | No | 鉴权类型,支持 `none` 和 `basic_auth` | | `username` | STRING | 无 | No | Basic Auth 用户名 | @@ -32,7 +32,7 @@ HTTP connector 通过单次或持续轮询 HTTP endpoint,将返回内容作为 - `auth_type='none'`:表示不使用鉴权。此时不能输入 `username`、`password`,否则会报错。 - `auth_type='basic_auth'`:表示使用 Basic Auth。此时必须同时输入 `username` 和 `password`。 -Format 相关配置请参考 [Formats](./format.md)。 +Format 相关配置请参考 [Formats](./format.md)。其中 `parquet` 目前仅支持 `http` connector。 ## endpoint 支持的时间变量 @@ -147,16 +147,16 @@ CREATE DATABASE stream_demo_http; USE stream_demo_http; CREATE TABLE sink_http_once ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64, TIMESTAMP KEY(ts) ) ENGINE=TimeSeries PARTITION BY HASH(sid) PARTITIONS 1; CREATE SOURCE src_http_once ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64 ) WITH ( connector='http', @@ -183,16 +183,16 @@ SELECT ts, sid, value FROM sink_http_once ORDER BY ts; ```sql CREATE TABLE sink_http_poll ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64, TIMESTAMP KEY(ts) ) ENGINE=TimeSeries PARTITION BY HASH(sid) PARTITIONS 1; CREATE SOURCE src_http_poll ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64 ) WITH ( connector='http', @@ -220,3 +220,4 @@ SELECT ts, sid, value FROM sink_http_poll ORDER BY ts; - 当前 HTTP connector 仅支持作为 source,不支持作为 sink - `poll='once'` 适合一次性抓取,`poll='interval(...)'` 适合持续轮询 +- `format='parquet'` 目前仅支持 `http` connector,且按整个 payload 解码一份完整 Parquet 文件 diff --git a/zh_CN/streaming/kafka.md b/zh_CN/streaming/kafka.md index 4a4a111..a0c9d80 100644 --- a/zh_CN/streaming/kafka.md +++ b/zh_CN/streaming/kafka.md @@ -71,16 +71,16 @@ CREATE DATABASE stream_demo; USE stream_demo; CREATE TABLE sink_t ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64, TIMESTAMP KEY(ts) ) ENGINE=TimeSeries PARTITION BY HASH(sid) PARTITIONS 1; CREATE SOURCE src_kafka ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64 ) WITH ( connector='kafka', diff --git a/zh_CN/streaming/model.md b/zh_CN/streaming/model.md index b7fccba..b57926f 100644 --- a/zh_CN/streaming/model.md +++ b/zh_CN/streaming/model.md @@ -35,9 +35,11 @@ external system -> source -> pipeline -> sink table ```sql CREATE SOURCE src_mqtt ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, - value FLOAT64 + ts TIMESTAMP(9), + source_topic STRING METADATA FROM 'topic', + topic_tag STRING AS source_topic, + value FLOAT64, + WATERMARK FOR ts ) WITH ( connector='mqtt', broker='127.0.0.1:1883', @@ -84,7 +86,7 @@ sink 不是独立对象,而是 Datalayers 中已存在的一张内部表。当 - sink table 必须事先创建 - sink table 必须使用 `TimeSeries` 引擎 -- pipeline 输出列名和类型必须与 sink table 严格兼容 +- pipeline 输出列名和类型必须与 sink table 兼容;当类型可转换时,系统会自动补充 cast - sink table 中非空且没有默认值的列,必须出现在 pipeline 输出里 这意味着在设计 sink table 时,应先确定 pipeline 输出 schema,再创建表结构。 @@ -104,5 +106,6 @@ sink 不是独立对象,而是 Datalayers 中已存在的一张内部表。当 ```sql ALTER PIPELINE p1 STOP; ALTER PIPELINE p1 RESTART; +ALTER PIPELINE p1 STOP; DROP PIPELINE p1; ``` diff --git a/zh_CN/streaming/mqtt.md b/zh_CN/streaming/mqtt.md index d50aed2..db10984 100644 --- a/zh_CN/streaming/mqtt.md +++ b/zh_CN/streaming/mqtt.md @@ -43,16 +43,16 @@ CREATE DATABASE stream_demo_mqtt; USE stream_demo_mqtt; CREATE TABLE sink_t ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64, TIMESTAMP KEY(ts) ) ENGINE=TimeSeries PARTITION BY HASH(sid) PARTITIONS 1; CREATE SOURCE src_mqtt ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64 ) WITH ( connector='mqtt', diff --git a/zh_CN/streaming/overview.md b/zh_CN/streaming/overview.md index 6574428..08c741d 100644 --- a/zh_CN/streaming/overview.md +++ b/zh_CN/streaming/overview.md @@ -47,9 +47,10 @@ Kafka / MQTT / HTTP ```sql CREATE SOURCE src_kafka ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, - value FLOAT64 + ts TIMESTAMP(9), + source_topic STRING METADATA FROM 'topic', + topic_tag STRING AS source_topic, + value FLOAT64, ) WITH ( connector='kafka', brokers='127.0.0.1:9092', @@ -73,7 +74,8 @@ WHERE value >= 2.0; ### Sink Table -`SINK` 不是独立对象,而是一张已存在的内部表。当前必须使用 `TimeSeries` 引擎,且查询输出 schema 必须与 sink table 严格兼容。 +`SINK` 不是独立对象,而是一张已存在的内部表。当前必须使用 +`TimeSeries` 引擎,且查询输出 schema 必须与 sink table 兼容;当类型可转换时,系统会自动补充 cast。 ## Connector 与 Format @@ -108,6 +110,9 @@ ALTER PIPELINE p_kafka STOP; -- 重启一个 pipeline ALTER PIPELINE p_kafka RESTART; +-- 删除前先停止 pipeline +ALTER PIPELINE p_kafka STOP; + -- 删除指定 pipeline DROP PIPELINE p_kafka; diff --git a/zh_CN/streaming/quick-start.md b/zh_CN/streaming/quick-start.md index 9e88c0a..ece94d7 100644 --- a/zh_CN/streaming/quick-start.md +++ b/zh_CN/streaming/quick-start.md @@ -65,8 +65,8 @@ CREATE DATABASE stream_demo; USE stream_demo; CREATE TABLE sink_t ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64, TIMESTAMP KEY(ts) ) ENGINE=TimeSeries @@ -81,8 +81,8 @@ PARTITION BY HASH(sid) PARTITIONS 1; ```sql CREATE SOURCE src_kafka ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64 ) WITH ( connector='kafka', @@ -165,6 +165,9 @@ ALTER PIPELINE p_kafka RESTART; ### Step 8:清理资源 ```sql +-- 删除前先停止 pipeline +ALTER PIPELINE p_kafka STOP; + -- 删除 pipeline DROP PIPELINE p_kafka; diff --git a/zh_CN/streaming/use_cases.md b/zh_CN/streaming/use_cases.md index 1c104fb..bde21d5 100644 --- a/zh_CN/streaming/use_cases.md +++ b/zh_CN/streaming/use_cases.md @@ -17,8 +17,8 @@ description: "通过 Kafka、MQTT 和 HTTP 示例说明流计算的典型使用 ```sql CREATE TABLE sink_device_alerts ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64, TIMESTAMP KEY(ts) ) ENGINE=TimeSeries @@ -29,8 +29,8 @@ PARTITION BY HASH(sid) PARTITIONS 1; ```sql CREATE SOURCE src_device_kafka ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64 ) WITH ( connector='kafka', @@ -68,8 +68,8 @@ WHERE value >= 80.0; ```sql CREATE SOURCE src_factory_mqtt ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64 ) WITH ( connector='mqtt', @@ -84,8 +84,8 @@ CREATE SOURCE src_factory_mqtt ( ```sql CREATE TABLE sink_factory_events ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64, TIMESTAMP KEY(ts) ) ENGINE=TimeSeries @@ -114,8 +114,8 @@ WHERE value >= 2.0; ```sql CREATE SOURCE src_http_once ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64 ) WITH ( connector='http', @@ -130,8 +130,8 @@ CREATE SOURCE src_http_once ( ```sql CREATE SOURCE src_http_poll ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64 ) WITH ( connector='http', @@ -146,8 +146,8 @@ CREATE SOURCE src_http_poll ( ```sql CREATE TABLE sink_http_poll ( - ts TIMESTAMP(9) NOT NULL, - sid STRING NOT NULL, + ts TIMESTAMP(9), + sid STRING, value FLOAT64, TIMESTAMP KEY(ts) ) ENGINE=TimeSeries From d03a626b1d9ebd7631d7b19725fb0d9234fdffd6 Mon Sep 17 00:00:00 2001 From: niebayes Date: Mon, 30 Mar 2026 09:37:12 +0000 Subject: [PATCH 2/2] second update --- zh_CN/sql-reference/statements/create.md | 14 +++++++++++--- zh_CN/streaming/model.md | 2 +- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/zh_CN/sql-reference/statements/create.md b/zh_CN/sql-reference/statements/create.md index f095127..6f5d9d5 100644 --- a/zh_CN/sql-reference/statements/create.md +++ b/zh_CN/sql-reference/statements/create.md @@ -88,7 +88,7 @@ CREATE SOURCE source_name ( 其中 `source_field` 支持三种写法: ```SQL -column_name data_type +column_name data_type [NULL | NOT NULL] [COMMENT 'text'] column_name data_type METADATA FROM 'metadata_key' column_name data_type AS expr ``` @@ -97,11 +97,19 @@ column_name data_type AS expr - 第二种是 metadata field,从 connector 读取的消息元信息中提取,常见的 metadata 包括 Kafka 的 topic、partition、offset,HTTP 的 header 等。`METADATA FROM` 的 key 取决于具体 connector,且 metadata 列类型必须与该 key 的类型完全一致。 - 第三种是 computed field,基于 physical field 和 metadata field 定义的计算。Computed field 只能基于 physical field 和 metadata field 定义,不能基于其他 computed field 定义。 +当前版本对 source field 的列选项约束如下: + +- 只有 physical field 允许指定列选项,且仅支持 `NULL`、`NOT NULL` 和 `COMMENT` +- 如果一个 physical field 没有显式指定 `NULL` 或 `NOT NULL`,则默认为 `NULL` +- 如果一个 physical field 被 watermark 声明为事件时间列,则该列会隐式被视为 `NOT NULL`。如果用户显式指定了 `NULL` 会报错 +- metadata field 和 computed field 不支持列选项 +- 同一个 metadata key 不能被多个 source 列重复引用 + 示例 ```SQL CREATE SOURCE src_kafka ( - ts TIMESTAMP(9), + ts TIMESTAMP(9) NOT NULL COMMENT 'event time', source_topic STRING METADATA FROM 'topic', value_label STRING AS source_topic, WATERMARK FOR ts AS ts - INTERVAL '5' SECOND @@ -117,7 +125,6 @@ CREATE SOURCE src_kafka ( 说明 - source 至少需要定义一个列。 -- source field 不支持列选项,例如 `NOT NULL`、`DEFAULT` 等。 - `WITH (...)` 为必填,且不能为空。 - `CREATE SOURCE` 暂不支持 `IF NOT EXISTS`。 - connector 配置项取决于具体 connector,详见 [Connectors 概述](../../streaming/connectors.md)。 @@ -132,6 +139,7 @@ Watermark 用于声明 source 的事件时间语义。它由 `CREATE SOURCE` 中 - 一个 source 至多定义一个 watermark - `WATERMARK FOR ` 中的事件时间列必须来自 physical field 或 metadata field,不能是 computed field +- `WATERMARK FOR ` 中的事件时间列会被视为 non-nullable;显式指定 `NULL` 会报错 - Watermark 表达式也只能基于 physical field 或 metadata field 构建 - Watermark 表达式的结果类型必须是 `TIMESTAMP` diff --git a/zh_CN/streaming/model.md b/zh_CN/streaming/model.md index b57926f..8c24b1d 100644 --- a/zh_CN/streaming/model.md +++ b/zh_CN/streaming/model.md @@ -35,7 +35,7 @@ external system -> source -> pipeline -> sink table ```sql CREATE SOURCE src_mqtt ( - ts TIMESTAMP(9), + ts TIMESTAMP(9) NOT NULL COMMENT 'event time', source_topic STRING METADATA FROM 'topic', topic_tag STRING AS source_topic, value FLOAT64,