1 /* --------------------------------------------------------------------------
2 Time-stamp: <Wed Mar 21 2001 16:37:16 Stardate: [-30]6363.46 hwloidl>
3 $Id: ParInit.c,v 1.5 2001/08/14 13:40:10 sewardj Exp $
5 Initialising the parallel RTS
7 An extension based on Kevin Hammond's GRAPH for PVM version
8 P. Trinder, January 17th 1995.
9 Adapted for the new RTS
10 P. Trinder, July 1997.
11 H-W. Loidl, November 1999.
13 ------------------------------------------------------------------------ */
15 #ifdef PAR /* whole file */
19 //* Global variables::
20 //* Initialisation Routines::
23 //@node Includes, Global variables
24 //@subsection Includes
26 /* Evidently not Posix */
27 /* #include "PosixSource.h" */
33 #include "ParallelRts.h"
38 //@node Global variables, Initialisation Routines, Includes
39 //@subsection Global variables
41 /* Global conditions defined here. */
/* Per-PE global state of the parallel RTS (this file is only compiled under PAR).
   NOTE(review): these are the definitions; presumably ParallelRts.h provides the
   matching extern declarations -- confirm. */
43 rtsBool IAmMainThread = rtsFalse; /* Set for the main thread */
45 /* Task identifiers for various interesting global tasks. */
47 GlobalTaskId IOTask = 0, /* The IO Task Id */
48 SysManTask = 0, /* The System Manager Task Id */
49 mytid = 0; /* This PE's Task Id */
51 rtsTime main_start_time; /* When the program started */
52 rtsTime main_stop_time; /* When the program finished */
53 jmp_buf exit_parallel_system; /* How to abort from the RTS */
56 //rtsBool fishing = rtsFalse; /* We have no fish out in the stream */
57 rtsTime last_fish_arrived_at = 0; /* Time of arrival of most recent fish*/
58 nat outstandingFishes = 0; /* Number of active fishes */
61 /* GranSim: a globally visible array of spark queues */
62 rtsSpark *pending_sparks_hd[SPARK_POOLS], /* ptr to start of a spark pool */
63 *pending_sparks_tl[SPARK_POOLS], /* ptr to end of a spark pool */
/* NOTE(review): the roles of pending_sparks_lim and pending_sparks_base are not
   documented here; presumably each pool's capacity limit and the base of its
   storage -- confirm against initSparkPools(). */
64 *pending_sparks_lim[SPARK_POOLS],
65 *pending_sparks_base[SPARK_POOLS];
68 /* max number of sparks permitted on the PE;
69 see RtsFlags.ParFlags.maxLocalSparks */
70 nat spark_limit[SPARK_POOLS];
72 //@cindex PendingFetches
73 /* A list of fetch reply messages not yet processed; this list is filled
74 by awaken_blocked_queue and processed by processFetches */
75 StgBlockedFetch *PendingFetches = END_BF_QUEUE;
83 //@cindex sparksIgnored
/* Per-PE spark/thread counters; shutdownParallelSystem() writes them to the
   statistics file selected with -S. */
84 nat sparksIgnored = 0, sparksCreated = 0,
85 threadsIgnored = 0, threadsCreated = 0;
87 //@cindex advisory_thread_count
88 nat advisory_thread_count = 0;
90 globalAddr theGlobalFromGA;
92 /* For flag handling see RtsFlags.h */
95 //@subsection Prototypes
97 /* Needed for FISH messages (initialisation of random number generator) */
/* NOTE(review): hand-written prototype for time(); including <time.h> instead
   would keep it in sync with the C library's own declaration. */
