柴少的官方网站 技术在学习中进步,水平在分享中升华

SeaTunnel搭建部署

SeaTunnel原名Waterdrop(水滴),2021年10月更名为SeaTunnel,2021年12月9日加入Apache孵化,2023年6月1日毕业成为Apache顶级项目。

SeaTunnel是一个非常易用、超高性能的分布式数据集成平台,支持实时海量数据同步。

具体介绍请看官网文档:https://seatunnel.apache.org/zh-CN/docs/2.3.12/about

一、SeaTunnel-2.3.12部署

1.1 二进制包环境部署

安装Java (Java 8 或 11, 其他高于Java 8的版本理论上也可以工作) 以及设置 JAVA_HOME。

安装二进制包

官网下载地址:https://seatunnel.apache.org/download/

#export version="2.3.12"   #通过终端下载一下

#wget "https://archive.apache.org/dist/seatunnel/${version}/apache-seatunnel-${version}-bin.tar.gz"

#tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"

下载连接器插件

#从2.2.0-beta版本开始,二进制包不再默认提供连接器依赖,因此在第一次使用时,需要执行以下命令来安装连接器:(当然,也可以从Apache Maven Repository 手动下载连接器:https://repo.maven.apache.org/maven2/org/apache/seatunnel/,然后将其移动至connectors/目录下,如果是2.3.5之前则需要放入connectors/seatunnel目录下)。

# sh bin/install-plugin.sh 2.3.12  #下载指定版本的连接器,然后就开始疯狂报错了

image.png

#通常情况下,你不需要所有的连接器插件。你可以通过配置config/plugin_config来指定所需的插件,

可以在${SEATUNNEL_HOME}/connectors/plugins-mapping.properties下找到所有支持的连接器和相应的plugin_config配置名称。

#下面我通过maven的方式解决上面的下载连接器报错,当然可能稍微麻烦一点,安装maven这步不再介绍

# vim ~/.m2/settings.xml 

<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
  <mirrors>
    <!-- 阿里云公共仓库 -->
    <mirror>
      <id>aliyunmaven</id>
      <name>阿里云公共仓库</name>
      <url>https://maven.aliyun.com/repository/public</url>
      <mirrorOf>*</mirrorOf> <!-- 所有请求都走阿里云仓库 -->
    </mirror>
  </mirrors>
</settings>

# vim /opt/soft/apache-seatunnel-2.3.12/bin/install-plugin.sh  #改成使用mvn

#${SEATUNNEL_HOME}/mvnw dependency:get -Dtransitive=false -DgroupId=org.apache.seatunnel -DartifactId=${line} -Dversion=${version} -Ddest=${SEATUNNEL_HOME}/connectors
mvn dependency:get -Dtransitive=false -DgroupId=org.apache.seatunnel -DartifactId=${line} -Dversion=${version} -Ddest=${SEATUNNEL_HOME}/connectors

# sh bin/install-plugin.sh 2.3.12  #然后再次下载连接器

image.png

#上图表示下载连接器成功

# rsync -avz ~/.m2/repository/org/apache/seatunnel/ /opt/soft/apache-seatunnel-2.3.12/connectors/ >/dev/null  #手工将这些连接器移到指定目录

验证一下从mysql查询数据

#确认连接器connector-jdbc、connector-doris在${SEATUNNEL_HOME}/connectors/目录下即可

#需要去https://repo1.maven.org/maven2/mysql/mysql-connector-java/下载jdbc driver jar package 驱动,并放置在 ${SEATUNNEL_HOME}/lib/目录下

# cd /opt/soft/apache-seatunnel-2.3.12/lib/

# wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar

# cd /opt/soft/apache-seatunnel-2.3.12

# mkdir job

#基本配置结构:https://seatunnel.apache.org/zh-CN/docs/2.3.12/concept/config/

# vim /opt/soft/apache-seatunnel-2.3.12/job/st.conf

