undefined

consumer设计原理

1
2
3
4
5
kafka consumer:用户主线程、心跳线程
用户主线程:启动consumer应用程序main方法的线程
心跳线程:1. 负责定期给对应的Broker机器发送心跳请求,以标识消费者应用的存活性。2. 期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解藕真实的消息处理逻辑与消费者成员存活性管理。

KafkaConsumer 类不是线程安全的,所有的网络I/O处理都是发生在用户主线程中,不能在多个线程中共享同一个 KafkaConusmer 实例

定制两套多线程方案

1
2
1. 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程
2. 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,而从实现消息获取与消息处理的真正解藕
方案 优点 缺点
方案1: 多线程+多KafkaConsumer实例 1. 方便实现
2. 速度快,无线程间交互开销
3. 易于维护分区内的消费顺序
1. 占有更多系统资源
2. 线程数受限于主题分区数,扩展性差
3. 线程自己处理消息容易超时,从而引发Rebalance
方案2: 单线程+单KafkaConsumer实例+消息处理Worker线程池 1. 可独立扩展消费获取线程数和Worker线程数
2. 伸缩性好
1. 实现难度高
2. 难以维护分区内的消息消费顺序
3. 处理链路拉长,不易于位移提交管理

方案1的主体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;


public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
// 执行消息处理逻辑
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}


// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}

方案2的主体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...


private int workerNum = ...;
executors = new ThreadPoolExecutor(
workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());


...
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (final ConsumerRecord record : records) {
executors.submit(new Worker(record));
}
}
..