天天看点

Skywalking数据存储消费方式

一、背景

Skywalking在进行链路追踪时,会不断生成数据,这些数据先缓存在本地,然后异步批量上报。产生数据的一方为生产者,获取数据并上报为消费者。下面简单介绍一下Skywywalking的数据存储于消费的设计。

二、Skywalking数据存储、消费方式

Skywalking的数据存储及上报的逻辑在类DataCarrier中实现,DataCarrier的定义如下:

public class DataCarrier<T> {

    /**
     * buffer 大小
     */
    private final int bufferSize;
    /**
     * 通道数量
     */
    private final int channelSize;
    /**
     * 通道
     */
    private Channels<T> channels;
    /**
     * 消费线程
     */
    private ConsumerPool<T> consumerPool;

    public DataCarrier(int channelSize, int bufferSize) {
        this.bufferSize = bufferSize;
        this.channelSize = channelSize;
        channels = new Channels<T>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);
    }
。。。
}
           

DataCarrier会启动多个线程进行数据的存储和消费,数据的存取都在channels中

public class Channels<T> {

    /**
     * Buffer 数组
     */
    private final Buffer<T>[] bufferChannels;
    private IDataPartitioner<T> dataPartitioner;
    /**
     * 缓冲策略
     */
    private BufferStrategy strategy;

    /**
     * 创建 Channels
     *
     * @param channelSize 通道数量
     * @param bufferSize 缓冲区大小
     * @param partitioner
     * @param strategy 缓冲策略
     */
    public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
        this.dataPartitioner = partitioner;
        this.strategy = strategy;
        // 创建 Buffer 数组
        bufferChannels = new Buffer[channelSize];
        for (int i = 0; i < channelSize; i++) {
            bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
        }
    }
           

在Channels的初始化中,根据通道数量channelSize来设置buffer的个数,这样就会生成channelSize个buffer。

1、生产者流程

生产者代码

public boolean save(T data) {
        int index = dataPartitioner.partition(bufferChannels.length, data);

        // 计算最大重试次数
        int retryCountDown = 1;
        if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
            int maxRetryCount = dataPartitioner.maxRetryCount();
            if (maxRetryCount > 1) {
                retryCountDown = maxRetryCount;
            }
        }

        // 多次重试,保存数据直到成功
        for (; retryCountDown > 0; retryCountDown--) {
            if (bufferChannels[index].save(data)) {
                return true;
            }
        }
        return false;
    }

    @Override
    public int partition(int total, T data) {
        return (int)Thread.currentThread().getId() % total;
    }
           

生产者用save(T data)函数存储数据,先通过函数partition确定bufferChannels的索引index,然后将数据保存在选择的bufferChannel中。

/**
     * 保存数据
     *
     * @param data 数据
     * @return 是否保存成功
     */
    boolean save(T data) {
        int i = index.getAndIncrement();
        if (buffer[i] != null) {
            switch (strategy) {
                case BLOCKING:
                    // 等待
                    while (buffer[i] != null) {
                        try {
                            Thread.sleep(1L);
                        } catch (InterruptedException e) {
                        }
                    }
                    break;
                case IF_POSSIBLE:
                    return false;
                case OVERRIDE:
                default:
            }
        }
        buffer[i] = data;
        return true;
    }
           

在保存数据时,先通过累加器index获取最新的写入地址,如果该地址没有数据,直接写入;如有数据,执行不同的策略:

     阻塞:等待这个位置的数据被消费,然后再写入

     覆盖:将之前未被消费的数据覆盖

     跳过:将这个数据丢弃

如果一次写入不成功,还可以重试几次。

2、消费者流程

/**
 * 消费者池
 *
**/
public class ConsumerPool<T> {

    /**
     * 是否运行中
     */
    private boolean running;
    /**
     * 线程数组
     */
    private ConsumerThread[] consumerThreads;
    /**
     * 通道
     */
    private Channels<T> channels;
    /**
     * 锁
     */
    private ReentrantLock lock;

    public ConsumerPool(Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num) {
        this(channels, num);
        for (int i = 0; i < num; i++) {
            consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass));
            consumerThreads[i].setDaemon(true);
        }
    }
           

消费者会创建一个消费线程池ConsumerPool,在线程池ConsumerPool初始化时,会传入之前创建好的channels并和ConsumerPool关联,然后创建多个consumerThreads。

下面看一下消费线程的执行流程

/**
 * 消费线程
 */
public class ConsumerThread<T> extends Thread {

    @Override
    public void run() {
        running = true;

        // 不断消费,直到关闭
        while (running) {
            // 消费
            boolean hasData = consume();

            if (!hasData) { // 无数据,sleep 等待
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                }
            }
        }

        // 消费剩余部分
        // consumer thread is going to stop
        // consume the last time
        consume();

        consumer.onExit();
    }
}

/**
     * 消费
     *
     * @return 是否消费了数据
     */
    private boolean consume() {
        boolean hasData = false;

        // 获得数据
        LinkedList<T> consumeList = new LinkedList<T>();
        for (DataSource dataSource : dataSources) {
            LinkedList<T> data = dataSource.obtain();
            if (data.size() == 0) {
                continue;
            }
            for (T element : data) {
                consumeList.add(element);
            }
            hasData = true;
        }

        // 若有数据,进行消费
        if (consumeList.size() > 0) {
            try {
                consumer.consume(consumeList);
            } catch (Throwable t) {
                consumer.onError(consumeList, t);
            }
        }
        return hasData;
    }
           

消费线程会不断从buffer中获取数据,如果没有数据会sleep 20毫秒。得到数据后就会调用函数 consumer.consume(consumeList)上报数据,从而达到消费的效果。

继续阅读