env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  #SeaTunnel插件名称区分大小写,必须严格写为Jdbc(首字母大写),如果写成小写jdbc或全大写JDBC,也会导致插件找不到
  Jdbc {
    url = "jdbc:mysql://192.168.1.110:3306/seatunnel?useSSL=false&serverTimezone=UTC"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    username = "seatunnel"
    password = "seatunnel"
    # 保持在 MySQL 中拼接的逻辑
    query = "SELECT CONCAT('id=',id,', type=',type,', role_name=',role_name,', description=',description,', create_time=',create_time,', update_time=',update_time) AS readable_output FROM seatunnel.role"
    fetch_size = 1000
    schema {
      fields {
        readable_output = "VARCHAR"
      }
    }
    # 添加 table 配置,统一 tableId 为简短名称
    table = "role"
  }
}

sink {
  Console {
    print-header = false
    encoding = "UTF-8"
    fields = ["readable_output"]
    # 2.3.12 版本特有的简化输出配置
    format = "simple"
  }
}

# cd /opt/soft/apache-seatunnel-2.3.12

# ./bin/seatunnel.sh --config ./job/st.conf -m local  #跑一下我们的读取mysql的任务,输出下面的报错信息

ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a source for identifier 'Jdbc'.

image.png

#哎呀竟然报错了,啥原因我们解决一下

#SeaTunnel 作业配置中使用了 Jdbc 作为数据源(source),但当前 SeaTunnel 环境中没有安装或配置 Jdbc 源插件
#SeaTunnel 2.x+版本的插件默认存放路径:$SEATUNNEL_HOME/plugins,检查是否存在 source 目录下的seatunnel-source-jdbc文件夹

#mkdir /opt/soft/apache-seatunnel-2.3.12/plugins/source   && cd  /opt/soft/apache-seatunnel-2.3.12/plugins/source

#wget https://repo1.maven.org/maven2/org/apache/seatunnel/connector-jdbc/2.3.12/connector-jdbc-2.3.12.jar

# cd /opt/soft/apache-seatunnel-2.3.12

# ./bin/seatunnel.sh --config ./job/st.conf -m local  #再次执行读取mysql打印到屏幕的任务,从输出结果可以看到读取了2行输出了2行

......
INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId=default.default.default SeaTunnelRow#kind=INSERT : id=1, type=0, role_name=ADMIN_ROLE, description=Admin User
INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=2:  SeaTunnelRow#tableId=default.default.default SeaTunnelRow#kind=INSERT : id=2, type=1, role_name=NORMAL_ROLE, description=Normal User
......
Job Statistic Information
***********************************************
......
Total Time(s)             :                   3
Total Read Count          :                   2
Total Write Count         :                   2
Total Failed Count        :                   0
***********************************************

#可以看到输出了两行内容,但是还有元数据信息并非标准输出,这是因为在2.3.12版本中,Console Sink无法完全去除元数据前缀,但通过配置优化可以简化它;核心的 “字段名 = 值" 数据已经正确输出,这部分是有效的;若需要完全干净的输出,可以将结果输出到文件再查看。

1.2 源码包安装

# mvn -version   #首先mvn先安装一下

Apache Maven 3.9.11

# vim ~/.m2/settings.xml  #增加华为镜像源,不然会有编译报错,因为有两个包是华为云maven仓库专属

<settings>
    <mirrors>
        <!-- 保留阿里云镜像 -->
        <mirror>
            <id>aliyunmaven</id>
            <mirrorOf>central</mirrorOf>
            <url>https://maven.aliyun.com/repository/public</url>
        </mirror>
        <!-- 添加华为云镜像 -->
        <mirror>
            <id>huaweicloud-maven</id>
            <mirrorOf>huaweicloud</mirrorOf>
            <url>https://repo.huaweicloud.com/repository/maven/</url>
        </mirror>
    </mirrors>
    <profiles>
        <profile>
            <id>huaweicloud</id>
            <repositories>
                <repository>
                    <id>huaweicloud-maven</id>
                    <url>https://repo.huaweicloud.com/repository/maven/</url>
                    <releases><enabled>true</enabled></releases>
                    <snapshots><enabled>false</enabled></snapshots>
                </repository>
            </repositories>
        </profile>
    </profiles>
    <activeProfiles>
        <activeProfile>huaweicloud</activeProfile>
    </activeProfiles>
