add Outputable instance for OccIfaceEq
[ghc-hetmet.git] / rts / parallel / HLComms.c
1 /* ----------------------------------------------------------------------------
2  * Time-stamp: <Wed Mar 21 2001 16:34:41 Stardate: [-30]6363.45 hwloidl>
3  *
4  * High Level Communications Routines (HLComms.lc)
5  *
6  * Contains the high-level routines (i.e. communication
7  * subsystem independent) used by GUM
8  * 
9  * GUM 0.2x: Phil Trinder, Glasgow University, 12 December 1994
10  * GUM 3.xx: Phil Trinder, Simon Marlow July 1998
11  * GUM 4.xx: H-W. Loidl, Heriot-Watt University, November 1999 -
12  * 
13  * ------------------------------------------------------------------------- */
14
15 #ifdef PAR /* whole file */
16
17 //@node High Level Communications Routines, , ,
18 //@section High Level Communications Routines
19
20 //@menu
21 //* Macros etc::                
22 //* Includes::                  
23 //* GUM Message Sending and Unpacking Functions::  
24 //* Message-Processing Functions::  
25 //* GUM Message Processor::     
26 //* Miscellaneous Functions::   
27 //* Index::                     
28 //@end menu
29
30 //@node Macros etc, Includes, High Level Communications Routines, High Level Communications Routines
31 //@subsection Macros etc
32
33 /* Evidently not Posix */
34 /* #include "PosixSource.h" */
35
36 //@node Includes, GUM Message Sending and Unpacking Functions, Macros etc, High Level Communications Routines
37 //@subsection Includes
38
39 #include "Rts.h"
40 #include "RtsUtils.h"
41 #include "RtsFlags.h"
42 #include "Storage.h"   // for recordMutable
43 #include "HLC.h"
44 #include "Parallel.h"
45 #include "GranSimRts.h"
46 #include "ParallelRts.h"
47 #include "Sparks.h"
48 #include "FetchMe.h"     // for BLOCKED_FETCH_info etc
49 #if defined(DEBUG)
50 # include "ParallelDebug.h"
51 #endif
52 #include "StgMacros.h" // inlined IS_... fcts
53
54 #ifdef DIST
55 #include "SchedAPI.h" //for createIOThread
56 extern unsigned int context_switch; 
57 #endif /* DIST */
58
59 //@node GUM Message Sending and Unpacking Functions, Message-Processing Functions, Includes, High Level Communications Routines
60 //@subsection GUM Message Sending and Unpacking Functions
61
62 /*
63  * GUM Message Sending and Unpacking Functions
64  */
65
66 /*
67  * Allocate space for message processing
68  */
69
70 //@cindex gumPackBuffer
71 static rtsPackBuffer *gumPackBuffer;
72
73 //@cindex initMoreBuffers
74 rtsBool
75 initMoreBuffers(void)
76 {
77   if ((gumPackBuffer = (rtsPackBuffer *)stgMallocWords(RtsFlags.ParFlags.packBufferSize, 
78                                              "initMoreBuffers")) == NULL)
79     return rtsFalse;
80   return rtsTrue;
81 }
82
83 /*
84  * SendFetch packs the two global addresses and a load into a message +
85  * sends it.  
86
87 //@cindex FETCH
88
89    Structure of a FETCH message:
90
91          |    GA 1     |        GA 2          |
92          +------------------------------------+------+
93          | gtid | slot | weight | gtid | slot | load |
94          +------------------------------------+------+
95  */
96
97 //@cindex sendFetch
98 void
99 sendFetch(globalAddr *rga, globalAddr *lga, int load)
100 {
101   ASSERT(rga->weight > 0 && lga->weight > 0);
102   IF_PAR_DEBUG(fetch,
103                belch("~^** Sending Fetch for ((%x, %d, 0)); locally ((%x, %d, %x)), load = %d", 
104                      rga->payload.gc.gtid, rga->payload.gc.slot, 
105                      lga->payload.gc.gtid, lga->payload.gc.slot, lga->weight,
106                      load));
107
108
109   /* ToDo: Dump event
110   DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(rga->payload.gc.gtid), 
111                    GR_FETCH, CurrentTSO, (StgClosure *)(lga->payload.gc.slot),
112                    0, spark_queue_len(ADVISORY_POOL));
113   */
114
115   sendOpV(PP_FETCH, rga->payload.gc.gtid, 6,
116           (StgWord) rga->payload.gc.gtid, (StgWord) rga->payload.gc.slot, 
117           (StgWord) lga->weight, (StgWord) lga->payload.gc.gtid, 
118           (StgWord) lga->payload.gc.slot, (StgWord) load);
119 }
120
121 /*
122  * unpackFetch unpacks a FETCH message into two Global addresses and a load
123  * figure.  
124 */
125
126 //@cindex unpackFetch
127 static void
128 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
129 {
130   long buf[6];
131
132   GetArgs(buf, 6); 
133
134   IF_PAR_DEBUG(fetch,
135                belch("~^** Unpacking Fetch for ((%x, %d, 0)) to ((%x, %d, %x)), load = %d", 
136                      (GlobalTaskId) buf[0], (int) buf[1], 
137                      (GlobalTaskId) buf[3], (int) buf[4], buf[2], buf[5]));
138
139   lga->weight = 1;
140   lga->payload.gc.gtid = (GlobalTaskId) buf[0];
141   lga->payload.gc.slot = (int) buf[1];
142
143   rga->weight = (unsigned) buf[2];
144   rga->payload.gc.gtid = (GlobalTaskId) buf[3];
145   rga->payload.gc.slot = (int) buf[4];
146
147   *load = (int) buf[5];
148
149   ASSERT(rga->weight > 0);
150 }
151
152 /*
153  * SendResume packs the remote blocking queue's GA and data into a message 
154  * and sends it.
155
156 //@cindex RESUME
157
158    Structure of a RESUME message:
159
160       -------------------------------
161       | weight | slot | n | data ...
162       -------------------------------
163
164    data is a packed graph represented as an rtsPackBuffer
165    n is the size of the graph (as returned by PackNearbyGraph) + packet hdr size
166  */
167
168 //@cindex sendResume
169 void
170 sendResume(globalAddr *rga, int nelem, rtsPackBuffer *packBuffer)
171 {
172   IF_PAR_DEBUG(fetch,
173                belch("~^[] Sending Resume (packet <<%d>> with %d elems) for ((%x, %d, %x)) to [%x]", 
174                      packBuffer->id, nelem,
175                      rga->payload.gc.gtid, rga->payload.gc.slot, rga->weight,
176                      rga->payload.gc.gtid));
177   IF_PAR_DEBUG(packet,
178                PrintPacket(packBuffer));
179
180   ASSERT(nelem==packBuffer->size);
181   /* check for magic end-of-buffer word */
182   IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
183
184   sendOpNV(PP_RESUME, rga->payload.gc.gtid, 
185            nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer, 
186            2, (rtsWeight) rga->weight, (StgWord) rga->payload.gc.slot);
187 }
188
189 /*
190  * unpackResume unpacks a Resume message into two Global addresses and
191  * a data array.
192  */
193
194 //@cindex unpackResume
195 static void
196 unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *packBuffer)
197 {
198     long buf[3];
199
200     GetArgs(buf, 3); 
201
202     /*
203       RESUME event is written in awaken_blocked_queue
204     DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(lga->payload.gc.gtid), 
205                      GR_RESUME, END_TSO_QUEUE, (StgClosure *)NULL, 0, 0);
206     */
207
208     lga->weight = (unsigned) buf[0];
209     lga->payload.gc.gtid = mytid;
210     lga->payload.gc.slot = (int) buf[1];
211
212     *nelem = (int) buf[2] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
213     GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);
214
215     IF_PAR_DEBUG(fetch,
216                  belch("~^[] Unpacking Resume (packet <<%d>> with %d elems) for ((%x, %d, %x))", 
217                        packBuffer->id, *nelem, mytid, (int) buf[1], (unsigned) buf[0]));
218
219     /* check for magic end-of-buffer word */
220     IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
221 }
222
223 /*
224  * SendAck packs the global address being acknowledged, together with
225  * an array of global addresses for any closures shipped and sends them.
226
227 //@cindex ACK
228
229    Structure of an ACK message:
230
231       |        GA 1          |        GA 2          | 
232       +---------------------------------------------+-------
233       | weight | gtid | slot | weight | gtid | slot |  .....  ngas times
234       + --------------------------------------------+------- 
235
236  */
237
238 //@cindex sendAck
239 void
240 sendAck(GlobalTaskId task, int ngas, globalAddr *gagamap)
241 {
242   static long *buffer;
243   long *p;
244   int i;
245
246   if(ngas==0)
247     return; //don't send unnecessary messages!!
248   
249   buffer = (long *) gumPackBuffer;
250
251   for(i = 0, p = buffer; i < ngas; i++, p += 6) {
252     ASSERT(gagamap[1].weight > 0);
253     p[0] = (long) gagamap->weight;
254     p[1] = (long) gagamap->payload.gc.gtid;
255     p[2] = (long) gagamap->payload.gc.slot;
256     gagamap++;
257     p[3] = (long) gagamap->weight;
258     p[4] = (long) gagamap->payload.gc.gtid;
259     p[5] = (long) gagamap->payload.gc.slot;
260     gagamap++;
261   }
262   IF_PAR_DEBUG(schedule,
263                belch("~^,, Sending Ack (%d pairs) to [%x]\n", 
264                      ngas, task));
265
266   sendOpN(PP_ACK, task, p - buffer, (StgPtr)buffer);
267 }
268
269 /*
270  * unpackAck unpacks an Acknowledgement message into a Global address,
271  * a count of the number of global addresses following and a map of 
272  * Global addresses
273  */
274
275 //@cindex unpackAck
276 static void
277 unpackAck(int *ngas, globalAddr *gagamap)
278 {
279   long GAarraysize;
280   long buf[6];
281   
282   GetArgs(&GAarraysize, 1);
283   
284   *ngas = GAarraysize / 6;
285   
286   IF_PAR_DEBUG(schedule,
287                belch("~^,, Unpacking Ack (%d pairs) on [%x]\n", 
288                      *ngas, mytid));
289
290   while (GAarraysize > 0) {
291     GetArgs(buf, 6);
292     gagamap->weight = (rtsWeight) buf[0];
293     gagamap->payload.gc.gtid = (GlobalTaskId) buf[1];
294     gagamap->payload.gc.slot = (int) buf[2];
295     gagamap++;
296     gagamap->weight = (rtsWeight) buf[3];
297     gagamap->payload.gc.gtid = (GlobalTaskId) buf[4];
298     gagamap->payload.gc.slot = (int) buf[5];
299     ASSERT(gagamap->weight > 0);
300     gagamap++;
301     GAarraysize -= 6;
302   }
303 }
304
305 /*
306  * SendFish packs the global address being acknowledged, together with
307  * an array of global addresses for any closures shipped and sends them.
308
309 //@cindex FISH
310
311  Structure of a FISH message:
312
313      +----------------------------------+
314      | orig PE | age | history | hunger |
315      +----------------------------------+
316  */
317
318 //@cindex sendFish
319 void
320 sendFish(GlobalTaskId destPE, GlobalTaskId origPE, 
321          int age, int history, int hunger)
322 {
323   IF_PAR_DEBUG(fish,
324                belch("~^$$ Sending Fish to [%x] (%d outstanding fishes)", 
325                      destPE, outstandingFishes));
326
327   sendOpV(PP_FISH, destPE, 4, 
328           (StgWord) origPE, (StgWord) age, (StgWord) history, (StgWord) hunger);
329
330   if (origPE == mytid) {
331     //fishing = rtsTrue;
332     outstandingFishes++;
333   }
334 }
335
336 /*
337  * unpackFish unpacks a FISH message into the global task id of the
338  * originating PE and 3 data fields: the age, history and hunger of the
339  * fish. The history + hunger are not currently used.
340
341  */
342
343 //@cindex unpackFish
344 static void
345 unpackFish(GlobalTaskId *origPE, int *age, int *history, int *hunger)
346 {
347   long buf[4];
348   
349   GetArgs(buf, 4);
350   
351   IF_PAR_DEBUG(fish,
352                belch("~^$$ Unpacking Fish from [%x] (age=%d)", 
353                      (GlobalTaskId) buf[0], (int) buf[1]));
354
355   *origPE = (GlobalTaskId) buf[0];
356   *age = (int) buf[1];
357   *history = (int) buf[2];
358   *hunger = (int) buf[3];
359 }
360
361 /*
362  * SendFree sends (weight, slot) pairs for GAs that we no longer need
363  * references to.  
364
365 //@cindex FREE
366
367    Structure of a FREE message:
368    
369        +-----------------------------
370        | n | weight_1 | slot_1 | ...
371        +-----------------------------
372  */
373 //@cindex sendFree
374 void
375 sendFree(GlobalTaskId pe, int nelem, StgPtr data)
376 {
377     IF_PAR_DEBUG(free,
378                  belch("~^!! Sending Free (%d GAs) to [%x]", 
379                        nelem/2, pe));
380
381     sendOpN(PP_FREE, pe, nelem, data);
382 }
383
384 /*
385  * unpackFree unpacks a FREE message into the amount of data shipped and
386  * a data block.
387  */
388 //@cindex unpackFree
389 static void
390 unpackFree(int *nelem, StgWord *data)
391 {
392   long buf[1];
393   
394   GetArgs(buf, 1);
395   *nelem = (int) buf[0];
396
397   IF_PAR_DEBUG(free,
398                belch("~^!! Unpacking Free (%d GAs)", 
399                      *nelem/2));
400
401   GetArgs(data, *nelem);
402 }
403
404 /*
405  * SendSchedule sends a closure to be evaluated in response to a Fish
406  * message. The message is directed to the PE that originated the Fish
407  * (origPE), and includes the packed closure (data) along with its size
408  * (nelem).
409
410 //@cindex SCHEDULE
411
412    Structure of a SCHEDULE message:
413
414        +------------------------------------
415        | PE | n | pack buffer of a graph ...
416        +------------------------------------
417  */
418 //@cindex sendSchedule
419 void
420 sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer) 
421 {
422   IF_PAR_DEBUG(schedule,
423                belch("~^-- Sending Schedule (packet <<%d>> with %d elems) to [%x]\n", 
424                      packBuffer->id, nelem, origPE));
425   IF_PAR_DEBUG(packet,
426                PrintPacket(packBuffer));
427
428   ASSERT(nelem==packBuffer->size);
429   /* check for magic end-of-buffer word */
430   IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
431
432   sendOpN(PP_SCHEDULE, origPE, 
433           nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
434 }
435
436 /*
437  * unpackSchedule unpacks a SCHEDULE message into the Global address of
438  * the closure shipped, the amount of data shipped (nelem) and the data
439  * block (data).
440  */
441
442 //@cindex unpackSchedule
443 static void
444 unpackSchedule(int *nelem, rtsPackBuffer *packBuffer)
445 {
446   long buf[1];
447
448   /* first, just unpack 1 word containing the total size (including header) */
449   GetArgs(buf, 1);
450   /* no. of elems, not counting the header of the pack buffer */
451   *nelem = (int) buf[0] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
452
453   /* automatic cast of flat pvm-data to rtsPackBuffer */
454   GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);
455
456   IF_PAR_DEBUG(schedule,
457                belch("~^-- Unpacking Schedule (packet <<%d>> with %d elems) on [%x]\n", 
458                      packBuffer->id, *nelem, mytid));
459
460   ASSERT(*nelem==packBuffer->size);
461   /* check for magic end-of-buffer word */
462   IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
463 }
464
465 #ifdef DIST
466 /* sendReval is almost identical to the Schedule version, so we can unpack with unpackSchedule */
467 void
468 sendReval(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer) 
469 {  
470   IF_PAR_DEBUG(schedule,
471                belch("~^-- Sending Reval (packet <<%d>> with %d elems) to [%x]\n", 
472                      packBuffer->id, nelem, origPE));
473   IF_PAR_DEBUG(packet,
474                PrintPacket(packBuffer));
475
476   ASSERT(nelem==packBuffer->size);
477   /* check for magic end-of-buffer word */
478   IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
479
480   sendOpN(PP_REVAL, origPE, 
481           nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
482 }
483
484 void FinishReval(StgTSO *t)
485 { StgClosure *res;
486   globalAddr ga;
487   nat size;
488   rtsPackBuffer *buffer=NULL;
489   
490   ga.payload.gc.slot = t->revalSlot;
491   ga.payload.gc.gtid = t->revalTid;
492   ga.weight = 0; 
493   
494   //find where the reval result is
495   res = GALAlookup(&ga);
496   ASSERT(res);
497   
498   IF_PAR_DEBUG(schedule,
499     printGA(&ga);
500     belch(" needs the result %08x\n",res));       
501   
502   //send off the result
503   buffer = PackNearbyGraph(res, END_TSO_QUEUE, &size,ga.payload.gc.gtid);
504   ASSERT(buffer != (rtsPackBuffer *)NULL);
505   sendResume(&ga, size, buffer);
506
507   IF_PAR_DEBUG(schedule,
508     belch("@;~) Reval Finished"));
509 }
510
511 #endif /* DIST */
512
513 //@node Message-Processing Functions, GUM Message Processor, GUM Message Sending and Unpacking Functions, High Level Communications Routines
514 //@subsection Message-Processing Functions
515
516 /*
517  * Message-Processing Functions
518  *
519  * The following routines process incoming GUM messages. Often reissuing
520  * messages in response.
521  *
522  * processFish unpacks a fish message, reissuing it if it's our own,
523  * sending work if we have it or sending it onwards otherwise.
524  */
525
526 /*
527  * processFetches constructs and sends resume messages for every
528  * BlockedFetch which is ready to be awakened.
529  * awaken_blocked_queue (in Schedule.c) is responsible for moving 
530  * BlockedFetches from a blocking queue to the PendingFetches queue.
531  */
532 void GetRoots(void);
533 extern StgBlockedFetch *PendingFetches;
534
535 nat
536 pending_fetches_len(void)
537 {
538   StgBlockedFetch *bf;
539   nat n;
540
541   for (n=0, bf=PendingFetches; bf != END_BF_QUEUE; n++, bf = (StgBlockedFetch *)(bf->link)) {
542     ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
543   }
544   return n;
545 }
546
547 //@cindex processFetches
548 void
549 processFetches(void) {
550   StgBlockedFetch *bf, *next;
551   StgClosure *closure;
552   StgInfoTable *ip;
553   globalAddr rga;
554   static rtsPackBuffer *packBuffer;
555     
556   IF_PAR_DEBUG(verbose,
557                belch("____ processFetches: %d pending fetches (root @ %p)",
558                      pending_fetches_len(), PendingFetches));
559   
560   for (bf = PendingFetches; 
561        bf != END_BF_QUEUE;
562        bf=next) {
563     /* the PendingFetches list contains only BLOCKED_FETCH closures */
564     ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
565     /* store link (we might overwrite it via blockFetch later on */
566     next = (StgBlockedFetch *)(bf->link);
567
568     /*
569      * Find the target at the end of the indirection chain, and
570      * process it in much the same fashion as the original target
571      * of the fetch.  Though we hope to find graph here, we could
572      * find a black hole (of any flavor) or even a FetchMe.
573      */
574     closure = bf->node;
575     /*
576       We evacuate BQs and update the node fields where necessary in GC.c
577       So, if we find an EVACUATED closure, something has gone Very Wrong
578       (and therefore we let the RTS crash most ungracefully).
579     */
580     ASSERT(get_itbl(closure)->type != EVACUATED);
581       //  closure = ((StgEvacuated *)closure)->evacuee;
582
583     closure = UNWIND_IND(closure);
584     //while ((ind = IS_INDIRECTION(closure)) != NULL) { closure = ind; }
585
586     ip = get_itbl(closure);
587     if (ip->type == FETCH_ME) {
588       /* Forward the Fetch to someone else */
589       rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
590       rga.payload.gc.slot = bf->ga.payload.gc.slot;
591       rga.weight = bf->ga.weight;
592       
593       sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */);
594
595       // Global statistics: count no. of fetches
596       if (RtsFlags.ParFlags.ParStats.Global &&
597           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
598         globalParStats.tot_fetch_mess++;
599       }
600
601       IF_PAR_DEBUG(fetch,
602                    belch("__-> processFetches: Forwarding fetch from %lx to %lx",
603                          mytid, rga.payload.gc.gtid));
604
605     } else if (IS_BLACK_HOLE(closure)) {
606       IF_PAR_DEBUG(verbose,
607                    belch("__++ processFetches: trying to send a BLACK_HOLE => doing a blockFetch on closure %p (%s)",
608                          closure, info_type(closure)));
609       bf->node = closure;
610       blockFetch(bf, closure);
611     } else {
612       /* We now have some local graph to send back */
613       nat size;
614
615       packBuffer = gumPackBuffer;
616       IF_PAR_DEBUG(verbose,
617                    belch("__*> processFetches: PackNearbyGraph of closure %p (%s)",
618                          closure, info_type(closure)));
619
620       if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid)) == NULL) {
621         // Put current BF back on list
622         bf->link = (StgBlockingQueueElement *)PendingFetches;
623         PendingFetches = (StgBlockedFetch *)bf;
624         // ToDo: check that nothing more has to be done to prepare for GC!
625         barf("processFetches: out of heap while packing graph; ToDo: call GC here");
626         GarbageCollect(GetRoots, rtsFalse); 
627         bf = PendingFetches;
628         PendingFetches = (StgBlockedFetch *)(bf->link);
629         closure = bf->node;
630         packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid);
631         ASSERT(packBuffer != (rtsPackBuffer *)NULL);
632       }
633       rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
634       rga.payload.gc.slot = bf->ga.payload.gc.slot;
635       rga.weight = bf->ga.weight;
636       
637       sendResume(&rga, size, packBuffer);
638
639       // Global statistics: count no. of fetches
640       if (RtsFlags.ParFlags.ParStats.Global &&
641           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
642         globalParStats.tot_resume_mess++;
643       }
644     }
645   }
646   PendingFetches = END_BF_QUEUE;
647 }
648
649 #if 0
650 /*
651   Alternatively to sending fetch messages directly from the FETCH_ME_entry
652   code we could just store the data about the remote data in a global
653   variable and send the fetch request from the main scheduling loop (similar
654   to processFetches above). This would save an expensive STGCALL in the entry 
655   code because we have to go back to the scheduler anyway.
656 */
657 //@cindex processFetches
658 void
659 processTheRealFetches(void) {
660   StgBlockedFetch *bf;
661   StgClosure *closure, *next;
662     
663   IF_PAR_DEBUG(verbose,
664                belch("__ processTheRealFetches: ");
665                printGA(&theGlobalFromGA);
666                printGA(&theGlobalToGA));
667
668   ASSERT(theGlobalFromGA.payload.gc.gtid != 0 &&
669          theGlobalToGA.payload.gc.gtid != 0);
670
671   /* the old version did this in the FETCH_ME entry code */
672   sendFetch(&theGlobalFromGA, &theGlobalToGA, 0/*load*/);
673   
674 }
675 #endif
676
677
678 /* 
679    Way of dealing with unwanted fish.
680    Used during startup/shutdown, or from unknown PEs 
681 */
682 void
683 bounceFish(void) { 
684   GlobalTaskId origPE;
685   int age, history, hunger;
686   
687   /* IF_PAR_DEBUG(verbose, */
688                belch(".... [%x] Bouncing unwanted FISH",mytid);
689
690   unpackFish(&origPE, &age, &history, &hunger);
691           
692   if (origPE == mytid) {
693     //fishing = rtsFalse;                   // fish has come home
694     outstandingFishes--;
695     last_fish_arrived_at = CURRENT_TIME;  // remember time (see schedule fct)
696     return;                               // that's all
697   }
698
699   /* otherwise, send it home to die */
700   sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
701   // Global statistics: count no. of fetches
702       if (RtsFlags.ParFlags.ParStats.Global &&
703           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
704         globalParStats.tot_fish_mess++;
705       }
706 }
707    
708 /*
709  * processFish unpacks a fish message, reissuing it if it's our own,
710  * sending work if we have it or sending it onwards otherwise.
711  */
712 //@cindex processFish
713 static void
714 processFish(void)
715 {
716   GlobalTaskId origPE;
717   int age, history, hunger;
718   rtsSpark spark;
719   static rtsPackBuffer *packBuffer; 
720
721   unpackFish(&origPE, &age, &history, &hunger);
722
723   if (origPE == mytid) {
724     //fishing = rtsFalse;                   // fish has come home
725     outstandingFishes--;
726     last_fish_arrived_at = CURRENT_TIME;  // remember time (see schedule fct)
727     return;                               // that's all
728   }
729
730   ASSERT(origPE != mytid);
731   IF_PAR_DEBUG(fish,
732                belch("$$__ processing fish; %d sparks available",
733                      spark_queue_len(&(MainRegTable.rSparks))));
734   while ((spark = findSpark(rtsTrue/*for_export*/)) != NULL) {
735     nat size;
736     // StgClosure *graph;
737
738     packBuffer = gumPackBuffer; 
739     ASSERT(closure_SHOULD_SPARK((StgClosure *)spark));
740     if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size,origPE)) == NULL) {
741       IF_PAR_DEBUG(fish,
742                    belch("$$ GC while trying to satisfy FISH via PackNearbyGraph of node %p",
743                          (StgClosure *)spark));
744       barf("processFish: out of heap while packing graph; ToDo: call GC here");
745       GarbageCollect(GetRoots, rtsFalse);
746       /* Now go back and try again */
747     } else {
748       IF_PAR_DEBUG(verbose,
749                    if (RtsFlags.ParFlags.ParStats.Sparks)
750                      belch("==== STEALING spark %x; sending to %x", spark, origPE));
751       
752       IF_PAR_DEBUG(fish,
753                    belch("$$-- Replying to FISH from %x by sending graph @ %p (%s)",
754                          origPE, 
755                          (StgClosure *)spark, info_type((StgClosure *)spark)));
756       sendSchedule(origPE, size, packBuffer);
757       disposeSpark(spark);
758       // Global statistics: count no. of fetches
759       if (RtsFlags.ParFlags.ParStats.Global &&
760           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
761         globalParStats.tot_schedule_mess++;
762       }
763
764       break;
765     }
766   }
767   if (spark == (rtsSpark)NULL) {
768     IF_PAR_DEBUG(fish,
769                  belch("$$^^ No sparks available for FISH from %x",
770                        origPE));
771     /* We have no sparks to give */
772     if (age < FISH_LIFE_EXPECTANCY) {
773       /* and the fish is atill young, send it to another PE to look for work */
774       sendFish(choosePE(), origPE,
775                (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
776
777       // Global statistics: count no. of fetches
778       if (RtsFlags.ParFlags.ParStats.Global &&
779           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
780         globalParStats.tot_fish_mess++;
781       }
782     } else { /* otherwise, send it home to die */
783       sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
784       // Global statistics: count no. of fetches
785       if (RtsFlags.ParFlags.ParStats.Global &&
786           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
787         globalParStats.tot_fish_mess++;
788       }
789     }
790   }
791 }  /* processFish */
792
793 /*
794  * processFetch either returns the requested data (if available) 
795  * or blocks the remote blocking queue on a black hole (if not).
796  */
797
798 //@cindex processFetch
799 static void
800 processFetch(void)
801 {
802   globalAddr ga, rga;
803   int load;
804   StgClosure *closure;
805   StgInfoTable *ip;
806
807   unpackFetch(&ga, &rga, &load);
808   IF_PAR_DEBUG(fetch,
809                belch("%%%%__ Rcvd Fetch for ((%x, %d, 0)), Resume ((%x, %d, %x)) (load %d) from %x",
810                      ga.payload.gc.gtid, ga.payload.gc.slot,
811                      rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, load,
812                      rga.payload.gc.gtid));
813
814   closure = GALAlookup(&ga);
815   ASSERT(closure != (StgClosure *)NULL);
816   ip = get_itbl(closure);
817   if (ip->type == FETCH_ME) {
818     /* Forward the Fetch to someone else */
819     sendFetch(((StgFetchMe *)closure)->ga, &rga, load);
820
821     // Global statistics: count no. of fetches
822     if (RtsFlags.ParFlags.ParStats.Global &&
823         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
824       globalParStats.tot_fetch_mess++;
825     }
826   } else if (rga.payload.gc.gtid == mytid) {
827     /* Our own FETCH forwarded back around to us */
828     StgFetchMeBlockingQueue *fmbq = (StgFetchMeBlockingQueue *)GALAlookup(&rga);
829     
830     IF_PAR_DEBUG(fetch,
831                  belch("%%%%== Fetch returned to sending PE; closure=%p (%s); receiver=%p (%s)",
832                        closure, info_type(closure), fmbq, info_type((StgClosure*)fmbq)));
833     /* We may have already discovered that the fetch target is our own. */
834     if ((StgClosure *)fmbq != closure) 
835       CommonUp((StgClosure *)fmbq, closure);
836     (void) addWeight(&rga);
837   } else if (IS_BLACK_HOLE(closure)) {
838     /* This includes RBH's and FMBQ's */
839     StgBlockedFetch *bf;
840
841     /* Can we assert something on the remote GA? */
842     ASSERT(GALAlookup(&rga) == NULL);
843
844     /* If we're hitting a BH or RBH or FMBQ we have to put a BLOCKED_FETCH
845        closure into the BQ in order to denote that when updating this node
846        the result should be sent to the originator of this fetch message. */
847     bf = (StgBlockedFetch *)createBlockedFetch(ga, rga);
848     IF_PAR_DEBUG(fetch,
849                  belch("%%++ Blocking Fetch ((%x, %d, %x)) on %p (%s)",
850                        rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, 
851                        closure, info_type(closure)));
852     blockFetch(bf, closure);
853   } else {                      
854     /* The target of the FetchMe is some local graph */
855     nat size;
856     // StgClosure *graph;
857     rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;
858
859     if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid)) == NULL) {
860       barf("processFetch: out of heap while packing graph; ToDo: call GC here");
861       GarbageCollect(GetRoots, rtsFalse); 
862       closure = GALAlookup(&ga);
863       buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid);
864       ASSERT(buffer != (rtsPackBuffer *)NULL);
865     }
866     sendResume(&rga, size, buffer);
867
868     // Global statistics: count no. of fetches
869     if (RtsFlags.ParFlags.ParStats.Global &&
870         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
871       globalParStats.tot_resume_mess++;
872     }
873   }
874 }
875
876 /* 
877    The list of pending fetches must be a root-list for GC.
878    This routine is called from GC.c (same as marking GAs etc).
879 */
880 void
881 markPendingFetches(rtsBool major_gc) {
882
883   /* No need to traverse the list; this is done via the scavenge code
884      for a BLOCKED_FETCH closure, which evacuates the link field */
885
886   if (PendingFetches != END_BF_QUEUE ) {
887     IF_PAR_DEBUG(tables,
888                  fprintf(stderr, "@@@@ PendingFetches is root; evaced from %p to",
889                          PendingFetches));
890
891     PendingFetches = MarkRoot((StgClosure*)PendingFetches);
892
893     IF_PAR_DEBUG(verbose,
894                  fprintf(stderr, " %p\n", PendingFetches));
895
896   } else {
897     IF_PAR_DEBUG(tables,
898                  fprintf(stderr, "@@@@ PendingFetches is empty; no need to mark it\n"));
899   }
900 }
901
902 /*
903  * processFree unpacks a FREE message and adds the weights to our GAs.
904  */
905 //@cindex processFree
906 static void
907 processFree(void)
908 {
909   int nelem;
910   static StgWord *buffer;
911   int i;
912   globalAddr ga;
913
914   buffer = (StgWord *)gumPackBuffer;
915   unpackFree(&nelem, buffer);
916   IF_PAR_DEBUG(free,
917                belch("!!__ Rcvd Free (%d GAs)", nelem / 2));
918
919   ga.payload.gc.gtid = mytid;
920   for (i = 0; i < nelem;) {
921     ga.weight = (rtsWeight) buffer[i++];
922     ga.payload.gc.slot = (int) buffer[i++];
923     IF_PAR_DEBUG(free,
924                  fprintf(stderr, "!!-- Processing free "); 
925                  printGA(&ga);
926                  fputc('\n', stderr);
927                  );
928     (void) addWeight(&ga);
929   }
930 }
931
932 /*
933  * processResume unpacks a RESUME message into the graph, filling in
934  * the LA -> GA, and GA -> LA tables. Threads blocked on the original
935  * FetchMe (now a blocking queue) are awakened, and the blocking queue
936  * is converted into an indirection.  Finally it sends an ACK in response
937  * which contains any newly allocated GAs.
938  */
939
940 //@cindex processResume
941 static void
942 processResume(GlobalTaskId sender)
943 {
944   int nelem;
945   nat nGAs;
946   static rtsPackBuffer *packBuffer;
947   StgClosure *newGraph, *old;
948   globalAddr lga;
949   globalAddr *gagamap;
950   
951   packBuffer = (rtsPackBuffer *)gumPackBuffer;
952   unpackResume(&lga, &nelem, packBuffer);
953
954   IF_PAR_DEBUG(fetch,
955                fprintf(stderr, "[]__ Rcvd Resume for "); 
956                printGA(&lga);
957                fputc('\n', stderr));
958   IF_PAR_DEBUG(packet,
959                PrintPacket((rtsPackBuffer *)packBuffer));
960   
961   /* 
962    * We always unpack the incoming graph, even if we've received the
963    * requested node in some other data packet (and already awakened
964    * the blocking queue).
965   if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
966     ReallyPerformThreadGC(packBuffer[0], rtsFalse);
967     SAVE_Hp -= packBuffer[0];
968   }
969    */
970
971   // ToDo: Check for GC here !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
972
973   /* Do this *after* GC; we don't want to release the object early! */
974
975   if (lga.weight > 0)
976     (void) addWeight(&lga);
977
978   old = GALAlookup(&lga);
979
980   /* ToDo:  The closure that requested this graph must be one of these two?*/
981   ASSERT(get_itbl(old)->type == FETCH_ME_BQ || 
982          get_itbl(old)->type == RBH);
983
984   if (RtsFlags.ParFlags.ParStats.Full) {
985     StgBlockingQueueElement *bqe, *last_bqe;
986
987     IF_PAR_DEBUG(fetch,
988                  belch("[]-- Resume is REPLY to closure %lx", old));
989
990     /* Write REPLY events to the log file, indicating that the remote
991        data has arrived 
992        NB: we emit a REPLY only for the *last* elem in the queue; this is
993            the one that triggered the fetch message; all other entries
994            have just added themselves to the queue, waiting for the data 
995            they know that has been requested (see entry code for FETCH_ME_BQ)
996     */
997     if ((get_itbl(old)->type == FETCH_ME_BQ ||
998          get_itbl(old)->type == RBH)) {
999       for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue,
1000            last_bqe = END_BQ_QUEUE;
1001              get_itbl(bqe)->type==TSO || 
1002              get_itbl(bqe)->type==BLOCKED_FETCH;
1003            last_bqe = bqe, bqe = bqe->link) { /* nothing */ }
1004
1005       ASSERT(last_bqe==END_BQ_QUEUE || 
1006              get_itbl((StgClosure *)last_bqe)->type == TSO);
1007
1008       /* last_bqe now points to the TSO that triggered the FETCH */ 
1009       if (get_itbl((StgClosure *)last_bqe)->type == TSO)
1010         DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(sender), 
1011                          GR_REPLY, ((StgTSO *)last_bqe), ((StgTSO *)last_bqe)->block_info.closure,
1012                          0, spark_queue_len(&(MainRegTable.rSparks)));
1013     }
1014   }
1015
1016   newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
1017   ASSERT(newGraph != NULL);
1018
1019   /* 
1020    * Sometimes, unpacking will common up the resumee with the
1021    * incoming graph, but if it hasn't, we'd better do so now.
1022    */
1023    
1024   if (get_itbl(old)->type == FETCH_ME_BQ)
1025     CommonUp(old, newGraph);
1026
1027   IF_PAR_DEBUG(fetch,
1028                belch("[]-- Ready to resume unpacked graph at %p (%s)",
1029                      newGraph, info_type(newGraph)));
1030
1031   IF_PAR_DEBUG(tables,
1032                DebugPrintGAGAMap(gagamap, nGAs));
1033   
1034   sendAck(sender, nGAs, gagamap);
1035 }
1036
1037 /*
1038  * processSchedule unpacks a SCHEDULE message into the graph, filling
1039  * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
1040  * the local spark queue.  Finally it sends an ACK in response
1041  * which contains any newly allocated GAs.
1042  */
1043 //@cindex processSchedule
1044 static void
1045 processSchedule(GlobalTaskId sender)
1046 {
1047   nat nelem, nGAs;
1048   rtsBool success;
1049   static rtsPackBuffer *packBuffer;
1050   StgClosure *newGraph;
1051   globalAddr *gagamap;
1052   
1053   packBuffer = gumPackBuffer;           /* HWL */
1054   unpackSchedule(&nelem, packBuffer);
1055
1056   IF_PAR_DEBUG(schedule,
1057                belch("--__ Rcvd Schedule (%d elems)", nelem));
1058   IF_PAR_DEBUG(packet,
1059                PrintPacket(packBuffer));
1060
1061   /*
1062    * For now, the graph is a closure to be sparked as an advisory
1063    * spark, but in future it may be a complete spark with
1064    * required/advisory status, priority etc.
1065    */
1066
1067   /*
1068   space_required = packBuffer[0];
1069   if (SAVE_Hp + space_required >= SAVE_HpLim) {
1070     ReallyPerformThreadGC(space_required, rtsFalse);
1071     SAVE_Hp -= space_required;
1072   }
1073   */
1074   // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
1075   newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
1076   ASSERT(newGraph != NULL);
1077   success = add_to_spark_queue(newGraph, &(MainRegTable.rSparks));
1078
1079   if (RtsFlags.ParFlags.ParStats.Full && 
1080       RtsFlags.ParFlags.ParStats.Sparks && 
1081       success) 
1082     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
1083                      GR_STOLEN, ((StgTSO *)NULL), newGraph, 
1084                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
1085
1086   IF_PAR_DEBUG(schedule,
1087                if (success)
1088                  belch("--^^  added spark to unpacked graph %p (%s); %d sparks available on [%x] (%s)", 
1089                      newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid);
1090                else
1091                  belch("--^^  received non-sparkable closure %p (%s); nothing added to spark pool; %d sparks available on [%x]", 
1092                      newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid));
1093   IF_PAR_DEBUG(packet,
1094                belch("*<    Unpacked graph with root at %p (%s):", 
1095                      newGraph, info_type(newGraph));
1096                PrintGraph(newGraph, 0));
1097
1098   IF_PAR_DEBUG(tables,
1099                DebugPrintGAGAMap(gagamap, nGAs));
1100
1101   sendAck(sender, nGAs, gagamap);
1102
1103   //fishing = rtsFalse;
1104   ASSERT(outstandingFishes>0);
1105   outstandingFishes--;
1106 }
1107
1108 /*
1109  * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
1110  * (which represent shared thunks that have been shipped) into fetch-mes
1111  * to remote GAs.
1112  */
1113 //@cindex processAck
1114 static void
1115 processAck(void)
1116 {
1117   nat nGAs;
1118   globalAddr *gaga;
1119   globalAddr gagamap[256]; // ToDo: elim magic constant!!   MAX_GAS * 2];??
1120
1121   unpackAck(&nGAs, gagamap);
1122
1123   IF_PAR_DEBUG(tables,
1124                belch(",,,, Rcvd Ack (%d pairs)", nGAs);
1125                DebugPrintGAGAMap(gagamap, nGAs));
1126
1127   IF_DEBUG(sanity,
1128            checkGAGAMap(gagamap, nGAs));
1129
1130   /*
1131    * For each (oldGA, newGA) pair, set the GA of the corresponding
1132    * thunk to the newGA, convert the thunk to a FetchMe, and return
1133    * the weight from the oldGA.
1134    */
1135   for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
1136     StgClosure *old_closure = GALAlookup(gaga);
1137     StgClosure *new_closure = GALAlookup(gaga + 1);
1138
1139     ASSERT(old_closure != NULL);
1140     if (new_closure == NULL) {
1141       /* We don't have this closure, so we make a fetchme for it */
1142       globalAddr *ga = setRemoteGA(old_closure, gaga + 1, rtsTrue);
1143       
1144       /* convertToFetchMe should be done unconditionally here.
1145          Currently, we assign GAs to CONSTRs, too, (a bit of a hack),
1146          so we have to check whether it is an RBH before converting
1147
1148          ASSERT(get_itbl(old_closure)==RBH);
1149       */
1150       if (get_itbl(old_closure)->type==RBH)
1151         convertToFetchMe((StgRBH *)old_closure, ga);
1152     } else {
1153       /* 
1154        * Oops...we've got this one already; update the RBH to
1155        * point to the object we already know about, whatever it
1156        * happens to be.
1157        */
1158       CommonUp(old_closure, new_closure);
1159       
1160       /* 
1161        * Increase the weight of the object by the amount just
1162        * received in the second part of the ACK pair.
1163        */
1164       (void) addWeight(gaga + 1);
1165     }
1166     (void) addWeight(gaga);
1167   }
1168
1169   /* check the sanity of the LAGA and GALA tables after mincing them */
1170   IF_DEBUG(sanity, checkLAGAtable(rtsFalse));
1171 }
1172
1173 #ifdef DIST
1174
1175 void
1176 bounceReval(void) {  
1177   barf("Task %x: TODO: should send NACK in response to REVAL",mytid);     
1178 }
1179
1180 static void
1181 processReval(GlobalTaskId sender) //similar to schedule...
1182 { nat nelem, space_required, nGAs;
1183   static rtsPackBuffer *packBuffer;
1184   StgClosure *newGraph;
1185   globalAddr *gagamap;
1186   StgTSO*     tso;
1187   globalAddr *ga;
1188   
1189   packBuffer = gumPackBuffer;           /* HWL */
1190   unpackSchedule(&nelem, packBuffer); /* okay, since the structure is the same */
1191
1192   IF_PAR_DEBUG(packet,
1193                belch("@;~) [%x] Rcvd Reval (%d elems)", mytid, nelem);
1194                PrintPacket(packBuffer));
1195
1196   /*
1197   space_required = packBuffer[0];
1198   if (SAVE_Hp + space_required >= SAVE_HpLim) {
1199     ReallyPerformThreadGC(space_required, rtsFalse);
1200     SAVE_Hp -= space_required;
1201   }
1202   */
1203   
1204   // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
1205   newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
1206   ASSERT(newGraph != NULL);
1207   
1208   IF_PAR_DEBUG(packet,
1209                belch("@;~)  Unpacked graph with root at %p (%s):", 
1210                      newGraph, info_type(newGraph));
1211                PrintGraph(newGraph, 0));
1212
1213   IF_PAR_DEBUG(tables,
1214                DebugPrintGAGAMap(gagamap, nGAs));
1215
1216   IF_PAR_DEBUG(tables, 
1217     printLAGAtable();   
1218     DebugPrintGAGAMap(gagamap, nGAs));   
1219
1220   //We don't send an Ack to the head!!!!
1221   ASSERT(nGAs>0);  
1222   sendAck(sender, nGAs-1, gagamap+2);
1223   
1224   IF_PAR_DEBUG(verbose,
1225                belch("@;~)  About to create Reval thread on behalf of %x", 
1226                      sender));
1227   
1228   tso=createGenThread(RtsFlags.GcFlags.initialStkSize,newGraph);
1229   tso->priority=RevalPriority;
1230   tso->revalSlot=gagamap->payload.gc.slot;//record who sent the reval
1231   tso->revalTid =gagamap->payload.gc.gtid;
1232   scheduleThread(tso);
1233   context_switch = 1; // switch at the earliest opportunity
1234
1235 #endif
1236
1237
1238 //@node GUM Message Processor, Miscellaneous Functions, Message-Processing Functions, High Level Communications Routines
1239 //@subsection GUM Message Processor
1240
1241 /*
1242  * GUM Message Processor
1243
1244  * processMessages processes any messages that have arrived, calling
1245  * appropriate routines depending on the message tag
1246  * (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
1247  * present and performs a blocking receive! During profiling it
1248  * busy-waits in order to record idle time.
1249  */
1250
1251 //@cindex processMessages
1252 rtsBool
1253 processMessages(void)
1254 {
1255   rtsPacket packet;
1256   OpCode opcode;
1257   GlobalTaskId task;
1258   rtsBool receivedFinish = rtsFalse;
1259
1260   do {
1261     packet = GetPacket();  /* Get next message; block until one available */
1262     getOpcodeAndSender(packet, &opcode, &task);
1263
1264     if (task==SysManTask) { 
1265       switch (opcode) { 
1266       case PP_PETIDS:
1267         processPEtids();
1268         break;
1269           
1270       case PP_FINISH:
1271         IF_PAR_DEBUG(verbose,
1272                      belch("==== received FINISH [%p]", mytid));
1273         /* this boolean value is returned and propagated to the main 
1274            scheduling loop, thus shutting-down this PE */
1275         receivedFinish = rtsTrue;
1276         break;  
1277           
1278       default:  
1279         barf("Task %x: received unknown opcode %x from SysMan",mytid, opcode);
1280       }
1281     } else if (taskIDtoPE(task)==0) { 
1282       /* When a new PE joins then potentially FISH & REVAL message may
1283          reach PES before they are notified of the new PEs existance.  The
1284          only solution is to bounce/fail these messages back to the sender.
1285          But we will worry about it once we start seeing these race
1286          conditions!  */
1287       switch (opcode) { 
1288       case PP_FISH:
1289         bounceFish();
1290         break;
1291 #ifdef DIST       
1292       case PP_REVAL:
1293         bounceReval();
1294         break;    
1295 #endif          
1296       case PP_PETIDS:
1297         belch("Task %x: Ignoring PVM session opened by another SysMan %x",mytid,task);
1298         break;
1299         
1300       case PP_FINISH:   
1301         break;
1302         
1303       default:  
1304         belch("Task %x: Ignoring opcode %x from unknown PE %x",mytid, opcode, task);
1305       }
1306     } else
1307       switch (opcode) {
1308       case PP_FETCH:
1309         processFetch();
1310         // Global statistics: count no. of fetches
1311         if (RtsFlags.ParFlags.ParStats.Global &&
1312             RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1313           globalParStats.rec_fetch_mess++;
1314         }
1315         break;
1316
1317       case PP_RESUME:
1318         processResume(task);
1319         // Global statistics: count no. of fetches
1320         if (RtsFlags.ParFlags.ParStats.Global &&
1321             RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1322           globalParStats.rec_resume_mess++;
1323         }
1324         break;
1325
1326       case PP_ACK:
1327         processAck();
1328         break;
1329
1330       case PP_FISH:
1331         processFish();
1332         // Global statistics: count no. of fetches
1333         if (RtsFlags.ParFlags.ParStats.Global &&
1334             RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1335           globalParStats.rec_fish_mess++;
1336         }
1337         break;
1338
1339       case PP_FREE:
1340         processFree();
1341         break;
1342       
1343       case PP_SCHEDULE:
1344         processSchedule(task);
1345         // Global statistics: count no. of fetches
1346         if (RtsFlags.ParFlags.ParStats.Global &&
1347             RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1348           globalParStats.rec_schedule_mess++;
1349         }
1350         break;
1351       
1352 #ifdef DIST      
1353       case PP_REVAL:
1354         processReval(task);
1355         // Global statistics: count no. of fetches
1356         if (RtsFlags.ParFlags.ParStats.Global &&
1357             RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1358           globalParStats.rec_reval_mess++;
1359         }
1360         break;
1361 #endif
1362       
1363       default:
1364         /* Anything we're not prepared to deal with. */
1365         barf("Task %x: Unexpected opcode %x from %x",
1366              mytid, opcode, task);
1367       } /* switch */
1368
1369   } while (PacketsWaiting());   /* While there are messages: process them */
1370   return receivedFinish;
1371 }                               /* processMessages */
1372
1373 //@node Miscellaneous Functions, Index, GUM Message Processor, High Level Communications Routines
1374 //@subsection Miscellaneous Functions
1375
1376 /*
1377  * blockFetch blocks a BlockedFetch node on some kind of black hole.
1378  */
1379 //@cindex blockFetch
1380 void
1381 blockFetch(StgBlockedFetch *bf, StgClosure *bh) {
1382   bf->node = bh;
1383   switch (get_itbl(bh)->type) {
1384   case BLACKHOLE:
1385     bf->link = END_BQ_QUEUE;
1386     //((StgBlockingQueue *)bh)->header.info = &stg_BLACKHOLE_BQ_info;
1387     SET_INFO(bh, &stg_BLACKHOLE_BQ_info); // turn closure into a blocking queue
1388     ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1389     
1390     // put bh on the mutables list
1391     recordMutable((StgMutClosure *)bh);
1392     break;
1393     
1394   case BLACKHOLE_BQ:
1395     /* enqueue bf on blocking queue of closure bh */
1396     bf->link = ((StgBlockingQueue *)bh)->blocking_queue;
1397     ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1398
1399     // put bh on the mutables list; ToDo: check
1400     recordMutable((StgMutClosure *)bh);
1401     break;
1402
1403   case FETCH_ME_BQ:
1404     /* enqueue bf on blocking queue of closure bh */
1405     bf->link = ((StgFetchMeBlockingQueue *)bh)->blocking_queue;
1406     ((StgFetchMeBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1407
1408     // put bh on the mutables list; ToDo: check
1409     recordMutable((StgMutClosure *)bh);
1410     break;
1411     
1412   case RBH:
1413     /* enqueue bf on blocking queue of closure bh */
1414     bf->link = ((StgRBH *)bh)->blocking_queue;
1415     ((StgRBH *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1416
1417     // put bh on the mutables list; ToDo: check
1418     recordMutable((StgMutClosure *)bh);
1419     break;
1420     
1421   default:
1422     barf("blockFetch: thought %p was a black hole (IP %#lx, %s)",
1423          (StgClosure *)bh, get_itbl((StgClosure *)bh), 
1424          info_type((StgClosure *)bh));
1425   }
1426   IF_PAR_DEBUG(bq,
1427                belch("##++ blockFetch: after block the BQ of %p (%s) is:",
1428                      bh, info_type(bh));
1429                print_bq(bh));
1430 }
1431
1432
1433 /*
1434   @blockThread@ is called from the main scheduler whenever tso returns with
1435   a ThreadBlocked return code; tso has already been added to a blocking
1436   queue (that's done in the entry code of the closure, because it is a 
1437   cheap operation we have to do in any case); the main purpose of this
1438   routine is to send a Fetch message in case we are blocking on a FETCHME(_BQ)
1439   closure, which is indicated by the tso.why_blocked field;
1440   we also write an entry into the log file if we are generating one
1441
1442   Should update exectime etc in the entry code already; but we don't have
1443   something like ``system time'' in the log file anyway, so this should
1444   even out the inaccuracies.
1445 */
1446
1447 //@cindex blockThread
1448 void
1449 blockThread(StgTSO *tso)
1450 {
1451   globalAddr *remote_ga=NULL;
1452   globalAddr *local_ga;
1453   globalAddr fmbq_ga;
1454
1455   // ASSERT(we are on some blocking queue)
1456   ASSERT(tso->block_info.closure != (StgClosure *)NULL);
1457
1458   /*
1459     We have to check why this thread has been blocked.
1460   */
1461   switch (tso->why_blocked) {
1462     case BlockedOnGA:
1463       /* the closure must be a FETCH_ME_BQ; tso came in here via 
1464          FETCH_ME entry code */
1465       ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);
1466
1467       /* HACK: the link field is used to hold the GA between FETCH_ME_entry
1468          end this point; if something (eg. GC) happens inbetween the whole
1469          thing will blow up 
1470          The problem is that the ga field of the FETCH_ME has been overwritten
1471          with the head of the blocking queue (which is tso). 
1472       */
1473       ASSERT(looks_like_ga(&theGlobalFromGA));
1474       // ASSERT(tso->link!=END_TSO_QUEUE && tso->link!=NULL);
1475       remote_ga = &theGlobalFromGA; //tso->link;
1476       tso->link = (StgTSO*)END_BQ_QUEUE;
1477       /* it was tso which turned node from FETCH_ME into FETCH_ME_BQ =>
1478          we have to send a Fetch message here! */
1479       if (RtsFlags.ParFlags.ParStats.Full) {
1480         /* Note that CURRENT_TIME may perform an unsafe call */
1481         tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1482         tso->par.fetchcount++;
1483         tso->par.blockedat = CURRENT_TIME;
1484         /* we are about to send off a FETCH message, so dump a FETCH event */
1485         DumpRawGranEvent(CURRENT_PROC, 
1486                          taskIDtoPE(remote_ga->payload.gc.gtid),
1487                          GR_FETCH, tso, tso->block_info.closure, 0, 0);
1488       }
1489       /* Phil T. claims that this was a workaround for a hard-to-find
1490        * bug, hence I'm leaving it out for now --SDM 
1491        */
1492       /* Assign a brand-new global address to the newly created FMBQ  */
1493       local_ga = makeGlobal(tso->block_info.closure, rtsFalse);
1494       splitWeight(&fmbq_ga, local_ga);
1495       ASSERT(fmbq_ga.weight == 1U << (BITS_IN(unsigned) - 1));
1496       
1497       sendFetch(remote_ga, &fmbq_ga, 0/*load*/);
1498
1499       // Global statistics: count no. of fetches
1500       if (RtsFlags.ParFlags.ParStats.Global &&
1501           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1502         globalParStats.tot_fetch_mess++;
1503       }
1504
1505       IF_DEBUG(sanity,
1506                theGlobalFromGA.payload.gc.gtid = (GlobalTaskId)0);
1507       break;
1508
1509     case BlockedOnGA_NoSend:
1510       /* the closure must be a FETCH_ME_BQ; tso came in here via 
1511          FETCH_ME_BQ entry code */
1512       ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);
1513
1514       /* Fetch message has been sent already */
1515       if (RtsFlags.ParFlags.ParStats.Full) {
1516         /* Note that CURRENT_TIME may perform an unsafe call */
1517         tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1518         tso->par.blockcount++;
1519         tso->par.blockedat = CURRENT_TIME;
1520         /* dump a block event, because fetch has been sent already */
1521         DumpRawGranEvent(CURRENT_PROC, thisPE,
1522                          GR_BLOCK, tso, tso->block_info.closure, 0, 0);
1523       }
1524       break;
1525
1526     case BlockedOnMVar:
1527     case BlockedOnBlackHole:
1528       /* the closure must be a BLACKHOLE_BQ or an RBH; tso came in here via 
1529          BLACKHOLE(_BQ) or CAF_BLACKHOLE or RBH entry code */
1530       ASSERT(get_itbl(tso->block_info.closure)->type==MVAR ||
1531              get_itbl(tso->block_info.closure)->type==BLACKHOLE_BQ ||
1532              get_itbl(tso->block_info.closure)->type==RBH);
1533
1534       /* if collecting stats update the execution time etc */
1535       if (RtsFlags.ParFlags.ParStats.Full) {
1536         /* Note that CURRENT_TIME may perform an unsafe call */
1537         tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1538         tso->par.blockcount++;
1539         tso->par.blockedat = CURRENT_TIME;
1540         DumpRawGranEvent(CURRENT_PROC, thisPE,
1541                          GR_BLOCK, tso, tso->block_info.closure, 0, 0);
1542       }
1543       break;
1544
1545     case BlockedOnDelay:
1546       /* Whats sort of stats shall we collect for an explicit threadDelay? */
1547       IF_PAR_DEBUG(verbose,
1548                belch("##++ blockThread: TSO %d blocked on ThreadDelay",
1549                      tso->id));
1550       break;
1551
1552     /* Check that the following is impossible to happen, indeed
1553     case BlockedOnException:
1554     case BlockedOnRead:
1555     case BlockedOnWrite:
1556     */
1557     default:
1558       barf("blockThread: impossible why_blocked code %d for TSO %d",
1559            tso->why_blocked, tso->id);
1560   }
1561
1562   IF_PAR_DEBUG(verbose,
1563                belch("##++ blockThread: TSO %d blocked on closure %p (%s); %s",
1564                      tso->id, tso->block_info.closure, info_type(tso->block_info.closure),
1565                      (tso->why_blocked==BlockedOnGA) ? "Sent FETCH for GA" : ""));
1566   
1567   IF_PAR_DEBUG(bq,
1568                print_bq(tso->block_info.closure));
1569 }
1570
1571 /*
1572  * ChoosePE selects a GlobalTaskId from the array of PEs 'at random'.
1573  * Important properties:
1574  *   - it varies during execution, even if the PE is idle
1575  *   - it's different for each PE
1576  *   - we never send a fish to ourselves
1577  */
1578 extern long lrand48 (void);
1579
1580 //@cindex choosePE
1581 GlobalTaskId
1582 choosePE(void)
1583 {
1584   long temp;
1585
1586   temp = lrand48() % nPEs;
1587   if (allPEs[temp] == mytid) {  /* Never send a FISH to yourself */
1588     temp = (temp + 1) % nPEs;
1589   }
1590   return allPEs[temp];
1591 }
1592
1593 /* 
1594  * allocate a BLOCKED_FETCH closure and fill it with the relevant fields
1595  * of the ga argument; called from processFetch when the local closure is
1596  * under evaluation
1597  */
1598 //@cindex createBlockedFetch
1599 StgClosure *
1600 createBlockedFetch (globalAddr ga, globalAddr rga)
1601 {
1602   StgBlockedFetch *bf;
1603   StgClosure *closure;
1604
1605   closure = GALAlookup(&ga);
1606   if ((bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch))) == NULL) {
1607     barf("createBlockedFetch: out of heap while allocating heap for a BlocekdFetch; ToDo: call GC here");
1608     GarbageCollect(GetRoots, rtsFalse); 
1609     closure = GALAlookup(&ga);
1610     bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch));
1611     // ToDo: check whether really guaranteed to succeed 2nd time around
1612   }
1613
1614   ASSERT(bf != (StgBlockedFetch *)NULL);
1615   SET_INFO((StgClosure *)bf, &stg_BLOCKED_FETCH_info);
1616   // ToDo: check whether other header info is needed
1617   bf->node = closure;
1618   bf->ga.payload.gc.gtid = rga.payload.gc.gtid;
1619   bf->ga.payload.gc.slot = rga.payload.gc.slot;
1620   bf->ga.weight = rga.weight;
1621   // bf->link = NULL;  debugging
1622
1623   IF_PAR_DEBUG(schedule,
1624                fprintf(stderr, "%%%%// created BF: bf=%p (%s) of closure , GA: ",
1625                        bf, info_type((StgClosure*)bf));
1626                printGA(&(bf->ga));
1627                fputc('\n',stderr));
1628   return (StgClosure *)bf;
1629 }
1630
1631 /*
1632  * waitForTermination enters a loop ignoring spurious messages while
1633  * waiting for the termination sequence to be completed.  
1634  */
1635 //@cindex waitForTermination
1636 void
1637 waitForTermination(void)
1638 {
1639   do {
1640     rtsPacket p = GetPacket();
1641     processUnexpectedMessage(p);
1642   } while (rtsTrue);
1643 }
1644
1645 #ifdef DEBUG
1646 //@cindex DebugPrintGAGAMap
1647 void
1648 DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
1649 {
1650   nat i;
1651   
1652   for (i = 0; i < nGAs; ++i, gagamap += 2)
1653     fprintf(stderr, "__ gagamap[%d] = ((%x, %d, %x)) -> ((%x, %d, %x))\n", i,
1654             gagamap[0].payload.gc.gtid, gagamap[0].payload.gc.slot, gagamap[0].weight,
1655             gagamap[1].payload.gc.gtid, gagamap[1].payload.gc.slot, gagamap[1].weight);
1656 }
1657
1658 //@cindex checkGAGAMap
1659 void
1660 checkGAGAMap(globalAddr *gagamap, int nGAs)
1661 {
1662   nat i;
1663   
1664   for (i = 0; i < (nat)nGAs; ++i, gagamap += 2) {
1665     ASSERT(looks_like_ga(gagamap));
1666     ASSERT(looks_like_ga(gagamap+1));
1667   }
1668 }
1669 #endif
1670
1671 //@cindex freeMsgBuffer
1672 static StgWord **freeMsgBuffer = NULL;
1673 //@cindex freeMsgIndex
1674 static nat      *freeMsgIndex  = NULL;
1675
1676 //@cindex prepareFreeMsgBuffers
1677 void
1678 prepareFreeMsgBuffers(void)
1679 {
1680   nat i;
1681   
1682   /* Allocate the freeMsg buffers just once and then hang onto them. */
1683   if (freeMsgIndex == NULL) {
1684     freeMsgIndex = (nat *) stgMallocBytes(nPEs * sizeof(nat), 
1685                                           "prepareFreeMsgBuffers (Index)");
1686     freeMsgBuffer = (StgWord **) stgMallocBytes(nPEs * sizeof(long *), 
1687                                           "prepareFreeMsgBuffers (Buffer)");
1688     
1689     for(i = 0; i < nPEs; i++) 
1690       if (i != (thisPE-1)) 
1691         freeMsgBuffer[i] = (StgPtr) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
1692                                                "prepareFreeMsgBuffers (Buffer #i)");
1693       else
1694         freeMsgBuffer[i] = 0;
1695   }
1696   
1697   /* Initialize the freeMsg buffer pointers to point to the start of their
1698      buffers */
1699   for (i = 0; i < nPEs; i++)
1700     freeMsgIndex[i] = 0;
1701 }
1702
1703 //@cindex freeRemoteGA
1704 void
1705 freeRemoteGA(int pe, globalAddr *ga)
1706 {
1707   nat i;
1708   
1709   ASSERT(GALAlookup(ga) == NULL);
1710   
1711   if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
1712     IF_PAR_DEBUG(free,
1713                  belch("!! Filled a free message buffer (sending remaining messages indivisually)"));   
1714
1715     sendFree(ga->payload.gc.gtid, i, freeMsgBuffer[pe]);
1716     i = 0;
1717   }
1718   freeMsgBuffer[pe][i++] = (StgWord) ga->weight;
1719   freeMsgBuffer[pe][i++] = (StgWord) ga->payload.gc.slot;
1720   freeMsgIndex[pe] = i;
1721
1722   IF_DEBUG(sanity,
1723            ga->weight = 0xdead0add;
1724            ga->payload.gc.gtid = 0xbbbbbbbb;
1725            ga->payload.gc.slot = 0xbbbbbbbb;);
1726 }
1727
1728 //@cindex sendFreeMessages
1729 void
1730 sendFreeMessages(void)
1731 {
1732   nat i;
1733   
1734   for (i = 0; i < nPEs; i++) 
1735     if (freeMsgIndex[i] > 0)
1736       sendFree(allPEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1737 }
1738
1739 /* synchronises with the other PEs. Receives and records in a global
1740  * variable the task-id of SysMan. If this is the main thread (discovered
1741  * in main.lc), identifies itself to SysMan. Finally it receives
1742  * from SysMan an array of the Global Task Ids of each PE, which is
1743  * returned as the value of the function.
1744  */
1745
1746 #if defined(PAR_TICKY)
1747 /* Has to see freeMsgIndex, so must be defined here not in ParTicky.c */
1748 //@cindex stats_CntFreeGA
1749 void
1750 stats_CntFreeGA (void) {  // stats only
1751
1752   // Global statistics: residency of thread and spark pool
1753   if (RtsFlags.ParFlags.ParStats.Global &&
1754       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1755     nat i, s;
1756   
1757     globalParStats.cnt_free_GA++;
1758     for (i = 0, s = 0; i < nPEs; i++) 
1759       s += globalParStats.tot_free_GA += freeMsgIndex[i]/2;
1760
1761     if ( s > globalParStats.res_free_GA )
1762       globalParStats.res_free_GA = s;
1763   }
1764 }
1765 #endif /* PAR_TICKY */
1766
1767 #endif /* PAR -- whole file */
1768
1769 //@node Index,  , Miscellaneous Functions, High Level Communications Routines
1770 //@subsection Index
1771
1772 //@index
1773 //* ACK::  @cindex\s-+ACK
1774 //* DebugPrintGAGAMap::  @cindex\s-+DebugPrintGAGAMap
1775 //* FETCH::  @cindex\s-+FETCH
1776 //* FISH::  @cindex\s-+FISH
1777 //* FREE::  @cindex\s-+FREE
1778 //* RESUME::  @cindex\s-+RESUME
1779 //* SCHEDULE::  @cindex\s-+SCHEDULE
1780 //* blockFetch::  @cindex\s-+blockFetch
1781 //* choosePE::  @cindex\s-+choosePE
1782 //* freeMsgBuffer::  @cindex\s-+freeMsgBuffer
1783 //* freeMsgIndex::  @cindex\s-+freeMsgIndex
1784 //* freeRemoteGA::  @cindex\s-+freeRemoteGA
1785 //* gumPackBuffer::  @cindex\s-+gumPackBuffer
1786 //* initMoreBuffers::  @cindex\s-+initMoreBuffers
1787 //* prepareFreeMsgBuffers::  @cindex\s-+prepareFreeMsgBuffers
1788 //* processAck::  @cindex\s-+processAck
1789 //* processFetch::  @cindex\s-+processFetch
1790 //* processFetches::  @cindex\s-+processFetches
1791 //* processFish::  @cindex\s-+processFish
1792 //* processFree::  @cindex\s-+processFree
1793 //* processMessages::  @cindex\s-+processMessages
1794 //* processResume::  @cindex\s-+processResume
1795 //* processSchedule::  @cindex\s-+processSchedule
1796 //* sendAck::  @cindex\s-+sendAck
1797 //* sendFetch::  @cindex\s-+sendFetch
1798 //* sendFish::  @cindex\s-+sendFish
1799 //* sendFree::  @cindex\s-+sendFree
1800 //* sendFreeMessages::  @cindex\s-+sendFreeMessages
1801 //* sendResume::  @cindex\s-+sendResume
1802 //* sendSchedule::  @cindex\s-+sendSchedule
1803 //* unpackAck::  @cindex\s-+unpackAck
1804 //* unpackFetch::  @cindex\s-+unpackFetch
1805 //* unpackFish::  @cindex\s-+unpackFish
1806 //* unpackFree::  @cindex\s-+unpackFree
1807 //* unpackResume::  @cindex\s-+unpackResume
1808 //* unpackSchedule::  @cindex\s-+unpackSchedule
1809 //* waitForTermination::  @cindex\s-+waitForTermination
1810 //@end index