一、消息丢失原理
1 | kafka只对“已提交”的消息做有限度的持久化保证 |
kafka 丢消息会发生在 Broker、Producer、consumer 三种
1.1 Broker 端丢数据
Broker丢失消息是由于Kafka本身的原因造成的,kafka为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中。消息的刷盘过程,为了提高性能,减少刷盘次数,kafka采用了批量刷盘的做法。即,按照一定的消息量,和时间间隔进行刷盘。这种机制也是由于linux操作系统决定的。将数据存储到linux操作系统种,会先存储到页缓存(Page cache)中,按照时间或者其他条件进行刷盘(从page cache到file),或者通过fsync命令强制刷盘。数据在page cache中时,如果系统挂掉,数据会丢失。
broker 写数据只写到 pageCache中,而 pageCache 位于内存,这部分数据断电后会丢失。pageCache 的数据会通过 Linux 的 flusher 程序进行刷盘。刷盘触发条件有:
- 主动调用sync或fsync函数
- 可用内存低于阀值
- dirty data时间达到阀值。dirty是pagecache的一个标识位,当有数据写入到pageCache时,pagecache被标注为dirty,数据刷盘以后,dirty标志清除。
Broker 刷盘机制是通过调用 fsync 函数进行刷盘的,因此从单个 Broker 来看,pageCache 的数据会丢失。理论上,要完全让 kafka 保证单个 broker 不丢失消息是做不到的,只能通过调整刷盘机制的参数缓解该情况。比如:减少刷盘间隔、减少刷盘数据量大小。刷盘时间越短、性能越差、可靠性越好。
1.2 如何防止 Broker 端丢数据
kafka通过producer和broker协同处理单个broker丢失参数的情况。一旦producer发现broker消息丢失,即可自动进行retry。除非retry次数超过阀值(可配置),消息才会丢失。此时需要生产者客户端手动处理该情况。producer 通过kafka 的 ack 机制检测数据是否丢失?
acks = 0,producer 不等待 broker 的响应,效率最高,但消息会消失
acks = 1,leader broker收到消息后,不等待其他 follower 的响应,即返回 ack。此时,如果 follower 没有收到leader 同步的消息,leader 挂了,那么消息就会丢失。也就是说,如果 leader 收到消息,成功写入 pageCache 后,返回 ack,producer 认为消息发送成功,如果数据没有被同步到 follower,leader 挂了,数据就丢失
acks = -1,leader broker 收到消息后,先挂起等待 ISR 列表中所有的 follower 返回结果后,在返回 ack。-1 等效于 all。如果此时断电,producer 可以知道消息没有发送成功,将会重新发送。如果在 follower 收到数据以后,成功返回 ack。leader 断电,数据将存在于原来的 follower 中。在重新选举以后,新的 leader 会持有该部分数据。数据从 leader 同步到 follower,需要两步:
- 数据从pageCache被刷盘到disk。因为只有disk中的数据才能被同步到 replica
- 数据同步到 replica,并且replica成功将数据写入PageCache。在producer得到ack后,哪怕是所有机器都停电,数据也至少会存在于leader的磁盘内
ISR(in-sync Replica)列表是 Broker 维护的一个 “可靠的 follower“ 列表。还需要配合另一个参数
min.insync.replicas
(ISR 最少的副本数)才能更好的保证 ack 的有效性,如果不设置该值,ISR 的 follower 列表可能为空。此时相当于 acks = 1
2.1 Producer 端丢数据
producer 端为了提升效率,减少 IO,producer 在发送数据时会将多个请求进行合并后发送。被合并的请求会缓存在本地的 buffer 中,producer 会按照一定时间间隔将 buffer 中数据发出。在正常情况下,客户端的异步调用可以通过 callback 来处理消息发送失败或者超时的情况,但是一旦 producer 被非法停止了,那么 buffer 中的数据会被丢失,broker 将无法收到该部分数据。还有一点,当 producer 客户端内存不够时,如果采取的策略是丢弃消息(另一种策略是 block阻塞),消息也会被丢失。
还有的话,如果消息产生(异步产生)过快,上一次发送未完成,buffer 已经被填满,后到的线程就会阻塞等待,挂起线程过多,内存不足,导致程序崩溃,消息会丢失。
2.2 如何防止 Producer 端丢数据
- 异步发送消息改成同步发送消息。或者设置线程数有一个上限。整体思路就是控制消息产生速度
- 扩大 buffer 的容量配置。这种方式可以缓解该情况,但不能杜绝。或者消息写到本地磁盘(数据库或文件),由另一个生产线程发送消息,相当于加了缓冲层。 整体思路是扩大缓冲层容量
3.1 Consumer 端丢数据
consumer 消费消息的步骤:
- 接收消息
- 处理消息
- 反馈“处理完毕”(commited)
consumer 的消费方式主要有两种:自动提交 offset、手动提交 offset
consumer 自动提交的机制是根据一定的时间间隔,将收到的消息进行 commit。commit 过程和消费消息的过程是异步的,也就是说,可能存在消费过程未成功(比如抛出异常),commit 消息已经提交了。此时消息就丢失了
3.2 如何防止 consumer 端丢数据
consumer 手动提交的话,手动控制 offset,可以保证消息“至少被消费一次”。但此时可能出现重复消费的情况。
二、如何防止消息丢失
1 | 1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。 |