生产者消费者模式示例

模式理解

生产者消费者模式并不是GOF提出的23种设计模式之一,23种设计模式都是建立在面向对象的基础之上的,但其实面向过程的编程中也有很多高效的编程模式,生产者消费者模式便是其中之一,它是我们编程过程中常用的一种设计模式。

在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。


使用 wait()、 notify() 实现生产者消费者模式


1. 两个方法只能运行在同步方法或者同步代码块中
2. 调用wait/notify方法前,必须获得调用对象的锁
3. 调用obj.wait(),当wait执行时,就会先执行释放obj锁
4. wait/notify是定义在Object内的方法
5. wait必须放在while循环中,因为wait调用后,线程会挂起,当notify唤醒线程后,线程会解除wait状态,重新去参与竞争对象锁,当得到锁后,会继续执行wait后面的代码,用while的话还会进行条件判断,避免【虚假唤醒】的出现,而if的话并不进行条件判断,从而有可能导致bug
6. wait/notify必须放在同步块中,如果不放在同步块中,由于cpu执行的随机性,有可能会出现我们预期之外的情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
public class ProducerConsumer {

private static int produceNumber = 0;
private static int consumeNumber = 0;

public static class Storage {
private static final int maxSize = 4;
final List<Object> productPool = new LinkedList<>();

public void product(Object obi) {
synchronized (productPool) {
while (productPool.size() > maxSize) {
try {
productPool.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
productPool.add(obi);
productPool.notify();
produceNumber++;
}
}

public Object consume() {
Object result;
synchronized (productPool) {
while (productPool.isEmpty()) {
try {
productPool.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
result = productPool.remove(0);
productPool.notify();
consumeNumber++;
}
return result;
}
}

public static class Producer extends Thread {
private Storage storage;

public Producer(Storage storage) {
this.storage = storage;
}

@Override
public void run() {
for (int i = 0; i < 4; i++) {
Object obj = new Object();
storage.product(obj);
System.out.println("Producer-" + Thread.currentThread().getName() + ": produce " + i);
}
}
}

public static class Consumer extends Thread {
private Storage storage;

public Consumer(Storage storage) {
this.storage = storage;
}

@Override
public void run() {
for (int i = 0; i < 6; i++) {
storage.consume();
System.out.println("Consumer-" + Thread.currentThread().getName() + ": consume " + i);
}
}
}

public static void main(String[] args) {
Storage storage = new Storage();

Producer producer1 = new Producer(storage);
Producer producer2 = new Producer(storage);
Producer producer3 = new Producer(storage);

Consumer consumer1 = new Consumer(storage);
Consumer consumer2 = new Consumer(storage);

producer1.start();
producer2.start();
producer3.start();


consumer1.start();
consumer2.start();

try {
producer1.join();
producer2.join();
producer3.join();
consumer1.join();
consumer2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("Finish" + " produce: " + produceNumber + " consume: " + consumeNumber);
}
}

结果输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Producer-Thread-2: produce 0
Consumer-Thread-3: consume 0
Producer-Thread-0: produce 0
Producer-Thread-0: produce 1
Producer-Thread-0: produce 2
Producer-Thread-1: produce 0
Producer-Thread-0: produce 3
Consumer-Thread-3: consume 1
Consumer-Thread-4: consume 0
Producer-Thread-2: produce 1
Consumer-Thread-4: consume 1
Consumer-Thread-3: consume 2
Producer-Thread-1: produce 1
Producer-Thread-1: produce 2
Consumer-Thread-3: consume 3
Consumer-Thread-4: consume 2
Producer-Thread-2: produce 2
Consumer-Thread-4: consume 3
Consumer-Thread-3: consume 4
Producer-Thread-1: produce 3
Consumer-Thread-3: consume 5
Consumer-Thread-4: consume 4
Consumer-Thread-4: consume 5
Producer-Thread-2: produce 3
Finish produce: 12 consume: 12

注:当我们调用某个线程的 join() 方法后,将会挂起调用线程,直到被调用线程执行完毕,调用线程才会继续执行。


使用 BlockingQueue 实现生产者消费者


阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞到有空间可用;如果队列为空,那么take方法将会阻塞到有元素可用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public class BlockQueueProducerConsumer {

private static int produceNumber = 0;
private static int consumeNumber = 0;

public static class Storage {
final BlockingQueue<Object> productPool = new LinkedBlockingQueue<>(4);

public void product(Object obi) {
try {
produceNumber++;
productPool.put(obi);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public Object consume() {
try {
consumeNumber++;
return productPool.take();
} catch (InterruptedException e) {
e.printStackTrace();
return null;
}
}
}

public static class Producer extends Thread {
private Storage storage;

public Producer(Storage storage) {
this.storage = storage;
}

@Override
public void run() {
for (int i = 0; i < 4; i++) {
Object obj = new Object();
storage.product(obj);
System.out.println("Producer-" + Thread.currentThread().getName() + ": produce " + i);
}
}
}

public static class Consumer extends Thread {
private Storage storage;

public Consumer(Storage storage) {
this.storage = storage;
}

@Override
public void run() {
for (int i = 0; i < 6; i++) {
storage.consume();
System.out.println("Consumer-" + Thread.currentThread().getName() + ": consume " + i);
}
}
}

public static void main(String[] args) {
Storage storage = new Storage();

Producer producer1 = new Producer(storage);
Producer producer2 = new Producer(storage);
Producer producer3 = new Producer(storage);

Consumer consumer1 = new Consumer(storage);
Consumer consumer2 = new Consumer(storage);

producer1.start();
producer2.start();
producer3.start();


consumer1.start();
consumer2.start();

try {
producer1.join();
producer2.join();
producer3.join();
consumer1.join();
consumer2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("Finish" + " produce: " + produceNumber + " consume: " + consumeNumber);
}
}

结果输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Consumer-Thread-3: consume 0
Producer-Thread-2: produce 0
Producer-Thread-1: produce 0
Producer-Thread-0: produce 0
Producer-Thread-1: produce 1
Producer-Thread-2: produce 1
Consumer-Thread-4: consume 0
Consumer-Thread-3: consume 1
Consumer-Thread-4: consume 1
Producer-Thread-1: produce 2
Producer-Thread-0: produce 1
Producer-Thread-1: produce 3
Consumer-Thread-4: consume 2
Producer-Thread-2: produce 2
Consumer-Thread-3: consume 2
Consumer-Thread-3: consume 3
Consumer-Thread-3: consume 4
Consumer-Thread-3: consume 5
Producer-Thread-2: produce 3
Consumer-Thread-4: consume 3
Consumer-Thread-4: consume 4
Producer-Thread-0: produce 2
Producer-Thread-0: produce 3
Consumer-Thread-4: consume 5
Finish produce: 12 consume: 12





文章目录
  1. 1. 模式理解
  2. 2. 使用 wait()、 notify() 实现生产者消费者模式
  3. 3. 使用 BlockingQueue 实现生产者消费者