Leaked source code of windows server 2003
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

432 lines
10 KiB

  1. /*++
  2. Copyright (c) 1996 Microsoft Corporation
  3. Module Name:
  4. simpleq.c
  5. Abstract:
  6. Simple non-blocking queue, that allows
  7. multiple concurrent data providers
  8. and singe data consumer
  9. Author:
  10. GorN 9-Feb-1999
  11. Revision History:
  12. --*/
  13. #include "evtlogp.h"
  14. #include "simpleq.h"
  15. DWORD SimpleQueueInitialize(
  16. IN OUT PSIMPLEQUEUE q,
  17. IN DWORD cbSize,
  18. IN PWCHAR Name,
  19. IN DATA_AVAILABLE_CALLBACK DataAvailableCallback,
  20. IN DROPPED_DATA_NOTIFY Callback,
  21. IN DWORD NotifyInterval
  22. )
  23. /*++
  24. Routine Description:
  25. Initializes a queue
  26. Arguments:
  27. q - a queue to be initialized
  28. cbSize - size of the queue in bytes
  29. Name - Name of the queue. It will be supplied to DroppedDataNotifyCallback
  30. DataAvailableCallback - This function will be called if there are data available
  31. in the queue. This function will not be called again until
  32. Read/CompleteRead operations empty the queue.
  33. DroppedDataNotifyCallback - This function will be called if there are dropped data and
  34. the last time we reported dropped data was NotifyInterval
  35. or more seconds before.
  36. NotifyInterval - We will not report dropped data unless it has been longer
  37. than NotifyInterval seconds since the last report
  38. Return Value:
  39. ERROR_SUCCESS - success
  40. error code - called failed
  41. */
  42. {
  43. cbSize = SQB_INFLATE_SIZE(cbSize);
  44. ZeroMemory(q, sizeof(SIMPLEQUEUE) );
  45. q->Begin = LocalAlloc(LPTR, cbSize);
  46. if (q->Begin == 0) {
  47. return GetLastError();
  48. }
  49. q->End = q->Begin + cbSize;
  50. q->Head = q->Begin;
  51. q->Tail = q->Begin;
  52. q->Wrap = 0;
  53. q->Empty = TRUE;
  54. q->Name = Name;
  55. q->ReadInProgress = 0;
  56. q->DataAvailableCallback = DataAvailableCallback;
  57. #ifdef COUNT_DROPPED_PACKETS
  58. GetSystemTimeAsFileTime( (LPFILETIME)&q->NextDroppedDataNotify );
  59. q->DroppedDataNotifyInterval.QuadPart = Int32x32To64(NotifyInterval, 10000000);
  60. q->NextDroppedDataNotify.QuadPart += q->DroppedDataNotifyInterval.QuadPart;
  61. q->DroppedDataNotifyCallback = Callback;
  62. q->DroppedDataCount = 0;
  63. q->DroppedDataSize = 0;
  64. #endif
  65. InitializeCriticalSection(&q->Lock);
  66. q->Enabled = TRUE;
  67. return ERROR_SUCCESS;
  68. }
  69. VOID SimpleQueueDelete(
  70. IN PSIMPLEQUEUE q
  71. )
  72. /*++
  73. Routine Description:
  74. Destroys a queue
  75. Arguments:
  76. q - a queue to be destroyed
  77. Return Value:
  78. None
  79. Comments:
  80. This routine will destroy queue's CriticalSection
  81. and deallocate queue's memory. It is the responsibility of
  82. the caller to guarantee that nobody will be using the queue
  83. after this function is called
  84. */
  85. {
  86. if (q->Begin) {
  87. LocalFree(q->Begin);
  88. DeleteCriticalSection(&q->Lock);
  89. }
  90. }
  91. BOOL SimpleQueueTryAdd(
  92. IN PSIMPLEQUEUE q,
  93. IN DWORD PayloadSize,
  94. IN PVOID Payload)
  95. /*++
  96. Routine Description:
  97. Tries to add data in a queue
  98. Arguments:
  99. q - a queue
  100. PayloadSise - size of the chunk to be added to a queue
  101. Payload - pointer to a buffer that countains data to be added
  102. Return Value:
  103. TRUE - if the data were put into the queue successfully
  104. FALSE - otherwise
  105. Comments:
  106. DataAvailableCallback will be called
  107. if there are data available. DataAvailableCallback will not be called
  108. during subsequent add requests until Read/CompleteRead
  109. operations empty the queue.
  110. */
  111. {
  112. BOOL DataAvailableCallRequired = FALSE;
  113. DWORD BlockSize = SQB_PAYLOADSIZE_TO_BLOCKSIZE(PayloadSize);
  114. if (!q->Enabled) {
  115. return FALSE;
  116. }
  117. EnterCriticalSection(&q->Lock);
  118. if (q->Wrap) {
  119. if (q->Head + BlockSize > q->Tail) {
  120. goto NoRoom;
  121. }
  122. } else {
  123. if (q->End - q->Head < (INT)BlockSize) {
  124. // not enough room for this data at the
  125. // end of the queue.
  126. // Let's see whether we have enough room at the front
  127. if (q->Tail - q->Begin < (INT)BlockSize) {
  128. goto NoRoom;
  129. }
  130. q->Wrap = q->Head;
  131. q->Head = q->Begin;
  132. }
  133. }
  134. SQB_HEADER(q->Head)->PayloadSize = PayloadSize;
  135. CopyMemory( SQB_PAYLOAD(q->Head), Payload, PayloadSize);
  136. q->Head += BlockSize;
  137. q->Empty = FALSE;
  138. if ( !q->ReadInProgress ) {
  139. DataAvailableCallRequired = TRUE;
  140. q->ReadInProgress = TRUE;
  141. }
  142. LeaveCriticalSection(&q->Lock);
  143. if (DataAvailableCallRequired) {
  144. q->DataAvailableCallback(q); // Post a worker item in the queue //
  145. }
  146. return TRUE;
  147. NoRoom:
  148. #ifdef COUNT_DROPPED_PACKETS
  149. (q->DroppedDataCount) += 1;
  150. (q->DroppedDataSize) += PayloadSize;
  151. #endif
  152. LeaveCriticalSection(&q->Lock);
  153. return FALSE;
  154. }
  155. BOOL
  156. SimpleQueueReadAll(
  157. IN PSIMPLEQUEUE q,
  158. OUT PVOID* begin,
  159. OUT PVOID* end
  160. )
  161. /*++
  162. Routine Description:
  163. Allows to read all available blocks
  164. Arguments:
  165. q - a queue
  166. begin - receives a pointer to the first queue block
  167. end - receives a pointer past the end of the last queue block
  168. Return Value:
  169. TRUE - if we get at least one block
  170. FALSE - if the queue is empty
  171. Comments:
  172. This function not always give you ALL available blocks in the
  173. queue. It gives you all blocks up until the hard end of the queue buffer or
  174. the writing head of the queue, whatever is smaller.
  175. If the function returns success, it guarantees that begin < end.
  176. When you finished processing of the data, you need to call
  177. SimpleQueueReadComplete function.
  178. You can walk over these block using SQB_NEXTBLOCK macro.
  179. */
  180. {
  181. EnterCriticalSection(&q->Lock);
  182. if (q->Empty) {
  183. q->ReadInProgress = 0;
  184. LeaveCriticalSection(&q->Lock);
  185. return FALSE;
  186. }
  187. if (q->Wrap) {
  188. if (q->Tail == q->Wrap) {
  189. q->Tail = q->Begin;
  190. *begin = q->Begin;
  191. *end = q->Head;
  192. q->Wrap = 0;
  193. } else {
  194. *begin = q->Tail;
  195. *end = q->Wrap;
  196. }
  197. } else {
  198. *begin = q->Tail;
  199. *end = q->Head;
  200. }
  201. LeaveCriticalSection(&q->Lock);
  202. return TRUE;
  203. }
  204. BOOL
  205. SimpleQueueReadOne(
  206. IN PSIMPLEQUEUE q,
  207. OUT PVOID* begin,
  208. OUT PVOID* end
  209. )
  210. /*++
  211. Routine Description:
  212. Allows to read a single block of data
  213. Arguments:
  214. q - a queue
  215. begin - receives a pointer to the beginning of the first available queue block
  216. end - receives a pointer past the end of this block
  217. Return Value:
  218. TRUE - success
  219. FALSE - if the queue is empty
  220. Comments:
  221. When you finished processing of the data, you need to call
  222. SimpleQueueReadComplete function.
  223. */
  224. {
  225. EnterCriticalSection(&q->Lock);
  226. if (q->Empty) {
  227. q->ReadInProgress = 0;
  228. LeaveCriticalSection(&q->Lock);
  229. return FALSE;
  230. }
  231. if (q->Wrap) {
  232. if (q->Tail == q->Wrap) {
  233. q->Tail = q->Begin;
  234. *begin = q->Begin;
  235. q->Wrap = 0;
  236. } else {
  237. *begin = q->Tail;
  238. }
  239. } else {
  240. *begin = q->Tail;
  241. }
  242. // we have one or more items //
  243. *end = SQB_NEXTBLOCK(q->Tail);
  244. LeaveCriticalSection(&q->Lock);
  245. return TRUE;
  246. }
  247. BOOL
  248. SimpleQueueReadComplete(
  249. IN PSIMPLEQUEUE q,
  250. IN PVOID newtail
  251. )
  252. /*++
  253. Routine Description:
  254. Use this function to signal that the block of data was
  255. consumed
  256. Arguments:
  257. q - a queue
  258. end - receives a pointer past the end of the last consumed block.
  259. Usually this is a value returned by the PVOID end parameter of
  260. ReadOne and ReadAll
  261. Return Value:
  262. TRUE - There are more data
  263. FALSE - if the queue is empty
  264. Important!!!
  265. If the result of this function is TRUE, the caller should consume the data
  266. using ReadOne or ReadAll functions followed by the calls
  267. to ReadComplete until it returns FALSE.
  268. Otherwise, no futher DataAvailable notifications will be produced bu
  269. SimpleQueueTryAdd
  270. */
  271. {
  272. BOOL moreData;
  273. EnterCriticalSection(&q->Lock);
  274. q->Tail = newtail;
  275. if (q->Tail == q->Head) {
  276. q->Empty = TRUE;
  277. moreData = FALSE;
  278. } else {
  279. moreData = TRUE;
  280. }
  281. q->ReadInProgress = moreData;
  282. LeaveCriticalSection(&q->Lock);
  283. return moreData;
  284. }
  285. #ifdef COUNT_DROPPED_PACKETS
  286. VOID
  287. CheckForDroppedData(
  288. IN PSIMPLEQUEUE q,
  289. IN BOOL Now
  290. )
  291. /*++
  292. Routine Description:
  293. This function checks whether there were
  294. some data dropped and if the time is right,
  295. calls DropNotifyCallback function.
  296. Arguments:
  297. q - a queue
  298. Now - If TRUE, than DropNotifyCallback will be called
  299. immediately if there are dropped data.
  300. If FALSE, DropNotifyCallback will be called
  301. only if it is more then DroppedNotifyInterval
  302. seconds elapsed, since the last time we called
  303. DropNotifyCallback
  304. Return Value:
  305. None
  306. */
  307. {
  308. if (q->DroppedDataNotifyCallback) {
  309. ULARGE_INTEGER current;
  310. GetSystemTimeAsFileTime( (LPFILETIME)&current );
  311. EnterCriticalSection(&q->Lock);
  312. if ( q->DroppedDataCount &&
  313. (Now || CompareFileTime( (LPFILETIME)&current,
  314. (LPFILETIME)&q->NextDroppedDataNotify) > 0 )
  315. )
  316. {
  317. DWORD DroppedCount, DroppedSize;
  318. DroppedCount = q->DroppedDataCount;
  319. DroppedSize = q->DroppedDataSize;
  320. q->DroppedDataCount = 0;
  321. q->DroppedDataSize = 0;
  322. q->NextDroppedDataNotify.QuadPart =
  323. current.QuadPart + q->DroppedDataNotifyInterval.QuadPart;
  324. LeaveCriticalSection(&q->Lock);
  325. q->DroppedDataNotifyCallback(q->Name, DroppedCount, DroppedSize);
  326. } else {
  327. LeaveCriticalSection(&q->Lock);
  328. }
  329. }
  330. }
  331. #endif