kafka集群Broker端基于Reactor模式请求处理流程
Reactor单线程案例代码
如下是单线程的JAVA NIO编程模型。
首先服务端创建ServerSocketChannel对象,并注册到Select上OP_ACCEPT事件,然后ServerSocketChannel负责监听指定端口上的连接请求。
客户端一旦连接上ServerSocketChannel,就会触发Acceptor来处理OP_ACCEPT事件,并为来自客户端的连接创建Socket Channel,并设置为非阻塞模式,并在其Selector上注册OP_READ或者OP_WRITE,最终实现客户端与服务端的连接建立和数据通道打通。
当客户端向建立的SocketChannel发送请求时,服务端的Selector就会监听到OP_READ事件,并触发相应的处理逻辑。当服务端向客户端写数据时,会触发服务端Selector的OP_WRITE事件,从而执行响应的处理逻辑。
这里有一个明显的问题,就是所有时间的处理逻辑都是在Acceptor单线程完成的,在并发连接数较小,数据量较小的场景下,是没有问题的,但是……
Selector 允许一个单一的线程来操作多个 Channel. 如果我们的应用程序中使用了多个 Channel, 那么使用 Selector 很方便的实现这样的目的, 但是因为在一个线程中使用了多个 Channel, 因此也会造成了每个 Channel 传输效率的降低.
优化点在于:通道连接|读取或写入|业务处理均采用单线程来处理。通过线程池或者MessageQueue共享队列,进一步优化了高并发的处理要求,这样就解决了同一时间出现大量I/O事件时,单独的Select就可能在分发事件时阻塞(或延时),而成为瓶颈的问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77public class NioEchoServer {
private static final int BUF_SIZE = 256;
private static final int TIMEOUT = 3000;
public static void main(String args[]) throws Exception {
// 打开服务端 Socket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 打开 Selector
Selector selector = Selector.open();
// 服务端 Socket 监听8080端口, 并配置为非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
// 将 channel 注册到 selector 中.
// 通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ
// 注册到 Selector 中.
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 通过调用 select 方法, 阻塞地等待 channel I/O 可操作
if (selector.select(TIMEOUT) == 0) {
System.out.print(".");
continue;
}
// 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪.
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
// 当获取一个 SelectionKey 后, 就要将它删除, 表示我们已经对这个 IO 事件进行了处理.
keyIterator.remove();
if (key.isAcceptable()) {
// 当 OP_ACCEPT 事件到来时, 我们就有从 ServerSocketChannel 中获取一个 SocketChannel,
// 代表客户端的连接
// 注意, 在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel.
// 而在 OP_WRITE 和 OP_READ 中, 从 key.channel() 返回的是 SocketChannel.
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
//在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中.
// 注意, 这里我们如果没有设置 OP_READ 的话, 即 interest set 仍然是 OP_CONNECT 的话, 那么 select 方法会一直直接返回.
clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
}
if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buf = (ByteBuffer) key.attachment();
long bytesRead = clientChannel.read(buf);
if (bytesRead == -1) {
clientChannel.close();
} else if (bytesRead > 0) {
key.interestOps(OP_READ | SelectionKey.OP_WRITE);
System.out.println("Get data length: " + bytesRead);
}
}
if (key.isValid() && key.isWritable()) {
ByteBuffer buf = (ByteBuffer) key.attachment();
buf.flip();
SocketChannel clientChannel = (SocketChannel) key.channel();
clientChannel.write(buf);
if (!buf.hasRemaining()) {
key.interestOps(OP_READ);
}
buf.compact();
}
}
}
}
}
Kafka Reactor模式设计思路
SelectionKey.OP_READ:Socket 读事件,以从远程发送过来了相应数据
SelectionKey.OP_WRITE:Socket写事件,即向远程发送数据
SelectionKey.OP_CONNECT:Socket连接事件,用来客户端同远程Server建立连接的时候注册到Selector,当连接建立以后,即对应的SocketChannel已经准备好了,用户可以从对应的key上取出SocketChannel.
SelectionKey.OP_ACCEPT:Socket连接接受事件,用来服务器端通过ServerSocketChannel绑定了对某个端口的监听,然后会让其SocketChannel对应的socket注册到服务端的Selector上,并关注该OP_ACCEPT事件。
Kafka的网络层入口类是SocketServer。我们知道,kafka.Kafka是Kafka Broker的入口类,kafka.Kafka.main()是Kafka Server的main()方法,即Kafka Broker的启动入口。我们跟踪代码,即沿着方法调用栈kafka.Kafka.main() -> KafkaServerStartable() -> KafkaServer().startup可以从main()方法入口一直跟踪到SocketServer即网络层对象的创建,这意味着Kafka Server启动的时候会初始化并启动SocketServer。
Acceptor的构造方法中,首先通过openServerSocket()打开自己负责的EndPoint的Socket,即打开端口并启动监听。然后,Acceptor会负责构造自己管理的一个或者多个Processor对象。其实,每一个Processor都是一个独立线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17private[kafka] class Acceptor(val endPoint: EndPoint,
val sendBufferSize: Int,
val recvBufferSize: Int,
brokerId: Int,
processors: Array[Processor],
connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
private val nioSelector = NSelector.open()
val serverChannel = openServerSocket(endPoint.host, endPoint.port)//创建一个ServerSocketChannel,监听endPoint.host, endPoint.port套接字
//Acceptor被构造的时候就会启动所有的processor线程
this.synchronized {
//每个processor创建一个单独线程
processors.foreach { processor =>
Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
}
}Acceptor线程的run()方法,是不断监听对应ServerChannel上的连接请求,如果有新的连接请求,就选择出一个Processor,用来处理这个请求,将这个新连接交付给Processor是在方法Acceptor.accept()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19def accept(key: SelectionKey, processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]//取出channel
val socketChannel = serverSocketChannel.accept()//创建socketChannel,专门负责与这个客户端的连接
try {
//socketChannel参数设置
processor.accept(socketChannel)//将SocketChannel交给process进行处理
} catch {
//异常处理
}
}
//Processor.accept():
/**
* Queue up a new connection for reading
*/
def accept(socketChannel: SocketChannel) {
newConnections.add(socketChannel)
wakeup()
}每一个Processor都维护了一个单独的KSelector对象,这个KSelector只负责这个Processor上所有channel的监听。这样最大程度上保证了不同Processor线程之间的完全并行和业务隔离,尽管,在异步IO情况下,一个Selector负责成百上千个socketChannel的状态监控也不会带来效率问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20override def run() {
startupComplete()//表示初始化流程已经结束,通过这个CountDownLatch代表初始化已经结束,这个Processor已经开始正常运行了
while (isRunning) {
try {
// setup any new connections that have been queued up
configureNewConnections()//为已经接受的请求注册OR_READ事件
// register any new responses for writing
processNewResponses()//处理响应队列,这个响应队列是Handler线程处理以后的结果,会交付给RequestChannel.responseQueue.同时调用unmute,开始接受请求
poll() //调用KSelector.poll(),进行真正的数据读写
processCompletedReceives()//调用mute,停止接受新的请求
processCompletedSends()
processDisconnected()
} catch {
//异常处理 略
}
debug("Closing selector - processor " + id)
swallowError(closeAll())
shutdownComplete()
}KSelector.register()方法,开始对远程客户端或者其它服务器的读请求(OP_READ)进行绑定和处理。KSelect.register()方法,会将服务端的SocketChannel注册到服务器端的nioSelector,并关注SelectionKey.OP_READ,即,如果发生读请求,可以取出对应的Channel进行处理。这里的Channel也是Kafka经过封装以后的KafkaChannel对象
1
2
3
4
5
6
7public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
//如果是SocketServer创建的这个对象并且是纯文本,则channelBuilder是@Code PlainTextChannelBuilder
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);//构造一个KafkaChannel
key.attach(channel);//将KafkaChannel对象attach到这个registration,以后可以通过调用SelectionKey.attachment()获得这个对象
this.channels.put(id, channel);//记录这个Channel
}Processor.processCompletedReceives()通过遍历completedReceives,对于每一个已经完成接收的数据,对数据进行解析和封装,交付给RequestChannel,RequestChannel会交付给具体的业务处理层进行处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18/*
* 将completedReceived中的对象进行封装,交付给requestQueue.completRequets
*/
private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>//每一个receive是一个NetworkReceivedui'xiagn
try {
//receive.source代表了这个请求的发送者的身份,KSelector保存了channel另一端的身份和对应的SocketChannel之间的对应关系
val channel = selector.channel(receive.source)
val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
channel.socketAddress)
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
requestChannel.sendRequest(req)//将请求通过RequestChannel.requestQueue交付给Handler
selector.mute(receive.source)//不再接受Read请求,发送响应之前,不可以再接收任何请求
} catch {
//异常处理 略
}
}
}
详情源码剖析请参考如下博客,讲解非常详细。
1
2https://blog.csdn.net/zhanyuanlin/article/details/76556578
https://blog.csdn.net/zhanyuanlin/article/details/76906583RequestChannel 负责消息从网络层转接到业务层,以及将业务层的处理结果交付给网络层进而返回给客户端。每一个SocketServer只有一个RequestChannel对象,在SocketServer中构造。RequestChannel构造方法中初始化了requestQueue,用来存放网络层接收到的请求,这些请求即将交付给业务层进行处理。同时,初始化了responseQueues,为每一个Processor建立了一个response队列,用来存放这个Processor的一个或者多个Response,这些response即将交付给网络层返回给客户端。
1
2
3
4
5
6
7
8
9
10
11
12
13//创建RequestChannel,有totalProcessorThreads个responseQueue队列,
val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
private var responseListeners: List[(Int) => Unit] = Nil
//request存放了所有Processor接收到的远程请求,负责把requestQueue中的请求交付给具体业务逻辑进行处理
private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
//responseQueues存放了所有Processor的带出来的response,即每一个Processor都有一个response queue
private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
for(i <- 0 until numProcessors) //初始化responseQueues
responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
//一些metrics用来监控request和response的数量,代码略
}KafkaApis是Kafka的API接口层,可以理解为一个工具类,职责就是解析请求然后获取请求类型,根据请求类型将请求交付给对应的业务层
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
numThreads: Int) extends Logging with KafkaMetricsGroup {
/* a meter to track the average free capacity of the request handlers */
private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
val threads = new Array[Thread](numThreads)
//初始化由KafkaRequestHandler线程构成的线程数组
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {
runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
threads(i).start()
}KafkaRequestHandler.run()方法,就是不断从requestQueue中取出请求,调用API层业务处理逻辑进行处理
1
2
3
4
5
6
7
8
9
10
11
12def run() {
while(true) {
try {
var req : RequestChannel.Request = null
while (req == null) {
//略
req = requestChannel.receiveRequest(300)//从RequestChannel.requestQueue中取出请求
//略
apis.handle(req)//调用KafkaApi.handle(),将请求交付给业务
} catch {}
}
}
参数调优设置
- numProcessorThreads:通过num.network.threads进行配置,单个Acceptor所管理的Processor对象的数量。
- maxQueuedRequests:通过queued.max.requests进行配置,请求队列所允许的最大的未响应请求的数量,用来给ConnectionQuotas进行请求限额控制,避免Kafka Server产生过大的网络负载;
- totalProcessorThreads:计算方式为numProcessorThreads * endpoints.size,即单台机器总的Processor的数量;
- maxConnectionsPerIp:配置项为max.connections.per.ip,单个IP上的最大连接数,用来给ConnectionQuotas控制连接数;
- num.io.threads:表示KafkaRequestHander实际从队列中获取请求进行执行的线程数,默认是8个。
总结
- 通过Acceptor、Processor、RequestChannel、KafkaRequestHandler以及KafkaApis多个角色的解析,完成了整个Kafka的消息流通闭环,即从客户端建立连接、发送请求给Kafka Server的Acceptor进行处理,进一步交由Processor、Kafka Server将请求交付给KafkaRequestHandler具体业务进行处理、业务将处理结果返回给网络层、网络层将结果通过NIO返回给客户端。
- 由于多Processor线程、以及KafkaRequestHandlerPoll线程池的存在,通过交付-获取的方式而不是阻塞等待的方式,让整个消息处理实现完全的异步化,各个角色各司其职,模块之间无耦合,线程之间或者相互竞争任务,或者被上层安排处理部分任务,整个效率非常高,结构也相当清晰
- 本文链接:https://gjtmaster.github.io/2018/09/20/kafka集群Broker端基于Reactor模式请求处理流程/
- 版权声明:The author owns the copyright, please indicate the source reproduced.