Source code of Windows XP (NT5)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

372 lines
16 KiB

  1. #include <windows.h>
  2. #include <string.h>
  3. #include <stdio.h>
  4. #include "gutils.h"
  5. #include "list.h"
  6. #include "queue.h"
  7. #define NAMELENGTH 20
  8. typedef struct queue_tag{
  9. CRITICAL_SECTION CritSect; /* to single-thread queue operations */
  10. HANDLE Event; /* Event to set when finished */
  11. HANDLE Sem; /* semaphore for Get to wait on */
  12. HANDLE MaxSem; /* semaphore for Put to wait on */
  13. int Waiting; /* num tasks waiting ~= -(Sem count) */
  14. LIST List; /* the queue itself */
  15. BOOL Alive; /* TRUE => no destroy request yet */
  16. BOOL Aborted; /* TRUE => the queue has been aborted */
  17. EMPTYPROC Emptier; /* the thread proc for emptying */
  18. int MaxEmptiers; /* max number of emptying threads */
  19. int MinQueueToStart;/* start another emptier Q this long */
  20. int MaxQueue; /* absolute maximum size of queue
  21. * (for debug only) */
  22. int Running; /* number of emptiers in existence
  23. * Once an emptier is created this
  24. * stays positive until Queue_Destroy */
  25. DWORD InstanceData; /* instance data for emptier */
  26. char Name[NAMELENGTH+1]; /* Name for the queue (for debug) */
  27. } QUEUEDATA;
  28. /* DOGMA:
  29. Any Wait must occur outside the critical section.
  30. Any update to the queue must occur inside the critical section.
  31. Any peeking from outside the critical section must be taken with salt.
  32. The queue has between 0 and MaxQueue elements on its list. The Maximum
  33. is policed by MaxSem which is initialised to MaxQueue and Waited for by
  34. Put before adding an element and Released by Get whenever it takes an element
  35. off. MaxQueue itself is just kept around for debug purposes.
  36. Put must Wait before entering the critical section, therefore a failed Put
  37. (e.g. Put to an Aborted queue) will have already upset the semaphore and so
  38. must back it out.
  39. Abort clears the queue and so must adequately simulate the elements being
  40. gotten. In fact it just does a single Release on MaxSem which ensures that
  41. a Put can complete. Any blocked Puts will then succeed one at a time as
  42. each one backs out.
  43. Abort is primarily intended for use by the Getter. Caling it before any
  44. element has ever been put is peculiar, but harmless.
  45. The minumum is policed by Sem which is initialised to 0, is Waited for by
  46. Get before getting an element and Released by Put whenever it puts one.
  47. Queue_Destroy neds to ensure that no thread will block on the Get but all
  48. threads will run into the empty queue and get STOPTHREAD or ENDQUEUE. It
  49. therefore releases the semaphore as many times as there are threads running.
  50. Abort clears the queue and simulates the elements being gotten so that
  51. a single Get is left blocked waiting for the Destroy. Whether there is a
  52. Get actually waiting at the moment is not interesting. Even if there were
  53. not, one could be by the time the abort is done. There are the following
  54. cases (Not Alive means the Queue_Destroy is already in):
  55. Not empty Alive -> empty it, let all but 1 run.
  56. Not empty Not Alive -> empty it, let all run.
  57. Empty Alive -> let all but 1 run.
  58. Empty Not Alive -> let all run.
  59. Since Queue_Destroy has already released everything, the Not Alive cases
  60. need no further releasing.
  61. */
  62. /* Queue_Create:
  63. ** Return a queue handle for a newly created empty queue
  64. ** NULL returned means it failed.
  65. */
  66. QUEUE Queue_Create( EMPTYPROC Emptier /* thread proc to start */
  67. , int MaxEmptiers /* max Getting threads */
  68. , int MinQueueToStart /* elements per thread */
  69. , int MaxQueue /* max elements on q */
  70. , HANDLE Event /* signal on deallocation */
  71. , DWORD InstanceData
  72. , PSZ Name
  73. )
  74. { QUEUE Queue;
  75. Queue = (QUEUE)GlobalAlloc(GMEM_FIXED, sizeof(QUEUEDATA));
  76. if (Queue==NULL) {
  77. char msg[80];
  78. wsprintf(msg, "Could not allocate storage for queue %s", Name);
  79. /* Trace_Error(msg, FALSE); */
  80. return NULL;
  81. }
  82. InitializeCriticalSection(&Queue->CritSect);
  83. //??? should allow for failure!!!
  84. /* the value of about 10 million is chosen to be effectively infinite */
  85. Queue->Sem = CreateSemaphore(NULL, 0, 99999999, NULL);
  86. //??? should allow for failure!!!
  87. Queue->MaxSem = CreateSemaphore(NULL, MaxQueue, 99999999, NULL);
  88. //??? should allow for failure!!!
  89. Queue->Waiting = 0;
  90. Queue->List = List_Create();
  91. Queue->Alive = TRUE;
  92. Queue->Aborted = FALSE;
  93. Queue->Emptier = Emptier;
  94. Queue->MaxEmptiers = MaxEmptiers;
  95. Queue->MinQueueToStart = MinQueueToStart;
  96. Queue->MaxQueue = MaxQueue;
  97. Queue->Running = 0;
  98. Queue->Event = Event;
  99. Queue->InstanceData = InstanceData;
  100. strncpy(Queue->Name, Name, NAMELENGTH);
  101. Queue->Name[NAMELENGTH]='\0'; /* guardian */
  102. return Queue;
  103. } /* Queue_Create */
  104. /* Destroy:
  105. ** Internal procedure.
  106. ** Actually deallocate the queue and signal its event (if any)
  107. ** Must have already left the critical section
  108. */
  109. static void Destroy(QUEUE Queue)
  110. {
  111. //dprintf1(("Actual Destroy of queue '%s'\n", Queue->Name));
  112. DeleteCriticalSection(&(Queue->CritSect));
  113. CloseHandle(Queue->Sem);
  114. CloseHandle(Queue->MaxSem);
  115. List_Destroy(&(Queue->List));
  116. if (Queue->Event!=NULL) {
  117. SetEvent(Queue->Event);
  118. }
  119. GlobalFree( (HGLOBAL)Queue);
  120. } /* Destroy */
  121. /* Queue_Put:
  122. ** Put an element from buffer Data of length Len bytes onto the queue.
  123. ** Will wait until the queue has room
  124. ** FALSE returned means the queue has been aborted and no
  125. ** put will ever succeed again.
  126. ** This operation may NOT be performed after a Queue_Destroy on Queue
  127. */
  128. BOOL Queue_Put(QUEUE Queue, LPBYTE Data, UINT Len)
  129. {
  130. DWORD ThreadId;
  131. //dprintf1(("Put to queue '%s'\n", Queue->Name));
  132. WaitForSingleObject(Queue->MaxSem, INFINITE);
  133. EnterCriticalSection(&Queue->CritSect);
  134. //dprintf1(("Put running to queue '%s'\n", Queue->Name));
  135. if ((Queue->Aborted) || (!Queue->Alive)) {
  136. //dprintf1(("(legal) Queue_Put to Aborted queue '%s'\n", Queue->Name));
  137. LeaveCriticalSection(&Queue->CritSect);
  138. ReleaseSemaphore(Queue->MaxSem, 1, NULL); /* let next in */
  139. return FALSE; /* Caller should soon please Queue_Destroy */
  140. }
  141. List_AddFirst(Queue->List, Data, Len);
  142. ReleaseSemaphore(Queue->Sem, 1, NULL);
  143. --Queue->Waiting;
  144. if ( Queue->Running < Queue->MaxEmptiers
  145. && ( Queue->Running<=0
  146. || List_Card(Queue->List) > Queue->MinQueueToStart*Queue->Running
  147. )
  148. ) {
  149. ++Queue->Running;
  150. LeaveCriticalSection(&Queue->CritSect);
  151. return ( (BOOL)CreateThread( NULL
  152. , 0
  153. , (LPTHREAD_START_ROUTINE)
  154. Queue->Emptier
  155. , (LPVOID)Queue
  156. , 0
  157. , &ThreadId
  158. )
  159. );
  160. }
  161. LeaveCriticalSection(&Queue->CritSect);
  162. return TRUE;
  163. } /* Queue_Put */
  164. /* Queue_Get:
  165. ** Get an element from the queue. (Waits until there is one)
  166. ** The elemeent is copied into Data. MaxLen is buffer length in bytes.
  167. ** Negative return codes imply no element is gotten.
  168. ** A negative return code is STOPTHREAD or ENDQUEUE or an error.
  169. ** On receiving STOPTHREAD or ENDQUEUE the caller should clean up and
  170. ** then ExitThread(0);
  171. ** If the caller is the last active thread getting from this queue, it
  172. ** will get ENDQUEUE rather than STOPTHREAD.
  173. ** Positive return code = length of data gotten in bytes.
  174. */
  175. int Queue_Get(QUEUE Queue, LPBYTE Data, int MaxLen)
  176. { LPBYTE ListData;
  177. int Len;
  178. //dprintf1(("Get from queue '%s'\n", Queue->Name));
  179. EnterCriticalSection(&Queue->CritSect);
  180. //dprintf1(("Get running from queue '%s'\n", Queue->Name));
  181. if (List_IsEmpty(Queue->List)) {
  182. if (!Queue->Alive) {
  183. --(Queue->Running);
  184. if (Queue->Running<=0 ) {
  185. if (Queue->Running<0 ) {
  186. char msg[80];
  187. wsprintf( msg
  188. , "Negative threads running on queue %s"
  189. , Queue->Name
  190. );
  191. // Trace_Error(msg, FALSE);
  192. // return NEGTHREADS; ???
  193. }
  194. LeaveCriticalSection(&Queue->CritSect);
  195. Destroy(Queue);
  196. return ENDQUEUE;
  197. }
  198. LeaveCriticalSection(&Queue->CritSect);
  199. return STOPTHREAD;
  200. }
  201. if (Queue->Waiting>0) {
  202. /* already another thread waiting, besides us */
  203. --(Queue->Running);
  204. LeaveCriticalSection(&(Queue->CritSect));
  205. return STOPTHREAD;
  206. }
  207. }
  208. ++(Queue->Waiting);
  209. LeaveCriticalSection(&(Queue->CritSect));
  210. WaitForSingleObject(Queue->Sem, INFINITE);
  211. EnterCriticalSection(&(Queue->CritSect));
  212. /* If the queue is empty now it must be dead */
  213. if (List_IsEmpty(Queue->List)) {
  214. if (Queue->Alive && (!Queue->Aborted)) {
  215. char msg[80];
  216. wsprintf( msg
  217. , "Queue %s empty but not dead during Get!"
  218. , Queue->Name
  219. );
  220. // Trace_Error(msg, FALSE);
  221. return SICKQUEUE;
  222. }
  223. else {
  224. --(Queue->Running);
  225. if (Queue->Running==0) {
  226. LeaveCriticalSection(&(Queue->CritSect));
  227. Destroy(Queue);
  228. return ENDQUEUE;
  229. }
  230. LeaveCriticalSection(&(Queue->CritSect));
  231. return STOPTHREAD;
  232. }
  233. }
  234. /* The queue is not empty and we are in the critical section. */
  235. ListData = List_Last(Queue->List);
  236. Len = List_ItemLength(ListData);
  237. if (Len>MaxLen) {
  238. ReleaseSemaphore(Queue->Sem, 1, NULL);
  239. --Queue->Waiting;
  240. LeaveCriticalSection(&Queue->CritSect);
  241. return TOOLONG;
  242. }
  243. memcpy(Data, ListData, Len);
  244. List_DeleteLast(Queue->List);
  245. LeaveCriticalSection(&Queue->CritSect);
  246. ReleaseSemaphore(Queue->MaxSem, 1, NULL);
  247. return Len;
  248. } /* Queue_Get */
  249. /* Queue_Destroy:
  250. ** Mark the queue as completed. No further data may ever by Put on it.
  251. ** When the last element has been gotten, it will return ENDTHREAD to
  252. ** a Queue_Get and deallocate itself. If it has an Event it will signal
  253. ** the event at that point.
  254. ** The Queue_Destroy operation returns promptly. It does not wait for
  255. ** further Gets or for the deallocation.
  256. */
  257. void Queue_Destroy(QUEUE Queue)
  258. {
  259. EnterCriticalSection(&(Queue->CritSect));
  260. //dprintf1(("Queue_Destroy %s\n", Queue->Name));
  261. Queue->Alive = FALSE;
  262. if ( List_IsEmpty(Queue->List)) {
  263. if (Queue->Running==0) {
  264. /* Only possible if nobody ever got started */
  265. LeaveCriticalSection(&(Queue->CritSect));
  266. Destroy(Queue);
  267. return;
  268. }
  269. else { int i;
  270. /* The list is empty, but some threads could be
  271. blocked on the Get (or about to block) so
  272. release every thread that might ever wait on Get */
  273. for (i=0; i<Queue->Running; ++i) {
  274. ReleaseSemaphore(Queue->Sem, 1, NULL);
  275. --(Queue->Waiting);
  276. }
  277. LeaveCriticalSection(&(Queue->CritSect));
  278. }
  279. }
  280. else LeaveCriticalSection(&(Queue->CritSect));
  281. return;
  282. } /* Queue_Destroy */
  283. /* Queue_GetInstanceData:
  284. ** Retrieve the DWORD of instance data that was given on Create
  285. */
  286. DWORD Queue_GetInstanceData(QUEUE Queue)
  287. { return Queue->InstanceData;
  288. } /* Queue_GetInstanceData */
  289. /* Queue_Abort:
  290. ** Abort the queue. Normally called by the Getter.
  291. ** Discard all elements on the queue,
  292. ** If the queue has already been aborted this will be a no-op.
  293. ** It purges all the data elements. If the Abort parameter is non-NULL
  294. ** then it is called for each element before deallocating it. This
  295. ** allows storage which is hung off the element to be freed.
  296. ** After this, all Put operations will return FALSE. If they were
  297. ** waiting they will promptly complete. The queue is NOT deallocated.
  298. ** That only happens when the last Get completes after the queue has been
  299. ** Queue_Destroyed. This means the normal sequence is:
  300. ** Getter discovers that the queue is now pointless and does Queue_Abort
  301. ** Getter does another Get (which blocks)
  302. ** Putter gets FALSE return code on next (or any outstanding) Put
  303. ** (Putter may want to propagates the error back to his source)
  304. ** Putter does Queue_Destroy
  305. ** The blocked Get is released and the queue is deallocated.
  306. */
  307. void Queue_Abort(QUEUE Queue, QUEUEABORTPROC Abort)
  308. {
  309. /* This is similar to Destroy, but we call the Abort proc and
  310. free the storage of the elements. Destroy allows them to run down.
  311. It is essential that the last Get must block until the sender does a
  312. Queue_Destroy (if it has not been done already). The Alive flag
  313. tells whether the Queue_Destroy has been done. All Getters except
  314. the last should be released.
  315. */
  316. //dprintf1(("Queue_Abort '%s'\n", Queue->Name));
  317. EnterCriticalSection(&(Queue->CritSect));
  318. //dprintf1(("Queue_Abort running for queue '%s'\n", Queue->Name));
  319. for (; ; ) {
  320. LPSTR Cursor = List_First(Queue->List);
  321. int Len;
  322. if (Cursor==NULL) break;
  323. Len = List_ItemLength(Cursor);
  324. if (Abort!=NULL) {
  325. Abort(Cursor, Len);
  326. }
  327. List_DeleteFirst(Queue->List);
  328. }
  329. /* Queue is now empty. Do not destroy. That's for the Putters */
  330. Queue->Aborted = TRUE;
  331. /* make sure the next Queue_Get blocks unless Queue_Destroy already done */
  332. //dprintf1(("Queue_Abort '%s' fixing semaphore to block\n", Queue->Name));
  333. if (Queue->Alive){
  334. while(Queue->Waiting<0) {
  335. WaitForSingleObject(Queue->Sem, INFINITE);
  336. ++(Queue->Waiting);
  337. }
  338. }
  339. //dprintf1(("Queue_Abort '%s' semaphore now set to block\n", Queue->Name));
  340. LeaveCriticalSection(&(Queue->CritSect));
  341. ReleaseSemaphore(Queue->MaxSem, 1, NULL);
  342. return;
  343. } /* Queue_Abort */