X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=e614ae77f37bff47d4116243dbf24cd378d5c81d;hb=bd2fb1c5eacc886737afd72cc889386e00ed5d23;hp=70df69675272937e55978bb45bbd2af920c2efbd;hpb=68146c5b63e599fdb27a9d4da37f9a29e45de504;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 70df696..e614ae7 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.23 1999/08/25 10:23:53 simonmar Exp $ + * $Id: Schedule.c,v 1.30 1999/11/08 15:30:39 sewardj Exp $ * * (c) The GHC Team, 1998-1999 * @@ -7,6 +7,26 @@ * * ---------------------------------------------------------------------------*/ +/* Version with scheduler monitor support for SMPs. + + This design provides a high-level API to create and schedule threads etc. + as documented in the SMP design document. + + It uses a monitor design controlled by a single mutex to exercise control + over accesses to shared data structures, and builds on the Posix threads + library. + + The majority of state is shared. In order to keep essential per-task state, + there is a Capability structure, which contains all the information + needed to run a thread: its STG registers, a pointer to its TSO, a + nursery etc. During STG execution, a pointer to the capability is + kept in a register (BaseReg). + + In a non-SMP build, there is one global capability, namely MainRegTable. + + SDM & KH, 10/99 +*/ + #include "Rts.h" #include "SchedAPI.h" #include "RtsUtils.h" @@ -25,24 +45,68 @@ #include "Signals.h" #include "Profiling.h" #include "Sanity.h" +#include "Stats.h" + +/* Main threads: + * + * These are the threads which clients have requested that we run. + * + * In an SMP build, we might have several concurrent clients all + * waiting for results, and each one will wait on a condition variable + * until the result is available. + * + * In non-SMP, clients are strictly nested: the first client calls + * into the RTS, which might call out again to C with a _ccall_GC, and + * eventually re-enter the RTS. + * + * Main threads information is kept in a linked list: + */ +typedef struct StgMainThread_ { + StgTSO * tso; + SchedulerStatus stat; + StgClosure ** ret; +#ifdef SMP + pthread_cond_t wakeup; +#endif + struct StgMainThread_ *link; +} StgMainThread; +/* Main thread queue. + * Locks required: sched_mutex. + */ +static StgMainThread *main_threads; + +/* Thread queues. + * Locks required: sched_mutex. + */ StgTSO *run_queue_hd, *run_queue_tl; StgTSO *blocked_queue_hd, *blocked_queue_tl; -StgTSO *ccalling_threads; -#define MAX_SCHEDULE_NESTING 256 -nat next_main_thread; -StgTSO *main_threads[MAX_SCHEDULE_NESTING]; +/* Threads suspended in _ccall_GC. + * Locks required: sched_mutex. + */ +static StgTSO *suspended_ccalling_threads; + +#ifndef SMP +static rtsBool in_ccall_gc; +#endif static void GetRoots(void); static StgTSO *threadStackOverflow(StgTSO *tso); +/* KH: The following two flags are shared memory locations. There is no need + to lock them, since they are only unset at the end of a scheduler + operation. +*/ + /* flag set by signal handler to precipitate a context switch */ nat context_switch; /* if this flag is set as well, give up execution */ static nat interrupted; -/* Next thread ID to allocate */ +/* Next thread ID to allocate. 
+ * Locks required: sched_mutex + */ StgThreadID next_thread_id = 1; /* @@ -50,14 +114,7 @@ StgThreadID next_thread_id = 1; * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell * thread. If CurrentTSO == NULL, then we're at the scheduler level. */ -StgTSO *CurrentTSO; -StgRegTable MainRegTable; - -/* - * The thread state for the main thread. - */ -StgTSO *MainTSO; - + /* The smallest stack size that makes any sense is: * RESERVED_STACK_WORDS (so we can get back from the stack overflow) * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame) @@ -70,6 +127,442 @@ StgTSO *MainTSO; #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2) +/* Free capability list. + * Locks required: sched_mutex. + */ +#ifdef SMP +Capability *free_capabilities; /* Available capabilities for running threads */ +nat n_free_capabilities; /* total number of available capabilities */ +#else +Capability MainRegTable; /* for non-SMP, we have one global capability */ +#endif + +rtsBool ready_to_gc; + +/* All our current task ids, saved in case we need to kill them later. + */ +#ifdef SMP +task_info *task_ids; +#endif + +void addToBlockedQueue ( StgTSO *tso ); + +static void schedule ( void ); +static void initThread ( StgTSO *tso, nat stack_size ); + void interruptStgRts ( void ); + +#ifdef SMP +pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER; +pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER; + +nat await_death; +#endif + +/* ----------------------------------------------------------------------------- + Main scheduling loop. + + We use round-robin scheduling, each thread returning to the + scheduler loop when one of these conditions is detected: + + * out of heap space + * timer expires (thread yields) + * thread blocks + * thread ends + * stack overflow + + Locking notes: we acquire the scheduler lock once at the beginning + of the scheduler loop, and release it when + + * running a thread, or + * waiting for work, or + * waiting for a GC to complete. + + -------------------------------------------------------------------------- */ + +static void +schedule( void ) +{ + StgTSO *t; + Capability *cap; + StgThreadReturnCode ret; + + ACQUIRE_LOCK(&sched_mutex); + + while (1) { + + /* Check whether any waiting threads need to be woken up. + * If the run queue is empty, we can wait indefinitely for + * something to happen. + */ + if (blocked_queue_hd != END_TSO_QUEUE) { + awaitEvent(run_queue_hd == END_TSO_QUEUE); + } + + /* check for signals each time around the scheduler */ +#ifndef __MINGW32__ + if (signals_pending()) { + start_signal_handlers(); + } +#endif + +#ifdef SMP + /* If there's a GC pending, don't do anything until it has + * completed. + */ + if (ready_to_gc) { + IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): waiting for GC\n", + pthread_self());); + pthread_cond_wait(&gc_pending_cond, &sched_mutex); + } + + /* block until we've got a thread on the run queue and a free + * capability. 
+ */ + while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) { + IF_DEBUG(scheduler, + fprintf(stderr, "schedule (task %ld): waiting for work\n", + pthread_self());); + pthread_cond_wait(&thread_ready_cond, &sched_mutex); + IF_DEBUG(scheduler, + fprintf(stderr, "schedule (task %ld): work now available\n", + pthread_self());); + } +#endif + + /* grab a thread from the run queue + */ + t = POP_RUN_QUEUE(); + + /* grab a capability + */ +#ifdef SMP + cap = free_capabilities; + free_capabilities = cap->link; + n_free_capabilities--; +#else + cap = &MainRegTable; +#endif + + cap->rCurrentTSO = t; + + /* set the context_switch flag + */ + if (run_queue_hd == END_TSO_QUEUE) + context_switch = 0; + else + context_switch = 1; + + RELEASE_LOCK(&sched_mutex); + +#ifdef SMP + IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): running thread %d\n", pthread_self(),t->id)); +#else + IF_DEBUG(scheduler,fprintf(stderr,"schedule: running thread %d\n",t->id)); +#endif + + /* Run the current thread + */ + switch (cap->rCurrentTSO->whatNext) { + case ThreadKilled: + case ThreadComplete: + /* Thread already finished, return to scheduler. */ + ret = ThreadFinished; + break; + case ThreadEnterGHC: + ret = StgRun((StgFunPtr) stg_enterStackTop, cap); + break; + case ThreadRunGHC: + ret = StgRun((StgFunPtr) stg_returnToStackTop, cap); + break; + case ThreadEnterHugs: +#ifdef INTERPRETER + { + StgClosure* c; + IF_DEBUG(scheduler,belch("schedule: entering Hugs")); + c = (StgClosure *)(cap->rCurrentTSO->sp[0]); + cap->rCurrentTSO->sp += 1; + ret = enter(cap,c); + break; + } +#else + barf("Panic: entered a BCO but no bytecode interpreter in this build"); +#endif + default: + barf("schedule: invalid whatNext field"); + } + + /* Costs for the scheduler are assigned to CCS_SYSTEM */ +#ifdef PROFILING + CCCS = CCS_SYSTEM; +#endif + + ACQUIRE_LOCK(&sched_mutex); + +#ifdef SMP + IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): ", pthread_self());); +#else + IF_DEBUG(scheduler,fprintf(stderr,"schedule: ");); +#endif + t = cap->rCurrentTSO; + + switch (ret) { + case HeapOverflow: + /* make all the running tasks block on a condition variable, + * maybe set context_switch and wait till they all pile in, + * then have them wait on a GC condition variable. + */ + IF_DEBUG(scheduler,belch("thread %ld stopped: HeapOverflow", t->id)); + threadPaused(t); + + ready_to_gc = rtsTrue; + context_switch = 1; /* stop other threads ASAP */ + PUSH_ON_RUN_QUEUE(t); + break; + + case StackOverflow: + /* just adjust the stack for this thread, then pop it back + * on the run queue. + */ + IF_DEBUG(scheduler,belch("thread %ld stopped, StackOverflow", t->id)); + threadPaused(t); + { + StgMainThread *m; + /* enlarge the stack */ + StgTSO *new_t = threadStackOverflow(t); + + /* This TSO has moved, so update any pointers to it from the + * main thread stack. It better not be on any other queues... + * (it shouldn't be) + */ + for (m = main_threads; m != NULL; m = m->link) { + if (m->tso == t) { + m->tso = new_t; + } + } + PUSH_ON_RUN_QUEUE(new_t); + } + break; + + case ThreadYielding: + /* put the thread back on the run queue. Then, if we're ready to + * GC, check whether this is the last task to stop. If so, wake + * up the GC thread. getThread will block during a GC until the + * GC is finished. + */ + IF_DEBUG(scheduler, + if (t->whatNext == ThreadEnterHugs) { + /* ToDo: or maybe a timer expired when we were in Hugs? 
+ * or maybe someone hit ctrl-C + */ + belch("thread %ld stopped to switch to Hugs", t->id); + } else { + belch("thread %ld stopped, yielding", t->id); + } + ); + threadPaused(t); + APPEND_TO_RUN_QUEUE(t); + break; + + case ThreadBlocked: + /* don't need to do anything. Either the thread is blocked on + * I/O, in which case we'll have called addToBlockedQueue + * previously, or it's blocked on an MVar or Blackhole, in which + * case it'll be on the relevant queue already. + */ + IF_DEBUG(scheduler, + fprintf(stderr, "thread %d stopped, ", t->id); + printThreadBlockage(t); + fprintf(stderr, "\n")); + threadPaused(t); + break; + + case ThreadFinished: + /* Need to check whether this was a main thread, and if so, signal + * the task that started it with the return value. If we have no + * more main threads, we probably need to stop all the tasks until + * we get a new one. + */ + IF_DEBUG(scheduler,belch("thread %ld finished", t->id)); + t->whatNext = ThreadComplete; + break; + + default: + barf("doneThread: invalid thread return code"); + } + +#ifdef SMP + cap->link = free_capabilities; + free_capabilities = cap; + n_free_capabilities++; +#endif + +#ifdef SMP + if (ready_to_gc && n_free_capabilities == RtsFlags.ConcFlags.nNodes) { +#else + if (ready_to_gc) { +#endif + /* everybody back, start the GC. + * Could do it in this thread, or signal a condition var + * to do it in another thread. Either way, we need to + * broadcast on gc_pending_cond afterward. + */ +#ifdef SMP + IF_DEBUG(scheduler,belch("schedule (task %ld): doing GC", pthread_self())); +#endif + GarbageCollect(GetRoots); + ready_to_gc = rtsFalse; +#ifdef SMP + pthread_cond_broadcast(&gc_pending_cond); +#endif + } + + /* Go through the list of main threads and wake up any + * clients whose computations have finished. ToDo: this + * should be done more efficiently without a linear scan + * of the main threads list, somehow... + */ +#ifdef SMP + { + StgMainThread *m, **prev; + prev = &main_threads; + for (m = main_threads; m != NULL; m = m->link) { + if (m->tso->whatNext == ThreadComplete) { + if (m->ret) { + *(m->ret) = (StgClosure *)m->tso->sp[0]; + } + *prev = m->link; + m->stat = Success; + pthread_cond_broadcast(&m->wakeup); + } + if (m->tso->whatNext == ThreadKilled) { + *prev = m->link; + m->stat = Killed; + pthread_cond_broadcast(&m->wakeup); + } + } + } +#else + /* If our main thread has finished or been killed, return. + * If we were re-entered as a result of a _ccall_gc, then + * pop the blocked thread off the ccalling_threads stack back + * into CurrentTSO. + */ + { + StgMainThread *m = main_threads; + if (m->tso->whatNext == ThreadComplete + || m->tso->whatNext == ThreadKilled) { + main_threads = main_threads->link; + if (m->tso->whatNext == ThreadComplete) { + /* we finished successfully, fill in the return value */ + if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; }; + m->stat = Success; + return; + } else { + m->stat = Killed; + return; + } + } + } +#endif + + } /* end of while(1) */ +} + +/* ----------------------------------------------------------------------------- + * Suspending & resuming Haskell threads. + * + * When making a "safe" call to C (aka _ccall_GC), the task gives back + * its capability before calling the C function. This allows another + * task to pick up the capability and carry on running Haskell + * threads. It also means that if the C call blocks, it won't lock + * the whole system. 
+ * + * The Haskell thread making the C call is put to sleep for the + * duration of the call, on the susepended_ccalling_threads queue. We + * give out a token to the task, which it can use to resume the thread + * on return from the C function. + * -------------------------------------------------------------------------- */ + +StgInt +suspendThread( Capability *cap ) +{ + nat tok; + + ACQUIRE_LOCK(&sched_mutex); + +#ifdef SMP + IF_DEBUG(scheduler, + fprintf(stderr, "schedule (task %ld): thread %d did a _ccall_gc\n", + pthread_self(), cap->rCurrentTSO->id)); +#else + IF_DEBUG(scheduler, + fprintf(stderr, "schedule: thread %d did a _ccall_gc\n", + cap->rCurrentTSO->id)); +#endif + + threadPaused(cap->rCurrentTSO); + cap->rCurrentTSO->link = suspended_ccalling_threads; + suspended_ccalling_threads = cap->rCurrentTSO; + + /* Use the thread ID as the token; it should be unique */ + tok = cap->rCurrentTSO->id; + +#ifdef SMP + cap->link = free_capabilities; + free_capabilities = cap; + n_free_capabilities++; +#endif + + RELEASE_LOCK(&sched_mutex); + return tok; +} + +Capability * +resumeThread( StgInt tok ) +{ + StgTSO *tso, **prev; + Capability *cap; + + ACQUIRE_LOCK(&sched_mutex); + + prev = &suspended_ccalling_threads; + for (tso = suspended_ccalling_threads; + tso != END_TSO_QUEUE; + prev = &tso->link, tso = tso->link) { + if (tso->id == (StgThreadID)tok) { + *prev = tso->link; + break; + } + } + if (tso == END_TSO_QUEUE) { + barf("resumeThread: thread not found"); + } + +#ifdef SMP + while (free_capabilities == NULL) { + IF_DEBUG(scheduler, + fprintf(stderr,"schedule (task %ld): waiting to resume\n", + pthread_self())); + pthread_cond_wait(&thread_ready_cond, &sched_mutex); + IF_DEBUG(scheduler,fprintf(stderr, + "schedule (task %ld): resuming thread %d\n", + pthread_self(), tso->id)); + } + cap = free_capabilities; + free_capabilities = cap->link; + n_free_capabilities--; +#else + cap = &MainRegTable; +#endif + + cap->rCurrentTSO = tso; + + RELEASE_LOCK(&sched_mutex); + return cap; +} + /* ----------------------------------------------------------------------------- * Static functions * -------------------------------------------------------------------------- */ @@ -126,8 +619,17 @@ initThread(StgTSO *tso, nat stack_size) { SET_INFO(tso,&TSO_info); tso->whatNext = ThreadEnterGHC; - tso->id = next_thread_id++; - tso->blocked_on = NULL; + + /* tso->id needs to be unique. For now we use a heavyweight mutex to + protect the increment operation on next_thread_id. + In future, we could use an atomic increment instead. + */ + + ACQUIRE_LOCK(&sched_mutex); + tso->id = next_thread_id++; + RELEASE_LOCK(&sched_mutex); + + tso->why_blocked = NotBlocked; tso->splim = (P_)&(tso->stack) + RESERVED_STACK_WORDS; tso->stack_size = stack_size; @@ -144,58 +646,264 @@ initThread(StgTSO *tso, nat stack_size) SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN); tso->su = (StgUpdateFrame*)tso->sp; - IF_DEBUG(scheduler,belch("Initialised thread %ld, stack size = %lx words\n", + IF_DEBUG(scheduler,belch("schedule: Initialised thread %ld, stack size = %lx words", tso->id, tso->stack_size)); - /* Put the new thread on the head of the runnable queue. - * The caller of createThread better push an appropriate closure - * on this thread's stack before the scheduler is invoked. +} + + +/* ----------------------------------------------------------------------------- + * scheduleThread() + * + * scheduleThread puts a thread on the head of the runnable queue. 
+ * This will usually be done immediately after a thread is created. + * The caller of scheduleThread must create the thread using e.g. + * createThread and push an appropriate closure + * on this thread's stack before the scheduler is invoked. + * -------------------------------------------------------------------------- */ + +void +scheduleThread(StgTSO *tso) +{ + ACQUIRE_LOCK(&sched_mutex); + + /* Put the new thread on the head of the runnable queue. The caller + * better push an appropriate closure on this thread's stack + * beforehand. In the SMP case, the thread may start running as + * soon as we release the scheduler lock below. */ - tso->link = run_queue_hd; - run_queue_hd = tso; - if (run_queue_tl == END_TSO_QUEUE) { - run_queue_tl = tso; - } + PUSH_ON_RUN_QUEUE(tso); + THREAD_RUNNABLE(); IF_DEBUG(scheduler,printTSO(tso)); + RELEASE_LOCK(&sched_mutex); } + +/* ----------------------------------------------------------------------------- + * startTasks() + * + * Start up Posix threads to run each of the scheduler tasks. + * I believe the task ids are not needed in the system as defined. + * KH @ 25/10/99 + * -------------------------------------------------------------------------- */ + +#ifdef SMP +static void * +taskStart( void *arg STG_UNUSED ) +{ + schedule(); + return NULL; +} +#endif + /* ----------------------------------------------------------------------------- * initScheduler() * * Initialise the scheduler. This resets all the queues - if the * queues contained any threads, they'll be garbage collected at the * next pass. + * + * This now calls startTasks(), so should only be called once! KH @ 25/10/99 * -------------------------------------------------------------------------- */ +#ifdef SMP +static void +term_handler(int sig STG_UNUSED) +{ + nat i; + pthread_t me = pthread_self(); + + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + if (task_ids[i].id == me) { + task_ids[i].mut_time = usertime() - task_ids[i].gc_time; + if (task_ids[i].mut_time < 0.0) { + task_ids[i].mut_time = 0.0; + } + } + } + ACQUIRE_LOCK(&term_mutex); + await_death--; + RELEASE_LOCK(&term_mutex); + pthread_exit(NULL); +} +#endif + void initScheduler(void) { run_queue_hd = END_TSO_QUEUE; run_queue_tl = END_TSO_QUEUE; blocked_queue_hd = END_TSO_QUEUE; blocked_queue_tl = END_TSO_QUEUE; - ccalling_threads = END_TSO_QUEUE; - next_main_thread = 0; + + suspended_ccalling_threads = END_TSO_QUEUE; + + main_threads = NULL; context_switch = 0; interrupted = 0; enteredCAFs = END_CAF_LIST; + + /* Install the SIGHUP handler */ +#ifdef SMP + { + struct sigaction action,oact; + + action.sa_handler = term_handler; + sigemptyset(&action.sa_mask); + action.sa_flags = 0; + if (sigaction(SIGTERM, &action, &oact) != 0) { + barf("can't install TERM handler"); + } + } +#endif + +#ifdef SMP + /* Allocate N Capabilities */ + { + nat i; + Capability *cap, *prev; + cap = NULL; + prev = NULL; + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities"); + cap->link = prev; + prev = cap; + } + free_capabilities = cap; + n_free_capabilities = RtsFlags.ConcFlags.nNodes; + } + IF_DEBUG(scheduler,fprintf(stderr,"schedule: Allocated %d capabilities\n", + n_free_capabilities);); +#endif } -/* ----------------------------------------------------------------------------- - Main scheduling loop. 
+#ifdef SMP +void +startTasks( void ) +{ + nat i; + int r; + pthread_t tid; + + /* make some space for saving all the thread ids */ + task_ids = stgMallocBytes(RtsFlags.ConcFlags.nNodes * sizeof(task_info), + "initScheduler:task_ids"); + + /* and create all the threads */ + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + r = pthread_create(&tid,NULL,taskStart,NULL); + if (r != 0) { + barf("startTasks: Can't create new Posix thread"); + } + task_ids[i].id = tid; + IF_DEBUG(scheduler,fprintf(stderr,"schedule: Started task: %ld\n",tid);); + } +} +#endif - We use round-robin scheduling, each thread returning to the - scheduler loop when one of these conditions is detected: +void +exitScheduler( void ) +{ +#ifdef SMP + nat i; - * stack overflow - * out of heap space - * timer expires (thread yields) - * thread blocks - * thread ends + /* Don't want to use pthread_cancel, since we'd have to install + * these silly exception handlers (pthread_cleanup_{push,pop}) around + * all our locks. + */ +#if 0 + /* Cancel all our tasks */ + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + pthread_cancel(task_ids[i].id); + } + + /* Wait for all the tasks to terminate */ + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + IF_DEBUG(scheduler,fprintf(stderr,"schedule: waiting for task %ld\n", + task_ids[i].id)); + pthread_join(task_ids[i].id, NULL); + } +#endif + + /* Send 'em all a SIGHUP. That should shut 'em up. + */ + await_death = RtsFlags.ConcFlags.nNodes; + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + pthread_kill(task_ids[i].id,SIGTERM); + } + while (await_death > 0) { + sched_yield(); + } +#endif +} + +/* ----------------------------------------------------------------------------- + Managing the per-task allocation areas. + + Each capability comes with an allocation area. These are + fixed-length block lists into which allocation can be done. + + ToDo: no support for two-space collection at the moment??? -------------------------------------------------------------------------- */ +/* ----------------------------------------------------------------------------- + * waitThread is the external interface for running a new computataion + * and waiting for the result. + * + * In the non-SMP case, we create a new main thread, push it on the + * main-thread stack, and invoke the scheduler to run it. The + * scheduler will return when the top main thread on the stack has + * completed or died, and fill in the necessary fields of the + * main_thread structure. + * + * In the SMP case, we create a main thread as before, but we then + * create a new condition variable and sleep on it. When our new + * main thread has completed, we'll be woken up and the status/result + * will be in the main_thread struct. 
+ * -------------------------------------------------------------------------- */ + +SchedulerStatus +waitThread(StgTSO *tso, /*out*/StgClosure **ret) +{ + StgMainThread *m; + SchedulerStatus stat; + + ACQUIRE_LOCK(&sched_mutex); + + m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); + + m->tso = tso; + m->ret = ret; + m->stat = NoStatus; +#ifdef SMP + pthread_cond_init(&m->wakeup, NULL); +#endif + + m->link = main_threads; + main_threads = m; + +#ifdef SMP + pthread_cond_wait(&m->wakeup, &sched_mutex); +#else + schedule(); +#endif + + stat = m->stat; + ASSERT(stat != NoStatus); + +#ifdef SMP + pthread_cond_destroy(&m->wakeup); +#endif + free(m); + + RELEASE_LOCK(&sched_mutex); + return stat; +} + + +#if 0 SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val) { StgTSO *t; @@ -245,14 +953,7 @@ SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val) /* Take a thread from the run queue. */ - t = run_queue_hd; - if (t != END_TSO_QUEUE) { - run_queue_hd = t->link; - t->link = END_TSO_QUEUE; - if (run_queue_hd == END_TSO_QUEUE) { - run_queue_tl = END_TSO_QUEUE; - } - } + t = POP_RUN_QUEUE(); while (t != END_TSO_QUEUE) { CurrentTSO = t; @@ -260,7 +961,7 @@ SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val) /* If we have more threads on the run queue, set up a context * switch at some point in the future. */ - if (run_queue_hd != END_TSO_QUEUE) { + if (run_queue_hd != END_TSO_QUEUE || blocked_queue_hd != END_TSO_QUEUE) { context_switch = 1; } else { context_switch = 0; @@ -376,23 +1077,14 @@ SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val) /* Put the thread back on the run queue, at the end. * t->link is already set to END_TSO_QUEUE. */ - ASSERT(t->link == END_TSO_QUEUE); - if (run_queue_tl == END_TSO_QUEUE) { - run_queue_hd = run_queue_tl = t; - } else { - ASSERT(get_itbl(run_queue_tl)->type == TSO); - if (run_queue_hd == run_queue_tl) { - run_queue_hd->link = t; - run_queue_tl = t; - } else { - run_queue_tl->link = t; - run_queue_tl = t; - } - } + APPEND_TO_RUN_QUEUE(t); break; case ThreadBlocked: - IF_DEBUG(scheduler,belch("Thread %ld stopped, blocking\n", t->id)); + IF_DEBUG(scheduler, + fprintf(stderr, "Thread %d stopped, ", t->id); + printThreadBlockage(t); + fprintf(stderr, "\n")); threadPaused(t); /* assume the thread has put itself on some blocked queue * somewhere. @@ -400,7 +1092,7 @@ SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val) break; case ThreadFinished: - IF_DEBUG(scheduler,belch("Thread %ld finished\n", t->id)); + IF_DEBUG(scheduler,fprintf(stderr,"thread %ld finished\n", t->id)); t->whatNext = ThreadComplete; break; @@ -438,22 +1130,54 @@ SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val) } next_thread: - t = run_queue_hd; - if (t != END_TSO_QUEUE) { - run_queue_hd = t->link; - t->link = END_TSO_QUEUE; - if (run_queue_hd == END_TSO_QUEUE) { - run_queue_tl = END_TSO_QUEUE; - } + /* Checked whether any waiting threads need to be woken up. + * If the run queue is empty, we can wait indefinitely for + * something to happen. + */ + if (blocked_queue_hd != END_TSO_QUEUE) { + awaitEvent(run_queue_hd == END_TSO_QUEUE); } + + t = POP_RUN_QUEUE(); } - if (blocked_queue_hd != END_TSO_QUEUE) { - return AllBlocked; - } else { - return Deadlock; + /* If we got to here, then we ran out of threads to run, but the + * main thread hasn't finished yet. It must be blocked on an MVar + * or a black hole somewhere, so we return deadlock. 
+ */ + return Deadlock; +} +#endif + +/* ----------------------------------------------------------------------------- + Debugging: why is a thread blocked + -------------------------------------------------------------------------- */ + +#ifdef DEBUG +void printThreadBlockage(StgTSO *tso) +{ + switch (tso->why_blocked) { + case BlockedOnRead: + fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd); + break; + case BlockedOnWrite: + fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd); + break; + case BlockedOnDelay: + fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay); + break; + case BlockedOnMVar: + fprintf(stderr,"blocked on an MVar"); + break; + case BlockedOnBlackHole: + fprintf(stderr,"blocked on a black hole"); + break; + case NotBlocked: + fprintf(stderr,"not blocked"); + break; } } +#endif /* ----------------------------------------------------------------------------- Where are the roots that we know about? @@ -465,9 +1189,14 @@ SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val) -------------------------------------------------------------------------- */ +/* This has to be protected either by the scheduler monitor, or by the + garbage collection monitor (probably the latter). + KH @ 25/10/99 +*/ + static void GetRoots(void) { - nat i; + StgMainThread *m; run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd); run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl); @@ -475,11 +1204,11 @@ static void GetRoots(void) blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd); blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl); - ccalling_threads = (StgTSO *)MarkRoot((StgClosure *)ccalling_threads); - - for (i = 0; i < next_main_thread; i++) { - main_threads[i] = (StgTSO *)MarkRoot((StgClosure *)main_threads[i]); + for (m = main_threads; m != NULL; m = m->link) { + m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso); } + suspended_ccalling_threads = + (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads); } /* ----------------------------------------------------------------------------- @@ -491,6 +1220,8 @@ static void GetRoots(void) It might be useful to provide an interface whereby the programmer can specify more roots (ToDo). + + This needs to be protected by the GC condition variable above. KH. 
-------------------------------------------------------------------------- */ void (*extra_roots)(void); @@ -557,7 +1288,7 @@ threadStackOverflow(StgTSO *tso) new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */ new_stack_size = new_tso_size - TSO_STRUCT_SIZEW; - IF_DEBUG(scheduler, fprintf(stderr,"increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size)); + IF_DEBUG(scheduler, fprintf(stderr,"schedule: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size)); dest = (StgTSO *)allocate(new_tso_size); TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0); @@ -588,36 +1319,64 @@ threadStackOverflow(StgTSO *tso) tso->whatNext = ThreadKilled; tso->sp = (P_)&(tso->stack[tso->stack_size]); tso->su = (StgUpdateFrame *)tso->sp; - tso->blocked_on = NULL; + tso->why_blocked = NotBlocked; dest->mut_link = NULL; IF_DEBUG(sanity,checkTSO(tso)); #if 0 IF_DEBUG(scheduler,printTSO(dest)); #endif + +#if 0 + /* This will no longer work: KH */ if (tso == MainTSO) { /* hack */ MainTSO = dest; } +#endif return dest; } /* ----------------------------------------------------------------------------- - Wake up a queue that was blocked on some resource (usually a - computation in progress). + Wake up a queue that was blocked on some resource. -------------------------------------------------------------------------- */ -void awaken_blocked_queue(StgTSO *q) +static StgTSO * +unblockOneLocked(StgTSO *tso) { - StgTSO *tso; + StgTSO *next; + + ASSERT(get_itbl(tso)->type == TSO); + ASSERT(tso->why_blocked != NotBlocked); + tso->why_blocked = NotBlocked; + next = tso->link; + PUSH_ON_RUN_QUEUE(tso); + THREAD_RUNNABLE(); +#ifdef SMP + IF_DEBUG(scheduler,belch("schedule (task %ld): waking up thread %ld", + pthread_self(), tso->id)); +#else + IF_DEBUG(scheduler,belch("schedule: waking up thread %ld", tso->id)); +#endif + return next; +} + +inline StgTSO * +unblockOne(StgTSO *tso) +{ + ACQUIRE_LOCK(&sched_mutex); + tso = unblockOneLocked(tso); + RELEASE_LOCK(&sched_mutex); + return tso; +} - while (q != END_TSO_QUEUE) { - ASSERT(get_itbl(q)->type == TSO); - tso = q; - q = tso->link; - PUSH_ON_RUN_QUEUE(tso); - tso->blocked_on = NULL; - IF_DEBUG(scheduler,belch("Waking up thread %ld", tso->id)); +void +awakenBlockedQueue(StgTSO *tso) +{ + ACQUIRE_LOCK(&sched_mutex); + while (tso != END_TSO_QUEUE) { + tso = unblockOneLocked(tso); } + RELEASE_LOCK(&sched_mutex); } /* ----------------------------------------------------------------------------- @@ -644,16 +1403,17 @@ unblockThread(StgTSO *tso) { StgTSO *t, **last; - if (tso->blocked_on == NULL) { - return; /* not blocked */ - } + ACQUIRE_LOCK(&sched_mutex); + switch (tso->why_blocked) { - switch (get_itbl(tso->blocked_on)->type) { + case NotBlocked: + return; /* not blocked */ - case MVAR: + case BlockedOnMVar: + ASSERT(get_itbl(tso->block_info.closure)->type == MVAR); { StgTSO *last_tso = END_TSO_QUEUE; - StgMVar *mvar = (StgMVar *)(tso->blocked_on); + StgMVar *mvar = (StgMVar *)(tso->block_info.closure); last = &mvar->head; for (t = mvar->head; t != END_TSO_QUEUE; @@ -669,9 +1429,10 @@ unblockThread(StgTSO *tso) barf("unblockThread (MVAR): TSO not found"); } - case BLACKHOLE_BQ: + case BlockedOnBlackHole: + ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ); { - StgBlockingQueue *bq = (StgBlockingQueue *)(tso->blocked_on); + StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure); last = &bq->blocking_queue; for (t = bq->blocking_queue; t != END_TSO_QUEUE; @@ -684,14 +1445,34 @@ 
unblockThread(StgTSO *tso) barf("unblockThread (BLACKHOLE): TSO not found"); } + case BlockedOnDelay: + case BlockedOnRead: + case BlockedOnWrite: + { + last = &blocked_queue_hd; + for (t = blocked_queue_hd; t != END_TSO_QUEUE; + last = &t->link, t = t->link) { + if (t == tso) { + *last = tso->link; + if (blocked_queue_tl == t) { + blocked_queue_tl = tso->link; + } + goto done; + } + } + barf("unblockThread (I/O): TSO not found"); + } + default: barf("unblockThread"); } done: tso->link = END_TSO_QUEUE; - tso->blocked_on = NULL; + tso->why_blocked = NotBlocked; + tso->block_info.closure = NULL; PUSH_ON_RUN_QUEUE(tso); + RELEASE_LOCK(&sched_mutex); } /* ----------------------------------------------------------------------------- @@ -743,7 +1524,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) return; } - IF_DEBUG(scheduler, belch("Raising exception in thread %ld.", tso->id)); + IF_DEBUG(scheduler, belch("schedule: Raising exception in thread %ld.", tso->id)); /* Remove it from any blocking queues */ unblockThread(tso); @@ -814,7 +1595,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) TICK_ALLOC_UP_THK(words+1,0); IF_DEBUG(scheduler, - fprintf(stderr, "Updating "); + fprintf(stderr, "schedule: Updating "); printPtr((P_)su->updatee); fprintf(stderr, " with "); printObj((StgClosure *)ap); @@ -850,7 +1631,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) o->payload[1] = cf->handler; IF_DEBUG(scheduler, - fprintf(stderr, "Built "); + fprintf(stderr, "schedule: Built "); printObj((StgClosure *)o); ); @@ -876,7 +1657,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) payloadCPtr(o,0) = (StgClosure *)ap; IF_DEBUG(scheduler, - fprintf(stderr, "Built "); + fprintf(stderr, "schedule: Built "); printObj((StgClosure *)o); ); @@ -902,3 +1683,4 @@ raiseAsync(StgTSO *tso, StgClosure *exception) } barf("raiseAsync"); } +
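
The patch above introduces scheduleThread(), waitThread(), suspendThread() and resumeThread() as the public face of the new scheduler. As a rough illustration only, the sketch below shows how a client might run a computation to completion and how a "safe" C call hands its capability back for the duration of the call. The scheduleThread/waitThread/suspendThread/resumeThread signatures are taken from the diff itself; createThread(), pushClosure() and blocking_c_function() are assumed names (the patch's comments mention createThread() but never show its signature), and the stack size is an arbitrary value.

/* ---------------------------------------------------------------------------
 * Illustrative sketch only -- not part of the committed patch above.
 * ------------------------------------------------------------------------- */

#include "Rts.h"
#include "SchedAPI.h"

/* Assumed for illustration: a thread constructor taking a stack size in
 * words, a helper that pushes the closure to evaluate onto the new thread's
 * stack, and a stand-in for a blocking foreign function.
 */
extern StgTSO  *createThread ( nat stack_size );
extern void     pushClosure  ( StgTSO *tso, StgClosure *closure );
extern int      blocking_c_function ( void );

/* Run a closure in a fresh thread and wait for its result, following the
 * scheduleThread()/waitThread() comments in the patch. */
SchedulerStatus
runAndWait( StgClosure *closure, StgClosure **result )
{
    StgTSO *tso = createThread(1024);   /* stack size in words, arbitrary     */

    pushClosure(tso, closure);          /* caller must set up the stack first */
    scheduleThread(tso);                /* make it runnable; takes sched_mutex */

    /* Non-SMP: calls schedule() directly.  SMP: sleeps on the main thread's
     * condition variable until a worker task fills in the status/result.     */
    return waitThread(tso, result);
}

/* Shape of a "safe" C call (_ccall_GC): the task gives back its capability
 * for the duration of the call, so other tasks can keep running Haskell
 * threads, and reclaims one on return. */
int
safeCallSketch( Capability *cap )
{
    StgInt tok;
    int    r;

    tok = suspendThread(cap);       /* park TSO on suspended_ccalling_threads */
    r   = blocking_c_function();    /* may block without stalling the RTS     */
    cap = resumeThread(tok);        /* reacquire a capability and the TSO     */
    return r;
}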