Skip to content

第23讲:如何查询乘客和司机的行程

上一课时对 CQRS 技术做了理论上的介绍,本课时将讲解 CQRS 技术在实际开发中的应用。

历史行程查询

在示例应用中,乘客和司机都需要查询历史行程的信息,行程的信息由行程管理微服务负责维护,保存在数据库中。最直接的做法是由行程管理微服务提供 API 来查询历史行程。但是这种做法有一个最大的问题在于,行程管理微服务中保存的行程信息是不完整的,行程的乘客和司机都只有标识符,没有具体的信息。相关的具体信息由乘客管理微服务和司机管理微服务来维护,这也是应用领域驱动设计的结果。乘客和司机分别是所在聚合的根实体,同样作为聚合根实体的行程,只能引用其他聚合根实体的标识符。

历史行程中需要包含乘客和司机的具体信息,这就意味着我们需要一种方式从这两个微服务中获取这些信息。

第一种获取信息的做法是在查询历史行程时,通过这两个微服务的 API 来获取。因为行程对象中已经包含了乘客和司机的标识符,只需要一个 API 调用,就可以获取到所需的信息,然后再与行程对象中的已有信息进行合并,就得到了所需要的结果。这种做法类似于关系型数据库中的表连接操作。它的缺点是性能很差,每个历史行程的获取都需要两次微服务 API 的调用。

第二种做法是修改行程管理服务中的行程对象的定义,以增加所需要的额外字段。这种做法的不足之处在于,这些附加的乘客和司机信息,与行程管理微服务的业务逻辑无关,只是为了满足特定的查询需要而引入的,并不是一个很好的设计。

第三种做法是为行程历史记录创建独立的存储,包含所需的全部数据。这种做法的好处是查询的性能很好,不需要额外的操作,从设计的角度来说,也实现了更好的职责划分。问题在于需要在不同的微服务之间同步数据。

下面通过具体的历史行程查询服务来说明第三种做法的实现。下面代码中的 Trip 类是历史行程查询服务中表示行程的领域对象类。与行程管理服务中的 Trip 类相比,下面所展示的 Trip 类的定义被简化了很多,只包含所需的属性,并没有相关的业务逻辑。这也符合对于查询模型的期望,查询模型只是作为数据的容器,其中除了一些基本的数据转换逻辑之外,并不包含与业务相关的内容。Trip 类的属性 passengerName 和 driverName 分别表示乘客和司机的名字,这是相对于行程管理服务中的 Trip 类而言,新增加的附加属性。

java
@Entity
@Table(name = "trips")
@Data
public class Trip {
  @Id
  private String id;
  @Column(name = "passenger_id")
  private String passengerId;
  @Column(name = "passenger_name")
  private String passengerName;
  @Column(name = "driver_id")
  private String driverId;
  @Column(name = "driver_name")
  private String driverName;
  @Column(name = "start_pos_lng")
  private BigDecimal startPosLng;
  @Column(name = "start_pos_lat")
  private BigDecimal startPosLat;
  @Column(name = "end_pos_lng")
  private BigDecimal endPosLng;
  @Column(name = "end_pos_lat")
  private BigDecimal endPosLat;
  @Column(name = "state")
  @Enumerated(EnumType.STRING)
  private TripState state;
}

接下来需要解决的问题是如何保持数据的同步。由于行程管理服务已经发布与行程对象相关的事件,我们只需要处理这些事件,就可以使得历史行程查询服务中的数据,与行程管理服务中的数据保持一致。当表示行程已创建的 TripCreatedEvent 事件发布时,历史行程查询服务需要创建同样的行程对象。当行程状态发生改变时,历史行程查询服务需要做出来相同的改动。

下面代码中的 TripHistoryServiceEventConsumer 类负责处理与行程对象相关的事件。当不同的事件发生时,调用 TripService 的不同方法来更新数据。通过这样的方式,可以保证查询模型中的数据的最终一致性。所处理的事件来自不同的微服务中聚合的根实体,包括行程、乘客和行程派发。

