原副标题:JRC Flink 流工作台Listary手册
译者:天猫仓储 康琪
责任编辑综合性 Apache Flink 基本原理与天猫动态排序互联网平台(JRC)的大背景,详尽讲诉了小规模 Flink 流工作台的Listary方法。通过写作责任编辑,听众可如是说 Flink 流工作台的通用型Listary举措,并应用于生产环境。
写在后面
Apache Flink 作为 Google Dataflow Model 的工业级实现,经过十多年的发展,如今已经成为INS13ZD排序开源应用领域的基克维泽区。它具有高林宏吉、低传输速率、原生植仓储批多功能、高连续性、可扩展性、高尾端的特点,同时提供多样的层次化 API、时间询问处、状况化排序等语法,方便快捷使用者加速进阶动态合作开发,构筑动态排序管理体系。
俗语有云,工欲善其事,工欲善其事。要想让小规模、大互联网流量的 Flink 工作台高效率运转,就必定要进行Listary,因此认知其另一面的基本原理。责任编辑是本栏根据甚或实战经验以及Listary课堂教学,紧密结合天猫动态排序互联网平台(JRC)大背景工业生产的面向全国专精相关人员的 Flink 流工作台Listary手册。主要包涵以下五个方面:
TaskManager 缓存数学模型Listary 互联网栈Listary RocksDB 与状况Listary 其他Listary项责任编辑如前所述 Flink 1.12 版。写作之前,建议听众对 Flink 基础模块、程式设计数学模型和运转时有较深入细致的如是说。
01 TaskManager 缓存数学模型Listary
1.1 TaskManager 缓存数学模型与模块
目前的 Flink TaskManager 缓存数学模型是 1.10 版确认下来的,非官方文件格式中得出的库尔如下表所示。在高版 Flink 的 Web UI 中,也可以看到这张图。
图 1 TaskManager 缓存数学模型
上面来看图骂人,分地区得出比非官方文件格式详尽一些的如是说。t.m. 即为 taskmanager. memory. 后缀的简写。
1.2 互联网平台特定模块
除了 TaskManager 缓存数学模型相关的模块之外,还有一些互联网平台提供的其他模块,列举如下表所示。
1.3 TM / 互联网平台模块与 JVM 的关系
上述模块与 TaskManager JVM 本身的模块有如下表所示的对应关系:
-Xms | -Xmx → t. m. framework. heap. size + t. m. task. heap. size -Xmn → -Xmx * apus. taskmanager. heap. newsize. ratio -XX: Max Direct Memory Size → t. m. framework. off- heap. size + t. m. task. off- heap. size + $network -XX: Max Metaspace Size → t. m. jvm- metaspace. size另外,还可以通过 env.java.opts.{jobmanager | taskmanager} 配置项来分别设定 JM 和 TM JVM 的附加模块。
1.4 缓存分配示例
上面以在生产环境某工作台中运转的 8C / 16G TaskManager 为例,根据以上规则,手动排序各个缓存分区的配额。注意有部分模块未采用默认值。
t.m.process.size = 16384
t.m.flink.size
= t.m.process.size * apus.memory.incontainer.available.ratio
= 16384 * 0.9 = 14745.6
t.m.jvm-metaspace.size
= [t.m.process.size – t.m.flink.size] * apus.metaspace.incutoff.ratio
= [16384 – 14745.6] * 0.25 = 409.6
$overhead
= MIN{t.m.process.size * t.m.jvm-overhead-fraction, t.m.jvm-overhead.max}
= MIN{16384 * 0.1, 1024} = 1024
$network
= MIN{t.m.flink.size * t.m.network.fraction, t.m.network.max}
= MIN{14745.6 * 0.3, 5120} = 4423.68
$managed
= t.m.flink.size * t.m.managed.fraction
= 14745.6 * 0.25 = 3686.4
t.m.task.off-heap.size
= t.m.flink.size * apus.taskmanager.memory.task.off-heap.fraction
= 14745.6 * 0.01 = 147.4
t.m.task.heap.size
= t.m.flink.size – $network – $managed – t.m.task.off-heap.size – t.m.framework.heap.size – t.m.framework.off-heap.size
= 14745.6 – 4423.68 – 3686.4 – 147.4 – 128 – 128 = 6232.12
与 Web UI 中展示的缓存配额做比对,可发现完全吻合。
图 2 Web UI 展示的缓存分配情况
1.5 Listary概览
认知 TaskManager 缓存数学模型是开展Listary的大前提,进行Listary的宗旨就是:合理分配,避免浪费,保证性能。上面先对比较容易出现问题的三块地区做简要的解说。
1. 关于任务堆外缓存
互联网平台方的解释是有些使用者的工作台需要这部分缓存,但从 Flink Runtime 的角度讲,主要是批工作台(如 Sort-Merge Shuffle 过程)会积极地使用它。相对地,流工作台很少涉及这一部分,除非使用者代码或使用者引用的第三方库直接操作了 DirectByteBuffer 或 Unsafe 之类。所以一般可以优先保证堆缓存,即尝试将
apus.t.m.task.off-heap.fraction 再调小一些(如 0.05),再观察工作台运转是否正常。
2. 关于托管缓存
如果使用 RocksDB 状况后端,且状况数据量较大或读写较频繁,建议适当增加 t.m.managed.fraction,如 0.2~0.5,可配合 RocksDB 监控决定。如果不使用 RocksDB 状况后端,可设为 0,因为其他状况后端下的本地状况会存在 TaskManager 堆缓存中。后文会详尽讲解 RocksDB 相关的Listary项。
3. 关于互联网缓存
需要特别注意的是,互联网缓存的占用量与并行度和工作台拓扑有关,而与实际互联网互联网流量关系不大,所以不能简单地以工作台的数据量来设置这一地区。粗略地讲,对简单拓扑,建议以默认值启动工作台,再观察该地区的利用情况并进行调整;对复杂拓扑,建议先适当调大 t.m.network.fraction 和 max,保证不出现 IOException: Insufficient number of network buffers 异常,然后再做调整。另外,请一定不要把 t.m.network.min 和 max 设成相等的值,这样会直接忽略 fraction,而这种直接的设定往往并不科学。下一节就来详尽讲解 Flink 互联网栈的Listary。
02 互联网栈Listary
2.1 互联网栈和互联网缓存
图 3 Flink 互联网栈
Flink 的互联网栈构筑在 Netty 的基础之上。如上图所示,每个 TaskManager 既可以是 Server(发送端)也可以是 Client(接收端),因此它们之间的 TCP 连接会被复用,以减少资源消耗。
图中的小色块就是互联网缓存(NetworkBuffer),它是数据传输的最基本单位,以直接缓存的形式分配,承载序列化的 StreamRecord 数据,且一个 Buffer 的大小就等于一个 MemorySegment 的大小(t.m.segment-size,默认 32KB)。TM 中的每个 Sub-task 都会创建互联网缓存池(NetworkBufferPool),用于分配和回收 Buffer。上面讲解一下互联网缓存的分配规则。
2.2 互联网缓存分配规则
Flink 流工作台的执行计划用三层 DAG 来表示,即:StreamGraph(逻辑计划)→ JobGraph(优化的逻辑计划)→ ExecutionGraph(物理计划)。当 ExecutionGraph 真正被调度到 TaskManager 上面执行时,形成的是如下表所示图所示的结构。
图 4 Flink 物理执行图结构
每个 Sub-task 都有一套用于数据交换的模块,输出侧称为 ResultPartition(RP),输入侧称为 InputGate(IG)。另外,它们还会根据并行度和上下游的 DistributionPattern(POINTWISE 或 ALL_TO_ALL)划分为子块,分别称为 ResultSubpartition(RS)和 InputChannel(IC)。注意上下游 RS 和 IC 的比例是严格 1:1 的。互联网缓存就是在 ResultPartition 和 InputGate 级别分配的,具体的分配规则是:
#Buffer-RP = #RS + 1 && #Buffer-RS <= t.network.m.max-buffers-per-channel (10) #Buffer-IG = #IC * t.network.m.buffers-per-channel (2, exclusive) + t.network.m.floating-buffers-per-gate (8, floating)<!—->
翻译一下:
发送端 RP 分配的 Buffer 总数为 RS 的数量 + 1,且为了防止倾斜,每个 RS 可获得的 Buffer 数不能多于 taskmanager.network.memory.max-buffers-per-channel(默认值 10); 接收端每个 IC 独享的 Buffer 数为 taskmanager. network. memory. buffers- per- channel(默认值 2),IG 可额外提供的浮动 Buffer 数为 taskmanager. network. memory. floating- buffers- per- gate(默认值 8)。多说一句,上图这套机制也是 Flink 实现 Credit-based 流控(反压)的基础,想想诊断反压时会看的 **PoolUsage 模块就明白了。反压是比较基础的话题,这里就不再展开。
再重复上一节的那句话:互联网缓存的占用量与并行度和工作台拓扑有关,而与实际互联网互联网流量关系不大。特别地,由于 ALL_TO_ALL 分布(如 Hash、Rebalance)会产生 O (N^2) 级别的 RS 和 IC,所以对 Buffer 的需求量也就更大。当然,我们基本不可能通过用肉眼看复杂的拓扑图来排序 Buffer 数,所以最好的方法是加速试错,来看一个例子。
2.3 互联网缓存Listary示例
本节以测试环境中的某工作台(下称 “示例工作台”)为例。
该工作台有 54 个 8C / 16G 规格的 TM,并行度 400,运转 4330 个 Sub-tasks,且包涵大量的 keyBy 操作。初始设定 t.m.network.fraction = 0.2 & t.m.network.max = 3GB,报 IOException: Insufficient network buffers 异常;再次设定 t.m.network.fraction = 0.3 & t.m.network.max = 5GB,工作台正常启动,实际分配 4.32GB,占用率 73%~78% 之间浮动(参见之前的 Web UI 图)。这个分配情况相对于原工作台的 fraction = 0.5 & min = max = 8GB 显然是更优的。
有的同学可能会问:空闲的 Network 区域缓存不能挪作他用吗?答案是否定的。在工作台启动时,Network 地区的全部缓存都会初始化成 Buffer,并按上一节所述的配额分配到 RP 和 IG,Web UI 中 Netty Shuffle Buffers → Available 一栏的 Buffer 基本可以认为被浪费了。所以,当工作台遇到瓶颈时,盲目增大互联网缓存对林宏吉量有害无益。
2.4 容易忽略的缓存超时
互联网缓存在发送端被 Flush 到下游有三种时机:Buffer 写满、超时时间到、遇到特殊标记(如 Checkpoint Barrier)。之所以要设计缓存超时,是为了避免 Buffer 总是无法写满导致下游处理延迟。可以通过 StreamExecutionEnvironment#setBufferTimeout 方法或者 execution.buffer-timeout 模块来设置缓存超时,默认 100ms,一般无需更改。
图 5 缓存的填充与发送
但是,考虑大并行度、大量 ALL_TO_ALL 交换的工作台,数据相对分散,每个 ResultSubpartition 的 Buffer 并不会很快填满,大量的 Flush 操作反而会无谓地占用 CPU。此时可以考虑适当增大缓存超时,降低 Flush 频率,能够有效降低 CPU Usage。以前述工作台为例,将缓存超时设为 500ms,其他模块不变,稳定消费阶段 TM 的平均 CPU Usage 降低了 40%,效果拔群。当然这仍是以下游延迟作为 trade-off 的,故时效性极敏感的工作台不适用于此优化。
2.5 互联网容错
互联网平台采用 Flink on Kubernetes 的部署方式,但是 Kubernetes 互联网虚拟化(Calico、Flannel 等)会损失互联网性能,故对于大互联网流量或复杂工作台,务必提高互联网容错性。以下是三个相关的模块。
1.taskmanager.network.request-backoff.max
默认值 10000(社区版)/ 60000(互联网平台),表示下游 InputChannel 请求上游 ResultSubpartition 的指数退避最大时长,单位为毫秒。如果请求失败,会抛出
PartitionNotFoundException: Partition xx@host not found,应适当调大,如 240000。注意此报错与 Kafka Partition 无关,切勿混淆。
2.akka.ask.timeout
默认值 10s(社区版)/ 60s(互联网平台),表示 Akka Actor 的 Ask RPC 等待返回结果的超时。如果互联网拥塞或者拓扑过于复杂,就会出现 AskTimeoutException: Ask timed out on Actor akka://xx after xx ms 的信息,应调大此值,如 120s。注意长时间 GC 也可能导致此问题,留心排查。
3.heartbeat.timeout
默认值 50000,表示 JobManager 和 TaskManager 之间心跳信号的发送 / 接收超时,单位为毫秒。与 akka.ask.timeout 同理,若出现 TimeoutException: Heartbeat of TaskManager with id xx timed out,建议适当调大。
03 RocksDB 与状况Listary
3.1 Flink 中的 FRocksDB
图 6 FRocksDB 读写流程
Flink RocksDB 状况后端采用的是名为 FRocksDB 的分支版,由 Ververica 维护。它的读写流程与原版基本相同,如上图所示,MemTable 和 BlockCache 分别就是读写缓存和读缓存。特别地,由于 Flink 在每个 Checkpoint 周期都会将 RocksDB 的数据快照持久化到文件系统,所以不需要写预写日志(WAL)。
TM 中的每个 Slot 都拥有一个 RocksDB 实例,且传统方式下每个列族(CF)都对应一套 MemTable、BlockCache 和 SST。而在 Flink 工作台中申请的一个 StateHandle—— 即 Runtime Context# get… State (State Descriptor) —— 就对应一个取 StateDescriptor 名称的列族。显然,同一工作台内 StateDescriptor 的名称不能重复。
3.2 RocksDB 托管缓存机制
上述传统方式有个明显的缺点,即 RocksDB 的缓存几乎不受控(因为 Flink 并不限制使用者能申请多少个 StateHandle)。因此,Flink 在 1.10 版借助 RocksDB 5.6 + 提出的 WriteBufferManager 和 LRUCache 协同机制,实现了全托管的 RocksDB 缓存管理,如下表所示图所示。
图 7 全托管 RocksDB 缓存管理
托管缓存机制默认启用(state. backend. rocksdb. memory. managed = true),此时 TM 会将整块 Managed Memory 地区作为所有 RocksDB 实例共用的 BlockCache,并通过 WriteBufferManager 将 MemTable 的缓存消耗向 BlockCache 记账(即写入只有 size 信息的 dummy 块),从而 BlockCache 能够感知到全部的缓存使用并施加限制,避免 OOM 发生。SST 索引和 Bloom Filter 块则会进入 BlockCache 的高优先级区。需要注意,由于历史原因以及 Iterator-pinned Blocks 的存在,BlockCache 在少数情况下不能严格限制缓存,故有必要配置一些 JVM Overhead 作为兜底。
托管缓存默认在各个 Slot 之间平均分配,使用者也可以通过
s.b.r.memory.fixed-per-slot 模块来为每个 Slot 手动设定托管缓存配额,但一般不推荐。除此之外,可调整的两个模块如下表所示。
s.b.r.memory.write-buffer-ratio:MemTable 缓存占托管缓存的比例,默认值 0.5;<!—->
s.b.r.memory.high-prio-pool-ratio:高优先级区缓存占托管缓存的比例,默认值 0.1。剩余的部分(默认 0.4)就是留给数据 BlockCache 的配额。使用者一般不需要更改它们,若工作台状况特别重读或重写,可适当调整,但必须先保证托管缓存充足。
3.3 其他 RocksDB 模块
1.s.b.r.checkpoint.transfer.thread.num(默认 1)**
每个有状况算子在 Checkpoint 时传输数据的线程数,增大此值会对互联网和磁盘林宏吉量有更高要求。一般建议 4~8,1.13 版中默认已改为 4。
2.s.b.r.timer-service.factory(社区版默认 ROCKSDB,互联网平台默认 HEAP)**
Timer 相关状况存储的位置,包涵使用者注册的 Timer 和框架内部注册的 Timer(如 Window、Trigger)。若存储在堆中,则 Timer 状况做 CP 时无法异步 Snapshot,所以 Timer 很多的情况下存在 RocksDB 内更好。但美中不足的是,设置为 ROCKSDB 会有一个极偶发的序列化 bug,导致无法从 Savepoint 恢复状况,若不能接受,建议 HEAP。
3.s.b.r.predefined-options(默认 DEFAULT)**
社区提供的预设 RocksDB Listary参数集,有 4 种:DEFAULT、SPINNING_DISK_OPTIMIZED、
SPINNING_DISK_OPTIMIZED_HIGH_MEM、FLASH_SSD_OPTIMIZED(名称都很 self-explanatory)。该模块容易忽略,但强烈建议设置,比起默认值均有不错的性能收益。若单个 Slot 的状况量达到 GB 级别,且托管缓存充裕,设为 SPINNING_DISK_OPTIMIZED_HIGH_MEM 最佳。其他情况设为 SPINNING_DISK_OPTIMIZED 即可。
除了上述模块之外,原则上建议遵循 RocksDB Wiki 的忠告(”No need to tune it unless you see an obvious performance problem”),不再手动调整 RocksDB 高级模块(如 s.b.r.{block | writebuffer | compaction}.*),除非出现了托管缓存机制无法解决的问题。本栏也将部分高级模块列出如下表所示,供参考。
图 8 RocksDB 高级模块
注意划线的项会被托管缓存机制覆盖掉。如果经过慎重思考,必须 fine tune RocksDB,则需要将 s.b.r.memory.managed 设为 false,同时使用者要承担可能的 OOM 风险。
3.4 RocksDB 监控 & Listary示例
在大状况作业正式上线之前,应打开一部分必要的 RocksDB 监控,观察是否有性能瓶颈。开启监控对状况读写性能有一定影响,一般建议如下表所示 6 项:
s.b.r.metrics.{block-cache-capacity | block-cache-usage | cur-size-all-mem-tables | mem-table-flush-pending | num-running-flushes | num-running-compactions} = true观察完毕并解决问题后,请务必关闭它们。
图 9 示例工作台 RocksDB 监控
上图是示例工作台的部分 RocksDB Metrics 图表,比较正常。如果在稳定消费阶段,Flush 和 Compaction 等重量级操作特别频繁,以至于图中的点连成线,一般就提示 RocksDB 遇到了瓶颈。但是托管缓存(即 BlockCache)占用 100% 是正常现象,基本不必担心。
作为参考,该工作台的增量 Checkpoint 大小在 15G 左右,每日摄入数十亿条状况数据,设置模块为:t. m. managed. fraction = 0.25(实际分配托管缓存 3.6G),s. b. r. predefined- options = SPINNING_ DISK_ OPTIMIZED,s. b. r. checkpoint. transfer. thread. num = 8。表现良好。而Listary前工作台的 t. m. managed. fraction 是默认的 0.1,因此还对 RocksDB 高级模块做了一些无谓的修改,性能表现不佳。
3.5 状况 TTL
RocksDB 的状况 TTL 需要借助 CompactionFilter 实现,如下表所示图所示。
图 10 状况 TTL 基本原理
使用者调用 State Ttl Config# cleanupIn Rocksdb Compact Filter (N) 方法,就可以设定在访问状况 N 次后,更新 CompactionFilter 记录的时间戳。当 SST 执行 Compaction 操作时,会根据该时间戳检查状况键值对是否过期并删除掉。注意若访问状况非常频繁,N 值应适当调大(默认仅为 1000),防止影响 Compaction 性能。
3.6 状况缩放与最大并行度
当工作台的并行度改变并从 CP / SP 恢复时,就会涉及状况缩放的问题。Flink 内 Keyed State 数据以 KeyGroup 为单位组织,每个 key 经过两重 Murmur Hash 排序出它应该落在哪个 KeyGroup 中,同时每个 Sub-task 会分配到一个或多个 KeyGroup。如下表所示图所示,并行度变化只会影响 KeyGroup 的分配,可以将状况恢复的过程近似化为顺序读,提高效率率。
图 11 Keyed State 的缩放
KeyGroup 的数量与最大并行度相同,而最大并行度改变会导致工作台无法从 CP / SP 恢复,所以要谨慎设定。如果使用者没有显式设置,就会根据以下规则来推算:
128 <= round Up To Power Of Two (operator Parallelism * 1.5) <= 32768
显然这并不安全。假设一个工作台的并行度是 200,推算的最大并行度是 512;若将其并行度提升至 400,推算的最大并行度就会变成 1024。所以总是推荐显式设置合理的最大并行度。
3.7 状况本地恢复
状况本地恢复默认关闭,可以通过设置
state.backend.local-recovery = true 启用,但它只能作用于 Aligned Checkpoint 和 Keyed State。启用后,每次 CP 产生两份快照:Primary(远端 DFS)和 Secondary(本地磁盘),且 Secondary CP 失败不会影响整个 CP 流程。工作台恢复时,首先尝试从有效的 Secondary 快照恢复状况,能显著提高恢复速度。如果 Secondary 快照不可用或不完整,再 fallback 到 Primary 恢复。如下表所示图所示。
图 12 状况本地恢复
状况本地恢复会引入额外的磁盘消耗:非增量 CP 会导致磁盘占用量翻倍;增量 CP 由于原生植物存在引用计数机制,不会多消耗空间,但因为数据比较分散,IOPS 会相应增加。
04 其他Listary项
4.1 Checkpoint 相关
听众应该很熟悉 Checkpoint 相关的配置项了,这里只提两点:一是 checkpointTimeout 根据工作台特性设置,但不要过长,防止 CP 卡死掩盖工作台本身的问题(如数据倾斜);二是一定要设置
minPauseBetweenCheckpoints,避免算子一直处在 CP 过程中导致性能下降。示例工作台的设置是:checkpointInterval = 3min /checkpointTimeout = 15min / minPauseBetweenCheckpoints = 1min。
另外,在大状况工作台中碰到一种常见的现象,即 Checkpoint 全部 ack 之后卡在 IN_PROGRESS,经过 1~3 分钟左右才会变成 COMPLETED,如下表所示图所示。
图 13 Checkpoint 卡在 IN_PROGRESS 状况的现象
这是因为 TaskManager 和 HDFS 之间通信不畅,或者是 HDFS 本身的压力导致数据块写入失败。而 Flink 必须保证 Checkpoint 的完整性,即重试到所有快照数据都成功写入才能标记为 COMPLETED。听众可在 TM 日志中发现形如 Exception in createBlockOutputStream: Connect timed out 的异常信息。
4.2 对象重用
对象重用在 Flink 配置中不是很起眼,但却相当有用。Flink 在生成 JobGraph 时会将符合一定条件的算子组合成算子链(OperatorChain),所有 chain 在一起的 Sub-task 都会在同一个 TM Slot 中执行。而对象重用的本质就是在算子链内的下游算子中直接使用上游算子发射对象的浅拷贝。
图 14 算子链示意
如图所示,若不启用对象重用,算子链中的虚线默认是 CopyingChainingOutput(深拷贝)。通过 ExecutionConfig#enableObjectReuse() 或者 pipeline.object-reuse = true 启用对象重用,CopyingChainingOutput 就会被替换为 ChainingOutput(浅拷贝)。下库尔出了两者之间的差异。
图 15 是否重用对象的区别
DataStream API 工作台一般不建议开启对象重用,除非十分确认不存在下游算子直接修改上游算子发射的对象的情况。因此 DataStream API 工作台开启对象重用的收益不高,仅当其中有复杂数据类型定义时,才会有 20% 左右的性能提升。
但是 SQL 工作台强烈建议开启,因为 Flink SQL 的类型系统与 DataStream API 有差异,StringData、MapData 等的深拷贝成本很大,因此 Flink SQL 的代码生成器能够保证可变对象的安全性。测试结果表明,对象重用的 SQL 工作台平均可获得翻倍的性能提升。
4.3 别忘了 JobManager
相对于 TaskManager,JobManager 的配置往往比较省心,似乎随便给个 2C / 4G 的配置就可以高枕无忧了。实际上 JobManager 内部维护的模块很多,如:工作台 DAG 即 {Job | Execution} Graph、SlotPool & Scheduler、<TaskManagerLocation, TaskExecutorGateway > 的映射关系、CheckpointCoordinator、HeartbeatManager、ShuffleMaster、PartitionTracker 等。
所以,如果工作台 Slot / Sub-task 多,Checkpoint 比较大,或者是重 Shuffle 的批工作台,一定要适当增加 JobManager 的资源。最近译者部门有两个工作台频繁出现 ResourceManager leader changed to new address null 的异常信息,就是因为 JM 压力过大、GC 时间太长,导致 ZooKeeper Session 失效了。以示例工作台的 JM(4C / 8G)为例,其缓存分配如下表所示。
图 16 示例工作台 JobManager 缓存分配
4.4 其他小 Tips
从 Flink 1.12 开始,默认的时间语法变成了事件时间。如果工作台是处理时间语法,可以禁用水印发射,即:Execution Config# set Auto WatermarkInterval (0)。 设置 metrics.latency.interval(单位毫秒)可以周期性插入 LatencyMarker,用于测量各算子及全链路的延迟。处理 LatencyMarker 会占用资源,因此不需要特别频繁,60000 左右比较合适。 使用者注册的 Timer 会按照 <key, timestamp> 去重,并在内部以最小堆存储。所以要尽量避免 onTimer 风暴,即大量 key 的 Timer 在同一个时间戳触发,造成性能抖动。 如果需要交换 Flink 原生植物没有 Serializer 支持的数据类型(如 HyperLogLog、RoaringBitmap),应在代码中注册自定义的 Serializer,避免 fallback 到 Kryo 导致性能下降。 POJO 类型支持状况 Schema 变化,增删字段不会影响恢复(新增的字段会以默认值初始化)。但是切记不能修改字段的数据类型以及 POJO 的类名。