Skip to content

第19讲:如何实现行程派发与调度算法

第 18 课时介绍了司机模拟器如何发布位置更新事件,以及行程派发服务如何处理这些事件,并维护所有可用的司机信息,本课时紧接着第 18 课时的内容,主要介绍行程派发服务的实现。行程派发是示例应用的核心领域,因此需要进行重点介绍。本课时只对代码实现中的重要部分进行介绍,完整的实现请参考示例应用在 GitHub 上的源代码。

调度算法

当乘客发出创建行程的请求之后,该创建请求首先需要被验证,行程验证由专门的服务来完成。在第 21 课时介绍 Saga 模式的实现时,会具体介绍行程验证服务。当行程通过验证之后,行程会处于已确认状态,与此同时,行程派发服务会开始执行该行程的调度任务。

派发行程的操作由 DispatchService 类的 dispatchTrip 方法来完成。当行程派发服务接收到表示行程已经被确认的 TripConfirmedEvent 事件之后,dispatchTrip 方法就会被调用。下面的代码给出了 dispatchTrip 方法的实现,具体的派发过程分成下面 3 个步骤。

  1. 以行程的起始位置为中心,找到所有处于可用状态的司机,这些是派发行程的候选。这一步调用 findAvailableDrivers 方法来完成,实际调用的是 DriverLocationService 类的 findAvailableDrivers 方法。在第 18 课时已经介绍了这个方法,它通过 Redis 来查找特定范围之内的可用的司机。
  2. 基于行程信息和可用司机的列表来创建表示行程派发的领域类 Dispatch 的对象,保存 Dispatch 对象并发布相关的事件,这是事务性消息模式的应用。
  3. 如果存在可用的司机,则调用 TripAcceptanceService 类的 startTripAcceptanceCheck 方法来检查是否有司机接受行程。

由于第 1 个步骤已经在第 18 课时进行了介绍,下面会具体对第 2 个和第 3 个步骤进行介绍。

java
@Transactional
public void dispatchTrip(final String tripId, final TripDetails tripDetails) {
  final Set<AvailableDriver> availableDrivers = this
      .findAvailableDrivers(tripDetails);
  this.saveAndPublishEvents(
      Dispatch.createDispatch(tripId, tripDetails, availableDrivers));
  if (!availableDrivers.isEmpty()) {
    this.tripAcceptanceService.startTripAcceptanceCheck(tripId, tripDetails,
        this.acceptanceCheckInterval,
        this::selectTripAcceptance, this::notifyTripDispatchFailed);
    log.info("Dispatch trip {} to drivers {}", tripId, availableDrivers);
  }
}

行程派发领域对象

每个行程的派发动作都有自己的生命周期,因此需要定义相关的实体类 Dispatch,并保存在数据库中。行程派发是所在聚合的根实体,每个行程被派发之后,查找到的可用司机会被邀请来接受行程。对行程的接受动作也是有生命周期的,同样以实体的形式来定义,即 TripAcceptance,属于聚合的一般实体。行程派发实体和行程接受实体存在一对多的关系。下图是这两个实体之间关系的 ER 图。

在创建行程派发对象时,需要提供的是行程信息和可用司机的列表。Dispatch 类的静态方法 createDispatch 用来创建 Dispatch 对象和需要发布的事件对象,如下面的代码所示。在创建出 Dispatch 对象之后,对于每一个表示可用的司机的 AvailableDriver 对象,创建出一个与之对应的 TripAcceptance 对象,用来追踪每个司机接受行程的状态。这些 TripAcceptance 对象与 Dispatch 对象关联起来。

对于发布的事件对象,如果可用司机的列表为空,则直接把 Dispatch 对象设置为失败状态,发布的事件为 TripDispatchFailedEvent 对象;否则,发布的事件为 TripDispatchedEvent 对象。Dispatch 对象和事件对象以 ResultWithDomainEvents 的形式返回。DispatchService 的 saveAndPublishEvents 方法用来保存 Dispatch 对象并发布事件。

