天天看點

Multi-Programming-8 線程安全類實作生産者和消費者

1.線程安全類 ArrayBlockingQueue

java庫中提供了線程安全的類BlockingQueue, 使得我們在擷取一個空的隊列元素時被阻塞,在向一個滿隊列添加元素時被阻塞,線程等待直到滿足合适的條件為止。
BlockingQueue中的方法按照處理異常的方式共分為四大類:
1). 第一種是直接抛出異常;
2). 第二種是傳回一種特殊值,比如null或者false;
3). 在條件合适之前無限期地阻塞目前線程;
4). 在一個給定的最大時間内阻塞線程,超過時間則放棄。
java.util.concurrent.BlockingQueue<Integer>

    A java.util.Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

    BlockingQueue methods come in four forms, with different ways of handling operations that cannot be satisfied immediately, but may be satisfied at some point in the future: one throws an exception, the second returns a special value (either null or false, depending on the operation), the third blocks the current thread indefinitely until the operation can succeed, and the fourth blocks for only a given maximum time limit before giving up. These methods are summarized in the following table:
    Summary of BlockingQueue methods

這四種方法如下表所示:
           
Method Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time,unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

A BlockingQueue does not accept null elements. Implementations throw NullPointerException on attempts to add, put or offer a null. A null is used as a sentinel value to indicate failure of poll operations.

A BlockingQueue may be capacity bounded. At any given time it may have a remainingCapacity beyond which no additional elements can be put without blocking. A BlockingQueue without any intrinsic capacity constraints always reports a remaining capacity of Integer.MAX_VALUE.

BlockingQueue implementations are designed to be used primarily for producer-consumer queues, but additionally support the java.util.Collection interface. So, for example, it is possible to remove an arbitrary element from a queue using remove(x). However, such operations are in general not performed very efficiently, and are intended for only occasional use, such as when a queued message is cancelled.

BlockingQueue implementations are thread-safe. 
注意:
1).BlockingQueue是不接受null值的,因為null值是該類中某些方法的實作中的傳回值的标志;
2).BlockingQueue可以設定容量大小,超過一定容量則會阻塞一些操作;
3).阻塞隊列設計的初衷是為了解決生産者和消費者問題的,但後來為了支援Collection接口,添加了add()/remove()等方法,一般不要使用它們,這樣效率比較低。
           

2. 代碼實作

package com.fqyuan.blog;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ConcurrentQueue {

    public static void main(String[] args) {
        ConcurrentQueueUtil.demonstrate();
    }

}

class SharedObjectCon {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>();
    Random random = new Random();

    public void produce() throws InterruptedException {
        while (true) {
            Thread.sleep();
            if (random.nextDouble() < )
                queue.put(random.nextInt());
        }
    }

    public void consume() throws InterruptedException {
        while (true) {
            Thread.sleep();
            if (random.nextInt() < ) {
                int val = queue.take();
                System.out.println("Value taken is " + val + "; queue size is " + queue.size());
            }

        }
    }
}

class ConcurrentQueueUtil {
    public static void demonstrate() {
        SharedObjectCon sharedObjectCon = new SharedObjectCon();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    sharedObjectCon.produce();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    sharedObjectCon.consume();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
           

運作結果:

Multi-Programming-8 線程安全類實作生産者和消費者
Value taken is ; queue size is 
Value taken is ; queue size is 
Value taken is ; queue size is 
Value taken is ; queue size is 
Value taken is ; queue size is 
Value taken is ; queue size is 
Value taken is ; queue size is 
Value taken is ; queue size is 
Value taken is ; queue size is 
Value taken is ; queue size is 
Value taken is ; queue size is 
Value taken is ; queue size is 
           

3.Why not common containers like List?

List不是線程安全的。
           

相關問題參見疑問。

What does it mean array list is synchronized in java?

It means it is thread-safe.

Vectors are synchronized. Any method that touches the Vector's contents is thread safe.
ArrayList, on the other hand, is unsynchronized, making them, therefore, not thread safe.
           

相關代碼實作,點選 這裡。

繼續閱讀