一、集合安全問題
1.1 ArrayList
- 空的集合初始值為10
- object類型的數組
- 擴容Arrays.copyOf 原始大小的一倍
- 線程不安全
1.1.1 不安全
java.util.concurrentModificationException
- Vector加了鎖保證了資料一緻性,但是并發性急劇下降,是以很少用!
- ArrayList犧牲了線程安全進而保證并發性
1.1.2 如何解決ArrayList線程不安全問題
1.new Vector<>()
2.Collection與Collections
- Collection為集合類的父接口
- Collections為輔助類來解決ArrayList線程不安全問題
List<String> list = Collections.synchronizedList(new ArrayList<>());
3.CopyOnWriteArrayList<>()類
寫時複制 讀寫分離的思想
List<String> list = new CopyOnWriteArrayList<>();
private tranisent volatile []…
1.2 HashSet
底層:HashMap 初始值16 負載因子0.75
線程不安全解決的問題與上面雷同
解決辦法一Collections.synchronizedSet():
解決辦法二CopyOnWriteArraySet<>():
1.3 HashMap
示範錯誤 java.util.concurrentModificationException
解決辦法一:
Map<String,String> map = new ConcurrentHashMap<>();
解決辦法二:
Collections.synchronizedMap();
二、JAVA鎖機制
公平鎖:多個新線程按照申請順序來擷取鎖,先到先得 非
非公平鎖:多個線程并不是按照申請的順序,有可能造成優先級反轉或者饑餓現象。
2.1 可重入鎖【遞歸鎖】
ReentrantLock
線程可以進入任何一個它已經擁有的鎖同步着的代碼塊
通過構造函數制定該鎖是否為公平鎖,預設是非公平鎖。
非公平鎖的優勢在于吞吐量比較大
對于Synchronized而言,也是一種非公平鎖
作用:避免死鎖
2.2 自旋鎖
是指嘗試擷取鎖的線程不會阻塞,而是采用循環的方式來嘗試乎擷取鎖,這樣的好處就是減少線程上下文的切換消耗,缺點是循環會消耗CPU.
do
while()
CAS
期望值與工作區間的值比較
2.2.1 自旋鎖代碼
public class SpinLock{
//原子引用線程
AtomicReference<Thread> ar = new AtomicReference<>();
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"come in");
while(!ar.compareAndSet(null,thread)){
}
}
public void myUnLock(){
Thread thread = Thread.currentThread();
ar.compareAndSet(thread,null);
System.out.println(Thread.currentThread().getName()+"unlock");
}
public static void main(String[] args){
SpinLock sl = new SpinLock();
new Thread(()-> {
// 加鎖
sl.myLock();
// 暫停一會
try{TimeUnit.SECONDS.sleep(5);}catch(...){}
// 解鎖
sl.myUnLock();
},"AA").start();
try{TimeUnit.SECONDS.sleep(1);}catch(...){}
new Thread(()-> {
// 加鎖
sl.myLock();
try{TimeUnit.SECONDS.sleep(1);}catch(...){}
// 解鎖
sl.myUnLock();
},"BB").start();
}
}
2.3 獨占鎖(寫鎖)/共享鎖(讀鎖)
獨占鎖:指該鎖一次隻能被一個線程所持有的。
ReentrantLock Synchronized 都是獨占鎖
共享鎖:該鎖可以被多個線程所持有
ReentrantReadWriteLock為共享鎖,寫鎖為獨占鎖
讀鎖的共享鎖可保證并發讀是非常高效的,讀寫,寫讀,寫寫的過程都是互斥的。
一個線程去寫【原子+獨占】絕對不可以被阻斷,多個線程可以讀
【問題描述如下:】
class MyCache{//緩存資源類
//volatile 可見性 不保證原子性 禁止指令重排
private volatile Map<String,Object> map = new HashMap<>();
//解決問題 原子性
//private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock ();
public void put(String key,Object value){
//加寫鎖
rwLock.writeLock().lock();
try{
System.out.println(Thread.currentThread().getName()+"正在寫入"+key);
try{Time.MILLSECONDS.sleep(300);}catch(){};
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"寫入完成");
}catch(Exception e){
}finally{
rwLock.writeLock().unlock();
}
}
public void get(String key,Object value){
//加讀鎖
rwLock.readLock().lock();
try{
System.out.println(Thread.currentThread().getName()+"正在讀取"+key);
try{Time.MILLSECONDS.sleep(300);}catch(){};
Object result = map.get(key);
System.out.println(Thread.currentThread().getName()+"讀取完成"+result);
}catch(Exception e){
}finally{
rwLock.readLock().unlock();
}
}
}
public class ReadWriteLockDemo{
public static void main(String[] args){
MyCache myCache = new MyCache();
//寫
for(int i = 1;i <= 5;i++){
new Thread(() -> {
final int tempInt = i;
myCache.put(tempInt+"",tempInt+"");
},String.valueOf(i)).start();
}
//讀
for(int i = 1;i <= 5;i++){
new Thread(() -> {
final int tempInt = i;
myCache.get(tempInt+"");
},String.valueOf(i)).start();
}
}
}
這樣既保證了資料一緻性,有保證了并發性,讀寫分離。
Synchronized太重量。
三、CountDownLatch【線程做減法倒計時】
3.1 離開教室鎖門問題産生!
public class CountDownLatchDemo{
public static void main(String[] args){
for(int i = 1;i<=6;i++){
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"上完自習,離開教室");
},String.valueOf(i)).start();
}
System.out,println(Thread.currentThread().getName()+"班長最後關門走人");
}
}
3.2 解決問題:CountDownLatch
public class CountDownLatchDemo{
public static void main(String[] args){
// 計數
CountDownLatch countDownLatch = new CountDownLatch(6);
for(int i = 1;i<=6;i++){
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"上完自習,離開教室");
countDownLatch.countDown();//減1操作
},String.valueOf(i)).start();
}
// 主線程等待
countDownLatch.await();
System.out,println(Thread.currentThread().getName()+"班長鎖門,最後關門走人");
}
}
四、CyclicBarrier【加法】
加法 與CountDownLatch【減法】相反
加到一定的數值然後做事
最後一個線程到達屏障時候才會進行
public class CountDownLatchDemo{
public static void main(String[] args){
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{System.out.println("***召喚神龍***");});
for(int i = 1;i<=7;i++){
final int tempInt = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"收集到第"+tempInt +"顆龍珠");
cyclicBarrier.await();
},String.valueOf(i)).start();
}
}
}
五、Semaphore【信号量】
多個共享資源的互斥使用
并發線程數量的控制
public class CountDownLatchDemo{
public static void main(String[] args){
// 模拟3個停車位
Semaphore semaphore = new Semaphore(3);
for(int i = 1; i <= 6;i++){
new Thread(()->{
try{
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"搶到車位");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"停車3S後,離開車位");
}catch(...){
}finally{
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
六、阻塞隊列【MQ核心】
6.1 阻塞隊列ArrayBlockingQueue<>()
報異常
沒有異常,直接傳回布爾類型false
一直阻塞,取出用take方法
過時不候
6.2 阻塞隊列 SynchronousQueue<>()
不消費,不會繼續插下一個,會卡在 put(1)
6.3 生産者-消費者案例【新方式】
案例:一個初始值為0的變量,兩個線程交替操作,一個加一,一個減一,來5輪
class SahreData{
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
// 加法
public void increament(){
lock.lock{
try{
//1.判斷
while(number != 0){
condition.await();
}
//2.幹活
number++;
System.out.println(Thread.currentThread().getName()+ number);
//3.通知喚醒
condition.signalAll();
}catch(...){
}finally{
lock.unlock();
}
}
}
// 減法
public void decreament(){
lock.lock{
try{
//1.判斷
while(number == 0){
condition.await();
}
//2.幹活
number--;
System.out.println(Thread.currentThread().getName()+ number);
//3.通知喚醒
condition.signalAll();
}catch(...){
}finally{
lock.unlock();
}
}
}
}
public class ProdConsumer{
public static void main(String[] args){
ShareData shareData = new ShareData();
new Thread(()->{
for(int i = 1;i<=5;i++){
shareData.increament();
}
},"A").start();
new Thread(()->{
for(int i = 1;i<=5;i++){
shareData.decreament();
}
},"B").start();
}
}
6.4 虛假喚醒
防止虛假喚醒 一定要用while 不要用if
6.3 中的代碼換成If,多添加幾個線程就會出現問題!
會出現結果 1 2 -1 0等等,并沒有控制住結果。
七、Synchrinized與Lock的差別
1.前者JVM層面,是Java的關鍵字,後者是API層面,java5以後的出現的。
2. synchronized不可以中斷
3. Reentranrlock可以中斷,設定逾時,或者中斷方法
4.synchronized預設非公平鎖
5.Reentranrlock可以分組喚醒,精确喚醒
6.synchronized要麼随即喚醒一個,要麼喚醒全部notify() notifyAll()
實作案例:
多線程之間要按照順序調用,實作A-B-C三個線程啟動:
AA列印5次,BB列印10次,CC列印15次
然後
AA列印5次,BB列印10次,CC列印15次
…
循環10次
7.1 列印案例【新的Lock版本】
class ShareResource{
private int number = 1;//A1 B2 C3
private Lock lock = new ReentrantLock();
private Condition c1 = new lock.newCondition();
private Condition c2 = new lock.newCondition();
private Condition c3 = new lock.newCondition();
public void prints5(){
lock.lock();
try{
//1.判斷
while(number != 1){
c1.await();
}
//2.幹活
for(int i = 1;i<=5;i++){
System.out.println(Thread.currentThread().getName()+"\t"+number);
}
//3.通知2
number = 2;
c2.signal();
}catch(){}finally{
lock.unlock();
}
}
public void prints10(){
lock.lock();
try{
//1.判斷
while(number != 2){
c2.await();
}
//2.幹活
for(int i = 1;i<=10;i++){
System.out.println(Thread.currentThread().getName()+"\t"+number);
}
//3.通知2
number = 3;
c3.signal();
}catch(){}finally{
lock.unlock();
}
}
public void prints15(){
lock.lock();
try{
//1.判斷
while(number != 3){
c3.await();
}
//2.幹活
for(int i = 1;i<=10;i++){
System.out.println(Thread.currentThread().getName()+"\t"+number);
}
//3.通知2
number = 1;
c1.signal();
}catch(){}finally{
lock.unlock();
}
}
}
public class SyncAndReentrantLockDemo{
ShareSource shareSource = new ShareSource();
new Thread(()->{
for(int i=0;i<=10;i++){
shareSource.prints5();
}
},"A").start();
new Thread(()->{
for(int i=0;i<=10;i++){
shareSource.prints10();
}
},"B").start();
new Thread(()->{
for(int i=0;i<=10;i++){
shareSource.prints15();
}
},"C").start();
}
7.2 生産消費案例【阻塞隊列版本】高并發
class MyResource{
private volatile boolean FLAG = true;//預設開啟,生産+消費
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue){
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void myProd(){
String data = null;
boolean retValue;
while(FLAG){
data = atomInteger.incrementAndGet()+"";
retValue = blockingQueue.offer(data,2L,TimeUnit.SECONDS);
if(retValue){
System.out.println(Thread.currentThread().getName()+"\t 插入隊列"+data+"成功");
}else{
System.out.println(Thread.currentThread().getName()+"\t 插入隊列"+data+"失敗");
}
TimeUint.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName()+"\t 生産叫停,false 生産結束");
}
public void MyConsumer(){
String result = null;
while(FLAG){
result = blockingQueue.poll(2L,TimeUnit.SECONDS);
if(null == result || result.equalsIngoreCase("")){
FLAG = false;
System.out.println(Thread.currentThread().getName()+"\t 超過2S沒有消費,消費退出");
return;
}
System.out.println(Thread.currentThread().getName()+"\t 消費隊列"+result+"成功");
}
}
public void stop(){
this.FLAG = fasle;
}
}
public class ProdConsumer_BlockQueueDemo{
public static void main(String[] args){
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(()->{
System.out.println(Thread.currentThread().getName()+ "\t 生産線成啟動");
myResource.myProd();
},"Prod").start();
new Thread(()->{
System.out.println(Thread.currentThread().getName()+ "\t 消費線成啟動");
myResource.myConsumer();
},"Consumer").start();
//暫停一會
try{TimeUnit.SECONDS.sleep(5);catchh(...){}}
System.out.println("5S結束,大老闆叫停,活動結束");
myResource.stop();
}
}
八、線程池
8.1 Runnable與 Callable
class MyThread implements Runnable{
@Override
public void run(){
}
}
class MyThread2 implements Callable<Integer>{
@Override
public Integer call() throws Exception{
System.out.println("進來了");
return 1024;
}
}
public class CallableDemo{
public static void main(String[] args){
FuterTask<Integer> futerTask = new FuterTask<>(new MyThread);
Thread t1 = new Thread(futerTask,"AAA");
t1.start();
int result01 = 100;
int result02 = futerTask.get();
System.out.println(result01+result02);
}
}
結果:1124
8.2 線程池的優勢
Executor頂級接口和工具包Executors
底層:阻塞隊列
1.降低資源消耗
2.提高響應速度
3.提高線程的可管理性
// 拓展工具類
// Array Arrays
// Collection Collections
// Executor Executors
8.3 線程池實作的方式【3種核心】
工作中你用那個??? 哪個都不用的
阿裡巴巴開發手冊:不允許使用Executors區建立,而是使用ThreadPoolExecutor的方式,這樣的處理方式更加明确線程池的運作規則,避免資源耗盡。
但是也要學習!!!!!!如下:
第一種【重要】:
public class MyThreadPoolDemo{
public static void main(String[] args){
// 1個線程池 5個線程
ExecutorService threadPool = Executors.newFixedThreadPool(5);
try{
for(int i = 1;i<=10;i++){//10個使用者
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"辦理業務");
});
}
}catch(...){
}finally{
thread.shutdown();
}
}
}
第二種:
public class MyThreadPoolDemo{
public static void main(String[] args){
// 1個線程池 1個線程
ExecutorService threadPool = Executors.newSingleThreadExecutor();
try{
for(int i = 1;i<=10;i++){//10個使用者
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"辦理業務");
});
}
}catch(...){
}finally{
thread.shutdown();
}
}
}
第三種:
public class MyThreadPoolDemo{
public static void main(String[] args){
// 1個線程池 不定線程
ExecutorService threadPool = Executors.newCacgedThreadPool();
try{
for(int i = 1;i<=10;i++){//10個使用者
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"辦理業務");
});
}
}catch(...){
}finally{
thread.shutdown();
}
}
}
8.4 線程池7大參數
8.5 線程池拒絕政策
等待隊列已經滿了,再也塞不下新任務了
線程池中的max線程也達到了最大,無法繼續為新任務服務
8.6 手寫線程池【大廠工作核心–7大參數】
public class MyThreadPoolDemo{
public static void main(String[] args){
ExecutorService threadPool = new ThreadPoolExecutor(
2,//核心數
5,//最大線程數
1L,//活躍時間
TimeUint.SECONDS,//機關
new LinkedBlockingQueue<Runnable>(3),//阻塞隊列大小個數
Executors.defaultThreadFactory(),//線程工廠
new ThreadPoolExecutor.AbortPolicy());//拒絕政策,會抛異常
try{
for(int i = 1;i<=5;i++){
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"辦理業務");
});
}
}catch(...){
}finally{
threadPool.shutdown();
}
}
}
8.7 如何合理配置線程的數量呢?
回答:
1.CPU密集型?
2.IO密集型?