1 /* -----------------------------------------------------------------------------
3 * $Id: HLComms.c,v 1.2 1998/12/02 13:29:05 simonm Exp $
5 * High Level Communications Routines (HLComms.lc)
7 * Contains the high-level routines (i.e. communication
8 * subsystem independent) used by GUM
10 * Phil Trinder, Glasgow University, 12 December 1994
12 * Phil Trinder, Simon Marlow July 1998
14 * -------------------------------------------------------------------------- */
16 #ifdef PAR /* whole file */
19 #define NON_POSIX_SOURCE /* so says Solaris */
30 * GUM Message Sending and Unpacking Functions
31 * ********************************************
35 * Allocate space for message processing
/* Scratch buffer for packing/unpacking GUM message payloads, sized by
 * RtsFlags.ParFlags.packBufferSize.  Allocated once (a fragment of the
 * initialiser is visible on the next line) and reused for the life of
 * this PE.  NOTE(review): sampled fragment -- the enclosing function
 * header for the initialiser is not visible here. */
38 static W_ *gumPackBuffer;
44 = (W_ *) stgMallocWords(RtsFlags.ParFlags.packBufferSize, "initMoreBuffers");
48 *SendFetch packs the two global addresses and a load into a message +
/* sendFetch: pack two global addresses and a load figure into a FETCH
 * message and ship it to the PE that owns the remote closure.
 *   rga  - GA of the remote closure being fetched; its gtid names the
 *          destination PE and is also packed into the payload.
 *   lga  - local GA to which the eventual RESUME should be directed;
 *          its weight travels with the message.
 *   load - load figure piggy-backed on the message.
 * Both weights must be positive (asserted below).
 * NOTE(review): sampled fragment -- braces and any debug #ifdef guards
 * between these lines are not visible. */
53 sendFetch(globalAddr *rga, globalAddr *lga, int load)
56 ASSERT(rga->weight > 0 && lga->weight > 0);
58 fprintf(stderr, "Sending Fetch (%x, %d, 0), load = %d\n",
59 rga->loc.gc.gtid, rga->loc.gc.slot, load);
61 SendOpV(PP_FETCH, rga->loc.gc.gtid, 6,
62 (W_) rga->loc.gc.gtid, (W_) rga->loc.gc.slot,
63 (W_) lga->weight, (W_) lga->loc.gc.gtid, (W_) lga->loc.gc.slot, (W_) load);
67 *unpackFetch unpacks a FETCH message into two Global addresses and a load figure.
/* unpackFetch: decode a received FETCH message into the GA of the local
 * closure requested (lga), the remote GA to resume to (rga) and the
 * load figure.  Fields are read in the same order sendFetch packed them.
 * NOTE(review): sampled fragment -- the GetArgs call that fills buf, the
 * lga->weight assignment and the *load assignment are not visible here. */
71 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
77 lga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[0];
78 lga->loc.gc.slot = (int) buf[1];
80 rga->weight = (unsigned) buf[2];
81 rga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[3];
82 rga->loc.gc.slot = (int) buf[4];
86 ASSERT(rga->weight > 0);
90 * SendResume packs the remote blocking queue's GA and data into a message
/* sendResume: ship nelem words of packed graph (data) back to the PE
 * that issued a FETCH.  rga identifies the remote blocking queue; its
 * weight and slot ride in the message so the receiver can locate and
 * wake the queue.  The destination PE is rga's gtid. */
95 sendResume(globalAddr *rga, int nelem, StgPtr data)
100 fprintf(stderr, "Sending Resume for (%x, %d, %x)\n",
101 rga->loc.gc.gtid, rga->loc.gc.slot, rga->weight);
104 SendOpNV(PP_RESUME, rga->loc.gc.gtid, nelem, data, 2,
105 (W_) rga->weight, (W_) rga->loc.gc.slot);
110 * blockFetch blocks a BlockedFetch node on some kind of black hole.
/* blockFetch: attach a BlockedFetch node (bf) to the blocking queue of a
 * black-hole-like closure (bh).  The entire body below is commented out
 * ("Empty until Blocked fetches etc defined"); the skeleton dispatches on
 * the closure's info type (plain black hole, FMBQ, SPEC/GEN RBH) and
 * pushes bf onto the corresponding queue, with old-generation mutables
 * bookkeeping under GC_MUT_REQUIRED.
 * NOTE(review): sampled fragment -- the opening/closing comment
 * delimiters and several case labels are not visible here. */
