天天看點

進階并發對象(Councurrency Tutorial 7)

[size=medium][color=blue]進階并發對象[/color][/size]

到目前為止,上幾個章節中,我們重點講了Java架構最原始的低級API。這些API對于完成基本任務來說已經足夠了,但是對于更進階的工作我們需要更進階的API。對于那些需要充分利用現代多處理器和多核系統的大規模高并發應用尤其如此。

本節中,我們将看到一些Java5.0新增的進階并發特性。這些特性大部分實作在java.tuil.concurrenct 包中。在Java集合架構中也有一些新的并發資料結構。

鎖對象(Lock objects) 提供了可以簡化許多并發應用的鎖的慣用法(locking idioms)。

[list][*]Executors 為加載和管理線程定義了進階API。Executors提供了适合大型應用的線程池管理。

[*]并發集合(Concurrent collections)簡化了對大型資料集合的管理,可以大大降低同步的需要。

[*]原子對象(Atomic variables) 可以幫助縮小同步粒度和避免記憶體不一緻問題(memory consistency errors)。

[*]ThreadLocalRandom(JDK 7) 提供高效的多線程生成僞随機數的方法。[/list]

[color=blue]鎖對象(Lock Objects)[/color]

同步方法與同步代碼塊依賴一個簡單的可重入鎖,這種鎖很容易使用,但是有諸多限制。Java.util.concurrent.locks包中提供了更多複雜的常用鎖對象。我們不會詳細的描述這個包,但是我們會關注它的最基礎的接口Lock。

Lock對象的工作機制類似于同步代碼塊使用的隐含鎖。與隐含鎖一樣,在某一時刻隻允許一個線程擁有鎖對象。通過它們關聯的Condition對象,鎖對象也支援wait/notify機制。

與隐含鎖相比,Lock對象最大的優勢是它可以撤消擷取一個鎖對象的嘗試。tryLock方法若沒辦法立即或在預設的逾時時間範圍内無法獲得鎖對象,它将會撤消獲得鎖的嘗試(即,撤出鎖對象的等待隊列)。lockInterruptibly方法在收到另一線程發送的中斷請求後會取消擷取鎖對象的嘗試。

讓我們使用Lock對象來解決下我們在活性(Liveness)一節中看到的死鎖問題。Alphonse和Gaston會注意到對方鞠躬的動作。我們通過強制雙方在鞠躬之前必須擷取雙方的鎖對象來改善模型。下面是改善模型後的代碼。為了示範普遍原理,我們假設Alphonse和Gaston非常癡迷于新獲得的能夠安全鞠躬的能力,以至于他們不能停止向對方鞠躬:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Random;

public class Safelock {
    static class Friend {
        private final String name;
        private final Lock lock = new ReentrantLock();

        public Friend(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }

        public boolean impendingBow(Friend bower) {
            Boolean myLock = false;
            Boolean yourLock = false;
            try {
                myLock = lock.tryLock();
                yourLock = bower.lock.tryLock();
            } finally {
                if (! (myLock && yourLock)) {
                    if (myLock) {
                        lock.unlock();
                    }
                    if (yourLock) {
                        bower.lock.unlock();
                    }
                }
            }
            return myLock && yourLock;
        }

        public void bow(Friend bower) {
            if (impendingBow(bower)) {
                try {
                    System.out.format("%s: %s has"
                        + " bowed to me!%n", 
                        this.name, bower.getName());
                    bower.bowBack(this);
                } finally {
                    lock.unlock();
                    bower.lock.unlock();
                }
            } else {
                System.out.format("%s: %s started"
                    + " to bow to me, but saw that"
                    + " I was already bowing to"
                    + " him.%n",
                    this.name, bower.getName());
            }
        }

        public void bowBack(Friend bower) {
            System.out.format("%s: %s has" +
                " bowed back to me!%n",
                this.name, bower.getName());
        }
    }