</settings>

# wget https://dlcdn.apache.org/seatunnel/2.3.12/apache-seatunnel-2.3.12-src.tar.gz

# tar xf apache-seatunnel-2.3.12-src.tar.gz

# cd apache-seatunnel-2.3.12-src

# mvn clean install -DskipTests -Dskip.spotless=true

 #上面那个阿里云源的配置记得配置上啊,如果一开始执行就报错的话可以执行mvn spotless:apply修复一下

image.png

#到上面这个界面就表示编译成功了

# cp seatunnel-dist/target/apache-seatunnel-2.3.12-bin.tar.gz /opt/soft/

# cd /opt/soft

# tar xf apache-seatunnel-2.3.12-bin.tar.gz

# cd apache-seatunnel-2.3.12  #源码构建所有的连接器插件和一些必要的依赖都包含在二进制包中。可以直接使用连接器插件,而无需单独安装它们。

# 可以用我们上面的mysql驱动器的测试命令测试一下,没有任何报错

# sh bin/seatunnel-cluster.sh start --daemon  #后台启动下服务

博文来自:www.51niux.com

二、apache-seatunnel-web部署

2.1 mysql5.7安装

#apache-seatunnel-web需要mysql5.7+,这里也记录一下编译过程吧

#cd /opt/soft

# wget https://github.com/Kitware/CMake/releases/download/v3.31.10/cmake-3.31.10.tar.gz

# tar xf cmake-3.31.10.tar.gz 

# cd cmake-3.31.10

# ./bootstrap

# gmake && gmake install

# /usr/local/bin/cmake -version

cmake version 3.31.10

# cd /opt/soft

# wget https://archives.boost.io/release/1.59.0/source/boost_1_59_0.tar.gz

# tar xf boost_1_59_0.tar.gz

# wget https://dev.mysql.com/get/Downloads/MySQL-5.7/mysql-5.7.44.tar.gz

# tar xf mysql-5.7.44.tar.gz

# cd mysql-5.7.44

# /usr/local/bin/cmake -DCMAKE_INSTALL_PREFIX=/opt/soft/mysql -DMYSQL_DATADIR=/opt/soft/mysql/data -DSYSCONFDIR=/etc -DWITH_INNOBASE_STORAGE_ENGINE=1 -DWITH_PARTITION_STORAGE_ENGINE=1 -DWITH_FEDERATED_STORAGE_ENGINE=1 -DWITH_BLACKHOLE_STORAGE_ENGINE=1 -DWITH_MYISAM_STORAGE_ENGINE=1 -DENABLED_LOCAL_INFILE=1 -DENABLE_DTRACE=0 -DDEFAULT_CHARSET=utf8mb4 -DDEFAULT_COLLATION=utf8mb4_general_ci -DWITH_EMBEDDED_SERVER=1 -DDOWNLOAD_BOOST=1 -DFORCE_INSOURCE_BUILD=1 -DWITHOUT_PARTITION_STORAGE_ENGINE=0 -DCMAKE_C_COMPILER=/usr/bin/gcc -DCMAKE_CXX_COMPILER=/usr/bin/g++ -DWITH_BOOST=/opt/soft/boost_1_59_0 -DDOWNLOAD_BOOST=0

# make -j 4

# make install

# useradd mysql  -s /sbin/nologin -M

# cp support-files/mysql.server /etc/init.d/mysqld

# chmod +x /etc/init.d/mysqld

# mkdir /opt/soft/mysql/data

# chown mysql:mysql /opt/soft/mysql/data

# mkdir /opt/soft/mysql/log

# chown mysql:mysql /opt/soft/mysql/log

# /opt/soft/mysql/bin/mysqld --initialize --user=mysql --basedir=/opt/soft/mysql --datadir=/opt/soft/mysql/data

# vim /etc/my.cnf

