[project @ 2005-05-25 08:33:15 by simonmar]
authorsimonmar <unknown>
Wed, 25 May 2005 08:33:15 +0000 (08:33 +0000)
committersimonmar <unknown>
Wed, 25 May 2005 08:33:15 +0000 (08:33 +0000)
something very strange happened with previous commit; try again

ghc/rts/Schedule.c

index dcd32da..e646679 100644 (file)
@@ -841,8 +841,9 @@ scheduleCheckBlockedThreads(void)
        // We shouldn't be here...
        barf("schedule: awaitEvent() in threaded RTS");
 #else
-       awaitEvent( EMPTY_RUN_QUEUE() );
+       awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
 #endif
+    }
 }
 
 
@@ -1675,7 +1676,696 @@ scheduleHandleThreadBlocked( StgTSO *t
     /*
       ngoq Dogh!
       ASSERT(procStatus[CurrentProc]==Busy || 
-      ((procStatus[Curren
+      ((procStatus[CurrentProc]==Fetching) && 
+      (t->block_info.closure!=(StgClosure*)NULL)));
+      if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
+      !(!RtsFlags.GranFlags.DoAsyncFetch &&
+      procStatus[CurrentProc]==Fetching)) 
+      procStatus[CurrentProc] = Idle;
+    */
+#elif defined(PAR)
+    IF_DEBUG(scheduler,
+            debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
+                       t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
+    IF_PAR_DEBUG(bq,
+                
+                if (t->block_info.closure!=(StgClosure*)NULL) 
+                print_bq(t->block_info.closure));
+    
+    /* Send a fetch (if BlockedOnGA) and dump event to log file */
+    blockThread(t);
+    
+    /* whatever we schedule next, we must log that schedule */
+    emitSchedule = rtsTrue;
+    
+#else /* !GRAN */
+
+      // We don't need to do anything.  The thread is blocked, and it
+      // has tidied up its stack and placed itself on whatever queue
+      // it needs to be on.
+
+#if !defined(SMP)
+    ASSERT(t->why_blocked != NotBlocked);
+            // This might not be true under SMP: we don't have
+            // exclusive access to this TSO, so someone might have
+            // woken it up by now.  This actually happens: try
+            // conc023 +RTS -N2.
+#endif
+
+    IF_DEBUG(scheduler,
+            debugBelch("--<< thread %d (%s) stopped: ", 
+                       t->id, whatNext_strs[t->what_next]);
+            printThreadBlockage(t);
+            debugBelch("\n"));
+    
+    /* Only for dumping event to log file 
+       ToDo: do I need this in GranSim, too?
+       blockThread(t);
+    */
+#endif
+}
+
+/* -----------------------------------------------------------------------------
+ * Handle a thread that returned to the scheduler with ThreadFinished
+ * ASSUMES: sched_mutex
+ * -------------------------------------------------------------------------- */
+
+static rtsBool
+scheduleHandleThreadFinished( StgMainThread *mainThread
+                             USED_WHEN_RTS_SUPPORTS_THREADS,
+                             Capability *cap,
+                             StgTSO *t )
+{
+    /* Need to check whether this was a main thread, and if so,
+     * return with the return value.
+     *
+     * We also end up here if the thread kills itself with an
+     * uncaught exception, see Exception.cmm.
+     */
+    IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
+                                 t->id, whatNext_strs[t->what_next]));
+
+#if defined(GRAN)
+      endThread(t, CurrentProc); // clean-up the thread
+#elif defined(PARALLEL_HASKELL)
+      /* For now all are advisory -- HWL */
+      //if(t->priority==AdvisoryPriority) ??
+      advisory_thread_count--; // JB: Caution with this counter, buggy!
+      
+# if defined(DIST)
+      if(t->dist.priority==RevalPriority)
+       FinishReval(t);
+# endif
+    
+# if defined(EDENOLD)
+      // the thread could still have an outport... (BUG)
+      if (t->eden.outport != -1) {
+      // delete the outport for the tso which has finished...
+       IF_PAR_DEBUG(eden_ports,
+                  debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
+                             t->eden.outport, t->id));
+       deleteOPT(t);
+      }
+      // thread still in the process (HEAVY BUG! since outport has just been closed...)
+      if (t->eden.epid != -1) {
+       IF_PAR_DEBUG(eden_ports,
+                  debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
+                          t->id, t->eden.epid));
+       removeTSOfromProcess(t);
+      }
+# endif 
+
+# if defined(PAR)
+      if (RtsFlags.ParFlags.ParStats.Full &&
+         !RtsFlags.ParFlags.ParStats.Suppressed) 
+       DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
+
+      //  t->par only contains statistics: left out for now...
+      IF_PAR_DEBUG(fish,
+                  debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
+                             t->id,t,t->par.sparkname));
+# endif
+#endif // PARALLEL_HASKELL
+
+      //
+      // Check whether the thread that just completed was a main
+      // thread, and if so return with the result.  
+      //
+      // There is an assumption here that all thread completion goes
+      // through this point; we need to make sure that if a thread
+      // ends up in the ThreadKilled state, that it stays on the run
+      // queue so it can be dealt with here.
+      //
+      if (
+#if defined(RTS_SUPPORTS_THREADS)
+         mainThread != NULL
+#else
+         mainThread->tso == t
+#endif
+         )
+      {
+         // We are a bound thread: this must be our thread that just
+         // completed.
+         ASSERT(mainThread->tso == t);
+
+         if (t->what_next == ThreadComplete) {
+             if (mainThread->ret) {
+                 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
+                 *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
+             }
+             mainThread->stat = Success;
+         } else {
+             if (mainThread->ret) {
+                 *(mainThread->ret) = NULL;
+             }
+             if (interrupted) {
+                 mainThread->stat = Interrupted;
+             } else {
+                 mainThread->stat = Killed;
+             }
+         }
+#ifdef DEBUG
+         removeThreadLabel((StgWord)mainThread->tso->id);
+#endif
+         if (mainThread->prev == NULL) {
+             ASSERT(mainThread == main_threads);
+             main_threads = mainThread->link;
+         } else {
+             mainThread->prev->link = mainThread->link;
+         }
+         if (mainThread->link != NULL) {
+             mainThread->link->prev = mainThread->prev;
+         }
+         releaseCapability(cap);
+         return rtsTrue; // tells schedule() to return
+      }
+
+#ifdef RTS_SUPPORTS_THREADS
+      ASSERT(t->main == NULL);
+#else
+      if (t->main != NULL) {
+         // Must be a main thread that is not the topmost one.  Leave
+         // it on the run queue until the stack has unwound to the
+         // point where we can deal with this.  Leaving it on the run
+         // queue also ensures that the garbage collector knows about
+         // this thread and its return value (it gets dropped from the
+         // all_threads list so there's no other way to find it).
+         APPEND_TO_RUN_QUEUE(t);
+      }
+#endif
+      return rtsFalse;
+}
+
+/* -----------------------------------------------------------------------------
+ * Perform a heap census, if PROFILING
+ * -------------------------------------------------------------------------- */
+
+static rtsBool
+scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
+{
+#if defined(PROFILING)
+    // When we have +RTS -i0 and we're heap profiling, do a census at
+    // every GC.  This lets us get repeatable runs for debugging.
+    if (performHeapProfile ||
+       (RtsFlags.ProfFlags.profileInterval==0 &&
+        RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
+       GarbageCollect(GetRoots, rtsTrue);
+       heapCensus();
+       performHeapProfile = rtsFalse;
+       return rtsTrue;  // true <=> we already GC'd
+    }
+#endif
+    return rtsFalse;
+}
+
+/* -----------------------------------------------------------------------------
+ * Perform a garbage collection if necessary
+ * ASSUMES: sched_mutex
+ * -------------------------------------------------------------------------- */
+
+static void
+scheduleDoGC( rtsBool force_major )
+{
+    StgTSO *t;
+#ifdef SMP
+    Capability *cap;
+    static rtsBool waiting_for_gc;
+    int n_capabilities = RtsFlags.ParFlags.nNodes - 1; 
+           // subtract one because we're already holding one.
+    Capability *caps[n_capabilities];
+#endif
+
+#ifdef SMP
+    // In order to GC, there must be no threads running Haskell code.
+    // Therefore, the GC thread needs to hold *all* the capabilities,
+    // and release them after the GC has completed.  
+    //
+    // This seems to be the simplest way: previous attempts involved
+    // making all the threads with capabilities give up their
+    // capabilities and sleep except for the *last* one, which
+    // actually did the GC.  But it's quite hard to arrange for all
+    // the other tasks to sleep and stay asleep.
+    //
+       
+    // Someone else is already trying to GC
+    if (waiting_for_gc) return;
+    waiting_for_gc = rtsTrue;
+
+    while (n_capabilities > 0) {
+       IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
+       waitForReturnCapability(&sched_mutex, &cap);
+       n_capabilities--;
+       caps[n_capabilities] = cap;
+    }
+
+    waiting_for_gc = rtsFalse;
+#endif
+
+    /* Kick any transactions which are invalid back to their
+     * atomically frames.  When next scheduled they will try to
+     * commit, this commit will fail and they will retry.
+     */
+    for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
+       if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+           if (!stmValidateTransaction (t -> trec)) {
+               IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+               
+               // strip the stack back to the ATOMICALLY_FRAME, aborting
+               // the (nested) transaction, and saving the stack of any
+               // partially-evaluated thunks on the heap.
+               raiseAsync_(t, NULL, rtsTrue);
+               
+#ifdef REG_R1
+               ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+#endif
+           }
+       }
+    }
+    
+    // so this happens periodically:
+    scheduleCheckBlackHoles();
+    
+    IF_DEBUG(scheduler, printAllThreads());
+
+    /* everybody back, start the GC.
+     * Could do it in this thread, or signal a condition var
+     * to do it in another thread.  Either way, we need to
+     * broadcast on gc_pending_cond afterward.
+     */
+#if defined(RTS_SUPPORTS_THREADS)
+    IF_DEBUG(scheduler,sched_belch("doing GC"));
+#endif
+    GarbageCollect(GetRoots, force_major);
+    
+#if defined(SMP)
+    {
+       // release our stash of capabilities.
+       nat i;
+       for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
+           releaseCapability(caps[i]);
+       }
+    }
+#endif
+
+#if defined(GRAN)
+    /* add a ContinueThread event to continue execution of current thread */
+    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
+             ContinueThread,
+             t, (StgClosure*)NULL, (rtsSpark*)NULL);
+    IF_GRAN_DEBUG(bq, 
+                 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
+                 G_EVENTQ(0);
+                 G_CURR_THREADQ(0));
+#endif /* GRAN */
+}
+
+/* ---------------------------------------------------------------------------
+ * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
+ * used by Control.Concurrent for error checking.
+ * ------------------------------------------------------------------------- */
+StgBool
+rtsSupportsBoundThreads(void)
+{
+#if defined(RTS_SUPPORTS_THREADS)
+  return rtsTrue;
+#else
+  return rtsFalse;
+#endif
+}
+
+/* ---------------------------------------------------------------------------
+ * isThreadBound(tso): check whether tso is bound to an OS thread.
+ * ------------------------------------------------------------------------- */
+StgBool
+isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
+{
+#if defined(RTS_SUPPORTS_THREADS)
+  return (tso->main != NULL);
+#endif
+  return rtsFalse;
+}
+
+/* ---------------------------------------------------------------------------
+ * Singleton fork(). Do not copy any running threads.
+ * ------------------------------------------------------------------------- */
+
+#ifndef mingw32_HOST_OS
+#define FORKPROCESS_PRIMOP_SUPPORTED
+#endif
+
+#ifdef FORKPROCESS_PRIMOP_SUPPORTED
+static void 
+deleteThreadImmediately(StgTSO *tso);
+#endif
+StgInt
+forkProcess(HsStablePtr *entry
+#ifndef FORKPROCESS_PRIMOP_SUPPORTED
+           STG_UNUSED
+#endif
+           )
+{
+#ifdef FORKPROCESS_PRIMOP_SUPPORTED
+  pid_t pid;
+  StgTSO* t,*next;
+  StgMainThread *m;
+  SchedulerStatus rc;
+
+  IF_DEBUG(scheduler,sched_belch("forking!"));
+  rts_lock(); // This not only acquires sched_mutex, it also
+              // makes sure that no other threads are running
+
+  pid = fork();
+
+  if (pid) { /* parent */
+
+  /* just return the pid */
+    rts_unlock();
+    return pid;
+    
+  } else { /* child */
+    
+    
+      // delete all threads
+    run_queue_hd = run_queue_tl = END_TSO_QUEUE;
+    
+    for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+      next = t->link;
+
+        // don't allow threads to catch the ThreadKilled exception
+      deleteThreadImmediately(t);
+    }
+    
+      // wipe the main thread list
+    while((m = main_threads) != NULL) {
+      main_threads = m->link;
+# ifdef THREADED_RTS
+      closeCondition(&m->bound_thread_cond);
+# endif
+      stgFree(m);
+    }
+    
+    rc = rts_evalStableIO(entry, NULL);  // run the action
+    rts_checkSchedStatus("forkProcess",rc);
+    
+    rts_unlock();
+    
+    hs_exit();                      // clean up and exit
+    stg_exit(0);
+  }
+#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
+  barf("forkProcess#: primop not supported, sorry!\n");
+  return -1;
+#endif
+}
+
+/* ---------------------------------------------------------------------------
+ * deleteAllThreads():  kill all the live threads.
+ *
+ * This is used when we catch a user interrupt (^C), before performing
+ * any necessary cleanups and running finalizers.
+ *
+ * Locks: sched_mutex held.
+ * ------------------------------------------------------------------------- */
+   
+void
+deleteAllThreads ( void )
+{
+  StgTSO* t, *next;
+  IF_DEBUG(scheduler,sched_belch("deleting all threads"));
+  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+      if (t->what_next == ThreadRelocated) {
+         next = t->link;
+      } else {
+         next = t->global_link;
+         deleteThread(t);
+      }
+  }      
+
+  // The run queue now contains a bunch of ThreadKilled threads.  We
+  // must not throw these away: the main thread(s) will be in there
+  // somewhere, and the main scheduler loop has to deal with it.
+  // Also, the run queue is the only thing keeping these threads from
+  // being GC'd, and we don't want the "main thread has been GC'd" panic.
+
+  ASSERT(blocked_queue_hd == END_TSO_QUEUE);
+  ASSERT(blackhole_queue == END_TSO_QUEUE);
+  ASSERT(sleeping_queue == END_TSO_QUEUE);
+}
+
+/* startThread and  insertThread are now in GranSim.c -- HWL */
+
+
+/* ---------------------------------------------------------------------------
+ * Suspending & resuming Haskell threads.
+ * 
+ * When making a "safe" call to C (aka _ccall_GC), the task gives back
+ * its capability before calling the C function.  This allows another
+ * task to pick up the capability and carry on running Haskell
+ * threads.  It also means that if the C call blocks, it won't lock
+ * the whole system.
+ *
+ * The Haskell thread making the C call is put to sleep for the
+ * duration of the call, on the susepended_ccalling_threads queue.  We
+ * give out a token to the task, which it can use to resume the thread
+ * on return from the C function.
+ * ------------------------------------------------------------------------- */
+   
+StgInt
+suspendThread( StgRegTable *reg )
+{
+  nat tok;
+  Capability *cap;
+  int saved_errno = errno;
+
+  /* assume that *reg is a pointer to the StgRegTable part
+   * of a Capability.
+   */
+  cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
+
+  ACQUIRE_LOCK(&sched_mutex);
+
+  IF_DEBUG(scheduler,
+          sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
+
+  // XXX this might not be necessary --SDM
+  cap->r.rCurrentTSO->what_next = ThreadRunGHC;
+
+  threadPaused(cap->r.rCurrentTSO);
+  cap->r.rCurrentTSO->link = suspended_ccalling_threads;
+  suspended_ccalling_threads = cap->r.rCurrentTSO;
+
+  if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
+      cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
+      cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
+  } else {
+      cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
+  }
+
+  /* Use the thread ID as the token; it should be unique */
+  tok = cap->r.rCurrentTSO->id;
+
+  /* Hand back capability */
+  cap->r.rInHaskell = rtsFalse;
+  releaseCapability(cap);
+  
+#if defined(RTS_SUPPORTS_THREADS)
+  /* Preparing to leave the RTS, so ensure there's a native thread/task
+     waiting to take over.
+  */
+  IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
+#endif
+
+  RELEASE_LOCK(&sched_mutex);
+  
+  errno = saved_errno;
+  return tok; 
+}
+
+StgRegTable *
+resumeThread( StgInt tok )
+{
+  StgTSO *tso, **prev;
+  Capability *cap;
+  int saved_errno = errno;
+
+#if defined(RTS_SUPPORTS_THREADS)
+  /* Wait for permission to re-enter the RTS with the result. */
+  ACQUIRE_LOCK(&sched_mutex);
+  waitForReturnCapability(&sched_mutex, &cap);
+
+  IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
+#else
+  grabCapability(&cap);
+#endif
+
+  /* Remove the thread off of the suspended list */
+  prev = &suspended_ccalling_threads;
+  for (tso = suspended_ccalling_threads; 
+       tso != END_TSO_QUEUE; 
+       prev = &tso->link, tso = tso->link) {
+    if (tso->id == (StgThreadID)tok) {
+      *prev = tso->link;
+      break;
+    }
+  }
+  if (tso == END_TSO_QUEUE) {
+    barf("resumeThread: thread not found");
+  }
+  tso->link = END_TSO_QUEUE;
+  
+  if(tso->why_blocked == BlockedOnCCall) {
+      awakenBlockedQueueNoLock(tso->blocked_exceptions);
+      tso->blocked_exceptions = NULL;
+  }
+  
+  /* Reset blocking status */
+  tso->why_blocked  = NotBlocked;
+
+  cap->r.rCurrentTSO = tso;
+  cap->r.rInHaskell = rtsTrue;
+  RELEASE_LOCK(&sched_mutex);
+  errno = saved_errno;
+  return &cap->r;
+}
+
+/* ---------------------------------------------------------------------------
+ * Comparing Thread ids.
+ *
+ * This is used from STG land in the implementation of the
+ * instances of Eq/Ord for ThreadIds.
+ * ------------------------------------------------------------------------ */
+
+int
+cmp_thread(StgPtr tso1, StgPtr tso2) 
+{ 
+  StgThreadID id1 = ((StgTSO *)tso1)->id; 
+  StgThreadID id2 = ((StgTSO *)tso2)->id;
+  if (id1 < id2) return (-1);
+  if (id1 > id2) return 1;
+  return 0;
+}
+
+/* ---------------------------------------------------------------------------
+ * Fetching the ThreadID from an StgTSO.
+ *
+ * This is used in the implementation of Show for ThreadIds.
+ * ------------------------------------------------------------------------ */
+int
+rts_getThreadId(StgPtr tso) 
+{
+  return ((StgTSO *)tso)->id;
+}
+
+#ifdef DEBUG
+void
+labelThread(StgPtr tso, char *label)
+{
+  int len;
+  void *buf;
+
+  /* Caveat: Once set, you can only set the thread name to "" */
+  len = strlen(label)+1;
+  buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
+  strncpy(buf,label,len);
+  /* Update will free the old memory for us */
+  updateThreadLabel(((StgTSO *)tso)->id,buf);
+}
+#endif /* DEBUG */
+
+/* ---------------------------------------------------------------------------
+   Create a new thread.
+
+   The new thread starts with the given stack size.  Before the
+   scheduler can run, however, this thread needs to have a closure
+   (and possibly some arguments) pushed on its stack.  See
+   pushClosure() in Schedule.h.
+
+   createGenThread() and createIOThread() (in SchedAPI.h) are
+   convenient packaged versions of this function.
+
+   currently pri (priority) is only used in a GRAN setup -- HWL
+   ------------------------------------------------------------------------ */
+#if defined(GRAN)
+/*   currently pri (priority) is only used in a GRAN setup -- HWL */
+StgTSO *
+createThread(nat size, StgInt pri)
+#else
+StgTSO *
+createThread(nat size)
+#endif
+{
+
+    StgTSO *tso;
+    nat stack_size;
+
+    /* First check whether we should create a thread at all */
+#if defined(PARALLEL_HASKELL)
+  /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
+  if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
+    threadsIgnored++;
+    debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
+         RtsFlags.ParFlags.maxThreads, advisory_thread_count);
+    return END_TSO_QUEUE;
+  }
+  threadsCreated++;
+#endif
+
+#if defined(GRAN)
+  ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
+#endif
+
+  // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
+
+  /* catch ridiculously small stack sizes */
+  if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
+    size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
+  }
+
+  stack_size = size - TSO_STRUCT_SIZEW;
+
+  tso = (StgTSO *)allocate(size);
+  TICK_ALLOC_TSO(stack_size, 0);
+
+  SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
+#if defined(GRAN)
+  SET_GRAN_HDR(tso, ThisPE);
+#endif
+
+  // Always start with the compiled code evaluator
+  tso->what_next = ThreadRunGHC;
+
+  tso->id = next_thread_id++; 
+  tso->why_blocked  = NotBlocked;
+  tso->blocked_exceptions = NULL;
+
+  tso->saved_errno = 0;
+  tso->main = NULL;
+  
+  tso->stack_size   = stack_size;
+  tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
+                              - TSO_STRUCT_SIZEW;
+  tso->sp           = (P_)&(tso->stack) + stack_size;
+
+  tso->trec = NO_TREC;
+
+#ifdef PROFILING
+  tso->prof.CCCS = CCS_MAIN;
+#endif
+
+  /* put a stop frame on the stack */
+  tso->sp -= sizeofW(StgStopFrame);
+  SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
+  tso->link = END_TSO_QUEUE;
+
+  // ToDo: check this
+#if defined(GRAN)
+  /* uses more flexible routine in GranSim */
+  insertThread(tso, CurrentProc);
+#else
+  /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
    * from its creation
    */
 #endif
