flink watermark传递机制

Generating Watermarks with awareness for Kafka-partitions

watermark是衡量eventtime进展机制的时间,经常用来处理乱序数据,常常结合window使用

生产环境一般使用周期性生成watermark,系统默认的周期是200ms,可以在代码中自定义:

1
env.getConfig.setAutoWatermarkInterval(5000);

对于有eventtime的数据流,可以分为两大类:时间单调递增,乱序时间数据流,一般在生产环境中不能保证数据的准确性,所以使用BoundedOutOfOrdernessTimestampExtractor允许2s内的数据迟到:

1
2
3
4
5
6
7
SingleOutputStreamOperator<String> input=env.socketTextStream("hadoop01",7777).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(2)) {
@Override
public long extractTimestamp(String element) {
String[] fields = element.split(",");
return Long.parseLong(fields[1]) * 1000L;
}
});

在这之后要对数据进行开窗口计算,对于窗口时间和2s允许迟到的时间,以及allowedLateness内的时间也就是加起来4s的时间,会在主流输出。

对于超出允许迟到4s的数据,则会被送往侧输出流sideOutputLateData每来一条就进行侧输出。

1
2
3
4
5
WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> windowedStream = keyedStream
.timeWindow(Time.seconds(5))
.allowedLateness(Time.seconds(2))
.sideOutputLateData(new OutputTag<Tuple2<String, Integer>>("sideOutPut") {
});

新发布的flink 1.12中,官方对生成watermark的api进行了升级,旧版本的api预计在不久的未来会被淘汰,但是原理是通用的。

在多并行度时,算子向下游发送watermark是同时发送的,且同一个算子如果有多个watermark取最小。在生成watermark之前则是轮询发送。

下图是在map算子之前生成watermark:

在map算子之后生成watermark:

Donate
  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.

扫一扫,分享到微信

微信分享二维码
  • Copyrights © 2020-2021 ycfn97
  • Visitors: | Views:

请我喝杯咖啡吧~

支付宝
微信