undefined

一、kafka 的消费模式

producer和consumer对于消息的交付可靠性保障

1
2
3
4
5
6
kafka 的消费模式
1. 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。如果在 ack 超时或返回错误时 producer 不重试,则该消息可能最终不会写入 kafka,因此不会传递给 consumer。
2. 至少一次(at least once):消息不会丢失,但有可能被重复发送。如果 producer 收到来自 Kafka broker 的确认(ack)或者 acks = all,则表示该消息已经写入到 Kafka 。但如果 producer ack 超时或收到错误,则可能会重试发送消息,客户端会认为该消息未写入 Kafka 。如果 broker 在发送 Ack 之前失败,但在消息成功写入 Kafka 之后,此重试将导致该消息被写入多次,可能导致重复的工作和不正确的结果。
3. 精确一次(exactly once):消息不会丢失,也不会被重复发送。精确传递一次。将 offset 作为唯一 id 与消息同时处理,并且保证处理的原子性。消息只会处理一次,不丢失也不会重复。但这种方式很难做到。即使 producer 重试发送消息,消息也会保证最多一次地传递给最终consumer 。该语义是最理想的,但也难以实现,这是因为它需要消息系统本身与生产和消费消息的应用程序进行协作。例如如果在消费消息成功后,将 Kafka consumer 的偏移量 rollback ,我们将会再次从该偏移量开始接收消息。这表明消息传递系统和客户端应用程序必须配合调整才能实现 excactly-once

kafka默认提供的交付可靠性是至少一次。比如出现网络瞬时抖动,Broker的应答没有成功发送回Producer端。那就需要Producer重复发送

kafka如何做到消息精确一次呢?机制:幂等性和事务

二、如何保证消息精确一次

1. 幂等性

1
2
3
4
5
6
7
8
9
10
11
12
在数学中,幂等的意思为:执行某个操作或函数能够执行多次,但每次得到的结果都是不变的。幂等最大的优势在于我们可以安全地重试任何幂等性操作,反正他们也不会破坏我们的系统状态。
幂等性是kafka 0.11.0.0 版本引入的新功能
指定幂等性:props.put("enable.idempotence", true) 或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)

实现:
每个 Producer 在初始化时都会被分配一个唯一的 PID,这个 PID 对应用是透明的,完全没有暴露给用户。对于一个给定的 PID,sequence number 将会从0开始自增。Producer 在发送数据时,将会给每条 msg 标识一个 sequence number ,broker 也就是通过这个来验证数据是否重复。
这里的 PID 是全局唯一的,但是 Producer 故障后重新启动后会被分配一个新的 PID,这也是幂等性无法做到跨会话的一个原因。
broker上每个 Topic-Partition 也会维护 pid-seq 的映射,并且每次 Commit 都会更新 lastSeq。这样Record Batch 到来时,broker 会先检查 Record Batch 再保存数据。如果 batch 中 baseSeq(第一条消息的seq)比Broker维护的序号(lastSeq)大1,则保存数据,否则不保存

幂等性的作用范围:(幂等性Producer)
1. 一个幂等性Producer能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
2. 它只能实现单会话上的幂等性,不能实现跨会话的幂等性。会话可以理解为Producer进程的一次运行,当重启了Producer进程之后,幂等性会消失。

查看更多

undefined

kafka 如何实现消息有序

一、生产者端

生产者端一旦网络出现波动,那么消息就可能出现失序。应该考虑同步发送消息

  1. 设置消息响应参数 acks > 0,最好是 -1 ;再设置 max.in.flight.requests.per.connection = 1 。这样,kafka 发送端,一条消息发出后,响应必须满足 acks 设置的参数后,才会发送下一条消息。虽然在使用时,还是异步发送,但其实底层已经是一条接一条的发送了
    注意:max.in.flight.requests.per.connection,这个参数默认是5,意思是在被Broker阻止前,未通过acks确认的发送请求最大数,也就是在Broker处排队等待acks确认的Message数量。设置为1。让Broker处永远只有一条Message在排队,就可以严格控制顺序了。但是这样做会严重影响性能(接收Message的吞吐量)
  2. 当调用 KafkaProducer 的 send 方法后,调用 send 方法返回的 Future 对象的 get 方式阻塞等待结果。等结果返回后,再继续调用 KafkaProducer 的 send 方法发送下一条消息

查看更多

undefined

分区策略

1
2
3
4
5
6
7
8
功能:决定生产者将消息发送到那个分区的算法
常见的分区策略:
1. 轮询策略。轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略
2. 随机策略
3. 按消息键保序策略(key-ordering策略)。保证同一个key的所有消息都进入到相同的分区里面,每个分区下的消息处理都是有顺序的
kafka默认分区策略:如果指定了key,默认按消息键保序策略;如果没有指定key,则使用轮询策略

Kafka中可以将Topic从物理上划分成一个或多个分区(Partition),每个分区在物理上对应一个文件夹,以”topicName_partitionIndex”的命名方式命名,该文件夹下存储这个分区的所有消息(.log)和索引文件(.index),这使得Kafka的吞吐率可以水平扩展

生产者压缩算法

1
2
3
4
5
6
7
kafka中Producer端压缩、Broker端保持、Consumer端解压缩

Properties props = new Properties();
props.put("compression.type", "gzip"); // 开启GZIP压缩
表明该Producer的压缩算法使用的是GZIP

Broker端也会进行解压缩,每个压缩过的消息集合在Broker端写入时都要发生解压缩操作,目的就是对消息执行各种验证

查看更多

undefined

kafka 文件存储机制

一、topic 中 partition 存储分布

