天天看點

多線程下的消息隊列實作

1、定義一個隊列緩存池:

private static List queueCache = new LinkedList();

2、定義隊列緩沖池最大消息數,如果達到該值,那麼隊列檢入将等待檢出低于該值時繼續進行。

private Integer offerMaxQueue = 2000;

3、定義檢出線程,如果隊列緩沖池沒有消息,那麼檢出線程會線程等待中

new Thread(){
        public void run(){
          while(true){
            String ip = null;
            try {
              synchronized (queueCache) {
                Integer size = queueCache.size();
                if(size==0){
//隊列緩存池沒有消息,等待。。。。									queueCache.wait();
                }
                Queue queue = queueCache.remove(0);
 
                if(isIpLock(queueStr)){//假若這個是一個多應用的分布式系統,那麼這個判斷應該是分布式鎖,這裡說的鎖不是線程停止,而是跳過該消息,滞後處理
                  queueCache.add(queue);該queue重新加入隊列緩沖池,滞後處理,
                  continue;
                }else{
            ;//這裡是處理該消息的操作。
                }
                size = queueCache.size();
                if(size<offerMaxQueue&&size>=0){									queueCache.notifyAll();//在隊列緩存池不超過最大值的前提下,假若檢入正在等待中,那麼那麼讓他們排隊檢入。
                }
              }
            } catch (Exception e) {
              e.printStackTrace();
            }finally{
              try {//檢出該消息隊列的鎖
                unIpLock(queueStr);
              } catch (Execption e) {//捕獲異常,不能讓線程挂掉
                e.printStackTrace();
              }	
                                            }
            }
      }.start();
           

4、檢入隊列

synchronized (queueCache) {
while(true){
Integer size = queueCache.size();
if(size>=offerMaxQueue){
            try {
              queueCache.wait();
continue;//繼續執行等待中的檢入任務。
  } catch (InterruptedException e) {
      e.printStackTrace();
  }
 }//IF
 
if(size<=offerMaxQueue&&size>0){
  queueCache.notifyAll();
}
break;//檢入完畢
}//while
}
           

5、鎖方法實作

/**
   * 鎖
   * @param ip
   * @return
   * @throws 
   */
  public Boolean isLock(String queueStr) {
    return this.redisManager.setnx(queueStr+"_lock", "LOCK", 10000)!=1;
  }
  //解鎖
  public void unIpLock(String queueStr) {
    if(ip!=null){
      this.redisManager.del(queueStr+"_lock");
//			lock.unlock();
    }
  }