113 blockFetch(StgPtr bf, StgPtr bh)
117 * Empty until Blocked fetches etc defined
118 * switch (INFO_TYPE(INFO_PTR(bh))) {
120 * BF_LINK(bf) = PrelBase_Z91Z93_closure;
121 * SET_INFO_PTR(bh, BQ_info);
122 * BQ_ENTRIES(bh) = (W_) bf;
124 *#ifdef GC_MUT_REQUIRED
126 * * If we modify a black hole in the old generation, we have to
127 * * make sure it goes on the mutables list
130 * if (bh <= StorageMgrInfo.OldLim) {
131 * MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
132 * StorageMgrInfo.OldMutables = bh;
134 * MUT_LINK(bh) = MUT_NOT_LINKED;
138 * BF_LINK(bf) = (P_) BQ_ENTRIES(bh);
139 * BQ_ENTRIES(bh) = (W_) bf;
141 * case INFO_FMBQ_TYPE:
142 * BF_LINK(bf) = (P_) FMBQ_ENTRIES(bh);
143 * FMBQ_ENTRIES(bh) = (W_) bf;
145 * case INFO_SPEC_RBH_TYPE:
146 * BF_LINK(bf) = (P_) SPEC_RBH_BQ(bh);
147 * SPEC_RBH_BQ(bh) = (W_) bf;
149 * case INFO_GEN_RBH_TYPE:
150 * BF_LINK(bf) = (P_) GEN_RBH_BQ(bh);
151 * GEN_RBH_BQ(bh) = (W_) bf;
154 * fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)\n",
155 * (W_) bh, INFO_PTR(bh));
156 * EXIT(EXIT_FAILURE);
162 * processFetches constructs and sends resume messages for every
163 * BlockedFetch which is ready to be awakened.
/* processFetches: walk the PendingFetches list and, for each BlockedFetch
 * whose target is now evaluable, pack the nearby graph and send a RESUME;
 * forward the fetch if the target turned into a FetchMe, or re-block it
 * on a black hole.  Only the extern declaration is live -- the body below
 * is commented out ("Empty till closure defined"), and retries packing
 * after a GC if the pack buffer overflows.
 * NOTE(review): sampled fragment -- enclosing comment delimiters and
 * several statements are not visible here. */
