Skip to content

第15讲:什么是事务性消息模式

第 14 课时介绍了事件驱动设计的基本概念,在使用了消息代理之后,应用中产生的事件以消息的形式进行发布,消息的消费者接收到事件并进行处理。如果消息代理可以在传递消息时提供至少有一次的保证性,那么只要消息被成功发布,就可以确保该消息对应的事件必定会得到处理。

事务性消息(Transactional Messaging)的目的是保证数据的一致性。在示例应用中,当收到创建行程的请求之后,行程服务会把行程信息保存在关系型数据库中,同时发布表示行程已创建的事件 TripCreatedEvent。很显然,行程信息的保存和 TripCreatedEvent 事件的发布,这两个动作要么同时发生,要么同时不发生。如果只有一个动作发生,那么必然会产生数据一致性的问题。而这两个动作都可能失败,为了保证原子性,通常需要用到事务。

对于关系数据库中的事务,我们都不陌生。如果上述的两个动作是对同一个数据库中表的操作,我们使用事务就可以轻松解决。两个动作在同一个事务中,如果这两个动作都成功,事务才会被提交,否则事务会自动回滚。如果两个动作是对两个不同数据库的操作,那么也可以使用 XA 事务的两阶段提交协议(Two-Phase Commit Protocol,2PC)。

在行程服务的实现中,行程信息被保存在 PostgreSQL 数据库中,而事件发布则由 Apache Kafka 来完成。Kafka 并不支持 XA 事务,这使得无法通过 XA 事务来解决问题。为了解决这个问题,就需要用到下面介绍的事务性发件箱模式。

事务性发件箱模式

事务性发件箱(Transactional Outbox)模式使用一个数据库表来保存需要发布的事件,这个表称为事件的发件箱。通过使用这种模式,发布事件的动作被转换成一个数据库操作,因此可以使用一个本地数据库事务来保证原子性。对于保存在发件箱表中的事件,需要一个独立的消息中继进程来转发给消息代理。

下图给出了事务性发件箱模式的示意图。在行程服务对行程表进行操作时,包括插入、更新和删除操作,会同时在发件箱表中插入对应的事件记录,对这两个表的操作在同一个数据库事务中。如果对行程表的操作成功,则发件箱表中必然有对应的事件;如果对行程表的操作失败,则发件箱表中必然没有对应的事件。消息中继负责读取发件箱表中的记录,并发送事件给消息代理。

实现事务性发件箱模式的一个重要问题是如何有效读取发件箱表中的记录,一般的做法是使用下面介绍的变化数据捕获技术。

变化数据捕获

消息中继需要监控发件箱表,当有记录插入时,就需要发布消息到消息代理,这种监控数据库变化的技术称为变化数据捕获(Change Data Capture,CDC)。有很多不同的方法可以捕获到数据库表中的改动,常见的做法如下所示。

  • 更新时间戳。表中包含一个字段来记录每一行的更新时间戳。在检查数据变化时,更新时间戳大于上一次捕获的时间戳的行,都是这一次需要处理的内容。
  • 版本号。表中包含一个字段来记录数据的版本号。当一行的数据发生变化时,这一行的版本号被更新为当前的版本号,每次捕获变化时,选择版本号与当前版本号相同的行。当捕获完成之后,当前版本号被更新为新的值,为下一次捕获做准备。
  • 状态指示符。表中包含一个字段来标记每一行是否发生了变化。
  • 触发器。当表中的数据产生变化时,数据库的触发器负责往另外一个历史记录表中插入数据来记录对应的事件。在捕获变化时,只需要查询这个历史记录表即可。
  • 扫描事务日志。大部分数据库管理系统使用事务日志来记录对数据库的改动。通过扫描和解析事务日志的内容,可以捕获数据的变化。

上述方法可以根据是否使用事务日志划分成两类。事务日志的好处是对数据库没有影响,也不要求对应用的表结构和代码进行修改,另外还有更好的性能。事务日志的不足之处在于,事务日志的格式并没有统一的标准,不同的数据库系统有自己的私有实现,而且会随着版本更新而变化。这就要求解析事务日志的代码需不断更新。幸运的是,有不少开源库可供使用。

