Skip to content

第18讲:如何处理司机位置更新事件

在第 17 课时中,我介绍了事件源技术的基本概念,并提供了基本的 Java 实现。在实际的开发中,我们通常使用已有的开源框架来实现事件源,不同的编程语言和平台都有事件源的开源实现。下表给出了几个比较流行的开源实现,示例应用使用的是 Axon。

示例应用中使用事件源技术的场景是司机位置的更新。本课时将介绍如何使用 Axon 来更新司机的位置,除了本课时之外,在介绍 CQRS 技术的课时中,也会用到 Axon。

在打车应用中,司机需要实时更新其当前位置。司机的位置信息是派发行程的重要依据,同时也是很多安全相关的功能的基础,打车应用的司机 App 会定期读取 GPS 中的位置信息,并发送给服务器。这属于典型的事件应用的场景。示例应用中虽然没有司机 App,但是有司机模拟器来模拟司机的行为,也会同样发送被模拟的司机位置信息给服务器。

Axon

Axon 是一个事件驱动的微服务的完整解决方案,我们可以完全基于 Axon 来开发事件驱动的微服务架构的应用。Axon 包括 Axon 框架和 Axon 服务器两个部分。Axon 框架是 Axon 中的编程模型,提供了 SDK 来构建应用,支持事件源和 CQRS 等技术;Axon 服务器则是一个高可用性和可伸缩的事件存储。Axon 框架并不一定要使用 Axon 服务器作为事件存储,也可以使用关系型数据库、MongoDB 或内存作为事件存储方式。不过 Axon 服务器提供了额外的功能,适合于生产环境。

Axon 支持 3 种不同类型的消息,这 3 种消息模式,可以适用于不同的应用场景,如下表所示。

Axon 服务器

Axon 服务器是一个通用的分布式消息处理平台,不同的应用可以连接到 Axon 服务器来进行消息传递。一个应用可以发布事件给 Axon 服务器,其他应用可以声明事件的处理器来处理事件。从这个功能上来说,Axon 服务器的作用类似于分布式的事件总线,一个应用也可以发送命令给 Axon 服务器,由其他应用来提供回应。命令只会发送给一个应用。这个功能类似于常见的远程过程调用(RPC)模式。查询会被发送给所有能够回答该查询的应用。

Axon 服务器提供两种交互方式,分别是 8024 端口上的 HTTP 和 8124 端口上的 gRPC。Axon 服务器基于 Spring Boot 实现,打包成单个 JAR 文件,下载之后可以直接运行。推荐的做法是使用 Docker 来运行。下面的命令用来启动 Axon 服务器的 Docker 容器。

java
docker run -p 8024:8024 -p 8124:8124 axoniq/axonserver:4.3.3

Axon 框架

Axon 框架是 Java 应用使用的开发框架,支持 Axon 中 3 种类型的消息发布和处理。在 Spring Boot 应用中使用 Axon 框架非常简单,只需要添加相关的 Maven 依赖,就可以利用 Axon 框架提供的自动配置功能,如下面的代码所示。完成配置之后,Axon 框架中的对象实例都可以通过 @Autowired 来声明。

html
<dependency>
  <groupId>org.axonframework</groupId>
  <artifactId>axon-spring-boot-starter</artifactId>
  <version>4.3.3</version>
</dependency>

在 Spring Boot 应用中,可以使用 Spring 标准的方式来配置 Axon 框架。在下面的 YAML 文件中,axon.axonserver.servers 用来配置 Axon 服务器的连接方式,这里连接的是 8124 端口上的 gRPC 接口。axon.serializer 用来配置事件对象的序列化格式。Axon 框架默认使用 XStream 序列化成 XML 格式。下面代码中的配置值 jackson 用来指定使用 Jackson 序列化成 JSON 格式。

java
axon:
  axonserver:
    servers: ${DOCKER_HOST_IP:localhost}:8124
  serializer:
    events: jackson

司机模拟器

司机模拟器是示例应用提供的一个辅助工具,其作用是代替司机 App 在打车应用中的作用,司机模拟器可以同时模拟多个司机的行为。每个被模拟的司机有固定的行为模式,每隔 5 秒钟改变一次位置,位置改变的规则如下所示:

  • 从当前位置开始进行模拟,第一次模拟时使用初始位置;
  • 随机确定是否应该转向,如果转向,随机确定是左转还是右转;
  • 以随机的速度前进一段距离,作为新的位置。

除了位置之外,司机还可能处于不同的状态,如下表所示。代码中使用枚举类型 DriverState 来表示。

下图是司机模拟器的界面,上面展示了每个司机的状态,以及可以进行的操作。

发布事件

Axon 框架中有两类不同的事件,分别是从聚合中发布的领域事件,以及从其他组件发布的普通事件。司机模拟器只需要发布普通事件即可。领域事件相关的内容,将会在介绍 CQRS 技术的课时(第 22 课时)中进行说明。

Axon 框架 EventGateway 的 publish 方法可以发布多个事件。EventGateway 接口使用 Object 作为事件类型,因此任何对象都可以作为事件来发布。在司机模拟器中,每个被模拟的司机都会定期发送其位置,位置变化的事件由 DriverLocationUpdatedEvent 类表示,如下面的代码所示。

java
@Data
public class DriverLocationUpdatedEvent {

  private DriverLocation location;

  private DriverState state;

  private long timestamp;
}

在下面的代码中,DriverSimulator 类中的 sendLocation 方法使用 EventGateway 对象来发布 DriverLocationUpdatedEvent 对象。

