Interruptible FFI calls with pthread_kill and CancelSynchronousIO. v4
[ghc-hetmet.git] / rts / Schedule.c
index d7d5741..0850749 100644 (file)
@@ -125,7 +125,7 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 static void schedulePreLoop (void);
 static void scheduleFindWork (Capability *cap);
 #if defined(THREADED_RTS)
-static void scheduleYield (Capability **pcap, Task *task, rtsBool);
+static void scheduleYield (Capability **pcap, Task *task);
 #endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
@@ -204,7 +204,6 @@ schedule (Capability *initialCapability, Task *task)
   rtsBool ready_to_gc;
 #if defined(THREADED_RTS)
   rtsBool first = rtsTrue;
-  rtsBool force_yield = rtsFalse;
 #endif
   
   cap = initialCapability;
@@ -328,9 +327,7 @@ schedule (Capability *initialCapability, Task *task)
     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
     }
 
-  yield:
-    scheduleYield(&cap,task,force_yield);
-    force_yield = rtsFalse;
+    scheduleYield(&cap,task);
 
     if (emptyRunQueue(cap)) continue; // look for work again
 #endif
@@ -490,19 +487,6 @@ run_thread:
 
     traceEventStopThread(cap, t, ret);
 
-#if defined(THREADED_RTS)
-    // If ret is ThreadBlocked, and this Task is bound to the TSO that
-    // blocked, we are in limbo - the TSO is now owned by whatever it
-    // is blocked on, and may in fact already have been woken up,
-    // perhaps even on a different Capability.  It may be the case
-    // that task->cap != cap.  We better yield this Capability
-    // immediately and return to normaility.
-    if (ret == ThreadBlocked) {
-        force_yield = rtsTrue;
-        goto yield;
-    }
-#endif
-
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
     ASSERT(t->cap == cap);
 
@@ -639,23 +623,13 @@ shouldYieldCapability (Capability *cap, Task *task)
 // and also check the benchmarks in nofib/parallel for regressions.
 
 static void
-scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
+scheduleYield (Capability **pcap, Task *task)
 {
     Capability *cap = *pcap;
 
     // if we have work, and we don't need to give up the Capability, continue.
     //
-    // The force_yield flag is used when a bound thread blocks.  This
-    // is a particularly tricky situation: the current Task does not
-    // own the TSO any more, since it is on some queue somewhere, and
-    // might be woken up or manipulated by another thread at any time.
-    // The TSO and Task might be migrated to another Capability.
-    // Certain invariants might be in doubt, such as task->bound->cap
-    // == cap.  We have to yield the current Capability immediately,
-    // no messing around.
-    //
-    if (!force_yield &&
-        !shouldYieldCapability(cap,task) && 
+    if (!shouldYieldCapability(cap,task) && 
         (!emptyRunQueue(cap) ||
          !emptyInbox(cap) ||
          sched_state >= SCHED_INTERRUPTING))
@@ -1261,23 +1235,23 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
          ASSERT(task->incall->tso == t);
 
          if (t->what_next == ThreadComplete) {
-             if (task->ret) {
+             if (task->incall->ret) {
                  // NOTE: return val is tso->sp[1] (see StgStartup.hc)
-                 *(task->ret) = (StgClosure *)task->incall->tso->sp[1]; 
+                 *(task->incall->ret) = (StgClosure *)task->incall->tso->sp[1]; 
              }
-             task->stat = Success;
+             task->incall->stat = Success;
          } else {
-             if (task->ret) {
-                 *(task->ret) = NULL;
+             if (task->incall->ret) {
+                 *(task->incall->ret) = NULL;
              }
              if (sched_state >= SCHED_INTERRUPTING) {
                   if (heap_overflow) {
-                      task->stat = HeapExhausted;
+                      task->incall->stat = HeapExhausted;
                   } else {
-                      task->stat = Interrupted;
+                      task->incall->stat = Interrupted;
                   }
              } else {
-                 task->stat = Killed;
+                 task->incall->stat = Killed;
              }
          }
 #ifdef DEBUG
@@ -1559,10 +1533,14 @@ forkProcess(HsStablePtr *entry
     ACQUIRE_LOCK(&cap->lock);
     ACQUIRE_LOCK(&cap->running_task->lock);
 
+    stopTimer(); // See #4074
+
     pid = fork();
     
     if (pid) { // parent
        
+        startTimer(); // #4074
+
         RELEASE_LOCK(&sched_mutex);
         RELEASE_LOCK(&cap->lock);
         RELEASE_LOCK(&cap->running_task->lock);
@@ -1738,13 +1716,17 @@ recoverSuspendedTask (Capability *cap, Task *task)
  * the whole system.
  *
  * The Haskell thread making the C call is put to sleep for the
- * duration of the call, on the susepended_ccalling_threads queue.  We
+ * duration of the call, on the suspended_ccalling_threads queue.  We
  * give out a token to the task, which it can use to resume the thread
  * on return from the C function.
+ *
+ * If this is an interruptible C call, this means that the FFI call may be
+ * unceremoniously terminated and should be scheduled on an
+ * unbound worker thread.
  * ------------------------------------------------------------------------- */
    
 void *
-suspendThread (StgRegTable *reg)
+suspendThread (StgRegTable *reg, rtsBool interruptible)
 {
   Capability *cap;
   int saved_errno;
@@ -1773,12 +1755,10 @@ suspendThread (StgRegTable *reg)
 
   threadPaused(cap,tso);
 
-  if ((tso->flags & TSO_BLOCKEX) == 0)  {
-      tso->why_blocked = BlockedOnCCall;
-      tso->flags |= TSO_BLOCKEX;
-      tso->flags &= ~TSO_INTERRUPTIBLE;
+  if (interruptible) {
+    tso->why_blocked = BlockedOnCCall_Interruptible;
   } else {
-      tso->why_blocked = BlockedOnCCall_NoUnblockExc;
+    tso->why_blocked = BlockedOnCCall;
   }
 
   // Hand back capability
@@ -1837,12 +1817,11 @@ resumeThread (void *task_)
 
     traceEventRunThread(cap, tso);
     
-    if (tso->why_blocked == BlockedOnCCall) {
+    if ((tso->flags & TSO_BLOCKEX) == 0) {
         // avoid locking the TSO if we don't have to
         if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
             awakenBlockedExceptionQueue(cap,tso);
         }
-       tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
     }
     
     /* Reset blocking status */
@@ -1891,8 +1870,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
     if (cpu == cap->no) {
        appendToRunQueue(cap,tso);
     } else {
-        traceEventMigrateThread (cap, tso, capabilities[cpu].no);
-       wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
+        migrateThread(cap, tso, &capabilities[cpu]);
     }
 #else
     appendToRunQueue(cap,tso);
@@ -1914,8 +1892,8 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
     tso->cap = cap;
 
     task->incall->tso = tso;
-    task->ret = ret;
-    task->stat = NoStatus;
+    task->incall->ret = ret;
+    task->incall->stat = NoStatus;
 
     appendToRunQueue(cap,tso);
 
@@ -1924,7 +1902,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
 
     cap = schedule(cap,task);
 
-    ASSERT(task->stat != NoStatus);
+    ASSERT(task->incall->stat != NoStatus);
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
 
     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
@@ -2024,12 +2002,7 @@ initScheduler(void)
 }
 
 void
-exitScheduler(
-    rtsBool wait_foreign
-#if !defined(THREADED_RTS)
-                         __attribute__((unused))
-#endif
-)
+exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
                /* see Capability.c, shutdownCapability() */
 {
     Task *task = NULL;
@@ -2359,7 +2332,7 @@ deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
     // we must own all Capabilities.
 
     if (tso->why_blocked != BlockedOnCCall &&
-       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
+       tso->why_blocked != BlockedOnCCall_Interruptible) {
        throwToSingleThreaded(tso->cap,tso,NULL);
     }
 }
@@ -2371,9 +2344,9 @@ deleteThread_(Capability *cap, StgTSO *tso)
   // like deleteThread(), but we delete threads in foreign calls, too.
 
     if (tso->why_blocked == BlockedOnCCall ||
-       tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
-       unblockOne(cap,tso);
+       tso->why_blocked == BlockedOnCCall_Interruptible) {
        tso->what_next = ThreadKilled;
+       appendToRunQueue(tso->cap, tso);
     } else {
        deleteThread(cap,tso);
     }
@@ -2578,6 +2551,12 @@ resurrectThreads (StgTSO *threads)
             * can wake up threads, remember...).
             */
            continue;
+        case BlockedOnMsgThrowTo:
+            // This can happen if the target is masking, blocks on a
+            // black hole, and then is found to be unreachable.  In
+            // this case, we want to let the target wake up and carry
+            // on, and do nothing to this thread.
+            continue;
        default:
            barf("resurrectThreads: thread blocked in a strange way: %d",
                  tso->why_blocked);