9435536aefa131c0ae864d944f77cfff78458570
[ghc-hetmet.git] / ghc / rts / parallel / HLComms.c
1 /* ----------------------------------------------------------------------------
2  * Time-stamp: <Wed Mar 21 2001 16:34:41 Stardate: [-30]6363.45 hwloidl>
3  * $Id: HLComms.c,v 1.6 2001/08/14 13:40:10 sewardj Exp $
4  *
5  * High Level Communications Routines (HLComms.lc)
6  *
7  * Contains the high-level routines (i.e. communication
8  * subsystem independent) used by GUM
9  * 
10  * GUM 0.2x: Phil Trinder, Glasgow University, 12 December 1994
11  * GUM 3.xx: Phil Trinder, Simon Marlow July 1998
12  * GUM 4.xx: H-W. Loidl, Heriot-Watt University, November 1999 -
13  * 
14  * ------------------------------------------------------------------------- */
15
16 #ifdef PAR /* whole file */
17
18 //@node High Level Communications Routines, , ,
19 //@section High Level Communications Routines
20
21 //@menu
22 //* Macros etc::                
23 //* Includes::                  
24 //* GUM Message Sending and Unpacking Functions::  
25 //* Message-Processing Functions::  
26 //* GUM Message Processor::     
27 //* Miscellaneous Functions::   
28 //* Index::                     
29 //@end menu
30
31 //@node Macros etc, Includes, High Level Communications Routines, High Level Communications Routines
32 //@subsection Macros etc
33
34 /* Evidently not Posix */
35 /* #include "PosixSource.h" */
36
37 //@node Includes, GUM Message Sending and Unpacking Functions, Macros etc, High Level Communications Routines
38 //@subsection Includes
39
40 #include "Rts.h"
41 #include "RtsUtils.h"
42 #include "RtsFlags.h"
43 #include "Storage.h"   // for recordMutable
44 #include "HLC.h"
45 #include "Parallel.h"
46 #include "GranSimRts.h"
47 #include "ParallelRts.h"
48 #include "Sparks.h"
49 #include "FetchMe.h"     // for BLOCKED_FETCH_info etc
50 #if defined(DEBUG)
51 # include "ParallelDebug.h"
52 #endif
53 #include "StgMacros.h" // inlined IS_... fcts
54
55 #ifdef DIST
56 #include "SchedAPI.h" //for createIOThread
57 extern unsigned int context_switch; 
58 #endif /* DIST */
59
60 //@node GUM Message Sending and Unpacking Functions, Message-Processing Functions, Includes, High Level Communications Routines
61 //@subsection GUM Message Sending and Unpacking Functions
62
63 /*
64  * GUM Message Sending and Unpacking Functions
65  */
66
67 /*
68  * Allocate space for message processing
69  */
70
71 //@cindex gumPackBuffer
72 static rtsPackBuffer *gumPackBuffer;
73
74 //@cindex initMoreBuffers
75 rtsBool
76 initMoreBuffers(void)
77 {
78   if ((gumPackBuffer = (rtsPackBuffer *)stgMallocWords(RtsFlags.ParFlags.packBufferSize, 
79                                              "initMoreBuffers")) == NULL)
80     return rtsFalse;
81   return rtsTrue;
82 }
83
84 /*
85  * SendFetch packs the two global addresses and a load into a message +
86  * sends it.  
87
88 //@cindex FETCH
89
90    Structure of a FETCH message:
91
92          |    GA 1     |        GA 2          |
93          +------------------------------------+------+
94          | gtid | slot | weight | gtid | slot | load |
95          +------------------------------------+------+
96  */
97
98 //@cindex sendFetch
99 void
100 sendFetch(globalAddr *rga, globalAddr *lga, int load)
101 {
102   ASSERT(rga->weight > 0 && lga->weight > 0);
103   IF_PAR_DEBUG(fetch,
104                belch("~^** Sending Fetch for ((%x, %d, 0)); locally ((%x, %d, %x)), load = %d", 
105                      rga->payload.gc.gtid, rga->payload.gc.slot, 
106                      lga->payload.gc.gtid, lga->payload.gc.slot, lga->weight,
107                      load));
108
109
110   /* ToDo: Dump event
111   DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(rga->payload.gc.gtid), 
112                    GR_FETCH, CurrentTSO, (StgClosure *)(lga->payload.gc.slot),
113                    0, spark_queue_len(ADVISORY_POOL));
114   */
115
116   sendOpV(PP_FETCH, rga->payload.gc.gtid, 6,
117           (StgWord) rga->payload.gc.gtid, (StgWord) rga->payload.gc.slot, 
118           (StgWord) lga->weight, (StgWord) lga->payload.gc.gtid, 
119           (StgWord) lga->payload.gc.slot, (StgWord) load);
120 }
121
122 /*
123  * unpackFetch unpacks a FETCH message into two Global addresses and a load
124  * figure.  
125 */
126
127 //@cindex unpackFetch
128 static void
129 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
130 {
131   long buf[6];
132
133   GetArgs(buf, 6); 
134
135   IF_PAR_DEBUG(fetch,
136                belch("~^** Unpacking Fetch for ((%x, %d, 0)) to ((%x, %d, %x)), load = %d", 
137                      (GlobalTaskId) buf[0], (int) buf[1], 
138                      (GlobalTaskId) buf[3], (int) buf[4], buf[2], buf[5]));
139
140   lga->weight = 1;
141   lga->payload.gc.gtid = (GlobalTaskId) buf[0];
142   lga->payload.gc.slot = (int) buf[1];
143
144   rga->weight = (unsigned) buf[2];
145   rga->payload.gc.gtid = (GlobalTaskId) buf[3];
146   rga->payload.gc.slot = (int) buf[4];
147
148   *load = (int) buf[5];
149
150   ASSERT(rga->weight > 0);
151 }
152
153 /*
154  * SendResume packs the remote blocking queue's GA and data into a message 
155  * and sends it.
156
157 //@cindex RESUME
158
159    Structure of a RESUME message:
160
161       -------------------------------
162       | weight | slot | n | data ...
163       -------------------------------
164
165    data is a packed graph represented as an rtsPackBuffer
166    n is the size of the graph (as returned by PackNearbyGraph) + packet hdr size
167  */
168
169 //@cindex sendResume
170 void
171 sendResume(globalAddr *rga, int nelem, rtsPackBuffer *packBuffer)
172 {
173   IF_PAR_DEBUG(fetch,
174                belch("~^[] Sending Resume (packet <<%d>> with %d elems) for ((%x, %d, %x)) to [%x]", 
175                      packBuffer->id, nelem,
176                      rga->payload.gc.gtid, rga->payload.gc.slot, rga->weight,
177                      rga->payload.gc.gtid));
178   IF_PAR_DEBUG(packet,
179                PrintPacket(packBuffer));
180
181   ASSERT(nelem==packBuffer->size);
182   /* check for magic end-of-buffer word */
183   IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
184
185   sendOpNV(PP_RESUME, rga->payload.gc.gtid, 
186            nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer, 
187            2, (rtsWeight) rga->weight, (StgWord) rga->payload.gc.slot);
188 }
189
190 /*
191  * unpackResume unpacks a Resume message into two Global addresses and
192  * a data array.
193  */
194
195 //@cindex unpackResume
196 static void
197 unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *packBuffer)
198 {
199     long buf[3];
200
201     GetArgs(buf, 3); 
202
203     /*
204       RESUME event is written in awaken_blocked_queue
205     DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(lga->payload.gc.gtid), 
206                      GR_RESUME, END_TSO_QUEUE, (StgClosure *)NULL, 0, 0);
207     */
208
209     lga->weight = (unsigned) buf[0];
210     lga->payload.gc.gtid = mytid;
211     lga->payload.gc.slot = (int) buf[1];
212
213     *nelem = (int) buf[2] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
214     GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);
215
216     IF_PAR_DEBUG(fetch,
217                  belch("~^[] Unpacking Resume (packet <<%d>> with %d elems) for ((%x, %d, %x))", 
218                        packBuffer->id, *nelem, mytid, (int) buf[1], (unsigned) buf[0]));
219
220     /* check for magic end-of-buffer word */
221     IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
222 }
223
224 /*
225  * SendAck packs the global address being acknowledged, together with
226  * an array of global addresses for any closures shipped and sends them.
227
228 //@cindex ACK
229
230    Structure of an ACK message:
231
232       |        GA 1          |        GA 2          | 
233       +---------------------------------------------+-------
234       | weight | gtid | slot | weight | gtid | slot |  .....  ngas times
235       + --------------------------------------------+------- 
236
237  */
238
239 //@cindex sendAck
240 void
241 sendAck(GlobalTaskId task, int ngas, globalAddr *gagamap)
242 {
243   static long *buffer;
244   long *p;
245   int i;
246
247   if(ngas==0)
248     return; //don't send unnecessary messages!!
249   
250   buffer = (long *) gumPackBuffer;
251
252   for(i = 0, p = buffer; i < ngas; i++, p += 6) {
253     ASSERT(gagamap[1].weight > 0);
254     p[0] = (long) gagamap->weight;
255     p[1] = (long) gagamap->payload.gc.gtid;
256     p[2] = (long) gagamap->payload.gc.slot;
257     gagamap++;
258     p[3] = (long) gagamap->weight;
259     p[4] = (long) gagamap->payload.gc.gtid;
260     p[5] = (long) gagamap->payload.gc.slot;
261     gagamap++;
262   }
263   IF_PAR_DEBUG(schedule,
264                belch("~^,, Sending Ack (%d pairs) to [%x]\n", 
265                      ngas, task));
266
267   sendOpN(PP_ACK, task, p - buffer, (StgPtr)buffer);
268 }
269
270 /*
271  * unpackAck unpacks an Acknowledgement message into a Global address,
272  * a count of the number of global addresses following and a map of 
273  * Global addresses
274  */
275
276 //@cindex unpackAck
277 static void
278 unpackAck(int *ngas, globalAddr *gagamap)
279 {
280   long GAarraysize;
281   long buf[6];
282   
283   GetArgs(&GAarraysize, 1);
284   
285   *ngas = GAarraysize / 6;
286   
287   IF_PAR_DEBUG(schedule,
288                belch("~^,, Unpacking Ack (%d pairs) on [%x]\n", 
289                      *ngas, mytid));
290
291   while (GAarraysize > 0) {
292     GetArgs(buf, 6);
293     gagamap->weight = (rtsWeight) buf[0];
294     gagamap->payload.gc.gtid = (GlobalTaskId) buf[1];
295     gagamap->payload.gc.slot = (int) buf[2];
296     gagamap++;
297     gagamap->weight = (rtsWeight) buf[3];
298     gagamap->payload.gc.gtid = (GlobalTaskId) buf[4];
299     gagamap->payload.gc.slot = (int) buf[5];
300     ASSERT(gagamap->weight > 0);
301     gagamap++;
302     GAarraysize -= 6;
303   }
304 }
305
306 /*
307  * SendFish packs the global address being acknowledged, together with
308  * an array of global addresses for any closures shipped and sends them.
309
310 //@cindex FISH
311
312  Structure of a FISH message:
313
314      +----------------------------------+
315      | orig PE | age | history | hunger |
316      +----------------------------------+
317  */
318
319 //@cindex sendFish
320 void
321 sendFish(GlobalTaskId destPE, GlobalTaskId origPE, 
322          int age, int history, int hunger)
323 {
324   IF_PAR_DEBUG(fish,
325                belch("~^$$ Sending Fish to [%x] (%d outstanding fishes)", 
326                      destPE, outstandingFishes));
327
328   sendOpV(PP_FISH, destPE, 4, 
329           (StgWord) origPE, (StgWord) age, (StgWord) history, (StgWord) hunger);
330
331   if (origPE == mytid) {
332     //fishing = rtsTrue;
333     outstandingFishes++;
334   }
335 }
336
337 /*
338  * unpackFish unpacks a FISH message into the global task id of the
339  * originating PE and 3 data fields: the age, history and hunger of the
340  * fish. The history + hunger are not currently used.
341
342  */
343
344 //@cindex unpackFish
345 static void
346 unpackFish(GlobalTaskId *origPE, int *age, int *history, int *hunger)
347 {
348   long buf[4];
349   
350   GetArgs(buf, 4);
351   
352   IF_PAR_DEBUG(fish,
353                belch("~^$$ Unpacking Fish from [%x] (age=%d)", 
354                      (GlobalTaskId) buf[0], (int) buf[1]));
355
356   *origPE = (GlobalTaskId) buf[0];
357   *age = (int) buf[1];
358   *history = (int) buf[2];
359   *hunger = (int) buf[3];
360 }
361
362 /*
363  * SendFree sends (weight, slot) pairs for GAs that we no longer need
364  * references to.  
365
366 //@cindex FREE
367
368    Structure of a FREE message:
369    
370        +-----------------------------
371        | n | weight_1 | slot_1 | ...
372        +-----------------------------
373  */
374 //@cindex sendFree
375 void
376 sendFree(GlobalTaskId pe, int nelem, StgPtr data)
377 {
378     IF_PAR_DEBUG(free,
379                  belch("~^!! Sending Free (%d GAs) to [%x]", 
380                        nelem/2, pe));
381
382     sendOpN(PP_FREE, pe, nelem, data);
383 }
384
385 /*
386  * unpackFree unpacks a FREE message into the amount of data shipped and
387  * a data block.
388  */
389 //@cindex unpackFree
390 static void
391 unpackFree(int *nelem, StgWord *data)
392 {
393   long buf[1];
394   
395   GetArgs(buf, 1);
396   *nelem = (int) buf[0];
397
398   IF_PAR_DEBUG(free,
399                belch("~^!! Unpacking Free (%d GAs)", 
400                      *nelem/2));
401
402   GetArgs(data, *nelem);
403 }
404
405 /*
406  * SendSchedule sends a closure to be evaluated in response to a Fish
407  * message. The message is directed to the PE that originated the Fish
408  * (origPE), and includes the packed closure (data) along with its size
409  * (nelem).
410
411 //@cindex SCHEDULE
412
413    Structure of a SCHEDULE message:
414
415        +------------------------------------
416        | PE | n | pack buffer of a graph ...
417        +------------------------------------
418  */
419 //@cindex sendSchedule
420 void
421 sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer) 
422 {
423   IF_PAR_DEBUG(schedule,
424                belch("~^-- Sending Schedule (packet <<%d>> with %d elems) to [%x]\n", 
425                      packBuffer->id, nelem, origPE));
426   IF_PAR_DEBUG(packet,
427                PrintPacket(packBuffer));
428
429   ASSERT(nelem==packBuffer->size);
430   /* check for magic end-of-buffer word */
431   IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
432
433   sendOpN(PP_SCHEDULE, origPE, 
434           nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
435 }
436
437 /*
438  * unpackSchedule unpacks a SCHEDULE message into the Global address of
439  * the closure shipped, the amount of data shipped (nelem) and the data
440  * block (data).
441  */
442
443 //@cindex unpackSchedule
444 static void
445 unpackSchedule(int *nelem, rtsPackBuffer *packBuffer)
446 {
447   long buf[1];
448
449   /* first, just unpack 1 word containing the total size (including header) */
450   GetArgs(buf, 1);
451   /* no. of elems, not counting the header of the pack buffer */
452   *nelem = (int) buf[0] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
453
454   /* automatic cast of flat pvm-data to rtsPackBuffer */
455   GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);
456
457   IF_PAR_DEBUG(schedule,
458                belch("~^-- Unpacking Schedule (packet <<%d>> with %d elems) on [%x]\n", 
459                      packBuffer->id, *nelem, mytid));
460
461   ASSERT(*nelem==packBuffer->size);
462   /* check for magic end-of-buffer word */
463   IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
464 }
465
466 #ifdef DIST
467 /* sendReval is almost identical to the Schedule version, so we can unpack with unpackSchedule */
468 void
469 sendReval(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer) 
470 {  
471   IF_PAR_DEBUG(schedule,
472                belch("~^-- Sending Reval (packet <<%d>> with %d elems) to [%x]\n", 
473                      packBuffer->id, nelem, origPE));
474   IF_PAR_DEBUG(packet,
475                PrintPacket(packBuffer));
476
477   ASSERT(nelem==packBuffer->size);
478   /* check for magic end-of-buffer word */
479   IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
480
481   sendOpN(PP_REVAL, origPE, 
482           nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
483 }
484
485 void FinishReval(StgTSO *t)
486 { StgClosure *res;
487   globalAddr ga;
488   nat size;
489   rtsPackBuffer *buffer=NULL;
490   
491   ga.payload.gc.slot = t->revalSlot;
492   ga.payload.gc.gtid = t->revalTid;
493   ga.weight = 0; 
494   
495   //find where the reval result is
496   res = GALAlookup(&ga);
497   ASSERT(res);
498   
499   IF_PAR_DEBUG(schedule,
500     printGA(&ga);
501     belch(" needs the result %08x\n",res));       
502   
503   //send off the result
504   buffer = PackNearbyGraph(res, END_TSO_QUEUE, &size,ga.payload.gc.gtid);
505   ASSERT(buffer != (rtsPackBuffer *)NULL);
506   sendResume(&ga, size, buffer);
507
508   IF_PAR_DEBUG(schedule,
509     belch("@;~) Reval Finished"));
510 }
511
512 #endif /* DIST */
513
514 //@node Message-Processing Functions, GUM Message Processor, GUM Message Sending and Unpacking Functions, High Level Communications Routines
515 //@subsection Message-Processing Functions
516
517 /*
518  * Message-Processing Functions
519  *
520  * The following routines process incoming GUM messages. Often reissuing
521  * messages in response.
522  *
523  * processFish unpacks a fish message, reissuing it if it's our own,
524  * sending work if we have it or sending it onwards otherwise.
525  */
526
527 /*
528  * processFetches constructs and sends resume messages for every
529  * BlockedFetch which is ready to be awakened.
530  * awaken_blocked_queue (in Schedule.c) is responsible for moving 
531  * BlockedFetches from a blocking queue to the PendingFetches queue.
532  */
533 void GetRoots(void);
534 extern StgBlockedFetch *PendingFetches;
535
536 nat
537 pending_fetches_len(void)
538 {
539   StgBlockedFetch *bf;
540   nat n;
541
542   for (n=0, bf=PendingFetches; bf != END_BF_QUEUE; n++, bf = (StgBlockedFetch *)(bf->link)) {
543     ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
544   }
545   return n;
546 }
547
548 //@cindex processFetches
549 void
550 processFetches(void) {
551   StgBlockedFetch *bf, *next;
552   StgClosure *closure;
553   StgInfoTable *ip;
554   globalAddr rga;
555   static rtsPackBuffer *packBuffer;
556     
557   IF_PAR_DEBUG(verbose,
558                belch("____ processFetches: %d pending fetches (root @ %p)",
559                      pending_fetches_len(), PendingFetches));
560   
561   for (bf = PendingFetches; 
562        bf != END_BF_QUEUE;
563        bf=next) {
564     /* the PendingFetches list contains only BLOCKED_FETCH closures */
565     ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
566     /* store link (we might overwrite it via blockFetch later on */
567     next = (StgBlockedFetch *)(bf->link);
568
569     /*
570      * Find the target at the end of the indirection chain, and
571      * process it in much the same fashion as the original target
572      * of the fetch.  Though we hope to find graph here, we could
573      * find a black hole (of any flavor) or even a FetchMe.
574      */
575     closure = bf->node;
576     /*
577       We evacuate BQs and update the node fields where necessary in GC.c
578       So, if we find an EVACUATED closure, something has gone Very Wrong
579       (and therefore we let the RTS crash most ungracefully).
580     */
581     ASSERT(get_itbl(closure)->type != EVACUATED);
582       //  closure = ((StgEvacuated *)closure)->evacuee;
583
584     closure = UNWIND_IND(closure);
585     //while ((ind = IS_INDIRECTION(closure)) != NULL) { closure = ind; }
586
587     ip = get_itbl(closure);
588     if (ip->type == FETCH_ME) {
589       /* Forward the Fetch to someone else */
590       rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
591       rga.payload.gc.slot = bf->ga.payload.gc.slot;
592       rga.weight = bf->ga.weight;
593       
594       sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */);
595
596       // Global statistics: count no. of fetches
597       if (RtsFlags.ParFlags.ParStats.Global &&
598           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
599         globalParStats.tot_fetch_mess++;
600       }
601
602       IF_PAR_DEBUG(fetch,
603                    belch("__-> processFetches: Forwarding fetch from %lx to %lx",
604                          mytid, rga.payload.gc.gtid));
605
606     } else if (IS_BLACK_HOLE(closure)) {
607       IF_PAR_DEBUG(verbose,
608                    belch("__++ processFetches: trying to send a BLACK_HOLE => doing a blockFetch on closure %p (%s)",
609                          closure, info_type(closure)));
610       bf->node = closure;
611       blockFetch(bf, closure);
612     } else {
613       /* We now have some local graph to send back */
614       nat size;
615
616       packBuffer = gumPackBuffer;
617       IF_PAR_DEBUG(verbose,
618                    belch("__*> processFetches: PackNearbyGraph of closure %p (%s)",
619                          closure, info_type(closure)));
620
621       if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid)) == NULL) {
622         // Put current BF back on list
623         bf->link = (StgBlockingQueueElement *)PendingFetches;
624         PendingFetches = (StgBlockedFetch *)bf;
625         // ToDo: check that nothing more has to be done to prepare for GC!
626         barf("processFetches: out of heap while packing graph; ToDo: call GC here");
627         GarbageCollect(GetRoots, rtsFalse); 
628         bf = PendingFetches;
629         PendingFetches = (StgBlockedFetch *)(bf->link);
630         closure = bf->node;
631         packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid);
632         ASSERT(packBuffer != (rtsPackBuffer *)NULL);
633       }
634       rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
635       rga.payload.gc.slot = bf->ga.payload.gc.slot;
636       rga.weight = bf->ga.weight;
637       
638       sendResume(&rga, size, packBuffer);
639
640       // Global statistics: count no. of fetches
641       if (RtsFlags.ParFlags.ParStats.Global &&
642           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
643         globalParStats.tot_resume_mess++;
644       }
645     }
646   }
647   PendingFetches = END_BF_QUEUE;
648 }
649
650 #if 0
651 /*
652   Alternatively to sending fetch messages directly from the FETCH_ME_entry
653   code we could just store the data about the remote data in a global
654   variable and send the fetch request from the main scheduling loop (similar
655   to processFetches above). This would save an expensive STGCALL in the entry 
656   code because we have to go back to the scheduler anyway.
657 */
658 //@cindex processFetches
659 void
660 processTheRealFetches(void) {
661   StgBlockedFetch *bf;
662   StgClosure *closure, *next;
663     
664   IF_PAR_DEBUG(verbose,
665                belch("__ processTheRealFetches: ");
666                printGA(&theGlobalFromGA);
667                printGA(&theGlobalToGA));
668
669   ASSERT(theGlobalFromGA.payload.gc.gtid != 0 &&
670          theGlobalToGA.payload.gc.gtid != 0);
671
672   /* the old version did this in the FETCH_ME entry code */
673   sendFetch(&theGlobalFromGA, &theGlobalToGA, 0/*load*/);
674   
675 }
676 #endif
677
678
679 /* 
680    Way of dealing with unwanted fish.
681    Used during startup/shutdown, or from unknown PEs 
682 */
683 void
684 bounceFish(void) { 
685   GlobalTaskId origPE;
686   int age, history, hunger;
687   
688   /* IF_PAR_DEBUG(verbose, */
689                belch(".... [%x] Bouncing unwanted FISH",mytid);
690
691   unpackFish(&origPE, &age, &history, &hunger);
692           
693   if (origPE == mytid) {
694     //fishing = rtsFalse;                   // fish has come home
695     outstandingFishes--;
696     last_fish_arrived_at = CURRENT_TIME;  // remember time (see schedule fct)
697     return;                               // that's all
698   }
699
700   /* otherwise, send it home to die */
701   sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
702   // Global statistics: count no. of fetches
703       if (RtsFlags.ParFlags.ParStats.Global &&
704           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
705         globalParStats.tot_fish_mess++;
706       }
707 }
708    
709 /*
710  * processFish unpacks a fish message, reissuing it if it's our own,
711  * sending work if we have it or sending it onwards otherwise.
712  */
713 //@cindex processFish
714 static void
715 processFish(void)
716 {
717   GlobalTaskId origPE;
718   int age, history, hunger;
719   rtsSpark spark;
720   static rtsPackBuffer *packBuffer; 
721
722   unpackFish(&origPE, &age, &history, &hunger);
723
724   if (origPE == mytid) {
725     //fishing = rtsFalse;                   // fish has come home
726     outstandingFishes--;
727     last_fish_arrived_at = CURRENT_TIME;  // remember time (see schedule fct)
728     return;                               // that's all
729   }
730
731   ASSERT(origPE != mytid);
732   IF_PAR_DEBUG(fish,
733                belch("$$__ processing fish; %d sparks available",
734                      spark_queue_len(&(MainRegTable.rSparks))));
735   while ((spark = findSpark(rtsTrue/*for_export*/)) != NULL) {
736     nat size;
737     // StgClosure *graph;
738
739     packBuffer = gumPackBuffer; 
740     ASSERT(closure_SHOULD_SPARK((StgClosure *)spark));
741     if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size,origPE)) == NULL) {
742       IF_PAR_DEBUG(fish,
743                    belch("$$ GC while trying to satisfy FISH via PackNearbyGraph of node %p",
744                          (StgClosure *)spark));
745       barf("processFish: out of heap while packing graph; ToDo: call GC here");
746       GarbageCollect(GetRoots, rtsFalse);
747       /* Now go back and try again */
748     } else {
749       IF_PAR_DEBUG(verbose,
750                    if (RtsFlags.ParFlags.ParStats.Sparks)
751                      belch("==== STEALING spark %x; sending to %x", spark, origPE));
752       
753       IF_PAR_DEBUG(fish,
754                    belch("$$-- Replying to FISH from %x by sending graph @ %p (%s)",
755                          origPE, 
756                          (StgClosure *)spark, info_type((StgClosure *)spark)));
757       sendSchedule(origPE, size, packBuffer);
758       disposeSpark(spark);
759       // Global statistics: count no. of fetches
760       if (RtsFlags.ParFlags.ParStats.Global &&
761           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
762         globalParStats.tot_schedule_mess++;
763       }
764
765       break;
766     }
767   }
768   if (spark == (rtsSpark)NULL) {
769     IF_PAR_DEBUG(fish,
770                  belch("$$^^ No sparks available for FISH from %x",
771                        origPE));
772     /* We have no sparks to give */
773     if (age < FISH_LIFE_EXPECTANCY) {
774       /* and the fish is atill young, send it to another PE to look for work */
775       sendFish(choosePE(), origPE,
776                (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
777
778       // Global statistics: count no. of fetches
779       if (RtsFlags.ParFlags.ParStats.Global &&
780           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
781         globalParStats.tot_fish_mess++;
782       }
783     } else { /* otherwise, send it home to die */
784       sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
785       // Global statistics: count no. of fetches
786       if (RtsFlags.ParFlags.ParStats.Global &&
787           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
788         globalParStats.tot_fish_mess++;
789       }
790     }
791   }
792 }  /* processFish */
793
794 /*
795  * processFetch either returns the requested data (if available) 
796  * or blocks the remote blocking queue on a black hole (if not).
797  */
798
799 //@cindex processFetch
800 static void
801 processFetch(void)
802 {
803   globalAddr ga, rga;
804   int load;
805   StgClosure *closure;
806   StgInfoTable *ip;
807
808   unpackFetch(&ga, &rga, &load);
809   IF_PAR_DEBUG(fetch,
810                belch("%%%%__ Rcvd Fetch for ((%x, %d, 0)), Resume ((%x, %d, %x)) (load %d) from %x",
811                      ga.payload.gc.gtid, ga.payload.gc.slot,
812                      rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, load,
813                      rga.payload.gc.gtid));
814
815   closure = GALAlookup(&ga);
816   ASSERT(closure != (StgClosure *)NULL);
817   ip = get_itbl(closure);
818   if (ip->type == FETCH_ME) {
819     /* Forward the Fetch to someone else */
820     sendFetch(((StgFetchMe *)closure)->ga, &rga, load);
821
822     // Global statistics: count no. of fetches
823     if (RtsFlags.ParFlags.ParStats.Global &&
824         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
825       globalParStats.tot_fetch_mess++;
826     }
827   } else if (rga.payload.gc.gtid == mytid) {
828     /* Our own FETCH forwarded back around to us */
829     StgFetchMeBlockingQueue *fmbq = (StgFetchMeBlockingQueue *)GALAlookup(&rga);
830     
831     IF_PAR_DEBUG(fetch,
832                  belch("%%%%== Fetch returned to sending PE; closure=%p (%s); receiver=%p (%s)",
833                        closure, info_type(closure), fmbq, info_type((StgClosure*)fmbq)));
834     /* We may have already discovered that the fetch target is our own. */
835     if ((StgClosure *)fmbq != closure) 
836       CommonUp((StgClosure *)fmbq, closure);
837     (void) addWeight(&rga);
838   } else if (IS_BLACK_HOLE(closure)) {
839     /* This includes RBH's and FMBQ's */
840     StgBlockedFetch *bf;
841
842     /* Can we assert something on the remote GA? */
843     ASSERT(GALAlookup(&rga) == NULL);
844
845     /* If we're hitting a BH or RBH or FMBQ we have to put a BLOCKED_FETCH
846        closure into the BQ in order to denote that when updating this node
847        the result should be sent to the originator of this fetch message. */
848     bf = (StgBlockedFetch *)createBlockedFetch(ga, rga);
849     IF_PAR_DEBUG(fetch,
850                  belch("%%++ Blocking Fetch ((%x, %d, %x)) on %p (%s)",
851                        rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, 
852                        closure, info_type(closure)));
853     blockFetch(bf, closure);
854   } else {                      
855     /* The target of the FetchMe is some local graph */
856     nat size;
857     // StgClosure *graph;
858     rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;
859
860     if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid)) == NULL) {
861       barf("processFetch: out of heap while packing graph; ToDo: call GC here");
862       GarbageCollect(GetRoots, rtsFalse); 
863       closure = GALAlookup(&ga);
864       buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid);
865       ASSERT(buffer != (rtsPackBuffer *)NULL);
866     }
867     sendResume(&rga, size, buffer);
868
869     // Global statistics: count no. of fetches
870     if (RtsFlags.ParFlags.ParStats.Global &&
871         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
872       globalParStats.tot_resume_mess++;
873     }
874   }
875 }
876
877 /* 
878    The list of pending fetches must be a root-list for GC.
879    This routine is called from GC.c (same as marking GAs etc).
880 */
881 void
882 markPendingFetches(rtsBool major_gc) {
883
884   /* No need to traverse the list; this is done via the scavenge code
885      for a BLOCKED_FETCH closure, which evacuates the link field */
886
887   if (PendingFetches != END_BF_QUEUE ) {
888     IF_PAR_DEBUG(tables,
889                  fprintf(stderr, "@@@@ PendingFetches is root; evaced from %p to",
890                          PendingFetches));
891
892     PendingFetches = MarkRoot((StgClosure*)PendingFetches);
893
894     IF_PAR_DEBUG(verbose,
895                  fprintf(stderr, " %p\n", PendingFetches));
896
897   } else {
898     IF_PAR_DEBUG(tables,
899                  fprintf(stderr, "@@@@ PendingFetches is empty; no need to mark it\n"));
900   }
901 }
902
903 /*
904  * processFree unpacks a FREE message and adds the weights to our GAs.
905  */
906 //@cindex processFree
907 static void
908 processFree(void)
909 {
910   int nelem;
911   static StgWord *buffer;
912   int i;
913   globalAddr ga;
914
915   buffer = (StgWord *)gumPackBuffer;
916   unpackFree(&nelem, buffer);
917   IF_PAR_DEBUG(free,
918                belch("!!__ Rcvd Free (%d GAs)", nelem / 2));
919
920   ga.payload.gc.gtid = mytid;
921   for (i = 0; i < nelem;) {
922     ga.weight = (rtsWeight) buffer[i++];
923     ga.payload.gc.slot = (int) buffer[i++];
924     IF_PAR_DEBUG(free,
925                  fprintf(stderr, "!!-- Processing free "); 
926                  printGA(&ga);
927                  fputc('\n', stderr);
928                  );
929     (void) addWeight(&ga);
930   }
931 }
932
933 /*
934  * processResume unpacks a RESUME message into the graph, filling in
935  * the LA -> GA, and GA -> LA tables. Threads blocked on the original
936  * FetchMe (now a blocking queue) are awakened, and the blocking queue
937  * is converted into an indirection.  Finally it sends an ACK in response
938  * which contains any newly allocated GAs.
939  */
940
941 //@cindex processResume
942 static void
943 processResume(GlobalTaskId sender)
944 {
945   int nelem;
946   nat nGAs;
947   static rtsPackBuffer *packBuffer;
948   StgClosure *newGraph, *old;
949   globalAddr lga;
950   globalAddr *gagamap;
951   
952   packBuffer = (rtsPackBuffer *)gumPackBuffer;
953   unpackResume(&lga, &nelem, packBuffer);
954
955   IF_PAR_DEBUG(fetch,
956                fprintf(stderr, "[]__ Rcvd Resume for "); 
957                printGA(&lga);
958                fputc('\n', stderr));
959   IF_PAR_DEBUG(packet,
960                PrintPacket((rtsPackBuffer *)packBuffer));
961   
962   /* 
963    * We always unpack the incoming graph, even if we've received the
964    * requested node in some other data packet (and already awakened
965    * the blocking queue).
966   if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
967     ReallyPerformThreadGC(packBuffer[0], rtsFalse);
968     SAVE_Hp -= packBuffer[0];
969   }
970    */
971
972   // ToDo: Check for GC here !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
973
974   /* Do this *after* GC; we don't want to release the object early! */
975
976   if (lga.weight > 0)
977     (void) addWeight(&lga);
978
979   old = GALAlookup(&lga);
980
981   /* ToDo:  The closure that requested this graph must be one of these two?*/
982   ASSERT(get_itbl(old)->type == FETCH_ME_BQ || 
983          get_itbl(old)->type == RBH);
984
985   if (RtsFlags.ParFlags.ParStats.Full) {
986     StgBlockingQueueElement *bqe, *last_bqe;
987
988     IF_PAR_DEBUG(fetch,
989                  belch("[]-- Resume is REPLY to closure %lx", old));
990
991     /* Write REPLY events to the log file, indicating that the remote
992        data has arrived 
993        NB: we emit a REPLY only for the *last* elem in the queue; this is
994            the one that triggered the fetch message; all other entries
995            have just added themselves to the queue, waiting for the data 
996            they know that has been requested (see entry code for FETCH_ME_BQ)
997     */
998     if ((get_itbl(old)->type == FETCH_ME_BQ ||
999          get_itbl(old)->type == RBH)) {
1000       for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue,
1001            last_bqe = END_BQ_QUEUE;
1002              get_itbl(bqe)->type==TSO || 
1003              get_itbl(bqe)->type==BLOCKED_FETCH;
1004            last_bqe = bqe, bqe = bqe->link) { /* nothing */ }
1005
1006       ASSERT(last_bqe==END_BQ_QUEUE || 
1007              get_itbl((StgClosure *)last_bqe)->type == TSO);
1008
1009       /* last_bqe now points to the TSO that triggered the FETCH */ 
1010       if (get_itbl((StgClosure *)last_bqe)->type == TSO)
1011         DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(sender), 
1012                          GR_REPLY, ((StgTSO *)last_bqe), ((StgTSO *)last_bqe)->block_info.closure,
1013                          0, spark_queue_len(&(MainRegTable.rSparks)));
1014     }
1015   }
1016
1017   newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
1018   ASSERT(newGraph != NULL);
1019
1020   /* 
1021    * Sometimes, unpacking will common up the resumee with the
1022    * incoming graph, but if it hasn't, we'd better do so now.
1023    */
1024    
1025   if (get_itbl(old)->type == FETCH_ME_BQ)
1026     CommonUp(old, newGraph);
1027
1028   IF_PAR_DEBUG(fetch,
1029                belch("[]-- Ready to resume unpacked graph at %p (%s)",
1030                      newGraph, info_type(newGraph)));
1031
1032   IF_PAR_DEBUG(tables,
1033                DebugPrintGAGAMap(gagamap, nGAs));
1034   
1035   sendAck(sender, nGAs, gagamap);
1036 }
1037
1038 /*
1039  * processSchedule unpacks a SCHEDULE message into the graph, filling
1040  * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
1041  * the local spark queue.  Finally it sends an ACK in response
1042  * which contains any newly allocated GAs.
1043  */
1044 //@cindex processSchedule
1045 static void
1046 processSchedule(GlobalTaskId sender)
1047 {
1048   nat nelem, nGAs;
1049   rtsBool success;
1050   static rtsPackBuffer *packBuffer;
1051   StgClosure *newGraph;
1052   globalAddr *gagamap;
1053   
1054   packBuffer = gumPackBuffer;           /* HWL */
1055   unpackSchedule(&nelem, packBuffer);
1056
1057   IF_PAR_DEBUG(schedule,
1058                belch("--__ Rcvd Schedule (%d elems)", nelem));
1059   IF_PAR_DEBUG(packet,
1060                PrintPacket(packBuffer));
1061
1062   /*
1063    * For now, the graph is a closure to be sparked as an advisory
1064    * spark, but in future it may be a complete spark with
1065    * required/advisory status, priority etc.
1066    */
1067
1068   /*
1069   space_required = packBuffer[0];
1070   if (SAVE_Hp + space_required >= SAVE_HpLim) {
1071     ReallyPerformThreadGC(space_required, rtsFalse);
1072     SAVE_Hp -= space_required;
1073   }
1074   */
1075   // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
1076   newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
1077   ASSERT(newGraph != NULL);
1078   success = add_to_spark_queue(newGraph, &(MainRegTable.rSparks));
1079
1080   if (RtsFlags.ParFlags.ParStats.Full && 
1081       RtsFlags.ParFlags.ParStats.Sparks && 
1082       success) 
1083     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
1084                      GR_STOLEN, ((StgTSO *)NULL), newGraph, 
1085                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
1086
1087   IF_PAR_DEBUG(schedule,
1088                if (success)
1089                  belch("--^^  added spark to unpacked graph %p (%s); %d sparks available on [%x] (%s)", 
1090                      newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid);
1091                else
1092                  belch("--^^  received non-sparkable closure %p (%s); nothing added to spark pool; %d sparks available on [%x]", 
1093                      newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid));
1094   IF_PAR_DEBUG(packet,
1095                belch("*<    Unpacked graph with root at %p (%s):", 
1096                      newGraph, info_type(newGraph));
1097                PrintGraph(newGraph, 0));
1098
1099   IF_PAR_DEBUG(tables,
1100                DebugPrintGAGAMap(gagamap, nGAs));
1101
1102   sendAck(sender, nGAs, gagamap);
1103
1104   //fishing = rtsFalse;
1105   ASSERT(outstandingFishes>0);
1106   outstandingFishes--;
1107 }
1108
1109 /*
1110  * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
1111  * (which represent shared thunks that have been shipped) into fetch-mes
1112  * to remote GAs.
1113  */
1114 //@cindex processAck
1115 static void
1116 processAck(void)
1117 {
1118   nat nGAs;
1119   globalAddr *gaga;
1120   globalAddr gagamap[256]; // ToDo: elim magic constant!!   MAX_GAS * 2];??
1121
1122   unpackAck(&nGAs, gagamap);
1123
1124   IF_PAR_DEBUG(tables,
1125                belch(",,,, Rcvd Ack (%d pairs)", nGAs);
1126                DebugPrintGAGAMap(gagamap, nGAs));
1127
1128   IF_DEBUG(sanity,
1129            checkGAGAMap(gagamap, nGAs));
1130
1131   /*
1132    * For each (oldGA, newGA) pair, set the GA of the corresponding
1133    * thunk to the newGA, convert the thunk to a FetchMe, and return
1134    * the weight from the oldGA.
1135    */
1136   for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
1137     StgClosure *old_closure = GALAlookup(gaga);
1138     StgClosure *new_closure = GALAlookup(gaga + 1);
1139
1140     ASSERT(old_closure != NULL);
1141     if (new_closure == NULL) {
1142       /* We don't have this closure, so we make a fetchme for it */
1143       globalAddr *ga = setRemoteGA(old_closure, gaga + 1, rtsTrue);
1144       
1145       /* convertToFetchMe should be done unconditionally here.
1146          Currently, we assign GAs to CONSTRs, too, (a bit of a hack),
1147          so we have to check whether it is an RBH before converting
1148
1149          ASSERT(get_itbl(old_closure)==RBH);
1150       */
1151       if (get_itbl(old_closure)->type==RBH)
1152         convertToFetchMe((StgRBH *)old_closure, ga);
1153     } else {
1154       /* 
1155        * Oops...we've got this one already; update the RBH to
1156        * point to the object we already know about, whatever it
1157        * happens to be.
1158        */
1159       CommonUp(old_closure, new_closure);
1160       
1161       /* 
1162        * Increase the weight of the object by the amount just
1163        * received in the second part of the ACK pair.
1164        */
1165       (void) addWeight(gaga + 1);
1166     }
1167     (void) addWeight(gaga);
1168   }
1169
1170   /* check the sanity of the LAGA and GALA tables after mincing them */
1171   IF_DEBUG(sanity, checkLAGAtable(rtsFalse));
1172 }
1173
1174 #ifdef DIST
1175
1176 void
1177 bounceReval(void) {  
1178   barf("Task %x: TODO: should send NACK in response to REVAL",mytid);     
1179 }
1180
1181 static void
1182 processReval(GlobalTaskId sender) //similar to schedule...
1183 { nat nelem, space_required, nGAs;
1184   static rtsPackBuffer *packBuffer;
1185   StgClosure *newGraph;
1186   globalAddr *gagamap;
1187   StgTSO*     tso;
1188   globalAddr *ga;
1189   
1190   packBuffer = gumPackBuffer;           /* HWL */
1191   unpackSchedule(&nelem, packBuffer); /* okay, since the structure is the same */
1192
1193   IF_PAR_DEBUG(packet,
1194                belch("@;~) [%x] Rcvd Reval (%d elems)", mytid, nelem);
1195                PrintPacket(packBuffer));
1196
1197   /*
1198   space_required = packBuffer[0];
1199   if (SAVE_Hp + space_required >= SAVE_HpLim) {
1200     ReallyPerformThreadGC(space_required, rtsFalse);
1201     SAVE_Hp -= space_required;
1202   }
1203   */
1204   
1205   // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
1206   newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
1207   ASSERT(newGraph != NULL);
1208   
1209   IF_PAR_DEBUG(packet,
1210                belch("@;~)  Unpacked graph with root at %p (%s):", 
1211                      newGraph, info_type(newGraph));
1212                PrintGraph(newGraph, 0));
1213
1214   IF_PAR_DEBUG(tables,
1215                DebugPrintGAGAMap(gagamap, nGAs));
1216
1217   IF_PAR_DEBUG(tables, 
1218     printLAGAtable();   
1219     DebugPrintGAGAMap(gagamap, nGAs));   
1220
1221   //We don't send an Ack to the head!!!!
1222   ASSERT(nGAs>0);  
1223   sendAck(sender, nGAs-1, gagamap+2);
1224   
1225   IF_PAR_DEBUG(verbose,
1226                belch("@;~)  About to create Reval thread on behalf of %x", 
1227                      sender));
1228   
1229   tso=createGenThread(RtsFlags.GcFlags.initialStkSize,newGraph);
1230   tso->priority=RevalPriority;
1231   tso->revalSlot=gagamap->payload.gc.slot;//record who sent the reval
1232   tso->revalTid =gagamap->payload.gc.gtid;
1233   scheduleThread(tso);
1234   context_switch = 1; // switch at the earliest opportunity
1235
1236 #endif
1237
1238
1239 //@node GUM Message Processor, Miscellaneous Functions, Message-Processing Functions, High Level Communications Routines
1240 //@subsection GUM Message Processor
1241
1242 /*
1243  * GUM Message Processor
1244
1245  * processMessages processes any messages that have arrived, calling
1246  * appropriate routines depending on the message tag
1247  * (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
1248  * present and performs a blocking receive! During profiling it
1249  * busy-waits in order to record idle time.
1250  */
1251
1252 //@cindex processMessages
1253 rtsBool
1254 processMessages(void)
1255 {
1256   rtsPacket packet;
1257   OpCode opcode;
1258   GlobalTaskId task;
1259   rtsBool receivedFinish = rtsFalse;
1260
1261   do {
1262     packet = GetPacket();  /* Get next message; block until one available */
1263     getOpcodeAndSender(packet, &opcode, &task);
1264
1265     if (task==SysManTask) { 
1266       switch (opcode) { 
1267       case PP_PETIDS:
1268         processPEtids();
1269         break;
1270           
1271       case PP_FINISH:
1272         IF_PAR_DEBUG(verbose,
1273                      belch("==== received FINISH [%p]", mytid));
1274         /* this boolean value is returned and propagated to the main 
1275            scheduling loop, thus shutting-down this PE */
1276         receivedFinish = rtsTrue;
1277         break;  
1278           
1279       default:  
1280         barf("Task %x: received unknown opcode %x from SysMan",mytid, opcode);
1281       }
1282     } else if (taskIDtoPE(task)==0) { 
1283       /* When a new PE joins then potentially FISH & REVAL message may
1284          reach PES before they are notified of the new PEs existance.  The
1285          only solution is to bounce/fail these messages back to the sender.
1286          But we will worry about it once we start seeing these race
1287          conditions!  */
1288       switch (opcode) { 
1289       case PP_FISH:
1290         bounceFish();
1291         break;
1292 #ifdef DIST       
1293       case PP_REVAL:
1294         bounceReval();
1295         break;    
1296 #endif          
1297       case PP_PETIDS:
1298         belch("Task %x: Ignoring PVM session opened by another SysMan %x",mytid,task);
1299         break;
1300         
1301       case PP_FINISH:   
1302         break;
1303         
1304       default:  
1305         belch("Task %x: Ignoring opcode %x from unknown PE %x",mytid, opcode, task);
1306       }
1307     } else
1308       switch (opcode) {
1309       case PP_FETCH:
1310         processFetch();
1311         // Global statistics: count no. of fetches
1312         if (RtsFlags.ParFlags.ParStats.Global &&
1313             RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1314           globalParStats.rec_fetch_mess++;
1315         }
1316         break;
1317
1318       case PP_RESUME:
1319         processResume(task);
1320         // Global statistics: count no. of fetches
1321         if (RtsFlags.ParFlags.ParStats.Global &&
1322             RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1323           globalParStats.rec_resume_mess++;
1324         }
1325         break;
1326
1327       case PP_ACK:
1328         processAck();
1329         break;
1330
1331       case PP_FISH:
1332         processFish();
1333         // Global statistics: count no. of fetches
1334         if (RtsFlags.ParFlags.ParStats.Global &&
1335             RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1336           globalParStats.rec_fish_mess++;
1337         }
1338         break;
1339
1340       case PP_FREE:
1341         processFree();
1342         break;
1343       
1344       case PP_SCHEDULE:
1345         processSchedule(task);
1346         // Global statistics: count no. of fetches
1347         if (RtsFlags.ParFlags.ParStats.Global &&
1348             RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1349           globalParStats.rec_schedule_mess++;
1350         }
1351         break;
1352       
1353 #ifdef DIST      
1354       case PP_REVAL:
1355         processReval(task);
1356         // Global statistics: count no. of fetches
1357         if (RtsFlags.ParFlags.ParStats.Global &&
1358             RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1359           globalParStats.rec_reval_mess++;
1360         }
1361         break;
1362 #endif
1363       
1364       default:
1365         /* Anything we're not prepared to deal with. */
1366         barf("Task %x: Unexpected opcode %x from %x",
1367              mytid, opcode, task);
1368       } /* switch */
1369
1370   } while (PacketsWaiting());   /* While there are messages: process them */
1371   return receivedFinish;
1372 }                               /* processMessages */
1373
1374 //@node Miscellaneous Functions, Index, GUM Message Processor, High Level Communications Routines
1375 //@subsection Miscellaneous Functions
1376
1377 /*
1378  * blockFetch blocks a BlockedFetch node on some kind of black hole.
1379  */
1380 //@cindex blockFetch
1381 void
1382 blockFetch(StgBlockedFetch *bf, StgClosure *bh) {
1383   bf->node = bh;
1384   switch (get_itbl(bh)->type) {
1385   case BLACKHOLE:
1386     bf->link = END_BQ_QUEUE;
1387     //((StgBlockingQueue *)bh)->header.info = &stg_BLACKHOLE_BQ_info;
1388     SET_INFO(bh, &stg_BLACKHOLE_BQ_info); // turn closure into a blocking queue
1389     ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1390     
1391     // put bh on the mutables list
1392     recordMutable((StgMutClosure *)bh);
1393     break;
1394     
1395   case BLACKHOLE_BQ:
1396     /* enqueue bf on blocking queue of closure bh */
1397     bf->link = ((StgBlockingQueue *)bh)->blocking_queue;
1398     ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1399
1400     // put bh on the mutables list; ToDo: check
1401     recordMutable((StgMutClosure *)bh);
1402     break;
1403
1404   case FETCH_ME_BQ:
1405     /* enqueue bf on blocking queue of closure bh */
1406     bf->link = ((StgFetchMeBlockingQueue *)bh)->blocking_queue;
1407     ((StgFetchMeBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1408
1409     // put bh on the mutables list; ToDo: check
1410     recordMutable((StgMutClosure *)bh);
1411     break;
1412     
1413   case RBH:
1414     /* enqueue bf on blocking queue of closure bh */
1415     bf->link = ((StgRBH *)bh)->blocking_queue;
1416     ((StgRBH *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1417
1418     // put bh on the mutables list; ToDo: check
1419     recordMutable((StgMutClosure *)bh);
1420     break;
1421     
1422   default:
1423     barf("blockFetch: thought %p was a black hole (IP %#lx, %s)",
1424          (StgClosure *)bh, get_itbl((StgClosure *)bh), 
1425          info_type((StgClosure *)bh));
1426   }
1427   IF_PAR_DEBUG(bq,
1428                belch("##++ blockFetch: after block the BQ of %p (%s) is:",
1429                      bh, info_type(bh));
1430                print_bq(bh));
1431 }
1432
1433
1434 /*
1435   @blockThread@ is called from the main scheduler whenever tso returns with
1436   a ThreadBlocked return code; tso has already been added to a blocking
1437   queue (that's done in the entry code of the closure, because it is a 
1438   cheap operation we have to do in any case); the main purpose of this
1439   routine is to send a Fetch message in case we are blocking on a FETCHME(_BQ)
1440   closure, which is indicated by the tso.why_blocked field;
1441   we also write an entry into the log file if we are generating one
1442
1443   Should update exectime etc in the entry code already; but we don't have
1444   something like ``system time'' in the log file anyway, so this should
1445   even out the inaccuracies.
1446 */
1447
1448 //@cindex blockThread
1449 void
1450 blockThread(StgTSO *tso)
1451 {
1452   globalAddr *remote_ga=NULL;
1453   globalAddr *local_ga;
1454   globalAddr fmbq_ga;
1455
1456   // ASSERT(we are on some blocking queue)
1457   ASSERT(tso->block_info.closure != (StgClosure *)NULL);
1458
1459   /*
1460     We have to check why this thread has been blocked.
1461   */
1462   switch (tso->why_blocked) {
1463     case BlockedOnGA:
1464       /* the closure must be a FETCH_ME_BQ; tso came in here via 
1465          FETCH_ME entry code */
1466       ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);
1467
1468       /* HACK: the link field is used to hold the GA between FETCH_ME_entry
1469          end this point; if something (eg. GC) happens inbetween the whole
1470          thing will blow up 
1471          The problem is that the ga field of the FETCH_ME has been overwritten
1472          with the head of the blocking queue (which is tso). 
1473       */
1474       ASSERT(looks_like_ga(&theGlobalFromGA));
1475       // ASSERT(tso->link!=END_TSO_QUEUE && tso->link!=NULL);
1476       remote_ga = &theGlobalFromGA; //tso->link;
1477       tso->link = (StgTSO*)END_BQ_QUEUE;
1478       /* it was tso which turned node from FETCH_ME into FETCH_ME_BQ =>
1479          we have to send a Fetch message here! */
1480       if (RtsFlags.ParFlags.ParStats.Full) {
1481         /* Note that CURRENT_TIME may perform an unsafe call */
1482         tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1483         tso->par.fetchcount++;
1484         tso->par.blockedat = CURRENT_TIME;
1485         /* we are about to send off a FETCH message, so dump a FETCH event */
1486         DumpRawGranEvent(CURRENT_PROC, 
1487                          taskIDtoPE(remote_ga->payload.gc.gtid),
1488                          GR_FETCH, tso, tso->block_info.closure, 0, 0);
1489       }
1490       /* Phil T. claims that this was a workaround for a hard-to-find
1491        * bug, hence I'm leaving it out for now --SDM 
1492        */
1493       /* Assign a brand-new global address to the newly created FMBQ  */
1494       local_ga = makeGlobal(tso->block_info.closure, rtsFalse);
1495       splitWeight(&fmbq_ga, local_ga);
1496       ASSERT(fmbq_ga.weight == 1U << (BITS_IN(unsigned) - 1));
1497       
1498       sendFetch(remote_ga, &fmbq_ga, 0/*load*/);
1499
1500       // Global statistics: count no. of fetches
1501       if (RtsFlags.ParFlags.ParStats.Global &&
1502           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1503         globalParStats.tot_fetch_mess++;
1504       }
1505
1506       IF_DEBUG(sanity,
1507                theGlobalFromGA.payload.gc.gtid = (GlobalTaskId)0);
1508       break;
1509
1510     case BlockedOnGA_NoSend:
1511       /* the closure must be a FETCH_ME_BQ; tso came in here via 
1512          FETCH_ME_BQ entry code */
1513       ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);
1514
1515       /* Fetch message has been sent already */
1516       if (RtsFlags.ParFlags.ParStats.Full) {
1517         /* Note that CURRENT_TIME may perform an unsafe call */
1518         tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1519         tso->par.blockcount++;
1520         tso->par.blockedat = CURRENT_TIME;
1521         /* dump a block event, because fetch has been sent already */
1522         DumpRawGranEvent(CURRENT_PROC, thisPE,
1523                          GR_BLOCK, tso, tso->block_info.closure, 0, 0);
1524       }
1525       break;
1526
1527     case BlockedOnMVar:
1528     case BlockedOnBlackHole:
1529       /* the closure must be a BLACKHOLE_BQ or an RBH; tso came in here via 
1530          BLACKHOLE(_BQ) or CAF_BLACKHOLE or RBH entry code */
1531       ASSERT(get_itbl(tso->block_info.closure)->type==MVAR ||
1532              get_itbl(tso->block_info.closure)->type==BLACKHOLE_BQ ||
1533              get_itbl(tso->block_info.closure)->type==RBH);
1534
1535       /* if collecting stats update the execution time etc */
1536       if (RtsFlags.ParFlags.ParStats.Full) {
1537         /* Note that CURRENT_TIME may perform an unsafe call */
1538         tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1539         tso->par.blockcount++;
1540         tso->par.blockedat = CURRENT_TIME;
1541         DumpRawGranEvent(CURRENT_PROC, thisPE,
1542                          GR_BLOCK, tso, tso->block_info.closure, 0, 0);
1543       }
1544       break;
1545
1546     case BlockedOnDelay:
1547       /* Whats sort of stats shall we collect for an explicit threadDelay? */
1548       IF_PAR_DEBUG(verbose,
1549                belch("##++ blockThread: TSO %d blocked on ThreadDelay",
1550                      tso->id));
1551       break;
1552
1553     /* Check that the following is impossible to happen, indeed
1554     case BlockedOnException:
1555     case BlockedOnRead:
1556     case BlockedOnWrite:
1557     */
1558     default:
1559       barf("blockThread: impossible why_blocked code %d for TSO %d",
1560            tso->why_blocked, tso->id);
1561   }
1562
1563   IF_PAR_DEBUG(verbose,
1564                belch("##++ blockThread: TSO %d blocked on closure %p (%s); %s",
1565                      tso->id, tso->block_info.closure, info_type(tso->block_info.closure),
1566                      (tso->why_blocked==BlockedOnGA) ? "Sent FETCH for GA" : ""));
1567   
1568   IF_PAR_DEBUG(bq,
1569                print_bq(tso->block_info.closure));
1570 }
1571
1572 /*
1573  * ChoosePE selects a GlobalTaskId from the array of PEs 'at random'.
1574  * Important properties:
1575  *   - it varies during execution, even if the PE is idle
1576  *   - it's different for each PE
1577  *   - we never send a fish to ourselves
1578  */
1579 extern long lrand48 (void);
1580
1581 //@cindex choosePE
1582 GlobalTaskId
1583 choosePE(void)
1584 {
1585   long temp;
1586
1587   temp = lrand48() % nPEs;
1588   if (allPEs[temp] == mytid) {  /* Never send a FISH to yourself */
1589     temp = (temp + 1) % nPEs;
1590   }
1591   return allPEs[temp];
1592 }
1593
1594 /* 
1595  * allocate a BLOCKED_FETCH closure and fill it with the relevant fields
1596  * of the ga argument; called from processFetch when the local closure is
1597  * under evaluation
1598  */
1599 //@cindex createBlockedFetch
1600 StgClosure *
1601 createBlockedFetch (globalAddr ga, globalAddr rga)
1602 {
1603   StgBlockedFetch *bf;
1604   StgClosure *closure;
1605
1606   closure = GALAlookup(&ga);
1607   if ((bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch))) == NULL) {
1608     barf("createBlockedFetch: out of heap while allocating heap for a BlocekdFetch; ToDo: call GC here");
1609     GarbageCollect(GetRoots, rtsFalse); 
1610     closure = GALAlookup(&ga);
1611     bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch));
1612     // ToDo: check whether really guaranteed to succeed 2nd time around
1613   }
1614
1615   ASSERT(bf != (StgBlockedFetch *)NULL);
1616   SET_INFO((StgClosure *)bf, &stg_BLOCKED_FETCH_info);
1617   // ToDo: check whether other header info is needed
1618   bf->node = closure;
1619   bf->ga.payload.gc.gtid = rga.payload.gc.gtid;
1620   bf->ga.payload.gc.slot = rga.payload.gc.slot;
1621   bf->ga.weight = rga.weight;
1622   // bf->link = NULL;  debugging
1623
1624   IF_PAR_DEBUG(schedule,
1625                fprintf(stderr, "%%%%// created BF: bf=%p (%s) of closure , GA: ",
1626                        bf, info_type((StgClosure*)bf));
1627                printGA(&(bf->ga));
1628                fputc('\n',stderr));
1629   return (StgClosure *)bf;
1630 }
1631
1632 /*
1633  * waitForTermination enters a loop ignoring spurious messages while
1634  * waiting for the termination sequence to be completed.  
1635  */
1636 //@cindex waitForTermination
1637 void
1638 waitForTermination(void)
1639 {
1640   do {
1641     rtsPacket p = GetPacket();
1642     processUnexpectedMessage(p);
1643   } while (rtsTrue);
1644 }
1645
1646 #ifdef DEBUG
1647 //@cindex DebugPrintGAGAMap
1648 void
1649 DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
1650 {
1651   nat i;
1652   
1653   for (i = 0; i < nGAs; ++i, gagamap += 2)
1654     fprintf(stderr, "__ gagamap[%d] = ((%x, %d, %x)) -> ((%x, %d, %x))\n", i,
1655             gagamap[0].payload.gc.gtid, gagamap[0].payload.gc.slot, gagamap[0].weight,
1656             gagamap[1].payload.gc.gtid, gagamap[1].payload.gc.slot, gagamap[1].weight);
1657 }
1658
1659 //@cindex checkGAGAMap
1660 void
1661 checkGAGAMap(globalAddr *gagamap, int nGAs)
1662 {
1663   nat i;
1664   
1665   for (i = 0; i < (nat)nGAs; ++i, gagamap += 2) {
1666     ASSERT(looks_like_ga(gagamap));
1667     ASSERT(looks_like_ga(gagamap+1));
1668   }
1669 }
1670 #endif
1671
1672 //@cindex freeMsgBuffer
1673 static StgWord **freeMsgBuffer = NULL;
1674 //@cindex freeMsgIndex
1675 static nat      *freeMsgIndex  = NULL;
1676
1677 //@cindex prepareFreeMsgBuffers
1678 void
1679 prepareFreeMsgBuffers(void)
1680 {
1681   nat i;
1682   
1683   /* Allocate the freeMsg buffers just once and then hang onto them. */
1684   if (freeMsgIndex == NULL) {
1685     freeMsgIndex = (nat *) stgMallocBytes(nPEs * sizeof(nat), 
1686                                           "prepareFreeMsgBuffers (Index)");
1687     freeMsgBuffer = (StgWord **) stgMallocBytes(nPEs * sizeof(long *), 
1688                                           "prepareFreeMsgBuffers (Buffer)");
1689     
1690     for(i = 0; i < nPEs; i++) 
1691       if (i != (thisPE-1)) 
1692         freeMsgBuffer[i] = (StgPtr) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
1693                                                "prepareFreeMsgBuffers (Buffer #i)");
1694       else
1695         freeMsgBuffer[i] = 0;
1696   }
1697   
1698   /* Initialize the freeMsg buffer pointers to point to the start of their
1699      buffers */
1700   for (i = 0; i < nPEs; i++)
1701     freeMsgIndex[i] = 0;
1702 }
1703
1704 //@cindex freeRemoteGA
1705 void
1706 freeRemoteGA(int pe, globalAddr *ga)
1707 {
1708   nat i;
1709   
1710   ASSERT(GALAlookup(ga) == NULL);
1711   
1712   if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
1713     IF_PAR_DEBUG(free,
1714                  belch("!! Filled a free message buffer (sending remaining messages indivisually)"));   
1715
1716     sendFree(ga->payload.gc.gtid, i, freeMsgBuffer[pe]);
1717     i = 0;
1718   }
1719   freeMsgBuffer[pe][i++] = (StgWord) ga->weight;
1720   freeMsgBuffer[pe][i++] = (StgWord) ga->payload.gc.slot;
1721   freeMsgIndex[pe] = i;
1722
1723   IF_DEBUG(sanity,
1724            ga->weight = 0xdead0add;
1725            ga->payload.gc.gtid = 0xbbbbbbbb;
1726            ga->payload.gc.slot = 0xbbbbbbbb;);
1727 }
1728
1729 //@cindex sendFreeMessages
1730 void
1731 sendFreeMessages(void)
1732 {
1733   nat i;
1734   
1735   for (i = 0; i < nPEs; i++) 
1736     if (freeMsgIndex[i] > 0)
1737       sendFree(allPEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1738 }
1739
1740 /* synchronises with the other PEs. Receives and records in a global
1741  * variable the task-id of SysMan. If this is the main thread (discovered
1742  * in main.lc), identifies itself to SysMan. Finally it receives
1743  * from SysMan an array of the Global Task Ids of each PE, which is
1744  * returned as the value of the function.
1745  */
1746
1747 #if defined(PAR_TICKY)
1748 /* Has to see freeMsgIndex, so must be defined here not in ParTicky.c */
1749 //@cindex stats_CntFreeGA
1750 void
1751 stats_CntFreeGA (void) {  // stats only
1752
1753   // Global statistics: residency of thread and spark pool
1754   if (RtsFlags.ParFlags.ParStats.Global &&
1755       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1756     nat i, s;
1757   
1758     globalParStats.cnt_free_GA++;
1759     for (i = 0, s = 0; i < nPEs; i++) 
1760       s += globalParStats.tot_free_GA += freeMsgIndex[i]/2;
1761
1762     if ( s > globalParStats.res_free_GA )
1763       globalParStats.res_free_GA = s;
1764   }
1765 }
1766 #endif /* PAR_TICKY */
1767
1768 #endif /* PAR -- whole file */
1769
1770 //@node Index,  , Miscellaneous Functions, High Level Communications Routines
1771 //@subsection Index
1772
1773 //@index
1774 //* ACK::  @cindex\s-+ACK
1775 //* DebugPrintGAGAMap::  @cindex\s-+DebugPrintGAGAMap
1776 //* FETCH::  @cindex\s-+FETCH
1777 //* FISH::  @cindex\s-+FISH
1778 //* FREE::  @cindex\s-+FREE
1779 //* RESUME::  @cindex\s-+RESUME
1780 //* SCHEDULE::  @cindex\s-+SCHEDULE
1781 //* blockFetch::  @cindex\s-+blockFetch
1782 //* choosePE::  @cindex\s-+choosePE
1783 //* freeMsgBuffer::  @cindex\s-+freeMsgBuffer
1784 //* freeMsgIndex::  @cindex\s-+freeMsgIndex
1785 //* freeRemoteGA::  @cindex\s-+freeRemoteGA
1786 //* gumPackBuffer::  @cindex\s-+gumPackBuffer
1787 //* initMoreBuffers::  @cindex\s-+initMoreBuffers
1788 //* prepareFreeMsgBuffers::  @cindex\s-+prepareFreeMsgBuffers
1789 //* processAck::  @cindex\s-+processAck
1790 //* processFetch::  @cindex\s-+processFetch
1791 //* processFetches::  @cindex\s-+processFetches
1792 //* processFish::  @cindex\s-+processFish
1793 //* processFree::  @cindex\s-+processFree
1794 //* processMessages::  @cindex\s-+processMessages
1795 //* processResume::  @cindex\s-+processResume
1796 //* processSchedule::  @cindex\s-+processSchedule
1797 //* sendAck::  @cindex\s-+sendAck
1798 //* sendFetch::  @cindex\s-+sendFetch
1799 //* sendFish::  @cindex\s-+sendFish
1800 //* sendFree::  @cindex\s-+sendFree
1801 //* sendFreeMessages::  @cindex\s-+sendFreeMessages
1802 //* sendResume::  @cindex\s-+sendResume
1803 //* sendSchedule::  @cindex\s-+sendSchedule
1804 //* unpackAck::  @cindex\s-+unpackAck
1805 //* unpackFetch::  @cindex\s-+unpackFetch
1806 //* unpackFish::  @cindex\s-+unpackFish
1807 //* unpackFree::  @cindex\s-+unpackFree
1808 //* unpackResume::  @cindex\s-+unpackResume
1809 //* unpackSchedule::  @cindex\s-+unpackSchedule
1810 //* waitForTermination::  @cindex\s-+waitForTermination
1811 //@end index