2 * A fixed-size queue; MT-friendly.
11 static void queue_error_rc( char* loc, DWORD err);
12 static void queue_error( char* loc, char* reason);
15 /* Wrapper around the OS call to create a semaphore: requests an anonymous
 * semaphore with default security attributes, an initial count of
 * initCount and a maximum count of max. */
17 newSemaphore(int initCount, int max)
20 s = CreateSemaphore ( NULL, /* LPSECURITY_ATTRIBUTES (default) */
21 initCount, /* LONG lInitialCount */
22 max, /* LONG lMaxCount */
23 NULL); /* LPCTSTR (anonymous / no object name) */
/* Error path: report the failure together with the Win32 error code. */
25 queue_error_rc("newSemaphore", GetLastError());
32 * Function: NewWorkQueue
34 * The queue constructor - semaphores are initialised to match
35 * max number of queue entries.
/* Allocate the queue object itself. */
41 WorkQueue* wq = (WorkQueue*)malloc(sizeof(WorkQueue));
/* Allocation-failure path: reported through queue_error(). */
44 queue_error("NewWorkQueue", "malloc() failed");
51 InitializeCriticalSection(&wq->queueLock);
/* workAvailable starts at 0 and roomAvailable at WORKQUEUE_SIZE; both are
 * capped at WORKQUEUE_SIZE. SubmitWork waits on roomAvailable and releases
 * workAvailable; GetWork waits on workAvailable. */
52 wq->workAvailable = newSemaphore(0, WORKQUEUE_SIZE);
53 wq->roomAvailable = newSemaphore(WORKQUEUE_SIZE, WORKQUEUE_SIZE);
55 /* Fail if we were unable to create any of the sync objects. */
56 if ( NULL == wq->workAvailable ||
57 NULL == wq->roomAvailable ) {
/* Release the semaphore handles owned by a work queue. Each handle is
 * closed only when non-NULL, and each through its own field: the
 * roomAvailable branch must close roomAvailable, not workAvailable, or
 * workAvailable is closed twice and roomAvailable is leaked. */
66 FreeWorkQueue ( WorkQueue* pq )
68 /* Close the semaphores; any threads blocked waiting
69 * on either will as a result be woken up.
71 if ( pq->workAvailable ) {
72 CloseHandle(pq->workAvailable);
74 if ( pq->roomAvailable ) {
75 CloseHandle(pq->roomAvailable);
/* Expose the queue's workAvailable semaphore handle to the caller. */
82 GetWorkQueueHandle ( WorkQueue* pq )
86 return pq->workAvailable;
92 * Fetch a work item from the queue, blocking if none available.
93 * A return value of FALSE indicates an error/fatal condition.
/* Blocking dequeue: waits (without timeout) on workAvailable, then hands
 * off to FetchWork() to remove the item and store it through ppw. */
96 GetWork ( WorkQueue* pq, void** ppw )
101 queue_error("GetWork", "NULL WorkQueue object");
105 queue_error("GetWork", "NULL WorkItem object");
109 /* Block waiting for work item to become available */
110 if ( (rc = WaitForSingleObject( pq->workAvailable, INFINITE)) != WAIT_OBJECT_0 ) {
/* On WAIT_FAILED report GetLastError(); for any other unexpected wait
 * result, report the wait result code itself. */
111 queue_error_rc("GetWork.WaitForSingleObject(workAvailable)",
112 ( (WAIT_FAILED == rc) ? GetLastError() : rc));
/* The wait succeeded, so delegate the actual dequeue. */
116 return FetchWork(pq,ppw);
120 * Function: FetchWork
122 * Remove the work item at the head of the queue. Does not itself wait on workAvailable; GetWork does that before calling in.
123 * A return value of FALSE indicates an error/fatal condition.
/* Dequeue step: under queueLock, copies the item at head out through ppw,
 * clears the slot, advances head and releases one unit of roomAvailable.
 * The lines shown here do not wait on workAvailable. */
126 FetchWork ( WorkQueue* pq, void** ppw )
131 queue_error("FetchWork", "NULL WorkQueue object");
135 queue_error("FetchWork", "NULL WorkItem object");
139 EnterCriticalSection(&pq->queueLock);
140 *ppw = pq->items[pq->head];
141 /* For sanity's sake, zero out the pointer. */
142 pq->items[pq->head] = NULL;
/* Advance head, wrapping around at WORKQUEUE_SIZE. */
143 pq->head = (pq->head + 1) % WORKQUEUE_SIZE;
/* One slot has been freed: bump roomAvailable while still holding the lock. */
144 rc = ReleaseSemaphore(pq->roomAvailable,1, NULL);
145 LeaveCriticalSection(&pq->queueLock);
/* Error path for the ReleaseSemaphore() call above (guard not shown here). */
147 queue_error_rc("FetchWork.ReleaseSemaphore()", GetLastError());
155 * Function: SubmitWork
157 * Add work item to the queue, blocking if no room available.
158 * A return value of FALSE indicates an error/fatal condition.
/* Enqueue pw at the tail of the queue. Waits (without timeout) on
 * roomAvailable first, then stores the item under queueLock, advances tail
 * and releases one unit of workAvailable. The wait diagnostic names
 * roomAvailable, since that is the semaphore being waited on here. */
161 SubmitWork ( WorkQueue* pq, void* pw )
166 queue_error("SubmitWork", "NULL WorkQueue object");
170 queue_error("SubmitWork", "NULL WorkItem object");
174 /* Block waiting for room to become available in the queue */
175 if ( (rc = WaitForSingleObject( pq->roomAvailable, INFINITE)) != WAIT_OBJECT_0 ) {
176 queue_error_rc("SubmitWork.WaitForSingleObject(roomAvailable)",
177 ( (WAIT_FAILED == rc) ? GetLastError() : rc));
182 EnterCriticalSection(&pq->queueLock);
183 pq->items[pq->tail] = pw;
/* Advance tail, wrapping around at WORKQUEUE_SIZE. */
184 pq->tail = (pq->tail + 1) % WORKQUEUE_SIZE;
/* One item has been added: bump workAvailable while still holding the lock. */
185 rc = ReleaseSemaphore(pq->workAvailable,1, NULL);
186 LeaveCriticalSection(&pq->queueLock);
/* Error path for the ReleaseSemaphore() call above (guard not shown here). */
188 queue_error_rc("SubmitWork.ReleaseSemaphore()", GetLastError());
/* Report a failure at location loc on stderr, with the numeric code err
 * printed in hexadecimal. */
198 queue_error_rc( char* loc,
201 fprintf(stderr, "%s failed: return code = 0x%lx\n", loc, err);
/* Report a failure at location loc on stderr, with a textual reason. */
208 queue_error( char* loc,
211 fprintf(stderr, "%s failed: %s\n", loc, reason);