update submodules for GHC.HetMet.GArrow -> Control.GArrow renaming
[ghc-hetmet.git] / rts / Threads.c
index 05a13c7..3e1c5cf 100644 (file)
 #include "ThreadLabels.h"
 #include "Updates.h"
 #include "Messages.h"
+#include "RaiseAsync.h"
+#include "Prelude.h"
+#include "Printer.h"
+#include "sm/Sanity.h"
 #include "sm/Storage.h"
 
+#include <string.h>
+
 /* Next thread ID to allocate.
  * LOCK: sched_mutex
  */
@@ -54,57 +60,67 @@ StgTSO *
 createThread(Capability *cap, nat size)
 {
     StgTSO *tso;
+    StgStack *stack;
     nat stack_size;
 
     /* sched_mutex is *not* required */
 
-    /* First check whether we should create a thread at all */
-
-    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
-
     /* catch ridiculously small stack sizes */
-    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
-       size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
+    if (size < MIN_STACK_WORDS + sizeofW(StgStack)) {
+        size = MIN_STACK_WORDS + sizeofW(StgStack);
     }
 
-    size = round_to_mblocks(size);
-    tso = (StgTSO *)allocate(cap, size);
-
-    stack_size = size - TSO_STRUCT_SIZEW;
-    TICK_ALLOC_TSO(stack_size, 0);
-
+    /* The size argument we are given includes all the per-thread
+     * overheads:
+     *
+     *    - The TSO structure
+     *    - The STACK header
+     *
+     * This is so that we can use a nice round power of 2 for the
+     * default stack size (e.g. 1k), and if we're allocating lots of
+     * threads back-to-back they'll fit nicely in a block.  It's a bit
+     * of a benchmark hack, but it doesn't do any harm.
+     */
+    stack_size = round_to_mblocks(size - sizeofW(StgTSO));
+    stack = (StgStack *)allocate(cap, stack_size);
+    TICK_ALLOC_STACK(stack_size);
+    SET_HDR(stack, &stg_STACK_info, CCS_SYSTEM);
+    stack->stack_size   = stack_size - sizeofW(StgStack);
+    stack->sp           = stack->stack + stack->stack_size;
+    stack->dirty        = 1;
+
+    tso = (StgTSO *)allocate(cap, sizeofW(StgTSO));
+    TICK_ALLOC_TSO();
     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
 
     // Always start with the compiled code evaluator
     tso->what_next = ThreadRunGHC;
-
     tso->why_blocked  = NotBlocked;
     tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
     tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
     tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
     tso->flags = 0;
     tso->dirty = 1;
-    
+    tso->_link = END_TSO_QUEUE;
+
     tso->saved_errno = 0;
     tso->bound = NULL;
     tso->cap = cap;
     
-    tso->stack_size     = stack_size;
-    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
-                         - TSO_STRUCT_SIZEW;
-    tso->sp             = (P_)&(tso->stack) + stack_size;
+    tso->stackobj       = stack;
+    tso->tot_stack_size = stack->stack_size;
 
     tso->trec = NO_TREC;
-    
+
 #ifdef PROFILING
     tso->prof.CCCS = CCS_MAIN;
 #endif
     
-  /* put a stop frame on the stack */
-    tso->sp -= sizeofW(StgStopFrame);
-    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
-    tso->_link = END_TSO_QUEUE;
-    
+    // put a stop frame on the stack
+    stack->sp -= sizeofW(StgStopFrame);
+    SET_HDR((StgClosure*)stack->sp,
+            (StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
+
     /* Link the new thread on the global thread list.
      */
     ACQUIRE_LOCK(&sched_mutex);
@@ -163,9 +179,11 @@ removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
+                t->_link = END_TSO_QUEUE;
                 return rtsFalse;
            } else {
                *queue = t->_link;
+                t->_link = END_TSO_QUEUE;
                 return rtsTrue;
            }
        }
@@ -190,7 +208,8 @@ removeThreadFromDeQueue (Capability *cap,
                *head = t->_link;
                 flag = rtsTrue;
            }
-           if (*tail == tso) {
+            t->_link = END_TSO_QUEUE;
+            if (*tail == tso) {
                if (prev) {
                    *tail = prev;
                } else {
@@ -205,83 +224,20 @@ removeThreadFromDeQueue (Capability *cap,
     barf("removeThreadFromMVarQueue: not found");
 }
 
-void
-removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
-{
-    // caller must do the write barrier, because replacing the info
-    // pointer will unlock the MVar.
-    removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
-    tso->_link = END_TSO_QUEUE;
-}
-
 /* ----------------------------------------------------------------------------
-   unblockOne()
-
-   unblock a single thread.
-   ------------------------------------------------------------------------- */
-
-StgTSO *
-unblockOne (Capability *cap, StgTSO *tso)
-{
-    return unblockOne_(cap,tso,rtsTrue); // allow migration
-}
-
-StgTSO *
-unblockOne_ (Capability *cap, StgTSO *tso, 
-            rtsBool allow_migrate USED_IF_THREADS)
-{
-  StgTSO *next;
-
-  // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
-  ASSERT(tso->why_blocked != NotBlocked);
-  ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
-         tso->block_info.closure->header.info == &stg_IND_info);
+   tryWakeupThread()
 
-  next = tso->_link;
-  tso->_link = END_TSO_QUEUE;
-
-#if defined(THREADED_RTS)
-  if (tso->cap == cap || (!tsoLocked(tso) && 
-                         allow_migrate && 
-                         RtsFlags.ParFlags.wakeupMigrate)) {
-      // We are waking up this thread on the current Capability, which
-      // might involve migrating it from the Capability it was last on.
-      if (tso->bound) {
-         ASSERT(tso->bound->task->cap == tso->cap);
-         tso->bound->task->cap = cap;
-      }
-
-      tso->cap = cap;
-      write_barrier();
-      tso->why_blocked = NotBlocked;
-      appendToRunQueue(cap,tso);
-
-      // context-switch soonish so we can migrate the new thread if
-      // necessary.  NB. not contextSwitchCapability(cap), which would
-      // force a context switch immediately.
-      cap->context_switch = 1;
-  } else {
-      // we'll try to wake it up on the Capability it was last on.
-      wakeupThreadOnCapability(cap, tso->cap, tso);
-  }
-#else
-  tso->why_blocked = NotBlocked;
-  appendToRunQueue(cap,tso);
+   Attempt to wake up a thread.  tryWakeupThread is idempotent: it is
+   always safe to call it too many times, but it is not safe in
+   general to omit a call.
 
-  // context-switch soonish so we can migrate the new thread if
-  // necessary.  NB. not contextSwitchCapability(cap), which would
-  // force a context switch immediately.
-  cap->context_switch = 1;
-#endif
-
-  traceEventThreadWakeup (cap, tso, tso->cap->no);
-
-  return next;
-}
+   ------------------------------------------------------------------------- */
 
 void
 tryWakeupThread (Capability *cap, StgTSO *tso)
 {
+    traceEventThreadWakeup (cap, tso, tso->cap->no);
+
 #ifdef THREADED_RTS
     if (tso->cap != cap)
     {
@@ -298,6 +254,16 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
 
     switch (tso->why_blocked)
     {
+    case BlockedOnMVar:
+    {
+        if (tso->_link == END_TSO_QUEUE) {
+            tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
+            goto unblock;
+        } else {
+            return;
+        }
+    }
+
     case BlockedOnMsgThrowTo:
     {
         const StgInfoTable *i;
@@ -307,27 +273,58 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
         if (i != &stg_MSG_NULL_info) {
             debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
                           (lnat)tso->id, tso->block_info.throwto->header.info);
-            break; // still blocked
+            return;
         }
 
         // remove the block frame from the stack
-        ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
-        tso->sp += 3;
-        // fall through...
+        ASSERT(tso->stackobj->sp[0] == (StgWord)&stg_block_throwto_info);
+        tso->stackobj->sp += 3;
+        goto unblock;
     }
+
     case BlockedOnBlackHole:
     case BlockedOnSTM:
-    {
-        // just run the thread now, if the BH is not really available,
-        // we'll block again.
-        tso->why_blocked = NotBlocked;
-        appendToRunQueue(cap,tso);
-        break;
-    }
+    case ThreadMigrating:
+        goto unblock;
+
     default:
         // otherwise, do nothing
-        break;
+        return;
     }
+
+unblock:
+    // just run the thread now, if the BH is not really available,
+    // we'll block again.
+    tso->why_blocked = NotBlocked;
+    appendToRunQueue(cap,tso);
+
+    // We used to set the context switch flag here, which would
+    // trigger a context switch a short time in the future (at the end
+    // of the current nursery block).  The idea is that we have just
+    // woken up a thread, so we may need to load-balance and migrate
+    // threads to other CPUs.  On the other hand, setting the context
+    // switch flag here unfairly penalises the current thread by
+    // yielding its time slice too early.
+    //
+    // The synthetic benchmark nofib/smp/chan can be used to show the
+    // difference quite clearly.
+
+    // cap->context_switch = 1;
+}
+
+/* ----------------------------------------------------------------------------
+   migrateThread
+   ------------------------------------------------------------------------- */
+
+void
+migrateThread (Capability *from, StgTSO *tso, Capability *to)
+{
+    traceEventMigrateThread (from, tso, to->no);
+    // ThreadMigrating tells the target cap that it needs to be added to
+    // the run queue when it receives the MSG_TRY_WAKEUP.
+    tso->why_blocked = ThreadMigrating;
+    tso->cap = to;
+    tryWakeupThread(from, tso);
 }
 
 /* ----------------------------------------------------------------------------
@@ -417,6 +414,7 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
     i = thunk->header.info;
     if (i != &stg_BLACKHOLE_info &&
         i != &stg_CAF_BLACKHOLE_info &&
+        i != &__stg_EAGER_BLACKHOLE_info &&
         i != &stg_WHITEHOLE_info) {
         updateWithIndirection(cap, thunk, val);
         return;
@@ -428,7 +426,7 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
 
     i = v->header.info;
     if (i == &stg_TSO_info) {
-        owner = deRefTSO((StgTSO*)v);
+        owner = (StgTSO*)v;
         if (owner != tso) {
             checkBlockingQueues(cap, tso);
         }
@@ -441,7 +439,7 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
         return;
     }
 
-    owner = deRefTSO(((StgBlockingQueue*)v)->owner);
+    owner = ((StgBlockingQueue*)v)->owner;
 
     if (owner != tso) {
         checkBlockingQueues(cap, tso);
@@ -450,47 +448,6 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
     }
 }
 
-/* ----------------------------------------------------------------------------
- * Wake up a thread on a Capability.
- *
- * This is used when the current Task is running on a Capability and
- * wishes to wake up a thread on a different Capability.
- * ------------------------------------------------------------------------- */
-
-#ifdef THREADED_RTS
-
-void
-wakeupThreadOnCapability (Capability *cap,
-                          Capability *other_cap, 
-                          StgTSO *tso)
-{
-    MessageWakeup *msg;
-
-    // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
-    if (tso->bound) {
-       ASSERT(tso->bound->task->cap == tso->cap);
-       tso->bound->task->cap = other_cap;
-    }
-    tso->cap = other_cap;
-
-    ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
-           tso->block_info.closure->header.info == &stg_IND_info);
-
-    ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
-
-    msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
-    SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
-    msg->tso = tso;
-    tso->block_info.closure = (StgClosure *)msg;
-    dirty_TSO(cap, tso);
-    write_barrier();
-    tso->why_blocked = BlockedOnMsgWakeup;
-
-    sendMessage(cap, other_cap, (Message*)msg);
-}
-
-#endif /* THREADED_RTS */
-
 /* ---------------------------------------------------------------------------
  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
  * used by Control.Concurrent for error checking.
@@ -519,6 +476,222 @@ isThreadBound(StgTSO* tso USED_IF_THREADS)
   return rtsFalse;
 }
 
+/* -----------------------------------------------------------------------------
+   Stack overflow
+
+   If the thread has reached its maximum stack size, then raise the
+   StackOverflow exception in the offending thread.  Otherwise
+   relocate the TSO into a larger chunk of memory and adjust its stack
+   size appropriately.
+   -------------------------------------------------------------------------- */
+
+void
+threadStackOverflow (Capability *cap, StgTSO *tso)
+{
+    StgStack *new_stack, *old_stack;
+    StgUnderflowFrame *frame;
+    lnat chunk_size;
+
+    IF_DEBUG(sanity,checkTSO(tso));
+
+    if (tso->tot_stack_size >= RtsFlags.GcFlags.maxStkSize
+        && !(tso->flags & TSO_BLOCKEX)) {
+        // NB. never raise a StackOverflow exception if the thread is
+        // inside Control.Exceptino.block.  It is impractical to protect
+        // against stack overflow exceptions, since virtually anything
+        // can raise one (even 'catch'), so this is the only sensible
+        // thing to do here.  See bug #767.
+        //
+
+        if (tso->flags & TSO_SQUEEZED) {
+            return;
+        }
+        // #3677: In a stack overflow situation, stack squeezing may
+        // reduce the stack size, but we don't know whether it has been
+        // reduced enough for the stack check to succeed if we try
+        // again.  Fortunately stack squeezing is idempotent, so all we
+        // need to do is record whether *any* squeezing happened.  If we
+        // are at the stack's absolute -K limit, and stack squeezing
+        // happened, then we try running the thread again.  The
+        // TSO_SQUEEZED flag is set by threadPaused() to tell us whether
+        // squeezing happened or not.
+
+        debugTrace(DEBUG_gc,
+                   "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
+                   (long)tso->id, tso, (long)tso->stackobj->stack_size,
+                   RtsFlags.GcFlags.maxStkSize);
+        IF_DEBUG(gc,
+                 /* If we're debugging, just print out the top of the stack */
+                 printStackChunk(tso->stackobj->sp,
+                                 stg_min(tso->stackobj->stack + tso->stackobj->stack_size,
+                                         tso->stackobj->sp+64)));
+
+        // Send this thread the StackOverflow exception
+        throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
+    }
+
+
+    // We also want to avoid enlarging the stack if squeezing has
+    // already released some of it.  However, we don't want to get into
+    // a pathalogical situation where a thread has a nearly full stack
+    // (near its current limit, but not near the absolute -K limit),
+    // keeps allocating a little bit, squeezing removes a little bit,
+    // and then it runs again.  So to avoid this, if we squeezed *and*
+    // there is still less than BLOCK_SIZE_W words free, then we enlarge
+    // the stack anyway.
+    if ((tso->flags & TSO_SQUEEZED) && 
+        ((W_)(tso->stackobj->sp - tso->stackobj->stack) >= BLOCK_SIZE_W)) {
+        return;
+    }
+
+    old_stack = tso->stackobj;
+
+    // If we used less than half of the previous stack chunk, then we
+    // must have failed a stack check for a large amount of stack.  In
+    // this case we allocate a double-sized chunk to try to
+    // accommodate the large stack request.  If that also fails, the
+    // next chunk will be 4x normal size, and so on.
+    //
+    // It would be better to have the mutator tell us how much stack
+    // was needed, as we do with heap allocations, but this works for
+    // now.
+    //
+    if (old_stack->sp > old_stack->stack + old_stack->stack_size / 2)
+    {
+        chunk_size = 2 * (old_stack->stack_size + sizeofW(StgStack));
+    }
+    else
+    {
+        chunk_size = RtsFlags.GcFlags.stkChunkSize;
+    }
+
+    debugTraceCap(DEBUG_sched, cap,
+                  "allocating new stack chunk of size %d bytes",
+                  chunk_size * sizeof(W_));
+
+    new_stack = (StgStack*) allocate(cap, chunk_size);
+    SET_HDR(new_stack, &stg_STACK_info, CCS_SYSTEM);
+    TICK_ALLOC_STACK(chunk_size);
+
+    new_stack->dirty = 0; // begin clean, we'll mark it dirty below
+    new_stack->stack_size = chunk_size - sizeofW(StgStack);
+    new_stack->sp = new_stack->stack + new_stack->stack_size;
+
+    tso->tot_stack_size += new_stack->stack_size;
+
+    new_stack->sp -= sizeofW(StgUnderflowFrame);
+    frame = (StgUnderflowFrame*)new_stack->sp;
+    frame->info = &stg_stack_underflow_frame_info;
+    frame->next_chunk  = old_stack;
+
+    {
+        StgWord *sp;
+        nat chunk_words, size;
+
+        // find the boundary of the chunk of old stack we're going to
+        // copy to the new stack.  We skip over stack frames until we
+        // reach the smaller of
+        //
+        //   * the chunk buffer size (+RTS -kb)
+        //   * the end of the old stack
+        //
+        for (sp = old_stack->sp;
+             sp < stg_min(old_stack->sp + RtsFlags.GcFlags.stkChunkBufferSize,
+                          old_stack->stack + old_stack->stack_size); )
+        {
+            size = stack_frame_sizeW((StgClosure*)sp);
+
+            // if including this frame would exceed the size of the
+            // new stack (taking into account the underflow frame),
+            // then stop at the previous frame.
+            if (sp + size > old_stack->stack + (new_stack->stack_size -
+                                                sizeofW(StgUnderflowFrame))) {
+                break;
+            }
+            sp += size;
+        }
+
+        // copy the stack chunk between tso->sp and sp to
+        //   new_tso->sp + (tso->sp - sp)
+        chunk_words = sp - old_stack->sp;
+
+        memcpy(/* dest   */ new_stack->sp - chunk_words,
+               /* source */ old_stack->sp,
+               /* size   */ chunk_words * sizeof(W_));
+
+        old_stack->sp += chunk_words;
+        new_stack->sp -= chunk_words;
+    }
+
+    // if the old stack chunk is now empty, discard it.  With the
+    // default settings, -ki1k -kb1k, this means the first stack chunk
+    // will be discarded after the first overflow, being replaced by a
+    // non-moving 32k chunk.
+    if (old_stack->sp == old_stack->stack + old_stack->stack_size) {
+        frame->next_chunk = (StgStack*)END_TSO_QUEUE; // dummy
+    }
+
+    tso->stackobj = new_stack;
+
+    // we're about to run it, better mark it dirty
+    dirty_STACK(cap, new_stack);
+
+    IF_DEBUG(sanity,checkTSO(tso));
+    // IF_DEBUG(scheduler,printTSO(new_tso));
+}
+
+
+/* ---------------------------------------------------------------------------
+   Stack underflow - called from the stg_stack_underflow_info frame
+   ------------------------------------------------------------------------ */
+
+nat // returns offset to the return address
+threadStackUnderflow (Capability *cap, StgTSO *tso)
+{
+    StgStack *new_stack, *old_stack;
+    StgUnderflowFrame *frame;
+    nat retvals;
+
+    debugTraceCap(DEBUG_sched, cap, "stack underflow");
+
+    old_stack = tso->stackobj;
+
+    frame = (StgUnderflowFrame*)(old_stack->stack + old_stack->stack_size
+                                 - sizeofW(StgUnderflowFrame));
+    ASSERT(frame->info == &stg_stack_underflow_frame_info);
+
+    new_stack = (StgStack*)frame->next_chunk;
+    tso->stackobj = new_stack;
+
+    retvals = (P_)frame - old_stack->sp;
+    if (retvals != 0)
+    {
+        // we have some return values to copy to the old stack
+        if ((nat)(new_stack->sp - new_stack->stack) < retvals)
+        {
+            barf("threadStackUnderflow: not enough space for return values");
+        }
+
+        new_stack->sp -= retvals;
+
+        memcpy(/* dest */ new_stack->sp,
+               /* src  */ old_stack->sp,
+               /* size */ retvals * sizeof(W_));
+    }
+
+    // empty the old stack.  The GC may still visit this object
+    // because it is on the mutable list.
+    old_stack->sp = old_stack->stack + old_stack->stack_size;
+
+    // restore the stack parameters, and update tot_stack_size
+    tso->tot_stack_size -= old_stack->stack_size;
+
+    // we're about to run it, better mark it dirty
+    dirty_STACK(cap, new_stack);
+
+    return retvals;
+}
+
 /* ----------------------------------------------------------------------------
  * Debugging: why is a thread blocked
  * ------------------------------------------------------------------------- */
@@ -549,20 +722,20 @@ printThreadBlockage(StgTSO *tso)
       debugBelch("is blocked on a black hole %p", 
                  ((StgBlockingQueue*)tso->block_info.bh->bh));
     break;
-  case BlockedOnMsgWakeup:
-    debugBelch("is blocked on a wakeup message");
-    break;
   case BlockedOnMsgThrowTo:
     debugBelch("is blocked on a throwto message");
     break;
   case NotBlocked:
     debugBelch("is not blocked");
     break;
+  case ThreadMigrating:
+    debugBelch("is runnable, but not on the run queue");
+    break;
   case BlockedOnCCall:
     debugBelch("is blocked on an external call");
     break;
-  case BlockedOnCCall_NoUnblockExc:
-    debugBelch("is blocked on an external call (exceptions were already blocked)");
+  case BlockedOnCCall_Interruptible:
+    debugBelch("is blocked on an external call (but may be interrupted)");
     break;
   case BlockedOnSTM:
     debugBelch("is blocked on an STM operation");
@@ -582,10 +755,7 @@ printThreadStatus(StgTSO *t)
       void *label = lookupThreadLabel(t->id);
       if (label) debugBelch("[\"%s\"] ",(char *)label);
     }
-    if (t->what_next == ThreadRelocated) {
-       debugBelch("has been relocated...\n");
-    } else {
-       switch (t->what_next) {
+        switch (t->what_next) {
        case ThreadKilled:
            debugBelch("has been killed");
            break;
@@ -597,11 +767,8 @@ printThreadStatus(StgTSO *t)
        }
         if (t->dirty) {
             debugBelch(" (TSO_DIRTY)");
-        } else if (t->flags & TSO_LINK_DIRTY) {
-            debugBelch(" (TSO_LINK_DIRTY)");
         }
        debugBelch("\n");
-    }
 }
 
 void
@@ -627,11 +794,7 @@ printAllThreads(void)
       if (t->why_blocked != NotBlocked) {
          printThreadStatus(t);
       }
-      if (t->what_next == ThreadRelocated) {
-         next = t->_link;
-      } else {
-         next = t->global_link;
-      }
+      next = t->global_link;
     }
   }
 }