java
public class DriverSimulator {

  private final EventGateway eventGateway;

  private void sendLocation() {
    final DriverLocationUpdatedEvent event = new DriverLocationUpdatedEvent();
    event.setTimestamp(System.currentTimeMillis());
    event.setLocation(this.currentLocation);
    event.setState(this.state);
    this.eventGateway.publish(event);
  }
}

Axon 中所有组件的通讯都是通过消息对象来完成的。Message 是所有消息对象的接口,其类型参数 T 表示载荷对象的类型。所有消息对象都由 标识符载荷元数据这 3 个部分组成,标识符的类型是 String,元数据由 MetaData 类来表示,而载荷则根据消息的类型来确定。MetaData 类实现了 Map<String, Object> 接口,实际是一个名值对的哈希表,用来包含与消息相关的辅助信息。需要注意的是,消息对象都是不可变,对消息对象的修改,都会创建新的消息对象。

对于 Axon 支持的 3 种类型的消息,都有与之对应的 Message 子接口,表示事件的 EventMessage 接口在 Message 接口的基础上,增加了事件发生的时间戳,以 Instant 对象表示。通过 EventGateway 发布的事件对象,都会被自动封装成 EventMessage 对象,再发送给 Axon 服务器。

可以访问 Axon 服务器的 8024 端口来查看 Axon 服务器提供的管理界面,如下图所示,可以查看所有发布的事件的详细信息。

处理事件

事件的处理器通过在处理方法上添加 @EventHandler 注解来声明。事件处理方法的第一个参数是事件消息的载荷类型,这个载荷类型用来确定可以被处理的事件类型。一个事件处理类中可以包含多个方法来处理不同类型的事件,对于一个事件处理对象,最多只有一个处理方法被调用。如果有多个方法可以匹配,那么参数类型最具体的那个方法会被调用;如果处理方法不需要访问事件的载荷对象,则可以通过 @EventHandler 注解的 payloadType 属性来声明载荷对象的类型,而不需要添加额外的方法参数。

除了事件的载荷之外,处理方法还可以声明其他类型的参数来自动获取事件对象中的其他值,如下表所示。

从司机模拟器中发布的 DriverLocationUpdatedEvent 事件,会被行程派发服务来处理。在下面的代码中,DriverLocationUpdater 类中的 handle 方法用来处理 DriverLocationUpdatedEvent 事件。根据事件中司机状态的不同,调用 DriverLocationService 对象中的不同方法。

java
@Component
public class DriverLocationUpdater {

  @Autowired
  DriverLocationService driverLocationService;

  @EventHandler
  public void handle(final DriverLocationUpdatedEvent event) {
    final DriverLocation location = event.getLocation();
    if (event.getState() == DriverState.AVAILABLE) {
      this.driverLocationService.addAvailableDriver(location);
    } else {
      this.driverLocationService.removeAvailableDriver(location.getDriverId());
    }
  }
}

位置查询

在行程派发服务中,司机的位置信息被保存在 Redis 中。通过 Redis 提供的地理位置查询功能,可以从一个点出发,找到指定距离内的其他点。在下面的代码中,DriverLocationService 服务类负责管理所有处于可用状态的司机,其中 addAvailableDriver 和 removeAvailableDriver 方法分别用来添加和删除可用的司机,由上一节中 DriverLocationUpdatedEvent 的 handle 方法来调用。访问 Redis 时使用的是 Spring Data Redis 中的 RedisTemplate。

在 findAvailableDrivers 方法中,两个参数是作为查找起始点的地理位置坐标。接着通过调用 Redis 的 GEORADIUS 命令来查询以起始位置为圆心,半径 10 公里内的全部可用司机,并以 AvailableDriver 对象的形式返回。

java
@Service
public class DriverLocationService {

  @Autowired
  RedisTemplate<String, String> redisTemplate;

  private final String key = "available_drivers";
  public static final Distance searchRadius = new Distance(10,
      DistanceUnit.KILOMETERS);

  public void addAvailableDriver(final DriverLocation location) {
    this.redisTemplate.opsForGeo()
        .add(this.key, new Point(location.getLng().doubleValue(),
                location.getLat().doubleValue()),
            location.getDriverId());
  }

  public void removeAvailableDriver(final String driverId) {
    this.redisTemplate.opsForGeo().remove(this.key, driverId);
  }

  public Set<AvailableDriver> findAvailableDrivers(final BigDecimal lng,
      final BigDecimal lat) {
    final GeoResults<GeoLocation<String>> results = this.redisTemplate.opsForGeo()
        .radius(this.key, new Circle(new Point(lng.doubleValue(), lat.doubleValue()),
                searchRadius),
            GeoRadiusCommandArgs.newGeoRadiusArgs().includeCoordinates());
    if (results != null) {
      return results.getContent().stream().filter(Objects::nonNull)
          .map(result -> {
            final GeoLocation<String> content = result.getContent();
            final Point point = content.getPoint();
            return new AvailableDriver(content.getName(),
                BigDecimal.valueOf(point.getX()),
                BigDecimal.valueOf(point.getY()));
          })
          .collect(Collectors.toSet());
    }
    return Collections.emptySet();
  }
}

总结

示例应用中的司机模拟器用来模拟司机的行为,并定时发送其位置信息,示例应用使用 Axon 框架和 Axon 服务器来作为事件源技术的实现。本课时介绍了 Axon 服务器及其框架,以及如何使用 Axon 来发布和处理司机位置更新事件,并使用 Redis 来查询可用司机。