假設有個任務是要對 Cassandra 的資料做遷移:我們先將資料寫入 Kafka,然後讓另一方從 Kafka 中消費資料、存到其他資料庫中就行了。由於 Cassandra 的查詢限制比較多,查詢佔用的 CPU 資源也大,如果不停歇地查資料,肯定要當機。所以,我們想通過分頁進行處理(Cassandra 分頁是個麻煩事,有興趣可以看看別人的部落格),比如每次分頁 1000 條。我們分別起兩個線程 CassandraTask 和 KafkaTask,中間通過下面的 ConcurrentBatchQueue 將這兩個線程解耦:寫一批就讀一批,依次進行下去。這樣就能比較好地控制斷點續傳等操作了。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A single-slot hand-off queue that transfers elements one whole batch at a time.
 *
 * <p>{@link #put(List)} stores a batch only when the queue holds nothing, and
 * {@link #take()} removes and returns everything currently stored. A producer
 * therefore waits until its previous batch has been taken, and a consumer waits
 * until a batch is available. Both methods run under the same
 * {@link ReentrantLock}.
 *
 * Author: JerryChii.
 * Date: 2016/9/21
 */
public class ConcurrentBatchQueue<E> {
    /** Elements of the batch currently waiting to be taken; empty when no batch is pending. */
    final List<E> items;
    /** Main lock guarding all access made through {@link #put(List)} and {@link #take()}. */
    final ReentrantLock lock;
    /** Producers wait on this while a batch is still pending; signalled after a take. */
    private final Condition putProcess;
    /** Consumers wait on this while nothing is stored; signalled after a put. */
    private final Condition takeProcess;

    /** Creates an empty queue guarded by a non-fair lock. */
    public ConcurrentBatchQueue() {
        items = new ArrayList<>();
        lock = new ReentrantLock(false);
        putProcess = lock.newCondition();
        takeProcess = lock.newCondition();
    }

    /**
     * Removes and returns every element currently stored, waiting while the queue is empty.
     *
     * @return a new list holding the stored elements in insertion order; never empty
     * @throws InterruptedException if the calling thread is interrupted while acquiring
     *         the lock or while waiting for a batch
     */
    public List<E> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // Loop rather than "if": await() may return spuriously.
            while (items.isEmpty()) {
                takeProcess.await();
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Stores {@code toPut} as the next batch, waiting while an earlier batch is still pending.
     *
     * <p>An empty list is accepted but stores nothing, so it never releases a waiting
     * consumer. Callers that may repeatedly produce empty lists should handle that case
     * in their own logic.
     *
     * @param toPut the elements to hand over; must not be {@code null}
     * @throws NullPointerException if {@code toPut} is {@code null}
     * @throws InterruptedException if the calling thread is interrupted while acquiring
     *         the lock or while waiting for the pending batch to be taken
     */
    public void put(List<E> toPut) throws InterruptedException {
        // Fail fast: reject null before blocking on the lock or on the condition.
        checkNotNull(toPut);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // Loop rather than "if": await() may return spuriously.
            while (!items.isEmpty()) {
                putProcess.await();
            }
            enqueue(toPut);
        } finally {
            lock.unlock();
        }
    }

    /** Copies out the pending batch, clears the slot and wakes one waiting producer. Call only while holding the lock. */
    private List<E> dequeue() {
        List<E> batch = new ArrayList<>(items);
        items.clear();
        putProcess.signal();
        return batch;
    }

    /** Stores the given elements and, if anything is now pending, wakes one waiting consumer. Call only while holding the lock. */
    private void enqueue(List<E> values) {
        items.addAll(values);
        if (!items.isEmpty()) {
            takeProcess.signal();
        }
    }

    /** Throws {@link NullPointerException} if {@code v} is {@code null}. */
    private static void checkNotNull(Object v) {
        if (v == null) {
            throw new NullPointerException();
        }
    }
}