Skip to content

第21讲:如何使用Saga模式实现行程验证

上一课时对 Saga 模式进行了概念上的介绍,本课时将介绍 Saga 模式在实际开发中的应用,涉及示例应用中创建行程取消行程 这两个业务场景。Eventuate Tram 和 Axon 框架都提供了对 Saga 模式的支持,示例应用使用的是 Eventuate Tram 框架。Saga 分成编制型编排型两类,本课时将通过两个不同的业务事务来分别说明。创建行程的业务使用编制型 Saga 来实现,而取消行程的业务使用编排型 Saga 来实现,其中又以编制型 Saga 为重点。

编制型 Saga

编制型 Saga 使用一个协调者来管理 Saga 的生命周期,每个 Saga 描述一个业务事务。Saga 的定义用来描述对应的业务事务流程,主要包含具体的步骤,以及步骤之间的递进关系。Saga 定义中有多个参与者,每个参与者可以接受命令并返回响应。在微服务架构的应用中,参与者通常来自不同的微服务。

在运行时,每个 Saga 定义会产生多个实例,每个实例表示业务事务的一次执行。以创建行程为例,每个行程对象的创建过程都有与之对应的 Saga 实例,该实例的状态会被持久化下来。Eventuate Tram 使用关系型数据库来保存 Saga 实例。

Saga 定义可以看成是一个状态机的描述,状态机中的状态来自 Saga 所工作的领域对象,通常是聚合的根实体。状态机中的状态变迁来自对 Saga 参与者所提供的命令的调用,以及命令的回应消息。根据命令的回应结果,状态机转换到不同的状态,当状态机处于某个状态时,会调用与当前状态相关的参与者的命令。

了解业务事务

创建 Saga 的第一步是了解所要描述的业务事务,典型的做法是使用流程图来描述。下图是创建行程的流程图。

从这个流程图中,可以识别出创建行程的 Saga 中的参与者,以及每个参与者需要支持的命令,如下表所示。

处理命令

下一步是在每个微服务中处理与 Saga 相关的命令。在第 16 课时中已经介绍了 Eventuate Tram 框架中的命令的用法。Saga 参与者所使用的命令与普通的命令是相似的,只不过 Saga 中的命令的回应要区分成功和失败这两种情况,这是通过命令消息中特定的消息头的值来确定的。当接收到命令的失败回应时,需要执行补偿操作。

下面代码中的 TripValidationServiceCommandHandlers 类用来定义行程验证服务中,对验证行程的命令 ValidateTripCommand 的处理器。这里需要注意的是,创建 CommandHandlers 对象时使用的是 Saga 参与者特有的 SagaCommandHandlersBuilder 类,而不是通用的 CommandHandlersBuilder 类。

处理 ValidateTripCommand 命令的 validateTrip 方法调用的是 TripValidationService 类的 validateTrip 方法来进行验证。验证时执行的操作包括检查发起行程的乘客是否处于被封禁的状态,以及行程的距离是否超过预设的限制值。如果行程验证成功,则使用 withSuccess 方法返回表示成功的消息;如果验证失败,则使用 withFailure 方法返回以 InvalidTripReply 对象作为载荷的错误消息。

js
@Component
@Slf4j
public class TripValidationServiceCommandHandlers {
  @Autowired
  TripValidationService tripValidationService;
  public CommandHandlers commandHandlers() {
    return SagaCommandHandlersBuilder
        .fromChannel(TripValidationServiceChannels.tripValidation)
        .onMessage(ValidateTripCommand.class, this::validateTrip)
        .build();
  }
  private Message validateTrip(final CommandMessage<ValidateTripCommand> cm) {
    try {
      this.tripValidationService.validateTrip(cm.getCommand().getTripDetails());
      return withSuccess();
    } catch (final TripValidationException e) {
      log.warn("Trip is not valid", e);
      return withFailure(new InvalidTripReply());
    }
  }
}

Saga 参与者的命令处理器的配置方式也发生了变化,如下面的代码所示。CommandDispatcher 对象使用 SagaCommandDispatcherFactory 来创建,而不是普通的 CommandDispatcherFactory 对象。

