Flink: streaming MQ data into Hive in real time and merging small HDFS partition files

For the source and sink connector setup, refer to the official Flink documentation; it is not repeated here.
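
Since the job wiring itself is skipped, here is a minimal sketch of how such a pipeline is typically assembled, assuming Kafka as the MQ and Flink 1.13 with the Table API. The catalog name, Hive conf path, topic, schema and watermark definition are placeholders, not the original job's:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class MqToHiveJob {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Partition files are rolled and committed on checkpoints, so checkpointing is required.
        env.enableCheckpointing(60_000L);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register a Hive catalog so the partitioned Hive table defined later is visible to the job.
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/etc/hive/conf");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        // MQ-backed source table (placeholder schema); ts drives the event-time watermark
        // that PartitionTimeCommitTrigger later compares against the extracted partition time.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS kafka_aws_log ("
                        + "  `time` STRING,"
                        + "  request STRING,"
                        + "  ts AS TO_TIMESTAMP(`time`),"
                        + "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                        + ") WITH ("
                        + "  'connector' = 'kafka',"
                        + "  'topic' = 'aws-log',"
                        + "  'properties.bootstrap.servers' = 'broker:9092',"
                        + "  'properties.group.id' = 'flink-hive-sink',"
                        + "  'format' = 'json')");
    }
}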

The data is read from MQ into Flink and a Hive sink is defined for streaming writes. The written partition files do show up on HDFS, but the data cannot be queried from Hive. Let's look at the source code of org.apache.flink.table.filesystem.stream.PartitionTimeCommitTrigger:

@Override
public List<String> committablePartitions(long checkpointId) {
    if (!watermarks.containsKey(checkpointId)) {
        throw new IllegalArgumentException(
                String.format(
                        "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                        checkpointId, watermarks));
    }

    long watermark = watermarks.get(checkpointId);
    watermarks.headMap(checkpointId, true).clear();

    List<String> needCommit = new ArrayList<>();
    Iterator<String> iter = pendingPartitions.iterator();
    while (iter.hasNext()) {
        String partition = iter.next();
        LocalDateTime partTime =
                extractor.extract(partitionKeys, extractPartitionValues(new Path(partition)));
        if (watermark > toMills(partTime) + commitDelay) {
            needCommit.add(partition);
            iter.remove();
        }
    }
    return needCommit;
}

The commit condition is if (watermark > toMills(partTime) + commitDelay). Stepping into toMills shows that it does no time-zone handling: the extracted LocalDateTime is converted to epoch milliseconds as if it were UTC, while our partition values are in UTC+8. So we rewrite this class to subtract 8 hours when extract pulls the partition time, and reference the custom partition-time extractor when creating the Hive table; only then do the partitions get committed (a worked example of the offset follows the snippet below).

@Override
public LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues) {
    String timestampString;
    if (pattern == null) {
        timestampString = partitionValues.get(0);
    } else {
        timestampString = pattern;
        for (int i = 0; i < partitionKeys.size(); i++) {
            timestampString =
                    timestampString.replaceAll(
                            "\\$" + partitionKeys.get(i), partitionValues.get(i));
        }
    }
    return toLocalDateTime(timestampString);
}

public static LocalDateTime toLocalDateTime(String timestampString) {
    try {
        return LocalDateTime.parse(timestampString, TIMESTAMP_FORMATTER);
    } catch (DateTimeParseException e) {
        return LocalDateTime.of(
                LocalDate.parse(timestampString, DATE_FORMATTER), LocalTime.MIDNIGHT);
    }
}

// No time-zone handling here: the LocalDateTime is converted to epoch millis as if it were UTC.
public static long toMills(LocalDateTime dateTime) {
    return TimestampData.fromLocalDateTime(dateTime).getMillisecond();
}

public static long toMills(String timestampString) {
    return toMills(toLocalDateTime(timestampString));
}
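
To make the offset concrete: toMills converts the extracted LocalDateTime to epoch milliseconds as if it were UTC, while a partition such as dt=2021-06-01/hr=08/mm=00 written from UTC+8 data actually corresponds to an instant 8 hours earlier. Assuming the event-time watermark carries the real epoch instant, the commit condition is therefore satisfied roughly 8 hours too late. A small stand-alone illustration (the partition value is made up):

import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class PartitionTimeOffsetDemo {
    public static void main(String[] args) {
        // Partition time extracted from dt=2021-06-01/hr=08/mm=00
        LocalDateTime partTime = LocalDateTime.of(2021, 6, 1, 8, 0, 0);

        // What toMills effectively returns: the wall-clock value interpreted as UTC.
        long asUtcMillis = partTime.toInstant(ZoneOffset.UTC).toEpochMilli();

        // The instant the partition really covers when the data is in Beijing time (UTC+8);
        // this is roughly where the event-time watermark sits once the partition is complete.
        long asBeijingMillis = partTime.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();

        // 28800000 ms = 8 hours: the watermark must advance an extra 8 hours before
        // "watermark > toMills(partTime) + commitDelay" becomes true.
        System.out.println(asUtcMillis - asBeijingMillis);

        // Subtracting 8 hours in extract(), as in the rewritten class below, removes the gap.
        long shifted = partTime.plusHours(-8).toInstant(ZoneOffset.UTC).toEpochMilli();
        System.out.println(shifted == asBeijingMillis); // true
    }
}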

The rewritten class, with the modified extract method:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.filesystem;

import org.apache.flink.table.data.TimestampData;

import javax.annotation.Nullable;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.time.format.ResolverStyle;
import java.time.format.SignStyle;
import java.time.temporal.ChronoField;
import java.util.List;

import static java.time.temporal.ChronoField.DAY_OF_MONTH;
import static java.time.temporal.ChronoField.HOUR_OF_DAY;
import static java.time.temporal.ChronoField.MINUTE_OF_HOUR;
import static java.time.temporal.ChronoField.MONTH_OF_YEAR;
import static java.time.temporal.ChronoField.SECOND_OF_MINUTE;
import static java.time.temporal.ChronoField.YEAR;

/**
 * Default {@link PartitionTimeExtractor}. See {@link
 * FileSystemOptions#PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN}.
 *
 * <p>Modified copy: extract() shifts the extracted time back 8 hours. In the table DDL below
 * the modified class is referenced (under the job's own package and class name) via
 * 'partition.time-extractor.class' with 'partition.time-extractor.kind' = 'custom'.
 */
public class DefaultPartTimeExtractor implements PartitionTimeExtractor {

    private static final DateTimeFormatter TIMESTAMP_FORMATTER =
            new DateTimeFormatterBuilder()
                    .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
                    .appendLiteral('-')
                    .appendValue(MONTH_OF_YEAR, 1, 2, SignStyle.NORMAL)
                    .appendLiteral('-')
                    .appendValue(DAY_OF_MONTH, 1, 2, SignStyle.NORMAL)
                    .optionalStart()
                    .appendLiteral(" ")
                    .appendValue(HOUR_OF_DAY, 1, 2, SignStyle.NORMAL)
                    .appendLiteral(':')
                    .appendValue(MINUTE_OF_HOUR, 1, 2, SignStyle.NORMAL)
                    .appendLiteral(':')
                    .appendValue(SECOND_OF_MINUTE, 1, 2, SignStyle.NORMAL)
                    .optionalStart()
                    .appendFraction(ChronoField.NANO_OF_SECOND, 1, 9, true)
                    .optionalEnd()
                    .optionalEnd()
                    .toFormatter()
                    .withResolverStyle(ResolverStyle.LENIENT);

    private static final DateTimeFormatter DATE_FORMATTER =
            new DateTimeFormatterBuilder()
                    .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
                    .appendLiteral('-')
                    .appendValue(MONTH_OF_YEAR, 1, 2, SignStyle.NORMAL)
                    .appendLiteral('-')
                    .appendValue(DAY_OF_MONTH, 1, 2, SignStyle.NORMAL)
                    .toFormatter()
                    .withResolverStyle(ResolverStyle.LENIENT);

    @Nullable private final String pattern;

    public DefaultPartTimeExtractor(@Nullable String pattern) {
        this.pattern = pattern;
    }

    @Override
    public LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues) {
        String timestampString;
        if (pattern == null) {
            timestampString = partitionValues.get(0);
        } else {
            timestampString = pattern;
            for (int i = 0; i < partitionKeys.size(); i++) {
                timestampString =
                        timestampString.replaceAll(
                                "\\$" + partitionKeys.get(i), partitionValues.get(i));
            }
        }
        // Shift the extracted partition time back 8 hours so that the UTC-based
        // toMills()/watermark comparison in PartitionTimeCommitTrigger commits
        // UTC+8 partitions on time.
        return toLocalDateTime(timestampString).plusHours(-8);
    }

    public static LocalDateTime toLocalDateTime(String timestampString) {
        try {
            return LocalDateTime.parse(timestampString, TIMESTAMP_FORMATTER);
        } catch (DateTimeParseException e) {
            return LocalDateTime.of(
                    LocalDate.parse(timestampString, DATE_FORMATTER), LocalTime.MIDNIGHT);
        }
    }

    public static long toMills(LocalDateTime dateTime) {
        return TimestampData.fromLocalDateTime(dateTime).getMillisecond();
    }

    public static long toMills(String timestampString) {
        return toMills(toLocalDateTime(timestampString));
    }
}

The official documentation also points out this time-zone issue:
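
For completeness: besides patching the extractor, the Flink 1.13 filesystem/Hive streaming sink documents a sink.partition-commit.watermark-time-zone option (alongside the trigger and delay options) that is intended to cover exactly this mismatch without modifying any code. The following is only a sketch under that assumption; the table name is a placeholder and the option should be verified against the docs linked at the end:

import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class CommitOptionsSketch {
    // Sets the commit-related properties on an existing Hive table; they could equally be
    // put directly into the TBLPROPERTIES of the DDL below.
    static void applyCommitOptions(TableEnvironment tEnv) {
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql(
                "ALTER TABLE my_partitioned_table SET TBLPROPERTIES ("
                        + " 'sink.partition-commit.trigger' = 'partition-time',"
                        + " 'sink.partition-commit.delay' = '1 min',"
                        // Assumption: available from Flink 1.13; tells the partition-time trigger
                        // which time zone to use when interpreting the watermark for the comparison.
                        + " 'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai')");
    }
}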

Create the three-level (or two-/one-level) partitioned Hive table:

CREATE external TABLE if not exists awsLog
(
actionsExecuted STRING,
chosenCertArn STRING,
classification STRING,
classificationReasonf STRING,
clientPort STRING,
domainName STRING,
elb STRING,
elbStatusCode STRING,
errorReason STRING,
matchedRulePriority STRING,
receivedBytes STRING,
redirectUrl STRING,
request STRING,
requestCreationTime STRING,
requestProcessingTime STRING,
responseProcessingTime STRING,
sentBytes STRING,
sslCipher STRING,
sslProtocol STRING,
targetGroupArn STRING,
targetPort STRING,
targetPortList STRING,
targetProcessingTime STRING,
targetStatusCode STRING,
targetStatusCodeList STRING,
`time` STRING,
traceId STRING,
type STRING,
userAgent STRING
) partitioned by (dt string,hr string,mm string)
stored as PARQUET
location '/sunqi/awsLog/'
TBLPROPERTIES (
'partition.time-extractor.kind' = 'custom',
'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mm:00',
'partition.time-extractor.class' = 'com.topsky.aws.userlogin.functions.MyPartTimeExtractor',
'sink.partition-commit.policy.kind' = 'metastore,success-file,custom',
'sink.partition-commit.policy.class' = 'com.topsky.aws.userlogin.functions.ParquetFileMergingCommitPolicy'
);
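
With the table created, the streaming write itself is a plain INSERT from the MQ-backed source table. A minimal sketch, continuing the placeholder kafka_aws_log source from the wiring sketch above; note that the DDL here does not show sink.partition-commit.trigger, which must be set to 'partition-time' (the default is process-time) for the PartitionTimeCommitTrigger path discussed earlier to apply:

import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class StreamingInsertSketch {
    // 'projection' stands for the 29 log columns parsed from the MQ record
    // (actionsExecuted, chosenCertArn, ..., userAgent); omitted here for brevity.
    static void startStreamingInsert(TableEnvironment tEnv, String projection) {
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(
                "INSERT INTO awsLog SELECT " + projection + ","
                        // dt/hr/mm partition values derived from the event-time column,
                        // matching the '$dt $hr:$mm:00' extractor pattern above.
                        + " DATE_FORMAT(ts, 'yyyy-MM-dd') AS dt,"
                        + " DATE_FORMAT(ts, 'HH') AS hr,"
                        + " DATE_FORMAT(ts, 'mm') AS mm"
                        + " FROM kafka_aws_log");
    }
}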

Once Hive could see the data, the next problem appeared: with a large data volume, every Hive partition ends up containing a large number of small files:

So a custom partition commit policy is needed to merge the many small Parquet part files in each partition into one larger Parquet file (the table is stored as Parquet). The implementation below is adapted from a demo shared online:

package me.lmagics.flinkexp.hiveintegration.util;

// Parquet classes shaded into the flink-sql-connector-hive bundle.
import org.apache.flink.hive.shaded.parquet.example.data.Group;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.GroupReadSupport;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.flink.hive.shaded.parquet.hadoop.util.HadoopInputFile;
import org.apache.flink.hive.shaded.parquet.schema.MessageType;
import org.apache.flink.table.filesystem.PartitionCommitPolicy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ParquetFileMergingCommitPolicy implements PartitionCommitPolicy {
    private static final Logger LOGGER =
            LoggerFactory.getLogger(ParquetFileMergingCommitPolicy.class);

    @Override
    public void commit(Context context) throws Exception {
        // Invoked once for each partition being committed: merge all of its part files.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        String partitionPath = context.partitionPath().getPath();

        List<Path> files = listAllFiles(fs, new Path(partitionPath), "part-");
        LOGGER.info("{} files in path {}", files.size(), partitionPath);

        MessageType schema = getParquetSchema(files, conf);
        if (schema == null) {
            return;
        }
        LOGGER.info("Fetched parquet schema: {}", schema.toString());

        Path result = merge(partitionPath, schema, files, fs);
        LOGGER.info("Files merged into {}", result.toString());
    }

    // List the "part-" files (the per-checkpoint output files) directly under the partition.
    private List<Path> listAllFiles(FileSystem fs, Path dir, String prefix) throws IOException {
        List<Path> result = new ArrayList<>();

        RemoteIterator<LocatedFileStatus> dirIterator = fs.listFiles(dir, false);
        while (dirIterator.hasNext()) {
            LocatedFileStatus fileStatus = dirIterator.next();
            Path filePath = fileStatus.getPath();
            if (fileStatus.isFile() && filePath.getName().startsWith(prefix)) {
                result.add(filePath);
            }
        }

        return result;
    }

    // Read the schema from the footer of the first file; all part files share the same schema.
    private MessageType getParquetSchema(List<Path> files, Configuration conf) throws IOException {
        if (files.size() == 0) {
            return null;
        }

        HadoopInputFile inputFile = HadoopInputFile.fromPath(files.get(0), conf);
        ParquetFileReader reader = ParquetFileReader.open(inputFile);
        ParquetMetadata metadata = reader.getFooter();
        MessageType schema = metadata.getFileMetaData().getSchema();

        reader.close();
        return schema;
    }

    // Copy every record from the small files into one result-<timestamp>.parquet,
    // then delete the originals.
    private Path merge(String partitionPath, MessageType schema, List<Path> files, FileSystem fs)
            throws IOException {
        Path mergeDest =
                new Path(partitionPath + "/result-" + System.currentTimeMillis() + ".parquet");
        ParquetWriter<Group> writer =
                ExampleParquetWriter.builder(mergeDest)
                        .withType(schema)
                        .withConf(fs.getConf())
                        .withWriteMode(Mode.CREATE)
                        .withCompressionCodec(CompressionCodecName.SNAPPY)
                        .build();

        for (Path file : files) {
            ParquetReader<Group> reader =
                    ParquetReader.builder(new GroupReadSupport(), file)
                            .withConf(fs.getConf())
                            .build();
            Group data;
            while ((data = reader.read()) != null) {
                writer.write(data);
            }
            reader.close();
        }
        writer.close();

        for (Path file : files) {
            fs.delete(file, false);
        }

        return mergeDest;
    }
}

Writing again, the small-file problem is gone.

For more details on partition commit policies, see the official documentation:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/overview/

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/

Reference articles:

https://blog.csdn.net/nazeniwaresakini/article/details/107811860

https://blog.csdn.net/m0_37592814/article/details/108044830
