Bike-X  0.8
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
OVR_ThreadCommandQueue.cpp
Go to the documentation of this file.
1 /************************************************************************************
2 
3 PublicHeader: None
4 Filename : OVR_ThreadCommandQueue.cpp
5 Content : Command queue for operations executed on a thread
6 Created : October 29, 2012
7 
8 Copyright : Copyright 2014 Oculus VR, Inc. All Rights reserved.
9 
10 Licensed under the Oculus VR Rift SDK License Version 3.1 (the "License");
11 you may not use the Oculus VR Rift SDK except in compliance with the License,
12 which is provided at the time of installation or download, or which
13 otherwise accompanies this software in either electronic or hard copy form.
14 
15 You may obtain a copy of the License at
16 
17 http://www.oculusvr.com/licenses/LICENSE-3.1
18 
19 Unless required by applicable law or agreed to in writing, the Oculus VR SDK
20 distributed under the License is distributed on an "AS IS" BASIS,
21 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 See the License for the specific language governing permissions and
23 limitations under the License.
24 
25 ************************************************************************************/
26 
27 #include "OVR_ThreadCommandQueue.h"
28 
29 namespace OVR {
30 
31 
32 //------------------------------------------------------------------------
33 // ***** CircularBuffer
34 
35 // CircularBuffer is a FIFO buffer implemented in a single block of memory,
36 // which allows writing and reading variable-size data chunks. Write fails
37 // if buffer is full.
38 
39 class CircularBuffer
40 {
41  enum {
42  AlignSize = 16,
44  };
45 
48  UPInt Tail; // Byte offset of next item to be popped.
49  UPInt Head; // Byte offset of where next push will take place.
50  UPInt End; // When Head < Tail, this is used instead of Size.
51 
52  inline UPInt roundUpSize(UPInt size)
53  { return (size + AlignMask) & ~(UPInt)AlignMask; }
54 
55 public:
56 
58  : Size(size), Tail(0), Head(0), End(0)
59  {
61  }
63  {
64  // For ThreadCommands, we must consume everything before shutdown.
67  }
68 
69  bool IsEmpty() const { return (Head == Tail); }
70 
71  // Allocates a state block of specified size and advances pointers,
72  // returning 0 if buffer is full.
73  UByte* Write(UPInt size);
74 
75  // Returns a pointer to next available data block; 0 if none available.
77  { return (Head != Tail) ? (pBuffer + Tail) : 0; }
78  // Consumes data of specified size; this must match size passed to Write.
79  void ReadEnd(UPInt size);
80 };
81 
82 
83 // Allocates a state block of specified size and advances pointers,
84 // returning 0 if buffer is full.
// NOTE(review): the signature line (85) is missing from this listing; per the
// in-class declaration this is UByte* CircularBuffer::Write(UPInt size).
// The requested size is rounded up with roundUpSize() before any space check,
// so ReadEnd() must apply the same rounding to stay in step.
86 {
87  UByte* p = 0;
88 
89  size = roundUpSize(size);
90  // Since this is circular buffer, always allow at least one item.
91  OVR_ASSERT(size < Size/2);
92 
93  if (Head >= Tail)
94  {
 // Unwrapped layout: pending data, if any, occupies [Tail, Head).
95  OVR_ASSERT(End == 0);
96 
97  if (size <= (Size - Head))
98  {
 // The block fits between Head and the end of the storage.
99  p = pBuffer + Head;
100  Head += size;
101  }
102  else if (size < Tail)
103  {
 // Wrap around to offset 0. End remembers where the data in the upper
 // region stops. The comparison is strict so that Head cannot land on
 // Tail, which IsEmpty() would read as an empty buffer.
104  p = pBuffer;
105  End = Head;
106  Head = size;
107  OVR_ASSERT(Head != Tail);
108  }
109  }
110  else
111  {
 // Wrapped layout: free space is the gap [Head, Tail).
112  OVR_ASSERT(End != 0);
113 
114  if ((Tail - Head) > size)
115  {
 // Strictly greater, again to keep Head from catching up with Tail.
116  p = pBuffer + Head;
117  Head += size;
118  OVR_ASSERT(Head != Tail);
119  }
120  }
121 
 // p is still 0 here when no branch found room for the block.
122  return p;
123 }
124 
// Consumes the block at Tail. size is rounded the same way Write() rounds it.
// NOTE(review): the signature line (125) is missing from this listing; per the
// in-class declaration this is void CircularBuffer::ReadEnd(UPInt size).
126 {
127  OVR_ASSERT(Head != Tail);
128  size = roundUpSize(size);
129 
130  Tail += size;
131  if (Tail == End)
132  {
 // Reached the end marker of the upper region: continue reading from
 // offset 0 and return to the unwrapped layout.
133  Tail = End = 0;
134  }
135  else if (Tail == Head)
136  {
 // Everything has been consumed: rewind both offsets to the start.
137  OVR_ASSERT(End == 0);
138  Tail = Head = 0;
139  }
140 }
141 
142 
143 //-------------------------------------------------------------------------------------
144 // ***** ThreadCommand
145 
// Destroys the command currently held, if Size is non-zero.
// NOTE(review): the signature line (146) is missing from this listing; this looks
// like the ThreadCommand::PopBuffer destructor -- confirm against the header.
147 {
148  if (Size)
149  Destruct<ThreadCommand>(toCommand());
150 }
151 
// Replaces the held command with a copy of the command stored at data.
// NOTE(review): the signature line (152) is missing from this listing; PopCommand
// calls popBuffer->InitFromBuffer(buffer), which appears to be this function.
153 {
154  ThreadCommand* cmd = (ThreadCommand*)data;
155  OVR_ASSERT(cmd->Size <= MaxSize);
156 
 // Destroy any command left over from a previous pop before overwriting it.
157  if (Size)
158  Destruct<ThreadCommand>(toCommand());
159  Size = cmd->Size;
 // NOTE(review): this is a raw byte copy of an object with virtual functions,
 // not a CopyConstruct() call; the source object in the queue is not destructed
 // here. Presumably intentional (a relocation) -- confirm.
160  memcpy(Buffer, (void*)cmd, Size);
161 }
162 
// Runs the held command and, when the command reports NeedsWait(), pulses its
// event afterwards.
// NOTE(review): the signature line (163) is missing from this listing; presumably
// the pulsed event is the one a producer waits on in PushCommand -- confirm.
164 {
165  ThreadCommand* command = toCommand();
166  OVR_ASSERT(command);
167  command->Execute();
168  if (NeedsWait())
169  GetEvent()->PulseEvent();
170 }
171 
172 //-------------------------------------------------------------------------------------
173 
// Private implementation object behind ThreadCommandQueue.
// NOTE(review): this listing omits the class header and several member lines
// (174, 176, 181, 185, 195, 197, 210, 221, 226, 236-237, 240-242), so some
// function headers and data members used below are not visible here.
175 {
177  friend class ThreadCommandQueue;
178 
179 public:
180 
 // Constructor initializer list: remembers the owning queue, clears both exit
 // flags and constructs the command buffer with the argument 2048.
182  : pQueue(queue), ExitEnqueued(false), ExitProcessed(false), CommandBuffer(2048)
183  {
184  }
186 
187 
188  bool PushCommand(const ThreadCommand& command);
189  bool PopCommand(ThreadCommand::PopBuffer* popBuffer);
190 
191 
192  // ExitCommand is used to notify us that the thread is shutting down.
193  struct ExitCommand : public ThreadCommand
194  {
196 
 // Constructor initializer list (header line missing from the listing).
198  : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { }
199 
 // Sets ExitProcessed while holding the queue lock.
200  virtual void Execute() const
201  {
202  Lock::Locker lock(&pImpl->QueueLock);
203  pImpl->ExitProcessed = true;
204  }
 // Copy-constructs this command into the storage at p.
205  virtual ThreadCommand* CopyConstruct(void* p) const
206  { return Construct<ExitCommand>(p, *this); }
207  };
208 
209 
 // Takes an event from the AvailableEvents list, or allocates a new one when
 // the list has none. (Presumably AllocNotifyEvent_NTS(); header line missing.
 // Callers in this file hold QueueLock when calling it.)
211  {
212  NotifyEvent* p = AvailableEvents.GetFirst();
213 
214  if (!AvailableEvents.IsNull(p))
215  p->RemoveNode();
216  else
217  p = new NotifyEvent;
218  return p;
219  }
220 
 // Returns an event to the AvailableEvents list for reuse.
 // (Presumably FreeNotifyEvent_NTS(NotifyEvent* p); header line missing.)
222  {
223  AvailableEvents.PushBack(p);
224  }
225 
 // Deletes every event remaining in the AvailableEvents list.
 // (Presumably FreeNotifyEvents_NTS(); header line missing.)
227  {
228  while(!AvailableEvents.IsEmpty())
229  {
230  NotifyEvent* p = AvailableEvents.GetFirst();
231  p->RemoveNode();
232  delete p;
233  }
234  }
235 
 // NOTE(review): these flags are written under QueueLock, but ExitProcessed is
 // also read without the lock elsewhere in this file; volatile alone does not
 // provide inter-thread synchronization in standard C++.
238  volatile bool ExitEnqueued;
239  volatile bool ExitProcessed;
243 };
244 
245 
246 
// Teardown: under QueueLock, checks that no producer is still blocked and deletes
// the pooled notify events.
// NOTE(review): the signature line (247) is missing from this listing; this looks
// like the ThreadCommandQueueImpl destructor -- confirm.
248 {
249  Lock::Locker lock(&QueueLock);
250  OVR_ASSERT(BlockedProducers.IsEmpty());
251  FreeNotifyEvents_NTS();
252 }
253 
// Copies command into the command buffer, blocking while the buffer is full.
// Returns false if an exit has already been enqueued and command does not carry
// ExitFlag; otherwise returns true once the command is queued and, for commands
// whose NeedsWait() is true, after the completion event has been waited on.
// NOTE(review): the signature line (254) is missing from this listing; per the
// in-class declaration this is bool PushCommand(const ThreadCommand& command).
255 {
256  ThreadCommand::NotifyEvent* completeEvent = 0;
257  ThreadCommand::NotifyEvent* queueAvailableEvent = 0;
258 
259  // Repeat writing command into buffer until it is available.
260  do {
261 
262  { // Lock Scope
263  Lock::Locker lock(&QueueLock);
264 
 // A non-null event here is left over from the previous iteration's wait;
 // return it to the pool now that the lock is held again.
265  if (queueAvailableEvent)
266  {
267  FreeNotifyEvent_NTS(queueAvailableEvent);
268  queueAvailableEvent = 0;
269  }
270 
271  // Don't allow any commands after PushExitCommand() is called.
272  if (ExitEnqueued && !command.ExitFlag)
273  return false;
274 
275 
 // Sample emptiness before writing so the empty -> non-empty transition
 // can be reported below.
276  bool bufferWasEmpty = CommandBuffer.IsEmpty();
277  UByte* buffer = CommandBuffer.Write(command.GetSize());
278  if (buffer)
279  {
280  ThreadCommand* c = command.CopyConstruct(buffer);
 // Attach a completion event to the queued copy when the caller must wait.
281  if (c->NeedsWait())
282  completeEvent = c->pEvent = AllocNotifyEvent_NTS();
283  // Wake the consumer when data is added to a previously empty buffer.
284  if (bufferWasEmpty)
285  pQueue->OnPushNonEmpty_Locked();
286  break;
287  }
288 
 // Buffer full: register as a blocked producer; PopCommand pulses the first
 // registered event after it frees space.
289  queueAvailableEvent = AllocNotifyEvent_NTS();
290  BlockedProducers.PushBack(queueAvailableEvent);
291  } // Lock Scope
292 
 // Wait outside the lock, then retry the write.
293  queueAvailableEvent->Wait();
294 
295  } while(1);
296 
297  // Command was enqueued, wait if necessary.
298  if (completeEvent)
299  {
300  completeEvent->Wait();
 // Re-take the lock only to return the event to the pool.
301  Lock::Locker lock(&QueueLock);
302  FreeNotifyEvent_NTS(completeEvent);
303  }
304 
305  return true;
306 }
307 
308 
309 // Pops the next command from the thread queue, if any is available.
// Returns false when the buffer is empty, true when a command was moved into
// popBuffer. The whole function runs under QueueLock.
// NOTE(review): the signature line (310) is missing from this listing; per the
// in-class declaration this is bool PopCommand(ThreadCommand::PopBuffer* popBuffer).
311 {
312  Lock::Locker lock(&QueueLock);
313 
314  UByte* buffer = CommandBuffer.ReadBegin();
315  if (!buffer)
316  {
317  // Notify thread while in lock scope, enabling initialization of wait.
318  pQueue->OnPopEmpty_Locked();
319  return false;
320  }
321 
 // Copy the command out first; its size is then used to release the slot.
322  popBuffer->InitFromBuffer(buffer);
323  CommandBuffer.ReadEnd(popBuffer->GetSize());
324 
 // Space was just freed: wake the first producer blocked in PushCommand.
325  if (!BlockedProducers.IsEmpty())
326  {
327  ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst();
328  queueAvailableEvent->RemoveNode();
329  queueAvailableEvent->PulseEvent();
330  // Event is freed later by waiter.
331  }
332  return true;
333 }
334 
335 
336 //-------------------------------------------------------------------------------------
337 
// Allocates the implementation object, handing it a pointer back to this queue.
// (Signature line 338 is missing from this listing; presumably the
// ThreadCommandQueue constructor.)
339 {
340  pImpl = new ThreadCommandQueueImpl(this);
341 }
// Deletes the implementation object.
// (Signature line 342 is missing from this listing; presumably the
// ThreadCommandQueue destructor.)
343 {
344  delete pImpl;
345 }
346 
// Forwards to the implementation's PushCommand and returns its result.
// (Signature line 347 is missing from this listing.)
348 {
349  return pImpl->PushCommand(command);
350 }
351 
// Forwards to the implementation's PopCommand and returns its result.
// (Signature line 352 is missing from this listing.)
353 {
354  return pImpl->PopCommand(popBuffer);
355 }
356 
// Marks the queue as exiting and enqueues an ExitCommand. Does nothing if an exit
// was already enqueued.
// (Signature line 357 is missing from this listing; the body uses a parameter
// named wait, which is passed through to the ExitCommand.)
358 {
359  // Exit is processed in two stages:
360  // - First, ExitEnqueued flag is set to block further commands from queuing up.
361  // - Second, the actual exit call is processed on the consumer thread, flushing
362  // any prior commands.
363  // IsExiting() only returns true after exit has flushed.
364  {
 // Test-and-set under the lock so only one caller enqueues the exit command.
365  Lock::Locker lock(&pImpl->QueueLock);
366  if (pImpl->ExitEnqueued)
367  return;
368  pImpl->ExitEnqueued = true;
369  }
370 
 // Pushed after the lock is released; PushCommand takes QueueLock itself.
371  PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait));
372 }
373 
// Reports whether the exit command has been executed (ExitProcessed is set by
// ExitCommand::Execute).
// (Signature line 374 is missing from this listing; presumably IsExiting().)
// NOTE(review): the flag is read here without taking QueueLock.
375 {
376  return pImpl->ExitProcessed;
377 }
378 
379 
380 } // namespace OVR
#define OVR_FREE_ALIGNED(p)
ExitCommand(ThreadCommandQueueImpl *impl, bool wait)
#define OVR_ALLOC_ALIGNED(s, a)
__BEGIN_NAMESPACE_STD void * memcpy(void *__restrict __dest, const void *__restrict __src, size_t __n) __THROW __nonnull((1
virtual void Execute() const =0
void FreeNotifyEvent_NTS(NotifyEvent *p)
size_t UPInt
Definition: OVR_Types.h:218
bool PushCommand(const ThreadCommand &command)
uint8_t UByte
Definition: OVR_Types.h:249
void RemoveNode()
Definition: OVR_List.h:58
ThreadCommandQueueImpl(ThreadCommandQueue *queue)
#define OVR_ASSERT(p)
bool PopCommand(ThreadCommand::PopBuffer *popBuffer)
UByte * Write(UPInt size)
virtual ThreadCommand * CopyConstruct(void *p) const
bool PushCommand(const ThreadCommand &command)
ThreadCommand::NotifyEvent NotifyEvent
ThreadCommand * toCommand() const
bool PopCommand(ThreadCommand::PopBuffer *popBuffer)
UPInt roundUpSize(UPInt size)
virtual ThreadCommand * CopyConstruct(void *p) const =0