Skip to content

第16讲:事件发布如何进行处理

在事件驱动的微服务中最基本的操作是发布处理事件,事件以消息的形式发布到消息代理中,示例应用使用 Apache Kafka 作为消息代理,从实现的角度来说,我们只需要直接使用 Kafka 的客户端就可以发布和处理事件了。不过这种做法的开发效率不高,示例应用使用 Eventuate Tram 框架来提高开发效率。本课时将介绍如何使用 Eventuate Tram 框架来发布和处理事件。

事件描述

我们首要的任务是描述事件,事件一般由 3 个要素组成,即标识符、类型和载荷。标识符 是事件的唯一标识,可以用来区分重复的事件;类型 用来区分不同的事件,事件类型一般使用名词加上动词被动语态的形式,如 TripCreatedEvent,在 Java 中,一般使用事件类的全名作为事件的类型;事件的载荷由事件的类型来确定,事件类型可以没有载荷。

每个事件都有一个来源,表明产生该事件的对象。事件来源的类型一般是事件类型中作为前缀的名词部分,比如表示行程已创建的事件 TripCreatedEvent 的来源是行程。在介绍领域驱动设计时,我提到过聚合的概念,如果在建模时使用了聚合,那么事件的来源通常是聚合中的实体,这样的事件称为领域事件。领域事件除了上述 3 个基本属性之外,还包括事件的来源对象所在聚合的根实体的类型和标识符。比如,乘客管理服务中发布的事件,包含对应的乘客的标识符和乘客的实体类型。

在 Java 中,我们使用 Java 类来表示事件,不同的事件类之间共通的部分很少,一般使用一个标记接口(Marker Interface)来声明事件,所有的事件类只需要实现这个标记接口即可,你可能会认为事件的接口中应该包含一个 getId 方法来返回事件的标识符。实际上,事件的标识符对于事件对象本身来说并没有意义,我们只需要在发布事件的时候生成其标识符即可,并不需要把标识符添加到事件对象模型中。

Eventuate Tram 提供了标记接口 DomainEvent。下面代码中的接口 TripDomainEvent 是行程相关的事件的标记接口:

java
public interface TripDomainEvent extends DomainEvent {
}

事件类是实现了标记接口的 POJO 类,事件对象本身就是事件的载荷格式。当发布事件时,只需要把当前的事件对象序列化,就得到了事件的载荷。事件对象在序列化时,可以使用 JSON 和 XML 这样的文本格式,也可以使用 Protocol Buffers 和 Apache Avro 这样的二进制格式。

下面的代码给出了 TripCreatedEvent 事件类的定义:

java
@Data
@NoArgsConstructor
@RequiredArgsConstructor
public class TripCreatedEvent implements TripDomainEvent {

  @NonNull
  private TripDetails tripDetails;
}

发布事件

当发布事件时,只需要创建对应事件类的一个新对象即可,实际的事件发布由 Eventuate Tram 来完成。下面代码中的 TripDomainEventPublisher 类用来发布行程相关的领域事件。TripDomainEventPublisher 类继承自抽象类 AbstractAggregateDomainEventPublisher,AbstractAggregateDomainEventPublisher 类包含了发布聚合对应的领域事件的基本逻辑。

js
public class TripDomainEventPublisher extends
    AbstractAggregateDomainEventPublisher<Trip, TripDomainEvent> {

  public TripDomainEventPublisher(
      DomainEventPublisher eventPublisher) {
    super(eventPublisher, Trip.class, Trip::getId);
  }
}

AbstractAggregateDomainEventPublisher<A, E extends DomainEvent> 类有两个类型参数,A 表示聚合的类型,E 表示领域事件的类型。它的构造器需要 3 个参数,如下表所示。

TripDomainEventPublisher 类的构造器只需要提供 DomainEventPublisher 接口的实现。aggregateType 参数的值为 Trip.class,而 idSupplier 函数则是调用 Trip 类的 getId 方法。

当需要发布事件时,使用 TripDomainEventPublisher 的 publish 方法。publish 方法有两个参数:第一个参数是聚合对象,第二个参数是需要发布的事件列表。

下面以创建行程的过程为例来进行说明,代码是 Trip 类中的 createTrip 方法,用来创建 Trip 对象和需要发布的事件。createTrip 方法的返回值类型 ResultWithDomainEvents 是聚合对象和事件列表的一个封装,这里封装了一个 Trip 对象和 TripCreatedEvent 事件。

java
public static ResultWithDomainEvents<Trip, TripDomainEvent> createTrip(String passengerId,
    PositionVO startPos, PositionVO endPos) {
  Trip trip = new Trip(passengerId, startPos, endPos);
  TripCreatedEvent event = new TripCreatedEvent(new TripDetails(passengerId, startPos, endPos));
  return new ResultWithDomainEvents<>(trip, event);
}

