Migrating Data from Doris to MatrixOne Using Spark
This section describes how to use the Spark compute engine to batch-write Doris data into MatrixOne.
Prerequisites
This walkthrough requires the following software to be installed:
- MatrixOne installed and started.
- Doris downloaded and installed.
- IntelliJ IDEA 2022.2.1 or later downloaded and installed.
- JDK 8+ downloaded and installed.
- MySQL Client 8.0.33 downloaded and installed.
Steps
Step 1: Prepare data in Doris
create database test;
use test;
CREATE TABLE IF NOT EXISTS example_tbl
(
user_id BIGINT NOT NULL COMMENT "User ID",
date DATE NOT NULL COMMENT "Data ingestion date",
city VARCHAR(20) COMMENT "User city",
age SMALLINT COMMENT "User age",
sex TINYINT COMMENT "User gender"
)
DUPLICATE KEY(user_id, date)
DISTRIBUTED BY HASH(user_id) BUCKETS 1
PROPERTIES (
"replication_num"="1"
);
insert into example_tbl values
(10000,'2017-10-01','北京',20,0),
(10000,'2017-10-01','北京',20,0),
(10001,'2017-10-01','北京',30,1),
(10002,'2017-10-02','上海',20,1),
(10003,'2017-10-02','广州',32,0),
(10004,'2017-10-01','深圳',35,0),
(10004,'2017-10-03','深圳',35,0);
Step 2: Prepare the database and table in MatrixOne
create database sparkdemo;
use sparkdemo;
CREATE TABLE IF NOT EXISTS example_tbl
(
user_id BIGINT NOT NULL COMMENT "User ID",
date DATE NOT NULL COMMENT "Data ingestion date",
city VARCHAR(20) COMMENT "User city",
age SMALLINT COMMENT "User age",
sex TINYINT COMMENT "User gender"
);
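The MatrixOne table above is simply the Doris DDL minus the Doris-specific layout clauses (DUPLICATE KEY, DISTRIBUTED BY, PROPERTIES), which MatrixOne does not accept. If many tables need migrating, that conversion can be scripted; the sketch below is a minimal illustration of the idea (the class name and marker list are our own, not part of any migration tool):

```java
public class DorisDdlAdapter {
    // Doris-only clauses that follow the column list in a CREATE TABLE.
    private static final String[] DORIS_CLAUSES = {
            "DUPLICATE KEY", "DISTRIBUTED BY", "PROPERTIES"
    };

    // Cut the DDL at the first Doris-only clause and re-terminate the
    // statement, leaving a CREATE TABLE that MySQL-compatible engines accept.
    public static String stripDorisClauses(String ddl) {
        int cut = -1;
        for (String marker : DORIS_CLAUSES) {
            int i = ddl.indexOf(marker);
            if (i >= 0 && (cut < 0 || i < cut)) {
                cut = i;
            }
        }
        if (cut < 0) {
            return ddl; // nothing Doris-specific found
        }
        return ddl.substring(0, cut).trim() + ";";
    }
}
```

For the example_tbl DDL above, this drops everything from DUPLICATE KEY(user_id, date) onward and keeps the column definitions unchanged.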
Step 3: Initialize the project
Launch IDEA, create a new Maven project, and add the project dependencies. The pom.xml file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.mo</groupId>
<artifactId>mo-spark-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<spark.version>3.2.1</spark.version>
<java.version>8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-3.1_2.12</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>2.12.16</scalaVersion>
</configuration>
<version>2.15.1</version>
<executions>
<execution>
<id>compile-scala</id>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
<configuration>
<args>
<!--<arg>-make:transitive</arg>-->
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Step 4: Write Doris data into MatrixOne
- Write the code

Create a Doris2Mo.java class that reads data from Doris via Spark and writes it into MatrixOne:
package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.sql.SQLException;

/**
 * @author MatrixOne
 * @desc Read from Doris with Spark and write to MatrixOne over JDBC
 */
public class Doris2Mo {
    public static void main(String[] args) throws SQLException {
        SparkSession spark = SparkSession
                .builder()
                .appName("Spark Doris to MatrixOne")
                .master("local")
                .getOrCreate();

        // Read the source table through the Doris Spark connector
        Dataset<Row> df = spark.read().format("doris")
                .option("doris.table.identifier", "test.example_tbl")
                .option("doris.fenodes", "192.168.110.11:8030")
                .option("user", "root")
                .option("password", "root")
                .load();

        // JDBC properties for MatrixOne (MySQL protocol)
        java.util.Properties mysqlProperties = new java.util.Properties();
        mysqlProperties.setProperty("user", "root");
        mysqlProperties.setProperty("password", "111");
        mysqlProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver");

        // MatrixOne JDBC URL (replace xx.xx.xx.xx with your MatrixOne host)
        String mysqlUrl = "jdbc:mysql://xx.xx.xx.xx:6001/sparkdemo";

        // Write to MatrixOne
        df.write()
                .mode(SaveMode.Append)
                .jdbc(mysqlUrl, "example_tbl", mysqlProperties);
    }
}
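Since MatrixOne speaks the MySQL protocol, the usual MySQL Connector/J connection properties also apply to this JDBC write. One worth knowing for bulk loads is rewriteBatchedStatements, which lets the driver collapse batched single-row INSERTs into multi-row statements; how much it helps against MatrixOne is an assumption to verify, not a given. A sketch of such a property set (the helper class is our own):

```java
import java.util.Properties;

public class MoJdbcConfig {
    // JDBC properties for a bulk write to MatrixOne via the MySQL driver.
    // rewriteBatchedStatements=true is a Connector/J option that rewrites
    // batched INSERTs into multi-row INSERTs, usually reducing round trips.
    public static Properties writeProps(String user, String password) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        props.setProperty("driver", "com.mysql.cj.jdbc.Driver");
        props.setProperty("rewriteBatchedStatements", "true");
        return props;
    }
}
```

On the Spark side, the JDBC sink additionally accepts options such as batchsize and numPartitions on the DataFrameWriter, which control batch size and write parallelism.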
- View the results

Run the following SQL in MatrixOne to query the results:
mysql> select * from sparkdemo.example_tbl;
+---------+------------+--------+------+------+
| user_id | date       | city   | age  | sex  |
+---------+------------+--------+------+------+
|   10000 | 2017-10-01 | 北京   |   20 |    0 |
|   10000 | 2017-10-01 | 北京   |   20 |    0 |
|   10001 | 2017-10-01 | 北京   |   30 |    1 |
|   10002 | 2017-10-02 | 上海   |   20 |    1 |
|   10003 | 2017-10-02 | 广州   |   32 |    0 |
|   10004 | 2017-10-01 | 深圳   |   35 |    0 |
|   10004 | 2017-10-03 | 深圳   |   35 |    0 |
+---------+------------+--------+------+------+
7 rows in set (0.01 sec)
- Run in Spark

- Add the dependency
Package the code written in step 2 with Maven to produce:

mo-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

Copy this jar into the jars directory under the Spark installation.

- Start Spark
With the dependency in place, start Spark. Standalone mode is used here:

./sbin/start-all.sh

After startup, run the jps command to check whether it succeeded; if the Master and Worker processes appear, Spark has started:
[root@node02 jars]# jps
5990 Worker
8093 Jps
5870 Master
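If the deployment is scripted, this jps check can be automated as well; a minimal sketch, assuming the usual `pid Name` format of jps output (the class is illustrative only):

```java
public class SparkUpCheck {
    // True when both a Master and a Worker process appear in jps output.
    public static boolean clusterUp(String jpsOutput) {
        boolean master = false, worker = false;
        for (String line : jpsOutput.split("\\R")) {
            if (line.endsWith(" Master")) master = true;
            if (line.endsWith(" Worker")) worker = true;
        }
        return master && worker;
    }
}
```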
- Run the program

Go to the Spark installation directory and run the following command:
[root@node02 spark-3.2.4-bin-hadoop3.2]# bin/spark-submit --class org.example.Doris2Mo --master spark://192.168.110.247:7077 ./jars/mo-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

Where:
- --class: the main class to run
- --master: the mode in which the Spark program runs
- mo-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar: the program jar to run
Output like the following indicates a successful write:
24/04/30 10:24:53 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1261 bytes result sent to driver
24/04/30 10:24:53 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1493 ms on node02 (executor driver) (1/1)
24/04/30 10:24:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
24/04/30 10:24:53 INFO DAGScheduler: ResultStage 0 (jdbc at Doris2Mo.java:40) finished in 1.748 s
24/04/30 10:24:53 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
24/04/30 10:24:53 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
24/04/30 10:24:53 INFO DAGScheduler: Job 0 finished: jdbc at Doris2Mo.java:40, took 1.848481 s
24/04/30 10:24:53 INFO SparkContext: Invoking stop() from shutdown hook
24/04/30 10:24:53 INFO SparkUI: Stopped Spark web UI at http://node02:4040
24/04/30 10:24:53 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/04/30 10:24:53 INFO MemoryStore: MemoryStore cleared
24/04/30 10:24:53 INFO BlockManager: BlockManager stopped
24/04/30 10:24:53 INFO BlockManagerMaster: BlockManagerMaster stopped
24/04/30 10:24:53 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/04/30 10:24:53 INFO SparkContext: Successfully stopped SparkContext
24/04/30 10:24:53 INFO ShutdownHookManager: Shutdown hook called