SeaTunnel搭建部署
SeaTunnel是一个非常易用、超高性能的分布式数据集成平台,支持实时海量数据同步。
具体介绍请看官网文档:https://seatunnel.apache.org/zh-CN/docs/2.3.12/about
一、SeaTunnel-2.3.12部署
1.1 二进制包环境部署
安装Java (Java 8 或 11, 其他高于Java 8的版本理论上也可以工作) 以及设置 JAVA_HOME。
安装二进制包
官网下载地址:https://seatunnel.apache.org/download/
#export version="2.3.12" #通过终端下载一下
#wget "https://archive.apache.org/dist/seatunnel/${version}/apache-seatunnel-${version}-bin.tar.gz"
#tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"
#从2.2.0-beta版本开始,二进制包不再默认提供连接器依赖,因此在第一次使用时,需要执行以下命令来安装连接器:(当然,也可以从Apache Maven Repository 手动下载连接器:https://repo.maven.apache.org/maven2/org/apache/seatunnel/,然后将其移动至connectors/目录下,如果是2.3.5之前则需要放入connectors/seatunnel目录下)。
# sh bin/install-plugin.sh 2.3.12 #下载指定版本的连接器,然后就开始疯狂报错了

#通常情况下,你不需要所有的连接器插件。你可以通过配置config/plugin_config来指定所需的插件,
可以在${SEATUNNEL_HOME}/connectors/plugin-mapping.properties下找到所有支持的连接器和相应的plugin_config配置名称。
#下面我通过maven的方式解决上面的下载连接器报错,当然可能稍微麻烦一点,安装maven这步不再介绍
# vim ~/.m2/settings.xml
<?xml version="1.0" encoding="UTF-8"?> <settings xmlns="http://maven.apache.org/SETTINGS/1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd"> <mirrors> <!-- 阿里云公共仓库 --> <mirror> <id>aliyunmaven</id> <name>阿里云公共仓库</name> <url>https://maven.aliyun.com/repository/public</url> <mirrorOf>*</mirrorOf> <!-- 所有请求都走阿里云仓库 --> </mirror> </mirrors> </settings>
# vim /opt/soft/apache-seatunnel-2.3.12/bin/install-plugin.sh #改成使用mvn
#${SEATUNNEL_HOME}/mvnw dependency:get -Dtransitive=false -DgroupId=org.apache.seatunnel -DartifactId=${line} -Dversion=${version} -Ddest=${SEATUNNEL_HOME}/connectors
mvn dependency:get -Dtransitive=false -DgroupId=org.apache.seatunnel -DartifactId=${line} -Dversion=${version} -Ddest=${SEATUNNEL_HOME}/connectors
# sh bin/install-plugin.sh 2.3.12 #然后再次下载连接器

#上图表示下载连接器成功
# rsync -avz ~/.m2/repository/org/apache/seatunnel/ /opt/soft/apache-seatunnel-2.3.12/connectors/ >/dev/null #手工将这些连接器移到指定目录
验证一下从mysql查询数据
#确认连接器connector-jdbc、connector-doris在${SEATUNNEL_HOME}/connectors/目录下即可
#需要去https://repo1.maven.org/maven2/mysql/mysql-connector-java/下载jdbc driver jar package 驱动,并放置在 ${SEATUNNEL_HOME}/lib/目录下
# cd /opt/soft/apache-seatunnel-2.3.12/lib/
# wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar
# cd /opt/soft/apache-seatunnel-2.3.12
# mkdir job
#基本配置结构:https://seatunnel.apache.org/zh-CN/docs/2.3.12/concept/config/
# vim /opt/soft/apache-seatunnel-2.3.12/job/st.conf
env {
parallelism = 2
job.mode = "BATCH"
}
source {
#SeaTunnel插件名称建议按官方文档写为Jdbc(首字母大写);本文后面的sink示例中写成小写jdbc也能正常运行
Jdbc {
url = "jdbc:mysql://192.168.1.110:3306/seatunnel?useSSL=false&serverTimezone=UTC"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
username = "seatunnel"
password = "seatunnel"
# 保持在 MySQL 中拼接的逻辑
query = "SELECT CONCAT('id=',id,', type=',type,', role_name=',role_name,', description=',description,', create_time=',create_time,', update_time=',update_time) AS readable_output FROM seatunnel.role"
fetch_size = 1000
schema {
fields {
readable_output = "VARCHAR"
}
}
# 添加 table 配置,统一 tableId 为简短名称
table = "role"
}
}
sink {
Console {
print-header = false
encoding = "UTF-8"
fields = ["readable_output"]
# 2.3.12 版本特有的简化输出配置
format = "simple"
}
}
# cd /opt/soft/apache-seatunnel-2.3.12
# ./bin/seatunnel.sh --config ./job/st.conf -m local #跑一下我们的读取mysql的任务,输出下面的报错信息
ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228) at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40) at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40) Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a source for identifier 'Jdbc'.

#哎呀竟然报错了,啥原因我们解决一下
#SeaTunnel 作业配置中使用了 Jdbc 作为数据源(source),但当前 SeaTunnel 环境中没有安装或配置 Jdbc 源插件 #SeaTunnel 2.x+版本的插件默认存放路径:$SEATUNNEL_HOME/plugins,检查是否存在 source 目录下的seatunnel-source-jdbc文件夹
#mkdir /opt/soft/apache-seatunnel-2.3.12/plugins/source && cd /opt/soft/apache-seatunnel-2.3.12/plugins/source
#wget https://repo1.maven.org/maven2/org/apache/seatunnel/connector-jdbc/2.3.12/connector-jdbc-2.3.12.jar
# cd /opt/soft/apache-seatunnel-2.3.12
# ./bin/seatunnel.sh --config ./job/st.conf -m local #再次执行读取mysql打印到屏幕的任务,从输出结果可以看到读取了2行输出了2行
...... INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=default.default.default SeaTunnelRow#kind=INSERT : id=1, type=0, role_name=ADMIN_ROLE, description=Admin User INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=default.default.default SeaTunnelRow#kind=INSERT : id=2, type=1, role_name=NORMAL_ROLE, description=Normal User ...... Job Statistic Information *********************************************** ...... Total Time(s) : 3 Total Read Count : 2 Total Write Count : 2 Total Failed Count : 0 ***********************************************
#可以看到输出了两行内容,但是还有元数据信息并非标准输出,这是因为在2.3.12版本中,Console Sink无法完全去除元数据前缀,但通过配置优化可以简化它;核心的 “字段名 = 值" 数据已经正确输出,这部分是有效的;若需要完全干净的输出,可以将结果输出到文件再查看。
1.2 源码包安装
# mvn -version #首先mvn先安装一下
Apache Maven 3.9.11
# vim ~/.m2/settings.xml #增加华为镜像源,不然会有编译报错,因为有两个包是华为云maven仓库专属
<settings> <mirrors> <!-- 保留阿里云镜像 --> <mirror> <id>aliyunmaven</id> <mirrorOf>central</mirrorOf> <url>https://maven.aliyun.com/repository/public</url> </mirror> <!-- 添加华为云镜像 --> <mirror> <id>huaweicloud-maven</id> <mirrorOf>huaweicloud</mirrorOf> <url>https://repo.huaweicloud.com/repository/maven/</url> </mirror> </mirrors> <profiles> <profile> <id>huaweicloud</id> <repositories> <repository> <id>huaweicloud-maven</id> <url>https://repo.huaweicloud.com/repository/maven/</url> <releases><enabled>true</enabled></releases> <snapshots><enabled>false</enabled></snapshots> </repository> </repositories> </profile> </profiles> <activeProfiles> <activeProfile>huaweicloud</activeProfile> </activeProfiles> </settings>
# wget https://dlcdn.apache.org/seatunnel/2.3.12/apache-seatunnel-2.3.12-src.tar.gz
# tar xf apache-seatunnel-2.3.12-src.tar.gz
# cd apache-seatunnel-2.3.12-src
# mvn clean install -DskipTests -Dskip.spotless=true
#上面那个阿里云源的配置记得配置上啊,如果一开始执行就报错的话可以执行mvn spotless:apply修复一下

#到上面这个界面就表示编译成功了
# cp seatunnel-dist/target/apache-seatunnel-2.3.12-bin.tar.gz /opt/soft/
# cd /opt/soft
# tar xf apache-seatunnel-2.3.12-bin.tar.gz
# cd apache-seatunnel-2.3.12 #源码构建所有的连接器插件和一些必要的依赖都包含在二进制包中。可以直接使用连接器插件,而无需单独安装它们。
# 可以用我们上面的mysql驱动器的测试命令测试一下,没有任何报错
# sh bin/seatunnel-cluster.sh start --daemon #后台启动下服务
博文来自:www.51niux.com
二、apache-seatunnel-web部署
2.1 mysql5.7安装
#apache-seatunnel-web需要mysql5.7+,这里也记录一下编译过程吧
#cd /opt/soft
# wget https://github.com/Kitware/CMake/releases/download/v3.31.10/cmake-3.31.10.tar.gz
# tar xf cmake-3.31.10.tar.gz
# cd cmake-3.31.10
# ./bootstrap
# gmake && gmake install
# /usr/local/bin/cmake -version
cmake version 3.31.10
# cd /opt/soft
# wget https://archives.boost.io/release/1.59.0/source/boost_1_59_0.tar.gz
# tar xf boost_1_59_0.tar.gz
# wget https://dev.mysql.com/get/Downloads/MySQL-5.7/mysql-5.7.44.tar.gz
# tar xf mysql-5.7.44.tar.gz
# cd mysql-5.7.44
# /usr/local/bin/cmake -DCMAKE_INSTALL_PREFIX=/opt/soft/mysql -DMYSQL_DATADIR=/opt/soft/mysql/data -DSYSCONFDIR=/etc -DWITH_INNOBASE_STORAGE_ENGINE=1 -DWITH_PARTITION_STORAGE_ENGINE=1 -DWITH_FEDERATED_STORAGE_ENGINE=1 -DWITH_BLACKHOLE_STORAGE_ENGINE=1 -DWITH_MYISAM_STORAGE_ENGINE=1 -DENABLED_LOCAL_INFILE=1 -DENABLE_DTRACE=0 -DDEFAULT_CHARSET=utf8mb4 -DDEFAULT_COLLATION=utf8mb4_general_ci -DWITH_EMBEDDED_SERVER=1 -DDOWNLOAD_BOOST=1 -DFORCE_INSOURCE_BUILD=1 -DWITHOUT_PARTITION_STORAGE_ENGINE=0 -DCMAKE_C_COMPILER=/usr/bin/gcc -DCMAKE_CXX_COMPILER=/usr/bin/g++ -DWITH_BOOST=/opt/soft/boost_1_59_0 -DDOWNLOAD_BOOST=0
# make -j 4
# make install
# useradd mysql -s /sbin/nologin -M
# cp support-files/mysql.server /etc/init.d/mysqld
# chmod +x /etc/init.d/mysqld
# mkdir /opt/soft/mysql/data
# chown mysql:mysql /opt/soft/mysql/data
# mkdir /opt/soft/mysql/log
# chown mysql:mysql /opt/soft/mysql/log
# /opt/soft/mysql/bin/mysqld --initialize --user=mysql --basedir=/opt/soft/mysql --datadir=/opt/soft/mysql/data
# vim /etc/my.cnf
[client]
port=3306
socket=/tmp/mysql.sock
default-character-set=utf8mb4

[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW
#skip-grant-tables
port=3306
user=mysql
max_connections=200
socket=/tmp/mysql.sock
basedir=/opt/soft/mysql
datadir=/opt/soft/mysql/data
pid-file=/opt/soft/mysql/log/mysql.pid
character-set-client-handshake=FALSE
character-set-server=utf8mb4
collation-server=utf8mb4_bin
init_connect='SET NAMES utf8mb4'
default-storage-engine=INNODB
log_error=/opt/soft/mysql/log/mysql-error.log
slow_query_log_file=/opt/soft/mysql/log/mysql-slow.log
expire_logs_days=3

[mysqldump]
quick
max_allowed_packet=16M
#touch /opt/soft/mysql/log/mysql-error.log
#chown mysql:mysql /opt/soft/mysql/log/mysql-error.log
#/etc/init.d/mysqld start
2.2 二进制包安装
# wget https://dlcdn.apache.org/seatunnel/seatunnel-web/1.0.2/apache-seatunnel-web-1.0.2-bin.tar.gz
# tar xf apache-seatunnel-web-1.0.2-bin.tar.gz
# vim conf/application.yml
datasource下面的数据库信息修改成实际的.jwt下面的secretKey要填一个比如mysecretkeyforseatunnelweb2025111
# cp /opt/soft/apache-seatunnel-2.3.8/config/hazelcast-client.yaml /opt/soft/apache-seatunnel-web-1.0.2/conf/
# cp /opt/soft/apache-seatunnel-2.3.8/connectors/plugin-mapping.properties /opt/soft/apache-seatunnel-web-1.0.2/conf/
# vim script/seatunnel_server_env.sh #修改成实际的数据库信息
# sh /opt/soft/apache-seatunnel-web-1.0.2/script/init_sql.sh #进行数据库表的初始化,需要的mysql版本是5.7+
# cd /opt/soft/apache-seatunnel-web-1.0.2/libs/ && wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar #不做这步操作的话启动日志会报数据库连接器报错:Caused by: java.lang.IllegalStateException: Cannot load driver class: com.mysql.cj.jdbc.Driver

# cd /opt/soft/apache-seatunnel-web-1.0.2
# vim bin/seatunnel-backend-daemon.sh #加一条环境变量,不然启动报错找不到seatunnel
export SEATUNNEL_HOME=/opt/soft/apache-seatunnel-2.3.8
# sh bin/seatunnel-backend-daemon.sh start
#web端口为8801,访问下web页面发现有一个报错

查看下报错日志,大概意思是少了datasource:
ERROR [tr:,sp:] [qtp546936087-21] [GlobalExceptionHandler.logError():78] - No supported data source found org.apache.seatunnel.datasource.exception.DataSourceSDKException: No supported data source found at org.apache.seatunnel.datasource.AbstractDataSourceClient.<init>(AbstractDataSourceClient.java:107) at org.apache.seatunnel.datasource.DataSourceClient.<init>(DataSourceClient.java:26) at org.apache.seatunnel.app.thirdparty.datasource.DataSourceClientFactory.getDataSourceClient(DataSourceClientFactory.java:32) at org.apache.seatunnel.app.service.impl.DatasourceServiceImpl.queryAllDatasourcesGroupByType(DatasourceServiceImpl.java:478) at org.apache.seatunnel.app.controller.SeatunnelDatasourceController.getSupportDatasources(SeatunnelDatasourceController.java:302) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) ......
有两种方法解决上面的报错:
第一种手工方式:
# mkdir datasource
# cd datasource
# https://repo.maven.apache.org/maven2/org/apache/seatunnel/ #去这里把对应版本的jar包手工下载下来

第二种方式就是用脚本:
# vim bin/download_datasource.sh #还是那个mvnw命令那个ssl证书报错我没有解决所以换成mvn的方式了
#"$SEATUNNEL_WEB_HOME"/mvnw dependency:get -DgroupId=org.apache.seatunnel -DartifactId="$i" -Dversion="$version" -Ddest="$DATASOURCE_DIR"
mvn dependency:get -DgroupId=org.apache.seatunnel -DartifactId="$i" -Dversion="$version" -Ddest="$DATASOURCE_DIR"
# sh bin/download_datasource.sh 1.0.2 #搞不懂web-1.0.2脚本里面的version=1.0.0,要么修改脚本变量要么传参
# for num in `find ~/.m2/repository/org/apache/seatunnel/|grep datasource|grep '.jar$'|grep "1.0.2"`;do rsync -avz $num datasource/;done
做了上述操作后,我们重启下apache-seatunnel-web-1.0.2服务再来查看下页面:

#从上面的截图可以看到数据源页面不再报错了,当然也可以再查下日志没有异常报错了。
2.3 源码包安装
#如果有条件的话还是建议进行源码包安装,安装出来的插件都比较全,比二进制包各种报错然后花精力解决要好
#https://github.com/apache/seatunnel-web #git地址

#官网给出的版本对应表,保险起见还是用2.3.8吧,我看web的libs目录下面关于seatunnel的都是2.3.8的
#wget https://downloads.apache.org/seatunnel/seatunnel-web/1.0.2/apache-seatunnel-web-1.0.2-src.tar.gz
#tar xf apache-seatunnel-web-1.0.2-src.tar.gz
#cd apache-seatunnel-web-1.0.2-src
# vim build.sh
#/bin/sh $WORKDIR/mvnw clean package -DskipTests -Pci
mvn clean package -DskipTests -Pci
#mvn spotless:apply
# sh build.sh code #正常没有报错的话二进制包就打包了

# cp seatunnel-web-dist/target/apache-seatunnel-web-1.0.2.tar.gz /opt/soft/
# cd /opt/soft/
# tar xf apache-seatunnel-web-1.0.2.tar.gz
# vim conf/application.yml
datasource下面的数据库信息修改成实际的.jwt下面的secretKey要填一个比如mysecretkeyforseatunnelweb2025111
# cp /opt/soft/apache-seatunnel-2.3.8/config/hazelcast-client.yaml /opt/soft/apache-seatunnel-web-1.0.2/conf/
# cp /opt/soft/apache-seatunnel-2.3.8/connectors/plugin-mapping.properties /opt/soft/apache-seatunnel-web-1.0.2/conf/
# vim script/seatunnel_server_env.sh #修改成实际的数据库信息
# sh /opt/soft/apache-seatunnel-web-1.0.2/script/init_sql.sh #进行数据库表的初始化
# vim bin/seatunnel-backend-daemon.sh #加一条环境变量,不然启动报错找不到seatunnel
export SEATUNNEL_HOME=/opt/soft/apache-seatunnel-2.3.8
# sh bin/seatunnel-backend-daemon.sh start
博文来自:www.51niux.com
三、数据库表同步的一些简单示例
#现在基础环境搭建完毕了,现在让我们来一些简单的数据同步
3.1 命令行单表同步
#先来一个最简单的示例,从A库同步完整表到B库
# vim job/test.conf
# 定义运行时环境
env {
parallelism = 4
job.mode = "BATCH"
}
source{
Jdbc {
url = "jdbc:mysql://192.168.1.110:3306/seatunnel?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
username = "seatunnel"
password = "seatunnel"
query = "select * from role"
}
}
transform {
# 如果您想了解更多关于如何配置 SeaTunnel 的信息,并查看完整的转换插件列表,
# 请访问 https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
jdbc {
url = "jdbc:mysql://192.168.1.111:3306/seatunnel_bak?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
username = "seatunnel"
password = "seatunnel"
database = "seatunnel_bak"
table = "role"
generate_sink_sql = true #自动生成插入语句
batch_insert = true # 开启批量插入(提升性能)
batch_size = 1000 # 批量大小(可选,默认500)
}
#如果你想了解更多关于如何配置seatunnel的信息,并查看完整的sink插件列表,
#请前往https://seatunnel.apache.org/docs/connector-v2/sink
}
#特别注意目标数据库是要存在的,不然数据同步不过去会有报错提示:Caused by: java.sql.SQLSyntaxErrorException: Unknown database 'seatunnel_bak'
# ./bin/seatunnel.sh --config ./job/test.conf -m local

#从上图的数据库插入情况来看,也可以看出来我们只要库存在,表就能自动创建并自动插入数据
第一点:仅同步数据不同步表结构
好了,那么有一个问题,如果再执行一遍这个脚本会怎么样呢?

#从上图可以看出同样的数据重复insert插入了,明明源表id是主键啊,但是sink表没有这个主键的设置才会这样。Seatunnel JDBC Sink 的核心特性 ——Seatunnel 仅负责 “数据同步”,不负责 “表结构同步 / 创建”,包括主键、索引、字段类型、约束等表结构信息,都不会自动同步 / 创建,这是导致主键未同步的根本原因。就用推荐方法提前手工创建目标表。
#这时候你执行脚本第一遍可以,第二遍就会报错:
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY'
#直接说结论不贴图片了,一旦设置了主键,只要主键冲突,后面的insert插入也就不会再执行,也就是说这种同步操作适用于新表的全量同步
第二点:绕过主键冲突报错,实现增删改效果
#vim job/test.conf #注意不仅是追加插入数据哦可自行测试效果
# 定义运行时环境
env {
parallelism = 4
job.mode = "BATCH"
}
source {
Jdbc {
url = "jdbc:mysql://192.168.1.110:3306/seatunnel?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
username = "seatunnel"
password = "seatunnel"
query = "select * from role"
}
}
transform {
# 无需转换
}
sink {
jdbc {
url = "jdbc:mysql://192.168.1.111:3306/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
username = "seatunnel"
password = "seatunnel"
database = "seatunnel"
table = "role"
# 核心:主键冲突时跳过,不报错
skip_on_conflict = true
# 指定主键字段(需与目标表主键一致)
primary_keys = ["id"]
# 保留批量插入提升性能
generate_sink_sql = true
batch_insert = true
batch_size = 1000
# 可选:开启错误重试(进一步避免偶发报错)
max_retries = 3
}
}
#剩下的多表,指定字段同步就不写示例了,网上一查就查到了
博文来自:www.51niux.com
四、Mysql CDC模式
#前面介绍的都是命令行模式,如果你不需要实时数据同步可以采用上面那种定时任务的方式,但是如果想要类似于主从数据同步那种方式就要用到CDC模式了
#MySQL CDC连接器允许从MySQL数据库读取快照数据和增量数据。
4.1 创建mysql用户
#在源库创建有指定权限的用户
mysql> CREATE USER 'source-cdc'@'%' IDENTIFIED BY 'source-cdc';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'source-cdc'@'%' IDENTIFIED BY 'source-cdc';
#SELECT是查询权限、RELOAD重载/刷新权限(允许执行FLUSH类命令如FLUSH PRIVILEGES、FLUSH BINARY LOGS,CDC 需刷新binlog元信息)
#SHOW DATABASES是查询数据库列表权限,REPLICATION SLAVE是复制从库权限(允许用户模拟MySQL从库,读取主库的binlog,CDC核心获取数据变更日志)
#REPLICATION CLIENT是复制客户端权限,允许执行SHOW MASTER STATUS/SHOW SLAVE STATUS,获取binlog位点(文件名+偏移量),CDC 需定位同步起点。
mysql> FLUSH PRIVILEGES;
4.2 启动mysql的binlog
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name            | Value          |
+--------------------------+----------------+
| binlog_format            | ROW            |
| binlog_row_image         | FULL           |
| enforce_gtid_consistency | ON             |
| gtid_mode                | ON             |
| log_bin                  | ON             |
+--------------------------+----------------+
5 rows in set (0.00 sec)
#如果与上述结果不一致,请使用以下属性配置MySQL服务器配置文件($MYSQL_HOME/mysql.cnf),如下所示:
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 10
binlog_format = row
# mysql 5.6+ requires binlog_row_image to be set to FULL
binlog_row_image = FULL
# enable gtid mode
# mysql 5.6+ requires gtid_mode to be set to ON
gtid_mode = on
enforce_gtid_consistency = on
#上面只是官方的实例,server-id没必要一致啊
# /etc/init.d/mysqld restart
4.3 记录下相关参数
#官方文档:https://seatunnel.apache.org/zh-CN/docs/2.3.12/connector-v2/source/MySQL-CDC
下载相关依赖包,不然会启动报错:
# ls plugins/*.jar #有下面三个jar包就行
plugins/connector-cdc-base-2.3.12.jar plugins/connector-cdc-mysql-2.3.12.jar plugins/connector-jdbc-2.3.12.jar
先来一个-m local方式的数据同步:
#vim job/cdc_test.conf
env {
#本地模式仅支持单并行度
parallelism = 1
#流式同步模式(CDC必需)
job.mode = "STREAMING"
#检查点间隔
checkpoint.interval = 10000
}
source {
MySQL-CDC {
url = "jdbc:mysql://192.168.1.110:3306/seatunnel?useSSL=false&allowPublicKeyRetrieval=true"
#CDC同步账号(需有binlog读取权限)
username = "source-cdc"
password = "source-cdc"
table-names = ["seatunnel.role"]
#唯一server-id(避免与MySQL自身server-id冲突)
server-id = 5401
#启动模式:先全量同步,再增量同步
startup.mode = "initial"
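#可选:指定binlog起始位置(避免找不到binlog,需替换为你的实际值)
#先执行 show master status; 获取File和Position后填写
#注意:按官方文档,起始位点参数仅在startup.mode = "specific"时生效,且参数名为startup.specific-offset.file / startup.specific-offset.pos;当前startup.mode为"initial",下面两行不会起作用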
startup.specific.offset.file = "mysql-bin.000001"
startup.specific.offset.pos = 4
}
}
sink {
jdbc {
url = "jdbc:mysql://192.168.1.111:3306/seatunnel?useSSL=false"
driver = "com.mysql.cj.jdbc.Driver"
username = "seatunnel"
password = "seatunnel"
#自动生成写入SQL
generate_sink_sql = true
database = "seatunnel"
#目标表名
table = "role"
# 键字段(保障更新/删除同步)
primary_keys = ["id"]
#关闭XA事务(2.3.12本地模式XA有Bug)
is_exactly_once = false
#关闭批量写入(避免静默失败)
batch_size = 1
#开启调试日志,排查错误的时候使用
#log.enabled = true
#log.level = "DEBUG"
#retry_times = 3
#retry_interval = 1000
}
}
# ./bin/seatunnel.sh --config job/cdc_test.conf -m local #这是一个常驻进程了就不是一次性的了,但是是前台启动,可以放到后台并将输出写入到一个日志文件
#如果目的库跟源库数据不一致,只在第一次启动的时候会把目标库的数据更新过来,这个进程常态的可以实现源库数据往目标库的插入和删除和update动作触发同步。
#注意这种方式用# bin/seatunnel.sh --list是查不到在运行任务的,只有使用集群模式才可以
再来2.3.12版本的cdc调用集群方式表同步:
#这就需要#bin/seatunnel-cluster.sh start --daemon 启动服务并确保5801端口是启动的
#vim job/cluster_cdc_job.conf
env {
#集群并行度
execution.parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 10000
#Engine集群配置
seatunnel.engine.job.mode = "cluster"
seatunnel.engine.server.address = "127.0.0.1:5801"
seatunnel.engine.job.heartbeat.interval = 30000
#自定义Job Name(支持中文/英文,建议包含业务+表名)
job.name = "MySQL_CDC_同步_seatunnel_role表"
#任务失败自动重启(避免变成CANCELED)
seatunnel.engine.job.restart.count = 3
seatunnel.engine.job.restart.interval = 5000
#任务超时时间(避免长时间卡壳)
seatunnel.engine.job.timeout = 3600000
}
source {
MySQL-CDC {
url = "jdbc:mysql://192.168.1.110:3306/seatunnel?useSSL=false&allowPublicKeyRetrieval=true"
username = "source-cdc"
password = "source-cdc"
#数组格式表名(集群模式不支持单字符串)
table-names = ["seatunnel.role"]
#集群内唯一(避免与其他节点冲突)
server-id = 5401
#首次启动全量+后续增量同步
startup.mode = "initial"
#集群模式优化:增大binlog读取缓存
debezium.connector.properties.max.queue.size = 8192
debezium.connector.properties.max.batch.size = 2048
}
}
sink {
jdbc {
url = "jdbc:mysql://192.168.1.111:3306/seatunnel?useSSL=false"
driver = "com.mysql.cj.jdbc.Driver"
username = "seatunnel"
password = "seatunnel"
generate_sink_sql = true
database = "seatunnel"
table = "role"
primary_keys = ["id"]
#集群模式关闭XA(避免分布式事务问题)
is_exactly_once = false
#集群模式恢复批量(提升性能)
batch_size = 50
#批量刷新间隔
batch_flush_interval = 1000
#集群模式重试
retry_times = 5
retry_interval = 2000
#全字段同步
upsert_all_fields = true
#连接池(集群模式避免连接耗尽)
connection_pool {
type = "hikari"
hikaricp {
maximum-pool-size = 10
minimum-idle = 2
connection-timeout = 30000
}
}
}
}
# ./bin/seatunnel.sh --config job/cluster_cdc_job.conf -m cluster #使用集群模式启动服务,一样首次启动的时候会把目标库的数据更新到跟源库表一致
# bin/seatunnel.sh --list #查看在运行的任务

再来2.3.8版本的cdc调用集群方式表同步:
# vim job/cluster_cdc_job.conf
env {
#并行度
execution.parallelism = 2
#流模式(CDC必须用STREAMING)
job.mode = "STREAMING"
#检查点间隔
checkpoint.interval = 10000
#集群模式相关
seatunnel.engine.job.mode = "cluster"
seatunnel.engine.server.address = "127.0.0.1:5801"
seatunnel.engine.job.heartbeat.interval = 30000
#任务名称
job.name = "MySQL_CDC_同步_seatunnel_role表"
#失败重启配置
seatunnel.engine.job.restart.count = 3
seatunnel.engine.job.restart.interval = 5000
seatunnel.engine.job.timeout = 3600000
#基础配置(避免类加载冲突)
job.type = "STREAM"
checkpoint.storage = "MEMORY"
}
source {
MySQL-CDC {
base-url = "jdbc:mysql://192.168.1.110:3306/seatunnel?serverTimezone=Asia/Shanghai&useSSL=false"
username = "source-cdc"
password = "source-cdc"
database-name = "seatunnel"
table-names = ["seatunnel.role"]
#server-id必须唯一(避免和其他CDC任务冲突,建议范围5000-65535)
server-id = 5401
#启动模式:initial(全量+增量)、latest-offset(仅增量)
startup.mode = "initial"
#时区
server-time-zone = "Asia/Shanghai"
#binlog消费性能配置
max.queue.size = 8192
max.batch.size = 2048
#批量读取超时
poll.interval.ms = 1000
}
}
sink {
jdbc {
url = "jdbc:mysql://192.168.1.111:3306/seatunnel"
driver = "com.mysql.cj.jdbc.Driver"
user = "seatunnel"
password = "seatunnel"
generate_sink_sql = true
database = seatunnel
table = role
primary_keys = ["id"]
}
}
# ./bin/seatunnel.sh --cluster seatunnel --config job/cluster_cdc_job.conf #注意--cluster seatunnel 就是对应hazelcast.yaml中的cluster-name
#官网中文文档讲的很清楚,详细的可参照官网文档,集群的模式啊监控啊之类的可以看官方文档: https://seatunnel.apache.org/zh-CN/docs/2.3.12/seatunnel-engine/about