[client]
port=3306
socket=/tmp/mysql.sock
default-character-set=utf8mb4
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW
#skip-grant-tables
port=3306
user=mysql
max_connections=200
socket=/tmp/mysql.sock
basedir=/opt/soft/mysql
datadir=/opt/soft/mysql/data
pid-file=/opt/soft/mysql/log/mysql.pid
character-set-client-handshake=FALSE
character-set-server=utf8mb4
collation-server=utf8mb4_bin
init_connect='SET NAMES utf8mb4'
default-storage-engine=INNODB
log_error=/opt/soft/mysql/log/mysql-error.log
slow_query_log_file=/opt/soft/mysql/log/mysql-slow.log
expire_logs_days=3
[mysqldump]
quick
max_allowed_packet=16M

#touch  /opt/soft/mysql/log/mysql-error.log

#chown mysql:mysql /opt/soft/mysql/log/mysql-error.log

#/etc/init.d/mysqld start

2.2 二进制包安装

# wget https://dlcdn.apache.org/seatunnel/seatunnel-web/1.0.2/apache-seatunnel-web-1.0.2-bin.tar.gz

# tar xf apache-seatunnel-web-1.0.2-bin.tar.gz

# vim conf/application.yml

datasource下面的数据库信息修改成实际的.jwt下面的secretKey要填一个比如mysecretkeyforseatunnelweb2025111

# cp /opt/soft/apache-seatunnel-2.3.8/config/hazelcast-client.yaml  /opt/soft/apache-seatunnel-web-1.0.2/conf/

# cp /opt/soft/apache-seatunnel-2.3.8/connectors/plugin-mapping.properties  /opt/soft/apache-seatunnel-web-1.0.2/conf/

# vim script/seatunnel_server_env.sh  #修改成实际的数据库信息

# sh /opt/soft/apache-seatunnel-web-1.0.2/script/init_sql.sh  #进行数据库表的初始化,需要的mysql版本是5.7+

# cd /opt/soft/apache-seatunnel-web-1.0.2/libs/ &&  wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar  #不做这步操作的话启动日志会报数据库连接器报错:Caused by: java.lang.IllegalStateException: Cannot load driver class: com.mysql.cj.jdbc.Driver

image.png

# cd /opt/soft/apache-seatunnel-web-1.0.2

# vim bin/seatunnel-backend-daemon.sh   #加一条环境变量,不然启动报错找不到seatunnel

set -
SEATUNNEL_HOME=/opt/soft/apache-seatunnel-2.3.8

# sh bin/seatunnel-backend-daemon.sh start

#we端口为8801,访问下web页面发现有一个报错 

image.png

查看下报错日志,大概意思是少了datasource:

ERROR [tr:,sp:] [qtp546936087-21] [GlobalExceptionHandler.logError():78] - No supported data source found
org.apache.seatunnel.datasource.exception.DataSourceSDKException: No supported data source found
	at org.apache.seatunnel.datasource.AbstractDataSourceClient.<init>(AbstractDataSourceClient.java:107)
	at org.apache.seatunnel.datasource.DataSourceClient.<init>(DataSourceClient.java:26)
	at org.apache.seatunnel.app.thirdparty.datasource.DataSourceClientFactory.getDataSourceClient(DataSourceClientFactory.java:32)
	at org.apache.seatunnel.app.service.impl.DatasourceServiceImpl.queryAllDatasourcesGroupByType(DatasourceServiceImpl.java:478)
	at org.apache.seatunnel.app.controller.SeatunnelDatasourceController.getSupportDatasources(SeatunnelDatasourceController.java:302)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
......	

又两种方法解决上面的报错:
第一种手工方式:
# mkdir datasource

# cd datasource

# https://repo.maven.apache.org/maven2/org/apache/seatunnel/   #去这里把对应版本的jar包手工下载下来

image.png

第二种方式就是用脚本:

# vim bin/download_datasource.sh  #还是那个mvnw命令那个ssl证书报错我没有解决所以换成mvn的方式了

    #"$SEATUNNEL_WEB_HOME"/mvnw dependency:get -DgroupId=org.apache.seatunnel -DartifactId="$i" -Dversion="$version" -Ddest="$DATASOURCE_DIR" 
    mvn dependency:get -DgroupId=org.apache.seatunnel -DartifactId="$i" -Dversion="$version" -Ddest="$DATASOURCE_DIR"