java
public static ResultWithDomainEvents<Dispatch, DispatchDomainEvent> createDispatch(
    final String tripId,
    final TripDetails tripDetails,
    final Set<AvailableDriver> drivers) {
  final PositionVO startPos = tripDetails.getStartPos();
  final Dispatch dispatch = new Dispatch(tripId, startPos.getLng(),
      startPos.getLat());
  final List<TripAcceptance> tripAcceptances = drivers.stream()
      .map(driver -> new TripAcceptance(driver.getDriverId(),
          driver.getPosLng(),
          driver.getPosLat()))
      .collect(Collectors.toList());
  dispatch.setTripAcceptances(tripAcceptances);
  final DispatchDomainEvent event;
  if (drivers.isEmpty()) {
    dispatch.setState(DispatchState.FAILED);
    dispatch.setFailedReason(TripDispatchFailedReason.NO_DRIVERS_AVAILABLE);
    event = new TripDispatchFailedEvent(tripId,
        TripDispatchFailedReason.NO_DRIVERS_AVAILABLE);
  } else {
    final Set<String> driversId = drivers.stream()
        .map(AvailableDriver::getDriverId)
        .collect(Collectors.toSet());
    event = new TripDispatchedEvent(tripId, tripDetails, driversId);
  }
  return new ResultWithDomainEvents<>(dispatch, event);
}

接受行程

在行程派发之后,需要通知司机来接受行程。对于乘客 App 说,可以使用消息推送来发送通知;对于 Web 应用来说,可以使用 WebSocket 来发送通知。只需要添加 TripDispatchedEvent 事件的处理器,就可以使用不同的方式来发送通知。

当司机接收到通知之后,可以选择是否接受行程。在一定的时间之内,所有收到通知的司机都可以选择接受行程。在初始的等待时间过后,如果有司机接受行程,那么会从接受行程的司机中,选择一个来作为行程的接受者,而其他的司机则会收到通知,说明行程已经被其他司机所接受。如果没有司机接受行程,那么会再等待一段时间之后,再进行检查;如果在给定的期限之后,仍然没有司机接受行程,那么行程派发失败。

司机接受行程的请求由行程管理服务提供的 REST API 来负责处理。REST 控制器使用 TripService 处理请求,如下面的代码所示。

java
@PostMapping("{id}/accept")
public void acceptTrip(@PathVariable("id") String id,
    @RequestBody AcceptTripRequest request) {
  tripService
      .driverAcceptTrip(id, request.getDriverId(), request.getPosLng(), request.getPosLat());
}

下面的代码给出了 TripService 类的 driverAcceptTrip 方法的实现。在实现中,一个 DriverAcceptTripEvent 事件会被发布。withTrip 方法的作用是根据行程的标识符找到对应的 Trip 对象,并执行操作。

java
public void driverAcceptTrip(final String tripId, final String driverId,
    final BigDecimal posLng,
    final BigDecimal posLat) {
  this.withTrip(tripId, trip -> this.eventPublisher.publish(trip,
      ImmutableList
          .of(new DriverAcceptTripEvent(
              new DriverAcceptTripDetails(driverId, posLng, posLat)))));
}

DriverAcceptTripEvent 事件的处理器调用 DispatchService 类的 submitTripAcceptance 方法,如下面的代码所示。其中 withCurrentDispatch 方法的作用是根据行程的标识符,找到该行程当前的 Dispatch 对象,再对该 Dispatch 对象进行操作。

java
@Transactional
public void submitTripAcceptance(final String tripId,
    final DriverAcceptTripDetails details) {
  log.info("Driver {} wants to accept trip {}", details.getDriverId(),
      tripId);
  this.withCurrentDispatch(tripId, dispatch -> {
    this.dispatchRepository.save(dispatch.submitTripAcceptance(details));
    this.tripAcceptanceService.addDriverToAcceptTrip(tripId, details);
  });
}

在下面的代码中,Dispatch 类的 submitTripAcceptance 方法用来对当前 Dispatch 对象进行修改,把司机对应的 TripAcceptance 对象的状态改为已提交。

