天天看点

基于Berkeley DB实现的持久化队列

         转载地址:http://guoyunsky.iteye.com/blog/1169912

      队列很常见,但大部分的队列是将数据放入到内存.如果数据过多,就有内存溢出危险,而且长久占据着内存,也会影响性能.比如爬虫,将要抓取的URL放到内存,而URL过多,内存肯定要爆.在读Heritrix源码中,发现Heritrix是基于Bdb实现了一个持久化队列,于是我就将这块代码独立出来,平时使用也蛮爽的,现在拿出来共享.同时数据已经持久化,相比放在内存的一次性,可以循环累加使用.

      大家也知道BDB的高性能和嵌入式.但这个持久化队列我觉得比较适合单机.如果涉及到分布式,就不大适合了.毕竟分布式要通信,负载均衡,冗余等.可以用其他的数据库等替代.

      这里大概先说下实现原理,BDB是Key-Value型数据库,而队列是FIFO.所以这个持久化队列以位置作为BDB的Key,数据作为BDB的Value.然后用两个变量,分别记录队列两头的位置,也就是头部和尾部.当有数据插入的时候,就以尾部的位置为这个数据的Key.而当要取出数据时,以头部位置作为Key,获取这个Key的数据.原理大概如此,这个类也继承AbstractQueue,这里贴上代码.以下代码需引用bdb-je,common-io,junit.请在附件中下载

1、 自定义的BDB环境类,可以缓存StoredClassCatalog并共享

Java代码,如下:
package com.guoyun.util;  
import java.io.File; 
import com.sleepycat.bind.serial.StoredClassCatalog;  
import com.sleepycat.je.Database;  
import com.sleepycat.je.DatabaseConfig;  
import com.sleepycat.je.DatabaseException;  
import com.sleepycat.je.Environment;  
import com.sleepycat.je.EnvironmentConfig;  
/** 
 * BDB数据库环境,可以缓存StoredClassCatalog并共享 
 *  
 * @contributor guoyun 
 */  
public class BdbEnvironment extends Environment {  
    StoredClassCatalog classCatalog;   
    Database classCatalogDB;  

    /** 
     * Constructor 
     *  
     * @param envHome 数据库环境目录 
     * @param envConfig config options  数据库换纪念馆配置 
     * @throws DatabaseException 
     */  
    public BdbEnvironment(File envHome, EnvironmentConfig envConfig) throws DatabaseException {  
        super(envHome, envConfig);  
    }  

    /** 
     * 返回StoredClassCatalog 
     * @return the cached class catalog 
     */  
    public StoredClassCatalog getClassCatalog() {  
        if(classCatalog == null) {  
            DatabaseConfig dbConfig = new DatabaseConfig();  
            dbConfig.setAllowCreate(true);  
            try {  
                classCatalogDB = openDatabase(null, "classCatalog", dbConfig);  
                classCatalog = new StoredClassCatalog(classCatalogDB);  
            } catch (DatabaseException e) {  
                // TODO Auto-generated catch block  
                throw new RuntimeException(e);  
            }  
        }  
        return classCatalog;  
    }  

    @Override  
    public synchronized void close() throws DatabaseException {  
        if(classCatalogDB!=null) {  
            classCatalogDB.close();  
        }  
        super.close();  
    }  

}  
           

2.  基于BDB实现的持久化队列

Java代码,如下:
package com.guoyun.util;  

import java.io.File;  
import java.io.IOException;  
import java.io.Serializable;  
import java.util.AbstractQueue;  
import java.util.Iterator;  
import java.util.concurrent.atomic.AtomicLong;  

import org.apache.commons.io.FileUtils;  

import com.sleepycat.bind.EntryBinding;  
import com.sleepycat.bind.serial.SerialBinding;  
import com.sleepycat.bind.serial.StoredClassCatalog;  
import com.sleepycat.bind.tuple.TupleBinding;  
import com.sleepycat.collections.StoredMap;  
import com.sleepycat.collections.StoredSortedMap;  
import com.sleepycat.je.Database;  
import com.sleepycat.je.DatabaseConfig;  
import com.sleepycat.je.DatabaseException;  
import com.sleepycat.je.DatabaseExistsException;  
import com.sleepycat.je.DatabaseNotFoundException;  
import com.sleepycat.je.EnvironmentConfig;  
/** 
 * 持久化队列,基于BDB实现,也继承Queue,以及可以序列化.但不等同于Queue的时,不再使用后需要关闭 
 * 相比一般的内存Queue,插入和获取值需要多消耗一定的时间 
 * 这里为什么是继承AbstractQueue而不是实现Queue接口,是因为只要实现offer,peek,poll几个方法即可, 
 * 其他如remove,addAll,AbstractQueue会基于这几个方法去实现 
 *  
 * @contributor guoyun 
 * @param <E> 
 */  
