一次完整的RPC调用过程

前面的文章研究了dubbo中服务的发布和引用流程,这篇文章主要集中在真实rpc调用的过程上。

负载均衡算法

在进行真实的rpc调用前,有一个非常重要的步骤就是进行负载均衡。由于对于同一个服务来说,往往都有很多服务提供者,与此对应的就是服务消费者在进行服务引用时,本地会缓存很多的Invoker,在进行真实rpc调用时,如何选择Invoker,是负载均衡要解决的问题。dubbo中主要有四种负载均衡算法,分别为:RandomLoadBalanceLeastActiveLoadBalanceRoundRobinLoadBalanceConsistentHashLoadBalance,下面分别介绍这四种负载均衡算法。

RandomLoadBalance

  • 随机,按权重设置随机概率。
  • 在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

关键代码如下:

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {  
    int length = invokers.size(); // 总个数
    int totalWeight = 0; // 总权重
    boolean sameWeight = true; // 权重是否都一样
    for (int i = 0; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        totalWeight += weight; // 累计总权重
        if (sameWeight && i > 0
                && weight != getWeight(invokers.get(i - 1), invocation)) {
            sameWeight = false; // 计算所有权重是否一样
        }
    }
    if (totalWeight > 0 && ! sameWeight) {
        // 如果权重不相同且权重大于0则按总权重数随机
        int offset = random.nextInt(totalWeight);
        // 并确定随机值落在哪个片断上
        for (int i = 0; i < length; i++) {
            offset -= getWeight(invokers.get(i), invocation);
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }
    // 如果权重相同或权重为0则均等随机
    return invokers.get(random.nextInt(length));
}

LeastActiveLoadBalance

  • 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
  • 使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

最少活跃调用数,我的理解是对于每一个url(即代表某一个provider)和方法都有一个当前正在调用数量,对于consumer端来说,ActiveLimitFilter统计了当前针对每个url和方法的调用数量,开始调用时数量加1,结束调用时数量减1,统计调用数量的关键代码如下:

RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());  
......
long begin = System.currentTimeMillis();  
RpcStatus.beginCount(url, methodName);  
try {  
  Result result = invoker.invoke(invocation);
  RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
  return result;
} catch (RuntimeException t) {
  RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
  throw t;
}

其中 RpcStatus.beginCount() 是增加计数, RpcStatus.endCount() 是减少计数。 有了每个url和方法当前调用数量(活跃数量)后,就可以进行LeastActiveLoadBalance负载均衡策略了。这个算法的思路就是把新的rpc请求分配当前活跃数量较少的机器上,实现起来也很简单——首先找到所有最小活跃数的invoker集合,如果只有一个invoker,则直接分配至此invoker,如果存在多个最小的invoker,则按照RandomLoadBalance算法进行选择。由于代码比较简单,这里就不列出来了。

RoundRobinLoadBalance

  • 轮循,按公约后的权重设置轮循比率。
  • 存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

这种算法就是加上权重的轮循算法,这种算法能保证权重大的invoker被调用的概率更大。当所有invoker的权重都一样的时候,这个算法就会退化为普通轮循算法。关键代码如下:

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {  
    String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    int length = invokers.size(); // 总个数
    int maxWeight = 0; // 最大权重
    int minWeight = Integer.MAX_VALUE; // 最小权重
    final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
    int weightSum = 0;
    for (int i = 0; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        maxWeight = Math.max(maxWeight, weight); // 累计最大权重
        minWeight = Math.min(minWeight, weight); // 累计最小权重
        if (weight > 0) {
            invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
            weightSum += weight;
        }
    }
    AtomicPositiveInteger sequence = sequences.get(key);
    if (sequence == null) {
        sequences.putIfAbsent(key, new AtomicPositiveInteger());
        sequence = sequences.get(key);
    }
    int currentSequence = sequence.getAndIncrement();
    if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样
        int mod = currentSequence % weightSum;
        for (int i = 0; i < maxWeight; i++) {
            for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                final Invoker<T> k = each.getKey();
                final IntegerWrapper v = each.getValue();
                if (mod == 0 && v.getValue() > 0) {
                    return k;
                }
                if (v.getValue() > 0) {
                    v.decrement();
                    mod--;
                }
            }
        }
    }
    // 取模轮循
    return invokers.get(currentSequence % length);
}

