天天看點

技能提升:并發程式設計(四)

十六:ThradLoacl

1.簡單使用

public class Demo2 {
	private ThreadLocal<Integer> threadLocal=new ThreadLocal<Integer>() {
		protected Integer initialValue() {//初始化值
			Integer integer = new Integer(0);
			return integer;
		};
	};
	public int getNext() {
		Integer val = threadLocal.get();
		val++;
		threadLocal.set(val);
		return val;
	}
	
	public static void main(String[] args) {
		Demo2 demo2 = new Demo2();
		new Thread(new Runnable() {
			@Override
			public void run() {
				while (true) {
					int next = demo2.getNext();
					System.out.println(Thread.currentThread().getName()+"-----"+next);
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}).start();
		new Thread(new Runnable() {
			@Override
			public void run() {
				while (true) {
					int next = demo2.getNext();
					System.out.println(Thread.currentThread().getName()+"-----"+next);
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}).start();
	}

}
           

2.ThreadLocal原了解析

get()方法源碼

public T get() {
	Thread t = Thread.currentThread();
	ThreadLocalMap map = getMap(t);//通過目前線程擷取ThreadLocalMap,ThreadLocalMap是ThreadLocal的靜态内部類
	if (map != null) {
		ThreadLocalMap.Entry e = map.getEntry(this);//通過ThreadLocal對象擷取Entry
		if (e != null) {
			@SuppressWarnings("unchecked")
			T result = (T)e.value;//獲得Entry的value
			return result;
		}
	}
	return setInitialValue();//ThreadLocalMap為null,調用setInitialValue方法
}

ThreadLocalMap getMap(Thread t) {
	return t.threadLocals;//ThreadLocalMap是Thread内部的成員變量
}

static class Entry extends WeakReference<ThreadLocal<?>> {//Entry是ThreadLocalMap的靜态内部類
	Object value;
	Entry(ThreadLocal<?> k, Object v) {//構造方法需要存入ThreadLocal對象及線程綁定的值
		super(k);
		value = v;
	}
}

 private T setInitialValue() {
	T value = initialValue();//調用ThreadLocal的initialValue方法,獲得初始值
	Thread t = Thread.currentThread();
	ThreadLocalMap map = getMap(t);
	if (map != null)
		map.set(this, value);//設定值
	else
		createMap(t, value);//建立ThreadLocalMap
	return value;
}

void createMap(Thread t, T firstValue) {
	t.threadLocals = new ThreadLocalMap(this, firstValue);//建立ThreadLocalMap綁定到Thread
}
           

set()方法源碼解析

public void set(T value) {
	Thread t = Thread.currentThread();
	ThreadLocalMap map = getMap(t);//擷取ThreadLocalMap
	if (map != null)
		map.set(this, value);//将目前ThreadLocal對象及綁定的值設定到Entry
	else
		createMap(t, value);//建立擷取ThreadLocalMap
}
           

remove()方法源碼解析

public void remove() {
	 ThreadLocalMap m = getMap(Thread.currentThread());
	 if (m != null)
		 m.remove(this);//直接将目前ThreadLocal對象移除
}
           

3. ThreadLocal引起記憶體溢出

 ThreadLocal的内部是ThreadLocalMap,ThreadLocalMap内部是由一個Entry數組組成,Entry類的構造函數為 Entry(弱引用的ThreadLocal對象, Object value對象),因為Entry的key是一個弱引用的ThreadLocal對象,是以在 垃圾回收 之前,将會清除此Entry對象的key,那麼ThreadLocalMap 中就會出現 key 為 null 的 Entry,就沒有辦法通路這些 key 為 null 的 Entry 的 value,這些 value 被Entry對象引用,是以value所占記憶體不會被釋放。若在指定的線程任務裡面,調用ThreadLocal對象的get()、set()、remove()方法,可以避免出現記憶體洩露。

         下圖虛線表示弱引用,ThreadLocal對象被GC回收了,那麼key變成了null。Map又是通過key拿到的value的對象。是以,GC在回收了key所占記憶體後,沒法通路到value的值,因為需要通過key才能通路到value對象。另外,如圖所示的引用鍊:CurrentThread -- Map -- Entry -- value ,是以,在目前線程沒有被回收的情況下,value所占記憶體也不會被回收。是以可能會造成了記憶體溢出。

技能提升:并發程式設計(四)

虛線表示是弱引用,弱引用隻要繼承WeakReference<T>類即可。是以說,當ThreadLocal對象被GC回收了以後,Entry對象的key就變成null了。這個時候沒法通路到 Object Value了。并且最緻命的是,Entry持有Object value。是以,value的記憶體将不會被釋放。

static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;
 
    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}
           

         因為上述的原因,在ThreadLocal這個類的get()、set()、remove()方法,均有實作回收 key 為 null 的 Entry 的 value所占的記憶體。是以,為了防止記憶體洩露(沒法通路到的記憶體),在不會再用ThreadLocal的線程任務末尾,調用一次 上述三個方法的其中一個即可,是以,可以了解到為什麼JDK源碼中要把Entry對象,用 弱引用的ThreadLocal對象,設計為key,那是因為要手動編寫代碼釋放ThreadLocalMap中 key為null的Entry對象,綜上所述:JVM就會自動回收某些對象将其置為null,進而避免OutOfMemory的錯誤。弱引用的對象可以被JVM設定為null。我們的代碼通過判斷key是否為null,進而 手動釋放 記憶體洩露的記憶體。

為什麼要将ThreadLocal設計為弱引用?因為弱引用的對象的生命周期直到下一次垃圾回收之前被回收。弱引用的對象将會被置為null。我們可以通過判斷弱引用對象是否已經為null,來進行相關的操作。在ThreadLocalMap中,如果鍵ThreadLocal已經被回收,說明ThreadLocal對象已經為null,是以其對應的值已經無法被通路到。這個時候,需要及時手動編寫代碼清理掉這個鍵值對,防止記憶體洩露導緻的記憶體溢出。

技能提升:并發程式設計(四)

十七:并發工具類使用及詳解

1.CountDownLatch(維護一個存儲器,當存儲器計數為0是,喚醒被阻塞的線程)

//a.txt
//20,30,54,21,87
//52,52,5,52,62
//12,5,55,12,14,78,54
//對文本中所有數字累計求和,一個線程計算一行,主線程計算總和
public class Demo3{
	
	private int[] nums;
	private int index;
	
	public Demo3(int line) {
		this.nums = new int[line];
	}
	public void add(String line,CountDownLatch countDownLatch) {
		System.out.println(Thread.currentThread().getName()+"開始執行計算任務-----");
		String[] ns=line.split(",");
		int total=0;
		for (String num : ns) {
			total+=Integer.parseInt(num);//求和
		}
		nums[index]=total;//将每一行求得的和加入到數組
		index++;
		System.out.println(Thread.currentThread().getName()+"結束執行計算任務,讀取的行為:"+line+",計算結果為:"+total);
		countDownLatch.countDown();//儲存器的計數-1,當減為0時,主線程就被喚醒執行
	}
	
	public void sum() {
		int totalNum=0;
		for (int i = 0; i < nums.length; i++) {
			totalNum+=nums[i];
		}
		System.out.println("最終結果為num="+totalNum);
	}
	
	public static List<String> readLine(){
		List<String> list = new ArrayList<String>();
		String line="";
		BufferedReader bufferedReader=null;
		try {
			bufferedReader = new BufferedReader(new FileReader("D:\\a.txt"));
			while((line=bufferedReader.readLine())!=null) {
				list.add(line);
			}
		}catch (Exception e) {
			e.printStackTrace();
		}finally {
			if (bufferedReader!=null) {
				try {
					bufferedReader.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
		return list;
	}

	public static void main(String[] args) {
		List<String> readLine = readLine();
		Demo3 demo3 = new Demo3(readLine.size());
		CountDownLatch countDownLatch = new CountDownLatch(readLine.size());//設定存儲器計數
		for (int i = 0; i < readLine.size(); i++) {
			final int j=i;
			new Thread(new Runnable() {
				@Override
				public void run() {
					demo3.add(readLine.get(j),countDownLatch);
				}
			}).start();
		}
		
		try {
			countDownLatch.await();//其他線程沒有執行完畢,主線程等待
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		demo3.sum();
	}

}
           

源碼解析:

 對于CountDownLatch我們關心的就是await()方法及countDown()方法,而CountDownLatch存在一個内部類Sync

技能提升:并發程式設計(四)

可以發現,其内部實作了tryAcquireShared()方法及tryReleaseShared()方法,所有CountDownLatch也是基于共享式隊列同步器實作的。

await()方法:

public boolean await(long timeout, TimeUnit unit)
	throws InterruptedException {
	return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));//調用同步器的tryAcquireSharedNanos
}

public final void acquireSharedInterruptibly(int arg)
		throws InterruptedException {
	if (Thread.interrupted())
		throw new InterruptedException();
	if (tryAcquireShared(arg) < 0)//調用CountDownLatch内部同步器的tryAcquireShared()方法
		doAcquireSharedInterruptibly(arg);//不能擷取同步器,則将目前線程加入到等待隊列中
}

protected int tryAcquireShared(int acquires) {
	return (getState() == 0) ? 1 : -1;//共享模式下,詢問同步器目前status的值的狀态,來表示現在是否能獲得同步器
}

// 在可中斷的共享模式下的擷取
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 用目前線程建立一個共享節點并加入等待隊列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // node 節點的前一個節點
            final Node p = node.predecessor();
            // 前一個節點為頭結點時,說明 node 節點在等待隊列的最前方
            // (等待隊列的頭結點是一個 waitStatus == 0 的預設節點)
            // 再次嘗試擷取同步器,成功的話就...
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 節點 node 可以 park 時,就 park node 中的線程
            // 最後檢查線程的中斷狀态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

 countDown()方法:

public void countDown() {
	sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
	if (tryReleaseShared(arg)) {//調用CountDownLatch内部同步器的tryReleaseShared()方法
		doReleaseShared();// 喚醒非取消狀态的非空的後繼節點
		return true;
	}
	return false;
}

// 共享模式下,修改同步器的 status 值,來表示釋放的動作
protected boolean tryReleaseShared(int releases) {
	for (;;) {
		int c = getState();
		if (c == 0)// 如果同步鎖的 status 已經為 0,就傳回 false。
			return false;
		int nextc = c-1;// 否則目前線程将 status減 1;
		if (compareAndSetState(c, nextc))
			return nextc == 0;// 操作成功時,如果減小後的新值為 0,傳回 true,否則傳回 false
	}
}

// 喚醒非取消狀态的非空的後繼節點
private void doReleaseShared() {
    for (;;) {
        // 從頭節點開始,如果隊列為空或者隻有一個節點,就不需要處理
        Node h = head;
        // 頭結點之後還有其他有效節點時
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // h 節點的等待狀态為 SIGNAL 時,修改等待狀态為 0,
            // CAS 失敗時就繼續循環,重新檢查
            // 修改成功時,unpark 後繼節點
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            // h 節點的等待狀态為 0 時,修改 h 節點的等待狀态為 PROPAGATE
            // CAS 失敗時就繼續循環,重新檢查
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        // 頭節點被更新了,就繼續循環
        if (h == head)                   
            break;
    }
}

// unpark node 的後繼節點
private void unparkSuccessor(Node node) {
    // 如果 node 的等待狀态為負值,嘗試 CAS 操作修改為 0
    // 操作失敗或者 waitStatus 被其他等待線程修改了也是沒有關系的
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 要喚醒的線程包裝在後繼節點中,通常就是 node.next
    // 但是如果 node 的下一個節點被取消了,或者就是 null 的話,
    // 就要從隊列的尾節點從後往前周遊,找到有效的非取消狀态的後繼節點
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }

    // 找到後繼節點後,調用 native 方法,目前線程給後繼線程發放“許可”
    if (s != null)
        LockSupport.unpark(s.thread);
}
           

2.CyclicBarrier(維護一個公共屏障點,先到屏障點的線程則阻塞,所有線程到達後在執行之後任務)

//模拟開會,所有線程到達後,在開會
public class Demo4{
	
	public void meeting(CyclicBarrier cyclicBarrier) {
		System.out.println(Thread.currentThread().getName()+"線程到達會議室(屏障點)-------");
		try {
			cyclicBarrier.await();
		} catch (Exception e) {
			e.printStackTrace();
		} 
		System.out.println(Thread.currentThread().getName()+"開始發言-----");
	}
	public static void main(String[] args) {
		Demo4 demo4 = new Demo4();
		CyclicBarrier cyclicBarrier = new CyclicBarrier(10,new Runnable() {
			@Override
			public void run() {
				System.out.println("開始開會啦--------");
			}
		});
		for (int i = 0; i < 10; i++) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					demo4.meeting(cyclicBarrier);
				}
			}).start();
		}
	}
}
           

