Thread safety

Kafka's Producer is thread-safe, so a single instance can safely be used from multiple threads.

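In fact, the official documentation notes that sharing a single producer instance across threads is generally faster than having multiple instances.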

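Producer message send flow

  • Step 1: wrap the message in a ProducerRecord.
  • Step 2: the Partitioner routes the record to one partition of the topic. If no key is specified, records are spread roughly evenly across all partitions.
  • Step 3: once the partition is chosen, the producer looks up that partition's leader and sends the record to it; the replica synchronization mechanism then takes over.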
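Step 2 can be illustrated with a small, self-contained sketch. It is not Kafka's actual partitioner: the class name SimplePartitionerSketch is hypothetical, keys are hashed with Arrays.hashCode where Kafka's default partitioner uses murmur2, and unkeyed records are rotated round-robin, whereas newer clients group unkeyed records into batches "stickily" by default. The only point it shows is that records with the same key land in the same partition, while unkeyed records are spread over all partitions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified stand-in for a partitioner; NOT Kafka's real implementation. */
class SimplePartitionerSketch {
    // Counter used only for records without a key.
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * Returns a partition index in [0, numPartitions).
     * Null key: rotate through the partitions (round-robin assumption).
     * Non-null key: hash the key so equal keys always map to the same partition.
     */
    int partition(byte[] keyBytes, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (keyBytes == null) {
            // Mask the sign bit so the result stays non-negative after overflow.
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
        }
        // Assumption: Arrays.hashCode stands in for Kafka's murmur2 hash.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        SimplePartitionerSketch partitioner = new SimplePartitionerSketch();
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println("keyed record   -> partition " + partitioner.partition(key, 3));
        System.out.println("same key again -> partition " + partitioner.partition(key, 3));
        for (int i = 0; i < 3; i++) {
            System.out.println("unkeyed record -> partition " + partitioner.partition(null, 3));
        }
    }
}
```

In a real producer this decision is made inside the client library; supplying a key in the ProducerRecord is all an application normally needs to do to keep related records in one partition.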

Official Producer examples

Fire-and-forget example (don't care about the outcome)

  • After sending, the result is never checked.

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++) {
        // The Future returned by send() is ignored: fire and forget.
        producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
    }

    // close() waits for previously sent records to complete before returning.
    producer.close();

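Asynchronous callback example (non-blocking)

  • The Java producer's send method returns a Java Future that the caller can use to retrieve the send result later. send also accepts an optional Callback argument, which is the callback mechanism shown here.
  • Fully non-blocking usage can make use of the Callback parameter to provide a callback that will be invoked when the request is complete.

  • RecordMetadata and Exception never both carry a result: when the send succeeds, the exception is null and the metadata is valid; when it fails, the exception is non-null and the metadata is null (newer client versions pass placeholder metadata instead), so check the exception first.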

    ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>("the-topic", key, value);

    producer.send(record,
        new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    // The send failed; metadata must not be used here.
                    e.printStackTrace();
                } else {
                    System.out.println("The offset of the record we just sent is: " + metadata.offset());
                }
            }
        });

Synchronous send example (blocking)

  • producer.send(record) returns a Future; calling Future.get() on it blocks, with no time limit, until the result comes back.

    producer.send(record).get();
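Because Future.get() waits without a time limit, a bounded wait is often preferable. The sketch below is one possible way to do that using only java.util.concurrent; the class BoundedSendWaitSketch and the method awaitResult are hypothetical helpers, not part of the Kafka API. It accepts any Future, so the Future returned by producer.send(record) could be passed in directly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: waits for a send result, but only for a limited time. */
class BoundedSendWaitSketch {

    static <T> T awaitResult(Future<T> future, long timeoutMs) {
        try {
            // get(timeout, unit) is the bounded variant of Future.get().
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("no result within " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the result", e);
        } catch (ExecutionException e) {
            // The real failure is wrapped; surface it to the caller.
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException("send failed", cause);
        }
    }

    public static void main(String[] args) {
        // A CompletableFuture stands in for the Future returned by producer.send(record).
        Future<String> alreadyDone = CompletableFuture.completedFuture("acknowledged");
        System.out.println(awaitResult(alreadyDone, 1000));
    }
}
```

With a real producer the call would look like awaitResult(producer.send(record), 10_000), where the 10-second limit is an arbitrary illustrative value.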

Transactional send example (atomicity and idempotence)

  • From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer. The idempotent producer strengthens Kafka’s delivery semantics from at least once to exactly once delivery. In particular producer retries will no longer introduce duplicates. The transactional producer allows an application to send messages to multiple partitions (and topics!) atomically.

  • To enable idempotence, the enable.idempotence configuration must be set to true. If set, the retries config will default to Integer.MAX_VALUE and the acks config will default to all. There are no API changes for the idempotent producer, so existing applications will not need to be modified to take advantage of this feature.

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id", "my-transactional-id");
    Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

    producer.initTransactions();

    try {
        producer.beginTransaction();
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // We can't recover from these exceptions, so our only option is to close the producer and exit.
        producer.close();
    } catch (KafkaException e) {
        // For all other exceptions, just abort the transaction and try again.
        producer.abortTransaction();
    }
    producer.close();
  • As is hinted at in the example, there can be only one open transaction per producer. All messages sent between the beginTransaction() and commitTransaction() calls will be part of a single transaction. When the transactional.id is specified, all messages sent by the producer must be part of a transaction.
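For the idempotent (non-transactional) mode mentioned at the start of this section, the change is purely a matter of configuration. The following is a minimal sketch, assuming String keys and values; the class IdempotentProducerConfigSketch and the method idempotentProps are hypothetical names, and only the property keys come from Kafka's producer configuration.

```java
import java.util.Properties;

/** Hypothetical helper that assembles producer settings with idempotence enabled. */
class IdempotentProducerConfigSketch {

    static Properties idempotentProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Turns on the idempotent producer; retries no longer introduce duplicates.
        props.put("enable.idempotence", "true");
        // Stated explicitly for clarity; idempotence requires acks=all.
        props.put("acks", "all");
        // Assumption: records use String keys and values.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = idempotentProps("localhost:9092");
        System.out.println("enable.idempotence = " + props.getProperty("enable.idempotence"));
    }
}
```

The resulting Properties object would be passed to new KafkaProducer<>(props); no transactional.id is set, so none of the transaction calls are needed.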

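Retriable exceptions (subclasses of RetriableException)

  • LeaderNotAvailableException: the partition's leader replica is unavailable. This is usually a transient condition caused by a leader election, and a few retries are typically enough to recover.
  • NotControllerException: the controller elects partition leaders and centrally manages partition and replica metadata. This exception can also be caused by an election, in this case of the controller itself.
  • NetworkException: caused by a transient network failure.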

Non-retriable exceptions

  • SerializationException: serialization of the record failed.
  • RecordTooLargeException: the record is too large.

Handling the two kinds of exceptions differently

    producer.send(record,
        new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e == null) {
                    // Normal processing logic
                    System.out.println("The offset of the record we just sent is: " + metadata.offset());
                } else if (e instanceof RetriableException) {
                    // Handle retriable exceptions
                    // ......
                } else {
                    // Handle non-retriable exceptions
                    // ......
                }
            }
        });

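Graceful Producer shutdown

  • producer.close(): finishes sending any pending messages first, then exits gracefully.
  • producer.close(timeout): waits up to the timeout for pending messages to be sent; once the timeout expires, the producer is closed forcibly.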