这个类是网络通信的核心类,它持有这Acceptor和Processor对象。
这个是控制连接数配额的类,
涉及到的Broker配置有:
AbstractServerThread类:这是Acceptor线程和Processor线程的抽象基类,它定义了一个抽象方法wakeup(),主要是用来唤醒Acceptor线程和Processor对应的Selector的,当然还有一些共用方法
Acceptor线程类:继承自AbstractServerThread,这是接收和创建外部TCP连接的线程。每个SocketServer实例一般会创建一个Acceptor线程(如果listeners配置了多个就会创建多个Acceptor)。它的唯一目的就是创建连接,并将接收到的SocketChannel(SocketChannel通道用于传输数据)传递给下游的Processor线程处理,Processor主要是处理连接之后的事情,例如读写I/O。
Processor线程类:这是处理单个TCP连接上所有请求的处理线程。每个Acceptor实例创建若干个(num.network.threads)Processor线程。Processor线程负责将接收到的SocketChannel(SocketChannel通道用于传输数据。),注册读写事件,当数据传送过来的时候,会立即读取Request数据,通过解析之后,然后将其添加到RequestChannel的requestQueue队列上,同时还负责将Response返还给Request发送方。
简单画了一张两个类之间的关系图
既然两个都是可执行线程,那我们看看两个线程的run方法都做了哪些事情
//阻塞查询Selector是否有监听到新的事件valready=nioSelector.select(500)//如果有事件,则查询具体的事件和通道if(ready>0>{//获取所有就绪事件准备处理valkeys=nioSelector.selectedKeys()}3、遍历刚刚监听到的事件,如果该SelectionKey不包含OP_ACCEPT(建立连接)事件,则抛出异常,通常不会出现这个异常。
Unrecognizedkeystateforacceptorthread4、如果SelectionKey包含OP_ACCEPT(建立连接)事件,则可以通过这个SelectionKey拿到serverSocketChannel,通过serverSocketChannel拿到socketChannel,并且将SocketChannel设置为非阻塞模式。
valserverSocketChannel=key.channel().asInstanceOf[ServerSocketChannel]//调用accept方法就可以拿到ScoketChannel了。valsocketChannel=serverSocketChannel.accept()//设置为非阻塞模式就可以在异步模式下调用connect(),read()和write()了。socketChannel.configureBlocking(false)5、接下来,把上面拿到的SocketChannel以遍历的形式给Acceptor下面的Procesor,让Processor来执行后面的处理。分配的体现形式是,将拿到的SocketChannel保存在Processor中的newConnections阻塞队列中,这个newConnections上限是20,在代码里面写死了的,也就是说一个Processor同时最多只能处理20个连接,那么所有的Processor能处理的最大连接就是Processor数量*20;如果你的连接请求并发度很高,可以尝试调大num.network.threads
6、最后,如果newConnections队列放入了一个新的SocketChannel,则会调用一下对应Processor实例的wakeup()方法。
具体Request的处理类,所有的请求方法处理逻辑都放在这个里面。
KafkaRequestHandler的线程池,KafkaRequestHandler线程的数量由配置num.io.threads决定。
请求处理类,每个Handler都会去requestChannel的requestQueue队列里面poll请求,然后去处理,最终调用的处理方法是KafkaApis.handle()
这几个类之间的关系如下
数据面板是用来处理Broker与Broker/Client之间的网络模型模块,与之相对的是控制器面板。
控制器面板是专门用于Controller与Broker之间的网络通信模块。
其实本质上他们都是一模一样的,但是为了将Controller的通信和普通通信隔离,才有这么两个概念。
上面的网络通信模型就是以数据面板来分析的,因为本质是一样的,只是有一些配置不一样。
那么,数据面板就不详细讲了,我们主要讲下控制器面板的不一样的地方。
首先,要启用控制器面板,必须配置control.plane.listener.name.并且这个监听器名称必须在listeners里面有配置
否则的话,是不会专用的控制器链接的EndPoint的。
例如:
##所有的监听器isteners=INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093,CONTROLLER://192.1.1.8:9094##监听器对应的安全协议listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL,CONTROLLER:SSL##控制器control.plane.listener.name=CONTROLLER在启动时,代理将开始使用安全协议“SSL”监听“192.1.1.8:9094”。
在控制器端,当它通过zookeeper发现代理发布的端点时,它将使用control.plane.listener.name找到端点,它将用于建立与代理的连接。
上面我们主要分析了一下,Kafka中的网络通信模型,那么聪明的你应该肯定能够看的出来,它是使用线程模型中的Reactor模式来实现的。
Reactor模式,是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。
服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor模式也叫Dispatcher模式。即I/O多路复用统一监听事件,收到事件后分发(Dispatch给某进程),是编写高性能网络服务器的必备技术之一。
根据Reactor的数量和处理资源池线程的数量不同,有3种典型的实现:
我们主要了解一下主从Reactor多线程
针对单Reactor多线程模型中,Reactor在单线程中运行,高并发场景下容易成为性能瓶颈,可以让Reactor在多线程中运行。
方案说明:
(1)Kafka的网络模型使用了Reactor模式的哪种实现方式?
答案:3。使用了主从Reactor多线程的实现方式.
MainReactor(Acceptor)只负责监听OP_ACCEPT事件,监听到之后把SocketChannel传递给SubReactor(Processor),每个Processor都有自己的Selector。SubReactor会监听并处理其他的事件,并最终把具体的请求传递给KafkaRequestHandlerPool。
很典型的主从Reactor多线程模式。
(2)什么是ControllerPlane(控制器面板),什么是DataPlane(数据面板)
控制器面板:主要处理控制器类的的请求数据面板:主要处理数据类的请求。