阻塞队列
用C++简易实现生产者消费者模型,即实现一个阻塞队列;
之前Web服务器项中的异步日志系统也用到了该模型;
- 生产者生产数据到缓冲区中,消费者从缓冲区中取数据;
- 缓冲区满时,生产者线程阻塞,进入等待状态。
- 缓冲区空时,消费者线程阻塞,进入等待状态。
- 生产者生产资源后,会唤醒可能等待着的消费者;
- 消费者消耗资源后,会唤醒可能阻塞着的生产者;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
|
#include <bits/stdc++.h>
template <class T> class BlockQueue { private: std::queue<T> que_; std::mutex mtx_; size_t capacity_;
std::condition_variable cv_consumer_; std::condition_variable cv_producer_;
public: explicit BlockQueue(size_t MaxCapacity = 100) : capacity_(MaxCapacity) { assert(MaxCapacity > 0); }
void push(const T &item); T pop(); };
template <typename T> void BlockQueue<T>::push(const T &item) { std::unique_lock<std::mutex> locker(mtx_);
while (que_.size() >= capacity_) { cv_producer_.wait(locker); }
que_.push(item);
cv_consumer_.notify_one(); }
template <typename T> T BlockQueue<T>::pop() { std::unique_lock<std::mutex> locker(mtx_);
while (que_.empty()) { cv_consumer_.wait(locker); }
T item = que_.front(); que_.pop();
cv_producer_.notify_one();
return item; }
void producer(BlockQueue<int> &q) { for (int i = 0; i < 10; i++) { q.push(i); std::cout << "push: " << i << std::endl; } }
void comsumer(BlockQueue<int> &q) { for (int i = 0; i < 10; i++) { int item = q.pop(); std::cout << "pop:" << item << std::endl; } }
int main() { BlockQueue<int> bq(5);
std::thread t1(producer, std::ref(bq)); std::thread t2(comsumer, std::ref(bq));
t1.join(); t2.join();
return 0; }
|
编译用:
g++ BlockQueue.cpp -o bq -pthread