99 time_t time (time_t *);
101 //@node Initialisation Routines, , Global variables
102 //@subsection Initialisation Routines
105 shutdownParallelSystem (called par_exit in older versions) defines how to terminate the program. If the exit code is
106 non-zero (i.e. an error has occurred), the PE should not halt until
107 outstanding error messages have been processed; otherwise, messages
108 might be sent to non-existent task IDs. The infinite loop will actually
109 terminate, since STG_Exception will call myexit(0) when
110 it receives a PP_FINISH from the system manager task.
112 //@cindex shutdownParallelSystem
114 shutdownParallelSystem(StgInt n)
116 /* use the file specified via -S */
117 FILE *sf = RtsFlags.GcFlags.statsFile;
119 IF_PAR_DEBUG(verbose,
121 belch("==== entered shutdownParallelSystem ...");
123 belch("==== entered shutdownParallelSystem (ERROR %d)...", n);
130 fprintf(sf, "PE %x: %u sparks created, %u sparks Ignored, %u threads created, %u threads Ignored",
131 (W_) mytid, sparksCreated, sparksIgnored,
132 threadsCreated, threadsIgnored);
135 ShutdownEachPEHook();
/*
 * initParallelSystem: one-time per-PE initialisation.
 * Seeds the drand48 generator (used to pick the target of FISH messages) and
 * initialises the pack buffer, the buffers set up by initMoreBuffers(), and the
 * spark pools, calling barf() with the failing routine's name if any of them
 * reports failure.
 */
138 //@cindex initParallelSystem
140 initParallelSystem(void)
142 /* Don't buffer standard channels... */
/* NOTE(review): time(NULL) * getpid() is signed arithmetic; where time_t/long
   are 32 bits the product can overflow (undefined behaviour). Consider mixing
   the two values with unsigned arithmetic or XOR instead. */
146 srand48(time(NULL) * getpid()); /* Initialise Random-number generator seed*/
147 /* used to select target of FISH message*/
148 if (!InitPackBuffer())
149 barf("InitPackBuffer");
151 if (!initMoreBuffers())
152 barf("initMoreBuffers");
154 if (!initSparkPools())
155 barf("initSparkPools");
159 * SynchroniseSystem synchronises the reduction task with the system
160 * manager, and initialises the Global address tables (LAGA & GALA)
/*
 * synchroniseSystem: prints a start-up banner with the number of PEs, runs the
 * per-PE InitEachPEHook(), and then calls initParallelSystem().
 * NOTE(review): the banner goes to stderr unconditionally on every PE; the
 * original "Only in debug mode?" question suggests wrapping it in
 * IF_PAR_DEBUG(verbose, ...). If nPEs is unsigned, %u would match its type --
 * confirm the declaration of nPEs.
 */
163 //@cindex synchroniseSystem
165 synchroniseSystem(void)
167 /* Only in debug mode? */
168 fprintf(stderr, "==== Starting parallel execution on %d processors ...\n", nPEs);
170 InitEachPEHook(); /* HWL: hook to be execed on each PE */
172 /* Initialize global address tables */
175 initParallelSystem();
181 Do the startup stuff (this is PVM specific!).
182 Determines global vars: mytid, IAmMainThread, SysManTask, nPEs
183 Called at the beginning of RtsStartup.startupHaskell
/*
 * startupParallelSystem: connects to PVM and determines mytid, IAmMainThread,
 * SysManTask and nPEs from the command line.
 * If argv[0] starts with '-', this PE is the main thread and the SysMan task
 * id is parsed (as hex) from that argument, which is then stripped; otherwise
 * the SysMan task is taken to be the PVM parent. nPEs is read from argv[1].
 * NOTE(review): neither the sscanf() result nor atoi() is checked, and argv[1]
 * is assumed to exist; malformed arguments silently leave SysManTask/nPEs
 * wrong. %X also expects an unsigned int *, so GlobalTaskId must have that
 * width -- confirm.
 */
186 startupParallelSystem(char *argv[]) {
187 mytid = pvm_mytid(); /* Connect to PVM */
189 if (*argv[0] == '-') { /* Look to see whether we're the Main Thread */
190 IAmMainThread = rtsTrue;
191 sscanf(argv[0],"-%X",&SysManTask); /* extract SysMan task ID; a zero field width ("%0X") is invalid in scanf formats (undefined behaviour) */
192 argv++; /* Strip off flag argument */
194 SysManTask = pvm_parent();
197 IF_PAR_DEBUG(verbose,
198 fprintf(stderr, "==== [%x] %s PE located SysMan at %x\n",
199 mytid, IAmMainThread?"Main":"Remote", SysManTask));
201 nPEs = atoi(argv[1]);
205 Exception handler during startup.
/*
 * processUnexpectedMessageDuringStartup: handler passed to waitForPEOp() while
 * startup waits for the PP_PETIDS table.
 * Decodes the opcode and sender of packet p. One path terminates via
 * stg_exit(EXIT_SUCCESS); the condition selecting it is not visible here
 * (presumably a PP_FINISH from SysMan -- confirm). Otherwise the unexpected
 * opcode and its sender are reported on stderr.
 */
208 processUnexpectedMessageDuringStartup(rtsPacket p) {
210 GlobalTaskId sender_id;
212 getOpcodeAndSender(p, &opCode, &sender_id);
224 stg_exit(EXIT_SUCCESS);
227 fprintf(stderr,"== Task %x: Unexpected OpCode %x (%s) from %x in startPEComms\n",
228 mytid, opCode, getOpName(opCode), sender_id);
/* startPEComms (body): allocates the allPEs table (room for MAX_PES task ids),
   reports PP_READY with the IAmMainThread flag to SysMan, and then blocks until
   SysMan's PP_PETIDS message arrives. Any other message received meanwhile is
   handed to processUnexpectedMessageDuringStartup. */
236 allPEs = (GlobalTaskId *) stgMallocBytes(sizeof(GlobalTaskId) * MAX_PES,
239 /* Send our tid and IAmMainThread flag back to SysMan */
240 sendOp1(PP_READY, SysManTask, (StgWord)IAmMainThread);
241 /* Wait until we get the PE-Id table from Sysman */
242 waitForPEOp(PP_PETIDS, SysManTask, processUnexpectedMessageDuringStartup);
244 IF_PAR_DEBUG(verbose,
245 belch("==-- startPEComms: methinks we just received a PP_PETIDS message"));
247 /* Digest the PE table we received */
/*
 * processPEtids: digests the PE task-id table received from SysMan.
 * Asserts that the table grew (sentPEs > currentPEs) and stays below MAX_PES,
 * then iterates over the sentPEs entries and calls barf() on any task id that
 * fails looks_like_tid(). In verbose debug mode it also dumps allPEs.
 * NOTE(review): the loop index i is a nat but is printed with %d below; if
 * nat is unsigned, %u would match its type -- confirm.
 */
252 processPEtids(void) {
254 nat i, sentPEs, currentPEs;
260 IF_PAR_DEBUG(verbose,
261 belch("==-- processPEtids: starting to iterate over a PVM buffer"));
262 /* ToDo: this has to go into LLComms !!! */
265 ASSERT(sentPEs > currentPEs);
266 ASSERT(sentPEs < MAX_PES); /* enforced by SysMan too*/
268 for (i = 0; i < sentPEs; i++) {
271 ASSERT(newPE == allPEs[i]);
274 // breaks with PAR && !DEBUG
275 IF_PAR_DEBUG(verbose,
276 fprintf(stderr, "[%x] registering %d'th %x\n", mytid, i, newPE));
277 if(!looks_like_tid(newPE))
278 barf("unacceptable taskID %x\n",newPE);
286 IF_PAR_DEBUG(verbose,
288 belch("++++ [%x] PE table as I see it:", mytid);
289 for (i = 0; i < sentPEs; i++) {
290 belch("++++ allPEs[%d] = %x", i, allPEs[i]);
/*
 * stopPEComms: sends PP_FINISH to SysManTask, carrying the exit code n in both
 * payload words, and then blocks in waitForPEOp() until SysManTask answers
 * with PP_FINISH. No handler is installed for other messages (NULL).
 * The block opening "In case sysman doesn't know about us yet" holds the older
 * raw-PVM way of sending PP_READY.
 */
295 stopPEComms(StgInt n) {
297 /* In case sysman doesn't know about us yet...
298 pvm_initsend(PvmDataDefault);
299 PutArgs(&IAmMainThread,1);
300 pvm_send(SysManTask, PP_READY);
302 sendOp(PP_READY, SysManTask);
305 sendOp2(PP_FINISH, SysManTask, n, n);
306 waitForPEOp(PP_FINISH, SysManTask, NULL);
311 #endif /* PAR -- whole file */
314 //* PendingFetches:: @cindex\s-+PendingFetches
315 //* SynchroniseSystem:: @cindex\s-+SynchroniseSystem
316 //* allPEs:: @cindex\s-+allPEs
317 //* initParallelSystem:: @cindex\s-+initParallelSystem
318 //* nPEs:: @cindex\s-+nPEs
319 //* par_exit:: @cindex\s-+par_exit
320 //* spark queue:: @cindex\s-+spark queue
321 //* sparksIgnored:: @cindex\s-+sparksIgnored