[project @ 2004-09-03 15:28:18 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "Hooks.h"
48 #define COMPILING_SCHEDULER
49 #include "Schedule.h"
50 #include "StgMiscClosures.h"
51 #include "Storage.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "Timer.h"
59 #include "Prelude.h"
60 #include "ThreadLabels.h"
61 #include "LdvProfile.h"
62 #include "Updates.h"
63 #ifdef PROFILING
64 #include "Proftimer.h"
65 #include "ProfHeap.h"
66 #endif
67 #if defined(GRAN) || defined(PAR)
68 # include "GranSimRts.h"
69 # include "GranSim.h"
70 # include "ParallelRts.h"
71 # include "Parallel.h"
72 # include "ParallelDebug.h"
73 # include "FetchMe.h"
74 # include "HLC.h"
75 #endif
76 #include "Sparks.h"
77 #include "Capability.h"
78 #include "OSThreads.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 #ifdef THREADED_RTS
97 #define USED_IN_THREADED_RTS
98 #else
99 #define USED_IN_THREADED_RTS STG_UNUSED
100 #endif
101
102 #ifdef RTS_SUPPORTS_THREADS
103 #define USED_WHEN_RTS_SUPPORTS_THREADS
104 #else
105 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
106 #endif
107
108 /* Main thread queue.
109  * Locks required: sched_mutex.
110  */
111 StgMainThread *main_threads = NULL;
112
113 /* Thread queues.
114  * Locks required: sched_mutex.
115  */
116 #if defined(GRAN)
117
118 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
119 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
120
121 /* 
122    In GranSim we have a runnable and a blocked queue for each processor.
123    In order to minimise code changes new arrays run_queue_hds/tls
124    are created. run_queue_hd is then a short cut (macro) for
125    run_queue_hds[CurrentProc] (see GranSim.h).
126    -- HWL
127 */
128 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
129 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
130 StgTSO *ccalling_threadss[MAX_PROC];
131 /* We use the same global list of threads (all_threads) in GranSim as in
132    the std RTS (i.e. we are cheating). However, we don't use this list in
133    the GranSim specific code at the moment (so we are only potentially
134    cheating).  */
135
136 #else /* !GRAN */
137
138 StgTSO *run_queue_hd = NULL;
139 StgTSO *run_queue_tl = NULL;
140 StgTSO *blocked_queue_hd = NULL;
141 StgTSO *blocked_queue_tl = NULL;
142 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
143
144 #endif
145
146 /* Linked list of all threads.
147  * Used for detecting garbage collected threads.
148  */
149 StgTSO *all_threads = NULL;
150
151 /* When a thread performs a safe C call (_ccall_GC, using old
152  * terminology), it gets put on the suspended_ccalling_threads
153  * list. Used by the garbage collector.
154  */
155 static StgTSO *suspended_ccalling_threads;
156
157 static StgTSO *threadStackOverflow(StgTSO *tso);
158
159 /* KH: The following two flags are shared memory locations.  There is no need
160        to lock them, since they are only unset at the end of a scheduler
161        operation.
162 */
163
164 /* flag set by signal handler to precipitate a context switch */
165 nat context_switch = 0;
166
167 /* if this flag is set as well, give up execution */
168 rtsBool interrupted = rtsFalse;
169
170 /* Next thread ID to allocate.
171  * Locks required: thread_id_mutex
172  */
173 static StgThreadID next_thread_id = 1;
174
175 /*
176  * Pointers to the state of the current thread.
177  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
178  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
179  */
180  
181 /* The smallest stack size that makes any sense is:
182  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
183  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
184  *  + 1                       (the closure to enter)
185  *  + 1                       (stg_ap_v_ret)
186  *  + 1                       (spare slot req'd by stg_ap_v_ret)
187  *
188  * A thread with this stack will bomb immediately with a stack
189  * overflow, which will increase its stack size.  
190  */
191
192 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
193
194
195 #if defined(GRAN)
196 StgTSO *CurrentTSO;
197 #endif
198
199 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
200  *  exists - earlier gccs apparently didn't.
201  *  -= chak
202  */
203 StgTSO dummy_tso;
204
205 static rtsBool ready_to_gc;
206
207 /*
208  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
209  * in an MT setting, needed to signal that a worker thread shouldn't hang around
210  * in the scheduler when it is out of work.
211  */
212 static rtsBool shutting_down_scheduler = rtsFalse;
213
214 void            addToBlockedQueue ( StgTSO *tso );
215
216 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
217        void     interruptStgRts   ( void );
218
219 static void     detectBlackHoles  ( void );
220
221 #if defined(RTS_SUPPORTS_THREADS)
222 /* ToDo: carefully document the invariants that go together
223  *       with these synchronisation objects.
224  */
225 Mutex     sched_mutex       = INIT_MUTEX_VAR;
226 Mutex     term_mutex        = INIT_MUTEX_VAR;
227
228 #endif /* RTS_SUPPORTS_THREADS */
229
230 #if defined(PAR)
231 StgTSO *LastTSO;
232 rtsTime TimeOfLastYield;
233 rtsBool emitSchedule = rtsTrue;
234 #endif
235
236 #if DEBUG
237 static char *whatNext_strs[] = {
238   "(unknown)",
239   "ThreadRunGHC",
240   "ThreadInterpret",
241   "ThreadKilled",
242   "ThreadRelocated",
243   "ThreadComplete"
244 };
245 #endif
246
247 #if defined(PAR)
248 StgTSO * createSparkThread(rtsSpark spark);
249 StgTSO * activateSpark (rtsSpark spark);  
250 #endif
251
252 /* ----------------------------------------------------------------------------
253  * Starting Tasks
254  * ------------------------------------------------------------------------- */
255
256 #if defined(RTS_SUPPORTS_THREADS)
257 static rtsBool startingWorkerThread = rtsFalse;
258
259 static void taskStart(void);
260 static void
261 taskStart(void)
262 {
263   ACQUIRE_LOCK(&sched_mutex);
264   startingWorkerThread = rtsFalse;
265   schedule(NULL,NULL);
266   RELEASE_LOCK(&sched_mutex);
267 }
268
269 void
270 startSchedulerTaskIfNecessary(void)
271 {
272   if(run_queue_hd != END_TSO_QUEUE
273     || blocked_queue_hd != END_TSO_QUEUE
274     || sleeping_queue != END_TSO_QUEUE)
275   {
276     if(!startingWorkerThread)
277     { // we don't want to start another worker thread
278       // just because the last one hasn't yet reached the
279       // "waiting for capability" state
280       startingWorkerThread = rtsTrue;
281       if(!startTask(taskStart))
282       {
283         startingWorkerThread = rtsFalse;
284       }
285     }
286   }
287 }
288 #endif
289
290 /* ---------------------------------------------------------------------------
291    Main scheduling loop.
292
293    We use round-robin scheduling, each thread returning to the
294    scheduler loop when one of these conditions is detected:
295
296       * out of heap space
297       * timer expires (thread yields)
298       * thread blocks
299       * thread ends
300       * stack overflow
301
302    Locking notes:  we acquire the scheduler lock once at the beginning
303    of the scheduler loop, and release it when
304     
305       * running a thread, or
306       * waiting for work, or
307       * waiting for a GC to complete.
308
309    GRAN version:
310      In a GranSim setup this loop iterates over the global event queue.
311      This revolves around the global event queue, which determines what 
312      to do next. Therefore, it's more complicated than either the 
313      concurrent or the parallel (GUM) setup.
314
315    GUM version:
316      GUM iterates over incoming messages.
317      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
318      and sends out a fish whenever it has nothing to do; in-between
319      doing the actual reductions (shared code below) it processes the
320      incoming messages and deals with delayed operations 
321      (see PendingFetches).
322      This is not the ugliest code you could imagine, but it's bloody close.
323
324    ------------------------------------------------------------------------ */
325 static void
326 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
327           Capability *initialCapability )
328 {
329   StgTSO *t;
330   Capability *cap;
331   StgThreadReturnCode ret;
332 #if defined(GRAN)
333   rtsEvent *event;
334 #elif defined(PAR)
335   StgSparkPool *pool;
336   rtsSpark spark;
337   StgTSO *tso;
338   GlobalTaskId pe;
339   rtsBool receivedFinish = rtsFalse;
340 # if defined(DEBUG)
341   nat tp_size, sp_size; // stats only
342 # endif
343 #endif
344   rtsBool was_interrupted = rtsFalse;
345   nat prev_what_next;
346   
347   // Pre-condition: sched_mutex is held.
348   // We might have a capability, passed in as initialCapability.
349   cap = initialCapability;
350
351 #if defined(RTS_SUPPORTS_THREADS)
352   //
353   // in the threaded case, the capability is either passed in via the
354   // initialCapability parameter, or initialized inside the scheduler
355   // loop 
356   //
357   IF_DEBUG(scheduler,
358            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
359                        mainThread, initialCapability);
360       );
361 #else
362   // simply initialise it in the non-threaded case
363   grabCapability(&cap);
364 #endif
365
366 #if defined(GRAN)
367   /* set up first event to get things going */
368   /* ToDo: assign costs for system setup and init MainTSO ! */
369   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
370             ContinueThread, 
371             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
372
373   IF_DEBUG(gran,
374            debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
375            G_TSO(CurrentTSO, 5));
376
377   if (RtsFlags.GranFlags.Light) {
378     /* Save current time; GranSim Light only */
379     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
380   }      
381
382   event = get_next_event();
383
384   while (event!=(rtsEvent*)NULL) {
385     /* Choose the processor with the next event */
386     CurrentProc = event->proc;
387     CurrentTSO = event->tso;
388
389 #elif defined(PAR)
390
391   while (!receivedFinish) {    /* set by processMessages */
392                                /* when receiving PP_FINISH message         */ 
393
394 #else // everything except GRAN and PAR
395
396   while (1) {
397
398 #endif
399
400      IF_DEBUG(scheduler, printAllThreads());
401
402 #if defined(RTS_SUPPORTS_THREADS)
403       // Yield the capability to higher-priority tasks if necessary.
404       //
405       if (cap != NULL) {
406           yieldCapability(&cap);
407       }
408
409       // If we do not currently hold a capability, we wait for one
410       //
411       if (cap == NULL) {
412           waitForCapability(&sched_mutex, &cap,
413                             mainThread ? &mainThread->bound_thread_cond : NULL);
414       }
415
416       // We now have a capability...
417 #endif
418
419     //
420     // If we're interrupted (the user pressed ^C, or some other
421     // termination condition occurred), kill all the currently running
422     // threads.
423     //
424     if (interrupted) {
425         IF_DEBUG(scheduler, sched_belch("interrupted"));
426         interrupted = rtsFalse;
427         was_interrupted = rtsTrue;
428 #if defined(RTS_SUPPORTS_THREADS)
429         // In the threaded RTS, deadlock detection doesn't work,
430         // so just exit right away.
431         errorBelch("interrupted");
432         releaseCapability(cap);
433         RELEASE_LOCK(&sched_mutex);
434         shutdownHaskellAndExit(EXIT_SUCCESS);
435 #else
436         deleteAllThreads();
437 #endif
438     }
439
440 #if defined(RTS_USER_SIGNALS)
441     // check for signals each time around the scheduler
442     if (signals_pending()) {
443       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
444       startSignalHandlers();
445       ACQUIRE_LOCK(&sched_mutex);
446     }
447 #endif
448
449     //
450     // Check whether any waiting threads need to be woken up.  If the
451     // run queue is empty, and there are no other tasks running, we
452     // can wait indefinitely for something to happen.
453     //
454     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
455 #if defined(RTS_SUPPORTS_THREADS)
456                 || EMPTY_RUN_QUEUE()
457 #endif
458         )
459     {
460       awaitEvent( EMPTY_RUN_QUEUE() );
461     }
462     // we can be interrupted while waiting for I/O...
463     if (interrupted) continue;
464
465     /* 
466      * Detect deadlock: when we have no threads to run, there are no
467      * threads waiting on I/O or sleeping, and all the other tasks are
468      * waiting for work, we must have a deadlock of some description.
469      *
470      * We first try to find threads blocked on themselves (ie. black
471      * holes), and generate NonTermination exceptions where necessary.
472      *
473      * If no threads are black holed, we have a deadlock situation, so
474      * inform all the main threads.
475      */
476 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
477     if (   EMPTY_THREAD_QUEUES() )
478     {
479         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
480         // Garbage collection can release some new threads due to
481         // either (a) finalizers or (b) threads resurrected because
482         // they are about to be send BlockedOnDeadMVar.  Any threads
483         // thus released will be immediately runnable.
484         GarbageCollect(GetRoots,rtsTrue);
485
486         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
487
488         IF_DEBUG(scheduler, 
489                  sched_belch("still deadlocked, checking for black holes..."));
490         detectBlackHoles();
491
492         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
493
494 #if defined(RTS_USER_SIGNALS)
495         /* If we have user-installed signal handlers, then wait
496          * for signals to arrive rather then bombing out with a
497          * deadlock.
498          */
499         if ( anyUserHandlers() ) {
500             IF_DEBUG(scheduler, 
501                      sched_belch("still deadlocked, waiting for signals..."));
502
503             awaitUserSignals();
504
505             // we might be interrupted...
506             if (interrupted) { continue; }
507
508             if (signals_pending()) {
509                 RELEASE_LOCK(&sched_mutex);
510                 startSignalHandlers();
511                 ACQUIRE_LOCK(&sched_mutex);
512             }
513             ASSERT(!EMPTY_RUN_QUEUE());
514             goto not_deadlocked;
515         }
516 #endif
517
518         /* Probably a real deadlock.  Send the current main thread the
519          * Deadlock exception (or in the SMP build, send *all* main
520          * threads the deadlock exception, since none of them can make
521          * progress).
522          */
523         {
524             StgMainThread *m;
525             m = main_threads;
526             switch (m->tso->why_blocked) {
527             case BlockedOnBlackHole:
528             case BlockedOnException:
529             case BlockedOnMVar:
530                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
531                 break;
532             default:
533                 barf("deadlock: main thread blocked in a strange way");
534             }
535         }
536     }
537   not_deadlocked:
538
539 #elif defined(RTS_SUPPORTS_THREADS)
540     // ToDo: add deadlock detection in threaded RTS
541 #elif defined(PAR)
542     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
543 #endif
544
545 #if defined(RTS_SUPPORTS_THREADS)
546     if ( EMPTY_RUN_QUEUE() ) {
547         continue; // nothing to do
548     }
549 #endif
550
551 #if defined(GRAN)
552     if (RtsFlags.GranFlags.Light)
553       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
554
555     /* adjust time based on time-stamp */
556     if (event->time > CurrentTime[CurrentProc] &&
557         event->evttype != ContinueThread)
558       CurrentTime[CurrentProc] = event->time;
559     
560     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
561     if (!RtsFlags.GranFlags.Light)
562       handleIdlePEs();
563
564     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
565
566     /* main event dispatcher in GranSim */
567     switch (event->evttype) {
568       /* Should just be continuing execution */
569     case ContinueThread:
570       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
571       /* ToDo: check assertion
572       ASSERT(run_queue_hd != (StgTSO*)NULL &&
573              run_queue_hd != END_TSO_QUEUE);
574       */
575       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
576       if (!RtsFlags.GranFlags.DoAsyncFetch &&
577           procStatus[CurrentProc]==Fetching) {
578         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
579               CurrentTSO->id, CurrentTSO, CurrentProc);
580         goto next_thread;
581       } 
582       /* Ignore ContinueThreads for completed threads */
583       if (CurrentTSO->what_next == ThreadComplete) {
584         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
585               CurrentTSO->id, CurrentTSO, CurrentProc);
586         goto next_thread;
587       } 
588       /* Ignore ContinueThreads for threads that are being migrated */
589       if (PROCS(CurrentTSO)==Nowhere) { 
590         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
591               CurrentTSO->id, CurrentTSO, CurrentProc);
592         goto next_thread;
593       }
594       /* The thread should be at the beginning of the run queue */
595       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
596         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
597               CurrentTSO->id, CurrentTSO, CurrentProc);
598         break; // run the thread anyway
599       }
600       /*
601       new_event(proc, proc, CurrentTime[proc],
602                 FindWork,
603                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
604       goto next_thread; 
605       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
606       break; // now actually run the thread; DaH Qu'vam yImuHbej 
607
608     case FetchNode:
609       do_the_fetchnode(event);
610       goto next_thread;             /* handle next event in event queue  */
611       
612     case GlobalBlock:
613       do_the_globalblock(event);
614       goto next_thread;             /* handle next event in event queue  */
615       
616     case FetchReply:
617       do_the_fetchreply(event);
618       goto next_thread;             /* handle next event in event queue  */
619       
620     case UnblockThread:   /* Move from the blocked queue to the tail of */
621       do_the_unblock(event);
622       goto next_thread;             /* handle next event in event queue  */
623       
624     case ResumeThread:  /* Move from the blocked queue to the tail of */
625       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
626       event->tso->gran.blocktime += 
627         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
628       do_the_startthread(event);
629       goto next_thread;             /* handle next event in event queue  */
630       
631     case StartThread:
632       do_the_startthread(event);
633       goto next_thread;             /* handle next event in event queue  */
634       
635     case MoveThread:
636       do_the_movethread(event);
637       goto next_thread;             /* handle next event in event queue  */
638       
639     case MoveSpark:
640       do_the_movespark(event);
641       goto next_thread;             /* handle next event in event queue  */
642       
643     case FindWork:
644       do_the_findwork(event);
645       goto next_thread;             /* handle next event in event queue  */
646       
647     default:
648       barf("Illegal event type %u\n", event->evttype);
649     }  /* switch */
650     
651     /* This point was scheduler_loop in the old RTS */
652
653     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
654
655     TimeOfLastEvent = CurrentTime[CurrentProc];
656     TimeOfNextEvent = get_time_of_next_event();
657     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
658     // CurrentTSO = ThreadQueueHd;
659
660     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
661                          TimeOfNextEvent));
662
663     if (RtsFlags.GranFlags.Light) 
664       GranSimLight_leave_system(event, &ActiveTSO); 
665
666     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
667
668     IF_DEBUG(gran, 
669              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
670
671     /* in a GranSim setup the TSO stays on the run queue */
672     t = CurrentTSO;
673     /* Take a thread from the run queue. */
674     POP_RUN_QUEUE(t); // take_off_run_queue(t);
675
676     IF_DEBUG(gran, 
677              debugBelch("GRAN: About to run current thread, which is\n");
678              G_TSO(t,5));
679
680     context_switch = 0; // turned on via GranYield, checking events and time slice
681
682     IF_DEBUG(gran, 
683              DumpGranEvent(GR_SCHEDULE, t));
684
685     procStatus[CurrentProc] = Busy;
686
687 #elif defined(PAR)
688     if (PendingFetches != END_BF_QUEUE) {
689         processFetches();
690     }
691
692     /* ToDo: phps merge with spark activation above */
693     /* check whether we have local work and send requests if we have none */
694     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
695       /* :-[  no local threads => look out for local sparks */
696       /* the spark pool for the current PE */
697       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
698       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
699           pool->hd < pool->tl) {
700         /* 
701          * ToDo: add GC code check that we really have enough heap afterwards!!
702          * Old comment:
703          * If we're here (no runnable threads) and we have pending
704          * sparks, we must have a space problem.  Get enough space
705          * to turn one of those pending sparks into a
706          * thread... 
707          */
708
709         spark = findSpark(rtsFalse);                /* get a spark */
710         if (spark != (rtsSpark) NULL) {
711           tso = activateSpark(spark);       /* turn the spark into a thread */
712           IF_PAR_DEBUG(schedule,
713                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
714                              tso->id, tso, advisory_thread_count));
715
716           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
717             debugBelch("==^^ failed to activate spark\n");
718             goto next_thread;
719           }               /* otherwise fall through & pick-up new tso */
720         } else {
721           IF_PAR_DEBUG(verbose,
722                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
723                              spark_queue_len(pool)));
724           goto next_thread;
725         }
726       }
727
728       /* If we still have no work we need to send a FISH to get a spark
729          from another PE 
730       */
731       if (EMPTY_RUN_QUEUE()) {
732       /* =8-[  no local sparks => look for work on other PEs */
733         /*
734          * We really have absolutely no work.  Send out a fish
735          * (there may be some out there already), and wait for
736          * something to arrive.  We clearly can't run any threads
737          * until a SCHEDULE or RESUME arrives, and so that's what
738          * we're hoping to see.  (Of course, we still have to
739          * respond to other types of messages.)
740          */
741         TIME now = msTime() /*CURRENT_TIME*/;
742         IF_PAR_DEBUG(verbose, 
743                      debugBelch("--  now=%ld\n", now));
744         IF_PAR_DEBUG(verbose,
745                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
746                          (last_fish_arrived_at!=0 &&
747                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
748                        debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
749                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
750                              last_fish_arrived_at,
751                              RtsFlags.ParFlags.fishDelay, now);
752                      });
753         
754         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
755             (last_fish_arrived_at==0 ||
756              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
757           /* outstandingFishes is set in sendFish, processFish;
758              avoid flooding system with fishes via delay */
759           pe = choosePE();
760           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
761                    NEW_FISH_HUNGER);
762
763           // Global statistics: count no. of fishes
764           if (RtsFlags.ParFlags.ParStats.Global &&
765               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
766             globalParStats.tot_fish_mess++;
767           }
768         }
769       
770         receivedFinish = processMessages();
771         goto next_thread;
772       }
773     } else if (PacketsWaiting()) {  /* Look for incoming messages */
774       receivedFinish = processMessages();
775     }
776
777     /* Now we are sure that we have some work available */
778     ASSERT(run_queue_hd != END_TSO_QUEUE);
779
780     /* Take a thread from the run queue, if we have work */
781     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
782     IF_DEBUG(sanity,checkTSO(t));
783
784     /* ToDo: write something to the log-file
785     if (RTSflags.ParFlags.granSimStats && !sameThread)
786         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
787
788     CurrentTSO = t;
789     */
790     /* the spark pool for the current PE */
791     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
792
793     IF_DEBUG(scheduler, 
794              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
795                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
796
797 # if 1
798     if (0 && RtsFlags.ParFlags.ParStats.Full && 
799         t && LastTSO && t->id != LastTSO->id && 
800         LastTSO->why_blocked == NotBlocked && 
801         LastTSO->what_next != ThreadComplete) {
802       // if previously scheduled TSO not blocked we have to record the context switch
803       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
804                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
805     }
806
807     if (RtsFlags.ParFlags.ParStats.Full && 
808         (emitSchedule /* forced emit */ ||
809         (t && LastTSO && t->id != LastTSO->id))) {
810       /* 
811          we are running a different TSO, so write a schedule event to log file
812          NB: If we use fair scheduling we also have to write  a deschedule 
813              event for LastTSO; with unfair scheduling we know that the
814              previous tso has blocked whenever we switch to another tso, so
815              we don't need it in GUM for now
816       */
817       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
818                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
819       emitSchedule = rtsFalse;
820     }
821      
822 # endif
823 #else /* !GRAN && !PAR */
824   
825     // grab a thread from the run queue
826     ASSERT(run_queue_hd != END_TSO_QUEUE);
827     POP_RUN_QUEUE(t);
828
829     // Sanity check the thread we're about to run.  This can be
830     // expensive if there is lots of thread switching going on...
831     IF_DEBUG(sanity,checkTSO(t));
832 #endif
833
834 #ifdef THREADED_RTS
835     {
836       StgMainThread *m = t->main;
837       
838       if(m)
839       {
840         if(m == mainThread)
841         {
842           IF_DEBUG(scheduler,
843             sched_belch("### Running thread %d in bound thread", t->id));
844           // yes, the Haskell thread is bound to the current native thread
845         }
846         else
847         {
848           IF_DEBUG(scheduler,
849             sched_belch("### thread %d bound to another OS thread", t->id));
850           // no, bound to a different Haskell thread: pass to that thread
851           PUSH_ON_RUN_QUEUE(t);
852           passCapability(&m->bound_thread_cond);
853           continue;
854         }
855       }
856       else
857       {
858         if(mainThread != NULL)
859         // The thread we want to run is bound.
860         {
861           IF_DEBUG(scheduler,
862             sched_belch("### this OS thread cannot run thread %d", t->id));
863           // no, the current native thread is bound to a different
864           // Haskell thread, so pass it to any worker thread
865           PUSH_ON_RUN_QUEUE(t);
866           passCapabilityToWorker();
867           continue; 
868         }
869       }
870     }
871 #endif
872
873     cap->r.rCurrentTSO = t;
874     
875     /* context switches are now initiated by the timer signal, unless
876      * the user specified "context switch as often as possible", with
877      * +RTS -C0
878      */
879     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
880          && (run_queue_hd != END_TSO_QUEUE
881              || blocked_queue_hd != END_TSO_QUEUE
882              || sleeping_queue != END_TSO_QUEUE)))
883         context_switch = 1;
884
885 run_thread:
886
887     RELEASE_LOCK(&sched_mutex);
888
889     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
890                               (long)t->id, whatNext_strs[t->what_next]));
891
892 #ifdef PROFILING
893     startHeapProfTimer();
894 #endif
895
896     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
897     /* Run the current thread 
898      */
899     prev_what_next = t->what_next;
900
901     errno = t->saved_errno;
902
903     switch (prev_what_next) {
904
905     case ThreadKilled:
906     case ThreadComplete:
907         /* Thread already finished, return to scheduler. */
908         ret = ThreadFinished;
909         break;
910
911     case ThreadRunGHC:
912         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
913         break;
914
915     case ThreadInterpret:
916         ret = interpretBCO(cap);
917         break;
918
919     default:
920       barf("schedule: invalid what_next field");
921     }
922
923     // The TSO might have moved, so find the new location:
924     t = cap->r.rCurrentTSO;
925
926     // And save the current errno in this thread.
927     t->saved_errno = errno;
928
929     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
930     
931     /* Costs for the scheduler are assigned to CCS_SYSTEM */
932 #ifdef PROFILING
933     stopHeapProfTimer();
934     CCCS = CCS_SYSTEM;
935 #endif
936     
937     ACQUIRE_LOCK(&sched_mutex);
938     
939 #ifdef RTS_SUPPORTS_THREADS
940     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
941 #elif !defined(GRAN) && !defined(PAR)
942     IF_DEBUG(scheduler,debugBelch("sched: "););
943 #endif
944     
945 #if defined(PAR)
946     /* HACK 675: if the last thread didn't yield, make sure to print a 
947        SCHEDULE event to the log file when StgRunning the next thread, even
948        if it is the same one as before */
949     LastTSO = t; 
950     TimeOfLastYield = CURRENT_TIME;
951 #endif
952
953     switch (ret) {
954     case HeapOverflow:
955 #if defined(GRAN)
956       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
957       globalGranStats.tot_heapover++;
958 #elif defined(PAR)
959       globalParStats.tot_heapover++;
960 #endif
961
962       // did the task ask for a large block?
963       if (cap->r.rHpAlloc > BLOCK_SIZE) {
964           // if so, get one and push it on the front of the nursery.
965           bdescr *bd;
966           nat blocks;
967           
968           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
969
970           IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", 
971                                    (long)t->id, whatNext_strs[t->what_next], blocks));
972
973           // don't do this if it would push us over the
974           // alloc_blocks_lim limit; we'll GC first.
975           if (alloc_blocks + blocks < alloc_blocks_lim) {
976
977               alloc_blocks += blocks;
978               bd = allocGroup( blocks );
979
980               // link the new group into the list
981               bd->link = cap->r.rCurrentNursery;
982               bd->u.back = cap->r.rCurrentNursery->u.back;
983               if (cap->r.rCurrentNursery->u.back != NULL) {
984                   cap->r.rCurrentNursery->u.back->link = bd;
985               } else {
986                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
987                          g0s0->blocks == cap->r.rNursery);
988                   cap->r.rNursery = g0s0->blocks = bd;
989               }           
990               cap->r.rCurrentNursery->u.back = bd;
991
992               // initialise it as a nursery block.  We initialise the
993               // step, gen_no, and flags field of *every* sub-block in
994               // this large block, because this is easier than making
995               // sure that we always find the block head of a large
996               // block whenever we call Bdescr() (eg. evacuate() and
997               // isAlive() in the GC would both have to do this, at
998               // least).
999               { 
1000                   bdescr *x;
1001                   for (x = bd; x < bd + blocks; x++) {
1002                       x->step = g0s0;
1003                       x->gen_no = 0;
1004                       x->flags = 0;
1005                   }
1006               }
1007
1008               // don't forget to update the block count in g0s0.
1009               g0s0->n_blocks += blocks;
1010               // This assert can be a killer if the app is doing lots
1011               // of large block allocations.
1012               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1013
1014               // now update the nursery to point to the new block
1015               cap->r.rCurrentNursery = bd;
1016
1017               // we might be unlucky and have another thread get on the
1018               // run queue before us and steal the large block, but in that
1019               // case the thread will just end up requesting another large
1020               // block.
1021               PUSH_ON_RUN_QUEUE(t);
1022               break;
1023           }
1024       }
1025
1026       /* make all the running tasks block on a condition variable,
1027        * maybe set context_switch and wait till they all pile in,
1028        * then have them wait on a GC condition variable.
1029        */
1030       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1031                                (long)t->id, whatNext_strs[t->what_next]));
1032       threadPaused(t);
1033 #if defined(GRAN)
1034       ASSERT(!is_on_queue(t,CurrentProc));
1035 #elif defined(PAR)
1036       /* Currently we emit a DESCHEDULE event before GC in GUM.
1037          ToDo: either add separate event to distinguish SYSTEM time from rest
1038                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1039       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1040         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1041                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1042         emitSchedule = rtsTrue;
1043       }
1044 #endif
1045       
1046       ready_to_gc = rtsTrue;
1047       context_switch = 1;               /* stop other threads ASAP */
1048       PUSH_ON_RUN_QUEUE(t);
1049       /* actual GC is done at the end of the while loop */
1050       break;
1051       
1052     case StackOverflow:
1053 #if defined(GRAN)
1054       IF_DEBUG(gran, 
1055                DumpGranEvent(GR_DESCHEDULE, t));
1056       globalGranStats.tot_stackover++;
1057 #elif defined(PAR)
1058       // IF_DEBUG(par, 
1059       // DumpGranEvent(GR_DESCHEDULE, t);
1060       globalParStats.tot_stackover++;
1061 #endif
1062       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1063                                (long)t->id, whatNext_strs[t->what_next]));
1064       /* just adjust the stack for this thread, then pop it back
1065        * on the run queue.
1066        */
1067       threadPaused(t);
1068       { 
1069         /* enlarge the stack */
1070         StgTSO *new_t = threadStackOverflow(t);
1071         
1072         /* This TSO has moved, so update any pointers to it from the
1073          * main thread stack.  It better not be on any other queues...
1074          * (it shouldn't be).
1075          */
1076         if (t->main != NULL) {
1077             t->main->tso = new_t;
1078         }
1079         PUSH_ON_RUN_QUEUE(new_t);
1080       }
1081       break;
1082
1083     case ThreadYielding:
1084       // Reset the context switch flag.  We don't do this just before
1085       // running the thread, because that would mean we would lose ticks
1086       // during GC, which can lead to unfair scheduling (a thread hogs
1087       // the CPU because the tick always arrives during GC).  This way
1088       // penalises threads that do a lot of allocation, but that seems
1089       // better than the alternative.
1090       context_switch = 0;
1091
1092 #if defined(GRAN)
1093       IF_DEBUG(gran, 
1094                DumpGranEvent(GR_DESCHEDULE, t));
1095       globalGranStats.tot_yields++;
1096 #elif defined(PAR)
1097       // IF_DEBUG(par, 
1098       // DumpGranEvent(GR_DESCHEDULE, t);
1099       globalParStats.tot_yields++;
1100 #endif
1101       /* put the thread back on the run queue.  Then, if we're ready to
1102        * GC, check whether this is the last task to stop.  If so, wake
1103        * up the GC thread.  getThread will block during a GC until the
1104        * GC is finished.
1105        */
1106       IF_DEBUG(scheduler,
1107                if (t->what_next != prev_what_next) {
1108                    debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1109                          (long)t->id, whatNext_strs[t->what_next]);
1110                } else {
1111                    debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1112                          (long)t->id, whatNext_strs[t->what_next]);
1113                }
1114                );
1115
1116       IF_DEBUG(sanity,
1117                //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1118                checkTSO(t));
1119       ASSERT(t->link == END_TSO_QUEUE);
1120
1121       // Shortcut if we're just switching evaluators: don't bother
1122       // doing stack squeezing (which can be expensive), just run the
1123       // thread.
1124       if (t->what_next != prev_what_next) {
1125           goto run_thread;
1126       }
1127
1128       threadPaused(t);
1129
1130 #if defined(GRAN)
1131       ASSERT(!is_on_queue(t,CurrentProc));
1132
1133       IF_DEBUG(sanity,
1134                //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1135                checkThreadQsSanity(rtsTrue));
1136 #endif
1137
1138 #if defined(PAR)
1139       if (RtsFlags.ParFlags.doFairScheduling) { 
1140         /* this does round-robin scheduling; good for concurrency */
1141         APPEND_TO_RUN_QUEUE(t);
1142       } else {
1143         /* this does unfair scheduling; good for parallelism */
1144         PUSH_ON_RUN_QUEUE(t);
1145       }
1146 #else
1147       // this does round-robin scheduling; good for concurrency
1148       APPEND_TO_RUN_QUEUE(t);
1149 #endif
1150
1151 #if defined(GRAN)
1152       /* add a ContinueThread event to actually process the thread */
1153       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1154                 ContinueThread,
1155                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1156       IF_GRAN_DEBUG(bq, 
1157                debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1158                G_EVENTQ(0);
1159                G_CURR_THREADQ(0));
1160 #endif /* GRAN */
1161       break;
1162
1163     case ThreadBlocked:
1164 #if defined(GRAN)
1165       IF_DEBUG(scheduler,
1166                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1167                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1168                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1169
1170       // ??? needed; should emit block before
1171       IF_DEBUG(gran, 
1172                DumpGranEvent(GR_DESCHEDULE, t)); 
1173       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1174       /*
1175         ngoq Dogh!
1176       ASSERT(procStatus[CurrentProc]==Busy || 
1177               ((procStatus[CurrentProc]==Fetching) && 
1178               (t->block_info.closure!=(StgClosure*)NULL)));
1179       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1180           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1181             procStatus[CurrentProc]==Fetching)) 
1182         procStatus[CurrentProc] = Idle;
1183       */
1184 #elif defined(PAR)
1185       IF_DEBUG(scheduler,
1186                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1187                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1188       IF_PAR_DEBUG(bq,
1189
1190                    if (t->block_info.closure!=(StgClosure*)NULL) 
1191                      print_bq(t->block_info.closure));
1192
1193       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1194       blockThread(t);
1195
1196       /* whatever we schedule next, we must log that schedule */
1197       emitSchedule = rtsTrue;
1198
1199 #else /* !GRAN */
1200       /* don't need to do anything.  Either the thread is blocked on
1201        * I/O, in which case we'll have called addToBlockedQueue
1202        * previously, or it's blocked on an MVar or Blackhole, in which
1203        * case it'll be on the relevant queue already.
1204        */
1205       IF_DEBUG(scheduler,
1206                debugBelch("--<< thread %d (%s) stopped: ", 
1207                        t->id, whatNext_strs[t->what_next]);
1208                printThreadBlockage(t);
1209                debugBelch("\n"));
1210
1211       /* Only for dumping event to log file 
1212          ToDo: do I need this in GranSim, too?
1213       blockThread(t);
1214       */
1215 #endif
1216       threadPaused(t);
1217       break;
1218
1219     case ThreadFinished:
1220       /* Need to check whether this was a main thread, and if so, signal
1221        * the task that started it with the return value.  If we have no
1222        * more main threads, we probably need to stop all the tasks until
1223        * we get a new one.
1224        */
1225       /* We also end up here if the thread kills itself with an
1226        * uncaught exception, see Exception.hc.
1227        */
1228       IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1229                                t->id, whatNext_strs[t->what_next]));
1230 #if defined(GRAN)
1231       endThread(t, CurrentProc); // clean-up the thread
1232 #elif defined(PAR)
1233       /* For now all are advisory -- HWL */
1234       //if(t->priority==AdvisoryPriority) ??
1235       advisory_thread_count--;
1236       
1237 # ifdef DIST
1238       if(t->dist.priority==RevalPriority)
1239         FinishReval(t);
1240 # endif
1241       
1242       if (RtsFlags.ParFlags.ParStats.Full &&
1243           !RtsFlags.ParFlags.ParStats.Suppressed) 
1244         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1245 #endif
1246
1247       //
1248       // Check whether the thread that just completed was a main
1249       // thread, and if so return with the result.  
1250       //
1251       // There is an assumption here that all thread completion goes
1252       // through this point; we need to make sure that if a thread
1253       // ends up in the ThreadKilled state, that it stays on the run
1254       // queue so it can be dealt with here.
1255       //
1256       if (
1257 #if defined(RTS_SUPPORTS_THREADS)
1258           mainThread != NULL
1259 #else
1260           mainThread->tso == t
1261 #endif
1262           )
1263       {
1264           // We are a bound thread: this must be our thread that just
1265           // completed.
1266           ASSERT(mainThread->tso == t);
1267
1268           if (t->what_next == ThreadComplete) {
1269               if (mainThread->ret) {
1270                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1271                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1272               }
1273               mainThread->stat = Success;
1274           } else {
1275               if (mainThread->ret) {
1276                   *(mainThread->ret) = NULL;
1277               }
1278               if (was_interrupted) {
1279                   mainThread->stat = Interrupted;
1280               } else {
1281                   mainThread->stat = Killed;
1282               }
1283           }
1284 #ifdef DEBUG
1285           removeThreadLabel((StgWord)mainThread->tso->id);
1286 #endif
1287           if (mainThread->prev == NULL) {
1288               main_threads = mainThread->link;
1289           } else {
1290               mainThread->prev->link = mainThread->link;
1291           }
1292           if (mainThread->link != NULL) {
1293               mainThread->link->prev = NULL;
1294           }
1295           releaseCapability(cap);
1296           return;
1297       }
1298
1299 #ifdef RTS_SUPPORTS_THREADS
1300       ASSERT(t->main == NULL);
1301 #else
1302       if (t->main != NULL) {
1303           // Must be a main thread that is not the topmost one.  Leave
1304           // it on the run queue until the stack has unwound to the
1305           // point where we can deal with this.  Leaving it on the run
1306           // queue also ensures that the garbage collector knows about
1307           // this thread and its return value (it gets dropped from the
1308           // all_threads list so there's no other way to find it).
1309           APPEND_TO_RUN_QUEUE(t);
1310       }
1311 #endif
1312       break;
1313
1314     default:
1315       barf("schedule: invalid thread return code %d", (int)ret);
1316     }
1317
1318 #ifdef PROFILING
1319     // When we have +RTS -i0 and we're heap profiling, do a census at
1320     // every GC.  This lets us get repeatable runs for debugging.
1321     if (performHeapProfile ||
1322         (RtsFlags.ProfFlags.profileInterval==0 &&
1323          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1324         GarbageCollect(GetRoots, rtsTrue);
1325         heapCensus();
1326         performHeapProfile = rtsFalse;
1327         ready_to_gc = rtsFalse; // we already GC'd
1328     }
1329 #endif
1330
1331     if (ready_to_gc) {
1332       /* everybody back, start the GC.
1333        * Could do it in this thread, or signal a condition var
1334        * to do it in another thread.  Either way, we need to
1335        * broadcast on gc_pending_cond afterward.
1336        */
1337 #if defined(RTS_SUPPORTS_THREADS)
1338       IF_DEBUG(scheduler,sched_belch("doing GC"));
1339 #endif
1340       GarbageCollect(GetRoots,rtsFalse);
1341       ready_to_gc = rtsFalse;
1342 #if defined(GRAN)
1343       /* add a ContinueThread event to continue execution of current thread */
1344       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1345                 ContinueThread,
1346                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1347       IF_GRAN_DEBUG(bq, 
1348                debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1349                G_EVENTQ(0);
1350                G_CURR_THREADQ(0));
1351 #endif /* GRAN */
1352     }
1353
1354 #if defined(GRAN)
1355   next_thread:
1356     IF_GRAN_DEBUG(unused,
1357                   print_eventq(EventHd));
1358
1359     event = get_next_event();
1360 #elif defined(PAR)
1361   next_thread:
1362     /* ToDo: wait for next message to arrive rather than busy wait */
1363 #endif /* GRAN */
1364
1365   } /* end of while(1) */
1366
1367   IF_PAR_DEBUG(verbose,
1368                debugBelch("== Leaving schedule() after having received Finish\n"));
1369 }
1370
1371 /* ---------------------------------------------------------------------------
1372  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1373  * used by Control.Concurrent for error checking.
1374  * ------------------------------------------------------------------------- */
1375  
1376 StgBool
1377 rtsSupportsBoundThreads(void)
1378 {
1379 #ifdef THREADED_RTS
1380   return rtsTrue;
1381 #else
1382   return rtsFalse;
1383 #endif
1384 }
1385
1386 /* ---------------------------------------------------------------------------
1387  * isThreadBound(tso): check whether tso is bound to an OS thread.
1388  * ------------------------------------------------------------------------- */
1389  
1390 StgBool
1391 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1392 {
1393 #ifdef THREADED_RTS
1394   return (tso->main != NULL);
1395 #endif
1396   return rtsFalse;
1397 }
1398
1399 /* ---------------------------------------------------------------------------
1400  * Singleton fork(). Do not copy any running threads.
1401  * ------------------------------------------------------------------------- */
1402
1403 #ifndef mingw32_TARGET_OS
1404 #define FORKPROCESS_PRIMOP_SUPPORTED
1405 #endif
1406
1407 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1408 static void 
1409 deleteThreadImmediately(StgTSO *tso);
1410 #endif
1411 StgInt
1412 forkProcess(HsStablePtr *entry
1413 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1414             STG_UNUSED
1415 #endif
1416            )
1417 {
1418 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1419   pid_t pid;
1420   StgTSO* t,*next;
1421   StgMainThread *m;
1422   SchedulerStatus rc;
1423
1424   IF_DEBUG(scheduler,sched_belch("forking!"));
1425   rts_lock(); // This not only acquires sched_mutex, it also
1426               // makes sure that no other threads are running
1427
1428   pid = fork();
1429
1430   if (pid) { /* parent */
1431
1432   /* just return the pid */
1433     rts_unlock();
1434     return pid;
1435     
1436   } else { /* child */
1437     
1438     
1439       // delete all threads
1440     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1441     
1442     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1443       next = t->link;
1444
1445         // don't allow threads to catch the ThreadKilled exception
1446       deleteThreadImmediately(t);
1447     }
1448     
1449       // wipe the main thread list
1450     while((m = main_threads) != NULL) {
1451       main_threads = m->link;
1452 # ifdef THREADED_RTS
1453       closeCondition(&m->bound_thread_cond);
1454 # endif
1455       stgFree(m);
1456     }
1457     
1458 # ifdef RTS_SUPPORTS_THREADS
1459     resetTaskManagerAfterFork();      // tell startTask() and friends that
1460     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1461     resetWorkerWakeupPipeAfterFork();
1462 # endif
1463     
1464     rc = rts_evalStableIO(entry, NULL);  // run the action
1465     rts_checkSchedStatus("forkProcess",rc);
1466     
1467     rts_unlock();
1468     
1469     hs_exit();                      // clean up and exit
1470     stg_exit(0);
1471   }
1472 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1473   barf("forkProcess#: primop not supported, sorry!\n");
1474   return -1;
1475 #endif
1476 }
1477
1478 /* ---------------------------------------------------------------------------
1479  * deleteAllThreads():  kill all the live threads.
1480  *
1481  * This is used when we catch a user interrupt (^C), before performing
1482  * any necessary cleanups and running finalizers.
1483  *
1484  * Locks: sched_mutex held.
1485  * ------------------------------------------------------------------------- */
1486    
1487 void
1488 deleteAllThreads ( void )
1489 {
1490   StgTSO* t, *next;
1491   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1492   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1493       next = t->global_link;
1494       deleteThread(t);
1495   }      
1496
1497   // The run queue now contains a bunch of ThreadKilled threads.  We
1498   // must not throw these away: the main thread(s) will be in there
1499   // somewhere, and the main scheduler loop has to deal with it.
1500   // Also, the run queue is the only thing keeping these threads from
1501   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1502
1503   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1504   ASSERT(sleeping_queue == END_TSO_QUEUE);
1505 }
1506
1507 /* startThread and  insertThread are now in GranSim.c -- HWL */
1508
1509
1510 /* ---------------------------------------------------------------------------
1511  * Suspending & resuming Haskell threads.
1512  * 
1513  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1514  * its capability before calling the C function.  This allows another
1515  * task to pick up the capability and carry on running Haskell
1516  * threads.  It also means that if the C call blocks, it won't lock
1517  * the whole system.
1518  *
1519  * The Haskell thread making the C call is put to sleep for the
1520  * duration of the call, on the susepended_ccalling_threads queue.  We
1521  * give out a token to the task, which it can use to resume the thread
1522  * on return from the C function.
1523  * ------------------------------------------------------------------------- */
1524    
1525 StgInt
1526 suspendThread( StgRegTable *reg )
1527 {
1528   nat tok;
1529   Capability *cap;
1530   int saved_errno = errno;
1531
1532   /* assume that *reg is a pointer to the StgRegTable part
1533    * of a Capability.
1534    */
1535   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1536
1537   ACQUIRE_LOCK(&sched_mutex);
1538
1539   IF_DEBUG(scheduler,
1540            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1541
1542   // XXX this might not be necessary --SDM
1543   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1544
1545   threadPaused(cap->r.rCurrentTSO);
1546   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1547   suspended_ccalling_threads = cap->r.rCurrentTSO;
1548
1549   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1550       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1551       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1552   } else {
1553       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1554   }
1555
1556   /* Use the thread ID as the token; it should be unique */
1557   tok = cap->r.rCurrentTSO->id;
1558
1559   /* Hand back capability */
1560   releaseCapability(cap);
1561   
1562 #if defined(RTS_SUPPORTS_THREADS)
1563   /* Preparing to leave the RTS, so ensure there's a native thread/task
1564      waiting to take over.
1565   */
1566   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1567 #endif
1568
1569   /* Other threads _might_ be available for execution; signal this */
1570   THREAD_RUNNABLE();
1571   RELEASE_LOCK(&sched_mutex);
1572   
1573   errno = saved_errno;
1574   return tok; 
1575 }
1576
1577 StgRegTable *
1578 resumeThread( StgInt tok )
1579 {
1580   StgTSO *tso, **prev;
1581   Capability *cap;
1582   int saved_errno = errno;
1583
1584 #if defined(RTS_SUPPORTS_THREADS)
1585   /* Wait for permission to re-enter the RTS with the result. */
1586   ACQUIRE_LOCK(&sched_mutex);
1587   waitForReturnCapability(&sched_mutex, &cap);
1588
1589   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1590 #else
1591   grabCapability(&cap);
1592 #endif
1593
1594   /* Remove the thread off of the suspended list */
1595   prev = &suspended_ccalling_threads;
1596   for (tso = suspended_ccalling_threads; 
1597        tso != END_TSO_QUEUE; 
1598        prev = &tso->link, tso = tso->link) {
1599     if (tso->id == (StgThreadID)tok) {
1600       *prev = tso->link;
1601       break;
1602     }
1603   }
1604   if (tso == END_TSO_QUEUE) {
1605     barf("resumeThread: thread not found");
1606   }
1607   tso->link = END_TSO_QUEUE;
1608   
1609   if(tso->why_blocked == BlockedOnCCall) {
1610       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1611       tso->blocked_exceptions = NULL;
1612   }
1613   
1614   /* Reset blocking status */
1615   tso->why_blocked  = NotBlocked;
1616
1617   cap->r.rCurrentTSO = tso;
1618   RELEASE_LOCK(&sched_mutex);
1619   errno = saved_errno;
1620   return &cap->r;
1621 }
1622
1623
1624 /* ---------------------------------------------------------------------------
1625  * Static functions
1626  * ------------------------------------------------------------------------ */
1627 static void unblockThread(StgTSO *tso);
1628
1629 /* ---------------------------------------------------------------------------
1630  * Comparing Thread ids.
1631  *
1632  * This is used from STG land in the implementation of the
1633  * instances of Eq/Ord for ThreadIds.
1634  * ------------------------------------------------------------------------ */
1635
1636 int
1637 cmp_thread(StgPtr tso1, StgPtr tso2) 
1638
1639   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1640   StgThreadID id2 = ((StgTSO *)tso2)->id;
1641  
1642   if (id1 < id2) return (-1);
1643   if (id1 > id2) return 1;
1644   return 0;
1645 }
1646
1647 /* ---------------------------------------------------------------------------
1648  * Fetching the ThreadID from an StgTSO.
1649  *
1650  * This is used in the implementation of Show for ThreadIds.
1651  * ------------------------------------------------------------------------ */
1652 int
1653 rts_getThreadId(StgPtr tso) 
1654 {
1655   return ((StgTSO *)tso)->id;
1656 }
1657
1658 #ifdef DEBUG
1659 void
1660 labelThread(StgPtr tso, char *label)
1661 {
1662   int len;
1663   void *buf;
1664
1665   /* Caveat: Once set, you can only set the thread name to "" */
1666   len = strlen(label)+1;
1667   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1668   strncpy(buf,label,len);
1669   /* Update will free the old memory for us */
1670   updateThreadLabel(((StgTSO *)tso)->id,buf);
1671 }
1672 #endif /* DEBUG */
1673
1674 /* ---------------------------------------------------------------------------
1675    Create a new thread.
1676
1677    The new thread starts with the given stack size.  Before the
1678    scheduler can run, however, this thread needs to have a closure
1679    (and possibly some arguments) pushed on its stack.  See
1680    pushClosure() in Schedule.h.
1681
1682    createGenThread() and createIOThread() (in SchedAPI.h) are
1683    convenient packaged versions of this function.
1684
1685    currently pri (priority) is only used in a GRAN setup -- HWL
1686    ------------------------------------------------------------------------ */
1687 #if defined(GRAN)
1688 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1689 StgTSO *
1690 createThread(nat size, StgInt pri)
1691 #else
1692 StgTSO *
1693 createThread(nat size)
1694 #endif
1695 {
1696
1697     StgTSO *tso;
1698     nat stack_size;
1699
1700     /* First check whether we should create a thread at all */
1701 #if defined(PAR)
1702   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1703   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1704     threadsIgnored++;
1705     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
1706           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1707     return END_TSO_QUEUE;
1708   }
1709   threadsCreated++;
1710 #endif
1711
1712 #if defined(GRAN)
1713   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1714 #endif
1715
1716   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1717
1718   /* catch ridiculously small stack sizes */
1719   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1720     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1721   }
1722
1723   stack_size = size - TSO_STRUCT_SIZEW;
1724
1725   tso = (StgTSO *)allocate(size);
1726   TICK_ALLOC_TSO(stack_size, 0);
1727
1728   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1729 #if defined(GRAN)
1730   SET_GRAN_HDR(tso, ThisPE);
1731 #endif
1732
1733   // Always start with the compiled code evaluator
1734   tso->what_next = ThreadRunGHC;
1735
1736   tso->id = next_thread_id++; 
1737   tso->why_blocked  = NotBlocked;
1738   tso->blocked_exceptions = NULL;
1739
1740   tso->saved_errno = 0;
1741   tso->main = NULL;
1742   
1743   tso->stack_size   = stack_size;
1744   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1745                               - TSO_STRUCT_SIZEW;
1746   tso->sp           = (P_)&(tso->stack) + stack_size;
1747
1748 #ifdef PROFILING
1749   tso->prof.CCCS = CCS_MAIN;
1750 #endif
1751
1752   /* put a stop frame on the stack */
1753   tso->sp -= sizeofW(StgStopFrame);
1754   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1755   tso->link = END_TSO_QUEUE;
1756
1757   // ToDo: check this
1758 #if defined(GRAN)
1759   /* uses more flexible routine in GranSim */
1760   insertThread(tso, CurrentProc);
1761 #else
1762   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1763    * from its creation
1764    */
1765 #endif
1766
1767 #if defined(GRAN) 
1768   if (RtsFlags.GranFlags.GranSimStats.Full) 
1769     DumpGranEvent(GR_START,tso);
1770 #elif defined(PAR)
1771   if (RtsFlags.ParFlags.ParStats.Full) 
1772     DumpGranEvent(GR_STARTQ,tso);
1773   /* HACk to avoid SCHEDULE 
1774      LastTSO = tso; */
1775 #endif
1776
1777   /* Link the new thread on the global thread list.
1778    */
1779   tso->global_link = all_threads;
1780   all_threads = tso;
1781
1782 #if defined(DIST)
1783   tso->dist.priority = MandatoryPriority; //by default that is...
1784 #endif
1785
1786 #if defined(GRAN)
1787   tso->gran.pri = pri;
1788 # if defined(DEBUG)
1789   tso->gran.magic = TSO_MAGIC; // debugging only
1790 # endif
1791   tso->gran.sparkname   = 0;
1792   tso->gran.startedat   = CURRENT_TIME; 
1793   tso->gran.exported    = 0;
1794   tso->gran.basicblocks = 0;
1795   tso->gran.allocs      = 0;
1796   tso->gran.exectime    = 0;
1797   tso->gran.fetchtime   = 0;
1798   tso->gran.fetchcount  = 0;
1799   tso->gran.blocktime   = 0;
1800   tso->gran.blockcount  = 0;
1801   tso->gran.blockedat   = 0;
1802   tso->gran.globalsparks = 0;
1803   tso->gran.localsparks  = 0;
1804   if (RtsFlags.GranFlags.Light)
1805     tso->gran.clock  = Now; /* local clock */
1806   else
1807     tso->gran.clock  = 0;
1808
1809   IF_DEBUG(gran,printTSO(tso));
1810 #elif defined(PAR)
1811 # if defined(DEBUG)
1812   tso->par.magic = TSO_MAGIC; // debugging only
1813 # endif
1814   tso->par.sparkname   = 0;
1815   tso->par.startedat   = CURRENT_TIME; 
1816   tso->par.exported    = 0;
1817   tso->par.basicblocks = 0;
1818   tso->par.allocs      = 0;
1819   tso->par.exectime    = 0;
1820   tso->par.fetchtime   = 0;
1821   tso->par.fetchcount  = 0;
1822   tso->par.blocktime   = 0;
1823   tso->par.blockcount  = 0;
1824   tso->par.blockedat   = 0;
1825   tso->par.globalsparks = 0;
1826   tso->par.localsparks  = 0;
1827 #endif
1828
1829 #if defined(GRAN)
1830   globalGranStats.tot_threads_created++;
1831   globalGranStats.threads_created_on_PE[CurrentProc]++;
1832   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1833   globalGranStats.tot_sq_probes++;
1834 #elif defined(PAR)
1835   // collect parallel global statistics (currently done together with GC stats)
1836   if (RtsFlags.ParFlags.ParStats.Global &&
1837       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1838     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1839     globalParStats.tot_threads_created++;
1840   }
1841 #endif 
1842
1843 #if defined(GRAN)
1844   IF_GRAN_DEBUG(pri,
1845                 sched_belch("==__ schedule: Created TSO %d (%p);",
1846                       CurrentProc, tso, tso->id));
1847 #elif defined(PAR)
1848     IF_PAR_DEBUG(verbose,
1849                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
1850                        (long)tso->id, tso, advisory_thread_count));
1851 #else
1852   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1853                                 (long)tso->id, (long)tso->stack_size));
1854 #endif    
1855   return tso;
1856 }
1857
1858 #if defined(PAR)
1859 /* RFP:
1860    all parallel thread creation calls should fall through the following routine.
1861 */
1862 StgTSO *
1863 createSparkThread(rtsSpark spark) 
1864 { StgTSO *tso;
1865   ASSERT(spark != (rtsSpark)NULL);
1866   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1867   { threadsIgnored++;
1868     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1869           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1870     return END_TSO_QUEUE;
1871   }
1872   else
1873   { threadsCreated++;
1874     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1875     if (tso==END_TSO_QUEUE)     
1876       barf("createSparkThread: Cannot create TSO");
1877 #if defined(DIST)
1878     tso->priority = AdvisoryPriority;
1879 #endif
1880     pushClosure(tso,spark);
1881     PUSH_ON_RUN_QUEUE(tso);
1882     advisory_thread_count++;    
1883   }
1884   return tso;
1885 }
1886 #endif
1887
1888 /*
1889   Turn a spark into a thread.
1890   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1891 */
1892 #if defined(PAR)
1893 StgTSO *
1894 activateSpark (rtsSpark spark) 
1895 {
1896   StgTSO *tso;
1897
1898   tso = createSparkThread(spark);
1899   if (RtsFlags.ParFlags.ParStats.Full) {   
1900     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1901     IF_PAR_DEBUG(verbose,
1902                  debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
1903                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1904   }
1905   // ToDo: fwd info on local/global spark to thread -- HWL
1906   // tso->gran.exported =  spark->exported;
1907   // tso->gran.locked =   !spark->global;
1908   // tso->gran.sparkname = spark->name;
1909
1910   return tso;
1911 }
1912 #endif
1913
1914 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1915                                    Capability *initialCapability
1916                                    );
1917
1918
1919 /* ---------------------------------------------------------------------------
1920  * scheduleThread()
1921  *
1922  * scheduleThread puts a thread on the head of the runnable queue.
1923  * This will usually be done immediately after a thread is created.
1924  * The caller of scheduleThread must create the thread using e.g.
1925  * createThread and push an appropriate closure
1926  * on this thread's stack before the scheduler is invoked.
1927  * ------------------------------------------------------------------------ */
1928
1929 static void scheduleThread_ (StgTSO* tso);
1930
1931 void
1932 scheduleThread_(StgTSO *tso)
1933 {
1934   // Precondition: sched_mutex must be held.
1935   // The thread goes at the *end* of the run-queue, to avoid possible
1936   // starvation of any threads already on the queue.
1937   APPEND_TO_RUN_QUEUE(tso);
1938   THREAD_RUNNABLE();
1939 }
1940
1941 void
1942 scheduleThread(StgTSO* tso)
1943 {
1944   ACQUIRE_LOCK(&sched_mutex);
1945   scheduleThread_(tso);
1946   RELEASE_LOCK(&sched_mutex);
1947 }
1948
1949 #if defined(RTS_SUPPORTS_THREADS)
1950 static Condition bound_cond_cache;
1951 static int bound_cond_cache_full = 0;
1952 #endif
1953
1954
1955 SchedulerStatus
1956 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1957                    Capability *initialCapability)
1958 {
1959     // Precondition: sched_mutex must be held
1960     StgMainThread *m;
1961
1962     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1963     m->tso = tso;
1964     tso->main = m;
1965     m->ret = ret;
1966     m->stat = NoStatus;
1967     m->link = main_threads;
1968     m->prev = NULL;
1969     if (main_threads != NULL) {
1970         main_threads->prev = m;
1971     }
1972     main_threads = m;
1973
1974 #if defined(RTS_SUPPORTS_THREADS)
1975     // Allocating a new condition for each thread is expensive, so we
1976     // cache one.  This is a pretty feeble hack, but it helps speed up
1977     // consecutive call-ins quite a bit.
1978     if (bound_cond_cache_full) {
1979         m->bound_thread_cond = bound_cond_cache;
1980         bound_cond_cache_full = 0;
1981     } else {
1982         initCondition(&m->bound_thread_cond);
1983     }
1984 #endif
1985
1986     /* Put the thread on the main-threads list prior to scheduling the TSO.
1987        Failure to do so introduces a race condition in the MT case (as
1988        identified by Wolfgang Thaller), whereby the new task/OS thread 
1989        created by scheduleThread_() would complete prior to the thread
1990        that spawned it managed to put 'itself' on the main-threads list.
1991        The upshot of it all being that the worker thread wouldn't get to
1992        signal the completion of the its work item for the main thread to
1993        see (==> it got stuck waiting.)    -- sof 6/02.
1994     */
1995     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
1996     
1997     APPEND_TO_RUN_QUEUE(tso);
1998     // NB. Don't call THREAD_RUNNABLE() here, because the thread is
1999     // bound and only runnable by *this* OS thread, so waking up other
2000     // workers will just slow things down.
2001
2002     return waitThread_(m, initialCapability);
2003 }
2004
2005 /* ---------------------------------------------------------------------------
2006  * initScheduler()
2007  *
2008  * Initialise the scheduler.  This resets all the queues - if the
2009  * queues contained any threads, they'll be garbage collected at the
2010  * next pass.
2011  *
2012  * ------------------------------------------------------------------------ */
2013
2014 void 
2015 initScheduler(void)
2016 {
2017 #if defined(GRAN)
2018   nat i;
2019
2020   for (i=0; i<=MAX_PROC; i++) {
2021     run_queue_hds[i]      = END_TSO_QUEUE;
2022     run_queue_tls[i]      = END_TSO_QUEUE;
2023     blocked_queue_hds[i]  = END_TSO_QUEUE;
2024     blocked_queue_tls[i]  = END_TSO_QUEUE;
2025     ccalling_threadss[i]  = END_TSO_QUEUE;
2026     sleeping_queue        = END_TSO_QUEUE;
2027   }
2028 #else
2029   run_queue_hd      = END_TSO_QUEUE;
2030   run_queue_tl      = END_TSO_QUEUE;
2031   blocked_queue_hd  = END_TSO_QUEUE;
2032   blocked_queue_tl  = END_TSO_QUEUE;
2033   sleeping_queue    = END_TSO_QUEUE;
2034 #endif 
2035
2036   suspended_ccalling_threads  = END_TSO_QUEUE;
2037
2038   main_threads = NULL;
2039   all_threads  = END_TSO_QUEUE;
2040
2041   context_switch = 0;
2042   interrupted    = 0;
2043
2044   RtsFlags.ConcFlags.ctxtSwitchTicks =
2045       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2046       
2047 #if defined(RTS_SUPPORTS_THREADS)
2048   /* Initialise the mutex and condition variables used by
2049    * the scheduler. */
2050   initMutex(&sched_mutex);
2051   initMutex(&term_mutex);
2052 #endif
2053   
2054   ACQUIRE_LOCK(&sched_mutex);
2055
2056   /* A capability holds the state a native thread needs in
2057    * order to execute STG code. At least one capability is
2058    * floating around (only SMP builds have more than one).
2059    */
2060   initCapabilities();
2061   
2062 #if defined(RTS_SUPPORTS_THREADS)
2063     /* start our haskell execution tasks */
2064     startTaskManager(0,taskStart);
2065 #endif
2066
2067 #if /* defined(SMP) ||*/ defined(PAR)
2068   initSparkPools();
2069 #endif
2070
2071   RELEASE_LOCK(&sched_mutex);
2072 }
2073
2074 void
2075 exitScheduler( void )
2076 {
2077 #if defined(RTS_SUPPORTS_THREADS)
2078   stopTaskManager();
2079 #endif
2080   shutting_down_scheduler = rtsTrue;
2081 }
2082
2083 /* ----------------------------------------------------------------------------
2084    Managing the per-task allocation areas.
2085    
2086    Each capability comes with an allocation area.  These are
2087    fixed-length block lists into which allocation can be done.
2088
2089    ToDo: no support for two-space collection at the moment???
2090    ------------------------------------------------------------------------- */
2091
2092 static
2093 SchedulerStatus
2094 waitThread_(StgMainThread* m, Capability *initialCapability)
2095 {
2096   SchedulerStatus stat;
2097
2098   // Precondition: sched_mutex must be held.
2099   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2100
2101 #if defined(GRAN)
2102   /* GranSim specific init */
2103   CurrentTSO = m->tso;                // the TSO to run
2104   procStatus[MainProc] = Busy;        // status of main PE
2105   CurrentProc = MainProc;             // PE to run it on
2106   schedule(m,initialCapability);
2107 #else
2108   schedule(m,initialCapability);
2109   ASSERT(m->stat != NoStatus);
2110 #endif
2111
2112   stat = m->stat;
2113
2114 #if defined(RTS_SUPPORTS_THREADS)
2115   // Free the condition variable, returning it to the cache if possible.
2116   if (!bound_cond_cache_full) {
2117       bound_cond_cache = m->bound_thread_cond;
2118       bound_cond_cache_full = 1;
2119   } else {
2120       closeCondition(&m->bound_thread_cond);
2121   }
2122 #endif
2123
2124   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2125   stgFree(m);
2126
2127   // Postcondition: sched_mutex still held
2128   return stat;
2129 }
2130
2131 /* ---------------------------------------------------------------------------
2132    Where are the roots that we know about?
2133
2134         - all the threads on the runnable queue
2135         - all the threads on the blocked queue
2136         - all the threads on the sleeping queue
2137         - all the thread currently executing a _ccall_GC
2138         - all the "main threads"
2139      
2140    ------------------------------------------------------------------------ */
2141
2142 /* This has to be protected either by the scheduler monitor, or by the
2143         garbage collection monitor (probably the latter).
2144         KH @ 25/10/99
2145 */
2146
2147 void
2148 GetRoots( evac_fn evac )
2149 {
2150 #if defined(GRAN)
2151   {
2152     nat i;
2153     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2154       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2155           evac((StgClosure **)&run_queue_hds[i]);
2156       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2157           evac((StgClosure **)&run_queue_tls[i]);
2158       
2159       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2160           evac((StgClosure **)&blocked_queue_hds[i]);
2161       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2162           evac((StgClosure **)&blocked_queue_tls[i]);
2163       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2164           evac((StgClosure **)&ccalling_threads[i]);
2165     }
2166   }
2167
2168   markEventQueue();
2169
2170 #else /* !GRAN */
2171   if (run_queue_hd != END_TSO_QUEUE) {
2172       ASSERT(run_queue_tl != END_TSO_QUEUE);
2173       evac((StgClosure **)&run_queue_hd);
2174       evac((StgClosure **)&run_queue_tl);
2175   }
2176   
2177   if (blocked_queue_hd != END_TSO_QUEUE) {
2178       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2179       evac((StgClosure **)&blocked_queue_hd);
2180       evac((StgClosure **)&blocked_queue_tl);
2181   }
2182   
2183   if (sleeping_queue != END_TSO_QUEUE) {
2184       evac((StgClosure **)&sleeping_queue);
2185   }
2186 #endif 
2187
2188   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2189       evac((StgClosure **)&suspended_ccalling_threads);
2190   }
2191
2192 #if defined(PAR) || defined(GRAN)
2193   markSparkQueue(evac);
2194 #endif
2195
2196 #if defined(RTS_USER_SIGNALS)
2197   // mark the signal handlers (signals should be already blocked)
2198   markSignalHandlers(evac);
2199 #endif
2200 }
2201
2202 /* -----------------------------------------------------------------------------
2203    performGC
2204
2205    This is the interface to the garbage collector from Haskell land.
2206    We provide this so that external C code can allocate and garbage
2207    collect when called from Haskell via _ccall_GC.
2208
2209    It might be useful to provide an interface whereby the programmer
2210    can specify more roots (ToDo).
2211    
2212    This needs to be protected by the GC condition variable above.  KH.
2213    -------------------------------------------------------------------------- */
2214
2215 static void (*extra_roots)(evac_fn);
2216
2217 void
2218 performGC(void)
2219 {
2220   /* Obligated to hold this lock upon entry */
2221   ACQUIRE_LOCK(&sched_mutex);
2222   GarbageCollect(GetRoots,rtsFalse);
2223   RELEASE_LOCK(&sched_mutex);
2224 }
2225
2226 void
2227 performMajorGC(void)
2228 {
2229   ACQUIRE_LOCK(&sched_mutex);
2230   GarbageCollect(GetRoots,rtsTrue);
2231   RELEASE_LOCK(&sched_mutex);
2232 }
2233
2234 static void
2235 AllRoots(evac_fn evac)
2236 {
2237     GetRoots(evac);             // the scheduler's roots
2238     extra_roots(evac);          // the user's roots
2239 }
2240
2241 void
2242 performGCWithRoots(void (*get_roots)(evac_fn))
2243 {
2244   ACQUIRE_LOCK(&sched_mutex);
2245   extra_roots = get_roots;
2246   GarbageCollect(AllRoots,rtsFalse);
2247   RELEASE_LOCK(&sched_mutex);
2248 }
2249
2250 /* -----------------------------------------------------------------------------
2251    Stack overflow
2252
2253    If the thread has reached its maximum stack size, then raise the
2254    StackOverflow exception in the offending thread.  Otherwise
2255    relocate the TSO into a larger chunk of memory and adjust its stack
2256    size appropriately.
2257    -------------------------------------------------------------------------- */
2258
2259 static StgTSO *
2260 threadStackOverflow(StgTSO *tso)
2261 {
2262   nat new_stack_size, new_tso_size, stack_words;
2263   StgPtr new_sp;
2264   StgTSO *dest;
2265
2266   IF_DEBUG(sanity,checkTSO(tso));
2267   if (tso->stack_size >= tso->max_stack_size) {
2268
2269     IF_DEBUG(gc,
2270              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2271                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2272              /* If we're debugging, just print out the top of the stack */
2273              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2274                                               tso->sp+64)));
2275
2276     /* Send this thread the StackOverflow exception */
2277     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2278     return tso;
2279   }
2280
2281   /* Try to double the current stack size.  If that takes us over the
2282    * maximum stack size for this thread, then use the maximum instead.
2283    * Finally round up so the TSO ends up as a whole number of blocks.
2284    */
2285   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2286   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2287                                        TSO_STRUCT_SIZE)/sizeof(W_);
2288   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2289   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2290
2291   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2292
2293   dest = (StgTSO *)allocate(new_tso_size);
2294   TICK_ALLOC_TSO(new_stack_size,0);
2295
2296   /* copy the TSO block and the old stack into the new area */
2297   memcpy(dest,tso,TSO_STRUCT_SIZE);
2298   stack_words = tso->stack + tso->stack_size - tso->sp;
2299   new_sp = (P_)dest + new_tso_size - stack_words;
2300   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2301
2302   /* relocate the stack pointers... */
2303   dest->sp         = new_sp;
2304   dest->stack_size = new_stack_size;
2305         
2306   /* Mark the old TSO as relocated.  We have to check for relocated
2307    * TSOs in the garbage collector and any primops that deal with TSOs.
2308    *
2309    * It's important to set the sp value to just beyond the end
2310    * of the stack, so we don't attempt to scavenge any part of the
2311    * dead TSO's stack.
2312    */
2313   tso->what_next = ThreadRelocated;
2314   tso->link = dest;
2315   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2316   tso->why_blocked = NotBlocked;
2317   dest->mut_link = NULL;
2318
2319   IF_PAR_DEBUG(verbose,
2320                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2321                      tso->id, tso, tso->stack_size);
2322                /* If we're debugging, just print out the top of the stack */
2323                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2324                                                 tso->sp+64)));
2325   
2326   IF_DEBUG(sanity,checkTSO(tso));
2327 #if 0
2328   IF_DEBUG(scheduler,printTSO(dest));
2329 #endif
2330
2331   return dest;
2332 }
2333
2334 /* ---------------------------------------------------------------------------
2335    Wake up a queue that was blocked on some resource.
2336    ------------------------------------------------------------------------ */
2337
2338 #if defined(GRAN)
2339 STATIC_INLINE void
2340 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2341 {
2342 }
2343 #elif defined(PAR)
2344 STATIC_INLINE void
2345 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2346 {
2347   /* write RESUME events to log file and
2348      update blocked and fetch time (depending on type of the orig closure) */
2349   if (RtsFlags.ParFlags.ParStats.Full) {
2350     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2351                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2352                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2353     if (EMPTY_RUN_QUEUE())
2354       emitSchedule = rtsTrue;
2355
2356     switch (get_itbl(node)->type) {
2357         case FETCH_ME_BQ:
2358           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2359           break;
2360         case RBH:
2361         case FETCH_ME:
2362         case BLACKHOLE_BQ:
2363           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2364           break;
2365 #ifdef DIST
2366         case MVAR:
2367           break;
2368 #endif    
2369         default:
2370           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2371         }
2372       }
2373 }
2374 #endif
2375
2376 #if defined(GRAN)
2377 static StgBlockingQueueElement *
2378 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2379 {
2380     StgTSO *tso;
2381     PEs node_loc, tso_loc;
2382
2383     node_loc = where_is(node); // should be lifted out of loop
2384     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2385     tso_loc = where_is((StgClosure *)tso);
2386     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2387       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2388       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2389       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2390       // insertThread(tso, node_loc);
2391       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2392                 ResumeThread,
2393                 tso, node, (rtsSpark*)NULL);
2394       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2395       // len_local++;
2396       // len++;
2397     } else { // TSO is remote (actually should be FMBQ)
2398       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2399                                   RtsFlags.GranFlags.Costs.gunblocktime +
2400                                   RtsFlags.GranFlags.Costs.latency;
2401       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2402                 UnblockThread,
2403                 tso, node, (rtsSpark*)NULL);
2404       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2405       // len++;
2406     }
2407     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2408     IF_GRAN_DEBUG(bq,
2409                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2410                           (node_loc==tso_loc ? "Local" : "Global"), 
2411                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2412     tso->block_info.closure = NULL;
2413     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2414                              tso->id, tso));
2415 }
2416 #elif defined(PAR)
2417 static StgBlockingQueueElement *
2418 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2419 {
2420     StgBlockingQueueElement *next;
2421
2422     switch (get_itbl(bqe)->type) {
2423     case TSO:
2424       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2425       /* if it's a TSO just push it onto the run_queue */
2426       next = bqe->link;
2427       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2428       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2429       THREAD_RUNNABLE();
2430       unblockCount(bqe, node);
2431       /* reset blocking status after dumping event */
2432       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2433       break;
2434
2435     case BLOCKED_FETCH:
2436       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2437       next = bqe->link;
2438       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2439       PendingFetches = (StgBlockedFetch *)bqe;
2440       break;
2441
2442 # if defined(DEBUG)
2443       /* can ignore this case in a non-debugging setup; 
2444          see comments on RBHSave closures above */
2445     case CONSTR:
2446       /* check that the closure is an RBHSave closure */
2447       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2448              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2449              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2450       break;
2451
2452     default:
2453       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2454            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2455            (StgClosure *)bqe);
2456 # endif
2457     }
2458   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2459   return next;
2460 }
2461
2462 #else /* !GRAN && !PAR */
2463 static StgTSO *
2464 unblockOneLocked(StgTSO *tso)
2465 {
2466   StgTSO *next;
2467
2468   ASSERT(get_itbl(tso)->type == TSO);
2469   ASSERT(tso->why_blocked != NotBlocked);
2470   tso->why_blocked = NotBlocked;
2471   next = tso->link;
2472   tso->link = END_TSO_QUEUE;
2473   APPEND_TO_RUN_QUEUE(tso);
2474   THREAD_RUNNABLE();
2475   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2476   return next;
2477 }
2478 #endif
2479
2480 #if defined(GRAN) || defined(PAR)
2481 INLINE_ME StgBlockingQueueElement *
2482 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2483 {
2484   ACQUIRE_LOCK(&sched_mutex);
2485   bqe = unblockOneLocked(bqe, node);
2486   RELEASE_LOCK(&sched_mutex);
2487   return bqe;
2488 }
2489 #else
2490 INLINE_ME StgTSO *
2491 unblockOne(StgTSO *tso)
2492 {
2493   ACQUIRE_LOCK(&sched_mutex);
2494   tso = unblockOneLocked(tso);
2495   RELEASE_LOCK(&sched_mutex);
2496   return tso;
2497 }
2498 #endif
2499
2500 #if defined(GRAN)
2501 void 
2502 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2503 {
2504   StgBlockingQueueElement *bqe;
2505   PEs node_loc;
2506   nat len = 0; 
2507
2508   IF_GRAN_DEBUG(bq, 
2509                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2510                       node, CurrentProc, CurrentTime[CurrentProc], 
2511                       CurrentTSO->id, CurrentTSO));
2512
2513   node_loc = where_is(node);
2514
2515   ASSERT(q == END_BQ_QUEUE ||
2516          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2517          get_itbl(q)->type == CONSTR); // closure (type constructor)
2518   ASSERT(is_unique(node));
2519
2520   /* FAKE FETCH: magically copy the node to the tso's proc;
2521      no Fetch necessary because in reality the node should not have been 
2522      moved to the other PE in the first place
2523   */
2524   if (CurrentProc!=node_loc) {
2525     IF_GRAN_DEBUG(bq, 
2526                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2527                         node, node_loc, CurrentProc, CurrentTSO->id, 
2528                         // CurrentTSO, where_is(CurrentTSO),
2529                         node->header.gran.procs));
2530     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2531     IF_GRAN_DEBUG(bq, 
2532                   debugBelch("## new bitmask of node %p is %#x\n",
2533                         node, node->header.gran.procs));
2534     if (RtsFlags.GranFlags.GranSimStats.Global) {
2535       globalGranStats.tot_fake_fetches++;
2536     }
2537   }
2538
2539   bqe = q;
2540   // ToDo: check: ASSERT(CurrentProc==node_loc);
2541   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2542     //next = bqe->link;
2543     /* 
2544        bqe points to the current element in the queue
2545        next points to the next element in the queue
2546     */
2547     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2548     //tso_loc = where_is(tso);
2549     len++;
2550     bqe = unblockOneLocked(bqe, node);
2551   }
2552
2553   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2554      the closure to make room for the anchor of the BQ */
2555   if (bqe!=END_BQ_QUEUE) {
2556     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2557     /*
2558     ASSERT((info_ptr==&RBH_Save_0_info) ||
2559            (info_ptr==&RBH_Save_1_info) ||
2560            (info_ptr==&RBH_Save_2_info));
2561     */
2562     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2563     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2564     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2565
2566     IF_GRAN_DEBUG(bq,
2567                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2568                         node, info_type(node)));
2569   }
2570
2571   /* statistics gathering */
2572   if (RtsFlags.GranFlags.GranSimStats.Global) {
2573     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2574     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2575     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2576     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2577   }
2578   IF_GRAN_DEBUG(bq,
2579                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2580                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2581 }
2582 #elif defined(PAR)
2583 void 
2584 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2585 {
2586   StgBlockingQueueElement *bqe;
2587
2588   ACQUIRE_LOCK(&sched_mutex);
2589
2590   IF_PAR_DEBUG(verbose, 
2591                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
2592                      node, mytid));
2593 #ifdef DIST  
2594   //RFP
2595   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2596     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
2597     return;
2598   }
2599 #endif
2600   
2601   ASSERT(q == END_BQ_QUEUE ||
2602          get_itbl(q)->type == TSO ||           
2603          get_itbl(q)->type == BLOCKED_FETCH || 
2604          get_itbl(q)->type == CONSTR); 
2605
2606   bqe = q;
2607   while (get_itbl(bqe)->type==TSO || 
2608          get_itbl(bqe)->type==BLOCKED_FETCH) {
2609     bqe = unblockOneLocked(bqe, node);
2610   }
2611   RELEASE_LOCK(&sched_mutex);
2612 }
2613
2614 #else   /* !GRAN && !PAR */
2615
2616 void
2617 awakenBlockedQueueNoLock(StgTSO *tso)
2618 {
2619   while (tso != END_TSO_QUEUE) {
2620     tso = unblockOneLocked(tso);
2621   }
2622 }
2623
2624 void
2625 awakenBlockedQueue(StgTSO *tso)
2626 {
2627   ACQUIRE_LOCK(&sched_mutex);
2628   while (tso != END_TSO_QUEUE) {
2629     tso = unblockOneLocked(tso);
2630   }
2631   RELEASE_LOCK(&sched_mutex);
2632 }
2633 #endif
2634
2635 /* ---------------------------------------------------------------------------
2636    Interrupt execution
2637    - usually called inside a signal handler so it mustn't do anything fancy.   
2638    ------------------------------------------------------------------------ */
2639
2640 void
2641 interruptStgRts(void)
2642 {
2643     interrupted    = 1;
2644     context_switch = 1;
2645 #ifdef RTS_SUPPORTS_THREADS
2646     wakeBlockedWorkerThread();
2647 #endif
2648 }
2649
2650 /* -----------------------------------------------------------------------------
2651    Unblock a thread
2652
2653    This is for use when we raise an exception in another thread, which
2654    may be blocked.
2655    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2656    -------------------------------------------------------------------------- */
2657
2658 #if defined(GRAN) || defined(PAR)
2659 /*
2660   NB: only the type of the blocking queue is different in GranSim and GUM
2661       the operations on the queue-elements are the same
2662       long live polymorphism!
2663
2664   Locks: sched_mutex is held upon entry and exit.
2665
2666 */
2667 static void
2668 unblockThread(StgTSO *tso)
2669 {
2670   StgBlockingQueueElement *t, **last;
2671
2672   switch (tso->why_blocked) {
2673
2674   case NotBlocked:
2675     return;  /* not blocked */
2676
2677   case BlockedOnMVar:
2678     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2679     {
2680       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2681       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2682
2683       last = (StgBlockingQueueElement **)&mvar->head;
2684       for (t = (StgBlockingQueueElement *)mvar->head; 
2685            t != END_BQ_QUEUE; 
2686            last = &t->link, last_tso = t, t = t->link) {
2687         if (t == (StgBlockingQueueElement *)tso) {
2688           *last = (StgBlockingQueueElement *)tso->link;
2689           if (mvar->tail == tso) {
2690             mvar->tail = (StgTSO *)last_tso;
2691           }
2692           goto done;
2693         }
2694       }
2695       barf("unblockThread (MVAR): TSO not found");
2696     }
2697
2698   case BlockedOnBlackHole:
2699     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2700     {
2701       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2702
2703       last = &bq->blocking_queue;
2704       for (t = bq->blocking_queue; 
2705            t != END_BQ_QUEUE; 
2706            last = &t->link, t = t->link) {
2707         if (t == (StgBlockingQueueElement *)tso) {
2708           *last = (StgBlockingQueueElement *)tso->link;
2709           goto done;
2710         }
2711       }
2712       barf("unblockThread (BLACKHOLE): TSO not found");
2713     }
2714
2715   case BlockedOnException:
2716     {
2717       StgTSO *target  = tso->block_info.tso;
2718
2719       ASSERT(get_itbl(target)->type == TSO);
2720
2721       if (target->what_next == ThreadRelocated) {
2722           target = target->link;
2723           ASSERT(get_itbl(target)->type == TSO);
2724       }
2725
2726       ASSERT(target->blocked_exceptions != NULL);
2727
2728       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2729       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2730            t != END_BQ_QUEUE; 
2731            last = &t->link, t = t->link) {
2732         ASSERT(get_itbl(t)->type == TSO);
2733         if (t == (StgBlockingQueueElement *)tso) {
2734           *last = (StgBlockingQueueElement *)tso->link;
2735           goto done;
2736         }
2737       }
2738       barf("unblockThread (Exception): TSO not found");
2739     }
2740
2741   case BlockedOnRead:
2742   case BlockedOnWrite:
2743 #if defined(mingw32_TARGET_OS)
2744   case BlockedOnDoProc:
2745 #endif
2746     {
2747       /* take TSO off blocked_queue */
2748       StgBlockingQueueElement *prev = NULL;
2749       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2750            prev = t, t = t->link) {
2751         if (t == (StgBlockingQueueElement *)tso) {
2752           if (prev == NULL) {
2753             blocked_queue_hd = (StgTSO *)t->link;
2754             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2755               blocked_queue_tl = END_TSO_QUEUE;
2756             }
2757           } else {
2758             prev->link = t->link;
2759             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2760               blocked_queue_tl = (StgTSO *)prev;
2761             }
2762           }
2763           goto done;
2764         }
2765       }
2766       barf("unblockThread (I/O): TSO not found");
2767     }
2768
2769   case BlockedOnDelay:
2770     {
2771       /* take TSO off sleeping_queue */
2772       StgBlockingQueueElement *prev = NULL;
2773       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2774            prev = t, t = t->link) {
2775         if (t == (StgBlockingQueueElement *)tso) {
2776           if (prev == NULL) {
2777             sleeping_queue = (StgTSO *)t->link;
2778           } else {
2779             prev->link = t->link;
2780           }
2781           goto done;
2782         }
2783       }
2784       barf("unblockThread (delay): TSO not found");
2785     }
2786
2787   default:
2788     barf("unblockThread");
2789   }
2790
2791  done:
2792   tso->link = END_TSO_QUEUE;
2793   tso->why_blocked = NotBlocked;
2794   tso->block_info.closure = NULL;
2795   PUSH_ON_RUN_QUEUE(tso);
2796 }
2797 #else
2798 static void
2799 unblockThread(StgTSO *tso)
2800 {
2801   StgTSO *t, **last;
2802   
2803   /* To avoid locking unnecessarily. */
2804   if (tso->why_blocked == NotBlocked) {
2805     return;
2806   }
2807
2808   switch (tso->why_blocked) {
2809
2810   case BlockedOnMVar:
2811     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2812     {
2813       StgTSO *last_tso = END_TSO_QUEUE;
2814       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2815
2816       last = &mvar->head;
2817       for (t = mvar->head; t != END_TSO_QUEUE; 
2818            last = &t->link, last_tso = t, t = t->link) {
2819         if (t == tso) {
2820           *last = tso->link;
2821           if (mvar->tail == tso) {
2822             mvar->tail = last_tso;
2823           }
2824           goto done;
2825         }
2826       }
2827       barf("unblockThread (MVAR): TSO not found");
2828     }
2829
2830   case BlockedOnBlackHole:
2831     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2832     {
2833       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2834
2835       last = &bq->blocking_queue;
2836       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2837            last = &t->link, t = t->link) {
2838         if (t == tso) {
2839           *last = tso->link;
2840           goto done;
2841         }
2842       }
2843       barf("unblockThread (BLACKHOLE): TSO not found");
2844     }
2845
2846   case BlockedOnException:
2847     {
2848       StgTSO *target  = tso->block_info.tso;
2849
2850       ASSERT(get_itbl(target)->type == TSO);
2851
2852       while (target->what_next == ThreadRelocated) {
2853           target = target->link;
2854           ASSERT(get_itbl(target)->type == TSO);
2855       }
2856       
2857       ASSERT(target->blocked_exceptions != NULL);
2858
2859       last = &target->blocked_exceptions;
2860       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2861            last = &t->link, t = t->link) {
2862         ASSERT(get_itbl(t)->type == TSO);
2863         if (t == tso) {
2864           *last = tso->link;
2865           goto done;
2866         }
2867       }
2868       barf("unblockThread (Exception): TSO not found");
2869     }
2870
2871   case BlockedOnRead:
2872   case BlockedOnWrite:
2873 #if defined(mingw32_TARGET_OS)
2874   case BlockedOnDoProc:
2875 #endif
2876     {
2877       StgTSO *prev = NULL;
2878       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2879            prev = t, t = t->link) {
2880         if (t == tso) {
2881           if (prev == NULL) {
2882             blocked_queue_hd = t->link;
2883             if (blocked_queue_tl == t) {
2884               blocked_queue_tl = END_TSO_QUEUE;
2885             }
2886           } else {
2887             prev->link = t->link;
2888             if (blocked_queue_tl == t) {
2889               blocked_queue_tl = prev;
2890             }
2891           }
2892           goto done;
2893         }
2894       }
2895       barf("unblockThread (I/O): TSO not found");
2896     }
2897
2898   case BlockedOnDelay:
2899     {
2900       StgTSO *prev = NULL;
2901       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2902            prev = t, t = t->link) {
2903         if (t == tso) {
2904           if (prev == NULL) {
2905             sleeping_queue = t->link;
2906           } else {
2907             prev->link = t->link;
2908           }
2909           goto done;
2910         }
2911       }
2912       barf("unblockThread (delay): TSO not found");
2913     }
2914
2915   default:
2916     barf("unblockThread");
2917   }
2918
2919  done:
2920   tso->link = END_TSO_QUEUE;
2921   tso->why_blocked = NotBlocked;
2922   tso->block_info.closure = NULL;
2923   APPEND_TO_RUN_QUEUE(tso);
2924 }
2925 #endif
2926
2927 /* -----------------------------------------------------------------------------
2928  * raiseAsync()
2929  *
2930  * The following function implements the magic for raising an
2931  * asynchronous exception in an existing thread.
2932  *
2933  * We first remove the thread from any queue on which it might be
2934  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2935  *
2936  * We strip the stack down to the innermost CATCH_FRAME, building
2937  * thunks in the heap for all the active computations, so they can 
2938  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2939  * an application of the handler to the exception, and push it on
2940  * the top of the stack.
2941  * 
2942  * How exactly do we save all the active computations?  We create an
2943  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2944  * AP_STACKs pushes everything from the corresponding update frame
2945  * upwards onto the stack.  (Actually, it pushes everything up to the
2946  * next update frame plus a pointer to the next AP_STACK object.
2947  * Entering the next AP_STACK object pushes more onto the stack until we
2948  * reach the last AP_STACK object - at which point the stack should look
2949  * exactly as it did when we killed the TSO and we can continue
2950  * execution by entering the closure on top of the stack.
2951  *
2952  * We can also kill a thread entirely - this happens if either (a) the 
2953  * exception passed to raiseAsync is NULL, or (b) there's no
2954  * CATCH_FRAME on the stack.  In either case, we strip the entire
2955  * stack and replace the thread with a zombie.
2956  *
2957  * Locks: sched_mutex held upon entry nor exit.
2958  *
2959  * -------------------------------------------------------------------------- */
2960  
2961 void 
2962 deleteThread(StgTSO *tso)
2963 {
2964   raiseAsync(tso,NULL);
2965 }
2966
2967 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2968 static void 
2969 deleteThreadImmediately(StgTSO *tso)
2970 { // for forkProcess only:
2971   // delete thread without giving it a chance to catch the KillThread exception
2972
2973   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2974       return;
2975   }
2976
2977   if (tso->why_blocked != BlockedOnCCall &&
2978       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2979     unblockThread(tso);
2980   }
2981
2982   tso->what_next = ThreadKilled;
2983 }
2984 #endif
2985
2986 void
2987 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
2988 {
2989   /* When raising async exs from contexts where sched_mutex isn't held;
2990      use raiseAsyncWithLock(). */
2991   ACQUIRE_LOCK(&sched_mutex);
2992   raiseAsync(tso,exception);
2993   RELEASE_LOCK(&sched_mutex);
2994 }
2995
2996 void
2997 raiseAsync(StgTSO *tso, StgClosure *exception)
2998 {
2999     StgRetInfoTable *info;
3000     StgPtr sp;
3001   
3002     // Thread already dead?
3003     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3004         return;
3005     }
3006
3007     IF_DEBUG(scheduler, 
3008              sched_belch("raising exception in thread %ld.", (long)tso->id));
3009     
3010     // Remove it from any blocking queues
3011     unblockThread(tso);
3012
3013     sp = tso->sp;
3014     
3015     // The stack freezing code assumes there's a closure pointer on
3016     // the top of the stack, so we have to arrange that this is the case...
3017     //
3018     if (sp[0] == (W_)&stg_enter_info) {
3019         sp++;
3020     } else {
3021         sp--;
3022         sp[0] = (W_)&stg_dummy_ret_closure;
3023     }
3024
3025     while (1) {
3026         nat i;
3027
3028         // 1. Let the top of the stack be the "current closure"
3029         //
3030         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3031         // CATCH_FRAME.
3032         //
3033         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3034         // current closure applied to the chunk of stack up to (but not
3035         // including) the update frame.  This closure becomes the "current
3036         // closure".  Go back to step 2.
3037         //
3038         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3039         // top of the stack applied to the exception.
3040         // 
3041         // 5. If it's a STOP_FRAME, then kill the thread.
3042         
3043         StgPtr frame;
3044         
3045         frame = sp + 1;
3046         info = get_ret_itbl((StgClosure *)frame);
3047         
3048         while (info->i.type != UPDATE_FRAME
3049                && (info->i.type != CATCH_FRAME || exception == NULL)
3050                && info->i.type != STOP_FRAME) {
3051             frame += stack_frame_sizeW((StgClosure *)frame);
3052             info = get_ret_itbl((StgClosure *)frame);
3053         }
3054         
3055         switch (info->i.type) {
3056             
3057         case CATCH_FRAME:
3058             // If we find a CATCH_FRAME, and we've got an exception to raise,
3059             // then build the THUNK raise(exception), and leave it on
3060             // top of the CATCH_FRAME ready to enter.
3061             //
3062         {
3063 #ifdef PROFILING
3064             StgCatchFrame *cf = (StgCatchFrame *)frame;
3065 #endif
3066             StgClosure *raise;
3067             
3068             // we've got an exception to raise, so let's pass it to the
3069             // handler in this frame.
3070             //
3071             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3072             TICK_ALLOC_SE_THK(1,0);
3073             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3074             raise->payload[0] = exception;
3075             
3076             // throw away the stack from Sp up to the CATCH_FRAME.
3077             //
3078             sp = frame - 1;
3079             
3080             /* Ensure that async excpetions are blocked now, so we don't get
3081              * a surprise exception before we get around to executing the
3082              * handler.
3083              */
3084             if (tso->blocked_exceptions == NULL) {
3085                 tso->blocked_exceptions = END_TSO_QUEUE;
3086             }
3087             
3088             /* Put the newly-built THUNK on top of the stack, ready to execute
3089              * when the thread restarts.
3090              */
3091             sp[0] = (W_)raise;
3092             sp[-1] = (W_)&stg_enter_info;
3093             tso->sp = sp-1;
3094             tso->what_next = ThreadRunGHC;
3095             IF_DEBUG(sanity, checkTSO(tso));
3096             return;
3097         }
3098         
3099         case UPDATE_FRAME:
3100         {
3101             StgAP_STACK * ap;
3102             nat words;
3103             
3104             // First build an AP_STACK consisting of the stack chunk above the
3105             // current update frame, with the top word on the stack as the
3106             // fun field.
3107             //
3108             words = frame - sp - 1;
3109             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3110             
3111             ap->size = words;
3112             ap->fun  = (StgClosure *)sp[0];
3113             sp++;
3114             for(i=0; i < (nat)words; ++i) {
3115                 ap->payload[i] = (StgClosure *)*sp++;
3116             }
3117             
3118             SET_HDR(ap,&stg_AP_STACK_info,
3119                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3120             TICK_ALLOC_UP_THK(words+1,0);
3121             
3122             IF_DEBUG(scheduler,
3123                      debugBelch("sched: Updating ");
3124                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3125                      debugBelch(" with ");
3126                      printObj((StgClosure *)ap);
3127                 );
3128
3129             // Replace the updatee with an indirection - happily
3130             // this will also wake up any threads currently
3131             // waiting on the result.
3132             //
3133             // Warning: if we're in a loop, more than one update frame on
3134             // the stack may point to the same object.  Be careful not to
3135             // overwrite an IND_OLDGEN in this case, because we'll screw
3136             // up the mutable lists.  To be on the safe side, don't
3137             // overwrite any kind of indirection at all.  See also
3138             // threadSqueezeStack in GC.c, where we have to make a similar
3139             // check.
3140             //
3141             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3142                 // revert the black hole
3143                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3144                                (StgClosure *)ap);
3145             }
3146             sp += sizeofW(StgUpdateFrame) - 1;
3147             sp[0] = (W_)ap; // push onto stack
3148             break;
3149         }
3150         
3151         case STOP_FRAME:
3152             // We've stripped the entire stack, the thread is now dead.
3153             sp += sizeofW(StgStopFrame);
3154             tso->what_next = ThreadKilled;
3155             tso->sp = sp;
3156             return;
3157             
3158         default:
3159             barf("raiseAsync");
3160         }
3161     }
3162     barf("raiseAsync");
3163 }
3164
3165 /* -----------------------------------------------------------------------------
3166    raiseExceptionHelper
3167    
3168    This function is called by the raise# primitve, just so that we can
3169    move some of the tricky bits of raising an exception from C-- into
3170    C.  Who knows, it might be a useful re-useable thing here too.
3171    -------------------------------------------------------------------------- */
3172
3173 StgWord
3174 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3175 {
3176     StgClosure *raise_closure = NULL;
3177     StgPtr p, next;
3178     StgRetInfoTable *info;
3179     //
3180     // This closure represents the expression 'raise# E' where E
3181     // is the exception raise.  It is used to overwrite all the
3182     // thunks which are currently under evaluataion.
3183     //
3184
3185     //    
3186     // LDV profiling: stg_raise_info has THUNK as its closure
3187     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3188     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3189     // 1 does not cause any problem unless profiling is performed.
3190     // However, when LDV profiling goes on, we need to linearly scan
3191     // small object pool, where raise_closure is stored, so we should
3192     // use MIN_UPD_SIZE.
3193     //
3194     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3195     //                                 sizeofW(StgClosure)+1);
3196     //
3197
3198     //
3199     // Walk up the stack, looking for the catch frame.  On the way,
3200     // we update any closures pointed to from update frames with the
3201     // raise closure that we just built.
3202     //
3203     p = tso->sp;
3204     while(1) {
3205         info = get_ret_itbl((StgClosure *)p);
3206         next = p + stack_frame_sizeW((StgClosure *)p);
3207         switch (info->i.type) {
3208             
3209         case UPDATE_FRAME:
3210             // Only create raise_closure if we need to.
3211             if (raise_closure == NULL) {
3212                 raise_closure = 
3213                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3214                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3215                 raise_closure->payload[0] = exception;
3216             }
3217             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3218             p = next;
3219             continue;
3220             
3221         case CATCH_FRAME:
3222             tso->sp = p;
3223             return CATCH_FRAME;
3224             
3225         case STOP_FRAME:
3226             tso->sp = p;
3227             return STOP_FRAME;
3228
3229         default:
3230             p = next; 
3231             continue;
3232         }
3233     }
3234 }
3235
3236 /* -----------------------------------------------------------------------------
3237    resurrectThreads is called after garbage collection on the list of
3238    threads found to be garbage.  Each of these threads will be woken
3239    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3240    on an MVar, or NonTermination if the thread was blocked on a Black
3241    Hole.
3242
3243    Locks: sched_mutex isn't held upon entry nor exit.
3244    -------------------------------------------------------------------------- */
3245
3246 void
3247 resurrectThreads( StgTSO *threads )
3248 {
3249   StgTSO *tso, *next;
3250
3251   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3252     next = tso->global_link;
3253     tso->global_link = all_threads;
3254     all_threads = tso;
3255     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3256
3257     switch (tso->why_blocked) {
3258     case BlockedOnMVar:
3259     case BlockedOnException:
3260       /* Called by GC - sched_mutex lock is currently held. */
3261       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3262       break;
3263     case BlockedOnBlackHole:
3264       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3265       break;
3266     case NotBlocked:
3267       /* This might happen if the thread was blocked on a black hole
3268        * belonging to a thread that we've just woken up (raiseAsync
3269        * can wake up threads, remember...).
3270        */
3271       continue;
3272     default:
3273       barf("resurrectThreads: thread blocked in a strange way");
3274     }
3275   }
3276 }
3277
3278 /* -----------------------------------------------------------------------------
3279  * Blackhole detection: if we reach a deadlock, test whether any
3280  * threads are blocked on themselves.  Any threads which are found to
3281  * be self-blocked get sent a NonTermination exception.
3282  *
3283  * This is only done in a deadlock situation in order to avoid
3284  * performance overhead in the normal case.
3285  *
3286  * Locks: sched_mutex is held upon entry and exit.
3287  * -------------------------------------------------------------------------- */
3288
3289 static void
3290 detectBlackHoles( void )
3291 {
3292     StgTSO *tso = all_threads;
3293     StgPtr frame;
3294     StgClosure *blocked_on;
3295     StgRetInfoTable *info;
3296
3297     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3298
3299         while (tso->what_next == ThreadRelocated) {
3300             tso = tso->link;
3301             ASSERT(get_itbl(tso)->type == TSO);
3302         }
3303       
3304         if (tso->why_blocked != BlockedOnBlackHole) {
3305             continue;
3306         }
3307         blocked_on = tso->block_info.closure;
3308
3309         frame = tso->sp;
3310
3311         while(1) {
3312             info = get_ret_itbl((StgClosure *)frame);
3313             switch (info->i.type) {
3314             case UPDATE_FRAME:
3315                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3316                     /* We are blocking on one of our own computations, so
3317                      * send this thread the NonTermination exception.  
3318                      */
3319                     IF_DEBUG(scheduler, 
3320                              sched_belch("thread %d is blocked on itself", tso->id));
3321                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3322                     goto done;
3323                 }
3324                 
3325                 frame = (StgPtr)((StgUpdateFrame *)frame + 1);
3326                 continue;
3327
3328             case STOP_FRAME:
3329                 goto done;
3330
3331                 // normal stack frames; do nothing except advance the pointer
3332             default:
3333                 frame += stack_frame_sizeW((StgClosure *)frame);
3334             }
3335         }   
3336         done: ;
3337     }
3338 }
3339
3340 /* ----------------------------------------------------------------------------
3341  * Debugging: why is a thread blocked
3342  * [Also provides useful information when debugging threaded programs
3343  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3344    ------------------------------------------------------------------------- */
3345
3346 static
3347 void
3348 printThreadBlockage(StgTSO *tso)
3349 {
3350   switch (tso->why_blocked) {
3351   case BlockedOnRead:
3352     debugBelch("is blocked on read from fd %d", tso->block_info.fd);
3353     break;
3354   case BlockedOnWrite:
3355     debugBelch("is blocked on write to fd %d", tso->block_info.fd);
3356     break;
3357 #if defined(mingw32_TARGET_OS)
3358     case BlockedOnDoProc:
3359     debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3360     break;
3361 #endif
3362   case BlockedOnDelay:
3363     debugBelch("is blocked until %d", tso->block_info.target);
3364     break;
3365   case BlockedOnMVar:
3366     debugBelch("is blocked on an MVar");
3367     break;
3368   case BlockedOnException:
3369     debugBelch("is blocked on delivering an exception to thread %d",
3370             tso->block_info.tso->id);
3371     break;
3372   case BlockedOnBlackHole:
3373     debugBelch("is blocked on a black hole");
3374     break;
3375   case NotBlocked:
3376     debugBelch("is not blocked");
3377     break;
3378 #if defined(PAR)
3379   case BlockedOnGA:
3380     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3381             tso->block_info.closure, info_type(tso->block_info.closure));
3382     break;
3383   case BlockedOnGA_NoSend:
3384     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3385             tso->block_info.closure, info_type(tso->block_info.closure));
3386     break;
3387 #endif
3388   case BlockedOnCCall:
3389     debugBelch("is blocked on an external call");
3390     break;
3391   case BlockedOnCCall_NoUnblockExc:
3392     debugBelch("is blocked on an external call (exceptions were already blocked)");
3393     break;
3394   default:
3395     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3396          tso->why_blocked, tso->id, tso);
3397   }
3398 }
3399
3400 static
3401 void
3402 printThreadStatus(StgTSO *tso)
3403 {
3404   switch (tso->what_next) {
3405   case ThreadKilled:
3406     debugBelch("has been killed");
3407     break;
3408   case ThreadComplete:
3409     debugBelch("has completed");
3410     break;
3411   default:
3412     printThreadBlockage(tso);
3413   }
3414 }
3415
3416 void
3417 printAllThreads(void)
3418 {
3419   StgTSO *t;
3420   void *label;
3421
3422 # if defined(GRAN)
3423   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3424   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3425                        time_string, rtsFalse/*no commas!*/);
3426
3427   debugBelch("all threads at [%s]:\n", time_string);
3428 # elif defined(PAR)
3429   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3430   ullong_format_string(CURRENT_TIME,
3431                        time_string, rtsFalse/*no commas!*/);
3432
3433   debugBelch("all threads at [%s]:\n", time_string);
3434 # else
3435   debugBelch("all threads:\n");
3436 # endif
3437
3438   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3439     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
3440     label = lookupThreadLabel(t->id);
3441     if (label) debugBelch("[\"%s\"] ",(char *)label);
3442     printThreadStatus(t);
3443     debugBelch("\n");
3444   }
3445 }
3446     
3447 #ifdef DEBUG
3448
3449 /* 
3450    Print a whole blocking queue attached to node (debugging only).
3451 */
3452 # if defined(PAR)
3453 void 
3454 print_bq (StgClosure *node)
3455 {
3456   StgBlockingQueueElement *bqe;
3457   StgTSO *tso;
3458   rtsBool end;
3459
3460   debugBelch("## BQ of closure %p (%s): ",
3461           node, info_type(node));
3462
3463   /* should cover all closures that may have a blocking queue */
3464   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3465          get_itbl(node)->type == FETCH_ME_BQ ||
3466          get_itbl(node)->type == RBH ||
3467          get_itbl(node)->type == MVAR);
3468     
3469   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3470
3471   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3472 }
3473
3474 /* 
3475    Print a whole blocking queue starting with the element bqe.
3476 */
3477 void 
3478 print_bqe (StgBlockingQueueElement *bqe)
3479 {
3480   rtsBool end;
3481
3482   /* 
3483      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3484   */
3485   for (end = (bqe==END_BQ_QUEUE);
3486        !end; // iterate until bqe points to a CONSTR
3487        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3488        bqe = end ? END_BQ_QUEUE : bqe->link) {
3489     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3490     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3491     /* types of closures that may appear in a blocking queue */
3492     ASSERT(get_itbl(bqe)->type == TSO ||           
3493            get_itbl(bqe)->type == BLOCKED_FETCH || 
3494            get_itbl(bqe)->type == CONSTR); 
3495     /* only BQs of an RBH end with an RBH_Save closure */
3496     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3497
3498     switch (get_itbl(bqe)->type) {
3499     case TSO:
3500       debugBelch(" TSO %u (%x),",
3501               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3502       break;
3503     case BLOCKED_FETCH:
3504       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
3505               ((StgBlockedFetch *)bqe)->node, 
3506               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3507               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3508               ((StgBlockedFetch *)bqe)->ga.weight);
3509       break;
3510     case CONSTR:
3511       debugBelch(" %s (IP %p),",
3512               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3513                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3514                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3515                "RBH_Save_?"), get_itbl(bqe));
3516       break;
3517     default:
3518       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3519            info_type((StgClosure *)bqe)); // , node, info_type(node));
3520       break;
3521     }
3522   } /* for */
3523   debugBelch("\n");
3524 }
3525 # elif defined(GRAN)
3526 void 
3527 print_bq (StgClosure *node)
3528 {
3529   StgBlockingQueueElement *bqe;
3530   PEs node_loc, tso_loc;
3531   rtsBool end;
3532
3533   /* should cover all closures that may have a blocking queue */
3534   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3535          get_itbl(node)->type == FETCH_ME_BQ ||
3536          get_itbl(node)->type == RBH);
3537     
3538   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3539   node_loc = where_is(node);
3540
3541   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3542           node, info_type(node), node_loc);
3543
3544   /* 
3545      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3546   */
3547   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3548        !end; // iterate until bqe points to a CONSTR
3549        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3550     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3551     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3552     /* types of closures that may appear in a blocking queue */
3553     ASSERT(get_itbl(bqe)->type == TSO ||           
3554            get_itbl(bqe)->type == CONSTR); 
3555     /* only BQs of an RBH end with an RBH_Save closure */
3556     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3557
3558     tso_loc = where_is((StgClosure *)bqe);
3559     switch (get_itbl(bqe)->type) {
3560     case TSO:
3561       debugBelch(" TSO %d (%p) on [PE %d],",
3562               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3563       break;
3564     case CONSTR:
3565       debugBelch(" %s (IP %p),",
3566               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3567                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3568                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3569                "RBH_Save_?"), get_itbl(bqe));
3570       break;
3571     default:
3572       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3573            info_type((StgClosure *)bqe), node, info_type(node));
3574       break;
3575     }
3576   } /* for */
3577   debugBelch("\n");
3578 }
3579 #else
3580 /* 
3581    Nice and easy: only TSOs on the blocking queue
3582 */
3583 void 
3584 print_bq (StgClosure *node)
3585 {
3586   StgTSO *tso;
3587
3588   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3589   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3590        tso != END_TSO_QUEUE; 
3591        tso=tso->link) {
3592     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3593     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3594     debugBelch(" TSO %d (%p),", tso->id, tso);
3595   }
3596   debugBelch("\n");
3597 }
3598 # endif
3599
3600 #if defined(PAR)
3601 static nat
3602 run_queue_len(void)
3603 {
3604   nat i;
3605   StgTSO *tso;
3606
3607   for (i=0, tso=run_queue_hd; 
3608        tso != END_TSO_QUEUE;
3609        i++, tso=tso->link)
3610     /* nothing */
3611
3612   return i;
3613 }
3614 #endif
3615
3616 void
3617 sched_belch(char *s, ...)
3618 {
3619   va_list ap;
3620   va_start(ap,s);
3621 #ifdef RTS_SUPPORTS_THREADS
3622   debugBelch("sched (task %p): ", osThreadId());
3623 #elif defined(PAR)
3624   debugBelch("== ");
3625 #else
3626   debugBelch("sched: ");
3627 #endif
3628   vdebugBelch(s, ap);
3629   debugBelch("\n");
3630   va_end(ap);
3631 }
3632
3633 #endif /* DEBUG */