阻塞隊列
-
什麼是阻塞隊列?
顧名思義,阻塞隊列實質是一個隊列,滿足FIFO特性。
當隊列滿的時候,還要往隊列裡面放元素,會發生阻塞。
當隊列空的時候,還要取元素出來,也會發生阻塞。
當在多線程通訊時,隊列發生阻塞的情況的時候,線程還會等待隊列變成非阻塞狀态的隊列就是阻塞隊列。
通俗的說,一個線程從隊列中取資料,發現隊列中的資料為空,這個時候該線程不報錯,會一直等待隊列非空。
- 阻塞隊列是Queue下的api
從零開始學架構(5)-juc(中)
阻塞隊列的四組api
方法 | 第一組會抛出異常 | 傳回特殊值,不抛出異常 | 延遲等待 | 一直等待 |
---|---|---|---|---|
插入 | add() | offer(e) | offer(e,time) | put() |
取出 | remove | poll() | poll(time) | take() |
檢查 | element() | peek() | - | - |
表格解釋:按照表格列觀察,有四清單示有四組api,每個插入操作和一個取出操作相對應。
第一組api,add和remove,是向阻塞隊列中插入和删除元素,如果發生阻塞情況的時候程式會抛出異常,element可以檢視隊列頭部元素。
第二組,offer()和poll(),指的是如果插入和删除元素,發生阻塞時,不會跑出異常,但是會傳回一個特殊的值,offer阻塞會傳回一個bool值。
第三組,offer(e,time) poll(time)是可以延遲若幹時間,再進行操作,延遲的時間為參數time,如果等待若幹時間操作還是發生阻塞,同樣會傳回特殊值。
第三組,put(),take(),發生阻塞時,會一直等待下去,直到不發生阻塞。
應用
如何利用上面api,寫一個同步隊列。這裡我們可以對容量為1的隊列,使用put和take操作。即插入一次值,必須等待該值出隊列和再進行插入。寫代碼2個線程模拟插入和删除。
public static void main(String[] args) {
ArrayBlockingQueue arrayBlockingQueue =new ArrayBlockingQueue(1);
new Thread(()->{
try {
arrayBlockingQueue.put("a");
System.out.println(Thread.currentThread().getName()+" "+"put:a");
arrayBlockingQueue.put("b");
System.out.println(Thread.currentThread().getName()+" "+"put:b");
arrayBlockingQueue.put("c");
System.out.println(Thread.currentThread().getName()+" "+"put:c");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" "+arrayBlockingQueue.take());
System.out.println(Thread.currentThread().getName()+" "+arrayBlockingQueue.take());
System.out.println(Thread.currentThread().getName()+" "+arrayBlockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
}
線程池
我們應該對池化技術應該不陌生,有線程線,連接配接池,對象池等
池化技術的本質就是提高程式的使用率,降低我們的性能消耗。
線程池的三大方法,七大參數,四種拒絕政策
-
三大方法
三種建立線程池的方法:
- SingleThreadExecutor:建立一個線程的方法
- newFixedThreadPool:固定線程數量的方法
- newCachedThreadPool:可伸縮數量的方法
線程池的執行用execute方法,執行完需要關閉shutdown()
執行個體:
public static void main(String[] args) {
// ExecutorService executorService =Executors.newSingleThreadExecutor();
// ExecutorService executorService=Executors.newFixedThreadPool(3);
ExecutorService executorService =Executors.newCachedThreadPool();
try {
for (int i = 1; i <=30; i++) {
executorService.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
} catch (Exception e) {
e.printStackTrace();
}finally {
executorService.shutdown();
}
}
但是在阿裡巴巴開發手冊中明确的說,禁止這樣來建立和調用線程,而是使用ThreadPoolExecutor方法來建立調用線程。
其實原因時上述三大方法其實都是底層源碼都是調用的 TheadPoolExecutor方法,為了讓程式員更好的了解原理,故禁止調用上層方法。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
threadPoolExecutor有七大參數
回過頭看三大方法其中的newSingleThead方法其實就是線程總量大小為1,同步隊列,線程池一旦有線程池為空就關閉。其他方法類似。
使用ThreadPoolExecutor的例子
public class ThreadPoolDemo2 {
public static void main(String[] args) {
//開啟一個初始大小為2,最大大小為5的線程池,阻塞隊列大小為3 ,等待超過3秒就會關閉,拒絕政策是主線程執行
ExecutorService executorService =new ThreadPoolExecutor(2, 5, 3L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
try {
for (int i = 1; i <=30 ; i++) {
executorService.execute(()->{
System.out.println(Thread.currentThread().getName()+" "+"ok");
});
}
} finally {
executorService.shutdown();
}
}
上述代碼形象化解釋:
一個銀行前台初始化有2個開放視窗,但最大視窗是5個,等候區(即阻塞隊列大小為3),如果後續,阻塞隊列滿了,就會繼續打開未開放的視窗。
四大拒絕政策
即當所有的線程池,阻塞隊列都滿的情況下,對新來的任務采用以下四種政策。
/**
* 1、ThreadPoolExecutor.AbortPolicy(); 抛出異常,丢棄任務
* 思路:看到效果一樣的東西,可以研究研究!
* 2、ThreadPoolExecutor.DiscardPolicy();不抛出異常,丢棄任務
* 3、ThreadPoolExecutor.DiscardOldestPolicy(); 嘗試擷取任務,不一定執行!
* 4、ThreadPoolExecutor.CallerRunsPolicy(); 哪來的去哪裡找對應的線程執行!
*/
最大線程池如何設定
要考慮具體的硬體設施。
cpu密集型的話,根據cpu處理器數量來定線程數。
io密集型,即操作非常大的io資源時,比價耗時,線程數就是常用的IO任務數
四個函數式接口
了解函數式接口可以簡化程式設計
記住一句話,所有的函數式接口都可以用lambda表達式來簡化。
- Function : 有一個輸入參數有一個輸出參數
- Consumer:有一個輸入參數,沒有輸出參數
- Supplier:沒有輸入參數,隻有輸出參數
- Predicate:有一個輸入參數,判斷是否正确!
/**
*函數式程式設計
* 四大函數式程式設計 function,consumer,supplier,predicate
*/
public class FunctionDemo01 {
public static void main(String[] args) {
//consumer沒有輸出 無法示範
//用lambda表達式 簡化function函數式接口
Function<String,Integer>function =(str)->{return str.length();};
System.out.println(function.apply("abc"));
//用lambda表達式 簡化consumer函數式接口
Supplier<String> supplier =()->{return "hello world";};
System.out.println(supplier.get());
//用lambda表達式 簡化predicate函數式接口
Predicate<String> predicate =(str)->{
return str.isEmpty();
};
System.out.println(predicate.test("abc"));
}
}
stream流式計算
流式計算是對Collections對象進行操作。
如例子中對List集合操作
其中filter,map都是函數式接口
import com.sun.tools.internal.xjc.reader.xmlschema.bindinfo.BIConversion;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class StreamDemo01 {
public static void main(String[] args) {
User user1 =new User(1,"a",23);
User user2 =new User(2,"b",24);
User user3 =new User(3,"c",25);
User user4 =new User(4,"d",26);
User user5 =new User(5,"e",27);
List<User>list = Arrays.asList(user1,user2,user3,user4,user5);
list.stream()
.filter(u->{return (u.getId()%2)==0;}).
filter(u->{return u.getAge()>25;}).
map(u->{return u.getName().toUpperCase();}).
sorted((u1,u2)->{return u2.compareTo(u1);}).
forEach(System.out::println);
}
}