使用 DataX 将 Doris 数据写入 MatrixOne
本文介绍如何使用 DataX 工具将 Doris 数据离线写入 MatrixOne 数据库。
开始前准备
在开始使用 DataX 将数据写入 MatrixOne 之前,需要完成安装以下软件:
- 已完成安装和启动 MatrixOne。
- 安装 JDK 8+ version。
- 安装 Python 3.8(or plus)。
- 下载 DataX 安装包,并解压。
- 下载并安装 Doris。
- 下载 matrixonewriter.zip,解压至 DataX 项目根目录的
plugin/writer/
目录下。 - 安装 MySQL Client。
步骤
在 Doris 中创建测试数据
create database test;
use test;
CREATE TABLE IF NOT EXISTS example_tbl
(
user_id BIGINT NOT NULL COMMENT "用户id",
date DATE NOT NULL COMMENT "数据灌入日期时间",
city VARCHAR(20) COMMENT "用户所在城市",
age SMALLINT COMMENT "用户年龄",
sex TINYINT COMMENT "用户性别"
)
DUPLICATE KEY(user_id, date)
DISTRIBUTED BY HASH(user_id) BUCKETS 1
PROPERTIES (
"replication_num"="1"
);
insert into example_tbl values
(10000,'2017-10-01','北京',20,0),
(10000,'2017-10-01','北京',20,0),
(10001,'2017-10-01','北京',30,1),
(10002,'2017-10-02','上海',20,1),
(10003,'2017-10-02','广州',32,0),
(10004,'2017-10-01','深圳',35,0),
(10004,'2017-10-03','深圳',35,0);
在 MatrixOne 中创建目标库表
create database sparkdemo;
use sparkdemo;
CREATE TABLE IF NOT EXISTS example_tbl
(
user_id BIGINT NOT NULL COMMENT "用户id",
date DATE NOT NULL COMMENT "数据灌入日期时间",
city VARCHAR(20) COMMENT "用户所在城市",
age SMALLINT COMMENT "用户年龄",
sex TINYINT COMMENT "用户性别"
);
编辑 datax 的 json 模板文件
进入到 datax/job 路径,在 doris2mo.json 填以下内容
{
"job": {
"setting": {
"speed": {
"channel": 8
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"splitPk": "user_id",
"column": [
'*'
],
"connection": [
{
"table": [
"example_tbl"
],
"jdbcUrl": [
"jdbc:mysql://xx.xx.xx.xx:9030/test"
]
}
],
"fetchSize": 1024
}
},
"writer": {
"name": "matrixonewriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "111",
"column": [
'*'
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://xx.xx.xx.xx:6001/sparkdemo",
"table": [
"example_tbl"
]
}
]
}
}
}
]
}
}
启动 datax 作业
python bin/datax.py job/doris2mo.json
显示以下结果:
2024-04-28 15:47:38.222 [job-0] INFO JobContainer -
任务启动时刻 : 2024-04-28 15:47:26
任务结束时刻 : 2024-04-28 15:47:38
任务总计耗时 : 11s
任务平均流量 : 12B/s
记录写入速度 : 0rec/s
读出记录总数 : 7
读写失败总数 : 0