天天看點

從零開始學架構(5)-juc(中)

阻塞隊列

  • 什麼是阻塞隊列?

    顧名思義,阻塞隊列實質是一個隊列,滿足FIFO特性。

    當隊列滿的時候,還要往隊列裡面放元素,會發生阻塞。

    當隊列空的時候,還要取元素出來,也會發生阻塞。

    當在多線程通訊時,隊列發生阻塞的情況的時候,線程還會等待隊列變成非阻塞狀态的隊列就是阻塞隊列。

    通俗的說,一個線程從隊列中取資料,發現隊列中的資料為空,這個時候該線程不報錯,會一直等待隊列非空。

  • 阻塞隊列是Queue下的api
    從零開始學架構(5)-juc(中)

阻塞隊列的四組api

方法 第一組會抛出異常 傳回特殊值,不抛出異常 延遲等待 一直等待
插入 add() offer(e) offer(e,time) put()
取出 remove poll() poll(time) take()
檢查 element() peek() - -

表格解釋:按照表格列觀察,有四清單示有四組api,每個插入操作和一個取出操作相對應。

第一組api,add和remove,是向阻塞隊列中插入和删除元素,如果發生阻塞情況的時候程式會抛出異常,element可以檢視隊列頭部元素。

第二組,offer()和poll(),指的是如果插入和删除元素,發生阻塞時,不會跑出異常,但是會傳回一個特殊的值,offer阻塞會傳回一個bool值。

第三組,offer(e,time) poll(time)是可以延遲若幹時間,再進行操作,延遲的時間為參數time,如果等待若幹時間操作還是發生阻塞,同樣會傳回特殊值。

第三組,put(),take(),發生阻塞時,會一直等待下去,直到不發生阻塞。

應用

如何利用上面api,寫一個同步隊列。這裡我們可以對容量為1的隊列,使用put和take操作。即插入一次值,必須等待該值出隊列和再進行插入。寫代碼2個線程模拟插入和删除。

public static void main(String[] args)  {
        ArrayBlockingQueue arrayBlockingQueue =new ArrayBlockingQueue(1);


        new Thread(()->{
            try {

                arrayBlockingQueue.put("a");
                System.out.println(Thread.currentThread().getName()+" "+"put:a");

                arrayBlockingQueue.put("b");
                System.out.println(Thread.currentThread().getName()+" "+"put:b");

                arrayBlockingQueue.put("c");
                System.out.println(Thread.currentThread().getName()+" "+"put:c");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"A").start();


        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+" "+arrayBlockingQueue.take());
                System.out.println(Thread.currentThread().getName()+" "+arrayBlockingQueue.take());
                System.out.println(Thread.currentThread().getName()+" "+arrayBlockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"B").start();

    }
           

線程池

我們應該對池化技術應該不陌生,有線程線,連接配接池,對象池等

池化技術的本質就是提高程式的使用率,降低我們的性能消耗。

線程池的三大方法,七大參數,四種拒絕政策

  • 三大方法

    三種建立線程池的方法:

    • SingleThreadExecutor:建立一個線程的方法
    • newFixedThreadPool:固定線程數量的方法
    • newCachedThreadPool:可伸縮數量的方法

    線程池的執行用execute方法,執行完需要關閉shutdown()

    執行個體:

public static void main(String[] args) {
//        ExecutorService executorService =Executors.newSingleThreadExecutor();
//        ExecutorService executorService=Executors.newFixedThreadPool(3);
        ExecutorService executorService =Executors.newCachedThreadPool();


        try {
            for (int i = 1; i <=30; i++) {
                executorService.execute(()->{
                    System.out.println(Thread.currentThread().getName());
                });

            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            executorService.shutdown();
        }
    }
           

但是在阿裡巴巴開發手冊中明确的說,禁止這樣來建立和調用線程,而是使用ThreadPoolExecutor方法來建立調用線程。

其實原因時上述三大方法其實都是底層源碼都是調用的 TheadPoolExecutor方法,為了讓程式員更好的了解原理,故禁止調用上層方法。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
    }

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
    }

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    }
           

