天天看点

day15多线程-02

day15多线程-02

1.线程池

1.1 线程状态介绍

  • 当线程被创建并启动以后,它既不是一启动就进入了执行状态,也不是一直处于执行状态。
  • 线程对象在不同的时期有不同的状态。那么Java中的线程存在哪几种状态呢?
  • Java中的线程状态被定义在了java.lang.Thread.State枚举类中,State枚举类的源码如下:
public class Thread { 
	public enum State { 
	/* 新建 */ 
	NEW , 
	/* 可运行状态 */ 
	RUNNABLE , 
	/* 阻塞状态 */ 
	BLOCKED , 
	/* 无限等待状态 */ 
	WAITING , 
	/* 计时等待 */ 
	TIMED_WAITING , 
	/* 终止 */ 
	TERMINATED; 
}
// 获取当前线程的状态 
public State getState() { 
	return jdk.internal.misc.VM.toThreadState(threadStatus); 
	} 
}
           
  • 通过源码我们可以看到Java中的线程存在6种状态,每种线程状态的含义如下
  • NEW 新建状态
  • RUNNABLE 就绪状态
  • BLOCKED 阻塞状态
  • WAITING 等待状态
  • TIMED_WAITING 限时等待状态
  • TERMINATED 结束状态

1.2 线程池-基本原理

线程池也是可以看做成一个池子,在该池子中存储很多个线程。为了提高性能,我们就可以采用线程池。线程池在启动的时,会创建大量空闲线程,当我们向线程池提交任务的时,线程池就会启动一个线程来执行该任务。等待任务执行完毕以后,线程并不会死亡,而是再次返回到线程池中称为空闲状态。等待下一次任务的执行。

1.3 线程池-Executors默认线程池

使用Executors中所提供的静态方法来创建线程池

  • static ExecutorService newCachedThreadPool() 创建一个默认的线程池
代码
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo1 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + "线程执行了...");
        });
        executorService.shutdown();
    }
}
           

1.4 线程池-Executors创建指定上限的线程池

使用Executors中所提供的静态方法来创建线程池

  • static ExecutorService newFixedThreadPool(int nThreads) :

    创建一个指定最多线程数量的线程池

代码
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class Demo2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        ThreadPoolExecutor pool = (ThreadPoolExecutor) executorService;
        System.out.println(pool.getPoolSize());
        executorService.submit(()->{
            System.out.println(Thread.currentThread().getName()+"线程执行了...");
        });

        executorService.submit(()->{
            System.out.println(Thread.currentThread().getName()+"线程执行了...");
        });
        System.out.println(pool.getPoolSize());
        executorService.shutdown();
    }
}
           

1.5 线程池-ThreadPoolExecutor

  • 创建线程池对象 : ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(核心线程数量,最大线程数量,空闲线程最 大存活时间,任务队列,创建线程工厂,任务的拒绝策略);
代码
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Demo3 {
    public static void main(String[] args) {
        ThreadPoolExecutor poor = new ThreadPoolExecutor(
        // 参数一:核心线程数量
        		2,
        // 参数二:最大线程数
                5,
        // 参数三:空闲线程最大存活时间
                3,
        // 参数四:时间单位
                TimeUnit.SECONDS,
        // 参数五:任务队列
                new ArrayBlockingQueue<>(5),
        // 参数六:创建线程工厂        
                Executors.defaultThreadFactory(),
        // 参数七:任务的拒绝策略        
                new ThreadPoolExecutor.AbortPolicy());
        System.out.println(poor.getPoolSize());
        poor.submit(new myThread());
        poor.submit(new myThread());
        System.out.println(poor.getPoolSize());
        poor.shutdown();
    }
}

class myThread implements Runnable {

    @Override
    public void run() {
        System.out.println("汉堡包真好吃");
    }
}
           

1.6 线程池-参数详解

  • corePoolSize: 核心线程的最大值,不能小于0
  • maximumPoolSize:最大线程数,不能小于等于0,maximumPoolSize >= corePoolSize
  • keepAliveTime: 空闲线程最大存活时间,不能小于0
  • unit: 时间单位
  • workQueue: 任务队列,不能为null
  • threadFactory: 创建线程工厂,不能为null
  • handler: 任务的拒绝策略,不能为null

1.7 线程池-非默认任务拒绝策略

  • RejectedExecutionHandler是jdk提供的一个任务拒绝策略接口,它下面存在4个子类
  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出

    RejectedExecutionException异常,是默认的策略。

  • ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常,这是不推荐的做法。
  • ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中。
  • ThreadPoolExecutor.CallerRunsPolicy: 调用任务的run()方法绕过线程池直接执行。
  • 注:明确线程池可执行的任务数 = 队列容量 + 最大线程数