# sh bin/download_datasource.sh 1.0.2  #搞不懂web-1.0.2脚本里面的version=1.0.0,要么修改脚本变量要么传参

# for num in `find ~/.m2/repository/org/apache/seatunnel/|grep datasource|grep '.jar$'|grep "1.0.2"`;do rsync -avz $num datasource/;done

做了上述操作后,我们重启下apache-seatunnel-web-1.0.2服务再来查看下页面:

image.png

#从上面的截图可以看到数据源页面不再报错了,当然也可以再查下日志没有异常报错了。

2.3 源码包安装

#如果有条件的话还是建议进行源码包安装,安装出来的插件都比较全,比二进制包各种报错然后花精力解决要好

#https://github.com/apache/seatunnel-web   #git地址

image.png

#官网给出的版本对应表,保险期间还是用2.3.8吧,我看web的libs目录下面关于seatunnel的都是2.3.8的

#wget https://downloads.apache.org/seatunnel/seatunnel-web/1.0.2/apache-seatunnel-web-1.0.2-src.tar.gz

#tar xf apache-seatunnel-web-1.0.2-src.tar.gz

#cd apache-seatunnel-web-1.0.2-src

# vim build.sh

  #/bin/sh $WORKDIR/mvnw clean package -DskipTests -Pci
  mvn clean package -DskipTests -Pci

#mvn spotless:apply

# sh build.sh code  #正常没有报错的话二进制包就打包了

image.png

# cp seatunnel-web-dist/target/apache-seatunnel-web-1.0.2.tar.gz  /opt/soft/  

# cd /opt/soft/

# tar xf apache-seatunnel-web-1.0.2.tar.gz

# vim conf/application.yml

datasource下面的数据库信息修改成实际的.jwt下面的secretKey要填一个比如mysecretkeyforseatunnelweb2025111

# cp /opt/soft/apache-seatunnel-2.3.8/config/hazelcast-client.yaml /opt/soft/apache-seatunnel-web-1.0.2/conf/

# cp /opt/soft/apache-seatunnel-2.3.8/connectors/plugin-mapping.properties /opt/soft/apache-seatunnel-web-1.0.2/conf/

# vim script/seatunnel_server_env.sh  #修改成实际的数据库信息

# sh /opt/soft/apache-seatunnel-web-1.0.2/script/init_sql.sh  #进行数据库表的初始化

# vim bin/seatunnel-backend-daemon.sh   #加一条环境变量,不然启动报错找不到seatunnel

set -
SEATUNNEL_HOME=/opt/soft/apache-seatunnel-2.3.8

# sh bin/seatunnel-backend-daemon.sh start

博文来自:www.51niux.com

三、数据库表同步的一些简单示例

#现在基础环境搭建完毕了,现在让我们来一些简单的数据同步

3.1 命令行单表同步

#先来一个最简单的示例,从A库同步完整表到B库

# vim job/test.conf 

# 定义运行时环境
env {
  parallelism = 4
  job.mode = "BATCH"
}
source{
    Jdbc {
        url = "jdbc:mysql://192.168.1.110:3306/seatunnel?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        username = "seatunnel"
        password = "seatunnel"
        query = "select * from role"
    }
}

transform {
    # 如果您想了解更多关于如何配置 SeaTunnel 的信息,并查看完整的转换插件列表,
    # 请访问 https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    jdbc {
        url = "jdbc:mysql://192.168.1.111:3306/seatunnel_bak?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        username = "seatunnel"
        password = "seatunnel"
        database = "seatunnel_bak" 
        table = "role"
        generate_sink_sql = true  #自动生成插入语句
        batch_insert = true  # 开启批量插入(提升性能)
        batch_size = 1000    # 批量大小(可选,默认500)
        }
    #如果你想了解更多关于如何配置seatunnel的信息,并查看完整的sink插件列表,
    #请前往https://seatunnel.apache.org/docs/connector-v2/sink
}

