Use message-passing to implement throwTo in the RTS
[ghc-hetmet.git] / rts / Capability.c
index f4fdd70..5f54eca 100644 (file)
@@ -98,7 +98,7 @@ findSpark (Capability *cap)
           cap->sparks_converted++;
 
           // Post event for running a spark from capability's own pool.
-          traceSchedEvent(cap, EVENT_RUN_SPARK, cap->r.rCurrentTSO, 0);
+          traceEventRunSpark(cap, cap->r.rCurrentTSO);
 
           return spark;
       }
@@ -132,8 +132,7 @@ findSpark (Capability *cap)
           if (spark != NULL) {
               cap->sparks_converted++;
 
-              traceSchedEvent(cap, EVENT_STEAL_SPARK, 
-                              cap->r.rCurrentTSO, robbed->no);
+              traceEventStealSpark(cap, cap->r.rCurrentTSO, robbed->no);
               
               return spark;
           }
@@ -174,10 +173,10 @@ STATIC_INLINE void
 newReturningTask (Capability *cap, Task *task)
 {
     ASSERT_LOCK_HELD(&cap->lock);
-    ASSERT(task->return_link == NULL);
+    ASSERT(task->next == NULL);
     if (cap->returning_tasks_hd) {
-       ASSERT(cap->returning_tasks_tl->return_link == NULL);
-       cap->returning_tasks_tl->return_link = task;
+       ASSERT(cap->returning_tasks_tl->next == NULL);
+       cap->returning_tasks_tl->next = task;
     } else {
        cap->returning_tasks_hd = task;
     }
@@ -191,11 +190,11 @@ popReturningTask (Capability *cap)
     Task *task;
     task = cap->returning_tasks_hd;
     ASSERT(task);
-    cap->returning_tasks_hd = task->return_link;
+    cap->returning_tasks_hd = task->next;
     if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
     }
-    task->return_link = NULL;
+    task->next = NULL;
     return task;
 }
 #endif
@@ -221,11 +220,10 @@ initCapability( Capability *cap, nat i )
     initMutex(&cap->lock);
     cap->running_task      = NULL; // indicates cap is free
     cap->spare_workers     = NULL;
-    cap->suspended_ccalling_tasks = NULL;
+    cap->suspended_ccalls  = NULL;
     cap->returning_tasks_hd = NULL;
     cap->returning_tasks_tl = NULL;
-    cap->wakeup_queue_hd    = END_TSO_QUEUE;
-    cap->wakeup_queue_tl    = END_TSO_QUEUE;
+    cap->inbox              = (Message*)END_TSO_QUEUE;
     cap->sparks_created     = 0;
     cap->sparks_converted   = 0;
     cap->sparks_pruned      = 0;
@@ -343,7 +341,7 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
     ASSERT_LOCK_HELD(&cap->lock);
     ASSERT(task->cap == cap);
     debugTrace(DEBUG_sched, "passing capability %d to %s %p",
-               cap->no, task->tso ? "bound task" : "worker",
+               cap->no, task->incall->tso ? "bound task" : "worker",
                (void *)task->id);
     ACQUIRE_LOCK(&task->lock);
     task->wakeup = rtsTrue;
@@ -395,8 +393,11 @@ releaseCapability_ (Capability* cap,
     // give this Capability to the appropriate Task.
     if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
-       ASSERT(task != cap->run_queue_hd->bound);
-       task = cap->run_queue_hd->bound;
+       // ASSERT(task != cap->run_queue_hd->bound);
+        // assertion is false: in schedule() we force a yield after
+       // ThreadBlocked, but the thread may be back on the run queue
+       // by now.
+       task = cap->run_queue_hd->bound->task;
        giveCapabilityToTask(cap,task);
        return;
     }
@@ -409,7 +410,7 @@ releaseCapability_ (Capability* cap,
        if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
            debugTrace(DEBUG_sched,
                       "starting new worker on capability %d", cap->no);
-           startWorkerTask(cap, workerStart);
+           startWorkerTask(cap);
            return;
        }
     }
@@ -417,7 +418,7 @@ releaseCapability_ (Capability* cap,
     // If we have an unbound thread on the run queue, or if there's
     // anything else to do, give the Capability to a worker thread.
     if (always_wakeup || 
-        !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+        !emptyRunQueue(cap) || !emptyInbox(cap) ||
         !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
@@ -460,9 +461,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
     // in which case it is not replaced on the spare_worker queue.
     // This happens when the system is shutting down (see
     // Schedule.c:workerStart()).
-    // Also, be careful to check that this task hasn't just exited
-    // Haskell to do a foreign call (task->suspended_tso).
-    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
+    if (!isBoundTask(task) && !task->stopped) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
     }
@@ -579,9 +578,9 @@ yieldCapability (Capability** pCap, Task *task)
     Capability *cap = *pCap;
 
     if (waiting_for_gc == PENDING_GC_PAR) {
-        traceSchedEvent(cap, EVENT_GC_START, 0, 0);
+        traceEventGcStart(cap);
         gcWorkerThread(cap);
-        traceSchedEvent(cap, EVENT_GC_END, 0, 0);
+        traceEventGcEnd(cap);
         return;
     }
 
@@ -610,7 +609,7 @@ yieldCapability (Capability** pCap, Task *task)
                continue;
            }
 
