
Write PostgreSQL data to MatrixOne using Flink

This chapter describes how to write PostgreSQL data to MatrixOne using Flink.


This practice requires the installation and deployment of the following software environments:

Operational steps


Copy the jar package

Copy the Flink CDC connector jar for PostgreSQL, flink-connector-jdbc_2.12-1.13.6.jar, and mysql-connector-j-8.0.33.jar to flink-1.13.6/lib/. If Flink is already running, restart it so that the new jar packages are loaded.

Enable CDC in PostgreSQL

  1. postgresql.conf Configuration

    # Change the maximum number of WAL sender processes (default 10); use the same value as max_replication_slots
    max_wal_senders = 10           # max number of walsender processes
    # Terminate replication connections that have been inactive for longer than this; it can be set somewhat larger (default 60s)
    wal_sender_timeout = 180s      # in milliseconds; 0 disables
    # Change the maximum number of replication slots (default 10); flink-cdc uses one slot per table by default
    max_replication_slots = 10     # max number of replication slots
    # Set to logical
    wal_level = logical            # minimal, replica, or logical
  2. pg_hba.conf

    # IPv4 local connections:
    # (replace xx.xx.xx.xx/32 with the address of the Flink host)
    host    all            all    xx.xx.xx.xx/32    password
    host    replication    all    xx.xx.xx.xx/32    password
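wal_level, max_wal_senders and max_replication_slots only take effect after PostgreSQL is restarted, while edits to pg_hba.conf only need a configuration reload. A minimal sketch for checking that the new values are active, assuming you connect with psql as the postgres superuser:

```sql
-- Run in psql after restarting PostgreSQL
SHOW wal_level;              -- should return: logical
SHOW max_wal_senders;
SHOW max_replication_slots;
SHOW wal_sender_timeout;

-- Re-read pg_hba.conf (and other reloadable settings) without a restart;
-- by default this function can only be called by a superuser
SELECT pg_reload_conf();
```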

Create a table in PostgreSQL and insert data

create table student (
    stu_id integer not null unique,
    stu_name varchar(50),
    stu_age integer,
    stu_bth date
);

INSERT INTO student VALUES (1, 'lisa', 12, '2022-10-12');
INSERT INTO student VALUES (2, 'tom', 23, '2021-11-10');
INSERT INTO student VALUES (3, 'jenny', 11, '2024-02-19');
INSERT INTO student VALUES (4, 'henry', 12, '2022-04-22');

Create a table in MatrixOne

create table student (
    stu_id integer not null unique,
    stu_name varchar(50),
    stu_age integer,
    stu_bth date
);
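The sink table's JDBC URL (jdbc:mysql://xx.xx.xx.xx:6001/postgre) points at a MatrixOne database named postgre, so the student table above is expected to live in that database. A minimal sketch, run from a MySQL client connected to MatrixOne and assuming the database does not exist yet:

```sql
-- Run in a MySQL client connected to MatrixOne
CREATE DATABASE IF NOT EXISTS postgre;
USE postgre;

-- Create the student table here (same definition as above), then confirm it exists
SHOW TABLES;
```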

Start cluster

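Switch to the flink directory and execute the following command:

./bin/start-cluster.sh

Then start the Flink SQL client, in which the Flink SQL statements below are run:

./bin/sql-client.sh
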
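Enable checkpointing

Set the checkpoint interval to 3 seconds, then create the source table that reads the PostgreSQL student table through the postgres-cdc connector: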

SET execution.checkpointing.interval = 3s; 
CREATE TABLE pgsql_bog  (
      stu_id  int not null,
      stu_name    varchar(50),
      stu_age     int,
      stu_bth     date,
      primary key (stu_id) not enforced
) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'xx.xx.xx.xx',
      'port' = '5432',
      'username' = 'postgres',
      'password' = '123456',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'student',
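      'decoding.plugin.name' = 'pgoutput',
      'debezium.snapshot.mode' = 'initial'
      );

When the source is defined with Flink Table SQL, the decoding plugin must be set explicitly. pgoutput is the standard logical decoding output plugin built into PostgreSQL 10 and later. Without 'decoding.plugin.name' = 'pgoutput', the connector falls back to its default plugin, decoderbufs, which PostgreSQL does not ship, and the following error is reported: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory.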
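Before wiring up the sink, it can help to confirm that Flink can read the source. A sketch for the Flink SQL client: with 'debezium.snapshot.mode' = 'initial', the query should first show the four existing rows and then keep running to follow new changes. Stop it (press Q) before starting the INSERT job below; every job reading pgsql_bog most likely uses the same replication slot (the slot.name option is not set here), and PostgreSQL allows only one active connection per slot.

```sql
-- Flink SQL client: preview the rows captured from PostgreSQL
SELECT * FROM pgsql_bog;
```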

Create sink table

CREATE TABLE test_pg (
      stu_id  int not null,
      stu_name    varchar(50),
      stu_age     int,
      stu_bth     date,
      primary key (stu_id) not enforced
) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://xx.xx.xx.xx:6001/postgre',
      'driver' = 'com.mysql.cj.jdbc.Driver',
      'username' = 'root',
      'password' = '111',
      'table-name' = 'student'
);

Importing PostgreSQL data into MatrixOne

insert into test_pg select * from pgsql_bog; 
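While the INSERT job is running, the connector holds a logical replication slot in PostgreSQL and, with pgoutput, receives changes through a publication. A sketch for inspecting both from psql; the actual slot and publication names come from the connector's defaults, which this chapter does not set, so the name in the last comment is an assumption:

```sql
-- Run in psql on the source PostgreSQL instance
SELECT slot_name, plugin, slot_type, active
FROM pg_replication_slots;       -- expect plugin = pgoutput and active = true

SELECT pubname, puballtables, pubinsert, pubupdate, pubdelete
FROM pg_publication;

-- A slot retains WAL on the server until it is consumed. Once the Flink job is
-- permanently stopped, drop its slot, for example:
-- SELECT pg_drop_replication_slot('flink');  -- 'flink' is an assumed slot name
```

If a publication covering student has pubdelete = true, deleting from a table without a replica identity fails, which is the error discussed in the delete step.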

Query the corresponding table data in MatrixOne:

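mysql> select * from student;
+--------+----------+---------+------------+
| stu_id | stu_name | stu_age | stu_bth    |
+--------+----------+---------+------------+
|      1 | lisa     |      12 | 2022-10-12 |
|      2 | tom      |      23 | 2021-11-10 |
|      3 | jenny    |      11 | 2024-02-19 |
|      4 | henry    |      12 | 2022-04-22 |
+--------+----------+---------+------------+
4 rows in set (0.00 sec)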

The data has been imported.

Add data in PostgreSQL

insert into public.student values (51, '58', 39, '2020-01-03'); 

Query the corresponding table data in MatrixOne:

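mysql> select * from student;
+--------+----------+---------+------------+
| stu_id | stu_name | stu_age | stu_bth    |
+--------+----------+---------+------------+
|      1 | lisa     |      12 | 2022-10-12 |
|      2 | tom      |      23 | 2021-11-10 |
|      3 | jenny    |      11 | 2024-02-19 |
|      4 | henry    |      12 | 2022-04-22 |
|     51 | 58       |      39 | 2020-01-03 |
+--------+----------+---------+------------+
5 rows in set (0.01 sec)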

The new row has been synchronized to the corresponding table in MatrixOne.

Delete a row in PostgreSQL:

delete from public.student where stu_id=1; 

If the following error is reported:

cannot delete from table "student" because it does not have a replica identity and publishes deletes 

set the table's replica identity to FULL, then run the DELETE statement again:

alter table public.student replica identity full; 
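REPLICA IDENTITY FULL makes PostgreSQL write the entire old row to the WAL for every UPDATE and DELETE. Because stu_id is declared NOT NULL UNIQUE, a lighter alternative is to use that unique index as the replica identity. A sketch, assuming PostgreSQL gave the index its default name student_stu_id_key (check with \d public.student). With this setting the old row image only carries stu_id; since both Flink tables declare stu_id as the primary key, that is likely enough for this pipeline, but it is an assumption worth testing before relying on it.

```sql
-- Show the current setting: d = default (primary key), n = nothing,
-- f = full, i = index
SELECT relname, relreplident
FROM pg_class
WHERE oid = 'public.student'::regclass;

-- Alternative to REPLICA IDENTITY FULL: use the unique, non-null index on stu_id.
-- student_stu_id_key is PostgreSQL's default name for that index; verify it first.
ALTER TABLE public.student REPLICA IDENTITY USING INDEX student_stu_id_key;
```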

Query the corresponding table data in MatrixOne:

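mysql> select * from student;
+--------+----------+---------+------------+
| stu_id | stu_name | stu_age | stu_bth    |
+--------+----------+---------+------------+
|      2 | tom      |      23 | 2021-11-10 |
|      3 | jenny    |      11 | 2024-02-19 |
|      4 | henry    |      12 | 2022-04-22 |
|     51 | 58       |      39 | 2020-01-03 |
+--------+----------+---------+------------+
4 rows in set (0.00 sec)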
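Updates can be checked the same way. A sketch, assuming the replica identity of public.student has already been set as above (otherwise PostgreSQL rejects the UPDATE with a similar "does not have a replica identity and publishes updates" error) and the Flink INSERT job is still running:

```sql
-- In PostgreSQL: change an existing row
UPDATE public.student SET stu_age = 40 WHERE stu_id = 51;

-- In MatrixOne (MySQL client): after a few seconds the row should show stu_age = 40
SELECT * FROM student WHERE stu_id = 51;
```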