下面的代码是 TripService 中的 createTrip 方法。首先使用 Trip 类的 createTrip 方法创建出 Trip 对象和 TripCreatedEvent 事件,接着使用 TripRepository 来保存 Trip 对象,最后使用 TripDomainEventPublisher 来发布事件。TripService 类添加了 @Transactional 注解,保证了 createTrip 方法的调用在一个事务中完成。

java
public TripVO createTrip(String passengerId, PositionVO startPos, PositionVO endPos) {
  ResultWithDomainEvents<Trip, TripDomainEvent> tripAndEvents = Trip
      .createTrip(passengerId, startPos, endPos);
  Trip trip = tripAndEvents.result;
  tripRepository.save(trip);
  tripDomainEventPublisher.publish(trip, tripAndEvents.events);
  return trip.toTripVO();
}

TripDomainEventPublisher 在发布事件时通过 MessageProducer 接口来完成。MessageProducer 是一个通用的消息发送接口,它的 send 方法用来发送消息,该方法有两个参数:第一个参数是消息的目的地,第二个参数是 Message 接口表示的消息。Message 接口描述了消息中的 3 个组成部分,即标识符、消息头和载荷,消息头是一个哈希表,包含消息头的名称和对应的值,其作用是包含消息的元数据。

TripDomainEventPublisher 在发布事件时,需要从聚合对象和事件对象中构建出消息。事件对象的 JSON 序列化结果,作为消息的载荷 。聚合对象的标识符和类型,以及事件对象的类型都添加到消息头中。聚合对象的类型作为消息的目的地。

由于使用了事务性发件箱模式,MessageProducer 发布的消息会被保存在关系型数据库中。发件箱表的名称是 message,下表给出了 message 表中的字段。

下面代码中的 JSON 数据是 message 表 headers 字段的内容示例:

java
{
  "PARTITION_ID": "05f5e7e8-5a46-4da3-bd29-bb44d5f8b34d",
  "event-aggregate-type": "io.vividcode.happyride.tripservice.domain.Trip",
  "DATE": "Fri, 1 May 2020 01:06:14 GMT",
  "event-aggregate-id": "05f5e7e8-5a46-4da3-bd29-bb44d5f8b34d",
  "event-type": "io.vividcode.happyride.tripservice.api.events.TripCreatedEvent",
  "DESTINATION": "io.vividcode.happyride.tripservice.domain.Trip",
  "ID": "00000171cdc508b4-94b86dfe66ea0000"
}

对于发布到 Kafka 中的消息,主题来自于 message 表中的 destination 字段,分片 ID 来自于 headers 字段中消息头中的 PARTITION_ID,消息的内容是包含 headers 和 payload 两个属性的 JSON 数据。

处理事件

由于事件以消息的形式发布到 Kafka,那么处理事件时则需要订阅 Kafka 中的主题,并消费其中的消息。我们需要创建一个 DomainEventDispatcher 类的对象,来负责消费主题中的消息并处理。DomainEventDispatcher 类使用 MessageConsumer 接口来订阅主题,对事件的处理逻辑由 DomainEventHandlers 类来声明,DomainEventHandlers 对象包含了一个 DomainEventHandler 对象的列表,DomainEventHandler 类的构造器的参数如下表所示。其中 DomainEventEnvelope 接口封装了事件的相关信息,包括事件对象、聚合 ID、聚合类型、事件 ID 和消息对应的 Message 对象。

一般使用构建器 DomainEventHandlersBuilder 类来创建 DomainEventHandlers 对象。在下面的代码中,DomainEventHandlersBuilder 类的 forAggregateType 方法指定了 DomainEventHandler 对象的聚合类型,而 onEvent 方法则对不同类型的事件指定了处理器。这里我指定了 TripConfirmedEvent 事件的处理器是 onTripConfirmed 方法。

js
public class DispatchServiceEventConsumer {

  @Autowired
  DispatchService dispatchService;

  private static final Logger LOGGER = LoggerFactory
      .getLogger(DispatchServiceEventConsumer.class);

  public DomainEventHandlers domainEventHandlers() {
    return DomainEventHandlersBuilder
        .forAggregateType("io.vividcode.happyride.tripservice.domain.Trip")
        .onEvent(TripConfirmedEvent.class, this::onTripConfirmed)
        .build();
  }

  private void onTripConfirmed(DomainEventEnvelope<TripConfirmedEvent> envelope) {
    TripDetails tripDetails = envelope.getEvent().getTripDetails();
    try {
      dispatchService.dispatchTrip(envelope.getAggregateId(), tripDetails);
    } catch (Exception e) {
      LOGGER.warn("Failed to dispatch trip {}", envelope.getAggregateId(), e);
    }
  }
}

