root/OpenSceneGraph/trunk/src/osgPlugins/ffmpeg/BoundedMessageQueue.hpp @ 9816

Revision 9816, 5.5 kB (checked in by robert, 6 years ago)

From Tanguy Fautre (Aris Technologies), ffmpeg plugin

RevLine 
[9816]1
2#ifndef HEADER_GUARD_OSGFFMPEG_BOUNDED_MESSAGE_QUEUE_H
3#define HEADER_GUARD_OSGFFMPEG_BOUNDED_MESSAGE_QUEUE_H
4
5#include <OpenThreads/Condition>
6#include <OpenThreads/Mutex>
7#include <OpenThreads/ScopedLock>
8
9#include <cassert>
10#include <algorithm>
11#include <vector>
12
13
14
15namespace osgFFmpeg {
16
17
18
19template <class T>
20class BoundedMessageQueue
21{
22public:
23
24    typedef T value_type;
25    typedef size_t size_type;
26
27    explicit BoundedMessageQueue(size_type capacity);
28    ~BoundedMessageQueue();
29
30    void clear();
31
32    template <class Destructor>
33    void flush(const Destructor destructor);
34
35    void push(const value_type & value);
36    bool tryPush(const value_type & value);
37    bool timedPush(const value_type & value, unsigned long ms);
38
39    value_type pop();
40    value_type tryPop(bool & is_empty);
41    value_type timedPop(bool & is_empty, unsigned long ms);
42
43private:
44
45    BoundedMessageQueue(const BoundedMessageQueue &);
46    BoundedMessageQueue & operator = (const BoundedMessageQueue &);
47
48    typedef std::vector<T> Buffer;
49    typedef OpenThreads::Condition Condition;
50    typedef OpenThreads::Mutex Mutex;
51    typedef OpenThreads::ScopedLock<Mutex> ScopedLock;
52
53    bool isFull() const;
54    bool isEmpty() const;
55
56    void unsafePush(const value_type & value);
57    value_type unsafePop();
58
59    Buffer        m_buffer;
60    size_type    m_begin;
61    size_type    m_end;
62    size_type    m_size;
63
64    Mutex        m_mutex;
65    Condition    m_not_empty;
66    Condition    m_not_full;
67};
68
69
70
71
72
73template <class T>
74BoundedMessageQueue<T>::BoundedMessageQueue(const size_type capacity) :
75    m_buffer(capacity),
76    m_begin(0),
77    m_end(0),
78    m_size(0)
79{
80
81}
82
83
84
85template <class T>
86BoundedMessageQueue<T>::~BoundedMessageQueue()
87{
88
89}
90
91
92
93template <class T>
94void BoundedMessageQueue<T>::clear()
95{
96    {
97        ScopedLock lock(m_mutex);
98
99        m_buffer.clear();
100        m_begin = 0;
101        m_end = 0;
102        m_size = 0;
103    }
104
105    m_not_full.broadcast();
106}
107
108
109
110template <class T>
111template <class Destructor>
112void BoundedMessageQueue<T>::flush(const Destructor destructor)
113{
114    {
115        ScopedLock lock(m_mutex);
116
117        while (! isEmpty())
118        {
119            value_type value = unsafePop();
120            destructor(value);
121        }
122
123        m_begin = 0;
124        m_end = 0;
125        m_size = 0;
126    }
127
128    m_not_full.broadcast();
129}
130
131
132
133template <class T>
134void BoundedMessageQueue<T>::push(const value_type & value)
135{
136    {
137        ScopedLock lock(m_mutex);
138
139        while (isFull())
140            m_not_full.wait(&m_mutex);
141
142        unsafePush(value);
143    }
144
145    m_not_empty.signal();
146}
147
148
149
150template <class T>
151bool BoundedMessageQueue<T>::tryPush(const value_type & value)
152{
153    {
154        ScopedLock lock(m_mutex);
155
156        if (isFull())
157            return false;
158
159        unsafePush(value);
160    }
161
162    m_not_empty.signal();
163
164    return true;
165}
166
167
168
169template <class T>
170bool BoundedMessageQueue<T>::timedPush(const value_type & value, const unsigned long ms)
171{
172    // We don't wait in a loop to avoid an infinite loop (as the ms timeout would not be decremented).
173    // This means that timedPush() could return false before the timeout has been hit.
174
175    {
176        ScopedLock lock(m_mutex);
177
178        if (isFull())
179            m_not_full.wait(&m_mutex, ms);
180
181        if (isFull())
182            return false;
183
184        unsafePush(value);
185    }
186
187    m_not_empty.signal();
188
189    return true;
190}
191
192
193
194template <class T>
195typename BoundedMessageQueue<T>::value_type BoundedMessageQueue<T>::pop()
196{
197    value_type value;
198
199    {
200        ScopedLock lock(m_mutex);
201
202        while (isEmpty())
203            m_not_empty.wait(&m_mutex);
204
205        value = unsafePop();
206    }
207
208    m_not_full.signal();
209
210    return value;
211}
212
213
214
215template <class T>
216typename BoundedMessageQueue<T>::value_type BoundedMessageQueue<T>::tryPop(bool & is_empty)
217{
218    value_type value;
219
220    {
221        ScopedLock lock(m_mutex);
222
223        is_empty = isEmpty();
224
225        if (is_empty)
226            return value_type();
227
228        value = unsafePop();
229    }
230
231    m_not_full.signal();
232
233    return value;
234}
235
236
237
238template <class T>
239typename BoundedMessageQueue<T>::value_type BoundedMessageQueue<T>::timedPop(bool & is_empty, const unsigned long ms)
240{
241    value_type value;
242
243    {
244        ScopedLock lock(m_mutex);
245
246        // We don't wait in a loop to avoid an infinite loop (as the ms timeout would not be decremented).
247        // This means that timedPop() could return with (is_empty = true) before the timeout has been hit.
248
249        if (isEmpty())
250            m_not_empty.wait(&m_mutex, ms);
251
252        is_empty = isEmpty();
253
254        if (is_empty)
255            return value_type();
256
257        value = unsafePop();
258    }
259
260    m_not_full.signal();
261
262    return value;
263}
264
265
266
267template <class T>
268inline bool BoundedMessageQueue<T>::isFull() const
269{
270    return m_size == m_buffer.size();
271}
272
273
274
275template <class T>
276inline bool BoundedMessageQueue<T>::isEmpty() const
277{
278    return m_size == 0;
279}
280
281
282
283template <class T>
284inline void BoundedMessageQueue<T>::unsafePush(const value_type & value)
285{
286    // Note: this shall never be called if the queue is full.
287    assert(! isFull());   
288
289    m_buffer[m_end++] = value;
290
291    if (m_end == m_buffer.size())
292        m_end = 0;
293
294    ++m_size;
295}
296   
297
298
299template <class T>
300inline typename BoundedMessageQueue<T>::value_type BoundedMessageQueue<T>::unsafePop()
301{
302    // Note: this shall never be called if the queue is empty.
303    assert(! isEmpty());
304
305    const size_t pos = m_begin;
306
307    ++m_begin;
308    --m_size;
309
310    if (m_begin == m_buffer.size())
311        m_begin = 0;
312
313    return m_buffer[pos];
314}
315
316
317
318} // namespace osgFFmpeg
319
320
321
322#endif // HEADER_GUARD_OSGFFMPEG_BOUNDED_MESSAGE_QUEUE_H
Note: See TracBrowser for help on using the browser.