[project @ 2005-11-25 14:03:00 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 #ifdef THREADED_RTS
76 #define USED_WHEN_THREADED_RTS
77 #define USED_WHEN_NON_THREADED_RTS STG_UNUSED
78 #else
79 #define USED_WHEN_THREADED_RTS     STG_UNUSED
80 #define USED_WHEN_NON_THREADED_RTS
81 #endif
82
83 #ifdef SMP
84 #define USED_WHEN_SMP
85 #else
86 #define USED_WHEN_SMP STG_UNUSED
87 #endif
88
89 /* -----------------------------------------------------------------------------
90  * Global variables
91  * -------------------------------------------------------------------------- */
92
93 #if defined(GRAN)
94
95 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
96 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
97
98 /* 
99    In GranSim we have a runnable and a blocked queue for each processor.
100    In order to minimise code changes new arrays run_queue_hds/tls
101    are created. run_queue_hd is then a short cut (macro) for
102    run_queue_hds[CurrentProc] (see GranSim.h).
103    -- HWL
104 */
105 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
106 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
107 StgTSO *ccalling_threadss[MAX_PROC];
108 /* We use the same global list of threads (all_threads) in GranSim as in
109    the std RTS (i.e. we are cheating). However, we don't use this list in
110    the GranSim specific code at the moment (so we are only potentially
111    cheating).  */
112
113 #else /* !GRAN */
114
115 #if !defined(THREADED_RTS)
116 // Blocked/sleeping thrads
117 StgTSO *blocked_queue_hd = NULL;
118 StgTSO *blocked_queue_tl = NULL;
119 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
120 #endif
121
122 /* Threads blocked on blackholes.
123  * LOCK: sched_mutex+capability, or all capabilities
124  */
125 StgTSO *blackhole_queue = NULL;
126 #endif
127
128 /* The blackhole_queue should be checked for threads to wake up.  See
129  * Schedule.h for more thorough comment.
130  * LOCK: none (doesn't matter if we miss an update)
131  */
132 rtsBool blackholes_need_checking = rtsFalse;
133
134 /* Linked list of all threads.
135  * Used for detecting garbage collected threads.
136  * LOCK: sched_mutex+capability, or all capabilities
137  */
138 StgTSO *all_threads = NULL;
139
140 /* flag set by signal handler to precipitate a context switch
141  * LOCK: none (just an advisory flag)
142  */
143 int context_switch = 0;
144
145 /* flag that tracks whether we have done any execution in this time slice.
146  * LOCK: currently none, perhaps we should lock (but needs to be
147  * updated in the fast path of the scheduler).
148  */
149 nat recent_activity = ACTIVITY_YES;
150
151 /* if this flag is set as well, give up execution
152  * LOCK: none (changes once, from false->true)
153  */
154 rtsBool interrupted = rtsFalse;
155
156 /* Next thread ID to allocate.
157  * LOCK: sched_mutex
158  */
159 static StgThreadID next_thread_id = 1;
160
161 /* The smallest stack size that makes any sense is:
162  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
163  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
164  *  + 1                       (the closure to enter)
165  *  + 1                       (stg_ap_v_ret)
166  *  + 1                       (spare slot req'd by stg_ap_v_ret)
167  *
168  * A thread with this stack will bomb immediately with a stack
169  * overflow, which will increase its stack size.  
170  */
171 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
172
173 #if defined(GRAN)
174 StgTSO *CurrentTSO;
175 #endif
176
177 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
178  *  exists - earlier gccs apparently didn't.
179  *  -= chak
180  */
181 StgTSO dummy_tso;
182
183 /*
184  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
185  * in an MT setting, needed to signal that a worker thread shouldn't hang around
186  * in the scheduler when it is out of work.
187  */
188 rtsBool shutting_down_scheduler = rtsFalse;
189
190 /*
191  * This mutex protects most of the global scheduler data in
192  * the THREADED_RTS and (inc. SMP) runtime.
193  */
194 #if defined(THREADED_RTS)
195 Mutex sched_mutex;
196 #endif
197
198 #if defined(PARALLEL_HASKELL)
199 StgTSO *LastTSO;
200 rtsTime TimeOfLastYield;
201 rtsBool emitSchedule = rtsTrue;
202 #endif
203
204 /* -----------------------------------------------------------------------------
205  * static function prototypes
206  * -------------------------------------------------------------------------- */
207
208 static Capability *schedule (Capability *initialCapability, Task *task);
209
210 //
211 // These function all encapsulate parts of the scheduler loop, and are
212 // abstracted only to make the structure and control flow of the
213 // scheduler clearer.
214 //
215 static void schedulePreLoop (void);
216 #if defined(SMP)
217 static void schedulePushWork(Capability *cap, Task *task);
218 #endif
219 static void scheduleStartSignalHandlers (Capability *cap);
220 static void scheduleCheckBlockedThreads (Capability *cap);
221 static void scheduleCheckBlackHoles (Capability *cap);
222 static void scheduleDetectDeadlock (Capability *cap, Task *task);
223 #if defined(GRAN)
224 static StgTSO *scheduleProcessEvent(rtsEvent *event);
225 #endif
226 #if defined(PARALLEL_HASKELL)
227 static StgTSO *scheduleSendPendingMessages(void);
228 static void scheduleActivateSpark(void);
229 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
230 #endif
231 #if defined(PAR) || defined(GRAN)
232 static void scheduleGranParReport(void);
233 #endif
234 static void schedulePostRunThread(void);
235 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
236 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
237                                          StgTSO *t);
238 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
239                                     nat prev_what_next );
240 static void scheduleHandleThreadBlocked( StgTSO *t );
241 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
242                                              StgTSO *t );
243 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
244 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
245
246 static void unblockThread(Capability *cap, StgTSO *tso);
247 static rtsBool checkBlackHoles(Capability *cap);
248 static void AllRoots(evac_fn evac);
249
250 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
251
252 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
253                         rtsBool stop_at_atomically, StgPtr stop_here);
254
255 static void deleteThread (Capability *cap, StgTSO *tso);
256 static void deleteRunQueue (Capability *cap);
257
258 #ifdef DEBUG
259 static void printThreadBlockage(StgTSO *tso);
260 static void printThreadStatus(StgTSO *tso);
261 void printThreadQueue(StgTSO *tso);
262 #endif
263
264 #if defined(PARALLEL_HASKELL)
265 StgTSO * createSparkThread(rtsSpark spark);
266 StgTSO * activateSpark (rtsSpark spark);  
267 #endif
268
269 #ifdef DEBUG
270 static char *whatNext_strs[] = {
271   "(unknown)",
272   "ThreadRunGHC",
273   "ThreadInterpret",
274   "ThreadKilled",
275   "ThreadRelocated",
276   "ThreadComplete"
277 };
278 #endif
279
280 /* -----------------------------------------------------------------------------
281  * Putting a thread on the run queue: different scheduling policies
282  * -------------------------------------------------------------------------- */
283
284 STATIC_INLINE void
285 addToRunQueue( Capability *cap, StgTSO *t )
286 {
287 #if defined(PARALLEL_HASKELL)
288     if (RtsFlags.ParFlags.doFairScheduling) { 
289         // this does round-robin scheduling; good for concurrency
290         appendToRunQueue(cap,t);
291     } else {
292         // this does unfair scheduling; good for parallelism
293         pushOnRunQueue(cap,t);
294     }
295 #else
296     // this does round-robin scheduling; good for concurrency
297     appendToRunQueue(cap,t);
298 #endif
299 }
300
301 /* ---------------------------------------------------------------------------
302    Main scheduling loop.
303
304    We use round-robin scheduling, each thread returning to the
305    scheduler loop when one of these conditions is detected:
306
307       * out of heap space
308       * timer expires (thread yields)
309       * thread blocks
310       * thread ends
311       * stack overflow
312
313    GRAN version:
314      In a GranSim setup this loop iterates over the global event queue.
315      This revolves around the global event queue, which determines what 
316      to do next. Therefore, it's more complicated than either the 
317      concurrent or the parallel (GUM) setup.
318
319    GUM version:
320      GUM iterates over incoming messages.
321      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
322      and sends out a fish whenever it has nothing to do; in-between
323      doing the actual reductions (shared code below) it processes the
324      incoming messages and deals with delayed operations 
325      (see PendingFetches).
326      This is not the ugliest code you could imagine, but it's bloody close.
327
328    ------------------------------------------------------------------------ */
329
330 static Capability *
331 schedule (Capability *initialCapability, Task *task)
332 {
333   StgTSO *t;
334   Capability *cap;
335   StgThreadReturnCode ret;
336 #if defined(GRAN)
337   rtsEvent *event;
338 #elif defined(PARALLEL_HASKELL)
339   StgTSO *tso;
340   GlobalTaskId pe;
341   rtsBool receivedFinish = rtsFalse;
342 # if defined(DEBUG)
343   nat tp_size, sp_size; // stats only
344 # endif
345 #endif
346   nat prev_what_next;
347   rtsBool ready_to_gc;
348 #if defined(THREADED_RTS)
349   rtsBool first = rtsTrue;
350 #endif
351   
352   cap = initialCapability;
353
354   // Pre-condition: this task owns initialCapability.
355   // The sched_mutex is *NOT* held
356   // NB. on return, we still hold a capability.
357
358   IF_DEBUG(scheduler,
359            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
360                        task, initialCapability);
361       );
362
363   schedulePreLoop();
364
365   // -----------------------------------------------------------
366   // Scheduler loop starts here:
367
368 #if defined(PARALLEL_HASKELL)
369 #define TERMINATION_CONDITION        (!receivedFinish)
370 #elif defined(GRAN)
371 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
372 #else
373 #define TERMINATION_CONDITION        rtsTrue
374 #endif
375
376   while (TERMINATION_CONDITION) {
377
378 #if defined(GRAN)
379       /* Choose the processor with the next event */
380       CurrentProc = event->proc;
381       CurrentTSO = event->tso;
382 #endif
383
384 #if defined(THREADED_RTS)
385       if (first) {
386           // don't yield the first time, we want a chance to run this
387           // thread for a bit, even if there are others banging at the
388           // door.
389           first = rtsFalse;
390           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
391       } else {
392           // Yield the capability to higher-priority tasks if necessary.
393           yieldCapability(&cap, task);
394       }
395 #endif
396       
397 #ifdef SMP
398       schedulePushWork(cap,task);
399 #endif
400
401     // Check whether we have re-entered the RTS from Haskell without
402     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
403     // call).
404     if (cap->in_haskell) {
405           errorBelch("schedule: re-entered unsafely.\n"
406                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
407           stg_exit(EXIT_FAILURE);
408     }
409
410     //
411     // Test for interruption.  If interrupted==rtsTrue, then either
412     // we received a keyboard interrupt (^C), or the scheduler is
413     // trying to shut down all the tasks (shutting_down_scheduler) in
414     // the threaded RTS.
415     //
416     if (interrupted) {
417         deleteRunQueue(cap);
418 #if defined(SMP)
419         discardSparksCap(cap);
420 #endif
421         if (shutting_down_scheduler) {
422             IF_DEBUG(scheduler, sched_belch("shutting down"));
423             // If we are a worker, just exit.  If we're a bound thread
424             // then we will exit below when we've removed our TSO from
425             // the run queue.
426             if (task->tso == NULL && emptyRunQueue(cap)) {
427                 return cap;
428             }
429         } else {
430             IF_DEBUG(scheduler, sched_belch("interrupted"));
431         }
432     }
433
434 #if defined(SMP)
435     // If the run queue is empty, take a spark and turn it into a thread.
436     {
437         if (emptyRunQueue(cap)) {
438             StgClosure *spark;
439             spark = findSpark(cap);
440             if (spark != NULL) {
441                 IF_DEBUG(scheduler,
442                          sched_belch("turning spark of closure %p into a thread",
443                                      (StgClosure *)spark));
444                 createSparkThread(cap,spark);     
445             }
446         }
447     }
448 #endif // SMP
449
450     scheduleStartSignalHandlers(cap);
451
452     // Only check the black holes here if we've nothing else to do.
453     // During normal execution, the black hole list only gets checked
454     // at GC time, to avoid repeatedly traversing this possibly long
455     // list each time around the scheduler.
456     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
457
458     scheduleCheckBlockedThreads(cap);
459
460     scheduleDetectDeadlock(cap,task);
461
462     // Normally, the only way we can get here with no threads to
463     // run is if a keyboard interrupt received during 
464     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
465     // Additionally, it is not fatal for the
466     // threaded RTS to reach here with no threads to run.
467     //
468     // win32: might be here due to awaitEvent() being abandoned
469     // as a result of a console event having been delivered.
470     if ( emptyRunQueue(cap) ) {
471 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
472         ASSERT(interrupted);
473 #endif
474         continue; // nothing to do
475     }
476
477 #if defined(PARALLEL_HASKELL)
478     scheduleSendPendingMessages();
479     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
480         continue;
481
482 #if defined(SPARKS)
483     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
484 #endif
485
486     /* If we still have no work we need to send a FISH to get a spark
487        from another PE */
488     if (emptyRunQueue(cap)) {
489         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
490         ASSERT(rtsFalse); // should not happen at the moment
491     }
492     // from here: non-empty run queue.
493     //  TODO: merge above case with this, only one call processMessages() !
494     if (PacketsWaiting()) {  /* process incoming messages, if
495                                 any pending...  only in else
496                                 because getRemoteWork waits for
497                                 messages as well */
498         receivedFinish = processMessages();
499     }
500 #endif
501
502 #if defined(GRAN)
503     scheduleProcessEvent(event);
504 #endif
505
506     // 
507     // Get a thread to run
508     //
509     t = popRunQueue(cap);
510
511 #if defined(GRAN) || defined(PAR)
512     scheduleGranParReport(); // some kind of debuging output
513 #else
514     // Sanity check the thread we're about to run.  This can be
515     // expensive if there is lots of thread switching going on...
516     IF_DEBUG(sanity,checkTSO(t));
517 #endif
518
519 #if defined(THREADED_RTS)
520     // Check whether we can run this thread in the current task.
521     // If not, we have to pass our capability to the right task.
522     {
523         Task *bound = t->bound;
524       
525         if (bound) {
526             if (bound == task) {
527                 IF_DEBUG(scheduler,
528                          sched_belch("### Running thread %d in bound thread",
529                                      t->id));
530                 // yes, the Haskell thread is bound to the current native thread
531             } else {
532                 IF_DEBUG(scheduler,
533                          sched_belch("### thread %d bound to another OS thread",
534                                      t->id));
535                 // no, bound to a different Haskell thread: pass to that thread
536                 pushOnRunQueue(cap,t);
537                 continue;
538             }
539         } else {
540             // The thread we want to run is unbound.
541             if (task->tso) { 
542                 IF_DEBUG(scheduler,
543                          sched_belch("### this OS thread cannot run thread %d", t->id));
544                 // no, the current native thread is bound to a different
545                 // Haskell thread, so pass it to any worker thread
546                 pushOnRunQueue(cap,t);
547                 continue; 
548             }
549         }
550     }
551 #endif
552
553     cap->r.rCurrentTSO = t;
554     
555     /* context switches are initiated by the timer signal, unless
556      * the user specified "context switch as often as possible", with
557      * +RTS -C0
558      */
559     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
560         && !emptyThreadQueues(cap)) {
561         context_switch = 1;
562     }
563          
564 run_thread:
565
566     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
567                               (long)t->id, whatNext_strs[t->what_next]));
568
569 #if defined(PROFILING)
570     startHeapProfTimer();
571 #endif
572
573     // ----------------------------------------------------------------------
574     // Run the current thread 
575
576     prev_what_next = t->what_next;
577
578     errno = t->saved_errno;
579     cap->in_haskell = rtsTrue;
580
581     recent_activity = ACTIVITY_YES;
582
583     switch (prev_what_next) {
584         
585     case ThreadKilled:
586     case ThreadComplete:
587         /* Thread already finished, return to scheduler. */
588         ret = ThreadFinished;
589         break;
590         
591     case ThreadRunGHC:
592     {
593         StgRegTable *r;
594         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
595         cap = regTableToCapability(r);
596         ret = r->rRet;
597         break;
598     }
599     
600     case ThreadInterpret:
601         cap = interpretBCO(cap);
602         ret = cap->r.rRet;
603         break;
604         
605     default:
606         barf("schedule: invalid what_next field");
607     }
608
609     cap->in_haskell = rtsFalse;
610
611     // The TSO might have moved, eg. if it re-entered the RTS and a GC
612     // happened.  So find the new location:
613     t = cap->r.rCurrentTSO;
614
615     // We have run some Haskell code: there might be blackhole-blocked
616     // threads to wake up now.
617     // Lock-free test here should be ok, we're just setting a flag.
618     if ( blackhole_queue != END_TSO_QUEUE ) {
619         blackholes_need_checking = rtsTrue;
620     }
621     
622     // And save the current errno in this thread.
623     // XXX: possibly bogus for SMP because this thread might already
624     // be running again, see code below.
625     t->saved_errno = errno;
626
627 #ifdef SMP
628     // If ret is ThreadBlocked, and this Task is bound to the TSO that
629     // blocked, we are in limbo - the TSO is now owned by whatever it
630     // is blocked on, and may in fact already have been woken up,
631     // perhaps even on a different Capability.  It may be the case
632     // that task->cap != cap.  We better yield this Capability
633     // immediately and return to normaility.
634     if (ret == ThreadBlocked) {
635         IF_DEBUG(scheduler,
636                  debugBelch("--<< thread %d (%s) stopped: blocked\n",
637                             t->id, whatNext_strs[t->what_next]));
638         continue;
639     }
640 #endif
641
642     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
643
644     // ----------------------------------------------------------------------
645     
646     // Costs for the scheduler are assigned to CCS_SYSTEM
647 #if defined(PROFILING)
648     stopHeapProfTimer();
649     CCCS = CCS_SYSTEM;
650 #endif
651     
652 #if defined(THREADED_RTS)
653     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
654 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
655     IF_DEBUG(scheduler,debugBelch("sched: "););
656 #endif
657     
658     schedulePostRunThread();
659
660     ready_to_gc = rtsFalse;
661
662     switch (ret) {
663     case HeapOverflow:
664         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
665         break;
666
667     case StackOverflow:
668         scheduleHandleStackOverflow(cap,task,t);
669         break;
670
671     case ThreadYielding:
672         if (scheduleHandleYield(cap, t, prev_what_next)) {
673             // shortcut for switching between compiler/interpreter:
674             goto run_thread; 
675         }
676         break;
677
678     case ThreadBlocked:
679         scheduleHandleThreadBlocked(t);
680         break;
681
682     case ThreadFinished:
683         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
684         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
685         break;
686
687     default:
688       barf("schedule: invalid thread return code %d", (int)ret);
689     }
690
691     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
692     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
693   } /* end of while() */
694
695   IF_PAR_DEBUG(verbose,
696                debugBelch("== Leaving schedule() after having received Finish\n"));
697 }
698
699 /* ----------------------------------------------------------------------------
700  * Setting up the scheduler loop
701  * ------------------------------------------------------------------------- */
702
703 static void
704 schedulePreLoop(void)
705 {
706 #if defined(GRAN) 
707     /* set up first event to get things going */
708     /* ToDo: assign costs for system setup and init MainTSO ! */
709     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
710               ContinueThread, 
711               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
712     
713     IF_DEBUG(gran,
714              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
715                         CurrentTSO);
716              G_TSO(CurrentTSO, 5));
717     
718     if (RtsFlags.GranFlags.Light) {
719         /* Save current time; GranSim Light only */
720         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
721     }      
722 #endif
723 }
724
725 /* -----------------------------------------------------------------------------
726  * schedulePushWork()
727  *
728  * Push work to other Capabilities if we have some.
729  * -------------------------------------------------------------------------- */
730
731 #ifdef SMP
732 static void
733 schedulePushWork(Capability *cap USED_WHEN_SMP, 
734                  Task *task      USED_WHEN_SMP)
735 {
736     Capability *free_caps[n_capabilities], *cap0;
737     nat i, n_free_caps;
738
739     // Check whether we have more threads on our run queue, or sparks
740     // in our pool, that we could hand to another Capability.
741     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
742         && sparkPoolSizeCap(cap) < 2) {
743         return;
744     }
745
746     // First grab as many free Capabilities as we can.
747     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
748         cap0 = &capabilities[i];
749         if (cap != cap0 && tryGrabCapability(cap0,task)) {
750             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
751                 // it already has some work, we just grabbed it at 
752                 // the wrong moment.  Or maybe it's deadlocked!
753                 releaseCapability(cap0);
754             } else {
755                 free_caps[n_free_caps++] = cap0;
756             }
757         }
758     }
759
760     // we now have n_free_caps free capabilities stashed in
761     // free_caps[].  Share our run queue equally with them.  This is
762     // probably the simplest thing we could do; improvements we might
763     // want to do include:
764     //
765     //   - giving high priority to moving relatively new threads, on 
766     //     the gournds that they haven't had time to build up a
767     //     working set in the cache on this CPU/Capability.
768     //
769     //   - giving low priority to moving long-lived threads
770
771     if (n_free_caps > 0) {
772         StgTSO *prev, *t, *next;
773         rtsBool pushed_to_all;
774
775         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
776
777         i = 0;
778         pushed_to_all = rtsFalse;
779
780         if (cap->run_queue_hd != END_TSO_QUEUE) {
781             prev = cap->run_queue_hd;
782             t = prev->link;
783             prev->link = END_TSO_QUEUE;
784             for (; t != END_TSO_QUEUE; t = next) {
785                 next = t->link;
786                 t->link = END_TSO_QUEUE;
787                 if (t->what_next == ThreadRelocated
788                     || t->bound == task) { // don't move my bound thread
789                     prev->link = t;
790                     prev = t;
791                 } else if (i == n_free_caps) {
792                     pushed_to_all = rtsTrue;
793                     i = 0;
794                     // keep one for us
795                     prev->link = t;
796                     prev = t;
797                 } else {
798                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
799                     appendToRunQueue(free_caps[i],t);
800                     if (t->bound) { t->bound->cap = free_caps[i]; }
801                     i++;
802                 }
803             }
804             cap->run_queue_tl = prev;
805         }
806
807         // If there are some free capabilities that we didn't push any
808         // threads to, then try to push a spark to each one.
809         if (!pushed_to_all) {
810             StgClosure *spark;
811             // i is the next free capability to push to
812             for (; i < n_free_caps; i++) {
813                 if (emptySparkPoolCap(free_caps[i])) {
814                     spark = findSpark(cap);
815                     if (spark != NULL) {
816                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
817                         newSpark(&(free_caps[i]->r), spark);
818                     }
819                 }
820             }
821         }
822
823         // release the capabilities
824         for (i = 0; i < n_free_caps; i++) {
825             task->cap = free_caps[i];
826             releaseCapability(free_caps[i]);
827         }
828     }
829     task->cap = cap; // reset to point to our Capability.
830 }
831 #endif
832
833 /* ----------------------------------------------------------------------------
834  * Start any pending signal handlers
835  * ------------------------------------------------------------------------- */
836
837 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
838 static void
839 scheduleStartSignalHandlers(Capability *cap)
840 {
841     if (signals_pending()) { // safe outside the lock
842         startSignalHandlers(cap);
843     }
844 }
845 #else
846 static void
847 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
848 {
849 }
850 #endif
851
852 /* ----------------------------------------------------------------------------
853  * Check for blocked threads that can be woken up.
854  * ------------------------------------------------------------------------- */
855
856 static void
857 scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
858 {
859 #if !defined(THREADED_RTS)
860     //
861     // Check whether any waiting threads need to be woken up.  If the
862     // run queue is empty, and there are no other tasks running, we
863     // can wait indefinitely for something to happen.
864     //
865     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
866     {
867         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
868     }
869 #endif
870 }
871
872
873 /* ----------------------------------------------------------------------------
874  * Check for threads blocked on BLACKHOLEs that can be woken up
875  * ------------------------------------------------------------------------- */
876 static void
877 scheduleCheckBlackHoles (Capability *cap)
878 {
879     if ( blackholes_need_checking ) // check without the lock first
880     {
881         ACQUIRE_LOCK(&sched_mutex);
882         if ( blackholes_need_checking ) {
883             checkBlackHoles(cap);
884             blackholes_need_checking = rtsFalse;
885         }
886         RELEASE_LOCK(&sched_mutex);
887     }
888 }
889
890 /* ----------------------------------------------------------------------------
891  * Detect deadlock conditions and attempt to resolve them.
892  * ------------------------------------------------------------------------- */
893
894 static void
895 scheduleDetectDeadlock (Capability *cap, Task *task)
896 {
897
898 #if defined(PARALLEL_HASKELL)
899     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
900     return;
901 #endif
902
903     /* 
904      * Detect deadlock: when we have no threads to run, there are no
905      * threads blocked, waiting for I/O, or sleeping, and all the
906      * other tasks are waiting for work, we must have a deadlock of
907      * some description.
908      */
909     if ( emptyThreadQueues(cap) )
910     {
911 #if defined(THREADED_RTS)
912         /* 
913          * In the threaded RTS, we only check for deadlock if there
914          * has been no activity in a complete timeslice.  This means
915          * we won't eagerly start a full GC just because we don't have
916          * any threads to run currently.
917          */
918         if (recent_activity != ACTIVITY_INACTIVE) return;
919 #endif
920
921         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
922
923         // Garbage collection can release some new threads due to
924         // either (a) finalizers or (b) threads resurrected because
925         // they are unreachable and will therefore be sent an
926         // exception.  Any threads thus released will be immediately
927         // runnable.
928         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/ );
929         recent_activity = ACTIVITY_DONE_GC;
930         
931         if ( !emptyRunQueue(cap) ) return;
932
933 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
934         /* If we have user-installed signal handlers, then wait
935          * for signals to arrive rather then bombing out with a
936          * deadlock.
937          */
938         if ( anyUserHandlers() ) {
939             IF_DEBUG(scheduler, 
940                      sched_belch("still deadlocked, waiting for signals..."));
941
942             awaitUserSignals();
943
944             if (signals_pending()) {
945                 startSignalHandlers(cap);
946             }
947
948             // either we have threads to run, or we were interrupted:
949             ASSERT(!emptyRunQueue(cap) || interrupted);
950         }
951 #endif
952
953 #if !defined(THREADED_RTS)
954         /* Probably a real deadlock.  Send the current main thread the
955          * Deadlock exception.
956          */
957         if (task->tso) {
958             switch (task->tso->why_blocked) {
959             case BlockedOnSTM:
960             case BlockedOnBlackHole:
961             case BlockedOnException:
962             case BlockedOnMVar:
963                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
964                 return;
965             default:
966                 barf("deadlock: main thread blocked in a strange way");
967             }
968         }
969         return;
970 #endif
971     }
972 }
973
974 /* ----------------------------------------------------------------------------
975  * Process an event (GRAN only)
976  * ------------------------------------------------------------------------- */
977
978 #if defined(GRAN)
979 static StgTSO *
980 scheduleProcessEvent(rtsEvent *event)
981 {
982     StgTSO *t;
983
984     if (RtsFlags.GranFlags.Light)
985       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
986
987     /* adjust time based on time-stamp */
988     if (event->time > CurrentTime[CurrentProc] &&
989         event->evttype != ContinueThread)
990       CurrentTime[CurrentProc] = event->time;
991     
992     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
993     if (!RtsFlags.GranFlags.Light)
994       handleIdlePEs();
995
996     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
997
998     /* main event dispatcher in GranSim */
999     switch (event->evttype) {
1000       /* Should just be continuing execution */
1001     case ContinueThread:
1002       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1003       /* ToDo: check assertion
1004       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1005              run_queue_hd != END_TSO_QUEUE);
1006       */
1007       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1008       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1009           procStatus[CurrentProc]==Fetching) {
1010         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1011               CurrentTSO->id, CurrentTSO, CurrentProc);
1012         goto next_thread;
1013       } 
1014       /* Ignore ContinueThreads for completed threads */
1015       if (CurrentTSO->what_next == ThreadComplete) {
1016         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1017               CurrentTSO->id, CurrentTSO, CurrentProc);
1018         goto next_thread;
1019       } 
1020       /* Ignore ContinueThreads for threads that are being migrated */
1021       if (PROCS(CurrentTSO)==Nowhere) { 
1022         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1023               CurrentTSO->id, CurrentTSO, CurrentProc);
1024         goto next_thread;
1025       }
1026       /* The thread should be at the beginning of the run queue */
1027       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1028         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1029               CurrentTSO->id, CurrentTSO, CurrentProc);
1030         break; // run the thread anyway
1031       }
1032       /*
1033       new_event(proc, proc, CurrentTime[proc],
1034                 FindWork,
1035                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1036       goto next_thread; 
1037       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1038       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1039
1040     case FetchNode:
1041       do_the_fetchnode(event);
1042       goto next_thread;             /* handle next event in event queue  */
1043       
1044     case GlobalBlock:
1045       do_the_globalblock(event);
1046       goto next_thread;             /* handle next event in event queue  */
1047       
1048     case FetchReply:
1049       do_the_fetchreply(event);
1050       goto next_thread;             /* handle next event in event queue  */
1051       
1052     case UnblockThread:   /* Move from the blocked queue to the tail of */
1053       do_the_unblock(event);
1054       goto next_thread;             /* handle next event in event queue  */
1055       
1056     case ResumeThread:  /* Move from the blocked queue to the tail of */
1057       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1058       event->tso->gran.blocktime += 
1059         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1060       do_the_startthread(event);
1061       goto next_thread;             /* handle next event in event queue  */
1062       
1063     case StartThread:
1064       do_the_startthread(event);
1065       goto next_thread;             /* handle next event in event queue  */
1066       
1067     case MoveThread:
1068       do_the_movethread(event);
1069       goto next_thread;             /* handle next event in event queue  */
1070       
1071     case MoveSpark:
1072       do_the_movespark(event);
1073       goto next_thread;             /* handle next event in event queue  */
1074       
1075     case FindWork:
1076       do_the_findwork(event);
1077       goto next_thread;             /* handle next event in event queue  */
1078       
1079     default:
1080       barf("Illegal event type %u\n", event->evttype);
1081     }  /* switch */
1082     
1083     /* This point was scheduler_loop in the old RTS */
1084
1085     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1086
1087     TimeOfLastEvent = CurrentTime[CurrentProc];
1088     TimeOfNextEvent = get_time_of_next_event();
1089     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1090     // CurrentTSO = ThreadQueueHd;
1091
1092     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1093                          TimeOfNextEvent));
1094
1095     if (RtsFlags.GranFlags.Light) 
1096       GranSimLight_leave_system(event, &ActiveTSO); 
1097
1098     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1099
1100     IF_DEBUG(gran, 
1101              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1102
1103     /* in a GranSim setup the TSO stays on the run queue */
1104     t = CurrentTSO;
1105     /* Take a thread from the run queue. */
1106     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1107
1108     IF_DEBUG(gran, 
1109              debugBelch("GRAN: About to run current thread, which is\n");
1110              G_TSO(t,5));
1111
1112     context_switch = 0; // turned on via GranYield, checking events and time slice
1113
1114     IF_DEBUG(gran, 
1115              DumpGranEvent(GR_SCHEDULE, t));
1116
1117     procStatus[CurrentProc] = Busy;
1118 }
1119 #endif // GRAN
1120
1121 /* ----------------------------------------------------------------------------
1122  * Send pending messages (PARALLEL_HASKELL only)
1123  * ------------------------------------------------------------------------- */
1124
1125 #if defined(PARALLEL_HASKELL)
1126 static StgTSO *
1127 scheduleSendPendingMessages(void)
1128 {
1129     StgSparkPool *pool;
1130     rtsSpark spark;
1131     StgTSO *t;
1132
1133 # if defined(PAR) // global Mem.Mgmt., omit for now
1134     if (PendingFetches != END_BF_QUEUE) {
1135         processFetches();
1136     }
1137 # endif
1138     
1139     if (RtsFlags.ParFlags.BufferTime) {
1140         // if we use message buffering, we must send away all message
1141         // packets which have become too old...
1142         sendOldBuffers(); 
1143     }
1144 }
1145 #endif
1146
1147 /* ----------------------------------------------------------------------------
1148  * Activate spark threads (PARALLEL_HASKELL only)
1149  * ------------------------------------------------------------------------- */
1150
1151 #if defined(PARALLEL_HASKELL)
1152 static void
1153 scheduleActivateSpark(void)
1154 {
1155 #if defined(SPARKS)
1156   ASSERT(emptyRunQueue());
1157 /* We get here if the run queue is empty and want some work.
1158    We try to turn a spark into a thread, and add it to the run queue,
1159    from where it will be picked up in the next iteration of the scheduler
1160    loop.
1161 */
1162
1163       /* :-[  no local threads => look out for local sparks */
1164       /* the spark pool for the current PE */
1165       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1166       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1167           pool->hd < pool->tl) {
1168         /* 
1169          * ToDo: add GC code check that we really have enough heap afterwards!!
1170          * Old comment:
1171          * If we're here (no runnable threads) and we have pending
1172          * sparks, we must have a space problem.  Get enough space
1173          * to turn one of those pending sparks into a
1174          * thread... 
1175          */
1176
1177         spark = findSpark(rtsFalse);            /* get a spark */
1178         if (spark != (rtsSpark) NULL) {
1179           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1180           IF_PAR_DEBUG(fish, // schedule,
1181                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1182                              tso->id, tso, advisory_thread_count));
1183
1184           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1185             IF_PAR_DEBUG(fish, // schedule,
1186                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1187                             spark));
1188             return rtsFalse; /* failed to generate a thread */
1189           }                  /* otherwise fall through & pick-up new tso */
1190         } else {
1191           IF_PAR_DEBUG(fish, // schedule,
1192                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1193                              spark_queue_len(pool)));
1194           return rtsFalse;  /* failed to generate a thread */
1195         }
1196         return rtsTrue;  /* success in generating a thread */
1197   } else { /* no more threads permitted or pool empty */
1198     return rtsFalse;  /* failed to generateThread */
1199   }
1200 #else
1201   tso = NULL; // avoid compiler warning only
1202   return rtsFalse;  /* dummy in non-PAR setup */
1203 #endif // SPARKS
1204 }
1205 #endif // PARALLEL_HASKELL
1206
1207 /* ----------------------------------------------------------------------------
1208  * Get work from a remote node (PARALLEL_HASKELL only)
1209  * ------------------------------------------------------------------------- */
1210     
1211 #if defined(PARALLEL_HASKELL)
1212 static rtsBool
1213 scheduleGetRemoteWork(rtsBool *receivedFinish)
1214 {
1215   ASSERT(emptyRunQueue());
1216
1217   if (RtsFlags.ParFlags.BufferTime) {
1218         IF_PAR_DEBUG(verbose, 
1219                 debugBelch("...send all pending data,"));
1220         {
1221           nat i;
1222           for (i=1; i<=nPEs; i++)
1223             sendImmediately(i); // send all messages away immediately
1224         }
1225   }
1226 # ifndef SPARKS
1227         //++EDEN++ idle() , i.e. send all buffers, wait for work
1228         // suppress fishing in EDEN... just look for incoming messages
1229         // (blocking receive)
1230   IF_PAR_DEBUG(verbose, 
1231                debugBelch("...wait for incoming messages...\n"));
1232   *receivedFinish = processMessages(); // blocking receive...
1233
1234         // and reenter scheduling loop after having received something
1235         // (return rtsFalse below)
1236
1237 # else /* activate SPARKS machinery */
1238 /* We get here, if we have no work, tried to activate a local spark, but still
1239    have no work. We try to get a remote spark, by sending a FISH message.
1240    Thread migration should be added here, and triggered when a sequence of 
1241    fishes returns without work. */
1242         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1243
1244       /* =8-[  no local sparks => look for work on other PEs */
1245         /*
1246          * We really have absolutely no work.  Send out a fish
1247          * (there may be some out there already), and wait for
1248          * something to arrive.  We clearly can't run any threads
1249          * until a SCHEDULE or RESUME arrives, and so that's what
1250          * we're hoping to see.  (Of course, we still have to
1251          * respond to other types of messages.)
1252          */
1253         rtsTime now = msTime() /*CURRENT_TIME*/;
1254         IF_PAR_DEBUG(verbose, 
1255                      debugBelch("--  now=%ld\n", now));
1256         IF_PAR_DEBUG(fish, // verbose,
1257              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1258                  (last_fish_arrived_at!=0 &&
1259                   last_fish_arrived_at+delay > now)) {
1260                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1261                      now, last_fish_arrived_at+delay, 
1262                      last_fish_arrived_at,
1263                      delay);
1264              });
1265   
1266         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1267             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1268           if (last_fish_arrived_at==0 ||
1269               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1270             /* outstandingFishes is set in sendFish, processFish;
1271                avoid flooding system with fishes via delay */
1272     next_fish_to_send_at = 0;  
1273   } else {
1274     /* ToDo: this should be done in the main scheduling loop to avoid the
1275              busy wait here; not so bad if fish delay is very small  */
1276     int iq = 0; // DEBUGGING -- HWL
1277     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1278     /* send a fish when ready, but process messages that arrive in the meantime */
1279     do {
1280       if (PacketsWaiting()) {
1281         iq++; // DEBUGGING
1282         *receivedFinish = processMessages();
1283       }
1284       now = msTime();
1285     } while (!*receivedFinish || now<next_fish_to_send_at);
1286     // JB: This means the fish could become obsolete, if we receive
1287     // work. Better check for work again? 
1288     // last line: while (!receivedFinish || !haveWork || now<...)
1289     // next line: if (receivedFinish || haveWork )
1290
1291     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1292       return rtsFalse;  // NB: this will leave scheduler loop
1293                         // immediately after return!
1294                           
1295     IF_PAR_DEBUG(fish, // verbose,
1296                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1297
1298   }
1299
1300     // JB: IMHO, this should all be hidden inside sendFish(...)
1301     /* pe = choosePE(); 
1302        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1303                 NEW_FISH_HUNGER);
1304
1305     // Global statistics: count no. of fishes
1306     if (RtsFlags.ParFlags.ParStats.Global &&
1307          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1308            globalParStats.tot_fish_mess++;
1309            }
1310     */ 
1311
1312   /* delayed fishes must have been sent by now! */
1313   next_fish_to_send_at = 0;  
1314   }
1315       
1316   *receivedFinish = processMessages();
1317 # endif /* SPARKS */
1318
1319  return rtsFalse;
1320  /* NB: this function always returns rtsFalse, meaning the scheduler
1321     loop continues with the next iteration; 
1322     rationale: 
1323       return code means success in finding work; we enter this function
1324       if there is no local work, thus have to send a fish which takes
1325       time until it arrives with work; in the meantime we should process
1326       messages in the main loop;
1327  */
1328 }
1329 #endif // PARALLEL_HASKELL
1330
1331 /* ----------------------------------------------------------------------------
1332  * PAR/GRAN: Report stats & debugging info(?)
1333  * ------------------------------------------------------------------------- */
1334
1335 #if defined(PAR) || defined(GRAN)
1336 static void
1337 scheduleGranParReport(void)
1338 {
1339   ASSERT(run_queue_hd != END_TSO_QUEUE);
1340
1341   /* Take a thread from the run queue, if we have work */
1342   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1343
1344     /* If this TSO has got its outport closed in the meantime, 
1345      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1346      * It has to be marked as TH_DEAD for this purpose.
1347      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1348
1349 JB: TODO: investigate wether state change field could be nuked
1350      entirely and replaced by the normal tso state (whatnext
1351      field). All we want to do is to kill tsos from outside.
1352      */
1353
1354     /* ToDo: write something to the log-file
1355     if (RTSflags.ParFlags.granSimStats && !sameThread)
1356         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1357
1358     CurrentTSO = t;
1359     */
1360     /* the spark pool for the current PE */
1361     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1362
1363     IF_DEBUG(scheduler, 
1364              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1365                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1366
1367     IF_PAR_DEBUG(fish,
1368              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1369                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1370
1371     if (RtsFlags.ParFlags.ParStats.Full && 
1372         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1373         (emitSchedule || // forced emit
1374          (t && LastTSO && t->id != LastTSO->id))) {
1375       /* 
1376          we are running a different TSO, so write a schedule event to log file
1377          NB: If we use fair scheduling we also have to write  a deschedule 
1378              event for LastTSO; with unfair scheduling we know that the
1379              previous tso has blocked whenever we switch to another tso, so
1380              we don't need it in GUM for now
1381       */
1382       IF_PAR_DEBUG(fish, // schedule,
1383                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1384
1385       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1386                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1387       emitSchedule = rtsFalse;
1388     }
1389 }     
1390 #endif
1391
1392 /* ----------------------------------------------------------------------------
1393  * After running a thread...
1394  * ------------------------------------------------------------------------- */
1395
1396 static void
1397 schedulePostRunThread(void)
1398 {
1399 #if defined(PAR)
1400     /* HACK 675: if the last thread didn't yield, make sure to print a 
1401        SCHEDULE event to the log file when StgRunning the next thread, even
1402        if it is the same one as before */
1403     LastTSO = t; 
1404     TimeOfLastYield = CURRENT_TIME;
1405 #endif
1406
1407   /* some statistics gathering in the parallel case */
1408
1409 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1410   switch (ret) {
1411     case HeapOverflow:
1412 # if defined(GRAN)
1413       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1414       globalGranStats.tot_heapover++;
1415 # elif defined(PAR)
1416       globalParStats.tot_heapover++;
1417 # endif
1418       break;
1419
1420      case StackOverflow:
1421 # if defined(GRAN)
1422       IF_DEBUG(gran, 
1423                DumpGranEvent(GR_DESCHEDULE, t));
1424       globalGranStats.tot_stackover++;
1425 # elif defined(PAR)
1426       // IF_DEBUG(par, 
1427       // DumpGranEvent(GR_DESCHEDULE, t);
1428       globalParStats.tot_stackover++;
1429 # endif
1430       break;
1431
1432     case ThreadYielding:
1433 # if defined(GRAN)
1434       IF_DEBUG(gran, 
1435                DumpGranEvent(GR_DESCHEDULE, t));
1436       globalGranStats.tot_yields++;
1437 # elif defined(PAR)
1438       // IF_DEBUG(par, 
1439       // DumpGranEvent(GR_DESCHEDULE, t);
1440       globalParStats.tot_yields++;
1441 # endif
1442       break; 
1443
1444     case ThreadBlocked:
1445 # if defined(GRAN)
1446       IF_DEBUG(scheduler,
1447                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1448                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1449                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1450                if (t->block_info.closure!=(StgClosure*)NULL)
1451                  print_bq(t->block_info.closure);
1452                debugBelch("\n"));
1453
1454       // ??? needed; should emit block before
1455       IF_DEBUG(gran, 
1456                DumpGranEvent(GR_DESCHEDULE, t)); 
1457       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1458       /*
1459         ngoq Dogh!
1460       ASSERT(procStatus[CurrentProc]==Busy || 
1461               ((procStatus[CurrentProc]==Fetching) && 
1462               (t->block_info.closure!=(StgClosure*)NULL)));
1463       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1464           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1465             procStatus[CurrentProc]==Fetching)) 
1466         procStatus[CurrentProc] = Idle;
1467       */
1468 # elif defined(PAR)
1469 //++PAR++  blockThread() writes the event (change?)
1470 # endif
1471     break;
1472
1473   case ThreadFinished:
1474     break;
1475
1476   default:
1477     barf("parGlobalStats: unknown return code");
1478     break;
1479     }
1480 #endif
1481 }
1482
1483 /* -----------------------------------------------------------------------------
1484  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1485  * -------------------------------------------------------------------------- */
1486
1487 static rtsBool
1488 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1489 {
1490     // did the task ask for a large block?
1491     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1492         // if so, get one and push it on the front of the nursery.
1493         bdescr *bd;
1494         lnat blocks;
1495         
1496         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1497         
1498         IF_DEBUG(scheduler,
1499                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1500                             (long)t->id, whatNext_strs[t->what_next], blocks));
1501         
1502         // don't do this if the nursery is (nearly) full, we'll GC first.
1503         if (cap->r.rCurrentNursery->link != NULL ||
1504             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1505                                                // if the nursery has only one block.
1506             
1507             ACQUIRE_SM_LOCK
1508             bd = allocGroup( blocks );
1509             RELEASE_SM_LOCK
1510             cap->r.rNursery->n_blocks += blocks;
1511             
1512             // link the new group into the list
1513             bd->link = cap->r.rCurrentNursery;
1514             bd->u.back = cap->r.rCurrentNursery->u.back;
1515             if (cap->r.rCurrentNursery->u.back != NULL) {
1516                 cap->r.rCurrentNursery->u.back->link = bd;
1517             } else {
1518 #if !defined(SMP)
1519                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1520                        g0s0 == cap->r.rNursery);
1521 #endif
1522                 cap->r.rNursery->blocks = bd;
1523             }             
1524             cap->r.rCurrentNursery->u.back = bd;
1525             
1526             // initialise it as a nursery block.  We initialise the
1527             // step, gen_no, and flags field of *every* sub-block in
1528             // this large block, because this is easier than making
1529             // sure that we always find the block head of a large
1530             // block whenever we call Bdescr() (eg. evacuate() and
1531             // isAlive() in the GC would both have to do this, at
1532             // least).
1533             { 
1534                 bdescr *x;
1535                 for (x = bd; x < bd + blocks; x++) {
1536                     x->step = cap->r.rNursery;
1537                     x->gen_no = 0;
1538                     x->flags = 0;
1539                 }
1540             }
1541             
1542             // This assert can be a killer if the app is doing lots
1543             // of large block allocations.
1544             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1545             
1546             // now update the nursery to point to the new block
1547             cap->r.rCurrentNursery = bd;
1548             
1549             // we might be unlucky and have another thread get on the
1550             // run queue before us and steal the large block, but in that
1551             // case the thread will just end up requesting another large
1552             // block.
1553             pushOnRunQueue(cap,t);
1554             return rtsFalse;  /* not actually GC'ing */
1555         }
1556     }
1557     
1558     IF_DEBUG(scheduler,
1559              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1560                         (long)t->id, whatNext_strs[t->what_next]));
1561 #if defined(GRAN)
1562     ASSERT(!is_on_queue(t,CurrentProc));
1563 #elif defined(PARALLEL_HASKELL)
1564     /* Currently we emit a DESCHEDULE event before GC in GUM.
1565        ToDo: either add separate event to distinguish SYSTEM time from rest
1566        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1567     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1568         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1569                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1570         emitSchedule = rtsTrue;
1571     }
1572 #endif
1573       
1574     pushOnRunQueue(cap,t);
1575     return rtsTrue;
1576     /* actual GC is done at the end of the while loop in schedule() */
1577 }
1578
1579 /* -----------------------------------------------------------------------------
1580  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1581  * -------------------------------------------------------------------------- */
1582
1583 static void
1584 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1585 {
1586     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1587                                   (long)t->id, whatNext_strs[t->what_next]));
1588     /* just adjust the stack for this thread, then pop it back
1589      * on the run queue.
1590      */
1591     { 
1592         /* enlarge the stack */
1593         StgTSO *new_t = threadStackOverflow(cap, t);
1594         
1595         /* The TSO attached to this Task may have moved, so update the
1596          * pointer to it.
1597          */
1598         if (task->tso == t) {
1599             task->tso = new_t;
1600         }
1601         pushOnRunQueue(cap,new_t);
1602     }
1603 }
1604
1605 /* -----------------------------------------------------------------------------
1606  * Handle a thread that returned to the scheduler with ThreadYielding
1607  * -------------------------------------------------------------------------- */
1608
1609 static rtsBool
1610 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1611 {
1612     // Reset the context switch flag.  We don't do this just before
1613     // running the thread, because that would mean we would lose ticks
1614     // during GC, which can lead to unfair scheduling (a thread hogs
1615     // the CPU because the tick always arrives during GC).  This way
1616     // penalises threads that do a lot of allocation, but that seems
1617     // better than the alternative.
1618     context_switch = 0;
1619     
1620     /* put the thread back on the run queue.  Then, if we're ready to
1621      * GC, check whether this is the last task to stop.  If so, wake
1622      * up the GC thread.  getThread will block during a GC until the
1623      * GC is finished.
1624      */
1625     IF_DEBUG(scheduler,
1626              if (t->what_next != prev_what_next) {
1627                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1628                             (long)t->id, whatNext_strs[t->what_next]);
1629              } else {
1630                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1631                             (long)t->id, whatNext_strs[t->what_next]);
1632              }
1633         );
1634     
1635     IF_DEBUG(sanity,
1636              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1637              checkTSO(t));
1638     ASSERT(t->link == END_TSO_QUEUE);
1639     
1640     // Shortcut if we're just switching evaluators: don't bother
1641     // doing stack squeezing (which can be expensive), just run the
1642     // thread.
1643     if (t->what_next != prev_what_next) {
1644         return rtsTrue;
1645     }
1646     
1647 #if defined(GRAN)
1648     ASSERT(!is_on_queue(t,CurrentProc));
1649       
1650     IF_DEBUG(sanity,
1651              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1652              checkThreadQsSanity(rtsTrue));
1653
1654 #endif
1655
1656     addToRunQueue(cap,t);
1657
1658 #if defined(GRAN)
1659     /* add a ContinueThread event to actually process the thread */
1660     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1661               ContinueThread,
1662               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1663     IF_GRAN_DEBUG(bq, 
1664                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1665                   G_EVENTQ(0);
1666                   G_CURR_THREADQ(0));
1667 #endif
1668     return rtsFalse;
1669 }
1670
1671 /* -----------------------------------------------------------------------------
1672  * Handle a thread that returned to the scheduler with ThreadBlocked
1673  * -------------------------------------------------------------------------- */
1674
1675 static void
1676 scheduleHandleThreadBlocked( StgTSO *t
1677 #if !defined(GRAN) && !defined(DEBUG)
1678     STG_UNUSED
1679 #endif
1680     )
1681 {
1682 #if defined(GRAN)
1683     IF_DEBUG(scheduler,
1684              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1685                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1686              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1687     
1688     // ??? needed; should emit block before
1689     IF_DEBUG(gran, 
1690              DumpGranEvent(GR_DESCHEDULE, t)); 
1691     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1692     /*
1693       ngoq Dogh!
1694       ASSERT(procStatus[CurrentProc]==Busy || 
1695       ((procStatus[CurrentProc]==Fetching) && 
1696       (t->block_info.closure!=(StgClosure*)NULL)));
1697       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1698       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1699       procStatus[CurrentProc]==Fetching)) 
1700       procStatus[CurrentProc] = Idle;
1701     */
1702 #elif defined(PAR)
1703     IF_DEBUG(scheduler,
1704              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1705                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1706     IF_PAR_DEBUG(bq,
1707                  
1708                  if (t->block_info.closure!=(StgClosure*)NULL) 
1709                  print_bq(t->block_info.closure));
1710     
1711     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1712     blockThread(t);
1713     
1714     /* whatever we schedule next, we must log that schedule */
1715     emitSchedule = rtsTrue;
1716     
1717 #else /* !GRAN */
1718
1719       // We don't need to do anything.  The thread is blocked, and it
1720       // has tidied up its stack and placed itself on whatever queue
1721       // it needs to be on.
1722
1723 #if !defined(SMP)
1724     ASSERT(t->why_blocked != NotBlocked);
1725              // This might not be true under SMP: we don't have
1726              // exclusive access to this TSO, so someone might have
1727              // woken it up by now.  This actually happens: try
1728              // conc023 +RTS -N2.
1729 #endif
1730
1731     IF_DEBUG(scheduler,
1732              debugBelch("--<< thread %d (%s) stopped: ", 
1733                         t->id, whatNext_strs[t->what_next]);
1734              printThreadBlockage(t);
1735              debugBelch("\n"));
1736     
1737     /* Only for dumping event to log file 
1738        ToDo: do I need this in GranSim, too?
1739        blockThread(t);
1740     */
1741 #endif
1742 }
1743
1744 /* -----------------------------------------------------------------------------
1745  * Handle a thread that returned to the scheduler with ThreadFinished
1746  * -------------------------------------------------------------------------- */
1747
1748 static rtsBool
1749 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1750 {
1751     /* Need to check whether this was a main thread, and if so,
1752      * return with the return value.
1753      *
1754      * We also end up here if the thread kills itself with an
1755      * uncaught exception, see Exception.cmm.
1756      */
1757     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1758                                   t->id, whatNext_strs[t->what_next]));
1759
1760 #if defined(GRAN)
1761       endThread(t, CurrentProc); // clean-up the thread
1762 #elif defined(PARALLEL_HASKELL)
1763       /* For now all are advisory -- HWL */
1764       //if(t->priority==AdvisoryPriority) ??
1765       advisory_thread_count--; // JB: Caution with this counter, buggy!
1766       
1767 # if defined(DIST)
1768       if(t->dist.priority==RevalPriority)
1769         FinishReval(t);
1770 # endif
1771     
1772 # if defined(EDENOLD)
1773       // the thread could still have an outport... (BUG)
1774       if (t->eden.outport != -1) {
1775       // delete the outport for the tso which has finished...
1776         IF_PAR_DEBUG(eden_ports,
1777                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1778                               t->eden.outport, t->id));
1779         deleteOPT(t);
1780       }
1781       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1782       if (t->eden.epid != -1) {
1783         IF_PAR_DEBUG(eden_ports,
1784                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1785                            t->id, t->eden.epid));
1786         removeTSOfromProcess(t);
1787       }
1788 # endif 
1789
1790 # if defined(PAR)
1791       if (RtsFlags.ParFlags.ParStats.Full &&
1792           !RtsFlags.ParFlags.ParStats.Suppressed) 
1793         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1794
1795       //  t->par only contains statistics: left out for now...
1796       IF_PAR_DEBUG(fish,
1797                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1798                               t->id,t,t->par.sparkname));
1799 # endif
1800 #endif // PARALLEL_HASKELL
1801
1802       //
1803       // Check whether the thread that just completed was a bound
1804       // thread, and if so return with the result.  
1805       //
1806       // There is an assumption here that all thread completion goes
1807       // through this point; we need to make sure that if a thread
1808       // ends up in the ThreadKilled state, that it stays on the run
1809       // queue so it can be dealt with here.
1810       //
1811
1812       if (t->bound) {
1813
1814           if (t->bound != task) {
1815 #if !defined(THREADED_RTS)
1816               // Must be a bound thread that is not the topmost one.  Leave
1817               // it on the run queue until the stack has unwound to the
1818               // point where we can deal with this.  Leaving it on the run
1819               // queue also ensures that the garbage collector knows about
1820               // this thread and its return value (it gets dropped from the
1821               // all_threads list so there's no other way to find it).
1822               appendToRunQueue(cap,t);
1823               return rtsFalse;
1824 #else
1825               // this cannot happen in the threaded RTS, because a
1826               // bound thread can only be run by the appropriate Task.
1827               barf("finished bound thread that isn't mine");
1828 #endif
1829           }
1830
1831           ASSERT(task->tso == t);
1832
1833           if (t->what_next == ThreadComplete) {
1834               if (task->ret) {
1835                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1836                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1837               }
1838               task->stat = Success;
1839           } else {
1840               if (task->ret) {
1841                   *(task->ret) = NULL;
1842               }
1843               if (interrupted) {
1844                   task->stat = Interrupted;
1845               } else {
1846                   task->stat = Killed;
1847               }
1848           }
1849 #ifdef DEBUG
1850           removeThreadLabel((StgWord)task->tso->id);
1851 #endif
1852           return rtsTrue; // tells schedule() to return
1853       }
1854
1855       return rtsFalse;
1856 }
1857
1858 /* -----------------------------------------------------------------------------
1859  * Perform a heap census, if PROFILING
1860  * -------------------------------------------------------------------------- */
1861
1862 static rtsBool
1863 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1864 {
1865 #if defined(PROFILING)
1866     // When we have +RTS -i0 and we're heap profiling, do a census at
1867     // every GC.  This lets us get repeatable runs for debugging.
1868     if (performHeapProfile ||
1869         (RtsFlags.ProfFlags.profileInterval==0 &&
1870          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1871         GarbageCollect(GetRoots, rtsTrue);
1872         heapCensus();
1873         performHeapProfile = rtsFalse;
1874         return rtsTrue;  // true <=> we already GC'd
1875     }
1876 #endif
1877     return rtsFalse;
1878 }
1879
1880 /* -----------------------------------------------------------------------------
1881  * Perform a garbage collection if necessary
1882  * -------------------------------------------------------------------------- */
1883
1884 static void
1885 scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
1886 {
1887     StgTSO *t;
1888 #ifdef SMP
1889     static volatile StgWord waiting_for_gc;
1890     rtsBool was_waiting;
1891     nat i;
1892 #endif
1893
1894 #ifdef SMP
1895     // In order to GC, there must be no threads running Haskell code.
1896     // Therefore, the GC thread needs to hold *all* the capabilities,
1897     // and release them after the GC has completed.  
1898     //
1899     // This seems to be the simplest way: previous attempts involved
1900     // making all the threads with capabilities give up their
1901     // capabilities and sleep except for the *last* one, which
1902     // actually did the GC.  But it's quite hard to arrange for all
1903     // the other tasks to sleep and stay asleep.
1904     //
1905         
1906     was_waiting = cas(&waiting_for_gc, 0, 1);
1907     if (was_waiting) {
1908         do {
1909             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1910             yieldCapability(&cap,task);
1911         } while (waiting_for_gc);
1912         return;
1913     }
1914
1915     for (i=0; i < n_capabilities; i++) {
1916         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1917         if (cap != &capabilities[i]) {
1918             Capability *pcap = &capabilities[i];
1919             // we better hope this task doesn't get migrated to
1920             // another Capability while we're waiting for this one.
1921             // It won't, because load balancing happens while we have
1922             // all the Capabilities, but even so it's a slightly
1923             // unsavoury invariant.
1924             task->cap = pcap;
1925             context_switch = 1;
1926             waitForReturnCapability(&pcap, task);
1927             if (pcap != &capabilities[i]) {
1928                 barf("scheduleDoGC: got the wrong capability");
1929             }
1930         }
1931     }
1932
1933     waiting_for_gc = rtsFalse;
1934 #endif
1935
1936     /* Kick any transactions which are invalid back to their
1937      * atomically frames.  When next scheduled they will try to
1938      * commit, this commit will fail and they will retry.
1939      */
1940     { 
1941         StgTSO *next;
1942
1943         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1944             if (t->what_next == ThreadRelocated) {
1945                 next = t->link;
1946             } else {
1947                 next = t->global_link;
1948                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1949                     if (!stmValidateNestOfTransactions (t -> trec)) {
1950                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1951                         
1952                         // strip the stack back to the
1953                         // ATOMICALLY_FRAME, aborting the (nested)
1954                         // transaction, and saving the stack of any
1955                         // partially-evaluated thunks on the heap.
1956                         raiseAsync_(cap, t, NULL, rtsTrue, NULL);
1957                         
1958 #ifdef REG_R1
1959                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1960 #endif
1961                     }
1962                 }
1963             }
1964         }
1965     }
1966     
1967     // so this happens periodically:
1968     scheduleCheckBlackHoles(cap);
1969     
1970     IF_DEBUG(scheduler, printAllThreads());
1971
1972     /* everybody back, start the GC.
1973      * Could do it in this thread, or signal a condition var
1974      * to do it in another thread.  Either way, we need to
1975      * broadcast on gc_pending_cond afterward.
1976      */
1977 #if defined(THREADED_RTS)
1978     IF_DEBUG(scheduler,sched_belch("doing GC"));
1979 #endif
1980     GarbageCollect(GetRoots, force_major);
1981     
1982 #if defined(SMP)
1983     // release our stash of capabilities.
1984     for (i = 0; i < n_capabilities; i++) {
1985         if (cap != &capabilities[i]) {
1986             task->cap = &capabilities[i];
1987             releaseCapability(&capabilities[i]);
1988         }
1989     }
1990     task->cap = cap;
1991 #endif
1992
1993 #if defined(GRAN)
1994     /* add a ContinueThread event to continue execution of current thread */
1995     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1996               ContinueThread,
1997               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1998     IF_GRAN_DEBUG(bq, 
1999                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2000                   G_EVENTQ(0);
2001                   G_CURR_THREADQ(0));
2002 #endif /* GRAN */
2003 }
2004
2005 /* ---------------------------------------------------------------------------
2006  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2007  * used by Control.Concurrent for error checking.
2008  * ------------------------------------------------------------------------- */
2009  
2010 StgBool
2011 rtsSupportsBoundThreads(void)
2012 {
2013 #if defined(THREADED_RTS)
2014   return rtsTrue;
2015 #else
2016   return rtsFalse;
2017 #endif
2018 }
2019
2020 /* ---------------------------------------------------------------------------
2021  * isThreadBound(tso): check whether tso is bound to an OS thread.
2022  * ------------------------------------------------------------------------- */
2023  
2024 StgBool
2025 isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
2026 {
2027 #if defined(THREADED_RTS)
2028   return (tso->bound != NULL);
2029 #endif
2030   return rtsFalse;
2031 }
2032
2033 /* ---------------------------------------------------------------------------
2034  * Singleton fork(). Do not copy any running threads.
2035  * ------------------------------------------------------------------------- */
2036
2037 #if !defined(mingw32_HOST_OS) && !defined(SMP)
2038 #define FORKPROCESS_PRIMOP_SUPPORTED
2039 #endif
2040
2041 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2042 static void 
2043 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2044 #endif
2045 StgInt
2046 forkProcess(HsStablePtr *entry
2047 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2048             STG_UNUSED
2049 #endif
2050            )
2051 {
2052 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2053     Task *task;
2054     pid_t pid;
2055     StgTSO* t,*next;
2056     Capability *cap;
2057     
2058     IF_DEBUG(scheduler,sched_belch("forking!"));
2059     
2060     // ToDo: for SMP, we should probably acquire *all* the capabilities
2061     cap = rts_lock();
2062     
2063     pid = fork();
2064     
2065     if (pid) { // parent
2066         
2067         // just return the pid
2068         rts_unlock(cap);
2069         return pid;
2070         
2071     } else { // child
2072         
2073         // delete all threads
2074         cap->run_queue_hd = END_TSO_QUEUE;
2075         cap->run_queue_tl = END_TSO_QUEUE;
2076         
2077         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2078             next = t->link;
2079             
2080             // don't allow threads to catch the ThreadKilled exception
2081             deleteThreadImmediately(cap,t);
2082         }
2083         
2084         // wipe the task list
2085         ACQUIRE_LOCK(&sched_mutex);
2086         for (task = all_tasks; task != NULL; task=task->all_link) {
2087             if (task != cap->running_task) discardTask(task);
2088         }
2089         RELEASE_LOCK(&sched_mutex);
2090
2091 #if defined(THREADED_RTS)
2092         // wipe our spare workers list.
2093         cap->spare_workers = NULL;
2094 #endif
2095
2096         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2097         rts_checkSchedStatus("forkProcess",cap);
2098         
2099         rts_unlock(cap);
2100         hs_exit();                      // clean up and exit
2101         stg_exit(EXIT_SUCCESS);
2102     }
2103 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2104     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2105     return -1;
2106 #endif
2107 }
2108
2109 /* ---------------------------------------------------------------------------
2110  * Delete the threads on the run queue of the current capability.
2111  * ------------------------------------------------------------------------- */
2112    
2113 static void
2114 deleteRunQueue (Capability *cap)
2115 {
2116     StgTSO *t, *next;
2117     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2118         ASSERT(t->what_next != ThreadRelocated);
2119         next = t->link;
2120         deleteThread(cap, t);
2121     }
2122 }
2123
2124 /* startThread and  insertThread are now in GranSim.c -- HWL */
2125
2126
2127 /* -----------------------------------------------------------------------------
2128    Managing the suspended_ccalling_tasks list.
2129    Locks required: sched_mutex
2130    -------------------------------------------------------------------------- */
2131
2132 STATIC_INLINE void
2133 suspendTask (Capability *cap, Task *task)
2134 {
2135     ASSERT(task->next == NULL && task->prev == NULL);
2136     task->next = cap->suspended_ccalling_tasks;
2137     task->prev = NULL;
2138     if (cap->suspended_ccalling_tasks) {
2139         cap->suspended_ccalling_tasks->prev = task;
2140     }
2141     cap->suspended_ccalling_tasks = task;
2142 }
2143
2144 STATIC_INLINE void
2145 recoverSuspendedTask (Capability *cap, Task *task)
2146 {
2147     if (task->prev) {
2148         task->prev->next = task->next;
2149     } else {
2150         ASSERT(cap->suspended_ccalling_tasks == task);
2151         cap->suspended_ccalling_tasks = task->next;
2152     }
2153     if (task->next) {
2154         task->next->prev = task->prev;
2155     }
2156     task->next = task->prev = NULL;
2157 }
2158
2159 /* ---------------------------------------------------------------------------
2160  * Suspending & resuming Haskell threads.
2161  * 
2162  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2163  * its capability before calling the C function.  This allows another
2164  * task to pick up the capability and carry on running Haskell
2165  * threads.  It also means that if the C call blocks, it won't lock
2166  * the whole system.
2167  *
2168  * The Haskell thread making the C call is put to sleep for the
2169  * duration of the call, on the susepended_ccalling_threads queue.  We
2170  * give out a token to the task, which it can use to resume the thread
2171  * on return from the C function.
2172  * ------------------------------------------------------------------------- */
2173    
2174 void *
2175 suspendThread (StgRegTable *reg)
2176 {
2177   Capability *cap;
2178   int saved_errno = errno;
2179   StgTSO *tso;
2180   Task *task;
2181
2182   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2183    */
2184   cap = regTableToCapability(reg);
2185
2186   task = cap->running_task;
2187   tso = cap->r.rCurrentTSO;
2188
2189   IF_DEBUG(scheduler,
2190            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2191
2192   // XXX this might not be necessary --SDM
2193   tso->what_next = ThreadRunGHC;
2194
2195   threadPaused(cap,tso);
2196
2197   if(tso->blocked_exceptions == NULL)  {
2198       tso->why_blocked = BlockedOnCCall;
2199       tso->blocked_exceptions = END_TSO_QUEUE;
2200   } else {
2201       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2202   }
2203
2204   // Hand back capability
2205   task->suspended_tso = tso;
2206
2207   ACQUIRE_LOCK(&cap->lock);
2208
2209   suspendTask(cap,task);
2210   cap->in_haskell = rtsFalse;
2211   releaseCapability_(cap);
2212   
2213   RELEASE_LOCK(&cap->lock);
2214
2215 #if defined(THREADED_RTS)
2216   /* Preparing to leave the RTS, so ensure there's a native thread/task
2217      waiting to take over.
2218   */
2219   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2220 #endif
2221
2222   errno = saved_errno;
2223   return task;
2224 }
2225
2226 StgRegTable *
2227 resumeThread (void *task_)
2228 {
2229     StgTSO *tso;
2230     Capability *cap;
2231     int saved_errno = errno;
2232     Task *task = task_;
2233
2234     cap = task->cap;
2235     // Wait for permission to re-enter the RTS with the result.
2236     waitForReturnCapability(&cap,task);
2237     // we might be on a different capability now... but if so, our
2238     // entry on the suspended_ccalling_tasks list will also have been
2239     // migrated.
2240
2241     // Remove the thread from the suspended list
2242     recoverSuspendedTask(cap,task);
2243
2244     tso = task->suspended_tso;
2245     task->suspended_tso = NULL;
2246     tso->link = END_TSO_QUEUE;
2247     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2248     
2249     if (tso->why_blocked == BlockedOnCCall) {
2250         awakenBlockedQueue(cap,tso->blocked_exceptions);
2251         tso->blocked_exceptions = NULL;
2252     }
2253     
2254     /* Reset blocking status */
2255     tso->why_blocked  = NotBlocked;
2256     
2257     cap->r.rCurrentTSO = tso;
2258     cap->in_haskell = rtsTrue;
2259     errno = saved_errno;
2260
2261     return &cap->r;
2262 }
2263
2264 /* ---------------------------------------------------------------------------
2265  * Comparing Thread ids.
2266  *
2267  * This is used from STG land in the implementation of the
2268  * instances of Eq/Ord for ThreadIds.
2269  * ------------------------------------------------------------------------ */
2270
2271 int
2272 cmp_thread(StgPtr tso1, StgPtr tso2) 
2273
2274   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2275   StgThreadID id2 = ((StgTSO *)tso2)->id;
2276  
2277   if (id1 < id2) return (-1);
2278   if (id1 > id2) return 1;
2279   return 0;
2280 }
2281
2282 /* ---------------------------------------------------------------------------
2283  * Fetching the ThreadID from an StgTSO.
2284  *
2285  * This is used in the implementation of Show for ThreadIds.
2286  * ------------------------------------------------------------------------ */
2287 int
2288 rts_getThreadId(StgPtr tso) 
2289 {
2290   return ((StgTSO *)tso)->id;
2291 }
2292
2293 #ifdef DEBUG
2294 void
2295 labelThread(StgPtr tso, char *label)
2296 {
2297   int len;
2298   void *buf;
2299
2300   /* Caveat: Once set, you can only set the thread name to "" */
2301   len = strlen(label)+1;
2302   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2303   strncpy(buf,label,len);
2304   /* Update will free the old memory for us */
2305   updateThreadLabel(((StgTSO *)tso)->id,buf);
2306 }
2307 #endif /* DEBUG */
2308
2309 /* ---------------------------------------------------------------------------
2310    Create a new thread.
2311
2312    The new thread starts with the given stack size.  Before the
2313    scheduler can run, however, this thread needs to have a closure
2314    (and possibly some arguments) pushed on its stack.  See
2315    pushClosure() in Schedule.h.
2316
2317    createGenThread() and createIOThread() (in SchedAPI.h) are
2318    convenient packaged versions of this function.
2319
2320    currently pri (priority) is only used in a GRAN setup -- HWL
2321    ------------------------------------------------------------------------ */
2322 #if defined(GRAN)
2323 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2324 StgTSO *
2325 createThread(nat size, StgInt pri)
2326 #else
2327 StgTSO *
2328 createThread(Capability *cap, nat size)
2329 #endif
2330 {
2331     StgTSO *tso;
2332     nat stack_size;
2333
2334     /* sched_mutex is *not* required */
2335
2336     /* First check whether we should create a thread at all */
2337 #if defined(PARALLEL_HASKELL)
2338     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2339     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2340         threadsIgnored++;
2341         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2342                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2343         return END_TSO_QUEUE;
2344     }
2345     threadsCreated++;
2346 #endif
2347
2348 #if defined(GRAN)
2349     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2350 #endif
2351
2352     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2353
2354     /* catch ridiculously small stack sizes */
2355     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2356         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2357     }
2358
2359     stack_size = size - TSO_STRUCT_SIZEW;
2360     
2361     tso = (StgTSO *)allocateLocal(cap, size);
2362     TICK_ALLOC_TSO(stack_size, 0);
2363
2364     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2365 #if defined(GRAN)
2366     SET_GRAN_HDR(tso, ThisPE);
2367 #endif
2368
2369     // Always start with the compiled code evaluator
2370     tso->what_next = ThreadRunGHC;
2371
2372     tso->why_blocked  = NotBlocked;
2373     tso->blocked_exceptions = NULL;
2374     
2375     tso->saved_errno = 0;
2376     tso->bound = NULL;
2377     
2378     tso->stack_size     = stack_size;
2379     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2380                           - TSO_STRUCT_SIZEW;
2381     tso->sp             = (P_)&(tso->stack) + stack_size;
2382
2383     tso->trec = NO_TREC;
2384     
2385 #ifdef PROFILING
2386     tso->prof.CCCS = CCS_MAIN;
2387 #endif
2388     
2389   /* put a stop frame on the stack */
2390     tso->sp -= sizeofW(StgStopFrame);
2391     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2392     tso->link = END_TSO_QUEUE;
2393     
2394   // ToDo: check this
2395 #if defined(GRAN)
2396     /* uses more flexible routine in GranSim */
2397     insertThread(tso, CurrentProc);
2398 #else
2399     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2400      * from its creation
2401      */
2402 #endif
2403     
2404 #if defined(GRAN) 
2405     if (RtsFlags.GranFlags.GranSimStats.Full) 
2406         DumpGranEvent(GR_START,tso);
2407 #elif defined(PARALLEL_HASKELL)
2408     if (RtsFlags.ParFlags.ParStats.Full) 
2409         DumpGranEvent(GR_STARTQ,tso);
2410     /* HACk to avoid SCHEDULE 
2411        LastTSO = tso; */
2412 #endif
2413     
2414     /* Link the new thread on the global thread list.
2415      */
2416     ACQUIRE_LOCK(&sched_mutex);
2417     tso->id = next_thread_id++;  // while we have the mutex
2418     tso->global_link = all_threads;
2419     all_threads = tso;
2420     RELEASE_LOCK(&sched_mutex);
2421     
2422 #if defined(DIST)
2423     tso->dist.priority = MandatoryPriority; //by default that is...
2424 #endif
2425     
2426 #if defined(GRAN)
2427     tso->gran.pri = pri;
2428 # if defined(DEBUG)
2429     tso->gran.magic = TSO_MAGIC; // debugging only
2430 # endif
2431     tso->gran.sparkname   = 0;
2432     tso->gran.startedat   = CURRENT_TIME; 
2433     tso->gran.exported    = 0;
2434     tso->gran.basicblocks = 0;
2435     tso->gran.allocs      = 0;
2436     tso->gran.exectime    = 0;
2437     tso->gran.fetchtime   = 0;
2438     tso->gran.fetchcount  = 0;
2439     tso->gran.blocktime   = 0;
2440     tso->gran.blockcount  = 0;
2441     tso->gran.blockedat   = 0;
2442     tso->gran.globalsparks = 0;
2443     tso->gran.localsparks  = 0;
2444     if (RtsFlags.GranFlags.Light)
2445         tso->gran.clock  = Now; /* local clock */
2446     else
2447         tso->gran.clock  = 0;
2448     
2449     IF_DEBUG(gran,printTSO(tso));
2450 #elif defined(PARALLEL_HASKELL)
2451 # if defined(DEBUG)
2452     tso->par.magic = TSO_MAGIC; // debugging only
2453 # endif
2454     tso->par.sparkname   = 0;
2455     tso->par.startedat   = CURRENT_TIME; 
2456     tso->par.exported    = 0;
2457     tso->par.basicblocks = 0;
2458     tso->par.allocs      = 0;
2459     tso->par.exectime    = 0;
2460     tso->par.fetchtime   = 0;
2461     tso->par.fetchcount  = 0;
2462     tso->par.blocktime   = 0;
2463     tso->par.blockcount  = 0;
2464     tso->par.blockedat   = 0;
2465     tso->par.globalsparks = 0;
2466     tso->par.localsparks  = 0;
2467 #endif
2468     
2469 #if defined(GRAN)
2470     globalGranStats.tot_threads_created++;
2471     globalGranStats.threads_created_on_PE[CurrentProc]++;
2472     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2473     globalGranStats.tot_sq_probes++;
2474 #elif defined(PARALLEL_HASKELL)
2475     // collect parallel global statistics (currently done together with GC stats)
2476     if (RtsFlags.ParFlags.ParStats.Global &&
2477         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2478         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2479         globalParStats.tot_threads_created++;
2480     }
2481 #endif 
2482     
2483 #if defined(GRAN)
2484     IF_GRAN_DEBUG(pri,
2485                   sched_belch("==__ schedule: Created TSO %d (%p);",
2486                               CurrentProc, tso, tso->id));
2487 #elif defined(PARALLEL_HASKELL)
2488     IF_PAR_DEBUG(verbose,
2489                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2490                              (long)tso->id, tso, advisory_thread_count));
2491 #else
2492     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2493                                    (long)tso->id, (long)tso->stack_size));
2494 #endif    
2495     return tso;
2496 }
2497
2498 #if defined(PAR)
2499 /* RFP:
2500    all parallel thread creation calls should fall through the following routine.
2501 */
2502 StgTSO *
2503 createThreadFromSpark(rtsSpark spark) 
2504 { StgTSO *tso;
2505   ASSERT(spark != (rtsSpark)NULL);
2506 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2507   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2508   { threadsIgnored++;
2509     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2510           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2511     return END_TSO_QUEUE;
2512   }
2513   else
2514   { threadsCreated++;
2515     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2516     if (tso==END_TSO_QUEUE)     
2517       barf("createSparkThread: Cannot create TSO");
2518 #if defined(DIST)
2519     tso->priority = AdvisoryPriority;
2520 #endif
2521     pushClosure(tso,spark);
2522     addToRunQueue(tso);
2523     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2524   }
2525   return tso;
2526 }
2527 #endif
2528
2529 /*
2530   Turn a spark into a thread.
2531   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2532 */
2533 #if 0
2534 StgTSO *
2535 activateSpark (rtsSpark spark) 
2536 {
2537   StgTSO *tso;
2538
2539   tso = createSparkThread(spark);
2540   if (RtsFlags.ParFlags.ParStats.Full) {   
2541     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2542       IF_PAR_DEBUG(verbose,
2543                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2544                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2545   }
2546   // ToDo: fwd info on local/global spark to thread -- HWL
2547   // tso->gran.exported =  spark->exported;
2548   // tso->gran.locked =   !spark->global;
2549   // tso->gran.sparkname = spark->name;
2550
2551   return tso;
2552 }
2553 #endif
2554
2555 /* ---------------------------------------------------------------------------
2556  * scheduleThread()
2557  *
2558  * scheduleThread puts a thread on the end  of the runnable queue.
2559  * This will usually be done immediately after a thread is created.
2560  * The caller of scheduleThread must create the thread using e.g.
2561  * createThread and push an appropriate closure
2562  * on this thread's stack before the scheduler is invoked.
2563  * ------------------------------------------------------------------------ */
2564
2565 void
2566 scheduleThread(Capability *cap, StgTSO *tso)
2567 {
2568     // The thread goes at the *end* of the run-queue, to avoid possible
2569     // starvation of any threads already on the queue.
2570     appendToRunQueue(cap,tso);
2571 }
2572
2573 Capability *
2574 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2575 {
2576     Task *task;
2577
2578     // We already created/initialised the Task
2579     task = cap->running_task;
2580
2581     // This TSO is now a bound thread; make the Task and TSO
2582     // point to each other.
2583     tso->bound = task;
2584
2585     task->tso = tso;
2586     task->ret = ret;
2587     task->stat = NoStatus;
2588
2589     appendToRunQueue(cap,tso);
2590
2591     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2592
2593 #if defined(GRAN)
2594     /* GranSim specific init */
2595     CurrentTSO = m->tso;                // the TSO to run
2596     procStatus[MainProc] = Busy;        // status of main PE
2597     CurrentProc = MainProc;             // PE to run it on
2598 #endif
2599
2600     cap = schedule(cap,task);
2601
2602     ASSERT(task->stat != NoStatus);
2603     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2604
2605     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2606     return cap;
2607 }
2608
2609 /* ----------------------------------------------------------------------------
2610  * Starting Tasks
2611  * ------------------------------------------------------------------------- */
2612
2613 #if defined(THREADED_RTS)
2614 void
2615 workerStart(Task *task)
2616 {
2617     Capability *cap;
2618
2619     // See startWorkerTask().
2620     ACQUIRE_LOCK(&task->lock);
2621     cap = task->cap;
2622     RELEASE_LOCK(&task->lock);
2623
2624     // set the thread-local pointer to the Task:
2625     taskEnter(task);
2626
2627     // schedule() runs without a lock.
2628     cap = schedule(cap,task);
2629
2630     // On exit from schedule(), we have a Capability.
2631     releaseCapability(cap);
2632     taskStop(task);
2633 }
2634 #endif
2635
2636 /* ---------------------------------------------------------------------------
2637  * initScheduler()
2638  *
2639  * Initialise the scheduler.  This resets all the queues - if the
2640  * queues contained any threads, they'll be garbage collected at the
2641  * next pass.
2642  *
2643  * ------------------------------------------------------------------------ */
2644
2645 void 
2646 initScheduler(void)
2647 {
2648 #if defined(GRAN)
2649   nat i;
2650   for (i=0; i<=MAX_PROC; i++) {
2651     run_queue_hds[i]      = END_TSO_QUEUE;
2652     run_queue_tls[i]      = END_TSO_QUEUE;
2653     blocked_queue_hds[i]  = END_TSO_QUEUE;
2654     blocked_queue_tls[i]  = END_TSO_QUEUE;
2655     ccalling_threadss[i]  = END_TSO_QUEUE;
2656     blackhole_queue[i]    = END_TSO_QUEUE;
2657     sleeping_queue        = END_TSO_QUEUE;
2658   }
2659 #elif !defined(THREADED_RTS)
2660   blocked_queue_hd  = END_TSO_QUEUE;
2661   blocked_queue_tl  = END_TSO_QUEUE;
2662   sleeping_queue    = END_TSO_QUEUE;
2663 #endif
2664
2665   blackhole_queue   = END_TSO_QUEUE;
2666   all_threads       = END_TSO_QUEUE;
2667
2668   context_switch = 0;
2669   interrupted    = 0;
2670
2671   RtsFlags.ConcFlags.ctxtSwitchTicks =
2672       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2673       
2674 #if defined(THREADED_RTS)
2675   /* Initialise the mutex and condition variables used by
2676    * the scheduler. */
2677   initMutex(&sched_mutex);
2678 #endif
2679   
2680   ACQUIRE_LOCK(&sched_mutex);
2681
2682   /* A capability holds the state a native thread needs in
2683    * order to execute STG code. At least one capability is
2684    * floating around (only SMP builds have more than one).
2685    */
2686   initCapabilities();
2687
2688   initTaskManager();
2689
2690 #if defined(SMP) || defined(PARALLEL_HASKELL)
2691   initSparkPools();
2692 #endif
2693
2694 #if defined(SMP)
2695   /*
2696    * Eagerly start one worker to run each Capability, except for
2697    * Capability 0.  The idea is that we're probably going to start a
2698    * bound thread on Capability 0 pretty soon, so we don't want a
2699    * worker task hogging it.
2700    */
2701   { 
2702       nat i;
2703       Capability *cap;
2704       for (i = 1; i < n_capabilities; i++) {
2705           cap = &capabilities[i];
2706           ACQUIRE_LOCK(&cap->lock);
2707           startWorkerTask(cap, workerStart);
2708           RELEASE_LOCK(&cap->lock);
2709       }
2710   }
2711 #endif
2712
2713   RELEASE_LOCK(&sched_mutex);
2714 }
2715
2716 void
2717 exitScheduler( void )
2718 {
2719     interrupted = rtsTrue;
2720     shutting_down_scheduler = rtsTrue;
2721
2722 #if defined(THREADED_RTS)
2723     { 
2724         Task *task;
2725         nat i;
2726         
2727         ACQUIRE_LOCK(&sched_mutex);
2728         task = newBoundTask();
2729         RELEASE_LOCK(&sched_mutex);
2730
2731         for (i = 0; i < n_capabilities; i++) {
2732             shutdownCapability(&capabilities[i], task);
2733         }
2734         boundTaskExiting(task);
2735         stopTaskManager();
2736     }
2737 #endif
2738 }
2739
2740 /* ---------------------------------------------------------------------------
2741    Where are the roots that we know about?
2742
2743         - all the threads on the runnable queue
2744         - all the threads on the blocked queue
2745         - all the threads on the sleeping queue
2746         - all the thread currently executing a _ccall_GC
2747         - all the "main threads"
2748      
2749    ------------------------------------------------------------------------ */
2750
2751 /* This has to be protected either by the scheduler monitor, or by the
2752         garbage collection monitor (probably the latter).
2753         KH @ 25/10/99
2754 */
2755
2756 void
2757 GetRoots( evac_fn evac )
2758 {
2759     nat i;
2760     Capability *cap;
2761     Task *task;
2762
2763 #if defined(GRAN)
2764     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2765         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2766             evac((StgClosure **)&run_queue_hds[i]);
2767         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2768             evac((StgClosure **)&run_queue_tls[i]);
2769         
2770         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2771             evac((StgClosure **)&blocked_queue_hds[i]);
2772         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2773             evac((StgClosure **)&blocked_queue_tls[i]);
2774         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2775             evac((StgClosure **)&ccalling_threads[i]);
2776     }
2777
2778     markEventQueue();
2779
2780 #else /* !GRAN */
2781
2782     for (i = 0; i < n_capabilities; i++) {
2783         cap = &capabilities[i];
2784         evac((StgClosure **)&cap->run_queue_hd);
2785         evac((StgClosure **)&cap->run_queue_tl);
2786         
2787         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2788              task=task->next) {
2789             evac((StgClosure **)&task->suspended_tso);
2790         }
2791     }
2792     
2793 #if !defined(THREADED_RTS)
2794     evac((StgClosure **)&blocked_queue_hd);
2795     evac((StgClosure **)&blocked_queue_tl);
2796     evac((StgClosure **)&sleeping_queue);
2797 #endif 
2798 #endif
2799
2800     evac((StgClosure **)&blackhole_queue);
2801
2802 #if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN)
2803     markSparkQueue(evac);
2804 #endif
2805     
2806 #if defined(RTS_USER_SIGNALS)
2807     // mark the signal handlers (signals should be already blocked)
2808     markSignalHandlers(evac);
2809 #endif
2810 }
2811
2812 /* -----------------------------------------------------------------------------
2813    performGC
2814
2815    This is the interface to the garbage collector from Haskell land.
2816    We provide this so that external C code can allocate and garbage
2817    collect when called from Haskell via _ccall_GC.
2818
2819    It might be useful to provide an interface whereby the programmer
2820    can specify more roots (ToDo).
2821    
2822    This needs to be protected by the GC condition variable above.  KH.
2823    -------------------------------------------------------------------------- */
2824
2825 static void (*extra_roots)(evac_fn);
2826
2827 void
2828 performGC(void)
2829 {
2830 #ifdef THREADED_RTS
2831     // ToDo: we have to grab all the capabilities here.
2832     errorBelch("performGC not supported in threaded RTS (yet)");
2833     stg_exit(EXIT_FAILURE);
2834 #endif
2835     /* Obligated to hold this lock upon entry */
2836     GarbageCollect(GetRoots,rtsFalse);
2837 }
2838
2839 void
2840 performMajorGC(void)
2841 {
2842 #ifdef THREADED_RTS
2843     errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2844     stg_exit(EXIT_FAILURE);
2845 #endif
2846     GarbageCollect(GetRoots,rtsTrue);
2847 }
2848
2849 static void
2850 AllRoots(evac_fn evac)
2851 {
2852     GetRoots(evac);             // the scheduler's roots
2853     extra_roots(evac);          // the user's roots
2854 }
2855
2856 void
2857 performGCWithRoots(void (*get_roots)(evac_fn))
2858 {
2859 #ifdef THREADED_RTS
2860     errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2861     stg_exit(EXIT_FAILURE);
2862 #endif
2863     extra_roots = get_roots;
2864     GarbageCollect(AllRoots,rtsFalse);
2865 }
2866
2867 /* -----------------------------------------------------------------------------
2868    Stack overflow
2869
2870    If the thread has reached its maximum stack size, then raise the
2871    StackOverflow exception in the offending thread.  Otherwise
2872    relocate the TSO into a larger chunk of memory and adjust its stack
2873    size appropriately.
2874    -------------------------------------------------------------------------- */
2875
2876 static StgTSO *
2877 threadStackOverflow(Capability *cap, StgTSO *tso)
2878 {
2879   nat new_stack_size, stack_words;
2880   lnat new_tso_size;
2881   StgPtr new_sp;
2882   StgTSO *dest;
2883
2884   IF_DEBUG(sanity,checkTSO(tso));
2885   if (tso->stack_size >= tso->max_stack_size) {
2886
2887     IF_DEBUG(gc,
2888              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2889                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2890              /* If we're debugging, just print out the top of the stack */
2891              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2892                                               tso->sp+64)));
2893
2894     /* Send this thread the StackOverflow exception */
2895     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2896     return tso;
2897   }
2898
2899   /* Try to double the current stack size.  If that takes us over the
2900    * maximum stack size for this thread, then use the maximum instead.
2901    * Finally round up so the TSO ends up as a whole number of blocks.
2902    */
2903   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2904   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2905                                        TSO_STRUCT_SIZE)/sizeof(W_);
2906   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2907   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2908
2909   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2910
2911   dest = (StgTSO *)allocate(new_tso_size);
2912   TICK_ALLOC_TSO(new_stack_size,0);
2913
2914   /* copy the TSO block and the old stack into the new area */
2915   memcpy(dest,tso,TSO_STRUCT_SIZE);
2916   stack_words = tso->stack + tso->stack_size - tso->sp;
2917   new_sp = (P_)dest + new_tso_size - stack_words;
2918   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2919
2920   /* relocate the stack pointers... */
2921   dest->sp         = new_sp;
2922   dest->stack_size = new_stack_size;
2923         
2924   /* Mark the old TSO as relocated.  We have to check for relocated
2925    * TSOs in the garbage collector and any primops that deal with TSOs.
2926    *
2927    * It's important to set the sp value to just beyond the end
2928    * of the stack, so we don't attempt to scavenge any part of the
2929    * dead TSO's stack.
2930    */
2931   tso->what_next = ThreadRelocated;
2932   tso->link = dest;
2933   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2934   tso->why_blocked = NotBlocked;
2935
2936   IF_PAR_DEBUG(verbose,
2937                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2938                      tso->id, tso, tso->stack_size);
2939                /* If we're debugging, just print out the top of the stack */
2940                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2941                                                 tso->sp+64)));
2942   
2943   IF_DEBUG(sanity,checkTSO(tso));
2944 #if 0
2945   IF_DEBUG(scheduler,printTSO(dest));
2946 #endif
2947
2948   return dest;
2949 }
2950
2951 /* ---------------------------------------------------------------------------
2952    Wake up a queue that was blocked on some resource.
2953    ------------------------------------------------------------------------ */
2954
2955 #if defined(GRAN)
2956 STATIC_INLINE void
2957 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2958 {
2959 }
2960 #elif defined(PARALLEL_HASKELL)
2961 STATIC_INLINE void
2962 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2963 {
2964   /* write RESUME events to log file and
2965      update blocked and fetch time (depending on type of the orig closure) */
2966   if (RtsFlags.ParFlags.ParStats.Full) {
2967     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2968                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2969                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2970     if (emptyRunQueue())
2971       emitSchedule = rtsTrue;
2972
2973     switch (get_itbl(node)->type) {
2974         case FETCH_ME_BQ:
2975           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2976           break;
2977         case RBH:
2978         case FETCH_ME:
2979         case BLACKHOLE_BQ:
2980           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2981           break;
2982 #ifdef DIST
2983         case MVAR:
2984           break;
2985 #endif    
2986         default:
2987           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2988         }
2989       }
2990 }
2991 #endif
2992
2993 #if defined(GRAN)
2994 StgBlockingQueueElement *
2995 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2996 {
2997     StgTSO *tso;
2998     PEs node_loc, tso_loc;
2999
3000     node_loc = where_is(node); // should be lifted out of loop
3001     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3002     tso_loc = where_is((StgClosure *)tso);
3003     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3004       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3005       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3006       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3007       // insertThread(tso, node_loc);
3008       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3009                 ResumeThread,
3010                 tso, node, (rtsSpark*)NULL);
3011       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3012       // len_local++;
3013       // len++;
3014     } else { // TSO is remote (actually should be FMBQ)
3015       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3016                                   RtsFlags.GranFlags.Costs.gunblocktime +
3017                                   RtsFlags.GranFlags.Costs.latency;
3018       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3019                 UnblockThread,
3020                 tso, node, (rtsSpark*)NULL);
3021       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3022       // len++;
3023     }
3024     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3025     IF_GRAN_DEBUG(bq,
3026                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3027                           (node_loc==tso_loc ? "Local" : "Global"), 
3028                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3029     tso->block_info.closure = NULL;
3030     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3031                              tso->id, tso));
3032 }
3033 #elif defined(PARALLEL_HASKELL)
3034 StgBlockingQueueElement *
3035 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3036 {
3037     StgBlockingQueueElement *next;
3038
3039     switch (get_itbl(bqe)->type) {
3040     case TSO:
3041       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3042       /* if it's a TSO just push it onto the run_queue */
3043       next = bqe->link;
3044       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3045       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3046       threadRunnable();
3047       unblockCount(bqe, node);
3048       /* reset blocking status after dumping event */
3049       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3050       break;
3051
3052     case BLOCKED_FETCH:
3053       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3054       next = bqe->link;
3055       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3056       PendingFetches = (StgBlockedFetch *)bqe;
3057       break;
3058
3059 # if defined(DEBUG)
3060       /* can ignore this case in a non-debugging setup; 
3061          see comments on RBHSave closures above */
3062     case CONSTR:
3063       /* check that the closure is an RBHSave closure */
3064       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3065              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3066              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3067       break;
3068
3069     default:
3070       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3071            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3072            (StgClosure *)bqe);
3073 # endif
3074     }
3075   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3076   return next;
3077 }
3078 #endif
3079
3080 StgTSO *
3081 unblockOne(Capability *cap, StgTSO *tso)
3082 {
3083   StgTSO *next;
3084
3085   ASSERT(get_itbl(tso)->type == TSO);
3086   ASSERT(tso->why_blocked != NotBlocked);
3087   tso->why_blocked = NotBlocked;
3088   next = tso->link;
3089   tso->link = END_TSO_QUEUE;
3090
3091   // We might have just migrated this TSO to our Capability:
3092   if (tso->bound) {
3093       tso->bound->cap = cap;
3094   }
3095
3096   appendToRunQueue(cap,tso);
3097
3098   // we're holding a newly woken thread, make sure we context switch
3099   // quickly so we can migrate it if necessary.
3100   context_switch = 1;
3101   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3102   return next;
3103 }
3104
3105
3106 #if defined(GRAN)
3107 void 
3108 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3109 {
3110   StgBlockingQueueElement *bqe;
3111   PEs node_loc;
3112   nat len = 0; 
3113
3114   IF_GRAN_DEBUG(bq, 
3115                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3116                       node, CurrentProc, CurrentTime[CurrentProc], 
3117                       CurrentTSO->id, CurrentTSO));
3118
3119   node_loc = where_is(node);
3120
3121   ASSERT(q == END_BQ_QUEUE ||
3122          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3123          get_itbl(q)->type == CONSTR); // closure (type constructor)
3124   ASSERT(is_unique(node));
3125
3126   /* FAKE FETCH: magically copy the node to the tso's proc;
3127      no Fetch necessary because in reality the node should not have been 
3128      moved to the other PE in the first place
3129   */
3130   if (CurrentProc!=node_loc) {
3131     IF_GRAN_DEBUG(bq, 
3132                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3133                         node, node_loc, CurrentProc, CurrentTSO->id, 
3134                         // CurrentTSO, where_is(CurrentTSO),
3135                         node->header.gran.procs));
3136     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3137     IF_GRAN_DEBUG(bq, 
3138                   debugBelch("## new bitmask of node %p is %#x\n",
3139                         node, node->header.gran.procs));
3140     if (RtsFlags.GranFlags.GranSimStats.Global) {
3141       globalGranStats.tot_fake_fetches++;
3142     }
3143   }
3144
3145   bqe = q;
3146   // ToDo: check: ASSERT(CurrentProc==node_loc);
3147   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3148     //next = bqe->link;
3149     /* 
3150        bqe points to the current element in the queue
3151        next points to the next element in the queue
3152     */
3153     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3154     //tso_loc = where_is(tso);
3155     len++;
3156     bqe = unblockOne(bqe, node);
3157   }
3158
3159   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3160      the closure to make room for the anchor of the BQ */
3161   if (bqe!=END_BQ_QUEUE) {
3162     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3163     /*
3164     ASSERT((info_ptr==&RBH_Save_0_info) ||
3165            (info_ptr==&RBH_Save_1_info) ||
3166            (info_ptr==&RBH_Save_2_info));
3167     */
3168     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3169     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3170     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3171
3172     IF_GRAN_DEBUG(bq,
3173                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3174                         node, info_type(node)));
3175   }
3176
3177   /* statistics gathering */
3178   if (RtsFlags.GranFlags.GranSimStats.Global) {
3179     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3180     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3181     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3182     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3183   }
3184   IF_GRAN_DEBUG(bq,
3185                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3186                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3187 }
3188 #elif defined(PARALLEL_HASKELL)
3189 void 
3190 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3191 {
3192   StgBlockingQueueElement *bqe;
3193
3194   IF_PAR_DEBUG(verbose, 
3195                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3196                      node, mytid));
3197 #ifdef DIST  
3198   //RFP
3199   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3200     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3201     return;
3202   }
3203 #endif
3204   
3205   ASSERT(q == END_BQ_QUEUE ||
3206          get_itbl(q)->type == TSO ||           
3207          get_itbl(q)->type == BLOCKED_FETCH || 
3208          get_itbl(q)->type == CONSTR); 
3209
3210   bqe = q;
3211   while (get_itbl(bqe)->type==TSO || 
3212          get_itbl(bqe)->type==BLOCKED_FETCH) {
3213     bqe = unblockOne(bqe, node);
3214   }
3215 }
3216
3217 #else   /* !GRAN && !PARALLEL_HASKELL */
3218
3219 void
3220 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3221 {
3222     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3223                              // Exception.cmm
3224     while (tso != END_TSO_QUEUE) {
3225         tso = unblockOne(cap,tso);
3226     }
3227 }
3228 #endif
3229
3230 /* ---------------------------------------------------------------------------
3231    Interrupt execution
3232    - usually called inside a signal handler so it mustn't do anything fancy.   
3233    ------------------------------------------------------------------------ */
3234
3235 void
3236 interruptStgRts(void)
3237 {
3238     interrupted    = 1;
3239     context_switch = 1;
3240 #if defined(THREADED_RTS)
3241     prodAllCapabilities();
3242 #endif
3243 }
3244
3245 /* -----------------------------------------------------------------------------
3246    Unblock a thread
3247
3248    This is for use when we raise an exception in another thread, which
3249    may be blocked.
3250    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3251    -------------------------------------------------------------------------- */
3252
3253 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3254 /*
3255   NB: only the type of the blocking queue is different in GranSim and GUM
3256       the operations on the queue-elements are the same
3257       long live polymorphism!
3258
3259   Locks: sched_mutex is held upon entry and exit.
3260
3261 */
3262 static void
3263 unblockThread(Capability *cap, StgTSO *tso)
3264 {
3265   StgBlockingQueueElement *t, **last;
3266
3267   switch (tso->why_blocked) {
3268
3269   case NotBlocked:
3270     return;  /* not blocked */
3271
3272   case BlockedOnSTM:
3273     // Be careful: nothing to do here!  We tell the scheduler that the thread
3274     // is runnable and we leave it to the stack-walking code to abort the 
3275     // transaction while unwinding the stack.  We should perhaps have a debugging
3276     // test to make sure that this really happens and that the 'zombie' transaction
3277     // does not get committed.
3278     goto done;
3279
3280   case BlockedOnMVar:
3281     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3282     {
3283       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3284       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3285
3286       last = (StgBlockingQueueElement **)&mvar->head;
3287       for (t = (StgBlockingQueueElement *)mvar->head; 
3288            t != END_BQ_QUEUE; 
3289            last = &t->link, last_tso = t, t = t->link) {
3290         if (t == (StgBlockingQueueElement *)tso) {
3291           *last = (StgBlockingQueueElement *)tso->link;
3292           if (mvar->tail == tso) {
3293             mvar->tail = (StgTSO *)last_tso;
3294           }
3295           goto done;
3296         }
3297       }
3298       barf("unblockThread (MVAR): TSO not found");
3299     }
3300
3301   case BlockedOnBlackHole:
3302     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3303     {
3304       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3305
3306       last = &bq->blocking_queue;
3307       for (t = bq->blocking_queue; 
3308            t != END_BQ_QUEUE; 
3309            last = &t->link, t = t->link) {
3310         if (t == (StgBlockingQueueElement *)tso) {
3311           *last = (StgBlockingQueueElement *)tso->link;
3312           goto done;
3313         }
3314       }
3315       barf("unblockThread (BLACKHOLE): TSO not found");
3316     }
3317
3318   case BlockedOnException:
3319     {
3320       StgTSO *target  = tso->block_info.tso;
3321
3322       ASSERT(get_itbl(target)->type == TSO);
3323
3324       if (target->what_next == ThreadRelocated) {
3325           target = target->link;
3326           ASSERT(get_itbl(target)->type == TSO);
3327       }
3328
3329       ASSERT(target->blocked_exceptions != NULL);
3330
3331       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3332       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3333            t != END_BQ_QUEUE; 
3334            last = &t->link, t = t->link) {
3335         ASSERT(get_itbl(t)->type == TSO);
3336         if (t == (StgBlockingQueueElement *)tso) {
3337           *last = (StgBlockingQueueElement *)tso->link;
3338           goto done;
3339         }
3340       }
3341       barf("unblockThread (Exception): TSO not found");
3342     }
3343
3344   case BlockedOnRead:
3345   case BlockedOnWrite:
3346 #if defined(mingw32_HOST_OS)
3347   case BlockedOnDoProc:
3348 #endif
3349     {
3350       /* take TSO off blocked_queue */
3351       StgBlockingQueueElement *prev = NULL;
3352       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3353            prev = t, t = t->link) {
3354         if (t == (StgBlockingQueueElement *)tso) {
3355           if (prev == NULL) {
3356             blocked_queue_hd = (StgTSO *)t->link;
3357             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3358               blocked_queue_tl = END_TSO_QUEUE;
3359             }
3360           } else {
3361             prev->link = t->link;
3362             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3363               blocked_queue_tl = (StgTSO *)prev;
3364             }
3365           }
3366 #if defined(mingw32_HOST_OS)
3367           /* (Cooperatively) signal that the worker thread should abort
3368            * the request.
3369            */
3370           abandonWorkRequest(tso->block_info.async_result->reqID);
3371 #endif
3372           goto done;
3373         }
3374       }
3375       barf("unblockThread (I/O): TSO not found");
3376     }
3377
3378   case BlockedOnDelay:
3379     {
3380       /* take TSO off sleeping_queue */
3381       StgBlockingQueueElement *prev = NULL;
3382       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3383            prev = t, t = t->link) {
3384         if (t == (StgBlockingQueueElement *)tso) {
3385           if (prev == NULL) {
3386             sleeping_queue = (StgTSO *)t->link;
3387           } else {
3388             prev->link = t->link;
3389           }
3390           goto done;
3391         }
3392       }
3393       barf("unblockThread (delay): TSO not found");
3394     }
3395
3396   default:
3397     barf("unblockThread");
3398   }
3399
3400  done:
3401   tso->link = END_TSO_QUEUE;
3402   tso->why_blocked = NotBlocked;
3403   tso->block_info.closure = NULL;
3404   pushOnRunQueue(cap,tso);
3405 }
3406 #else
3407 static void
3408 unblockThread(Capability *cap, StgTSO *tso)
3409 {
3410   StgTSO *t, **last;
3411   
3412   /* To avoid locking unnecessarily. */
3413   if (tso->why_blocked == NotBlocked) {
3414     return;
3415   }
3416
3417   switch (tso->why_blocked) {
3418
3419   case BlockedOnSTM:
3420     // Be careful: nothing to do here!  We tell the scheduler that the thread
3421     // is runnable and we leave it to the stack-walking code to abort the 
3422     // transaction while unwinding the stack.  We should perhaps have a debugging
3423     // test to make sure that this really happens and that the 'zombie' transaction
3424     // does not get committed.
3425     goto done;
3426
3427   case BlockedOnMVar:
3428     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3429     {
3430       StgTSO *last_tso = END_TSO_QUEUE;
3431       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3432
3433       last = &mvar->head;
3434       for (t = mvar->head; t != END_TSO_QUEUE; 
3435            last = &t->link, last_tso = t, t = t->link) {
3436         if (t == tso) {
3437           *last = tso->link;
3438           if (mvar->tail == tso) {
3439             mvar->tail = last_tso;
3440           }
3441           goto done;
3442         }
3443       }
3444       barf("unblockThread (MVAR): TSO not found");
3445     }
3446
3447   case BlockedOnBlackHole:
3448     {
3449       last = &blackhole_queue;
3450       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3451            last = &t->link, t = t->link) {
3452         if (t == tso) {
3453           *last = tso->link;
3454           goto done;
3455         }
3456       }
3457       barf("unblockThread (BLACKHOLE): TSO not found");
3458     }
3459
3460   case BlockedOnException:
3461     {
3462       StgTSO *target  = tso->block_info.tso;
3463
3464       ASSERT(get_itbl(target)->type == TSO);
3465
3466       while (target->what_next == ThreadRelocated) {
3467           target = target->link;
3468           ASSERT(get_itbl(target)->type == TSO);
3469       }
3470       
3471       ASSERT(target->blocked_exceptions != NULL);
3472
3473       last = &target->blocked_exceptions;
3474       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3475            last = &t->link, t = t->link) {
3476         ASSERT(get_itbl(t)->type == TSO);
3477         if (t == tso) {
3478           *last = tso->link;
3479           goto done;
3480         }
3481       }
3482       barf("unblockThread (Exception): TSO not found");
3483     }
3484
3485 #if !defined(THREADED_RTS)
3486   case BlockedOnRead:
3487   case BlockedOnWrite:
3488 #if defined(mingw32_HOST_OS)
3489   case BlockedOnDoProc:
3490 #endif
3491     {
3492       StgTSO *prev = NULL;
3493       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3494            prev = t, t = t->link) {
3495         if (t == tso) {
3496           if (prev == NULL) {
3497             blocked_queue_hd = t->link;
3498             if (blocked_queue_tl == t) {
3499               blocked_queue_tl = END_TSO_QUEUE;
3500             }
3501           } else {
3502             prev->link = t->link;
3503             if (blocked_queue_tl == t) {
3504               blocked_queue_tl = prev;
3505             }
3506           }
3507 #if defined(mingw32_HOST_OS)
3508           /* (Cooperatively) signal that the worker thread should abort
3509            * the request.
3510            */
3511           abandonWorkRequest(tso->block_info.async_result->reqID);
3512 #endif
3513           goto done;
3514         }
3515       }
3516       barf("unblockThread (I/O): TSO not found");
3517     }
3518
3519   case BlockedOnDelay:
3520     {
3521       StgTSO *prev = NULL;
3522       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3523            prev = t, t = t->link) {
3524         if (t == tso) {
3525           if (prev == NULL) {
3526             sleeping_queue = t->link;
3527           } else {
3528             prev->link = t->link;
3529           }
3530           goto done;
3531         }
3532       }
3533       barf("unblockThread (delay): TSO not found");
3534     }
3535 #endif
3536
3537   default:
3538     barf("unblockThread");
3539   }
3540
3541  done:
3542   tso->link = END_TSO_QUEUE;
3543   tso->why_blocked = NotBlocked;
3544   tso->block_info.closure = NULL;
3545   appendToRunQueue(cap,tso);
3546 }
3547 #endif
3548
3549 /* -----------------------------------------------------------------------------
3550  * checkBlackHoles()
3551  *
3552  * Check the blackhole_queue for threads that can be woken up.  We do
3553  * this periodically: before every GC, and whenever the run queue is
3554  * empty.
3555  *
3556  * An elegant solution might be to just wake up all the blocked
3557  * threads with awakenBlockedQueue occasionally: they'll go back to
3558  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3559  * doesn't give us a way to tell whether we've actually managed to
3560  * wake up any threads, so we would be busy-waiting.
3561  *
3562  * -------------------------------------------------------------------------- */
3563
3564 static rtsBool
3565 checkBlackHoles (Capability *cap)
3566 {
3567     StgTSO **prev, *t;
3568     rtsBool any_woke_up = rtsFalse;
3569     StgHalfWord type;
3570
3571     // blackhole_queue is global:
3572     ASSERT_LOCK_HELD(&sched_mutex);
3573
3574     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3575
3576     // ASSUMES: sched_mutex
3577     prev = &blackhole_queue;
3578     t = blackhole_queue;
3579     while (t != END_TSO_QUEUE) {
3580         ASSERT(t->why_blocked == BlockedOnBlackHole);
3581         type = get_itbl(t->block_info.closure)->type;
3582         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3583             IF_DEBUG(sanity,checkTSO(t));
3584             t = unblockOne(cap, t);
3585             // urk, the threads migrate to the current capability
3586             // here, but we'd like to keep them on the original one.
3587             *prev = t;
3588             any_woke_up = rtsTrue;
3589         } else {
3590             prev = &t->link;
3591             t = t->link;
3592         }
3593     }
3594
3595     return any_woke_up;
3596 }
3597
3598 /* -----------------------------------------------------------------------------
3599  * raiseAsync()
3600  *
3601  * The following function implements the magic for raising an
3602  * asynchronous exception in an existing thread.
3603  *
3604  * We first remove the thread from any queue on which it might be
3605  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3606  *
3607  * We strip the stack down to the innermost CATCH_FRAME, building
3608  * thunks in the heap for all the active computations, so they can 
3609  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3610  * an application of the handler to the exception, and push it on
3611  * the top of the stack.
3612  * 
3613  * How exactly do we save all the active computations?  We create an
3614  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3615  * AP_STACKs pushes everything from the corresponding update frame
3616  * upwards onto the stack.  (Actually, it pushes everything up to the
3617  * next update frame plus a pointer to the next AP_STACK object.
3618  * Entering the next AP_STACK object pushes more onto the stack until we
3619  * reach the last AP_STACK object - at which point the stack should look
3620  * exactly as it did when we killed the TSO and we can continue
3621  * execution by entering the closure on top of the stack.
3622  *
3623  * We can also kill a thread entirely - this happens if either (a) the 
3624  * exception passed to raiseAsync is NULL, or (b) there's no
3625  * CATCH_FRAME on the stack.  In either case, we strip the entire
3626  * stack and replace the thread with a zombie.
3627  *
3628  * ToDo: in SMP mode, this function is only safe if either (a) we hold
3629  * all the Capabilities (eg. in GC), or (b) we own the Capability that
3630  * the TSO is currently blocked on or on the run queue of.
3631  *
3632  * -------------------------------------------------------------------------- */
3633  
3634 void
3635 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3636 {
3637     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3638 }
3639
3640 void
3641 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3642 {
3643     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3644 }
3645
3646 static void
3647 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3648             rtsBool stop_at_atomically, StgPtr stop_here)
3649 {
3650     StgRetInfoTable *info;
3651     StgPtr sp, frame;
3652     nat i;
3653   
3654     // Thread already dead?
3655     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3656         return;
3657     }
3658
3659     IF_DEBUG(scheduler, 
3660              sched_belch("raising exception in thread %ld.", (long)tso->id));
3661     
3662     // Remove it from any blocking queues
3663     unblockThread(cap,tso);
3664
3665     sp = tso->sp;
3666     
3667     // The stack freezing code assumes there's a closure pointer on
3668     // the top of the stack, so we have to arrange that this is the case...
3669     //
3670     if (sp[0] == (W_)&stg_enter_info) {
3671         sp++;
3672     } else {
3673         sp--;
3674         sp[0] = (W_)&stg_dummy_ret_closure;
3675     }
3676
3677     frame = sp + 1;
3678     while (stop_here == NULL || frame < stop_here) {
3679
3680         // 1. Let the top of the stack be the "current closure"
3681         //
3682         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3683         // CATCH_FRAME.
3684         //
3685         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3686         // current closure applied to the chunk of stack up to (but not
3687         // including) the update frame.  This closure becomes the "current
3688         // closure".  Go back to step 2.
3689         //
3690         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3691         // top of the stack applied to the exception.
3692         // 
3693         // 5. If it's a STOP_FRAME, then kill the thread.
3694         // 
3695         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3696         // transaction
3697        
3698         info = get_ret_itbl((StgClosure *)frame);
3699
3700         switch (info->i.type) {
3701
3702         case UPDATE_FRAME:
3703         {
3704             StgAP_STACK * ap;
3705             nat words;
3706             
3707             // First build an AP_STACK consisting of the stack chunk above the
3708             // current update frame, with the top word on the stack as the
3709             // fun field.
3710             //
3711             words = frame - sp - 1;
3712             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3713             
3714             ap->size = words;
3715             ap->fun  = (StgClosure *)sp[0];
3716             sp++;
3717             for(i=0; i < (nat)words; ++i) {
3718                 ap->payload[i] = (StgClosure *)*sp++;
3719             }
3720             
3721             SET_HDR(ap,&stg_AP_STACK_info,
3722                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3723             TICK_ALLOC_UP_THK(words+1,0);
3724             
3725             IF_DEBUG(scheduler,
3726                      debugBelch("sched: Updating ");
3727                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3728                      debugBelch(" with ");
3729                      printObj((StgClosure *)ap);
3730                 );
3731
3732             // Replace the updatee with an indirection
3733             //
3734             // Warning: if we're in a loop, more than one update frame on
3735             // the stack may point to the same object.  Be careful not to
3736             // overwrite an IND_OLDGEN in this case, because we'll screw
3737             // up the mutable lists.  To be on the safe side, don't
3738             // overwrite any kind of indirection at all.  See also
3739             // threadSqueezeStack in GC.c, where we have to make a similar
3740             // check.
3741             //
3742             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3743                 // revert the black hole
3744                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3745                                (StgClosure *)ap);
3746             }
3747             sp += sizeofW(StgUpdateFrame) - 1;
3748             sp[0] = (W_)ap; // push onto stack
3749             frame = sp + 1;
3750             continue; //no need to bump frame
3751         }
3752
3753         case STOP_FRAME:
3754             // We've stripped the entire stack, the thread is now dead.
3755             tso->what_next = ThreadKilled;
3756             tso->sp = frame + sizeofW(StgStopFrame);
3757             return;
3758
3759         case CATCH_FRAME:
3760             // If we find a CATCH_FRAME, and we've got an exception to raise,
3761             // then build the THUNK raise(exception), and leave it on
3762             // top of the CATCH_FRAME ready to enter.
3763             //
3764         {
3765 #ifdef PROFILING
3766             StgCatchFrame *cf = (StgCatchFrame *)frame;
3767 #endif
3768             StgThunk *raise;
3769             
3770             if (exception == NULL) break;
3771
3772             // we've got an exception to raise, so let's pass it to the
3773             // handler in this frame.
3774             //
3775             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3776             TICK_ALLOC_SE_THK(1,0);
3777             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3778             raise->payload[0] = exception;
3779             
3780             // throw away the stack from Sp up to the CATCH_FRAME.
3781             //
3782             sp = frame - 1;
3783             
3784             /* Ensure that async excpetions are blocked now, so we don't get
3785              * a surprise exception before we get around to executing the
3786              * handler.
3787              */
3788             if (tso->blocked_exceptions == NULL) {
3789                 tso->blocked_exceptions = END_TSO_QUEUE;
3790             }
3791
3792             /* Put the newly-built THUNK on top of the stack, ready to execute
3793              * when the thread restarts.
3794              */
3795             sp[0] = (W_)raise;
3796             sp[-1] = (W_)&stg_enter_info;
3797             tso->sp = sp-1;
3798             tso->what_next = ThreadRunGHC;
3799             IF_DEBUG(sanity, checkTSO(tso));
3800             return;
3801         }
3802             
3803         case ATOMICALLY_FRAME:
3804             if (stop_at_atomically) {
3805                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3806                 stmCondemnTransaction(cap, tso -> trec);
3807 #ifdef REG_R1
3808                 tso->sp = frame;
3809 #else
3810                 // R1 is not a register: the return convention for IO in
3811                 // this case puts the return value on the stack, so we
3812                 // need to set up the stack to return to the atomically
3813                 // frame properly...
3814                 tso->sp = frame - 2;
3815                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3816                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3817 #endif
3818                 tso->what_next = ThreadRunGHC;
3819                 return;
3820             }
3821             // Not stop_at_atomically... fall through and abort the
3822             // transaction.
3823             
3824         case CATCH_RETRY_FRAME:
3825             // IF we find an ATOMICALLY_FRAME then we abort the
3826             // current transaction and propagate the exception.  In
3827             // this case (unlike ordinary exceptions) we do not care
3828             // whether the transaction is valid or not because its
3829             // possible validity cannot have caused the exception
3830             // and will not be visible after the abort.
3831             IF_DEBUG(stm,
3832                      debugBelch("Found atomically block delivering async exception\n"));
3833             StgTRecHeader *trec = tso -> trec;
3834             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3835             stmAbortTransaction(cap, trec);
3836             tso -> trec = outer;
3837             break;
3838             
3839         default:
3840             break;
3841         }
3842
3843         // move on to the next stack frame
3844         frame += stack_frame_sizeW((StgClosure *)frame);
3845     }
3846
3847     // if we got here, then we stopped at stop_here
3848     ASSERT(stop_here != NULL);
3849 }
3850
3851 /* -----------------------------------------------------------------------------
3852    Deleting threads
3853
3854    This is used for interruption (^C) and forking, and corresponds to
3855    raising an exception but without letting the thread catch the
3856    exception.
3857    -------------------------------------------------------------------------- */
3858
3859 static void 
3860 deleteThread (Capability *cap, StgTSO *tso)
3861 {
3862   if (tso->why_blocked != BlockedOnCCall &&
3863       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3864       raiseAsync(cap,tso,NULL);
3865   }
3866 }
3867
3868 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3869 static void 
3870 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3871 { // for forkProcess only:
3872   // delete thread without giving it a chance to catch the KillThread exception
3873
3874   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3875       return;
3876   }
3877
3878   if (tso->why_blocked != BlockedOnCCall &&
3879       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3880       unblockThread(cap,tso);
3881   }
3882
3883   tso->what_next = ThreadKilled;
3884 }
3885 #endif
3886
3887 /* -----------------------------------------------------------------------------
3888    raiseExceptionHelper
3889    
3890    This function is called by the raise# primitve, just so that we can
3891    move some of the tricky bits of raising an exception from C-- into
3892    C.  Who knows, it might be a useful re-useable thing here too.
3893    -------------------------------------------------------------------------- */
3894
3895 StgWord
3896 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3897 {
3898     Capability *cap = regTableToCapability(reg);
3899     StgThunk *raise_closure = NULL;
3900     StgPtr p, next;
3901     StgRetInfoTable *info;
3902     //
3903     // This closure represents the expression 'raise# E' where E
3904     // is the exception raise.  It is used to overwrite all the
3905     // thunks which are currently under evaluataion.
3906     //
3907
3908     //    
3909     // LDV profiling: stg_raise_info has THUNK as its closure
3910     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3911     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3912     // 1 does not cause any problem unless profiling is performed.
3913     // However, when LDV profiling goes on, we need to linearly scan
3914     // small object pool, where raise_closure is stored, so we should
3915     // use MIN_UPD_SIZE.
3916     //
3917     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3918     //                                 sizeofW(StgClosure)+1);
3919     //
3920
3921     //
3922     // Walk up the stack, looking for the catch frame.  On the way,
3923     // we update any closures pointed to from update frames with the
3924     // raise closure that we just built.
3925     //
3926     p = tso->sp;
3927     while(1) {
3928         info = get_ret_itbl((StgClosure *)p);
3929         next = p + stack_frame_sizeW((StgClosure *)p);
3930         switch (info->i.type) {
3931             
3932         case UPDATE_FRAME:
3933             // Only create raise_closure if we need to.
3934             if (raise_closure == NULL) {
3935                 raise_closure = 
3936                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3937                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3938                 raise_closure->payload[0] = exception;
3939             }
3940             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3941             p = next;
3942             continue;
3943
3944         case ATOMICALLY_FRAME:
3945             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3946             tso->sp = p;
3947             return ATOMICALLY_FRAME;
3948             
3949         case CATCH_FRAME:
3950             tso->sp = p;
3951             return CATCH_FRAME;
3952
3953         case CATCH_STM_FRAME:
3954             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3955             tso->sp = p;
3956             return CATCH_STM_FRAME;
3957             
3958         case STOP_FRAME:
3959             tso->sp = p;
3960             return STOP_FRAME;
3961
3962         case CATCH_RETRY_FRAME:
3963         default:
3964             p = next; 
3965             continue;
3966         }
3967     }
3968 }
3969
3970
3971 /* -----------------------------------------------------------------------------
3972    findRetryFrameHelper
3973
3974    This function is called by the retry# primitive.  It traverses the stack
3975    leaving tso->sp referring to the frame which should handle the retry.  
3976
3977    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3978    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3979
3980    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3981    despite the similar implementation.
3982
3983    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3984    not be created within memory transactions.
3985    -------------------------------------------------------------------------- */
3986
3987 StgWord
3988 findRetryFrameHelper (StgTSO *tso)
3989 {
3990   StgPtr           p, next;
3991   StgRetInfoTable *info;
3992
3993   p = tso -> sp;
3994   while (1) {
3995     info = get_ret_itbl((StgClosure *)p);
3996     next = p + stack_frame_sizeW((StgClosure *)p);
3997     switch (info->i.type) {
3998       
3999     case ATOMICALLY_FRAME:
4000       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4001       tso->sp = p;
4002       return ATOMICALLY_FRAME;
4003       
4004     case CATCH_RETRY_FRAME:
4005       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4006       tso->sp = p;
4007       return CATCH_RETRY_FRAME;
4008       
4009     case CATCH_STM_FRAME:
4010     default:
4011       ASSERT(info->i.type != CATCH_FRAME);
4012       ASSERT(info->i.type != STOP_FRAME);
4013       p = next; 
4014       continue;
4015     }
4016   }
4017 }
4018
4019 /* -----------------------------------------------------------------------------
4020    resurrectThreads is called after garbage collection on the list of
4021    threads found to be garbage.  Each of these threads will be woken
4022    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4023    on an MVar, or NonTermination if the thread was blocked on a Black
4024    Hole.
4025
4026    Locks: assumes we hold *all* the capabilities.
4027    -------------------------------------------------------------------------- */
4028
4029 void
4030 resurrectThreads (StgTSO *threads)
4031 {
4032     StgTSO *tso, *next;
4033     Capability *cap;
4034
4035     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4036         next = tso->global_link;
4037         tso->global_link = all_threads;
4038         all_threads = tso;
4039         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4040         
4041         // Wake up the thread on the Capability it was last on for a
4042         // bound thread, or last_free_capability otherwise.
4043         if (tso->bound) {
4044             cap = tso->bound->cap;
4045         } else {
4046             cap = last_free_capability;
4047         }
4048         
4049         switch (tso->why_blocked) {
4050         case BlockedOnMVar:
4051         case BlockedOnException:
4052             /* Called by GC - sched_mutex lock is currently held. */
4053             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4054             break;
4055         case BlockedOnBlackHole:
4056             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4057             break;
4058         case BlockedOnSTM:
4059             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4060             break;
4061         case NotBlocked:
4062             /* This might happen if the thread was blocked on a black hole
4063              * belonging to a thread that we've just woken up (raiseAsync
4064              * can wake up threads, remember...).
4065              */
4066             continue;
4067         default:
4068             barf("resurrectThreads: thread blocked in a strange way");
4069         }
4070     }
4071 }
4072
4073 /* ----------------------------------------------------------------------------
4074  * Debugging: why is a thread blocked
4075  * [Also provides useful information when debugging threaded programs
4076  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4077    ------------------------------------------------------------------------- */
4078
4079 #if DEBUG
4080 static void
4081 printThreadBlockage(StgTSO *tso)
4082 {
4083   switch (tso->why_blocked) {
4084   case BlockedOnRead:
4085     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4086     break;
4087   case BlockedOnWrite:
4088     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4089     break;
4090 #if defined(mingw32_HOST_OS)
4091     case BlockedOnDoProc:
4092     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4093     break;
4094 #endif
4095   case BlockedOnDelay:
4096     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4097     break;
4098   case BlockedOnMVar:
4099     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4100     break;
4101   case BlockedOnException:
4102     debugBelch("is blocked on delivering an exception to thread %d",
4103             tso->block_info.tso->id);
4104     break;
4105   case BlockedOnBlackHole:
4106     debugBelch("is blocked on a black hole");
4107     break;
4108   case NotBlocked:
4109     debugBelch("is not blocked");
4110     break;
4111 #if defined(PARALLEL_HASKELL)
4112   case BlockedOnGA:
4113     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4114             tso->block_info.closure, info_type(tso->block_info.closure));
4115     break;
4116   case BlockedOnGA_NoSend:
4117     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4118             tso->block_info.closure, info_type(tso->block_info.closure));
4119     break;
4120 #endif
4121   case BlockedOnCCall:
4122     debugBelch("is blocked on an external call");
4123     break;
4124   case BlockedOnCCall_NoUnblockExc:
4125     debugBelch("is blocked on an external call (exceptions were already blocked)");
4126     break;
4127   case BlockedOnSTM:
4128     debugBelch("is blocked on an STM operation");
4129     break;
4130   default:
4131     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4132          tso->why_blocked, tso->id, tso);
4133   }
4134 }
4135
4136 void
4137 printThreadStatus(StgTSO *t)
4138 {
4139     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4140     {
4141       void *label = lookupThreadLabel(t->id);
4142       if (label) debugBelch("[\"%s\"] ",(char *)label);
4143     }
4144     if (t->what_next == ThreadRelocated) {
4145         debugBelch("has been relocated...\n");
4146     } else {
4147         switch (t->what_next) {
4148         case ThreadKilled:
4149             debugBelch("has been killed");
4150             break;
4151         case ThreadComplete:
4152             debugBelch("has completed");
4153             break;
4154         default:
4155             printThreadBlockage(t);
4156         }
4157         debugBelch("\n");
4158     }
4159 }
4160
4161 void
4162 printAllThreads(void)
4163 {
4164   StgTSO *t, *next;
4165   nat i;
4166   Capability *cap;
4167
4168 # if defined(GRAN)
4169   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4170   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4171                        time_string, rtsFalse/*no commas!*/);
4172
4173   debugBelch("all threads at [%s]:\n", time_string);
4174 # elif defined(PARALLEL_HASKELL)
4175   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4176   ullong_format_string(CURRENT_TIME,
4177                        time_string, rtsFalse/*no commas!*/);
4178
4179   debugBelch("all threads at [%s]:\n", time_string);
4180 # else
4181   debugBelch("all threads:\n");
4182 # endif
4183
4184   for (i = 0; i < n_capabilities; i++) {
4185       cap = &capabilities[i];
4186       debugBelch("threads on capability %d:\n", cap->no);
4187       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4188           printThreadStatus(t);
4189       }
4190   }
4191
4192   debugBelch("other threads:\n");
4193   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4194       if (t->why_blocked != NotBlocked) {
4195           printThreadStatus(t);
4196       }
4197       if (t->what_next == ThreadRelocated) {
4198           next = t->link;
4199       } else {
4200           next = t->global_link;
4201       }
4202   }
4203 }
4204
4205 // useful from gdb
4206 void 
4207 printThreadQueue(StgTSO *t)
4208 {
4209     nat i = 0;
4210     for (; t != END_TSO_QUEUE; t = t->link) {
4211         printThreadStatus(t);
4212         i++;
4213     }
4214     debugBelch("%d threads on queue\n", i);
4215 }
4216
4217 /* 
4218    Print a whole blocking queue attached to node (debugging only).
4219 */
4220 # if defined(PARALLEL_HASKELL)
4221 void 
4222 print_bq (StgClosure *node)
4223 {
4224   StgBlockingQueueElement *bqe;
4225   StgTSO *tso;
4226   rtsBool end;
4227
4228   debugBelch("## BQ of closure %p (%s): ",
4229           node, info_type(node));
4230
4231   /* should cover all closures that may have a blocking queue */
4232   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4233          get_itbl(node)->type == FETCH_ME_BQ ||
4234          get_itbl(node)->type == RBH ||
4235          get_itbl(node)->type == MVAR);
4236     
4237   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4238
4239   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4240 }
4241
4242 /* 
4243    Print a whole blocking queue starting with the element bqe.
4244 */
4245 void 
4246 print_bqe (StgBlockingQueueElement *bqe)
4247 {
4248   rtsBool end;
4249
4250   /* 
4251      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4252   */
4253   for (end = (bqe==END_BQ_QUEUE);
4254        !end; // iterate until bqe points to a CONSTR
4255        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4256        bqe = end ? END_BQ_QUEUE : bqe->link) {
4257     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4258     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4259     /* types of closures that may appear in a blocking queue */
4260     ASSERT(get_itbl(bqe)->type == TSO ||           
4261            get_itbl(bqe)->type == BLOCKED_FETCH || 
4262            get_itbl(bqe)->type == CONSTR); 
4263     /* only BQs of an RBH end with an RBH_Save closure */
4264     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4265
4266     switch (get_itbl(bqe)->type) {
4267     case TSO:
4268       debugBelch(" TSO %u (%x),",
4269               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4270       break;
4271     case BLOCKED_FETCH:
4272       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4273               ((StgBlockedFetch *)bqe)->node, 
4274               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4275               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4276               ((StgBlockedFetch *)bqe)->ga.weight);
4277       break;
4278     case CONSTR:
4279       debugBelch(" %s (IP %p),",
4280               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4281                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4282                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4283                "RBH_Save_?"), get_itbl(bqe));
4284       break;
4285     default:
4286       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4287            info_type((StgClosure *)bqe)); // , node, info_type(node));
4288       break;
4289     }
4290   } /* for */
4291   debugBelch("\n");
4292 }
4293 # elif defined(GRAN)
4294 void 
4295 print_bq (StgClosure *node)
4296 {
4297   StgBlockingQueueElement *bqe;
4298   PEs node_loc, tso_loc;
4299   rtsBool end;
4300
4301   /* should cover all closures that may have a blocking queue */
4302   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4303          get_itbl(node)->type == FETCH_ME_BQ ||
4304          get_itbl(node)->type == RBH);
4305     
4306   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4307   node_loc = where_is(node);
4308
4309   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4310           node, info_type(node), node_loc);
4311
4312   /* 
4313      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4314   */
4315   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4316        !end; // iterate until bqe points to a CONSTR
4317        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4318     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4319     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4320     /* types of closures that may appear in a blocking queue */
4321     ASSERT(get_itbl(bqe)->type == TSO ||           
4322            get_itbl(bqe)->type == CONSTR); 
4323     /* only BQs of an RBH end with an RBH_Save closure */
4324     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4325
4326     tso_loc = where_is((StgClosure *)bqe);
4327     switch (get_itbl(bqe)->type) {
4328     case TSO:
4329       debugBelch(" TSO %d (%p) on [PE %d],",
4330               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4331       break;
4332     case CONSTR:
4333       debugBelch(" %s (IP %p),",
4334               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4335                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4336                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4337                "RBH_Save_?"), get_itbl(bqe));
4338       break;
4339     default:
4340       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4341            info_type((StgClosure *)bqe), node, info_type(node));
4342       break;
4343     }
4344   } /* for */
4345   debugBelch("\n");
4346 }
4347 # endif
4348
4349 #if defined(PARALLEL_HASKELL)
4350 static nat
4351 run_queue_len(void)
4352 {
4353     nat i;
4354     StgTSO *tso;
4355     
4356     for (i=0, tso=run_queue_hd; 
4357          tso != END_TSO_QUEUE;
4358          i++, tso=tso->link) {
4359         /* nothing */
4360     }
4361         
4362     return i;
4363 }
4364 #endif
4365
4366 void
4367 sched_belch(char *s, ...)
4368 {
4369     va_list ap;
4370     va_start(ap,s);
4371 #ifdef THREADED_RTS
4372     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4373 #elif defined(PARALLEL_HASKELL)
4374     debugBelch("== ");
4375 #else
4376     debugBelch("sched: ");
4377 #endif
4378     vdebugBelch(s, ap);
4379     debugBelch("\n");
4380     va_end(ap);
4381 }
4382
4383 #endif /* DEBUG */