news 2026/6/16 2:28:44

SeaTunnel 数据采集实战指南(K8S Docker)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
SeaTunnel 数据采集实战指南(K8S Docker)

SeaTunnel 数据采集实战指南

概述

本文档提供了一个完整的 SeaTunnel(V2.3.8) 数据采集部署和使用指南,适用于 MongoDB 和 MySQL(RDS)的数据同步场景。通过本文,您将学会如何搭建一个自动化的数据采集系统,实现每日定时的数据同步任务。
其他版本大同小异,实际数据同步配置文档说明以官方文档为准

一、环境准备

1.1 系统要求

环境类型要求
操作系统Linux (x86_64 / ARM64)
Docker20.10+
Kubernetes1.20+(可选)
SeaTunnel2.3.8

1.2 目录结构

seatunnel/ ├── bin/ │ ├── mongo_start.sh# MongoDB任务启动脚本│ ├── rds_start.sh# RDS任务启动脚本│ ├── mongoshell-linux-amd64 │ └── mongoshell-linux-arm64 ├── config/ │ ├── mongo_dynamic.template# MongoDB动态任务配置│ ├── mongo_static.template# MongoDB静态任务配置│ └── rds.template# RDS任务配置├── seatunnel_mongo.yaml# K8s MongoDB部署文件└── seatunnel_rds.yaml# K8s RDS部署文件

二、配置文件详解

2.1 配置模板机制

配置模板使用$(变量名)作为占位符,启动脚本运行时会替换为实际环境变量值。同步脚本语言类型:Hocon

支持的占位符:

占位符说明
$(MONGODB_URI)MongoDB连接字符串
$(MONGODB_DATABASE)MongoDB数据库名称
$(RDS_URI)MySQL JDBC连接前缀
$(RDS_USERNAME)MySQL用户名
$(RDS_PASSWORD)MySQL密码

2.2 MongoDB静态任务配置

用于同步固定的MongoDB集合:

env { parallelism = 1 job.mode = "BATCH" } source { MongoDB { uri = "$(MONGODB_URI)" database = "$(MONGODB_DATABASE)" collection = "skyladder_flowline_logs" result_table_name = "skyladder_flowline_logs" schema = { columns = [ { name = "_id", type = STRING, nullable = true }, { name = "projectId", type = STRING, nullable = true } ] } } } transform { sql { source_table_name = ["skyladder_flowline_logs"] result_table_name = "sub_skyladder_flowline_logs" query = "select projectId as project_id, _id as _id from skyladder_flowline_logs;" } } sink { jdbc { user = "$(RDS_USERNAME)" driver = "com.mysql.cj.jdbc.Driver" url = "$(RDS_URI)/metric?useSSL=false&characterEncoding=utf-8" password = "$(RDS_PASSWORD)" source_table_name = ["sub_skyladder_flowline_logs"] generate_sink_sql = true database = "metric" table = "metric.sub_skyladder_flowline_logs" primary_keys = ["_id"] } }

2.3 MongoDB动态任务配置

支持按项目ID动态遍历多个集合:

env { parallelism = 1 job.mode = "BATCH" } source { MongoDB { uri = "$(MONGODB_URI)" database = "$(MONGODB_DATABASE)" collection = "3e9d762f34d944c782876ef07723e3ac.npm_allItemData" result_table_name = "npm_allItemData" schema = { columns = [ { name = "_id", type = STRING, nullable = true }, { name = "projectId", type = STRING, nullable = true } ] } } } transform { sql { source_table_name = ["npm_allItemData"] result_table_name = "sub_kb_workbench" query = "select _id as _id, projectId as project_id from npm_allItemData;" } } sink { jdbc { user = "$(RDS_USERNAME)" driver = "com.mysql.cj.jdbc.Driver" url = "$(RDS_URI)/metric?useSSL=false&characterEncoding=utf-8" password = "$(RDS_PASSWORD)" source_table_name = ["sub_kb_workbench"] generate_sink_sql = true database = "metric" table = "metric.sub_kb_workbench" primary_keys = ["_id"] } }