#特别注意目标数据库是要存在的,不然数据同步不过去会有报错提示:Caused by: java.sql.SQLSyntaxErrorException: Unknown database 'seatunnel_bak'

# ./bin/seatunnel.sh --config ./job/test.conf -m local

image.png

#从上图的数据库插入情况来看,也可以看出来我们只要库存在,表就能自动创建并自动插入数据

第一点:仅同步数据不同步表结构

好了,那么有一个问题,如果再执行一遍这个脚本会怎么样呢?

image.png

#从上图可以看出同样的数据重复insert插入了,明明源表id是主键啊,但是sink表没有这个主键的设置才会这样。Seatunnel JDBC Sink 的核心特性 ——Seatunnel 仅负责 “数据同步”,不负责 “表结构同步 / 创建”,包括主键、索引、字段类型、约束等表结构信息,都不会自动同步 / 创建,这是导致主键未同步的根本原因。就用推荐方法提前手工创建目标表。

#这时候你执行脚本第一遍可以,第二遍就会报错:

Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY'

#直接说结论不贴图片了,一旦设置了主键,只要主键冲突,后面的insert插入也就不会再执行,也就是说这种同步操作适用于新表的全量同步

第二点:绕过主键冲突报错,实现增删改效果

#vim job/test.conf  #注意不仅是追加插入数据哦可自行测试效果

# 定义运行时环境
env {
  parallelism = 4
  job.mode = "BATCH"
}

source {
    Jdbc {
        url = "jdbc:mysql://192.168.1.110:3306/seatunnel?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        username = "seatunnel"
        password = "seatunnel"
        query = "select * from role"
    }
}

transform {
    # 无需转换
}

sink {
    jdbc {
        url = "jdbc:mysql://192.168.1.111:3306/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        username = "seatunnel"
        password = "seatunnel"
        database = "seatunnel" 
        table = "role"
        
        # 核心:主键冲突时跳过,不报错
        skip_on_conflict = true
        # 指定主键字段(需与目标表主键一致)
        primary_keys = ["id"]
        
        # 保留批量插入提升性能
        generate_sink_sql = true
        batch_insert = true
        batch_size = 1000
        
        # 可选:开启错误重试(进一步避免偶发报错)
        max_retries = 3
    }
}

#剩下的多表,指定字段同步就不写示例了,网上一查就查到了

博文来自:www.51niux.com

四、Mysql CDC模式

#前面介绍的都是命令行模式,如果你不需要实时数据同步可以采用上面那种定时任务的方式,但是如果想要类似于主从数据同步那种方式就要用到CDC模式了

#MySQL CDC连接器允许从MySQL数据库读取快照数据和增量数据。

4.1 创建mysql用户

#在源库创建有指定权限的用户

mysql> CREATE USER 'source-cdc'@'%' IDENTIFIED BY 'source-cdc';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'source-cdc'@'%' IDENTIFIED BY 'source-cdc';
#SELECT是查询权限、RELOAD重载/刷新权限(允许执行FLUSH类命令如FLUSH PRIVILEGES、FLUSH BINARY LOGS,CDC 需刷新binlog元信息)
#SHOW DATABASES是查询数据库列表权限,REPLICATION SLAVE是复制从库权限(允许用户模拟MySQL从库,读取主库的binlog,CDC核心获取数据变更日志)
#REPLICATION CLIENT是复制客户端权限,允许执行SHOW MASTER STATUS/SHOW SLAVE STATUS,获取binlog位点(文件名+偏移量),CDC 需定位同步起点。    
mysql> FLUSH PRIVILEGES;

4.2 启动mysql的binlog

mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');

+--------------------------+----------------+
| Variable_name            | Value          |
+--------------------------+----------------+
| binlog_format            | ROW            |
| binlog_row_image         | FULL           |
| enforce_gtid_consistency | ON             |
| gtid_mode                | ON             |
| log_bin                  | ON             |
+--------------------------+----------------+
5 rows in set (0.00 sec)

#如果与上述结果不一致,请使用以下属性配置MySQL服务器配置文件($MYDateTimeHOME/MySQL.cnf),如下表所示:

