Skip to content

23消息消费:如何使用SpringCloudStream实现消息发布者和消费者?(下)

在上一课时中,我们给出了 SpringHealth 案例中基于 Spring Cloud Stream 的消息发布场景以及实现方式,同时也给出了消息消费的应用场景。今天我们将延续上一课时的内容,来具体讲解如何在服务中添加消息消费者,以及使用各项消息消费的高级主题,并给出案例的运行效果。

在服务中添加消息消费者

在介绍消息消费者的具体实现方法之前,我们先来回顾消息消费的实现流程,如下图所示:

消息消费实现流程

针对上图中各个消费者组件的实现过程,我们采用与介绍发布者时相同的方式进行展开。首当其冲的还是要使用 @EnableBinding 注解。

使用 @EnableBinding 注解

与初始化消息发布环境一样,我们同样需要在 intervention-service 需要引入 spring-cloud-stream、spring-cloud-starter-stream-kafka 或 spring-cloud-starter-stream-rabbit 这几个Maven依赖,并构建 Bootstrap 类。intervention-service 中的 Bootstrap 类是 InterventionApplication,其代码如下所示:

java
@SpringCloudApplication
@EnableBinding(Sink.class)
public class InterventionApplication{
 
    public static void main(String[] args) {
        SpringApplication.run(InterventionApplication.class, args);
    }
}

显然,对于作为消息消费者的 Bootstrap 类而言,@EnableBinding 注解所绑定的应该是 Sink 接口。

创建 Sink

UserInfoChangedSink 负责处理具体的消息消费逻辑,代码如下所示:

java
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener; 
...
 
public class UserInfoChangedSink {
 
    @Autowired
    private UserInfoRedisRepository userInfoRedisRepository;
 
    private static final Logger logger = LoggerFactory.getLogger(UserInfoChangedSink.class);
 
    @StreamListener("input")
    public void handleChangedUserInfo(UserInfoChangedEventMapper userInfoChangedEventMapper) {
     
        logger.debug("Received a message of type " + userInfoChangedEventMapper.getType()); 
     logger.debug("Received a {} event from the user-service for user name {}", 
             userInfoChangedEventMapper.getOperation(), 
             userInfoChangedEventMapper.getUser().getUserName());
        
        if(userInfoChangedEventMapper.getOperation().equals("ADD")) {
            userInfoRedisRepository.saveUser(userInfoChangedEventMapper.getUser());
        } else if(userInfoChangedEventMapper.getOperation().equals("UPDATE")) {
         userInfoRedisRepository.updateUser(userInfoChangedEventMapper.getUser());            
        } else if(userInfoChangedEventMapper.getOperation().equals("DELETE")) {
         userInfoRedisRepository.deleteUser(userInfoChangedEventMapper.getUser().getUserName());
        } else {            
            logger.error("Received an UNKNOWN event from the user-service of type {}", userInfoChangedEventMapper.getType());
        }
    }
}

这里引入了一个新的注解 @StreamListener,将该注解添加到某个方法上就可以使之接收处理流中的事件。在上面的例子中,@StreamListener 注解添加在了 handleChangedUserInfo() 方法上并指向了"input"通道,意味着所有流经"input"通道的消息都会交由这个 handleChangedUserInfo() 方法进行处理。

而在 handleChangedUserInfo() 方法中,我们调用 UserInfoRedisRepository 类完成各种缓存相关的处理。UserInfoRedisRepository 的实现代码参考如下:

java
@Repository
public class UserInfoRedisRepositoryImpl implements UserInfoRedisRepository {
    private static final String HASH_NAME = "user";
 
    private RedisTemplate<String, UserMapper> redisTemplate;
    private HashOperations<String, String, UserMapper> hashOperations;
 
    public UserInfoRedisRepositoryImpl() {
        super();
    }
 
