一、多線程
程序:一個計算機程式的運作執行個體,包含了需要執行的指令;有自己的獨立位址空間,包含程式内容和資料;不同程序的位址空間是互相隔離的;程序擁有各種資源和狀态資訊,包括打開的檔案、子程序和信号處理。
線程:表示程式的執行流程,是cpu排程執行的基本機關;線程有自己的程式計數器、寄存器、堆棧和幀。同一程序中的線程共用相同的位址空間,同時共享進程序鎖擁有的記憶體和其他資源。
表示線程的是java.lang.thread類,在虛拟機啟動之後,通常隻有java類的main方法這個普通線程運作,運作時可以建立和啟動新的線程;還有一類守護線程(damon thread),守護線程在背景運作,提供程式運作時所需的服務。當虛拟機中運作的所有線程都是守護線程時,虛拟機終止運作。
3、線程間的可見性:一個線程對程序中共享的資料的修改,是否對另一個線程可見
可見性問題:
public class idgenerator{
private int value = 0;
public int getnext(){
return value++;
}
}
對于idgenerator的getnext()方法,在多線程下不能保證傳回值是不重複的:各個線程之間互相競争cpu時間來擷取運作機會,cpu切換可能發生在執行間隙。
以上代碼getnext()的指令序列:cpu切換可能發生在7條指令之間,多個getnext的指令交織在一起。
aload_0
dup
getfield #12
dup_x1
iconst_1
iadd
putfield #12
b、cpu緩存:
c、指令順序重排
出行性能考慮,編譯器在編譯時可能會對位元組代碼的指令順序進行重新排列,以優化指令的執行順序,在單線程中不會有問題,但在多線程可能産生與可見性相關的問題。
屏蔽了cpu緩存等細節,隻關注主存中的共享變量;關注對象的執行個體域、靜态域和數組元素;關注線程間的動作。
1、volatile關鍵詞:用來對共享變量的通路進行同步,上一次寫入操作的結果對下一次讀取操作是肯定可見的。(在寫入volatile變量值之後,cpu緩存中的内容會被寫回記憶體;在讀取volatile變量時,cpu緩存中的對應内容會被置為失效,重新從主存中進行讀取),volatile不使用鎖,性能優于synchronized關鍵詞。
用來確定對一個變量的修改被正确地傳播到其他線程中。
例子:a線程是worker,一直跑循環,b線程調用setdone(true),a線程即停止任務
public class worker{
private volatile boolean done;
public void setdone(boolean done){
this.done = done;
public void work(){
while(!done){
//執行任務;
}
例子:錯誤使用。因為沒有鎖的支援,volatile的修改不能依賴于目前值,目前值可能在其他線程中被修改。(worker是直接賦新值與目前值無關)
public class counter {
public volatile static int count = 0;
public static void inc() {
//這裡延遲1毫秒,使得結果明顯
try {
thread.sleep(1);
} catch (interruptedexception e) {
}
count++;
}
public static void main(string[] args) {
//同時啟動1000個線程,去進行i++計算,看看實際結果
for (int i = 0; i < 1000; i++) {
new thread(new runnable() {
@override
public void run() {
counter.inc();
}
}).start();
//這裡每次運作的值都有可能不同,可能不為1000
system.out.println("運作結果:counter.count=" + counter.count);
2、final關鍵詞
final關鍵詞聲明的域的值隻能被初始化一次,一般在構造方法中初始化。。(在多線程開發中,final域通常用來實作不可變對象)
當對象中的共享變量的值不可能發生變化時,在多線程中也就不需要同步機制來進行處理,故在多線程開發中應盡可能使用不可變對象。
另外,在代碼執行時,final域的值可以被儲存在寄存器中,而不用從主存中頻繁重新讀取。
3、java基本類型的原子操作
1)基本類型,引用類型的複制引用是原子操作;(即一條指令完成)
2)long與double的指派,引用是可以分割的,非原子操作;
3)要線上程間共享long或double的字段時,必須在synchronized中操作,或是聲明成volatile
三、java提供的線程同步方式
1、synchronized關鍵字
方法或代碼塊的互斥性來完成實際上的一個原子操作。(方法或代碼塊在被一個線程調用時,其他線程處于等待狀态)
所有的java對象都有一個與synchronzied關聯的螢幕對象(monitor),允許線程在該螢幕對象上進行加鎖和解鎖操作。
a、靜态方法:java類對應的class類的對象所關聯的螢幕對象。
b、執行個體方法:目前對象執行個體所關聯的螢幕對象。
c、代碼塊:代碼塊聲明中的對象所關聯的螢幕對象。
注:當鎖被釋放,對共享變量的修改會寫入主存;當活得鎖,cpu緩存中的内容被置為無效。編譯器在處理synchronized方法或代碼塊,不會把其中包含的代碼移動到synchronized方法或代碼塊之外,進而避免了由于代碼重排而造成的問題。
例:以下方法getnext()和getnextv2() 都獲得了目前執行個體所關聯的螢幕對象
public class synchronizedidgenerator{
public synchronized int getnext(){
public int getnextv2(){
synchronized(this){
return value++;
2、object類的wait、notify和notifyall方法
生産者和消費者模式,判斷緩沖區是否滿來消費,緩沖區是否空來生産的邏輯。如果用while 和 volatile也可以做,不過本質上會讓線程處于忙等待,占用cpu時間,對性能造成影響。
wait: 将目前線程放入,該對象的等待池中,線程a調用了b對象的wait()方法,線程a進入b對象的等待池,并且釋放b的鎖。(這裡,線程a必須持有b的鎖,是以調用的代碼必須在synchronized修飾下,否則直接抛出java.lang.illegalmonitorstateexception異常)。
notify:将該對象中等待池中的線程,随機選取一個放入對象的鎖池,當目前線程結束後釋放掉鎖, 鎖池中的線程即可競争對象的鎖來獲得執行機會。
notifyall:将對象中等待池中的線程,全部放入鎖池。
(notify鎖喚醒的線程選擇由虛拟機實作來決定,不能保證一個對象鎖關聯的等待集合中的線程按照所期望的順序被喚醒,很可能一個線程被喚醒之後,發現他所要求的條件并沒有滿足,而重新進入等待池。因為當等待池中包含多個線程時,一般使用notifyall方法,不過該方法會導緻線程在沒有必要的情況下被喚醒,之後又馬上進入等待池,對性能有影響,不過能保證程式的正确性)
工作流程:
a、consumer線程a 來 看産品,發現産品為空,調用産品對象的wait(),線程a進入産品對象的等待池并釋放産品的鎖。
b、producer線程b獲得産品的鎖,執行産品的notifyall(),consumer線程a從産品的等待池進入鎖池,producer線程b生産産品,然後退出釋放鎖。
c、consumer線程a獲得産品鎖,進入執行,發現有産品,消費産品,然後退出。
例子:
public synchronized string pop(){
this.notifyall();// 喚醒對象等待池中的所有線程,可能喚醒的就是 生産者(當生産者發現産品滿,就會進入對象的等待池,這裡代碼省略,基本略同)
while(index == -1){//如果發現沒産品,就釋放鎖,進入對象等待池
this.wait();
}//當生産者生産完後,消費者從this.wait()方法再開始執行,第一次還會執行循環,萬一産品還是為空,則再等待,是以這裡必須用while循環,不能用if
string good = buffer[index];
buffer[index] = null;
index--;
return good;// 消費完産品,退出。
注:wait()方法有逾時和不逾時之分,逾時的在經過一段時間,線程還在對象的等待池中,那麼線程也會推出等待狀态。
3、線程狀态轉換:
已經廢棄的方法:stop、suspend、resume、destroy,這些方法在實作上時不安全的。
線程的狀态:new、runnable、blocked、waiting、timed_waiting(有逾時的等待)、terminated。
a、方法sleep()進入的阻塞狀态,不會釋放對象的鎖(即大家一起睡,誰也别想執行代碼),是以不要讓sleep方法處在synchronized方法或代碼塊中,否則造成其他等待擷取鎖的線程長時間處于等待。
b、方法join()則是主線程等待子線程完成,再往下執行。例如main方法建立兩個線程a和b
public static void main(string[] args) throws interruptedexception {
thread t1 = new thread(new threadtestera());
thread t2 = new thread(new threadtesterb());
t1.start();
t1.join(); // 等t1執行完再往下執行
t2.start();
t2.join(); // 在虛拟機執行中,這句可能被忽略
c、方法interrupt(),向被調用的對象線程發起中斷請求。如線程a通過調用線程b的d的interrupt方法來發出中斷請求,線程b來處理這個請求,當然也可以忽略,這不是必須的。object類的wait()、thread類的join()和sleep方法都會抛出受檢異常java.lang.interruptedexception,通過interrupt方法中斷該線程會導緻線程離開等待狀态。對于wait()調用來說,線程需要重新擷取螢幕對象上的鎖之後才能抛出interruptedexception異常,并緻以異常的處理邏輯。
可以通過thread類的isinterrupted方法來判斷是否有中斷請求發生,通常可以利用這個方法來判斷是否退出線程(類似上面的volatitle修飾符的例子);
thread類還有個方法interrupted(),該方法不但可以判斷目前線程是否被中斷,還會清楚線程内部的中斷标記,如果傳回true,即曾被請求中斷,同時調用完後,清除中斷标記。
如果一個線程在某個對象的等待池,那麼notify和interrupt 都可以使該線程從等待池中被移除。如果同時發生,那麼看實際發生順序。如果是notify先,那照常喚醒,沒影響。如果是interrupt先,并且虛拟機選擇讓該線程中斷,那麼即使nofity,也會忽略該線程,而喚醒等待池中的另一個線程。
四、非阻塞方式
線程之間同步機制的核心是監視對象上的鎖,競争鎖來獲得執行代碼的機會。當一個對象擷取對象的鎖,然後其他嘗試擷取鎖的對象會處于等待狀态,這種鎖機制的實作方式很大程度限制了多線程程式的吞吐量和性能(線程阻塞),且會帶來死鎖(線程a有a對象鎖,等着擷取b對象鎖,線程b有b對象鎖,等待擷取a對象鎖)和優先級倒置(優先級低的線程獲得鎖,優先級高的隻能等待對方釋放鎖)等問題。
如果能不阻塞線程,又能保證多線程程式的正确性,就能有更好的性能。
在程式中,對共享變量的使用一般遵循一定的模式,即讀取、修改和寫入三步組成。之前碰到的問題是,這三步執行中可能線程執行切換,造成非原子操作。鎖機制是把這三步變成一個原子操作。
目前cpu本身實作 将這三步 合起來 形成一個原子操作,無需線程鎖機制幹預,常見的指令是“比較和替換”(compare and swap,cas),這個指令會先比較某個記憶體位址的目前值是不是指定的舊指,如果是,就用新值替換,否則什麼也不做,指令傳回的結果是記憶體位址的目前值。通過cas指令可以實作不依賴鎖機制的非阻塞算法。一般做法是把cas指令的調用放在一個無限循環中,不斷嘗試,知道cas指令成功完成修改。
java.util.concurrent.atomic包中提供了cas指令。(不是所有cpu都支援cas,在某些平台,java.util.concurrent.atomic的實作仍然是鎖機制)
atomic包中提供的java類分成三類:
1、支援以原子操作來進行更新的資料類型的java類(atomicboolean、atomicinteger、atomicreference),在記憶體模型相關的語義上,這四個類的對象類似于volatile變量。
類中的常用方法:
a、compareandset:接受兩個參數,一個是期望的舊值,一個是替換的新值。
b、weakcompareandset:效果同compareandset(jsr中表示weak原子方式讀取和有條件地寫入變量但不建立任何
happen-before 排序,但在源代碼中和compareandset完全一樣,是以并沒有按jsr實作)
c、get和set:分别用來直接擷取和設定變量的值。
d、lazyset:與set類似,但允許編譯器把lazyset方法的調用與後面的指令進行重排,是以對值得設定操作有可能被推遲。
例:
public class atomicidgenerator{
private final atomicinter counter = new atomicinteger(0);
return counter.getandincrement();
// getandincrement方法的内部實作方式,這也是cas方法的一般模式,cas方法不一定成功,是以包裝在一個無限循環中,直到成功
public final int getandincrement(){
for(;;){
int current = get();
int next = current +1;
if(compareandset(current,next))
return current;
2、提供對數組類型的變量進行處理的java類,atomicintegerarray、atomiclongarray和atomicreferencearray類。(同上,隻是放在類數組裡,調用時也隻是多了一個操作元素索引的參數)
3、通過反射的方式對任何對象中包含的volatitle變量使用cas方法,atomicintegerfieldupdater、atomiclongfieldupdater、atomicreferencefieldupdater。他們提供了一種方式把cas的功能擴充到了任何java類中聲明為volatitle的域上。(靈活,但語義較弱,因為對象的volatitle可能被非atomic的其他方式被修改)
public class treenode{
private volatile treenode parent;
// 靜态工廠方法
private static final atomicreferencefieldupdater<treenode, treenode> parentupdater = atomicreferencefieldupdater.newupdater(treenode.class,treenode.class,"parent");
public boolean compareandsetparent(treenode expect, treenode update){
return parentupdater.compareandset(this, expect, update);
注:java.util.concurrent.atomic包中的java類屬于比較底層的實作,一般作為java.util.concurrent包中很多非阻塞的資料結構的實作基礎。
比較多的用atomicboolean、atomicinteger、atomiclong和atomicreference。在實作線程安全的計數器時,atomicinteger和atomiclong類時最佳的選擇。
五、進階同步機制(比synchronized更靈活的加鎖機制)
synchronized和volatile,以及wait、notify等方法抽象層次低,在程式開發中使用比較繁瑣,易出錯。
而多線程之間的互動來說,存在某些固定的模式,如生産者-消費者和讀者-寫者模式,把這些模式抽象成高層api,使用起來會非常友善。
java.util.concurrent包為多線程提供了高層的api,滿足日常開發中的常見需求。
常用接口
1、lock接口,表示一個鎖方法:
a、lock(),擷取所,如果無法擷取所鎖,會處于等待狀态
b、unlock(),釋放鎖。(一般放在finally代碼塊中)
c、lockinterruptibly(),與lock()類似,但允許目前線程在等待擷取鎖的過程中被中斷。(是以要處理interruptedexception)
d、trylock(),以非阻塞方式擷取鎖,如果無法擷取鎖,則傳回false。(trylock()的另一個重載可以指定逾時,如果指定逾時,當無法擷取鎖,會等待而阻塞,同時線程可以被中斷)
2、readwritelock接口,表示兩個鎖,讀取的共享鎖和寫入的排他鎖。(适合常見的讀者--寫者場景)
readwritelock接口的readlock和writelock方法來擷取對應的鎖的lock接口的實作。
如果是相反的情況,較多的線程寫入,則接口會降低性能。
3、reentrantlock類和reentrantreadwritelock,分别為上面兩個接口的實作類。
他們具有重入性:即允許一個線程多次擷取同一個鎖(他們會記住上次擷取鎖并且未釋放的線程對象,和加鎖的次數,getholdcount())
同一個線程每次擷取鎖,加鎖數+1,每次釋放鎖,加鎖數-1,到0,則該鎖被釋放,可以被其他線程擷取。
public class lockidgenrator{
//new reentrantlock(true)是重載,使用更加公平的加鎖機制,在鎖被釋放後,會優先給等待時間最長的線程,避免一些線程長期無法獲得鎖
private int reentrantlock lock = reentrantlock();
privafte int value = 0;
lock.lock(); //進來就加鎖,沒有鎖會等待
try{
return value++;//實際操作
}finally{
lock.unlock();//釋放鎖
注:重入性減少了鎖在各個線程之間的等待,例如便利一個hashmap,每次next()之前加鎖,之後釋放,可以保證一個線程一口氣完成便利,而不會每次next()之後釋放鎖,然後和其他線程競争,降低了加鎖的代價, 提供了程式整體的吞吐量。(即,讓一個線程一口氣完成任務,再把鎖傳遞給其他線程)。
4、condition接口,lock接口代替了synchronized,condition接口替代了object的wait、nofity。
a、await(),使目前線程進入等待狀态,知道被喚醒或中斷。重載形式可以指定逾時時間。
b、awaitnanos(),以納秒為機關等待。
c、awaituntil(),指定逾時發生的時間點,而不是經過的時間,參數為java.util.date。
d、awaituninterruptibly(),前面幾種會響應其他線程發出的中斷請求,他會無視,直到被喚醒。
注:與object類的wait()相同,await()會釋放其所持有的鎖。
e、signal()和signalall, 相當于 notify和notifyall
lock lock = new reentrantlock();
condition condition = lock.newcondition();
lock.lock();
try{
while(/*邏輯條件不滿足*/){
condition.await();
}finally{
lock.unlock();
六、底層同步器
多線程程式中,線程之間存在多種不同的同步方式。除了java标準庫提供的同步方式之外,程式中特有的同步方式需要由開發人員自己來實作。
常見的一種需求是 對有限個共享資源的通路,比如多台個人電腦,2台列印機,當多個線程在等待同一個資源時,從公平角度出發,會用fifo隊列。
如果程式中的同步方式可以抽象成對有限個資源的通路,那麼可以使用java.util.concurrent.locks包中的abstractqueuedsynchronizer類和abstractqueuedlongsynchronizer類作為實作的基礎,前者用int類型的變量來維護内部狀态,而後者用long類型。(可以将這個變量了解為共享資源個數)
通過getstate、setstate、和compareandsetstate3個方法更新内部變量的值。
abstractqueuedsynchronizer類是abstract的,需要覆寫其中包含的部分方法,通常做法是把其作為一個java類的内部類,外部類提供具體的同步方式,内部類則作為實作的基礎。有兩種模式,排他模式和共享模式,分别對應方法 tryacquire()、tryrelease 和 tryacquireshared、tryreleaseshared,在這些方法中,使用getstate、setstate、compareandsetstate3個方法來修改内部變量的值,以此來反應資源的狀态。
public class simpleresourcemanager{
private final innersynchronizer synchronizer;
private static class innersynchronizer extends abstractqueuedsynchronizer{
innersynchronizer(int numofresources){
setstate(numofresources);
protected int tryacquireshared(int acquires){
for(;;){
int available = getstate();
int remain = available - acquires;
if(remain <0 || comapreandsetstate(available, remain){
return remain;
}
}
protected boolean try releaseshared(int releases){
int available = getstate();
int next = available + releases;
if(compareandsetstate(available,next){
return true;
public simpleresourcemanager(int numofresources){
synchronizer = new innersynchronizer(numofresources);
public void acquire() throws interruptedexception{
synchronizer.acquiresharedinterruptibly(1);
}
pubic void release(){
synchronizer.releaseshared(1);
七、進階同步對象(提高開發效率)
atomic和locks包提供的java類可以滿足基本的互斥和同步通路的需求,但這些java類的抽象層次較低,使用比較複雜。
更簡單的做法是使用java.util.concurrent包中的進階同步對象。
1、信号量。
信号量一般用來數量有限的資源,每類資源有一個對象的信号量,信号量的值表示資源的可用數量。
在使用資源時,需要從該信号量上擷取許可,成功擷取許可,資源的可用數-1;完成對資源的使用,釋放許可,資源可用數+1; 當資源數為0時,需要擷取資源的線程以阻塞的方式來等待資源,或過段時間之後再來檢查資源是否可用。(上面的simpleresourcemanager類實際上時信号量的一個簡單實作)
java.util.concurrent.semaphore類,在建立semaphore類的對象時指定資源的可用數
a、acquire(),以阻塞方式擷取許可
b、tryacquire(),以非阻塞方式擷取許可
c、release(),釋放許可。
d、accquireuninterruptibly(),accquire()方法擷取許可以的過程可以被中斷,如果不希望被中斷,使用此方法。
public class printermanager{
private final semphore semaphore;
private final list<printer> printers = new arraylist<>():
public printermanager(collection<? extends printer> printers){
this.printers.addall(printers);
//這裡重載方法,第二個參數為true,以公平競争模式,防止線程饑餓
this.semaphore = new semaphore(this.printers.size(),true);
public printer acquireprinter() throws interruptedexception{
semaphore.acquire();
return getavailableprinter();
public void releaseprinter(printer printer){
putbackprinter(pinter);
semaphore.release();
private synchronized printer getavailableprinter(){
printer result = printers.get(0);
printers.remove(0);
return result;
private synchronized void putbackprinter(printer printer){
printers.add(printer);
2、倒數閘門
多線程協作時,一個線程等待另外的線程完成任務才能繼續進行。
java.util.concurrent.countdownlatch類,建立該類時,指定等待完成的任務數;當一個任務完成,調用countdonw(),任務數-1。等待任務完成的線程通過await(),進入阻塞狀态,直到任務數量為0。countdownlatch類為一次性,一旦任務數為0,再調用await()不再阻塞目前線程,直接傳回。
public class pagesizesorter{
// 并發性能遠遠優于hashtable的 map實作,hashtable做任何操作都需要獲得鎖,同一時間隻有有個線程能使用,而concurrenthashmap是分段加鎖,不同線程通路不同的資料段,完全不受影響,忘記hashtable吧。
private static final concurrenthashmap<string , interger> sizemap = new concurrenthashmap<>();
private static class getsizeworker implements runnable{
private final string urlstring;
public getsizeworker(string urlstring , countdownlatch signal){
this.urlstring = urlstirng;
this.signal = signal;
public void run(){
try{
inputstream is = new url(urlstring).openstream();
int size = ioutils.tobytearray(is).length;
sizemap.put(urlstring, size);
}catch(ioexception e){
sizemap.put(urlstring, -1);
}finally{
signal.countdown()://完成一個任務 , 任務數-1
private void sort(){
list<entry<string, integer> list = new arraylist<sizemap.entryset());
collections.slort(list, new comparator<entry<string,integer>>(){
public int compare (entry<string, integer> o1, entry<sting , integer> o2){
return integer.compare(o2.getvalue(),o1.getvalue());
};
system.out.println(arrays.deeptostring(list.toarray()));
public void sortpagesize(collection<string> urls) throws interruptedexception{
countdownlatch sortsignal = new countdownlatch(urls.size());
for(string url: urls){
new thread(new getsizeworker(url, sortsignal)).start();
sortsignal.await()://主線程在這裡等待,任務數歸0,則繼續執行
sort();
3、循環屏障
循環屏障在作用上類似倒數閘門,不過他不像倒數閘門是一次性的,可以循環使用。另外,線程之間是互相平等的,彼此都需要等待對方完成,當一個線程完成自己的任務之後,等待其他線程完成。當所有線程都完成任務之後,所有線程才可以繼續運作。
當線程之間需要再次進行互相等待時,可以複用同一個循環屏障。
類java.uti.concurrent.cyclicbarrier用來表示循環屏障,建立時指定使用該對象的線程數目,還可以指定一個runnable接口的對象作為每次循環後執行的動作。(當最後一個線程完成任務之後,所有線程繼續執行之前,被執行。如果線程之間需要更新一些共享的内部狀态,可以利用這個runnalbe接口的對象來處理)。
每個線程任務完成之後,通過調用await方法進行等待,當所有線程都調用await方法之後,處于等待狀态的線程都可以繼續執行。在所有線程中,隻要有一個在等待中被中斷,逾時或是其他錯誤,整個循環屏障會失敗,所有等待中的其他線程抛出java.uti.concurrent.brokenbarrierexception。
例:每個線程負責找一個數字區間的質數,當所有線程完成後,如果質數數目不夠,繼續擴大範圍查找
public class primenumber{
private static final int total_coutn = 5000;
private static final int range_length= 200;
private static final int worker_number = 5;
private static volatitle boolean done = false;
private static int rangecount = 0;
private static final list<long> results = new arraylist<long>():
private static final cyclicbarrier barrier = new cyclicbarrier(worker_number, new runnable(){
if(results.size() >= total_count){
done = true;
}
});
private static class primefinder implements runnable{
while(!done){// 整個過程在一個 while循環下,await()等待,下次循環開始,會再次判斷 執行條件
int range = getnextrange();
long start = rang * range_length;
long end = (range + 1) * range_length;
for(long i = start; i<end;i++){
if(isprime(i)){
updateresult(i);
}
try{
barrier.await();
}catch (interruptedexception | bokenbarrierexception e){
done = true;
private synchronized static void updateresult(long value){
results.add(value);
private synchronized static int getnextrange(){
return rangecount++;
private static boolean isprime(long number){
//找質數的代碼
public void calculate(){
for(int i=0;i<worker_number;i++){
new thread(new primefinder()).start();
//計算完成
4、對象交換器
适合于兩個線程需要進行資料交換的場景。(一個線程完成後,把結果交給另一個線程繼續處理)
java.util.concurrent.exchanger類,提供了這種對象交換能力,兩個線程共享一個exchanger類的對象,一個線程完成對資料的處理之後,調用exchanger類的exchange()方法把處理之後的資料作為參數發送給另外一個線程。而exchange方法的傳回結果是另外一個線程鎖提供的相同類型的對象。如果另外一個線程未完成對資料的處理,那麼exchange()會使目前線程進入等待狀态,直到另外一個線程也調用了exchange方法來進行資料交換。
public class sendandreceiver{
private final exchanger<stringbuilder> exchanger = new exchanger<stringbuilder>();
private class sender implements runnable{
stringbuilder content = new stringbuilder("hello");
content = exchanger.exchange(content);
}catch(interruptedexception e){
thread.currentthread().interrupt();
private class receiver implements runnable{
stringbuilder content = new stringbuilder("world");
public void exchange(){
new thread(new sender()).start();
new thread(new receiver()).start();
八、資料結構(多線程程式使用的高性能資料結構)
java.util.concurrent包中提供了一些适合多線程程式使用的高性能資料結構,包括隊列和集合類對象等。
1、隊列
a、blockingqueue接口:線程安全的阻塞式隊列;當隊列已滿時,想隊列添加會阻塞;當隊列空時,取資料會阻塞。(非常适合消費者-生産者模式)
阻塞方式:put()、take()。
非阻塞方式:offer()、poll()。
實作類:基于數組的固定元素個數的arraybolockingqueue和基于連結清單結構的不固定元素個數的linkedblockqueue類。
b、blockingdeque接口: 與blockingqueue相似,但可以對頭尾進行添加和删除操作的雙向隊列;方法分為兩類,分别在隊首和對尾進行操作。
實作類:标準庫值提供了一個基于連結清單的實作,linkedblockgingdeque。
2、集合類
在多線程程式中,如果共享變量時集合類的對象,則不适合直接使用java.util包中的集合類。這些類要麼不是線程安全,要麼在多線程下性能比較差。
應該使用java.util.concurrent包中的集合類。
a、concurrentmap接口: 繼承自java.util.map接口
putifabsent():隻有在散清單不包含給定鍵時,才會把給定的值放入。
remove():删除條目。
replace(key,value):把value 替換到給定的key上。
replace(key, oldvalue, newvalue):cas的實作。
實作類:concurrenthashmap:
建立時,如果可以預估可能包含的條目個數,可以優化性能。(因為動态調整所能包含的數目操作比較耗時,這個hashmap也一樣,隻是多線程下更耗時)。
建立時,預估進行更新操作的線程數,這樣實作中會根據這個數把内部空間劃分為對應數量的部分。(預設是16,如果隻有一個線程進行寫操作,其他都是讀取,那麼把值設為1 可以提高性能)。
注:當從集合中建立出疊代器周遊map元素時,不一定能看到正在添加的資料,隻能和集合保證弱一緻性。(當然使用疊代器不會因為檢視正在改變的map,而抛出java.util.concurrentmodifycationexception)
b、copyonwritearraylist接口:繼承自java.util.list接口。
顧名思義,在copyonwritearraylist的實作類,所有對清單的更新操作都會新建立一個底層數組的副本,并使用副本來存儲資料;對清單更新操作加鎖,讀取操作不加鎖。
适合多讀取少修改的場景,如果更新操作多,那麼不适合用,同樣疊代器隻能表示建立時清單的狀态,更新後使用了新的底層數組,疊代器還是引用舊的底層數組。
九、多線程任務的執行
過去線程的執行,是先建立thread類的想,再調用start方法啟動,這種做法要求開發人員對線程進行維護,線上程較多時,一般建立一個線程池同一管理,同時降低重複建立線程的開銷
在j2se5.0中,java.util.concurrent包提供了豐富的用來管理線程和執行任務的實作。
1、基本接口(描述任務)
a、callable接口:
runnable接口受限于run方法的類型簽名,而callable隻有一個方法call(),可以有傳回值,可以抛出受檢異常。
b、future接口:
過去,需要異步線程的任務執行結果,要求主線程和任務執行線程之間進行同步和資料傳遞。
future簡化了任務的異步執行,作為異步操作的一個抽象。調用get()方法可以擷取異步的執行結果,如果任務沒有執行完,會等待,直到任務完成或被取消,cancel()可以取消。
c、delayed接口:
延遲執行任務,getdelay()傳回目前剩餘的延遲時間,如果不大于0,說明延遲時間已經過去,應該排程并執行該任務。
2、組合接口(描述任務)
a、runnablefuture接口:繼承自runnable接口和future接口。
當來自runnalbe接口中的run方法成功執行之後,相當于future接口表示的異步任務已經完成,可以通過get()擷取運作結果。
b、scheduledfuture接口:繼承future接口和delayed接口,表示一個可以調用的異步操作。
c、runnablescheduledfuture接口:繼承自runnable、delayed和future,接口中包含isperiodic,表明該異步操作是否可以被重複執行。
3、executor接口、excutorserver接口、scheduleexecutorservice接口和completionservice接口(描述任務執行)
a、executor接口,execute()用來執行一個runnable接口的實作對象,不同的executor實作采取不同執行政策,但提供的任務執行功能比較弱。
b、excutorserver接口,繼承自executor;
提供了對任務的管理:submit(),可以吧callable和runnable作為任務送出,得到一個future作為傳回,可以擷取任務結果或取消任務。
提供批量執行:invokeall()和invokeany(),同時送出多個callable;invokeall(),會等待所有任務都執行完成,傳回一個包含每個任務對應future的清單;invokeany(),任何一個任務成功完成,即傳回該任務結果。
提供任務關閉:shutdown()、shutdownnow()來關閉服務,前者不允許新的任務送出,後者試圖終止正在運作和等待的任務,并傳回已經送出單沒有被運作的任務清單。(兩個方法都不會等待服務真正關閉,隻是發出關閉請求。)。shutdowndow,通常做法是向線程發出中斷請求,是以確定送出的任務實作了正确的中斷處理邏輯。
c、scheduleexecutorservice接口,繼承自excutorserver接口:支援任務的延遲執行和定期執行,可以執行callable或runnable。
schedule(),排程一個任務在延遲若幹時間之後執行;
scheduleatfixedrate():在初始延遲後,每隔一段時間循環執行;在下一次執行開始時,上一次執行可能還未結束。(同一時間,可能有多個)
schedulewithfixeddelay:同上,隻是在上一次任務執行完後,經過給定的間隔時間再開始下一次執行。(同一時間,隻有一個)
以上三個方法都傳回scheduledfuture接口的實作對象。
d、completionservice接口,共享任務執行結果。
通常在使用executorservice接口,通過submit送出任務,并得到一個future接口來擷取任務結果,如果任務送出者和執行結果的使用者是程式的不同部分,那就要把future在不同部分進行傳遞;而completionservice就是解決這個問題,程式不同部分可以共享completionservice,任務送出後,執行結果可以通過take(阻塞),poll(非阻塞)來擷取。
标準庫提供的實作是 executorcompletionservice,在建立時,需要提供一個executor接口的實作作為參數,用來實際執行任務。
例:多線程方式下載下傳檔案
public class filedownloader{
// 線程池
private final executorservice executor = executors.newfixedthreadpool(10);
public boolean download(final url url, final path path){
future<path> future = executor.submit(new callable<path>(){ //submit送出任務
public path call(){
//這裡就省略ioexception的處理了
inputstream is = url.openstream();
files.copy(is, path, standardcopyoption.replace_existing);
return path;
});
return future.get() !=null ? true : false;
}<span style="font-family: arial, helvetica, sans-serif;">catch(interruptedexception | executionexception e){</span>
return false;
public void close(){//當不再使用filedownloader類的對象時,應該使用close方法關閉其中包含的executorservice接口的實作對象,否則虛拟機不會退出,占用記憶體不釋放
executor.shutdown();// 發出關閉請求,此時不會再接受新任務
if(!executor.awaittermination(3, timeunit.minutes)){// awaittermination 來等待一段時間,使正在執行的任務或等待的任務有機會完成
executor.shutdownnow();// 如果等待時間過後還有任務沒完成,則強制結束
executor.awaittermination(1, timeunit.minutes);// 再等待一段時間,使被強制結束的任務完成必要的清理工作
}catch(interruptedexception e){
executor.shutdownnow();
thread.currentthread().interrupt();
對java.util.concurrent包進行更新,增加了新的輕量級任務執行架構fork/join和多階段線程同步工具。
1、輕量級任務執行架構fork/join
這個架構的目的主要是更好地利用底層平台上的多核和多處理器來進行并行處理。
通過分治算法或map/reduce算法來解決問題。
fork/join 類比于 map/reduce。
fork操作是把一個大的問題劃分為若幹個較小的問題,劃分過程一般為遞歸,直到可以直接進行計算的粒度适合的子問題;子問題在結算後,可以得到整個問題的部分解
join操作收集子結果,合并,得到完整解,也可能是 遞歸進行的。
相對一般的線程池實作,f/j架構的優勢在任務的處理方式上。在一般線程池中,一個線程由于某些原因無法運作,會等待;而在f/j,某個子問題由于等待另外一個子問題的完成而無法繼續運作,那麼處理該子問題的線程會主動尋找其他尚未運作的子問題來執行。這種方式減少了等待時間,提高了性能。
a、forkjointask類:表示一個由f/j架構執行的任務,該類實作了future接口,可以按照future接口的方式來使用。(表示任務)
fork(),異步方式啟動任務的執行。
join(),等待任務完成并傳回執行結果。
在建立自己的任務時,最好不要直接繼承自forkjointask,而是繼承其子類,recurivetask或recursiveaction,前者可以傳回結果,後者不行。
b、forkjoinpool類:表示任務執行,實作了executorservice接口,除了可以執行forkjointask,也可以執行callable和runnable。(任務執行)
執行任務的兩大類:
第一類:execute、invoke或submit方法:直接送出任務。
第二類:fork():運作forkjointask在執行過程中的子任務。
一般作法是表示整個問題的forkjointask用第一類送出,執行過程中産生的子任務不需要處理,forkjoinpool會負責子任務執行。
例:查找數組中的最大值
private static class maxvaluetask extends recursivetask<long>{
private final long[] array;
private final int start;
private final int end;
maxvaluetask(long[] array, int start, int end){
this.array = array;
this.start = start;
this.end = end;
//compute是recursivetask的主方法
protected long compute(){
long max = long.min_value;
if(end - start < rang_length){//尋找最大值
for(int i = start; i<end;i++{
if(array[i] > max){
max = array[i];
}else{// 二分任務
int mid = (start + end) /2;
maxvaluetask lowtask = new maxvaluetask(array, start , mid);
maxvaluetask hightask = new maxvaluetask(array, mid, end);
lowtask.fork();// 異步啟動任務
hightask.fork();
max = math.max(max, lowtask.join());//等待執行結果
max = math.max(max, hightask.join();
return max;
public long calculate(long[] array){
maxvaluetask task = new maxvaluetask(array, 0 , array.length);
long result = forkjoinpool.invoke(task);
注:這個例子是示例,但從性能上說直接對整個數組順序比較效率高,畢竟多線程所帶來的額外開銷過大。
在實際中,f/j架構發揮作用的場合很多,比如在一個目錄包含的所有文本中搜尋某個關鍵字,可以每個檔案建立一個子任務。
如果相關的功能可以用遞歸和分治來解決,就适合f/j。
2、多階段線程同步工具
phaser類是java se 7中新增的一個使用同步工具,功能和靈活性比倒數閘門和循環屏障要強很多。
在f/j架構中的子任務之間要進行同步時,應優先考慮phaser。
phaser把多個線程寫作執行的任務劃分成多個階段(phase),程式設計時要明确各個階段的任務,每個階段都可以有任意個參與者,線程可以随時注冊并參與到某個階段,當一個階段中所有線程都成功完成之後,phaser的onadvance()被調用,可以通過覆寫添加自定義處理邏輯(類似循環屏障的使用的runnable接口),然後phaser類會自動進入下個階段。如此循環,知道phaser不再包含任何參與者。
phaser建立後,初始階段編号為0,構造函數中指定初始參與個數。
register(),bulkregister(),動态添加一個或多個參與者。
arrive(),某個參與者完成任務後調用
arriveandderegister(),任務完成,取消自己的注冊。
arriveandawaitadvance(),自己完成等待其他參與者完成。,進入阻塞,直到phaser成功進入下個階段。
awaitadvance()、awaitadvanceinterruptibly(),等待phaser進入下個階段,參數為目前階段的編号,後者可以設定逾時和進行中斷請求。
另外,phaser的一個重要特征是多個phaser可以組成樹形結構,phaser提供了構造方法來指定目前對象的父對象;當一個子對象參與者>0,會自動注冊到父對象中;當=0,自動解除注冊。
例:從指定網址,下載下傳img标簽的照片
階段1、處理網址對應的html文本,和抽取img的連結;2、建立圖檔下載下傳子線程,主線程等待;3、子線程下載下傳圖檔,主線程等待;4、任務完成退出
public class webpageimagedownloader{
private final phaser phaser = new phaser(1);//初始參與數1,代表主線程。
public void download(url url, final path path) throws ioexception{
string content = getcontent(url);//獲得html文本,省略。
list<url> imageurls = extractimageurls(content);//獲得圖檔連結,省略。
for(final url imageurl : imageurls){
phaser.register();//子線程注冊
new thread(){
public void run(){
phaser.arriveandawaitadvance();//第二階段的等待,等待進入第三階段
try{
inputstream is = imageurl.openstream();
file.copy(is, getsavepath(path, imageurl), standardcopyoption.replace_existing);
}catch(ioexception e){
e.printstacktrace():
}finally{
phaser.arriveandderegister();//子線程完成任務,退出。
}.start();
phaser.arriveandawaitadvance();//第二階段等待,子線程在注冊
phaser.arriveandawaitadvance();//第三階段等待,子線程在下載下傳
phaser.arriveandderegister();//所有線程退出。
十一、threadlocal類
java.lang.threadlocal,線程局部變量,把一個共享變量變為一個線程的私有對象。不同線程通路一個threadlocal類的對象時,鎖通路和修改的事每個線程變量各自獨立的對象。通過threadlocal可以快速把一個非線程安全的對象轉換成線程安全的對象。(同時也就不能達到資料傳遞的作用了)。
a、get()和set()分别用來擷取和設定目前線程中包含的對象的值。
b、remove(),删除。
c、initialvalue(),初始化值。如果沒有通過set方法設定值,第一個調用get,會通過initvalue來擷取對象的初始值。
threadloacl的一般用法,建立一個threadlocal的匿名子類并覆寫initalvalue(),把threadloacl的使用封裝在另一個類中
public class threadlocalidgenerator{
private static final threadlocal<idgenerator> idgenerator = new threadlocal<idgenerator>(){
protected idgenerator initalvalue(){
return new idgenerator();//idgenerator 是個初始int value =0,然後getnext(){ return value++}
public static int getnext(){
return idgenerator.get().getnext();
threadloal的另外一個作用是建立線程唯一的對象,在有些情況,一個對象在代碼中各個部分都需要用到,傳統做法是把這個對象作為參數在代碼間傳遞,如果使用這個對i昂的代碼都在同一個線程,可以封裝在threadlocal中。
如:在多線程中,生成随機數
java.util.random會帶來競争問題,java.util.concurrent.threadlocalrandom類提供多線程下的随機數聲場,底層是threadloacl。
總結:多線程開發中應該優先使用高層api,如果無法滿足,使用java.util.concurrent.atomic和java.util.concurrent.locks包提供的中層api,而synchronized和volatile,以及wait,notify和notifyall等低層api 應該最後考慮。