2.4 RDS任务配置

env { parallelism = 1 job.mode = "BATCH" } source { Jdbc { "result_table_name"=pms_unit_info table_path="portal.pms_unit_info" url="$(RDS_URI)/portal?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true" driver = "com.mysql.cj.jdbc.Driver" user="$(RDS_USERNAME)" password="$(RDS_PASSWORD)" } } transform { sql { source_table_name = ["npm_allItemData"] result_table_name = "sub_kb_workbench" query = "select _id as _id, projectId as project_id from npm_allItemData;" } } sink { Jdbc { "source_table_name"=["pms_unit_info"] "generate_sink_sql"=true database="metric" table="a_pms_unit_info" user="$(RDS_USERNAME)" driver="com.mysql.cj.jdbc.Driver" url= "$(RDS_URI)/metric?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true" password="$(RDS_PASSWORD)" schema_save_mode=IGNORE data_save_mode=CUSTOM_PROCESSING custom_sql="truncate table a_pms_unit_info" } }

三、启动脚本编写

3.1 MongoDB启动脚本

#!/bin/bash:"${ARCH:?Error:ARCH not set,use 'x86' or 'arm'}":"${MONGODB_URI:?Error:MONGODB_URI not set}":"${MONGODB_DATABASE:?Error:MONGODB_DATABASE not set}":"${RDS_URI:?Error:RDS_URI not set}":"${RDS_USERNAME:?Error:RDS_USERNAME not set}":"${RDS_PASSWORD:?Error:RDS_PASSWORD not set}"home="/sea"case"$ARCH"inx86)mongoshell="${home}/bin/mongoshell-linux-amd64";;arm)mongoshell="${home}/bin/mongoshell-linux-arm64";;*)echo"Error: Unsupported ARCH:$ARCH";exit1;;esac\cp"${home}/config/mongo_dynamic.template""${home}/config/mongo_dynamic.conf"\cp"${home}/config/mongo_static.template""${home}/config/mongo_static.conf"sed-i"s#\$(MONGODB_URI)#${MONGODB_URI}#g""${home}/config/mongo_static.conf"sed-i"s#\$(MONGODB_DATABASE)#${MONGODB_DATABASE}#g""${home}/config/mongo_static.conf"sed-i"s#\$(RDS_URI)#${RDS_URI}#g""${home}/config/mongo_static.conf"sed-i"s#\$(RDS_USERNAME)#${RDS_USERNAME}#g""${home}/config/mongo_static.conf"sed-i"s#\$(RDS_PASSWORD)#${RDS_PASSWORD}#g""${home}/config/mongo_static.conf"sed-i"s#\$(MONGODB_URI)#${MONGODB_URI}#g""${home}/config/mongo_dynamic.conf"sed-i"s#\$(MONGODB_DATABASE)#${MONGODB_DATABASE}#g""${home}/config/mongo_dynamic.conf"sed-i"s#\$(RDS_URI)#${RDS_URI}#g""${home}/config/mongo_dynamic.conf"sed-i"s#\$(RDS_USERNAME)#${RDS_USERNAME}#g""${home}/config/mongo_dynamic.conf"sed-i"s#\$(RDS_PASSWORD)#${RDS_PASSWORD}#g""${home}/config/mongo_dynamic.conf"mkdir-p"${home}/logs"whiletrue;donow=$(date+%s)tomorrow=$(date-d"tomorrow 00:00:00"+%s2>/dev/null||date-v+1d-v0H-v0M-v0S+%s2>/dev/null)[-z"$tomorrow"]&&tomorrow=$((now-now%86400+86400))sleep_seconds=$((tomorrow-now))echo"Starting daily task:$(date)"/opt/seatunnel/bin/seatunnel.sh--config"${home}/config/mongo_static.conf"-elocal>>"${home}/logs/mongo_static-$(date+%Y%m%d).log"2>&1echo"Waiting$sleep_secondsseconds for next run..."sleep"$sleep_seconds"done

