Toasobi
生产者消费者4种实现
本文最后更新于2023年10月08日,已超过454天没有更新。如果文章内容或图片资源失效,请留言反馈,我会及时处理,谢谢!
synchronized之wait与notify实现
最基础的实现方式
public class producerCoustomer1 {
private static Integer count = 0; //当前队列中有几个生产出的物品
private static final Integer FULL = 10; //一共能放多少个生产出的物品
private static String LOCK = "lock"; //全局锁
/**
* 定义生产者
*/
class Producer implements Runnable {
@Override
public void run() { //线程执行方法
for(int i = 0; i < 10; i++){
try{
Thread.sleep(3000); //先睡个3s
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (LOCK){
while (count == FULL){
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
count ++;
System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count);
LOCK.notify();
}
}
}
}
/**
* 定义消费者
*/
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (LOCK) {
while (count == 0) {
try {
LOCK.wait();
} catch (Exception e) {
}
}
count--;
System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count);
LOCK.notifyAll();
}
}
}
}
public static void main(String[] args) {
producerCoustomer1 pc = new producerCoustomer1();
new Thread(pc.new Producer()).start();
new Thread(pc.new Consumer()).start();
new Thread(pc.new Producer()).start();
new Thread(pc.new Consumer()).start();
new Thread(pc.new Producer()).start();
new Thread(pc.new Consumer()).start();
}
}
打印出来的结果如下:
(每隔3秒出来6条数据)
可重入锁ReentrantLock的实现
这个案例没有用到重入锁的多次获取锁计数器加一的特性
ReentrantLock比synchronized安全,而要像synchronized一样实现wait和notify,则需要使用Condition对象
private static Integer count = 0;
private static final Integer FULL = 10;
//创建一个锁对象
private Lock lock = new ReentrantLock();
//创建两个条件变量,一个为缓冲区非满,一个为缓冲区非空
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(3000); //先睡个3s
} catch (InterruptedException e) {
e.printStackTrace();
}
//获取锁
lock.lock();
try {
while (count == FULL) {
try {
notFull.await(); //阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
}
count++;
System.out.println(Thread.currentThread().getName() + "生产者生产,目前共有" + count);
//唤醒消费者
notEmpty.signal();
} finally {
lock.unlock();
}
}
}
}
/**
* 定义消费者
*/
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
lock.lock();
try {
while (count == 0) {
try {
notEmpty.await(); //阻塞
} catch (Exception e) {
e.printStackTrace();
}
}
count--;
System.out.println(Thread.currentThread().getName()
+ "消费者消费,目前总共有" + count);
//唤醒生产者
notFull.signal();
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
producerCoustomer2 pc = new producerCoustomer2();
new Thread(pc.new Producer()).start();
new Thread(pc.new Consumer()).start();
new Thread(pc.new Producer()).start();
new Thread(pc.new Consumer()).start();
new Thread(pc.new Producer()).start();
new Thread(pc.new Consumer()).start();
}
}
执行结果如下:
阻塞队列BlockingQueue的实现
BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如>下两种:
- 当队列满了的时候进行入队列操作
- 当队列空了的时候进行出队列操作
因此,当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线程进行了出队操作,当一个线程对一个空的阻
塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作。
从上可知,阻塞队列是线程安全的此外,对于入队出队操作,使用BlockingQueue种不一样的方法失败时会得到不一样处理结果,可以自己去查阅资料
public class producerCustomer3 {
private static volatile Integer count = 0;
//创建一个阻塞队列
final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10); //限制容量为10
class Producer implements Runnable{
@Override
public void run() {
for(int i = 0; i < 10; i++){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
blockingQueue.put(1);
synchronized (count) {
count++;
}
System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
blockingQueue.take();
synchronized (count) {
count--;
}
System.out.println(Thread.currentThread().getName()
+ "消费者消费,目前总共有" + count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
producerCustomer3 pc = new producerCustomer3();
new Thread(pc.new Producer()).start();
new Thread(pc.new Consumer()).start();
new Thread(pc.new Producer()).start();
new Thread(pc.new Consumer()).start();
new Thread(pc.new Producer()).start();
new Thread(pc.new Consumer()).start();
}
}
对于该阻塞队列的实现方式,有两点需要注意:
- 阻塞队列并没有给生产消费方法加锁,所以对于共享变量count需要用volatile处理,并在修改的时候加上synchronized
2.阻塞队列方式由于没有加锁,所以打印出来的数据并不是按照线程执行顺序打印的,但是本质上没有问题
结果如下:
信号量实现
这个方法我还多加了一个isProducerTurn用来展示生产消费不能连续的情况
public class producerCustomer4 {
private static Integer count = 0;
//创建3个信号量
final Semaphore notFull = new Semaphore(10);
final Semaphore notEmpty = new Semaphore(0);
final Semaphore mutex = new Semaphore(1); //加入一个mutex信号量,维护生产者消费者之间的同步关系,保证二者交替运行
volatile boolean isProducerTurn = true;
class Producer implements Runnable {
@Override
public void run() {
for(int i = 0; i < 10; i++){
try{
Thread.sleep(3000);
}catch (InterruptedException e){
e.printStackTrace();
}
try {
notFull.acquire();
mutex.acquire();
while(!isProducerTurn){
mutex.release(); // 如果不是生产者轮次,释放锁并等待
notFull.release();
Thread.sleep(100);
notFull.acquire();
mutex.acquire();
}
count++;
System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count);
isProducerTurn = false; // 设置为消费者轮次
}catch (InterruptedException e){
e.printStackTrace();
}finally{
mutex.release();
notEmpty.release(); //可以换消费者进来消费
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
notEmpty.acquire();
mutex.acquire();
while (isProducerTurn){
mutex.release(); // 如果不是生产者轮次,释放锁并等待
notEmpty.release();
Thread.sleep(100);
notEmpty.acquire();
mutex.acquire();
}
count--;
System.out.println(Thread.currentThread().getName()
+ "消费者消费,目前总共有" + count);
isProducerTurn = true; // 设置为生产者轮次
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mutex.release();
notFull.release(); //消费完了,可以换生产者来生产
}
}
}
}
public static void main(String[] args) {
producerCustomer4 pc = new producerCustomer4();
new Thread(pc.new Producer()).start();
new Thread(pc.new Consumer()).start();
new Thread(pc.new Producer()).start();
new Thread(pc.new Consumer()).start();
new Thread(pc.new Producer()).start();
new Thread(pc.new Consumer()).start();
}
}
执行结果如下: