Flink State Management and Checkpoint Fault Tolerance (Part 2)
State classification
Flink supports two kinds of state: Keyed State and Operator State. Both kinds come in two forms: raw state and managed state.
Raw state: the user manages the state's concrete data structure; when taking a checkpoint, the framework reads and writes the state content as byte[] and knows nothing about its internal structure. Raw state is typically needed when implementing a custom operator.
Managed state: state managed by the Flink framework itself. For state on a DataStream, managed state is the recommended choice.
Keyed State
This kind of state lives on a KeyedStream and is bound to a particular key: each key in the keyed stream has its own state. A keyed stream is produced by stream.keyBy(...).
Data structures:
ValueState<T>: a single value of type T, bound to the current key; the simplest kind of state. The value is written with update() and read with value().
ListState<T>: the state for a key is a list. Values are appended with add(); get() returns an Iterable for traversing the stored values.
ReducingState<T>: backed by a user-supplied ReduceFunction. Each call to add() invokes the ReduceFunction, folding the new value into a single aggregated state value.
MapState<UK, UV>: the state is a map. Entries are added with put() or putAll().
A value state is read with value() and written with update(). Keyed state is obtained through the runtime context, so the function that uses it extends a RichFunction; a minimal sketch of the remaining state types follows, and a complete ValueState example comes after it.
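The complete example below only exercises ValueState, so here is a minimal sketch of registering and using the other two descriptor types, ReducingState and MapState. The class, field, and state names are illustrative, not from the original post:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Illustrative sketch: per-key ReducingState and MapState.
public class PerKeyStats extends RichFlatMapFunction<Tuple2<String, Integer>, String> {

    private transient ReducingState<Integer> sum;      // running sum per key
    private transient MapState<String, Integer> freq;  // per-key map of occurrence counts

    @Override
    public void open(Configuration parameters) throws Exception {
        // ReducingState needs a ReduceFunction; add() folds new values into one.
        ReducingStateDescriptor<Integer> sumDesc = new ReducingStateDescriptor<>(
                "sum", (a, b) -> a + b, Integer.class);
        sum = getRuntimeContext().getReducingState(sumDesc);

        MapStateDescriptor<String, Integer> freqDesc = new MapStateDescriptor<>(
                "freq", String.class, Integer.class);
        freq = getRuntimeContext().getMapState(freqDesc);
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<String> out) throws Exception {
        sum.add(value.f1);                      // the ReduceFunction merges this into the state
        Integer seen = freq.get(value.f0);
        freq.put(value.f0, seen == null ? 1 : seen + 1);
        out.collect(value.f0 + " sum=" + sum.get());
    }
}

To run it, the function would be applied after a keyBy, for example stream.keyBy(0).flatMap(new PerKeyStats()).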
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Enclosing class name is illustrative; the original fragment omits it.
public class KeyedStateDemo {

    private static int sourceCount = 0;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Persist checkpoints to the local file system.
        env.setStateBackend(new FsStateBackend("file:///D://hadoop//data//checkpoint"));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(1000);

        DataStream<Tuple3<Integer, String, Integer>> source = env.addSource(new KeyedStateSource());
        source.keyBy(0)
              .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
              .apply(new KeyedStateRichFunction());

        env.execute();
    }

    private static class KeyedStateSource implements SourceFunction<Tuple3<Integer, String, Integer>> {

        private Boolean isRunning = true;
        private int count = 0;

        @Override
        public void run(SourceContext<Tuple3<Integer, String, Integer>> sourceContext) throws Exception {
            while (isRunning) {
                // Emit 10 elements per round.
                for (int i = 0; i < 10; i++) {
                    sourceContext.collect(Tuple3.of(1, "ahah", count));
                    count++;
                }
                // Fail the source once more than 100 elements have been emitted.
                if (count > 100) {
                    System.out.println("err_________________");
                    throw new Exception("123");
                }
                sourceCount = count;
                Thread.sleep(2000);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    private static class KeyedStateRichFunction
            extends RichWindowFunction<Tuple3<Integer, String, Integer>, Integer, Tuple, TimeWindow> {

        private transient ValueState<Integer> state;
        private int count = 0;

        @Override
        public void apply(Tuple tuple, TimeWindow timeWindow,
                          Iterable<Tuple3<Integer, String, Integer>> iterable,
                          Collector<Integer> collector) throws Exception {
            // Resume from the total stored in (or restored into) keyed state.
            count = state.value();
            for (Tuple3<Integer, String, Integer> item : iterable) {
                count++;
            }
            state.update(count);
            System.out.println("all count:" + count + " source count:" + sourceCount);
            collector.collect(count);
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>(
                    "average",
                    TypeInformation.of(new TypeHint<Integer>() {}),
                    0);
            state = getRuntimeContext().getState(descriptor);
        }
    }
}
Code walkthrough:
The source emits 10 elements per round (count grows by 10) and then sleeps 2 s. Once count exceeds 100, the source throws an exception, and the job restarts from the last checkpoint. The window size is set to 2 s to trigger the computation, so we can observe whether, after each thrown exception, the count resumes from the previous result.
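One detail the demo relies on but never sets explicitly: the job can only resume from the checkpoint because Flink restarts it after the exception. With checkpointing enabled, Flink falls back to a fixed-delay restart strategy by default; a sketch of configuring it explicitly, where the attempt count and delay are illustrative values, not from the original post:

// Assumes the StreamExecutionEnvironment `env` from the example above.
// Imports: org.apache.flink.api.common.restartstrategy.RestartStrategies,
//          org.apache.flink.api.common.time.Time, java.util.concurrent.TimeUnit
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                               // illustrative: number of restart attempts
        Time.of(2, TimeUnit.SECONDS)));  // illustrative: delay between attempts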
Output:
all count:10 source count:10
all count:20 source count:20
all count:30 source count:30
all count:40 source count:40
all count:50 source count:50
all count:60 source count:60
all count:70 source count:70
all count:80 source count:80
all count:90 source count:90
all count:100 source count:100
err_________________
all count:110 source count:10
all count:120 source count:20
The output shows the desired effect: all count does not restart from 0 but continues from the previous result. (The source's own counter is not checkpointed, which is why source count drops back to 10 after the failure.)
Operator State
This kind of state is not tied to a key: the whole operator instance has a single state. Operator State supports fewer data structures than Keyed State, only ListState. For example, Flink's Kafka Connector uses operator state: each connector instance keeps the mapping of all (partition, offset) pairs for the topics it consumes.
There are two ways to use it:
CheckpointedFunction (rarely used; the state must be initialized yourself):
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element : bufferedElements) {
                // send each buffered element to the external system here
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the checkpointed state with the current buffer contents.
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor<>(
                        "buffered-elements",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // On recovery, reload the buffered elements from the restored state.
        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
ListCheckpointed (commonly used; Flink initializes the state automatically):
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
Only list state is supported: ListCheckpointed is a restricted variant of CheckpointedFunction that supports only list-style state with even-split redistribution. It defines two methods, snapshotState and restoreState. snapshotState is called when the master triggers a checkpoint and must return the current state; restoreState is called during failure recovery and receives the state as a List, from which the method can restore the local state.
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Enclosing class name is illustrative; the original fragment omits it.
public class OperatorStateDemo {

    private static int sourceCount = 0;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new FsStateBackend("file:///D://hadoop//data//checkpoint"));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(1000);

        DataStream<Tuple4<Integer, String, String, Integer>> source = env.addSource(new OperatorStateSource());
        source.keyBy(0)
              .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
              .apply(new OperatorStateAppy());

        env.execute("");
    }

    // Source whose emit counter is checkpointed as operator state.
    private static class OperatorStateSource
            extends RichSourceFunction<Tuple4<Integer, String, String, Integer>>
            implements ListCheckpointed<UserState> {

        private int count = 0;
        private boolean is_running = true;

        @Override
        public List<UserState> snapshotState(long checkpointId, long timestamp) throws Exception {
            List<UserState> userStateList = new ArrayList<>();
            UserState state = new UserState();
            state.setCount(count);
            userStateList.add(state);
            return userStateList;
        }

        @Override
        public void restoreState(List<UserState> list) throws Exception {
            count = list.get(0).getCount();
            System.out.println("OperatorStateSource restoreState: " + count);
        }

        @Override
        public void run(SourceContext<Tuple4<Integer, String, String, Integer>> sourceContext) throws Exception {
            while (is_running) {
                for (int i = 0; i < 10; i++) {
                    sourceContext.collect(Tuple4.of(1, "hello-" + count, "alphabet", count));
                    count++;
                }
                sourceCount = count;
                Thread.sleep(2000);
                // Fail the source once 100 elements have been emitted.
                if (count >= 100) {
                    System.out.println("err_________________");
                    throw new Exception("exception made by ourself!");
                }
            }
        }

        @Override
        public void cancel() {
            is_running = false;
        }
    }

    // Window function whose running total is checkpointed as operator state.
    private static class OperatorStateAppy
            implements WindowFunction<Tuple4<Integer, String, String, Integer>, Integer, Tuple, TimeWindow>,
                       ListCheckpointed<UserState> {

        private int total = 0;

        @Override
        public List<UserState> snapshotState(long checkpointId, long timestamp) throws Exception {
            List<UserState> userStateList = new ArrayList<>();
            UserState state = new UserState();
            state.setCount(total);
            userStateList.add(state);
            return userStateList;
        }

        @Override
        public void restoreState(List<UserState> list) throws Exception {
            total = list.get(0).getCount();
        }

        @Override
        public void apply(Tuple tuple, TimeWindow timeWindow,
                          Iterable<Tuple4<Integer, String, String, Integer>> iterable,
                          Collector<Integer> collector) throws Exception {
            int count = 0;
            for (Tuple4<Integer, String, String, Integer> data : iterable) {
                count++;
            }
            total = total + count;
            System.out.println("all count:" + total + " source count:" + sourceCount);
            collector.collect(total);
        }
    }

    static class UserState implements Serializable {
        private int count;

        public int getCount() {
            return count;
        }

        public void setCount(int count) {
            this.count = count;
        }
    }
}
Output:
all count:10 source count:10
all count:20 source count:20
all count:30 source count:30
all count:40 source count:40
all count:50 source count:50
all count:60 source count:60
all count:70 source count:70
all count:80 source count:80
all count:90 source count:90
all count:100 source count:100
err_________________
OperatorStateSource restoreState: 100
all count:110 source count:110
The output shows the desired behavior: when the data source failed, the restore method was called and the state value was recovered.
Summary:
The difference between the two: when implementing the CheckpointedFunction interface, two forms of the ListState API are available, getListState and getUnionListState. Both return a ListState, but they differ in how the state is redistributed when the parallelism changes, which will be covered in detail later; a sketch of the two calls follows. If we implement the ListCheckpointed interface instead, ListState is the only option, and no initialization is needed on our part: Flink handles it internally.
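A minimal sketch of where the two calls appear, inside CheckpointedFunction.initializeState; the class name, state names, and Long element type are illustrative, not from the original post:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Illustrative only: shows the two ListState registration variants side by side.
public class RedistributionDemo implements CheckpointedFunction {

    private transient ListState<Long> splitState;
    private transient ListState<Long> unionState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Even-split redistribution: on rescaling, the global list of
        // elements is split into sublists, one per parallel subtask.
        splitState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("split-offsets", Long.class));

        // Union redistribution: on rescaling, every parallel subtask
        // receives the complete list of all elements.
        unionState = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("union-offsets", Long.class));
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // nothing to snapshot in this sketch
    }
}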
State redistribution
When we change an operator's parallelism in a job, how is the previous state reassigned? The next part explains redistribution for ListState, UnionListState, and BroadcastState.
Source: https://github.com/heibaiying/BigData-Notes