[project @ 2003-10-08 09:42:34 by wolfgang]
[ghc-hetmet.git] / ghc / rts / parallel / ParInit.c
1 /* --------------------------------------------------------------------------
2    Time-stamp: <Wed Mar 21 2001 16:37:16 Stardate: [-30]6363.46 hwloidl>
3    $Id: ParInit.c,v 1.5 2001/08/14 13:40:10 sewardj Exp $
4
5    Initialising the parallel RTS
6
7    An extension based on Kevin Hammond's GRAPH for PVM version
8    P. Trinder, January 17th 1995.
9    Adapted for the new RTS
10    P. Trinder, July 1997.
11    H-W. Loidl, November 1999.
12
13    ------------------------------------------------------------------------ */
14
15 #ifdef PAR /* whole file */
16
17 //@menu
18 //* Includes::                  
19 //* Global variables::          
20 //* Initialisation Routines::   
21 //@end menu
22
23 //@node Includes, Global variables
24 //@subsection Includes
25
26 /* Evidently not Posix */
27 /* #include "PosixSource.h" */
28
29 #include <setjmp.h>
30 #include "Rts.h"
31 #include "RtsFlags.h"
32 #include "RtsUtils.h"
33 #include "ParallelRts.h"
34 #include "Sparks.h"
35 #include "LLC.h"
36 #include "HLC.h"
37
38 //@node Global variables, Initialisation Routines, Includes
39 //@subsection Global variables
40
41 /* Global conditions defined here. */
42
43 rtsBool IAmMainThread = rtsFalse;       /* Set for the main thread      */
44
45 /* Task identifiers for various interesting global tasks. */
46
47 GlobalTaskId IOTask = 0,                /* The IO Task Id               */
48              SysManTask = 0,            /* The System Manager Task Id   */
49              mytid = 0;                 /* This PE's Task Id            */
50
51 rtsTime         main_start_time;        /* When the program started     */
52 rtsTime         main_stop_time;         /* When the program finished    */
53 jmp_buf         exit_parallel_system;   /* How to abort from the RTS    */
54
55
56 //rtsBool fishing = rtsFalse;             /* We have no fish out in the stream */
57 rtsTime last_fish_arrived_at = 0;       /* Time of arrival of most recent fish*/
58 nat     outstandingFishes = 0;          /* Number of active fishes */ 
59
60 //@cindex spark queue
61 /* GranSim: a globally visible array of spark queues */
62 rtsSpark *pending_sparks_hd[SPARK_POOLS],  /* ptr to start of a spark pool */ 
63          *pending_sparks_tl[SPARK_POOLS],  /* ptr to end of a spark pool */ 
64          *pending_sparks_lim[SPARK_POOLS],
65          *pending_sparks_base[SPARK_POOLS]; 
66
67 //@cindex spark_limit
68 /* max number of sparks permitted on the PE; 
69    see RtsFlags.ParFlags.maxLocalSparks */
70 nat spark_limit[SPARK_POOLS];
71
72 //@cindex PendingFetches
73 /* A list of fetch reply messages not yet processed; this list is filled
74    by awaken_blocked_queue and processed by processFetches */
75 StgBlockedFetch *PendingFetches = END_BF_QUEUE;
76
77 //@cindex allPEs
78 GlobalTaskId *allPEs;
79
80 //@cindex nPEs
81 nat nPEs = 0;
82
83 //@cindex sparksIgnored
84 nat sparksIgnored = 0, sparksCreated = 0, 
85     threadsIgnored = 0, threadsCreated = 0;
86
87 //@cindex advisory_thread_count
88 nat advisory_thread_count = 0;
89
90 globalAddr theGlobalFromGA;
91
92 /* For flag handling see RtsFlags.h */
93
94 //@node Prototypes
95 //@subsection Prototypes
96
97 /* Needed for FISH messages (initialisation of random number generator) */
98 void srand48 (long);
99 time_t time (time_t *);
100
101 //@node Initialisation Routines,  , Global variables
102 //@subsection Initialisation Routines
103
104 /*
105   par_exit defines how to terminate the program.  If the exit code is
106   non-zero (i.e. an error has occurred), the PE should not halt until
107   outstanding error messages have been processed.  Otherwise, messages
108   might be sent to non-existent Task Ids.  The infinite loop will actually
109   terminate, since STG_Exception will call myexit\tr{(0)} when
110   it received a PP_FINISH from the system manager task.
111 */
112 //@cindex shutdownParallelSystem
113 void
114 shutdownParallelSystem(StgInt n)
115 {
116   /* use the file specified via -S */ 
117   FILE *sf = RtsFlags.GcFlags.statsFile;
118
119   IF_PAR_DEBUG(verbose,
120                if (n==0)
121                  belch("==== entered shutdownParallelSystem ...");
122                else
123                  belch("==== entered shutdownParallelSystem (ERROR %d)...", n);
124                );
125   
126   stopPEComms(n);
127
128 #if 0
129   if (sf!=(FILE*)NULL) 
130     fprintf(sf, "PE %x: %u sparks created, %u sparks Ignored, %u threads created, %u threads Ignored", 
131             (W_) mytid, sparksCreated, sparksIgnored,
132             threadsCreated, threadsIgnored);
133 #endif
134
135   ShutdownEachPEHook();
136 }
137
138 //@cindex initParallelSystem
139 void
140 initParallelSystem(void)
141 {
142   /* Don't buffer standard channels... */
143   setbuf(stdout,NULL);
144   setbuf(stderr,NULL);
145   
146   srand48(time(NULL) * getpid()); /* Initialise Random-number generator seed*/
147                                   /* used to select target of FISH message*/
148   if (!InitPackBuffer())
149     barf("InitPackBuffer");
150
151   if (!initMoreBuffers())
152     barf("initMoreBuffers");
153
154   if (!initSparkPools())
155     barf("initSparkPools");
156 }
157
158 /* 
159  * SynchroniseSystem synchronises the reduction task with the system
160  * manager, and initialises the Global address tables (LAGA & GALA)
161  */
162
163 //@cindex synchroniseSystem
164 void
165 synchroniseSystem(void)
166 {
167   /* Only in debug mode? */
168   fprintf(stderr, "==== Starting parallel execution on %d processors ...\n", nPEs);
169
170   InitEachPEHook();                  /* HWL: hook to be execed on each PE */
171
172   /* Initialize global address tables */
173   initGAtables();
174
175   initParallelSystem();
176   
177   startPEComms();
178 }
179
180 /* 
181   Do the startup stuff (this is PVM specific!).
182   Determines global vars: mytid, IAmMainThread, SysManTask, nPEs
183   Called at the beginning of RtsStartup.startupHaskell
184 */
185 void 
186 startupParallelSystem(char *argv[]) { 
187  mytid = pvm_mytid();           /* Connect to PVM */
188
189  if (*argv[0] == '-') {         /* Look to see whether we're the Main Thread */
190   IAmMainThread = rtsTrue;
191   sscanf(argv[0],"-%0X",&SysManTask);  /* extract SysMan task ID*/      
192   argv++;                              /* Strip off flag argument */
193  } else {
194   SysManTask = pvm_parent();
195  }
196
197  IF_PAR_DEBUG(verbose,
198                fprintf(stderr, "==== [%x] %s PE located SysMan at %x\n",
199                        mytid, IAmMainThread?"Main":"Remote", SysManTask));
200
201  nPEs = atoi(argv[1]);
202 }
203
204 /* 
205    Exception handler during startup.
206 */
207 void *
208 processUnexpectedMessageDuringStartup(rtsPacket p) {
209   OpCode opCode;
210   GlobalTaskId sender_id;
211
212   getOpcodeAndSender(p, &opCode, &sender_id);
213
214   switch(opCode) { 
215       case PP_FISH:
216         bounceFish();
217         break;
218 #if defined(DIST)
219       case PP_REVAL:
220         bounceReval();
221         break;
222 #endif
223       case PP_FINISH:
224         stg_exit(EXIT_SUCCESS); 
225         break;
226       default:
227         fprintf(stderr,"== Task %x: Unexpected OpCode %x (%s) from %x in startPEComms\n",
228                 mytid, opCode, getOpName(opCode), sender_id);
229     }
230 }
231
232 void 
233 startPEComms(void){ 
234
235   startUpPE(); 
236   allPEs = (GlobalTaskId *) stgMallocBytes(sizeof(GlobalTaskId) * MAX_PES,
237                                            "(PEs)");
238   
239   /* Send our tid and IAmMainThread flag back to SysMan */
240   sendOp1(PP_READY, SysManTask, (StgWord)IAmMainThread);  
241   /* Wait until we get the PE-Id table from Sysman */    
242   waitForPEOp(PP_PETIDS, SysManTask, processUnexpectedMessageDuringStartup); 
243
244   IF_PAR_DEBUG(verbose,
245                belch("==-- startPEComms: methinks we just received a PP_PETIDS message"));
246
247   /* Digest the PE table we received */
248   processPEtids();
249 }
250
251 void
252 processPEtids(void) { 
253   long newPE;
254   nat i, sentPEs, currentPEs;
255
256   nPEs=0;
257           
258   currentPEs = nPEs;
259
260   IF_PAR_DEBUG(verbose,
261                 belch("==-- processPEtids: starting to iterate over a PVM buffer"));
262   /* ToDo: this has to go into LLComms !!! */
263   GetArgs(&sentPEs,1);
264
265   ASSERT(sentPEs > currentPEs);
266   ASSERT(sentPEs < MAX_PES); /* enforced by SysMan too*/  
267   
268   for (i = 0; i < sentPEs; i++) { 
269     GetArgs(&newPE,1);
270     if (i<currentPEs) { 
271       ASSERT(newPE == allPEs[i]);
272     } else { 
273 #if defined(DIST)
274       // breaks with PAR && !DEBUG
275       IF_PAR_DEBUG(verbose,
276         fprintf(stderr, "[%x] registering %d'th %x\n", mytid, i, newPE)); 
277       if(!looks_like_tid(newPE))
278           barf("unacceptable taskID %x\n",newPE);
279 #endif
280       allPEs[i] = newPE;      
281       nPEs++;
282       registerTask(newPE); 
283     }
284   }
285
286   IF_PAR_DEBUG(verbose,
287                /* debugging */
288                belch("++++ [%x] PE table as I see it:", mytid);
289                for (i = 0; i < sentPEs; i++) { 
290                  belch("++++ allPEs[%d] = %x", i, allPEs[i]);
291                });
292 }
293
294 void 
295 stopPEComms(StgInt n) { 
296   if (n != 0) { 
297     /* In case sysman doesn't know about us yet...
298     pvm_initsend(PvmDataDefault);
299     PutArgs(&IAmMainThread,1);
300     pvm_send(SysManTask, PP_READY);
301      */
302     sendOp(PP_READY, SysManTask);  
303   } 
304   
305   sendOp2(PP_FINISH, SysManTask, n, n);  
306   waitForPEOp(PP_FINISH, SysManTask, NULL);
307   fflush(gr_file);
308   shutDownPE();
309 }
310
311 #endif /* PAR -- whole file */
312
313 //@index
314 //* PendingFetches::  @cindex\s-+PendingFetches
315 //* SynchroniseSystem::  @cindex\s-+SynchroniseSystem
316 //* allPEs::  @cindex\s-+allPEs
317 //* initParallelSystem::  @cindex\s-+initParallelSystem
318 //* nPEs::  @cindex\s-+nPEs
319 //* par_exit::  @cindex\s-+par_exit
320 //* spark queue::  @cindex\s-+spark queue
321 //* sparksIgnored::  @cindex\s-+sparksIgnored
322 //@end index
323