一、背景
Skywalking在进行链路追踪时,会不断生成数据,这些数据先缓存在本地,然后异步批量上报。产生数据的一方为生产者,获取数据并上报为消费者。下面简单介绍一下Skywywalking的数据存储于消费的设计。
二、Skywalking数据存储、消费方式
Skywalking的数据存储及上报的逻辑在类DataCarrier中实现,DataCarrier的定义如下:
public class DataCarrier<T> {
/**
* buffer 大小
*/
private final int bufferSize;
/**
* 通道数量
*/
private final int channelSize;
/**
* 通道
*/
private Channels<T> channels;
/**
* 消费线程
*/
private ConsumerPool<T> consumerPool;
public DataCarrier(int channelSize, int bufferSize) {
this.bufferSize = bufferSize;
this.channelSize = channelSize;
channels = new Channels<T>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);
}
。。。
}
DataCarrier会启动多个线程进行数据的存储和消费,数据的存取都在channels中
public class Channels<T> {
/**
* Buffer 数组
*/
private final Buffer<T>[] bufferChannels;
private IDataPartitioner<T> dataPartitioner;
/**
* 缓冲策略
*/
private BufferStrategy strategy;
/**
* 创建 Channels
*
* @param channelSize 通道数量
* @param bufferSize 缓冲区大小
* @param partitioner
* @param strategy 缓冲策略
*/
public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
this.dataPartitioner = partitioner;
this.strategy = strategy;
// 创建 Buffer 数组
bufferChannels = new Buffer[channelSize];
for (int i = 0; i < channelSize; i++) {
bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
}
}
在Channels的初始化中,根据通道数量channelSize来设置buffer的个数,这样就会生成channelSize个buffer。
1、生产者流程
生产者代码
public boolean save(T data) {
int index = dataPartitioner.partition(bufferChannels.length, data);
// 计算最大重试次数
int retryCountDown = 1;
if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
int maxRetryCount = dataPartitioner.maxRetryCount();
if (maxRetryCount > 1) {
retryCountDown = maxRetryCount;
}
}
// 多次重试,保存数据直到成功
for (; retryCountDown > 0; retryCountDown--) {
if (bufferChannels[index].save(data)) {
return true;
}
}
return false;
}
@Override
public int partition(int total, T data) {
return (int)Thread.currentThread().getId() % total;
}
生产者用save(T data)函数存储数据,先通过函数partition确定bufferChannels的索引index,然后将数据保存在选择的bufferChannel中。
/**
* 保存数据
*
* @param data 数据
* @return 是否保存成功
*/
boolean save(T data) {
int i = index.getAndIncrement();
if (buffer[i] != null) {
switch (strategy) {
case BLOCKING:
// 等待
while (buffer[i] != null) {
try {
Thread.sleep(1L);
} catch (InterruptedException e) {
}
}
break;
case IF_POSSIBLE:
return false;
case OVERRIDE:
default:
}
}
buffer[i] = data;
return true;
}
在保存数据时,先通过累加器index获取最新的写入地址,如果该地址没有数据,直接写入;如有数据,执行不同的策略:
阻塞:等待这个位置的数据被消费,然后再写入
覆盖:将之前未被消费的数据覆盖
跳过:将这个数据丢弃
如果一次写入不成功,还可以重试几次。
2、消费者流程
/**
* 消费者池
*
**/
public class ConsumerPool<T> {
/**
* 是否运行中
*/
private boolean running;
/**
* 线程数组
*/
private ConsumerThread[] consumerThreads;
/**
* 通道
*/
private Channels<T> channels;
/**
* 锁
*/
private ReentrantLock lock;
public ConsumerPool(Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num) {
this(channels, num);
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass));
consumerThreads[i].setDaemon(true);
}
}
消费者会创建一个消费线程池ConsumerPool,在线程池ConsumerPool初始化时,会传入之前创建好的channels并和ConsumerPool关联,然后创建多个consumerThreads。
下面看一下消费线程的执行流程
/**
* 消费线程
*/
public class ConsumerThread<T> extends Thread {
@Override
public void run() {
running = true;
// 不断消费,直到关闭
while (running) {
// 消费
boolean hasData = consume();
if (!hasData) { // 无数据,sleep 等待
try {
Thread.sleep(20);
} catch (InterruptedException e) {
}
}
}
// 消费剩余部分
// consumer thread is going to stop
// consume the last time
consume();
consumer.onExit();
}
}
/**
* 消费
*
* @return 是否消费了数据
*/
private boolean consume() {
boolean hasData = false;
// 获得数据
LinkedList<T> consumeList = new LinkedList<T>();
for (DataSource dataSource : dataSources) {
LinkedList<T> data = dataSource.obtain();
if (data.size() == 0) {
continue;
}
for (T element : data) {
consumeList.add(element);
}
hasData = true;
}
// 若有数据,进行消费
if (consumeList.size() > 0) {
try {
consumer.consume(consumeList);
} catch (Throwable t) {
consumer.onError(consumeList, t);
}
}
return hasData;
}
消费线程会不断从buffer中获取数据,如果没有数据会sleep 20毫秒。得到数据后就会调用函数 consumer.consume(consumeList)上报数据,从而达到消费的效果。