X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=1c55585dd1c66d8d1970c7c02afea11d3123ba4b;hb=30681e796f707fa109aaf756d4586049f595195d;hp=8f3f7e323c46b9530dae7836a8186d9070adf902;hpb=c4729619f98b8aee2ea5555638f939518b7edd00;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 8f3f7e3..1c55585 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.26 1999/10/04 16:13:18 simonmar Exp $ + * $Id: Schedule.c,v 1.31 1999/11/09 15:46:54 simonmar Exp $ * * (c) The GHC Team, 1998-1999 * @@ -7,6 +7,26 @@ * * ---------------------------------------------------------------------------*/ +/* Version with scheduler monitor support for SMPs. + + This design provides a high-level API to create and schedule threads etc. + as documented in the SMP design document. + + It uses a monitor design controlled by a single mutex to exercise control + over accesses to shared data structures, and builds on the Posix threads + library. + + The majority of state is shared. In order to keep essential per-task state, + there is a Capability structure, which contains all the information + needed to run a thread: its STG registers, a pointer to its TSO, a + nursery etc. During STG execution, a pointer to the capability is + kept in a register (BaseReg). + + In a non-SMP build, there is one global capability, namely MainRegTable. + + SDM & KH, 10/99 +*/ + #include "Rts.h" #include "SchedAPI.h" #include "RtsUtils.h" @@ -25,24 +45,64 @@ #include "Signals.h" #include "Profiling.h" #include "Sanity.h" +#include "Stats.h" + +/* Main threads: + * + * These are the threads which clients have requested that we run. + * + * In an SMP build, we might have several concurrent clients all + * waiting for results, and each one will wait on a condition variable + * until the result is available. + * + * In non-SMP, clients are strictly nested: the first client calls + * into the RTS, which might call out again to C with a _ccall_GC, and + * eventually re-enter the RTS. + * + * Main threads information is kept in a linked list: + */ +typedef struct StgMainThread_ { + StgTSO * tso; + SchedulerStatus stat; + StgClosure ** ret; +#ifdef SMP + pthread_cond_t wakeup; +#endif + struct StgMainThread_ *link; +} StgMainThread; + +/* Main thread queue. + * Locks required: sched_mutex. + */ +static StgMainThread *main_threads; +/* Thread queues. + * Locks required: sched_mutex. + */ StgTSO *run_queue_hd, *run_queue_tl; StgTSO *blocked_queue_hd, *blocked_queue_tl; -StgTSO *ccalling_threads; -#define MAX_SCHEDULE_NESTING 256 -nat next_main_thread; -StgTSO *main_threads[MAX_SCHEDULE_NESTING]; +/* Threads suspended in _ccall_GC. + * Locks required: sched_mutex. + */ +static StgTSO *suspended_ccalling_threads; static void GetRoots(void); static StgTSO *threadStackOverflow(StgTSO *tso); +/* KH: The following two flags are shared memory locations. There is no need + to lock them, since they are only unset at the end of a scheduler + operation. +*/ + /* flag set by signal handler to precipitate a context switch */ nat context_switch; /* if this flag is set as well, give up execution */ static nat interrupted; -/* Next thread ID to allocate */ +/* Next thread ID to allocate. + * Locks required: sched_mutex + */ StgThreadID next_thread_id = 1; /* @@ -50,14 +110,7 @@ StgThreadID next_thread_id = 1; * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell * thread. If CurrentTSO == NULL, then we're at the scheduler level. */ -StgTSO *CurrentTSO; -StgRegTable MainRegTable; - -/* - * The thread state for the main thread. - */ -StgTSO *MainTSO; - + /* The smallest stack size that makes any sense is: * RESERVED_STACK_WORDS (so we can get back from the stack overflow) * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame) @@ -70,6 +123,478 @@ StgTSO *MainTSO; #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2) +/* Free capability list. + * Locks required: sched_mutex. + */ +#ifdef SMP +Capability *free_capabilities; /* Available capabilities for running threads */ +nat n_free_capabilities; /* total number of available capabilities */ +#else +Capability MainRegTable; /* for non-SMP, we have one global capability */ +#endif + +rtsBool ready_to_gc; + +/* All our current task ids, saved in case we need to kill them later. + */ +#ifdef SMP +task_info *task_ids; +#endif + +void addToBlockedQueue ( StgTSO *tso ); + +static void schedule ( void ); +static void initThread ( StgTSO *tso, nat stack_size ); + void interruptStgRts ( void ); + +#ifdef SMP +pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER; +pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER; + +nat await_death; +#endif + +/* ----------------------------------------------------------------------------- + Main scheduling loop. + + We use round-robin scheduling, each thread returning to the + scheduler loop when one of these conditions is detected: + + * out of heap space + * timer expires (thread yields) + * thread blocks + * thread ends + * stack overflow + + Locking notes: we acquire the scheduler lock once at the beginning + of the scheduler loop, and release it when + + * running a thread, or + * waiting for work, or + * waiting for a GC to complete. + + -------------------------------------------------------------------------- */ + +static void +schedule( void ) +{ + StgTSO *t; + Capability *cap; + StgThreadReturnCode ret; + + ACQUIRE_LOCK(&sched_mutex); + + while (1) { + + /* Check whether any waiting threads need to be woken up. If the + * run queue is empty, and there are no other tasks running, we + * can wait indefinitely for something to happen. + * ToDo: what if another client comes along & requests another + * main thread? + */ + if (blocked_queue_hd != END_TSO_QUEUE) { + awaitEvent( + (run_queue_hd == END_TSO_QUEUE) +#ifdef SMP + && (n_free_capabilities == RtsFlags.ConcFlags.nNodes) +#endif + ); + } + + /* check for signals each time around the scheduler */ +#ifndef __MINGW32__ + if (signals_pending()) { + start_signal_handlers(); + } +#endif + + /* Detect deadlock: when we have no threads to run, there are + * no threads waiting on I/O or sleeping, and all the other + * tasks are waiting for work, we must have a deadlock. Inform + * all the main threads. + */ +#ifdef SMP + if (blocked_queue_hd == END_TSO_QUEUE + && run_queue_hd == END_TSO_QUEUE + && (n_free_capabilities == RtsFlags.ConcFlags.nNodes) + ) { + StgMainThread *m; + for (m = main_threads; m != NULL; m = m->link) { + m->ret = NULL; + m->stat = Deadlock; + pthread_cond_broadcast(&m->wakeup); + } + main_threads = NULL; + } +#else /* ! SMP */ + if (blocked_queue_hd == END_TSO_QUEUE + && run_queue_hd == END_TSO_QUEUE) { + StgMainThread *m = main_threads; + m->ret = NULL; + m->stat = Deadlock; + main_threads = m->link; + return; + } +#endif + +#ifdef SMP + /* If there's a GC pending, don't do anything until it has + * completed. + */ + if (ready_to_gc) { + IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): waiting for GC\n", + pthread_self());); + pthread_cond_wait(&gc_pending_cond, &sched_mutex); + } + + /* block until we've got a thread on the run queue and a free + * capability. + */ + while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) { + IF_DEBUG(scheduler, + fprintf(stderr, "schedule (task %ld): waiting for work\n", + pthread_self());); + pthread_cond_wait(&thread_ready_cond, &sched_mutex); + IF_DEBUG(scheduler, + fprintf(stderr, "schedule (task %ld): work now available\n", + pthread_self());); + } +#endif + + /* grab a thread from the run queue + */ + t = POP_RUN_QUEUE(); + + /* grab a capability + */ +#ifdef SMP + cap = free_capabilities; + free_capabilities = cap->link; + n_free_capabilities--; +#else + cap = &MainRegTable; +#endif + + cap->rCurrentTSO = t; + + /* set the context_switch flag + */ + if (run_queue_hd == END_TSO_QUEUE) + context_switch = 0; + else + context_switch = 1; + + RELEASE_LOCK(&sched_mutex); + +#ifdef SMP + IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): running thread %d\n", pthread_self(),t->id)); +#else + IF_DEBUG(scheduler,fprintf(stderr,"schedule: running thread %d\n",t->id)); +#endif + + /* Run the current thread + */ + switch (cap->rCurrentTSO->whatNext) { + case ThreadKilled: + case ThreadComplete: + /* Thread already finished, return to scheduler. */ + ret = ThreadFinished; + break; + case ThreadEnterGHC: + ret = StgRun((StgFunPtr) stg_enterStackTop, cap); + break; + case ThreadRunGHC: + ret = StgRun((StgFunPtr) stg_returnToStackTop, cap); + break; + case ThreadEnterHugs: +#ifdef INTERPRETER + { + StgClosure* c; + IF_DEBUG(scheduler,belch("schedule: entering Hugs")); + c = (StgClosure *)(cap->rCurrentTSO->sp[0]); + cap->rCurrentTSO->sp += 1; + ret = enter(cap,c); + break; + } +#else + barf("Panic: entered a BCO but no bytecode interpreter in this build"); +#endif + default: + barf("schedule: invalid whatNext field"); + } + + /* Costs for the scheduler are assigned to CCS_SYSTEM */ +#ifdef PROFILING + CCCS = CCS_SYSTEM; +#endif + + ACQUIRE_LOCK(&sched_mutex); + +#ifdef SMP + IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): ", pthread_self());); +#else + IF_DEBUG(scheduler,fprintf(stderr,"schedule: ");); +#endif + t = cap->rCurrentTSO; + + switch (ret) { + case HeapOverflow: + /* make all the running tasks block on a condition variable, + * maybe set context_switch and wait till they all pile in, + * then have them wait on a GC condition variable. + */ + IF_DEBUG(scheduler,belch("thread %ld stopped: HeapOverflow", t->id)); + threadPaused(t); + + ready_to_gc = rtsTrue; + context_switch = 1; /* stop other threads ASAP */ + PUSH_ON_RUN_QUEUE(t); + break; + + case StackOverflow: + /* just adjust the stack for this thread, then pop it back + * on the run queue. + */ + IF_DEBUG(scheduler,belch("thread %ld stopped, StackOverflow", t->id)); + threadPaused(t); + { + StgMainThread *m; + /* enlarge the stack */ + StgTSO *new_t = threadStackOverflow(t); + + /* This TSO has moved, so update any pointers to it from the + * main thread stack. It better not be on any other queues... + * (it shouldn't be) + */ + for (m = main_threads; m != NULL; m = m->link) { + if (m->tso == t) { + m->tso = new_t; + } + } + PUSH_ON_RUN_QUEUE(new_t); + } + break; + + case ThreadYielding: + /* put the thread back on the run queue. Then, if we're ready to + * GC, check whether this is the last task to stop. If so, wake + * up the GC thread. getThread will block during a GC until the + * GC is finished. + */ + IF_DEBUG(scheduler, + if (t->whatNext == ThreadEnterHugs) { + /* ToDo: or maybe a timer expired when we were in Hugs? + * or maybe someone hit ctrl-C + */ + belch("thread %ld stopped to switch to Hugs", t->id); + } else { + belch("thread %ld stopped, yielding", t->id); + } + ); + threadPaused(t); + APPEND_TO_RUN_QUEUE(t); + break; + + case ThreadBlocked: + /* don't need to do anything. Either the thread is blocked on + * I/O, in which case we'll have called addToBlockedQueue + * previously, or it's blocked on an MVar or Blackhole, in which + * case it'll be on the relevant queue already. + */ + IF_DEBUG(scheduler, + fprintf(stderr, "thread %d stopped, ", t->id); + printThreadBlockage(t); + fprintf(stderr, "\n")); + threadPaused(t); + break; + + case ThreadFinished: + /* Need to check whether this was a main thread, and if so, signal + * the task that started it with the return value. If we have no + * more main threads, we probably need to stop all the tasks until + * we get a new one. + */ + IF_DEBUG(scheduler,belch("thread %ld finished", t->id)); + t->whatNext = ThreadComplete; + break; + + default: + barf("doneThread: invalid thread return code"); + } + +#ifdef SMP + cap->link = free_capabilities; + free_capabilities = cap; + n_free_capabilities++; +#endif + +#ifdef SMP + if (ready_to_gc && n_free_capabilities == RtsFlags.ConcFlags.nNodes) { +#else + if (ready_to_gc) { +#endif + /* everybody back, start the GC. + * Could do it in this thread, or signal a condition var + * to do it in another thread. Either way, we need to + * broadcast on gc_pending_cond afterward. + */ +#ifdef SMP + IF_DEBUG(scheduler,belch("schedule (task %ld): doing GC", pthread_self())); +#endif + GarbageCollect(GetRoots); + ready_to_gc = rtsFalse; +#ifdef SMP + pthread_cond_broadcast(&gc_pending_cond); +#endif + } + + /* Go through the list of main threads and wake up any + * clients whose computations have finished. ToDo: this + * should be done more efficiently without a linear scan + * of the main threads list, somehow... + */ +#ifdef SMP + { + StgMainThread *m, **prev; + prev = &main_threads; + for (m = main_threads; m != NULL; m = m->link) { + if (m->tso->whatNext == ThreadComplete) { + if (m->ret) { + *(m->ret) = (StgClosure *)m->tso->sp[0]; + } + *prev = m->link; + m->stat = Success; + pthread_cond_broadcast(&m->wakeup); + } + if (m->tso->whatNext == ThreadKilled) { + *prev = m->link; + m->stat = Killed; + pthread_cond_broadcast(&m->wakeup); + } + } + } +#else + /* If our main thread has finished or been killed, return. + * If we were re-entered as a result of a _ccall_gc, then + * pop the blocked thread off the ccalling_threads stack back + * into CurrentTSO. + */ + { + StgMainThread *m = main_threads; + if (m->tso->whatNext == ThreadComplete + || m->tso->whatNext == ThreadKilled) { + main_threads = main_threads->link; + if (m->tso->whatNext == ThreadComplete) { + /* we finished successfully, fill in the return value */ + if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; }; + m->stat = Success; + return; + } else { + m->stat = Killed; + return; + } + } + } +#endif + + } /* end of while(1) */ +} + +/* ----------------------------------------------------------------------------- + * Suspending & resuming Haskell threads. + * + * When making a "safe" call to C (aka _ccall_GC), the task gives back + * its capability before calling the C function. This allows another + * task to pick up the capability and carry on running Haskell + * threads. It also means that if the C call blocks, it won't lock + * the whole system. + * + * The Haskell thread making the C call is put to sleep for the + * duration of the call, on the susepended_ccalling_threads queue. We + * give out a token to the task, which it can use to resume the thread + * on return from the C function. + * -------------------------------------------------------------------------- */ + +StgInt +suspendThread( Capability *cap ) +{ + nat tok; + + ACQUIRE_LOCK(&sched_mutex); + +#ifdef SMP + IF_DEBUG(scheduler, + fprintf(stderr, "schedule (task %ld): thread %d did a _ccall_gc\n", + pthread_self(), cap->rCurrentTSO->id)); +#else + IF_DEBUG(scheduler, + fprintf(stderr, "schedule: thread %d did a _ccall_gc\n", + cap->rCurrentTSO->id)); +#endif + + threadPaused(cap->rCurrentTSO); + cap->rCurrentTSO->link = suspended_ccalling_threads; + suspended_ccalling_threads = cap->rCurrentTSO; + + /* Use the thread ID as the token; it should be unique */ + tok = cap->rCurrentTSO->id; + +#ifdef SMP + cap->link = free_capabilities; + free_capabilities = cap; + n_free_capabilities++; +#endif + + RELEASE_LOCK(&sched_mutex); + return tok; +} + +Capability * +resumeThread( StgInt tok ) +{ + StgTSO *tso, **prev; + Capability *cap; + + ACQUIRE_LOCK(&sched_mutex); + + prev = &suspended_ccalling_threads; + for (tso = suspended_ccalling_threads; + tso != END_TSO_QUEUE; + prev = &tso->link, tso = tso->link) { + if (tso->id == (StgThreadID)tok) { + *prev = tso->link; + break; + } + } + if (tso == END_TSO_QUEUE) { + barf("resumeThread: thread not found"); + } + +#ifdef SMP + while (free_capabilities == NULL) { + IF_DEBUG(scheduler, + fprintf(stderr,"schedule (task %ld): waiting to resume\n", + pthread_self())); + pthread_cond_wait(&thread_ready_cond, &sched_mutex); + IF_DEBUG(scheduler,fprintf(stderr, + "schedule (task %ld): resuming thread %d\n", + pthread_self(), tso->id)); + } + cap = free_capabilities; + free_capabilities = cap->link; + n_free_capabilities--; +#else + cap = &MainRegTable; +#endif + + cap->rCurrentTSO = tso; + + RELEASE_LOCK(&sched_mutex); + return cap; +} + /* ----------------------------------------------------------------------------- * Static functions * -------------------------------------------------------------------------- */ @@ -126,7 +651,16 @@ initThread(StgTSO *tso, nat stack_size) { SET_INFO(tso,&TSO_info); tso->whatNext = ThreadEnterGHC; - tso->id = next_thread_id++; + + /* tso->id needs to be unique. For now we use a heavyweight mutex to + protect the increment operation on next_thread_id. + In future, we could use an atomic increment instead. + */ + + ACQUIRE_LOCK(&sched_mutex); + tso->id = next_thread_id++; + RELEASE_LOCK(&sched_mutex); + tso->why_blocked = NotBlocked; tso->splim = (P_)&(tso->stack) + RESERVED_STACK_WORDS; @@ -144,328 +678,262 @@ initThread(StgTSO *tso, nat stack_size) SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN); tso->su = (StgUpdateFrame*)tso->sp; - IF_DEBUG(scheduler,belch("Initialised thread %ld, stack size = %lx words\n", + IF_DEBUG(scheduler,belch("schedule: Initialised thread %ld, stack size = %lx words", tso->id, tso->stack_size)); - /* Put the new thread on the head of the runnable queue. - * The caller of createThread better push an appropriate closure - * on this thread's stack before the scheduler is invoked. +} + + +/* ----------------------------------------------------------------------------- + * scheduleThread() + * + * scheduleThread puts a thread on the head of the runnable queue. + * This will usually be done immediately after a thread is created. + * The caller of scheduleThread must create the thread using e.g. + * createThread and push an appropriate closure + * on this thread's stack before the scheduler is invoked. + * -------------------------------------------------------------------------- */ + +void +scheduleThread(StgTSO *tso) +{ + ACQUIRE_LOCK(&sched_mutex); + + /* Put the new thread on the head of the runnable queue. The caller + * better push an appropriate closure on this thread's stack + * beforehand. In the SMP case, the thread may start running as + * soon as we release the scheduler lock below. */ - tso->link = run_queue_hd; - run_queue_hd = tso; - if (run_queue_tl == END_TSO_QUEUE) { - run_queue_tl = tso; - } + PUSH_ON_RUN_QUEUE(tso); + THREAD_RUNNABLE(); IF_DEBUG(scheduler,printTSO(tso)); + RELEASE_LOCK(&sched_mutex); } + +/* ----------------------------------------------------------------------------- + * startTasks() + * + * Start up Posix threads to run each of the scheduler tasks. + * I believe the task ids are not needed in the system as defined. + * KH @ 25/10/99 + * -------------------------------------------------------------------------- */ + +#ifdef SMP +static void * +taskStart( void *arg STG_UNUSED ) +{ + schedule(); + return NULL; +} +#endif + /* ----------------------------------------------------------------------------- * initScheduler() * * Initialise the scheduler. This resets all the queues - if the * queues contained any threads, they'll be garbage collected at the * next pass. + * + * This now calls startTasks(), so should only be called once! KH @ 25/10/99 * -------------------------------------------------------------------------- */ +#ifdef SMP +static void +term_handler(int sig STG_UNUSED) +{ + stat_workerStop(); + ACQUIRE_LOCK(&term_mutex); + await_death--; + RELEASE_LOCK(&term_mutex); + pthread_exit(NULL); +} +#endif + void initScheduler(void) { run_queue_hd = END_TSO_QUEUE; run_queue_tl = END_TSO_QUEUE; blocked_queue_hd = END_TSO_QUEUE; blocked_queue_tl = END_TSO_QUEUE; - ccalling_threads = END_TSO_QUEUE; - next_main_thread = 0; + + suspended_ccalling_threads = END_TSO_QUEUE; + + main_threads = NULL; context_switch = 0; interrupted = 0; enteredCAFs = END_CAF_LIST; -} -/* ----------------------------------------------------------------------------- - Main scheduling loop. - - We use round-robin scheduling, each thread returning to the - scheduler loop when one of these conditions is detected: - - * stack overflow - * out of heap space - * timer expires (thread yields) - * thread blocks - * thread ends - -------------------------------------------------------------------------- */ - -SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val) -{ - StgTSO *t; - StgThreadReturnCode ret; - StgTSO **MainTSO; - rtsBool in_ccall_gc; - - /* Return value is NULL by default, it is only filled in if the - * main thread completes successfully. - */ - if (ret_val) { *ret_val = NULL; } - - /* Save away a pointer to the main thread so that we can keep track - * of it should a garbage collection happen. We keep a stack of - * main threads in order to support scheduler re-entry. We can't - * use the normal TSO linkage for this stack, because the main TSO - * may need to be linked onto other queues. - */ - main_threads[next_main_thread] = main; - MainTSO = &main_threads[next_main_thread]; - next_main_thread++; - IF_DEBUG(scheduler, - fprintf(stderr, "Scheduler entered: nesting = %d\n", - next_main_thread);); + /* Install the SIGHUP handler */ +#ifdef SMP + { + struct sigaction action,oact; - /* Are we being re-entered? - */ - if (CurrentTSO != NULL) { - /* This happens when a _ccall_gc from Haskell ends up re-entering - * the scheduler. - * - * Block the current thread (put it on the ccalling_queue) and - * continue executing. The calling thread better have stashed - * away its state properly and left its stack with a proper stack - * frame on the top. - */ - threadPaused(CurrentTSO); - CurrentTSO->link = ccalling_threads; - ccalling_threads = CurrentTSO; - in_ccall_gc = rtsTrue; - IF_DEBUG(scheduler, - fprintf(stderr, "Re-entry, thread %d did a _ccall_gc\n", - CurrentTSO->id);); - } else { - in_ccall_gc = rtsFalse; + action.sa_handler = term_handler; + sigemptyset(&action.sa_mask); + action.sa_flags = 0; + if (sigaction(SIGTERM, &action, &oact) != 0) { + barf("can't install TERM handler"); + } } +#endif - /* Take a thread from the run queue. - */ - t = run_queue_hd; - if (t != END_TSO_QUEUE) { - run_queue_hd = t->link; - t->link = END_TSO_QUEUE; - if (run_queue_hd == END_TSO_QUEUE) { - run_queue_tl = END_TSO_QUEUE; +#ifdef SMP + /* Allocate N Capabilities */ + { + nat i; + Capability *cap, *prev; + cap = NULL; + prev = NULL; + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities"); + cap->link = prev; + prev = cap; } + free_capabilities = cap; + n_free_capabilities = RtsFlags.ConcFlags.nNodes; } + IF_DEBUG(scheduler,fprintf(stderr,"schedule: Allocated %d capabilities\n", + n_free_capabilities);); +#endif +} - while (t != END_TSO_QUEUE) { - CurrentTSO = t; - - /* If we have more threads on the run queue, set up a context - * switch at some point in the future. - */ - if (run_queue_hd != END_TSO_QUEUE || blocked_queue_hd != END_TSO_QUEUE) { - context_switch = 1; - } else { - context_switch = 0; +#ifdef SMP +void +startTasks( void ) +{ + nat i; + int r; + pthread_t tid; + + /* make some space for saving all the thread ids */ + task_ids = stgMallocBytes(RtsFlags.ConcFlags.nNodes * sizeof(task_info), + "initScheduler:task_ids"); + + /* and create all the threads */ + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + r = pthread_create(&tid,NULL,taskStart,NULL); + if (r != 0) { + barf("startTasks: Can't create new Posix thread"); } - IF_DEBUG(scheduler, belch("Running thread %ld...\n", t->id)); + task_ids[i].id = tid; + task_ids[i].mut_time = 0.0; + task_ids[i].mut_etime = 0.0; + task_ids[i].gc_time = 0.0; + task_ids[i].gc_etime = 0.0; + task_ids[i].elapsedtimestart = elapsedtime(); + IF_DEBUG(scheduler,fprintf(stderr,"schedule: Started task: %ld\n",tid);); + } +} +#endif - /* Be friendly to the storage manager: we're about to *run* this - * thread, so we better make sure the TSO is mutable. - */ - if (t->mut_link == NULL) { - recordMutable((StgMutClosure *)t); - } +void +exitScheduler( void ) +{ +#ifdef SMP + nat i; - /* Run the current thread */ - switch (t->whatNext) { - case ThreadKilled: - case ThreadComplete: - /* thread already killed. Drop it and carry on. */ - goto next_thread; - case ThreadEnterGHC: - ret = StgRun((StgFunPtr) stg_enterStackTop); - break; - case ThreadRunGHC: - ret = StgRun((StgFunPtr) stg_returnToStackTop); - break; - case ThreadEnterHugs: -#ifdef INTERPRETER - { - IF_DEBUG(scheduler,belch("entering Hugs")); - LoadThreadState(); - /* CHECK_SENSIBLE_REGS(); */ - { - StgClosure* c = (StgClosure *)Sp[0]; - Sp += 1; - ret = enter(c); - } - SaveThreadState(); - break; - } -#else - barf("Panic: entered a BCO but no bytecode interpreter in this build"); + /* Don't want to use pthread_cancel, since we'd have to install + * these silly exception handlers (pthread_cleanup_{push,pop}) around + * all our locks. + */ +#if 0 + /* Cancel all our tasks */ + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + pthread_cancel(task_ids[i].id); + } + + /* Wait for all the tasks to terminate */ + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + IF_DEBUG(scheduler,fprintf(stderr,"schedule: waiting for task %ld\n", + task_ids[i].id)); + pthread_join(task_ids[i].id, NULL); + } #endif - default: - barf("schedule: invalid whatNext field"); - } - /* We may have garbage collected while running the thread - * (eg. something nefarious like _ccall_GC_ performGC), and hence - * CurrentTSO may have moved. Update t to reflect this. - */ - t = CurrentTSO; - CurrentTSO = NULL; - - /* Costs for the scheduler are assigned to CCS_SYSTEM */ -#ifdef PROFILING - CCCS = CCS_SYSTEM; + /* Send 'em all a SIGHUP. That should shut 'em up. + */ + await_death = RtsFlags.ConcFlags.nNodes; + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + pthread_kill(task_ids[i].id,SIGTERM); + } + while (await_death > 0) { + sched_yield(); + } #endif +} - switch (ret) { +/* ----------------------------------------------------------------------------- + Managing the per-task allocation areas. + + Each capability comes with an allocation area. These are + fixed-length block lists into which allocation can be done. - case HeapOverflow: - IF_DEBUG(scheduler,belch("Thread %ld stopped: HeapOverflow\n", t->id)); - threadPaused(t); - PUSH_ON_RUN_QUEUE(t); - GarbageCollect(GetRoots); - break; + ToDo: no support for two-space collection at the moment??? + -------------------------------------------------------------------------- */ - case StackOverflow: - IF_DEBUG(scheduler,belch("Thread %ld stopped, StackOverflow\n", t->id)); - { - nat i; - /* enlarge the stack */ - StgTSO *new_t = threadStackOverflow(t); - - /* This TSO has moved, so update any pointers to it from the - * main thread stack. It better not be on any other queues... - * (it shouldn't be) - */ - for (i = 0; i < next_main_thread; i++) { - if (main_threads[i] == t) { - main_threads[i] = new_t; - } - } - t = new_t; - } - PUSH_ON_RUN_QUEUE(t); - break; +/* ----------------------------------------------------------------------------- + * waitThread is the external interface for running a new computataion + * and waiting for the result. + * + * In the non-SMP case, we create a new main thread, push it on the + * main-thread stack, and invoke the scheduler to run it. The + * scheduler will return when the top main thread on the stack has + * completed or died, and fill in the necessary fields of the + * main_thread structure. + * + * In the SMP case, we create a main thread as before, but we then + * create a new condition variable and sleep on it. When our new + * main thread has completed, we'll be woken up and the status/result + * will be in the main_thread struct. + * -------------------------------------------------------------------------- */ - case ThreadYielding: - IF_DEBUG(scheduler, - if (t->whatNext == ThreadEnterHugs) { - /* ToDo: or maybe a timer expired when we were in Hugs? - * or maybe someone hit ctrl-C - */ - belch("Thread %ld stopped to switch to Hugs\n", t->id); - } else { - belch("Thread %ld stopped, timer expired\n", t->id); - } - ); - threadPaused(t); - if (interrupted) { - IF_DEBUG(scheduler,belch("Scheduler interrupted - returning")); - deleteThread(t); - while (run_queue_hd != END_TSO_QUEUE) { - run_queue_hd = t->link; - deleteThread(t); - } - run_queue_tl = END_TSO_QUEUE; - /* ToDo: should I do the same with blocked queues? */ - return Interrupted; - } +SchedulerStatus +waitThread(StgTSO *tso, /*out*/StgClosure **ret) +{ + StgMainThread *m; + SchedulerStatus stat; - /* Put the thread back on the run queue, at the end. - * t->link is already set to END_TSO_QUEUE. - */ - ASSERT(t->link == END_TSO_QUEUE); - if (run_queue_tl == END_TSO_QUEUE) { - run_queue_hd = run_queue_tl = t; - } else { - ASSERT(get_itbl(run_queue_tl)->type == TSO); - if (run_queue_hd == run_queue_tl) { - run_queue_hd->link = t; - run_queue_tl = t; - } else { - run_queue_tl->link = t; - run_queue_tl = t; - } - } - break; + ACQUIRE_LOCK(&sched_mutex); + + m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); - case ThreadBlocked: - IF_DEBUG(scheduler, - fprintf(stderr, "Thread %d stopped, ", t->id); - printThreadBlockage(t); - fprintf(stderr, "\n")); - threadPaused(t); - /* assume the thread has put itself on some blocked queue - * somewhere. - */ - break; + m->tso = tso; + m->ret = ret; + m->stat = NoStatus; +#ifdef SMP + pthread_cond_init(&m->wakeup, NULL); +#endif - case ThreadFinished: - IF_DEBUG(scheduler,belch("Thread %ld finished\n", t->id)); - t->whatNext = ThreadComplete; - break; + m->link = main_threads; + main_threads = m; - default: - barf("schedule: invalid thread return code"); - } + IF_DEBUG(scheduler, fprintf(stderr, "schedule: new main thread (%d)\n", + m->tso->id)); - /* check for signals each time around the scheduler */ -#ifndef __MINGW32__ - if (signals_pending()) { - start_signal_handlers(); - } +#ifdef SMP + do { + pthread_cond_wait(&m->wakeup, &sched_mutex); + } while (m->stat == NoStatus); +#else + schedule(); + ASSERT(m->stat != NoStatus); #endif - /* If our main thread has finished or been killed, return. - * If we were re-entered as a result of a _ccall_gc, then - * pop the blocked thread off the ccalling_threads stack back - * into CurrentTSO. - */ - if ((*MainTSO)->whatNext == ThreadComplete - || (*MainTSO)->whatNext == ThreadKilled) { - next_main_thread--; - if (in_ccall_gc) { - CurrentTSO = ccalling_threads; - ccalling_threads = ccalling_threads->link; - /* remember to stub the link field of CurrentTSO */ - CurrentTSO->link = END_TSO_QUEUE; - } - if ((*MainTSO)->whatNext == ThreadComplete) { - /* we finished successfully, fill in the return value */ - if (ret_val) { *ret_val = (StgClosure *)(*MainTSO)->sp[0]; }; - return Success; - } else { - return Killed; - } - } - next_thread: - /* Checked whether any waiting threads need to be woken up. - * If the run queue is empty, we can wait indefinitely for - * something to happen. - */ - if (blocked_queue_hd != END_TSO_QUEUE) { - awaitEvent(run_queue_hd == END_TSO_QUEUE); - } + stat = m->stat; - t = run_queue_hd; - if (t != END_TSO_QUEUE) { - run_queue_hd = t->link; - t->link = END_TSO_QUEUE; - if (run_queue_hd == END_TSO_QUEUE) { - run_queue_tl = END_TSO_QUEUE; - } - } - } +#ifdef SMP + pthread_cond_destroy(&m->wakeup); +#endif + free(m); - /* If we got to here, then we ran out of threads to run, but the - * main thread hasn't finished yet. It must be blocked on an MVar - * or a black hole somewhere, so we return deadlock. - */ - return Deadlock; + RELEASE_LOCK(&sched_mutex); + return stat; } - + /* ----------------------------------------------------------------------------- Debugging: why is a thread blocked -------------------------------------------------------------------------- */ @@ -506,9 +974,14 @@ void printThreadBlockage(StgTSO *tso) -------------------------------------------------------------------------- */ +/* This has to be protected either by the scheduler monitor, or by the + garbage collection monitor (probably the latter). + KH @ 25/10/99 +*/ + static void GetRoots(void) { - nat i; + StgMainThread *m; run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd); run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl); @@ -516,11 +989,11 @@ static void GetRoots(void) blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd); blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl); - ccalling_threads = (StgTSO *)MarkRoot((StgClosure *)ccalling_threads); - - for (i = 0; i < next_main_thread; i++) { - main_threads[i] = (StgTSO *)MarkRoot((StgClosure *)main_threads[i]); + for (m = main_threads; m != NULL; m = m->link) { + m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso); } + suspended_ccalling_threads = + (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads); } /* ----------------------------------------------------------------------------- @@ -532,6 +1005,8 @@ static void GetRoots(void) It might be useful to provide an interface whereby the programmer can specify more roots (ToDo). + + This needs to be protected by the GC condition variable above. KH. -------------------------------------------------------------------------- */ void (*extra_roots)(void); @@ -598,7 +1073,7 @@ threadStackOverflow(StgTSO *tso) new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */ new_stack_size = new_tso_size - TSO_STRUCT_SIZEW; - IF_DEBUG(scheduler, fprintf(stderr,"increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size)); + IF_DEBUG(scheduler, fprintf(stderr,"schedule: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size)); dest = (StgTSO *)allocate(new_tso_size); TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0); @@ -636,9 +1111,13 @@ threadStackOverflow(StgTSO *tso) #if 0 IF_DEBUG(scheduler,printTSO(dest)); #endif + +#if 0 + /* This will no longer work: KH */ if (tso == MainTSO) { /* hack */ MainTSO = dest; } +#endif return dest; } @@ -646,7 +1125,8 @@ threadStackOverflow(StgTSO *tso) Wake up a queue that was blocked on some resource. -------------------------------------------------------------------------- */ -StgTSO *unblockOne(StgTSO *tso) +static StgTSO * +unblockOneLocked(StgTSO *tso) { StgTSO *next; @@ -654,17 +1134,34 @@ StgTSO *unblockOne(StgTSO *tso) ASSERT(tso->why_blocked != NotBlocked); tso->why_blocked = NotBlocked; next = tso->link; - tso->link = END_TSO_QUEUE; PUSH_ON_RUN_QUEUE(tso); - IF_DEBUG(scheduler,belch("Waking up thread %ld", tso->id)); + THREAD_RUNNABLE(); +#ifdef SMP + IF_DEBUG(scheduler,belch("schedule (task %ld): waking up thread %ld", + pthread_self(), tso->id)); +#else + IF_DEBUG(scheduler,belch("schedule: waking up thread %ld", tso->id)); +#endif return next; } -void awakenBlockedQueue(StgTSO *tso) +inline StgTSO * +unblockOne(StgTSO *tso) { + ACQUIRE_LOCK(&sched_mutex); + tso = unblockOneLocked(tso); + RELEASE_LOCK(&sched_mutex); + return tso; +} + +void +awakenBlockedQueue(StgTSO *tso) +{ + ACQUIRE_LOCK(&sched_mutex); while (tso != END_TSO_QUEUE) { - tso = unblockOne(tso); + tso = unblockOneLocked(tso); } + RELEASE_LOCK(&sched_mutex); } /* ----------------------------------------------------------------------------- @@ -691,6 +1188,7 @@ unblockThread(StgTSO *tso) { StgTSO *t, **last; + ACQUIRE_LOCK(&sched_mutex); switch (tso->why_blocked) { case NotBlocked: @@ -759,6 +1257,7 @@ unblockThread(StgTSO *tso) tso->why_blocked = NotBlocked; tso->block_info.closure = NULL; PUSH_ON_RUN_QUEUE(tso); + RELEASE_LOCK(&sched_mutex); } /* ----------------------------------------------------------------------------- @@ -810,7 +1309,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) return; } - IF_DEBUG(scheduler, belch("Raising exception in thread %ld.", tso->id)); + IF_DEBUG(scheduler, belch("schedule: Raising exception in thread %ld.", tso->id)); /* Remove it from any blocking queues */ unblockThread(tso); @@ -881,7 +1380,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) TICK_ALLOC_UP_THK(words+1,0); IF_DEBUG(scheduler, - fprintf(stderr, "Updating "); + fprintf(stderr, "schedule: Updating "); printPtr((P_)su->updatee); fprintf(stderr, " with "); printObj((StgClosure *)ap); @@ -891,7 +1390,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) * this will also wake up any threads currently * waiting on the result. */ - UPD_IND(su->updatee,ap); /* revert the black hole */ + UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */ su = su->link; sp += sizeofW(StgUpdateFrame) -1; sp[0] = (W_)ap; /* push onto stack */ @@ -917,7 +1416,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) o->payload[1] = cf->handler; IF_DEBUG(scheduler, - fprintf(stderr, "Built "); + fprintf(stderr, "schedule: Built "); printObj((StgClosure *)o); ); @@ -943,7 +1442,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) payloadCPtr(o,0) = (StgClosure *)ap; IF_DEBUG(scheduler, - fprintf(stderr, "Built "); + fprintf(stderr, "schedule: Built "); printObj((StgClosure *)o); ); @@ -969,3 +1468,4 @@ raiseAsync(StgTSO *tso, StgClosure *exception) } barf("raiseAsync"); } +