谈一谈并发队列的实现

简介

本文将谈一谈并发队列,讲解集中可手写的并发队列的实现方式,以及介绍几种开源并发队列的实现,当然都是c++版本的。
可手撸的并发队列实现起来相对简单,面试的时候可以撸一撸,简单生产环境也可以用一用;
当然,实际生产环境中,还是建议直接用高性能的开源实现。
可手撸的版本包括,单锁队列、双锁队列、原子队列;开源并发队列包括boost中的并发队列、tbb中的并发队列、folly中的并发队列、moodycamel中的并发队列。

单锁队列实现

单锁队列是一种实现简单的并发队列,它通过一个锁控制入队和出队,通过两个条件变量分别控制队列空和队列满。
数据入队之前判断队列是否满,如果满了,则等待full条件变量;出队前判断队列是否为空,如果为空,则等待empty条件变量。当队列满时出队则唤醒full条件变量;当队列为空并入队时,唤醒empty条件变量。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#pragma once

#include <queue>
#include <vector>
#include <mutex>
#include <condition_variable>

class BlockedQueue {
public:
explicit BlockedQueue(size_t capacity) : capacity_(capacity) {}

bool TryPush(int value) {
std::unique_lock lock(mutex_);
if (data_.size() >= capacity_) {
return false;
}
Enqueue(value);
return true;
}

bool TryPop(int *value) {
if (value == nullptr) {
return false;
}
std::unique_lock lock(mutex_);
if (!data_.empty()) {
return false;
}
Dequeue(value);
return true;
}

void Push(int value) {
std::unique_lock lock(mutex_);
if (data_.size() >= capacity_) {
full_.wait(lock);
}
Enqueue(value);
}

void Pop(int *value) {
if (value == nullptr) {
return;
}
std::unique_lock lock(mutex_);
if (data_.empty()) {
empty_.wait(lock);
}
Dequeue(value);
}
private:
void Enqueue(int value) {
data_.push(value);
if (data_.size() == 1) {
empty_.notify_one();
}
}

void Dequeue(int* value) {
*value = data_.front();
data_.pop();
if (data_.size() == capacity_ - 1) {
full_.notify_one();
}
}

private:
std::mutex mutex_;
std::condition_variable full_;
std::condition_variable empty_;
std::queue<int> data_;
const size_t capacity_;
};

上面的实现可以用单链表代替std::queue。
这种结构实现简单,临界区较小,可以在请求维度的数据中使用。

双锁队列实现

双锁队列是另一种并发队列,使用两把锁分别控制入队和出队,性能相较于单锁的稍微强点。
双锁队列还包括两个条件变量,分别用于表示队列满和队列空;同时还拥有一个原子变量记录当前队列长度。
入队列时,先等待“非满”条件变量,然后入队列,如果队列没满,则唤醒“非满”变量。如果入队前队列是空的,入队后就要唤醒“非空”变量。
出队时,先等待“非空”条件变量,然后出队,如果队列还有数据,则唤醒“非空”变量。如果出队之前队列时非空的,则出队后唤醒“非满”变量。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#pragma once

#include <atomic>
#include <mutex>
#include <condition_variable>

struct Node {
int val;
Node* next{nullptr};
explicit Node(int value = 0): val(value) { }
};

class LinkedBlockQueue {
public:
LinkedBlockQueue(size_t size): capacity_(size) {
head_ = new Node;
tail_ = head_;
}

bool TryPush(int value) {
if (count_.load() >= capacity_) {
return false;
}
std::unique_lock lock(push_mutex_);
Enqueue(value);
auto current = count_.fetch_add(1);
if (current + 1 < capacity_) {
not_full_.notify_one();
}
if (current == 0) {
NotifyNotEmpty();
}

return true;
}

bool TryPop(int* value) {
if (value == nullptr) {
return false;
}
if (count_.load() == 0) {
return false;
}
std::unique_lock lock(pop_mutex_);
Dequeue(value);
auto current = count_.fetch_sub(1);
if (current - 1 == 0) {
not_empty_.notify_one();
}
if (current == capacity_) {
NotifyNotFull();
}

return true;
}

void Push(int value) {
size_t current = 0;
{
std::unique_lock lock(push_mutex_);
not_full_.wait(lock, [&](){ return count_.load() < capacity_;});
Enqueue(value);
current = count_.fetch_add(1);
if (current - 1 < capacity_) {
not_full_.notify_one();
}
}
if (current == 0) {
NotifyNotEmpty();
}
}

void Pop(int* value) {
if (value == nullptr) {
return;
}
size_t current = 0;
{
std::unique_lock lock(pop_mutex_);
not_empty_.wait(lock, [&](){ return count_.load() > 0; });
Dequeue(value);
current = count_.fetch_sub(1);
if (current > 1) {
not_empty_.notify_one();
}
}
if (current == capacity_) {
NotifyNotFull();
}
}

private:
void Enqueue(int value) {
tail_->next = new Node(value);
tail_ = tail_->next;
}

void Dequeue(int* value) {
auto* node = head_->next;
*value = node->val;
head_->next = node->next;
delete node;
}

void NotifyNotFull() {
std::unique_lock lock(push_mutex_);
not_full_.notify_one();
}

void NotifyNotEmpty() {
std::unique_lock lock(pop_mutex_);
not_empty_.notify_one();
}

private:
Node* head_;
Node* tail_;
std::mutex push_mutex_;
std::mutex pop_mutex_;
std::condition_variable not_full_;
std::condition_variable not_empty_;
const size_t capacity_;
std::atomic_size_t count_{0};
};

代码稍微复杂一点,不过还是可以手工实现的。

原子队列实现

boost中并发队列实现

tbb中并发队列实现

folly中并发队列实现

moodycamel中并发队列实现

性能比较