Interruptible FFI calls with pthread_kill and CancelSynchronousIO. v4
[ghc-hetmet.git] / rts / Threads.c
index 05a13c7..7344134 100644 (file)
@@ -205,83 +205,26 @@ removeThreadFromDeQueue (Capability *cap,
     barf("removeThreadFromMVarQueue: not found");
 }
 
-void
-removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
-{
-    // caller must do the write barrier, because replacing the info
-    // pointer will unlock the MVar.
-    removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
-    tso->_link = END_TSO_QUEUE;
-}
-
 /* ----------------------------------------------------------------------------
-   unblockOne()
+   tryWakeupThread()
 
-   unblock a single thread.
-   ------------------------------------------------------------------------- */
+   Attempt to wake up a thread.  tryWakeupThread is idempotent: it is
+   always safe to call it too many times, but it is not safe in
+   general to omit a call.
 
-StgTSO *
-unblockOne (Capability *cap, StgTSO *tso)
-{
-    return unblockOne_(cap,tso,rtsTrue); // allow migration
-}
+   ------------------------------------------------------------------------- */
 
-StgTSO *
-unblockOne_ (Capability *cap, StgTSO *tso, 
-            rtsBool allow_migrate USED_IF_THREADS)
+void
+tryWakeupThread (Capability *cap, StgTSO *tso)
 {
-  StgTSO *next;
-
-  // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
-  ASSERT(tso->why_blocked != NotBlocked);
-  ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
-         tso->block_info.closure->header.info == &stg_IND_info);
-
-  next = tso->_link;
-  tso->_link = END_TSO_QUEUE;
-
-#if defined(THREADED_RTS)
-  if (tso->cap == cap || (!tsoLocked(tso) && 
-                         allow_migrate && 
-                         RtsFlags.ParFlags.wakeupMigrate)) {
-      // We are waking up this thread on the current Capability, which
-      // might involve migrating it from the Capability it was last on.
-      if (tso->bound) {
-         ASSERT(tso->bound->task->cap == tso->cap);
-         tso->bound->task->cap = cap;
-      }
-
-      tso->cap = cap;
-      write_barrier();
-      tso->why_blocked = NotBlocked;
-      appendToRunQueue(cap,tso);
-
-      // context-switch soonish so we can migrate the new thread if
-      // necessary.  NB. not contextSwitchCapability(cap), which would
-      // force a context switch immediately.
-      cap->context_switch = 1;
-  } else {
-      // we'll try to wake it up on the Capability it was last on.
-      wakeupThreadOnCapability(cap, tso->cap, tso);
-  }
-#else
-  tso->why_blocked = NotBlocked;
-  appendToRunQueue(cap,tso);
-
-  // context-switch soonish so we can migrate the new thread if
-  // necessary.  NB. not contextSwitchCapability(cap), which would
-  // force a context switch immediately.
-  cap->context_switch = 1;
-#endif
-
-  traceEventThreadWakeup (cap, tso, tso->cap->no);
-
-  return next;
+    tryWakeupThread_(cap, deRefTSO(tso));
 }
 
 void
