Skip to content

加餐3:SkyWalkingOAP存储体系剖析

前面的课时提到,SkyWalking OAP 底层支持 ElasticSearch、H2、MySQL 等多种持久化存储,同时也支持读取其他分布式链路追踪系统的数据,例如,jaeger(Uber 开源的分布式跟踪系统)、zipkin(Twitter 开源的分布式跟踪系统)。下图展示了 OAP 提供的针对不同持久化存储的插件模块:

在 server-core 模块中对整个 OAP 的持久化有一个整体的抽象,具体位置如下图所示,而上述这些插件都是在此基础上扩展实现的:

首先,OAP 存储了两种类型的数据:时间相关的数据和非时间相关的数据(与"时序"这个专有名词区分一下)。注册到 OAP 集群的 Service、ServiceInstance 以及同步的 EndpointName、NetworkAddress 都是非时间相关的数据,一个稳定的服务产生的这些数据是有限的,我们可以用几个固定的 ES 索引(或数据库表)来存储这些数据。

而像 JVM 监控等都属于时间相关的数据,它们的数据量会随时间流逝而线性增加。如果将这些时间相关的数据存储到几个固定的 ES 索引中,就会导致这些 ES 索引(或数据库表)非常大,这种显然是不能落地的。既然一个 ES 索引(或数据库表)存不下,一般会考虑切分,常见的切分方式按照时间窗口以及 DownSampling 进行切分。

这里简单介绍一下 DownSampling 这个概念。DownSampling(翻译为"向下采样"或是"降采样")是降低数据采样率或分辨率的过程。这里通过一个示例进行说明,假设 Agent 每隔一分钟进行一次 JVM Old GC 时间的采样,并发送到 Skywalking OAP 集群,一小时之后,我们可以到 60 个点(Timestamp 和 Value 构成一个点),在一张二维图(横轴是时间戳,纵轴是 Old GC 耗时)上将这些点连接起来,可以绘制出该 JVM 实例在这一小时内的 Old GC 监控曲线。

假设我们查询该 JVM 实例跨度为一周的 Old GC 监控,将获得 10080 个点,监控图中点与点之间的距离会变得非常密。此时,就可以通过 DownSampling 的方式,将一个时间范围内的多个监控数据点聚合成单个点,从而减少需要绘制的点的个数。这里我们可以按照小时为时间窗口,计算一小时内 60 点的平均值,作为该小时的 DownSampling 聚合结果。下图为 13:00 ~ 14:00 以及 14:00 ~ 15:00 两小时的 DownSampling 示意图,经过 DownSampling 之后,一周的监控数据只有 168 个点了。

另外,Trace 也是时间相关的数据,其数据量会随时间不断增加,但不具备可聚合的特性。

Model 与 ES 索引

明确了 OAP 要存储的数据特性,我们回到 server-core 模块继续分析。常见的 ORM 框架会将数据中的表结构映射成 Java 类,表中的一个行数据会映射成一个 Java Bean 对象,在 OAP 的存储抽象中也有类似的操作。SkyWalking 会将 [Metric + DownSampling] 对应的一系列 ES 索引与一个 Model 对象进行映射,下图展示了 instance_jvm_old_gc_time 这个 Metric(指标)涉及的全部 ES 索引以及对应的 Model 对象:

Model 对象记录了对应 ES 索引的核心信息:

  • name(String类型):Metric 名称,在 OAP 创建 ES 索引时会使用
  • columns(List 类型):ES 索引 中的 Field 集合,一个 ModelColumn 对象记录了一个 Field 的名称、类型等信息。
  • capableOfTimeSeries(boolean类型):对应 ES 索引中存储的数据是否为时间相关的数据。
  • downsampling(Downsampling类型):如果是时间相关的数据,则需要指定其 Downsampling 单位,可选值有 Second、 Minute、 Hour、 Day、 Month。对于非时间相关的数据,则该字段值为 Downsampling.NONE。
  • deleteHistory(boolean类型):是否删除历史数据。
  • scopeId(int类型):对应指标的全局唯一 ID。

很明显,ES 索引的名称由三部分构成:Metric 名称、DownSampling、时间窗口(后面两部分只有时序数据才会有),而 ES 索引的别名由 Metric 名称和 DownSampling 构成。

下图展示了 Model.columns 集合与 ES 索引中 Field 之间的映射关系,除了名称之间的映射关系之外,ModelColumn 中记录了每个 Field 的类型。

在 CoreModuleProvider 启动过程中,会扫描 @Stream 注解,创建相应的 Model 对象,并由 StorageModels 统一管理(底层维护了 List 集合)。 @Stream 扫描过程后面介绍,这里只需知道 @Stream 中会包含 Model 需要的信息即可。StorageModels 同时实现了 IModelSetter、IModelGetter、IModelOverride 三个 Service 接口,如下图所示。

