1 /* --------------------------------------------------------------------------
2 Time-stamp: <Fri Mar 24 2000 17:42:24 Stardate: [-30]4553.68 hwloidl>
3 $Id: ParInit.c,v 1.3 2000/03/31 03:09:37 hwloidl Exp $
5 Initialising the parallel RTS
7 An extension based on Kevin Hammond's GRAPH for PVM version
8 P. Trinder, January 17th 1995.
9 Adapted for the new RTS
10 P. Trinder, July 1997.
11 H-W. Loidl, November 1999.
13 ------------------------------------------------------------------------ */
15 #ifdef PAR /* whole file */
17 #define NON_POSIX_SOURCE /* so says Solaris */
21 //* Global variables::
22 //* Initialisation Routines::
25 //@node Includes, Global variables
26 //@subsection Includes
31 #include "ParallelRts.h"
36 //@node Global variables, Initialisation Routines, Includes
37 //@subsection Global variables
39 /* Global conditions defined here. */
/*
 * Global state of the parallel RTS defined in this file.  None of these
 * definitions is declared static in the lines visible here.
 * NOTE(review): the embedded line numbering has gaps inside this run
 * (e.g. 53-54, 58-59, 65-66, 75-81), so further declarations or
 * preprocessor conditionals may exist that are not visible here.
 */
41 rtsBool IAmMainThread = rtsFalse, /* Set for the main thread */
42 GlobalStopPending = rtsFalse; /* Terminating; referenced by the ASSERT at the top of shutdownParallelSystem */
44 /* Task identifiers for various interesting global tasks. */
46 GlobalTaskId IOTask = 0, /* The IO Task Id */
47 SysManTask = 0, /* The System Manager Task Id; destination of the PP_FINISH sent by shutdownParallelSystem */
48 mytid = 0; /* This PE's Task Id; printed in the diagnostics of this file */
50 rtsTime main_start_time; /* When the program started */
51 rtsTime main_stop_time; /* When the program finished */
52 jmp_buf exit_parallel_system; /* How to abort from the RTS */
55 //rtsBool fishing = rtsFalse; /* We have no fish out in the stream */
56 rtsTime last_fish_arrived_at = 0; /* Time of arrival of most recent fish */
57 nat outstandingFishes = 0; /* Number of active fishes */
60 /* GranSim: a globally visible array of spark queues */
/* One entry per spark pool in each of the four arrays below. */
61 rtsSpark *pending_sparks_hd[SPARK_POOLS], /* ptr to start of a spark pool */
62 *pending_sparks_tl[SPARK_POOLS], /* ptr to end of a spark pool */
63 *pending_sparks_lim[SPARK_POOLS], /* NOTE(review): presumably the upper bound of the pool's storage -- confirm */
64 *pending_sparks_base[SPARK_POOLS]; /* NOTE(review): presumably the start of the pool's storage -- confirm */
67 /* max number of sparks permitted on the PE;
68 see RtsFlags.ParFlags.maxLocalSparks */
69 nat spark_limit[SPARK_POOLS];
71 //@cindex PendingFetches
72 /* A list of fetch reply messages not yet processed; this list is filled
73 by awaken_blocked_queue and processed by processFetches */
/* Starts out as END_BF_QUEUE. */
74 StgBlockedFetch *PendingFetches = END_BF_QUEUE;
82 //@cindex sparksIgnored
/* Counters that shutdownParallelSystem prints inside IF_PAR_DEBUG(verbose, ...). */
83 nat sparksIgnored = 0, sparksCreated = 0,
84 threadsIgnored = 0, threadsCreated = 0;
86 //@cindex advisory_thread_count
/* Not referenced anywhere else in the visible part of this file. */
87 nat advisory_thread_count = 0;
89 /* For flag handling see RtsFlags.h */
92 //@subsection Prototypes
94 /* Needed for FISH messages (initialisation of random number generator) */
/* Hand-written prototype for time(); initParallelSystem passes its result
   to srand48().  NOTE(review): <time.h> normally supplies this declaration;
   confirm whether the header can be included instead of redeclaring it. */
96 time_t time (time_t *);
98 //@node Initialisation Routines, , Global variables
99 //@subsection Initialisation Routines
102 par_exit defines how to terminate the program. If the exit code is
103 non-zero (i.e. an error has occurred), the PE should not halt until
104 outstanding error messages have been processed. Otherwise, messages
105 might be sent to non-existent Task Ids. The infinite loop will actually
106 terminate, since STG_Exception will call myexit(0) when
107 it receives a PP_FINISH from the system manager task.
/*
 * shutdownParallelSystem: take this PE out of the parallel system.
 *
 * Steps visible here: log entry via belch(), check that GlobalStopPending
 * is set, send PP_FINISH to SysManTask, wait (waitForTermination() and
 * waitForPEOp(PP_FINISH, SysManTask)), and, inside IF_PAR_DEBUG(verbose, ...),
 * report this PE's spark and thread counters.
 *
 * n: not referenced in the lines visible here; presumably the exit code
 *    discussed in the comment preceding this function -- confirm.
 *
 * NOTE(review): the embedded line numbering skips 112, 116, 118, 120 and
 * everything after 124, so statements (for instance a condition choosing
 * between the two waits) may be missing from this listing -- confirm
 * against the complete file.
 */