ConsistentHashLoadBalance

  • 一致性Hash,相同参数的请求总是发到同一提供者。
  • 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
  • 算法参见:http://en.wikipedia.org/wiki/Consistent_hashing。
  • 缺省只对第一个参数Hash,如果要修改,请配置
  • 缺省用160份虚拟节点,如果要修改,请配置

针对每一个Invoker,都会生成160(默认)个虚拟节点,每个虚拟节点都有自己的hash值,将这些invoker按照虚拟节点的hash值存储在TreeMap中。当要进行负载均衡时,首先需要根据方法的参数(哪些参数参与计算上面有配置说明)计算出来hash值,然后利用这个hash值,取出TreeMap中所有比此hash值大的节点,直接返回这些节点中第一个节点即可,如果为空,则取整个TreeMap中的第一个invoker。

通过上面分析可以看出,相同参数的请求总是发到统一提供者,当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。关键代码如下:

// 初始化虚拟节点
public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {  
    this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
    this.identityHashCode = System.identityHashCode(invokers);
    URL url = invokers.get(0).getUrl();
    this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
    String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
    argumentIndex = new int[index.length];
    for (int i = 0; i < index.length; i ++) {
        argumentIndex[i] = Integer.parseInt(index[i]);
    }
    for (Invoker<T> invoker : invokers) {
        for (int i = 0; i < replicaNumber / 4; i++) {
            byte[] digest = md5(invoker.getUrl().toFullString() + i);
            for (int h = 0; h < 4; h++) {
                long m = hash(digest, h);
                virtualInvokers.put(m, invoker);
            }
        }
    }
}

public Invoker<T> select(Invocation invocation) {  
    String key = toKey(invocation.getArguments());
    byte[] digest = md5(key);
    Invoker<T> invoker = sekectForKey(hash(digest, 0));
    return invoker;
}
// 将参数转换为string
private String toKey(Object[] args) {  
    StringBuilder buf = new StringBuilder();
    for (int i : argumentIndex) {
        if (i >= 0 && i < args.length) {
            buf.append(args[i]);
        }
    }
    return buf.toString();
}
// 根据参数计算出来hash值,在虚拟节点map中取得对应实际invoker
private Invoker<T> sekectForKey(long hash) {  
    Invoker<T> invoker;
    Long key = hash;
    if (!virtualInvokers.containsKey(key)) {
        // tailMap是取所有比key大的节点map
        SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
        if (tailMap.isEmpty()) {
            key = virtualInvokers.firstKey();
        } else {
            key = tailMap.firstKey();
        }
    }
    invoker = virtualInvokers.get(key);
    return invoker;
}

流程

dubbo底层通信默认采用的是Netty,我们知道,Netty是基于Channel的异步通信框架,而大多数应用场景下都是采用同步RPC调用。因此,在dubbo中存在一个异步转同步的过程。下面将通过一次完整的RPC调用来分析异步转同步的实现原理。

一次rpc调用流程

从上述调用流程图可以看出,整个RPC调用过程分为如下四个步骤:

  • RPC调用请求发送,并返回Future对象
  • 调用Future的get方法挂起当前线程
  • 接收服务器回执消息,唤醒挂起线程
  • 返回RPC调用结果

在远程服务引用的过程中,客户端会从注册中心获取所有provider的信息,包括ip地址和服务监听端口号,然后分别与这些服务提供者建立TCP连接。当客户端通过代理对象调用接口方法时,会触发Invoker的invoke方法,在客户端,这个Invoker是ClusterInvoker,顾名思义,即带有集群功能的Invoker。这个Invoker会首先按照一定的负载均衡算法选出合适的Invoker,进而进行invoke方法的调用。上图就是从经过负载均衡算法后触发invoke方法开始画的。

RPC调用的第一个步骤是创建RPC请求对象,在创建请求对象时,会为每一个请求对象分配一个id,这个id是自增的。然后创建Future对象,这里采用的时DefaultFuture,在创建Future对象的过程中,会以请求对象id为key,future对象为value,将future对象保存至Map中。之后便是利用上述建立起的连接,发送RPC调用请求。也就是说,consumer通过先前建立起的TCP连接,把要调用的接口信息,方法名成,参数类型和参数发送给provider。最后将上述future对象返回给调用者。关键代码如下:

// 发送RPC调用请求
public ResponseFuture request(Object request, int timeout) throws RemotingException {  
  if (closed) {
    throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
  }
  // create request.
  Request req = new Request();
  req.setVersion("2.0.0");
  req.setTwoWay(true);
  req.setData(request);
  DefaultFuture future = new DefaultFuture(channel, req, timeout);
  try{
    channel.send(req);
  }catch (RemotingException e) {
    future.cancel();
    throw e;
  }
  return future;
}

// 创建DefaultFuture对象
public DefaultFuture(Channel channel, Request request, int timeout){  
  this.channel = channel;
  this.request = request;
  this.id = request.getId();
  this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
  // put into waiting map.
  FUTURES.put(id, this);
  CHANNELS.put(id, channel);
}

调用者拿到Future对象后,调用其get方法进行RPC调用结果的获取。在get方法中,会进行调用是否结束的判断(判断response对象是否为空),如果发现调用还没有结束,便会调用Condition的await方法,挂起当前线程。关键代码如下:

public Object get() throws RemotingException {  
  return get(timeout);
}

public Object get(int timeout) throws RemotingException {  
  if (timeout <= 0) {
    timeout = Constants.DEFAULT_TIMEOUT;
  }
  if (! isDone()) {
    long start = System.currentTimeMillis();
    lock.lock();
    try {
      while (! isDone()) {
        done.await(timeout, TimeUnit.MILLISECONDS);
        if (isDone() || System.currentTimeMillis() - start > timeout) {
          break;
        }
      }
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } finally {
      lock.unlock();
    }
    if (! isDone()) {
      throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
    }
  }
  return returnFromResponse();
}
// 判断rpc调用是否结束
public boolean isDone() {  
  return response != null;
}

provider端在创建Response对象时,会将请求对象id赋值给Response对象的id字段,然后将此对象通过请求Channel回复给consumer端。consumer端在进行TCP连接建立的时候,创建了一系列的Handler,provider端发送过来的响应会触发这些Handler的receive方法。这里忽略Handler的调用过程,假设直接调用HeaderExchangeHandler的receive方法,在该receive方法中通过response的对象的id,找到对应请求的future对象,然后将response对象赋值给future中的response,并且调用该future中condition对象的signal方法,唤醒先前挂起线程。关键代码如下:

// HeaderExchangeHandler中处理响应的方法
static void handleResponse(Channel channel, Response response) throws RemotingException {  
  if (response != null && !response.isHeartbeat()) {
    DefaultFuture.received(channel, response);
  }
}

// DefaultFuture中处理响应的方法
public static void received(Channel channel, Response response) {  
  try {
    DefaultFuture future = FUTURES.remove(response.getId());
    if (future != null) {
      future.doReceived(response);
    } else {
      logger.warn("The timeout response finally returned at " 
        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
        + ", response " + response 
        + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
        + " -> " + channel.getRemoteAddress()));
    }
  } finally {
    CHANNELS.remove(response.getId());
  }
}

private void doReceived(Response res) {  
  lock.lock();
  try {
    response = res;
    if (done != null) {
      done.signal();
    }
  } finally {
    lock.unlock();
  }
  if (callback != null) {
    invokeCallback(callback);
  }
}

最后,当挂起线程被唤醒后,判断response的状态,进行不同的处理,如果响应成功,将RPC调用的结果返回给consumer,关键代码如下:

private Object returnFromResponse() throws RemotingException {  
  Response res = response;
  if (res == null) {
    throw new IllegalStateException("response cannot be null");
  }
  if (res.getStatus() == Response.OK) {
    return res.getResult();
  }
  if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
    throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
  }
  throw new RemotingException(channel, res.getErrorMessage());
}

到这里,dubbo异步转同步的处理过程就总结完了。回过头来看,这个过程其实是java中比较经典的线程通信方式,也没有什么特别深奥的东西。整个rpc调用流程可以用下图表示:

rpc调用流程

另外,在分析的过程中,省去了底层通信以及请求响应Handler的包装过程,以后会慢慢补上。

Shaohang Zhao

Read more posts by this author.