    static class BowLoop implements Runnable {
        private Friend bower;
        private Friend bowee;

        public BowLoop(Friend bower, Friend bowee) {
            this.bower = bower;
            this.bowee = bowee;
        }

        public void run() {
            Random random = new Random();
            for (;;) {
                try {
                    Thread.sleep(random.nextInt(10));
                } catch (InterruptedException e) {}
                bowee.bow(bower);
            }
        }
    }


    public static void main(String[] args) {
        final Friend alphonse =
            new Friend("Alphonse");
        final Friend gaston =
            new Friend("Gaston");
        new Thread(new BowLoop(alphonse, gaston)).start();
        new Thread(new BowLoop(gaston, alphonse)).start();
    }
}
           

[color=blue]Executors[/color]

目前為止,在在所有示例中,我們通過定義Runnable對象描述被新線程執行的任務,通過定義Thread對象描述線程本身。兩者緊緊聯系在一起。這在小型應用中能夠很好的工作,但是在大型應用中,把線程的管理和建立與線程的使用分割開來是非常重要的。封裝了線程建立和管理的對象被叫做executors.下面的章節将較長的描述executors。

[list][*]Executor接口定義了三種executor對象類型

[*]線程池(Thread Pools) 是executor最普遍的實作類型

[*]Fork/Join (JDK 7)是一個多程序架構[/list]

[color=orange]Executor接口[/color]

Java.util.concurrent包定義了三種executor接口:

[list][*]Executor,支援加載新任務的簡單接口

[*]ExecutorService,Executor的子接口,增加了幫助管理任務和executor本身的生命周期的功能。

[*]ScheduledExecutorService,ExecutorService的子接口,支援周期性地執行任務。[/list]

要指出的是,Executor對象都聲明為這三種接口的引用而不是某個Executor實作類的引用。(譯注:Executor的執行個體,通過工廠(Executors)模式建立,沒有具體的實作類)

[color=red]Executor[/color]

Executor接口隻定義了一個方法execute,被設計用來代替一般的建立線程慣例,如果r是一個Runnable對象,e是一個Executor對象你可以将:

(new Thread(r)).start();
           

替換為:

e.execute(r);
           

然而,對于execute方法的實作并沒有特殊要求。低級的實作隻是建立一個新的線程并立即執行。基于Executor的實作,execute可能就是這樣做的,但更有可能是使用一個已經存在的工作線程(worker Threads)去執行r,或者将r放在一個執行隊列中等待工作線程有空的時候再執行。(我們會線上程池一節中描述工作線程)。

Java.util.concurrent包中的executor實作被設計成能夠充分利用更進階的ExecutorService和ScheduledExecutorService接口,它們也和Executor接口一起工作。

[color=red]ExecutorService[/color]

