Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions zh_CN/sql-reference/statements/alter.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,4 @@ ALTER PIPELINE pipeline_name RESTART;

- 目前不支持 `ALTER SOURCE`
- 目前不支持修改 pipeline 的 SQL、source 或 sink 定义;如需调整这类内容,请重新创建对象。
- 如需删除一个正在运行的 pipeline,请先执行 `ALTER PIPELINE ... STOP`
56 changes: 47 additions & 9 deletions zh_CN/sql-reference/statements/create.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,23 +74,45 @@ with (ttl='10d')

```SQL
CREATE SOURCE source_name (
column_name data_type [NOT NULL],
...
source_field,
...,
[WATERMARK FOR event_time_column [AS expr]]
) WITH (
connector='kafka|mqtt|http',
format='json|csv',
format='json|csv|parquet',
key='value',
...
)
```

其中 `source_field` 支持三种写法:

```SQL
column_name data_type [NULL | NOT NULL] [COMMENT 'text']
column_name data_type METADATA FROM 'metadata_key'
column_name data_type AS expr
```

- 第一种是普通的 physical field,直接从消息 payload 中解码得到。
- 第二种是 metadata field,从 connector 读取的消息元信息中提取,常见的 metadata 包括 Kafka 的 topic、partition、offset,HTTP 的 header 等。`METADATA FROM` 的 key 取决于具体 connector,且 metadata 列类型必须与该 key 的类型完全一致。
- 第三种是 computed field,基于 physical field 和 metadata field 定义的计算。Computed field 只能基于 physical field 和 metadata field 定义,不能基于其他 computed field 定义。

当前版本对 source field 的列选项约束如下:

- 只有 physical field 允许指定列选项,且仅支持 `NULL`、`NOT NULL` 和 `COMMENT`
- 如果一个 physical field 没有显式指定 `NULL` 或 `NOT NULL`,则默认为 `NULL`
- 如果一个 physical field 被 watermark 声明为事件时间列,则该列会隐式被视为 `NOT NULL`。如果用户显式指定了 `NULL` 会报错
- metadata field 和 computed field 不支持列选项
- 同一个 metadata key 不能被多个 source 列重复引用

示例

```SQL
CREATE SOURCE src_kafka (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
value FLOAT64
ts TIMESTAMP(9) NOT NULL COMMENT 'event time',
source_topic STRING METADATA FROM 'topic',
value_label STRING AS source_topic,
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
connector='kafka',
brokers='127.0.0.1:9092',
Expand All @@ -102,14 +124,29 @@ CREATE SOURCE src_kafka (

说明

- `WITH (...)` 为必填,且不能为空。
- source 至少需要定义一个列。
- `WITH (...)` 为必填,且不能为空。
- `CREATE SOURCE` 暂不支持 `IF NOT EXISTS`。
- connector 配置项取决于具体 connector,详见 [Connectors 概述](../../streaming/connectors.md)。

#### Watermark

Watermark 用于声明 source 的事件时间语义。它由 `CREATE SOURCE` 中的 `WATERMARK FOR ...` 子句定义,系统会基于该表达式在运行时持续生成 watermark signal,并传递给下游算子。

最简单的写法是直接把某个时间列声明为事件时间列:`WATERMARK FOR ts`。也可以显式给出 watermark 表达式,例如 `WATERMARK FOR ts AS ts - INTERVAL '5' SECOND`,表示 watermark 信号比事件时间落后 5 秒。

当前版本的约束如下:

- 一个 source 至多定义一个 watermark
- `WATERMARK FOR <column>` 中的事件时间列必须来自 physical field 或 metadata field,不能是 computed field
- `WATERMARK FOR <column>` 中的事件时间列会被视为 non-nullable;显式指定 `NULL` 会报错
- Watermark 表达式也只能基于 physical field 或 metadata field 构建
- Watermark 表达式的结果类型必须是 `TIMESTAMP`

### 创建 Pipeline

`CREATE PIPELINE` 用于创建持续运行的流任务。pipeline 从一个 source 读取数据,执行实时计算,然后把结果写入一个已有的 sink table。
`CREATE PIPELINE` 用于创建持续运行的流任务。pipeline 从一个 source
读取数据,执行实时计算,然后把结果写入一个已有的 sink table。

语法

Expand Down Expand Up @@ -138,7 +175,8 @@ WHERE value >= 2.0;
- 当前一个 pipeline 只能引用一个 source。
- 当前仅支持投影与过滤,不支持 join、聚合、窗口、排序、limit、union、子查询等更复杂算子。
- sink table 必须已经存在,且必须是 TimeSeries 表。
- 查询输出列必须与 sink table 的列名和类型兼容;sink table 中无默认值的非空列必须出现在查询输出中。
- sink table 中无默认值的非空列必须出现在查询输出中。
- 查询输出列必须与 sink table 的列名和类型兼容;当类型可转换时,系统会自动补充必要的 cast。如果不兼容,则会报错。
- 执行 `CREATE PIPELINE` 时,当前用户需要对 source 具备 `SELECT` 权限,并对 sink table 具备 `INSERT` 权限。

### 建表时声明索引(INVERTED / VECTOR)
Expand Down
3 changes: 2 additions & 1 deletion zh_CN/sql-reference/statements/drop.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ DROP PIPELINE [IF EXISTS] [db.]pipeline_name

说明

- 删除 pipeline 时,系统会尝试先停止正在运行的任务,然后再删除元数据。
- 只有处于 `Stopped` 或 `Failed` 状态的 pipeline 才允许删除。
- 如果 pipeline 仍在运行,请先执行 `ALTER PIPELINE ... STOP`,再执行 `DROP PIPELINE`。
- `IF EXISTS` 可用于忽略不存在对象的报错。

### DROP NODE
Expand Down
2 changes: 1 addition & 1 deletion zh_CN/sql-reference/statements/show.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ SHOW SOURCES
SHOW PIPELINES
```

