Skip to content

24消息集成:如何剖析SpringCloudStream集成消息中间件的实现原理?

Spring Cloud Stream 中的内容比较多,今天我们重点关注的是如何实现 Spring Cloud Stream 与其他消息中间件的整合过程,因此只介绍消息发送和接收的主流程。我们将分别从Spring Cloud Stream 以及消息中间件的角度出发,分析如何基于这一主流程,完成两者之间的无缝集成。

Spring Cloud Stream 中的 Binder

通过前面几个课时的介绍,我们明确了 Binder 组件是 Spring Cloud Stream 与各种消息中间件进行集成的核心组件,而 Binder 组件的实现过程涉及一批核心类之间的相互协作。接下来,我们就对 Binder 相关的核心类做源码级的展开。

BindableProxyFactory

我们知道在发送和接收消息时,需要使用 @EnableBinding 注解,该注解的作用就是告诉 Spring Cloud Stream 将该应用程序绑定到消息中间件,从而实现两者之间的连接。我们来到 org.springframework.cloud.stream.binding 包下的 BindableProxyFactory 类。根据该类上的注释,BindableProxyFactory 是用于初始化由 @EnableBinding 注解所提供接口的工厂类,该类的定义如下所示:

java
public class BindableProxyFactory implements MethodInterceptor, FactoryBean<Object>, Bindable, InitializingBean

注意到 BindableProxyFactory 同时实现了 MethodInterceptor 接口和 Bindable 接口。其中前者是 AOP 中的方法拦截器,而后者是一个标明能够绑定 Input 和 Output 的接口。我们先来看 MethodInterceptor 中用于拦截的 invoke 方法,如下所示:

java
@Override
public synchronized Object invoke(MethodInvocation invocation) throws Throwable {
        Method method = invocation.getMethod();
 
        Object boundTarget = targetCache.get(method);
        if (boundTarget != null) {
            return boundTarget;
        }
 
        Input input = AnnotationUtils.findAnnotation(method, Input.class);
        if (input != null) {
            String name = BindingBeanDefinitionRegistryUtils.getBindingTargetName(input, method);
            boundTarget = this.inputHolders.get(name).getBoundTarget();
            targetCache.put(method, boundTarget);
            return boundTarget;
        }
        else {
            Output output = AnnotationUtils.findAnnotation(method, Output.class);
            if (output != null) {
                String name = BindingBeanDefinitionRegistryUtils.getBindingTargetName(output, method);
                boundTarget = this.outputHolders.get(name).getBoundTarget();
                targetCache.put(method, boundTarget);
                return boundTarget;
            }
        }
        return null;
}

这里的逻辑比较简单,可以看到 BindableProxyFactory 保存了一个缓存对象 targetCache。如果所调用方法已经存在于缓存中,则直接返回目标对象。反之,会根据 @Input 和 @Output 注解从 inputHolders 和 outputHolders 中获取对应的目标对象并放入缓存中。这里使用缓存的作用仅仅是为了加快每次方法调用的速度,而系统在初始化时通过重写 afterPropertiesSet 方法,已经将所有的目标对象都放置在 inputHolders 和 outputHolders 这两个集合中。至于这里提到的这个目标对象,暂时可以把它理解为就是一种 MessageChannel 对象,后面会对其进行展开。

然后我们来看 Bindable 接口的定义,如下所示:

java
public interface Bindable {
  default Collection<Binding<Object>> createAndBindInputs(BindingService adapter) {
      return Collections.<Binding<Object>>emptyList();
  }
  default void bindOutputs(BindingService adapter) {}
  default void unbindInputs(BindingService adapter) {}
  default void unbindOutputs(BindingService adapter) {}
  default Set<String> getInputs() {
      return Collections.emptySet();
  }

  default Set<String> getOutputs() {
      return Collections.emptySet();
  }
}

显然,这个接口提供了对 Input 和 Output 的绑定和解绑操作。在 BindableProxyFactory 中,对以上几个方法的实现过程基本都类似,我们随机挑选一个 bindOutputs 方法进行展开,如下所示:

java
@Override
public void bindOutputs(BindingService bindingService) {
        for (Map.Entry<String, BoundTargetHolder> boundTargetHolderEntry : this.outputHolders.entrySet()) {
            BoundTargetHolder boundTargetHolder = boundTargetHolderEntry.getValue();
            String outputTargetName = boundTargetHolderEntry.getKey();
            if (boundTargetHolderEntry.getValue().isBindable()) {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Binding %s:%s:%s", this.namespace, this.type, outputTargetName));
                }
                bindingService.bindProducer(boundTargetHolder.getBoundTarget(), outputTargetName);
            }
        }
}

这里需要引入另一个重要的工具类 BindingService,该类提供了对 Input 和 Output 目标对象进行绑定的能力。但事实上,通过类上的注释可以看到,这也是一个外观类,它将底层的绑定动作委托给了 Binder。我们以绑定生产者的 bindProducer 方法为例展开讨论,该方法如下所示:

java
public <T> Binding<T> bindProducer(T output, String outputName) {
        String bindingTarget = this.bindingServiceProperties
                .getBindingDestination(outputName);
        Binder<T, ?, ProducerProperties> binder = (Binder<T, ?, ProducerProperties>) getBinder(
                outputName, output.getClass());
        ProducerProperties producerProperties = this.bindingServiceProperties
                .getProducerProperties(outputName);
        if (binder instanceof ExtendedPropertiesBinder) {
            Object extension = ((ExtendedPropertiesBinder) binder)
                    .getExtendedProducerProperties(outputName);
            ExtendedProducerProperties extendedProducerProperties = new ExtendedProducerProperties<>(
                    extension);
            BeanUtils.copyProperties(producerProperties, extendedProducerProperties);
            producerProperties = extendedProducerProperties;
        }
        validate(producerProperties);
        Binding<T> binding = doBindProducer(output, bindingTarget, binder, producerProperties);
        this.producerBindings.put(outputName, binding);
        return binding;
}

显然,这里的 doBindProducer 方法完成了真正的绑定操作,如下所示:

java
public <T> Binding<T> doBindProducer(T output, String bindingTarget, Binder<T, ?, ProducerProperties> binder,
            ProducerProperties producerProperties) {
        if (this.taskScheduler == null || this.bindingServiceProperties.getBindingRetryInterval() <= 0) {
            return binder.bindProducer(bindingTarget, output, producerProperties);
        }
        else {
            try {
                return binder.bindProducer(bindingTarget, output, producerProperties);
            }
            catch (RuntimeException e) {
                LateBinding<T> late = new LateBinding<T>();
                rescheduleProducerBinding(output, bindingTarget, binder, producerProperties, late, e);
                return late;
            }
        }
}

从这个方法中,我们终于看到了 Spring Cloud Stream 中最核心的概念 Binder,通过 Binder 的 bindProducer 方法完成了目标对象的绑定。

Binder

Binder 是一个接口,分别提供了绑定生产者和消费者的方法,如下所示:

java
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
 
    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}

在介绍 Binder 接口的具体实现类之前,我们先来看一下如何获取一个 Binder,getBinder 方法如下所示。