事务日志

下面对 MySQL 和 PostgreSQL 中的事务日志进行具体的介绍。
MySQL

MySQL 使用二进制日志(Binary Log,binlog)来记录数据库变化,二进制日志中包含的事件描述数据库中的变化,包括表创建和表中数据的变化。二进制日志有两个重要的作用。

第一个作用是复制(Replication)。主服务器(Master)上的二进制日志包含了数据变化的记录。在进行复制时,二进制日志被发送到从服务器(Slave),从服务器通过执行日志中包含的事件来完成复制。

第二个作用是进行数据恢复。当从备份中恢复数据之后,在二进制日志文件中,该备份创建之后产生的事件会被重新执行,从而把数据恢复到最近的状态。

MySQL 支持 3 种不同的二进制日志格式,通过参数 binlog-format 来指定,如下表所示。

语句和行格式各有优势。有些 SQL 语句可能不会产生对行的改动,如找不到匹配行的 UPDATE 或 DELETE 语句。这样的 SQL 语句可以记录在语句格式日志中,但是不会出现在行格式的日志中。语句格式的问题在于,某些情况下语句的执行并没有确定的结果。

MySQL 中的变化数据捕获技术一般通过二进制日志文件的复制来完成。具体的做法是把要监控的 MySQL 数据库作为复制的主服务器,而捕获变化的客户端作为复制的从服务器,这样就可以自动获取到二进制日志文件,并解析其中的事件。

下面给出了 MySQL 8 服务器的配置文件,用来配置二进制日志文件。对于日志文件的保留时间,旧版本的 MySQL 使用 expire_logs_days 配置项,该文件应该被添加到 MySQL 服务器的 /etc/mysql/conf.d 目录中。推荐的做法是创建自定义的 Docker 镜像,也可以使用我创建的 quay.io/alexcheng1982/mysql-cdc 镜像。

sql
[mysqld]
default-authentication-plugin = mysql_native_password
server-id                     = 1
log_bin                       = mysql-bin # 日志文件名称的前缀
binlog_expire_logs_seconds    = 86400     # 日志文件保留时间
binlog_format                 = row       # 使用行格式

在 Java 中,读取 MySQL 二进制日志文件最常用的是 mysql-binlog-connector-java 库,该库实现了 MySQL 的二进制日志文件的复制协议,可以直接连接 MySQL 服务器并解析事件。需要注意的是,连接 MySQL 服务器的用户需要具有 REPLICATION SLAVE 和 REPLICATION CLIENT 权限。从测试的角度,可以直接使用 root 用户;在实际的生产环境,应该使用专有的 MySQL 用户并配置好权限。

下面的代码是 mysql-binlog-connector-java 库的使用示例。表示事件的 Event 对象有 EventHeader 和 EventData 两个属性:EventHeader 中包含的是事件的元数据;EventData 接口的不同实现类表示不同类型的事件数据。EventType 表示事件的类型,最常见的 3 种事件类型是 EXT_WRITE_ROWS、EXT_UPDATE_ROWS 和 EXT_DELETE_ROWS,分别对应于插入行、更新行和删除行。

java
BinaryLogClient client = new BinaryLogClient("localhost", 3306, "root", "myrootpassword");
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
    EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
    EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
client.setEventDeserializer(eventDeserializer);
client.registerEventListener(event -> {
  EventType type = event.getHeader().getEventType();
  switch (type) {
    case EXT_WRITE_ROWS:
      WriteRowsEventData writeData = event.getData();
      String writeResult = "Insert: " + writeData.getRows().stream()
          .map(Arrays::toString).collect(Collectors.joining(",\n"));
      System.out.println(writeResult);
      break;
    case EXT_UPDATE_ROWS:
      UpdateRowsEventData updateData = event.getData();
      String updateResult = "Update: " + updateData.getRows().stream()
          .map(entry -> String.format("before: %s, after: %s",
              Arrays.toString(entry.getKey()),
              Arrays.toString(entry.getValue())))
          .collect(Collectors.joining(",\n"));
      System.out.println(updateResult);
      break;
    case EXT_DELETE_ROWS:
      DeleteRowsEventData deleteData = event.getData();
      String deleteResult = "Delete: " + deleteData.getRows().stream()
          .map(Arrays::toString).collect(Collectors.joining(",\n"));
      System.out.println(deleteResult);
      break;
  }
});
client.connect();

