undefined

kafka 所有的请求都是通过TCP网络以Socket 的方式进行通讯的,使用的Reactor模式

1
2
3
4
5
6
Broker端参数 num.network.threads:网络线程池的数量,默认值是3。表示每台Broker启动时会创建3个网络线程,专门处理客户端发送的请求
Broker端参数 num.io.threads:控制 IO线程池 的数量,默认值是8,表示每台Broker启动后自动创建8个IO线程处理请求

注意:请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属。因为 Dispatcher 只是用于分发而不负责响应回传,因此只能让每个网络线程自己发送 Response 给客户端,所以这些 Response 就没有必要放在一个公共地方。

Purgatory组件:用来“缓存延时请求”,就是哪些一时未满足条件不能立刻处理的请求。比如设置了 acks=all 的 Produce 请求,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的 IO 线程就必须等待其他 Broker 的写入结果。当请求不能立刻处理时,他就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中。

控制类请求和数据类请求分离

查看更多

undefined

nginx的基础

正向代理:安装在客户端,客户端的访问通过代理去请求

反向代理:安装在服务端,所有客户端的请求先到代理,由代理去负载均衡到后端某个服务器

负载均衡

动态分离:静态资源直接由nginx返回,不用到达后端服务器

access.log:记录每一条http请求信息

查看更多

undefined

一、kafka 架构

二、kafka 术语

kafka支持两种消息引擎模型

1
2
1. 点对点模型:消息队列模型。系统A发送的消息只能被系统B接收,其他任何系统不能读取A发送的消息
2. 发布/订阅模型:存在多个发布者向相同的主题发送消息,而订阅者也可能存在多个,他们都能接收到相同主题的消息

查看更多

undefined

副本机制的好处

1
2
3
1. 提供数据冗余。即使系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性
2. 提供高伸缩性。支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量
3. 改善数据局部性。允许将数据放入与用户地理位置相近的地方,从而降低系统延时

kafka的副本机制

1
2
3
4
5
6
7
8
目前kafka只 "提供数据冗余" 实现高可用性和高持久性
副本的概念实际上是在分区层级下定义的,每个分区配置有若干个副本

副本本质上就是一个只能追加写消息的提交日志,同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的Broker上,从而能够对抗部分Broker宕机带来的数据不可用

1. kafka中,副本分为:领导者副本、追随者副本。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本
2. kafka中,追随者副本是不对外提供服务的。追随者副本唯一的任务就是从领导者副本异步拉取消息,并写入自己的提交日志中,从而实现与领导者副本的同步
3. 当领导者副本挂掉后,或者说领导者副本所在的Broker宕机时,kafka依托于Zookeeper提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老Leader副本重启回来后,只能作为追随者副本加入到集群中

查看更多

undefined

Broker 端参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
log.dirs: 指定Broker需要使用的若干个文件目录路径
log.dir: 表示单个路径,补充上一个参数用的
只要设置log.dirs就好,配置多个路径,中间用逗号分隔。如果有条件最好保证这些目录挂载到不同的物理磁盘上

zookeeper.connect: 指定zookerrper监听的地址
如果有两套kafka集群,分别叫kafka1,kafka2,那么这两套集群的zookeeper.connect 参数可以这样指定:zk1:2181,zk2:2181/kafka1 和 zk1:2181,zk2:2181/kafka2 就是chroot,只需要写一次,而且是加到最后的

关于Broker连接相关,即客户端程序或其他Broker如何与该Broker进行通信的设置
listeners: 学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开发的kafka服务
advertised.listeners: 这组监听器是Broker用于对外发布的
host.name/port: 不要设置,过期的参数

auto.create.topics.enable: 是否允许自动创建Topic。建议设置false,当发送的事件,如果topic之前没有创建,就会自动创建一个
unclean.leader.election.enable: 是否允许Unclean Leader 选举。每个分区有多个副本需要选一个Leader副本,只有保存数据比较多的那些副本才有资格竞选,如果保存数据比较多的副本都挂了。此参数设置成false,代表不允许那些落后太多的副本竞选Leader,这样做的后果是这个分区就不可用了。此参数如果设置成true,代表kafka允许从那些“跑的慢”的副本中选一个出来当Leader,这样做的后果是数据有可能就丢失了,因为这些副本保存的数据本来就不全。这个参数推荐设置false
auto.leader.rebalance.enable: 是否允许定期进行Leader选举。有可能出现LeaderA 一直表现很好,但此参数为true,那么就有可能一段时间后LeaderB就要强行卸任换成LeaderB。换一个LeaderB的代价很高。此参数推荐为false

