Flink Real-Time Data Warehouse: DWS-Layer Dual-Stream Join

After the ODS-layer dimension and fact tables are joined into DWD-layer wide tables, we have two wide tables: order info (orderinfo) and order detail (orderdetail). We can now consume both from Kafka and merge them again with Flink's intervalJoin plus a ProcessJoinFunction, deduplicating the result into a single order-wide table. The aggregated data can then be written to an OLAP store such as ClickHouse, so that later real-time requirements can be implemented in SQL; the joined stream can also be written back to Kafka as a DWS-layer wide table for further processing, ultimately serving the ADS-layer real-time requirements.
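The write-back to Kafka is not shown in the main listing below, so here is a minimal sketch: serialize each joined record to JSON and sink it to a DWS topic. The topic name DWS_ORDER_WIDE and the helper class are made up for illustration; OrderDetailWide is the joined bean from the listing.

import bean.OrderDetailWide;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class DwsKafkaWriteBack {
    // Sketch only: send the joined wide records to a (hypothetical) DWS topic.
    public static void writeBack(DataStream<OrderDetailWide> orderWide, String kafkaBrokers) {
        orderWide.map(new MapFunction<OrderDetailWide, String>() {
            @Override
            public String map(OrderDetailWide wide) {
                return JSON.toJSONString(wide); // one JSON record per wide row
            }
        }).addSink(new FlinkKafkaProducer<>(kafkaBrokers, "DWS_ORDER_WIDE", new SimpleStringSchema()));
    }
}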

Overall architecture:

Program flow diagram:

Code implementation (Java):

package dws;

import bean.OrderDetail;
import bean.OrderDetailWide;
import bean.OrderInfo01;
import com.alibaba.fastjson.JSON;
import lombok.SneakyThrows;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

/**
 * Copyright(c) 2020-2021 sparrow All Rights Reserved
 * Project: gmall2020-parent
 * Package: dws
 * ClassName: OrderDetailWideApp
 *
 * @author 18729 created on date: 2020/12/14 16:29
 * @version 1.0
 * @since JDK 1.8
 */
public class OrderDetailWideApp {
    public static void main(String[] args) throws Exception {
        String kafkaBrokers = "hadoop01:9092";
        String zkBrokers = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
        String topic = "DWD_ORDER_INFO";
        String groupId = "dws_order_info_group";

        String topic01 = "DWD_ORDER_DETAIL";
        String groupId01 = "dws_order_detail_group";

        System.out.println("===============> Flink job starting ===============>");

        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        // custom web UI port
        conf.setInteger(RestOptions.PORT, 4550);
        // local environment with web UI
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(4);
        // production environment:
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection properties (zookeeper.connect is only read by the
        // legacy 0.8 consumer; newer consumers ignore it)
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaBrokers);
        properties.setProperty("zookeeper.connect", zkBrokers);
        properties.setProperty("group.id", groupId);

        Properties properties01 = new Properties();
        properties01.setProperty("bootstrap.servers", kafkaBrokers);
        properties01.setProperty("zookeeper.connect", zkBrokers);
        properties01.setProperty("group.id", groupId01);

        // use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // checkpoint interval
        // env.enableCheckpointing(5000);
        // env.setStateBackend(new MemoryStateBackend());

        // create the Kafka consumers and read the two DWD topics
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
        DataStreamSource<String> kafkaData = env.addSource(kafkaConsumer);
        DataStream<String> orderInfo = kafkaData.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) {
                // System.out.println(">>>>>> received message: " + s + " <<<<<");
                return s;
            }
        });

        FlinkKafkaConsumer<String> kafkaConsumer01 = new FlinkKafkaConsumer<>(topic01, new SimpleStringSchema(), properties01);
        DataStreamSource<String> kafkaData01 = env.addSource(kafkaConsumer01);
        SingleOutputStreamOperator<String> orderDetail = kafkaData01.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // System.out.println(">>>>>> received message: " + value + " <<<<<");
                return value;
            }
        });

        // dual-stream join: parse both streams into beans and assign event-time timestamps
        SingleOutputStreamOperator<OrderInfo01> orderInfo01 = orderInfo.map(new RichMapFunction<String, OrderInfo01>() {
            @Override
            public OrderInfo01 map(String value) throws Exception {
                return JSON.parseObject(value, OrderInfo01.class);
            }
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderInfo01>() {
            @SneakyThrows
            @Override
            public long extractAscendingTimestamp(OrderInfo01 element) {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                Date date = simpleDateFormat.parse(element.getCreate_time());
                // getTime() already returns milliseconds; multiplying by 1000 would break the join window
                return date.getTime();
            }
        }).filter(new FilterFunction<OrderInfo01>() {
            @Override
            public boolean filter(OrderInfo01 value) throws Exception {
                // keep only paid orders (status 1004)
                return "1004".equals(value.getOrder_status());
            }
        });

        SingleOutputStreamOperator<OrderDetail> orderDetail01 = orderDetail.map(new RichMapFunction<String, OrderDetail>() {
            @Override
            public OrderDetail map(String value) throws Exception {
                return JSON.parseObject(value, OrderDetail.class);
            }
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderDetail>() {
            @SneakyThrows
            @Override
            public long extractAscendingTimestamp(OrderDetail element) {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                Date date = simpleDateFormat.parse(element.getCreate_time());
                return date.getTime();
            }
        });

        // interval join: match each order with its details within +/- 5 minutes of event time
        orderInfo01
                .keyBy(OrderInfo01::getId)
                .intervalJoin(orderDetail01.keyBy(OrderDetail::getOrder_id))
                .between(Time.minutes(-5), Time.minutes(5))
                .process(new TimeJoinFunction())
                .print();

        // order amount allocation (omitted)

        // write to ClickHouse (omitted)

        env.execute();
    }

    private static class TimeJoinFunction extends ProcessJoinFunction<OrderInfo01, OrderDetail, OrderDetailWide> {
        @Override
        public void processElement(OrderInfo01 left, OrderDetail right, Context ctx, Collector<OrderDetailWide> out) throws Exception {
            out.collect(new OrderDetailWide(left, right));
        }
    }
}
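The intro mentions deduplicating the merged stream, which the listing leaves out (a restart that replays DWD data can emit the same join result twice). A minimal sketch, assuming OrderDetailWide exposes a getDetail_id() getter (an assumption, not confirmed by the listing): key by detail id and keep only the first record seen, using keyed ValueState.

import bean.OrderDetailWide;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;

public class OrderWideDedup {
    // Sketch only: drop replayed duplicates by remembering each detail id in keyed state.
    public static DataStream<OrderDetailWide> dedup(DataStream<OrderDetailWide> orderWide) {
        return orderWide
                .keyBy(OrderDetailWide::getDetail_id) // assumed getter
                .filter(new RichFilterFunction<OrderDetailWide>() {
                    private transient ValueState<Boolean> seen;

                    @Override
                    public void open(Configuration parameters) {
                        // a state TTL is advisable in production so this does not grow forever
                        seen = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("seen", Boolean.class));
                    }

                    @Override
                    public boolean filter(OrderDetailWide value) throws Exception {
                        if (seen.value() == null) { // first time this detail id appears
                            seen.update(true);
                            return true;
                        }
                        return false; // duplicate, drop
                    }
                });
    }
}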

The web UI backpressure monitor shows normal:

Writing the DWS layer to ClickHouse
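The ClickHouse write was elided in the listing above; a minimal sketch with the flink-connector-jdbc JdbcSink and the ClickHouse JDBC driver follows. The order_wide table, its columns, and the bean getters are illustrative assumptions, not the project's actual schema.

import bean.OrderDetailWide;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;

public class ClickHouseSinkSketch {
    // Sketch only: batch-insert wide records into ClickHouse over JDBC.
    public static void writeToClickHouse(DataStream<OrderDetailWide> orderWide) {
        orderWide.addSink(JdbcSink.sink(
                "insert into order_wide (order_id, detail_id, total_amount) values (?, ?, ?)",
                (ps, wide) -> { // assumed getters on the bean
                    ps.setLong(1, wide.getOrder_id());
                    ps.setLong(2, wide.getDetail_id());
                    ps.setBigDecimal(3, wide.getTotal_amount());
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(100)        // flush every 100 rows...
                        .withBatchIntervalMs(3000) // ...or every 3 seconds
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUrl("jdbc:clickhouse://hadoop01:8123/default")
                        .build()));
    }
}

ClickHouse strongly prefers a few large inserts over many single-row ones, hence the batching options.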
