何为状态?

计算任务的结果不仅仅依赖于输入,还依赖于它的当前状态,其实大多数的计算都是有状态的计算。比如 wordcount, 给一些 word, 其计算它的 count, 这是一个很常见的业务场景。count 做为输出,在计算的过程中要不断的把输入累加到 count 上去,那么 count 就是一个 state。

在批处理过程中,数据是划分为块分片去完成的,然后每一个 Task 去处理一个分片。当分片执行完成后,把输出聚合起来就是最终的结果。在这个过程当中,对于 state 的需求还是比较小的。

在流处理过程中,对 State 有非常高的要求,因为在流系统中输入是一个无限制的流,会持续运行从不间断。在这个过程当中,就需要将状态数据很好的管理起来

检查点 checkpoint 与 Barrier

checkpoint【可以理解为 checkpoint 是把 state 数据持久化存储了】,则表示 Flink job 在一个特定时刻的一份全局状态快照,即包含了所有的 task/operator 的状态。

Checkpoint 是 Flink 实现容错机制最核心的功能,它能根据配置周期性地基于 Stream 中各个 Operator/Task 的状态来生成快照,从而将这些状态数据定期持久化存储下来,当 Flink 程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。

Flink 分布式快照算法 Asynchronous Barrier Snapshots 算法借鉴了经典的 Chandy-Lamport 算法的主要思想,同时做了一些改进。Lightweight Asynchronous Snapshots for Distributed Dataflows

Chandy-Lamport 算法

分布式系统是一个包含有限进程和有限消息通道的系统,这些进程和通道可以用一个有向图描述,其中节点表示进程,边表示通道。如下图所示:p、q 分别是进程,c->c’则是消息通道,分布式系统快照是了保存分布式系统的 state。分布式系统 State 是由进程状态和通道状态组成的。

img

  • Event:分布式系统中发生的一个事件,在类似于 Flink 这样的分布式计算系统中从 Source 输入的新消息相当于一个事件

  • 进程状态:包含一个初始状态(initial state),和持续发生的若干 Events。初始状态可以理解为 Flink 中刚启动的计算节点,计算节点每处理一条 Event,就转换到一个新的状态。

  • 通道状态:我们用在通道上传输的消息(Event)来描述一个通道的状态。

进程 p 启动这个算法,记录自身状态,并发出 Marker。随着 Marker 不断的沿着分布式系统的相连通道逐渐传输到所有的进程,所有的进程都会执行算法以记录自身状态和入射通道的状态,** 待到所有进程执行完该算法,一个分布式 Snapshot 就完成了记录。**Marker 相当于是一个信使,它随着消息流流经所有的进程,通知每个进程记录自身状态。且 Marker 对整个分布式系统的计算过程没有任何影响。只要保证 Marker 能在有限时间内通过通道传输到进程,每个进程能够在有限时间内完成自身状态的记录,这个算法就能在有限的时间内执行完成。

Flink 分布式快照算法 ABS

在 ABS 算法中用 Barrier 代替了 C-L 算法中的 Marker。

  1. Barrier 周期性的被注入到所有的 Source 中,Source 节点看到 Barrier 之后,就会立即记录自己的状态,然后将 Barrier 发送到 Transformation Operator。

  2. 当 Operator 从某个 input channel 收到 Barrier 之后,会立即 Block 住这条通道,直到收到所有的 input channel 的 Barrier,此时 Operator 会记录自身状态,并向自己所有的 output channel 广播 Barrier。

  3. Sink 接受 Barrier 的操作流程与 Transformation Operator 一样。当所有的 Barrier 都到达 Sink 之后,并且所有的 Sink 也完成了 Checkpoint,这一轮 Snapshot 就完成了。

img

上述算法中 Block Input 实际上是有负面效果的,一旦某个 input channel 发生延迟,Barrier 迟迟未到,就会导致 Operator 上的其他通道全部堵塞,导致系统吞吐下降。但有个一好处是可以实现 Exactly Once

一轮快照整个执行流程如下所示:

flink_ckp_flow

Checkpoint 统一由 JobManager 发起,中间涉及到 JobManager 和 TaskManager 的交互,一轮快照可以分为 4 个阶段:

  • JobManager checkpoint 的发起