原了解析:

CyclicBarrier内部使用了ReentrantLock和Condition兩個類。它有兩個構造函數,最終都調用CyclicBarrier(int parties, Runnable barrierAction)

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
    this(parties, null);
}
           

CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程使用await()方法告訴CyclicBarrier我已經到達了屏障,然後目前線程被阻塞。

CyclicBarrier(int parties, Runnable barrierAction),用于所有線程到達屏障時,優先執行barrierAction。

await()方法

技能提升:并發程式設計(四)

dowait()方法,通過ReentrantLock及Condition實作

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    // 擷取獨占鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 目前代
        final Generation g = generation;
        // 如果這代損壞了,抛出異常
        if (g.broken)
            throw new BrokenBarrierException();
 
        // 如果線程中斷了,抛出異常
        if (Thread.interrupted()) {
            // 将損壞狀态設定為true
            // 并通知其他阻塞在此屏障點上的線程
            breakBarrier();
            throw new InterruptedException();
        }
 
        // 擷取下标(構造方法傳入的線程數--)
        int index = --count;
        // 如果是 0,說明最後一個線程調用了該方法
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;//獲得構造方法傳入的Runnable對象
                // 執行屏障點任務
                if (command != null)
                    command.run();
                ranAction = true;
                // 更新一代,将count重置,将generation重置
                // 喚醒之前等待的線程
                nextGeneration();
                return 0;
            } finally {
                // 如果執行屏障點任務的時候失敗了,就将損壞狀态設定為true
                if (!ranAction)
                    breakBarrier();
            }
        }
        for (;;) {
            try {
                 // 如果沒有時間限制,則直接等待,直到被喚醒
                if (!timed)
                    trip.await();//trip為Condition,目前線程阻塞
                // 如果有時間限制,則等待指定時間
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 目前代沒有損壞
                if (g == generation && ! g.broken) {
                    // 讓屏障點失效
                    breakBarrier();
                    throw ie;
                } else {
                    // 上面條件不滿足,說明這個線程不是這代的
                    // 就不會影響目前這代屏障點的執行,是以,就打個中斷标記
                    Thread.currentThread().interrupt();
                }
            }
 
            // 當有任何一個線程中斷了,就會調用breakBarrier方法
            // 就會喚醒其他的線程,其他線程醒來後,也要抛出異常
            if (g.broken)
                throw new BrokenBarrierException();
 
            // g != generation表示正常換代了,傳回目前線程所在屏障點的下标
            // 如果 g == generation,說明還沒有換代,那為什麼會醒了?
            // 因為一個線程可以使用多個屏障點,當别的屏障點喚醒了這個線程,就會走到這裡,是以需要判斷是否是目前代。
            // 正是因為這個原因,才需要generation來保證正确。
            if (g != generation)
                return index;
            
            // 如果有時間限制,且時間小于等于0,銷毀屏障點并抛出異常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 釋放獨占鎖
        lock.unlock();
    }
}

