Skip to content

使用 Flink 将 SQL Server 数据写入 MatrixOne

本章节将介绍如何使用 Flink 将 SQL Server 数据写入到 MatrixOne。

前期准备

本次实践需要安装部署以下软件环境:

操作步骤

在 SQL Server 中创建库、表并插入数据

create database sstomo;
use sstomo;
create table sqlserver_data (
    id INT PRIMARY KEY,
    name NVARCHAR(100),
    age INT,
    entrytime DATE,
    gender NVARCHAR(2)
);

insert into sqlserver_data (id, name, age, entrytime, gender)
values  (1, 'Lisa', 25, '2010-10-12', '0'),
        (2, 'Liming', 26, '2013-10-12', '0'),
        (3, 'asdfa', 27, '2022-10-12', '0'),
        (4, 'aerg', 28, '2005-10-12', '0'),
        (5, 'asga', 29, '2015-10-12', '1'),
        (6, 'sgeq', 30, '2010-10-12', '1');

SQL Server 配置 CDC

  1. 确认当前用户已开启 sysadmin 权限 查询当前用户权限,必须为 sysadmin 固定服务器角色的成员才允许对数据库启用 CDC (变更数据捕获) 功能。 通过下面命令查询 sa 用户是否开启 sysadmin

    exec sp_helpsrvrolemember 'sysadmin';
    

  2. 查询当前数据库是否启用 CDC(变更数据捕获能力)功能

    备注:0:表示未启用;1:表示启用

    如未开启,则执行如下 sql 开启:

    use sstomo;
    exec sys.sp_cdc_enable_db;
    
  3. 查询表是否已经启用 CDC (变更数据捕获) 功能

    select name,is_tracked_by_cdc from sys.tables where name = 'sqlserver_data';
    

    备注:0:表示未启用;1:表示启用 如未开启,则执行下面 sql 进行开启:

    use sstomo;
    exec sys.sp_cdc_enable_table 
    @source_schema = 'dbo', 
    @source_name = 'sqlserver_data', 
    @role_name = NULL, 
    @supports_net_changes = 0;
    
  4. 表 sqlserver_data 启动 CDC (变更数据捕获) 功能配置完成

    查看数据库下的系统表,会发现多了些 cdc 相关数据表,其中 cdc.dbo_sqlserver_flink_CT 就是记录源表的所有 DML 操作记录,每个表对应一个实例表。

  5. 确认 CDC agent 是否正常启动

    执行下面命令查看 CDC agent 是否开启:

    exec master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT';
    

    如状态是 Stopped,则需要开启 CDC agent。

    在 Windows 环境开启 CDC agent: 在安装 SqlServer 数据库的机器上,打开 Microsoft Sql Server Managememt Studio,右击下图位置(SQL Server 代理),点击开启,如下图:

    开启之后,再次查询 agent 状态,确认状态变更为 running

    至此,表 sqlserver_data 启动 CDC (变更数据捕获) 功能全部完成。

在 MatrixOne 中创建目标库及表

create database sstomo;
use sstomo;
CREATE TABLE sqlserver_data (
     id int NOT NULL,
     name varchar(100) DEFAULT NULL,
     age int DEFAULT NULL,
     entrytime date DEFAULT NULL,
     gender char(1) DEFAULT NULL,
     PRIMARY KEY (id)
);
  1. 复制 cdc jar 包

    link-sql-connector-sqlserver-cdc-2.3.0.jarflink-connector-jdbc_2.12-1.13.6.jarmysql-connector-j-8.0.33.jar 复制到 flink 的 lib 目录下。

  2. 启动 flink

    切换到 flink 目录,并启动集群

    ./bin/start-cluster.sh
    

    启动 Flink SQL CLIENT

    ./bin/sql-client.sh
    
  3. 开启 checkpoint

    SET execution.checkpointing.interval = 3s;
    
-- 创建 source 表
CREATE TABLE sqlserver_source (
id INT,
name varchar(50),
age INT,
entrytime date,
gender varchar(100),
PRIMARY KEY (`id`) not enforced
) WITH( 
'connector' = 'sqlserver-cdc',
'hostname' = 'xx.xx.xx.xx',
'port' = '1433',
'username' = 'sa',
'password' = '123456',
'database-name' = 'sstomo',
'schema-name' = 'dbo',
'table-name' = 'sqlserver_data');

-- 创建 sink 表
CREATE TABLE sqlserver_sink (
id INT,
name varchar(100),
age INT,
entrytime date,
gender varchar(10),
PRIMARY KEY (`id`) not enforced
) WITH( 
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx.xx.xx.xx:6001/sstomo',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = '111',
'table-name' = 'sqlserver_data'
);

-- 将 source 表数据读取插入到 sink 表中
Insert into sqlserver_sink select * from sqlserver_source;

在 MatrixOne 中查询对应表数据

use sstomo;
select * from sqlserver_data;

在 SQL Server 中新增数据

在 SqlServer 表 sqlserver_data 中插入 3 条数据:

insert into sstomo.dbo.sqlserver_data (id, name, age, entrytime, gender)
values (7, 'Liss12a', 25, '2010-10-12', '0'),
      (8, '12233s', 26, '2013-10-12', '0'),
      (9, 'sgeq1', 304, '2010-10-12', '1');

在 MatrixOne 中查询对应表数据:

select * from sstomo.sqlserver_data;

在 SQL Server 中删除增数据

在 SQL Server 中删除 id 为 3 和 4 的两行:

delete from sstomo.dbo.sqlserver_data where id in(3,4);

在 mo 中查询表数据,这两行已同步删除:

在 SQL Server 中更新增数据

在 SqlServer 表中更新两行数据:

update sstomo.dbo.sqlserver_data set age = 18 where id in(1,2);

在 MatrixOne 中查询表数据,这两行已同步更新: