*
* (c) sof, 2002-2003.
*/
+#include "Rts.h"
#include "IOManager.h"
#include "WorkQueue.h"
+#include "ConsoleHandler.h"
#include <stdio.h>
#include <stdlib.h>
#include <io.h>
int workersIdle;
HANDLE hExitEvent;
unsigned int requestID;
+ /* fields for keeping track of active WorkItems */
+ CritSection active_work_lock;
+ WorkItem* active_work_items;
} IOManagerState;
/* ToDo: wrap up this state via a IOManager handle instead? */
static IOManagerState* ioMan;
+static void RegisterWorkItem ( IOManagerState* iom, WorkItem* wi);
+static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi);
+
/*
* The routine executed by each worker thread.
*/
WorkQueue* pq = iom->workQueue;
WorkItem* work;
int len = 0, fd = 0;
- DWORD errCode;
+ DWORD errCode = 0;
void* complData;
hWaits[0] = (HANDLE)iom->hExitEvent;
*/
rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
+ if (rc == WAIT_OBJECT_0) {
+ // we received the exit event
+ return 0;
+ }
+
EnterCriticalSection(&iom->manLock);
/* Signal that the thread is 'non-idle' and about to consume
* a work item.
iom->queueSize--;
LeaveCriticalSection(&iom->manLock);
- if ( WAIT_OBJECT_0 == rc ) {
- /* shutdown */
- return 0;
- } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
+ if ( rc == (WAIT_OBJECT_0 + 1) ) {
/* work item available, fetch it. */
if (FetchWork(pq,(void**)&work)) {
+ work->abandonOp = 0;
+ RegisterWorkItem(iom,work);
if ( work->workKind & WORKER_READ ) {
if ( work->workKind & WORKER_FOR_SOCKET ) {
len = recv(work->workData.ioData.fd,
errCode = WSAGetLastError();
}
} else {
+ while (1) {
+ /* Do the read(), with extra-special handling for Ctrl+C */
len = read(work->workData.ioData.fd,
work->workData.ioData.buf,
work->workData.ioData.len);
+ if ( len == 0 && work->workData.ioData.len != 0 ) {
+ /* Given the following scenario:
+ * - a console handler has been registered that handles Ctrl+C
+ * events.
+ * - we've not tweaked the 'console mode' settings to turn on
+ * ENABLE_PROCESSED_INPUT.
+ * - we're blocked waiting on input from standard input.
+ * - the user hits Ctrl+C.
+ *
+ * The OS will invoke the console handler (in a separate OS thread),
+ * and the above read() (i.e., under the hood, a ReadFile() op) returns
+ * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
+ * want to percolate this error condition back to the Haskell user.
+ * Do this by waiting for the completion of the Haskell console handler.
+ * If upon completion of the console handler routine, the Haskell thread
+ * that issued the request is found to have been thrown an exception,
+ * the worker abandons the request (since that's what the Haskell thread
+ * has done.) If the Haskell thread hasn't been interrupted, the worker
+ * retries the read request as if nothing happened.
+ */
+ if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) {
+ /* For now, only abort when dealing with the standard input handle.
+ * i.e., for all others, an error is raised.
+ */
+ HANDLE h = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
+ if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
+ if (rts_waitConsoleHandlerCompletion()) {
+ /* If the Scheduler has set work->abandonOp, the Haskell thread has
+ * been thrown an exception (=> the worker must abandon this request.)
+ * We test for this below before invoking the on-completion routine.
+ */
+ if (work->abandonOp) {
+ break;
+ } else {
+ continue;
+ }
+ }
+ } else {
+ break; /* Treat it like an error */
+ }
+ } else {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
if (len == -1) { errCode = errno; }
}
complData = work->workData.ioData.buf;
fflush(stderr);
continue;
}
- work->onCompletion(work->requestID,
- fd,
- len,
- complData,
- errCode);
+ if (!work->abandonOp) {
+ work->onCompletion(work->requestID,
+ fd,
+ len,
+ complData,
+ errCode);
+ }
/* Free the WorkItem */
+ DeregisterWorkItem(iom,work);
free(work);
} else {
fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
return 1;
}
} else {
- fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
+ fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr);
return 1;
}
}
ioMan->workersIdle = 0;
ioMan->queueSize = 0;
ioMan->requestID = 1;
+ InitializeCriticalSection(&ioMan->active_work_lock);
+ ioMan->active_work_items = NULL;
return TRUE;
}
wItem->workData.ioData.fd = fd;
wItem->workData.ioData.len = len;
wItem->workData.ioData.buf = buffer;
+ wItem->link = NULL;
wItem->onCompletion = onCompletion;
wItem->requestID = reqID;
wItem->workData.delayData.msecs = msecs;
wItem->onCompletion = onCompletion;
wItem->requestID = reqID;
+ wItem->link = NULL;
return depositWorkItem(reqID, wItem);
}
wItem->workData.procData.param = param;
wItem->onCompletion = onCompletion;
wItem->requestID = reqID;
+ wItem->abandonOp = 0;
+ wItem->link = NULL;
return depositWorkItem(reqID, wItem);
}
-void ShutdownIOManager()
+void ShutdownIOManager ( void )
{
SetEvent(ioMan->hExitEvent);
- free(ioMan);
- ioMan = NULL;
+ // ToDo: we can't free this now, because the worker thread(s)
+ // haven't necessarily finished with it yet. Perhaps it should
+ // have a reference count or something.
+ // free(ioMan);
+ // ioMan = NULL;
+}
+
+/* Keep track of WorkItems currently being serviced. */
+static
+void
+RegisterWorkItem(IOManagerState* ioMan,
+ WorkItem* wi)
+{
+ EnterCriticalSection(&ioMan->active_work_lock);
+ wi->link = ioMan->active_work_items;
+ ioMan->active_work_items = wi;
+ LeaveCriticalSection(&ioMan->active_work_lock);
+}
+
+static
+void
+DeregisterWorkItem(IOManagerState* ioMan,
+ WorkItem* wi)
+{
+ WorkItem *ptr, *prev;
+
+ EnterCriticalSection(&ioMan->active_work_lock);
+ for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) {
+ if (wi->requestID == ptr->requestID) {
+ if (prev==NULL) {
+ ioMan->active_work_items = ptr->link;
+ } else {
+ prev->link = ptr->link;
+ }
+ LeaveCriticalSection(&ioMan->active_work_lock);
+ return;
+ }
+ }
+ fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID);
+ LeaveCriticalSection(&ioMan->active_work_lock);
+}
+
+
+/*
+ * Function: abandonWorkRequest()
+ *
+ * Signal that a work request isn't of interest. Called by the Scheduler
+ * if a blocked Haskell thread has an exception thrown to it.
+ *
+ * Note: we're not aborting the system call that a worker might be blocked on
+ * here, just disabling the propagation of its result once its finished. We
+ * may have to go the whole hog here and switch to overlapped I/O so that we
+ * can abort blocked system calls.
+ */
+void
+abandonWorkRequest ( int reqID )
+{
+ WorkItem *ptr;
+ EnterCriticalSection(&ioMan->active_work_lock);
+ for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) {
+ if (ptr->requestID == (unsigned int)reqID ) {
+ ptr->abandonOp = 1;
+ LeaveCriticalSection(&ioMan->active_work_lock);
+ return;
+ }
+ }
+ /* Note: if the request ID isn't present, the worker will have
+ * finished sometime since awaitRequests() last drained the completed
+ * request table; i.e., not an error.
+ */
+ LeaveCriticalSection(&ioMan->active_work_lock);
}