MatrixOne 到 MySQL CDC 功能
场景描述
一家在线零售企业使用 MatrixOne 作为订单管理系统的生产数据库,用于存储订单数据。为了支持业务的实时分析需求(如订单数量、销售趋势、客户行为等),需要将订单数据从 MatrixOne 实时同步到 MySQL 分析数据库中,供数据分析团队和业务系统使用。通过 mo_cdc 工具,可以高效实现订单数据的实时同步,让分析系统随时获取最新的订单信息。
- 源数据库(生产数据库):MatrixOne 中的 
orders表,包含订单数据,记录每笔订单的详细信息,包括订单 ID、客户 ID、下单时间、订单金额和状态。 - 目标数据库(分析数据库):MySQL 中的 
orders_backup表,用于实时统计和分析订单信息。确保业务团队可以实时掌握销售动态。 - 同步需求:通过 
mo_cdc将 MatrixOne 的orders表中的数据实时同步到 MySQL 的orders_backup表,确保分析系统数据与生产系统一致。 
操作流程
创建表结构
确保源数据库 MatrixOne 和目标数据库 MySQL 中的表结构相同,便于无缝同步数据。
- MatrixOne 中的 
orders表: 
CREATE TABLE source_db.orders (
    order_id INT PRIMARY KEY,
    customer_id INT,
    order_date DATETIME,
    amount DECIMAL(10, 2),
    status VARCHAR(20)
);
INSERT INTO source_db.orders (order_id, customer_id, order_date, amount, status) VALUES
 (1, 101, '2024-01-15 14:30:00', 99.99, 'Shipped'),
 (2, 102, '2024-02-10 10:00:00', 149.50, 'Delivered'),
 (3, 103, '2024-03-05 16:45:00', 75.00, 'Processing'),
 (4, 104, '2024-04-20 09:15:00', 200.00, 'Shipped'),
 (5, 105, '2024-05-12 14:00:00', 49.99, 'Delivered');
- MySQL 中的 
orders_backup表: 
CREATE TABLE analytics_db.orders_backup (
    order_id INT PRIMARY KEY,
    customer_id INT,
    order_date DATETIME,
    amount DECIMAL(10, 2),
    status VARCHAR(20)
);
创建 mo_cdc 同步任务
通过 mo_cdc 工具创建同步任务,将 MatrixOne 的订单数据实时推送至 MySQL。
>./mo_cdc task create \
       --task-name "task1" \
       --source-uri "mysql://root:111@127.0.0.1:6001" \
       --sink-type "mysql" \
       --sink-uri "mysql://root:111@127.0.0.1:3306" \
       --tables "source_db.orders:analytics_db.orders_backup" \
       --level "account" \
       --account "sys"
查看任务状态
> ./mo_cdc task show \
       --task-name "task1" \
       --source-uri "mysql://root:111@127.0.0.1:6001"
[
  {
    "task-id": "0192d76f-d89a-70b3-a60d-615c5f2fd33d",
    "task-name": "task1",
    "source-uri": "mysql://root:******@127.0.0.1:6001",
    "sink-uri": "mysql://root:******@127.0.0.1:3306",
    "state": "running",
    "checkpoint": "{\n  \"source_db.orders\": 2024-10-29 16:43:00.318404 +0800 CST,\n}",
    "timestamp": "2024-10-29 16:43:01.299298 +0800 CST"
  }
] 
连接下游 mysql 查看全量数据同步情况
mysql> select * from analytics_db.orders_backup;
+----------+-------------+---------------------+--------+------------+
| order_id | customer_id | order_date          | amount | status     |
+----------+-------------+---------------------+--------+------------+
|        1 |         101 | 2024-01-15 14:30:00 |  99.99 | Shipped    |
|        2 |         102 | 2024-02-10 10:00:00 | 149.50 | Delivered  |
|        3 |         103 | 2024-03-05 16:45:00 |  75.00 | Processing |
|        4 |         104 | 2024-04-20 09:15:00 | 200.00 | Shipped    |
|        5 |         105 | 2024-05-12 14:00:00 |  49.99 | Delivered  |
+----------+-------------+---------------------+--------+------------+
5 rows in set (0.01 sec)
增量同步任务
任务建立后,在上游 MatrixOne 进行数据变更操作
INSERT INTO source_db.orders (order_id, customer_id, order_date, amount, status) VALUES
(6, 106, '2024-10-29 12:00:00', 150.00, 'New');
DELETE FROM source_db.orders WHERE order_id = 6;
UPDATE source_db.orders SET status = 'Delivered' WHERE order_id = 4;
mysql> select * from source_db.orders;
+----------+-------------+---------------------+--------+------------+
| order_id | customer_id | order_date          | amount | status     |
+----------+-------------+---------------------+--------+------------+
|        4 |         104 | 2024-04-20 09:15:00 | 200.00 | Delivered  |
|        1 |         101 | 2024-01-15 14:30:00 |  99.99 | Shipped    |
|        2 |         102 | 2024-02-10 10:00:00 | 149.50 | Delivered  |
|        3 |         103 | 2024-03-05 16:45:00 |  75.00 | Processing |
|        5 |         105 | 2024-05-12 14:00:00 |  49.99 | Delivered  |
+----------+-------------+---------------------+--------+------------+
5 rows in set (0.00 sec)
连接下游 mysql 查看增量数据同步情况
mysql> select * from analytics_db.orders_backup;
+----------+-------------+---------------------+--------+------------+
| order_id | customer_id | order_date          | amount | status     |
+----------+-------------+---------------------+--------+------------+
|        1 |         101 | 2024-01-15 14:30:00 |  99.99 | Shipped    |
|        2 |         102 | 2024-02-10 10:00:00 | 149.50 | Delivered  |
|        3 |         103 | 2024-03-05 16:45:00 |  75.00 | Processing |
|        4 |         104 | 2024-04-20 09:15:00 | 200.00 | Delivered  |
|        5 |         105 | 2024-05-12 14:00:00 |  49.99 | Delivered  |
+----------+-------------+---------------------+--------+------------+
5 rows in set (0.00 sec)
断点续传
现在由于意外造成任务中断。
> ./mo_cdc task pause \
       --task-name "task1" \
       --source-uri "mysql://root:111@127.0.0.1:6001"
