谈一谈并发队列的实现

简介

本文将谈一谈并发队列,讲解集中可手写的并发队列的实现方式,以及介绍几种开源并发队列的实现,当然都是c++版本的。
可手撸的并发队列实现起来相对简单,面试的时候可以撸一撸,简单生产环境也可以用一用;
当然,实际生产环境中,还是建议直接用高性能的开源实现。
可手撸的版本包括,单锁队列、双锁队列、原子队列;开源并发队列包括boost中的并发队列、tbb中的并发队列、folly中的并发队列、moodycamel中的并发队列。

单锁队列实现

单锁队列是一种实现简单的并发队列,它通过一个锁控制入队和出队,通过两个条件变量分别控制队列空和队列满。
数据入队之前判断队列是否满,如果满了,则等待full条件变量;出队前判断队列是否为空,如果为空,则等待empty条件变量。当队列满时出队则唤醒full条件变量;当队列为空并入队时,唤醒empty条件变量。

代码如下:

#pragma once

#include <queue>
#include <vector>
#include <mutex>
#include <condition_variable>

class BlockedQueue {
 public:
  explicit BlockedQueue(size_t capacity) : capacity_(capacity) {}

  bool TryPush(int value) {
    std::unique_lock lock(mutex_);
    if (data_.size() >= capacity_) {
      return false;
    }
    Enqueue(value);
    return true;
  }

  bool TryPop(int *value) {
    if (value == nullptr) {
      return false;
    }
    std::unique_lock lock(mutex_);
    if (!data_.empty()) {
      return false;
    }
    Dequeue(value);
    return true;
  }

  void Push(int value) {
    std::unique_lock lock(mutex_);
    if (data_.size() >= capacity_) {
      full_.wait(lock);
    }
    Enqueue(value);
  }

  void Pop(int *value) {
    if (value == nullptr) {
      return;
    }
    std::unique_lock lock(mutex_);
    if (data_.empty()) {
      empty_.wait(lock);
    }
    Dequeue(value);
  }
 private:
  void Enqueue(int value) {
    data_.push(value);
    if (data_.size() == 1) {
      empty_.notify_one();
    }
  }

  void Dequeue(int* value) {
    *value = data_.front();
    data_.pop();
    if (data_.size() == capacity_ - 1) {
      full_.notify_one();
    }
  }

 private:
  std::mutex mutex_;
  std::condition_variable full_;
  std::condition_variable empty_;
  std::queue<int> data_;
  const size_t capacity_;
};

上面的实现可以用单链表代替std::queue。
这种结构实现简单,临界区较小,可以在请求维度的数据中使用。

双锁队列实现

双锁队列是另一种并发队列,使用两把锁分别控制入队和出队,性能相较于单锁的稍微强点。
双锁队列还包括两个条件变量,分别用于表示队列满和队列空;同时还拥有一个原子变量记录当前队列长度。
入队列时,先等待“非满”条件变量,然后入队列,如果队列没满,则唤醒“非满”变量。如果入队前队列是空的,入队后就要唤醒“非空”变量。
出队时,先等待“非空”条件变量,然后出队,如果队列还有数据,则唤醒“非空”变量。如果出队之前队列时非空的,则出队后唤醒“非满”变量。

代码如下:

#pragma once

#include <atomic>
#include <mutex>
#include <condition_variable>

struct Node {
  int val;
  Node* next{nullptr};
  explicit Node(int value = 0): val(value) { }
};

class LinkedBlockQueue {
 public:
  LinkedBlockQueue(size_t size): capacity_(size) {
    head_ = new Node;
    tail_ = head_;
  }

  bool TryPush(int value) {
    if (count_.load() >= capacity_) {
      return false;
    }
    std::unique_lock lock(push_mutex_);
    Enqueue(value);
    auto current = count_.fetch_add(1);
    if (current + 1 < capacity_) {
      not_full_.notify_one();
    }
    if (current == 0) {
      NotifyNotEmpty();
    }

    return true;
  }

  bool TryPop(int* value) {
    if (value == nullptr) {
      return false;
    }
    if (count_.load() == 0) {
      return false;
    }
    std::unique_lock lock(pop_mutex_);
    Dequeue(value);
    auto current = count_.fetch_sub(1);
    if (current - 1 == 0) {
      not_empty_.notify_one();
    }
    if (current == capacity_) {
      NotifyNotFull();
    }

    return true;
  }

  void Push(int value) {
    size_t current = 0;
    {
      std::unique_lock lock(push_mutex_);
      not_full_.wait(lock, [&](){ return count_.load() < capacity_;});
      Enqueue(value);
      current = count_.fetch_add(1);
      if (current - 1 < capacity_) {
        not_full_.notify_one();
      }
    }
    if (current == 0) {
      NotifyNotEmpty();
    }
  }

  void Pop(int* value) {
    if (value == nullptr) {
      return;
    }
    size_t current = 0;
    {
      std::unique_lock lock(pop_mutex_);
      not_empty_.wait(lock, [&](){ return count_.load() > 0; });
      Dequeue(value);
      current = count_.fetch_sub(1);
      if (current > 1) {
        not_empty_.notify_one();
      }
    }
    if (current == capacity_) {
      NotifyNotFull();
    }
  }

 private:
  void Enqueue(int value) {
    tail_->next = new Node(value);
    tail_ = tail_->next;
  }

  void Dequeue(int* value) {
    auto* node = head_->next;
    *value = node->val;
    head_->next = node->next;
    delete node;
  }

  void NotifyNotFull() {
    std::unique_lock lock(push_mutex_);
    not_full_.notify_one();
  }

  void NotifyNotEmpty() {
    std::unique_lock lock(pop_mutex_);
    not_empty_.notify_one();
  }

 private:
  Node* head_;
  Node* tail_;
  std::mutex push_mutex_;
  std::mutex pop_mutex_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  const size_t capacity_;
  std::atomic_size_t count_{0};
};

代码稍微复杂一点,不过还是可以手工实现的。

原子队列实现

boost中并发队列实现

tbb中并发队列实现

folly中并发队列实现

moodycamel中并发队列实现

性能比较