返回结果包含 pipeline 名称、pipeline ID、source、sink、分配节点、运行状态、最后错误和创建时间等信息
返回结果包含 pipeline 名称、pipeline ID、source、sink、运行状态、运行时长、创建时间等信息;在集群模式下还会展示分配节点等集群相关列

说明:

Expand Down
4 changes: 3 additions & 1 deletion zh_CN/streaming/connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,19 @@ Kafka connector 适合持续消费 topic 中的结构化事件流。

- 文档入口:[Kafka Connector](./kafka.md)
- format:`json`、`csv`
- metadata:`topic`、`partition`、`offset`

### MQTT

MQTT connector 适合订阅设备、网关或边缘服务上报的主题消息。

- 文档入口:[MQTT Connector](./mqtt.md)
- format:`json`、`csv`
- metadata:`topic`

### HTTP

HTTP connector 适合按固定周期轮询第三方 API 或内部 HTTP 服务。

- 文档入口:[HTTP Connector](./http.md)
- format:`json`、`csv`
- format:`json`、`csv`、`parquet`
38 changes: 33 additions & 5 deletions zh_CN/streaming/format.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Format 用于将 connector 读取的消息解析为 source 的列结构。
| --- | --- | --- |
| JSON | Kafka、MQTT、HTTP | 适合结构化事件消息 |
| CSV | Kafka、MQTT、HTTP | 适合简单表格型文本或按行输入 |
| Parquet | HTTP | 适合一次性抓取或轮询返回的完整 Parquet 文件 |

## 通用规则

Expand All @@ -26,7 +27,8 @@ Format 用于将 connector 读取的消息解析为 source 的列结构。
说明:

- `bad_data` 仅对 source 生效
- 当前按逐行方式解码消息,适合 newline-delimited JSON 和按行 CSV
- `json` 和 `csv` 默认按逐行方式解码,适合 newline-delimited JSON 和按行 CSV
- `parquet` 按整个 payload 解码为一份完整 Parquet 文件

## JSON

Expand All @@ -53,8 +55,8 @@ Format 用于将 connector 读取的消息解析为 source 的列结构。

