New implementation of BLACKHOLEs
[ghc-hetmet.git] / rts / Threads.c
index 28820c8..0c3e591 100644 (file)
@@ -9,11 +9,16 @@
 #include "PosixSource.h"
 #include "Rts.h"
 
+#include "Capability.h"
+#include "Updates.h"
 #include "Threads.h"
 #include "STM.h"
 #include "Schedule.h"
 #include "Trace.h"
 #include "ThreadLabels.h"
+#include "Updates.h"
+#include "Messages.h"
+#include "sm/Storage.h"
 
 /* Next thread ID to allocate.
  * LOCK: sched_mutex
@@ -63,7 +68,7 @@ createThread(Capability *cap, nat size)
     }
 
     size = round_to_mblocks(size);
-    tso = (StgTSO *)allocateLocal(cap, size);
+    tso = (StgTSO *)allocate(cap, size);
 
     stack_size = size - TSO_STRUCT_SIZEW;
     TICK_ALLOC_TSO(stack_size, 0);
@@ -74,8 +79,11 @@ createThread(Capability *cap, nat size)
     tso->what_next = ThreadRunGHC;
 
     tso->why_blocked  = NotBlocked;
-    tso->blocked_exceptions = END_TSO_QUEUE;
-    tso->flags = TSO_DIRTY;
+    tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
+    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
+    tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
+    tso->flags = 0;
+    tso->dirty = 1;
     
     tso->saved_errno = 0;
     tso->bound = NULL;
@@ -101,15 +109,13 @@ createThread(Capability *cap, nat size)
      */
     ACQUIRE_LOCK(&sched_mutex);
     tso->id = next_thread_id++;  // while we have the mutex
-    tso->global_link = g0s0->threads;
-    g0s0->threads = tso;
+    tso->global_link = g0->threads;
+    g0->threads = tso;
     RELEASE_LOCK(&sched_mutex);
     
-    postEvent (cap, EVENT_CREATE_THREAD, tso->id, 0);
+    // ToDo: report the stack size in the event?
+    traceEventCreateThread(cap, tso);
 
-    debugTrace(DEBUG_sched,
-              "created thread %ld, stack size = %lx words", 
-              (long)tso->id, (long)tso->stack_size);
     return tso;
 }
 
@@ -147,7 +153,7 @@ rts_getThreadId(StgPtr tso)
    Fails fatally if the TSO is not on the queue.
    -------------------------------------------------------------------------- */
 
-void
+rtsBool // returns True if we modified queue
 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
 {
     StgTSO *t, *prev;
@@ -157,28 +163,32 @@ removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
+                return rtsFalse;
            } else {
                *queue = t->_link;
+                return rtsTrue;
            }
-           return;
        }
     }
     barf("removeThreadFromQueue: not found");
 }
 
-void
+rtsBool // returns True if we modified head or tail
 removeThreadFromDeQueue (Capability *cap, 
                          StgTSO **head, StgTSO **tail, StgTSO *tso)
 {
     StgTSO *t, *prev;
+    rtsBool flag = rtsFalse;
 
     prev = NULL;
     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
+                flag = rtsFalse;
            } else {
                *head = t->_link;
+                flag = rtsTrue;
            }
            if (*tail == tso) {
                if (prev) {
@@ -186,8 +196,10 @@ removeThreadFromDeQueue (Capability *cap,
                } else {
                    *tail = END_TSO_QUEUE;
                }
-           }
-           return;
+                return rtsTrue;
+           } else {
+                return flag;
+            }
        }
     }
     barf("removeThreadFromMVarQueue: not found");
