undefined

一、消息丢失原理

1
2
3
4
5
6
7
8
9
10
kafka只对“已提交”的消息做有限度的持久化保证
已提交消息定义:当kafka的若干个Broker成功地接收到一条消息并写入到日志文件后,他们会告诉生产者程序这条消息已成功提交。此时,这条消息在kafka看来就正式变为“已提交”消息了。可选设置有1个Broker成功写入就算已提交,也可选所有Broker成功写入算已提交。

设置acks=all。acks是Producer的一个参数,代表对“已提交”消息的定义。如果设置为all,则表明所有副本Broker都要接受到消息,才算“已提交”。

有限度持久化保证:假设消息保存在N个kafka Broker上,那么这个前提条件就是这N个Broker中至少有1个存活。只要这个条件成立,kafka就能保证消息永远不丢失

producer永远要使用带有回调通知的发送API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。callback可以告知消息是否提交成功,如果是那些瞬时错误,重试就可以。如果是消息不合格造成,那么可以调整消息格式后在此发送。

consumer端:如果是多线程异步处理消费消息,consumer程序不要开启自动提交位移,而是要应用程序手动提交位移。

kafka 丢消息会发生在 Broker、Producer、consumer 三种

1.1 Broker 端丢数据

Broker丢失消息是由于Kafka本身的原因造成的,kafka为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中。消息的刷盘过程,为了提高性能,减少刷盘次数,kafka采用了批量刷盘的做法。即,按照一定的消息量,和时间间隔进行刷盘。这种机制也是由于linux操作系统决定的。将数据存储到linux操作系统种,会先存储到页缓存(Page cache)中,按照时间或者其他条件进行刷盘(从page cache到file),或者通过fsync命令强制刷盘。数据在page cache中时,如果系统挂掉,数据会丢失。

broker 写数据只写到 pageCache中,而 pageCache 位于内存,这部分数据断电后会丢失。pageCache 的数据会通过 Linux 的 flusher 程序进行刷盘。刷盘触发条件有:

  • 主动调用sync或fsync函数
  • 可用内存低于阀值
  • dirty data时间达到阀值。dirty是pagecache的一个标识位,当有数据写入到pageCache时,pagecache被标注为dirty,数据刷盘以后,dirty标志清除。

Broker 刷盘机制是通过调用 fsync 函数进行刷盘的,因此从单个 Broker 来看,pageCache 的数据会丢失。理论上,要完全让 kafka 保证单个 broker 不丢失消息是做不到的,只能通过调整刷盘机制的参数缓解该情况。比如:减少刷盘间隔、减少刷盘数据量大小。刷盘时间越短、性能越差、可靠性越好。

1.2 如何防止 Broker 端丢数据

kafka通过producer和broker协同处理单个broker丢失参数的情况。一旦producer发现broker消息丢失,即可自动进行retry。除非retry次数超过阀值(可配置),消息才会丢失。此时需要生产者客户端手动处理该情况。producer 通过kafka 的 ack 机制检测数据是否丢失?

  • acks = 0,producer 不等待 broker 的响应,效率最高,但消息会消失

  • acks = 1,leader broker收到消息后,不等待其他 follower 的响应,即返回 ack。此时,如果 follower 没有收到leader 同步的消息,leader 挂了,那么消息就会丢失。也就是说,如果 leader 收到消息,成功写入 pageCache 后,返回 ack,producer 认为消息发送成功,如果数据没有被同步到 follower,leader 挂了,数据就丢失

  • acks = -1,leader broker 收到消息后,先挂起等待 ISR 列表中所有的 follower 返回结果后,在返回 ack。-1 等效于 all。如果此时断电,producer 可以知道消息没有发送成功,将会重新发送。如果在 follower 收到数据以后,成功返回 ack。leader 断电,数据将存在于原来的 follower 中。在重新选举以后,新的 leader 会持有该部分数据。数据从 leader 同步到 follower,需要两步:

    • 数据从pageCache被刷盘到disk。因为只有disk中的数据才能被同步到 replica
    • 数据同步到 replica,并且replica成功将数据写入PageCache。在producer得到ack后,哪怕是所有机器都停电,数据也至少会存在于leader的磁盘内

    ISR(in-sync Replica)列表是 Broker 维护的一个 “可靠的 follower“ 列表。还需要配合另一个参数 min.insync.replicas (ISR 最少的副本数)才能更好的保证 ack 的有效性,如果不设置该值,ISR 的 follower 列表可能为空。此时相当于 acks = 1

2.1 Producer 端丢数据

producer 端为了提升效率,减少 IO,producer 在发送数据时会将多个请求进行合并后发送。被合并的请求会缓存在本地的 buffer 中,producer 会按照一定时间间隔将 buffer 中数据发出。在正常情况下,客户端的异步调用可以通过 callback 来处理消息发送失败或者超时的情况,但是一旦 producer 被非法停止了,那么 buffer 中的数据会被丢失,broker 将无法收到该部分数据。还有一点,当 producer 客户端内存不够时,如果采取的策略是丢弃消息(另一种策略是 block阻塞),消息也会被丢失。

还有的话,如果消息产生(异步产生)过快,上一次发送未完成,buffer 已经被填满,后到的线程就会阻塞等待,挂起线程过多,内存不足,导致程序崩溃,消息会丢失。

2.2 如何防止 Producer 端丢数据

  • 异步发送消息改成同步发送消息。或者设置线程数有一个上限。整体思路就是控制消息产生速度
  • 扩大 buffer 的容量配置。这种方式可以缓解该情况,但不能杜绝。或者消息写到本地磁盘(数据库或文件),由另一个生产线程发送消息,相当于加了缓冲层。 整体思路是扩大缓冲层容量

3.1 Consumer 端丢数据

consumer 消费消息的步骤:

  • 接收消息
  • 处理消息
  • 反馈“处理完毕”(commited)

consumer 的消费方式主要有两种:自动提交 offset、手动提交 offset

consumer 自动提交的机制是根据一定的时间间隔,将收到的消息进行 commit。commit 过程和消费消息的过程是异步的,也就是说,可能存在消费过程未成功(比如抛出异常),commit 消息已经提交了。此时消息就丢失了

3.2 如何防止 consumer 端丢数据

consumer 手动提交的话,手动控制 offset,可以保证消息“至少被消费一次”。但此时可能出现重复消费的情况。

二、如何防止消息丢失

1
2
3
4
5
6
7
8
1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
2. 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
3. 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
4. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
5. 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
6. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
7. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
8. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。