0fe8b61b81665be44e5c39d79ffe3eefcea0606f
[ghc-hetmet.git] / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2000-2008
4  *
5  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
6  *
7  -------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "Storage.h"
12 #include "Schedule.h"
13 #include "SchedAPI.h"
14 #include "RtsFlags.h"
15 #include "RtsUtils.h"
16 #include "ParTicky.h"
17 #include "Trace.h"
18 #include "Prelude.h"
19
20 #include "SMP.h" // for cas
21
22 #include "Sparks.h"
23
24 #if defined(THREADED_RTS)
25
26 void
27 initSparkPools( void )
28 {
29     /* walk over the capabilities, allocating a spark pool for each one */
30     nat i;
31     for (i = 0; i < n_capabilities; i++) {
32       capabilities[i].sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
33     }
34 }
35
36 void
37 freeSparkPool (SparkPool *pool)
38 {
39     freeWSDeque(pool);
40 }
41
42 /* -----------------------------------------------------------------------------
43  * 
44  * Turn a spark into a real thread
45  *
46  * -------------------------------------------------------------------------- */
47
48 void
49 createSparkThread (Capability *cap)
50 {
51     StgTSO *tso;
52
53     tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize, 
54                           &base_GHCziConc_runSparks_closure);
55
56     postEvent(cap, EVENT_CREATE_SPARK_THREAD, 0, tso->id);
57
58     appendToRunQueue(cap,tso);
59 }
60
61 /* --------------------------------------------------------------------------
62  * newSpark: create a new spark, as a result of calling "par"
63  * Called directly from STG.
64  * -------------------------------------------------------------------------- */
65
66 StgInt
67 newSpark (StgRegTable *reg, StgClosure *p)
68 {
69     Capability *cap = regTableToCapability(reg);
70     SparkPool *pool = cap->sparks;
71
72     /* I am not sure whether this is the right thing to do.
73      * Maybe it is better to exploit the tag information
74      * instead of throwing it away?
75      */
76     p = UNTAG_CLOSURE(p);
77
78     if (closure_SHOULD_SPARK(p)) {
79         pushWSDeque(pool,p);
80     }   
81
82     cap->sparks_created++;
83
84     postEvent(cap, EVENT_CREATE_SPARK, cap->r.rCurrentTSO->id, 0);
85
86     return 1;
87 }
88
89 /* -----------------------------------------------------------------------------
90  * 
91  * tryStealSpark: try to steal a spark from a Capability.
92  *
93  * Returns a valid spark, or NULL if the pool was empty, and can
94  * occasionally return NULL if there was a race with another thread
95  * stealing from the same pool.  In this case, try again later.
96  *
97  -------------------------------------------------------------------------- */
98
99 StgClosure *
100 tryStealSpark (Capability *cap)
101 {
102   SparkPool *pool = cap->sparks;
103   StgClosure *stolen;
104
105   do { 
106       stolen = stealWSDeque_(pool); 
107       // use the no-loopy version, stealWSDeque_(), since if we get a
108       // spurious NULL here the caller may want to try stealing from
109       // other pools before trying again.
110   } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
111
112   return stolen;
113 }
114
115 /* --------------------------------------------------------------------------
116  * Remove all sparks from the spark queues which should not spark any
117  * more.  Called after GC. We assume exclusive access to the structure
118  * and replace  all sparks in the queue, see explanation below. At exit,
119  * the spark pool only contains sparkable closures.
120  * -------------------------------------------------------------------------- */
121
122 void
123 pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
124
125     SparkPool *pool;
126     StgClosurePtr spark, tmp, *elements;
127     nat n, pruned_sparks; // stats only
128     StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
129     const StgInfoTable *info;
130     
131     PAR_TICKY_MARK_SPARK_QUEUE_START();
132     
133     n = 0;
134     pruned_sparks = 0;
135     
136     pool = cap->sparks;
137     
138     // it is possible that top > bottom, indicating an empty pool.  We
139     // fix that here; this is only necessary because the loop below
140     // assumes it.
141     if (pool->top > pool->bottom)
142         pool->top = pool->bottom;
143
144     // Take this opportunity to reset top/bottom modulo the size of
145     // the array, to avoid overflow.  This is only possible because no
146     // stealing is happening during GC.
147     pool->bottom  -= pool->top & ~pool->moduloSize;
148     pool->top     &= pool->moduloSize;
149     pool->topBound = pool->top;
150
151     debugTrace(DEBUG_sched,
152                "markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)",
153                sparkPoolSize(pool), pool->bottom, pool->top);
154
155     ASSERT_WSDEQUE_INVARIANTS(pool);
156
157     elements = (StgClosurePtr *)pool->elements;
158
159     /* We have exclusive access to the structure here, so we can reset
160        bottom and top counters, and prune invalid sparks. Contents are
161        copied in-place if they are valuable, otherwise discarded. The
162        routine uses "real" indices t and b, starts by computing them
163        as the modulus size of top and bottom,
164
165        Copying:
166
167        At the beginning, the pool structure can look like this:
168        ( bottom % size >= top % size , no wrap-around)
169                   t          b
170        ___________***********_________________
171
172        or like this ( bottom % size < top % size, wrap-around )
173                   b         t
174        ***********__________******************
175        As we need to remove useless sparks anyway, we make one pass
176        between t and b, moving valuable content to b and subsequent
177        cells (wrapping around when the size is reached).
178
179                      b      t
180        ***********OOO_______XX_X__X?**********
181                      ^____move?____/
182
183        After this movement, botInd becomes the new bottom, and old
184        bottom becomes the new top index, both as indices in the array
185        size range.
186     */
187     // starting here
188     currInd = (pool->top) & (pool->moduloSize); // mod
189
190     // copies of evacuated closures go to space from botInd on
191     // we keep oldBotInd to know when to stop
192     oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod
193
194     // on entry to loop, we are within the bounds
195     ASSERT( currInd < pool->size && botInd  < pool->size );
196
197     while (currInd != oldBotInd ) {
198       /* must use != here, wrap-around at size
199          subtle: loop not entered if queue empty
200        */
201
202       /* check element at currInd. if valuable, evacuate and move to
203          botInd, otherwise move on */
204       spark = elements[currInd];
205
206       // We have to be careful here: in the parallel GC, another
207       // thread might evacuate this closure while we're looking at it,
208       // so grab the info pointer just once.
209       info = spark->header.info;
210       if (IS_FORWARDING_PTR(info)) {
211           tmp = (StgClosure*)UN_FORWARDING_PTR(info);
212           /* if valuable work: shift inside the pool */
213           if (closure_SHOULD_SPARK(tmp)) {
214               elements[botInd] = tmp; // keep entry (new address)
215               botInd++;
216               n++;
217           } else {
218               pruned_sparks++; // discard spark
219               cap->sparks_pruned++;
220           }
221       } else {
222           if (!(closure_flags[INFO_PTR_TO_STRUCT(info)->type] & _NS)) {
223               elements[botInd] = spark; // keep entry (new address)
224               evac (user, &elements[botInd]);
225               botInd++;
226               n++;
227           } else {
228               pruned_sparks++; // discard spark
229               cap->sparks_pruned++;
230           }
231       }
232       currInd++;
233
234       // in the loop, we may reach the bounds, and instantly wrap around
235       ASSERT( currInd <= pool->size && botInd <= pool->size );
236       if ( currInd == pool->size ) { currInd = 0; }
237       if ( botInd == pool->size )  { botInd = 0;  }
238
239     } // while-loop over spark pool elements
240
241     ASSERT(currInd == oldBotInd);
242
243     pool->top = oldBotInd; // where we started writing
244     pool->topBound = pool->top;
245
246     pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size); 
247     // first free place we did not use (corrected by wraparound)
248
249     PAR_TICKY_MARK_SPARK_QUEUE_END(n);
250
251     debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
252     
253     debugTrace(DEBUG_sched,
254                "new spark queue len=%ld; (hd=%ld; tl=%ld)",
255                sparkPoolSize(pool), pool->bottom, pool->top);
256
257     ASSERT_WSDEQUE_INVARIANTS(pool);
258 }
259
260 /* GC for the spark pool, called inside Capability.c for all
261    capabilities in turn. Blindly "evac"s complete spark pool. */
262 void
263 traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
264 {
265     StgClosure **sparkp;
266     SparkPool *pool;
267     StgWord top,bottom, modMask;
268     
269     pool = cap->sparks;
270
271     ASSERT_WSDEQUE_INVARIANTS(pool);
272
273     top = pool->top;
274     bottom = pool->bottom;
275     sparkp = (StgClosurePtr*)pool->elements;
276     modMask = pool->moduloSize;
277
278     while (top < bottom) {
279     /* call evac for all closures in range (wrap-around via modulo)
280      * In GHC-6.10, evac takes an additional 1st argument to hold a
281      * GC-specific register, see rts/sm/GC.c::mark_root()
282      */
283       evac( user , sparkp + (top & modMask) ); 
284       top++;
285     }
286
287     debugTrace(DEBUG_sched,
288                "traversed spark queue, len=%ld; (hd=%ld; tl=%ld)",
289                sparkPoolSize(pool), pool->bottom, pool->top);
290 }
291
292 /* ----------------------------------------------------------------------------
293  * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
294  * capabilities) and its size. Accesses all spark pools and equally
295  * distributes the sparks among them.
296  *
297  * Could be called after GC, before Cap. release, from scheduler. 
298  * -------------------------------------------------------------------------- */
299 void balanceSparkPoolsCaps(nat n_caps, Capability caps[]);
300
301 void balanceSparkPoolsCaps(nat n_caps STG_UNUSED, 
302                            Capability caps[] STG_UNUSED) {
303   barf("not implemented");
304 }
305
306 #else
307
308 StgInt
309 newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
310 {
311     /* nothing */
312     return 1;
313 }
314
315 #endif /* THREADED_RTS */