[project @ 2005-01-12 12:36:28 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "Hooks.h"
48 #define COMPILING_SCHEDULER
49 #include "Schedule.h"
50 #include "StgMiscClosures.h"
51 #include "Storage.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PAR)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include "OSThreads.h"
80 #include  "Task.h"
81
82 #ifdef HAVE_SYS_TYPES_H
83 #include <sys/types.h>
84 #endif
85 #ifdef HAVE_UNISTD_H
86 #include <unistd.h>
87 #endif
88
89 #include <string.h>
90 #include <stdlib.h>
91 #include <stdarg.h>
92
93 #ifdef HAVE_ERRNO_H
94 #include <errno.h>
95 #endif
96
97 #ifdef THREADED_RTS
98 #define USED_IN_THREADED_RTS
99 #else
100 #define USED_IN_THREADED_RTS STG_UNUSED
101 #endif
102
103 #ifdef RTS_SUPPORTS_THREADS
104 #define USED_WHEN_RTS_SUPPORTS_THREADS
105 #else
106 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
107 #endif
108
109 /* Main thread queue.
110  * Locks required: sched_mutex.
111  */
112 StgMainThread *main_threads = NULL;
113
114 /* Thread queues.
115  * Locks required: sched_mutex.
116  */
117 #if defined(GRAN)
118
119 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
120 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
121
122 /* 
123    In GranSim we have a runnable and a blocked queue for each processor.
124    In order to minimise code changes new arrays run_queue_hds/tls
125    are created. run_queue_hd is then a short cut (macro) for
126    run_queue_hds[CurrentProc] (see GranSim.h).
127    -- HWL
128 */
129 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
130 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
131 StgTSO *ccalling_threadss[MAX_PROC];
132 /* We use the same global list of threads (all_threads) in GranSim as in
133    the std RTS (i.e. we are cheating). However, we don't use this list in
134    the GranSim specific code at the moment (so we are only potentially
135    cheating).  */
136
137 #else /* !GRAN */
138
139 StgTSO *run_queue_hd = NULL;
140 StgTSO *run_queue_tl = NULL;
141 StgTSO *blocked_queue_hd = NULL;
142 StgTSO *blocked_queue_tl = NULL;
143 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
144
145 #endif
146
147 /* Linked list of all threads.
148  * Used for detecting garbage collected threads.
149  */
150 StgTSO *all_threads = NULL;
151
152 /* When a thread performs a safe C call (_ccall_GC, using old
153  * terminology), it gets put on the suspended_ccalling_threads
154  * list. Used by the garbage collector.
155  */
156 static StgTSO *suspended_ccalling_threads;
157
158 static StgTSO *threadStackOverflow(StgTSO *tso);
159
160 /* KH: The following two flags are shared memory locations.  There is no need
161        to lock them, since they are only unset at the end of a scheduler
162        operation.
163 */
164
165 /* flag set by signal handler to precipitate a context switch */
166 int context_switch = 0;
167
168 /* if this flag is set as well, give up execution */
169 rtsBool interrupted = rtsFalse;
170
171 /* Next thread ID to allocate.
172  * Locks required: thread_id_mutex
173  */
174 static StgThreadID next_thread_id = 1;
175
176 /*
177  * Pointers to the state of the current thread.
178  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
179  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
180  */
181  
182 /* The smallest stack size that makes any sense is:
183  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
184  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
185  *  + 1                       (the closure to enter)
186  *  + 1                       (stg_ap_v_ret)
187  *  + 1                       (spare slot req'd by stg_ap_v_ret)
188  *
189  * A thread with this stack will bomb immediately with a stack
190  * overflow, which will increase its stack size.  
191  */
192
193 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
194
195
196 #if defined(GRAN)
197 StgTSO *CurrentTSO;
198 #endif
199
200 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
201  *  exists - earlier gccs apparently didn't.
202  *  -= chak
203  */
204 StgTSO dummy_tso;
205
206 static rtsBool ready_to_gc;
207
208 /*
209  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
210  * in an MT setting, needed to signal that a worker thread shouldn't hang around
211  * in the scheduler when it is out of work.
212  */
213 static rtsBool shutting_down_scheduler = rtsFalse;
214
215 void            addToBlockedQueue ( StgTSO *tso );
216
217 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
218        void     interruptStgRts   ( void );
219
220 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
221 static void     detectBlackHoles  ( void );
222 #endif
223
224 static void     raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);
225
226 #if defined(RTS_SUPPORTS_THREADS)
227 /* ToDo: carefully document the invariants that go together
228  *       with these synchronisation objects.
229  */
230 Mutex     sched_mutex       = INIT_MUTEX_VAR;
231 Mutex     term_mutex        = INIT_MUTEX_VAR;
232
233 #endif /* RTS_SUPPORTS_THREADS */
234
235 #if defined(PAR)
236 StgTSO *LastTSO;
237 rtsTime TimeOfLastYield;
238 rtsBool emitSchedule = rtsTrue;
239 #endif
240
241 #if DEBUG
242 static char *whatNext_strs[] = {
243   "(unknown)",
244   "ThreadRunGHC",
245   "ThreadInterpret",
246   "ThreadKilled",
247   "ThreadRelocated",
248   "ThreadComplete"
249 };
250 #endif
251
252 #if defined(PAR)
253 StgTSO * createSparkThread(rtsSpark spark);
254 StgTSO * activateSpark (rtsSpark spark);  
255 #endif
256
257 /* ----------------------------------------------------------------------------
258  * Starting Tasks
259  * ------------------------------------------------------------------------- */
260
261 #if defined(RTS_SUPPORTS_THREADS)
262 static rtsBool startingWorkerThread = rtsFalse;
263
264 static void taskStart(void);
265 static void
266 taskStart(void)
267 {
268   ACQUIRE_LOCK(&sched_mutex);
269   startingWorkerThread = rtsFalse;
270   schedule(NULL,NULL);
271   RELEASE_LOCK(&sched_mutex);
272 }
273
274 void
275 startSchedulerTaskIfNecessary(void)
276 {
277   if(run_queue_hd != END_TSO_QUEUE
278     || blocked_queue_hd != END_TSO_QUEUE
279     || sleeping_queue != END_TSO_QUEUE)
280   {
281     if(!startingWorkerThread)
282     { // we don't want to start another worker thread
283       // just because the last one hasn't yet reached the
284       // "waiting for capability" state
285       startingWorkerThread = rtsTrue;
286       if(!startTask(taskStart))
287       {
288         startingWorkerThread = rtsFalse;
289       }
290     }
291   }
292 }
293 #endif
294
295 /* ---------------------------------------------------------------------------
296    Main scheduling loop.
297
298    We use round-robin scheduling, each thread returning to the
299    scheduler loop when one of these conditions is detected:
300
301       * out of heap space
302       * timer expires (thread yields)
303       * thread blocks
304       * thread ends
305       * stack overflow
306
307    Locking notes:  we acquire the scheduler lock once at the beginning
308    of the scheduler loop, and release it when
309     
310       * running a thread, or
311       * waiting for work, or
312       * waiting for a GC to complete.
313
314    GRAN version:
315      In a GranSim setup this loop iterates over the global event queue.
316      This revolves around the global event queue, which determines what 
317      to do next. Therefore, it's more complicated than either the 
318      concurrent or the parallel (GUM) setup.
319
320    GUM version:
321      GUM iterates over incoming messages.
322      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
323      and sends out a fish whenever it has nothing to do; in-between
324      doing the actual reductions (shared code below) it processes the
325      incoming messages and deals with delayed operations 
326      (see PendingFetches).
327      This is not the ugliest code you could imagine, but it's bloody close.
328
329    ------------------------------------------------------------------------ */
330 static void
331 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
332           Capability *initialCapability )
333 {
334   StgTSO *t;
335   Capability *cap;
336   StgThreadReturnCode ret;
337 #if defined(GRAN)
338   rtsEvent *event;
339 #elif defined(PAR)
340   StgSparkPool *pool;
341   rtsSpark spark;
342   StgTSO *tso;
343   GlobalTaskId pe;
344   rtsBool receivedFinish = rtsFalse;
345 # if defined(DEBUG)
346   nat tp_size, sp_size; // stats only
347 # endif
348 #endif
349   rtsBool was_interrupted = rtsFalse;
350   nat prev_what_next;
351   
352   // Pre-condition: sched_mutex is held.
353   // We might have a capability, passed in as initialCapability.
354   cap = initialCapability;
355
356 #if defined(RTS_SUPPORTS_THREADS)
357   //
358   // in the threaded case, the capability is either passed in via the
359   // initialCapability parameter, or initialized inside the scheduler
360   // loop 
361   //
362   IF_DEBUG(scheduler,
363            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
364                        mainThread, initialCapability);
365       );
366 #else
367   // simply initialise it in the non-threaded case
368   grabCapability(&cap);
369 #endif
370
371 #if defined(GRAN)
372   /* set up first event to get things going */
373   /* ToDo: assign costs for system setup and init MainTSO ! */
374   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
375             ContinueThread, 
376             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
377
378   IF_DEBUG(gran,
379            debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
380            G_TSO(CurrentTSO, 5));
381
382   if (RtsFlags.GranFlags.Light) {
383     /* Save current time; GranSim Light only */
384     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
385   }      
386
387   event = get_next_event();
388
389   while (event!=(rtsEvent*)NULL) {
390     /* Choose the processor with the next event */
391     CurrentProc = event->proc;
392     CurrentTSO = event->tso;
393
394 #elif defined(PAR)
395
396   while (!receivedFinish) {    /* set by processMessages */
397                                /* when receiving PP_FINISH message         */ 
398
399 #else // everything except GRAN and PAR
400
401   while (1) {
402
403 #endif
404
405      IF_DEBUG(scheduler, printAllThreads());
406
407 #if defined(RTS_SUPPORTS_THREADS)
408       // Yield the capability to higher-priority tasks if necessary.
409       //
410       if (cap != NULL) {
411           yieldCapability(&cap);
412       }
413
414       // If we do not currently hold a capability, we wait for one
415       //
416       if (cap == NULL) {
417           waitForCapability(&sched_mutex, &cap,
418                             mainThread ? &mainThread->bound_thread_cond : NULL);
419       }
420
421       // We now have a capability...
422 #endif
423
424     //
425     // If we're interrupted (the user pressed ^C, or some other
426     // termination condition occurred), kill all the currently running
427     // threads.
428     //
429     if (interrupted) {
430         IF_DEBUG(scheduler, sched_belch("interrupted"));
431         interrupted = rtsFalse;
432         was_interrupted = rtsTrue;
433 #if defined(RTS_SUPPORTS_THREADS)
434         // In the threaded RTS, deadlock detection doesn't work,
435         // so just exit right away.
436         errorBelch("interrupted");
437         releaseCapability(cap);
438         RELEASE_LOCK(&sched_mutex);
439         shutdownHaskellAndExit(EXIT_SUCCESS);
440 #else
441         deleteAllThreads();
442 #endif
443     }
444
445 #if defined(RTS_USER_SIGNALS)
446     // check for signals each time around the scheduler
447     if (signals_pending()) {
448       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
449       startSignalHandlers();
450       ACQUIRE_LOCK(&sched_mutex);
451     }
452 #endif
453
454     //
455     // Check whether any waiting threads need to be woken up.  If the
456     // run queue is empty, and there are no other tasks running, we
457     // can wait indefinitely for something to happen.
458     //
459     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
460     {
461 #if defined(RTS_SUPPORTS_THREADS)
462         // We shouldn't be here...
463         barf("schedule: awaitEvent() in threaded RTS");
464 #endif
465         awaitEvent( EMPTY_RUN_QUEUE() );
466     }
467     // we can be interrupted while waiting for I/O...
468     if (interrupted) continue;
469
470     /* 
471      * Detect deadlock: when we have no threads to run, there are no
472      * threads waiting on I/O or sleeping, and all the other tasks are
473      * waiting for work, we must have a deadlock of some description.
474      *
475      * We first try to find threads blocked on themselves (ie. black
476      * holes), and generate NonTermination exceptions where necessary.
477      *
478      * If no threads are black holed, we have a deadlock situation, so
479      * inform all the main threads.
480      */
481 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
482     if (   EMPTY_THREAD_QUEUES() )
483     {
484         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
485
486         // Garbage collection can release some new threads due to
487         // either (a) finalizers or (b) threads resurrected because
488         // they are unreachable and will therefore be sent an
489         // exception.  Any threads thus released will be immediately
490         // runnable.
491         GarbageCollect(GetRoots,rtsTrue);
492         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
493
494 #if defined(RTS_USER_SIGNALS)
495         /* If we have user-installed signal handlers, then wait
496          * for signals to arrive rather then bombing out with a
497          * deadlock.
498          */
499         if ( anyUserHandlers() ) {
500             IF_DEBUG(scheduler, 
501                      sched_belch("still deadlocked, waiting for signals..."));
502
503             awaitUserSignals();
504
505             // we might be interrupted...
506             if (interrupted) { continue; }
507
508             if (signals_pending()) {
509                 RELEASE_LOCK(&sched_mutex);
510                 startSignalHandlers();
511                 ACQUIRE_LOCK(&sched_mutex);
512             }
513             ASSERT(!EMPTY_RUN_QUEUE());
514             goto not_deadlocked;
515         }
516 #endif
517
518         /* Probably a real deadlock.  Send the current main thread the
519          * Deadlock exception (or in the SMP build, send *all* main
520          * threads the deadlock exception, since none of them can make
521          * progress).
522          */
523         {
524             StgMainThread *m;
525             m = main_threads;
526             switch (m->tso->why_blocked) {
527             case BlockedOnBlackHole:
528             case BlockedOnException:
529             case BlockedOnMVar:
530                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
531                 break;
532             default:
533                 barf("deadlock: main thread blocked in a strange way");
534             }
535         }
536     }
537   not_deadlocked:
538
539 #elif defined(RTS_SUPPORTS_THREADS)
540     // ToDo: add deadlock detection in threaded RTS
541 #elif defined(PAR)
542     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
543 #endif
544
545 #if defined(RTS_SUPPORTS_THREADS)
546     if ( EMPTY_RUN_QUEUE() ) {
547         continue; // nothing to do
548     }
549 #endif
550
551 #if defined(GRAN)
552     if (RtsFlags.GranFlags.Light)
553       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
554
555     /* adjust time based on time-stamp */
556     if (event->time > CurrentTime[CurrentProc] &&
557         event->evttype != ContinueThread)
558       CurrentTime[CurrentProc] = event->time;
559     
560     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
561     if (!RtsFlags.GranFlags.Light)
562       handleIdlePEs();
563
564     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
565
566     /* main event dispatcher in GranSim */
567     switch (event->evttype) {
568       /* Should just be continuing execution */
569     case ContinueThread:
570       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
571       /* ToDo: check assertion
572       ASSERT(run_queue_hd != (StgTSO*)NULL &&
573              run_queue_hd != END_TSO_QUEUE);
574       */
575       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
576       if (!RtsFlags.GranFlags.DoAsyncFetch &&
577           procStatus[CurrentProc]==Fetching) {
578         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
579               CurrentTSO->id, CurrentTSO, CurrentProc);
580         goto next_thread;
581       } 
582       /* Ignore ContinueThreads for completed threads */
583       if (CurrentTSO->what_next == ThreadComplete) {
584         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
585               CurrentTSO->id, CurrentTSO, CurrentProc);
586         goto next_thread;
587       } 
588       /* Ignore ContinueThreads for threads that are being migrated */
589       if (PROCS(CurrentTSO)==Nowhere) { 
590         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
591               CurrentTSO->id, CurrentTSO, CurrentProc);
592         goto next_thread;
593       }
594       /* The thread should be at the beginning of the run queue */
595       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
596         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
597               CurrentTSO->id, CurrentTSO, CurrentProc);
598         break; // run the thread anyway
599       }
600       /*
601       new_event(proc, proc, CurrentTime[proc],
602                 FindWork,
603                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
604       goto next_thread; 
605       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
606       break; // now actually run the thread; DaH Qu'vam yImuHbej 
607
608     case FetchNode:
609       do_the_fetchnode(event);
610       goto next_thread;             /* handle next event in event queue  */
611       
612     case GlobalBlock:
613       do_the_globalblock(event);
614       goto next_thread;             /* handle next event in event queue  */
615       
616     case FetchReply:
617       do_the_fetchreply(event);
618       goto next_thread;             /* handle next event in event queue  */
619       
620     case UnblockThread:   /* Move from the blocked queue to the tail of */
621       do_the_unblock(event);
622       goto next_thread;             /* handle next event in event queue  */
623       
624     case ResumeThread:  /* Move from the blocked queue to the tail of */
625       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
626       event->tso->gran.blocktime += 
627         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
628       do_the_startthread(event);
629       goto next_thread;             /* handle next event in event queue  */
630       
631     case StartThread:
632       do_the_startthread(event);
633       goto next_thread;             /* handle next event in event queue  */
634       
635     case MoveThread:
636       do_the_movethread(event);
637       goto next_thread;             /* handle next event in event queue  */
638       
639     case MoveSpark:
640       do_the_movespark(event);
641       goto next_thread;             /* handle next event in event queue  */
642       
643     case FindWork:
644       do_the_findwork(event);
645       goto next_thread;             /* handle next event in event queue  */
646       
647     default:
648       barf("Illegal event type %u\n", event->evttype);
649     }  /* switch */
650     
651     /* This point was scheduler_loop in the old RTS */
652
653     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
654
655     TimeOfLastEvent = CurrentTime[CurrentProc];
656     TimeOfNextEvent = get_time_of_next_event();
657     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
658     // CurrentTSO = ThreadQueueHd;
659
660     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
661                          TimeOfNextEvent));
662
663     if (RtsFlags.GranFlags.Light) 
664       GranSimLight_leave_system(event, &ActiveTSO); 
665
666     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
667
668     IF_DEBUG(gran, 
669              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
670
671     /* in a GranSim setup the TSO stays on the run queue */
672     t = CurrentTSO;
673     /* Take a thread from the run queue. */
674     POP_RUN_QUEUE(t); // take_off_run_queue(t);
675
676     IF_DEBUG(gran, 
677              debugBelch("GRAN: About to run current thread, which is\n");
678              G_TSO(t,5));
679
680     context_switch = 0; // turned on via GranYield, checking events and time slice
681
682     IF_DEBUG(gran, 
683              DumpGranEvent(GR_SCHEDULE, t));
684
685     procStatus[CurrentProc] = Busy;
686
687 #elif defined(PAR)
688     if (PendingFetches != END_BF_QUEUE) {
689         processFetches();
690     }
691
692     /* ToDo: phps merge with spark activation above */
693     /* check whether we have local work and send requests if we have none */
694     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
695       /* :-[  no local threads => look out for local sparks */
696       /* the spark pool for the current PE */
697       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
698       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
699           pool->hd < pool->tl) {
700         /* 
701          * ToDo: add GC code check that we really have enough heap afterwards!!
702          * Old comment:
703          * If we're here (no runnable threads) and we have pending
704          * sparks, we must have a space problem.  Get enough space
705          * to turn one of those pending sparks into a
706          * thread... 
707          */
708
709         spark = findSpark(rtsFalse);                /* get a spark */
710         if (spark != (rtsSpark) NULL) {
711           tso = activateSpark(spark);       /* turn the spark into a thread */
712           IF_PAR_DEBUG(schedule,
713                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
714                              tso->id, tso, advisory_thread_count));
715
716           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
717             debugBelch("==^^ failed to activate spark\n");
718             goto next_thread;
719           }               /* otherwise fall through & pick-up new tso */
720         } else {
721           IF_PAR_DEBUG(verbose,
722                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
723                              spark_queue_len(pool)));
724           goto next_thread;
725         }
726       }
727
728       /* If we still have no work we need to send a FISH to get a spark
729          from another PE 
730       */
731       if (EMPTY_RUN_QUEUE()) {
732       /* =8-[  no local sparks => look for work on other PEs */
733         /*
734          * We really have absolutely no work.  Send out a fish
735          * (there may be some out there already), and wait for
736          * something to arrive.  We clearly can't run any threads
737          * until a SCHEDULE or RESUME arrives, and so that's what
738          * we're hoping to see.  (Of course, we still have to
739          * respond to other types of messages.)
740          */
741         TIME now = msTime() /*CURRENT_TIME*/;
742         IF_PAR_DEBUG(verbose, 
743                      debugBelch("--  now=%ld\n", now));
744         IF_PAR_DEBUG(verbose,
745                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
746                          (last_fish_arrived_at!=0 &&
747                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
748                        debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
749                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
750                              last_fish_arrived_at,
751                              RtsFlags.ParFlags.fishDelay, now);
752                      });
753         
754         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
755             (last_fish_arrived_at==0 ||
756              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
757           /* outstandingFishes is set in sendFish, processFish;
758              avoid flooding system with fishes via delay */
759           pe = choosePE();
760           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
761                    NEW_FISH_HUNGER);
762
763           // Global statistics: count no. of fishes
764           if (RtsFlags.ParFlags.ParStats.Global &&
765               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
766             globalParStats.tot_fish_mess++;
767           }
768         }
769       
770         receivedFinish = processMessages();
771         goto next_thread;
772       }
773     } else if (PacketsWaiting()) {  /* Look for incoming messages */
774       receivedFinish = processMessages();
775     }
776
777     /* Now we are sure that we have some work available */
778     ASSERT(run_queue_hd != END_TSO_QUEUE);
779
780     /* Take a thread from the run queue, if we have work */
781     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
782     IF_DEBUG(sanity,checkTSO(t));
783
784     /* ToDo: write something to the log-file
785     if (RTSflags.ParFlags.granSimStats && !sameThread)
786         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
787
788     CurrentTSO = t;
789     */
790     /* the spark pool for the current PE */
791     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
792
793     IF_DEBUG(scheduler, 
794              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
795                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
796
797 # if 1
798     if (0 && RtsFlags.ParFlags.ParStats.Full && 
799         t && LastTSO && t->id != LastTSO->id && 
800         LastTSO->why_blocked == NotBlocked && 
801         LastTSO->what_next != ThreadComplete) {
802       // if previously scheduled TSO not blocked we have to record the context switch
803       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
804                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
805     }
806
807     if (RtsFlags.ParFlags.ParStats.Full && 
808         (emitSchedule /* forced emit */ ||
809         (t && LastTSO && t->id != LastTSO->id))) {
810       /* 
811          we are running a different TSO, so write a schedule event to log file
812          NB: If we use fair scheduling we also have to write  a deschedule 
813              event for LastTSO; with unfair scheduling we know that the
814              previous tso has blocked whenever we switch to another tso, so
815              we don't need it in GUM for now
816       */
817       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
818                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
819       emitSchedule = rtsFalse;
820     }
821      
822 # endif
823 #else /* !GRAN && !PAR */
824   
825     // grab a thread from the run queue
826     ASSERT(run_queue_hd != END_TSO_QUEUE);
827     POP_RUN_QUEUE(t);
828
829     // Sanity check the thread we're about to run.  This can be
830     // expensive if there is lots of thread switching going on...
831     IF_DEBUG(sanity,checkTSO(t));
832 #endif
833
834 #ifdef THREADED_RTS
835     {
836       StgMainThread *m = t->main;
837       
838       if(m)
839       {
840         if(m == mainThread)
841         {
842           IF_DEBUG(scheduler,
843             sched_belch("### Running thread %d in bound thread", t->id));
844           // yes, the Haskell thread is bound to the current native thread
845         }
846         else
847         {
848           IF_DEBUG(scheduler,
849             sched_belch("### thread %d bound to another OS thread", t->id));
850           // no, bound to a different Haskell thread: pass to that thread
851           PUSH_ON_RUN_QUEUE(t);
852           passCapability(&m->bound_thread_cond);
853           continue;
854         }
855       }
856       else
857       {
858         if(mainThread != NULL)
859         // The thread we want to run is bound.
860         {
861           IF_DEBUG(scheduler,
862             sched_belch("### this OS thread cannot run thread %d", t->id));
863           // no, the current native thread is bound to a different
864           // Haskell thread, so pass it to any worker thread
865           PUSH_ON_RUN_QUEUE(t);
866           passCapabilityToWorker();
867           continue; 
868         }
869       }
870     }
871 #endif
872
873     cap->r.rCurrentTSO = t;
874     
875     /* context switches are now initiated by the timer signal, unless
876      * the user specified "context switch as often as possible", with
877      * +RTS -C0
878      */
879     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
880          && (run_queue_hd != END_TSO_QUEUE
881              || blocked_queue_hd != END_TSO_QUEUE
882              || sleeping_queue != END_TSO_QUEUE)))
883         context_switch = 1;
884
885 run_thread:
886
887     RELEASE_LOCK(&sched_mutex);
888
889     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
890                               (long)t->id, whatNext_strs[t->what_next]));
891
892 #ifdef PROFILING
893     startHeapProfTimer();
894 #endif
895
896     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
897     /* Run the current thread 
898      */
899     prev_what_next = t->what_next;
900
901     errno = t->saved_errno;
902
903     switch (prev_what_next) {
904
905     case ThreadKilled:
906     case ThreadComplete:
907         /* Thread already finished, return to scheduler. */
908         ret = ThreadFinished;
909         break;
910
911     case ThreadRunGHC:
912         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
913         break;
914
915     case ThreadInterpret:
916         ret = interpretBCO(cap);
917         break;
918
919     default:
920       barf("schedule: invalid what_next field");
921     }
922
923     // The TSO might have moved, so find the new location:
924     t = cap->r.rCurrentTSO;
925
926     // And save the current errno in this thread.
927     t->saved_errno = errno;
928
929     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
930     
931     /* Costs for the scheduler are assigned to CCS_SYSTEM */
932 #ifdef PROFILING
933     stopHeapProfTimer();
934     CCCS = CCS_SYSTEM;
935 #endif
936     
937     ACQUIRE_LOCK(&sched_mutex);
938     
939 #ifdef RTS_SUPPORTS_THREADS
940     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
941 #elif !defined(GRAN) && !defined(PAR)
942     IF_DEBUG(scheduler,debugBelch("sched: "););
943 #endif
944     
945 #if defined(PAR)
946     /* HACK 675: if the last thread didn't yield, make sure to print a 
947        SCHEDULE event to the log file when StgRunning the next thread, even
948        if it is the same one as before */
949     LastTSO = t; 
950     TimeOfLastYield = CURRENT_TIME;
951 #endif
952
953     switch (ret) {
954     case HeapOverflow:
955 #if defined(GRAN)
956       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
957       globalGranStats.tot_heapover++;
958 #elif defined(PAR)
959       globalParStats.tot_heapover++;
960 #endif
961
962       // did the task ask for a large block?
963       if (cap->r.rHpAlloc > BLOCK_SIZE) {
964           // if so, get one and push it on the front of the nursery.
965           bdescr *bd;
966           nat blocks;
967           
968           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
969
970           IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", 
971                                    (long)t->id, whatNext_strs[t->what_next], blocks));
972
973           // don't do this if it would push us over the
974           // alloc_blocks_lim limit; we'll GC first.
975           if (alloc_blocks + blocks < alloc_blocks_lim) {
976
977               alloc_blocks += blocks;
978               bd = allocGroup( blocks );
979
980               // link the new group into the list
981               bd->link = cap->r.rCurrentNursery;
982               bd->u.back = cap->r.rCurrentNursery->u.back;
983               if (cap->r.rCurrentNursery->u.back != NULL) {
984                   cap->r.rCurrentNursery->u.back->link = bd;
985               } else {
986                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
987                          g0s0->blocks == cap->r.rNursery);
988                   cap->r.rNursery = g0s0->blocks = bd;
989               }           
990               cap->r.rCurrentNursery->u.back = bd;
991
992               // initialise it as a nursery block.  We initialise the
993               // step, gen_no, and flags field of *every* sub-block in
994               // this large block, because this is easier than making
995               // sure that we always find the block head of a large
996               // block whenever we call Bdescr() (eg. evacuate() and
997               // isAlive() in the GC would both have to do this, at
998               // least).
999               { 
1000                   bdescr *x;
1001                   for (x = bd; x < bd + blocks; x++) {
1002                       x->step = g0s0;
1003                       x->gen_no = 0;
1004                       x->flags = 0;
1005                   }
1006               }
1007
1008               // don't forget to update the block count in g0s0.
1009               g0s0->n_blocks += blocks;
1010               // This assert can be a killer if the app is doing lots
1011               // of large block allocations.
1012               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1013
1014               // now update the nursery to point to the new block
1015               cap->r.rCurrentNursery = bd;
1016
1017               // we might be unlucky and have another thread get on the
1018               // run queue before us and steal the large block, but in that
1019               // case the thread will just end up requesting another large
1020               // block.
1021               PUSH_ON_RUN_QUEUE(t);
1022               break;
1023           }
1024       }
1025
1026       /* make all the running tasks block on a condition variable,
1027        * maybe set context_switch and wait till they all pile in,
1028        * then have them wait on a GC condition variable.
1029        */
1030       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1031                                (long)t->id, whatNext_strs[t->what_next]));
1032       threadPaused(t);
1033 #if defined(GRAN)
1034       ASSERT(!is_on_queue(t,CurrentProc));
1035 #elif defined(PAR)
1036       /* Currently we emit a DESCHEDULE event before GC in GUM.
1037          ToDo: either add separate event to distinguish SYSTEM time from rest
1038                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1039       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1040         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1041                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1042         emitSchedule = rtsTrue;
1043       }
1044 #endif
1045       
1046       ready_to_gc = rtsTrue;
1047       context_switch = 1;               /* stop other threads ASAP */
1048       PUSH_ON_RUN_QUEUE(t);
1049       /* actual GC is done at the end of the while loop */
1050       break;
1051       
1052     case StackOverflow:
1053 #if defined(GRAN)
1054       IF_DEBUG(gran, 
1055                DumpGranEvent(GR_DESCHEDULE, t));
1056       globalGranStats.tot_stackover++;
1057 #elif defined(PAR)
1058       // IF_DEBUG(par, 
1059       // DumpGranEvent(GR_DESCHEDULE, t);
1060       globalParStats.tot_stackover++;
1061 #endif
1062       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1063                                (long)t->id, whatNext_strs[t->what_next]));
1064       /* just adjust the stack for this thread, then pop it back
1065        * on the run queue.
1066        */
1067       threadPaused(t);
1068       { 
1069         /* enlarge the stack */
1070         StgTSO *new_t = threadStackOverflow(t);
1071         
1072         /* This TSO has moved, so update any pointers to it from the
1073          * main thread stack.  It better not be on any other queues...
1074          * (it shouldn't be).
1075          */
1076         if (t->main != NULL) {
1077             t->main->tso = new_t;
1078         }
1079         PUSH_ON_RUN_QUEUE(new_t);
1080       }
1081       break;
1082
1083     case ThreadYielding:
1084       // Reset the context switch flag.  We don't do this just before
1085       // running the thread, because that would mean we would lose ticks
1086       // during GC, which can lead to unfair scheduling (a thread hogs
1087       // the CPU because the tick always arrives during GC).  This way
1088       // penalises threads that do a lot of allocation, but that seems
1089       // better than the alternative.
1090       context_switch = 0;
1091
1092 #if defined(GRAN)
1093       IF_DEBUG(gran, 
1094                DumpGranEvent(GR_DESCHEDULE, t));
1095       globalGranStats.tot_yields++;
1096 #elif defined(PAR)
1097       // IF_DEBUG(par, 
1098       // DumpGranEvent(GR_DESCHEDULE, t);
1099       globalParStats.tot_yields++;
1100 #endif
1101       /* put the thread back on the run queue.  Then, if we're ready to
1102        * GC, check whether this is the last task to stop.  If so, wake
1103        * up the GC thread.  getThread will block during a GC until the
1104        * GC is finished.
1105        */
1106       IF_DEBUG(scheduler,
1107                if (t->what_next != prev_what_next) {
1108                    debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1109                          (long)t->id, whatNext_strs[t->what_next]);
1110                } else {
1111                    debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1112                          (long)t->id, whatNext_strs[t->what_next]);
1113                }
1114                );
1115
1116       IF_DEBUG(sanity,
1117                //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1118                checkTSO(t));
1119       ASSERT(t->link == END_TSO_QUEUE);
1120
1121       // Shortcut if we're just switching evaluators: don't bother
1122       // doing stack squeezing (which can be expensive), just run the
1123       // thread.
1124       if (t->what_next != prev_what_next) {
1125           goto run_thread;
1126       }
1127
1128       threadPaused(t);
1129
1130 #if defined(GRAN)
1131       ASSERT(!is_on_queue(t,CurrentProc));
1132
1133       IF_DEBUG(sanity,
1134                //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1135                checkThreadQsSanity(rtsTrue));
1136 #endif
1137
1138 #if defined(PAR)
1139       if (RtsFlags.ParFlags.doFairScheduling) { 
1140         /* this does round-robin scheduling; good for concurrency */
1141         APPEND_TO_RUN_QUEUE(t);
1142       } else {
1143         /* this does unfair scheduling; good for parallelism */
1144         PUSH_ON_RUN_QUEUE(t);
1145       }
1146 #else
1147       // this does round-robin scheduling; good for concurrency
1148       APPEND_TO_RUN_QUEUE(t);
1149 #endif
1150
1151 #if defined(GRAN)
1152       /* add a ContinueThread event to actually process the thread */
1153       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1154                 ContinueThread,
1155                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1156       IF_GRAN_DEBUG(bq, 
1157                debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1158                G_EVENTQ(0);
1159                G_CURR_THREADQ(0));
1160 #endif /* GRAN */
1161       break;
1162
1163     case ThreadBlocked:
1164 #if defined(GRAN)
1165       IF_DEBUG(scheduler,
1166                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1167                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1168                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1169
1170       // ??? needed; should emit block before
1171       IF_DEBUG(gran, 
1172                DumpGranEvent(GR_DESCHEDULE, t)); 
1173       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1174       /*
1175         ngoq Dogh!
1176       ASSERT(procStatus[CurrentProc]==Busy || 
1177               ((procStatus[CurrentProc]==Fetching) && 
1178               (t->block_info.closure!=(StgClosure*)NULL)));
1179       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1180           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1181             procStatus[CurrentProc]==Fetching)) 
1182         procStatus[CurrentProc] = Idle;
1183       */
1184 #elif defined(PAR)
1185       IF_DEBUG(scheduler,
1186                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1187                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1188       IF_PAR_DEBUG(bq,
1189
1190                    if (t->block_info.closure!=(StgClosure*)NULL) 
1191                      print_bq(t->block_info.closure));
1192
1193       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1194       blockThread(t);
1195
1196       /* whatever we schedule next, we must log that schedule */
1197       emitSchedule = rtsTrue;
1198
1199 #else /* !GRAN */
1200       /* don't need to do anything.  Either the thread is blocked on
1201        * I/O, in which case we'll have called addToBlockedQueue
1202        * previously, or it's blocked on an MVar or Blackhole, in which
1203        * case it'll be on the relevant queue already.
1204        */
1205       ASSERT(t->why_blocked != NotBlocked);
1206       IF_DEBUG(scheduler,
1207                debugBelch("--<< thread %d (%s) stopped: ", 
1208                        t->id, whatNext_strs[t->what_next]);
1209                printThreadBlockage(t);
1210                debugBelch("\n"));
1211
1212       /* Only for dumping event to log file 
1213          ToDo: do I need this in GranSim, too?
1214       blockThread(t);
1215       */
1216 #endif
1217       threadPaused(t);
1218       break;
1219
1220     case ThreadFinished:
1221       /* Need to check whether this was a main thread, and if so, signal
1222        * the task that started it with the return value.  If we have no
1223        * more main threads, we probably need to stop all the tasks until
1224        * we get a new one.
1225        */
1226       /* We also end up here if the thread kills itself with an
1227        * uncaught exception, see Exception.hc.
1228        */
1229       IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1230                                t->id, whatNext_strs[t->what_next]));
1231 #if defined(GRAN)
1232       endThread(t, CurrentProc); // clean-up the thread
1233 #elif defined(PAR)
1234       /* For now all are advisory -- HWL */
1235       //if(t->priority==AdvisoryPriority) ??
1236       advisory_thread_count--;
1237       
1238 # ifdef DIST
1239       if(t->dist.priority==RevalPriority)
1240         FinishReval(t);
1241 # endif
1242       
1243       if (RtsFlags.ParFlags.ParStats.Full &&
1244           !RtsFlags.ParFlags.ParStats.Suppressed) 
1245         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1246 #endif
1247
1248       //
1249       // Check whether the thread that just completed was a main
1250       // thread, and if so return with the result.  
1251       //
1252       // There is an assumption here that all thread completion goes
1253       // through this point; we need to make sure that if a thread
1254       // ends up in the ThreadKilled state, that it stays on the run
1255       // queue so it can be dealt with here.
1256       //
1257       if (
1258 #if defined(RTS_SUPPORTS_THREADS)
1259           mainThread != NULL
1260 #else
1261           mainThread->tso == t
1262 #endif
1263           )
1264       {
1265           // We are a bound thread: this must be our thread that just
1266           // completed.
1267           ASSERT(mainThread->tso == t);
1268
1269           if (t->what_next == ThreadComplete) {
1270               if (mainThread->ret) {
1271                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1272                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1273               }
1274               mainThread->stat = Success;
1275           } else {
1276               if (mainThread->ret) {
1277                   *(mainThread->ret) = NULL;
1278               }
1279               if (was_interrupted) {
1280                   mainThread->stat = Interrupted;
1281               } else {
1282                   mainThread->stat = Killed;
1283               }
1284           }
1285 #ifdef DEBUG
1286           removeThreadLabel((StgWord)mainThread->tso->id);
1287 #endif
1288           if (mainThread->prev == NULL) {
1289               main_threads = mainThread->link;
1290           } else {
1291               mainThread->prev->link = mainThread->link;
1292           }
1293           if (mainThread->link != NULL) {
1294               mainThread->link->prev = NULL;
1295           }
1296           releaseCapability(cap);
1297           return;
1298       }
1299
1300 #ifdef RTS_SUPPORTS_THREADS
1301       ASSERT(t->main == NULL);
1302 #else
1303       if (t->main != NULL) {
1304           // Must be a main thread that is not the topmost one.  Leave
1305           // it on the run queue until the stack has unwound to the
1306           // point where we can deal with this.  Leaving it on the run
1307           // queue also ensures that the garbage collector knows about
1308           // this thread and its return value (it gets dropped from the
1309           // all_threads list so there's no other way to find it).
1310           APPEND_TO_RUN_QUEUE(t);
1311       }
1312 #endif
1313       break;
1314
1315     default:
1316       barf("schedule: invalid thread return code %d", (int)ret);
1317     }
1318
1319 #ifdef PROFILING
1320     // When we have +RTS -i0 and we're heap profiling, do a census at
1321     // every GC.  This lets us get repeatable runs for debugging.
1322     if (performHeapProfile ||
1323         (RtsFlags.ProfFlags.profileInterval==0 &&
1324          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1325         GarbageCollect(GetRoots, rtsTrue);
1326         heapCensus();
1327         performHeapProfile = rtsFalse;
1328         ready_to_gc = rtsFalse; // we already GC'd
1329     }
1330 #endif
1331
1332     if (ready_to_gc) {
1333       /* Kick any transactions which are invalid back to their atomically frames.
1334        * When next scheduled they will try to commit, this commit will fail and
1335        * they will retry. */
1336       for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1337         if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1338           if (!stmValidateTransaction (t -> trec)) {
1339             IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1340
1341             // strip the stack back to the ATOMICALLY_FRAME, aborting
1342             // the (nested) transaction, and saving the stack of any
1343             // partially-evaluated thunks on the heap.
1344             raiseAsync_(t, NULL, rtsTrue);
1345             
1346             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1347           }
1348         }
1349       }
1350
1351       /* everybody back, start the GC.
1352        * Could do it in this thread, or signal a condition var
1353        * to do it in another thread.  Either way, we need to
1354        * broadcast on gc_pending_cond afterward.
1355        */
1356 #if defined(RTS_SUPPORTS_THREADS)
1357       IF_DEBUG(scheduler,sched_belch("doing GC"));
1358 #endif
1359       GarbageCollect(GetRoots,rtsFalse);
1360       ready_to_gc = rtsFalse;
1361 #if defined(GRAN)
1362       /* add a ContinueThread event to continue execution of current thread */
1363       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1364                 ContinueThread,
1365                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1366       IF_GRAN_DEBUG(bq, 
1367                debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1368                G_EVENTQ(0);
1369                G_CURR_THREADQ(0));
1370 #endif /* GRAN */
1371     }
1372
1373 #if defined(GRAN)
1374   next_thread:
1375     IF_GRAN_DEBUG(unused,
1376                   print_eventq(EventHd));
1377
1378     event = get_next_event();
1379 #elif defined(PAR)
1380   next_thread:
1381     /* ToDo: wait for next message to arrive rather than busy wait */
1382 #endif /* GRAN */
1383
1384   } /* end of while(1) */
1385
1386   IF_PAR_DEBUG(verbose,
1387                debugBelch("== Leaving schedule() after having received Finish\n"));
1388 }
1389
1390 /* ---------------------------------------------------------------------------
1391  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1392  * used by Control.Concurrent for error checking.
1393  * ------------------------------------------------------------------------- */
1394  
1395 StgBool
1396 rtsSupportsBoundThreads(void)
1397 {
1398 #ifdef THREADED_RTS
1399   return rtsTrue;
1400 #else
1401   return rtsFalse;
1402 #endif
1403 }
1404
1405 /* ---------------------------------------------------------------------------
1406  * isThreadBound(tso): check whether tso is bound to an OS thread.
1407  * ------------------------------------------------------------------------- */
1408  
1409 StgBool
1410 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1411 {
1412 #ifdef THREADED_RTS
1413   return (tso->main != NULL);
1414 #endif
1415   return rtsFalse;
1416 }
1417
1418 /* ---------------------------------------------------------------------------
1419  * Singleton fork(). Do not copy any running threads.
1420  * ------------------------------------------------------------------------- */
1421
1422 #ifndef mingw32_TARGET_OS
1423 #define FORKPROCESS_PRIMOP_SUPPORTED
1424 #endif
1425
1426 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1427 static void 
1428 deleteThreadImmediately(StgTSO *tso);
1429 #endif
1430 StgInt
1431 forkProcess(HsStablePtr *entry
1432 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1433             STG_UNUSED
1434 #endif
1435            )
1436 {
1437 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1438   pid_t pid;
1439   StgTSO* t,*next;
1440   StgMainThread *m;
1441   SchedulerStatus rc;
1442
1443   IF_DEBUG(scheduler,sched_belch("forking!"));
1444   rts_lock(); // This not only acquires sched_mutex, it also
1445               // makes sure that no other threads are running
1446
1447   pid = fork();
1448
1449   if (pid) { /* parent */
1450
1451   /* just return the pid */
1452     rts_unlock();
1453     return pid;
1454     
1455   } else { /* child */
1456     
1457     
1458       // delete all threads
1459     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1460     
1461     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1462       next = t->link;
1463
1464         // don't allow threads to catch the ThreadKilled exception
1465       deleteThreadImmediately(t);
1466     }
1467     
1468       // wipe the main thread list
1469     while((m = main_threads) != NULL) {
1470       main_threads = m->link;
1471 # ifdef THREADED_RTS
1472       closeCondition(&m->bound_thread_cond);
1473 # endif
1474       stgFree(m);
1475     }
1476     
1477     rc = rts_evalStableIO(entry, NULL);  // run the action
1478     rts_checkSchedStatus("forkProcess",rc);
1479     
1480     rts_unlock();
1481     
1482     hs_exit();                      // clean up and exit
1483     stg_exit(0);
1484   }
1485 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1486   barf("forkProcess#: primop not supported, sorry!\n");
1487   return -1;
1488 #endif
1489 }
1490
1491 /* ---------------------------------------------------------------------------
1492  * deleteAllThreads():  kill all the live threads.
1493  *
1494  * This is used when we catch a user interrupt (^C), before performing
1495  * any necessary cleanups and running finalizers.
1496  *
1497  * Locks: sched_mutex held.
1498  * ------------------------------------------------------------------------- */
1499    
1500 void
1501 deleteAllThreads ( void )
1502 {
1503   StgTSO* t, *next;
1504   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1505   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1506       next = t->global_link;
1507       deleteThread(t);
1508   }      
1509
1510   // The run queue now contains a bunch of ThreadKilled threads.  We
1511   // must not throw these away: the main thread(s) will be in there
1512   // somewhere, and the main scheduler loop has to deal with it.
1513   // Also, the run queue is the only thing keeping these threads from
1514   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1515
1516   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1517   ASSERT(sleeping_queue == END_TSO_QUEUE);
1518 }
1519
1520 /* startThread and  insertThread are now in GranSim.c -- HWL */
1521
1522
1523 /* ---------------------------------------------------------------------------
1524  * Suspending & resuming Haskell threads.
1525  * 
1526  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1527  * its capability before calling the C function.  This allows another
1528  * task to pick up the capability and carry on running Haskell
1529  * threads.  It also means that if the C call blocks, it won't lock
1530  * the whole system.
1531  *
1532  * The Haskell thread making the C call is put to sleep for the
1533  * duration of the call, on the susepended_ccalling_threads queue.  We
1534  * give out a token to the task, which it can use to resume the thread
1535  * on return from the C function.
1536  * ------------------------------------------------------------------------- */
1537    
1538 StgInt
1539 suspendThread( StgRegTable *reg )
1540 {
1541   nat tok;
1542   Capability *cap;
1543   int saved_errno = errno;
1544
1545   /* assume that *reg is a pointer to the StgRegTable part
1546    * of a Capability.
1547    */
1548   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1549
1550   ACQUIRE_LOCK(&sched_mutex);
1551
1552   IF_DEBUG(scheduler,
1553            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1554
1555   // XXX this might not be necessary --SDM
1556   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1557
1558   threadPaused(cap->r.rCurrentTSO);
1559   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1560   suspended_ccalling_threads = cap->r.rCurrentTSO;
1561
1562   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1563       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1564       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1565   } else {
1566       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1567   }
1568
1569   /* Use the thread ID as the token; it should be unique */
1570   tok = cap->r.rCurrentTSO->id;
1571
1572   /* Hand back capability */
1573   releaseCapability(cap);
1574   
1575 #if defined(RTS_SUPPORTS_THREADS)
1576   /* Preparing to leave the RTS, so ensure there's a native thread/task
1577      waiting to take over.
1578   */
1579   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1580 #endif
1581
1582   RELEASE_LOCK(&sched_mutex);
1583   
1584   errno = saved_errno;
1585   return tok; 
1586 }
1587
1588 StgRegTable *
1589 resumeThread( StgInt tok )
1590 {
1591   StgTSO *tso, **prev;
1592   Capability *cap;
1593   int saved_errno = errno;
1594
1595 #if defined(RTS_SUPPORTS_THREADS)
1596   /* Wait for permission to re-enter the RTS with the result. */
1597   ACQUIRE_LOCK(&sched_mutex);
1598   waitForReturnCapability(&sched_mutex, &cap);
1599
1600   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1601 #else
1602   grabCapability(&cap);
1603 #endif
1604
1605   /* Remove the thread off of the suspended list */
1606   prev = &suspended_ccalling_threads;
1607   for (tso = suspended_ccalling_threads; 
1608        tso != END_TSO_QUEUE; 
1609        prev = &tso->link, tso = tso->link) {
1610     if (tso->id == (StgThreadID)tok) {
1611       *prev = tso->link;
1612       break;
1613     }
1614   }
1615   if (tso == END_TSO_QUEUE) {
1616     barf("resumeThread: thread not found");
1617   }
1618   tso->link = END_TSO_QUEUE;
1619   
1620   if(tso->why_blocked == BlockedOnCCall) {
1621       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1622       tso->blocked_exceptions = NULL;
1623   }
1624   
1625   /* Reset blocking status */
1626   tso->why_blocked  = NotBlocked;
1627
1628   cap->r.rCurrentTSO = tso;
1629   RELEASE_LOCK(&sched_mutex);
1630   errno = saved_errno;
1631   return &cap->r;
1632 }
1633
1634
1635 /* ---------------------------------------------------------------------------
1636  * Static functions
1637  * ------------------------------------------------------------------------ */
1638 static void unblockThread(StgTSO *tso);
1639
1640 /* ---------------------------------------------------------------------------
1641  * Comparing Thread ids.
1642  *
1643  * This is used from STG land in the implementation of the
1644  * instances of Eq/Ord for ThreadIds.
1645  * ------------------------------------------------------------------------ */
1646
1647 int
1648 cmp_thread(StgPtr tso1, StgPtr tso2) 
1649
1650   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1651   StgThreadID id2 = ((StgTSO *)tso2)->id;
1652  
1653   if (id1 < id2) return (-1);
1654   if (id1 > id2) return 1;
1655   return 0;
1656 }
1657
1658 /* ---------------------------------------------------------------------------
1659  * Fetching the ThreadID from an StgTSO.
1660  *
1661  * This is used in the implementation of Show for ThreadIds.
1662  * ------------------------------------------------------------------------ */
1663 int
1664 rts_getThreadId(StgPtr tso) 
1665 {
1666   return ((StgTSO *)tso)->id;
1667 }
1668
1669 #ifdef DEBUG
1670 void
1671 labelThread(StgPtr tso, char *label)
1672 {
1673   int len;
1674   void *buf;
1675
1676   /* Caveat: Once set, you can only set the thread name to "" */
1677   len = strlen(label)+1;
1678   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1679   strncpy(buf,label,len);
1680   /* Update will free the old memory for us */
1681   updateThreadLabel(((StgTSO *)tso)->id,buf);
1682 }
1683 #endif /* DEBUG */
1684
1685 /* ---------------------------------------------------------------------------
1686    Create a new thread.
1687
1688    The new thread starts with the given stack size.  Before the
1689    scheduler can run, however, this thread needs to have a closure
1690    (and possibly some arguments) pushed on its stack.  See
1691    pushClosure() in Schedule.h.
1692
1693    createGenThread() and createIOThread() (in SchedAPI.h) are
1694    convenient packaged versions of this function.
1695
1696    currently pri (priority) is only used in a GRAN setup -- HWL
1697    ------------------------------------------------------------------------ */
1698 #if defined(GRAN)
1699 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1700 StgTSO *
1701 createThread(nat size, StgInt pri)
1702 #else
1703 StgTSO *
1704 createThread(nat size)
1705 #endif
1706 {
1707
1708     StgTSO *tso;
1709     nat stack_size;
1710
1711     /* First check whether we should create a thread at all */
1712 #if defined(PAR)
1713   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1714   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1715     threadsIgnored++;
1716     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
1717           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1718     return END_TSO_QUEUE;
1719   }
1720   threadsCreated++;
1721 #endif
1722
1723 #if defined(GRAN)
1724   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1725 #endif
1726
1727   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1728
1729   /* catch ridiculously small stack sizes */
1730   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1731     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1732   }
1733
1734   stack_size = size - TSO_STRUCT_SIZEW;
1735
1736   tso = (StgTSO *)allocate(size);
1737   TICK_ALLOC_TSO(stack_size, 0);
1738
1739   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1740 #if defined(GRAN)
1741   SET_GRAN_HDR(tso, ThisPE);
1742 #endif
1743
1744   // Always start with the compiled code evaluator
1745   tso->what_next = ThreadRunGHC;
1746
1747   tso->id = next_thread_id++; 
1748   tso->why_blocked  = NotBlocked;
1749   tso->blocked_exceptions = NULL;
1750
1751   tso->saved_errno = 0;
1752   tso->main = NULL;
1753   
1754   tso->stack_size   = stack_size;
1755   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1756                               - TSO_STRUCT_SIZEW;
1757   tso->sp           = (P_)&(tso->stack) + stack_size;
1758
1759   tso->trec = NO_TREC;
1760
1761 #ifdef PROFILING
1762   tso->prof.CCCS = CCS_MAIN;
1763 #endif
1764
1765   /* put a stop frame on the stack */
1766   tso->sp -= sizeofW(StgStopFrame);
1767   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1768   tso->link = END_TSO_QUEUE;
1769
1770   // ToDo: check this
1771 #if defined(GRAN)
1772   /* uses more flexible routine in GranSim */
1773   insertThread(tso, CurrentProc);
1774 #else
1775   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1776    * from its creation
1777    */
1778 #endif
1779
1780 #if defined(GRAN) 
1781   if (RtsFlags.GranFlags.GranSimStats.Full) 
1782     DumpGranEvent(GR_START,tso);
1783 #elif defined(PAR)
1784   if (RtsFlags.ParFlags.ParStats.Full) 
1785     DumpGranEvent(GR_STARTQ,tso);
1786   /* HACk to avoid SCHEDULE 
1787      LastTSO = tso; */
1788 #endif
1789
1790   /* Link the new thread on the global thread list.
1791    */
1792   tso->global_link = all_threads;
1793   all_threads = tso;
1794
1795 #if defined(DIST)
1796   tso->dist.priority = MandatoryPriority; //by default that is...
1797 #endif
1798
1799 #if defined(GRAN)
1800   tso->gran.pri = pri;
1801 # if defined(DEBUG)
1802   tso->gran.magic = TSO_MAGIC; // debugging only
1803 # endif
1804   tso->gran.sparkname   = 0;
1805   tso->gran.startedat   = CURRENT_TIME; 
1806   tso->gran.exported    = 0;
1807   tso->gran.basicblocks = 0;
1808   tso->gran.allocs      = 0;
1809   tso->gran.exectime    = 0;
1810   tso->gran.fetchtime   = 0;
1811   tso->gran.fetchcount  = 0;
1812   tso->gran.blocktime   = 0;
1813   tso->gran.blockcount  = 0;
1814   tso->gran.blockedat   = 0;
1815   tso->gran.globalsparks = 0;
1816   tso->gran.localsparks  = 0;
1817   if (RtsFlags.GranFlags.Light)
1818     tso->gran.clock  = Now; /* local clock */
1819   else
1820     tso->gran.clock  = 0;
1821
1822   IF_DEBUG(gran,printTSO(tso));
1823 #elif defined(PAR)
1824 # if defined(DEBUG)
1825   tso->par.magic = TSO_MAGIC; // debugging only
1826 # endif
1827   tso->par.sparkname   = 0;
1828   tso->par.startedat   = CURRENT_TIME; 
1829   tso->par.exported    = 0;
1830   tso->par.basicblocks = 0;
1831   tso->par.allocs      = 0;
1832   tso->par.exectime    = 0;
1833   tso->par.fetchtime   = 0;
1834   tso->par.fetchcount  = 0;
1835   tso->par.blocktime   = 0;
1836   tso->par.blockcount  = 0;
1837   tso->par.blockedat   = 0;
1838   tso->par.globalsparks = 0;
1839   tso->par.localsparks  = 0;
1840 #endif
1841
1842 #if defined(GRAN)
1843   globalGranStats.tot_threads_created++;
1844   globalGranStats.threads_created_on_PE[CurrentProc]++;
1845   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1846   globalGranStats.tot_sq_probes++;
1847 #elif defined(PAR)
1848   // collect parallel global statistics (currently done together with GC stats)
1849   if (RtsFlags.ParFlags.ParStats.Global &&
1850       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1851     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1852     globalParStats.tot_threads_created++;
1853   }
1854 #endif 
1855
1856 #if defined(GRAN)
1857   IF_GRAN_DEBUG(pri,
1858                 sched_belch("==__ schedule: Created TSO %d (%p);",
1859                       CurrentProc, tso, tso->id));
1860 #elif defined(PAR)
1861     IF_PAR_DEBUG(verbose,
1862                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
1863                        (long)tso->id, tso, advisory_thread_count));
1864 #else
1865   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1866                                 (long)tso->id, (long)tso->stack_size));
1867 #endif    
1868   return tso;
1869 }
1870
1871 #if defined(PAR)
1872 /* RFP:
1873    all parallel thread creation calls should fall through the following routine.
1874 */
1875 StgTSO *
1876 createSparkThread(rtsSpark spark) 
1877 { StgTSO *tso;
1878   ASSERT(spark != (rtsSpark)NULL);
1879   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1880   { threadsIgnored++;
1881     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1882           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1883     return END_TSO_QUEUE;
1884   }
1885   else
1886   { threadsCreated++;
1887     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1888     if (tso==END_TSO_QUEUE)     
1889       barf("createSparkThread: Cannot create TSO");
1890 #if defined(DIST)
1891     tso->priority = AdvisoryPriority;
1892 #endif
1893     pushClosure(tso,spark);
1894     PUSH_ON_RUN_QUEUE(tso);
1895     advisory_thread_count++;    
1896   }
1897   return tso;
1898 }
1899 #endif
1900
1901 /*
1902   Turn a spark into a thread.
1903   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1904 */
1905 #if defined(PAR)
1906 StgTSO *
1907 activateSpark (rtsSpark spark) 
1908 {
1909   StgTSO *tso;
1910
1911   tso = createSparkThread(spark);
1912   if (RtsFlags.ParFlags.ParStats.Full) {   
1913     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1914     IF_PAR_DEBUG(verbose,
1915                  debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
1916                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1917   }
1918   // ToDo: fwd info on local/global spark to thread -- HWL
1919   // tso->gran.exported =  spark->exported;
1920   // tso->gran.locked =   !spark->global;
1921   // tso->gran.sparkname = spark->name;
1922
1923   return tso;
1924 }
1925 #endif
1926
1927 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1928                                    Capability *initialCapability
1929                                    );
1930
1931
1932 /* ---------------------------------------------------------------------------
1933  * scheduleThread()
1934  *
1935  * scheduleThread puts a thread on the head of the runnable queue.
1936  * This will usually be done immediately after a thread is created.
1937  * The caller of scheduleThread must create the thread using e.g.
1938  * createThread and push an appropriate closure
1939  * on this thread's stack before the scheduler is invoked.
1940  * ------------------------------------------------------------------------ */
1941
1942 static void scheduleThread_ (StgTSO* tso);
1943
1944 void
1945 scheduleThread_(StgTSO *tso)
1946 {
1947   // The thread goes at the *end* of the run-queue, to avoid possible
1948   // starvation of any threads already on the queue.
1949   APPEND_TO_RUN_QUEUE(tso);
1950   threadRunnable();
1951 }
1952
1953 void
1954 scheduleThread(StgTSO* tso)
1955 {
1956   ACQUIRE_LOCK(&sched_mutex);
1957   scheduleThread_(tso);
1958   RELEASE_LOCK(&sched_mutex);
1959 }
1960
1961 #if defined(RTS_SUPPORTS_THREADS)
1962 static Condition bound_cond_cache;
1963 static int bound_cond_cache_full = 0;
1964 #endif
1965
1966
1967 SchedulerStatus
1968 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1969                    Capability *initialCapability)
1970 {
1971     // Precondition: sched_mutex must be held
1972     StgMainThread *m;
1973
1974     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1975     m->tso = tso;
1976     tso->main = m;
1977     m->ret = ret;
1978     m->stat = NoStatus;
1979     m->link = main_threads;
1980     m->prev = NULL;
1981     if (main_threads != NULL) {
1982         main_threads->prev = m;
1983     }
1984     main_threads = m;
1985
1986 #if defined(RTS_SUPPORTS_THREADS)
1987     // Allocating a new condition for each thread is expensive, so we
1988     // cache one.  This is a pretty feeble hack, but it helps speed up
1989     // consecutive call-ins quite a bit.
1990     if (bound_cond_cache_full) {
1991         m->bound_thread_cond = bound_cond_cache;
1992         bound_cond_cache_full = 0;
1993     } else {
1994         initCondition(&m->bound_thread_cond);
1995     }
1996 #endif
1997
1998     /* Put the thread on the main-threads list prior to scheduling the TSO.
1999        Failure to do so introduces a race condition in the MT case (as
2000        identified by Wolfgang Thaller), whereby the new task/OS thread 
2001        created by scheduleThread_() would complete prior to the thread
2002        that spawned it managed to put 'itself' on the main-threads list.
2003        The upshot of it all being that the worker thread wouldn't get to
2004        signal the completion of the its work item for the main thread to
2005        see (==> it got stuck waiting.)    -- sof 6/02.
2006     */
2007     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2008     
2009     APPEND_TO_RUN_QUEUE(tso);
2010     // NB. Don't call threadRunnable() here, because the thread is
2011     // bound and only runnable by *this* OS thread, so waking up other
2012     // workers will just slow things down.
2013
2014     return waitThread_(m, initialCapability);
2015 }
2016
2017 /* ---------------------------------------------------------------------------
2018  * initScheduler()
2019  *
2020  * Initialise the scheduler.  This resets all the queues - if the
2021  * queues contained any threads, they'll be garbage collected at the
2022  * next pass.
2023  *
2024  * ------------------------------------------------------------------------ */
2025
2026 void 
2027 initScheduler(void)
2028 {
2029 #if defined(GRAN)
2030   nat i;
2031
2032   for (i=0; i<=MAX_PROC; i++) {
2033     run_queue_hds[i]      = END_TSO_QUEUE;
2034     run_queue_tls[i]      = END_TSO_QUEUE;
2035     blocked_queue_hds[i]  = END_TSO_QUEUE;
2036     blocked_queue_tls[i]  = END_TSO_QUEUE;
2037     ccalling_threadss[i]  = END_TSO_QUEUE;
2038     sleeping_queue        = END_TSO_QUEUE;
2039   }
2040 #else
2041   run_queue_hd      = END_TSO_QUEUE;
2042   run_queue_tl      = END_TSO_QUEUE;
2043   blocked_queue_hd  = END_TSO_QUEUE;
2044   blocked_queue_tl  = END_TSO_QUEUE;
2045   sleeping_queue    = END_TSO_QUEUE;
2046 #endif 
2047
2048   suspended_ccalling_threads  = END_TSO_QUEUE;
2049
2050   main_threads = NULL;
2051   all_threads  = END_TSO_QUEUE;
2052
2053   context_switch = 0;
2054   interrupted    = 0;
2055
2056   RtsFlags.ConcFlags.ctxtSwitchTicks =
2057       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2058       
2059 #if defined(RTS_SUPPORTS_THREADS)
2060   /* Initialise the mutex and condition variables used by
2061    * the scheduler. */
2062   initMutex(&sched_mutex);
2063   initMutex(&term_mutex);
2064 #endif
2065   
2066   ACQUIRE_LOCK(&sched_mutex);
2067
2068   /* A capability holds the state a native thread needs in
2069    * order to execute STG code. At least one capability is
2070    * floating around (only SMP builds have more than one).
2071    */
2072   initCapabilities();
2073   
2074 #if defined(RTS_SUPPORTS_THREADS)
2075     /* start our haskell execution tasks */
2076     startTaskManager(0,taskStart);
2077 #endif
2078
2079 #if /* defined(SMP) ||*/ defined(PAR)
2080   initSparkPools();
2081 #endif
2082
2083   RELEASE_LOCK(&sched_mutex);
2084 }
2085
2086 void
2087 exitScheduler( void )
2088 {
2089 #if defined(RTS_SUPPORTS_THREADS)
2090   stopTaskManager();
2091 #endif
2092   shutting_down_scheduler = rtsTrue;
2093 }
2094
2095 /* ----------------------------------------------------------------------------
2096    Managing the per-task allocation areas.
2097    
2098    Each capability comes with an allocation area.  These are
2099    fixed-length block lists into which allocation can be done.
2100
2101    ToDo: no support for two-space collection at the moment???
2102    ------------------------------------------------------------------------- */
2103
2104 static
2105 SchedulerStatus
2106 waitThread_(StgMainThread* m, Capability *initialCapability)
2107 {
2108   SchedulerStatus stat;
2109
2110   // Precondition: sched_mutex must be held.
2111   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2112
2113 #if defined(GRAN)
2114   /* GranSim specific init */
2115   CurrentTSO = m->tso;                // the TSO to run
2116   procStatus[MainProc] = Busy;        // status of main PE
2117   CurrentProc = MainProc;             // PE to run it on
2118   schedule(m,initialCapability);
2119 #else
2120   schedule(m,initialCapability);
2121   ASSERT(m->stat != NoStatus);
2122 #endif
2123
2124   stat = m->stat;
2125
2126 #if defined(RTS_SUPPORTS_THREADS)
2127   // Free the condition variable, returning it to the cache if possible.
2128   if (!bound_cond_cache_full) {
2129       bound_cond_cache = m->bound_thread_cond;
2130       bound_cond_cache_full = 1;
2131   } else {
2132       closeCondition(&m->bound_thread_cond);
2133   }
2134 #endif
2135
2136   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2137   stgFree(m);
2138
2139   // Postcondition: sched_mutex still held
2140   return stat;
2141 }
2142
2143 /* ---------------------------------------------------------------------------
2144    Where are the roots that we know about?
2145
2146         - all the threads on the runnable queue
2147         - all the threads on the blocked queue
2148         - all the threads on the sleeping queue
2149         - all the thread currently executing a _ccall_GC
2150         - all the "main threads"
2151      
2152    ------------------------------------------------------------------------ */
2153
2154 /* This has to be protected either by the scheduler monitor, or by the
2155         garbage collection monitor (probably the latter).
2156         KH @ 25/10/99
2157 */
2158
2159 void
2160 GetRoots( evac_fn evac )
2161 {
2162 #if defined(GRAN)
2163   {
2164     nat i;
2165     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2166       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2167           evac((StgClosure **)&run_queue_hds[i]);
2168       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2169           evac((StgClosure **)&run_queue_tls[i]);
2170       
2171       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2172           evac((StgClosure **)&blocked_queue_hds[i]);
2173       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2174           evac((StgClosure **)&blocked_queue_tls[i]);
2175       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2176           evac((StgClosure **)&ccalling_threads[i]);
2177     }
2178   }
2179
2180   markEventQueue();
2181
2182 #else /* !GRAN */
2183   if (run_queue_hd != END_TSO_QUEUE) {
2184       ASSERT(run_queue_tl != END_TSO_QUEUE);
2185       evac((StgClosure **)&run_queue_hd);
2186       evac((StgClosure **)&run_queue_tl);
2187   }
2188   
2189   if (blocked_queue_hd != END_TSO_QUEUE) {
2190       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2191       evac((StgClosure **)&blocked_queue_hd);
2192       evac((StgClosure **)&blocked_queue_tl);
2193   }
2194   
2195   if (sleeping_queue != END_TSO_QUEUE) {
2196       evac((StgClosure **)&sleeping_queue);
2197   }
2198 #endif 
2199
2200   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2201       evac((StgClosure **)&suspended_ccalling_threads);
2202   }
2203
2204 #if defined(PAR) || defined(GRAN)
2205   markSparkQueue(evac);
2206 #endif
2207
2208 #if defined(RTS_USER_SIGNALS)
2209   // mark the signal handlers (signals should be already blocked)
2210   markSignalHandlers(evac);
2211 #endif
2212 }
2213
2214 /* -----------------------------------------------------------------------------
2215    performGC
2216
2217    This is the interface to the garbage collector from Haskell land.
2218    We provide this so that external C code can allocate and garbage
2219    collect when called from Haskell via _ccall_GC.
2220
2221    It might be useful to provide an interface whereby the programmer
2222    can specify more roots (ToDo).
2223    
2224    This needs to be protected by the GC condition variable above.  KH.
2225    -------------------------------------------------------------------------- */
2226
2227 static void (*extra_roots)(evac_fn);
2228
2229 void
2230 performGC(void)
2231 {
2232   /* Obligated to hold this lock upon entry */
2233   ACQUIRE_LOCK(&sched_mutex);
2234   GarbageCollect(GetRoots,rtsFalse);
2235   RELEASE_LOCK(&sched_mutex);
2236 }
2237
2238 void
2239 performMajorGC(void)
2240 {
2241   ACQUIRE_LOCK(&sched_mutex);
2242   GarbageCollect(GetRoots,rtsTrue);
2243   RELEASE_LOCK(&sched_mutex);
2244 }
2245
2246 static void
2247 AllRoots(evac_fn evac)
2248 {
2249     GetRoots(evac);             // the scheduler's roots
2250     extra_roots(evac);          // the user's roots
2251 }
2252
2253 void
2254 performGCWithRoots(void (*get_roots)(evac_fn))
2255 {
2256   ACQUIRE_LOCK(&sched_mutex);
2257   extra_roots = get_roots;
2258   GarbageCollect(AllRoots,rtsFalse);
2259   RELEASE_LOCK(&sched_mutex);
2260 }
2261
2262 /* -----------------------------------------------------------------------------
2263    Stack overflow
2264
2265    If the thread has reached its maximum stack size, then raise the
2266    StackOverflow exception in the offending thread.  Otherwise
2267    relocate the TSO into a larger chunk of memory and adjust its stack
2268    size appropriately.
2269    -------------------------------------------------------------------------- */
2270
2271 static StgTSO *
2272 threadStackOverflow(StgTSO *tso)
2273 {
2274   nat new_stack_size, new_tso_size, stack_words;
2275   StgPtr new_sp;
2276   StgTSO *dest;
2277
2278   IF_DEBUG(sanity,checkTSO(tso));
2279   if (tso->stack_size >= tso->max_stack_size) {
2280
2281     IF_DEBUG(gc,
2282              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2283                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2284              /* If we're debugging, just print out the top of the stack */
2285              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2286                                               tso->sp+64)));
2287
2288     /* Send this thread the StackOverflow exception */
2289     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2290     return tso;
2291   }
2292
2293   /* Try to double the current stack size.  If that takes us over the
2294    * maximum stack size for this thread, then use the maximum instead.
2295    * Finally round up so the TSO ends up as a whole number of blocks.
2296    */
2297   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2298   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2299                                        TSO_STRUCT_SIZE)/sizeof(W_);
2300   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2301   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2302
2303   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2304
2305   dest = (StgTSO *)allocate(new_tso_size);
2306   TICK_ALLOC_TSO(new_stack_size,0);
2307
2308   /* copy the TSO block and the old stack into the new area */
2309   memcpy(dest,tso,TSO_STRUCT_SIZE);
2310   stack_words = tso->stack + tso->stack_size - tso->sp;
2311   new_sp = (P_)dest + new_tso_size - stack_words;
2312   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2313
2314   /* relocate the stack pointers... */
2315   dest->sp         = new_sp;
2316   dest->stack_size = new_stack_size;
2317         
2318   /* Mark the old TSO as relocated.  We have to check for relocated
2319    * TSOs in the garbage collector and any primops that deal with TSOs.
2320    *
2321    * It's important to set the sp value to just beyond the end
2322    * of the stack, so we don't attempt to scavenge any part of the
2323    * dead TSO's stack.
2324    */
2325   tso->what_next = ThreadRelocated;
2326   tso->link = dest;
2327   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2328   tso->why_blocked = NotBlocked;
2329   dest->mut_link = NULL;
2330
2331   IF_PAR_DEBUG(verbose,
2332                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2333                      tso->id, tso, tso->stack_size);
2334                /* If we're debugging, just print out the top of the stack */
2335                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2336                                                 tso->sp+64)));
2337   
2338   IF_DEBUG(sanity,checkTSO(tso));
2339 #if 0
2340   IF_DEBUG(scheduler,printTSO(dest));
2341 #endif
2342
2343   return dest;
2344 }
2345
2346 /* ---------------------------------------------------------------------------
2347    Wake up a queue that was blocked on some resource.
2348    ------------------------------------------------------------------------ */
2349
2350 #if defined(GRAN)
2351 STATIC_INLINE void
2352 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2353 {
2354 }
2355 #elif defined(PAR)
2356 STATIC_INLINE void
2357 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2358 {
2359   /* write RESUME events to log file and
2360      update blocked and fetch time (depending on type of the orig closure) */
2361   if (RtsFlags.ParFlags.ParStats.Full) {
2362     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2363                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2364                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2365     if (EMPTY_RUN_QUEUE())
2366       emitSchedule = rtsTrue;
2367
2368     switch (get_itbl(node)->type) {
2369         case FETCH_ME_BQ:
2370           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2371           break;
2372         case RBH:
2373         case FETCH_ME:
2374         case BLACKHOLE_BQ:
2375           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2376           break;
2377 #ifdef DIST
2378         case MVAR:
2379           break;
2380 #endif    
2381         default:
2382           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2383         }
2384       }
2385 }
2386 #endif
2387
2388 #if defined(GRAN)
2389 static StgBlockingQueueElement *
2390 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2391 {
2392     StgTSO *tso;
2393     PEs node_loc, tso_loc;
2394
2395     node_loc = where_is(node); // should be lifted out of loop
2396     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2397     tso_loc = where_is((StgClosure *)tso);
2398     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2399       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2400       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2401       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2402       // insertThread(tso, node_loc);
2403       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2404                 ResumeThread,
2405                 tso, node, (rtsSpark*)NULL);
2406       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2407       // len_local++;
2408       // len++;
2409     } else { // TSO is remote (actually should be FMBQ)
2410       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2411                                   RtsFlags.GranFlags.Costs.gunblocktime +
2412                                   RtsFlags.GranFlags.Costs.latency;
2413       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2414                 UnblockThread,
2415                 tso, node, (rtsSpark*)NULL);
2416       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2417       // len++;
2418     }
2419     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2420     IF_GRAN_DEBUG(bq,
2421                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2422                           (node_loc==tso_loc ? "Local" : "Global"), 
2423                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2424     tso->block_info.closure = NULL;
2425     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2426                              tso->id, tso));
2427 }
2428 #elif defined(PAR)
2429 static StgBlockingQueueElement *
2430 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2431 {
2432     StgBlockingQueueElement *next;
2433
2434     switch (get_itbl(bqe)->type) {
2435     case TSO:
2436       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2437       /* if it's a TSO just push it onto the run_queue */
2438       next = bqe->link;
2439       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2440       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2441       threadRunnable();
2442       unblockCount(bqe, node);
2443       /* reset blocking status after dumping event */
2444       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2445       break;
2446
2447     case BLOCKED_FETCH:
2448       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2449       next = bqe->link;
2450       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2451       PendingFetches = (StgBlockedFetch *)bqe;
2452       break;
2453
2454 # if defined(DEBUG)
2455       /* can ignore this case in a non-debugging setup; 
2456          see comments on RBHSave closures above */
2457     case CONSTR:
2458       /* check that the closure is an RBHSave closure */
2459       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2460              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2461              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2462       break;
2463
2464     default:
2465       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2466            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2467            (StgClosure *)bqe);
2468 # endif
2469     }
2470   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2471   return next;
2472 }
2473
2474 #else /* !GRAN && !PAR */
2475 static StgTSO *
2476 unblockOneLocked(StgTSO *tso)
2477 {
2478   StgTSO *next;
2479
2480   ASSERT(get_itbl(tso)->type == TSO);
2481   ASSERT(tso->why_blocked != NotBlocked);
2482   tso->why_blocked = NotBlocked;
2483   next = tso->link;
2484   tso->link = END_TSO_QUEUE;
2485   APPEND_TO_RUN_QUEUE(tso);
2486   threadRunnable();
2487   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2488   return next;
2489 }
2490 #endif
2491
2492 #if defined(GRAN) || defined(PAR)
2493 INLINE_ME StgBlockingQueueElement *
2494 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2495 {
2496   ACQUIRE_LOCK(&sched_mutex);
2497   bqe = unblockOneLocked(bqe, node);
2498   RELEASE_LOCK(&sched_mutex);
2499   return bqe;
2500 }
2501 #else
2502 INLINE_ME StgTSO *
2503 unblockOne(StgTSO *tso)
2504 {
2505   ACQUIRE_LOCK(&sched_mutex);
2506   tso = unblockOneLocked(tso);
2507   RELEASE_LOCK(&sched_mutex);
2508   return tso;
2509 }
2510 #endif
2511
2512 #if defined(GRAN)
2513 void 
2514 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2515 {
2516   StgBlockingQueueElement *bqe;
2517   PEs node_loc;
2518   nat len = 0; 
2519
2520   IF_GRAN_DEBUG(bq, 
2521                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2522                       node, CurrentProc, CurrentTime[CurrentProc], 
2523                       CurrentTSO->id, CurrentTSO));
2524
2525   node_loc = where_is(node);
2526
2527   ASSERT(q == END_BQ_QUEUE ||
2528          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2529          get_itbl(q)->type == CONSTR); // closure (type constructor)
2530   ASSERT(is_unique(node));
2531
2532   /* FAKE FETCH: magically copy the node to the tso's proc;
2533      no Fetch necessary because in reality the node should not have been 
2534      moved to the other PE in the first place
2535   */
2536   if (CurrentProc!=node_loc) {
2537     IF_GRAN_DEBUG(bq, 
2538                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2539                         node, node_loc, CurrentProc, CurrentTSO->id, 
2540                         // CurrentTSO, where_is(CurrentTSO),
2541                         node->header.gran.procs));
2542     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2543     IF_GRAN_DEBUG(bq, 
2544                   debugBelch("## new bitmask of node %p is %#x\n",
2545                         node, node->header.gran.procs));
2546     if (RtsFlags.GranFlags.GranSimStats.Global) {
2547       globalGranStats.tot_fake_fetches++;
2548     }
2549   }
2550
2551   bqe = q;
2552   // ToDo: check: ASSERT(CurrentProc==node_loc);
2553   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2554     //next = bqe->link;
2555     /* 
2556        bqe points to the current element in the queue
2557        next points to the next element in the queue
2558     */
2559     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2560     //tso_loc = where_is(tso);
2561     len++;
2562     bqe = unblockOneLocked(bqe, node);
2563   }
2564
2565   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2566      the closure to make room for the anchor of the BQ */
2567   if (bqe!=END_BQ_QUEUE) {
2568     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2569     /*
2570     ASSERT((info_ptr==&RBH_Save_0_info) ||
2571            (info_ptr==&RBH_Save_1_info) ||
2572            (info_ptr==&RBH_Save_2_info));
2573     */
2574     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2575     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2576     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2577
2578     IF_GRAN_DEBUG(bq,
2579                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2580                         node, info_type(node)));
2581   }
2582
2583   /* statistics gathering */
2584   if (RtsFlags.GranFlags.GranSimStats.Global) {
2585     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2586     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2587     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2588     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2589   }
2590   IF_GRAN_DEBUG(bq,
2591                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2592                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2593 }
2594 #elif defined(PAR)
2595 void 
2596 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2597 {
2598   StgBlockingQueueElement *bqe;
2599
2600   ACQUIRE_LOCK(&sched_mutex);
2601
2602   IF_PAR_DEBUG(verbose, 
2603                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
2604                      node, mytid));
2605 #ifdef DIST  
2606   //RFP
2607   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2608     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
2609     return;
2610   }
2611 #endif
2612   
2613   ASSERT(q == END_BQ_QUEUE ||
2614          get_itbl(q)->type == TSO ||           
2615          get_itbl(q)->type == BLOCKED_FETCH || 
2616          get_itbl(q)->type == CONSTR); 
2617
2618   bqe = q;
2619   while (get_itbl(bqe)->type==TSO || 
2620          get_itbl(bqe)->type==BLOCKED_FETCH) {
2621     bqe = unblockOneLocked(bqe, node);
2622   }
2623   RELEASE_LOCK(&sched_mutex);
2624 }
2625
2626 #else   /* !GRAN && !PAR */
2627
2628 void
2629 awakenBlockedQueueNoLock(StgTSO *tso)
2630 {
2631   while (tso != END_TSO_QUEUE) {
2632     tso = unblockOneLocked(tso);
2633   }
2634 }
2635
2636 void
2637 awakenBlockedQueue(StgTSO *tso)
2638 {
2639   ACQUIRE_LOCK(&sched_mutex);
2640   while (tso != END_TSO_QUEUE) {
2641     tso = unblockOneLocked(tso);
2642   }
2643   RELEASE_LOCK(&sched_mutex);
2644 }
2645 #endif
2646
2647 /* ---------------------------------------------------------------------------
2648    Interrupt execution
2649    - usually called inside a signal handler so it mustn't do anything fancy.   
2650    ------------------------------------------------------------------------ */
2651
2652 void
2653 interruptStgRts(void)
2654 {
2655     interrupted    = 1;
2656     context_switch = 1;
2657 }
2658
2659 /* -----------------------------------------------------------------------------
2660    Unblock a thread
2661
2662    This is for use when we raise an exception in another thread, which
2663    may be blocked.
2664    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2665    -------------------------------------------------------------------------- */
2666
2667 #if defined(GRAN) || defined(PAR)
2668 /*
2669   NB: only the type of the blocking queue is different in GranSim and GUM
2670       the operations on the queue-elements are the same
2671       long live polymorphism!
2672
2673   Locks: sched_mutex is held upon entry and exit.
2674
2675 */
2676 static void
2677 unblockThread(StgTSO *tso)
2678 {
2679   StgBlockingQueueElement *t, **last;
2680
2681   switch (tso->why_blocked) {
2682
2683   case NotBlocked:
2684     return;  /* not blocked */
2685
2686   case BlockedOnSTM:
2687     // Be careful: nothing to do here!  We tell the scheduler that the thread
2688     // is runnable and we leave it to the stack-walking code to abort the 
2689     // transaction while unwinding the stack.  We should perhaps have a debugging
2690     // test to make sure that this really happens and that the 'zombie' transaction
2691     // does not get committed.
2692     goto done;
2693
2694   case BlockedOnMVar:
2695     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2696     {
2697       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2698       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2699
2700       last = (StgBlockingQueueElement **)&mvar->head;
2701       for (t = (StgBlockingQueueElement *)mvar->head; 
2702            t != END_BQ_QUEUE; 
2703            last = &t->link, last_tso = t, t = t->link) {
2704         if (t == (StgBlockingQueueElement *)tso) {
2705           *last = (StgBlockingQueueElement *)tso->link;
2706           if (mvar->tail == tso) {
2707             mvar->tail = (StgTSO *)last_tso;
2708           }
2709           goto done;
2710         }
2711       }
2712       barf("unblockThread (MVAR): TSO not found");
2713     }
2714
2715   case BlockedOnBlackHole:
2716     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2717     {
2718       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2719
2720       last = &bq->blocking_queue;
2721       for (t = bq->blocking_queue; 
2722            t != END_BQ_QUEUE; 
2723            last = &t->link, t = t->link) {
2724         if (t == (StgBlockingQueueElement *)tso) {
2725           *last = (StgBlockingQueueElement *)tso->link;
2726           goto done;
2727         }
2728       }
2729       barf("unblockThread (BLACKHOLE): TSO not found");
2730     }
2731
2732   case BlockedOnException:
2733     {
2734       StgTSO *target  = tso->block_info.tso;
2735
2736       ASSERT(get_itbl(target)->type == TSO);
2737
2738       if (target->what_next == ThreadRelocated) {
2739           target = target->link;
2740           ASSERT(get_itbl(target)->type == TSO);
2741       }
2742
2743       ASSERT(target->blocked_exceptions != NULL);
2744
2745       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2746       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2747            t != END_BQ_QUEUE; 
2748            last = &t->link, t = t->link) {
2749         ASSERT(get_itbl(t)->type == TSO);
2750         if (t == (StgBlockingQueueElement *)tso) {
2751           *last = (StgBlockingQueueElement *)tso->link;
2752           goto done;
2753         }
2754       }
2755       barf("unblockThread (Exception): TSO not found");
2756     }
2757
2758   case BlockedOnRead:
2759   case BlockedOnWrite:
2760 #if defined(mingw32_TARGET_OS)
2761   case BlockedOnDoProc:
2762 #endif
2763     {
2764       /* take TSO off blocked_queue */
2765       StgBlockingQueueElement *prev = NULL;
2766       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2767            prev = t, t = t->link) {
2768         if (t == (StgBlockingQueueElement *)tso) {
2769           if (prev == NULL) {
2770             blocked_queue_hd = (StgTSO *)t->link;
2771             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2772               blocked_queue_tl = END_TSO_QUEUE;
2773             }
2774           } else {
2775             prev->link = t->link;
2776             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2777               blocked_queue_tl = (StgTSO *)prev;
2778             }
2779           }
2780           goto done;
2781         }
2782       }
2783       barf("unblockThread (I/O): TSO not found");
2784     }
2785
2786   case BlockedOnDelay:
2787     {
2788       /* take TSO off sleeping_queue */
2789       StgBlockingQueueElement *prev = NULL;
2790       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2791            prev = t, t = t->link) {
2792         if (t == (StgBlockingQueueElement *)tso) {
2793           if (prev == NULL) {
2794             sleeping_queue = (StgTSO *)t->link;
2795           } else {
2796             prev->link = t->link;
2797           }
2798           goto done;
2799         }
2800       }
2801       barf("unblockThread (delay): TSO not found");
2802     }
2803
2804   default:
2805     barf("unblockThread");
2806   }
2807
2808  done:
2809   tso->link = END_TSO_QUEUE;
2810   tso->why_blocked = NotBlocked;
2811   tso->block_info.closure = NULL;
2812   PUSH_ON_RUN_QUEUE(tso);
2813 }
2814 #else
2815 static void
2816 unblockThread(StgTSO *tso)
2817 {
2818   StgTSO *t, **last;
2819   
2820   /* To avoid locking unnecessarily. */
2821   if (tso->why_blocked == NotBlocked) {
2822     return;
2823   }
2824
2825   switch (tso->why_blocked) {
2826
2827   case BlockedOnSTM:
2828     // Be careful: nothing to do here!  We tell the scheduler that the thread
2829     // is runnable and we leave it to the stack-walking code to abort the 
2830     // transaction while unwinding the stack.  We should perhaps have a debugging
2831     // test to make sure that this really happens and that the 'zombie' transaction
2832     // does not get committed.
2833     goto done;
2834
2835   case BlockedOnMVar:
2836     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2837     {
2838       StgTSO *last_tso = END_TSO_QUEUE;
2839       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2840
2841       last = &mvar->head;
2842       for (t = mvar->head; t != END_TSO_QUEUE; 
2843            last = &t->link, last_tso = t, t = t->link) {
2844         if (t == tso) {
2845           *last = tso->link;
2846           if (mvar->tail == tso) {
2847             mvar->tail = last_tso;
2848           }
2849           goto done;
2850         }
2851       }
2852       barf("unblockThread (MVAR): TSO not found");
2853     }
2854
2855   case BlockedOnBlackHole:
2856     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2857     {
2858       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2859
2860       last = &bq->blocking_queue;
2861       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2862            last = &t->link, t = t->link) {
2863         if (t == tso) {
2864           *last = tso->link;
2865           goto done;
2866         }
2867       }
2868       barf("unblockThread (BLACKHOLE): TSO not found");
2869     }
2870
2871   case BlockedOnException:
2872     {
2873       StgTSO *target  = tso->block_info.tso;
2874
2875       ASSERT(get_itbl(target)->type == TSO);
2876
2877       while (target->what_next == ThreadRelocated) {
2878           target = target->link;
2879           ASSERT(get_itbl(target)->type == TSO);
2880       }
2881       
2882       ASSERT(target->blocked_exceptions != NULL);
2883
2884       last = &target->blocked_exceptions;
2885       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2886            last = &t->link, t = t->link) {
2887         ASSERT(get_itbl(t)->type == TSO);
2888         if (t == tso) {
2889           *last = tso->link;
2890           goto done;
2891         }
2892       }
2893       barf("unblockThread (Exception): TSO not found");
2894     }
2895
2896   case BlockedOnRead:
2897   case BlockedOnWrite:
2898 #if defined(mingw32_TARGET_OS)
2899   case BlockedOnDoProc:
2900 #endif
2901     {
2902       StgTSO *prev = NULL;
2903       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2904            prev = t, t = t->link) {
2905         if (t == tso) {
2906           if (prev == NULL) {
2907             blocked_queue_hd = t->link;
2908             if (blocked_queue_tl == t) {
2909               blocked_queue_tl = END_TSO_QUEUE;
2910             }
2911           } else {
2912             prev->link = t->link;
2913             if (blocked_queue_tl == t) {
2914               blocked_queue_tl = prev;
2915             }
2916           }
2917           goto done;
2918         }
2919       }
2920       barf("unblockThread (I/O): TSO not found");
2921     }
2922
2923   case BlockedOnDelay:
2924     {
2925       StgTSO *prev = NULL;
2926       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2927            prev = t, t = t->link) {
2928         if (t == tso) {
2929           if (prev == NULL) {
2930             sleeping_queue = t->link;
2931           } else {
2932             prev->link = t->link;
2933           }
2934           goto done;
2935         }
2936       }
2937       barf("unblockThread (delay): TSO not found");
2938     }
2939
2940   default:
2941     barf("unblockThread");
2942   }
2943
2944  done:
2945   tso->link = END_TSO_QUEUE;
2946   tso->why_blocked = NotBlocked;
2947   tso->block_info.closure = NULL;
2948   APPEND_TO_RUN_QUEUE(tso);
2949 }
2950 #endif
2951
2952 /* -----------------------------------------------------------------------------
2953  * raiseAsync()
2954  *
2955  * The following function implements the magic for raising an
2956  * asynchronous exception in an existing thread.
2957  *
2958  * We first remove the thread from any queue on which it might be
2959  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2960  *
2961  * We strip the stack down to the innermost CATCH_FRAME, building
2962  * thunks in the heap for all the active computations, so they can 
2963  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2964  * an application of the handler to the exception, and push it on
2965  * the top of the stack.
2966  * 
2967  * How exactly do we save all the active computations?  We create an
2968  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2969  * AP_STACKs pushes everything from the corresponding update frame
2970  * upwards onto the stack.  (Actually, it pushes everything up to the
2971  * next update frame plus a pointer to the next AP_STACK object.
2972  * Entering the next AP_STACK object pushes more onto the stack until we
2973  * reach the last AP_STACK object - at which point the stack should look
2974  * exactly as it did when we killed the TSO and we can continue
2975  * execution by entering the closure on top of the stack.
2976  *
2977  * We can also kill a thread entirely - this happens if either (a) the 
2978  * exception passed to raiseAsync is NULL, or (b) there's no
2979  * CATCH_FRAME on the stack.  In either case, we strip the entire
2980  * stack and replace the thread with a zombie.
2981  *
2982  * Locks: sched_mutex held upon entry nor exit.
2983  *
2984  * -------------------------------------------------------------------------- */
2985  
2986 void 
2987 deleteThread(StgTSO *tso)
2988 {
2989   if (tso->why_blocked != BlockedOnCCall &&
2990       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2991       raiseAsync(tso,NULL);
2992   }
2993 }
2994
2995 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2996 static void 
2997 deleteThreadImmediately(StgTSO *tso)
2998 { // for forkProcess only:
2999   // delete thread without giving it a chance to catch the KillThread exception
3000
3001   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3002       return;
3003   }
3004
3005   if (tso->why_blocked != BlockedOnCCall &&
3006       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3007     unblockThread(tso);
3008   }
3009
3010   tso->what_next = ThreadKilled;
3011 }
3012 #endif
3013
3014 void
3015 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3016 {
3017   /* When raising async exs from contexts where sched_mutex isn't held;
3018      use raiseAsyncWithLock(). */
3019   ACQUIRE_LOCK(&sched_mutex);
3020   raiseAsync(tso,exception);
3021   RELEASE_LOCK(&sched_mutex);
3022 }
3023
3024 void
3025 raiseAsync(StgTSO *tso, StgClosure *exception)
3026 {
3027     raiseAsync_(tso, exception, rtsFalse);
3028 }
3029
3030 static void
3031 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3032 {
3033     StgRetInfoTable *info;
3034     StgPtr sp;
3035   
3036     // Thread already dead?
3037     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3038         return;
3039     }
3040
3041     IF_DEBUG(scheduler, 
3042              sched_belch("raising exception in thread %ld.", (long)tso->id));
3043     
3044     // Remove it from any blocking queues
3045     unblockThread(tso);
3046
3047     sp = tso->sp;
3048     
3049     // The stack freezing code assumes there's a closure pointer on
3050     // the top of the stack, so we have to arrange that this is the case...
3051     //
3052     if (sp[0] == (W_)&stg_enter_info) {
3053         sp++;
3054     } else {
3055         sp--;
3056         sp[0] = (W_)&stg_dummy_ret_closure;
3057     }
3058
3059     while (1) {
3060         nat i;
3061
3062         // 1. Let the top of the stack be the "current closure"
3063         //
3064         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3065         // CATCH_FRAME.
3066         //
3067         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3068         // current closure applied to the chunk of stack up to (but not
3069         // including) the update frame.  This closure becomes the "current
3070         // closure".  Go back to step 2.
3071         //
3072         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3073         // top of the stack applied to the exception.
3074         // 
3075         // 5. If it's a STOP_FRAME, then kill the thread.
3076         // 
3077         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3078         // transaction
3079        
3080         
3081         StgPtr frame;
3082         
3083         frame = sp + 1;
3084         info = get_ret_itbl((StgClosure *)frame);
3085         
3086         while (info->i.type != UPDATE_FRAME
3087                && (info->i.type != CATCH_FRAME || exception == NULL)
3088                && info->i.type != STOP_FRAME
3089                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3090         {
3091             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3092               // IF we find an ATOMICALLY_FRAME then we abort the
3093               // current transaction and propagate the exception.  In
3094               // this case (unlike ordinary exceptions) we do not care
3095               // whether the transaction is valid or not because its
3096               // possible validity cannot have caused the exception
3097               // and will not be visible after the abort.
3098               IF_DEBUG(stm,
3099                        debugBelch("Found atomically block delivering async exception\n"));
3100               stmAbortTransaction(tso -> trec);
3101               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3102             }
3103             frame += stack_frame_sizeW((StgClosure *)frame);
3104             info = get_ret_itbl((StgClosure *)frame);
3105         }
3106         
3107         switch (info->i.type) {
3108             
3109         case ATOMICALLY_FRAME:
3110             ASSERT(stop_at_atomically);
3111             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3112             stmCondemnTransaction(tso -> trec);
3113             tso->sp = frame;
3114             tso->what_next = ThreadRunGHC;
3115             return;
3116
3117         case CATCH_FRAME:
3118             // If we find a CATCH_FRAME, and we've got an exception to raise,
3119             // then build the THUNK raise(exception), and leave it on
3120             // top of the CATCH_FRAME ready to enter.
3121             //
3122         {
3123 #ifdef PROFILING
3124             StgCatchFrame *cf = (StgCatchFrame *)frame;
3125 #endif
3126             StgClosure *raise;
3127             
3128             // we've got an exception to raise, so let's pass it to the
3129             // handler in this frame.
3130             //
3131             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3132             TICK_ALLOC_SE_THK(1,0);
3133             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3134             raise->payload[0] = exception;
3135             
3136             // throw away the stack from Sp up to the CATCH_FRAME.
3137             //
3138             sp = frame - 1;
3139             
3140             /* Ensure that async excpetions are blocked now, so we don't get
3141              * a surprise exception before we get around to executing the
3142              * handler.
3143              */
3144             if (tso->blocked_exceptions == NULL) {
3145                 tso->blocked_exceptions = END_TSO_QUEUE;
3146             }
3147             
3148             /* Put the newly-built THUNK on top of the stack, ready to execute
3149              * when the thread restarts.
3150              */
3151             sp[0] = (W_)raise;
3152             sp[-1] = (W_)&stg_enter_info;
3153             tso->sp = sp-1;
3154             tso->what_next = ThreadRunGHC;
3155             IF_DEBUG(sanity, checkTSO(tso));
3156             return;
3157         }
3158         
3159         case UPDATE_FRAME:
3160         {
3161             StgAP_STACK * ap;
3162             nat words;
3163             
3164             // First build an AP_STACK consisting of the stack chunk above the
3165             // current update frame, with the top word on the stack as the
3166             // fun field.
3167             //
3168             words = frame - sp - 1;
3169             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3170             
3171             ap->size = words;
3172             ap->fun  = (StgClosure *)sp[0];
3173             sp++;
3174             for(i=0; i < (nat)words; ++i) {
3175                 ap->payload[i] = (StgClosure *)*sp++;
3176             }
3177             
3178             SET_HDR(ap,&stg_AP_STACK_info,
3179                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3180             TICK_ALLOC_UP_THK(words+1,0);
3181             
3182             IF_DEBUG(scheduler,
3183                      debugBelch("sched: Updating ");
3184                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3185                      debugBelch(" with ");
3186                      printObj((StgClosure *)ap);
3187                 );
3188
3189             // Replace the updatee with an indirection - happily
3190             // this will also wake up any threads currently
3191             // waiting on the result.
3192             //
3193             // Warning: if we're in a loop, more than one update frame on
3194             // the stack may point to the same object.  Be careful not to
3195             // overwrite an IND_OLDGEN in this case, because we'll screw
3196             // up the mutable lists.  To be on the safe side, don't
3197             // overwrite any kind of indirection at all.  See also
3198             // threadSqueezeStack in GC.c, where we have to make a similar
3199             // check.
3200             //
3201             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3202                 // revert the black hole
3203                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3204                                (StgClosure *)ap);
3205             }
3206             sp += sizeofW(StgUpdateFrame) - 1;
3207             sp[0] = (W_)ap; // push onto stack
3208             break;
3209         }
3210         
3211         case STOP_FRAME:
3212             // We've stripped the entire stack, the thread is now dead.
3213             sp += sizeofW(StgStopFrame);
3214             tso->what_next = ThreadKilled;
3215             tso->sp = sp;
3216             return;
3217             
3218         default:
3219             barf("raiseAsync");
3220         }
3221     }
3222     barf("raiseAsync");
3223 }
3224
3225 /* -----------------------------------------------------------------------------
3226    raiseExceptionHelper
3227    
3228    This function is called by the raise# primitve, just so that we can
3229    move some of the tricky bits of raising an exception from C-- into
3230    C.  Who knows, it might be a useful re-useable thing here too.
3231    -------------------------------------------------------------------------- */
3232
3233 StgWord
3234 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3235 {
3236     StgClosure *raise_closure = NULL;
3237     StgPtr p, next;
3238     StgRetInfoTable *info;
3239     //
3240     // This closure represents the expression 'raise# E' where E
3241     // is the exception raise.  It is used to overwrite all the
3242     // thunks which are currently under evaluataion.
3243     //
3244
3245     //    
3246     // LDV profiling: stg_raise_info has THUNK as its closure
3247     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3248     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3249     // 1 does not cause any problem unless profiling is performed.
3250     // However, when LDV profiling goes on, we need to linearly scan
3251     // small object pool, where raise_closure is stored, so we should
3252     // use MIN_UPD_SIZE.
3253     //
3254     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3255     //                                 sizeofW(StgClosure)+1);
3256     //
3257
3258     //
3259     // Walk up the stack, looking for the catch frame.  On the way,
3260     // we update any closures pointed to from update frames with the
3261     // raise closure that we just built.
3262     //
3263     p = tso->sp;
3264     while(1) {
3265         info = get_ret_itbl((StgClosure *)p);
3266         next = p + stack_frame_sizeW((StgClosure *)p);
3267         switch (info->i.type) {
3268             
3269         case UPDATE_FRAME:
3270             // Only create raise_closure if we need to.
3271             if (raise_closure == NULL) {
3272                 raise_closure = 
3273                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3274                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3275                 raise_closure->payload[0] = exception;
3276             }
3277             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3278             p = next;
3279             continue;
3280
3281         case ATOMICALLY_FRAME:
3282             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3283             tso->sp = p;
3284             return ATOMICALLY_FRAME;
3285             
3286         case CATCH_FRAME:
3287             tso->sp = p;
3288             return CATCH_FRAME;
3289
3290         case CATCH_STM_FRAME:
3291             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3292             tso->sp = p;
3293             return CATCH_STM_FRAME;
3294             
3295         case STOP_FRAME:
3296             tso->sp = p;
3297             return STOP_FRAME;
3298
3299         case CATCH_RETRY_FRAME:
3300         default:
3301             p = next; 
3302             continue;
3303         }
3304     }
3305 }
3306
3307
3308 /* -----------------------------------------------------------------------------
3309    findRetryFrameHelper
3310
3311    This function is called by the retry# primitive.  It traverses the stack
3312    leaving tso->sp referring to the frame which should handle the retry.  
3313
3314    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3315    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3316
3317    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3318    despite the similar implementation.
3319
3320    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3321    not be created within memory transactions.
3322    -------------------------------------------------------------------------- */
3323
3324 StgWord
3325 findRetryFrameHelper (StgTSO *tso)
3326 {
3327   StgPtr           p, next;
3328   StgRetInfoTable *info;
3329
3330   p = tso -> sp;
3331   while (1) {
3332     info = get_ret_itbl((StgClosure *)p);
3333     next = p + stack_frame_sizeW((StgClosure *)p);
3334     switch (info->i.type) {
3335       
3336     case ATOMICALLY_FRAME:
3337       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3338       tso->sp = p;
3339       return ATOMICALLY_FRAME;
3340       
3341     case CATCH_RETRY_FRAME:
3342       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3343       tso->sp = p;
3344       return CATCH_RETRY_FRAME;
3345       
3346     case CATCH_STM_FRAME:
3347     default:
3348       ASSERT(info->i.type != CATCH_FRAME);
3349       ASSERT(info->i.type != STOP_FRAME);
3350       p = next; 
3351       continue;
3352     }
3353   }
3354 }
3355    
3356 /* -----------------------------------------------------------------------------
3357    resurrectThreads is called after garbage collection on the list of
3358    threads found to be garbage.  Each of these threads will be woken
3359    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3360    on an MVar, or NonTermination if the thread was blocked on a Black
3361    Hole.
3362
3363    Locks: sched_mutex isn't held upon entry nor exit.
3364    -------------------------------------------------------------------------- */
3365
3366 void
3367 resurrectThreads( StgTSO *threads )
3368 {
3369   StgTSO *tso, *next;
3370
3371   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3372     next = tso->global_link;
3373     tso->global_link = all_threads;
3374     all_threads = tso;
3375     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3376
3377     switch (tso->why_blocked) {
3378     case BlockedOnMVar:
3379     case BlockedOnException:
3380       /* Called by GC - sched_mutex lock is currently held. */
3381       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3382       break;
3383     case BlockedOnBlackHole:
3384       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3385       break;
3386     case BlockedOnSTM:
3387       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3388       break;
3389     case NotBlocked:
3390       /* This might happen if the thread was blocked on a black hole
3391        * belonging to a thread that we've just woken up (raiseAsync
3392        * can wake up threads, remember...).
3393        */
3394       continue;
3395     default:
3396       barf("resurrectThreads: thread blocked in a strange way");
3397     }
3398   }
3399 }
3400
3401 /* ----------------------------------------------------------------------------
3402  * Debugging: why is a thread blocked
3403  * [Also provides useful information when debugging threaded programs
3404  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3405    ------------------------------------------------------------------------- */
3406
3407 static
3408 void
3409 printThreadBlockage(StgTSO *tso)
3410 {
3411   switch (tso->why_blocked) {
3412   case BlockedOnRead:
3413     debugBelch("is blocked on read from fd %d", tso->block_info.fd);
3414     break;
3415   case BlockedOnWrite:
3416     debugBelch("is blocked on write to fd %d", tso->block_info.fd);
3417     break;
3418 #if defined(mingw32_TARGET_OS)
3419     case BlockedOnDoProc:
3420     debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3421     break;
3422 #endif
3423   case BlockedOnDelay:
3424     debugBelch("is blocked until %d", tso->block_info.target);
3425     break;
3426   case BlockedOnMVar:
3427     debugBelch("is blocked on an MVar");
3428     break;
3429   case BlockedOnException:
3430     debugBelch("is blocked on delivering an exception to thread %d",
3431             tso->block_info.tso->id);
3432     break;
3433   case BlockedOnBlackHole:
3434     debugBelch("is blocked on a black hole");
3435     break;
3436   case NotBlocked:
3437     debugBelch("is not blocked");
3438     break;
3439 #if defined(PAR)
3440   case BlockedOnGA:
3441     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3442             tso->block_info.closure, info_type(tso->block_info.closure));
3443     break;
3444   case BlockedOnGA_NoSend:
3445     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3446             tso->block_info.closure, info_type(tso->block_info.closure));
3447     break;
3448 #endif
3449   case BlockedOnCCall:
3450     debugBelch("is blocked on an external call");
3451     break;
3452   case BlockedOnCCall_NoUnblockExc:
3453     debugBelch("is blocked on an external call (exceptions were already blocked)");
3454     break;
3455   case BlockedOnSTM:
3456     debugBelch("is blocked on an STM operation");
3457     break;
3458   default:
3459     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3460          tso->why_blocked, tso->id, tso);
3461   }
3462 }
3463
3464 static
3465 void
3466 printThreadStatus(StgTSO *tso)
3467 {
3468   switch (tso->what_next) {
3469   case ThreadKilled:
3470     debugBelch("has been killed");
3471     break;
3472   case ThreadComplete:
3473     debugBelch("has completed");
3474     break;
3475   default:
3476     printThreadBlockage(tso);
3477   }
3478 }
3479
3480 void
3481 printAllThreads(void)
3482 {
3483   StgTSO *t;
3484
3485 # if defined(GRAN)
3486   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3487   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3488                        time_string, rtsFalse/*no commas!*/);
3489
3490   debugBelch("all threads at [%s]:\n", time_string);
3491 # elif defined(PAR)
3492   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3493   ullong_format_string(CURRENT_TIME,
3494                        time_string, rtsFalse/*no commas!*/);
3495
3496   debugBelch("all threads at [%s]:\n", time_string);
3497 # else
3498   debugBelch("all threads:\n");
3499 # endif
3500
3501   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3502     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
3503 #if defined(DEBUG)
3504     {
3505       void *label = lookupThreadLabel(t->id);
3506       if (label) debugBelch("[\"%s\"] ",(char *)label);
3507     }
3508 #endif
3509     printThreadStatus(t);
3510     debugBelch("\n");
3511   }
3512 }
3513     
3514 #ifdef DEBUG
3515
3516 /* 
3517    Print a whole blocking queue attached to node (debugging only).
3518 */
3519 # if defined(PAR)
3520 void 
3521 print_bq (StgClosure *node)
3522 {
3523   StgBlockingQueueElement *bqe;
3524   StgTSO *tso;
3525   rtsBool end;
3526
3527   debugBelch("## BQ of closure %p (%s): ",
3528           node, info_type(node));
3529
3530   /* should cover all closures that may have a blocking queue */
3531   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3532          get_itbl(node)->type == FETCH_ME_BQ ||
3533          get_itbl(node)->type == RBH ||
3534          get_itbl(node)->type == MVAR);
3535     
3536   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3537
3538   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3539 }
3540
3541 /* 
3542    Print a whole blocking queue starting with the element bqe.
3543 */
3544 void 
3545 print_bqe (StgBlockingQueueElement *bqe)
3546 {
3547   rtsBool end;
3548
3549   /* 
3550      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3551   */
3552   for (end = (bqe==END_BQ_QUEUE);
3553        !end; // iterate until bqe points to a CONSTR
3554        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3555        bqe = end ? END_BQ_QUEUE : bqe->link) {
3556     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3557     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3558     /* types of closures that may appear in a blocking queue */
3559     ASSERT(get_itbl(bqe)->type == TSO ||           
3560            get_itbl(bqe)->type == BLOCKED_FETCH || 
3561            get_itbl(bqe)->type == CONSTR); 
3562     /* only BQs of an RBH end with an RBH_Save closure */
3563     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3564
3565     switch (get_itbl(bqe)->type) {
3566     case TSO:
3567       debugBelch(" TSO %u (%x),",
3568               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3569       break;
3570     case BLOCKED_FETCH:
3571       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
3572               ((StgBlockedFetch *)bqe)->node, 
3573               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3574               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3575               ((StgBlockedFetch *)bqe)->ga.weight);
3576       break;
3577     case CONSTR:
3578       debugBelch(" %s (IP %p),",
3579               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3580                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3581                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3582                "RBH_Save_?"), get_itbl(bqe));
3583       break;
3584     default:
3585       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3586            info_type((StgClosure *)bqe)); // , node, info_type(node));
3587       break;
3588     }
3589   } /* for */
3590   debugBelch("\n");
3591 }
3592 # elif defined(GRAN)
3593 void 
3594 print_bq (StgClosure *node)
3595 {
3596   StgBlockingQueueElement *bqe;
3597   PEs node_loc, tso_loc;
3598   rtsBool end;
3599
3600   /* should cover all closures that may have a blocking queue */
3601   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3602          get_itbl(node)->type == FETCH_ME_BQ ||
3603          get_itbl(node)->type == RBH);
3604     
3605   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3606   node_loc = where_is(node);
3607
3608   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3609           node, info_type(node), node_loc);
3610
3611   /* 
3612      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3613   */
3614   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3615        !end; // iterate until bqe points to a CONSTR
3616        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3617     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3618     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3619     /* types of closures that may appear in a blocking queue */
3620     ASSERT(get_itbl(bqe)->type == TSO ||           
3621            get_itbl(bqe)->type == CONSTR); 
3622     /* only BQs of an RBH end with an RBH_Save closure */
3623     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3624
3625     tso_loc = where_is((StgClosure *)bqe);
3626     switch (get_itbl(bqe)->type) {
3627     case TSO:
3628       debugBelch(" TSO %d (%p) on [PE %d],",
3629               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3630       break;
3631     case CONSTR:
3632       debugBelch(" %s (IP %p),",
3633               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3634                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3635                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3636                "RBH_Save_?"), get_itbl(bqe));
3637       break;
3638     default:
3639       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3640            info_type((StgClosure *)bqe), node, info_type(node));
3641       break;
3642     }
3643   } /* for */
3644   debugBelch("\n");
3645 }
3646 #else
3647 /* 
3648    Nice and easy: only TSOs on the blocking queue
3649 */
3650 void 
3651 print_bq (StgClosure *node)
3652 {
3653   StgTSO *tso;
3654
3655   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3656   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3657        tso != END_TSO_QUEUE; 
3658        tso=tso->link) {
3659     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3660     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3661     debugBelch(" TSO %d (%p),", tso->id, tso);
3662   }
3663   debugBelch("\n");
3664 }
3665 # endif
3666
3667 #if defined(PAR)
3668 static nat
3669 run_queue_len(void)
3670 {
3671   nat i;
3672   StgTSO *tso;
3673
3674   for (i=0, tso=run_queue_hd; 
3675        tso != END_TSO_QUEUE;
3676        i++, tso=tso->link)
3677     /* nothing */
3678
3679   return i;
3680 }
3681 #endif
3682
3683 void
3684 sched_belch(char *s, ...)
3685 {
3686   va_list ap;
3687   va_start(ap,s);
3688 #ifdef RTS_SUPPORTS_THREADS
3689   debugBelch("sched (task %p): ", osThreadId());
3690 #elif defined(PAR)
3691   debugBelch("== ");
3692 #else
3693   debugBelch("sched: ");
3694 #endif
3695   vdebugBelch(s, ap);
3696   debugBelch("\n");
3697   va_end(ap);
3698 }
3699
3700 #endif /* DEBUG */