day15多线程-02
1.线程池
1.1 线程状态介绍
- 当线程被创建并启动以后,它既不是一启动就进入了执行状态,也不是一直处于执行状态。
- 线程对象在不同的时期有不同的状态。那么Java中的线程存在哪几种状态呢?
- Java中的线程状态被定义在了java.lang.Thread.State枚举类中,State枚举类的源码如下:
public class Thread {
public enum State {
/* 新建 */
NEW ,
/* 可运行状态 */
RUNNABLE ,
/* 阻塞状态 */
BLOCKED ,
/* 无限等待状态 */
WAITING ,
/* 计时等待 */
TIMED_WAITING ,
/* 终止 */
TERMINATED;
}
// 获取当前线程的状态
public State getState() {
return jdk.internal.misc.VM.toThreadState(threadStatus);
}
}
- 通过源码我们可以看到Java中的线程存在6种状态,每种线程状态的含义如下
- NEW 新建状态
- RUNNABLE 就绪状态
- BLOCKED 阻塞状态
- WAITING 等待状态
- TIMED_WAITING 限时等待状态
- TERMINATED 结束状态
1.2 线程池-基本原理
线程池也是可以看做成一个池子,在该池子中存储很多个线程。为了提高性能,我们就可以采用线程池。线程池在启动的时,会创建大量空闲线程,当我们向线程池提交任务的时,线程池就会启动一个线程来执行该任务。等待任务执行完毕以后,线程并不会死亡,而是再次返回到线程池中称为空闲状态。等待下一次任务的执行。
1.3 线程池-Executors默认线程池
使用Executors中所提供的静态方法来创建线程池
- static ExecutorService newCachedThreadPool() 创建一个默认的线程池
代码
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Demo1 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + "线程执行了...");
});
executorService.shutdown();
}
}
1.4 线程池-Executors创建指定上限的线程池
使用Executors中所提供的静态方法来创建线程池
-
static ExecutorService newFixedThreadPool(int nThreads) :
创建一个指定最多线程数量的线程池
代码
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class Demo2 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
ThreadPoolExecutor pool = (ThreadPoolExecutor) executorService;
System.out.println(pool.getPoolSize());
executorService.submit(()->{
System.out.println(Thread.currentThread().getName()+"线程执行了...");
});
executorService.submit(()->{
System.out.println(Thread.currentThread().getName()+"线程执行了...");
});
System.out.println(pool.getPoolSize());
executorService.shutdown();
}
}
1.5 线程池-ThreadPoolExecutor
- 创建线程池对象 : ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(核心线程数量,最大线程数量,空闲线程最 大存活时间,任务队列,创建线程工厂,任务的拒绝策略);
代码
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Demo3 {
public static void main(String[] args) {
ThreadPoolExecutor poor = new ThreadPoolExecutor(
// 参数一:核心线程数量
2,
// 参数二:最大线程数
5,
// 参数三:空闲线程最大存活时间
3,
// 参数四:时间单位
TimeUnit.SECONDS,
// 参数五:任务队列
new ArrayBlockingQueue<>(5),
// 参数六:创建线程工厂
Executors.defaultThreadFactory(),
// 参数七:任务的拒绝策略
new ThreadPoolExecutor.AbortPolicy());
System.out.println(poor.getPoolSize());
poor.submit(new myThread());
poor.submit(new myThread());
System.out.println(poor.getPoolSize());
poor.shutdown();
}
}
class myThread implements Runnable {
@Override
public void run() {
System.out.println("汉堡包真好吃");
}
}
1.6 线程池-参数详解
- corePoolSize: 核心线程的最大值,不能小于0
- maximumPoolSize:最大线程数,不能小于等于0,maximumPoolSize >= corePoolSize
- keepAliveTime: 空闲线程最大存活时间,不能小于0
- unit: 时间单位
- workQueue: 任务队列,不能为null
- threadFactory: 创建线程工厂,不能为null
- handler: 任务的拒绝策略,不能为null
1.7 线程池-非默认任务拒绝策略
- RejectedExecutionHandler是jdk提供的一个任务拒绝策略接口,它下面存在4个子类
-
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出
RejectedExecutionException异常,是默认的策略。
- ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常,这是不推荐的做法。
- ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中。
- ThreadPoolExecutor.CallerRunsPolicy: 调用任务的run()方法绕过线程池直接执行。
- 注:明确线程池可执行的任务数 = 队列容量 + 最大线程数
案例演示1:演示ThreadPoolExecutor.AbortPolicy任务处理策略
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Demo4 {
public static void main(String[] args) {
ThreadPoolExecutor poor = new ThreadPoolExecutor(2,
5,
3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 1; i <= 11; i++) {
int j = i;
poor.submit(() -> {
System.out.println(j + Thread.currentThread().getName() + "程序执行了");
});
}
poor.shutdown();
}
}
案例演示2:演示ThreadPoolExecutor.DiscardPolicy任务处理策略 new ThreadPoolExecutor.DiscardPolicy()
案例演示3:演示ThreadPoolExecutor.DiscardOldestPolicy任务处理策略 new ThreadPoolExecutor.DiscardOldestPolicy()
案例演示4:演示ThreadPoolExecutor.CallerRunsPolicy任务处理策略 new ThreadPoolExecutor.CallerRunsPolicy()
2. 原子性
2.1 volatile-问题
2.2 volatile解决以上案例出现的问题
- 当A线程修改了共享数据时,B线程没有及时获取到最新的值,如果还在使用原先的值,就会出现问题
- 1.堆内存是唯一的,每一个线程都有自己的线程栈。
- 2.每一个线程在使用堆里面变量的时候,都会先拷贝一份到变量的副本中。
- 3.在线程中,每一次使用是从变量的副本中获取的。
- Volatile关键字:强制线程每次在使用的时候,都会看一下共享区域最新的值
2.3 synchronized解决
-
1.线程获得锁
2.清空变量副本
3.拷贝共享变量最新的值到变量副本中
4.执行代码
5.将修改后变量副本中的值赋值给共享数据
6.释放锁
2.4 原子性
概述
所谓的原子性是指在一次操作或者多次操作中,要么所有的操作全部都得到了执行并且不会受到任何因素的
干扰而中断,要么所有的操作都不执行,多个操作是一个不可以分割的整体
代码
public class IceCream implements Runnable {
public static volatile int counts = 0;
@Override
public void run() {
for (int i = 0; i < 100; i++) {
counts++;
System.out.println("已经送了" + counts + "个冰淇淋");
}
}
}
class Test {
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
IceCream iceCream = new IceCream();
Thread tr = new Thread(iceCream);
tr.start();
}
}
}
- 代码总结 : count++ 不是一个原子性操作, 他在执行的过程中,有可能被其他线程打断
2.5 volatile关键字不能保证原子性
-
解决方案 :
我们可以给count++操作添加锁,那么count++操作就是临界区中的代码,临界区中的代码一次只能被一个线程去执行,所以count++就变成了原子操作。
代码
public class IceCream implements Runnable {
public static volatile int counts = 0;
@Override
public void run() {
for (int i = 0; i < 100; i++) {
synchronized (this) {
counts++;
System.out.println("已经送了" + counts + "个冰淇淋");
}
}
}
}
2.6 原子性_AtomicInteger
-
概述:java从JDK1.5开始提供了java.util.concurrent.atomic包(简称Atomic包),这个包中的原子操作类提供了一种用法简单,性能高效,
线程安全地更新一个变量的方式。AtomicInteger的常用方法如下:
- public AtomicInteger():初始化一个默认值为0的原子型Integer
- public AtomicInteger(int initialValue):初始化一个指定值的原子型Integer
- int get():获取值
- int getAndIncrement():以原子方式将当前值加1,注意,这里返回的是自增前的值。
- int incrementAndGet():以原子方式将当前值加1,注意,这里返回的是自增后的值。
- int addAndGet(int data):以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果。
- int getAndSet(int value):以原子方式设置为newValue的值,并返回旧值。
代码
import java.util.concurrent.atomic.AtomicInteger;
public class Demo6 {
public static void main(String[] args) {
// AtomicInteger ai = new AtomicInteger();//默认值为0
AtomicInteger ai1 = new AtomicInteger(10);//初始值为10
System.out.println(ai1.get());//获得初始值10
// System.out.println(ai1.incrementAndGet());//先自增后获取,11
// System.out.println(ai1.getAndIncrement());//先获取后自增,10
// System.out.println(ai1.addAndGet(10));//先添加值后获取,20
// System.out.println(ai1.getAndAdd(10));//先获取值后添加,10
System.out.println(ai1.getAndSet(20));//先获取后将值修改,10
System.out.println(ai1.get());//20
}
}
2.7 AtomicInteger-内存解析
- AtomicInteger原理:自旋锁 + CAS 算法
CAS算法:
- 有3个操作数(内存值V, 旧的预期值A,要修改的值B)
- 当旧的预期值A == 内存值:此时修改成功,将V改为B
- 当旧的预期值A!=内存值:此时修改失败,不做任何操作,并重新获取现在的最新值(这个重新获取的动作就是自旋)
2.8 AtomicInteger-源码解析
案例代码
import java.util.concurrent.atomic.AtomicInteger;
public class IceCream2 implements Runnable {
public AtomicInteger ai;
public IceCream2(AtomicInteger ai) {
this.ai = ai;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println("已经送了" + ai.incrementAndGet() + "个冰淇淋");
}
}
}
class Test2 {
public static void main(String[] args) {
AtomicInteger ai = new AtomicInteger();
for (int i = 0; i < 100; i++) {
IceCream2 iceCream2 = new IceCream2(ai);
Thread tr = new Thread(iceCream2);
tr.start();
}
}
}
源码解析
//先自增,然后获取自增后的结果
public final int incrementAndGet() {
//+ 1 自增后的结果
//this 就表示当前的atomicInteger(值)
//1 自增一次
return U.getAndAddInt(this, VALUE, 1) + 1;
}
public final int getAndAddInt(Object o, long offset, int delta) {
//v 旧值 int v;
//自旋的过程
do {
//不断的获取旧值
v = getIntVolatile(o, offset);
//如果这个方法的返回值为false,那么继续自旋
//如果这个方法的返回值为true,那么自旋结束
//o 表示的就是内存值
//v 旧值
//v + delta 修改后的值
} while (!weakCompareAndSetInt(o, offset, v, v + delta)); //作用:比较内存中的值,旧值是否相等,如果相等就把修改后的值写到内存中,返回true。表示修改成功。
// 如果不相等,无法把修改后的值写到内存中,返回false。表示修改失败。
//如果修改失败,那么继续自旋。
return v;
}
2.9 悲观锁和乐观锁
synchronized和CAS的区别
- 相同点:在多线程情况下,都可以保证共享数据的安全性。
-
不同点:
synchronized总是从最坏的角度出发,认为每次获取数据的时候,别人都有可能修改。所以在每次操作共享数据之前,都会上锁。(悲观锁)
cas是从乐观的角度出发,假设每次获取数据别人都不会修改,所以不会上锁。只不过在修改共享数据的时候,会检查一下,别人有没有修改过这个数据。
如果别人修改过,那么我再次获取现在最新的值。 如果别人没有修改过,那么我现在直接修改共享数据的值.(乐观锁)
3.并发工具类
3.1 并发工具类-Hashtable
Hashtable出现的原因
- 在集合类中HashMap是比较常用的集合对象,但是HashMap是线程不安全的(多线程环境下可能会存在问题)。
- 为了保证数据的安全性我们可以使用Hashtable,但是Hashtable的效率低下。
3.2 并发工具类-ConcurrentHashMap
基本使用
- ConcurrentHashMap出现的原因:在集合类中HashMap是比较常用的集合对象,但是HashMap是线程不安全的(多线程环境下可能会存在问题)。
- 为了保证数据的安全性我们可以使用Hashtable,但是Hashtable的效率低下。基于以上两个原因我们可以使用JDK1.5以后所提供的ConcurrentHashMap。
总结
-
1.HashMap是线程不安全的。多线程环境下会有数据安全问题
2.Hashtable是线程安全的,但是会将整张表锁起来,效率低下
3.ConcurrentHashMap也是线程安全的,效率较高。在JDK7和JDK8中,底层原理不一样。
import java.util.concurrent.ConcurrentHashMap;
public class Demo7 {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, String> chm = new ConcurrentHashMap<>();
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 25; i++) {
chm.put(i + "", i + "");
}
});
Thread thread2 = new Thread(() -> {
for (int i = 25; i < 51; i++) {
chm.put(i + "", i + "");
}
});
thread1.start();
thread2.start();
System.out.println("---------------------");
Thread.sleep(1000);
for (int i = 0; i < 51; i++) {
System.out.println(chm.get(i + ""));
}
}
}
3.3 并发工具类-ConcurrentHashMap1.7原理
创建对象
- 1.默认创建一个长度16,加载因子为0.75的大数组,这个大数组一旦创建无法扩容
- 2.还会创建一个长度为2的小数组,把地址值赋值给0索引处,其他索引位置的元素都是null
注意事项
- 如果为null,则按照模板创建小数组.二次哈希,计算出在小数组中应存入的位置.
- 如果需要扩容,则将小数组扩容两倍;如果不需要扩容,则就会判断小数组的这个位置有没有元素.
- 如果没有元素,则直接存.如果有元素,就会调用equals方法,比较属性值.如果equals为true,则不存.
-
如果equals为false,形成哈希桶结构
3.4 并发工具类-ConcurrentHashMap1.8原理
总结
-
1.如果使用空参构造创建ConcurrentHashMap对象,则什么事情都不做在第一次添加元素的时候创建哈希表
2.计算当前元素应存入的索引。
3.如果该索引位置为null,则利用cas算法,将本结点添加到数组中。
4.如果该索引位置不为null,则利用volatile关键字获得当前位置最新的结点地址,挂在他下面,变成链表。
5.当链表的长度大于等于8时,自动转换成红黑树6,以链表或者红黑树头结点为锁对象,配合悲观锁保证多线程,操作集合时数据的安全性
3.5 并发工具类-CountDownLatch
CountDownLatch类 :
- public CountDownLatch(int count) 参数传递线程数,表示等待线程数量
- public void await() 让线程等待
- public void countDown() 当前线程执行完毕
- 使用场景:让某一条线程等待其他线程执行完毕之后再执行
代码
import java.util.concurrent.CountDownLatch;
public class MotherThread extends Thread{
private CountDownLatch cdt;
public MotherThread(CountDownLatch cdt) {
this.cdt = cdt;
}
@Override
public void run() {
try {
cdt.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import java.util.concurrent.CountDownLatch;
public class ChildThread1 extends Thread {
private CountDownLatch cdl;
public ChildThread1(CountDownLatch cdl) {
this.cdl = cdl;
}
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "正在吃第" + i + "个饺子");
}
cdl.countDown();
System.out.println(Thread.currentThread().getName() + "吃完饺子了");
}
}
import java.util.concurrent.CountDownLatch;
public class ChildThread2 extends Thread {
private CountDownLatch cdl;
public ChildThread2(CountDownLatch cdl) {
this.cdl = cdl;
}
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "正在吃第" + i + "个饺子");
}
cdl.countDown();
System.out.println(Thread.currentThread().getName() + "吃完饺子了");
}
}
import java.util.concurrent.CountDownLatch;
public class Test {
public static void main(String[] args) throws InterruptedException {
CountDownLatch cdt = new CountDownLatch(2);
MotherThread mt = new MotherThread(cdt);
mt.start();
ChildThread1 ct1 = new ChildThread1(cdt);
ct1.setName("小王");
ct1.start();
ChildThread2 ct2 = new ChildThread2(cdt);
ct2.setName("小李");
ct2.start();
Thread.sleep(2000);
System.out.println("妈妈收拾饭桌");
}
}
总结
- CountDownLatch(int count):参数写等待线程的数量。并定义了一个计数器。
- await():让线程等待,当计数器为0时,会唤醒等待的线程
- countDown(): 线程执行完毕时调用,会将计数器-1。
3.6 并发工具类-Semaphore
- 使用场景:可以控制访问特定资源的线程数量。
实现步骤
-
1.需要有人管理这个通道
2.当有车进来了,发通行许可证
3.当车出去了,收回通行许可证
4.如果通行许可证发完了,那么其他车辆只能等着
代码
import java.util.concurrent.Semaphore;
public class SemaphoreDemo implements Runnable {
//1,需要有人管理这个通道
private Semaphore semaphore = new Semaphore(3);
@Override
public void run() {
//2,当有车进来了,发通行许可证
try {
semaphore.acquire();
System.out.println("获得许可证,开始驾驶");
Thread.sleep(100);
//3,当车出去了,收回通行许可证
semaphore.release();
System.out.println("归还许可证");
//4,如果通行许可证发完了,那么其他车辆只能等着*/
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Test {
public static void main(String[] args) {
SemaphoreDemo sd = new SemaphoreDemo();
for (int i = 0; i < 100; i++) {
new Thread(sd).start();
}
}
}