3.2 RDS启动脚本

#!/bin/bash:"${RDS_URI:?错误:环境变量 RDS_URI 未设置}":"${RDS_USERNAME:?错误:环境变量 RDS_USERNAME 未设置}":"${RDS_PASSWORD:?错误:环境变量 RDS_PASSWORD 未设置}"# 定义其他路径(使用环境变量)# 根目录home="/sea"rds_config_file="${home}/config/rds.conf"log_dir="${home}/logs"\cp"${home}/config/rds.template""${home}/config/rds.conf"sed-i"s#\$(RDS_URI)#${RDS_URI}#g"$rds_config_filesed-i"s#\$(RDS_USERNAME)#${RDS_USERNAME}#g"$rds_config_filesed-i"s#\$(RDS_PASSWORD)#${RDS_PASSWORD}#g"$rds_config_file# 确保日志目录存在mkdir-p"$log_dir"# 注意:原脚本中的 chmod +x /config/* 可能路径错误,已修正为 ${home}/config/*chmod+x${home}/config/*2>/dev/null# 无限循环,每天0点执行一次任务whiletrue;do# 计算距离下一个0点的秒数now=$(date+%s)tomorrow=$(date-d"tomorrow 00:00:00"+%s2>/dev/null||date-v+1d-v0H-v0M-v0S+%s2>/dev/null)if[-z"$tomorrow"];thenseconds_today=$((now%86400))sleep_seconds=$((86400-seconds_today))elsesleep_seconds=$((tomorrow-now))fiecho"开始执行每日任务:$(date)"# 使用当天日期作为日志文件名(按天分割)today=$(date+%Y%m%d)static_log="${log_dir}/rds-${today}.log"# 将本次执行的开始时间记录到日志(追加)echo"===== 开始执行任务:$(date)=====">>"$static_log"# 执行一次rds任务echo"执行配置文件任务:$rds_config_file"# 使用追加模式 >> 将 seatunnel 输出写入当天日志文件/opt/seatunnel/bin/seatunnel.sh--config"$rds_config_file"-elocal>>"$static_log"2>&1echo"每日任务完成:$(date)"echo"当前时间:$(date),等待$sleep_seconds秒后到达下一个0点..."sleep"$sleep_seconds"done

四、Docker部署

4.1 准备目录

mkdir-p/opt/data/seatunnel/{bin,config}

4.2 启动RDS任务

dockerrun--rm-d\-v/opt/data/seatunnel/:/sea\-eRDS_URI="jdbc:mysql://mysql-host:3306"\-eRDS_USERNAME="user"\-eRDS_PASSWORD="password"\apache/seatunnel:2.3.8\sh/sea/bin/rds_start.sh

4.3 启动MongoDB任务

dockerrun--rm-d\-v/opt/data/seatunnel/:/sea\-eARCH="x86"\-eMONGODB_URI="mongodb://user:pass@mongo-host:27017"\-eMONGODB_DATABASE="dbname"\-eRDS_URI="jdbc:mysql://mysql-host:3306"\-eRDS_USERNAME="user"\-eRDS_PASSWORD="password"\apache/seatunnel:2.3.8\sh/sea/bin/mongo_start.sh

五、Kubernetes部署

5.1 Deployment示例-MongoDB

apiVersion:apps/v1kind:Deploymentmetadata:name:seatunnel-mongonamespace:seatunnelspec:replicas:1selector:matchLabels:app:seatunnel-mongotemplate:metadata:labels:app:seatunnel-mongospec:volumes:-name:seatunnel-confighostPath:path:/data/seatunneltype:DirectoryOrCreatecontainers:-name:seatunnelimage:apache/seatunnel:2.3.8env:-name:ARCHvalue:"x86"-name:MONGODB_URIvalue:"mongodb://user:pass@mongo-host:27017"-name:MONGODB_DATABASEvalue:"dbname"-name:RDS_URIvalue:"jdbc:mysql://mysql-host:3306"-name:RDS_USERNAMEvalue:"user"-name:RDS_PASSWORDvalue:"password"command:-/bin/sh--c-|chmod +x /sea/bin/mongo_start.sh && bash /sea/bin/mongo_start.shvolumeMounts:-name:seatunnel-configmountPath:/sea

