天天看點

Java多線程程式設計-(4)-線程間通信機制的介紹與使用

上一篇:

Java多線程程式設計-(1)-線程安全和鎖Synchronized概念

Java多線程程式設計-(2)-可重入鎖以及Synchronized的其他基本特性

Java多線程程式設計-(3)-線程本地ThreadLocal的介紹與使用

線程間通信簡介

我們知道線程是作業系統中獨立的個體,但是這個單獨的個體之間沒有一種特殊的處理方式使之成為一個整體,線程之間沒有任何交流和溝通的話,他就是一個個單獨的個體,不足以形成一個強大的互動性較強的整體。

為了提高CPU的使用率和各線程之間互相協作,Java的一種實作線程間通信的機制是:wait/notify線程間通信,下邊就一起學習一下這種線程間的通信機制。

不使用等待/通知機制實作線程間通信

假如,我們不使用下邊需要介紹的機制,那我們如何實作兩個線程之間的通信哪,下邊看一段代碼,實作的是兩個線程向一個List裡填充資料:

MyList代碼:

public class MyList {

    private List list = new ArrayList();

    public void add() {
        list.add("我是元素");
    }

    public int size() {
        return list.size();
    }

}
           

線程A:

public class ThreadA extends Thread {

    private MyList list;

    public ThreadA(MyList list) {
        super();
        this.list = list;
    }

