天天看點

基于curator的延遲隊列

基于curator的延遲隊列

這裡不介紹關于curator的用法及優劣,旨在探究curator對于延遲隊列的使用原理

怎麼使用

<groupId>org.apache.curator</groupId>
 <artifactId>curator-recipes</artifactId>
 <version>4.0.1</version>           
<groupId>org.apache.curator</groupId>
 <artifactId>curator-framework</artifactId>
 <version>4.0.1</version>           

public class Processor {

private final static CuratorFramework client;
private final static DistributedDelayQueue<String> queue;

static{
    ZookeeperConfig config = ZookeeperConfig.getConfig();
    // create client
    client = CuratorFrameworkFactory.newClient(config.getRegistryAddress(),
            new ExponentialBackoffRetry(3000, 2));
    // build queue
    queue = QueueBuilder.builder(client, new AutoSubmitConsumer(),
            new AutoSubmitQueueSerializer(), DelayQueueEnum.AUTO_SUBMIT.getPath())
            .buildDelayQueue();
    // 開啟執行計劃
    enable();
}

/**
 * 生産資料
 *
 * @param id
 * @param endTime
 * @throws Exception
 */
public void producer(String id, Date endTime) throws Exception {
    queue.put(id, endTime.getTime());
}
           
private static void enable(){
    try {
        client.start();
        queue.start();
    } catch (Exception e) {
        logger.error("enable queue fail, exception:{}", e);
    }
}
           

}

// Serializer

class AutoSubmitQueueSerializer implements QueueSerializer {

@Override
public byte[] serialize(String s) {
     return s.getBytes("utf-8");
}

@Override
public String deserialize(byte[] bytes) {
    return new String(bytes);
}           

// consumer

AutoSubmitConsumer implements QueueConsumer {

@Override
public void consumeMessage(String id)  {
    logger.info("consumeMessage, :{}", id);
      // service processor.
    logger.info("consumeMessage# auto submit end, result:{}, id:{}", result, id);
}

@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
}           

是臨時節點還是持久化節點,如果基于記憶體的話用戶端或者服務端挂了以後就會存在資料丢失的問題? 是否會重新排序,zk是按照請求的時間先後順序寫入的,那麼curator是怎麼監聽到期時間的呢?

猜想

是否持久化

是否會在每次請求的時候拿到服務端所有的節點資料進行排序後存入到服務端

驗證

針對第一點,我們關閉zookeeper服務端和用戶端後重新啟動後之前的節點還存在是以是持久化節點

通過用戶端工具連接配接zookeeper發現并不會每次請求的時候都會重新排序,也就是說可能在client端進行處理的

以下是在用戶端工具上截取的一部分資訊,key是由三部分組成的,第一部分固定的queue- , 第二部分暫不确定,第三部分是節點的序号 

源碼求證

// org.apache.curator.framework.recipes.queue.DistributedQueue#start

// 部分片段

client.create().creatingParentContainersIfNeeded().forPath(queuePath);

if ( !isProducerOnly )

{
        service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call()
                    {
                        runLoop(); // step1
                        return null;
                    }
                }
            );
    }           

// org.apache.curator.framework.recipes.queue.DistributedQueue#runLoop

// step1中的代碼片段

while ( state.get() == State.STARTED )

{
            try
            {
                ChildrenCache.Data      data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
                currentVersion = data.version;
                // 諸如:
                //queue-|2E1D86A3BB6|0000000019
                //queue-|1712F752AA0|0000000036
                //queue-|1712F76FF60|0000000035
        // 拿到所有的子節點
                List<String> children = Lists.newArrayList(data.children); 
                // 根據過期時間排序
            // step6
                sortChildren(children); 
        // 排序後
                //queue-|1712F752AA0|0000000036
                //queue-|1712F76FF60|0000000035
                //queue-|2E1D86A3BB6|0000000019
                if ( children.size() > 0 )
                { //擷取到期時間
                    maxWaitMs = getDelay(children.get(0));
                   
                    if ( maxWaitMs > 0 ) continue;
                }
                else  continue;
               // 死循環不斷輪詢是否有滿足條件的節點;
               // 隻要有滿足條件的節點就将整個排序後的集合往下傳遞
                processChildren(children, currentVersion); // step2
            }
           
        }           

// org.apache.curator.framework.recipes.queue.DistributedQueue#processChildren

// step2對應的代碼片段:

private void processChildren(List children, long currentVersion)