public class BdbPersistentQueue<E extends Serializable> extends AbstractQueue<E> implements  
        Serializable {  
    private static final long serialVersionUID = L;  
    private transient BdbEnvironment dbEnv;            // 数据库环境,无需序列化  
    private transient Database queueDb;             // 数据库,用于保存值,使得支持队列持久化,无需序列化  
    private transient StoredMap<Long,E> queueMap;   // 持久化Map,Key为指针位置,Value为值,无需序列化  
    private transient String dbDir;                 // 数据库所在目录  
    private transient String dbName;                // 数据库名字  
    private AtomicLong headIndex;                   // 头部指针  
    private AtomicLong tailIndex;                   // 尾部指针  
    private transient E peekItem=null;              // 当前获取的值  

    /** 
     * 构造函数,传入BDB数据库 
     *  
     * @param db 
     * @param valueClass 
     * @param classCatalog 
     */  
    public BdbPersistentQueue(Database db,Class<E> valueClass,StoredClassCatalog classCatalog){  
        this.queueDb=db;  
        this.dbName=db.getDatabaseName();  
        headIndex=new AtomicLong();  
        tailIndex=new AtomicLong();  
        bindDatabase(queueDb,valueClass,classCatalog);  
    }  
    /** 
     * 构造函数,传入BDB数据库位置和名字,自己创建数据库 
     *  
     * @param dbDir 
     * @param dbName 
     * @param valueClass 
     */  
    public BdbPersistentQueue(String dbDir,String dbName,Class<E> valueClass){  
        headIndex=new AtomicLong();  
        tailIndex=new AtomicLong();  
        this.dbDir=dbDir;  
        this.dbName=dbName;  
        createAndBindDatabase(dbDir,dbName,valueClass);  
    }  
    /** 
     * 绑定数据库 
     *  
     * @param db 
     * @param valueClass 
     * @param classCatalog 
     */  
    public void bindDatabase(Database db, Class<E> valueClass, StoredClassCatalog classCatalog){  
        EntryBinding<E> valueBinding = TupleBinding.getPrimitiveBinding(valueClass);  
        if(valueBinding == null) {  
            valueBinding = new SerialBinding<E>(classCatalog, valueClass);   // 序列化绑定  
        }  
        queueDb = db;  
        queueMap = new StoredSortedMap<Long,E>(  
                db,                                             // db  
                TupleBinding.getPrimitiveBinding(Long.class),   //Key  
                valueBinding,                                   // Value  
                true);                                          // allow write  
    }  
    /** 
     * 创建以及绑定数据库 
     *  
     * @param dbDir 
     * @param dbName 
     * @param valueClass 
     * @throws DatabaseNotFoundException 
     * @throws DatabaseExistsException 
     * @throws DatabaseException 
     * @throws IllegalArgumentException 
     */  
    private void createAndBindDatabase(String dbDir, String dbName,Class<E> valueClass) throws DatabaseNotFoundException,  
    DatabaseExistsException,DatabaseException,IllegalArgumentException{  
        File envFile = null;  
        EnvironmentConfig envConfig = null;  
        DatabaseConfig dbConfig = null;  
        Database db=null;  

        try {  
            // 数据库位置  
            envFile = new File(dbDir);  

            // 数据库环境配置  
            envConfig = new EnvironmentConfig();  
            envConfig.setAllowCreate(true);  
            envConfig.setTransactional(false);  

            // 数据库配置  
            dbConfig = new DatabaseConfig();  
            dbConfig.setAllowCreate(true);  
            dbConfig.setTransactional(false);  
            dbConfig.setDeferredWrite(true);  

            // 创建环境  
            dbEnv = new BdbEnvironment(envFile, envConfig);  
            // 打开数据库  
            db = dbEnv.openDatabase(null, dbName, dbConfig);  
            // 绑定数据库  
            bindDatabase(db,valueClass,dbEnv.getClassCatalog());  

        } catch (DatabaseNotFoundException e) {  
            throw e;  
        } catch (DatabaseExistsException e) {  
            throw e;  
        } catch (DatabaseException e) {  
            throw e;  
        } catch (IllegalArgumentException e) {  
            throw e;  
        }  


    }  

    /** 
     * 值遍历器 
     */  
    @Override  
    public Iterator<E> iterator() {  
        return queueMap.values().iterator();  
    }  
    /** 
     * 大小 
     */  
    @Override  
    public int size() {  
        synchronized(tailIndex){  
            synchronized(headIndex){  
                return (int)(tailIndex.get()-headIndex.get());  
            }  
        }  
    }  

    /** 
     * 插入值 
     */  
    @Override  
    public boolean offer(E e) {  
        synchronized(tailIndex){  
            queueMap.put(tailIndex.getAndIncrement(), e);   // 从尾部插入  
        }  
        return true;  
    }  

    /** 
     * 获取值,从头部获取 
     */  
    @Override  
    public E peek() {  
        synchronized(headIndex){  
            if(peekItem!=null){  
                return peekItem;  
            }  
            E headItem=null;  
            while(headItem==null&&headIndex.get()<tailIndex.get()){ // 没有超出范围  
                headItem=queueMap.get(headIndex.get());  
                if(headItem!=null){  
                    peekItem=headItem;  
                    continue;  
                }   
                headIndex.incrementAndGet();    // 头部指针后移  
            }  
            return headItem;  
        }  
    }  

    /** 
     * 移出元素,移出头部元素 
     */  
    @Override  
    public E poll() {  
        synchronized(headIndex){  
            E headItem=peek();  
            if(headItem!=null){  
                queueMap.remove(headIndex.getAndIncrement());  
                peekItem=null;  
                return headItem;  
            }  
        }  
        return null;  
    }  

    /** 
     * 关闭,也就是关闭所是用的BDB数据库但不关闭数据库环境 
     */  
    public void close(){  
        try {  
            if(queueDb!=null){  
                queueDb.sync();  
                queueDb.close();  
            }  
        } catch (DatabaseException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        } catch (UnsupportedOperationException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }  
    }  

    /** 
     * 清理,会清空数据库,并且删掉数据库所在目录,慎用.如果想保留数据,请调用close() 
     */  
    @Override  
    public void clear() {  
       try {  
           close();  
           if(dbEnv!=null&&queueDb!=null){  
                dbEnv.removeDatabase(null, dbName==null?queueDb.getDatabaseName():dbName);   
                dbEnv.close();  
           }  
        } catch (DatabaseNotFoundException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        } catch (DatabaseException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        } finally{  
            try {  
                if(this.dbDir!=null){  
                    FileUtils.deleteDirectory(new File(this.dbDir));  
                }  

            } catch (IOException e) {  
                // TODO Auto-generated catch block  
                e.printStackTrace();  
            }  
        }  
    }  

}  
           

