X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FSchedule.c;h=05315a59d90db636e2a1c92fc38f584c24e90a31;hb=c51229b2bfd3b1a61d3966db894210ef848f0a6d;hp=70ddf090c17112589d8a1718c3e10146ff35625d;hpb=653e325e08c5f632aa194f9239e938faca5abba5;p=ghc-hetmet.git diff --git a/rts/Schedule.c b/rts/Schedule.c index 70ddf09..05315a5 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -136,7 +136,7 @@ static Capability *schedule (Capability *initialCapability, Task *task); static void schedulePreLoop (void); static void scheduleFindWork (Capability *cap); #if defined(THREADED_RTS) -static void scheduleYield (Capability **pcap, Task *task); +static void scheduleYield (Capability **pcap, Task *task, rtsBool); #endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); @@ -174,12 +174,12 @@ static void deleteThread_(Capability *cap, StgTSO *tso); #ifdef DEBUG static char *whatNext_strs[] = { - "(unknown)", - "ThreadRunGHC", - "ThreadInterpret", - "ThreadKilled", - "ThreadRelocated", - "ThreadComplete" + [0] = "(unknown)", + [ThreadRunGHC] = "ThreadRunGHC", + [ThreadInterpret] = "ThreadInterpret", + [ThreadKilled] = "ThreadKilled", + [ThreadRelocated] = "ThreadRelocated", + [ThreadComplete] = "ThreadComplete" }; #endif @@ -240,6 +240,7 @@ schedule (Capability *initialCapability, Task *task) rtsBool ready_to_gc; #if defined(THREADED_RTS) rtsBool first = rtsTrue; + rtsBool force_yield = rtsFalse; #endif cap = initialCapability; @@ -252,14 +253,6 @@ schedule (Capability *initialCapability, Task *task) "### NEW SCHEDULER LOOP (task: %p, cap: %p)", task, initialCapability); - if (running_finalizers) { - errorBelch("error: a C finalizer called back into Haskell.\n" - " This was previously allowed, but is disallowed in GHC 6.10.2 and later.\n" - " To create finalizers that may call back into Haskll, use\n" - " Foreign.Concurrent.newForeignPtr instead of Foreign.newForeignPtr."); - stg_exit(EXIT_FAILURE); - } - schedulePreLoop(); // ----------------------------------------------------------- @@ -374,7 +367,9 @@ schedule (Capability *initialCapability, Task *task) } yield: - scheduleYield(&cap,task); + scheduleYield(&cap,task,force_yield); + force_yield = rtsFalse; + if (emptyRunQueue(cap)) continue; // look for work again #endif @@ -553,6 +548,7 @@ run_thread: debugTrace(DEBUG_sched, "--<< thread %lu (%s) stopped: blocked", (unsigned long)t->id, whatNext_strs[t->what_next]); + force_yield = rtsTrue; goto yield; } #endif @@ -675,12 +671,23 @@ shouldYieldCapability (Capability *cap, Task *task) // and also check the benchmarks in nofib/parallel for regressions. static void -scheduleYield (Capability **pcap, Task *task) +scheduleYield (Capability **pcap, Task *task, rtsBool force_yield) { Capability *cap = *pcap; // if we have work, and we don't need to give up the Capability, continue. - if (!shouldYieldCapability(cap,task) && + // + // The force_yield flag is used when a bound thread blocks. This + // is a particularly tricky situation: the current Task does not + // own the TSO any more, since it is on some queue somewhere, and + // might be woken up or manipulated by another thread at any time. + // The TSO and Task might be migrated to another Capability. + // Certain invariants might be in doubt, such as task->bound->cap + // == cap. We have to yield the current Capability immediately, + // no messing around. + // + if (!force_yield && + !shouldYieldCapability(cap,task) && (!emptyRunQueue(cap) || !emptyWakeupQueue(cap) || blackholes_need_checking || @@ -1117,10 +1124,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) if (cap->r.rCurrentNursery->u.back != NULL) { cap->r.rCurrentNursery->u.back->link = bd; } else { -#if !defined(THREADED_RTS) - ASSERT(g0s0->blocks == cap->r.rCurrentNursery && - g0s0 == cap->r.rNursery); -#endif cap->r.rNursery->blocks = bd; } cap->r.rCurrentNursery->u.back = bd; @@ -2664,10 +2667,9 @@ resurrectThreads (StgTSO *threads) switch (tso->why_blocked) { case BlockedOnMVar: - case BlockedOnException: /* Called by GC - sched_mutex lock is currently held. */ throwToSingleThreaded(cap, tso, - (StgClosure *)blockedOnDeadMVar_closure); + (StgClosure *)blockedIndefinitelyOnMVar_closure); break; case BlockedOnBlackHole: throwToSingleThreaded(cap, tso, @@ -2675,7 +2677,7 @@ resurrectThreads (StgTSO *threads) break; case BlockedOnSTM: throwToSingleThreaded(cap, tso, - (StgClosure *)blockedIndefinitely_closure); + (StgClosure *)blockedIndefinitelyOnSTM_closure); break; case NotBlocked: /* This might happen if the thread was blocked on a black hole @@ -2683,6 +2685,11 @@ resurrectThreads (StgTSO *threads) * can wake up threads, remember...). */ continue; + case BlockedOnException: + // throwTo should never block indefinitely: if the target + // thread dies or completes, throwTo returns. + barf("resurrectThreads: thread BlockedOnException"); + break; default: barf("resurrectThreads: thread blocked in a strange way"); } @@ -2707,8 +2714,12 @@ performPendingThrowTos (StgTSO *threads) { StgTSO *tso, *next; Capability *cap; + Task *task, *saved_task;; step *step; + task = myTask(); + cap = task->cap; + for (tso = threads; tso != END_TSO_QUEUE; tso = next) { next = tso->global_link; @@ -2718,7 +2729,17 @@ performPendingThrowTos (StgTSO *threads) debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id); - cap = tso->cap; - maybePerformBlockedException(cap, tso); - } + // We must pretend this Capability belongs to the current Task + // for the time being, as invariants will be broken otherwise. + // In fact the current Task has exclusive access to the systme + // at this point, so this is just bookkeeping: + task->cap = tso->cap; + saved_task = tso->cap->running_task; + tso->cap->running_task = task; + maybePerformBlockedException(tso->cap, tso); + tso->cap->running_task = saved_task; + } + + // Restore our original Capability: + task->cap = cap; }