案例演示1:演示ThreadPoolExecutor.AbortPolicy任务处理策略

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Demo4 {
    public static void main(String[] args) {
        ThreadPoolExecutor poor = new ThreadPoolExecutor(2,
                5,
                3,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        for (int i = 1; i <= 11; i++) {
            int j = i;
            poor.submit(() -> {
                System.out.println(j + Thread.currentThread().getName() + "程序执行了");
            });
        }
        poor.shutdown();
    }
}
           

案例演示2:演示ThreadPoolExecutor.DiscardPolicy任务处理策略 new ThreadPoolExecutor.DiscardPolicy()

案例演示3:演示ThreadPoolExecutor.DiscardOldestPolicy任务处理策略 new ThreadPoolExecutor.DiscardOldestPolicy()

案例演示4:演示ThreadPoolExecutor.CallerRunsPolicy任务处理策略 new ThreadPoolExecutor.CallerRunsPolicy()

2. 原子性

2.1 volatile-问题

2.2 volatile解决以上案例出现的问题

  • 当A线程修改了共享数据时,B线程没有及时获取到最新的值,如果还在使用原先的值,就会出现问题
  • 1.堆内存是唯一的,每一个线程都有自己的线程栈。
  • 2.每一个线程在使用堆里面变量的时候,都会先拷贝一份到变量的副本中。
  • 3.在线程中,每一次使用是从变量的副本中获取的。
  • Volatile关键字:强制线程每次在使用的时候,都会看一下共享区域最新的值

2.3 synchronized解决

  • 1.线程获得锁

    2.清空变量副本

    3.拷贝共享变量最新的值到变量副本中

    4.执行代码

    5.将修改后变量副本中的值赋值给共享数据

    6.释放锁

2.4 原子性

概述

所谓的原子性是指在一次操作或者多次操作中,要么所有的操作全部都得到了执行并且不会受到任何因素的

干扰而中断,要么所有的操作都不执行,多个操作是一个不可以分割的整体

代码
public class IceCream implements Runnable {
    public static volatile int counts = 0;

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            counts++;
            System.out.println("已经送了" + counts + "个冰淇淋");
        }
    }
}

class Test {
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            IceCream iceCream = new IceCream();
            Thread tr = new Thread(iceCream);
            tr.start();
        }
    }
}
           
  • 代码总结 : count++ 不是一个原子性操作, 他在执行的过程中,有可能被其他线程打断

2.5 volatile关键字不能保证原子性

  • 解决方案 :

    我们可以给count++操作添加锁,那么count++操作就是临界区中的代码,临界区中的代码一次只能被一个线程去执行,所以count++就变成了原子操作。

代码
public class IceCream implements Runnable {
    public static volatile int counts = 0;

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            synchronized (this) {
                counts++;
                System.out.println("已经送了" + counts + "个冰淇淋");
            }
        }
    }
}
           

2.6 原子性_AtomicInteger

  • 概述:java从JDK1.5开始提供了java.util.concurrent.atomic包(简称Atomic包),这个包中的原子操作类提供了一种用法简单,性能高效,

    线程安全地更新一个变量的方式。AtomicInteger的常用方法如下:

  • public AtomicInteger():初始化一个默认值为0的原子型Integer
  • public AtomicInteger(int initialValue):初始化一个指定值的原子型Integer
  • int get():获取值
  • int getAndIncrement():以原子方式将当前值加1,注意,这里返回的是自增前的值。
  • int incrementAndGet():以原子方式将当前值加1,注意,这里返回的是自增后的值。
  • int addAndGet(int data):以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果。
  • int getAndSet(int value):以原子方式设置为newValue的值,并返回旧值。
代码
import java.util.concurrent.atomic.AtomicInteger;

public class Demo6 {
    public static void main(String[] args) {
//        AtomicInteger ai = new AtomicInteger();//默认值为0
        AtomicInteger ai1 = new AtomicInteger(10);//初始值为10
        System.out.println(ai1.get());//获得初始值10
//        System.out.println(ai1.incrementAndGet());//先自增后获取,11
//        System.out.println(ai1.getAndIncrement());//先获取后自增,10
//        System.out.println(ai1.addAndGet(10));//先添加值后获取,20
//        System.out.println(ai1.getAndAdd(10));//先获取值后添加,10
        System.out.println(ai1.getAndSet(20));//先获取后将值修改,10
        System.out.println(ai1.get());//20
    }
}
           