3. 测试类,测试数据准确性和性能

Java代码,如下:
package com.guoyun.util;  

import java.io.File;  
import java.util.Queue;  
import java.util.concurrent.LinkedBlockingQueue;  

import junit.framework.TestCase;  

public class BdbPersistentQueueTest extends TestCase{  
    Queue<String> memoryQueue;  
    Queue<String> persistentQueue;  

    @Override  
    protected void setUp() throws Exception {  
        super.setUp();  
        memoryQueue=new LinkedBlockingQueue<String>();  
        String dbDir="E:/java/test/bdbDir";  
        File file=new File(dbDir);  
        if(!file.exists()||!file.isDirectory()){  
            file.mkdirs();  
        }  
        persistentQueue=new BdbPersistentQueue(dbDir,"pq",String.class);  
    }  

    @Override  
    protected void tearDown() throws Exception {  
        super.tearDown();  
        memoryQueue.clear();  
        memoryQueue=null;  
        persistentQueue.clear();  
        persistentQueue=null;  
    }  

    /** 
     * 排放值 
     * @param queue 
     * @return      排放的数据个数 
     */  
    public int drain(Queue<String> queue){  
        int count=;  
        while(true){  
            try {  
                queue.remove();  
                count++;  
            } catch (Exception e) {  
                return count;  
            }  
        }  

    }  
    /** 
     *  
     * @param queue 
     * @param size 
     */  
    public void fill(Queue<String> queue,int size){  
        for(int i=;i<size;i++){  
            queue.add(i+"");  
        }  
    }  

