Source code of Windows XP (NT5)
  1. /*
  2. * A queue is (roughly speaking) a Monitor in the sense of Hoare's paper.
  3. * It is an Object that includes synchronisation.
  4. *
  5. * It has the following methods:
  6. * Create: Create the queue and set it up ready for use.
  7. * Destroy: Input to the queue is finished (it will deallocate itself later).
  8. * Put: Put an element on the queue (and release a Get thread)
  9. * Get: Take an element off the queue (but wait if queue was empty).
  10. * Abort: Everything on the queue, now or later is useless. Shut down.
  11. * GetInstanceData: Retrieve one DWORD which was set on Create.
  12. * All the method names are prefixed with Queue_ e.g. Queue_Create.
  13. *
  14. * The Queue is designed to be filled by one (or more) thread(s) and emptied
  15. * by other(s). The queue itself creates the emptying threads. The Create
  16. * method specifies a procedure which will be used to empty it.
  17. * Elements on the queue are strictly first-in first-out, but bear in
  18. * mind that IF there are multiple emptying threads then although one thread
  19. * may get its element before another in strict order, what happens next
  20. * is not defined by the QUEUE mechanism.
  21. *
  22. * The QUEUE starts an emptying thread when
  23. *     an element is put
  24. *     AND the number of emptier threads started is fewer than MaxEmptiers
  25. *     AND (the number of emptiers running is currently zero
  26. *          OR the queue has more than MinQueueToStart elements times the
  27. *             number of emptier threads already running)
  28. *
  29. * What this means is that the queue will start emptier threads as needed up
  30. * to a limit of MaxEmptiers in such a way as to try to keep the queue down
  31. * to no more than MinQueueToStart per running emptier thread.
  32. *
  33. * Emptier threads should stop themselves when they try to get an element but
  34. * get the return code STOPTHREAD or ENDQUEUE. These return codes are issued
  35. * when
  36. * The queue is empty and there are already enough emptier threads waiting
  37. * or The queue is empty and has received a destroy request.
  38. *
  39. * If the thread receiving it is the last or only thread which has been started
  40. * to empty this queue, it will get an ENDQUEUE return code, i.e. all the other
  41. * emptying threads have already received a STOPTHREAD. Otherwise it will
  42. * just get STOPTHREAD. The queue de-allocates itself when it returns ENDQUEUE.
  43. * (No queue operation should be attempted after that).
  44. * This happens when the queue has received a destroy request
  45. * and the queue is empty and there are no emptier threads running (only
  46. * occurs if there has never been a Put) or when the last emptier thread
  47. * gets its ENDQUEUE.
  48. *
  49. * Queue_Create has an Event parameter. If this is not NULL then this event
  50. * is Set when the queue is destroyed. This Event is created by the calling
  51. * process. The caller must not Close it until the queue signals it.
  52. *
  53. * Information can be passed from the creator of the queue to the emptiers
  54. * via the InstanceData parameter. The value passed to Queue_Create can
  55. * be retrieved by the emptiers by calling Queue_GetInstanceData.
  56. *
  57. * If the instance data is in fact a pointer, the queue is unaware of this
  58. * and will never access or free the data pointed to.
  59. *
  60. * As well as controlling the emptier threads (starting more and more
  61. * in response to a growing queue) we also need to control the filler,
  62. * slowing it down if we are getting over-runs. For instance if we
  63. * have a broken output (say broken network connection) and a job which
  64. * is sending 200MB of data, we don't want to have a 200MB queue build up!
  65. *
  66. * To fix this, we have an absolute limit on the size of the queue (yet
  67. * another Create parameter).
  68. *
  69. * Error handling is tricky. Errors which only affect individual elements
  70. * should be handled by the Getters and Putters (e.g. by including in the
  71. * data of an element a code which indicates that the item is in error).
  72. * Errors which mean that the whole thing should be taken down can be handled
  73. * as follows. The QUEUE has a method Abort which works much as Destroy,
  74. * but will purge the queue of any held elements first.
  75. * As a QUEUE element may have storage chained off it which needs to be
  76. * freed, there is a parameter on Abort which is the ABORTPROC.
  77. * This is called once for each element on the queue to dispose of the element.
  78. * The storage of the element itself is then freed.
  79. * If the ABORTPROC is NULL then the storage of the element is just freed.
  80. *
  81. * If the queue were to be deallocated by the Getter then the next Put would
  82. * trap, so the queue is left in existence, but the Putting threads
  83. * get a FALSE return code when they try to Put after an Abort. They should
  84. * then do a Queue_Destroy (they may also want to Abort any queue they are
  85. * themselves reading from). The Getting threads should meanwhile keep running.
  86. * All except one will promptly get a STOPTHREAD. The last one will block on
  87. * the Get. When the Destroy comes in, indicating that the Putting side has
  88. * completely finished with the queue, the Get will be released with a final
  89. * ENDQUEUE and the queue itself will be deallocated.
  90. * Any attempt to use it after that will probably trap!
  91. *
  92. * Typical usage for a pipeline of queues where a thread is potentially one
  93. * of several which are getting from one queue and putting to another is:
  94. *
  95. *     for (; ; ){
  96. *         len = Queue_Get(inqueue, data, maxlen);
  97. *         if (len==STOPTHREAD){
  98. *             tidy up;
  99. *             ExitThread(0);
  100. *         }
  101. *         if (len==ENDQUEUE){
  102. *             tidy up;
  103. *             Queue_Destroy(outqueue);
  104. *             ExitThread(0);
  105. *         }
  106. *         if (len<0) {
  107. *             ASSERT you have a bug!
  108. *         }
  109. *
  110. *         process(&data, &len);
  111. *
  112. *         if (!Queue_Put(outqueue, data, len)){
  113. *             Queue_Abort(inqueue, InQueueAbortProc);
  114. *         }
  115. *
  116. *     }
  117. *
  118. *
  119. * Note that there is a partial ordering in time of actions performed by the
  120. * various parallel threads all running this loop which ensures that outqueue
  121. * is handled properly, i.e. all the puts complete before the Destroy.
  122. * This partial ordering is:
  123. *
  124. *   Put_by_thread_A(outqueue)        Put_by_thread_B(outqueue)
  125. *        |                                |
  126. *        |                                |
  127. *        v                                v
  128. *   Get_by_thread_A(inqueue)         Get_by_thread_B(inqueue)
  129. *        |                                |
  130. *        |                                |
  131. *        v                                v
  132. *   STOPTHREAD_for_thread_A -------> ENDQUEUE_for_thread_B--> Queue_Destroy(outqueue)
  133. *
  134. * Which threads get the STOPTHREAD is indeterminate, but they all happen BEFORE
  135. * the other thread gets the ENDQUEUE.
  136. *
  137. */
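/*
 * Illustrative sketch, not part of the original header: the rule for starting
 * an emptier thread, written as a predicate evaluated when an element is put.
 * The name should_start_emptier and all of its parameters are hypothetical.
 * The grouping of AND/OR is an assumption, chosen to match the stated aim of
 * keeping the queue to MinQueueToStart elements per running emptier.
 */

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical helper; the real queue keeps these counters internally. */
static bool should_start_emptier(int queue_len, int started, int running,
                                 int max_emptiers, int min_queue_to_start)
{
    assert(queue_len >= 0 && started >= 0 && running >= 0);

    /* Never exceed the MaxEmptiers limit given to Queue_Create. */
    if (started >= max_emptiers)
        return false;

    /* With nobody running, the element just put needs an emptier. */
    if (running == 0)
        return true;

    /* Otherwise start one only if the backlog per running emptier is too big. */
    return queue_len > min_queue_to_start * running;
}
```

/*
 * Under these assumptions, with MaxEmptiers 4 and MinQueueToStart 10, the first
 * Put starts an emptier, and a second one is started only once more than 10
 * elements are queued while the first is still running.
 */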
  138. /* Return codes from Queue_Get. All non-successful return codes are <0 */
  139. #define STOPTHREAD -1 /* Please tidy up and then ExitThread(0)
  140. ** There is no queue element for you.
  141. */
  142. #define TOOLONG -2 /* Your buffer was too short. This was a no-op;
  143. ** the data is still on the queue.
  144. */
  145. #define ENDQUEUE -3 /* This queue is now closing. You are the last
  146. ** thread active. All the others (if any) have
  147. ** had STOPTHREAD.
  148. ** There is no queue element for you.
  149. */
  150. #define NEGTHREADS -4 /* Bug in queue. Apparently negative number of
  151. ** threads running!
  152. */
  153. #define SICKQUEUE -5 /* Bug in queue. Trying to get from an empty
  154. ** queue.
  155. */
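/*
 * Illustrative sketch, not part of the original header: one way an emptier
 * might map a Queue_Get return value to an action. GET_ACTION and classify_get
 * are hypothetical names. Treating a return of 0 as a (zero-length) element is
 * an assumption; the header only says that unsuccessful codes are negative.
 * The #defines repeat the values above so the sketch compiles on its own.
 */

```c
#define STOPTHREAD -1
#define TOOLONG    -2
#define ENDQUEUE   -3
#define NEGTHREADS -4
#define SICKQUEUE  -5

typedef enum {
    GET_DATA,         /* an element was copied; the value is its length    */
    GET_RETRY_BIGGER, /* TOOLONG: element still queued, use a bigger buffer */
    GET_EXIT,         /* STOPTHREAD: tidy up and exit this thread           */
    GET_EXIT_LAST,    /* ENDQUEUE: last emptier; queue has deallocated      */
    GET_BUG           /* NEGTHREADS, SICKQUEUE or any unknown negative code */
} GET_ACTION;

static GET_ACTION classify_get(int rc)
{
    switch (rc) {
    case STOPTHREAD: return GET_EXIT;
    case ENDQUEUE:   return GET_EXIT_LAST;
    case TOOLONG:    return GET_RETRY_BIGGER;
    case NEGTHREADS:
    case SICKQUEUE:  return GET_BUG;
    default:         return rc >= 0 ? GET_DATA : GET_BUG;
    }
}
```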
  156. typedef struct queue_tag * QUEUE;
  157. typedef int (* EMPTYPROC)(QUEUE Queue);
  158. /* Queue_Create:
  159. ** Return a queue handle for a newly created empty queue
  160. ** NULL returned means it failed.
  161. */
  162. QUEUE Queue_Create( EMPTYPROC Emptier
  163. , int MaxEmptiers
  164. , int MinQueueToStart
  165. , int MaxQueue
  166. , HANDLE Event
  167. , DWORD InstanceData
  168. , PSZ Name // of the queue
  169. );
  170. /* Queue_Put:
  171. ** Put an element from buffer Data of length Len bytes onto the queue.
  172. ** Will wait until the queue has room
  173. ** FALSE returned means the queue has been aborted and no
  174. ** put will ever succeed again.
  175. ** This operation may NOT be performed after a Queue_Destroy on Queue
  176. */
  177. BOOL Queue_Put(QUEUE Queue, LPBYTE Data, UINT Len);
  178. /* Queue_Get:
  179. ** Get an element from the queue. (Waits until there is one)
  180. ** The element is copied into Data. MaxLen is buffer length in bytes.
  181. ** Negative return codes imply no element is gotten.
  182. ** A negative return code is STOPTHREAD or ENDQUEUE or an error.
  183. ** On receiving STOPTHREAD or ENDQUEUE the caller should clean up and
  184. ** then ExitThread(0);
  185. ** If the caller is the last active thread getting from this queue, it
  186. ** will get ENDQUEUE rather than STOPTHREAD.
  187. ** Positive return code = length of data gotten in bytes.
  188. */
  189. int Queue_Get(QUEUE Queue, LPBYTE Data, int MaxLen);
  190. /* Queue_Destroy:
  191. ** Mark the queue as completed. No further data may ever be Put on it.
  192. ** When the last element has been gotten, it will return ENDQUEUE to
  193. ** a Queue_Get and deallocate itself. If it has an Event it will signal
  194. ** the event at that point.
  195. ** The Queue_Destroy operation returns promptly. It does not wait for
  196. ** further Gets or for the deallocation.
  197. */
  198. void Queue_Destroy(QUEUE Queue);
  199. /* Queue_GetInstanceData:
  200. ** Retrieve the DWORD of instance data that was given on Create
  201. */
  202. DWORD Queue_GetInstanceData(QUEUE Queue);
  203. /* QUEUEABORTPROC:
  204. * Data points to the element to be aborted. Len is its length in bytes.
  205. * See Queue_Abort.
  206. */
  207. typedef void (* QUEUEABORTPROC)(LPSTR Data, int Len);
  208. /* Queue_Abort:
  209. ** Abort the queue. Normally called by the Getter.
  210. ** Discard all elements on the queue.
  211. ** If the queue has already been aborted this will be a no-op.
  212. ** It purges all the data elements. If the Abort parameter is non-NULL
  213. ** then it is called for each element before deallocating it. This
  214. ** allows storage which is hung off the element to be freed.
  215. ** After this, all Put operations will return FALSE. If they were
  216. ** waiting they will promptly complete. The queue is NOT deallocated.
  217. ** That only happens when the last Get completes after the queue has been
  218. ** Queue_Destroyed. This means the normal sequence is:
  219. ** Getter discovers that the queue is now pointless and does Queue_Abort
  220. ** Getter does another Get (which blocks)
  221. ** Putter gets FALSE return code on next (or any outstanding) Put
  222. ** (Putter may want to propagate the error back to his source)
  223. ** Putter does Queue_Destroy
  224. ** The blocked Get is released and the queue is deallocated.
  225. */
  226. void Queue_Abort(QUEUE Queue, QUEUEABORTPROC Abort);
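/*
 * Illustrative sketch, not part of the original header: an abort procedure for
 * elements that have storage chained off them. JOB_ELEMENT, JobAbortProc and
 * g_payloads_freed are hypothetical. char * stands in for LPSTR so the sketch
 * compiles without the Windows headers. The procedure frees only the chained
 * storage, since the queue frees the element itself after the call.
 */

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct job_element {   /* hypothetical layout of a queued element */
    char  *payload;            /* heap block chained off the element      */
    size_t payload_len;
} JOB_ELEMENT;

static int g_payloads_freed = 0;   /* counter for illustration only */

/* Same shape as QUEUEABORTPROC: called once per element still queued. */
void JobAbortProc(char *Data, int Len)
{
    JOB_ELEMENT elem;

    assert(Data != NULL);
    if (Len < (int)sizeof elem)
        return;                    /* not one of our elements; nothing chained */

    /* Copy out rather than cast, in case Data is not suitably aligned. */
    memcpy(&elem, Data, sizeof elem);
    free(elem.payload);            /* free(NULL) is a harmless no-op */
    g_payloads_freed++;
}
```

/*
 * A getter that gives up would then call Queue_Abort(inqueue, JobAbortProc)
 * and go on to do another Get, as in the sequence described above.
 */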