深入理解Kafka:核心设计与实践原理
上QQ阅读APP看本书,新人免费读10天
设备和账号都新为新人

2.1 客户端开发

一个正常的生产逻辑需要具备以下几个步骤:

(1)配置生产者客户端参数及创建相应的生产者实例。

(2)构建待发送的消息。

(3)发送消息。

(4)关闭生产者实例。

代码清单 1-2 中已经简单对生产者客户端的编码做了一个基本演示,本节对其修改以做具体的分析,如代码清单2-1所示。

代码清单2-1 生产者客户端示例代码

相比代码清单 1-2 而言,这里仅仅是让编码的逻辑显得更加“正统”一些,也更加方便下面内容的陈述。

这里有必要单独说明的是构建的消息对象 ProducerRecord,它并不是单纯意义上的消息,它包含了多个属性,原本需要发送的与业务相关的消息体只是其中的一个 value 属性,比如“Hello,Kafka!”只是ProducerRecord对象中的一个属性。ProducerRecord类的定义如下(只截取成员变量):

其中topic和partition字段分别代表消息要发往的主题和分区号。headers字段是消息的头部,Kafka 0.11.x版本才引入这个属性,它大多用来设定一些与应用相关的信息,如无需要也可以不用设置。key是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类,而这个key可以让消息再进行二次归类,同一个key的消息会被划分到同一个分区中,详情参见2.1.4节。有key的消息还可以支持日志压缩的功能,详情参见5.4节。value是指消息体,一般不为空,如果为空则表示特定的消息—墓碑消息,详情参见5.4节。timestamp是指消息的时间戳,它有CreateTime和LogAppendTime两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间,详情参见5.2节。

接下来我们将按照生产逻辑的各个步骤来一一做相应分析。

2.1.1 必要的参数配置

在创建真正的生产者实例前需要配置相应的参数,比如需要连接的Kafka集群地址。参照代码清单2-1中的initConfig()方法,在Kafka生产者客户端KafkaProducer中有3个参数是必填的。

· bootstrap.servers:该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。注意这里并非需要所有的broker地址,因为生产者会从给定的broker里查找到其他broker的信息。不过建议至少要设置两个以上的broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka集群上。有关此参数的更多释义可以参考6.5.2节。

· key.serializer 和 value.serializer:broker 端接收的消息必须以字节数组(byte[])的形式存在。代码清单2-1中生产者使用的KafkaProducer<String,String>和ProducerRecord<String,String>中的泛型<String,String>对应的就是消息中key和value的类型,生产者客户端使用这种方式可以让代码具有良好的可读性,不过在发往broker之前需要将消息中对应的key和value做相应的序列化操作来转换成字节数组。key.serializer和value.serializer这两个参数分别用来指定key和value序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名,如代码清单2-1中的org.apache.kafka.common.serialization.StringSerializer,单单指定StringSerializer是错误的,更多有关序列化的内容可以参考2.1.3节。

注意到代码清单2-1中的initConfig()方法里还设置了一个参数client.id,这个参数用来设定KafkaProducer对应的客户端id,默认值为“”。如果客户端不设置,则KafkaProducer会自动生成一个非空字符串,内容形式如“producer-1”“producer-2”,即字符串“producer-”与数字的拼接。

KafkaProducer中的参数众多,远非示例initConfig()方法中的那样只有4个,开发人员可以根据业务应用的实际需求来修改这些参数的默认值,以达到灵活调配的目的。一般情况下,普通开发人员无法记住所有的参数名称,只能有个大致的印象。在实际使用过程中,诸如“key.serializer”“max.request.size”“interceptor.classes”之类的字符串经常由于人为因素而书写错误。为此,我们可以直接使用客户端中的 org.apache.kafka.clients.producer.ProducerConfig类来做一定程度上的预防措施,每个参数在 ProducerConfig 类中都有对应的名称,以代码清单2-1中的initConfig()方法为例,引入ProducerConfig后的修改结果如下:

注意到上面的代码中key.serializer和value.serializer参数对应类的全限定名比较长,也比较容易写错,这里通过Java中的技巧来做进一步的改进,相关代码如下:

如此代码便简洁了许多,同时进一步降低了人为出错的可能性。在配置完参数之后,我们就可以使用它来创建一个生产者实例,示例如下:

KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。