在 kafka 文件存储中,同一个 topic 下有多个不同的 partition,每个 partition 为一个目录,partition 命名规则为 “topic名称+有序序号”,第一个 partition 序号从 0 开始,序号最大值为 partitions 数量减 1 。

当kafka 中有多 Broker 多 partition 的时候:如下,是一个 kafka 集群中,1 个 Topic 4 个 Broker,2 Replication,

当集群中新增 2 节点,partition 增加到 6 个时分布情况如下:

查看更多

undefined

消息队列

一、消息队列的模式

消息队列目前主要2种模式,分别为“点对点模式”和“发布/订阅模式”。

1. 点对点模式

一个具体的消息只能由一个消费者消费。多个生产者可以向同一个消息队列发送消息;但是,一个消息在被一个消费者处理的时候,这个消息在队列上会被锁住或者被移除并且其他消费者无法处理该消息。需要额外注意的是,如果消费者处理一个消息失败了,消息系统一般会把这个消息放回队列,这样其他消费者可以继续处理。

2. 发布/订阅模式

单个消息可以被多个订阅者并发的获取和处理。一般来说,订阅有两种类型:

  • 临时(ephemeral)订阅,这种订阅只有在消费者启动并且运行的时候才存在。一旦消费者退出,相应的订阅以及尚未处理的消息就会丢失。
查看更多

undefined

一、消息丢失原理

1
2
3
4
5
6
7
8
9
10
kafka只对“已提交”的消息做有限度的持久化保证
已提交消息定义:当kafka的若干个Broker成功地接收到一条消息并写入到日志文件后,他们会告诉生产者程序这条消息已成功提交。此时,这条消息在kafka看来就正式变为“已提交”消息了。可选设置有1个Broker成功写入就算已提交,也可选所有Broker成功写入算已提交。

设置acks=all。acks是Producer的一个参数,代表对“已提交”消息的定义。如果设置为all,则表明所有副本Broker都要接受到消息,才算“已提交”。

有限度持久化保证:假设消息保存在N个kafka Broker上,那么这个前提条件就是这N个Broker中至少有1个存活。只要这个条件成立,kafka就能保证消息永远不丢失

producer永远要使用带有回调通知的发送API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。callback可以告知消息是否提交成功,如果是那些瞬时错误,重试就可以。如果是消息不合格造成,那么可以调整消息格式后在此发送。

consumer端:如果是多线程异步处理消费消息,consumer程序不要开启自动提交位移,而是要应用程序手动提交位移。

kafka 丢消息会发生在 Broker、Producer、consumer 三种

1.1 Broker 端丢数据

Broker丢失消息是由于Kafka本身的原因造成的,kafka为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中。消息的刷盘过程,为了提高性能,减少刷盘次数,kafka采用了批量刷盘的做法。即,按照一定的消息量,和时间间隔进行刷盘。这种机制也是由于linux操作系统决定的。将数据存储到linux操作系统种,会先存储到页缓存(Page cache)中,按照时间或者其他条件进行刷盘(从page cache到file),或者通过fsync命令强制刷盘。数据在page cache中时,如果系统挂掉,数据会丢失。

broker 写数据只写到 pageCache中,而 pageCache 位于内存,这部分数据断电后会丢失。pageCache 的数据会通过 Linux 的 flusher 程序进行刷盘。刷盘触发条件有:

查看更多

undefined

consumer的消费位移:记录了consumer要消费的下一条消息的位移,不是目前最新消费消息的位移

提交位移:consumer需要为分配给它的每个分区提交各自的位移数据

位移提交的语义保障是由用户保证,kafka只会“无脑”的接受你提交的位移

从用户角度讲,位移提交分为自动提交和手动提交。从consumer端的角度讲,位移提交分为同步提交和异步提交

自动提交位移:

查看更多

undefined

重平衡过程通过消费者端的心跳线程通知到其他消费者实例

heartbear.interval.ms:设置了心跳的间隔时间,也控制重平衡通知的频率。如果想要消费者实例能迅速地得到通知,那可以设置一个非常小的值,这样消费者就能更快的感知到重平衡已经开启了

消费者组的5种状态

查看更多
1
2
3
4
5
6
1. 发起 FindCoordinator 请求时。
当消费者程序首次调用poll方法时,它需要向kafka集群发送一个名为 FindCoordinator 的请求,希望kafka集群告诉它那个Broker是管理它的协调者。消费者程序会向集群中当前负载最小的那台Broker发送请求。(负载评估:消费者连接的所有Broker,谁的待发送请求最少,就会认为那个Broker的负载低)
2. 连接协调者时
Broker处理完上一步发送的FindCoordinator 请求之后,会返回对应的响应结果,显式告诉消费者那个Broker是真正的协调者,因此,consumer会创建向该Broker的Socket连接。只有成功连入协调者,协调者才能开启的组协调操作,比如加入组、等待组分配方案、心跳请求处理、位移获取、位移提交等等
3. 消费数据时
消费者会为每个要消费的分区创建与该分区领导者副本所在的Broker连接的TCP。

消费者程序会创建3类TCP连接:

1
2
3
1. 确定协调者和获取集群元数据
2. 连接协调者,令其执行组成员管理操作
3. 执行实际的消息获取

查看更多

undefined

nginx配置文件

  • worker_processes 8;

    定义worker进程的个数,每个worker进程都是单线程的进程,他们会调用各个模块实现不同的功能。如果这些模块确定不会出现阻塞式的调用,那么,有多少CPU内核就应该配置多少个进程;反之,如果有可能出现阻塞式调用,那么需要配置稍多一些的worker进程。

  •   events {
    

查看更多