java
@Configuration
@EnableAutoConfiguration
@Import({SagaParticipantConfiguration.class})
@ComponentScan
public class TripValidationServiceConfiguration {
  @Bean
  public CommandDispatcher commandDispatcher(
      final TripValidationServiceCommandHandlers commandHandlers,
      final SagaCommandDispatcherFactory sagaCommandDispatcherFactory) {
    return sagaCommandDispatcherFactory
        .make("tripValidationServiceDispatcher",
            commandHandlers.commandHandlers());
  }
}

除了行程验证服务之外,行程管理服务和支付服务中的命令处理使用相似的方式来实现。

Saga 定义

第三步是创建 Saga 定义,Eventuate Tram 提供了一种简单的 DSL 来描述 Saga 的定义。下面的代码中是创建行程的 CreateTripSaga 类。Saga 是所有 Saga 的接口,其中的方法 getSagaDefinition 用来返回表示 Saga 定义的 SagaDefinition 接口的对象。Saga 和 SagaDefinition 的类型参数 Data 表示该 Saga 所关联的上下文对象的类型。CreateTripSaga 类实现的是 SimpleSaga 接口,而 SimpleSaga 同时继承了 Saga 和 SimpleSagaDsl 接口。SimpleSagaDsl 接口提供了描述 Saga 定义的 DSL 支持,其中默认方法 step 返回一个构建 Saga 定义的 StepBuilder 对象。

js
@Component
public class CreateTripSaga implements SimpleSaga<CreateTripSagaState> {
  private final SagaDefinition<CreateTripSagaState> sagaDefinition;
  public CreateTripSaga(final TripServiceProxy tripService,
      final TripValidationServiceProxy tripValidationService,
      final PaymentServiceProxy paymentService) {
    this.sagaDefinition = this.step()
        .withCompensation(tripService.reject,
            CreateTripSagaState::createRejectTripCommand)
        .step()
        .invokeParticipant(tripValidationService.validateTrip,
            CreateTripSagaState::createValidateTripCommand)
        .step()
        .invokeParticipant(paymentService.createPayment,
            CreateTripSagaState::createPaymentCommand)
        .step()
        .invokeParticipant(
            CreateTripSagaState::paymentRequired, paymentService.makePayment,
            CreateTripSagaState::makePaymentCommand)
        .onReply(PaymentFailedReply.class,
            CreateTripSagaState::handlePaymentFailedReply)
        .onReply(Success.class, (state, success) -> state.markAsPaid())
        .step()
        .invokeParticipant(CreateTripSagaState::shouldConfirmTrip,
            tripService.confirm,
            CreateTripSagaState::createConfirmTripCommand)
        .step()
        .invokeParticipant(
            ((Predicate<CreateTripSagaState>) CreateTripSagaState::shouldConfirmTrip)
                .negate(),
            tripService.reject, CreateTripSagaState::createRejectTripCommand)
        .build();
  }
  @Override
  public SagaDefinition<CreateTripSagaState> getSagaDefinition() {
    return this.sagaDefinition;
  }
}

StepBuilder 类中的方法用来描述 Saga 定义的每个步骤中可以执行的操作,如下表所示。

invokeParticipant 和 withCompensation 方法描述的操作以发送命令的形式来完成。在 Saga 定义中,只需要描述如何创建命令对象即可,命令以 CommandWithDestination 对象来表示。在 Saga 实例执行到对应的步骤时,框架会负责发送相应的命令。

创建命令对象的第一种方式是提供 Function<Data, CommandWithDestination> 类型的函数,函数的参数是 Saga 实例当前的上下文对象,返回值是 CommandWithDestination 对象。第二种方式是提供一个 CommandEndpoint 对象来描述发送命令的终端,以及 Function<Data, C> 类型的函数来创建命令对象。类型参数 C 是命令对象的类型。CommandEndpoint 中包含了命令发送的通道、命令类和可能的回应类。