java
public Dispatch submitTripAcceptance(
    final DriverAcceptTripDetails acceptTripDetails) {
  Stream.ofAll(this.tripAcceptances)
      .find(tripAcceptance -> Objects
          .equals(tripAcceptance.getDriverId(),
              acceptTripDetails.getDriverId()))
      .toJavaOptional()
      .ifPresent(tripAcceptance -> {
        tripAcceptance.setState(TripAcceptanceState.SUBMITTED);
        tripAcceptance.setCurrentPosLng(acceptTripDetails.getPosLng());
        tripAcceptance.setCurrentPosLat(acceptTripDetails.getPosLat());
      });
  return this;
}

TripAcceptanceService 类负责找到合适的司机来接受行程,如下面的代码所示。在 startTripAcceptanceCheck 方法中,把行程的起始地理位置添加到 Redis 中,然后启动一个定时任务来检查行程的接受状态。该定时任务由 CheckTripAcceptanceTask 类来描述。在每次执行任务时,调用 findDriverToAcceptTrip 方法来从当前已接受行程的司机中,找到距离行程的起始位置最近的司机,并选中该司机来接受行程。

如果当前没有司机接受行程,那么会启动一个新的执行同样操作的任务,并在给定的时间间隔之后运行。对于一个行程,检查任务最多运行 3 次,如果 3 次之后仍然没有司机接受行程,会调用指定的错误回调函数 failureCallback;如果有司机接受行程,则会调用指定的成功回调函数 successCallback。当有司机接受行程时,addDriverToAcceptTrip 方法会被调用来把司机的位置信息添加到 Redis 中,可以在下一次定时任务中被查询到。addDriverToAcceptTrip 方法会被 DispatchService 类的 submitTripAcceptance 方法调用。

java
public class TripAcceptanceService {
  @Autowired
  RedisTemplate<String, String> redisTemplate;
  @Autowired
  TaskScheduler taskScheduler;
  private final Distance searchRadius = new Distance(10,
      DistanceUnit.KILOMETERS);
  private final String passenger = "__passenger__";
  private final int acceptanceCheckMaxTimes = 3;
  public void startTripAcceptanceCheck(final String tripId,
      final TripDetails tripDetails,
      final Duration interval,
      final BiConsumer<String, String> successCallback,
      final BiConsumer<String, TripDispatchFailedReason> failureCallback) {
    this.redisTemplate.opsForGeo()
        .add(this.keyForTripAcceptance(tripId),
            new Point(tripDetails.getStartPos().getLng().doubleValue(),
                tripDetails.getStartPos().getLat().doubleValue()),
            this.passenger);
    this.scheduleCheckTripAcceptanceTask(tripId, interval, successCallback,
        failureCallback, 1);
  }
  private void scheduleCheckTripAcceptanceTask(
      final String tripId,
      final Duration interval,
      final BiConsumer<String, String> successCallback,
      final BiConsumer<String, TripDispatchFailedReason> failureCallback,
      final int attempt) {
    this.taskScheduler
        .schedule(new CheckTripAcceptanceTask(tripId, interval, successCallback,
                failureCallback,
                attempt),
            Instant.now().plusMillis(interval.toMillis()));
  }
  public void addDriverToAcceptTrip(final String tripId,
      final DriverAcceptTripDetails details) {
    this.redisTemplate.opsForGeo()
        .add(this.keyForTripAcceptance(tripId),
            new Point(details.getPosLng().doubleValue(),
                details.getPosLat().doubleValue()), details.getDriverId());
  }
  private Optional<String> findDriverToAcceptTrip(final String tripId) {
    final GeoResults<GeoLocation<String>> results = this.redisTemplate
        .opsForGeo()
        .radius(this.keyForTripAcceptance(tripId), this.passenger,
            this.searchRadius,
            GeoRadiusCommandArgs.newGeoRadiusArgs().sortAscending());
    return results.getContent().stream()
        .map(result -> result.getContent().getName())
        .filter(name -> !Objects.equals(name, this.passenger))
        .findFirst();
  }
  private String keyForTripAcceptance(final String tripId) {
    return String.format("accept_trip_%s", tripId);
  }
}

当有司机被选中接受行程时,DispatchService 的 selectTripAcceptance 方法会被调用,如下面的代码所示。这个方法会对 Dispatch 对象及其关联的 TripAcceptance 对象进行修改,并发布相应的事件。只有被选中的司机所对应的 TripAcceptance 对象的状态会变为已选中,其他的 TripAcceptance 对象的状态会变为已拒绝,与之相关的事件也会被发布。

