bce0de78066fc6f2d0bfb715a3c1486a43181f89
[ghc-hetmet.git] / ghc / rts / parallel / HLComms.c
1 /* ----------------------------------------------------------------------------
2  * Time-stamp: <Wed Jan 12 2000 13:32:25 Stardate: [-30]4193.86 hwloidl>
3  * $Id: HLComms.c,v 1.2 2000/01/13 14:34:07 hwloidl Exp $
4  *
5  * High Level Communications Routines (HLComms.lc)
6  *
7  * Contains the high-level routines (i.e. communication
8  * subsystem independent) used by GUM
9  * 
10  * Phil Trinder, Glasgow University, 12 December 1994
11  * Adapted for new RTS
12  * Phil Trinder, Simon Marlow July 1998
13  * H-W. Loidl, Heriot-Watt University, November 1999
14  * 
15  * ------------------------------------------------------------------------- */
16
17 #ifdef PAR /* whole file */
18
19 //@node High Level Communications Routines, , ,
20 //@section High Level Communications Routines
21
22 //@menu
23 //* Macros etc::                
24 //* Includes::                  
25 //* GUM Message Sending and Unpacking Functions::  
26 //* Message-Processing Functions::  
27 //* GUM Message Processor::     
28 //* Miscellaneous Functions::   
29 //* Index::                     
30 //@end menu
31
32 //@node Macros etc, Includes, High Level Communications Routines, High Level Communications Routines
33 //@subsection Macros etc
34
35 # ifndef _AIX
36 # define NON_POSIX_SOURCE /* so says Solaris */
37 # endif
38
39 //@node Includes, GUM Message Sending and Unpacking Functions, Macros etc, High Level Communications Routines
40 //@subsection Includes
41
42 #include "Rts.h"
43 #include "RtsUtils.h"
44 #include "RtsFlags.h"
45 #include "Storage.h"   // for recordMutable
46 #include "HLC.h"
47 #include "Parallel.h"
48 #include "GranSimRts.h"
49 #include "ParallelRts.h"
50 #include "FetchMe.h"     // for BLOCKED_FETCH_info etc
51 #if defined(DEBUG)
52 # include "ParallelDebug.h"
53 #endif
54 #include "StgMacros.h" // inlined IS_... fcts
55
56 //@node GUM Message Sending and Unpacking Functions, Message-Processing Functions, Includes, High Level Communications Routines
57 //@subsection GUM Message Sending and Unpacking Functions
58
59 /*
60  * GUM Message Sending and Unpacking Functions
61  */
62
63 /*
64  * Allocate space for message processing
65  */
66
67 //@cindex gumPackBuffer
68 static rtsPackBuffer *gumPackBuffer;
69
70 //@cindex initMoreBuffers
71 rtsBool
72 initMoreBuffers(void)
73 {
74   if ((gumPackBuffer = (rtsPackBuffer *)stgMallocWords(RtsFlags.ParFlags.packBufferSize, 
75                                              "initMoreBuffers")) == NULL)
76     return rtsFalse;
77   return rtsTrue;
78 }
79
80 /*
81  * SendFetch packs the two global addresses and a load into a message +
82  * sends it.  
83
84 //@cindex FETCH
85
86    Structure of a FETCH message:
87
88          |    GA 1     |        GA 2          |
89          +------------------------------------+------+
90          | gtid | slot | weight | gtid | slot | load |
91          +------------------------------------+------+
92  */
93
94 //@cindex sendFetch
95 void
96 sendFetch(globalAddr *rga, globalAddr *lga, int load)
97 {
98   ASSERT(rga->weight > 0 && lga->weight > 0);
99   IF_PAR_DEBUG(fetch,
100                belch("** [%x] Sending Fetch for ((%x, %d, 0)); locally ((%x, %d, %x)), load = %d", 
101                      mytid,
102                      rga->payload.gc.gtid, rga->payload.gc.slot, 
103                      lga->payload.gc.gtid, lga->payload.gc.slot, lga->weight,
104                      load));
105
106
107   /* ToDo: Dump event
108   DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(rga->payload.gc.gtid), 
109                    GR_FETCH, CurrentTSO, (StgClosure *)(lga->payload.gc.slot),
110                    0, spark_queue_len(ADVISORY_POOL));
111   */
112
113   sendOpV(PP_FETCH, rga->payload.gc.gtid, 6,
114           (StgWord) rga->payload.gc.gtid, (StgWord) rga->payload.gc.slot, 
115           (StgWord) lga->weight, (StgWord) lga->payload.gc.gtid, 
116           (StgWord) lga->payload.gc.slot, (StgWord) load);
117 }
118
119 /*
120  * unpackFetch unpacks a FETCH message into two Global addresses and a load
121  * figure.  
122 */
123
124 //@cindex unpackFetch
125 static void
126 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
127 {
128   long buf[6];
129
130   GetArgs(buf, 6); 
131
132   IF_PAR_DEBUG(fetch,
133                belch("** [%x] Unpacking Fetch for ((%x, %d, 0)) to ((%x, %d, %x)), load = %d", 
134                      mytid,
135                      (GlobalTaskId) buf[0], (int) buf[1], 
136                      (GlobalTaskId) buf[3], (int) buf[4], buf[2], buf[5]));
137
138   lga->weight = 1;
139   lga->payload.gc.gtid = (GlobalTaskId) buf[0];
140   lga->payload.gc.slot = (int) buf[1];
141
142   rga->weight = (unsigned) buf[2];
143   rga->payload.gc.gtid = (GlobalTaskId) buf[3];
144   rga->payload.gc.slot = (int) buf[4];
145
146   *load = (int) buf[5];
147
148   ASSERT(rga->weight > 0);
149 }
150
151 /*
152  * SendResume packs the remote blocking queue's GA and data into a message 
153  * and sends it.
154
155 //@cindex RESUME
156
157    Structure of a RESUME message:
158
159       -------------------------------
160       | weight | slot | n | data ...
161       -------------------------------
162
163    data is a packed graph represented as an rtsPackBuffer
164    n is the size of the graph (as returned by PackNearbyGraph) + packet hdr size
165  */
166
167 //@cindex sendResume
168 void
169 sendResume(globalAddr *rga, int nelem, rtsPackBuffer *data) // StgPtr data)
170 {
171   IF_PAR_DEBUG(resume,
172                PrintPacket(data);
173                belch("[] [%x] Sending Resume for ((%x, %d, %x))", 
174                      mytid,
175                      rga->payload.gc.gtid, rga->payload.gc.slot, rga->weight));
176
177   sendOpNV(PP_RESUME, rga->payload.gc.gtid, 
178            nelem + PACK_BUFFER_HDR_SIZE, (StgPtr)data, 
179            2, (rtsWeight) rga->weight, (StgWord) rga->payload.gc.slot);
180 }
181
182 /*
183  * unpackResume unpacks a Resume message into two Global addresses and
184  * a data array.
185  */
186
187 //@cindex unpackResume
188 static void
189 unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *data)
190 {
191     long buf[3];
192
193     GetArgs(buf, 3); 
194
195     IF_PAR_DEBUG(resume,
196                  belch("[] [%x] Unpacking Resume for ((%x, %d, %x))", 
197                        mytid, mytid,
198                        (int) buf[1], (unsigned) buf[0]));
199
200     /*
201       RESUME event is written in awaken_blocked_queue
202     DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(lga->payload.gc.gtid), 
203                      GR_RESUME, END_TSO_QUEUE, (StgClosure *)NULL, 0, 0);
204     */
205
206     lga->weight = (unsigned) buf[0];
207     lga->payload.gc.gtid = mytid;
208     lga->payload.gc.slot = (int) buf[1];
209
210     *nelem = (int) buf[2]; // includes PACK_BUFFER_HDR_SIZE;
211     GetArgs(data, *nelem);
212     *nelem -= PACK_BUFFER_HDR_SIZE;
213 }
214
215 /*
216  * SendAck packs the global address being acknowledged, together with
217  * an array of global addresses for any closures shipped and sends them.
218
219 //@cindex ACK
220
221    Structure of an ACK message:
222
223       |        GA 1          |        GA 2          | 
224       +---------------------------------------------+-------
225       | weight | gtid | slot | weight | gtid | slot |  .....  ngas times
226       + --------------------------------------------+------- 
227
228  */
229
230 //@cindex sendAck
231 void
232 sendAck(GlobalTaskId task, int ngas, globalAddr *gagamap)
233 {
234   static long *buffer;
235   long *p;
236   int i;
237
238   buffer = (long *) gumPackBuffer;
239
240   for(i = 0, p = buffer; i < ngas; i++, p += 6) {
241     ASSERT(gagamap[1].weight > 0);
242     p[0] = (long) gagamap->weight;
243     p[1] = (long) gagamap->payload.gc.gtid;
244     p[2] = (long) gagamap->payload.gc.slot;
245     gagamap++;
246     p[3] = (long) gagamap->weight;
247     p[4] = (long) gagamap->payload.gc.gtid;
248     p[5] = (long) gagamap->payload.gc.slot;
249     gagamap++;
250   }
251   IF_PAR_DEBUG(ack,
252                belch(",, [%x] Sending Ack (%d pairs) to PE %x\n", 
253                      mytid, ngas, task));
254
255   sendOpN(PP_ACK, task, p - buffer, buffer);
256 }
257
258 /*
259  * unpackAck unpacks an Acknowledgement message into a Global address,
260  * a count of the number of global addresses following and a map of 
261  * Global addresses
262  */
263
264 //@cindex unpackAck
265 static void
266 unpackAck(int *ngas, globalAddr *gagamap)
267 {
268   long GAarraysize;
269   long buf[6];
270   
271   GetArgs(&GAarraysize, 1);
272   
273   *ngas = GAarraysize / 6;
274   
275   IF_PAR_DEBUG(ack,
276                belch(",, [%x] Unpacking Ack (%d pairs) on %x\n", 
277                      mytid, *ngas, mytid));
278
279   while (GAarraysize > 0) {
280     GetArgs(buf, 6);
281     gagamap->weight = (rtsWeight) buf[0];
282     gagamap->payload.gc.gtid = (GlobalTaskId) buf[1];
283     gagamap->payload.gc.slot = (int) buf[2];
284     gagamap++;
285     gagamap->weight = (rtsWeight) buf[3];
286     gagamap->payload.gc.gtid = (GlobalTaskId) buf[4];
287     gagamap->payload.gc.slot = (int) buf[5];
288     ASSERT(gagamap->weight > 0);
289     gagamap++;
290     GAarraysize -= 6;
291   }
292 }
293
294 /*
295  * SendFish packs the global address being acknowledged, together with
296  * an array of global addresses for any closures shipped and sends them.
297
298 //@cindex FISH
299
300  Structure of a FISH message:
301
302      +----------------------------------+
303      | orig PE | age | history | hunger |
304      +----------------------------------+
305  */
306
307 //@cindex sendFish
308 void
309 sendFish(GlobalTaskId destPE, GlobalTaskId origPE, 
310          int age, int history, int hunger)
311 {
312   IF_PAR_DEBUG(fish,
313                belch("$$ [%x] Sending Fish to %x (%d outstanding fishes)", 
314                      mytid, destPE, outstandingFishes));
315
316   sendOpV(PP_FISH, destPE, 4, 
317           (StgWord) origPE, (StgWord) age, (StgWord) history, (StgWord) hunger);
318
319   if (origPE == mytid) {
320     //fishing = rtsTrue;
321     outstandingFishes++;
322   }
323 }
324
325 /*
326  * unpackFish unpacks a FISH message into the global task id of the
327  * originating PE and 3 data fields: the age, history and hunger of the
328  * fish. The history + hunger are not currently used.
329
330  */
331
332 //@cindex unpackFish
333 static void
334 unpackFish(GlobalTaskId *origPE, int *age, int *history, int *hunger)
335 {
336   long buf[4];
337   
338   GetArgs(buf, 4);
339   
340   IF_PAR_DEBUG(fish,
341                belch("$$ [%x] Unpacking Fish from PE %x (age=%d)", 
342                      mytid, (GlobalTaskId) buf[0], (int) buf[1]));
343
344   *origPE = (GlobalTaskId) buf[0];
345   *age = (int) buf[1];
346   *history = (int) buf[2];
347   *hunger = (int) buf[3];
348 }
349
350 /*
351  * SendFree sends (weight, slot) pairs for GAs that we no longer need
352  * references to.  
353
354 //@cindex FREE
355
356    Structure of a FREE message:
357    
358        +-----------------------------
359        | n | weight_1 | slot_1 | ...
360        +-----------------------------
361  */
362 //@cindex sendFree
363 void
364 sendFree(GlobalTaskId pe, int nelem, StgPtr data)
365 {
366     IF_PAR_DEBUG(free,
367                  belch("!! [%x] Sending Free (%d GAs) to %x", 
368                        mytid, nelem/2, pe));
369
370     sendOpN(PP_FREE, pe, nelem, data);
371 }
372
373 /*
374  * unpackFree unpacks a FREE message into the amount of data shipped and
375  * a data block.
376  */
377 //@cindex unpackFree
378 static void
379 unpackFree(int *nelem, rtsPackBuffer *data)
380 {
381   long buf[1];
382   
383   GetArgs(buf, 1);
384   *nelem = (int) buf[0];
385
386   IF_PAR_DEBUG(free,
387                belch("!! [%x] Unpacking Free (%d GAs)", 
388                      mytid, *nelem/2));
389
390   GetArgs(data, *nelem);
391 }
392
393 /*
394  * SendSchedule sends a closure to be evaluated in response to a Fish
395  * message. The message is directed to the PE that originated the Fish
396  * (origPE), and includes the packed closure (data) along with its size
397  * (nelem).
398
399 //@cindex SCHEDULE
400
401    Structure of a SCHEDULE message:
402
403        +------------------------------------
404        | PE | n | pack buffer of a graph ...
405        +------------------------------------
406  */
407 //@cindex sendSchedule
408 void
409 sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *data) // StgPtr data)
410 {
411   IF_PAR_DEBUG(schedule,
412                PrintPacket(data);
413                belch("-- [%x] Sending Schedule (%d elems) to %x\n", 
414                      mytid, nelem, origPE));
415
416   sendOpN(PP_SCHEDULE, origPE, nelem + PACK_BUFFER_HDR_SIZE, (StgPtr)data);
417 }
418
419 /*
420  * unpackSchedule unpacks a SCHEDULE message into the Global address of
421  * the closure shipped, the amount of data shipped (nelem) and the data
422  * block (data).
423  */
424
425 //@cindex unpackSchedule
426 static void
427 unpackSchedule(int *nelem, rtsPackBuffer *data)
428 {
429     long buf[1];
430
431     GetArgs(buf, 1);
432     /* no. of elems, not counting the header of the pack buffer */
433     *nelem = (int) buf[0] - PACK_BUFFER_HDR_SIZE;
434
435     IF_PAR_DEBUG(schedule,
436                  belch("-- [%x] Unpacking Schedule (%d elems) on %x\n", 
437                        mytid, *nelem));
438
439     /* automatic cast of flat pvm-data to rtsPackBuffer */
440     GetArgs(data, *nelem + PACK_BUFFER_HDR_SIZE);
441 }
442
443 //@node Message-Processing Functions, GUM Message Processor, GUM Message Sending and Unpacking Functions, High Level Communications Routines
444 //@subsection Message-Processing Functions
445
446 /*
447  * Message-Processing Functions
448  *
449  * The following routines process incoming GUM messages. Often reissuing
450  * messages in response.
451  *
452  * processFish unpacks a fish message, reissuing it if it's our own,
453  * sending work if we have it or sending it onwards otherwise.
454  */
455
456 /*
457  * blockFetch blocks a BlockedFetch node on some kind of black hole.
458  */
459 //@cindex blockFetch
460 static void
461 blockFetch(StgBlockedFetch *bf, StgClosure *bh) {
462   bf->node = bh;
463   switch (get_itbl(bh)->type) {
464   case BLACKHOLE:
465     bf->link = END_BQ_QUEUE;
466     //((StgBlockingQueue *)bh)->header.info = &BLACKHOLE_BQ_info;
467     SET_INFO(bh, &BLACKHOLE_BQ_info);  // turn closure into a blocking queue
468     ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
469     
470     // put bh on the mutables list
471     recordMutable((StgMutClosure *)bh);
472
473 # if 0
474     /*
475      * If we modify a black hole in the old generation, we have to
476      * make sure it goes on the mutables list
477      */
478     
479     if (bh <= StorageMgrInfo.OldLim) {
480       MUT_LINK(bh) = (StgWord) StorageMgrInfo.OldMutables;
481       StorageMgrInfo.OldMutables = bh;
482     } else
483       MUT_LINK(bh) = MUT_NOT_LINKED;
484 # endif
485     break;
486     
487   case BLACKHOLE_BQ:
488     /* enqueue bf on blocking queue of closure bh */
489     bf->link = ((StgBlockingQueue *)bh)->blocking_queue;
490     ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
491
492     // put bh on the mutables list; ToDo: check
493     recordMutable((StgMutClosure *)bh);
494     break;
495
496   case FETCH_ME_BQ:
497     /* enqueue bf on blocking queue of closure bh */
498     bf->link = ((StgFetchMeBlockingQueue *)bh)->blocking_queue;
499     ((StgFetchMeBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
500
501     // put bh on the mutables list; ToDo: check
502     recordMutable((StgMutClosure *)bh);
503     break;
504     
505   case RBH:
506     /* enqueue bf on blocking queue of closure bh */
507     bf->link = ((StgRBH *)bh)->blocking_queue;
508     ((StgRBH *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
509
510     // put bh on the mutables list; ToDo: check
511     recordMutable((StgMutClosure *)bh);
512     break;
513     
514   default:
515     barf("Panic (blockFetch): thought %p was a black hole (IP %#lx, %s)",
516          (StgClosure *)bh, get_itbl((StgClosure *)bh), 
517          info_type((StgClosure *)bh));
518   }
519   IF_PAR_DEBUG(verbose,
520                belch("## blockFetch: after block the BQ of %p (%s) is:",
521                      bh, info_type(bh));
522                print_bq(bh));
523 }
524
525
526 /*
527  * processFetches constructs and sends resume messages for every
528  * BlockedFetch which is ready to be awakened.
529  * awaken_blocked_queue (in Schedule.c) is responsible for moving 
530  * BlockedFetches from a blocking queue to the PendingFetches queue.
531  */
532 void GetRoots(void);
533 extern StgBlockedFetch *PendingFetches;
534
535 nat
536 pending_fetches_len(void)
537 {
538   StgBlockedFetch *bf;
539   nat n;
540
541   for (n=0, bf=PendingFetches; bf != END_BF_QUEUE; n++, bf = (StgBlockedFetch *)(bf->link)) {
542     ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
543   }
544   return n;
545 }
546
547 //@cindex processFetches
548 void
549 processFetches(void) {
550   StgBlockedFetch *bf;
551   StgClosure *closure, *next;
552   StgInfoTable *ip;
553   globalAddr rga;
554   static rtsPackBuffer *packBuffer;
555     
556   IF_PAR_DEBUG(verbose,
557                belch("__ processFetches: %d  pending fetches",
558                      pending_fetches_len()));
559   
560   for (bf = PendingFetches; 
561        bf != END_BF_QUEUE;
562        bf=(StgBlockedFetch *)(bf->link)) {
563     /* the PendingFetches list contains only BLOCKED_FETCH closures */
564     ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
565
566     /*
567      * Find the target at the end of the indirection chain, and
568      * process it in much the same fashion as the original target
569      * of the fetch.  Though we hope to find graph here, we could
570      * find a black hole (of any flavor) or even a FetchMe.
571      */
572     closure = bf->node;
573     /*
574       HACK 312: bf->node may have been evacuated since filling it; follow
575        the evacuee in this case; the proper way to handle this is to
576        traverse the blocking queue and update the node fields of
577        BLOCKED_FETCH entries when evacuating an BLACKHOLE_BQ, FETCH_ME_BQ
578        or RBH (but it's late and I'm tired) 
579     */
580     if (get_itbl(closure)->type == EVACUATED)
581       closure = ((StgEvacuated *)closure)->evacuee;
582
583     while ((next = IS_INDIRECTION(closure)) != NULL) { closure = next; }
584
585     ip = get_itbl(closure);
586     if (ip->type == FETCH_ME) {
587       /* Forward the Fetch to someone else */
588       rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
589       rga.payload.gc.slot = bf->ga.payload.gc.slot;
590       rga.weight = bf->ga.weight;
591       
592       sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */);
593
594       IF_PAR_DEBUG(forward,
595                    belch("__ processFetches: Forwarding fetch from %lx to %lx",
596                          mytid, rga.payload.gc.gtid));
597
598     } else if (IS_BLACK_HOLE(closure)) {
599       IF_PAR_DEBUG(verbose,
600                    belch("__ processFetches: trying to send a BLACK_HOLE => doign a blockFetch on closure %p (%s)",
601                          closure, info_type(closure)));
602       bf->node = closure;
603       blockFetch(bf, closure);
604     } else {
605       /* We now have some local graph to send back */
606       nat size;
607
608       packBuffer = gumPackBuffer;
609       IF_PAR_DEBUG(verbose,
610                    belch("__ processFetches: PackNearbyGraph of closure %p (%s)",
611                          closure, info_type(closure)));
612
613       if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size)) == NULL) {
614         // Put current BF back on list
615         bf->link = (StgBlockingQueueElement *)PendingFetches;
616         PendingFetches = (StgBlockedFetch *)bf;
617         // ToDo: check that nothing more has to be done to prepare for GC!
618         GarbageCollect(GetRoots); 
619         bf = PendingFetches;
620         PendingFetches = (StgBlockedFetch *)(bf->link);
621         closure = bf->node;
622         packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size);
623         ASSERT(packBuffer != (rtsPackBuffer *)NULL);
624       }
625       rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
626       rga.payload.gc.slot = bf->ga.payload.gc.slot;
627       rga.weight = bf->ga.weight;
628       
629       sendResume(&rga, size, packBuffer);
630     }
631   }
632   PendingFetches = END_BF_QUEUE;
633 }
634
635 #if 0
636 /*
637   Alternatively to sending fetch messages directly from the FETCH_ME_entry
638   code we could just store the data about the remote data in a global
639   variable and send the fetch request from the main scheduling loop (similar
640   to processFetches above). This would save an expensive STGCALL in the entry 
641   code because we have to go back to the scheduler anyway.
642 */
643 //@cindex processFetches
644 void
645 processTheRealFetches(void) {
646   StgBlockedFetch *bf;
647   StgClosure *closure, *next;
648     
649   IF_PAR_DEBUG(verbose,
650                belch("__ processTheRealFetches: ");
651                printGA(&theGlobalFromGA);
652                printGA(&theGlobalToGA));
653
654   ASSERT(theGlobalFromGA.payload.gc.gtid != 0 &&
655          theGlobalToGA.payload.gc.gtid != 0);
656
657   /* the old version did this in the FETCH_ME entry code */
658   sendFetch(&theGlobalFromGA, &theGlobalToGA, 0/*load*/);
659   
660 #if DEBUG
661   theGlobalFromGA.payload.gc.gtid = 0;
662   theGlobalToGA.payload.gc.gtid = 0;
663 #endif DEBUG
664 }
665 #endif
666
667
668 /*
669  * processFish unpacks a fish message, reissuing it if it's our own,
670  * sending work if we have it or sending it onwards otherwise.
671  */
672 //@cindex processFish
673 static void
674 processFish(void)
675 {
676   GlobalTaskId origPE;
677   int age, history, hunger;
678   rtsSpark spark;
679   static rtsPackBuffer *packBuffer; 
680
681   unpackFish(&origPE, &age, &history, &hunger);
682
683   if (origPE == mytid) {
684     //fishing = rtsFalse;                   // fish has come home
685     outstandingFishes--;
686     last_fish_arrived_at = CURRENT_TIME;  // remember time (see schedule fct)
687     return;                               // that's all
688   }
689
690   ASSERT(origPE != mytid);
691   IF_PAR_DEBUG(fish,
692                belch("$$ [%x] processing fish; %d sparks available",
693                      mytid, spark_queue_len(ADVISORY_POOL)));
694   while ((spark = findLocalSpark(rtsTrue)) != NULL) {
695     nat size;
696     // StgClosure *graph;
697
698     packBuffer = gumPackBuffer; 
699     ASSERT(closure_SHOULD_SPARK((StgClosure *)spark));
700     if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size)) == NULL) {
701       IF_PAR_DEBUG(fish,
702                    belch("$$ GC while trying to satisfy FISH via PackNearbyGraph of node %p",
703                          (StgClosure *)spark));
704       GarbageCollect(GetRoots);
705       /* Now go back and try again */
706     } else {
707       IF_PAR_DEBUG(fish,
708                    belch("$$ [%x] Replying to FISH from %x by sending graph @ %p (%s)",
709                          mytid, origPE, 
710                          (StgClosure *)spark, info_type((StgClosure *)spark)));
711       sendSchedule(origPE, size, packBuffer);
712       disposeSpark(spark);
713       break;
714     }
715   }
716   if (spark == (rtsSpark)NULL) {
717     IF_PAR_DEBUG(fish,
718                  belch("$$ [%x] No sparks available for FISH from %x",
719                        mytid, origPE));
720     /* We have no sparks to give */
721     if (age < FISH_LIFE_EXPECTANCY)
722       /* and the fish is atill young, send it to another PE to look for work */
723       sendFish(choosePE(), origPE,
724                (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
725
726     /* otherwise, send it home to die */
727     else
728       sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
729     }
730 }  /* processFish */
731
732 /*
733  * processFetch either returns the requested data (if available) 
734  * or blocks the remote blocking queue on a black hole (if not).
735  */
736
737 //@cindex processFetch
738 static void
739 processFetch(void)
740 {
741   globalAddr ga, rga;
742   int load;
743   StgClosure *closure;
744   StgInfoTable *ip;
745
746   unpackFetch(&ga, &rga, &load);
747   IF_PAR_DEBUG(fetch,
748                belch("%% [%x] Rcvd Fetch for ((%x, %d, 0)), Resume ((%x, %d, %x)) (load %d) from %x",
749                      mytid, 
750                      ga.payload.gc.gtid, ga.payload.gc.slot,
751                      rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, load,
752                      rga.payload.gc.gtid));
753
754   closure = GALAlookup(&ga);
755   ASSERT(closure != (StgClosure *)NULL);
756   ip = get_itbl(closure);
757   if (ip->type == FETCH_ME) {
758     /* Forward the Fetch to someone else */
759     sendFetch(((StgFetchMe *)closure)->ga, &rga, load);
760   } else if (rga.payload.gc.gtid == mytid) {
761     /* Our own FETCH forwarded back around to us */
762     StgFetchMeBlockingQueue *fmbq = (StgFetchMeBlockingQueue *)GALAlookup(&rga);
763     
764     IF_PAR_DEBUG(fetch,
765                  belch("%% [%x] Fetch returned to sending PE; closure=%p (%s); receiver=%p (%s)",
766                        mytid, closure, info_type(closure), fmbq, info_type(fmbq)));
767     /* We may have already discovered that the fetch target is our own. */
768     if ((StgClosure *)fmbq != closure) 
769       CommonUp((StgClosure *)fmbq, closure);
770     (void) addWeight(&rga);
771   } else if (IS_BLACK_HOLE(closure)) {
772     /* This includes RBH's and FMBQ's */
773     StgBlockedFetch *bf;
774
775     ASSERT(GALAlookup(&rga) == NULL);
776
777     /* If we're hitting a BH or RBH or FMBQ we have to put a BLOCKED_FETCH
778        closure into the BQ in order to denote that when updating this node
779        the result should be sent to the originator of this fetch message. */
780     bf = (StgBlockedFetch *)createBlockedFetch(ga, rga);
781     blockFetch(bf, closure);
782
783     IF_PAR_DEBUG(fetch,
784                  belch("%% [%x] Blocking Fetch ((%x, %d, %x)) on %p (%s)",
785                        mytid, 
786                        rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, 
787                        closure, info_type(closure)));
788     } else {                    
789       /* The target of the FetchMe is some local graph */
790       nat size;
791       // StgClosure *graph;
792       rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;
793
794       if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size)) == NULL) {
795         GarbageCollect(GetRoots); 
796         closure = GALAlookup(&ga);
797         buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size);
798         ASSERT(buffer != (rtsPackBuffer *)NULL);
799       }
800       sendResume(&rga, size, buffer);
801     }
802 }
803
804 /*
805  * processFree unpacks a FREE message and adds the weights to our GAs.
806  */
807 //@cindex processFree
808 static void
809 processFree(void)
810 {
811   int nelem;
812   static StgWord *buffer;
813   int i;
814   globalAddr ga;
815
816   buffer = (StgWord *)gumPackBuffer;
817   unpackFree(&nelem, buffer);
818   IF_PAR_DEBUG(free,
819                belch("!! [%x] Rcvd Free (%d GAs)", mytid, nelem / 2));
820
821   ga.payload.gc.gtid = mytid;
822   for (i = 0; i < nelem;) {
823     ga.weight = (rtsWeight) buffer[i++];
824     ga.payload.gc.slot = (int) buffer[i++];
825     IF_PAR_DEBUG(free,
826                  fprintf(stderr, "!! [%x] Processing free ", mytid); 
827                  printGA(&ga);
828                  fputc('\n', stderr);
829                  );
830     (void) addWeight(&ga);
831   }
832 }
833
834 /*
835  * processResume unpacks a RESUME message into the graph, filling in
836  * the LA -> GA, and GA -> LA tables. Threads blocked on the original
837  * FetchMe (now a blocking queue) are awakened, and the blocking queue
838  * is converted into an indirection.  Finally it sends an ACK in response
839  * which contains any newly allocated GAs.
840  */
841
842 //@cindex processResume
843 static void
844 processResume(GlobalTaskId sender)
845 {
846   int nelem;
847   nat nGAs;
848   static rtsPackBuffer *packBuffer;
849   StgClosure *newGraph, *old;
850   globalAddr lga;
851   globalAddr *gagamap;
852   
853   packBuffer = gumPackBuffer;
854   unpackResume(&lga, &nelem, (StgPtr)packBuffer);
855
856   IF_PAR_DEBUG(resume,
857                fprintf(stderr, "[] [%x] Rcvd Resume for ", mytid); 
858                printGA(&lga);
859                fputc('\n', stderr);
860                PrintPacket((rtsPackBuffer *)packBuffer));
861   
862   /* 
863    * We always unpack the incoming graph, even if we've received the
864    * requested node in some other data packet (and already awakened
865    * the blocking queue).
866   if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
867     ReallyPerformThreadGC(packBuffer[0], rtsFalse);
868     SAVE_Hp -= packBuffer[0];
869   }
870    */
871
872   // ToDo: Check for GC here !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
873
874   /* Do this *after* GC; we don't want to release the object early! */
875
876   if (lga.weight > 0)
877     (void) addWeight(&lga);
878
879   old = GALAlookup(&lga);
880
881   if (RtsFlags.ParFlags.ParStats.Full) {
882     // StgTSO *tso = END_TSO_QUEUE;
883     StgBlockingQueueElement *bqe;
884
885     /* Write REPLY events to the log file, indicating that the remote
886        data has arrived */
887     if (get_itbl(old)->type == FETCH_ME_BQ ||
888         get_itbl(old)->type == RBH) 
889       for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue;
890            bqe->link != END_BQ_QUEUE;
891            bqe = bqe->link)
892         if (get_itbl((StgClosure *)bqe)->type == TSO)
893           DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(sender), 
894                            GR_REPLY, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
895                            0, spark_queue_len(ADVISORY_POOL));
896   }
897
898   newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
899   ASSERT(newGraph != NULL);
900
901   /* 
902    * Sometimes, unpacking will common up the resumee with the
903    * incoming graph, but if it hasn't, we'd better do so now.
904    */
905    
906   if (get_itbl(old)->type == FETCH_ME_BQ)
907     CommonUp(old, newGraph);
908
909   IF_PAR_DEBUG(resume,
910                DebugPrintGAGAMap(gagamap, nGAs));
911   
912   sendAck(sender, nGAs, gagamap);
913 }
914
915 /*
916  * processSchedule unpacks a SCHEDULE message into the graph, filling
917  * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
918  * the local spark queue.  Finally it sends an ACK in response
919  * which contains any newly allocated GAs.
920  */
921 //@cindex processSchedule
922 static void
923 processSchedule(GlobalTaskId sender)
924 {
925   nat nelem, space_required, nGAs;
926   rtsBool success;
927   static rtsPackBuffer *packBuffer;
928   StgClosure *newGraph;
929   globalAddr *gagamap;
930   
931   packBuffer = gumPackBuffer;           /* HWL */
932   unpackSchedule(&nelem, packBuffer);
933
934   IF_PAR_DEBUG(schedule,
935                belch("-- [%x] Rcvd Schedule (%d elems)", mytid, nelem);
936                PrintPacket(packBuffer));
937
938   /*
939    * For now, the graph is a closure to be sparked as an advisory
940    * spark, but in future it may be a complete spark with
941    * required/advisory status, priority etc.
942    */
943
944   /*
945   space_required = packBuffer[0];
946   if (SAVE_Hp + space_required >= SAVE_HpLim) {
947     ReallyPerformThreadGC(space_required, rtsFalse);
948     SAVE_Hp -= space_required;
949   }
950   */
951   // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!1
952   newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
953   ASSERT(newGraph != NULL);
954   success = add_to_spark_queue(newGraph, rtsFalse);
955
956   IF_PAR_DEBUG(pack,
957                if (success)
958                  belch("+* added spark to unpacked graph %p; %d sparks available on [%x]", 
959                      newGraph, spark_queue_len(ADVISORY_POOL), mytid);
960                else
961                  belch("+* received non-sparkable closure %p; nothing added to spark pool; %d sparks available on [%x]", 
962                      newGraph, spark_queue_len(ADVISORY_POOL), mytid);
963                belch("-* Unpacked graph with root at %p (%s):", 
964                      newGraph, info_type(newGraph));
965                PrintGraph(newGraph, 0));
966
967   IF_PAR_DEBUG(pack,
968                DebugPrintGAGAMap(gagamap, nGAs));
969
970   if (nGAs > 0)
971     sendAck(sender, nGAs, gagamap);
972
973   //fishing = rtsFalse;
974   ASSERT(outstandingFishes>0);
975   outstandingFishes--;
976 }
977
978 /*
979  * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
980  * (which represent shared thunks that have been shipped) into fetch-mes
981  * to remote GAs.
982  */
983 //@cindex processAck
984 static void
985 processAck(void)
986 {
987   nat nGAs;
988   globalAddr *gaga;
989   globalAddr gagamap[256]; // ToDo: elim magic constant!!   MAX_GAS * 2];??
990
991   unpackAck(&nGAs, gagamap);
992
993   IF_PAR_DEBUG(ack,
994                belch(",, [%x] Rcvd Ack (%d pairs)", mytid, nGAs);
995                DebugPrintGAGAMap(gagamap, nGAs));
996
997   /*
998    * For each (oldGA, newGA) pair, set the GA of the corresponding
999    * thunk to the newGA, convert the thunk to a FetchMe, and return
1000    * the weight from the oldGA.
1001    */
1002   for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
1003     StgClosure *old_closure = GALAlookup(gaga);
1004     StgClosure *new_closure = GALAlookup(gaga + 1);
1005
1006     ASSERT(old_closure != NULL);
1007     if (new_closure == NULL) {
1008       /* We don't have this closure, so we make a fetchme for it */
1009       globalAddr *ga = setRemoteGA(old_closure, gaga + 1, rtsTrue);
1010       
1011       /* convertToFetchMe should be done unconditionally here.
1012          Currently, we assign GAs to CONSTRs, too, (a bit of a hack),
1013          so we have to check whether it is an RBH before converting
1014
1015          ASSERT(get_itbl(old_closure)==RBH);
1016       */
1017       if (get_itbl(old_closure)->type==RBH)
1018         convertToFetchMe(old_closure, ga);
1019     } else {
1020       /* 
1021        * Oops...we've got this one already; update the RBH to
1022        * point to the object we already know about, whatever it
1023        * happens to be.
1024        */
1025       CommonUp(old_closure, new_closure);
1026       
1027       /* 
1028        * Increase the weight of the object by the amount just
1029        * received in the second part of the ACK pair.
1030        */
1031       (void) addWeight(gaga + 1);
1032     }
1033     (void) addWeight(gaga);
1034   }
1035 }
1036
1037 //@node GUM Message Processor, Miscellaneous Functions, Message-Processing Functions, High Level Communications Routines
1038 //@subsection GUM Message Processor
1039
1040 /*
1041  * GUM Message Processor
1042
1043  * processMessages processes any messages that have arrived, calling
1044  * appropriate routines depending on the message tag
1045  * (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
1046  * present and performs a blocking receive! During profiling it
1047  * busy-waits in order to record idle time.
1048  */
1049
1050 //@cindex processMessages
1051 void
1052 processMessages(void)
1053 {
1054   rtsPacket packet;
1055   OpCode opcode;
1056   GlobalTaskId task;
1057     
1058   do {
1059     packet = GetPacket();  /* Get next message; block until one available */
1060     getOpcodeAndSender(packet, &opcode, &task);
1061
1062     switch (opcode) {
1063     case PP_FINISH:
1064       IF_PAR_DEBUG(verbose,
1065                    belch("== [%x] received FINISH", mytid));
1066       /* setting this global variables eventually terminates the main
1067          scheduling loop for this PE and causes a shut-down, sending 
1068          PP_FINISH to SysMan */
1069       GlobalStopPending = rtsTrue;
1070       break;
1071
1072     case PP_FETCH:
1073       processFetch();
1074       break;
1075
1076     case PP_RESUME:
1077       processResume(task);
1078       break;
1079
1080     case PP_ACK:
1081       processAck();
1082       break;
1083
1084     case PP_FISH:
1085       processFish();
1086       break;
1087
1088     case PP_FREE:
1089       processFree();
1090       break;
1091       
1092     case PP_SCHEDULE:
1093       processSchedule(task);
1094       break;
1095
1096     default:
1097       /* Anything we're not prepared to deal with. */
1098       barf("Task %x: Unexpected opcode %x from %x",
1099            mytid, opcode, task);
1100     } /* switch */
1101
1102   } while (PacketsWaiting());   /* While there are messages: process them */
1103 }                               /* processMessages */
1104
1105 //@node Miscellaneous Functions, Index, GUM Message Processor, High Level Communications Routines
1106 //@subsection Miscellaneous Functions
1107
1108 /*
1109  * ChoosePE selects a GlobalTaskId from the array of PEs 'at random'.
1110  * Important properties:
1111  *   - it varies during execution, even if the PE is idle
1112  *   - it's different for each PE
1113  *   - we never send a fish to ourselves
1114  */
1115 extern long lrand48 (void);
1116
1117 //@cindex choosePE
1118 GlobalTaskId
1119 choosePE(void)
1120 {
1121   long temp;
1122
1123   temp = lrand48() % nPEs;
1124   if (allPEs[temp] == mytid) {  /* Never send a FISH to yourself */
1125     temp = (temp + 1) % nPEs;
1126   }
1127   return allPEs[temp];
1128 }
1129
1130 /* 
1131  * allocate a BLOCKED_FETCH closure and fill it with the relevant fields
1132  * of the ga argument; called from processFetch when the local closure is
1133  * under evaluation
1134  */
1135 //@cindex createBlockedFetch
1136 StgClosure *
1137 createBlockedFetch (globalAddr ga, globalAddr rga)
1138 {
1139   StgBlockedFetch *bf;
1140   StgClosure *closure;
1141
1142   closure = GALAlookup(&ga);
1143   if ((bf = (StgBlockedFetch *)allocate(FIXED_HS + sizeofW(StgBlockedFetch))) == NULL) {
1144     GarbageCollect(GetRoots); 
1145     closure = GALAlookup(&ga);
1146     bf = (StgBlockedFetch *)allocate(FIXED_HS + sizeofW(StgBlockedFetch));
1147     // ToDo: check whether really guaranteed to succeed 2nd time around
1148   }
1149
1150   ASSERT(bf != (StgClosure *)NULL);
1151   SET_INFO((StgClosure *)bf, &BLOCKED_FETCH_info);
1152   // ToDo: check whether other header info is needed
1153   bf->node = closure;
1154   bf->ga.payload.gc.gtid = rga.payload.gc.gtid;
1155   bf->ga.payload.gc.slot = rga.payload.gc.slot;
1156   bf->ga.weight = rga.weight;
1157   // bf->link = NULL;  debugging
1158
1159   IF_PAR_DEBUG(fetch,
1160                fprintf(stderr, "%% [%x] created BF: closure=%p (%s), GA: ",
1161                        mytid, closure, info_type(closure));
1162                printGA(&(bf->ga));
1163                fputc('\n',stderr));
1164   return bf;
1165 }
1166
1167 /*
1168  * waitForTermination enters a loop ignoring spurious messages while
1169  * waiting for the termination sequence to be completed.  
1170  */
1171 //@cindex waitForTermination
1172 void
1173 waitForTermination(void)
1174 {
1175   do {
1176     rtsPacket p = GetPacket();
1177     processUnexpected(p);
1178   } while (rtsTrue);
1179 }
1180
1181 #ifdef DEBUG
1182 //@cindex DebugPrintGAGAMap
1183 void
1184 DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
1185 {
1186   int i;
1187   
1188   for (i = 0; i < nGAs; ++i, gagamap += 2)
1189     fprintf(stderr, "gagamap[%d] = ((%x, %d, %x)) -> ((%x, %d, %x))\n", i,
1190             gagamap[0].payload.gc.gtid, gagamap[0].payload.gc.slot, gagamap[0].weight,
1191             gagamap[1].payload.gc.gtid, gagamap[1].payload.gc.slot, gagamap[1].weight);
1192 }
1193 #endif
1194
1195 //@cindex freeMsgBuffer
1196 static StgWord **freeMsgBuffer = NULL;
1197 //@cindex freeMsgIndex
1198 static int      *freeMsgIndex  = NULL;
1199
1200 //@cindex prepareFreeMsgBuffers
1201 void
1202 prepareFreeMsgBuffers(void)
1203 {
1204   int i;
1205   
1206   /* Allocate the freeMsg buffers just once and then hang onto them. */
1207   if (freeMsgIndex == NULL) {
1208     freeMsgIndex = (int *) stgMallocBytes(nPEs * sizeof(int), 
1209                                           "prepareFreeMsgBuffers (Index)");
1210     freeMsgBuffer = (StgWord **) stgMallocBytes(nPEs * sizeof(long *), 
1211                                           "prepareFreeMsgBuffers (Buffer)");
1212     
1213     for(i = 0; i < nPEs; i++) 
1214       if (i != thisPE) 
1215         freeMsgBuffer[i] = (StgPtr) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
1216                                                "prepareFreeMsgBuffers (Buffer #i)");
1217   }
1218   
1219   /* Initialize the freeMsg buffer pointers to point to the start of their
1220      buffers */
1221   for (i = 0; i < nPEs; i++)
1222     freeMsgIndex[i] = 0;
1223 }
1224
1225 //@cindex freeRemoteGA
1226 void
1227 freeRemoteGA(int pe, globalAddr *ga)
1228 {
1229   int i;
1230   
1231   ASSERT(GALAlookup(ga) == NULL);
1232   
1233   if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
1234     IF_PAR_DEBUG(free,
1235                  belch("Filled a free message buffer (sending remaining messages indivisually)"));      
1236
1237     sendFree(ga->payload.gc.gtid, i, freeMsgBuffer[pe]);
1238     i = 0;
1239   }
1240   freeMsgBuffer[pe][i++] = (StgWord) ga->weight;
1241   freeMsgBuffer[pe][i++] = (StgWord) ga->payload.gc.slot;
1242   freeMsgIndex[pe] = i;
1243
1244 #ifdef DEBUG
1245   ga->weight = 0x0f0f0f0f;
1246   ga->payload.gc.gtid = 0x666;
1247   ga->payload.gc.slot = 0xdeaddead;
1248 #endif
1249 }
1250
1251 //@cindex sendFreeMessages
1252 void
1253 sendFreeMessages(void)
1254 {
1255   int i;
1256   
1257   for (i = 0; i < nPEs; i++) 
1258     if (freeMsgIndex[i] > 0)
1259       sendFree(allPEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1260 }
1261
1262 #endif /* PAR -- whole file */
1263
1264 //@node Index,  , Miscellaneous Functions, High Level Communications Routines
1265 //@subsection Index
1266
1267 //@index
1268 //* ACK::  @cindex\s-+ACK
1269 //* DebugPrintGAGAMap::  @cindex\s-+DebugPrintGAGAMap
1270 //* FETCH::  @cindex\s-+FETCH
1271 //* FISH::  @cindex\s-+FISH
1272 //* FREE::  @cindex\s-+FREE
1273 //* RESUME::  @cindex\s-+RESUME
1274 //* SCHEDULE::  @cindex\s-+SCHEDULE
1275 //* blockFetch::  @cindex\s-+blockFetch
1276 //* choosePE::  @cindex\s-+choosePE
1277 //* freeMsgBuffer::  @cindex\s-+freeMsgBuffer
1278 //* freeMsgIndex::  @cindex\s-+freeMsgIndex
1279 //* freeRemoteGA::  @cindex\s-+freeRemoteGA
1280 //* gumPackBuffer::  @cindex\s-+gumPackBuffer
1281 //* initMoreBuffers::  @cindex\s-+initMoreBuffers
1282 //* prepareFreeMsgBuffers::  @cindex\s-+prepareFreeMsgBuffers
1283 //* processAck::  @cindex\s-+processAck
1284 //* processFetch::  @cindex\s-+processFetch
1285 //* processFetches::  @cindex\s-+processFetches
1286 //* processFish::  @cindex\s-+processFish
1287 //* processFree::  @cindex\s-+processFree
1288 //* processMessages::  @cindex\s-+processMessages
1289 //* processResume::  @cindex\s-+processResume
1290 //* processSchedule::  @cindex\s-+processSchedule
1291 //* sendAck::  @cindex\s-+sendAck
1292 //* sendFetch::  @cindex\s-+sendFetch
1293 //* sendFish::  @cindex\s-+sendFish
1294 //* sendFree::  @cindex\s-+sendFree
1295 //* sendFreeMessages::  @cindex\s-+sendFreeMessages
1296 //* sendResume::  @cindex\s-+sendResume
1297 //* sendSchedule::  @cindex\s-+sendSchedule
1298 //* unpackAck::  @cindex\s-+unpackAck
1299 //* unpackFetch::  @cindex\s-+unpackFetch
1300 //* unpackFish::  @cindex\s-+unpackFish
1301 //* unpackFree::  @cindex\s-+unpackFree
1302 //* unpackResume::  @cindex\s-+unpackResume
1303 //* unpackSchedule::  @cindex\s-+unpackSchedule
1304 //* waitForTermination::  @cindex\s-+waitForTermination
1305 //@end index