Producer核心工作流程

  • Producer首先使用用户主线程将待发送的消息封装进一个ProducerRecord类实例中。
  • 进行序列化后,发送给Partioner,由Partioner确定目标分区后,发送到Producer程序中的一块内存缓冲区中。
  • Producer的另一个工作线程(即Sender线程),则负责实时地从该缓冲区中提取出准备好的消息封装到一个批次,统一发送给对应的broker中。

producer 主要参数设置

producer 参数acks 设置(无数据丢失)

在消息被认为是“已提交”之前,producer需要leader确认的produce请求的应答数。该参数用于控制消息的持久性,目前提供了3个取值:

acks = 0: 表示produce请求立即返回,不需要等待leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。

acks = -1: 表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为produce请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。

acks = 1: 表示leader副本必须应答此produce请求并写入消息到本地日志,之后produce请求被认为成功。如果此时leader副本应答请求之后挂掉了,消息会丢失。这是个较好的方案,提供了不错的持久性保证和吞吐。

商业环境推荐:

如果要较高的持久性要求以及无数据丢失的需求,设置acks = -1。其他情况下设置acks = 1

producer参数 buffer.memory 设置(吞吐量)

该参数用于指定Producer端用于缓存消息的缓冲区大小,单位为字节,默认值为:33554432合计为32M。kafka采用的是异步发送的消息架构,prducer启动时会首先创建一块内存缓冲区用于保存待发送的消息,然后由一个专属线程负责从缓冲区读取消息进行真正的发送。

商业环境推荐:

  • 消息持续发送过程中,当缓冲区被填满后,producer立即进入阻塞状态直到空闲内存被释放出来,这段时间不能超过max.blocks.ms设置的值,一旦超过,producer则会抛出TimeoutException 异常,因为Producer是线程安全的,若一直报TimeoutException,需要考虑调高buffer.memory 了。
  • 用户在使用多个线程共享kafka producer时,很容易把 buffer.memory 打满。

producer参数 compression.type 设置(lZ4)

producer压缩器,目前支持none(不压缩),gzip,snappy和lz4。

商业环境推荐:

基于公司的大数据平台,试验过目前lz4的效果最好。当然2016年8月,FaceBook开源了Ztandard。官网测试: Ztandard压缩率为2.8,snappy为2.091,LZ4 为2.101 。

producer参数 retries设置(注意消息乱序,EOS)

producer重试的次数设置。重试时producer会重新发送之前由于瞬时原因出现失败的消息。瞬时失败的原因可能包括:元数据信息失效、副本数量不足、超时、位移越界或未知分区等。倘若设置了retries > 0,那么这些情况下producer会尝试重试。

商业环境推荐:

  • producer还有个参数:max.in.flight.requests.per.connection。如果设置该参数大约1,那么设置retries就有可能造成发送消息的乱序。
  • 版本为0.11.0.1的kafka已经支持”精确到一次的语义”,因此消息的重试不会造成消息的重复发送。

producer参数batch.size设置(吞吐量和延时性能)

producer都是按照batch进行发送的,因此batch大小的选择对于producer性能至关重要。producer会把发往同一分区的多条消息封装进一个batch中,当batch满了后,producer才会把消息发送出去。但是也不一定等到满了,这和另外一个参数linger.ms有关。默认值为16K,合计为16384.

商业环境推荐:

  • batch 越小,producer的吞吐量越低,越大,吞吐量越大。

producer参数linger.ms设置(吞吐量和延时性能)

producer是按照batch进行发送的,但是还要看linger.ms的值,默认是0,表示不做停留。这种情况下,可能有的batch中没有包含足够多的produce请求就被发送出去了,造成了大量的小batch,给网络IO带来的极大的压力。

商业环境推荐:

  • 为了减少了网络IO,提升了整体的TPS。假设设置linger.ms=5,表示producer请求可能会延时5ms才会被发送。

producer参数max.in.flight.requests.per.connection设置(吞吐量和延时性能)

producer的IO线程在单个Socket连接上能够发送未应答produce请求的最大数量。增加此值应该可以增加IO线程的吞吐量,从而整体上提升producer的性能。不过就像之前说的如果开启了重试机制,那么设置该参数大于1的话有可能造成消息的乱序。

商业环境推荐:

  • 默认值5是一个比较好的起始点,如果发现producer的瓶颈在IO线程,同时各个broker端负载不高,那么可以尝试适当增加该值.
  • 过大增加该参数会造成producer的整体内存负担,同时还可能造成不必要的锁竞争反而会降低TPS

Java客户端

KafkaProducer(org.apache.kafka.clients.producer.KafkaProducer)是一个用于向kafka集群发送数据的Java客户端。该Java客户端是线程安全的,多个线程可以共享同一个producer实例,而且这通常比在多个线程中每个线程创建一个实例速度要快些。本文介绍的内容来自于kafka官方文档,详情参见KafkaProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.test.kafkaProducer;

import java.util.List;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;


