天天看點

模仿ArrayBlockingQueue編寫一個批處理BlockingQueue

假設有個任務是要對cassandra資料做遷移,我們先将資料寫入kafka中,然後讓另一方從kafka中消費資料存到其他資料庫中就行了。由于cassandra查詢限制比較多,查詢占用的cpu資源大,如果不停歇的查資料,肯定要當機。是以,我們想通過分頁進行處理(cassandra分頁是個麻煩事,有興趣可以看看别人的部落格),比如每次分頁1000條。我們分别起兩個線程CassandraTask和KafakaTask,然後中間通過BatchBlockingQueue将他們兩個線程解耦,寫一批就讀一批,依次進行下去。這樣就能比較好控制斷點續傳等操作了。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Describe:
 * Author:  JerryChii.
 * Date:    2016/9/21
 */
public class ConcurrentBatchQueue<E> {
    final List<E> items;

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting take */
    private final Condition putProcess;

    /** Condition for waiting put */
    private final Condition takeProcess;


    public ConcurrentBatchQueue() {
        items = new ArrayList<>();
        lock = new ReentrantLock(false);
        putProcess = lock.newCondition();
        takeProcess = lock.newCondition();
    }

    public List<E> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (items.size() == )
                takeProcess.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    /** 這裡如果有需要,要考慮以下一直給empty list的情況,可以在外面的邏輯中判斷 */
    public void put(List<E> toPut) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (items.size() != )
                putProcess.await();
            enqueue(toPut);
        } finally {
            lock.unlock();
        }
    }

    private List<E> dequeue() {
        List newList = new ArrayList();
        newList.addAll(items);
        items.clear();
        putProcess.signal();
        return newList;
    }

    private void enqueue(List<E> values) {
        checkNotNull(values);
        items.addAll(values);
        takeProcess.signal();
    }

    private static void checkNotNull(Object v) {
        if (v == null)
            throw new NullPointerException();
    }
}