undefined

消费者组:consumer Group是kafka提供的可扩展且具有容错性的消费者机制

1
2
3
4
5
6
7
8
9
1. Consumer Group 下可以有一个或多个Consumer实例。这里的实例可以是一个单独的进程,也可以是同一个进程下的线程。
2. Group ID是一个字符串,在一个kafka集群中,它标识唯一的一个Consumer Group
3. Consumer Group下所有实例订阅的主题的单个分区,只能分配给组内的某个Consumer实例消费。这个分区当然也可以被其他的Group消费

kafka仅仅使用Consumer Group这一种机制,却同时实现了传统消息引擎系统的两个模型:如果所有实例都属于一个Group,那么它实现的就是消息队列模型;如果所有实例分别属于不同的Group,那么它实现的就是发布/订阅模型。

理想情况,Consumer实例的数量应该等于该Group订阅主题的分区总数。如果实例数量太多,则有些实例永远处于空闲状态;如果实例数量太少,那实例可能需要消费多个分区。

对于消费位移(KV{分区,位移}),kafka将位移保存在kafka内部主题 __consumer_offsets 中

消费者组重平衡机制

1
2
3
4
5
6
7
8
9
目的:规定了一个Consumer Group下的所有Consumer如何达成一致,来分配订阅Topic的每个分区。
触发条件:
1. 组成员数发生变更。比如有新的Consumer实例加入组或者离开组,抑或是有Consumer实例奔溃被“踢出”组
2. 订阅主题数发送变更。Consumer Group可以使用正则表达式的方式订阅主题,比如consumer.subscrible(Pattern.compile("t.*c")) 就表明该Group订阅所有以字母t开头、字母c结尾的主题。在Consumer Group的运行过程中,新创建一个满足这样条件的主题,那么该Group就会发生Rebalance
3. 订阅主题的分区数发生变更。kafka当前只能允许增加一个主题的分区数,当分区数增加时,就会触发订阅该主题的所有Group开启Rebalance
缺点:
1. 在Rebalance过程中,所有Consumer实例都会停止消费,等待Rebalance完成。
2. 在Rebalance的设计中所有的Consumer实例会共同参与,全部重新分配所有分区,其实高效的做法是尽量减少分配方案的变动
3. Rebalance实在特别慢,应该避免Rebalance的出现

kafka内部的位移主题 __consumer_offsets

1
2
3
位移主题的Key中应该保存3部分内容:<GroupID, 主题名,分区号>,value除了位移值,还有时间戳、用户自定义的数据等。

当kafka集群中的第一个Consumer程序启动时,kafka会自动创建位移主题。此主题的分区数由Broker端参数 offsets.topic.num.partitions决定,默认值为50。kafka的日志路径中会有 __consumer_offsets-xxx 这样的目录。副本数由参数 offsets.topic.replication.factor决定,默认值为3

提交位移:自动提交位移和手动提交位移

1
2
3
4
5
6
Consumer端参数: enable.auto.commit 
为true:Consumer在后台定期提交位移,提交间隔由参数:auto.commit.interval.ms来控制
为false:手动提交位移,Consumer API提供方法:consumer.commitSync等,当调用这些方法时,kafka会向位移主题写入相应消息

自动提交位移的问题:只要consumer一直启动,他就会无限期的向位移主题写入消息
kafka使用compact策略来删除位移主题中的过期消息,kafka提供专门的后台线程定期的巡检待compact的主题,看看是否存在满足条件的可删除数据。这个后台进程是Log Cleaner。

协调者:Coordinator

1
2
3
4
5
6
作用:专门为消费者组执行Rebalance以及提供位移管理和组成员管理等,每个Group对应一个Coordinator。
所有的Broker都有各自的Coordinator组件。Broker在启动时,创建和开启Coordinator组件。

Consumer Group 如何确定为它服务的Coordinator在那台Broker上呢?两个步骤
1. 确定由位移主题的那个分区来保存该Group数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)
2. 找到该分区领导者副本所在的Broker,该Broker即为对应的Coordinator

重平衡的缺点

1
2
3
1. Rebalance 影响来Consumer端的TPS,在重平衡期间,Consumer会停下来什么都不干
2. Rebalance 很慢,如果Group下有很多成员,就是个痛点
3. Rebalance 效率不高。当前 Kafka 的设计机制决定了每次 Rebalance 时,Group 下的所有成员都要参与进来,而且通常不会考虑局部性原理,但局部性原理对提升系统性能是特别重要的。

重平衡发生的时机

1
2
3
1. 组成员数量发生变化(概率较高)
2. 订阅主题数量发生变化
3. 订阅主题的分区数发生变化

如何避免重平衡

1
2
3
4
5
6
7
8
9
10
11
12
13
每个consumer实例都会定期地向Coordinator发生心跳请求,表明它还存活。以下参数会影响:
1. consumer端参数 session.timeout.ms 表示心跳的超时时间,默认是10s。如果10s内没有收到Group下某个Consumer实例的心跳,Coordinator就会认为 consumer实例已经挂了。
2. consumer端参数 heartbeat.interval.ms,是发送心跳请求频率的参数。
3. consumer端参数 max.poll.interval.ms,限定了consumer端应用程序两次调用poll方法的最大时间间隔,默认值为5分钟。表示如果consumer程序在5分钟内无法消费完poll方法返回的消息,那么consumer会主动发起“离开组”的请求,coordinator也会开启新一轮Rebalance

如何避免非必要Rebalance:
1. 未能及时发送心跳,导致consumer被踢出Group而引发。推荐一下设置
session.timeout.ms = 6s
heartbeat.interval.ms = 2s
保证consumer实例在被判定为“dead”之前,能够发送至少3轮的心跳请求
2. consumer消费时间过长。需要为业务处理逻辑留下充足的时间,这样consumer就不会因为处理这些消息的时间过长而引发Rebalance。
比如业务处理逻辑7分钟,那 max.poll.interval.ms至少设置8分钟。
3. 排查consumer端的GC表现。比如是否出现频繁的Full GC 导致的长时间停顿。