java
@Transactional
public void selectTripAcceptance(final String tripId, final String driverId) {
  log.info("Select driver {} to accept trip {}", driverId, tripId);
  this.withCurrentDispatch(tripId, dispatch ->
      this.saveAndPublishEvents(dispatch.selectTripAcceptance(driverId)));
}

事件处理配置

下面的代码是 Eventuate Tram 框架使用的领域事件处理相关的配置,产生事件的领域对象是 Trip。创建的 DomainEventHandlers 对象对 Trip 对象产生的 TripConfirmedEvent 和 DriverAcceptTripEvent 两种事件进行处理。

js
public class DispatchServiceEventConsumer {
  @Autowired
  DispatchService dispatchService;
  private static final Logger LOGGER = LoggerFactory
      .getLogger(DispatchServiceEventConsumer.class);
  public DomainEventHandlers domainEventHandlers() {
    return DomainEventHandlersBuilder
        .forAggregateType("io.vividcode.happyride.tripservice.domain.Trip")
        .onEvent(TripConfirmedEvent.class, this::onTripConfirmed)
        .onEvent(DriverAcceptTripEvent.class, this::onDriverAcceptTrip)
        .build();
  }
  private void onTripConfirmed(DomainEventEnvelope<TripConfirmedEvent> envelope) {
    TripDetails tripDetails = envelope.getEvent().getTripDetails();
    try {
      dispatchService.dispatchTrip(envelope.getAggregateId(), tripDetails);
    } catch (Exception e) {
      LOGGER.warn("Failed to dispatch trip {}", envelope.getAggregateId(), e);
    }
  }
  private void onDriverAcceptTrip(DomainEventEnvelope<DriverAcceptTripEvent> envelope) {
    dispatchService.submitTripAcceptance(envelope.getAggregateId(),
        envelope.getEvent().getAcceptTripDetails());
  }
}

事件驱动的微服务总结

本课时是事件驱动的微服务部分的最后一个课时,我对事件驱动的微服务做一下总结。事件驱动的微服务使用异步传递的消息来代替同步的微服务 API 调用。当一个微服务的领域对象状态发生变化时,会发布相应的事件来通知感兴趣的其他微服务。每个微服务可以添加对感兴趣的领域事件的处理器,来修改自己内部的对象状态,事件的发布和处理使用发布者 - 订阅者(PubSub) 模式。事件的发布者和处理者之间不存在耦合关系。

值得一提的是,在事件驱动的微服务中,每个微服务所发布和处理的事件,成为了其公开 API 的一部分,需要在设计时与其他微服务进行协调。比如,一个事件的发布者不能随意修改事件的载荷格式,不过事件驱动设计的一个好处是事件的版本更新很容易。比如,当前的某个事件的类型声明是 MyEvent,如果需要改变该事件的载荷格式时,只需要创建一个新的事件类型 MyEvent2 即可。在代码迁移阶段,事件的发布者可以同时发布这两种类型的事件,这样可以保证已有的代码不会出错。等所有的事件处理器都升级到使用新的事件类型之后,只需要停止旧版本的事件发布即可。

为了保证对象状态的修改和事件的发布之间的一致性,我们使用事务性消息模式,典型的做法是事务性发件箱模式。Eventuate Tram 框架提供了对事务性发件箱模式的支持,示例应用使用这个框架来进行跨服务的消息传递。

与事务性消息模式不同的是,事件源技术使用事件来描述所有对对象状态的修改。这样可以产生一个完整的对象状态修改的日志记录,形成一个审核日志。通过这些事件,可以查询到对象在任意时刻的状态。

总结

作为事件驱动的微服务部分的最后一个课时,本课时主要介绍了行程派发服务的一些实现细节,包括其中的领域对象、行程派发算法和接受行程的实现。这些实现细节可以帮助你更好的理解事务性消息模式的使用。最后对事件驱动的微服务做了总结。相信通过部分的学习,你已经掌握或了解了事件驱动的微服务的开发,在今后的工作中可以按照事件驱动的思路来设计和实现微服务。