发布网友 发布时间:2022-10-26 07:40
共1个回答
热心网友 时间:2023-09-14 01:25
Checkpointing 是 Flink 故障恢复的内部机制。一个 checkpoint 就是 Flink应用程序产生的状态的一个副本。如果 Flink 任务发生故障,它会从 checkpoint 中载入之前的状态来恢复任务,就好像故障没有发生一样。
Checkpoints是 Flink 容错的基础,并且确保了 Flink 流式应用在失败时的完整性。Checkpoints 可以通过 Flink 设置定时触发。
Flink Kafka consumer 使用 Flink 的 checkpoint 机制来存储 Kafka 每个分区的位点到 state。当 Flink 执行 checkpoint 时,Kafka 的每个分区的位点都被存储到 checkpoint 指定的 filesystem 中。Flink 的 checkpoint 机制确保了所有任务算子的状态是一致的,也就是说这些状态具有相同的数据输入。当所有的任务算子成功存储他们自己的状态后,代表一次 checkpoint 的完成。因此,当任务从故障中恢复时,Flink 保证了exactly-once。
下面将一步一步的演示 Flink 是如何通过 checkpoint 来管理 Kafka 的 offset 的。
下面的例子从两个分区的 Kafka topic 中读取数据,每个分区的数据是 “A”, “B”, “C”, ”D”, “E”。假设每个分区都是从 0 开始读取。
假设 Flink Kafka consumer 从分区 0 开始读取数据 “A”,那么此时第一个 consumer 的位点从 0 变成 1。如下图所示。
此时数据 “A” 到达 Flink Job 中的 Map Task。两个 consumer 继续读取数据 (从分区 0 读取数据 “B” ,从分区 1 读取数据 “A”)。 offsets 分别被更新成 2 和 1。与此同时,假设 Flink 从 source 端开始执行 checkpoint。
到这里,Flink Kafka consumer tasks 已经执行了一次快照,offsets也保存到了 state 中(“offset = 2, 1”) 。此时 source tasks 在 数据 “B” 和 “A” 后面,向下游发送一个 checkpoint barrier。checkpoint barriers 是 Flink 用来对齐每个任务算子的 checkpoint,以确保整个 checkpoint 的一致性。分区 1 的数据 “A” 到达 Flink Map Task, 与此同时分区 0 的 consumer 继续读取下一个消息(message “C”)。
Flink Map Task 收到上游两个 source 的 checkpoint barriers 然后开始执行 checkpoint ,把 state 保存到 filesystem。同时,消费者继续从Kafka分区读取更多事件。
假设 Flink Map Task 是 Flink Job 的最末端,那么当它完成 checkpoint 后,就会立马通知 Flink Job Master。当 job 的所有 task 都确认其 state 已经 “checkpointed”,Job Master将完成这次的整个 checkpoint。 之后,checkpoint 可以用于故障恢复。
如果发生故障(例如,worker 挂掉),则所有任务将重启,并且它们的状态将被重置为最近一次的 checkpoint 的状态。 如下图所示。
source 任务将分别从 offset 2 和 1 开始消费。当任务重启完成, 将会正常运行,就像之前没发生故障一样。
PS: 文中提到的 checkpoint 对齐,我说下我的理解,假设一个 Flink Job 有 Source -> Map -> Sink,其中Sink有多个输入。那么当一次checkpoint的 barrier从source发出时,到sink这里,多个输入需要等待其它的输入的barrier已经到达,经过对齐后,sink才会继续处理消息。这里就是exactly-once和at-least-once的区别。
The End
原文链接: How Apache Flink manages Kafka consumer offsets