1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2010
5 * Inter-Capability message passing
7 * --------------------------------------------------------------------------*/
12 #include "Capability.h"
15 #include "RaiseAsync.h"
16 #include "sm/Storage.h"
18 /* ----------------------------------------------------------------------------
19 Send a message to another Capability
20 ------------------------------------------------------------------------- */
/*
 * sendMessage: pass msg from from_cap to to_cap.
 *
 * All the work visible here happens between ACQUIRE_LOCK and RELEASE_LOCK
 * on to_cap->lock:
 *   - the message's info pointer is compared against the accepted message
 *     kinds, and barf() is called for anything else;
 *   - msg->link is pointed at the current head of to_cap->inbox;
 *   - msg is recorded as mutated on from_cap;
 *   - to_cap is either released or asked to context switch (see below).
 *
 * NOTE(review): this view of the function is missing lines (closing braces,
 * and presumably the store that makes msg the new head of to_cap->inbox).
 * Confirm against the complete file before relying on this summary.
 */
24 void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
26 ACQUIRE_LOCK(&to_cap->lock);
// Sanity check: only these info pointers are expected on a message being
// sent.
30 const StgInfoTable *i = msg->header.info;
31 if (i != &stg_MSG_THROWTO_info &&
32 i != &stg_MSG_BLACKHOLE_info &&
33 i != &stg_MSG_TRY_WAKEUP_info &&
34 i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
35 i != &stg_WHITEHOLE_info) {
36 barf("sendMessage: %p", i);
// Chain the message in front of whatever is already in the target's inbox.
41 msg->link = to_cap->inbox;
// msg->link was just written, so record msg as mutated on the sending
// Capability.
44 recordClosureMutated(from_cap,(StgClosure*)msg);
// No task is running on to_cap: install the calling task as its
// running_task and release the Capability.
46 if (to_cap->running_task == NULL) {
47 to_cap->running_task = myTask();
48 // precond for releaseCapability_()
49 releaseCapability_(to_cap,rtsFalse);
// NOTE(review): presumably the alternative branch (a task is already running
// on to_cap); the line separating the two branches is not visible here.
51 contextSwitchCapability(to_cap);
54 RELEASE_LOCK(&to_cap->lock);
57 #endif /* THREADED_RTS */
59 /* ----------------------------------------------------------------------------
61 ------------------------------------------------------------------------- */
/*
 * executeMessage: act on message m on behalf of Capability cap.
 *
 * Dispatches on the message's info pointer i:
 *   - stg_MSG_TRY_WAKEUP_info: tryWakeupThread() on the message's tso;
 *   - stg_MSG_THROWTO_info:    lock the message, re-check its info pointer,
 *                              then call throwToMsg();
 *   - stg_MSG_BLACKHOLE_info:  call messageBlackHole();
 *   - stg_IND_info / stg_MSG_NULL_info: the message was revoked;
 *   - stg_WHITEHOLE_info:      handling not visible in this view;
 *   - anything else:           barf().
 *
 * NOTE(review): the return type, the load of i from m, the declaration of r
 * and the statement that switches on r are not visible in this view.
 */
66 executeMessage (Capability *cap, Message *m)
68 const StgInfoTable *i;
71 write_barrier(); // allow m->header to be modified by another thread
73 if (i == &stg_MSG_TRY_WAKEUP_info)
75 StgTSO *tso = ((MessageWakeup *)m)->tso;
76 debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
78 tryWakeupThread(cap, tso);
80 else if (i == &stg_MSG_THROWTO_info)
82 MessageThrowTo *t = (MessageThrowTo *)m;
// This inner i shadows the function-level i declared above.
84 const StgInfoTable *i;
// Lock the message and re-check its info pointer; if it is no longer a
// MSG_THROWTO, unlock it again with the info pointer that was found.
86 i = lockClosure((StgClosure*)m);
87 if (i != &stg_MSG_THROWTO_info) {
88 unlockClosure((StgClosure*)m, i);
92 debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld",
93 (lnat)t->source->id, (lnat)t->target->id);
// The source thread is expected to be blocked on exactly this message.
95 ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
96 ASSERT(t->source->block_info.closure == (StgClosure *)m);
98 r = throwToMsg(cap, t);
101 case THROWTO_SUCCESS: {
102 // this message is done
// t->source is saved in a local before doneWithMsgThrowTo(t) is called.
103 StgTSO *source = t->source;
104 doneWithMsgThrowTo(t);
105 tryWakeupThread(cap, source);
108 case THROWTO_BLOCKED:
109 // unlock the message
110 unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
114 else if (i == &stg_MSG_BLACKHOLE_info)
117 MessageBlackHole *b = (MessageBlackHole*)m;
119 r = messageBlackHole(cap, b);
// NOTE(review): presumably guarded by a test on r that is not visible here.
121 tryWakeupThread(cap, b->tso);
125 else if (i == &stg_IND_info || i == &stg_MSG_NULL_info)
127 // message was revoked
130 else if (i == &stg_WHITEHOLE_info)
// Unrecognised info pointer.
136 barf("executeMessage: %p", i);
142 /* ----------------------------------------------------------------------------
143 Handle a MSG_BLACKHOLE message
145 This is called from two places: either we just entered a BLACKHOLE
146 (stg_BLACKHOLE_info), or we received a MSG_BLACKHOLE in our inbox.
149 We need to establish whether the BLACKHOLE's owner is on this Capability:
151 - if so, arrange to block the current thread on it
152 - otherwise, forward the message to the right place
155 Returns 0 if the blocked thread can be woken up by the caller, or
156 1 if the thread is still blocked, and we promise to send a MSG_TRY_WAKEUP
157 at some point in the future.
159 ------------------------------------------------------------------------- */
/*
 * messageBlackHole: process a MSG_BLACKHOLE message msg on Capability cap.
 *
 * Reads the closure msg->bh (untagged) and, if it still looks like a
 * blackhole, inspects its indirectee p:
 *   - stg_IND_info: the indirectee was replaced concurrently (see the
 *     comment in that branch);
 *   - stg_TSO_info: p is the owning thread. If the owner lives on another
 *     Capability the message is forwarded with sendMessage(); otherwise a
 *     new BLOCKING_QUEUE is allocated, linked onto owner->bq and installed
 *     as the blackhole's indirectee;
 *   - stg_BLOCKING_QUEUE_{CLEAN,DIRTY}_info: a queue already exists. The
 *     message is forwarded if the owner is on another Capability, otherwise
 *     it is linked in front of bq->queue and a CLEAN queue is marked DIRTY.
 * If no branch applies, the function returns 0.
 *
 * NOTE(review): the declarations of p and owner, the initialisation of the
 * new queue's fields, and the early return statements are not visible in
 * this view.
 */
161 nat messageBlackHole(Capability *cap, MessageBlackHole *msg)
163 const StgInfoTable *info;
165 StgBlockingQueue *bq;
166 StgClosure *bh = UNTAG_CLOSURE(msg->bh);
// NOTE(review): "%d" is paired with an (lnat) cast here, whereas the traces
// in executeMessage use "%ld" with the same cast; confirm which specifier
// matches lnat.
169 debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on blackhole %p",
170 (lnat)msg->tso->id, msg->bh);
172 info = bh->header.info;
174 // If we got this message in our inbox, it might be that the
175 // BLACKHOLE has already been updated, and GC has shorted out the
176 // indirection, so the pointer no longer points to a BLACKHOLE at all.
178 if (info != &stg_BLACKHOLE_info &&
179 info != &stg_CAF_BLACKHOLE_info &&
180 info != &__stg_EAGER_BLACKHOLE_info &&
181 info != &stg_WHITEHOLE_info) {
182 // if it is a WHITEHOLE, then a thread is in the process of
183 // trying to BLACKHOLE it. But we know that it was once a
184 // BLACKHOLE, so there is at least a valid pointer in the
185 // payload, so we can carry on.
189 // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
// or something else, in which case none of the branches below is taken.
192 // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load
193 // and turns this into an infinite loop.
194 p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee));
195 info = p->header.info;
197 if (info == &stg_IND_info)
199 // This could happen, if e.g. we got a BLOCKING_QUEUE that has
200 // just been replaced with an IND by another thread in
201 // updateThunk(). In which case, if we read the indirectee
202 // again we should get the value.
206 else if (info == &stg_TSO_info)
// The owner lives on a different Capability: hand the message over to it.
211 if (owner->cap != cap) {
212 sendMessage(cap, owner->cap, (Message*)msg);
213 debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
217 // owner is the owner of the BLACKHOLE, and resides on this
218 // Capability. msg->tso is the first thread to block on this
219 // BLACKHOLE, so we first create a BLOCKING_QUEUE object.
221 bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue));
223 // initialise the BLOCKING_QUEUE object
224 SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM);
// msg is the only entry so far, so its link is the end-of-queue marker.
229 msg->link = (MessageBlackHole*)END_TSO_QUEUE;
231 // All BLOCKING_QUEUES are linked in a list on owner->bq, so
232 // that we can search through them in the event that there is
233 // a collision to update a BLACKHOLE and a BLOCKING_QUEUE
234 // becomes orphaned (see updateThunk()).
235 bq->link = owner->bq;
237 dirty_TSO(cap, owner); // we modified owner->bq
239 // If the owner of the blackhole is currently runnable, then
240 // bump it to the front of the run queue. This gives the
241 // blocked-on thread a little boost which should help unblock
242 // this thread, and may avoid a pile-up of other threads
243 // becoming blocked on the same BLACKHOLE (#3838).
245 // NB. we check to make sure that the owner is not the same as
246 // the current thread, since in that case it will not be on
// the run queue.
248 if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
249 removeFromRunQueue(cap, owner);
250 pushOnRunQueue(cap,owner);
253 // point to the BLOCKING_QUEUE from the BLACKHOLE
254 write_barrier(); // make the BQ visible
255 ((StgInd*)bh)->indirectee = (StgClosure *)bq;
256 recordClosureMutated(cap,bh); // bh was mutated
258 debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
259 (lnat)msg->tso->id, (lnat)owner->id);
263 else if (info == &stg_BLOCKING_QUEUE_CLEAN_info ||
264 info == &stg_BLOCKING_QUEUE_DIRTY_info)
// A BLOCKING_QUEUE already exists for this blackhole. This local bq shadows
// the function-level bq declared at the top.
266 StgBlockingQueue *bq = (StgBlockingQueue *)p;
268 ASSERT(bq->bh == bh);
272 ASSERT(owner != END_TSO_QUEUE);
// Same forwarding rule as in the TSO case above.
275 if (owner->cap != cap) {
276 sendMessage(cap, owner->cap, (Message*)msg);
277 debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
// Link msg in front of the messages already on the queue.
282 msg->link = bq->queue;
284 recordClosureMutated(cap,(StgClosure*)msg);
// The queue was modified, so a CLEAN queue is switched to DIRTY and recorded
// as mutated.
286 if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
287 bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
288 recordClosureMutated(cap,(StgClosure*)bq);
291 debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
292 (lnat)msg->tso->id, (lnat)owner->id);
// Same run-queue bump for the owner as in the TSO case above.
295 if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
296 removeFromRunQueue(cap, owner);
297 pushOnRunQueue(cap,owner);
303 return 0; // not blocked
306 // A shorter version of messageBlackHole(), that just returns the
307 // owner (or NULL if the owner cannot be found, because the blackhole
308 // has been updated in the meantime).
/*
 * blackHoleOwner: find the thread that owns the blackhole bh.
 *
 * Performs the same info-pointer checks as messageBlackHole(): bh must
 * carry one of the blackhole (or WHITEHOLE) info pointers, and its
 * indirectee is then classified as an IND (retry via "goto loop"), a TSO,
 * or a BLOCKING_QUEUE. Returns NULL when none of these yields an owner.
 *
 * NOTE(review): the declaration of p, the "loop" label, and the return
 * statements inside the TSO and BLOCKING_QUEUE branches are not visible in
 * this view.
 */
310 StgTSO * blackHoleOwner (StgClosure *bh)
312 const StgInfoTable *info;
315 info = bh->header.info;
317 if (info != &stg_BLACKHOLE_info &&
318 info != &stg_CAF_BLACKHOLE_info &&
319 info != &__stg_EAGER_BLACKHOLE_info &&
320 info != &stg_WHITEHOLE_info) {
324 // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
// or something else, in which case none of the branches below is taken.
327 // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load
328 // and turns this into an infinite loop.
329 p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee));
330 info = p->header.info;
// The indirectee is an IND: go round again and re-read it.
332 if (info == &stg_IND_info) goto loop;
334 else if (info == &stg_TSO_info)
338 else if (info == &stg_BLOCKING_QUEUE_CLEAN_info ||
339 info == &stg_BLOCKING_QUEUE_DIRTY_info)
341 StgBlockingQueue *bq = (StgBlockingQueue *)p;
345 return NULL; // the owner could not be found