Kafka学习笔记
约 6937 字大约 23 分钟
术语
消息引擎:系统 A 发送消息给消息引擎系统,系统 B 从消息引擎系统中读取 A 发送的消息。消息引擎的作用简单来说就是“削峰填谷”
消息引擎系统是一组规范。企业利用这组规范在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传递
消息队列模型:也叫点对点模型,A系统发送的消息只能被B系统接收并消费到
发布订阅模型:即对于同一个主题(Topic)的消息,所有的订阅者都能够接收并消费到
Java消息服务(JMS-Java Message Service)👉一套实现分布式系统之间消息传递的API接口规范,JMS同时支持消息队列模型和发布订阅模型两种消息引擎模型。
Kafka中的术语
- 消息:Record。Kafka 是消息引擎嘛,这里的消息就是指 Kafka 处理的主要对象。
- 主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
- 分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。
- 消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
- 副本:Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
- 生产者:Producer。向主题发布新消息的应用程序。
- 消费者:Consumer。从主题订阅新消息的应用程序。
- 消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。
- 消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
- 重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

Kafka简介
Kafka是强大的消息引擎系统,也是多功能分布式流处理平台,此外还可以用作分布式存储系统(前两种使用更多,存储系统还不成熟)。
我们通常说的Kafka是指Apache Kafka,除此之外还有Confluent Kafka和CDH/HDP Kafka。Apache Kafka使用人数最多,但只提供最基础的组件;Confluent是Kafka的三个创始人创办的公司,主要从事Kafka的商业化工具开发
Kafka的发展历史
- v0.7:基础的消息队列
- v0.8:引入副本机制
- v0.9:增加基础的安全认证/权限功能;新版本的Consumer API;KafkaConnect
- v0.10:引入Kafka Streams,升级为分布式流处理平台;从v0.10.2.2开始新版本Consumer API稳定,ProducerAPI性能也有提升
- v0.11:提供幂等性Producer API和事务API,对Kafka消息格式重构
- v1.0:改进Kafka Streams和Kafka Connect
- v2.0:继续改进Kafka Streams和Kafka Connect;提升消息压缩比,减少了磁盘空间和网络IO消耗
- v3.0:停止对Java8的支持;增强系统安全性;更新了客户端API
对于Kafka的使用版本建议使用v2.*版本,并保证客户端和服务端版本一致
特点/优势
Kafka设计考虑了以下问题:
- 吞吐量/延时。
- 消息持久化。
- 负载均衡和故障转移。
- 伸缩性。
高吞吐量、低延时
Kafka通过如下方式解决高吞吐量、低延时问题:
- 大量使用操作系统页缓存,内存操作速度快且命中率高。
- Kafka不直接参与物理1/0操作,而是交由最擅长此事的操作系统来完成。
- 采用追加写入方式,摒弃了缓慢的磁盘随机读/写操作。
- 使用以sendfile为代表的零拷贝技术加强网络间的数据传输效率。
Kafka参数说明
Broker参数
配置存储信息
即Broker使用哪些磁盘
log.dirs
指定Broker需要使用的若干个文件目录路径。此参数没有默认值,必须手动配置。具体格式为逗号分隔的一组路径,如:/home/kafka1,/home/kafka2,/home/kafka3。有条件的话最好保证这些目录挂载到不同的物理磁盘上,有两点好处
- 提升读写性能
- 实现故障转移,自v1.1开始,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作
log.dir
用于补充上一个参数(log.dirs),建议不设置
zookeeper相关配置
zookeeper.connect
用于指定zookeeper的位置,也是一个CSV格式的参数(逗号分隔),如:zk1:2181,zk2:2181,zk3:2181。
注
如何让多个Kafka集群使用同一个zookeeper集群?
这时候 chroot 就派上用场了,这个 chroot 是 ZooKeeper 的概念,类似于别名。
如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2,那么两套集群的zookeeper.connect参数可以这样指定:
Kafka集群1:zk1:2181,zk2:2181,zk3:2181/kafka1
Kafka集群2:zk1:2181,zk2:2181,zk3:2181/kafka2
切记 chroot 只需要写一次,而且是加到最后的。
下边这种写法是错误的zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3
Broker连接参数
listeners
监听器,告诉连接者通过什么协议访问指定主机名和端口开放的kafka服务
它的值是若干个逗号分隔的三元组,每个三元组的格式为<协议名称,主机名,端口号>。主机名最好全部使用主机名(不要使用IP地址),即 Broker 端和 Client 端应用配置中全部填写主机名。这里的协议名称可能是标准的协议名称和自定义协议名称
- PLAINTEXT 表示明文传输;
- SSL 表示使用 SSL 或 TLS 加密传输等;
- 其他允许的标准协议名称;
- 自定义的协议,一旦自己定义了协议名称,必须还要指定
listener.security.protocol.map参数告诉这个协议底层使用了哪种安全协议
一个简单的自定义协议写法如下:
listeners=CONTROLLER://localhost:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXTadvertised.listeners
这组监听器是 Broker 用于对外发布的
topic管理参数
auto.create.topics.enable
是否允许自动创建 Topic。建议设置为false(禁止自动创建),防止产生各种奇怪名称的topic
unclean.leader.election.enable
是否允许 Unclean Leader 选举。建议设置为false(禁止)
如果设置为true,那么 Kafka 允许从那些“数据不全”的副本中选一个出来当 Leader。这样做的后果是数据有可能就丢失了,因为这些副本保存的数据本来就不全,当了 Leader 之后它本人就变得膨胀了,认为自己的数据才是权威的
auto.leader.rebalance.enable
是否允许定期进行 Leader 选举。建议设置为false,换leader对系统性能会产生影响(原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益)
数据留存参数
log.retention.{hours|minutes|ms}
这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hours 最低。
通常情况下我们还是设置 hours 级别,log.retention.hours=168表示默认保存 7 天的数据,自动删除 7 天前的数据。很多公司把 Kafka 当作存储来使用,那么这个值就要相应地调大
log.retention.bytes
这是指定 Broker 为消息保存的总磁盘容量大小。默认为-1,保存多少数据都可以。
此参数的使用场景是在云上构建多租户的 Kafka 集群:每个租户只能使用 100GB 的磁盘空间,为了避免有个“恶意”租户使用过多的磁盘空间,可以通过这个参数控制
message.max.bytes
控制 Broker 能够接收的最大消息大小。默认的 1000012 太少了,还不到 1MB,因此在线上环境中设置一个比较大的值还是比较保险的做法
Topic参数
重要
Topic 级别参数会覆盖全局 Broker 参数的值,而每个 Topic 都能设置自己的参数值
retention.ms
规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。
retention.bytes
规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。
max.message.bytes
它决定了 Kafka Broker 能够正常接收该 Topic 的最大消息大小。
在很多公司都把 Kafka 作为一个基础架构组件来运行,上面跑了很多的业务数据。如果在全局层面上不好给出一个合适的最大消息值,那么不同业务能够自行设定这个 Topic 级别参数就显得非常必要了
如何设置topic级别的参数
有两种方式
- 创建topic时设置
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880- 修改topic时设置(建议)
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760JVM参数
KAFKA_HEAP_OPTS
指定堆大小。
KAFKA_JVM_PERFORMANCE_OPTS
指定 GC 参数。
Kafka服务启动示例:
export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
bin/kafka-server-start.sh config/server.properties操作系统参数
ulimit -n
我觉得任何一个 Java 项目最好都调整下这个值。实际上,文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。
通常情况下将它设置成一个超大的值是合理的做法,比如ulimit -n 1000000
“你和钱,谁对我更重要?都不重要,没有你对我很重要!”
文件系统类型的选择
对于Kafka来说在不同文件系统下的性能对比为
ZFS>XFS>ext4
swap调优
可以设置成一个较小的值,如1,不建议设置为0,因为
一旦设置成 0,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间
提交时间(落盘时间)
向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了。
随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。Kafka提供了副本冗余机制,所有可以忽略在此期间服务器宕机造成数据丢失的风险
生产者客户端
Kafka 的 Java 生产者 API 主要的对象就是 KafkaProducer。通常我们开发一个生产者的步骤有 4 步。
- 构造生产者对象所需的参数对象。
- 利用第 1 步的参数对象,创建 KafkaProducer 对象实例。
- 使用 KafkaProducer 的 send 方法发送消息。
- 调用 KafkaProducer 的 close 方法关闭生产者并释放各种系统资源。
示例代码如下
Properties props = new Properties ();
props.put(“参数1”, “参数1的值”);
props.put(“参数2”, “参数2的值”);
……
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
producer.send(new ProducerRecord<String, String>(……), callback);
……
}Apache Kafka 的所有通信都是基于 TCP 的
注
何时建立TCP连接
- 在创建KafkaProducer对象时,生产者会启动一个Sender线程,该线程开始运行时会先创建和Broker的连接。
注意,这里会连接所有bootstrap.servers中配置的Broker节点,因此不建议把集群中所有的 Broker 信息都配置到 bootstrap.servers 中,通常你指定 3~4 台就足以了,因为 Producer 一旦连接到集群中的任一台 Broker,就能拿到整个集群的 Broker 信息
- 更新元数据后会重新建立TCP连接
当Producer给一个不存在的Topic发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息;
Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟
- 发送消息时会建立TCP连接
注
何时关闭TCP连接
Producer 端关闭 TCP 连接的方式有两种:
- 用户主动关闭;建议通过
producer.close()关闭,通过kill -9也包括在主动关闭里 - Kafka 自动关闭。Producer 端参数
connections.max.idle.ms默认是 9 分钟,即如果在 9 分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动把该 TCP 连接关闭。可以设置 connections.max.idle.ms=-1 禁掉这种机制。一旦被设置成 -1,TCP 连接将成为永久长连接
消息交付可靠性
常见的可靠性保证有三种
- 做多一次。消息可能丢失,但不会重复
- 至少一次。消息可能重复,但不会丢失
- 精确一次。消息不会丢失也不会重复
Kafka默认提供更多时第二种,但我们通常需要的时第三种。这可以通过幂等性和事务性生产者实现
幂等性Producer
幂等性,即某些操作或函数能够被执行多次,但每次得到的结果都是不变的。其最大的优势在于我们可以安全地重试任何幂等性操作
通过设置props.put(“enable.idempotence”, ture)或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)即可实现幂等性Producer。
注
它的原理是用时间换空间
在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉
注意
此处的幂等性的作用范围
- 只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
- 只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,可以理解为 Producer 进程的一次运行。当重启了 Producer 进程之后,这种幂等性保证就丧失了
那么如何实现跨分区、多会话上的消息无重复?通过事务性Producer实现
事务性Producer
事务型 Producer 能够保证将消息原子性地写入到多个分区中,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
事务性Producer的开启需要两项配置
- 开启
enable.idempotence = true - 设置 Producer 端参数
transactional. id,最好为其设置一个有意义的名字
在代码中使用事务
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}为了读取事务性Producer发送的消息,消费者端需要设置 isolation.level 参数,这个参数有两个可选值
- read_uncommitted:这是默认值,此时Consumer 能够读取到 Kafka 写入的任何消息
- read_committed:此时Consumer 只会读取事务型 Producer 成功提交事务写入的消息和所有非事务型 Producer 写入的消息
生产者分区机制
Kafka使用的是 主题(topic)-分区(partion)-消息(message)的三层结构,每一个消息都会落到某一个分区内,而不会在多个分区保存多份。这样涉及的作用是提供了负载均衡的能力(提高系统伸缩性),除此之外,利用分区还可以实现一些业务级别的需求,如业务级别的消息顺序问题。
根据不同的业务场景需求,可以设置不同的分区策略来应对,这里的策略是指决定生产者将消息发送到哪个分区的算法。包括默认策略和自定义策略,默认策略为轮询,可以通过配置生产者端的参数 partitioner.class来实现自定义策略,这个参数值需要一个类的全路径名,而这个类只需要实现 org.apache.kafka.clients.producer.Partitioner接口即可。这个接口只有 partition()和 close()两个方法,重点关注 partition()方法
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);轮询--默认分区策略
也称顺序分配,如一个topic有3个分区0、1、2,那么第一条消息会发送到分区0、第二条消息会发送到分区1、第三条消息发送到分区2、第四条消息发送到分区0。如果未设置partitioner.class参数即使轮询策略,它的负载均衡表现非常优秀,也是最常用的分区策略。
随机策略
随意的将一条消息放到任意一个分区上,设置随机分区的 partition()方法代码如下
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());按key分区策略
Kafka可以为每个消息指定一个key,这个key可以是有明确意义的业务名称,如业务ID、编号等,也可以是任意的消息元数据,如生成时间戳。一旦定义了这个key,就可以通过分区策略保证同样的key被发送到同一个分区上,这种分区策略的 partition()方法代码为
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();其他自定义策略
可以根据不同的业务逻辑设置按地理位置分区、按用户组分区等方式
压缩算法
在不同版本的Kafka中所使用的消息格式是不一样的,v0.11.0.0之前的版本使用V1版格式,v0.11.0.0版本中引入了V2版格式。V2对V1做了优化(把消息的公共信息放到了消息集合中),V2版比V1版更加节省磁盘空间。
不论是哪个版本,Kafka 的消息层次都分为两层:消息集合(message set)以及消息(message)。一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方。Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。
指定压缩算法
在生产者代码中配置 compression.type这个参数可以设置压缩算法,代码如下
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启GZIP压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);在V2.1.0版本之前,Kafka支持一下几种压缩算法
- GZIP
- Snappy
- LZ4
从V2.1.0开始支持Zstandard算法(简称zstd),以上四种压缩算法各有千秋,分别适应不同的应用场景,如
- 在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;
- 在压缩比方面,zstd > LZ4 > GZIP > Snappy。
- 在物理资源占用方面,使用 Snappy 算法占用的网络带宽最多,zstd 最少;在 CPU 使用率方面,各个算法表现得差不多,只是在压缩时 Snappy 算法使用的 CPU 较多一些,而在解压缩时 GZIP 算法则可能使用更多的 CPU
提示
如何确定是否需要压缩消息
启用压缩的一个条件就是 Producer 程序运行机器上的 CPU 资源要很充足,除此之外,如果环境中带宽资源有限也需要开启压缩。而如果客户端机器 CPU 资源有很多富余,强烈建议开启 zstd 压缩,这样能极大地节省网络资源消耗。
什么时候需要压缩
除了在生产者端发送消息时会执行压缩操作之外,在Broker端也可能执行压缩,有两种情况
- Broker端指定了和Producer端不同的压缩算法。在Broker端也有一个
compression.type参数,它的默认值是 producer,即和生产之使用相同的压缩算法,如果这个值设定的压缩算法不同,那么在broker端会先将消息解压,然后按照设置的算法重新压缩 - Broker端发生了消息转换。如果在一个环境中Kafka保存了两种格式的消息(V1和V2),那么为了兼容性Broker会将V2消息转换为V1消息,这中间会涉及压缩和解压。应严格限制这种情况的发生,因为这时除了消息转换之外还会使“零拷贝”特性失效
提示
什么时候解压缩
除了上文中提到的Broker压缩的情况中会涉及到解压缩之外,只有Consumer中使用消息时会进行解压缩。因此,在正常情况下:Producer 端压缩、Broker 端保持、Consumer 端解压缩
消息防丢失
Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
关键词:已提交、有限度持久化保证
何为已提交?当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时为已提交。
何为有限度持久化保证?假如你的消息保存在 N 个 Kafka Broker 上,那么这个限度就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失
消息丢失的原因
消息丢失的原因主要有如下两个
- Producer发送失败
- 消费者读取出错
目前 Kafka Producer 是异步发送消息的,如果调用的是 producer.send(msg) 这个 API,那么它通常会立即回,但此时不能认为消息发送已成功完成,如网络不稳定消息没法送到,或消息太大超过了Broker设置的极限,这时消息发送失败对Producer来说是未知的。
而 producer.send(msg, callback)这个API可以通过callback知道消息的状态,能够针对消息发送失败做进一步处理。
造成Consumer读取出错的原因有offset设置出错,即先设置了offset,在读取消息时出错,导致这条消息被跳过了;还有一种情况时多线程处理一条消息,但其中一个线程出错了。解决这两种问题都可以通过设置更新offset的时间点的方式来解决。
设置为先消费消息,后更新offset可以解决第一个问题;设置为手动提交offset可以解决第二个问题
防止消息丢失的设置
- 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
- 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
- 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
- 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
- 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
- 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
- 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
消费者客户端
Kafka拦截器
Kafka 拦截器分为生产者拦截器和消费者拦截器。生产者拦截器允许在发送消息前以及消息提交成功后植入拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。
值得一提的是,这两种拦截器都支持链的方式,即可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑。
在生产者和消费者端都有 interceptor.classes这个参数,用来指定一组类的列表,这里的类就是对应拦截器的实现类,可以按下列代码设置
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("cn.test.interceptors.AddTimestampInterceptor"); // 拦截器1
interceptors.add("cn.test.interceptors.UpdateCounterInterceptor"); // 拦截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);Producer端的拦截器需要实现 org.apache.kafka.clients.producer.ProducerInterceptor接口,此接口提供两个方法
onSend():该方法会在消息发送之前被调用。如果你想在发送之前处理消息,这个方法是唯一的机会;onAcknowledgement():该方法会在消息成功提交或发送失败之后被调用。
同理,Consumer端的拦截器需要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor接口,此接口也提供两个方法
onConsume():该方法在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,之后再返回onCommit():Consumer 在提交位移之后调用该方法。通常可以在该方法中做一些记账类的动作,比如打日志等
Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景