111 shutdownParallelSystem(StgInt n)
113 belch("==== entered shutdownParallelSystem ...");
/* Compare, do not assign: the previous `=` stored rtsTrue into the flag as
   a side effect of the assertion, so the check could never fail (and, if
   ASSERT expands to nothing in non-debug builds, debug and non-debug
   builds behaved differently). */
114 ASSERT(GlobalStopPending == rtsTrue);
115 sendOp(PP_FINISH, SysManTask);
117 waitForTermination();
119 waitForPEOp(PP_FINISH, SysManTask);
/* NOTE(review): the counters are printed with %ld; confirm that this
   matches the width and signedness of `nat`. */
121 IF_PAR_DEBUG(verbose,
122 belch("--++ shutting down PE %lx, %ld sparks created, %ld sparks Ignored, %ld threads created, %ld threads Ignored",
123 (W_) mytid, sparksCreated, sparksIgnored,
124 threadsCreated, threadsIgnored));
129 //@cindex initParallelSystem
/*
 * initParallelSystem: per-PE initialisation, called from SynchroniseSystem.
 *
 * Seeds the srand48() generator, then calls InitPackBuffer(),
 * initMoreBuffers() and initSparkPools() in that order; whenever one of
 * them returns a false (zero) value, barf() is called with that routine's
 * name (presumably a fatal error -- confirm).
 *
 * NOTE(review): the embedded line numbering skips 132 and 134-136, so the
 * code that belongs to the "Don't buffer standard channels" comment is not
 * visible in this listing.
 */
131 initParallelSystem(void)
133 /* Don't buffer standard channels... */
/* Seed = current time multiplied by the process id.
   NOTE(review): presumably so that PEs started within the same second get
   different seeds; the product's type and overflow behaviour depend on the
   platform's time_t and pid_t -- confirm. */
137 srand48(time(NULL) * getpid()); /* Initialise random-number generator seed */
138 /* Used to select target of FISH message */
/* Each initialiser reports failure through a false return value. */
140 if (!InitPackBuffer())
141 barf("InitPackBuffer");
143 if (!initMoreBuffers())
144 barf("initMoreBuffers");
146 if (!initSparkPools())
147 barf("initSparkPools");
151 * SynchroniseSystem synchronises the reduction task with the system
152 * manager, and initialises the Global address tables (LAGA & GALA)
155 //@cindex SynchroniseSystem
/*
 * SynchroniseSystem: bring this PE up and register all PEs.
 *
 * Steps visible here: print nPEs to stderr, run initEachPEHook(), call
 * initParallelSystem(), store the result of startUpPE(nPEs) in allPEs,
 * then pass each of the nPEs entries of allPEs to registerTask().
 * The progress messages go to stderr unconditionally; they are not wrapped
 * in IF_PAR_DEBUG.
 *
 * NOTE(review): the embedded line numbering skips 158-160, 170-171 and
 * everything after 175, so the declaration of `i`, the code belonging to
 * the "Initialize global address tables" comment and the closing braces
 * are not visible in this listing.
 */
157 SynchroniseSystem(void)
161 fprintf(stderr, "==== SynchroniseSystem: nPEs=%d\n", nPEs);
163 initEachPEHook(); /* HWL: hook to be executed on each PE */
165 fprintf(stderr, "==== SynchroniseSystem: initParallelSystem\n");
166 initParallelSystem();
167 allPEs = startUpPE(nPEs);
169 /* Initialize global address tables */
172 /* Record the shortened PE identifiers for the LAGA etc. tables */
173 for (i = 0; i < nPEs; ++i) {
/* NOTE(review): task ids are printed with %x here, whereas
   shutdownParallelSystem prints mytid with %lx after a (W_) cast --
   confirm which conversion matches GlobalTaskId. */
174 fprintf(stderr, "==== [%x] registering %d-th PE as %x\n", mytid, i, allPEs[i]);
175 registerTask(allPEs[i]);
179 #endif /* PAR -- whole file */
182 //* PendingFetches:: @cindex\s-+PendingFetches
183 //* SynchroniseSystem:: @cindex\s-+SynchroniseSystem
184 //* allPEs:: @cindex\s-+allPEs
185 //* initParallelSystem:: @cindex\s-+initParallelSystem
186 //* nPEs:: @cindex\s-+nPEs
187 //* par_exit:: @cindex\s-+par_exit
188 //* spark queue:: @cindex\s-+spark queue
189 //* sparksIgnored:: @cindex\s-+sparksIgnored