{
    final Semaphore processedLatch = new Semaphore(0);
    final boolean   isUsingLockSafety = (lockPath != null);
    int             min = minItemsBeforeRefresh;
    for ( final String itemNode : children )
    {
        if ( Thread.currentThread().isInterrupted() )
        {
            processedLatch.release(children.size());
            break;
        }

        if ( !itemNode.startsWith(QUEUE_ITEM_NAME) )
        {
            processedLatch.release();
            continue;
        }

        if ( min-- <= 0 )
        {
            if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) )
            {
                processedLatch.release(children.size());
                break;
            }
        }
    // step3
        if ( getDelay(itemNode) > 0 )
        {
            processedLatch.release();
            continue;
        }
        //這裡使用了線程池,為了保證每一個節點都執行完畢後才傳回方法是以使用了信号燈
        executor.execute
        (
            new Runnable()
            {
                @Override
                public void run()
                {
                    try
                    {
                        //是否采用了分布式鎖,因為我們初始化的時候并未使用是以沒有用到這裡的安全鎖,實際上是進入到了else中
                        if ( isUsingLockSafety )
                        {
                            
                            processWithLockSafety(itemNode, ProcessType.NORMAL);
                        }
                        else
                        {
                // 看這裡 step4
                            processNormally(itemNode, ProcessType.NORMAL);
                        }
                    }finally
                    {
                        processedLatch.release();
                    }
                }
            }
        );
    }

    processedLatch.acquire(children.size());
}
           

// org.apache.curator.framework.recipes.queue.DistributedQueue#getDelay(java.lang.String)

// 對應step3處的代碼片段

protected long getDelay(String itemNode)

{
            return getDelay(itemNode, System.currentTimeMillis());
        }
        
        private long getDelay(String itemNode, long sortTime)
        {  // 會從key上擷取時間戳        
    // step5
            long epoch = getEpoch(itemNode); 
            return epoch - sortTime; // 計算過期時間
        }
           

// 對應step5處的代碼

private static long getEpoch(String itemNode)

{
// itemNode -> queue-|時間戳|序号
    int     index2 = itemNode.lastIndexOf(SEPARATOR);
    int     index1 = (index2 > 0) ? itemNode.lastIndexOf(SEPARATOR, index2 - 1) : -1;
    if ( (index1 > 0) && (index2 > (index1 + 1)) )
    {
        try
        {
            String  epochStr = itemNode.substring(index1 + 1, index2);
            return Long.parseLong(epochStr, 16); // 從這裡可以知道queue-|這裡是16進制的時間戳了|序号| 可能是出于key長度的考量吧(更節省記憶體),用10進制的時間戳會長很多
        }
    }
    return 0;
}
           

// org.apache.curator.framework.recipes.queue.DistributedQueue#sortChildren

// 會根據延時時間排序

// step6處的代碼片段

protected void sortChildren(List children)

{
            final long sortTime = System.currentTimeMillis();
            Collections.sort
            (
                children,
                new Comparator<String>()
                {
                    @Override
                    public int compare(String o1, String o2)
                    {
                        long        diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);
                        return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
                    }
                }
            );
        }
           

// 對應step4處的代碼片段

private boolean processNormally(String itemNode, ProcessType type) throws Exception

{
    try
    {
        String  itemPath = ZKPaths.makePath(queuePath, itemNode);
        Stat    stat = new Stat();

        byte[]  bytes = null;
        if ( type == ProcessType.NORMAL )
        {
            // 擷取key對應的value
            bytes = client.getData().storingStatIn(stat).forPath(itemPath);
        }
        if ( client.getState() == CuratorFrameworkState.STARTED )
        {
           // 移除節點
                        client.delete().withVersion(stat.getVersion()).forPath(itemPath);
        }

        if ( type == ProcessType.NORMAL )
        {
        //step7
            processMessageBytes(itemNode, bytes);
        }

        return true;
    }

    return false;
}           

//對應step7處代碼,會回調我們的業務代碼

private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception

{
    ProcessMessageBytesCode     resultCode = ProcessMessageBytesCode.NORMAL;
    MultiItem<T>                items;
    try
    {
      // 根據我們定義的序列化器序列化
        items = ItemSerializer.deserialize(bytes, serializer);
    }

    for(;;)
    {
     // 省略一部分代碼
        try
        {
            consumer.consumeMessage(item); // 這裡就會回調到我們的業務代碼
        }
    }
    return resultCode;
}           

總結

org.apache.curator.framework.recipes.queue.DistributedQueue#internalCreateNode這個方法也證明了确實是持久化且有序的節點;

如果過期時間太長而資料生産的過于頻繁的話,那麼勢必會造成資料的積壓對于性能和記憶體都是很大的考驗;

而且是用戶端不斷的循環擷取所有的節點、排序、再處理,由此我們也證明了前面猜想是排序後在服務端重新添加所有節點每次監聽第一個節點變化的想法看來是錯誤的;

原文位址

https://my.oschina.net/u/2486137/blog/3215445