Skip to content

第14讲:收集、发送Trace核心原理,Agent与OAP的大动脉

在前面的课时中,我们深入介绍了 SkyWalking 对 Trace 基本概念的实现。本课时我们将继续深入学习 Trace 相关的 BootService 接口实现类以及 Trace 收集和发送的核心逻辑。Trace 相关的 BootService 接口实现类如下图所示:

ContextManager

ContextManager 的主要职责就是管理前文介绍的 TracingContext,它会通过 ThreadLocal 将 TracingContext 对象与当前线程进行绑定,这样就实现了 TraceSegment、TracingContext 和 线程三方之间的关联。

ContextManager 有三个核心字段:

  • CONTEXT(ThreadLocal 类型)
    :通过该字段可以将一个 TracingContext 对象与一个线程进行关联。
  • RUNTIME_CONTEXT(ThreadLocal 类型)
    :RuntimeContext 底层封装了一个 ConcurrentHashMap 集合,可以为当前 TracingContext 记录一些附加信息。
  • EXTEND_SERVICE(ContextManagerExtendService 类型):ContextManagerExtendService 也实现了 BootService 接口,它主要负责创建 TracingContext 对象。

虽然 ContextManager 实现了 BootService 接口,但是其 prepare()、boot()、onComplete() 方法都为空实现。ContextManager 提供了与 TracingContext 对应的几乎所有方法,基本实现都是委托给当前线程绑定的 TracingContext 对象,这里以 createEntrySpan() 方法为例进行介绍:

java
public static AbstractSpan createEntrySpan(String operationName,
         ContextCarrier carrier) {
    SamplingService samplingService = ServiceManager.INSTANCE
            .findService(SamplingService.class);     // 采样相关
    AbstractSpan span;
    AbstractTracerContext context;
    // 检测ContextCarrier是否合法,其实就是检查它的核心字段是否已填充好
    if (carrier != null && carrier.isValid()) { 
        samplingService.forceSampled();
        // 获取当前线程绑定的TracingContext
        context = getOrCreate(operationName, true); 
        // 委托给当前线程绑定的TracingContext来创建EntrySpan
        span = context.createEntrySpan(operationName); 
        // 从ContextCarrier提取上游服务传播过来的Trace信息
        context.extract(carrier); 
    } else { // 没有上游服务的场景
        context = getOrCreate(operationName, false);
        span = context.createEntrySpan(operationName);
    }
    return span;
}

getOrCreate() 方法会从 CONTEXT 字段中获取当前线程绑定的 TracingContext 对象,如果当前线程没有关联 TracingContext 上下文,则会通过 ContextManagerExtendService 新建并绑定。

stopSpan() 方法在关闭 Span 的同时,还会检查当前 TraceSegment 是否结束,TraceSegment 结束时会将存储在 CONTEXT 中的 TracingContext 对象以及 RUNTIME_CONTEXT 中的附加信息一并清除,这也是为了防止内存泄露的一步重要操作。

Context 生成与采样

如果不做任何限制,每个请求都应该生成一条完整的 Trace。在面对海量请求时如果也同时产生海量 Trace,就会给网络和存储带来双倍的压力,浪费很多资源。为了解决这个问题,几乎所有的 Trace 系统都会支持采样的功能。SamplingService 就是用来实现采样功能的 BootService 实现。

SamplingService 的采样逻辑依赖 samplingFactorHolder 字段(AtomicInteger 类型)的自增。ContextManagerExtendService 是负责创建 TracingContext 的 BootService 实现,在 ContextManagerExtendService 创建 TracingContext 时,会调用 SamplingService 的 trySampling() 方法递增 samplingFactorHolder 字段(CAS 操作),当增加到阈值(默认值为 3,可以通过 agent.sample_n_per_3_secs 配置进行修改)时会返回 false,表示采样失败,这时 ContextManagerExtendService 就会生成 IgnoredTracerContext,IgnoredTracerContext 是个空 Context 实现,不会记录 Trace 信息。

另外,SamplingService 中会启动一个定时任务,每秒都会将 samplingFactorHolder 字段清零,这样就实现了每秒采样指定条数的 Trace 数据,如下图所示:

Trace 的收集

这里我们先来回顾一个知识点,当 TracingContext 通过 stopSpan() 方法关闭最后一个 Span 时,会调用 finish() 方法关闭相应的 TraceSegment,与此同时,还会调用 TracingContext.ListenerManager.notifyFinish() 方法通知所有监听 TracingContext 关闭事件的监听器 ------ TracingContextListener。TracingContext.finish() 方法的相关实现如下:

java
private void finish() {
    TraceSegment finishedSegment = 
          segment.finish(isLimitMechanismWorking());
    TracingContext.ListenerManager.notifyFinish(finishedSegment);
}

TraceSegmentServiceClient 是 TracingContextListener 接口的唯一实现,其主要功能就是在 TraceSegment 结束时对其进行收集,并发送到后端的 OAP 集群。

下图展示了 TraceSegmentServiceClient 的核心结构:

TraceSegmentServiceClient 底层维护了一个 DataCarrier 对象,其底层 Channels 默认有 5 个 Buffer,每个 Buffer 长度为 300,使用的是 IF_POSSIBLE 阻塞写入策略,底层会启动一个 ConsumerThread 线程。

TraceSegmentServiceClient 作为一个 TracingContextListener 接口的实现,会在 notifyFinish() 方法中,将刚刚结束的 TraceSegment 写入到 DataCarrier 中缓存。同时,TraceSegmentServiceClient 实现了前面介绍的 IConsumer 接口,封装了消费 Channels 中数据的逻辑,在 consume() 方法中会首先将消费到的 TraceSegment 对象序列化,然后通过 gRPC 请求发送到后端 OAP 集群。该过程涉及的 gRPC 接口定义如下:

java
service TraceSegmentReportService {
    rpc collect (stream UpstreamSegment) returns (Commands) {
    }
}

该 gRPC 请求中用到的 UpstreamSegment 结构体包含了 Trace ID 以及 TraceSegment 序列化之后的字节数组,定义如下所示:

java
message UpstreamSegment {
    repeated UniqueId globalTraceIds = 1;
    bytes segment = 2; // TraceSegment信息
}

这个过程中,TraceSegment 对象会转换成相应的 proto 结构体实例,下图展示了 UpstreamSegment 中包含的具体信息:

既然要发送 gRPC 请求,就必然要依赖网络连接,TraceSegmentServiceClient 实现了 GRPCChannelListener 接口,可以监听底层网络连接的变化情况。在 prepare() 方法中可将其作为 Listener 注册到前文介绍的 GRPCChannelManager 中。

明确了发送 Trace 时的具体数据,以及其涉及的 gRPC 请求和接口定义,我们再来看 consume() 方法的具体实现:

java
public void consume(List<TraceSegment> data) {
    if (CONNECTED.equals(status)) { // 根据底层网络连接的状态决定是否发送
        // 创建GRPCStreamServiceStatus对象
        final GRPCStreamServiceStatus status = 
             new GRPCStreamServiceStatus(false);
        StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver
               = serviceStub.collect(new StreamObserver<Commands>() {
            public void onNext(Commands commands) {}
            public void onError(Throwable throwable) {
                // 发生异常会调用 finished()方法,停止等待
                status.finished();
                // 通知GRPCChannelManager重新创建网络连接
                ServiceManager.INSTANCE.findService(
                   GRPCChannelManager.class).reportError(throwable);
            }
            public void onCompleted() {
                // 发送成功之后,会调用finished()方法结束等待
                status.finished(); 
            }
        });
        for (TraceSegment segment : data) { 
            // 将TraceSegment转换成UpstreamSegment对象,然后才能进行序列化以
            // 及发送操作transform()方法实现的转换逻辑并不复杂,填充字段而已
            UpstreamSegment upstreamSegment = segment.transform();
            upstreamSegmentStreamObserver.onNext(upstreamSegment);
        }
        upstreamSegmentStreamObserver.onCompleted();
        status.wait4Finish(); // 等待全部TraceSegment数据发送结束
        segmentUplinkedCounter += data.size(); // 统计发送的数据量
    } else { // 网络连接断开时,只进行简单统计,数据将被直接抛弃
        segmentAbandonedCounter += data.size();
    }
    printUplinkStatus(); // 每隔 30s打印一下发送日志
}

注意,TraceSegmentServiceClient 在批量发送完 UpstreamSegment 数据之后,会通过 GRPCStreamServiceStatus 进行自旋等待,直至该批 UpstreamSegment 全部发送完毕。

最后总结一下,TraceSegmentServiceClient 同时实现了 BootService、IConsumer、GRPCChannelListener、TracingContextListener 四个接口,如下图所示,这四个接口的实现相互依赖,共同完成 Trace 数据的收集和发送:

总结

本课时我们重点介绍了 Trace 相关的 BootService 接口实现。首先介绍了 ContextManager 的核心实现,理清了它是如何将 TracingContext 与当前线程关联起来的。接下来介绍了 SamplingService 实现客户端 Trace 采样的逻辑。最后介绍了上报 Trace 的 gRPC 接口,深入分析了 TraceSegmentServiceClient 收集和上报 Trace 数据的核心逻辑。