Skip to content

使用 Flink 将 MongoDB 数据写入 MatrixOne

本章节将介绍如何使用 Flink 将 MongoDB 数据写入到 MatrixOne。

前期准备

本次实践需要安装部署以下软件环境:

操作步骤

开启 Mongodb 副本集模式

关闭命令:

mongod -f /opt/software/mongodb/conf/config.conf --shutdown

在 /opt/software/mongodb/conf/config.conf 中增加以下参数

replication:
replSetName: rs0 #复制集名称

重新开启 mangod

mongod -f /opt/software/mongodb/conf/config.conf

然后进入 mongo 执行 rs.initiate() 然后执行 rs.status()

> rs.initiate()
{
"info2" : "no configuration specified. Using a default configuration for the set",
"me" : "xx.xx.xx.xx:27017",
"ok" : 1
}
rs0:SECONDARY> rs.status()

看到以下相关信息说明复制集启动成功

"members" : [
{
"_id" : 0,
"name" : "xx.xx.xx.xx:27017",
"health" : 1,
"state" : 1,
"stateStr" : "PRIMARY",
"uptime" : 77,
"optime" : {
"ts" : Timestamp(1665998544, 1),
"t" : NumberLong(1)
},
"optimeDate" : ISODate("2022-10-17T09:22:24Z"),
"syncingTo" : "",
"syncSourceHost" : "",
"syncSourceId" : -1,
"infoMessage" : "could not find member to sync from",
"electionTime" : Timestamp(1665998504, 2),
"electionDate" : ISODate("2022-10-17T09:21:44Z"),
"configVersion" : 1,
"self" : true,
"lastHeartbeatMessage" : ""
}
],
"ok" : 1,

rs0:PRIMARY> show dbs
admin   0.000GB
config  0.000GB
local   0.000GB
test    0.000GB

在 flinkcdc sql 界面建立 source 表(mongodb)

在 flink 目录下的 lib 目录下执行,下载 mongodb 的 cdcjar 包

wget <https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.2.1/flink-sql-connector-mongodb-cdc-2.2.1.jar>

建立数据源 mongodb 的映射表,列名也必须一模一样

CREATE TABLE products (
  _id STRING,#必须有这一列,也必须为主键,因为mongodb会给每行数据自动生成一个id
  `name` STRING,
  age INT,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'xx.xx.xx.xx:27017',
  'username' = 'root',
  'password' = '',
  'database' = 'test',
  'collection' = 'test'
);

建立完成后可以执行 select * from products; 查下是否连接成功

在 flinkcdc sql 界面建立 sink 表(MatrixOne)

建立 matrixone 的映射表,表结构需相同,不需要带 id 列

CREATE TABLE cdc_matrixone (
   `name` STRING,
   age INT,
   PRIMARY KEY (`name`) NOT ENFORCED
)WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx.xx.xx.xx:6001/test',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = '111',
'table-name' = 'mongodbtest'   
);

开启 cdc 同步任务

这里同步任务开启后,mongodb 增删改的操作均可同步

INSERT INTO cdc_matrixone SELECT `name`,age FROM products;

#增加
rs0:PRIMARY> db.test.insert({"name" : "ddd", "age" : 90})
WriteResult({ "nInserted" : 1 })
rs0:PRIMARY> db.test.find()
{ "_id" : ObjectId("6347e3c6229d6017c82bf03d"), "name" : "aaa", "age" : 20 }
{ "_id" : ObjectId("6347e64a229d6017c82bf03e"), "name" : "bbb", "age" : 18 }
{ "_id" : ObjectId("6347e652229d6017c82bf03f"), "name" : "ccc", "age" : 28 }
{ "_id" : ObjectId("634d248f10e21b45c73b1a36"), "name" : "ddd", "age" : 90 }
#修改
rs0:PRIMARY> db.test.update({'name':'ddd'},{$set:{'age':'99'}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
#删除
rs0:PRIMARY> db.test.remove({'name':'ddd'})
WriteResult({ "nRemoved" : 1 })