Consuming Kafka data with ClickHouse as a real-time and offline compute engine

ClickHouse is capable of generating analytical data reports in real time, with sub-second latencies

Given ClickHouse's extremely fast query performance and the low development cost of writing plain SQL, it is natural to use ClickHouse as both the compute engine and the storage layer: analyze the massive data stored in it directly, and combine this with the TTL feature of the MergeTree family of table engines for real-time data analysis.
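As a sketch of what that TTL-based retention can look like (the table and column names below are illustrative, not part of this article's setup):

```sql
-- Hypothetical example: keep only the most recent 7 days of rows.
-- MergeTree drops expired rows in the background during merges.
CREATE TABLE events_local
(
    `event_time` DateTime,
    `uid`        UInt64,
    `payload`    String
)
ENGINE = MergeTree()
ORDER BY (event_time, uid)
TTL event_time + INTERVAL 7 DAY;
```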

Table engines used in this article:

Kafka

Distributed

ReplicatedMergeTree

Create the topic using kafka-eagle:

Create a console consumer and set its consumer group to match the kafka_group_name of the Kafka engine table:

bin/kafka-console-consumer.sh --topic clickhouse_test01 --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --group clickhouse_group

The ClickHouse cluster's shard and replica configuration is as follows:

┌─cluster───────┬─shard_num─┬─shard_weight─┬─replica_num─┬─host_name─┬─host_address──┬─port─┬─is_local─┬─user────┬─default_database─┬─errors_count─┬─estimated_recovery_time─┐
│ gmall_cluster │         1 │            1 │           1 │ hadoop01  │ 192.168.150.4 │ 9000 │        0 │ default │                  │            0 │                       0 │
│ gmall_cluster │         1 │            1 │           2 │ hadoop02  │ 192.168.150.5 │ 9000 │        1 │ default │                  │            0 │                       0 │
│ gmall_cluster │         2 │            1 │           1 │ hadoop03  │ 192.168.150.6 │ 9000 │        0 │ default │                  │            0 │                       0 │
└───────────────┴───────────┴──────────────┴─────────────┴───────────┴───────────────┴──────┴──────────┴─────────┴──────────────────┴──────────────┴─────────────────────────┘
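The listing above comes from the system.clusters system table; it can be reproduced on any node with:

```sql
-- Inspect the shard/replica layout of one named cluster
SELECT cluster, shard_num, replica_num, host_name, host_address, port, is_local
FROM system.clusters
WHERE cluster = 'gmall_cluster';
```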

First, create the consumer table across the cluster using the Kafka table engine:

CREATE TABLE default.kafka_src_table ON CLUSTER gmall_cluster
(
    `id` Int32,
    `age` Int32,
    `msg` String
)
ENGINE = Kafka()
SETTINGS kafka_broker_list = 'hadoop01:9092,hadoop02:9092,hadoop03:9092',
         kafka_topic_list = 'clickhouse_test01',
         kafka_group_name = 'clickhouse_group',
         kafka_format = 'JSONEachRow'

┌─host─────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ hadoop02 │ 9000 │ 0 │ │ 2 │ 0 │
│ hadoop03 │ 9000 │ 0 │ │ 1 │ 0 │
│ hadoop01 │ 9000 │ 0 │ │ 0 │ 0 │
└──────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

3 rows in set. Elapsed: 0.130 sec.
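Note that a Kafka engine table is a streaming consumer, not storage: a direct SELECT on it consumes (and advances the offsets of) the messages it reads. In recent ClickHouse releases such a direct read must also be enabled explicitly; assuming a version that has this setting, a quick peek looks like:

```sql
-- Peek at the stream directly (this consumes the messages it reads!)
SELECT *
FROM kafka_src_table
SETTINGS stream_like_engine_allow_direct_select = 1;
```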

Next, create the sharded local table on each node using the ReplicatedMergeTree engine:

CREATE TABLE default.kafka_table_local ON CLUSTER gmall_cluster
(
    `id` Int32,
    `age` UInt32,
    `msg` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/kafka_sink_table/{shard}', '{replica}')
ORDER BY id

┌─host─────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ hadoop02 │ 9000 │ 0 │ │ 2 │ 0 │
│ hadoop03 │ 9000 │ 0 │ │ 1 │ 0 │
│ hadoop01 │ 9000 │ 0 │ │ 0 │ 0 │
└──────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

3 rows in set. Elapsed: 0.294 sec.
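The {shard} and {replica} placeholders in the ReplicatedMergeTree path are expanded from each node's macros configuration, so each host registers under its own ZooKeeper path. The substitutions can be checked per node with:

```sql
-- Show this node's macro substitutions (e.g. shard and replica)
SELECT * FROM system.macros;
```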

Then create the distributed table over the local tables using the Distributed engine:

CREATE TABLE kafka_table_distributed ON CLUSTER gmall_cluster
(
    `id` Int32,
    `age` UInt32,
    `msg` String
)
ENGINE = Distributed(gmall_cluster, default, kafka_table_local, id)

┌─host─────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ hadoop02 │ 9000 │ 0 │ │ 2 │ 0 │
│ hadoop03 │ 9000 │ 0 │ │ 1 │ 0 │
│ hadoop01 │ 9000 │ 0 │ │ 0 │ 0 │
└──────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

3 rows in set. Elapsed: 0.137 sec.
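The last argument of Distributed(gmall_cluster, default, kafka_table_local, id) is the sharding key: on insert, each row is routed to a shard based on id (by default, the key value modulo the total shard weight). The virtual column _shard_num can be used to see where rows landed:

```sql
-- Count rows per shard through the distributed table
SELECT _shard_num, count() AS rows
FROM kafka_table_distributed
GROUP BY _shard_num;
```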

Create a materialized view to pipe data from the Kafka engine table into the distributed table:

CREATE MATERIALIZED VIEW consumer TO kafka_table_distributed AS SELECT * FROM kafka_src_table;
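The materialized view is what actually drives consumption: it continuously reads batches from kafka_src_table and inserts them into the distributed table. Consumption can be paused and resumed by detaching and re-attaching the view:

```sql
DETACH TABLE consumer;  -- stop consuming from Kafka
ATTACH TABLE consumer;  -- resume consumption
```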

Then start a Kafka console producer to generate data:

bin/kafka-console-producer.sh --topic clickhouse_test01 --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092

Because the Kafka engine table above specifies JSONEachRow as the consumption format, produce JSON-formatted test data:

{"id":3,"age":22,"msg":"su"}
{"id":1,"age":20,"msg":"sunqi"}
{"id":2,"age":21,"msg":"a"}
{"id":3,"age":22,"msg":"su"}
{"id":4,"age":22,"msg":"suqi"}
{"id":5,"age":203,"msg":"nqi"}
{"id":5,"age":203,"msg":"nqi"}
{"id":5,"age":203,"msg":"nqi"}
{"id":5,"age":203,"msg":"nqi"}

Query the distributed table:

SELECT *
FROM kafka_table_distributed

┌─id─┬─age─┬─msg──┐
│  2 │  21 │ a    │
│  4 │  22 │ suqi │
└────┴─────┴──────┘
┌─id─┬─age─┬─msg─┐
│  5 │ 203 │ nqi │
└────┴─────┴─────┘
┌─id─┬─age─┬─msg───┐
│  1 │  20 │ sunqi │
│  3 │  22 │ su    │
└────┴─────┴───────┘

5 rows in set. Elapsed: 0.012 sec.

Query the local table on replica 1:

SELECT *
FROM kafka_table_local

┌─id─┬─age─┬─msg──┐
│  2 │  21 │ a    │
│  4 │  22 │ suqi │
└────┴─────┴──────┘

2 rows in set. Elapsed: 0.002 sec.
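To check how the rows are spread across every shard and replica without logging in to each host, the clusterAllReplicas table function (available in recent ClickHouse versions) can query each node's local table in one statement:

```sql
-- Per-host row counts of the local table across the whole cluster
SELECT hostName() AS host, count() AS rows
FROM clusterAllReplicas('gmall_cluster', default, kafka_table_local)
GROUP BY host;
```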

  • Copyrights © 2020-2021 ycfn97