Skywalking发送trace

2020/03/21 Skywalking

源码分析-发送Trace

考虑到减少外部组件的依赖,Agent 收集到 Trace 数据后,不是写入外部消息队列( 例如,Kafka )或者日志文件,而是 Agent 写入内存消息队列后台线程异步】发送给 Collector 。

我们先来看看 TraceSegmentServiceClient 的属性

  • TIMEOUT 静态属性,发送等待超时时长,单位:毫秒。
  • lastLogTime 属性,最后打印日志时间。该属性主要用于开发调试。
  • segmentUplinkedCounter 属性,TraceSegment 发送数量。
  • segmentAbandonedCounter 属性,TraceSegment 被丢弃数量。在 Agent 未连接上 Collector 时,产生的 TraceSegment 将被丢弃。
  • carrier 属性,内存队列。
  • serviceStub 属性,非阻塞 Stub 。
  • status 属性,连接状态。

TracingContextListener

这里先帮助看官老爷回顾一个点,TracingContext.finish() 方法在关闭 TraceSegment 的时候,会调用下面这行代码:

在 ListenerManager 中记录了所有 TracingContextListener 监听器:

在ListenerManager.notify() 方法中会遍历所有TracingContextListener监听器,通知他们该 TraceSegment 将会被关闭了:

TraceSegmentServiceClient

TraceSegmentServiceClient 实现了下面四个接口:

BootService

IConsumer

TracingContextListener

GRPCChannelListener

先来看看它的核心字段:

再来看TraceSegmentReportService 服务的 proto定义:

prepare() 方法会将当前TraceSegmentServiceClient对象作为 Listener 注册到 GRPCChannelManager 上,监听链接状态。当链接状态发生变化时,会通过 statusChanged() 方法(这是对GRPCChannelListener接口的实现)修改 status 字段值。

boot() 方法中会初始化上述核心字段,其中会创建 DataCarrier 对象,并调用其 consume() 方法启动 ConsumerThread 线程:

onComplete() 方法会将当前TraceSegmentServiceClient对象作为 Listener 注册到TracingContext.LISTENERS 集合中,easy,不展开说了。

再来看TraceSegmentServiceClient对 TracingContextListener 接口的实现,其 afterFinished() 方法会调用 DataCarrier.produce() 方法,将 TraceSegment 写入 DataCarrier 缓冲区暂存。

再来看 TraceSegmentServiceClient 对 IConsumer 接口的实现,在其 consume() 方法中会将 TraceSegment(从DataCarrier中读取到的)通过 gRPC 的方式发送到服务端,具体实现如下:

简单看一下GRPCStreamServiceStatus 的实现,其中封装了一个 volatile boolean 字段,来表示发送状态:

还提供了 wait4Finish() 方法用来等待整个发送过程结束:

最后,GRPCStreamServiceStatus.finish()方法会修改 status 的值为true,这样, wait4Finish() 方法也可以退出。

发送Trace

XXXSegmentServiceClient有一个消费线程,不停的拉取缓冲队列中的数据,

  • 如果是TraceSegmentServiceClient,则通过gRPC发送给后端OAP
  • 如果是KafkaTraceSegmentServiceClient,则丢到Kafka中。

Search

    微信好友

    博士的沙漏

    Table of Contents