2.7 AtomicInteger-内存解析

  • AtomicInteger原理:自旋锁 + CAS 算法

CAS算法:

  • 有3个操作数(内存值V, 旧的预期值A,要修改的值B)
  • 当旧的预期值A == 内存值:此时修改成功,将V改为B
  • 当旧的预期值A!=内存值:此时修改失败,不做任何操作,并重新获取现在的最新值(这个重新获取的动作就是自旋)

2.8 AtomicInteger-源码解析

案例代码

import java.util.concurrent.atomic.AtomicInteger;

public class IceCream2 implements Runnable {
    public AtomicInteger ai;

    public IceCream2(AtomicInteger ai) {
        this.ai = ai;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println("已经送了" + ai.incrementAndGet() + "个冰淇淋");
        }
    }
}

class Test2 {
    public static void main(String[] args) {
        AtomicInteger ai = new AtomicInteger();
        for (int i = 0; i < 100; i++) {
            IceCream2 iceCream2 = new IceCream2(ai);
            Thread tr = new Thread(iceCream2);
            tr.start();
        }
    }
}
           

源码解析

//先自增,然后获取自增后的结果 
public final int incrementAndGet() { 
	//+ 1 自增后的结果 
	//this 就表示当前的atomicInteger(值) 
	//1 自增一次 
	return U.getAndAddInt(this, VALUE, 1) + 1; 
}
public final int getAndAddInt(Object o, long offset, int delta) { 
	//v 旧值 int v; 
	//自旋的过程 
	do {
		//不断的获取旧值 
		v = getIntVolatile(o, offset); 
		//如果这个方法的返回值为false,那么继续自旋 
		//如果这个方法的返回值为true,那么自旋结束 
		//o 表示的就是内存值 
		//v 旧值 
		//v + delta 修改后的值 
		} while (!weakCompareAndSetInt(o, offset, v, v + delta)); //作用:比较内存中的值,旧值是否相等,如果相等就把修改后的值写到内存中,返回true。表示修改成功。 
		// 如果不相等,无法把修改后的值写到内存中,返回false。表示修改失败。 
		//如果修改失败,那么继续自旋。 
		return v;
}
           

2.9 悲观锁和乐观锁

synchronized和CAS的区别

  • 相同点:在多线程情况下,都可以保证共享数据的安全性。
  • 不同点:

    synchronized总是从最坏的角度出发,认为每次获取数据的时候,别人都有可能修改。所以在每次操作共享数据之前,都会上锁。(悲观锁)

    cas是从乐观的角度出发,假设每次获取数据别人都不会修改,所以不会上锁。只不过在修改共享数据的时候,会检查一下,别人有没有修改过这个数据。

    如果别人修改过,那么我再次获取现在最新的值。 如果别人没有修改过,那么我现在直接修改共享数据的值.(乐观锁)

    3.并发工具类

    3.1 并发工具类-Hashtable

    Hashtable出现的原因

  • 在集合类中HashMap是比较常用的集合对象,但是HashMap是线程不安全的(多线程环境下可能会存在问题)。
  • 为了保证数据的安全性我们可以使用Hashtable,但是Hashtable的效率低下。

3.2 并发工具类-ConcurrentHashMap

基本使用

  • ConcurrentHashMap出现的原因:在集合类中HashMap是比较常用的集合对象,但是HashMap是线程不安全的(多线程环境下可能会存在问题)。
  • 为了保证数据的安全性我们可以使用Hashtable,但是Hashtable的效率低下。基于以上两个原因我们可以使用JDK1.5以后所提供的ConcurrentHashMap。

总结

  • 1.HashMap是线程不安全的。多线程环境下会有数据安全问题

    2.Hashtable是线程安全的,但是会将整张表锁起来,效率低下

    3.ConcurrentHashMap也是线程安全的,效率较高。在JDK7和JDK8中,底层原理不一样。

import java.util.concurrent.ConcurrentHashMap;

public class Demo7 {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, String> chm = new ConcurrentHashMap<>();
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 25; i++) {
                chm.put(i + "", i + "");
            }
        });

        Thread thread2 = new Thread(() -> {
            for (int i = 25; i < 51; i++) {
                chm.put(i + "", i + "");
            }
        });
        thread1.start();
        thread2.start();
        System.out.println("---------------------");
        Thread.sleep(1000);
        for (int i = 0; i < 51; i++) {
            System.out.println(chm.get(i + ""));
        }
    }
}
           

3.3 并发工具类-ConcurrentHashMap1.7原理

