TimeCacheMap是Twitter Storm裡面一個類, Storm使用它來儲存那些最近活躍的對象,並且可以自動刪除那些已經過期的對象。
不過在storm0.8之後TimeCacheMap被棄用了,取而代之的是RotatingMap。
RotatingMap與TimeCacheMap的差別如下:
- 1.前者去掉了自動清理的線程,讓使用者自己去控制清理過期的資料,控制清理資料用rotate()方法,就是去尾加新頭。
- 2.前者get,put等方法都不加鎖了,需要使用者自己控制鎖
總之就是提供了更大的自由度,讓開發者去控制這個資料結構!下面先具體分析TimeCacheMap,而後RotatingMap就一目了然了
我直接在源碼中,加上中文的注釋分析源碼TimeCacheMap
package backtype.storm.utils;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import backtype.storm.utils.Time;
/**
* Expires keys that have not been updated in the configured number of seconds.
* The algorithm used will take between expirationSecs and
* expirationSecs * (1 + 1 / (numBuckets-1)) to actually expire the message.
*
* get, put, remove, containsKey, and size take O(numBuckets) time to run.
*
* The advantage of this design is that the expiration thread only locks the object
* for O(1) time, meaning the object is essentially always available for gets/puts.
*/
/**
*如果在配置的時間内沒有更新資料,這個資料就會被删
*expirationSecs * (1 + 1 / (numBuckets-1))解釋:
*
*假設_cleaner線程剛剛清理資料,put函數調用發生将key放入桶中,那麼一條資料的逾時時間為:
*expirationSecs / (numBuckets-1) * numBuckets = expirationSecs * (1 + 1 / (numBuckets-1))
*然而,假設put函數調用剛剛執行結束,_cleaner線程就開始清理資料,那麼一條資料的逾時時間為:
*expirationSecs / (numBuckets-1) * numBuckets - expirationSecs / (numBuckets-1) = expirationSecs
*
*這個資料結構最大的好處是:資料分成多個桶,鎖的粒度小,隻要O(1)的複雜度就可以删掉過期資料。是以,大部分時間都可以進行get和put操作
*/
//deprecated in favor of non-threaded RotatingMap
/**
 * Expires keys that have not been updated in the configured number of seconds.
 * The algorithm used will take between expirationSecs and
 * expirationSecs * (1 + 1 / (numBuckets-1)) to actually expire the message.
 *
 * get, put, remove, containsKey, and size take O(numBuckets) time to run.
 *
 * The advantage of this design is that the expiration thread only locks the object
 * for O(1) time, meaning the object is essentially always available for gets/puts.
 *
 * @deprecated superseded by the non-threaded, non-locking RotatingMap.
 */
@Deprecated
public class TimeCacheMap<K, V> {
    //this default ensures things expire at most 50% past the expiration time
    private static final int DEFAULT_NUM_BUCKETS = 3;

    /** Implement this to be notified of each (key, value) pair as it expires. */
    public static interface ExpiredCallback<K, V> {
        public void expire(K key, V val);
    }

    // Entries are spread across numBuckets maps, newest bucket first.
    // A LinkedList is used because expiring = remove tail + add head, both O(1).
    private final LinkedList<HashMap<K, V>> _buckets;

    private final Object _lock = new Object();
    private final Thread _cleaner;
    // FIX: was declared as a raw ExpiredCallback; parameterize with <K, V>
    // so expire() is invoked without an unchecked call.
    private final ExpiredCallback<K, V> _callback;

