news 2026/4/23 12:52:10

Flink SQL 性能调优MiniBatch、两阶段聚合、Distinct 拆分、MultiJoin 与 Delta Join 一文打通

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink SQL 性能调优MiniBatch、两阶段聚合、Distinct 拆分、MultiJoin 与 Delta Join 一文打通

1. 为什么 Flink SQL 会慢:状态与放大效应

Flink Table/SQL 的性能瓶颈高频出现在两类算子:

1)聚合(Group Aggregation / Window TVF Aggregation)
默认逐条处理:读 state → 更新 accumulator → 写回 state。RocksDB 场景下尤其“读写上瘾”,数据倾斜时还会出现热点 key,轻松 backpressure。

2)常规 Join(Regular Join)
同样逐条处理:查对侧 state → 更新本侧 state → 产出 join 结果。多表级联 join 时,中间结果会“记录放大”,state 变大、反查更慢、checkpoint 更重,作业稳定性直线下降。

接下来我们用一组优化把这两个痛点逐个拆掉。

2. MiniBatch 聚合:把“每条一次状态读写”变成“一批一次”

2.1 核心思想

MiniBatch 聚合会先把输入缓存到算子内部 buffer,触发时再批量处理。同一个 key 在一个 batch 内可以被折叠,状态访问从“每条一次”降低到“每个 key 一次”。

收益
显著减少 state 访问次数,提高吞吐,尤其是 RocksDB StateBackend。

代价
会引入额外延迟(因为要攒一批再算),吞吐与延迟的典型 trade-off。

2.2 开启方式(Java 配置)

// instantiate table environmentTableEnvironmenttEnv=...;// access flink configurationTableConfigconfiguration=tEnv.getConfig();configuration.set("table.exec.mini-batch.enabled","true");// 开启 mini-batchconfiguration.set("table.exec.mini-batch.allow-latency","5 s");// 允许缓存 5sconfiguration.set("table.exec.mini-batch.size","5000");// 每个 task 缓存最大记录数

2.3 一个经验值怎么选

  • allow-latency:先按 1s~5s 试,目标是“吞吐明显上升但业务还能接受延迟”
  • size:按单条记录大小与并发估算内存,5000/10000 常见,越大越容易提升吞吐但越吃内存、延迟越高
  • RocksDB 场景通常更值得开(state 读写成本更高)

2.4 Window TVF Aggregation 的特殊点

Window TVF 聚合默认总是开启 MiniBatch,而且它使用托管内存(managed memory)缓存,不走 JVM 堆,GC/OOM 风险小一些。Group Aggregation 则需要你显式开启。

3. Local-Global 两阶段聚合:专治数据倾斜与热点 key

3.1 为什么“两阶段”能治倾斜

GROUP BY的 key 倾斜时(比如某个 color/day 的数据量巨大),某些聚合实例会变成热点。两阶段聚合把聚合拆成:

  • Local 聚合:上游先在本地做一次预聚合(类似 MapReduce 的 Combine)
  • Global 聚合:下游再把各个 local 的 accumulator 合并

这样网络 shuffle 的数据量减少了,state 访问也减少了,热点压力被分摊。

3.2 注意:它依赖 MiniBatch

Local-Global 的“攒一波再合并”依赖 MiniBatch 的触发节奏,所以必须先开启 MiniBatch。

3.3 开启方式(Java 配置)

TableEnvironmenttEnv=...;TableConfigconfiguration=tEnv.getConfig();configuration.set("table.exec.mini-batch.enabled","true");configuration.set("table.exec.mini-batch.allow-latency","5 s");configuration.set("table.exec.mini-batch.size","5000");// 两阶段聚合:TWO_PHASEconfiguration.set("table.optimizer.agg-phase-strategy","TWO_PHASE");

3.4 适用场景

  • SUM / COUNT / MAX / MIN / AVG 等普通聚合 + 明显倾斜
  • 需要降低 shuffle 与 RocksDB state 读写

不太适用

  • DISTINCT 聚合(下一节讲)

4. Split Distinct Aggregation:让 COUNT(DISTINCT) 也能水平扩展

4.1 为什么 DISTINCT 聚合难搞

COUNT(DISTINCT user_id)如果 user_id 很稀疏,Local-Global 并不能有效减少数据:local accumulator 里几乎还是“原始全集”,全压到 global 上,global 仍是瓶颈。

4.2 解决思路:加一个 bucket key

把 distinct 拆成两层:

第一层:按group key + bucket key聚合
bucket key 由MOD(HASH_CODE(distinct_key), BUCKET_NUM)得到,默认 BUCKET_NUM=1024,可配置。

第二层:按原 group key 再聚合,把各 bucket 的结果 SUM 起来。

