Skip to content

39加餐:多个返回值不用怕,Merger合并器来帮忙

你好,我是杨四正,今天我和你分享的主题是 Merger 合并器。

在上一课时中,我们分析 MergeableClusterInvoker 的具体实现时讲解过这样的内容:MergeableClusterInvoker 中会读取 URL 中的 merger 参数值,如果 merger 参数以 "." 开头,则表示 "." 后的内容是一个方法名,这个方法名是远程目标方法的返回类型中的一个方法,MergeableClusterInvoker 在拿到所有 Invoker 返回的结果对象之后,会遍历每个返回结果,并调用 merger 参数指定的方法,合并这些结果值。

其实,除了上述指定 Merger 方法名称的合并方式之外,Dubbo 内部还提供了很多默认的 Merger 实现,这也就是本课时将要分析的内容。本课时将详细介绍 MergerFactory 工厂类、Merger 接口以及针对 Java 中常见数据类型的 Merger 实现。

MergerFactory

在 MergeableClusterInvoker 使用默认 Merger 实现的时候,会通过 MergerFactory 以及服务接口返回值类型(returnType),选择合适的 Merger 实现

在 MergerFactory 中维护了一个 ConcurrentHashMap 集合(即 MERGER_CACHE 字段),用来缓存服务接口返回值类型与 Merger 实例之间的映射关系。

MergerFactory.getMerger() 方法会根据传入的 returnType 类型,从 MERGER_CACHE 缓存中查找相应的 Merger 实现,下面我们来看该方法的具体实现:

java
public static <T> Merger<T> getMerger(Class<T> returnType) {
    if (returnType == null) { // returnType为空,直接抛出异常
        throw new IllegalArgumentException("returnType is null");
    }
    Merger result;
    if (returnType.isArray()) { // returnType为数组类型
        // 获取数组中元素的类型
        Class type = returnType.getComponentType();
        // 获取元素类型对应的Merger实现
        result = MERGER_CACHE.get(type);
        if (result == null) {
            loadMergers();
            result = MERGER_CACHE.get(type);
        }
        // 如果Dubbo没有提供元素类型对应的Merger实现,则返回ArrayMerger
        if (result == null && !type.isPrimitive()) {
            result = ArrayMerger.INSTANCE;
        }
    } else {
        // 如果returnType不是数组类型,则直接从MERGER_CACHE缓存查找对应的Merger实例
        result = MERGER_CACHE.get(returnType);
        if (result == null) {
            loadMergers();
            result = MERGER_CACHE.get(returnType);
        }
    }
    return result;
}

loadMergers() 方法会通过 Dubbo SPI 方式加载 Merger 接口全部扩展实现的名称,并填充到 MERGER_CACHE 集合中,具体实现如下:

java
static void loadMergers() {
    // 获取Merger接口的所有扩展名称
    Set<String> names = ExtensionLoader.getExtensionLoader(Merger.class)
            .getSupportedExtensions();
    for (String name : names) { // 遍历所有Merger扩展实现
        Merger m = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(name);
        // 将Merger实例与对应returnType的映射关系记录到MERGER_CACHE集合中
        MERGER_CACHE.putIfAbsent(ReflectUtils.getGenericClass(m.getClass()), m);
    }
}

ArrayMerger

在 Dubbo 中提供了处理不同类型返回值的 Merger 实现,其中不仅有处理 boolean[]、byte[]、char[]、double[]、float[]、int[]、long[]、short[] 等基础类型数组 的 Merger 实现,还有处理 List、Set、Map 等集合类的 Merger 实现,具体继承关系如下图所示:

Merger 继承关系图

我们首先来看 ArrayMerger 实现:当服务接口的返回值为数组的时候,会使用 ArrayMerger 将多个数组合并成一个数组,也就是将二维数组拍平成一维数组。ArrayMerger.merge() 方法的具体实现如下:

java
public Object[] merge(Object[]... items) {
    if (ArrayUtils.isEmpty(items)) {
        // 传入的结果集合为空,则直接返回空数组
        return new Object[0];
    }
    int i = 0;
    // 查找第一个不为null的结果
    while (i < items.length && items[i] == null) {
        i++;
    }
    // 所有items数组中全部结果都为null,则直接返回空数组
    if (i == items.length) {
        return new Object[0];
    }
    Class<?> type = items[i].getClass().getComponentType();
    int totalLen = 0;
    for (; i < items.length; i++) {
        if (items[i] == null) { // 忽略为null的结果
            continue;
        }
        Class<?> itemType = items[i].getClass().getComponentType();
        if (itemType != type) { // 保证类型相同
            throw new IllegalArgumentException("Arguments' types are different");
        }
        totalLen += items[i].length;
    }
    if (totalLen == 0) { // 确定最终数组的长度
        return new Object[0];
    }
    Object result = Array.newInstance(type, totalLen);
    int index = 0;
    // 遍历全部的结果数组,将items二维数组中的每个元素都加到result中,形成一维数组
    for (Object[] array : items) {
        if (array != null) {
            for (int j = 0; j < array.length; j++) {
                Array.set(result, index++, array[j]);
            }
        }
    }
    return (Object[]) result;
}

其他基础数据类型数组的 Merger 实现,与 ArrayMerger 的实现非常类似,都是将相应类型的二维数组拍平成同类型的一维数组,这里以 IntArrayMerger 为例进行分析:

java
public int[] merge(int[]... items) {
    if (ArrayUtils.isEmpty(items)) {
        // 检测传入的多个int[]不能为空
        return new int[0];
    }
    // 直接使用Stream的API将多个int[]数组拍平成一个int[]数组
    return Arrays.stream(items).filter(Objects::nonNull)
            .flatMapToInt(Arrays::stream)
            .toArray();
}

剩余的其他基础类型的 Merger 实现类,例如,FloatArrayMerger、IntArrayMerger、LongArrayMerger、BooleanArrayMerger、ByteArrayMerger、CharArrayMerger、DoubleArrayMerger 等,这里就不再赘述,你若感兴趣的话可以参考源码进行学习。

MapMerger

SetMerger、ListMerger 和 MapMerger 是针对 Set 、List 和 Map 返回值的 Merger 实现,它们会将多个 Set(或 List、Map)集合合并成一个 Set(或 List、Map)集合,核心原理与 ArrayMerger 的实现类似。这里我们先来看 MapMerger 的核心实现:

java
public Map<?, ?> merge(Map<?, ?>... items) {
    if (ArrayUtils.isEmpty(items)) {
        // 空结果集时,这就返回空Map
        return Collections.emptyMap();
    }
    // 将items中所有Map集合中的KV,添加到result这一个Map集合中
    Map<Object, Object> result = new HashMap<Object, Object>();
    Stream.of(items).filter(Objects::nonNull).forEach(result::putAll);
    return result;
}

接下来再看 SetMerger 和 ListMerger 的核心实现:

java
public Set<Object> merge(Set<?>... items) {
    if (ArrayUtils.isEmpty(items)) {
        // 空结果集时,这就返回空Set集合
        return Collections.emptySet();
    }
    // 创建一个新的HashSet集合,传入的所有Set集合都添加到result中
    Set<Object> result = new HashSet<Object>();
    Stream.of(items).filter(Objects::nonNull).forEach(result::addAll);
    return result;
}
public List<Object> merge(List<?>... items) {
    if (ArrayUtils.isEmpty(items)) {
        // 空结果集时,这就返回空Set集合
        return Collections.emptyList();
    }
    // 通过Stream API将传入的所有List集合拍平成一个List集合并返回
    return Stream.of(items).filter(Objects::nonNull)
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
}

自定义 Merger 扩展实现

介绍完 Dubbo 自带的 Merger 实现之后,下面我们还可以尝试动手写一个自己的 Merger 实现,这里我们以 dubbo-demo-xml 中的 Provider 和 Consumer 为例进行修改。

首先我们在 dubbo-demo-xml-provider 示例模块中发布两个服务,分别属于 groupA 和 groupB,相应的 dubbo-provider.xml 配置如下:

js
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    <dubbo:application metadata-type="remote" name="demo-provider"/>
    <dubbo:metadata-report address="zookeeper://127.0.0.1:2181"/>
    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
    <dubbo:protocol name="dubbo"/>
    <!-- 配置两个Spring Bean -->
    <bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
    <bean id="demoServiceB" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
    <!-- 将demoService和demoServiceB两个Spring Bean作为服务发布出去,分别属于groupA和groupB-->
    <dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" group="groupA"/>
    <dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoServiceB" group="groupB"/>
</beans>

接下来,在 dubbo-demo-xml-consumer 示例模块中进行服务引用,dubbo-consumer.xml 配置文件的具体内容如下:

js
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    <dubbo:application name="demo-consumer"/>
    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
    <!-- 引用DemoService,这里指定了group为*,即可以引用任何group的Provider,同时merger设置为true,即需要对结果进行合并-->
    <dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" group="*" merger="true"/>
</beans>

然后,在 dubbo-demo-xml-consumer 示例模块的 /resources/META-INF/dubbo 目录下,添加一个名为 org.apache.dubbo.rpc.cluster.Merger 的 Dubbo SPI 配置文件,其内容如下:

java
String=org.apache.dubbo.demo.consumer.StringMerger

StringMerger 实现了前面介绍的 Merger 接口,它会将多个 Provider 节点返回的 String 结果值拼接起来,具体实现如下:

java
public class StringMerger implements Merger<String> {
    @Override
    public String merge(String... items) {
        if (ArrayUtils.isEmpty(items)) { // 检测空返回值
            return "";
        }
        String result = "";
        for (String item : items) { // 通过竖线将多个Provider的返回值拼接起来
            result += item + "|";
        }
        return result;
    }
}

最后,我们依次启动 Zookeeper、dubbo-demo-xml-provider 示例模块和 dubbo-demo-xml-consumer 示例模块。在控制台中我们会看到如下输出:

java
result: Hello world, response from provider: 172.17.108.179:20880|Hello world, response from provider: 172.17.108.179:20880|

总结

本课时我们重点介绍了 MergeableCluster 中涉及的 Merger 合并器相关的知识点。

  • 首先,我们介绍了 MergerFactory 工厂类的核心功能,它可以配合远程方法调用的返回值,选择对应的 Merger 实现,完成结果的合并。

  • 然后,我们深入分析了 Dubbo 自带的 Merger 实现类,涉及 Java 中各个基础类型数组的 Merger 合并器实现,例如,IntArrayMerger、LongArrayMerger 等,它们都是将多个特定类型的一维数组拍平成相同类型的一维数组。

  • 除了这些基础类型数组的 Merger 实现,Dubbo 还提供了 List、Set、Map 等集合类的 Merger 实现,它们的核心是将多个集合中的元素整理到一个同类型的集合中。

  • 最后,我们还以 StringMerger 为例,介绍了如何自定义 Merger 合并器。

下一课时,我们将介绍 Dubbo 中 Mock 机制相关的内容,记得按时来听课。