165 extern P_ PendingFetches;
171 * Empty till closure defined
178 * for (bf = PendingFetches; bf != PrelBase_Z91Z93_closure; bf = next) {
179 * next = BF_LINK(bf);
182 * * Find the target at the end of the indirection chain, and
183 * * process it in much the same fashion as the original target
184 * * of the fetch. Though we hope to find graph here, we could
185 * * find a black hole (of any flavor) or even a FetchMe.
187 * closure = BF_NODE(bf);
188 * while (IS_INDIRECTION(INFO_PTR(closure)))
189 * closure = (P_) IND_CLOSURE_PTR(closure);
190 * ip = (P_) INFO_PTR(closure);
192 * if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
193 * /* Forward the Fetch to someone else *
194 * rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
195 * rga.loc.gc.slot = (int) BF_SLOT(bf);
196 * rga.weight = (unsigned) BF_WEIGHT(bf);
198 * sendFetch(FETCHME_GA(closure), &rga, 0 /* load *);
199 * } else if (IS_BLACK_HOLE(ip)) {
200 * BF_NODE(bf) = closure;
201 * blockFetch(bf, closure);
203 * /* We now have some local graph to send back *
207 * if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
208 * PendingFetches = bf;
209 * ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
210 * SAVE_Hp -= PACK_HEAP_REQUIRED;
211 * bf = PendingFetches;
212 * next = BF_LINK(bf);
213 * closure = BF_NODE(bf);
214 * graph = PackNearbyGraph(closure, &size);
215 * ASSERT(graph != NULL);
217 * rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
218 * rga.loc.gc.slot = (int) BF_SLOT(bf);
219 * rga.weight = (unsigned) BF_WEIGHT(bf);
221 * sendResume(&rga, size, graph);
224 * PendingFetches = PrelBase_Z91Z93_closure;
229 * unpackResume unpacks a Resume message into two Global addresses and
/* unpackResume: decode a RESUME message into the local GA of the
 * blocking queue to awaken (lga) and the packed graph (data, *nelem
 * words).  The gtid is always mytid: a RESUME answers a FETCH that this
 * PE itself issued.
 * NOTE(review): sampled fragment -- the GetArgs filling buf is not
 * visible here. */
234 unpackResume(globalAddr *lga, int *nelem, W_ *data)
239 lga->weight = (unsigned) buf[0];
240 lga->loc.gc.gtid = mytid;
241 lga->loc.gc.slot = (int) buf[1];
243 *nelem = (int) buf[2];
244 GetArgs(data, *nelem);
248 *SendAck packs the global address being acknowledged, together with
249 *an array of global addresses for any closures shipped and sends them.
/* sendAck: pack ngas (old GA, new GA) pairs from gagamap -- six longs
 * per pair: (weight, gtid, slot) for each half -- into the shared
 * gumPackBuffer and send them to `task` as an ACK.
 * NOTE(review): sampled fragment -- the advance of gagamap between the
 * first triple (p[0..2]) and the second (p[3..5]), and again before the
 * next iteration, is not visible here; as shown the two halves would
 * read the same element.  Confirm against the full source. */
253 sendAck(GLOBAL_TASK_ID task, int ngas, globalAddr *gagamap)
259 buffer = (long *) gumPackBuffer;
261 for(i = 0, p = buffer; i < ngas; i++, p += 6) {
262 ASSERT(gagamap[1].weight > 0);
263 p[0] = (long) gagamap->weight;
264 p[1] = (long) gagamap->loc.gc.gtid;
265 p[2] = (long) gagamap->loc.gc.slot;
267 p[3] = (long) gagamap->weight;
268 p[4] = (long) gagamap->loc.gc.gtid;
269 p[5] = (long) gagamap->loc.gc.slot;
273 fprintf(stderr,"Sending Ack (%d pairs) to %x\n", ngas, task);
275 SendOpN(PP_ACK, task, p - buffer, buffer);
280 *unpackAck unpacks an Acknowledgement message into a Global address,
281 *a count of the number of global addresses following and a map of
/* unpackAck: decode an ACK message.  The first word is the total GA
 * array size; six words encode each (old GA, new GA) pair, so *ngas is
 * size/6.  The loop refills gagamap pair by pair.
 * NOTE(review): sampled fragment -- the GetArgs call, the decrement of
 * GAarraysize and the advance of gagamap between the two halves of each
 * pair are not visible here; as shown the second triple would overwrite
 * the first.  Confirm against the full source. */
286 unpackAck(int *ngas, globalAddr *gagamap)
291 GetArgs(&GAarraysize, 1);
293 *ngas = GAarraysize / 6;
295 while (GAarraysize > 0) {
297 gagamap->weight = (unsigned) buf[0];
298 gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[1];
299 gagamap->loc.gc.slot = (int) buf[2];
301 gagamap->weight = (unsigned) buf[3];
302 gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[4];
303 gagamap->loc.gc.slot = (int) buf[5];
304 ASSERT(gagamap->weight > 0);
311 *SendFish packs the originating PE's task id together with the fish's
312 *age, history and hunger fields into a FISH message and sends it to destPE.
/* sendFish: send a FISH (work-stealing probe) to destPE, carrying the
 * originating PE plus the fish's age, history and hunger fields.
 * History and hunger are currently unused (see banner comment above). */
316 sendFish(GLOBAL_TASK_ID destPE, GLOBAL_TASK_ID origPE,
317 int age, int history, int hunger)
321 fprintf(stderr,"Sending Fish to %lx\n", destPE);
323 SendOpV(PP_FISH, destPE, 4, (W_) origPE, (W_) age, (W_) history, (W_) hunger);
330 *unpackFish unpacks a FISH message into the global task id of the
331 *originating PE and 3 data fields: the age, history and hunger of the
332 *fish. The history + hunger are not currently used.
/* unpackFish: decode a FISH message into the originating PE's task id
 * and the age/history/hunger fields, mirroring sendFish's packing order.
 * NOTE(review): sampled fragment -- the GetArgs call and the *age
 * assignment (buf[1]) are not visible here. */
336 unpackFish(GLOBAL_TASK_ID *origPE, int *age, int *history, int *hunger)
342 *origPE = (GLOBAL_TASK_ID) buf[0];
344 *history = (int) buf[2];
345 *hunger = (int) buf[3];
349 *SendFree sends (weight, slot) pairs for GAs that we no longer need references to.
/* sendFree: ship nelem words of (weight, slot) pairs -- i.e. nelem/2
 * GAs whose references this PE is relinquishing -- to PE `pe` as a
 * FREE message. */
353 sendFree(GLOBAL_TASK_ID pe, int nelem, StgPtr data)
356 fprintf(stderr, "Sending Free (%d GAs) to %x\n", nelem / 2, pe);
358 SendOpN(PP_FREE, pe, nelem, data);
364 *unpackFree unpacks a FREE message into the amount of data shipped and
/* unpackFree: decode a FREE message -- first word is the element count,
 * followed by *nelem words of (weight, slot) pairs copied into data.
 * NOTE(review): sampled fragment -- the GetArgs filling buf is not
 * visible here. */
369 unpackFree(int *nelem, W_ *data)
374 *nelem = (int) buf[0];
375 GetArgs(data, *nelem);
379 *SendSchedule sends a closure to be evaluated in response to a Fish
380 *message. The message is directed to the PE that originated the Fish
381 *(origPE), and includes the packed closure (data) along with its size
/* sendSchedule: answer a FISH by sending a packed closure (nelem words
 * of data) to the fish's originating PE as a SCHEDULE message; the
 * receiver will spark it. */
386 sendSchedule(GLOBAL_TASK_ID origPE, int nelem, StgPtr data)
388 #ifdef SCHEDULE_DEBUG
390 fprintf(stderr, "Sending Schedule to %x\n", origPE);
393 SendOpN(PP_SCHEDULE, origPE, nelem, data);
397 *unpackSchedule unpacks a SCHEDULE message into the Global address of
398 *the closure shipped, the amount of data shipped (nelem) and the data
/* unpackSchedule: decode a SCHEDULE message -- first word is the element
 * count, followed by *nelem words of packed graph copied into data.
 * NOTE(review): sampled fragment -- the GetArgs filling buf is not
 * visible here. */
403 unpackSchedule(int *nelem, W_ *data)
408 *nelem = (int) buf[0];
409 GetArgs(data, *nelem);
413 *Message-Processing Functions
415 *The following routines process incoming GUM messages. Often reissuing
416 *messages in response.
418 *processFish unpacks a fish message, reissuing it if it's our own,
419 *sending work if we have it or sending it onwards otherwise.
421 * Only stubs now. Real stuff in HLCommsRest PWT
428 * processFetch either returns the requested data (if available)
429 * or blocks the remote blocking queue on a black hole (if not).
436 * processFree unpacks a FREE message and adds the weights to our GAs.
443 * processResume unpacks a RESUME message into the graph, filling in
444 * the LA -> GA, and GA -> LA tables. Threads blocked on the original
445 * FetchMe (now a blocking queue) are awakened, and the blocking queue
446 * is converted into an indirection. Finally it sends an ACK in response
447 * which contains any newly allocated GAs.
/* Forward stubs for the message processors; the full definitions appear
 * later in the file (see the "HLCommsRest" section below).
 * NOTE(review): sampled fragment -- storage class / return type and any
 * trailing semicolons on these declarations are not visible here. */
451 processResume(GLOBAL_TASK_ID sender)
455 * processSchedule unpacks a SCHEDULE message into the graph, filling
456 * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
457 * the local spark queue. Finally it sends an ACK in response
458 * which contains any newly allocated GAs.
461 processSchedule(GLOBAL_TASK_ID sender)
466 * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
467 * (which represent shared thunks that have been shipped) into fetch-mes
475 * GUM Message Processor
477 * processMessages processes any messages that have arrived, calling
478 * appropriate routines depending on the message tag
479 * (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
480 * present and performs a blocking receive! During profiling it
481 * busy-waits in order to record idle time.
/* processMessages: main GUM message dispatch loop.  Blocks for the first
 * packet, decodes (opcode, sender) and dispatches to the process* routine
 * for that opcode; loops while further packets are already waiting.
 * A FINISH-style opcode exits successfully; an unknown opcode aborts.
 * NOTE(review): sampled fragment -- the switch statement and most case
 * labels (FISH, FETCH, RESUME, ACK, FREE, ...) are not visible here. */
485 processMessages(void)
493 packet = GetPacket(); /* Get next message; block until one available */
495 get_opcode_and_sender(packet, &opcode, &task);
500 stg_exit(EXIT_SUCCESS); /* The computation has been completed by someone
525 processSchedule(task);
529 /* Anything we're not prepared to deal with. */
530 fprintf(stderr, "Task %x: Unexpected opcode %x from %x\n",
531 mytid, opcode, task);
533 stg_exit(EXIT_FAILURE);
536 } while (PacketsWaiting()); /* While there are messages: process them */
537 } /* processMessages */
540 * Miscellaneous Functions
543 * ChoosePE selects a GlobalTaskId from the array of PEs 'at random'.
544 * Important properties:
545 * - it varies during execution, even if the PE is idle
546 * - it's different for each PE
547 * - we never send a fish to ourselves
/* choosePE (fragment): pick a pseudo-random PE index via lrand48(),
 * stepping to the next index if the draw lands on ourselves -- we never
 * send a FISH to our own task (see banner comment above).
 * NOTE(review): sampled fragment -- the function header and the return
 * of PEs[temp] are not visible here. */
549 extern long lrand48 (void);
556 temp = lrand48() % nPEs;
557 if (PEs[temp] == mytid) { /* Never send a FISH to yourself */
558 temp = (temp + 1) % nPEs;
564 *WaitForTermination enters a loop ignoring spurious messages while waiting for the
565 *termination sequence to be completed.
/* WaitForTermination: drain and discard (via ProcessUnexpected) any
 * spurious messages until the system's termination sequence completes.
 * NOTE(review): sampled fragment -- the enclosing loop construct is not
 * visible here. */
568 WaitForTermination(void)
571 PACKET p = GetPacket();
572 ProcessUnexpected(p);
/* DebugPrintGAGAMap: dump nGAs (old GA -> new GA) pairs to stderr.
 * gagamap holds 2*nGAs entries; each iteration consumes one pair. */
578 DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
582 for (i = 0; i < nGAs; ++i, gagamap += 2)
583 fprintf(stderr, "gagamap[%d] = (%x, %d, %x) -> (%x, %d, %x)\n", i,
584 gagamap[0].loc.gc.gtid, gagamap[0].loc.gc.slot, gagamap[0].weight,
585 gagamap[1].loc.gc.gtid, gagamap[1].loc.gc.slot, gagamap[1].weight);
/* Per-PE buffers that batch up outgoing FREE messages:
 *   freeMsgBuffer[pe] - packBufferSize-word buffer of (weight, slot) pairs
 *   freeMsgIndex[pe]  - next free position in that buffer
 * prepareFreeMsgBuffers allocates them lazily on first call (keyed on
 * freeMsgIndex == NULL) and resets every index before each use.
 * NOTE(review): sampled fragment -- the body of the final reset loop
 * (freeMsgIndex[i] = 0, presumably) is not visible here. */
589 static PP_ freeMsgBuffer = NULL;
590 static int *freeMsgIndex = NULL;
593 prepareFreeMsgBuffers(void)
597 /* Allocate the freeMsg buffers just once and then hang onto them. */
599 if (freeMsgIndex == NULL) {
601 freeMsgIndex = (int *) stgMallocBytes(nPEs * sizeof(int), "prepareFreeMsgBuffers (Index)");
602 freeMsgBuffer = (PP_) stgMallocBytes(nPEs * sizeof(long *), "prepareFreeMsgBuffers (Buffer)");
604 for(i = 0; i < nPEs; i++) {
606 freeMsgBuffer[i] = (P_) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
607 "prepareFreeMsgBuffers (Buffer #i)");
612 /* Initialize the freeMsg buffer pointers to point to the start of their buffers */
613 for (i = 0; i < nPEs; i++)
/* freeRemoteGA: queue a (weight, slot) pair for GA `ga` in PE `pe`'s
 * free-message buffer, flushing the buffer with sendFree when it would
 * overflow.  The GA must already have been removed from the GALA table
 * (asserted).  Afterwards the GA struct is stamped with poison values
 * (0x0f0f0f0f / 0x666 / 0xdeaddead) so any later use is conspicuous
 * in a debugger. */
618 freeRemoteGA(int pe, globalAddr *ga)
622 ASSERT(GALAlookup(ga) == NULL);
624 if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
626 fprintf(stderr, "Filled a free message buffer\n");
628 sendFree(ga->loc.gc.gtid, i, freeMsgBuffer[pe]);
631 freeMsgBuffer[pe][i++] = (W_) ga->weight;
632 freeMsgBuffer[pe][i++] = (W_) ga->loc.gc.slot;
633 freeMsgIndex[pe] = i;
635 ga->weight = 0x0f0f0f0f;
636 ga->loc.gc.gtid = 0x666;
637 ga->loc.gc.slot = 0xdeaddead;
/* sendFreeMessages: flush every non-empty per-PE free-message buffer,
 * sending its accumulated (weight, slot) pairs with sendFree. */
642 sendFreeMessages(void)
646 for (i = 0; i < nPEs; i++) {
647 if (freeMsgIndex[i] > 0)
648 sendFree(PEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
652 /* Process messaging code ripped out for the time being -- SDM & PWT */
655 /* These are the remaining message-processing functions from HLComms*/
659 *Message-Processing Functions
661 *The following routines process incoming GUM messages. Often reissuing
662 *messages in response.
664 *processFish unpacks a fish message, reissuing it if it's our own,
665 *sending work if we have it or sending it onwards otherwise.
/* processFish (fragment; the signature line is not visible here): handle
 * an incoming FISH.  If it is our own fish returned, or after failing to
 * find a spark: if young enough (age < FISH_LIFE_EXPECTANCY) forward it
 * to a random PE with age+1, otherwise send it home to die.  If we do
 * have a spark, pack the nearby graph -- retrying once after a GC when
 * packing fails -- and answer with a SCHEDULE to the originator. */
670 GLOBAL_TASK_ID origPE;
671 int age, history, hunger;
673 unpackFish(&origPE, &age, &history, &hunger);
675 if (origPE == mytid) {
680 while ((spark = FindLocalSpark(rtsTrue)) != NULL) {
684 if ((graph = PackNearbyGraph(spark, &size)) == NULL) {
685 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
686 SAVE_Hp -= PACK_HEAP_REQUIRED;
687 /* Now go back and try again */
689 sendSchedule(origPE, size, graph);
695 /* We have no sparks to give */
696 if (age < FISH_LIFE_EXPECTANCY)
697 sendFish(choosePE(), origPE,
698 (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
700 /* Send it home to die */
702 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
708 *processFetch either returns the requested data (if available)
709 *or blocks the remote blocking queue on a black hole (if not).
/* processFetch (fragment; the signature and locals are not visible):
 * handle an incoming FETCH for one of our GAs.  Four cases on the target
 * closure's info type:
 *   1. FetchMe        -> forward the FETCH to the real owner;
 *   2. our own fetch  -> the FETCH looped back to us: common up the FMBQ
 *                        with the closure and return the GA weight;
 *   3. black hole     -> allocate a BlockedFetch node (GC'ing first if
 *                        the heap is full) and block it on the closure;
 *   4. local graph    -> pack the nearby graph (retrying after a GC on
 *                        pack-buffer overflow) and send a RESUME. */
720 unpackFetch(&ga, &rga, &load);
722 fprintf(stderr, "Rcvd Fetch for (%x, %d, 0), Resume (%x, %d, %x) (load %d) \n",
723 ga.loc.gc.gtid, ga.loc.gc.slot,
724 rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, load);
727 closure = GALAlookup(&ga);
728 ip = (P_) INFO_PTR(closure);
730 if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
731 /* Forward the Fetch to someone else */
732 sendFetch(FETCHME_GA(closure), &rga, load);
733 } else if (rga.loc.gc.gtid == mytid) {
734 /* Our own FETCH forwarded back around to us */
735 P_ fmbq = GALAlookup(&rga);
737 /* We may have already discovered that the fetch target is our own. */
739 CommonUp(fmbq, closure);
740 (void) addWeight(&rga);
741 } else if (IS_BLACK_HOLE(ip)) {
742 /* This includes RBH's and FMBQ's */
745 if ((bf = AllocateHeap(FIXED_HS + BF_CLOSURE_SIZE(dummy))) == NULL) {
746 ReallyPerformThreadGC(FIXED_HS + BF_CLOSURE_SIZE(dummy), rtsFalse);
747 closure = GALAlookup(&ga);
748 bf = SAVE_Hp - (FIXED_HS + BF_CLOSURE_SIZE(dummy)) + 1;
750 ASSERT(GALAlookup(&rga) == NULL);
752 SET_BF_HDR(bf, BF_info, bogosity);
753 BF_NODE(bf) = closure;
754 BF_GTID(bf) = (W_) rga.loc.gc.gtid;
755 BF_SLOT(bf) = (W_) rga.loc.gc.slot;
756 BF_WEIGHT(bf) = (W_) rga.weight;
757 blockFetch(bf, closure);
760 fprintf(stderr, "Blocking Fetch (%x, %d, %x) on %#lx\n",
761 rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, closure);
765 /* The target of the FetchMe is some local graph */
769 if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
770 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
771 SAVE_Hp -= PACK_HEAP_REQUIRED;
772 closure = GALAlookup(&ga);
773 graph = PackNearbyGraph(closure, &size);
774 ASSERT(graph != NULL);
776 sendResume(&rga, size, graph);
781 *processFree unpacks a FREE message and adds the weights to our GAs.
/* processFree (fragment; signature not visible): unpack a FREE message
 * into gumPackBuffer and, for each (weight, slot) pair -- all local to
 * this PE, hence gtid = mytid -- return the weight to our GA via
 * addWeight. */
787 static W_ *freeBuffer;
791 freeBuffer = gumPackBuffer;
792 unpackFree(&nelem, freeBuffer);
794 fprintf(stderr, "Rcvd Free (%d GAs)\n", nelem / 2);
796 ga.loc.gc.gtid = mytid;
797 for (i = 0; i < nelem;) {
798 ga.weight = (unsigned) freeBuffer[i++];
799 ga.loc.gc.slot = (int) freeBuffer[i++];
801 fprintf(stderr,"Processing free (%x, %d, %x)\n", ga.loc.gc.gtid,
802 ga.loc.gc.slot, ga.weight);
804 (void) addWeight(&ga);
809 *processResume unpacks a RESUME message into the graph, filling in
810 *the LA -> GA, and GA -> LA tables. Threads blocked on the original
811 *FetchMe (now a blocking queue) are awakened, and the blocking queue
812 *is converted into an indirection. Finally it sends an ACK in response
813 *which contains any newly allocated GAs.
/* processResume: handle a RESUME answering one of our FETCHes.
 * Unpacks into gumPackBuffer; packBuffer[0] is the heap space the
 * incoming graph needs, so GC first if it won't fit.  Only after GC do
 * we addWeight/look up the old closure (releasing earlier could free it
 * under us -- see inline comment).  The graph is always unpacked even if
 * the node already arrived in another packet; if unpacking didn't common
 * up the FMBQ resumee with the new graph, CommonUp does it explicitly.
 * Finishes by ACKing the sender with the new GA map.  Under granSimStats
 * a GR_REPLY event is dumped for each blocked TSO. */
817 processResume(GLOBAL_TASK_ID sender)
821 static W_ *packBuffer;
827 packBuffer = gumPackBuffer;
828 unpackResume(&lga, &nelem, packBuffer);
831 fprintf(stderr, "Rcvd Resume for (%x, %d, %x)\n",
832 lga.loc.gc.gtid, lga.loc.gc.slot, lga.weight);
833 PrintPacket(packBuffer);
837 * We always unpack the incoming graph, even if we've received the
838 * requested node in some other data packet (and already awakened
839 * the blocking queue).
841 if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
842 ReallyPerformThreadGC(packBuffer[0], rtsFalse);
843 SAVE_Hp -= packBuffer[0];
846 /* Do this *after* GC; we don't want to release the object early! */
849 (void) addWeight(&lga);
851 old = GALAlookup(&lga);
853 if (RtsFlags.ParFlags.granSimStats) {
856 if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE) {
857 for(tso = (P_) FMBQ_ENTRIES(old);
858 TSO_LINK(tso) != PrelBase_Z91Z93_closure;
862 /* DumpGranEventAndNode(GR_REPLY, tso, old, taskIDtoPE(sender)); */
863 DumpRawGranEvent(CURRENT_PROC,taskIDtoPE(sender),GR_REPLY,
867 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
868 ASSERT(newGraph != NULL);
871 * Sometimes, unpacking will common up the resumee with the
872 * incoming graph, but if it hasn't, we'd better do so now.
875 if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE)
876 CommonUp(old, newGraph);
879 DebugPrintGAGAMap(gagamap, nGAs);
882 sendAck(sender, nGAs, gagamap);
886 *processSchedule unpacks a SCHEDULE message into the graph, filling
887 *in the LA -> GA, and GA -> LA tables. The root of the graph is added to
888 *the local spark queue. Finally it sends an ACK in response
889 *which contains any newly allocated GAs.
/* processSchedule: handle a SCHEDULE carrying work in answer to our
 * FISH.  Unpacks into gumPackBuffer; packBuffer[0] gives the heap space
 * needed, so GC first if the graph won't fit.  The unpacked graph's root
 * is sparked as an advisory spark (see inline comment), and the sender
 * is ACKed with the new GA map. */
892 processSchedule(GLOBAL_TASK_ID sender)
897 static W_ *packBuffer;
902 packBuffer = gumPackBuffer; /* HWL */
903 unpackSchedule(&nelem, packBuffer);
905 #ifdef SCHEDULE_DEBUG
906 fprintf(stderr, "Rcvd Schedule\n");
907 PrintPacket(packBuffer);
911 * For now, the graph is a closure to be sparked as an advisory
912 * spark, but in future it may be a complete spark with
913 * required/advisory status, priority etc.
916 space_required = packBuffer[0];
917 if (SAVE_Hp + space_required >= SAVE_HpLim) {
918 ReallyPerformThreadGC(space_required, rtsFalse);
919 SAVE_Hp -= space_required;
921 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
922 ASSERT(newGraph != NULL);
923 success = Spark(newGraph, rtsFalse);
926 #ifdef SCHEDULE_DEBUG
927 DebugPrintGAGAMap(gagamap, nGAs);
931 sendAck(sender, nGAs, gagamap);
937 *processAck unpacks an ACK, and uses the GAGA map to convert RBH's
938 *(which represent shared thunks that have been shipped) into fetch-mes
/* processAck (fragment; signature not visible): handle an ACK for graph
 * we shipped.  For each (oldGA, newGA) pair: if we don't already have
 * the new closure locally, register the new GA for the old closure and
 * convert it to a FetchMe; otherwise the pair's weight is simply folded
 * in via addWeight.  In both cases the oldGA's weight is returned to us
 * (final addWeight). */
947 globalAddr gagamap[MAX_GAS * 2];
949 unpackAck(&nGAs, gagamap);
952 fprintf(stderr, "Rcvd Ack (%d pairs)\n", nGAs);
953 DebugPrintGAGAMap(gagamap, nGAs);
957 * For each (oldGA, newGA) pair, set the GA of the corresponding
958 * thunk to the newGA, convert the thunk to a FetchMe, and return
959 * the weight from the oldGA.
961 for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
962 P_ old = GALAlookup(gaga);
963 P_ new = GALAlookup(gaga + 1);
966 /* We don't have this closure, so we make a fetchme for it */
967 globalAddr *ga = setRemoteGA(old, gaga + 1, rtsTrue);
969 convertToFetchMe(old, ga);
972 * Oops...we've got this one already; update the RBH to
973 * point to the object we already know about, whatever it
979 * Increase the weight of the object by the amount just
980 * received in the second part of the ACK pair.
982 (void) addWeight(gaga + 1);
984 (void) addWeight(gaga);
990 #endif /* PAR -- whole file */