等价性
同一个 distinct_key 会落在同一个 bucket,去重逻辑不变,但热点被 1024 个桶分摊,吞吐更稳定。

4.3 自动改写示例

原 SQL:

SELECTday,COUNT(DISTINCTuser_id)FROMTGROUPBYday;

开启后会被等价改写为类似:

SELECTday,SUM(cnt)FROM(SELECTday,COUNT(DISTINCTuser_id)AScntFROMTGROUPBYday,MOD(HASH_CODE(user_id),1024))GROUPBYday;

4.4 开启方式

tEnv.getConfig().set("table.optimizer.distinct-agg.split.enabled","true");

可调 bucket 数:

  • table.optimizer.distinct-agg.split.bucket-num

4.5 限制点

目前不支持包含用户自定义 AggregateFunction 的聚合(distinct 拆分无法保证通用等价)。

5. DISTINCT 多维 UV:用 FILTER 替代 CASE WHEN,省 state、提性能

很多人写多维 UV 喜欢这样:

COUNT(DISTINCTCASEWHENflagIN(...)THENuser_idELSENULLEND)

更推荐用标准 SQL 的 FILTER:

SELECTday,COUNT(DISTINCTuser_id)AStotal_uv,COUNT(DISTINCTuser_id)FILTER(WHEREflagIN('android','iphone'))ASapp_uv,COUNT(DISTINCTuser_id)FILTER(WHEREflagIN('wap','other'))ASweb_uvFROMTGROUPBYday;

关键收益
优化器能识别“同一 distinct_key(user_id)+ 不同 filter 条件”,从而复用状态:原本可能要 3 份 distinct state,现在可以共享一份,state 大小与访问次数都下降,特别适合高基数 UV 指标。

6. MiniBatch Regular Join:减少 state IO + 抑制冗余输出

6.1 常规 Join 的痛点

逐条 join 会导致:

  • 频繁查对侧 state(RocksDB 更慢)
  • 级联 join 时记录放大严重,中间结果爆炸

6.2 MiniBatch Join 做了两件事

1)buffer 内折叠记录:join 前先把同 key 的变更合并,减少参与 join 的数据量
2)尽量抑制冗余结果:buffer 处理时尽可能不输出多余的中间变更

6.3 开启方式(SQL 例子)

SET'table.exec.mini-batch.enabled'='true';SET'table.exec.mini-batch.allow-latency'='5S';SET'table.exec.mini-batch.size'='5000';SELECTa.idASa_id,a.a_content,b.idASb_id,b.b_contentFROMaLEFTJOINbONa.id=b.id;

说明
Regular Join 的 MiniBatch 默认是关闭的,需要显式开启(同聚合一样的三项参数)。

6.4 适用建议

  • Join 两侧是 Upsert / CDC 场景、同 key 变更频繁,buffer 折叠收益巨大
  • 级联 join 的作业,先上 mini-batch join 往往能立刻看到 state 与吞吐改善

7. 多表 Regular Join:MultiJoin(Flink 2.1)把“中间 state”直接砍掉

当你的 SQL 是多表非时态 regular join,最常见的故障模式是:state 越跑越大,checkpoint 越来越慢,作业越来越不稳。

Flink 2.1 引入 MultiJoin Operator,核心目标是:

零中间 state(zero intermediate state)
不再为 join 链上的每个二元 join 存中间结果状态,而是把多个流同时在一个算子里 join,显著减少 state。

7.1 什么时候启用最划算

如果满足两点,就非常值得试:

  • 作业有多个 join,且 join 条件共享至少一个公共 join key(能按同一 key 分区)
  • 你观察到:中间 join 的 state 比输入表 state 还大(典型记录放大链路)

7.2 开启方式

SET'table.optimizer.multi-join.enabled'='true';

7.3 支持与限制要点(很重要)

  • 当前处于实验状态(可能有优化与 breaking change)

  • 目前支持 streaming INNER / LEFT joins

  • RIGHT join 计划支持(但你上线前要以实际版本为准)

  • 分区要求:至少有一条可以把多表一起 partition 的 key

    • 支持:A JOIN B ON A.key = B.key JOIN C ON A.key = C.key
    • 支持:A JOIN B ON A.key = B.key JOIN C ON B.key = C.key(传递性)
    • 不支持:A.key1=B.key1 且 B.key2=C.key2(没有统一 key,会拆成多个 MultiJoin)

7.4 经验结论

  • 记录放大越明显,MultiJoin 越能“越跑越稳”
  • 如果 join 链反而让 state 变小(较少见),二元 join 可能更快,但 MultiJoin 通常 still 更省总 state

8. Delta Join:用“外部索引 + 双向查表”替代“大 state”,稳定性直接起飞

8.1 为什么 Delta Join 能把 state 压下去

传统 regular join 必须把两侧历史数据都存进 Flink state,以确保对侧迟到时还能匹配。