public class TestProducer {

public static void main(String[] args) {

Properties props = new Properties();
props.put("bootstrap.servers", "192.168.137.200:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//生产者发送消息
String topic = "mytopic";
Producer<String, String> procuder = new KafkaProducer<String,String>(props);
for (int i = 1; i <= 10; i++) {
String value = "value_" + i;
ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, value);
procuder.send(msg);
}
//列出topic的相关信息
List<PartitionInfo> partitions = new ArrayList<PartitionInfo>() ;
partitions = procuder.partitionsFor(topic);
for(PartitionInfo p:partitions)
{
System.out.println(p);
}

System.out.println("send message over.");
procuder.close(100,TimeUnit.MILLISECONDS);
}
}

producer包含一个用于保存待发送消息的缓冲池,缓冲池中消息是还没来得及传输到kafka集群的消息。位于底层的kafka I/O线程负责将缓冲池中的消息转换成请求发送到集群。如果在结束produce时,没有调用close()方法,那么这些资源会发生泄露。
用于建立消费者的相关参数说明及其默认值参见producerconfigs,此处对代码中用到的几个参数进行解释:
bootstrap.servers:用于初始化时建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2;
acks:生产者需要server端在接收到消息后,进行反馈确认的尺度,主要用于消息的可靠性传输;acks=0表示生产者不需要来自server的确认;acks=1表示server端将消息保存后即可发送ack,而不必等到其他follower角色的都收到了该消息;acks=all(or acks=-1)意味着server端将等待所有的副本都被接收后才发送确认。
retries:生产者发送失败后,重试的次数
batch.size:当多条消息发送到同一个partition时,该值控制生产者批量发送消息的大小,批量发送可以减少生产者到服务端的请求数,有助于提高客户端和服务端的性能。
linger.ms:默认情况下缓冲区的消息会被立即发送到服务端,即使缓冲区的空间并没有被用完。可以将该值设置为大于0的值,这样发送者将等待一段时间后,再向服务端发送请求,以实现每次请求可以尽可能多的发送批量消息。
batch.size和linger.ms是两种实现让客户端每次请求尽可能多的发送消息的机制,它们可以并存使用,并不冲突。
buffer.memory:生产者缓冲区的大小,保存的是还未来得及发送到server端的消息,如果生产者的发送速度大于消息被提交到server端的速度,该缓冲区将被耗尽。
key.serializer,value.serializer说明了使用何种序列化方式将用户提供的key和vaule值序列化成字节。

kafka客户端的API

KafkaProducer对象实例化方法,可以使用map形式的键值对或者Properties对象来配置客户端的属性

1
2
3
4
5
6
7
8
/*
*keySerializer:发送数据key值的序列化方法,该方法实现了Serializer接口
*valueSerializer:发送数据value值的序列化方法,该方法实现了Serializer接口
*/
public KafkaProducer(Map<String,Object> configs);
public KafkaProducer(Map<String,Object> configs, Serializer<K> keySerializer,Serializer<V> valueSerializer);
public KafkaProducer(Properties properties);
public KafkaProducer(Properties properties, Serializer<K> keySerializer,Serializer<V> valueSerializer);

消息发送方法send()

1
2
3
4
5
6
/*
*record:key-value形式的待发送数据
*callback:到发送的消息被borker端确认后的回调函数
*/
public Future<RecordMetadata> send(ProducerRecord<K,V> record); // Equivalent to send(record, null)
public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback);

send方法负责将缓冲池中的消息异步的发送到broker的指定topic中。异步发送是指,方法将消息存储到底层待发送的I/O缓存后,将立即返回,这可以实现并行无阻塞的发送更多消息。send方法的返回值是RecordMetadata类型,它含有消息将被投递的partition信息,该条消息的offset,以及时间戳。
因为send返回的是Future对象,因此在该对象上调用get()方法将阻塞,直到相关的发送请求完成并返回元数据信息;或者在发送时抛出异常而退出。
阻塞发送的方法如下:

1
2
3
4
String key = "Key";
String value = "Value";
ProducerRecord<String, String> record = new ProducerRecord<String, String>(key, value);
producer.send(record).get();

可以充分利用回调函数和异步发送方式来确认消息发送的进度:

1
2
3
4
5
6
7
8
9
10
ProducerRecord<String,String> record = new ProducerRecord<String,String>("the-topic", key, value);
producer.send(myRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});

flush

立即发送缓存数据

1
public void flush();

调用该方法将使得缓冲区的所有消息被立即发送(即使linger.ms参数被设置为大于0),且会阻塞直到这些相关消息的发送请求完成。flush方法的前置条件是:之前发送的所有消息请求已经完成。一个请求被视为完成是指:根据acks参数配置项收到了相应的确认,或者发送中抛出异常失败了。
下面的例子展示了从一个topic消费后发到另一个topic,flush方法在此非常有用,它提供了一种方便的方法来确保之前发送的消息确实已经完成了:

1
2
3
4
for(ConsumerRecord<String, String> record: consumer.poll(100))
producer.send(new ProducerRecord("my-topic", record.key(), record.value());
producer.flush(); //将缓冲区的消息立即发送
consumer.commit(); //消费者手动确认消费进度

partitionsFor

1
2
//获取指定topic的partition元数据信息
public List<PartitionInfo> partitionsFor(String topic);

close

1
2
3
//关闭producer,方法将被阻塞,直到之前的发送请求已经完成
public void close();// equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
public void close(long timeout,TimeUnit timeUnit); //同上,方法将等待timeout时长,以让未完成的请求完成发送