2 * A fixed-size queue; MT-friendly.
11 static void queue_error_rc( char* loc, DWORD err);
12 static void queue_error( char* loc, char* reason);
15 /* Wrapper around OS call to create semaphore */
17 newSemaphore(int initCount, int max)
20 s = CreateSemaphore ( NULL, /* LPSECURITY_ATTRIBUTES (default) */
21 initCount, /* LONG lInitialCount */
22 max, /* LONG lMaxCount */
23 NULL); /* LPCTSTR (anonymous / no object name) */
25 queue_error_rc("newSemaphore", GetLastError());
32 * Function: NewWorkQueue
34 * The queue constructor - semaphores are initialised to match
35 * max number of queue entries.
41 WorkQueue* wq = (WorkQueue*)malloc(sizeof(WorkQueue));
44 queue_error("NewWorkQueue", "malloc() failed");
48 memset(wq, 0, sizeof *wq);
50 InitializeCriticalSection(&wq->queueLock);
51 wq->workAvailable = newSemaphore(0, WORKQUEUE_SIZE);
52 wq->roomAvailable = newSemaphore(WORKQUEUE_SIZE, WORKQUEUE_SIZE);
54 /* Fail if we were unable to create any of the sync objects. */
55 if ( NULL == wq->workAvailable ||
56 NULL == wq->roomAvailable ) {
65 FreeWorkQueue ( WorkQueue* pq )
69 /* Free any remaining work items. */
70 for (i = 0; i < WORKQUEUE_SIZE; i++) {
71 if (pq->items[i] != NULL) {
76 /* Close the semaphores; any threads blocked waiting
77 * on either will as a result be woken up.
79 if ( pq->workAvailable ) {
80 CloseHandle(pq->workAvailable);
82 if ( pq->roomAvailable ) {
83 CloseHandle(pq->roomAvailable);
85 DeleteCriticalSection(&pq->queueLock);
91 GetWorkQueueHandle ( WorkQueue* pq )
95 return pq->workAvailable;
101 * Fetch a work item from the queue, blocking if none available.
102 * Return value indicates of FALSE indicates error/fatal condition.
105 GetWork ( WorkQueue* pq, void** ppw )
110 queue_error("GetWork", "NULL WorkQueue object");
114 queue_error("GetWork", "NULL WorkItem object");
118 /* Block waiting for work item to become available */
119 if ( (rc = WaitForSingleObject( pq->workAvailable, INFINITE)) != WAIT_OBJECT_0 ) {
120 queue_error_rc("GetWork.WaitForSingleObject(workAvailable)",
121 ( (WAIT_FAILED == rc) ? GetLastError() : rc));
125 return FetchWork(pq,ppw);
129 * Function: FetchWork
131 * Fetch a work item from the queue, blocking if none available.
132 * Return value indicates of FALSE indicates error/fatal condition.
135 FetchWork ( WorkQueue* pq, void** ppw )
140 queue_error("FetchWork", "NULL WorkQueue object");
144 queue_error("FetchWork", "NULL WorkItem object");
148 EnterCriticalSection(&pq->queueLock);
149 *ppw = pq->items[pq->head];
150 /* For sanity's sake, zero out the pointer. */
151 pq->items[pq->head] = NULL;
152 pq->head = (pq->head + 1) % WORKQUEUE_SIZE;
153 rc = ReleaseSemaphore(pq->roomAvailable,1, NULL);
154 LeaveCriticalSection(&pq->queueLock);
156 queue_error_rc("FetchWork.ReleaseSemaphore()", GetLastError());
164 * Function: SubmitWork
166 * Add work item to the queue, blocking if no room available.
167 * Return value indicates of FALSE indicates error/fatal condition.
170 SubmitWork ( WorkQueue* pq, void* pw )
175 queue_error("SubmitWork", "NULL WorkQueue object");
179 queue_error("SubmitWork", "NULL WorkItem object");
183 /* Block waiting for work item to become available */
184 if ( (rc = WaitForSingleObject( pq->roomAvailable, INFINITE)) != WAIT_OBJECT_0 ) {
185 queue_error_rc("SubmitWork.WaitForSingleObject(workAvailable)",
186 ( (WAIT_FAILED == rc) ? GetLastError() : rc));
191 EnterCriticalSection(&pq->queueLock);
192 pq->items[pq->tail] = pw;
193 pq->tail = (pq->tail + 1) % WORKQUEUE_SIZE;
194 rc = ReleaseSemaphore(pq->workAvailable,1, NULL);
195 LeaveCriticalSection(&pq->queueLock);
197 queue_error_rc("SubmitWork.ReleaseSemaphore()", GetLastError());
207 queue_error_rc( char* loc,
210 fprintf(stderr, "%s failed: return code = 0x%lx\n", loc, err);
217 queue_error( char* loc,
220 fprintf(stderr, "%s failed: %s\n", loc, reason);