X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FCapability.c;h=f5e77a900faa62c1b4a83684fde35564f84ee814;hp=cf85372ce08ab696a17c43950fcb003ba7bc63ca;hb=5d52d9b64c21dcf77849866584744722f8121389;hpb=f8a01fb39f3756bec409c9b752fd4f8088e130bf diff --git a/rts/Capability.c b/rts/Capability.c index cf85372..f5e77a9 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -62,9 +62,8 @@ Capability * rts_unsafeGetMyCapability (void) STATIC_INLINE rtsBool globalWorkToDo (void) { - return blackholes_need_checking - || sched_state >= SCHED_INTERRUPTING - ; + return sched_state >= SCHED_INTERRUPTING + || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock } #endif @@ -173,10 +172,10 @@ STATIC_INLINE void newReturningTask (Capability *cap, Task *task) { ASSERT_LOCK_HELD(&cap->lock); - ASSERT(task->return_link == NULL); + ASSERT(task->next == NULL); if (cap->returning_tasks_hd) { - ASSERT(cap->returning_tasks_tl->return_link == NULL); - cap->returning_tasks_tl->return_link = task; + ASSERT(cap->returning_tasks_tl->next == NULL); + cap->returning_tasks_tl->next = task; } else { cap->returning_tasks_hd = task; } @@ -190,11 +189,11 @@ popReturningTask (Capability *cap) Task *task; task = cap->returning_tasks_hd; ASSERT(task); - cap->returning_tasks_hd = task->return_link; + cap->returning_tasks_hd = task->next; if (!cap->returning_tasks_hd) { cap->returning_tasks_tl = NULL; } - task->return_link = NULL; + task->next = NULL; return task; } #endif @@ -220,11 +219,10 @@ initCapability( Capability *cap, nat i ) initMutex(&cap->lock); cap->running_task = NULL; // indicates cap is free cap->spare_workers = NULL; - cap->suspended_ccalling_tasks = NULL; + cap->suspended_ccalls = NULL; cap->returning_tasks_hd = NULL; cap->returning_tasks_tl = NULL; - cap->wakeup_queue_hd = END_TSO_QUEUE; - cap->wakeup_queue_tl = END_TSO_QUEUE; + cap->inbox = (Message*)END_TSO_QUEUE; cap->sparks_created = 0; cap->sparks_converted = 0; cap->sparks_pruned = 0; @@ -342,7 +340,7 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task) ASSERT_LOCK_HELD(&cap->lock); ASSERT(task->cap == cap); debugTrace(DEBUG_sched, "passing capability %d to %s %p", - cap->no, task->tso ? "bound task" : "worker", + cap->no, task->incall->tso ? "bound task" : "worker", (void *)task->id); ACQUIRE_LOCK(&task->lock); task->wakeup = rtsTrue; @@ -398,7 +396,7 @@ releaseCapability_ (Capability* cap, // assertion is false: in schedule() we force a yield after // ThreadBlocked, but the thread may be back on the run queue // by now. - task = cap->run_queue_hd->bound; + task = cap->run_queue_hd->bound->task; giveCapabilityToTask(cap,task); return; } @@ -411,7 +409,7 @@ releaseCapability_ (Capability* cap, if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) { debugTrace(DEBUG_sched, "starting new worker on capability %d", cap->no); - startWorkerTask(cap, workerStart); + startWorkerTask(cap); return; } } @@ -419,7 +417,7 @@ releaseCapability_ (Capability* cap, // If we have an unbound thread on the run queue, or if there's // anything else to do, give the Capability to a worker thread. if (always_wakeup || - !emptyRunQueue(cap) || !emptyWakeupQueue(cap) || + !emptyRunQueue(cap) || !emptyInbox(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) { if (cap->spare_workers) { giveCapabilityToTask(cap,cap->spare_workers); @@ -462,9 +460,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS) // in which case it is not replaced on the spare_worker queue. // This happens when the system is shutting down (see // Schedule.c:workerStart()). - // Also, be careful to check that this task hasn't just exited - // Haskell to do a foreign call (task->suspended_tso). - if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) { + if (!isBoundTask(task) && !task->stopped) { task->next = cap->spare_workers; cap->spare_workers = task; } @@ -612,7 +608,7 @@ yieldCapability (Capability** pCap, Task *task) continue; } - if (task->tso == NULL) { + if (task->incall->tso == NULL) { ASSERT(cap->spare_workers != NULL); // if we're not at the front of the queue, release it // again. This is unlikely to happen. @@ -640,50 +636,6 @@ yieldCapability (Capability** pCap, Task *task) } /* ---------------------------------------------------------------------------- - * Wake up a thread on a Capability. - * - * This is used when the current Task is running on a Capability and - * wishes to wake up a thread on a different Capability. - * ------------------------------------------------------------------------- */ - -void -wakeupThreadOnCapability (Capability *my_cap, - Capability *other_cap, - StgTSO *tso) -{ - ACQUIRE_LOCK(&other_cap->lock); - - // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) - if (tso->bound) { - ASSERT(tso->bound->cap == tso->cap); - tso->bound->cap = other_cap; - } - tso->cap = other_cap; - - ASSERT(tso->bound ? tso->bound->cap == other_cap : 1); - - if (other_cap->running_task == NULL) { - // nobody is running this Capability, we can add our thread - // directly onto the run queue and start up a Task to run it. - - other_cap->running_task = myTask(); - // precond for releaseCapability_() and appendToRunQueue() - - appendToRunQueue(other_cap,tso); - - releaseCapability_(other_cap,rtsFalse); - } else { - appendToWakeupQueue(my_cap,other_cap,tso); - other_cap->context_switch = 1; - // someone is running on this Capability, so it cannot be - // freed without first checking the wakeup queue (see - // releaseCapability_). - } - - RELEASE_LOCK(&other_cap->lock); -} - -/* ---------------------------------------------------------------------------- * prodCapability * * If a Capability is currently idle, wake up a Task on it. Used to @@ -781,7 +733,7 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe) // that will try to return to code that has been unloaded. // We can be a bit more relaxed when this is a standalone // program that is about to terminate, and let safe=false. - if (cap->suspended_ccalling_tasks && safe) { + if (cap->suspended_ccalls && safe) { debugTrace(DEBUG_sched, "thread(s) are involved in foreign calls, yielding"); cap->running_task = NULL; @@ -871,7 +823,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta, { nat i; Capability *cap; - Task *task; + InCall *incall; // Each GC thread is responsible for following roots from the // Capability of the same number. There will usually be the same @@ -883,12 +835,11 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta, evac(user, (StgClosure **)(void *)&cap->run_queue_hd); evac(user, (StgClosure **)(void *)&cap->run_queue_tl); #if defined(THREADED_RTS) - evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd); - evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl); + evac(user, (StgClosure **)(void *)&cap->inbox); #endif - for (task = cap->suspended_ccalling_tasks; task != NULL; - task=task->next) { - evac(user, (StgClosure **)(void *)&task->suspended_tso); + for (incall = cap->suspended_ccalls; incall != NULL; + incall=incall->next) { + evac(user, (StgClosure **)(void *)&incall->suspended_tso); } #if defined(THREADED_RTS) @@ -912,3 +863,8 @@ markCapabilities (evac_fn evac, void *user) { markSomeCapabilities(evac, user, 0, 1, rtsFalse); } + +/* ----------------------------------------------------------------------------- + Messages + -------------------------------------------------------------------------- */ +