Fix a couple of issues with :print
[ghc-hetmet.git] / rts / parallel / LLComms.c
1 /* ----------------------------------------------------------------------------
2  * Time-stamp: <Mon Mar 19 2001 22:10:38 Stardate: [-30]6354.62 hwloidl>
3  *
4  * GUM Low-Level Inter-Task Communication
5  *
6  * This module defines PVM Routines for PE-PE  communication.
7  *
8  * P. Trinder, December 5th. 1994.
9  * P. Trinder, July 1998
10  * H-W. Loidl, November 1999 -
11  --------------------------------------------------------------------------- */
12
13 #ifdef PAR /* whole file */
14
15 //@node GUM Low-Level Inter-Task Communication, , ,
16 //@section GUM Low-Level Inter-Task Communication
17
18 /*
19  *This module defines the routines which communicate between PEs.  The
20  *code is based on Kevin Hammond's GRIP RTS. (OpCodes.h defines
21  *PEOp1 etc. in terms of sendOp1 etc.).  
22  *
23  *Routine       &       Arguments 
24  *              &               
25  *sendOp        &       0                       \\
26  *sendOp1       &       1                       \\
27  *sendOp2       &       2                       \\
28  *sendOpN       &       vector                  \\
29  *sendOpV       &       variable                \\
30  *sendOpNV      &       variable+ vector        \\
31  *
32  *First the standard include files.
33  */
34
35 //@menu
36 //* Macros etc::                
37 //* Includes::                  
38 //* Auxiliary functions::       
39 //* Index::                     
40 //@end menu
41
42 //@node Macros etc, Includes, GUM Low-Level Inter-Task Communication, GUM Low-Level Inter-Task Communication
43 //@subsection Macros etc
44
45 /* Evidently not Posix */
46 /* #include "PosixSource.h" */
47
48 #define UNUSED           /* nothing */
49
50 //@node Includes, Auxiliary functions, Macros etc, GUM Low-Level Inter-Task Communication
51 //@subsection Includes
52
53 #include "Rts.h"
54 #include "RtsFlags.h"
55 #include "RtsUtils.h"
56 #include "Parallel.h"
57 #include "ParallelRts.h"
58 #if defined(DEBUG)
59 # include "ParallelDebug.h"
60 #endif
61 #include "LLC.h"
62
63 #ifdef __STDC__
64 #include <stdarg.h>
65 #else
66 #include <varargs.h>
67 #endif
68
69 /* Cannot use std macro when compiling for SysMan */
70 /* debugging enabled */
71 // #define IF_PAR_DEBUG(c,s)  { s; }
72 /* debugging disabled */
73 #define IF_PAR_DEBUG(c,s)  /* nothing */
74
75 //@node Auxiliary functions, Index, Includes, GUM Low-Level Inter-Task Communication
76 //@subsection Auxiliary functions
77
78 /*
79  * heapChkCounter tracks the number of heap checks since the last probe.
80  * Not currently used! We check for messages when a thread is resheduled.
81  */
82 int heapChkCounter = 0;
83
84 /*
85  * Then some miscellaneous functions. 
86  * getOpName returns the character-string name of any OpCode.
87  */
88
89 char *UserPEOpNames[] = { PEOP_NAMES };
90
91 //@cindex getOpName
92 char *
93 getOpName(nat op)
94 {
95     if (op >= MIN_PEOPS && op <= MAX_PEOPS)
96         return (UserPEOpNames[op - MIN_PEOPS]);
97     else
98         return ("Unknown PE OpCode");
99 }
100
101 /*
102  * traceSendOp handles the tracing of messages. 
103  */
104
105 //@cindex traceSendOp
106 static void
107 traceSendOp(OpCode op, GlobalTaskId dest UNUSED,
108              unsigned int data1 UNUSED, unsigned int data2 UNUSED)
109 {
110     char *OpName;
111
112     OpName = getOpName(op);
113     IF_PAR_DEBUG(trace,
114                  fprintf(stderr," %s [%x,%x] sent from %x to %x", 
115                        OpName, data1, data2, mytid, dest));
116 }
117
118 /*
119  * sendOp sends a 0-argument message with OpCode {\em op} to
120  * the global task {\em task}.
121  */
122
123 //@cindex sendOp
124 void
125 sendOp(OpCode op, GlobalTaskId task)
126 {
127     traceSendOp(op, task,0,0);
128
129     pvm_initsend(PvmDataRaw);
130     pvm_send(task, op);
131 }
132
133 /*
134  * sendOp1 sends a 1-argument message with OpCode {\em op}
135  * to the global task {\em task}.
136  */
137
138 //@cindex sendOp1
139 void
140 sendOp1(OpCode op, GlobalTaskId task, StgWord arg1)
141 {
142     traceSendOp(op, task, arg1,0);
143
144     pvm_initsend(PvmDataRaw);
145     PutArg1(arg1);
146     pvm_send(task, op);
147 }
148
149
150 /*
151  * sendOp2 is used by the FP code only. 
152  */
153
154 //@cindex sendOp2
155 void
156 sendOp2(OpCode op, GlobalTaskId task, StgWord arg1, StgWord arg2)
157 {
158     traceSendOp(op, task, arg1, arg2);
159
160     pvm_initsend(PvmDataRaw);
161     PutArg1(arg1);
162     PutArg2(arg2);
163     pvm_send(task, op);
164 }
165
166 /*
167  *
168  * sendOpV takes a variable number of arguments, as specified by {\em n}.  
169  * For example,
170  *
171  *    sendOpV( PP_STATS, StatsTask, 3, start_time, stop_time, sparkcount);
172  */
173
174 //@cindex sendOpV
175 void
176 sendOpV(OpCode op, GlobalTaskId task, int n, ...)
177 {
178     va_list ap;
179     int i;
180     StgWord arg;
181
182     va_start(ap, n);
183
184     traceSendOp(op, task, 0, 0);
185
186     pvm_initsend(PvmDataRaw);
187
188     for (i = 0; i < n; ++i) {
189         arg = va_arg(ap, StgWord);
190         PutArgN(i, arg);
191     }
192     va_end(ap);
193
194     pvm_send(task, op);
195 }
196
197 /*    
198  *
199  * sendOpNV takes a variable-size datablock, as specified by {\em
200  * nelem} and a variable number of arguments, as specified by {\em
201  * narg}. N.B. The datablock and the additional arguments are contiguous
202  * and are copied over together.  For example,
203  *
204  *        sendOpNV(PP_RESUME, tsoga.pe, 6, nelem, data,
205  *          (W_) ga.weight, (W_) ga.loc.gc.gtid, (W_) ga.loc.gc.slot, 
206  *          (W_) tsoga.weight, (W_) tsoga.loc.gc.gtid, (W_) tsoga.loc.gc.slot);
207  *
208  * Important: The variable arguments must all be StgWords.
209
210  sendOpNV(_, tid, m, n, data, x1, ..., xm):
211
212                          |   n elems
213      +------------------------------
214      | x1 | ... | xm | n | data ....
215      +------------------------------
216  */
217
218 //@cindex sendOpNV
219 void
220 sendOpNV(OpCode op, GlobalTaskId task, int nelem, 
221          StgWord *datablock, int narg, ...)
222 {
223     va_list ap;
224     int i;
225     StgWord arg;
226
227     va_start(ap, narg);
228
229     traceSendOp(op, task, 0, 0);
230     IF_PAR_DEBUG(trace,
231                  fprintf(stderr,"~~ sendOpNV: op = %x (%s), task = %x, narg = %d, nelem = %d",
232                        op, getOpName(op), task, narg, nelem));
233
234     pvm_initsend(PvmDataRaw);
235
236     for (i = 0; i < narg; ++i) {
237         arg = va_arg(ap, StgWord);
238         IF_PAR_DEBUG(trace,
239                      fprintf(stderr,"~~ sendOpNV: arg = %d\n",arg));
240         PutArgN(i, arg);
241     }
242     arg = (StgWord) nelem;
243     PutArgN(narg, arg);
244
245 /*  for (i=0; i < nelem; ++i) fprintf(stderr, "%d ",datablock[i]); */
246 /*  fprintf(stderr," in sendOpNV\n");*/
247
248     PutArgs(datablock, nelem);
249     va_end(ap);
250
251     pvm_send(task, op);
252 }
253
254 /*    
255  * sendOpN take a variable size array argument, whose size is given by
256  * {\em n}.  For example,
257  *
258  *    sendOpN( PP_STATS, StatsTask, 3, stats_array);
259  */
260
261 //@cindex sendOpN
262 void
263 sendOpN(OpCode op, GlobalTaskId task, int n, StgPtr args)
264 {
265     long arg;
266
267     traceSendOp(op, task, 0, 0);
268
269     pvm_initsend(PvmDataRaw);
270     arg = (long) n;
271     PutArgN(0, arg);
272     PutArgs(args, n);
273     pvm_send(task, op);
274 }
275
276 /*    
277  * broadcastOpN is as sendOpN but broadcasts to all members of a group.
278  */
279
280 void
281 broadcastOpN(OpCode op, char *group, int n, StgPtr args)
282 {
283   long arg;
284
285   //traceSendOp(op, task, 0, 0);
286   
287   pvm_initsend(PvmDataRaw);
288   arg = (long) n;
289   PutArgN(0, arg);
290   PutArgs(args, n);
291   pvm_bcast(group, op);
292 }
293
294 /*
295    waitForPEOp waits for a packet from global task who with the
296    OpCode op.  If ignore is true all other messages are simply ignored; 
297    otherwise they are handled by processUnexpected.
298  */
299 //@cindex waitForPEOp
300 rtsPacket 
301 waitForPEOp(OpCode op, GlobalTaskId who, void(*processUnexpected)(rtsPacket) )
302 {
303   rtsPacket p;
304   int nbytes;
305   OpCode opCode;
306   GlobalTaskId sender_id;
307   rtsBool match;
308
309   IF_PAR_DEBUG(verbose,
310                fprintf(stderr,"~~ waitForPEOp: expecting op = %x (%s), who = [%x]\n", 
311                        op, getOpName(op), who)); 
312
313   do {
314     while((p = pvm_recv(ANY_TASK,ANY_OPCODE)) < 0)
315       pvm_perror("waitForPEOp: Waiting for PEOp");
316       
317     pvm_bufinfo( p, &nbytes, &opCode, &sender_id );
318     match = (op == ANY_OPCODE || op == opCode) && 
319             (who == ANY_TASK || who == sender_id);
320
321     if (match) {
322       IF_PAR_DEBUG(verbose,
323                    fprintf(stderr,
324                            "~~waitForPEOp: Qapla! received: OpCode = %#x (%s), sender_id = [%x]",
325                            opCode, getOpName(opCode), sender_id)); 
326
327       return(p);
328     }
329
330     /* Handle the unexpected OpCodes */
331     if (processUnexpected!=NULL) {
332       (*processUnexpected)(p);
333     } else {
334       IF_PAR_DEBUG(verbose,
335                    fprintf(stderr,
336                            "~~ waitForPEOp: ignoring OpCode = %#x (%s), sender_id = [%x]",
337                            opCode, getOpName(opCode), sender_id)); 
338     }
339
340   } while(rtsTrue);
341 }
342
343 /*
344   processUnexpected processes unexpected messages. If the message is a
345   FINISH it exits the prgram, and PVM gracefully
346  */
347 //@cindex processUnexpectedMessage
348 void
349 processUnexpectedMessage(rtsPacket packet) {
350     OpCode opCode = getOpcode(packet);
351
352     IF_PAR_DEBUG(verbose,
353                  GlobalTaskId sender = senderTask(packet); 
354                  fprintf(stderr,"~~ [%x] processUnexpected: Received %x (%s), sender %x\n",
355                        mytid, opCode, getOpName(opCode), sender)); 
356
357     switch (opCode) {
358     case PP_FINISH:
359         stg_exit(EXIT_SUCCESS);
360         break;
361
362       /* Anything we're not prepared to deal with.  Note that ALL OpCodes
363          are discarded during termination -- this helps prevent bizarre
364          race conditions.  */
365       default:
366         // if (!GlobalStopPending) 
367         {
368           GlobalTaskId errorTask;
369           OpCode opCode;
370
371           getOpcodeAndSender(packet, &opCode, &errorTask);
372           fprintf(stderr,"== Task %x: Unexpected OpCode %x from %x in processUnexpected",
373                 mytid, opCode, errorTask );
374             
375           stg_exit(EXIT_FAILURE);
376         }
377     }
378 }
379
380 //@cindex getOpcode
381 OpCode 
382 getOpcode(rtsPacket p)
383 {
384   int nbytes;
385   OpCode OpCode;
386   GlobalTaskId sender_id;
387   /* read PVM buffer */
388   pvm_bufinfo(p, &nbytes, &OpCode, &sender_id);
389   /* return tag of the buffer as opcode */
390   return(OpCode);
391 }
392
393 //@cindex getOpcodeAndSender
394 void
395 getOpcodeAndSender(rtsPacket p, OpCode *opCodep, GlobalTaskId *senderIdp)
396 {
397   int nbytes;
398   /* read PVM buffer */
399   pvm_bufinfo(p, &nbytes, opCodep, senderIdp);
400 }
401
402 //@cindex senderTask
403 GlobalTaskId
404 senderTask(rtsPacket p)
405 {
406   int nbytes;
407   OpCode opCode;
408   GlobalTaskId sender_id;
409   /* read PVM buffer */
410   pvm_bufinfo(p, &nbytes, &opCode, &sender_id);
411   return(sender_id);
412 }
413
414 /*
415  * startUpPE does the low-level comms specific startup stuff for a
416  * PE. It initialises the comms system, joins the appropriate groups
417  * allocates the PE buffer
418  */
419
420 //@cindex startUpPE
421 void
422 startUpPE(void)
423
424   mytid = _my_gtid;     /* Initialise PVM and get task id into global var.*/
425   
426   IF_PAR_DEBUG(verbose,
427                fprintf(stderr,"== [%x] PEStartup: Task id = [%x], No. PEs = %d \n", 
428                        mytid, mytid, nPEs));
429   checkComms(pvm_joingroup(PEGROUP), "PEStartup");
430   IF_PAR_DEBUG(verbose,
431                fprintf(stderr,"== [%x] PEStartup: Joined PEGROUP\n", mytid));
432 }
433
434 /*
435  * PEShutdown does the low-level comms-specific shutdown stuff for a
436  * single PE. It leaves the groups and then exits from pvm.
437  */
438 //@cindex shutDownPE
439 void
440 shutDownPE(void)
441 {    
442   IF_PAR_DEBUG(verbose,
443                fprintf(stderr, "== [%x] PEshutdown\n", mytid));
444
445   checkComms(pvm_lvgroup(PEGROUP),"PEShutDown");
446   checkComms(pvm_exit(),"PEShutDown");
447 }
448
449 /* 
450    Extract the exit code out of a PP_FINISH packet (used in SysMan)
451 */
452 int
453 getExitCode(int nbytes, GlobalTaskId *sender_idp) {
454   int exitCode=0;
455
456   if (nbytes==4) {               // Notification from a task doing pvm_exit
457     GetArgs(sender_idp,1);       // Presumably this must be MainPE Id
458     exitCode = -1;
459   } else if (nbytes==8) {        // Doing a controlled shutdown
460     GetArgs(&exitCode,1);        // HACK: controlled shutdown == 2 values
461     GetArgs(&exitCode,1);
462   } else {
463     exitCode = -2;               // everything else
464   }
465   return exitCode;
466 }
467
468 #endif /* PAR -- whole file */
469
470 //@node Index,  , Auxiliary functions, GUM Low-Level Inter-Task Communication
471 //@subsection Index
472
473 //@index
474 //* getOpName::  @cindex\s-+getOpName
475 //* traceSendOp::  @cindex\s-+traceSendOp
476 //* sendOp::  @cindex\s-+sendOp
477 //* sendOp1::  @cindex\s-+sendOp1
478 //* sendOp2::  @cindex\s-+sendOp2
479 //* sendOpV::  @cindex\s-+sendOpV
480 //* sendOpNV::  @cindex\s-+sendOpNV
481 //* sendOpN::  @cindex\s-+sendOpN
482 //* waitForPEOp::  @cindex\s-+waitForPEOp
483 //* processUnexpectedMessage::  @cindex\s-+processUnexpectedMessage
484 //* getOpcode::  @cindex\s-+getOpcode
485 //* getOpcodeAndSender::  @cindex\s-+getOpcodeAndSender
486 //* senderTask::  @cindex\s-+senderTask
487 //* startUpPE::  @cindex\s-+startUpPE
488 //* shutDownPE::  @cindex\s-+shutDownPE
489 //@end index