private static class Generation {
    boolean broken = false;
}

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();//喚醒所有阻塞線程
}

private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();//重置generation
}
           

CyclicBarrier和CountDownLatch的差別:

CountDownLatch的計數器隻能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置,可以使用多次,是以CyclicBarrier能夠處理更為複雜的場景;

CyclicBarrier還提供了一些其他有用的方法,比如getNumberWaiting()方法可以獲得CyclicBarrier阻塞的線程數量,isBroken()方法用來了解阻塞的線程是否被中斷;

CountDownLatch允許一個或多個線程等待一組事件的産生,而CyclicBarrier用于等待其他線程運作到屏障點位置。

3.Semaphore(用于指定多少線程執行)

//一個方法隻能有N個線程執行,其他線程隻能等待,當一個線程執行完畢,就會有一個線程執行方法
public class Demo5{
	
	public void method(Semaphore semaphore) {
		try {
			semaphore.acquire();//擷取許可
			System.out.println(Thread.currentThread().getName()+"執行--------");
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}finally {
			semaphore.release();//釋放許可,其他線程可進入
		}
	}
	public static void main(String[] args) {
		Demo5 demo5 = new Demo5();
		Semaphore semaphore = new Semaphore(10);//指定同時執行的線程數
		while(true) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					demo5.method(semaphore);
				}
			}).start();
		}
	}
}
           

