DAWorkbench 0.0.1
DAWorkbench API
载入中...
搜索中...
未找到
da_concurrent_queue.hpp
#ifndef DA_CONCURRENT_QUEUE_H
#define DA_CONCURRENT_QUEUE_H
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

/**
 * @brief Blocking FIFO queue intended for producer/consumer hand-off.
 *
 * Every public operation takes the internal mutex, so the queue may be shared
 * between threads. When a capacity greater than zero is configured, push()
 * blocks while the queue is full; a capacity of zero means "unbounded".
 * Consumers block in get() until an element is available, or use
 * get(int) to give up after a timeout.
 *
 * @tparam T element type; must be copy-constructible (push() stores a copy)
 *           and, for get(int), default-constructible.
 */
template< typename T >
class da_concurrent_queue
{
public:
    using queue_type       = std::queue< T >;
    using value_type       = typename queue_type::value_type;
    using reference        = typename queue_type::reference;
    using const_reference  = typename queue_type::const_reference;
    using size_type        = typename queue_type::size_type;
    using mutex_type       = std::mutex;
    using lock_guard_type  = std::lock_guard< mutex_type >;
    using unique_lock_type = std::unique_lock< mutex_type >;

public:
    /// Creates a queue without a capacity limit.
    da_concurrent_queue();
    /// Creates a queue holding at most @p capacity elements (0 = unbounded).
    /// NOTE(review): intentionally left non-explicit to stay source-compatible
    /// with existing callers.
    da_concurrent_queue(size_type capacity);
    /// @return true when the queue currently holds no elements.
    bool empty() const;
    /// @return the number of elements currently queued.
    std::size_t size() const;
    /// Appends a copy of @p v, blocking while a bounded queue is full.
    void push(const T& v);
    /// Alias of push().
    void set(const T& v);
    /// Removes and returns the oldest element, blocking until one exists.
    T get();
    /// Like get(), but waits at most @p waitms milliseconds and returns a
    /// default-constructed T on timeout.
    T get(int waitms);

private:
    /// Pops the front element. The caller must hold m_mutex and must have
    /// verified that the queue is not empty.
    T take_front();

private:
    size_type m_capacity;                ///< Maximum size; 0 disables the limit.
    queue_type m_fifo;                   ///< Underlying storage.
    mutable mutex_type m_mutex;          ///< Guards m_fifo.
    std::condition_variable m_pushWait;  ///< Producers wait here while the queue is full.
    std::condition_variable m_popWait;   ///< Consumers wait here while the queue is empty.
};

/**
 * @brief Constructs an unbounded queue (capacity 0 disables the push-side wait).
 */
template< typename T >
da_concurrent_queue< T >::da_concurrent_queue() : m_capacity(0)
{
}

/**
 * @brief Constructs a queue limited to @p capacity elements.
 * @param capacity maximum number of queued elements; 0 means unbounded.
 */
template< typename T >
da_concurrent_queue< T >::da_concurrent_queue(size_type capacity) : m_capacity(capacity)
{
}

/**
 * @brief Reports whether the queue is empty at the moment of the call.
 * @return true if no element is queued.
 */
template< typename T >
bool da_concurrent_queue< T >::empty() const
{
    lock_guard_type lock(m_mutex);
    return m_fifo.empty();
}

/**
 * @brief Pushes a copy of @p v onto the back of the queue.
 *
 * If the queue was constructed with a non-zero capacity, the call blocks
 * until there is room. One waiting consumer is woken afterwards.
 *
 * @param v value to enqueue.
 */
template< typename T >
void da_concurrent_queue< T >::push(const T& v)
{
    unique_lock_type lock(m_mutex);

    if (m_capacity > 0) {
        // Only a bounded queue ever makes producers wait; the predicate form
        // re-checks the condition after spurious wake-ups.
        m_pushWait.wait(lock, [ this ] { return m_fifo.size() < m_capacity; });
    }
    m_fifo.push(v);
    m_popWait.notify_one();
}

/**
 * @brief Alias of push(); enqueues a copy of @p v.
 * @param v value to enqueue.
 */
template< typename T >
void da_concurrent_queue< T >::set(const T& v)
{
    push(v);
}

/**
 * @brief Removes and returns the oldest element, blocking until one is available.
 * @return the element that was at the front of the queue.
 */
template< typename T >
T da_concurrent_queue< T >::get()
{
    unique_lock_type lock(m_mutex);

    m_popWait.wait(lock, [ this ] { return !m_fifo.empty(); });
    return take_front();
}

/**
 * @brief Removes and returns the oldest element, waiting at most @p waitms milliseconds.
 * @param waitms maximum time to wait for an element, in milliseconds.
 * @return the front element, or a default-constructed T if the wait timed out
 *         while the queue was still empty.
 */
template< typename T >
T da_concurrent_queue< T >::get(int waitms)
{
    unique_lock_type lock(m_mutex);

    const bool hasElement =
        m_popWait.wait_for(lock, std::chrono::milliseconds(waitms), [ this ] { return !m_fifo.empty(); });
    if (!hasElement) {
        return T();
    }
    return take_front();
}

/**
 * @brief Reports the number of queued elements at the moment of the call.
 * @return current element count.
 */
template< typename T >
std::size_t da_concurrent_queue< T >::size() const
{
    lock_guard_type lock(m_mutex);
    return m_fifo.size();
}

/**
 * @brief Shared pop logic for both get() overloads.
 *
 * Precondition: the caller holds m_mutex and the queue is not empty.
 */
template< typename T >
T da_concurrent_queue< T >::take_front()
{
    // Move the element out instead of copying it; it is about to be popped anyway.
    T v = std::move(m_fifo.front());

    m_fifo.pop();
    if (m_capacity > 0) {
        // A slot was freed, so one blocked producer may proceed.
        m_pushWait.notify_one();
    }
    // Returned by value: NRVO / implicit move applies, no explicit std::move needed.
    return v;
}

#endif  // DA_CONCURRENT_QUEUE_H
这是专门为生产者-消费者模式实现的线程安全阻塞式FIFO队列
Definition da_concurrent_queue.hpp:14
bool empty() const
队列是否为空
Definition da_concurrent_queue.hpp:65
std::size_t size() const
获取队列的尺寸
Definition da_concurrent_queue.hpp:150
T get()
推出
Definition da_concurrent_queue.hpp:104
void push(const T &v)
推入fifo
Definition da_concurrent_queue.hpp:79
da_concurrent_queue()
构造函数,不限制容量的队列
Definition da_concurrent_queue.hpp:47