全局协调控制的核心抽象是 CheckpointCoordinator,发起时的 checkpoint 被抽象成 PendingCheckpoint,向所有的 Source 节点发送 barrier。图中第一步

  • barrier 的传递

当 operator 收到所有 input channel 的 barrier 之后,将 barrier 传递给下一个 operator/task。图中第二步

  • operator/task 的 checkpoint

当 operator/task 收到所有 input channels 的 barrier,本地计算完成后,进行状态持久化。图中第三步

  • ack 消息回传

当 TaskManager 完成本地备份之后,并将数据的地址以及快照句柄等通过 akka 以 ack 消息的形式发送给 CheckpointCoordinator,由其负责维护这一轮快照的全局状态视图。当 CheckpointCoordinator 收到所有的 ack 消息后,此时 checkpoint 的状态由 PendingCheckpoint 变为 CompletedCheckpoint。此时一次 checkpoint 完成。图中剩余步骤

单流

stream_barriers

  • 每个 Barrier 携带着快照的 ID,快照记录着 ID,并将其放在快照数据的前面。

  • 单流时两个 Barrier 之间的数据,存储在相应的 barrierID 中,例如 barrier n-1 和 n 之间的数据存储在 Barrier n 中。

多流

stream_aligning

  • 比如此 operator 有两个输入流,当收到第一个流的 barrier n 时,下一个流的 barrier n-1 还有数据流入,此时会先临时搁置此流的数据,将数据放入缓存 buffer 中,即 1 2 3 临时存储起来。待所有输入通道都收到了 barrier n 时,此时所有之前的数据都是 barrier n-1 的数据。然后该 operator 会释放 buffer 中的数据,继续处理。

  • 虽然该方法有效的实现了 Exactly Once,但是一旦某个 input channel 发生延迟,Barrier 迟迟未到,这会导致 Transformation Operator 上的其它通道全部堵塞,系统吞吐大幅下降。Flink 提供了选项,可以关闭 Exactly once 并仅保留 at least once。

checkpoint 的存储

img

用户可以根据自己的需求选择,如果数据量较小,可以存放到 MemoryStateBackend 和 FsStateBackend 中,如果数据量较大,可以放到 RockDB 中。

HeapStateBackend

img

MemoryStateBackend 和 FsStateBackend 都是存储在内存中,被保存在一个由多层 Java Map 嵌套而成的数据结构中,默认不超过 5M。优点:速度快,缺点:容量小

RocksDBStateBackend

img

RockDBKeyedStateBackend 每个 State 单独存储在一个 ColumnFamily 中。会在本地文件系统中维护状态,state 会直接写入本地 rocksdb 中。同时 RocksDB 需要配置一个远端的 filesystem。uri(一般是 HDFS),在做 checkpoint 的时候,会把本地的数据直接复制到 filesystem 中。fail over 的时候从 filesystem 中恢复到本地。RocksDB 克服了 state 受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用。

RocksDB 全量快照

img

全量 checkpoint 会在每个节点做备份数据时,需要将数据都遍历一遍,然后写入到外部存储中,这种情况会影响备份性能。

RocksDB 自身的 snapshot 全量写出,主要步骤如下:

  1. 拿到 RocksDB 自身的 snapshot 对象

  2. 通过 CheckpointStreamFactory 拿到 CheckpointStateOutputStream 作为快照写出流

  3. 分别将快照的 meta 信息和数据写到 2 对应的输出流中

  4. 拿到 2 输出流的句柄,获取状态 offset,将 k-v 数据读取到 RocksDB 中,这里要注意的是快照时留下的 meta 起始标志位【标志一个新的 state 起始或者一个 keyGroup 结束】,快照恢复时需要复原.

  5. 将 RocksDB 的快照对象及一些辅助资源释放

rocksdb-full-spt-data-write-format.png

RocksDB 增量快照

img

RocksDB 的数据会更新到内存,当内存满时,会写入到磁盘中。增量的机制会将新产生的文件 copy 持久化中,而之前产生的文件就不需要 COPY 到持久化中了。这种方式减少了 COPY 的数据量,并提高性能。

原文:https://github.com/heibaiying/BigData-Notes