[project @ 2005-03-05 16:19:14 by panne]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "Hooks.h"
48 #define COMPILING_SCHEDULER
49 #include "Schedule.h"
50 #include "StgMiscClosures.h"
51 #include "Storage.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PAR)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include "OSThreads.h"
80 #include  "Task.h"
81
82 #ifdef HAVE_SYS_TYPES_H
83 #include <sys/types.h>
84 #endif
85 #ifdef HAVE_UNISTD_H
86 #include <unistd.h>
87 #endif
88
89 #include <string.h>
90 #include <stdlib.h>
91 #include <stdarg.h>
92
93 #ifdef HAVE_ERRNO_H
94 #include <errno.h>
95 #endif
96
97 #ifdef THREADED_RTS
98 #define USED_IN_THREADED_RTS
99 #else
100 #define USED_IN_THREADED_RTS STG_UNUSED
101 #endif
102
103 #ifdef RTS_SUPPORTS_THREADS
104 #define USED_WHEN_RTS_SUPPORTS_THREADS
105 #else
106 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
107 #endif
108
109 /* Main thread queue.
110  * Locks required: sched_mutex.
111  */
112 StgMainThread *main_threads = NULL;
113
114 /* Thread queues.
115  * Locks required: sched_mutex.
116  */
117 #if defined(GRAN)
118
119 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
120 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
121
122 /* 
123    In GranSim we have a runnable and a blocked queue for each processor.
124    In order to minimise code changes new arrays run_queue_hds/tls
125    are created. run_queue_hd is then a short cut (macro) for
126    run_queue_hds[CurrentProc] (see GranSim.h).
127    -- HWL
128 */
129 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
130 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
131 StgTSO *ccalling_threadss[MAX_PROC];
132 /* We use the same global list of threads (all_threads) in GranSim as in
133    the std RTS (i.e. we are cheating). However, we don't use this list in
134    the GranSim specific code at the moment (so we are only potentially
135    cheating).  */
136
137 #else /* !GRAN */
138
139 StgTSO *run_queue_hd = NULL;
140 StgTSO *run_queue_tl = NULL;
141 StgTSO *blocked_queue_hd = NULL;
142 StgTSO *blocked_queue_tl = NULL;
143 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
144
145 #endif
146
147 /* Linked list of all threads.
148  * Used for detecting garbage collected threads.
149  */
150 StgTSO *all_threads = NULL;
151
152 /* When a thread performs a safe C call (_ccall_GC, using old
153  * terminology), it gets put on the suspended_ccalling_threads
154  * list. Used by the garbage collector.
155  */
156 static StgTSO *suspended_ccalling_threads;
157
158 static StgTSO *threadStackOverflow(StgTSO *tso);
159
160 /* KH: The following two flags are shared memory locations.  There is no need
161        to lock them, since they are only unset at the end of a scheduler
162        operation.
163 */
164
165 /* flag set by signal handler to precipitate a context switch */
166 int context_switch = 0;
167
168 /* if this flag is set as well, give up execution */
169 rtsBool interrupted = rtsFalse;
170
171 /* If this flag is set, we are running Haskell code.  Used to detect
172  * uses of 'foreign import unsafe' that should be 'safe'.
173  */
174 rtsBool in_haskell = rtsFalse;
175
176 /* Next thread ID to allocate.
177  * Locks required: thread_id_mutex
178  */
179 static StgThreadID next_thread_id = 1;
180
181 /*
182  * Pointers to the state of the current thread.
183  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
184  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
185  */
186  
187 /* The smallest stack size that makes any sense is:
188  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
189  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
190  *  + 1                       (the closure to enter)
191  *  + 1                       (stg_ap_v_ret)
192  *  + 1                       (spare slot req'd by stg_ap_v_ret)
193  *
194  * A thread with this stack will bomb immediately with a stack
195  * overflow, which will increase its stack size.  
196  */
197
198 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
199
200
201 #if defined(GRAN)
202 StgTSO *CurrentTSO;
203 #endif
204
205 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
206  *  exists - earlier gccs apparently didn't.
207  *  -= chak
208  */
209 StgTSO dummy_tso;
210
211 static rtsBool ready_to_gc;
212
213 /*
214  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
215  * in an MT setting, needed to signal that a worker thread shouldn't hang around
216  * in the scheduler when it is out of work.
217  */
218 static rtsBool shutting_down_scheduler = rtsFalse;
219
220 void            addToBlockedQueue ( StgTSO *tso );
221
222 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
223        void     interruptStgRts   ( void );
224
225 static void     raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);
226
227 #if defined(RTS_SUPPORTS_THREADS)
228 /* ToDo: carefully document the invariants that go together
229  *       with these synchronisation objects.
230  */
231 Mutex     sched_mutex       = INIT_MUTEX_VAR;
232 Mutex     term_mutex        = INIT_MUTEX_VAR;
233
234 #endif /* RTS_SUPPORTS_THREADS */
235
236 #if defined(PAR)
237 StgTSO *LastTSO;
238 rtsTime TimeOfLastYield;
239 rtsBool emitSchedule = rtsTrue;
240 #endif
241
242 #if DEBUG
243 static char *whatNext_strs[] = {
244   "(unknown)",
245   "ThreadRunGHC",
246   "ThreadInterpret",
247   "ThreadKilled",
248   "ThreadRelocated",
249   "ThreadComplete"
250 };
251 #endif
252
253 #if defined(PAR)
254 StgTSO * createSparkThread(rtsSpark spark);
255 StgTSO * activateSpark (rtsSpark spark);  
256 #endif
257
258 /* ----------------------------------------------------------------------------
259  * Starting Tasks
260  * ------------------------------------------------------------------------- */
261
262 #if defined(RTS_SUPPORTS_THREADS)
263 static rtsBool startingWorkerThread = rtsFalse;
264
265 static void taskStart(void);
266 static void
267 taskStart(void)
268 {
269   ACQUIRE_LOCK(&sched_mutex);
270   startingWorkerThread = rtsFalse;
271   schedule(NULL,NULL);
272   RELEASE_LOCK(&sched_mutex);
273 }
274
275 void
276 startSchedulerTaskIfNecessary(void)
277 {
278   if(run_queue_hd != END_TSO_QUEUE
279     || blocked_queue_hd != END_TSO_QUEUE
280     || sleeping_queue != END_TSO_QUEUE)
281   {
282     if(!startingWorkerThread)
283     { // we don't want to start another worker thread
284       // just because the last one hasn't yet reached the
285       // "waiting for capability" state
286       startingWorkerThread = rtsTrue;
287       if(!startTask(taskStart))
288       {
289         startingWorkerThread = rtsFalse;
290       }
291     }
292   }
293 }
294 #endif
295
296 /* ---------------------------------------------------------------------------
297    Main scheduling loop.
298
299    We use round-robin scheduling, each thread returning to the
300    scheduler loop when one of these conditions is detected:
301
302       * out of heap space
303       * timer expires (thread yields)
304       * thread blocks
305       * thread ends
306       * stack overflow
307
308    Locking notes:  we acquire the scheduler lock once at the beginning
309    of the scheduler loop, and release it when
310     
311       * running a thread, or
312       * waiting for work, or
313       * waiting for a GC to complete.
314
315    GRAN version:
316      In a GranSim setup this loop iterates over the global event queue.
317      This revolves around the global event queue, which determines what 
318      to do next. Therefore, it's more complicated than either the 
319      concurrent or the parallel (GUM) setup.
320
321    GUM version:
322      GUM iterates over incoming messages.
323      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
324      and sends out a fish whenever it has nothing to do; in-between
325      doing the actual reductions (shared code below) it processes the
326      incoming messages and deals with delayed operations 
327      (see PendingFetches).
328      This is not the ugliest code you could imagine, but it's bloody close.
329
330    ------------------------------------------------------------------------ */
331 static void
332 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
333           Capability *initialCapability )
334 {
335   StgTSO *t;
336   Capability *cap;
337   StgThreadReturnCode ret;
338 #if defined(GRAN)
339   rtsEvent *event;
340 #elif defined(PAR)
341   StgSparkPool *pool;
342   rtsSpark spark;
343   StgTSO *tso;
344   GlobalTaskId pe;
345   rtsBool receivedFinish = rtsFalse;
346 # if defined(DEBUG)
347   nat tp_size, sp_size; // stats only
348 # endif
349 #endif
350   rtsBool was_interrupted = rtsFalse;
351   nat prev_what_next;
352   
353   // Pre-condition: sched_mutex is held.
354   // We might have a capability, passed in as initialCapability.
355   cap = initialCapability;
356
357 #if defined(RTS_SUPPORTS_THREADS)
358   //
359   // in the threaded case, the capability is either passed in via the
360   // initialCapability parameter, or initialized inside the scheduler
361   // loop 
362   //
363   IF_DEBUG(scheduler,
364            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
365                        mainThread, initialCapability);
366       );
367 #else
368   // simply initialise it in the non-threaded case
369   grabCapability(&cap);
370 #endif
371
372 #if defined(GRAN)
373   /* set up first event to get things going */
374   /* ToDo: assign costs for system setup and init MainTSO ! */
375   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
376             ContinueThread, 
377             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
378
379   IF_DEBUG(gran,
380            debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
381            G_TSO(CurrentTSO, 5));
382
383   if (RtsFlags.GranFlags.Light) {
384     /* Save current time; GranSim Light only */
385     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
386   }      
387
388   event = get_next_event();
389
390   while (event!=(rtsEvent*)NULL) {
391     /* Choose the processor with the next event */
392     CurrentProc = event->proc;
393     CurrentTSO = event->tso;
394
395 #elif defined(PAR)
396
397   while (!receivedFinish) {    /* set by processMessages */
398                                /* when receiving PP_FINISH message         */ 
399
400 #else // everything except GRAN and PAR
401
402   while (1) {
403
404 #endif
405
406      IF_DEBUG(scheduler, printAllThreads());
407
408 #if defined(RTS_SUPPORTS_THREADS)
409       // Yield the capability to higher-priority tasks if necessary.
410       //
411       if (cap != NULL) {
412           yieldCapability(&cap);
413       }
414
415       // If we do not currently hold a capability, we wait for one
416       //
417       if (cap == NULL) {
418           waitForCapability(&sched_mutex, &cap,
419                             mainThread ? &mainThread->bound_thread_cond : NULL);
420       }
421
422       // We now have a capability...
423 #endif
424
425     // Check whether we have re-entered the RTS from Haskell without
426     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
427     // call).
428     if (in_haskell) {
429           errorBelch("schedule: re-entered unsafely.\n"
430                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
431           stg_exit(1);
432     }
433
434     //
435     // If we're interrupted (the user pressed ^C, or some other
436     // termination condition occurred), kill all the currently running
437     // threads.
438     //
439     if (interrupted) {
440         IF_DEBUG(scheduler, sched_belch("interrupted"));
441         interrupted = rtsFalse;
442         was_interrupted = rtsTrue;
443 #if defined(RTS_SUPPORTS_THREADS)
444         // In the threaded RTS, deadlock detection doesn't work,
445         // so just exit right away.
446         errorBelch("interrupted");
447         releaseCapability(cap);
448         RELEASE_LOCK(&sched_mutex);
449         shutdownHaskellAndExit(EXIT_SUCCESS);
450 #else
451         deleteAllThreads();
452 #endif
453     }
454
455 #if defined(RTS_USER_SIGNALS)
456     // check for signals each time around the scheduler
457     if (signals_pending()) {
458       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
459       startSignalHandlers();
460       ACQUIRE_LOCK(&sched_mutex);
461     }
462 #endif
463
464     //
465     // Check whether any waiting threads need to be woken up.  If the
466     // run queue is empty, and there are no other tasks running, we
467     // can wait indefinitely for something to happen.
468     //
469     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
470     {
471 #if defined(RTS_SUPPORTS_THREADS)
472         // We shouldn't be here...
473         barf("schedule: awaitEvent() in threaded RTS");
474 #endif
475         awaitEvent( EMPTY_RUN_QUEUE() );
476     }
477     // we can be interrupted while waiting for I/O...
478     if (interrupted) continue;
479
480     /* 
481      * Detect deadlock: when we have no threads to run, there are no
482      * threads waiting on I/O or sleeping, and all the other tasks are
483      * waiting for work, we must have a deadlock of some description.
484      *
485      * We first try to find threads blocked on themselves (ie. black
486      * holes), and generate NonTermination exceptions where necessary.
487      *
488      * If no threads are black holed, we have a deadlock situation, so
489      * inform all the main threads.
490      */
491 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
492     if (   EMPTY_THREAD_QUEUES() )
493     {
494         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
495
496         // Garbage collection can release some new threads due to
497         // either (a) finalizers or (b) threads resurrected because
498         // they are unreachable and will therefore be sent an
499         // exception.  Any threads thus released will be immediately
500         // runnable.
501         GarbageCollect(GetRoots,rtsTrue);
502         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
503
504 #if defined(RTS_USER_SIGNALS)
505         /* If we have user-installed signal handlers, then wait
506          * for signals to arrive rather then bombing out with a
507          * deadlock.
508          */
509         if ( anyUserHandlers() ) {
510             IF_DEBUG(scheduler, 
511                      sched_belch("still deadlocked, waiting for signals..."));
512
513             awaitUserSignals();
514
515             // we might be interrupted...
516             if (interrupted) { continue; }
517
518             if (signals_pending()) {
519                 RELEASE_LOCK(&sched_mutex);
520                 startSignalHandlers();
521                 ACQUIRE_LOCK(&sched_mutex);
522             }
523             ASSERT(!EMPTY_RUN_QUEUE());
524             goto not_deadlocked;
525         }
526 #endif
527
528         /* Probably a real deadlock.  Send the current main thread the
529          * Deadlock exception (or in the SMP build, send *all* main
530          * threads the deadlock exception, since none of them can make
531          * progress).
532          */
533         {
534             StgMainThread *m;
535             m = main_threads;
536             switch (m->tso->why_blocked) {
537             case BlockedOnBlackHole:
538             case BlockedOnException:
539             case BlockedOnMVar:
540                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
541                 break;
542             default:
543                 barf("deadlock: main thread blocked in a strange way");
544             }
545         }
546     }
547   not_deadlocked:
548
549 #elif defined(RTS_SUPPORTS_THREADS)
550     // ToDo: add deadlock detection in threaded RTS
551 #elif defined(PAR)
552     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
553 #endif
554
555 #if defined(RTS_SUPPORTS_THREADS) || defined(mingw32_HOST_OS)
556     /* win32: might be back here due to awaitEvent() being abandoned
557      * as a result of a console event having been delivered.
558      */
559     if ( EMPTY_RUN_QUEUE() ) {
560         continue; // nothing to do
561     }
562 #endif
563
564 #if defined(GRAN)
565     if (RtsFlags.GranFlags.Light)
566       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
567
568     /* adjust time based on time-stamp */
569     if (event->time > CurrentTime[CurrentProc] &&
570         event->evttype != ContinueThread)
571       CurrentTime[CurrentProc] = event->time;
572     
573     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
574     if (!RtsFlags.GranFlags.Light)
575       handleIdlePEs();
576
577     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
578
579     /* main event dispatcher in GranSim */
580     switch (event->evttype) {
581       /* Should just be continuing execution */
582     case ContinueThread:
583       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
584       /* ToDo: check assertion
585       ASSERT(run_queue_hd != (StgTSO*)NULL &&
586              run_queue_hd != END_TSO_QUEUE);
587       */
588       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
589       if (!RtsFlags.GranFlags.DoAsyncFetch &&
590           procStatus[CurrentProc]==Fetching) {
591         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
592               CurrentTSO->id, CurrentTSO, CurrentProc);
593         goto next_thread;
594       } 
595       /* Ignore ContinueThreads for completed threads */
596       if (CurrentTSO->what_next == ThreadComplete) {
597         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
598               CurrentTSO->id, CurrentTSO, CurrentProc);
599         goto next_thread;
600       } 
601       /* Ignore ContinueThreads for threads that are being migrated */
602       if (PROCS(CurrentTSO)==Nowhere) { 
603         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
604               CurrentTSO->id, CurrentTSO, CurrentProc);
605         goto next_thread;
606       }
607       /* The thread should be at the beginning of the run queue */
608       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
609         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
610               CurrentTSO->id, CurrentTSO, CurrentProc);
611         break; // run the thread anyway
612       }
613       /*
614       new_event(proc, proc, CurrentTime[proc],
615                 FindWork,
616                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
617       goto next_thread; 
618       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
619       break; // now actually run the thread; DaH Qu'vam yImuHbej 
620
621     case FetchNode:
622       do_the_fetchnode(event);
623       goto next_thread;             /* handle next event in event queue  */
624       
625     case GlobalBlock:
626       do_the_globalblock(event);
627       goto next_thread;             /* handle next event in event queue  */
628       
629     case FetchReply:
630       do_the_fetchreply(event);
631       goto next_thread;             /* handle next event in event queue  */
632       
633     case UnblockThread:   /* Move from the blocked queue to the tail of */
634       do_the_unblock(event);
635       goto next_thread;             /* handle next event in event queue  */
636       
637     case ResumeThread:  /* Move from the blocked queue to the tail of */
638       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
639       event->tso->gran.blocktime += 
640         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
641       do_the_startthread(event);
642       goto next_thread;             /* handle next event in event queue  */
643       
644     case StartThread:
645       do_the_startthread(event);
646       goto next_thread;             /* handle next event in event queue  */
647       
648     case MoveThread:
649       do_the_movethread(event);
650       goto next_thread;             /* handle next event in event queue  */
651       
652     case MoveSpark:
653       do_the_movespark(event);
654       goto next_thread;             /* handle next event in event queue  */
655       
656     case FindWork:
657       do_the_findwork(event);
658       goto next_thread;             /* handle next event in event queue  */
659       
660     default:
661       barf("Illegal event type %u\n", event->evttype);
662     }  /* switch */
663     
664     /* This point was scheduler_loop in the old RTS */
665
666     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
667
668     TimeOfLastEvent = CurrentTime[CurrentProc];
669     TimeOfNextEvent = get_time_of_next_event();
670     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
671     // CurrentTSO = ThreadQueueHd;
672
673     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
674                          TimeOfNextEvent));
675
676     if (RtsFlags.GranFlags.Light) 
677       GranSimLight_leave_system(event, &ActiveTSO); 
678
679     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
680
681     IF_DEBUG(gran, 
682              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
683
684     /* in a GranSim setup the TSO stays on the run queue */
685     t = CurrentTSO;
686     /* Take a thread from the run queue. */
687     POP_RUN_QUEUE(t); // take_off_run_queue(t);
688
689     IF_DEBUG(gran, 
690              debugBelch("GRAN: About to run current thread, which is\n");
691              G_TSO(t,5));
692
693     context_switch = 0; // turned on via GranYield, checking events and time slice
694
695     IF_DEBUG(gran, 
696              DumpGranEvent(GR_SCHEDULE, t));
697
698     procStatus[CurrentProc] = Busy;
699
700 #elif defined(PAR)
701     if (PendingFetches != END_BF_QUEUE) {
702         processFetches();
703     }
704
705     /* ToDo: phps merge with spark activation above */
706     /* check whether we have local work and send requests if we have none */
707     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
708       /* :-[  no local threads => look out for local sparks */
709       /* the spark pool for the current PE */
710       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
711       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
712           pool->hd < pool->tl) {
713         /* 
714          * ToDo: add GC code check that we really have enough heap afterwards!!
715          * Old comment:
716          * If we're here (no runnable threads) and we have pending
717          * sparks, we must have a space problem.  Get enough space
718          * to turn one of those pending sparks into a
719          * thread... 
720          */
721
722         spark = findSpark(rtsFalse);                /* get a spark */
723         if (spark != (rtsSpark) NULL) {
724           tso = activateSpark(spark);       /* turn the spark into a thread */
725           IF_PAR_DEBUG(schedule,
726                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
727                              tso->id, tso, advisory_thread_count));
728
729           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
730             debugBelch("==^^ failed to activate spark\n");
731             goto next_thread;
732           }               /* otherwise fall through & pick-up new tso */
733         } else {
734           IF_PAR_DEBUG(verbose,
735                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
736                              spark_queue_len(pool)));
737           goto next_thread;
738         }
739       }
740
741       /* If we still have no work we need to send a FISH to get a spark
742          from another PE 
743       */
744       if (EMPTY_RUN_QUEUE()) {
745       /* =8-[  no local sparks => look for work on other PEs */
746         /*
747          * We really have absolutely no work.  Send out a fish
748          * (there may be some out there already), and wait for
749          * something to arrive.  We clearly can't run any threads
750          * until a SCHEDULE or RESUME arrives, and so that's what
751          * we're hoping to see.  (Of course, we still have to
752          * respond to other types of messages.)
753          */
754         TIME now = msTime() /*CURRENT_TIME*/;
755         IF_PAR_DEBUG(verbose, 
756                      debugBelch("--  now=%ld\n", now));
757         IF_PAR_DEBUG(verbose,
758                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
759                          (last_fish_arrived_at!=0 &&
760                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
761                        debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
762                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
763                              last_fish_arrived_at,
764                              RtsFlags.ParFlags.fishDelay, now);
765                      });
766         
767         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
768             (last_fish_arrived_at==0 ||
769              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
770           /* outstandingFishes is set in sendFish, processFish;
771              avoid flooding system with fishes via delay */
772           pe = choosePE();
773           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
774                    NEW_FISH_HUNGER);
775
776           // Global statistics: count no. of fishes
777           if (RtsFlags.ParFlags.ParStats.Global &&
778               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
779             globalParStats.tot_fish_mess++;
780           }
781         }
782       
783         receivedFinish = processMessages();
784         goto next_thread;
785       }
786     } else if (PacketsWaiting()) {  /* Look for incoming messages */
787       receivedFinish = processMessages();
788     }
789
790     /* Now we are sure that we have some work available */
791     ASSERT(run_queue_hd != END_TSO_QUEUE);
792
793     /* Take a thread from the run queue, if we have work */
794     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
795     IF_DEBUG(sanity,checkTSO(t));
796
797     /* ToDo: write something to the log-file
798     if (RTSflags.ParFlags.granSimStats && !sameThread)
799         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
800
801     CurrentTSO = t;
802     */
803     /* the spark pool for the current PE */
804     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
805
806     IF_DEBUG(scheduler, 
807              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
808                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
809
810 # if 1
811     if (0 && RtsFlags.ParFlags.ParStats.Full && 
812         t && LastTSO && t->id != LastTSO->id && 
813         LastTSO->why_blocked == NotBlocked && 
814         LastTSO->what_next != ThreadComplete) {
815       // if previously scheduled TSO not blocked we have to record the context switch
816       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
817                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
818     }
819
820     if (RtsFlags.ParFlags.ParStats.Full && 
821         (emitSchedule /* forced emit */ ||
822         (t && LastTSO && t->id != LastTSO->id))) {
823       /* 
824          we are running a different TSO, so write a schedule event to log file
825          NB: If we use fair scheduling we also have to write  a deschedule 
826              event for LastTSO; with unfair scheduling we know that the
827              previous tso has blocked whenever we switch to another tso, so
828              we don't need it in GUM for now
829       */
830       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
831                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
832       emitSchedule = rtsFalse;
833     }
834      
835 # endif
836 #else /* !GRAN && !PAR */
837   
838     // grab a thread from the run queue
839     ASSERT(run_queue_hd != END_TSO_QUEUE);
840     POP_RUN_QUEUE(t);
841
842     // Sanity check the thread we're about to run.  This can be
843     // expensive if there is lots of thread switching going on...
844     IF_DEBUG(sanity,checkTSO(t));
845 #endif
846
847 #ifdef THREADED_RTS
848     {
849       StgMainThread *m = t->main;
850       
851       if(m)
852       {
853         if(m == mainThread)
854         {
855           IF_DEBUG(scheduler,
856             sched_belch("### Running thread %d in bound thread", t->id));
857           // yes, the Haskell thread is bound to the current native thread
858         }
859         else
860         {
861           IF_DEBUG(scheduler,
862             sched_belch("### thread %d bound to another OS thread", t->id));
863           // no, bound to a different Haskell thread: pass to that thread
864           PUSH_ON_RUN_QUEUE(t);
865           passCapability(&m->bound_thread_cond);
866           continue;
867         }
868       }
869       else
870       {
871         if(mainThread != NULL)
872         // The thread we want to run is bound.
873         {
874           IF_DEBUG(scheduler,
875             sched_belch("### this OS thread cannot run thread %d", t->id));
876           // no, the current native thread is bound to a different
877           // Haskell thread, so pass it to any worker thread
878           PUSH_ON_RUN_QUEUE(t);
879           passCapabilityToWorker();
880           continue; 
881         }
882       }
883     }
884 #endif
885
886     cap->r.rCurrentTSO = t;
887     
888     /* context switches are now initiated by the timer signal, unless
889      * the user specified "context switch as often as possible", with
890      * +RTS -C0
891      */
892     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
893          && (run_queue_hd != END_TSO_QUEUE
894              || blocked_queue_hd != END_TSO_QUEUE
895              || sleeping_queue != END_TSO_QUEUE)))
896         context_switch = 1;
897
898 run_thread:
899
900     RELEASE_LOCK(&sched_mutex);
901
902     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
903                               (long)t->id, whatNext_strs[t->what_next]));
904
905 #ifdef PROFILING
906     startHeapProfTimer();
907 #endif
908
909     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
910     /* Run the current thread 
911      */
912     prev_what_next = t->what_next;
913
914     errno = t->saved_errno;
915     in_haskell = rtsTrue;
916
917     switch (prev_what_next) {
918
919     case ThreadKilled:
920     case ThreadComplete:
921         /* Thread already finished, return to scheduler. */
922         ret = ThreadFinished;
923         break;
924
925     case ThreadRunGHC:
926         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
927         break;
928
929     case ThreadInterpret:
930         ret = interpretBCO(cap);
931         break;
932
933     default:
934       barf("schedule: invalid what_next field");
935     }
936
937     in_haskell = rtsFalse;
938
939     // The TSO might have moved, so find the new location:
940     t = cap->r.rCurrentTSO;
941
942     // And save the current errno in this thread.
943     t->saved_errno = errno;
944
945     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
946     
947     /* Costs for the scheduler are assigned to CCS_SYSTEM */
948 #ifdef PROFILING
949     stopHeapProfTimer();
950     CCCS = CCS_SYSTEM;
951 #endif
952     
953     ACQUIRE_LOCK(&sched_mutex);
954     
955 #ifdef RTS_SUPPORTS_THREADS
956     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
957 #elif !defined(GRAN) && !defined(PAR)
958     IF_DEBUG(scheduler,debugBelch("sched: "););
959 #endif
960     
961 #if defined(PAR)
962     /* HACK 675: if the last thread didn't yield, make sure to print a 
963        SCHEDULE event to the log file when StgRunning the next thread, even
964        if it is the same one as before */
965     LastTSO = t; 
966     TimeOfLastYield = CURRENT_TIME;
967 #endif
968
969     switch (ret) {
970     case HeapOverflow:
971 #if defined(GRAN)
972       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
973       globalGranStats.tot_heapover++;
974 #elif defined(PAR)
975       globalParStats.tot_heapover++;
976 #endif
977
978       // did the task ask for a large block?
979       if (cap->r.rHpAlloc > BLOCK_SIZE) {
980           // if so, get one and push it on the front of the nursery.
981           bdescr *bd;
982           nat blocks;
983           
984           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
985
986           IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", 
987                                    (long)t->id, whatNext_strs[t->what_next], blocks));
988
989           // don't do this if it would push us over the
990           // alloc_blocks_lim limit; we'll GC first.
991           if (alloc_blocks + blocks < alloc_blocks_lim) {
992
993               alloc_blocks += blocks;
994               bd = allocGroup( blocks );
995
996               // link the new group into the list
997               bd->link = cap->r.rCurrentNursery;
998               bd->u.back = cap->r.rCurrentNursery->u.back;
999               if (cap->r.rCurrentNursery->u.back != NULL) {
1000                   cap->r.rCurrentNursery->u.back->link = bd;
1001               } else {
1002                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1003                          g0s0->blocks == cap->r.rNursery);
1004                   cap->r.rNursery = g0s0->blocks = bd;
1005               }           
1006               cap->r.rCurrentNursery->u.back = bd;
1007
1008               // initialise it as a nursery block.  We initialise the
1009               // step, gen_no, and flags field of *every* sub-block in
1010               // this large block, because this is easier than making
1011               // sure that we always find the block head of a large
1012               // block whenever we call Bdescr() (eg. evacuate() and
1013               // isAlive() in the GC would both have to do this, at
1014               // least).
1015               { 
1016                   bdescr *x;
1017                   for (x = bd; x < bd + blocks; x++) {
1018                       x->step = g0s0;
1019                       x->gen_no = 0;
1020                       x->flags = 0;
1021                   }
1022               }
1023
1024               // don't forget to update the block count in g0s0.
1025               g0s0->n_blocks += blocks;
1026               // This assert can be a killer if the app is doing lots
1027               // of large block allocations.
1028               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1029
1030               // now update the nursery to point to the new block
1031               cap->r.rCurrentNursery = bd;
1032
1033               // we might be unlucky and have another thread get on the
1034               // run queue before us and steal the large block, but in that
1035               // case the thread will just end up requesting another large
1036               // block.
1037               PUSH_ON_RUN_QUEUE(t);
1038               break;
1039           }
1040       }
1041
1042       /* make all the running tasks block on a condition variable,
1043        * maybe set context_switch and wait till they all pile in,
1044        * then have them wait on a GC condition variable.
1045        */
1046       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1047                                (long)t->id, whatNext_strs[t->what_next]));
1048       threadPaused(t);
1049 #if defined(GRAN)
1050       ASSERT(!is_on_queue(t,CurrentProc));
1051 #elif defined(PAR)
1052       /* Currently we emit a DESCHEDULE event before GC in GUM.
1053          ToDo: either add separate event to distinguish SYSTEM time from rest
1054                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1055       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1056         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1057                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1058         emitSchedule = rtsTrue;
1059       }
1060 #endif
1061       
1062       ready_to_gc = rtsTrue;
1063       PUSH_ON_RUN_QUEUE(t);
1064       /* actual GC is done at the end of the while loop */
1065       break;
1066       
1067     case StackOverflow:
1068 #if defined(GRAN)
1069       IF_DEBUG(gran, 
1070                DumpGranEvent(GR_DESCHEDULE, t));
1071       globalGranStats.tot_stackover++;
1072 #elif defined(PAR)
1073       // IF_DEBUG(par, 
1074       // DumpGranEvent(GR_DESCHEDULE, t);
1075       globalParStats.tot_stackover++;
1076 #endif
1077       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1078                                (long)t->id, whatNext_strs[t->what_next]));
1079       /* just adjust the stack for this thread, then pop it back
1080        * on the run queue.
1081        */
1082       threadPaused(t);
1083       { 
1084         /* enlarge the stack */
1085         StgTSO *new_t = threadStackOverflow(t);
1086         
1087         /* This TSO has moved, so update any pointers to it from the
1088          * main thread stack.  It better not be on any other queues...
1089          * (it shouldn't be).
1090          */
1091         if (t->main != NULL) {
1092             t->main->tso = new_t;
1093         }
1094         PUSH_ON_RUN_QUEUE(new_t);
1095       }
1096       break;
1097
1098     case ThreadYielding:
1099       // Reset the context switch flag.  We don't do this just before
1100       // running the thread, because that would mean we would lose ticks
1101       // during GC, which can lead to unfair scheduling (a thread hogs
1102       // the CPU because the tick always arrives during GC).  This way
1103       // penalises threads that do a lot of allocation, but that seems
1104       // better than the alternative.
1105       context_switch = 0;
1106
1107 #if defined(GRAN)
1108       IF_DEBUG(gran, 
1109                DumpGranEvent(GR_DESCHEDULE, t));
1110       globalGranStats.tot_yields++;
1111 #elif defined(PAR)
1112       // IF_DEBUG(par, 
1113       // DumpGranEvent(GR_DESCHEDULE, t);
1114       globalParStats.tot_yields++;
1115 #endif
1116       /* put the thread back on the run queue.  Then, if we're ready to
1117        * GC, check whether this is the last task to stop.  If so, wake
1118        * up the GC thread.  getThread will block during a GC until the
1119        * GC is finished.
1120        */
1121       IF_DEBUG(scheduler,
1122                if (t->what_next != prev_what_next) {
1123                    debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1124                          (long)t->id, whatNext_strs[t->what_next]);
1125                } else {
1126                    debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1127                          (long)t->id, whatNext_strs[t->what_next]);
1128                }
1129                );
1130
1131       IF_DEBUG(sanity,
1132                //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1133                checkTSO(t));
1134       ASSERT(t->link == END_TSO_QUEUE);
1135
1136       // Shortcut if we're just switching evaluators: don't bother
1137       // doing stack squeezing (which can be expensive), just run the
1138       // thread.
1139       if (t->what_next != prev_what_next) {
1140           goto run_thread;
1141       }
1142
1143       threadPaused(t);
1144
1145 #if defined(GRAN)
1146       ASSERT(!is_on_queue(t,CurrentProc));
1147
1148       IF_DEBUG(sanity,
1149                //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1150                checkThreadQsSanity(rtsTrue));
1151 #endif
1152
1153 #if defined(PAR)
1154       if (RtsFlags.ParFlags.doFairScheduling) { 
1155         /* this does round-robin scheduling; good for concurrency */
1156         APPEND_TO_RUN_QUEUE(t);
1157       } else {
1158         /* this does unfair scheduling; good for parallelism */
1159         PUSH_ON_RUN_QUEUE(t);
1160       }
1161 #else
1162       // this does round-robin scheduling; good for concurrency
1163       APPEND_TO_RUN_QUEUE(t);
1164 #endif
1165
1166 #if defined(GRAN)
1167       /* add a ContinueThread event to actually process the thread */
1168       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1169                 ContinueThread,
1170                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1171       IF_GRAN_DEBUG(bq, 
1172                debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1173                G_EVENTQ(0);
1174                G_CURR_THREADQ(0));
1175 #endif /* GRAN */
1176       break;
1177
1178     case ThreadBlocked:
1179 #if defined(GRAN)
1180       IF_DEBUG(scheduler,
1181                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1182                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1183                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1184
1185       // ??? needed; should emit block before
1186       IF_DEBUG(gran, 
1187                DumpGranEvent(GR_DESCHEDULE, t)); 
1188       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1189       /*
1190         ngoq Dogh!
1191       ASSERT(procStatus[CurrentProc]==Busy || 
1192               ((procStatus[CurrentProc]==Fetching) && 
1193               (t->block_info.closure!=(StgClosure*)NULL)));
1194       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1195           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1196             procStatus[CurrentProc]==Fetching)) 
1197         procStatus[CurrentProc] = Idle;
1198       */
1199 #elif defined(PAR)
1200       IF_DEBUG(scheduler,
1201                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1202                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1203       IF_PAR_DEBUG(bq,
1204
1205                    if (t->block_info.closure!=(StgClosure*)NULL) 
1206                      print_bq(t->block_info.closure));
1207
1208       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1209       blockThread(t);
1210
1211       /* whatever we schedule next, we must log that schedule */
1212       emitSchedule = rtsTrue;
1213
1214 #else /* !GRAN */
1215       /* don't need to do anything.  Either the thread is blocked on
1216        * I/O, in which case we'll have called addToBlockedQueue
1217        * previously, or it's blocked on an MVar or Blackhole, in which
1218        * case it'll be on the relevant queue already.
1219        */
1220       ASSERT(t->why_blocked != NotBlocked);
1221       IF_DEBUG(scheduler,
1222                debugBelch("--<< thread %d (%s) stopped: ", 
1223                        t->id, whatNext_strs[t->what_next]);
1224                printThreadBlockage(t);
1225                debugBelch("\n"));
1226
1227       /* Only for dumping event to log file 
1228          ToDo: do I need this in GranSim, too?
1229       blockThread(t);
1230       */
1231 #endif
1232       threadPaused(t);
1233       break;
1234
1235     case ThreadFinished:
1236       /* Need to check whether this was a main thread, and if so, signal
1237        * the task that started it with the return value.  If we have no
1238        * more main threads, we probably need to stop all the tasks until
1239        * we get a new one.
1240        */
1241       /* We also end up here if the thread kills itself with an
1242        * uncaught exception, see Exception.hc.
1243        */
1244       IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1245                                t->id, whatNext_strs[t->what_next]));
1246 #if defined(GRAN)
1247       endThread(t, CurrentProc); // clean-up the thread
1248 #elif defined(PAR)
1249       /* For now all are advisory -- HWL */
1250       //if(t->priority==AdvisoryPriority) ??
1251       advisory_thread_count--;
1252       
1253 # ifdef DIST
1254       if(t->dist.priority==RevalPriority)
1255         FinishReval(t);
1256 # endif
1257       
1258       if (RtsFlags.ParFlags.ParStats.Full &&
1259           !RtsFlags.ParFlags.ParStats.Suppressed) 
1260         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1261 #endif
1262
1263       //
1264       // Check whether the thread that just completed was a main
1265       // thread, and if so return with the result.  
1266       //
1267       // There is an assumption here that all thread completion goes
1268       // through this point; we need to make sure that if a thread
1269       // ends up in the ThreadKilled state, that it stays on the run
1270       // queue so it can be dealt with here.
1271       //
1272       if (
1273 #if defined(RTS_SUPPORTS_THREADS)
1274           mainThread != NULL
1275 #else
1276           mainThread->tso == t
1277 #endif
1278           )
1279       {
1280           // We are a bound thread: this must be our thread that just
1281           // completed.
1282           ASSERT(mainThread->tso == t);
1283
1284           if (t->what_next == ThreadComplete) {
1285               if (mainThread->ret) {
1286                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1287                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1288               }
1289               mainThread->stat = Success;
1290           } else {
1291               if (mainThread->ret) {
1292                   *(mainThread->ret) = NULL;
1293               }
1294               if (was_interrupted) {
1295                   mainThread->stat = Interrupted;
1296               } else {
1297                   mainThread->stat = Killed;
1298               }
1299           }
1300 #ifdef DEBUG
1301           removeThreadLabel((StgWord)mainThread->tso->id);
1302 #endif
1303           if (mainThread->prev == NULL) {
1304               main_threads = mainThread->link;
1305           } else {
1306               mainThread->prev->link = mainThread->link;
1307           }
1308           if (mainThread->link != NULL) {
1309               mainThread->link->prev = NULL;
1310           }
1311           releaseCapability(cap);
1312           return;
1313       }
1314
1315 #ifdef RTS_SUPPORTS_THREADS
1316       ASSERT(t->main == NULL);
1317 #else
1318       if (t->main != NULL) {
1319           // Must be a main thread that is not the topmost one.  Leave
1320           // it on the run queue until the stack has unwound to the
1321           // point where we can deal with this.  Leaving it on the run
1322           // queue also ensures that the garbage collector knows about
1323           // this thread and its return value (it gets dropped from the
1324           // all_threads list so there's no other way to find it).
1325           APPEND_TO_RUN_QUEUE(t);
1326       }
1327 #endif
1328       break;
1329
1330     default:
1331       barf("schedule: invalid thread return code %d", (int)ret);
1332     }
1333
1334 #ifdef PROFILING
1335     // When we have +RTS -i0 and we're heap profiling, do a census at
1336     // every GC.  This lets us get repeatable runs for debugging.
1337     if (performHeapProfile ||
1338         (RtsFlags.ProfFlags.profileInterval==0 &&
1339          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1340         GarbageCollect(GetRoots, rtsTrue);
1341         heapCensus();
1342         performHeapProfile = rtsFalse;
1343         ready_to_gc = rtsFalse; // we already GC'd
1344     }
1345 #endif
1346
1347     if (ready_to_gc) {
1348       /* Kick any transactions which are invalid back to their atomically frames.
1349        * When next scheduled they will try to commit, this commit will fail and
1350        * they will retry. */
1351       for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1352         if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1353           if (!stmValidateTransaction (t -> trec)) {
1354             IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1355
1356             // strip the stack back to the ATOMICALLY_FRAME, aborting
1357             // the (nested) transaction, and saving the stack of any
1358             // partially-evaluated thunks on the heap.
1359             raiseAsync_(t, NULL, rtsTrue);
1360             
1361 #ifdef REG_R1
1362             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1363 #endif
1364           }
1365         }
1366       }
1367
1368       /* everybody back, start the GC.
1369        * Could do it in this thread, or signal a condition var
1370        * to do it in another thread.  Either way, we need to
1371        * broadcast on gc_pending_cond afterward.
1372        */
1373 #if defined(RTS_SUPPORTS_THREADS)
1374       IF_DEBUG(scheduler,sched_belch("doing GC"));
1375 #endif
1376       GarbageCollect(GetRoots,rtsFalse);
1377       ready_to_gc = rtsFalse;
1378 #if defined(GRAN)
1379       /* add a ContinueThread event to continue execution of current thread */
1380       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1381                 ContinueThread,
1382                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1383       IF_GRAN_DEBUG(bq, 
1384                debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1385                G_EVENTQ(0);
1386                G_CURR_THREADQ(0));
1387 #endif /* GRAN */
1388     }
1389
1390 #if defined(GRAN)
1391   next_thread:
1392     IF_GRAN_DEBUG(unused,
1393                   print_eventq(EventHd));
1394
1395     event = get_next_event();
1396 #elif defined(PAR)
1397   next_thread:
1398     /* ToDo: wait for next message to arrive rather than busy wait */
1399 #endif /* GRAN */
1400
1401   } /* end of while(1) */
1402
1403   IF_PAR_DEBUG(verbose,
1404                debugBelch("== Leaving schedule() after having received Finish\n"));
1405 }
1406
1407 /* ---------------------------------------------------------------------------
1408  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1409  * used by Control.Concurrent for error checking.
1410  * ------------------------------------------------------------------------- */
1411  
1412 StgBool
1413 rtsSupportsBoundThreads(void)
1414 {
1415 #ifdef THREADED_RTS
1416   return rtsTrue;
1417 #else
1418   return rtsFalse;
1419 #endif
1420 }
1421
1422 /* ---------------------------------------------------------------------------
1423  * isThreadBound(tso): check whether tso is bound to an OS thread.
1424  * ------------------------------------------------------------------------- */
1425  
1426 StgBool
1427 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1428 {
1429 #ifdef THREADED_RTS
1430   return (tso->main != NULL);
1431 #endif
1432   return rtsFalse;
1433 }
1434
1435 /* ---------------------------------------------------------------------------
1436  * Singleton fork(). Do not copy any running threads.
1437  * ------------------------------------------------------------------------- */
1438
1439 #ifndef mingw32_HOST_OS
1440 #define FORKPROCESS_PRIMOP_SUPPORTED
1441 #endif
1442
1443 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1444 static void 
1445 deleteThreadImmediately(StgTSO *tso);
1446 #endif
1447 StgInt
1448 forkProcess(HsStablePtr *entry
1449 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1450             STG_UNUSED
1451 #endif
1452            )
1453 {
1454 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1455   pid_t pid;
1456   StgTSO* t,*next;
1457   StgMainThread *m;
1458   SchedulerStatus rc;
1459
1460   IF_DEBUG(scheduler,sched_belch("forking!"));
1461   rts_lock(); // This not only acquires sched_mutex, it also
1462               // makes sure that no other threads are running
1463
1464   pid = fork();
1465
1466   if (pid) { /* parent */
1467
1468   /* just return the pid */
1469     rts_unlock();
1470     return pid;
1471     
1472   } else { /* child */
1473     
1474     
1475       // delete all threads
1476     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1477     
1478     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1479       next = t->link;
1480
1481         // don't allow threads to catch the ThreadKilled exception
1482       deleteThreadImmediately(t);
1483     }
1484     
1485       // wipe the main thread list
1486     while((m = main_threads) != NULL) {
1487       main_threads = m->link;
1488 # ifdef THREADED_RTS
1489       closeCondition(&m->bound_thread_cond);
1490 # endif
1491       stgFree(m);
1492     }
1493     
1494     rc = rts_evalStableIO(entry, NULL);  // run the action
1495     rts_checkSchedStatus("forkProcess",rc);
1496     
1497     rts_unlock();
1498     
1499     hs_exit();                      // clean up and exit
1500     stg_exit(0);
1501   }
1502 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1503   barf("forkProcess#: primop not supported, sorry!\n");
1504   return -1;
1505 #endif
1506 }
1507
1508 /* ---------------------------------------------------------------------------
1509  * deleteAllThreads():  kill all the live threads.
1510  *
1511  * This is used when we catch a user interrupt (^C), before performing
1512  * any necessary cleanups and running finalizers.
1513  *
1514  * Locks: sched_mutex held.
1515  * ------------------------------------------------------------------------- */
1516    
1517 void
1518 deleteAllThreads ( void )
1519 {
1520   StgTSO* t, *next;
1521   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1522   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1523       next = t->global_link;
1524       deleteThread(t);
1525   }      
1526
1527   // The run queue now contains a bunch of ThreadKilled threads.  We
1528   // must not throw these away: the main thread(s) will be in there
1529   // somewhere, and the main scheduler loop has to deal with it.
1530   // Also, the run queue is the only thing keeping these threads from
1531   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1532
1533   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1534   ASSERT(sleeping_queue == END_TSO_QUEUE);
1535 }
1536
1537 /* startThread and  insertThread are now in GranSim.c -- HWL */
1538
1539
1540 /* ---------------------------------------------------------------------------
1541  * Suspending & resuming Haskell threads.
1542  * 
1543  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1544  * its capability before calling the C function.  This allows another
1545  * task to pick up the capability and carry on running Haskell
1546  * threads.  It also means that if the C call blocks, it won't lock
1547  * the whole system.
1548  *
1549  * The Haskell thread making the C call is put to sleep for the
1550  * duration of the call, on the susepended_ccalling_threads queue.  We
1551  * give out a token to the task, which it can use to resume the thread
1552  * on return from the C function.
1553  * ------------------------------------------------------------------------- */
1554    
1555 StgInt
1556 suspendThread( StgRegTable *reg )
1557 {
1558   nat tok;
1559   Capability *cap;
1560   int saved_errno = errno;
1561
1562   /* assume that *reg is a pointer to the StgRegTable part
1563    * of a Capability.
1564    */
1565   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1566
1567   ACQUIRE_LOCK(&sched_mutex);
1568
1569   IF_DEBUG(scheduler,
1570            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1571
1572   // XXX this might not be necessary --SDM
1573   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1574
1575   threadPaused(cap->r.rCurrentTSO);
1576   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1577   suspended_ccalling_threads = cap->r.rCurrentTSO;
1578
1579   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1580       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1581       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1582   } else {
1583       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1584   }
1585
1586   /* Use the thread ID as the token; it should be unique */
1587   tok = cap->r.rCurrentTSO->id;
1588
1589   /* Hand back capability */
1590   releaseCapability(cap);
1591   
1592 #if defined(RTS_SUPPORTS_THREADS)
1593   /* Preparing to leave the RTS, so ensure there's a native thread/task
1594      waiting to take over.
1595   */
1596   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1597 #endif
1598
1599   in_haskell = rtsFalse;
1600   RELEASE_LOCK(&sched_mutex);
1601   
1602   errno = saved_errno;
1603   return tok; 
1604 }
1605
1606 StgRegTable *
1607 resumeThread( StgInt tok )
1608 {
1609   StgTSO *tso, **prev;
1610   Capability *cap;
1611   int saved_errno = errno;
1612
1613 #if defined(RTS_SUPPORTS_THREADS)
1614   /* Wait for permission to re-enter the RTS with the result. */
1615   ACQUIRE_LOCK(&sched_mutex);
1616   waitForReturnCapability(&sched_mutex, &cap);
1617
1618   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1619 #else
1620   grabCapability(&cap);
1621 #endif
1622
1623   /* Remove the thread off of the suspended list */
1624   prev = &suspended_ccalling_threads;
1625   for (tso = suspended_ccalling_threads; 
1626        tso != END_TSO_QUEUE; 
1627        prev = &tso->link, tso = tso->link) {
1628     if (tso->id == (StgThreadID)tok) {
1629       *prev = tso->link;
1630       break;
1631     }
1632   }
1633   if (tso == END_TSO_QUEUE) {
1634     barf("resumeThread: thread not found");
1635   }
1636   tso->link = END_TSO_QUEUE;
1637   
1638   if(tso->why_blocked == BlockedOnCCall) {
1639       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1640       tso->blocked_exceptions = NULL;
1641   }
1642   
1643   /* Reset blocking status */
1644   tso->why_blocked  = NotBlocked;
1645
1646   cap->r.rCurrentTSO = tso;
1647   in_haskell = rtsTrue;
1648   RELEASE_LOCK(&sched_mutex);
1649   errno = saved_errno;
1650   return &cap->r;
1651 }
1652
1653
1654 /* ---------------------------------------------------------------------------
1655  * Static functions
1656  * ------------------------------------------------------------------------ */
1657 static void unblockThread(StgTSO *tso);
1658
1659 /* ---------------------------------------------------------------------------
1660  * Comparing Thread ids.
1661  *
1662  * This is used from STG land in the implementation of the
1663  * instances of Eq/Ord for ThreadIds.
1664  * ------------------------------------------------------------------------ */
1665
1666 int
1667 cmp_thread(StgPtr tso1, StgPtr tso2) 
1668
1669   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1670   StgThreadID id2 = ((StgTSO *)tso2)->id;
1671  
1672   if (id1 < id2) return (-1);
1673   if (id1 > id2) return 1;
1674   return 0;
1675 }
1676
1677 /* ---------------------------------------------------------------------------
1678  * Fetching the ThreadID from an StgTSO.
1679  *
1680  * This is used in the implementation of Show for ThreadIds.
1681  * ------------------------------------------------------------------------ */
1682 int
1683 rts_getThreadId(StgPtr tso) 
1684 {
1685   return ((StgTSO *)tso)->id;
1686 }
1687
1688 #ifdef DEBUG
1689 void
1690 labelThread(StgPtr tso, char *label)
1691 {
1692   int len;
1693   void *buf;
1694
1695   /* Caveat: Once set, you can only set the thread name to "" */
1696   len = strlen(label)+1;
1697   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1698   strncpy(buf,label,len);
1699   /* Update will free the old memory for us */
1700   updateThreadLabel(((StgTSO *)tso)->id,buf);
1701 }
1702 #endif /* DEBUG */
1703
1704 /* ---------------------------------------------------------------------------
1705    Create a new thread.
1706
1707    The new thread starts with the given stack size.  Before the
1708    scheduler can run, however, this thread needs to have a closure
1709    (and possibly some arguments) pushed on its stack.  See
1710    pushClosure() in Schedule.h.
1711
1712    createGenThread() and createIOThread() (in SchedAPI.h) are
1713    convenient packaged versions of this function.
1714
1715    currently pri (priority) is only used in a GRAN setup -- HWL
1716    ------------------------------------------------------------------------ */
1717 #if defined(GRAN)
1718 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1719 StgTSO *
1720 createThread(nat size, StgInt pri)
1721 #else
1722 StgTSO *
1723 createThread(nat size)
1724 #endif
1725 {
1726
1727     StgTSO *tso;
1728     nat stack_size;
1729
1730     /* First check whether we should create a thread at all */
1731 #if defined(PAR)
1732   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1733   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1734     threadsIgnored++;
1735     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
1736           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1737     return END_TSO_QUEUE;
1738   }
1739   threadsCreated++;
1740 #endif
1741
1742 #if defined(GRAN)
1743   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1744 #endif
1745
1746   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1747
1748   /* catch ridiculously small stack sizes */
1749   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1750     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1751   }
1752
1753   stack_size = size - TSO_STRUCT_SIZEW;
1754
1755   tso = (StgTSO *)allocate(size);
1756   TICK_ALLOC_TSO(stack_size, 0);
1757
1758   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1759 #if defined(GRAN)
1760   SET_GRAN_HDR(tso, ThisPE);
1761 #endif
1762
1763   // Always start with the compiled code evaluator
1764   tso->what_next = ThreadRunGHC;
1765
1766   tso->id = next_thread_id++; 
1767   tso->why_blocked  = NotBlocked;
1768   tso->blocked_exceptions = NULL;
1769
1770   tso->saved_errno = 0;
1771   tso->main = NULL;
1772   
1773   tso->stack_size   = stack_size;
1774   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1775                               - TSO_STRUCT_SIZEW;
1776   tso->sp           = (P_)&(tso->stack) + stack_size;
1777
1778   tso->trec = NO_TREC;
1779
1780 #ifdef PROFILING
1781   tso->prof.CCCS = CCS_MAIN;
1782 #endif
1783
1784   /* put a stop frame on the stack */
1785   tso->sp -= sizeofW(StgStopFrame);
1786   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1787   tso->link = END_TSO_QUEUE;
1788
1789   // ToDo: check this
1790 #if defined(GRAN)
1791   /* uses more flexible routine in GranSim */
1792   insertThread(tso, CurrentProc);
1793 #else
1794   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1795    * from its creation
1796    */
1797 #endif
1798
1799 #if defined(GRAN) 
1800   if (RtsFlags.GranFlags.GranSimStats.Full) 
1801     DumpGranEvent(GR_START,tso);
1802 #elif defined(PAR)
1803   if (RtsFlags.ParFlags.ParStats.Full) 
1804     DumpGranEvent(GR_STARTQ,tso);
1805   /* HACk to avoid SCHEDULE 
1806      LastTSO = tso; */
1807 #endif
1808
1809   /* Link the new thread on the global thread list.
1810    */
1811   tso->global_link = all_threads;
1812   all_threads = tso;
1813
1814 #if defined(DIST)
1815   tso->dist.priority = MandatoryPriority; //by default that is...
1816 #endif
1817
1818 #if defined(GRAN)
1819   tso->gran.pri = pri;
1820 # if defined(DEBUG)
1821   tso->gran.magic = TSO_MAGIC; // debugging only
1822 # endif
1823   tso->gran.sparkname   = 0;
1824   tso->gran.startedat   = CURRENT_TIME; 
1825   tso->gran.exported    = 0;
1826   tso->gran.basicblocks = 0;
1827   tso->gran.allocs      = 0;
1828   tso->gran.exectime    = 0;
1829   tso->gran.fetchtime   = 0;
1830   tso->gran.fetchcount  = 0;
1831   tso->gran.blocktime   = 0;
1832   tso->gran.blockcount  = 0;
1833   tso->gran.blockedat   = 0;
1834   tso->gran.globalsparks = 0;
1835   tso->gran.localsparks  = 0;
1836   if (RtsFlags.GranFlags.Light)
1837     tso->gran.clock  = Now; /* local clock */
1838   else
1839     tso->gran.clock  = 0;
1840
1841   IF_DEBUG(gran,printTSO(tso));
1842 #elif defined(PAR)
1843 # if defined(DEBUG)
1844   tso->par.magic = TSO_MAGIC; // debugging only
1845 # endif
1846   tso->par.sparkname   = 0;
1847   tso->par.startedat   = CURRENT_TIME; 
1848   tso->par.exported    = 0;
1849   tso->par.basicblocks = 0;
1850   tso->par.allocs      = 0;
1851   tso->par.exectime    = 0;
1852   tso->par.fetchtime   = 0;
1853   tso->par.fetchcount  = 0;
1854   tso->par.blocktime   = 0;
1855   tso->par.blockcount  = 0;
1856   tso->par.blockedat   = 0;
1857   tso->par.globalsparks = 0;
1858   tso->par.localsparks  = 0;
1859 #endif
1860
1861 #if defined(GRAN)
1862   globalGranStats.tot_threads_created++;
1863   globalGranStats.threads_created_on_PE[CurrentProc]++;
1864   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1865   globalGranStats.tot_sq_probes++;
1866 #elif defined(PAR)
1867   // collect parallel global statistics (currently done together with GC stats)
1868   if (RtsFlags.ParFlags.ParStats.Global &&
1869       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1870     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1871     globalParStats.tot_threads_created++;
1872   }
1873 #endif 
1874
1875 #if defined(GRAN)
1876   IF_GRAN_DEBUG(pri,
1877                 sched_belch("==__ schedule: Created TSO %d (%p);",
1878                       CurrentProc, tso, tso->id));
1879 #elif defined(PAR)
1880     IF_PAR_DEBUG(verbose,
1881                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
1882                        (long)tso->id, tso, advisory_thread_count));
1883 #else
1884   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1885                                 (long)tso->id, (long)tso->stack_size));
1886 #endif    
1887   return tso;
1888 }
1889
1890 #if defined(PAR)
1891 /* RFP:
1892    all parallel thread creation calls should fall through the following routine.
1893 */
1894 StgTSO *
1895 createSparkThread(rtsSpark spark) 
1896 { StgTSO *tso;
1897   ASSERT(spark != (rtsSpark)NULL);
1898   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1899   { threadsIgnored++;
1900     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1901           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1902     return END_TSO_QUEUE;
1903   }
1904   else
1905   { threadsCreated++;
1906     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1907     if (tso==END_TSO_QUEUE)     
1908       barf("createSparkThread: Cannot create TSO");
1909 #if defined(DIST)
1910     tso->priority = AdvisoryPriority;
1911 #endif
1912     pushClosure(tso,spark);
1913     PUSH_ON_RUN_QUEUE(tso);
1914     advisory_thread_count++;    
1915   }
1916   return tso;
1917 }
1918 #endif
1919
1920 /*
1921   Turn a spark into a thread.
1922   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1923 */
1924 #if defined(PAR)
1925 StgTSO *
1926 activateSpark (rtsSpark spark) 
1927 {
1928   StgTSO *tso;
1929
1930   tso = createSparkThread(spark);
1931   if (RtsFlags.ParFlags.ParStats.Full) {   
1932     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1933     IF_PAR_DEBUG(verbose,
1934                  debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
1935                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1936   }
1937   // ToDo: fwd info on local/global spark to thread -- HWL
1938   // tso->gran.exported =  spark->exported;
1939   // tso->gran.locked =   !spark->global;
1940   // tso->gran.sparkname = spark->name;
1941
1942   return tso;
1943 }
1944 #endif
1945
1946 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1947                                    Capability *initialCapability
1948                                    );
1949
1950
1951 /* ---------------------------------------------------------------------------
1952  * scheduleThread()
1953  *
1954  * scheduleThread puts a thread on the head of the runnable queue.
1955  * This will usually be done immediately after a thread is created.
1956  * The caller of scheduleThread must create the thread using e.g.
1957  * createThread and push an appropriate closure
1958  * on this thread's stack before the scheduler is invoked.
1959  * ------------------------------------------------------------------------ */
1960
1961 static void scheduleThread_ (StgTSO* tso);
1962
1963 void
1964 scheduleThread_(StgTSO *tso)
1965 {
1966   // The thread goes at the *end* of the run-queue, to avoid possible
1967   // starvation of any threads already on the queue.
1968   APPEND_TO_RUN_QUEUE(tso);
1969   threadRunnable();
1970 }
1971
1972 void
1973 scheduleThread(StgTSO* tso)
1974 {
1975   ACQUIRE_LOCK(&sched_mutex);
1976   scheduleThread_(tso);
1977   RELEASE_LOCK(&sched_mutex);
1978 }
1979
1980 #if defined(RTS_SUPPORTS_THREADS)
1981 static Condition bound_cond_cache;
1982 static int bound_cond_cache_full = 0;
1983 #endif
1984
1985
1986 SchedulerStatus
1987 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1988                    Capability *initialCapability)
1989 {
1990     // Precondition: sched_mutex must be held
1991     StgMainThread *m;
1992
1993     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1994     m->tso = tso;
1995     tso->main = m;
1996     m->ret = ret;
1997     m->stat = NoStatus;
1998     m->link = main_threads;
1999     m->prev = NULL;
2000     if (main_threads != NULL) {
2001         main_threads->prev = m;
2002     }
2003     main_threads = m;
2004
2005 #if defined(RTS_SUPPORTS_THREADS)
2006     // Allocating a new condition for each thread is expensive, so we
2007     // cache one.  This is a pretty feeble hack, but it helps speed up
2008     // consecutive call-ins quite a bit.
2009     if (bound_cond_cache_full) {
2010         m->bound_thread_cond = bound_cond_cache;
2011         bound_cond_cache_full = 0;
2012     } else {
2013         initCondition(&m->bound_thread_cond);
2014     }
2015 #endif
2016
2017     /* Put the thread on the main-threads list prior to scheduling the TSO.
2018        Failure to do so introduces a race condition in the MT case (as
2019        identified by Wolfgang Thaller), whereby the new task/OS thread 
2020        created by scheduleThread_() would complete prior to the thread
2021        that spawned it managed to put 'itself' on the main-threads list.
2022        The upshot of it all being that the worker thread wouldn't get to
2023        signal the completion of the its work item for the main thread to
2024        see (==> it got stuck waiting.)    -- sof 6/02.
2025     */
2026     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2027     
2028     APPEND_TO_RUN_QUEUE(tso);
2029     // NB. Don't call threadRunnable() here, because the thread is
2030     // bound and only runnable by *this* OS thread, so waking up other
2031     // workers will just slow things down.
2032
2033     return waitThread_(m, initialCapability);
2034 }
2035
2036 /* ---------------------------------------------------------------------------
2037  * initScheduler()
2038  *
2039  * Initialise the scheduler.  This resets all the queues - if the
2040  * queues contained any threads, they'll be garbage collected at the
2041  * next pass.
2042  *
2043  * ------------------------------------------------------------------------ */
2044
2045 void 
2046 initScheduler(void)
2047 {
2048 #if defined(GRAN)
2049   nat i;
2050
2051   for (i=0; i<=MAX_PROC; i++) {
2052     run_queue_hds[i]      = END_TSO_QUEUE;
2053     run_queue_tls[i]      = END_TSO_QUEUE;
2054     blocked_queue_hds[i]  = END_TSO_QUEUE;
2055     blocked_queue_tls[i]  = END_TSO_QUEUE;
2056     ccalling_threadss[i]  = END_TSO_QUEUE;
2057     sleeping_queue        = END_TSO_QUEUE;
2058   }
2059 #else
2060   run_queue_hd      = END_TSO_QUEUE;
2061   run_queue_tl      = END_TSO_QUEUE;
2062   blocked_queue_hd  = END_TSO_QUEUE;
2063   blocked_queue_tl  = END_TSO_QUEUE;
2064   sleeping_queue    = END_TSO_QUEUE;
2065 #endif 
2066
2067   suspended_ccalling_threads  = END_TSO_QUEUE;
2068
2069   main_threads = NULL;
2070   all_threads  = END_TSO_QUEUE;
2071
2072   context_switch = 0;
2073   interrupted    = 0;
2074
2075   RtsFlags.ConcFlags.ctxtSwitchTicks =
2076       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2077       
2078 #if defined(RTS_SUPPORTS_THREADS)
2079   /* Initialise the mutex and condition variables used by
2080    * the scheduler. */
2081   initMutex(&sched_mutex);
2082   initMutex(&term_mutex);
2083 #endif
2084   
2085   ACQUIRE_LOCK(&sched_mutex);
2086
2087   /* A capability holds the state a native thread needs in
2088    * order to execute STG code. At least one capability is
2089    * floating around (only SMP builds have more than one).
2090    */
2091   initCapabilities();
2092   
2093 #if defined(RTS_SUPPORTS_THREADS)
2094     /* start our haskell execution tasks */
2095     startTaskManager(0,taskStart);
2096 #endif
2097
2098 #if /* defined(SMP) ||*/ defined(PAR)
2099   initSparkPools();
2100 #endif
2101
2102   RELEASE_LOCK(&sched_mutex);
2103 }
2104
2105 void
2106 exitScheduler( void )
2107 {
2108 #if defined(RTS_SUPPORTS_THREADS)
2109   stopTaskManager();
2110 #endif
2111   shutting_down_scheduler = rtsTrue;
2112 }
2113
2114 /* ----------------------------------------------------------------------------
2115    Managing the per-task allocation areas.
2116    
2117    Each capability comes with an allocation area.  These are
2118    fixed-length block lists into which allocation can be done.
2119
2120    ToDo: no support for two-space collection at the moment???
2121    ------------------------------------------------------------------------- */
2122
2123 static
2124 SchedulerStatus
2125 waitThread_(StgMainThread* m, Capability *initialCapability)
2126 {
2127   SchedulerStatus stat;
2128
2129   // Precondition: sched_mutex must be held.
2130   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2131
2132 #if defined(GRAN)
2133   /* GranSim specific init */
2134   CurrentTSO = m->tso;                // the TSO to run
2135   procStatus[MainProc] = Busy;        // status of main PE
2136   CurrentProc = MainProc;             // PE to run it on
2137   schedule(m,initialCapability);
2138 #else
2139   schedule(m,initialCapability);
2140   ASSERT(m->stat != NoStatus);
2141 #endif
2142
2143   stat = m->stat;
2144
2145 #if defined(RTS_SUPPORTS_THREADS)
2146   // Free the condition variable, returning it to the cache if possible.
2147   if (!bound_cond_cache_full) {
2148       bound_cond_cache = m->bound_thread_cond;
2149       bound_cond_cache_full = 1;
2150   } else {
2151       closeCondition(&m->bound_thread_cond);
2152   }
2153 #endif
2154
2155   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2156   stgFree(m);
2157
2158   // Postcondition: sched_mutex still held
2159   return stat;
2160 }
2161
2162 /* ---------------------------------------------------------------------------
2163    Where are the roots that we know about?
2164
2165         - all the threads on the runnable queue
2166         - all the threads on the blocked queue
2167         - all the threads on the sleeping queue
2168         - all the thread currently executing a _ccall_GC
2169         - all the "main threads"
2170      
2171    ------------------------------------------------------------------------ */
2172
2173 /* This has to be protected either by the scheduler monitor, or by the
2174         garbage collection monitor (probably the latter).
2175         KH @ 25/10/99
2176 */
2177
2178 void
2179 GetRoots( evac_fn evac )
2180 {
2181 #if defined(GRAN)
2182   {
2183     nat i;
2184     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2185       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2186           evac((StgClosure **)&run_queue_hds[i]);
2187       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2188           evac((StgClosure **)&run_queue_tls[i]);
2189       
2190       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2191           evac((StgClosure **)&blocked_queue_hds[i]);
2192       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2193           evac((StgClosure **)&blocked_queue_tls[i]);
2194       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2195           evac((StgClosure **)&ccalling_threads[i]);
2196     }
2197   }
2198
2199   markEventQueue();
2200
2201 #else /* !GRAN */
2202   if (run_queue_hd != END_TSO_QUEUE) {
2203       ASSERT(run_queue_tl != END_TSO_QUEUE);
2204       evac((StgClosure **)&run_queue_hd);
2205       evac((StgClosure **)&run_queue_tl);
2206   }
2207   
2208   if (blocked_queue_hd != END_TSO_QUEUE) {
2209       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2210       evac((StgClosure **)&blocked_queue_hd);
2211       evac((StgClosure **)&blocked_queue_tl);
2212   }
2213   
2214   if (sleeping_queue != END_TSO_QUEUE) {
2215       evac((StgClosure **)&sleeping_queue);
2216   }
2217 #endif 
2218
2219   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2220       evac((StgClosure **)&suspended_ccalling_threads);
2221   }
2222
2223 #if defined(PAR) || defined(GRAN)
2224   markSparkQueue(evac);
2225 #endif
2226
2227 #if defined(RTS_USER_SIGNALS)
2228   // mark the signal handlers (signals should be already blocked)
2229   markSignalHandlers(evac);
2230 #endif
2231 }
2232
2233 /* -----------------------------------------------------------------------------
2234    performGC
2235
2236    This is the interface to the garbage collector from Haskell land.
2237    We provide this so that external C code can allocate and garbage
2238    collect when called from Haskell via _ccall_GC.
2239
2240    It might be useful to provide an interface whereby the programmer
2241    can specify more roots (ToDo).
2242    
2243    This needs to be protected by the GC condition variable above.  KH.
2244    -------------------------------------------------------------------------- */
2245
2246 static void (*extra_roots)(evac_fn);
2247
2248 void
2249 performGC(void)
2250 {
2251   /* Obligated to hold this lock upon entry */
2252   ACQUIRE_LOCK(&sched_mutex);
2253   GarbageCollect(GetRoots,rtsFalse);
2254   RELEASE_LOCK(&sched_mutex);
2255 }
2256
2257 void
2258 performMajorGC(void)
2259 {
2260   ACQUIRE_LOCK(&sched_mutex);
2261   GarbageCollect(GetRoots,rtsTrue);
2262   RELEASE_LOCK(&sched_mutex);
2263 }
2264
2265 static void
2266 AllRoots(evac_fn evac)
2267 {
2268     GetRoots(evac);             // the scheduler's roots
2269     extra_roots(evac);          // the user's roots
2270 }
2271
2272 void
2273 performGCWithRoots(void (*get_roots)(evac_fn))
2274 {
2275   ACQUIRE_LOCK(&sched_mutex);
2276   extra_roots = get_roots;
2277   GarbageCollect(AllRoots,rtsFalse);
2278   RELEASE_LOCK(&sched_mutex);
2279 }
2280
2281 /* -----------------------------------------------------------------------------
2282    Stack overflow
2283
2284    If the thread has reached its maximum stack size, then raise the
2285    StackOverflow exception in the offending thread.  Otherwise
2286    relocate the TSO into a larger chunk of memory and adjust its stack
2287    size appropriately.
2288    -------------------------------------------------------------------------- */
2289
2290 static StgTSO *
2291 threadStackOverflow(StgTSO *tso)
2292 {
2293   nat new_stack_size, new_tso_size, stack_words;
2294   StgPtr new_sp;
2295   StgTSO *dest;
2296
2297   IF_DEBUG(sanity,checkTSO(tso));
2298   if (tso->stack_size >= tso->max_stack_size) {
2299
2300     IF_DEBUG(gc,
2301              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2302                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2303              /* If we're debugging, just print out the top of the stack */
2304              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2305                                               tso->sp+64)));
2306
2307     /* Send this thread the StackOverflow exception */
2308     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2309     return tso;
2310   }
2311
2312   /* Try to double the current stack size.  If that takes us over the
2313    * maximum stack size for this thread, then use the maximum instead.
2314    * Finally round up so the TSO ends up as a whole number of blocks.
2315    */
2316   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2317   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2318                                        TSO_STRUCT_SIZE)/sizeof(W_);
2319   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2320   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2321
2322   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2323
2324   dest = (StgTSO *)allocate(new_tso_size);
2325   TICK_ALLOC_TSO(new_stack_size,0);
2326
2327   /* copy the TSO block and the old stack into the new area */
2328   memcpy(dest,tso,TSO_STRUCT_SIZE);
2329   stack_words = tso->stack + tso->stack_size - tso->sp;
2330   new_sp = (P_)dest + new_tso_size - stack_words;
2331   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2332
2333   /* relocate the stack pointers... */
2334   dest->sp         = new_sp;
2335   dest->stack_size = new_stack_size;
2336         
2337   /* Mark the old TSO as relocated.  We have to check for relocated
2338    * TSOs in the garbage collector and any primops that deal with TSOs.
2339    *
2340    * It's important to set the sp value to just beyond the end
2341    * of the stack, so we don't attempt to scavenge any part of the
2342    * dead TSO's stack.
2343    */
2344   tso->what_next = ThreadRelocated;
2345   tso->link = dest;
2346   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2347   tso->why_blocked = NotBlocked;
2348
2349   IF_PAR_DEBUG(verbose,
2350                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2351                      tso->id, tso, tso->stack_size);
2352                /* If we're debugging, just print out the top of the stack */
2353                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2354                                                 tso->sp+64)));
2355   
2356   IF_DEBUG(sanity,checkTSO(tso));
2357 #if 0
2358   IF_DEBUG(scheduler,printTSO(dest));
2359 #endif
2360
2361   return dest;
2362 }
2363
2364 /* ---------------------------------------------------------------------------
2365    Wake up a queue that was blocked on some resource.
2366    ------------------------------------------------------------------------ */
2367
2368 #if defined(GRAN)
2369 STATIC_INLINE void
2370 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2371 {
2372 }
2373 #elif defined(PAR)
2374 STATIC_INLINE void
2375 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2376 {
2377   /* write RESUME events to log file and
2378      update blocked and fetch time (depending on type of the orig closure) */
2379   if (RtsFlags.ParFlags.ParStats.Full) {
2380     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2381                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2382                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2383     if (EMPTY_RUN_QUEUE())
2384       emitSchedule = rtsTrue;
2385
2386     switch (get_itbl(node)->type) {
2387         case FETCH_ME_BQ:
2388           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2389           break;
2390         case RBH:
2391         case FETCH_ME:
2392         case BLACKHOLE_BQ:
2393           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2394           break;
2395 #ifdef DIST
2396         case MVAR:
2397           break;
2398 #endif    
2399         default:
2400           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2401         }
2402       }
2403 }
2404 #endif
2405
2406 #if defined(GRAN)
2407 static StgBlockingQueueElement *
2408 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2409 {
2410     StgTSO *tso;
2411     PEs node_loc, tso_loc;
2412
2413     node_loc = where_is(node); // should be lifted out of loop
2414     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2415     tso_loc = where_is((StgClosure *)tso);
2416     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2417       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2418       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2419       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2420       // insertThread(tso, node_loc);
2421       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2422                 ResumeThread,
2423                 tso, node, (rtsSpark*)NULL);
2424       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2425       // len_local++;
2426       // len++;
2427     } else { // TSO is remote (actually should be FMBQ)
2428       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2429                                   RtsFlags.GranFlags.Costs.gunblocktime +
2430                                   RtsFlags.GranFlags.Costs.latency;
2431       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2432                 UnblockThread,
2433                 tso, node, (rtsSpark*)NULL);
2434       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2435       // len++;
2436     }
2437     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2438     IF_GRAN_DEBUG(bq,
2439                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2440                           (node_loc==tso_loc ? "Local" : "Global"), 
2441                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2442     tso->block_info.closure = NULL;
2443     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2444                              tso->id, tso));
2445 }
2446 #elif defined(PAR)
2447 static StgBlockingQueueElement *
2448 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2449 {
2450     StgBlockingQueueElement *next;
2451
2452     switch (get_itbl(bqe)->type) {
2453     case TSO:
2454       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2455       /* if it's a TSO just push it onto the run_queue */
2456       next = bqe->link;
2457       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2458       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2459       threadRunnable();
2460       unblockCount(bqe, node);
2461       /* reset blocking status after dumping event */
2462       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2463       break;
2464
2465     case BLOCKED_FETCH:
2466       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2467       next = bqe->link;
2468       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2469       PendingFetches = (StgBlockedFetch *)bqe;
2470       break;
2471
2472 # if defined(DEBUG)
2473       /* can ignore this case in a non-debugging setup; 
2474          see comments on RBHSave closures above */
2475     case CONSTR:
2476       /* check that the closure is an RBHSave closure */
2477       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2478              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2479              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2480       break;
2481
2482     default:
2483       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2484            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2485            (StgClosure *)bqe);
2486 # endif
2487     }
2488   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2489   return next;
2490 }
2491
2492 #else /* !GRAN && !PAR */
2493 static StgTSO *
2494 unblockOneLocked(StgTSO *tso)
2495 {
2496   StgTSO *next;
2497
2498   ASSERT(get_itbl(tso)->type == TSO);
2499   ASSERT(tso->why_blocked != NotBlocked);
2500   tso->why_blocked = NotBlocked;
2501   next = tso->link;
2502   tso->link = END_TSO_QUEUE;
2503   APPEND_TO_RUN_QUEUE(tso);
2504   threadRunnable();
2505   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2506   return next;
2507 }
2508 #endif
2509
2510 #if defined(GRAN) || defined(PAR)
2511 INLINE_ME StgBlockingQueueElement *
2512 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2513 {
2514   ACQUIRE_LOCK(&sched_mutex);
2515   bqe = unblockOneLocked(bqe, node);
2516   RELEASE_LOCK(&sched_mutex);
2517   return bqe;
2518 }
2519 #else
2520 INLINE_ME StgTSO *
2521 unblockOne(StgTSO *tso)
2522 {
2523   ACQUIRE_LOCK(&sched_mutex);
2524   tso = unblockOneLocked(tso);
2525   RELEASE_LOCK(&sched_mutex);
2526   return tso;
2527 }
2528 #endif
2529
2530 #if defined(GRAN)
2531 void 
2532 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2533 {
2534   StgBlockingQueueElement *bqe;
2535   PEs node_loc;
2536   nat len = 0; 
2537
2538   IF_GRAN_DEBUG(bq, 
2539                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2540                       node, CurrentProc, CurrentTime[CurrentProc], 
2541                       CurrentTSO->id, CurrentTSO));
2542
2543   node_loc = where_is(node);
2544
2545   ASSERT(q == END_BQ_QUEUE ||
2546          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2547          get_itbl(q)->type == CONSTR); // closure (type constructor)
2548   ASSERT(is_unique(node));
2549
2550   /* FAKE FETCH: magically copy the node to the tso's proc;
2551      no Fetch necessary because in reality the node should not have been 
2552      moved to the other PE in the first place
2553   */
2554   if (CurrentProc!=node_loc) {
2555     IF_GRAN_DEBUG(bq, 
2556                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2557                         node, node_loc, CurrentProc, CurrentTSO->id, 
2558                         // CurrentTSO, where_is(CurrentTSO),
2559                         node->header.gran.procs));
2560     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2561     IF_GRAN_DEBUG(bq, 
2562                   debugBelch("## new bitmask of node %p is %#x\n",
2563                         node, node->header.gran.procs));
2564     if (RtsFlags.GranFlags.GranSimStats.Global) {
2565       globalGranStats.tot_fake_fetches++;
2566     }
2567   }
2568
2569   bqe = q;
2570   // ToDo: check: ASSERT(CurrentProc==node_loc);
2571   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2572     //next = bqe->link;
2573     /* 
2574        bqe points to the current element in the queue
2575        next points to the next element in the queue
2576     */
2577     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2578     //tso_loc = where_is(tso);
2579     len++;
2580     bqe = unblockOneLocked(bqe, node);
2581   }
2582
2583   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2584      the closure to make room for the anchor of the BQ */
2585   if (bqe!=END_BQ_QUEUE) {
2586     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2587     /*
2588     ASSERT((info_ptr==&RBH_Save_0_info) ||
2589            (info_ptr==&RBH_Save_1_info) ||
2590            (info_ptr==&RBH_Save_2_info));
2591     */
2592     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2593     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2594     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2595
2596     IF_GRAN_DEBUG(bq,
2597                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2598                         node, info_type(node)));
2599   }
2600
2601   /* statistics gathering */
2602   if (RtsFlags.GranFlags.GranSimStats.Global) {
2603     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2604     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2605     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2606     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2607   }
2608   IF_GRAN_DEBUG(bq,
2609                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2610                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2611 }
2612 #elif defined(PAR)
2613 void 
2614 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2615 {
2616   StgBlockingQueueElement *bqe;
2617
2618   ACQUIRE_LOCK(&sched_mutex);
2619
2620   IF_PAR_DEBUG(verbose, 
2621                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
2622                      node, mytid));
2623 #ifdef DIST  
2624   //RFP
2625   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2626     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
2627     return;
2628   }
2629 #endif
2630   
2631   ASSERT(q == END_BQ_QUEUE ||
2632          get_itbl(q)->type == TSO ||           
2633          get_itbl(q)->type == BLOCKED_FETCH || 
2634          get_itbl(q)->type == CONSTR); 
2635
2636   bqe = q;
2637   while (get_itbl(bqe)->type==TSO || 
2638          get_itbl(bqe)->type==BLOCKED_FETCH) {
2639     bqe = unblockOneLocked(bqe, node);
2640   }
2641   RELEASE_LOCK(&sched_mutex);
2642 }
2643
2644 #else   /* !GRAN && !PAR */
2645
2646 void
2647 awakenBlockedQueueNoLock(StgTSO *tso)
2648 {
2649   while (tso != END_TSO_QUEUE) {
2650     tso = unblockOneLocked(tso);
2651   }
2652 }
2653
2654 void
2655 awakenBlockedQueue(StgTSO *tso)
2656 {
2657   ACQUIRE_LOCK(&sched_mutex);
2658   while (tso != END_TSO_QUEUE) {
2659     tso = unblockOneLocked(tso);
2660   }
2661   RELEASE_LOCK(&sched_mutex);
2662 }
2663 #endif
2664
2665 /* ---------------------------------------------------------------------------
2666    Interrupt execution
2667    - usually called inside a signal handler so it mustn't do anything fancy.   
2668    ------------------------------------------------------------------------ */
2669
2670 void
2671 interruptStgRts(void)
2672 {
2673     interrupted    = 1;
2674     context_switch = 1;
2675 }
2676
2677 /* -----------------------------------------------------------------------------
2678    Unblock a thread
2679
2680    This is for use when we raise an exception in another thread, which
2681    may be blocked.
2682    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2683    -------------------------------------------------------------------------- */
2684
2685 #if defined(GRAN) || defined(PAR)
2686 /*
2687   NB: only the type of the blocking queue is different in GranSim and GUM
2688       the operations on the queue-elements are the same
2689       long live polymorphism!
2690
2691   Locks: sched_mutex is held upon entry and exit.
2692
2693 */
2694 static void
2695 unblockThread(StgTSO *tso)
2696 {
2697   StgBlockingQueueElement *t, **last;
2698
2699   switch (tso->why_blocked) {
2700
2701   case NotBlocked:
2702     return;  /* not blocked */
2703
2704   case BlockedOnSTM:
2705     // Be careful: nothing to do here!  We tell the scheduler that the thread
2706     // is runnable and we leave it to the stack-walking code to abort the 
2707     // transaction while unwinding the stack.  We should perhaps have a debugging
2708     // test to make sure that this really happens and that the 'zombie' transaction
2709     // does not get committed.
2710     goto done;
2711
2712   case BlockedOnMVar:
2713     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2714     {
2715       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2716       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2717
2718       last = (StgBlockingQueueElement **)&mvar->head;
2719       for (t = (StgBlockingQueueElement *)mvar->head; 
2720            t != END_BQ_QUEUE; 
2721            last = &t->link, last_tso = t, t = t->link) {
2722         if (t == (StgBlockingQueueElement *)tso) {
2723           *last = (StgBlockingQueueElement *)tso->link;
2724           if (mvar->tail == tso) {
2725             mvar->tail = (StgTSO *)last_tso;
2726           }
2727           goto done;
2728         }
2729       }
2730       barf("unblockThread (MVAR): TSO not found");
2731     }
2732
2733   case BlockedOnBlackHole:
2734     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2735     {
2736       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2737
2738       last = &bq->blocking_queue;
2739       for (t = bq->blocking_queue; 
2740            t != END_BQ_QUEUE; 
2741            last = &t->link, t = t->link) {
2742         if (t == (StgBlockingQueueElement *)tso) {
2743           *last = (StgBlockingQueueElement *)tso->link;
2744           goto done;
2745         }
2746       }
2747       barf("unblockThread (BLACKHOLE): TSO not found");
2748     }
2749
2750   case BlockedOnException:
2751     {
2752       StgTSO *target  = tso->block_info.tso;
2753
2754       ASSERT(get_itbl(target)->type == TSO);
2755
2756       if (target->what_next == ThreadRelocated) {
2757           target = target->link;
2758           ASSERT(get_itbl(target)->type == TSO);
2759       }
2760
2761       ASSERT(target->blocked_exceptions != NULL);
2762
2763       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2764       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2765            t != END_BQ_QUEUE; 
2766            last = &t->link, t = t->link) {
2767         ASSERT(get_itbl(t)->type == TSO);
2768         if (t == (StgBlockingQueueElement *)tso) {
2769           *last = (StgBlockingQueueElement *)tso->link;
2770           goto done;
2771         }
2772       }
2773       barf("unblockThread (Exception): TSO not found");
2774     }
2775
2776   case BlockedOnRead:
2777   case BlockedOnWrite:
2778 #if defined(mingw32_HOST_OS)
2779   case BlockedOnDoProc:
2780 #endif
2781     {
2782       /* take TSO off blocked_queue */
2783       StgBlockingQueueElement *prev = NULL;
2784       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2785            prev = t, t = t->link) {
2786         if (t == (StgBlockingQueueElement *)tso) {
2787           if (prev == NULL) {
2788             blocked_queue_hd = (StgTSO *)t->link;
2789             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2790               blocked_queue_tl = END_TSO_QUEUE;
2791             }
2792           } else {
2793             prev->link = t->link;
2794             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2795               blocked_queue_tl = (StgTSO *)prev;
2796             }
2797           }
2798           goto done;
2799         }
2800       }
2801       barf("unblockThread (I/O): TSO not found");
2802     }
2803
2804   case BlockedOnDelay:
2805     {
2806       /* take TSO off sleeping_queue */
2807       StgBlockingQueueElement *prev = NULL;
2808       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2809            prev = t, t = t->link) {
2810         if (t == (StgBlockingQueueElement *)tso) {
2811           if (prev == NULL) {
2812             sleeping_queue = (StgTSO *)t->link;
2813           } else {
2814             prev->link = t->link;
2815           }
2816           goto done;
2817         }
2818       }
2819       barf("unblockThread (delay): TSO not found");
2820     }
2821
2822   default:
2823     barf("unblockThread");
2824   }
2825
2826  done:
2827   tso->link = END_TSO_QUEUE;
2828   tso->why_blocked = NotBlocked;
2829   tso->block_info.closure = NULL;
2830   PUSH_ON_RUN_QUEUE(tso);
2831 }
2832 #else
2833 static void
2834 unblockThread(StgTSO *tso)
2835 {
2836   StgTSO *t, **last;
2837   
2838   /* To avoid locking unnecessarily. */
2839   if (tso->why_blocked == NotBlocked) {
2840     return;
2841   }
2842
2843   switch (tso->why_blocked) {
2844
2845   case BlockedOnSTM:
2846     // Be careful: nothing to do here!  We tell the scheduler that the thread
2847     // is runnable and we leave it to the stack-walking code to abort the 
2848     // transaction while unwinding the stack.  We should perhaps have a debugging
2849     // test to make sure that this really happens and that the 'zombie' transaction
2850     // does not get committed.
2851     goto done;
2852
2853   case BlockedOnMVar:
2854     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2855     {
2856       StgTSO *last_tso = END_TSO_QUEUE;
2857       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2858
2859       last = &mvar->head;
2860       for (t = mvar->head; t != END_TSO_QUEUE; 
2861            last = &t->link, last_tso = t, t = t->link) {
2862         if (t == tso) {
2863           *last = tso->link;
2864           if (mvar->tail == tso) {
2865             mvar->tail = last_tso;
2866           }
2867           goto done;
2868         }
2869       }
2870       barf("unblockThread (MVAR): TSO not found");
2871     }
2872
2873   case BlockedOnBlackHole:
2874     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2875     {
2876       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2877
2878       last = &bq->blocking_queue;
2879       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2880            last = &t->link, t = t->link) {
2881         if (t == tso) {
2882           *last = tso->link;
2883           goto done;
2884         }
2885       }
2886       barf("unblockThread (BLACKHOLE): TSO not found");
2887     }
2888
2889   case BlockedOnException:
2890     {
2891       StgTSO *target  = tso->block_info.tso;
2892
2893       ASSERT(get_itbl(target)->type == TSO);
2894
2895       while (target->what_next == ThreadRelocated) {
2896           target = target->link;
2897           ASSERT(get_itbl(target)->type == TSO);
2898       }
2899       
2900       ASSERT(target->blocked_exceptions != NULL);
2901
2902       last = &target->blocked_exceptions;
2903       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2904            last = &t->link, t = t->link) {
2905         ASSERT(get_itbl(t)->type == TSO);
2906         if (t == tso) {
2907           *last = tso->link;
2908           goto done;
2909         }
2910       }
2911       barf("unblockThread (Exception): TSO not found");
2912     }
2913
2914   case BlockedOnRead:
2915   case BlockedOnWrite:
2916 #if defined(mingw32_HOST_OS)
2917   case BlockedOnDoProc:
2918 #endif
2919     {
2920       StgTSO *prev = NULL;
2921       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2922            prev = t, t = t->link) {
2923         if (t == tso) {
2924           if (prev == NULL) {
2925             blocked_queue_hd = t->link;
2926             if (blocked_queue_tl == t) {
2927               blocked_queue_tl = END_TSO_QUEUE;
2928             }
2929           } else {
2930             prev->link = t->link;
2931             if (blocked_queue_tl == t) {
2932               blocked_queue_tl = prev;
2933             }
2934           }
2935           goto done;
2936         }
2937       }
2938       barf("unblockThread (I/O): TSO not found");
2939     }
2940
2941   case BlockedOnDelay:
2942     {
2943       StgTSO *prev = NULL;
2944       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2945            prev = t, t = t->link) {
2946         if (t == tso) {
2947           if (prev == NULL) {
2948             sleeping_queue = t->link;
2949           } else {
2950             prev->link = t->link;
2951           }
2952           goto done;
2953         }
2954       }
2955       barf("unblockThread (delay): TSO not found");
2956     }
2957
2958   default:
2959     barf("unblockThread");
2960   }
2961
2962  done:
2963   tso->link = END_TSO_QUEUE;
2964   tso->why_blocked = NotBlocked;
2965   tso->block_info.closure = NULL;
2966   APPEND_TO_RUN_QUEUE(tso);
2967 }
2968 #endif
2969
2970 /* -----------------------------------------------------------------------------
2971  * raiseAsync()
2972  *
2973  * The following function implements the magic for raising an
2974  * asynchronous exception in an existing thread.
2975  *
2976  * We first remove the thread from any queue on which it might be
2977  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2978  *
2979  * We strip the stack down to the innermost CATCH_FRAME, building
2980  * thunks in the heap for all the active computations, so they can 
2981  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2982  * an application of the handler to the exception, and push it on
2983  * the top of the stack.
2984  * 
2985  * How exactly do we save all the active computations?  We create an
2986  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2987  * AP_STACKs pushes everything from the corresponding update frame
2988  * upwards onto the stack.  (Actually, it pushes everything up to the
2989  * next update frame plus a pointer to the next AP_STACK object.
2990  * Entering the next AP_STACK object pushes more onto the stack until we
2991  * reach the last AP_STACK object - at which point the stack should look
2992  * exactly as it did when we killed the TSO and we can continue
2993  * execution by entering the closure on top of the stack.
2994  *
2995  * We can also kill a thread entirely - this happens if either (a) the 
2996  * exception passed to raiseAsync is NULL, or (b) there's no
2997  * CATCH_FRAME on the stack.  In either case, we strip the entire
2998  * stack and replace the thread with a zombie.
2999  *
3000  * Locks: sched_mutex held upon entry nor exit.
3001  *
3002  * -------------------------------------------------------------------------- */
3003  
3004 void 
3005 deleteThread(StgTSO *tso)
3006 {
3007   if (tso->why_blocked != BlockedOnCCall &&
3008       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3009       raiseAsync(tso,NULL);
3010   }
3011 }
3012
3013 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3014 static void 
3015 deleteThreadImmediately(StgTSO *tso)
3016 { // for forkProcess only:
3017   // delete thread without giving it a chance to catch the KillThread exception
3018
3019   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3020       return;
3021   }
3022
3023   if (tso->why_blocked != BlockedOnCCall &&
3024       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3025     unblockThread(tso);
3026   }
3027
3028   tso->what_next = ThreadKilled;
3029 }
3030 #endif
3031
3032 void
3033 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3034 {
3035   /* When raising async exs from contexts where sched_mutex isn't held;
3036      use raiseAsyncWithLock(). */
3037   ACQUIRE_LOCK(&sched_mutex);
3038   raiseAsync(tso,exception);
3039   RELEASE_LOCK(&sched_mutex);
3040 }
3041
3042 void
3043 raiseAsync(StgTSO *tso, StgClosure *exception)
3044 {
3045     raiseAsync_(tso, exception, rtsFalse);
3046 }
3047
3048 static void
3049 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3050 {
3051     StgRetInfoTable *info;
3052     StgPtr sp;
3053   
3054     // Thread already dead?
3055     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3056         return;
3057     }
3058
3059     IF_DEBUG(scheduler, 
3060              sched_belch("raising exception in thread %ld.", (long)tso->id));
3061     
3062     // Remove it from any blocking queues
3063     unblockThread(tso);
3064
3065     sp = tso->sp;
3066     
3067     // The stack freezing code assumes there's a closure pointer on
3068     // the top of the stack, so we have to arrange that this is the case...
3069     //
3070     if (sp[0] == (W_)&stg_enter_info) {
3071         sp++;
3072     } else {
3073         sp--;
3074         sp[0] = (W_)&stg_dummy_ret_closure;
3075     }
3076
3077     while (1) {
3078         nat i;
3079
3080         // 1. Let the top of the stack be the "current closure"
3081         //
3082         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3083         // CATCH_FRAME.
3084         //
3085         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3086         // current closure applied to the chunk of stack up to (but not
3087         // including) the update frame.  This closure becomes the "current
3088         // closure".  Go back to step 2.
3089         //
3090         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3091         // top of the stack applied to the exception.
3092         // 
3093         // 5. If it's a STOP_FRAME, then kill the thread.
3094         // 
3095         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3096         // transaction
3097        
3098         
3099         StgPtr frame;
3100         
3101         frame = sp + 1;
3102         info = get_ret_itbl((StgClosure *)frame);
3103         
3104         while (info->i.type != UPDATE_FRAME
3105                && (info->i.type != CATCH_FRAME || exception == NULL)
3106                && info->i.type != STOP_FRAME
3107                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3108         {
3109             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3110               // IF we find an ATOMICALLY_FRAME then we abort the
3111               // current transaction and propagate the exception.  In
3112               // this case (unlike ordinary exceptions) we do not care
3113               // whether the transaction is valid or not because its
3114               // possible validity cannot have caused the exception
3115               // and will not be visible after the abort.
3116               IF_DEBUG(stm,
3117                        debugBelch("Found atomically block delivering async exception\n"));
3118               stmAbortTransaction(tso -> trec);
3119               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3120             }
3121             frame += stack_frame_sizeW((StgClosure *)frame);
3122             info = get_ret_itbl((StgClosure *)frame);
3123         }
3124         
3125         switch (info->i.type) {
3126             
3127         case ATOMICALLY_FRAME:
3128             ASSERT(stop_at_atomically);
3129             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3130             stmCondemnTransaction(tso -> trec);
3131 #ifdef REG_R1
3132             tso->sp = frame;
3133 #else
3134             // R1 is not a register: the return convention for IO in
3135             // this case puts the return value on the stack, so we
3136             // need to set up the stack to return to the atomically
3137             // frame properly...
3138             tso->sp = frame - 2;
3139             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3140             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3141 #endif
3142             tso->what_next = ThreadRunGHC;
3143             return;
3144
3145         case CATCH_FRAME:
3146             // If we find a CATCH_FRAME, and we've got an exception to raise,
3147             // then build the THUNK raise(exception), and leave it on
3148             // top of the CATCH_FRAME ready to enter.
3149             //
3150         {
3151 #ifdef PROFILING
3152             StgCatchFrame *cf = (StgCatchFrame *)frame;
3153 #endif
3154             StgClosure *raise;
3155             
3156             // we've got an exception to raise, so let's pass it to the
3157             // handler in this frame.
3158             //
3159             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3160             TICK_ALLOC_SE_THK(1,0);
3161             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3162             raise->payload[0] = exception;
3163             
3164             // throw away the stack from Sp up to the CATCH_FRAME.
3165             //
3166             sp = frame - 1;
3167             
3168             /* Ensure that async excpetions are blocked now, so we don't get
3169              * a surprise exception before we get around to executing the
3170              * handler.
3171              */
3172             if (tso->blocked_exceptions == NULL) {
3173                 tso->blocked_exceptions = END_TSO_QUEUE;
3174             }
3175             
3176             /* Put the newly-built THUNK on top of the stack, ready to execute
3177              * when the thread restarts.
3178              */
3179             sp[0] = (W_)raise;
3180             sp[-1] = (W_)&stg_enter_info;
3181             tso->sp = sp-1;
3182             tso->what_next = ThreadRunGHC;
3183             IF_DEBUG(sanity, checkTSO(tso));
3184             return;
3185         }
3186         
3187         case UPDATE_FRAME:
3188         {
3189             StgAP_STACK * ap;
3190             nat words;
3191             
3192             // First build an AP_STACK consisting of the stack chunk above the
3193             // current update frame, with the top word on the stack as the
3194             // fun field.
3195             //
3196             words = frame - sp - 1;
3197             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3198             
3199             ap->size = words;
3200             ap->fun  = (StgClosure *)sp[0];
3201             sp++;
3202             for(i=0; i < (nat)words; ++i) {
3203                 ap->payload[i] = (StgClosure *)*sp++;
3204             }
3205             
3206             SET_HDR(ap,&stg_AP_STACK_info,
3207                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3208             TICK_ALLOC_UP_THK(words+1,0);
3209             
3210             IF_DEBUG(scheduler,
3211                      debugBelch("sched: Updating ");
3212                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3213                      debugBelch(" with ");
3214                      printObj((StgClosure *)ap);
3215                 );
3216
3217             // Replace the updatee with an indirection - happily
3218             // this will also wake up any threads currently
3219             // waiting on the result.
3220             //
3221             // Warning: if we're in a loop, more than one update frame on
3222             // the stack may point to the same object.  Be careful not to
3223             // overwrite an IND_OLDGEN in this case, because we'll screw
3224             // up the mutable lists.  To be on the safe side, don't
3225             // overwrite any kind of indirection at all.  See also
3226             // threadSqueezeStack in GC.c, where we have to make a similar
3227             // check.
3228             //
3229             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3230                 // revert the black hole
3231                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3232                                (StgClosure *)ap);
3233             }
3234             sp += sizeofW(StgUpdateFrame) - 1;
3235             sp[0] = (W_)ap; // push onto stack
3236             break;
3237         }
3238         
3239         case STOP_FRAME:
3240             // We've stripped the entire stack, the thread is now dead.
3241             sp += sizeofW(StgStopFrame);
3242             tso->what_next = ThreadKilled;
3243             tso->sp = sp;
3244             return;
3245             
3246         default:
3247             barf("raiseAsync");
3248         }
3249     }
3250     barf("raiseAsync");
3251 }
3252
3253 /* -----------------------------------------------------------------------------
3254    raiseExceptionHelper
3255    
3256    This function is called by the raise# primitve, just so that we can
3257    move some of the tricky bits of raising an exception from C-- into
3258    C.  Who knows, it might be a useful re-useable thing here too.
3259    -------------------------------------------------------------------------- */
3260
3261 StgWord
3262 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3263 {
3264     StgClosure *raise_closure = NULL;
3265     StgPtr p, next;
3266     StgRetInfoTable *info;
3267     //
3268     // This closure represents the expression 'raise# E' where E
3269     // is the exception raise.  It is used to overwrite all the
3270     // thunks which are currently under evaluataion.
3271     //
3272
3273     //    
3274     // LDV profiling: stg_raise_info has THUNK as its closure
3275     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3276     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3277     // 1 does not cause any problem unless profiling is performed.
3278     // However, when LDV profiling goes on, we need to linearly scan
3279     // small object pool, where raise_closure is stored, so we should
3280     // use MIN_UPD_SIZE.
3281     //
3282     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3283     //                                 sizeofW(StgClosure)+1);
3284     //
3285
3286     //
3287     // Walk up the stack, looking for the catch frame.  On the way,
3288     // we update any closures pointed to from update frames with the
3289     // raise closure that we just built.
3290     //
3291     p = tso->sp;
3292     while(1) {
3293         info = get_ret_itbl((StgClosure *)p);
3294         next = p + stack_frame_sizeW((StgClosure *)p);
3295         switch (info->i.type) {
3296             
3297         case UPDATE_FRAME:
3298             // Only create raise_closure if we need to.
3299             if (raise_closure == NULL) {
3300                 raise_closure = 
3301                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3302                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3303                 raise_closure->payload[0] = exception;
3304             }
3305             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3306             p = next;
3307             continue;
3308
3309         case ATOMICALLY_FRAME:
3310             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3311             tso->sp = p;
3312             return ATOMICALLY_FRAME;
3313             
3314         case CATCH_FRAME:
3315             tso->sp = p;
3316             return CATCH_FRAME;
3317
3318         case CATCH_STM_FRAME:
3319             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3320             tso->sp = p;
3321             return CATCH_STM_FRAME;
3322             
3323         case STOP_FRAME:
3324             tso->sp = p;
3325             return STOP_FRAME;
3326
3327         case CATCH_RETRY_FRAME:
3328         default:
3329             p = next; 
3330             continue;
3331         }
3332     }
3333 }
3334
3335
3336 /* -----------------------------------------------------------------------------
3337    findRetryFrameHelper
3338
3339    This function is called by the retry# primitive.  It traverses the stack
3340    leaving tso->sp referring to the frame which should handle the retry.  
3341
3342    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3343    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3344
3345    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3346    despite the similar implementation.
3347
3348    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3349    not be created within memory transactions.
3350    -------------------------------------------------------------------------- */
3351
3352 StgWord
3353 findRetryFrameHelper (StgTSO *tso)
3354 {
3355   StgPtr           p, next;
3356   StgRetInfoTable *info;
3357
3358   p = tso -> sp;
3359   while (1) {
3360     info = get_ret_itbl((StgClosure *)p);
3361     next = p + stack_frame_sizeW((StgClosure *)p);
3362     switch (info->i.type) {
3363       
3364     case ATOMICALLY_FRAME:
3365       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3366       tso->sp = p;
3367       return ATOMICALLY_FRAME;
3368       
3369     case CATCH_RETRY_FRAME:
3370       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3371       tso->sp = p;
3372       return CATCH_RETRY_FRAME;
3373       
3374     case CATCH_STM_FRAME:
3375     default:
3376       ASSERT(info->i.type != CATCH_FRAME);
3377       ASSERT(info->i.type != STOP_FRAME);
3378       p = next; 
3379       continue;
3380     }
3381   }
3382 }
3383
3384 /* -----------------------------------------------------------------------------
3385    resurrectThreads is called after garbage collection on the list of
3386    threads found to be garbage.  Each of these threads will be woken
3387    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3388    on an MVar, or NonTermination if the thread was blocked on a Black
3389    Hole.
3390
3391    Locks: sched_mutex isn't held upon entry nor exit.
3392    -------------------------------------------------------------------------- */
3393
3394 void
3395 resurrectThreads( StgTSO *threads )
3396 {
3397   StgTSO *tso, *next;
3398
3399   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3400     next = tso->global_link;
3401     tso->global_link = all_threads;
3402     all_threads = tso;
3403     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3404
3405     switch (tso->why_blocked) {
3406     case BlockedOnMVar:
3407     case BlockedOnException:
3408       /* Called by GC - sched_mutex lock is currently held. */
3409       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3410       break;
3411     case BlockedOnBlackHole:
3412       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3413       break;
3414     case BlockedOnSTM:
3415       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3416       break;
3417     case NotBlocked:
3418       /* This might happen if the thread was blocked on a black hole
3419        * belonging to a thread that we've just woken up (raiseAsync
3420        * can wake up threads, remember...).
3421        */
3422       continue;
3423     default:
3424       barf("resurrectThreads: thread blocked in a strange way");
3425     }
3426   }
3427 }
3428
3429 /* ----------------------------------------------------------------------------
3430  * Debugging: why is a thread blocked
3431  * [Also provides useful information when debugging threaded programs
3432  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3433    ------------------------------------------------------------------------- */
3434
3435 static
3436 void
3437 printThreadBlockage(StgTSO *tso)
3438 {
3439   switch (tso->why_blocked) {
3440   case BlockedOnRead:
3441     debugBelch("is blocked on read from fd %d", tso->block_info.fd);
3442     break;
3443   case BlockedOnWrite:
3444     debugBelch("is blocked on write to fd %d", tso->block_info.fd);
3445     break;
3446 #if defined(mingw32_HOST_OS)
3447     case BlockedOnDoProc:
3448     debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3449     break;
3450 #endif
3451   case BlockedOnDelay:
3452     debugBelch("is blocked until %d", tso->block_info.target);
3453     break;
3454   case BlockedOnMVar:
3455     debugBelch("is blocked on an MVar");
3456     break;
3457   case BlockedOnException:
3458     debugBelch("is blocked on delivering an exception to thread %d",
3459             tso->block_info.tso->id);
3460     break;
3461   case BlockedOnBlackHole:
3462     debugBelch("is blocked on a black hole");
3463     break;
3464   case NotBlocked:
3465     debugBelch("is not blocked");
3466     break;
3467 #if defined(PAR)
3468   case BlockedOnGA:
3469     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3470             tso->block_info.closure, info_type(tso->block_info.closure));
3471     break;
3472   case BlockedOnGA_NoSend:
3473     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3474             tso->block_info.closure, info_type(tso->block_info.closure));
3475     break;
3476 #endif
3477   case BlockedOnCCall:
3478     debugBelch("is blocked on an external call");
3479     break;
3480   case BlockedOnCCall_NoUnblockExc:
3481     debugBelch("is blocked on an external call (exceptions were already blocked)");
3482     break;
3483   case BlockedOnSTM:
3484     debugBelch("is blocked on an STM operation");
3485     break;
3486   default:
3487     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3488          tso->why_blocked, tso->id, tso);
3489   }
3490 }
3491
3492 static
3493 void
3494 printThreadStatus(StgTSO *tso)
3495 {
3496   switch (tso->what_next) {
3497   case ThreadKilled:
3498     debugBelch("has been killed");
3499     break;
3500   case ThreadComplete:
3501     debugBelch("has completed");
3502     break;
3503   default:
3504     printThreadBlockage(tso);
3505   }
3506 }
3507
3508 void
3509 printAllThreads(void)
3510 {
3511   StgTSO *t;
3512
3513 # if defined(GRAN)
3514   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3515   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3516                        time_string, rtsFalse/*no commas!*/);
3517
3518   debugBelch("all threads at [%s]:\n", time_string);
3519 # elif defined(PAR)
3520   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3521   ullong_format_string(CURRENT_TIME,
3522                        time_string, rtsFalse/*no commas!*/);
3523
3524   debugBelch("all threads at [%s]:\n", time_string);
3525 # else
3526   debugBelch("all threads:\n");
3527 # endif
3528
3529   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3530     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
3531 #if defined(DEBUG)
3532     {
3533       void *label = lookupThreadLabel(t->id);
3534       if (label) debugBelch("[\"%s\"] ",(char *)label);
3535     }
3536 #endif
3537     printThreadStatus(t);
3538     debugBelch("\n");
3539   }
3540 }
3541     
3542 #ifdef DEBUG
3543
3544 /* 
3545    Print a whole blocking queue attached to node (debugging only).
3546 */
3547 # if defined(PAR)
3548 void 
3549 print_bq (StgClosure *node)
3550 {
3551   StgBlockingQueueElement *bqe;
3552   StgTSO *tso;
3553   rtsBool end;
3554
3555   debugBelch("## BQ of closure %p (%s): ",
3556           node, info_type(node));
3557
3558   /* should cover all closures that may have a blocking queue */
3559   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3560          get_itbl(node)->type == FETCH_ME_BQ ||
3561          get_itbl(node)->type == RBH ||
3562          get_itbl(node)->type == MVAR);
3563     
3564   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3565
3566   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3567 }
3568
3569 /* 
3570    Print a whole blocking queue starting with the element bqe.
3571 */
3572 void 
3573 print_bqe (StgBlockingQueueElement *bqe)
3574 {
3575   rtsBool end;
3576
3577   /* 
3578      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3579   */
3580   for (end = (bqe==END_BQ_QUEUE);
3581        !end; // iterate until bqe points to a CONSTR
3582        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3583        bqe = end ? END_BQ_QUEUE : bqe->link) {
3584     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3585     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3586     /* types of closures that may appear in a blocking queue */
3587     ASSERT(get_itbl(bqe)->type == TSO ||           
3588            get_itbl(bqe)->type == BLOCKED_FETCH || 
3589            get_itbl(bqe)->type == CONSTR); 
3590     /* only BQs of an RBH end with an RBH_Save closure */
3591     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3592
3593     switch (get_itbl(bqe)->type) {
3594     case TSO:
3595       debugBelch(" TSO %u (%x),",
3596               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3597       break;
3598     case BLOCKED_FETCH:
3599       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
3600               ((StgBlockedFetch *)bqe)->node, 
3601               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3602               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3603               ((StgBlockedFetch *)bqe)->ga.weight);
3604       break;
3605     case CONSTR:
3606       debugBelch(" %s (IP %p),",
3607               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3608                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3609                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3610                "RBH_Save_?"), get_itbl(bqe));
3611       break;
3612     default:
3613       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3614            info_type((StgClosure *)bqe)); // , node, info_type(node));
3615       break;
3616     }
3617   } /* for */
3618   debugBelch("\n");
3619 }
3620 # elif defined(GRAN)
3621 void 
3622 print_bq (StgClosure *node)
3623 {
3624   StgBlockingQueueElement *bqe;
3625   PEs node_loc, tso_loc;
3626   rtsBool end;
3627
3628   /* should cover all closures that may have a blocking queue */
3629   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3630          get_itbl(node)->type == FETCH_ME_BQ ||
3631          get_itbl(node)->type == RBH);
3632     
3633   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3634   node_loc = where_is(node);
3635
3636   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3637           node, info_type(node), node_loc);
3638
3639   /* 
3640      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3641   */
3642   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3643        !end; // iterate until bqe points to a CONSTR
3644        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3645     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3646     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3647     /* types of closures that may appear in a blocking queue */
3648     ASSERT(get_itbl(bqe)->type == TSO ||           
3649            get_itbl(bqe)->type == CONSTR); 
3650     /* only BQs of an RBH end with an RBH_Save closure */
3651     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3652
3653     tso_loc = where_is((StgClosure *)bqe);
3654     switch (get_itbl(bqe)->type) {
3655     case TSO:
3656       debugBelch(" TSO %d (%p) on [PE %d],",
3657               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3658       break;
3659     case CONSTR:
3660       debugBelch(" %s (IP %p),",
3661               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3662                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3663                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3664                "RBH_Save_?"), get_itbl(bqe));
3665       break;
3666     default:
3667       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3668            info_type((StgClosure *)bqe), node, info_type(node));
3669       break;
3670     }
3671   } /* for */
3672   debugBelch("\n");
3673 }
3674 #else
3675 /* 
3676    Nice and easy: only TSOs on the blocking queue
3677 */
3678 void 
3679 print_bq (StgClosure *node)
3680 {
3681   StgTSO *tso;
3682
3683   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3684   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3685        tso != END_TSO_QUEUE; 
3686        tso=tso->link) {
3687     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3688     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3689     debugBelch(" TSO %d (%p),", tso->id, tso);
3690   }
3691   debugBelch("\n");
3692 }
3693 # endif
3694
3695 #if defined(PAR)
3696 static nat
3697 run_queue_len(void)
3698 {
3699   nat i;
3700   StgTSO *tso;
3701
3702   for (i=0, tso=run_queue_hd; 
3703        tso != END_TSO_QUEUE;
3704        i++, tso=tso->link)
3705     /* nothing */
3706
3707   return i;
3708 }
3709 #endif
3710
3711 void
3712 sched_belch(char *s, ...)
3713 {
3714   va_list ap;
3715   va_start(ap,s);
3716 #ifdef RTS_SUPPORTS_THREADS
3717   debugBelch("sched (task %p): ", osThreadId());
3718 #elif defined(PAR)
3719   debugBelch("== ");
3720 #else
3721   debugBelch("sched: ");
3722 #endif
3723   vdebugBelch(s, ap);
3724   debugBelch("\n");
3725   va_end(ap);
3726 }
3727
3728 #endif /* DEBUG */