IModelGetter、IModelSetter 定义了读取和新增 Model 实例的方法,IModelOverride 接口提供的 overrideColumnName() 方法一般用在以 DB 为存储的插件模块中,当列名与数据库关键字冲突时,会通过该方法修改列名。

在 CoreModuleProvider 的 prepare() 方法中会将 StorageModels 作为上述三个 Service 接口的实现注册到 services 集合中。

初始化存储结构

很多 ORM 框架(例如,Hibernate )提供了在应用启动时根据 Java Bean 初始化表结构的功能。SkyWalking OAP 也提供了类似的功能,主要在 ModelInstaller 中完成,核心在 install() 方法中:

java
public final void install(Client client) throws StorageException {
    // 获取全部Model对象
    IModelGetter modelGetter = moduleManager.find(CoreModule.NAME)
      .provider().getService(IModelGetter.class);
    List<Model> models = modelGetter.getModels();
    // 根据mode这个环境变量决定是否创建底层存储结构
    if (RunningMode.isNoInitMode()) {  
        ... // mode环境变量为no-init,则只会输出日志,不会初始化ES索引
    } else { // mode环境变量为非non-init值,走这个分支
        for (Model model : models) {
            // 检查Model对应的底层存储结构是否存在,如果不存在则通过
            // createTable()方法进行创建
            if (!isExists(client, model)) { 
                createTable(client, model); 
            }
        }
    }
}

ModelInstaller 是一个抽象类,其中的 createTable() 是个抽象方法,针对具体存储的具体创建流程由对应的子类完成,继承关系如下图所示:

StorageEsInstaller 是针对 ElasticSearch 的实现,位于 storage-elasticsearch-plugin 模块中,该模块的 ModuleProvider 实现是 StorageModuleElasticsearchProvider(依赖于 CoreModule),在其 start() 方法中会调用 StorageEsInstaller.intall() 方法完成 ES 索引的初始化。

java
public void start() throws ModuleStartException {
    elasticSearchClient.connect();
    StorageEsInstaller installer = new StorageEsInstaller(...);
    installer.install(elasticSearchClient);
}

这里使用的 ElasticSearchClient 是 SkyWalking 在 RestHighLevelClient 之上的封装,帮上层使用方屏蔽了一些通用的 Request 请求构造以及 Response 处理逻辑。

下面深入到 StorageEsInstaller 中的 createTable() 方法, 在存储 Metric 、Trace 等数据时会随时间推移新创建多个索引,一般会先创建统一的模板,指定公共参数、字段类型等,之后再创建索引;存储服务注册等信息的索引都只有一个,直接创建索引即可。

java
protected void createTable(Client client, Model model) {
    ElasticSearchClient esClient = (ElasticSearchClient)client;
    // 创建settings,其中指定了索引的分片数量、副本数量以及refresh时间间隔
    JsonObject settings = createSetting();
    // 创建mapping,其中指定了各个Field的类型等配置
    JsonObject mapping = createMapping(model);
    if (model.isCapableOfTimeSeries()) {
        // 对于时间相关的索引,先创建Template,其中除了指定settings和mapping
        // 之外,还会指定该Template匹配的索引名称(index_patterns)以及
        // 别名(aliases)
        if (!esClient.isExistsTemplate(model.getName())) {
            boolean isAcknowledged = esClient.createTemplate(
                model.getName(), settings, mapping);
        }
        if (!esClient.isExistsIndex(model.getName())) {
            // ElasticSearch中真正的索引名称会添加时间戳后缀
            String timeSeriesIndexName = TimeSeriesUtils
                      .timeSeries(model);
            boolean isAcknowledged = esClient.createIndex(
                      timeSeriesIndexName);
        }
    } else {
        // 与时间无关的索引只会创建一个索引,直接使用Model.name作为索引名
        // 称创建(没有时间戳后缀),另外,同样会设置settings和mapping
        boolean isAcknowledged = esClient.createIndex(model.getName(), 
            settings, mapping);
    }
}

这里有几个点需要关注一下,首先是模板的相关内容,以 instance_jvm_old_gc_time 这个指标为例,每个 DownSampling 维度对应一个模板,每个模板中的 index_patterns 、aliases 配置以及匹配的索引名称,如下表所示:

可以看到模板中指定的别名与 Model.name 是相同的,在后续查询操作中,我们可以直接通过别名查询该模板匹配的全部索引,但是写入时还是要明确指定具体索引名称的。