server-id         = 223344
log_bin           = mysql-bin
expire_logs_days  = 10
binlog_format     = row
# mysql 5.6+ requires binlog_row_image to be set to FULL
binlog_row_image  = FULL

# enable gtid mode
# mysql 5.6+ requires gtid_mode to be set to ON
gtid_mode = on
enforce_gtid_consistency = on

#上面只是官方的实例,server-id没必要一致啊

# /etc/init.d/mysqld restart

4.3 记录下相关参数

#官方文档:https://seatunnel.apache.org/zh-CN/docs/2.3.12/connector-v2/source/MySQL-CDC

下载相关依赖包,不然会启动报错:

# ls  plugins/*.jar  #有下面三个jar包就行

plugins/connector-cdc-base-2.3.12.jar  plugins/connector-cdc-mysql-2.3.12.jar  plugins/connector-jdbc-2.3.12.jar

先来一个-m local方式的数据同步:

#vim job/cdc_test.conf

env {
  #本地模式仅支持单并行度
  parallelism = 1
  #流式同步模式(CDC必需)
  job.mode = "STREAMING"
  #检查点间隔
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    url = "jdbc:mysql://192.168.1.110:3306/seatunnel?useSSL=false&allowPublicKeyRetrieval=true"
    #CDC同步账号(需有binlog读取权限)
    username = "source-cdc"
    password = "source-cdc"
    table-names = ["seatunnel.role"]
    #唯一server-id(避免与MySQL自身server-id冲突)
    server-id = 5401
    #启动模式:先全量同步,再增量同步
    startup.mode = "initial"
    #可选:指定binlog起始位置(避免找不到binlog,需替换为你的实际值)
    #先执行 show master status; 获取File和Position后填写
    startup.specific.offset.file = "mysql-bin.000001"
    startup.specific.offset.pos = 4
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://192.168.1.111:3306/seatunnel?useSSL=false"
    driver = "com.mysql.cj.jdbc.Driver"
    username = "seatunnel"
    password = "seatunnel"
    #自动生成写入SQL
    generate_sink_sql = true
    database = "seatunnel"
    #目标表名
    table = "role"
    # 键字段(保障更新/删除同步)
    primary_keys = ["id"]
    #关闭XA事务(2.3.12本地模式XA有Bug)
    is_exactly_once = false
    #关闭批量写入(避免静默失败)
    batch_size = 1
    #开启调试日志,排查错误的时候使用
    #log.enabled = true
    #log.level = "DEBUG"
    #retry_times = 3
    #retry_interval = 1000
  }
}

 # ./bin/seatunnel.sh --config job/cdc_test.conf -m local    #这是一个常驻进程了就不是一次性的了,但是是前台启动,可以放到后台并将输入写入到一个日志文件

#如果目的库跟源库数据不一致,只在第一次启动的时候会把目标库的数据更新过来,这个进程常态的可以实现源库数据往目标库的插入和删除和update动作触发同步。

#注意这种方式用# bin/seatunnel.sh --list是查不到在运行任务的,只有使用集群模式才可以

再来2.3.12版本的cdc调用集群方式表同步:

#这就需要#bin/seatunnel-cluster.sh start --daemon 启动服务并确保5801端口是启动的

#vim job/cluster_cdc_job.conf

env {
  #集群并行度
  execution.parallelism = 2                
  job.mode = "STREAMING"
  checkpoint.interval = 10000
  #Engine集群配置
  seatunnel.engine.job.mode = "cluster"
  seatunnel.engine.server.address = "127.0.0.1:5801"
  seatunnel.engine.job.heartbeat.interval = 30000
  #自定义Job Name(支持中文/英文,建议包含业务+表名)
  job.name = "MySQL_CDC_同步_seatunnel_role表"
  #任务失败自动重启(避免变成CANCELED)
  seatunnel.engine.job.restart.count = 3
  seatunnel.engine.job.restart.interval = 5000
  #任务超时时间(避免长时间卡壳)
  seatunnel.engine.job.timeout = 3600000
}

