undefined

分区策略

1
2
3
4
5
6
7
8
功能:决定生产者将消息发送到那个分区的算法
常见的分区策略:
1. 轮询策略。轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略
2. 随机策略
3. 按消息键保序策略(key-ordering策略)。保证同一个key的所有消息都进入到相同的分区里面,每个分区下的消息处理都是有顺序的
kafka默认分区策略:如果指定了key,默认按消息键保序策略;如果没有指定key,则使用轮询策略

Kafka中可以将Topic从物理上划分成一个或多个分区(Partition),每个分区在物理上对应一个文件夹,以”topicName_partitionIndex”的命名方式命名,该文件夹下存储这个分区的所有消息(.log)和索引文件(.index),这使得Kafka的吞吐率可以水平扩展

生产者压缩算法

1
2
3
4
5
6
7
kafka中Producer端压缩、Broker端保持、Consumer端解压缩

Properties props = new Properties();
props.put("compression.type", "gzip"); // 开启GZIP压缩
表明该Producer的压缩算法使用的是GZIP

Broker端也会进行解压缩,每个压缩过的消息集合在Broker端写入时都要发生解压缩操作,目的就是对消息执行各种验证

消息丢失原理

1
2
3
4
5
6
7
8
9
10
kafka只对“已提交”的消息做有限度的持久化保证
已提交消息定义:当kafka的若干个Broker成功地接收到一条消息并写入到日志文件后,他们会告诉生产者程序这条消息已成功提交。此时,这条消息在kafka看来就正式变为“已提交”消息了。可选设置有1个Broker成功写入就算已提交,也可选所有Broker成功写入算已提交。

设置acks=all。acks是Producer的一个参数,代表对“已提交”消息的定义。如果设置为all,则表明所有副本Broker都要接受到消息,才算“已提交”。

有限度持久化保证:假设消息保存在N个kafka Broker上,那么这个前提条件就是这N个Broker中至少有1个存活。只要这个条件成立,kafka就能保证消息永远不丢失

producer永远要使用带有回调通知的发送API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。callback可以告知消息是否提交成功,如果是那些瞬时错误,重试就可以。如果是消息不合格造成,那么可以调整消息格式后在此发送。

consumer端:如果是多线程异步处理消费消息,consumer程序不要开启自动提交位移,而是要应用程序手动提交位移。

无消息丢失配置

1
2
3
4
5
6
7
8
1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
2. 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
3. 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
4. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
5. 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
6. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
7. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
8. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

拦截器

1
2
3
4
5
拦截器分为生产者拦截器和消费者拦截器
生产者拦截器允许你在发送消息前以及消息提交后植入你的拦截器逻辑
消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑

kafka的拦截器可以应用于客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景

生产者拦截器

1
2
3
4
5
6
7
8
9
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

生产者端拦截器实现类都要继承 org.apache.kafka.clients.producer.ProducerInterceptor 接口。里面有两个核心方法
1. onSend:该方法会在消息发送之前被调用
2. onAcknowledgement: 该方法会在消息成功提交或者发送失败之后被调用。onAcknowledgement 的调用要早于发送回调通知callback的调用。这个方法和onSend 不是在同一个线程中被调用的,因此如果在两个方法共享某个对象,注意线程安全。这个方法在Producer发送的主路径中,所以不要放太重的逻辑进去。

消费者拦截器

1
2
3
消费者拦截器实现类需要继承 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,也有两个核心方法
1. onConsume: 该方法在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前,拦截器会对数据做一些事情,之后再返回。
2. onCommit:consumer在提交位移之后调用该方法。通常可以做一些记账的动作

kafka生产者如何管理TCP连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
在创建KafkaProducer实例时,生产者应用会在后台创建并启动一个名为Sender的线程,该Sender线程运行时首先会创建与Broker的连接。
Java Producer 目前会连接bootstrap.servers参数所指定的所有Broker。因此一般配置bootstrap.servers时指定3-4台就可以了。
Producer向某一台Broker发送了METADATA请求,尝试获取集群的元数据信息,这就是producer能够获取集群所有信息的方法

tcp连接还可能在 (1.更新集群元数据后 2. 消息发送时) 被创建。

何时Producer会更新集群元数据呢?
1. 当Producer尝试给一个不存在的主题发送消息时,Broker会告诉Producer这个主题不存在。此时Producer会发送METADATA请求给kafka集群,去尝试获取最新的元数据信息
2. producer通过metadata.max.age.ms参数定期地去更新元数据信息,默认为5分钟。

kafka这种设计的合理性:
目前,一个producer默认会向集群的所有Broker都创建TCP连接,不管是否真的需要传输请求。这显然没有必要。kafka还支持强制将空闲的TCP连接资源关闭,更加多此一举。可能有这种情况:我的producer可能只会与其中的3-5台broker长期通信,但producer启动依次创建与这1000台Broker的TCP连接,一段时间之后,大约有995个TCP连接又要强制关闭,太浪费资源了。

何时关闭TCP连接
1. 用户主动关闭。用户调用 producer.close()
2. kafka自动关闭。producer端参数 connections.max.idle.ms 默认参数为9分钟,即如果9分钟内此连接没有任何请求,那么kafka会主动关闭此TCP连接。如果设置为-1,TCP连接将成为永久连接。