Skip to content

16消息驱动:如何使用RabbitTemplate集成RabbitMQ?

15 讲我们介绍了基于 ActiveMQ 和 JmsTemplate 实现消息发送和消费,并重构了 SpringCSS 案例系统中的 account-service 和 customer-service 服务。

今天,我们将介绍另一款主流的消息中间件 RabbitMQ,并基于 RabbitTemplate 模板工具类为 SpringCSS 案例添加对应的消息通信机制。

AMQP 规范与 RabbitMQ

AMQP(Advanced Message Queuing Protocol)是一个提供统一消息服务的应用层标准高级消息队列规范。和 JMS 规范一样,AMQP 描述了一套模块化的组件及组件之间进行连接的标准规则,用于明确客户端与服务器交互的语义。而业界也存在一批实现 AMQP 规范的框架,其中极具代表性的是 RabbitMQ。

AMQP 规范

在 AMQP 规范中存在三个核心组件,分别是交换器(Exchange)、消息队列(Queue)和绑定(Binding)。其中交换器用于接收应用程序发送的消息,并根据一定的规则将这些消息路由发送到消息队列中;消息队列用于存储消息,直到这些消息被消费者安全处理完毕;而绑定定义了交换器和消息队列之间的关联,为它们提供了路由规则。

在 AMQP 规范中并没有明确指明类似 JMS 中一对一的点对点模型和一对多的发布-订阅模型,不过通过控制 Exchange 与 Queue 之间的路由规则,我们可以很容易地模拟 Topic 这种典型消息中间件的概念。

如果存在多个 Queue,Exchange 如何知道把消息发送到哪个 Queue 中呢?

通过 Binding 规则设置路由信息即可。在与多个 Queue 关联之后,Exchange 中会存在一个路由表,这个表中维护着每个 Queue 存储消息的限制条件。

消息中包含一个路由键(Routing Key),它由消息发送者产生,并提供给 Exchange 路由这条消息的标准。而 Exchange 会检查 Routing Key,并结合路由算法决定将消息路由发送到哪个 Queue 中。

通过下面 Exchange 与 Queue 之间的路由关系图,我们可以看到一条来自生产者的消息通过 Exchange 中的路由算法可以发送给一个或多个 Queue,从而实现点对点和发布订阅功能。

AMQP 路由关系图

上图中,不同的路由算法存在不同的 Exchange 类型,而 AMQP 规范中指定了直接式交换器(Direct Exchange)、广播式交换器(Fanout Exchange)、主题式交换器(Topic Exchange)和消息头式交换器(Header Exchange)这几种 Exchange 类型,不过这一讲我们将重点介绍直接式交换器。

通过精确匹配消息的 Routing Key,直接式交换器可以将消息路由发送到零个或多个队列中,如下图所示:

Direct Exchange 示意图

RabbitMQ 基本架构

RabbitMQ 使用 Erlang 语言开发的 AMQP 规范标准实现框架,而 ConnectionFactory、Connection、Channel 是 RabbitMQ 对外提供的 API 中最基本的对象,都需要遵循 AMQP 规范的建议。其中,Channel 是应用程序与 RabbitMQ 交互过程中最重要的一个接口,因为我们大部分的业务操作需要通过 Channel 接口完成,如定义 Queue、定义 Exchange、绑定 Queue 与 Exchange、发布消息等。

如果想启动 RabbitMQ,我们只需要运行 rabbitmq-server.sh 文件即可。不过,因为 RabbitMQ 依赖于 Erlang,所以首先我们需要确保安装上 Erlang 环境。

接下来,我们一起看下如何使用 Spring 框架所提供的 RabbitTemplate 模板工具类集成 RabbitMQ。

使用 RabbitTemplate 集成 RabbitMQ

如果想使用 RabbitTemplate 集成 RabbitMQ,首先我们需要在 Spring Boot 应用程序中添加对 spring-boot-starter-amqp 的依赖,如下代码所示:

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

使用 RabbitTemplate 发送消息

和其他模板类一样,RabbitTemplate 也提供了一批 send 方法用来发送消息,如下代码所示:

java
@Override
public void send(Message message) throws AmqpException {
    send(this.exchange, this.routingKey, message);
}
 
@Override
public void send(String routingKey, Message message) throws AmqpException {
    send(this.exchange, routingKey, message);
}
 
@Override
public void send(final String exchange, final String routingKey, final Message message) throws AmqpException {
    send(exchange, routingKey, message, null);
}

在这里可以看到,我们指定了消息发送的 Exchange 及用于消息路由的路由键 RoutingKey。因为这些 send 方法发送的是原生消息对象,所以在与业务代码进行集成时,我们需要将业务对象转换为 Message 对象,示例代码如下所示:

java
public void sendDemoObject(DemoObject demoObject) { 
    MessageConverter converter =               rabbitTemplate.getMessageConverter(); 
    MessageProperties props = new MessageProperties(); 
    Message message = converter.toMessage(demoObject, props); 
    rabbitTemplate.send("demo.queue", message); 
}

如果我们不想在业务代码中嵌入 Message 等原生消息对象,还可以使用 RabbitTemplate 的 convertAndSend 方法组进行实现,如下代码所示:

java
@Override
public void convertAndSend(Object object) throws AmqpException {
        convertAndSend(this.exchange, this.routingKey, object, (CorrelationData) null);
}
 
@Override
public void correlationConvertAndSend(Object object, CorrelationData correlationData) throws AmqpException {
        convertAndSend(this.exchange, this.routingKey, object, correlationData);
}
 
@Override
public void convertAndSend(String routingKey, final Object object) throws AmqpException {
        convertAndSend(this.exchange, routingKey, object, (CorrelationData) null);
}
 
@Override
public void convertAndSend(String routingKey, final Object object, CorrelationData correlationData)
            throws AmqpException {
        convertAndSend(this.exchange, routingKey, object, correlationData);
}
 
@Override
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
        convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}

上述 convertAndSend 方法组在内部就完成了业务对象向原生消息对象的自动转换过程,因此,我们可以使用如下所示的代码来简化消息发送过程。

java
public void sendDemoObject(DemoObject demoObject) { 
	rabbitTemplate.convertAndSend("demo.queue", demoObject); 
}

当然,有时候我们需要在消息发送的过程中为消息添加一些属性,这就不可避免需要操作原生 Message 对象,而 RabbitTemplate 也提供了一组 convertAndSend 重载方法应对这种场景,如下代码所示:

java
@Override
public void convertAndSend(String exchange, String routingKey, final Object message,
            final MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException {
        Message messageToSend = convertMessageIfNecessary(message);
        messageToSend = messagePostProcessor.postProcessMessage(messageToSend, correlationData);
        send(exchange, routingKey, messageToSend, correlationData);
}

注意这里,我们使用了一个 MessagePostProcessor 类对所生成的消息进行后处理,MessagePostProcessor 的使用方式如下代码所示:

java
rabbitTemplate.convertAndSend("demo.queue", event, new MessagePostProcessor() {
     @Override
     public Message postProcessMessage(Message message) throws AmqpException {
          //针对 Message 的处理
          return message;
     }
});

使用 RabbitTemplate 的最后一步是在配置文件中添加配置项,在配置时我们需要指定 RabbitMQ 服务器的地址、端口、用户名和密码等信息,如下代码所示:

xml
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: DemoHost

注意,出于对多租户和安全因素的考虑,AMQP 还提出了虚拟主机(Virtual Host)概念,因此这里出现了一个 virtual-host 配置项。

Virtual Host 类似于权限控制组,内部可以包含若干个 Exchange 和 Queue。多个不同用户使用同一个 RabbitMQ 服务器提供的服务时,我们可以将其划分为多个 Virtual Host,并在自己的 Virtual Host 中创建相应组件,如下图所示:

添加了 Virtual Host 的 AMQP 模型

使用 RabbitTemplate 消费消息

和 JmsTemplate 一样,使用 RabbitTemplate 消费消息时,我们也可以使用推模式和拉模式。

在拉模式下,使用 RabbitTemplate 的典型示例如下代码所示:

java
public DemoEvent receiveEvent() {
        return (DemoEvent) rabbitTemplate.receiveAndConvert("demo.queue");
}

这里,我们使用了 RabbitTemplate 中的 receiveAndConvert 方法,该方法可以从一个指定的 Queue 中拉取消息,如下代码所示:

java
@Override
public Object receiveAndConvert(String queueName) throws AmqpException {
        return receiveAndConvert(queueName, this.receiveTimeout);
}

这里请注意,内部的 receiveAndConvert 方法中出现了第二个参数 receiveTimeout,这个参数的默认值是 0,意味着即使调用 receiveAndConvert 时队列中没有消息,该方法也会立即返回一个空对象,而不会等待下一个消息的到来,这点与 15 讲介绍的 JmsTemplate 存在本质性的区别。

如果我们想实现与 JmsTemplate 一样的阻塞等待,设置好 receiveTimeout 参数即可,如下代码所示:

java
public DemoEvent receiveEvent() { 
	return (DemoEvent)rabbitTemplate.receiveAndConvert("demo.queue", 2000ms); 
}

如果不想每次方法调用都指定 receiveTimeout,我们可以在配置文件中通过添加配置项的方式设置 RabbitTemplate 级别的时间,如下代码所示:

xml
spring:
  rabbitmq:
    template:
      receive-timeout: 2000

当然,RabbitTemplate 也提供了一组支持接收原生消息的 receive 方法,但我们还是建议使用 receiveAndConvert 方法实现拉模式下的消息消费。

介绍完拉模式,接下来我们介绍推模式,它的实现方法也很简单,如下代码所示:

java
@RabbitListener(queues = "demo.queue")
public void handlerEvent(DemoEvent event) {
    //TODO:添加消息处理逻辑
}

开发人员在 @RabbitListener 中指定目标队列即可自动接收来自该队列的消息,这种实现方式与 15 讲中介绍的 @JmsListener 完全一致。

在 SpringCSS 案例中集成 RabbitMQ

因为这三种模板工具类的使用方式非常类似,都可以用来提取公共代码形成统一的接口和抽象类,所以作为介绍消息中间件的最后一讲,我们想对 SpringCSS 案例中的三种模板工具类的集成方式进行抽象。

实现 account-service 消息生产者

在消息生产者的 account-service 中,我们提取了如下所示的 AccountChangedPublisher 作为消息发布的统一接口。

java
public interface AccountChangedPublisher {

    void publishAccountChangedEvent(Account account, String operation);
}

请注意,这是一个面向业务的接口,没有使用用于消息通信的 AccountChangedEvent 对象。

而我们将在 AccountChangedPublisher 接口的实现类 AbstractAccountChangedPublisher 中完成对 AccountChangedEvent 对象的构建,如下代码所示:

java
public abstract class AbstractAccountChangedPublisher implements AccountChangedPublisher {
 
    @Override
    public void publishAccountChangedEvent(Account account, String operation) {
 
        AccountMessage accountMessage = new AccountMessage(account.getId(), account.getAccountCode(), account.getAccountName());
        AccountChangedEvent event = new AccountChangedEvent(AccountChangedEvent.class.getTypeName(),
                operation.toString(), accountMessage);
 
        publishEvent(event);
    }
 
    protected abstract void publishEvent(AccountChangedEvent event);
}

AbstractAccountChangedPublisher 是一个抽象类,我们基于传入的业务对象构建了一个消息对象 AccountChangedEvent,并通过 publishEvent 抽象方法发送消息。

针对不同的消息中间件,我们需要分别实现对应的 publishEvent 方法。以 Kafka 为例,我们重构了原有代码并提供了如下所示的 KafkaAccountChangedPublisher 实现类。

java
@Component("kafkaAccountChangedPublisher")
public class KafkaAccountChangedPublisher extends AbstractAccountChangedPublisher {
 
    @Autowired
    private KafkaTemplate<String, AccountChangedEvent> kafkaTemplate;

    @Override
    protected void publishEvent(AccountChangedEvent event) {
        kafkaTemplate.send(AccountChannels.SPRINGCSS_ACCOUNT_TOPIC, event);
    }
}

对 RabbitMQ 而言,RabbitMQAccountChangedPublisher 的实现方式也是类似,如下代码所示:

java
@Component("rabbitMQAccountChangedPublisher")
public class RabbitMQAccountChangedPublisher extends AbstractAccountChangedPublisher {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    protected void publishEvent(AccountChangedEvent event) {
        rabbitTemplate.convertAndSend(AccountChannels.SPRINGCSS_ACCOUNT_QUEUE, event, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties props = message.getMessageProperties();
                props.setHeader("EVENT_SYSTEM", "SpringCSS");
                return message;
            }
        });
    }
}

对于 RabbitMQ 而言,在使用 RabbitMQAccountChangedPublisher 发送消息之前,我们需要先初始化 Exchange、Queue,以及两者之间的 Binding 关系,因此我们实现了如下所示的 RabbitMQMessagingConfig 配置类。

java
@Configuration
public class RabbitMQMessagingConfig {
 
    public static final String SPRINGCSS_ACCOUNT_DIRECT_EXCHANGE = "springcss.account.exchange";
    public static final String SPRINGCSS_ACCOUNT_ROUTING = "springcss.account.routing";
 
    @Bean
    public Queue SpringCssDirectQueue() {
        return new Queue(AccountChannels.SPRINGCSS_ACCOUNT_QUEUE, true);
    }
 
    @Bean
    public DirectExchange SpringCssDirectExchange() {
        return new DirectExchange(SPRINGCSS_ACCOUNT_DIRECT_EXCHANGE, true, false);
    }
 
    @Bean
    public Binding bindingDirect() {
        return BindingBuilder.bind(SpringCssDirectQueue()).to(SpringCssDirectExchange())
                .with(SPRINGCSS_ACCOUNT_ROUTING);
    }
 