下面的代码用来启动运行时需要的 MySQL 8 容器:

java
docker run --rm -p 3306:3306 -e MYSQL_ROOT_PASSWORD=myrootpassword quay.io/alexcheng1982/mysql-cdc

PostgreSQL

PostgreSQL 使用写提前日志(Write-Ahead Logging,WAL)来保证数据的完整性。WAL 的核心思想是对数据文件的写入,必须发生在相应的改动被记录在日志之后。当数据库崩溃之后,可以从日志中进行恢复,只需要重新应用日志中记录的改动即可。

PostgreSQL 的逻辑解码(Logical Decoding)对 WAL 中的内容进行解码,转换成特定的格式。在进行逻辑复制时,一个复制位置(Replication Slot)代表一个变化的流,可以在一个客户端中进行重放。输出插件用来把 WAL 中的内容转换成复制位置的消费者所期望的格式,逻辑解码最早在 PostgreSQL 9.4 版本中引入。下表给出了常用的输出插件,其中 pgoutput 是 PostgreSQL 10 及以上版本自带的输出插件,除了它之外的其他插件都需要手动安装并启用。

输出插件格式
pgoutputPostgreSQL 逻辑复制协议
wal2jsonJSON
decoderbufsProtocol Buffers

如果使用 pgoutput 之外的输出插件,在安装对应的插件之后,需要在 postgresql.conf 文件中配置 PostgreSQL 加载对应的插件,如下面的代码所示。

java
shared_preload_libraries = 'decoderbufs,wal2json'

接着需要配置 PostgreSQL 复制时的逻辑解码。下面的代码给出了 postgresql.conf 文件中的相关配置。

sql
max_wal_senders       = 4        # walsender进程的最大数量
wal_level             = logical  # WAL文件的级别
max_replication_slots = 4        # 复制位置的最大数量

推荐使用已有的 debezium/postgres 镜像,该镜像已经安装了 wal2json 和 decoderbufs 两个输出插件。

PostgreSQL 中的变化数据捕获技术一般使用 PostgreSQL 的流复制协议来读取输出插件产生的内容。在 Java 中,PostgreSQL 的 JDBC 驱动可以直接读取相应的事件。下面的代码展示了如何使用 PostgreSQL 的 JDBC 驱动来读取数据变化事件,在连接到 PostgreSQL 数据库之后,首先创建一个名为 demo_logical_slot 的复制位置,并指定输出插件为 wal2json,接着创建一个读取该复制位置的流,最后读取流中的内容并输出。

java
String url = "jdbc:postgresql://localhost:5432/postgres";
Properties props = new Properties();
PGProperty.USER.set(props, "postgres");
PGProperty.PASSWORD.set(props, "postgres");
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");

Connection con = DriverManager.getConnection(url, props);
PGConnection replConnection = con.unwrap(PGConnection.class);

String slogName = "demo_logical_slot";
replConnection.getReplicationAPI()
    .createReplicationSlot()
    .logical()
    .withSlotName(slogName)
    .withOutputPlugin("wal2json")
    .make();

PGReplicationStream stream = replConnection.getReplicationAPI()
    .replicationStream()
    .logical()
    .withSlotName(slogName)
    .withStatusInterval(20, TimeUnit.SECONDS)
    .start();

while (true) {
  ByteBuffer msg = stream.readPending();

  if (msg == null) {
    TimeUnit.MILLISECONDS.sleep(10L);
    continue;
  }

  int offset = msg.arrayOffset();
  byte[] source = msg.array();
  int length = source.length - offset;
  System.out.println(new String(source, offset, length));
}

下面的 JSON 数据对应的是往 demo 表中插入一行之后的输出,该 JSON 数据由 wal2json 插件生成。