KafkaProducer 中有多个构造方法,比如在创建 KafkaProducer 实例时并没有设定key.serializer 和 value.serializer 这两个配置参数,那么就需要在构造方法中添加对应的序列化器,示例如下:

其内部原理和无序列化器的构造方法一样,不过就实际应用而言,一般都选用 public KafkaProducer(Properties properties)这个构造方法来创建KafkaProducer实例。

2.1.2 消息的发送

在创建完生产者实例之后,接下来的工作就是构建消息,即创建ProducerRecord对象。通过代码清单2-1中我们已经了解了ProducerRecord的属性结构,其中topic属性和value属性是必填项,其余属性是选填项,对应的ProducerRecord的构造方法也有多种,参考如下:

代码清单 2-1 中使用的是最后一种构造方法,也是最简单的一种,这种方式相当于将ProducerRecord中除topic和value外的属性全部值设置为null。在实际的应用中,还会用到其他构造方法,比如要指定 key,或者添加 headers 等。有可能会遇到这些构造方法都不满足需求的情况,需要自行添加更多的构造方法,比如下面的示例:

可以参阅11.1节的内容来了解此构造方法的具体应用。注意,针对不同的消息,需要构建不同的ProducerRecord对象,在实际应用中创建ProducerRecord对象是一个非常频繁的动作。

创建生产者实例和构建消息之后,就可以开始发送消息了。发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)。

代码清单2-1中的这种发送方式就是发后即忘,它只管往Kafka中发送消息而并不关心消息是否正确到达。在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。

KafkaProducer 的 send()方法并非是 void 类型,而是 Future<RecordMetadata>类型,send()方法有2个重载方法,具体定义如下:

要实现同步的发送方式,可以利用返回的Future对象实现,示例如下:

实际上send()方法本身就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。示例中在执行send()方法之后直接链式调用了get()方法来阻塞等待Kafka的响应,直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交由外层逻辑处理。

也可以在执行完send()方法之后不直接调用get()方法,比如下面的一种同步发送方式的实现:

这样可以获取一个RecordMetadata对象,在RecordMetadata对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。如果在应用代码中需要这些信息,则可以使用这个方式。如果不需要,则直接采用producer.send(record).get()的方式更省事。

Future 表示一个任务的生命周期,并提供了相应的方法来判断任务是否已经完成或取消,以及获取任务的结果和取消任务等。既然KafkaProducer.send()方法的返回值是一个Future类型的对象,那么完全可以用Java语言层面的技巧来丰富应用的实现,比如使用Future中的 get(long timeout,TimeUnit unit)方法实现可超时的阻塞。

KafkaProducer中一般会发生两种类型的异常:可重试的异常和不可重试的异常。常见的可重试异常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。比如NetworkException 表示网络异常,这个有可能是由于网络瞬时故障而导致的异常,可以通过重试解决;又比如LeaderNotAvailableException表示分区的leader副本不可用,这个异常通常发生在leader副本下线而新的 leader 副本选举完成之前,重试之后可以重新恢复。不可重试的异常,比如 1.4 节中提及的RecordTooLargeException异常,暗示了所发送的消息太大,KafkaProducer对此不会进行任何重试,直接抛出异常。

对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。retries参数的默认值为0,配置方式参考如下:

示例中配置了10次重试。如果重试了10次之后还没有恢复,那么仍会抛出异常,进而发送的外层逻辑就要处理这些异常了。

同步发送的方式可靠性高,要么消息被发送成功,要么发生异常。如果发生异常,则可以捕获并进行相应的处理,而不会像“发后即忘”的方式直接造成消息的丢失。不过同步发送的方式的性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条。

我们再来了解一下异步发送的方式,一般是在send()方法里指定一个Callback的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。有读者或许会有疑问,send()方法的返回值类型就是Future,而Future本身就可以用作异步的逻辑处理。这样做不是不行,只不过Future里的 get()方法在何时调用,以及怎么调用都是需要面对的问题,消息不停地发送,那么诸多消息对应的Future对象的处理难免会引起代码处理逻辑的混乱。使用Callback的方式非常简洁明了,Kafka有响应时就会回调,要么发送成功,要么抛出异常。异步发送方式的示例如下:

示例代码中遇到异常时(exception!=null)只是做了简单的打印操作,在实际应用中应该使用更加稳妥的方式来处理,比如可以将异常记录以便日后分析,也可以做一定的处理来进行消息重发。onCompletion()方法的两个参数是互斥的,消息发送成功时,metadata 不为 null 而exception为null;消息发送异常时,metadata为null而exception不为null。

对于同一个分区而言,如果消息record1于record2之前先发送(参考上面的示例代码),那么KafkaProducer就可以保证对应的callback1在callback2之前调用,也就是说,回调函数的调用也可以保证分区有序。

通常,一个KafkaProducer不会只负责发送单条消息,更多的是发送多条消息,在发送完这些消息之后,需要调用KafkaProducer的close()方法来回收资源。下面的示例中发送了100条消息,之后就调用了close()方法来回收所占用的资源:

close()方法会阻塞等待之前所有的发送请求完成后再关闭 KafkaProducer。与此同时,KafkaProducer还提供了一个带超时时间的close()方法,具体定义如下:

如果调用了带超时时间timeout的close()方法,那么只会在等待timeout时间内来完成所有尚未完成的请求处理,然后强行退出。在实际应用中,一般使用的都是无参的close()方法。

2.1.3 序列化

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象。在代码清单2-1中,为了方便,消息的key和value都使用了字符串,对应程序中的序列化器也使用了客户端自带的org.apache.kafka.common.serialization.StringSerializer,除了用于String类型的序列化器,还有ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型,它们都实现了org.apache.kafka.common.serialization.Serializer接口,此接口有3个方法:

configure()方法用来配置当前类,serialize()方法用来执行序列化操作。而close()方法用来关闭当前的序列化器,一般情况下 close()是一个空方法,如果实现了此方法,则必须确保此方法的幂等性,因为这个方法很可能会被KafkaProducer调用多次。

生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的,如果生产者使用了某种序列化器,比如StringSerializer,而消费者使用了另一种序列化器,比如IntegerSerializer,那么是无法解析出想要的数据的。本节讨论的都是与生产者相关的,对于与消费者相关的反序列化器的内容请参见3.2.3节。

下面就以StringSerializer为例来看看Serializer接口中的3个方法的使用方法,StringSerializer类的具体实现如代码清单2-2所示。

代码清单2-2 StringSerializer的代码实现

首先是configure()方法,这个方法是在创建KafkaProducer实例的时候调用的,主要用来确定编码类型,不过一般客户端对于 key.serializer.encoding、value.serializer.encoding和serializer.encoding这几个参数都不会配置,在KafkaProducer的参数集合(ProducerConfig)里也没有这几个参数(它们可以看作用户自定义的参数),所以一般情况下encoding的值就为默认的“UTF-8”。serialize()方法非常直观,就是将String类型转为byte[]类型。

如果 Kafka 客户端提供的几种序列化器都无法满足应用需求,则可以选择使用如 Avro、JSON、Thrift、ProtoBuf和Protostuff等通用的序列化工具来实现,或者使用自定义类型的序列化器来实现。下面就以一个简单的例子来介绍自定义类型的使用方法。

假设我们要发送的消息都是Company对象,这个Company的定义很简单,只有名称name和地址address,示例代码参考如下(为了构建方便,示例中使用了lombok[2]工具):

下面我们再来看一下Company对应的序列化器CompanySerializer,示例代码如代码清单2-3所示。

代码清单2-3 自定义的序列化器CompanySerializer

上面的这段代码的逻辑很简单,configure()和close()方法也都为空。与此对应的反序列化器CompanyDeserializer的详细实现参见3.2.3节。

如何使用自定义的序列化器CompanySerializer呢?只需将KafkaProducer的value.serializer参数设置为CompanySerializer类的全限定名即可。假如我们要发送一个Company对象到Kafka,关键代码如代码清单2-4所示。

代码清单2-4 自定义序列化器使用示例

注意,示例中消息的 key 对应的序列化器还是 StringSerializer,这个并没有改动。其实key.serializer和value.serializer并没有太大的区别,读者可以自行修改key对应的序列化器,看看会不会有不一样的效果。

2.1.4 分区器

消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器(下一章会详细介绍)一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。

如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。

Kafka中提供的默认分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了org.apache.kafka.clients.producer.Partitioner接口,这个接口中定义了2个方法,具体如下所示。

其中partition()方法用来计算分区号,返回值为int类型。partition()方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现功能丰富的分区器。close()方法在关闭分区器的时候用来回收一些资源。