任务中断期间,往上游 MatrixOne 继续插入数据。
INSERT INTO source_db.orders (order_id, customer_id, order_date, amount, status) VALUES
(11, 111, '2024-06-15 08:30:00', 250.75, 'Processing');
INSERT INTO source_db.orders (order_id, customer_id, order_date, amount, status) VALUES
(12, 112, '2024-07-22 15:45:00', 399.99, 'Shipped');
INSERT INTO source_db.orders (order_id, customer_id, order_date, amount, status) VALUES
(13, 113, '2024-08-30 10:20:00', 599.99, 'Delivered');
mysql> select * from source_db.orders;
+----------+-------------+---------------------+--------+------------+
| order_id | customer_id | order_date          | amount | status     |
+----------+-------------+---------------------+--------+------------+
|        1 |         101 | 2024-01-15 14:30:00 |  99.99 | Shipped    |
|        2 |         102 | 2024-02-10 10:00:00 | 149.50 | Delivered  |
|        3 |         103 | 2024-03-05 16:45:00 |  75.00 | Processing |
|        4 |         104 | 2024-04-20 09:15:00 | 200.00 | Delivered  |
|        5 |         105 | 2024-05-12 14:00:00 |  49.99 | Delivered  |
|       11 |         111 | 2024-06-15 08:30:00 | 250.75 | Processing |
|       12 |         112 | 2024-07-22 15:45:00 | 399.99 | Shipped    |
|       13 |         113 | 2024-08-30 10:20:00 | 599.99 | Delivered  |
+----------+-------------+---------------------+--------+------------+
8 rows in set (0.01 sec)
手动恢复任务。
> ./mo_cdc task resume \
       --task-name "task1" \
       --source-uri "mysql://root:111@127.0.0.1:6001"
连接下游 mysql 查看断点续传情况。
mysql> select * from analytics_db.orders_backup;
+----------+-------------+---------------------+--------+------------+
| order_id | customer_id | order_date          | amount | status     |
+----------+-------------+---------------------+--------+------------+
|        1 |         101 | 2024-01-15 14:30:00 |  99.99 | Shipped    |
|        2 |         102 | 2024-02-10 10:00:00 | 149.50 | Delivered  |
|        3 |         103 | 2024-03-05 16:45:00 |  75.00 | Processing |
|        4 |         104 | 2024-04-20 09:15:00 | 200.00 | Delivered  |
|        5 |         105 | 2024-05-12 14:00:00 |  49.99 | Delivered  |
|       11 |         111 | 2024-06-15 08:30:00 | 250.75 | Processing |
|       12 |         112 | 2024-07-22 15:45:00 | 399.99 | Shipped    |
|       13 |         113 | 2024-08-30 10:20:00 | 599.99 | Delivered  |
+----------+-------------+---------------------+--------+------------+
8 rows in set (0.00 sec)
应用效果
通过该方案,零售企业可以实时同步订单数据至分析库,实现订单统计、销售趋势分析、客户行为洞察等应用场景,支持业务决策。同时断点续传保障了在网络延迟或任务中断时的数据一致性,使得数据分析系统始终保持准确、可靠的数据来源。