1. synchronized实现生产者消费者消息队列
Message类只有
get()
无
set()
,并且加上了
fianl
,只能读取不能修改,确保了线程安全
/**
* 消息类--商品
* 无Setter()线程安全
*/
final class Message {
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
@Override
public String toString() {
return "Message{ " +
"id=" + id +
", message=" + message +
'}';
}
}
//线程间通信的消息队列
class MessageQueue {
//存放消息的队列
private LinkedList<Message> list = new LinkedList<Message> ();
//容量
private int capacity;
//初始化容器
public MessageQueue(int capacity) {
this.capacity = capacity;
}
//存入消息
public void put(Message message) {
synchronized (list) {
while (list.size() == capacity) {
try {
System.err.println("对队列为满,<生产者>线程等待...");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//尾部添加消息
list.addLast(message);
System.out.println("生产消息!" + message);
//唤醒等待的消费者
list.notifyAll();
}
}
//获取消息
public Message take() {
//检查队列是否为空
synchronized (list) {
while(list.isEmpty()) {
try {
System.err.println("对队列为空,<消费者>线程等待...");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//从对头取出消息并返回
Message msg = list.removeFirst();
System.out.println("消费消息" + msg);
//通着生产者生产
list.notifyAll();
return msg;
}
}
}
//测试类
public class _01ProduceConsumer_Sync {
public static void main(String[] args) {
MessageQueue msque = new MessageQueue(2);
//生产者
for (int i = 0; i < 3; i++) {
//Lambda中引用的外部局部变量必须为final,再定义一个局部变量
int id = i;
new Thread(() -> {
msque.put(new Message(id, "值" + id));
}, "生产者" + i).start();
}
new Thread(() -> {
while(true) {
try {
TimeUnit.SECONDS.sleep(1);
msque.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者").start();
}
}
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIyVGduV2YfNWawNCM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPR5UMrpWT6lkeNBDOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL5ITNyMDOzIjMxIDOwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
2. Lock实现
final class Message {
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
@Override
public String toString() {
return "Message{ " +
"id=" + id +
", message=" + message +
'}';
}
}
//线程间通信的消息队列
class MessageQueue {
//存放消息的队列,安全集合ConcurrentLinkedQueue
private LinkedList<Message> list = new LinkedList<Message> ();
//容量
private int capacity;
//初始化容器
public MessageQueue(int capacity) {
this.capacity = capacity;
}
Lock lock = new ReentrantLock();
Condition produce = lock.newCondition();
Condition consume = lock.newCondition();
//存入消息
public void put(Message message) {
try {
lock.lock();
while (list.size() == capacity) {
System.err.println("对队列为满,<生产者>线程等待...");
produce.await();
}
//尾部添加消息
list.addLast(message);
System.out.println("生产消息!" + message);
//唤醒等待的消费者
consume.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
//获取消息
public Message take() {
Message msg = null;
//检查队列是否为空
try {
lock.lock();
while(list.isEmpty()) {
System.err.println("对队列为空,<消费者>线程等待...");
consume.await();
}
//从对头取出消息并返回
msg = list.removeFirst();
System.out.println("消费消息" + msg);
//通着生产者生产
produce.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return msg;
}
}
//测试类
public class _02ProduceConsumer_Lock {
public static void main(String[] args) {
MessageQueue msque = new MessageQueue(2);
//生产者
for (int i = 0; i < 3; i++) {
//Lambda中引用的外部局部变量必须为final,再定义一个局部变量
int id = i;
new Thread(() -> {
msque.put(new Message(id, "值" + id));
}, "生产者" + i).start();
}
new Thread(() -> {
while(true) {
try {
TimeUnit.SECONDS.sleep(1);
msque.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者").start();
}
}
3. BlockingQueue
public class _03BlockingQueue {
private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(100);
Runnable producer = () -> {
try {
queue.put("数据");
System.out.println("生产数据...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
};
Runnable consumer = () -> {
try {
String product = queue.take();
System.out.println("消费" + product + "!!!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
};
IntStream.rangeClosed(1, 50).forEach(i -> {
CompletableFuture.runAsync(producer);
CompletableFuture.runAsync(consumer);
});
countDownLatch.await();
}
}