下面代码是与事件处理相关的 Spring 配置类。DomainEventDispatcher 类的对象由 DomainEventDispatcherFactory 类的 make 方法来创建,make 方法的第一个参数是 DomainEventDispatcher 对象的标识符,第二个参数是 DomainEventHandlers 对象。

java
@Configuration
@Import(TramEventSubscriberConfiguration.class)
public class DispatchServiceMessageHandlersConfiguration {

  @Bean
  public DispatchServiceEventConsumer dispatchServiceEventConsumer() {
    return new DispatchServiceEventConsumer();
  }

  @Bean
  public DomainEventDispatcher domainEventDispatcher(
      DispatchServiceEventConsumer dispatchServiceEventConsumer,
      DomainEventDispatcherFactory domainEventDispatcherFactory) {
    return domainEventDispatcherFactory
        .make("dispatchServiceEvents",
            dispatchServiceEventConsumer.domainEventHandlers());
  }
}

重复事件处理

消息在传递时提供的是至少一次的保证性,虽然不会丢失消息,但是会产生重复消息。这就意味着,对于同一个事件,它的处理器可能会被调用多次,对于重复事件的问题,一般有两种解决办法。

第一种做法是使用幂等( Idempotent)处理器,其含义是,对于同一个事件,多次调用处理器不会产生副作用。如果一个事件处理器是幂等的,那就不需要对重复事件进行额外处理,并不是所有的处理器都是幂等的。幂等的处理器需要满足业务逻辑和具体实现两方面的要求。业务逻辑指的是对事件的重复处理在业务上是可行的,具体实现指的是代码实现对于重复事件在处理时不会出错。以订单取消的事件为例,从业务逻辑上来说,一个订单被取消多次是没有问题的,取消一个已经被取消的订单并没有什么影响。在实现上,代码也需要考虑到处理重复事件的情况。

第二种做法是去掉重复的事件。每个事件都有自己的标识符,只需要记录下已经处理过的事件标识符,就可以去掉重复的事件。Eventuate Tram 提供了检测重复消息的功能,DuplicateMessageDetector 接口用来检测重复的消息,接收到消息的标识符被保存在 received_messages 表中。当需要处理新消息时,首先尝试往 received_messages 表中插入新的记录,如果插入时出现重复键的异常,就说明消息已经被处理过。

命令

命令与一般消息的不同之处在于,命令有对应的返回结果,命令的返回结果用另外一个消息来表示。命令类都需要实现标记接口 Command,命令通过 CommandProducer 接口的 send 方法来发送。下面代码给出了 CommandProducer 接口的声明,两个 send 方法的区别在于是否可以使用 resource 参数。

java
public interface CommandProducer {

  String send(String channel, Command command, String replyTo, Map<String, String> headers);

  String send(String channel, String resource, Command command, String replyTo, Map<String, String> headers);
}

下表给出了 send 方法的参数及其说明。

CommandProducer 接口的实现在发送命令时,也创建出了对应的 Message 对象,只不过消息的头中包含的内容不同。消息的目的地是发送命令时指定的通道。

处理命令的工作由 CommandDispatcher 类来完成。CommandDispatcher 类使用 MessageConsumer 接口来接收消息,同时使用 MessageProducer 接口来发送命令的回应消息。对命令的处理逻辑由 CommandHandlers 类来声明,CommandHandlers 对象包含一个 CommandHandler 对象的列表。CommandHandler 类的构造器参数如下表所示。

命令处理器对应的函数有两个参数:第一个参数是封装命令的 CommandMessage 对象,第二个参数是包含资源路径的实际参数值的 PathVariables 对象。函数的返回值是作为回应消息的 Message 对象的列表。

一般使用构建器 CommandHandlersBuilder 类来创建 CommandHandlers 对象。在下面的代码中,CommandHandlers 对象对 QueryWeatherCommand 类型的命令,使用 queryWeather 方法进行处理。withSuccess 方法的作用是创建一个成功的回应消息,与之对应的 withFailure 方法用来创建一个失败的回应消息。

js
@Component
public class WeatherCommandHandlers {

  public CommandHandlers commandHandlers() {
    return CommandHandlersBuilder.fromChannel("weather")
        .onMessage(QueryWeatherCommand.class, this::queryWeather)
        .build();
  }

  private Message queryWeather(CommandMessage<QueryWeatherCommand> cm,
      PathVariables pathVariables) {
    return withSuccess(
        new QueryWeatherResult(cm.getCommand().getCity(), "Rain"));
  }
}

有了 CommandHandlers 对象之后,就可以创建 CommandDispatcher 对象。下面的代码是对应的 Spring 配置类,CommandDispatcherFactory 类的 make 方法用来创建 CommandDispatcher 对象。

java
@Configuration
@Import({TramCommandProducerConfiguration.class,
    TramCommandConsumerConfiguration.class})
