undefined

Kafka消费监控他们的消费进度:消费者Lag

1
2
3
4
5
滞后程度:消费者当前落后于生产者的程度。
Lag:单位是消息数。生产者生产的消息数目减去消费者消费的消息数目。层级是在分区上的
如果需要计算主题级别的,需要手动汇总所有主题分区的Lag,将他们累加起来,合并成最终的Lag值

Lag越大,消费者的速度无法匹及生产者的速度,又可能导致它消费的鹅数据已经不在操作系统的页缓存中了,这样消费者不得不从磁盘上读取他们,就会进一步拉大了与生产者的差距,进而出现马太效应,即哪些Lag原本就很大的消费者越来越慢,Lag也会越来越大。

监控Lag

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
1. 使用命令行工具 kafka-consumer-groups 脚本
$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>

2. 使用Kafka Java Consumer API编程

public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient client = AdminClient.create(props)) {
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
try {
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 处理中断异常
// ...
return Collections.emptyMap();
} catch (ExecutionException e) {
// 处理ExecutionException
// ...
return Collections.emptyMap();
} catch (TimeoutException e) {
throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
}
}
}

3. 使用Kafka自带的 JMX 监控指标
关注Lead 值:消费者最新消费消息的位移与分区当前第一条消息位移的差值。一旦监测到Lead 越来越小,甚至快接近于0,预示着消费者端要丢数据了