-tryWakeupThread (Capability *cap, StgTSO *tso)
+tryWakeupThread_ (Capability *cap, StgTSO *tso)
 {
+    traceEventThreadWakeup (cap, tso, tso->cap->no);
+
 #ifdef THREADED_RTS
     if (tso->cap != cap)
     {
@@ -298,6 +241,16 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
 
     switch (tso->why_blocked)
     {
+    case BlockedOnMVar:
+    {
+        if (tso->_link == END_TSO_QUEUE) {
+            tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
+            goto unblock;
+        } else {
+            return;
+        }
+    }
+
     case BlockedOnMsgThrowTo:
     {
         const StgInfoTable *i;
@@ -307,27 +260,45 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
         if (i != &stg_MSG_NULL_info) {
             debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
                           (lnat)tso->id, tso->block_info.throwto->header.info);
-            break; // still blocked
+            return;
         }
 
         // remove the block frame from the stack
         ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
         tso->sp += 3;
-        // fall through...
+        goto unblock;
     }
+
     case BlockedOnBlackHole:
     case BlockedOnSTM:
-    {
-        // just run the thread now, if the BH is not really available,
-        // we'll block again.
-        tso->why_blocked = NotBlocked;
-        appendToRunQueue(cap,tso);
-        break;
-    }
+    case ThreadMigrating:
+        goto unblock;
+
     default:
         // otherwise, do nothing
-        break;
+        return;
     }
+
+unblock:
+    // just run the thread now, if the BH is not really available,
+    // we'll block again.
+    tso->why_blocked = NotBlocked;
+    appendToRunQueue(cap,tso);
+}
+
+/* ----------------------------------------------------------------------------
+   migrateThread
+   ------------------------------------------------------------------------- */
+
+void
+migrateThread (Capability *from, StgTSO *tso, Capability *to)
+{
+    traceEventMigrateThread (from, tso, to->no);
+    // ThreadMigrating tells the target cap that it needs to be added to
+    // the run queue when it receives the MSG_TRY_WAKEUP.
+    tso->why_blocked = ThreadMigrating;
+    tso->cap = to;
+    tryWakeupThread(from, tso);
 }
 
 /* ----------------------------------------------------------------------------
@@ -417,6 +388,7 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
     i = thunk->header.info;
     if (i != &stg_BLACKHOLE_info &&
         i != &stg_CAF_BLACKHOLE_info &&
+        i != &__stg_EAGER_BLACKHOLE_info &&
         i != &stg_WHITEHOLE_info) {
         updateWithIndirection(cap, thunk, val);
         return;
@@ -450,47 +422,6 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
     }
 }
 
-/* ----------------------------------------------------------------------------
- * Wake up a thread on a Capability.
- *
- * This is used when the current Task is running on a Capability and
- * wishes to wake up a thread on a different Capability.
- * ------------------------------------------------------------------------- */
-
-#ifdef THREADED_RTS
-
-void
-wakeupThreadOnCapability (Capability *cap,
-                          Capability *other_cap, 
-                          StgTSO *tso)
-{
-    MessageWakeup *msg;
-
-    // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
-    if (tso->bound) {
-       ASSERT(tso->bound->task->cap == tso->cap);
-       tso->bound->task->cap = other_cap;
-    }
-    tso->cap = other_cap;
-
-    ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
-           tso->block_info.closure->header.info == &stg_IND_info);
-
-    ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
-
-    msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
-    SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
-    msg->tso = tso;
-    tso->block_info.closure = (StgClosure *)msg;
-    dirty_TSO(cap, tso);
-    write_barrier();
-    tso->why_blocked = BlockedOnMsgWakeup;
-
-    sendMessage(cap, other_cap, (Message*)msg);
-}
-
-#endif /* THREADED_RTS */
-
 /* ---------------------------------------------------------------------------
  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
  * used by Control.Concurrent for error checking.
@@ -549,20 +480,20 @@ printThreadBlockage(StgTSO *tso)
       debugBelch("is blocked on a black hole %p", 
                  ((StgBlockingQueue*)tso->block_info.bh->bh));
     break;
-  case BlockedOnMsgWakeup:
-    debugBelch("is blocked on a wakeup message");
-    break;
   case BlockedOnMsgThrowTo:
     debugBelch("is blocked on a throwto message");
     break;
   case NotBlocked:
     debugBelch("is not blocked");
     break;
+  case ThreadMigrating:
+    debugBelch("is runnable, but not on the run queue");
+    break;
   case BlockedOnCCall:
     debugBelch("is blocked on an external call");
     break;
-  case BlockedOnCCall_NoUnblockExc:
-    debugBelch("is blocked on an external call (exceptions were already blocked)");
+  case BlockedOnCCall_Interruptible:
+    debugBelch("is blocked on an external call (but may be interrupted)");
     break;
   case BlockedOnSTM:
     debugBelch("is blocked on an STM operation");