天天看點

Skywalking資料存儲消費方式

一、背景

Skywalking在進行鍊路追蹤時,會不斷生成資料,這些資料先緩存在本地,然後異步批量上報。産生資料的一方為生産者,擷取資料并上報為消費者。下面簡單介紹一下Skywywalking的資料存儲于消費的設計。

二、Skywalking資料存儲、消費方式

Skywalking的資料存儲及上報的邏輯在類DataCarrier中實作,DataCarrier的定義如下:

public class DataCarrier<T> {

    /**
     * buffer 大小
     */
    private final int bufferSize;
    /**
     * 通道數量
     */
    private final int channelSize;
    /**
     * 通道
     */
    private Channels<T> channels;
    /**
     * 消費線程
     */
    private ConsumerPool<T> consumerPool;

    public DataCarrier(int channelSize, int bufferSize) {
        this.bufferSize = bufferSize;
        this.channelSize = channelSize;
        channels = new Channels<T>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);
    }
。。。
}
           

DataCarrier會啟動多個線程進行資料的存儲和消費,資料的存取都在channels中

public class Channels<T> {

    /**
     * Buffer 數組
     */
    private final Buffer<T>[] bufferChannels;
    private IDataPartitioner<T> dataPartitioner;
    /**
     * 緩沖政策
     */
    private BufferStrategy strategy;

    /**
     * 建立 Channels
     *
     * @param channelSize 通道數量
     * @param bufferSize 緩沖區大小
     * @param partitioner
     * @param strategy 緩沖政策
     */
    public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
        this.dataPartitioner = partitioner;
        this.strategy = strategy;
        // 建立 Buffer 數組
        bufferChannels = new Buffer[channelSize];
        for (int i = 0; i < channelSize; i++) {
            bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
        }
    }
           

在Channels的初始化中,根據通道數量channelSize來設定buffer的個數,這樣就會生成channelSize個buffer。

1、生産者流程

生産者代碼

public boolean save(T data) {
        int index = dataPartitioner.partition(bufferChannels.length, data);

        // 計算最大重試次數
        int retryCountDown = 1;
        if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
            int maxRetryCount = dataPartitioner.maxRetryCount();
            if (maxRetryCount > 1) {
                retryCountDown = maxRetryCount;
            }
        }

        // 多次重試,儲存資料直到成功
        for (; retryCountDown > 0; retryCountDown--) {
            if (bufferChannels[index].save(data)) {
                return true;
            }
        }
        return false;
    }

    @Override
    public int partition(int total, T data) {
        return (int)Thread.currentThread().getId() % total;
    }
           

生産者用save(T data)函數存儲資料,先通過函數partition确定bufferChannels的索引index,然後将資料儲存在選擇的bufferChannel中。

/**
     * 儲存資料
     *
     * @param data 資料
     * @return 是否儲存成功
     */
    boolean save(T data) {
        int i = index.getAndIncrement();
        if (buffer[i] != null) {
            switch (strategy) {
                case BLOCKING:
                    // 等待
                    while (buffer[i] != null) {
                        try {
                            Thread.sleep(1L);
                        } catch (InterruptedException e) {
                        }
                    }
                    break;
                case IF_POSSIBLE:
                    return false;
                case OVERRIDE:
                default:
            }
        }
        buffer[i] = data;
        return true;
    }
           

在儲存資料時,先通過累加器index擷取最新的寫入位址,如果該位址沒有資料,直接寫入;如有資料,執行不同的政策:

     阻塞:等待這個位置的資料被消費,然後再寫入

     覆寫:将之前未被消費的資料覆寫

     跳過:将這個資料丢棄

如果一次寫入不成功,還可以重試幾次。

2、消費者流程

/**
 * 消費者池
 *
**/
public class ConsumerPool<T> {

    /**
     * 是否運作中
     */
    private boolean running;
    /**
     * 線程數組
     */
    private ConsumerThread[] consumerThreads;
    /**
     * 通道
     */
    private Channels<T> channels;
    /**
     * 鎖
     */
    private ReentrantLock lock;

    public ConsumerPool(Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num) {
        this(channels, num);
        for (int i = 0; i < num; i++) {
            consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass));
            consumerThreads[i].setDaemon(true);
        }
    }
           

消費者會建立一個消費線程池ConsumerPool,線上程池ConsumerPool初始化時,會傳入之前建立好的channels并和ConsumerPool關聯,然後建立多個consumerThreads。

下面看一下消費線程的執行流程

/**
 * 消費線程
 */
public class ConsumerThread<T> extends Thread {

    @Override
    public void run() {
        running = true;

        // 不斷消費,直到關閉
        while (running) {
            // 消費
            boolean hasData = consume();

            if (!hasData) { // 無資料,sleep 等待
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                }
            }
        }

        // 消費剩餘部分
        // consumer thread is going to stop
        // consume the last time
        consume();

        consumer.onExit();
    }
}

/**
     * 消費
     *
     * @return 是否消費了資料
     */
    private boolean consume() {
        boolean hasData = false;

        // 獲得資料
        LinkedList<T> consumeList = new LinkedList<T>();
        for (DataSource dataSource : dataSources) {
            LinkedList<T> data = dataSource.obtain();
            if (data.size() == 0) {
                continue;
            }
            for (T element : data) {
                consumeList.add(element);
            }
            hasData = true;
        }

        // 若有資料,進行消費
        if (consumeList.size() > 0) {
            try {
                consumer.consume(consumeList);
            } catch (Throwable t) {
                consumer.onError(consumeList, t);
            }
        }
        return hasData;
    }
           

消費線程會不斷從buffer中擷取資料,如果沒有資料會sleep 20毫秒。得到資料後就會調用函數 consumer.consume(consumeList)上報資料,進而達到消費的效果。

繼續閱讀