[project @ 2005-11-03 16:34:45 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 #ifdef THREADED_RTS
76 #define USED_WHEN_THREADED_RTS
77 #define USED_WHEN_NON_THREADED_RTS STG_UNUSED
78 #else
79 #define USED_WHEN_THREADED_RTS     STG_UNUSED
80 #define USED_WHEN_NON_THREADED_RTS
81 #endif
82
83 #ifdef SMP
84 #define USED_WHEN_SMP
85 #else
86 #define USED_WHEN_SMP STG_UNUSED
87 #endif
88
89 /* -----------------------------------------------------------------------------
90  * Global variables
91  * -------------------------------------------------------------------------- */
92
93 #if defined(GRAN)
94
95 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
96 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
97
98 /* 
99    In GranSim we have a runnable and a blocked queue for each processor.
100    In order to minimise code changes new arrays run_queue_hds/tls
101    are created. run_queue_hd is then a short cut (macro) for
102    run_queue_hds[CurrentProc] (see GranSim.h).
103    -- HWL
104 */
105 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
106 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
107 StgTSO *ccalling_threadss[MAX_PROC];
108 /* We use the same global list of threads (all_threads) in GranSim as in
109    the std RTS (i.e. we are cheating). However, we don't use this list in
110    the GranSim specific code at the moment (so we are only potentially
111    cheating).  */
112
113 #else /* !GRAN */
114
115 #if !defined(THREADED_RTS)
116 // Blocked/sleeping thrads
117 StgTSO *blocked_queue_hd = NULL;
118 StgTSO *blocked_queue_tl = NULL;
119 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
120 #endif
121
122 /* Threads blocked on blackholes.
123  * LOCK: sched_mutex+capability, or all capabilities
124  */
125 StgTSO *blackhole_queue = NULL;
126 #endif
127
128 /* The blackhole_queue should be checked for threads to wake up.  See
129  * Schedule.h for more thorough comment.
130  * LOCK: none (doesn't matter if we miss an update)
131  */
132 rtsBool blackholes_need_checking = rtsFalse;
133
134 /* Linked list of all threads.
135  * Used for detecting garbage collected threads.
136  * LOCK: sched_mutex+capability, or all capabilities
137  */
138 StgTSO *all_threads = NULL;
139
140 /* flag set by signal handler to precipitate a context switch
141  * LOCK: none (just an advisory flag)
142  */
143 int context_switch = 0;
144
145 /* flag that tracks whether we have done any execution in this time slice.
146  * LOCK: currently none, perhaps we should lock (but needs to be
147  * updated in the fast path of the scheduler).
148  */
149 nat recent_activity = ACTIVITY_YES;
150
151 /* if this flag is set as well, give up execution
152  * LOCK: none (changes once, from false->true)
153  */
154 rtsBool interrupted = rtsFalse;
155
156 /* Next thread ID to allocate.
157  * LOCK: sched_mutex
158  */
159 static StgThreadID next_thread_id = 1;
160
161 /* The smallest stack size that makes any sense is:
162  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
163  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
164  *  + 1                       (the closure to enter)
165  *  + 1                       (stg_ap_v_ret)
166  *  + 1                       (spare slot req'd by stg_ap_v_ret)
167  *
168  * A thread with this stack will bomb immediately with a stack
169  * overflow, which will increase its stack size.  
170  */
171 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
172
173 #if defined(GRAN)
174 StgTSO *CurrentTSO;
175 #endif
176
177 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
178  *  exists - earlier gccs apparently didn't.
179  *  -= chak
180  */
181 StgTSO dummy_tso;
182
183 /*
184  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
185  * in an MT setting, needed to signal that a worker thread shouldn't hang around
186  * in the scheduler when it is out of work.
187  */
188 rtsBool shutting_down_scheduler = rtsFalse;
189
190 /*
191  * This mutex protects most of the global scheduler data in
192  * the THREADED_RTS and (inc. SMP) runtime.
193  */
194 #if defined(THREADED_RTS)
195 Mutex sched_mutex = INIT_MUTEX_VAR;
196 #endif
197
198 #if defined(PARALLEL_HASKELL)
199 StgTSO *LastTSO;
200 rtsTime TimeOfLastYield;
201 rtsBool emitSchedule = rtsTrue;
202 #endif
203
204 /* -----------------------------------------------------------------------------
205  * static function prototypes
206  * -------------------------------------------------------------------------- */
207
208 static Capability *schedule (Capability *initialCapability, Task *task);
209
210 //
211 // These function all encapsulate parts of the scheduler loop, and are
212 // abstracted only to make the structure and control flow of the
213 // scheduler clearer.
214 //
215 static void schedulePreLoop (void);
216 #if defined(SMP)
217 static void schedulePushWork(Capability *cap, Task *task);
218 #endif
219 static void scheduleStartSignalHandlers (Capability *cap);
220 static void scheduleCheckBlockedThreads (Capability *cap);
221 static void scheduleCheckBlackHoles (Capability *cap);
222 static void scheduleDetectDeadlock (Capability *cap, Task *task);
223 #if defined(GRAN)
224 static StgTSO *scheduleProcessEvent(rtsEvent *event);
225 #endif
226 #if defined(PARALLEL_HASKELL)
227 static StgTSO *scheduleSendPendingMessages(void);
228 static void scheduleActivateSpark(void);
229 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
230 #endif
231 #if defined(PAR) || defined(GRAN)
232 static void scheduleGranParReport(void);
233 #endif
234 static void schedulePostRunThread(void);
235 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
236 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
237                                          StgTSO *t);
238 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
239                                     nat prev_what_next );
240 static void scheduleHandleThreadBlocked( StgTSO *t );
241 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
242                                              StgTSO *t );
243 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
244 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
245
246 static void unblockThread(Capability *cap, StgTSO *tso);
247 static rtsBool checkBlackHoles(Capability *cap);
248 static void AllRoots(evac_fn evac);
249
250 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
251
252 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
253                         rtsBool stop_at_atomically);
254
255 static void deleteThread (Capability *cap, StgTSO *tso);
256 static void deleteRunQueue (Capability *cap);
257
258 #ifdef DEBUG
259 static void printThreadBlockage(StgTSO *tso);
260 static void printThreadStatus(StgTSO *tso);
261 void printThreadQueue(StgTSO *tso);
262 #endif
263
264 #if defined(PARALLEL_HASKELL)
265 StgTSO * createSparkThread(rtsSpark spark);
266 StgTSO * activateSpark (rtsSpark spark);  
267 #endif
268
269 #ifdef DEBUG
270 static char *whatNext_strs[] = {
271   "(unknown)",
272   "ThreadRunGHC",
273   "ThreadInterpret",
274   "ThreadKilled",
275   "ThreadRelocated",
276   "ThreadComplete"
277 };
278 #endif
279
280 /* -----------------------------------------------------------------------------
281  * Putting a thread on the run queue: different scheduling policies
282  * -------------------------------------------------------------------------- */
283
284 STATIC_INLINE void
285 addToRunQueue( Capability *cap, StgTSO *t )
286 {
287 #if defined(PARALLEL_HASKELL)
288     if (RtsFlags.ParFlags.doFairScheduling) { 
289         // this does round-robin scheduling; good for concurrency
290         appendToRunQueue(cap,t);
291     } else {
292         // this does unfair scheduling; good for parallelism
293         pushOnRunQueue(cap,t);
294     }
295 #else
296     // this does round-robin scheduling; good for concurrency
297     appendToRunQueue(cap,t);
298 #endif
299 }
300
301 /* ---------------------------------------------------------------------------
302    Main scheduling loop.
303
304    We use round-robin scheduling, each thread returning to the
305    scheduler loop when one of these conditions is detected:
306
307       * out of heap space
308       * timer expires (thread yields)
309       * thread blocks
310       * thread ends
311       * stack overflow
312
313    GRAN version:
314      In a GranSim setup this loop iterates over the global event queue.
315      This revolves around the global event queue, which determines what 
316      to do next. Therefore, it's more complicated than either the 
317      concurrent or the parallel (GUM) setup.
318
319    GUM version:
320      GUM iterates over incoming messages.
321      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
322      and sends out a fish whenever it has nothing to do; in-between
323      doing the actual reductions (shared code below) it processes the
324      incoming messages and deals with delayed operations 
325      (see PendingFetches).
326      This is not the ugliest code you could imagine, but it's bloody close.
327
328    ------------------------------------------------------------------------ */
329
330 static Capability *
331 schedule (Capability *initialCapability, Task *task)
332 {
333   StgTSO *t;
334   Capability *cap;
335   StgThreadReturnCode ret;
336 #if defined(GRAN)
337   rtsEvent *event;
338 #elif defined(PARALLEL_HASKELL)
339   StgTSO *tso;
340   GlobalTaskId pe;
341   rtsBool receivedFinish = rtsFalse;
342 # if defined(DEBUG)
343   nat tp_size, sp_size; // stats only
344 # endif
345 #endif
346   nat prev_what_next;
347   rtsBool ready_to_gc;
348 #if defined(THREADED_RTS)
349   rtsBool first = rtsTrue;
350 #endif
351   
352   cap = initialCapability;
353
354   // Pre-condition: this task owns initialCapability.
355   // The sched_mutex is *NOT* held
356   // NB. on return, we still hold a capability.
357
358   IF_DEBUG(scheduler,
359            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
360                        task, initialCapability);
361       );
362
363   schedulePreLoop();
364
365   // -----------------------------------------------------------
366   // Scheduler loop starts here:
367
368 #if defined(PARALLEL_HASKELL)
369 #define TERMINATION_CONDITION        (!receivedFinish)
370 #elif defined(GRAN)
371 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
372 #else
373 #define TERMINATION_CONDITION        rtsTrue
374 #endif
375
376   while (TERMINATION_CONDITION) {
377
378 #if defined(GRAN)
379       /* Choose the processor with the next event */
380       CurrentProc = event->proc;
381       CurrentTSO = event->tso;
382 #endif
383
384 #if defined(THREADED_RTS)
385       if (first) {
386           // don't yield the first time, we want a chance to run this
387           // thread for a bit, even if there are others banging at the
388           // door.
389           first = rtsFalse;
390           ASSERT_CAPABILITY_INVARIANTS(cap,task);
391       } else {
392           // Yield the capability to higher-priority tasks if necessary.
393           yieldCapability(&cap, task);
394       }
395 #endif
396       
397 #ifdef SMP
398       schedulePushWork(cap,task);
399 #endif          
400
401     // Check whether we have re-entered the RTS from Haskell without
402     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
403     // call).
404     if (cap->in_haskell) {
405           errorBelch("schedule: re-entered unsafely.\n"
406                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
407           stg_exit(EXIT_FAILURE);
408     }
409
410     //
411     // Test for interruption.  If interrupted==rtsTrue, then either
412     // we received a keyboard interrupt (^C), or the scheduler is
413     // trying to shut down all the tasks (shutting_down_scheduler) in
414     // the threaded RTS.
415     //
416     if (interrupted) {
417         deleteRunQueue(cap);
418         if (shutting_down_scheduler) {
419             IF_DEBUG(scheduler, sched_belch("shutting down"));
420             // If we are a worker, just exit.  If we're a bound thread
421             // then we will exit below when we've removed our TSO from
422             // the run queue.
423             if (task->tso == NULL && emptyRunQueue(cap)) {
424                 return cap;
425             }
426         } else {
427             IF_DEBUG(scheduler, sched_belch("interrupted"));
428         }
429     }
430
431 #if defined(not_yet) && defined(SMP)
432     //
433     // Top up the run queue from our spark pool.  We try to make the
434     // number of threads in the run queue equal to the number of
435     // free capabilities.
436     //
437     {
438         StgClosure *spark;
439         if (emptyRunQueue()) {
440             spark = findSpark(rtsFalse);
441             if (spark == NULL) {
442                 break; /* no more sparks in the pool */
443             } else {
444                 createSparkThread(spark);         
445                 IF_DEBUG(scheduler,
446                          sched_belch("==^^ turning spark of closure %p into a thread",
447                                      (StgClosure *)spark));
448             }
449         }
450     }
451 #endif // SMP
452
453     scheduleStartSignalHandlers(cap);
454
455     // Only check the black holes here if we've nothing else to do.
456     // During normal execution, the black hole list only gets checked
457     // at GC time, to avoid repeatedly traversing this possibly long
458     // list each time around the scheduler.
459     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
460
461     scheduleCheckBlockedThreads(cap);
462
463     scheduleDetectDeadlock(cap,task);
464
465     // Normally, the only way we can get here with no threads to
466     // run is if a keyboard interrupt received during 
467     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
468     // Additionally, it is not fatal for the
469     // threaded RTS to reach here with no threads to run.
470     //
471     // win32: might be here due to awaitEvent() being abandoned
472     // as a result of a console event having been delivered.
473     if ( emptyRunQueue(cap) ) {
474 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
475         ASSERT(interrupted);
476 #endif
477         continue; // nothing to do
478     }
479
480 #if defined(PARALLEL_HASKELL)
481     scheduleSendPendingMessages();
482     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
483         continue;
484
485 #if defined(SPARKS)
486     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
487 #endif
488
489     /* If we still have no work we need to send a FISH to get a spark
490        from another PE */
491     if (emptyRunQueue(cap)) {
492         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
493         ASSERT(rtsFalse); // should not happen at the moment
494     }
495     // from here: non-empty run queue.
496     //  TODO: merge above case with this, only one call processMessages() !
497     if (PacketsWaiting()) {  /* process incoming messages, if
498                                 any pending...  only in else
499                                 because getRemoteWork waits for
500                                 messages as well */
501         receivedFinish = processMessages();
502     }
503 #endif
504
505 #if defined(GRAN)
506     scheduleProcessEvent(event);
507 #endif
508
509     // 
510     // Get a thread to run
511     //
512     t = popRunQueue(cap);
513
514 #if defined(GRAN) || defined(PAR)
515     scheduleGranParReport(); // some kind of debuging output
516 #else
517     // Sanity check the thread we're about to run.  This can be
518     // expensive if there is lots of thread switching going on...
519     IF_DEBUG(sanity,checkTSO(t));
520 #endif
521
522 #if defined(THREADED_RTS)
523     // Check whether we can run this thread in the current task.
524     // If not, we have to pass our capability to the right task.
525     {
526         Task *bound = t->bound;
527       
528         if (bound) {
529             if (bound == task) {
530                 IF_DEBUG(scheduler,
531                          sched_belch("### Running thread %d in bound thread",
532                                      t->id));
533                 // yes, the Haskell thread is bound to the current native thread
534             } else {
535                 IF_DEBUG(scheduler,
536                          sched_belch("### thread %d bound to another OS thread",
537                                      t->id));
538                 // no, bound to a different Haskell thread: pass to that thread
539                 pushOnRunQueue(cap,t);
540                 continue;
541             }
542         } else {
543             // The thread we want to run is unbound.
544             if (task->tso) { 
545                 IF_DEBUG(scheduler,
546                          sched_belch("### this OS thread cannot run thread %d", t->id));
547                 // no, the current native thread is bound to a different
548                 // Haskell thread, so pass it to any worker thread
549                 pushOnRunQueue(cap,t);
550                 continue; 
551             }
552         }
553     }
554 #endif
555
556     cap->r.rCurrentTSO = t;
557     
558     /* context switches are initiated by the timer signal, unless
559      * the user specified "context switch as often as possible", with
560      * +RTS -C0
561      */
562     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
563         && !emptyThreadQueues(cap)) {
564         context_switch = 1;
565     }
566          
567 run_thread:
568
569     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
570                               (long)t->id, whatNext_strs[t->what_next]));
571
572 #if defined(PROFILING)
573     startHeapProfTimer();
574 #endif
575
576     // ----------------------------------------------------------------------
577     // Run the current thread 
578
579     prev_what_next = t->what_next;
580
581     errno = t->saved_errno;
582     cap->in_haskell = rtsTrue;
583
584     recent_activity = ACTIVITY_YES;
585
586     switch (prev_what_next) {
587         
588     case ThreadKilled:
589     case ThreadComplete:
590         /* Thread already finished, return to scheduler. */
591         ret = ThreadFinished;
592         break;
593         
594     case ThreadRunGHC:
595     {
596         StgRegTable *r;
597         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
598         cap = regTableToCapability(r);
599         ret = r->rRet;
600         break;
601     }
602     
603     case ThreadInterpret:
604         cap = interpretBCO(cap);
605         ret = cap->r.rRet;
606         break;
607         
608     default:
609         barf("schedule: invalid what_next field");
610     }
611
612     cap->in_haskell = rtsFalse;
613
614     // The TSO might have moved, eg. if it re-entered the RTS and a GC
615     // happened.  So find the new location:
616     t = cap->r.rCurrentTSO;
617
618 #ifdef SMP
619     // If ret is ThreadBlocked, and this Task is bound to the TSO that
620     // blocked, we are in limbo - the TSO is now owned by whatever it
621     // is blocked on, and may in fact already have been woken up,
622     // perhaps even on a different Capability.  It may be the case
623     // that task->cap != cap.  We better yield this Capability
624     // immediately and return to normaility.
625     if (ret == ThreadBlocked) {
626         IF_DEBUG(scheduler,
627                  debugBelch("--<< thread %d (%s) stopped: blocked\n",
628                             t->id, whatNext_strs[t->what_next]));
629         continue;
630     }
631 #endif
632
633     ASSERT_CAPABILITY_INVARIANTS(cap,task);
634
635     // And save the current errno in this thread.
636     t->saved_errno = errno;
637
638     // ----------------------------------------------------------------------
639     
640     // Costs for the scheduler are assigned to CCS_SYSTEM
641 #if defined(PROFILING)
642     stopHeapProfTimer();
643     CCCS = CCS_SYSTEM;
644 #endif
645     
646     // We have run some Haskell code: there might be blackhole-blocked
647     // threads to wake up now.
648     // Lock-free test here should be ok, we're just setting a flag.
649     if ( blackhole_queue != END_TSO_QUEUE ) {
650         blackholes_need_checking = rtsTrue;
651     }
652     
653 #if defined(THREADED_RTS)
654     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
655 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
656     IF_DEBUG(scheduler,debugBelch("sched: "););
657 #endif
658     
659     schedulePostRunThread();
660
661     ready_to_gc = rtsFalse;
662
663     switch (ret) {
664     case HeapOverflow:
665         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
666         break;
667
668     case StackOverflow:
669         scheduleHandleStackOverflow(cap,task,t);
670         break;
671
672     case ThreadYielding:
673         if (scheduleHandleYield(cap, t, prev_what_next)) {
674             // shortcut for switching between compiler/interpreter:
675             goto run_thread; 
676         }
677         break;
678
679     case ThreadBlocked:
680         scheduleHandleThreadBlocked(t);
681         break;
682
683     case ThreadFinished:
684         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
685         ASSERT_CAPABILITY_INVARIANTS(cap,task);
686         break;
687
688     default:
689       barf("schedule: invalid thread return code %d", (int)ret);
690     }
691
692     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
693     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
694   } /* end of while() */
695
696   IF_PAR_DEBUG(verbose,
697                debugBelch("== Leaving schedule() after having received Finish\n"));
698 }
699
700 /* ----------------------------------------------------------------------------
701  * Setting up the scheduler loop
702  * ------------------------------------------------------------------------- */
703
704 static void
705 schedulePreLoop(void)
706 {
707 #if defined(GRAN) 
708     /* set up first event to get things going */
709     /* ToDo: assign costs for system setup and init MainTSO ! */
710     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
711               ContinueThread, 
712               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
713     
714     IF_DEBUG(gran,
715              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
716                         CurrentTSO);
717              G_TSO(CurrentTSO, 5));
718     
719     if (RtsFlags.GranFlags.Light) {
720         /* Save current time; GranSim Light only */
721         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
722     }      
723 #endif
724 }
725
726 /* -----------------------------------------------------------------------------
727  * schedulePushWork()
728  *
729  * Push work to other Capabilities if we have some.
730  * -------------------------------------------------------------------------- */
731
732 #ifdef SMP
733 static void
734 schedulePushWork(Capability *cap USED_WHEN_SMP, 
735                  Task *task      USED_WHEN_SMP)
736 {
737     Capability *free_caps[n_capabilities], *cap0;
738     nat i, n_free_caps;
739
740     // Check whether we have more threads on our run queue that we
741     // could hand to another Capability.
742     if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
743         return;
744     }
745
746     // First grab as many free Capabilities as we can.
747     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
748         cap0 = &capabilities[i];
749         if (cap != cap0 && tryGrabCapability(cap0,task)) {
750             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
751                 // it already has some work, we just grabbed it at 
752                 // the wrong moment.  Or maybe it's deadlocked!
753                 releaseCapability(cap0);
754             } else {
755                 free_caps[n_free_caps++] = cap0;
756             }
757         }
758     }
759
760     // we now have n_free_caps free capabilities stashed in
761     // free_caps[].  Share our run queue equally with them.  This is
762     // probably the simplest thing we could do; improvements we might
763     // want to do include:
764     //
765     //   - giving high priority to moving relatively new threads, on 
766     //     the gournds that they haven't had time to build up a
767     //     working set in the cache on this CPU/Capability.
768     //
769     //   - giving low priority to moving long-lived threads
770
771     if (n_free_caps > 0) {
772         StgTSO *prev, *t, *next;
773         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
774
775         prev = cap->run_queue_hd;
776         t = prev->link;
777         prev->link = END_TSO_QUEUE;
778         i = 0;
779         for (; t != END_TSO_QUEUE; t = next) {
780             next = t->link;
781             t->link = END_TSO_QUEUE;
782             if (t->what_next == ThreadRelocated
783                 || t->bound == task) { // don't move my bound thread
784                 prev->link = t;
785                 prev = t;
786             } else if (i == n_free_caps) {
787                 i = 0;
788                 // keep one for us
789                 prev->link = t;
790                 prev = t;
791             } else {
792                 appendToRunQueue(free_caps[i],t);
793                 if (t->bound) { t->bound->cap = free_caps[i]; }
794                 i++;
795             }
796         }
797         cap->run_queue_tl = prev;
798
799         // release the capabilities
800         for (i = 0; i < n_free_caps; i++) {
801             task->cap = free_caps[i];
802             releaseCapability(free_caps[i]);
803         }
804     }
805     task->cap = cap; // reset to point to our Capability.
806 }
807 #endif
808
809 /* ----------------------------------------------------------------------------
810  * Start any pending signal handlers
811  * ------------------------------------------------------------------------- */
812
813 static void
814 scheduleStartSignalHandlers(Capability *cap)
815 {
816 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
817     if (signals_pending()) { // safe outside the lock
818         startSignalHandlers(cap);
819     }
820 #endif
821 }
822
823 /* ----------------------------------------------------------------------------
824  * Check for blocked threads that can be woken up.
825  * ------------------------------------------------------------------------- */
826
827 static void
828 scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
829 {
830 #if !defined(THREADED_RTS)
831     //
832     // Check whether any waiting threads need to be woken up.  If the
833     // run queue is empty, and there are no other tasks running, we
834     // can wait indefinitely for something to happen.
835     //
836     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
837     {
838         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
839     }
840 #endif
841 }
842
843
844 /* ----------------------------------------------------------------------------
845  * Check for threads blocked on BLACKHOLEs that can be woken up
846  * ------------------------------------------------------------------------- */
847 static void
848 scheduleCheckBlackHoles (Capability *cap)
849 {
850     if ( blackholes_need_checking ) // check without the lock first
851     {
852         ACQUIRE_LOCK(&sched_mutex);
853         if ( blackholes_need_checking ) {
854             checkBlackHoles(cap);
855             blackholes_need_checking = rtsFalse;
856         }
857         RELEASE_LOCK(&sched_mutex);
858     }
859 }
860
861 /* ----------------------------------------------------------------------------
862  * Detect deadlock conditions and attempt to resolve them.
863  * ------------------------------------------------------------------------- */
864
865 static void
866 scheduleDetectDeadlock (Capability *cap, Task *task)
867 {
868
869 #if defined(PARALLEL_HASKELL)
870     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
871     return;
872 #endif
873
874     /* 
875      * Detect deadlock: when we have no threads to run, there are no
876      * threads blocked, waiting for I/O, or sleeping, and all the
877      * other tasks are waiting for work, we must have a deadlock of
878      * some description.
879      */
880     if ( emptyThreadQueues(cap) )
881     {
882 #if defined(THREADED_RTS)
883         /* 
884          * In the threaded RTS, we only check for deadlock if there
885          * has been no activity in a complete timeslice.  This means
886          * we won't eagerly start a full GC just because we don't have
887          * any threads to run currently.
888          */
889         if (recent_activity != ACTIVITY_INACTIVE) return;
890 #endif
891
892         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
893
894         // Garbage collection can release some new threads due to
895         // either (a) finalizers or (b) threads resurrected because
896         // they are unreachable and will therefore be sent an
897         // exception.  Any threads thus released will be immediately
898         // runnable.
899         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/ );
900         recent_activity = ACTIVITY_DONE_GC;
901         
902         if ( !emptyRunQueue(cap) ) return;
903
904 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
905         /* If we have user-installed signal handlers, then wait
906          * for signals to arrive rather then bombing out with a
907          * deadlock.
908          */
909         if ( anyUserHandlers() ) {
910             IF_DEBUG(scheduler, 
911                      sched_belch("still deadlocked, waiting for signals..."));
912
913             awaitUserSignals();
914
915             if (signals_pending()) {
916                 startSignalHandlers(cap);
917             }
918
919             // either we have threads to run, or we were interrupted:
920             ASSERT(!emptyRunQueue(cap) || interrupted);
921         }
922 #endif
923
924 #if !defined(THREADED_RTS)
925         /* Probably a real deadlock.  Send the current main thread the
926          * Deadlock exception.
927          */
928         if (task->tso) {
929             switch (task->tso->why_blocked) {
930             case BlockedOnSTM:
931             case BlockedOnBlackHole:
932             case BlockedOnException:
933             case BlockedOnMVar:
934                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
935                 return;
936             default:
937                 barf("deadlock: main thread blocked in a strange way");
938             }
939         }
940         return;
941 #endif
942     }
943 }
944
945 /* ----------------------------------------------------------------------------
946  * Process an event (GRAN only)
947  * ------------------------------------------------------------------------- */
948
949 #if defined(GRAN)
950 static StgTSO *
951 scheduleProcessEvent(rtsEvent *event)
952 {
953     StgTSO *t;
954
955     if (RtsFlags.GranFlags.Light)
956       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
957
958     /* adjust time based on time-stamp */
959     if (event->time > CurrentTime[CurrentProc] &&
960         event->evttype != ContinueThread)
961       CurrentTime[CurrentProc] = event->time;
962     
963     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
964     if (!RtsFlags.GranFlags.Light)
965       handleIdlePEs();
966
967     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
968
969     /* main event dispatcher in GranSim */
970     switch (event->evttype) {
971       /* Should just be continuing execution */
972     case ContinueThread:
973       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
974       /* ToDo: check assertion
975       ASSERT(run_queue_hd != (StgTSO*)NULL &&
976              run_queue_hd != END_TSO_QUEUE);
977       */
978       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
979       if (!RtsFlags.GranFlags.DoAsyncFetch &&
980           procStatus[CurrentProc]==Fetching) {
981         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
982               CurrentTSO->id, CurrentTSO, CurrentProc);
983         goto next_thread;
984       } 
985       /* Ignore ContinueThreads for completed threads */
986       if (CurrentTSO->what_next == ThreadComplete) {
987         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
988               CurrentTSO->id, CurrentTSO, CurrentProc);
989         goto next_thread;
990       } 
991       /* Ignore ContinueThreads for threads that are being migrated */
992       if (PROCS(CurrentTSO)==Nowhere) { 
993         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
994               CurrentTSO->id, CurrentTSO, CurrentProc);
995         goto next_thread;
996       }
997       /* The thread should be at the beginning of the run queue */
998       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
999         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1000               CurrentTSO->id, CurrentTSO, CurrentProc);
1001         break; // run the thread anyway
1002       }
1003       /*
1004       new_event(proc, proc, CurrentTime[proc],
1005                 FindWork,
1006                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1007       goto next_thread; 
1008       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1009       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1010
1011     case FetchNode:
1012       do_the_fetchnode(event);
1013       goto next_thread;             /* handle next event in event queue  */
1014       
1015     case GlobalBlock:
1016       do_the_globalblock(event);
1017       goto next_thread;             /* handle next event in event queue  */
1018       
1019     case FetchReply:
1020       do_the_fetchreply(event);
1021       goto next_thread;             /* handle next event in event queue  */
1022       
1023     case UnblockThread:   /* Move from the blocked queue to the tail of */
1024       do_the_unblock(event);
1025       goto next_thread;             /* handle next event in event queue  */
1026       
1027     case ResumeThread:  /* Move from the blocked queue to the tail of */
1028       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1029       event->tso->gran.blocktime += 
1030         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1031       do_the_startthread(event);
1032       goto next_thread;             /* handle next event in event queue  */
1033       
1034     case StartThread:
1035       do_the_startthread(event);
1036       goto next_thread;             /* handle next event in event queue  */
1037       
1038     case MoveThread:
1039       do_the_movethread(event);
1040       goto next_thread;             /* handle next event in event queue  */
1041       
1042     case MoveSpark:
1043       do_the_movespark(event);
1044       goto next_thread;             /* handle next event in event queue  */
1045       
1046     case FindWork:
1047       do_the_findwork(event);
1048       goto next_thread;             /* handle next event in event queue  */
1049       
1050     default:
1051       barf("Illegal event type %u\n", event->evttype);
1052     }  /* switch */
1053     
1054     /* This point was scheduler_loop in the old RTS */
1055
1056     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1057
1058     TimeOfLastEvent = CurrentTime[CurrentProc];
1059     TimeOfNextEvent = get_time_of_next_event();
1060     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1061     // CurrentTSO = ThreadQueueHd;
1062
1063     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1064                          TimeOfNextEvent));
1065
1066     if (RtsFlags.GranFlags.Light) 
1067       GranSimLight_leave_system(event, &ActiveTSO); 
1068
1069     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1070
1071     IF_DEBUG(gran, 
1072              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1073
1074     /* in a GranSim setup the TSO stays on the run queue */
1075     t = CurrentTSO;
1076     /* Take a thread from the run queue. */
1077     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1078
1079     IF_DEBUG(gran, 
1080              debugBelch("GRAN: About to run current thread, which is\n");
1081              G_TSO(t,5));
1082
1083     context_switch = 0; // turned on via GranYield, checking events and time slice
1084
1085     IF_DEBUG(gran, 
1086              DumpGranEvent(GR_SCHEDULE, t));
1087
1088     procStatus[CurrentProc] = Busy;
1089 }
1090 #endif // GRAN
1091
1092 /* ----------------------------------------------------------------------------
1093  * Send pending messages (PARALLEL_HASKELL only)
1094  * ------------------------------------------------------------------------- */
1095
1096 #if defined(PARALLEL_HASKELL)
1097 static StgTSO *
1098 scheduleSendPendingMessages(void)
1099 {
1100     StgSparkPool *pool;
1101     rtsSpark spark;
1102     StgTSO *t;
1103
1104 # if defined(PAR) // global Mem.Mgmt., omit for now
1105     if (PendingFetches != END_BF_QUEUE) {
1106         processFetches();
1107     }
1108 # endif
1109     
1110     if (RtsFlags.ParFlags.BufferTime) {
1111         // if we use message buffering, we must send away all message
1112         // packets which have become too old...
1113         sendOldBuffers(); 
1114     }
1115 }
1116 #endif
1117
1118 /* ----------------------------------------------------------------------------
1119  * Activate spark threads (PARALLEL_HASKELL only)
1120  * ------------------------------------------------------------------------- */
1121
1122 #if defined(PARALLEL_HASKELL)
1123 static void
1124 scheduleActivateSpark(void)
1125 {
1126 #if defined(SPARKS)
1127   ASSERT(emptyRunQueue());
1128 /* We get here if the run queue is empty and want some work.
1129    We try to turn a spark into a thread, and add it to the run queue,
1130    from where it will be picked up in the next iteration of the scheduler
1131    loop.
1132 */
1133
1134       /* :-[  no local threads => look out for local sparks */
1135       /* the spark pool for the current PE */
1136       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1137       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1138           pool->hd < pool->tl) {
1139         /* 
1140          * ToDo: add GC code check that we really have enough heap afterwards!!
1141          * Old comment:
1142          * If we're here (no runnable threads) and we have pending
1143          * sparks, we must have a space problem.  Get enough space
1144          * to turn one of those pending sparks into a
1145          * thread... 
1146          */
1147
1148         spark = findSpark(rtsFalse);            /* get a spark */
1149         if (spark != (rtsSpark) NULL) {
1150           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1151           IF_PAR_DEBUG(fish, // schedule,
1152                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1153                              tso->id, tso, advisory_thread_count));
1154
1155           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1156             IF_PAR_DEBUG(fish, // schedule,
1157                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1158                             spark));
1159             return rtsFalse; /* failed to generate a thread */
1160           }                  /* otherwise fall through & pick-up new tso */
1161         } else {
1162           IF_PAR_DEBUG(fish, // schedule,
1163                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1164                              spark_queue_len(pool)));
1165           return rtsFalse;  /* failed to generate a thread */
1166         }
1167         return rtsTrue;  /* success in generating a thread */
1168   } else { /* no more threads permitted or pool empty */
1169     return rtsFalse;  /* failed to generateThread */
1170   }
1171 #else
1172   tso = NULL; // avoid compiler warning only
1173   return rtsFalse;  /* dummy in non-PAR setup */
1174 #endif // SPARKS
1175 }
1176 #endif // PARALLEL_HASKELL
1177
1178 /* ----------------------------------------------------------------------------
1179  * Get work from a remote node (PARALLEL_HASKELL only)
1180  * ------------------------------------------------------------------------- */
1181     
1182 #if defined(PARALLEL_HASKELL)
1183 static rtsBool
1184 scheduleGetRemoteWork(rtsBool *receivedFinish)
1185 {
1186   ASSERT(emptyRunQueue());
1187
1188   if (RtsFlags.ParFlags.BufferTime) {
1189         IF_PAR_DEBUG(verbose, 
1190                 debugBelch("...send all pending data,"));
1191         {
1192           nat i;
1193           for (i=1; i<=nPEs; i++)
1194             sendImmediately(i); // send all messages away immediately
1195         }
1196   }
1197 # ifndef SPARKS
1198         //++EDEN++ idle() , i.e. send all buffers, wait for work
1199         // suppress fishing in EDEN... just look for incoming messages
1200         // (blocking receive)
1201   IF_PAR_DEBUG(verbose, 
1202                debugBelch("...wait for incoming messages...\n"));
1203   *receivedFinish = processMessages(); // blocking receive...
1204
1205         // and reenter scheduling loop after having received something
1206         // (return rtsFalse below)
1207
1208 # else /* activate SPARKS machinery */
1209 /* We get here, if we have no work, tried to activate a local spark, but still
1210    have no work. We try to get a remote spark, by sending a FISH message.
1211    Thread migration should be added here, and triggered when a sequence of 
1212    fishes returns without work. */
1213         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1214
1215       /* =8-[  no local sparks => look for work on other PEs */
1216         /*
1217          * We really have absolutely no work.  Send out a fish
1218          * (there may be some out there already), and wait for
1219          * something to arrive.  We clearly can't run any threads
1220          * until a SCHEDULE or RESUME arrives, and so that's what
1221          * we're hoping to see.  (Of course, we still have to
1222          * respond to other types of messages.)
1223          */
1224         rtsTime now = msTime() /*CURRENT_TIME*/;
1225         IF_PAR_DEBUG(verbose, 
1226                      debugBelch("--  now=%ld\n", now));
1227         IF_PAR_DEBUG(fish, // verbose,
1228              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1229                  (last_fish_arrived_at!=0 &&
1230                   last_fish_arrived_at+delay > now)) {
1231                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1232                      now, last_fish_arrived_at+delay, 
1233                      last_fish_arrived_at,
1234                      delay);
1235              });
1236   
1237         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1238             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1239           if (last_fish_arrived_at==0 ||
1240               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1241             /* outstandingFishes is set in sendFish, processFish;
1242                avoid flooding system with fishes via delay */
1243     next_fish_to_send_at = 0;  
1244   } else {
1245     /* ToDo: this should be done in the main scheduling loop to avoid the
1246              busy wait here; not so bad if fish delay is very small  */
1247     int iq = 0; // DEBUGGING -- HWL
1248     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1249     /* send a fish when ready, but process messages that arrive in the meantime */
1250     do {
1251       if (PacketsWaiting()) {
1252         iq++; // DEBUGGING
1253         *receivedFinish = processMessages();
1254       }
1255       now = msTime();
1256     } while (!*receivedFinish || now<next_fish_to_send_at);
1257     // JB: This means the fish could become obsolete, if we receive
1258     // work. Better check for work again? 
1259     // last line: while (!receivedFinish || !haveWork || now<...)
1260     // next line: if (receivedFinish || haveWork )
1261
1262     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1263       return rtsFalse;  // NB: this will leave scheduler loop
1264                         // immediately after return!
1265                           
1266     IF_PAR_DEBUG(fish, // verbose,
1267                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1268
1269   }
1270
1271     // JB: IMHO, this should all be hidden inside sendFish(...)
1272     /* pe = choosePE(); 
1273        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1274                 NEW_FISH_HUNGER);
1275
1276     // Global statistics: count no. of fishes
1277     if (RtsFlags.ParFlags.ParStats.Global &&
1278          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1279            globalParStats.tot_fish_mess++;
1280            }
1281     */ 
1282
1283   /* delayed fishes must have been sent by now! */
1284   next_fish_to_send_at = 0;  
1285   }
1286       
1287   *receivedFinish = processMessages();
1288 # endif /* SPARKS */
1289
1290  return rtsFalse;
1291  /* NB: this function always returns rtsFalse, meaning the scheduler
1292     loop continues with the next iteration; 
1293     rationale: 
1294       return code means success in finding work; we enter this function
1295       if there is no local work, thus have to send a fish which takes
1296       time until it arrives with work; in the meantime we should process
1297       messages in the main loop;
1298  */
1299 }
1300 #endif // PARALLEL_HASKELL
1301
1302 /* ----------------------------------------------------------------------------
1303  * PAR/GRAN: Report stats & debugging info(?)
1304  * ------------------------------------------------------------------------- */
1305
1306 #if defined(PAR) || defined(GRAN)
1307 static void
1308 scheduleGranParReport(void)
1309 {
1310   ASSERT(run_queue_hd != END_TSO_QUEUE);
1311
1312   /* Take a thread from the run queue, if we have work */
1313   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1314
1315     /* If this TSO has got its outport closed in the meantime, 
1316      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1317      * It has to be marked as TH_DEAD for this purpose.
1318      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1319
1320 JB: TODO: investigate wether state change field could be nuked
1321      entirely and replaced by the normal tso state (whatnext
1322      field). All we want to do is to kill tsos from outside.
1323      */
1324
1325     /* ToDo: write something to the log-file
1326     if (RTSflags.ParFlags.granSimStats && !sameThread)
1327         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1328
1329     CurrentTSO = t;
1330     */
1331     /* the spark pool for the current PE */
1332     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1333
1334     IF_DEBUG(scheduler, 
1335              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1336                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1337
1338     IF_PAR_DEBUG(fish,
1339              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1340                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1341
1342     if (RtsFlags.ParFlags.ParStats.Full && 
1343         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1344         (emitSchedule || // forced emit
1345          (t && LastTSO && t->id != LastTSO->id))) {
1346       /* 
1347          we are running a different TSO, so write a schedule event to log file
1348          NB: If we use fair scheduling we also have to write  a deschedule 
1349              event for LastTSO; with unfair scheduling we know that the
1350              previous tso has blocked whenever we switch to another tso, so
1351              we don't need it in GUM for now
1352       */
1353       IF_PAR_DEBUG(fish, // schedule,
1354                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1355
1356       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1357                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1358       emitSchedule = rtsFalse;
1359     }
1360 }     
1361 #endif
1362
1363 /* ----------------------------------------------------------------------------
1364  * After running a thread...
1365  * ------------------------------------------------------------------------- */
1366
1367 static void
1368 schedulePostRunThread(void)
1369 {
1370 #if defined(PAR)
1371     /* HACK 675: if the last thread didn't yield, make sure to print a 
1372        SCHEDULE event to the log file when StgRunning the next thread, even
1373        if it is the same one as before */
1374     LastTSO = t; 
1375     TimeOfLastYield = CURRENT_TIME;
1376 #endif
1377
1378   /* some statistics gathering in the parallel case */
1379
1380 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1381   switch (ret) {
1382     case HeapOverflow:
1383 # if defined(GRAN)
1384       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1385       globalGranStats.tot_heapover++;
1386 # elif defined(PAR)
1387       globalParStats.tot_heapover++;
1388 # endif
1389       break;
1390
1391      case StackOverflow:
1392 # if defined(GRAN)
1393       IF_DEBUG(gran, 
1394                DumpGranEvent(GR_DESCHEDULE, t));
1395       globalGranStats.tot_stackover++;
1396 # elif defined(PAR)
1397       // IF_DEBUG(par, 
1398       // DumpGranEvent(GR_DESCHEDULE, t);
1399       globalParStats.tot_stackover++;
1400 # endif
1401       break;
1402
1403     case ThreadYielding:
1404 # if defined(GRAN)
1405       IF_DEBUG(gran, 
1406                DumpGranEvent(GR_DESCHEDULE, t));
1407       globalGranStats.tot_yields++;
1408 # elif defined(PAR)
1409       // IF_DEBUG(par, 
1410       // DumpGranEvent(GR_DESCHEDULE, t);
1411       globalParStats.tot_yields++;
1412 # endif
1413       break; 
1414
1415     case ThreadBlocked:
1416 # if defined(GRAN)
1417       IF_DEBUG(scheduler,
1418                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1419                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1420                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1421                if (t->block_info.closure!=(StgClosure*)NULL)
1422                  print_bq(t->block_info.closure);
1423                debugBelch("\n"));
1424
1425       // ??? needed; should emit block before
1426       IF_DEBUG(gran, 
1427                DumpGranEvent(GR_DESCHEDULE, t)); 
1428       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1429       /*
1430         ngoq Dogh!
1431       ASSERT(procStatus[CurrentProc]==Busy || 
1432               ((procStatus[CurrentProc]==Fetching) && 
1433               (t->block_info.closure!=(StgClosure*)NULL)));
1434       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1435           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1436             procStatus[CurrentProc]==Fetching)) 
1437         procStatus[CurrentProc] = Idle;
1438       */
1439 # elif defined(PAR)
1440 //++PAR++  blockThread() writes the event (change?)
1441 # endif
1442     break;
1443
1444   case ThreadFinished:
1445     break;
1446
1447   default:
1448     barf("parGlobalStats: unknown return code");
1449     break;
1450     }
1451 #endif
1452 }
1453
1454 /* -----------------------------------------------------------------------------
1455  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1456  * -------------------------------------------------------------------------- */
1457
1458 static rtsBool
1459 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1460 {
1461     // did the task ask for a large block?
1462     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1463         // if so, get one and push it on the front of the nursery.
1464         bdescr *bd;
1465         lnat blocks;
1466         
1467         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1468         
1469         IF_DEBUG(scheduler,
1470                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1471                             (long)t->id, whatNext_strs[t->what_next], blocks));
1472         
1473         // don't do this if the nursery is (nearly) full, we'll GC first.
1474         if (cap->r.rCurrentNursery->link != NULL ||
1475             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1476                                                // if the nursery has only one block.
1477             
1478             ACQUIRE_SM_LOCK
1479             bd = allocGroup( blocks );
1480             RELEASE_SM_LOCK
1481             cap->r.rNursery->n_blocks += blocks;
1482             
1483             // link the new group into the list
1484             bd->link = cap->r.rCurrentNursery;
1485             bd->u.back = cap->r.rCurrentNursery->u.back;
1486             if (cap->r.rCurrentNursery->u.back != NULL) {
1487                 cap->r.rCurrentNursery->u.back->link = bd;
1488             } else {
1489 #if !defined(SMP)
1490                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1491                        g0s0 == cap->r.rNursery);
1492 #endif
1493                 cap->r.rNursery->blocks = bd;
1494             }             
1495             cap->r.rCurrentNursery->u.back = bd;
1496             
1497             // initialise it as a nursery block.  We initialise the
1498             // step, gen_no, and flags field of *every* sub-block in
1499             // this large block, because this is easier than making
1500             // sure that we always find the block head of a large
1501             // block whenever we call Bdescr() (eg. evacuate() and
1502             // isAlive() in the GC would both have to do this, at
1503             // least).
1504             { 
1505                 bdescr *x;
1506                 for (x = bd; x < bd + blocks; x++) {
1507                     x->step = cap->r.rNursery;
1508                     x->gen_no = 0;
1509                     x->flags = 0;
1510                 }
1511             }
1512             
1513             // This assert can be a killer if the app is doing lots
1514             // of large block allocations.
1515             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1516             
1517             // now update the nursery to point to the new block
1518             cap->r.rCurrentNursery = bd;
1519             
1520             // we might be unlucky and have another thread get on the
1521             // run queue before us and steal the large block, but in that
1522             // case the thread will just end up requesting another large
1523             // block.
1524             pushOnRunQueue(cap,t);
1525             return rtsFalse;  /* not actually GC'ing */
1526         }
1527     }
1528     
1529     IF_DEBUG(scheduler,
1530              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1531                         (long)t->id, whatNext_strs[t->what_next]));
1532 #if defined(GRAN)
1533     ASSERT(!is_on_queue(t,CurrentProc));
1534 #elif defined(PARALLEL_HASKELL)
1535     /* Currently we emit a DESCHEDULE event before GC in GUM.
1536        ToDo: either add separate event to distinguish SYSTEM time from rest
1537        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1538     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1539         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1540                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1541         emitSchedule = rtsTrue;
1542     }
1543 #endif
1544       
1545     pushOnRunQueue(cap,t);
1546     return rtsTrue;
1547     /* actual GC is done at the end of the while loop in schedule() */
1548 }
1549
1550 /* -----------------------------------------------------------------------------
1551  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1552  * -------------------------------------------------------------------------- */
1553
1554 static void
1555 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1556 {
1557     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1558                                   (long)t->id, whatNext_strs[t->what_next]));
1559     /* just adjust the stack for this thread, then pop it back
1560      * on the run queue.
1561      */
1562     { 
1563         /* enlarge the stack */
1564         StgTSO *new_t = threadStackOverflow(cap, t);
1565         
1566         /* The TSO attached to this Task may have moved, so update the
1567          * pointer to it.
1568          */
1569         if (task->tso == t) {
1570             task->tso = new_t;
1571         }
1572         pushOnRunQueue(cap,new_t);
1573     }
1574 }
1575
1576 /* -----------------------------------------------------------------------------
1577  * Handle a thread that returned to the scheduler with ThreadYielding
1578  * -------------------------------------------------------------------------- */
1579
1580 static rtsBool
1581 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1582 {
1583     // Reset the context switch flag.  We don't do this just before
1584     // running the thread, because that would mean we would lose ticks
1585     // during GC, which can lead to unfair scheduling (a thread hogs
1586     // the CPU because the tick always arrives during GC).  This way
1587     // penalises threads that do a lot of allocation, but that seems
1588     // better than the alternative.
1589     context_switch = 0;
1590     
1591     /* put the thread back on the run queue.  Then, if we're ready to
1592      * GC, check whether this is the last task to stop.  If so, wake
1593      * up the GC thread.  getThread will block during a GC until the
1594      * GC is finished.
1595      */
1596     IF_DEBUG(scheduler,
1597              if (t->what_next != prev_what_next) {
1598                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1599                             (long)t->id, whatNext_strs[t->what_next]);
1600              } else {
1601                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1602                             (long)t->id, whatNext_strs[t->what_next]);
1603              }
1604         );
1605     
1606     IF_DEBUG(sanity,
1607              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1608              checkTSO(t));
1609     ASSERT(t->link == END_TSO_QUEUE);
1610     
1611     // Shortcut if we're just switching evaluators: don't bother
1612     // doing stack squeezing (which can be expensive), just run the
1613     // thread.
1614     if (t->what_next != prev_what_next) {
1615         return rtsTrue;
1616     }
1617     
1618 #if defined(GRAN)
1619     ASSERT(!is_on_queue(t,CurrentProc));
1620       
1621     IF_DEBUG(sanity,
1622              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1623              checkThreadQsSanity(rtsTrue));
1624
1625 #endif
1626
1627     addToRunQueue(cap,t);
1628
1629 #if defined(GRAN)
1630     /* add a ContinueThread event to actually process the thread */
1631     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1632               ContinueThread,
1633               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1634     IF_GRAN_DEBUG(bq, 
1635                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1636                   G_EVENTQ(0);
1637                   G_CURR_THREADQ(0));
1638 #endif
1639     return rtsFalse;
1640 }
1641
1642 /* -----------------------------------------------------------------------------
1643  * Handle a thread that returned to the scheduler with ThreadBlocked
1644  * -------------------------------------------------------------------------- */
1645
1646 static void
1647 scheduleHandleThreadBlocked( StgTSO *t
1648 #if !defined(GRAN) && !defined(DEBUG)
1649     STG_UNUSED
1650 #endif
1651     )
1652 {
1653 #if defined(GRAN)
1654     IF_DEBUG(scheduler,
1655              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1656                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1657              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1658     
1659     // ??? needed; should emit block before
1660     IF_DEBUG(gran, 
1661              DumpGranEvent(GR_DESCHEDULE, t)); 
1662     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1663     /*
1664       ngoq Dogh!
1665       ASSERT(procStatus[CurrentProc]==Busy || 
1666       ((procStatus[CurrentProc]==Fetching) && 
1667       (t->block_info.closure!=(StgClosure*)NULL)));
1668       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1669       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1670       procStatus[CurrentProc]==Fetching)) 
1671       procStatus[CurrentProc] = Idle;
1672     */
1673 #elif defined(PAR)
1674     IF_DEBUG(scheduler,
1675              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1676                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1677     IF_PAR_DEBUG(bq,
1678                  
1679                  if (t->block_info.closure!=(StgClosure*)NULL) 
1680                  print_bq(t->block_info.closure));
1681     
1682     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1683     blockThread(t);
1684     
1685     /* whatever we schedule next, we must log that schedule */
1686     emitSchedule = rtsTrue;
1687     
1688 #else /* !GRAN */
1689
1690       // We don't need to do anything.  The thread is blocked, and it
1691       // has tidied up its stack and placed itself on whatever queue
1692       // it needs to be on.
1693
1694 #if !defined(SMP)
1695     ASSERT(t->why_blocked != NotBlocked);
1696              // This might not be true under SMP: we don't have
1697              // exclusive access to this TSO, so someone might have
1698              // woken it up by now.  This actually happens: try
1699              // conc023 +RTS -N2.
1700 #endif
1701
1702     IF_DEBUG(scheduler,
1703              debugBelch("--<< thread %d (%s) stopped: ", 
1704                         t->id, whatNext_strs[t->what_next]);
1705              printThreadBlockage(t);
1706              debugBelch("\n"));
1707     
1708     /* Only for dumping event to log file 
1709        ToDo: do I need this in GranSim, too?
1710        blockThread(t);
1711     */
1712 #endif
1713 }
1714
1715 /* -----------------------------------------------------------------------------
1716  * Handle a thread that returned to the scheduler with ThreadFinished
1717  * -------------------------------------------------------------------------- */
1718
1719 static rtsBool
1720 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1721 {
1722     /* Need to check whether this was a main thread, and if so,
1723      * return with the return value.
1724      *
1725      * We also end up here if the thread kills itself with an
1726      * uncaught exception, see Exception.cmm.
1727      */
1728     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1729                                   t->id, whatNext_strs[t->what_next]));
1730
1731 #if defined(GRAN)
1732       endThread(t, CurrentProc); // clean-up the thread
1733 #elif defined(PARALLEL_HASKELL)
1734       /* For now all are advisory -- HWL */
1735       //if(t->priority==AdvisoryPriority) ??
1736       advisory_thread_count--; // JB: Caution with this counter, buggy!
1737       
1738 # if defined(DIST)
1739       if(t->dist.priority==RevalPriority)
1740         FinishReval(t);
1741 # endif
1742     
1743 # if defined(EDENOLD)
1744       // the thread could still have an outport... (BUG)
1745       if (t->eden.outport != -1) {
1746       // delete the outport for the tso which has finished...
1747         IF_PAR_DEBUG(eden_ports,
1748                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1749                               t->eden.outport, t->id));
1750         deleteOPT(t);
1751       }
1752       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1753       if (t->eden.epid != -1) {
1754         IF_PAR_DEBUG(eden_ports,
1755                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1756                            t->id, t->eden.epid));
1757         removeTSOfromProcess(t);
1758       }
1759 # endif 
1760
1761 # if defined(PAR)
1762       if (RtsFlags.ParFlags.ParStats.Full &&
1763           !RtsFlags.ParFlags.ParStats.Suppressed) 
1764         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1765
1766       //  t->par only contains statistics: left out for now...
1767       IF_PAR_DEBUG(fish,
1768                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1769                               t->id,t,t->par.sparkname));
1770 # endif
1771 #endif // PARALLEL_HASKELL
1772
1773       //
1774       // Check whether the thread that just completed was a bound
1775       // thread, and if so return with the result.  
1776       //
1777       // There is an assumption here that all thread completion goes
1778       // through this point; we need to make sure that if a thread
1779       // ends up in the ThreadKilled state, that it stays on the run
1780       // queue so it can be dealt with here.
1781       //
1782
1783       if (t->bound) {
1784
1785           if (t->bound != task) {
1786 #if !defined(THREADED_RTS)
1787               // Must be a bound thread that is not the topmost one.  Leave
1788               // it on the run queue until the stack has unwound to the
1789               // point where we can deal with this.  Leaving it on the run
1790               // queue also ensures that the garbage collector knows about
1791               // this thread and its return value (it gets dropped from the
1792               // all_threads list so there's no other way to find it).
1793               appendToRunQueue(cap,t);
1794               return rtsFalse;
1795 #else
1796               // this cannot happen in the threaded RTS, because a
1797               // bound thread can only be run by the appropriate Task.
1798               barf("finished bound thread that isn't mine");
1799 #endif
1800           }
1801
1802           ASSERT(task->tso == t);
1803
1804           if (t->what_next == ThreadComplete) {
1805               if (task->ret) {
1806                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1807                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1808               }
1809               task->stat = Success;
1810           } else {
1811               if (task->ret) {
1812                   *(task->ret) = NULL;
1813               }
1814               if (interrupted) {
1815                   task->stat = Interrupted;
1816               } else {
1817                   task->stat = Killed;
1818               }
1819           }
1820 #ifdef DEBUG
1821           removeThreadLabel((StgWord)task->tso->id);
1822 #endif
1823           return rtsTrue; // tells schedule() to return
1824       }
1825
1826       return rtsFalse;
1827 }
1828
1829 /* -----------------------------------------------------------------------------
1830  * Perform a heap census, if PROFILING
1831  * -------------------------------------------------------------------------- */
1832
1833 static rtsBool
1834 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1835 {
1836 #if defined(PROFILING)
1837     // When we have +RTS -i0 and we're heap profiling, do a census at
1838     // every GC.  This lets us get repeatable runs for debugging.
1839     if (performHeapProfile ||
1840         (RtsFlags.ProfFlags.profileInterval==0 &&
1841          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1842         GarbageCollect(GetRoots, rtsTrue);
1843         heapCensus();
1844         performHeapProfile = rtsFalse;
1845         return rtsTrue;  // true <=> we already GC'd
1846     }
1847 #endif
1848     return rtsFalse;
1849 }
1850
1851 /* -----------------------------------------------------------------------------
1852  * Perform a garbage collection if necessary
1853  * -------------------------------------------------------------------------- */
1854
1855 static void
1856 scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
1857 {
1858     StgTSO *t;
1859 #ifdef SMP
1860     static volatile StgWord waiting_for_gc;
1861     rtsBool was_waiting;
1862     nat i;
1863 #endif
1864
1865 #ifdef SMP
1866     // In order to GC, there must be no threads running Haskell code.
1867     // Therefore, the GC thread needs to hold *all* the capabilities,
1868     // and release them after the GC has completed.  
1869     //
1870     // This seems to be the simplest way: previous attempts involved
1871     // making all the threads with capabilities give up their
1872     // capabilities and sleep except for the *last* one, which
1873     // actually did the GC.  But it's quite hard to arrange for all
1874     // the other tasks to sleep and stay asleep.
1875     //
1876         
1877     was_waiting = cas(&waiting_for_gc, 0, 1);
1878     if (was_waiting) {
1879         do {
1880             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1881             yieldCapability(&cap,task);
1882         } while (waiting_for_gc);
1883         return;
1884     }
1885
1886     for (i=0; i < n_capabilities; i++) {
1887         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1888         if (cap != &capabilities[i]) {
1889             Capability *pcap = &capabilities[i];
1890             // we better hope this task doesn't get migrated to
1891             // another Capability while we're waiting for this one.
1892             // It won't, because load balancing happens while we have
1893             // all the Capabilities, but even so it's a slightly
1894             // unsavoury invariant.
1895             task->cap = pcap;
1896             context_switch = 1;
1897             waitForReturnCapability(&pcap, task);
1898             if (pcap != &capabilities[i]) {
1899                 barf("scheduleDoGC: got the wrong capability");
1900             }
1901         }
1902     }
1903
1904     waiting_for_gc = rtsFalse;
1905 #endif
1906
1907     /* Kick any transactions which are invalid back to their
1908      * atomically frames.  When next scheduled they will try to
1909      * commit, this commit will fail and they will retry.
1910      */
1911     { 
1912         StgTSO *next;
1913
1914         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1915             if (t->what_next == ThreadRelocated) {
1916                 next = t->link;
1917             } else {
1918                 next = t->global_link;
1919                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1920                     if (!stmValidateNestOfTransactions (t -> trec)) {
1921                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1922                         
1923                         // strip the stack back to the
1924                         // ATOMICALLY_FRAME, aborting the (nested)
1925                         // transaction, and saving the stack of any
1926                         // partially-evaluated thunks on the heap.
1927                         raiseAsync_(cap, t, NULL, rtsTrue);
1928                         
1929 #ifdef REG_R1
1930                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1931 #endif
1932                     }
1933                 }
1934             }
1935         }
1936     }
1937     
1938     // so this happens periodically:
1939     scheduleCheckBlackHoles(cap);
1940     
1941     IF_DEBUG(scheduler, printAllThreads());
1942
1943     /* everybody back, start the GC.
1944      * Could do it in this thread, or signal a condition var
1945      * to do it in another thread.  Either way, we need to
1946      * broadcast on gc_pending_cond afterward.
1947      */
1948 #if defined(THREADED_RTS)
1949     IF_DEBUG(scheduler,sched_belch("doing GC"));
1950 #endif
1951     GarbageCollect(GetRoots, force_major);
1952     
1953 #if defined(SMP)
1954     // release our stash of capabilities.
1955     for (i = 0; i < n_capabilities; i++) {
1956         if (cap != &capabilities[i]) {
1957             task->cap = &capabilities[i];
1958             releaseCapability(&capabilities[i]);
1959         }
1960     }
1961     task->cap = cap;
1962 #endif
1963
1964 #if defined(GRAN)
1965     /* add a ContinueThread event to continue execution of current thread */
1966     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1967               ContinueThread,
1968               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1969     IF_GRAN_DEBUG(bq, 
1970                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1971                   G_EVENTQ(0);
1972                   G_CURR_THREADQ(0));
1973 #endif /* GRAN */
1974 }
1975
1976 /* ---------------------------------------------------------------------------
1977  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1978  * used by Control.Concurrent for error checking.
1979  * ------------------------------------------------------------------------- */
1980  
1981 StgBool
1982 rtsSupportsBoundThreads(void)
1983 {
1984 #if defined(THREADED_RTS)
1985   return rtsTrue;
1986 #else
1987   return rtsFalse;
1988 #endif
1989 }
1990
1991 /* ---------------------------------------------------------------------------
1992  * isThreadBound(tso): check whether tso is bound to an OS thread.
1993  * ------------------------------------------------------------------------- */
1994  
1995 StgBool
1996 isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
1997 {
1998 #if defined(THREADED_RTS)
1999   return (tso->bound != NULL);
2000 #endif
2001   return rtsFalse;
2002 }
2003
2004 /* ---------------------------------------------------------------------------
2005  * Singleton fork(). Do not copy any running threads.
2006  * ------------------------------------------------------------------------- */
2007
2008 #if !defined(mingw32_HOST_OS) && !defined(SMP)
2009 #define FORKPROCESS_PRIMOP_SUPPORTED
2010 #endif
2011
2012 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2013 static void 
2014 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2015 #endif
2016 StgInt
2017 forkProcess(HsStablePtr *entry
2018 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2019             STG_UNUSED
2020 #endif
2021            )
2022 {
2023 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2024     pid_t pid;
2025     StgTSO* t,*next;
2026     Task *task;
2027     Capability *cap;
2028     
2029     IF_DEBUG(scheduler,sched_belch("forking!"));
2030     
2031     // ToDo: for SMP, we should probably acquire *all* the capabilities
2032     cap = rts_lock();
2033     
2034     pid = fork();
2035     
2036     if (pid) { // parent
2037         
2038         // just return the pid
2039         rts_unlock(cap);
2040         return pid;
2041         
2042     } else { // child
2043         
2044         // delete all threads
2045         cap->run_queue_hd = END_TSO_QUEUE;
2046         cap->run_queue_tl = END_TSO_QUEUE;
2047         
2048         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2049             next = t->link;
2050             
2051             // don't allow threads to catch the ThreadKilled exception
2052             deleteThreadImmediately(cap,t);
2053         }
2054         
2055         // wipe the main thread list
2056         while ((task = all_tasks) != NULL) {
2057             all_tasks = task->all_link;
2058             discardTask(task);
2059         }
2060         
2061         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2062         rts_checkSchedStatus("forkProcess",cap);
2063         
2064         rts_unlock(cap);
2065         hs_exit();                      // clean up and exit
2066         stg_exit(EXIT_SUCCESS);
2067     }
2068 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2069     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2070     return -1;
2071 #endif
2072 }
2073
2074 /* ---------------------------------------------------------------------------
2075  * Delete the threads on the run queue of the current capability.
2076  * ------------------------------------------------------------------------- */
2077    
2078 static void
2079 deleteRunQueue (Capability *cap)
2080 {
2081     StgTSO *t, *next;
2082     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2083         ASSERT(t->what_next != ThreadRelocated);
2084         next = t->link;
2085         deleteThread(cap, t);
2086     }
2087 }
2088
2089 /* startThread and  insertThread are now in GranSim.c -- HWL */
2090
2091
2092 /* -----------------------------------------------------------------------------
2093    Managing the suspended_ccalling_tasks list.
2094    Locks required: sched_mutex
2095    -------------------------------------------------------------------------- */
2096
2097 STATIC_INLINE void
2098 suspendTask (Capability *cap, Task *task)
2099 {
2100     ASSERT(task->next == NULL && task->prev == NULL);
2101     task->next = cap->suspended_ccalling_tasks;
2102     task->prev = NULL;
2103     if (cap->suspended_ccalling_tasks) {
2104         cap->suspended_ccalling_tasks->prev = task;
2105     }
2106     cap->suspended_ccalling_tasks = task;
2107 }
2108
2109 STATIC_INLINE void
2110 recoverSuspendedTask (Capability *cap, Task *task)
2111 {
2112     if (task->prev) {
2113         task->prev->next = task->next;
2114     } else {
2115         ASSERT(cap->suspended_ccalling_tasks == task);
2116         cap->suspended_ccalling_tasks = task->next;
2117     }
2118     if (task->next) {
2119         task->next->prev = task->prev;
2120     }
2121     task->next = task->prev = NULL;
2122 }
2123
2124 /* ---------------------------------------------------------------------------
2125  * Suspending & resuming Haskell threads.
2126  * 
2127  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2128  * its capability before calling the C function.  This allows another
2129  * task to pick up the capability and carry on running Haskell
2130  * threads.  It also means that if the C call blocks, it won't lock
2131  * the whole system.
2132  *
2133  * The Haskell thread making the C call is put to sleep for the
2134  * duration of the call, on the susepended_ccalling_threads queue.  We
2135  * give out a token to the task, which it can use to resume the thread
2136  * on return from the C function.
2137  * ------------------------------------------------------------------------- */
2138    
2139 void *
2140 suspendThread (StgRegTable *reg)
2141 {
2142   Capability *cap;
2143   int saved_errno = errno;
2144   StgTSO *tso;
2145   Task *task;
2146
2147   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2148    */
2149   cap = regTableToCapability(reg);
2150
2151   task = cap->running_task;
2152   tso = cap->r.rCurrentTSO;
2153
2154   IF_DEBUG(scheduler,
2155            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2156
2157   // XXX this might not be necessary --SDM
2158   tso->what_next = ThreadRunGHC;
2159
2160   threadPaused(tso);
2161
2162   if(tso->blocked_exceptions == NULL)  {
2163       tso->why_blocked = BlockedOnCCall;
2164       tso->blocked_exceptions = END_TSO_QUEUE;
2165   } else {
2166       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2167   }
2168
2169   // Hand back capability
2170   task->suspended_tso = tso;
2171
2172   ACQUIRE_LOCK(&cap->lock);
2173
2174   suspendTask(cap,task);
2175   cap->in_haskell = rtsFalse;
2176   releaseCapability_(cap);
2177   
2178   RELEASE_LOCK(&cap->lock);
2179
2180 #if defined(THREADED_RTS)
2181   /* Preparing to leave the RTS, so ensure there's a native thread/task
2182      waiting to take over.
2183   */
2184   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2185 #endif
2186
2187   errno = saved_errno;
2188   return task;
2189 }
2190
2191 StgRegTable *
2192 resumeThread (void *task_)
2193 {
2194     StgTSO *tso;
2195     Capability *cap;
2196     int saved_errno = errno;
2197     Task *task = task_;
2198
2199     cap = task->cap;
2200     // Wait for permission to re-enter the RTS with the result.
2201     waitForReturnCapability(&cap,task);
2202     // we might be on a different capability now... but if so, our
2203     // entry on the suspended_ccalling_tasks list will also have been
2204     // migrated.
2205
2206     // Remove the thread from the suspended list
2207     recoverSuspendedTask(cap,task);
2208
2209     tso = task->suspended_tso;
2210     task->suspended_tso = NULL;
2211     tso->link = END_TSO_QUEUE;
2212     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2213     
2214     if (tso->why_blocked == BlockedOnCCall) {
2215         awakenBlockedQueue(cap,tso->blocked_exceptions);
2216         tso->blocked_exceptions = NULL;
2217     }
2218     
2219     /* Reset blocking status */
2220     tso->why_blocked  = NotBlocked;
2221     
2222     cap->r.rCurrentTSO = tso;
2223     cap->in_haskell = rtsTrue;
2224     errno = saved_errno;
2225
2226     return &cap->r;
2227 }
2228
2229 /* ---------------------------------------------------------------------------
2230  * Comparing Thread ids.
2231  *
2232  * This is used from STG land in the implementation of the
2233  * instances of Eq/Ord for ThreadIds.
2234  * ------------------------------------------------------------------------ */
2235
2236 int
2237 cmp_thread(StgPtr tso1, StgPtr tso2) 
2238
2239   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2240   StgThreadID id2 = ((StgTSO *)tso2)->id;
2241  
2242   if (id1 < id2) return (-1);
2243   if (id1 > id2) return 1;
2244   return 0;
2245 }
2246
2247 /* ---------------------------------------------------------------------------
2248  * Fetching the ThreadID from an StgTSO.
2249  *
2250  * This is used in the implementation of Show for ThreadIds.
2251  * ------------------------------------------------------------------------ */
2252 int
2253 rts_getThreadId(StgPtr tso) 
2254 {
2255   return ((StgTSO *)tso)->id;
2256 }
2257
2258 #ifdef DEBUG
2259 void
2260 labelThread(StgPtr tso, char *label)
2261 {
2262   int len;
2263   void *buf;
2264
2265   /* Caveat: Once set, you can only set the thread name to "" */
2266   len = strlen(label)+1;
2267   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2268   strncpy(buf,label,len);
2269   /* Update will free the old memory for us */
2270   updateThreadLabel(((StgTSO *)tso)->id,buf);
2271 }
2272 #endif /* DEBUG */
2273
2274 /* ---------------------------------------------------------------------------
2275    Create a new thread.
2276
2277    The new thread starts with the given stack size.  Before the
2278    scheduler can run, however, this thread needs to have a closure
2279    (and possibly some arguments) pushed on its stack.  See
2280    pushClosure() in Schedule.h.
2281
2282    createGenThread() and createIOThread() (in SchedAPI.h) are
2283    convenient packaged versions of this function.
2284
2285    currently pri (priority) is only used in a GRAN setup -- HWL
2286    ------------------------------------------------------------------------ */
2287 #if defined(GRAN)
2288 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2289 StgTSO *
2290 createThread(nat size, StgInt pri)
2291 #else
2292 StgTSO *
2293 createThread(Capability *cap, nat size)
2294 #endif
2295 {
2296     StgTSO *tso;
2297     nat stack_size;
2298
2299     /* sched_mutex is *not* required */
2300
2301     /* First check whether we should create a thread at all */
2302 #if defined(PARALLEL_HASKELL)
2303     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2304     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2305         threadsIgnored++;
2306         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2307                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2308         return END_TSO_QUEUE;
2309     }
2310     threadsCreated++;
2311 #endif
2312
2313 #if defined(GRAN)
2314     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2315 #endif
2316
2317     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2318
2319     /* catch ridiculously small stack sizes */
2320     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2321         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2322     }
2323
2324     stack_size = size - TSO_STRUCT_SIZEW;
2325     
2326     tso = (StgTSO *)allocateLocal(cap, size);
2327     TICK_ALLOC_TSO(stack_size, 0);
2328
2329     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2330 #if defined(GRAN)
2331     SET_GRAN_HDR(tso, ThisPE);
2332 #endif
2333
2334     // Always start with the compiled code evaluator
2335     tso->what_next = ThreadRunGHC;
2336
2337     tso->why_blocked  = NotBlocked;
2338     tso->blocked_exceptions = NULL;
2339     
2340     tso->saved_errno = 0;
2341     tso->bound = NULL;
2342     
2343     tso->stack_size     = stack_size;
2344     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2345                           - TSO_STRUCT_SIZEW;
2346     tso->sp             = (P_)&(tso->stack) + stack_size;
2347
2348     tso->trec = NO_TREC;
2349     
2350 #ifdef PROFILING
2351     tso->prof.CCCS = CCS_MAIN;
2352 #endif
2353     
2354   /* put a stop frame on the stack */
2355     tso->sp -= sizeofW(StgStopFrame);
2356     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2357     tso->link = END_TSO_QUEUE;
2358     
2359   // ToDo: check this
2360 #if defined(GRAN)
2361     /* uses more flexible routine in GranSim */
2362     insertThread(tso, CurrentProc);
2363 #else
2364     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2365      * from its creation
2366      */
2367 #endif
2368     
2369 #if defined(GRAN) 
2370     if (RtsFlags.GranFlags.GranSimStats.Full) 
2371         DumpGranEvent(GR_START,tso);
2372 #elif defined(PARALLEL_HASKELL)
2373     if (RtsFlags.ParFlags.ParStats.Full) 
2374         DumpGranEvent(GR_STARTQ,tso);
2375     /* HACk to avoid SCHEDULE 
2376        LastTSO = tso; */
2377 #endif
2378     
2379     /* Link the new thread on the global thread list.
2380      */
2381     ACQUIRE_LOCK(&sched_mutex);
2382     tso->id = next_thread_id++;  // while we have the mutex
2383     tso->global_link = all_threads;
2384     all_threads = tso;
2385     RELEASE_LOCK(&sched_mutex);
2386     
2387 #if defined(DIST)
2388     tso->dist.priority = MandatoryPriority; //by default that is...
2389 #endif
2390     
2391 #if defined(GRAN)
2392     tso->gran.pri = pri;
2393 # if defined(DEBUG)
2394     tso->gran.magic = TSO_MAGIC; // debugging only
2395 # endif
2396     tso->gran.sparkname   = 0;
2397     tso->gran.startedat   = CURRENT_TIME; 
2398     tso->gran.exported    = 0;
2399     tso->gran.basicblocks = 0;
2400     tso->gran.allocs      = 0;
2401     tso->gran.exectime    = 0;
2402     tso->gran.fetchtime   = 0;
2403     tso->gran.fetchcount  = 0;
2404     tso->gran.blocktime   = 0;
2405     tso->gran.blockcount  = 0;
2406     tso->gran.blockedat   = 0;
2407     tso->gran.globalsparks = 0;
2408     tso->gran.localsparks  = 0;
2409     if (RtsFlags.GranFlags.Light)
2410         tso->gran.clock  = Now; /* local clock */
2411     else
2412         tso->gran.clock  = 0;
2413     
2414     IF_DEBUG(gran,printTSO(tso));
2415 #elif defined(PARALLEL_HASKELL)
2416 # if defined(DEBUG)
2417     tso->par.magic = TSO_MAGIC; // debugging only
2418 # endif
2419     tso->par.sparkname   = 0;
2420     tso->par.startedat   = CURRENT_TIME; 
2421     tso->par.exported    = 0;
2422     tso->par.basicblocks = 0;
2423     tso->par.allocs      = 0;
2424     tso->par.exectime    = 0;
2425     tso->par.fetchtime   = 0;
2426     tso->par.fetchcount  = 0;
2427     tso->par.blocktime   = 0;
2428     tso->par.blockcount  = 0;
2429     tso->par.blockedat   = 0;
2430     tso->par.globalsparks = 0;
2431     tso->par.localsparks  = 0;
2432 #endif
2433     
2434 #if defined(GRAN)
2435     globalGranStats.tot_threads_created++;
2436     globalGranStats.threads_created_on_PE[CurrentProc]++;
2437     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2438     globalGranStats.tot_sq_probes++;
2439 #elif defined(PARALLEL_HASKELL)
2440     // collect parallel global statistics (currently done together with GC stats)
2441     if (RtsFlags.ParFlags.ParStats.Global &&
2442         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2443         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2444         globalParStats.tot_threads_created++;
2445     }
2446 #endif 
2447     
2448 #if defined(GRAN)
2449     IF_GRAN_DEBUG(pri,
2450                   sched_belch("==__ schedule: Created TSO %d (%p);",
2451                               CurrentProc, tso, tso->id));
2452 #elif defined(PARALLEL_HASKELL)
2453     IF_PAR_DEBUG(verbose,
2454                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2455                              (long)tso->id, tso, advisory_thread_count));
2456 #else
2457     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2458                                    (long)tso->id, (long)tso->stack_size));
2459 #endif    
2460     return tso;
2461 }
2462
2463 #if defined(PAR)
2464 /* RFP:
2465    all parallel thread creation calls should fall through the following routine.
2466 */
2467 StgTSO *
2468 createThreadFromSpark(rtsSpark spark) 
2469 { StgTSO *tso;
2470   ASSERT(spark != (rtsSpark)NULL);
2471 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2472   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2473   { threadsIgnored++;
2474     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2475           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2476     return END_TSO_QUEUE;
2477   }
2478   else
2479   { threadsCreated++;
2480     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2481     if (tso==END_TSO_QUEUE)     
2482       barf("createSparkThread: Cannot create TSO");
2483 #if defined(DIST)
2484     tso->priority = AdvisoryPriority;
2485 #endif
2486     pushClosure(tso,spark);
2487     addToRunQueue(tso);
2488     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2489   }
2490   return tso;
2491 }
2492 #endif
2493
2494 /*
2495   Turn a spark into a thread.
2496   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2497 */
2498 #if 0
2499 StgTSO *
2500 activateSpark (rtsSpark spark) 
2501 {
2502   StgTSO *tso;
2503
2504   tso = createSparkThread(spark);
2505   if (RtsFlags.ParFlags.ParStats.Full) {   
2506     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2507       IF_PAR_DEBUG(verbose,
2508                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2509                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2510   }
2511   // ToDo: fwd info on local/global spark to thread -- HWL
2512   // tso->gran.exported =  spark->exported;
2513   // tso->gran.locked =   !spark->global;
2514   // tso->gran.sparkname = spark->name;
2515
2516   return tso;
2517 }
2518 #endif
2519
2520 /* ---------------------------------------------------------------------------
2521  * scheduleThread()
2522  *
2523  * scheduleThread puts a thread on the end  of the runnable queue.
2524  * This will usually be done immediately after a thread is created.
2525  * The caller of scheduleThread must create the thread using e.g.
2526  * createThread and push an appropriate closure
2527  * on this thread's stack before the scheduler is invoked.
2528  * ------------------------------------------------------------------------ */
2529
2530 void
2531 scheduleThread(Capability *cap, StgTSO *tso)
2532 {
2533     // The thread goes at the *end* of the run-queue, to avoid possible
2534     // starvation of any threads already on the queue.
2535     appendToRunQueue(cap,tso);
2536 }
2537
2538 Capability *
2539 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2540 {
2541     Task *task;
2542
2543     // We already created/initialised the Task
2544     task = cap->running_task;
2545
2546     // This TSO is now a bound thread; make the Task and TSO
2547     // point to each other.
2548     tso->bound = task;
2549
2550     task->tso = tso;
2551     task->ret = ret;
2552     task->stat = NoStatus;
2553
2554     appendToRunQueue(cap,tso);
2555
2556     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2557
2558 #if defined(GRAN)
2559     /* GranSim specific init */
2560     CurrentTSO = m->tso;                // the TSO to run
2561     procStatus[MainProc] = Busy;        // status of main PE
2562     CurrentProc = MainProc;             // PE to run it on
2563 #endif
2564
2565     cap = schedule(cap,task);
2566
2567     ASSERT(task->stat != NoStatus);
2568     ASSERT_CAPABILITY_INVARIANTS(cap,task);
2569
2570     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2571     return cap;
2572 }
2573
2574 /* ----------------------------------------------------------------------------
2575  * Starting Tasks
2576  * ------------------------------------------------------------------------- */
2577
2578 #if defined(THREADED_RTS)
2579 void
2580 workerStart(Task *task)
2581 {
2582     Capability *cap;
2583
2584     // See startWorkerTask().
2585     ACQUIRE_LOCK(&task->lock);
2586     cap = task->cap;
2587     RELEASE_LOCK(&task->lock);
2588
2589     // set the thread-local pointer to the Task:
2590     taskEnter(task);
2591
2592     // schedule() runs without a lock.
2593     cap = schedule(cap,task);
2594
2595     // On exit from schedule(), we have a Capability.
2596     releaseCapability(cap);
2597     taskStop(task);
2598 }
2599 #endif
2600
2601 /* ---------------------------------------------------------------------------
2602  * initScheduler()
2603  *
2604  * Initialise the scheduler.  This resets all the queues - if the
2605  * queues contained any threads, they'll be garbage collected at the
2606  * next pass.
2607  *
2608  * ------------------------------------------------------------------------ */
2609
2610 void 
2611 initScheduler(void)
2612 {
2613 #if defined(GRAN)
2614   nat i;
2615   for (i=0; i<=MAX_PROC; i++) {
2616     run_queue_hds[i]      = END_TSO_QUEUE;
2617     run_queue_tls[i]      = END_TSO_QUEUE;
2618     blocked_queue_hds[i]  = END_TSO_QUEUE;
2619     blocked_queue_tls[i]  = END_TSO_QUEUE;
2620     ccalling_threadss[i]  = END_TSO_QUEUE;
2621     blackhole_queue[i]    = END_TSO_QUEUE;
2622     sleeping_queue        = END_TSO_QUEUE;
2623   }
2624 #elif !defined(THREADED_RTS)
2625   blocked_queue_hd  = END_TSO_QUEUE;
2626   blocked_queue_tl  = END_TSO_QUEUE;
2627   sleeping_queue    = END_TSO_QUEUE;
2628 #endif
2629
2630   blackhole_queue   = END_TSO_QUEUE;
2631   all_threads       = END_TSO_QUEUE;
2632
2633   context_switch = 0;
2634   interrupted    = 0;
2635
2636   RtsFlags.ConcFlags.ctxtSwitchTicks =
2637       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2638       
2639 #if defined(THREADED_RTS)
2640   /* Initialise the mutex and condition variables used by
2641    * the scheduler. */
2642   initMutex(&sched_mutex);
2643 #endif
2644   
2645   ACQUIRE_LOCK(&sched_mutex);
2646
2647   /* A capability holds the state a native thread needs in
2648    * order to execute STG code. At least one capability is
2649    * floating around (only SMP builds have more than one).
2650    */
2651   initCapabilities();
2652
2653   initTaskManager();
2654
2655 #if defined(SMP)
2656   /*
2657    * Eagerly start one worker to run each Capability, except for
2658    * Capability 0.  The idea is that we're probably going to start a
2659    * bound thread on Capability 0 pretty soon, so we don't want a
2660    * worker task hogging it.
2661    */
2662   { 
2663       nat i;
2664       Capability *cap;
2665       for (i = 1; i < n_capabilities; i++) {
2666           cap = &capabilities[i];
2667           ACQUIRE_LOCK(&cap->lock);
2668           startWorkerTask(cap, workerStart);
2669           RELEASE_LOCK(&cap->lock);
2670       }
2671   }
2672 #endif
2673
2674 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2675   initSparkPools();
2676 #endif
2677
2678   RELEASE_LOCK(&sched_mutex);
2679 }
2680
2681 void
2682 exitScheduler( void )
2683 {
2684     interrupted = rtsTrue;
2685     shutting_down_scheduler = rtsTrue;
2686
2687 #if defined(THREADED_RTS)
2688     { 
2689         Task *task;
2690         nat i;
2691         
2692         ACQUIRE_LOCK(&sched_mutex);
2693         task = newBoundTask();
2694         RELEASE_LOCK(&sched_mutex);
2695
2696         for (i = 0; i < n_capabilities; i++) {
2697             shutdownCapability(&capabilities[i], task);
2698         }
2699         boundTaskExiting(task);
2700         stopTaskManager();
2701     }
2702 #endif
2703 }
2704
2705 /* ---------------------------------------------------------------------------
2706    Where are the roots that we know about?
2707
2708         - all the threads on the runnable queue
2709         - all the threads on the blocked queue
2710         - all the threads on the sleeping queue
2711         - all the thread currently executing a _ccall_GC
2712         - all the "main threads"
2713      
2714    ------------------------------------------------------------------------ */
2715
2716 /* This has to be protected either by the scheduler monitor, or by the
2717         garbage collection monitor (probably the latter).
2718         KH @ 25/10/99
2719 */
2720
2721 void
2722 GetRoots( evac_fn evac )
2723 {
2724     nat i;
2725     Capability *cap;
2726     Task *task;
2727
2728 #if defined(GRAN)
2729     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2730         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2731             evac((StgClosure **)&run_queue_hds[i]);
2732         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2733             evac((StgClosure **)&run_queue_tls[i]);
2734         
2735         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2736             evac((StgClosure **)&blocked_queue_hds[i]);
2737         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2738             evac((StgClosure **)&blocked_queue_tls[i]);
2739         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2740             evac((StgClosure **)&ccalling_threads[i]);
2741     }
2742
2743     markEventQueue();
2744
2745 #else /* !GRAN */
2746
2747     for (i = 0; i < n_capabilities; i++) {
2748         cap = &capabilities[i];
2749         evac((StgClosure **)&cap->run_queue_hd);
2750         evac((StgClosure **)&cap->run_queue_tl);
2751         
2752         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2753              task=task->next) {
2754             evac((StgClosure **)&task->suspended_tso);
2755         }
2756     }
2757     
2758 #if !defined(THREADED_RTS)
2759     evac((StgClosure **)&blocked_queue_hd);
2760     evac((StgClosure **)&blocked_queue_tl);
2761     evac((StgClosure **)&sleeping_queue);
2762 #endif 
2763 #endif
2764
2765     evac((StgClosure **)&blackhole_queue);
2766
2767 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2768     markSparkQueue(evac);
2769 #endif
2770     
2771 #if defined(RTS_USER_SIGNALS)
2772     // mark the signal handlers (signals should be already blocked)
2773     markSignalHandlers(evac);
2774 #endif
2775 }
2776
2777 /* -----------------------------------------------------------------------------
2778    performGC
2779
2780    This is the interface to the garbage collector from Haskell land.
2781    We provide this so that external C code can allocate and garbage
2782    collect when called from Haskell via _ccall_GC.
2783
2784    It might be useful to provide an interface whereby the programmer
2785    can specify more roots (ToDo).
2786    
2787    This needs to be protected by the GC condition variable above.  KH.
2788    -------------------------------------------------------------------------- */
2789
2790 static void (*extra_roots)(evac_fn);
2791
2792 void
2793 performGC(void)
2794 {
2795 #ifdef THREADED_RTS
2796     // ToDo: we have to grab all the capabilities here.
2797     errorBelch("performGC not supported in threaded RTS (yet)");
2798     stg_exit(EXIT_FAILURE);
2799 #endif
2800     /* Obligated to hold this lock upon entry */
2801     GarbageCollect(GetRoots,rtsFalse);
2802 }
2803
2804 void
2805 performMajorGC(void)
2806 {
2807 #ifdef THREADED_RTS
2808     errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2809     stg_exit(EXIT_FAILURE);
2810 #endif
2811     GarbageCollect(GetRoots,rtsTrue);
2812 }
2813
2814 static void
2815 AllRoots(evac_fn evac)
2816 {
2817     GetRoots(evac);             // the scheduler's roots
2818     extra_roots(evac);          // the user's roots
2819 }
2820
2821 void
2822 performGCWithRoots(void (*get_roots)(evac_fn))
2823 {
2824 #ifdef THREADED_RTS
2825     errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2826     stg_exit(EXIT_FAILURE);
2827 #endif
2828     extra_roots = get_roots;
2829     GarbageCollect(AllRoots,rtsFalse);
2830 }
2831
2832 /* -----------------------------------------------------------------------------
2833    Stack overflow
2834
2835    If the thread has reached its maximum stack size, then raise the
2836    StackOverflow exception in the offending thread.  Otherwise
2837    relocate the TSO into a larger chunk of memory and adjust its stack
2838    size appropriately.
2839    -------------------------------------------------------------------------- */
2840
2841 static StgTSO *
2842 threadStackOverflow(Capability *cap, StgTSO *tso)
2843 {
2844   nat new_stack_size, stack_words;
2845   lnat new_tso_size;
2846   StgPtr new_sp;
2847   StgTSO *dest;
2848
2849   IF_DEBUG(sanity,checkTSO(tso));
2850   if (tso->stack_size >= tso->max_stack_size) {
2851
2852     IF_DEBUG(gc,
2853              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2854                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2855              /* If we're debugging, just print out the top of the stack */
2856              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2857                                               tso->sp+64)));
2858
2859     /* Send this thread the StackOverflow exception */
2860     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2861     return tso;
2862   }
2863
2864   /* Try to double the current stack size.  If that takes us over the
2865    * maximum stack size for this thread, then use the maximum instead.
2866    * Finally round up so the TSO ends up as a whole number of blocks.
2867    */
2868   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2869   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2870                                        TSO_STRUCT_SIZE)/sizeof(W_);
2871   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2872   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2873
2874   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2875
2876   dest = (StgTSO *)allocate(new_tso_size);
2877   TICK_ALLOC_TSO(new_stack_size,0);
2878
2879   /* copy the TSO block and the old stack into the new area */
2880   memcpy(dest,tso,TSO_STRUCT_SIZE);
2881   stack_words = tso->stack + tso->stack_size - tso->sp;
2882   new_sp = (P_)dest + new_tso_size - stack_words;
2883   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2884
2885   /* relocate the stack pointers... */
2886   dest->sp         = new_sp;
2887   dest->stack_size = new_stack_size;
2888         
2889   /* Mark the old TSO as relocated.  We have to check for relocated
2890    * TSOs in the garbage collector and any primops that deal with TSOs.
2891    *
2892    * It's important to set the sp value to just beyond the end
2893    * of the stack, so we don't attempt to scavenge any part of the
2894    * dead TSO's stack.
2895    */
2896   tso->what_next = ThreadRelocated;
2897   tso->link = dest;
2898   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2899   tso->why_blocked = NotBlocked;
2900
2901   IF_PAR_DEBUG(verbose,
2902                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2903                      tso->id, tso, tso->stack_size);
2904                /* If we're debugging, just print out the top of the stack */
2905                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2906                                                 tso->sp+64)));
2907   
2908   IF_DEBUG(sanity,checkTSO(tso));
2909 #if 0
2910   IF_DEBUG(scheduler,printTSO(dest));
2911 #endif
2912
2913   return dest;
2914 }
2915
2916 /* ---------------------------------------------------------------------------
2917    Wake up a queue that was blocked on some resource.
2918    ------------------------------------------------------------------------ */
2919
2920 #if defined(GRAN)
2921 STATIC_INLINE void
2922 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2923 {
2924 }
2925 #elif defined(PARALLEL_HASKELL)
2926 STATIC_INLINE void
2927 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2928 {
2929   /* write RESUME events to log file and
2930      update blocked and fetch time (depending on type of the orig closure) */
2931   if (RtsFlags.ParFlags.ParStats.Full) {
2932     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2933                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2934                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2935     if (emptyRunQueue())
2936       emitSchedule = rtsTrue;
2937
2938     switch (get_itbl(node)->type) {
2939         case FETCH_ME_BQ:
2940           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2941           break;
2942         case RBH:
2943         case FETCH_ME:
2944         case BLACKHOLE_BQ:
2945           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2946           break;
2947 #ifdef DIST
2948         case MVAR:
2949           break;
2950 #endif    
2951         default:
2952           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2953         }
2954       }
2955 }
2956 #endif
2957
2958 #if defined(GRAN)
2959 StgBlockingQueueElement *
2960 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2961 {
2962     StgTSO *tso;
2963     PEs node_loc, tso_loc;
2964
2965     node_loc = where_is(node); // should be lifted out of loop
2966     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2967     tso_loc = where_is((StgClosure *)tso);
2968     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2969       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2970       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2971       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2972       // insertThread(tso, node_loc);
2973       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2974                 ResumeThread,
2975                 tso, node, (rtsSpark*)NULL);
2976       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2977       // len_local++;
2978       // len++;
2979     } else { // TSO is remote (actually should be FMBQ)
2980       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2981                                   RtsFlags.GranFlags.Costs.gunblocktime +
2982                                   RtsFlags.GranFlags.Costs.latency;
2983       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2984                 UnblockThread,
2985                 tso, node, (rtsSpark*)NULL);
2986       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2987       // len++;
2988     }
2989     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2990     IF_GRAN_DEBUG(bq,
2991                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2992                           (node_loc==tso_loc ? "Local" : "Global"), 
2993                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2994     tso->block_info.closure = NULL;
2995     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2996                              tso->id, tso));
2997 }
2998 #elif defined(PARALLEL_HASKELL)
2999 StgBlockingQueueElement *
3000 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3001 {
3002     StgBlockingQueueElement *next;
3003
3004     switch (get_itbl(bqe)->type) {
3005     case TSO:
3006       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3007       /* if it's a TSO just push it onto the run_queue */
3008       next = bqe->link;
3009       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3010       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3011       threadRunnable();
3012       unblockCount(bqe, node);
3013       /* reset blocking status after dumping event */
3014       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3015       break;
3016
3017     case BLOCKED_FETCH:
3018       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3019       next = bqe->link;
3020       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3021       PendingFetches = (StgBlockedFetch *)bqe;
3022       break;
3023
3024 # if defined(DEBUG)
3025       /* can ignore this case in a non-debugging setup; 
3026          see comments on RBHSave closures above */
3027     case CONSTR:
3028       /* check that the closure is an RBHSave closure */
3029       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3030              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3031              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3032       break;
3033
3034     default:
3035       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3036            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3037            (StgClosure *)bqe);
3038 # endif
3039     }
3040   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3041   return next;
3042 }
3043 #endif
3044
3045 StgTSO *
3046 unblockOne(Capability *cap, StgTSO *tso)
3047 {
3048   StgTSO *next;
3049
3050   ASSERT(get_itbl(tso)->type == TSO);
3051   ASSERT(tso->why_blocked != NotBlocked);
3052   tso->why_blocked = NotBlocked;
3053   next = tso->link;
3054   tso->link = END_TSO_QUEUE;
3055
3056   // We might have just migrated this TSO to our Capability:
3057   if (tso->bound) {
3058       tso->bound->cap = cap;
3059   }
3060
3061   appendToRunQueue(cap,tso);
3062
3063   // we're holding a newly woken thread, make sure we context switch
3064   // quickly so we can migrate it if necessary.
3065   context_switch = 1;
3066   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3067   return next;
3068 }
3069
3070
3071 #if defined(GRAN)
3072 void 
3073 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3074 {
3075   StgBlockingQueueElement *bqe;
3076   PEs node_loc;
3077   nat len = 0; 
3078
3079   IF_GRAN_DEBUG(bq, 
3080                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3081                       node, CurrentProc, CurrentTime[CurrentProc], 
3082                       CurrentTSO->id, CurrentTSO));
3083
3084   node_loc = where_is(node);
3085
3086   ASSERT(q == END_BQ_QUEUE ||
3087          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3088          get_itbl(q)->type == CONSTR); // closure (type constructor)
3089   ASSERT(is_unique(node));
3090
3091   /* FAKE FETCH: magically copy the node to the tso's proc;
3092      no Fetch necessary because in reality the node should not have been 
3093      moved to the other PE in the first place
3094   */
3095   if (CurrentProc!=node_loc) {
3096     IF_GRAN_DEBUG(bq, 
3097                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3098                         node, node_loc, CurrentProc, CurrentTSO->id, 
3099                         // CurrentTSO, where_is(CurrentTSO),
3100                         node->header.gran.procs));
3101     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3102     IF_GRAN_DEBUG(bq, 
3103                   debugBelch("## new bitmask of node %p is %#x\n",
3104                         node, node->header.gran.procs));
3105     if (RtsFlags.GranFlags.GranSimStats.Global) {
3106       globalGranStats.tot_fake_fetches++;
3107     }
3108   }
3109
3110   bqe = q;
3111   // ToDo: check: ASSERT(CurrentProc==node_loc);
3112   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3113     //next = bqe->link;
3114     /* 
3115        bqe points to the current element in the queue
3116        next points to the next element in the queue
3117     */
3118     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3119     //tso_loc = where_is(tso);
3120     len++;
3121     bqe = unblockOne(bqe, node);
3122   }
3123
3124   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3125      the closure to make room for the anchor of the BQ */
3126   if (bqe!=END_BQ_QUEUE) {
3127     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3128     /*
3129     ASSERT((info_ptr==&RBH_Save_0_info) ||
3130            (info_ptr==&RBH_Save_1_info) ||
3131            (info_ptr==&RBH_Save_2_info));
3132     */
3133     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3134     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3135     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3136
3137     IF_GRAN_DEBUG(bq,
3138                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3139                         node, info_type(node)));
3140   }
3141
3142   /* statistics gathering */
3143   if (RtsFlags.GranFlags.GranSimStats.Global) {
3144     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3145     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3146     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3147     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3148   }
3149   IF_GRAN_DEBUG(bq,
3150                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3151                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3152 }
3153 #elif defined(PARALLEL_HASKELL)
3154 void 
3155 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3156 {
3157   StgBlockingQueueElement *bqe;
3158
3159   IF_PAR_DEBUG(verbose, 
3160                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3161                      node, mytid));
3162 #ifdef DIST  
3163   //RFP
3164   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3165     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3166     return;
3167   }
3168 #endif
3169   
3170   ASSERT(q == END_BQ_QUEUE ||
3171          get_itbl(q)->type == TSO ||           
3172          get_itbl(q)->type == BLOCKED_FETCH || 
3173          get_itbl(q)->type == CONSTR); 
3174
3175   bqe = q;
3176   while (get_itbl(bqe)->type==TSO || 
3177          get_itbl(bqe)->type==BLOCKED_FETCH) {
3178     bqe = unblockOne(bqe, node);
3179   }
3180 }
3181
3182 #else   /* !GRAN && !PARALLEL_HASKELL */
3183
3184 void
3185 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3186 {
3187     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3188                              // Exception.cmm
3189     while (tso != END_TSO_QUEUE) {
3190         tso = unblockOne(cap,tso);
3191     }
3192 }
3193 #endif
3194
3195 /* ---------------------------------------------------------------------------
3196    Interrupt execution
3197    - usually called inside a signal handler so it mustn't do anything fancy.   
3198    ------------------------------------------------------------------------ */
3199
3200 void
3201 interruptStgRts(void)
3202 {
3203     interrupted    = 1;
3204     context_switch = 1;
3205 #if defined(THREADED_RTS)
3206     prodAllCapabilities();
3207 #endif
3208 }
3209
3210 /* -----------------------------------------------------------------------------
3211    Unblock a thread
3212
3213    This is for use when we raise an exception in another thread, which
3214    may be blocked.
3215    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3216    -------------------------------------------------------------------------- */
3217
3218 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3219 /*
3220   NB: only the type of the blocking queue is different in GranSim and GUM
3221       the operations on the queue-elements are the same
3222       long live polymorphism!
3223
3224   Locks: sched_mutex is held upon entry and exit.
3225
3226 */
3227 static void
3228 unblockThread(Capability *cap, StgTSO *tso)
3229 {
3230   StgBlockingQueueElement *t, **last;
3231
3232   switch (tso->why_blocked) {
3233
3234   case NotBlocked:
3235     return;  /* not blocked */
3236
3237   case BlockedOnSTM:
3238     // Be careful: nothing to do here!  We tell the scheduler that the thread
3239     // is runnable and we leave it to the stack-walking code to abort the 
3240     // transaction while unwinding the stack.  We should perhaps have a debugging
3241     // test to make sure that this really happens and that the 'zombie' transaction
3242     // does not get committed.
3243     goto done;
3244
3245   case BlockedOnMVar:
3246     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3247     {
3248       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3249       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3250
3251       last = (StgBlockingQueueElement **)&mvar->head;
3252       for (t = (StgBlockingQueueElement *)mvar->head; 
3253            t != END_BQ_QUEUE; 
3254            last = &t->link, last_tso = t, t = t->link) {
3255         if (t == (StgBlockingQueueElement *)tso) {
3256           *last = (StgBlockingQueueElement *)tso->link;
3257           if (mvar->tail == tso) {
3258             mvar->tail = (StgTSO *)last_tso;
3259           }
3260           goto done;
3261         }
3262       }
3263       barf("unblockThread (MVAR): TSO not found");
3264     }
3265
3266   case BlockedOnBlackHole:
3267     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3268     {
3269       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3270
3271       last = &bq->blocking_queue;
3272       for (t = bq->blocking_queue; 
3273            t != END_BQ_QUEUE; 
3274            last = &t->link, t = t->link) {
3275         if (t == (StgBlockingQueueElement *)tso) {
3276           *last = (StgBlockingQueueElement *)tso->link;
3277           goto done;
3278         }
3279       }
3280       barf("unblockThread (BLACKHOLE): TSO not found");
3281     }
3282
3283   case BlockedOnException:
3284     {
3285       StgTSO *target  = tso->block_info.tso;
3286
3287       ASSERT(get_itbl(target)->type == TSO);
3288
3289       if (target->what_next == ThreadRelocated) {
3290           target = target->link;
3291           ASSERT(get_itbl(target)->type == TSO);
3292       }
3293
3294       ASSERT(target->blocked_exceptions != NULL);
3295
3296       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3297       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3298            t != END_BQ_QUEUE; 
3299            last = &t->link, t = t->link) {
3300         ASSERT(get_itbl(t)->type == TSO);
3301         if (t == (StgBlockingQueueElement *)tso) {
3302           *last = (StgBlockingQueueElement *)tso->link;
3303           goto done;
3304         }
3305       }
3306       barf("unblockThread (Exception): TSO not found");
3307     }
3308
3309   case BlockedOnRead:
3310   case BlockedOnWrite:
3311 #if defined(mingw32_HOST_OS)
3312   case BlockedOnDoProc:
3313 #endif
3314     {
3315       /* take TSO off blocked_queue */
3316       StgBlockingQueueElement *prev = NULL;
3317       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3318            prev = t, t = t->link) {
3319         if (t == (StgBlockingQueueElement *)tso) {
3320           if (prev == NULL) {
3321             blocked_queue_hd = (StgTSO *)t->link;
3322             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3323               blocked_queue_tl = END_TSO_QUEUE;
3324             }
3325           } else {
3326             prev->link = t->link;
3327             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3328               blocked_queue_tl = (StgTSO *)prev;
3329             }
3330           }
3331 #if defined(mingw32_HOST_OS)
3332           /* (Cooperatively) signal that the worker thread should abort
3333            * the request.
3334            */
3335           abandonWorkRequest(tso->block_info.async_result->reqID);
3336 #endif
3337           goto done;
3338         }
3339       }
3340       barf("unblockThread (I/O): TSO not found");
3341     }
3342
3343   case BlockedOnDelay:
3344     {
3345       /* take TSO off sleeping_queue */
3346       StgBlockingQueueElement *prev = NULL;
3347       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3348            prev = t, t = t->link) {
3349         if (t == (StgBlockingQueueElement *)tso) {
3350           if (prev == NULL) {
3351             sleeping_queue = (StgTSO *)t->link;
3352           } else {
3353             prev->link = t->link;
3354           }
3355           goto done;
3356         }
3357       }
3358       barf("unblockThread (delay): TSO not found");
3359     }
3360
3361   default:
3362     barf("unblockThread");
3363   }
3364
3365  done:
3366   tso->link = END_TSO_QUEUE;
3367   tso->why_blocked = NotBlocked;
3368   tso->block_info.closure = NULL;
3369   pushOnRunQueue(cap,tso);
3370 }
3371 #else
3372 static void
3373 unblockThread(Capability *cap, StgTSO *tso)
3374 {
3375   StgTSO *t, **last;
3376   
3377   /* To avoid locking unnecessarily. */
3378   if (tso->why_blocked == NotBlocked) {
3379     return;
3380   }
3381
3382   switch (tso->why_blocked) {
3383
3384   case BlockedOnSTM:
3385     // Be careful: nothing to do here!  We tell the scheduler that the thread
3386     // is runnable and we leave it to the stack-walking code to abort the 
3387     // transaction while unwinding the stack.  We should perhaps have a debugging
3388     // test to make sure that this really happens and that the 'zombie' transaction
3389     // does not get committed.
3390     goto done;
3391
3392   case BlockedOnMVar:
3393     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3394     {
3395       StgTSO *last_tso = END_TSO_QUEUE;
3396       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3397
3398       last = &mvar->head;
3399       for (t = mvar->head; t != END_TSO_QUEUE; 
3400            last = &t->link, last_tso = t, t = t->link) {
3401         if (t == tso) {
3402           *last = tso->link;
3403           if (mvar->tail == tso) {
3404             mvar->tail = last_tso;
3405           }
3406           goto done;
3407         }
3408       }
3409       barf("unblockThread (MVAR): TSO not found");
3410     }
3411
3412   case BlockedOnBlackHole:
3413     {
3414       last = &blackhole_queue;
3415       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3416            last = &t->link, t = t->link) {
3417         if (t == tso) {
3418           *last = tso->link;
3419           goto done;
3420         }
3421       }
3422       barf("unblockThread (BLACKHOLE): TSO not found");
3423     }
3424
3425   case BlockedOnException:
3426     {
3427       StgTSO *target  = tso->block_info.tso;
3428
3429       ASSERT(get_itbl(target)->type == TSO);
3430
3431       while (target->what_next == ThreadRelocated) {
3432           target = target->link;
3433           ASSERT(get_itbl(target)->type == TSO);
3434       }
3435       
3436       ASSERT(target->blocked_exceptions != NULL);
3437
3438       last = &target->blocked_exceptions;
3439       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3440            last = &t->link, t = t->link) {
3441         ASSERT(get_itbl(t)->type == TSO);
3442         if (t == tso) {
3443           *last = tso->link;
3444           goto done;
3445         }
3446       }
3447       barf("unblockThread (Exception): TSO not found");
3448     }
3449
3450 #if !defined(THREADED_RTS)
3451   case BlockedOnRead:
3452   case BlockedOnWrite:
3453 #if defined(mingw32_HOST_OS)
3454   case BlockedOnDoProc:
3455 #endif
3456     {
3457       StgTSO *prev = NULL;
3458       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3459            prev = t, t = t->link) {
3460         if (t == tso) {
3461           if (prev == NULL) {
3462             blocked_queue_hd = t->link;
3463             if (blocked_queue_tl == t) {
3464               blocked_queue_tl = END_TSO_QUEUE;
3465             }
3466           } else {
3467             prev->link = t->link;
3468             if (blocked_queue_tl == t) {
3469               blocked_queue_tl = prev;
3470             }
3471           }
3472 #if defined(mingw32_HOST_OS)
3473           /* (Cooperatively) signal that the worker thread should abort
3474            * the request.
3475            */
3476           abandonWorkRequest(tso->block_info.async_result->reqID);
3477 #endif
3478           goto done;
3479         }
3480       }
3481       barf("unblockThread (I/O): TSO not found");
3482     }
3483
3484   case BlockedOnDelay:
3485     {
3486       StgTSO *prev = NULL;
3487       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3488            prev = t, t = t->link) {
3489         if (t == tso) {
3490           if (prev == NULL) {
3491             sleeping_queue = t->link;
3492           } else {
3493             prev->link = t->link;
3494           }
3495           goto done;
3496         }
3497       }
3498       barf("unblockThread (delay): TSO not found");
3499     }
3500 #endif
3501
3502   default:
3503     barf("unblockThread");
3504   }
3505
3506  done:
3507   tso->link = END_TSO_QUEUE;
3508   tso->why_blocked = NotBlocked;
3509   tso->block_info.closure = NULL;
3510   appendToRunQueue(cap,tso);
3511 }
3512 #endif
3513
3514 /* -----------------------------------------------------------------------------
3515  * checkBlackHoles()
3516  *
3517  * Check the blackhole_queue for threads that can be woken up.  We do
3518  * this periodically: before every GC, and whenever the run queue is
3519  * empty.
3520  *
3521  * An elegant solution might be to just wake up all the blocked
3522  * threads with awakenBlockedQueue occasionally: they'll go back to
3523  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3524  * doesn't give us a way to tell whether we've actually managed to
3525  * wake up any threads, so we would be busy-waiting.
3526  *
3527  * -------------------------------------------------------------------------- */
3528
3529 static rtsBool
3530 checkBlackHoles (Capability *cap)
3531 {
3532     StgTSO **prev, *t;
3533     rtsBool any_woke_up = rtsFalse;
3534     StgHalfWord type;
3535
3536     // blackhole_queue is global:
3537     ASSERT_LOCK_HELD(&sched_mutex);
3538
3539     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3540
3541     // ASSUMES: sched_mutex
3542     prev = &blackhole_queue;
3543     t = blackhole_queue;
3544     while (t != END_TSO_QUEUE) {
3545         ASSERT(t->why_blocked == BlockedOnBlackHole);
3546         type = get_itbl(t->block_info.closure)->type;
3547         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3548             IF_DEBUG(sanity,checkTSO(t));
3549             t = unblockOne(cap, t);
3550             // urk, the threads migrate to the current capability
3551             // here, but we'd like to keep them on the original one.
3552             *prev = t;
3553             any_woke_up = rtsTrue;
3554         } else {
3555             prev = &t->link;
3556             t = t->link;
3557         }
3558     }
3559
3560     return any_woke_up;
3561 }
3562
3563 /* -----------------------------------------------------------------------------
3564  * raiseAsync()
3565  *
3566  * The following function implements the magic for raising an
3567  * asynchronous exception in an existing thread.
3568  *
3569  * We first remove the thread from any queue on which it might be
3570  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3571  *
3572  * We strip the stack down to the innermost CATCH_FRAME, building
3573  * thunks in the heap for all the active computations, so they can 
3574  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3575  * an application of the handler to the exception, and push it on
3576  * the top of the stack.
3577  * 
3578  * How exactly do we save all the active computations?  We create an
3579  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3580  * AP_STACKs pushes everything from the corresponding update frame
3581  * upwards onto the stack.  (Actually, it pushes everything up to the
3582  * next update frame plus a pointer to the next AP_STACK object.
3583  * Entering the next AP_STACK object pushes more onto the stack until we
3584  * reach the last AP_STACK object - at which point the stack should look
3585  * exactly as it did when we killed the TSO and we can continue
3586  * execution by entering the closure on top of the stack.
3587  *
3588  * We can also kill a thread entirely - this happens if either (a) the 
3589  * exception passed to raiseAsync is NULL, or (b) there's no
3590  * CATCH_FRAME on the stack.  In either case, we strip the entire
3591  * stack and replace the thread with a zombie.
3592  *
3593  * ToDo: in SMP mode, this function is only safe if either (a) we hold
3594  * all the Capabilities (eg. in GC), or (b) we own the Capability that
3595  * the TSO is currently blocked on or on the run queue of.
3596  *
3597  * -------------------------------------------------------------------------- */
3598  
3599 void
3600 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3601 {
3602     raiseAsync_(cap, tso, exception, rtsFalse);
3603 }
3604
3605 static void
3606 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3607             rtsBool stop_at_atomically)
3608 {
3609     StgRetInfoTable *info;
3610     StgPtr sp;
3611   
3612     // Thread already dead?
3613     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3614         return;
3615     }
3616
3617     IF_DEBUG(scheduler, 
3618              sched_belch("raising exception in thread %ld.", (long)tso->id));
3619     
3620     // Remove it from any blocking queues
3621     unblockThread(cap,tso);
3622
3623     sp = tso->sp;
3624     
3625     // The stack freezing code assumes there's a closure pointer on
3626     // the top of the stack, so we have to arrange that this is the case...
3627     //
3628     if (sp[0] == (W_)&stg_enter_info) {
3629         sp++;
3630     } else {
3631         sp--;
3632         sp[0] = (W_)&stg_dummy_ret_closure;
3633     }
3634
3635     while (1) {
3636         nat i;
3637
3638         // 1. Let the top of the stack be the "current closure"
3639         //
3640         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3641         // CATCH_FRAME.
3642         //
3643         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3644         // current closure applied to the chunk of stack up to (but not
3645         // including) the update frame.  This closure becomes the "current
3646         // closure".  Go back to step 2.
3647         //
3648         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3649         // top of the stack applied to the exception.
3650         // 
3651         // 5. If it's a STOP_FRAME, then kill the thread.
3652         // 
3653         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3654         // transaction
3655        
3656         
3657         StgPtr frame;
3658         
3659         frame = sp + 1;
3660         info = get_ret_itbl((StgClosure *)frame);
3661         
3662         while (info->i.type != UPDATE_FRAME
3663                && (info->i.type != CATCH_FRAME || exception == NULL)
3664                && info->i.type != STOP_FRAME
3665                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3666         {
3667             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3668               // IF we find an ATOMICALLY_FRAME then we abort the
3669               // current transaction and propagate the exception.  In
3670               // this case (unlike ordinary exceptions) we do not care
3671               // whether the transaction is valid or not because its
3672               // possible validity cannot have caused the exception
3673               // and will not be visible after the abort.
3674               IF_DEBUG(stm,
3675                        debugBelch("Found atomically block delivering async exception\n"));
3676               stmAbortTransaction(tso -> trec);
3677               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3678             }
3679             frame += stack_frame_sizeW((StgClosure *)frame);
3680             info = get_ret_itbl((StgClosure *)frame);
3681         }
3682         
3683         switch (info->i.type) {
3684             
3685         case ATOMICALLY_FRAME:
3686             ASSERT(stop_at_atomically);
3687             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3688             stmCondemnTransaction(tso -> trec);
3689 #ifdef REG_R1
3690             tso->sp = frame;
3691 #else
3692             // R1 is not a register: the return convention for IO in
3693             // this case puts the return value on the stack, so we
3694             // need to set up the stack to return to the atomically
3695             // frame properly...
3696             tso->sp = frame - 2;
3697             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3698             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3699 #endif
3700             tso->what_next = ThreadRunGHC;
3701             return;
3702
3703         case CATCH_FRAME:
3704             // If we find a CATCH_FRAME, and we've got an exception to raise,
3705             // then build the THUNK raise(exception), and leave it on
3706             // top of the CATCH_FRAME ready to enter.
3707             //
3708         {
3709 #ifdef PROFILING
3710             StgCatchFrame *cf = (StgCatchFrame *)frame;
3711 #endif
3712             StgThunk *raise;
3713             
3714             // we've got an exception to raise, so let's pass it to the
3715             // handler in this frame.
3716             //
3717             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3718             TICK_ALLOC_SE_THK(1,0);
3719             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3720             raise->payload[0] = exception;
3721             
3722             // throw away the stack from Sp up to the CATCH_FRAME.
3723             //
3724             sp = frame - 1;
3725             
3726             /* Ensure that async excpetions are blocked now, so we don't get
3727              * a surprise exception before we get around to executing the
3728              * handler.
3729              */
3730             if (tso->blocked_exceptions == NULL) {
3731                 tso->blocked_exceptions = END_TSO_QUEUE;
3732             }
3733             
3734             /* Put the newly-built THUNK on top of the stack, ready to execute
3735              * when the thread restarts.
3736              */
3737             sp[0] = (W_)raise;
3738             sp[-1] = (W_)&stg_enter_info;
3739             tso->sp = sp-1;
3740             tso->what_next = ThreadRunGHC;
3741             IF_DEBUG(sanity, checkTSO(tso));
3742             return;
3743         }
3744         
3745         case UPDATE_FRAME:
3746         {
3747             StgAP_STACK * ap;
3748             nat words;
3749             
3750             // First build an AP_STACK consisting of the stack chunk above the
3751             // current update frame, with the top word on the stack as the
3752             // fun field.
3753             //
3754             words = frame - sp - 1;
3755             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3756             
3757             ap->size = words;
3758             ap->fun  = (StgClosure *)sp[0];
3759             sp++;
3760             for(i=0; i < (nat)words; ++i) {
3761                 ap->payload[i] = (StgClosure *)*sp++;
3762             }
3763             
3764             SET_HDR(ap,&stg_AP_STACK_info,
3765                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3766             TICK_ALLOC_UP_THK(words+1,0);
3767             
3768             IF_DEBUG(scheduler,
3769                      debugBelch("sched: Updating ");
3770                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3771                      debugBelch(" with ");
3772                      printObj((StgClosure *)ap);
3773                 );
3774
3775             // Replace the updatee with an indirection - happily
3776             // this will also wake up any threads currently
3777             // waiting on the result.
3778             //
3779             // Warning: if we're in a loop, more than one update frame on
3780             // the stack may point to the same object.  Be careful not to
3781             // overwrite an IND_OLDGEN in this case, because we'll screw
3782             // up the mutable lists.  To be on the safe side, don't
3783             // overwrite any kind of indirection at all.  See also
3784             // threadSqueezeStack in GC.c, where we have to make a similar
3785             // check.
3786             //
3787             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3788                 // revert the black hole
3789                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3790                                (StgClosure *)ap);
3791             }
3792             sp += sizeofW(StgUpdateFrame) - 1;
3793             sp[0] = (W_)ap; // push onto stack
3794             break;
3795         }
3796         
3797         case STOP_FRAME:
3798             // We've stripped the entire stack, the thread is now dead.
3799             sp += sizeofW(StgStopFrame);
3800             tso->what_next = ThreadKilled;
3801             tso->sp = sp;
3802             return;
3803             
3804         default:
3805             barf("raiseAsync");
3806         }
3807     }
3808     barf("raiseAsync");
3809 }
3810
3811 /* -----------------------------------------------------------------------------
3812    Deleting threads
3813
3814    This is used for interruption (^C) and forking, and corresponds to
3815    raising an exception but without letting the thread catch the
3816    exception.
3817    -------------------------------------------------------------------------- */
3818
3819 static void 
3820 deleteThread (Capability *cap, StgTSO *tso)
3821 {
3822   if (tso->why_blocked != BlockedOnCCall &&
3823       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3824       raiseAsync(cap,tso,NULL);
3825   }
3826 }
3827
3828 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3829 static void 
3830 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3831 { // for forkProcess only:
3832   // delete thread without giving it a chance to catch the KillThread exception
3833
3834   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3835       return;
3836   }
3837
3838   if (tso->why_blocked != BlockedOnCCall &&
3839       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3840       unblockThread(cap,tso);
3841   }
3842
3843   tso->what_next = ThreadKilled;
3844 }
3845 #endif
3846
3847 /* -----------------------------------------------------------------------------
3848    raiseExceptionHelper
3849    
3850    This function is called by the raise# primitve, just so that we can
3851    move some of the tricky bits of raising an exception from C-- into
3852    C.  Who knows, it might be a useful re-useable thing here too.
3853    -------------------------------------------------------------------------- */
3854
3855 StgWord
3856 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3857 {
3858     Capability *cap = regTableToCapability(reg);
3859     StgThunk *raise_closure = NULL;
3860     StgPtr p, next;
3861     StgRetInfoTable *info;
3862     //
3863     // This closure represents the expression 'raise# E' where E
3864     // is the exception raise.  It is used to overwrite all the
3865     // thunks which are currently under evaluataion.
3866     //
3867
3868     //    
3869     // LDV profiling: stg_raise_info has THUNK as its closure
3870     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3871     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3872     // 1 does not cause any problem unless profiling is performed.
3873     // However, when LDV profiling goes on, we need to linearly scan
3874     // small object pool, where raise_closure is stored, so we should
3875     // use MIN_UPD_SIZE.
3876     //
3877     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3878     //                                 sizeofW(StgClosure)+1);
3879     //
3880
3881     //
3882     // Walk up the stack, looking for the catch frame.  On the way,
3883     // we update any closures pointed to from update frames with the
3884     // raise closure that we just built.
3885     //
3886     p = tso->sp;
3887     while(1) {
3888         info = get_ret_itbl((StgClosure *)p);
3889         next = p + stack_frame_sizeW((StgClosure *)p);
3890         switch (info->i.type) {
3891             
3892         case UPDATE_FRAME:
3893             // Only create raise_closure if we need to.
3894             if (raise_closure == NULL) {
3895                 raise_closure = 
3896                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3897                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3898                 raise_closure->payload[0] = exception;
3899             }
3900             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3901             p = next;
3902             continue;
3903
3904         case ATOMICALLY_FRAME:
3905             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3906             tso->sp = p;
3907             return ATOMICALLY_FRAME;
3908             
3909         case CATCH_FRAME:
3910             tso->sp = p;
3911             return CATCH_FRAME;
3912
3913         case CATCH_STM_FRAME:
3914             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3915             tso->sp = p;
3916             return CATCH_STM_FRAME;
3917             
3918         case STOP_FRAME:
3919             tso->sp = p;
3920             return STOP_FRAME;
3921
3922         case CATCH_RETRY_FRAME:
3923         default:
3924             p = next; 
3925             continue;
3926         }
3927     }
3928 }
3929
3930
3931 /* -----------------------------------------------------------------------------
3932    findRetryFrameHelper
3933
3934    This function is called by the retry# primitive.  It traverses the stack
3935    leaving tso->sp referring to the frame which should handle the retry.  
3936
3937    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3938    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3939
3940    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3941    despite the similar implementation.
3942
3943    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3944    not be created within memory transactions.
3945    -------------------------------------------------------------------------- */
3946
3947 StgWord
3948 findRetryFrameHelper (StgTSO *tso)
3949 {
3950   StgPtr           p, next;
3951   StgRetInfoTable *info;
3952
3953   p = tso -> sp;
3954   while (1) {
3955     info = get_ret_itbl((StgClosure *)p);
3956     next = p + stack_frame_sizeW((StgClosure *)p);
3957     switch (info->i.type) {
3958       
3959     case ATOMICALLY_FRAME:
3960       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3961       tso->sp = p;
3962       return ATOMICALLY_FRAME;
3963       
3964     case CATCH_RETRY_FRAME:
3965       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3966       tso->sp = p;
3967       return CATCH_RETRY_FRAME;
3968       
3969     case CATCH_STM_FRAME:
3970     default:
3971       ASSERT(info->i.type != CATCH_FRAME);
3972       ASSERT(info->i.type != STOP_FRAME);
3973       p = next; 
3974       continue;
3975     }
3976   }
3977 }
3978
3979 /* -----------------------------------------------------------------------------
3980    resurrectThreads is called after garbage collection on the list of
3981    threads found to be garbage.  Each of these threads will be woken
3982    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3983    on an MVar, or NonTermination if the thread was blocked on a Black
3984    Hole.
3985
3986    Locks: assumes we hold *all* the capabilities.
3987    -------------------------------------------------------------------------- */
3988
3989 void
3990 resurrectThreads (StgTSO *threads)
3991 {
3992     StgTSO *tso, *next;
3993     Capability *cap;
3994
3995     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3996         next = tso->global_link;
3997         tso->global_link = all_threads;
3998         all_threads = tso;
3999         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4000         
4001         // Wake up the thread on the Capability it was last on for a
4002         // bound thread, or last_free_capability otherwise.
4003         if (tso->bound) {
4004             cap = tso->bound->cap;
4005         } else {
4006             cap = last_free_capability;
4007         }
4008         
4009         switch (tso->why_blocked) {
4010         case BlockedOnMVar:
4011         case BlockedOnException:
4012             /* Called by GC - sched_mutex lock is currently held. */
4013             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4014             break;
4015         case BlockedOnBlackHole:
4016             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4017             break;
4018         case BlockedOnSTM:
4019             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4020             break;
4021         case NotBlocked:
4022             /* This might happen if the thread was blocked on a black hole
4023              * belonging to a thread that we've just woken up (raiseAsync
4024              * can wake up threads, remember...).
4025              */
4026             continue;
4027         default:
4028             barf("resurrectThreads: thread blocked in a strange way");
4029         }
4030     }
4031 }
4032
4033 /* ----------------------------------------------------------------------------
4034  * Debugging: why is a thread blocked
4035  * [Also provides useful information when debugging threaded programs
4036  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4037    ------------------------------------------------------------------------- */
4038
4039 #if DEBUG
4040 static void
4041 printThreadBlockage(StgTSO *tso)
4042 {
4043   switch (tso->why_blocked) {
4044   case BlockedOnRead:
4045     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4046     break;
4047   case BlockedOnWrite:
4048     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4049     break;
4050 #if defined(mingw32_HOST_OS)
4051     case BlockedOnDoProc:
4052     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4053     break;
4054 #endif
4055   case BlockedOnDelay:
4056     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4057     break;
4058   case BlockedOnMVar:
4059     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4060     break;
4061   case BlockedOnException:
4062     debugBelch("is blocked on delivering an exception to thread %d",
4063             tso->block_info.tso->id);
4064     break;
4065   case BlockedOnBlackHole:
4066     debugBelch("is blocked on a black hole");
4067     break;
4068   case NotBlocked:
4069     debugBelch("is not blocked");
4070     break;
4071 #if defined(PARALLEL_HASKELL)
4072   case BlockedOnGA:
4073     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4074             tso->block_info.closure, info_type(tso->block_info.closure));
4075     break;
4076   case BlockedOnGA_NoSend:
4077     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4078             tso->block_info.closure, info_type(tso->block_info.closure));
4079     break;
4080 #endif
4081   case BlockedOnCCall:
4082     debugBelch("is blocked on an external call");
4083     break;
4084   case BlockedOnCCall_NoUnblockExc:
4085     debugBelch("is blocked on an external call (exceptions were already blocked)");
4086     break;
4087   case BlockedOnSTM:
4088     debugBelch("is blocked on an STM operation");
4089     break;
4090   default:
4091     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4092          tso->why_blocked, tso->id, tso);
4093   }
4094 }
4095
4096 void
4097 printThreadStatus(StgTSO *t)
4098 {
4099     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4100     {
4101       void *label = lookupThreadLabel(t->id);
4102       if (label) debugBelch("[\"%s\"] ",(char *)label);
4103     }
4104     if (t->what_next == ThreadRelocated) {
4105         debugBelch("has been relocated...\n");
4106     } else {
4107         switch (t->what_next) {
4108         case ThreadKilled:
4109             debugBelch("has been killed");
4110             break;
4111         case ThreadComplete:
4112             debugBelch("has completed");
4113             break;
4114         default:
4115             printThreadBlockage(t);
4116         }
4117         debugBelch("\n");
4118     }
4119 }
4120
4121 void
4122 printAllThreads(void)
4123 {
4124   StgTSO *t, *next;
4125   nat i;
4126   Capability *cap;
4127
4128 # if defined(GRAN)
4129   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4130   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4131                        time_string, rtsFalse/*no commas!*/);
4132
4133   debugBelch("all threads at [%s]:\n", time_string);
4134 # elif defined(PARALLEL_HASKELL)
4135   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4136   ullong_format_string(CURRENT_TIME,
4137                        time_string, rtsFalse/*no commas!*/);
4138
4139   debugBelch("all threads at [%s]:\n", time_string);
4140 # else
4141   debugBelch("all threads:\n");
4142 # endif
4143
4144   for (i = 0; i < n_capabilities; i++) {
4145       cap = &capabilities[i];
4146       debugBelch("threads on capability %d:\n", cap->no);
4147       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4148           printThreadStatus(t);
4149       }
4150   }
4151
4152   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4153       if (t->why_blocked != NotBlocked) {
4154           printThreadStatus(t);
4155       }
4156       if (t->what_next == ThreadRelocated) {
4157           next = t->link;
4158       } else {
4159           next = t->global_link;
4160       }
4161   }
4162 }
4163
4164 // useful from gdb
4165 void 
4166 printThreadQueue(StgTSO *t)
4167 {
4168     nat i = 0;
4169     for (; t != END_TSO_QUEUE; t = t->link) {
4170         printThreadStatus(t);
4171         i++;
4172     }
4173     debugBelch("%d threads on queue\n", i);
4174 }
4175
4176 /* 
4177    Print a whole blocking queue attached to node (debugging only).
4178 */
4179 # if defined(PARALLEL_HASKELL)
4180 void 
4181 print_bq (StgClosure *node)
4182 {
4183   StgBlockingQueueElement *bqe;
4184   StgTSO *tso;
4185   rtsBool end;
4186
4187   debugBelch("## BQ of closure %p (%s): ",
4188           node, info_type(node));
4189
4190   /* should cover all closures that may have a blocking queue */
4191   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4192          get_itbl(node)->type == FETCH_ME_BQ ||
4193          get_itbl(node)->type == RBH ||
4194          get_itbl(node)->type == MVAR);
4195     
4196   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4197
4198   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4199 }
4200
4201 /* 
4202    Print a whole blocking queue starting with the element bqe.
4203 */
4204 void 
4205 print_bqe (StgBlockingQueueElement *bqe)
4206 {
4207   rtsBool end;
4208
4209   /* 
4210      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4211   */
4212   for (end = (bqe==END_BQ_QUEUE);
4213        !end; // iterate until bqe points to a CONSTR
4214        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4215        bqe = end ? END_BQ_QUEUE : bqe->link) {
4216     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4217     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4218     /* types of closures that may appear in a blocking queue */
4219     ASSERT(get_itbl(bqe)->type == TSO ||           
4220            get_itbl(bqe)->type == BLOCKED_FETCH || 
4221            get_itbl(bqe)->type == CONSTR); 
4222     /* only BQs of an RBH end with an RBH_Save closure */
4223     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4224
4225     switch (get_itbl(bqe)->type) {
4226     case TSO:
4227       debugBelch(" TSO %u (%x),",
4228               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4229       break;
4230     case BLOCKED_FETCH:
4231       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4232               ((StgBlockedFetch *)bqe)->node, 
4233               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4234               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4235               ((StgBlockedFetch *)bqe)->ga.weight);
4236       break;
4237     case CONSTR:
4238       debugBelch(" %s (IP %p),",
4239               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4240                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4241                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4242                "RBH_Save_?"), get_itbl(bqe));
4243       break;
4244     default:
4245       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4246            info_type((StgClosure *)bqe)); // , node, info_type(node));
4247       break;
4248     }
4249   } /* for */
4250   debugBelch("\n");
4251 }
4252 # elif defined(GRAN)
4253 void 
4254 print_bq (StgClosure *node)
4255 {
4256   StgBlockingQueueElement *bqe;
4257   PEs node_loc, tso_loc;
4258   rtsBool end;
4259
4260   /* should cover all closures that may have a blocking queue */
4261   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4262          get_itbl(node)->type == FETCH_ME_BQ ||
4263          get_itbl(node)->type == RBH);
4264     
4265   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4266   node_loc = where_is(node);
4267
4268   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4269           node, info_type(node), node_loc);
4270
4271   /* 
4272      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4273   */
4274   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4275        !end; // iterate until bqe points to a CONSTR
4276        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4277     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4278     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4279     /* types of closures that may appear in a blocking queue */
4280     ASSERT(get_itbl(bqe)->type == TSO ||           
4281            get_itbl(bqe)->type == CONSTR); 
4282     /* only BQs of an RBH end with an RBH_Save closure */
4283     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4284
4285     tso_loc = where_is((StgClosure *)bqe);
4286     switch (get_itbl(bqe)->type) {
4287     case TSO:
4288       debugBelch(" TSO %d (%p) on [PE %d],",
4289               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4290       break;
4291     case CONSTR:
4292       debugBelch(" %s (IP %p),",
4293               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4294                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4295                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4296                "RBH_Save_?"), get_itbl(bqe));
4297       break;
4298     default:
4299       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4300            info_type((StgClosure *)bqe), node, info_type(node));
4301       break;
4302     }
4303   } /* for */
4304   debugBelch("\n");
4305 }
4306 # endif
4307
4308 #if defined(PARALLEL_HASKELL)
4309 static nat
4310 run_queue_len(void)
4311 {
4312     nat i;
4313     StgTSO *tso;
4314     
4315     for (i=0, tso=run_queue_hd; 
4316          tso != END_TSO_QUEUE;
4317          i++, tso=tso->link) {
4318         /* nothing */
4319     }
4320         
4321     return i;
4322 }
4323 #endif
4324
4325 void
4326 sched_belch(char *s, ...)
4327 {
4328     va_list ap;
4329     va_start(ap,s);
4330 #ifdef THREADED_RTS
4331     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4332 #elif defined(PARALLEL_HASKELL)
4333     debugBelch("== ");
4334 #else
4335     debugBelch("sched: ");
4336 #endif
4337     vdebugBelch(s, ap);
4338     debugBelch("\n");
4339     va_end(ap);
4340 }
4341
4342 #endif /* DEBUG */