js
public class TripHistoryServiceEventConsumer {
  @Autowired
  TripService tripService;
  public DomainEventHandlers tripDomainEventHandlers() {
    return DomainEventHandlersBuilder
        .forAggregateType(
            "io.vividcode.happyride.tripservice.domain.Trip")
        .onEvent(TripCreatedEvent.class, this::onTripCreated)
        .onEvent(TripConfirmedEvent.class, this::onTripConfirmed)
        .onEvent(TripCancelledEvent.class, this::onTripCancelled)
        .build();
  }
  public DomainEventHandlers passengerDomainEventHandlers() {
    return DomainEventHandlersBuilder
        .forAggregateType(
            "io.vividcode.happyride.passengerservice.domain.Passenger")
        .onEvent(PassengerDetailsUpdatedEvent.class,
            this::onPassengerDetailsUpdated)
        .build();
  }
  public DomainEventHandlers dispatchDomainEventHandlers() {
    return DomainEventHandlersBuilder
        .forAggregateType(
            "io.vividcode.happyride.dispatchservice.domain.Dispatch")
        .onEvent(TripAcceptanceSelectedEvent.class, this::onTripAccepted)
        .build();
  }
  private void onTripCreated(
      final DomainEventEnvelope<TripCreatedEvent> envelope) {
    this.tripService.createTrip(envelope.getAggregateId(),
        envelope.getEvent().getTripDetails());
  }
  private void onPassengerDetailsUpdated(
      final DomainEventEnvelope<PassengerDetailsUpdatedEvent> envelope) {
    this.tripService
        .updatePassengerDetails(envelope.getAggregateId(),
            envelope.getEvent().getPassengerDetails());
  }
  private void onTripConfirmed(
      final DomainEventEnvelope<TripConfirmedEvent> envelope) {
    this.tripService
        .updateTripState(envelope.getAggregateId(), TripState.CONFIRMED);
  }
  private void onTripCancelled(
      final DomainEventEnvelope<TripCancelledEvent> envelope) {
    this.tripService
        .updateTripState(envelope.getAggregateId(), TripState.CANCELLED);
  }
  private void onTripAccepted(
      final DomainEventEnvelope<TripAcceptanceSelectedEvent> envelope) {
    final TripAcceptanceSelectedEvent event = envelope.getEvent();
    this.tripService
        .setTripDriver(envelope.getAggregateId(), event.getDriverId());
  }
}

这里需要注意的是对 PassengerDetailsUpdatedEvent 事件的处理,这是一个新增的事件,用来更新 Trip 对象中的 passengerName 属性。乘客管理服务同样会对 TripCreatedEvent 事件进行处理,并根据事件中的行程对象的乘客标识符,找到乘客的名称,并发布新的 PassengerDetailsUpdatedEvent 事件。下面代码中的 PassengerServiceEventConsumer 类展示了乘客管理服务对 TripCreatedEvent 事件的处理逻辑。

js
public class PassengerServiceEventConsumer {
  @Autowired
  PassengerService passengerService;
  @Autowired
  DomainEventPublisher domainEventPublisher;
  public DomainEventHandlers domainEventHandlers() {
    return DomainEventHandlersBuilder
        .forAggregateType("io.vividcode.happyride.tripservice.domain.Trip")
        .onEvent(TripCreatedEvent.class, this::onTripCreated)
        .build();
  }
  private void onTripCreated(
      final DomainEventEnvelope<TripCreatedEvent> envelope) {
    final String passengerId = envelope.getEvent().getTripDetails()
        .getPassengerId();
    this.passengerService.getPassenger(passengerId)
        .ifPresent(passenger -> {
          final PassengerDetails passengerDetails = new PassengerDetails(
              passenger.getName());
          this.domainEventPublisher
              .publish(
                  "io.vividcode.happyride.passengerservice.domain.Passenger",
                  passengerId,
                  Collections.singletonList(
                      new PassengerDetailsUpdatedEvent(passengerDetails)));
        });
  }
}

如果乘客修改了自己的名字,乘客管理服务同样可以发布 PassengerDetailsUpdatedEvent 事件,这样历史行程查询服务中的行程信息同样会被更新。当乘客名称更新时,该乘客的所有行程记录都会被更新。

Axon 框架实现

Axon 框架提供了事件源和 CQRS 技术的完整实现,如果使用 Axon 框架,可以更容易的实现 CQRS。下面介绍如何使用 Axon 框架来实现行程管理服务。

聚合

Axon 框架对领域驱动设计中的一些概念提供了原生的支持。以聚合来说,只需要在领域对象类上添加 @Aggregate 注解,就可以声明它是一个聚合的根实体。聚合对象类中包含了表示状态的属性,以及改变状态的方法。通过在不同的属性和方法上添加 Axon 框架的注解,就可以定义对于聚合对象的不同操作。

下面代码中的 Trip 类是行程管理服务的聚合根实体,其中用到了几个重要的 Axon 框架注解。

@AggregateIdentifier 注解表示的是聚合根实体对象的标识符,该标识符应该是全局唯一的,该注解的作用是声明属性 id 作为 Trip 对象的标识符。

@CommandHandler 注解用来声明处理命令的构造器或方法,命令处理器方法的第一个参数表示了所接受的命令类型。如果该注解添加在构造器上,则说明在处理该命令时,会创建聚合根实体的一个新对象。命令处理器中包含的是处理请求的业务逻辑。