Delta join 的思路是:
不在 Flink state 里囤全量数据,而是借助外部存储系统的索引能力(例如 Apache Fluss 提供索引信息),直接对外部系统做高效索引查询来完成匹配。这样 Flink state 与外部存储之间不会重复存储同一份数据。

效果
state 大幅缩小,checkpoint 压力下降,作业长期运行更稳。

8.2 默认策略

Delta join 默认开启;当满足条件时 regular join 会自动被优化为 delta join。

如需关闭:

SET'table.optimizer.delta-join.strategy'='NONE';

8.3 可调缓存参数(调优入口)

  • table.exec.delta-join.cache-enabled
  • table.exec.delta-join.left.cache-size
  • table.exec.delta-join.right.cache-size

8.4 支持与限制(上线前务必自查)

支持

  • INSERT-only source 表
  • 没有 DELETE 的 CDC source 表
  • delta join 前的 projection / filter
  • 算子内部缓存

限制:出现以下任一情况就不能优化成 delta join

  • 表的 index key 必须在 join 等值条件中
  • 目前只支持 INNER JOIN
  • 下游必须能处理重复变更(例如 UPSERT sink 且没有 upsertMaterialize 时可能不行)
  • CDC 场景:join key 必须是主键的一部分
  • CDC 场景:所有 filter 必须应用在 upsert key 上
  • filter / projection 中不允许非确定性函数

9. 一套“按症下药”的调优落地清单

9.1 你看到 backpressure + RocksDB state 读写很重

优先做

  • 开 MiniBatch(聚合/Join 都考虑)
  • 聚合倾斜明显就上 TWO_PHASE(Local-Global)

9.2 你有 COUNT DISTINCT 且 group key 热点

优先做

  • 开 distinct-agg split(bucket 拆分)
  • UV 多维统计用 FILTER 替换 CASE WHEN,争取共享 state

9.3 你有多表级联 join,state 越跑越大

优先做

  • 开 mini-batch join(先抑制记录放大)
  • Flink 2.1 且满足公共 key 分区条件:尝试 MultiJoin
  • 如果 source 外部系统具备索引能力且满足限制:让 regular join 自动转 delta join(或检查为何没转)

10. 结语:把“默认策略”切到“更适合生产负载的策略”

Flink SQL 的默认执行策略是通用稳妥型,但生产负载往往更偏“状态密集 + 倾斜 + 多表 join + CDC 变更频繁”。你这份调优组合拳的核心路线很清晰:

  • MiniBatch:用吞吐换少量延迟,换来 state IO 大幅下降
  • Local-Global:治倾斜、减 shuffle
  • Distinct 拆分 + FILTER:让 UV 指标也能扩展、还能省 state
  • MiniBatch Join:减少中间结果与冗余输出
  • MultiJoin:多表 join 直接砍中间 state
  • Delta Join:把大 state 变成外部索引查表,长期稳定运行
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/23 10:01:48

小波变换这玩意儿在图像处理里真是万金油般的存在,今天咱们直接上六个实战案例,手把手用Matlab代码拆解常见玩法。老规矩,边写代码边唠嗑,遇到坑点随时提醒

小波变换及其相关应用,Matlab代码,基于小波塔式分解的图像增强,基于离散小波变换的图像增强,基于小波变换的图像融合,基于小波变换的图像压缩,基于小波变换的数字水印技术,共6个典型案例&#x…

作者头像 李华
网站建设 2026/4/23 10:02:44

基于SpringBoot+Vue的数码产品购物商城的设计与实现

背景与意义 随着互联网技术的快速发展和电子商务的普及,数码产品购物商城已成为消费者购买电子产品的主要渠道之一。传统的线下购物模式存在地域限制、时间成本高等问题,而线上购物商城能够突破这些限制,提供更便捷、高效的购物体验。 Spri…

作者头像 李华
网站建设 2026/4/21 1:55:31

12-Ants(轻量级桌面娱乐工具)

12 - Ants 是由开发者 Nenad Hrg 打造的轻量级桌面娱乐工具,该软件定位为 “办公间隙减压工具”,通过在桌面生成虚拟蚂蚁动画增添趣味性,帮助用户缓解视觉疲劳与精神压力,程序体积仅 16KB,属单文件绿色软件&#xff0c…

作者头像 李华
网站建设 2026/4/11 9:05:55

Logic Pro X专业音频工程导出WAV用于HeyGem

Logic Pro X专业音频工程导出WAV用于HeyGem 在虚拟主播、AI讲师和智能客服日益普及的今天,一段“嘴型对得上、语气自然、声音清晰”的数字人视频,早已不再是炫技demo,而是内容生产链条中的标准输出。而在这背后,真正决定最终呈现质…

作者头像 李华