    @Override
    public void run() {
        try {
            for (int i = ; i < ; i++) {
                list.add();
                System.out.println("添加了" + (i + ) + "個元素");
                Thread.sleep();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
           

線程B:

public class ThreadB extends Thread {

    private MyList list;

    public ThreadB(MyList list) {
        super();
        this.list = list;
    }

    @Override
    public void run() {
        try {
            while (true) {
                if (list.size() == ) {
                    System.out.println("==5了,線程b要退出了!");
                    throw new InterruptedException();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
           

測試類Test:

public class Test {

    public static void main(String[] args) {
        MyList myList = new MyList();

        ThreadA a = new ThreadA(myList);
        a.setName("A");
        a.start();

        ThreadB b = new ThreadB(myList);
        b.setName("B");
        b.start();
    }
}
           

執行結果:

添加了個元素
添加了個元素
添加了個元素
添加了個元素
添加了個元素
==了,線程b要退出了!
java.lang.InterruptedException
    at text.ThreadB.run(ThreadB.java:)
添加了個元素
添加了個元素
添加了個元素
添加了個元素
添加了個元素
           

可以看出,當List集合中的資料為5個的時候線程B退出,雖然兩個線程之間實作了通信,但是代碼中我們的線程B是一直執行着

while(true)

循環的,直到長度為5才終止執行,顯然這種方式是很消耗資源的。是以,就需要一種機制能避免上述的操作又能實作多個線程之間的通信,這就是接下來需要學習的“wait/notify線程間通信”。

什麼是等待/通知機制

道理很簡單,就像我們去銀行辦業務,進門之後取票号,等到達的時候會廣播通知我們辦業務一樣,這就是很實際的一個場景,我們取了票号就需要等待,等業務員輪到票号的時候就會廣播通知。

Java中等待/通知機制的實作

Java中對應等待/通知的方法是wait()/notify(),這兩個方法都是超類Object中的方法,如下圖所示:

Java多線程程式設計-(4)-線程間通信機制的介紹與使用

之是以會是超類Object中的方法,我們可以簡單的了解:上幾篇文章中我們知道任何對象都可以作為鎖,而wait()/notify()是由鎖調用的,想到這裡自然可以體會到這裡設計的巧妙之處。

一、wait方法

(1)方法wait()的作用是使目前執行代碼的線程進行等待,該方法會将該線程放入”預執行隊列“中,并且在wait()所在的代碼處停止執行,直到接到通知或被中斷為止。

(2)在調用wait()之前,線程必須獲得該對象級别鎖,這是一個很重要的地方,很多時候我們可能會忘記這一點,即隻能在同步方法或同步塊中調用wait()方法。

(3)還需要注意的是wait()是釋放鎖的,即在執行到wait()方法之後,目前線程會釋放鎖,當從wait()方法傳回前,線程與其他線程競争重新獲得鎖。

二、notify方法

(1)和wait()方法一樣,notify()方法也要在同步塊或同步方法中調用,即在調用前,線程也必須獲得該對象的對象級别鎖。

(2)該方法是用來通知那些可能等待該對象的對象鎖的其他線程,如果有多個線程等待,則由線程規劃器随機挑選出其中一個呈wait狀态的線程,對其發出通知notify,并使它等待擷取該對象的對象鎖。

(3)這裡需要注意的是,執行notify方法之後,目前線程不會立即釋放其擁有的該對象鎖,而是執行完之後才會釋放該對象鎖,被通知的線程也不會立即獲得對象鎖,而是等待notify方法執行完之後,釋放了該對象鎖,才可以獲得該對象鎖。

(3)notifyAll()通知所有等待同一共享資源的全部線程從等待狀态退出,進入可運作狀态,重新競争獲得對象鎖。

三、wait()/notify()方法總結

(1)wait()/notify()要集合synchronized關鍵字一起使用,因為他們都需要首先擷取該對象的對象鎖;

(2)wait方法是釋放鎖,notify方法是不釋放鎖的;

(3)線程的四種狀态如下圖:

Java多線程程式設計-(4)-線程間通信機制的介紹與使用

wait/notify線程間通信示例代碼

根據上述不使用wait/notify的代碼改造如下:

MyList代碼:

public class MyList {

    private static List list = new ArrayList();

    public static void add() {
        list.add("我是元素");
    }

    public static int size() {
        return list.size();
    }
}
           

線程A:

public class ThreadA extends Thread {

    private Object lock;

    public ThreadA(Object lock) {
        super();
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (lock) {
                if (MyList.size() != ) {
                    System.out.println("wait begin " + System.currentTimeMillis());
                    lock.wait();
                    System.out.println("wait end  " + System.currentTimeMillis());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
           

線程B:

public class ThreadB extends Thread {

    private Object lock;

    public ThreadB(Object lock) {
        super();
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (lock) {
                for (int i = ; i < ; i++) {
                    MyList.add();
                    if (MyList.size() == ) {
                        lock.notify();
                        System.out.println("已發出通知!");
                    }
                    System.out.println("添加了" + (i + ) + "個元素!");
                    Thread.sleep();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
           

測試代碼:

public class Run {

    public static void main(String[] args) {
        try {
            Object lock = new Object();
            ThreadA a = new ThreadA(lock);
            a.start();
            Thread.sleep();
            ThreadB b = new ThreadB(lock);
            b.start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
           

運作結果:

wait begin 
添加了個元素!
添加了個元素!
添加了個元素!
添加了個元素!
已發出通知!
添加了個元素!
添加了個元素!
添加了個元素!
添加了個元素!
添加了個元素!
添加了個元素!
wait end  
           

上述執行個體已經實作了簡單的等待通知機制,并且我們也可以看到,雖然線程B在第五個元素的時候發出通知,而線程A實作線程B執行完之後才獲得對象鎖,這也可以說明,wait方法是釋放鎖的而notify方法是不釋放鎖的。

另一個案例:使用wait/notify模拟BlockingQueue阻塞隊列

BlockingQueue是阻塞隊列,我們需要實作的是阻塞的放入和得到資料,設計思路如下:

(1)初始化隊列最大長度為5;

(2)需要新加入的時候,判斷是否長度為5,如果是5則等待插入;

(3)需要消費元素的時候,判斷是否為0,如果是0則等待消費;

實作代碼如下:

public class MyQueue {

    //1、需要一個承裝元素的集合
    private final LinkedList<Object> list = new LinkedList<>();
    //2、需要一個計數器
    private final AtomicInteger count = new AtomicInteger();
    //3、需要指定上限和下限
    private final int maxSize = ;
    private final int minSize = ;

    //5、初始化鎖對象
    private final Object lock = new Object();

    /**
     * put方法
     */
    public void put(Object obj) {
        synchronized (lock) {
            //達到最大無法添加,進入等到
            while (count.get() == maxSize) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.add(obj); //加入元素
            count.getAndIncrement(); //計數器增加
            System.out.println(" 元素 " + obj + " 被添加 ");
            lock.notify(); //通知另外一個阻塞的線程方法
        }
    }

    /**
     * get方法
     */
    public Object get() {
        Object temp;
        synchronized (lock) {
            //達到最小,沒有元素無法消費,進入等到
            while (count.get() == minSize) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            count.getAndDecrement();
            temp = list.removeFirst();
            System.out.println(" 元素 " + temp + " 被消費 ");
            lock.notify();
        }
        return temp;
    }

    private int size() {
        return count.get();
    }

    public static void main(String[] args) throws Exception {

        final MyQueue myQueue = new MyQueue();
        initMyQueue(myQueue);

        Thread t1 = new Thread(() -> {
            myQueue.put("h");
            myQueue.put("i");
        }, "t1");

        Thread t2 = new Thread(() -> {
            try {
                Thread.sleep();
                myQueue.get();
                Thread.sleep();
                myQueue.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t2");

        t1.start();
        Thread.sleep();
        t2.start();

    }

    private static void initMyQueue(MyQueue myQueue) {
        myQueue.put("a");
        myQueue.put("b");
        myQueue.put("c");
        myQueue.put("d");
        myQueue.put("e");
        System.out.println("目前元素個數:" + myQueue.size());
    }
}
           

執行結果:

元素 a 被添加 
 元素 b 被添加 
 元素 c 被添加 
 元素 d 被添加 
 元素 e 被添加 
目前元素個數:
 元素 a 被消費 
 元素 h 被添加 
 元素 b 被消費 
 元素 i 被添加 
           

其他注意事項

(1)wait()和notify()方法要在同步塊或同步方法中調用,即在調用前,線程也必須獲得該對象的對象級别鎖。

(2)wait方法是釋放鎖,notify方法是不釋放鎖的;

(3)notify每次喚醒wait等待狀态的線程都是随機的,且每次隻喚醒一個;

(4)notifAll每次喚醒wait等待狀态的線程使之重新競争擷取對象鎖,優先級最高的那個線程會最先執行;

(5)當線程處于wait()狀态時,調用線程對象的interrupt()方法會出現InterruptedException異常;

其他知識點

(1)程序間的通信方式:

管道(pipe)、有名管道(named pipe)、信号量(semophore)、消息隊列(message queue)、信号(signal)、共享記憶體(shared memory)、套接字(socket);

(2)線程程間的通信方式:

1、鎖機制

1.1 互斥鎖:提供了以排它方式阻止資料結構被并發修改的方法。

1.2 讀寫鎖:允許多個線程同時讀共享資料,而對寫操作互斥。

1.3 條件變量:可以以原子的方式阻塞程序,直到某個特定條件為真為止。

對條件測試是在互斥鎖的保護下進行的。條件變量始終與互斥鎖一起使用。

2、信号量機制:包括無名線程信号量與有名線程信号量

3、信号機制:類似于程序間的信号處理。

線程間通信的主要目的是用于線程同步,是以線程沒有象程序通信中用于資料交換的通信機制。