DAWorkbench
0.0.1
DAWorkbench API
载入中...
搜索中...
未找到
da_concurrent_queue.hpp
1
#ifndef DA_CONCURRENT_QUEUE_H
2
#define DA_CONCURRENT_QUEUE_H
3
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>
6
12
/**
 * @brief Thread-safe FIFO queue intended for producer/consumer use.
 *
 * A std::queue is guarded by a single mutex. Two condition variables are used:
 * one on which producers wait while a bounded queue is full, and one on which
 * consumers wait while the queue is empty.
 *
 * A capacity of 0 means "no limit": push() never waits in that case.
 *
 * The class holds a std::mutex and std::condition_variable members, so it can
 * be neither copied nor moved; the copy operations are deleted explicitly to
 * make that visible in the interface.
 *
 * @tparam T element type stored in the queue.
 */
template< typename T >
class da_concurrent_queue
{
public:
    using queue_type       = std::queue< T >;
    using value_type       = typename queue_type::value_type;
    using reference        = typename queue_type::reference;
    using const_reference  = typename queue_type::const_reference;
    using size_type        = typename queue_type::size_type;
    using mutex_type       = std::mutex;
    using lock_guard_type  = std::lock_guard< mutex_type >;
    using unique_lock_type = std::unique_lock< mutex_type >;

public:
    /// Construct a queue without a capacity limit (capacity 0).
    da_concurrent_queue();

    /// Construct a queue whose push() waits while it already holds @p capacity elements (0 = no limit).
    da_concurrent_queue(size_type capacity);

    // Not copyable: the mutex and condition variables cannot be duplicated.
    da_concurrent_queue(const da_concurrent_queue&)            = delete;
    da_concurrent_queue& operator=(const da_concurrent_queue&) = delete;

    /// @return true if the queue holds no element at the time of the call.
    bool empty() const;

    /// @return number of elements held at the time of the call.
    std::size_t size() const;

    /// Append @p v to the back of the queue, waiting for free space when the queue is bounded and full.
    void push(const T& v);

    /// Same as push().
    void set(const T& v);

    /// Wait until an element is available, then remove and return the oldest one.
    T get();

    /// Like get(), but waits at most @p waitms milliseconds; returns T() if nothing arrived in time.
    T get(int waitms);

private:
    size_type m_capacity;                 ///< Maximum element count; 0 disables the limit.
    queue_type m_fifo;                    ///< Underlying storage, only accessed with m_mutex held.
    mutable mutex_type m_mutex;           ///< Guards m_fifo; mutable so const observers can lock it.
    std::condition_variable m_pushWait;   ///< Producers wait here while a bounded queue is full.
    std::condition_variable m_popWait;    ///< Consumers wait here while the queue is empty.
};
42
46
template
<
typename
T >
47
da_concurrent_queue< T >::da_concurrent_queue
() : m_capacity(0)
48
{
49
}
50
55
template
<
typename
T >
56
da_concurrent_queue< T >::da_concurrent_queue
(size_type capacity) : m_capacity(capacity)
57
{
58
}
59
64
template
<
typename
T >
65
bool
da_concurrent_queue< T >::empty
()
const
66
{
67
std::lock_guard< std::mutex > lg(m_mutex);
68
69
return
(m_fifo.empty());
70
}
71
78
template
<
typename
T >
79
void
da_concurrent_queue< T >::push
(
const
T& v)
80
{
81
std::unique_lock< std::mutex > lg(m_mutex);
82
83
if
(m_capacity > 0) {
84
//只有限定容积时才做推入等待
85
while
(m_fifo.size() >= m_capacity) {
86
m_pushWait.wait(lg);
87
}
88
}
89
m_fifo.push(v);
90
m_popWait.notify_one();
91
}
92
93
template
<
typename
T >
94
void
da_concurrent_queue< T >::set
(
const
T& v)
95
{
96
push(v);
97
}
98
103
template
<
typename
T >
104
T
da_concurrent_queue< T >::get
()
105
{
106
std::unique_lock< std::mutex > lg(m_mutex);
107
108
while
(m_fifo.empty()) {
109
m_popWait.wait(lg);
110
}
111
T v = m_fifo.front();
112
113
m_fifo.pop();
114
if
(m_capacity > 0) {
115
//如果有推出,则推入的等待可以唤醒
116
m_pushWait.notify_one();
117
}
118
//通过移动语义,避免拷贝(不用显示声明std::move(v),编译器会优化)
119
return
v;
120
}
121
127
template
<
typename
T >
128
T
da_concurrent_queue< T >::get
(
int
waitms)
129
{
130
std::unique_lock< std::mutex > lg(m_mutex);
131
if
(!m_popWait.wait_for(lg, std::chrono::milliseconds(waitms), [
this
] { return !(this->m_fifo.empty()); })) {
132
return
T();
133
}
134
T v = m_fifo.front();
135
136
m_fifo.pop();
137
if
(m_capacity > 0) {
138
//如果有推出,则推入的等待可以唤醒
139
m_pushWait.notify_one();
140
}
141
//通过移动语义,避免拷贝(不用显示声明std::move(v),编译器会优化)
142
return
v;
143
}
144
149
template
<
typename
T >
150
std::size_t
da_concurrent_queue< T >::size
()
const
151
{
152
std::lock_guard< std::mutex > lg(m_mutex);
153
154
return
(m_fifo.size());
155
}
156
157
#endif
// DA_CONCURRENT_QUEUE_H
da_concurrent_queue
这个是专门为生产者消费者实现的安全FIFO
Definition
da_concurrent_queue.hpp:14
da_concurrent_queue::empty
bool empty() const
队列是否为空
Definition
da_concurrent_queue.hpp:65
da_concurrent_queue::size
std::size_t size() const
获取队列的尺寸
Definition
da_concurrent_queue.hpp:150
da_concurrent_queue::get
T get()
推出
Definition
da_concurrent_queue.hpp:104
da_concurrent_queue::push
void push(const T &v)
推入fifo
Definition
da_concurrent_queue.hpp:79
da_concurrent_queue::da_concurrent_queue
da_concurrent_queue()
构造函数,不限制容量的队列
Definition
da_concurrent_queue.hpp:47
src
DAShared
da_concurrent_queue.hpp
制作者
1.9.8