undefined

kafka 如何实现消息有序

一、生产者端

生产者端一旦网络出现波动,那么消息就可能出现失序。应该考虑同步发送消息

  1. 设置消息响应参数 acks > 0,最好是 -1 ;再设置 max.in.flight.requests.per.connection = 1 。这样,kafka 发送端,一条消息发出后,响应必须满足 acks 设置的参数后,才会发送下一条消息。虽然在使用时,还是异步发送,但其实底层已经是一条接一条的发送了
    注意:max.in.flight.requests.per.connection,这个参数默认是5,意思是在被Broker阻止前,未通过acks确认的发送请求最大数,也就是在Broker处排队等待acks确认的Message数量。设置为1。让Broker处永远只有一条Message在排队,就可以严格控制顺序了。但是这样做会严重影响性能(接收Message的吞吐量)
  2. 当调用 KafkaProducer 的 send 方法后,调用 send 方法返回的 Future 对象的 get 方式阻塞等待结果。等结果返回后,再继续调用 KafkaProducer 的 send 方法发送下一条消息

同步发送消息之外,还要考虑消息重发问题。

kafka 发送端可以在发送出现问题时,判断问题是否可以自动恢复,如果是可以自动恢复的问题,可以通过设置 retries > 0,让 kafka 自动重试。kafka 1.0 之后的版本,引入了幂等特性,设置 enable.idempotence = true,幂等特性可以给消息添加序列号,每次发送,会把序列号递增加 1。然后我们就可以设置 max.in.flight.requests.per.connection = 5 ,这样,当 Kafka 发消息的时候,由于消息有了序列号,当发送消息出现错误的时候,在 Kafka 底层会通过获取服务器端的最近几条日志的序列号和发送端需要重新发送的消息序列号做对比,如果是连续的,那么就可以继续发送消息,保证消息顺序。同时提高 kafka 的性能

二、broker 端

Kafka 只保证同个分区内的消息是有序的。所以,如果要保证业务全局严格有序,就要设置 Topic 为单分区的形式。

不过,往往我们的业务是不需要考虑全局有序的,我们只需要保证业务中不同类别的消息有序即可。对这些业务中不同类别的消息,可以设置成不同的 Key,然后根据 Key 取模。这样,由于同类别消息有同样的 Key,就会被分配到同样的分区中,保证有序。
但是,这里有个问题,就是当我们对分区的数量进行改变的时候,会把以前可能分到同样的分区的消息,分到别的分区上。这就不能保证消息顺序了。

面对这种情况,就需要在动态变更分区的时候,考虑对业务的影响。有可能需要根据业务和当前分区需求,重新划分消息类别。
另外,如果一个 Topic 存在多分区的情况,并且 min.insync.replicas 指定的副本个数挂掉了,那么,就会出现这种情况:发送消息写入不了对应分区,但是消费依然可以消费消息。此时,往往我们会保证可用性,会考虑切换消息的分区,一旦这样做,消息顺序就可能出现不一致的情况。所以,一定要保证 min.insync.replicas 参数配置的合适,去最大可能保证消息写入的顺序性。

三、消费者端

在消费者端,一个 topic 下的一个分区只能从属于监听这个 topic 的消费者组中的某一个消费者。如果分区数量大于消费者数量,则会出现一个消费者被分配了多个分区的情况。可能就出现消息无序。最好是一个消费者对应一个分区。

另一个就是消费者的重平衡,重平衡会让消费者组中所有消费者都暂停消费。重平衡前后相当于让消费者组中消费者重新分配分区,这就可能造成消费者在 Rebalance 前后所对应的分区不一致。分区不一致,那自然消费顺序就不可能一致了。

总结:整个 kafka 不保证有序。如果为了保证 kafka 全局有序,那么设置一个生产者、一个分区、一个消费者。