public class WeatherConfiguration {

  @Bean
  public CommandDispatcher weatherCommandDispatcher(
      CommandDispatcherFactory commandDispatcherFactory,
      WeatherCommandHandlers weatherCommandHandlers) {
    return commandDispatcherFactory
        .make("weatherCommandDispatcher",
            weatherCommandHandlers.commandHandlers());
  }
}

下面的代码展示了如何发布命令和获取回应。使用 CommandProducer 接口发送 QueryWeatherCommand 命令到通道 weather,而命令回应的发送通道是 weather-reply。使用 MessageConsumer 接口订阅 weather-reply 通道就可以得到命令的回应消息。

java
public void testWeatherCommand() {
  messageConsumer
      .subscribe("weather-subscriber", Collections.singleton("weather-reply"),
          message -> {
            QueryWeatherResult result = JSonMapper
                .fromJson(message.getPayload(), QueryWeatherResult.class);
            System.out.println(result.getResult());
          });

  commandProducer
      .send("weather", new QueryWeatherCommand("Beijing"), "weather-reply",
          new HashMap<>());
}

资源路径的作用类似于 REST API 中的 URL 路径,资源路径中可以包含参数。在处理命令时,可以获取到资源路径中参数的实际值。

在下面的代码中,CommandHandlers 对象的资源路径是 /user/{username},其中 username 是参数。在对应的处理方法 echo 中,可以使用 PathVariables 对象来获取到参数 username 的实际值。

js
@Component
public class EchoCommandHandlers {

  public CommandHandlers commandHandlers() {
    return CommandHandlersBuilder.fromChannel("echo")
        .resource("/user/{username}")
        .onMessage(EchoCommand.class, this::echo)
        .build();
  }

  private Message echo(CommandMessage<EchoCommand> cm,
      PathVariables pathVariables) {
    return withSuccess("echo -> " + pathVariables.getString("username"));
  }
}

在下面的代码中,当使用 CommandProducer 接口的 send 方法发送命令时,提供了资源路径 /user/alex,其中 alex 是参数 username 的实际值。

java
public void testEchoCommand() {
messageConsumer
.subscribe("echo-subscriber", Collections.singleton("echo-reply"),
message -> {
System.out.println(message.getPayload());
});

commandProducer
.send("echo", "/user/alex", new EchoCommand(), "echo-reply",
new HashMap<>());
}

配置 CDC

使用 Eventuate Tram 发布和处理事件时,必须使用 Eventuate CDC 服务,我们只需要启动 CDC 服务对应的 Docker 容器即可。

下面的代码是开发环境的 Docker Compose 文件中与 CDC 服务相关的内容。CDC 服务的配置通过环境变量来完成,以 EVENTUATE_CDC_READER_TRIP 开头的环境变量用来配置读取行程数据库的名为 TRIP 的读取器,以 EVENTUATE_CDC_PIPELINE_PIPELINE1 开头的环境变量用来配置名为 PIPELINE1 的流水线。流水线和读取器的对应关系由 EVENTUATE_CDC_PIPELINE_PIPELINE1_READER 变量来指定。

java
cdc-service:
  image: eventuateio/eventuate-cdc-service:0.6.0.RELEASE
  ports:
    - "9090:8080"
  depends_on:
    - postgres-trip
    - kafka
    - zookeeper
  environment:
    SPRING_PROFILES_ACTIVE: PostgresWal
    LOGGING_LEVEL_IO_EVENTUATE: INFO

    EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181

    EVENTUATE_CDC_PIPELINE_PIPELINE1_TYPE: eventuate-tram
    EVENTUATE_CDC_PIPELINE_PIPELINE1_READER: Trip
    EVENTUATE_CDC_PIPELINE_PIPELINE1_EVENTUATEDATABASESCHEMA: eventuate

    EVENTUATE_CDC_READER_TRIP_TYPE: postgres-wal
    EVENTUATE_CDC_READER_TRIP_DATASOURCEURL: jdbc:postgresql://postgres-trip/happyride-trip
    EVENTUATE_CDC_READER_TRIP_DATASOURCEUSERNAME: postgres
    EVENTUATE_CDC_READER_TRIP_DATASOURCEPASSWORD: postgres
    EVENTUATE_CDC_READER_TRIP_DATASOURCEDRIVERCLASSNAME: org.postgresql.Driver
    EVENTUATE_CDC_READER_TRIP_LEADERSHIPLOCKPATH: /eventuatelocal/cdc/leader/pipeline/trip

总结

事件驱动的微服务在实现中离不开事件的发布和处理,Eventuate Tram 框架简化了与事件的发布和处理相关的操作。本课时介绍了如何描述事件、发布事件和处理事件,还介绍了如何对重复事件进行处理,以及如何使用命令,最后介绍了如何配置 Eventuate CDC 服务。