-           if (task->tso == NULL) {
+           if (task->incall->tso == NULL) {
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release it
                // again.  This is unlikely to happen.
@@ -645,40 +644,33 @@ yieldCapability (Capability** pCap, Task *task)
  * ------------------------------------------------------------------------- */
 
 void
-wakeupThreadOnCapability (Capability *my_cap, 
+wakeupThreadOnCapability (Capability *cap,
                           Capability *other_cap, 
                           StgTSO *tso)
 {
-    ACQUIRE_LOCK(&other_cap->lock);
+    MessageWakeup *msg;
 
     // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
     if (tso->bound) {
-       ASSERT(tso->bound->cap == tso->cap);
-       tso->bound->cap = other_cap;
+       ASSERT(tso->bound->task->cap == tso->cap);
+       tso->bound->task->cap = other_cap;
     }
     tso->cap = other_cap;
 
-    ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);
+    ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
+           tso->block_info.closure->header.info == &stg_IND_info);
 
-    if (other_cap->running_task == NULL) {
-       // nobody is running this Capability, we can add our thread
-       // directly onto the run queue and start up a Task to run it.
+    ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
 
-       other_cap->running_task = myTask(); 
-            // precond for releaseCapability_() and appendToRunQueue()
+    msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
+    msg->header.info = &stg_MSG_WAKEUP_info;
+    msg->tso = tso;
+    tso->block_info.closure = (StgClosure *)msg;
+    dirty_TSO(cap, tso);
+    write_barrier();
+    tso->why_blocked = BlockedOnMsgWakeup;
 
-       appendToRunQueue(other_cap,tso);
-
-       releaseCapability_(other_cap,rtsFalse);
-    } else {
-       appendToWakeupQueue(my_cap,other_cap,tso);
-        other_cap->context_switch = 1;
-       // someone is running on this Capability, so it cannot be
-       // freed without first checking the wakeup queue (see
-       // releaseCapability_).
-    }
-
-    RELEASE_LOCK(&other_cap->lock);
+    sendMessage(other_cap, (Message*)msg);
 }
 
 /* ----------------------------------------------------------------------------
@@ -779,16 +771,24 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe)
         // that will try to return to code that has been unloaded.
         // We can be a bit more relaxed when this is a standalone
         // program that is about to terminate, and let safe=false.
-        if (cap->suspended_ccalling_tasks && safe) {
+        if (cap->suspended_ccalls && safe) {
            debugTrace(DEBUG_sched, 
                       "thread(s) are involved in foreign calls, yielding");
             cap->running_task = NULL;
            RELEASE_LOCK(&cap->lock);
+            // The IO manager thread might have been slow to start up,
+            // so the first attempt to kill it might not have
+            // succeeded.  Just in case, try again - the kill message
+            // will only be sent once.
+            //
+            // To reproduce this deadlock: run ffi002(threaded1)
+            // repeatedly on a loaded machine.
+            ioManagerDie();
             yieldThread();
             continue;
         }
-            
-        traceSchedEvent(cap, EVENT_SHUTDOWN, 0, 0);
+
+        traceEventShutdown(cap);
        RELEASE_LOCK(&cap->lock);
        break;
     }
@@ -861,7 +861,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
 {
     nat i;
     Capability *cap;
-    Task *task;
+    InCall *incall;
 
     // Each GC thread is responsible for following roots from the
     // Capability of the same number.  There will usually be the same
@@ -873,12 +873,11 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
        evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
        evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
 #if defined(THREADED_RTS)
-       evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
-       evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
+        evac(user, (StgClosure **)(void *)&cap->inbox);
 #endif
-       for (task = cap->suspended_ccalling_tasks; task != NULL; 
-            task=task->next) {
-           evac(user, (StgClosure **)(void *)&task->suspended_tso);
+       for (incall = cap->suspended_ccalls; incall != NULL; 
+            incall=incall->next) {
+           evac(user, (StgClosure **)(void *)&incall->suspended_tso);
        }
 
 #if defined(THREADED_RTS)
@@ -902,3 +901,29 @@ markCapabilities (evac_fn evac, void *user)
 {
     markSomeCapabilities(evac, user, 0, 1, rtsFalse);
 }
+
+/* -----------------------------------------------------------------------------
+   Messages
+   -------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void sendMessage(Capability *cap, Message *msg)
+{
+    ACQUIRE_LOCK(&cap->lock);
+
+    msg->link = cap->inbox;
+    cap->inbox = msg;
+
+    if (cap->running_task == NULL) {
+       cap->running_task = myTask(); 
+            // precond for releaseCapability_()
+       releaseCapability_(cap,rtsFalse);
+    } else {
+        contextSwitchCapability(cap);
+    }
+
+    RELEASE_LOCK(&cap->lock);
+}
+
+#endif // THREADED_RTS