1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2010
5 * Inter-Capability message passing
7 * --------------------------------------------------------------------------*/
12 #include "Capability.h"
15 #include "RaiseAsync.h"
16 #include "sm/Storage.h"
18 /* ----------------------------------------------------------------------------
19 Send a message to another Capability
20 ------------------------------------------------------------------------- */
24 void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
26 ACQUIRE_LOCK(&to_cap->lock);
30 const StgInfoTable *i = msg->header.info;
31 if (i != &stg_MSG_WAKEUP_info &&
32 i != &stg_MSG_THROWTO_info &&
33 i != &stg_MSG_BLACKHOLE_info &&
34 i != &stg_MSG_TRY_WAKEUP_info &&
35 i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
36 i != &stg_WHITEHOLE_info) {
37 barf("sendMessage: %p", i);
42 msg->link = to_cap->inbox;
45 recordClosureMutated(from_cap,(StgClosure*)msg);
47 if (to_cap->running_task == NULL) {
48 to_cap->running_task = myTask();
49 // precond for releaseCapability_()
50 releaseCapability_(to_cap,rtsFalse);
52 contextSwitchCapability(to_cap);
55 RELEASE_LOCK(&to_cap->lock);
58 #endif /* THREADED_RTS */
60 /* ----------------------------------------------------------------------------
62 ------------------------------------------------------------------------- */
67 executeMessage (Capability *cap, Message *m)
69 const StgInfoTable *i;
72 write_barrier(); // allow m->header to be modified by another thread
74 if (i == &stg_MSG_WAKEUP_info)
76 // the plan is to eventually get rid of these and use
77 // TRY_WAKEUP instead.
78 MessageWakeup *w = (MessageWakeup *)m;
80 debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
82 ASSERT(tso->cap == cap);
83 ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
84 ASSERT(tso->block_info.closure == (StgClosure *)m);
85 tso->why_blocked = NotBlocked;
86 appendToRunQueue(cap, tso);
88 else if (i == &stg_MSG_TRY_WAKEUP_info)
90 StgTSO *tso = ((MessageWakeup *)m)->tso;
91 debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
93 tryWakeupThread(cap, tso);
95 else if (i == &stg_MSG_THROWTO_info)
97 MessageThrowTo *t = (MessageThrowTo *)m;
99 const StgInfoTable *i;
101 i = lockClosure((StgClosure*)m);
102 if (i != &stg_MSG_THROWTO_info) {
103 unlockClosure((StgClosure*)m, i);
107 debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld",
108 (lnat)t->source->id, (lnat)t->target->id);
110 ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
111 ASSERT(t->source->block_info.closure == (StgClosure *)m);
113 r = throwToMsg(cap, t);
116 case THROWTO_SUCCESS:
117 // this message is done
118 unlockClosure((StgClosure*)m, &stg_MSG_NULL_info);
119 tryWakeupThread(cap, t->source);
121 case THROWTO_BLOCKED:
122 // unlock the message
123 unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
127 else if (i == &stg_MSG_BLACKHOLE_info)
130 MessageBlackHole *b = (MessageBlackHole*)m;
132 r = messageBlackHole(cap, b);
134 tryWakeupThread(cap, b->tso);
138 else if (i == &stg_IND_info || i == &stg_MSG_NULL_info)
140 // message was revoked
143 else if (i == &stg_WHITEHOLE_info)
149 barf("executeMessage: %p", i);
155 /* ----------------------------------------------------------------------------
156 Handle a MSG_BLACKHOLE message
158 This is called from two places: either we just entered a BLACKHOLE
159 (stg_BLACKHOLE_info), or we received a MSG_BLACKHOLE in our
162 We need to establish whether the BLACKHOLE belongs to
164 - if so, arrange to block the current thread on it
165 - otherwise, forward the message to the right place
168 - 0 if the blocked thread can be woken up by the caller
169 - 1 if the thread is still blocked, and we promise to send a MSG_TRY_WAKEUP
170 at some point in the future.
172 ------------------------------------------------------------------------- */
174 nat messageBlackHole(Capability *cap, MessageBlackHole *msg)
176 const StgInfoTable *info;
178 StgBlockingQueue *bq;
179 StgClosure *bh = msg->bh;
182 debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on blackhole %p",
183 (lnat)msg->tso->id, msg->bh);
185 info = bh->header.info;
187 // If we got this message in our inbox, it might be that the
188 // BLACKHOLE has already been updated, and GC has shorted out the
189 // indirection, so the pointer no longer points to a BLACKHOLE at
191 if (info != &stg_BLACKHOLE_info &&
192 info != &stg_CAF_BLACKHOLE_info &&
193 info != &stg_WHITEHOLE_info) {
194 // if it is a WHITEHOLE, then a thread is in the process of
195 // trying to BLACKHOLE it. But we know that it was once a
196 // BLACKHOLE, so there is at least a valid pointer in the
197 // payload, so we can carry on.
201 // we know at this point that the closure
203 p = ((StgInd*)bh)->indirectee;
204 info = p->header.info;
206 if (info == &stg_IND_info)
208 // This could happen, if e.g. we got a BLOCKING_QUEUE that has
209 // just been replaced with an IND by another thread in
210 // updateThunk(). In which case, if we read the indirectee
211 // again we should get the value.
215 else if (info == &stg_TSO_info)
217 owner = deRefTSO((StgTSO *)p);
220 if (owner->cap != cap) {
221 sendMessage(cap, owner->cap, (Message*)msg);
222 debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
226 // owner is the owner of the BLACKHOLE, and resides on this
227 // Capability. msg->tso is the first thread to block on this
228 // BLACKHOLE, so we first create a BLOCKING_QUEUE object.
230 bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue));
232 // initialise the BLOCKING_QUEUE object
233 SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM);
238 msg->link = (MessageBlackHole*)END_TSO_QUEUE;
240 // All BLOCKING_QUEUES are linked in a list on owner->bq, so
241 // that we can search through them in the event that there is
242 // a collision to update a BLACKHOLE and a BLOCKING_QUEUE
243 // becomes orphaned (see updateThunk()).
244 bq->link = owner->bq;
246 dirty_TSO(cap, owner); // we modified owner->bq
248 // If the owner of the blackhole is currently runnable, then
249 // bump it to the front of the run queue. This gives the
250 // blocked-on thread a little boost which should help unblock
251 // this thread, and may avoid a pile-up of other threads
252 // becoming blocked on the same BLACKHOLE (#3838).
254 // NB. we check to make sure that the owner is not the same as
255 // the current thread, since in that case it will not be on
257 if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
258 removeFromRunQueue(cap, owner);
259 pushOnRunQueue(cap,owner);
262 // point to the BLOCKING_QUEUE from the BLACKHOLE
263 write_barrier(); // make the BQ visible
264 ((StgInd*)bh)->indirectee = (StgClosure *)bq;
265 recordClosureMutated(cap,bh); // bh was mutated
267 debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
268 (lnat)msg->tso->id, (lnat)owner->id);
272 else if (info == &stg_BLOCKING_QUEUE_CLEAN_info ||
273 info == &stg_BLOCKING_QUEUE_DIRTY_info)
275 StgBlockingQueue *bq = (StgBlockingQueue *)p;
277 ASSERT(bq->bh == bh);
279 owner = deRefTSO(bq->owner);
281 ASSERT(owner != END_TSO_QUEUE);
284 if (owner->cap != cap) {
285 sendMessage(cap, owner->cap, (Message*)msg);
286 debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
291 msg->link = bq->queue;
293 recordClosureMutated(cap,(StgClosure*)msg);
295 if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
296 bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
297 recordClosureMutated(cap,(StgClosure*)bq);
300 debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
301 (lnat)msg->tso->id, (lnat)owner->id);
304 if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
305 removeFromRunQueue(cap, owner);
306 pushOnRunQueue(cap,owner);
312 return 0; // not blocked