Partitioner 接口还有一个父接口 org.apache.kafka.common.Configurable,这个接口中只有一个方法:

Configurable接口中的configure()方法主要用来获取配置信息及初始化数据。

在默认分区器 DefaultPartitioner 的实现中,close()是空方法,而在 partition()方法中定义了主要的分区分配逻辑。如果 key 不为 null,那么默认的分区器会对 key 进行哈希(采用MurmurHash2算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同key的消息会被写入同一个分区。如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。

注意:如果 key 不为 null,那么计算得到的分区号会是所有分区中的任意一个;如果 key为null,那么计算得到的分区号仅为可用分区中的任意一个,注意两者之间的差别。

在不改变主题分区数量的情况下,key与分区之间的映射可以保持不变。不过,一旦主题中增加了分区,那么就难以保证key与分区之间的映射关系了。

除了使用 Kafka 提供的默认分区器进行分区分配,还可以使用自定义的分区器,只需同DefaultPartitioner一样实现Partitioner接口即可。默认的分区器在key为null时不会选择非可用的分区,我们可以通过自定义的分区器DemoPartitioner来打破这一限制,具体的实现可以参考下面的示例代码,如代码清单2-5所示。

代码清单2-5 自定义分区器实现

实现自定义的DemoPartitioner类之后,需要通过配置参数partitioner.class来显式指定这个分区器。示例如下:

这个自定义分区器的实现比较简单,读者也可以根据自身业务的需求来灵活实现分配分区的计算方式,比如一般大型电商都有多个仓库,可以将仓库的名称或ID作为key来灵活地记录商品信息。

2.1.5 生产者拦截器

拦截器(Interceptor)是早在Kafka 0.10.0.0中就已经引入的一个功能,Kafka一共有两种拦截器:生产者拦截器和消费者拦截器。本节主要讲述生产者拦截器的相关内容,有关消费者拦截器的具体细节请参考3.2.9节。

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。

生产者拦截器的使用也很方便,主要是自定义实现 org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口中包含3个方法:

KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。一般来说最好不要修改消息 ProducerRecord 的 topic、key 和partition 等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改key不仅会影响分区的计算,同样会影响broker端日志压缩(Log Compaction)的功能。

KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法,优先于用户设定的 Callback 之前执行。这个方法运行在Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。

close()方法主要用于在关闭拦截器时执行一些资源的清理工作。在这 3 个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。

ProducerInterceptor 接口与 2.1.4 节中的 Partitioner 接口一样,它也有一个同样的父接口Configurable,具体的内容可以参见Partitioner接口的相关介绍。

下面通过一个示例来演示生产者拦截器的具体用法,ProducerInterceptorPrefix 中通过onSend()方法来为每条消息添加一个前缀“prefix1-”,并且通过onAcknowledgement()方法来计算发送消息的成功率。ProducerInterceptorPrefix类的具体实现如代码清单2-6所示。

代码清单2-6 生产者拦截器示例

实现自定义的 ProducerInterceptorPrefix 之后,需要在 KafkaProducer 的配置参数interceptor.classes中指定这个拦截器,此参数的默认值为“”。示例如下:

然后使用指定了ProducerInterceptorPrefix的生产者连续发送10条内容为“kafka”的消息,在发送完之后客户端打印出如下信息:

如果消费这 10 条消息,会发现消费了的消息都变成了“prefix1-kafka”,而不是原来的“kafka”。

KafkaProducer中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开)。下面我们再添加一个自定义拦截器ProducerInterceptorPrefixPlus,它只实现了Interceptor接口中的onSend()方法,主要用来为每条消息添加另一个前缀“prefix2-”,具体实现如下:

接着修改生产者的interceptor.classes配置,具体实现如下:

此时生产者再连续发送10条内容为“kafka”的消息,那么最终消费者消费到的是10条内容为“prefix2-prefix1-kafka”的消息。如果将interceptor.classes配置中的两个拦截器的位置互换:

那么最终消费者消费到的消息为“prefix1-prefix2-kafka”。

如果拦截链中的某个拦截器的执行需要依赖于前一个拦截器的输出,那么就有可能产生“副作用”。设想一下,如果前一个拦截器由于异常而执行失败,那么这个拦截器也就跟着无法继续执行。在拦截链中,如果某个拦截器执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。