[project @ 2001-03-22 03:51:08 by hwloidl]
[ghc-hetmet.git] / ghc / rts / parallel / HLComms.c
1 /* ----------------------------------------------------------------------------
2  * Time-stamp: <Wed Mar 21 2001 16:34:41 Stardate: [-30]6363.45 hwloidl>
3  * $Id: HLComms.c,v 1.4 2001/03/22 03:51:11 hwloidl Exp $
4  *
5  * High Level Communications Routines (HLComms.lc)
6  *
7  * Contains the high-level routines (i.e. communication
8  * subsystem independent) used by GUM
9  * 
10  * GUM 0.2x: Phil Trinder, Glasgow University, 12 December 1994
11  * GUM 3.xx: Phil Trinder, Simon Marlow July 1998
12  * GUM 4.xx: H-W. Loidl, Heriot-Watt University, November 1999 -
13  * 
14  * ------------------------------------------------------------------------- */
15
16 #ifdef PAR /* whole file */
17
18 //@node High Level Communications Routines, , ,
19 //@section High Level Communications Routines
20
21 //@menu
22 //* Macros etc::                
23 //* Includes::                  
24 //* GUM Message Sending and Unpacking Functions::  
25 //* Message-Processing Functions::  
26 //* GUM Message Processor::     
27 //* Miscellaneous Functions::   
28 //* Index::                     
29 //@end menu
30
31 //@node Macros etc, Includes, High Level Communications Routines, High Level Communications Routines
32 //@subsection Macros etc
33
34 # ifndef _AIX
35 # define NON_POSIX_SOURCE /* so says Solaris */
36 # endif
37
38 //@node Includes, GUM Message Sending and Unpacking Functions, Macros etc, High Level Communications Routines
39 //@subsection Includes
40
41 #include "Rts.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "Storage.h"   // for recordMutable
45 #include "HLC.h"
46 #include "Parallel.h"
47 #include "GranSimRts.h"
48 #include "ParallelRts.h"
49 #include "Sparks.h"
50 #include "FetchMe.h"     // for BLOCKED_FETCH_info etc
51 #if defined(DEBUG)
52 # include "ParallelDebug.h"
53 #endif
54 #include "StgMacros.h" // inlined IS_... fcts
55
56 #ifdef DIST
57 #include "SchedAPI.h" //for createIOThread
58 extern unsigned int context_switch; 
59 #endif DIST
60
61 //@node GUM Message Sending and Unpacking Functions, Message-Processing Functions, Includes, High Level Communications Routines
62 //@subsection GUM Message Sending and Unpacking Functions
63
64 /*
65  * GUM Message Sending and Unpacking Functions
66  */
67
68 /*
69  * Allocate space for message processing
70  */
71
72 //@cindex gumPackBuffer
73 static rtsPackBuffer *gumPackBuffer;
74
75 //@cindex initMoreBuffers
76 rtsBool
77 initMoreBuffers(void)
78 {
79   if ((gumPackBuffer = (rtsPackBuffer *)stgMallocWords(RtsFlags.ParFlags.packBufferSize, 
80                                              "initMoreBuffers")) == NULL)
81     return rtsFalse;
82   return rtsTrue;
83 }
84
85 /*
86  * SendFetch packs the two global addresses and a load into a message +
87  * sends it.  
88
89 //@cindex FETCH
90
91    Structure of a FETCH message:
92
93          |    GA 1     |        GA 2          |
94          +------------------------------------+------+
95          | gtid | slot | weight | gtid | slot | load |
96          +------------------------------------+------+
97  */
98
99 //@cindex sendFetch
100 void
101 sendFetch(globalAddr *rga, globalAddr *lga, int load)
102 {
103   ASSERT(rga->weight > 0 && lga->weight > 0);
104   IF_PAR_DEBUG(fetch,
105                belch("~^** Sending Fetch for ((%x, %d, 0)); locally ((%x, %d, %x)), load = %d", 
106                      rga->payload.gc.gtid, rga->payload.gc.slot, 
107                      lga->payload.gc.gtid, lga->payload.gc.slot, lga->weight,
108                      load));
109
110
111   /* ToDo: Dump event
112   DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(rga->payload.gc.gtid), 
113                    GR_FETCH, CurrentTSO, (StgClosure *)(lga->payload.gc.slot),
114                    0, spark_queue_len(ADVISORY_POOL));
115   */
116
117   sendOpV(PP_FETCH, rga->payload.gc.gtid, 6,
118           (StgWord) rga->payload.gc.gtid, (StgWord) rga->payload.gc.slot, 
119           (StgWord) lga->weight, (StgWord) lga->payload.gc.gtid, 
120           (StgWord) lga->payload.gc.slot, (StgWord) load);
121 }
122
123 /*
124  * unpackFetch unpacks a FETCH message into two Global addresses and a load
125  * figure.  
126 */
127
128 //@cindex unpackFetch
129 static void
130 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
131 {
132   long buf[6];
133
134   GetArgs(buf, 6); 
135
136   IF_PAR_DEBUG(fetch,
137                belch("~^** Unpacking Fetch for ((%x, %d, 0)) to ((%x, %d, %x)), load = %d", 
138                      (GlobalTaskId) buf[0], (int) buf[1], 
139                      (GlobalTaskId) buf[3], (int) buf[4], buf[2], buf[5]));
140
141   lga->weight = 1;
142   lga->payload.gc.gtid = (GlobalTaskId) buf[0];
143   lga->payload.gc.slot = (int) buf[1];
144
145   rga->weight = (unsigned) buf[2];
146   rga->payload.gc.gtid = (GlobalTaskId) buf[3];
147   rga->payload.gc.slot = (int) buf[4];
148
149   *load = (int) buf[5];
150
151   ASSERT(rga->weight > 0);
152 }
153
154 /*
155  * SendResume packs the remote blocking queue's GA and data into a message 
156  * and sends it.
157
158 //@cindex RESUME
159
160    Structure of a RESUME message:
161
162       -------------------------------
163       | weight | slot | n | data ...
164       -------------------------------
165
166    data is a packed graph represented as an rtsPackBuffer
167    n is the size of the graph (as returned by PackNearbyGraph) + packet hdr size
168  */
169
170 //@cindex sendResume
171 void
172 sendResume(globalAddr *rga, int nelem, rtsPackBuffer *packBuffer)
173 {
174   IF_PAR_DEBUG(fetch,
175                belch("~^[] Sending Resume (packet <<%d>> with %d elems) for ((%x, %d, %x)) to [%x]", 
176                      packBuffer->id, nelem,
177                      rga->payload.gc.gtid, rga->payload.gc.slot, rga->weight,
178                      rga->payload.gc.gtid));
179   IF_PAR_DEBUG(packet,
180                PrintPacket(packBuffer));
181
182   ASSERT(nelem==packBuffer->size);
183   /* check for magic end-of-buffer word */
184   IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
185
186   sendOpNV(PP_RESUME, rga->payload.gc.gtid, 
187            nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer, 
188            2, (rtsWeight) rga->weight, (StgWord) rga->payload.gc.slot);
189 }
190
191 /*
192  * unpackResume unpacks a Resume message into two Global addresses and
193  * a data array.
194  */
195
196 //@cindex unpackResume
197 static void
198 unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *packBuffer)
199 {
200     long buf[3];
201
202     GetArgs(buf, 3); 
203
204     /*
205       RESUME event is written in awaken_blocked_queue
206     DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(lga->payload.gc.gtid), 
207                      GR_RESUME, END_TSO_QUEUE, (StgClosure *)NULL, 0, 0);
208     */
209
210     lga->weight = (unsigned) buf[0];
211     lga->payload.gc.gtid = mytid;
212     lga->payload.gc.slot = (int) buf[1];
213
214     *nelem = (int) buf[2] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
215     GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);
216
217     IF_PAR_DEBUG(fetch,
218                  belch("~^[] Unpacking Resume (packet <<%d>> with %d elems) for ((%x, %d, %x))", 
219                        packBuffer->id, *nelem, mytid, (int) buf[1], (unsigned) buf[0]));
220
221     /* check for magic end-of-buffer word */
222     IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
223 }
224
225 /*
226  * SendAck packs the global address being acknowledged, together with
227  * an array of global addresses for any closures shipped and sends them.
228
229 //@cindex ACK
230
231    Structure of an ACK message:
232
233       |        GA 1          |        GA 2          | 
234       +---------------------------------------------+-------
235       | weight | gtid | slot | weight | gtid | slot |  .....  ngas times
236       + --------------------------------------------+------- 
237
238  */
239
240 //@cindex sendAck
241 void
242 sendAck(GlobalTaskId task, int ngas, globalAddr *gagamap)
243 {
244   static long *buffer;
245   long *p;
246   int i;
247
248   if(ngas==0)
249     return; //don't send unnecessary messages!!
250   
251   buffer = (long *) gumPackBuffer;
252
253   for(i = 0, p = buffer; i < ngas; i++, p += 6) {
254     ASSERT(gagamap[1].weight > 0);
255     p[0] = (long) gagamap->weight;
256     p[1] = (long) gagamap->payload.gc.gtid;
257     p[2] = (long) gagamap->payload.gc.slot;
258     gagamap++;
259     p[3] = (long) gagamap->weight;
260     p[4] = (long) gagamap->payload.gc.gtid;
261     p[5] = (long) gagamap->payload.gc.slot;
262     gagamap++;
263   }
264   IF_PAR_DEBUG(schedule,
265                belch("~^,, Sending Ack (%d pairs) to [%x]\n", 
266                      ngas, task));
267
268   sendOpN(PP_ACK, task, p - buffer, (StgPtr)buffer);
269 }
270
271 /*
272  * unpackAck unpacks an Acknowledgement message into a Global address,
273  * a count of the number of global addresses following and a map of 
274  * Global addresses
275  */
276
277 //@cindex unpackAck
278 static void
279 unpackAck(int *ngas, globalAddr *gagamap)
280 {
281   long GAarraysize;
282   long buf[6];
283   
284   GetArgs(&GAarraysize, 1);
285   
286   *ngas = GAarraysize / 6;
287   
288   IF_PAR_DEBUG(schedule,
289                belch("~^,, Unpacking Ack (%d pairs) on [%x]\n", 
290                      *ngas, mytid));
291
292   while (GAarraysize > 0) {
293     GetArgs(buf, 6);
294     gagamap->weight = (rtsWeight) buf[0];
295     gagamap->payload.gc.gtid = (GlobalTaskId) buf[1];
296     gagamap->payload.gc.slot = (int) buf[2];
297     gagamap++;
298     gagamap->weight = (rtsWeight) buf[3];
299     gagamap->payload.gc.gtid = (GlobalTaskId) buf[4];
300     gagamap->payload.gc.slot = (int) buf[5];
301     ASSERT(gagamap->weight > 0);
302     gagamap++;
303     GAarraysize -= 6;
304   }
305 }
306
307 /*
308  * SendFish packs the global address being acknowledged, together with
309  * an array of global addresses for any closures shipped and sends them.
310
311 //@cindex FISH
312
313  Structure of a FISH message:
314
315      +----------------------------------+
316      | orig PE | age | history | hunger |
317      +----------------------------------+
318  */
319
320 //@cindex sendFish
321 void
322 sendFish(GlobalTaskId destPE, GlobalTaskId origPE, 
323          int age, int history, int hunger)
324 {
325   IF_PAR_DEBUG(fish,
326                belch("~^$$ Sending Fish to [%x] (%d outstanding fishes)", 
327                      destPE, outstandingFishes));
328
329   sendOpV(PP_FISH, destPE, 4, 
330           (StgWord) origPE, (StgWord) age, (StgWord) history, (StgWord) hunger);
331
332   if (origPE == mytid) {
333     //fishing = rtsTrue;
334     outstandingFishes++;
335   }
336 }
337
338 /*
339  * unpackFish unpacks a FISH message into the global task id of the
340  * originating PE and 3 data fields: the age, history and hunger of the
341  * fish. The history + hunger are not currently used.
342
343  */
344
345 //@cindex unpackFish
346 static void
347 unpackFish(GlobalTaskId *origPE, int *age, int *history, int *hunger)
348 {
349   long buf[4];
350   
351   GetArgs(buf, 4);
352   
353   IF_PAR_DEBUG(fish,
354                belch("~^$$ Unpacking Fish from [%x] (age=%d)", 
355                      (GlobalTaskId) buf[0], (int) buf[1]));
356
357   *origPE = (GlobalTaskId) buf[0];
358   *age = (int) buf[1];
359   *history = (int) buf[2];
360   *hunger = (int) buf[3];
361 }
362
363 /*
364  * SendFree sends (weight, slot) pairs for GAs that we no longer need
365  * references to.  
366
367 //@cindex FREE
368
369    Structure of a FREE message:
370    
371        +-----------------------------
372        | n | weight_1 | slot_1 | ...
373        +-----------------------------
374  */
375 //@cindex sendFree
376 void
377 sendFree(GlobalTaskId pe, int nelem, StgPtr data)
378 {
379     IF_PAR_DEBUG(free,
380                  belch("~^!! Sending Free (%d GAs) to [%x]", 
381                        nelem/2, pe));
382
383     sendOpN(PP_FREE, pe, nelem, data);
384 }
385
386 /*
387  * unpackFree unpacks a FREE message into the amount of data shipped and
388  * a data block.
389  */
390 //@cindex unpackFree
391 static void
392 unpackFree(int *nelem, StgWord *data)
393 {
394   long buf[1];
395   
396   GetArgs(buf, 1);
397   *nelem = (int) buf[0];
398
399   IF_PAR_DEBUG(free,
400                belch("~^!! Unpacking Free (%d GAs)", 
401                      *nelem/2));
402
403   GetArgs(data, *nelem);
404 }
405
406 /*
407  * SendSchedule sends a closure to be evaluated in response to a Fish
408  * message. The message is directed to the PE that originated the Fish
409  * (origPE), and includes the packed closure (data) along with its size
410  * (nelem).
411
412 //@cindex SCHEDULE
413
414    Structure of a SCHEDULE message:
415
416        +------------------------------------
417        | PE | n | pack buffer of a graph ...
418        +------------------------------------
419  */
420 //@cindex sendSchedule
421 void
422 sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer) 
423 {
424   IF_PAR_DEBUG(schedule,
425                belch("~^-- Sending Schedule (packet <<%d>> with %d elems) to [%x]\n", 
426                      packBuffer->id, nelem, origPE));
427   IF_PAR_DEBUG(packet,
428                PrintPacket(packBuffer));
429
430   ASSERT(nelem==packBuffer->size);
431   /* check for magic end-of-buffer word */
432   IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
433
434   sendOpN(PP_SCHEDULE, origPE, 
435           nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
436 }
437
438 /*
439  * unpackSchedule unpacks a SCHEDULE message into the Global address of
440  * the closure shipped, the amount of data shipped (nelem) and the data
441  * block (data).
442  */
443
444 //@cindex unpackSchedule
445 static void
446 unpackSchedule(int *nelem, rtsPackBuffer *packBuffer)
447 {
448   long buf[1];
449
450   /* first, just unpack 1 word containing the total size (including header) */
451   GetArgs(buf, 1);
452   /* no. of elems, not counting the header of the pack buffer */
453   *nelem = (int) buf[0] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
454
455   /* automatic cast of flat pvm-data to rtsPackBuffer */
456   GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);
457
458   IF_PAR_DEBUG(schedule,
459                belch("~^-- Unpacking Schedule (packet <<%d>> with %d elems) on [%x]\n", 
460                      packBuffer->id, *nelem, mytid));
461
462   ASSERT(*nelem==packBuffer->size);
463   /* check for magic end-of-buffer word */
464   IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
465 }
466
467 #ifdef DIST
468 /* sendReval is almost identical to the Schedule version, so we can unpack with unpackSchedule */
469 void
470 sendReval(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer) 
471 {  
472   IF_PAR_DEBUG(schedule,
473                belch("~^-- Sending Reval (packet <<%d>> with %d elems) to [%x]\n", 
474                      packBuffer->id, nelem, origPE));
475   IF_PAR_DEBUG(packet,
476                PrintPacket(packBuffer));
477
478   ASSERT(nelem==packBuffer->size);
479   /* check for magic end-of-buffer word */
480   IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
481
482   sendOpN(PP_REVAL, origPE, 
483           nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
484 }
485
486 void FinishReval(StgTSO *t)
487 { StgClosure *res;
488   globalAddr ga;
489   nat size;
490   rtsPackBuffer *buffer=NULL;
491   
492   ga.payload.gc.slot = t->revalSlot;
493   ga.payload.gc.gtid = t->revalTid;
494   ga.weight = 0; 
495   
496   //find where the reval result is
497   res = GALAlookup(&ga);
498   ASSERT(res);
499   
500   IF_PAR_DEBUG(schedule,
501     printGA(&ga);
502     belch(" needs the result %08x\n",res));       
503   
504   //send off the result
505   buffer = PackNearbyGraph(res, END_TSO_QUEUE, &size,ga.payload.gc.gtid);
506   ASSERT(buffer != (rtsPackBuffer *)NULL);
507   sendResume(&ga, size, buffer);
508
509   IF_PAR_DEBUG(schedule,
510     belch("@;~) Reval Finished"));
511 }
512
513 #endif DIST
514
515 //@node Message-Processing Functions, GUM Message Processor, GUM Message Sending and Unpacking Functions, High Level Communications Routines
516 //@subsection Message-Processing Functions
517
518 /*
519  * Message-Processing Functions
520  *
521  * The following routines process incoming GUM messages. Often reissuing
522  * messages in response.
523  *
524  * processFish unpacks a fish message, reissuing it if it's our own,
525  * sending work if we have it or sending it onwards otherwise.
526  */
527
528 /*
529  * processFetches constructs and sends resume messages for every
530  * BlockedFetch which is ready to be awakened.
531  * awaken_blocked_queue (in Schedule.c) is responsible for moving 
532  * BlockedFetches from a blocking queue to the PendingFetches queue.
533  */
534 void GetRoots(void);
535 extern StgBlockedFetch *PendingFetches;
536
537 nat
538 pending_fetches_len(void)
539 {
540   StgBlockedFetch *bf;
541   nat n;
542
543   for (n=0, bf=PendingFetches; bf != END_BF_QUEUE; n++, bf = (StgBlockedFetch *)(bf->link)) {
544     ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
545   }
546   return n;
547 }
548
549 //@cindex processFetches
550 void
551 processFetches(void) {
552   StgBlockedFetch *bf, *next;
553   StgClosure *closure;
554   StgInfoTable *ip;
555   globalAddr rga;
556   static rtsPackBuffer *packBuffer;
557     
558   IF_PAR_DEBUG(verbose,
559                belch("____ processFetches: %d pending fetches (root @ %p)",
560                      pending_fetches_len(), PendingFetches));
561   
562   for (bf = PendingFetches; 
563        bf != END_BF_QUEUE;
564        bf=next) {
565     /* the PendingFetches list contains only BLOCKED_FETCH closures */
566     ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
567     /* store link (we might overwrite it via blockFetch later on */
568     next = (StgBlockedFetch *)(bf->link);
569
570     /*
571      * Find the target at the end of the indirection chain, and
572      * process it in much the same fashion as the original target
573      * of the fetch.  Though we hope to find graph here, we could
574      * find a black hole (of any flavor) or even a FetchMe.
575      */
576     closure = bf->node;
577     /*
578       We evacuate BQs and update the node fields where necessary in GC.c
579       So, if we find an EVACUATED closure, something has gone Very Wrong
580       (and therefore we let the RTS crash most ungracefully).
581     */
582     ASSERT(get_itbl(closure)->type != EVACUATED);
583       //  closure = ((StgEvacuated *)closure)->evacuee;
584
585     closure = UNWIND_IND(closure);
586     //while ((ind = IS_INDIRECTION(closure)) != NULL) { closure = ind; }
587
588     ip = get_itbl(closure);
589     if (ip->type == FETCH_ME) {
590       /* Forward the Fetch to someone else */
591       rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
592       rga.payload.gc.slot = bf->ga.payload.gc.slot;
593       rga.weight = bf->ga.weight;
594       
595       sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */);
596
597       // Global statistics: count no. of fetches
598       if (RtsFlags.ParFlags.ParStats.Global &&
599           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
600         globalParStats.tot_fetch_mess++;
601       }
602
603       IF_PAR_DEBUG(fetch,
604                    belch("__-> processFetches: Forwarding fetch from %lx to %lx",
605                          mytid, rga.payload.gc.gtid));
606
607     } else if (IS_BLACK_HOLE(closure)) {
608       IF_PAR_DEBUG(verbose,
609                    belch("__++ processFetches: trying to send a BLACK_HOLE => doing a blockFetch on closure %p (%s)",
610                          closure, info_type(closure)));
611       bf->node = closure;
612       blockFetch(bf, closure);
613     } else {
614       /* We now have some local graph to send back */
615       nat size;
616
617       packBuffer = gumPackBuffer;
618       IF_PAR_DEBUG(verbose,
619                    belch("__*> processFetches: PackNearbyGraph of closure %p (%s)",
620                          closure, info_type(closure)));
621
622       if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid)) == NULL) {
623         // Put current BF back on list
624         bf->link = (StgBlockingQueueElement *)PendingFetches;
625         PendingFetches = (StgBlockedFetch *)bf;
626         // ToDo: check that nothing more has to be done to prepare for GC!
627         barf("processFetches: out of heap while packing graph; ToDo: call GC here");
628         GarbageCollect(GetRoots, rtsFalse); 
629         bf = PendingFetches;
630         PendingFetches = (StgBlockedFetch *)(bf->link);
631         closure = bf->node;
632         packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid);
633         ASSERT(packBuffer != (rtsPackBuffer *)NULL);
634       }
635       rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
636       rga.payload.gc.slot = bf->ga.payload.gc.slot;
637       rga.weight = bf->ga.weight;
638       
639       sendResume(&rga, size, packBuffer);
640
641       // Global statistics: count no. of fetches
642       if (RtsFlags.ParFlags.ParStats.Global &&
643           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
644         globalParStats.tot_resume_mess++;
645       }
646     }
647   }
648   PendingFetches = END_BF_QUEUE;
649 }
650
651 #if 0
652 /*
653   Alternatively to sending fetch messages directly from the FETCH_ME_entry
654   code we could just store the data about the remote data in a global
655   variable and send the fetch request from the main scheduling loop (similar
656   to processFetches above). This would save an expensive STGCALL in the entry 
657   code because we have to go back to the scheduler anyway.
658 */
659 //@cindex processFetches
660 void
661 processTheRealFetches(void) {
662   StgBlockedFetch *bf;
663   StgClosure *closure, *next;
664     
665   IF_PAR_DEBUG(verbose,
666                belch("__ processTheRealFetches: ");
667                printGA(&theGlobalFromGA);
668                printGA(&theGlobalToGA));
669
670   ASSERT(theGlobalFromGA.payload.gc.gtid != 0 &&
671          theGlobalToGA.payload.gc.gtid != 0);
672
673   /* the old version did this in the FETCH_ME entry code */
674   sendFetch(&theGlobalFromGA, &theGlobalToGA, 0/*load*/);
675   
676 }
677 #endif
678
679
680 /* 
681    Way of dealing with unwanted fish.
682    Used during startup/shutdown, or from unknown PEs 
683 */
684 void
685 bounceFish(void) { 
686   GlobalTaskId origPE;
687   int age, history, hunger;
688   
689   /* IF_PAR_DEBUG(verbose, */
690                belch(".... [%x] Bouncing unwanted FISH",mytid);
691
692   unpackFish(&origPE, &age, &history, &hunger);
693           
694   if (origPE == mytid) {
695     //fishing = rtsFalse;                   // fish has come home
696     outstandingFishes--;
697     last_fish_arrived_at = CURRENT_TIME;  // remember time (see schedule fct)
698     return;                               // that's all
699   }
700
701   /* otherwise, send it home to die */
702   sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
703   // Global statistics: count no. of fetches
704       if (RtsFlags.ParFlags.ParStats.Global &&
705           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
706         globalParStats.tot_fish_mess++;
707       }
708 }
709    
710 /*
711  * processFish unpacks a fish message, reissuing it if it's our own,
712  * sending work if we have it or sending it onwards otherwise.
713  */
714 //@cindex processFish
715 static void
716 processFish(void)
717 {
718   GlobalTaskId origPE;
719   int age, history, hunger;
720   rtsSpark spark;
721   static rtsPackBuffer *packBuffer; 
722
723   unpackFish(&origPE, &age, &history, &hunger);
724
725   if (origPE == mytid) {
726     //fishing = rtsFalse;                   // fish has come home
727     outstandingFishes--;
728     last_fish_arrived_at = CURRENT_TIME;  // remember time (see schedule fct)
729     return;                               // that's all
730   }
731
732   ASSERT(origPE != mytid);
733   IF_PAR_DEBUG(fish,
734                belch("$$__ processing fish; %d sparks available",
735                      spark_queue_len(&(MainRegTable.rSparks))));
736   while ((spark = findSpark(rtsTrue/*for_export*/)) != NULL) {
737     nat size;
738     // StgClosure *graph;
739
740     packBuffer = gumPackBuffer; 
741     ASSERT(closure_SHOULD_SPARK((StgClosure *)spark));
742     if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size,origPE)) == NULL) {
743       IF_PAR_DEBUG(fish,
744                    belch("$$ GC while trying to satisfy FISH via PackNearbyGraph of node %p",
745                          (StgClosure *)spark));
746       barf("processFish: out of heap while packing graph; ToDo: call GC here");
747       GarbageCollect(GetRoots, rtsFalse);
748       /* Now go back and try again */
749     } else {
750       IF_PAR_DEBUG(verbose,
751                    if (RtsFlags.ParFlags.ParStats.Sparks)
752                      belch("==== STEALING spark %x; sending to %x", spark, origPE));
753       
754       IF_PAR_DEBUG(fish,
755                    belch("$$-- Replying to FISH from %x by sending graph @ %p (%s)",
756                          origPE, 
757                          (StgClosure *)spark, info_type((StgClosure *)spark)));
758       sendSchedule(origPE, size, packBuffer);
759       disposeSpark(spark);
760       // Global statistics: count no. of fetches
761       if (RtsFlags.ParFlags.ParStats.Global &&
762           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
763         globalParStats.tot_schedule_mess++;
764       }
765
766       break;
767     }
768   }
769   if (spark == (rtsSpark)NULL) {
770     IF_PAR_DEBUG(fish,
771                  belch("$$^^ No sparks available for FISH from %x",
772                        origPE));
773     /* We have no sparks to give */
774     if (age < FISH_LIFE_EXPECTANCY) {
775       /* and the fish is atill young, send it to another PE to look for work */
776       sendFish(choosePE(), origPE,
777                (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
778
779       // Global statistics: count no. of fetches
780       if (RtsFlags.ParFlags.ParStats.Global &&
781           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
782         globalParStats.tot_fish_mess++;
783       }
784     } else { /* otherwise, send it home to die */
785       sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
786       // Global statistics: count no. of fetches
787       if (RtsFlags.ParFlags.ParStats.Global &&
788           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
789         globalParStats.tot_fish_mess++;
790       }
791     }
792   }
793 }  /* processFish */
794
795 /*
796  * processFetch either returns the requested data (if available) 
797  * or blocks the remote blocking queue on a black hole (if not).
798  */
799
800 //@cindex processFetch
801 static void
802 processFetch(void)
803 {
804   globalAddr ga, rga;
805   int load;
806   StgClosure *closure;
807   StgInfoTable *ip;
808
809   unpackFetch(&ga, &rga, &load);
810   IF_PAR_DEBUG(fetch,
811                belch("%%%%__ Rcvd Fetch for ((%x, %d, 0)), Resume ((%x, %d, %x)) (load %d) from %x",
812                      ga.payload.gc.gtid, ga.payload.gc.slot,
813                      rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, load,
814                      rga.payload.gc.gtid));
815
816   closure = GALAlookup(&ga);
817   ASSERT(closure != (StgClosure *)NULL);
818   ip = get_itbl(closure);
819   if (ip->type == FETCH_ME) {
820     /* Forward the Fetch to someone else */
821     sendFetch(((StgFetchMe *)closure)->ga, &rga, load);
822
823     // Global statistics: count no. of fetches
824     if (RtsFlags.ParFlags.ParStats.Global &&
825         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
826       globalParStats.tot_fetch_mess++;
827     }
828   } else if (rga.payload.gc.gtid == mytid) {
829     /* Our own FETCH forwarded back around to us */
830     StgFetchMeBlockingQueue *fmbq = (StgFetchMeBlockingQueue *)GALAlookup(&rga);
831     
832     IF_PAR_DEBUG(fetch,
833                  belch("%%%%== Fetch returned to sending PE; closure=%p (%s); receiver=%p (%s)",
834                        closure, info_type(closure), fmbq, info_type((StgClosure*)fmbq)));
835     /* We may have already discovered that the fetch target is our own. */
836     if ((StgClosure *)fmbq != closure) 
837       CommonUp((StgClosure *)fmbq, closure);
838     (void) addWeight(&rga);
839   } else if (IS_BLACK_HOLE(closure)) {
840     /* This includes RBH's and FMBQ's */
841     StgBlockedFetch *bf;
842
843     /* Can we assert something on the remote GA? */
844     ASSERT(GALAlookup(&rga) == NULL);
845
846     /* If we're hitting a BH or RBH or FMBQ we have to put a BLOCKED_FETCH
847        closure into the BQ in order to denote that when updating this node
848        the result should be sent to the originator of this fetch message. */
849     bf = (StgBlockedFetch *)createBlockedFetch(ga, rga);
850     IF_PAR_DEBUG(fetch,
851                  belch("%%++ Blocking Fetch ((%x, %d, %x)) on %p (%s)",
852                        rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, 
853                        closure, info_type(closure)));
854     blockFetch(bf, closure);
855   } else {                      
856     /* The target of the FetchMe is some local graph */
857     nat size;
858     // StgClosure *graph;
859     rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;
860
861     if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid)) == NULL) {
862       barf("processFetch: out of heap while packing graph; ToDo: call GC here");
863       GarbageCollect(GetRoots, rtsFalse); 
864       closure = GALAlookup(&ga);
865       buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid);
866       ASSERT(buffer != (rtsPackBuffer *)NULL);
867     }
868     sendResume(&rga, size, buffer);
869
870     // Global statistics: count no. of fetches
871     if (RtsFlags.ParFlags.ParStats.Global &&
872         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
873       globalParStats.tot_resume_mess++;
874     }
875   }
876 }
877
878 /* 
879    The list of pending fetches must be a root-list for GC.
880    This routine is called from GC.c (same as marking GAs etc).
881 */
882 void
883 markPendingFetches(rtsBool major_gc) {
884
885   /* No need to traverse the list; this is done via the scavenge code
886      for a BLOCKED_FETCH closure, which evacuates the link field */
887
888   if (PendingFetches != END_BF_QUEUE ) {
889     IF_PAR_DEBUG(tables,
890                  fprintf(stderr, "@@@@ PendingFetches is root; evaced from %p to",
891                          PendingFetches));
892
893     PendingFetches = MarkRoot((StgClosure*)PendingFetches);
894
895     IF_PAR_DEBUG(verbose,
896                  fprintf(stderr, " %p\n", PendingFetches));
897
898   } else {
899     IF_PAR_DEBUG(tables,
900                  fprintf(stderr, "@@@@ PendingFetches is empty; no need to mark it\n"));
901   }
902 }
903
904 /*
905  * processFree unpacks a FREE message and adds the weights to our GAs.
906  */
907 //@cindex processFree
908 static void
909 processFree(void)
910 {
911   int nelem;
912   static StgWord *buffer;
913   int i;
914   globalAddr ga;
915
916   buffer = (StgWord *)gumPackBuffer;
917   unpackFree(&nelem, buffer);
918   IF_PAR_DEBUG(free,
919                belch("!!__ Rcvd Free (%d GAs)", nelem / 2));
920
921   ga.payload.gc.gtid = mytid;
922   for (i = 0; i < nelem;) {
923     ga.weight = (rtsWeight) buffer[i++];
924     ga.payload.gc.slot = (int) buffer[i++];
925     IF_PAR_DEBUG(free,
926                  fprintf(stderr, "!!-- Processing free "); 
927                  printGA(&ga);
928                  fputc('\n', stderr);
929                  );
930     (void) addWeight(&ga);
931   }
932 }
933
934 /*
935  * processResume unpacks a RESUME message into the graph, filling in
936  * the LA -> GA, and GA -> LA tables. Threads blocked on the original
937  * FetchMe (now a blocking queue) are awakened, and the blocking queue
938  * is converted into an indirection.  Finally it sends an ACK in response
939  * which contains any newly allocated GAs.
940  */
941
942 //@cindex processResume
943 static void
944 processResume(GlobalTaskId sender)
945 {
946   int nelem;
947   nat nGAs;
948   static rtsPackBuffer *packBuffer;
949   StgClosure *newGraph, *old;
950   globalAddr lga;
951   globalAddr *gagamap;
952   
953   packBuffer = (rtsPackBuffer *)gumPackBuffer;
954   unpackResume(&lga, &nelem, packBuffer);
955
956   IF_PAR_DEBUG(fetch,
957                fprintf(stderr, "[]__ Rcvd Resume for "); 
958                printGA(&lga);
959                fputc('\n', stderr));
960   IF_PAR_DEBUG(packet,
961                PrintPacket((rtsPackBuffer *)packBuffer));
962   
963   /* 
964    * We always unpack the incoming graph, even if we've received the
965    * requested node in some other data packet (and already awakened
966    * the blocking queue).
967   if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
968     ReallyPerformThreadGC(packBuffer[0], rtsFalse);
969     SAVE_Hp -= packBuffer[0];
970   }
971    */
972
973   // ToDo: Check for GC here !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
974
975   /* Do this *after* GC; we don't want to release the object early! */
976
977   if (lga.weight > 0)
978     (void) addWeight(&lga);
979
980   old = GALAlookup(&lga);
981
982   /* ToDo:  The closure that requested this graph must be one of these two?*/
983   ASSERT(get_itbl(old)->type == FETCH_ME_BQ || 
984          get_itbl(old)->type == RBH);
985
986   if (RtsFlags.ParFlags.ParStats.Full) {
987     StgBlockingQueueElement *bqe, *last_bqe;
988
989     IF_PAR_DEBUG(fetch,
990                  belch("[]-- Resume is REPLY to closure %lx", old));
991
992     /* Write REPLY events to the log file, indicating that the remote
993        data has arrived 
994        NB: we emit a REPLY only for the *last* elem in the queue; this is
995            the one that triggered the fetch message; all other entries
996            have just added themselves to the queue, waiting for the data 
997            they know that has been requested (see entry code for FETCH_ME_BQ)
998     */
999     if ((get_itbl(old)->type == FETCH_ME_BQ ||
1000          get_itbl(old)->type == RBH)) {
1001       for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue,
1002            last_bqe = END_BQ_QUEUE;
1003              get_itbl(bqe)->type==TSO || 
1004              get_itbl(bqe)->type==BLOCKED_FETCH;
1005            last_bqe = bqe, bqe = bqe->link) { /* nothing */ }
1006
1007       ASSERT(last_bqe==END_BQ_QUEUE || 
1008              get_itbl((StgClosure *)last_bqe)->type == TSO);
1009
1010       /* last_bqe now points to the TSO that triggered the FETCH */ 
1011       if (get_itbl((StgClosure *)last_bqe)->type == TSO)
1012         DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(sender), 
1013                          GR_REPLY, ((StgTSO *)last_bqe), ((StgTSO *)last_bqe)->block_info.closure,
1014                          0, spark_queue_len(&(MainRegTable.rSparks)));
1015     }
1016   }
1017
1018   newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
1019   ASSERT(newGraph != NULL);
1020
1021   /* 
1022    * Sometimes, unpacking will common up the resumee with the
1023    * incoming graph, but if it hasn't, we'd better do so now.
1024    */
1025    
1026   if (get_itbl(old)->type == FETCH_ME_BQ)
1027     CommonUp(old, newGraph);
1028
1029   IF_PAR_DEBUG(fetch,
1030                belch("[]-- Ready to resume unpacked graph at %p (%s)",
1031                      newGraph, info_type(newGraph)));
1032
1033   IF_PAR_DEBUG(tables,
1034                DebugPrintGAGAMap(gagamap, nGAs));
1035   
1036   sendAck(sender, nGAs, gagamap);
1037 }
1038
1039 /*
1040  * processSchedule unpacks a SCHEDULE message into the graph, filling
1041  * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
1042  * the local spark queue.  Finally it sends an ACK in response
1043  * which contains any newly allocated GAs.
1044  */
1045 //@cindex processSchedule
1046 static void
1047 processSchedule(GlobalTaskId sender)
1048 {
1049   nat nelem, nGAs;
1050   rtsBool success;
1051   static rtsPackBuffer *packBuffer;
1052   StgClosure *newGraph;
1053   globalAddr *gagamap;
1054   
1055   packBuffer = gumPackBuffer;           /* HWL */
1056   unpackSchedule(&nelem, packBuffer);
1057
1058   IF_PAR_DEBUG(schedule,
1059                belch("--__ Rcvd Schedule (%d elems)", nelem));
1060   IF_PAR_DEBUG(packet,
1061                PrintPacket(packBuffer));
1062
1063   /*
1064    * For now, the graph is a closure to be sparked as an advisory
1065    * spark, but in future it may be a complete spark with
1066    * required/advisory status, priority etc.
1067    */
1068
1069   /*
1070   space_required = packBuffer[0];
1071   if (SAVE_Hp + space_required >= SAVE_HpLim) {
1072     ReallyPerformThreadGC(space_required, rtsFalse);
1073     SAVE_Hp -= space_required;
1074   }
1075   */
1076   // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
1077   newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
1078   ASSERT(newGraph != NULL);
1079   success = add_to_spark_queue(newGraph, &(MainRegTable.rSparks));
1080
1081   if (RtsFlags.ParFlags.ParStats.Full && 
1082       RtsFlags.ParFlags.ParStats.Sparks && 
1083       success) 
1084     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
1085                      GR_STOLEN, ((StgTSO *)NULL), newGraph, 
1086                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
1087
1088   IF_PAR_DEBUG(schedule,
1089                if (success)
1090                  belch("--^^  added spark to unpacked graph %p (%s); %d sparks available on [%x] (%s)", 
1091                      newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid);
1092                else
1093                  belch("--^^  received non-sparkable closure %p (%s); nothing added to spark pool; %d sparks available on [%x]", 
1094                      newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid));
1095   IF_PAR_DEBUG(packet,
1096                belch("*<    Unpacked graph with root at %p (%s):", 
1097                      newGraph, info_type(newGraph));
1098                PrintGraph(newGraph, 0));
1099
1100   IF_PAR_DEBUG(tables,
1101                DebugPrintGAGAMap(gagamap, nGAs));
1102
1103   sendAck(sender, nGAs, gagamap);
1104
1105   //fishing = rtsFalse;
1106   ASSERT(outstandingFishes>0);
1107   outstandingFishes--;
1108 }
1109
1110 /*
1111  * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
1112  * (which represent shared thunks that have been shipped) into fetch-mes
1113  * to remote GAs.
1114  */
1115 //@cindex processAck
1116 static void
1117 processAck(void)
1118 {
1119   nat nGAs;
1120   globalAddr *gaga;
1121   globalAddr gagamap[256]; // ToDo: elim magic constant!!   MAX_GAS * 2];??
1122
1123   unpackAck(&nGAs, gagamap);
1124
1125   IF_PAR_DEBUG(tables,
1126                belch(",,,, Rcvd Ack (%d pairs)", nGAs);
1127                DebugPrintGAGAMap(gagamap, nGAs));
1128
1129   IF_DEBUG(sanity,
1130            checkGAGAMap(gagamap, nGAs));
1131
1132   /*
1133    * For each (oldGA, newGA) pair, set the GA of the corresponding
1134    * thunk to the newGA, convert the thunk to a FetchMe, and return
1135    * the weight from the oldGA.
1136    */
1137   for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
1138     StgClosure *old_closure = GALAlookup(gaga);
1139     StgClosure *new_closure = GALAlookup(gaga + 1);
1140
1141     ASSERT(old_closure != NULL);
1142     if (new_closure == NULL) {
1143       /* We don't have this closure, so we make a fetchme for it */
1144       globalAddr *ga = setRemoteGA(old_closure, gaga + 1, rtsTrue);
1145       
1146       /* convertToFetchMe should be done unconditionally here.
1147          Currently, we assign GAs to CONSTRs, too, (a bit of a hack),
1148          so we have to check whether it is an RBH before converting
1149
1150          ASSERT(get_itbl(old_closure)==RBH);
1151       */
1152       if (get_itbl(old_closure)->type==RBH)
1153         convertToFetchMe((StgRBH *)old_closure, ga);
1154     } else {
1155       /* 
1156        * Oops...we've got this one already; update the RBH to
1157        * point to the object we already know about, whatever it
1158        * happens to be.
1159        */
1160       CommonUp(old_closure, new_closure);
1161       
1162       /* 
1163        * Increase the weight of the object by the amount just
1164        * received in the second part of the ACK pair.
1165        */
1166       (void) addWeight(gaga + 1);
1167     }
1168     (void) addWeight(gaga);
1169   }
1170
1171   /* check the sanity of the LAGA and GALA tables after mincing them */
1172   IF_DEBUG(sanity, checkLAGAtable(rtsFalse));
1173 }
1174
1175 #ifdef DIST
1176
1177 void
1178 bounceReval(void) {  
1179   barf("Task %x: TODO: should send NACK in response to REVAL",mytid);     
1180 }
1181
1182 static void
1183 processReval(GlobalTaskId sender) //similar to schedule...
1184 { nat nelem, space_required, nGAs;
1185   static rtsPackBuffer *packBuffer;
1186   StgClosure *newGraph;
1187   globalAddr *gagamap;
1188   StgTSO*     tso;
1189   globalAddr *ga;
1190   
1191   packBuffer = gumPackBuffer;           /* HWL */
1192   unpackSchedule(&nelem, packBuffer); /* okay, since the structure is the same */
1193
1194   IF_PAR_DEBUG(packet,
1195                belch("@;~) [%x] Rcvd Reval (%d elems)", mytid, nelem);
1196                PrintPacket(packBuffer));
1197
1198   /*
1199   space_required = packBuffer[0];
1200   if (SAVE_Hp + space_required >= SAVE_HpLim) {
1201     ReallyPerformThreadGC(space_required, rtsFalse);
1202     SAVE_Hp -= space_required;
1203   }
1204   */
1205   
1206   // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
1207   newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
1208   ASSERT(newGraph != NULL);
1209   
1210   IF_PAR_DEBUG(packet,
1211                belch("@;~)  Unpacked graph with root at %p (%s):", 
1212                      newGraph, info_type(newGraph));
1213                PrintGraph(newGraph, 0));
1214
1215   IF_PAR_DEBUG(tables,
1216                DebugPrintGAGAMap(gagamap, nGAs));
1217
1218   IF_PAR_DEBUG(tables, 
1219     printLAGAtable();   
1220     DebugPrintGAGAMap(gagamap, nGAs));   
1221
1222   //We don't send an Ack to the head!!!!
1223   ASSERT(nGAs>0);  
1224   sendAck(sender, nGAs-1, gagamap+2);
1225   
1226   IF_PAR_DEBUG(verbose,
1227                belch("@;~)  About to create Reval thread on behalf of %x", 
1228                      sender));
1229   
1230   tso=createGenThread(RtsFlags.GcFlags.initialStkSize,newGraph);
1231   tso->priority=RevalPriority;
1232   tso->revalSlot=gagamap->payload.gc.slot;//record who sent the reval
1233   tso->revalTid =gagamap->payload.gc.gtid;
1234   scheduleThread(tso);
1235   context_switch = 1; // switch at the earliest opportunity
1236
1237 #endif
1238
1239
1240 //@node GUM Message Processor, Miscellaneous Functions, Message-Processing Functions, High Level Communications Routines
1241 //@subsection GUM Message Processor
1242
1243 /*
1244  * GUM Message Processor
1245
1246  * processMessages processes any messages that have arrived, calling
1247  * appropriate routines depending on the message tag
1248  * (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
1249  * present and performs a blocking receive! During profiling it
1250  * busy-waits in order to record idle time.
1251  */
1252
1253 //@cindex processMessages
1254 rtsBool
1255 processMessages(void)
1256 {
1257   rtsPacket packet;
1258   OpCode opcode;
1259   GlobalTaskId task;
1260   rtsBool receivedFinish = rtsFalse;
1261
1262   do {
1263     packet = GetPacket();  /* Get next message; block until one available */
1264     getOpcodeAndSender(packet, &opcode, &task);
1265
1266     if (task==SysManTask) { 
1267       switch (opcode) { 
1268       case PP_PETIDS:
1269         processPEtids();
1270         break;
1271           
1272       case PP_FINISH:
1273         IF_PAR_DEBUG(verbose,
1274                      belch("==== received FINISH [%p]", mytid));
1275         /* this boolean value is returned and propagated to the main 
1276            scheduling loop, thus shutting-down this PE */
1277         receivedFinish = rtsTrue;
1278         break;  
1279           
1280       default:  
1281         barf("Task %x: received unknown opcode %x from SysMan",mytid, opcode);
1282       }
1283     } else if (taskIDtoPE(task)==0) { 
1284       /* When a new PE joins then potentially FISH & REVAL message may
1285          reach PES before they are notified of the new PEs existance.  The
1286          only solution is to bounce/fail these messages back to the sender.
1287          But we will worry about it once we start seeing these race
1288          conditions!  */
1289       switch (opcode) { 
1290       case PP_FISH:
1291         bounceFish();
1292         break;
1293 #ifdef DIST       
1294       case PP_REVAL:
1295         bounceReval();
1296         break;    
1297 #endif          
1298       case PP_PETIDS:
1299         belch("Task %x: Ignoring PVM session opened by another SysMan %x",mytid,task);
1300         break;
1301         
1302       case PP_FINISH:   
1303         break;
1304         
1305       default:  
1306         belch("Task %x: Ignoring opcode %x from unknown PE %x",mytid, opcode, task);
1307       }
1308     } else
1309       switch (opcode) {
1310       case PP_FETCH:
1311         processFetch();
1312         // Global statistics: count no. of fetches
1313         if (RtsFlags.ParFlags.ParStats.Global &&
1314             RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1315           globalParStats.rec_fetch_mess++;
1316         }
1317         break;
1318
1319       case PP_RESUME:
1320         processResume(task);
1321         // Global statistics: count no. of fetches
1322         if (RtsFlags.ParFlags.ParStats.Global &&
1323             RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1324           globalParStats.rec_resume_mess++;
1325         }
1326         break;
1327
1328       case PP_ACK:
1329         processAck();
1330         break;
1331
1332       case PP_FISH:
1333         processFish();
1334         // Global statistics: count no. of fetches
1335         if (RtsFlags.ParFlags.ParStats.Global &&
1336             RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1337           globalParStats.rec_fish_mess++;
1338         }
1339         break;
1340
1341       case PP_FREE:
1342         processFree();
1343         break;
1344       
1345       case PP_SCHEDULE:
1346         processSchedule(task);
1347         // Global statistics: count no. of fetches
1348         if (RtsFlags.ParFlags.ParStats.Global &&
1349             RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1350           globalParStats.rec_schedule_mess++;
1351         }
1352         break;
1353       
1354 #ifdef DIST      
1355       case PP_REVAL:
1356         processReval(task);
1357         // Global statistics: count no. of fetches
1358         if (RtsFlags.ParFlags.ParStats.Global &&
1359             RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1360           globalParStats.rec_reval_mess++;
1361         }
1362         break;
1363 #endif
1364       
1365       default:
1366         /* Anything we're not prepared to deal with. */
1367         barf("Task %x: Unexpected opcode %x from %x",
1368              mytid, opcode, task);
1369       } /* switch */
1370
1371   } while (PacketsWaiting());   /* While there are messages: process them */
1372   return receivedFinish;
1373 }                               /* processMessages */
1374
1375 //@node Miscellaneous Functions, Index, GUM Message Processor, High Level Communications Routines
1376 //@subsection Miscellaneous Functions
1377
1378 /*
1379  * blockFetch blocks a BlockedFetch node on some kind of black hole.
1380  */
1381 //@cindex blockFetch
1382 void
1383 blockFetch(StgBlockedFetch *bf, StgClosure *bh) {
1384   bf->node = bh;
1385   switch (get_itbl(bh)->type) {
1386   case BLACKHOLE:
1387     bf->link = END_BQ_QUEUE;
1388     //((StgBlockingQueue *)bh)->header.info = &stg_BLACKHOLE_BQ_info;
1389     SET_INFO(bh, &stg_BLACKHOLE_BQ_info); // turn closure into a blocking queue
1390     ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1391     
1392     // put bh on the mutables list
1393     recordMutable((StgMutClosure *)bh);
1394     break;
1395     
1396   case BLACKHOLE_BQ:
1397     /* enqueue bf on blocking queue of closure bh */
1398     bf->link = ((StgBlockingQueue *)bh)->blocking_queue;
1399     ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1400
1401     // put bh on the mutables list; ToDo: check
1402     recordMutable((StgMutClosure *)bh);
1403     break;
1404
1405   case FETCH_ME_BQ:
1406     /* enqueue bf on blocking queue of closure bh */
1407     bf->link = ((StgFetchMeBlockingQueue *)bh)->blocking_queue;
1408     ((StgFetchMeBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1409
1410     // put bh on the mutables list; ToDo: check
1411     recordMutable((StgMutClosure *)bh);
1412     break;
1413     
1414   case RBH:
1415     /* enqueue bf on blocking queue of closure bh */
1416     bf->link = ((StgRBH *)bh)->blocking_queue;
1417     ((StgRBH *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1418
1419     // put bh on the mutables list; ToDo: check
1420     recordMutable((StgMutClosure *)bh);
1421     break;
1422     
1423   default:
1424     barf("blockFetch: thought %p was a black hole (IP %#lx, %s)",
1425          (StgClosure *)bh, get_itbl((StgClosure *)bh), 
1426          info_type((StgClosure *)bh));
1427   }
1428   IF_PAR_DEBUG(bq,
1429                belch("##++ blockFetch: after block the BQ of %p (%s) is:",
1430                      bh, info_type(bh));
1431                print_bq(bh));
1432 }
1433
1434
1435 /*
1436   @blockThread@ is called from the main scheduler whenever tso returns with
1437   a ThreadBlocked return code; tso has already been added to a blocking
1438   queue (that's done in the entry code of the closure, because it is a 
1439   cheap operation we have to do in any case); the main purpose of this
1440   routine is to send a Fetch message in case we are blocking on a FETCHME(_BQ)
1441   closure, which is indicated by the tso.why_blocked field;
1442   we also write an entry into the log file if we are generating one
1443
1444   Should update exectime etc in the entry code already; but we don't have
1445   something like ``system time'' in the log file anyway, so this should
1446   even out the inaccuracies.
1447 */
1448
1449 //@cindex blockThread
1450 void
1451 blockThread(StgTSO *tso)
1452 {
1453   globalAddr *remote_ga=NULL;
1454   globalAddr *local_ga;
1455   globalAddr fmbq_ga;
1456
1457   // ASSERT(we are on some blocking queue)
1458   ASSERT(tso->block_info.closure != (StgClosure *)NULL);
1459
1460   /*
1461     We have to check why this thread has been blocked.
1462   */
1463   switch (tso->why_blocked) {
1464     case BlockedOnGA:
1465       /* the closure must be a FETCH_ME_BQ; tso came in here via 
1466          FETCH_ME entry code */
1467       ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);
1468
1469       /* HACK: the link field is used to hold the GA between FETCH_ME_entry
1470          end this point; if something (eg. GC) happens inbetween the whole
1471          thing will blow up 
1472          The problem is that the ga field of the FETCH_ME has been overwritten
1473          with the head of the blocking queue (which is tso). 
1474       */
1475       ASSERT(looks_like_ga(&theGlobalFromGA));
1476       // ASSERT(tso->link!=END_TSO_QUEUE && tso->link!=NULL);
1477       remote_ga = &theGlobalFromGA; //tso->link;
1478       tso->link = (StgTSO*)END_BQ_QUEUE;
1479       /* it was tso which turned node from FETCH_ME into FETCH_ME_BQ =>
1480          we have to send a Fetch message here! */
1481       if (RtsFlags.ParFlags.ParStats.Full) {
1482         /* Note that CURRENT_TIME may perform an unsafe call */
1483         tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1484         tso->par.fetchcount++;
1485         tso->par.blockedat = CURRENT_TIME;
1486         /* we are about to send off a FETCH message, so dump a FETCH event */
1487         DumpRawGranEvent(CURRENT_PROC, 
1488                          taskIDtoPE(remote_ga->payload.gc.gtid),
1489                          GR_FETCH, tso, tso->block_info.closure, 0, 0);
1490       }
1491       /* Phil T. claims that this was a workaround for a hard-to-find
1492        * bug, hence I'm leaving it out for now --SDM 
1493        */
1494       /* Assign a brand-new global address to the newly created FMBQ  */
1495       local_ga = makeGlobal(tso->block_info.closure, rtsFalse);
1496       splitWeight(&fmbq_ga, local_ga);
1497       ASSERT(fmbq_ga.weight == 1U << (BITS_IN(unsigned) - 1));
1498       
1499       sendFetch(remote_ga, &fmbq_ga, 0/*load*/);
1500
1501       // Global statistics: count no. of fetches
1502       if (RtsFlags.ParFlags.ParStats.Global &&
1503           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1504         globalParStats.tot_fetch_mess++;
1505       }
1506
1507       IF_DEBUG(sanity,
1508                theGlobalFromGA.payload.gc.gtid = (GlobalTaskId)0);
1509       break;
1510
1511     case BlockedOnGA_NoSend:
1512       /* the closure must be a FETCH_ME_BQ; tso came in here via 
1513          FETCH_ME_BQ entry code */
1514       ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);
1515
1516       /* Fetch message has been sent already */
1517       if (RtsFlags.ParFlags.ParStats.Full) {
1518         /* Note that CURRENT_TIME may perform an unsafe call */
1519         tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1520         tso->par.blockcount++;
1521         tso->par.blockedat = CURRENT_TIME;
1522         /* dump a block event, because fetch has been sent already */
1523         DumpRawGranEvent(CURRENT_PROC, thisPE,
1524                          GR_BLOCK, tso, tso->block_info.closure, 0, 0);
1525       }
1526       break;
1527
1528     case BlockedOnMVar:
1529     case BlockedOnBlackHole:
1530       /* the closure must be a BLACKHOLE_BQ or an RBH; tso came in here via 
1531          BLACKHOLE(_BQ) or CAF_BLACKHOLE or RBH entry code */
1532       ASSERT(get_itbl(tso->block_info.closure)->type==MVAR ||
1533              get_itbl(tso->block_info.closure)->type==BLACKHOLE_BQ ||
1534              get_itbl(tso->block_info.closure)->type==RBH);
1535
1536       /* if collecting stats update the execution time etc */
1537       if (RtsFlags.ParFlags.ParStats.Full) {
1538         /* Note that CURRENT_TIME may perform an unsafe call */
1539         tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1540         tso->par.blockcount++;
1541         tso->par.blockedat = CURRENT_TIME;
1542         DumpRawGranEvent(CURRENT_PROC, thisPE,
1543                          GR_BLOCK, tso, tso->block_info.closure, 0, 0);
1544       }
1545       break;
1546
1547     case BlockedOnDelay:
1548       /* Whats sort of stats shall we collect for an explicit threadDelay? */
1549       IF_PAR_DEBUG(verbose,
1550                belch("##++ blockThread: TSO %d blocked on ThreadDelay",
1551                      tso->id));
1552       break;
1553
1554     /* Check that the following is impossible to happen, indeed
1555     case BlockedOnException:
1556     case BlockedOnRead:
1557     case BlockedOnWrite:
1558     */
1559     default:
1560       barf("blockThread: impossible why_blocked code %d for TSO %d",
1561            tso->why_blocked, tso->id);
1562   }
1563
1564   IF_PAR_DEBUG(verbose,
1565                belch("##++ blockThread: TSO %d blocked on closure %p (%s); %s",
1566                      tso->id, tso->block_info.closure, info_type(tso->block_info.closure),
1567                      (tso->why_blocked==BlockedOnGA) ? "Sent FETCH for GA" : ""));
1568   
1569   IF_PAR_DEBUG(bq,
1570                print_bq(tso->block_info.closure));
1571 }
1572
1573 /*
1574  * ChoosePE selects a GlobalTaskId from the array of PEs 'at random'.
1575  * Important properties:
1576  *   - it varies during execution, even if the PE is idle
1577  *   - it's different for each PE
1578  *   - we never send a fish to ourselves
1579  */
1580 extern long lrand48 (void);
1581
1582 //@cindex choosePE
1583 GlobalTaskId
1584 choosePE(void)
1585 {
1586   long temp;
1587
1588   temp = lrand48() % nPEs;
1589   if (allPEs[temp] == mytid) {  /* Never send a FISH to yourself */
1590     temp = (temp + 1) % nPEs;
1591   }
1592   return allPEs[temp];
1593 }
1594
1595 /* 
1596  * allocate a BLOCKED_FETCH closure and fill it with the relevant fields
1597  * of the ga argument; called from processFetch when the local closure is
1598  * under evaluation
1599  */
1600 //@cindex createBlockedFetch
1601 StgClosure *
1602 createBlockedFetch (globalAddr ga, globalAddr rga)
1603 {
1604   StgBlockedFetch *bf;
1605   StgClosure *closure;
1606
1607   closure = GALAlookup(&ga);
1608   if ((bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch))) == NULL) {
1609     barf("createBlockedFetch: out of heap while allocating heap for a BlocekdFetch; ToDo: call GC here");
1610     GarbageCollect(GetRoots, rtsFalse); 
1611     closure = GALAlookup(&ga);
1612     bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch));
1613     // ToDo: check whether really guaranteed to succeed 2nd time around
1614   }
1615
1616   ASSERT(bf != (StgBlockedFetch *)NULL);
1617   SET_INFO((StgClosure *)bf, &stg_BLOCKED_FETCH_info);
1618   // ToDo: check whether other header info is needed
1619   bf->node = closure;
1620   bf->ga.payload.gc.gtid = rga.payload.gc.gtid;
1621   bf->ga.payload.gc.slot = rga.payload.gc.slot;
1622   bf->ga.weight = rga.weight;
1623   // bf->link = NULL;  debugging
1624
1625   IF_PAR_DEBUG(schedule,
1626                fprintf(stderr, "%%%%// created BF: bf=%p (%s) of closure , GA: ",
1627                        bf, info_type((StgClosure*)bf));
1628                printGA(&(bf->ga));
1629                fputc('\n',stderr));
1630   return (StgClosure *)bf;
1631 }
1632
1633 /*
1634  * waitForTermination enters a loop ignoring spurious messages while
1635  * waiting for the termination sequence to be completed.  
1636  */
1637 //@cindex waitForTermination
1638 void
1639 waitForTermination(void)
1640 {
1641   do {
1642     rtsPacket p = GetPacket();
1643     processUnexpectedMessage(p);
1644   } while (rtsTrue);
1645 }
1646
1647 #ifdef DEBUG
1648 //@cindex DebugPrintGAGAMap
1649 void
1650 DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
1651 {
1652   nat i;
1653   
1654   for (i = 0; i < nGAs; ++i, gagamap += 2)
1655     fprintf(stderr, "__ gagamap[%d] = ((%x, %d, %x)) -> ((%x, %d, %x))\n", i,
1656             gagamap[0].payload.gc.gtid, gagamap[0].payload.gc.slot, gagamap[0].weight,
1657             gagamap[1].payload.gc.gtid, gagamap[1].payload.gc.slot, gagamap[1].weight);
1658 }
1659
1660 //@cindex checkGAGAMap
1661 void
1662 checkGAGAMap(globalAddr *gagamap, int nGAs)
1663 {
1664   nat i;
1665   
1666   for (i = 0; i < (nat)nGAs; ++i, gagamap += 2) {
1667     ASSERT(looks_like_ga(gagamap));
1668     ASSERT(looks_like_ga(gagamap+1));
1669   }
1670 }
1671 #endif
1672
1673 //@cindex freeMsgBuffer
1674 static StgWord **freeMsgBuffer = NULL;
1675 //@cindex freeMsgIndex
1676 static nat      *freeMsgIndex  = NULL;
1677
1678 //@cindex prepareFreeMsgBuffers
1679 void
1680 prepareFreeMsgBuffers(void)
1681 {
1682   nat i;
1683   
1684   /* Allocate the freeMsg buffers just once and then hang onto them. */
1685   if (freeMsgIndex == NULL) {
1686     freeMsgIndex = (nat *) stgMallocBytes(nPEs * sizeof(nat), 
1687                                           "prepareFreeMsgBuffers (Index)");
1688     freeMsgBuffer = (StgWord **) stgMallocBytes(nPEs * sizeof(long *), 
1689                                           "prepareFreeMsgBuffers (Buffer)");
1690     
1691     for(i = 0; i < nPEs; i++) 
1692       if (i != (thisPE-1)) 
1693         freeMsgBuffer[i] = (StgPtr) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
1694                                                "prepareFreeMsgBuffers (Buffer #i)");
1695       else
1696         freeMsgBuffer[i] = 0;
1697   }
1698   
1699   /* Initialize the freeMsg buffer pointers to point to the start of their
1700      buffers */
1701   for (i = 0; i < nPEs; i++)
1702     freeMsgIndex[i] = 0;
1703 }
1704
1705 //@cindex freeRemoteGA
1706 void
1707 freeRemoteGA(int pe, globalAddr *ga)
1708 {
1709   nat i;
1710   
1711   ASSERT(GALAlookup(ga) == NULL);
1712   
1713   if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
1714     IF_PAR_DEBUG(free,
1715                  belch("!! Filled a free message buffer (sending remaining messages indivisually)"));   
1716
1717     sendFree(ga->payload.gc.gtid, i, freeMsgBuffer[pe]);
1718     i = 0;
1719   }
1720   freeMsgBuffer[pe][i++] = (StgWord) ga->weight;
1721   freeMsgBuffer[pe][i++] = (StgWord) ga->payload.gc.slot;
1722   freeMsgIndex[pe] = i;
1723
1724   IF_DEBUG(sanity,
1725            ga->weight = 0xdead0add;
1726            ga->payload.gc.gtid = 0xbbbbbbbb;
1727            ga->payload.gc.slot = 0xbbbbbbbb;);
1728 }
1729
1730 //@cindex sendFreeMessages
1731 void
1732 sendFreeMessages(void)
1733 {
1734   nat i;
1735   
1736   for (i = 0; i < nPEs; i++) 
1737     if (freeMsgIndex[i] > 0)
1738       sendFree(allPEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1739 }
1740
1741 /* synchronises with the other PEs. Receives and records in a global
1742  * variable the task-id of SysMan. If this is the main thread (discovered
1743  * in main.lc), identifies itself to SysMan. Finally it receives
1744  * from SysMan an array of the Global Task Ids of each PE, which is
1745  * returned as the value of the function.
1746  */
1747
1748 #if defined(PAR_TICKY)
1749 /* Has to see freeMsgIndex, so must be defined here not in ParTicky.c */
1750 //@cindex stats_CntFreeGA
1751 void
1752 stats_CntFreeGA (void) {  // stats only
1753
1754   // Global statistics: residency of thread and spark pool
1755   if (RtsFlags.ParFlags.ParStats.Global &&
1756       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1757     nat i, s;
1758   
1759     globalParStats.cnt_free_GA++;
1760     for (i = 0, s = 0; i < nPEs; i++) 
1761       s += globalParStats.tot_free_GA += freeMsgIndex[i]/2;
1762
1763     if ( s > globalParStats.res_free_GA )
1764       globalParStats.res_free_GA = s;
1765   }
1766 }
1767 #endif /* PAR_TICKY */
1768
1769 #endif /* PAR -- whole file */
1770
1771 //@node Index,  , Miscellaneous Functions, High Level Communications Routines
1772 //@subsection Index
1773
1774 //@index
1775 //* ACK::  @cindex\s-+ACK
1776 //* DebugPrintGAGAMap::  @cindex\s-+DebugPrintGAGAMap
1777 //* FETCH::  @cindex\s-+FETCH
1778 //* FISH::  @cindex\s-+FISH
1779 //* FREE::  @cindex\s-+FREE
1780 //* RESUME::  @cindex\s-+RESUME
1781 //* SCHEDULE::  @cindex\s-+SCHEDULE
1782 //* blockFetch::  @cindex\s-+blockFetch
1783 //* choosePE::  @cindex\s-+choosePE
1784 //* freeMsgBuffer::  @cindex\s-+freeMsgBuffer
1785 //* freeMsgIndex::  @cindex\s-+freeMsgIndex
1786 //* freeRemoteGA::  @cindex\s-+freeRemoteGA
1787 //* gumPackBuffer::  @cindex\s-+gumPackBuffer
1788 //* initMoreBuffers::  @cindex\s-+initMoreBuffers
1789 //* prepareFreeMsgBuffers::  @cindex\s-+prepareFreeMsgBuffers
1790 //* processAck::  @cindex\s-+processAck
1791 //* processFetch::  @cindex\s-+processFetch
1792 //* processFetches::  @cindex\s-+processFetches
1793 //* processFish::  @cindex\s-+processFish
1794 //* processFree::  @cindex\s-+processFree
1795 //* processMessages::  @cindex\s-+processMessages
1796 //* processResume::  @cindex\s-+processResume
1797 //* processSchedule::  @cindex\s-+processSchedule
1798 //* sendAck::  @cindex\s-+sendAck
1799 //* sendFetch::  @cindex\s-+sendFetch
1800 //* sendFish::  @cindex\s-+sendFish
1801 //* sendFree::  @cindex\s-+sendFree
1802 //* sendFreeMessages::  @cindex\s-+sendFreeMessages
1803 //* sendResume::  @cindex\s-+sendResume
1804 //* sendSchedule::  @cindex\s-+sendSchedule
1805 //* unpackAck::  @cindex\s-+unpackAck
1806 //* unpackFetch::  @cindex\s-+unpackFetch
1807 //* unpackFish::  @cindex\s-+unpackFish
1808 //* unpackFree::  @cindex\s-+unpackFree
1809 //* unpackResume::  @cindex\s-+unpackResume
1810 //* unpackSchedule::  @cindex\s-+unpackSchedule
1811 //* waitForTermination::  @cindex\s-+waitForTermination
1812 //@end index