    /**
     * @param expirationSecs minimum lifetime of an untouched entry, in seconds
     * @param numBuckets number of buckets; more buckets = tighter expiry bound
     * @param callback invoked for each expired entry; may be null
     * @throws IllegalArgumentException if numBuckets &lt; 2
     */
    public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
        if(numBuckets<2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        // Pre-create all buckets; rotation keeps the count constant.
        _buckets = new LinkedList<HashMap<K, V>>();
        for(int i=0; i<numBuckets; i++) {
            _buckets.add(new HashMap<K, V>());
        }
        _callback = callback;
        final long expirationMillis = expirationSecs * 1000L;
        final long sleepTime = expirationMillis / (numBuckets-1);
        _cleaner = new Thread(new Runnable() {
            public void run() {
                try {
                    while(true) {
                        Map<K, V> dead = null;
                        Time.sleep(sleepTime);
                        synchronized(_lock) {
                            // Drop the oldest bucket (tail) and push a fresh
                            // one at the head. Only this O(1) swap holds the
                            // lock, so gets/puts are essentially never blocked.
                            dead = _buckets.removeLast();
                            _buckets.addFirst(new HashMap<K, V>());
                        }
                        // Fire callbacks outside the lock so a slow callback
                        // cannot stall concurrent gets/puts.
                        if(_callback!=null) {
                            for(Entry<K, V> entry: dead.entrySet()) {
                                _callback.expire(entry.getKey(), entry.getValue());
                            }
                        }
                    }
                } catch (InterruptedException ex) {
                    // FIX: restore the interrupt status instead of silently
                    // swallowing it. Interruption (see cleanup()) is the
                    // intended shutdown signal for this thread.
                    Thread.currentThread().interrupt();
                }
            }
        });
        // Daemon thread: it will not keep the JVM alive on its own.
        _cleaner.setDaemon(true);
        _cleaner.start();
    }

    public TimeCacheMap(int expirationSecs, ExpiredCallback<K, V> callback) {
        this(expirationSecs, DEFAULT_NUM_BUCKETS, callback);
    }

    public TimeCacheMap(int expirationSecs) {
        this(expirationSecs, DEFAULT_NUM_BUCKETS);
    }

    public TimeCacheMap(int expirationSecs, int numBuckets) {
        this(expirationSecs, numBuckets, null);
    }

    /** @return true if any bucket currently holds the key. O(numBuckets). */
    public boolean containsKey(K key) {
        synchronized(_lock) {
            for(HashMap<K, V> bucket: _buckets) {
                if(bucket.containsKey(key)) {
                    return true;
                }
            }
            return false;
        }
    }

    /** @return the value for key, or null if absent (or expired). O(numBuckets). */
    public V get(K key) {
        synchronized(_lock) {
            for(HashMap<K, V> bucket: _buckets) {
                if(bucket.containsKey(key)) {
                    return bucket.get(key);
                }
            }
            return null;
        }
    }

    /**
     * Inserts or refreshes the entry: it goes into the newest bucket and is
     * removed from all older buckets, resetting its time-to-live.
     */
    public void put(K key, V value) {
        synchronized(_lock) {
            Iterator<HashMap<K, V>> it = _buckets.iterator();
            HashMap<K, V> bucket = it.next();
            // Write into the newest (head) bucket.
            bucket.put(key, value);
            // Purge any stale copy from the older buckets.
            while(it.hasNext()) {
                bucket = it.next();
                bucket.remove(key);
            }
        }
    }

    /** Removes the key and returns its value, or null if absent. O(numBuckets). */
    public Object remove(K key) {
        synchronized(_lock) {
            for(HashMap<K, V> bucket: _buckets) {
                if(bucket.containsKey(key)) {
                    return bucket.remove(key);
                }
            }
            return null;
        }
    }

    /** @return total number of live entries across all buckets. O(n). */
    public int size() {
        synchronized(_lock) {
            int size = 0;
            for(HashMap<K, V> bucket: _buckets) {
                size+=bucket.size();
            }
            return size;
        }
    }

    /**
     * Shuts down the background cleaner thread. Despite the name, after this
     * call entries are NEVER expired again — call only on teardown.
     */
    public void cleanup() {
        // interrupt() wakes the sleeping cleaner, which then exits its loop.
        _cleaner.interrupt();
    }
}
RotatingMap源碼幾乎和TimeCacheMap一樣,就是去掉清理線程去掉鎖,加了一個rotate()方法開發者自己清理過期資料
package backtype.storm.utils;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
/**
 * Expires keys that have not been updated in the configured number of rotations.
 * The algorithm used will take between expirationSecs and
 * expirationSecs * (1 + 1 / (numBuckets-1)) to actually expire the message,
 * when rotate() is called every expirationSecs / (numBuckets-1) seconds.
 *
 * get, put, remove, containsKey, and size take O(numBuckets) time to run.
 *
 * Unlike TimeCacheMap there is no background expiration thread and no internal
 * locking: the caller drives expiration by invoking rotate() and must supply
 * any synchronization needed for concurrent access.
 */
