Appearance
加餐4:trace-receiver插件番外篇——慢查询的处理
在这一课时,我们重点来介绍 trace-receiver-plugin 插件对慢查询相关信息的处理。
这里先来简单回顾一下,在 mysql-8.x-plugin 插件中会拦截 preparedStatement.execute() 方法创建 Database 类型的 ExitSpan,并在 execute() 方法调用完成之后结束 ExitSpan。
除了前面介绍的对 ExitSpan 的基本处理之外,multiScopesSpanListener.parseExit() 方法还会针对 Database 类型的 ExitSpan 进行特殊处理,该处理主要用于统计慢查询。这里的慢查询统计不仅是 DB 的慢查询,还包括其他常见的存储,例如:Redis、MongoDB 等等。
parseExit() 方法相关的代码片段如下,其核心是将请求存储的时间与 application.yml 配置文件中指定的慢查询阈值(slowDBAccessThreshold 配置项)进行比较,超过阈值的请求会创建相应的 DatabaseSlowStatement 对象并记录到 slowDatabaseAccesses 集合中。
java
DatabaseSlowStatement statement = new DatabaseSlowStatement();
// 记录发生此次慢查询的 traceId
statement.setTraceId(traceId);
// 由 TraceSegment.id 以及 Span.id 构成的唯一标识
statement.setId(segmentCoreInfo.getSegmentId() + "-" + spanDecorator.getSpanId());
// 记录存储对应的 ServiceId
statement.setDatabaseServiceId(sourceBuilder.getDestServiceId());
// 此次慢查询的实际耗时
statement.setLatency(sourceBuilder.getLatency());
// 秒级时间窗口
statement.setTimeBucket(TimeBucket.getSecondTimeBucket(segmentCoreInfo.getStartTime()));
for (KeyStringValuePair tag : spanDecorator.getAllTags()) { // 遍历ExitSpan 携带的 Tag 信息
if (SpanTags.DB_STATEMENT.equals(tag.getKey())) {
// 具体执行的操作,例如,访问 DB 的话,就是 SQL 语句
statement.setStatement(tag.getValue());
} else if (SpanTags.DB_TYPE.equals(tag.getKey())) {
// 在 application.yml 配置文件中配置了不同存储的慢查询阈值上限,
// 这里会根据 dbType(其值可以为 sql、Redis、MongoDB 等)查找其阈值
String dbType = tag.getValue();
DBLatencyThresholdsAndWatcher thresholds = config.getDbLatencyThresholdsAndWatcher();
int threshold = thresholds.getThreshold(dbType);
if (sourceBuilder.getLatency() > threshold) {
isSlowDBAccess = true; // 判断此次请求存储的操作是否为慢查询
}
}
}
if (isSlowDBAccess) { // 将慢查询记录到 slowDatabaseAccesses 集合中
slowDatabaseAccesses.add(statement);
}
在 multiScopesSpanListener.build() 方法中会将 slowDatabaseAccesses 集合中记录的全部DatabaseSlowStatement 对象交给 SourceReceiver 处理,这里 DatabaseSlowStatement 对应的 SourceDispatcher 实现是 DatabaseStatementDispatcher。在 DatabaseStatementDispatcher 中会将 DatabaseSlowStatement 转换成 TopNDatabaseStatement,并交给 TopNStreamProcessor 进行处理。
TopNDatabaseStatement 的继承关系如下所示:
抽象类 Record 前面介绍过其 timeBucket 字段对应 Document 中的 time_bucket 字段。抽象类 TopN 中的四个核心字段如下,正好对应 DatabaseSlowStatement 中记录的慢查询核心信息:
java
@Getter @Setter @Column(columnName = "statement", content = true) private String statement;
@Getter @Setter @Column(columnName = "latency") private long latency;
@Getter @Setter @Column(columnName = "trace_id") private String traceId;
@Getter @Setter @Column(columnName = "service_id") private int serviceId;
有些场景中,超越慢查询阈值的操作可能会比较多,全部记录下来的意义不大,一般针对每个存储服务只会记录耗时最大的前几个慢查询,正如 TopN 这个抽象类的名字所示 。SkyWalking 只会记录耗时最大的 N 个慢查询,topN.compareTo() 方法会比较 latency 字段,从而实现按照耗时的排序。
TopNDatabaseStatement 是 TopN 的唯一实现类,其中需要注意的是 equals() 方法,比较的是 serviceId。TopNDatabaseStatement 对应 Index 名称的前缀是"top_n_database_statement",Document Id 就是前面介绍的 DatabaseSlowStatement 的 Id,即 TraceSegmentId + SpanId 构成。
TopNWorker
TopNStreamProcessor 为每个 TopN 类型(其实只有 TopNDatabaseStatement)提供的 Worker 链中只有一个 Worker ------ TopNWorker。与前文介绍的 MetricsPersistentWorker 以及 RecordPersistentWorker 类似,TopNWorker 也继承了 PersistenceWorker 抽象类,其结构如下图所示,TopNWorker 也是先将 TopNDatabaseStatement 暂存到 DataCarrier,然后由后台 Consumer 线程定期读取并调用 onWork() 方法进行处理。
在 TopNWorker.onWorker() 方法中会将 TopNDatabaseStatement 暂存到 LimitedSizeDataCache 中进行排序。LimitedSizeDataCache 使用双队列模式,继承了 Windows 抽象类,与前文介绍的 MergeDataCache 类似。LimitedSizeDataCache 底层的队列实现是 LimitedSizeDataCollection,其 data 字段(Map 类型)中维护了每个存储服务的慢查询(即 TopNDatabaseStatement)列表,每个列表都是定长的(由 limitedSize 字段指定,默认 50),在调用 limitedSizeDataCollection.put() 方法写入的时候会按照 latency 从大到小排列,并只保留最多 50 个元素,如下图所示:
可见,在 LimitedSizeDataCache 中缓存的慢查询是按照存储服务的维度进行分类、排序以及计算 TopN 的。
回到 TopNWorker,它覆盖了 PersistenceWorker 的 onWork() 方法,如下所示:
java
void onWork(TopN data) {
limitedSizeDataCache.writing();
try {
limitedSizeDataCache.add(data);
} finally {
limitedSizeDataCache.finishWriting();
}
}
PersistenceTimer
在 PersistenceWorker 的三个实现类中,MetricsPersistentWorker 和 RecordPersistentWorker 启动的 Consumer 直接使用了继承自 PersistenceWorker 的 onWork() 方法 ,该实现只会在 DataCache 缓存的数据到达一定阈值时,才会触发 ElasticSearch 的写入。如果缓存量长时间达不到阈值,就会导致监控数据和 Trace 数据写入延迟。另外,前面的介绍 TopNWorker.onWork() 实现只有写入 LimitedSizeDataCache 的逻辑,没有读取的逻辑。
为了解决上述问题,在各个模块初始化完成之后,会在 coreModuleProvider.notifyAfterCompleted() 方法中启动 PersistenceTimer(前面介绍的 GRPCServer 也是在此处启动的)。
PersistenceTimer 中会启动一个后台线程定期(初始延迟为 1s,后续间隔为 3s)将三个 PersistenceWorker 实现中缓存的数据持久化到 ElasticSearch 中,大致实现如下所示(省略 Debug 级别的日志输出以及部分 try/catch 代码):
java
private void extractDataAndSave(IBatchDAO batchDAO) {
// 三个 PersistenceWorker 实现构成的列表
List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.addAll(RecordStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.forEach(worker -> {
// 逐个 PersistenceWorker 实现的 flushAndSwitch()方法,
// 其中主要是对 DataCache 队列的切换
if (worker.flushAndSwitch()) {
// 调用 PersistenceWorker.buildBatchCollection()为 DataCache中每个元素创建相应的 IndexRequest 以及 UpdateRequest 请求
List<?> batchCollection = worker.buildBatchCollection();
batchAllCollection.addAll(batchCollection);
}
});
// 执行三个 PersistenceWorker 生成的全部 ElasticSearch 请求
batchDAO.batchPersistence(batchAllCollection);
}
这里需要特别说明是:MetricsPersistentWorker 和 RecordPersistentWorker 中的 flushAndSwitch() 方法都继承自 PersistenceWorker,其主要功能是切换底层 DataCache 的 current 队列,这与 persistenceWorker.onWorker() 方法中的核心逻辑类似。
而 TopNWorker 覆盖了 flushAndSwitch() 方法,其中添加了执行频率的控制,大致实现如下:
java
public boolean flushAndSwitch() {
long now = System.currentTimeMillis();
if (now - lastReportTimestamp <= reportCycle) {
return false; // 默认 10min 执行一次
}
lastReportTimestamp = now; // 重置 lastReportTimestamp
return super.flushAndSwitch(); // 调用 PersistenceWorker 实现
}
到此为止,trace-receiver-plugin 插件核心的工作原理及实现就介绍完了。