1. PyFlink Table API 数据类型与 Python 映射
在 Table 生态里,DataType 描述的是逻辑类型,不代表具体存储/传输的物理格式。你在 Python Table API 里会通过pyflink.table.types.DataTypes来声明 UDF 输入输出类型。
1.1 逻辑类型与 Python / Pandas 的对应关系
核心映射(节选你提供的表):
BOOLEAN→bool(Pandas:numpy.bool_)INT / BIGINT→int(Pandas:numpy.int32 / numpy.int64)FLOAT / DOUBLE→float(Pandas:numpy.float32 / numpy.float64)VARCHAR→strVARBINARY→bytesDECIMAL→decimal.DecimalDATE/TIME/TIMESTAMP→datetime.*ARRAY→list(Pandas:numpy.ndarray)ROW→Row(或在某些场景映射为dict)
这里最容易踩的点是:你在 UDF 声明了result_type,运行时返回值必须与声明匹配,否则会出现序列化/类型推断异常,尤其是在混用 SQL/Table API 与 Pandas UDF 时更明显。
2. Table API 的 Python UDF:普通 UDF 与向量化(Pandas)UDF
PyFlink 支持两类 UDF:
- 逐行 UDF(general Python UDF):一行一行处理
- 向量化 UDF(vectorized / Pandas UDF):一批一批(Arrow 列式)在 JVM 与 Python 间传输,通常吞吐显著更高
2.1 UDF 打包与分发(强烈建议)
非本地模式(YARN、Standalone、K8s)运行 UDF,推荐用:
- 配置:
python-files/python.files - 或代码:
table_env.add_python_file(...)
否则很常见的错误是:
ModuleNotFoundError: No module named 'my_udf'
2.2 在 UDF 里一次性加载资源:重写 open()
适合加载模型、词典、规则库等“只加载一次、重复 eval”的资源。
frompyflink.table.udfimportScalarFunction,udffrompyflink.tableimportDataTypesclassPredict(ScalarFunction):defopen(self,function_context):importpicklewithopen("resources.zip/resources/model.pkl","rb")asf:self.model=pickle.load(f)defeval(self,x):returnself.model.predict(x)predict=udf(Predict(),result_type=DataTypes.DOUBLE(),func_type="pandas")2.3 读取全局 Job 参数:FunctionContext
frompyflink.table.udfimportScalarFunction,udffrompyflink.tableimportDataTypesfrompyflink.table.functionsimportFunctionContextclassHashCode(ScalarFunction):defopen(self,function_context:FunctionContext):self.factor=int(function_context.get_job_parameter("hashcode_factor","12"))defeval(self,s:str):returnhash(s)*self.factor hash_code=udf(HashCode(),result_type=DataTypes.INT())并在作业侧设置:
t_env.get_config().set('pipeline.global-job-parameters','hashcode_factor:31')2.4 单元测试 UDF:取出原始函数
add=udf(lambdai,j:i+j,result_type=DataTypes.BIGINT())f=add._funcassertf(1,2)==32.5 向量化(Pandas)UDF 与 UDAF
向量化的关键点:
- JVM ↔ Python 使用Arrow 列式批量传输
python.fn-execution.arrow.batch.size控制 batch 大小- 对于 Pandas UDAF:不支持 partial aggregation,同一 group/window 的数据可能一次性加载到内存,需要评估内存峰值
向量化 Scalar UDF 示例:
frompyflink.table.udfimportudf@udf(result_type='BIGINT',func_type="pandas")defadd(i,j):returni+j向量化 UDAF 示例:
frompyflink.table.udfimportudaf@udaf(result_type='FLOAT',func_type="pandas")defmean_udaf(v):returnv.mean()3. Table 与 Pandas DataFrame 的互转:Arrow 通道
3.1 Pandas → Table
importpandasaspdimportnumpyasnpfrompyflink.tableimportDataTypes pdf=pd.DataFrame(np.random.rand(1000,2))table=t_env.from_pandas(pdf)table=t_env.from_pandas(pdf,['f0','f1'])table=t_env.from_pandas(pdf,[DataTypes.DOUBLE(),DataTypes.DOUBLE()])table=t_env.from_pandas(pdf,DataTypes.ROW([DataTypes.FIELD("f0",DataTypes.DOUBLE()),DataTypes.FIELD("f1",DataTypes.DOUBLE())]))3.2 Table → Pandas(注意内存)
to_pandas()会把结果 collect 到 client 侧内存,建议先limit():
pdf=table.limit(100).to_pandas()4. PyFlink Metrics:在 UDF 中注册 Counter/Gauge/Distribution/Meter
在 UDF 的open()中通过function_context.get_metric_group()拿到指标组:
- Counter:
inc()/dec() - Gauge:回调取值(只支持 int)
- Distribution:sum/count/min/max/mean(只支持 int)
- Meter:吞吐速率(mark_event)
Counter 示例:
frompyflink.table.udfimportScalarFunctionclassMyUDF(ScalarFunction):defopen(self,function_context):self.counter=function_context.get_metric_group().counter("my_counter")defeval(self,i):self.counter.inc(i)returni5. Connector 使用方式(PyFlink 侧重点)
5.1 先把 connector/format JAR 加进来
table_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")5.2 推荐用 DDL 定义源表/结果表
Kafka + JSON 示例(你提供的结构):
source_ddl=""" CREATE TABLE source_table( a VARCHAR, b INT ) WITH ( 'connector' = 'kafka', 'topic' = 'source_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'properties.group.id' = 'test_3', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) """sink_ddl=""" CREATE TABLE sink_table( a VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'sink_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json' ) """然后:
t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)t_env.sql_query("SELECT a FROM source_table")\.execute_insert("sink_table").wait()5.3 预置 Source/Sink
from_elements():从集合构建表from_pandas()/to_pandas():与 Pandas 互转- 自定义 source/sink:当前仍需 Java/Scala 实现,再用 TableFactory 暴露给 DDL
6. Python DataStream API:结构、类型系统与算子
DataStream API 更贴近底层流处理,适合状态、窗口、复杂事件、低层控制。
6.1 程序骨架:env → source → transform → sink → execute
你提供的示例里展示了 state 的访问方式:
- 在
open()中用RuntimeContext.get_state(...)注册/获取状态 map()中读取/更新状态
6.2 类型(Types)与 Pickle 序列化
如果你不指定type_info/output_type,默认Types.PICKLED_BYTE_ARRAY(),用 pickle 做序列化,通用但慢。
什么时候必须指定类型:
- Python 记录要交给 Java 算子/connector(比如 FileSink)
- 追求更好的序列化性能
- DataStream ↔ Table转换通常要求明确的复合类型
FileSink 场景(你提供的关键用法):
env.from_collection([(1,'aaa'),(2,'bbb')])\.map(lambdar:(r[0]+1,r[1].upper()),output_type=Types.ROW([Types.INT(),Types.STRING()]))\.add_sink(FileSink.for_row_format('/tmp/output',Encoder.simple_string_encoder()).build())6.3 常见算子与函数定义方式
- 实现接口类(如
MapFunction) - 直接 lambda
- 普通 Python 函数
注意:ConnectedStream.map()/flat_map()不支持 lambda,需要CoMapFunction/CoFlatMapFunction。
6.4 Operator Chaining:默认链式提升性能
Python 非 shuffle 算子默认 chain,减少序列化/反序列化开销。你可以通过:
key_by/shuffle/rescale/rebalance/partition_custom等打断start_new_chain()/disable_chaining()- 配置:
python.operator-chaining.enabled
7. 依赖管理:JAR、Python 包、requirements、archives、解释器
如果你在一个作业里混用了 Table API 和 DataStream API,建议统一从 DataStream API 侧配置依赖,确保两边都生效(你提供的文档结论)。
7.1 JAR 依赖
- Table API:
pipeline.jars/pipeline.classpaths - DataStream API:
env.add_jars(...)/env.add_classpaths(...) - CLI:
--jarfile只支持一个 jar,多个要打 fat jar
7.2 Python 依赖
add_python_file():py 文件、包、目录、zip/whl/eggset_python_requirements(requirements.txt, requirements_cache_dir):线上/离线安装add_python_archive():打包 venv、数据文件、模型文件python.executable与python.client.executable:分别控制 worker 端与 client 端解释器
离线依赖缓存目录的准备方式(你提供的命令):
pip download -d cached_dir -r requirements.txt --no-binary :all:8. Python 执行模式:PROCESS vs THREAD(性能与隔离权衡)
Flink 1.15 引入 THREAD 模式:
- PROCESS(默认):Python UDF 在独立 Python 进程,隔离好,但有跨进程通信/序列化开销
- THREAD:把 Python 嵌入 JVM(依赖 PEMJA),减少 IPC 开销,但同 JVM 多个 Python 函数仍受 GIL 影响
配置方式:
# Table APItable_env.get_config().set("python.execution-mode","process")# or "thread"THREAD 模式支持范围要注意:
- Table API:Python UDAF / Pandas UDF & Pandas UDAF 不支持 thread
- DataStream API:很多算子支持,但 iterate、join、async I/O 等不支持,可能会 fallback 到 process
9. 配置项速查:bundle、arrow batch、managed memory、profile
常用优化参数(你提供的表里很关键的几项):
python.fn-execution.bundle.size:吞吐与延迟的平衡点(大吞吐更高,延迟与内存更大)python.fn-execution.bundle.time:等待聚合成 bundle 的超时python.fn-execution.arrow.batch.size:Arrow batch 大小(不应超过 bundle.size)python.fn-execution.memory.managed:Python worker 使用 managed memory(默认 true)python.profile.enabled:开启 profiling,结果打印到 TaskManager 日志python.metric.enabled:追求极致吞吐时可以考虑关闭 Python metric
10. 调试与日志:Client vs Server
- Client 侧:代码中(非 UDF)
print/logging,日志在提交端;默认 WARNING - Server 侧:UDF 中
print/logging,日志在 TaskManager;默认 INFO - 找 PyFlink 日志目录:
python -c"import pyflink,os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"远程调试 UDF(PyCharm):
importpydevd_pycharm pydevd_pycharm.settrace('localhost',port=6789,stdoutToServer=True,stderrToServer=True)11. 端到端容错语义:Source/Sink 的交付保障
你给出的总结表非常实用,常见结论:
Source 参与 checkpoint 才能做到“状态 exactly-once”
Sink 参与 checkpoint 才可能做到端到端 exactly-once
例如:
- Kafka source:exactly once
- File sink:exactly once
- Elasticsearch sink:通常 at least once
- Kafka producer:可 at least once / exactly once(事务)
实际落地要逐个 connector 看文档细节,尤其是幂等、事务、两阶段提交等要求。
12. FlinkCEP:复杂事件处理(Pattern API)
CEP 用于在无限流里识别事件序列模式,例如:
- start → middle(满足条件)→ end(满足条件)
- 支持 oneOrMore/times/optional/greedy
- 支持 next/followedBy/followedByAny(严格/宽松/非确定宽松)
- 支持 within(Duration) 做时间窗口约束
- 支持 AfterMatchSkipStrategy 控制匹配结果爆炸(非常重要)
实践建议:
- 模式里如果有循环(oneOrMore),一定考虑加
until()或within(),避免状态膨胀 - 选择 skip strategy 来控制输出数量,否则在 followedByAny + oneOrMore 的组合下很容易产生组合爆炸
13. State Processor API:读写/修改 Savepoint 与 Checkpoint
这是“救火神器”和“演进神器”,适用场景:
- 离线分析 state 是否符合预期
- 用历史数据 bootstrap 一个新作业的 state
- 修复不一致 state
- 修改 state 类型、并行度、拆分/合并 operator state、替换 UID 等
13.1 DataStream 方式读取 state
SavepointReadersavepoint=SavepointReader.read(env,"hdfs://path/",newHashMapStateBackend());- 读 list/union/broadcast state
- 读 keyed state:实现
KeyedStateReaderFunction - 读 window state:指定 assigner + aggregate + WindowReaderFunction
13.2 写新 savepoint / 基于旧 savepoint 修改
SavepointWriter.newSavepoint(env,newHashMapStateBackend(),maxParallelism).withOperator(OperatorIdentifier.forUid("uid1"),transformation1).write(savepointPath);并可用changeOperatorIdentifier做 UID/UID hash 迁移。
13.3 Table/SQL 方式读 keyed state
通过savepointconnector 建表,把 keyed state 映射成可查询的表(只支持 keyed state),对于快速排查非常高效。
14. Avro Format:Flink 原生支持 + PyFlink 用法
Java 侧依赖(你给的依赖):
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>2.2.0</version></dependency>PyFlink 侧需要对应的 jar(并按“依赖管理”章节加入)。
Python 读取 Avro 文件的思路:定义 Avro schema,AvroInputFormat读出来是普通 Python 对象,然后你可以map(json.dumps)打印或下游处理:
schema=AvroSchema.parse_string("""{ ... }""")ds=env.create_input(AvroInputFormat(AVRO_FILE_PATH,schema))一个容易踩坑的点(你提供的说明非常关键):
- Avro 生成类里某字段若因为 UNION 定义写法不当导致生成
Object类型,就不能作为 join/grouping key - 可空类型用
["null","double"]这种形式是允许的
15. Azure Table Storage:用 HadoopInputFormat Wrapper 接入(示例落地)
你给的方案核心是:使用flink-hadoop-compatibility的 wrapper 来复用 Hadoop InputFormat。
15.1 准备依赖
- 拉源码并本地构建(因为不在 Maven Central):
gitclone https://github.com/mooso/azure-tables-hadoop.gitcdazure-tables-hadoop mvn cleaninstall- 在 Flink 项目里加入依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility</artifactId><version>2.2.0</version></dependency><dependency><groupId>com.microsoft.hadoop</groupId><artifactId>microsoft-hadoop-azure</artifactId><version>0.0.5</version></dependency>15.2 读取 Azure Table 并转成 Flink 流做处理
你提供的代码表达的是“把 Azure Table 变成 Flink 的数据集/数据流再做 map”。这里给一个更清晰的版本,保持逻辑一致:读取(Text, WritableEntity),遍历属性打印,并把 key 输出到下游。
importjava.util.Map;importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importcom.microsoft.hadoop.azure.AzureTableConfiguration;importcom.microsoft.hadoop.azure.AzureTableInputFormat;importcom.microsoft.hadoop.azure.WritableEntity;importcom.microsoft.windowsazure.storage.table.EntityProperty;publicclassAzureTableExample{publicstaticvoidmain(String[]args)throwsException{finalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.BATCH);HadoopInputFormat<Text,WritableEntity>hdIf=newHadoopInputFormat<>(newAzureTableInputFormat(),Text.class,WritableEntity.class,newJob());hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(),"TODO");hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(),"TODO");hdIf.getConfiguration().set(AzureTableConfiguration.Keys.TABLE_NAME.getKey(),"TODO");DataStream<Tuple2<Text,WritableEntity>>input=env.createInput(hdIf);DataStream<String>out=input.map(newMapFunction<Tuple2<Text,WritableEntity>,String>(){@OverridepublicStringmap(Tuple2<Text,WritableEntity>value){System.err.println("Key = "+value.f0);WritableEntitywe=value.f1;for(Map.Entry<String,EntityProperty>prop:we.getProperties().entrySet()){System.err.println("key="+prop.getKey()+" ; value="+prop.getValue().getValueAsString());}returnvalue.f0.toString();}});out.print();env.execute("Azure Example");}}说明:
- 这个示例核心不是“Azure Table 专属 connector”,而是复用 Hadoop InputFormat
- 有了 DataStream 后,你就可以接上 Flink 的任意算子链(keyBy/window/state/CEP…)
16. 一套可复用的“选型与组合”建议
- 主要是表计算、SQL、维表 join、聚合:优先Table API/SQL
- 需要复杂状态、细粒度控制、底层连接器/自定义 source:用DataStream API
- Python 逻辑吞吐不够:优先用Pandas UDF(Arrow),其次考虑
bundle.size/batch.size调参 - 需要模式匹配告警:CEP
- 需要迁移/修复/审计 state:State Processor API