[project @ 2006-01-18 10:31:50 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 /* -----------------------------------------------------------------------------
76  * Global variables
77  * -------------------------------------------------------------------------- */
78
79 #if defined(GRAN)
80
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
83
84 /* 
85    In GranSim we have a runnable and a blocked queue for each processor.
86    In order to minimise code changes new arrays run_queue_hds/tls
87    are created. run_queue_hd is then a short cut (macro) for
88    run_queue_hds[CurrentProc] (see GranSim.h).
89    -- HWL
90 */
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95    the std RTS (i.e. we are cheating). However, we don't use this list in
96    the GranSim specific code at the moment (so we are only potentially
97    cheating).  */
98
99 #else /* !GRAN */
100
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
106 #endif
107
108 /* Threads blocked on blackholes.
109  * LOCK: sched_mutex+capability, or all capabilities
110  */
111 StgTSO *blackhole_queue = NULL;
112 #endif
113
114 /* The blackhole_queue should be checked for threads to wake up.  See
115  * Schedule.h for more thorough comment.
116  * LOCK: none (doesn't matter if we miss an update)
117  */
118 rtsBool blackholes_need_checking = rtsFalse;
119
120 /* Linked list of all threads.
121  * Used for detecting garbage collected threads.
122  * LOCK: sched_mutex+capability, or all capabilities
123  */
124 StgTSO *all_threads = NULL;
125
126 /* flag set by signal handler to precipitate a context switch
127  * LOCK: none (just an advisory flag)
128  */
129 int context_switch = 0;
130
131 /* flag that tracks whether we have done any execution in this time slice.
132  * LOCK: currently none, perhaps we should lock (but needs to be
133  * updated in the fast path of the scheduler).
134  */
135 nat recent_activity = ACTIVITY_YES;
136
137 /* if this flag is set as well, give up execution
138  * LOCK: none (changes once, from false->true)
139  */
140 rtsBool interrupted = rtsFalse;
141
142 /* Next thread ID to allocate.
143  * LOCK: sched_mutex
144  */
145 static StgThreadID next_thread_id = 1;
146
147 /* The smallest stack size that makes any sense is:
148  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
149  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
150  *  + 1                       (the closure to enter)
151  *  + 1                       (stg_ap_v_ret)
152  *  + 1                       (spare slot req'd by stg_ap_v_ret)
153  *
154  * A thread with this stack will bomb immediately with a stack
155  * overflow, which will increase its stack size.  
156  */
157 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
158
159 #if defined(GRAN)
160 StgTSO *CurrentTSO;
161 #endif
162
163 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
164  *  exists - earlier gccs apparently didn't.
165  *  -= chak
166  */
167 StgTSO dummy_tso;
168
169 /*
170  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
171  * in an MT setting, needed to signal that a worker thread shouldn't hang around
172  * in the scheduler when it is out of work.
173  */
174 rtsBool shutting_down_scheduler = rtsFalse;
175
176 /*
177  * This mutex protects most of the global scheduler data in
178  * the THREADED_RTS and (inc. SMP) runtime.
179  */
180 #if defined(THREADED_RTS)
181 Mutex sched_mutex;
182 #endif
183
184 #if defined(PARALLEL_HASKELL)
185 StgTSO *LastTSO;
186 rtsTime TimeOfLastYield;
187 rtsBool emitSchedule = rtsTrue;
188 #endif
189
190 /* -----------------------------------------------------------------------------
191  * static function prototypes
192  * -------------------------------------------------------------------------- */
193
194 static Capability *schedule (Capability *initialCapability, Task *task);
195
196 //
197 // These function all encapsulate parts of the scheduler loop, and are
198 // abstracted only to make the structure and control flow of the
199 // scheduler clearer.
200 //
201 static void schedulePreLoop (void);
202 #if defined(SMP)
203 static void schedulePushWork(Capability *cap, Task *task);
204 #endif
205 static void scheduleStartSignalHandlers (Capability *cap);
206 static void scheduleCheckBlockedThreads (Capability *cap);
207 static void scheduleCheckBlackHoles (Capability *cap);
208 static void scheduleDetectDeadlock (Capability *cap, Task *task);
209 #if defined(GRAN)
210 static StgTSO *scheduleProcessEvent(rtsEvent *event);
211 #endif
212 #if defined(PARALLEL_HASKELL)
213 static StgTSO *scheduleSendPendingMessages(void);
214 static void scheduleActivateSpark(void);
215 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
216 #endif
217 #if defined(PAR) || defined(GRAN)
218 static void scheduleGranParReport(void);
219 #endif
220 static void schedulePostRunThread(void);
221 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
222 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
223                                          StgTSO *t);
224 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
225                                     nat prev_what_next );
226 static void scheduleHandleThreadBlocked( StgTSO *t );
227 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
228                                              StgTSO *t );
229 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
230 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
231
232 static void unblockThread(Capability *cap, StgTSO *tso);
233 static rtsBool checkBlackHoles(Capability *cap);
234 static void AllRoots(evac_fn evac);
235
236 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
237
238 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
239                         rtsBool stop_at_atomically, StgPtr stop_here);
240
241 static void deleteThread (Capability *cap, StgTSO *tso);
242 static void deleteRunQueue (Capability *cap);
243
244 #ifdef DEBUG
245 static void printThreadBlockage(StgTSO *tso);
246 static void printThreadStatus(StgTSO *tso);
247 void printThreadQueue(StgTSO *tso);
248 #endif
249
250 #if defined(PARALLEL_HASKELL)
251 StgTSO * createSparkThread(rtsSpark spark);
252 StgTSO * activateSpark (rtsSpark spark);  
253 #endif
254
255 #ifdef DEBUG
256 static char *whatNext_strs[] = {
257   "(unknown)",
258   "ThreadRunGHC",
259   "ThreadInterpret",
260   "ThreadKilled",
261   "ThreadRelocated",
262   "ThreadComplete"
263 };
264 #endif
265
266 /* -----------------------------------------------------------------------------
267  * Putting a thread on the run queue: different scheduling policies
268  * -------------------------------------------------------------------------- */
269
270 STATIC_INLINE void
271 addToRunQueue( Capability *cap, StgTSO *t )
272 {
273 #if defined(PARALLEL_HASKELL)
274     if (RtsFlags.ParFlags.doFairScheduling) { 
275         // this does round-robin scheduling; good for concurrency
276         appendToRunQueue(cap,t);
277     } else {
278         // this does unfair scheduling; good for parallelism
279         pushOnRunQueue(cap,t);
280     }
281 #else
282     // this does round-robin scheduling; good for concurrency
283     appendToRunQueue(cap,t);
284 #endif
285 }
286
287 /* ---------------------------------------------------------------------------
288    Main scheduling loop.
289
290    We use round-robin scheduling, each thread returning to the
291    scheduler loop when one of these conditions is detected:
292
293       * out of heap space
294       * timer expires (thread yields)
295       * thread blocks
296       * thread ends
297       * stack overflow
298
299    GRAN version:
300      In a GranSim setup this loop iterates over the global event queue.
301      This revolves around the global event queue, which determines what 
302      to do next. Therefore, it's more complicated than either the 
303      concurrent or the parallel (GUM) setup.
304
305    GUM version:
306      GUM iterates over incoming messages.
307      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
308      and sends out a fish whenever it has nothing to do; in-between
309      doing the actual reductions (shared code below) it processes the
310      incoming messages and deals with delayed operations 
311      (see PendingFetches).
312      This is not the ugliest code you could imagine, but it's bloody close.
313
314    ------------------------------------------------------------------------ */
315
316 static Capability *
317 schedule (Capability *initialCapability, Task *task)
318 {
319   StgTSO *t;
320   Capability *cap;
321   StgThreadReturnCode ret;
322 #if defined(GRAN)
323   rtsEvent *event;
324 #elif defined(PARALLEL_HASKELL)
325   StgTSO *tso;
326   GlobalTaskId pe;
327   rtsBool receivedFinish = rtsFalse;
328 # if defined(DEBUG)
329   nat tp_size, sp_size; // stats only
330 # endif
331 #endif
332   nat prev_what_next;
333   rtsBool ready_to_gc;
334 #if defined(THREADED_RTS)
335   rtsBool first = rtsTrue;
336 #endif
337   
338   cap = initialCapability;
339
340   // Pre-condition: this task owns initialCapability.
341   // The sched_mutex is *NOT* held
342   // NB. on return, we still hold a capability.
343
344   IF_DEBUG(scheduler,
345            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
346                        task, initialCapability);
347       );
348
349   schedulePreLoop();
350
351   // -----------------------------------------------------------
352   // Scheduler loop starts here:
353
354 #if defined(PARALLEL_HASKELL)
355 #define TERMINATION_CONDITION        (!receivedFinish)
356 #elif defined(GRAN)
357 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
358 #else
359 #define TERMINATION_CONDITION        rtsTrue
360 #endif
361
362   while (TERMINATION_CONDITION) {
363
364 #if defined(GRAN)
365       /* Choose the processor with the next event */
366       CurrentProc = event->proc;
367       CurrentTSO = event->tso;
368 #endif
369
370 #if defined(THREADED_RTS)
371       if (first) {
372           // don't yield the first time, we want a chance to run this
373           // thread for a bit, even if there are others banging at the
374           // door.
375           first = rtsFalse;
376           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
377       } else {
378           // Yield the capability to higher-priority tasks if necessary.
379           yieldCapability(&cap, task);
380       }
381 #endif
382       
383 #ifdef SMP
384       schedulePushWork(cap,task);
385 #endif
386
387     // Check whether we have re-entered the RTS from Haskell without
388     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
389     // call).
390     if (cap->in_haskell) {
391           errorBelch("schedule: re-entered unsafely.\n"
392                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
393           stg_exit(EXIT_FAILURE);
394     }
395
396     //
397     // Test for interruption.  If interrupted==rtsTrue, then either
398     // we received a keyboard interrupt (^C), or the scheduler is
399     // trying to shut down all the tasks (shutting_down_scheduler) in
400     // the threaded RTS.
401     //
402     if (interrupted) {
403         deleteRunQueue(cap);
404 #if defined(SMP)
405         discardSparksCap(cap);
406 #endif
407         if (shutting_down_scheduler) {
408             IF_DEBUG(scheduler, sched_belch("shutting down"));
409             // If we are a worker, just exit.  If we're a bound thread
410             // then we will exit below when we've removed our TSO from
411             // the run queue.
412             if (task->tso == NULL && emptyRunQueue(cap)) {
413                 return cap;
414             }
415         } else {
416             IF_DEBUG(scheduler, sched_belch("interrupted"));
417         }
418     }
419
420 #if defined(SMP)
421     // If the run queue is empty, take a spark and turn it into a thread.
422     {
423         if (emptyRunQueue(cap)) {
424             StgClosure *spark;
425             spark = findSpark(cap);
426             if (spark != NULL) {
427                 IF_DEBUG(scheduler,
428                          sched_belch("turning spark of closure %p into a thread",
429                                      (StgClosure *)spark));
430                 createSparkThread(cap,spark);     
431             }
432         }
433     }
434 #endif // SMP
435
436     scheduleStartSignalHandlers(cap);
437
438     // Only check the black holes here if we've nothing else to do.
439     // During normal execution, the black hole list only gets checked
440     // at GC time, to avoid repeatedly traversing this possibly long
441     // list each time around the scheduler.
442     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
443
444     scheduleCheckBlockedThreads(cap);
445
446     scheduleDetectDeadlock(cap,task);
447
448     // Normally, the only way we can get here with no threads to
449     // run is if a keyboard interrupt received during 
450     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
451     // Additionally, it is not fatal for the
452     // threaded RTS to reach here with no threads to run.
453     //
454     // win32: might be here due to awaitEvent() being abandoned
455     // as a result of a console event having been delivered.
456     if ( emptyRunQueue(cap) ) {
457 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
458         ASSERT(interrupted);
459 #endif
460         continue; // nothing to do
461     }
462
463 #if defined(PARALLEL_HASKELL)
464     scheduleSendPendingMessages();
465     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
466         continue;
467
468 #if defined(SPARKS)
469     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
470 #endif
471
472     /* If we still have no work we need to send a FISH to get a spark
473        from another PE */
474     if (emptyRunQueue(cap)) {
475         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
476         ASSERT(rtsFalse); // should not happen at the moment
477     }
478     // from here: non-empty run queue.
479     //  TODO: merge above case with this, only one call processMessages() !
480     if (PacketsWaiting()) {  /* process incoming messages, if
481                                 any pending...  only in else
482                                 because getRemoteWork waits for
483                                 messages as well */
484         receivedFinish = processMessages();
485     }
486 #endif
487
488 #if defined(GRAN)
489     scheduleProcessEvent(event);
490 #endif
491
492     // 
493     // Get a thread to run
494     //
495     t = popRunQueue(cap);
496
497 #if defined(GRAN) || defined(PAR)
498     scheduleGranParReport(); // some kind of debuging output
499 #else
500     // Sanity check the thread we're about to run.  This can be
501     // expensive if there is lots of thread switching going on...
502     IF_DEBUG(sanity,checkTSO(t));
503 #endif
504
505 #if defined(THREADED_RTS)
506     // Check whether we can run this thread in the current task.
507     // If not, we have to pass our capability to the right task.
508     {
509         Task *bound = t->bound;
510       
511         if (bound) {
512             if (bound == task) {
513                 IF_DEBUG(scheduler,
514                          sched_belch("### Running thread %d in bound thread",
515                                      t->id));
516                 // yes, the Haskell thread is bound to the current native thread
517             } else {
518                 IF_DEBUG(scheduler,
519                          sched_belch("### thread %d bound to another OS thread",
520                                      t->id));
521                 // no, bound to a different Haskell thread: pass to that thread
522                 pushOnRunQueue(cap,t);
523                 continue;
524             }
525         } else {
526             // The thread we want to run is unbound.
527             if (task->tso) { 
528                 IF_DEBUG(scheduler,
529                          sched_belch("### this OS thread cannot run thread %d", t->id));
530                 // no, the current native thread is bound to a different
531                 // Haskell thread, so pass it to any worker thread
532                 pushOnRunQueue(cap,t);
533                 continue; 
534             }
535         }
536     }
537 #endif
538
539     cap->r.rCurrentTSO = t;
540     
541     /* context switches are initiated by the timer signal, unless
542      * the user specified "context switch as often as possible", with
543      * +RTS -C0
544      */
545     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
546         && !emptyThreadQueues(cap)) {
547         context_switch = 1;
548     }
549          
550 run_thread:
551
552     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
553                               (long)t->id, whatNext_strs[t->what_next]));
554
555 #if defined(PROFILING)
556     startHeapProfTimer();
557 #endif
558
559     // ----------------------------------------------------------------------
560     // Run the current thread 
561
562     prev_what_next = t->what_next;
563
564     errno = t->saved_errno;
565     cap->in_haskell = rtsTrue;
566
567     recent_activity = ACTIVITY_YES;
568
569     switch (prev_what_next) {
570         
571     case ThreadKilled:
572     case ThreadComplete:
573         /* Thread already finished, return to scheduler. */
574         ret = ThreadFinished;
575         break;
576         
577     case ThreadRunGHC:
578     {
579         StgRegTable *r;
580         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
581         cap = regTableToCapability(r);
582         ret = r->rRet;
583         break;
584     }
585     
586     case ThreadInterpret:
587         cap = interpretBCO(cap);
588         ret = cap->r.rRet;
589         break;
590         
591     default:
592         barf("schedule: invalid what_next field");
593     }
594
595     cap->in_haskell = rtsFalse;
596
597     // The TSO might have moved, eg. if it re-entered the RTS and a GC
598     // happened.  So find the new location:
599     t = cap->r.rCurrentTSO;
600
601     // We have run some Haskell code: there might be blackhole-blocked
602     // threads to wake up now.
603     // Lock-free test here should be ok, we're just setting a flag.
604     if ( blackhole_queue != END_TSO_QUEUE ) {
605         blackholes_need_checking = rtsTrue;
606     }
607     
608     // And save the current errno in this thread.
609     // XXX: possibly bogus for SMP because this thread might already
610     // be running again, see code below.
611     t->saved_errno = errno;
612
613 #ifdef SMP
614     // If ret is ThreadBlocked, and this Task is bound to the TSO that
615     // blocked, we are in limbo - the TSO is now owned by whatever it
616     // is blocked on, and may in fact already have been woken up,
617     // perhaps even on a different Capability.  It may be the case
618     // that task->cap != cap.  We better yield this Capability
619     // immediately and return to normaility.
620     if (ret == ThreadBlocked) {
621         IF_DEBUG(scheduler,
622                  sched_belch("--<< thread %d (%s) stopped: blocked\n",
623                              t->id, whatNext_strs[t->what_next]));
624         continue;
625     }
626 #endif
627
628     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
629
630     // ----------------------------------------------------------------------
631     
632     // Costs for the scheduler are assigned to CCS_SYSTEM
633 #if defined(PROFILING)
634     stopHeapProfTimer();
635     CCCS = CCS_SYSTEM;
636 #endif
637     
638 #if defined(THREADED_RTS)
639     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
640 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
641     IF_DEBUG(scheduler,debugBelch("sched: "););
642 #endif
643     
644     schedulePostRunThread();
645
646     ready_to_gc = rtsFalse;
647
648     switch (ret) {
649     case HeapOverflow:
650         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
651         break;
652
653     case StackOverflow:
654         scheduleHandleStackOverflow(cap,task,t);
655         break;
656
657     case ThreadYielding:
658         if (scheduleHandleYield(cap, t, prev_what_next)) {
659             // shortcut for switching between compiler/interpreter:
660             goto run_thread; 
661         }
662         break;
663
664     case ThreadBlocked:
665         scheduleHandleThreadBlocked(t);
666         break;
667
668     case ThreadFinished:
669         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
670         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
671         break;
672
673     default:
674       barf("schedule: invalid thread return code %d", (int)ret);
675     }
676
677     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
678     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
679   } /* end of while() */
680
681   IF_PAR_DEBUG(verbose,
682                debugBelch("== Leaving schedule() after having received Finish\n"));
683 }
684
685 /* ----------------------------------------------------------------------------
686  * Setting up the scheduler loop
687  * ------------------------------------------------------------------------- */
688
689 static void
690 schedulePreLoop(void)
691 {
692 #if defined(GRAN) 
693     /* set up first event to get things going */
694     /* ToDo: assign costs for system setup and init MainTSO ! */
695     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
696               ContinueThread, 
697               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
698     
699     IF_DEBUG(gran,
700              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
701                         CurrentTSO);
702              G_TSO(CurrentTSO, 5));
703     
704     if (RtsFlags.GranFlags.Light) {
705         /* Save current time; GranSim Light only */
706         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
707     }      
708 #endif
709 }
710
711 /* -----------------------------------------------------------------------------
712  * schedulePushWork()
713  *
714  * Push work to other Capabilities if we have some.
715  * -------------------------------------------------------------------------- */
716
717 #ifdef SMP
718 static void
719 schedulePushWork(Capability *cap USED_IF_SMP, 
720                  Task *task      USED_IF_SMP)
721 {
722     Capability *free_caps[n_capabilities], *cap0;
723     nat i, n_free_caps;
724
725     // Check whether we have more threads on our run queue, or sparks
726     // in our pool, that we could hand to another Capability.
727     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
728         && sparkPoolSizeCap(cap) < 2) {
729         return;
730     }
731
732     // First grab as many free Capabilities as we can.
733     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
734         cap0 = &capabilities[i];
735         if (cap != cap0 && tryGrabCapability(cap0,task)) {
736             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
737                 // it already has some work, we just grabbed it at 
738                 // the wrong moment.  Or maybe it's deadlocked!
739                 releaseCapability(cap0);
740             } else {
741                 free_caps[n_free_caps++] = cap0;
742             }
743         }
744     }
745
746     // we now have n_free_caps free capabilities stashed in
747     // free_caps[].  Share our run queue equally with them.  This is
748     // probably the simplest thing we could do; improvements we might
749     // want to do include:
750     //
751     //   - giving high priority to moving relatively new threads, on 
752     //     the gournds that they haven't had time to build up a
753     //     working set in the cache on this CPU/Capability.
754     //
755     //   - giving low priority to moving long-lived threads
756
757     if (n_free_caps > 0) {
758         StgTSO *prev, *t, *next;
759         rtsBool pushed_to_all;
760
761         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
762
763         i = 0;
764         pushed_to_all = rtsFalse;
765
766         if (cap->run_queue_hd != END_TSO_QUEUE) {
767             prev = cap->run_queue_hd;
768             t = prev->link;
769             prev->link = END_TSO_QUEUE;
770             for (; t != END_TSO_QUEUE; t = next) {
771                 next = t->link;
772                 t->link = END_TSO_QUEUE;
773                 if (t->what_next == ThreadRelocated
774                     || t->bound == task) { // don't move my bound thread
775                     prev->link = t;
776                     prev = t;
777                 } else if (i == n_free_caps) {
778                     pushed_to_all = rtsTrue;
779                     i = 0;
780                     // keep one for us
781                     prev->link = t;
782                     prev = t;
783                 } else {
784                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
785                     appendToRunQueue(free_caps[i],t);
786                     if (t->bound) { t->bound->cap = free_caps[i]; }
787                     i++;
788                 }
789             }
790             cap->run_queue_tl = prev;
791         }
792
793         // If there are some free capabilities that we didn't push any
794         // threads to, then try to push a spark to each one.
795         if (!pushed_to_all) {
796             StgClosure *spark;
797             // i is the next free capability to push to
798             for (; i < n_free_caps; i++) {
799                 if (emptySparkPoolCap(free_caps[i])) {
800                     spark = findSpark(cap);
801                     if (spark != NULL) {
802                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
803                         newSpark(&(free_caps[i]->r), spark);
804                     }
805                 }
806             }
807         }
808
809         // release the capabilities
810         for (i = 0; i < n_free_caps; i++) {
811             task->cap = free_caps[i];
812             releaseCapability(free_caps[i]);
813         }
814     }
815     task->cap = cap; // reset to point to our Capability.
816 }
817 #endif
818
819 /* ----------------------------------------------------------------------------
820  * Start any pending signal handlers
821  * ------------------------------------------------------------------------- */
822
823 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
824 static void
825 scheduleStartSignalHandlers(Capability *cap)
826 {
827     if (signals_pending()) { // safe outside the lock
828         startSignalHandlers(cap);
829     }
830 }
831 #else
832 static void
833 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
834 {
835 }
836 #endif
837
838 /* ----------------------------------------------------------------------------
839  * Check for blocked threads that can be woken up.
840  * ------------------------------------------------------------------------- */
841
842 static void
843 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
844 {
845 #if !defined(THREADED_RTS)
846     //
847     // Check whether any waiting threads need to be woken up.  If the
848     // run queue is empty, and there are no other tasks running, we
849     // can wait indefinitely for something to happen.
850     //
851     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
852     {
853         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
854     }
855 #endif
856 }
857
858
859 /* ----------------------------------------------------------------------------
860  * Check for threads blocked on BLACKHOLEs that can be woken up
861  * ------------------------------------------------------------------------- */
862 static void
863 scheduleCheckBlackHoles (Capability *cap)
864 {
865     if ( blackholes_need_checking ) // check without the lock first
866     {
867         ACQUIRE_LOCK(&sched_mutex);
868         if ( blackholes_need_checking ) {
869             checkBlackHoles(cap);
870             blackholes_need_checking = rtsFalse;
871         }
872         RELEASE_LOCK(&sched_mutex);
873     }
874 }
875
876 /* ----------------------------------------------------------------------------
877  * Detect deadlock conditions and attempt to resolve them.
878  * ------------------------------------------------------------------------- */
879
880 static void
881 scheduleDetectDeadlock (Capability *cap, Task *task)
882 {
883
884 #if defined(PARALLEL_HASKELL)
885     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
886     return;
887 #endif
888
889     /* 
890      * Detect deadlock: when we have no threads to run, there are no
891      * threads blocked, waiting for I/O, or sleeping, and all the
892      * other tasks are waiting for work, we must have a deadlock of
893      * some description.
894      */
895     if ( emptyThreadQueues(cap) )
896     {
897 #if defined(THREADED_RTS)
898         /* 
899          * In the threaded RTS, we only check for deadlock if there
900          * has been no activity in a complete timeslice.  This means
901          * we won't eagerly start a full GC just because we don't have
902          * any threads to run currently.
903          */
904         if (recent_activity != ACTIVITY_INACTIVE) return;
905 #endif
906
907         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
908
909         // Garbage collection can release some new threads due to
910         // either (a) finalizers or (b) threads resurrected because
911         // they are unreachable and will therefore be sent an
912         // exception.  Any threads thus released will be immediately
913         // runnable.
914         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/ );
915         recent_activity = ACTIVITY_DONE_GC;
916         
917         if ( !emptyRunQueue(cap) ) return;
918
919 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
920         /* If we have user-installed signal handlers, then wait
921          * for signals to arrive rather then bombing out with a
922          * deadlock.
923          */
924         if ( anyUserHandlers() ) {
925             IF_DEBUG(scheduler, 
926                      sched_belch("still deadlocked, waiting for signals..."));
927
928             awaitUserSignals();
929
930             if (signals_pending()) {
931                 startSignalHandlers(cap);
932             }
933
934             // either we have threads to run, or we were interrupted:
935             ASSERT(!emptyRunQueue(cap) || interrupted);
936         }
937 #endif
938
939 #if !defined(THREADED_RTS)
940         /* Probably a real deadlock.  Send the current main thread the
941          * Deadlock exception.
942          */
943         if (task->tso) {
944             switch (task->tso->why_blocked) {
945             case BlockedOnSTM:
946             case BlockedOnBlackHole:
947             case BlockedOnException:
948             case BlockedOnMVar:
949                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
950                 return;
951             default:
952                 barf("deadlock: main thread blocked in a strange way");
953             }
954         }
955         return;
956 #endif
957     }
958 }
959
960 /* ----------------------------------------------------------------------------
961  * Process an event (GRAN only)
962  * ------------------------------------------------------------------------- */
963
964 #if defined(GRAN)
965 static StgTSO *
966 scheduleProcessEvent(rtsEvent *event)
967 {
968     StgTSO *t;
969
970     if (RtsFlags.GranFlags.Light)
971       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
972
973     /* adjust time based on time-stamp */
974     if (event->time > CurrentTime[CurrentProc] &&
975         event->evttype != ContinueThread)
976       CurrentTime[CurrentProc] = event->time;
977     
978     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
979     if (!RtsFlags.GranFlags.Light)
980       handleIdlePEs();
981
982     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
983
984     /* main event dispatcher in GranSim */
985     switch (event->evttype) {
986       /* Should just be continuing execution */
987     case ContinueThread:
988       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
989       /* ToDo: check assertion
990       ASSERT(run_queue_hd != (StgTSO*)NULL &&
991              run_queue_hd != END_TSO_QUEUE);
992       */
993       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
994       if (!RtsFlags.GranFlags.DoAsyncFetch &&
995           procStatus[CurrentProc]==Fetching) {
996         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
997               CurrentTSO->id, CurrentTSO, CurrentProc);
998         goto next_thread;
999       } 
1000       /* Ignore ContinueThreads for completed threads */
1001       if (CurrentTSO->what_next == ThreadComplete) {
1002         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1003               CurrentTSO->id, CurrentTSO, CurrentProc);
1004         goto next_thread;
1005       } 
1006       /* Ignore ContinueThreads for threads that are being migrated */
1007       if (PROCS(CurrentTSO)==Nowhere) { 
1008         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1009               CurrentTSO->id, CurrentTSO, CurrentProc);
1010         goto next_thread;
1011       }
1012       /* The thread should be at the beginning of the run queue */
1013       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1014         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1015               CurrentTSO->id, CurrentTSO, CurrentProc);
1016         break; // run the thread anyway
1017       }
1018       /*
1019       new_event(proc, proc, CurrentTime[proc],
1020                 FindWork,
1021                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1022       goto next_thread; 
1023       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1024       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1025
1026     case FetchNode:
1027       do_the_fetchnode(event);
1028       goto next_thread;             /* handle next event in event queue  */
1029       
1030     case GlobalBlock:
1031       do_the_globalblock(event);
1032       goto next_thread;             /* handle next event in event queue  */
1033       
1034     case FetchReply:
1035       do_the_fetchreply(event);
1036       goto next_thread;             /* handle next event in event queue  */
1037       
1038     case UnblockThread:   /* Move from the blocked queue to the tail of */
1039       do_the_unblock(event);
1040       goto next_thread;             /* handle next event in event queue  */
1041       
1042     case ResumeThread:  /* Move from the blocked queue to the tail of */
1043       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1044       event->tso->gran.blocktime += 
1045         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1046       do_the_startthread(event);
1047       goto next_thread;             /* handle next event in event queue  */
1048       
1049     case StartThread:
1050       do_the_startthread(event);
1051       goto next_thread;             /* handle next event in event queue  */
1052       
1053     case MoveThread:
1054       do_the_movethread(event);
1055       goto next_thread;             /* handle next event in event queue  */
1056       
1057     case MoveSpark:
1058       do_the_movespark(event);
1059       goto next_thread;             /* handle next event in event queue  */
1060       
1061     case FindWork:
1062       do_the_findwork(event);
1063       goto next_thread;             /* handle next event in event queue  */
1064       
1065     default:
1066       barf("Illegal event type %u\n", event->evttype);
1067     }  /* switch */
1068     
1069     /* This point was scheduler_loop in the old RTS */
1070
1071     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1072
1073     TimeOfLastEvent = CurrentTime[CurrentProc];
1074     TimeOfNextEvent = get_time_of_next_event();
1075     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1076     // CurrentTSO = ThreadQueueHd;
1077
1078     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1079                          TimeOfNextEvent));
1080
1081     if (RtsFlags.GranFlags.Light) 
1082       GranSimLight_leave_system(event, &ActiveTSO); 
1083
1084     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1085
1086     IF_DEBUG(gran, 
1087              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1088
1089     /* in a GranSim setup the TSO stays on the run queue */
1090     t = CurrentTSO;
1091     /* Take a thread from the run queue. */
1092     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1093
1094     IF_DEBUG(gran, 
1095              debugBelch("GRAN: About to run current thread, which is\n");
1096              G_TSO(t,5));
1097
1098     context_switch = 0; // turned on via GranYield, checking events and time slice
1099
1100     IF_DEBUG(gran, 
1101              DumpGranEvent(GR_SCHEDULE, t));
1102
1103     procStatus[CurrentProc] = Busy;
1104 }
1105 #endif // GRAN
1106
1107 /* ----------------------------------------------------------------------------
1108  * Send pending messages (PARALLEL_HASKELL only)
1109  * ------------------------------------------------------------------------- */
1110
1111 #if defined(PARALLEL_HASKELL)
1112 static StgTSO *
1113 scheduleSendPendingMessages(void)
1114 {
1115     StgSparkPool *pool;
1116     rtsSpark spark;
1117     StgTSO *t;
1118
1119 # if defined(PAR) // global Mem.Mgmt., omit for now
1120     if (PendingFetches != END_BF_QUEUE) {
1121         processFetches();
1122     }
1123 # endif
1124     
1125     if (RtsFlags.ParFlags.BufferTime) {
1126         // if we use message buffering, we must send away all message
1127         // packets which have become too old...
1128         sendOldBuffers(); 
1129     }
1130 }
1131 #endif
1132
1133 /* ----------------------------------------------------------------------------
1134  * Activate spark threads (PARALLEL_HASKELL only)
1135  * ------------------------------------------------------------------------- */
1136
1137 #if defined(PARALLEL_HASKELL)
1138 static void
1139 scheduleActivateSpark(void)
1140 {
1141 #if defined(SPARKS)
1142   ASSERT(emptyRunQueue());
1143 /* We get here if the run queue is empty and want some work.
1144    We try to turn a spark into a thread, and add it to the run queue,
1145    from where it will be picked up in the next iteration of the scheduler
1146    loop.
1147 */
1148
1149       /* :-[  no local threads => look out for local sparks */
1150       /* the spark pool for the current PE */
1151       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1152       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1153           pool->hd < pool->tl) {
1154         /* 
1155          * ToDo: add GC code check that we really have enough heap afterwards!!
1156          * Old comment:
1157          * If we're here (no runnable threads) and we have pending
1158          * sparks, we must have a space problem.  Get enough space
1159          * to turn one of those pending sparks into a
1160          * thread... 
1161          */
1162
1163         spark = findSpark(rtsFalse);            /* get a spark */
1164         if (spark != (rtsSpark) NULL) {
1165           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1166           IF_PAR_DEBUG(fish, // schedule,
1167                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1168                              tso->id, tso, advisory_thread_count));
1169
1170           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1171             IF_PAR_DEBUG(fish, // schedule,
1172                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1173                             spark));
1174             return rtsFalse; /* failed to generate a thread */
1175           }                  /* otherwise fall through & pick-up new tso */
1176         } else {
1177           IF_PAR_DEBUG(fish, // schedule,
1178                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1179                              spark_queue_len(pool)));
1180           return rtsFalse;  /* failed to generate a thread */
1181         }
1182         return rtsTrue;  /* success in generating a thread */
1183   } else { /* no more threads permitted or pool empty */
1184     return rtsFalse;  /* failed to generateThread */
1185   }
1186 #else
1187   tso = NULL; // avoid compiler warning only
1188   return rtsFalse;  /* dummy in non-PAR setup */
1189 #endif // SPARKS
1190 }
1191 #endif // PARALLEL_HASKELL
1192
1193 /* ----------------------------------------------------------------------------
1194  * Get work from a remote node (PARALLEL_HASKELL only)
1195  * ------------------------------------------------------------------------- */
1196     
1197 #if defined(PARALLEL_HASKELL)
1198 static rtsBool
1199 scheduleGetRemoteWork(rtsBool *receivedFinish)
1200 {
1201   ASSERT(emptyRunQueue());
1202
1203   if (RtsFlags.ParFlags.BufferTime) {
1204         IF_PAR_DEBUG(verbose, 
1205                 debugBelch("...send all pending data,"));
1206         {
1207           nat i;
1208           for (i=1; i<=nPEs; i++)
1209             sendImmediately(i); // send all messages away immediately
1210         }
1211   }
1212 # ifndef SPARKS
1213         //++EDEN++ idle() , i.e. send all buffers, wait for work
1214         // suppress fishing in EDEN... just look for incoming messages
1215         // (blocking receive)
1216   IF_PAR_DEBUG(verbose, 
1217                debugBelch("...wait for incoming messages...\n"));
1218   *receivedFinish = processMessages(); // blocking receive...
1219
1220         // and reenter scheduling loop after having received something
1221         // (return rtsFalse below)
1222
1223 # else /* activate SPARKS machinery */
1224 /* We get here, if we have no work, tried to activate a local spark, but still
1225    have no work. We try to get a remote spark, by sending a FISH message.
1226    Thread migration should be added here, and triggered when a sequence of 
1227    fishes returns without work. */
1228         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1229
1230       /* =8-[  no local sparks => look for work on other PEs */
1231         /*
1232          * We really have absolutely no work.  Send out a fish
1233          * (there may be some out there already), and wait for
1234          * something to arrive.  We clearly can't run any threads
1235          * until a SCHEDULE or RESUME arrives, and so that's what
1236          * we're hoping to see.  (Of course, we still have to
1237          * respond to other types of messages.)
1238          */
1239         rtsTime now = msTime() /*CURRENT_TIME*/;
1240         IF_PAR_DEBUG(verbose, 
1241                      debugBelch("--  now=%ld\n", now));
1242         IF_PAR_DEBUG(fish, // verbose,
1243              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1244                  (last_fish_arrived_at!=0 &&
1245                   last_fish_arrived_at+delay > now)) {
1246                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1247                      now, last_fish_arrived_at+delay, 
1248                      last_fish_arrived_at,
1249                      delay);
1250              });
1251   
1252         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1253             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1254           if (last_fish_arrived_at==0 ||
1255               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1256             /* outstandingFishes is set in sendFish, processFish;
1257                avoid flooding system with fishes via delay */
1258     next_fish_to_send_at = 0;  
1259   } else {
1260     /* ToDo: this should be done in the main scheduling loop to avoid the
1261              busy wait here; not so bad if fish delay is very small  */
1262     int iq = 0; // DEBUGGING -- HWL
1263     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1264     /* send a fish when ready, but process messages that arrive in the meantime */
1265     do {
1266       if (PacketsWaiting()) {
1267         iq++; // DEBUGGING
1268         *receivedFinish = processMessages();
1269       }
1270       now = msTime();
1271     } while (!*receivedFinish || now<next_fish_to_send_at);
1272     // JB: This means the fish could become obsolete, if we receive
1273     // work. Better check for work again? 
1274     // last line: while (!receivedFinish || !haveWork || now<...)
1275     // next line: if (receivedFinish || haveWork )
1276
1277     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1278       return rtsFalse;  // NB: this will leave scheduler loop
1279                         // immediately after return!
1280                           
1281     IF_PAR_DEBUG(fish, // verbose,
1282                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1283
1284   }
1285
1286     // JB: IMHO, this should all be hidden inside sendFish(...)
1287     /* pe = choosePE(); 
1288        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1289                 NEW_FISH_HUNGER);
1290
1291     // Global statistics: count no. of fishes
1292     if (RtsFlags.ParFlags.ParStats.Global &&
1293          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1294            globalParStats.tot_fish_mess++;
1295            }
1296     */ 
1297
1298   /* delayed fishes must have been sent by now! */
1299   next_fish_to_send_at = 0;  
1300   }
1301       
1302   *receivedFinish = processMessages();
1303 # endif /* SPARKS */
1304
1305  return rtsFalse;
1306  /* NB: this function always returns rtsFalse, meaning the scheduler
1307     loop continues with the next iteration; 
1308     rationale: 
1309       return code means success in finding work; we enter this function
1310       if there is no local work, thus have to send a fish which takes
1311       time until it arrives with work; in the meantime we should process
1312       messages in the main loop;
1313  */
1314 }
1315 #endif // PARALLEL_HASKELL
1316
1317 /* ----------------------------------------------------------------------------
1318  * PAR/GRAN: Report stats & debugging info(?)
1319  * ------------------------------------------------------------------------- */
1320
1321 #if defined(PAR) || defined(GRAN)
1322 static void
1323 scheduleGranParReport(void)
1324 {
1325   ASSERT(run_queue_hd != END_TSO_QUEUE);
1326
1327   /* Take a thread from the run queue, if we have work */
1328   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1329
1330     /* If this TSO has got its outport closed in the meantime, 
1331      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1332      * It has to be marked as TH_DEAD for this purpose.
1333      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1334
1335 JB: TODO: investigate wether state change field could be nuked
1336      entirely and replaced by the normal tso state (whatnext
1337      field). All we want to do is to kill tsos from outside.
1338      */
1339
1340     /* ToDo: write something to the log-file
1341     if (RTSflags.ParFlags.granSimStats && !sameThread)
1342         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1343
1344     CurrentTSO = t;
1345     */
1346     /* the spark pool for the current PE */
1347     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1348
1349     IF_DEBUG(scheduler, 
1350              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1351                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1352
1353     IF_PAR_DEBUG(fish,
1354              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1355                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1356
1357     if (RtsFlags.ParFlags.ParStats.Full && 
1358         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1359         (emitSchedule || // forced emit
1360          (t && LastTSO && t->id != LastTSO->id))) {
1361       /* 
1362          we are running a different TSO, so write a schedule event to log file
1363          NB: If we use fair scheduling we also have to write  a deschedule 
1364              event for LastTSO; with unfair scheduling we know that the
1365              previous tso has blocked whenever we switch to another tso, so
1366              we don't need it in GUM for now
1367       */
1368       IF_PAR_DEBUG(fish, // schedule,
1369                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1370
1371       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1372                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1373       emitSchedule = rtsFalse;
1374     }
1375 }     
1376 #endif
1377
1378 /* ----------------------------------------------------------------------------
1379  * After running a thread...
1380  * ------------------------------------------------------------------------- */
1381
1382 static void
1383 schedulePostRunThread(void)
1384 {
1385 #if defined(PAR)
1386     /* HACK 675: if the last thread didn't yield, make sure to print a 
1387        SCHEDULE event to the log file when StgRunning the next thread, even
1388        if it is the same one as before */
1389     LastTSO = t; 
1390     TimeOfLastYield = CURRENT_TIME;
1391 #endif
1392
1393   /* some statistics gathering in the parallel case */
1394
1395 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1396   switch (ret) {
1397     case HeapOverflow:
1398 # if defined(GRAN)
1399       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1400       globalGranStats.tot_heapover++;
1401 # elif defined(PAR)
1402       globalParStats.tot_heapover++;
1403 # endif
1404       break;
1405
1406      case StackOverflow:
1407 # if defined(GRAN)
1408       IF_DEBUG(gran, 
1409                DumpGranEvent(GR_DESCHEDULE, t));
1410       globalGranStats.tot_stackover++;
1411 # elif defined(PAR)
1412       // IF_DEBUG(par, 
1413       // DumpGranEvent(GR_DESCHEDULE, t);
1414       globalParStats.tot_stackover++;
1415 # endif
1416       break;
1417
1418     case ThreadYielding:
1419 # if defined(GRAN)
1420       IF_DEBUG(gran, 
1421                DumpGranEvent(GR_DESCHEDULE, t));
1422       globalGranStats.tot_yields++;
1423 # elif defined(PAR)
1424       // IF_DEBUG(par, 
1425       // DumpGranEvent(GR_DESCHEDULE, t);
1426       globalParStats.tot_yields++;
1427 # endif
1428       break; 
1429
1430     case ThreadBlocked:
1431 # if defined(GRAN)
1432       IF_DEBUG(scheduler,
1433                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1434                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1435                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1436                if (t->block_info.closure!=(StgClosure*)NULL)
1437                  print_bq(t->block_info.closure);
1438                debugBelch("\n"));
1439
1440       // ??? needed; should emit block before
1441       IF_DEBUG(gran, 
1442                DumpGranEvent(GR_DESCHEDULE, t)); 
1443       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1444       /*
1445         ngoq Dogh!
1446       ASSERT(procStatus[CurrentProc]==Busy || 
1447               ((procStatus[CurrentProc]==Fetching) && 
1448               (t->block_info.closure!=(StgClosure*)NULL)));
1449       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1450           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1451             procStatus[CurrentProc]==Fetching)) 
1452         procStatus[CurrentProc] = Idle;
1453       */
1454 # elif defined(PAR)
1455 //++PAR++  blockThread() writes the event (change?)
1456 # endif
1457     break;
1458
1459   case ThreadFinished:
1460     break;
1461
1462   default:
1463     barf("parGlobalStats: unknown return code");
1464     break;
1465     }
1466 #endif
1467 }
1468
1469 /* -----------------------------------------------------------------------------
1470  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1471  * -------------------------------------------------------------------------- */
1472
1473 static rtsBool
1474 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1475 {
1476     // did the task ask for a large block?
1477     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1478         // if so, get one and push it on the front of the nursery.
1479         bdescr *bd;
1480         lnat blocks;
1481         
1482         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1483         
1484         IF_DEBUG(scheduler,
1485                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1486                             (long)t->id, whatNext_strs[t->what_next], blocks));
1487         
1488         // don't do this if the nursery is (nearly) full, we'll GC first.
1489         if (cap->r.rCurrentNursery->link != NULL ||
1490             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1491                                                // if the nursery has only one block.
1492             
1493             ACQUIRE_SM_LOCK
1494             bd = allocGroup( blocks );
1495             RELEASE_SM_LOCK
1496             cap->r.rNursery->n_blocks += blocks;
1497             
1498             // link the new group into the list
1499             bd->link = cap->r.rCurrentNursery;
1500             bd->u.back = cap->r.rCurrentNursery->u.back;
1501             if (cap->r.rCurrentNursery->u.back != NULL) {
1502                 cap->r.rCurrentNursery->u.back->link = bd;
1503             } else {
1504 #if !defined(SMP)
1505                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1506                        g0s0 == cap->r.rNursery);
1507 #endif
1508                 cap->r.rNursery->blocks = bd;
1509             }             
1510             cap->r.rCurrentNursery->u.back = bd;
1511             
1512             // initialise it as a nursery block.  We initialise the
1513             // step, gen_no, and flags field of *every* sub-block in
1514             // this large block, because this is easier than making
1515             // sure that we always find the block head of a large
1516             // block whenever we call Bdescr() (eg. evacuate() and
1517             // isAlive() in the GC would both have to do this, at
1518             // least).
1519             { 
1520                 bdescr *x;
1521                 for (x = bd; x < bd + blocks; x++) {
1522                     x->step = cap->r.rNursery;
1523                     x->gen_no = 0;
1524                     x->flags = 0;
1525                 }
1526             }
1527             
1528             // This assert can be a killer if the app is doing lots
1529             // of large block allocations.
1530             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1531             
1532             // now update the nursery to point to the new block
1533             cap->r.rCurrentNursery = bd;
1534             
1535             // we might be unlucky and have another thread get on the
1536             // run queue before us and steal the large block, but in that
1537             // case the thread will just end up requesting another large
1538             // block.
1539             pushOnRunQueue(cap,t);
1540             return rtsFalse;  /* not actually GC'ing */
1541         }
1542     }
1543     
1544     IF_DEBUG(scheduler,
1545              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1546                         (long)t->id, whatNext_strs[t->what_next]));
1547 #if defined(GRAN)
1548     ASSERT(!is_on_queue(t,CurrentProc));
1549 #elif defined(PARALLEL_HASKELL)
1550     /* Currently we emit a DESCHEDULE event before GC in GUM.
1551        ToDo: either add separate event to distinguish SYSTEM time from rest
1552        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1553     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1554         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1555                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1556         emitSchedule = rtsTrue;
1557     }
1558 #endif
1559       
1560     pushOnRunQueue(cap,t);
1561     return rtsTrue;
1562     /* actual GC is done at the end of the while loop in schedule() */
1563 }
1564
1565 /* -----------------------------------------------------------------------------
1566  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1567  * -------------------------------------------------------------------------- */
1568
1569 static void
1570 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1571 {
1572     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1573                                   (long)t->id, whatNext_strs[t->what_next]));
1574     /* just adjust the stack for this thread, then pop it back
1575      * on the run queue.
1576      */
1577     { 
1578         /* enlarge the stack */
1579         StgTSO *new_t = threadStackOverflow(cap, t);
1580         
1581         /* The TSO attached to this Task may have moved, so update the
1582          * pointer to it.
1583          */
1584         if (task->tso == t) {
1585             task->tso = new_t;
1586         }
1587         pushOnRunQueue(cap,new_t);
1588     }
1589 }
1590
1591 /* -----------------------------------------------------------------------------
1592  * Handle a thread that returned to the scheduler with ThreadYielding
1593  * -------------------------------------------------------------------------- */
1594
1595 static rtsBool
1596 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1597 {
1598     // Reset the context switch flag.  We don't do this just before
1599     // running the thread, because that would mean we would lose ticks
1600     // during GC, which can lead to unfair scheduling (a thread hogs
1601     // the CPU because the tick always arrives during GC).  This way
1602     // penalises threads that do a lot of allocation, but that seems
1603     // better than the alternative.
1604     context_switch = 0;
1605     
1606     /* put the thread back on the run queue.  Then, if we're ready to
1607      * GC, check whether this is the last task to stop.  If so, wake
1608      * up the GC thread.  getThread will block during a GC until the
1609      * GC is finished.
1610      */
1611     IF_DEBUG(scheduler,
1612              if (t->what_next != prev_what_next) {
1613                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1614                             (long)t->id, whatNext_strs[t->what_next]);
1615              } else {
1616                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1617                             (long)t->id, whatNext_strs[t->what_next]);
1618              }
1619         );
1620     
1621     IF_DEBUG(sanity,
1622              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1623              checkTSO(t));
1624     ASSERT(t->link == END_TSO_QUEUE);
1625     
1626     // Shortcut if we're just switching evaluators: don't bother
1627     // doing stack squeezing (which can be expensive), just run the
1628     // thread.
1629     if (t->what_next != prev_what_next) {
1630         return rtsTrue;
1631     }
1632     
1633 #if defined(GRAN)
1634     ASSERT(!is_on_queue(t,CurrentProc));
1635       
1636     IF_DEBUG(sanity,
1637              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1638              checkThreadQsSanity(rtsTrue));
1639
1640 #endif
1641
1642     addToRunQueue(cap,t);
1643
1644 #if defined(GRAN)
1645     /* add a ContinueThread event to actually process the thread */
1646     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1647               ContinueThread,
1648               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1649     IF_GRAN_DEBUG(bq, 
1650                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1651                   G_EVENTQ(0);
1652                   G_CURR_THREADQ(0));
1653 #endif
1654     return rtsFalse;
1655 }
1656
1657 /* -----------------------------------------------------------------------------
1658  * Handle a thread that returned to the scheduler with ThreadBlocked
1659  * -------------------------------------------------------------------------- */
1660
1661 static void
1662 scheduleHandleThreadBlocked( StgTSO *t
1663 #if !defined(GRAN) && !defined(DEBUG)
1664     STG_UNUSED
1665 #endif
1666     )
1667 {
1668 #if defined(GRAN)
1669     IF_DEBUG(scheduler,
1670              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1671                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1672              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1673     
1674     // ??? needed; should emit block before
1675     IF_DEBUG(gran, 
1676              DumpGranEvent(GR_DESCHEDULE, t)); 
1677     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1678     /*
1679       ngoq Dogh!
1680       ASSERT(procStatus[CurrentProc]==Busy || 
1681       ((procStatus[CurrentProc]==Fetching) && 
1682       (t->block_info.closure!=(StgClosure*)NULL)));
1683       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1684       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1685       procStatus[CurrentProc]==Fetching)) 
1686       procStatus[CurrentProc] = Idle;
1687     */
1688 #elif defined(PAR)
1689     IF_DEBUG(scheduler,
1690              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1691                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1692     IF_PAR_DEBUG(bq,
1693                  
1694                  if (t->block_info.closure!=(StgClosure*)NULL) 
1695                  print_bq(t->block_info.closure));
1696     
1697     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1698     blockThread(t);
1699     
1700     /* whatever we schedule next, we must log that schedule */
1701     emitSchedule = rtsTrue;
1702     
1703 #else /* !GRAN */
1704
1705       // We don't need to do anything.  The thread is blocked, and it
1706       // has tidied up its stack and placed itself on whatever queue
1707       // it needs to be on.
1708
1709 #if !defined(SMP)
1710     ASSERT(t->why_blocked != NotBlocked);
1711              // This might not be true under SMP: we don't have
1712              // exclusive access to this TSO, so someone might have
1713              // woken it up by now.  This actually happens: try
1714              // conc023 +RTS -N2.
1715 #endif
1716
1717     IF_DEBUG(scheduler,
1718              debugBelch("--<< thread %d (%s) stopped: ", 
1719                         t->id, whatNext_strs[t->what_next]);
1720              printThreadBlockage(t);
1721              debugBelch("\n"));
1722     
1723     /* Only for dumping event to log file 
1724        ToDo: do I need this in GranSim, too?
1725        blockThread(t);
1726     */
1727 #endif
1728 }
1729
1730 /* -----------------------------------------------------------------------------
1731  * Handle a thread that returned to the scheduler with ThreadFinished
1732  * -------------------------------------------------------------------------- */
1733
1734 static rtsBool
1735 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1736 {
1737     /* Need to check whether this was a main thread, and if so,
1738      * return with the return value.
1739      *
1740      * We also end up here if the thread kills itself with an
1741      * uncaught exception, see Exception.cmm.
1742      */
1743     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1744                                   t->id, whatNext_strs[t->what_next]));
1745
1746 #if defined(GRAN)
1747       endThread(t, CurrentProc); // clean-up the thread
1748 #elif defined(PARALLEL_HASKELL)
1749       /* For now all are advisory -- HWL */
1750       //if(t->priority==AdvisoryPriority) ??
1751       advisory_thread_count--; // JB: Caution with this counter, buggy!
1752       
1753 # if defined(DIST)
1754       if(t->dist.priority==RevalPriority)
1755         FinishReval(t);
1756 # endif
1757     
1758 # if defined(EDENOLD)
1759       // the thread could still have an outport... (BUG)
1760       if (t->eden.outport != -1) {
1761       // delete the outport for the tso which has finished...
1762         IF_PAR_DEBUG(eden_ports,
1763                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1764                               t->eden.outport, t->id));
1765         deleteOPT(t);
1766       }
1767       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1768       if (t->eden.epid != -1) {
1769         IF_PAR_DEBUG(eden_ports,
1770                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1771                            t->id, t->eden.epid));
1772         removeTSOfromProcess(t);
1773       }
1774 # endif 
1775
1776 # if defined(PAR)
1777       if (RtsFlags.ParFlags.ParStats.Full &&
1778           !RtsFlags.ParFlags.ParStats.Suppressed) 
1779         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1780
1781       //  t->par only contains statistics: left out for now...
1782       IF_PAR_DEBUG(fish,
1783                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1784                               t->id,t,t->par.sparkname));
1785 # endif
1786 #endif // PARALLEL_HASKELL
1787
1788       //
1789       // Check whether the thread that just completed was a bound
1790       // thread, and if so return with the result.  
1791       //
1792       // There is an assumption here that all thread completion goes
1793       // through this point; we need to make sure that if a thread
1794       // ends up in the ThreadKilled state, that it stays on the run
1795       // queue so it can be dealt with here.
1796       //
1797
1798       if (t->bound) {
1799
1800           if (t->bound != task) {
1801 #if !defined(THREADED_RTS)
1802               // Must be a bound thread that is not the topmost one.  Leave
1803               // it on the run queue until the stack has unwound to the
1804               // point where we can deal with this.  Leaving it on the run
1805               // queue also ensures that the garbage collector knows about
1806               // this thread and its return value (it gets dropped from the
1807               // all_threads list so there's no other way to find it).
1808               appendToRunQueue(cap,t);
1809               return rtsFalse;
1810 #else
1811               // this cannot happen in the threaded RTS, because a
1812               // bound thread can only be run by the appropriate Task.
1813               barf("finished bound thread that isn't mine");
1814 #endif
1815           }
1816
1817           ASSERT(task->tso == t);
1818
1819           if (t->what_next == ThreadComplete) {
1820               if (task->ret) {
1821                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1822                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1823               }
1824               task->stat = Success;
1825           } else {
1826               if (task->ret) {
1827                   *(task->ret) = NULL;
1828               }
1829               if (interrupted) {
1830                   task->stat = Interrupted;
1831               } else {
1832                   task->stat = Killed;
1833               }
1834           }
1835 #ifdef DEBUG
1836           removeThreadLabel((StgWord)task->tso->id);
1837 #endif
1838           return rtsTrue; // tells schedule() to return
1839       }
1840
1841       return rtsFalse;
1842 }
1843
1844 /* -----------------------------------------------------------------------------
1845  * Perform a heap census, if PROFILING
1846  * -------------------------------------------------------------------------- */
1847
1848 static rtsBool
1849 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1850 {
1851 #if defined(PROFILING)
1852     // When we have +RTS -i0 and we're heap profiling, do a census at
1853     // every GC.  This lets us get repeatable runs for debugging.
1854     if (performHeapProfile ||
1855         (RtsFlags.ProfFlags.profileInterval==0 &&
1856          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1857         GarbageCollect(GetRoots, rtsTrue);
1858         heapCensus();
1859         performHeapProfile = rtsFalse;
1860         return rtsTrue;  // true <=> we already GC'd
1861     }
1862 #endif
1863     return rtsFalse;
1864 }
1865
1866 /* -----------------------------------------------------------------------------
1867  * Perform a garbage collection if necessary
1868  * -------------------------------------------------------------------------- */
1869
1870 static void
1871 scheduleDoGC( Capability *cap, Task *task USED_IF_SMP, rtsBool force_major )
1872 {
1873     StgTSO *t;
1874 #ifdef SMP
1875     static volatile StgWord waiting_for_gc;
1876     rtsBool was_waiting;
1877     nat i;
1878 #endif
1879
1880 #ifdef SMP
1881     // In order to GC, there must be no threads running Haskell code.
1882     // Therefore, the GC thread needs to hold *all* the capabilities,
1883     // and release them after the GC has completed.  
1884     //
1885     // This seems to be the simplest way: previous attempts involved
1886     // making all the threads with capabilities give up their
1887     // capabilities and sleep except for the *last* one, which
1888     // actually did the GC.  But it's quite hard to arrange for all
1889     // the other tasks to sleep and stay asleep.
1890     //
1891         
1892     was_waiting = cas(&waiting_for_gc, 0, 1);
1893     if (was_waiting) {
1894         do {
1895             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1896             yieldCapability(&cap,task);
1897         } while (waiting_for_gc);
1898         return;
1899     }
1900
1901     for (i=0; i < n_capabilities; i++) {
1902         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1903         if (cap != &capabilities[i]) {
1904             Capability *pcap = &capabilities[i];
1905             // we better hope this task doesn't get migrated to
1906             // another Capability while we're waiting for this one.
1907             // It won't, because load balancing happens while we have
1908             // all the Capabilities, but even so it's a slightly
1909             // unsavoury invariant.
1910             task->cap = pcap;
1911             context_switch = 1;
1912             waitForReturnCapability(&pcap, task);
1913             if (pcap != &capabilities[i]) {
1914                 barf("scheduleDoGC: got the wrong capability");
1915             }
1916         }
1917     }
1918
1919     waiting_for_gc = rtsFalse;
1920 #endif
1921
1922     /* Kick any transactions which are invalid back to their
1923      * atomically frames.  When next scheduled they will try to
1924      * commit, this commit will fail and they will retry.
1925      */
1926     { 
1927         StgTSO *next;
1928
1929         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1930             if (t->what_next == ThreadRelocated) {
1931                 next = t->link;
1932             } else {
1933                 next = t->global_link;
1934                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1935                     if (!stmValidateNestOfTransactions (t -> trec)) {
1936                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1937                         
1938                         // strip the stack back to the
1939                         // ATOMICALLY_FRAME, aborting the (nested)
1940                         // transaction, and saving the stack of any
1941                         // partially-evaluated thunks on the heap.
1942                         raiseAsync_(cap, t, NULL, rtsTrue, NULL);
1943                         
1944 #ifdef REG_R1
1945                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1946 #endif
1947                     }
1948                 }
1949             }
1950         }
1951     }
1952     
1953     // so this happens periodically:
1954     scheduleCheckBlackHoles(cap);
1955     
1956     IF_DEBUG(scheduler, printAllThreads());
1957
1958     /* everybody back, start the GC.
1959      * Could do it in this thread, or signal a condition var
1960      * to do it in another thread.  Either way, we need to
1961      * broadcast on gc_pending_cond afterward.
1962      */
1963 #if defined(THREADED_RTS)
1964     IF_DEBUG(scheduler,sched_belch("doing GC"));
1965 #endif
1966     GarbageCollect(GetRoots, force_major);
1967     
1968 #if defined(SMP)
1969     // release our stash of capabilities.
1970     for (i = 0; i < n_capabilities; i++) {
1971         if (cap != &capabilities[i]) {
1972             task->cap = &capabilities[i];
1973             releaseCapability(&capabilities[i]);
1974         }
1975     }
1976     task->cap = cap;
1977 #endif
1978
1979 #if defined(GRAN)
1980     /* add a ContinueThread event to continue execution of current thread */
1981     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1982               ContinueThread,
1983               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1984     IF_GRAN_DEBUG(bq, 
1985                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1986                   G_EVENTQ(0);
1987                   G_CURR_THREADQ(0));
1988 #endif /* GRAN */
1989 }
1990
1991 /* ---------------------------------------------------------------------------
1992  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1993  * used by Control.Concurrent for error checking.
1994  * ------------------------------------------------------------------------- */
1995  
1996 StgBool
1997 rtsSupportsBoundThreads(void)
1998 {
1999 #if defined(THREADED_RTS)
2000   return rtsTrue;
2001 #else
2002   return rtsFalse;
2003 #endif
2004 }
2005
2006 /* ---------------------------------------------------------------------------
2007  * isThreadBound(tso): check whether tso is bound to an OS thread.
2008  * ------------------------------------------------------------------------- */
2009  
2010 StgBool
2011 isThreadBound(StgTSO* tso USED_IF_THREADS)
2012 {
2013 #if defined(THREADED_RTS)
2014   return (tso->bound != NULL);
2015 #endif
2016   return rtsFalse;
2017 }
2018
2019 /* ---------------------------------------------------------------------------
2020  * Singleton fork(). Do not copy any running threads.
2021  * ------------------------------------------------------------------------- */
2022
2023 #if !defined(mingw32_HOST_OS) && !defined(SMP)
2024 #define FORKPROCESS_PRIMOP_SUPPORTED
2025 #endif
2026
2027 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2028 static void 
2029 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2030 #endif
2031 StgInt
2032 forkProcess(HsStablePtr *entry
2033 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2034             STG_UNUSED
2035 #endif
2036            )
2037 {
2038 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2039     Task *task;
2040     pid_t pid;
2041     StgTSO* t,*next;
2042     Capability *cap;
2043     
2044     IF_DEBUG(scheduler,sched_belch("forking!"));
2045     
2046     // ToDo: for SMP, we should probably acquire *all* the capabilities
2047     cap = rts_lock();
2048     
2049     pid = fork();
2050     
2051     if (pid) { // parent
2052         
2053         // just return the pid
2054         rts_unlock(cap);
2055         return pid;
2056         
2057     } else { // child
2058         
2059         // delete all threads
2060         cap->run_queue_hd = END_TSO_QUEUE;
2061         cap->run_queue_tl = END_TSO_QUEUE;
2062         
2063         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2064             next = t->link;
2065             
2066             // don't allow threads to catch the ThreadKilled exception
2067             deleteThreadImmediately(cap,t);
2068         }
2069         
2070         // wipe the task list
2071         ACQUIRE_LOCK(&sched_mutex);
2072         for (task = all_tasks; task != NULL; task=task->all_link) {
2073             if (task != cap->running_task) discardTask(task);
2074         }
2075         RELEASE_LOCK(&sched_mutex);
2076
2077         cap->suspended_ccalling_tasks = NULL;
2078
2079 #if defined(THREADED_RTS)
2080         // wipe our spare workers list.
2081         cap->spare_workers = NULL;
2082         cap->returning_tasks_hd = NULL;
2083         cap->returning_tasks_tl = NULL;
2084 #endif
2085
2086         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2087         rts_checkSchedStatus("forkProcess",cap);
2088         
2089         rts_unlock(cap);
2090         hs_exit();                      // clean up and exit
2091         stg_exit(EXIT_SUCCESS);
2092     }
2093 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2094     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2095     return -1;
2096 #endif
2097 }
2098
2099 /* ---------------------------------------------------------------------------
2100  * Delete the threads on the run queue of the current capability.
2101  * ------------------------------------------------------------------------- */
2102    
2103 static void
2104 deleteRunQueue (Capability *cap)
2105 {
2106     StgTSO *t, *next;
2107     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2108         ASSERT(t->what_next != ThreadRelocated);
2109         next = t->link;
2110         deleteThread(cap, t);
2111     }
2112 }
2113
2114 /* startThread and  insertThread are now in GranSim.c -- HWL */
2115
2116
2117 /* -----------------------------------------------------------------------------
2118    Managing the suspended_ccalling_tasks list.
2119    Locks required: sched_mutex
2120    -------------------------------------------------------------------------- */
2121
2122 STATIC_INLINE void
2123 suspendTask (Capability *cap, Task *task)
2124 {
2125     ASSERT(task->next == NULL && task->prev == NULL);
2126     task->next = cap->suspended_ccalling_tasks;
2127     task->prev = NULL;
2128     if (cap->suspended_ccalling_tasks) {
2129         cap->suspended_ccalling_tasks->prev = task;
2130     }
2131     cap->suspended_ccalling_tasks = task;
2132 }
2133
2134 STATIC_INLINE void
2135 recoverSuspendedTask (Capability *cap, Task *task)
2136 {
2137     if (task->prev) {
2138         task->prev->next = task->next;
2139     } else {
2140         ASSERT(cap->suspended_ccalling_tasks == task);
2141         cap->suspended_ccalling_tasks = task->next;
2142     }
2143     if (task->next) {
2144         task->next->prev = task->prev;
2145     }
2146     task->next = task->prev = NULL;
2147 }
2148
2149 /* ---------------------------------------------------------------------------
2150  * Suspending & resuming Haskell threads.
2151  * 
2152  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2153  * its capability before calling the C function.  This allows another
2154  * task to pick up the capability and carry on running Haskell
2155  * threads.  It also means that if the C call blocks, it won't lock
2156  * the whole system.
2157  *
2158  * The Haskell thread making the C call is put to sleep for the
2159  * duration of the call, on the susepended_ccalling_threads queue.  We
2160  * give out a token to the task, which it can use to resume the thread
2161  * on return from the C function.
2162  * ------------------------------------------------------------------------- */
2163    
2164 void *
2165 suspendThread (StgRegTable *reg)
2166 {
2167   Capability *cap;
2168   int saved_errno = errno;
2169   StgTSO *tso;
2170   Task *task;
2171
2172   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2173    */
2174   cap = regTableToCapability(reg);
2175
2176   task = cap->running_task;
2177   tso = cap->r.rCurrentTSO;
2178
2179   IF_DEBUG(scheduler,
2180            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2181
2182   // XXX this might not be necessary --SDM
2183   tso->what_next = ThreadRunGHC;
2184
2185   threadPaused(cap,tso);
2186
2187   if(tso->blocked_exceptions == NULL)  {
2188       tso->why_blocked = BlockedOnCCall;
2189       tso->blocked_exceptions = END_TSO_QUEUE;
2190   } else {
2191       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2192   }
2193
2194   // Hand back capability
2195   task->suspended_tso = tso;
2196
2197   ACQUIRE_LOCK(&cap->lock);
2198
2199   suspendTask(cap,task);
2200   cap->in_haskell = rtsFalse;
2201   releaseCapability_(cap);
2202   
2203   RELEASE_LOCK(&cap->lock);
2204
2205 #if defined(THREADED_RTS)
2206   /* Preparing to leave the RTS, so ensure there's a native thread/task
2207      waiting to take over.
2208   */
2209   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2210 #endif
2211
2212   errno = saved_errno;
2213   return task;
2214 }
2215
2216 StgRegTable *
2217 resumeThread (void *task_)
2218 {
2219     StgTSO *tso;
2220     Capability *cap;
2221     int saved_errno = errno;
2222     Task *task = task_;
2223
2224     cap = task->cap;
2225     // Wait for permission to re-enter the RTS with the result.
2226     waitForReturnCapability(&cap,task);
2227     // we might be on a different capability now... but if so, our
2228     // entry on the suspended_ccalling_tasks list will also have been
2229     // migrated.
2230
2231     // Remove the thread from the suspended list
2232     recoverSuspendedTask(cap,task);
2233
2234     tso = task->suspended_tso;
2235     task->suspended_tso = NULL;
2236     tso->link = END_TSO_QUEUE;
2237     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2238     
2239     if (tso->why_blocked == BlockedOnCCall) {
2240         awakenBlockedQueue(cap,tso->blocked_exceptions);
2241         tso->blocked_exceptions = NULL;
2242     }
2243     
2244     /* Reset blocking status */
2245     tso->why_blocked  = NotBlocked;
2246     
2247     cap->r.rCurrentTSO = tso;
2248     cap->in_haskell = rtsTrue;
2249     errno = saved_errno;
2250
2251     return &cap->r;
2252 }
2253
2254 /* ---------------------------------------------------------------------------
2255  * Comparing Thread ids.
2256  *
2257  * This is used from STG land in the implementation of the
2258  * instances of Eq/Ord for ThreadIds.
2259  * ------------------------------------------------------------------------ */
2260
2261 int
2262 cmp_thread(StgPtr tso1, StgPtr tso2) 
2263
2264   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2265   StgThreadID id2 = ((StgTSO *)tso2)->id;
2266  
2267   if (id1 < id2) return (-1);
2268   if (id1 > id2) return 1;
2269   return 0;
2270 }
2271
2272 /* ---------------------------------------------------------------------------
2273  * Fetching the ThreadID from an StgTSO.
2274  *
2275  * This is used in the implementation of Show for ThreadIds.
2276  * ------------------------------------------------------------------------ */
2277 int
2278 rts_getThreadId(StgPtr tso) 
2279 {
2280   return ((StgTSO *)tso)->id;
2281 }
2282
2283 #ifdef DEBUG
2284 void
2285 labelThread(StgPtr tso, char *label)
2286 {
2287   int len;
2288   void *buf;
2289
2290   /* Caveat: Once set, you can only set the thread name to "" */
2291   len = strlen(label)+1;
2292   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2293   strncpy(buf,label,len);
2294   /* Update will free the old memory for us */
2295   updateThreadLabel(((StgTSO *)tso)->id,buf);
2296 }
2297 #endif /* DEBUG */
2298
2299 /* ---------------------------------------------------------------------------
2300    Create a new thread.
2301
2302    The new thread starts with the given stack size.  Before the
2303    scheduler can run, however, this thread needs to have a closure
2304    (and possibly some arguments) pushed on its stack.  See
2305    pushClosure() in Schedule.h.
2306
2307    createGenThread() and createIOThread() (in SchedAPI.h) are
2308    convenient packaged versions of this function.
2309
2310    currently pri (priority) is only used in a GRAN setup -- HWL
2311    ------------------------------------------------------------------------ */
2312 #if defined(GRAN)
2313 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2314 StgTSO *
2315 createThread(nat size, StgInt pri)
2316 #else
2317 StgTSO *
2318 createThread(Capability *cap, nat size)
2319 #endif
2320 {
2321     StgTSO *tso;
2322     nat stack_size;
2323
2324     /* sched_mutex is *not* required */
2325
2326     /* First check whether we should create a thread at all */
2327 #if defined(PARALLEL_HASKELL)
2328     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2329     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2330         threadsIgnored++;
2331         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2332                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2333         return END_TSO_QUEUE;
2334     }
2335     threadsCreated++;
2336 #endif
2337
2338 #if defined(GRAN)
2339     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2340 #endif
2341
2342     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2343
2344     /* catch ridiculously small stack sizes */
2345     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2346         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2347     }
2348
2349     stack_size = size - TSO_STRUCT_SIZEW;
2350     
2351     tso = (StgTSO *)allocateLocal(cap, size);
2352     TICK_ALLOC_TSO(stack_size, 0);
2353
2354     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2355 #if defined(GRAN)
2356     SET_GRAN_HDR(tso, ThisPE);
2357 #endif
2358
2359     // Always start with the compiled code evaluator
2360     tso->what_next = ThreadRunGHC;
2361
2362     tso->why_blocked  = NotBlocked;
2363     tso->blocked_exceptions = NULL;
2364     
2365     tso->saved_errno = 0;
2366     tso->bound = NULL;
2367     
2368     tso->stack_size     = stack_size;
2369     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2370                           - TSO_STRUCT_SIZEW;
2371     tso->sp             = (P_)&(tso->stack) + stack_size;
2372
2373     tso->trec = NO_TREC;
2374     
2375 #ifdef PROFILING
2376     tso->prof.CCCS = CCS_MAIN;
2377 #endif
2378     
2379   /* put a stop frame on the stack */
2380     tso->sp -= sizeofW(StgStopFrame);
2381     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2382     tso->link = END_TSO_QUEUE;
2383     
2384   // ToDo: check this
2385 #if defined(GRAN)
2386     /* uses more flexible routine in GranSim */
2387     insertThread(tso, CurrentProc);
2388 #else
2389     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2390      * from its creation
2391      */
2392 #endif
2393     
2394 #if defined(GRAN) 
2395     if (RtsFlags.GranFlags.GranSimStats.Full) 
2396         DumpGranEvent(GR_START,tso);
2397 #elif defined(PARALLEL_HASKELL)
2398     if (RtsFlags.ParFlags.ParStats.Full) 
2399         DumpGranEvent(GR_STARTQ,tso);
2400     /* HACk to avoid SCHEDULE 
2401        LastTSO = tso; */
2402 #endif
2403     
2404     /* Link the new thread on the global thread list.
2405      */
2406     ACQUIRE_LOCK(&sched_mutex);
2407     tso->id = next_thread_id++;  // while we have the mutex
2408     tso->global_link = all_threads;
2409     all_threads = tso;
2410     RELEASE_LOCK(&sched_mutex);
2411     
2412 #if defined(DIST)
2413     tso->dist.priority = MandatoryPriority; //by default that is...
2414 #endif
2415     
2416 #if defined(GRAN)
2417     tso->gran.pri = pri;
2418 # if defined(DEBUG)
2419     tso->gran.magic = TSO_MAGIC; // debugging only
2420 # endif
2421     tso->gran.sparkname   = 0;
2422     tso->gran.startedat   = CURRENT_TIME; 
2423     tso->gran.exported    = 0;
2424     tso->gran.basicblocks = 0;
2425     tso->gran.allocs      = 0;
2426     tso->gran.exectime    = 0;
2427     tso->gran.fetchtime   = 0;
2428     tso->gran.fetchcount  = 0;
2429     tso->gran.blocktime   = 0;
2430     tso->gran.blockcount  = 0;
2431     tso->gran.blockedat   = 0;
2432     tso->gran.globalsparks = 0;
2433     tso->gran.localsparks  = 0;
2434     if (RtsFlags.GranFlags.Light)
2435         tso->gran.clock  = Now; /* local clock */
2436     else
2437         tso->gran.clock  = 0;
2438     
2439     IF_DEBUG(gran,printTSO(tso));
2440 #elif defined(PARALLEL_HASKELL)
2441 # if defined(DEBUG)
2442     tso->par.magic = TSO_MAGIC; // debugging only
2443 # endif
2444     tso->par.sparkname   = 0;
2445     tso->par.startedat   = CURRENT_TIME; 
2446     tso->par.exported    = 0;
2447     tso->par.basicblocks = 0;
2448     tso->par.allocs      = 0;
2449     tso->par.exectime    = 0;
2450     tso->par.fetchtime   = 0;
2451     tso->par.fetchcount  = 0;
2452     tso->par.blocktime   = 0;
2453     tso->par.blockcount  = 0;
2454     tso->par.blockedat   = 0;
2455     tso->par.globalsparks = 0;
2456     tso->par.localsparks  = 0;
2457 #endif
2458     
2459 #if defined(GRAN)
2460     globalGranStats.tot_threads_created++;
2461     globalGranStats.threads_created_on_PE[CurrentProc]++;
2462     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2463     globalGranStats.tot_sq_probes++;
2464 #elif defined(PARALLEL_HASKELL)
2465     // collect parallel global statistics (currently done together with GC stats)
2466     if (RtsFlags.ParFlags.ParStats.Global &&
2467         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2468         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2469         globalParStats.tot_threads_created++;
2470     }
2471 #endif 
2472     
2473 #if defined(GRAN)
2474     IF_GRAN_DEBUG(pri,
2475                   sched_belch("==__ schedule: Created TSO %d (%p);",
2476                               CurrentProc, tso, tso->id));
2477 #elif defined(PARALLEL_HASKELL)
2478     IF_PAR_DEBUG(verbose,
2479                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2480                              (long)tso->id, tso, advisory_thread_count));
2481 #else
2482     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2483                                    (long)tso->id, (long)tso->stack_size));
2484 #endif    
2485     return tso;
2486 }
2487
2488 #if defined(PAR)
2489 /* RFP:
2490    all parallel thread creation calls should fall through the following routine.
2491 */
2492 StgTSO *
2493 createThreadFromSpark(rtsSpark spark) 
2494 { StgTSO *tso;
2495   ASSERT(spark != (rtsSpark)NULL);
2496 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2497   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2498   { threadsIgnored++;
2499     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2500           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2501     return END_TSO_QUEUE;
2502   }
2503   else
2504   { threadsCreated++;
2505     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2506     if (tso==END_TSO_QUEUE)     
2507       barf("createSparkThread: Cannot create TSO");
2508 #if defined(DIST)
2509     tso->priority = AdvisoryPriority;
2510 #endif
2511     pushClosure(tso,spark);
2512     addToRunQueue(tso);
2513     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2514   }
2515   return tso;
2516 }
2517 #endif
2518
2519 /*
2520   Turn a spark into a thread.
2521   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2522 */
2523 #if 0
2524 StgTSO *
2525 activateSpark (rtsSpark spark) 
2526 {
2527   StgTSO *tso;
2528
2529   tso = createSparkThread(spark);
2530   if (RtsFlags.ParFlags.ParStats.Full) {   
2531     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2532       IF_PAR_DEBUG(verbose,
2533                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2534                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2535   }
2536   // ToDo: fwd info on local/global spark to thread -- HWL
2537   // tso->gran.exported =  spark->exported;
2538   // tso->gran.locked =   !spark->global;
2539   // tso->gran.sparkname = spark->name;
2540
2541   return tso;
2542 }
2543 #endif
2544
2545 /* ---------------------------------------------------------------------------
2546  * scheduleThread()
2547  *
2548  * scheduleThread puts a thread on the end  of the runnable queue.
2549  * This will usually be done immediately after a thread is created.
2550  * The caller of scheduleThread must create the thread using e.g.
2551  * createThread and push an appropriate closure
2552  * on this thread's stack before the scheduler is invoked.
2553  * ------------------------------------------------------------------------ */
2554
2555 void
2556 scheduleThread(Capability *cap, StgTSO *tso)
2557 {
2558     // The thread goes at the *end* of the run-queue, to avoid possible
2559     // starvation of any threads already on the queue.
2560     appendToRunQueue(cap,tso);
2561 }
2562
2563 Capability *
2564 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2565 {
2566     Task *task;
2567
2568     // We already created/initialised the Task
2569     task = cap->running_task;
2570
2571     // This TSO is now a bound thread; make the Task and TSO
2572     // point to each other.
2573     tso->bound = task;
2574
2575     task->tso = tso;
2576     task->ret = ret;
2577     task->stat = NoStatus;
2578
2579     appendToRunQueue(cap,tso);
2580
2581     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2582
2583 #if defined(GRAN)
2584     /* GranSim specific init */
2585     CurrentTSO = m->tso;                // the TSO to run
2586     procStatus[MainProc] = Busy;        // status of main PE
2587     CurrentProc = MainProc;             // PE to run it on
2588 #endif
2589
2590     cap = schedule(cap,task);
2591
2592     ASSERT(task->stat != NoStatus);
2593     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2594
2595     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2596     return cap;
2597 }
2598
2599 /* ----------------------------------------------------------------------------
2600  * Starting Tasks
2601  * ------------------------------------------------------------------------- */
2602
2603 #if defined(THREADED_RTS)
2604 void
2605 workerStart(Task *task)
2606 {
2607     Capability *cap;
2608
2609     // See startWorkerTask().
2610     ACQUIRE_LOCK(&task->lock);
2611     cap = task->cap;
2612     RELEASE_LOCK(&task->lock);
2613
2614     // set the thread-local pointer to the Task:
2615     taskEnter(task);
2616
2617     // schedule() runs without a lock.
2618     cap = schedule(cap,task);
2619
2620     // On exit from schedule(), we have a Capability.
2621     releaseCapability(cap);
2622     taskStop(task);
2623 }
2624 #endif
2625
2626 /* ---------------------------------------------------------------------------
2627  * initScheduler()
2628  *
2629  * Initialise the scheduler.  This resets all the queues - if the
2630  * queues contained any threads, they'll be garbage collected at the
2631  * next pass.
2632  *
2633  * ------------------------------------------------------------------------ */
2634
2635 void 
2636 initScheduler(void)
2637 {
2638 #if defined(GRAN)
2639   nat i;
2640   for (i=0; i<=MAX_PROC; i++) {
2641     run_queue_hds[i]      = END_TSO_QUEUE;
2642     run_queue_tls[i]      = END_TSO_QUEUE;
2643     blocked_queue_hds[i]  = END_TSO_QUEUE;
2644     blocked_queue_tls[i]  = END_TSO_QUEUE;
2645     ccalling_threadss[i]  = END_TSO_QUEUE;
2646     blackhole_queue[i]    = END_TSO_QUEUE;
2647     sleeping_queue        = END_TSO_QUEUE;
2648   }
2649 #elif !defined(THREADED_RTS)
2650   blocked_queue_hd  = END_TSO_QUEUE;
2651   blocked_queue_tl  = END_TSO_QUEUE;
2652   sleeping_queue    = END_TSO_QUEUE;
2653 #endif
2654
2655   blackhole_queue   = END_TSO_QUEUE;
2656   all_threads       = END_TSO_QUEUE;
2657
2658   context_switch = 0;
2659   interrupted    = 0;
2660
2661   RtsFlags.ConcFlags.ctxtSwitchTicks =
2662       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2663       
2664 #if defined(THREADED_RTS)
2665   /* Initialise the mutex and condition variables used by
2666    * the scheduler. */
2667   initMutex(&sched_mutex);
2668 #endif
2669   
2670   ACQUIRE_LOCK(&sched_mutex);
2671
2672   /* A capability holds the state a native thread needs in
2673    * order to execute STG code. At least one capability is
2674    * floating around (only SMP builds have more than one).
2675    */
2676   initCapabilities();
2677
2678   initTaskManager();
2679
2680 #if defined(SMP) || defined(PARALLEL_HASKELL)
2681   initSparkPools();
2682 #endif
2683
2684 #if defined(SMP)
2685   /*
2686    * Eagerly start one worker to run each Capability, except for
2687    * Capability 0.  The idea is that we're probably going to start a
2688    * bound thread on Capability 0 pretty soon, so we don't want a
2689    * worker task hogging it.
2690    */
2691   { 
2692       nat i;
2693       Capability *cap;
2694       for (i = 1; i < n_capabilities; i++) {
2695           cap = &capabilities[i];
2696           ACQUIRE_LOCK(&cap->lock);
2697           startWorkerTask(cap, workerStart);
2698           RELEASE_LOCK(&cap->lock);
2699       }
2700   }
2701 #endif
2702
2703   RELEASE_LOCK(&sched_mutex);
2704 }
2705
2706 void
2707 exitScheduler( void )
2708 {
2709     interrupted = rtsTrue;
2710     shutting_down_scheduler = rtsTrue;
2711
2712 #if defined(THREADED_RTS)
2713     { 
2714         Task *task;
2715         nat i;
2716         
2717         ACQUIRE_LOCK(&sched_mutex);
2718         task = newBoundTask();
2719         RELEASE_LOCK(&sched_mutex);
2720
2721         for (i = 0; i < n_capabilities; i++) {
2722             shutdownCapability(&capabilities[i], task);
2723         }
2724         boundTaskExiting(task);
2725         stopTaskManager();
2726     }
2727 #endif
2728 }
2729
2730 /* ---------------------------------------------------------------------------
2731    Where are the roots that we know about?
2732
2733         - all the threads on the runnable queue
2734         - all the threads on the blocked queue
2735         - all the threads on the sleeping queue
2736         - all the thread currently executing a _ccall_GC
2737         - all the "main threads"
2738      
2739    ------------------------------------------------------------------------ */
2740
2741 /* This has to be protected either by the scheduler monitor, or by the
2742         garbage collection monitor (probably the latter).
2743         KH @ 25/10/99
2744 */
2745
2746 void
2747 GetRoots( evac_fn evac )
2748 {
2749     nat i;
2750     Capability *cap;
2751     Task *task;
2752
2753 #if defined(GRAN)
2754     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2755         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2756             evac((StgClosure **)&run_queue_hds[i]);
2757         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2758             evac((StgClosure **)&run_queue_tls[i]);
2759         
2760         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2761             evac((StgClosure **)&blocked_queue_hds[i]);
2762         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2763             evac((StgClosure **)&blocked_queue_tls[i]);
2764         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2765             evac((StgClosure **)&ccalling_threads[i]);
2766     }
2767
2768     markEventQueue();
2769
2770 #else /* !GRAN */
2771
2772     for (i = 0; i < n_capabilities; i++) {
2773         cap = &capabilities[i];
2774         evac((StgClosure **)&cap->run_queue_hd);
2775         evac((StgClosure **)&cap->run_queue_tl);
2776         
2777         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2778              task=task->next) {
2779             evac((StgClosure **)&task->suspended_tso);
2780         }
2781     }
2782     
2783 #if !defined(THREADED_RTS)
2784     evac((StgClosure **)&blocked_queue_hd);
2785     evac((StgClosure **)&blocked_queue_tl);
2786     evac((StgClosure **)&sleeping_queue);
2787 #endif 
2788 #endif
2789
2790     evac((StgClosure **)&blackhole_queue);
2791
2792 #if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN)
2793     markSparkQueue(evac);
2794 #endif
2795     
2796 #if defined(RTS_USER_SIGNALS)
2797     // mark the signal handlers (signals should be already blocked)
2798     markSignalHandlers(evac);
2799 #endif
2800 }
2801
2802 /* -----------------------------------------------------------------------------
2803    performGC
2804
2805    This is the interface to the garbage collector from Haskell land.
2806    We provide this so that external C code can allocate and garbage
2807    collect when called from Haskell via _ccall_GC.
2808
2809    It might be useful to provide an interface whereby the programmer
2810    can specify more roots (ToDo).
2811    
2812    This needs to be protected by the GC condition variable above.  KH.
2813    -------------------------------------------------------------------------- */
2814
2815 static void (*extra_roots)(evac_fn);
2816
2817 void
2818 performGC(void)
2819 {
2820 #ifdef THREADED_RTS
2821     // ToDo: we have to grab all the capabilities here.
2822     errorBelch("performGC not supported in threaded RTS (yet)");
2823     stg_exit(EXIT_FAILURE);
2824 #endif
2825     /* Obligated to hold this lock upon entry */
2826     GarbageCollect(GetRoots,rtsFalse);
2827 }
2828
2829 void
2830 performMajorGC(void)
2831 {
2832 #ifdef THREADED_RTS
2833     errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2834     stg_exit(EXIT_FAILURE);
2835 #endif
2836     GarbageCollect(GetRoots,rtsTrue);
2837 }
2838
2839 static void
2840 AllRoots(evac_fn evac)
2841 {
2842     GetRoots(evac);             // the scheduler's roots
2843     extra_roots(evac);          // the user's roots
2844 }
2845
2846 void
2847 performGCWithRoots(void (*get_roots)(evac_fn))
2848 {
2849 #ifdef THREADED_RTS
2850     errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2851     stg_exit(EXIT_FAILURE);
2852 #endif
2853     extra_roots = get_roots;
2854     GarbageCollect(AllRoots,rtsFalse);
2855 }
2856
2857 /* -----------------------------------------------------------------------------
2858    Stack overflow
2859
2860    If the thread has reached its maximum stack size, then raise the
2861    StackOverflow exception in the offending thread.  Otherwise
2862    relocate the TSO into a larger chunk of memory and adjust its stack
2863    size appropriately.
2864    -------------------------------------------------------------------------- */
2865
2866 static StgTSO *
2867 threadStackOverflow(Capability *cap, StgTSO *tso)
2868 {
2869   nat new_stack_size, stack_words;
2870   lnat new_tso_size;
2871   StgPtr new_sp;
2872   StgTSO *dest;
2873
2874   IF_DEBUG(sanity,checkTSO(tso));
2875   if (tso->stack_size >= tso->max_stack_size) {
2876
2877     IF_DEBUG(gc,
2878              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2879                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2880              /* If we're debugging, just print out the top of the stack */
2881              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2882                                               tso->sp+64)));
2883
2884     /* Send this thread the StackOverflow exception */
2885     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2886     return tso;
2887   }
2888
2889   /* Try to double the current stack size.  If that takes us over the
2890    * maximum stack size for this thread, then use the maximum instead.
2891    * Finally round up so the TSO ends up as a whole number of blocks.
2892    */
2893   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2894   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2895                                        TSO_STRUCT_SIZE)/sizeof(W_);
2896   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2897   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2898
2899   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2900
2901   dest = (StgTSO *)allocate(new_tso_size);
2902   TICK_ALLOC_TSO(new_stack_size,0);
2903
2904   /* copy the TSO block and the old stack into the new area */
2905   memcpy(dest,tso,TSO_STRUCT_SIZE);
2906   stack_words = tso->stack + tso->stack_size - tso->sp;
2907   new_sp = (P_)dest + new_tso_size - stack_words;
2908   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2909
2910   /* relocate the stack pointers... */
2911   dest->sp         = new_sp;
2912   dest->stack_size = new_stack_size;
2913         
2914   /* Mark the old TSO as relocated.  We have to check for relocated
2915    * TSOs in the garbage collector and any primops that deal with TSOs.
2916    *
2917    * It's important to set the sp value to just beyond the end
2918    * of the stack, so we don't attempt to scavenge any part of the
2919    * dead TSO's stack.
2920    */
2921   tso->what_next = ThreadRelocated;
2922   tso->link = dest;
2923   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2924   tso->why_blocked = NotBlocked;
2925
2926   IF_PAR_DEBUG(verbose,
2927                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2928                      tso->id, tso, tso->stack_size);
2929                /* If we're debugging, just print out the top of the stack */
2930                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2931                                                 tso->sp+64)));
2932   
2933   IF_DEBUG(sanity,checkTSO(tso));
2934 #if 0
2935   IF_DEBUG(scheduler,printTSO(dest));
2936 #endif
2937
2938   return dest;
2939 }
2940
2941 /* ---------------------------------------------------------------------------
2942    Wake up a queue that was blocked on some resource.
2943    ------------------------------------------------------------------------ */
2944
2945 #if defined(GRAN)
2946 STATIC_INLINE void
2947 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2948 {
2949 }
2950 #elif defined(PARALLEL_HASKELL)
2951 STATIC_INLINE void
2952 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2953 {
2954   /* write RESUME events to log file and
2955      update blocked and fetch time (depending on type of the orig closure) */
2956   if (RtsFlags.ParFlags.ParStats.Full) {
2957     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2958                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2959                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2960     if (emptyRunQueue())
2961       emitSchedule = rtsTrue;
2962
2963     switch (get_itbl(node)->type) {
2964         case FETCH_ME_BQ:
2965           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2966           break;
2967         case RBH:
2968         case FETCH_ME:
2969         case BLACKHOLE_BQ:
2970           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2971           break;
2972 #ifdef DIST
2973         case MVAR:
2974           break;
2975 #endif    
2976         default:
2977           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2978         }
2979       }
2980 }
2981 #endif
2982
2983 #if defined(GRAN)
2984 StgBlockingQueueElement *
2985 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2986 {
2987     StgTSO *tso;
2988     PEs node_loc, tso_loc;
2989
2990     node_loc = where_is(node); // should be lifted out of loop
2991     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2992     tso_loc = where_is((StgClosure *)tso);
2993     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2994       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2995       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2996       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2997       // insertThread(tso, node_loc);
2998       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2999                 ResumeThread,
3000                 tso, node, (rtsSpark*)NULL);
3001       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3002       // len_local++;
3003       // len++;
3004     } else { // TSO is remote (actually should be FMBQ)
3005       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3006                                   RtsFlags.GranFlags.Costs.gunblocktime +
3007                                   RtsFlags.GranFlags.Costs.latency;
3008       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3009                 UnblockThread,
3010                 tso, node, (rtsSpark*)NULL);
3011       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3012       // len++;
3013     }
3014     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3015     IF_GRAN_DEBUG(bq,
3016                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3017                           (node_loc==tso_loc ? "Local" : "Global"), 
3018                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3019     tso->block_info.closure = NULL;
3020     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3021                              tso->id, tso));
3022 }
3023 #elif defined(PARALLEL_HASKELL)
3024 StgBlockingQueueElement *
3025 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3026 {
3027     StgBlockingQueueElement *next;
3028
3029     switch (get_itbl(bqe)->type) {
3030     case TSO:
3031       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3032       /* if it's a TSO just push it onto the run_queue */
3033       next = bqe->link;
3034       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3035       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3036       threadRunnable();
3037       unblockCount(bqe, node);
3038       /* reset blocking status after dumping event */
3039       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3040       break;
3041
3042     case BLOCKED_FETCH:
3043       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3044       next = bqe->link;
3045       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3046       PendingFetches = (StgBlockedFetch *)bqe;
3047       break;
3048
3049 # if defined(DEBUG)
3050       /* can ignore this case in a non-debugging setup; 
3051          see comments on RBHSave closures above */
3052     case CONSTR:
3053       /* check that the closure is an RBHSave closure */
3054       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3055              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3056              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3057       break;
3058
3059     default:
3060       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3061            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3062            (StgClosure *)bqe);
3063 # endif
3064     }
3065   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3066   return next;
3067 }
3068 #endif
3069
3070 StgTSO *
3071 unblockOne(Capability *cap, StgTSO *tso)
3072 {
3073   StgTSO *next;
3074
3075   ASSERT(get_itbl(tso)->type == TSO);
3076   ASSERT(tso->why_blocked != NotBlocked);
3077   tso->why_blocked = NotBlocked;
3078   next = tso->link;
3079   tso->link = END_TSO_QUEUE;
3080
3081   // We might have just migrated this TSO to our Capability:
3082   if (tso->bound) {
3083       tso->bound->cap = cap;
3084   }
3085
3086   appendToRunQueue(cap,tso);
3087
3088   // we're holding a newly woken thread, make sure we context switch
3089   // quickly so we can migrate it if necessary.
3090   context_switch = 1;
3091   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3092   return next;
3093 }
3094
3095
3096 #if defined(GRAN)
3097 void 
3098 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3099 {
3100   StgBlockingQueueElement *bqe;
3101   PEs node_loc;
3102   nat len = 0; 
3103
3104   IF_GRAN_DEBUG(bq, 
3105                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3106                       node, CurrentProc, CurrentTime[CurrentProc], 
3107                       CurrentTSO->id, CurrentTSO));
3108
3109   node_loc = where_is(node);
3110
3111   ASSERT(q == END_BQ_QUEUE ||
3112          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3113          get_itbl(q)->type == CONSTR); // closure (type constructor)
3114   ASSERT(is_unique(node));
3115
3116   /* FAKE FETCH: magically copy the node to the tso's proc;
3117      no Fetch necessary because in reality the node should not have been 
3118      moved to the other PE in the first place
3119   */
3120   if (CurrentProc!=node_loc) {
3121     IF_GRAN_DEBUG(bq, 
3122                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3123                         node, node_loc, CurrentProc, CurrentTSO->id, 
3124                         // CurrentTSO, where_is(CurrentTSO),
3125                         node->header.gran.procs));
3126     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3127     IF_GRAN_DEBUG(bq, 
3128                   debugBelch("## new bitmask of node %p is %#x\n",
3129                         node, node->header.gran.procs));
3130     if (RtsFlags.GranFlags.GranSimStats.Global) {
3131       globalGranStats.tot_fake_fetches++;
3132     }
3133   }
3134
3135   bqe = q;
3136   // ToDo: check: ASSERT(CurrentProc==node_loc);
3137   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3138     //next = bqe->link;
3139     /* 
3140        bqe points to the current element in the queue
3141        next points to the next element in the queue
3142     */
3143     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3144     //tso_loc = where_is(tso);
3145     len++;
3146     bqe = unblockOne(bqe, node);
3147   }
3148
3149   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3150      the closure to make room for the anchor of the BQ */
3151   if (bqe!=END_BQ_QUEUE) {
3152     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3153     /*
3154     ASSERT((info_ptr==&RBH_Save_0_info) ||
3155            (info_ptr==&RBH_Save_1_info) ||
3156            (info_ptr==&RBH_Save_2_info));
3157     */
3158     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3159     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3160     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3161
3162     IF_GRAN_DEBUG(bq,
3163                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3164                         node, info_type(node)));
3165   }
3166
3167   /* statistics gathering */
3168   if (RtsFlags.GranFlags.GranSimStats.Global) {
3169     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3170     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3171     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3172     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3173   }
3174   IF_GRAN_DEBUG(bq,
3175                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3176                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3177 }
3178 #elif defined(PARALLEL_HASKELL)
3179 void 
3180 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3181 {
3182   StgBlockingQueueElement *bqe;
3183
3184   IF_PAR_DEBUG(verbose, 
3185                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3186                      node, mytid));
3187 #ifdef DIST  
3188   //RFP
3189   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3190     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3191     return;
3192   }
3193 #endif
3194   
3195   ASSERT(q == END_BQ_QUEUE ||
3196          get_itbl(q)->type == TSO ||           
3197          get_itbl(q)->type == BLOCKED_FETCH || 
3198          get_itbl(q)->type == CONSTR); 
3199
3200   bqe = q;
3201   while (get_itbl(bqe)->type==TSO || 
3202          get_itbl(bqe)->type==BLOCKED_FETCH) {
3203     bqe = unblockOne(bqe, node);
3204   }
3205 }
3206
3207 #else   /* !GRAN && !PARALLEL_HASKELL */
3208
3209 void
3210 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3211 {
3212     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3213                              // Exception.cmm
3214     while (tso != END_TSO_QUEUE) {
3215         tso = unblockOne(cap,tso);
3216     }
3217 }
3218 #endif
3219
3220 /* ---------------------------------------------------------------------------
3221    Interrupt execution
3222    - usually called inside a signal handler so it mustn't do anything fancy.   
3223    ------------------------------------------------------------------------ */
3224
3225 void
3226 interruptStgRts(void)
3227 {
3228     interrupted    = 1;
3229     context_switch = 1;
3230 #if defined(THREADED_RTS)
3231     prodAllCapabilities();
3232 #endif
3233 }
3234
3235 /* -----------------------------------------------------------------------------
3236    Unblock a thread
3237
3238    This is for use when we raise an exception in another thread, which
3239    may be blocked.
3240    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3241    -------------------------------------------------------------------------- */
3242
3243 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3244 /*
3245   NB: only the type of the blocking queue is different in GranSim and GUM
3246       the operations on the queue-elements are the same
3247       long live polymorphism!
3248
3249   Locks: sched_mutex is held upon entry and exit.
3250
3251 */
3252 static void
3253 unblockThread(Capability *cap, StgTSO *tso)
3254 {
3255   StgBlockingQueueElement *t, **last;
3256
3257   switch (tso->why_blocked) {
3258
3259   case NotBlocked:
3260     return;  /* not blocked */
3261
3262   case BlockedOnSTM:
3263     // Be careful: nothing to do here!  We tell the scheduler that the thread
3264     // is runnable and we leave it to the stack-walking code to abort the 
3265     // transaction while unwinding the stack.  We should perhaps have a debugging
3266     // test to make sure that this really happens and that the 'zombie' transaction
3267     // does not get committed.
3268     goto done;
3269
3270   case BlockedOnMVar:
3271     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3272     {
3273       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3274       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3275
3276       last = (StgBlockingQueueElement **)&mvar->head;
3277       for (t = (StgBlockingQueueElement *)mvar->head; 
3278            t != END_BQ_QUEUE; 
3279            last = &t->link, last_tso = t, t = t->link) {
3280         if (t == (StgBlockingQueueElement *)tso) {
3281           *last = (StgBlockingQueueElement *)tso->link;
3282           if (mvar->tail == tso) {
3283             mvar->tail = (StgTSO *)last_tso;
3284           }
3285           goto done;
3286         }
3287       }
3288       barf("unblockThread (MVAR): TSO not found");
3289     }
3290
3291   case BlockedOnBlackHole:
3292     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3293     {
3294       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3295
3296       last = &bq->blocking_queue;
3297       for (t = bq->blocking_queue; 
3298            t != END_BQ_QUEUE; 
3299            last = &t->link, t = t->link) {
3300         if (t == (StgBlockingQueueElement *)tso) {
3301           *last = (StgBlockingQueueElement *)tso->link;
3302           goto done;
3303         }
3304       }
3305       barf("unblockThread (BLACKHOLE): TSO not found");
3306     }
3307
3308   case BlockedOnException:
3309     {
3310       StgTSO *target  = tso->block_info.tso;
3311
3312       ASSERT(get_itbl(target)->type == TSO);
3313
3314       if (target->what_next == ThreadRelocated) {
3315           target = target->link;
3316           ASSERT(get_itbl(target)->type == TSO);
3317       }
3318
3319       ASSERT(target->blocked_exceptions != NULL);
3320
3321       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3322       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3323            t != END_BQ_QUEUE; 
3324            last = &t->link, t = t->link) {
3325         ASSERT(get_itbl(t)->type == TSO);
3326         if (t == (StgBlockingQueueElement *)tso) {
3327           *last = (StgBlockingQueueElement *)tso->link;
3328           goto done;
3329         }
3330       }
3331       barf("unblockThread (Exception): TSO not found");
3332     }
3333
3334   case BlockedOnRead:
3335   case BlockedOnWrite:
3336 #if defined(mingw32_HOST_OS)
3337   case BlockedOnDoProc:
3338 #endif
3339     {
3340       /* take TSO off blocked_queue */
3341       StgBlockingQueueElement *prev = NULL;
3342       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3343            prev = t, t = t->link) {
3344         if (t == (StgBlockingQueueElement *)tso) {
3345           if (prev == NULL) {
3346             blocked_queue_hd = (StgTSO *)t->link;
3347             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3348               blocked_queue_tl = END_TSO_QUEUE;
3349             }
3350           } else {
3351             prev->link = t->link;
3352             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3353               blocked_queue_tl = (StgTSO *)prev;
3354             }
3355           }
3356 #if defined(mingw32_HOST_OS)
3357           /* (Cooperatively) signal that the worker thread should abort
3358            * the request.
3359            */
3360           abandonWorkRequest(tso->block_info.async_result->reqID);
3361 #endif
3362           goto done;
3363         }
3364       }
3365       barf("unblockThread (I/O): TSO not found");
3366     }
3367
3368   case BlockedOnDelay:
3369     {
3370       /* take TSO off sleeping_queue */
3371       StgBlockingQueueElement *prev = NULL;
3372       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3373            prev = t, t = t->link) {
3374         if (t == (StgBlockingQueueElement *)tso) {
3375           if (prev == NULL) {
3376             sleeping_queue = (StgTSO *)t->link;
3377           } else {
3378             prev->link = t->link;
3379           }
3380           goto done;
3381         }
3382       }
3383       barf("unblockThread (delay): TSO not found");
3384     }
3385
3386   default:
3387     barf("unblockThread");
3388   }
3389
3390  done:
3391   tso->link = END_TSO_QUEUE;
3392   tso->why_blocked = NotBlocked;
3393   tso->block_info.closure = NULL;
3394   pushOnRunQueue(cap,tso);
3395 }
3396 #else
3397 static void
3398 unblockThread(Capability *cap, StgTSO *tso)
3399 {
3400   StgTSO *t, **last;
3401   
3402   /* To avoid locking unnecessarily. */
3403   if (tso->why_blocked == NotBlocked) {
3404     return;
3405   }
3406
3407   switch (tso->why_blocked) {
3408
3409   case BlockedOnSTM:
3410     // Be careful: nothing to do here!  We tell the scheduler that the thread
3411     // is runnable and we leave it to the stack-walking code to abort the 
3412     // transaction while unwinding the stack.  We should perhaps have a debugging
3413     // test to make sure that this really happens and that the 'zombie' transaction
3414     // does not get committed.
3415     goto done;
3416
3417   case BlockedOnMVar:
3418     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3419     {
3420       StgTSO *last_tso = END_TSO_QUEUE;
3421       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3422
3423       last = &mvar->head;
3424       for (t = mvar->head; t != END_TSO_QUEUE; 
3425            last = &t->link, last_tso = t, t = t->link) {
3426         if (t == tso) {
3427           *last = tso->link;
3428           if (mvar->tail == tso) {
3429             mvar->tail = last_tso;
3430           }
3431           goto done;
3432         }
3433       }
3434       barf("unblockThread (MVAR): TSO not found");
3435     }
3436
3437   case BlockedOnBlackHole:
3438     {
3439       last = &blackhole_queue;
3440       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3441            last = &t->link, t = t->link) {
3442         if (t == tso) {
3443           *last = tso->link;
3444           goto done;
3445         }
3446       }
3447       barf("unblockThread (BLACKHOLE): TSO not found");
3448     }
3449
3450   case BlockedOnException:
3451     {
3452       StgTSO *target  = tso->block_info.tso;
3453
3454       ASSERT(get_itbl(target)->type == TSO);
3455
3456       while (target->what_next == ThreadRelocated) {
3457           target = target->link;
3458           ASSERT(get_itbl(target)->type == TSO);
3459       }
3460       
3461       ASSERT(target->blocked_exceptions != NULL);
3462
3463       last = &target->blocked_exceptions;
3464       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3465            last = &t->link, t = t->link) {
3466         ASSERT(get_itbl(t)->type == TSO);
3467         if (t == tso) {
3468           *last = tso->link;
3469           goto done;
3470         }
3471       }
3472       barf("unblockThread (Exception): TSO not found");
3473     }
3474
3475 #if !defined(THREADED_RTS)
3476   case BlockedOnRead:
3477   case BlockedOnWrite:
3478 #if defined(mingw32_HOST_OS)
3479   case BlockedOnDoProc:
3480 #endif
3481     {
3482       StgTSO *prev = NULL;
3483       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3484            prev = t, t = t->link) {
3485         if (t == tso) {
3486           if (prev == NULL) {
3487             blocked_queue_hd = t->link;
3488             if (blocked_queue_tl == t) {
3489               blocked_queue_tl = END_TSO_QUEUE;
3490             }
3491           } else {
3492             prev->link = t->link;
3493             if (blocked_queue_tl == t) {
3494               blocked_queue_tl = prev;
3495             }
3496           }
3497 #if defined(mingw32_HOST_OS)
3498           /* (Cooperatively) signal that the worker thread should abort
3499            * the request.
3500            */
3501           abandonWorkRequest(tso->block_info.async_result->reqID);
3502 #endif
3503           goto done;
3504         }
3505       }
3506       barf("unblockThread (I/O): TSO not found");
3507     }
3508
3509   case BlockedOnDelay:
3510     {
3511       StgTSO *prev = NULL;
3512       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3513            prev = t, t = t->link) {
3514         if (t == tso) {
3515           if (prev == NULL) {
3516             sleeping_queue = t->link;
3517           } else {
3518             prev->link = t->link;
3519           }
3520           goto done;
3521         }
3522       }
3523       barf("unblockThread (delay): TSO not found");
3524     }
3525 #endif
3526
3527   default:
3528     barf("unblockThread");
3529   }
3530
3531  done:
3532   tso->link = END_TSO_QUEUE;
3533   tso->why_blocked = NotBlocked;
3534   tso->block_info.closure = NULL;
3535   appendToRunQueue(cap,tso);
3536 }
3537 #endif
3538
3539 /* -----------------------------------------------------------------------------
3540  * checkBlackHoles()
3541  *
3542  * Check the blackhole_queue for threads that can be woken up.  We do
3543  * this periodically: before every GC, and whenever the run queue is
3544  * empty.
3545  *
3546  * An elegant solution might be to just wake up all the blocked
3547  * threads with awakenBlockedQueue occasionally: they'll go back to
3548  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3549  * doesn't give us a way to tell whether we've actually managed to
3550  * wake up any threads, so we would be busy-waiting.
3551  *
3552  * -------------------------------------------------------------------------- */
3553
3554 static rtsBool
3555 checkBlackHoles (Capability *cap)
3556 {
3557     StgTSO **prev, *t;
3558     rtsBool any_woke_up = rtsFalse;
3559     StgHalfWord type;
3560
3561     // blackhole_queue is global:
3562     ASSERT_LOCK_HELD(&sched_mutex);
3563
3564     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3565
3566     // ASSUMES: sched_mutex
3567     prev = &blackhole_queue;
3568     t = blackhole_queue;
3569     while (t != END_TSO_QUEUE) {
3570         ASSERT(t->why_blocked == BlockedOnBlackHole);
3571         type = get_itbl(t->block_info.closure)->type;
3572         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3573             IF_DEBUG(sanity,checkTSO(t));
3574             t = unblockOne(cap, t);
3575             // urk, the threads migrate to the current capability
3576             // here, but we'd like to keep them on the original one.
3577             *prev = t;
3578             any_woke_up = rtsTrue;
3579         } else {
3580             prev = &t->link;
3581             t = t->link;
3582         }
3583     }
3584
3585     return any_woke_up;
3586 }
3587
3588 /* -----------------------------------------------------------------------------
3589  * raiseAsync()
3590  *
3591  * The following function implements the magic for raising an
3592  * asynchronous exception in an existing thread.
3593  *
3594  * We first remove the thread from any queue on which it might be
3595  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3596  *
3597  * We strip the stack down to the innermost CATCH_FRAME, building
3598  * thunks in the heap for all the active computations, so they can 
3599  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3600  * an application of the handler to the exception, and push it on
3601  * the top of the stack.
3602  * 
3603  * How exactly do we save all the active computations?  We create an
3604  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3605  * AP_STACKs pushes everything from the corresponding update frame
3606  * upwards onto the stack.  (Actually, it pushes everything up to the
3607  * next update frame plus a pointer to the next AP_STACK object.
3608  * Entering the next AP_STACK object pushes more onto the stack until we
3609  * reach the last AP_STACK object - at which point the stack should look
3610  * exactly as it did when we killed the TSO and we can continue
3611  * execution by entering the closure on top of the stack.
3612  *
3613  * We can also kill a thread entirely - this happens if either (a) the 
3614  * exception passed to raiseAsync is NULL, or (b) there's no
3615  * CATCH_FRAME on the stack.  In either case, we strip the entire
3616  * stack and replace the thread with a zombie.
3617  *
3618  * ToDo: in SMP mode, this function is only safe if either (a) we hold
3619  * all the Capabilities (eg. in GC), or (b) we own the Capability that
3620  * the TSO is currently blocked on or on the run queue of.
3621  *
3622  * -------------------------------------------------------------------------- */
3623  
3624 void
3625 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3626 {
3627     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3628 }
3629
3630 void
3631 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3632 {
3633     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3634 }
3635
3636 static void
3637 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3638             rtsBool stop_at_atomically, StgPtr stop_here)
3639 {
3640     StgRetInfoTable *info;
3641     StgPtr sp, frame;
3642     nat i;
3643   
3644     // Thread already dead?
3645     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3646         return;
3647     }
3648
3649     IF_DEBUG(scheduler, 
3650              sched_belch("raising exception in thread %ld.", (long)tso->id));
3651     
3652     // Remove it from any blocking queues
3653     unblockThread(cap,tso);
3654
3655     sp = tso->sp;
3656     
3657     // The stack freezing code assumes there's a closure pointer on
3658     // the top of the stack, so we have to arrange that this is the case...
3659     //
3660     if (sp[0] == (W_)&stg_enter_info) {
3661         sp++;
3662     } else {
3663         sp--;
3664         sp[0] = (W_)&stg_dummy_ret_closure;
3665     }
3666
3667     frame = sp + 1;
3668     while (stop_here == NULL || frame < stop_here) {
3669
3670         // 1. Let the top of the stack be the "current closure"
3671         //
3672         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3673         // CATCH_FRAME.
3674         //
3675         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3676         // current closure applied to the chunk of stack up to (but not
3677         // including) the update frame.  This closure becomes the "current
3678         // closure".  Go back to step 2.
3679         //
3680         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3681         // top of the stack applied to the exception.
3682         // 
3683         // 5. If it's a STOP_FRAME, then kill the thread.
3684         // 
3685         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3686         // transaction
3687        
3688         info = get_ret_itbl((StgClosure *)frame);
3689
3690         switch (info->i.type) {
3691
3692         case UPDATE_FRAME:
3693         {
3694             StgAP_STACK * ap;
3695             nat words;
3696             
3697             // First build an AP_STACK consisting of the stack chunk above the
3698             // current update frame, with the top word on the stack as the
3699             // fun field.
3700             //
3701             words = frame - sp - 1;
3702             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3703             
3704             ap->size = words;
3705             ap->fun  = (StgClosure *)sp[0];
3706             sp++;
3707             for(i=0; i < (nat)words; ++i) {
3708                 ap->payload[i] = (StgClosure *)*sp++;
3709             }
3710             
3711             SET_HDR(ap,&stg_AP_STACK_info,
3712                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3713             TICK_ALLOC_UP_THK(words+1,0);
3714             
3715             IF_DEBUG(scheduler,
3716                      debugBelch("sched: Updating ");
3717                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3718                      debugBelch(" with ");
3719                      printObj((StgClosure *)ap);
3720                 );
3721
3722             // Replace the updatee with an indirection
3723             //
3724             // Warning: if we're in a loop, more than one update frame on
3725             // the stack may point to the same object.  Be careful not to
3726             // overwrite an IND_OLDGEN in this case, because we'll screw
3727             // up the mutable lists.  To be on the safe side, don't
3728             // overwrite any kind of indirection at all.  See also
3729             // threadSqueezeStack in GC.c, where we have to make a similar
3730             // check.
3731             //
3732             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3733                 // revert the black hole
3734                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3735                                (StgClosure *)ap);
3736             }
3737             sp += sizeofW(StgUpdateFrame) - 1;
3738             sp[0] = (W_)ap; // push onto stack
3739             frame = sp + 1;
3740             continue; //no need to bump frame
3741         }
3742
3743         case STOP_FRAME:
3744             // We've stripped the entire stack, the thread is now dead.
3745             tso->what_next = ThreadKilled;
3746             tso->sp = frame + sizeofW(StgStopFrame);
3747             return;
3748
3749         case CATCH_FRAME:
3750             // If we find a CATCH_FRAME, and we've got an exception to raise,
3751             // then build the THUNK raise(exception), and leave it on
3752             // top of the CATCH_FRAME ready to enter.
3753             //
3754         {
3755 #ifdef PROFILING
3756             StgCatchFrame *cf = (StgCatchFrame *)frame;
3757 #endif
3758             StgThunk *raise;
3759             
3760             if (exception == NULL) break;
3761
3762             // we've got an exception to raise, so let's pass it to the
3763             // handler in this frame.
3764             //
3765             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3766             TICK_ALLOC_SE_THK(1,0);
3767             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3768             raise->payload[0] = exception;
3769             
3770             // throw away the stack from Sp up to the CATCH_FRAME.
3771             //
3772             sp = frame - 1;
3773             
3774             /* Ensure that async excpetions are blocked now, so we don't get
3775              * a surprise exception before we get around to executing the
3776              * handler.
3777              */
3778             if (tso->blocked_exceptions == NULL) {
3779                 tso->blocked_exceptions = END_TSO_QUEUE;
3780             }
3781
3782             /* Put the newly-built THUNK on top of the stack, ready to execute
3783              * when the thread restarts.
3784              */
3785             sp[0] = (W_)raise;
3786             sp[-1] = (W_)&stg_enter_info;
3787             tso->sp = sp-1;
3788             tso->what_next = ThreadRunGHC;
3789             IF_DEBUG(sanity, checkTSO(tso));
3790             return;
3791         }
3792             
3793         case ATOMICALLY_FRAME:
3794             if (stop_at_atomically) {
3795                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3796                 stmCondemnTransaction(cap, tso -> trec);
3797 #ifdef REG_R1
3798                 tso->sp = frame;
3799 #else
3800                 // R1 is not a register: the return convention for IO in
3801                 // this case puts the return value on the stack, so we
3802                 // need to set up the stack to return to the atomically
3803                 // frame properly...
3804                 tso->sp = frame - 2;
3805                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3806                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3807 #endif
3808                 tso->what_next = ThreadRunGHC;
3809                 return;
3810             }
3811             // Not stop_at_atomically... fall through and abort the
3812             // transaction.
3813             
3814         case CATCH_RETRY_FRAME:
3815             // IF we find an ATOMICALLY_FRAME then we abort the
3816             // current transaction and propagate the exception.  In
3817             // this case (unlike ordinary exceptions) we do not care
3818             // whether the transaction is valid or not because its
3819             // possible validity cannot have caused the exception
3820             // and will not be visible after the abort.
3821             IF_DEBUG(stm,
3822                      debugBelch("Found atomically block delivering async exception\n"));
3823             StgTRecHeader *trec = tso -> trec;
3824             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3825             stmAbortTransaction(cap, trec);
3826             tso -> trec = outer;
3827             break;
3828             
3829         default:
3830             break;
3831         }
3832
3833         // move on to the next stack frame
3834         frame += stack_frame_sizeW((StgClosure *)frame);
3835     }
3836
3837     // if we got here, then we stopped at stop_here
3838     ASSERT(stop_here != NULL);
3839 }
3840
3841 /* -----------------------------------------------------------------------------
3842    Deleting threads
3843
3844    This is used for interruption (^C) and forking, and corresponds to
3845    raising an exception but without letting the thread catch the
3846    exception.
3847    -------------------------------------------------------------------------- */
3848
3849 static void 
3850 deleteThread (Capability *cap, StgTSO *tso)
3851 {
3852   if (tso->why_blocked != BlockedOnCCall &&
3853       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3854       raiseAsync(cap,tso,NULL);
3855   }
3856 }
3857
3858 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3859 static void 
3860 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3861 { // for forkProcess only:
3862   // delete thread without giving it a chance to catch the KillThread exception
3863
3864   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3865       return;
3866   }
3867
3868   if (tso->why_blocked != BlockedOnCCall &&
3869       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3870       unblockThread(cap,tso);
3871   }
3872
3873   tso->what_next = ThreadKilled;
3874 }
3875 #endif
3876
3877 /* -----------------------------------------------------------------------------
3878    raiseExceptionHelper
3879    
3880    This function is called by the raise# primitve, just so that we can
3881    move some of the tricky bits of raising an exception from C-- into
3882    C.  Who knows, it might be a useful re-useable thing here too.
3883    -------------------------------------------------------------------------- */
3884
3885 StgWord
3886 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3887 {
3888     Capability *cap = regTableToCapability(reg);
3889     StgThunk *raise_closure = NULL;
3890     StgPtr p, next;
3891     StgRetInfoTable *info;
3892     //
3893     // This closure represents the expression 'raise# E' where E
3894     // is the exception raise.  It is used to overwrite all the
3895     // thunks which are currently under evaluataion.
3896     //
3897
3898     //    
3899     // LDV profiling: stg_raise_info has THUNK as its closure
3900     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3901     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3902     // 1 does not cause any problem unless profiling is performed.
3903     // However, when LDV profiling goes on, we need to linearly scan
3904     // small object pool, where raise_closure is stored, so we should
3905     // use MIN_UPD_SIZE.
3906     //
3907     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3908     //                                 sizeofW(StgClosure)+1);
3909     //
3910
3911     //
3912     // Walk up the stack, looking for the catch frame.  On the way,
3913     // we update any closures pointed to from update frames with the
3914     // raise closure that we just built.
3915     //
3916     p = tso->sp;
3917     while(1) {
3918         info = get_ret_itbl((StgClosure *)p);
3919         next = p + stack_frame_sizeW((StgClosure *)p);
3920         switch (info->i.type) {
3921             
3922         case UPDATE_FRAME:
3923             // Only create raise_closure if we need to.
3924             if (raise_closure == NULL) {
3925                 raise_closure = 
3926                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3927                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3928                 raise_closure->payload[0] = exception;
3929             }
3930             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3931             p = next;
3932             continue;
3933
3934         case ATOMICALLY_FRAME:
3935             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3936             tso->sp = p;
3937             return ATOMICALLY_FRAME;
3938             
3939         case CATCH_FRAME:
3940             tso->sp = p;
3941             return CATCH_FRAME;
3942
3943         case CATCH_STM_FRAME:
3944             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3945             tso->sp = p;
3946             return CATCH_STM_FRAME;
3947             
3948         case STOP_FRAME:
3949             tso->sp = p;
3950             return STOP_FRAME;
3951
3952         case CATCH_RETRY_FRAME:
3953         default:
3954             p = next; 
3955             continue;
3956         }
3957     }
3958 }
3959
3960
3961 /* -----------------------------------------------------------------------------
3962    findRetryFrameHelper
3963
3964    This function is called by the retry# primitive.  It traverses the stack
3965    leaving tso->sp referring to the frame which should handle the retry.  
3966
3967    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3968    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3969
3970    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3971    despite the similar implementation.
3972
3973    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3974    not be created within memory transactions.
3975    -------------------------------------------------------------------------- */
3976
3977 StgWord
3978 findRetryFrameHelper (StgTSO *tso)
3979 {
3980   StgPtr           p, next;
3981   StgRetInfoTable *info;
3982
3983   p = tso -> sp;
3984   while (1) {
3985     info = get_ret_itbl((StgClosure *)p);
3986     next = p + stack_frame_sizeW((StgClosure *)p);
3987     switch (info->i.type) {
3988       
3989     case ATOMICALLY_FRAME:
3990       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3991       tso->sp = p;
3992       return ATOMICALLY_FRAME;
3993       
3994     case CATCH_RETRY_FRAME:
3995       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3996       tso->sp = p;
3997       return CATCH_RETRY_FRAME;
3998       
3999     case CATCH_STM_FRAME:
4000     default:
4001       ASSERT(info->i.type != CATCH_FRAME);
4002       ASSERT(info->i.type != STOP_FRAME);
4003       p = next; 
4004       continue;
4005     }
4006   }
4007 }
4008
4009 /* -----------------------------------------------------------------------------
4010    resurrectThreads is called after garbage collection on the list of
4011    threads found to be garbage.  Each of these threads will be woken
4012    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4013    on an MVar, or NonTermination if the thread was blocked on a Black
4014    Hole.
4015
4016    Locks: assumes we hold *all* the capabilities.
4017    -------------------------------------------------------------------------- */
4018
4019 void
4020 resurrectThreads (StgTSO *threads)
4021 {
4022     StgTSO *tso, *next;
4023     Capability *cap;
4024
4025     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4026         next = tso->global_link;
4027         tso->global_link = all_threads;
4028         all_threads = tso;
4029         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4030         
4031         // Wake up the thread on the Capability it was last on for a
4032         // bound thread, or last_free_capability otherwise.
4033         if (tso->bound) {
4034             cap = tso->bound->cap;
4035         } else {
4036             cap = last_free_capability;
4037         }
4038         
4039         switch (tso->why_blocked) {
4040         case BlockedOnMVar:
4041         case BlockedOnException:
4042             /* Called by GC - sched_mutex lock is currently held. */
4043             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4044             break;
4045         case BlockedOnBlackHole:
4046             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4047             break;
4048         case BlockedOnSTM:
4049             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4050             break;
4051         case NotBlocked:
4052             /* This might happen if the thread was blocked on a black hole
4053              * belonging to a thread that we've just woken up (raiseAsync
4054              * can wake up threads, remember...).
4055              */
4056             continue;
4057         default:
4058             barf("resurrectThreads: thread blocked in a strange way");
4059         }
4060     }
4061 }
4062
4063 /* ----------------------------------------------------------------------------
4064  * Debugging: why is a thread blocked
4065  * [Also provides useful information when debugging threaded programs
4066  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4067    ------------------------------------------------------------------------- */
4068
4069 #if DEBUG
4070 static void
4071 printThreadBlockage(StgTSO *tso)
4072 {
4073   switch (tso->why_blocked) {
4074   case BlockedOnRead:
4075     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4076     break;
4077   case BlockedOnWrite:
4078     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4079     break;
4080 #if defined(mingw32_HOST_OS)
4081     case BlockedOnDoProc:
4082     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4083     break;
4084 #endif
4085   case BlockedOnDelay:
4086     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4087     break;
4088   case BlockedOnMVar:
4089     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4090     break;
4091   case BlockedOnException:
4092     debugBelch("is blocked on delivering an exception to thread %d",
4093             tso->block_info.tso->id);
4094     break;
4095   case BlockedOnBlackHole:
4096     debugBelch("is blocked on a black hole");
4097     break;
4098   case NotBlocked:
4099     debugBelch("is not blocked");
4100     break;
4101 #if defined(PARALLEL_HASKELL)
4102   case BlockedOnGA:
4103     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4104             tso->block_info.closure, info_type(tso->block_info.closure));
4105     break;
4106   case BlockedOnGA_NoSend:
4107     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4108             tso->block_info.closure, info_type(tso->block_info.closure));
4109     break;
4110 #endif
4111   case BlockedOnCCall:
4112     debugBelch("is blocked on an external call");
4113     break;
4114   case BlockedOnCCall_NoUnblockExc:
4115     debugBelch("is blocked on an external call (exceptions were already blocked)");
4116     break;
4117   case BlockedOnSTM:
4118     debugBelch("is blocked on an STM operation");
4119     break;
4120   default:
4121     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4122          tso->why_blocked, tso->id, tso);
4123   }
4124 }
4125
4126 void
4127 printThreadStatus(StgTSO *t)
4128 {
4129     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4130     {
4131       void *label = lookupThreadLabel(t->id);
4132       if (label) debugBelch("[\"%s\"] ",(char *)label);
4133     }
4134     if (t->what_next == ThreadRelocated) {
4135         debugBelch("has been relocated...\n");
4136     } else {
4137         switch (t->what_next) {
4138         case ThreadKilled:
4139             debugBelch("has been killed");
4140             break;
4141         case ThreadComplete:
4142             debugBelch("has completed");
4143             break;
4144         default:
4145             printThreadBlockage(t);
4146         }
4147         debugBelch("\n");
4148     }
4149 }
4150
4151 void
4152 printAllThreads(void)
4153 {
4154   StgTSO *t, *next;
4155   nat i;
4156   Capability *cap;
4157
4158 # if defined(GRAN)
4159   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4160   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4161                        time_string, rtsFalse/*no commas!*/);
4162
4163   debugBelch("all threads at [%s]:\n", time_string);
4164 # elif defined(PARALLEL_HASKELL)
4165   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4166   ullong_format_string(CURRENT_TIME,
4167                        time_string, rtsFalse/*no commas!*/);
4168
4169   debugBelch("all threads at [%s]:\n", time_string);
4170 # else
4171   debugBelch("all threads:\n");
4172 # endif
4173
4174   for (i = 0; i < n_capabilities; i++) {
4175       cap = &capabilities[i];
4176       debugBelch("threads on capability %d:\n", cap->no);
4177       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4178           printThreadStatus(t);
4179       }
4180   }
4181
4182   debugBelch("other threads:\n");
4183   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4184       if (t->why_blocked != NotBlocked) {
4185           printThreadStatus(t);
4186       }
4187       if (t->what_next == ThreadRelocated) {
4188           next = t->link;
4189       } else {
4190           next = t->global_link;
4191       }
4192   }
4193 }
4194
4195 // useful from gdb
4196 void 
4197 printThreadQueue(StgTSO *t)
4198 {
4199     nat i = 0;
4200     for (; t != END_TSO_QUEUE; t = t->link) {
4201         printThreadStatus(t);
4202         i++;
4203     }
4204     debugBelch("%d threads on queue\n", i);
4205 }
4206
4207 /* 
4208    Print a whole blocking queue attached to node (debugging only).
4209 */
4210 # if defined(PARALLEL_HASKELL)
4211 void 
4212 print_bq (StgClosure *node)
4213 {
4214   StgBlockingQueueElement *bqe;
4215   StgTSO *tso;
4216   rtsBool end;
4217
4218   debugBelch("## BQ of closure %p (%s): ",
4219           node, info_type(node));
4220
4221   /* should cover all closures that may have a blocking queue */
4222   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4223          get_itbl(node)->type == FETCH_ME_BQ ||
4224          get_itbl(node)->type == RBH ||
4225          get_itbl(node)->type == MVAR);
4226     
4227   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4228
4229   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4230 }
4231
4232 /* 
4233    Print a whole blocking queue starting with the element bqe.
4234 */
4235 void 
4236 print_bqe (StgBlockingQueueElement *bqe)
4237 {
4238   rtsBool end;
4239
4240   /* 
4241      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4242   */
4243   for (end = (bqe==END_BQ_QUEUE);
4244        !end; // iterate until bqe points to a CONSTR
4245        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4246        bqe = end ? END_BQ_QUEUE : bqe->link) {
4247     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4248     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4249     /* types of closures that may appear in a blocking queue */
4250     ASSERT(get_itbl(bqe)->type == TSO ||           
4251            get_itbl(bqe)->type == BLOCKED_FETCH || 
4252            get_itbl(bqe)->type == CONSTR); 
4253     /* only BQs of an RBH end with an RBH_Save closure */
4254     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4255
4256     switch (get_itbl(bqe)->type) {
4257     case TSO:
4258       debugBelch(" TSO %u (%x),",
4259               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4260       break;
4261     case BLOCKED_FETCH:
4262       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4263               ((StgBlockedFetch *)bqe)->node, 
4264               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4265               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4266               ((StgBlockedFetch *)bqe)->ga.weight);
4267       break;
4268     case CONSTR:
4269       debugBelch(" %s (IP %p),",
4270               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4271                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4272                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4273                "RBH_Save_?"), get_itbl(bqe));
4274       break;
4275     default:
4276       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4277            info_type((StgClosure *)bqe)); // , node, info_type(node));
4278       break;
4279     }
4280   } /* for */
4281   debugBelch("\n");
4282 }
4283 # elif defined(GRAN)
4284 void 
4285 print_bq (StgClosure *node)
4286 {
4287   StgBlockingQueueElement *bqe;
4288   PEs node_loc, tso_loc;
4289   rtsBool end;
4290
4291   /* should cover all closures that may have a blocking queue */
4292   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4293          get_itbl(node)->type == FETCH_ME_BQ ||
4294          get_itbl(node)->type == RBH);
4295     
4296   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4297   node_loc = where_is(node);
4298
4299   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4300           node, info_type(node), node_loc);
4301
4302   /* 
4303      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4304   */
4305   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4306        !end; // iterate until bqe points to a CONSTR
4307        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4308     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4309     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4310     /* types of closures that may appear in a blocking queue */
4311     ASSERT(get_itbl(bqe)->type == TSO ||           
4312            get_itbl(bqe)->type == CONSTR); 
4313     /* only BQs of an RBH end with an RBH_Save closure */
4314     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4315
4316     tso_loc = where_is((StgClosure *)bqe);
4317     switch (get_itbl(bqe)->type) {
4318     case TSO:
4319       debugBelch(" TSO %d (%p) on [PE %d],",
4320               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4321       break;
4322     case CONSTR:
4323       debugBelch(" %s (IP %p),",
4324               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4325                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4326                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4327                "RBH_Save_?"), get_itbl(bqe));
4328       break;
4329     default:
4330       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4331            info_type((StgClosure *)bqe), node, info_type(node));
4332       break;
4333     }
4334   } /* for */
4335   debugBelch("\n");
4336 }
4337 # endif
4338
4339 #if defined(PARALLEL_HASKELL)
4340 static nat
4341 run_queue_len(void)
4342 {
4343     nat i;
4344     StgTSO *tso;
4345     
4346     for (i=0, tso=run_queue_hd; 
4347          tso != END_TSO_QUEUE;
4348          i++, tso=tso->link) {
4349         /* nothing */
4350     }
4351         
4352     return i;
4353 }
4354 #endif
4355
4356 void
4357 sched_belch(char *s, ...)
4358 {
4359     va_list ap;
4360     va_start(ap,s);
4361 #ifdef THREADED_RTS
4362     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4363 #elif defined(PARALLEL_HASKELL)
4364     debugBelch("== ");
4365 #else
4366     debugBelch("sched: ");
4367 #endif
4368     vdebugBelch(s, ap);
4369     debugBelch("\n");
4370     va_end(ap);
4371 }
4372
4373 #endif /* DEBUG */