public class RotatingMap<K, V> {
    //this default ensures things expire at most 50% past the expiration time
    private static final int DEFAULT_NUM_BUCKETS = 3;

    /** Implement this to be notified of each (key, value) pair as it expires. */
    public static interface ExpiredCallback<K, V> {
        public void expire(K key, V val);
    }

    // Entries are spread across numBuckets maps, newest bucket first.
    // A LinkedList is used because rotate() = remove tail + add head, both O(1).
    private final LinkedList<HashMap<K, V>> _buckets;
    // FIX: was declared as a raw ExpiredCallback; parameterize with <K, V>
    // so expire() is invoked without an unchecked call.
    private final ExpiredCallback<K, V> _callback;

    /**
     * @param numBuckets number of buckets; more buckets = tighter expiry bound
     * @param callback invoked for each expired entry on rotate(); may be null
     * @throws IllegalArgumentException if numBuckets &lt; 2
     */
    public RotatingMap(int numBuckets, ExpiredCallback<K, V> callback) {
        if(numBuckets<2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        // Pre-create all buckets; rotation keeps the count constant.
        _buckets = new LinkedList<HashMap<K, V>>();
        for(int i=0; i<numBuckets; i++) {
            _buckets.add(new HashMap<K, V>());
        }
        _callback = callback;
    }

    public RotatingMap(ExpiredCallback<K, V> callback) {
        this(DEFAULT_NUM_BUCKETS, callback);
    }

    public RotatingMap(int numBuckets) {
        this(numBuckets, null);
    }

    /**
     * Expires the oldest bucket: removes it, adds a fresh empty bucket at the
     * head, and fires the callback (if any) for every expired entry.
     *
     * @return the map of entries that just expired
     */
    public Map<K, V> rotate() {
        Map<K, V> dead = _buckets.removeLast();
        _buckets.addFirst(new HashMap<K, V>());
        if(_callback!=null) {
            for(Entry<K, V> entry: dead.entrySet()) {
                _callback.expire(entry.getKey(), entry.getValue());
            }
        }
        return dead;
    }

    /** @return true if any bucket currently holds the key. O(numBuckets). */
    public boolean containsKey(K key) {
        for(HashMap<K, V> bucket: _buckets) {
            if(bucket.containsKey(key)) {
                return true;
            }
        }
        return false;
    }

    /** @return the value for key, or null if absent (or expired). O(numBuckets). */
    public V get(K key) {
        for(HashMap<K, V> bucket: _buckets) {
            if(bucket.containsKey(key)) {
                return bucket.get(key);
            }
        }
        return null;
    }

    /**
     * Inserts or refreshes the entry: it goes into the newest bucket and is
     * removed from all older buckets, resetting its remaining rotations.
     */
    public void put(K key, V value) {
        Iterator<HashMap<K, V>> it = _buckets.iterator();
        HashMap<K, V> bucket = it.next();
        // Write into the newest (head) bucket.
        bucket.put(key, value);
        // Purge any stale copy from the older buckets.
        while(it.hasNext()) {
            bucket = it.next();
            bucket.remove(key);
        }
    }

    /** Removes the key and returns its value, or null if absent. O(numBuckets). */
    public Object remove(K key) {
        for(HashMap<K, V> bucket: _buckets) {
            if(bucket.containsKey(key)) {
                return bucket.remove(key);
            }
        }
        return null;
    }

    /** @return total number of live entries across all buckets. O(n). */
    public int size() {
        int size = 0;
        for(HashMap<K, V> bucket: _buckets) {
            size+=bucket.size();
        }
        return size;
    }
}
E-mail: [email protected]
https://www.linkedin.com/in/huahuiyang/