@EventSourcingHandler 注解用来声明事件源技术中事件的处理器,事件处理器中包含的是改变对象状态的逻辑。在事件源技术中,对象状态的所有修改都以事件的形式来描述。只需要重放所有的事件,再应用事件处理器中所做的修改,就得到了对象的当前状态。

java
@Aggregate
public class Trip {
  @AggregateIdentifier
  private String id;
  private TripState state;
  @CommandHandler
  public Trip(final CreateTripCommand command) {
    apply(new TripCreatedEvent(command.getTripId(), command.getTripDetails()));
  }
  @CommandHandler
  public void handle(final CancelTripCommand command) {
    if (this.state != TripState.CREATED) {
      throw new IllegalTripStateException(this.state, TripState.CANCELLED);
    }
    apply(new TripCancelledEvent(command.getTripId()));
  }
  @CommandHandler
  public void handle(final ConfirmTripCommand command) {
    if (this.state != TripState.CREATED) {
      throw new IllegalTripStateException(this.state, TripState.CONFIRMED);
    }
    apply(new TripConfirmedEvent(command.getTripId()));
  }
  @EventSourcingHandler
  public void on(final TripCreatedEvent event) {
    this.id = event.getTripId();
    this.state = TripState.CREATED;
  }
  @EventSourcingHandler
  public void on(final TripCancelledEvent event) {
    this.state = TripState.CANCELLED;
  }
  @EventSourcingHandler
  public void on(final TripConfirmedEvent event) {
    this.state = TripState.CONFIRMED;
  }
  protected Trip() {
  }
}

命令处理器

命令处理器所处理的命令只是简单的 POJO 对象,命令对象通常表示来自客户端的请求。下面代码中的 ConfirmTripCommand 类表示的是确认行程的命令,命令对象中需要包含一个属性来声明处理该命令的聚合对象的标识符,这是通过 @TargetAggregateIdentifier 注解来表示的。

java
@Data
@NoArgsConstructor
@RequiredArgsConstructor
public class ConfirmTripCommand {
  @NonNull
  @TargetAggregateIdentifier
  private String tripId;
}

在命令处理器中,AggregateLifecycle 类的静态方法 apply 用来发布事件消息,所发布的事件会触发对应的事件源处理器,从而改变对象的状态。在处理 ConfirmTripCommand 命令的 handle 方法中,首先检查当前行程对象的状态是否合法,如果合法的话,则使用 apply 方法来发布 TripConfirmedEvent 事件。而 TripConfirmedEvent 事件的处理器,把当前行程对象的状态修改为 CONFIRMED。

从这里可以看出来 CQRS 技术中命令模型的基本处理流程,那就是命令处理器发布事件,事件处理器更新对象状态。

一般来说,会有一个命令负责创建聚合对象,通过添加了 @CommandHandler 注解的构造器来实现。在 Trip 类中,CreateTripCommand 命令用来创建 Trip 对象。在构造器所发布的 TripCreatedEvent 事件的处理器中,必须要设置聚合对象的标识符,这样后续的命令才能找到对应的对象。

发送命令

Axon 框架中的 CommandGateway 用来发送命令。下面代码中的 TripService 使用 CommandGateway 的 send 方法来发送命令,send 方法的返回值是 CompletableFuture 对象,也可以使用 sendAndWait 方法来发送命令并等待完成。

java
@Service
public class TripService {
  @Autowired
  CommandGateway commandGateway;
  public CompletableFuture<String> createTrip(String passengerId, PositionVO startPos,
      PositionVO endPos) {
    String tripId = UUID.randomUUID().toString();
    TripDetails tripDetails = new TripDetails(passengerId, startPos, endPos);
    CreateTripCommand command = new CreateTripCommand(tripId, tripDetails);
    return commandGateway.send(command);
  }
  public CompletableFuture<Void> cancelTrip(String tripId) {
    CancelTripCommand command = new CancelTripCommand(tripId);
    return commandGateway.send(command);
  }
  public CompletableFuture<Void> confirmTrip(String tripId) {
    ConfirmTripCommand command = new ConfirmTripCommand(tripId);
    return commandGateway.send(command);
  }
}

查询模型

在设计查询模型时,要满足的需求是查询历史行程的相关信息。为了查询方便,使用关系型数据库来保存行程数据,通过 Spring Data JPA 来实现。下面代码中的 TripView 是表示历史行程的领域对象类,同时也是 JPA 的实体类。

java
@Entity
@Table(name = "trip_view")
@Data
public class TripView {
  @Id
  private String id;
  @Column(name = "start_pos_lng")
  private BigDecimal startPosLng;
  @Column(name = "start_pos_lat")
  private BigDecimal startPosLat;
  @Column(name = "end_pos_lng")
  private BigDecimal endPosLng;
  @Column(name = "ent_pos_lat")
  private BigDecimal endPosLat;
  @Column(name = "state")
  @Enumerated(EnumType.STRING)
  private TripState state;
}

