天天看點

Java并發程式設計模拟管程(霍爾Hoare管程、漢森Hansan管程、MESA管程)

​ 在前面個兩篇博文中,我們使用Java模拟了​​AND型信号量​​和​​信号量集​​,本文将使用Java來模拟管程,關于管程的一些理論知識,可以參考另一篇​​部落格​​。

​ 對于管程,在這裡我們不做具體的讨論了。不過對于Java和管程之間的一些事,還是很有意思的。Java中,每個對象其實都一個Monitor(java中翻譯為螢幕),Java中提供的synchronized關鍵字及wait()、notify()、notifyAll()方法,都是Monitor的一部分,或者說,在Jdk1.5之前也就是JUC沒有出現之前,Java都是通過Monitor來實作并發的。Monitor在OS中或者别處的翻譯是管程,我也更傾向于翻譯為管程,Java中使用的是MESA管程,本文呢,我們就來模拟實作霍爾管和漢森管程,來加強我們對并發程式設計的能力和增加對線程同步問題的了解。

​ 在實作之前,我們先對幾種管程的差別來簡單的說一下,這裡還是使用我在程序同步機制中說到的問題:如果程序P1因x條件處于阻塞狀态,那麼當程序P2執行了x.signal操作喚醒P1後,程序P1和P2此時同時處于管程中了,這是不被允許的,那麼如何确定哪個執行哪個等待?這個問題也很簡單,可采用下面的兩種方式之一進行處理:

P2等待,直至P1離開管程或者等待另一個條件;

P1等待,直至P2離開管程或者等待另一個條件。

​ Hoare管程采用了第一種處理方式;MESA管程采用第二種方式;Hansan管程采用了兩者的折中,它規定管程中的所有過程執行的signal操作是過程體的最後一個操作,于是,程序P2執行完signal操作後立即退出管程,是以程序P1馬上被恢複執行。

​ 在下面不同管程的具體實作中,我們還是通過解決經典(一直都是他)的生産者–消費者問題來具體的解釋。并且下面所有的實作,都是使用JUC中的ReentrantLock(可重入鎖)+Condition(條件變量)來實作的。

1.霍爾管程

​ 首先是霍爾管程,也是因為我們對其理論介紹的最多,并且在理論篇中給出了其wait()、signal()操作的僞代碼,是以我們首先來實作霍爾管程,我們使用内部類來實作上一文中cond和interf的資料定義,其代碼如下:

package XXX.util;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 霍爾管程
*/
@Slf4j
public class MonitorsUtil {

//緩沖池中緩沖區
static final Integer N = 50;
//緩沖池
static List<Integer> buffer = new ArrayList<>(N);

//互斥鎖,用以實作緩沖的互斥通路
static Lock lock = new ReentrantLock();
static cond notFull = new cond(lock, "notFull");
static cond notEmpty = new cond(lock, "notEmpty");
//用與存放因無法進入管程的阻塞隊列&&因為調用signal阻塞自身的線程(Hoare)
static interf IM = new interf(lock);

public static void wait(String id, cond declar, interf IM) throws InterruptedException {
//擷取鎖,需要在獲得鎖的情況下才可以操作condition
declar.count++;
//log.info("目前condition中阻塞的線程數:【{},{},{},{}】", IM.enterCount, IM.nextCount, notFull.count, notEmpty.count);

//判斷是否有程序在高優先級隊列中
if (IM.nextCount > 0) {
//喚醒因調用signal操作的線程
IM.next.release();
    }
log.info("線程【{}】調用wait被挂起到條件變量【{}】。", id, declar.name);
//挂起時自動釋放鎖,等待進入管程的隊列可以獲得鎖并進入管程
declar.condition.await();
log.info("被挂起的線程【{}】被喚醒執行。", id);
declar.count--;
  }

public static void signal(String id, cond declar, interf IM) throws InterruptedException {
log.info("線程【{}】執行了釋放資源", id);
if (declar.count > 0) {
//挂起自己後,因為調用signal挂起自己的程序數量加1
IM.nextCount++;
//喚醒因為條件變量而阻塞的線程
declar.condition.signal();
log.info("喚醒的條件變量為:【{}】", declar.name);
//log.info("釋放後所有condition中阻塞的線程數:【mutex:{},nextCount:{},notFull:{},notEmpty:{}】", IM.enterCount, IM.nextCount, notFull.count, notEmpty.count);
//釋放資源後,立即把自己挂起,進入高優先級隊列-------Hoare的處理方式
log.info("線程【{}】調用signal被挂起。", id);
//釋放lock,不然别的線程無法進入管程
lock.unlock();
//将目前線程插入到next的阻塞隊列中
IM.next.acquire();
//再次擷取鎖-->進入管程
lock.lock();
log.info("被挂起的線程【{}】被喚醒執行。", id);
//恢複執行後,等待調用的管程的線程數量減1
IM.nextCount--;
    }
  }

static class interf {
//等待着進入管程的隊列
Condition enter;
//等待着進入管程的阻塞隊列中線程的數量
int enterCount;
//發出signal的程序挂起自己的信号量,信号量中記錄着等待調用管程的程序
Semaphore next;
//在next上等待的線程數
int nextCount;

interf(Lock lock) {
enter = lock.newCondition();
enterCount = 0;
next = new Semaphore(0);
nextCount = 0;
    }
  }

static class cond {
String name;
Condition condition;
int count;

cond(Lock lock, String id) {
condition = lock.newCondition();
count = 0;
name = id;
    }

  }

//往緩沖區中投放消息
public static void putMessage(String id, Integer item) throws InterruptedException {
lock.lock();
//如果緩沖池滿了,就挂起到notFull的阻塞隊列中
log.info("執行了投放消息,緩沖池的消息的數量:【{}】", buffer.size());
while (buffer.size() >= N) {
log.info("緩沖池滿,線程【{}】阻塞", id);
wait(id, notFull, IM);
    }
//保證互斥通路
//IM.mutex.acquire();
buffer.add(item);
//IM.mutex.release();
signal(id, notEmpty, IM);
//...     一些别的操作
lock.unlock();
  }

//從緩沖區中取消息消費
public static void getMessage(String id, Integer item) throws InterruptedException {
//保證互斥通路
lock.lock();
//如果緩沖池滿了,就挂起到notFull的阻塞隊列中
log.info("執行了消費消息,緩沖池的消息的數量:【{}】", buffer.size());
while (buffer.size() <= 0) {
wait(id, notEmpty, IM);
    }
item = buffer.remove(0);
log.info("消費了一條消息:【{}】", item);
//IM.mutex.release();
signal(id, notFull, IM);
//...     一些别的操作
lock.unlock();
  }

}
      

​ 上面的代碼,我們通過ReenTrantLock來控制線程互斥的通路管程,管程提供的過程putMessage()、getMessage()通過先擷取lock,保證可以進入管程線程隻有一個,對于ReenTrantLock的強大功能來說,在這裡成了我模拟霍爾管程的一大“阻礙”,因為ReenTrantLock+Condition,即使線程因調用了condition的wait而阻塞,當被喚醒再次執行時,需要重新去擷取lock,如果擷取不到就要被插入到阻塞隊列中,隻能等待lock被釋放才有可能執行。是以在上面的signal中,為了保證霍爾管程的規定,我們在阻塞目前線程時,需要先釋放lock鎖,再次被喚醒時再次重新獲得鎖,這也是signal中重複的進行釋放和擷取的原因。

​ 另外,在我們的整個實作中,interf中設計被用來作為互斥進入管程的條件變量沒有用到,lock自身的強大幫我們把這部分工作做了,這裡保留是為了和理論篇的僞代碼保持一緻。

2.MESA管程

​ 因為Java實作并發參考的就是MESA模型的管程,是以其實作的ReenTrantLock+Condition就可以很完美的實作MESA管程,我們将霍爾管程中的signal操作進行修改,代碼如下:

