当我们使用消费者在调用服务提供者的时候,会经过一系列的Filter链,注意,这个Filter链在服务提供者端有,在服务调用者端也是有的,可以根据下面这个图理解下,大体是这个样子的。
首先我们先认识一下Filter接口
接着就到RegistryProtocol的export方法
这里就是从bounds这个缓存中获取exporter,如果没有就创建,然后加到缓存中,我们着重看下画红框那行,其实这个Protocol根据自适应的规则,就是DubboProtocol了,它跟RegistryProtocol一样,也是被wrapper包装了,然而不一样的是在ProtocolFilterWrapper的export方法中走的是buildInvokerChain方法,这个方法就是生成Filter链的。
服务调用者其实跟服务提供者的生成链的时机差不多,当注册中心的providers,configurators,routers发生变化的时候,就会通知RegistryDirectory的notify方法,然后就需要将url转换成invoker,在转换invoker的时候,有这么一行代码(出自com.alibaba.dubbo.registry.integration.RegistryDirectory#toInvokers)
我们在上面两个小节已经将ProtocolFilterWrapper的refer与export方法讲解了,接下来就是buildInvokerChain方法,我们重点看下子
调用的时候就是从上面的那个invoker对象调用下来,然后到filter,然后又到invoker,又到filter,直到真实的那个invoker,其中,filter在list中下标越大的越靠里,也就是在调用的时候这个优先级就越低。这是invoke方法的调用流程,你要是其他方法的话直接就是调用的真实invoker的。
我们这边稍微讲一下获取filter列表,咱们这边以调用者端为例子
这是dubbov3里面服务提供者的filters,调用顺序是这个样子的:echo->classloader->generic->context->exception->moniter->timeout->trace
我这里引用官方的解释:
回声测试用于检测服务是否可用,回声测试按照正常请求流程执行,能够测试整个调用是否通畅,可用于监控。
我们可以看出来这个回声测试其实就是检测服务是否可用的,看看调用是否畅通。
我这里使用注解的方式使用一下这个回声测试。
我这里有一个服务提供者,然后暴露接口IHelloProviderService,
publicinterfaceIHelloProviderService{StringgetName(Integerid);}实现类可以随便写写,别忘了加dubbo的@Service注解就行
@org.springframework.stereotype.Service@ServicepublicclassIHelloProviderServiceImplimplementsIHelloProviderService{@OverridepublicStringgetName(Integerid){return"test";}}2.服务调用者编码服务调用者这边使用@Reference注解来生成接口的代理。
@RestControllerpublicclassTestController{@Reference(check=false,interfaceClass=IHelloProviderService.class)privateEchoServiceechoService;@RequestMapping("/tests")publicStringtest(){Objectaaa=echoService.$echo("aaa");System.out.println(aaa);returnnull;}}我这里使用的是EchoService类型,然后代理的接口是IHelloProviderService,可以看下这个EchoService,这个接口是dubbo提供的,就一个echo方法,然后在生成代理类的时候代理类实现业务接口的同时也会实现这个接口,重写里面echo方法,然后在生成代理类的时候代理类实现业务接口的同时也会实现这个接口,重写里面echo方法,然后在生成代理类的时候代理类实现业务接口的同时也会实现这个接口,重写里面echo方法,所有我们在获取这个接口实现类的时候可以在EchoService类型与你的业务接口类型间相互转换,这里使用注解的方式可能不好理解,使用xml的方式会好理解些。关于这个实现原理我们在后面解析的时候会说到。
publicinterfaceEchoService{/***echotest.**@parammessagemessage.*@returnmessage.*/Object$echo(Objectmessage);}4.1.3调用测试使用postman或者浏览器请求一下,然后可以看到打印出来我们请求的参数,这里dubbo就是将你请求参数原封不动返回给你。
在服务调用者端,我们在生成某个接口的代理类的时候,dubbo会自动给你的代理类implements一个EchoService接口,然后重写里面的$ehco()方法,多说无益,直接看下dubbo生成的代理类(这里我们还是将之前讲解Proxy的那张图拿过来)。
服务提供者这端是使用的Filter进行拦截的,dubbo在服务提供者端提供了一个EchoFilter来处理回声调用,我们可以看下这个Filter的源码。
在invoke方法中,首先判断,如果调用方法名字是“$echo”,然后参数不是null,只有一个参数,他就认为是个回声测试调用,直接封装Result将你传过来的参数返回。
dubbo提供了一个调用上下文用来给服务提供者或者服务调用者传递一些信息,它只会在当前这个调用链上生效,比如说我服务调用者的某个方法中有两个rpc调用,然后分别调用同一个或者不同的服务提供者的某个方法,这个两个rpc调用就会有两个context,你可以往对应的context中塞东西,然后对应的服务提供者就能够根据context获取到,同理,服务提供者也可以给服务调用着带东西,对应的服务调用者也能够获得到。
我们先来看下这个RpcContext。
我们这里就使用一下这个RpcContext来帮我们干点事情。这里我们在调用者端使用RpcContext给服务提供者端带点东西,同时服务提供者端回带点东西给服务调用者端。演示如下:
publicinterfaceIHelloProviderService{StringgetName(Integerid);}服务调用者端@RestControllerpublicclassTestController{@Reference(check=false)privateIHelloProviderServiceiHelloProviderService;@RequestMapping("/tests")publicStringtest(){//往context添加点东西,带给服务调用者端RpcContext.getContext().getAttachments().put("aaa","ok");Stringname=iHelloProviderService.getName(1);//从服务调用者端context获取带过来的信息StringremoteData=RpcContext.getServerContext().getAttachments().get("bbb");System.out.println("我从提供者端获得的数据:"+remoteData);returnnull;}}服务调用者端很简单,就是在调用前获取context,塞了个kv进去,在调用后获取serverContext获取了一下服务提供者给带过来的信息。
我这里就是接口的实现类,我们从context中获取服务调用者带过来的信息,同时往serverContext中塞了kv带给服务调用者。
@org.springframework.stereotype.Service@ServicepublicclassIHelloProviderServiceImplimplementsIHelloProviderService{@OverridepublicStringgetName(Integerid){StringremoteData=RpcContext.getContext().getAttachments().get("aaa");System.out.println("我从服务调用着端获得的数据是:"+remoteData);RpcContext.getServerContext().getAttachments().put("bbb","ccc");return"test";}}4.2.3测试我们这里测试一下,然后从服务提供者日志中可以看到
在我们普通的开发中,RpcContext基本用不到,但是在某些框架在做dubbo兼容适配的时候会用到,我这里举两个例子,分布式链路追踪是怎样将traceId,spanId,pId传到服务调用者的,其实就是用了这个context。再就是分布式事务是怎样将全局事务id传到子事务的,其实还是用这个传的,只不过他们在做的时候优雅一下,不会明着调用,可能会使用Filter来往里面塞,也可能是aop的形式往里面塞。
在上面的小节中我们讲解了RpcContext,还用RpcContext在服务调用者与服务提供者做了信息传递。那么这个功能是怎样实现的呢,其实很简单。首先我们得了解一下Rpc调用传输的实体RpcInvocation与Result,在进行服务调用者调用往服务提供者端发送请求就是发送的Request对象,将Request对象序列化成二进制,然后传到服务提供者端,服务提供者端就会将二进制反序列化成Request对象,这个request对象中包的就是这个RpcInvocation(调用实体),然后服务提供者进行对应方法调用(对应方法参数其实都在RpcInvocation实体中封装着)将执行结果放到RpcResult实体中,然后包上一层Response,序列化成二进制发送给服务调用者端,服务调用者端反序列化成Response,再将Response这层皮拨开,取出RpcResult,这个RpcResult就封装着调用结果集。其实不管是RpcInvocation还是RpcResult里面都有共同的成员变量,那就是attachments,这个就是存放附加参数的,我们上面塞到Context中的数据,其实都会被封装到这个成员中传输。我们可以看到,他其实就是个map
那么怎么从RpcContext中封装到这两个实体中的呢?其实这里是用了dubboFilter来做的,在服务调用着端有个ConsumerContextFilter过滤器,专门将RpcResult中的attachments塞到本地的RpcServerContext中,最后会清了本次调用context中的attachments,我们来看下实现:
ExceptionFilter的group=provider,也就是在服务提供者端才激活的Filter,它的内容也很简单,我们可以看下源代码:
在讲解TraceFilter之前我们需要演示一下dubbotelnet服务治理命令trace。
这时候你再回下车,接着我就输入trace命令了,dubbo3不支持trace命令,以下借鉴于dubbo2.6的版本。
tracecom.xuzhaocai.dubbo.provider.IHelloProviderServicegetName3注意:com.xuzhaocai.dubbo.provider.IHelloProviderService这个是接口,getName方法名,3这个是监听次数,监听3次就不监听了
它的使用也就是这个样子的:trace[service][method][times]如果你不写method直接就是监听这个接口的任意一个方法times这个监听次数,你不写就是1次。我们来看下我的监听截图
接下来我们就看下这个TraceFilter,首先看下添加tracer与移除tracer方法
首先是往channel塞了trace.max与trace.count两个属性值,这个trace.max就是最大监听次数,trace.count就是计数器,记录调用了几次了。接着就是拼装key,如果method没有的话就使用type的全类名,如果有method话就是全类名.方法名,先从tracers这个map里面查找有没有对应的channels,没有找到就创建set集合,最后添加到这个集合中。
接下来我们就要看下invoke方法了
最后就是将执行结果result返回了。
好了,以上就是TraceFilter的全部的,还是很简单的。
引用dubbo官方文档的介绍:
我们可以了解到,这个token就是防止服务消费者绕过注册中心去访问服务提供者,同时能够使用注册中心进行权限控制。dubbo官网关于token的流程图,我们拿过来分析下:
我们这里主要解析令牌验证的验证阶段,可以看下TokenFilter源码:
我们在调用者端配置dubbo:refrence的时候有一个dubbo调优的参数actives。
我这里直接从官网拿过来了,我们可以看下描述这一栏,每服务消费者每服务每方法最大并发调用数这个可能不好理解,我画了张图大家可以理解下:
这个所谓的每服务消费者每服务每方法最大并发调用数其实是某个服务调用者调用同一个服务提供者(同一个服务提供者实例,也就是上图三个箭头指向的同一个ip+port下的实例)的同一个接口的同一个方法的(可以看到上图三个箭头都调用的com.xxx.UserSerivce.getUserName()方法)并发数(上图的actives=3表示在同一时刻最多3个调用,然后多出来的调用只能等待)
我们看下这个actives属性的配置使用:
@Activate(group=Constants.CONSUMER,value=Constants.ACTIVES_KEY)publicclassActiveLimitFilterimplementsFilter{可以看出来这个ActiveLimitFilter只能用于服务调用者端,而且需要配置actives属性才能激活。接着再看下invoke方法:
RpcStatuscount=RpcStatus.getStatus(invoker.getUrl(),invocation.getMethodName());对应的RpcStatus的getStatus方法源码:
我们再来看下第二部分,该部分上面说过主要是做并发数的控制:
如果超过了并发控制数,我们一直在这里循坏等待,知道不超过时才会调用服务提供者的方法。
第三部分,主要是正常响应和异常的处理逻辑。这里会将当前的激活数减1,并唤醒所有挂起线程。
首先介绍下Monitor,Monitor字面是监控的意思,在dubbo中可以通过Monitor实时监控(不能算是实时,你要是将上报周期缩小可以接近实时)到服务的调用情况(粒度为方法级别),包括请求成功数,请求失败数,吞吐量,当前并发数,响应时长等等。
而Monitor这些调用统计指标是MonitorFilter来采集的,MonitorFilter就相当于咱们的source。当我们配置了monitor,这个MonitorFilter就会在服务调用端或者服务提供端进行数据采集,然后上报给对应的monitor对象。xml配置示例:
@Activate(group={PROVIDER})publicclassMonitorFilterimplementsFilter,Filter.Listener{我们看到MonitorFilter适应服务提供者端,接下来看下它的成员变量:
自增1表示当前并发数+1,接着就是进行真正调用了,之后调用collect收集方法,该方法最后一个参数表示是否异常,正常调用的话就是false,出现异常的话就是true。最后调用getConcurrent方法获取计数器自减1,因为这时当前这次调用就结束了,并发数也就是减1。接下来我们看下collect这个收集方法。
这个collect方法看起来比较长,实则内容比较简单,我把它分为了两部分,上面主要是一些调用信息与统计指标的收集工作,下面就是拼装这些调用信息url,然后调用对应monitor的collect方法进行收集。
先看下上面收集调用信息与指标统计工作:通过start算出调用耗时elapsed,通过getConcurrent方法获取当前方法一个并发数concurrent,下面获取接口名,方法名我就不解释了,后面通过url获取monitorUrl,接着使用监控工厂获取monitorUrl对应的monitor对象,在往后就是根据端来获取远端key与远端地址,有个获取input与output参数值需要说下,这两个属性值其实是在序列化层(在DubboCountCodec中)设置进去的。
下面这部分其实就是一行monitor.collect(url),使用monitor对象进行收集,只不过这个url是在这现拼装成的。可以看出来设置比较常规的参数外还设置了成功或失败,调用耗时,input,output,并发数这些调用统计指标。
什么是泛化调用?我个人是这样理解的,服务调用者不需要引用服务接口就能够调用服务提供者,我们用过dubbo的都知道,在开发的时候服务双方需要引用相同的接口,这种其实依赖性还是比较强,但是有某些场景使用共同接口的开发方式并不方便,dubbo提供了泛化调用的功能,我们服务调用端不需要定义共同的情况下就可以调用服务提供者。
我们这里简单演示下泛化调用的使用,这里使用的spring的方式,如果想体验下api的方式可以访问dubbo官网,官网提供了spring与api的实例。
服务提供者定义接口,然后书写实现类实现接口,并把进行服务暴露。服务提供者端就按照正常的开发流程开发就行。接口:
publicinterfaceIUserInfoService{publicStringgetUserName(Integerid);}实现类:
@org.springframework.stereotype.Service@ServicepublicclassIUserInfoServiceImplimplementsIUserInfoService{@OverridepublicStringgetUserName(Integerid){return"0";}}服务调用者之前开发服务调用者,需要引用服务提供者共同的接口,咱们这里泛化调用就不需要了,dubbo给我们提供了一个泛化接口GenericService,我们在使用的时候直接注入就可以了。咱们这里先配置一下:
我们来使用postman测试下
这个要从服务调用端的服务引用说起,首先在ReferenceConfig的checkAndUpdateSubConfigs方法中,有这么一段代码:
//如果泛化参数为true,将interfaceClass设置成GenericService.classif(ProtocolUtils.isGeneric(generic)){interfaceClass=GenericService.class;}else{try{interfaceClass=Class.forName(interfaceName,true,Thread.currentThread().getContextClassLoader());}catch(ClassNotFoundExceptione){thrownewIllegalStateException(e.getMessage(),e);}//checkInterfaceAndMethods(interfaceClass,getMethods());}如果是泛化调用,就将interfaceClass设置成GenericService.class,这时候interfaceName属性还是咱们那个配置的那个接口,在上面案例中就是com.hsf.dubbo.provider.IUserInfoService,接着在一个收集信息的map添加interface=com.hsf.dubbo.provider.IUserInfoService键值对。
接着当注册中心变动的时候会通知RegistryDirectory进行toInvokers,在newDubboInvoker的构造中,有这么一段代码,
我们在看下super的处理:
我们在服务调用端invoke的时候会调用到一个GenericImplFilter,我们看下源码实现