Skip to content

CREATE SOURCE

语法说明

CREATE SOURCE 创建一个对流式数据的连接,并将一个新的 SOURCE 表添加到当前数据库中。

语法结构

CREATE [OR REPLACE] SOURCE [IF NOT EXISTS] stream_name 
( { column_name data_type [KEY | HEADERS | HEADER(key)] } [, ...] )
WITH ( property_name = expression [, ...]);

语法解释

  • stream_name: SOURCE 名称。SOURCE 名称必须与当前数据库中任何现有的 SOURCE 名称不同。
  • column_name: 流式数据映射到 SOURCE 表中的列名。
  • data_type: column_name 对应字段在数据表中的类型。
  • property_name = expression: 有关流式数据映射的具体配置项名以及对应的值,可配置项如下:
property_name expression 描述
"type" 仅支持'kafka':目前仅支持接受的源为 kafka
"topic" kafka 数据源中对应的 topic
"partion" kafka 数据源中对应的 partion
"value" 仅支持'json': 目前仅支持接受的数据格式为 json
"bootstrap.servers" kafka 服务器对应的 IP:PORT
"sasl.username" 指定连接到 Kafka 时使用的 SASL(Simple Authentication and Security Layer)用户名
"sasl.password" 与 sasl.username 配对使用,这个参数提供了相应的密码
"sasl.mechanisms" 客户端和服务器之间认证的 SASL 机制
"security.protocol" 指定了与 Kafka 服务器通信时使用的安全协议

示例

create source stream_test(c1 char(25),c2 varchar(500),c3 text,c4 tinytext,c5 mediumtext,c6 longtext )with(
    "type"='kafka',
    "topic"= 'test',
    "partition" = '0',
    "value"= 'json',
    "bootstrap.servers"='127.0.0.1:9092'   
)
Query OK, 0 rows affected (0.01 sec)

限制

SOURCE 表目前不支持 drop 和 alter。

创建 SOURCE 表时目前仅支持连接 kafka,且仅支持传输数据格式为 json。