天天看点

Java多线程 代码实例总结Java多线程 代码实例总结

Java多线程 代码实例总结

atomic的ID生成器

import java.util.concurrent.atomic.AtomicLong;

public class IdGenerator {
    AtomicLong var = new AtomicLong(0);
    public long getNextId(){
        return var.incrementAndGet();
    }

    public static void main(String[] args) {
        IdGenerator idGenerator = new IdGenerator();
        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                System.out.println(idGenerator.getNextId());
            }).start();
        }
    }
}

           

synchronized实现阻塞队列

  • synchronized、notify、notifyall、wait
import java.util.LinkedList;
import java.util.Queue;

public class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s){
        this.queue.add(s);
        this.notify();
    }

    public synchronized String getTask() throws InterruptedException {
        while (this.queue.isEmpty()){
            this.wait();
        }
        return queue.remove();
    }
}

           
public class TaskQueueTest {
    public static void main(String[] args) throws InterruptedException {
        TaskQueue taskQueue = new TaskQueue();
        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    System.out.println(taskQueue.getTask());;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                taskQueue.addTask("task+"+i);
            }
        }).start();

        Thread.sleep(2000l);
    }
}

           

reentrant实现阻塞队列

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private Queue<String> queue = new LinkedList<>();

    public void addTask(String s){
        lock.lock();
        try {
            queue.add(s);
            condition.signalAll();
        }finally {
            lock.unlock();
        }
    }

    public String getTask(){
        lock.lock();
        try{
            while (queue.isEmpty()){
                condition.await();
            }
            String s = queue.remove();
            return s;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return null;
    }
}

           
import java.util.LinkedList;
import java.util.List;

public class ReentrantCounterTest {
    public static void main(String[] args) {
        ReentrantCounter reentrantCounter = new ReentrantCounter();
        List<Thread> lists = new LinkedList<>();
        for (int i = 0; i < 5; i++) {
            lists.add(new Thread(()->{
                reentrantCounter.add(5);
            }));
        }

        for (int i = 0; i < 5; i++) {
            lists.add(new Thread(()->{
                try {
                    reentrantCounter.dec(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }));
        }

        for (int i = 0; i < lists.size(); i++) {
            lists.get(i).start();
        }
    }
}

           

reentrant带自旋

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantCounter {
    private final Lock lock = new ReentrantLock();
    private int cnt;

    public void add(int n){
        lock.lock();
        try {
            cnt += n;
            System.out.println(cnt);
        }finally {
            lock.unlock();
        }
    }

    public void add2(int n) throws InterruptedException {
        if(lock.tryLock(1, TimeUnit.SECONDS)){//尝试获取锁的时候最多等待1秒
            try {
                cnt += n;
                System.out.println(cnt);
            }finally {
                lock.unlock();
            }
        }
    }

    public void dec(int n) throws InterruptedException {
        if(lock.tryLock(1, TimeUnit.SECONDS)){
            try{
                cnt-=n;
                System.out.println(cnt);
            }finally {
                lock.unlock();
            }
        }
    }
}

           

ReadWriteLock悲观读写锁

  • ReadWriteLock

    可以解决多线程同时读,但只有一个线程能写的问题。

    如果我们深入分析

    ReadWriteLock

    ,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。
import java.util.Arrays;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteCounter {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock rlock = rwLock.readLock();
    private final Lock wlock = rwLock.writeLock();

    private int[] counts = new int[10];

    public void inc(int index){
        wlock.lock();
        try{
            System.out.println("Writing");
            counts[index] += 1;
        }finally {
            System.out.println("write finished");
            wlock.unlock();
        }
    }

    public int[] getCounts(){
        rlock.lock();
        try{
            return Arrays.copyOf(counts, counts.length);
        }finally {
            rlock.unlock();
        }
    }

}

           

stampedLock

  • 读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。
import java.util.concurrent.locks.StampedLock;

public class Stamped {
    private final StampedLock stampedLock = new StampedLock();
    private double x;
    private double y;

    public void move(double deltaX, double deltaY){
        long stamp = stampedLock.writeLock();//获取写锁
        try {
            x += deltaX;
            y += deltaY;
        }finally {
            stampedLock.unlockWrite(stamp);//释放写锁
        }
    }

    public double distance(){
        long stamp = stampedLock.tryOptimisticRead();//获得乐观读锁
        double curX = x;
        double curY = y;
        if(!stampedLock.validate(stamp)){//被修改
            stamp = stampedLock.readLock();//获取悲观读锁
            try{
                curX = x;
                curY = y;
            }finally {
                stampedLock.unlockRead(stamp);
            }
        }
        return Math.sqrt(curX*curX + curY*curY);
    }

}

           

线程池

import java.util.concurrent.*;

public class ThreadPool {
    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
//            es.submit(new Task("es"+i));
        }
        es.shutdown();


        int min=4, max=6;
        ExecutorService es1 = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
//            es1.submit(new Task("es1 #"+i));
        }
        es1.shutdownNow();

        ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
        //5秒后执行一次
        ses.schedule(new Task("schedule task"), 5l, TimeUnit.SECONDS);
        //fixrate 固定时间触发。5秒后执行,每3秒触发,不管前一次是否执行完毕
        ses.scheduleAtFixedRate(new Task("fixrate task"), 5, 3, TimeUnit.SECONDS);
        //fixdelay 上次任务执行完毕后,等待固定时间间隔再执行,下面是等待三秒
        ses.scheduleWithFixedDelay(new Task("fixdelay task"),5, 3, TimeUnit.SECONDS);
//        ses.shutdown();
    }
}

class Task implements Runnable{
    private final String name;
    public Task(String name){
        this.name = name;
    }
    @Override
    public void run() {
        System.out.println("start task - "+this.name+" ...");
        try{
            Thread.sleep(1000);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        System.out.println("end task "+ this.name);
    }
}

           

callable Future

import java.util.concurrent.*;

public class CallableTask implements Callable<String> {
    @Override
    public String call() throws Exception {
//        Thread.sleep(3000l);
        return "this is callable return";
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newCachedThreadPool();
        Callable<String> task = new CallableTask();
        Future<String> future = es.submit(task);
//        Thread.sleep(2000l);
//        String result = future.get();//线程未结束会产生阻塞
//        System.out.println(result);
        while (future.isDone()){
            System.out.println(future.get());
        }
    }
}

           

ThreadLocal

public class ThreadLocalTest {
    private final static ThreadLocal<User> threadLocalUser = new ThreadLocal<User>();

    public void processUser(User user){
        threadLocalUser.set(user);
        step1();
        step2();
        threadLocalUser.remove();
    }

    public void step1(){
        User user = threadLocalUser.get();
        System.out.println("Step1 process: " + user.toString());
    }
    public void step2(){
        User user = threadLocalUser.get();
        System.out.println("Step2 process: " + user.toString());
    }
}


class User{
    String name;
    int age;

    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}