merge upstream HEAD
[ghc-hetmet.git] / rts / Messages.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2010
4  *
5  * Inter-Capability message passing
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "Rts.h"
10 #include "Messages.h"
11 #include "Trace.h"
12 #include "Capability.h"
13 #include "Schedule.h"
14 #include "Threads.h"
15 #include "RaiseAsync.h"
16 #include "sm/Storage.h"
17
18 /* ----------------------------------------------------------------------------
19    Send a message to another Capability
20    ------------------------------------------------------------------------- */
21
22 #ifdef THREADED_RTS
23
24 void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
25 {
26     ACQUIRE_LOCK(&to_cap->lock);
27
28 #ifdef DEBUG    
29     {
30         const StgInfoTable *i = msg->header.info;
31         if (i != &stg_MSG_THROWTO_info &&
32             i != &stg_MSG_BLACKHOLE_info &&
33             i != &stg_MSG_TRY_WAKEUP_info &&
34             i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
35             i != &stg_WHITEHOLE_info) {
36             barf("sendMessage: %p", i);
37         }
38     }
39 #endif
40
41     msg->link = to_cap->inbox;
42     to_cap->inbox = msg;
43
44     recordClosureMutated(from_cap,(StgClosure*)msg);
45
46     if (to_cap->running_task == NULL) {
47         to_cap->running_task = myTask(); 
48             // precond for releaseCapability_()
49         releaseCapability_(to_cap,rtsFalse);
50     } else {
51         contextSwitchCapability(to_cap);
52     }
53
54     RELEASE_LOCK(&to_cap->lock);
55 }
56
57 #endif /* THREADED_RTS */
58
59 /* ----------------------------------------------------------------------------
60    Handle a message
61    ------------------------------------------------------------------------- */
62
63 #ifdef THREADED_RTS
64
65 void
66 executeMessage (Capability *cap, Message *m)
67 {
68     const StgInfoTable *i;
69
70 loop:
71     write_barrier(); // allow m->header to be modified by another thread
72     i = m->header.info;
73     if (i == &stg_MSG_TRY_WAKEUP_info)
74     {
75         StgTSO *tso = ((MessageWakeup *)m)->tso;
76         debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld", 
77                       (lnat)tso->id);
78         tryWakeupThread(cap, tso);
79     }
80     else if (i == &stg_MSG_THROWTO_info)
81     {
82         MessageThrowTo *t = (MessageThrowTo *)m;
83         nat r;
84         const StgInfoTable *i;
85
86         i = lockClosure((StgClosure*)m);
87         if (i != &stg_MSG_THROWTO_info) {
88             unlockClosure((StgClosure*)m, i);
89             goto loop;
90         }
91
92         debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", 
93                       (lnat)t->source->id, (lnat)t->target->id);
94
95         ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
96         ASSERT(t->source->block_info.closure == (StgClosure *)m);
97
98         r = throwToMsg(cap, t);
99
100         switch (r) {
101         case THROWTO_SUCCESS: {
102             // this message is done
103             StgTSO *source = t->source;
104             doneWithMsgThrowTo(t);
105             tryWakeupThread(cap, source);
106             break;
107         }
108         case THROWTO_BLOCKED:
109             // unlock the message
110             unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
111             break;
112         }
113     }
114     else if (i == &stg_MSG_BLACKHOLE_info)
115     {
116         nat r;
117         MessageBlackHole *b = (MessageBlackHole*)m;
118
119         r = messageBlackHole(cap, b);
120         if (r == 0) {
121             tryWakeupThread(cap, b->tso);
122         }
123         return;
124     }
125     else if (i == &stg_IND_info || i == &stg_MSG_NULL_info)
126     {
127         // message was revoked
128         return;
129     }
130     else if (i == &stg_WHITEHOLE_info)
131     {
132         goto loop;
133     }
134     else
135     {
136         barf("executeMessage: %p", i);
137     }
138 }
139
140 #endif
141
142 /* ----------------------------------------------------------------------------
143    Handle a MSG_BLACKHOLE message
144
145    This is called from two places: either we just entered a BLACKHOLE
146    (stg_BLACKHOLE_info), or we received a MSG_BLACKHOLE in our
147    cap->inbox.  
148
149    We need to establish whether the BLACKHOLE belongs to
150    this Capability, and 
151      - if so, arrange to block the current thread on it
152      - otherwise, forward the message to the right place
153
154    Returns:
155      - 0 if the blocked thread can be woken up by the caller
156      - 1 if the thread is still blocked, and we promise to send a MSG_TRY_WAKEUP
157        at some point in the future.
158
159    ------------------------------------------------------------------------- */
160
161 nat messageBlackHole(Capability *cap, MessageBlackHole *msg)
162 {
163     const StgInfoTable *info;
164     StgClosure *p;
165     StgBlockingQueue *bq;
166     StgClosure *bh = UNTAG_CLOSURE(msg->bh);
167     StgTSO *owner;
168
169     debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on blackhole %p", 
170                   (lnat)msg->tso->id, msg->bh);
171
172     info = bh->header.info;
173
174     // If we got this message in our inbox, it might be that the
175     // BLACKHOLE has already been updated, and GC has shorted out the
176     // indirection, so the pointer no longer points to a BLACKHOLE at
177     // all.
178     if (info != &stg_BLACKHOLE_info && 
179         info != &stg_CAF_BLACKHOLE_info && 
180         info != &__stg_EAGER_BLACKHOLE_info &&
181         info != &stg_WHITEHOLE_info) {
182         // if it is a WHITEHOLE, then a thread is in the process of
183         // trying to BLACKHOLE it.  But we know that it was once a
184         // BLACKHOLE, so there is at least a valid pointer in the
185         // payload, so we can carry on.
186         return 0;
187     }
188
189     // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
190     // or a value.
191 loop:
192     // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load
193     // and turns this into an infinite loop.
194     p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee));
195     info = p->header.info;
196
197     if (info == &stg_IND_info)
198     {
199         // This could happen, if e.g. we got a BLOCKING_QUEUE that has
200         // just been replaced with an IND by another thread in
201         // updateThunk().  In which case, if we read the indirectee
202         // again we should get the value.
203         goto loop;
204     }
205
206     else if (info == &stg_TSO_info)
207     {
208         owner = (StgTSO*)p;
209
210 #ifdef THREADED_RTS
211         if (owner->cap != cap) {
212             sendMessage(cap, owner->cap, (Message*)msg);
213             debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
214             return 1;
215         }
216 #endif
217         // owner is the owner of the BLACKHOLE, and resides on this
218         // Capability.  msg->tso is the first thread to block on this
219         // BLACKHOLE, so we first create a BLOCKING_QUEUE object.
220
221         bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue));
222             
223         // initialise the BLOCKING_QUEUE object
224         SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM);
225         bq->bh = bh;
226         bq->queue = msg;
227         bq->owner = owner;
228         
229         msg->link = (MessageBlackHole*)END_TSO_QUEUE;
230         
231         // All BLOCKING_QUEUES are linked in a list on owner->bq, so
232         // that we can search through them in the event that there is
233         // a collision to update a BLACKHOLE and a BLOCKING_QUEUE
234         // becomes orphaned (see updateThunk()).
235         bq->link = owner->bq;
236         owner->bq = bq;
237         dirty_TSO(cap, owner); // we modified owner->bq
238
239         // If the owner of the blackhole is currently runnable, then
240         // bump it to the front of the run queue.  This gives the
241         // blocked-on thread a little boost which should help unblock
242         // this thread, and may avoid a pile-up of other threads
243         // becoming blocked on the same BLACKHOLE (#3838).
244         //
245         // NB. we check to make sure that the owner is not the same as
246         // the current thread, since in that case it will not be on
247         // the run queue.
248         if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
249             removeFromRunQueue(cap, owner);
250             pushOnRunQueue(cap,owner);
251         }
252
253         // point to the BLOCKING_QUEUE from the BLACKHOLE
254         write_barrier(); // make the BQ visible
255         ((StgInd*)bh)->indirectee = (StgClosure *)bq;
256         recordClosureMutated(cap,bh); // bh was mutated
257
258         debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d", 
259                       (lnat)msg->tso->id, (lnat)owner->id);
260
261         return 1; // blocked
262     }
263     else if (info == &stg_BLOCKING_QUEUE_CLEAN_info || 
264              info == &stg_BLOCKING_QUEUE_DIRTY_info)
265     {
266         StgBlockingQueue *bq = (StgBlockingQueue *)p;
267
268         ASSERT(bq->bh == bh);
269
270         owner = bq->owner;
271
272         ASSERT(owner != END_TSO_QUEUE);
273
274 #ifdef THREADED_RTS
275         if (owner->cap != cap) {
276             sendMessage(cap, owner->cap, (Message*)msg);
277             debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
278             return 1;
279         }
280 #endif
281
282         msg->link = bq->queue;
283         bq->queue = msg;
284         recordClosureMutated(cap,(StgClosure*)msg);
285
286         if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
287             bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
288             recordClosureMutated(cap,(StgClosure*)bq);
289         }
290
291         debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d", 
292                       (lnat)msg->tso->id, (lnat)owner->id);
293
294         // See above, #3838
295         if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
296             removeFromRunQueue(cap, owner);
297             pushOnRunQueue(cap,owner);
298         }
299
300         return 1; // blocked
301     }
302     
303     return 0; // not blocked
304 }
305
306 // A shorter version of messageBlackHole(), that just returns the
307 // owner (or NULL if the owner cannot be found, because the blackhole
308 // has been updated in the meantime).
309
310 StgTSO * blackHoleOwner (StgClosure *bh)
311 {
312     const StgInfoTable *info;
313     StgClosure *p;
314
315     info = bh->header.info;
316
317     if (info != &stg_BLACKHOLE_info &&
318         info != &stg_CAF_BLACKHOLE_info && 
319         info != &__stg_EAGER_BLACKHOLE_info &&
320         info != &stg_WHITEHOLE_info) {
321         return NULL;
322     }
323
324     // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
325     // or a value.
326 loop:
327     // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load
328     // and turns this into an infinite loop.
329     p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee));
330     info = p->header.info;
331
332     if (info == &stg_IND_info) goto loop;
333
334     else if (info == &stg_TSO_info)
335     {
336         return (StgTSO*)p;
337     }
338     else if (info == &stg_BLOCKING_QUEUE_CLEAN_info || 
339              info == &stg_BLOCKING_QUEUE_DIRTY_info)
340     {
341         StgBlockingQueue *bq = (StgBlockingQueue *)p;
342         return bq->owner;
343     }
344     
345     return NULL; // not blocked
346 }
347
348