![Kafka进阶](https://wfqqreader-1252317822.image.myqcloud.com/cover/408/43738408/b_43738408.jpg)
3.4.3 生产者拦截器
从图3.1可以看出,在KafkaProducer的主线程中可以创建一个或多个ProducerInterceptors(拦截器)。拦截器是从Kafka 0.10版本中引入的,在生产者端和消费者端均可设置,拦截器主要用于实现生产者端和消费者端的定制化控制逻辑。
对生产者而言,拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口。下面展示了该接口ProducerInterceptor中的方法。
![](https://epubservercos.yuewen.com/527ABA/23020654009771406/epubprivate/OEBPS/Images/42653-00-079-3.jpg?sign=1739189596-YnzW88e5cA4tWdjF73konjIP5D3KRQvn-0-3458d7bce3b750acc07279e2edef6995)
下面对接口中的三个方法做出必要的解释:
(1)onSend。
该方法将在KafkaProducer.send方法的主线程中执行。KafkaProducer确保在消息被序列化以前,调用ProducerInterceptor.onSend方法。用户可以在该方法中对消息进行操作,但最好不要修改消息所属的Topic和分区,否则会影响目标分区的计算。
(2)onAcknowledgement。
该方法会在消息被确认应答之前或消息发送失败时调用。如果生产者采用的是异步发送机制,该方法通常是在生产者回调逻辑触发之前被调用的。需要注意的是,该方法运行在生产者的I/O线程中,因此不要在该方法中放入很重的逻辑,否则会影响生产者的消息发送性能。
(3)close。
关闭拦截器之前,可以将一些资源清理工作放在close方法中。
需要注意的是,如果指定了多个连接器,生产者将按照指定顺序调用它们。如果拦截器中出现了异常,生产者会将异常的错误信息记录到错误日志中,而不是向上传递。
当创建完拦截器后,可以通过以下的代码在生产者端指定它们。
![](https://epubservercos.yuewen.com/527ABA/23020654009771406/epubprivate/OEBPS/Images/42653-00-080-1.jpg?sign=1739189596-zrAr2JZo8wLzozy9DGIEXE1p4pSGFPK5-0-9f863d34b0013c9675c1d3be72c51d47)
在了解了拦截器的功能特点后,我们通过一个具体的例子来演示拦截器的使用方法。在这个例子中,将开发两个拦截器,具体的功能需求如下。
(1)拦截器1:将当前系统的时间戳设置到Employee对象的hiredate属性上。
(2)拦截器2:统计生产者发送成功和发送失败的消息总数。
下面是拦截器1的完整代码:HireDateTimeStampInterceptor。
![](https://epubservercos.yuewen.com/527ABA/23020654009771406/epubprivate/OEBPS/Images/42653-00-080-2.jpg?sign=1739189596-lfg0KeO7zHrsgfNRYeq7QFEzLbHmkxNR-0-b556508d0993b124f5f7d9dc1bb9e558)
![](https://epubservercos.yuewen.com/527ABA/23020654009771406/epubprivate/OEBPS/Images/42653-00-081-1.jpg?sign=1739189596-6gPhDG1Pqm0AiEBtvNC4p2nxu6uFJMdI-0-574da993c5e67daa8feab074c865824b)
下面是拦截器2的完整代码:ProducerSendCounterInterceptor。
![](https://epubservercos.yuewen.com/527ABA/23020654009771406/epubprivate/OEBPS/Images/42653-00-081-2.jpg?sign=1739189596-wZ7OxfexjMkNT4pKrzghkG8LUUwHzt5d-0-312cce5a299382452db71a8b0b57fe8d)
![](https://epubservercos.yuewen.com/527ABA/23020654009771406/epubprivate/OEBPS/Images/42653-00-082-1.jpg?sign=1739189596-8PDg9nqgZsgVaADTLqX0o77hHdrzqbiM-0-c0bac5020e55d79d2c67120bd6b59bc4)
改造EmployeeProducer代码,将拦截器加入Kafka的生产者中,下面展示了需要添加的代码。
![](https://epubservercos.yuewen.com/527ABA/23020654009771406/epubprivate/OEBPS/Images/42653-00-082-2.jpg?sign=1739189596-EcCw2N2qckvGrbYnOLdEA9j2UDVC78QE-0-f3616ceeee1297742dcd30f357bb633a)
代码开发完成后,就可以进行测试了。首先,启动Kafka集群,然后我们使用Kafka自带的Consumer Console工具来测试收到的数据。执行下面的命令启动Consumer Console。
![](https://epubservercos.yuewen.com/527ABA/23020654009771406/epubprivate/OEBPS/Images/42653-00-082-3.jpg?sign=1739189596-7FvE03gu1ydiPqjPxbowvCPdCpXmYTBO-0-9d56ed78f85f19fcffa657e7de8ca3fb)
再启动EmployeeProducer程序,观察输出的结果。可以在Consumer Console上输出如下信息。可以看到通过拦截器已经将Employee对象的hiredate属性设置为当前系统的系统时间戳,如图3.4所示。
![](https://epubservercos.yuewen.com/527ABA/23020654009771406/epubprivate/OEBPS/Images/42653-00-082-01.jpg?sign=1739189596-MnSGqmv6SWazVuTLCBLiikKQOBa1ns0p-0-00bc2b53c25470f9fb627fe9d33739f8)
图3.4 在Consumer Console上输出结果
这里我们使用Kafka自带的Consumer Console命令行工具将接收到的消息直接输出到命令行中。后面章节将介绍如何使用代码程序接收自定义的消息对象。
图3.5展示了EmployeeProducer运行后的输出结果,从输出的结果可以看到生产者发送成功的消息总数和发送失败的消息总数。
![](https://epubservercos.yuewen.com/527ABA/23020654009771406/epubprivate/OEBPS/Images/42653-00-083-01.jpg?sign=1739189596-sYGV0U6BsZCJaeK9mIfDbKnSzo4oNQJY-0-786f2763321262c9e8393b1092fab861)
图3.5 EmployeeProducer执行的输出