1 /* --------------------------------------------------------------------------
2 Time-stamp: <Wed Mar 21 2001 16:37:16 Stardate: [-30]6363.46 hwloidl>
4 Initialising the parallel RTS
6 An extension based on Kevin Hammond's GRAPH for PVM version
7 P. Trinder, January 17th 1995.
8 Adapted for the new RTS
10 H-W. Loidl, November 1999.
12 ------------------------------------------------------------------------ */
14 #ifdef PAR /* whole file */
18 //* Global variables::
19 //* Initialisation Routines::
22 //@node Includes, Global variables
23 //@subsection Includes
25 /* Evidently not Posix */
26 /* #include "PosixSource.h" */
32 #include "ParallelRts.h"
37 //@node Global variables, Initialisation Routines, Includes
38 //@subsection Global variables
40 /* Global conditions defined here. */
42 rtsBool IAmMainThread = rtsFalse; /* Set for the main thread */
44 /* Task identifiers for various interesting global tasks. */
46 GlobalTaskId IOTask = 0, /* The IO Task Id */
47 SysManTask = 0, /* The System Manager Task Id */
48 mytid = 0; /* This PE's Task Id */
50 rtsTime main_start_time; /* When the program started */
51 rtsTime main_stop_time; /* When the program finished */
52 jmp_buf exit_parallel_system; /* How to abort from the RTS */
55 //rtsBool fishing = rtsFalse; /* We have no fish out in the stream */
56 rtsTime last_fish_arrived_at = 0; /* Time of arrival of most recent fish*/
57 nat outstandingFishes = 0; /* Number of active fishes */
60 /* GranSim: a globally visible array of spark queues */
61 rtsSpark *pending_sparks_hd[SPARK_POOLS], /* ptr to start of a spark pool */
62 *pending_sparks_tl[SPARK_POOLS], /* ptr to end of a spark pool */
63 *pending_sparks_lim[SPARK_POOLS],
64 *pending_sparks_base[SPARK_POOLS];
67 /* max number of sparks permitted on the PE;
68 see RtsFlags.ParFlags.maxLocalSparks */
69 nat spark_limit[SPARK_POOLS];
71 //@cindex PendingFetches
72 /* A list of fetch reply messages not yet processed; this list is filled
73 by awaken_blocked_queue and processed by processFetches */
74 StgBlockedFetch *PendingFetches = END_BF_QUEUE;
82 //@cindex sparksIgnored
83 nat sparksIgnored = 0, sparksCreated = 0,
84 threadsIgnored = 0, threadsCreated = 0;
86 //@cindex advisory_thread_count
87 nat advisory_thread_count = 0;
89 globalAddr theGlobalFromGA;
91 /* For flag handling see RtsFlags.h */
94 //@subsection Prototypes
96 /* Needed for FISH messages (initialisation of random number generator) */
98 time_t time (time_t *);
100 //@node Initialisation Routines, , Global variables
101 //@subsection Initialisation Routines
/*
  par_exit defines how to terminate the program.  If the exit code is
  non-zero (i.e. an error has occurred), the PE should not halt until
  outstanding error messages have been processed.  Otherwise, messages
  might be sent to non-existent Task Ids.  The infinite loop will actually
  terminate, since STG_Exception will call myexit(0) when
  it receives a PP_FINISH from the system manager task.
*/
111 //@cindex shutdownParallelSystem
113 shutdownParallelSystem(StgInt n)
115 /* use the file specified via -S */
116 FILE *sf = RtsFlags.GcFlags.statsFile;
118 IF_PAR_DEBUG(verbose,
120 belch("==== entered shutdownParallelSystem ...");
122 belch("==== entered shutdownParallelSystem (ERROR %d)...", n);
129 fprintf(sf, "PE %x: %u sparks created, %u sparks Ignored, %u threads created, %u threads Ignored",
130 (W_) mytid, sparksCreated, sparksIgnored,
131 threadsCreated, threadsIgnored);
134 ShutdownEachPEHook();
137 //@cindex initParallelSystem
139 initParallelSystem(void)
141 /* Don't buffer standard channels... */
145 srand48(time(NULL) * getpid()); /* Initialise Random-number generator seed*/
146 /* used to select target of FISH message*/
147 if (!InitPackBuffer())
148 barf("InitPackBuffer");
150 if (!initMoreBuffers())
151 barf("initMoreBuffers");
153 if (!initSparkPools())
154 barf("initSparkPools");
/*
 * synchroniseSystem synchronises the reduction task with the system
 * manager, and initialises the Global address tables (LAGA & GALA).
 */
162 //@cindex synchroniseSystem
164 synchroniseSystem(void)
166 /* Only in debug mode? */
167 fprintf(stderr, "==== Starting parallel execution on %d processors ...\n", nPEs);
169 InitEachPEHook(); /* HWL: hook to be execed on each PE */
171 /* Initialize global address tables */
174 initParallelSystem();
180 Do the startup stuff (this is PVM specific!).
181 Determines global vars: mytid, IAmMainThread, SysManTask, nPEs
182 Called at the beginning of RtsStartup.startupHaskell
185 startupParallelSystem(char *argv[]) {
186 mytid = pvm_mytid(); /* Connect to PVM */
188 if (*argv[0] == '-') { /* Look to see whether we're the Main Thread */
189 IAmMainThread = rtsTrue;
190 sscanf(argv[0],"-%0X",&SysManTask); /* extract SysMan task ID*/
191 argv++; /* Strip off flag argument */
193 SysManTask = pvm_parent();
196 IF_PAR_DEBUG(verbose,
197 fprintf(stderr, "==== [%x] %s PE located SysMan at %x\n",
198 mytid, IAmMainThread?"Main":"Remote", SysManTask));
200 nPEs = atoi(argv[1]);
/* Exception handler for unexpected messages during startup. */
207 processUnexpectedMessageDuringStartup(rtsPacket p) {
209 GlobalTaskId sender_id;
211 getOpcodeAndSender(p, &opCode, &sender_id);
223 stg_exit(EXIT_SUCCESS);
226 fprintf(stderr,"== Task %x: Unexpected OpCode %x (%s) from %x in startPEComms\n",
227 mytid, opCode, getOpName(opCode), sender_id);
235 allPEs = (GlobalTaskId *) stgMallocBytes(sizeof(GlobalTaskId) * MAX_PES,
238 /* Send our tid and IAmMainThread flag back to SysMan */
239 sendOp1(PP_READY, SysManTask, (StgWord)IAmMainThread);
240 /* Wait until we get the PE-Id table from Sysman */
241 waitForPEOp(PP_PETIDS, SysManTask, processUnexpectedMessageDuringStartup);
243 IF_PAR_DEBUG(verbose,
244 belch("==-- startPEComms: methinks we just received a PP_PETIDS message"));
246 /* Digest the PE table we received */
251 processPEtids(void) {
253 nat i, sentPEs, currentPEs;
259 IF_PAR_DEBUG(verbose,
260 belch("==-- processPEtids: starting to iterate over a PVM buffer"));
261 /* ToDo: this has to go into LLComms !!! */
264 ASSERT(sentPEs > currentPEs);
265 ASSERT(sentPEs < MAX_PES); /* enforced by SysMan too*/
267 for (i = 0; i < sentPEs; i++) {
270 ASSERT(newPE == allPEs[i]);
273 // breaks with PAR && !DEBUG
274 IF_PAR_DEBUG(verbose,
275 fprintf(stderr, "[%x] registering %d'th %x\n", mytid, i, newPE));
276 if(!looks_like_tid(newPE))
277 barf("unacceptable taskID %x\n",newPE);
285 IF_PAR_DEBUG(verbose,
287 belch("++++ [%x] PE table as I see it:", mytid);
288 for (i = 0; i < sentPEs; i++) {
289 belch("++++ allPEs[%d] = %x", i, allPEs[i]);
294 stopPEComms(StgInt n) {
296 /* In case sysman doesn't know about us yet...
297 pvm_initsend(PvmDataDefault);
298 PutArgs(&IAmMainThread,1);
299 pvm_send(SysManTask, PP_READY);
301 sendOp(PP_READY, SysManTask);
304 sendOp2(PP_FINISH, SysManTask, n, n);
305 waitForPEOp(PP_FINISH, SysManTask, NULL);
310 #endif /* PAR -- whole file */
313 //* PendingFetches:: @cindex\s-+PendingFetches
314 //* SynchroniseSystem:: @cindex\s-+SynchroniseSystem
315 //* allPEs:: @cindex\s-+allPEs
316 //* initParallelSystem:: @cindex\s-+initParallelSystem
317 //* nPEs:: @cindex\s-+nPEs
318 //* par_exit:: @cindex\s-+par_exit
319 //* spark queue:: @cindex\s-+spark queue
320 //* sparksIgnored:: @cindex\s-+sparksIgnored