[project @ 2004-08-19 11:27:45 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
index 44a156b..71c3ec9 100644 (file)
@@ -1,7 +1,6 @@
 /* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.193 2004/03/01 14:18:35 simonmar Exp $
  *
- * (c) The GHC Team, 1998-2003
+ * (c) The GHC Team, 1998-2004
  *
  * Scheduler
  *
@@ -42,9 +41,9 @@
 #include "SchedAPI.h"
 #include "RtsUtils.h"
 #include "RtsFlags.h"
+#include "BlockAlloc.h"
 #include "Storage.h"
 #include "StgRun.h"
-#include "StgStartup.h"
 #include "Hooks.h"
 #define COMPILING_SCHEDULER
 #include "Schedule.h"
@@ -59,6 +58,8 @@
 #include "Timer.h"
 #include "Prelude.h"
 #include "ThreadLabels.h"
+#include "LdvProfile.h"
+#include "Updates.h"
 #ifdef PROFILING
 #include "Proftimer.h"
 #include "ProfHeap.h"
@@ -234,6 +235,7 @@ rtsBool emitSchedule = rtsTrue;
 
 #if DEBUG
 static char *whatNext_strs[] = {
+  "(unknown)",
   "ThreadRunGHC",
   "ThreadInterpret",
   "ThreadKilled",
@@ -276,7 +278,10 @@ startSchedulerTaskIfNecessary(void)
       // just because the last one hasn't yet reached the
       // "waiting for capability" state
       startingWorkerThread = rtsTrue;
-      startTask(taskStart);
+      if(!startTask(taskStart))
+      {
+        startingWorkerThread = rtsFalse;
+      }
     }
   }
 }
@@ -337,7 +342,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 # endif
 #endif
   rtsBool was_interrupted = rtsFalse;
-  StgTSOWhatNext prev_what_next;
+  nat prev_what_next;
   
   // Pre-condition: sched_mutex is held.
   // We might have a capability, passed in as initialCapability.
@@ -876,8 +881,6 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
             || blocked_queue_hd != END_TSO_QUEUE
             || sleeping_queue != END_TSO_QUEUE)))
        context_switch = 1;
-    else
-       context_switch = 0;
 
 run_thread:
 
@@ -894,23 +897,35 @@ run_thread:
     /* Run the current thread 
      */
     prev_what_next = t->what_next;
+
+    errno = t->saved_errno;
+
     switch (prev_what_next) {
+
     case ThreadKilled:
     case ThreadComplete:
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;
+
     case ThreadRunGHC:
-       errno = t->saved_errno;
        ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
-       t->saved_errno = errno;
        break;
+
     case ThreadInterpret:
        ret = interpretBCO(cap);
        break;
+
     default:
       barf("schedule: invalid what_next field");
     }
+
+    // The TSO might have moved, so find the new location:
+    t = cap->r.rCurrentTSO;
+
+    // And save the current errno in this thread.
+    t->saved_errno = errno;
+
     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
     
     /* Costs for the scheduler are assigned to CCS_SYSTEM */
@@ -926,7 +941,6 @@ run_thread:
 #elif !defined(GRAN) && !defined(PAR)
     IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
 #endif
-    t = cap->r.rCurrentTSO;
     
 #if defined(PAR)
     /* HACK 675: if the last thread didn't yield, make sure to print a 
@@ -946,12 +960,12 @@ run_thread:
 #endif
 
       // did the task ask for a large block?
-      if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
+      if (cap->r.rHpAlloc > BLOCK_SIZE) {
          // if so, get one and push it on the front of the nursery.
          bdescr *bd;
          nat blocks;
          
-         blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
+         blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
 
          IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
                                   t->id, whatNext_strs[t->what_next], blocks));
@@ -1062,12 +1076,19 @@ run_thread:
        if (t->main != NULL) {
            t->main->tso = new_t;
        }
-       threadPaused(new_t);
        PUSH_ON_RUN_QUEUE(new_t);
       }
       break;
 
     case ThreadYielding:
+      // Reset the context switch flag.  We don't do this just before
+      // running the thread, because that would mean we would lose ticks
+      // during GC, which can lead to unfair scheduling (a thread hogs
+      // the CPU because the tick always arrives during GC).  This way
+      // penalises threads that do a lot of allocation, but that seems
+      // better than the alternative.
+      context_switch = 0;
+
 #if defined(GRAN)
       IF_DEBUG(gran, 
               DumpGranEvent(GR_DESCHEDULE, t));
@@ -1380,13 +1401,22 @@ isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
  * Singleton fork(). Do not copy any running threads.
  * ------------------------------------------------------------------------- */
 
+#ifndef mingw32_TARGET_OS
+#define FORKPROCESS_PRIMOP_SUPPORTED
+#endif
+
+#ifdef FORKPROCESS_PRIMOP_SUPPORTED
 static void 
 deleteThreadImmediately(StgTSO *tso);
-
+#endif
 StgInt
-forkProcess(HsStablePtr *entry)
+forkProcess(HsStablePtr *entry
+#ifndef FORKPROCESS_PRIMOP_SUPPORTED
+           STG_UNUSED
+#endif
+           )
 {
-#ifndef mingw32_TARGET_OS
+#ifdef FORKPROCESS_PRIMOP_SUPPORTED
   pid_t pid;
   StgTSO* t,*next;
   StgMainThread *m;
@@ -1420,17 +1450,17 @@ forkProcess(HsStablePtr *entry)
       // wipe the main thread list
     while((m = main_threads) != NULL) {
       main_threads = m->link;
-#ifdef THREADED_RTS
+# ifdef THREADED_RTS
       closeCondition(&m->bound_thread_cond);
-#endif
+# endif
       stgFree(m);
     }
     
-#ifdef RTS_SUPPORTS_THREADS
+# ifdef RTS_SUPPORTS_THREADS
     resetTaskManagerAfterFork();      // tell startTask() and friends that
     startingWorkerThread = rtsFalse;  // we have no worker threads any more
     resetWorkerWakeupPipeAfterFork();
-#endif
+# endif
     
     rc = rts_evalStableIO(entry, NULL);  // run the action
     rts_checkSchedStatus("forkProcess",rc);
@@ -1440,10 +1470,10 @@ forkProcess(HsStablePtr *entry)
     hs_exit();                      // clean up and exit
     stg_exit(0);
   }
-#else /* mingw32 */
-  barf("forkProcess#: primop not implemented for mingw32, sorry!\n");
+#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
+  barf("forkProcess#: primop not supported, sorry!\n");
   return -1;
-#endif /* mingw32 */
+#endif
 }
 
 /* ---------------------------------------------------------------------------
@@ -1464,9 +1494,15 @@ deleteAllThreads ( void )
       next = t->global_link;
       deleteThread(t);
   }      
-  run_queue_hd = run_queue_tl = END_TSO_QUEUE;
-  blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
-  sleeping_queue = END_TSO_QUEUE;
+
+  // The run queue now contains a bunch of ThreadKilled threads.  We
+  // must not throw these away: the main thread(s) will be in there
+  // somewhere, and the main scheduler loop has to deal with it.
+  // Also, the run queue is the only thing keeping these threads from
+  // being GC'd, and we don't want the "main thread has been GC'd" panic.
+
+  ASSERT(blocked_queue_hd == END_TSO_QUEUE);
+  ASSERT(sleeping_queue == END_TSO_QUEUE);
 }
 
 /* startThread and  insertThread are now in GranSim.c -- HWL */