threadPoolExecutor有七大參數

從零開始學架構(5)-juc(中)

回過頭看三大方法其中的newSingleThead方法其實就是線程總量大小為1,同步隊列,線程池一旦有線程池為空就關閉。其他方法類似。

使用ThreadPoolExecutor的例子

public class ThreadPoolDemo2 {
    public static void main(String[] args) {

        //開啟一個初始大小為2,最大大小為5的線程池,阻塞隊列大小為3 ,等待超過3秒就會關閉,拒絕政策是主線程執行
        ExecutorService executorService =new ThreadPoolExecutor(2, 5, 3L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());


        try {
            for (int i = 1; i <=30 ; i++) {
                executorService.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" "+"ok");
                });
            }
        } finally {
            executorService.shutdown();

        }
    }
           

上述代碼形象化解釋:

一個銀行前台初始化有2個開放視窗,但最大視窗是5個,等候區(即阻塞隊列大小為3),如果後續,阻塞隊列滿了,就會繼續打開未開放的視窗。

從零開始學架構(5)-juc(中)

四大拒絕政策

即當所有的線程池,阻塞隊列都滿的情況下,對新來的任務采用以下四種政策。

/**
         * 1、ThreadPoolExecutor.AbortPolicy();  抛出異常,丢棄任務
         * 思路:看到效果一樣的東西,可以研究研究!
         * 2、ThreadPoolExecutor.DiscardPolicy();不抛出異常,丢棄任務
         * 3、ThreadPoolExecutor.DiscardOldestPolicy(); 嘗試擷取任務,不一定執行!
         * 4、ThreadPoolExecutor.CallerRunsPolicy(); 哪來的去哪裡找對應的線程執行!
         */
           

最大線程池如何設定

要考慮具體的硬體設施。

cpu密集型的話,根據cpu處理器數量來定線程數。

io密集型,即操作非常大的io資源時,比價耗時,線程數就是常用的IO任務數

四個函數式接口

了解函數式接口可以簡化程式設計

記住一句話,所有的函數式接口都可以用lambda表達式來簡化。

  • Function : 有一個輸入參數有一個輸出參數
  • Consumer:有一個輸入參數,沒有輸出參數
  • Supplier:沒有輸入參數,隻有輸出參數
  • Predicate:有一個輸入參數,判斷是否正确!
/**
 *函數式程式設計
 * 四大函數式程式設計 function,consumer,supplier,predicate
 */
public class FunctionDemo01 {
    public static void main(String[] args) {

       //consumer沒有輸出 無法示範
        
        //用lambda表達式 簡化function函數式接口
        Function<String,Integer>function =(str)->{return str.length();};
        System.out.println(function.apply("abc"));

        //用lambda表達式 簡化consumer函數式接口
        Supplier<String> supplier =()->{return "hello world";};
        System.out.println(supplier.get());

        //用lambda表達式 簡化predicate函數式接口
        Predicate<String> predicate =(str)->{
            return str.isEmpty();
        };
        System.out.println(predicate.test("abc"));

    }
}
           

stream流式計算

流式計算是對Collections對象進行操作。

如例子中對List集合操作

其中filter,map都是函數式接口

import com.sun.tools.internal.xjc.reader.xmlschema.bindinfo.BIConversion;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class StreamDemo01 {
    public static void main(String[] args) {
        User user1 =new User(1,"a",23);
        User user2 =new User(2,"b",24);
        User user3 =new User(3,"c",25);
        User user4 =new User(4,"d",26);
        User user5 =new User(5,"e",27);


        List<User>list = Arrays.asList(user1,user2,user3,user4,user5);
        
        list.stream()
                .filter(u->{return (u.getId()%2)==0;}).
                filter(u->{return u.getAge()>25;}).
                map(u->{return u.getName().toUpperCase();}).
                sorted((u1,u2)->{return u2.compareTo(u1);}).
                forEach(System.out::println);

    }
}