Daily active users (exactly-once Kafka consumption), implemented with both Spark Streaming and Flink

Metric requirement: compute the day's new daily active users (DAU) and display them in real time in Kibana.

Approach 1: Spark Streaming consumes the Kafka data and stores the Kafka offsets in Redis, so that after an unexpected exit the job can resume from the previous offsets; the deduplicated records are then written to Elasticsearch.
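The OffsetManager used by the Spark Streaming job below is pulled from GitHub rather than reproduced in this post. As a rough idea of what such a Redis-backed offset store can look like, here is a minimal, hypothetical sketch; the key layout ("offset:&lt;topic&gt;:&lt;groupId&gt;", field = partition, value = offset) and the Redis host/port are assumptions, and the real class may differ:

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis

import scala.collection.JavaConverters._

// Hypothetical sketch; the real OffsetManager lives in the project's util package on GitHub.
object OffsetManagerSketch {

  // Read the last committed offsets for a topic + consumer group from a Redis hash
  def getOffset(topic: String, groupId: String): Map[TopicPartition, Long] = {
    val jedis = new Jedis("hadoop01", 6379) // host/port are assumptions
    val stored: util.Map[String, String] = jedis.hgetAll(s"offset:$topic:$groupId")
    jedis.close()
    stored.asScala.map { case (partition, offset) =>
      new TopicPartition(topic, partition.toInt) -> offset.toLong
    }.toMap
  }

  // Persist the end offsets of the batch that has just been written to ES
  def saveOffset(topic: String, groupId: String, ranges: Array[OffsetRange]): Unit = {
    if (ranges == null || ranges.isEmpty) return
    val fields = new util.HashMap[String, String]()
    ranges.foreach(r => fields.put(r.partition.toString, r.untilOffset.toString))
    val jedis = new Jedis("hadoop01", 6379)
    jedis.hmset(s"offset:$topic:$groupId", fields)
    jedis.close()
  }
}

The DauApp demo below calls getOffset once at startup and saveOffset only after a batch has been written to ES, which is what turns the replay-on-restart behavior into exactly-once together with the idempotent sink.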

Approach 2: Flink consumes the Kafka data and the state backend stores the Kafka offsets of the data source, so that after a crash and restart the job resumes consuming from the previous offset positions.
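For reference, this is roughly the configuration that makes the Kafka offsets recoverable: enable periodic checkpoints in exactly-once mode and give the job a state backend that survives a restart. A minimal sketch using Flink's Scala API (the full demo later in this post uses the Java API and a MemoryStateBackend; the HDFS checkpoint path here is an assumption):

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object FlinkCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Take a checkpoint every 5 s in exactly-once mode; the FlinkKafkaConsumer stores its
    // partition offsets in this checkpointed state and rewinds to them on recovery
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

    // Keep the state somewhere that survives a crash (the demo below uses MemoryStateBackend,
    // which is fine for a local test but lives in the JobManager heap)
    env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink/checkpoints"))

    // ... add the Kafka source and the ES sink here, then call env.execute(...)
  }
}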

End-to-end exactly-once implementation:

Source side: Kafka offsets

Internal (Spark Streaming): Redis handles deduplication and also stores the offsets

Internal (Flink): the state backend

Sink side: Elasticsearch document ids cannot repeat, giving idempotent writes (see the sketch right after this breakdown)
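To make the sink-side idempotence concrete: because the document _id is the device id (mid), writing the same record again after a replay overwrites the existing document instead of counting it twice. A minimal sketch, assuming the Elasticsearch 6.4+ REST high-level client and an ES node on hadoop01:9200 (both assumptions):

import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}

object IdempotentSinkSketch {
  def main(args: Array[String]): Unit = {
    val client = new RestHighLevelClient(RestClient.builder(new HttpHost("hadoop01", 9200, "http")))

    val doc = new java.util.HashMap[String, Any]()
    doc.put("mid", "mid_001")
    doc.put("dt", "2020-12-08")
    doc.put("hr", "11")

    // _index = daily index, _type = _doc, _id = mid: the fixed id is what makes the write idempotent
    val request = new IndexRequest("gmall_dau_info_2020-12-08", "_doc", "mid_001").source(doc)

    client.index(request, RequestOptions.DEFAULT) // first write creates the document
    client.index(request, RequestOptions.DEFAULT) // replaying it overwrites; the DAU count is still 1
    client.close()
  }
}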

Partial project architecture:

Spark Streaming flow diagram:

Flink end-to-end state consistency process:

Demo:

Spark Streaming, implemented in Scala:

import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer

object DauApp {
  def main(args: Array[String]): Unit = {
    // Create the configuration object. Note: a Streaming program cannot run on a single
    // thread; it needs at least 2 (one to receive, one to process)
    val conf: SparkConf = new SparkConf().setAppName("Spark01_W").setMaster("local[4]")

    // Create the Spark Streaming context with a 3-second batch interval
    val ssc = new StreamingContext(conf, Seconds(3))

    val gmallstartup = "GMALL_STARTUP_0105"
    val daugroup = "DAU_GROUP"

    // Use the offset utility class to read the last Kafka offsets from Redis
    val partitionToLong = util.OffsetManager.getOffset(gmallstartup, daugroup)

    // If this is not the first run, resume the stream from the saved offsets
    var inputStream: InputDStream[ConsumerRecord[String, String]] = null
    if (partitionToLong != null && partitionToLong.nonEmpty) {
      inputStream = util.MyKafkaUtil.getKafkaStream(gmallstartup, ssc, partitionToLong, daugroup)
    } else {
      inputStream = util.MyKafkaUtil.getKafkaStream(gmallstartup, ssc)
    }

    // Capture the end offsets of the current batch, used later to update Redis
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val inputGetOffsetDstream: DStream[ConsumerRecord[String, String]] = inputStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // runs on the driver, once per batch
      rdd
    }

    // Enrich the log JSON with date (dt) and hour (hr) fields derived from ts
    val value1 = inputGetOffsetDstream.map(record => {
      val str = record.value()
      val nObject = JSON.parseObject(str)
      val long = nObject.getLong("ts")
      val str1 = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(long))
      val strings = str1.split(" ")
      nObject.put("dt", strings(0))
      nObject.put("hr", strings(1))
      nObject
    })

    // Deduplicate against Redis (the daily set expires after 24 hours); only records whose
    // mid was newly added to today's set are kept
    val value2 = value1.mapPartitions(iter => {
      val client = util.RedisUtil.getJedisClient
      val buffer = new ListBuffer[JSONObject]()
      val list = iter.toList
      println("before filtering: " + list.size)
      for (jsonObj <- list) {
        val str = jsonObj.getString("dt")
        val str1 = jsonObj.getJSONObject("common").getString("mid")
        val str2 = "dau:" + str
        val long = client.sadd(str2, str1)
        client.expire(str2, 3600 * 24)
        if (long == 1) {
          buffer += jsonObj
        }
      }
      client.close()
      println("after filtering: " + buffer.size)
      buffer.toIterator // return only the deduplicated records
    })

    // Write to Elasticsearch, then commit the offsets back to Redis
    value2.foreachRDD { rdd =>
      rdd.foreachPartition(jsonIter => {
        val list = jsonIter.toList
        val tuples = list.map(jsonObj => {
          val nObject = jsonObj.getJSONObject("common")
          val info = bean.DauInfo(nObject.getString("mid"),
            nObject.getString("uid"),
            nObject.getString("ar"),
            nObject.getString("ch"),
            nObject.getString("vc"),
            jsonObj.getString("dt"),
            jsonObj.getString("hr"),
            "00",
            jsonObj.getLong("ts"))
          (info.mid, info)
        })
        val str = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
        util.MyEsUtil.bulkDoc(tuples, "gmall_dau_info_" + str)
      })
      // Offsets are saved only after the whole batch has been written to ES
      util.OffsetManager.saveOffset(gmallstartup, daugroup, offsetRanges)
    }

    // value1.print()
    // Start the streaming job
    ssc.start()
    // By default the context must not be stopped here
    // ssc.stop()
    // Block until the job is terminated
    ssc.awaitTermination()
  }
}

The utility classes used here (OffsetManager, MyKafkaUtil, RedisUtil, MyEsUtil) can be pulled from GitHub.
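For readers who don't want to fetch the repo right away, here is a hypothetical sketch of what MyKafkaUtil.getKafkaStream can look like with the Kafka 0-10 direct API; the broker list and consumer settings are assumptions, and the real class on GitHub may differ:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Hypothetical sketch of MyKafkaUtil
object MyKafkaUtilSketch {

  private val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092", // assumption
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "DAU_GROUP",
    "auto.offset.reset" -> "latest",
    // Auto-commit is disabled: offsets are committed to Redis by OffsetManager only after
    // the batch has been written to ES
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // First run: no saved offsets, start from the auto.offset.reset position
  def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq(topic), kafkaParams))
  }

  // Later runs: resume from the offsets read back from Redis
  def getKafkaStream(topic: String, ssc: StreamingContext,
                     offsets: Map[TopicPartition, Long],
                     groupId: String): InputDStream[ConsumerRecord[String, String]] = {
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq(topic), kafkaParams + ("group.id" -> groupId), offsets))
  }
}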

Flink, implemented in Java:

package app;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Copyright(c) 2020-2021 sparrow All Rights Reserved
 * Project: gmall2020-parent
 * Package: app
 * ClassName: DauApp01
 *
 * @author 18729 created on date: 2020/12/8 11:55
 * @version 1.0
 * @since JDK 1.8
 */
public class DauApp01 {
    public static void main(String[] args) throws Exception {
        String kafkaBrokers = "hadoop01:9092";
        String zkBrokers = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
        String topic = "GMALL_STARTUP_0105";
        String groupId = "DAU_GROUP";

        System.out.println("===============> Flink job starting ==============>");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka connection parameters
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaBrokers);
        properties.setProperty("zookeeper.connect", zkBrokers);
        properties.setProperty("group.id", groupId);
        // Time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Checkpoint interval; the Kafka source offsets are stored in checkpointed state
        env.enableCheckpointing(5000);
        env.setStateBackend(new MemoryStateBackend());
        // Create the Kafka consumer and read the data from Kafka
        FlinkKafkaConsumer010<String> kafkaConsumer010 = new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), properties);
        DataStreamSource<String> kafkaData = env.addSource(kafkaConsumer010);
        DataStream<String> userData = kafkaData.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) {
                System.out.println(">>>>> received message from topic: " + s + " <<<<<");
                return s;
            }
        });

        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop01", 9200, "http"));
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    public IndexRequest createIndexRequest(String element) {
                        Map<String, Object> json = new HashMap<>();
                        JSONObject jsonObject = JSON.parseObject(element);

                        // Derive dt (date) and hr (hour) from the event timestamp
                        Long ts = jsonObject.getLong("ts");
                        String format = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(ts));
                        String[] s = format.split(" ");
                        jsonObject.put("dt", s[0]);
                        jsonObject.put("hr", s[1]);

                        String common = jsonObject.getString("common");
                        JSONObject jsonObject1 = JSON.parseObject(common);

                        json.put("mid", jsonObject1.getString("mid"));
                        json.put("uid", jsonObject1.getString("uid"));
                        json.put("ar", jsonObject1.getString("ar"));
                        json.put("ch", jsonObject1.getString("ch"));
                        json.put("vc", jsonObject1.getString("vc"));
                        json.put("dt", jsonObject.getString("dt"));
                        json.put("hr", jsonObject.getString("hr"));
                        json.put("mi", "00");
                        json.put("ts", jsonObject.getLong("ts"));
                        System.out.println("data: " + element);

                        // The document id is the device id (mid), so a replayed record
                        // overwrites the existing document instead of being counted twice
                        return Requests.indexRequest()
                                .index("gmall_dau_info_" + jsonObject.getString("dt"))
                                .type("_doc")
                                .id(jsonObject1.getString("mid"))
                                .source(json);
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );

        esSinkBuilder.setBulkFlushMaxActions(1);
//        esSinkBuilder.setRestClientFactory(
//                restClientBuilder -> {
//                    restClientBuilder.setDefaultHeaders()
//                }
//        );
        esSinkBuilder.setRestClientFactory(new util.RestClientFactoryImpl());
        esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());

        userData.addSink(esSinkBuilder.build());

        env.execute("flink-task");
    }
}

Testing:

Kill the Spark Streaming job and start it again; it reads the saved offset position from Redis and resumes consuming:

Kill the Flink job and start it again; it resumes consuming from the last checkpoint:

Comparing the two, the Flink code is noticeably more concise than the Spark Streaming code, because Flink ships with a state backend that persists state for you. In addition, Spark Streaming works in micro-batches while Flink processes records as a continuous stream, so Flink is also smoother in terms of processing speed.
