CREATE SOURCE

Grammar Description

CREATE SOURCE creates a connection to streaming data and adds a new SOURCE table to the current database.

Grammar Structure

CREATE [OR REPLACE] SOURCE [IF NOT EXISTS] stream_name
( { column_name data_type [KEY | HEADERS | HEADER(key)] } [, ...] )
WITH ( property_name = expression [, ...]);

Syntax Explanation

  • stream_name: Name of the SOURCE. It must differ from the name of every existing SOURCE in the current database.
  • column_name: Name of the column in the SOURCE table to which the streaming data is mapped.
  • data_type: Data type of the field that corresponds to column_name in the SOURCE table.
  • property_name = expression: Name and value of a configuration item for the streaming data mapping. The configurable items are as follows:

| property_name | Description of expression |
| :--- | :--- |
| "type" | Type of the data source. Only 'kafka' is currently supported. |
| "topic" | Corresponding topic in the Kafka data source. |
| "partition" | Corresponding partition in the Kafka data source. |
| "value" | Data format. Only 'json' is currently supported. |
| "bootstrap.servers" | Address of the Kafka server, in the form IP:PORT. |
| "sasl.username" | SASL (Simple Authentication and Security Layer) username to use when connecting to Kafka. |
| "sasl.password" | Password that corresponds to sasl.username. |
| "sasl.mechanisms" | SASL mechanism used for authentication between client and server. |
| "security.protocol" | Security protocol used to communicate with the Kafka server. |
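
The SASL-related properties in the table can be combined with the basic Kafka properties. The following is a minimal sketch, not a verified configuration: the SOURCE name, column list, broker address, and credentials are placeholders, and the values 'PLAIN' and 'SASL_PLAINTEXT' are assumptions based on common Kafka client settings. Check which mechanisms and protocols your deployment accepts before using it.

```sql
-- Hypothetical SOURCE that reads JSON messages from a SASL-protected Kafka broker.
-- All names and values below are illustrative placeholders.
create source stream_secure (c1 char(25), c2 varchar(500)) with (
    "type" = 'kafka',
    "topic" = 'test',
    "partition" = '0',
    "value" = 'json',
    "bootstrap.servers" = '127.0.0.1:9092',
    "sasl.username" = 'example_user',         -- placeholder username
    "sasl.password" = 'example_password',     -- placeholder password
    "sasl.mechanisms" = 'PLAIN',              -- assumed value
    "security.protocol" = 'SASL_PLAINTEXT'    -- assumed value
);
```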

Example

create source stream_test (c1 char(25), c2 varchar(500), c3 text, c4 tinytext, c5 mediumtext, c6 longtext) with (
    "type" = 'kafka',
    "topic" = 'test',
    "partition" = '0',
    "value" = 'json',
    "bootstrap.servers" = '127.0.0.1:9092'
);
Query OK, 0 rows affected (0.01 sec)

Limitations

SOURCE tables currently do not support DROP or ALTER.

When creating a SOURCE table, Kafka is the only supported data source, and JSON is the only supported data transmission format.
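
Because an existing SOURCE table cannot be dropped or altered, the optional clauses in the grammar are the only documented way to deal with a name that is already in use. The sketch below assumes that IF NOT EXISTS skips creation when the name is taken and that OR REPLACE recreates the SOURCE with the new definition. This page does not describe either behavior, so treat both as assumptions to verify. The names and values are reused from the example above.

```sql
-- Assumed behavior: does nothing if stream_test already exists.
create source if not exists stream_test (c1 char(25), c2 varchar(500)) with (
    "type" = 'kafka',
    "topic" = 'test',
    "partition" = '0',
    "value" = 'json',
    "bootstrap.servers" = '127.0.0.1:9092'
);

-- Assumed behavior: replaces the definition of stream_test, here adding column c3.
create or replace source stream_test (c1 char(25), c2 varchar(500), c3 text) with (
    "type" = 'kafka',
    "topic" = 'test',
    "partition" = '0',
    "value" = 'json',
    "bootstrap.servers" = '127.0.0.1:9092'
);
```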