ae5d5d1abc879c5cf5f76859bf3f2cd442ade6ea
[ghc-hetmet.git] / rts / Messages.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2010
4  *
5  * Inter-Capability message passing
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "Rts.h"
10 #include "Messages.h"
11 #include "Trace.h"
12 #include "Capability.h"
13 #include "Schedule.h"
14 #include "Threads.h"
15 #include "RaiseAsync.h"
16 #include "sm/Storage.h"
17
18 /* ----------------------------------------------------------------------------
19    Send a message to another Capability
20    ------------------------------------------------------------------------- */
21
22 #ifdef THREADED_RTS
23
24 void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
25 {
26     ACQUIRE_LOCK(&to_cap->lock);
27
28 #ifdef DEBUG    
29     {
30         const StgInfoTable *i = msg->header.info;
31         if (i != &stg_MSG_WAKEUP_info &&
32             i != &stg_MSG_THROWTO_info &&
33             i != &stg_MSG_BLACKHOLE_info &&
34             i != &stg_MSG_TRY_WAKEUP_info &&
35             i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
36             i != &stg_WHITEHOLE_info) {
37             barf("sendMessage: %p", i);
38         }
39     }
40 #endif
41
42     msg->link = to_cap->inbox;
43     to_cap->inbox = msg;
44
45     recordClosureMutated(from_cap,(StgClosure*)msg);
46
47     if (to_cap->running_task == NULL) {
48         to_cap->running_task = myTask(); 
49             // precond for releaseCapability_()
50         releaseCapability_(to_cap,rtsFalse);
51     } else {
52         contextSwitchCapability(to_cap);
53     }
54
55     RELEASE_LOCK(&to_cap->lock);
56 }
57
58 #endif /* THREADED_RTS */
59
60 /* ----------------------------------------------------------------------------
61    Handle a message
62    ------------------------------------------------------------------------- */
63
64 #ifdef THREADED_RTS
65
66 void
67 executeMessage (Capability *cap, Message *m)
68 {
69     const StgInfoTable *i;
70
71 loop:
72     write_barrier(); // allow m->header to be modified by another thread
73     i = m->header.info;
74     if (i == &stg_MSG_WAKEUP_info)
75     {
76         // the plan is to eventually get rid of these and use
77         // TRY_WAKEUP instead.
78         MessageWakeup *w = (MessageWakeup *)m;
79         StgTSO *tso = w->tso;
80         debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", 
81                       (lnat)tso->id);
82         ASSERT(tso->cap == cap);
83         ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
84         ASSERT(tso->block_info.closure == (StgClosure *)m);
85         tso->why_blocked = NotBlocked;
86         appendToRunQueue(cap, tso);
87     }
88     else if (i == &stg_MSG_TRY_WAKEUP_info)
89     {
90         StgTSO *tso = ((MessageWakeup *)m)->tso;
91         debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld", 
92                       (lnat)tso->id);
93         tryWakeupThread(cap, tso);
94     }
95     else if (i == &stg_MSG_THROWTO_info)
96     {
97         MessageThrowTo *t = (MessageThrowTo *)m;
98         nat r;
99         const StgInfoTable *i;
100
101         i = lockClosure((StgClosure*)m);
102         if (i != &stg_MSG_THROWTO_info) {
103             unlockClosure((StgClosure*)m, i);
104             goto loop;
105         }
106
107         debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", 
108                       (lnat)t->source->id, (lnat)t->target->id);
109
110         ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
111         ASSERT(t->source->block_info.closure == (StgClosure *)m);
112
113         r = throwToMsg(cap, t);
114
115         switch (r) {
116         case THROWTO_SUCCESS:
117             // this message is done
118             unlockClosure((StgClosure*)m, &stg_MSG_NULL_info);
119             tryWakeupThread(cap, t->source);
120             break;
121         case THROWTO_BLOCKED:
122             // unlock the message
123             unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
124             break;
125         }
126     }
127     else if (i == &stg_MSG_BLACKHOLE_info)
128     {
129         nat r;
130         MessageBlackHole *b = (MessageBlackHole*)m;
131
132         r = messageBlackHole(cap, b);
133         if (r == 0) {
134             tryWakeupThread(cap, b->tso);
135         }
136         return;
137     }
138     else if (i == &stg_IND_info || i == &stg_MSG_NULL_info)
139     {
140         // message was revoked
141         return;
142     }
143     else if (i == &stg_WHITEHOLE_info)
144     {
145         goto loop;
146     }
147     else
148     {
149         barf("executeMessage: %p", i);
150     }
151 }
152
153 #endif
154
155 /* ----------------------------------------------------------------------------
156    Handle a MSG_BLACKHOLE message
157
158    This is called from two places: either we just entered a BLACKHOLE
159    (stg_BLACKHOLE_info), or we received a MSG_BLACKHOLE in our
160    cap->inbox.  
161
162    We need to establish whether the BLACKHOLE belongs to
163    this Capability, and 
164      - if so, arrange to block the current thread on it
165      - otherwise, forward the message to the right place
166
167    Returns:
168      - 0 if the blocked thread can be woken up by the caller
169      - 1 if the thread is still blocked, and we promise to send a MSG_TRY_WAKEUP
170        at some point in the future.
171
172    ------------------------------------------------------------------------- */
173
174 nat messageBlackHole(Capability *cap, MessageBlackHole *msg)
175 {
176     const StgInfoTable *info;
177     StgClosure *p;
178     StgBlockingQueue *bq;
179     StgClosure *bh = msg->bh;
180     StgTSO *owner;
181
182     debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on blackhole %p", 
183                   (lnat)msg->tso->id, msg->bh);
184
185     info = bh->header.info;
186
187     // If we got this message in our inbox, it might be that the
188     // BLACKHOLE has already been updated, and GC has shorted out the
189     // indirection, so the pointer no longer points to a BLACKHOLE at
190     // all.
191     if (info != &stg_BLACKHOLE_info && 
192         info != &stg_CAF_BLACKHOLE_info && 
193         info != &stg_WHITEHOLE_info) {
194         // if it is a WHITEHOLE, then a thread is in the process of
195         // trying to BLACKHOLE it.  But we know that it was once a
196         // BLACKHOLE, so there is at least a valid pointer in the
197         // payload, so we can carry on.
198         return 0;
199     }
200
201     // we know at this point that the closure 
202 loop:
203     p = ((StgInd*)bh)->indirectee;
204     info = p->header.info;
205
206     if (info == &stg_IND_info)
207     {
208         // This could happen, if e.g. we got a BLOCKING_QUEUE that has
209         // just been replaced with an IND by another thread in
210         // updateThunk().  In which case, if we read the indirectee
211         // again we should get the value.
212         goto loop;
213     }
214
215     else if (info == &stg_TSO_info)
216     {
217         owner = deRefTSO((StgTSO *)p);
218
219 #ifdef THREADED_RTS
220         if (owner->cap != cap) {
221             sendMessage(cap, owner->cap, (Message*)msg);
222             debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
223             return 1;
224         }
225 #endif
226         // owner is the owner of the BLACKHOLE, and resides on this
227         // Capability.  msg->tso is the first thread to block on this
228         // BLACKHOLE, so we first create a BLOCKING_QUEUE object.
229
230         bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue));
231             
232         // initialise the BLOCKING_QUEUE object
233         SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM);
234         bq->bh = bh;
235         bq->queue = msg;
236         bq->owner = owner;
237         
238         msg->link = (MessageBlackHole*)END_TSO_QUEUE;
239         
240         // All BLOCKING_QUEUES are linked in a list on owner->bq, so
241         // that we can search through them in the event that there is
242         // a collision to update a BLACKHOLE and a BLOCKING_QUEUE
243         // becomes orphaned (see updateThunk()).
244         bq->link = owner->bq;
245         owner->bq = bq;
246         dirty_TSO(cap, owner); // we modified owner->bq
247
248         // If the owner of the blackhole is currently runnable, then
249         // bump it to the front of the run queue.  This gives the
250         // blocked-on thread a little boost which should help unblock
251         // this thread, and may avoid a pile-up of other threads
252         // becoming blocked on the same BLACKHOLE (#3838).
253         //
254         // NB. we check to make sure that the owner is not the same as
255         // the current thread, since in that case it will not be on
256         // the run queue.
257         if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
258             removeFromRunQueue(cap, owner);
259             pushOnRunQueue(cap,owner);
260         }
261
262         // point to the BLOCKING_QUEUE from the BLACKHOLE
263         write_barrier(); // make the BQ visible
264         ((StgInd*)bh)->indirectee = (StgClosure *)bq;
265         recordClosureMutated(cap,bh); // bh was mutated
266
267         debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d", 
268                       (lnat)msg->tso->id, (lnat)owner->id);
269
270         return 1; // blocked
271     }
272     else if (info == &stg_BLOCKING_QUEUE_CLEAN_info || 
273              info == &stg_BLOCKING_QUEUE_DIRTY_info)
274     {
275         StgBlockingQueue *bq = (StgBlockingQueue *)p;
276
277         ASSERT(bq->bh == bh);
278
279         owner = deRefTSO(bq->owner);
280
281         ASSERT(owner != END_TSO_QUEUE);
282
283 #ifdef THREADED_RTS
284         if (owner->cap != cap) {
285             sendMessage(cap, owner->cap, (Message*)msg);
286             debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
287             return 1;
288         }
289 #endif
290
291         msg->link = bq->queue;
292         bq->queue = msg;
293         recordClosureMutated(cap,(StgClosure*)msg);
294
295         if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
296             bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
297             recordClosureMutated(cap,(StgClosure*)bq);
298         }
299
300         debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d", 
301                       (lnat)msg->tso->id, (lnat)owner->id);
302
303         // See above, #3838
304         if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
305             removeFromRunQueue(cap, owner);
306             pushOnRunQueue(cap,owner);
307         }
308
309         return 1; // blocked
310     }
311     
312     return 0; // not blocked
313 }
314