@@ -196,7 +208,10 @@ removeThreadFromDeQueue (Capability *cap,
 void
 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
 {
+    // caller must do the write barrier, because replacing the info
+    // pointer will unlock the MVar.
     removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
+    tso->_link = END_TSO_QUEUE;
 }
 
 /* ----------------------------------------------------------------------------
@@ -219,8 +234,9 @@ unblockOne_ (Capability *cap, StgTSO *tso,
 
   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
   ASSERT(tso->why_blocked != NotBlocked);
+  ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
+         tso->block_info.closure->header.info == &stg_IND_info);
 
-  tso->why_blocked = NotBlocked;
   next = tso->_link;
   tso->_link = END_TSO_QUEUE;
 
@@ -231,11 +247,13 @@ unblockOne_ (Capability *cap, StgTSO *tso,
       // We are waking up this thread on the current Capability, which
       // might involve migrating it from the Capability it was last on.
       if (tso->bound) {
-         ASSERT(tso->bound->cap == tso->cap);
-         tso->bound->cap = cap;
+         ASSERT(tso->bound->task->cap == tso->cap);
+         tso->bound->task->cap = cap;
       }
 
       tso->cap = cap;
+      write_barrier();
+      tso->why_blocked = NotBlocked;
       appendToRunQueue(cap,tso);
 
       // context-switch soonish so we can migrate the new thread if
@@ -247,6 +265,7 @@ unblockOne_ (Capability *cap, StgTSO *tso,
       wakeupThreadOnCapability(cap, tso->cap, tso);
   }
 #else
+  tso->why_blocked = NotBlocked;
   appendToRunQueue(cap,tso);
 
   // context-switch soonish so we can migrate the new thread if
@@ -255,14 +274,43 @@ unblockOne_ (Capability *cap, StgTSO *tso,
   cap->context_switch = 1;
 #endif
 
-  postEvent (cap, EVENT_THREAD_WAKEUP, tso->id, tso->cap->no);
-
-  debugTrace(DEBUG_sched, "waking up thread %ld on cap %d",
-            (long)tso->id, tso->cap->no);
+  traceEventThreadWakeup (cap, tso, tso->cap->no);
 
   return next;
 }
 
+void
+tryWakeupThread (Capability *cap, StgTSO *tso)
+{
+#ifdef THREADED_RTS
+    if (tso->cap != cap)
+    {
+        MessageWakeup *msg;
+        msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
+        SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
+        msg->tso = tso;
+        sendMessage(cap, tso->cap, (Message*)msg);
+        return;
+    }
+#endif
+
+    switch (tso->why_blocked)
+    {
+    case BlockedOnBlackHole:
+    case BlockedOnSTM:
+    {
+        // just run the thread now, if the BH is not really available,
+        // we'll block again.
+        tso->why_blocked = NotBlocked;
+        appendToRunQueue(cap,tso);
+        break;
+    }
+    default:
+        // otherwise, do nothing
+        break;
+    }
+}
+
 /* ----------------------------------------------------------------------------
    awakenBlockedQueue
 
@@ -270,13 +318,160 @@ unblockOne_ (Capability *cap, StgTSO *tso,
    ------------------------------------------------------------------------- */
 
 void
-awakenBlockedQueue(Capability *cap, StgTSO *tso)
+wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
+{
+    MessageBlackHole *msg;
+    const StgInfoTable *i;
+
+    ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info  ||
+           bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info  );
+
+    for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE; 
+         msg = msg->link) {
+        i = msg->header.info;
+        if (i != &stg_IND_info) {
+            ASSERT(i == &stg_MSG_BLACKHOLE_info);
+            tryWakeupThread(cap,msg->tso);
+        }
+    }
+
+    // overwrite the BQ with an indirection so it will be
+    // collected at the next GC.
+#if defined(DEBUG) && !defined(THREADED_RTS)
+    // XXX FILL_SLOP, but not if THREADED_RTS because in that case
+    // another thread might be looking at this BLOCKING_QUEUE and
+    // checking the owner field at the same time.
+    bq->bh = 0; bq->queue = 0; bq->owner = 0;
+#endif
+    OVERWRITE_INFO(bq, &stg_IND_info);
+}
+
+// If we update a closure that we know we BLACKHOLE'd, and the closure
+// no longer points to the current TSO as its owner, then there may be
+// an orphaned BLOCKING_QUEUE closure with blocked threads attached to
+// it.  We therefore traverse the BLOCKING_QUEUEs attached to the
+// current TSO to see if any can now be woken up.
+void
+checkBlockingQueues (Capability *cap, StgTSO *tso)
+{
+    StgBlockingQueue *bq, *next;
+    StgClosure *p;
+
+    debugTraceCap(DEBUG_sched, cap,
+                  "collision occurred; checking blocking queues for thread %ld",
+                  (lnat)tso->id);
+    
+    for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
+        next = bq->link;
+
+        if (bq->header.info == &stg_IND_info) {
+            // ToDo: could short it out right here, to avoid
+            // traversing this IND multiple times.
+            continue;
+        }
+        
+        p = bq->bh;
+
+        if (p->header.info != &stg_BLACKHOLE_info ||
+            ((StgInd *)p)->indirectee != (StgClosure*)bq)
+        {
+            wakeBlockingQueue(cap,bq);
+        }   
+    }
+}
+
+/* ----------------------------------------------------------------------------
+   updateThunk
+
+   Update a thunk with a value.  In order to do this, we need to know
+   which TSO owns (or is evaluating) the thunk, in case we need to
+   awaken any threads that are blocked on it.
+   ------------------------------------------------------------------------- */
+
+void
+updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
 {
-    while (tso != END_TSO_QUEUE) {
-       tso = unblockOne(cap,tso);
+    StgClosure *v;
+    StgTSO *owner;
+    const StgInfoTable *i;
+
+    i = thunk->header.info;
+    if (i != &stg_BLACKHOLE_info &&
+        i != &stg_CAF_BLACKHOLE_info &&
+        i != &stg_WHITEHOLE_info) {
+        updateWithIndirection(cap, thunk, val);
+        return;
+    }
+    
+    v = ((StgInd*)thunk)->indirectee;
+
+    updateWithIndirection(cap, thunk, val);
+
+    i = v->header.info;
+    if (i == &stg_TSO_info) {
+        owner = deRefTSO((StgTSO*)v);
+        if (owner != tso) {
+            checkBlockingQueues(cap, tso);
+        }
+        return;
+    }
+
+    if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
+        i != &stg_BLOCKING_QUEUE_DIRTY_info) {
+        checkBlockingQueues(cap, tso);
+        return;
+    }
+
+    owner = deRefTSO(((StgBlockingQueue*)v)->owner);
+
+    if (owner != tso) {
+        checkBlockingQueues(cap, tso);
+    } else {
+        wakeBlockingQueue(cap, (StgBlockingQueue*)v);
     }
 }
 