```sql
CREATE SOURCE src_json (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
ts TIMESTAMP(9),
sid STRING,
value FLOAT64
) WITH (
connector='kafka',
Expand Down Expand Up @@ -92,8 +94,8 @@ CREATE SOURCE src_json (

```sql
CREATE SOURCE src_csv (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
ts TIMESTAMP(9),
sid STRING,
value FLOAT64
) WITH (
connector='http',
Expand All @@ -106,12 +108,38 @@ CREATE SOURCE src_csv (
);
```

## Parquet

### Parquet 特点

- 适合 HTTP connector 返回的完整 Parquet 文件
- 更适合批量导入型 source,而不是逐行文本流
- 压缩信息由 Parquet 文件自身 metadata 描述,无需额外配置

### Parquet 配置示例

```sql
CREATE SOURCE src_parquet (
ts TIMESTAMP(9),
sid STRING,
value FLOAT64
) WITH (
connector='http',
endpoint='http://127.0.0.1:18080/export.parquet',
method='GET',
poll='once',
format='parquet',
bad_data='fail'
);
```

## 选型建议

| 格式 | 推荐场景 | 不足 |
| --- | --- | --- |
| JSON | 结构化事件、MQTT / Kafka 消息 | 文本体积通常更大 |
| CSV | 简单行式数据、HTTP 接口文本返回 | 字段可读性和扩展性较弱 |
| Parquet | HTTP 返回的列式文件、批量抓取 | 当前仅支持 HTTP connector |

## 常见问题

Expand Down
21 changes: 11 additions & 10 deletions zh_CN/streaming/http.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ HTTP connector 通过单次或持续轮询 HTTP endpoint,将返回内容作为
| `endpoint` | STRING | 无 | Yes | 轮询地址,支持 `${...}` 时间变量 |
| `method` | STRING | `get` | No | HTTP 方法,支持 `GET` 和 `POST` |
| `poll` | STRING | `once` | No | 轮询模式,支持 `once` 或 `interval(<millis>)` |
| `timeout` | INT | 无 | No | 请求超时时间,单位毫秒 |
| `timeout` | INT | 无 | No | 请求超时时间,单位秒 |
| `headers` | STRING | 无 | No | 请求头,格式为 `k1:v1;k2:v2` |
| `auth_type` | STRING | `none` | No | 鉴权类型,支持 `none` 和 `basic_auth` |
| `username` | STRING | 无 | No | Basic Auth 用户名 |
Expand All @@ -32,7 +32,7 @@ HTTP connector 通过单次或持续轮询 HTTP endpoint,将返回内容作为
- `auth_type='none'`:表示不使用鉴权。此时不能输入 `username`、`password`,否则会报错。
- `auth_type='basic_auth'`:表示使用 Basic Auth。此时必须同时输入 `username` 和 `password`。

Format 相关配置请参考 [Formats](./format.md)。
Format 相关配置请参考 [Formats](./format.md)。其中 `parquet` 目前仅支持 `http` connector。

## endpoint 支持的时间变量

Expand Down Expand Up @@ -147,16 +147,16 @@ CREATE DATABASE stream_demo_http;
USE stream_demo_http;

CREATE TABLE sink_http_once (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
ts TIMESTAMP(9),
sid STRING,
value FLOAT64,
TIMESTAMP KEY(ts)
) ENGINE=TimeSeries
PARTITION BY HASH(sid) PARTITIONS 1;

CREATE SOURCE src_http_once (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
ts TIMESTAMP(9),
sid STRING,
value FLOAT64
) WITH (
connector='http',
Expand All @@ -183,16 +183,16 @@ SELECT ts, sid, value FROM sink_http_once ORDER BY ts;

```sql
CREATE TABLE sink_http_poll (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
ts TIMESTAMP(9),
sid STRING,
value FLOAT64,
TIMESTAMP KEY(ts)
) ENGINE=TimeSeries
PARTITION BY HASH(sid) PARTITIONS 1;

CREATE SOURCE src_http_poll (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
ts TIMESTAMP(9),
sid STRING,
value FLOAT64
) WITH (
connector='http',
Expand Down Expand Up @@ -220,3 +220,4 @@ SELECT ts, sid, value FROM sink_http_poll ORDER BY ts;

- 当前 HTTP connector 仅支持作为 source,不支持作为 sink
- `poll='once'` 适合一次性抓取,`poll='interval(...)'` 适合持续轮询
- `format='parquet'` 目前仅支持 `http` connector,且按整个 payload 解码一份完整 Parquet 文件
8 changes: 4 additions & 4 deletions zh_CN/streaming/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ CREATE DATABASE stream_demo;
USE stream_demo;

CREATE TABLE sink_t (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
ts TIMESTAMP(9),
sid STRING,
value FLOAT64,
TIMESTAMP KEY(ts)
) ENGINE=TimeSeries
PARTITION BY HASH(sid) PARTITIONS 1;

CREATE SOURCE src_kafka (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
ts TIMESTAMP(9),
sid STRING,
value FLOAT64
) WITH (
connector='kafka',
Expand Down
11 changes: 7 additions & 4 deletions zh_CN/streaming/model.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ external system -> source -> pipeline -> sink table

```sql
CREATE SOURCE src_mqtt (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
value FLOAT64
ts TIMESTAMP(9) NOT NULL COMMENT 'event time',
source_topic STRING METADATA FROM 'topic',
topic_tag STRING AS source_topic,
value FLOAT64,
WATERMARK FOR ts
) WITH (
connector='mqtt',
broker='127.0.0.1:1883',
Expand Down Expand Up @@ -84,7 +86,7 @@ sink 不是独立对象,而是 Datalayers 中已存在的一张内部表。当

- sink table 必须事先创建
- sink table 必须使用 `TimeSeries` 引擎
- pipeline 输出列名和类型必须与 sink table 严格兼容
- pipeline 输出列名和类型必须与 sink table 兼容;当类型可转换时,系统会自动补充 cast
- sink table 中非空且没有默认值的列,必须出现在 pipeline 输出里

这意味着在设计 sink table 时,应先确定 pipeline 输出 schema,再创建表结构。
Expand All @@ -104,5 +106,6 @@ sink 不是独立对象,而是 Datalayers 中已存在的一张内部表。当
```sql
ALTER PIPELINE p1 STOP;
ALTER PIPELINE p1 RESTART;
ALTER PIPELINE p1 STOP;
DROP PIPELINE p1;
```
8 changes: 4 additions & 4 deletions zh_CN/streaming/mqtt.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ CREATE DATABASE stream_demo_mqtt;
USE stream_demo_mqtt;

CREATE TABLE sink_t (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
ts TIMESTAMP(9),
sid STRING,
value FLOAT64,
TIMESTAMP KEY(ts)
) ENGINE=TimeSeries
PARTITION BY HASH(sid) PARTITIONS 1;

CREATE SOURCE src_mqtt (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
ts TIMESTAMP(9),
sid STRING,
value FLOAT64
) WITH (
connector='mqtt',
Expand Down
13 changes: 9 additions & 4 deletions zh_CN/streaming/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ Kafka / MQTT / HTTP

```sql
CREATE SOURCE src_kafka (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
value FLOAT64
ts TIMESTAMP(9),
source_topic STRING METADATA FROM 'topic',
topic_tag STRING AS source_topic,
value FLOAT64,
) WITH (
connector='kafka',
brokers='127.0.0.1:9092',
Expand All @@ -73,7 +74,8 @@ WHERE value >= 2.0;

### Sink Table

`SINK` 不是独立对象,而是一张已存在的内部表。当前必须使用 `TimeSeries` 引擎,且查询输出 schema 必须与 sink table 严格兼容。
`SINK` 不是独立对象,而是一张已存在的内部表。当前必须使用
`TimeSeries` 引擎,且查询输出 schema 必须与 sink table 兼容;当类型可转换时,系统会自动补充 cast。

## Connector 与 Format

Expand Down Expand Up @@ -108,6 +110,9 @@ ALTER PIPELINE p_kafka STOP;
-- 重启一个 pipeline
ALTER PIPELINE p_kafka RESTART;

-- 删除前先停止 pipeline
ALTER PIPELINE p_kafka STOP;

-- 删除指定 pipeline
DROP PIPELINE p_kafka;

Expand Down
Loading