使用 Kafka 连接 MatrixOne
概述
Apache Kafka 是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
MatrixOne 支持与 Apache Kafka 进行连接,本文将指导您如何通过 Apache Kafka 连接到 MatrixOne 并实现高效数据流集成与持久化。
开始前准备
-
已完成安装和启动 MatrixOne。
操作步骤
第一步:启动 Kafka 并生产数据
-
解压二进制包 (注意对应版本)
tar -xzf kafka_2.13-3.6.1.tgz cd kafka_2.13-3.6.1
-
启动 ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
-
开启一个新的终端,启动 Kafka
bin/kafka-server-start.sh config/server.properties
-
开启一个新的终端,创建一个 topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test
-
开启一个生产者往 topic 中写入 json 数据
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test {"c1": -2147483648,"c2":20,"c3": -3,"c4":8,"c5":425,"c6":55} {"c1": 21474,"c2":-20,"c3": 3,"c4":9090,"c5":42,"c6":53}
开启一个消费者查看是否成功写入 topic:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test {"c1": -2147483648,"c2":20,"c3": -3,"c4":8,"c5":425,"c6":55} {"c1": 21474,"c2":-20,"c3": 3,"c4":9090,"c5":42,"c6":53}
第二步:创建 Source 表连接 Kafka
-
创建 Source 表
create source stream_test(c1 int,c2 tinyint,c3 smallint,c4 bigint,c5 int unsigned ,c6 tinyint unsigned) with( "type"='kafka', "topic"= 'test', "partition" = '0', "value"= 'json', "bootstrap.servers"='127.0.0.1:9092' )
查看是否接受了数据:
select * from stream_test; +-------------+------+------+------+------+------+ | c1 | c2 | c3 | c4 | c5 | c6 | +-------------+------+------+------+------+------+ | -2147483648 | 20 | -3 | 8 | 425 | 55 | | 21474 | -20 | 3 | 9090 | 42 | 53 | +-------------+------+------+------+------+------+ 2 rows in set (0.37 sec)
-
往 topic 中持续写入 json 数据,并检查是否继续接受了数据:
{"c1": -3421474,"c2":92,"c3": 333,"c4":9,"c5":42233,"c6":87}
查看是否接受了数据:
select * from stream_test; +-------------+------+------+------+-------+------+ | c1 | c2 | c3 | c4 | c5 | c6 | +-------------+------+------+------+-------+------+ | -2147483648 | 20 | -3 | 8 | 425 | 55 | | 21474 | -20 | 3 | 9090 | 42 | 53 | | -3421474 | 92 | 333 | 9 | 42233 | 87 | +-------------+------+------+------+-------+------+ 3 rows in set (0.44 sec)
第三步:创建动态表消费 Source 表中数据
-
创建动态表以消费 Source 表:
create dynamic table dt_test as select c1, c2+c3, c3*c4,c5/c3,c6/10 from stream_test;
查看动态表:
select * from dt_test; +-------------+---------+---------+---------------------+---------+ | c1 | c2 + c3 | c3 * c4 | c5 / c3 | c6 / 10 | +-------------+---------+---------+---------------------+---------+ | -2147483648 | 17 | -24 | -141.66666666666666 | 5.5 | | 21474 | -17 | 27270 | 14 | 5.3 | | -3421474 | 425 | 2997 | 126.82582582582583 | 8.7 | +-------------+---------+---------+---------------------+---------+ 3 rows in set (0.00 sec)
-
往 topic 中持续写入 json 数据,并检查动态表是否更新:
{"c1": 1474,"c2":2,"c3": 453,"c4":1,"c5":56233,"c6":7}
查看动态表发现成功更新:
select * from dt_test; +-------------+---------+---------+---------------------+---------+ | c1 | c2 + c3 | c3 * c4 | c5 / c3 | c6 / 10 | +-------------+---------+---------+---------------------+---------+ | -2147483648 | 17 | -24 | -141.66666666666666 | 5.5 | | 21474 | -17 | 27270 | 14 | 5.3 | | -3421474 | 425 | 2997 | 126.82582582582583 | 8.7 | | 1474 | 455 | 453 | 124.13465783664459 | 0.7 | +-------------+---------+---------+---------------------+---------+ 4 rows in set (0.00 sec)