
Storm TimeCacheMap and RotatingMap Source Code Analysis

TimeCacheMap is a class in Twitter Storm. Storm uses it to keep recently active objects and to automatically remove those that have expired.

However, since Storm 0.8 TimeCacheMap has been deprecated in favor of RotatingMap.

The differences between RotatingMap and TimeCacheMap are as follows:

  • 1. The former removes the automatic cleanup thread and lets the caller decide when to clear expired data, by calling rotate(), which drops the oldest (tail) bucket and adds a new head bucket.
  • 2. The former's get, put, and related methods are no longer synchronized; the caller must supply its own locking.

In short, RotatingMap gives developers more freedom to control this data structure themselves; the short sketch below illustrates the contrast. We then analyze TimeCacheMap in detail, after which RotatingMap is self-evident.
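To make the contrast concrete, here is a minimal sketch (my own illustration, not code from Storm) of how each class is driven; the class name CacheContrast and the keys and values are arbitrary:

import backtype.storm.utils.RotatingMap;
import backtype.storm.utils.TimeCacheMap;

public class CacheContrast {
    public static void main(String[] args) {
        // TimeCacheMap: thread-safe, and its internal daemon thread expires entries by itself.
        TimeCacheMap<String, String> tcm = new TimeCacheMap<String, String>(30);
        tcm.put("msg-1", "payload");   // locked internally; gone after 30-45 seconds

        // RotatingMap: nothing expires until the caller invokes rotate(), and the caller
        // must synchronize access itself if the map is shared between threads.
        RotatingMap<String, String> rm = new RotatingMap<String, String>(3);
        Object lock = new Object();    // caller-owned lock, not part of RotatingMap
        synchronized (lock) {
            rm.put("msg-2", "payload");
        }
        synchronized (lock) {
            rm.rotate();               // drops the oldest bucket, adds a fresh head bucket
        }
    }
}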

Below I walk through the TimeCacheMap source, with my analysis added as inline comments.

package backtype.storm.utils;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import backtype.storm.utils.Time;

/**
 * Expires keys that have not been updated in the configured number of seconds.
 * The algorithm used will take between expirationSecs and
 * expirationSecs * (1 + 1 / (numBuckets-1)) to actually expire the message.
 *
 * get, put, remove, containsKey, and size take O(numBuckets) time to run.
 *
 * The advantage of this design is that the expiration thread only locks the object
 * for O(1) time, meaning the object is essentially always available for gets/puts.
 */
/**
 *If an entry is not updated within the configured time, it is removed.
 *Where expirationSecs * (1 + 1 / (numBuckets-1)) comes from:
 *
 *Suppose the _cleaner thread has just finished a sweep when put() places a key into the head
 *bucket; that entry's time to expire is:
 *expirationSecs / (numBuckets-1) * numBuckets = expirationSecs * (1 + 1 / (numBuckets-1))
 *Conversely, suppose the _cleaner thread starts a sweep right after put() returns; that
 *entry's time to expire is:
 *expirationSecs / (numBuckets-1) * numBuckets - expirationSecs / (numBuckets-1) = expirationSecs
 *
 *The biggest advantage of this structure: the data is split across multiple buckets, the lock is
 *held only briefly, and expired data is dropped in O(1) by discarding a whole bucket, so the map
 *is available for get and put most of the time.
 */
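//A concrete numeric example (my addition, not part of the original source): with
//expirationSecs = 30 and the default numBuckets = 3, the cleaner sleeps
//30 / (3-1) = 15 seconds between rotations. An entry put into the head bucket therefore
//survives at least 2 rotations (30 seconds) and at most 3 rotations
//(30 * (1 + 1/2) = 45 seconds), depending on where in the cleaner's cycle the put() lands.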
//deprecated in favor of non-threaded RotatingMap
//Although TimeCacheMap was deprecated after Storm 0.8, its design is quite elegant and worth a close look
@Deprecated
public class TimeCacheMap<K, V> {
    //this default ensures things expire at most 50% past the expiration time
    private static final int DEFAULT_NUM_BUCKETS = 3;

    
    //A callback only needs to implement this interface; at minimum it hands the expired entries back to the caller
    public static interface ExpiredCallback<K, V> {
        public void expire(K key, V val);
    }

    //Data is split into multiple buckets; a LinkedList is used because adding/removing at the head and tail is O(1)
    private LinkedList<HashMap<K, V>> _buckets;

    private final Object _lock = new Object();
    private Thread _cleaner;
    private ExpiredCallback _callback;
    
    public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
        if(numBuckets<2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        //In the constructor, create the initial buckets according to numBuckets
        _buckets = new LinkedList<HashMap<K, V>>();
        for(int i=0; i<numBuckets; i++) {
            _buckets.add(new HashMap<K, V>());
        }


        _callback = callback;
        final long expirationMillis = expirationSecs * 1000L;
        final long sleepTime = expirationMillis / (numBuckets-1);
        _cleaner = new Thread(new Runnable() {
            public void run() {
                try {
                    while(true) {
                        Map<K, V> dead = null;
                        Time.sleep(sleepTime);
                        synchronized(_lock) {
                            //Remove the last bucket, whose data is the oldest, and add a fresh bucket at the head
                            dead = _buckets.removeLast();
                            _buckets.addFirst(new HashMap<K, V>());
                        }
                        if(_callback!=null) {
                            for(Entry<K, V> entry: dead.entrySet()) {
                                _callback.expire(entry.getKey(), entry.getValue());
                            }
                        }
                    }
                } catch (InterruptedException ex) {

                }
            }
        });
        //Run as a daemon thread, so it exits automatically once all non-daemon threads have finished
        _cleaner.setDaemon(true);
        _cleaner.start();
    }

    public TimeCacheMap(int expirationSecs, ExpiredCallback<K, V> callback) {
        this(expirationSecs, DEFAULT_NUM_BUCKETS, callback);
    }

    public TimeCacheMap(int expirationSecs) {
        this(expirationSecs, DEFAULT_NUM_BUCKETS);
    }

    public TimeCacheMap(int expirationSecs, int numBuckets) {
        this(expirationSecs, numBuckets, null);
    }


    public boolean containsKey(K key) {
        synchronized(_lock) {
            for(HashMap<K, V> bucket: _buckets) {
                if(bucket.containsKey(key)) {
                    return true;
                }
            }
            return false;
        }
    }

    public V get(K key) {
        synchronized(_lock) {
            for(HashMap<K, V> bucket: _buckets) {
                if(bucket.containsKey(key)) {
                    return bucket.get(key);
                }
            }
            return null;
        }
    }

    public void put(K key, V value) {
        synchronized(_lock) {
            Iterator<HashMap<K, V>> it = _buckets.iterator();
            HashMap<K, V> bucket = it.next();
            //Insert or update the entry in the first (newest) bucket
            bucket.put(key, value);
            //Remove the key from the older buckets so it exists in at most one bucket
            while(it.hasNext()) {
                bucket = it.next();
                bucket.remove(key);
            }
        }
    }
    
    public Object remove(K key) {
        synchronized(_lock) {
            for(HashMap<K, V> bucket: _buckets) {
                if(bucket.containsKey(key)) {
                    return bucket.remove(key);
                }
            }
            return null;
        }
    }

    public int size() {
        synchronized(_lock) {
            int size = 0;
            for(HashMap<K, V> bucket: _buckets) {
                size+=bucket.size();
            }
            return size;
        }
    }

    //This method name is quite misleading: all it does is kill the cleanup thread, so data will never expire again; a better name would be neverCleanup
    public void cleanup() {
        //Interrupt the sleep in the cleanup thread; _cleaner throws InterruptedException, exits, and stops expiring data
        _cleaner.interrupt();  //after interrupt() is called, the next sleep() throws InterruptedException

    }    
}      
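A small usage sketch of TimeCacheMap (my own example, not from Storm; it assumes Storm's Time.sleep simply delegates to Thread.sleep when simulated time is off). It shows the automatic expiration with a callback, and the trap in cleanup() noted in the comment above:

import backtype.storm.utils.TimeCacheMap;

public class TimeCacheMapDemo {
    public static void main(String[] args) throws Exception {
        // With expirationSecs = 2 and the default 3 buckets, entries expire 2-3 seconds
        // after their last put.
        TimeCacheMap<String, Integer> cache = new TimeCacheMap<String, Integer>(2,
                new TimeCacheMap.ExpiredCallback<String, Integer>() {
                    public void expire(String key, Integer val) {
                        System.out.println("expired: " + key + " -> " + val);
                    }
                });

        cache.put("tuple-1", 42);
        System.out.println(cache.containsKey("tuple-1"));  // true
        Thread.sleep(4000);                                // longer than the 3-second upper bound
        System.out.println(cache.containsKey("tuple-1"));  // false; the callback already fired

        // cleanup() does NOT flush expired entries: it kills the cleaner thread,
        // so after this call nothing in the map will ever expire again.
        cache.cleanup();
        cache.put("tuple-2", 7);
        Thread.sleep(4000);
        System.out.println(cache.containsKey("tuple-2"));  // still true
    }
}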

The RotatingMap source is almost identical to TimeCacheMap: the cleanup thread and the locks are removed, and a rotate() method is added so that developers clear expired data themselves.

package backtype.storm.utils;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;

/**
 * Expires keys that have not been updated in the configured number of seconds.
 * The algorithm used will take between expirationSecs and
 * expirationSecs * (1 + 1 / (numBuckets-1)) to actually expire the message.
 *
 * get, put, remove, containsKey, and size take O(numBuckets) time to run.
 *
 * The advantage of this design is that the expiration thread only locks the object
 * for O(1) time, meaning the object is essentially always available for gets/puts.
 */
public class RotatingMap<K, V> {
    //this default ensures things expire at most 50% past the expiration time
    private static final int DEFAULT_NUM_BUCKETS = 3;

    public static interface ExpiredCallback<K, V> {
        public void expire(K key, V val);
    }

    private LinkedList<HashMap<K, V>> _buckets;

    private ExpiredCallback _callback;
    
    public RotatingMap(int numBuckets, ExpiredCallback<K, V> callback) {
        if(numBuckets<2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        _buckets = new LinkedList<HashMap<K, V>>();
        for(int i=0; i<numBuckets; i++) {
            _buckets.add(new HashMap<K, V>());
        }

        _callback = callback;
    }

    public RotatingMap(ExpiredCallback<K, V> callback) {
        this(DEFAULT_NUM_BUCKETS, callback);
    }

    public RotatingMap(int numBuckets) {
        this(numBuckets, null);
    }   
    
    public Map<K, V> rotate() {
        Map<K, V> dead = _buckets.removeLast();
        _buckets.addFirst(new HashMap<K, V>());
        if(_callback!=null) {
            for(Entry<K, V> entry: dead.entrySet()) {
                _callback.expire(entry.getKey(), entry.getValue());
            }
        }
        return dead;
    }

    public boolean containsKey(K key) {
        for(HashMap<K, V> bucket: _buckets) {
            if(bucket.containsKey(key)) {
                return true;
            }
        }
        return false;
    }

    public V get(K key) {
        for(HashMap<K, V> bucket: _buckets) {
            if(bucket.containsKey(key)) {
                return bucket.get(key);
            }
        }
        return null;
    }

    public void put(K key, V value) {
        Iterator<HashMap<K, V>> it = _buckets.iterator();
        HashMap<K, V> bucket = it.next();
        bucket.put(key, value);
        while(it.hasNext()) {
            bucket = it.next();
            bucket.remove(key);
        }
    }
    
    
    public Object remove(K key) {
        for(HashMap<K, V> bucket: _buckets) {
            if(bucket.containsKey(key)) {
                return bucket.remove(key);
            }
        }
        return null;
    }

    public int size() {
        int size = 0;
        for(HashMap<K, V> bucket: _buckets) {
            size+=bucket.size();
        }
        return size;
    }    
}      
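RotatingMap leaves both the clock and the locking to the caller. As a sketch of one way to wire it up (my own hypothetical example, not how Storm itself drives it), the code below reproduces TimeCacheMap's behaviour by rotating every expirationSecs / (numBuckets - 1) seconds from a scheduled task and guarding every access with a single shared lock:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import backtype.storm.utils.RotatingMap;

public class RotatingMapDemo {
    private static final int NUM_BUCKETS = 3;
    private static final int EXPIRATION_SECS = 30;

    public static void main(String[] args) {
        final RotatingMap<String, Long> pending = new RotatingMap<String, Long>(NUM_BUCKETS,
                new RotatingMap.ExpiredCallback<String, Long>() {
                    public void expire(String key, Long startMillis) {
                        System.out.println("timed out: " + key);
                    }
                });
        final Object lock = new Object();   // RotatingMap itself does no locking

        // Caller-supplied clock: rotating every expirationSecs / (numBuckets - 1) seconds
        // gives the same "expire between 30 and 45 seconds" window as TimeCacheMap.
        long period = EXPIRATION_SECS / (NUM_BUCKETS - 1);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(new Runnable() {
            public void run() {
                synchronized (lock) {
                    pending.rotate();   // expired entries go to the callback
                }
            }
        }, period, period, TimeUnit.SECONDS);

        // Every read and write goes through the same lock as rotate().
        synchronized (lock) {
            pending.put("tuple-1", System.currentTimeMillis());
        }
    }
}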

E-mail: [email protected]

https://www.linkedin.com/in/huahuiyang/