    @Autowired
    private UserInfoRedisRepositoryImpl(RedisTemplate<String, UserMapper> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
 
    @PostConstruct
    private void init() {
        hashOperations = redisTemplate.opsForHash();
    }
 
    @Override
    public void saveUser(UserMapper user) {
        hashOperations.put(HASH_NAME, user.getUserName(), user);
    }
 
    @Override
    public void updateUser(UserMapper user) {
        hashOperations.put(HASH_NAME, user.getUserName(), user);
    }
 
    @Override
    public void deleteUser(String userName) {
        hashOperations.delete(HASH_NAME, userName);
    }
 
    @Override
    public UserMapper findUserByUserName(String userName) {
        return (UserMapper) hashOperations.get(HASH_NAME, userName);
    }
}

这里,我们使用了 Spring Data 提供的 RedisTemplate 和 HashOperations 工具类来封装对Redis的数据操作。关于 Spring Data 的使用方法不是本课程的重点,你可以参考相关资料进行进一步了解。

配置 Binder

对于消息消费者而言,配置 Binder 的方式和消息发布者非常类似。如果使用默认的消息通道,那么我们只需要把用于发送的"output"通道改为接收的"input"通道就可以了。这里以 Kafka 为例,给出 Binder 的配置信息,如下所示:

xml
spring:
  cloud:
    stream:
      bindings:
        input:
          destination:  userInfoChangedTopic
          content-type: application/json
      kafka:
        binder:
          zk-nodes: localhost
	      brokers: localhost

Spring Cloud Stream 高级主题

在分别介绍完消息发布者和消费者的基本实现过程之后,我们将在此基础上讨论 Spring Cloud Stream 的高级主题,包括自定义消息通道、使用消费者组以及消息分区。

自定义消息通道

在前面的示例中,无论是消息发布还是消息消费,我们都使用了 Spring Cloud Stream 中默认提供的通道名"output"和"input"。显然,在有些场景下,为了更好地管理系统中存在的所有通道,为通道进行命名是一项最佳实践,这点对于消息消费的场景尤为重要。在接下来的内容中,针对消息消费的场景,我们将不使用 Sink 组件默认提供的"input"通道,而是尝试通过自定义通道的方式来实现消息消费。

在 Spring Cloud Stream 中,实现一个面向消息消费场景的自定义通道的方法也非常简单,只需要定义一个新的接口,并在该接口中通过 @Input 注解声明一个新的 Channel 即可。例如我们可以定义一个新的 UserInfoChangedChannel 接口,然后通过 @Input 注解就可以声明一个"userInfoChangedChannel"通道,代码如下所示。

java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
 
public interface UserInfoChangedChannel{
 
  String USER_INFO = "userInfoChangedChannel";
    
    @Input(UserInfoChangedChannel.USER_INFO)
    SubscribableChannel userInfoChangedChannel();
}

注意到该通道的类型为 Spring Intergration 中用于消费消息的 SubscribableChannel。同时,我们也注意到这个 UserInfoChangedChannel 的代码风格与 Spring Cloud Stream 自带的Sink接口完全一致。作为回顾,这里也给出 Sink 接口的定义,如下所示:

java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
	 
public interface Sink{
 
    String INPUT = "input";
 
    @Input(Sink.INPUT)
    SubscribableChannel input();
}

一旦我们完成了自定义的消息通信,就可以在 @StreamListener 注解中设置这个通道。以前面介绍的 UserInfoChangedSink 为例,添加了自定义通道之后的重构代码结构如下所示:

java
@EnableBinding(UserInfoChangedChannel.class)
public class UserInfoChangedSink{
 