5.1 Deployment示例-RDS

apiVersion:apps/v1kind:Deploymentmetadata:name:seatunnel-rdsnamespace:seatunnelspec:replicas:1selector:matchLabels:app:seatunnel-rdstemplate:metadata:labels:app:seatunnel-rdsspec:volumes:-name:seatunnel-confighostPath:path:/data/seatunneltype:DirectoryOrCreatecontainers:-name:seatunnelimage:apache/seatunnel:2.3.8env:-name:RDS_URIvalue:"jdbc:mysql://mysql-host:3306"-name:RDS_USERNAMEvalue:"user"-name:RDS_PASSWORDvalue:"password"command:-/bin/sh--c-|chmod +x /sea/bin/rds_start.sh && bash /sea/bin/rds_start.shvolumeMounts:-name:seatunnel-configmountPath:/sea

六、常见问题

问题解决方案
脚本换行符错误sed -i 's/\r$//' script.sh
连接失败检查网络和认证信息
占位符未替换确认环境变量正确传递
mongo客户端可替换为本地客户端

七、参考链接

  • SeaTunnel官方文档
  • SeaTunnel GitHub
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/16 2:26:50

文档自动化操作系统:规则驱动的PDF生成与出版流水线

1. 项目概述:当模板不再是“套壳”,而是一套可执行的文档操作系统你有没有过这种体验:手头有一篇写得不错的行业分析,想快速变成一份体面的PDF报告发给客户,结果打开Word或InDesign,光是调封面字体、对齐目…

作者头像 李华
网站建设 2026/6/16 2:23:50

如何选择适合公司项目的UI设计工具?企业选型指南

一、工具太多,反而不知道怎么选 据 UX Tools Spring 2026 调研报告(1,478名设计师参与),53% 的团队将"可评估工具太多"列为工作流最大障碍。今天的设计工具市场已不是"好工具稀缺"的时代,而是&qu…

作者头像 李华
网站建设 2026/6/16 2:19:57

从 Dubbo+ZK 到 Nacos:注册中心深度拆解

从 DubboZK 到 Nacos:注册中心深度拆解这篇文章把 Dubbo 服务发现、ZK 注册中心、CAP 理论、脑裂防护到 Nacos 迁移的完整思考链路整理出来,希望能帮到同样在使用 Dubbo ZK 的同学。一、三个组件的关系:Spring Boot 是地基,Dubbo…

作者头像 李华
网站建设 2026/6/16 2:18:54

企业级AI接口网关架构重构:从单体到微服务的性能优化方案

企业级AI接口网关架构重构:从单体到微服务的性能优化方案 【免费下载链接】new-api A unified AI model hub for aggregation & distribution. It supports cross-converting various LLMs into OpenAI-compatible, Claude-compatible, or Gemini-compatible fo…

作者头像 李华
网站建设 2026/6/16 2:18:53

AntiDupl终极指南:5步快速清理电脑中的重复图片

AntiDupl终极指南:5步快速清理电脑中的重复图片 【免费下载链接】AntiDupl A program to search similar and defect pictures on the disk 项目地址: https://gitcode.com/gh_mirrors/an/AntiDupl 你是否经常为电脑中堆积如山的重复图片而烦恼?照…

作者头像 李华
网站建设 2026/6/16 2:18:52

一文吃透 NVIDIA PhysX 物理引擎:原理、架构、核心组件与实战应用

目录 前言 一、PhysX 基础概述与发展背景 1.1 什么是 PhysX 1.2 发展历程与生态 1.3 主流应用场景 二、PhysX 核心设计思想与基础物理模型 2.1 离散时间步模拟(核心关键) 2.2 数值精度选择 2.3 物理世界基本规则约束 三、PhysX 整体架构与核心…

作者头像 李华