news 2026/4/23 12:23:52

PyFlink Table API / DataStream API / UDF(含 Pandas)/ 依赖管理 / 调试与性能模式 / CEP / State Processor / Av

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
PyFlink Table API / DataStream API / UDF(含 Pandas)/ 依赖管理 / 调试与性能模式 / CEP / State Processor / Av

1. PyFlink Table API 数据类型与 Python 映射

在 Table 生态里,DataType 描述的是逻辑类型,不代表具体存储/传输的物理格式。你在 Python Table API 里会通过pyflink.table.types.DataTypes来声明 UDF 输入输出类型。

1.1 逻辑类型与 Python / Pandas 的对应关系

核心映射(节选你提供的表):

  • BOOLEANbool(Pandas:numpy.bool_
  • INT / BIGINTint(Pandas:numpy.int32 / numpy.int64
  • FLOAT / DOUBLEfloat(Pandas:numpy.float32 / numpy.float64
  • VARCHARstr
  • VARBINARYbytes
  • DECIMALdecimal.Decimal
  • DATE/TIME/TIMESTAMPdatetime.*
  • ARRAYlist(Pandas:numpy.ndarray
  • ROWRow(或在某些场景映射为dict

这里最容易踩的点是:你在 UDF 声明了result_type运行时返回值必须与声明匹配,否则会出现序列化/类型推断异常,尤其是在混用 SQL/Table API 与 Pandas UDF 时更明显。

2. Table API 的 Python UDF:普通 UDF 与向量化(Pandas)UDF

PyFlink 支持两类 UDF:

  • 逐行 UDF(general Python UDF):一行一行处理
  • 向量化 UDF(vectorized / Pandas UDF):一批一批(Arrow 列式)在 JVM 与 Python 间传输,通常吞吐显著更高

2.1 UDF 打包与分发(强烈建议)

非本地模式(YARN、Standalone、K8s)运行 UDF,推荐用:

  • 配置:python-files/python.files
  • 或代码:table_env.add_python_file(...)

否则很常见的错误是:

  • ModuleNotFoundError: No module named 'my_udf'

2.2 在 UDF 里一次性加载资源:重写 open()

适合加载模型、词典、规则库等“只加载一次、重复 eval”的资源。

frompyflink.table.udfimportScalarFunction,udffrompyflink.tableimportDataTypesclassPredict(ScalarFunction):defopen(self,function_context):importpicklewithopen("resources.zip/resources/model.pkl","rb")asf:self.model=pickle.load(f)defeval(self,x):returnself.model.predict(x)predict=udf(Predict(),result_type=DataTypes.DOUBLE(),func_type="pandas")

2.3 读取全局 Job 参数:FunctionContext

frompyflink.table.udfimportScalarFunction,udffrompyflink.tableimportDataTypesfrompyflink.table.functionsimportFunctionContextclassHashCode(ScalarFunction):defopen(self,function_context:FunctionContext):self.factor=int(function_context.get_job_parameter("hashcode_factor","12"))defeval(self,s:str):returnhash(s)*self.factor hash_code=udf(HashCode(),result_type=DataTypes.INT())

并在作业侧设置:

t_env.get_config().set('pipeline.global-job-parameters','hashcode_factor:31')

2.4 单元测试 UDF:取出原始函数

add=udf(lambdai,j:i+j,result_type=DataTypes.BIGINT())f=add._funcassertf(1,2)==3

2.5 向量化(Pandas)UDF 与 UDAF

向量化的关键点:

  • JVM ↔ Python 使用Arrow 列式批量传输
  • python.fn-execution.arrow.batch.size控制 batch 大小
  • 对于 Pandas UDAF:不支持 partial aggregation,同一 group/window 的数据可能一次性加载到内存,需要评估内存峰值

向量化 Scalar UDF 示例:

frompyflink.table.udfimportudf@udf(result_type='BIGINT',func_type="pandas")defadd(i,j):returni+j

向量化 UDAF 示例:

frompyflink.table.udfimportudaf@udaf(result_type='FLOAT',func_type="pandas")defmean_udaf(v):returnv.mean()

3. Table 与 Pandas DataFrame 的互转:Arrow 通道

3.1 Pandas → Table

importpandasaspdimportnumpyasnpfrompyflink.tableimportDataTypes pdf=pd.DataFrame(np.random.rand(1000,2))table=t_env.from_pandas(pdf)table=t_env.from_pandas(pdf,['f0','f1'])table=t_env.from_pandas(pdf,[DataTypes.DOUBLE(),DataTypes.DOUBLE()])table=t_env.from_pandas(pdf,DataTypes.ROW([DataTypes.FIELD("f0",DataTypes.DOUBLE()),DataTypes.FIELD("f1",DataTypes.DOUBLE())]))

3.2 Table → Pandas(注意内存)

to_pandas()会把结果 collect 到 client 侧内存,建议先limit()

pdf=table.limit(100).to_pandas()

4. PyFlink Metrics:在 UDF 中注册 Counter/Gauge/Distribution/Meter

在 UDF 的open()中通过function_context.get_metric_group()拿到指标组:

  • Counter:inc()/dec()
  • Gauge:回调取值(只支持 int)
  • Distribution:sum/count/min/max/mean(只支持 int)
  • Meter:吞吐速率(mark_event)

Counter 示例:

frompyflink.table.udfimportScalarFunctionclassMyUDF(ScalarFunction):defopen(self,function_context):self.counter=function_context.get_metric_group().counter("my_counter")defeval(self,i):self.counter.inc(i)returni

5. Connector 使用方式(PyFlink 侧重点)

5.1 先把 connector/format JAR 加进来

table_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")

5.2 推荐用 DDL 定义源表/结果表

Kafka + JSON 示例(你提供的结构):

source_ddl=""" CREATE TABLE source_table( a VARCHAR, b INT ) WITH ( 'connector' = 'kafka', 'topic' = 'source_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'properties.group.id' = 'test_3', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) """sink_ddl=""" CREATE TABLE sink_table( a VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'sink_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json' ) """

然后:

t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)t_env.sql_query("SELECT a FROM source_table")\.execute_insert("sink_table").wait()

5.3 预置 Source/Sink

  • from_elements():从集合构建表
  • from_pandas()/to_pandas():与 Pandas 互转
  • 自定义 source/sink:当前仍需 Java/Scala 实现,再用 TableFactory 暴露给 DDL

6. Python DataStream API:结构、类型系统与算子

DataStream API 更贴近底层流处理,适合状态、窗口、复杂事件、低层控制。

6.1 程序骨架:env → source → transform → sink → execute

你提供的示例里展示了 state 的访问方式:

  • open()中用RuntimeContext.get_state(...)注册/获取状态
  • map()中读取/更新状态

6.2 类型(Types)与 Pickle 序列化

如果你不指定type_info/output_type,默认Types.PICKLED_BYTE_ARRAY(),用 pickle 做序列化,通用但慢。

什么时候必须指定类型:

  1. Python 记录要交给 Java 算子/connector(比如 FileSink)
  2. 追求更好的序列化性能
  3. DataStream ↔ Table转换通常要求明确的复合类型

FileSink 场景(你提供的关键用法):

env.from_collection([(1,'aaa'),(2,'bbb')])\.map(lambdar:(r[0]+1,r[1].upper()),output_type=Types.ROW([Types.INT(),Types.STRING()]))\.add_sink(FileSink.for_row_format('/tmp/output',Encoder.simple_string_encoder()).build())

6.3 常见算子与函数定义方式

  • 实现接口类(如MapFunction
  • 直接 lambda
  • 普通 Python 函数

注意:ConnectedStream.map()/flat_map()不支持 lambda,需要CoMapFunction/CoFlatMapFunction

6.4 Operator Chaining:默认链式提升性能

Python 非 shuffle 算子默认 chain,减少序列化/反序列化开销。你可以通过:

  • key_by/shuffle/rescale/rebalance/partition_custom等打断
  • start_new_chain()/disable_chaining()
  • 配置:python.operator-chaining.enabled

7. 依赖管理:JAR、Python 包、requirements、archives、解释器

如果你在一个作业里混用了 Table API 和 DataStream API,建议统一从 DataStream API 侧配置依赖,确保两边都生效(你提供的文档结论)。

7.1 JAR 依赖

  • Table API:pipeline.jars/pipeline.classpaths
  • DataStream API:env.add_jars(...)/env.add_classpaths(...)
  • CLI:--jarfile只支持一个 jar,多个要打 fat jar

7.2 Python 依赖

  • add_python_file():py 文件、包、目录、zip/whl/egg
  • set_python_requirements(requirements.txt, requirements_cache_dir):线上/离线安装
  • add_python_archive():打包 venv、数据文件、模型文件
  • python.executablepython.client.executable:分别控制 worker 端与 client 端解释器

离线依赖缓存目录的准备方式(你提供的命令):

pip download -d cached_dir -r requirements.txt --no-binary :all:

8. Python 执行模式:PROCESS vs THREAD(性能与隔离权衡)

Flink 1.15 引入 THREAD 模式:

  • PROCESS(默认):Python UDF 在独立 Python 进程,隔离好,但有跨进程通信/序列化开销
  • THREAD:把 Python 嵌入 JVM(依赖 PEMJA),减少 IPC 开销,但同 JVM 多个 Python 函数仍受 GIL 影响

配置方式:

# Table APItable_env.get_config().set("python.execution-mode","process")# or "thread"

THREAD 模式支持范围要注意:

  • Table API:Python UDAF / Pandas UDF & Pandas UDAF 不支持 thread
  • DataStream API:很多算子支持,但 iterate、join、async I/O 等不支持,可能会 fallback 到 process

9. 配置项速查:bundle、arrow batch、managed memory、profile

常用优化参数(你提供的表里很关键的几项):

  • python.fn-execution.bundle.size:吞吐与延迟的平衡点(大吞吐更高,延迟与内存更大)
  • python.fn-execution.bundle.time:等待聚合成 bundle 的超时
  • python.fn-execution.arrow.batch.size:Arrow batch 大小(不应超过 bundle.size)
  • python.fn-execution.memory.managed:Python worker 使用 managed memory(默认 true)
  • python.profile.enabled:开启 profiling,结果打印到 TaskManager 日志
  • python.metric.enabled:追求极致吞吐时可以考虑关闭 Python metric

10. 调试与日志:Client vs Server

  • Client 侧:代码中(非 UDF)print/logging,日志在提交端;默认 WARNING
  • Server 侧:UDF 中print/logging,日志在 TaskManager;默认 INFO
  • 找 PyFlink 日志目录:
python -c"import pyflink,os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"

远程调试 UDF(PyCharm):

importpydevd_pycharm pydevd_pycharm.settrace('localhost',port=6789,stdoutToServer=True,stderrToServer=True)

11. 端到端容错语义:Source/Sink 的交付保障

你给出的总结表非常实用,常见结论:

  • Source 参与 checkpoint 才能做到“状态 exactly-once”

  • Sink 参与 checkpoint 才可能做到端到端 exactly-once

  • 例如:

    • Kafka source:exactly once
    • File sink:exactly once
    • Elasticsearch sink:通常 at least once
    • Kafka producer:可 at least once / exactly once(事务)

实际落地要逐个 connector 看文档细节,尤其是幂等、事务、两阶段提交等要求。

12. FlinkCEP:复杂事件处理(Pattern API)

CEP 用于在无限流里识别事件序列模式,例如:

  • start → middle(满足条件)→ end(满足条件)
  • 支持 oneOrMore/times/optional/greedy
  • 支持 next/followedBy/followedByAny(严格/宽松/非确定宽松)
  • 支持 within(Duration) 做时间窗口约束
  • 支持 AfterMatchSkipStrategy 控制匹配结果爆炸(非常重要)

实践建议:

  • 模式里如果有循环(oneOrMore),一定考虑加until()within(),避免状态膨胀
  • 选择 skip strategy 来控制输出数量,否则在 followedByAny + oneOrMore 的组合下很容易产生组合爆炸

13. State Processor API:读写/修改 Savepoint 与 Checkpoint

这是“救火神器”和“演进神器”,适用场景:

  • 离线分析 state 是否符合预期
  • 用历史数据 bootstrap 一个新作业的 state
  • 修复不一致 state
  • 修改 state 类型、并行度、拆分/合并 operator state、替换 UID 等

13.1 DataStream 方式读取 state

SavepointReadersavepoint=SavepointReader.read(env,"hdfs://path/",newHashMapStateBackend());
  • 读 list/union/broadcast state
  • 读 keyed state:实现KeyedStateReaderFunction
  • 读 window state:指定 assigner + aggregate + WindowReaderFunction

13.2 写新 savepoint / 基于旧 savepoint 修改

SavepointWriter.newSavepoint(env,newHashMapStateBackend(),maxParallelism).withOperator(OperatorIdentifier.forUid("uid1"),transformation1).write(savepointPath);

并可用changeOperatorIdentifier做 UID/UID hash 迁移。

13.3 Table/SQL 方式读 keyed state

通过savepointconnector 建表,把 keyed state 映射成可查询的表(只支持 keyed state),对于快速排查非常高效。

14. Avro Format:Flink 原生支持 + PyFlink 用法

Java 侧依赖(你给的依赖):

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>2.2.0</version></dependency>

PyFlink 侧需要对应的 jar(并按“依赖管理”章节加入)。

Python 读取 Avro 文件的思路:定义 Avro schema,AvroInputFormat读出来是普通 Python 对象,然后你可以map(json.dumps)打印或下游处理:

schema=AvroSchema.parse_string("""{ ... }""")ds=env.create_input(AvroInputFormat(AVRO_FILE_PATH,schema))

一个容易踩坑的点(你提供的说明非常关键):

  • Avro 生成类里某字段若因为 UNION 定义写法不当导致生成Object类型,就不能作为 join/grouping key
  • 可空类型用["null","double"]这种形式是允许的

15. Azure Table Storage:用 HadoopInputFormat Wrapper 接入(示例落地)

你给的方案核心是:使用flink-hadoop-compatibility的 wrapper 来复用 Hadoop InputFormat。

15.1 准备依赖

  1. 拉源码并本地构建(因为不在 Maven Central):
gitclone https://github.com/mooso/azure-tables-hadoop.gitcdazure-tables-hadoop mvn cleaninstall
  1. 在 Flink 项目里加入依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility</artifactId><version>2.2.0</version></dependency><dependency><groupId>com.microsoft.hadoop</groupId><artifactId>microsoft-hadoop-azure</artifactId><version>0.0.5</version></dependency>

15.2 读取 Azure Table 并转成 Flink 流做处理

你提供的代码表达的是“把 Azure Table 变成 Flink 的数据集/数据流再做 map”。这里给一个更清晰的版本,保持逻辑一致:读取(Text, WritableEntity),遍历属性打印,并把 key 输出到下游。

importjava.util.Map;importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importcom.microsoft.hadoop.azure.AzureTableConfiguration;importcom.microsoft.hadoop.azure.AzureTableInputFormat;importcom.microsoft.hadoop.azure.WritableEntity;importcom.microsoft.windowsazure.storage.table.EntityProperty;publicclassAzureTableExample{publicstaticvoidmain(String[]args)throwsException{finalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.BATCH);HadoopInputFormat<Text,WritableEntity>hdIf=newHadoopInputFormat<>(newAzureTableInputFormat(),Text.class,WritableEntity.class,newJob());hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(),"TODO");hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(),"TODO");hdIf.getConfiguration().set(AzureTableConfiguration.Keys.TABLE_NAME.getKey(),"TODO");DataStream<Tuple2<Text,WritableEntity>>input=env.createInput(hdIf);DataStream<String>out=input.map(newMapFunction<Tuple2<Text,WritableEntity>,String>(){@OverridepublicStringmap(Tuple2<Text,WritableEntity>value){System.err.println("Key = "+value.f0);WritableEntitywe=value.f1;for(Map.Entry<String,EntityProperty>prop:we.getProperties().entrySet()){System.err.println("key="+prop.getKey()+" ; value="+prop.getValue().getValueAsString());}returnvalue.f0.toString();}});out.print();env.execute("Azure Example");}}

说明:

  • 这个示例核心不是“Azure Table 专属 connector”,而是复用 Hadoop InputFormat
  • 有了 DataStream 后,你就可以接上 Flink 的任意算子链(keyBy/window/state/CEP…)

16. 一套可复用的“选型与组合”建议

  • 主要是表计算、SQL、维表 join、聚合:优先Table API/SQL
  • 需要复杂状态、细粒度控制、底层连接器/自定义 source:用DataStream API
  • Python 逻辑吞吐不够:优先用Pandas UDF(Arrow),其次考虑bundle.size/batch.size调参
  • 需要模式匹配告警:CEP
  • 需要迁移/修复/审计 state:State Processor API
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/23 12:16:22

企业微信 RPA 外部群自动化的稳定策略

QiWe开放平台提供了后台直登功能&#xff0c;登录成功后获取相关参数&#xff0c;快速Apifox在线测试&#xff0c;所有登录功能都是基于QiWe平台API自定义开发。 ​ 引言 当 RPA 流程从“跑通”进入“长期稳定运行”阶段&#xff0c;真正的挑战才刚刚开始。UI 变化、响应堆积…

作者头像 李华
网站建设 2026/4/23 8:21:09

汇编语言全接触-86.如何获取真正中断入口地址

概述&#xff1a;我们知道&#xff0c;DOS 的中断例程的入口地址存在 0000&#xff1a;0000 开始的中断向量表中&#xff0c;当程序要要建立一个中断例程时&#xff0c;需要修改中断向量表把入口地址指向自己的程序&#xff0c;为了使原来的中断例程能正常使用&#xff0c;在出…

作者头像 李华
网站建设 2026/4/23 8:19:51

matlab实现GMSK信号调制和解调

GMSK&#xff08;Gaussian Minimum Shift Keying&#xff09;是一种基于高斯滤波的调制技术&#xff0c;它结合了MSK&#xff08;Minimum Shift Keying&#xff09;和Gaussian滤波的特性&#xff0c;以减少频谱扩展和提高频带利用率。在MATLAB中实现GMSK信号的调制和解调可以分…

作者头像 李华
网站建设 2026/4/23 8:17:09

程序员项目管理能力提升手册:从技术执行者到项目主导者

很多程序员认为 “项目管理是项目经理的事”&#xff0c;只需专注编码即可。但实际工作中&#xff0c;程序员往往需要主导模块开发、协调跨角色协作、把控开发进度与质量&#xff0c;缺乏项目管理能力会导致&#xff1a;需求理解偏差、进度拖延、风险失控、协作混乱&#xff0c…

作者头像 李华
网站建设 2026/4/23 8:23:33

英超阿森纳这个球队怎么样?

作为英超最长情的观察者&#xff08;自1886年建队以来从未降级&#xff09;&#xff0c;阿森纳总能用它的独特魅力吸引你。它既是一部厚重的足球历史书&#xff0c;又是当下欧洲足坛最锋利、最沉稳的力量之一。下面让我为你展开这幅“兵工厂”的画卷。 一、 球队名片 昵称&…

作者头像 李华
网站建设 2026/4/23 8:21:10

‌测试领导力培养指南

一、测试领导力的本质&#xff1a;从“找Bug”到“质量驱动者”的角色跃迁‌ 传统意义上的测试工程师&#xff0c;核心职责是执行用例、报告缺陷、保障上线质量。而现代测试领导者&#xff0c;已演变为‌质量文化的塑造者、技术战略的制定者与跨职能团队的赋能者‌。其领导力并…

作者头像 李华