    @Bean
    public Jackson2JsonMessageConverter rabbitMQMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

上述代码中初始化了一个 DirectExchange、一个 Queue ,并设置了两者之间的绑定关系,同时我们还初始化了一个 Jackson2JsonMessageConverter 用于在消息发送过程中将消息转化为序列化对象,以便在网络上进行传输。

实现 customer-service 消息消费者

现在,回到 customer-service 服务,我们先看看提取用于接收消息的统一化接口 AccountChangedReceiver,如下代码所示:

java
public interface AccountChangedReceiver {

    //Pull 模式下的消息接收方法
    void receiveAccountChangedEvent();

    //Push 模式下的消息接收方法
    void handlerAccountChangedEvent(AccountChangedEvent event);
}

AccountChangedReceiver 分别定义了拉取模式和推送模式下的消息接收方法,同样我们也提取了一个抽象实现类 AbstractAccountChangedReceiver,如下代码所示:

java
public abstract class AbstractAccountChangedReceiver implements AccountChangedReceiver {
 
    @Autowired
    LocalAccountRepository localAccountRepository;
 
    @Override
    public void receiveAccountChangedEvent() {
 
        AccountChangedEvent event = receiveEvent();
 
        handleEvent(event);
    }
 
    protected void handleEvent(AccountChangedEvent event) {
        AccountMessage account = event.getAccountMessage();
        String operation = event.getOperation();
 
        operateAccount(account, operation);
    }
 
    private void operateAccount(AccountMessage accountMessage, String operation) {
        System.out.print(
                accountMessage.getId() + ":" + accountMessage.getAccountCode() + ":" + accountMessage.getAccountName());
 
        LocalAccount localAccount = new LocalAccount(accountMessage.getId(), accountMessage.getAccountCode(),
                accountMessage.getAccountName());
 
        if (operation.equals("ADD") || operation.equals("UPDATE")) {
            localAccountRepository.save(localAccount);
        } else {
            localAccountRepository.delete(localAccount);
        }
    }
 
    protected abstract AccountChangedEvent receiveEvent();
}

这里实现了 AccountChangedReceiver 接口的 receiveAccountChangedEvent 方法,并定义了一个 receiveEvent 抽象方法接收来自不同消息中间件的 AccountChangedEvent 消息。一旦 receiveAccountChangedEvent 方法获取了消息,我们将根据其中的 Account 对象及对应的操作更新本地数据库。

接下来我们看看 AbstractAccountChangedReceiver 中的一个实现类 RabbitMQAccountChangedReceiver,如下代码所示:

java
@Component("rabbitMQAccountChangedReceiver")
public class RabbitMQAccountChangedReceiver extends AbstractAccountChangedReceiver {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Override
    public AccountChangedEvent receiveEvent() {
        return (AccountChangedEvent) rabbitTemplate.receiveAndConvert(AccountChannels.SPRINGCSS_ACCOUNT_QUEUE);
    }
 
    @Override
    @RabbitListener(queues = AccountChannels.SPRINGCSS_ACCOUNT_QUEUE)
    public void handlerAccountChangedEvent(AccountChangedEvent event) {
        super.handleEvent(event);
    }
}

上述 RabbitMQAccountChangedReceiver 同时实现了 AbstractAccountChangedReceiver 的 receiveEvent 抽象方法及 AccountChangedReceiver 接口中的 handlerAccountChangedEvent 方法。其中 receiveEvent 方法用于主动拉取消息,而 handlerAccountChangedEvent 方法用于接受推动过来的消息,在该方法上我们添加了 @RabbitListener 注解。

接着我们来看下同样继承了 AbstractAccountChangedReceiver 抽象类的 KafkaAccountChangedListener 类,如下代码所示:

java
@Component
public class KafkaAccountChangedListener extends AbstractAccountChangedReceiver {
 
    @Override
    @KafkaListener(topics = AccountChannels.SPRINGCSS_ACCOUNT_TOPIC)
    public void handlerAccountChangedEvent(AccountChangedEvent event) {
 
        super.handleEvent(event);
    }
 
    @Override
    protected AccountChangedEvent receiveEvent() {
        return null;
    }
}

我们知道 Kafka 只能通过推送方式获取消息,所以它只实现了 handlerAccountChangedEvent 方法,而 receiveEvent 方法为空。

小结与预告

这一讲,我们学习了最后一款消息中间件 RabbitMQ,并使用 Spring Boot 提供的 RabbitTemplate 完成了消息的发送和消费。同时,基于三种消息中间件的对接方式,我们提取了它们之间的共同点,并抽取了对应的接口和抽象类,重构了 SpringCSS 系统的实现过程。

Spring 为我们提供了一整套完整的安全解决方案,17 讲我们将对这套解决方案展开讨论。

这里给你留一道思考题:在使用 RabbitMQ 时,如何完成 Exchange、Queue ,以及它们之间绑定关系的初始化?欢迎你在留言区与我交流、互动。

另外,如果你觉得本专栏很有价值,欢迎转发给更多好友哦~