/****************************************************************
 *                                                              *
 * High Level Communications Routines (HLComms.lc)              *
 *                                                              *
 * Contains the high-level routines (i.e. communication         *
 * subsystem independent) used by GUM                           *
 * (c) The Parade/AQUA Projects, Glasgow University, 1995       *
 * Phil Trinder, Glasgow University, 12 December 1994           *
 *                                                              *
 ****************************************************************/

\begin{code}
#ifdef PAR /* whole file */

#define NON_POSIX_SOURCE /* so says Solaris */

#include "rtsdefs.h"
#include "HLC.h"
\end{code}

\section{GUM Message Sending and Unpacking Functions}

@sendFetch@ packs the two global addresses and a load into a message and
sends it.

\begin{code}
static W_ *gumPackBuffer;

void
InitMoreBuffers(STG_NO_ARGS)
{
    gumPackBuffer = (W_ *) stgMallocWords(RTSflags.ParFlags.packBufferSize,
                                          "initMoreBuffers");
}

void
sendFetch(rga, lga, load)
globalAddr *rga, *lga;
int load;
{
    CostCentre Save_CCC = CCC;

    CCC = (CostCentre) STATIC_CC_REF(CC_MSG);
    CCC->scc_count++;

    ASSERT(rga->weight > 0 && lga->weight > 0);
#ifdef FETCH_DEBUG
    fprintf(stderr, "Sending Fetch (%x, %d, 0), load = %d\n",
            rga->loc.gc.gtid, rga->loc.gc.slot, load);
#endif
    SendOpV(PP_FETCH, rga->loc.gc.gtid, 6,
            (W_) rga->loc.gc.gtid, (W_) rga->loc.gc.slot,
            (W_) lga->weight, (W_) lga->loc.gc.gtid, (W_) lga->loc.gc.slot,
            (W_) load);

    CCC = Save_CCC;
}
\end{code}

@unpackFetch@ unpacks a FETCH message into two global addresses and a load
figure.

\begin{code}
static void
unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
{
    long buf[6];

    GetArgs(buf, 6);
    lga->weight = 1;
    lga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[0];
    lga->loc.gc.slot = (int) buf[1];

    rga->weight = (unsigned) buf[2];
    rga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[3];
    rga->loc.gc.slot = (int) buf[4];

    *load = (int) buf[5];

    ASSERT(rga->weight > 0);
}
\end{code}

@sendResume@ packs the remote blocking queue's GA and data into a message
and sends it.

\begin{code}
void
sendResume(rga, nelem, data)
globalAddr *rga;
int nelem;
P_ data;
{
    CostCentre Save_CCC = CCC;

    CCC = (CostCentre) STATIC_CC_REF(CC_MSG);
    CCC->scc_count++;

#ifdef RESUME_DEBUG
    PrintPacket(data);
    fprintf(stderr, "Sending Resume for (%x, %d, %x)\n",
            rga->loc.gc.gtid, rga->loc.gc.slot, rga->weight);
#endif
    SendOpNV(PP_RESUME, rga->loc.gc.gtid, nelem, data, 2,
             (W_) rga->weight, (W_) rga->loc.gc.slot);

    CCC = Save_CCC;
}
\end{code}

@blockFetch@ blocks a @BlockedFetch@ node on some kind of black hole.

\begin{code}
static void
blockFetch(P_ bf, P_ bh)
{
    switch (INFO_TYPE(INFO_PTR(bh))) {
    case INFO_BH_TYPE:
        BF_LINK(bf) = Nil_closure;
        SET_INFO_PTR(bh, BQ_info);
        BQ_ENTRIES(bh) = (W_) bf;

#ifdef GC_MUT_REQUIRED
        /*
         * If we modify a black hole in the old generation, we have to make
         * sure it goes on the mutables list
         */
        if (bh <= StorageMgrInfo.OldLim) {
            MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
            StorageMgrInfo.OldMutables = bh;
        } else
            MUT_LINK(bh) = MUT_NOT_LINKED;
#endif
        break;

    case INFO_BQ_TYPE:
        BF_LINK(bf) = (P_) BQ_ENTRIES(bh);
        BQ_ENTRIES(bh) = (W_) bf;
        break;

    case INFO_FMBQ_TYPE:
        BF_LINK(bf) = (P_) FMBQ_ENTRIES(bh);
        FMBQ_ENTRIES(bh) = (W_) bf;
        break;

    case INFO_SPEC_RBH_TYPE:
        BF_LINK(bf) = (P_) SPEC_RBH_BQ(bh);
        SPEC_RBH_BQ(bh) = (W_) bf;
        break;

    case INFO_GEN_RBH_TYPE:
        BF_LINK(bf) = (P_) GEN_RBH_BQ(bh);
        GEN_RBH_BQ(bh) = (W_) bf;
        break;

    default:
        fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)\n",
                (W_) bh, INFO_PTR(bh));
        EXIT(EXIT_FAILURE);
    }
}
\end{code}
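A brief aside on the @weight@ fields these messages carry.  GUM manages
global addresses with weighted reference counting: a PE that exports a
reference gives away part of its own weight rather than exchanging extra
messages, and weight is returned (via FREE or ACK) when a remote reference
dies, so the owner can tell when no remote references remain.  The
standalone sketch below is illustrative only and is not RTS code; the
@Weight@, @splitWeight@ and @returnWeight@ names and the constant are
invented for the example (the real bookkeeping lives in the GALA tables and
@addWeight@ elsewhere in the runtime).

\begin{verbatim}
#include <assert.h>
#include <stdio.h>

typedef unsigned long Weight;
#define MAX_WEIGHT 0x10000UL    /* illustrative value only */

/* Give half of the owner's weight away with an exported reference.
 * Both halves stay non-zero, which is why the message code can
 * ASSERT(weight > 0) on every GA it ships.
 */
static Weight splitWeight(Weight *owner)
{
    Weight given = *owner / 2;

    assert(given > 0);          /* a weight of 1 cannot be split; the real
                                 * RTS has to handle that case specially */
    *owner -= given;
    return given;
}

/* Returning a reference simply adds its weight back to the owner. */
static void returnWeight(Weight *owner, Weight returned)
{
    *owner += returned;
}

int main(void)
{
    Weight here = MAX_WEIGHT;            /* weight held by the owning PE    */
    Weight shipped = splitWeight(&here); /* weight sent with a FETCH/RESUME */

    printf("owner keeps %lu, remote PE holds %lu\n", here, shipped);

    returnWeight(&here, shipped);        /* e.g. on receipt of a FREE       */
    printf("after the free, owner holds %lu\n", here);
    return here == MAX_WEIGHT ? 0 : 1;
}
\end{verbatim}

This is why every GA shipped in a FETCH or RESUME is asserted to carry a
non-zero weight above.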
@processFetches@ constructs and sends resume messages for every
@BlockedFetch@ which is ready to be awakened.

\begin{code}
extern P_ PendingFetches;

void
processFetches()
{
    P_ bf;
    P_ next;
    P_ closure;
    P_ ip;
    globalAddr rga;

    for (bf = PendingFetches; bf != Nil_closure; bf = next) {
        next = BF_LINK(bf);

        /*
         * Find the target at the end of the indirection chain, and process
         * it in much the same fashion as the original target of the fetch.
         * Though we hope to find graph here, we could find a black hole (of
         * any flavor) or even a FetchMe.
         */
        closure = BF_NODE(bf);
        while (IS_INDIRECTION(INFO_PTR(closure)))
            closure = (P_) IND_CLOSURE_PTR(closure);

        ip = (P_) INFO_PTR(closure);

        if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
            /* Forward the Fetch to someone else */
            rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
            rga.loc.gc.slot = (int) BF_SLOT(bf);
            rga.weight = (unsigned) BF_WEIGHT(bf);

            sendFetch(FETCHME_GA(closure), &rga, 0 /* load */);
        } else if (IS_BLACK_HOLE(ip)) {
            BF_NODE(bf) = closure;
            blockFetch(bf, closure);
        } else {
            /* We now have some local graph to send back */
            W_ size;
            P_ graph;

            if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
                PendingFetches = bf;
                ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
                SAVE_Hp -= PACK_HEAP_REQUIRED;
                bf = PendingFetches;
                next = BF_LINK(bf);
                closure = BF_NODE(bf);
                graph = PackNearbyGraph(closure, &size);
                ASSERT(graph != NULL);
            }
            rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
            rga.loc.gc.slot = (int) BF_SLOT(bf);
            rga.weight = (unsigned) BF_WEIGHT(bf);

            sendResume(&rga, size, graph);
        }
    }
    PendingFetches = Nil_closure;
}
\end{code}

@unpackResume@ unpacks a RESUME message into a global address, the number
of data elements shipped, and the data array itself.

\begin{code}
static void
unpackResume(globalAddr *lga, int *nelem, W_ *data)
{
    long buf[3];

    GetArgs(buf, 3);
    lga->weight = (unsigned) buf[0];
    lga->loc.gc.gtid = mytid;
    lga->loc.gc.slot = (int) buf[1];

    *nelem = (int) buf[2];
    GetArgs(data, *nelem);
}
\end{code}

@sendAck@ packs the pairs of global addresses in the GAGA map (one pair for
each closure shipped) into a message and sends it.

\begin{code}
void
sendAck(task, ngas, gagamap)
GLOBAL_TASK_ID task;
int ngas;
globalAddr *gagamap;
{
    static long *buffer;
    long *p;
    int i;
    CostCentre Save_CCC = CCC;

    buffer = (long *) gumPackBuffer;

    CCC = (CostCentre) STATIC_CC_REF(CC_MSG);
    CCC->scc_count++;

    for (i = 0, p = buffer; i < ngas; i++, p += 6) {
        ASSERT(gagamap[1].weight > 0);
        p[0] = (long) gagamap->weight;
        p[1] = (long) gagamap->loc.gc.gtid;
        p[2] = (long) gagamap->loc.gc.slot;
        gagamap++;
        p[3] = (long) gagamap->weight;
        p[4] = (long) gagamap->loc.gc.gtid;
        p[5] = (long) gagamap->loc.gc.slot;
        gagamap++;
    }
#ifdef ACK_DEBUG
    fprintf(stderr, "Sending Ack (%d pairs) to %x\n", ngas, task);
#endif
    SendOpN(PP_ACK, task, p - buffer, buffer);

    CCC = Save_CCC;
}
\end{code}

@unpackAck@ unpacks an ACK message into a count of the global address pairs
it carries and a map of global addresses (the GAGA map).

\begin{code}
static void
unpackAck(int *ngas, globalAddr *gagamap)
{
    long GAarraysize;
    long buf[6];

    GetArgs(&GAarraysize, 1);

    *ngas = GAarraysize / 6;

    while (GAarraysize > 0) {
        GetArgs(buf, 6);
        gagamap->weight = (unsigned) buf[0];
        gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[1];
        gagamap->loc.gc.slot = (int) buf[2];
        gagamap++;
        gagamap->weight = (unsigned) buf[3];
        gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[4];
        gagamap->loc.gc.slot = (int) buf[5];
        ASSERT(gagamap->weight > 0);
        gagamap++;
        GAarraysize -= 6;
    }
}
\end{code}
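One more note on acknowledgements before turning to fishing: the ACK
payload is simply @ngas@ pairs of global addresses, six longs per pair with
the old address first, and @unpackAck@ recovers the pair count by dividing
the message length by six.  The standalone sketch below merely spells that
layout out; the @GAPair@ type and the round-trip check are invented for
illustration and are not part of the RTS.

\begin{verbatim}
#include <assert.h>
#include <string.h>

/* One (old GA -> new GA) pair as it travels in an ACK: six longs, old
 * address first.  Invented name; the RTS works on raw long buffers.
 */
typedef struct {
    long oldWeight, oldGtid, oldSlot;
    long newWeight, newGtid, newSlot;
} GAPair;

/* Lay out n pairs in one flat buffer of longs (sendAck does this field by
 * field); the return value is the message length handed to the transport.
 */
static long packPairs(const GAPair *pairs, int n, long *buf)
{
    memcpy(buf, pairs, n * sizeof(GAPair));
    return n * (long) (sizeof(GAPair) / sizeof(long));
}

int main(void)
{
    GAPair in[2] = { { 1, 0x101, 7, 64, 0x202, 3 },
                     { 2, 0x101, 8, 32, 0x202, 4 } };
    long buf[12];
    GAPair out[2];
    long nlongs = packPairs(in, 2, buf);

    /* unpackAck recovers the pair count by dividing the length by six */
    assert(nlongs / 6 == 2);

    memcpy(out, buf, sizeof(out));   /* the real unpackAck copies field by field */
    assert(out[1].newSlot == 4 && out[0].newWeight == 64);
    return 0;
}
\end{verbatim}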
@sendFish@ packs the global task id of the originating PE, together with
the age, history and hunger of the fish, into a message and sends it.

\begin{code}
void
sendFish(destPE, origPE, age, history, hunger)
GLOBAL_TASK_ID destPE, origPE;
int age, history, hunger;
{
    CostCentre Save_CCC = CCC;

    CCC = (CostCentre) STATIC_CC_REF(CC_MSG);
    CCC->scc_count++;

#ifdef FISH_DEBUG
    fprintf(stderr, "Sending Fish to %lx\n", destPE);
#endif
    SendOpV(PP_FISH, destPE, 4,
            (W_) origPE, (W_) age, (W_) history, (W_) hunger);

    if (origPE == mytid)
        fishing = rtsTrue;

    CCC = Save_CCC;
}
\end{code}

@unpackFish@ unpacks a FISH message into the global task id of the
originating PE and three data fields: the age, history and hunger of the
fish.  The history and hunger are not currently used.

\begin{code}
static void
unpackFish(GLOBAL_TASK_ID *origPE, int *age, int *history, int *hunger)
{
    long buf[4];

    GetArgs(buf, 4);

    *origPE = (GLOBAL_TASK_ID) buf[0];
    *age = (int) buf[1];
    *history = (int) buf[2];
    *hunger = (int) buf[3];
}
\end{code}

@sendFree@ sends (weight, slot) pairs for GAs that we no longer need
references to.

\begin{code}
void
sendFree(pe, nelem, data)
GLOBAL_TASK_ID pe;
int nelem;
P_ data;
{
    CostCentre Save_CCC = CCC;

    CCC = (CostCentre) STATIC_CC_REF(CC_MSG);
    CCC->scc_count++;

#ifdef FREE_DEBUG
    fprintf(stderr, "Sending Free (%d GAs) to %x\n", nelem / 2, pe);
#endif
    SendOpN(PP_FREE, pe, nelem, data);

    CCC = Save_CCC;
}
\end{code}

@unpackFree@ unpacks a FREE message into the amount of data shipped and a
data block.

\begin{code}
static void
unpackFree(int *nelem, W_ *data)
{
    long buf[1];

    GetArgs(buf, 1);
    *nelem = (int) buf[0];
    GetArgs(data, *nelem);
}
\end{code}

@sendSchedule@ sends a closure to be evaluated in response to a Fish
message.  The message is directed to the PE that originated the Fish
(@origPE@), and includes the packed closure (@data@) along with its size
(@nelem@).

\begin{code}
void
sendSchedule(origPE, nelem, data)
GLOBAL_TASK_ID origPE;
int nelem;
P_ data;
{
    CostCentre Save_CCC = CCC;

    CCC = (CostCentre) STATIC_CC_REF(CC_MSG);
    CCC->scc_count++;

#ifdef SCHEDULE_DEBUG
    PrintPacket(data);
    fprintf(stderr, "Sending Schedule to %x\n", origPE);
#endif
    SendOpN(PP_SCHEDULE, origPE, nelem, data);

    CCC = Save_CCC;
}
\end{code}

@unpackSchedule@ unpacks a SCHEDULE message into the amount of data shipped
(@nelem@) and the data block (@data@).

\begin{code}
static void
unpackSchedule(int *nelem, W_ *data)
{
    long buf[1];

    GetArgs(buf, 1);
    *nelem = (int) buf[0];
    GetArgs(data, *nelem);
}
\end{code}

\section{Message-Processing Functions}

The following routines process incoming GUM messages, often reissuing
messages in response.
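As an overview before the individual handlers: an idle PE launches a FISH,
which wanders from PE to PE until it either finds a spark (and provokes a
SCHEDULE back to the origin) or exceeds its life expectancy (and is sent
home to die, after which the origin may try again later).  The toy
simulation below is not RTS code; all of its names and constants are
invented, and it only illustrates the wandering.

\begin{verbatim}
#include <stdio.h>
#include <stdlib.h>

#define N_PES      8            /* illustrative numbers only           */
#define FISH_LIFE  10           /* cf. FISH_LIFE_EXPECTANCY in the RTS */

/* Does PE `pe' currently have a spark to give away?  Random for the demo. */
static int hasSpark(int pe)
{
    (void) pe;                  /* a real PE would consult its spark pool */
    return rand() % 4 == 0;
}

/* A fish starting from `origin' hops to random PEs until it finds work or
 * grows too old; returns the PE that will send a SCHEDULE, or -1 if the
 * fish is sent home to die.
 */
static int fish(int origin)
{
    int age, pe = origin;

    for (age = 0; age < FISH_LIFE; age++) {
        pe = (pe + 1 + rand() % (N_PES - 1)) % N_PES;  /* never fish ourselves */
        if (hasSpark(pe))
            return pe;          /* this PE packs a nearby graph and SCHEDULEs it */
    }
    return -1;                  /* too old: returned to origin, which may retry */
}

int main(void)
{
    int source = fish(0);

    if (source >= 0)
        printf("PE 0 will receive a SCHEDULE from PE %d\n", source);
    else
        printf("fish died of old age; PE 0 stays idle for now\n");
    return 0;
}
\end{verbatim}

The real decision, including the attempt to pack and ship a nearby graph,
is made in @processFish@ below.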
@processFish@ unpacks a fish message, reissuing it if it's our own, sending
work if we have it or sending it onwards otherwise.

\begin{code}
static void
processFish(STG_NO_ARGS)
{
    GLOBAL_TASK_ID origPE;
    int age, history, hunger;

    unpackFish(&origPE, &age, &history, &hunger);

    if (origPE == mytid) {
        fishing = rtsFalse;
    } else {
        P_ spark;

        while ((spark = FindLocalSpark(rtsTrue)) != NULL) {
            W_ size;
            P_ graph;

            if ((graph = PackNearbyGraph(spark, &size)) == NULL) {
                ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
                SAVE_Hp -= PACK_HEAP_REQUIRED;
                /* Now go back and try again */
            } else {
                sendSchedule(origPE, size, graph);
                DisposeSpark(spark);
                break;
            }
        }

        if (spark == NULL) {
            /* We have no sparks to give */
            if (age < FISH_LIFE_EXPECTANCY)
                sendFish(choosePE(), origPE, (age + 1),
                         NEW_FISH_HISTORY, NEW_FISH_HUNGER);
            /* Send it home to die */
            else
                sendFish(origPE, origPE, (age + 1),
                         NEW_FISH_HISTORY, NEW_FISH_HUNGER);
        }
    }
}                               /* processFish */
\end{code}

@processFetch@ either returns the requested data (if available) or blocks
the remote blocking queue on a black hole (if not).

\begin{code}
static void
processFetch(STG_NO_ARGS)
{
    globalAddr ga, rga;
    int load;
    P_ closure;
    P_ ip;

    unpackFetch(&ga, &rga, &load);
#ifdef FETCH_DEBUG
    fprintf(stderr, "Rcvd Fetch for (%x, %d, 0), Resume (%x, %d, %x) (load %d)\n",
            ga.loc.gc.gtid, ga.loc.gc.slot,
            rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, load);
#endif

    closure = GALAlookup(&ga);
    ip = (P_) INFO_PTR(closure);

    if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
        /* Forward the Fetch to someone else */
        sendFetch(FETCHME_GA(closure), &rga, load);
    } else if (rga.loc.gc.gtid == mytid) {
        /* Our own FETCH forwarded back around to us */
        P_ fmbq = GALAlookup(&rga);

        /* We may have already discovered that the fetch target is our own. */
        if (fmbq != closure)
            CommonUp(fmbq, closure);
        (void) addWeight(&rga);
    } else if (IS_BLACK_HOLE(ip)) {
        /* This includes RBH's and FMBQ's */
        P_ bf;

        if ((bf = AllocateHeap(FIXED_HS + BF_CLOSURE_SIZE(dummy))) == NULL) {
            ReallyPerformThreadGC(FIXED_HS + BF_CLOSURE_SIZE(dummy), rtsFalse);
            closure = GALAlookup(&ga);
            bf = SAVE_Hp - (FIXED_HS + BF_CLOSURE_SIZE(dummy)) + 1;
        }
        ASSERT(GALAlookup(&rga) == NULL);

        SET_BF_HDR(bf, BF_info, bogosity);
        BF_NODE(bf) = closure;
        BF_GTID(bf) = (W_) rga.loc.gc.gtid;
        BF_SLOT(bf) = (W_) rga.loc.gc.slot;
        BF_WEIGHT(bf) = (W_) rga.weight;
        blockFetch(bf, closure);

#ifdef FETCH_DEBUG
        fprintf(stderr, "Blocking Fetch (%x, %d, %x) on %#lx\n",
                rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, closure);
#endif
    } else {
        /* The target of the FetchMe is some local graph */
        W_ size;
        P_ graph;

        if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
            ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
            SAVE_Hp -= PACK_HEAP_REQUIRED;
            closure = GALAlookup(&ga);
            graph = PackNearbyGraph(closure, &size);
            ASSERT(graph != NULL);
        }
        sendResume(&rga, size, graph);
    }
}
\end{code}

@processFree@ unpacks a FREE message and adds the weights to our GAs.

\begin{code}
static void
processFree(STG_NO_ARGS)
{
    int nelem;
    static W_ *freeBuffer;
    int i;
    globalAddr ga;

    freeBuffer = gumPackBuffer;

    unpackFree(&nelem, freeBuffer);
#ifdef FREE_DEBUG
    fprintf(stderr, "Rcvd Free (%d GAs)\n", nelem / 2);
#endif

    ga.loc.gc.gtid = mytid;
    for (i = 0; i < nelem;) {
        ga.weight = (unsigned) freeBuffer[i++];
        ga.loc.gc.slot = (int) freeBuffer[i++];
#ifdef FREE_DEBUG
        fprintf(stderr, "Processing free (%x, %d, %x)\n",
                ga.loc.gc.gtid, ga.loc.gc.slot, ga.weight);
#endif
        (void) addWeight(&ga);
    }
}
\end{code}

@processResume@ unpacks a RESUME message into the graph, filling in the
LA -> GA and GA -> LA tables.
Threads blocked on the original @FetchMe@ (now a blocking queue) are
awakened, and the blocking queue is converted into an indirection.  Finally
it sends an ACK in response which contains any newly allocated GAs.

\begin{code}
static void
processResume(GLOBAL_TASK_ID sender)
{
    int nelem;
    W_ nGAs;
    static W_ *packBuffer;
    P_ newGraph;
    P_ old;
    globalAddr lga;
    globalAddr *gagamap;

    packBuffer = gumPackBuffer;
    unpackResume(&lga, &nelem, packBuffer);

#ifdef RESUME_DEBUG
    fprintf(stderr, "Rcvd Resume for (%x, %d, %x)\n",
            lga.loc.gc.gtid, lga.loc.gc.slot, lga.weight);
    PrintPacket(packBuffer);
#endif

    /*
     * We always unpack the incoming graph, even if we've received the
     * requested node in some other data packet (and already awakened the
     * blocking queue).
     */
    if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
        ReallyPerformThreadGC(packBuffer[0], rtsFalse);
        SAVE_Hp -= packBuffer[0];
    }

    /* Do this *after* GC; we don't want to release the object early! */
    if (lga.weight > 0)
        (void) addWeight(&lga);

    old = GALAlookup(&lga);

    if (RTSflags.ParFlags.granSimStats) {
        P_ tso = NULL;

        if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE) {
            for (tso = (P_) FMBQ_ENTRIES(old);
                 TSO_LINK(tso) != Nil_closure;
                 tso = TSO_LINK(tso))
                ;
        }
        DumpGranEventAndNode(GR_REPLY, tso, old, taskIDtoPE(sender));
    }

    newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
    ASSERT(newGraph != NULL);

    /*
     * Sometimes, unpacking will common up the resumee with the incoming
     * graph, but if it hasn't, we'd better do so now.
     */
    if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE)
        CommonUp(old, newGraph);

#ifdef RESUME_DEBUG
    DebugPrintGAGAMap(gagamap, nGAs);
#endif

    sendAck(sender, nGAs, gagamap);
}
\end{code}

@processSchedule@ unpacks a SCHEDULE message into the graph, filling in the
LA -> GA and GA -> LA tables.  The root of the graph is added to the local
spark queue.  Finally it sends an ACK in response which contains any newly
allocated GAs.

\begin{code}
static void
processSchedule(GLOBAL_TASK_ID sender)
{
    int nelem;
    int space_required;
    rtsBool success;
    static W_ *packBuffer;
    W_ nGAs;
    P_ newGraph;
    globalAddr *gagamap;

    packBuffer = gumPackBuffer;	/* HWL */
    unpackSchedule(&nelem, packBuffer);

#ifdef SCHEDULE_DEBUG
    fprintf(stderr, "Rcvd Schedule\n");
    PrintPacket(packBuffer);
#endif

    /*
     * For now, the graph is a closure to be sparked as an advisory spark,
     * but in future it may be a complete spark with required/advisory
     * status, priority etc.
     */
    space_required = packBuffer[0];
    if (SAVE_Hp + space_required >= SAVE_HpLim) {
        ReallyPerformThreadGC(space_required, rtsFalse);
        SAVE_Hp -= space_required;
    }
    newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
    ASSERT(newGraph != NULL);
    success = Spark(newGraph, rtsFalse);
    ASSERT(success);

#ifdef SCHEDULE_DEBUG
    DebugPrintGAGAMap(gagamap, nGAs);
#endif

    if (nGAs > 0)
        sendAck(sender, nGAs, gagamap);

    fishing = rtsFalse;
}
\end{code}

@processAck@ unpacks an ACK, and uses the GAGA map to convert RBH's (which
represent shared thunks that have been shipped) into fetch-mes to remote
GAs.

\begin{code}
static void
processAck(STG_NO_ARGS)
{
    int nGAs;
    globalAddr *gaga;
    globalAddr gagamap[MAX_GAS * 2];

    unpackAck(&nGAs, gagamap);

#ifdef ACK_DEBUG
    fprintf(stderr, "Rcvd Ack (%d pairs)\n", nGAs);
    DebugPrintGAGAMap(gagamap, nGAs);
#endif

    /*
     * For each (oldGA, newGA) pair, set the GA of the corresponding thunk
     * to the newGA, convert the thunk to a FetchMe, and return the weight
     * from the oldGA.
     */
    for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
        P_ old = GALAlookup(gaga);
        P_ new = GALAlookup(gaga + 1);

        if (new == NULL) {
            /* We don't have this closure, so we make a fetchme for it */
            globalAddr *ga = setRemoteGA(old, gaga + 1, rtsTrue);

            convertToFetchMe(old, ga);
        } else {
            /*
             * Oops...we've got this one already; update the RBH to point to
             * the object we already know about, whatever it happens to be.
             */
            CommonUp(old, new);

            /*
             * Increase the weight of the object by the amount just received
             * in the second part of the ACK pair.
             */
            (void) addWeight(gaga + 1);
        }
        (void) addWeight(gaga);
    }
}
\end{code}

\section{GUM Message Processor}

@processMessages@ processes any messages that have arrived, calling
appropriate routines depending on the message tag (opcode).  N.B. Unless
profiling it assumes that there {\em ARE} messages present and performs a
blocking receive!  During profiling it busy-waits in order to record idle
time.

\begin{code}
void
processMessages(STG_NO_ARGS)
{
    PACKET packet;
    OPCODE opcode;
    CostCentre Save_CCC;

    /* Temporary Test Definitions */
    GLOBAL_TASK_ID task;

    Save_CCC = CCC;
    CCC = (CostCentre) STATIC_CC_REF(CC_MSG);

    do {
        if (RTSflags.CcFlags.doCostCentres) {
            CCC = (CostCentre) STATIC_CC_REF(CC_IDLE);
            CCC->scc_count++;

            while (!PacketsWaiting())
                /* busy-wait loop */ ;

            CCC = (CostCentre) STATIC_CC_REF(CC_MSG);
        }

        packet = GetPacket();   /* Get next message; block until one available */
        CCC->scc_count++;

        get_opcode_and_sender(packet, &opcode, &task);

        switch (opcode) {
        case PP_FINISH:
            /* The computation has been completed by someone else */
            EXIT(EXIT_SUCCESS);
            break;

        case PP_FETCH:    processFetch();          break;
        case PP_RESUME:   processResume(task);     break;
        case PP_ACK:      processAck();            break;
        case PP_FISH:     processFish();           break;
        case PP_FREE:     processFree();           break;
        case PP_SCHEDULE: processSchedule(task);   break;

        default:
            /* Anything we're not prepared to deal with. */
            fprintf(stderr, "Task %x: Unexpected opcode %x from %x\n",
                    mytid, opcode, task);

            EXIT(EXIT_FAILURE);
        }                       /* switch */

    } while (PacketsWaiting()); /* While there are messages: process them */

    CCC = Save_CCC;
}                               /* processMessages */
\end{code}

\section{Exception Handlers}

@Comms_Harness_Exception@ is an exception handler that tests out the
different GUM messages.
\begin{code}
void
Comms_Harness_Exception(packet)
PACKET packet;
{
    int i, load;
    globalAddr ga, bqga;
/*  GLOBAL_TASK_ID sender = Sender_Task(packet); */
    OPCODE opcode = Opcode(packet);
    GLOBAL_TASK_ID task;

/*  fprintf(stderr, "STG_Exception: Received %s (%x), sender %x\n",
            GetOpName(opcode), opcode, sender); */

    switch (opcode) {

    case PP_FINISH:
        EXIT(EXIT_SUCCESS);
        break;

    case PP_FETCH:
        {
            W_ data[11];

            get_opcode_and_sender(packet, &opcode, &task);
            fprintf(stderr, "Task %x: Got Fetch from %x\n", mytid, task);

            unpackFetch(&ga, &bqga, &load);
            fprintf(stderr, "In PE, Fetch = (%x, %d, %x) (%x, %d, %x) %d\n",
                    ga.loc.gc.gtid, ga.loc.gc.slot, ga.weight,
                    bqga.loc.gc.gtid, bqga.loc.gc.slot, bqga.weight, load);

            /* Send Resume in response */
            for (i = 0; i <= 10; ++i)
                data[i] = i;

            sendResume(&bqga, 11, data);
        }
        break;

    case PP_ACK:
        {
            int nGAs;
            globalAddr gagamap[MAX_GAS * 2];

            get_opcode_and_sender(packet, &opcode, &task);
            fprintf(stderr, "Task %x: Got Ack from %x\n", mytid, task);

            unpackAck(&nGAs, gagamap);
#ifdef DEBUG
            DebugPrintGAGAMap(gagamap, nGAs);
#endif
        }
        break;

    case PP_FISH:
        {
            GLOBAL_TASK_ID origPE;
            int age, history, hunger;
            globalAddr testGA;
            StgWord testData[6];

            get_opcode_and_sender(packet, &opcode, &task);
            fprintf(stderr, "Task %x: Got FISH from %x\n", mytid, task);

            unpackFish(&origPE, &age, &history, &hunger);
            fprintf(stderr,
                    "In PE, FISH.origPE = %x age = %d history = %d hunger = %d\n",
                    origPE, age, history, hunger);

            testGA.loc.gc.gtid = mytid;
            testGA.loc.gc.slot = 52;
            testGA.weight = 1024;

            for (i = 0; i <= 5; ++i)
                testData[i] = 40 + i;

            sendSchedule(origPE, 6, testData);
        }
        break;

    case PP_SCHEDULE:
        {
            /* Test variables */
            int nelem;
            W_ testData[6];

            get_opcode_and_sender(packet, &opcode, &task);
            fprintf(stderr, "Task %x: Got SCHEDULE from %x\n", mytid, task);

            unpackSchedule(&nelem, testData);
            fprintf(stderr, "In PE, nelem = %d\n", nelem);
            for (i = 0; i <= 5; ++i)
                fprintf(stderr, "tData[%d] = %d ", i, (int) testData[i]);
            fprintf(stderr, "\n");
        }
        break;

        /*
         * Anything we're not prepared to deal with.  Note that ALL opcodes
         * are discarded during termination -- this helps prevent bizarre
         * race conditions.
         */
    default:
        if (!GlobalStopPending) {
            GLOBAL_TASK_ID ErrorTask;
            int opcode;

            get_opcode_and_sender(packet, &opcode, &ErrorTask);
            fprintf(stderr,
                    "Task %x: Unexpected opcode %x from %x in Comms Harness\n",
                    mytid, opcode, ErrorTask);

            PEShutDown();

            EXIT(EXIT_FAILURE);
        }
    }
}
\end{code}

@STG_Exception@ handles real communication exceptions.

\begin{code}
void
STG_Exception(packet)
PACKET packet;
{
    GLOBAL_TASK_ID sender = Sender_Task(packet);
    OPCODE opcode = Opcode(packet);

    fprintf(stderr, "STG_Exception: Received %s (%x), sender %x\n",
            GetOpName(opcode), opcode, sender);

    switch (opcode) {

    case PP_FINISH:
        EXIT(EXIT_SUCCESS);
        break;

        /*
         * Anything we're not prepared to deal with.  Note that ALL opcodes
         * are discarded during termination -- this helps prevent bizarre
         * race conditions.
         */
    default:
        if (!GlobalStopPending) {
            GLOBAL_TASK_ID ErrorTask;
            int opcode;

            get_opcode_and_sender(packet, &opcode, &ErrorTask);
            fprintf(stderr,
                    "Task %x: Unexpected opcode %x from %x in STG_Exception\n",
                    mytid, opcode, ErrorTask);

            EXIT(EXIT_FAILURE);
        }
    }
}
\end{code}

\section{Miscellaneous Functions}

@choosePE@ selects a GlobalTaskId from the array of PEs 'at random'.
Important properties:

  o it varies during execution, even if the PE is idle
  o it's different for each PE
  o we never send a fish to ourselves

\begin{code}
extern long lrand48 (STG_NO_ARGS);

GLOBAL_TASK_ID
choosePE(STG_NO_ARGS)
{
    long temp;

    temp = lrand48() % nPEs;
    if (PEs[temp] == mytid) {   /* Never send a FISH to yourself */
        temp = (temp + 1) % nPEs;
    }
    return PEs[temp];
}
\end{code}

@WaitForTermination@ enters an infinite loop waiting for the termination
sequence to be completed.

\begin{code}
void
WaitForTermination(STG_NO_ARGS)
{
    do {
        PACKET p = GetPacket();

        HandleException(p);
    } while (rtsTrue);
}
\end{code}

\begin{code}
#ifdef DEBUG
void
DebugPrintGAGAMap(gagamap, nGAs)
globalAddr *gagamap;
int nGAs;
{
    int i;

    for (i = 0; i < nGAs; ++i, gagamap += 2)
        fprintf(stderr, "gagamap[%d] = (%x, %d, %x) -> (%x, %d, %x)\n", i,
                gagamap[0].loc.gc.gtid, gagamap[0].loc.gc.slot, gagamap[0].weight,
                gagamap[1].loc.gc.gtid, gagamap[1].loc.gc.slot, gagamap[1].weight);
}
#endif
\end{code}

\begin{code}
static PP_ freeMsgBuffer = NULL;
static int *freeMsgIndex = NULL;

void
prepareFreeMsgBuffers(STG_NO_ARGS)
{
    int i;

    /* Allocate the freeMsg buffers just once and then hang onto them. */
    if (freeMsgIndex == NULL) {
        freeMsgIndex = (int *) stgMallocBytes(nPEs * sizeof(int),
                                              "prepareFreeMsgBuffers (Index)");
        freeMsgBuffer = (PP_) stgMallocBytes(nPEs * sizeof(long *),
                                             "prepareFreeMsgBuffers (Buffer)");

        for (i = 0; i < nPEs; i++) {
            if (i != thisPE) {
                freeMsgBuffer[i] =
                    (P_) stgMallocWords(RTSflags.ParFlags.packBufferSize,
                                        "prepareFreeMsgBuffers (Buffer #i)");
            }
        }
    }

    /* Initialize the freeMsg buffer pointers to point to the start of their
       buffers */
    for (i = 0; i < nPEs; i++)
        freeMsgIndex[i] = 0;
}

void
freeRemoteGA(pe, ga)
int pe;
globalAddr *ga;
{
    int i;

    ASSERT(GALAlookup(ga) == NULL);

    if ((i = freeMsgIndex[pe]) + 2 >= RTSflags.ParFlags.packBufferSize) {
#ifdef FREE_DEBUG
        fprintf(stderr, "Filled a free message buffer\n");
#endif
        sendFree(ga->loc.gc.gtid, i, freeMsgBuffer[pe]);
        i = 0;
    }
    freeMsgBuffer[pe][i++] = (W_) ga->weight;
    freeMsgBuffer[pe][i++] = (W_) ga->loc.gc.slot;
    freeMsgIndex[pe] = i;

#ifdef DEBUG
    ga->weight = 0x0f0f0f0f;
    ga->loc.gc.gtid = 0x666;
    ga->loc.gc.slot = 0xdeaddead;
#endif
}

void
sendFreeMessages(STG_NO_ARGS)
{
    int i;

    for (i = 0; i < nPEs; i++) {
        if (freeMsgIndex[i] > 0)
            sendFree(PEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
    }
}

#endif /* PAR -- whole file */
\end{code}