From 9fc59c6ee536c538f3f9b3ad12f09fbc7fd40ca3 Mon Sep 17 00:00:00 2001 From: sof Date: Fri, 12 Sep 2003 16:16:43 +0000 Subject: [PATCH] [project @ 2003-09-12 16:16:43 by sof] - awaitRequests() comments - code reformatting Merge to STABLE --- ghc/rts/win32/AsyncIO.c | 309 ++++++++++++++++++++++++++--------------------- 1 file changed, 172 insertions(+), 137 deletions(-) diff --git a/ghc/rts/win32/AsyncIO.c b/ghc/rts/win32/AsyncIO.c index 12de16e..fabe85b 100644 --- a/ghc/rts/win32/AsyncIO.c +++ b/ghc/rts/win32/AsyncIO.c @@ -16,7 +16,7 @@ * Overview: * * Haskell code issue asynchronous I/O requests via the - * asyncRead# and asyncWrite# primops. These cause addIORequest() + * async{Read,Write,DoOp}# primops. These cause addIORequest() * to be invoked, which forwards the request to the underlying * asynchronous I/O subsystem. Each request is tagged with a unique * ID. @@ -35,9 +35,9 @@ */ typedef struct CompletedReq { - unsigned int reqID; - int len; - int errCode; + unsigned int reqID; + int len; + int errCode; } CompletedReq; #define MAX_REQUESTS 200 @@ -57,30 +57,32 @@ onIOComplete(unsigned int reqID, void* buf STG_UNUSED, int errCode) { - /* Deposit result of request in queue/table */ - EnterCriticalSection(&queue_lock); - if (completed_hw == MAX_REQUESTS) { - /* Not likely */ - fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID); - fflush(stderr); - } else { + /* Deposit result of request in queue/table */ + EnterCriticalSection(&queue_lock); + if (completed_hw == MAX_REQUESTS) { + /* Not likely */ + fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID); + fflush(stderr); + } else { #if 0 - fprintf(stderr, "onCompl: %d %d %d %d %d\n", reqID, len, errCode, issued_reqs, completed_hw); fflush(stderr); + fprintf(stderr, "onCompl: %d %d %d %d %d\n", + reqID, len, errCode, issued_reqs, completed_hw); + fflush(stderr); #endif - completedTable[completed_hw].reqID = reqID; - completedTable[completed_hw].len = len; - completedTable[completed_hw].errCode = errCode; - completed_hw++; - issued_reqs--; - if (completed_hw == 1) { - /* The event is used to wake up the scheduler thread should it - * be blocked waiting for requests to complete. It reset once - * that thread has cleared out the request queue/table. - */ - SetEvent(completed_req_event); + completedTable[completed_hw].reqID = reqID; + completedTable[completed_hw].len = len; + completedTable[completed_hw].errCode = errCode; + completed_hw++; + issued_reqs--; + if (completed_hw == 1) { + /* The event is used to wake up the scheduler thread should it + * be blocked waiting for requests to complete. It reset once + * that thread has cleared out the request queue/table. + */ + SetEvent(completed_req_event); + } } - } - LeaveCriticalSection(&queue_lock); + LeaveCriticalSection(&queue_lock); } unsigned int @@ -90,163 +92,196 @@ addIORequest(int fd, int len, char* buf) { - EnterCriticalSection(&queue_lock); - issued_reqs++; - LeaveCriticalSection(&queue_lock); + EnterCriticalSection(&queue_lock); + issued_reqs++; + LeaveCriticalSection(&queue_lock); #if 0 - fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr); + fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr); #endif - return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete); + return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete); } unsigned int addDelayRequest(int msecs) { - EnterCriticalSection(&queue_lock); - issued_reqs++; - LeaveCriticalSection(&queue_lock); + EnterCriticalSection(&queue_lock); + issued_reqs++; + LeaveCriticalSection(&queue_lock); #if 0 - fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr); + fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr); #endif - return AddDelayRequest(msecs,onIOComplete); + return AddDelayRequest(msecs,onIOComplete); } unsigned int addDoProcRequest(void* proc, void* param) { - EnterCriticalSection(&queue_lock); - issued_reqs++; - LeaveCriticalSection(&queue_lock); + EnterCriticalSection(&queue_lock); + issued_reqs++; + LeaveCriticalSection(&queue_lock); #if 0 - fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr); + fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr); #endif - return AddProcRequest(proc,param,onIOComplete); + return AddProcRequest(proc,param,onIOComplete); } int startupAsyncIO() { - if (!StartIOManager()) { - return 0; - } - InitializeCriticalSection(&queue_lock); - /* Create a pair of events: - * - * - completed_req_event -- signals the deposit of request result; manual reset. - * - abandon_req_wait -- external OS thread tells current RTS/Scheduler - * thread to abandon wait for IO request completion. - * Auto reset. - */ - completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL); - abandon_req_wait = CreateEvent (NULL, FALSE, FALSE, NULL); - wait_handles[0] = completed_req_event; - wait_handles[1] = abandon_req_wait; - completed_hw = 0; - return ( completed_req_event != INVALID_HANDLE_VALUE && - abandon_req_wait != INVALID_HANDLE_VALUE ); + if (!StartIOManager()) { + return 0; + } + InitializeCriticalSection(&queue_lock); + /* Create a pair of events: + * + * - completed_req_event -- signals the deposit of request result; manual reset. + * - abandon_req_wait -- external OS thread tells current RTS/Scheduler + * thread to abandon wait for IO request completion. + * Auto reset. + */ + completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL); + abandon_req_wait = CreateEvent (NULL, FALSE, FALSE, NULL); + wait_handles[0] = completed_req_event; + wait_handles[1] = abandon_req_wait; + completed_hw = 0; + return ( completed_req_event != INVALID_HANDLE_VALUE && + abandon_req_wait != INVALID_HANDLE_VALUE ); } void shutdownAsyncIO() { - CloseHandle(completed_req_event); - ShutdownIOManager(); + CloseHandle(completed_req_event); + ShutdownIOManager(); } +/* + * Function: awaitRequests(wait) + * + * Check for the completion of external IO work requests. Worker + * threads signal completion of IO requests by depositing them + * in a table (completedTable). awaitRequests() matches up + * requests in that table with threads on the blocked_queue, + * making the threads whose IO requests have completed runnable + * again. + * + * awaitRequests() is called by the scheduler periodically _or_ if + * it is out of work, and need to wait for the completion of IO + * requests to make further progress. In the latter scenario, + * awaitRequests() will simply block waiting for worker threads + * to complete if the 'completedTable' is empty. + */ int awaitRequests(rtsBool wait) { start: #if 0 - fprintf(stderr, "awaitRequests: %d %d %d\n", issued_reqs, completed_hw, wait); fflush(stderr); + fprintf(stderr, "awaitRequests(): %d %d %d\n", issued_reqs, completed_hw, wait); + fflush(stderr); #endif - EnterCriticalSection(&queue_lock); - /* Nothing immediately available & we won't wait */ - if ((!wait && completed_hw == 0) || - (issued_reqs == 0 && completed_hw == 0)) { - LeaveCriticalSection(&queue_lock); - return 0; - } - if (completed_hw == 0) { - /* empty table, drop lock and wait */ - LeaveCriticalSection(&queue_lock); - if (wait) { - DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE); - switch (dwRes) { - case WAIT_OBJECT_0: - break; - case WAIT_OBJECT_0 + 1: - case WAIT_TIMEOUT: + EnterCriticalSection(&queue_lock); + /* Nothing immediately available & we won't wait */ + if ((!wait && completed_hw == 0) || + (issued_reqs == 0 && completed_hw == 0)) { + LeaveCriticalSection(&queue_lock); return 0; - default: - fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr); - return 0; - } - } else { - return 0; /* cannot happen */ } - goto start; - } else { - int i; - StgTSO *tso, *prev; - - for (i=0; i < completed_hw; i++) { - unsigned int rID = completedTable[i].reqID; - prev = NULL; - for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; prev = tso, tso = tso->link) { - switch(tso->why_blocked) { - case BlockedOnRead: - case BlockedOnWrite: - case BlockedOnDoProc: - if (tso->block_info.async_result->reqID == rID) { - /* Found the thread blocked waiting on request; stodgily fill - * in its result block. + if (completed_hw == 0) { + /* empty table, drop lock and wait */ + LeaveCriticalSection(&queue_lock); + if ( wait && !interrupted ) { + DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE); + switch (dwRes) { + case WAIT_OBJECT_0: + break; + case WAIT_OBJECT_0 + 1: + case WAIT_TIMEOUT: + return 0; + default: + fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr); + return 0; + } + } else { + return 0; /* cannot happen */ + } + goto start; + } else { + int i; + StgTSO *tso, *prev; + + for (i=0; i < completed_hw; i++) { + /* For each of the completed requests, match up their Ids + * with those of the threads on the blocked_queue. If the + * thread that made the IO request has been subsequently + * killed (and removed from blocked_queue), no match will + * be found for that request Id. + * + * i.e., killing a Haskell thread doesn't attempt to cancel + * the IO request it is blocked on. + * */ - tso->block_info.async_result->len = completedTable[i].len; - tso->block_info.async_result->errCode = completedTable[i].errCode; - - /* Drop the matched TSO from blocked_queue */ - if ( prev == NULL ) { - blocked_queue_hd = tso->link; - if (blocked_queue_tl == tso) { - blocked_queue_tl = END_TSO_QUEUE; + unsigned int rID = completedTable[i].reqID; + prev = NULL; + + prev = NULL; + for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; prev = tso, tso = tso->link) { + + switch(tso->why_blocked) { + case BlockedOnRead: + case BlockedOnWrite: + case BlockedOnDoProc: + if (tso->block_info.async_result->reqID == rID) { + /* Found the thread blocked waiting on request; stodgily fill + * in its result block. + */ + tso->block_info.async_result->len = completedTable[i].len; + tso->block_info.async_result->errCode = completedTable[i].errCode; + + /* Drop the matched TSO from blocked_queue */ + if (prev) { + prev->link = tso->link; + } else { + blocked_queue_hd = tso->link; + } + if (blocked_queue_tl == tso) { + blocked_queue_tl = prev; + } + + /* Terminates the run queue + this inner for-loop. */ + tso->link = END_TSO_QUEUE; + tso->why_blocked = NotBlocked; + PUSH_ON_RUN_QUEUE(tso); + break; + } + break; + default: + if (tso->why_blocked != NotBlocked) { + barf("awaitRequests: odd thread state"); + } + break; } - } else { - prev->link = tso->link; - if (blocked_queue_tl == tso) { - blocked_queue_tl = END_TSO_QUEUE; - } } - - /* Terminates the run queue + this inner for-loop. */ - tso->link = END_TSO_QUEUE; - tso->why_blocked = NotBlocked; - PUSH_ON_RUN_QUEUE(tso); - break; - } - break; - default: - if (tso->why_blocked != NotBlocked) { - barf("awaitRequests: odd thread state"); - } - break; } - } + completed_hw = 0; + ResetEvent(completed_req_event); + LeaveCriticalSection(&queue_lock); + return 1; } - completed_hw = 0; - ResetEvent(completed_req_event); - LeaveCriticalSection(&queue_lock); - return 1; - } } +/* + * Function: abandonRequestWait() + * + * Wake up a thread that's blocked waiting for new IO requests + * to complete (via awaitRequests().) + */ void abandonRequestWait() { - /* the event is auto-reset, but in case there's no thread - * already waiting on the event, we want to return it to - * a non-signalled state. - */ - PulseEvent(abandon_req_wait); + /* the event is auto-reset, but in case there's no thread + * already waiting on the event, we want to return it to + * a non-signalled state. + */ + PulseEvent(abandon_req_wait); } -- 1.7.10.4