阻塞队列
-
什么是阻塞队列?
顾名思义,阻塞队列实质是一个队列,满足FIFO特性。
当队列满的时候,还要往队列里面放元素,会发生阻塞。
当队列空的时候,还要取元素出来,也会发生阻塞。
当在多线程通讯时,队列发生阻塞的情况的时候,线程还会等待队列变成非阻塞状态的队列就是阻塞队列。
通俗的说,一个线程从队列中取数据,发现队列中的数据为空,这个时候该线程不报错,会一直等待队列非空。
- 阻塞队列是Queue下的api
从零开始学架构(5)-juc(中)
阻塞队列的四组api
方法 | 第一组会抛出异常 | 返回特殊值,不抛出异常 | 延迟等待 | 一直等待 |
---|---|---|---|---|
插入 | add() | offer(e) | offer(e,time) | put() |
取出 | remove | poll() | poll(time) | take() |
检查 | element() | peek() | - | - |
表格解释:按照表格列观察,有四列表示有四组api,每个插入操作和一个取出操作相对应。
第一组api,add和remove,是向阻塞队列中插入和删除元素,如果发生阻塞情况的时候程序会抛出异常,element可以查看队列头部元素。
第二组,offer()和poll(),指的是如果插入和删除元素,发生阻塞时,不会跑出异常,但是会返回一个特殊的值,offer阻塞会返回一个bool值。
第三组,offer(e,time) poll(time)是可以延迟若干时间,再进行操作,延迟的时间为参数time,如果等待若干时间操作还是发生阻塞,同样会返回特殊值。
第三组,put(),take(),发生阻塞时,会一直等待下去,直到不发生阻塞。
应用
如何利用上面api,写一个同步队列。这里我们可以对容量为1的队列,使用put和take操作。即插入一次值,必须等待该值出队列和再进行插入。写代码2个线程模拟插入和删除。
public static void main(String[] args) {
ArrayBlockingQueue arrayBlockingQueue =new ArrayBlockingQueue(1);
new Thread(()->{
try {
arrayBlockingQueue.put("a");
System.out.println(Thread.currentThread().getName()+" "+"put:a");
arrayBlockingQueue.put("b");
System.out.println(Thread.currentThread().getName()+" "+"put:b");
arrayBlockingQueue.put("c");
System.out.println(Thread.currentThread().getName()+" "+"put:c");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" "+arrayBlockingQueue.take());
System.out.println(Thread.currentThread().getName()+" "+arrayBlockingQueue.take());
System.out.println(Thread.currentThread().getName()+" "+arrayBlockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
}
线程池
我们应该对池化技术应该不陌生,有线程线,连接池,对象池等
池化技术的本质就是提高程序的使用率,降低我们的性能消耗。
线程池的三大方法,七大参数,四种拒绝策略
-
三大方法
三种创建线程池的方法:
- SingleThreadExecutor:创建一个线程的方法
- newFixedThreadPool:固定线程数量的方法
- newCachedThreadPool:可伸缩数量的方法
线程池的执行用execute方法,执行完需要关闭shutdown()
实例:
public static void main(String[] args) {
// ExecutorService executorService =Executors.newSingleThreadExecutor();
// ExecutorService executorService=Executors.newFixedThreadPool(3);
ExecutorService executorService =Executors.newCachedThreadPool();
try {
for (int i = 1; i <=30; i++) {
executorService.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
} catch (Exception e) {
e.printStackTrace();
}finally {
executorService.shutdown();
}
}
但是在阿里巴巴开发手册中明确的说,禁止这样来创建和调用线程,而是使用ThreadPoolExecutor方法来创建调用线程。
其实原因时上述三大方法其实都是底层源码都是调用的 TheadPoolExecutor方法,为了让程序员更好的理解原理,故禁止调用上层方法。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
threadPoolExecutor有七大参数
回过头看三大方法其中的newSingleThead方法其实就是线程总量大小为1,同步队列,线程池一旦有线程池为空就关闭。其他方法类似。
使用ThreadPoolExecutor的例子
public class ThreadPoolDemo2 {
public static void main(String[] args) {
//开启一个初始大小为2,最大大小为5的线程池,阻塞队列大小为3 ,等待超过3秒就会关闭,拒绝策略是主线程执行
ExecutorService executorService =new ThreadPoolExecutor(2, 5, 3L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
try {
for (int i = 1; i <=30 ; i++) {
executorService.execute(()->{
System.out.println(Thread.currentThread().getName()+" "+"ok");
});
}
} finally {
executorService.shutdown();
}
}
上述代码形象化解释:
一个银行前台初始化有2个开放窗口,但最大窗口是5个,等候区(即阻塞队列大小为3),如果后续,阻塞队列满了,就会继续打开未开放的窗口。
四大拒绝策略
即当所有的线程池,阻塞队列都满的情况下,对新来的任务采用以下四种策略。
/**
* 1、ThreadPoolExecutor.AbortPolicy(); 抛出异常,丢弃任务
* 思路:看到效果一样的东西,可以研究研究!
* 2、ThreadPoolExecutor.DiscardPolicy();不抛出异常,丢弃任务
* 3、ThreadPoolExecutor.DiscardOldestPolicy(); 尝试获取任务,不一定执行!
* 4、ThreadPoolExecutor.CallerRunsPolicy(); 哪来的去哪里找对应的线程执行!
*/
最大线程池如何设置
要考虑具体的硬件设施。
cpu密集型的话,根据cpu处理器数量来定线程数。
io密集型,即操作非常大的io资源时,比价耗时,线程数就是常用的IO任务数
四个函数式接口
理解函数式接口可以简化编程
记住一句话,所有的函数式接口都可以用lambda表达式来简化。
- Function : 有一个输入参数有一个输出参数
- Consumer:有一个输入参数,没有输出参数
- Supplier:没有输入参数,只有输出参数
- Predicate:有一个输入参数,判断是否正确!
/**
*函数式编程
* 四大函数式编程 function,consumer,supplier,predicate
*/
public class FunctionDemo01 {
public static void main(String[] args) {
//consumer没有输出 无法演示
//用lambda表达式 简化function函数式接口
Function<String,Integer>function =(str)->{return str.length();};
System.out.println(function.apply("abc"));
//用lambda表达式 简化consumer函数式接口
Supplier<String> supplier =()->{return "hello world";};
System.out.println(supplier.get());
//用lambda表达式 简化predicate函数式接口
Predicate<String> predicate =(str)->{
return str.isEmpty();
};
System.out.println(predicate.test("abc"));
}
}
stream流式计算
流式计算是对Collections对象进行操作。
如例子中对List集合操作
其中filter,map都是函数式接口
import com.sun.tools.internal.xjc.reader.xmlschema.bindinfo.BIConversion;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class StreamDemo01 {
public static void main(String[] args) {
User user1 =new User(1,"a",23);
User user2 =new User(2,"b",24);
User user3 =new User(3,"c",25);
User user4 =new User(4,"d",26);
User user5 =new User(5,"e",27);
List<User>list = Arrays.asList(user1,user2,user3,user4,user5);
list.stream()
.filter(u->{return (u.getId()%2)==0;}).
filter(u->{return u.getAge()>25;}).
map(u->{return u.getName().toUpperCase();}).
sorted((u1,u2)->{return u2.compareTo(u1);}).
forEach(System.out::println);
}
}