log.retention.{hours|minutes|ms}: 这三个都是控制一条消息数据被保存多长时间。优先级:ms设置最高,minutes次之,hours最低。默认log.retention.houts=168 保存7天的数据。
log.retention.bytes: 指定Broker为消息保存的总磁盘容量大小。默认为-1,表明想在这台Broker上保存多少数据都可。
message.max.bytes: 控制Broker能够接收的最大消息大小。默认值不到1M,太小了。

topic级别参数会覆盖全局Broker参数的值
retention.ms: 规定了该Topic消息被保存的时长。默认是7天,即该Topic只保存最近7天的消息。一旦设置此值,会覆盖Broker端的全局参数值
retention.bytes: 规定要为该Topic预留多大的磁盘空间。默认值为-1,表示可以无限使用磁盘空间
max.message.bytes: kafka Broker能够正常接收该Topic的最大消息大小

如下可以设置Topic级别的参数:在创建Topic时设置
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
在修改Topic时设置参数:
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760

undefined

zookeeper在kafka中的存在

注意:kafka于 2.8 版本放弃 zookeeper,该版本将依赖于 zookeeper 的控制器改造成了基于 kafka Raft 的 Quorm 控制器

在基于 kafka 的分布式消息队列中,zookeeper 的作用有:broker 注册、topic 注册、producer 和 consumer 负载均衡、维护 partition 与 consumer 的关系、记录消息消费的进度以及 consumer 注册等

一、broker 在 zookeeper 中的注册

  • 为了记录 broker 的注册信息,在 ZooKeeper 上,专门创建了属于 Kafka 的一个节点,其路径为 /brokers
  • Kafka 的每个 broker 启动时,都会到 ZooKeeper 中进行注册,告诉 ZooKeeper 其 broker.id,在整个集群中,broker.id 应该全局唯一,并在 ZooKeeper 上创建其属于自己的节点,其节点路径为 /brokers/ids/{broker.id}
查看更多

undefined

一、kafka为什么吞吐量大?速度快

  1. 磁盘顺序读写
    使用磁盘的顺序读写,一般来说要高出磁盘随机读写三个数量级,一些情况下磁盘顺序读写性能甚至高于内存随机读写
    kafka的message是不断追加到本地磁盘文件末尾的
    每一个partition是一个文件,收到消息后kafka会把数据插入到文件末尾,但是没有办法删除数据,因此kakfka是不会删除数据的
    如何删除数据:1. 基于时间 2. 基于 partition 文件大小

  2. Page Cache
    为了优化读写性能,Kafka利用了操作系统本身的Page Cache,就是利用操作系统自身的内存而不是JVM空间内存。
    避免Object消耗:如果是使用 Java 堆,Java对象的内存消耗比较大,通常是所存储数据的两倍甚至更多。
    避免GC问题:随着JVM中数据不断增多,垃圾回收将会变得复杂与缓慢,使用系统缓存就不会存在GC问题
    相比于使用JVM或in-memory cache等数据结构,利用操作系统的Page Cache更加简单可靠。首先,操作系统层面的缓存利用率会更高,因为存储的都是紧凑的字节结构而不是独立的对象。其次,操作系统本身也对于Page Cache做了大量优化,提供了 write-behind、read-ahead以及flush等多种机制。再者,即使服务进程重启,系统缓存依然不会消失,避免了in-process cache重建缓存的过程。
    通过操作系统的Page Cache,Kafka的读写操作基本上是基于内存的,读写速度得到了极大的提升。

查看更多

undefined

4. 消息堆积

消息的堆积往往因为生产者和生产速度与消费者的消费速度不匹配。有可能是因为消息消费失败反复重试导致,也有可能就是消费者消费能力弱,逐渐消息就积压

因此我们需要先定位消费慢的原因,如果是本身消费能力较弱,下游的数据处理不及时,我们可以优化下消费逻辑,提高每批次拉取的数量。

如果消费逻辑已经优化了,消费能力不足,则可以考虑增加 topic 的分区数,并同时提升消费组的消费者数量,消费者数 = 分区数

undefined

consumer设计原理

1
2
3
4
5
kafka consumer:用户主线程、心跳线程
用户主线程:启动consumer应用程序main方法的线程
心跳线程:1. 负责定期给对应的Broker机器发送心跳请求,以标识消费者应用的存活性。2. 期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解藕真实的消息处理逻辑与消费者成员存活性管理。

KafkaConsumer 类不是线程安全的,所有的网络I/O处理都是发生在用户主线程中,不能在多个线程中共享同一个 KafkaConusmer 实例

定制两套多线程方案

1
2
1. 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程
2. 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,而从实现消息获取与消息处理的真正解藕

查看更多