1. 实现自定义阻塞队列
阻塞队列在取元素(get)时,如果队列为空,则阻塞当前线程;在添加元素时,调用 notifyAll 唤醒所有等待中的线程。
/**
 * A minimal blocking FIFO queue of {@link Request} objects.
 *
 * <p>All access to the backing list is guarded by the list's own monitor, which is also the
 * object consumers wait on and producers notify.
 */
public class RequestQueue {

    /** Pending requests; doubles as the lock and the wait/notify condition object. */
    private final LinkedList<Request> queue = new LinkedList<>();

    /**
     * Removes and returns the oldest request, blocking while the queue is empty.
     *
     * <p>If the calling thread is interrupted while waiting (or already has its interrupt
     * status set when it needs to wait), the interrupt status is restored and the method
     * returns immediately instead of blocking. Callers should check for a {@code null}
     * result and for their own interrupt status.
     *
     * @return the head of the queue, or {@code null} if the wait was interrupted and the
     *     queue was still empty
     */
    public Request getRequest() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    // wait() clears the interrupt status when it throws; put it back so the
                    // caller can observe the interruption.
                    Thread.currentThread().interrupt();
                    // pollFirst() yields null on an empty list, unlike removeFirst(), which
                    // would throw NoSuchElementException here.
                    return queue.pollFirst();
                }
            }
            return queue.removeFirst();
        }
    }

    /**
     * Appends a request to the tail of the queue and wakes every thread blocked in
     * {@link #getRequest()}.
     *
     * @param request the request to enqueue
     */
    public void putRequest(Request request) {
        synchronized (queue) {
            queue.addLast(request);
            queue.notifyAll();
        }
    }
}
2. 实现队列元素(Request)
/** Immutable holder for a single string payload. */
public class Request {

    /** Payload supplied at construction time; never reassigned. */
    private final String values;

    /**
     * Creates a request wrapping the given payload.
     *
     * @param values payload text, stored as-is (no validation is performed)
     */
    public Request(String values) {
        this.values = values;
    }

    /**
     * Returns the payload.
     *
     * @return the exact value passed to the constructor
     */
    public String getValues() {
        return this.values;
    }
}
3. 实现 client 线程发送请求
/**
 * Producer thread that periodically puts {@link Request} objects on a shared
 * {@link RequestQueue}.
 */
public class ClientThread extends Thread {

    /** Number of requests sent by one run of the thread. */
    private static final int MESSAGE_COUNT = 10000;

    /** Pause between two consecutive sends, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 500;

    /** Queue the requests are put on. */
    private final RequestQueue queue;

    /** Prefix of every message; the loop index is appended to it. */
    private final String sendValue;

    /**
     * @param queue queue that receives the requests
     * @param sendValue prefix for each message payload
     */
    public ClientThread(RequestQueue queue, String sendValue) {
        this.queue = queue;
        this.sendValue = sendValue;
    }

    /**
     * Sends {@code sendValue + i} for each index in turn, logging each message and sleeping
     * between sends. Stops early if the thread is interrupted while sleeping.
     */
    @Override
    public void run() {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = sendValue + i;
            System.out.println("client send message -> " + message);
            queue.putRequest(new Request(message));
            try {
                Thread.sleep(SEND_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Treat interruption as a stop request: restore the flag that sleep()
                // cleared and end the loop instead of silently carrying on.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Demo entry point: starts one server thread and one client thread sharing a queue.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        RequestQueue queue = new RequestQueue();
        ServerThread serverThread = new ServerThread(queue);
        ClientThread clientThread = new ClientThread(queue, "send value");
        serverThread.start();
        clientThread.start();
    }
}
4. 实现server线程处理数据
/**
 * Consumer thread that takes {@link Request} objects from a shared {@link RequestQueue} and
 * prints their payload.
 */
public class ServerThread extends Thread {

    /** Pause after handling each request, in milliseconds. */
    private static final long PROCESSING_DELAY_MILLIS = 100;

    /** Queue the requests are taken from. */
    private final RequestQueue queue;

    /**
     * @param queue queue to consume requests from
     */
    public ServerThread(RequestQueue queue) {
        this.queue = queue;
    }

    /**
     * Repeatedly takes a request, prints it, and sleeps briefly. The loop ends once the
     * thread's interrupt status is set, so interrupting the thread is a way to stop it.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            Request request = queue.getRequest();
            if (request == null) {
                // Nothing to process; the loop condition decides whether to keep going.
                continue;
            }
            System.out.println("server consumer" + request.getValues());
            try {
                Thread.sleep(PROCESSING_DELAY_MILLIS);
            } catch (InterruptedException e) {
                // sleep() cleared the interrupt status; restore it so the loop condition
                // sees it and the thread terminates.
                Thread.currentThread().interrupt();
            }
        }
    }
}