Reorganisation of the source tree
[ghc-hetmet.git] / ghc / rts / win32 / IOManager.c
diff --git a/ghc/rts/win32/IOManager.c b/ghc/rts/win32/IOManager.c
deleted file mode 100644 (file)
index a67c350..0000000
+++ /dev/null
@@ -1,510 +0,0 @@
-/* IOManager.c
- *
- * Non-blocking / asynchronous I/O for Win32.
- *
- * (c) sof, 2002-2003.
- */
-#include "Rts.h"
-#include "IOManager.h"
-#include "WorkQueue.h"
-#include "ConsoleHandler.h"
-#include <stdio.h>
-#include <stdlib.h>
-#include <io.h>
-#include <winsock.h>
-#include <process.h>
-
-/*
- * Internal state maintained by the IO manager.
- */
-typedef struct IOManagerState {
-    CritSection      manLock;
-    WorkQueue*       workQueue;
-    int              queueSize;
-    int              numWorkers;
-    int              workersIdle;
-    HANDLE           hExitEvent;
-    unsigned int     requestID;
-    /* fields for keeping track of active WorkItems */
-    CritSection      active_work_lock;
-    WorkItem*        active_work_items;
-} IOManagerState;
-
-/* ToDo: wrap up this state via a IOManager handle instead? */
-static IOManagerState* ioMan;
-
-static void RegisterWorkItem  ( IOManagerState* iom, WorkItem* wi);
-static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi);
-
-/*
- * The routine executed by each worker thread.
- */
-static
-unsigned
-WINAPI
-IOWorkerProc(PVOID param)
-{
-    HANDLE  hWaits[2];
-    DWORD   rc;
-    IOManagerState* iom = (IOManagerState*)param;
-    WorkQueue* pq = iom->workQueue;
-    WorkItem*  work;
-    int        len = 0, fd = 0;
-    DWORD      errCode = 0;
-    void*      complData;
-
-    hWaits[0] = (HANDLE)iom->hExitEvent;
-    hWaits[1] = GetWorkQueueHandle(pq);
-  
-    while (1) {
-       /* The error code is communicated back on completion of request; reset. */
-       errCode = 0;
-       
-       EnterCriticalSection(&iom->manLock);
-       /* Signal that the worker is idle.
-        *
-        * 'workersIdle' is used when determining whether or not to
-        * increase the worker thread pool when adding a new request.
-        * (see addIORequest().)
-        */
-       iom->workersIdle++;
-       LeaveCriticalSection(&iom->manLock);
-       
-       /*
-        * A possible future refinement is to make long-term idle threads
-        * wake up and decide to shut down should the number of idle threads
-        * be above some threshold.
-        *
-        */
-       rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
-
-       if (rc == WAIT_OBJECT_0) {
-           // we received the exit event
-           return 0;
-       }
-
-       EnterCriticalSection(&iom->manLock);
-       /* Signal that the thread is 'non-idle' and about to consume 
-        * a work item.
-        */
-       iom->workersIdle--;
-       iom->queueSize--;
-       LeaveCriticalSection(&iom->manLock);
-    
-       if ( rc == (WAIT_OBJECT_0 + 1) ) {
-           /* work item available, fetch it. */
-           if (FetchWork(pq,(void**)&work)) {
-               work->abandonOp = 0;
-               RegisterWorkItem(iom,work);
-               if ( work->workKind & WORKER_READ ) {
-                   if ( work->workKind & WORKER_FOR_SOCKET ) {
-                       len = recv(work->workData.ioData.fd, 
-                                  work->workData.ioData.buf,
-                                  work->workData.ioData.len,
-                                  0);
-                       if (len == SOCKET_ERROR) {
-                           errCode = WSAGetLastError();
-                       }
-                   } else {
-                       while (1) {
-                       /* Do the read(), with extra-special handling for Ctrl+C */
-                       len = read(work->workData.ioData.fd,
-                                  work->workData.ioData.buf,
-                                  work->workData.ioData.len);
-                       if ( len == 0 && work->workData.ioData.len != 0 ) {
-                           /* Given the following scenario:
-                            *     - a console handler has been registered that handles Ctrl+C
-                            *       events.
-                            *     - we've not tweaked the 'console mode' settings to turn on
-                            *       ENABLE_PROCESSED_INPUT.
-                            *     - we're blocked waiting on input from standard input.
-                            *     - the user hits Ctrl+C.
-                            *
-                            * The OS will invoke the console handler (in a separate OS thread),
-                            * and the above read() (i.e., under the hood, a ReadFile() op) returns
-                            * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
-                            * want to percolate this error condition back to the Haskell user.
-                            * Do this by waiting for the completion of the Haskell console handler.
-                            * If upon completion of the console handler routine, the Haskell thread 
-                            * that issued the request is found to have been thrown an exception, 
-                            * the worker abandons the request (since that's what the Haskell thread 
-                            * has done.) If the Haskell thread hasn't been interrupted, the worker 
-                            * retries the read request as if nothing happened.
-                            */
-                           if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) {
-                               /* For now, only abort when dealing with the standard input handle.
-                                * i.e., for all others, an error is raised.
-                                */
-                               HANDLE h  = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
-                               if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
-                                   if (rts_waitConsoleHandlerCompletion()) {
-                                       /* If the Scheduler has set work->abandonOp, the Haskell thread has 
-                                        * been thrown an exception (=> the worker must abandon this request.)
-                                        * We test for this below before invoking the on-completion routine.
-                                        */
-                                       if (work->abandonOp) {
-                                           break;
-                                       } else {
-                                           continue;
-                                       }
-                                   } 
-                               } else { 
-                                   break; /* Treat it like an error */
-                               }
-                           } else {
-                               break;
-                           }
-                       } else {
-                           break;
-                       }
-                       }
-                       if (len == -1) { errCode = errno; }
-                   }
-                   complData = work->workData.ioData.buf;
-                   fd = work->workData.ioData.fd;
-               } else if ( work->workKind & WORKER_WRITE ) {
-                   if ( work->workKind & WORKER_FOR_SOCKET ) {
-                       len = send(work->workData.ioData.fd,
-                                  work->workData.ioData.buf,
-                                  work->workData.ioData.len,
-                                  0);
-                       if (len == SOCKET_ERROR) {
-                           errCode = WSAGetLastError();
-                       }
-                   } else {
-                       len = write(work->workData.ioData.fd,
-                                   work->workData.ioData.buf,
-                                   work->workData.ioData.len);
-                       if (len == -1) { errCode = errno; }
-                   }
-                   complData = work->workData.ioData.buf;
-                   fd = work->workData.ioData.fd;
-               } else if ( work->workKind & WORKER_DELAY ) {
-                   /* Approximate implementation of threadDelay;
-                    * 
-                    * Note: Sleep() is in milliseconds, not micros.
-                    */
-                   Sleep(work->workData.delayData.msecs / 1000);
-                   len = work->workData.delayData.msecs;
-                   complData = NULL;
-                   fd = 0;
-                   errCode = 0;
-               } else if ( work->workKind & WORKER_DO_PROC ) {
-                   /* perform operation/proc on behalf of Haskell thread. */
-                   if (work->workData.procData.proc) {
-                       /* The procedure is assumed to encode result + success/failure
-                        * via its param.
-                        */
-                       errCode=work->workData.procData.proc(work->workData.procData.param);
-                   } else {
-                       errCode=1;
-                   }
-                   complData = work->workData.procData.param;
-               } else {
-                   fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
-                   fflush(stderr);
-                   continue;
-               }
-               if (!work->abandonOp) {
-                   work->onCompletion(work->requestID,
-                                      fd,
-                                      len,
-                                      complData,
-                                      errCode);
-               }
-               /* Free the WorkItem */
-               DeregisterWorkItem(iom,work);
-               free(work);
-           } else {
-               fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
-               return 1;
-           }
-       } else {
-           fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr);
-           return 1;
-       }
-    }
-    return 0;
-}
-
-static 
-BOOL
-NewIOWorkerThread(IOManagerState* iom)
-{
-    unsigned threadId;
-    return ( 0 != _beginthreadex(NULL,
-                                0,
-                                IOWorkerProc,
-                                (LPVOID)iom,
-                                0,
-                                &threadId) );
-}
-
-BOOL
-StartIOManager(void)
-{
-    HANDLE hExit;
-    WorkQueue* wq;
-
-    wq = NewWorkQueue();
-    if ( !wq ) return FALSE;  
-  
-    ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
-  
-    if (!ioMan) {
-       FreeWorkQueue(wq);
-       return FALSE;
-    }
-
-    /* A manual-reset event */
-    hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
-    if ( !hExit ) {
-       FreeWorkQueue(wq);
-       free(ioMan);
-       return FALSE;
-    }
-  
-    ioMan->hExitEvent = hExit;
-    InitializeCriticalSection(&ioMan->manLock);
-    ioMan->workQueue   = wq;
-    ioMan->numWorkers  = 0;
-    ioMan->workersIdle = 0;
-    ioMan->queueSize   = 0;
-    ioMan->requestID   = 1;
-    InitializeCriticalSection(&ioMan->active_work_lock);
-    ioMan->active_work_items = NULL;
-    return TRUE;
-}
-
-/*
- * Function: depositWorkItem()
- *
- * Local function which deposits a WorkItem onto a work queue,
- * deciding in the process whether or not the thread pool needs
- * to be augmented with another thread to handle the new request.
- *
- */
-static
-int
-depositWorkItem( unsigned int reqID,
-                WorkItem* wItem )
-{
-    EnterCriticalSection(&ioMan->manLock);
-
-#if 0
-    fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); 
-    fflush(stderr);
-#endif
-    /* A new worker thread is created when there are fewer idle threads
-     * than non-consumed queue requests. This ensures that requests will
-     * be dealt with in a timely manner.
-     *
-     * [Long explanation of why the previous thread pool policy lead to 
-     * trouble]
-     *
-     * Previously, the thread pool was augmented iff no idle worker threads
-     * were available. That strategy runs the risk of repeatedly adding to
-     * the request queue without expanding the thread pool to handle this
-     * sudden spike in queued requests. 
-     * [How? Assume workersIdle is 1, and addIORequest() is called. No new 
-     * thread is created and the request is simply queued. If addIORequest()
-     * is called again _before the OS schedules a worker thread to pull the
-     * request off the queue_, workersIdle is still 1 and another request is 
-     * simply added to the queue. Once the worker thread is run, only one
-     * request is de-queued, leaving the 2nd request in the queue]
-     * 
-     * Assuming none of the queued requests take an inordinate amount of to 
-     * complete, the request queue would eventually be drained. But if that's 
-     * not the case, the later requests will end up languishing in the queue 
-     * indefinitely. The non-timely handling of requests may cause CH applications
-     * to misbehave / hang; bad.
-     *
-     */
-    ioMan->queueSize++;
-    if ( (ioMan->workersIdle < ioMan->queueSize) ) {
-       /* see if giving up our quantum ferrets out some idle threads.
-        */
-       LeaveCriticalSection(&ioMan->manLock);
-       Sleep(0);
-       EnterCriticalSection(&ioMan->manLock);
-       if ( (ioMan->workersIdle < ioMan->queueSize) ) {
-           /* No, go ahead and create another. */
-           ioMan->numWorkers++;
-           LeaveCriticalSection(&ioMan->manLock);
-           NewIOWorkerThread(ioMan);
-       } else {
-           LeaveCriticalSection(&ioMan->manLock);
-       }
-    } else {
-       LeaveCriticalSection(&ioMan->manLock);
-    }
-  
-    if (SubmitWork(ioMan->workQueue,wItem)) {
-       /* Note: the work item has potentially been consumed by a worker thread
-        *       (and freed) at this point, so we cannot use wItem's requestID.
-        */
-       return reqID;
-    } else {
-       return 0;
-    }
-}
-
-/*
- * Function: AddIORequest()
- *
- * Conduit to underlying WorkQueue's SubmitWork(); adds IO
- * request to work queue, deciding whether or not to augment
- * the thread pool in the process. 
- */
-int
-AddIORequest ( int   fd,
-              BOOL  forWriting,
-              BOOL  isSocket,
-              int   len,
-              char* buffer,
-              CompletionProc onCompletion)
-{
-    WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
-    unsigned int reqID = ioMan->requestID++;
-    if (!ioMan || !wItem) return 0;
-  
-    /* Fill in the blanks */
-    wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
-                         ( forWriting ? WORKER_WRITE : WORKER_READ );
-    wItem->workData.ioData.fd  = fd;
-    wItem->workData.ioData.len = len;
-    wItem->workData.ioData.buf = buffer;
-    wItem->link = NULL;
-
-    wItem->onCompletion        = onCompletion;
-    wItem->requestID           = reqID;
-  
-    return depositWorkItem(reqID, wItem);
-}       
-
-/*
- * Function: AddDelayRequest()
- *
- * Like AddIORequest(), but this time adding a delay request to
- * the request queue.
- */
-BOOL
-AddDelayRequest ( unsigned int   msecs,
-                 CompletionProc onCompletion)
-{
-    WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
-    unsigned int reqID = ioMan->requestID++;
-    if (!ioMan || !wItem) return FALSE;
-  
-    /* Fill in the blanks */
-    wItem->workKind     = WORKER_DELAY;
-    wItem->workData.delayData.msecs = msecs;
-    wItem->onCompletion = onCompletion;
-    wItem->requestID    = reqID;
-    wItem->link         = NULL;
-
-    return depositWorkItem(reqID, wItem);
-}
-
-/*
- * Function: AddProcRequest()
- *
- * Add an asynchronous procedure request.
- */
-BOOL
-AddProcRequest ( void* proc,
-                void* param,
-                CompletionProc onCompletion)
-{
-    WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
-    unsigned int reqID = ioMan->requestID++;
-    if (!ioMan || !wItem) return FALSE;
-  
-    /* Fill in the blanks */
-    wItem->workKind     = WORKER_DO_PROC;
-    wItem->workData.procData.proc  = proc;
-    wItem->workData.procData.param = param;
-    wItem->onCompletion = onCompletion;
-    wItem->requestID    = reqID;
-    wItem->abandonOp    = 0;
-    wItem->link         = NULL;
-
-    return depositWorkItem(reqID, wItem);
-}
-
-void ShutdownIOManager ( void )
-{
-  SetEvent(ioMan->hExitEvent);
-  // ToDo: we can't free this now, because the worker thread(s)
-  // haven't necessarily finished with it yet.  Perhaps it should
-  // have a reference count or something.
-  // free(ioMan);
-  // ioMan = NULL;
-}
-
-/* Keep track of WorkItems currently being serviced. */
-static 
-void
-RegisterWorkItem(IOManagerState* ioMan, 
-                WorkItem* wi)
-{
-    EnterCriticalSection(&ioMan->active_work_lock);
-    wi->link = ioMan->active_work_items;
-    ioMan->active_work_items = wi;
-    LeaveCriticalSection(&ioMan->active_work_lock);
-}
-
-static 
-void
-DeregisterWorkItem(IOManagerState* ioMan, 
-                  WorkItem* wi)
-{
-    WorkItem *ptr, *prev;
-    
-    EnterCriticalSection(&ioMan->active_work_lock);
-    for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) {
-       if (wi->requestID == ptr->requestID) {
-           if (prev==NULL) {
-               ioMan->active_work_items = ptr->link;
-           } else {
-               prev->link = ptr->link;
-           }
-           LeaveCriticalSection(&ioMan->active_work_lock);
-           return;
-       }
-    }
-    fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID);
-    LeaveCriticalSection(&ioMan->active_work_lock);
-}
-
-
-/*
- * Function: abandonWorkRequest()
- *
- * Signal that a work request isn't of interest. Called by the Scheduler
- * if a blocked Haskell thread has an exception thrown to it.
- *
- * Note: we're not aborting the system call that a worker might be blocked on
- * here, just disabling the propagation of its result once its finished. We
- * may have to go the whole hog here and switch to overlapped I/O so that we
- * can abort blocked system calls.
- */
-void
-abandonWorkRequest ( int reqID )
-{
-    WorkItem *ptr;
-    EnterCriticalSection(&ioMan->active_work_lock);
-    for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) {
-       if (ptr->requestID == (unsigned int)reqID ) {
-           ptr->abandonOp = 1;
-           LeaveCriticalSection(&ioMan->active_work_lock);
-           return;
-       }
-    }
-    /* Note: if the request ID isn't present, the worker will have
-     * finished sometime since awaitRequests() last drained the completed
-     * request table; i.e., not an error.
-     */
-    LeaveCriticalSection(&ioMan->active_work_lock);
-}