Flink 广播状态
| 字数总计:845|阅读时长:3 分钟 | 阅读量:17|
背景
有这样一个需求:flink 或者 spark 任务需要访问数据库,或者用到表 schema 信息。但此时数据库中的字段有添加或者修改时 (schama 发生改变的时候),这时候任务就会失败。最直接的做法就是重启 flink 或 spark 任务,但该做法会对业务数据造成一定的影响。
方案:将改动的 schema 信息放入 redis 中,再通过 broadcast 广播的方式传送给数据流。
flink broadcast state
Broadcast State 是 Flink 支持的一种 Operator State。使用 Broadcast State,可以在 Flink 程序的一个 Stream 中输入数据记录,然后将这些数据记录广播(Broadcast)到下游的每个 Task 中,使得这些数据记录能够为所有的 Task 所共享,比如一些用于配置的数据记录。这样,每个 Task 在处理其所对应的 Stream 中记录的时候,读取这些配置,来满足实际数据处理需要。
步骤:
定义一个 MapStateDescriptor 来描述要广播数据的地址
添加数据源,并注册为广播流
连接广播流和处理数据的流
实现连接的处理方法
private static final MapStateDescriptor<String, TableSchema> mapStateDescriptor = new MapStateDescriptor<>( "broadcast", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint<TableSchema>() { })); public static void main(String[] args){
TableSchema meta = getLatestMeta(..);
KeyedStream<String,String> keyedStream = env.addSource(..) .map(..) .returns(String.class) .keyby(new KeySelector<String, String>() { @Override public String getKey(String value) throws Exception { return value; } });
BroadcastStream<TableSchema> broadcastStream = env.addSource(..) .map(..) .returns(TableSchema.class) .broadcast(mapStateDescriptor); keyedStream.connect(broadcastStream) .process(new KeyedBroadcastProcessFunction<String, String, TableSchema, String>(){ @Override public void processBroadcastElement(TableSchema value, Context ctx, Collector<String> out){ TableSchema old = ctx.getBroadcastState(mapStateDescriptor).get("id"); System.out.println("old value:"+old+",new value:"+value); state.put("id", value); } @Override public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) { TableSchema meta = ctx.getBroadcastState(mapStateDescriptor).get("id");
} }); }
|
spark streaming broadcast
我们知道 spark 的广播变量允许换成一个只读的变量在每台机器上面,而不是每个任务保存一份。常见于 spark 在一些全局统计的场景中应用。通过广播变量,能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。Spark 也尝试着利用有效的广播算法去分配广播变量,以减少通信的成本
一个广播变量可以通过调用 sparkContext.broadcast (v) 方法从一个初始变量 v 中创建。广播变量是 v 的一个包装变量,它的值可以通过 value 方法访问,例如:
public static void main(String[] args){ JavaStreamingContext sc = new JavaStreamingContext(conf); JavaPairInputDStream<String,String> kafka = KafkaUtils.createStream(...); kafka.repartition(30) .transform(...) .foreachRDD(new VoidFunction<JavaRDD<String>>() { public void call(JavaRDD<String> rdd) { final Broadcast<String> cast = JavaSparkContext .fromSparkContext(rdd.context()) .broadcast("broadcast value"); rdd.foreach(new VoidFunction<String>() { public void call(String v) throws Exception { System.out.println(cast.value()); } }); } }); sc.start(); }
|
原文:https://github.com/heibaiying/BigData-Notes