source {
  MySQL-CDC {
    url = "jdbc:mysql://192.168.1.110:3306/seatunnel?useSSL=false&allowPublicKeyRetrieval=true"
    username = "source-cdc"
    password = "source-cdc"
    #数组格式表名(集群模式不支持单字符串)
    table-names = ["seatunnel.role"]
    #集群内唯一(避免与其他节点冲突)
    server-id = 5401
    #首次启动全量+后续增量同步                       
    startup.mode = "initial"               
    #集群模式优化:增大binlog读取缓存
    debezium.connector.properties.max.queue.size = 8192
    debezium.connector.properties.max.batch.size = 2048
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://192.168.1.111:3306/seatunnel?useSSL=false"
    driver = "com.mysql.cj.jdbc.Driver"
    username = "seatunnel"
    password = "seatunnel"
    generate_sink_sql = true
    database = "seatunnel"
    table = "role"
    primary_keys = ["id"]
    #集群模式关闭XA(避免分布式事务问题)
    is_exactly_once = false
    #集群模式恢复批量(提升性能)                
    batch_size = 50
    #批量刷新间隔                        
    batch_flush_interval = 1000            
    #集群模式重试
    retry_times = 5
    retry_interval = 2000
    #全字段同步
    upsert_all_fields = true
    #连接池(集群模式避免连接耗尽)
    connection_pool {
      type = "hikari"
      hikaricp {
        maximum-pool-size = 10
        minimum-idle = 2
        connection-timeout = 30000
      }
    }
  }
}

# ./bin/seatunnel.sh --config job/cluster_cdc_job.conf -m cluster   #使用集群模式启动服务,一样首次启动的时候会把目标库的数据更新到跟源库表一致

# bin/seatunnel.sh --list  #查看再运行的

image.png

再来2.3.8版本的cdc调用集群方式表同步:

# vim job/cluster_cdc_job.conf

env {
  #并行度
  execution.parallelism = 2
  #流模式(CDC必须用STREAMING)
  job.mode = "STREAMING"
  #检查点间隔
  checkpoint.interval = 10000
  #集群模式相关
  seatunnel.engine.job.mode = "cluster"
  seatunnel.engine.server.address = "127.0.0.1:5801"
  seatunnel.engine.job.heartbeat.interval = 30000
  #任务名称
  job.name = "MySQL_CDC_同步_seatunnel_role表"
  #失败重启配置
  seatunnel.engine.job.restart.count = 3
  seatunnel.engine.job.restart.interval = 5000
  seatunnel.engine.job.timeout = 3600000
  #基础配置(避免类加载冲突)
  job.type = "STREAM"
  checkpoint.storage = "MEMORY"
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://192.168.1.110:3306/seatunnel?serverTimezone=Asia/Shanghai&useSSL=false"
    username = "source-cdc"
    password = "source-cdc"
    database-name = "seatunnel"
    table-names = ["seatunnel.role"]
    #server-id必须唯一(避免和其他CDC任务冲突,建议范围5000-65535)
    server-id = 5401
    #启动模式:initial(全量+增量)、latest-offset(仅增量)
    startup.mode = "initial"
    #时区
    server-time-zone = "Asia/Shanghai"
    #binlog消费性能配置
    max.queue.size = 8192
    max.batch.size = 2048
    #批量读取超时
    poll.interval.ms = 1000
    
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://192.168.1.111:3306/seatunnel"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "seatunnel"
    password = "seatunnel"
    generate_sink_sql = true
    database = seatunnel
    table = role
    primary_keys = ["id"]
  }
}

# ./bin/seatunnel.sh --cluster seatunnel --config job/cluster_cdc_job.conf  #注意--cluster seatunnel 就是对应hazelcast.yaml中的cluster-name

#官网中文文档讲的很清楚,详细的可参照官网文档,集群的模式啊监控啊之类的可以看官方文档: https://seatunnel.apache.org/zh-CN/docs/2.3.12/seatunnel-engine/about

作者:忙碌的柴少 分类:SeaTunnel 浏览:39 评论:0
留言列表
发表评论
来宾的头像