@@ -1488,12 +1524,7 @@ deleteAllThreads ( void )
  * ------------------------------------------------------------------------- */
    
 StgInt
-suspendThread( StgRegTable *reg, 
-              rtsBool concCall
-#if !defined(DEBUG)
-              STG_UNUSED
-#endif
-              )
+suspendThread( StgRegTable *reg )
 {
   nat tok;
   Capability *cap;
@@ -1507,7 +1538,7 @@ suspendThread( StgRegTable *reg,
   ACQUIRE_LOCK(&sched_mutex);
 
   IF_DEBUG(scheduler,
-          sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
+          sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
 
   // XXX this might not be necessary --SDM
   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
@@ -1545,8 +1576,7 @@ suspendThread( StgRegTable *reg,
 }
 
 StgRegTable *
-resumeThread( StgInt tok,
-             rtsBool concCall STG_UNUSED )
+resumeThread( StgInt tok )
 {
   StgTSO *tso, **prev;
   Capability *cap;
@@ -1723,9 +1753,10 @@ createThread(nat size)
   /* put a stop frame on the stack */
   tso->sp -= sizeofW(StgStopFrame);
   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
+  tso->link = END_TSO_QUEUE;
+
   // ToDo: check this
 #if defined(GRAN)
-  tso->link = END_TSO_QUEUE;
   /* uses more flexible routine in GranSim */
   insertThread(tso, CurrentProc);
 #else
@@ -1902,7 +1933,9 @@ void
 scheduleThread_(StgTSO *tso)
 {
   // Precondition: sched_mutex must be held.
-  PUSH_ON_RUN_QUEUE(tso);
+  // The thread goes at the *end* of the run-queue, to avoid possible
+  // starvation of any threads already on the queue.
+  APPEND_TO_RUN_QUEUE(tso);
   THREAD_RUNNABLE();
 }
 
@@ -1962,7 +1995,7 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
     */
     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
     
-    PUSH_ON_RUN_QUEUE(tso);
+    APPEND_TO_RUN_QUEUE(tso);
     // NB. Don't call THREAD_RUNNABLE() here, because the thread is
     // bound and only runnable by *this* OS thread, so waking up other
     // workers will just slow things down.
@@ -2392,8 +2425,8 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
       /* if it's a TSO just push it onto the run_queue */
       next = bqe->link;
-      // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
-      PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
+      ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
+      APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
       THREAD_RUNNABLE();
       unblockCount(bqe, node);
       /* reset blocking status after dumping event */
@@ -2437,7 +2470,8 @@ unblockOneLocked(StgTSO *tso)
   ASSERT(tso->why_blocked != NotBlocked);
   tso->why_blocked = NotBlocked;
   next = tso->link;
-  PUSH_ON_RUN_QUEUE(tso);
+  tso->link = END_TSO_QUEUE;
+  APPEND_TO_RUN_QUEUE(tso);
   THREAD_RUNNABLE();
   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
   return next;
@@ -2887,7 +2921,7 @@ unblockThread(StgTSO *tso)
   tso->link = END_TSO_QUEUE;
   tso->why_blocked = NotBlocked;
   tso->block_info.closure = NULL;
-  PUSH_ON_RUN_QUEUE(tso);
+  APPEND_TO_RUN_QUEUE(tso);
 }
 #endif
 
@@ -2931,6 +2965,7 @@ deleteThread(StgTSO *tso)
   raiseAsync(tso,NULL);
 }
 
+#ifdef FORKPROCESS_PRIMOP_SUPPORTED
 static void 
 deleteThreadImmediately(StgTSO *tso)
 { // for forkProcess only:
@@ -2947,6 +2982,7 @@ deleteThreadImmediately(StgTSO *tso)
 
   tso->what_next = ThreadKilled;
 }
+#endif
 
 void
 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
@@ -3105,7 +3141,8 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
            //
            if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
                // revert the black hole
-               UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
+               UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
+                              (StgClosure *)ap);
            }
            sp += sizeofW(StgUpdateFrame) - 1;
            sp[0] = (W_)ap; // push onto stack
