undefined

consumer的消费位移:记录了consumer要消费的下一条消息的位移,不是目前最新消费消息的位移

提交位移:consumer需要为分配给它的每个分区提交各自的位移数据

位移提交的语义保障是由用户保证,kafka只会“无脑”的接受你提交的位移

从用户角度讲,位移提交分为自动提交和手动提交。从consumer端的角度讲,位移提交分为同步提交和异步提交

自动提交位移:

1
2
3
4
5
6
7
8
consumer端参数 enable.auto.commit 设置为true,默认为true,代表自动提交位移。
consumer端参数 auto.commit.interval.ms 默认为5秒,代表kafka每5秒会自动提交一次位移

Properties props = new Properties();
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");

缺陷:consumer每5秒自动提交一次位移,如果提交位移之后的3秒发生了Rebalance,在Rebalance之后,所有consumer从上一次提交的位移处继续消费,但该位移已经是3秒前的位移数据了,因此Rebalance发生前3秒消费的所有数据都要重新再消费一次。

手动提交位移:

1
2
3
4
5
6
enable.auto.commit 设置为false
1. 同步方式(阻塞):
consumer.commitSync(); // 同步操作,如果提交过程出现异常,该方法会抛异常,阻塞。
2. 异步方式(非阻塞):
consumer.commitAsync(); // 异步操作,有回调函数
异步方式出现问题时不会自动重试,因为提交失败后重试时提交的位移值早已经过期了,没有意义了。因此需要我们在回调中做一些操作

提交位移如何得到理想情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
原因:
1. 可以利用commitSync 的自动重试来规避哪些瞬时错误,比如网络的瞬时抖动,Broker端的GC等。因为这些问题时短暂的,自动重试通常都会成功,这些我们希望kafka consumer 帮我们做
2. 我们不希望程序总处于阻塞状态,影响TPS

实现:
try {
while (true) {
consumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAsync(); // 使用异步提交规避阻塞
}
} catch(Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
如上常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 consumer 要关闭前,我们调用 commitSync() 方式执行同步阻塞式的位移提交,以确保 consumer 关闭前能够保证正确的位移数据。

更细粒度化的提交位移

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
比如:你的方法poll方法返回的时5000条消息,我想处理500条就提交,因为一次性处理5000条,中间出现差错的话,之前处理的全部都要重来一遍。
commitSync(Map<TopicPartition, OffsetAndMetadata>)
commitAsync(Map<TopicPartition, OffsetAndMetadata>)


private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回调处理逻辑是null
count++;
}
}
Map<TopicPartition, OffsetAndMetadata>: 键就是 TopicPartition,即消费的分区,而值是一个 OffsetAndMetadata 对象,保存的主要是位移数据

同步提交时,出现 CommitFailedException 异常的场景

1
2
3
4
5
6
7
8
9
10
11
场景一: 
当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,kafka consumer端会抛出 CommitFailedException 异常
解决:
1. 缩短单条消息处理的时间
2. 增加 consumer 端允许下游系统消费一批消息的最大时长。max.poll.interval.ms 的值,默认值是5分钟,提高它
3. 减少下游系统一次性消费的消息总数。max.poll.records 的值,默认是500条,表明调用一次 KafkaConsumer.poll 方法,最多返回500条消息。减少它
4. 下游系统使用多线程来加速消费。可以随时提高消费承载能力,但是处理位移提交较难

场景二(冷门):
consumer端还提供了一个名为 Standalone Consumer的独立消费者。也需要指定 group.id 参数来提交位移,每个消费者实例都是独立工作的,彼此之间毫无联系
如果你的应用中同时出现了设置相同 group.id 值的消费者程序和独立消费者程序,那么当独立消费者手动提交位移时,kafka就会立刻抛出 CommitFailedException 异常,因为kafka无法识别这个具有相同 group.id 的消费者实例。