TypechoJoeTheme

Toasobi的博客

生产者消费者4种实现

本文最后更新于2023年10月08日,已超过346天没有更新。如果文章内容或图片资源失效,请留言反馈,我会及时处理,谢谢!

synchronized之wait与notify实现

最基础的实现方式
public class producerCoustomer1 {
    private static Integer count = 0; //当前队列中有几个生产出的物品
    private static final Integer FULL = 10; //一共能放多少个生产出的物品

    private static String LOCK = "lock"; //全局锁

    /**
     * 定义生产者
     */
    class Producer implements Runnable {

        @Override
        public void run() { //线程执行方法
            for(int i = 0; i < 10; i++){
                try{
                    Thread.sleep(3000); //先睡个3s
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (LOCK){
                    while (count == FULL){
                        try {
                            LOCK.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    count ++;
                    System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count);
                    LOCK.notify();
                }
            }
        }
    }

    /**
     * 定义消费者
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (LOCK) {
                    while (count == 0) {
                        try {
                            LOCK.wait();
                        } catch (Exception e) {
                        }
                    }
                    count--;
                    System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count);
                    LOCK.notifyAll();
                }
            }
        }
    }

    public static void main(String[] args) {
        producerCoustomer1 pc = new producerCoustomer1();
        new Thread(pc.new Producer()).start();
        new Thread(pc.new Consumer()).start();
        new Thread(pc.new Producer()).start();
        new Thread(pc.new Consumer()).start();
        new Thread(pc.new Producer()).start();
        new Thread(pc.new Consumer()).start();
    }
}

打印出来的结果如下:
(每隔3秒出来6条数据)

可重入锁ReentrantLock的实现

这个案例没有用到重入锁的多次获取锁计数器加一的特性
ReentrantLock比synchronized安全,而要像synchronized一样实现wait和notify,则需要使用Condition对象
private static Integer count = 0;
private static final Integer FULL = 10;
//创建一个锁对象
private Lock lock = new ReentrantLock();
//创建两个条件变量,一个为缓冲区非满,一个为缓冲区非空
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();

class Producer implements Runnable {

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000); //先睡个3s
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                //获取锁
                lock.lock();
                try {
                    while (count == FULL) {
                        try {
                            notFull.await(); //阻塞
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    count++;
                    System.out.println(Thread.currentThread().getName() + "生产者生产,目前共有" + count);
                    //唤醒消费者
                    notEmpty.signal();
                } finally {
                    lock.unlock();
                }

            }
        }
    }

    /**
     * 定义消费者
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                lock.lock();
                try {
                    while (count == 0) {
                        try {
                            notEmpty.await(); //阻塞
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    count--;
                    System.out.println(Thread.currentThread().getName()
                            + "消费者消费,目前总共有" + count);
                    //唤醒生产者
                    notFull.signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    public static void main(String[] args) {
        producerCoustomer2 pc = new producerCoustomer2();
        new Thread(pc.new Producer()).start();
        new Thread(pc.new Consumer()).start();
        new Thread(pc.new Producer()).start();
        new Thread(pc.new Consumer()).start();
        new Thread(pc.new Producer()).start();
        new Thread(pc.new Consumer()).start();
    }
}

执行结果如下:

阻塞队列BlockingQueue的实现

BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如>下两种:

  • 当队列满了的时候进行入队列操作
  • 当队列空了的时候进行出队列操作
    因此,当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线程进行了出队操作,当一个线程对一个空的阻
    塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作。
    从上可知,阻塞队列是线程安全的

此外,对于入队出队操作,使用BlockingQueue种不一样的方法失败时会得到不一样处理结果,可以自己去查阅资料

public class producerCustomer3 {
    private static volatile Integer count = 0;
    //创建一个阻塞队列
    final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10); //限制容量为10


    class Producer implements Runnable{

        @Override
        public void run() {
            for(int i = 0; i < 10; i++){
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    blockingQueue.put(1);
                    synchronized (count) {
                        count++;
                    }
                    System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                try {
                    blockingQueue.take();
                    synchronized (count) {
                        count--;
                    }
                    System.out.println(Thread.currentThread().getName()
                            + "消费者消费,目前总共有" + count);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        producerCustomer3 pc = new producerCustomer3();
        new Thread(pc.new Producer()).start();
        new Thread(pc.new Consumer()).start();
        new Thread(pc.new Producer()).start();
        new Thread(pc.new Consumer()).start();
        new Thread(pc.new Producer()).start();
        new Thread(pc.new Consumer()).start();
    }
}

对于该阻塞队列的实现方式,有两点需要注意:

  1. 阻塞队列并没有给生产消费方法加锁,所以对于共享变量count需要用volatile处理,并在修改的时候加上synchronized
    2.阻塞队列方式由于没有加锁,所以打印出来的数据并不是按照线程执行顺序打印的,但是本质上没有问题

结果如下:

信号量实现

这个方法我还多加了一个isProducerTurn用来展示生产消费不能连续的情况
public class producerCustomer4 {
    private static Integer count = 0;
    //创建3个信号量
    final Semaphore notFull = new Semaphore(10);
    final Semaphore notEmpty = new Semaphore(0);
    final Semaphore mutex = new Semaphore(1); //加入一个mutex信号量,维护生产者消费者之间的同步关系,保证二者交替运行

    volatile boolean isProducerTurn = true;

    class Producer implements Runnable {

        @Override
        public void run() {
            for(int i = 0; i < 10; i++){
                try{
                    Thread.sleep(3000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                try {
                    notFull.acquire();
                    mutex.acquire();
                    while(!isProducerTurn){
                        mutex.release(); // 如果不是生产者轮次,释放锁并等待
                        notFull.release();
                        Thread.sleep(100);
                        notFull.acquire();
                        mutex.acquire();
                    }
                    count++;
                    System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count);
                    isProducerTurn = false; // 设置为消费者轮次
                }catch (InterruptedException e){
                    e.printStackTrace();
                }finally{
                    mutex.release();
                    notEmpty.release(); //可以换消费者进来消费
                }

            }
        }
    }

    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                try {
                    notEmpty.acquire();
                    mutex.acquire();
                    while (isProducerTurn){
                        mutex.release(); // 如果不是生产者轮次,释放锁并等待
                        notEmpty.release();
                        Thread.sleep(100);
                        notEmpty.acquire();
                        mutex.acquire();
                    }
                    count--;
                    System.out.println(Thread.currentThread().getName()
                            + "消费者消费,目前总共有" + count);
                    isProducerTurn = true; // 设置为生产者轮次
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    mutex.release();
                    notFull.release(); //消费完了,可以换生产者来生产
                }
            }
        }
    }

    public static void main(String[] args) {
        producerCustomer4 pc = new producerCustomer4();
        new Thread(pc.new Producer()).start();
        new Thread(pc.new Consumer()).start();
        new Thread(pc.new Producer()).start();
        new Thread(pc.new Consumer()).start();
        new Thread(pc.new Producer()).start();
        new Thread(pc.new Consumer()).start();
    }
}

执行结果如下:

朗读
赞(0)
评论 (0)