[project @ 2004-11-10 04:17:50 by wolfgang]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "Hooks.h"
48 #define COMPILING_SCHEDULER
49 #include "Schedule.h"
50 #include "StgMiscClosures.h"
51 #include "Storage.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "Timer.h"
59 #include "Prelude.h"
60 #include "ThreadLabels.h"
61 #include "LdvProfile.h"
62 #include "Updates.h"
63 #ifdef PROFILING
64 #include "Proftimer.h"
65 #include "ProfHeap.h"
66 #endif
67 #if defined(GRAN) || defined(PAR)
68 # include "GranSimRts.h"
69 # include "GranSim.h"
70 # include "ParallelRts.h"
71 # include "Parallel.h"
72 # include "ParallelDebug.h"
73 # include "FetchMe.h"
74 # include "HLC.h"
75 #endif
76 #include "Sparks.h"
77 #include "Capability.h"
78 #include "OSThreads.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 #ifdef THREADED_RTS
97 #define USED_IN_THREADED_RTS
98 #else
99 #define USED_IN_THREADED_RTS STG_UNUSED
100 #endif
101
102 #ifdef RTS_SUPPORTS_THREADS
103 #define USED_WHEN_RTS_SUPPORTS_THREADS
104 #else
105 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
106 #endif
107
108 /* Main thread queue.
109  * Locks required: sched_mutex.
110  */
111 StgMainThread *main_threads = NULL;
112
113 /* Thread queues.
114  * Locks required: sched_mutex.
115  */
116 #if defined(GRAN)
117
118 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
119 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
120
121 /* 
122    In GranSim we have a runnable and a blocked queue for each processor.
123    In order to minimise code changes new arrays run_queue_hds/tls
124    are created. run_queue_hd is then a short cut (macro) for
125    run_queue_hds[CurrentProc] (see GranSim.h).
126    -- HWL
127 */
128 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
129 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
130 StgTSO *ccalling_threadss[MAX_PROC];
131 /* We use the same global list of threads (all_threads) in GranSim as in
132    the std RTS (i.e. we are cheating). However, we don't use this list in
133    the GranSim specific code at the moment (so we are only potentially
134    cheating).  */
135
136 #else /* !GRAN */
137
138 StgTSO *run_queue_hd = NULL;
139 StgTSO *run_queue_tl = NULL;
140 StgTSO *blocked_queue_hd = NULL;
141 StgTSO *blocked_queue_tl = NULL;
142 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
143
144 #endif
145
146 /* Linked list of all threads.
147  * Used for detecting garbage collected threads.
148  */
149 StgTSO *all_threads = NULL;
150
151 /* When a thread performs a safe C call (_ccall_GC, using old
152  * terminology), it gets put on the suspended_ccalling_threads
153  * list. Used by the garbage collector.
154  */
155 static StgTSO *suspended_ccalling_threads;
156
157 static StgTSO *threadStackOverflow(StgTSO *tso);
158
159 /* KH: The following two flags are shared memory locations.  There is no need
160        to lock them, since they are only unset at the end of a scheduler
161        operation.
162 */
163
164 /* flag set by signal handler to precipitate a context switch */
165 int context_switch = 0;
166
167 /* if this flag is set as well, give up execution */
168 rtsBool interrupted = rtsFalse;
169
170 /* Next thread ID to allocate.
171  * Locks required: thread_id_mutex
172  */
173 static StgThreadID next_thread_id = 1;
174
175 /*
176  * Pointers to the state of the current thread.
177  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
178  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
179  */
180  
181 /* The smallest stack size that makes any sense is:
182  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
183  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
184  *  + 1                       (the closure to enter)
185  *  + 1                       (stg_ap_v_ret)
186  *  + 1                       (spare slot req'd by stg_ap_v_ret)
187  *
188  * A thread with this stack will bomb immediately with a stack
189  * overflow, which will increase its stack size.  
190  */
191
192 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
193
194
195 #if defined(GRAN)
196 StgTSO *CurrentTSO;
197 #endif
198
199 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
200  *  exists - earlier gccs apparently didn't.
201  *  -= chak
202  */
203 StgTSO dummy_tso;
204
205 static rtsBool ready_to_gc;
206
207 /*
208  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
209  * in an MT setting, needed to signal that a worker thread shouldn't hang around
210  * in the scheduler when it is out of work.
211  */
212 static rtsBool shutting_down_scheduler = rtsFalse;
213
214 void            addToBlockedQueue ( StgTSO *tso );
215
216 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
217        void     interruptStgRts   ( void );
218
219 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
220 static void     detectBlackHoles  ( void );
221 #endif
222
223 #if defined(RTS_SUPPORTS_THREADS)
224 /* ToDo: carefully document the invariants that go together
225  *       with these synchronisation objects.
226  */
227 Mutex     sched_mutex       = INIT_MUTEX_VAR;
228 Mutex     term_mutex        = INIT_MUTEX_VAR;
229
230 #endif /* RTS_SUPPORTS_THREADS */
231
232 #if defined(PAR)
233 StgTSO *LastTSO;
234 rtsTime TimeOfLastYield;
235 rtsBool emitSchedule = rtsTrue;
236 #endif
237
238 #if DEBUG
239 static char *whatNext_strs[] = {
240   "(unknown)",
241   "ThreadRunGHC",
242   "ThreadInterpret",
243   "ThreadKilled",
244   "ThreadRelocated",
245   "ThreadComplete"
246 };
247 #endif
248
249 #if defined(PAR)
250 StgTSO * createSparkThread(rtsSpark spark);
251 StgTSO * activateSpark (rtsSpark spark);  
252 #endif
253
254 /* ----------------------------------------------------------------------------
255  * Starting Tasks
256  * ------------------------------------------------------------------------- */
257
258 #if defined(RTS_SUPPORTS_THREADS)
259 static rtsBool startingWorkerThread = rtsFalse;
260
261 static void taskStart(void);
262 static void
263 taskStart(void)
264 {
265   ACQUIRE_LOCK(&sched_mutex);
266   startingWorkerThread = rtsFalse;
267   schedule(NULL,NULL);
268   RELEASE_LOCK(&sched_mutex);
269 }
270
271 void
272 startSchedulerTaskIfNecessary(void)
273 {
274   if(run_queue_hd != END_TSO_QUEUE
275     || blocked_queue_hd != END_TSO_QUEUE
276     || sleeping_queue != END_TSO_QUEUE)
277   {
278     if(!startingWorkerThread)
279     { // we don't want to start another worker thread
280       // just because the last one hasn't yet reached the
281       // "waiting for capability" state
282       startingWorkerThread = rtsTrue;
283       if(!startTask(taskStart))
284       {
285         startingWorkerThread = rtsFalse;
286       }
287     }
288   }
289 }
290 #endif
291
292 /* ---------------------------------------------------------------------------
293    Main scheduling loop.
294
295    We use round-robin scheduling, each thread returning to the
296    scheduler loop when one of these conditions is detected:
297
298       * out of heap space
299       * timer expires (thread yields)
300       * thread blocks
301       * thread ends
302       * stack overflow
303
304    Locking notes:  we acquire the scheduler lock once at the beginning
305    of the scheduler loop, and release it when
306     
307       * running a thread, or
308       * waiting for work, or
309       * waiting for a GC to complete.
310
311    GRAN version:
312      In a GranSim setup this loop iterates over the global event queue.
313      This revolves around the global event queue, which determines what 
314      to do next. Therefore, it's more complicated than either the 
315      concurrent or the parallel (GUM) setup.
316
317    GUM version:
318      GUM iterates over incoming messages.
319      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
320      and sends out a fish whenever it has nothing to do; in-between
321      doing the actual reductions (shared code below) it processes the
322      incoming messages and deals with delayed operations 
323      (see PendingFetches).
324      This is not the ugliest code you could imagine, but it's bloody close.
325
326    ------------------------------------------------------------------------ */
327 static void
328 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
329           Capability *initialCapability )
330 {
331   StgTSO *t;
332   Capability *cap;
333   StgThreadReturnCode ret;
334 #if defined(GRAN)
335   rtsEvent *event;
336 #elif defined(PAR)
337   StgSparkPool *pool;
338   rtsSpark spark;
339   StgTSO *tso;
340   GlobalTaskId pe;
341   rtsBool receivedFinish = rtsFalse;
342 # if defined(DEBUG)
343   nat tp_size, sp_size; // stats only
344 # endif
345 #endif
346   rtsBool was_interrupted = rtsFalse;
347   nat prev_what_next;
348   
349   // Pre-condition: sched_mutex is held.
350   // We might have a capability, passed in as initialCapability.
351   cap = initialCapability;
352
353 #if defined(RTS_SUPPORTS_THREADS)
354   //
355   // in the threaded case, the capability is either passed in via the
356   // initialCapability parameter, or initialized inside the scheduler
357   // loop 
358   //
359   IF_DEBUG(scheduler,
360            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
361                        mainThread, initialCapability);
362       );
363 #else
364   // simply initialise it in the non-threaded case
365   grabCapability(&cap);
366 #endif
367
368 #if defined(GRAN)
369   /* set up first event to get things going */
370   /* ToDo: assign costs for system setup and init MainTSO ! */
371   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
372             ContinueThread, 
373             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
374
375   IF_DEBUG(gran,
376            debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
377            G_TSO(CurrentTSO, 5));
378
379   if (RtsFlags.GranFlags.Light) {
380     /* Save current time; GranSim Light only */
381     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
382   }      
383
384   event = get_next_event();
385
386   while (event!=(rtsEvent*)NULL) {
387     /* Choose the processor with the next event */
388     CurrentProc = event->proc;
389     CurrentTSO = event->tso;
390
391 #elif defined(PAR)
392
393   while (!receivedFinish) {    /* set by processMessages */
394                                /* when receiving PP_FINISH message         */ 
395
396 #else // everything except GRAN and PAR
397
398   while (1) {
399
400 #endif
401
402      IF_DEBUG(scheduler, printAllThreads());
403
404 #if defined(RTS_SUPPORTS_THREADS)
405       // Yield the capability to higher-priority tasks if necessary.
406       //
407       if (cap != NULL) {
408           yieldCapability(&cap);
409       }
410
411       // If we do not currently hold a capability, we wait for one
412       //
413       if (cap == NULL) {
414           waitForCapability(&sched_mutex, &cap,
415                             mainThread ? &mainThread->bound_thread_cond : NULL);
416       }
417
418       // We now have a capability...
419 #endif
420
421     //
422     // If we're interrupted (the user pressed ^C, or some other
423     // termination condition occurred), kill all the currently running
424     // threads.
425     //
426     if (interrupted) {
427         IF_DEBUG(scheduler, sched_belch("interrupted"));
428         interrupted = rtsFalse;
429         was_interrupted = rtsTrue;
430 #if defined(RTS_SUPPORTS_THREADS)
431         // In the threaded RTS, deadlock detection doesn't work,
432         // so just exit right away.
433         errorBelch("interrupted");
434         releaseCapability(cap);
435         RELEASE_LOCK(&sched_mutex);
436         shutdownHaskellAndExit(EXIT_SUCCESS);
437 #else
438         deleteAllThreads();
439 #endif
440     }
441
442 #if defined(RTS_USER_SIGNALS)
443     // check for signals each time around the scheduler
444     if (signals_pending()) {
445       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
446       startSignalHandlers();
447       ACQUIRE_LOCK(&sched_mutex);
448     }
449 #endif
450
451     //
452     // Check whether any waiting threads need to be woken up.  If the
453     // run queue is empty, and there are no other tasks running, we
454     // can wait indefinitely for something to happen.
455     //
456     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
457     {
458 #if defined(RTS_SUPPORTS_THREADS)
459         // We shouldn't be here...
460         barf("schedule: awaitEvent() in threaded RTS");
461 #endif
462         awaitEvent( EMPTY_RUN_QUEUE() );
463     }
464     // we can be interrupted while waiting for I/O...
465     if (interrupted) continue;
466
467     /* 
468      * Detect deadlock: when we have no threads to run, there are no
469      * threads waiting on I/O or sleeping, and all the other tasks are
470      * waiting for work, we must have a deadlock of some description.
471      *
472      * We first try to find threads blocked on themselves (ie. black
473      * holes), and generate NonTermination exceptions where necessary.
474      *
475      * If no threads are black holed, we have a deadlock situation, so
476      * inform all the main threads.
477      */
478 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
479     if (   EMPTY_THREAD_QUEUES() )
480     {
481         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
482
483         // Garbage collection can release some new threads due to
484         // either (a) finalizers or (b) threads resurrected because
485         // they are unreachable and will therefore be sent an
486         // exception.  Any threads thus released will be immediately
487         // runnable.
488         GarbageCollect(GetRoots,rtsTrue);
489         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
490
491 #if defined(RTS_USER_SIGNALS)
492         /* If we have user-installed signal handlers, then wait
493          * for signals to arrive rather then bombing out with a
494          * deadlock.
495          */
496         if ( anyUserHandlers() ) {
497             IF_DEBUG(scheduler, 
498                      sched_belch("still deadlocked, waiting for signals..."));
499
500             awaitUserSignals();
501
502             // we might be interrupted...
503             if (interrupted) { continue; }
504
505             if (signals_pending()) {
506                 RELEASE_LOCK(&sched_mutex);
507                 startSignalHandlers();
508                 ACQUIRE_LOCK(&sched_mutex);
509             }
510             ASSERT(!EMPTY_RUN_QUEUE());
511             goto not_deadlocked;
512         }
513 #endif
514
515         /* Probably a real deadlock.  Send the current main thread the
516          * Deadlock exception (or in the SMP build, send *all* main
517          * threads the deadlock exception, since none of them can make
518          * progress).
519          */
520         {
521             StgMainThread *m;
522             m = main_threads;
523             switch (m->tso->why_blocked) {
524             case BlockedOnBlackHole:
525             case BlockedOnException:
526             case BlockedOnMVar:
527                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
528                 break;
529             default:
530                 barf("deadlock: main thread blocked in a strange way");
531             }
532         }
533     }
534   not_deadlocked:
535
536 #elif defined(RTS_SUPPORTS_THREADS)
537     // ToDo: add deadlock detection in threaded RTS
538 #elif defined(PAR)
539     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
540 #endif
541
542 #if defined(RTS_SUPPORTS_THREADS)
543     if ( EMPTY_RUN_QUEUE() ) {
544         continue; // nothing to do
545     }
546 #endif
547
548 #if defined(GRAN)
549     if (RtsFlags.GranFlags.Light)
550       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
551
552     /* adjust time based on time-stamp */
553     if (event->time > CurrentTime[CurrentProc] &&
554         event->evttype != ContinueThread)
555       CurrentTime[CurrentProc] = event->time;
556     
557     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
558     if (!RtsFlags.GranFlags.Light)
559       handleIdlePEs();
560
561     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
562
563     /* main event dispatcher in GranSim */
564     switch (event->evttype) {
565       /* Should just be continuing execution */
566     case ContinueThread:
567       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
568       /* ToDo: check assertion
569       ASSERT(run_queue_hd != (StgTSO*)NULL &&
570              run_queue_hd != END_TSO_QUEUE);
571       */
572       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
573       if (!RtsFlags.GranFlags.DoAsyncFetch &&
574           procStatus[CurrentProc]==Fetching) {
575         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
576               CurrentTSO->id, CurrentTSO, CurrentProc);
577         goto next_thread;
578       } 
579       /* Ignore ContinueThreads for completed threads */
580       if (CurrentTSO->what_next == ThreadComplete) {
581         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
582               CurrentTSO->id, CurrentTSO, CurrentProc);
583         goto next_thread;
584       } 
585       /* Ignore ContinueThreads for threads that are being migrated */
586       if (PROCS(CurrentTSO)==Nowhere) { 
587         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
588               CurrentTSO->id, CurrentTSO, CurrentProc);
589         goto next_thread;
590       }
591       /* The thread should be at the beginning of the run queue */
592       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
593         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
594               CurrentTSO->id, CurrentTSO, CurrentProc);
595         break; // run the thread anyway
596       }
597       /*
598       new_event(proc, proc, CurrentTime[proc],
599                 FindWork,
600                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
601       goto next_thread; 
602       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
603       break; // now actually run the thread; DaH Qu'vam yImuHbej 
604
605     case FetchNode:
606       do_the_fetchnode(event);
607       goto next_thread;             /* handle next event in event queue  */
608       
609     case GlobalBlock:
610       do_the_globalblock(event);
611       goto next_thread;             /* handle next event in event queue  */
612       
613     case FetchReply:
614       do_the_fetchreply(event);
615       goto next_thread;             /* handle next event in event queue  */
616       
617     case UnblockThread:   /* Move from the blocked queue to the tail of */
618       do_the_unblock(event);
619       goto next_thread;             /* handle next event in event queue  */
620       
621     case ResumeThread:  /* Move from the blocked queue to the tail of */
622       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
623       event->tso->gran.blocktime += 
624         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
625       do_the_startthread(event);
626       goto next_thread;             /* handle next event in event queue  */
627       
628     case StartThread:
629       do_the_startthread(event);
630       goto next_thread;             /* handle next event in event queue  */
631       
632     case MoveThread:
633       do_the_movethread(event);
634       goto next_thread;             /* handle next event in event queue  */
635       
636     case MoveSpark:
637       do_the_movespark(event);
638       goto next_thread;             /* handle next event in event queue  */
639       
640     case FindWork:
641       do_the_findwork(event);
642       goto next_thread;             /* handle next event in event queue  */
643       
644     default:
645       barf("Illegal event type %u\n", event->evttype);
646     }  /* switch */
647     
648     /* This point was scheduler_loop in the old RTS */
649
650     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
651
652     TimeOfLastEvent = CurrentTime[CurrentProc];
653     TimeOfNextEvent = get_time_of_next_event();
654     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
655     // CurrentTSO = ThreadQueueHd;
656
657     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
658                          TimeOfNextEvent));
659
660     if (RtsFlags.GranFlags.Light) 
661       GranSimLight_leave_system(event, &ActiveTSO); 
662
663     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
664
665     IF_DEBUG(gran, 
666              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
667
668     /* in a GranSim setup the TSO stays on the run queue */
669     t = CurrentTSO;
670     /* Take a thread from the run queue. */
671     POP_RUN_QUEUE(t); // take_off_run_queue(t);
672
673     IF_DEBUG(gran, 
674              debugBelch("GRAN: About to run current thread, which is\n");
675              G_TSO(t,5));
676
677     context_switch = 0; // turned on via GranYield, checking events and time slice
678
679     IF_DEBUG(gran, 
680              DumpGranEvent(GR_SCHEDULE, t));
681
682     procStatus[CurrentProc] = Busy;
683
684 #elif defined(PAR)
685     if (PendingFetches != END_BF_QUEUE) {
686         processFetches();
687     }
688
689     /* ToDo: phps merge with spark activation above */
690     /* check whether we have local work and send requests if we have none */
691     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
692       /* :-[  no local threads => look out for local sparks */
693       /* the spark pool for the current PE */
694       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
695       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
696           pool->hd < pool->tl) {
697         /* 
698          * ToDo: add GC code check that we really have enough heap afterwards!!
699          * Old comment:
700          * If we're here (no runnable threads) and we have pending
701          * sparks, we must have a space problem.  Get enough space
702          * to turn one of those pending sparks into a
703          * thread... 
704          */
705
706         spark = findSpark(rtsFalse);                /* get a spark */
707         if (spark != (rtsSpark) NULL) {
708           tso = activateSpark(spark);       /* turn the spark into a thread */
709           IF_PAR_DEBUG(schedule,
710                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
711                              tso->id, tso, advisory_thread_count));
712
713           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
714             debugBelch("==^^ failed to activate spark\n");
715             goto next_thread;
716           }               /* otherwise fall through & pick-up new tso */
717         } else {
718           IF_PAR_DEBUG(verbose,
719                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
720                              spark_queue_len(pool)));
721           goto next_thread;
722         }
723       }
724
725       /* If we still have no work we need to send a FISH to get a spark
726          from another PE 
727       */
728       if (EMPTY_RUN_QUEUE()) {
729       /* =8-[  no local sparks => look for work on other PEs */
730         /*
731          * We really have absolutely no work.  Send out a fish
732          * (there may be some out there already), and wait for
733          * something to arrive.  We clearly can't run any threads
734          * until a SCHEDULE or RESUME arrives, and so that's what
735          * we're hoping to see.  (Of course, we still have to
736          * respond to other types of messages.)
737          */
738         TIME now = msTime() /*CURRENT_TIME*/;
739         IF_PAR_DEBUG(verbose, 
740                      debugBelch("--  now=%ld\n", now));
741         IF_PAR_DEBUG(verbose,
742                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
743                          (last_fish_arrived_at!=0 &&
744                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
745                        debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
746                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
747                              last_fish_arrived_at,
748                              RtsFlags.ParFlags.fishDelay, now);
749                      });
750         
751         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
752             (last_fish_arrived_at==0 ||
753              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
754           /* outstandingFishes is set in sendFish, processFish;
755              avoid flooding system with fishes via delay */
756           pe = choosePE();
757           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
758                    NEW_FISH_HUNGER);
759
760           // Global statistics: count no. of fishes
761           if (RtsFlags.ParFlags.ParStats.Global &&
762               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
763             globalParStats.tot_fish_mess++;
764           }
765         }
766       
767         receivedFinish = processMessages();
768         goto next_thread;
769       }
770     } else if (PacketsWaiting()) {  /* Look for incoming messages */
771       receivedFinish = processMessages();
772     }
773
774     /* Now we are sure that we have some work available */
775     ASSERT(run_queue_hd != END_TSO_QUEUE);
776
777     /* Take a thread from the run queue, if we have work */
778     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
779     IF_DEBUG(sanity,checkTSO(t));
780
781     /* ToDo: write something to the log-file
782     if (RTSflags.ParFlags.granSimStats && !sameThread)
783         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
784
785     CurrentTSO = t;
786     */
787     /* the spark pool for the current PE */
788     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
789
790     IF_DEBUG(scheduler, 
791              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
792                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
793
794 # if 1
795     if (0 && RtsFlags.ParFlags.ParStats.Full && 
796         t && LastTSO && t->id != LastTSO->id && 
797         LastTSO->why_blocked == NotBlocked && 
798         LastTSO->what_next != ThreadComplete) {
799       // if previously scheduled TSO not blocked we have to record the context switch
800       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
801                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
802     }
803
804     if (RtsFlags.ParFlags.ParStats.Full && 
805         (emitSchedule /* forced emit */ ||
806         (t && LastTSO && t->id != LastTSO->id))) {
807       /* 
808          we are running a different TSO, so write a schedule event to log file
809          NB: If we use fair scheduling we also have to write  a deschedule 
810              event for LastTSO; with unfair scheduling we know that the
811              previous tso has blocked whenever we switch to another tso, so
812              we don't need it in GUM for now
813       */
814       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
815                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
816       emitSchedule = rtsFalse;
817     }
818      
819 # endif
820 #else /* !GRAN && !PAR */
821   
822     // grab a thread from the run queue
823     ASSERT(run_queue_hd != END_TSO_QUEUE);
824     POP_RUN_QUEUE(t);
825
826     // Sanity check the thread we're about to run.  This can be
827     // expensive if there is lots of thread switching going on...
828     IF_DEBUG(sanity,checkTSO(t));
829 #endif
830
831 #ifdef THREADED_RTS
832     {
833       StgMainThread *m = t->main;
834       
835       if(m)
836       {
837         if(m == mainThread)
838         {
839           IF_DEBUG(scheduler,
840             sched_belch("### Running thread %d in bound thread", t->id));
841           // yes, the Haskell thread is bound to the current native thread
842         }
843         else
844         {
845           IF_DEBUG(scheduler,
846             sched_belch("### thread %d bound to another OS thread", t->id));
847           // no, bound to a different Haskell thread: pass to that thread
848           PUSH_ON_RUN_QUEUE(t);
849           passCapability(&m->bound_thread_cond);
850           continue;
851         }
852       }
853       else
854       {
855         if(mainThread != NULL)
856         // The thread we want to run is bound.
857         {
858           IF_DEBUG(scheduler,
859             sched_belch("### this OS thread cannot run thread %d", t->id));
860           // no, the current native thread is bound to a different
861           // Haskell thread, so pass it to any worker thread
862           PUSH_ON_RUN_QUEUE(t);
863           passCapabilityToWorker();
864           continue; 
865         }
866       }
867     }
868 #endif
869
870     cap->r.rCurrentTSO = t;
871     
872     /* context switches are now initiated by the timer signal, unless
873      * the user specified "context switch as often as possible", with
874      * +RTS -C0
875      */
876     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
877          && (run_queue_hd != END_TSO_QUEUE
878              || blocked_queue_hd != END_TSO_QUEUE
879              || sleeping_queue != END_TSO_QUEUE)))
880         context_switch = 1;
881
882 run_thread:
883
884     RELEASE_LOCK(&sched_mutex);
885
886     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
887                               (long)t->id, whatNext_strs[t->what_next]));
888
889 #ifdef PROFILING
890     startHeapProfTimer();
891 #endif
892
893     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
894     /* Run the current thread 
895      */
896     prev_what_next = t->what_next;
897
898     errno = t->saved_errno;
899
900     switch (prev_what_next) {
901
902     case ThreadKilled:
903     case ThreadComplete:
904         /* Thread already finished, return to scheduler. */
905         ret = ThreadFinished;
906         break;
907
908     case ThreadRunGHC:
909         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
910         break;
911
912     case ThreadInterpret:
913         ret = interpretBCO(cap);
914         break;
915
916     default:
917       barf("schedule: invalid what_next field");
918     }
919
920     // The TSO might have moved, so find the new location:
921     t = cap->r.rCurrentTSO;
922
923     // And save the current errno in this thread.
924     t->saved_errno = errno;
925
926     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
927     
928     /* Costs for the scheduler are assigned to CCS_SYSTEM */
929 #ifdef PROFILING
930     stopHeapProfTimer();
931     CCCS = CCS_SYSTEM;
932 #endif
933     
934     ACQUIRE_LOCK(&sched_mutex);
935     
936 #ifdef RTS_SUPPORTS_THREADS
937     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
938 #elif !defined(GRAN) && !defined(PAR)
939     IF_DEBUG(scheduler,debugBelch("sched: "););
940 #endif
941     
942 #if defined(PAR)
943     /* HACK 675: if the last thread didn't yield, make sure to print a 
944        SCHEDULE event to the log file when StgRunning the next thread, even
945        if it is the same one as before */
946     LastTSO = t; 
947     TimeOfLastYield = CURRENT_TIME;
948 #endif
949
950     switch (ret) {
951     case HeapOverflow:
952 #if defined(GRAN)
953       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
954       globalGranStats.tot_heapover++;
955 #elif defined(PAR)
956       globalParStats.tot_heapover++;
957 #endif
958
959       // did the task ask for a large block?
960       if (cap->r.rHpAlloc > BLOCK_SIZE) {
961           // if so, get one and push it on the front of the nursery.
962           bdescr *bd;
963           nat blocks;
964           
965           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
966
967           IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", 
968                                    (long)t->id, whatNext_strs[t->what_next], blocks));
969
970           // don't do this if it would push us over the
971           // alloc_blocks_lim limit; we'll GC first.
972           if (alloc_blocks + blocks < alloc_blocks_lim) {
973
974               alloc_blocks += blocks;
975               bd = allocGroup( blocks );
976
977               // link the new group into the list
978               bd->link = cap->r.rCurrentNursery;
979               bd->u.back = cap->r.rCurrentNursery->u.back;
980               if (cap->r.rCurrentNursery->u.back != NULL) {
981                   cap->r.rCurrentNursery->u.back->link = bd;
982               } else {
983                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
984                          g0s0->blocks == cap->r.rNursery);
985                   cap->r.rNursery = g0s0->blocks = bd;
986               }           
987               cap->r.rCurrentNursery->u.back = bd;
988
989               // initialise it as a nursery block.  We initialise the
990               // step, gen_no, and flags field of *every* sub-block in
991               // this large block, because this is easier than making
992               // sure that we always find the block head of a large
993               // block whenever we call Bdescr() (eg. evacuate() and
994               // isAlive() in the GC would both have to do this, at
995               // least).
996               { 
997                   bdescr *x;
998                   for (x = bd; x < bd + blocks; x++) {
999                       x->step = g0s0;
1000                       x->gen_no = 0;
1001                       x->flags = 0;
1002                   }
1003               }
1004
1005               // don't forget to update the block count in g0s0.
1006               g0s0->n_blocks += blocks;
1007               // This assert can be a killer if the app is doing lots
1008               // of large block allocations.
1009               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1010
1011               // now update the nursery to point to the new block
1012               cap->r.rCurrentNursery = bd;
1013
1014               // we might be unlucky and have another thread get on the
1015               // run queue before us and steal the large block, but in that
1016               // case the thread will just end up requesting another large
1017               // block.
1018               PUSH_ON_RUN_QUEUE(t);
1019               break;
1020           }
1021       }
1022
1023       /* make all the running tasks block on a condition variable,
1024        * maybe set context_switch and wait till they all pile in,
1025        * then have them wait on a GC condition variable.
1026        */
1027       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1028                                (long)t->id, whatNext_strs[t->what_next]));
1029       threadPaused(t);
1030 #if defined(GRAN)
1031       ASSERT(!is_on_queue(t,CurrentProc));
1032 #elif defined(PAR)
1033       /* Currently we emit a DESCHEDULE event before GC in GUM.
1034          ToDo: either add separate event to distinguish SYSTEM time from rest
1035                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1036       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1037         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1038                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1039         emitSchedule = rtsTrue;
1040       }
1041 #endif
1042       
1043       ready_to_gc = rtsTrue;
1044       context_switch = 1;               /* stop other threads ASAP */
1045       PUSH_ON_RUN_QUEUE(t);
1046       /* actual GC is done at the end of the while loop */
1047       break;
1048       
1049     case StackOverflow:
1050 #if defined(GRAN)
1051       IF_DEBUG(gran, 
1052                DumpGranEvent(GR_DESCHEDULE, t));
1053       globalGranStats.tot_stackover++;
1054 #elif defined(PAR)
1055       // IF_DEBUG(par, 
1056       // DumpGranEvent(GR_DESCHEDULE, t);
1057       globalParStats.tot_stackover++;
1058 #endif
1059       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1060                                (long)t->id, whatNext_strs[t->what_next]));
1061       /* just adjust the stack for this thread, then pop it back
1062        * on the run queue.
1063        */
1064       threadPaused(t);
1065       { 
1066         /* enlarge the stack */
1067         StgTSO *new_t = threadStackOverflow(t);
1068         
1069         /* This TSO has moved, so update any pointers to it from the
1070          * main thread stack.  It better not be on any other queues...
1071          * (it shouldn't be).
1072          */
1073         if (t->main != NULL) {
1074             t->main->tso = new_t;
1075         }
1076         PUSH_ON_RUN_QUEUE(new_t);
1077       }
1078       break;
1079
1080     case ThreadYielding:
1081       // Reset the context switch flag.  We don't do this just before
1082       // running the thread, because that would mean we would lose ticks
1083       // during GC, which can lead to unfair scheduling (a thread hogs
1084       // the CPU because the tick always arrives during GC).  This way
1085       // penalises threads that do a lot of allocation, but that seems
1086       // better than the alternative.
1087       context_switch = 0;
1088
1089 #if defined(GRAN)
1090       IF_DEBUG(gran, 
1091                DumpGranEvent(GR_DESCHEDULE, t));
1092       globalGranStats.tot_yields++;
1093 #elif defined(PAR)
1094       // IF_DEBUG(par, 
1095       // DumpGranEvent(GR_DESCHEDULE, t);
1096       globalParStats.tot_yields++;
1097 #endif
1098       /* put the thread back on the run queue.  Then, if we're ready to
1099        * GC, check whether this is the last task to stop.  If so, wake
1100        * up the GC thread.  getThread will block during a GC until the
1101        * GC is finished.
1102        */
1103       IF_DEBUG(scheduler,
1104                if (t->what_next != prev_what_next) {
1105                    debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1106                          (long)t->id, whatNext_strs[t->what_next]);
1107                } else {
1108                    debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1109                          (long)t->id, whatNext_strs[t->what_next]);
1110                }
1111                );
1112
1113       IF_DEBUG(sanity,
1114                //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1115                checkTSO(t));
1116       ASSERT(t->link == END_TSO_QUEUE);
1117
1118       // Shortcut if we're just switching evaluators: don't bother
1119       // doing stack squeezing (which can be expensive), just run the
1120       // thread.
1121       if (t->what_next != prev_what_next) {
1122           goto run_thread;
1123       }
1124
1125       threadPaused(t);
1126
1127 #if defined(GRAN)
1128       ASSERT(!is_on_queue(t,CurrentProc));
1129
1130       IF_DEBUG(sanity,
1131                //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1132                checkThreadQsSanity(rtsTrue));
1133 #endif
1134
1135 #if defined(PAR)
1136       if (RtsFlags.ParFlags.doFairScheduling) { 
1137         /* this does round-robin scheduling; good for concurrency */
1138         APPEND_TO_RUN_QUEUE(t);
1139       } else {
1140         /* this does unfair scheduling; good for parallelism */
1141         PUSH_ON_RUN_QUEUE(t);
1142       }
1143 #else
1144       // this does round-robin scheduling; good for concurrency
1145       APPEND_TO_RUN_QUEUE(t);
1146 #endif
1147
1148 #if defined(GRAN)
1149       /* add a ContinueThread event to actually process the thread */
1150       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1151                 ContinueThread,
1152                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1153       IF_GRAN_DEBUG(bq, 
1154                debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1155                G_EVENTQ(0);
1156                G_CURR_THREADQ(0));
1157 #endif /* GRAN */
1158       break;
1159
1160     case ThreadBlocked:
1161 #if defined(GRAN)
1162       IF_DEBUG(scheduler,
1163                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1164                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1165                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1166
1167       // ??? needed; should emit block before
1168       IF_DEBUG(gran, 
1169                DumpGranEvent(GR_DESCHEDULE, t)); 
1170       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1171       /*
1172         ngoq Dogh!
1173       ASSERT(procStatus[CurrentProc]==Busy || 
1174               ((procStatus[CurrentProc]==Fetching) && 
1175               (t->block_info.closure!=(StgClosure*)NULL)));
1176       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1177           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1178             procStatus[CurrentProc]==Fetching)) 
1179         procStatus[CurrentProc] = Idle;
1180       */
1181 #elif defined(PAR)
1182       IF_DEBUG(scheduler,
1183                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1184                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1185       IF_PAR_DEBUG(bq,
1186
1187                    if (t->block_info.closure!=(StgClosure*)NULL) 
1188                      print_bq(t->block_info.closure));
1189
1190       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1191       blockThread(t);
1192
1193       /* whatever we schedule next, we must log that schedule */
1194       emitSchedule = rtsTrue;
1195
1196 #else /* !GRAN */
1197       /* don't need to do anything.  Either the thread is blocked on
1198        * I/O, in which case we'll have called addToBlockedQueue
1199        * previously, or it's blocked on an MVar or Blackhole, in which
1200        * case it'll be on the relevant queue already.
1201        */
1202       IF_DEBUG(scheduler,
1203                debugBelch("--<< thread %d (%s) stopped: ", 
1204                        t->id, whatNext_strs[t->what_next]);
1205                printThreadBlockage(t);
1206                debugBelch("\n"));
1207
1208       /* Only for dumping event to log file 
1209          ToDo: do I need this in GranSim, too?
1210       blockThread(t);
1211       */
1212 #endif
1213       threadPaused(t);
1214       break;
1215
1216     case ThreadFinished:
1217       /* Need to check whether this was a main thread, and if so, signal
1218        * the task that started it with the return value.  If we have no
1219        * more main threads, we probably need to stop all the tasks until
1220        * we get a new one.
1221        */
1222       /* We also end up here if the thread kills itself with an
1223        * uncaught exception, see Exception.hc.
1224        */
1225       IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1226                                t->id, whatNext_strs[t->what_next]));
1227 #if defined(GRAN)
1228       endThread(t, CurrentProc); // clean-up the thread
1229 #elif defined(PAR)
1230       /* For now all are advisory -- HWL */
1231       //if(t->priority==AdvisoryPriority) ??
1232       advisory_thread_count--;
1233       
1234 # ifdef DIST
1235       if(t->dist.priority==RevalPriority)
1236         FinishReval(t);
1237 # endif
1238       
1239       if (RtsFlags.ParFlags.ParStats.Full &&
1240           !RtsFlags.ParFlags.ParStats.Suppressed) 
1241         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1242 #endif
1243
1244       //
1245       // Check whether the thread that just completed was a main
1246       // thread, and if so return with the result.  
1247       //
1248       // There is an assumption here that all thread completion goes
1249       // through this point; we need to make sure that if a thread
1250       // ends up in the ThreadKilled state, that it stays on the run
1251       // queue so it can be dealt with here.
1252       //
1253       if (
1254 #if defined(RTS_SUPPORTS_THREADS)
1255           mainThread != NULL
1256 #else
1257           mainThread->tso == t
1258 #endif
1259           )
1260       {
1261           // We are a bound thread: this must be our thread that just
1262           // completed.
1263           ASSERT(mainThread->tso == t);
1264
1265           if (t->what_next == ThreadComplete) {
1266               if (mainThread->ret) {
1267                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1268                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1269               }
1270               mainThread->stat = Success;
1271           } else {
1272               if (mainThread->ret) {
1273                   *(mainThread->ret) = NULL;
1274               }
1275               if (was_interrupted) {
1276                   mainThread->stat = Interrupted;
1277               } else {
1278                   mainThread->stat = Killed;
1279               }
1280           }
1281 #ifdef DEBUG
1282           removeThreadLabel((StgWord)mainThread->tso->id);
1283 #endif
1284           if (mainThread->prev == NULL) {
1285               main_threads = mainThread->link;
1286           } else {
1287               mainThread->prev->link = mainThread->link;
1288           }
1289           if (mainThread->link != NULL) {
1290               mainThread->link->prev = NULL;
1291           }
1292           releaseCapability(cap);
1293           return;
1294       }
1295
1296 #ifdef RTS_SUPPORTS_THREADS
1297       ASSERT(t->main == NULL);
1298 #else
1299       if (t->main != NULL) {
1300           // Must be a main thread that is not the topmost one.  Leave
1301           // it on the run queue until the stack has unwound to the
1302           // point where we can deal with this.  Leaving it on the run
1303           // queue also ensures that the garbage collector knows about
1304           // this thread and its return value (it gets dropped from the
1305           // all_threads list so there's no other way to find it).
1306           APPEND_TO_RUN_QUEUE(t);
1307       }
1308 #endif
1309       break;
1310
1311     default:
1312       barf("schedule: invalid thread return code %d", (int)ret);
1313     }
1314
1315 #ifdef PROFILING
1316     // When we have +RTS -i0 and we're heap profiling, do a census at
1317     // every GC.  This lets us get repeatable runs for debugging.
1318     if (performHeapProfile ||
1319         (RtsFlags.ProfFlags.profileInterval==0 &&
1320          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1321         GarbageCollect(GetRoots, rtsTrue);
1322         heapCensus();
1323         performHeapProfile = rtsFalse;
1324         ready_to_gc = rtsFalse; // we already GC'd
1325     }
1326 #endif
1327
1328     if (ready_to_gc) {
1329       /* everybody back, start the GC.
1330        * Could do it in this thread, or signal a condition var
1331        * to do it in another thread.  Either way, we need to
1332        * broadcast on gc_pending_cond afterward.
1333        */
1334 #if defined(RTS_SUPPORTS_THREADS)
1335       IF_DEBUG(scheduler,sched_belch("doing GC"));
1336 #endif
1337       GarbageCollect(GetRoots,rtsFalse);
1338       ready_to_gc = rtsFalse;
1339 #if defined(GRAN)
1340       /* add a ContinueThread event to continue execution of current thread */
1341       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1342                 ContinueThread,
1343                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1344       IF_GRAN_DEBUG(bq, 
1345                debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1346                G_EVENTQ(0);
1347                G_CURR_THREADQ(0));
1348 #endif /* GRAN */
1349     }
1350
1351 #if defined(GRAN)
1352   next_thread:
1353     IF_GRAN_DEBUG(unused,
1354                   print_eventq(EventHd));
1355
1356     event = get_next_event();
1357 #elif defined(PAR)
1358   next_thread:
1359     /* ToDo: wait for next message to arrive rather than busy wait */
1360 #endif /* GRAN */
1361
1362   } /* end of while(1) */
1363
1364   IF_PAR_DEBUG(verbose,
1365                debugBelch("== Leaving schedule() after having received Finish\n"));
1366 }
1367
1368 /* ---------------------------------------------------------------------------
1369  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1370  * used by Control.Concurrent for error checking.
1371  * ------------------------------------------------------------------------- */
1372  
1373 StgBool
1374 rtsSupportsBoundThreads(void)
1375 {
1376 #ifdef THREADED_RTS
1377   return rtsTrue;
1378 #else
1379   return rtsFalse;
1380 #endif
1381 }
1382
1383 /* ---------------------------------------------------------------------------
1384  * isThreadBound(tso): check whether tso is bound to an OS thread.
1385  * ------------------------------------------------------------------------- */
1386  
1387 StgBool
1388 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1389 {
1390 #ifdef THREADED_RTS
1391   return (tso->main != NULL);
1392 #endif
1393   return rtsFalse;
1394 }
1395
1396 /* ---------------------------------------------------------------------------
1397  * Singleton fork(). Do not copy any running threads.
1398  * ------------------------------------------------------------------------- */
1399
1400 #ifndef mingw32_TARGET_OS
1401 #define FORKPROCESS_PRIMOP_SUPPORTED
1402 #endif
1403
1404 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1405 static void 
1406 deleteThreadImmediately(StgTSO *tso);
1407 #endif
1408 StgInt
1409 forkProcess(HsStablePtr *entry
1410 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1411             STG_UNUSED
1412 #endif
1413            )
1414 {
1415 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1416   pid_t pid;
1417   StgTSO* t,*next;
1418   StgMainThread *m;
1419   SchedulerStatus rc;
1420
1421   IF_DEBUG(scheduler,sched_belch("forking!"));
1422   rts_lock(); // This not only acquires sched_mutex, it also
1423               // makes sure that no other threads are running
1424
1425   pid = fork();
1426
1427   if (pid) { /* parent */
1428
1429   /* just return the pid */
1430     rts_unlock();
1431     return pid;
1432     
1433   } else { /* child */
1434     
1435     
1436       // delete all threads
1437     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1438     
1439     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1440       next = t->link;
1441
1442         // don't allow threads to catch the ThreadKilled exception
1443       deleteThreadImmediately(t);
1444     }
1445     
1446       // wipe the main thread list
1447     while((m = main_threads) != NULL) {
1448       main_threads = m->link;
1449 # ifdef THREADED_RTS
1450       closeCondition(&m->bound_thread_cond);
1451 # endif
1452       stgFree(m);
1453     }
1454     
1455     rc = rts_evalStableIO(entry, NULL);  // run the action
1456     rts_checkSchedStatus("forkProcess",rc);
1457     
1458     rts_unlock();
1459     
1460     hs_exit();                      // clean up and exit
1461     stg_exit(0);
1462   }
1463 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1464   barf("forkProcess#: primop not supported, sorry!\n");
1465   return -1;
1466 #endif
1467 }
1468
1469 /* ---------------------------------------------------------------------------
1470  * deleteAllThreads():  kill all the live threads.
1471  *
1472  * This is used when we catch a user interrupt (^C), before performing
1473  * any necessary cleanups and running finalizers.
1474  *
1475  * Locks: sched_mutex held.
1476  * ------------------------------------------------------------------------- */
1477    
1478 void
1479 deleteAllThreads ( void )
1480 {
1481   StgTSO* t, *next;
1482   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1483   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1484       next = t->global_link;
1485       deleteThread(t);
1486   }      
1487
1488   // The run queue now contains a bunch of ThreadKilled threads.  We
1489   // must not throw these away: the main thread(s) will be in there
1490   // somewhere, and the main scheduler loop has to deal with it.
1491   // Also, the run queue is the only thing keeping these threads from
1492   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1493
1494   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1495   ASSERT(sleeping_queue == END_TSO_QUEUE);
1496 }
1497
1498 /* startThread and  insertThread are now in GranSim.c -- HWL */
1499
1500
1501 /* ---------------------------------------------------------------------------
1502  * Suspending & resuming Haskell threads.
1503  * 
1504  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1505  * its capability before calling the C function.  This allows another
1506  * task to pick up the capability and carry on running Haskell
1507  * threads.  It also means that if the C call blocks, it won't lock
1508  * the whole system.
1509  *
1510  * The Haskell thread making the C call is put to sleep for the
1511  * duration of the call, on the susepended_ccalling_threads queue.  We
1512  * give out a token to the task, which it can use to resume the thread
1513  * on return from the C function.
1514  * ------------------------------------------------------------------------- */
1515    
1516 StgInt
1517 suspendThread( StgRegTable *reg )
1518 {
1519   nat tok;
1520   Capability *cap;
1521   int saved_errno = errno;
1522
1523   /* assume that *reg is a pointer to the StgRegTable part
1524    * of a Capability.
1525    */
1526   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1527
1528   ACQUIRE_LOCK(&sched_mutex);
1529
1530   IF_DEBUG(scheduler,
1531            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1532
1533   // XXX this might not be necessary --SDM
1534   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1535
1536   threadPaused(cap->r.rCurrentTSO);
1537   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1538   suspended_ccalling_threads = cap->r.rCurrentTSO;
1539
1540   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1541       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1542       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1543   } else {
1544       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1545   }
1546
1547   /* Use the thread ID as the token; it should be unique */
1548   tok = cap->r.rCurrentTSO->id;
1549
1550   /* Hand back capability */
1551   releaseCapability(cap);
1552   
1553 #if defined(RTS_SUPPORTS_THREADS)
1554   /* Preparing to leave the RTS, so ensure there's a native thread/task
1555      waiting to take over.
1556   */
1557   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1558 #endif
1559
1560   RELEASE_LOCK(&sched_mutex);
1561   
1562   errno = saved_errno;
1563   return tok; 
1564 }
1565
1566 StgRegTable *
1567 resumeThread( StgInt tok )
1568 {
1569   StgTSO *tso, **prev;
1570   Capability *cap;
1571   int saved_errno = errno;
1572
1573 #if defined(RTS_SUPPORTS_THREADS)
1574   /* Wait for permission to re-enter the RTS with the result. */
1575   ACQUIRE_LOCK(&sched_mutex);
1576   waitForReturnCapability(&sched_mutex, &cap);
1577
1578   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1579 #else
1580   grabCapability(&cap);
1581 #endif
1582
1583   /* Remove the thread off of the suspended list */
1584   prev = &suspended_ccalling_threads;
1585   for (tso = suspended_ccalling_threads; 
1586        tso != END_TSO_QUEUE; 
1587        prev = &tso->link, tso = tso->link) {
1588     if (tso->id == (StgThreadID)tok) {
1589       *prev = tso->link;
1590       break;
1591     }
1592   }
1593   if (tso == END_TSO_QUEUE) {
1594     barf("resumeThread: thread not found");
1595   }
1596   tso->link = END_TSO_QUEUE;
1597   
1598   if(tso->why_blocked == BlockedOnCCall) {
1599       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1600       tso->blocked_exceptions = NULL;
1601   }
1602   
1603   /* Reset blocking status */
1604   tso->why_blocked  = NotBlocked;
1605
1606   cap->r.rCurrentTSO = tso;
1607   RELEASE_LOCK(&sched_mutex);
1608   errno = saved_errno;
1609   return &cap->r;
1610 }
1611
1612
1613 /* ---------------------------------------------------------------------------
1614  * Static functions
1615  * ------------------------------------------------------------------------ */
1616 static void unblockThread(StgTSO *tso);
1617
1618 /* ---------------------------------------------------------------------------
1619  * Comparing Thread ids.
1620  *
1621  * This is used from STG land in the implementation of the
1622  * instances of Eq/Ord for ThreadIds.
1623  * ------------------------------------------------------------------------ */
1624
1625 int
1626 cmp_thread(StgPtr tso1, StgPtr tso2) 
1627
1628   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1629   StgThreadID id2 = ((StgTSO *)tso2)->id;
1630  
1631   if (id1 < id2) return (-1);
1632   if (id1 > id2) return 1;
1633   return 0;
1634 }
1635
1636 /* ---------------------------------------------------------------------------
1637  * Fetching the ThreadID from an StgTSO.
1638  *
1639  * This is used in the implementation of Show for ThreadIds.
1640  * ------------------------------------------------------------------------ */
1641 int
1642 rts_getThreadId(StgPtr tso) 
1643 {
1644   return ((StgTSO *)tso)->id;
1645 }
1646
1647 #ifdef DEBUG
1648 void
1649 labelThread(StgPtr tso, char *label)
1650 {
1651   int len;
1652   void *buf;
1653
1654   /* Caveat: Once set, you can only set the thread name to "" */
1655   len = strlen(label)+1;
1656   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1657   strncpy(buf,label,len);
1658   /* Update will free the old memory for us */
1659   updateThreadLabel(((StgTSO *)tso)->id,buf);
1660 }
1661 #endif /* DEBUG */
1662
1663 /* ---------------------------------------------------------------------------
1664    Create a new thread.
1665
1666    The new thread starts with the given stack size.  Before the
1667    scheduler can run, however, this thread needs to have a closure
1668    (and possibly some arguments) pushed on its stack.  See
1669    pushClosure() in Schedule.h.
1670
1671    createGenThread() and createIOThread() (in SchedAPI.h) are
1672    convenient packaged versions of this function.
1673
1674    currently pri (priority) is only used in a GRAN setup -- HWL
1675    ------------------------------------------------------------------------ */
1676 #if defined(GRAN)
1677 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1678 StgTSO *
1679 createThread(nat size, StgInt pri)
1680 #else
1681 StgTSO *
1682 createThread(nat size)
1683 #endif
1684 {
1685
1686     StgTSO *tso;
1687     nat stack_size;
1688
1689     /* First check whether we should create a thread at all */
1690 #if defined(PAR)
1691   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1692   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1693     threadsIgnored++;
1694     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
1695           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1696     return END_TSO_QUEUE;
1697   }
1698   threadsCreated++;
1699 #endif
1700
1701 #if defined(GRAN)
1702   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1703 #endif
1704
1705   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1706
1707   /* catch ridiculously small stack sizes */
1708   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1709     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1710   }
1711
1712   stack_size = size - TSO_STRUCT_SIZEW;
1713
1714   tso = (StgTSO *)allocate(size);
1715   TICK_ALLOC_TSO(stack_size, 0);
1716
1717   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1718 #if defined(GRAN)
1719   SET_GRAN_HDR(tso, ThisPE);
1720 #endif
1721
1722   // Always start with the compiled code evaluator
1723   tso->what_next = ThreadRunGHC;
1724
1725   tso->id = next_thread_id++; 
1726   tso->why_blocked  = NotBlocked;
1727   tso->blocked_exceptions = NULL;
1728
1729   tso->saved_errno = 0;
1730   tso->main = NULL;
1731   
1732   tso->stack_size   = stack_size;
1733   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1734                               - TSO_STRUCT_SIZEW;
1735   tso->sp           = (P_)&(tso->stack) + stack_size;
1736
1737 #ifdef PROFILING
1738   tso->prof.CCCS = CCS_MAIN;
1739 #endif
1740
1741   /* put a stop frame on the stack */
1742   tso->sp -= sizeofW(StgStopFrame);
1743   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1744   tso->link = END_TSO_QUEUE;
1745
1746   // ToDo: check this
1747 #if defined(GRAN)
1748   /* uses more flexible routine in GranSim */
1749   insertThread(tso, CurrentProc);
1750 #else
1751   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1752    * from its creation
1753    */
1754 #endif
1755
1756 #if defined(GRAN) 
1757   if (RtsFlags.GranFlags.GranSimStats.Full) 
1758     DumpGranEvent(GR_START,tso);
1759 #elif defined(PAR)
1760   if (RtsFlags.ParFlags.ParStats.Full) 
1761     DumpGranEvent(GR_STARTQ,tso);
1762   /* HACk to avoid SCHEDULE 
1763      LastTSO = tso; */
1764 #endif
1765
1766   /* Link the new thread on the global thread list.
1767    */
1768   tso->global_link = all_threads;
1769   all_threads = tso;
1770
1771 #if defined(DIST)
1772   tso->dist.priority = MandatoryPriority; //by default that is...
1773 #endif
1774
1775 #if defined(GRAN)
1776   tso->gran.pri = pri;
1777 # if defined(DEBUG)
1778   tso->gran.magic = TSO_MAGIC; // debugging only
1779 # endif
1780   tso->gran.sparkname   = 0;
1781   tso->gran.startedat   = CURRENT_TIME; 
1782   tso->gran.exported    = 0;
1783   tso->gran.basicblocks = 0;
1784   tso->gran.allocs      = 0;
1785   tso->gran.exectime    = 0;
1786   tso->gran.fetchtime   = 0;
1787   tso->gran.fetchcount  = 0;
1788   tso->gran.blocktime   = 0;
1789   tso->gran.blockcount  = 0;
1790   tso->gran.blockedat   = 0;
1791   tso->gran.globalsparks = 0;
1792   tso->gran.localsparks  = 0;
1793   if (RtsFlags.GranFlags.Light)
1794     tso->gran.clock  = Now; /* local clock */
1795   else
1796     tso->gran.clock  = 0;
1797
1798   IF_DEBUG(gran,printTSO(tso));
1799 #elif defined(PAR)
1800 # if defined(DEBUG)
1801   tso->par.magic = TSO_MAGIC; // debugging only
1802 # endif
1803   tso->par.sparkname   = 0;
1804   tso->par.startedat   = CURRENT_TIME; 
1805   tso->par.exported    = 0;
1806   tso->par.basicblocks = 0;
1807   tso->par.allocs      = 0;
1808   tso->par.exectime    = 0;
1809   tso->par.fetchtime   = 0;
1810   tso->par.fetchcount  = 0;
1811   tso->par.blocktime   = 0;
1812   tso->par.blockcount  = 0;
1813   tso->par.blockedat   = 0;
1814   tso->par.globalsparks = 0;
1815   tso->par.localsparks  = 0;
1816 #endif
1817
1818 #if defined(GRAN)
1819   globalGranStats.tot_threads_created++;
1820   globalGranStats.threads_created_on_PE[CurrentProc]++;
1821   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1822   globalGranStats.tot_sq_probes++;
1823 #elif defined(PAR)
1824   // collect parallel global statistics (currently done together with GC stats)
1825   if (RtsFlags.ParFlags.ParStats.Global &&
1826       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1827     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1828     globalParStats.tot_threads_created++;
1829   }
1830 #endif 
1831
1832 #if defined(GRAN)
1833   IF_GRAN_DEBUG(pri,
1834                 sched_belch("==__ schedule: Created TSO %d (%p);",
1835                       CurrentProc, tso, tso->id));
1836 #elif defined(PAR)
1837     IF_PAR_DEBUG(verbose,
1838                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
1839                        (long)tso->id, tso, advisory_thread_count));
1840 #else
1841   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1842                                 (long)tso->id, (long)tso->stack_size));
1843 #endif    
1844   return tso;
1845 }
1846
1847 #if defined(PAR)
1848 /* RFP:
1849    all parallel thread creation calls should fall through the following routine.
1850 */
1851 StgTSO *
1852 createSparkThread(rtsSpark spark) 
1853 { StgTSO *tso;
1854   ASSERT(spark != (rtsSpark)NULL);
1855   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1856   { threadsIgnored++;
1857     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1858           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1859     return END_TSO_QUEUE;
1860   }
1861   else
1862   { threadsCreated++;
1863     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1864     if (tso==END_TSO_QUEUE)     
1865       barf("createSparkThread: Cannot create TSO");
1866 #if defined(DIST)
1867     tso->priority = AdvisoryPriority;
1868 #endif
1869     pushClosure(tso,spark);
1870     PUSH_ON_RUN_QUEUE(tso);
1871     advisory_thread_count++;    
1872   }
1873   return tso;
1874 }
1875 #endif
1876
1877 /*
1878   Turn a spark into a thread.
1879   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1880 */
1881 #if defined(PAR)
1882 StgTSO *
1883 activateSpark (rtsSpark spark) 
1884 {
1885   StgTSO *tso;
1886
1887   tso = createSparkThread(spark);
1888   if (RtsFlags.ParFlags.ParStats.Full) {   
1889     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1890     IF_PAR_DEBUG(verbose,
1891                  debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
1892                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1893   }
1894   // ToDo: fwd info on local/global spark to thread -- HWL
1895   // tso->gran.exported =  spark->exported;
1896   // tso->gran.locked =   !spark->global;
1897   // tso->gran.sparkname = spark->name;
1898
1899   return tso;
1900 }
1901 #endif
1902
1903 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1904                                    Capability *initialCapability
1905                                    );
1906
1907
1908 /* ---------------------------------------------------------------------------
1909  * scheduleThread()
1910  *
1911  * scheduleThread puts a thread on the head of the runnable queue.
1912  * This will usually be done immediately after a thread is created.
1913  * The caller of scheduleThread must create the thread using e.g.
1914  * createThread and push an appropriate closure
1915  * on this thread's stack before the scheduler is invoked.
1916  * ------------------------------------------------------------------------ */
1917
1918 static void scheduleThread_ (StgTSO* tso);
1919
1920 void
1921 scheduleThread_(StgTSO *tso)
1922 {
1923   // The thread goes at the *end* of the run-queue, to avoid possible
1924   // starvation of any threads already on the queue.
1925   APPEND_TO_RUN_QUEUE(tso);
1926   threadRunnable();
1927 }
1928
1929 void
1930 scheduleThread(StgTSO* tso)
1931 {
1932   ACQUIRE_LOCK(&sched_mutex);
1933   scheduleThread_(tso);
1934   RELEASE_LOCK(&sched_mutex);
1935 }
1936
1937 #if defined(RTS_SUPPORTS_THREADS)
1938 static Condition bound_cond_cache;
1939 static int bound_cond_cache_full = 0;
1940 #endif
1941
1942
1943 SchedulerStatus
1944 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1945                    Capability *initialCapability)
1946 {
1947     // Precondition: sched_mutex must be held
1948     StgMainThread *m;
1949
1950     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1951     m->tso = tso;
1952     tso->main = m;
1953     m->ret = ret;
1954     m->stat = NoStatus;
1955     m->link = main_threads;
1956     m->prev = NULL;
1957     if (main_threads != NULL) {
1958         main_threads->prev = m;
1959     }
1960     main_threads = m;
1961
1962 #if defined(RTS_SUPPORTS_THREADS)
1963     // Allocating a new condition for each thread is expensive, so we
1964     // cache one.  This is a pretty feeble hack, but it helps speed up
1965     // consecutive call-ins quite a bit.
1966     if (bound_cond_cache_full) {
1967         m->bound_thread_cond = bound_cond_cache;
1968         bound_cond_cache_full = 0;
1969     } else {
1970         initCondition(&m->bound_thread_cond);
1971     }
1972 #endif
1973
1974     /* Put the thread on the main-threads list prior to scheduling the TSO.
1975        Failure to do so introduces a race condition in the MT case (as
1976        identified by Wolfgang Thaller), whereby the new task/OS thread 
1977        created by scheduleThread_() would complete prior to the thread
1978        that spawned it managed to put 'itself' on the main-threads list.
1979        The upshot of it all being that the worker thread wouldn't get to
1980        signal the completion of the its work item for the main thread to
1981        see (==> it got stuck waiting.)    -- sof 6/02.
1982     */
1983     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
1984     
1985     APPEND_TO_RUN_QUEUE(tso);
1986     // NB. Don't call threadRunnable() here, because the thread is
1987     // bound and only runnable by *this* OS thread, so waking up other
1988     // workers will just slow things down.
1989
1990     return waitThread_(m, initialCapability);
1991 }
1992
1993 /* ---------------------------------------------------------------------------
1994  * initScheduler()
1995  *
1996  * Initialise the scheduler.  This resets all the queues - if the
1997  * queues contained any threads, they'll be garbage collected at the
1998  * next pass.
1999  *
2000  * ------------------------------------------------------------------------ */
2001
2002 void 
2003 initScheduler(void)
2004 {
2005 #if defined(GRAN)
2006   nat i;
2007
2008   for (i=0; i<=MAX_PROC; i++) {
2009     run_queue_hds[i]      = END_TSO_QUEUE;
2010     run_queue_tls[i]      = END_TSO_QUEUE;
2011     blocked_queue_hds[i]  = END_TSO_QUEUE;
2012     blocked_queue_tls[i]  = END_TSO_QUEUE;
2013     ccalling_threadss[i]  = END_TSO_QUEUE;
2014     sleeping_queue        = END_TSO_QUEUE;
2015   }
2016 #else
2017   run_queue_hd      = END_TSO_QUEUE;
2018   run_queue_tl      = END_TSO_QUEUE;
2019   blocked_queue_hd  = END_TSO_QUEUE;
2020   blocked_queue_tl  = END_TSO_QUEUE;
2021   sleeping_queue    = END_TSO_QUEUE;
2022 #endif 
2023
2024   suspended_ccalling_threads  = END_TSO_QUEUE;
2025
2026   main_threads = NULL;
2027   all_threads  = END_TSO_QUEUE;
2028
2029   context_switch = 0;
2030   interrupted    = 0;
2031
2032   RtsFlags.ConcFlags.ctxtSwitchTicks =
2033       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2034       
2035 #if defined(RTS_SUPPORTS_THREADS)
2036   /* Initialise the mutex and condition variables used by
2037    * the scheduler. */
2038   initMutex(&sched_mutex);
2039   initMutex(&term_mutex);
2040 #endif
2041   
2042   ACQUIRE_LOCK(&sched_mutex);
2043
2044   /* A capability holds the state a native thread needs in
2045    * order to execute STG code. At least one capability is
2046    * floating around (only SMP builds have more than one).
2047    */
2048   initCapabilities();
2049   
2050 #if defined(RTS_SUPPORTS_THREADS)
2051     /* start our haskell execution tasks */
2052     startTaskManager(0,taskStart);
2053 #endif
2054
2055 #if /* defined(SMP) ||*/ defined(PAR)
2056   initSparkPools();
2057 #endif
2058
2059   RELEASE_LOCK(&sched_mutex);
2060 }
2061
2062 void
2063 exitScheduler( void )
2064 {
2065 #if defined(RTS_SUPPORTS_THREADS)
2066   stopTaskManager();
2067 #endif
2068   shutting_down_scheduler = rtsTrue;
2069 }
2070
2071 /* ----------------------------------------------------------------------------
2072    Managing the per-task allocation areas.
2073    
2074    Each capability comes with an allocation area.  These are
2075    fixed-length block lists into which allocation can be done.
2076
2077    ToDo: no support for two-space collection at the moment???
2078    ------------------------------------------------------------------------- */
2079
2080 static
2081 SchedulerStatus
2082 waitThread_(StgMainThread* m, Capability *initialCapability)
2083 {
2084   SchedulerStatus stat;
2085
2086   // Precondition: sched_mutex must be held.
2087   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2088
2089 #if defined(GRAN)
2090   /* GranSim specific init */
2091   CurrentTSO = m->tso;                // the TSO to run
2092   procStatus[MainProc] = Busy;        // status of main PE
2093   CurrentProc = MainProc;             // PE to run it on
2094   schedule(m,initialCapability);
2095 #else
2096   schedule(m,initialCapability);
2097   ASSERT(m->stat != NoStatus);
2098 #endif
2099
2100   stat = m->stat;
2101
2102 #if defined(RTS_SUPPORTS_THREADS)
2103   // Free the condition variable, returning it to the cache if possible.
2104   if (!bound_cond_cache_full) {
2105       bound_cond_cache = m->bound_thread_cond;
2106       bound_cond_cache_full = 1;
2107   } else {
2108       closeCondition(&m->bound_thread_cond);
2109   }
2110 #endif
2111
2112   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2113   stgFree(m);
2114
2115   // Postcondition: sched_mutex still held
2116   return stat;
2117 }
2118
2119 /* ---------------------------------------------------------------------------
2120    Where are the roots that we know about?
2121
2122         - all the threads on the runnable queue
2123         - all the threads on the blocked queue
2124         - all the threads on the sleeping queue
2125         - all the thread currently executing a _ccall_GC
2126         - all the "main threads"
2127      
2128    ------------------------------------------------------------------------ */
2129
2130 /* This has to be protected either by the scheduler monitor, or by the
2131         garbage collection monitor (probably the latter).
2132         KH @ 25/10/99
2133 */
2134
2135 void
2136 GetRoots( evac_fn evac )
2137 {
2138 #if defined(GRAN)
2139   {
2140     nat i;
2141     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2142       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2143           evac((StgClosure **)&run_queue_hds[i]);
2144       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2145           evac((StgClosure **)&run_queue_tls[i]);
2146       
2147       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2148           evac((StgClosure **)&blocked_queue_hds[i]);
2149       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2150           evac((StgClosure **)&blocked_queue_tls[i]);
2151       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2152           evac((StgClosure **)&ccalling_threads[i]);
2153     }
2154   }
2155
2156   markEventQueue();
2157
2158 #else /* !GRAN */
2159   if (run_queue_hd != END_TSO_QUEUE) {
2160       ASSERT(run_queue_tl != END_TSO_QUEUE);
2161       evac((StgClosure **)&run_queue_hd);
2162       evac((StgClosure **)&run_queue_tl);
2163   }
2164   
2165   if (blocked_queue_hd != END_TSO_QUEUE) {
2166       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2167       evac((StgClosure **)&blocked_queue_hd);
2168       evac((StgClosure **)&blocked_queue_tl);
2169   }
2170   
2171   if (sleeping_queue != END_TSO_QUEUE) {
2172       evac((StgClosure **)&sleeping_queue);
2173   }
2174 #endif 
2175
2176   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2177       evac((StgClosure **)&suspended_ccalling_threads);
2178   }
2179
2180 #if defined(PAR) || defined(GRAN)
2181   markSparkQueue(evac);
2182 #endif
2183
2184 #if defined(RTS_USER_SIGNALS)
2185   // mark the signal handlers (signals should be already blocked)
2186   markSignalHandlers(evac);
2187 #endif
2188 }
2189
2190 /* -----------------------------------------------------------------------------
2191    performGC
2192
2193    This is the interface to the garbage collector from Haskell land.
2194    We provide this so that external C code can allocate and garbage
2195    collect when called from Haskell via _ccall_GC.
2196
2197    It might be useful to provide an interface whereby the programmer
2198    can specify more roots (ToDo).
2199    
2200    This needs to be protected by the GC condition variable above.  KH.
2201    -------------------------------------------------------------------------- */
2202
2203 static void (*extra_roots)(evac_fn);
2204
2205 void
2206 performGC(void)
2207 {
2208   /* Obligated to hold this lock upon entry */
2209   ACQUIRE_LOCK(&sched_mutex);
2210   GarbageCollect(GetRoots,rtsFalse);
2211   RELEASE_LOCK(&sched_mutex);
2212 }
2213
2214 void
2215 performMajorGC(void)
2216 {
2217   ACQUIRE_LOCK(&sched_mutex);
2218   GarbageCollect(GetRoots,rtsTrue);
2219   RELEASE_LOCK(&sched_mutex);
2220 }
2221
2222 static void
2223 AllRoots(evac_fn evac)
2224 {
2225     GetRoots(evac);             // the scheduler's roots
2226     extra_roots(evac);          // the user's roots
2227 }
2228
2229 void
2230 performGCWithRoots(void (*get_roots)(evac_fn))
2231 {
2232   ACQUIRE_LOCK(&sched_mutex);
2233   extra_roots = get_roots;
2234   GarbageCollect(AllRoots,rtsFalse);
2235   RELEASE_LOCK(&sched_mutex);
2236 }
2237
2238 /* -----------------------------------------------------------------------------
2239    Stack overflow
2240
2241    If the thread has reached its maximum stack size, then raise the
2242    StackOverflow exception in the offending thread.  Otherwise
2243    relocate the TSO into a larger chunk of memory and adjust its stack
2244    size appropriately.
2245    -------------------------------------------------------------------------- */
2246
2247 static StgTSO *
2248 threadStackOverflow(StgTSO *tso)
2249 {
2250   nat new_stack_size, new_tso_size, stack_words;
2251   StgPtr new_sp;
2252   StgTSO *dest;
2253
2254   IF_DEBUG(sanity,checkTSO(tso));
2255   if (tso->stack_size >= tso->max_stack_size) {
2256
2257     IF_DEBUG(gc,
2258              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2259                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2260              /* If we're debugging, just print out the top of the stack */
2261              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2262                                               tso->sp+64)));
2263
2264     /* Send this thread the StackOverflow exception */
2265     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2266     return tso;
2267   }
2268
2269   /* Try to double the current stack size.  If that takes us over the
2270    * maximum stack size for this thread, then use the maximum instead.
2271    * Finally round up so the TSO ends up as a whole number of blocks.
2272    */
2273   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2274   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2275                                        TSO_STRUCT_SIZE)/sizeof(W_);
2276   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2277   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2278
2279   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2280
2281   dest = (StgTSO *)allocate(new_tso_size);
2282   TICK_ALLOC_TSO(new_stack_size,0);
2283
2284   /* copy the TSO block and the old stack into the new area */
2285   memcpy(dest,tso,TSO_STRUCT_SIZE);
2286   stack_words = tso->stack + tso->stack_size - tso->sp;
2287   new_sp = (P_)dest + new_tso_size - stack_words;
2288   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2289
2290   /* relocate the stack pointers... */
2291   dest->sp         = new_sp;
2292   dest->stack_size = new_stack_size;
2293         
2294   /* Mark the old TSO as relocated.  We have to check for relocated
2295    * TSOs in the garbage collector and any primops that deal with TSOs.
2296    *
2297    * It's important to set the sp value to just beyond the end
2298    * of the stack, so we don't attempt to scavenge any part of the
2299    * dead TSO's stack.
2300    */
2301   tso->what_next = ThreadRelocated;
2302   tso->link = dest;
2303   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2304   tso->why_blocked = NotBlocked;
2305   dest->mut_link = NULL;
2306
2307   IF_PAR_DEBUG(verbose,
2308                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2309                      tso->id, tso, tso->stack_size);
2310                /* If we're debugging, just print out the top of the stack */
2311                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2312                                                 tso->sp+64)));
2313   
2314   IF_DEBUG(sanity,checkTSO(tso));
2315 #if 0
2316   IF_DEBUG(scheduler,printTSO(dest));
2317 #endif
2318
2319   return dest;
2320 }
2321
2322 /* ---------------------------------------------------------------------------
2323    Wake up a queue that was blocked on some resource.
2324    ------------------------------------------------------------------------ */
2325
2326 #if defined(GRAN)
2327 STATIC_INLINE void
2328 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2329 {
2330 }
2331 #elif defined(PAR)
2332 STATIC_INLINE void
2333 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2334 {
2335   /* write RESUME events to log file and
2336      update blocked and fetch time (depending on type of the orig closure) */
2337   if (RtsFlags.ParFlags.ParStats.Full) {
2338     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2339                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2340                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2341     if (EMPTY_RUN_QUEUE())
2342       emitSchedule = rtsTrue;
2343
2344     switch (get_itbl(node)->type) {
2345         case FETCH_ME_BQ:
2346           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2347           break;
2348         case RBH:
2349         case FETCH_ME:
2350         case BLACKHOLE_BQ:
2351           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2352           break;
2353 #ifdef DIST
2354         case MVAR:
2355           break;
2356 #endif    
2357         default:
2358           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2359         }
2360       }
2361 }
2362 #endif
2363
2364 #if defined(GRAN)
2365 static StgBlockingQueueElement *
2366 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2367 {
2368     StgTSO *tso;
2369     PEs node_loc, tso_loc;
2370
2371     node_loc = where_is(node); // should be lifted out of loop
2372     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2373     tso_loc = where_is((StgClosure *)tso);
2374     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2375       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2376       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2377       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2378       // insertThread(tso, node_loc);
2379       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2380                 ResumeThread,
2381                 tso, node, (rtsSpark*)NULL);
2382       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2383       // len_local++;
2384       // len++;
2385     } else { // TSO is remote (actually should be FMBQ)
2386       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2387                                   RtsFlags.GranFlags.Costs.gunblocktime +
2388                                   RtsFlags.GranFlags.Costs.latency;
2389       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2390                 UnblockThread,
2391                 tso, node, (rtsSpark*)NULL);
2392       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2393       // len++;
2394     }
2395     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2396     IF_GRAN_DEBUG(bq,
2397                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2398                           (node_loc==tso_loc ? "Local" : "Global"), 
2399                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2400     tso->block_info.closure = NULL;
2401     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2402                              tso->id, tso));
2403 }
2404 #elif defined(PAR)
2405 static StgBlockingQueueElement *
2406 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2407 {
2408     StgBlockingQueueElement *next;
2409
2410     switch (get_itbl(bqe)->type) {
2411     case TSO:
2412       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2413       /* if it's a TSO just push it onto the run_queue */
2414       next = bqe->link;
2415       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2416       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2417       threadRunnable();
2418       unblockCount(bqe, node);
2419       /* reset blocking status after dumping event */
2420       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2421       break;
2422
2423     case BLOCKED_FETCH:
2424       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2425       next = bqe->link;
2426       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2427       PendingFetches = (StgBlockedFetch *)bqe;
2428       break;
2429
2430 # if defined(DEBUG)
2431       /* can ignore this case in a non-debugging setup; 
2432          see comments on RBHSave closures above */
2433     case CONSTR:
2434       /* check that the closure is an RBHSave closure */
2435       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2436              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2437              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2438       break;
2439
2440     default:
2441       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2442            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2443            (StgClosure *)bqe);
2444 # endif
2445     }
2446   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2447   return next;
2448 }
2449
2450 #else /* !GRAN && !PAR */
2451 static StgTSO *
2452 unblockOneLocked(StgTSO *tso)
2453 {
2454   StgTSO *next;
2455
2456   ASSERT(get_itbl(tso)->type == TSO);
2457   ASSERT(tso->why_blocked != NotBlocked);
2458   tso->why_blocked = NotBlocked;
2459   next = tso->link;
2460   tso->link = END_TSO_QUEUE;
2461   APPEND_TO_RUN_QUEUE(tso);
2462   threadRunnable();
2463   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2464   return next;
2465 }
2466 #endif
2467
2468 #if defined(GRAN) || defined(PAR)
2469 INLINE_ME StgBlockingQueueElement *
2470 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2471 {
2472   ACQUIRE_LOCK(&sched_mutex);
2473   bqe = unblockOneLocked(bqe, node);
2474   RELEASE_LOCK(&sched_mutex);
2475   return bqe;
2476 }
2477 #else
2478 INLINE_ME StgTSO *
2479 unblockOne(StgTSO *tso)
2480 {
2481   ACQUIRE_LOCK(&sched_mutex);
2482   tso = unblockOneLocked(tso);
2483   RELEASE_LOCK(&sched_mutex);
2484   return tso;
2485 }
2486 #endif
2487
2488 #if defined(GRAN)
2489 void 
2490 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2491 {
2492   StgBlockingQueueElement *bqe;
2493   PEs node_loc;
2494   nat len = 0; 
2495
2496   IF_GRAN_DEBUG(bq, 
2497                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2498                       node, CurrentProc, CurrentTime[CurrentProc], 
2499                       CurrentTSO->id, CurrentTSO));
2500
2501   node_loc = where_is(node);
2502
2503   ASSERT(q == END_BQ_QUEUE ||
2504          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2505          get_itbl(q)->type == CONSTR); // closure (type constructor)
2506   ASSERT(is_unique(node));
2507
2508   /* FAKE FETCH: magically copy the node to the tso's proc;
2509      no Fetch necessary because in reality the node should not have been 
2510      moved to the other PE in the first place
2511   */
2512   if (CurrentProc!=node_loc) {
2513     IF_GRAN_DEBUG(bq, 
2514                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2515                         node, node_loc, CurrentProc, CurrentTSO->id, 
2516                         // CurrentTSO, where_is(CurrentTSO),
2517                         node->header.gran.procs));
2518     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2519     IF_GRAN_DEBUG(bq, 
2520                   debugBelch("## new bitmask of node %p is %#x\n",
2521                         node, node->header.gran.procs));
2522     if (RtsFlags.GranFlags.GranSimStats.Global) {
2523       globalGranStats.tot_fake_fetches++;
2524     }
2525   }
2526
2527   bqe = q;
2528   // ToDo: check: ASSERT(CurrentProc==node_loc);
2529   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2530     //next = bqe->link;
2531     /* 
2532        bqe points to the current element in the queue
2533        next points to the next element in the queue
2534     */
2535     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2536     //tso_loc = where_is(tso);
2537     len++;
2538     bqe = unblockOneLocked(bqe, node);
2539   }
2540
2541   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2542      the closure to make room for the anchor of the BQ */
2543   if (bqe!=END_BQ_QUEUE) {
2544     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2545     /*
2546     ASSERT((info_ptr==&RBH_Save_0_info) ||
2547            (info_ptr==&RBH_Save_1_info) ||
2548            (info_ptr==&RBH_Save_2_info));
2549     */
2550     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2551     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2552     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2553
2554     IF_GRAN_DEBUG(bq,
2555                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2556                         node, info_type(node)));
2557   }
2558
2559   /* statistics gathering */
2560   if (RtsFlags.GranFlags.GranSimStats.Global) {
2561     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2562     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2563     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2564     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2565   }
2566   IF_GRAN_DEBUG(bq,
2567                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2568                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2569 }
2570 #elif defined(PAR)
2571 void 
2572 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2573 {
2574   StgBlockingQueueElement *bqe;
2575
2576   ACQUIRE_LOCK(&sched_mutex);
2577
2578   IF_PAR_DEBUG(verbose, 
2579                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
2580                      node, mytid));
2581 #ifdef DIST  
2582   //RFP
2583   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2584     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
2585     return;
2586   }
2587 #endif
2588   
2589   ASSERT(q == END_BQ_QUEUE ||
2590          get_itbl(q)->type == TSO ||           
2591          get_itbl(q)->type == BLOCKED_FETCH || 
2592          get_itbl(q)->type == CONSTR); 
2593
2594   bqe = q;
2595   while (get_itbl(bqe)->type==TSO || 
2596          get_itbl(bqe)->type==BLOCKED_FETCH) {
2597     bqe = unblockOneLocked(bqe, node);
2598   }
2599   RELEASE_LOCK(&sched_mutex);
2600 }
2601
2602 #else   /* !GRAN && !PAR */
2603
2604 void
2605 awakenBlockedQueueNoLock(StgTSO *tso)
2606 {
2607   while (tso != END_TSO_QUEUE) {
2608     tso = unblockOneLocked(tso);
2609   }
2610 }
2611
2612 void
2613 awakenBlockedQueue(StgTSO *tso)
2614 {
2615   ACQUIRE_LOCK(&sched_mutex);
2616   while (tso != END_TSO_QUEUE) {
2617     tso = unblockOneLocked(tso);
2618   }
2619   RELEASE_LOCK(&sched_mutex);
2620 }
2621 #endif
2622
2623 /* ---------------------------------------------------------------------------
2624    Interrupt execution
2625    - usually called inside a signal handler so it mustn't do anything fancy.   
2626    ------------------------------------------------------------------------ */
2627
2628 void
2629 interruptStgRts(void)
2630 {
2631     interrupted    = 1;
2632     context_switch = 1;
2633 }
2634
2635 /* -----------------------------------------------------------------------------
2636    Unblock a thread
2637
2638    This is for use when we raise an exception in another thread, which
2639    may be blocked.
2640    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2641    -------------------------------------------------------------------------- */
2642
2643 #if defined(GRAN) || defined(PAR)
2644 /*
2645   NB: only the type of the blocking queue is different in GranSim and GUM
2646       the operations on the queue-elements are the same
2647       long live polymorphism!
2648
2649   Locks: sched_mutex is held upon entry and exit.
2650
2651 */
2652 static void
2653 unblockThread(StgTSO *tso)
2654 {
2655   StgBlockingQueueElement *t, **last;
2656
2657   switch (tso->why_blocked) {
2658
2659   case NotBlocked:
2660     return;  /* not blocked */
2661
2662   case BlockedOnMVar:
2663     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2664     {
2665       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2666       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2667
2668       last = (StgBlockingQueueElement **)&mvar->head;
2669       for (t = (StgBlockingQueueElement *)mvar->head; 
2670            t != END_BQ_QUEUE; 
2671            last = &t->link, last_tso = t, t = t->link) {
2672         if (t == (StgBlockingQueueElement *)tso) {
2673           *last = (StgBlockingQueueElement *)tso->link;
2674           if (mvar->tail == tso) {
2675             mvar->tail = (StgTSO *)last_tso;
2676           }
2677           goto done;
2678         }
2679       }
2680       barf("unblockThread (MVAR): TSO not found");
2681     }
2682
2683   case BlockedOnBlackHole:
2684     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2685     {
2686       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2687
2688       last = &bq->blocking_queue;
2689       for (t = bq->blocking_queue; 
2690            t != END_BQ_QUEUE; 
2691            last = &t->link, t = t->link) {
2692         if (t == (StgBlockingQueueElement *)tso) {
2693           *last = (StgBlockingQueueElement *)tso->link;
2694           goto done;
2695         }
2696       }
2697       barf("unblockThread (BLACKHOLE): TSO not found");
2698     }
2699
2700   case BlockedOnException:
2701     {
2702       StgTSO *target  = tso->block_info.tso;
2703
2704       ASSERT(get_itbl(target)->type == TSO);
2705
2706       if (target->what_next == ThreadRelocated) {
2707           target = target->link;
2708           ASSERT(get_itbl(target)->type == TSO);
2709       }
2710
2711       ASSERT(target->blocked_exceptions != NULL);
2712
2713       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2714       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2715            t != END_BQ_QUEUE; 
2716            last = &t->link, t = t->link) {
2717         ASSERT(get_itbl(t)->type == TSO);
2718         if (t == (StgBlockingQueueElement *)tso) {
2719           *last = (StgBlockingQueueElement *)tso->link;
2720           goto done;
2721         }
2722       }
2723       barf("unblockThread (Exception): TSO not found");
2724     }
2725
2726   case BlockedOnRead:
2727   case BlockedOnWrite:
2728 #if defined(mingw32_TARGET_OS)
2729   case BlockedOnDoProc:
2730 #endif
2731     {
2732       /* take TSO off blocked_queue */
2733       StgBlockingQueueElement *prev = NULL;
2734       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2735            prev = t, t = t->link) {
2736         if (t == (StgBlockingQueueElement *)tso) {
2737           if (prev == NULL) {
2738             blocked_queue_hd = (StgTSO *)t->link;
2739             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2740               blocked_queue_tl = END_TSO_QUEUE;
2741             }
2742           } else {
2743             prev->link = t->link;
2744             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2745               blocked_queue_tl = (StgTSO *)prev;
2746             }
2747           }
2748           goto done;
2749         }
2750       }
2751       barf("unblockThread (I/O): TSO not found");
2752     }
2753
2754   case BlockedOnDelay:
2755     {
2756       /* take TSO off sleeping_queue */
2757       StgBlockingQueueElement *prev = NULL;
2758       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2759            prev = t, t = t->link) {
2760         if (t == (StgBlockingQueueElement *)tso) {
2761           if (prev == NULL) {
2762             sleeping_queue = (StgTSO *)t->link;
2763           } else {
2764             prev->link = t->link;
2765           }
2766           goto done;
2767         }
2768       }
2769       barf("unblockThread (delay): TSO not found");
2770     }
2771
2772   default:
2773     barf("unblockThread");
2774   }
2775
2776  done:
2777   tso->link = END_TSO_QUEUE;
2778   tso->why_blocked = NotBlocked;
2779   tso->block_info.closure = NULL;
2780   PUSH_ON_RUN_QUEUE(tso);
2781 }
2782 #else
2783 static void
2784 unblockThread(StgTSO *tso)
2785 {
2786   StgTSO *t, **last;
2787   
2788   /* To avoid locking unnecessarily. */
2789   if (tso->why_blocked == NotBlocked) {
2790     return;
2791   }
2792
2793   switch (tso->why_blocked) {
2794
2795   case BlockedOnMVar:
2796     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2797     {
2798       StgTSO *last_tso = END_TSO_QUEUE;
2799       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2800
2801       last = &mvar->head;
2802       for (t = mvar->head; t != END_TSO_QUEUE; 
2803            last = &t->link, last_tso = t, t = t->link) {
2804         if (t == tso) {
2805           *last = tso->link;
2806           if (mvar->tail == tso) {
2807             mvar->tail = last_tso;
2808           }
2809           goto done;
2810         }
2811       }
2812       barf("unblockThread (MVAR): TSO not found");
2813     }
2814
2815   case BlockedOnBlackHole:
2816     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2817     {
2818       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2819
2820       last = &bq->blocking_queue;
2821       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2822            last = &t->link, t = t->link) {
2823         if (t == tso) {
2824           *last = tso->link;
2825           goto done;
2826         }
2827       }
2828       barf("unblockThread (BLACKHOLE): TSO not found");
2829     }
2830
2831   case BlockedOnException:
2832     {
2833       StgTSO *target  = tso->block_info.tso;
2834
2835       ASSERT(get_itbl(target)->type == TSO);
2836
2837       while (target->what_next == ThreadRelocated) {
2838           target = target->link;
2839           ASSERT(get_itbl(target)->type == TSO);
2840       }
2841       
2842       ASSERT(target->blocked_exceptions != NULL);
2843
2844       last = &target->blocked_exceptions;
2845       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2846            last = &t->link, t = t->link) {
2847         ASSERT(get_itbl(t)->type == TSO);
2848         if (t == tso) {
2849           *last = tso->link;
2850           goto done;
2851         }
2852       }
2853       barf("unblockThread (Exception): TSO not found");
2854     }
2855
2856   case BlockedOnRead:
2857   case BlockedOnWrite:
2858 #if defined(mingw32_TARGET_OS)
2859   case BlockedOnDoProc:
2860 #endif
2861     {
2862       StgTSO *prev = NULL;
2863       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2864            prev = t, t = t->link) {
2865         if (t == tso) {
2866           if (prev == NULL) {
2867             blocked_queue_hd = t->link;
2868             if (blocked_queue_tl == t) {
2869               blocked_queue_tl = END_TSO_QUEUE;
2870             }
2871           } else {
2872             prev->link = t->link;
2873             if (blocked_queue_tl == t) {
2874               blocked_queue_tl = prev;
2875             }
2876           }
2877           goto done;
2878         }
2879       }
2880       barf("unblockThread (I/O): TSO not found");
2881     }
2882
2883   case BlockedOnDelay:
2884     {
2885       StgTSO *prev = NULL;
2886       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2887            prev = t, t = t->link) {
2888         if (t == tso) {
2889           if (prev == NULL) {
2890             sleeping_queue = t->link;
2891           } else {
2892             prev->link = t->link;
2893           }
2894           goto done;
2895         }
2896       }
2897       barf("unblockThread (delay): TSO not found");
2898     }
2899
2900   default:
2901     barf("unblockThread");
2902   }
2903
2904  done:
2905   tso->link = END_TSO_QUEUE;
2906   tso->why_blocked = NotBlocked;
2907   tso->block_info.closure = NULL;
2908   APPEND_TO_RUN_QUEUE(tso);
2909 }
2910 #endif
2911
2912 /* -----------------------------------------------------------------------------
2913  * raiseAsync()
2914  *
2915  * The following function implements the magic for raising an
2916  * asynchronous exception in an existing thread.
2917  *
2918  * We first remove the thread from any queue on which it might be
2919  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2920  *
2921  * We strip the stack down to the innermost CATCH_FRAME, building
2922  * thunks in the heap for all the active computations, so they can 
2923  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2924  * an application of the handler to the exception, and push it on
2925  * the top of the stack.
2926  * 
2927  * How exactly do we save all the active computations?  We create an
2928  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2929  * AP_STACKs pushes everything from the corresponding update frame
2930  * upwards onto the stack.  (Actually, it pushes everything up to the
2931  * next update frame plus a pointer to the next AP_STACK object.
2932  * Entering the next AP_STACK object pushes more onto the stack until we
2933  * reach the last AP_STACK object - at which point the stack should look
2934  * exactly as it did when we killed the TSO and we can continue
2935  * execution by entering the closure on top of the stack.
2936  *
2937  * We can also kill a thread entirely - this happens if either (a) the 
2938  * exception passed to raiseAsync is NULL, or (b) there's no
2939  * CATCH_FRAME on the stack.  In either case, we strip the entire
2940  * stack and replace the thread with a zombie.
2941  *
2942  * Locks: sched_mutex held upon entry nor exit.
2943  *
2944  * -------------------------------------------------------------------------- */
2945  
2946 void 
2947 deleteThread(StgTSO *tso)
2948 {
2949   raiseAsync(tso,NULL);
2950 }
2951
2952 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2953 static void 
2954 deleteThreadImmediately(StgTSO *tso)
2955 { // for forkProcess only:
2956   // delete thread without giving it a chance to catch the KillThread exception
2957
2958   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2959       return;
2960   }
2961
2962   if (tso->why_blocked != BlockedOnCCall &&
2963       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2964     unblockThread(tso);
2965   }
2966
2967   tso->what_next = ThreadKilled;
2968 }
2969 #endif
2970
2971 void
2972 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
2973 {
2974   /* When raising async exs from contexts where sched_mutex isn't held;
2975      use raiseAsyncWithLock(). */
2976   ACQUIRE_LOCK(&sched_mutex);
2977   raiseAsync(tso,exception);
2978   RELEASE_LOCK(&sched_mutex);
2979 }
2980
2981 void
2982 raiseAsync(StgTSO *tso, StgClosure *exception)
2983 {
2984     StgRetInfoTable *info;
2985     StgPtr sp;
2986   
2987     // Thread already dead?
2988     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2989         return;
2990     }
2991
2992     IF_DEBUG(scheduler, 
2993              sched_belch("raising exception in thread %ld.", (long)tso->id));
2994     
2995     // Remove it from any blocking queues
2996     unblockThread(tso);
2997
2998     sp = tso->sp;
2999     
3000     // The stack freezing code assumes there's a closure pointer on
3001     // the top of the stack, so we have to arrange that this is the case...
3002     //
3003     if (sp[0] == (W_)&stg_enter_info) {
3004         sp++;
3005     } else {
3006         sp--;
3007         sp[0] = (W_)&stg_dummy_ret_closure;
3008     }
3009
3010     while (1) {
3011         nat i;
3012
3013         // 1. Let the top of the stack be the "current closure"
3014         //
3015         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3016         // CATCH_FRAME.
3017         //
3018         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3019         // current closure applied to the chunk of stack up to (but not
3020         // including) the update frame.  This closure becomes the "current
3021         // closure".  Go back to step 2.
3022         //
3023         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3024         // top of the stack applied to the exception.
3025         // 
3026         // 5. If it's a STOP_FRAME, then kill the thread.
3027         
3028         StgPtr frame;
3029         
3030         frame = sp + 1;
3031         info = get_ret_itbl((StgClosure *)frame);
3032         
3033         while (info->i.type != UPDATE_FRAME
3034                && (info->i.type != CATCH_FRAME || exception == NULL)
3035                && info->i.type != STOP_FRAME) {
3036             frame += stack_frame_sizeW((StgClosure *)frame);
3037             info = get_ret_itbl((StgClosure *)frame);
3038         }
3039         
3040         switch (info->i.type) {
3041             
3042         case CATCH_FRAME:
3043             // If we find a CATCH_FRAME, and we've got an exception to raise,
3044             // then build the THUNK raise(exception), and leave it on
3045             // top of the CATCH_FRAME ready to enter.
3046             //
3047         {
3048 #ifdef PROFILING
3049             StgCatchFrame *cf = (StgCatchFrame *)frame;
3050 #endif
3051             StgClosure *raise;
3052             
3053             // we've got an exception to raise, so let's pass it to the
3054             // handler in this frame.
3055             //
3056             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3057             TICK_ALLOC_SE_THK(1,0);
3058             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3059             raise->payload[0] = exception;
3060             
3061             // throw away the stack from Sp up to the CATCH_FRAME.
3062             //
3063             sp = frame - 1;
3064             
3065             /* Ensure that async excpetions are blocked now, so we don't get
3066              * a surprise exception before we get around to executing the
3067              * handler.
3068              */
3069             if (tso->blocked_exceptions == NULL) {
3070                 tso->blocked_exceptions = END_TSO_QUEUE;
3071             }
3072             
3073             /* Put the newly-built THUNK on top of the stack, ready to execute
3074              * when the thread restarts.
3075              */
3076             sp[0] = (W_)raise;
3077             sp[-1] = (W_)&stg_enter_info;
3078             tso->sp = sp-1;
3079             tso->what_next = ThreadRunGHC;
3080             IF_DEBUG(sanity, checkTSO(tso));
3081             return;
3082         }
3083         
3084         case UPDATE_FRAME:
3085         {
3086             StgAP_STACK * ap;
3087             nat words;
3088             
3089             // First build an AP_STACK consisting of the stack chunk above the
3090             // current update frame, with the top word on the stack as the
3091             // fun field.
3092             //
3093             words = frame - sp - 1;
3094             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3095             
3096             ap->size = words;
3097             ap->fun  = (StgClosure *)sp[0];
3098             sp++;
3099             for(i=0; i < (nat)words; ++i) {
3100                 ap->payload[i] = (StgClosure *)*sp++;
3101             }
3102             
3103             SET_HDR(ap,&stg_AP_STACK_info,
3104                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3105             TICK_ALLOC_UP_THK(words+1,0);
3106             
3107             IF_DEBUG(scheduler,
3108                      debugBelch("sched: Updating ");
3109                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3110                      debugBelch(" with ");
3111                      printObj((StgClosure *)ap);
3112                 );
3113
3114             // Replace the updatee with an indirection - happily
3115             // this will also wake up any threads currently
3116             // waiting on the result.
3117             //
3118             // Warning: if we're in a loop, more than one update frame on
3119             // the stack may point to the same object.  Be careful not to
3120             // overwrite an IND_OLDGEN in this case, because we'll screw
3121             // up the mutable lists.  To be on the safe side, don't
3122             // overwrite any kind of indirection at all.  See also
3123             // threadSqueezeStack in GC.c, where we have to make a similar
3124             // check.
3125             //
3126             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3127                 // revert the black hole
3128                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3129                                (StgClosure *)ap);
3130             }
3131             sp += sizeofW(StgUpdateFrame) - 1;
3132             sp[0] = (W_)ap; // push onto stack
3133             break;
3134         }
3135         
3136         case STOP_FRAME:
3137             // We've stripped the entire stack, the thread is now dead.
3138             sp += sizeofW(StgStopFrame);
3139             tso->what_next = ThreadKilled;
3140             tso->sp = sp;
3141             return;
3142             
3143         default:
3144             barf("raiseAsync");
3145         }
3146     }
3147     barf("raiseAsync");
3148 }
3149
3150 /* -----------------------------------------------------------------------------
3151    raiseExceptionHelper
3152    
3153    This function is called by the raise# primitve, just so that we can
3154    move some of the tricky bits of raising an exception from C-- into
3155    C.  Who knows, it might be a useful re-useable thing here too.
3156    -------------------------------------------------------------------------- */
3157
3158 StgWord
3159 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3160 {
3161     StgClosure *raise_closure = NULL;
3162     StgPtr p, next;
3163     StgRetInfoTable *info;
3164     //
3165     // This closure represents the expression 'raise# E' where E
3166     // is the exception raise.  It is used to overwrite all the
3167     // thunks which are currently under evaluataion.
3168     //
3169
3170     //    
3171     // LDV profiling: stg_raise_info has THUNK as its closure
3172     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3173     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3174     // 1 does not cause any problem unless profiling is performed.
3175     // However, when LDV profiling goes on, we need to linearly scan
3176     // small object pool, where raise_closure is stored, so we should
3177     // use MIN_UPD_SIZE.
3178     //
3179     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3180     //                                 sizeofW(StgClosure)+1);
3181     //
3182
3183     //
3184     // Walk up the stack, looking for the catch frame.  On the way,
3185     // we update any closures pointed to from update frames with the
3186     // raise closure that we just built.
3187     //
3188     p = tso->sp;
3189     while(1) {
3190         info = get_ret_itbl((StgClosure *)p);
3191         next = p + stack_frame_sizeW((StgClosure *)p);
3192         switch (info->i.type) {
3193             
3194         case UPDATE_FRAME:
3195             // Only create raise_closure if we need to.
3196             if (raise_closure == NULL) {
3197                 raise_closure = 
3198                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3199                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3200                 raise_closure->payload[0] = exception;
3201             }
3202             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3203             p = next;
3204             continue;
3205             
3206         case CATCH_FRAME:
3207             tso->sp = p;
3208             return CATCH_FRAME;
3209             
3210         case STOP_FRAME:
3211             tso->sp = p;
3212             return STOP_FRAME;
3213
3214         default:
3215             p = next; 
3216             continue;
3217         }
3218     }
3219 }
3220
3221 /* -----------------------------------------------------------------------------
3222    resurrectThreads is called after garbage collection on the list of
3223    threads found to be garbage.  Each of these threads will be woken
3224    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3225    on an MVar, or NonTermination if the thread was blocked on a Black
3226    Hole.
3227
3228    Locks: sched_mutex isn't held upon entry nor exit.
3229    -------------------------------------------------------------------------- */
3230
3231 void
3232 resurrectThreads( StgTSO *threads )
3233 {
3234   StgTSO *tso, *next;
3235
3236   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3237     next = tso->global_link;
3238     tso->global_link = all_threads;
3239     all_threads = tso;
3240     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3241
3242     switch (tso->why_blocked) {
3243     case BlockedOnMVar:
3244     case BlockedOnException:
3245       /* Called by GC - sched_mutex lock is currently held. */
3246       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3247       break;
3248     case BlockedOnBlackHole:
3249       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3250       break;
3251     case NotBlocked:
3252       /* This might happen if the thread was blocked on a black hole
3253        * belonging to a thread that we've just woken up (raiseAsync
3254        * can wake up threads, remember...).
3255        */
3256       continue;
3257     default:
3258       barf("resurrectThreads: thread blocked in a strange way");
3259     }
3260   }
3261 }
3262
3263 /* ----------------------------------------------------------------------------
3264  * Debugging: why is a thread blocked
3265  * [Also provides useful information when debugging threaded programs
3266  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3267    ------------------------------------------------------------------------- */
3268
3269 static
3270 void
3271 printThreadBlockage(StgTSO *tso)
3272 {
3273   switch (tso->why_blocked) {
3274   case BlockedOnRead:
3275     debugBelch("is blocked on read from fd %d", tso->block_info.fd);
3276     break;
3277   case BlockedOnWrite:
3278     debugBelch("is blocked on write to fd %d", tso->block_info.fd);
3279     break;
3280 #if defined(mingw32_TARGET_OS)
3281     case BlockedOnDoProc:
3282     debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3283     break;
3284 #endif
3285   case BlockedOnDelay:
3286     debugBelch("is blocked until %d", tso->block_info.target);
3287     break;
3288   case BlockedOnMVar:
3289     debugBelch("is blocked on an MVar");
3290     break;
3291   case BlockedOnException:
3292     debugBelch("is blocked on delivering an exception to thread %d",
3293             tso->block_info.tso->id);
3294     break;
3295   case BlockedOnBlackHole:
3296     debugBelch("is blocked on a black hole");
3297     break;
3298   case NotBlocked:
3299     debugBelch("is not blocked");
3300     break;
3301 #if defined(PAR)
3302   case BlockedOnGA:
3303     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3304             tso->block_info.closure, info_type(tso->block_info.closure));
3305     break;
3306   case BlockedOnGA_NoSend:
3307     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3308             tso->block_info.closure, info_type(tso->block_info.closure));
3309     break;
3310 #endif
3311   case BlockedOnCCall:
3312     debugBelch("is blocked on an external call");
3313     break;
3314   case BlockedOnCCall_NoUnblockExc:
3315     debugBelch("is blocked on an external call (exceptions were already blocked)");
3316     break;
3317   default:
3318     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3319          tso->why_blocked, tso->id, tso);
3320   }
3321 }
3322
3323 static
3324 void
3325 printThreadStatus(StgTSO *tso)
3326 {
3327   switch (tso->what_next) {
3328   case ThreadKilled:
3329     debugBelch("has been killed");
3330     break;
3331   case ThreadComplete:
3332     debugBelch("has completed");
3333     break;
3334   default:
3335     printThreadBlockage(tso);
3336   }
3337 }
3338
3339 void
3340 printAllThreads(void)
3341 {
3342   StgTSO *t;
3343
3344 # if defined(GRAN)
3345   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3346   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3347                        time_string, rtsFalse/*no commas!*/);
3348
3349   debugBelch("all threads at [%s]:\n", time_string);
3350 # elif defined(PAR)
3351   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3352   ullong_format_string(CURRENT_TIME,
3353                        time_string, rtsFalse/*no commas!*/);
3354
3355   debugBelch("all threads at [%s]:\n", time_string);
3356 # else
3357   debugBelch("all threads:\n");
3358 # endif
3359
3360   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3361     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
3362 #if defined(DEBUG)
3363     {
3364       void *label = lookupThreadLabel(t->id);
3365       if (label) debugBelch("[\"%s\"] ",(char *)label);
3366     }
3367 #endif
3368     printThreadStatus(t);
3369     debugBelch("\n");
3370   }
3371 }
3372     
3373 #ifdef DEBUG
3374
3375 /* 
3376    Print a whole blocking queue attached to node (debugging only).
3377 */
3378 # if defined(PAR)
3379 void 
3380 print_bq (StgClosure *node)
3381 {
3382   StgBlockingQueueElement *bqe;
3383   StgTSO *tso;
3384   rtsBool end;
3385
3386   debugBelch("## BQ of closure %p (%s): ",
3387           node, info_type(node));
3388
3389   /* should cover all closures that may have a blocking queue */
3390   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3391          get_itbl(node)->type == FETCH_ME_BQ ||
3392          get_itbl(node)->type == RBH ||
3393          get_itbl(node)->type == MVAR);
3394     
3395   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3396
3397   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3398 }
3399
3400 /* 
3401    Print a whole blocking queue starting with the element bqe.
3402 */
3403 void 
3404 print_bqe (StgBlockingQueueElement *bqe)
3405 {
3406   rtsBool end;
3407
3408   /* 
3409      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3410   */
3411   for (end = (bqe==END_BQ_QUEUE);
3412        !end; // iterate until bqe points to a CONSTR
3413        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3414        bqe = end ? END_BQ_QUEUE : bqe->link) {
3415     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3416     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3417     /* types of closures that may appear in a blocking queue */
3418     ASSERT(get_itbl(bqe)->type == TSO ||           
3419            get_itbl(bqe)->type == BLOCKED_FETCH || 
3420            get_itbl(bqe)->type == CONSTR); 
3421     /* only BQs of an RBH end with an RBH_Save closure */
3422     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3423
3424     switch (get_itbl(bqe)->type) {
3425     case TSO:
3426       debugBelch(" TSO %u (%x),",
3427               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3428       break;
3429     case BLOCKED_FETCH:
3430       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
3431               ((StgBlockedFetch *)bqe)->node, 
3432               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3433               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3434               ((StgBlockedFetch *)bqe)->ga.weight);
3435       break;
3436     case CONSTR:
3437       debugBelch(" %s (IP %p),",
3438               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3439                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3440                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3441                "RBH_Save_?"), get_itbl(bqe));
3442       break;
3443     default:
3444       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3445            info_type((StgClosure *)bqe)); // , node, info_type(node));
3446       break;
3447     }
3448   } /* for */
3449   debugBelch("\n");
3450 }
3451 # elif defined(GRAN)
3452 void 
3453 print_bq (StgClosure *node)
3454 {
3455   StgBlockingQueueElement *bqe;
3456   PEs node_loc, tso_loc;
3457   rtsBool end;
3458
3459   /* should cover all closures that may have a blocking queue */
3460   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3461          get_itbl(node)->type == FETCH_ME_BQ ||
3462          get_itbl(node)->type == RBH);
3463     
3464   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3465   node_loc = where_is(node);
3466
3467   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3468           node, info_type(node), node_loc);
3469
3470   /* 
3471      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3472   */
3473   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3474        !end; // iterate until bqe points to a CONSTR
3475        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3476     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3477     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3478     /* types of closures that may appear in a blocking queue */
3479     ASSERT(get_itbl(bqe)->type == TSO ||           
3480            get_itbl(bqe)->type == CONSTR); 
3481     /* only BQs of an RBH end with an RBH_Save closure */
3482     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3483
3484     tso_loc = where_is((StgClosure *)bqe);
3485     switch (get_itbl(bqe)->type) {
3486     case TSO:
3487       debugBelch(" TSO %d (%p) on [PE %d],",
3488               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3489       break;
3490     case CONSTR:
3491       debugBelch(" %s (IP %p),",
3492               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3493                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3494                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3495                "RBH_Save_?"), get_itbl(bqe));
3496       break;
3497     default:
3498       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3499            info_type((StgClosure *)bqe), node, info_type(node));
3500       break;
3501     }
3502   } /* for */
3503   debugBelch("\n");
3504 }
3505 #else
3506 /* 
3507    Nice and easy: only TSOs on the blocking queue
3508 */
3509 void 
3510 print_bq (StgClosure *node)
3511 {
3512   StgTSO *tso;
3513
3514   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3515   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3516        tso != END_TSO_QUEUE; 
3517        tso=tso->link) {
3518     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3519     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3520     debugBelch(" TSO %d (%p),", tso->id, tso);
3521   }
3522   debugBelch("\n");
3523 }
3524 # endif
3525
3526 #if defined(PAR)
3527 static nat
3528 run_queue_len(void)
3529 {
3530   nat i;
3531   StgTSO *tso;
3532
3533   for (i=0, tso=run_queue_hd; 
3534        tso != END_TSO_QUEUE;
3535        i++, tso=tso->link)
3536     /* nothing */
3537
3538   return i;
3539 }
3540 #endif
3541
3542 void
3543 sched_belch(char *s, ...)
3544 {
3545   va_list ap;
3546   va_start(ap,s);
3547 #ifdef RTS_SUPPORTS_THREADS
3548   debugBelch("sched (task %p): ", osThreadId());
3549 #elif defined(PAR)
3550   debugBelch("== ");
3551 #else
3552   debugBelch("sched: ");
3553 #endif
3554   vdebugBelch(s, ap);
3555   debugBelch("\n");
3556   va_end(ap);
3557 }
3558
3559 #endif /* DEBUG */