java
protected <T> Binder<T, ?, ?> getBinder(String channelName, Class<T> bindableType) {
        String binderConfigurationName = this.bindingServiceProperties.getBinder(channelName);
        return binderFactory.getBinder(binderConfigurationName, bindableType);

显然,这里用到了个工厂模式。工厂类 BinderFactory 的定义如下所示:

java
public interface BinderFactory {
 
    <T> Binder<T, ? extends ConsumerProperties, ? extends ProducerProperties> getBinder(String configurationName,
            Class<? extends T> bindableType);
}

BinderFactory 只有一个方法,根据给定的配置名称 configurationName 和绑定类型 bindableType 获取 Binder 实例。而 BinderFactory 的实现类也只有一个,即 DefaultBinderFactory。在该实现类的 getBinder 方法中对配置信息进行了校验,并通过 getBinderInstance 获取真正的 Binder 实例。在 getBinderInstance 方法中,我们通过一系列基于 Spring 容器的步骤构建了一个上下文对象 ConfigurableApplicationContext,并通过该上下文对象获取实现了 Binder 接口的 Java bean,核心代码就是下面这句:

java
Binder<T, ?, ?> binder = binderProducingContext.getBean(Binder.class);

当然,对于 BinderFactory 而言,缓存也是需要的。在 DefaultBinderFactory 中存在一个 binderInstanceCache 变量,使用了一个 Map 来保存配置名称所对应的 Binder 对象。

AbstractMessageChannelBinder

既然我们已经能够获取 Binder 实例,接下去就来讨论 Binder 实例中对 bindConsumer 和 bindProducer 方法的实现过程。在 Spring Cloud Stream 中,Binder 接口的类层关系如下所示,注意到这里还展示了 spring-cloud-stream-binder-rabbit 代码工程中的 RabbitMessageChannelBinder 类,这个类在本课时讲到 Spring Cloud Stream 与 RabbitMQ 进行集成时会具体展开:

Binder 接口类层结构图

Spring Cloud Stream 首先提供了一个 AbstractBinder,这是一个抽象类,提供的 bindConsumer 和 bindProducer 方法实现如下所示:

java
@Override
public final Binding<T> bindConsumer(String name, String group, T target, C properties) {
        if (StringUtils.isEmpty(group)) {
            Assert.isTrue(!properties.isPartitioned(), "A consumer group is required for a partitioned subscription");
        }
        return doBindConsumer(name, group, target, properties);
}
 
protected abstract Binding<T> doBindConsumer(String name, String group, T inputTarget, C properties);
 
@Override
public final Binding<T> bindProducer(String name, T outboundBindTarget, P properties) {
        return doBindProducer(name, outboundBindTarget, properties);
}
 
protected abstract Binding<T> doBindProducer(String name, T outboundBindTarget, P properties);

可以看到,它对 Binder 接口中相关方法只是提供了空实现,并把具体实现过程通过 doBindConsumer 和 doBindProducer 抽象方法交由子类进行完成。显然,从设计模式上讲,AbstractBinder 应用了很典型的模板方法模式。

AbstractBinder 的子类是 AbstractMessageChannelBinder,它同样也是一个抽象类。我们来看它的 doBindProducer 方法,并对该方法中的核心语句进行提取和整理:

java
@Override
public final Binding<MessageChannel> doBindProducer(final String destination, MessageChannel outputChannel,
        final P producerProperties) throws BinderException {
	 
	...
final MessageHandler producerMessageHandler;
        final ProducerDestination producerDestination;
        try {
            producerDestination = this.provisioningProvider.provisionProducerDestination(destination,
                    producerProperties);
            SubscribableChannel errorChannel = producerProperties.isErrorChannelEnabled()
                    ? registerErrorInfrastructure(producerDestination) : null;
            producerMessageHandler = createProducerMessageHandler(producerDestination, producerProperties,
                    errorChannel);

	...
postProcessOutputChannel(outputChannel, producerProperties);
        ((SubscribableChannel) outputChannel).subscribe(
                new SendingHandler(producerMessageHandler, HeaderMode.embeddedHeaders
                        .equals(producerProperties.getHeaderMode()), this.headersToEmbed,
	                     producerProperties.isUseNativeEncoding()));
 
Binding<MessageChannel> binding = new DefaultBinding<MessageChannel>(destination, null, outputChannel,
                producerMessageHandler instanceof Lifecycle ? (Lifecycle) producerMessageHandler : null) {
	...
};
 
        doPublishEvent(new BindingCreatedEvent(binding));
        return binding;
}

上述代码的核心逻辑在于,Source 里的 output 发送消息到 outputChannel 通道之后会被 SendingHandler 这个 MessageHandler 进行处理。从设计模式上讲,SendingHandler 是一个静态代理类,因此它又将这个处理过程委托给了由 createProducerMessageHandler 方法所创建的 producerMessageHandler,这点从 SendingHandler 的定义中可以得到验证,如下所示的 delegate 就是传入的 producerMessageHandler:

java
private final class SendingHandler extends AbstractMessageHandler implements Lifecycle {
 
private final MessageHandler delegate;
 
        @Override
        protected void handleMessageInternal(Message<?> message) throws Exception {
            Message<?> messageToSend = (this.useNativeEncoding) ? message
                    : serializeAndEmbedHeadersIfApplicable(message);
            this.delegate.handleMessage(messageToSend);
        }
 
        // 省略其他方法
}

请注意,同样作为一个模板方法类,AbstractMessageChannelBinder 具有三个抽象方法,即 createProducerMessageHandler、postProcessOutputChannel 和 afterUnbindProducer,这三个方法都需要由它的子类进行实现。也就是说,SendingHandler 所使用的 producerMessageHandler 需要由 AbstractMessageChannelBinder 子类负责进行创建。

需要注意的是,作为统一的数据模型,SendingHandler 以及 producerMessageHandler 中使用的都是 Spring Messaging 组件中的 Message 消息对象,而 createProducerMessageHandler 内部会把这个 Message 消息对象转换成对应中间件的消息数据格式并进行发送。

下面转到消息消费的场景,我们来看 AbstractMessageChannelBinder 的 doBindConsumer 方法。该方法的核心语句是创建一个消费者端点 ConsumerEndpoint,如下所示:

java
MessageProducer consumerEndpoint = createConsumerEndpoint(destination, group, properties);
consumerEndpoint.setOutputChannel(inputChannel);

这两行代码有两个注意点。首先,createConsumerEndpoint 是一个抽象方法,需要 AbstractMessageChannelBinder 的子类进行实现。与 createProducerMessageHandler 一样,createConsumerEndpoint 需要把中间件对应的消息数据结构转换成 Spring Messaging 中统一的 Message 消息对象。

然后,我们注意到这里的 consumerEndpoint 类型是 MessageProducer。MessageProducer 在 Spring Integration 中代表的是消息的生产者,它会把从第三方消息中间件中收到的消息转发到 inputChannel 所指定的通道中。基于 @StreamListener 注解,在 Spring Cloud Stream 中存在一个 StreamListenerMessageHandler 类,用于订阅 inputChannel 消息通道中传入的消息并进行消费。

作为总结,我们可以用如下所示的流程图来概括整个消息发送和消费流程:

消息发送和消费整体流程图

Spring Cloud Stream 集成 RabbitMQ

到目前为止,Spring Cloud Stream 提供了对 RabbitMQ 和 Kafka 这两款主流消息中间件的集成。今天,我们选择使用 RabbitMQ 作为示例,讲解如何通过 Spring Cloud Stream 所提供的 Binder 完成与具体消息中间件的整合。

Spring Cloud Stream 团队提供了 spring-cloud-stream-binder-rabbit 作为与 RabbitMQ 集成的代码工程。这个工程只有四个类,我们需要重点关注的就是实现了 AbstractMessageChannelBinder 中几个抽象方法的 RabbitMessageChannelBinder 类。

集成消息发送

首先找到 RabbitMessageChannelBinder中 的 createProducerMessageHandler 方法,我们知道该方法用于完成消息的发送。我们在 createProducerMessageHandler 中找到了以下核心代码:

java
final AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(         buildRabbitTemplate(producerProperties.getExtension(), errorChannel != null));
endpoint.setExchangeName(producerDestination.getName());

首先,在 buildRabbitTemplate 方法中,我们看到了 RabbitTemplate 的构建过程。RabbitTemplate 是 Spring Amqp 组件中提供的专门用于封装与 RabbitMQ 底层交互 API 的模板类。在构建 RabbitTemplate 的整个过程中,涉及设置与 RabbitMQ 相关的 ConnectionFactory 等众多参数。

然后,我们发现 RabbitMessageChannelBinder 也是直接集成了 Spring Integration 中用于整合 AQMP 协议的 AmqpOutboundEndpoint。AmqpOutboundEndpoint 提供了如下所示的 send 方法进行消息的发送:

java
private void send(String exchangeName, String routingKey,
            final Message<?> requestMessage, CorrelationData correlationData) {
        if (this.amqpTemplate instanceof RabbitTemplate) {
            MessageConverter converter = ((RabbitTemplate) this.amqpTemplate).getMessageConverter();
            org.springframework.amqp.core.Message amqpMessage = MappingUtils.mapMessage(requestMessage, converter,
                    getHeaderMapper(), getDefaultDeliveryMode(), isHeadersMappedLast());
            addDelayProperty(requestMessage, amqpMessage);
            ((RabbitTemplate) this.amqpTemplate).send(exchangeName, routingKey, amqpMessage, correlationData);
        }
        else {
            this.amqpTemplate.convertAndSend(exchangeName, routingKey, requestMessage.getPayload(),
                    message -> {
                        getHeaderMapper().fromHeadersToRequest(requestMessage.getHeaders(),
                                message.getMessageProperties());
                        return message;
                    });
        }
}

可以看到这里依赖于 Spring Amqp 提供的 AmqpTemplate 接口进行消息的发送,而 RabbitTemplate 是 AmqpTemplate 的一个实现类。同时,通过 Spring Integration 组件中提供的 MessageConverter 工具类完成了从 org.springframework.messaging.Message 到 org.springframework.amqp.core.Message 这两个消息数据结构之间的转换。

集成消息消费

RabbitMessageChannelBinder 中与消息消费相关的是 createConsumerEndpoint 方法。这个方法中大量使用了 Spring Amqp 和 Spring Integration 中的工具类。该方法最终返回的是一个 AmqpInboundChannelAdapter 对象。在 Spring Integration 中,AmqpInboundChannelAdapter 是一种 InboundChannelAdapter,代表面向输入的通道适配器,提供了消息监听功能,如下所示

java
protected class Listener implements ChannelAwareMessageListener, RetryListener {
        @Override
        public void onMessage(final Message message, final Channel channel) throws Exception {
 
        //省略相关实现
	 }
}

在这个 onMessage 方法中,调用了 createAndSend 方法完成消息的创建和发送,如下所示:

java
private void createAndSend(Message message, Channel channel) {
            org.springframework.messaging.Message<Object> messagingMessage = createMessage(message, channel);
            setAttributesIfNecessary(message, messagingMessage);
            sendMessage(messagingMessage);
}
	 
	 
private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
            Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
            Map<String, Object> headers = AmqpInboundChannelAdapter.this.headerMapper
                    .toHeadersFromRequest(message.getMessageProperties());
            if (AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode()
                    == AcknowledgeMode.MANUAL) {
                headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
                headers.put(AmqpHeaders.CHANNEL, channel);
            }
            if (AmqpInboundChannelAdapter.this.retryTemplate != null) {
                headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
            }
            final org.springframework.messaging.Message<Object> messagingMessage = getMessageBuilderFactory()
                    .withPayload(payload)
                    .copyHeaders(headers)
                    .build();
            return messagingMessage;
}

显然,在上述 createMessage 方法中,我们完成了消息数据格式从 org.springframework.amqp.core.Message 到 org.springframework.messaging.Message 的反向转换。

小结与预告

Binder 是 Spring Cloud Stream 的核心组件,通过这个组件,Spring Cloud Stream 完成了与第三方消息中间件的集成。在本课时中,我们花了较大篇幅系统分析了 Binder 组件相关的核心类。然后,基于这些核心类以及 RabbitMQ,我们给出了 Spring Cloud Stream 集成RabbitMQ 的实现原理。

这里给你留一道思考题:在 Spring Cloud Stream 中,Binder 组件对于消息发送和消费做了哪些抽象?

介绍完 Spring Cloud Stream 之后,我们又将启动一个新的话题,即安全性。在微服务架构中,安全性的重要性往往被忽略,值得我们系统的进行分析和实现。下一课时,我们首先关注如何理解微服务访问的安全需求和实现方案。