学堂 学堂 学堂公众号手机端

dubbo源码解析-高可用集群

lewis 1年前 (2024-04-12) 阅读数 12 #技术

  服务集群的概述

  概述

  为了避免单点故障,现在的应用通常至少会部署在两台服务器上,这样就组成了集群。集群就是单机的多实例,在多个服务器上部署多个服务,每个服务就是一个节点,部署N个节点,处理业务的能力就提升 N倍(大约),这些节点的集合就叫做集群。


  管理控制台

  目前的管理控制台已经发布0.1版本,结构上采取了前后端分离的方式,前端使用Vue和Vuetify分别作为Javascript框架和UI框架,后端采用Spring Boot框架。既可以按照标准的Maven方式进行打包,部署,也可以采用前后端分离的部署方式,方便开发,功能上,目前具备了服务查询,服务治理(包括Dubbo2.7中新增的治理规则)以及服务测试三部分内容。

  Maven方式部署

  - 安装

gitclonehttps://github.com/apache/dubbo-admin.gitcddubbo-adminmvncleanpackagecddubbo-admin-distribution/targetjava-jardubbo-admin-0.1.jar

  - 访问

  [http://localhost:8080](http://localhost:8080/)

  前后端分离部署

  - 前端

cddubbo-admin-uinpminstallnpmrundev

  - 后端

cddubbo-admin-servermvncleanpackagecdtargetjava-jardubbo-admin-server-0.1.jar

  - 访问

  [http://localhost:8081](http://localhost:8081/)

  - 前后端分离模式下,前端的修改可以实时生效

  环境搭建

  略

  集群调用存在的问题

  负载均衡

  集群容错

  服务治理

  集群的调用过程

  调用过程

  在对集群相关代码进行分析之前,这里有必要先来介绍一下集群容错的所有组件。包含 Cluster、Cluster Invoker、Directory、Router 和 LoadBalance 等。

  

  集群工作过程可分为两个阶段,第一个阶段是在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。第二个阶段是在服务消费者进行远程调用时。以 FailoverClusterInvoker 为例,该类型 Cluster Invoker 首先会调用 Directory 的 list 方法列举 Invoker 列表(可将 Invoker 简单理解为服务提供者)。Directory 的用途是保存 Invoker,可简单类比为 List。其实现类 RegistryDirectory 是一个动态服务目录,可感知注册中心配置的变化,它所持有的 Invoker 列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory 会动态增删 Invoker,并调用 Router 的 route 方法进行路由,过滤掉不符合路由规则的 Invoker。当 FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表后,它会通过 LoadBalance 从 Invoker 列表中选择一个 Invoker。最后 FailoverClusterInvoker 会将参数传给 LoadBalance 选择出的 Invoker 实例的 invoke 方法,进行真正的远程调用。

  组件介绍

  Directory:它代表多个Invoker,从methodInvokerMap提取,但是他的值是动态,例如注册中心的变更。

  Router:负责从多个Invoker中按路由规则选出子集,例如应用隔离或读写分离或灰度发布等等

  Cluster:将Directory中的多个Invoker伪装成一个Invoker,来容错,调用失败重试。

  LoadBalance:从多个Invoker选取一个做本次调用,具体包含很多种负载均衡算法。

  Invoker:Provider中的一个可调用接口。例如DemoService

   集群容错

  在分布式系统中,集群某个某些节点出现问题是大概率事件,因此在设计分布式RPC框架的过程中,必须要把失败作为设计的一等公民来对待。一次调用失败之后,应该如何选择对失败的选择策略,这是一个见仁见智的问题,每种策略可能都有自己独特的应用场景。因此,作为框架来说,应当针对不同场景提供多种策略,供用户进行选择。

  在Dubbo设计中,通过Cluster这个接口的抽象,把一组可供调用的Provider信息组合成为一个统一的`Invoker`供调用方进行调用。经过路由规则过滤,负载均衡选址后,选中一个具体地址进行调用,如果调用失败,则会按照集群配置的容错策略进行容错处理。

  内置集群容错方式

  Dubbo默认内置了若干容错策略,如果不能满足用户需求,则可以通过自定义容错策略进行配置

  Dubbo主要内置了如下几种策略:

  - Failover(失败自动切换)

  - Failsafe(失败安全)

  - Failfast(快速失败)

  - Failback(失败自动恢复)

  - Forking(并行调用)

  - Broadcast(广播调用)

  这些名称比较相似,概念也比较容易混淆,下面逐一进行解释。

  Failover(失败自动切换)

  `Failover`是高可用系统中的一个常用概念,服务器通常拥有主备两套机器配置,如果主服务器出现故障,则自动切换到备服务器中,从而保证了整体的高可用性。

  Dubbo也借鉴了这个思想,并且把它作为Dubbo`默认的容错策略`。当调用出现失败的时候,根据配置的重试次数,会自动从其他可用地址中重新选择一个可用的地址进行调用,直到调用成功,或者是达到重试的上限位置。

  Dubbo里默认配置的重试次数是2,也就是说,算上第一次调用,最多会调用3次。

  其配置方法,容错策略既可以在服务提供方配置,也可以服务调用方进行配置。而重试次数的配置则更为灵活,既可以在服务级别进行配置,也可以在方法级别进行配置。具体优先顺序为:

  ```

  服务调用方方法级配置 > 服务调用方服务级配置 > 服务提供方方法级配置 > 服务提供方服务级配置

  ```

  以XML方式为例,具体配置方法如下:

  服务提供方,服务级配置

<dubbo:serviceinterface="org.apache.dubbo.demo.DemoService"ref="demoService"cluster="failover"retries="2"/>

  服务提供方,方法级配置

<dubbo:serviceinterface="org.apache.dubbo.demo.DemoService"ref="demoService"cluster="failover"><dubbo:methodname="sayHello"retries="2"/></dubbo:reference>

  服务调用方,服务级配置

<dubbo:referenceid="demoService"interface="org.apache.dubbo.demo.DemoService"cluster="failover"retries="1"/>

  服务调用方,方法级配置:

<dubbo:referenceid="demoService"interface="org.apache.dubbo.demo.DemoService"cluster="failover"><dubbo:methodname="sayHello"retries="3"/></dubbo:reference>

  Failover可以自动对失败进行重试,对调用者封禁了失败的细节,但是Failover策略也会带来一些副作用:

  - 重试会额外增加一下开销,例如增加资源的使用,在高负载系统下,额外的重试可能让系统雪上加霜。

  - 重试会增加调用的响应时间。

  - 某些情况下,重试甚至会造成资源的浪费。考虑一个调用场景,A->B->C,如果A处设置了超时100ms,再B->C的第一次调用完成时已经超过了100ms,但很不幸B->C失败,这时候会进行重试,但其实这时候重试已经没有意义,因此在A看来这次调用已经超时,A可能已经开始执行其他逻辑。

  Failsafe(失败安全)

  失败安全策略的核心是即使失败了也不会影响整个调用流程。通常情况下用于旁路系统或流程中,它的失败不影响核心业务的正确性。在实现上,当出现调用失败时,会忽略此错误,并记录一条日志,同时返回一个空结果,在上游看来调用是成功的。

  应用场景,可以用于写入审计日志等操作。

  具体配置方法:

  服务提供方,服务级配置

<dubbo:serviceinterface="org.apache.dubbo.demo.DemoService"ref="demoService"cluster="failsafe"/>

  服务调用方,服务级配置

<dubbo:referenceid="demoService"interface="org.apache.dubbo.demo.DemoService"cluster="failsafe"/>

  其中服务调用方配置优先于服务提供方配置。

  Failfast(快速失败)

  某些业务场景中,某些操作可能是非幂等的,如果重复发起调用,可能会导致出现脏数据等。例如调用某个服务,其中包含一个数据库的写操作,如果写操作完成,但是在发送结果给调用方的过程中出错了,那么在调用发看来这次调用失败了,但其实数据写入已经完成。这种情况下,重试可能并不是一个好策略,这时候就需要使用到`Failfast`策略,调用失败立即报错。让调用方来决定下一步的操作并保证业务的幂等性。

  具体配置方法:

  服务提供方,服务级配置

<dubbo:serviceinterface="org.apache.dubbo.demo.DemoService"ref="demoService"cluster="failfast"/>

  服务调用方,服务级配置

<dubbo:referenceid="demoService"interface="org.apache.dubbo.demo.DemoService"cluster="failfast"/>

  其中服务调用方配置优先于服务提供方配置。

  Failback(失败自动恢复)

  `Failback`通常和`Failover`两个概念联系在一起。在高可用系统中,当主机发生故障,通过`Failover`进行主备切换后,待故障恢复后,系统应该具备自动恢复原始配置的能力。

  Dubbo中的`Failback`策略中,如果调用失败,则此次失败相当于`Failsafe`,将返回一个空结果。而与`Failsafe`不同的是,Failback策略会将这次调用加入内存中的失败列表中,对于这个列表中的失败调用,会在另一个线程中进行异步重试,重试如果再发生失败,则会忽略,即使重试调用成功,原来的调用方也感知不到了。因此它通常适合于,对于实时性要求不高,且不需要返回值的一些异步操作。

  具体配置方法:

  服务提供方,服务级配置

<dubbo:serviceinterface="org.apache.dubbo.demo.DemoService"ref="demoService"cluster="failsafe"/>

  服务调用方,服务级配置

<dubbo:referenceid="demoService"interface="org.apache.dubbo.demo.DemoService"cluster="failsafe"/>

  其中服务调用方配置优先于服务提供方配置。

  按照目前的实现,Failback策略还有一些局限,例如内存中的失败调用列表没有上限,可能导致堆积,异步重试的执行间隔无法调整,默认是5秒。

  Forking(并行调用)

  上述几种策略中,主要都是针对调用失败发生后如何进行弥补的角度去考虑的,而`Forking`策略则跟上述几种策略不同,是一种典型的用成本换时间的思路。即第一次调用的时候就同时发起多个调用,只要其中一个调用成功,就认为成功。在资源充足,且对于失败的容忍度较低的场景下,可以采用此策略。

  具体配置方法:

  服务提供方,服务级配置

<dubbo:serviceinterface="org.apache.dubbo.demo.DemoService"ref="demoService"cluster="forking"/>

  服务调用方,服务级配置

<dubbo:referenceid="demoService"interface="org.apache.dubbo.demo.DemoService"cluster="forking"/>

  其中服务调用方配置优先于服务提供方配置。

  Broadcast(广播调用)

  在某些场景下,可能需要对服务的所有提供者进行操作,此时可以使用广播调用策略。此策略会逐个调用所有提供者,只要任意有一个提供者出错,则认为此次调用出错。通常用于通知所有提供者更新缓存或日志等本地资源信息。

  具体配置方法:

  服务提供方,服务级配置

<dubbo:serviceinterface="org.apache.dubbo.demo.DemoService"ref="demoService"cluster="broadcast"/>

  服务调用方,服务级配置

<dubbo:referenceid="demoService"interface="org.apache.dubbo.demo.DemoService"cluster="broadcast"/>

  其中服务调用方配置优先于服务提供方配置。

  集群容错调优

  下表对各种策略做一个简单对比,

  | 策略名称 | 优点 | 缺点 |

  | --------- | -------------------------------- | -------------------------------------- |

  | Failover | 对调用者封禁调用失败的信息 | 增加RT,额外资源开销,资源浪费 |

  | Failfast | 业务快速感知失败状态进行自主决策 | 产生较多报错的信息 |

  | Failsafe | 即使失败了也不会影响核心流程 | 对于失败的信息不敏感,需要额外的监控 |

  | Failback | 失败自动异步重试 | 重试任务可能堆积 |

  | Forking | 并行发起多个调用,降低失败概率 | 消耗额外的机器资源,需要确保操作幂等性 |

  | Broadcast | 支持对所有的服务提供者进行操作 | 资源消耗很大 |

  综上我们得知,不同的容错策略往往对应不同的业务处理,这里做一个总结如下:

  Failover:通常用于对调用rt不敏感的场景,如读操作;但重试会带来更长延迟

  Failfast:通常用于非幂等性操作,需要快速感知失败的场景;比如新增记录

  Failsafe:通常用于旁路系统,失败不影响核心流程正确性的场景;如日志记录

  Failback:通常用于对于实时性要求不高,且不需要返回值的一些异步操作的场景

  Forking:通常用于资源充足,且对于失败的容忍度较低,实时性要求高的读操作,但需要浪费更多服务资源

  Broadcast:如通知所有提供者更新缓存或日志等本地资源信息

  源码分析

  我们在上一章看到了两个概念,分别是集群接口 Cluster 和 Cluster Invoker,这两者是不同的。Cluster 是接口,而 Cluster Invoker 是一种 Invoker。服务提供者的选择逻辑,以及远程调用失败后的的处理逻辑均是封装在 Cluster Invoker 中。那么 Cluster 接口和相关实现类有什么用呢?用途比较简单,仅用于生成 Cluster Invoker。下面我们来看一下源码。

publicclassFailoverClusterimplementsCluster{publicfinalstaticStringNAME="failover";@Overridepublic<T>Invoker<T>join(Directory<T>directory)throwsRpcException{//创建并返回FailoverClusterInvoker对象returnnewFailoverClusterInvoker<T>(directory);}}

  如上,FailoverCluster 总共就包含这几行代码,用于创建 FailoverClusterInvoker 对象,很简单。下面再看一个。

publicclassFailbackClusterimplementsCluster{publicfinalstaticStringNAME="failback";@Overridepublic<T>Invoker<T>join(Directory<T>directory)throwsRpcException{//创建并返回FailbackClusterInvoker对象returnnewFailbackClusterInvoker<T>(directory);}}

  如上,FailbackCluster 的逻辑也是很简单,无需解释了。所以接下来,我们把重点放在各种 Cluster Invoker 上

  Cluster Invoker

  我们首先从各种 Cluster Invoker 的父类 AbstractClusterInvoker 源码开始说起。前面说过,集群工作过程可分为两个阶段,第一个阶段是在服务消费者初始化期间,这个在服务引用那篇文章中分析过,就不赘述。第二个阶段是在服务消费者进行远程调用时,此时 AbstractClusterInvoker 的 invoke 方法会被调用。列举 Invoker,负载均衡等操作均会在此阶段被执行。因此下面先来看一下 invoke 方法的逻辑。

publicResultinvoke(finalInvocationinvocation)throwsRpcException{checkWhetherDestroyed();LoadBalanceloadbalance=null;//绑定attachments到invocation中.Map<String,String>contextAttachments=RpcContext.getContext().getAttachments();if(contextAttachments!=null&&contextAttachments.size()!=0){((RpcInvocation)invocation).addAttachments(contextAttachments);}//列举InvokerList<Invoker<T>>invokers=list(invocation);if(invokers!=null&&!invokers.isEmpty()){//加载LoadBalanceloadbalance=ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation),Constants.LOADBALANCE_KEY,Constants.DEFAULT_LOADBALANCE));}RpcUtils.attachInvocationIdIfAsync(getUrl(),invocation);//调用doInvoke进行后续操作returndoInvoke(invocation,invokers,loadbalance);}//抽象方法,由子类实现protectedabstractResultdoInvoke(Invocationinvocation,List<Invoker<T>>invokers,LoadBalanceloadbalance)throwsRpcException;

  AbstractClusterInvoker 的 invoke 方法主要用于列举 Invoker,以及加载 LoadBalance。最后再调用模板方法 doInvoke 进行后续操作。下面我们来看一下 Invoker 列举方法 list(Invocation) 的逻辑,如下:

protectedList<Invoker<T>>list(Invocationinvocation)throwsRpcException{//调用Directory的list方法列举InvokerList<Invoker<T>>invokers=directory.list(invocation);returninvokers;}

  如上,AbstractClusterInvoker 中的 list 方法做的事情很简单,只是简单的调用了 Directory 的 list 方法,没有其他更多的逻辑了。Directory 即相关实现类在前文已经分析过,这里就不多说了。接下来,我们把目光转移到 AbstractClusterInvoker 的各种实现类上,来看一下这些实现类是如何实现 doInvoke 方法逻辑的。

  FailoverClusterInvoker

  FailoverClusterInvoker 在调用失败时,会自动切换 Invoker 进行重试。默认配置下,Dubbo 会使用这个类作为缺省 Cluster Invoker。下面来看一下该类的逻辑。

publicclassFailoverClusterInvoker<T>extendsAbstractClusterInvoker<T>{//省略部分代码@OverridepublicResultdoInvoke(Invocationinvocation,finalList<Invoker<T>>invokers,LoadBalanceloadbalance)throwsRpcException{List<Invoker<T>>copyinvokers=invokers;checkInvokers(copyinvokers,invocation);//获取重试次数intlen=getUrl().getMethodParameter(invocation.getMethodName(),Constants.RETRIES_KEY,Constants.DEFAULT_RETRIES)+1;if(len<=0){len=1;}RpcExceptionle=null;List<Invoker<T>>invoked=newArrayList<Invoker<T>>(copyinvokers.size());Set<String>providers=newHashSet<String>(len);//循环调用,失败重试for(inti=0;i<len;i++){if(i>0){checkWhetherDestroyed();//在进行重试前重新列举Invoker,这样做的好处是,如果某个服务挂了,//通过调用list可得到最新可用的Invoker列表copyinvokers=list(invocation);//对copyinvokers进行判空检查checkInvokers(copyinvokers,invocation);}//通过负载均衡选择InvokerInvoker<T>invoker=select(loadbalance,invocation,copyinvokers,invoked);//添加到invoker到invoked列表中invoked.add(invoker);//设置invoked到RPC上下文中RpcContext.getContext().setInvokers((List)invoked);try{//调用目标Invoker的invoke方法Resultresult=invoker.invoke(invocation);returnresult;}catch(RpcExceptione){if(e.isBiz()){throwe;}le=e;}catch(Throwablee){le=newRpcException(e.getMessage(),e);}finally{providers.add(invoker.getUrl().getAddress());}}//若重试失败,则抛出异常thrownewRpcException(...,"Failedtoinvokethemethod...");}}

  如上,FailoverClusterInvoker 的 doInvoke 方法首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试。在 for 循环内,首先是通过负载均衡组件选择一个 Invoker,然后再通过这个 Invoker 的 invoke 方法进行远程调用。如果失败了,记录下异常,并进行重试。重试时会再次调用父类的 list 方法列举 Invoker。整个流程大致如此,不是很难理解。下面我们看一下 select 方法的逻辑。

protectedInvoker<T>select(LoadBalanceloadbalance,Invocationinvocation,List<Invoker<T>>invokers,List<Invoker<T>>selected)throwsRpcException{if(invokers==null||invokers.isEmpty())returnnull;//获取调用方法名StringmethodName=invocation==null?"":invocation.getMethodName();//获取sticky配置,sticky表示粘滞连接。所谓粘滞连接是指让服务消费者尽可能的//调用同一个服务提供者,除非该提供者挂了再进行切换booleansticky=invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY,Constants.DEFAULT_CLUSTER_STICKY);{//检测invokers列表是否包含stickyInvoker,如果不包含,//说明stickyInvoker代表的服务提供者挂了,此时需要将其置空if(stickyInvoker!=null&&!invokers.contains(stickyInvoker)){stickyInvoker=null;}//在sticky为true,且stickyInvoker!=null的情况下。如果selected包含//stickyInvoker,表明stickyInvoker对应的服务提供者可能因网络原因未能成功提供服务。//但是该提供者并没挂,此时invokers列表中仍存在该服务提供者对应的Invoker。if(sticky&&stickyInvoker!=null&&(selected==null||!selected.contains(stickyInvoker))){//availablecheck表示是否开启了可用性检查,如果开启了,则调用stickyInvoker的//isAvailable方法进行检查,如果检查通过,则直接返回stickyInvoker。if(availablecheck&&stickyInvoker.isAvailable()){returnstickyInvoker;}}}//如果线程走到当前代码处,说明前面的stickyInvoker为空,或者不可用。//此时继续调用doSelect选择InvokerInvoker<T>invoker=doSelect(loadbalance,invocation,invokers,selected);//如果sticky为true,则将负载均衡组件选出的Invoker赋值给stickyInvokerif(sticky){stickyInvoker=invoker;}returninvoker;}

  如上,select 方法的主要逻辑集中在了对粘滞连接特性的支持上。首先是获取 sticky 配置,然后再检测 invokers 列表中是否包含 stickyInvoker,如果不包含,则认为该 stickyInvoker 不可用,此时将其置空。这里的 invokers 列表可以看做是**存活着的服务提供者**列表,如果这个列表不包含 stickyInvoker,那自然而然的认为 stickyInvoker 挂了,所以置空。如果 stickyInvoker 存在于 invokers 列表中,此时要进行下一项检测 — 检测 selected 中是否包含 stickyInvoker。如果包含的话,说明 stickyInvoker 在此之前没有成功提供服务(但其仍然处于存活状态)。此时我们认为这个服务不可靠,不应该在重试期间内再次被调用,因此这个时候不会返回该 stickyInvoker。如果 selected 不包含 stickyInvoker,此时还需要进行可用性检测,比如检测服务提供者网络连通性等。当可用性检测通过,才可返回 stickyInvoker,否则调用 doSelect 方法选择 Invoker。如果 sticky 为 true,此时会将 doSelect 方法选出的 Invoker 赋值给 stickyInvoker。

  以上就是 select 方法的逻辑,这段逻辑看起来不是很复杂,但是信息量比较大。不搞懂 invokers 和 selected 两个入参的含义,以及粘滞连接特性,这段代码是不容易看懂的。所以大家在阅读这段代码时,不要忽略了对背景知识的理解。关于 select 方法先分析这么多,继续向下分析。

privateInvoker<T>doSelect(LoadBalanceloadbalance,Invocationinvocation,List<Invoker<T>>invokers,List<Invoker<T>>selected)throwsRpcException{if(invokers==null||invokers.isEmpty())returnnull;if(invokers.size()==1)returninvokers.get(0);if(loadbalance==null){//如果loadbalance为空,这里通过SPI加载Loadbalance,默认为RandomLoadBalanceloadbalance=ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);}//通过负载均衡组件选择InvokerInvoker<T>invoker=loadbalance.select(invokers,getUrl(),invocation);//如果selected包含负载均衡选择出的Invoker,或者该Invoker无法经过可用性检查,此时进行重选if((selected!=null&&selected.contains(invoker))||(!invoker.isAvailable()&&getUrl()!=null&&availablecheck)){try{//进行重选Invoker<T>rinvoker=reselect(loadbalance,invocation,invokers,selected,availablecheck);if(rinvoker!=null){//如果rinvoker不为空,则将其赋值给invokerinvoker=rinvoker;}else{//rinvoker为空,定位invoker在invokers中的位置intindex=invokers.indexOf(invoker);try{//获取index+1位置处的Invoker,以下代码等价于://invoker=invokers.get((index+1)%invokers.size());invoker=index<invokers.size()-1?invokers.get(index+1):invokers.get(0);}catch(Exceptione){logger.warn("...maybecauseinvokerslistdynamicchange,ignore.");}}}catch(Throwablet){logger.error("clusterreselectfailreasonis:...");}}returninvoker;}

  doSelect 主要做了两件事,第一是通过负载均衡组件选择 Invoker。第二是,如果选出来的 Invoker 不稳定,或不可用,此时需要调用 reselect 方法进行重选。若 reselect 选出来的 Invoker 为空,此时定位 invoker 在 invokers 列表中的位置 index,然后获取 index + 1 处的 invoker,这也可以看做是重选逻辑的一部分。下面我们来看一下 reselect 方法的逻辑。

privateInvoker<T>reselect(LoadBalanceloadbalance,Invocationinvocation,List<Invoker<T>>invokers,List<Invoker<T>>selected,booleanavailablecheck)throwsRpcException{List<Invoker<T>>reselectInvokers=newArrayList<Invoker<T>>(invokers.size()>1?(invokers.size()-1):invokers.size());//下面的if-else分支逻辑有些冗余,pullrequest#2826对这段代码进行了简化,可以参考一下//根据availablecheck进行不同的处理if(availablecheck){//遍历invokers列表for(Invoker<T>invoker:invokers){//检测可用性if(invoker.isAvailable()){//如果selected列表不包含当前invoker,则将其添加到reselectInvokers中if(selected==null||!selected.contains(invoker)){reselectInvokers.add(invoker);}}}//reselectInvokers不为空,此时通过负载均衡组件进行选择if(!reselectInvokers.isEmpty()){returnloadbalance.select(reselectInvokers,getUrl(),invocation);}//不检查Invoker可用性}else{for(Invoker<T>invoker:invokers){//如果selected列表不包含当前invoker,则将其添加到reselectInvokers中if(selected==null||!selected.contains(invoker)){reselectInvokers.add(invoker);}}if(!reselectInvokers.isEmpty()){//通过负载均衡组件进行选择returnloadbalance.select(reselectInvokers,getUrl(),invocation);}}{//若线程走到此处,说明reselectInvokers集合为空,此时不会调用负载均衡组件进行筛选。//这里从selected列表中查找可用的Invoker,并将其添加到reselectInvokers集合中if(selected!=null){for(Invoker<T>invoker:selected){if((invoker.isAvailable())&&!reselectInvokers.contains(invoker)){reselectInvokers.add(invoker);}}}if(!reselectInvokers.isEmpty()){//再次进行选择,并返回选择结果returnloadbalance.select(reselectInvokers,getUrl(),invocation);}}returnnull;}

  reselect 方法总结下来其实只做了两件事情,第一是查找可用的 Invoker,并将其添加到 reselectInvokers 集合中。第二,如果 reselectInvokers 不为空,则通过负载均衡组件再次进行选择。其中第一件事情又可进行细分,一开始,reselect 从 invokers 列表中查找有效可用的 Invoker,若未能找到,此时再到 selected 列表中继续查找。关于 reselect 方法就先分析到这,继续分析其他的 Cluster Invoker。

  FailbackClusterInvoker

  FailbackClusterInvoker 会在调用失败后,返回一个空结果给服务消费者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。下面来看一下它的实现逻辑。

publicclassFailbackClusterInvoker<T>extendsAbstractClusterInvoker<T>{privatestaticfinallongRETRY_FAILED_PERIOD=5*1000;privatefinalScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(2,newNamedInternalThreadFactory("failback-cluster-timer",true));privatefinalConcurrentMap<Invocation,AbstractClusterInvoker<?>>failed=newConcurrentHashMap<Invocation,AbstractClusterInvoker<?>>();privatevolatileScheduledFuture<?>retryFuture;@OverrideprotectedResultdoInvoke(Invocationinvocation,List<Invoker<T>>invokers,LoadBalanceloadbalance)throwsRpcException{try{checkInvokers(invokers,invocation);//选择InvokerInvoker<T>invoker=select(loadbalance,invocation,invokers,null);//进行调用returninvoker.invoke(invocation);}catch(Throwablee){//如果调用过程中发生异常,此时仅打印错误日志,不抛出异常logger.error("Failbacktoinvokemethod...");//记录调用信息addFailed(invocation,this);//返回一个空结果给服务消费者returnnewRpcResult();}}privatevoidaddFailed(Invocationinvocation,AbstractClusterInvoker<?>router){if(retryFuture==null){synchronized(this){if(retryFuture==null){//创建定时任务,每隔5秒执行一次retryFuture=scheduledExecutorService.scheduleWithFixedDelay(newRunnable(){@Overridepublicvoidrun(){try{//对失败的调用进行重试retryFailed();}catch(Throwablet){//如果发生异常,仅打印异常日志,不抛出logger.error("Unexpectederroroccuratcollectstatistic",t);}}},RETRY_FAILED_PERIOD,RETRY_FAILED_PERIOD,TimeUnit.MILLISECONDS);}}}//添加invocation和invoker到failed中failed.put(invocation,router);}voidretryFailed(){if(failed.size()==0){return;}//遍历failed,对失败的调用进行重试for(Map.Entry<Invocation,AbstractClusterInvoker<?>>entry:newHashMap<Invocation,AbstractClusterInvoker<?>>(failed).entrySet()){Invocationinvocation=entry.getKey();Invoker<?>invoker=entry.getValue();try{//再次进行调用invoker.invoke(invocation);//调用成功后,从failed中移除invokerfailed.remove(invocation);}catch(Throwablee){//仅打印异常,不抛出logger.error("Failedretrytoinvokemethod...");}}}}

  这个类主要由3个方法组成,首先是 doInvoker,该方法负责初次的远程调用。若远程调用失败,则通过 addFailed 方法将调用信息存入到 failed 中,等待定时重试。addFailed 在开始阶段会根据 retryFuture 为空与否,来决定是否开启定时任务。retryFailed 方法则是包含了失败重试的逻辑,该方法会对 failed 进行遍历,然后依次对 Invoker 进行调用。调用成功则将 Invoker 从 failed 中移除,调用失败则忽略失败原因。

  以上就是 FailbackClusterInvoker 的执行逻辑,不是很复杂,继续往下看。

  FailfastClusterInvoker

  FailfastClusterInvoker 只会进行一次调用,失败后立即抛出异常。适用于幂等操作,比如新增记录。源码如下:

publicclassFailfastClusterInvoker<T>extendsAbstractClusterInvoker<T>{@OverridepublicResultdoInvoke(Invocationinvocation,List<Invoker<T>>invokers,LoadBalanceloadbalance)throwsRpcException{checkInvokers(invokers,invocation);//选择InvokerInvoker<T>invoker=select(loadbalance,invocation,invokers,null);try{//调用Invokerreturninvoker.invoke(invocation);}catch(Throwablee){if(einstanceofRpcException&&((RpcException)e).isBiz()){//抛出异常throw(RpcException)e;}//抛出异常thrownewRpcException(...,"Failfastinvokeproviders...");}}}

  如上,首先是通过 select 方法选择 Invoker,然后进行远程调用。如果调用失败,则立即抛出异常。FailfastClusterInvoker 就先分析到这,下面分析 FailsafeClusterInvoker。

  FailsafeClusterInvoker

  FailsafeClusterInvoker 是一种失败安全的 Cluster Invoker。所谓的失败安全是指,当调用过程中出现异常时,FailsafeClusterInvoker 仅会打印异常,而不会抛出异常。适用于写入审计日志等操作。下面分析源码。

publicclassFailsafeClusterInvoker<T>extendsAbstractClusterInvoker<T>{@OverridepublicResultdoInvoke(Invocationinvocation,List<Invoker<T>>invokers,LoadBalanceloadbalance)throwsRpcException{try{checkInvokers(invokers,invocation);//选择InvokerInvoker<T>invoker=select(loadbalance,invocation,invokers,null);//进行远程调用returninvoker.invoke(invocation);}catch(Throwablee){//打印错误日志,但不抛出logger.error("Failsafeignoreexception:"+e.getMessage(),e);//返回空结果忽略错误returnnewRpcResult();}}}

  FailsafeClusterInvoker 的逻辑和 FailfastClusterInvoker 的逻辑一样简单,无需过多说明。继续向下分析。

  ForkingClusterInvoker

  ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。ForkingClusterInvoker 的应用场景是在一些对实时性要求比较高**读操作**(注意是读操作,并行写操作可能不安全)下使用,但这将会耗费更多的资源。下面来看该类的实现。

publicclassForkingClusterInvoker<T>extendsAbstractClusterInvoker<T>{privatefinalExecutorServiceexecutor=Executors.newCachedThreadPool(newNamedInternalThreadFactory("forking-cluster-timer",true));@OverridepublicResultdoInvoke(finalInvocationinvocation,List<Invoker<T>>invokers,LoadBalanceloadbalance)throwsRpcException{try{checkInvokers(invokers,invocation);finalList<Invoker<T>>selected;//获取forks配置finalintforks=getUrl().getParameter(Constants.FORKS_KEY,Constants.DEFAULT_FORKS);//获取超时配置finalinttimeout=getUrl().getParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);//如果forks配置不合理,则直接将invokers赋值给selectedif(forks<=0||forks>=invokers.size()){selected=invokers;}else{selected=newArrayList<Invoker<T>>();//循环选出forks个Invoker,并添加到selected中for(inti=0;i<forks;i++){//选择InvokerInvoker<T>invoker=select(loadbalance,invocation,invokers,selected);if(!selected.contains(invoker)){selected.add(invoker);}}}//----------------------✨分割线1✨----------------------//RpcContext.getContext().setInvokers((List)selected);finalAtomicIntegercount=newAtomicInteger();finalBlockingQueue<Object>ref=newLinkedBlockingQueue<Object>();//遍历selected列表for(finalInvoker<T>invoker:selected){//为每个Invoker创建一个执行线程executor.execute(newRunnable(){@Overridepublicvoidrun(){try{//进行远程调用Resultresult=invoker.invoke(invocation);//将结果存到阻塞队列中ref.offer(result);}catch(Throwablee){intvalue=count.incrementAndGet();//仅在value大于等于selected.size()时,才将异常对象//放入阻塞队列中,请大家思考一下为什么要这样做。if(value>=selected.size()){//将异常对象存入到阻塞队列中ref.offer(e);}}}});}//----------------------✨分割线2✨----------------------//try{//从阻塞队列中取出远程调用结果Objectret=ref.poll(timeout,TimeUnit.MILLISECONDS);//如果结果类型为Throwable,则抛出异常if(retinstanceofThrowable){Throwablee=(Throwable)ret;thrownewRpcException(...,"Failedtoforkinginvokeprovider...");}//返回结果return(Result)ret;}catch(InterruptedExceptione){thrownewRpcException("Failedtoforkinginvokeprovider...");}}finally{RpcContext.getContext().clearAttachments();}}}

  ForkingClusterInvoker 的 doInvoker 方法比较长,这里通过两个分割线将整个方法划分为三个逻辑块。从方法开始到分割线1之间的代码主要是用于选出 forks 个 Invoker,为接下来的并发调用提供输入。分割线1和分割线2之间的逻辑通过线程池并发调用多个 Invoker,并将结果存储在阻塞队列中。分割线2到方法结尾之间的逻辑主要用于从阻塞队列中获取返回结果,并对返回结果类型进行判断。如果为异常类型,则直接抛出,否则返回。

  以上就是ForkingClusterInvoker 的 doInvoker 方法大致过程。我们在分割线1和分割线2之间的代码上留了一个问题,问题是这样的:为什么要在`value >= selected.size()`的情况下,才将异常对象添加到阻塞队列中?这里来解答一下。原因是这样的,在并行调用多个服务提供者的情况下,只要有一个服务提供者能够成功返回结果,而其他全部失败。此时 ForkingClusterInvoker 仍应该返回成功的结果,而非抛出异常。在`value >= selected.size()`时将异常对象放入阻塞队列中,可以保证异常对象不会出现在正常结果的前面,这样可从阻塞队列中优先取出正常的结果。

  关于 ForkingClusterInvoker 就先分析到这,接下来分析最后一个 Cluster Invoker。

  BroadcastClusterInvoker

  本章的最后,我们再来看一下 BroadcastClusterInvoker。BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。该类通常用于通知所有提供者更新缓存或日志等本地资源信息。源码如下。

publicclassBroadcastClusterInvoker<T>extendsAbstractClusterInvoker<T>{@OverridepublicResultdoInvoke(finalInvocationinvocation,List<Invoker<T>>invokers,LoadBalanceloadbalance)throwsRpcException{checkInvokers(invokers,invocation);RpcContext.getContext().setInvokers((List)invokers);RpcExceptionexception=null;Resultresult=null;//遍历Invoker列表,逐个调用for(Invoker<T>invoker:invokers){try{//进行远程调用result=invoker.invoke(invocation);}catch(RpcExceptione){exception=e;logger.warn(e.getMessage(),e);}catch(Throwablee){exception=newRpcException(e.getMessage(),e);logger.warn(e.getMessage(),e);}}//exception不为空,则抛出异常if(exception!=null){throwexception;}returnresult;}}

  以上就是 BroadcastClusterInvoker 的代码,比较简单,就不多说了。

  这里分析了集群容错的几种实现方式。集群容错对于 Dubbo 框架来说,是很重要的逻辑。集群模块处于服务提供者和消费者之间,对于服务消费者来说,集群可向其封禁服务提供者集群的情况,使其能够专心进行远程调用。除此之外,通过集群模块,我们还可以对服务之间的调用链路进行编排优化,治理服务。总的来说,对于 Dubbo 而言,集群容错相关逻辑是非常重要的。想要对 Dubbo 有比较深的理解,集群容错是必须要掌握的。

  负载均衡

  在之前章节中,介绍了服务集群的调用方式。我们发现在多服务实例时,负载均衡调用是其中极其重要的一环。在本章节中,我们一起学习Dubbo中的各种负载均衡策略

  负载均衡的主要作用

  负载均衡(LoadBalance),它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况。通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载。在为高负载服务器分流的同时,还可以避免资源浪费,一举两得。

  在 Dubbo 中,也有负载均衡的概念和相应的实现。Dubbo 需要对服务消费者的调用请求进行分配,避免少数服务提供者负载过大。服务提供者负载过大,会导致部分请求超时。因此将负载均衡到每个服务提供者上,是非常必要的。

  内置的负载均衡策略

  Dubbo 提供了4种负载均衡实现,分别是基于权重随机算法的 RandomLoadBalance、基于最少活跃调用数算法的 LeastActiveLoadBalance、基于 hash 一致性的 ConsistentHashLoadBalance,以及基于加权轮询算法的 RoundRobinLoadBalance。这几个负载均衡算法代码不是很长,但是想看懂也不是很容易,需要大家对这几个算法的原理有一定了解才行。如果不是很了解,也没不用太担心。我们会在分析每个算法的源码之前,对算法原理进行简单的讲解,帮助大家建立初步的印象。

  RandomLoadBalance

  RandomLoadBalance 是加权随机算法的具体实现,它的算法思想很简单。假设我们有一组服务器 servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2],权重总和为10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。

  以上就是 RandomLoadBalance 背后的算法思想,比较简单。下面开始分析源码。


  RandomLoadBalance 的算法思想比较简单,在经过多次请求后,能够将调用请求按照权重值进行“均匀”分配。当然 RandomLoadBalance 也存在一定的缺点,当调用次数比较少时,Random 产生的随机数可能会比较集中,此时多数请求会落到同一台服务器上。这个缺点并不是很严重,多数情况下可以忽略。RandomLoadBalance 是一个简单,高效的负载均衡实现,因此 Dubbo 选择它作为缺省实现。

  LeastActiveLoadBalance

  LeastActiveLoadBalance 翻译过来是最小活跃数负载均衡。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。此时应优先将请求分配给该服务提供者。在具体实现中,每个服务提供者对应一个活跃数 active。初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。关于 LeastActiveLoadBalance 的背景知识就先介绍到这里,下面开始分析源码。

publicclassRandomLoadBalanceextendsAbstractLoadBalance{publicstaticfinalStringNAME="random";privatefinalRandomrandom=newRandom();@Overrideprotected<T>Invoker<T>doSelect(List<Invoker<T>>invokers,URLurl,Invocationinvocation){intlength=invokers.size();inttotalWeight=0;booleansameWeight=true;//下面这个循环有两个作用,第一是计算总权重totalWeight,//第二是检测每个服务提供者的权重是否相同for(inti=0;i<length;i++){intweight=getWeight(invokers.get(i),invocation);//累加权重totalWeight+=weight;//检测当前服务提供者的权重与上一个服务提供者的权重是否相同,//不相同的话,则将sameWeight置为false。if(sameWeight&&i>0&&weight!=getWeight(invokers.get(i-1),invocation)){sameWeight=false;}}//下面的if分支主要用于获取随机数,并计算随机数落在哪个区间上if(totalWeight>0&&!sameWeight){//随机获取一个[0,totalWeight)区间内的数字intoffset=random.nextInt(totalWeight);//循环让offset数减去服务提供者权重值,当offset小于0时,返回相应的Invoker。//举例说明一下,我们有servers=[A,B,C],weights=[5,3,2],offset=7。//第一次循环,offset-5=2>0,即offset>5,//表明其不会落在服务器A对应的区间上。//第二次循环,offset-3=-1<0,即5<offset<8,//表明其会落在服务器B对应的区间上for(inti=0;i<length;i++){//让随机值offset减去权重值offset-=getWeight(invokers.get(i),invocation);if(offset<0){//返回相应的Invokerreturninvokers.get(i);}}}//如果所有服务提供者权重值相同,此时直接随机返回一个即可returninvokers.get(random.nextInt(length));}}

  除了最小活跃数,LeastActiveLoadBalance 在实现上还引入了权重值。所以准确的来说,LeastActiveLoadBalance 是基于加权最小活跃数算法实现的。举个例子说明一下,在一个服务提供者集群中,有两个性能优异的服务提供者。某一时刻它们的活跃数相同,此时 Dubbo 会根据它们的权重去分配请求,权重越大,获取到新请求的概率就越大。如果两个服务提供者权重相同,此时随机选择一个即可。

  ConsistentHashLoadBalance

  一致性 hash 算法由麻省理工学院的 Karger 及其合作者于1997年提出的,算法提出之初是用于大规模缓存系统的负载均衡。它的工作过程是这样的,首先根据 ip 或者其他的信息为缓存节点生成一个 hash,并将这个 hash 投射到 [0, 2^32-1] 的圆环上。当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查询或写入缓存项。如果当前节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其 hash 值的缓存节点即可。大致效果如下图所示,每个缓存节点在圆环上占据一个位置。如果缓存项的 key 的 hash 值小于缓存节点 hash 值,则到该缓存节点中存储或读取缓存项。比如下面绿色点对应的缓存项将会被存储到 cache-2 节点中。由于 cache-3 挂了,原本应该存到该节点中的缓存项最终会存储到 cache-4 节点中。

  下面来看看一致性 hash 在 Dubbo 中的应用。我们把上图的缓存节点替换成 Dubbo 的服务提供者,于是得到了下图:

  这里相同颜色的节点均属于同一个服务提供者,比如 Invoker1-1,Invoker1-2,……, Invoker1-160。这样做的目的是通过引入虚拟节点,让 Invoker 在圆环上分散开来,避免数据倾斜问题。所谓数据倾斜是指,由于节点不够分散,导致大量请求落到了同一个节点上,而其他节点只会接收到了少量请求的情况。比如:

  如上,由于 Invoker-1 和 Invoker-2 在圆环上分布不均,导致系统中75%的请求都会落到 Invoker-1 上,只有 25% 的请求会落到 Invoker-2 上。解决这个问题办法是引入虚拟节点,通过虚拟节点均衡各个节点的请求量。

  到这里背景知识就普及完了,接下来开始分析源码。我们先从 ConsistentHashLoadBalance 的 doSelect 方法开始看起,如下:

publicclassConsistentHashLoadBalanceextendsAbstractLoadBalance{privatefinalConcurrentMap<String,ConsistentHashSelector<?>>selectors=newConcurrentHashMap<String,ConsistentHashSelector<?>>();@Overrideprotected<T>Invoker<T>doSelect(List<Invoker<T>>invokers,URLurl,Invocationinvocation){StringmethodName=RpcUtils.getMethodName(invocation);Stringkey=invokers.get(0).getUrl().getServiceKey()+"."+methodName;//获取invokers原始的hashcodeintidentityHashCode=System.identityHashCode(invokers);ConsistentHashSelector<T>selector=(ConsistentHashSelector<T>)selectors.get(key);//如果invokers是一个新的List对象,意味着服务提供者数量发生了变化,可能新增也可能减少了。//此时selector.identityHashCode!=identityHashCode条件成立if(selector==null||selector.identityHashCode!=identityHashCode){//创建新的ConsistentHashSelectorselectors.put(key,newConsistentHashSelector<T>(invokers,methodName,identityHashCode));selector=(ConsistentHashSelector<T>)selectors.get(key);}//调用ConsistentHashSelector的select方法选择Invokerreturnselector.select(invocation);}privatestaticfinalclassConsistentHashSelector<T>{...}}

  如上,doSelect 方法主要做了一些前置工作,比如检测 invokers 列表是不是变动过,以及创建 ConsistentHashSelector。这些工作做完后,接下来开始调用 ConsistentHashSelector 的 select 方法执行负载均衡逻辑。在分析 select 方法之前,我们先来看一下一致性 hash 选择器 ConsistentHashSelector 的初始化过程,如下:

privatestaticfinalclassConsistentHashSelector<T>{//使用TreeMap存储Invoker虚拟节点privatefinalTreeMap<Long,Invoker<T>>virtualInvokers;privatefinalintreplicaNumber;privatefinalintidentityHashCode;privatefinalint[]argumentIndex;ConsistentHashSelector(List<Invoker<T>>invokers,StringmethodName,intidentityHashCode){this.virtualInvokers=newTreeMap<Long,Invoker<T>>();this.identityHashCode=identityHashCode;URLurl=invokers.get(0).getUrl();//获取虚拟节点数,默认为160this.replicaNumber=url.getMethodParameter(methodName,"hash.nodes",160);//获取参与hash计算的参数下标值,默认对第一个参数进行hash运算String[]index=Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName,"hash.arguments","0"));argumentIndex=newint[index.length];for(inti=0;i<index.length;i++){argumentIndex[i]=Integer.parseInt(index[i]);}for(Invoker<T>invoker:invokers){Stringaddress=invoker.getUrl().getAddress();for(inti=0;i<replicaNumber/4;i++){//对address+i进行md5运算,得到一个长度为16的字节数组byte[]digest=md5(address+i);//对digest部分字节进行4次hash运算,得到四个不同的long型正整数for(inth=0;h<4;h++){//h=0时,取digest中下标为0~3的4个字节进行位运算//h=1时,取digest中下标为4~7的4个字节进行位运算//h=2,h=3时过程同上longm=hash(digest,h);//将hash到invoker的映射关系存储到virtualInvokers中,//virtualInvokers需要提供高效的查询操作,因此选用TreeMap作为存储结构virtualInvokers.put(m,invoker);}}}}}

  ConsistentHashSelector 的构造方法执行了一系列的初始化逻辑,比如从配置中获取虚拟节点数以及参与 hash 计算的参数下标,默认情况下只使用第一个参数进行 hash。需要特别说明的是,ConsistentHashLoadBalance 的负载均衡逻辑只受参数值影响,具有相同参数值的请求将会被分配给同一个服务提供者。ConsistentHashLoadBalance 不 关系权重,因此使用时需要注意一下。

  在获取虚拟节点数和参数下标配置后,接下来要做的事情是计算虚拟节点 hash 值,并将虚拟节点存储到 TreeMap 中。到此,ConsistentHashSelector 初始化工作就完成了。接下来,我们来看看 select 方法的逻辑。

publicInvoker<T>select(Invocationinvocation){//将参数转为keyStringkey=toKey(invocation.getArguments());//对参数key进行md5运算byte[]digest=md5(key);//取digest数组的前四个字节进行hash运算,再将hash值传给selectForKey方法,//寻找合适的InvokerreturnselectForKey(hash(digest,0));}privateInvoker<T>selectForKey(longhash){//到TreeMap中查找第一个节点值大于或等于当前hash的InvokerMap.Entry<Long,Invoker<T>>entry=virtualInvokers.tailMap(hash,true).firstEntry();//如果hash大于Invoker在圆环上最大的位置,此时entry=null,//需要将TreeMap的头节点赋值给entryif(entry==null){entry=virtualInvokers.firstEntry();}//返回Invokerreturnentry.getValue();}

  如上,选择的过程相对比较简单了。首先是对参数进行 md5 以及 hash 运算,得到一个 hash 值。然后再拿这个值到 TreeMap 中查找目标 Invoker 即可。

  RoundRobinLoadBalance

  LeastActiveLoadBalance 即加权轮询负载均衡,我们先来了解一下什么是加权轮询。这里从最简单的轮询开始讲起,所谓轮询是指将请求轮流分配给每台服务器。举个例子,我们有三台服务器 A、B、C。我们将第一个请求分配给服务器 A,第二个请求分配给服务器 B,第三个请求分配给服务器 C,第四个请求再次分配给服务器 A。这个过程就叫做轮询。轮询是一种无状态负载均衡算法,实现简单,适用于每台服务器性能相近的场景下。但现实情况下,我们并不能保证每台服务器性能均相近。如果我们将等量的请求分配给性能较差的服务器,这显然是不合理的。因此,这个时候我们需要对轮询过程进行加权,以调控每台服务器的负载。经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。比如服务器 A、B、C 权重比为 5:2:1。那么在8次请求中,服务器 A 将收到其中的5次请求,服务器 B 会收到其中的2次请求,服务器 C 则收到其中的1次请求。

publicclassRoundRobinLoadBalanceextendsAbstractLoadBalance{publicstaticfinalStringNAME="roundrobin";privatestaticintRECYCLE_PERIOD=60000;protectedstaticclassWeightedRoundRobin{//服务提供者权重privateintweight;//当前权重privateAtomicLongcurrent=newAtomicLong(0);//最后一次更新时间privatelonglastUpdate;publicvoidsetWeight(intweight){this.weight=weight;//初始情况下,current=0current.set(0);}publiclongincreaseCurrent(){//current=current+weight;returncurrent.addAndGet(weight);}publicvoidsel(inttotal){//current=current-total;current.addAndGet(-1*total);}}//嵌套Map结构,存储的数据结构示例如下://{//"UserService.query":{//"url1":WeightedRoundRobin@123,//"url2":WeightedRoundRobin@456,//},//"UserService.update":{//"url1":WeightedRoundRobin@123,//"url2":WeightedRoundRobin@456,//}//}//最外层为服务类名+方法名,第二层为url到WeightedRoundRobin的映射关系。//这里我们可以将url看成是服务提供者的idprivateConcurrentMap<String,ConcurrentMap<String,WeightedRoundRobin>>methodWeightMap=newConcurrentHashMap<String,ConcurrentMap<String,WeightedRoundRobin>>();//原子更新锁privateAtomicBooleanupdateLock=newAtomicBoolean();@Overrideprotected<T>Invoker<T>doSelect(List<Invoker<T>>invokers,URLurl,Invocationinvocation){Stringkey=invokers.get(0).getUrl().getServiceKey()+"."+invocation.getMethodName();//获取url到WeightedRoundRobin映射表,如果为空,则创建一个新的ConcurrentMap<String,WeightedRoundRobin>map=methodWeightMap.get(key);if(map==null){methodWeightMap.putIfAbsent(key,newConcurrentHashMap<String,WeightedRoundRobin>());map=methodWeightMap.get(key);}inttotalWeight=0;longmaxCurrent=Long.MIN_VALUE;//获取当前时间longnow=System.currentTimeMillis();Invoker<T>selectedInvoker=null;WeightedRoundRobinselectedWRR=null;//下面这个循环主要做了这样几件事情://1.遍历Invoker列表,检测当前Invoker是否有//相应的WeightedRoundRobin,没有则创建//2.检测Invoker权重是否发生了变化,若变化了,//则更新WeightedRoundRobin的weight字段//3.让current字段加上自身权重,等价于current+=weight//4.设置lastUpdate字段,即lastUpdate=now//5.寻找具有最大current的Invoker,以及Invoker对应的WeightedRoundRobin,//暂存起来,留作后用//6.计算权重总和for(Invoker<T>invoker:invokers){StringidentifyString=invoker.getUrl().toIdentityString();WeightedRoundRobinweightedRoundRobin=map.get(identifyString);intweight=getWeight(invoker,invocation);if(weight<0){weight=0;}//检测当前Invoker是否有对应的WeightedRoundRobin,没有则创建if(weightedRoundRobin==null){weightedRoundRobin=newWeightedRoundRobin();//设置Invoker权重weightedRoundRobin.setWeight(weight);//存储url唯一标识identifyString到weightedRoundRobin的映射关系map.putIfAbsent(identifyString,weightedRoundRobin);weightedRoundRobin=map.get(identifyString);}//Invoker权重不等于WeightedRoundRobin中保存的权重,说明权重变化了,此时进行更新if(weight!=weightedRoundRobin.getWeight()){weightedRoundRobin.setWeight(weight);}//让current加上自身权重,等价于current+=weightlongcur=weightedRoundRobin.increaseCurrent();//设置lastUpdate,表示近期更新过weightedRoundRobin.setLastUpdate(now);//找出最大的currentif(cur>maxCurrent){maxCurrent=cur;//将具有最大current权重的Invoker赋值给selectedInvokerselectedInvoker=invoker;//将Invoker对应的weightedRoundRobin赋值给selectedWRR,留作后用selectedWRR=weightedRoundRobin;}//计算权重总和totalWeight+=weight;}//对<identifyString,WeightedRoundRobin>进行检查,过滤掉长时间未被更新的节点。//该节点可能挂了,invokers中不包含该节点,所以该节点的lastUpdate长时间无法被更新。//若未更新时长超过阈值后,就会被移除掉,默认阈值为60秒。if(!updateLock.get()&&invokers.size()!=map.size()){if(updateLock.compareAndSet(false,true)){try{ConcurrentMap<String,WeightedRoundRobin>newMap=newConcurrentHashMap<String,WeightedRoundRobin>();//拷贝newMap.putAll(map);//遍历修改,即移除过期记录Iterator<Entry<String,WeightedRoundRobin>>it=newMap.entrySet().iterator();while(it.hasNext()){Entry<String,WeightedRoundRobin>item=it.next();if(now-item.getValue().getLastUpdate()>RECYCLE_PERIOD){it.remove();}}//更新引用methodWeightMap.put(key,newMap);}finally{updateLock.set(false);}}}if(selectedInvoker!=null){//让current减去权重总和,等价于current-=totalWeightselectedWRR.sel(totalWeight);//返回具有最大current的InvokerreturnselectedInvoker;}//shouldnothappenherereturninvokers.get(0);}}

  轮询调用并不是简单的一个接着一个依次调用,它是根据权重的值进行循环的。

  负载均衡总结

  Dubbo 负载均衡策略提供下列四种方式:

   Random LoadBalance 随机,按权重设置随机概率。Dubbo的默认负载均衡策略

   在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

   RoundRobin LoadBalance 轮循,按公约后的权重设置轮循比率。

   存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

   LeastActive LoadBalance 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。

   使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

   ConsistentHash LoadBalance 一致性Hash,相同参数的请求总是发到同一提供者。

   当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

  服务治理

  服务治理的概述

  服务治理主要作用是改变运行时服务的行为和选址逻辑,达到限流,权重配置等目的,主要有:标签路由,条件路由,黑白名单,动态配置,权重调节,负载均衡等功能。

  执行过程

  1、消费者,提供者启动成功,订阅zookeeper节点

  2、管理平台对服务进行治理处理,向zookeeper写入节点数据

  3、写入成功,通知消费者,提供者

  4、根据不同的业务处理,在invoker调用时做出响应的处理

  相关案例

  服务禁用

  服务禁用:通常用于临时踢除某台提供者机器

configVersion:v2.7scope:applicationkey:demo-providerenabled:trueconfigs:-addresses:["192.168.191.2:20883"]side:providerparameters:disabled:true

  服务降级封禁

  服务降级,当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级,以此释放服务器资源以保证核心任务的正常运行。

  容错:当系统出现非业务异常(比如并发数太高导致超时,网络异常等)时,不对该接口进行处理。

  封禁:在大促,促销活动的可预知情况下,例如双11活动。采用直接封禁接口访问

configVersion:v2.7scope:servicekey:org.apache.dubbo.samples.governance.api.DemoServiceenabled:trueconfigs:-side:consumerparameters:force:return12345


版权声明

本文仅代表作者观点,不代表博信信息网立场。

热门