天天看点

【多线程】队列模式

1. 实现自定义阻塞队列

阻塞队列在get元素时如果队列为空那么直接阻塞当前线程,在添加元素时notifyAll所有线程
public class RequestQueue {

    private final LinkedList<Request> queue = new LinkedList<>();


    public Request getRequest() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    break;
                }
            }
            return queue.removeFirst();
        }
    }


    public void putRequest(Request request) {
        synchronized (queue) {
            queue.addLast(request);
            queue.notifyAll();
        }
    }
}
           

2. 实现队列元素(Request)

public class Request {

    private final String values;

    public Request(String values) {
        this.values = values;
    }

    public String getValues() {
        return values;
    }
}
           

3.实现client 线程发送请求

public class ClientThread extends Thread {
    private final RequestQueue queue;
    private final String sendValue;

    public ClientThread(RequestQueue queue, String sendValue) {
        this.queue = queue;
        this.sendValue = sendValue;
    }
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            System.out.println("client send message -> " + sendValue + i);
            queue.putRequest(new Request(sendValue + i));
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        RequestQueue queue = new RequestQueue();
        ServerThread serverThread =
                new ServerThread(queue);
        ClientThread clientThread =
                new ClientThread(queue, "send value");
        serverThread.start();
        clientThread.start();
    }
}
           

4. 实现server线程处理数据

public class ServerThread extends Thread {

    private final RequestQueue queue;

    public ServerThread(RequestQueue queue) {
        this.queue = queue;
    }


    @Override
    public void run() {
        while (true) {
            Request request = queue.getRequest();
            if (request == null)
                continue;
            System.out.println("server consumer" + request.getValues());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}