@@ -3441,7 +4131,159 @@ printThreadStatus(StgTSO *tso)
 {
   switch (tso->what_next) {
   case ThreadKilled:
-    debugBelch("has 
+    debugBelch("has been killed");
+    break;
+  case ThreadComplete:
+    debugBelch("has completed");
+    break;
+  default:
+    printThreadBlockage(tso);
+  }
+}
+
+void
+printAllThreads(void)
+{
+  StgTSO *t;
+
+# if defined(GRAN)
+  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
+  ullong_format_string(TIME_ON_PROC(CurrentProc), 
+                      time_string, rtsFalse/*no commas!*/);
+
+  debugBelch("all threads at [%s]:\n", time_string);
+# elif defined(PARALLEL_HASKELL)
+  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
+  ullong_format_string(CURRENT_TIME,
+                      time_string, rtsFalse/*no commas!*/);
+
+  debugBelch("all threads at [%s]:\n", time_string);
+# else
+  debugBelch("all threads:\n");
+# endif
+
+  for (t = all_threads; t != END_TSO_QUEUE; ) {
+    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+#if defined(DEBUG)
+    {
+      void *label = lookupThreadLabel(t->id);
+      if (label) debugBelch("[\"%s\"] ",(char *)label);
+    }
+#endif
+    if (t->what_next == ThreadRelocated) {
+       debugBelch("has been relocated...\n");
+       t = t->link;
+    } else {
+       printThreadStatus(t);
+       debugBelch("\n");
+       t = t->global_link;
+    }
+  }
+}
+
+#ifdef DEBUG
+
+// useful from gdb
+void 
+printThreadQueue(StgTSO *t)
+{
+    nat i = 0;
+    for (; t != END_TSO_QUEUE; t = t->link) {
+       debugBelch("\tthread %d @ %p ", t->id, (void *)t);
+       if (t->what_next == ThreadRelocated) {
+           debugBelch("has been relocated...\n");
+       } else {
+           printThreadStatus(t);
+           debugBelch("\n");
+       }
+       i++;
+    }
+    debugBelch("%d threads on queue\n", i);
+}
+
+/* 
+   Print a whole blocking queue attached to node (debugging only).
+*/
+# if defined(PARALLEL_HASKELL)
+void 
+print_bq (StgClosure *node)
+{
+  StgBlockingQueueElement *bqe;
+  StgTSO *tso;
+  rtsBool end;
+
+  debugBelch("## BQ of closure %p (%s): ",
+         node, info_type(node));
+
+  /* should cover all closures that may have a blocking queue */
+  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
+        get_itbl(node)->type == FETCH_ME_BQ ||
+        get_itbl(node)->type == RBH ||
+        get_itbl(node)->type == MVAR);
+    
+  ASSERT(node!=(StgClosure*)NULL);         // sanity check
+
+  print_bqe(((StgBlockingQueue*)node)->blocking_queue);
+}
+
+/* 
+   Print a whole blocking queue starting with the element bqe.
+*/
+void 
+print_bqe (StgBlockingQueueElement *bqe)
+{
+  rtsBool end;
+
+  /* 
+     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
+  */
+  for (end = (bqe==END_BQ_QUEUE);
+       !end; // iterate until bqe points to a CONSTR
+       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
+       bqe = end ? END_BQ_QUEUE : bqe->link) {
+    ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
+    ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
+    /* types of closures that may appear in a blocking queue */
+    ASSERT(get_itbl(bqe)->type == TSO ||           
+          get_itbl(bqe)->type == BLOCKED_FETCH || 
+          get_itbl(bqe)->type == CONSTR); 
+    /* only BQs of an RBH end with an RBH_Save closure */
+    //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
+
+    switch (get_itbl(bqe)->type) {
+    case TSO:
+      debugBelch(" TSO %u (%x),",
+             ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
+      break;
+    case BLOCKED_FETCH:
+      debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
+             ((StgBlockedFetch *)bqe)->node, 
+             ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
+             ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
+             ((StgBlockedFetch *)bqe)->ga.weight);
+      break;
+    case CONSTR:
+      debugBelch(" %s (IP %p),",
+             (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
+              get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
+              get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
+              "RBH_Save_?"), get_itbl(bqe));
+      break;
+    default:
+      barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
+          info_type((StgClosure *)bqe)); // , node, info_type(node));
+      break;
+    }
+  } /* for */
+  debugBelch("\n");
+}
+# elif defined(GRAN)
+void 
+print_bq (StgClosure *node)
+{
+  StgBlockingQueueElement *bqe;
+  PEs node_loc, tso_loc;
+  rtsBool end;
 
   /* should cover all closures that may have a blocking queue */
   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||