    @StreamListener(UserInfoChangedChannel.USER_INFO)
public void handleChangedUserInfo(UserInfoChangedEventMapper userInfoChangedEventMapper) {
	     ...
	}
}

可以看到,这里我们继续使用 @EnableBinding 注解绑定了自定义的 UserInfoChangedChannel。因为 UserInfoChangedChannel 中通过 @Input 注解提供了"userInfoChangedChannel"通道,所以这种用法实际上和 @EnableBinding(Sink.class) 是完全一致的。因此,对于 Binder 的配置而言,我们要做的也只是调整通道的名称。再次以 Kafka 为例,重构后的 Binder 配置信息如下所示:

xml
spring:
  cloud:
    stream:
      bindings:
        userInfoChangedChannel:
          destination:  userInfoChangedTopic
          content-type: application/json
      kafka:
        binder:
          zk-nodes: localhost
	      brokers: localhost

使用消费者分组

在微服务架构中,服务多实例部署的场景非常常见。在集群环境下,我们希望服务的不同实例被放置在竞争的消费者关系中,同一服务集群中只有一个实例能够处理给定消息。Spring Cloud Stream 提供的消费者分组可以很方便地实现这一需求,效果图如下所示:

intervention-service 消息分组效果示意图

在上图中,两个 intervention-service 实例构成了一个 interventionGroup。在这个 interventionGroup 中,UserInfoChangedEvent 事件只会被一个 intervention-service 实例所消费。

要想实现上图所示的消息消费效果,我们唯一要做的事情也是重构Binder配置,即在配置Binder时指定消费者分组信息即可,如下所示:

xml
spring:
  cloud:
    stream:
      bindings:
        userInfoChangedChannel:
          destination:  userInfoChangedTopic
          content-type: application/json
         group: interventionGroup
      kafka:
        binder:
          zk-nodes: localhost
	      brokers: localhost

以上基于Kafka的配置信息中,我们关注"bindings"段中的通道名称使用了自定义的"userInfoChangedChannel",并且在该配置项中设置了"group"为"interventionGroup"。

使用消息分区

最后一项 Spring Cloud Stream 使用上的高级主题是使用消费分区。同样是在集群环境下,假设存在两个 intervention-service 实例,我们希望用户信息中 id 为单号的 UserInfoChangedEvent 始终由第一个 intervention-service 实例进行消费,而id为双号的 UserInfoChangedEvent 则始终由第二个 intervention-service 实例进行消费。基于类似这样的需求,我们就可以构建消息分区,如下所示:

intervention-service 消息分区效果示意图

要想实现上图所示的消息消费效果,我们唯一要做的事情还是重构 Binder 配置。这次以 RabbitMQ 为例给出示例配置,如下所示:

xml
spring:
  cloud:
    stream:
      bindings:
        default:
          content-type: application/json
          binder: rabbitmq
        output:
             destination: userInfoChangedExchange
          group: interventionGroup
          producer:
            partitionKeyExpression: payload.user.id % 2
            partitionCount: 2
      binders:
        rabbitmq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                username: guest
                password: guest
                virtual-host: /

首先,我们明确上述配置项针对的是消息发布者 Source 组件,因为我们看到了"output"配置项。注意到,我们指定了交换器和消费者分组分别为 "userInfoChangedExchange"和"interventionGroup"。同时,这里还出现了两个新的配置项"partitionKeyExpression"和"partitionCount",这两个配置项就与消息分区有关。我们指定了"partitionKeyExpression"为"payload.user.id",意味着 Spring Cloud Stream 会根据传入的 UserInfoChangedEvent 中的 User 对象的 id 对 2 进行取模操作。如果取模值为 1 表示只有分区Id为 1 的 intervention-service 能接收到该信息,如果是取模值为 0 表示只有分区 Id 为 2 的 intervention-service 能接收到该信息。显然,通过这样的分区策略,分区的数量"partitionCount"应该为 2。

对应的,作为消息消费者的 Sink 组件的配置项如下所示:

xml
spring:
  cloud:
    stream:
      bindings:
        default:
          content-type: application/json
          binder: rabbitmq
        input:
          destination: userInfoChangedExchange
	group: interventionGroup
          consumer:
            partitioned: true
            instanceIndex: 0
            instanceCount: 2
      binders:
        rabbitmq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                username: guest
                password: guest
                virtual-host: /

上述配置中同样包含了分区信息,其中 partitioned=true 表示启用消息分区功能,instanceCount=2 表示消息分区的消费者节点数量为 2 个。特别要注意的是 instanceIndex 参数用来设置当前消费者实例的索引号。instanceIndex 是从 0 开始的,我们在这里就把当前服务实例的索引号为 0。显然我们在另外一个 intervention-service 实例中需要将 instanceIndex 设置为 1。

为了演示消息分区功能,我们需要运行一个 user-service 作为 Source 组件,以及两个独立的 intervention-service 作为 Sink 组件,从而构建一个完整的示例并给出运行时应用系统的控制台输出效果。两个独立的 Sink 组件就按照前面给出的分区策略进行消息的处理。然后在两个 Sink 组件的输出中,UserInfoChangedEvent 中 User 对象的 Id 成单双数交替出现。你可以自己做一些尝试和练习。

小结与预告

承接上一课时内容,今天我们继续讨论使用 Spring Cloud Stream 实现消息消费者的实现方法。同样,我们发现通过合理配置 Binder 组件,这一实现过程也比较简单。另一方面,Spring Cloud Stream 中还存在一些高级主题,例如自定义消息通道、消费者组以及消费分区,本课时同样也介绍了如何在 SpringHealth 案例系统中使用这些高级主题的方法。

这里给你留一道思考题:在 Spring Cloud Stream 中,如何配置消费者组和消费分区功能?

通过前面课程的学习,我们感受到了 Spring Cloud Stream 中 Binder 组件的强大功能。基于这个组件,我们可以使用同一套开发模式来分别集成 RabbitMQ 和 Kafka 等主流的消息中间件。介绍完消息发布和消费之后,我们有必要对 Binder 组件的内部实现机制做深入分析,这就是下一课时的内容。