SeaTunnel Data Collection: A Practical Guide
Overview
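This guide walks through deploying and using SeaTunnel (v2.3.8) for data collection, covering data synchronization from MongoDB and MySQL (RDS). It shows how to build an automated collection system that runs the sync jobs once a day.
Other versions work much the same way; for the exact sync configuration options, the official documentation takes precedence.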
1. Environment Preparation
1.1 System Requirements
| Component | Requirement |
|---|---|
| Operating system | Linux (x86_64 / ARM64) |
| Docker | 20.10+ |
| Kubernetes | 1.20+ (optional) |
| SeaTunnel | 2.3.8 |
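Before deploying, the Docker requirement in the table can be checked with a small preflight helper. This is a minimal sketch: `version_ge` and `check_docker_version` are hypothetical helper names, the comparison looks only at the numeric major.minor.patch fields, and the daemon version is read with the Docker CLI's `docker version --format '{{.Server.Version}}'`.

```shell
#!/bin/sh
# Hypothetical helper: succeeds if version $1 >= version $2.
# Compares the first three dot-separated numeric fields; missing fields
# count as 0 and any suffix such as "-ce" is ignored.
version_ge() {
  # Keep the leading numeric part ("24.0.7-ce" -> "24.0.7") and pad with ".0.0"
  a="$(printf '%s\n' "$1" | sed 's/[^0-9.].*$//').0.0"
  b="$(printf '%s\n' "$2" | sed 's/[^0-9.].*$//').0.0"
  for i in 1 2 3; do
    x=$(printf '%s\n' "$a" | cut -d. -f"$i"); x=${x:-0}
    y=$(printf '%s\n' "$b" | cut -d. -f"$i"); y=${y:-0}
    if [ "$x" -gt "$y" ]; then return 0; fi
    if [ "$x" -lt "$y" ]; then return 1; fi
  done
  return 0
}

# Hypothetical preflight: compare the Docker daemon version with the 20.10 minimum
check_docker_version() {
  v=$(docker version --format '{{.Server.Version}}' 2>/dev/null)
  if [ -n "$v" ] && version_ge "$v" 20.10; then
    echo "Docker $v OK"
  else
    echo "Docker 20.10+ required (found: ${v:-none})" >&2
    return 1
  fi
}
```

Running `check_docker_version` on the target host prints the detected version or fails with a non-zero status; the same `version_ge` comparison can be reused for other minimum versions.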
1.2 Directory Structure
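```text
seatunnel/
├── bin/
│   ├── mongo_start.sh            # MongoDB job startup script
│   ├── rds_start.sh              # RDS job startup script
│   ├── mongoshell-linux-amd64
│   └── mongoshell-linux-arm64
├── config/
│   ├── mongo_dynamic.template    # MongoDB dynamic job configuration
│   ├── mongo_static.template     # MongoDB static job configuration
│   └── rds.template              # RDS job configuration
├── seatunnel_mongo.yaml          # K8s deployment file for the MongoDB job
└── seatunnel_rds.yaml            # K8s deployment file for the RDS job
```
2. Configuration Files in Detail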
2.1 How the Configuration Templates Work
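Each configuration template uses `$(VARIABLE_NAME)` placeholders, which the startup script replaces with the values of the corresponding environment variables at run time. The job configurations are written in HOCON.
Supported placeholders: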
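| Placeholder | Description |
|---|---|
| `$(MONGODB_URI)` | MongoDB connection string |
| `$(MONGODB_DATABASE)` | MongoDB database name |
| `$(RDS_URI)` | MySQL JDBC URL prefix without a database name, e.g. `jdbc:mysql://mysql-host:3306` |
| `$(RDS_USERNAME)` | MySQL username |
| `$(RDS_PASSWORD)` | MySQL password |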
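The startup scripts substitute each placeholder with `sed "s#\$(VAR)#${VAR}#g"`. That works for plain values, but in a sed replacement `&` stands for the whole match and `\` and the `#` delimiter are special too, so a password or MongoDB URI containing one of them is rendered incorrectly or breaks the command. A minimal sketch of a more defensive renderer, assuming POSIX sh and the placeholder names in the table above; `escape_sed` and `render_template` are hypothetical helpers, not part of SeaTunnel.

```shell
#!/bin/sh
# Hypothetical helper: escape a value for the REPLACEMENT side of
# sed "s#pattern#replacement#g". Backslash, '&' and the '#' delimiter
# are the characters sed treats specially there.
escape_sed() {
  printf '%s\n' "$1" | sed 's/[\\&#]/\\&/g'
}

# Hypothetical helper: render_template TEMPLATE OUTPUT VAR...
# Copies TEMPLATE to OUTPUT and replaces every "$(VAR)" with the value of
# the environment variable VAR. Fails if a listed variable is unset.
render_template() {
  tpl=$1; out=$2; shift 2
  cp "$tpl" "$out" || return 1
  for var in "$@"; do
    # Only plain variable names are accepted, because the name goes through eval
    case $var in
      ''|[0-9]*|*[!A-Za-z0-9_]*) echo "Bad variable name: $var" >&2; return 1 ;;
    esac
    eval "isset=\${$var+x}"
    if [ -z "$isset" ]; then echo "Error: $var not set" >&2; return 1; fi
    eval "val=\$$var"
    esc=$(escape_sed "$val")
    # "\\\$" reaches sed as "\$", i.e. a literal dollar sign in the pattern;
    # writing to a temp file avoids depending on GNU "sed -i"
    sed "s#\\\$($var)#${esc}#g" "$out" > "$out.tmp" && mv "$out.tmp" "$out" || return 1
  done
}
```

For example, `render_template /sea/config/rds.template /sea/config/rds.conf RDS_URI RDS_USERNAME RDS_PASSWORD` would produce the same `rds.conf` as the three sed commands in the RDS startup script, while leaving special characters in the values intact.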
2.2 MongoDB Static Job Configuration
Synchronizes a fixed MongoDB collection:
```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  MongoDB {
    uri = "$(MONGODB_URI)"
    database = "$(MONGODB_DATABASE)"
    collection = "skyladder_flowline_logs"
    result_table_name = "skyladder_flowline_logs"
    schema = {
      columns = [
        { name = "_id", type = STRING, nullable = true },
        { name = "projectId", type = STRING, nullable = true }
      ]
    }
  }
}

transform {
  sql {
    source_table_name = ["skyladder_flowline_logs"]
    result_table_name = "sub_skyladder_flowline_logs"
    query = "select projectId as project_id, _id as _id from skyladder_flowline_logs;"
  }
}

sink {
  jdbc {
    user = "$(RDS_USERNAME)"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "$(RDS_URI)/metric?useSSL=false&characterEncoding=utf-8"
    password = "$(RDS_PASSWORD)"
    source_table_name = ["sub_skyladder_flowline_logs"]
    generate_sink_sql = true
    database = "metric"
    table = "metric.sub_skyladder_flowline_logs"
    primary_keys = ["_id"]
  }
}
```
2.3 MongoDB Dynamic Job Configuration
Iterates dynamically over multiple collections by project ID:
```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  MongoDB {
    uri = "$(MONGODB_URI)"
    database = "$(MONGODB_DATABASE)"
    collection = "3e9d762f34d944c782876ef07723e3ac.npm_allItemData"
    result_table_name = "npm_allItemData"
    schema = {
      columns = [
        { name = "_id", type = STRING, nullable = true },
        { name = "projectId", type = STRING, nullable = true }
      ]
    }
  }
}

transform {
  sql {
    source_table_name = ["npm_allItemData"]
    result_table_name = "sub_kb_workbench"
    query = "select _id as _id, projectId as project_id from npm_allItemData;"
  }
}

sink {
  jdbc {
    user = "$(RDS_USERNAME)"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "$(RDS_URI)/metric?useSSL=false&characterEncoding=utf-8"
    password = "$(RDS_PASSWORD)"
    source_table_name = ["sub_kb_workbench"]
    generate_sink_sql = true
    database = "metric"
    table = "metric.sub_kb_workbench"
    primary_keys = ["_id"]
  }
}
```
2.4 RDS Job Configuration
```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    result_table_name = "pms_unit_info"
    table_path = "portal.pms_unit_info"
    url = "$(RDS_URI)/portal?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "$(RDS_USERNAME)"
    password = "$(RDS_PASSWORD)"
  }
}

# No transform block: the sink reads pms_unit_info directly from the source

sink {
  Jdbc {
    source_table_name = ["pms_unit_info"]
    generate_sink_sql = true
    database = "metric"
    table = "a_pms_unit_info"
    user = "$(RDS_USERNAME)"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "$(RDS_URI)/metric?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
    password = "$(RDS_PASSWORD)"
    # Keep the existing target table structure and empty it before each load
    schema_save_mode = IGNORE
    data_save_mode = CUSTOM_PROCESSING
    custom_sql = "truncate table a_pms_unit_info"
  }
}
```
3. Writing the Startup Scripts
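Both scripts below follow the same pattern: fail fast on missing environment variables, render the templates, run the job once, then sleep until the next midnight. They get the next midnight from GNU `date -d` or BSD `date -v` and otherwise fall back to `now - now % 86400 + 86400`, which is midnight UTC rather than local midnight (the two coincide in a container with no `TZ` set). A minimal portable alternative derives the wait from the local wall-clock time; `seconds_until_midnight` is a hypothetical helper name, and it assumes every day lasts 86400 seconds, so it is off by an hour on DST change days.

```shell
#!/bin/sh
# Hypothetical helper: seconds from local time HH MM SS until the next local
# midnight. With no arguments it reads the current local time.
# Assumes every day lasts 86400 s (DST transitions are ignored).
seconds_until_midnight() {
  # One date call, so hour, minute and second come from the same instant
  [ $# -eq 3 ] || set -- $(date '+%H %M %S')
  # Drop one leading zero: "08"/"09" would be invalid octal inside $(( ))
  h=${1#0}; h=${h:-0}
  m=${2#0}; m=${m:-0}
  s=${3#0}; s=${s:-0}
  echo $((86400 - (h * 3600 + m * 60 + s)))
}
```

In either loop, `sleep "$(seconds_until_midnight)"` could then replace the `date -d`/`date -v` block.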
3.1 MongoDB Startup Script
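The MongoDB script below selects a mongoshell binary and renders both `mongo_dynamic.conf` and `mongo_static.conf`, but only runs the static job; the step that enumerates project IDs for the dynamic job is not shown. As a hypothetical sketch of that step, assuming collection names follow the `<projectId>.npm_allItemData` pattern seen in 2.3 and that the IDs are already known (for example, queried with mongoshell), one config per project can be generated from the rendered `mongo_dynamic.conf`. `render_project_configs` is an illustrative name, not part of the original scripts.

```shell
#!/bin/sh
# Hypothetical sketch: render_project_configs BASE_CONF OUT_DIR PROJECT_ID...
# BASE_CONF is an already-rendered mongo_dynamic.conf whose collection line
# has the form: collection = "<projectId>.npm_allItemData"
# For each ID, writes OUT_DIR/mongo_dynamic_<id>.conf pointing at that
# project's collection and prints its path. Obtaining the IDs is not covered.
render_project_configs() {
  base=$1; out_dir=$2; shift 2
  mkdir -p "$out_dir" || return 1
  for pid in "$@"; do
    # Accept only alphanumeric IDs so they are safe in sed and in file names
    case $pid in
      ''|*[!A-Za-z0-9]*) echo "Skipping invalid project ID: $pid" >&2; continue ;;
    esac
    target="${out_dir}/mongo_dynamic_${pid}.conf"
    # Point the collection at this project's <id>.npm_allItemData
    sed "s#collection = \"[A-Za-z0-9]*\.npm_allItemData\"#collection = \"${pid}.npm_allItemData\"#" \
      "$base" > "$target" || return 1
    echo "$target"
  done
  return 0
}
```

Each printed path could then be passed to `/opt/seatunnel/bin/seatunnel.sh --config "$f" -e local` inside the daily loop. Every generated config keeps the same sink (`metric.sub_kb_workbench`, primary key `_id`), so all projects would be written to one table; whether that matches the intended design is an assumption.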
```bash
#!/bin/bash
# Fail fast if any required environment variable is missing
: "${ARCH:?Error: ARCH not set, use 'x86' or 'arm'}"
: "${MONGODB_URI:?Error: MONGODB_URI not set}"
: "${MONGODB_DATABASE:?Error: MONGODB_DATABASE not set}"
: "${RDS_URI:?Error: RDS_URI not set}"
: "${RDS_USERNAME:?Error: RDS_USERNAME not set}"
: "${RDS_PASSWORD:?Error: RDS_PASSWORD not set}"

home="/sea"

# Pick the mongoshell binary that matches the CPU architecture
# (selected here but not used by the rest of this script)
case "$ARCH" in
  x86) mongoshell="${home}/bin/mongoshell-linux-amd64" ;;
  arm) mongoshell="${home}/bin/mongoshell-linux-arm64" ;;
  *)   echo "Error: Unsupported ARCH: $ARCH"; exit 1 ;;
esac

# Render both templates: copy each one, then replace the placeholders in place.
# Values containing '#', '&' or '\' would break these sed expressions.
for name in mongo_dynamic mongo_static; do
  conf="${home}/config/${name}.conf"
  \cp "${home}/config/${name}.template" "$conf"
  sed -i "s#\$(MONGODB_URI)#${MONGODB_URI}#g" "$conf"
  sed -i "s#\$(MONGODB_DATABASE)#${MONGODB_DATABASE}#g" "$conf"
  sed -i "s#\$(RDS_URI)#${RDS_URI}#g" "$conf"
  sed -i "s#\$(RDS_USERNAME)#${RDS_USERNAME}#g" "$conf"
  sed -i "s#\$(RDS_PASSWORD)#${RDS_PASSWORD}#g" "$conf"
done

mkdir -p "${home}/logs"

# Run the static job once at startup, then once a day at midnight
while true; do
  echo "Starting daily task: $(date)"
  /opt/seatunnel/bin/seatunnel.sh --config "${home}/config/mongo_static.conf" -e local \
    >> "${home}/logs/mongo_static-$(date +%Y%m%d).log" 2>&1

  # Compute the wait after the task finishes, so its run time does not delay the next run
  now=$(date +%s)
  tomorrow=$(date -d "tomorrow 00:00:00" +%s 2>/dev/null || date -v+1d -v0H -v0M -v0S +%s 2>/dev/null)
  # Fallback if neither GNU nor BSD date works: next midnight UTC
  [ -z "$tomorrow" ] && tomorrow=$((now - now % 86400 + 86400))
  sleep_seconds=$((tomorrow - now))

  echo "Waiting $sleep_seconds seconds for next run..."
  sleep "$sleep_seconds"
done
```
3.2 RDS Startup Script
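```bash
#!/bin/bash
# Fail fast if any required environment variable is missing
: "${RDS_URI:?Error: environment variable RDS_URI is not set}"
: "${RDS_USERNAME:?Error: environment variable RDS_USERNAME is not set}"
: "${RDS_PASSWORD:?Error: environment variable RDS_PASSWORD is not set}"

# Paths
home="/sea"                                 # root directory
rds_config_file="${home}/config/rds.conf"
log_dir="${home}/logs"

# Render the template into rds.conf
\cp "${home}/config/rds.template" "$rds_config_file"
sed -i "s#\$(RDS_URI)#${RDS_URI}#g" "$rds_config_file"
sed -i "s#\$(RDS_USERNAME)#${RDS_USERNAME}#g" "$rds_config_file"
sed -i "s#\$(RDS_PASSWORD)#${RDS_PASSWORD}#g" "$rds_config_file"

# Make sure the log directory exists
mkdir -p "$log_dir"

# Note: the original script's chmod +x /config/* likely had the wrong path; corrected to ${home}/config/*
chmod +x "${home}"/config/* 2>/dev/null

# Infinite loop: run the job once at startup, then once a day at midnight
while true; do
  echo "Starting daily task: $(date)"

  # One log file per day, named after the current date
  today=$(date +%Y%m%d)
  static_log="${log_dir}/rds-${today}.log"

  # Record the start time of this run in the log (append)
  echo "===== Task started: $(date) =====" >> "$static_log"

  # Run the RDS job once, appending SeaTunnel output to today's log
  echo "Running job with config file: $rds_config_file"
  /opt/seatunnel/bin/seatunnel.sh --config "$rds_config_file" -e local >> "$static_log" 2>&1
  echo "Daily task finished: $(date)"

  # Seconds until the next midnight, computed after the task so its run time does not shift the schedule
  now=$(date +%s)
  tomorrow=$(date -d "tomorrow 00:00:00" +%s 2>/dev/null || date -v+1d -v0H -v0M -v0S +%s 2>/dev/null)
  if [ -z "$tomorrow" ]; then
    # Fallback: next midnight UTC
    seconds_today=$((now % 86400))
    sleep_seconds=$((86400 - seconds_today))
  else
    sleep_seconds=$((tomorrow - now))
  fi

  echo "Current time: $(date); waiting $sleep_seconds seconds until the next midnight..."
  sleep "$sleep_seconds"
done
```
4. Docker Deployment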
4.1 Prepare the Directories
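Create the host directory that the containers mount as `/sea`, then copy the `bin/` and `config/` files listed in 1.2 into it:
```bash
mkdir -p /opt/data/seatunnel/{bin,config}
```
4.2 Start the RDS Job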
```bash
docker run --rm -d \
  -v /opt/data/seatunnel/:/sea \
  -e RDS_URI="jdbc:mysql://mysql-host:3306" \
  -e RDS_USERNAME="user" \
  -e RDS_PASSWORD="password" \
  apache/seatunnel:2.3.8 \
  sh /sea/bin/rds_start.sh
```
4.3 Start the MongoDB Job
```bash
docker run --rm -d \
  -v /opt/data/seatunnel/:/sea \
  -e ARCH="x86" \
  -e MONGODB_URI="mongodb://user:pass@mongo-host:27017" \
  -e MONGODB_DATABASE="dbname" \
  -e RDS_URI="jdbc:mysql://mysql-host:3306" \
  -e RDS_USERNAME="user" \
  -e RDS_PASSWORD="password" \
  apache/seatunnel:2.3.8 \
  sh /sea/bin/mongo_start.sh
```
5. Kubernetes Deployment
5.1 Deployment Example: MongoDB
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: seatunnel-mongo
  namespace: seatunnel
spec:
  replicas: 1
  selector:
    matchLabels:
      app: seatunnel-mongo
  template:
    metadata:
      labels:
        app: seatunnel-mongo
    spec:
      volumes:
        - name: seatunnel-config
          hostPath:
            path: /data/seatunnel
            type: DirectoryOrCreate
      containers:
        - name: seatunnel
          image: apache/seatunnel:2.3.8
          env:
            - name: ARCH
              value: "x86"
            - name: MONGODB_URI
              value: "mongodb://user:pass@mongo-host:27017"
            - name: MONGODB_DATABASE
              value: "dbname"
            - name: RDS_URI
              value: "jdbc:mysql://mysql-host:3306"
            - name: RDS_USERNAME
              value: "user"
            - name: RDS_PASSWORD
              value: "password"
          command:
            - /bin/sh
            - -c
            - |
              chmod +x /sea/bin/mongo_start.sh && bash /sea/bin/mongo_start.sh
          volumeMounts:
            - name: seatunnel-config
              mountPath: /sea
```
5.2 Deployment Example: RDS
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: seatunnel-rds
  namespace: seatunnel
spec:
  replicas: 1
  selector:
    matchLabels:
      app: seatunnel-rds
  template:
    metadata:
      labels:
        app: seatunnel-rds
    spec:
      volumes:
        - name: seatunnel-config
          hostPath:
            path: /data/seatunnel
            type: DirectoryOrCreate
      containers:
        - name: seatunnel
          image: apache/seatunnel:2.3.8
          env:
            - name: RDS_URI
              value: "jdbc:mysql://mysql-host:3306"
            - name: RDS_USERNAME
              value: "user"
            - name: RDS_PASSWORD
              value: "password"
          command:
            - /bin/sh
            - -c
            - |
              chmod +x /sea/bin/rds_start.sh && bash /sea/bin/rds_start.sh
          volumeMounts:
            - name: seatunnel-config
              mountPath: /sea
```
6. FAQ
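| Problem | Solution |
|---|---|
| Script fails because of Windows (CRLF) line endings | `sed -i 's/\r$//' script.sh` |
| Connection failure | Check network connectivity and credentials |
| Placeholders not replaced | Make sure the environment variables are actually passed to the container |
| mongo client | The bundled mongoshell binaries can be replaced with a locally installed client |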
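Two rows of the table can be checked mechanically. A minimal sketch in POSIX sh: `has_crlf` reports whether a file has Windows line endings, and `check_placeholders` fails if a rendered config still contains a `$(NAME)` placeholder, which usually means an environment variable did not reach the container or the template uses a placeholder the script does not substitute. Both are hypothetical helper names.

```shell
#!/bin/sh
# Hypothetical helper: succeeds if FILE has Windows (CRLF) line endings,
# i.e. at least one line ends with a carriage return.
has_crlf() {
  grep -q "$(printf '\r')\$" "$1"
}

# Hypothetical helper: check_placeholders FILE...
# Fails if any rendered config still contains a "$(NAME)" placeholder;
# offending lines (with line numbers) are printed to stderr.
check_placeholders() {
  rc=0
  for f in "$@"; do
    if grep -n '\$([A-Z_][A-Z0-9_]*)' "$f" >&2; then
      printf 'Unreplaced placeholder(s) in %s\n' "$f" >&2
      rc=1
    fi
  done
  return "$rc"
}
```

On the host, `for f in /opt/data/seatunnel/bin/*.sh; do has_crlf "$f" && sed -i 's/\r$//' "$f"; done` applies the line-ending fix only where needed. Inside a startup script, `check_placeholders "$rds_config_file" || exit 1` placed after the sed commands stops before SeaTunnel runs with a partially rendered config.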
7. References
- SeaTunnel official documentation: https://seatunnel.apache.org/
- SeaTunnel GitHub: https://github.com/apache/seatunnel