在下面的代码中,validateTrip 是发送 ValidateTripCommand 命令的终端,通过 CommandEndpointBuilder 构建器来创建,其中的 withReply 方法声明了 ValidateTripCommand 命令的成功响应 Success 类和错误响应 InvalidTripReply 类。

java
@Component
public class TripValidationServiceProxy {
  public final CommandEndpoint<ValidateTripCommand> validateTrip = CommandEndpointBuilder
      .forCommand(ValidateTripCommand.class)
      .withChannel(TripValidationServiceChannels.tripValidation)
      .withReply(Success.class)
      .withReply(InvalidTripReply.class)
      .build();
}

我们再回到 CreateTripSaga 类的定义,看一下 Saga 的定义是如何与业务流程关联起来的。第一个步骤对应的是创建初始的行程对象,行程对象与 Saga 实例同时创建,因此不需要发送额外的命令来创建。这个步骤的补偿操作是通过 TripServiceProxy 中的 reject 命令终端来发送 RejectTripCommand 命令,该命令的处理器负责把行程对象的状态改为已拒绝。

第二个步骤对应的操作是进行行程验证,通过 TripValidationServiceProxy 中的 validateTrip 命令终端来发送 ValidateTripCommand 命令。第三个步骤对应的是创建支付记录,通过 PaymentServiceProxy 中的 createPayment 命令终端来发送 CreatePaymentCommand 命令。

第四个步骤对应的是完成支付,不过这个步骤中的动作只有在满足一定的条件时才会被执行,对应的条件通过 invokeParticipant 方法的第一个 Predicate 类型的参数来表示。CreateTripSagaState 对象的 paymentRequired 方法用来判断行程是否需要提前支付,如果是的话,通过 PaymentServiceProxy 对象中的 makePayment 命令终端来发送 MakePaymentCommand 命令,该命令有成功和失败两种不同的回应。onReply 方法的作用是根据回应对象的类型来执行不同的操作。

最后两个步骤用来确定行程的状态,可以是已确认或已拒绝的状态,状态由 CreateTripSagaState 的 shouldConfirmTrip 方法来确定。第五个步骤的动作是通过 TripServiceProxy 中的 confirm 命令终端来发送 ConfirmTripCommand 命令。第六个步骤的动作则是发送 RejectTripCommand 命令。

Saga 实例的上下文对象

Saga 和 SagaDefinition 接口都有一个类型参数来表示上下文对象的类型。在 Saga 实例的生命周期中,上下文对象用来在不同的步骤之间进行数据传递,有些数据可能是 Saga 中的很多步骤所需要的。一个 Saga 中的某些步骤,可能需要使用之前步骤的命令的回应结果,这些数据的共享,都是通过上下文对象来完成的。在 Saga 实例创建的时候,会提供一个上下文对象作为初始状态。在 Saga 的步骤中,可以使用 invokeLocal 方法来修改这个对象的状态,也可以使用 invokeParticipant 方法返回的对象上的 onReply 方法来根据命令的回应修改这个对象。

下面代码中的 CreateTripSagaState 类是 CreateTripSaga 对应的上下文对象类。CreateTripSagaState 类中包含的一些属性作为状态值,一些方法用来创建不同的命令对象,以及一些改变状态值的方法。

java
@Data
@NoArgsConstructor
@RequiredArgsConstructor
public class CreateTripSagaState {
  @NonNull
  private String tripId;
  @NonNull
  private TripDetails tripDetails;
  @NonNull
  private BigDecimal fare;
  private boolean paid;
  public RejectTripCommand createRejectTripCommand() {
    return new RejectTripCommand(this.tripId);
  }
  public ValidateTripCommand createValidateTripCommand() {
    return new ValidateTripCommand(this.tripDetails);
  }
  public ConfirmTripCommand createConfirmTripCommand() {
    return new ConfirmTripCommand(this.tripId);
  }
  public CreatePaymentCommand createPaymentCommand() {
    return new CreatePaymentCommand(this.tripId, this.fare);
  }
  public MakePaymentCommand makePaymentCommand() {
    return new MakePaymentCommand(this.tripId);
  }
  public boolean paymentRequired() {
    return this.fare.compareTo(BigDecimal.valueOf(100)) > 0;
  }
  public void markAsPaid() {
    this.setPaid(true);
  }
  public void handlePaymentFailedReply(final PaymentFailedReply reply) {
    this.setPaid(false);
  }
  public boolean shouldConfirmTrip() {
    return !this.paymentRequired() || (this.paymentRequired() && this.isPaid());
  }
}