ExecutorService接口提供了另一個相似的submit方法,但比execute更加通用。和execute一樣,submit接受Runnable對象,但也接受Callable對象,Callable允許任務執行後傳回一個值。Submit方法傳回Future對象,Future對象被用來接收Callable傳回的值,并管理Callable和Runnable對象所代表的任務。(譯注:此處提供一個能夠描述submit 及Future 的優勢[url]http://623deyingxiong.iteye.com/admin/blogs/1753975[/url])

ExecutorService也提供處理Callable對象(譯注:Callable對象類似于Runnable,但其”call”方法可以有傳回值)的方法。最後,ExecutorService提供大量的方法管理executor的關閉。為了支援立即停止(immediate shutdown),任務應該正确地進行中斷請求(譯注:即線上程接收到中斷通知後立即終止執行)。

[color=red]ScheduledExecutorService[/color]

ScheduledExecutorService接口為它的父類ExecutorService的行為提供計劃,允許在執行Runnable和Callable任務之前停頓一段時間。接口定義了scheduleAtFixedRate和scheduleWithFixedDelay,這兩個方法以特定的時間間隔重複地執行特定任務。

[color=orange]線程池[/color]

大部分java.util.concurrent中的executor實作類使用了工作線程的(worker threads)線程池。工作線程不屬于特定某個Runnable和Callable任務,經常用來執行多個任務。

使用工作線程可以減少建立線程的資源浪費。Thread對象占用了大量的記憶體,在一個大型應用中,配置設定和釋放線程對象會造成大量的線程管理開支(overhead)。

一種常用的線程池是固定大小線程池(the fixed thread pool)。這種線程池有特定數量的線程在運作;如果一個線程在使用過程中意外停止(譯注:如抛出未捕獲的異常),它會自動被另一個新線程替代。任務通過一個内部的任務隊列送出給線程池執行,此任務隊列可以在活動任務數量大于線程池中工作線程個數時,存儲多餘的活動任務。

“固定大小線程池”的一個好處是”優雅的緩沖”。考慮一個web應用服務,它的每一個線程隻處理一個HTTP請求。如果這個應用簡單地為每一個新來的請求建立一個新的處理線程,那麼,當請求數量足夠多時,線程占用的資源總和将超過系統的承受能力,伺服器會是以忽然停止對所有請求的應答(譯注:常見的記憶體溢出)。使用固定大小線程池後,即使請求數量超出工作線程能夠處理的請求上限,但是新來的HTTP請求會被暫時存放在消息隊列中,當出現空閑的工作線程後,這些HTTP請求就會得到及時的處理。

建立一個使用固定線程池的executor的方法是調用 java.util.concurrent.Executors工廠類(譯注:它是所有ExecutorService的工廠)的newFixedThreadPool方法 。此類也提供了下面的工廠方法:

[list][*]newCachedThreadPool方法建立了一個使用可擴充線程池的executor。擁有這種線程池的executor适合執行生命周期較短的任務。

[*]newSingleThreadExecutor方法建立了一個隻有一個工作線程的executor。

[*]還有一些工廠方法提供以上executor的ScheduledExecutorService版本。[/list]

如果以上工廠方法都無法滿足你的需要, java.util.concurrent.ThreadPoolExecutor 或者java.util.concurrent.ScheduledThreadPoolExecutor 将為你提供更多選擇。

[color=orange]Fork/Join[/color]

[color=red][i](譯者聲明:本節沒有做過實踐,可能有些了解會有問題,個人覺得有點MapReduce的意思)[/i][/color]

Java SE 7中的新特性,fork/join架構幫助你建立多程序應用。它被設計用來完成可以分成很多小程序的工作。目的是使用所有可用的程序來提升你的應用的性能。

像任何ExecutorService一樣,fork/join架構把任務分發給線程池中工作線程。不同的是,因為fork/join架構使用work-stealing 算法。完成工作的工作線程可以從其他還在忙碌的線程那裡偷任務來執行。

fork/join架構的核心是ForkJoinPool類,一個AbstractExecutorService的擴充類。ForkJoinPool實作了核心的work-stealing算法,能夠執行ForkJoinTask任務。

[color=red]基本應用[/color]

使用fork/join架構很簡單。第一步是寫一些代碼來執行工作的一部分。你的代碼應該看起來像這樣:

把以上代碼封裝成ForkJoinTask子類,特别地作為更具體的類型RecursiveTask(可以傳回結果)或 RecursiveAction。

[color=red]Blurring for Clarity[/color]

為了幫助你了解fork/join 架構是如何工作的,考慮一個簡單的例子。假設你想對一張圖檔模糊處理。原圖檔用一個整數數組表示,每個整數包含了對描述像素的顔色值。模糊化的目标圖檔也是由相同大小的整數數組表示。

模糊化通過不斷重複一次修改一個像素值來完成。每個像素值被修改為周圍像素的平均值(紅,綠,藍都被平均),結果被放在目标數組中。下面是可能的實作:

現在你實作抽象方法compute(),可以直接做模糊化處理,也可以将它分成小的任務。一個簡單的方法是通過數組的長度來判斷工作是要直接進行或者分割。

完整的代碼中,還要包含其他的如顯示原圖檔和目标圖檔的代碼,詳情請看ForkBlur類。

[color=blue]并發集合(Concurrent Collections)[/color]

Java.util.concurrent 包裡有很多對Java集合架構(Java Collections Framework)的擴充。下面這些是下面這些是最容易分類的集合接口:

[list][*]BlockingQueue 定義了一個先進先出的資料結構,當你試圖将資料添加到一個已滿的隊列,或者從空隊列中取資料時,将會阻塞或者逾時退出。

[*]ConcurrentMap 是java.util.Map的子接口,定義了很多有用的自動的操作。隻有當鍵值存在時,才能替換或删除一個鍵值對,或者隻有當鍵值不存在(absent)時才能添加一個鍵值對。使這些操作自動化幫助避免同步(synchronization)。對ConcurrentMap标準的普适實作是ConcurrentHashMap,它是HashMap的并發衍生類。

[*]ConcurrentNavigableMap 是一個ConcurrentMap的子接口,它支援模糊比對(approximate matches)。标準的對ConcurrentnavigableMap的普适實作是ConcurrentSkipListMap,它是TreeMap并發衍生類。

[/list]

所有的這些集合通過建立對集合中元素操作(增删改查)的先後關系(happens-before relationship),來幫助我們避免記憶體一緻性錯誤。

[color=blue]原子變量(Atomic Variables)[/color]

Java.util.concurrent.atomic 包中定義了對單個變量的原子操作支援。包中的所有的類的getter和setter,都像對volatile變量操作一樣,具有原子性。那意味着,一個set操作與後續的get操作存在絕對的先後關系。

為了看到原子變量是如何使用的,我們回到我們原來用來描述線程幹擾的Counter類:

class Counter {
    private int c = 0;

    public void increment() {
        c++;
    }

    public void decrement() {
        c--;
    }

    public int value() {
        return c;
    }

}
           

讓Counter安全運算不發生線程幹擾的方法是像在SynchronizedCounter同步它的方法:

對于這個簡單地類,synchronization是一個可以接受的解決方案。但是對于更複雜的類,我們就要避免不必要的synchronization對線程活性的影響。将int型字段替換為AtomicInteger讓我們可以不需要借助synchronization就可以避免線程幹擾,就像在AtomicCounter中這樣:

import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounter {
    private AtomicInteger c = new AtomicInteger(0);

    public void increment() {
        c.incrementAndGet();
    }

    public void decrement() {
        c.decrementAndGet();
    }

    public int value() {
        return c.get();
    }

}
           

[color=blue]并發随機數[/color]

在JDK7中,java.util.concurrent 包含了一個友善應用程式使用的類,ThreadLocalRandom,可以将它用來在多線程或ForJoinTask中獲得随機數。

在并發環境中,使用ThreadLocalRandom替換Math.random()可以減少沖突,更可以提高性能。

你所做的隻是調用ThreadLocalRandom.curretn(),然後調用它的方法中的一個來獲得随機數。這裡是一個例子:

[color=blue][size=medium]擴充閱讀[/size][/color]

[list][*]Concurrent Programming in Java: Design Principles and Pattern (2nd Edition) by Doug Lea. A comprehensive work by a leading expert, who's also the architect of the Java platform's concurrency framework.

[*]Java Concurrency in Practice by Brian Goetz, Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes, and Doug Lea. A practical guide designed to be accessible to the novice.

[*]Effective Java Programming Language Guide by Joshua Bloch. Though this is a general programming guide, its chapter on threads contains essential "best practices" for concurrent programming.

[*]Concurrency: State Models & Java Programs (2nd Edition), by Jeff Magee and Jeff Kramer. An introduction to concurrent programming through a combination of modeling and practical examples.

[*]Java Concurrent Animated: Animations that show usage of concurrency features.[/list]