undefined

一、kafka 的消费模式

producer和consumer对于消息的交付可靠性保障

1
2
3
4
5
6
kafka 的消费模式
1. 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。如果在 ack 超时或返回错误时 producer 不重试,则该消息可能最终不会写入 kafka,因此不会传递给 consumer。
2. 至少一次(at least once):消息不会丢失,但有可能被重复发送。如果 producer 收到来自 Kafka broker 的确认(ack)或者 acks = all,则表示该消息已经写入到 Kafka 。但如果 producer ack 超时或收到错误,则可能会重试发送消息,客户端会认为该消息未写入 Kafka 。如果 broker 在发送 Ack 之前失败,但在消息成功写入 Kafka 之后,此重试将导致该消息被写入多次,可能导致重复的工作和不正确的结果。
3. 精确一次(exactly once):消息不会丢失,也不会被重复发送。精确传递一次。将 offset 作为唯一 id 与消息同时处理,并且保证处理的原子性。消息只会处理一次,不丢失也不会重复。但这种方式很难做到。即使 producer 重试发送消息,消息也会保证最多一次地传递给最终consumer 。该语义是最理想的,但也难以实现,这是因为它需要消息系统本身与生产和消费消息的应用程序进行协作。例如如果在消费消息成功后,将 Kafka consumer 的偏移量 rollback ,我们将会再次从该偏移量开始接收消息。这表明消息传递系统和客户端应用程序必须配合调整才能实现 excactly-once

kafka默认提供的交付可靠性是至少一次。比如出现网络瞬时抖动,Broker的应答没有成功发送回Producer端。那就需要Producer重复发送

kafka如何做到消息精确一次呢?机制:幂等性和事务

二、如何保证消息精确一次

1. 幂等性

1
2
3
4
5
6
7
8
9
10
11
12
在数学中,幂等的意思为:执行某个操作或函数能够执行多次,但每次得到的结果都是不变的。幂等最大的优势在于我们可以安全地重试任何幂等性操作,反正他们也不会破坏我们的系统状态。
幂等性是kafka 0.11.0.0 版本引入的新功能
指定幂等性:props.put("enable.idempotence", true) 或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)

实现:
每个 Producer 在初始化时都会被分配一个唯一的 PID,这个 PID 对应用是透明的,完全没有暴露给用户。对于一个给定的 PID,sequence number 将会从0开始自增。Producer 在发送数据时,将会给每条 msg 标识一个 sequence number ,broker 也就是通过这个来验证数据是否重复。
这里的 PID 是全局唯一的,但是 Producer 故障后重新启动后会被分配一个新的 PID,这也是幂等性无法做到跨会话的一个原因。
broker上每个 Topic-Partition 也会维护 pid-seq 的映射,并且每次 Commit 都会更新 lastSeq。这样Record Batch 到来时,broker 会先检查 Record Batch 再保存数据。如果 batch 中 baseSeq(第一条消息的seq)比Broker维护的序号(lastSeq)大1,则保存数据,否则不保存

幂等性的作用范围:(幂等性Producer)
1. 一个幂等性Producer能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
2. 它只能实现单会话上的幂等性,不能实现跨会话的幂等性。会话可以理解为Producer进程的一次运行,当重启了Producer进程之后,幂等性会消失。

2. 事务型Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
事务性Producer:实现多分区以及多会话的消息无重复,而且不怕Producer重启
设置事务型Producer,需要满足两个条件:
1. 和幂等性Producer一样,开启 enable.idempotence=true
2. 设置Producer端参数transactional.id,最好为其设置一个有意义的名字
Producer代码也需要做一些调整

producer.initTransactions(); // 事务初始化
try {
producer.beginTransaction(); // 事务开始
producer.send(record1);
producer.send(record2);
producer.commitTransaction(); // 事务提交
} catch (KafkaException e) {
producer.abortTransaction(); // 事务终止
}
这段代码可以保证 Record1 和 Record2 被当作一个事务统一提交到kafka,要么他们全部提交成功,要么全部提交失败。

consumer端需要设置isolation.level参数的值:这个参数有两个取值
1. read_uncommitted:默认值,表明consumer能够读取到kafka写入的任何消息,不论事务型Producer提交事务还是终止事务,其写入的消息都可以读取
2. read_committed:表明consumer只会读取事务型Producer成功提交事务写入的消息。它也能看到非事务型Producer写入的消息
因为事务型Producer即使写入失败,kafka也会把他们写入到底层的日志中,也就是consumer还是会看到这些消息。