创建 Saga 实例

CreateTripSaga 用来管理创建行程的过程,因此该 Saga 的实例应该在创建行程对象的同时被创建。Saga 实例由 SagaInstanceFactory 对象的 create 方法来创建。create 方法的第一个参数是 Saga 对象,第二个参数是对应的上下文对象。

下面的代码展示了 TripService 的 createTrip 方法的实现。在创建 Trip 对象和发布相应的事件之后,首先创建一个 CreateTripSagaState 对象,再创建一个新的 CreateTripSaga 实例。

java
@Service
@Transactional
public class TripService {
  @Autowired
  TripRepository tripRepository;
  @Autowired
  TripFareService tripFareService;
  @Autowired
  TripDomainEventPublisher eventPublisher;
  @Autowired
  SagaInstanceFactory sagaInstanceFactory;
  @Autowired
  CreateTripSaga createTripSaga;
  public TripVO createTrip(final String passengerId, final PositionVO startPos,
      final PositionVO endPos) {
    final ResultWithDomainEvents<Trip, TripDomainEvent> tripAndEvents = Trip
        .createTrip(passengerId, startPos, endPos);
    final BigDecimal fare = this.tripFareService.calculate(startPos, endPos);
    final Trip trip = tripAndEvents.result;
    trip.setFare(fare);
    this.tripRepository.save(trip);
    this.eventPublisher.publish(trip, tripAndEvents.events);
    final TripDetails tripDetails = new TripDetails(passengerId, startPos,
        endPos);
    final CreateTripSagaState data = new CreateTripSagaState(trip.getId(),
        tripDetails, fare);
    this.sagaInstanceFactory.create(this.createTripSaga, data);
    return trip.toTripVO();
  }
}

当 Saga 实例创建之后,其中所包含的步骤会按照顺序来执行。

Saga 单元测试

Eventuate Tram 框架提供了对 Saga 的单元测试支持。下面代码给出了 CreateTripSaga 所对应的单元测试类。Saga 的单元测试以 BDD 的形式来描述。通过 expect 方法来声明每个步骤中所期望发送的命令,以及命令的回应消息。

js
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = TestConfig.class)
@DisplayName("Trip saga")
public class CreateTripSagaTest {
  @Autowired
  TripServiceProxy tripService;
  @Autowired
  TripValidationServiceProxy tripValidationService;
  @Autowired
  PaymentServiceProxy paymentService;
  @Test
  @DisplayName("Create trip")
  public void shouldCreateTrip() {
    final String tripId = this.uuid();
    final TripDetails tripDetails = this.tripDetails0();
    final BigDecimal fare = BigDecimal.valueOf(50);
    given().saga(this.makeCreateTripSaga(),
        new CreateTripSagaState(tripId, tripDetails, fare))
        .expect().command(new ValidateTripCommand(tripDetails))
        .to(TripValidationServiceChannels.tripValidation)
        .andGiven().successReply()
        .expect().command(new CreatePaymentCommand(tripId, fare))
        .to(PaymentServiceChannels.payment).andGiven().successReply()
        .expect()
        .command(new ConfirmTripCommand(tripId))
        .to(TripServiceChannels.trip);
  }
  private CreateTripSaga makeCreateTripSaga() {
    return new CreateTripSaga(this.tripService, this.tripValidationService,
        this.paymentService);
  }
  private TripDetails tripDetails0() {
    return new TripDetails(this.uuid(),
        new PositionVO(BigDecimal.ZERO, BigDecimal.ZERO),
        new PositionVO(BigDecimal.ZERO, BigDecimal.ZERO));
  }
  private String uuid() {
    return UUID.randomUUID().toString();
  }
  @TestConfiguration
  @ComponentScan(basePackageClasses = TripServiceProxy.class)
  static class TestConfig {
  }
}

