Document UniqSupply
[ghc-hetmet.git] / rts / parallel / ParInit.c
1 /* --------------------------------------------------------------------------
2    Time-stamp: <Wed Mar 21 2001 16:37:16 Stardate: [-30]6363.46 hwloidl>
3
4    Initialising the parallel RTS
5
6    An extension based on Kevin Hammond's GRAPH for PVM version
7    P. Trinder, January 17th 1995.
8    Adapted for the new RTS
9    P. Trinder, July 1997.
10    H-W. Loidl, November 1999.
11
12    ------------------------------------------------------------------------ */
13
14 #ifdef PAR /* whole file */
15
16 //@menu
17 //* Includes::                  
18 //* Global variables::          
19 //* Initialisation Routines::   
20 //@end menu
21
22 //@node Includes, Global variables
23 //@subsection Includes
24
25 /* Evidently not Posix */
26 /* #include "PosixSource.h" */
27
28 #include <setjmp.h>
29 #include "Rts.h"
30 #include "RtsFlags.h"
31 #include "RtsUtils.h"
32 #include "ParallelRts.h"
33 #include "Sparks.h"
34 #include "LLC.h"
35 #include "HLC.h"
36
37 //@node Global variables, Initialisation Routines, Includes
38 //@subsection Global variables
39
40 /* Global conditions defined here. */
41
42 rtsBool IAmMainThread = rtsFalse;       /* Set for the main thread      */
43
44 /* Task identifiers for various interesting global tasks. */
45
46 GlobalTaskId IOTask = 0,                /* The IO Task Id               */
47              SysManTask = 0,            /* The System Manager Task Id   */
48              mytid = 0;                 /* This PE's Task Id            */
49
50 rtsTime         main_start_time;        /* When the program started     */
51 rtsTime         main_stop_time;         /* When the program finished    */
52 jmp_buf         exit_parallel_system;   /* How to abort from the RTS    */
53
54
55 //rtsBool fishing = rtsFalse;             /* We have no fish out in the stream */
56 rtsTime last_fish_arrived_at = 0;       /* Time of arrival of most recent fish*/
57 nat     outstandingFishes = 0;          /* Number of active fishes */ 
58
59 //@cindex spark queue
60 /* GranSim: a globally visible array of spark queues */
61 rtsSpark *pending_sparks_hd[SPARK_POOLS],  /* ptr to start of a spark pool */ 
62          *pending_sparks_tl[SPARK_POOLS],  /* ptr to end of a spark pool */ 
63          *pending_sparks_lim[SPARK_POOLS],
64          *pending_sparks_base[SPARK_POOLS]; 
65
66 //@cindex spark_limit
67 /* max number of sparks permitted on the PE; 
68    see RtsFlags.ParFlags.maxLocalSparks */
69 nat spark_limit[SPARK_POOLS];
70
71 //@cindex PendingFetches
72 /* A list of fetch reply messages not yet processed; this list is filled
73    by awaken_blocked_queue and processed by processFetches */
74 StgBlockedFetch *PendingFetches = END_BF_QUEUE;
75
76 //@cindex allPEs
77 GlobalTaskId *allPEs;
78
79 //@cindex nPEs
80 nat nPEs = 0;
81
82 //@cindex sparksIgnored
83 nat sparksIgnored = 0, sparksCreated = 0, 
84     threadsIgnored = 0, threadsCreated = 0;
85
86 //@cindex advisory_thread_count
87 nat advisory_thread_count = 0;
88
89 globalAddr theGlobalFromGA;
90
91 /* For flag handling see RtsFlags.h */
92
93 //@node Prototypes
94 //@subsection Prototypes
95
96 /* Needed for FISH messages (initialisation of random number generator) */
97 void srand48 (long);
98 time_t time (time_t *);
99
100 //@node Initialisation Routines,  , Global variables
101 //@subsection Initialisation Routines
102
103 /*
104   par_exit defines how to terminate the program.  If the exit code is
105   non-zero (i.e. an error has occurred), the PE should not halt until
106   outstanding error messages have been processed.  Otherwise, messages
107   might be sent to non-existent Task Ids.  The infinite loop will actually
108   terminate, since STG_Exception will call myexit\tr{(0)} when
109   it received a PP_FINISH from the system manager task.
110 */
111 //@cindex shutdownParallelSystem
112 void
113 shutdownParallelSystem(StgInt n)
114 {
115   /* use the file specified via -S */ 
116   FILE *sf = RtsFlags.GcFlags.statsFile;
117
118   IF_PAR_DEBUG(verbose,
119                if (n==0)
120                  belch("==== entered shutdownParallelSystem ...");
121                else
122                  belch("==== entered shutdownParallelSystem (ERROR %d)...", n);
123                );
124   
125   stopPEComms(n);
126
127 #if 0
128   if (sf!=(FILE*)NULL) 
129     fprintf(sf, "PE %x: %u sparks created, %u sparks Ignored, %u threads created, %u threads Ignored", 
130             (W_) mytid, sparksCreated, sparksIgnored,
131             threadsCreated, threadsIgnored);
132 #endif
133
134   ShutdownEachPEHook();
135 }
136
137 //@cindex initParallelSystem
138 void
139 initParallelSystem(void)
140 {
141   /* Don't buffer standard channels... */
142   setbuf(stdout,NULL);
143   setbuf(stderr,NULL);
144   
145   srand48(time(NULL) * getpid()); /* Initialise Random-number generator seed*/
146                                   /* used to select target of FISH message*/
147   if (!InitPackBuffer())
148     barf("InitPackBuffer");
149
150   if (!initMoreBuffers())
151     barf("initMoreBuffers");
152
153   if (!initSparkPools())
154     barf("initSparkPools");
155 }
156
157 /* 
158  * SynchroniseSystem synchronises the reduction task with the system
159  * manager, and initialises the Global address tables (LAGA & GALA)
160  */
161
162 //@cindex synchroniseSystem
163 void
164 synchroniseSystem(void)
165 {
166   /* Only in debug mode? */
167   fprintf(stderr, "==== Starting parallel execution on %d processors ...\n", nPEs);
168
169   InitEachPEHook();                  /* HWL: hook to be execed on each PE */
170
171   /* Initialize global address tables */
172   initGAtables();
173
174   initParallelSystem();
175   
176   startPEComms();
177 }
178
179 /* 
180   Do the startup stuff (this is PVM specific!).
181   Determines global vars: mytid, IAmMainThread, SysManTask, nPEs
182   Called at the beginning of RtsStartup.startupHaskell
183 */
184 void 
185 startupParallelSystem(char *argv[]) { 
186  mytid = pvm_mytid();           /* Connect to PVM */
187
188  if (*argv[0] == '-') {         /* Look to see whether we're the Main Thread */
189   IAmMainThread = rtsTrue;
190   sscanf(argv[0],"-%0X",&SysManTask);  /* extract SysMan task ID*/      
191   argv++;                              /* Strip off flag argument */
192  } else {
193   SysManTask = pvm_parent();
194  }
195
196  IF_PAR_DEBUG(verbose,
197                fprintf(stderr, "==== [%x] %s PE located SysMan at %x\n",
198                        mytid, IAmMainThread?"Main":"Remote", SysManTask));
199
200  nPEs = atoi(argv[1]);
201 }
202
203 /* 
204    Exception handler during startup.
205 */
206 void *
207 processUnexpectedMessageDuringStartup(rtsPacket p) {
208   OpCode opCode;
209   GlobalTaskId sender_id;
210
211   getOpcodeAndSender(p, &opCode, &sender_id);
212
213   switch(opCode) { 
214       case PP_FISH:
215         bounceFish();
216         break;
217 #if defined(DIST)
218       case PP_REVAL:
219         bounceReval();
220         break;
221 #endif
222       case PP_FINISH:
223         stg_exit(EXIT_SUCCESS); 
224         break;
225       default:
226         fprintf(stderr,"== Task %x: Unexpected OpCode %x (%s) from %x in startPEComms\n",
227                 mytid, opCode, getOpName(opCode), sender_id);
228     }
229 }
230
231 void 
232 startPEComms(void){ 
233
234   startUpPE(); 
235   allPEs = (GlobalTaskId *) stgMallocBytes(sizeof(GlobalTaskId) * MAX_PES,
236                                            "(PEs)");
237   
238   /* Send our tid and IAmMainThread flag back to SysMan */
239   sendOp1(PP_READY, SysManTask, (StgWord)IAmMainThread);  
240   /* Wait until we get the PE-Id table from Sysman */    
241   waitForPEOp(PP_PETIDS, SysManTask, processUnexpectedMessageDuringStartup); 
242
243   IF_PAR_DEBUG(verbose,
244                belch("==-- startPEComms: methinks we just received a PP_PETIDS message"));
245
246   /* Digest the PE table we received */
247   processPEtids();
248 }
249
250 void
251 processPEtids(void) { 
252   long newPE;
253   nat i, sentPEs, currentPEs;
254
255   nPEs=0;
256           
257   currentPEs = nPEs;
258
259   IF_PAR_DEBUG(verbose,
260                 belch("==-- processPEtids: starting to iterate over a PVM buffer"));
261   /* ToDo: this has to go into LLComms !!! */
262   GetArgs(&sentPEs,1);
263
264   ASSERT(sentPEs > currentPEs);
265   ASSERT(sentPEs < MAX_PES); /* enforced by SysMan too*/  
266   
267   for (i = 0; i < sentPEs; i++) { 
268     GetArgs(&newPE,1);
269     if (i<currentPEs) { 
270       ASSERT(newPE == allPEs[i]);
271     } else { 
272 #if defined(DIST)
273       // breaks with PAR && !DEBUG
274       IF_PAR_DEBUG(verbose,
275         fprintf(stderr, "[%x] registering %d'th %x\n", mytid, i, newPE)); 
276       if(!looks_like_tid(newPE))
277           barf("unacceptable taskID %x\n",newPE);
278 #endif
279       allPEs[i] = newPE;      
280       nPEs++;
281       registerTask(newPE); 
282     }
283   }
284
285   IF_PAR_DEBUG(verbose,
286                /* debugging */
287                belch("++++ [%x] PE table as I see it:", mytid);
288                for (i = 0; i < sentPEs; i++) { 
289                  belch("++++ allPEs[%d] = %x", i, allPEs[i]);
290                });
291 }
292
293 void 
294 stopPEComms(StgInt n) { 
295   if (n != 0) { 
296     /* In case sysman doesn't know about us yet...
297     pvm_initsend(PvmDataDefault);
298     PutArgs(&IAmMainThread,1);
299     pvm_send(SysManTask, PP_READY);
300      */
301     sendOp(PP_READY, SysManTask);  
302   } 
303   
304   sendOp2(PP_FINISH, SysManTask, n, n);  
305   waitForPEOp(PP_FINISH, SysManTask, NULL);
306   fflush(gr_file);
307   shutDownPE();
308 }
309
310 #endif /* PAR -- whole file */
311
312 //@index
313 //* PendingFetches::  @cindex\s-+PendingFetches
314 //* SynchroniseSystem::  @cindex\s-+SynchroniseSystem
315 //* allPEs::  @cindex\s-+allPEs
316 //* initParallelSystem::  @cindex\s-+initParallelSystem
317 //* nPEs::  @cindex\s-+nPEs
318 //* par_exit::  @cindex\s-+par_exit
319 //* spark queue::  @cindex\s-+spark queue
320 //* sparksIgnored::  @cindex\s-+sparksIgnored
321 //@end index
322