Reactor单线程案例代码

  • 如下是单线程的JAVA NIO编程模型。

  • 首先服务端创建ServerSocketChannel对象,并注册到Select上OP_ACCEPT事件,然后ServerSocketChannel负责监听指定端口上的连接请求。

  • 客户端一旦连接上ServerSocketChannel,就会触发Acceptor来处理OP_ACCEPT事件,并为来自客户端的连接创建Socket Channel,并设置为非阻塞模式,并在其Selector上注册OP_READ或者OP_WRITE,最终实现客户端与服务端的连接建立和数据通道打通。

  • 当客户端向建立的SocketChannel发送请求时,服务端的Selector就会监听到OP_READ事件,并触发相应的处理逻辑。当服务端向客户端写数据时,会触发服务端Selector的OP_WRITE事件,从而执行响应的处理逻辑。

  • 这里有一个明显的问题,就是所有时间的处理逻辑都是在Acceptor单线程完成的,在并发连接数较小,数据量较小的场景下,是没有问题的,但是……

  • Selector 允许一个单一的线程来操作多个 Channel. 如果我们的应用程序中使用了多个 Channel, 那么使用 Selector 很方便的实现这样的目的, 但是因为在一个线程中使用了多个 Channel, 因此也会造成了每个 Channel 传输效率的降低.

  • 优化点在于:通道连接|读取或写入|业务处理均采用单线程来处理。通过线程池或者MessageQueue共享队列,进一步优化了高并发的处理要求,这样就解决了同一时间出现大量I/O事件时,单独的Select就可能在分发事件时阻塞(或延时),而成为瓶颈的问题。

    img

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
      public class NioEchoServer {
    private static final int BUF_SIZE = 256;
    private static final int TIMEOUT = 3000;

    public static void main(String args[]) throws Exception {
    // 打开服务端 Socket
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

    // 打开 Selector
    Selector selector = Selector.open();

    // 服务端 Socket 监听8080端口, 并配置为非阻塞模式
    serverSocketChannel.socket().bind(new InetSocketAddress(8080));
    serverSocketChannel.configureBlocking(false);

    // 将 channel 注册到 selector 中.
    // 通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ
    // 注册到 Selector 中.
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
    // 通过调用 select 方法, 阻塞地等待 channel I/O 可操作
    if (selector.select(TIMEOUT) == 0) {
    System.out.print(".");
    continue;
    }

    // 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪.
    Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();

    while (keyIterator.hasNext()) {

    SelectionKey key = keyIterator.next();

    // 当获取一个 SelectionKey 后, 就要将它删除, 表示我们已经对这个 IO 事件进行了处理.
    keyIterator.remove();

    if (key.isAcceptable()) {
    // 当 OP_ACCEPT 事件到来时, 我们就有从 ServerSocketChannel 中获取一个 SocketChannel,
    // 代表客户端的连接
    // 注意, 在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel.
    // 而在 OP_WRITE 和 OP_READ 中, 从 key.channel() 返回的是 SocketChannel.
    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
    clientChannel.configureBlocking(false);
    //在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中.
    // 注意, 这里我们如果没有设置 OP_READ 的话, 即 interest set 仍然是 OP_CONNECT 的话, 那么 select 方法会一直直接返回.
    clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
    }

    if (key.isReadable()) {
    SocketChannel clientChannel = (SocketChannel) key.channel();
    ByteBuffer buf = (ByteBuffer) key.attachment();
    long bytesRead = clientChannel.read(buf);
    if (bytesRead == -1) {
    clientChannel.close();
    } else if (bytesRead > 0) {
    key.interestOps(OP_READ | SelectionKey.OP_WRITE);
    System.out.println("Get data length: " + bytesRead);
    }
    }

    if (key.isValid() && key.isWritable()) {
    ByteBuffer buf = (ByteBuffer) key.attachment();
    buf.flip();
    SocketChannel clientChannel = (SocketChannel) key.channel();

    clientChannel.write(buf);

    if (!buf.hasRemaining()) {
    key.interestOps(OP_READ);
    }
    buf.compact();
    }
    }
    }
    }
    }