创建对象

  • 1.默认创建一个长度16,加载因子为0.75的大数组,这个大数组一旦创建无法扩容
  • 2.还会创建一个长度为2的小数组,把地址值赋值给0索引处,其他索引位置的元素都是null

注意事项

  • 如果为null,则按照模板创建小数组.二次哈希,计算出在小数组中应存入的位置.
  • 如果需要扩容,则将小数组扩容两倍;如果不需要扩容,则就会判断小数组的这个位置有没有元素.
  • 如果没有元素,则直接存.如果有元素,就会调用equals方法,比较属性值.如果equals为true,则不存.
  • 如果equals为false,形成哈希桶结构

    3.4 并发工具类-ConcurrentHashMap1.8原理

    总结

  • 1.如果使用空参构造创建ConcurrentHashMap对象,则什么事情都不做在第一次添加元素的时候创建哈希表

    2.计算当前元素应存入的索引。

    3.如果该索引位置为null,则利用cas算法,将本结点添加到数组中。

    4.如果该索引位置不为null,则利用volatile关键字获得当前位置最新的结点地址,挂在他下面,变成链表。

    5.当链表的长度大于等于8时,自动转换成红黑树6,以链表或者红黑树头结点为锁对象,配合悲观锁保证多线程,操作集合时数据的安全性

3.5 并发工具类-CountDownLatch

CountDownLatch类 :

  • public CountDownLatch(int count) 参数传递线程数,表示等待线程数量
  • public void await() 让线程等待
  • public void countDown() 当前线程执行完毕
  • 使用场景:让某一条线程等待其他线程执行完毕之后再执行
代码
import java.util.concurrent.CountDownLatch;

public class MotherThread extends Thread{
    private CountDownLatch cdt;

    public MotherThread(CountDownLatch cdt) {
        this.cdt = cdt;
    }

    @Override
    public void run() {
        try {
            cdt.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
import java.util.concurrent.CountDownLatch;

public class ChildThread1 extends Thread {
    private CountDownLatch cdl;

    public ChildThread1(CountDownLatch cdl) {
        this.cdl = cdl;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 10; i++) {
            System.out.println(Thread.currentThread().getName() + "正在吃第" + i + "个饺子");
        }
        cdl.countDown();
        System.out.println(Thread.currentThread().getName() + "吃完饺子了");
    }
}
import java.util.concurrent.CountDownLatch;

public class ChildThread2 extends Thread {
    private CountDownLatch cdl;

    public ChildThread2(CountDownLatch cdl) {
        this.cdl = cdl;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 10; i++) {
            System.out.println(Thread.currentThread().getName() + "正在吃第" + i + "个饺子");
        }
        cdl.countDown();
        System.out.println(Thread.currentThread().getName() + "吃完饺子了");
    }
}
import java.util.concurrent.CountDownLatch;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch cdt = new CountDownLatch(2);
        MotherThread mt = new MotherThread(cdt);
        mt.start();

        ChildThread1 ct1 = new ChildThread1(cdt);
        ct1.setName("小王");
        ct1.start();

        ChildThread2 ct2 = new ChildThread2(cdt);
        ct2.setName("小李");
        ct2.start();

        Thread.sleep(2000);
        System.out.println("妈妈收拾饭桌");
    }
}
           

总结

  • CountDownLatch(int count):参数写等待线程的数量。并定义了一个计数器。
  • await():让线程等待,当计数器为0时,会唤醒等待的线程
  • countDown(): 线程执行完毕时调用,会将计数器-1。

3.6 并发工具类-Semaphore

  • 使用场景:可以控制访问特定资源的线程数量。

实现步骤

  • 1.需要有人管理这个通道

    2.当有车进来了,发通行许可证

    3.当车出去了,收回通行许可证

    4.如果通行许可证发完了,那么其他车辆只能等着

代码
import java.util.concurrent.Semaphore;

public class SemaphoreDemo implements Runnable {
    //1,需要有人管理这个通道
    private Semaphore semaphore = new Semaphore(3);

    @Override
    public void run() {

        //2,当有车进来了,发通行许可证
        try {
            semaphore.acquire();
            System.out.println("获得许可证,开始驾驶");
            Thread.sleep(100);
            //3,当车出去了,收回通行许可证
            semaphore.release();
            System.out.println("归还许可证");
            //4,如果通行许可证发完了,那么其他车辆只能等着*/
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Test {
    public static void main(String[] args) {
        SemaphoreDemo sd = new SemaphoreDemo();
        for (int i = 0; i < 100; i++) {
            new Thread(sd).start();
        }
    }
}