type
Post
status
Published
date
Jul 19, 2023
slug
kafka-consumer-multiplethread
summary
本文详细介绍 Kafka 消费的过程并研究如何设计和实现 Kafka 的并发消费
tags
Java
Kafka
分布式
category
学习思考
icon
password
Property
Aug 3, 2023 06:12 AM
先让我们简单了解一下本文的主角 Kafka,它是一个分布式事件流处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。Kafka 的工作原理可以简化为事件流的“发布-持久化存储-消费”过程,本文主要探讨的是消费者的效率提升,也就是并发消费的设计和实现。
Kafka 消费过程和原理
现实世界或业务系统中发生了“某些事情”的事实在软件系统中被抽象为事件 Event,在某些文档中也叫记录 Record 或者消息 Message。事件是 Kafka 存储和分发的主角,从概念上讲,事件具有键、值、时间戳和可选的元数据头,上图彩色小圆点代表着生产者写入的不同事件。
事件被组织并持久存储在 Kafka 的逻辑概念“主题”中,主题被拆分成一个或者多个分区,当新事件发布到主题时,它实际上会被追加到主题的分区之一,然后按照“先入先出”的顺序读取和消费,也就是说事件在同一个分区的读取是可以保证顺序的,上图中,主题有 4 个分区,分区 1 中的事件读取顺序可以确定是 1-2-3… ,但是它跟分区 3 中事件读取的先后顺序是无法确定的。
Kafka 中的主题始终是多生产者和多消费者:一个主题可以有零个、一个或多个向其写入事件的生产者,也可以有零个、一个或多个订阅这些事件的消费者。
消费者从属于消费者群组,消费者群组里的消费者共享主题分区的所有权,第一个加入群组的消费者将成为群组首领。首领从群组协调器(从所有的 Broker 中选举出来)那里获取群组的成员列表,列表中包含了所有最近发送过心跳的消费者,也就是被标记为“活着”的成员,并负责为每一个消费者分配分区。首领使用
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
接口来决定哪些分区应该被分配给哪个消费者。上图中消费者群组包含 2 个消费者,消费者 分配到了分区 1 和分区 3,消费者 分配到了分区 2 和分区 4。我们把更新分区当前读取位置的操作叫作偏移量提交,消费者会将已成功处理的最后一条消息的位置提交给 Kafka,并假定该消息之前的每一条消息都已成功处理,此时消费者会向一个叫作
__consumer_offset
的主题发送消息,消息里包含每个分区的偏移量,默认情况下,消费者会自动提交偏移量。当消费者数量增加或者减少时,分区的所有权会发生从一个消费者转移到另一个消费者的行为,我们把这个过程称为再均衡,群组首领负责执行再均衡过程,此过程中,消费者可能会无法读取部分或者所有分区的消息,再均衡让 Kafka 消费者获得了横向扩展的能力,让使用者可以根据不同场景灵活调整自己的消费者数量。
使用 Kafka Client 来实现消费过程
Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, " org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group"); Duration timeout = Duration.ofMillis(100); try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) { consumer.subscribe(List.of(TOPIC, TOPIC2)); while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(timeout); for (ConsumerRecord<String, String> record : consumerRecords) { System.out.printf("topic = %s, partition = %d, offset = %d, " + "key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } }
Kafka 消费者的并发设计
Kafka 作为一种高吞吐量的分布式消息队列系统,其性能优势在于能够处理大规模的事件流。在面对大量消息涌入的情况下,单线程的消费者可能无法跟上处理速度,导致消息积压的现象,在介绍完基本概念和消费流程之后,我们继续探索如何引入并发处理策略,充分发挥多核处理器的优势,将消息分发给多个消费者线程并同时处理,从而提高整体消费速度。
我们先从简单的并发模型入手,也就是每个消费者一个线程,通过合理地配置消费者群组中消费者的数量来实现多个线程同时处理消息
每个消费者一个线程
Kafka 的事件存储在分区中,每个分区最多可以分配给同个消费群组的一个消费者,消费者不是线程安全的,我们既不能在同一个线程中运行多个同属一个群组的消费者,也不能保证多个线程能够安全地共享一个消费者,所以默认情况下分区数量就是主题的最大并发处理上限。
如图所示,如果主题有 4 个分区,那么可以在消费者群组中使用 4 个消费者,来达到最大的消费并发,每个消费者运行在独立线程中,实现方式如下
private static void startConsumer() { for (int i = 0; i < 4; i++) { Thread t = new Thread(() -> consume(), "Thread-consumer-" + i); t.start(); } } private static void consume() { // 此处的 properties 也不能在线程之间共享,否则无法启动 4 个 consumer Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, " org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "thread-pre-consumer"); Duration timeout = Duration.ofMillis(100); try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) { consumer.subscribe(List.of(TOPIC)); //不停拉取新事件 while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(timeout); for (ConsumerRecord<String, String> record : consumerRecords) { System.out.printf("Thread = %s, topic = %s, partition = %d, offset = %d, " + "customer = %s, country = %s\n", Thread.currentThread().getName(), record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } }
此时,偏移量会自动在
poll
方法的执行过程中提交,所以每次都是提交的处理过的偏移量,两次 poll
操作的最大间隔是 max.poll.interval.ms
控制的,默认是5分钟,超过这个时间,消费者会被认为已经失活,则会触发消费组的再均衡。如果消费者处理一个消息的时候花费时间太久,这个情况很可能会发生,比如受到 CPU、文件 IO、网络和 GC 等的因素的影响,所以采用这个方案时需要注意下列参数的设置,避免消费者因超时而被踢出消费组max.poll.records
- 设置为较小的值
max.poll.interval.ms
- 设置为更高的值
此方案实现起来较为简单,且无需额外的多线程控制,效率也高,同时还保持了分区内消息处理的顺序性,缺点是分区数量限制了并发消费的规模,并且如果处理消息的过程耗时过长则会导致消费者被移除成员列表。
每个消费者多个线程
我们进一步拆解消费者的行为,可以将整个消费过程抽象为一个循环,其中包括三个阶段:获取新消息,处理消息,提交偏移量,其中默认设置为自动提交偏移量,在获取新消息的过程中会提交上一个循环成功处理的消息偏移量。
那我们如果想要在分区数量上进一步提高消费的并发程度,则需要将这个循环的三个阶段解耦,然后单独扩展处理阶段的线程数量,也就是每个消费者在自己的线程外,额外开启处理消息的线程,这样能够使得处理消息的并发程度突破分区数量的限制,加快消息的处理。
这个方案示意图如下
实现此方案时需要关注几个细节
偏移量提交
当我们分离处理和提交偏移量的过程,自动提交偏移量就不再适用,因为提交偏移量和消息处理处于不同的线程,此时偏移量的提交可能在消息处理成功之前,导致消息遗漏,所以我们需要设置
enable.auto.commit
为 false
来禁用自动提交偏移量,采用在代码中手动提交的方式确保消息处理成功之后,其所在的偏移量才被提交。理想情况下,提交的偏移量和实际处理的偏移量之间应该是对应的,此时才能保证消息不重不漏,如果提交量大于实际处理量,则会发生消息的遗漏,如果提交量小于实际处理量,则会导致消息被重复处理。
手动提交偏移量可调用
commitSync()
进行同步提交,提交的是此次 poll
方法返回的偏移量,此时应用程序会阻塞等待 Broker 响应成功之后才会继续进行下一次消息拉取,会在一定程度上降低消费的吞吐量,加大提交偏移的间隔可以改善吞吐降低的问题,但如果发生了再均衡,则会增加潜在的消息重复。或者我们可以使用异步提交 commitAsync
,不会阻塞等待,而且在提交成功或碰到无法恢复的错误之前,commitSync()
会一直重试,但 commitAsync()
不会。异步提交的问题是我们必须小心谨慎的处理失败重试,避免重试提交时,已经有一个更大的偏移量被提交,这样会导致消息重复。所以,实际实践中我们一般都组合使用同步提交和异步提交,尽量做到系统整体消费吞吐量和偏移量正确提交的平衡。分区内消息处理的顺序性
如果分区内的消息被进一步分发到不同的线程中处理,则其所保证的顺序性就被破坏了,我们的实现应该能够根据需求保证分区内消息处理的顺序性或者局部的顺序性,比如分区内同一个 Key 的消息处理时的顺序性。如果是分区内所有消息都需要保持顺序处理,则可以将分区分配给以一个线程,如果是局部顺序处理,则可以将不同的 key 分配给不同的线程,实际实践中,建议使用线程池来管理工作线程,提高线程的创建和回收效率。
消费者群组再均衡问题
当我们使用多线程处理消息的时候,发生了消费者群组的再均衡,分配给每个消费者的分区分生变化时,不能再继续处理以前分配的分区消息,此时应该停止处理消息并及时更新自己所能处理的分区和消息。消费者会在退出和进行分区再均衡之前做一些清理工作,此时也需要处理偏移量的提交,将已经处理过的消息及时提交给 Broker。
具体的实现,请参考下面的步骤和代码
- 拆分主线程一个循环内的任务,解耦三个阶段
// MultiThreadConsumerWrapper.java public void consume(Properties properties) { Duration timeout = Duration.ofMillis(100); try { consumer = new KafkaConsumer<>((Properties) properties.clone()); consumer.subscribe(List.of("multi-thread-topic")); //一个执行循环 ♻️ while (!stopped.get()) { //处理拉取到的消息 handleRecords(consumer.poll(timeout)); //检查任务是不是并发执行完毕 checkTasks(); //提交任务执行中产生的偏移量 commitOffsets(); } } catch (WakeupException e) { if (!stopped.get()) { throw new IllegalStateException(e); } } finally { if (consumer != null) { consumer.close(); } } }
- 多线程并发处理消息
private void handleRecords(ConsumerRecords<String, String> records) { if (records.count() > 0) { List<TopicPartition> needPause = new ArrayList<>(); //此处将每个分区放入一个线程执行 records.partitions().forEach(partition -> { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); Task task = new Task(partitionRecords, record -> System.out.printf( "Thread = %s, topic = %s, partition = %d, offset = %d, " + "customer = %s, country = %s\n", Thread.currentThread().getName(), record.topic(), record.partition(), record.offset(), record.key(), record.value())); needPause.add(partition); //提交到线程池中执行 executorService.submit(task); //记录任务和分区对应关系 activeTasks.put(partition, task); }); //任务是异步处理的,所以为了保持同个分区的顺序性,可以将此分区暂停拉取,此时 poll 方法不会返回此分区的任何数据 consumer.pause(needPause); } }
- 检查任务执行情况
private void checkTasks() { List<TopicPartition> finishedPartitions = new ArrayList<>(); activeTasks.forEach((partition, task) -> { if (task.isFinished()) { finishedPartitions.add(partition); } long offset = task.getCurrentOffset(); if (offset > 0) { //记录当前分区的偏移量,统一提交 offsetsToCommit.put(partition, new OffsetAndMetadata(offset)); } }); finishedPartitions.forEach(activeTasks::remove); //处理完成之后可以继续拉取下一批数据 consumer.resume(finishedPartitions); }
- 提交偏移量
private void commitOffsets() { try { long currentTimeMillis = System.currentTimeMillis(); // 不会每次 poll 都提交,固定一个时间间隔提交,降低对吞吐量的影响 if (currentTimeMillis - lastCommitTime > COMMIT_INTERVAL) { if (!offsetsToCommit.isEmpty()) { consumer.commitSync(offsetsToCommit); offsetsToCommit.clear(); } lastCommitTime = currentTimeMillis; } } catch (Exception e) { System.out.println("Failed to commit offsets -- " + e.getMessage()); } }
- 监听再均衡事件,等待正在执行的任务完成,剩余的消息不再处理
// public class MultiThreadConsumerWrapper implements ConsumerRebalanceListener @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 停止所有被撤销所有权的分区上的任务 Map<TopicPartition, Task> stoppedTask = new HashMap<>(); for (TopicPartition partition : partitions) { Task task = activeTasks.remove(partition); if (task != null) { task.stop(); stoppedTask.put(partition, task); } } // 如果被撤销的分区,有正在进行的任务,则等待其完成,记录偏移量 stoppedTask.forEach((partition, task) -> { long offset = task.waitForCompletion(); if (offset > 0) { offsetsToCommit.put(partition, new OffsetAndMetadata(offset)); } }); // 提交所有可以提交的偏移量 Map<TopicPartition, OffsetAndMetadata> offsetNeedCommit = new HashMap<>(); partitions.forEach(partition -> { OffsetAndMetadata offset = offsetsToCommit.remove(partition); if (offset != null) { offsetNeedCommit.put(partition, offset); } }); try { consumer.commitSync(offsetNeedCommit); } catch (Exception e) { System.out.println("Failed to commit offsets for revoked partitions!"); } }
- 停止消费者
public void stop() { stopped.set(true); //这个方法是线程安全的,可以唤醒正在阻塞的 poll 方法,抛出 WakeupException consumer.wakeup(); }
上面的代码简单的展示了将同一个消费者的处理能力扩展到多个线程中所需要进行的工作(这些示例代码不建议在生产环境中使用)。代码中没有展示的功能也可以很容易的添加进去,比如按照事件的 Key 将同一个分区的消息继续分发到不同的线程中,进一步提高并发能力;将同步提交的方式改成异步提交,在消费者关闭之前再次进行同步提交,最大化吞吐能力等
Spring 中如何实现消费者的并发消费
在现代 Java 开发离不开的框架 Spring 中,我们也可以很容易的实现消费者的并发消费,如下面代码所示,只需要配置
KafkaListener
的 concurrency
参数即可。@KafkaListener(id = "consumer", topics = TOPIC, groupId = "spring-1", autoStartup = "false", concurrency = "4") public void listen1(String value, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_KEY) String key) { logger.info("Consumer consumed event -- thread:{}, topic: {}, key:{}, value:{}", Thread.currentThread().getName(), topic, key, value); }
通过阅读 Spring-kafka 的源码可知,它实现的是每个消费者一个线程的并发模式,也就是这个参数的最大值就是分区数量,传入的数值大于分区数会导致部分消费者闲置。