Skip to content

20消息驱动:如何理解Spring中对消息处理机制的抽象过程?

从今天开始,我们将进入到 Spring Cloud 中与消息处理机制相关内容的介绍。Spring Cloud 专门提供了一个 Spring Cloud Stream 框架来实现事件驱动架构,并完成与主流消息中间件的集成。同时,Spring Cloud Stream 背后也整合了 Spring 家族中的消息处理和消息总线方面的几个框架,可以说是 Spring Cloud 中整合程度最高的一个开发框架。

SpringHealth 中的事件驱动架构

在微服务设计和开发过程中经常会存在这样的需求:系统中的某个服务会因为用户操作或内部行为发布一个事件,该服务知道这个事件在将来的某一个时间点会被其他服务所消费,但是并不知道这个服务具体是谁、也不关心什么时候被消费。同样,消费该事件的服务也不一定需要知道该事件是由哪个服务所发布。如下图所示:

事件发送和消费示意图

在上图中,事件生产者和消费者之间的虚线代表的是一种相互松散、没有直接调用的关联关系。满足以上特性的系统代表着一种松耦合的架构,通常被称为事件驱动架构,而这里的事件也可以被理解是服务与服务之间发送的一种消息。事件驱动架构本质上是一种架构设计风格,实现方法和工具有很多。在 Spring Cloud 家族中这个工具就是 Spring Cloud Stream。在接下来的内容中,我们将结合 SpringHealth 案例来分析事件驱动架构的实现需求以及在微服务架构中的应用。

在微服务系统中引入事件驱动架构的主要目的在于提升系统的扩展性。所谓扩展性,举例来说,就是在向现有系统中添加新业务时,不需要改变原有的各个组件,而只需把新业务封闭在一个新的组件中就能完成整体业务的升级,我们认为这样的系统就具有较好的可扩展性。

让我们回到 SpringHealth 系统,在我们的案例中存在健康干预相关的业务场景,常见的健康干预涉及用户、设备和健康干预自身信息维护等功能,而 SpringHealth 分别提取了 user-service、device-service 和 intervention-service 这三个微服务。显然,这三个服务之间需要进行服务之间的调用和协调从而完成业务闭环。如果在不久的将来,SpringHealth 中需要引入其他服务才能形成完整的业务流程,那么这个业务闭环背后的交互模式就需要进行相应的调整。

一般而言,类似 SpringHealth 这样的系统中的用户信息变动并不会太频繁,所以很多时候我们会想到通过缓存来存放用户信息,并在健康干预处理过程中直接从缓存中获取所需的用户信息。在这样的设计和实现方式下,试想一旦某个用户信息发生变化,我们应该如何正确和高效的应对这一场景?

考虑到系统扩展性,显然在 intervention-service 中直接通过访问 user-service 实时获取用户信息的服务交互模式并不是一个好的选择,因为用户信息更新的时机我们无法事先预知,而事件驱动架构为我们提供了一种更好的实现方案。当用户信息变更时,user-service 可以发送一个事件,该事件表明了某个用户信息已经发生了变化,并将传递到所有对该事件感兴趣的微服务,这些微服务会根据自身的业务逻辑来消费这一事件。通过这种方式,某个特定服务就可以获取用户信息变更事件从而正确且高效的更新缓存信息。基于这种设计思想,该场景下交互示意图如下所示:

用户信息更新场景中的事件驱动架构

在上图中,我们看到了有 consumer-service1 和 consumer-service2 这两个消费者服务,事件处理架构的优势就在于当系统中需要添加新的用户信息变更事件处理逻辑来完成整个流程时,我们只需要对该事件添加一个新的 consumer-service2 即可,而不需要对原有的 consumer-service1 中的处理流程做任何修改。这在应对系统扩展性上有很大的优势。

针对上图,在技术上实现上,我们可以使用主流的消息中间件来实现消息的发布与消费,常见的包括 ActiveMQ、RabbitMQ、Kafka 等。这些消息中间件的核心功能就是能够将所收到的消息存储起来并进行转发。有了存储转发机制之后,就可以做到消息发布者和消费者相互独立。关于各个消息中间件的介绍不是本课程的重点,而在 Spring Cloud Stream 中集成了 RabbitMQ 和 Kafka,我们会在下一课时中进行详细展开。在此之前,我们有必要对 Spring 家族中的消息处理机制做一个展开,因为 Spring Cloud Stream 正是构建在 Spring 消息处理机制之上。

Spring 家族中的消息处理机制

在了解了事件驱动架构以及消息中间件的基本概念之后,我们来看一下 Spring 中针对这些概念提供的技术解决方案。在 Spring 家族中,与消息处理机制相关的框架有三个。事实上,本课程要介绍的 Spring Cloud Stream 是基于 Spring Integration 实现了消息发布和消费机制并提供了一层封装,很多关于消息发布和消费的概念和实现方法本质上都是依赖于 Spring Integration。而在 Spring Integration 的背后,则依赖于 Spring Messaging 组件来实现消息处理机制的基础设施。这三个框架之间的依赖关系如下图所示:

Spring 家族中三大消息处理相关框架关系图

接下来的内容,我们先来对位于底层的 Spring Messaging 和 Spring Integration 框架做一些展开,方便你在使用 Spring Cloud Stream 时对其背后的实现原理有更好的理解。

Spring Messaging

Spring Messaging 是 Spring 框架中的一个底层模块,用于提供统一的消息编程模型。例如,消息这个数据单元在 Spring Messaging 中统一定义为如下所示的 Message 接口,包括一个消息头 Header 和一个消息体 Payload:

java
public interface Message<T> {
    T getPayload();
    MessageHeaders getHeaders();
}

而消息通道 MessageChannel 的定义也比较简单,我们可以调用 send() 方法将消息发送至该消息通道中,MessageChannel 接口定义如下所示:

java
public interface MessageChannel {
    long INDEFINITE_TIMEOUT = -1;
    default boolean send(Message<?> message) {
           return send(message, INDEFINITE_TIMEOUT);
    }
    boolean send(Message<?> message, long timeout);
}

消息通道的概念比较抽象,可以简单把它理解为是对队列的一种抽象。我们知道在消息传递系统中,队列的作用就是实现存储转发的媒介,消息发布者所生成的消息都将保存在队列中并由消息消费者进行消费。通道的名称对应的就是队列的名称,但是作为一种抽象和封装,各个消息传递系统所特有的队列概念并不会直接暴露在业务代码中,而是通过通道来对队列进行配置。

Spring Messaging 把通道抽象成如下所示的两种基本表现形式,即支持轮询的 PollableChannel 和实现发布-订阅模式的 SubscribableChannel,这两个通道都继承自具有消息发送功能的 MessageChannel:

java
public interface PollableChannel extends MessageChannel { 
	    Message<?> receive(); 
	    Message<?> receive(long timeout);
}
	 
public interface SubscribableChannel extends MessageChannel { 
	    boolean subscribe(MessageHandler handler); 
	    boolean unsubscribe(MessageHandler handler);
}

我们注意到对于 PollableChannel 而言才有 receive 的概念,代表这是通过轮询操作主动获取消息的过程。而 SubscribableChannel 则是通过注册回调函数 MessageHandler 来实现事件响应。MessageHandler 接口定义如下:

java
public interface MessageHandler {
       void handleMessage(Message<?> message) throws MessagingException;
}

Spring Messaging 在基础消息模型之上还提供了很多方便在业务系统中使用消息传递机制的辅助功能,例如各种消息体内容转换器 MessageConverter 以及消息通道拦截器 ChannelInterceptor 等,这里不做展开,你可以参考官方文档做进一步了解。

Spring Integration

Spring Integration 是对 Spring Messaging 的扩展,提供了对系统集成领域的经典著作《企业集成模式:设计构建及部署消息传递解决方案》中所描述的各种企业集成模式的支持,通常被认为是一种企业服务总线 ESB 框架。

在 Spring Messaging 的基础上,Spring Integration 还实现了其他几种有用的通道,包括支持阻塞式队列的 RendezvousChannel,该通道与带缓存的 QueueChannel 都属于点对点通道,但只有在前一个消息被消费之后才能发送下一个消息。PriorityChannel 即优先级队列,而 DirectChannel 是 Spring Integration 的默认通道,该通道的消息发送和接收过程处于同一线程中。另外还有 ExecutorChannel,使用基于多线程的 TaskExecutor 来异步消费通道中的消息。

Spring Integration 的设计目的是系统集成,因此内部提供了大量的集成化端点方便应用程序直接使用。当各个异构系统之间进行集成时,如何屏蔽各种技术体系所带来的差异性,Spring Integration 为我们提供了解决方案。通过通道之间的消息传递,在消息的入口和出口我们可以使用通道适配器和消息网关这两种典型的端点对消息进行同构化处理。Spring Integration 提供的常见集成端点包括 File、FTP、TCP/UDP、HTTP、JDBC、JMS、AMQP、JPA、Mail、MongoDB、Redis、RMI、Web Services 等。

Spring Integration 的功能非常强大,本课程无意对所有这些功能做过多阐述。在下一课时介绍 Spring Cloud Stream 的基本架构时我们会对 Spring Integration 做更详细的介绍。

小结与预告

本课时引入了消息传递机制来应对系统开发中所需要实现的事件驱动架构,而在 Spring Cloud 中也存在强大的 Spring Cloud Stream 框架完成对主流消息中间件的平台化集成。注意到该框架同时也是对 Spring Messaging 和 Spring Intergration 这两个 Spring 家族中消息处理框架的封装,这些都是我们理解并正确使用 Spring Cloud Stream 的前提。

这里给你留一道思考题:在 Spring 家族中存在哪些框架可以用来实现消息处理,而 Spring Cloud Stream 与这些框架又是什么样的关系?

在引入 Spring Cloud Stream 框架之后,下一课时我们将关注于该框架的基本架构。在架构设计上,Spring Cloud Stream 中所包含的理念和实现技巧同样值得我们学习和应用。