另一个点是索引名称的时间后缀。在 TimeBucket 工具类中会根据指定的 DownSampling 将毫秒级时间进行整理,得到相应的时间窗口,如下图所示:

拿到时间戳所在的窗口之后,TimeSeriesUtils 工具类会将其转换成对应索引的时间戳后缀:

java
public static String timeSeries(String modelName, 
        long timeBucket, Downsampling downsampling) {
    switch (downsampling) {
        case None:
            return modelName;
        case Hour: // 一天内的Hour级别数据存储到同一个索引之中
            return modelName + Const.LINE + timeBucket / 100;
        case Minute: // 一天内的分钟级数据存储到一个索引之中
            return modelName + Const.LINE + timeBucket / 10000;
        case Second: // 一天内的秒级数据存储到一个索引之中
            return modelName + Const.LINE + timeBucket / 1000000;
        default: // 对于Day、Month以及Year不做处理,直接用时间窗口作为后缀
            return modelName + Const.LINE + timeBucket;
    }
}

到此为止,StorageEsInstaller 初始化 ES 索引的核心流程就介绍完了。

数据抽象

了解了索引(或是数据库表结构)的初始化流程之后,再来看 SkyWalking OAP是如何对持久化数据进行抽象的。

SkyWalking 与 ORM 类似,会将索引中的一个 Document (或是数据库表中的一条数据)映射为一个 StorageData 对象。StorageData 接口中定义了一个 id() 方法,负责返回该条数据的唯一标识。实现 StorageData 接口的有三类数据,分别对应三个抽象类,如下图所示:

  • Metrics:所有监控指标的顶级抽象,其中定义了一个 timeBucket 字段(long类型),它是所有监控指标的公共字段,用于表示该监控点所处的时间窗口。另外,timeBucket 字段被 @Column 注解标记,在 OAP 启动时会被扫描到转换成 Model 中的 ModelColumn,在初始化 ES 索引时就会创建相应的 Field。Metrics 抽象类中还定义了计算监控数据的一些公共方法:

    • calculate() 方法:大部分 Metrics 都会有一个 value 字段来记录该监控点的值,例如,CountMetrics 中的 value 字段记录了时间窗口内事件的次数,MaxLongMetrics 中的 value 字段记录时间窗口内的最大值。calculate() 方法就是用来计算该 value 值。

    • combine() 方法:合并两个监控点。对于不同含义的监控数据,合并方式也有所不同,例如,CountMetrics 中 combine() 方法的实现是将两个监控点的值进行求和;MaxLongMetrics 中 combine() 方法的实现是取两个监控点的最大值。

  • Record:抽象了所有记录类型的数据,其子类如下图所示。其中 SegmentRecord 对应的是 TraceSegment 数据、AlarmRecord 对应一条告警、TopNDatabaseStatement 对应一条慢查询记录,这些数据都是一条条的记录。

上述记录类型的数据也有一个公共字段 ------ timeBucket(long 类型),表示该条记录数据所在的时间窗口,也同样被 @Column 注解标记。Record 抽象类中没有定义其他的公共方法。

  • RegisterSource :抽象了服务注册、服务实例注册、EndpointName(以及 NetworkAddress)同步三个过程中产生的数据。其中,定义了三个公共字段,且这三个字段都被 @Column 注解标注了:
    • sequence(int 类型):上述三个过程中的数据都会产生一个全局唯一的 ID,该全局 ID 就保存在该字段中。
    • registerTime(long 类型):第一注册(或同步)时的时间戳。
    • heartbeatTime(long 类型):心跳时间戳。

在这三个抽象类下还有很多具体的实现类,这些实现类会根据对应的具体数据,扩展新的字段和方法,在后面介绍具体模块时会展开细说。

最后,我们来看 StorageBuilder 这个接口,它与 StorageData 接口的关系非常紧密,在 StorageData 的全部实现类中,都有一个内部 Builder 类实现类了 StorageBuilder 接口。StorageBuilder 接口中定义了 map2Data() 和 data2Map() 两个方法,实现了 StorageData 对象与 Map 之间的相互转换。

DAO 层架构

在创建 ES 索引时使用到的 ElasticSearchClient, 是对 RestHighLevelClient 进行了一层封装,位于 library-client 模块,其中还提供了 JDBC 以及 gRPC 的 Client,如下图所示:

GRPCClient 底层会与指定地址创建 gRPC ManagedChannel 连接,在后面的课程中会看到, OAP 节点之间的通信就是依赖 GRPCClient 实现的。

JDBCHikariCPClient 底层封装了 HikariCP 连接池,对外提供了 execute()、executeQuery() 等执行 SQL 语句的方法。

