模式理解
生产者消费者模式并不是GOF提出的23种设计模式之一,23种设计模式都是建立在面向对象的基础之上的,但其实面向过程的编程中也有很多高效的编程模式,生产者消费者模式便是其中之一,它是我们编程过程中常用的一种设计模式。
在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。
使用 wait()、 notify() 实现生产者消费者模式
1. 两个方法只能运行在同步方法或者同步代码块中
2. 调用wait/notify方法前,必须获得调用对象的锁
3. 调用obj.wait(),当wait执行时,就会先执行释放obj锁
4. wait/notify是定义在Object内的方法
5. wait必须放在while循环中,因为wait调用后,线程会挂起,当notify唤醒线程后,线程会解除wait状态,重新去参与竞争对象锁,当得到锁后,会继续执行wait后面的代码,用while的话还会进行条件判断,避免【虚假唤醒】的出现,而if的话并不进行条件判断,从而有可能导致bug
6. wait/notify必须放在同步块中,如果不放在同步块中,由于cpu执行的随机性,有可能会出现我们预期之外的情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
| public class ProducerConsumer {
private static int produceNumber = 0; private static int consumeNumber = 0;
public static class Storage { private static final int maxSize = 4; final List<Object> productPool = new LinkedList<>();
public void product(Object obi) { synchronized (productPool) { while (productPool.size() > maxSize) { try { productPool.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } productPool.add(obi); productPool.notify(); produceNumber++; } }
public Object consume() { Object result; synchronized (productPool) { while (productPool.isEmpty()) { try { productPool.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } result = productPool.remove(0); productPool.notify(); consumeNumber++; } return result; } }
public static class Producer extends Thread { private Storage storage;
public Producer(Storage storage) { this.storage = storage; }
@Override public void run() { for (int i = 0; i < 4; i++) { Object obj = new Object(); storage.product(obj); System.out.println("Producer-" + Thread.currentThread().getName() + ": produce " + i); } } }
public static class Consumer extends Thread { private Storage storage;
public Consumer(Storage storage) { this.storage = storage; }
@Override public void run() { for (int i = 0; i < 6; i++) { storage.consume(); System.out.println("Consumer-" + Thread.currentThread().getName() + ": consume " + i); } } }
public static void main(String[] args) { Storage storage = new Storage();
Producer producer1 = new Producer(storage); Producer producer2 = new Producer(storage); Producer producer3 = new Producer(storage);
Consumer consumer1 = new Consumer(storage); Consumer consumer2 = new Consumer(storage);
producer1.start(); producer2.start(); producer3.start();
consumer1.start(); consumer2.start();
try { producer1.join(); producer2.join(); producer3.join(); consumer1.join(); consumer2.join(); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("Finish" + " produce: " + produceNumber + " consume: " + consumeNumber); } }
|
结果输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| Producer-Thread-2: produce 0 Consumer-Thread-3: consume 0 Producer-Thread-0: produce 0 Producer-Thread-0: produce 1 Producer-Thread-0: produce 2 Producer-Thread-1: produce 0 Producer-Thread-0: produce 3 Consumer-Thread-3: consume 1 Consumer-Thread-4: consume 0 Producer-Thread-2: produce 1 Consumer-Thread-4: consume 1 Consumer-Thread-3: consume 2 Producer-Thread-1: produce 1 Producer-Thread-1: produce 2 Consumer-Thread-3: consume 3 Consumer-Thread-4: consume 2 Producer-Thread-2: produce 2 Consumer-Thread-4: consume 3 Consumer-Thread-3: consume 4 Producer-Thread-1: produce 3 Consumer-Thread-3: consume 5 Consumer-Thread-4: consume 4 Consumer-Thread-4: consume 5 Producer-Thread-2: produce 3 Finish produce: 12 consume: 12
|
注:当我们调用某个线程的 join() 方法后,将会挂起调用线程,直到被调用线程执行完毕,调用线程才会继续执行。
使用 BlockingQueue 实现生产者消费者
阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞到有空间可用;如果队列为空,那么take方法将会阻塞到有元素可用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| public class BlockQueueProducerConsumer {
private static int produceNumber = 0; private static int consumeNumber = 0;
public static class Storage { final BlockingQueue<Object> productPool = new LinkedBlockingQueue<>(4);
public void product(Object obi) { try { produceNumber++; productPool.put(obi); } catch (InterruptedException e) { e.printStackTrace(); } }
public Object consume() { try { consumeNumber++; return productPool.take(); } catch (InterruptedException e) { e.printStackTrace(); return null; } } }
public static class Producer extends Thread { private Storage storage;
public Producer(Storage storage) { this.storage = storage; }
@Override public void run() { for (int i = 0; i < 4; i++) { Object obj = new Object(); storage.product(obj); System.out.println("Producer-" + Thread.currentThread().getName() + ": produce " + i); } } }
public static class Consumer extends Thread { private Storage storage;
public Consumer(Storage storage) { this.storage = storage; }
@Override public void run() { for (int i = 0; i < 6; i++) { storage.consume(); System.out.println("Consumer-" + Thread.currentThread().getName() + ": consume " + i); } } }
public static void main(String[] args) { Storage storage = new Storage();
Producer producer1 = new Producer(storage); Producer producer2 = new Producer(storage); Producer producer3 = new Producer(storage);
Consumer consumer1 = new Consumer(storage); Consumer consumer2 = new Consumer(storage);
producer1.start(); producer2.start(); producer3.start();
consumer1.start(); consumer2.start();
try { producer1.join(); producer2.join(); producer3.join(); consumer1.join(); consumer2.join(); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("Finish" + " produce: " + produceNumber + " consume: " + consumeNumber); } }
|
结果输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| Consumer-Thread-3: consume 0 Producer-Thread-2: produce 0 Producer-Thread-1: produce 0 Producer-Thread-0: produce 0 Producer-Thread-1: produce 1 Producer-Thread-2: produce 1 Consumer-Thread-4: consume 0 Consumer-Thread-3: consume 1 Consumer-Thread-4: consume 1 Producer-Thread-1: produce 2 Producer-Thread-0: produce 1 Producer-Thread-1: produce 3 Consumer-Thread-4: consume 2 Producer-Thread-2: produce 2 Consumer-Thread-3: consume 2 Consumer-Thread-3: consume 3 Consumer-Thread-3: consume 4 Consumer-Thread-3: consume 5 Producer-Thread-2: produce 3 Consumer-Thread-4: consume 3 Consumer-Thread-4: consume 4 Producer-Thread-0: produce 2 Producer-Thread-0: produce 3 Consumer-Thread-4: consume 5 Finish produce: 12 consume: 12
|