JAVA语言经典并发问题:生产者-消费者
小标 2018-07-19 来源 : 阅读 971 评论 0

摘要:本文主要向大家介绍了JAVA语言经典并发问题:生产者-消费者,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。

本文主要向大家介绍了JAVA语言经典并发问题:生产者-消费者,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。

问题描述

生产者-消费者(Producer-Consumer Problem)以下简称为PC问题。其描述以下问题:

· 多个生产者生产产品,多个消费者消费产品,两者间有一个大小固定的缓冲区;

· 生产者、消费者不可同时访问缓冲区;

· 生产者不可向满缓冲区放产品;

· 消费者不可从空缓冲区取产品。

信号量解法

public class PCSemaphore {

    private final static int BUFFER_SIZE = 10;

 

    public static void main(String[] args) {

        Semaphore mutex = new Semaphore(1);

        Semaphore full = new Semaphore(0);

        Semaphore empty = new Semaphore(BUFFER_SIZE);

 

        Queue<Integer> buffer = new LinkedList<>();

 

        Producer producer = new Producer(mutex, empty, full, buffer);

        Consumer consumer = new Consumer(mutex, empty, full, buffer);

 

        // 可以初始化多个生产者、消费者

        new Thread(producer, "p1").start();

        new Thread(producer, "p2").start();

        new Thread(consumer, "c1").start();

        new Thread(consumer, "c2").start();

        new Thread(consumer, "c3").start();

    }

}

class Producer implements Runnable {

    private Semaphore mutex, empty, full;

    private Queue<Integer> buffer;

    private Integer counter = 0;

 

    public Producer(Semaphore mutex, Semaphore empty, Semaphore full, Queue<Integer> buffer) {

        this.mutex = mutex;

        this.empty = empty;

        this.full = full;

        this.buffer = buffer;

    }

 

    @Override

