consumer的消费位移:记录了consumer要消费的下一条消息的位移,不是目前最新消费消息的位移
提交位移:consumer需要为分配给它的每个分区提交各自的位移数据
位移提交的语义保障是由用户保证,kafka只会“无脑”的接受你提交的位移
从用户角度讲,位移提交分为自动提交和手动提交。从consumer端的角度讲,位移提交分为同步提交和异步提交
自动提交位移:
1 2 3 4 5 6 7 8
| consumer端参数 enable.auto.commit 设置为true,默认为true,代表自动提交位移。 consumer端参数 auto.commit.interval.ms 默认为5秒,代表kafka每5秒会自动提交一次位移
Properties props = new Properties(); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "2000");
缺陷:consumer每5秒自动提交一次位移,如果提交位移之后的3秒发生了Rebalance,在Rebalance之后,所有consumer从上一次提交的位移处继续消费,但该位移已经是3秒前的位移数据了,因此Rebalance发生前3秒消费的所有数据都要重新再消费一次。
|
手动提交位移:
1 2 3 4 5 6
| enable.auto.commit 设置为false 1. 同步方式(阻塞): consumer.commitSync(); // 同步操作,如果提交过程出现异常,该方法会抛异常,阻塞。 2. 异步方式(非阻塞): consumer.commitAsync(); // 异步操作,有回调函数 异步方式出现问题时不会自动重试,因为提交失败后重试时提交的位移值早已经过期了,没有意义了。因此需要我们在回调中做一些操作
|
提交位移如何得到理想情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| 原因: 1. 可以利用commitSync 的自动重试来规避哪些瞬时错误,比如网络的瞬时抖动,Broker端的GC等。因为这些问题时短暂的,自动重试通常都会成功,这些我们希望kafka consumer 帮我们做 2. 我们不希望程序总处于阻塞状态,影响TPS
实现: try { while (true) { consumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 commitAsync(); // 使用异步提交规避阻塞 } } catch(Exception e) { handle(e); // 处理异常 } finally { try { consumer.commitSync(); // 最后一次提交使用同步阻塞式提交 } finally { consumer.close(); } } 如上常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 consumer 要关闭前,我们调用 commitSync() 方式执行同步阻塞式的位移提交,以确保 consumer 关闭前能够保证正确的位移数据。
|
更细粒度化的提交位移
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| 比如:你的方法poll方法返回的时5000条消息,我想处理500条就提交,因为一次性处理5000条,中间出现差错的话,之前处理的全部都要重来一遍。 commitSync(Map<TopicPartition, OffsetAndMetadata>) commitAsync(Map<TopicPartition, OffsetAndMetadata>)
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); int count = 0; …… while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record: records) { process(record); // 处理消息 offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1); if(count % 100 == 0) consumer.commitAsync(offsets, null); // 回调处理逻辑是null count++; } } Map<TopicPartition, OffsetAndMetadata>: 键就是 TopicPartition,即消费的分区,而值是一个 OffsetAndMetadata 对象,保存的主要是位移数据
|
同步提交时,出现 CommitFailedException 异常的场景
1 2 3 4 5 6 7 8 9 10 11
| 场景一: 当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,kafka consumer端会抛出 CommitFailedException 异常 解决: 1. 缩短单条消息处理的时间 2. 增加 consumer 端允许下游系统消费一批消息的最大时长。max.poll.interval.ms 的值,默认值是5分钟,提高它 3. 减少下游系统一次性消费的消息总数。max.poll.records 的值,默认是500条,表明调用一次 KafkaConsumer.poll 方法,最多返回500条消息。减少它 4. 下游系统使用多线程来加速消费。可以随时提高消费承载能力,但是处理位移提交较难
场景二(冷门): consumer端还提供了一个名为 Standalone Consumer的独立消费者。也需要指定 group.id 参数来提交位移,每个消费者实例都是独立工作的,彼此之间毫无联系 如果你的应用中同时出现了设置相同 group.id 值的消费者程序和独立消费者程序,那么当独立消费者手动提交位移时,kafka就会立刻抛出 CommitFailedException 异常,因为kafka无法识别这个具有相同 group.id 的消费者实例。
|