Interruptible FFI calls with pthread_kill and CancelSynchronousIO. v4
[ghc-hetmet.git] / rts / Schedule.c
index f7b26a4..0850749 100644 (file)
@@ -1235,23 +1235,23 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
          ASSERT(task->incall->tso == t);
 
          if (t->what_next == ThreadComplete) {
-             if (task->ret) {
+             if (task->incall->ret) {
                  // NOTE: return val is tso->sp[1] (see StgStartup.hc)
-                 *(task->ret) = (StgClosure *)task->incall->tso->sp[1]; 
+                 *(task->incall->ret) = (StgClosure *)task->incall->tso->sp[1]; 
              }
-             task->stat = Success;
+             task->incall->stat = Success;
          } else {
-             if (task->ret) {
-                 *(task->ret) = NULL;
+             if (task->incall->ret) {
+                 *(task->incall->ret) = NULL;
              }
              if (sched_state >= SCHED_INTERRUPTING) {
                   if (heap_overflow) {
-                      task->stat = HeapExhausted;
+                      task->incall->stat = HeapExhausted;
                   } else {
-                      task->stat = Interrupted;
+                      task->incall->stat = Interrupted;
                   }
              } else {
-                 task->stat = Killed;
+                 task->incall->stat = Killed;
              }
          }
 #ifdef DEBUG
@@ -1533,10 +1533,14 @@ forkProcess(HsStablePtr *entry
     ACQUIRE_LOCK(&cap->lock);
     ACQUIRE_LOCK(&cap->running_task->lock);
 
+    stopTimer(); // See #4074
+
     pid = fork();
     
     if (pid) { // parent
        
+        startTimer(); // #4074
+
         RELEASE_LOCK(&sched_mutex);
         RELEASE_LOCK(&cap->lock);
         RELEASE_LOCK(&cap->running_task->lock);
@@ -1712,13 +1716,17 @@ recoverSuspendedTask (Capability *cap, Task *task)
  * the whole system.
  *
  * The Haskell thread making the C call is put to sleep for the
- * duration of the call, on the susepended_ccalling_threads queue.  We
+ * duration of the call, on the suspended_ccalling_threads queue.  We
  * give out a token to the task, which it can use to resume the thread
  * on return from the C function.
+ *
+ * If this is an interruptible C call, this means that the FFI call may be
+ * unceremoniously terminated and should be scheduled on an
+ * unbound worker thread.
  * ------------------------------------------------------------------------- */
    
 void *
-suspendThread (StgRegTable *reg)
+suspendThread (StgRegTable *reg, rtsBool interruptible)
 {
   Capability *cap;
   int saved_errno;
@@ -1747,12 +1755,10 @@ suspendThread (StgRegTable *reg)
 
   threadPaused(cap,tso);
 
-  if ((tso->flags & TSO_BLOCKEX) == 0)  {
-      tso->why_blocked = BlockedOnCCall;
-      tso->flags |= TSO_BLOCKEX;
-      tso->flags &= ~TSO_INTERRUPTIBLE;
+  if (interruptible) {
+    tso->why_blocked = BlockedOnCCall_Interruptible;
   } else {
-      tso->why_blocked = BlockedOnCCall_NoUnblockExc;
+    tso->why_blocked = BlockedOnCCall;
   }
 
   // Hand back capability
@@ -1811,12 +1817,11 @@ resumeThread (void *task_)
 
     traceEventRunThread(cap, tso);
     
-    if (tso->why_blocked == BlockedOnCCall) {
+    if ((tso->flags & TSO_BLOCKEX) == 0) {
         // avoid locking the TSO if we don't have to
         if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
             awakenBlockedExceptionQueue(cap,tso);
         }
-       tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
     }
     
     /* Reset blocking status */
@@ -1887,8 +1892,8 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
     tso->cap = cap;
 
     task->incall->tso = tso;
-    task->ret = ret;
-    task->stat = NoStatus;
+    task->incall->ret = ret;
+    task->incall->stat = NoStatus;
 
     appendToRunQueue(cap,tso);
 
@@ -1897,7 +1902,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
 
     cap = schedule(cap,task);
 
-    ASSERT(task->stat != NoStatus);
+    ASSERT(task->incall->stat != NoStatus);
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
 
     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
@@ -1997,12 +2002,7 @@ initScheduler(void)
 }
 
 void
-exitScheduler(
-    rtsBool wait_foreign
-#if !defined(THREADED_RTS)
-                         __attribute__((unused))
-#endif
-)
+exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
                /* see Capability.c, shutdownCapability() */
 {
     Task *task = NULL;
@@ -2332,7 +2332,7 @@ deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
     // we must own all Capabilities.
 
     if (tso->why_blocked != BlockedOnCCall &&
-       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
+       tso->why_blocked != BlockedOnCCall_Interruptible) {
        throwToSingleThreaded(tso->cap,tso,NULL);
     }
 }
@@ -2344,7 +2344,7 @@ deleteThread_(Capability *cap, StgTSO *tso)
   // like deleteThread(), but we delete threads in foreign calls, too.
 
     if (tso->why_blocked == BlockedOnCCall ||
-       tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
+       tso->why_blocked == BlockedOnCCall_Interruptible) {
        tso->what_next = ThreadKilled;
        appendToRunQueue(tso->cap, tso);
     } else {
@@ -2551,6 +2551,12 @@ resurrectThreads (StgTSO *threads)
             * can wake up threads, remember...).
             */
            continue;
+        case BlockedOnMsgThrowTo:
+            // This can happen if the target is masking, blocks on a
+            // black hole, and then is found to be unreachable.  In
+            // this case, we want to let the target wake up and carry
+            // on, and do nothing to this thread.
+            continue;
        default:
            barf("resurrectThreads: thread blocked in a strange way: %d",
                  tso->why_blocked);