一、背景
Skywalking在進行鍊路追蹤時,會不斷生成資料,這些資料先緩存在本地,然後異步批量上報。産生資料的一方為生産者,擷取資料并上報為消費者。下面簡單介紹一下Skywywalking的資料存儲于消費的設計。
二、Skywalking資料存儲、消費方式
Skywalking的資料存儲及上報的邏輯在類DataCarrier中實作,DataCarrier的定義如下:
public class DataCarrier<T> {
/**
* buffer 大小
*/
private final int bufferSize;
/**
* 通道數量
*/
private final int channelSize;
/**
* 通道
*/
private Channels<T> channels;
/**
* 消費線程
*/
private ConsumerPool<T> consumerPool;
public DataCarrier(int channelSize, int bufferSize) {
this.bufferSize = bufferSize;
this.channelSize = channelSize;
channels = new Channels<T>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);
}
。。。
}
DataCarrier會啟動多個線程進行資料的存儲和消費,資料的存取都在channels中
public class Channels<T> {
/**
* Buffer 數組
*/
private final Buffer<T>[] bufferChannels;
private IDataPartitioner<T> dataPartitioner;
/**
* 緩沖政策
*/
private BufferStrategy strategy;
/**
* 建立 Channels
*
* @param channelSize 通道數量
* @param bufferSize 緩沖區大小
* @param partitioner
* @param strategy 緩沖政策
*/
public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
this.dataPartitioner = partitioner;
this.strategy = strategy;
// 建立 Buffer 數組
bufferChannels = new Buffer[channelSize];
for (int i = 0; i < channelSize; i++) {
bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
}
}
在Channels的初始化中,根據通道數量channelSize來設定buffer的個數,這樣就會生成channelSize個buffer。
1、生産者流程
生産者代碼
public boolean save(T data) {
int index = dataPartitioner.partition(bufferChannels.length, data);
// 計算最大重試次數
int retryCountDown = 1;
if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
int maxRetryCount = dataPartitioner.maxRetryCount();
if (maxRetryCount > 1) {
retryCountDown = maxRetryCount;
}
}
// 多次重試,儲存資料直到成功
for (; retryCountDown > 0; retryCountDown--) {
if (bufferChannels[index].save(data)) {
return true;
}
}
return false;
}
@Override
public int partition(int total, T data) {
return (int)Thread.currentThread().getId() % total;
}
生産者用save(T data)函數存儲資料,先通過函數partition确定bufferChannels的索引index,然後将資料儲存在選擇的bufferChannel中。
/**
* 儲存資料
*
* @param data 資料
* @return 是否儲存成功
*/
boolean save(T data) {
int i = index.getAndIncrement();
if (buffer[i] != null) {
switch (strategy) {
case BLOCKING:
// 等待
while (buffer[i] != null) {
try {
Thread.sleep(1L);
} catch (InterruptedException e) {
}
}
break;
case IF_POSSIBLE:
return false;
case OVERRIDE:
default:
}
}
buffer[i] = data;
return true;
}
在儲存資料時,先通過累加器index擷取最新的寫入位址,如果該位址沒有資料,直接寫入;如有資料,執行不同的政策:
阻塞:等待這個位置的資料被消費,然後再寫入
覆寫:将之前未被消費的資料覆寫
跳過:将這個資料丢棄
如果一次寫入不成功,還可以重試幾次。
2、消費者流程
/**
* 消費者池
*
**/
public class ConsumerPool<T> {
/**
* 是否運作中
*/
private boolean running;
/**
* 線程數組
*/
private ConsumerThread[] consumerThreads;
/**
* 通道
*/
private Channels<T> channels;
/**
* 鎖
*/
private ReentrantLock lock;
public ConsumerPool(Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num) {
this(channels, num);
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass));
consumerThreads[i].setDaemon(true);
}
}
消費者會建立一個消費線程池ConsumerPool,線上程池ConsumerPool初始化時,會傳入之前建立好的channels并和ConsumerPool關聯,然後建立多個consumerThreads。
下面看一下消費線程的執行流程
/**
* 消費線程
*/
public class ConsumerThread<T> extends Thread {
@Override
public void run() {
running = true;
// 不斷消費,直到關閉
while (running) {
// 消費
boolean hasData = consume();
if (!hasData) { // 無資料,sleep 等待
try {
Thread.sleep(20);
} catch (InterruptedException e) {
}
}
}
// 消費剩餘部分
// consumer thread is going to stop
// consume the last time
consume();
consumer.onExit();
}
}
/**
* 消費
*
* @return 是否消費了資料
*/
private boolean consume() {
boolean hasData = false;
// 獲得資料
LinkedList<T> consumeList = new LinkedList<T>();
for (DataSource dataSource : dataSources) {
LinkedList<T> data = dataSource.obtain();
if (data.size() == 0) {
continue;
}
for (T element : data) {
consumeList.add(element);
}
hasData = true;
}
// 若有資料,進行消費
if (consumeList.size() > 0) {
try {
consumer.consume(consumeList);
} catch (Throwable t) {
consumer.onError(consumeList, t);
}
}
return hasData;
}
消費線程會不斷從buffer中擷取資料,如果沒有資料會sleep 20毫秒。得到資料後就會調用函數 consumer.consume(consumeList)上報資料,進而達到消費的效果。