源碼解析:

構造方法

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
public Semaphore(int permits, boolean fair) {
	sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
           

Semaphore底層也是通過AQS實作的,其内部維護公平同步器及非公平同步器,可通過構造方法的fair參數指定,預設使用非公平同步器實作。

acquire()方法

public void acquire() throws InterruptedException {
	sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
	if (Thread.interrupted())
		throw new InterruptedException();
	if (tryAcquireShared(arg) < 0)
		doAcquireSharedInterruptibly(arg);
}

//公平同步器的tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
	for (;;) {
		if (hasQueuedPredecessors())//如果前面有線程再等待,直接傳回-1
			return -1;
		int available = getState();//擷取剩餘許可數量
		int remaining = available - acquires;//-1
		if (remaining < 0 ||
			compareAndSetState(available, remaining))//如果許可不夠或者可以将許可數量重置的話,傳回
			return remaining;
	}
}

//非公平同步器的tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
	return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
	for (;;) {
		//對于公平同步器就少了一不判斷之前是否有線程等待
		int available = getState();
		int remaining = available - acquires;
		if (remaining < 0 ||
			compareAndSetState(available, remaining))
			return remaining;
	}
}

//如果tryAcquireShared傳回<0,則阻塞目前線程
private void doAcquireSharedInterruptibly(int arg)
	throws InterruptedException {
	final Node node = addWaiter(Node.SHARED);
	boolean failed = true;
	try {
		for (;;) {
			final Node p = node.predecessor();
			if (p == head) {
				int r = tryAcquireShared(arg);
				if (r >= 0) {
					setHeadAndPropagate(node, r);
					p.next = null; // help GC
					failed = false;
					return;
				}
			}
			if (shouldParkAfterFailedAcquire(p, node) &&
				parkAndCheckInterrupt())//阻塞線程
				throw new InterruptedException();
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}
           

從上面可以看到,公平同步器與非公平同步器的差別就在于會首先判斷目前隊列中有沒有線程在等待,如果有,就老老實實進入到等待隊列;而不像非公平同步器一樣首先試一把,說不定就恰好獲得了一個許可,這樣就可以插隊了。看完了擷取許可後,再看一下釋放許可。

release()方法

public void release() {
	sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
	if (tryReleaseShared(arg)) {
		doReleaseShared();
		return true;
	}
	return false;
}

protected final boolean tryReleaseShared(int releases) {
	for (;;) {
		int current = getState();//獲得許可數
		int next = current + releases;
		if (next < current) // overflow
			throw new Error("Maximum permit count exceeded");
		if (compareAndSetState(current, next))
			return true;
	}
}

private void doReleaseShared() {
	for (;;) {
		Node h = head;
		if (h != null && h != tail) {
			int ws = h.waitStatus;
			if (ws == Node.SIGNAL) {
				if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
					continue;            // loop to recheck cases
				unparkSuccessor(h);//喚醒節點
			}
			else if (ws == 0 &&
					 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
				continue;                // loop on failed CAS
		}
		if (h == head)                   // loop if head changed
			break;
	}
}
           

Semaphore是信号量,用于管理一組資源。其内部是基于AQS的共享模式,AQS的狀态表示許可證的數量,在許可證數量不夠時,線程将會被挂起;而一旦有一個線程釋放一個資源,那麼就有可能重新喚醒等待隊列中的線程繼續執行。

4.Exchanger(用于線程間資料交換)

public class Demo6{

	public void a(Exchanger<String> exchanger) {
		try {
			System.out.println("a方法執行------");
			Thread.sleep(2000);
			String reString="1234";
			System.out.println("等待對比結果--------");
			String exchange = exchanger.exchange(reString);
			System.out.println("a"+"\t"+exchange);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public void b(Exchanger<String> exchanger) {
		try {
			System.out.println("b方法執行-----");
			Thread.sleep(4000);
			String reString="12345";
			String exchange = exchanger.exchange(reString);
			System.out.println("b"+"\t"+exchange);
			System.out.println("開始比對資料---");
			System.out.println("比對結果為:"+exchange.equals(reString));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	public static void main(String[] args) {
		Demo6 demo6 = new Demo6();
		Exchanger<String> exchanger = new Exchanger<String>();
		new Thread(new Runnable() {
			@Override
			public void run() {
				demo6.a(exchanger);
			}
		}).start();
		new Thread(new Runnable() {
			@Override
			public void run() {
				demo6.b(exchanger);
			}
		}).start();
	}
}
           

十八:Future使用與原理

1.Future的使用

public class Demo {
	public static void main(String[] args) {
		Callable<Integer> callable=new Callable<Integer>() {
			@Override
			public Integer call() throws Exception {
				System.out.println("正在計算結果---------");
				Thread.sleep(1000);
				return 1;
			}
		};
		FutureTask<Integer> futureTask = new FutureTask<>(callable);//Future提前完成任務
		new Thread(futureTask).start();
		
		//執行其他邏輯
		System.out.println("主線程不能閑着------");
		
		try {
			Integer integer = futureTask.get();
			System.out.println("主線程幹完自己的事,獲得子線程執行結果為:"+integer);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
	}
}
           

2.Future源碼解析

通過上面案例發現,Future是配合Callable使用的,而最後又将FutureTask交給Thread運作,對于Thread一般接收為Runnable接口。

技能提升:并發程式設計(四)

通過源碼發現FutureTask其實是Runnable的子類,那這裡就引出一個問題,Callable與Runnable的差別?

1)Callbale可以有傳回值及抛出異常,而Runnable不行。

進入FutureTask的構造方法,發現其中儲存了傳入進來的Callable對象

技能提升:并發程式設計(四)
技能提升:并發程式設計(四)

然後線程執行時,就會調用FutureTask的run方法,其中就調用了Callable的call()方法。 

2)Callable的call方法不是異步執行,是由FutureTask的run()方法調用的,而Runnable的run()方法是被線程調用的,是異步執行

通過閱讀源碼發現FutureTask存在兩個構造方法

技能提升:并發程式設計(四)
技能提升:并發程式設計(四)

發現第一個構造方法傳入Callable直接指派給成員變量,第二個構造方法則是傳入Runnable及一個泛型的V,通過Runnable及V包裝為Callable。

技能提升:并發程式設計(四)

通過Runnable及V通過RunnableAdapter适配為Callable。

技能提升:并發程式設計(四)

在RunnableAdapter中實作Callable接口,實作call()方法,調用Runnable的run()方法,傳回result。

在FutureTask中維護7種狀态,分别為

/*
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private static final int NEW          = 0;//初始化狀态
private static final int COMPLETING   = 1;//正在執行的狀态
private static final int NORMAL       = 2;//正常完成狀态
private static final int EXCEPTIONAL  = 3;//出現異常狀态
private static final int CANCELLED    = 4;//取消狀态
private static final int INTERRUPTING = 5;//正在中斷狀态
private static final int INTERRUPTED  = 6;//已經中斷狀态
           

在new Thread()時傳入FutureTask.,調用Thread的start()方法就會執行FutureTask的run()方法,下面就看看run()方法

public void run() {
	//首先狀态不等于new,并且将目前線程設定到成員變量runner上不成功,直接ruturn,否則執行下一步
	if (state != NEW ||
		!UNSAFE.compareAndSwapObject(this, runnerOffset,
									 null, Thread.currentThread()))
		return;
	try {
		Callable<V> c = callable;
		if (c != null && state == NEW) {
			V result;
			boolean ran;
			try {
				result = c.call();//執行通過FutureTask構造方法傳入的callable的call()方法
				ran = true;
			} catch (Throwable ex) {
				result = null;
				ran = false;
				setException(ex);//執行call()父是出現異常,則設定異常
			}
			if (ran)
				set(result);//如果正常執行就she執行結果
		}
	} finally {
		runner = null;//将目前線程置空
		int s = state;
		if (s >= INTERRUPTING)//判斷目前狀态是否大于等于INTERRUPTING
			handlePossibleCancellationInterrupt(s);
	}
}

//設定異常
protected void setException(Throwable t) {
	if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//将狀态從new變為COMPLETING正在執行狀态
		outcome = t;//設定給成員變量outcome
		UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); //在将狀态設定為EXCEPTIONAL,出現異常狀态
		//喚醒等待線程,等待線程也是被包裝為等待節點,放在等待隊列中,是在執行FutureTask的get()方法時,放入節點的
		finishCompletion();
	}
}

//設定結果
protected void set(V v) {
	if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//将狀态從new變為COMPLETING正在執行狀态
		outcome = v;//設定給成員變量outcome,由此發現outcome不止存儲異常,還可以存儲執行結果
		UNSAFE.putOrderedInt(this, stateOffset, NORMAL); //在将狀态設定為EXCEPTIONAL,正常執行完畢狀态
		//喚醒等待線程,等待線程也是被包裝為等待節點,放在等待隊列中,是在執行FutureTask的get()方法時,放入節點的
		finishCompletion();
	}
}
           

當run()方法執行完畢後就調用FutureTask的get()方法

public V get() throws InterruptedException, ExecutionException {
	int s = state;
	if (s <= COMPLETING)//判斷目前狀态是否<=COMPLETING,表示未完成狀态
		s = awaitDone(false, 0L);//如果為完成,則等待
	return report(s);//如果過大于COMPLETING狀态,表示完成,不區分正常完成或非正常完成
}
//帶有逾時時間
public V get(long timeout, TimeUnit unit)
	throws InterruptedException, ExecutionException, TimeoutException {
	if (unit == null)
		throw new NullPointerException();
	int s = state;
	if (s <= COMPLETING &&
		(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
		throw new TimeoutException();
	return report(s);
}

@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
	Object x = outcome;//對于outcome不隻是存儲執行結果,還可能存儲執行時出現的異常
	if (s == NORMAL)//如果正常完成,直接傳回執行結果
		return (V)x;
	if (s >= CANCELLED)//如果狀态大于等于CANCELLED,抛出異常
		throw new CancellationException();
	throw new ExecutionException((Throwable)x);//否者,将執行Call()方法出現的異常直接抛出
}

//線程等待
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
	final long deadline = timed ? System.nanoTime() + nanos : 0L;
	WaitNode q = null;//等待隊列中的等待節點
	boolean queued = false;
	for (;;) {//死循環
		if (Thread.interrupted()) {//如果目前線程終止了
			removeWaiter(q);//移除等待節點,如果過第一執行循環,移除位null,之後的循環q就不為null
			throw new InterruptedException();
		}

		int s = state;
		if (s > COMPLETING) {//如果目前狀态>COMPLETING,表示call()執行完畢
			if (q != null)
				q.thread = null;//等待節點中的線程為null
			return s;//傳回目前狀态
		}
		else if (s == COMPLETING)//如果過狀态等于COMPLETING
			Thread.yield();//目前線程讓出CPU執行權,讓其他線程(優先級>=目前線程的線程)執行
		else if (q == null)//當q=null是,就建立等待節點,此時q不為null
			q = new WaitNode();
		else if (!queued)
			//将節點加入到等待隊列中
			queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
												 q.next = waiters, q);
		else if (timed) {//是否設定逾時時間
			nanos = deadline - System.nanoTime();
			if (nanos <= 0L) {//逾時
				removeWaiter(q);//移除等待節點
				return state;
			}
			LockSupport.parkNanos(this, nanos);//等待
		}
		else
			LockSupport.park(this);//線程等待
	}
}

static final class WaitNode {
	volatile Thread thread;//目前線程
	volatile WaitNode next;//下一個節點
	WaitNode() { thread = Thread.currentThread(); }
}
           

當調用get()方法後,線程處于等待狀态,隻有在執行call()方法完畢後,通過set()方法才會由finishCompletion()喚醒

private void finishCompletion() {
	for (WaitNode q; (q = waiters) != null;) {//循環判斷是否還有等待節點
		if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
			for (;;) {//死循環,喚醒所有節點
				Thread t = q.thread;
				if (t != null) {
					q.thread = null;//等待線程為null
					LockSupport.unpark(t);//喚醒線程
				}
				WaitNode next = q.next;//擷取下一個節點
				if (next == null)//當=null時,說明沒有下一個節點
					break;
				q.next = null; //下一個節點為null
				q = next;//将下一個節點指派給q,繼續循環
			}
			break;
		}
	}
	done();
	callable = null;
}
           

十九:Fork/Join架構

如果一個應用程式能夠被分解成多個子任務,而且結合多個子任務的結果就能夠得到最終的答案,那麼它就适合使用Fork/Join模式來實作。

//計算1-100的和,分任務相加,兩個兩個相加  1+2 3+4  4+5
public class Demo extends RecursiveTask<Integer> {
	private int begin;
	private int end;
	public Demo(int begin, int end) {
		this.begin = begin;
		this.end = end;
	}

	@Override
	protected Integer compute() {//拆分任務
		int sum=0;
		if (end-begin<=2) {//計算
			for (int i=begin; i<=end; i++) {
				sum+=i;
			}
		}else {
			//拆分
			Demo demo = new Demo(begin, (begin+end)/2);
			Demo demo2 = new Demo((begin+end)/2+1, end);
			//執行任務
			demo.fork();
			demo2.fork();
			//合并
			Integer join = demo.join();
			Integer join2 = demo2.join();
			sum=join+join2;
		}
		return sum;
	}
	
	public static void main(String[] args) {
		try {
			ForkJoinPool forkJoinPool = new ForkJoinPool();
			Future<Integer> future = forkJoinPool.submit(new Demo(1,100));
			System.out.println("-----------");
			System.out.println("計算的值為:"+future.get());
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
	}
	
}
           

二十:并發容器

1.CopyOnWriteArrayList

ArrayList線程不安全案例:

public class Demo4 {
	
	/**
	 * 會出現java.util.ConcurrentModificationException異常  并發修改異常
	 */
	public static void main(String[] args) {
		final List<String> list = new ArrayList<String>();
		for (int i = 0; i < 30; i++) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					list.add(UUID.randomUUID().toString().substring(0, 8));
					System.out.println(list);
				}
			},String.valueOf(i)).start();
		}
	}

}
           
技能提升:并發程式設計(四)

解決辦法:

  • 使用Vector替代
  • 使用Collections提供的同步方法: Collections.synchronizedList(new ArrayList<String>());
  • 使用CopyOnWriteArrayList:List<String> list = new CopyOnWriteArrayList<String>();

CopyOnWrite容器即寫時複制的容器。通俗的了解是當我們往一個容器添加元素的時候,不直接往目前容器添加,而是先将目前容器進行Copy,複制出一個新的容器,然後新的容器裡添加元素,添加完元素之後,再将原容器的引用指向新的容器。這樣做的好處是我們可以對CopyOnWrite容器進行并發的讀,而不需要加鎖,因為目前容器不會添加任何元素。是以CopyOnWrite容器也是一種讀寫分離的思想,讀和寫不同的容器。

原理:

add()方法

public boolean add(E e) {
	final ReentrantLock lock = this.lock;
	lock.lock();//加鎖
	try {
		Object[] elements = getArray();//擷取數組
		int len = elements.length;
		Object[] newElements = Arrays.copyOf(elements, len + 1);//複制數組,長度+1
		newElements[len] = e;//設定元素
		setArray(newElements);//重新設定數組
		return true;
	} finally {
		lock.unlock();//釋放鎖
	}
}
           

讀的時候不需要加鎖,如果讀的時候有多個線程正在向CopyOnWriteArrayList添加資料,讀還是會讀到舊的資料,因為寫的時候不會鎖住舊的CopyOnWriteArrayList。

2.ConcurrentLinkedQueue

ConcurrentLinkedQueue是一個基于連結節點的無界線程安全隊列,它采用先進先出的規則對節點進行排序,當我們添加一個元素的時候,它會添加到隊列的尾部,當我們擷取一個元素時,它會傳回隊列頭部的元素。

源碼解析:

技能提升:并發程式設計(四)

首先發現在其構造函數中就建立了一個節點,是頭尾節點都執行這個建立的節點,對于node中儲存的是要存儲的值及下一個節點的位址。

入隊列:

技能提升:并發程式設計(四)
public boolean offer(E e) {
	checkNotNull(e);//判斷存儲的值是否為null,如果為null,抛出異常
	final Node<E> newNode = new Node<E>(e);//建立一個新節點
	for (Node<E> t = tail, p = t;;) {//定義兩個指針 p,t都指向tail
		Node<E> q = p.next;//q執行p的next,第一次建立時q=null
		if (q == null) {
			if (p.casNext(null, newNode)) {//p的下一個節點指向新建立的節點
				if (p != t) 
					casTail(t, newNode);//将tail指向想建立的節點
				return true;
			}
		}
		else if (p == q)
			p = (t != (t = tail)) ? t : head;
		else
			p = (p != t && t != (t = tail)) ? t : q;
	}
}
           

入隊主要做兩件事情,第一是将入隊節點設定成目前隊列尾節點的下一個節點。第二是更新tail節點,如果tail節點的next節點不為空,則将入隊節點設定成tail節點,如果tail節點的next節點為空,則将入隊節點設定成tail的next節點,是以tail節點不總是尾節點。 

出隊列:

技能提升:并發程式設計(四)
public E poll() {
	restartFromHead://标号
	for (;;) {
		for (Node<E> h = head, p = h, q;;) {//定義三個指針,h,q,p,h指向head 
			E item = p.item;//item等于p指向節點存儲的值
			if (item != null && p.casItem(item, null)) {//判斷item是否等于null,不等于null,設定p的item為null
				if (p != h) 
					updateHead(h, ((q = p.next) != null) ? q : p);//設定head節點,head後移
				return item;
			}
			else if ((q = p.next) == null) {//判斷p.next==null,不管if是否成功,q都指向p.next
				updateHead(h, p);
				return null;
			}
			else if (p == q)
				continue restartFromHead;//跳轉到标号
			else
				p = q;
		}
	}
}
           

從上圖可知,并不是每次出隊時都更新head節點,當head節點裡有元素時,直接彈出head節點裡的元素,而不會更新head節點。隻有當head節點裡沒有元素時,出隊操作才會更新head節點。這種做法也是通過hops變量來減少使用CAS更新head節點的消耗,進而提高出隊效率。