public static void signal(String id, cond declar, interf IM) throws InterruptedException {
log.info("線程【{}】執行了釋放資源", id);
if (declar.count > 0) {
//挂起自己後,因為調用signal挂起自己的程序數量加1
IM.nextCount++;
//喚醒因為條件變量而阻塞的線程
declar.condition.signal();
log.info("喚醒的條件變量為:【{}】", declar.name);
//log.info("釋放後所有condition中阻塞的線程數:【mutex:{},nextCount:{},notFull:{},notEmpty:{}】", IM.enterCount, IM.nextCount, notFull.count, notEmpty.count);
//釋放資源,繼續執行,直至線程退出管程後,别的線程才可進入-------MESA的處理方式
log.info("被挂起的線程【{}】被喚醒執行。", id);
//恢複執行後,等待調用的管程的線程數量減1
IM.nextCount--;
  }
}
      

​ 我們可以看到,我們隻需将Hoare上面那段為了讓線程将自己挂起并釋放鎖的代碼去掉就可以實作MESA管程了。

3.漢森管程

​ 對于漢森管程,其實其實作和MESA是相同的,但是,根據其規定,signal應當是線程的最後一個操作,執行完signal操作後要立即退出管程,也就是說也釋放lock,讓别的線程可以進入管程,也就是其對應的操作代碼要如下:

//從緩沖區中取消息消費
public static void getMessage(String id, Integer item) throws InterruptedException {
//保證互斥通路
lock.lock();
//如果緩沖池滿了,就挂起到notFull的阻塞隊列中
log.info("執行了消費消息,緩沖池的消息的數量:【{}】", buffer.size());
while (buffer.size() <= 0) {
wait(id, notEmpty, IM);
  }
item = buffer.remove(0);
log.info("消費了一條消息:【{}】", item);
//IM.mutex.release();
signal(id, notFull, IM);
//signal操作應當是最後一個操作,此處不再允許有别的操作,應當立即退出管程,
lock.unlock();
}
      

​ 也就是說,在每個過程執行完sginal後,應當立即退出管程,而不允許再執行别的操作。

4.執行結果分析

​ 這裡呢,隻對比霍爾管程和MESA管程(因為MESA管程在執行順序上适合漢森管程一緻的,隻不過漢森管程在設計上變得更加嚴格,規定死了signal執行的位置),這裡呢,我們先給生産者、消費者的代碼:

/**
* 管程測試
*/
@Slf4j
public class MonitorsTest {

static Integer count = 0;

static class Producer extends Thread {
Producer(String name) {
super.setName(name);
        }

@Override
public void run() {
do {
try {
log.info("生産了一條消息:【{}】", count);
MonitorsUtil.putMessage(this.getName(), count++);
//Thread.sleep(1000);
                } catch (InterruptedException e) {
log.error("生産消息時産生異常!");
                }
            } while (true);
        }
    }

static class Consumer extends Thread {
Consumer(String name) {
super.setName(name);
        }

@Override
public void run() {
do {
try {
Integer item = -1;
MonitorsUtil.getMessage(this.getName(), item);
//Thread.sleep(1000);
                } catch (InterruptedException e) {
log.error("消費消息時産生異常!");
                }
            } while (true);
        }
    }

public static void main(String[] args) {
Producer p1 = new Producer("p1");
Producer p2 = new Producer("p2");
Producer p3 = new Producer("p3");
Producer p4 = new Producer("p4");

Consumer c1 = new Consumer("c1");
Consumer c2 = new Consumer("c2");
Consumer c3 = new Consumer("c3");
Consumer c4 = new Consumer("c4");

p1.start();
p2.start();
p3.start();
p4.start();
c1.start();
c2.start();
c3.start();
c4.start();
    }
}
      

​ 這裡因為為了簡單模拟消息,知識使用了整形的變量代替消息,首先我們來看下霍爾管程的執行結果,如下圖所示,我們可以看到,c4執行signal操作,在釋放完走遠後,立刻将自身阻塞。

Java并發程式設計模拟管程(霍爾Hoare管程、漢森Hansan管程、MESA管程)

​ 下面我們來看下MESA管程的執行結果,如下圖所示,對于執行結果來說,兩種管程都可以保證并發的正确執行,但是MESA讓線程的執行更順暢,不會被頻繁的阻塞,從結果中也能展現。

Java并發程式設計模拟管程(霍爾Hoare管程、漢森Hansan管程、MESA管程)

​ 又到了分隔線以下,本文到此就結束了,本文内容和代碼都是部落客自己本人整理和編寫,如有錯誤,還請批評指正。

繼續閱讀