@@ -3127,6 +3164,77 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
 }
 
 /* -----------------------------------------------------------------------------
+   raiseExceptionHelper
+   
+   This function is called by the raise# primitve, just so that we can
+   move some of the tricky bits of raising an exception from C-- into
+   C.  Who knows, it might be a useful re-useable thing here too.
+   -------------------------------------------------------------------------- */
+
+StgWord
+raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
+{
+    StgClosure *raise_closure = NULL;
+    StgPtr p, next;
+    StgRetInfoTable *info;
+    //
+    // This closure represents the expression 'raise# E' where E
+    // is the exception raise.  It is used to overwrite all the
+    // thunks which are currently under evaluataion.
+    //
+
+    //    
+    // LDV profiling: stg_raise_info has THUNK as its closure
+    // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
+    // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
+    // 1 does not cause any problem unless profiling is performed.
+    // However, when LDV profiling goes on, we need to linearly scan
+    // small object pool, where raise_closure is stored, so we should
+    // use MIN_UPD_SIZE.
+    //
+    // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
+    //                                        sizeofW(StgClosure)+1);
+    //
+
+    //
+    // Walk up the stack, looking for the catch frame.  On the way,
+    // we update any closures pointed to from update frames with the
+    // raise closure that we just built.
+    //
+    p = tso->sp;
+    while(1) {
+       info = get_ret_itbl((StgClosure *)p);
+       next = p + stack_frame_sizeW((StgClosure *)p);
+       switch (info->i.type) {
+           
+       case UPDATE_FRAME:
+           // Only create raise_closure if we need to.
+           if (raise_closure == NULL) {
+               raise_closure = 
+                   (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
+               SET_HDR(raise_closure, &stg_raise_info, CCCS);
+               raise_closure->payload[0] = exception;
+           }
+           UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
+           p = next;
+           continue;
+           
+       case CATCH_FRAME:
+           tso->sp = p;
+           return CATCH_FRAME;
+           
+       case STOP_FRAME:
+           tso->sp = p;
+           return STOP_FRAME;
+
+       default:
+           p = next; 
+           continue;
+       }
+    }
+}
+
+/* -----------------------------------------------------------------------------
    resurrectThreads is called after garbage collection on the list of
    threads found to be garbage.  Each of these threads will be woken
    up and sent a signal: BlockedOnDeadMVar if the thread was blocked