day15多線程-02
1.線程池
1.1 線程狀态介紹
- 當線程被建立并啟動以後,它既不是一啟動就進入了執行狀态,也不是一直處于執行狀态。
- 線程對象在不同的時期有不同的狀态。那麼Java中的線程存在哪幾種狀态呢?
- Java中的線程狀态被定義在了java.lang.Thread.State枚舉類中,State枚舉類的源碼如下:
public class Thread {
public enum State {
/* 建立 */
NEW ,
/* 可運作狀态 */
RUNNABLE ,
/* 阻塞狀态 */
BLOCKED ,
/* 無限等待狀态 */
WAITING ,
/* 計時等待 */
TIMED_WAITING ,
/* 終止 */
TERMINATED;
}
// 擷取目前線程的狀态
public State getState() {
return jdk.internal.misc.VM.toThreadState(threadStatus);
}
}
- 通過源碼我們可以看到Java中的線程存在6種狀态,每種線程狀态的含義如下
- NEW 建立狀态
- RUNNABLE 就緒狀态
- BLOCKED 阻塞狀态
- WAITING 等待狀态
- TIMED_WAITING 限時等待狀态
- TERMINATED 結束狀态
1.2 線程池-基本原理
線程池也是可以看做成一個池子,在該池子中存儲很多個線程。為了提高性能,我們就可以采用線程池。線程池在啟動的時,會建立大量空閑線程,當我們向線程池送出任務的時,線程池就會啟動一個線程來執行該任務。等待任務執行完畢以後,線程并不會死亡,而是再次傳回到線程池中稱為空閑狀态。等待下一次任務的執行。
1.3 線程池-Executors預設線程池
使用Executors中所提供的靜态方法來建立線程池
- static ExecutorService newCachedThreadPool() 建立一個預設的線程池
代碼
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Demo1 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + "線程執行了...");
});
executorService.shutdown();
}
}
1.4 線程池-Executors建立指定上限的線程池
使用Executors中所提供的靜态方法來建立線程池
-
static ExecutorService newFixedThreadPool(int nThreads) :
建立一個指定最多線程數量的線程池
代碼
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class Demo2 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
ThreadPoolExecutor pool = (ThreadPoolExecutor) executorService;
System.out.println(pool.getPoolSize());
executorService.submit(()->{
System.out.println(Thread.currentThread().getName()+"線程執行了...");
});
executorService.submit(()->{
System.out.println(Thread.currentThread().getName()+"線程執行了...");
});
System.out.println(pool.getPoolSize());
executorService.shutdown();
}
}
1.5 線程池-ThreadPoolExecutor
- 建立線程池對象 : ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(核心線程數量,最大線程數量,空閑線程最 大存活時間,任務隊列,建立線程工廠,任務的拒絕政策);
代碼
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Demo3 {
public static void main(String[] args) {
ThreadPoolExecutor poor = new ThreadPoolExecutor(
// 參數一:核心線程數量
2,
// 參數二:最大線程數
5,
// 參數三:空閑線程最大存活時間
3,
// 參數四:時間機關
TimeUnit.SECONDS,
// 參數五:任務隊列
new ArrayBlockingQueue<>(5),
// 參數六:建立線程工廠
Executors.defaultThreadFactory(),
// 參數七:任務的拒絕政策
new ThreadPoolExecutor.AbortPolicy());
System.out.println(poor.getPoolSize());
poor.submit(new myThread());
poor.submit(new myThread());
System.out.println(poor.getPoolSize());
poor.shutdown();
}
}
class myThread implements Runnable {
@Override
public void run() {
System.out.println("漢堡包真好吃");
}
}
1.6 線程池-參數詳解
- corePoolSize: 核心線程的最大值,不能小于0
- maximumPoolSize:最大線程數,不能小于等于0,maximumPoolSize >= corePoolSize
- keepAliveTime: 空閑線程最大存活時間,不能小于0
- unit: 時間機關
- workQueue: 任務隊列,不能為null
- threadFactory: 建立線程工廠,不能為null
- handler: 任務的拒絕政策,不能為null
1.7 線程池-非預設任務拒絕政策
- RejectedExecutionHandler是jdk提供的一個任務拒絕政策接口,它下面存在4個子類
-
ThreadPoolExecutor.AbortPolicy:丢棄任務并抛出
RejectedExecutionException異常,是預設的政策。
- ThreadPoolExecutor.DiscardPolicy:丢棄任務,但是不抛出異常,這是不推薦的做法。
- ThreadPoolExecutor.DiscardOldestPolicy:抛棄隊列中等待最久的任務,然後把目前任務加入隊列中。
- ThreadPoolExecutor.CallerRunsPolicy: 調用任務的run()方法繞過線程池直接執行。
- 注:明确線程池可執行的任務數 = 隊列容量 + 最大線程數
案例示範1:示範ThreadPoolExecutor.AbortPolicy任務處理政策
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Demo4 {
public static void main(String[] args) {
ThreadPoolExecutor poor = new ThreadPoolExecutor(2,
5,
3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 1; i <= 11; i++) {
int j = i;
poor.submit(() -> {
System.out.println(j + Thread.currentThread().getName() + "程式執行了");
});
}
poor.shutdown();
}
}
案例示範2:示範ThreadPoolExecutor.DiscardPolicy任務處理政策 new ThreadPoolExecutor.DiscardPolicy()
案例示範3:示範ThreadPoolExecutor.DiscardOldestPolicy任務處理政策 new ThreadPoolExecutor.DiscardOldestPolicy()
案例示範4:示範ThreadPoolExecutor.CallerRunsPolicy任務處理政策 new ThreadPoolExecutor.CallerRunsPolicy()
2. 原子性
2.1 volatile-問題
2.2 volatile解決以上案例出現的問題
- 當A線程修改了共享資料時,B線程沒有及時擷取到最新的值,如果還在使用原先的值,就會出現問題
- 1.堆記憶體是唯一的,每一個線程都有自己的線程棧。
- 2.每一個線程在使用堆裡面變量的時候,都會先拷貝一份到變量的副本中。
- 3.線上程中,每一次使用是從變量的副本中擷取的。
- Volatile關鍵字:強制線程每次在使用的時候,都會看一下共享區域最新的值
2.3 synchronized解決
-
1.線程獲得鎖
2.清空變量副本
3.拷貝共享變量最新的值到變量副本中
4.執行代碼
5.将修改後變量副本中的值指派給共享資料
6.釋放鎖
2.4 原子性
概述
所謂的原子性是指在一次操作或者多次操作中,要麼所有的操作全部都得到了執行并且不會受到任何因素的
幹擾而中斷,要麼所有的操作都不執行,多個操作是一個不可以分割的整體
代碼
public class IceCream implements Runnable {
public static volatile int counts = 0;
@Override
public void run() {
for (int i = 0; i < 100; i++) {
counts++;
System.out.println("已經送了" + counts + "個冰淇淋");
}
}
}
class Test {
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
IceCream iceCream = new IceCream();
Thread tr = new Thread(iceCream);
tr.start();
}
}
}
- 代碼總結 : count++ 不是一個原子性操作, 他在執行的過程中,有可能被其他線程打斷
2.5 volatile關鍵字不能保證原子性
-
解決方案 :
我們可以給count++操作添加鎖,那麼count++操作就是臨界區中的代碼,臨界區中的代碼一次隻能被一個線程去執行,是以count++就變成了原子操作。
代碼
public class IceCream implements Runnable {
public static volatile int counts = 0;
@Override
public void run() {
for (int i = 0; i < 100; i++) {
synchronized (this) {
counts++;
System.out.println("已經送了" + counts + "個冰淇淋");
}
}
}
}
2.6 原子性_AtomicInteger
-
概述:java從JDK1.5開始提供了java.util.concurrent.atomic包(簡稱Atomic包),這個包中的原子操作類提供了一種用法簡單,性能高效,
線程安全地更新一個變量的方式。AtomicInteger的常用方法如下:
- public AtomicInteger():初始化一個預設值為0的原子型Integer
- public AtomicInteger(int initialValue):初始化一個指定值的原子型Integer
- int get():擷取值
- int getAndIncrement():以原子方式将目前值加1,注意,這裡傳回的是自增前的值。
- int incrementAndGet():以原子方式将目前值加1,注意,這裡傳回的是自增後的值。
- int addAndGet(int data):以原子方式将輸入的數值與執行個體中的值(AtomicInteger裡的value)相加,并傳回結果。
- int getAndSet(int value):以原子方式設定為newValue的值,并傳回舊值。
代碼
import java.util.concurrent.atomic.AtomicInteger;
public class Demo6 {
public static void main(String[] args) {
// AtomicInteger ai = new AtomicInteger();//預設值為0
AtomicInteger ai1 = new AtomicInteger(10);//初始值為10
System.out.println(ai1.get());//獲得初始值10
// System.out.println(ai1.incrementAndGet());//先自增後擷取,11
// System.out.println(ai1.getAndIncrement());//先擷取後自增,10
// System.out.println(ai1.addAndGet(10));//先添加值後擷取,20
// System.out.println(ai1.getAndAdd(10));//先擷取值後添加,10
System.out.println(ai1.getAndSet(20));//先擷取後将值修改,10
System.out.println(ai1.get());//20
}
}
2.7 AtomicInteger-記憶體解析
- AtomicInteger原理:自旋鎖 + CAS 算法
CAS算法:
- 有3個操作數(記憶體值V, 舊的預期值A,要修改的值B)
- 當舊的預期值A == 記憶體值:此時修改成功,将V改為B
- 當舊的預期值A!=記憶體值:此時修改失敗,不做任何操作,并重新擷取現在的最新值(這個重新擷取的動作就是自旋)
2.8 AtomicInteger-源碼解析
案例代碼
import java.util.concurrent.atomic.AtomicInteger;
public class IceCream2 implements Runnable {
public AtomicInteger ai;
public IceCream2(AtomicInteger ai) {
this.ai = ai;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println("已經送了" + ai.incrementAndGet() + "個冰淇淋");
}
}
}
class Test2 {
public static void main(String[] args) {
AtomicInteger ai = new AtomicInteger();
for (int i = 0; i < 100; i++) {
IceCream2 iceCream2 = new IceCream2(ai);
Thread tr = new Thread(iceCream2);
tr.start();
}
}
}
源碼解析
//先自增,然後擷取自增後的結果
public final int incrementAndGet() {
//+ 1 自增後的結果
//this 就表示目前的atomicInteger(值)
//1 自增一次
return U.getAndAddInt(this, VALUE, 1) + 1;
}
public final int getAndAddInt(Object o, long offset, int delta) {
//v 舊值 int v;
//自旋的過程
do {
//不斷的擷取舊值
v = getIntVolatile(o, offset);
//如果這個方法的傳回值為false,那麼繼續自旋
//如果這個方法的傳回值為true,那麼自旋結束
//o 表示的就是記憶體值
//v 舊值
//v + delta 修改後的值
} while (!weakCompareAndSetInt(o, offset, v, v + delta)); //作用:比較記憶體中的值,舊值是否相等,如果相等就把修改後的值寫到記憶體中,傳回true。表示修改成功。
// 如果不相等,無法把修改後的值寫到記憶體中,傳回false。表示修改失敗。
//如果修改失敗,那麼繼續自旋。
return v;
}
2.9 悲觀鎖和樂觀鎖
synchronized和CAS的差別
- 相同點:在多線程情況下,都可以保證共享資料的安全性。
-
不同點:
synchronized總是從最壞的角度出發,認為每次擷取資料的時候,别人都有可能修改。是以在每次操作共享資料之前,都會上鎖。(悲觀鎖)
cas是從樂觀的角度出發,假設每次擷取資料别人都不會修改,是以不會上鎖。隻不過在修改共享資料的時候,會檢查一下,别人有沒有修改過這個資料。
如果别人修改過,那麼我再次擷取現在最新的值。 如果别人沒有修改過,那麼我現在直接修改共享資料的值.(樂觀鎖)
3.并發工具類
3.1 并發工具類-Hashtable
Hashtable出現的原因
- 在集合類中HashMap是比較常用的集合對象,但是HashMap是線程不安全的(多線程環境下可能會存在問題)。
- 為了保證資料的安全性我們可以使用Hashtable,但是Hashtable的效率低下。
3.2 并發工具類-ConcurrentHashMap
基本使用
- ConcurrentHashMap出現的原因:在集合類中HashMap是比較常用的集合對象,但是HashMap是線程不安全的(多線程環境下可能會存在問題)。
- 為了保證資料的安全性我們可以使用Hashtable,但是Hashtable的效率低下。基于以上兩個原因我們可以使用JDK1.5以後所提供的ConcurrentHashMap。
總結
-
1.HashMap是線程不安全的。多線程環境下會有資料安全問題
2.Hashtable是線程安全的,但是會将整張表鎖起來,效率低下
3.ConcurrentHashMap也是線程安全的,效率較高。在JDK7和JDK8中,底層原理不一樣。
import java.util.concurrent.ConcurrentHashMap;
public class Demo7 {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, String> chm = new ConcurrentHashMap<>();
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 25; i++) {
chm.put(i + "", i + "");
}
});
Thread thread2 = new Thread(() -> {
for (int i = 25; i < 51; i++) {
chm.put(i + "", i + "");
}
});
thread1.start();
thread2.start();
System.out.println("---------------------");
Thread.sleep(1000);
for (int i = 0; i < 51; i++) {
System.out.println(chm.get(i + ""));
}
}
}
3.3 并發工具類-ConcurrentHashMap1.7原理
建立對象
- 1.預設建立一個長度16,加載因子為0.75的大數組,這個大數組一旦建立無法擴容
- 2.還會建立一個長度為2的小數組,把位址值指派給0索引處,其他索引位置的元素都是null
注意事項
- 如果為null,則按照模闆建立小數組.二次哈希,計算出在小數組中應存入的位置.
- 如果需要擴容,則将小數組擴容兩倍;如果不需要擴容,則就會判斷小數組的這個位置有沒有元素.
- 如果沒有元素,則直接存.如果有元素,就會調用equals方法,比較屬性值.如果equals為true,則不存.
-
如果equals為false,形成哈希桶結構
3.4 并發工具類-ConcurrentHashMap1.8原理
總結
-
1.如果使用空參構造建立ConcurrentHashMap對象,則什麼事情都不做在第一次添加元素的時候建立哈希表
2.計算目前元素應存入的索引。
3.如果該索引位置為null,則利用cas算法,将本結點添加到數組中。
4.如果該索引位置不為null,則利用volatile關鍵字獲得目前位置最新的結點位址,挂在他下面,變成連結清單。
5.當連結清單的長度大于等于8時,自動轉換成紅黑樹6,以連結清單或者紅黑樹頭結點為鎖對象,配合悲觀鎖保證多線程,操作集合時資料的安全性
3.5 并發工具類-CountDownLatch
CountDownLatch類 :
- public CountDownLatch(int count) 參數傳遞線程數,表示等待線程數量
- public void await() 讓線程等待
- public void countDown() 目前線程執行完畢
- 使用場景:讓某一條線程等待其他線程執行完畢之後再執行
代碼
import java.util.concurrent.CountDownLatch;
public class MotherThread extends Thread{
private CountDownLatch cdt;
public MotherThread(CountDownLatch cdt) {
this.cdt = cdt;
}
@Override
public void run() {
try {
cdt.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import java.util.concurrent.CountDownLatch;
public class ChildThread1 extends Thread {
private CountDownLatch cdl;
public ChildThread1(CountDownLatch cdl) {
this.cdl = cdl;
}
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "正在吃第" + i + "個餃子");
}
cdl.countDown();
System.out.println(Thread.currentThread().getName() + "吃完餃子了");
}
}
import java.util.concurrent.CountDownLatch;
public class ChildThread2 extends Thread {
private CountDownLatch cdl;
public ChildThread2(CountDownLatch cdl) {
this.cdl = cdl;
}
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "正在吃第" + i + "個餃子");
}
cdl.countDown();
System.out.println(Thread.currentThread().getName() + "吃完餃子了");
}
}
import java.util.concurrent.CountDownLatch;
public class Test {
public static void main(String[] args) throws InterruptedException {
CountDownLatch cdt = new CountDownLatch(2);
MotherThread mt = new MotherThread(cdt);
mt.start();
ChildThread1 ct1 = new ChildThread1(cdt);
ct1.setName("小王");
ct1.start();
ChildThread2 ct2 = new ChildThread2(cdt);
ct2.setName("小李");
ct2.start();
Thread.sleep(2000);
System.out.println("媽媽收拾飯桌");
}
}
總結
- CountDownLatch(int count):參數寫等待線程的數量。并定義了一個計數器。
- await():讓線程等待,當計數器為0時,會喚醒等待的線程
- countDown(): 線程執行完畢時調用,會将計數器-1。
3.6 并發工具類-Semaphore
- 使用場景:可以控制通路特定資源的線程數量。
實作步驟
-
1.需要有人管理這個通道
2.當有車進來了,發通行許可證
3.當車出去了,收回通行許可證
4.如果通行許可證發完了,那麼其他車輛隻能等着
代碼
import java.util.concurrent.Semaphore;
public class SemaphoreDemo implements Runnable {
//1,需要有人管理這個通道
private Semaphore semaphore = new Semaphore(3);
@Override
public void run() {
//2,當有車進來了,發通行許可證
try {
semaphore.acquire();
System.out.println("獲得許可證,開始駕駛");
Thread.sleep(100);
//3,當車出去了,收回通行許可證
semaphore.release();
System.out.println("歸還許可證");
//4,如果通行許可證發完了,那麼其他車輛隻能等着*/
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Test {
public static void main(String[] args) {
SemaphoreDemo sd = new SemaphoreDemo();
for (int i = 0; i < 100; i++) {
new Thread(sd).start();
}
}
}