#
root/OpenSceneGraph/trunk/src/osgPlugins/ffmpeg/BoundedMessageQueue.hpp
@
9816

Revision 9816, 5.5 kB (checked in by robert, 7 years ago) |
---|

Rev | Line | |
---|---|---|

[9816] | 1 | |

2 | #ifndef HEADER_GUARD_OSGFFMPEG_BOUNDED_MESSAGE_QUEUE_H | |

3 | #define HEADER_GUARD_OSGFFMPEG_BOUNDED_MESSAGE_QUEUE_H | |

4 | ||

5 | #include <OpenThreads/Condition> | |

6 | #include <OpenThreads/Mutex> | |

7 | #include <OpenThreads/ScopedLock> | |

8 | ||

9 | #include <cassert> | |

10 | #include <algorithm> | |

11 | #include <vector> | |

12 | ||

13 | ||

14 | ||

15 | namespace osgFFmpeg { | |

16 | ||

17 | ||

18 | ||

19 | template <class T> | |

20 | class BoundedMessageQueue | |

21 | { | |

22 | public: | |

23 | ||

24 | typedef T value_type; | |

25 | typedef size_t size_type; | |

26 | ||

27 | explicit BoundedMessageQueue(size_type capacity); | |

28 | ~BoundedMessageQueue(); | |

29 | ||

30 | void clear(); | |

31 | ||

32 | template <class Destructor> | |

33 | void flush(const Destructor destructor); | |

34 | ||

35 | void push(const value_type & value); | |

36 | bool tryPush(const value_type & value); | |

37 | bool timedPush(const value_type & value, unsigned long ms); | |

38 | ||

39 | value_type pop(); | |

40 | value_type tryPop(bool & is_empty); | |

41 | value_type timedPop(bool & is_empty, unsigned long ms); | |

42 | ||

43 | private: | |

44 | ||

45 | BoundedMessageQueue(const BoundedMessageQueue &); | |

46 | BoundedMessageQueue & operator = (const BoundedMessageQueue &); | |

47 | ||

48 | typedef std::vector<T> Buffer; | |

49 | typedef OpenThreads::Condition Condition; | |

50 | typedef OpenThreads::Mutex Mutex; | |

51 | typedef OpenThreads::ScopedLock<Mutex> ScopedLock; | |

52 | ||

53 | bool isFull() const; | |

54 | bool isEmpty() const; | |

55 | ||

56 | void unsafePush(const value_type & value); | |

57 | value_type unsafePop(); | |

58 | ||

59 | Buffer m_buffer; | |

60 | size_type m_begin; | |

61 | size_type m_end; | |

62 | size_type m_size; | |

63 | ||

64 | Mutex m_mutex; | |

65 | Condition m_not_empty; | |

66 | Condition m_not_full; | |

67 | }; | |

68 | ||

69 | ||

70 | ||

71 | ||

72 | ||

73 | template <class T> | |

74 | BoundedMessageQueue<T>::BoundedMessageQueue(const size_type capacity) : | |

75 | m_buffer(capacity), | |

76 | m_begin(0), | |

77 | m_end(0), | |

78 | m_size(0) | |

79 | { | |

80 | ||

81 | } | |

82 | ||

83 | ||

84 | ||

85 | template <class T> | |

86 | BoundedMessageQueue<T>::~BoundedMessageQueue() | |

87 | { | |

88 | ||

89 | } | |

90 | ||

91 | ||

92 | ||

93 | template <class T> | |

94 | void BoundedMessageQueue<T>::clear() | |

95 | { | |

96 | { | |

97 | ScopedLock lock(m_mutex); | |

98 | ||

99 | m_buffer.clear(); | |

100 | m_begin = 0; | |

101 | m_end = 0; | |

102 | m_size = 0; | |

103 | } | |

104 | ||

105 | m_not_full.broadcast(); | |

106 | } | |

107 | ||

108 | ||

109 | ||

110 | template <class T> | |

111 | template <class Destructor> | |

112 | void BoundedMessageQueue<T>::flush(const Destructor destructor) | |

113 | { | |

114 | { | |

115 | ScopedLock lock(m_mutex); | |

116 | ||

117 | while (! isEmpty()) | |

118 | { | |

119 | value_type value = unsafePop(); | |

120 | destructor(value); | |

121 | } | |

122 | ||

123 | m_begin = 0; | |

124 | m_end = 0; | |

125 | m_size = 0; | |

126 | } | |

127 | ||

128 | m_not_full.broadcast(); | |

129 | } | |

130 | ||

131 | ||

132 | ||

133 | template <class T> | |

134 | void BoundedMessageQueue<T>::push(const value_type & value) | |

135 | { | |

136 | { | |

137 | ScopedLock lock(m_mutex); | |

138 | ||

139 | while (isFull()) | |

140 | m_not_full.wait(&m_mutex); | |

141 | ||

142 | unsafePush(value); | |

143 | } | |

144 | ||

145 | m_not_empty.signal(); | |

146 | } | |

147 | ||

148 | ||

149 | ||

150 | template <class T> | |

151 | bool BoundedMessageQueue<T>::tryPush(const value_type & value) | |

152 | { | |

153 | { | |

154 | ScopedLock lock(m_mutex); | |

155 | ||

156 | if (isFull()) | |

157 | return false; | |

158 | ||

159 | unsafePush(value); | |

160 | } | |

161 | ||

162 | m_not_empty.signal(); | |

163 | ||

164 | return true; | |

165 | } | |

166 | ||

167 | ||

168 | ||

169 | template <class T> | |

170 | bool BoundedMessageQueue<T>::timedPush(const value_type & value, const unsigned long ms) | |

171 | { | |

172 | // We don't wait in a loop to avoid an infinite loop (as the ms timeout would not be decremented). | |

173 | // This means that timedPush() could return false before the timeout has been hit. | |

174 | ||

175 | { | |

176 | ScopedLock lock(m_mutex); | |

177 | ||

178 | if (isFull()) | |

179 | m_not_full.wait(&m_mutex, ms); | |

180 | ||

181 | if (isFull()) | |

182 | return false; | |

183 | ||

184 | unsafePush(value); | |

185 | } | |

186 | ||

187 | m_not_empty.signal(); | |

188 | ||

189 | return true; | |

190 | } | |

191 | ||

192 | ||

193 | ||

194 | template <class T> | |

195 | typename BoundedMessageQueue<T>::value_type BoundedMessageQueue<T>::pop() | |

196 | { | |

197 | value_type value; | |

198 | ||

199 | { | |

200 | ScopedLock lock(m_mutex); | |

201 | ||

202 | while (isEmpty()) | |

203 | m_not_empty.wait(&m_mutex); | |

204 | ||

205 | value = unsafePop(); | |

206 | } | |

207 | ||

208 | m_not_full.signal(); | |

209 | ||

210 | return value; | |

211 | } | |

212 | ||

213 | ||

214 | ||

215 | template <class T> | |

216 | typename BoundedMessageQueue<T>::value_type BoundedMessageQueue<T>::tryPop(bool & is_empty) | |

217 | { | |

218 | value_type value; | |

219 | ||

220 | { | |

221 | ScopedLock lock(m_mutex); | |

222 | ||

223 | is_empty = isEmpty(); | |

224 | ||

225 | if (is_empty) | |

226 | return value_type(); | |

227 | ||

228 | value = unsafePop(); | |

229 | } | |

230 | ||

231 | m_not_full.signal(); | |

232 | ||

233 | return value; | |

234 | } | |

235 | ||

236 | ||

237 | ||

238 | template <class T> | |

239 | typename BoundedMessageQueue<T>::value_type BoundedMessageQueue<T>::timedPop(bool & is_empty, const unsigned long ms) | |

240 | { | |

241 | value_type value; | |

242 | ||

243 | { | |

244 | ScopedLock lock(m_mutex); | |

245 | ||

246 | // We don't wait in a loop to avoid an infinite loop (as the ms timeout would not be decremented). | |

247 | // This means that timedPop() could return with (is_empty = true) before the timeout has been hit. | |

248 | ||

249 | if (isEmpty()) | |

250 | m_not_empty.wait(&m_mutex, ms); | |

251 | ||

252 | is_empty = isEmpty(); | |

253 | ||

254 | if (is_empty) | |

255 | return value_type(); | |

256 | ||

257 | value = unsafePop(); | |

258 | } | |

259 | ||

260 | m_not_full.signal(); | |

261 | ||

262 | return value; | |

263 | } | |

264 | ||

265 | ||

266 | ||

267 | template <class T> | |

268 | inline bool BoundedMessageQueue<T>::isFull() const | |

269 | { | |

270 | return m_size == m_buffer.size(); | |

271 | } | |

272 | ||

273 | ||

274 | ||

275 | template <class T> | |

276 | inline bool BoundedMessageQueue<T>::isEmpty() const | |

277 | { | |

278 | return m_size == 0; | |

279 | } | |

280 | ||

281 | ||

282 | ||

283 | template <class T> | |

284 | inline void BoundedMessageQueue<T>::unsafePush(const value_type & value) | |

285 | { | |

286 | // Note: this shall never be called if the queue is full. | |

287 | assert(! isFull()); | |

288 | ||

289 | m_buffer[m_end++] = value; | |

290 | ||

291 | if (m_end == m_buffer.size()) | |

292 | m_end = 0; | |

293 | ||

294 | ++m_size; | |

295 | } | |

296 | ||

297 | ||

298 | ||

299 | template <class T> | |

300 | inline typename BoundedMessageQueue<T>::value_type BoundedMessageQueue<T>::unsafePop() | |

301 | { | |

302 | // Note: this shall never be called if the queue is empty. | |

303 | assert(! isEmpty()); | |

304 | ||

305 | const size_t pos = m_begin; | |

306 | ||

307 | ++m_begin; | |

308 | --m_size; | |

309 | ||

310 | if (m_begin == m_buffer.size()) | |

311 | m_begin = 0; | |

312 | ||

313 | return m_buffer[pos]; | |

314 | } | |

315 | ||

316 | ||

317 | ||

318 | } // namespace osgFFmpeg | |

319 | ||

320 | ||

321 | ||

322 | #endif // HEADER_GUARD_OSGFFMPEG_BOUNDED_MESSAGE_QUEUE_H |

**Note:**See TracBrowser for help on using the browser.