[project @ 2003-09-12 16:16:43 by sof]
authorsof <unknown>
Fri, 12 Sep 2003 16:16:43 +0000 (16:16 +0000)
committersof <unknown>
Fri, 12 Sep 2003 16:16:43 +0000 (16:16 +0000)
- awaitRequests() comments
- code reformatting

Merge to STABLE

ghc/rts/win32/AsyncIO.c

index 12de16e..fabe85b 100644 (file)
@@ -16,7 +16,7 @@
  * Overview:
  *
  * Haskell code issue asynchronous I/O requests via the 
- * asyncRead# and asyncWrite# primops. These cause addIORequest()
+ * async{Read,Write,DoOp}# primops. These cause addIORequest()
  * to be invoked, which forwards the request to the underlying
  * asynchronous I/O subsystem. Each request is tagged with a unique
  * ID.
@@ -35,9 +35,9 @@
  */
 
 typedef struct CompletedReq {
-  unsigned int   reqID;
-  int            len;
-  int            errCode;
+    unsigned int   reqID;
+    int            len;
+    int            errCode;
 } CompletedReq;
 
 #define MAX_REQUESTS 200
@@ -57,30 +57,32 @@ onIOComplete(unsigned int reqID,
             void* buf STG_UNUSED,
             int   errCode)
 {
-  /* Deposit result of request in queue/table */
-  EnterCriticalSection(&queue_lock);
-  if (completed_hw == MAX_REQUESTS) {
-    /* Not likely */
-    fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
-    fflush(stderr);
-  } else {
+    /* Deposit result of request in queue/table */
+    EnterCriticalSection(&queue_lock);
+    if (completed_hw == MAX_REQUESTS) {
+       /* Not likely */
+       fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
+       fflush(stderr);
+    } else {
 #if 0
-    fprintf(stderr, "onCompl: %d %d %d %d %d\n", reqID, len, errCode, issued_reqs, completed_hw); fflush(stderr);
+       fprintf(stderr, "onCompl: %d %d %d %d %d\n", 
+               reqID, len, errCode, issued_reqs, completed_hw); 
+       fflush(stderr);
 #endif
-    completedTable[completed_hw].reqID   = reqID;
-    completedTable[completed_hw].len     = len;
-    completedTable[completed_hw].errCode = errCode;
-    completed_hw++;
-    issued_reqs--;
-    if (completed_hw == 1) {
-      /* The event is used to wake up the scheduler thread should it
-       * be blocked waiting for requests to complete. It reset once
-       * that thread has cleared out the request queue/table.
-       */
-      SetEvent(completed_req_event);
+       completedTable[completed_hw].reqID   = reqID;
+       completedTable[completed_hw].len     = len;
+       completedTable[completed_hw].errCode = errCode;
+       completed_hw++;
+       issued_reqs--;
+       if (completed_hw == 1) {
+           /* The event is used to wake up the scheduler thread should it
+            * be blocked waiting for requests to complete. It reset once
+            * that thread has cleared out the request queue/table.
+            */
+           SetEvent(completed_req_event);
+       }
     }
-  }
-  LeaveCriticalSection(&queue_lock);
+    LeaveCriticalSection(&queue_lock);
 }
 
 unsigned int
@@ -90,163 +92,196 @@ addIORequest(int   fd,
             int   len,
             char* buf)
 {
-  EnterCriticalSection(&queue_lock);
-  issued_reqs++;
-  LeaveCriticalSection(&queue_lock);
+    EnterCriticalSection(&queue_lock);
+    issued_reqs++;
+    LeaveCriticalSection(&queue_lock);
 #if 0
-  fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
+    fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
 #endif
-  return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
+    return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
 }
 
 unsigned int
 addDelayRequest(int msecs)
 {
-  EnterCriticalSection(&queue_lock);
-  issued_reqs++;
-  LeaveCriticalSection(&queue_lock);
+    EnterCriticalSection(&queue_lock);
+    issued_reqs++;
+    LeaveCriticalSection(&queue_lock);
 #if 0
-  fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
+    fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
 #endif
-  return AddDelayRequest(msecs,onIOComplete);
+    return AddDelayRequest(msecs,onIOComplete);
 }
 
 unsigned int
 addDoProcRequest(void* proc, void* param)
 {
-  EnterCriticalSection(&queue_lock);
-  issued_reqs++;
-  LeaveCriticalSection(&queue_lock);
+    EnterCriticalSection(&queue_lock);
+    issued_reqs++;
+    LeaveCriticalSection(&queue_lock);
 #if 0
-  fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
+    fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
 #endif
-  return AddProcRequest(proc,param,onIOComplete);
+    return AddProcRequest(proc,param,onIOComplete);
 }
 
 
 int
 startupAsyncIO()
 {
-  if (!StartIOManager()) {
-    return 0;
-  }
-  InitializeCriticalSection(&queue_lock);
-  /* Create a pair of events:
-   *
-   *    - completed_req_event  -- signals the deposit of request result; manual reset.
-   *    - abandon_req_wait     -- external OS thread tells current RTS/Scheduler
-   *                              thread to abandon wait for IO request completion.
-   *                              Auto reset.
-   */
-  completed_req_event = CreateEvent (NULL, TRUE,  FALSE, NULL);
-  abandon_req_wait    = CreateEvent (NULL, FALSE, FALSE, NULL);
-  wait_handles[0] = completed_req_event;
-  wait_handles[1] = abandon_req_wait;
-  completed_hw = 0;
-  return ( completed_req_event != INVALID_HANDLE_VALUE &&
-          abandon_req_wait    != INVALID_HANDLE_VALUE );
+    if (!StartIOManager()) {
+       return 0;
+    }
+    InitializeCriticalSection(&queue_lock);
+    /* Create a pair of events:
+     *
+     *    - completed_req_event  -- signals the deposit of request result; manual reset.
+     *    - abandon_req_wait     -- external OS thread tells current RTS/Scheduler
+     *                              thread to abandon wait for IO request completion.
+     *                              Auto reset.
+     */
+    completed_req_event = CreateEvent (NULL, TRUE,  FALSE, NULL);
+    abandon_req_wait    = CreateEvent (NULL, FALSE, FALSE, NULL);
+    wait_handles[0] = completed_req_event;
+    wait_handles[1] = abandon_req_wait;
+    completed_hw = 0;
+    return ( completed_req_event != INVALID_HANDLE_VALUE &&
+            abandon_req_wait    != INVALID_HANDLE_VALUE );
 }
 
 void
 shutdownAsyncIO()
 {
-  CloseHandle(completed_req_event);
-  ShutdownIOManager();
+    CloseHandle(completed_req_event);
+    ShutdownIOManager();
 }
 
+/*
+ * Function: awaitRequests(wait)
+ *
+ * Check for the completion of external IO work requests. Worker
+ * threads signal completion of IO requests by depositing them
+ * in a table (completedTable). awaitRequests() matches up 
+ * requests in that table with threads on the blocked_queue, 
+ * making the threads whose IO requests have completed runnable
+ * again.
+ * 
+ * awaitRequests() is called by the scheduler periodically _or_ if
+ * it is out of work, and need to wait for the completion of IO
+ * requests to make further progress. In the latter scenario, 
+ * awaitRequests() will simply block waiting for worker threads 
+ * to complete if the 'completedTable' is empty.
+ */
 int
 awaitRequests(rtsBool wait)
 {
 start:
 #if 0
-  fprintf(stderr, "awaitRequests: %d %d %d\n", issued_reqs, completed_hw, wait); fflush(stderr);
+    fprintf(stderr, "awaitRequests(): %d %d %d\n", issued_reqs, completed_hw, wait);
+    fflush(stderr);
 #endif
-  EnterCriticalSection(&queue_lock);
-  /* Nothing immediately available & we won't wait */
-  if ((!wait && completed_hw == 0) || 
-      (issued_reqs == 0 && completed_hw == 0)) {
-    LeaveCriticalSection(&queue_lock);
-    return 0;
-  }
-  if (completed_hw == 0) {
-    /* empty table, drop lock and wait */
-    LeaveCriticalSection(&queue_lock);
-    if (wait) {
-      DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
-      switch (dwRes) {
-      case WAIT_OBJECT_0:
-       break;
-      case WAIT_OBJECT_0 + 1:
-      case WAIT_TIMEOUT:
+    EnterCriticalSection(&queue_lock);
+    /* Nothing immediately available & we won't wait */
+    if ((!wait && completed_hw == 0) || 
+       (issued_reqs == 0 && completed_hw == 0)) {
+       LeaveCriticalSection(&queue_lock);
        return 0;
-      default:
-       fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
-       return 0;
-      }
-    } else {
-      return 0; /* cannot happen */
     }
-    goto start;
-  } else {
-    int i;
-    StgTSO *tso, *prev;
-    
-    for (i=0; i < completed_hw; i++) {
-      unsigned int rID = completedTable[i].reqID;
-      prev = NULL;
-      for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; prev = tso, tso = tso->link) {
-       switch(tso->why_blocked) {
-       case BlockedOnRead:
-       case BlockedOnWrite:
-       case BlockedOnDoProc:
-         if (tso->block_info.async_result->reqID == rID) {
-           /* Found the thread blocked waiting on request; stodgily fill 
-            * in its result block. 
+    if (completed_hw == 0) {
+       /* empty table, drop lock and wait */
+       LeaveCriticalSection(&queue_lock);
+       if ( wait && !interrupted ) {
+           DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
+           switch (dwRes) {
+           case WAIT_OBJECT_0:
+               break;
+           case WAIT_OBJECT_0 + 1:
+           case WAIT_TIMEOUT:
+               return 0;
+           default:
+               fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
+               return 0;
+           }
+       } else {
+           return 0; /* cannot happen */
+       }
+       goto start;
+    } else {
+       int i;
+       StgTSO *tso, *prev;
+       
+       for (i=0; i < completed_hw; i++) {
+           /* For each of the completed requests, match up their Ids
+            * with those of the threads on the blocked_queue. If the
+            * thread that made the IO request has been subsequently
+            * killed (and removed from blocked_queue), no match will
+            * be found for that request Id. 
+            *
+            * i.e., killing a Haskell thread doesn't attempt to cancel
+            * the IO request it is blocked on.
+            *
             */
-           tso->block_info.async_result->len = completedTable[i].len;
-           tso->block_info.async_result->errCode = completedTable[i].errCode;
-
-           /* Drop the matched TSO from blocked_queue */
-           if ( prev == NULL ) {
-               blocked_queue_hd = tso->link;
-               if (blocked_queue_tl == tso) {
-                   blocked_queue_tl = END_TSO_QUEUE;
+           unsigned int rID = completedTable[i].reqID;
+           prev = NULL;
+           
+           prev = NULL;
+           for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; prev = tso, tso = tso->link) {
+       
+               switch(tso->why_blocked) {
+               case BlockedOnRead:
+               case BlockedOnWrite:
+               case BlockedOnDoProc:
+                   if (tso->block_info.async_result->reqID == rID) {
+                       /* Found the thread blocked waiting on request; stodgily fill 
+                        * in its result block. 
+                        */
+                       tso->block_info.async_result->len = completedTable[i].len;
+                       tso->block_info.async_result->errCode = completedTable[i].errCode;
+                       
+                       /* Drop the matched TSO from blocked_queue */
+                       if (prev) {
+                           prev->link = tso->link;
+                       } else {
+                           blocked_queue_hd = tso->link;
+                       }
+                       if (blocked_queue_tl == tso) {
+                           blocked_queue_tl = prev;
+                       }
+                   
+                       /* Terminates the run queue + this inner for-loop. */
+                       tso->link = END_TSO_QUEUE;
+                       tso->why_blocked = NotBlocked;
+                       PUSH_ON_RUN_QUEUE(tso);
+                       break;
+                   }
+                   break;
+               default:
+                   if (tso->why_blocked != NotBlocked) {
+                       barf("awaitRequests: odd thread state");
+                   }
+                   break;
                }
-           } else {
-             prev->link = tso->link;
-             if (blocked_queue_tl == tso) {
-                 blocked_queue_tl = END_TSO_QUEUE;
-             }
            }
-
-           /* Terminates the run queue + this inner for-loop. */
-           tso->link = END_TSO_QUEUE;
-           tso->why_blocked = NotBlocked;
-           PUSH_ON_RUN_QUEUE(tso);
-           break;
-         }
-         break;
-       default:
-         if (tso->why_blocked != NotBlocked) {
-             barf("awaitRequests: odd thread state");
-         }
-         break;
        }
-      }
+       completed_hw = 0;
+       ResetEvent(completed_req_event);
+       LeaveCriticalSection(&queue_lock);
+       return 1;
     }
-    completed_hw = 0;
-    ResetEvent(completed_req_event);
-    LeaveCriticalSection(&queue_lock);
-    return 1;
-  }
 }
 
+/*
+ * Function: abandonRequestWait()
+ *
+ * Wake up a thread that's blocked waiting for new IO requests
+ * to complete (via awaitRequests().)
+ */
 void
 abandonRequestWait()
 {
-  /* the event is auto-reset, but in case there's no thread
-   * already waiting on the event, we want to return it to
-   * a non-signalled state.
-   */
-  PulseEvent(abandon_req_wait);
+    /* the event is auto-reset, but in case there's no thread
+     * already waiting on the event, we want to return it to
+     * a non-signalled state.
+     */
+    PulseEvent(abandon_req_wait);
 }