JUC多線程進階
1.什麼是JUC
源碼 + 官方文檔
JUC是 java util concurrent
業務:普通的線程代碼 Thread
Runnable: 沒有傳回值、效率相比于Callable 相對較低!
2.線程和程序
線程與程序的關系
程序是一個程式的執行;一個程序可以包含多個線程,線程是程式執行的最小機關(資源排程的最小機關)
java預設有幾個線程?
2個,main與GC
線程:開了一個程序Typore,寫字,自動儲存(線程負責的,輸入)
對于Java而言:Thread、Runnable、Callable
Java可以開啟線程嗎?
答:不能,調用Thread.start()方法實質上是調用的本地方法(native修飾),底層是C++來實作的,因為Java無法操作硬體。
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
//本地方法,底層C++,Java無法直接操作硬體
private native void start0();
并發、并行
并發(多線程操作同一個資源)
- CPU一核,模拟出來多條線程。快速交替
并行(多個人一起幹事)
- CPU多核,多個線程可以同時執行。
package cn.fyyice.juc;
public class Test1 {
public static void main(String[] args) {
//擷取CPU核數
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
并發程式設計的本質:充分利用CPU的資源
線程有幾個狀态
public enum State {
// 新生
NEW,
// 運作
RUNNABLE,
// 阻塞
BLOCKED,
// 等待
WAITING,
// 逾時等待
TIMED_WAITING,
// 終止
TERMINATED;
}
wait與sleep的差別
1、來自不同的類
wait => Object
sleep => Thread
2、關于鎖的釋放
wait:會釋放鎖
sleep:睡覺了,抱着鎖睡覺,不會釋放!
3、使用的範圍是不同的
wait:必須在同步代碼塊中
sleep:可以在任何地方睡
4、是否需要捕獲異常
wait:不需要捕獲異常
sleep:必須要捕獲異常(超市等待)
線上程sleep過後,會讓出CPU資源。目的是不讓目前線程獨自霸占該程序所獲的CPU資源,以留一定時間給其他線程執行的機會。是以不會占用cpu。時間到了會正常傳回,線程處于就緒狀态,然後參與cpu排程,擷取到cpu資源之後就可以運作。
3.Lock鎖
在真正的多線程開發中(公司),線程就是一個單獨的資源類,沒有任何附屬的操作。
這裡以賣票的demo來進行講解
傳統synchronized解決
本質:排隊
package cn.fyyice.juc;
public class SaleTicket {
public static void main(String[] args) {
//并發:多線程操作一個資源類
Ticket t = new Ticket();
//@FunctionalInterface函數式接口,jdk 1.8 lombda表達式 (參數)->{代碼塊}
new Thread(()->{ for (int i = 0; i < 50; i++) t.sale();},"甲").start();
new Thread(()->{ for (int i = 0; i < 50; i++) t.sale();},"乙").start();
new Thread(()->{ for (int i = 0; i < 50; i++) t.sale();},"丙").start();
}
}
// OOP資源類
class Ticket{
private Integer ticket = 20;
public synchronized void sale(){
if (ticket > 0){
System.out.println(Thread.currentThread().getName()+"賣出了編号:"+ticket--+"的票,剩餘:"+ticket);
}
}
}
Lock接口
Lock l = ...; l.lock(); try { // access the resource protected by this lock } finally { l.unlock(); }
當在不同範圍内發生鎖定和解鎖時,必須注意確定在鎖定時執行的所有代碼由try-finally或try-catch保護,以確定在必要時釋放鎖定。
Lock
實作提供了使用
synchronized
方法和語句的附加功能,通過提供非阻塞嘗試來擷取鎖(
tryLock()
),嘗試擷取可被中斷的鎖(
lockInterruptibly()
) ,以及嘗試擷取可以逾時(
tryLock(long, TimeUnit)
)。
LOCK中聲明的所有方法:
Modifier and Type | Method | Description |
---|---|---|
void | lock() | 獲得鎖。 |
void | lockInterruptibly() | 擷取鎖定,除非目前線程是 interrupted 。 |
Condition | newCondition() | 傳回一個新Condition綁定到該執行個體Lock執行個體。 |
boolean | tryLock() | 隻有在調用時才可以獲得鎖。 |
boolean | tryLock(long time, TimeUnit unit) | 如果在給定的等待時間内是空閑的,并且目前的線程尚未得到 interrupted,則擷取該鎖。 |
void | unlock() | 釋放鎖。 |
實作類:
所有已知實作類:
- ReentrantLock (可重入鎖—常用)
- ReentrantReadWriteLock.ReadLock (讀鎖)
- ReentrantReadWriteLock.WriteLock(寫鎖)
ReentrantLock源碼:
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
NonfairSync:非公平鎖,可以插隊
FairSync:公平鎖,先來後到
package cn.fyyice.juc;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class SaleTicket2 {
public static void main(String[] args) {
//并發:多線程操作一個資源類
Ticket2 t = new Ticket2();
//@FunctionalInterface函數式接口,jdk 1.8 lombda表達式 (參數)->{代碼塊}
new Thread(()->{ for (int i = 0; i < 30; i++) t.sale();},"A").start();
new Thread(()->{ for (int i = 0; i < 30; i++) t.sale();},"B").start();
new Thread(()->{ for (int i = 0; i < 30; i++) t.sale();},"C").start();
}
}
// OOP資源類
class Ticket2{
private Integer ticket = 20;
Lock lock = new ReentrantLock();
public void sale(){
//加鎖
lock.lock();
try {
if (ticket > 0){
System.out.println(Thread.currentThread().getName()+"賣出了編号:"+ticket--+"的票,剩餘:"+ticket);
}
}catch (Exception e){
e.printStackTrace();
}finally {
//解鎖
lock.unlock();
}
}
}
synchronized 與 Lock 差別
- Synchronized 是内置的Java關鍵字, Lock 是一個Java類
- Synchronized 無法判斷擷取鎖的狀态,Lock 可以判斷是否擷取到了鎖
- Synchronized 會自動釋放鎖, Lock 必須要手動釋放鎖!如果不釋放鎖,死鎖
- Synchronized 線程1(獲得鎖,但發生了阻塞),線程2會一直等待;Lock鎖就不一定會等待下去(tryLock方法,嘗試擷取鎖)
- Synchronized 可重入鎖,不可以中斷的,非公平;Lock 可重入鎖。可以判斷鎖,自己設定公平/非公平
- Synchronized 适合鎖少量的代碼同步問題;Lock 适合鎖大量的同步代碼
鎖是什麼,如何判斷鎖的是誰
4.生産者和消費者問題
線程之間的通信問題:生産者和消費者問題。 等待換新,通知喚醒
線程交替執行 A B 操作統一變量 num = 0
A num+1
B num-1
面試的:
單例模式、排序算法、生産者和消費者問題、死鎖
package cn.fyyice.juc.pc;
public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
}
}
//判斷等待、業務、通知
class Data{
private int number = 0;
public synchronized void increment() throws InterruptedException {
if (number != 0){
//等待
this.wait();
}
number ++;
System.out.println(Thread.currentThread().getName()+"-------->"+number);
//通知 業務完成
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException {
if (number == 0){
//等待
this.wait();
}
number --;
System.out.println(Thread.currentThread().getName()+"-------->"+number);
//通知 業務完成
this.notifyAll();
}
}
問題存在,A B C D四個線程。虛假喚醒!
新加入線程後,是否安全?
不安全。用if判斷的話,喚醒後線程會從wait之後的代碼開始運作,但是不會重新判斷if條件,直接繼續運作if代碼塊之後的代碼,而如果使用while的話,也會從wait之後的代碼運作,但是喚醒後會重新判斷循環條件,如果不成立再執行while代碼塊之後的代碼塊,成立的話繼續wait。
官方文檔解釋:
線程也可以喚醒,而不會被通知,中斷或逾時,即所謂的==虛假喚醒== 。 雖然這在實踐中很少會發生,但應用程式必須通過測試應該使線程被喚醒的條件來防範,并且如果條件不滿足則繼續等待。 換句話說,等待應該總是出現在循環中,就像這樣:
synchronized (obj) {
while (<condition does not hold>)
obj.wait(timeout);
... // Perform action appropriate to condition
}
解決方法:
if 改為white!防止虛假喚醒
package cn.fyyice.juc.pc;
public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
//判斷等待、業務、通知
class Data{
private int number = 0;
public synchronized void increment() throws InterruptedException {
while (number != 0){
//等待
this.wait();
}
number ++;
System.out.println(Thread.currentThread().getName()+"-------->"+number);
//通知 業務完成
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException {
while (number == 0){
//等待
this.wait();
}
number --;
System.out.println(Thread.currentThread().getName()+"-------->"+number);
//通知 業務完成
this.notifyAll();
}
}
JUC 版本的生産者和消費者問題
傳統的:Synchronized wait notifyAll/notify
新版:Lock await signal
官方文檔:
public interface Condition
Condition
因素出
Object
螢幕方法(
wait
,
notify
和
notifyAll
)成不同的對象,以得到具有多個等待集的每個對象,通過将它們與使用任意的組合的效果
Lock
個實作。
Lock
替換
synchronized
方法和語句的使用,
Condition
取代了對象螢幕方法的使用。
條件(也稱為條件隊列或條件變量 )為一個線程暫停執行(“等待”)提供了一種方法,直到另一個線程通知某些狀态現在可能為真。 因為通路此共享狀态資訊發生在不同的線程中,是以它必須被保護,是以某種形式的鎖與該條件相關聯。 等待條件的關鍵屬性是它原子地釋放相關的鎖并挂起目前線程,就像
Object.wait
。
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock(); try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally { lock.unlock(); }
}
public Object take() throws InterruptedException {
lock.lock(); try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally { lock.unlock(); }
}
}
代碼實作:
package cn.fyyice.juc.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class B {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A1").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B1").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C1").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D1").start();
}
}
//判斷等待、業務、通知
class Data2{
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void increment() throws InterruptedException {
lock.lock();
try {
while (number != 0){
//等待
condition.await();
}
number ++;
System.out.println(Thread.currentThread().getName()+"-------->"+number);
//通知 業務完成
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
}
}
public void decrement() throws InterruptedException {
lock.lock();
try {
while (number == 0){
//等待
condition.await();
}
number --;
System.out.println(Thread.currentThread().getName()+"-------->"+number);
//通知 業務完成
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
}
}
}
任何一個新的技術絕對不僅僅隻是覆寫了原來的技術
Condition的優勢
精準的通知和操作線程
- 設定标志位
Condition condition1 = lock.newCondition(); Condition condition2 = lock.newCondition(); Condition condition3 = lock.newCondition(); // 通過對監控的Condition進行signal方法進行喚醒
- 設定多個螢幕Condition
通過while循環對标志位進行判斷。滿足條件,則對指定的螢幕執行singal操作,進而達到精确喚醒
5.8鎖現象
8鎖,就是關于鎖的八個問題
package cn.fyyice.juc.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.send();
},"A").start();
try {
System.out.println("我要等待3s");
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
phone.call();
},"B").start();
}
}
class Phone {
// synchronized鎖的對象是方法的調用者!
// 這裡兩個方法用的是同一個鎖,誰先拿到誰先執行!
public synchronized void send(){
try {
System.out.println("我是phone中的等待");
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("發資訊");
}
public synchronized void call(){
System.out.println("打電話");
}
}
package cn.fyyice.juc.lock8;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) {
Phone2 phone = new Phone2();
Phone2 phone2 = new Phone2();
new Thread(()->{
phone.send();
},"A2").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
phone2.call();
},"B2").start();
}
}
class Phone2 {
// synchronized鎖的對象是方法的調用者!
// 這裡兩個方法用的是同一個鎖,誰先拿到誰先執行!
public synchronized void send(){
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"-->發資訊");
}
public synchronized void call(){
System.out.println(Thread.currentThread().getName()+"-->打電話");
}
public void hello(){
System.out.println("hello");
}
}
package cn.fyyice.juc.lock8;
import java.util.concurrent.TimeUnit;
public class Test3 {
public static void main(String[] args) {
Phone3 phone = new Phone3();
new Thread(() -> {
phone.send();
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone.call();
},"B").start();
}
}
class Phone3 {
// synchronized鎖的對象是方法的調用者!
// 這裡兩個方法用的是同一個鎖,誰先拿到誰先執行!
public static synchronized void send(){
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"-->發資訊");
}
public static synchronized void call(){
System.out.println(Thread.currentThread().getName()+"-->打電話");
}
}
package cn.fyyice.juc.lock8;
import java.util.concurrent.TimeUnit;
public class Test4 {
public static void main(String[] args) {
Phone4 phone = new Phone4();
new Thread(() -> {
phone.send();
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone.call();
},"B").start();
}
}
class Phone4 {
// synchronized鎖的對象是方法的調用者!
// 這裡兩個方法用的是同一個鎖,誰先拿到誰先執行!
public static synchronized void send(){
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"-->發資訊");
}
public synchronized void call(){
System.out.println(Thread.currentThread().getName()+"-->打電話");
}
}
順序:發資訊在前,打電話在後
- 标準情況下,兩個線程先列印 發資訊還是打電話?
- send()方法延遲4s,兩個線程先列印發資訊 還是打電話?
- 在加入普通方法hello後,兩個線程先列印發資訊 還是hello?
- 兩個對象,2個同步方法, 發資訊還是 打電話?
- 增加兩個靜态的同步方法,隻有一個對象,先列印 發短信還是打電話?
- 兩個對象,增加兩個靜态的同步方法,隻有一個對象,先列印 發短信還是打電話?
- static方法的send,同步方法call,隻有一個對象,先列印 發短信還是打電話?
- static方法的send,同步方法call,隻有兩個對象,先列印 發短信還是打電話?
回複:
問題1,2: 按照順序執行,原因是因為這兩個方法都是加了鎖的,而synchronized是修飾在方法上面的,是以鎖的是這個方法的調用對象,在這裡兩個方法都是同一個調用對象,是以誰先拿到鎖誰先執行。
問題3: 不是同步的方法,不考慮鎖的因素。這裡其實還是應該先執行發資訊,隻不過因為sleep了,而hello不需要去等待這個鎖,是以先執行的hello。
問題4:
問題5: 先列印發短信,再列印打電話。因為在加入static修飾後,變成了靜态方法,在類初始化的時候就會執行,相當于類一加載就有了!鎖的是Class對象(模闆)
問題6: 先列印發短信,再列印打電話。因為鎖的是Class對象,類模闆是同一個,是以誰先拿到鎖誰先執行,這裡就是順序執行。
問題7: 先列印打電話,再列印發資訊。因為鎖的對象不一樣,發資訊方法鎖的對象是Class對象,而打電話方法鎖的是類調用對象。鎖不一樣,不需要去等待鎖的釋放。
問題8: 同上。
小結:
- new this 具體的一個類對象
- static Class 唯一的一個模闆
- 在主線程與子線程中的代碼執行順序問題
- 當主線程代碼段在(一個或多個)子線程中間時,首先執行的還是主線程代碼段。因為剛開始時,隻有主線程在使用CPU的執行權,因為其他兩個線程還沒有被建立,這時主線程的代碼就自上而下的去執行。線上程都建立完成後,此時就存在多個線程了,而線程的執行需要搶到CPU資源去執行,是以在後續就是誰先搶到CPU資源誰先執行了。
6.集合類不安全
List不安全
package cn.fyyice.juc.unfairy;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
//java.util.ConcurrentModificationException 并發修改異常
public class ListTest1 {
public static void main(String[] args) {
/**
* 并發下,list是不安全的
* 解決方案:
* 1.List<String> list = Vector()<>集合類
* 2.List<String> list = Collections.synchronizedList(new ArrayList<>())
* 3.List<String> list = new CopyOnWriteArrayList<>();
* CopyOnWrite 寫入時複制 COW 優化政策
* 多個線程調用的時候,list讀取是固定的,但是在寫入的時候可能會被後面的線程給覆寫掉
* 在寫入的時候避免覆寫,造成資料問題
* 讀寫分離
*/
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 1; i <= 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,6));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
Set不安全
package cn.fyyice.juc.unfairy;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
public class SetList {
public static void main(String[] args) {
//不安全
Set<Object> set = new HashSet<>();
/**
* 解決方案:
* Collections.synchronizedSet(new HashSet<>());
* new CopyOnWriteArraySet<>()
*/
for (int i = 0; i < 30; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,6));
},String.valueOf(i)).start();
}
}
}
hashSet底層是什麼?
public HashSet() {
map = new HashMap<>();
}
// add set 本質就是 map key 是無法重複的!
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
private static final Object PRESENT = new Object(); //不變的值
Map 不安全
/**
* The default initial capacity - MUST be a power of two.
*/
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16 預設容量
/**
* The maximum capacity, used if a higher value is implicitly specified
* by either of the constructors with arguments.
* MUST be a power of two <= 1<<30.
*/
static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* The load factor used when none specified in constructor.
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f; //加載因子
package cn.fyyice.juc.unfairy;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class MapTest {
public static void main(String[] args) {
//map是這樣用的?預設等價于什麼?
Map<String, String> map = new HashMap<>();
//加載因子,初始化容量
/**
* new ConcurrentHashMap<>();
* Collections.synchronizedMap(new HashMap<>());
*/
for (int i = 1; i < 20; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
7.Callable
@FunctionalInterfacepublic interface Callable<V>
傳回結果并可能引發異常的任務。 實作者定義一個沒有參數的單一方法,稱為
call
。
Callable
接口類似于
Runnable
,因為它們都是為其執行個體可能由另一個線程執行的類設計的。 然而,A
Runnable
不傳回結果,也不能抛出被檢查的異常。
該
Executors
類包含的實用方法,從其他普通形式轉換為
Callable
類。
- 有傳回值
- 可以跑出異常
- 方法不同,run()/call()
- 通過FutureTask來實作
- 一個
可以用來包裝FutureTask
或Callable
對象。因為Runnable
實作FutureTask
,一Runnable
可以送出執行FutureTask
。Executor
代碼實作
package cn.fyyice.juc.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// new Thread(new Runnable() {}).start();
// new Thread(new FutureTask<V>()).start());
MyThread thread = new MyThread();
//适配類
FutureTask<String> task = new FutureTask<>(thread);
new Thread(task,"A").start();
new Thread(task,"C").start();
new Thread(task,"D").start();
//這個get方法可能會出現阻塞!把他放到最後、或者使用異步通信來處理
System.out.println(task.get());
}
}
class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("成功進入Callable方法");
return Thread.currentThread().getName()+"--->hello";
}
}
輸出結果:
成功進入Callable方法A--->helloProcess finished with exit code 0
注:
- 有緩存
- 結果可能需要等待,會阻塞。
8.常用輔助類
-
CountDownLatch(減法計數器)
**官方解釋:**CountDownLatch是一個同步的輔助類,允許一個或多個線程,等待其他一組線程完成操作,再繼續執行。
可以了解為倒計時鎖。就比如玩LOL比對/排位。一場對局的開始必須等待所有人加載100%才能開始,是以就算你是外星人,[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-PK65aFpO-1620782826137)(file:///C:\Users\ZZBJR-~1\AppData\Local\Temp\SGPicFaceTpBq\8916\3E24C524.png)]是小霸王,你也得等着。
demo 01
package cn.fyyice.juc.utils;
import java.util.concurrent.CountDownLatch;
//計數器
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//總數是10,必須要執行任務的時候才使用
CountDownLatch count = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"溜了");
count.countDown();
},String.valueOf(i)).start();
}
//等待計數器歸0,再向下執行
count.await();
System.out.println("遛完了,關門");
}
}
demo 02
package cn.fyyice;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class CountDown {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
new Thread(()->{
try{
int time = new Random().nextInt(4)+1;
System.out.println(Thread.currentThread().getName()+"--->start loading,need "+time+" s");
TimeUnit.SECONDS.sleep(time);
System.out.println(Thread.currentThread().getName()+"--->loading finally!");
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
},"Player " + i).start();
}
try {
System.out.println("game start...");
countDownLatch.await();
System.out.println("game end...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
原理:
countDown(); 數量減1
await() 等待計數器歸零,然後再向下執行
當計數器歸零時,==await()==方法就會被喚醒,繼續執行。
-
CyclicBarrier(加法計數器)
**官方解釋:**CyclicBarrier是一個同步的輔助類,允許一組線程互相之間等待,達到一個共同點,再繼續執行。
可以把CyclicBarrier看成是個障礙,所有的線程必須到齊後才能一起通過這個障礙。常用于多線程分組計算。比如一個大型的任務,常常需要配置設定好多子任務去執行,隻有當所有子任務都執行完成時候,才能執行主任務;通俗點可以了解為一個公會副本獎勵,需要公會所有人成員都完成至少線上一小時才能釋出獎勵。
// 計數,并執行一個線程
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
package cn.fyyice.juc.utils;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("太下飯了,飽了");
});
for (int i = 1; i <= 7; i++) {
//因為i是臨時變量
final int temp = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"-->第"+temp+"次吃飯");
try {
// 每執行一次await() 内部會執行一次+1
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
-
Semaphore
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
計數信号量。從概念上講,一個信号量維護一組允許。每個
acquire()
塊如果必要的許可證前,是可用的,然後把它。每個
release()
添加許可,潛在收購方釋放阻塞。然而,不使用實際允許的對象;
Semaphore
隻是計數的數量和相應的行為。
信号量通常是用來限制線程的數量比可以通路一些(實體或邏輯)資源。例如,這裡是一個類使用一個信号量來控制通路的項目池:
package cn.fyyice.juc.utils;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
//限流
Semaphore semaphore = new Semaphore(4);
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
//得到
semaphore.acquire();
int time = new Random().nextInt(4)+1;
System.out.println(Thread.currentThread().getName()+"得到了車位,并且要停"+time+"s");
TimeUnit.SECONDS.sleep(time);
System.out.println(Thread.currentThread().getName()+"開走了,空出一個車位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 釋放
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
原理:
semaphore.acquire(); 獲得,假設已經滿了,那麼就會等待,直到被釋放位置!
semaphore.release(); 釋放,會将目前的信号量釋放+1,然後喚醒等待的線程!
作用:多個共享資源互斥作用!并發限流,控制最大的線程數!
9.讀寫鎖
ReadWriteLock
所有已知實作類:
ReentrantReadWriteLock
public interface ReadWriteLock
一個
ReadWriteLock
保持一對一聯系
locks
,隻讀操作和書寫。的
read lock
可能被多個線程同時舉行的讀者,隻要沒有作家。
write lock
是獨家的。
讀的時候可以被多線程同時讀,寫的時候隻能由一個線程去寫。
- 讀寫鎖允許通路共享資料比用互斥鎖允許更多的并發級别。它利用的事實,而隻有一個單一的線程在一個時間(一個作家線程)可以修改共享資料,在許多情況下,任何數量的線程可以同時讀取資料(是以讀者線程)。在理論上,并發的讀寫鎖的使用允許的增加将導緻在一個互斥鎖的使用性能的改進。在實踐中,這種增加的并發性隻會完全實作一個多處理器,然後,隻有當共享資料的通路模式是合适的。
package cn.fyyice.juc.raw;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 又稱為 獨占鎖(寫鎖) 一次隻能被一個線程占有
* 共享鎖(讀鎖) 多個線程可以同時占有
* ReadWriteLock
* 讀-讀 可以共存
* 讀-寫 不能共存
* 寫-寫 不能共存
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
// MyCache myCache = new MyCache();
MyLockCache myCache = new MyLockCache();
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.put(temp+"", UUID.randomUUID().toString().substring(0,5));
},String.valueOf(i)).start();
}
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
class MyLockCache{
private volatile Map<String,Object> map = new HashMap<>();
//讀寫鎖:更加細粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//在寫操作的時候希望同一時間,隻有一個線程進行寫操作,避免覆寫
public void put(String key,Object value){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"寫入:"+value);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"寫入完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
//讀的時候加鎖是為了防止 在讀的時候發生寫操作
public void get(String key){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"讀取:"+key);
Object obj = map.get(key);
System.out.println("----------"+Thread.currentThread().getName()+"讀取完成---------");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
/**
* 自定義緩存
*/
class MyCache{
private volatile Map<String,Object> map = new HashMap<>();
public void put(String key,Object value){
System.out.println(Thread.currentThread().getName()+"寫入:"+value);
map.put(key,value);
System.out.println("----------"+Thread.currentThread().getName()+"寫入完成---------");
}
public void get(String key){
System.out.println(Thread.currentThread().getName()+"讀取:"+key);
Object obj = map.get(key);
System.out.println("----------"+Thread.currentThread().getName()+"讀取完成---------");
}
}
10.阻塞隊列
- 阻塞隊列(FIFO)
- 寫入:如果隊列滿了,就必須阻塞等待(取出)
- 取:如果隊列是空的,必須阻塞等待生産(寫入)
InInterface BlockingQueue
-
- 參數類型
-元素舉行此集合中的類型E
-
All Superinterfaces:
Collection , Iterable , Queue
-
All Known Subinterfaces:
BlockingDeque , TransferQueue
-
所有已知實作類:
ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, LinkedTransferQueue, PriorityBlockingQueue, SynchronousQueue
- 參數類型
- 多線程并發處理
- 線程池
四組API
方式 | 抛出異常 | 有傳回值 | 阻塞等待 | 逾時等待 |
---|---|---|---|---|
添加 | add() | offer() | put() | offer(,) |
移除 | remove() | poll() | take() | poll(,) |
檢測隊首元素 | element() | peek() | - | - |
SynchronousQueue 同步隊列
沒有容量
進去一個元素,必須等待元素取出來過後,才能put進去
package cn.fyyice.juc.bq;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) {
// 同步隊列 和其他BlockingQueue不一樣,synchronousQueue不存儲元素
BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"--->put1");
synchronousQueue.put("1");
System.out.println(Thread.currentThread().getName()+"--->put2");
synchronousQueue.put("2");
System.out.println(Thread.currentThread().getName()+"--->put3");
synchronousQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"--->get1");
synchronousQueue.take();
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"--->get2");
synchronousQueue.take();
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"--->get3");
synchronousQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
}
}
11.線程池
線程池必會:三大方法、七大參數、四種拒絕政策
池化技術
程式的運作,本質:占用系統資源。我們需要做的是優化資源的使用!
常見:線程池、連接配接池、對象池
池化技術:事先準備好一些資源,有人要用就直接來這裡拿,用完過後回收。
線程池的好處:
- 降低資源的消耗
- 提高相應的速度
- 友善管理
線程複用、可以控制最大并發數、管理線程
三大方法
- Executors.newSingleThreadExecutor();
- Executors.newFixedThreadPool(線程數量);
- Executors.newCachedThreadPool();
package cn.fyyice.juc.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
//Executor 工具類,3大方法
public class Demo1 {
public static void main(String[] args) {
//建立單個線程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
//建立一個固定大小的線程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(6);
//建立一個可伸縮的線程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 10; i++) {
//使用了線程池之後,使用線程池來建立線程
//始終隻有一個線程執行
// singleThreadExecutor.execute(()->{
// System.out.println(Thread.currentThread().getName()+"----> start");
// });
//固定線程數量執行
// fixedThreadPool.execute(()->{
// System.out.println(Thread.currentThread().getName()+"----> start");
// });
//根據cpu支援線程數量執行
cachedThreadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"----> start");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//線程池用完,程式結束,關閉線程池
//singleThreadExecutor.shutdown();
//fixedThreadPool.shutdown();
cachedThreadPool.shutdown();
}
}
}
七大參數
源碼分析
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//本質:開啟線程都是調用ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, // 核心線程池大小
int maximumPoolSize, // 最大核心線程池大小
long keepAliveTime, // 逾時了,沒有人調用就會有釋放
TimeUnit unit, //逾時機關
BlockingQueue<Runnable> workQueue, // 阻塞隊列
ThreadFactory threadFactory, // 線程工廠 建立線程的 ,一般不用動
RejectedExecutionHandler handler) { // 拒絕政策
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
手動建立線程池
package cn.fyyice.juc.pool;
import java.util.concurrent.*;
public class Demo2 {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 6, 3, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
System.out.println("目前CPU核數:"+Runtime.getRuntime().availableProcessors());
try {
//最大承載 maxImumPoolSize + capacity
for (int i = 1; i <= 8; i++) {
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+"---> deal");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
四種拒絕政策
- 如果超出了最大處理量(maxImumPoolSize + capacity),則不進行處理,抛出異常。
- 哪來的去哪裡。
- (放棄最舊的政策)隊列滿了,嘗試去和最早的競争。也不會抛出異常
- 隊列滿了,直接丢掉任務,不會抛出異常。(解決不了問題,就解決提出問題的人)
小結
最大線程倒地如何定義?(調優)
- CPU密集型
- 看電腦組態,幾核就是定義幾,可以保持CPU型效率最高
- IO 密集
- 判斷你的程式中,十分耗IO的線程,然後設定大于這個數的線程
12.四大函數式接口
作用:簡化底層程式設計模型
- lambda表達式
- 鍊式程式設計
- 函數式接口
- Stream流式計算
函數式接口
隻有隻一個方法的接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
//簡化程式設計模型,在新版本的架構底層中大量應用
//foreach(消費者類型的函數式接口)
四大函數式接口
- Consumer
- Function
- Predicate
- Supplier
代碼測試
1.Function
package cn.fyyice.juc.function;
import java.util.function.Function;
public class FunctionDemo {
public static void main(String[] args) {
// 可用作工具類
// Function<String, String> f = new Function<String, String>() {
// @Override
// public String apply(String str) {
// return str;
// }
// };
Function<String, String> f = (str)-> {return str;};
System.out.println(f.apply("hello world"));
}
}
2.Predicate
package cn.fyyice.juc.function;
import java.util.function.Predicate;
/**
* 斷定性接口,傳回值隻能是boolean
*/
public class PredicateDemo {
public static void main(String[] args) {
// Predicate<String> predicate = new Predicate<String>() {
// @Override
// public boolean test(String str) {
// return str.isEmpty();
// }
// };
Predicate<String> p = str->{return str.isEmpty();};
System.out.println(p.test("123"));
}
}
3.Consumer
@FunctionalInterface
public interface Consumer<T> {
/**
* Performs this operation on the given argument.
*
* @param t the input argument
*/
void accept(T t);
package cn.fyyice.juc.function;
import java.util.function.Consumer;
/**
* 消費性接口:隻有輸入,沒有傳回值
*/
public class ConsumerDemo {
public static void main(String[] args) {
Consumer<String> consumer = str->{
System.out.println(str);
};
consumer.accept("hello");
}
}
4.Supplier
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
package cn.fyyice.juc.function;
import java.util.function.Supplier;
/**
* 供給型接口:沒有參數,隻有傳回值
*/
public class SupplierDemo {
public static void main(String[] args) {
Supplier supplier = ()->{return 1024;};
System.out.println(supplier.get());
}
}
13.Stream流式計算
什麼是Stream流式計算
大資料:存儲 + 計算
集合、MySQL本質就是存儲東西的;計算都是交給流來操作的!
Interface Stream
- 參數類型
T
-流中的元素的類型
All Superinterfaces:
AutoCloseable, BaseStream<t, Stream < T > >
package cn.fyyice.juc.stream;
import java.util.Arrays;
import java.util.List;
public class StreamTest {
public static void main(String[] args) {
/**
* 從以下使用者資料中篩選
* 1.id為偶數
* 2.年齡大于等于20
* 3.姓名轉為大寫
* 4.輸出第一條使用者記錄
*/
User user1 = new User(1,18,"a");
User user2 = new User(2,29,"b");
User user3 = new User(3,20,"c");
User user4 = new User(4,19,"d");
User user5 = new User(5,23,"e");
List<User> userList = Arrays.asList(user1,user2,user3,user4,user5);
userList.stream()
.filter(user -> {return user.getId() %2==0;})
.filter(user -> {return user.getAge() >= 20;})
.map((user ->{return user.getUsername().toUpperCase();}))
.limit(1)
.forEach(System.out::println);
}
}
14.ForkJoin
什麼是ForkJoin
ForkJoin在JDK1.7,并行執行任務!提高效率,大資料量
大資料:Map Reduce(把大任務拆分成小任務)
FockJoin特點:工作竊取
這個裡面維護的都是雙端隊列
假設 2個線程A和B。
A線程有4個任務,目前執行到第2個任務,B線程也有4個任務,目前已執行完。這時候B線程不會一直等待A線程執行完,而是會轉過去執行A線程未執行的任務,這就是工作竊取。
FockJoin 測試
執行
Class ForkJoinTask
- java.lang.Object
- java.util.concurrent.ForkJoinTask
-
All Implemented Interfaces:
Serializable, Future
已知直接子類:
CountedCompleter, RecursiveAction, RecursiveTask
RecursiveAction:遞歸事件(沒有傳回值)
RecursiveTask:遞歸任務(有傳回值)
ForkJoinDemo
package cn.fyyice.juc.fockjoin;
import java.util.concurrent.RecursiveTask;
/**
* 求和計算
* FockJoin
* 1. 通過ForkJoinPool來執行
* 2. 計算任務:execute(ForkJoinTask<?> task)
* 3. 計算類內建類 RecursiveTask<V>
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start = 1L;
private Long end = 10_0000_0000L;
private Long temp = 10000L;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if ((end-start) < temp){
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
}else {
long middle = (start+end)/2;
ForkJoinDemo task1 = new ForkJoinDemo(start,middle);
//拆分任務,把任務壓入線程隊列
task1.fork();
ForkJoinDemo task2 = new ForkJoinDemo(middle+1,end);
//拆分任務,把任務壓入線程隊列
task2.fork();
return task1.join()+task2.join();
}
}
}
計算求和效率對比
package cn.fyyice.juc.fockjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class Test {
private final static Long start = 1L;
private final static Long end = 10_0000_0000L;
public static void main(String[] args) {
test1();
System.out.println("----------------------------------------------------");
test2();
System.out.println("----------------------------------------------------");
test3();
}
public static void test1(){
long startTime = System.currentTimeMillis();
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
long endTime = System.currentTimeMillis();
System.out.println("普通累加求和結果:"+sum+"\n求和時間:"+(endTime-startTime));
}
public static void test2(){
try {
long startTime = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
// forkJoinPool.execute(new ForkJoinDemo(start,end)); 執行任務沒有結果,是以這裡使用sumbit
ForkJoinTask<Long> result = forkJoinPool.submit(new ForkJoinDemo(start, end));
Long sum = result.get();
long endTime = System.currentTimeMillis();
System.out.println("ForkJoin求和結果:"+sum+"\n求和時間:"+(endTime-startTime));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
//Stream 并行流
public static void test3(){
long startTime = System.currentTimeMillis();
//range() rangeClosed(]
long result = LongStream.rangeClosed(start, end).parallel().reduce(0, Long::sum);
long endTime = System.currentTimeMillis();
System.out.println("Stream 并行流求和結果:"+result+"\n求和時間:"+(endTime-startTime));
}
}
運作結果
普通累加求和結果:500000000500000000
求和時間:6083
----------------------------------------------------
ForkJoin求和結果:500000000500000000
求和時間:3876
----------------------------------------------------
Stream 并行流求和結果:500000000500000000
求和時間:243
Process finished with exit code 0
15.異步回調
Future設計初衷
對将來的某個時間的結果進行模組化
這一節不是很懂,等我了解了再來寫吧。
16.JMM
談談你對Volatile的了解
Volatile是Java虛拟機提供的輕量級的同步機制(和Synchronized差不多,但是沒有他那麼強)
- 保證可見性
- 不保證原子性
- 禁止指令重排
什麼是JMM
JMM: Java記憶體模型,不存在的東西。是一個概念、約定
關于JMM的一些同步的約定:
- 線程解鎖前,必須把共享變量立刻刷回主存。
- 線程加鎖前,必須讀取主存中的最新值到自己的工作記憶體中。
- 加鎖和解鎖是同一把鎖。
線程工作記憶體、主記憶體
記憶體互動操作
記憶體互動操作有8種,虛拟機實作必須保證每一個操作都是原子的,不可在分的(對于double和long類型的變量來說,load、store、read和write操作在某些平台上允許例外)
-
- lock (鎖定):作用于主記憶體的變量,把一個變量辨別為線程獨占狀态
- unlock (解鎖):作用于主記憶體的變量,它把一個處于鎖定狀态的變量釋放出來,釋放後的變量才可以被其他線程鎖定
- read (讀取):作用于主記憶體變量,它把一個變量的值從主記憶體傳輸到線程的工作記憶體中,以便随後的load動作使用
- load (載入):作用于工作記憶體的變量,它把read操作從主存中變量放入工作記憶體中
- use (使用):作用于工作記憶體中的變量,它把工作記憶體中的變量傳輸給執行引擎,每當虛拟機遇到一個需要使用到變量的值,就會使用到這個指令
- assign (指派):作用于工作記憶體中的變量,它把一個從執行引擎中接受到的值放入工作記憶體的變量副本中
- store (存儲):作用于主記憶體中的變量,它把一個從工作記憶體中一個變量的值傳送到主記憶體中,以便後續的write使用
- write (寫入):作用于主記憶體中的變量,它把store操作從工作記憶體中得到的變量的值放入主記憶體的變量中
JMM對這八種指令的使用,制定了如下規則:
-
- 不允許read和load、store和write操作之一單獨出現。即使用了read必須load,使用了store必須write
- 不允許線程丢棄他最近的assign操作,即工作變量的資料改變了之後,必須告知主存
- 不允許一個線程将沒有assign的資料從工作記憶體同步回主記憶體
- 一個新的變量必須在主記憶體中誕生,不允許工作記憶體直接使用一個未被初始化的變量。就是怼變量實施use、store操作之前,必須經過assign和load操作
- 一個變量同一時間隻有一個線程能對其進行lock。多次lock後,必須執行相同次數的unlock才能解鎖
- 如果對一個變量進行lock操作,會清空所有工作記憶體中此變量的值,在執行引擎使用這個變量前,必須重新load或assign操作初始化變量的值
- 如果一個變量沒有被lock,就不能對其進行unlock操作。也不能unlock一個被其他線程鎖住的變量
- 對一個變量進行unlock操作之前,必須把此變量同步回主記憶體
JMM對這八種操作規則和對volatile的一些特殊規則就能确定哪裡操作是線程安全,哪些操作是線程不安全的了。但是這些規則實在複雜,很難在實踐中直接分析。是以一般我們也不會通過上述規則進行分析。更多的時候,使用java的happen-before規則來進行分析。
問題:程式不知道主記憶體的值已經被修改過了
17.Volatile
保證可見性
package cn.fyyice.juc.jmm;
import java.util.concurrent.TimeUnit;
public class JmmDemo {
// 不加volatile 程式會出現死循環,加了 volatile 可以保證可見性
private static int number = 0;
public static void main(String[] args) {
new Thread(()->{
while (number == 0) {
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
number = 1;
System.out.println(number);
}
}
不保證原子性
原子性:不可分割
線程A在執行任務的時候,不能被打擾的,也不能被分割。要麼同時成功,要麼同時失敗。
package cn.fyyice.juc.jmm;
//volatile不保證原子性
public class VDemo {
private static volatile int number = 0;
public static void main(String[] args) {
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
},String.valueOf(i)).start();
}
while (Thread.activeCount() > 2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+":"+number);
}
public static void add (){
//不是一個原子性操作
/**
* temp = number
* number = number +1;
* return temp
*/
number++;
}
}
private static volatile AtomicInteger number = new AtomicInteger();
public static void add (){
// number++;
//AtomicInteger +1操作
number.getAndIncrement();
}
問題:不使用synchronized、Lock鎖,如何解決原子性操作?
答:使用原子類操作
原子類為何這麼牛逼
這些類的底層都是直接和作業系統記憶體挂鈎。在記憶體中修改值!Unsafe類是一個很特殊的存在。
指令重排
概念:你寫的程式,計算機并不是按照你寫的那樣執行的。
源代碼 → 編譯器優化的重拍 → 指令并行也可能重排 → 記憶體系統也會重排 → 執行
系統處理器在指令重排的時候,會考慮資料之間的依賴性
volatile可以避免指令重排
在加入volatile後,系統會自動生成一個記憶體屏障。CPU指令。作用:
- 保證特定的操作的執行順序。
- 可以保證某些變量的記憶體可見性(利用這些特性,volatile實作了可見性)。
18.徹底玩轉單例模式
最重要的一點:構造器私有化
餓漢模式
這種方法可能會浪費大量空間
package cn.fyyice.juc.single;
public class Hungry {
/**
* 可能會浪費空間,在類加載的時候就把所有東西給建立出來了
*/
private Hungry(){}
private final static Hungry HUNGRY = new Hungry();
public static Hungry getInstance(){
return HUNGRY;
}
}
DCL懶漢模式
需要考慮到多線程并發下的安全問題,還有指令重排的問題
package cn.fyyice.juc.single;
//懶漢單例
public class Lazy {
private Lazy(){
System.out.println(Thread.currentThread().getName()+"-> Hello");
}
private static volatile Lazy lazy;
public static Lazy getInstance() {
//由于在多線程并發下不安全,加入雙重檢測鎖模式 DCL懶漢式
if (lazy == null){
synchronized (Lazy.class){
if (lazy == null){
/**
* 不是一個原子性操作
* 1.配置設定記憶體空間
* 2.執行構造方法
* 3.把這個對象指向這個空間
*
* 123
* 132 A
* B //此時Lazy還沒有完成構造,導緻 B 線程在判斷的時候,lazy != null,直接走return,空間是一片虛無,是以需要加入volatile
*
*/
lazy = new Lazy();
}
}
}
return lazy;
}
public static void main(String[] args) {
for (int i = 1; i <= 10; i++) {
new Thread(()->{
Lazy.getInstance();
}).start();
}
}
}
靜态内部類
package cn.fyyice.juc.single;
/**
* 靜态内部類
*/
public class Holder {
private Holder(){}
public static Holder getInstance(){
return innerClass.holder;
}
public static class innerClass{
private static final Holder holder = new Holder();
}
}
單例不安全,反射
枚舉類登場
package cn.fyyice.juc.single;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
/**
* enum : 本身也是一個class類,jkd1.5
*
*/
public enum EnumSingle {
INSTANCE;
public static EnumSingle getInstance(){
return INSTANCE;
}
public static void main(String[] args) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
EnumSingle single = EnumSingle.getInstance();
//在這裡idea和源碼欺騙了我們,說的是無參構造,但事實上通過jad反編譯過後,發現構造函數是一個String,int。狂神牛逼
Constructor<EnumSingle> constructor = EnumSingle.class.getDeclaredConstructor(String.class,int.class);
constructor.setAccessible(true);
EnumSingle enumSingle = constructor.newInstance();
System.out.println(single == enumSingle);
}
}
反射嘗試暴力破解
Exception in thread "main" java.lang.IllegalArgumentException: Cannot reflectively create enum objects
at java.lang.reflect.Constructor.newInstance(Constructor.java:417)
at cn.fyyice.juc.single.EnumSingle.main(EnumSingle.java:22)
Process finished with exit code 1
枚舉類不能被破解,實屬牛批
19.深入了解CAS
什麼是CAS
//compareAndSet 比較并交換
//如果我期望的值達到了,那麼就更新,否則不更新
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
package cn.fyyice.juc.cas;
import java.util.concurrent.atomic.AtomicInteger;
public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
/**
* public final boolean compareAndSet(int expect, int update) {
* return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
* }
* 如果我期望的值達到了,那麼就更新,否則不更新
*/
atomicInteger.compareAndSet(2020,2021);
atomicInteger.compareAndSet(2020,2022);
System.out.println("result:"+atomicInteger.get());
}
}
unsafe類
Java無法操作記憶體,但是 Java可以調用C++(native)操作記憶體。這個Unsafe相當于Java的後門,通過這個類來操作記憶體。
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
//擷取位址偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
記憶體操作,效率很高
// var1 -->getAndIncrement var2 -->valueOffset var4 --> 1
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
//自旋鎖
do {
//擷取記憶體位址中的值
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
CAS:比較目前工作記憶體中的值,如果這個值是期望的,那麼就執行。如果不是,就一直循環
缺點:
- 由于底層是自旋鎖,循環會耗時
- 一次性隻能保證一個共享變量的原子性
- 存在ABA問題
CAS:ABA問題(狸貓換太子)
業務場景:庫存業務
兩個并發同時讀取資料庫查詢庫存。此時庫存count = 10
A線程、B線程同時讀取到庫存都是為10,A線程購買了2個商品,B線程購買了3個商品。由于調用了CAS,假設A線程先完成,則庫存count 為 10→ 8 → 7,但理想結果應該為5,這裡就出現了資料不一緻。(最後一次設定庫存會覆寫和掩蓋前一次并發操作)
20.原子引用
帶版本号的操作(類似樂觀鎖操作)
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1);
Integer使用了對象緩存機制,預設範圍-128~127,推薦侍弄靜态工廠方法valueOf擷取對象執行個體,而不是new,因為valueOf使用緩存,而new一定會建立新的對象配置設定新的記憶體空間。
package cn.fyyice.juc.cas;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;
public class CASDemo {
//如果泛型是一個包裝類,要注意泛型的引用問題
//在正常的業務操作中,這裡比較的都是一個個的對象
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1);
public static void main(String[] args) {
new Thread(()->{
int stamp = atomicStampedReference.getStamp();
System.out.println("Thread 1-1 ----"+stamp);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicStampedReference.compareAndSet(1, 2, stamp, stamp + 1));
System.out.println("Thread 1-2 ----"+atomicStampedReference.getStamp());
System.out.println(atomicStampedReference.compareAndSet(2, 1, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
System.out.println("Thread 1-3 ----"+atomicStampedReference.getStamp());
},"Thread 1").start();
new Thread(()->{
int stamp = atomicStampedReference.getStamp();
System.out.println("Thread 2-1 ----"+stamp);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread 2 修改:"+atomicStampedReference.compareAndSet(1, 6, stamp, stamp + 1));
System.out.println("Thread 2-2 ----"+atomicStampedReference.getStamp());
},"Thread 2").start();
}
}
21.各種鎖的了解
公平鎖、非公平鎖
公平鎖:非常公平,線程順序執行,先來後到,不能插隊
非公平鎖:非常不公平,可以插隊(預設都是非公平鎖)
通過構造器來建立
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可重入鎖
又稱為遞歸鎖。
拿到了外面的鎖之後,就可以拿到裡面的鎖(自動獲得的)
Synnchronized鎖
package cn.fyyice.juc.lock;
public class Demo1 {
public static void main(String[] args) {
Store store = new Store();
new Thread(()->{
store.pay();
store.sale();
},"A").start();
new Thread(()->{
store.pay();
store.sale();
},"B").start();
}
}
// synchronized對普通方法加鎖,鎖的是這個方法的調用者,本質上是同一把鎖
class Store{
public synchronized void pay(){
System.out.println(Thread.currentThread().getName()+"-->Pay");
}
public synchronized void sale(){
System.out.println(Thread.currentThread().getName()+"-->Sale");
}
}
Lock鎖
package cn.fyyice.juc.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Demo2 {
public static void main(String[] args) {
Store2 store = new Store2();
new Thread(()->{
store.pay();
store.sale();
},"A").start();
new Thread(()->{
store.pay();
store.sale();
},"B").start();
}
}
//lock鎖。與synchronized展現的效果是一樣的,但是本質上是有差別的。
//在這裡lock鎖有兩把鎖,在使用的時候必須配對,否則會出現死鎖的情況
class Store2{
private Lock lock = new ReentrantLock();
public void pay(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"-->Pay");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void sale(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"-->Sale");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
自旋鎖
SpinLock(核心CAS)
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
自己寫一個自旋鎖
package cn.fyyice.juc.lock;
import java.util.concurrent.atomic.AtomicReference;
public class SpinLock {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
//加鎖
public void lock(){
Thread thread = Thread.currentThread();
System.out.println(thread.getName()+"--->myLock");
do{
}while (!atomicReference.compareAndSet(null,thread));
}
//解鎖
public void unlock(){
Thread thread = Thread.currentThread();
System.out.println(thread.getName()+"--->myUnLock");
atomicReference.compareAndSet(thread,null);
}
}
測試
A加鎖過後,B加鎖,但是A先拿到了,進入try子產品中睡覺,此時此刻,B線程在不斷的自旋;當A線程睡完過後執行解鎖,釋放了鎖後B線程拿到鎖,解鎖。
package cn.fyyice.juc.lock;
import java.util.concurrent.TimeUnit;
public class SpinLockTest {
public static void main(String[] args) {
SpinLock lock = new SpinLock();
new Thread(()->{
lock.lock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
},"A").start();
new Thread(()->{
lock.lock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
},"B").start();
}
}
結果
A--->myLock
B--->myLock
A--->myUnLock
B--->myUnLock
Process finished with exit code 0
死鎖
多個線程互相争搶資源。
package cn.fyyice.juc.lock;
import java.util.concurrent.TimeUnit;
public class DeadLock {
public static void main(String[] args) {
new Thread(new MyLock("lockA","lockB"),"T1").start();
new Thread(new MyLock("lockB","lockA"),"T2").start();
}
}
class MyLock implements Runnable{
private String lockA;
private String lockB;
MyLock(String lockA,String lockB){
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA){
System.out.println(Thread.currentThread().getName() + "-->lock:" + lockA + "===get-->"+lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB){
System.out.println(Thread.currentThread().getName() + "-->lock:" + lockB + "===get-->"+lockA);
}
}
}
}
解決問題
排查死鎖解決問題:
- 使用
定位程序号jps-l
Microsoft Windows [版本 10.0.19042.867] (c) 2020 Microsoft Corporation. 保留所有權利。 D:\project\juc>jps -l 11428 cn.fyyice.juc.lock.DeadLock 1208 cn.fyyice.juc.lock.Demo2 7596 7980 sun.tools.jps.Jps 9260 org.jetbrains.jps.cmdline.Launcher
- 使用
找到死鎖問題(看堆棧資訊)jstack 程序号
Found one Java-level deadlock:============================="T2": waiting to lock monitor 0x0000017fd1a26d68 (object 0x000000076b6a6440, a java.lang.String), which is held by "T1""T1": waiting to lock monitor 0x0000017fd1a296a8 (object 0x000000076b6a6478, a java.lang.String), which is held by "T2"Java stack information for the threads listed above:==================================================="T2": at cn.fyyice.juc.lock.MyLock.run(DeadLock.java:31) - waiting to lock <0x000000076b6a6440> (a java.lang.String) - locked <0x000000076b6a6478> (a java.lang.String) at java.lang.Thread.run(Thread.java:748)"T1": at cn.fyyice.juc.lock.MyLock.run(DeadLock.java:31) - waiting to lock <0x000000076b6a6478> (a java.lang.String) - locked <0x000000076b6a6440> (a java.lang.String) at java.lang.Thread.run(Thread.java:748)Found 1 deadlock.
總結
視訊學習大概花了5天時間,有些地方仍然有些問題,還需要多多了解呀!