Kafka Reactor模式设计思路

  • SelectionKey.OP_READ:Socket 读事件,以从远程发送过来了相应数据

  • SelectionKey.OP_WRITE:Socket写事件,即向远程发送数据

  • SelectionKey.OP_CONNECT:Socket连接事件,用来客户端同远程Server建立连接的时候注册到Selector,当连接建立以后,即对应的SocketChannel已经准备好了,用户可以从对应的key上取出SocketChannel.

  • SelectionKey.OP_ACCEPT:Socket连接接受事件,用来服务器端通过ServerSocketChannel绑定了对某个端口的监听,然后会让其SocketChannel对应的socket注册到服务端的Selector上,并关注该OP_ACCEPT事件。

  • Kafka的网络层入口类是SocketServer。我们知道,kafka.Kafka是Kafka Broker的入口类,kafka.Kafka.main()是Kafka Server的main()方法,即Kafka Broker的启动入口。我们跟踪代码,即沿着方法调用栈kafka.Kafka.main() -> KafkaServerStartable() -> KafkaServer().startup可以从main()方法入口一直跟踪到SocketServer即网络层对象的创建,这意味着Kafka Server启动的时候会初始化并启动SocketServer。

  • Acceptor的构造方法中,首先通过openServerSocket()打开自己负责的EndPoint的Socket,即打开端口并启动监听。然后,Acceptor会负责构造自己管理的一个或者多个Processor对象。其实,每一个Processor都是一个独立线程。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    private[kafka] class Acceptor(val endPoint: EndPoint,
    val sendBufferSize: Int,
    val recvBufferSize: Int,
    brokerId: Int,
    processors: Array[Processor],
    connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

    private val nioSelector = NSelector.open()
    val serverChannel = openServerSocket(endPoint.host, endPoint.port)//创建一个ServerSocketChannel,监听endPoint.host, endPoint.port套接字

    //Acceptor被构造的时候就会启动所有的processor线程
    this.synchronized {
    //每个processor创建一个单独线程
    processors.foreach { processor =>
    Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
    }
    }
  • Acceptor线程的run()方法,是不断监听对应ServerChannel上的连接请求,如果有新的连接请求,就选择出一个Processor,用来处理这个请求,将这个新连接交付给Processor是在方法Acceptor.accept()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]//取出channel
    val socketChannel = serverSocketChannel.accept()//创建socketChannel,专门负责与这个客户端的连接
    try {
    //socketChannel参数设置
    processor.accept(socketChannel)//将SocketChannel交给process进行处理
    } catch {
    //异常处理
    }
    }

    //Processor.accept():
    /**
    * Queue up a new connection for reading
    */
    def accept(socketChannel: SocketChannel) {
    newConnections.add(socketChannel)
    wakeup()
    }
  • 每一个Processor都维护了一个单独的KSelector对象,这个KSelector只负责这个Processor上所有channel的监听。这样最大程度上保证了不同Processor线程之间的完全并行和业务隔离,尽管,在异步IO情况下,一个Selector负责成百上千个socketChannel的状态监控也不会带来效率问题。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    override def run() {
    startupComplete()//表示初始化流程已经结束,通过这个CountDownLatch代表初始化已经结束,这个Processor已经开始正常运行了
    while (isRunning) {
    try {
    // setup any new connections that have been queued up
    configureNewConnections()//为已经接受的请求注册OR_READ事件
    // register any new responses for writing
    processNewResponses()//处理响应队列,这个响应队列是Handler线程处理以后的结果,会交付给RequestChannel.responseQueue.同时调用unmute,开始接受请求
    poll() //调用KSelector.poll(),进行真正的数据读写
    processCompletedReceives()//调用mute,停止接受新的请求
    processCompletedSends()
    processDisconnected()
    } catch {
    //异常处理 略
    }

    debug("Closing selector - processor " + id)
    swallowError(closeAll())
    shutdownComplete()
    }
  • KSelector.register()方法,开始对远程客户端或者其它服务器的读请求(OP_READ)进行绑定和处理。KSelect.register()方法,会将服务端的SocketChannel注册到服务器端的nioSelector,并关注SelectionKey.OP_READ,即,如果发生读请求,可以取出对应的Channel进行处理。这里的Channel也是Kafka经过封装以后的KafkaChannel对象

    1
    2
    3
    4
    5
    6
    7
    public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
    //如果是SocketServer创建的这个对象并且是纯文本,则channelBuilder是@Code PlainTextChannelBuilder
    KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);//构造一个KafkaChannel
    key.attach(channel);//将KafkaChannel对象attach到这个registration,以后可以通过调用SelectionKey.attachment()获得这个对象
    this.channels.put(id, channel);//记录这个Channel
    }
  • Processor.processCompletedReceives()通过遍历completedReceives,对于每一个已经完成接收的数据,对数据进行解析和封装,交付给RequestChannel,RequestChannel会交付给具体的业务处理层进行处理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    /*
    * 将completedReceived中的对象进行封装,交付给requestQueue.completRequets
    */
    private def processCompletedReceives() {
    selector.completedReceives.asScala.foreach { receive =>//每一个receive是一个NetworkReceivedui'xiagn
    try {
    //receive.source代表了这个请求的发送者的身份,KSelector保存了channel另一端的身份和对应的SocketChannel之间的对应关系
    val channel = selector.channel(receive.source)
    val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
    channel.socketAddress)
    val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
    requestChannel.sendRequest(req)//将请求通过RequestChannel.requestQueue交付给Handler
    selector.mute(receive.source)//不再接受Read请求,发送响应之前,不可以再接收任何请求
    } catch {
    //异常处理 略
    }
    }
    }