查询模型中的数据更新来自对不同事件的处理。下面代码中的 TripViewEventHandler 类中包含了不同的事件处理方法,通过 @EventHandler 注解来声明。在处理事件时,只需要根据事件的类型,对相应的 TripView 对象使用 TripViewRepository 进行修改即可。所有的修改都会保存在数据库中,与命令模型中的状态保持一致。

java
@Service
@Transactional
public class TripViewEventHandler {
  @Autowired
  TripViewRepository repository;
  @EventHandler
  public void on(TripCreatedEvent event) {
    TripView tripView = new TripView();
    tripView.setId(event.getTripId());
    TripDetails tripDetails = event.getTripDetails();
    PositionVO startPos = tripDetails.getStartPos();
    tripView.setStartPosLng(startPos.getLng());
    tripView.setStartPosLat(startPos.getLat());
    PositionVO endPos = tripDetails.getEndPos();
    tripView.setEndPosLng(endPos.getLng());
    tripView.setEndPosLat(endPos.getLat());
    tripView.setState(TripState.CREATED);
    repository.save(tripView);
  }
  @EventHandler
  public void on(TripCancelledEvent event) {
    repository.findById(event.getTripId())
        .ifPresent(tripView -> tripView.setState(TripState.CANCELLED));
  }
  @EventHandler
  public void on(TripConfirmedEvent event) {
    repository.findById(event.getTripId())
        .ifPresent(tripView -> tripView.setState(TripState.CONFIRMED));
  }
}

处理查询

Axon 框架提供了对查询的支持,由查询请求查询处理器两部分组成。查询请求是一个 POJO 对象,包含了自定义的查询元数据。查询处理器的方法通过 @QueryHandler 注解来进行声明。查询处理器方法的参数是它所能处理的查询请求对象,而返回值则是查询的处理结果。

Axon 框架支持 3 种不同类型的查询,如下表所示。

查询类型说明
点对点查询每个查询请求对应于单一的查询处理器
分散-收集查询对于一个查询请求,所有能够处理该请求的查询处理器都会被调用,得到的结果是一个流,其中包含了所有成功处理该请求的查询处理器的返回值
订阅查询订阅查询除了可以获取到查询结果的初始值之外,还可以获取到之后的更新结果

查询请求由 QueryGateway 来进行处理,应用代码调用 QueryGateway 的不同方法来执行查询,并获取结果。QueryGateway 中的方法如下表所示,类型参数 R 表示查询结果的类型。

查询类型方法名称返回值类型
点对点查询queryCompletableFuture
分散-收集查询scatterGatherStream
订阅查询subscriptionQuerySubscriptionQueryResult

在下面的代码中,TripService的queryTrip 方法用来处理 FetchTripQuery 类型的查询请求,查询结果以 TripSummary 对象表示。

java
@Service
public class TripService {
  @QueryHandler
  public TripSummary queryTrip(final FetchTripQuery query) {
    return this.tripViewRepository.findById(query.getTripId())
        .map(tripView -> {
          final TripSummary tripSummary = new TripSummary();
          tripSummary.setId(tripView.getId());
          tripSummary
              .setStartPos(new PositionVO(tripView.getStartPosLng(),
                  tripView.getStartPosLat()));
          tripSummary.setEndPos(
              new PositionVO(tripView.getEndPosLng(), tripView.getEndPosLat()));
          tripSummary.setState(tripView.getState());
          return tripSummary;
        })
        .orElseThrow(() -> new TripNotFoundException(query.getTripId()));
  }
}

下面代码中的 TripController 是行程管理服务 REST API 的实现,其中的 getTrip 方法使用 QueryGateway 的 query 方法来发送点对点的 FetchTripQuery 类型的查询请求,并把查询结果返回给客户端。

java
@RestController
public class TripController {
  @Autowired
  QueryGateway queryGateway;
  @GetMapping("{id}")
  public CompletableFuture<TripSummary> getTrip(
      @PathVariable("id") final String tripId) {
    return this.queryGateway
        .query(new FetchTripQuery(tripId), TripSummary.class);
  }
}

总结

本课时介绍了 CQRS 技术的两种实现方式,第一种方式是创建新的查询模型并进行存储,再通过事件来与更新模型中的数据保持一致;第二种方式是使用 Axon 框架提供的事件源和 CQRS 技术的支持。通过本课时的学习,你应该掌握根据应用的不同需求来选择合适的 CQRS 技术的实现方式。最后需要强调的是,CQRS 技术有其特定的适用范围,盲目使用该技术可能带来更多的问题,在使用之前需要充分调研和谨慎对待。