- /* write RESUME events to log file and
- update blocked and fetch time (depending on type of the orig closure) */
- if (RtsFlags.ParFlags.ParStats.Full) {
- DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
- GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
- 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
- if (emptyRunQueue())
- emitSchedule = rtsTrue;
-
- switch (get_itbl(node)->type) {
- case FETCH_ME_BQ:
- ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
- break;
- case RBH:
- case FETCH_ME:
- case BLACKHOLE_BQ:
- ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
- break;
-#ifdef DIST
- case MVAR:
- break;
-#endif
- default:
- barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
- }
- }
-}
-#endif
-
-#if defined(GRAN)
-StgBlockingQueueElement *
-unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
-{
- StgTSO *tso;
- PEs node_loc, tso_loc;
-
- node_loc = where_is(node); // should be lifted out of loop
- tso = (StgTSO *)bqe; // wastes an assignment to get the type right
- tso_loc = where_is((StgClosure *)tso);
- if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
- /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
- ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
- CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
- // insertThread(tso, node_loc);
- new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
- ResumeThread,
- tso, node, (rtsSpark*)NULL);
- tso->link = END_TSO_QUEUE; // overwrite link just to be sure
- // len_local++;
- // len++;
- } else { // TSO is remote (actually should be FMBQ)
- CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
- RtsFlags.GranFlags.Costs.gunblocktime +
- RtsFlags.GranFlags.Costs.latency;
- new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
- UnblockThread,
- tso, node, (rtsSpark*)NULL);
- tso->link = END_TSO_QUEUE; // overwrite link just to be sure
- // len++;
- }
- /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
- IF_GRAN_DEBUG(bq,
- debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
- (node_loc==tso_loc ? "Local" : "Global"),
- tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
- tso->block_info.closure = NULL;
- IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
- tso->id, tso));
-}
-#elif defined(PARALLEL_HASKELL)
-StgBlockingQueueElement *
-unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
-{
- StgBlockingQueueElement *next;
-
- switch (get_itbl(bqe)->type) {
- case TSO:
- ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
- /* if it's a TSO just push it onto the run_queue */
- next = bqe->link;
- ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
- APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
- threadRunnable();
- unblockCount(bqe, node);
- /* reset blocking status after dumping event */
- ((StgTSO *)bqe)->why_blocked = NotBlocked;
- break;
-
- case BLOCKED_FETCH:
- /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
- next = bqe->link;
- bqe->link = (StgBlockingQueueElement *)PendingFetches;
- PendingFetches = (StgBlockedFetch *)bqe;
- break;
-
-# if defined(DEBUG)
- /* can ignore this case in a non-debugging setup;
- see comments on RBHSave closures above */
- case CONSTR:
- /* check that the closure is an RBHSave closure */
- ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
- get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
- get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
- break;
-
- default:
- barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
- get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
- (StgClosure *)bqe);
-# endif
- }
- IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
- return next;
-}
-#endif
-
-StgTSO *
-unblockOne(Capability *cap, StgTSO *tso)
-{
- StgTSO *next;
-
- ASSERT(get_itbl(tso)->type == TSO);
- ASSERT(tso->why_blocked != NotBlocked);
-
- tso->why_blocked = NotBlocked;
- next = tso->link;
- tso->link = END_TSO_QUEUE;
-
-#if defined(THREADED_RTS)
- if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
- // We are waking up this thread on the current Capability, which
- // might involve migrating it from the Capability it was last on.
- if (tso->bound) {
- ASSERT(tso->bound->cap == tso->cap);
- tso->bound->cap = cap;
- }
- tso->cap = cap;
- appendToRunQueue(cap,tso);
- // we're holding a newly woken thread, make sure we context switch
- // quickly so we can migrate it if necessary.
- context_switch = 1;
- } else {
- // we'll try to wake it up on the Capability it was last on.
- wakeupThreadOnCapability(tso->cap, tso);
- }
-#else
- appendToRunQueue(cap,tso);
- context_switch = 1;
-#endif
-
- IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
- return next;
-}
-
-
-#if defined(GRAN)
-void
-awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
-{
- StgBlockingQueueElement *bqe;
- PEs node_loc;
- nat len = 0;
-
- IF_GRAN_DEBUG(bq,
- debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
- node, CurrentProc, CurrentTime[CurrentProc],
- CurrentTSO->id, CurrentTSO));
-
- node_loc = where_is(node);
-
- ASSERT(q == END_BQ_QUEUE ||
- get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
- get_itbl(q)->type == CONSTR); // closure (type constructor)
- ASSERT(is_unique(node));
-
- /* FAKE FETCH: magically copy the node to the tso's proc;
- no Fetch necessary because in reality the node should not have been
- moved to the other PE in the first place
- */
- if (CurrentProc!=node_loc) {
- IF_GRAN_DEBUG(bq,
- debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
- node, node_loc, CurrentProc, CurrentTSO->id,
- // CurrentTSO, where_is(CurrentTSO),
- node->header.gran.procs));
- node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
- IF_GRAN_DEBUG(bq,
- debugBelch("## new bitmask of node %p is %#x\n",
- node, node->header.gran.procs));
- if (RtsFlags.GranFlags.GranSimStats.Global) {
- globalGranStats.tot_fake_fetches++;
- }
- }
-
- bqe = q;
- // ToDo: check: ASSERT(CurrentProc==node_loc);
- while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
- //next = bqe->link;
- /*
- bqe points to the current element in the queue
- next points to the next element in the queue
- */
- //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
- //tso_loc = where_is(tso);
- len++;
- bqe = unblockOne(bqe, node);
- }
-
- /* if this is the BQ of an RBH, we have to put back the info ripped out of
- the closure to make room for the anchor of the BQ */
- if (bqe!=END_BQ_QUEUE) {
- ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
- /*
- ASSERT((info_ptr==&RBH_Save_0_info) ||
- (info_ptr==&RBH_Save_1_info) ||
- (info_ptr==&RBH_Save_2_info));
- */
- /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
- ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
- ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
-
- IF_GRAN_DEBUG(bq,
- debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
- node, info_type(node)));
- }
-
- /* statistics gathering */
- if (RtsFlags.GranFlags.GranSimStats.Global) {
- // globalGranStats.tot_bq_processing_time += bq_processing_time;
- globalGranStats.tot_bq_len += len; // total length of all bqs awakened
- // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
- globalGranStats.tot_awbq++; // total no. of bqs awakened
- }
- IF_GRAN_DEBUG(bq,
- debugBelch("## BQ Stats of %p: [%d entries] %s\n",
- node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
-}
-#elif defined(PARALLEL_HASKELL)
-void
-awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
-{
- StgBlockingQueueElement *bqe;
-
- IF_PAR_DEBUG(verbose,
- debugBelch("##-_ AwBQ for node %p on [%x]: \n",
- node, mytid));
-#ifdef DIST
- //RFP
- if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
- IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
- return;
- }
-#endif
-
- ASSERT(q == END_BQ_QUEUE ||
- get_itbl(q)->type == TSO ||
- get_itbl(q)->type == BLOCKED_FETCH ||
- get_itbl(q)->type == CONSTR);
-
- bqe = q;
- while (get_itbl(bqe)->type==TSO ||
- get_itbl(bqe)->type==BLOCKED_FETCH) {
- bqe = unblockOne(bqe, node);
- }
-}
-
-#else /* !GRAN && !PARALLEL_HASKELL */
-
-void
-awakenBlockedQueue(Capability *cap, StgTSO *tso)
-{
- if (tso == NULL) return; // hack; see bug #1235728, and comments in
- // Exception.cmm
- while (tso != END_TSO_QUEUE) {
- tso = unblockOne(cap,tso);
- }
-}
-#endif
-
-/* ---------------------------------------------------------------------------
- Interrupt execution
- - usually called inside a signal handler so it mustn't do anything fancy.
- ------------------------------------------------------------------------ */
-
-void
-interruptStgRts(void)
-{
- sched_state = SCHED_INTERRUPTING;
- context_switch = 1;
-#if defined(THREADED_RTS)
- prodAllCapabilities();
-#endif
-}
-
-/* -----------------------------------------------------------------------------
- Unblock a thread
-
- This is for use when we raise an exception in another thread, which
- may be blocked.
- This has nothing to do with the UnblockThread event in GranSim. -- HWL
- -------------------------------------------------------------------------- */
-
-#if defined(GRAN) || defined(PARALLEL_HASKELL)
-/*
- NB: only the type of the blocking queue is different in GranSim and GUM
- the operations on the queue-elements are the same
- long live polymorphism!
-
- Locks: sched_mutex is held upon entry and exit.
-
-*/
-static void
-unblockThread(Capability *cap, StgTSO *tso)
-{
- StgBlockingQueueElement *t, **last;
-
- switch (tso->why_blocked) {
-
- case NotBlocked:
- return; /* not blocked */
-
- case BlockedOnSTM:
- // Be careful: nothing to do here! We tell the scheduler that the thread
- // is runnable and we leave it to the stack-walking code to abort the
- // transaction while unwinding the stack. We should perhaps have a debugging
- // test to make sure that this really happens and that the 'zombie' transaction
- // does not get committed.
- goto done;
-
- case BlockedOnMVar:
- ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
- {
- StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
- StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
-
- last = (StgBlockingQueueElement **)&mvar->head;
- for (t = (StgBlockingQueueElement *)mvar->head;
- t != END_BQ_QUEUE;
- last = &t->link, last_tso = t, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- *last = (StgBlockingQueueElement *)tso->link;
- if (mvar->tail == tso) {
- mvar->tail = (StgTSO *)last_tso;
- }
- goto done;
- }
- }
- barf("unblockThread (MVAR): TSO not found");
- }
-
- case BlockedOnBlackHole:
- ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
- {
- StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
-
- last = &bq->blocking_queue;
- for (t = bq->blocking_queue;
- t != END_BQ_QUEUE;
- last = &t->link, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- *last = (StgBlockingQueueElement *)tso->link;
- goto done;
- }
- }
- barf("unblockThread (BLACKHOLE): TSO not found");
- }
-
- case BlockedOnException:
- {
- StgTSO *target = tso->block_info.tso;
-
- ASSERT(get_itbl(target)->type == TSO);
-
- if (target->what_next == ThreadRelocated) {
- target = target->link;
- ASSERT(get_itbl(target)->type == TSO);
- }
-
- ASSERT(target->blocked_exceptions != NULL);
-
- last = (StgBlockingQueueElement **)&target->blocked_exceptions;
- for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
- t != END_BQ_QUEUE;
- last = &t->link, t = t->link) {
- ASSERT(get_itbl(t)->type == TSO);
- if (t == (StgBlockingQueueElement *)tso) {
- *last = (StgBlockingQueueElement *)tso->link;
- goto done;
- }
- }
- barf("unblockThread (Exception): TSO not found");
- }
-
- case BlockedOnRead:
- case BlockedOnWrite:
-#if defined(mingw32_HOST_OS)
- case BlockedOnDoProc:
-#endif
- {
- /* take TSO off blocked_queue */
- StgBlockingQueueElement *prev = NULL;
- for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
- prev = t, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- if (prev == NULL) {
- blocked_queue_hd = (StgTSO *)t->link;
- if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
- blocked_queue_tl = END_TSO_QUEUE;
- }
- } else {
- prev->link = t->link;
- if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
- blocked_queue_tl = (StgTSO *)prev;
- }
- }
-#if defined(mingw32_HOST_OS)
- /* (Cooperatively) signal that the worker thread should abort
- * the request.
- */
- abandonWorkRequest(tso->block_info.async_result->reqID);
-#endif
- goto done;
- }
- }
- barf("unblockThread (I/O): TSO not found");
- }
-
- case BlockedOnDelay:
- {
- /* take TSO off sleeping_queue */
- StgBlockingQueueElement *prev = NULL;
- for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
- prev = t, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- if (prev == NULL) {
- sleeping_queue = (StgTSO *)t->link;
- } else {
- prev->link = t->link;
- }
- goto done;
- }
- }
- barf("unblockThread (delay): TSO not found");
- }
-
- default:
- barf("unblockThread");
- }
-
- done:
- tso->link = END_TSO_QUEUE;
- tso->why_blocked = NotBlocked;
- tso->block_info.closure = NULL;
- pushOnRunQueue(cap,tso);
-}
-#else
-static void
-unblockThread(Capability *cap, StgTSO *tso)
-{
- StgTSO *t, **last;
-
- /* To avoid locking unnecessarily. */
- if (tso->why_blocked == NotBlocked) {
- return;
- }
-
- switch (tso->why_blocked) {
-
- case BlockedOnSTM:
- // Be careful: nothing to do here! We tell the scheduler that the thread
- // is runnable and we leave it to the stack-walking code to abort the
- // transaction while unwinding the stack. We should perhaps have a debugging
- // test to make sure that this really happens and that the 'zombie' transaction
- // does not get committed.
- goto done;
-
- case BlockedOnMVar:
- ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
- {
- StgTSO *last_tso = END_TSO_QUEUE;
- StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
-
- last = &mvar->head;
- for (t = mvar->head; t != END_TSO_QUEUE;
- last = &t->link, last_tso = t, t = t->link) {
- if (t == tso) {
- *last = tso->link;
- if (mvar->tail == tso) {
- mvar->tail = last_tso;
- }
- goto done;
- }
- }
- barf("unblockThread (MVAR): TSO not found");
- }
-
- case BlockedOnBlackHole:
- {
- last = &blackhole_queue;
- for (t = blackhole_queue; t != END_TSO_QUEUE;
- last = &t->link, t = t->link) {
- if (t == tso) {
- *last = tso->link;
- goto done;
- }
- }
- barf("unblockThread (BLACKHOLE): TSO not found");
- }
-
- case BlockedOnException:
- {
- StgTSO *target = tso->block_info.tso;
-
- ASSERT(get_itbl(target)->type == TSO);
-
- while (target->what_next == ThreadRelocated) {
- target = target->link;
- ASSERT(get_itbl(target)->type == TSO);
- }
-
- ASSERT(target->blocked_exceptions != NULL);
-
- last = &target->blocked_exceptions;
- for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
- last = &t->link, t = t->link) {
- ASSERT(get_itbl(t)->type == TSO);
- if (t == tso) {
- *last = tso->link;
- goto done;
- }
- }
- barf("unblockThread (Exception): TSO not found");
- }
-
-#if !defined(THREADED_RTS)
- case BlockedOnRead:
- case BlockedOnWrite:
-#if defined(mingw32_HOST_OS)
- case BlockedOnDoProc:
-#endif
- {
- StgTSO *prev = NULL;
- for (t = blocked_queue_hd; t != END_TSO_QUEUE;
- prev = t, t = t->link) {
- if (t == tso) {
- if (prev == NULL) {
- blocked_queue_hd = t->link;
- if (blocked_queue_tl == t) {
- blocked_queue_tl = END_TSO_QUEUE;
- }
- } else {
- prev->link = t->link;
- if (blocked_queue_tl == t) {
- blocked_queue_tl = prev;
- }
- }
-#if defined(mingw32_HOST_OS)
- /* (Cooperatively) signal that the worker thread should abort
- * the request.
- */
- abandonWorkRequest(tso->block_info.async_result->reqID);
-#endif
- goto done;
- }
- }
- barf("unblockThread (I/O): TSO not found");
- }
-
- case BlockedOnDelay:
- {
- StgTSO *prev = NULL;
- for (t = sleeping_queue; t != END_TSO_QUEUE;
- prev = t, t = t->link) {
- if (t == tso) {
- if (prev == NULL) {
- sleeping_queue = t->link;
- } else {
- prev->link = t->link;
- }
- goto done;
- }
- }
- barf("unblockThread (delay): TSO not found");
- }
-#endif
-
- default:
- barf("unblockThread");
- }
-
- done:
- tso->link = END_TSO_QUEUE;
- tso->why_blocked = NotBlocked;
- tso->block_info.closure = NULL;
- appendToRunQueue(cap,tso);
-
- // We might have just migrated this TSO to our Capability:
- if (tso->bound) {
- tso->bound->cap = cap;
- }
- tso->cap = cap;
-}