java
{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "demo",
      "columnnames": [
        "id",
        "name"
      ],
      "columntypes": [
        "integer",
        "character varying"
      ],
      "columnvalues": [
        5,
        "a"
      ]
    }
  ]
}

下面的代码用来启动运行时需要的 PostgreSQL 容器:

java
docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=postgres debezium/postgres:12-alpine

数据库轮询

如果不能通过读取事务日志的方式来捕获数据变化,可以采用数据库轮询的形式,数据库轮询的做法是定期查询数据库中的表数据,来找出变化的行。这里需要在表中添加额外的字段,如更新时间戳、版本号或状态指示符等。

比如,可以在发件箱表中添加一个字段 published 来标明每一行对应的事件是否被发布。在每次查询时,总是选择 published 字段的值为 0 的行,并尝试发送事件到消息代理。当发送成功后,把对应行的 published 字段的值更新为 1。

CDC 库

直接读取事务日志的做法在很多时候都过于复杂,有一些开源 CDC 库可供使用,如下表所示。

名称支持数据库开发者
DebeziumMySQL、PostgreSQL、Oracle、SQL Server、DB2、MongoDB、CassandraRedHat
SpinalTapMySQLAirbnb
maxwellMySQLZendesk
mysql_streamerMySQLYelp
DBLogMySQL、PostgreSQLNetflix
Eventuate CDCMySQL、PostgreSQLEventuate

下面对 Debezium 和 Eventuate CDC 进行具体介绍。

Debezium

Debezium 是流行的开源 CDC 库,构建在 Apache Kafka 之上,提供了 Kafka Connect 兼容的连接器,可以把数据库中的变化事件发布成 Kafka 中的消息。Debezium 提供了对应不同类型的数据库连接器,只需要把连接器部署到 Kafka Connect 即可,下图是 Debezium 的架构图。

如果你的应用也使用 Kafka,那么 Debezium 是一个不错的选择。通过 Kafka Streams API 可以把 Debezium 发布的消息进行转换,并发布到其他主题中,还可以使用连接器输出到其他第三方消费者。Debezium 也支持嵌入在 Java 应用中运行。

Eventuate CDC

Eventuate CDC 是 Eventuate 平台的一部分,也是 Eventuate 提供的事务性消息框架的基础。Eventuate CDC 对 MySQL 和 PostgreSQL 使用事务日志,对其他数据库使用轮询。在发送消息时,Eventuate CDC 支持 Apache Kafka、Apache ActiveMQ、RabbitMQ 和 Redis 作为消息代理。

下图是 Eventuate CDC 的架构示意图。

Eventuate CDC 中重要组件的介绍如下表所示。

示例应用使用的是 Eventuate CDC。

实现事务性发件箱模式

在了解了变化数据捕获技术相关的内容之后,我们可以实现自己的事务性发件箱模式,不过更好的做法是使用已有的开源库。

在连接器读取到发件箱表中的数据变化之后,Debezium 可以对发布到 Kafka 中的消息进行转换,再发布到应用相关的主题中。不过 Debezium 的这个功能目前还处于开发阶段,不太适用于生产环境。

本专栏的示例应用使用的 Eventuate 提供的事务消息库。Eventuate 的事务性消息的具体内容将在下一课时中介绍。

需要注意的是,事务性发件箱模式会导致一个事件被发布至少一次。如果消息中继进程在发送事件之后崩溃,而没有机会记录下 CDC 相关的状态,当消息中继进程恢复之后,会重新处理发件箱表中的一些记录,这会导致对应的事件被重新发布。这并不是一个问题,因为 Kafka 也是提供至少有一次的消息传递的保证性,所以事件的重复是无法避免的。

总结

事务性消息对数据一致性有着至关重要的作用,它保证了对关系型数据库的修改和对应的事件的发布这两个动作的原子性。本课时介绍了事务性发件箱模式,以及实现该模式需要的变化数据捕获技术;同时还介绍了事务日志和数据库轮询这两种 CDC 实现技术,以及常用的 CDC 库 Debezium 和 Eventuate CDC。