Use message-passing to implement throwTo in the RTS
[ghc-hetmet.git] / rts / Capability.c
index ce6eceb..5f54eca 100644 (file)
@@ -223,8 +223,7 @@ initCapability( Capability *cap, nat i )
     cap->suspended_ccalls  = NULL;
     cap->returning_tasks_hd = NULL;
     cap->returning_tasks_tl = NULL;
-    cap->wakeup_queue_hd    = END_TSO_QUEUE;
-    cap->wakeup_queue_tl    = END_TSO_QUEUE;
+    cap->inbox              = (Message*)END_TSO_QUEUE;
     cap->sparks_created     = 0;
     cap->sparks_converted   = 0;
     cap->sparks_pruned      = 0;
@@ -419,7 +418,7 @@ releaseCapability_ (Capability* cap,
     // If we have an unbound thread on the run queue, or if there's
     // anything else to do, give the Capability to a worker thread.
     if (always_wakeup || 
-        !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+        !emptyRunQueue(cap) || !emptyInbox(cap) ||
         !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
@@ -645,11 +644,11 @@ yieldCapability (Capability** pCap, Task *task)
  * ------------------------------------------------------------------------- */
 
 void
-wakeupThreadOnCapability (Capability *my_cap, 
+wakeupThreadOnCapability (Capability *cap,
                           Capability *other_cap, 
                           StgTSO *tso)
 {
-    ACQUIRE_LOCK(&other_cap->lock);
+    MessageWakeup *msg;
 
     // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
     if (tso->bound) {
@@ -658,27 +657,20 @@ wakeupThreadOnCapability (Capability *my_cap,
     }
     tso->cap = other_cap;
 
-    ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1);
+    ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
+           tso->block_info.closure->header.info == &stg_IND_info);
 
-    if (other_cap->running_task == NULL) {
-       // nobody is running this Capability, we can add our thread
-       // directly onto the run queue and start up a Task to run it.
+    ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
 
-       other_cap->running_task = myTask(); 
-            // precond for releaseCapability_() and appendToRunQueue()
+    msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
+    msg->header.info = &stg_MSG_WAKEUP_info;
+    msg->tso = tso;
+    tso->block_info.closure = (StgClosure *)msg;
+    dirty_TSO(cap, tso);
+    write_barrier();
+    tso->why_blocked = BlockedOnMsgWakeup;
 
-       appendToRunQueue(other_cap,tso);
-
-       releaseCapability_(other_cap,rtsFalse);
-    } else {
-       appendToWakeupQueue(my_cap,other_cap,tso);
-        other_cap->context_switch = 1;
-       // someone is running on this Capability, so it cannot be
-       // freed without first checking the wakeup queue (see
-       // releaseCapability_).
-    }
-
-    RELEASE_LOCK(&other_cap->lock);
+    sendMessage(other_cap, (Message*)msg);
 }
 
 /* ----------------------------------------------------------------------------
@@ -881,8 +873,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
        evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
        evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
 #if defined(THREADED_RTS)
-       evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
-       evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
+        evac(user, (StgClosure **)(void *)&cap->inbox);
 #endif
        for (incall = cap->suspended_ccalls; incall != NULL; 
             incall=incall->next) {
@@ -910,3 +901,29 @@ markCapabilities (evac_fn evac, void *user)
 {
     markSomeCapabilities(evac, user, 0, 1, rtsFalse);
 }
+
+/* -----------------------------------------------------------------------------
+   Messages
+   -------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void sendMessage(Capability *cap, Message *msg)
+{
+    ACQUIRE_LOCK(&cap->lock);
+
+    msg->link = cap->inbox;
+    cap->inbox = msg;
+
+    if (cap->running_task == NULL) {
+       cap->running_task = myTask(); 
+            // precond for releaseCapability_()
+       releaseCapability_(cap,rtsFalse);
+    } else {
+        contextSwitchCapability(cap);
+    }
+
+    RELEASE_LOCK(&cap->lock);
+}
+
+#endif // THREADED_RTS