library-client 模块相对独立,如果同学们在实际开发中需要封装 ElasticSearch 客户端或是 JDBC 客户端,都可以直接拿来使用。

虽然 ElasticSearchClient 对 RestHighLevelClient 的通用操作进行了封装,但如果上层逻辑直接使用,还是必须了解 RestHighLevelClient 的相关概念,所以在实践中会针对业务封装出一套 DAO 层。在 DAO 层会完成业务概念与存储概念的转换,如下图所示:

SkyWalking OAP 也提供了DAO 层抽象,如下图所示,大致可以分为三类:Receiver DAO、Cache DAO、Query DAO。

  • Receiver DAO 接口:在各个 receiver 模块接收到 Agent 上报的 Metrics 数据、Trace 数据以及注册请求的时候,会通过 Receiver DAO 将数据持久化到底层存储,具体的接口如下图所示:
  • IRegisterDAO 接口:负责增删改查 RegisterSource 数据。
  • IMetricsDAO 接口:负责增删改查 Metrics 数据。
  • IRecordDAO 接口:负责增删改查 Record 数据。
  • IBatchDAO 接口:上述接口都是单条数据的操作接口,IBatchDAO 接口提供了批量写入的功能。
  • IRegisterLockDAO 接口:在处理 RegisterSource 数据的时候,需要维护一把全局的锁,这样才能保证写入生成的 ID 全局唯一。IRegisterLockDAO 接口的实现就提供了全局锁的功能。
  • Cache DAO 接口:在稳定集群中,RegisterSource 类型数据基本不会变化,这里的 Cache DAO 接口主要提供了 RegisterSource 数据的查询,相较于 IRegisterDAO,Cache DAO 的接口提供了更多维度的查询方式。

如下图所示,每一种 RegisterSource 数据都对应一个 Cache,通过 Cache 的方式提高查询性能,每个 Cache 底层都关联了一个 Cache DAO 接口实现。

另外,在 CoreModule 中还会启动 CacheUpdateTimer,其中会启动一个后台线程,定期更新 ServiceInventoryCache 缓存。

  • Query DAO 接口:Query DAO 接口负责支撑 query 模块处理 SkyWalking Rocketbot 发来的查询请求,接口参数更加贴近用户请求参数。下图展示了所有 Query DAO 接口,从接口名称即可看出查询的是哪些数据。

SkyWalking OAP 中 DAO 层的整体架构以及核心接口就介绍完了。

数据 TTL

前文提到,Metrics、Trace 等(时间相关的数据)对应的 ES 索引都是按照时间进行切分的,随着时间的推移,ES 索引会越来越多。为了解决这个问题,SkyWalking 只会在 ES 中存储一段时间内的数据,CoreModuleProvider 会启动 DataTTLKeeperTimer 定时清理过期数据。

在 DataTTLKeeperTimer 中会启动一个后台线程,每 5 分钟执行一次清理操作,具体执行步骤如下:

  1. 通过 ClusterNodesQuery 查询到当前 OAP 集群中全部节点列表,如果当前节点为列表中第一个节点,才能继续执行后续的清理操作。这就保证不会出现多个 OAP 节点并发执行清理任务。
  2. 从 IModelGetter 中拿到全部 Model 对象,根据 Model 数据的 DownSampling 计算每个 Model 保留 ES 索引的范围。这里使用到了 StorageTTL 接口,其中会根据每个 Model 不同的 DownSampling 返回相应的 TTLCalculator,如下图所示:

对于一个 Model,通过 TTLCalculator 计算之后会得到一个时间,在该时间之前的 ES 索引将被删除。

在删除 ES 索引的过程成还有一些细节:

  1. 首先根据模板别名(也就是 Model.name)拿到该 Model 对应的全部 ES 索引。
  2. 根据 ES 索引的时间后缀,以及 TTLCalculator 计算得到的时间,确定哪些索引会要被删除。
  3. 如果该 Model 对应的全部索引都要被删除,则会创建一个新索引,为后面写入数据做准备。
  4. 循环调用 ElasticSearchClient.delete() 方法删除索引。

总结

本课时重点介绍了 SkyWalking OAP 持久化存储方面的架构设计以及核心接口功能。首先介绍了 Model 与底层 ES 索引之间的映射关系,之后介绍了 ModelInstaller 是如何在 OAP 启动时初始化 ES 索引的。接下来,介绍了 OAP 对不同持久化数据的抽象,以及对应的 DAO 层设计。最后介绍了定期删除过期数据的核心原理。

特别注意,本课时重在顶层设计,不在具体实现,在后面深入分析各个模块的具体实现时,会深入上述接口的实现细节。