    public void run() {

        while (true) {

            try {

                empty.acquire();

                mutex.acquire();

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

 

            buffer.offer(counter++);

            

            try {

                Thread.sleep(100);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

 

            full.release();

            mutex.release();

        }

    }

}

class Consumer implements Runnable {

    private Semaphore mutex, empty, full;

    private Queue<Integer> buffer;

 

    public Consumer(Semaphore mutex, Semaphore empty, Semaphore full, Queue<Integer> buffer) {

        this.mutex = mutex;

        this.empty = empty;

        this.full = full;

        this.buffer = buffer;

    }

 

    @Override

    public void run() {

        String threadName = Thread.currentThread().getName();

        

        while (true) {

            try {

                full.acquire();

                mutex.acquire();

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

 

            Integer product = buffer.poll();

            int left = buffer.size();

            System.out.printf("%s consumed %d left %d%n", threadName, product, left);

            

            try {

                Thread.sleep(200);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

 

            empty.release();

            mutex.release();

        }

    }

}

教科书的解法,问题有三:

1. 

不可交换消费者中的full和mutex信号量的请求顺序,若我们交换他们则会有:

同样地不可交换生产者中的empty和mutex信号量的请求顺序。
即必须遵守先资源信号量,再互斥信号量的请求顺序。
教科书中使用AND型信号量解决该问题,但这对程序员来说算是一个Dirty Solution,为什么呢,见问题2。

2. 

将进程控制部分和业务逻辑放在一起,这种代码看着混乱不堪。

3. 

效率低,Java中是不会用信号量来实现这东西,信号量在Java中常用来限制对一个共享资源的最大并发访问数。

注意,因为这里写的写的是一个小示例,所以我们没有考虑产品的生产、使用和入队、出队(放入、拿出缓存区的过程)。但实际中我们只需要将产品的入队和出队进行互斥即可,产品的生产和使用可并发执行,甚至在每个生产者、消费者内部可建立单独的缓冲区,暂存生产出来但还不能放到公共缓冲区的产品,直到可以放入公共缓冲区。

wait() & notify()

public class PCWaitNotify {

    public final static int BUFFER_SIZE = 10;

 

    public static void main(String[] args) {

        Object mutex = new Object();

        AtomicInteger counter = new AtomicInteger(0);

        Queue<Integer> buffer = new LinkedList<>();

 

        Producer producer = new Producer(buffer, counter, mutex);

        Consumer consumer = new Consumer(buffer, mutex);

 

        new Thread(producer).start();

        new Thread(producer).start();

        new Thread(consumer).start();

    }

}

class Producer implements Runnable {

    private Random rand = new Random();

    private Queue<Integer> buffer;

    private AtomicInteger counter; // 支持原子操作的基本类型包装类

    private Object mutex;

 

    public Producer(Queue<Integer> buffer, AtomicInteger counter, Object mutex) {

        this.buffer = buffer;

        this.counter = counter;

        this.mutex = mutex;

    }

 

    @Override

    public void run() {

        while (true) {

            synchronized (mutex) {

                while (buffer.size() == PCWaitNotify.BUFFER_SIZE) {

                    try {

                        mutex.wait();

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                }

 

                buffer.offer(counter.incrementAndGet());

                mutex.notify();

            }

            

            try {

                Thread.sleep(rand.nextInt(800));

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

    }

}

class Consumer implements Runnable {

    private Random rand = new Random();

    private Queue<Integer> buffer;

    private Object mutex;

 

    public Consumer(Queue<Integer> buffer, Object mutex) {

        this.buffer = buffer;

        this.mutex = mutex;

    }

 

    @Override

    public void run() {

        while (true) {

            synchronized (mutex) {

                while (buffer.size() == 0) {

                    try {

                        mutex.wait();

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                }

                

                System.out.println("consumed " + buffer.poll() + " left " + buffer.size());

                mutex.notify();

            }

            try {

                Thread.sleep(rand.nextInt(500));

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

    }

}

以Java的底层并发API,wait()和notify()实现,效率虽高,但进程控制部分和业务逻辑同样混在一起,没有完全解决问题。

BlockingQueue

public class PCBlockingQueue {

    private final static int BUFFER_SIZE = 10;

 

    public static void main(String[] args) {

        BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>(BUFFER_SIZE);

        AtomicInteger counter = new AtomicInteger(0);

        

        Producer producer = new Producer(buffer, counter);

        Consumer consumer = new Consumer(buffer);

        

        new Thread(producer).start();

        new Thread(producer).start();

        new Thread(consumer).start();

    }

}

class Producer implements Runnable {

    private Random rand = new Random();

    private AtomicInteger counter;

    private BlockingQueue<Integer> buffer;

 

    public Producer(BlockingQueue<Integer> buffer, AtomicInteger counter) {

        this.buffer = buffer;

        this.counter = counter;

    }

 

    @Override

    public void run() {

        while (true) {

            try {

                Thread.sleep(rand.nextInt(800));

                Integer product = counter.incrementAndGet();

                buffer.put(product);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

    }

}

class Consumer implements Runnable {

    private Random rand = new Random();

    private BlockingQueue<Integer> buffer;

    

    public Consumer(BlockingQueue<Integer> buffer) {

        this.buffer = buffer;

    }

 

    @Override

    public void run() {

        while (true) {

            try {

                Thread.sleep(rand.nextInt(600));

                Integer product = buffer.take(); // 队列空时,会阻塞,直到有新元素,并将新元素返回。

                System.out.println("consumed " + product + " left " + buffer.size());

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

    }

}

你觉得像Java这样的企业级语言会不考虑到代码的功能分离问题吗(SRP原则)?Java中早就提供了一堆线程安全的数据结构,这里用了BlockingQueue,其他线程安全类参考java.util.concurrent包。

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注编程语言JAVA频道!


本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 1 不喜欢 | 0
看完这篇文章有何感觉?已经有1人表态,100%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程