Skip to content

使用 Flink 将 Oracle 数据写入 MatrixOne

本章节将介绍如何使用 Flink 将 Oracle 数据写入到 MatrixOne。

前期准备

本次实践需要安装部署以下软件环境:

- MatrixOne(作为数据写入的目标库)
- Oracle 数据库(作为数据源)
- Flink 1.13.6

操作步骤

在 Oracle 中创建表,并插入数据

-- Create the source table in the SCOTT schema, the same schema every
-- statement below (and the Flink 'oracle-cdc' source table) refers to.
CREATE TABLE SCOTT.FLINKCDC_EMPT
(
    EMPNO    NUMBER NOT NULL PRIMARY KEY,
    ENAME    VARCHAR2(10),
    JOB      VARCHAR2(9),
    MGR      NUMBER(4),
    HIREDATE DATE,
    SAL      NUMBER(7, 2),
    COMM     NUMBER(7, 2),
    DEPTNO   NUMBER(2)
);

-- Alter FLINKCDC_EMPT so that it supports incremental (supplemental) logging
-- of all columns.
ALTER TABLE SCOTT.FLINKCDC_EMPT ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- Insert test data.
INSERT INTO SCOTT.FLINKCDC_EMPT (EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO) VALUES(1, 'TURNER', 'SALESMAN', 7698, TIMESTAMP '2022-10-31 16:21:11.000000', 1500, 0, 30);
INSERT INTO SCOTT.FLINKCDC_EMPT (EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO) VALUES(2, 'TURNER', 'SALESMAN', 7698, TIMESTAMP '2022-10-31 16:21:11.000000', 1500, 0, 30);
INSERT INTO SCOTT.FLINKCDC_EMPT (EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO) VALUES(3, 'TURNER', 'SALESMAN', 7698, TIMESTAMP '2022-10-31 16:21:11.000000', 1500, 0, 30);
INSERT INTO SCOTT.FLINKCDC_EMPT (EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO) VALUES(4, 'TURNER', 'SALESMAN', 7698, TIMESTAMP '2022-10-31 16:21:11.000000', 1500, 0, 30);
INSERT INTO SCOTT.FLINKCDC_EMPT (EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO) VALUES(5, 'TURNER', 'SALESMAN', 7698, TIMESTAMP '2022-10-31 16:21:11.000000', 1500, 0, 30);
INSERT INTO SCOTT.FLINKCDC_EMPT (EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO) VALUES(6, 'TURNER', 'SALESMAN', 7698, TIMESTAMP '2022-10-31 16:21:11.000000', 1500, 0, 30);
INSERT INTO SCOTT.FLINKCDC_EMPT (EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO) VALUES(5989, 'TURNER', 'SALESMAN', 7698, TIMESTAMP '2022-10-31 16:21:11.000000', 1500, 0, 30);

-- Oracle DML is transactional: commit so the rows are visible to other
-- sessions, including the one opened by the CDC connector.
COMMIT;

在 MatrixOne 中创建目标表

-- Create the target database if it does not exist yet, then switch to it.
CREATE DATABASE IF NOT EXISTS test;
USE test;

-- Target table written by the Flink JDBC sink.
-- The Flink sink table declares PRIMARY KEY(EMPNO), which puts the JDBC
-- connector into upsert mode; the physical table therefore needs the same key
-- so that a changed source row updates the existing row instead of adding a
-- duplicate.
CREATE TABLE `oracle_empt` (
    `empno`    BIGINT        NOT NULL,
    `ename`    VARCHAR(10)   NULL,
    `job`      VARCHAR(9)    NULL,
    `mgr`      INT           NULL,
    `hiredate` DATETIME      NULL,
    `sal`      DECIMAL(7, 2) NULL,
    `comm`     DECIMAL(7, 2) NULL,
    `deptno`   INT           NULL,
    PRIMARY KEY (`empno`)
);

复制 jar 包

将 flink-sql-connector-oracle-cdc-2.2.1.jar、flink-connector-jdbc_2.11-1.13.6.jar、mysql-connector-j-8.0.31.jar 复制到 flink-1.13.6/lib/ 目录下。

如果 Flink 已经启动,需要重启 Flink,新复制的 jar 包才会加载生效。随后启动集群并进入 SQL 客户端:

# Start the Flink cluster (run from the Flink installation directory).
./bin/start-cluster.sh
# Open the Flink SQL client; the SQL statements that follow are entered there.
./bin/sql-client.sh

开启 checkpoint,创建 source/sink 表并同步数据

-- Checkpoint every 3 seconds.
-- Uses the quoted SET 'key' = 'value' form documented for the Flink 1.13+
-- SQL client instead of the legacy unquoted form.
SET 'execution.checkpointing.interval' = '3s';
-- Source table (Oracle): reads SCOTT.FLINKCDC_EMPT through the 'oracle-cdc'
-- connector. Replace the hostname placeholder and the credentials with the
-- values of your own Oracle instance.
CREATE TABLE `oracle_source` (
    EMPNO    BIGINT NOT NULL,
    ENAME    VARCHAR(10),
    JOB      VARCHAR(9),
    MGR      INT,
    HIREDATE TIMESTAMP,
    SAL      DECIMAL(7, 2),
    COMM     DECIMAL(7, 2),
    DEPTNO   INT,
    PRIMARY KEY (EMPNO) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    -- Connection settings.
    'hostname' = 'xx.xx.xx.xx',
    'port' = '1521',
    'username' = 'scott',
    'password' = 'tiger',
    -- Captured database / schema / table.
    'database-name' = 'ORCLCDB',
    'schema-name' = 'SCOTT',
    'table-name' = 'FLINKCDC_EMPT',
    -- Debezium-specific options.
    'debezium.database.tablename.case.insensitive' = 'false',
    'debezium.log.mining.strategy' = 'online_catalog'
);
-- Sink table (MatrixOne): written through the 'jdbc' connector with the MySQL
-- driver into table oracle_empt of database test. Replace 'ip' in the URL
-- with the address of your MatrixOne instance.
CREATE TABLE IF NOT EXISTS `oracle_sink` (
    EMPNO    BIGINT NOT NULL,
    ENAME    VARCHAR(10),
    JOB      VARCHAR(9),
    MGR      INT,
    HIREDATE TIMESTAMP,
    SAL      DECIMAL(7, 2),
    COMM     DECIMAL(7, 2),
    DEPTNO   INT,
    PRIMARY KEY (EMPNO) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://ip:6001/test',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = '111',
    'table-name' = 'oracle_empt'
);
-- Read the rows of the source table and insert them into the sink table.
-- Columns are listed explicitly so the copy does not depend on the two tables
-- keeping the same column order.
INSERT INTO `oracle_sink`
SELECT
    EMPNO,
    ENAME,
    JOB,
    MGR,
    HIREDATE,
    SAL,
    COMM,
    DEPTNO
FROM `oracle_source`;

在 MatrixOne 中查询对应表数据

select * from oracle_empt;