+/* ----------------------------------------------------------------------------
+ * Wake up a thread on a Capability.
+ *
+ * This is used when the current Task is running on a Capability and
+ * wishes to wake up a thread on a different Capability.
+ * ------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void
+wakeupThreadOnCapability (Capability *cap,
+                          Capability *other_cap, 
+                          StgTSO *tso)
+{
+    MessageWakeup *msg;
+
+    // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
+    if (tso->bound) {
+       ASSERT(tso->bound->task->cap == tso->cap);
+       tso->bound->task->cap = other_cap;
+    }
+    tso->cap = other_cap;
+
+    ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
+           tso->block_info.closure->header.info == &stg_IND_info);
+
+    ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
+
+    msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
+    SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
+    msg->tso = tso;
+    tso->block_info.closure = (StgClosure *)msg;
+    dirty_TSO(cap, tso);
+    write_barrier();
+    tso->why_blocked = BlockedOnMsgWakeup;
+
+    sendMessage(cap, other_cap, (Message*)msg);
+}
+
+#endif /* THREADED_RTS */
+
 /* ---------------------------------------------------------------------------
  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
  * used by Control.Concurrent for error checking.
@@ -331,12 +526,15 @@ printThreadBlockage(StgTSO *tso)
   case BlockedOnMVar:
     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
     break;
-  case BlockedOnException:
-    debugBelch("is blocked on delivering an exception to thread %lu",
-              (unsigned long)tso->block_info.tso->id);
-    break;
   case BlockedOnBlackHole:
-    debugBelch("is blocked on a black hole");
+      debugBelch("is blocked on a black hole %p", 
+                 ((StgBlockingQueue*)tso->block_info.bh->bh));
+    break;
+  case BlockedOnMsgWakeup:
+    debugBelch("is blocked on a wakeup message");
+    break;
+  case BlockedOnMsgThrowTo:
+    debugBelch("is blocked on a throwto message");
     break;
   case NotBlocked:
     debugBelch("is not blocked");
@@ -356,6 +554,7 @@ printThreadBlockage(StgTSO *tso)
   }
 }
 
+
 void
 printThreadStatus(StgTSO *t)
 {
@@ -377,7 +576,7 @@ printThreadStatus(StgTSO *t)
        default:
            printThreadBlockage(t);
        }
-        if (t->flags & TSO_DIRTY) {
+        if (t->dirty) {
             debugBelch(" (TSO_DIRTY)");
         } else if (t->flags & TSO_LINK_DIRTY) {
             debugBelch(" (TSO_LINK_DIRTY)");
@@ -390,24 +589,10 @@ void
 printAllThreads(void)
 {
   StgTSO *t, *next;
-  nat i, s;
+  nat i, g;
   Capability *cap;
 
-# if defined(GRAN)
-  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
-  ullong_format_string(TIME_ON_PROC(CurrentProc), 
-                      time_string, rtsFalse/*no commas!*/);
-
-  debugBelch("all threads at [%s]:\n", time_string);
-# elif defined(PARALLEL_HASKELL)
-  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
-  ullong_format_string(CURRENT_TIME,
-                      time_string, rtsFalse/*no commas!*/);
-
-  debugBelch("all threads at [%s]:\n", time_string);
-# else
   debugBelch("all threads:\n");
-# endif
 
   for (i = 0; i < n_capabilities; i++) {
       cap = &capabilities[i];
@@ -418,8 +603,8 @@ printAllThreads(void)
   }
 
   debugBelch("other threads:\n");
-  for (s = 0; s < total_steps; s++) {
-    for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
+  for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+    for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
       if (t->why_blocked != NotBlocked) {
          printThreadStatus(t);
       }