1、定義一個隊列緩存池:
private static List queueCache = new LinkedList();
2、定義隊列緩沖池最大消息數,如果達到該值,那麼隊列檢入将等待檢出低于該值時繼續進行。
private Integer offerMaxQueue = 2000;
3、定義檢出線程,如果隊列緩沖池沒有消息,那麼檢出線程會線程等待中
new Thread(){
public void run(){
while(true){
String ip = null;
try {
synchronized (queueCache) {
Integer size = queueCache.size();
if(size==0){
//隊列緩存池沒有消息,等待。。。。 queueCache.wait();
}
Queue queue = queueCache.remove(0);
if(isIpLock(queueStr)){//假若這個是一個多應用的分布式系統,那麼這個判斷應該是分布式鎖,這裡說的鎖不是線程停止,而是跳過該消息,滞後處理
queueCache.add(queue);該queue重新加入隊列緩沖池,滞後處理,
continue;
}else{
;//這裡是處理該消息的操作。
}
size = queueCache.size();
if(size<offerMaxQueue&&size>=0){ queueCache.notifyAll();//在隊列緩存池不超過最大值的前提下,假若檢入正在等待中,那麼那麼讓他們排隊檢入。
}
}
} catch (Exception e) {
e.printStackTrace();
}finally{
try {//檢出該消息隊列的鎖
unIpLock(queueStr);
} catch (Execption e) {//捕獲異常,不能讓線程挂掉
e.printStackTrace();
}
}
}
}.start();
4、檢入隊列
synchronized (queueCache) {
while(true){
Integer size = queueCache.size();
if(size>=offerMaxQueue){
try {
queueCache.wait();
continue;//繼續執行等待中的檢入任務。
} catch (InterruptedException e) {
e.printStackTrace();
}
}//IF
if(size<=offerMaxQueue&&size>0){
queueCache.notifyAll();
}
break;//檢入完畢
}//while
}
5、鎖方法實作
/**
* 鎖
* @param ip
* @return
* @throws
*/
public Boolean isLock(String queueStr) {
return this.redisManager.setnx(queueStr+"_lock", "LOCK", 10000)!=1;
}
//解鎖
public void unIpLock(String queueStr) {
if(ip!=null){
this.redisManager.del(queueStr+"_lock");
// lock.unlock();
}
}