dubbo源码解析-服务暴露与发现
概述
dubbo是一个简单易用的RPC框架,通过简单的提供者,消费者配置就能完成无感的网络调用。那么在dubbo中是如何将提供者的服务暴露出去,消费者又是如何获取到提供者相关信息的呢?这就是本章我们要讨论的内容。
dubbo与spring的整合
在了解dubbo的服务注册和服务发现之前,我们首先需要掌握一个知识点:Spring中自定义Schema。
Spring自定义Schema
Dubbo 现在的设计是完全无侵入,也就是使用者只依赖于配置契约。在 Dubbo 中,可以使用 XML 配置相关信息,也可以用来引入服务或者导出服务。配置完成,启动工程,Spring 会读取配置文件,生成注入相关Bean。那 Dubbo 如何实现自定义 XML 被 Spring 加载读取呢?
从 Spring 2.0 开始,Spring 开始提供了一种基于 XML Schema 格式扩展机制,用于定义和配置 bean。
入门案例
学习和使用Spring XML Schema 扩展机制并不难,需要下面几个步骤:
1. 创建配置属性的JavaBean对象
2. 创建一个 XML Schema 文件,描述自定义的合法构建模块,也就是xsd文件。
3. 自定义处理器类,并实现`NamespaceHandler`接口。
4. 自定义解析器,实现`BeanDefinitionParser`接口(最关键的部分)。
5. 编写Spring.handlers和Spring.schemas文件配置所有部件
定义JavaBean对象,在spring中此对象会根据配置自动创建
publicclassUser{privateStringid;privateStringname;privateIntegerage;//省略gettersetter方法}
在META-INF下定义`user.xsd`文件,使用xsd用于描述标签的规则
<?xmlversion="1.0"encoding="UTF-8"?><xsd:schemaxmlns="http://www.itheima.com/schema/user"xmlns:xsd="http://www.w3.org/2001/XMLSchema"xmlns:beans="http://www.springframework.org/schema/beans"targetNamespace="http://www.itheima.com/schema/user"elementFormDefault="qualified"attributeFormDefault="unqualified"><xsd:importnamespace="http://www.springframework.org/schema/beans"/><xsd:elementname="user"><xsd:complexType><xsd:complexContent><xsd:extensionbase="beans:identifiedType"><xsd:attributename="name"type="xsd:string"/><xsd:attributename="age"type="xsd:int"/></xsd:extension></xsd:complexContent></xsd:complexType></xsd:element></xsd:schema>
Spring读取xml文件时,会根据标签的命名空间找到其对应的NamespaceHandler,我们在NamespaceHandler内会注册标签对应的解析器BeanDefinitionParser。
packagecom.itheima.schema;importorg.springframework.beans.factory.xml.NamespaceHandlerSupport;publicclassUserNamespaceHandlerextendsNamespaceHandlerSupport{publicvoidinit(){registerBeanDefinitionParser("user",newUserBeanDefinitionParser());}}
BeanDefinitionParser是标签对应的解析器,Spring读取到对应标签时会使用该类进行解析;
publicclassUserBeanDefinitionParserextendsAbstractSingleBeanDefinitionParser{protectedClassgetBeanClass(Elementelement){returnUser.class;}protectedvoiddoParse(Elementelement,BeanDefinitionBuilderbean){Stringname=element.getAttribute("name");Stringage=element.getAttribute("age");Stringid=element.getAttribute("id");if(StringUtils.hasText(id)){bean.addPropertyValue("id",id);}if(StringUtils.hasText(name)){bean.addPropertyValue("name",name);}if(StringUtils.hasText(age)){bean.addPropertyValue("age",Integer.valueOf(age));}}}
定义spring.handlers文件,内部保存命名空间与NamespaceHandler类的对应关系;必须放在classpath下的META-INF文件夹中。
```proprties
http\://www.itheima.com/schema/user=com.itheima.schema.UserNamespaceHandler
```
定义spring.schemas文件,内部保存命名空间对应的xsd文件位置;必须放在classpath下的META-INF文件夹中。
http\://www.itheima.com/schema/user.xsd=META-INF/user.xsd
代码准备好了之后,就可以在spring工程中进行使用和测试,定义spring配置文件,导入对应约束。
<?xmlversion="1.0"encoding="UTF-8"?><beansxmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:util="http://www.springframework.org/schema/util"xmlns:task="http://www.springframework.org/schema/task"xmlns:aop="http://www.springframework.org/schema/aop"xmlns:tx="http://www.springframework.org/schema/tx"xmlns:itheima="http://www.itheima.com/schema/user"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/utilhttp://www.springframework.org/schema/util/spring-util.xsdhttp://www.springframework.org/schema/taskhttp://www.springframework.org/schema/task/spring-task.xsdhttp://www.springframework.org/schema/aophttp://www.springframework.org/schema/aop/spring-aop.xsdhttp://www.springframework.org/schema/txhttp://www.springframework.org/schema/tx/spring-tx.xsdhttp://www.itheima.com/schema/userhttp://www.itheima.com/schema/user.xsd"><itheima:userid="user"name="zhangsan"age="12"></itheima:user></beans>
编写测试类,通过spring容器获取对象user
publicclassSchemaDemo{publicstaticvoidmain(String[]args){ApplicationContextctx=newClassPathXmlApplicationContext("/spring/applicationContext.xml");Useruser=(User)ctx.getBean("user");System.out.println(user);}}
dubbo中的相关对象
Dubbo是运行在spring容器中,dubbo的配置文件也是通过spring的配置文件applicationContext.xml来加载,所以dubbo的自定义配置标签实现,其实同样依赖spring的xml schema机制
可以看出Dubbo所有的组件都是由`DubboBeanDefinitionParser`解析,并通过registerBeanDefinitionParser方法来注册到spring中最后解析对应的对象。这些对象中我们重点关注的有以下两个:
ServiceBean:服务提供者暴露服务的核心对象
ReferenceBean:服务消费者发现服务的核心对象
RegistryConfig:定义注册中心的核心配置对象
服务暴露
前面主要探讨了 Dubbo 中 schema 、 XML 的相关原理 , 这些内容对理解框架整体至关重要 , 在此基础上我们继续探讨服务是如何依靠前面的配置进行服务暴露。
名词解释
在 Dubbo 的核心领域模型中:
Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。在服务提供方,Invoker用于调用服务提供类。在服务消费方,Invoker用于执行远程调用。
- Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理。
- export:暴露远程服务
- refer:引用远程服务
- proxyFactory:获取一个接口的代理类
- getInvoker:针对server端,将服务对象,如DemoServiceImpl包装成一个Invoker对象
- getProxy:针对client端,创建接口的代理对象,例如DemoService的接口。
- Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等
整体流程
在详细探讨服务暴露细节之前 , 我们先看一下整体duubo的服务暴露原理
在整体上看,Dubbo 框架做服务暴露分为两大部分 , 第一步将持有的服务实例通过代理转换成 Invoker, 第二步会把 Invoker 通过具体的协议 ( 比如 Dubbo ) 转换成 Exporter, 框架做了这层抽象也大大方便了功能扩展 。
服务提供方暴露服务的蓝色初始化链,时序图如下:
源码分析
(1) 导出入口
服务导出的入口方法是 ServiceBean 的 onApplicationEvent。onApplicationEvent 是一个事件响应方法,该方法会在收到 Spring 上下文刷新事件后执行服务导出操作。方法代码如下:
publicvoidonApplicationEvent(ContextRefreshedEventevent){//是否有延迟导出&&是否已导出&&是不是已被取消导出if(isDelay()&&!isExported()&&!isUnexported()){//导出服务export();}}
onApplicationEvent 方法在经过一些判断后,会决定是否调用 export 方法导出服务。在export 根据配置执行相应的动作。最终进入到doExportUrls导出服务方法。
privatevoiddoExportUrls(){//加载注册中心链接List<URL>registryURLs=loadRegistries(true);//遍历protocols,并在每个协议下导出服务for(ProtocolConfigprotocolConfig:protocols){doExportUrlsFor1Protocol(protocolConfig,registryURLs);}}
关于多协议多注册中心导出服务首先是根据配置,以及其他一些信息组装 URL。前面说过,URL 是 Dubbo 配置的载体,通过 URL 可让 Dubbo 的各种配置在各个模块之间传递。
privatevoiddoExportUrlsFor1Protocol(ProtocolConfigprotocolConfig,List<URL>registryURLs){Stringname=protocolConfig.getName();//如果协议名为空,或空串,则将协议名变量设置为dubboif(name==null||name.length()==0){name="dubbo";}Map<String,String>map=newHashMap<String,String>();//略//获取上下文路径StringcontextPath=protocolConfig.getContextpath();if((contextPath==null||contextPath.length()==0)&&provider!=null){contextPath=provider.getContextpath();}//获取host和portStringhost=this.findConfigedHosts(protocolConfig,registryURLs,map);Integerport=this.findConfigedPorts(protocolConfig,name,map);//组装URLURLurl=newURL(name,host,port,(contextPath==null||contextPath.length()==0?"":contextPath+"/")+path,map);//省略无关代码}
上面的代码首先是将一些信息,比如版本、时间戳、方法名以及各种配置对象的字段信息放入到 map 中,最后将 map 和主机名等数据传给 URL 构造方法创建 URL 对象。前置工作做完,接下来就可以进行服务导出了。服务导出分为导出到本地 (JVM),和导出到远程。在深入分析服务导出的源码前,我们先来从宏观层面上看一下服务导出逻辑。如下:
privatevoiddoExportUrlsFor1Protocol(ProtocolConfigprotocolConfig,List<URL>registryURLs){//省略无关代码Stringscope=url.getParameter(Constants.SCOPE_KEY);//如果scope=none,则什么都不做if(!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)){//scope!=remote,导出到本地if(!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)){exportLocal(url);}//scope!=local,导出到远程if(!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)){if(registryURLs!=null&&!registryURLs.isEmpty()){for(URLregistryURL:registryURLs){//省略无关代码//为服务提供类(ref)生成InvokerInvoker<?>invoker=proxyFactory.getInvoker(ref,(Class)interfaceClass,registryURL.addParameterAndEncoded(Constants.EXPORT_KEY,url.toFullString()));//DelegateProviderMetaDataInvoker用于持有Invoker和ServiceConfigDelegateProviderMetaDataInvokerwrapperInvoker=newDelegateProviderMetaDataInvoker(invoker,this);//导出服务,并生成ExporterExporter<?>exporter=protocol.export(wrapperInvoker);exporters.add(exporter);}//不存在注册中心,仅导出服务}else{//略}}}this.urls.add(url);}
上面代码根据 url 中的 scope 参数决定服务导出方式,分别如下:
- scope = none,不导出服务
- scope != remote,导出到本地
- scope != local,导出到远程
不管是导出到本地,还是远程。进行服务导出之前,均需要先创建 Invoker,这是一个很重要的步骤。因此下面先来分析 Invoker 的创建过程。Invoker 是由 ProxyFactory 创建而来,Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory。下面我们到 JavassistProxyFactory 代码中,探索 Invoker 的创建过程。如下:
public<T>Invoker<T>getInvoker(Tproxy,Class<T>type,URLurl){//为目标类创建WrapperfinalWrapperwrapper=Wrapper.getWrapper(proxy.getClass().getName().indexOf('$')<0?proxy.getClass():type);//创建匿名Invoker类对象,并实现doInvoke方法。returnnewAbstractProxyInvoker<T>(proxy,type,url){@OverrideprotectedObjectdoInvoke(Tproxy,StringmethodName,Class<?>[]parameterTypes,Object[]arguments)throwsThrowable{//调用Wrapper的invokeMethod方法,invokeMethod最终会调用目标方法returnwrapper.invokeMethod(proxy,methodName,parameterTypes,arguments);}};}
如上,JavassistProxyFactory 创建了一个继承自 AbstractProxyInvoker 类的匿名对象,并覆写了抽象方法 doInvoke。
(2) 导出服务到本地
Invoke创建成功之后,接下来我们来看本地导出
privatevoidexportLocal(URLurl){//如果URL的协议头等于injvm,说明已经导出到本地了,无需再次导出if(!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())){URLlocal=URL.valueOf(url.toFullString()).setProtocol(Constants.LOCAL_PROTOCOL)//设置协议头为injvm.setHost(LOCALHOST).setPort(0);ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));//创建Invoker,并导出服务,这里的protocol会在运行时调用InjvmProtocol的export方法Exporter<?>exporter=protocol.export(proxyFactory.getInvoker(ref,(Class)interfaceClass,local));exporters.add(exporter);}}
exportLocal 方法比较简单,首先根据 URL 协议头决定是否导出服务。若需导出,则创建一个新的 URL 并将协议头、主机名以及端口设置成新的值。然后创建 Invoker,并调用 InjvmProtocol 的 export 方法导出服务。下面我们来看一下 InjvmProtocol 的 export 方法都做了哪些事情。
public<T>Exporter<T>export(Invoker<T>invoker)throwsRpcException{//创建InjvmExporterreturnnewInjvmExporter<T>(invoker,invoker.getUrl().getServiceKey(),exporterMap);}
如上,InjvmProtocol 的 export 方法仅创建了一个 InjvmExporter,无其他逻辑。到此导出服务到本地就分析完了。
(3) 导出服务到远程
接下来,我们继续分析导出服务到远程的过程。导出服务到远程包含了服务导出与服务注册两个过程。先来分析服务导出逻辑。我们把目光移动到 RegistryProtocol 的 export 方法上。
public<T>Exporter<T>export(finalInvoker<T>originInvoker)throwsRpcException{//导出服务finalExporterChangeableWrapper<T>exporter=doLocalExport(originInvoker);//获取注册中心URLURLregistryUrl=getRegistryUrl(originInvoker);//根据URL加载Registry实现类,比如ZookeeperRegistryfinalRegistryregistry=getRegistry(originInvoker);//获取已注册的服务提供者URL,比如:finalURLregisteredProviderUrl=getRegisteredProviderUrl(originInvoker);//获取register参数booleanregister=registeredProviderUrl.getParameter("register",true);//向服务提供者与消费者注册表中注册服务提供者ProviderConsumerRegTable.registerProvider(originInvoker,registryUrl,registeredProviderUrl);//根据register的值决定是否注册服务if(register){//向注册中心注册服务register(registryUrl,registeredProviderUrl);ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);}//获取订阅URL,比如:finalURLoverrideSubscribeUrl=getSubscribedOverrideUrl(registeredProviderUrl);//创建监听器finalOverrideListeneroverrideSubscribeListener=newOverrideListener(overrideSubscribeUrl,originInvoker);overrideListeners.put(overrideSubscribeUrl,overrideSubscribeListener);//向注册中心进行订阅override数据registry.subscribe(overrideSubscribeUrl,overrideSubscribeListener);//创建并返回DestroyableExporterreturnnewDestroyableExporter<T>(exporter,originInvoker,overrideSubscribeUrl,registeredProviderUrl);}
上面代码看起来比较复杂,主要做如下一些操作:
1. 调用 doLocalExport 导出服务
2. 向注册中心注册服务
3. 向注册中心进行订阅 override 数据
4. 创建并返回 DestroyableExporter
下面先来分析 doLocalExport 方法的逻辑,如下:
private<T>ExporterChangeableWrapper<T>doLocalExport(finalInvoker<T>originInvoker){Stringkey=getCacheKey(originInvoker);//访问缓存ExporterChangeableWrapper<T>exporter=(ExporterChangeableWrapper<T>)bounds.get(key);if(exporter==null){synchronized(bounds){exporter=(ExporterChangeableWrapper<T>)bounds.get(key);if(exporter==null){//创建Invoker为委托类对象finalInvoker<?>invokerDelegete=newInvokerDelegete<T>(originInvoker,getProviderUrl(originInvoker));//调用protocol的export方法导出服务exporter=newExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete),originInvoker);//写缓存bounds.put(key,exporter);}}}returnexporter;}
接下来,我们把重点放在 Protocol 的 export 方法上。假设运行时协议为 dubbo,此处的 protocol 变量会在运行时加载 DubboProtocol,并调用 DubboProtocol 的 export 方法。
public<T>Exporter<T>export(Invoker<T>invoker)throwsRpcException{URLurl=invoker.getUrl();//获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如://demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880Stringkey=serviceKey(url);//创建DubboExporterDubboExporter<T>exporter=newDubboExporter<T>(invoker,key,exporterMap);//将<key,exporter>键值对放入缓存中exporterMap.put(key,exporter);//省略无关代码//启动服务器openServer(url);//优化序列化optimizeSerialization(url);returnexporter;}
(4) 开启Netty服务
如上,我们重点关注 DubboExporter 的创建以及 openServer 方法,其他逻辑看不懂也没关系,不影响理解服务导出过程。下面分析 openServer 方法。
privatevoidopenServer(URLurl){//获取host:port,并将其作为服务器实例的key,用于标识当前的服务器实例Stringkey=url.getAddress();booleanisServer=url.getParameter(Constants.IS_SERVER_KEY,true);if(isServer){//访问缓存ExchangeServerserver=serverMap.get(key);if(server==null){//创建服务器实例serverMap.put(key,createServer(url));}else{//服务器已创建,则根据url中的配置重置服务器server.reset(url);}}}
接下来分析服务器实例的创建过程。如下:
privateExchangeServercreateServer(URLurl){url=url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,//添加心跳检测配置到url中url=url.addParameterIfAbsent(Constants.HEARTBEAT_KEY,String.valueOf(Constants.DEFAULT_HEARTBEAT));//获取server参数,默认为nettyStringstr=url.getParameter(Constants.SERVER_KEY,Constants.DEFAULT_REMOTING_SERVER);//通过SPI检测是否存在server参数所代表的Transporter拓展,不存在则抛出异常if(str!=null&&str.length()>0&&!ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))thrownewRpcException("Unsupportedservertype:"+str+",url:"+url);//添加编码解码器参数url=url.addParameter(Constants.CODEC_KEY,DubboCodec.NAME);ExchangeServerserver;try{//创建ExchangeServerserver=Exchangers.bind(url,requestHandler);}catch(RemotingExceptione){thrownewRpcException("Failtostartserver...");}//获取client参数,可指定netty,minastr=url.getParameter(Constants.CLIENT_KEY);if(str!=null&&str.length()>0){//获取所有的Transporter实现类名称集合,比如supportedTypes=[netty,mina]Set<String>supportedTypes=ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();//检测当前Dubbo所支持的Transporter实现类名称列表中,//是否包含client所表示的Transporter,若不包含,则抛出异常if(!supportedTypes.contains(str)){thrownewRpcException("Unsupportedclienttype...");}}returnserver;}
如上,createServer 包含三个核心的逻辑。第一是检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常。第二是创建服务器实例。第三是检测是否支持 client 参数所表示的 Transporter 拓展,不存在也是抛出异常。两次检测操作所对应的代码比较直白了,无需多说。但创建服务器的操作目前还不是很清晰,我们继续往下看。
publicstaticExchangeServerbind(URLurl,ExchangeHandlerhandler)throwsRemotingException{if(url==null){thrownewIllegalArgumentException("url==null");}if(handler==null){thrownewIllegalArgumentException("handler==null");}url=url.addParameterIfAbsent(Constants.CODEC_KEY,"exchange");//获取Exchanger,默认为HeaderExchanger。//紧接着调用HeaderExchanger的bind方法创建ExchangeServer实例returngetExchanger(url).bind(url,handler);}
上面代码比较简单,就不多说了。下面看一下 HeaderExchanger 的 bind 方法。
publicExchangeServerbind(URLurl,ExchangeHandlerhandler)throwsRemotingException{//创建HeaderExchangeServer实例,该方法包含了多个逻辑,分别如下://1.newHeaderExchangeHandler(handler)//2.newDecodeHandler(newHeaderExchangeHandler(handler))//3.Transporters.bind(url,newDecodeHandler(newHeaderExchangeHandler(handler)))returnnewHeaderExchangeServer(Transporters.bind(url,newDecodeHandler(newHeaderExchangeHandler(handler))));}
HeaderExchanger 的 bind 方法包含的逻辑比较多,但目前我们仅需关心 Transporters 的 bind 方法逻辑即可。该方法的代码如下:
publicstaticServerbind(URLurl,ChannelHandler...handlers)throwsRemotingException{if(url==null){thrownewIllegalArgumentException("url==null");}if(handlers==null||handlers.length==0){thrownewIllegalArgumentException("handlers==null");}ChannelHandlerhandler;if(handlers.length==1){handler=handlers[0];}else{//如果handlers元素数量大于1,则创建ChannelHandler分发器handler=newChannelHandlerDispatcher(handlers);}//获取自适应Transporter实例,并调用实例方法returngetTransporter().bind(url,handler);}
如上,getTransporter() 方法获取的 Transporter 是在运行时动态创建的,类名为 TransporterAdaptive,也就是自适应拓展类。TransporterAdaptive 会在运行时根据传入的 URL 参数决定加载什么类型的 Transporter,默认为 NettyTransporter。调用`NettyTransporter.bind(URL, ChannelHandler)`方法。创建一个`NettyServer`实例。调用`NettyServer.doOPen()`方法,服务器被开启,服务也被暴露出来了。
(5) 服务注册
本节内容以 Zookeeper 注册中心作为分析目标,其他类型注册中心大家可自行分析。下面从服务注册的入口方法开始分析,我们把目光再次移到 RegistryProtocol 的 export 方法上。如下:
public<T>Exporter<T>export(finalInvoker<T>originInvoker)throwsRpcException{//${导出服务}//省略其他代码booleanregister=registeredProviderUrl.getParameter("register",true);if(register){//注册服务register(registryUrl,registeredProviderUrl);ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);}finalURLoverrideSubscribeUrl=getSubscribedOverrideUrl(registeredProviderUrl);finalOverrideListeneroverrideSubscribeListener=newOverrideListener(overrideSubscribeUrl,originInvoker);overrideListeners.put(overrideSubscribeUrl,overrideSubscribeListener);//订阅override数据registry.subscribe(overrideSubscribeUrl,overrideSubscribeListener);//省略部分代码}
RegistryProtocol 的 export 方法包含了服务导出,注册,以及数据订阅等逻辑。其中服务导出逻辑上一节已经分析过了,本节将分析服务注册逻辑,相关代码如下:
publicvoidregister(URLregistryUrl,URLregistedProviderUrl){//获取RegistryRegistryregistry=registryFactory.getRegistry(registryUrl);//注册服务registry.register(registedProviderUrl);}
register 方法包含两步操作,第一步是获取注册中心实例,第二步是向注册中心注册服务。接下来分两节内容对这两步操作进行分析。
这里以 Zookeeper 注册中心为例进行分析。下面先来看一下 getRegistry 方法的源码,这个方法由 AbstractRegistryFactory 实现。如下:
publicRegistrygetRegistry(URLurl){url=url.setPath(RegistryService.class.getName()).addParameter(Constants.INTERFACE_KEY,RegistryService.class.getName()).removeParameters(Constants.EXPORT_KEY,Constants.REFER_KEY);Stringkey=url.toServiceString();LOCK.lock();try{//访问缓存Registryregistry=REGISTRIES.get(key);if(registry!=null){returnregistry;}//缓存未命中,创建Registry实例registry=createRegistry(url);if(registry==null){thrownewIllegalStateException("Cannotcreateregistry...");}//写入缓存REGISTRIES.put(key,registry);returnregistry;}finally{LOCK.unlock();}}protectedabstractRegistrycreateRegistry(URLurl);
如上,getRegistry 方法先访问缓存,缓存未命中则调用 createRegistry 创建 Registry。在此方法中就是通过`new ZookeeperRegistry(url, zookeeperTransporter)`实例化一个注册中心
publicZookeeperRegistry(URLurl,ZookeeperTransporterzookeeperTransporter){super(url);if(url.isAnyHost()){thrownewIllegalStateException("registryaddress==null");}//获取组名,默认为dubboStringgroup=url.getParameter(Constants.GROUP_KEY,DEFAULT_ROOT);if(!group.startsWith(Constants.PATH_SEPARATOR)){//group="/"+groupgroup=Constants.PATH_SEPARATOR+group;}this.root=group;//创建Zookeeper客户端,默认为CuratorZookeeperTransporterzkClient=zookeeperTransporter.connect(url);//添加状态监听器zkClient.addStateListener(newStateListener(){@OverridepublicvoidstateChanged(intstate){if(state==RECONNECTED){try{recover();}catch(Exceptione){logger.error(e.getMessage(),e);}}}});}
在上面的代码代码中,我们重点关注 ZookeeperTransporter 的 connect 方法调用,这个方法用于创建 Zookeeper 客户端。创建好 Zookeeper 客户端,意味着注册中心的创建过程就结束了。接下来,再来分析一下 Zookeeper 客户端的创建过程。
publicZookeeperClientconnect(URLurl){//创建CuratorZookeeperClientreturnnewCuratorZookeeperClient(url);}
继续向下看。
publicclassCuratorZookeeperClientextendsAbstractZookeeperClient<CuratorWatcher>{privatefinalCuratorFrameworkclient;publicCuratorZookeeperClient(URLurl){super(url);try{//创建CuratorFramework构造器CuratorFrameworkFactory.Builderbuilder=CuratorFrameworkFactory.builder().connectString(url.getBackupAddress()).retryPolicy(newRetryNTimes(1,1000)).connectionTimeoutMs(5000);Stringauthority=url.getAuthority();if(authority!=null&&authority.length()>0){builder=builder.authorization("digest",authority.getBytes());}//构建CuratorFramework实例client=builder.build();//省略无关代码//启动客户端client.start();}catch(Exceptione){thrownewIllegalStateException(e.getMessage(),e);}}}
CuratorZookeeperClient 构造方法主要用于创建和启动 CuratorFramework 实例。至此Zookeeper客户端就已经启动了
下面我们将 Dubbo 的 demo 跑起来,然后通过 Zookeeper 可视化客户端 [ZooInspector](https://github.com/apache/zookeeper/tree/b79af153d0f98a4f3f3516910ed47234d7b3d74e/src/contrib/zooinspector) 查看节点数据。如下:

从上图中可以看到DemoService 这个服务对应的配置信息最终被注册到了zookeeper节点下。搞懂了服务注册的本质,那么接下来我们就可以去阅读服务注册的代码了。
protectedvoiddoRegister(URLurl){try{//通过Zookeeper客户端创建节点,节点路径由toUrlPath方法生成,路径格式如下:///${group}/${serviceInterface}/providers/${url}//比如///dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......zkClient.create(toUrlPath(url),url.getParameter(Constants.DYNAMIC_KEY,true));}catch(Throwablee){thrownewRpcException("Failedtoregister...");}}
如上,ZookeeperRegistry 在 doRegister 中调用了 Zookeeper 客户端创建服务节点。节点路径由 toUrlPath 方法生成,该方法逻辑不难理解,就不分析了。接下来分析 create 方法,如下:
publicvoidcreate(Stringpath,booleanephemeral){if(!ephemeral){//如果要创建的节点类型非临时节点,那么这里要检测节点是否存在if(checkExists(path)){return;}}inti=path.lastIndexOf('/');if(i>0){//递归创建上一级路径create(path.substring(0,i),false);}//根据ephemeral的值创建临时或持久节点if(ephemeral){createEphemeral(path);}else{createPersistent(path);}}
好了,到此关于服务注册的过程就分析完了。整个过程可简单总结为:先创建注册中心实例,之后再通过注册中心实例注册服务。
总结
1. 在有注册中心,需要注册提供者地址的情况下,ServiceConfig 解析出的 URL 格式为:`registry:// registry-host/org.apache.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/{服务名}/{版本号}")`
2. 基于 Dubbo SPI 的自适应机制,通过 URL `registry://` 协议头识别,就调用 RegistryProtocol#export() 方法
1. 将具体的服务类名,比如 `DubboServiceRegistryImpl`,通过 ProxyFactory 包装成 Invoker 实例
2. 调用 doLocalExport 方法,使用 DubboProtocol 将 Invoker 转化为 Exporter 实例,并打开 Netty 服务端监听客户请求
3. 创建 Registry 实例,连接 Zookeeper,并在服务节点下写入提供者的 URL 地址,注册服务
4. 向注册中心订阅 override 数据,并返回一个 Exporter 实例
3. 根据 URL 格式中的 `"dubbo://service-host/{服务名}/{版本号}"`中协议头 `dubbo://` 识别,调用 DubboProtocol#export() 方法,开发服务端口
4. RegistryProtocol#export() 返回的 Exporter 实例存放到 ServiceConfig 的 `List exporters` 中
服务发现
在学习了服务暴露原理之后 , 接下来重点探讨服务是如何消费的 。 这里主要讲解如何通过注册中心进行服务发现进行远程服务调用等细节 。
服务发现流程
在详细探讨服务暴露细节之前 , 我们先看一下整体duubo的服务消费原理
在整体上看 , Dubbo 框架做服务消费也分为两大部分 , 第一步通过持有远程服务实例生成Invoker, 这个 Invoker 在客户端是核心的远程代理对象 。 第二步会把 Invoker 通过动态代理转换成实现用户接口的动态代理引用 。
服务消费方引用服务的蓝色初始化链,时序图如下:
源码分析
(1) 引用入口
服务引用的入口方法为 ReferenceBean 的 getObject 方法,该方法定义在 Spring 的 FactoryBean 接口中,ReferenceBean 实现了这个方法。
publicObjectgetObject()throwsException{returnget();}publicsynchronizedTget(){//检测ref是否为空,为空则通过init方法创建if(ref==null){//init方法主要用于处理配置,以及调用createProxy生成代理类init();}returnref;}
Dubbo 提供了丰富的配置,用于调整和优化框架行为,性能等。Dubbo 在引用或导出服务时,首先会对这些配置进行检查和处理,以保证配置的正确性。
privatevoidinit(){//创建代理类ref=createProxy(map);
此方法代码很长,主要完成的配置加载,检查,以及创建引用的代理对象。这里要从 createProxy 开始看起。从字面意思上来看,createProxy 似乎只是用于创建代理对象的。但实际上并非如此,该方法还会调用其他方法构建以及合并 Invoker 实例。具体细节如下。
privateTcreateProxy(Map<String,String>map){URLtmpUrl=newURL("temp","localhost",0,map);...........isDvmRefer=InjvmProtocol.getlnjvmProtocol().islnjvmRefer(tmpUrl)//本地引用略if(isJvmRefer){}else{//点对点调用略if(url!=null&&url.length()>0){}else{//加载注册中心urlList<URL>us=loadRegistries(false);if(us!=null&&!us.isEmpty()){for(URLu:us){URLmonitorUrl=loadMonitor(u);if(monitorUrl!=null){map.put(Constants.MONITOR_KEY,URL.encode(monitorUrl.toFullString()));}//添加refer参数到url中,并将url添加到urls中urls.add(u.addParameterAndEncoded(Constants.REFER_KEY,StringUtils.toQueryString(map)));}}}//单个注册中心或服务提供者(服务直连,下同)if(urls.size()==1){//调用RegistryProtocol的refer构建Invoker实例invoker=refprotocol.refer(interfaceClass,urls.get(0));//多个注册中心或多个服务提供者,或者两者混合}else{List<Invoker<?>>invokers=newArrayList<Invoker<?>>();URLregistryURL=null;//获取所有的Invokerfor(URLurl:urls){//通过refprotocol调用refer构建Invoker,refprotocol会在运行时//根据url协议头加载指定的Protocol实例,并调用实例的refer方法invokers.add(refprotocol.refer(interfaceClass,url));if(Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())){registryURL=url;}}if(registryURL!=null){//如果注册中心链接不为空,则将使用AvailableClusterURLu=registryURL.addParameter(Constants.CLUSTER_KEY,AvailableCluster.NAME);//创建StaticDirectory实例,并由Cluster对多个Invoker进行合并invoker=cluster.join(newStaticDirectory(u,invokers));}else{invoker=cluster.join(newStaticDirectory(invokers));}}}//省略无关代码//生成代理类return(T)proxyFactory.getProxy(invoker);}
上面代码很多,不过逻辑比较清晰。
1、如果是本地调用,直接jvm 协议从内存中获取实例
2、如果只有一个注册中心,直接通过 Protocol 自适应拓展类构建 Invoker 实例接口
3、如果有多个注册中心,此时先根据 url 构建 Invoker。然后再通过 Cluster 合并多个 Invoker,最后调用 ProxyFactory 生成代理类。
(2) 创建客户端
在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。Protocol 实现类有很多,这里分析DubboProtocol。
public<T>Invoker<T>refer(Class<T>serviceType,URLurl)throwsRpcException{optimizeSerialization(url);//创建DubboInvokerDubboInvoker<T>invoker=newDubboInvoker<T>(serviceType,url,getClients(url),invokers);invokers.add(invoker);returninvoker;}
上面方法看起来比较简单,创建一个DubboInvoker。通过构造方法传入远程调用的client对象。默认情况下,Dubbo 使用 NettyClient 进行通信。接下来,我们简单看一下 getClients 方法的逻辑。
privateExchangeClient[]getClients(URLurl){//是否共享连接booleanservice_share_connect=false;//获取连接数,默认为0,表示未配置intconnections=url.getParameter(Constants.CONNECTIONS_KEY,0);//如果未配置connections,则共享连接if(connections==0){service_share_connect=true;connections=1;}ExchangeClient[]clients=newExchangeClient[connections];for(inti=0;i<clients.length;i++){if(service_share_connect){//获取共享客户端clients[i]=getSharedClient(url);}else{//初始化新的客户端clients[i]=initClient(url);}}returnclients;}
这里根据 connections 数量决定是获取共享客户端还是创建新的客户端实例,getSharedClient 方法中也会调用 initClient 方法,因此下面我们一起看一下这个方法。
privateExchangeClientinitClient(URLurl){//获取客户端类型,默认为nettyStringstr=url.getParameter(Constants.CLIENT_KEY,url.getParameter(Constants.SERVER_KEY,Constants.DEFAULT_REMOTING_CLIENT));//省略无关代码ExchangeClientclient;try{//获取lazy配置,并根据配置值决定创建的客户端类型if(url.getParameter(Constants.LAZY_CONNECT_KEY,false)){//创建懒加载ExchangeClient实例client=newLazyConnectExchangeClient(url,requestHandler);}else{//创建普通ExchangeClient实例client=Exchangers.connect(url,requestHandler);}}catch(RemotingExceptione){thrownewRpcException("Failtocreateremotingclientforservice...");}returnclient;}
initClient 方法首先获取用户配置的客户端类型,默认为 netty。下面我们分析一下 Exchangers 的 connect 方法。
publicstaticExchangeClientconnect(URLurl,ExchangeHandlerhandler)throwsRemotingException{//获取Exchanger实例,默认为HeaderExchangeClientreturngetExchanger(url).connect(url,handler);}
如上,getExchanger 会通过 SPI 加载 HeaderExchangeClient 实例,这个方法比较简单,大家自己看一下吧。接下来分析 HeaderExchangeClient 的实现。
publicExchangeClientconnect(URLurl,ExchangeHandlerhandler)throwsRemotingException{//这里包含了多个调用,分别如下://1.创建HeaderExchangeHandler对象//2.创建DecodeHandler对象//3.通过Transporters构建Client实例//4.创建HeaderExchangeClient对象returnnewHeaderExchangeClient(Transporters.connect(url,newDecodeHandler(newHeaderExchangeHandler(handler))),true);}
这里的调用比较多,我们这里重点看一下 Transporters 的 connect 方法。如下:
publicstaticClientconnect(URLurl,ChannelHandler...handlers)throwsRemotingException{if(url==null){thrownewIllegalArgumentException("url==null");}ChannelHandlerhandler;if(handlers==null||handlers.length==0){handler=newChannelHandlerAdapter();}elseif(handlers.length==1){handler=handlers[0];}else{//如果handler数量大于1,则创建一个ChannelHandler分发器handler=newChannelHandlerDispatcher(handlers);}//获取Transporter自适应拓展类,并调用connect方法生成Client实例returngetTransporter().connect(url,handler);}
如上,getTransporter 方法返回的是自适应拓展类,该类会在运行时根据客户端类型加载指定的 Transporter 实现类。若用户未配置客户端类型,则默认加载 NettyTransporter,并调用该类的 connect 方法。如下:
publicClientconnect(URLurl,ChannelHandlerlistener)throwsRemotingException{//创建NettyClient对象returnnewNettyClient(url,listener);}
(3) 注册
这里就已经创建好了NettyClient对象。关于 DubboProtocol 的 refer 方法就分析完了。接下来,继续分析 RegistryProtocol 的 refer 方法逻辑。
public<T>Invoker<T>refer(Class<T>type,URLurl)throwsRpcException{//取registry参数值,并将其设置为协议头url=url.setProtocol(url.getParameter(Constants.REGISTRY_KEY,Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);//获取注册中心实例Registryregistry=registryFactory.getRegistry(url);if(RegistryService.class.equals(type)){returnproxyFactory.getInvoker((T)registry,type,url);}//将url查询字符串转为MapMap<String,String>qs=StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));//获取group配置Stringgroup=qs.get(Constants.GROUP_KEY);if(group!=null&&group.length()>0){if((Constants.COMMA_SPLIT_PATTERN.split(group)).length>1||"*".equals(group)){//通过SPI加载MergeableCluster实例,并调用doRefer继续执行服务引用逻辑returndoRefer(getMergeableCluster(),registry,type,url);}}//调用doRefer继续执行服务引用逻辑returndoRefer(cluster,registry,type,url);}
上面代码首先为 url 设置协议头,然后根据 url 参数加载注册中心实例。然后获取 group 配置,根据 group 配置决定 doRefer 第一个参数的类型。这里的重点是 doRefer 方法,如下:
private<T>Invoker<T>doRefer(Clustercluster,Registryregistry,Class<T>type,URLurl){//创建RegistryDirectory实例RegistryDirectory<T>directory=newRegistryDirectory<T>(type,url);//设置注册中心和协议directory.setRegistry(registry);directory.setProtocol(protocol);Map<String,String>parameters=newHashMap<String,String>(directory.getUrl().getParameters());//生成服务消费者链接URLsubscribeUrl=newURL(Constants.CONSUMER_PROTOCOL,parameters.remove(Constants.REGISTER_IP_KEY),0,type.getName(),parameters);//注册服务消费者,在consumers目录下新节点if(!Constants.ANY_VALUE.equals(url.getServiceInterface())&&url.getParameter(Constants.REGISTER_KEY,true)){registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY,Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY,String.valueOf(false)));}//订阅providers、configurators、routers等节点数据directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,Constants.PROVIDERS_CATEGORY+","+Constants.CONFIGURATORS_CATEGORY+","+Constants.ROUTERS_CATEGORY));//一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个Invokerinvoker=cluster.join(directory);ProviderConsumerRegTable.registerConsumer(invoker,url,subscribeUrl,directory);returninvoker;}
如上,doRefer 方法创建一个 RegistryDirectory 实例,然后生成服务者消费者链接,并向注册中心进行注册。注册完毕后,紧接着订阅 providers、configurators、routers 等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker。
(4)创建代理对象
Invoker 创建完毕后,接下来要做的事情是为服务接口生成代理对象。有了代理对象,即可进行远程调用。代理对象生成的入口方法为 ProxyFactory 的 getProxy,接下来进行分析。
public<T>TgetProxy(Invoker<T>invoker)throwsRpcException{//调用重载方法returngetProxy(invoker,false);}public<T>TgetProxy(Invoker<T>invoker,booleangeneric)throwsRpcException{Class<?>[]interfaces=null;//获取接口列表Stringconfig=invoker.getUrl().getParameter("interfaces");if(config!=null&&config.length()>0){//切分接口列表String[]types=Constants.COMMA_SPLIT_PATTERN.split(config);if(types!=null&&types.length>0){interfaces=newClass<?>[types.length+2];//设置服务接口类和EchoService.class到interfaces中interfaces[0]=invoker.getInterface();interfaces[1]=EchoService.class;for(inti=0;i<types.length;i++){//加载接口类interfaces[i+1]=ReflectUtils.forName(types[i]);}}}if(interfaces==null){interfaces=newClass<?>[]{invoker.getInterface(),EchoService.class};}//为http和hessian协议提供泛化调用支持,参考pullrequest#1827if(!invoker.getInterface().equals(GenericService.class)&&generic){intlen=interfaces.length;Class<?>[]temp=interfaces;//创建新的interfaces数组interfaces=newClass<?>[len+1];System.arraycopy(temp,0,interfaces,0,len);//设置GenericService.class到数组中interfaces[len]=GenericService.class;}//调用重载方法returngetProxy(invoker,interfaces);}publicabstract<T>TgetProxy(Invoker<T>invoker,Class<?>[]types);
如上,上面大段代码都是用来获取 interfaces 数组的,我们继续往下看。getProxy(Invoker, Class[]) 这个方法是一个抽象方法,下面我们到 JavassistProxyFactory 类中看一下该方法的实现代码。
public<T>TgetProxy(Invoker<T>invoker,Class<?>[]interfaces){//生成Proxy子类(Proxy是抽象类)。并调用Proxy子类的newInstance方法创建Proxy实例return(T)Proxy.getProxy(interfaces).newInstance(newInvokerInvocationHandler(invoker));}
上面代码并不多,首先是通过 Proxy 的 getProxy 方法获取 Proxy 子类,然后创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例。InvokerInvocationHandler 实现 JDK 的 InvocationHandler 接口,具体的用途是拦截接口类调用。下面以 org.apache.dubbo.demo.DemoService 这个接口为例,来看一下该接口代理类代码大致是怎样的(忽略 EchoService 接口)。
packageorg.apache.dubbo.common.bytecode;publicclassproxy0implementsorg.apache.dubbo.demo.DemoService{publicstaticjava.lang.reflect.Method[]methods;privatejava.lang.reflect.InvocationHandlerhandler;publicproxy0(){}publicproxy0(java.lang.reflect.InvocationHandlerarg0){handler=$1;}publicjava.lang.StringsayHello(java.lang.Stringarg0){Object[]args=newObject[1];args[0]=($w)$1;Objectret=handler.invoke(this,methods[0],args);return(java.lang.String)ret;}}
好了,到这里代理类生成逻辑就分析完了。整个过程比较复杂,大家需要耐心看一下。
总结
1. 从注册中心发现引用服务:在有注册中心,通过注册中心发现提供者地址的情况下,ReferenceConfig 解析出的 URL 格式为:`registry://registry-host:/org.apache.registry.RegistryService?refer=URL.encode("conumer-host/com.foo.FooService?version=1.0.0")`。
2. 通过 URL 的registry://协议头识别,就会调用RegistryProtocol#refer()方法
(1). 查询提供者 URL,如 `dubbo://service-host/com.foo.FooService?version=1.0.0` ,来获取注册中心
(2). 创建一个 RegistryDirectory 实例并设置注册中心和协议
(3). 生成 conusmer 连接,在 consumer 目录下创建节点,向注册中心注册
(4). 注册完毕后,订阅 providers,configurators,routers 等节点的数据
(5). 通过 URL 的 `dubbo://` 协议头识别,调用 `DubboProtocol#refer()` 方法,创建一个 ExchangeClient 客户端并返回 DubboInvoker 实例
3. 由于一个服务可能会部署在多台服务器上,这样就会在 providers 产生多个节点,这样也就会得到多个 DubboInvoker 实例,就需要 RegistryProtocol 调用 Cluster 将多个服务提供者节点伪装成一个节点,并返回一个 Invoker
4. Invoker 创建完毕后,调用 ProxyFactory 为服务接口生成代理对象,返回提供者引用
版权声明
本文仅代表作者观点,不代表博信信息网立场。