2 * A fixed-size queue; MT-friendly.
11 static void queue_error_rc( char* loc, DWORD err);
12 static void queue_error( char* loc, char* reason);
15 /* Wrapper around OS call to create semaphore */
17 newSemaphore(int initCount, int max)
20 s = CreateSemaphore ( NULL, /* LPSECURITY_ATTRIBUTES (default) */
21 initCount, /* LONG lInitialCount */
22 max, /* LONG lMaxCount */
23 NULL); /* LPCTSTR (anonymous / no object name) */
25 queue_error_rc("newSemaphore", GetLastError());
32 * Function: NewWorkQueue
34 * The queue constructor - semaphores are initialised to match
35 * max number of queue entries.
41 WorkQueue* wq = (WorkQueue*)malloc(sizeof(WorkQueue));
44 queue_error("NewWorkQueue", "malloc() failed");
48 memset(wq, 0, sizeof *wq);
50 InitializeCriticalSection(&wq->queueLock);
51 wq->workAvailable = newSemaphore(0, WORKQUEUE_SIZE);
52 wq->roomAvailable = newSemaphore(WORKQUEUE_SIZE, WORKQUEUE_SIZE);
54 /* Fail if we were unable to create any of the sync objects. */
55 if ( NULL == wq->workAvailable ||
56 NULL == wq->roomAvailable ) {
65 FreeWorkQueue ( WorkQueue* pq )
69 /* Free any remaining work items. */
70 for (i = 0; i < WORKQUEUE_SIZE; i++) {
71 if (pq->items[i] != NULL) {
76 /* Close the semaphores; any threads blocked waiting
77 * on either will as a result be woken up.
79 if ( pq->workAvailable ) {
80 CloseHandle(pq->workAvailable);
82 if ( pq->roomAvailable ) {
83 CloseHandle(pq->roomAvailable);
90 GetWorkQueueHandle ( WorkQueue* pq )
94 return pq->workAvailable;
100 * Fetch a work item from the queue, blocking if none available.
101 * Return value indicates of FALSE indicates error/fatal condition.
104 GetWork ( WorkQueue* pq, void** ppw )
109 queue_error("GetWork", "NULL WorkQueue object");
113 queue_error("GetWork", "NULL WorkItem object");
117 /* Block waiting for work item to become available */
118 if ( (rc = WaitForSingleObject( pq->workAvailable, INFINITE)) != WAIT_OBJECT_0 ) {
119 queue_error_rc("GetWork.WaitForSingleObject(workAvailable)",
120 ( (WAIT_FAILED == rc) ? GetLastError() : rc));
124 return FetchWork(pq,ppw);
128 * Function: FetchWork
130 * Fetch a work item from the queue, blocking if none available.
131 * Return value indicates of FALSE indicates error/fatal condition.
134 FetchWork ( WorkQueue* pq, void** ppw )
139 queue_error("FetchWork", "NULL WorkQueue object");
143 queue_error("FetchWork", "NULL WorkItem object");
147 EnterCriticalSection(&pq->queueLock);
148 *ppw = pq->items[pq->head];
149 /* For sanity's sake, zero out the pointer. */
150 pq->items[pq->head] = NULL;
151 pq->head = (pq->head + 1) % WORKQUEUE_SIZE;
152 rc = ReleaseSemaphore(pq->roomAvailable,1, NULL);
153 LeaveCriticalSection(&pq->queueLock);
155 queue_error_rc("FetchWork.ReleaseSemaphore()", GetLastError());
163 * Function: SubmitWork
165 * Add work item to the queue, blocking if no room available.
166 * Return value indicates of FALSE indicates error/fatal condition.
169 SubmitWork ( WorkQueue* pq, void* pw )
174 queue_error("SubmitWork", "NULL WorkQueue object");
178 queue_error("SubmitWork", "NULL WorkItem object");
182 /* Block waiting for work item to become available */
183 if ( (rc = WaitForSingleObject( pq->roomAvailable, INFINITE)) != WAIT_OBJECT_0 ) {
184 queue_error_rc("SubmitWork.WaitForSingleObject(workAvailable)",
185 ( (WAIT_FAILED == rc) ? GetLastError() : rc));
190 EnterCriticalSection(&pq->queueLock);
191 pq->items[pq->tail] = pw;
192 pq->tail = (pq->tail + 1) % WORKQUEUE_SIZE;
193 rc = ReleaseSemaphore(pq->workAvailable,1, NULL);
194 LeaveCriticalSection(&pq->queueLock);
196 queue_error_rc("SubmitWork.ReleaseSemaphore()", GetLastError());
206 queue_error_rc( char* loc,
209 fprintf(stderr, "%s failed: return code = 0x%lx\n", loc, err);
216 queue_error( char* loc,
219 fprintf(stderr, "%s failed: %s\n", loc, reason);