img

  • 详情源码剖析请参考如下博客,讲解非常详细。

    1
    2
    https://blog.csdn.net/zhanyuanlin/article/details/76556578
    https://blog.csdn.net/zhanyuanlin/article/details/76906583
  • RequestChannel 负责消息从网络层转接到业务层,以及将业务层的处理结果交付给网络层进而返回给客户端。每一个SocketServer只有一个RequestChannel对象,在SocketServer中构造。RequestChannel构造方法中初始化了requestQueue,用来存放网络层接收到的请求,这些请求即将交付给业务层进行处理。同时,初始化了responseQueues,为每一个Processor建立了一个response队列,用来存放这个Processor的一个或者多个Response,这些response即将交付给网络层返回给客户端。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    //创建RequestChannel,有totalProcessorThreads个responseQueue队列,
    val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
    class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
    private var responseListeners: List[(Int) => Unit] = Nil
    //request存放了所有Processor接收到的远程请求,负责把requestQueue中的请求交付给具体业务逻辑进行处理
    private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
    //responseQueues存放了所有Processor的带出来的response,即每一个Processor都有一个response queue
    private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
    for(i <- 0 until numProcessors) //初始化responseQueues
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()

    //一些metrics用来监控request和response的数量,代码略
    }
  • KafkaApis是Kafka的API接口层,可以理解为一个工具类,职责就是解析请求然后获取请求类型,根据请求类型将请求交付给对应的业务层

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
      class KafkaRequestHandlerPool(val brokerId: Int,
    val requestChannel: RequestChannel,
    val apis: KafkaApis,
    numThreads: Int) extends Logging with KafkaMetricsGroup {

    /* a meter to track the average free capacity of the request handlers */
    private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

    this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
    val threads = new Array[Thread](numThreads)
    //初始化由KafkaRequestHandler线程构成的线程数组
    val runnables = new Array[KafkaRequestHandler](numThreads)
    for(i <- 0 until numThreads) {
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    threads(i).start()
    }
  • KafkaRequestHandler.run()方法,就是不断从requestQueue中取出请求,调用API层业务处理逻辑进行处理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    def run() {
    while(true) {
    try {
    var req : RequestChannel.Request = null
    while (req == null) {
    //略
    req = requestChannel.receiveRequest(300)//从RequestChannel.requestQueue中取出请求
    //略
    apis.handle(req)//调用KafkaApi.handle(),将请求交付给业务
    } catch {}
    }
    }

参数调优设置

  • numProcessorThreads:通过num.network.threads进行配置,单个Acceptor所管理的Processor对象的数量。
  • maxQueuedRequests:通过queued.max.requests进行配置,请求队列所允许的最大的未响应请求的数量,用来给ConnectionQuotas进行请求限额控制,避免Kafka Server产生过大的网络负载;
  • totalProcessorThreads:计算方式为numProcessorThreads * endpoints.size,即单台机器总的Processor的数量;
  • maxConnectionsPerIp:配置项为max.connections.per.ip,单个IP上的最大连接数,用来给ConnectionQuotas控制连接数;
  • num.io.threads:表示KafkaRequestHander实际从队列中获取请求进行执行的线程数,默认是8个。

总结

  • 通过Acceptor、Processor、RequestChannel、KafkaRequestHandler以及KafkaApis多个角色的解析,完成了整个Kafka的消息流通闭环,即从客户端建立连接、发送请求给Kafka Server的Acceptor进行处理,进一步交由Processor、Kafka Server将请求交付给KafkaRequestHandler具体业务进行处理、业务将处理结果返回给网络层、网络层将结果通过NIO返回给客户端。
  • 由于多Processor线程、以及KafkaRequestHandlerPoll线程池的存在,通过交付-获取的方式而不是阻塞等待的方式,让整个消息处理实现完全的异步化,各个角色各司其职,模块之间无耦合,线程之间或者相互竞争任务,或者被上层安排处理部分任务,整个效率非常高,结构也相当清晰