    public void checkTime(int size){  
        System.out.println("1.内存Queue插入和排空数据所耗时间");  
        long time=;  
        long start=System.nanoTime();  
        fill(memoryQueue,size);  
        time=System.nanoTime()-start;  
        System.out.println("\t填充 "+size+" 条数据耗时: "+(double)time/+" 毫秒,单条耗时: "+((double)time/size)+" 纳秒");  
        start=System.nanoTime();  
        drain(memoryQueue);  
        time=System.nanoTime()-start;  
        System.out.println("\t排空 "+size+" 条数据耗时: "+(double)time/+" 毫秒,单条耗时: "+((double)time/size)+" 纳秒");  

        System.out.println("2.持久化Queue插入和排空数据所耗时间");  
        start=System.nanoTime();  
        fill(persistentQueue,size);  
        time=System.nanoTime()-start;  
        System.out.println("\t填充 "+size+" 条数据耗时: "+(double)time/+" 毫秒,单条耗时: "+((double)time/size/)+" 豪秒");  
        start=System.nanoTime();  
        drain(persistentQueue);  
        time=System.nanoTime()-start;  
        System.out.println("\t排空 "+size+" 条数据耗时: "+(double)time/+" 毫秒,单条耗时: "+((double)time/size/)+" 豪秒");  

    }  

    /** 
     * 十万数量级测试 
     */  
    public void testTime_tenThousand(){  
        System.out.println("========测试1000000(十万)条数据=================");  
        checkTime();  
    }  


    /** 
     * 百万数量级测试 
     */  
    public void testTime_mil(){  
        System.out.println("========测试1000000(百万)条数据=================");  
        checkTime();  
    }  


    /** 
     * 千万数量级测试,注意要防止内存溢出 
     */  
    public void testTime_tenMil(){  
        System.out.println("========测试10000000(千万)条数据=================");  
        checkTime();  
    }  

    /** 
     * 测试队列数据准确性 
     * @param queue 
     * @param queueName 
     * @param size 
     */  
    public void checkDataExact(Queue<String> queue,String queueName,int size){  
        if(queue.size()!=size){  
            System.err.println("Error size of "+queueName);  
        }  
        String value=null;  
        for(int i=;i<size;i++){  
            value=queue.remove();  
            if(!((i+"").equals(value))){  
                System.err.println("Error "+queueName+":"+i+"->"+value);  
            }  
        }  
    }  

    /** 
     * 测试队列中数据的准确性,包括长度 
     */  
    public void testExact(){  
        int size=;  
        fill(memoryQueue,size);  
        fill(persistentQueue,size);  

        checkDataExact(memoryQueue,"MemoryQueue",);  
        checkDataExact(persistentQueue,"PersistentQueue",);  

    }  

}  
           

4. 测试性能

========测试1000000(十万)条数据=================

1.内存Queue插入和排空数据所耗时间

 填充 100000 条数据耗时: 53.550787 毫秒,单条耗时: 535.50787 纳秒

 排空 100000 条数据耗时: 27.09901 毫秒,单条耗时: 270.9901 纳秒

2.持久化Queue插入和排空数据所耗时间

 填充 100000 条数据耗时: 1399.644305 毫秒,单条耗时: 0.01399644305 豪秒

 排空 100000 条数据耗时: 2104.765179 毫秒,单条耗时: 21.04765179 豪秒

 持久化写入是内存写入的26倍,读取是77倍

========测试1000000(百万)条数据=================

1.内存Queue插入和排空数据所耗时间

 填充 1000000 条数据耗时: 699.105888 毫秒,单条耗时: 699.105888 纳秒

 排空 1000000 条数据耗时: 158.792281 毫秒,单条耗时: 158.792281 纳秒

2.持久化Queue插入和排空数据所耗时间

 填充 1000000 条数据耗时: 11978.132218 毫秒,单条耗时: 0.011978132218 豪秒

 排空 1000000 条数据耗时: 22355.617205 毫秒,单条耗时: 22.355617204999998 豪秒

 持久化写入是内存写入的17倍,读取是141倍

========测试10000000(千万)条数据=================

1.内存Queue插入和排空数据所耗时间

 填充 10000000 条数据耗时: 9678.377046 毫秒,单条耗时: 967.8377046 纳秒

 排空 10000000 条数据耗时: 1473.416825 毫秒,单条耗时: 147.3416825 纳秒

2.持久化Queue插入和排空数据所耗时间

 填充 10000000 条数据耗时: 151177.036391 毫秒,单条耗时: 0.0151177036391 豪秒

 排空 10000000 条数据耗时: 361642.655135 毫秒,单条耗时: 36.164265513500006 豪秒

 持久化写入是内存写入的15倍,读取是245倍

可以看出写入和遍历一条都是在毫秒级别,还有千万级的数据,BDB的性能着实牛逼.而且随着数据的增多,写的时间在缩短,读的时间在增长.