From: simonmar Date: Wed, 6 Apr 2005 15:27:06 +0000 (+0000) Subject: [project @ 2005-04-06 15:27:06 by simonmar] X-Git-Tag: Initial_conversion_from_CVS_complete~789 X-Git-Url: http://git.megacz.com/?a=commitdiff_plain;h=9a92cb1ca49cb555ff66dcfcb9295ebf75d1ce01;p=ghc-hetmet.git [project @ 2005-04-06 15:27:06 by simonmar] Revamp the Task API: now we use the same implementation for threaded and SMP. We also keep per-task timing stats in the threaded RTS now, which makes the output of +RTS -sstderr more useful. --- diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index 8a93dc9..bdb651d 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -207,6 +207,10 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP ) #if !defined(SMP) ASSERT(rts_n_free_capabilities == 0); #endif +#if defined(SMP) + cap->link = free_capabilities; + free_capabilities = cap; +#endif // Check to see whether a worker thread can be given // the go-ahead to return the result of an external call.. if (rts_n_waiting_workers > 0) { @@ -214,12 +218,6 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP ) // thread that is yielding its capability will repeatedly // signal returning_worker_cond. -#if defined(SMP) - // SMP variant untested - cap->link = free_capabilities; - free_capabilities = cap; -#endif - rts_n_waiting_workers--; signalCondition(&returning_worker_cond); IF_DEBUG(scheduler, sched_belch("worker: released capability to returning worker")); @@ -230,13 +228,15 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP ) } else { signalCondition(passTarget); } +#if defined(SMP) + rts_n_free_capabilities++; +#else rts_n_free_capabilities = 1; +#endif IF_DEBUG(scheduler, sched_belch("worker: released capability, passing it")); } else { #if defined(SMP) - cap->link = free_capabilities; - free_capabilities = cap; rts_n_free_capabilities++; #else rts_n_free_capabilities = 1; @@ -433,7 +433,6 @@ passCapabilityToWorker( void ) ToDo: should check whether the thread at the front of the queue is bound, and if so wake up the appropriate worker. -------------------------------------------------------------------------- */ - void threadRunnable ( void ) { @@ -444,3 +443,17 @@ threadRunnable ( void ) startSchedulerTaskIfNecessary(); #endif } + + +/* ---------------------------------------------------------------------------- + prodWorker() + + Wake up... time to die. + -------------------------------------------------------------------------- */ +void +prodWorker ( void ) +{ +#if defined(RTS_SUPPORTS_THREADS) + signalCondition(&thread_ready_cond); +#endif +} diff --git a/ghc/rts/Capability.h b/ghc/rts/Capability.h index c575335..963aa85 100644 --- a/ghc/rts/Capability.h +++ b/ghc/rts/Capability.h @@ -35,6 +35,8 @@ extern void releaseCapability( Capability* cap ); // extern void threadRunnable ( void ); +extern void prodWorker ( void ); + #ifdef RTS_SUPPORTS_THREADS // Gives up the current capability IFF there is a higher-priority // thread waiting for it. This happens in one of two ways: diff --git a/ghc/rts/Main.c b/ghc/rts/Main.c index 7721438..c6d4170 100644 --- a/ghc/rts/Main.c +++ b/ghc/rts/Main.c @@ -16,6 +16,7 @@ #include "RtsFlags.h" #include "RtsUtils.h" #include "Prelude.h" +#include "Task.h" #include #ifdef DEBUG @@ -50,6 +51,11 @@ int main(int argc, char *argv[]) startupHaskell(argc,argv,__stginit_ZCMain); + /* Register this thread as a task, so we can get timing stats about it */ +#if defined(RTS_SUPPORTS_THREADS) + threadIsTask(osThreadId()); +#endif + /* kick off the computation by creating the main thread with a pointer to mainIO_closure representing the computation of the overall program; then enter the scheduler with this thread and off we go; diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 6e363a6..6cac6c1 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -273,7 +273,6 @@ static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // scheduler clearer. // static void schedulePreLoop(void); -static void scheduleHandleInterrupt(void); static void scheduleStartSignalHandlers(void); static void scheduleCheckBlockedThreads(void); static void scheduleCheckBlackHoles(void); @@ -333,26 +332,25 @@ taskStart(void) ACQUIRE_LOCK(&sched_mutex); startingWorkerThread = rtsFalse; schedule(NULL,NULL); + taskStop(); RELEASE_LOCK(&sched_mutex); } void startSchedulerTaskIfNecessary(void) { - if(run_queue_hd != END_TSO_QUEUE - || blocked_queue_hd != END_TSO_QUEUE - || sleeping_queue != END_TSO_QUEUE) - { - if(!startingWorkerThread) - { // we don't want to start another worker thread - // just because the last one hasn't yet reached the - // "waiting for capability" state - startingWorkerThread = rtsTrue; - if (!startTask(taskStart)) { - startingWorkerThread = rtsFalse; - } + if ( !EMPTY_RUN_QUEUE() + && !shutting_down_scheduler // not if we're shutting down + && !startingWorkerThread ) + { + // we don't want to start another worker thread + // just because the last one hasn't yet reached the + // "waiting for capability" state + startingWorkerThread = rtsTrue; + if (!maybeStartNewWorker(taskStart)) { + startingWorkerThread = rtsFalse; + } } - } } #endif @@ -508,7 +506,26 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, stg_exit(1); } - scheduleHandleInterrupt(); + // + // Test for interruption. If interrupted==rtsTrue, then either + // we received a keyboard interrupt (^C), or the scheduler is + // trying to shut down all the tasks (shutting_down_scheduler) in + // the threaded RTS. + // + if (interrupted) { + if (shutting_down_scheduler) { + IF_DEBUG(scheduler, sched_belch("shutting down")); + releaseCapability(cap); + if (mainThread) { + mainThread->stat = Interrupted; + mainThread->ret = NULL; + } + return; + } else { + IF_DEBUG(scheduler, sched_belch("interrupted")); + deleteAllThreads(); + } + } #if defined(not_yet) && defined(SMP) // @@ -792,33 +809,6 @@ schedulePreLoop(void) } /* ---------------------------------------------------------------------------- - * Deal with the interrupt flag - * ASSUMES: sched_mutex - * ------------------------------------------------------------------------- */ - -static -void scheduleHandleInterrupt(void) -{ - // - // Test for interruption. If interrupted==rtsTrue, then either - // we received a keyboard interrupt (^C), or the scheduler is - // trying to shut down all the tasks (shutting_down_scheduler) in - // the threaded RTS. - // - if (interrupted) { - if (shutting_down_scheduler) { - IF_DEBUG(scheduler, sched_belch("shutting down")); -#if defined(RTS_SUPPORTS_THREADS) - shutdownThread(); -#endif - } else { - IF_DEBUG(scheduler, sched_belch("interrupted")); - deleteAllThreads(); - } - } -} - -/* ---------------------------------------------------------------------------- * Start any pending signal handlers * ASSUMES: sched_mutex * ------------------------------------------------------------------------- */ @@ -1476,7 +1466,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE; IF_DEBUG(scheduler, - debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", + debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", (long)t->id, whatNext_strs[t->what_next], blocks)); // don't do this if it would push us over the @@ -2620,8 +2610,12 @@ initScheduler(void) initCapabilities(); #if defined(RTS_SUPPORTS_THREADS) - /* start our haskell execution tasks */ - startTaskManager(0,taskStart); + initTaskManager(); +#endif + +#if defined(SMP) + /* eagerly start some extra workers */ + startTasks(RtsFlags.ParFlags.nNodes, taskStart); #endif #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL) @@ -2634,11 +2628,12 @@ initScheduler(void) void exitScheduler( void ) { + interrupted = rtsTrue; + shutting_down_scheduler = rtsTrue; #if defined(RTS_SUPPORTS_THREADS) - stopTaskManager(); + if (threadIsTask(osThreadId())) { taskStop(); } + stopTaskManager(); #endif - interrupted = rtsTrue; - shutting_down_scheduler = rtsTrue; } /* ---------------------------------------------------------------------------- diff --git a/ghc/rts/Stats.c b/ghc/rts/Stats.c index 7c14590..efa3e31 100644 --- a/ghc/rts/Stats.c +++ b/ghc/rts/Stats.c @@ -363,13 +363,13 @@ stat_startExit(void) PROF_VAL(RPe_tot_time + HCe_tot_time) - InitElapsedStamp; if (MutElapsedTime < 0) { MutElapsedTime = 0; } /* sometimes -0.00 */ - /* for SMP, we don't know the mutator time yet, we have to inspect + /* for threads, we don't know the mutator time yet, we have to inspect * all the running threads to find out, and they haven't stopped * yet. So we just timestamp MutUserTime at this point so we can * calculate the EXIT time. The real MutUserTime is calculated * in stat_exit below. */ -#ifdef SMP +#if defined(RTS_SUPPORTS_THREADS) MutUserTime = CurrentUserTime; #else MutUserTime = CurrentUserTime - GC_tot_time - PROF_VAL(RP_tot_time + HC_tot_time) - InitUserTime; @@ -381,7 +381,7 @@ void stat_endExit(void) { getTimes(); -#ifdef SMP +#if defined(RTS_SUPPORTS_THREADS) ExitUserTime = CurrentUserTime - MutUserTime; #else ExitUserTime = CurrentUserTime - MutUserTime - GC_tot_time - PROF_VAL(RP_tot_time + HC_tot_time) - InitUserTime; @@ -478,17 +478,13 @@ stat_endGC(lnat alloc, lnat collect, lnat live, lnat copied, lnat gen) GC_tot_time += gc_time; GCe_tot_time += gc_etime; -#if defined(SMP) +#if defined(RTS_SUPPORTS_THREADS) { - nat i; - pthread_t me = pthread_self(); - - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - if (me == taskTable[i].id) { - taskTable[i].gc_time += gc_time; - taskTable[i].gc_etime += gc_etime; - break; - } + TaskInfo *task_info = taskOfId(osThreadId()); + + if (task_info != NULL) { + task_info->gc_time += gc_time; + task_info->gc_etime += gc_etime; } } #endif @@ -582,35 +578,16 @@ stat_endHeapCensus(void) stats for this thread into the taskTable struct for that thread. -------------------------------------------------------------------------- */ -#if defined(SMP) void -stat_workerStop(void) -{ - nat i; - pthread_t me = pthread_self(); - - getTimes(); - - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - if (taskTable[i].id == me) { - taskTable[i].mut_time = CurrentUserTime - taskTable[i].gc_time; - taskTable[i].mut_etime = CurrentElapsedTime - - GCe_tot_time - - taskTable[i].elapsedtimestart; - if (taskTable[i].mut_time < 0.0) { taskTable[i].mut_time = 0.0; } - if (taskTable[i].mut_etime < 0.0) { taskTable[i].mut_etime = 0.0; } - } - } -} -#endif - -#if defined(SMP) -long int stat_getElapsedTime () +stat_getTimes ( long *currentElapsedTime, + long *currentUserTime, + long *elapsedGCTime ) { getTimes(); - return CurrentElapsedTime; + *currentElapsedTime = CurrentElapsedTime; + *currentUserTime = CurrentUserTime; + *elapsedGCTime = GCe_tot_time; } -#endif /* ----------------------------------------------------------------------------- Called at the end of execution @@ -631,15 +608,10 @@ stat_exit(int alloc) nat g, total_collections = 0; getTimes(); - time = CurrentUserTime; etime = CurrentElapsedTime - ElapsedTimeStart; GC_tot_alloc += alloc; - /* avoid divide by zero if time is measured as 0.00 seconds -- SDM */ - if (time == 0.0) time = 1; - if (etime == 0.0) etime = 1; - /* Count total garbage collections */ for (g = 0; g < RtsFlags.GcFlags.generations; g++) total_collections += generations[g].collections; @@ -647,17 +619,24 @@ stat_exit(int alloc) /* For SMP, we have to get the user time from each thread * and try to work out the total time. */ -#ifdef SMP - { nat i; +#if defined(RTS_SUPPORTS_THREADS) + { + nat i; MutUserTime = 0.0; - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { + for (i = 0; i < taskCount; i++) { MutUserTime += taskTable[i].mut_time; } } time = MutUserTime + GC_tot_time + InitUserTime + ExitUserTime; if (MutUserTime < 0) { MutUserTime = 0; } +#else + time = CurrentUserTime; #endif + /* avoid divide by zero if time is measured as 0.00 seconds -- SDM */ + if (time == 0.0) time = 1; + if (etime == 0.0) etime = 1; + if (RtsFlags.GcFlags.giveStats >= VERBOSE_GC_STATS) { statsPrintf("%9ld %9.9s %9.9s", (lnat)alloc*sizeof(W_), "", ""); statsPrintf(" %5.2f %5.2f\n\n", 0.0, 0.0); @@ -690,17 +669,18 @@ stat_exit(int alloc) statsPrintf("\n%11ld Mb total memory in use\n\n", mblocks_allocated * MBLOCK_SIZE / (1024 * 1024)); -#ifdef SMP +#if defined(RTS_SUPPORTS_THREADS) { nat i; - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - statsPrintf(" Task %2d: MUT time: %6.2fs (%6.2fs elapsed)\n" - " GC time: %6.2fs (%6.2fs elapsed)\n\n", - i, - TICK_TO_DBL(taskTable[i].mut_time), - TICK_TO_DBL(taskTable[i].mut_etime), - TICK_TO_DBL(taskTable[i].gc_time), - TICK_TO_DBL(taskTable[i].gc_etime)); + for (i = 0; i < taskCount; i++) { + statsPrintf(" Task %2d %-8s : MUT time: %6.2fs (%6.2fs elapsed)\n" + " GC time: %6.2fs (%6.2fs elapsed)\n\n", + i, + taskTable[i].is_worker ? "(worker)" : "(bound)", + TICK_TO_DBL(taskTable[i].mut_time), + TICK_TO_DBL(taskTable[i].mut_etime), + TICK_TO_DBL(taskTable[i].gc_time), + TICK_TO_DBL(taskTable[i].gc_etime)); } } #endif diff --git a/ghc/rts/Stats.h b/ghc/rts/Stats.h index 962d38c..0ed1307 100644 --- a/ghc/rts/Stats.h +++ b/ghc/rts/Stats.h @@ -46,6 +46,6 @@ extern double mut_user_time_during_heap_census(void); extern void statDescribeGens( void ); extern HsInt64 getAllocations( void ); -#if defined(SMP) -extern long int stat_getElapsedTime ( void ); -#endif +extern void stat_getTimes ( long *currentElapsedTime, + long *currentUserTime, + long *elapsedGCTime ); diff --git a/ghc/rts/Task.c b/ghc/rts/Task.c index 42dc9c9..76ea891 100644 --- a/ghc/rts/Task.c +++ b/ghc/rts/Task.c @@ -1,25 +1,13 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team 2001- + * (c) The GHC Team 2001-2005 * * The task manager subsystem. Tasks execute STG code, with this * module providing the API which the Scheduler uses to control their * creation and destruction. - * - * Two kinds of RTS builds uses 'tasks' - the SMP and the - * 'native thread-friendly' builds. * - * The SMP build lets multiple tasks concurrently execute STG code, - * all sharing vital internal RTS data structures in a controlled manner - * (see details elsewhere...ToDo: fill in ref!) - * - * The 'threads' build has at any one time only one task executing STG - * code, other tasks are either busy executing code outside the RTS - * (e.g., a C call) or waiting for their turn to (again) evaluate some - * STG code. A task relinquishes its RTS token when it is asked to - * evaluate an external (C) call. - * * -------------------------------------------------------------------------*/ + #include "Rts.h" #if defined(RTS_SUPPORTS_THREADS) /* to the end */ #include "RtsUtils.h" @@ -28,188 +16,234 @@ #include "Stats.h" #include "RtsFlags.h" #include "Schedule.h" +#include "Hash.h" +#include "Capability.h" #if HAVE_SIGNAL_H #include #endif -/* There's not all that much code that is shared between the - * SMP and threads version of the 'task manager.' A sign - * that the code ought to be structured differently..(Maybe ToDo). - */ +#define INIT_TASK_TABLE_SIZE 16 -/* - * The following Task Manager-local variables are assumed to be - * accessed with the RTS lock in hand. - */ -#if defined(SMP) TaskInfo* taskTable; -#endif -/* upper bound / the number of tasks created. */ -static nat maxTasks; -/* number of tasks currently created */ -static nat taskCount; -static nat awaitDeath; +static nat taskTableSize; + +HashTable *taskHash; // maps OSThreadID to TaskInfo* + +nat taskCount; +static nat tasksRunning; +static nat workerCount; + +#define DEFAULT_MAX_WORKERS 64 +nat maxWorkers; // we won't create more workers than this -#if defined(SMP) void -startTaskManager( nat maxCount, void (*taskStart)(void) ) +initTaskManager (void) { - nat i; - static int initialized = 0; - - if (!initialized) { - - taskCount = 0; - maxTasks = maxCount; - /* allocate table holding task metadata */ + static int initialized = 0; + + if (!initialized) { + taskTableSize = INIT_TASK_TABLE_SIZE; + taskTable = stgMallocBytes( taskTableSize * sizeof(TaskInfo), + "initTaskManager"); + + taskCount = 0; + workerCount = 0; + tasksRunning = 0; + + taskHash = allocHashTable(); - if (maxCount > 0) { - taskTable = stgMallocBytes(maxCount * sizeof(TaskInfo), - "startTaskManager:tasks"); - - /* and eagerly create them all. */ - for (i = 0; i < maxCount; i++) { - startTask(taskStart); - taskCount++; - } + maxWorkers = DEFAULT_MAX_WORKERS; + + initialized = 1; } - initialized = 1; - } } -rtsBool -startTask ( void (*taskStart)(void) ) +static void +expandTaskTable (void) { - int r; - OSThreadId tid; - - r = createOSThread(&tid,taskStart); - if (r != 0) { - barf("startTask: Can't create new task"); - } - - taskTable[taskCount].id = tid; - taskTable[taskCount].mut_time = 0.0; - taskTable[taskCount].mut_etime = 0.0; - taskTable[taskCount].gc_time = 0.0; - taskTable[taskCount].gc_etime = 0.0; - taskTable[taskCount].elapsedtimestart = stat_getElapsedTime(); - - IF_DEBUG(scheduler,debugBelch("scheduler: Started task: %ld\n",tid);); - return rtsTrue; + taskTableSize *= 2; + taskTable = stgReallocBytes(taskTable, taskTableSize * sizeof(TaskInfo), + "expandTaskTable"); } void stopTaskManager (void) { - nat i; - OSThreadId tid = osThreadId(); - - /* Don't want to use pthread_cancel, since we'd have to install - * these silly exception handlers (pthread_cleanup_{push,pop}) around - * all our locks. - */ -#if 0 - /* Cancel all our tasks */ - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - pthread_cancel(taskTable[i].id); - } - - /* Wait for all the tasks to terminate */ - for (i = 0; i < maxCount; i++) { - IF_DEBUG(scheduler,debugBelch("scheduler: waiting for task %ld\n", - taskTable[i].id)); - pthread_join(taskTable[i].id, NULL); - } -#endif - - /* Send 'em all a SIGHUP. That should shut 'em up. */ - awaitDeath = taskCount==0 ? 0 : taskCount-1; - for (i = 0; i < taskCount; i++) { - /* don't cancel the thread running this piece of code. */ - if ( taskTable[i].id != tid ) { - pthread_kill(taskTable[i].id,SIGTERM); + nat i; + + IF_DEBUG(scheduler, sched_belch("stopping task manager, %d tasks still running", tasksRunning)); + for (i = 1000; i > 0; i--) { + if (tasksRunning == 0) { + IF_DEBUG(scheduler, sched_belch("all tasks stopped")); + return; + } + prodWorker(); + yieldThread(); } - } - while (awaitDeath > 0) { - sched_yield(); - } - - return; + IF_DEBUG(scheduler, sched_belch("%d tasks still running, exiting anyway", tasksRunning)); + + /* + OLD CODE follows: + */ +#if old_code + /* Send 'em all a SIGHUP. That should shut 'em up. */ + awaitDeath = taskCount==0 ? 0 : taskCount-1; + for (i = 0; i < taskCount; i++) { + /* don't cancel the thread running this piece of code. */ + if ( taskTable[i].id != tid ) { + pthread_kill(taskTable[i].id,SIGTERM); + } + } + while (awaitDeath > 0) { + sched_yield(); + } +#endif // old_code } -void -resetTaskManagerAfterFork (void) + +rtsBool +startTasks (nat num, void (*taskStart)(void)) { - barf("resetTaskManagerAfterFork not implemented for SMP"); + nat i; + for (i = 0; i < num; i++) { + if (!startTask(taskStart)) { + return rtsFalse; + } + } + return rtsTrue; } -#else -/************ THREADS version *****************/ - -void -startTaskManager( nat maxCount, - void (*taskStart)(void) STG_UNUSED ) +static TaskInfo* +newTask (OSThreadId id, rtsBool is_worker) { - /* In the threaded case, maxCount is used to limit the - the creation of worker tasks. Tasks are created lazily, i.e., - when the current task gives up the token on executing - STG code. - */ - maxTasks = maxCount; - taskCount = 0; + long currentElapsedTime, currentUserTime, elapsedGCTime; + TaskInfo *task_info; + + if (taskCount >= taskTableSize) { + expandTaskTable(); + } + + insertHashTable( taskHash, id, &(taskTable[taskCount]) ); + + stat_getTimes(¤tElapsedTime, ¤tUserTime, &elapsedGCTime); + + task_info = &taskTable[taskCount]; + + task_info->id = id; + task_info->is_worker = is_worker; + task_info->stopped = rtsFalse; + task_info->mut_time = 0.0; + task_info->mut_etime = 0.0; + task_info->gc_time = 0.0; + task_info->gc_etime = 0.0; + task_info->muttimestart = currentUserTime; + task_info->elapsedtimestart = currentElapsedTime; + + taskCount++; + workerCount++; + tasksRunning++; + + IF_DEBUG(scheduler,sched_belch("startTask: new task %ld (total_count: %d; waiting: %d)\n", id, taskCount, rts_n_waiting_tasks);); + + return task_info; } rtsBool -startTask ( void (*taskStart)(void) ) +startTask (void (*taskStart)(void)) { int r; OSThreadId tid; - - /* If more than one worker thread is known to be blocked waiting - on thread_ready_cond, don't create a new one. - */ - if ( rts_n_waiting_tasks > 0) { - IF_DEBUG(scheduler,debugBelch( - "scheduler: startTask: %d tasks waiting, not creating new one.\n", - rts_n_waiting_tasks);); - // the task will run as soon as a capability is available, - // so there's no need to wake it. - return rtsFalse; - } - /* If the task limit has been reached, just return. */ - if (maxTasks > 0 && taskCount == maxTasks) { - IF_DEBUG(scheduler,debugBelch("scheduler: startTask: task limit (%d) reached, not creating new one.\n",maxTasks)); - return rtsFalse; - } - r = createOSThread(&tid,taskStart); if (r != 0) { barf("startTask: Can't create new task"); } - taskCount++; - - IF_DEBUG(scheduler,debugBelch("scheduler: startTask: new task %ld (total_count: %d; waiting: %d)\n", tid, taskCount, rts_n_waiting_tasks);); + newTask (tid, rtsTrue); return rtsTrue; } +TaskInfo * +threadIsTask (OSThreadId id) +{ + TaskInfo *task_info; + + task_info = lookupHashTable(taskHash, id); + if (task_info != NULL) { + if (task_info->stopped) { + task_info->stopped = rtsFalse; + } + return task_info; + } + + return newTask(id, rtsFalse); +} +TaskInfo * +taskOfId (OSThreadId id) +{ + return lookupHashTable(taskHash, id); +} void -stopTaskManager () +taskStop (void) { + OSThreadId id; + long currentElapsedTime, currentUserTime, elapsedGCTime; + TaskInfo *task_info; + + id = osThreadId(); + task_info = taskOfId(id); + if (task_info == NULL) { + debugBelch("taskStop: not a task"); + return; + } + ASSERT(task_info->id == id); + + task_info->stopped = rtsTrue; + tasksRunning--; + stat_getTimes(¤tElapsedTime, ¤tUserTime, &elapsedGCTime); + + task_info->mut_time = + currentUserTime - task_info->muttimestart - task_info->gc_time; + task_info->mut_etime = + currentElapsedTime - task_info->elapsedtimestart - elapsedGCTime; + + if (task_info->mut_time < 0.0) { task_info->mut_time = 0.0; } + if (task_info->mut_etime < 0.0) { task_info->mut_etime = 0.0; } } void -resetTaskManagerAfterFork ( void ) +resetTaskManagerAfterFork (void) { - rts_n_waiting_tasks = 0; - taskCount = 0; + rts_n_waiting_tasks = 0; + taskCount = 0; } -#endif +rtsBool +maybeStartNewWorker (void (*taskStart)(void)) +{ + /* + * If more than one worker thread is known to be blocked waiting + * on thread_ready_cond, don't create a new one. + */ + if ( rts_n_waiting_tasks > 0) { + IF_DEBUG(scheduler,sched_belch( + "startTask: %d tasks waiting, not creating new one", + rts_n_waiting_tasks);); + // the task will run as soon as a capability is available, + // so there's no need to wake it. + return rtsFalse; + } + + /* If the task limit has been reached, just return. */ + if (maxWorkers > 0 && workerCount >= maxWorkers) { + IF_DEBUG(scheduler,sched_belch("startTask: worker limit (%d) reached, not creating new one",maxWorkers)); + return rtsFalse; + } + + return startTask(taskStart); +} #endif /* RTS_SUPPORTS_THREADS */ diff --git a/ghc/rts/Task.h b/ghc/rts/Task.h index 7dd29ad..b1add2f 100644 --- a/ghc/rts/Task.h +++ b/ghc/rts/Task.h @@ -1,36 +1,107 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team 2001-2003 + * (c) The GHC Team 2001-2005 * - * Types + prototypes for functions in Task.c - * (RTS subsystem for handling tasks, agents thay may execute STG code). + * Tasks * * -------------------------------------------------------------------------*/ + #ifndef __TASK_H__ #define __TASK_H__ -#if defined(RTS_SUPPORTS_THREADS) /* to the end */ +/* Definition of a Task: + * + * A task is an OSThread that runs Haskell code. Every OSThread + * created by the RTS for the purposes of running Haskell code is a + * Task. We maintain information about Tasks mainly for the purposes + * of stats gathering. + * + * There may exist OSThreads that run Haskell code, but which aren't + * tasks (they don't have an associated TaskInfo structure). This + * happens when a thread makes an in-call to Haskell: we don't want to + * create a Task for every in-call and register stats for all these + * threads, so it is not therefore mandatory to have a Task for every + * thread running Haskell code. + * + * The SMP build lets multiple tasks concurrently execute STG code, + * all sharing vital internal RTS data structures in a controlled manner. + * + * The 'threaded' build has at any one time only one task executing STG + * code, other tasks are either busy executing code outside the RTS + * (e.g., a C call) or waiting for their turn to (again) evaluate some + * STG code. A task relinquishes its RTS token when it is asked to + * evaluate an external (C) call. + */ + +#if defined(RTS_SUPPORTS_THREADS) /* to the end */ /* - * Tasks evaluate STG code; the TaskInfo structure collects together + * Tasks evaluate Haskell code; the TaskInfo structure collects together * misc metadata about a task. - * */ typedef struct _TaskInfo { OSThreadId id; - double elapsedtimestart; - double mut_time; - double mut_etime; - double gc_time; - double gc_etime; + rtsBool is_worker; /* rtsFalse <=> is a bound thread */ + rtsBool stopped; /* this task has stopped or exited Haskell */ + long elapsedtimestart; + long muttimestart; + long mut_time; + long mut_etime; + long gc_time; + long gc_etime; } TaskInfo; extern TaskInfo *taskTable; +extern nat taskCount; -extern void startTaskManager ( nat maxTasks, void (*taskStart)(void) ); -extern void stopTaskManager ( void ); -extern void resetTaskManagerAfterFork ( void ); +/* + * Start and stop the task manager. + * Requires: sched_mutex. + */ +extern void initTaskManager (void); +extern void stopTaskManager (void); -extern rtsBool startTask ( void (*taskStart)(void) ); +/* + * Two ways to start tasks: either singly or in a batch + * Requires: sched_mutex. + */ +extern rtsBool startTasks (nat num, void (*taskStart)(void)); +extern rtsBool startTask (void (*taskStart)(void)); + +/* + * Notify the task manager that a task has stopped. This is used + * mainly for stats-gathering purposes. + * Requires: sched_mutex. + */ +extern void taskStop (void); + +/* + * After a fork, the tasks are not carried into the child process, so + * we must tell the task manager. + * Requires: sched_mutex. + */ +extern void resetTaskManagerAfterFork (void); + +/* + * Tell the task manager that the current OS thread is now a task, + * because it has entered Haskell as a bound thread. + * Requires: sched_mutex. + */ +extern TaskInfo* threadIsTask (OSThreadId id); + +/* + * Get the TaskInfo structure corresponding to an OSThread. Returns + * NULL if the thread is not a task. + * Requires: sched_mutex. + */ +extern TaskInfo* taskOfId (OSThreadId id); + +/* + * Decides whether to call startTask() or not, based on how many + * workers are already running and waiting for work. Returns + * rtsTrue if a worker was created. + * Requires: sched_mutex. + */ +extern rtsBool maybeStartNewWorker (void (*taskStart)(void)); #endif /* RTS_SUPPORTS_THREADS */ #endif /* __TASK_H__ */