Skip to content

Write TiDB data to MatrixOne using Flink

This chapter describes how to write TiDB data to MatrixOne using Flink.

Pre-preparation

This practice requires the installation and deployment of the following software environments:

Operational steps

Copy the jar package

Copy the Flink CDC connector and the corresponding Jar packages for flink-connector-jdbc_2.12-1.13.6.jar and mysql-connector-j-8.0.33.jar to flink-1.13.6/lib/.

If flink is already started, you need to restart flink and load the effective jar package.

Create a table in TiDB and insert data

create table EMPQ_cdc
(
    empno    bigint not null,
    ename    VARCHAR(10),
    job      VARCHAR(9),
    mgr      int,
    hiredate  DATE,
    sal      decimal(7,2),
    comm   decimal(7,2),
    deptno   int(2),
    primary key (empno)
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT  into empq VALUES (1,"张三","sale",1,'2024-01-01',1000,NULL,1);
INSERT  into empq VALUES (2,"李四","develo,"2,'2024-03-05',5000,NULL,2);
INSERT  into empq VALUES (3,"王五","hr",3,'2024-03-18',2000,NULL,2);
INSERT  into empq VALUES (4,"赵六","pm",4,'2024-03-11',2000,NULL,3);

Creating a Target Table in MatrixOne

create table EMPQ
(
    empno    bigint not null,
    ename    VARCHAR(10),
    job      VARCHAR(9),
    mgr      int,
    hiredate  DATE,
    sal      decimal(7,2),
    comm   decimal(7,2),
    deptno   int(2),
    primary key (empno)
);
./bin/start-cluster.sh 
./bin/sql-client.sh 

Turn on checkpoint

SET execution.checkpointing.interval = 3s; 

The build table statement is in smt/result/flink-create.all.sql.

-- Creating Test Libraries
CREATE DATABASE IF NOT EXISTS `default_catalog`.`test`;

-- Create source table
CREATE TABLE IF NOT EXISTS `default_catalog`.`test`.`EMPQ_src` (
`empno` BIGINT NOT NULL,                                                 
`ename` STRING NULL,                                                   
`job` STRING NULL,                                                      
`mgr` INT NULL,                                                      
`hiredate` DATE NULL,                                                         
`sal` DECIMAL(7, 2) NULL,                                             
`comm` DECIMAL(7, 2) NULL,                                                     
`deptno` INT NULL,                                                        
PRIMARY KEY(`empno`) NOT ENFORCED
) with (
    'connector' = 'tidb-cdc',
    'database-name' = 'test',
    'table-name' = 'EMPQ_cdc',
    'pd-addresses' = 'xx.xx.xx.xx:2379'
);

-- Creating a sink table
CREATE TABLE IF NOT EXISTS `default_catalog`.`test`.`EMPQ_sink` (           
`empno` BIGINT NOT NULL,                                                     
`ename` STRING NULL,                                                     
`job` STRING NULL,                                                        
`mgr` INT NULL,                                                         
`hiredate` DATE NULL,                                                          
`sal` DECIMAL(7, 2) NULL,                                               
`comm` DECIMAL(7, 2) NULL,                                                             
`deptno` INT NULL,                                                           
PRIMARY KEY(`empno`) NOT ENFORCED
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx.xx.xx.xx:6001/test',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = '111',
'table-name' = 'empq'
);

Importing TiDB data into MatrixOne

INSERT INTO `default_catalog`.`test`.`EMPQ_sink` SELECT * FROM `default_catalog`.`test`.`EMPQ_src`; 

Query correspondence table data in Matrixone

select * from EMPQ; 

Data can be found to have been imported

Delete a piece of data in TiDB

delete from EMPQ_cdc where empno=1; 

Query table data in MatrixOne, this row has been deleted synchronously.