Make allocatePinned use local storage, and other refactorings
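
On the scheduler side these refactorings mean: allocateLocal() calls
become plain allocate(cap, ...); the private whatNext_strs[] table is
dropped in favour of the shared what_next_strs[]; and each paired
postEvent()/debugTrace() collapses into a single traceSchedEvent()
call, which takes the TSO itself rather than just its id.  For
example,

    postEvent (cap, EVENT_STOP_THREAD, t->id, ret);

becomes

    traceSchedEvent (cap, EVENT_STOP_THREAD, t, ret);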
diff --git a/rts/Schedule.c b/rts/Schedule.c
index fb63796..3ae1fe0 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -26,7 +26,6 @@
 #include "Proftimer.h"
 #include "ProfHeap.h"
 #include "Weak.h"
-#include "eventlog/EventLog.h"
 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
 #include "Sparks.h"
 #include "Capability.h"
@@ -172,17 +171,6 @@ static void deleteAllThreads (Capability *cap);
 static void deleteThread_(Capability *cap, StgTSO *tso);
 #endif
 
-#ifdef DEBUG
-static char *whatNext_strs[] = {
-  [0]               = "(unknown)",
-  [ThreadRunGHC]    = "ThreadRunGHC",
-  [ThreadInterpret] = "ThreadInterpret",
-  [ThreadKilled]    = "ThreadKilled",
-  [ThreadRelocated] = "ThreadRelocated",
-  [ThreadComplete]  = "ThreadComplete"
-};
-#endif
-
 /* -----------------------------------------------------------------------------
  * Putting a thread on the run queue: different scheduling policies
  * -------------------------------------------------------------------------- */
@@ -249,9 +237,7 @@ schedule (Capability *initialCapability, Task *task)
   // The sched_mutex is *NOT* held
   // NB. on return, we still hold a capability.
 
-  debugTrace (DEBUG_sched, 
-             "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
-             task, initialCapability);
+  debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
 
   schedulePreLoop();
 
@@ -396,12 +382,11 @@ schedule (Capability *initialCapability, Task *task)
       
        if (bound) {
            if (bound == task) {
-               debugTrace(DEBUG_sched,
-                          "### Running thread %lu in bound thread", (unsigned long)t->id);
                // yes, the Haskell thread is bound to the current native thread
            } else {
                debugTrace(DEBUG_sched,
-                          "### thread %lu bound to another OS thread", (unsigned long)t->id);
+                          "thread %lu bound to another OS thread",
+                           (unsigned long)t->id);
                // no, bound to a different Haskell thread: pass to that thread
                pushOnRunQueue(cap,t);
                continue;
@@ -410,7 +395,8 @@ schedule (Capability *initialCapability, Task *task)
            // The thread we want to run is unbound.
            if (task->tso) { 
                debugTrace(DEBUG_sched,
-                          "### this OS thread cannot run thread %lu", (unsigned long)t->id);
+                          "this OS thread cannot run thread %lu",
+                           (unsigned long)t->id);
                // no, the current native thread is bound to a different
                // Haskell thread, so pass it to any worker thread
                pushOnRunQueue(cap,t);
@@ -445,9 +431,6 @@ run_thread:
     // that.
     cap->r.rCurrentTSO = t;
 
-    debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
-                             (long)t->id, whatNext_strs[t->what_next]);
-
     startHeapProfTimer();
 
     // Check for exceptions blocked on this thread
@@ -485,7 +468,7 @@ run_thread:
     }
 #endif
 
-    postEvent(cap, EVENT_RUN_THREAD, t->id, 0);
+    traceSchedEvent(cap, EVENT_RUN_THREAD, t, 0);
 
     switch (prev_what_next) {
        
@@ -535,7 +518,7 @@ run_thread:
     t->saved_winerror = GetLastError();
 #endif
 
-    postEvent (cap, EVENT_STOP_THREAD, t->id, ret);
+    traceSchedEvent (cap, EVENT_STOP_THREAD, t, ret);
 
 #if defined(THREADED_RTS)
     // If ret is ThreadBlocked, and this Task is bound to the TSO that
@@ -545,9 +528,6 @@ run_thread:
     // that task->cap != cap.  We had better yield this Capability
     // immediately and return to normality.
     if (ret == ThreadBlocked) {
-       debugTrace(DEBUG_sched,
-                  "--<< thread %lu (%s) stopped: blocked",
-                  (unsigned long)t->id, whatNext_strs[t->what_next]);
         force_yield = rtsTrue;
         goto yield;
     }
@@ -798,7 +778,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                    debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
                    appendToRunQueue(free_caps[i],t);
 
-        postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no);
+                    traceSchedEvent (cap, EVENT_MIGRATE_THREAD, t, free_caps[i]->no);
 
                    if (t->bound) { t->bound->cap = free_caps[i]; }
                    t->cap = free_caps[i];
@@ -822,7 +802,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                    if (spark != NULL) {
                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
 
-      postEvent(free_caps[i], EVENT_STEAL_SPARK, t->id, cap->no);
+                       traceSchedEvent(free_caps[i], EVENT_STEAL_SPARK, t, cap->no);
 
                        newSpark(&(free_caps[i]->r), spark);
                    }
@@ -1082,7 +1062,7 @@ schedulePostRunThread (Capability *cap, StgTSO *t)
             // partially-evaluated thunks on the heap.
             throwToSingleThreaded_(cap, t, NULL, rtsTrue);
             
-            ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+//            ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
         }
     }
 
@@ -1106,7 +1086,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
        
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
-                  (long)t->id, whatNext_strs[t->what_next], blocks);
+                  (long)t->id, what_next_strs[t->what_next], blocks);
     
        // don't do this if the nursery is (nearly) full, we'll GC first.
        if (cap->r.rCurrentNursery->link != NULL ||
@@ -1124,10 +1104,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;
            } else {
-#if !defined(THREADED_RTS)
-               ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
-                      g0s0 == cap->r.rNursery);
-#endif
                cap->r.rNursery->blocks = bd;
            }             
            cap->r.rCurrentNursery->u.back = bd;
@@ -1142,8 +1118,8 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
            { 
                bdescr *x;
                for (x = bd; x < bd + blocks; x++) {
-                   x->step = cap->r.rNursery;
-                   x->gen_no = 0;
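+                    // initBdescr() fills in the step and generation
+                    // fields of each block descriptor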
+                    initBdescr(x,cap->r.rNursery);
+                    x->free = x->start;
                    x->flags = 0;
                }
            }
@@ -1164,10 +1140,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
        }
     }
     
-    debugTrace(DEBUG_sched,
-              "--<< thread %ld (%s) stopped: HeapOverflow",
-              (long)t->id, whatNext_strs[t->what_next]);
-
     if (cap->r.rHpLim == NULL || cap->context_switch) {
         // Sometimes we miss a context switch, e.g. when calling
         // primitives in a tight loop, MAYBE_GC() doesn't check the
@@ -1189,10 +1161,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
 static void
 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
 {
-    debugTrace (DEBUG_sched,
-               "--<< thread %ld (%s) stopped, StackOverflow", 
-               (long)t->id, whatNext_strs[t->what_next]);
-
     /* just adjust the stack for this thread, then pop it back
      * on the run queue.
      */
@@ -1234,11 +1202,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
     if (t->what_next != prev_what_next) {
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped to switch evaluators", 
-                  (long)t->id, whatNext_strs[t->what_next]);
-    } else {
-       debugTrace(DEBUG_sched,
-                  "--<< thread %ld (%s) stopped, yielding",
-                  (long)t->id, whatNext_strs[t->what_next]);
+                  (long)t->id, what_next_strs[t->what_next]);
     }
 #endif
     
@@ -1285,12 +1249,7 @@ scheduleHandleThreadBlocked( StgTSO *t
     //      exception, see maybePerformBlockedException().
 
 #ifdef DEBUG
-    if (traceClass(DEBUG_sched)) {
-       debugTraceBegin("--<< thread %lu (%s) stopped: ", 
-                       (unsigned long)t->id, whatNext_strs[t->what_next]);
-       printThreadBlockage(t);
-       debugTraceEnd();
-    }
+    traceThreadStatus(DEBUG_sched, t);
 #endif
 }
 
@@ -1307,8 +1266,6 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
      * We also end up here if the thread kills itself with an
      * uncaught exception, see Exception.cmm.
      */
-    debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
-              (unsigned long)t->id, whatNext_strs[t->what_next]);
 
     // blocked exceptions can now complete, even if the thread was in
     // blocked mode (see #2910).  This unconditionally calls
@@ -1460,7 +1417,19 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     
     if (gc_type == PENDING_GC_SEQ)
     {
-        postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
+        traceSchedEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
+    }
+    else
+    {
+        traceSchedEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
+        debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
+    }
+
+    // do this while the other Capabilities stop:
+    if (cap) scheduleCheckBlackHoles(cap);
+
+    if (gc_type == PENDING_GC_SEQ)
+    {
         // single-threaded GC: grab all the capabilities
         for (i=0; i < n_capabilities; i++) {
             debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
@@ -1483,16 +1452,16 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     {
         // multi-threaded GC: make sure all the Capabilities donate one
         // GC thread each.
-        postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
-        debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
-
         waitForGcThreads(cap);
     }
-#endif
 
-    // so this happens periodically:
+#else /* !THREADED_RTS */
+
+    // do this while the other Capabilities stop:
     if (cap) scheduleCheckBlackHoles(cap);
-    
+
+#endif
+
     IF_DEBUG(scheduler, printAllThreads());
 
 delete_threads_and_gc:
@@ -1509,8 +1478,7 @@ delete_threads_and_gc:
     heap_census = scheduleNeedHeapProfile(rtsTrue);
 
 #if defined(THREADED_RTS)
-    postEvent(cap, EVENT_GC_START, 0, 0);
-    debugTrace(DEBUG_sched, "doing GC");
+    traceSchedEvent(cap, EVENT_GC_START, 0, 0);
     // reset waiting_for_gc *before* GC, so that when the GC threads
     // emerge they don't immediately re-enter the GC.
     waiting_for_gc = 0;
@@ -1518,7 +1486,7 @@ delete_threads_and_gc:
 #else
     GarbageCollect(force_major || heap_census, 0, cap);
 #endif
-    postEvent(cap, EVENT_GC_END, 0, 0);
+    traceSchedEvent(cap, EVENT_GC_END, 0, 0);
 
     if (recent_activity == ACTIVITY_INACTIVE && force_major)
     {
@@ -1715,6 +1683,10 @@ forkProcess(HsStablePtr *entry
         initTimer();
         startTimer();
 
+#if defined(THREADED_RTS)
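+        // only the calling thread survives a fork(), so the child
+        // must start a fresh IO manager thread here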
+        cap = ioManagerStartCap(cap);
+#endif
+
        cap = rts_evalStableIO(cap, entry, NULL);  // run the action
        rts_checkSchedStatus("forkProcess",cap);
        
@@ -1833,10 +1805,7 @@ suspendThread (StgRegTable *reg)
   task = cap->running_task;
   tso = cap->r.rCurrentTSO;
 
-  postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL);
-  debugTrace(DEBUG_sched, 
-            "thread %lu did a safe foreign call", 
-            (unsigned long)cap->r.rCurrentTSO->id);
+  traceSchedEvent(cap, EVENT_STOP_THREAD, tso, THREAD_SUSPENDED_FOREIGN_CALL);
 
   // XXX this might not be necessary --SDM
   tso->what_next = ThreadRunGHC;
@@ -1862,13 +1831,6 @@ suspendThread (StgRegTable *reg)
   
   RELEASE_LOCK(&cap->lock);
 
-#if defined(THREADED_RTS)
-  /* Preparing to leave the RTS, so ensure there's a native thread/task
-     waiting to take over.
-  */
-  debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
-#endif
-
   errno = saved_errno;
 #if mingw32_HOST_OS
   SetLastError(saved_winerror);
@@ -1906,8 +1868,7 @@ resumeThread (void *task_)
     task->suspended_tso = NULL;
     tso->_link = END_TSO_QUEUE; // no write barrier reqd
 
-    postEvent(cap, EVENT_RUN_THREAD, tso->id, 0);
-    debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
+    traceSchedEvent(cap, EVENT_RUN_THREAD, tso, tso->what_next);
     
     if (tso->why_blocked == BlockedOnCCall) {
         // avoid locking the TSO if we don't have to
@@ -1963,7 +1924,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
     if (cpu == cap->no) {
        appendToRunQueue(cap,tso);
     } else {
-        postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no);
+        traceSchedEvent (cap, EVENT_MIGRATE_THREAD, tso, capabilities[cpu].no);
        wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
     }
 #else
@@ -2139,9 +2100,10 @@ exitScheduler(
        for (i = 0; i < n_capabilities; i++) {
            shutdownCapability(&capabilities[i], task, wait_foreign);
        }
-       boundTaskExiting(task);
     }
 #endif
+
+    boundTaskExiting(task);
 }
 
 void
@@ -2228,12 +2190,27 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
   // while we are moving the TSO:
   lockClosure((StgClosure *)tso);
 
-  if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
+  if (tso->stack_size >= tso->max_stack_size
+      && !(tso->flags & TSO_BLOCKEX)) {
       // NB. never raise a StackOverflow exception if the thread is
      // inside Control.Exception.block.  It is impractical to protect
       // against stack overflow exceptions, since virtually anything
       // can raise one (even 'catch'), so this is the only sensible
       // thing to do here.  See bug #767.
+      //
+
+      if (tso->flags & TSO_SQUEEZED) {
+          return tso;
+      }
+      // #3677: In a stack overflow situation, stack squeezing may
+      // reduce the stack size, but we don't know whether it has been
+      // reduced enough for the stack check to succeed if we try
+      // again.  Fortunately stack squeezing is idempotent, so all we
+      // need to do is record whether *any* squeezing happened.  If we
+      // are at the stack's absolute -K limit, and stack squeezing
+      // happened, then we try running the thread again.  The
+      // TSO_SQUEEZED flag is set by threadPaused() to tell us whether
+      // squeezing happened or not.
 
       debugTrace(DEBUG_gc,
                 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
@@ -2249,6 +2226,21 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
       return tso;
   }
 
+
+  // We also want to avoid enlarging the stack if squeezing has
+  // already released some of it.  However, we don't want to get into
+  // a pathological situation where a thread has a nearly full stack
+  // (near its current limit, but not near the absolute -K limit),
+  // keeps allocating a little bit, squeezing removes a little bit,
+  // and then it runs again.  So to avoid this, if we squeezed *and*
+  // there is still less than BLOCK_SIZE_W words free, then we enlarge
+  // the stack anyway.
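+  // (NB. the stack grows downwards, so tso->sp - tso->stack is the
+  // number of words still unused.)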
+  if ((tso->flags & TSO_SQUEEZED) && 
+      ((W_)(tso->sp - tso->stack) >= BLOCK_SIZE_W)) {
+      unlockTSO(tso);
+      return tso;
+  }
+
   /* Try to double the current stack size.  If that takes us over the
    * maximum stack size for this thread, then use the maximum instead
    * (that is, unless we're already at or over the max size and we
@@ -2270,7 +2262,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
             "increasing stack size from %ld words to %d.",
             (long)tso->stack_size, new_stack_size);
 
-  dest = (StgTSO *)allocateLocal(cap,new_tso_size);
+  dest = (StgTSO *)allocate(cap,new_tso_size);
   TICK_ALLOC_TSO(new_stack_size,0);
 
   /* copy the TSO block and the old stack into the new area */
@@ -2541,7 +2533,7 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
            // Only create raise_closure if we need to.
            if (raise_closure == NULL) {
                raise_closure = 
-                   (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
+                   (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
                SET_HDR(raise_closure, &stg_raise_info, CCCS);
                raise_closure->payload[0] = exception;
            }
@@ -2619,7 +2611,7 @@ findRetryFrameHelper (StgTSO *tso)
       
     case CATCH_STM_FRAME: {
         StgTRecHeader *trec = tso -> trec;
-       StgTRecHeader *outer = stmGetEnclosingTRec(trec);
+       StgTRecHeader *outer = trec -> enclosing_trec;
         debugTrace(DEBUG_stm,
                   "found CATCH_STM_FRAME at %p during retry", p);
         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
@@ -2671,10 +2663,9 @@ resurrectThreads (StgTSO *threads)
        
        switch (tso->why_blocked) {
        case BlockedOnMVar:
-       case BlockedOnException:
            /* Called by GC - sched_mutex lock is currently held. */
            throwToSingleThreaded(cap, tso,
-                                 (StgClosure *)blockedOnDeadMVar_closure);
+                                 (StgClosure *)blockedIndefinitelyOnMVar_closure);
            break;
        case BlockedOnBlackHole:
            throwToSingleThreaded(cap, tso,
@@ -2682,7 +2673,7 @@ resurrectThreads (StgTSO *threads)
            break;
        case BlockedOnSTM:
            throwToSingleThreaded(cap, tso,
-                                 (StgClosure *)blockedIndefinitely_closure);
+                                 (StgClosure *)blockedIndefinitelyOnSTM_closure);
            break;
        case NotBlocked:
            /* This might happen if the thread was blocked on a black hole
@@ -2690,6 +2681,11 @@ resurrectThreads (StgTSO *threads)
             * can wake up threads, remember...).
             */
            continue;
+       case BlockedOnException:
+            // throwTo should never block indefinitely: if the target
+            // thread dies or completes, throwTo returns.
+           barf("resurrectThreads: thread BlockedOnException");
+            break;
        default:
            barf("resurrectThreads: thread blocked in a strange way");
        }
@@ -2714,8 +2710,12 @@ performPendingThrowTos (StgTSO *threads)
 {
     StgTSO *tso, *next;
     Capability *cap;
+    Task *task, *saved_task;
     step *step;
 
+    task = myTask();
+    cap = task->cap;
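+    // note our own Capability; task->cap is restored at the end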
+
     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
        next = tso->global_link;
 
@@ -2725,7 +2725,17 @@ performPendingThrowTos (StgTSO *threads)
 
        debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
        
-       cap = tso->cap;
-        maybePerformBlockedException(cap, tso);
-    }
+        // We must pretend this Capability belongs to the current Task
+        // for the time being, as invariants will be broken otherwise.
+        // In fact the current Task has exclusive access to the system
+        // at this point, so this is just bookkeeping:
+        task->cap = tso->cap;
+        saved_task = tso->cap->running_task;
+        tso->cap->running_task = task;
+        maybePerformBlockedException(tso->cap, tso);
+        tso->cap->running_task = saved_task;
+    }
+
+    // Restore our original Capability:
+    task->cap = cap;
 }