天天看点

Java并发编程模拟管程(霍尔Hoare管程、汉森Hansan管程、MESA管程)

​ 在前面个两篇博文中,我们使用Java模拟了​​AND型信号量​​和​​信号量集​​,本文将使用Java来模拟管程,关于管程的一些理论知识,可以参考另一篇​​博客​​。

​ 对于管程,在这里我们不做具体的讨论了。不过对于Java和管程之间的一些事,还是很有意思的。Java中,每个对象其实都一个Monitor(java中翻译为监视器),Java中提供的synchronized关键字及wait()、notify()、notifyAll()方法,都是Monitor的一部分,或者说,在Jdk1.5之前也就是JUC没有出现之前,Java都是通过Monitor来实现并发的。Monitor在OS中或者别处的翻译是管程,我也更倾向于翻译为管程,Java中使用的是MESA管程,本文呢,我们就来模拟实现霍尔管和汉森管程,来加强我们对并发编程的能力和增加对线程同步问题的理解。

​ 在实现之前,我们先对几种管程的区别来简单的说一下,这里还是使用我在进程同步机制中说到的问题:如果进程P1因x条件处于阻塞状态,那么当进程P2执行了x.signal操作唤醒P1后,进程P1和P2此时同时处于管程中了,这是不被允许的,那么如何确定哪个执行哪个等待?这个问题也很简单,可采用下面的两种方式之一进行处理:

P2等待,直至P1离开管程或者等待另一个条件;

P1等待,直至P2离开管程或者等待另一个条件。

​ Hoare管程采用了第一种处理方式;MESA管程采用第二种方式;Hansan管程采用了两者的折中,它规定管程中的所有过程执行的signal操作是过程体的最后一个操作,于是,进程P2执行完signal操作后立即退出管程,因此进程P1马上被恢复执行。

​ 在下面不同管程的具体实现中,我们还是通过解决经典(一直都是他)的生产者–消费者问题来具体的解释。并且下面所有的实现,都是使用JUC中的ReentrantLock(可重入锁)+Condition(条件变量)来实现的。

1.霍尔管程

​ 首先是霍尔管程,也是因为我们对其理论介绍的最多,并且在理论篇中给出了其wait()、signal()操作的伪代码,因此我们首先来实现霍尔管程,我们使用内部类来实现上一文中cond和interf的数据定义,其代码如下:

package XXX.util;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 霍尔管程
*/
@Slf4j
public class MonitorsUtil {

//缓冲池中缓冲区
static final Integer N = 50;
//缓冲池
static List<Integer> buffer = new ArrayList<>(N);

//互斥锁,用以实现缓冲的互斥访问
static Lock lock = new ReentrantLock();
static cond notFull = new cond(lock, "notFull");
static cond notEmpty = new cond(lock, "notEmpty");
//用与存放因无法进入管程的阻塞队列&&因为调用signal阻塞自身的线程(Hoare)
static interf IM = new interf(lock);

public static void wait(String id, cond declar, interf IM) throws InterruptedException {
//获取锁,需要在获得锁的情况下才可以操作condition
declar.count++;
//log.info("当前condition中阻塞的线程数:【{},{},{},{}】", IM.enterCount, IM.nextCount, notFull.count, notEmpty.count);

//判断是否有进程在高优先级队列中
if (IM.nextCount > 0) {
//唤醒因调用signal操作的线程
IM.next.release();
    }
log.info("线程【{}】调用wait被挂起到条件变量【{}】。", id, declar.name);
//挂起时自动释放锁,等待进入管程的队列可以获得锁并进入管程
declar.condition.await();
log.info("被挂起的线程【{}】被唤醒执行。", id);
declar.count--;
  }

public static void signal(String id, cond declar, interf IM) throws InterruptedException {
log.info("线程【{}】执行了释放资源", id);
if (declar.count > 0) {
//挂起自己后,因为调用signal挂起自己的进程数量加1
IM.nextCount++;
//唤醒因为条件变量而阻塞的线程
declar.condition.signal();
log.info("唤醒的条件变量为:【{}】", declar.name);
//log.info("释放后所有condition中阻塞的线程数:【mutex:{},nextCount:{},notFull:{},notEmpty:{}】", IM.enterCount, IM.nextCount, notFull.count, notEmpty.count);
//释放资源后,立即把自己挂起,进入高优先级队列-------Hoare的处理方式
log.info("线程【{}】调用signal被挂起。", id);
//释放lock,不然别的线程无法进入管程
lock.unlock();
//将当前线程插入到next的阻塞队列中
IM.next.acquire();
//再次获取锁-->进入管程
lock.lock();
log.info("被挂起的线程【{}】被唤醒执行。", id);
//恢复执行后,等待调用的管程的线程数量减1
IM.nextCount--;
    }
  }

static class interf {
//等待着进入管程的队列
Condition enter;
//等待着进入管程的阻塞队列中线程的数量
int enterCount;
//发出signal的进程挂起自己的信号量,信号量中记录着等待调用管程的进程
Semaphore next;
//在next上等待的线程数
int nextCount;

interf(Lock lock) {
enter = lock.newCondition();
enterCount = 0;
next = new Semaphore(0);
nextCount = 0;
    }
  }

static class cond {
String name;
Condition condition;
int count;

cond(Lock lock, String id) {
condition = lock.newCondition();
count = 0;
name = id;
    }

  }

//往缓冲区中投放消息
public static void putMessage(String id, Integer item) throws InterruptedException {
lock.lock();
//如果缓冲池满了,就挂起到notFull的阻塞队列中
log.info("执行了投放消息,缓冲池的消息的数量:【{}】", buffer.size());
while (buffer.size() >= N) {
log.info("缓冲池满,线程【{}】阻塞", id);
wait(id, notFull, IM);
    }
//保证互斥访问
//IM.mutex.acquire();
buffer.add(item);
//IM.mutex.release();
signal(id, notEmpty, IM);
//...     一些别的操作
lock.unlock();
  }

//从缓冲区中取消息消费
public static void getMessage(String id, Integer item) throws InterruptedException {
//保证互斥访问
lock.lock();
//如果缓冲池满了,就挂起到notFull的阻塞队列中
log.info("执行了消费消息,缓冲池的消息的数量:【{}】", buffer.size());
while (buffer.size() <= 0) {
wait(id, notEmpty, IM);
    }
item = buffer.remove(0);
log.info("消费了一条消息:【{}】", item);
//IM.mutex.release();
signal(id, notFull, IM);
//...     一些别的操作
lock.unlock();
  }

}
      

​ 上面的代码,我们通过ReenTrantLock来控制线程互斥的访问管程,管程提供的过程putMessage()、getMessage()通过先获取lock,保证可以进入管程线程只有一个,对于ReenTrantLock的强大功能来说,在这里成了我模拟霍尔管程的一大“阻碍”,因为ReenTrantLock+Condition,即使线程因调用了condition的wait而阻塞,当被唤醒再次执行时,需要重新去获取lock,如果获取不到就要被插入到阻塞队列中,只能等待lock被释放才有可能执行。因此在上面的signal中,为了保证霍尔管程的规定,我们在阻塞当前线程时,需要先释放lock锁,再次被唤醒时再次重新获得锁,这也是signal中重复的进行释放和获取的原因。

​ 另外,在我们的整个实现中,interf中设计被用来作为互斥进入管程的条件变量没有用到,lock自身的强大帮我们把这部分工作做了,这里保留是为了和理论篇的伪代码保持一致。

2.MESA管程

​ 因为Java实现并发参考的就是MESA模型的管程,因此其实现的ReenTrantLock+Condition就可以很完美的实现MESA管程,我们将霍尔管程中的signal操作进行修改,代码如下:

public static void signal(String id, cond declar, interf IM) throws InterruptedException {
log.info("线程【{}】执行了释放资源", id);
if (declar.count > 0) {
//挂起自己后,因为调用signal挂起自己的进程数量加1
IM.nextCount++;
//唤醒因为条件变量而阻塞的线程
declar.condition.signal();
log.info("唤醒的条件变量为:【{}】", declar.name);
//log.info("释放后所有condition中阻塞的线程数:【mutex:{},nextCount:{},notFull:{},notEmpty:{}】", IM.enterCount, IM.nextCount, notFull.count, notEmpty.count);
//释放资源,继续执行,直至线程退出管程后,别的线程才可进入-------MESA的处理方式
log.info("被挂起的线程【{}】被唤醒执行。", id);
//恢复执行后,等待调用的管程的线程数量减1
IM.nextCount--;
  }
}
      

​ 我们可以看到,我们只需将Hoare上面那段为了让线程将自己挂起并释放锁的代码去掉就可以实现MESA管程了。

3.汉森管程

​ 对于汉森管程,其实其实现和MESA是相同的,但是,根据其规定,signal应当是线程的最后一个操作,执行完signal操作后要立即退出管程,也就是说也释放lock,让别的线程可以进入管程,也就是其对应的操作代码要如下:

//从缓冲区中取消息消费
public static void getMessage(String id, Integer item) throws InterruptedException {
//保证互斥访问
lock.lock();
//如果缓冲池满了,就挂起到notFull的阻塞队列中
log.info("执行了消费消息,缓冲池的消息的数量:【{}】", buffer.size());
while (buffer.size() <= 0) {
wait(id, notEmpty, IM);
  }
item = buffer.remove(0);
log.info("消费了一条消息:【{}】", item);
//IM.mutex.release();
signal(id, notFull, IM);
//signal操作应当是最后一个操作,此处不再允许有别的操作,应当立即退出管程,
lock.unlock();
}
      

​ 也就是说,在每个过程执行完sginal后,应当立即退出管程,而不允许再执行别的操作。

4.执行结果分析

​ 这里呢,只对比霍尔管程和MESA管程(因为MESA管程在执行顺序上适合汉森管程一致的,只不过汉森管程在设计上变得更加严格,规定死了signal执行的位置),这里呢,我们先给生产者、消费者的代码:

/**
* 管程测试
*/
@Slf4j
public class MonitorsTest {

static Integer count = 0;

static class Producer extends Thread {
Producer(String name) {
super.setName(name);
        }

@Override
public void run() {
do {
try {
log.info("生产了一条消息:【{}】", count);
MonitorsUtil.putMessage(this.getName(), count++);
//Thread.sleep(1000);
                } catch (InterruptedException e) {
log.error("生产消息时产生异常!");
                }
            } while (true);
        }
    }

static class Consumer extends Thread {
Consumer(String name) {
super.setName(name);
        }

@Override
public void run() {
do {
try {
Integer item = -1;
MonitorsUtil.getMessage(this.getName(), item);
//Thread.sleep(1000);
                } catch (InterruptedException e) {
log.error("消费消息时产生异常!");
                }
            } while (true);
        }
    }

public static void main(String[] args) {
Producer p1 = new Producer("p1");
Producer p2 = new Producer("p2");
Producer p3 = new Producer("p3");
Producer p4 = new Producer("p4");

Consumer c1 = new Consumer("c1");
Consumer c2 = new Consumer("c2");
Consumer c3 = new Consumer("c3");
Consumer c4 = new Consumer("c4");

p1.start();
p2.start();
p3.start();
p4.start();
c1.start();
c2.start();
c3.start();
c4.start();
    }
}
      

​ 这里因为为了简单模拟消息,知识使用了整形的变量代替消息,首先我们来看下霍尔管程的执行结果,如下图所示,我们可以看到,c4执行signal操作,在释放完走远后,立刻将自身阻塞。

Java并发编程模拟管程(霍尔Hoare管程、汉森Hansan管程、MESA管程)

​ 下面我们来看下MESA管程的执行结果,如下图所示,对于执行结果来说,两种管程都可以保证并发的正确执行,但是MESA让线程的执行更顺畅,不会被频繁的阻塞,从结果中也能体现。

Java并发编程模拟管程(霍尔Hoare管程、汉森Hansan管程、MESA管程)

​ 又到了分隔线以下,本文到此就结束了,本文内容和代码都是博主自己本人整理和编写,如有错误,还请批评指正。

继续阅读