编排型 Saga

编排型 Saga 没有单独的 Saga 实体来管理业务事务的流程,而是通过不同参与者之间的事件传递来完成。每个参与者只需要添加相应事件的处理器,通过本地事务来完成操作即可。处理的结果以新的事件方式进行发布,从而触发其他参与者的处理逻辑,推动业务事务的进展。

从实现的角度来说,编排型 Saga 只需要利用 Eventuate Tram 框架中提供的事务性消息即可,并不需要额外的支持。业务事务的流程,只存在于事件的发布和处理之中。以取消行程的业务事务为例,行程的取消涉及乘客和司机两个参与者,乘客和司机都可以发起取消行程的请求。如果另外一方同意,那么行程被取消,同时发送 TripCancelledEvent 事件;如果另外一方不同意,那么行程的取消则需要进行调解,在更新行程状态之后,发送 TripCancellationResolutionRequiredEvent 事件。在行程取消之后,行程派发服务可能需要取消正在进行的行程派发动作。如果取消行程还需要有后续的其他动作,只需要添加新的 TripCancelledEvent 事件的处理器即可。

编排型 Saga 的好处在于简单,并不需要附加的 Saga 实体,另外参与者之间是松散耦合的。编排型 Saga 的缺点在于业务事务的逻辑散落在不同的参与者中,不容易理解整个业务的流程,另外参与者可能会由于事件的发布和处理而产生循环依赖关系。由于这样的缺点,编排型 Saga 一般只用来实现非常简单的业务事务,更多的时候,使用编制型 Saga 是更好的选择。

Saga 的隔离性问题

Saga 由一系列本地事务组成,并通过补偿操作来处理失败。从数据库事务的 ACID 特性来说,Saga 只具有 ACD 特性,缺少了隔离性,隔离性保证了多个事务并发执行的结果,与这些事务顺序执行时的结果保持一致。每个事务都相当于在各自隔离的空间中运行,互相并不影响。Saga 并不具备隔离性,这是因为组成 Saga 的本地事务是各自独立提交的。当一个 Saga 实例的某个步骤完成之后,该步骤对应的本地事务就会被提交,该事务对数据库的改动对其他本地事务是可见的。一个正在运行 Saga 实例中的某个步骤在对数据库进行操作时,可以读取到另外一个 Saga 实例产生的部分结果,也可以覆写掉另外一个 Saga 实例已经写入的结果。这可能会造成数据异常。

以创建和取消行程这两个 Saga 为例,当创建行程的 Saga 实例执行到支付这一步骤时,乘客发出了取消行程的请求,取消行程的 Saga 把行程设置为已取消的状态。在这之后,创建行程的 Saga 实例继续执行,最后把行程又重新设置为已确认状态。这就造成了乘客已经取消的行程,仍然被确认和派发。

解决 Saga 隔离性问题的一个常见方案是在应用层次添加锁,可以把领域对象的状态作为锁。在上面的例子中,在创建支付这一步骤完成之后,可以把行程对象的状态设置为等待支付。取消行程的 Saga 首先检查行程的状态,如果发现行程处于等待支付的状态,它可以直接出错,或是等待创建行程的 Saga 实例完成之后,再进行取消的动作。

另外一种解决方案是从数据库中重新读取领域对象。在上面的例子中,创建行程的 Saga 在确认行程之前,重新读取行程对象,如果发现行程对象的状态变为已取消,则直接出错,将导致 Saga 实例自动执行相应的补偿操作。

总结

本课时详细介绍了如何使用 Eventuate Tram 框架来实现编制型和编排型 Saga,尤其是编制型 Saga 的使用。示例应用使用编制型 Saga 来实现创建行程这一业务事务。同时,还介绍了 Saga 的隔离性问题及解决办法。通过本课时的学习,你应该掌握如何在实际的开发中使用 Eventuate Tram 框架来创建 Saga,以满足业务事务的需求。