[project @ 2004-10-17 00:22:03 by ross]
[ghc-base.git] / Control / Concurrent.hs
1 -----------------------------------------------------------------------------
2 -- |
3 -- Module      :  Control.Concurrent
4 -- Copyright   :  (c) The University of Glasgow 2001
5 -- License     :  BSD-style (see the file libraries/base/LICENSE)
6 -- 
7 -- Maintainer  :  libraries@haskell.org
8 -- Stability   :  experimental
9 -- Portability :  non-portable (concurrency)
10 --
11 -- A common interface to a collection of useful concurrency
12 -- abstractions.
13 --
14 -----------------------------------------------------------------------------
15
16 module Control.Concurrent (
17         -- * Concurrent Haskell
18
19         -- $conc_intro
20
21         -- * Basic concurrency operations
22
23         ThreadId,
24 #ifdef __GLASGOW_HASKELL__
25         myThreadId,
26 #endif
27
28         forkIO,
29 #ifdef __GLASGOW_HASKELL__
30         killThread,
31         throwTo,
32 #endif
33
34         -- * Scheduling
35
36         -- $conc_scheduling     
37         yield,                  -- :: IO ()
38
39         -- ** Blocking
40         
41         -- $blocking
42
43 #ifdef __GLASGOW_HASKELL__
44         -- ** Waiting
45         threadDelay,            -- :: Int -> IO ()
46         threadWaitRead,         -- :: Int -> IO ()
47         threadWaitWrite,        -- :: Int -> IO ()
48 #endif
49
50         -- * Communication abstractions
51
52         module Control.Concurrent.MVar,
53         module Control.Concurrent.Chan,
54         module Control.Concurrent.QSem,
55         module Control.Concurrent.QSemN,
56         module Control.Concurrent.SampleVar,
57
58         -- * Merging of streams
59 #ifndef __HUGS__
60         mergeIO,                -- :: [a]   -> [a] -> IO [a]
61         nmergeIO,               -- :: [[a]] -> IO [a]
62 #endif
63         -- $merge
64
65 #ifdef __GLASGOW_HASKELL__
66         -- * Bound Threads
67         -- $boundthreads
68         rtsSupportsBoundThreads,
69         forkOS,
70         isCurrentThreadBound,
71         runInBoundThread,
72         runInUnboundThread
73 #endif
74
75         -- * GHC's implementation of concurrency
76
77         -- |This section describes features specific to GHC's
78         -- implementation of Concurrent Haskell.
79         
80         -- ** Terminating the program
81
82         -- $termination
83
84         -- ** Pre-emption
85
86         -- $preemption
87     ) where
88
89 import Prelude
90
91 import Control.Exception as Exception
92
93 #ifdef __GLASGOW_HASKELL__
94 import GHC.Conc         ( ThreadId(..), myThreadId, killThread, yield,
95                           threadDelay, threadWaitRead, threadWaitWrite )
96 import GHC.TopHandler   ( reportStackOverflow, reportError )
97 import GHC.IOBase       ( IO(..) )
98 import GHC.IOBase       ( unsafeInterleaveIO )
99 import GHC.IOBase   ( newIORef, readIORef, writeIORef )
100 import GHC.Base
101
102 import Foreign.StablePtr
103 import Foreign.C.Types  ( CInt )
104 import Control.Monad    ( when )
105 #endif
106
107 #ifdef __HUGS__
108 import Hugs.ConcBase
109 #endif
110
111 import Control.Concurrent.MVar
112 import Control.Concurrent.Chan
113 import Control.Concurrent.QSem
114 import Control.Concurrent.QSemN
115 import Control.Concurrent.SampleVar
116
117 #ifdef __HUGS__
118 type ThreadId = ()
119 #endif
120
121 {- $conc_intro
122
123 The concurrency extension for Haskell is described in the paper
124 /Concurrent Haskell/
125 <http://www.haskell.org/ghc/docs/papers/concurrent-haskell.ps.gz>.
126
127 Concurrency is \"lightweight\", which means that both thread creation
128 and context switching overheads are extremely low.  Scheduling of
129 Haskell threads is done internally in the Haskell runtime system, and
130 doesn't make use of any operating system-supplied thread packages.
131
132 However, if you want to interact with a foreign library that expects your
133 program to use the operating system-supplied thread package, you can do so
134 by using 'forkOS' instead of 'forkIO'.
135
136 Haskell threads can communicate via 'MVar's, a kind of synchronised
137 mutable variable (see "Control.Concurrent.MVar").  Several common
138 concurrency abstractions can be built from 'MVar's, and these are
139 provided by the "Control.Concurrent" library.
140 In GHC, threads may also communicate via exceptions.
141 -}
142
143 {- $conc_scheduling
144
145     Scheduling may be either pre-emptive or co-operative,
146     depending on the implementation of Concurrent Haskell (see below
147     for information related to specific compilers).  In a co-operative
148     system, context switches only occur when you use one of the
149     primitives defined in this module.  This means that programs such
150     as:
151
152
153 >   main = forkIO (write 'a') >> write 'b'
154 >     where write c = putChar c >> write c
155
156     will print either @aaaaaaaaaaaaaa...@ or @bbbbbbbbbbbb...@,
157     instead of some random interleaving of @a@s and @b@s.  In
158     practice, cooperative multitasking is sufficient for writing
159     simple graphical user interfaces.  
160 -}
161
162 {- $blocking
163 Calling a foreign C procedure (such as @getchar@) that blocks waiting
164 for input will block /all/ threads, unless the @threadsafe@ attribute
165 is used on the foreign call (and your compiler \/ operating system
166 supports it).  GHC's I\/O system uses non-blocking I\/O internally to
167 implement thread-friendly I\/O, so calling standard Haskell I\/O
168 functions blocks only the thread making the call.
169 -}
170
171 -- Thread Ids, specifically the instances of Eq and Ord for these things.
172 -- The ThreadId type itself is defined in std/PrelConc.lhs.
173
174 -- Rather than define a new primitve, we use a little helper function
175 -- cmp_thread in the RTS.
176
177 #ifdef __GLASGOW_HASKELL__
178 id2TSO :: ThreadId -> ThreadId#
179 id2TSO (ThreadId t) = t
180
181 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> Int
182 -- Returns -1, 0, 1
183
184 cmpThread :: ThreadId -> ThreadId -> Ordering
185 cmpThread t1 t2 = 
186    case cmp_thread (id2TSO t1) (id2TSO t2) of
187       -1 -> LT
188       0  -> EQ
189       _  -> GT -- must be 1
190
191 instance Eq ThreadId where
192    t1 == t2 = 
193       case t1 `cmpThread` t2 of
194          EQ -> True
195          _  -> False
196
197 instance Ord ThreadId where
198    compare = cmpThread
199
200 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> Int
201
202 instance Show ThreadId where
203    showsPrec d t = 
204         showString "ThreadId " . 
205         showsPrec d (getThreadId (id2TSO t))
206
207 {- |
208 This sparks off a new thread to run the 'IO' computation passed as the
209 first argument, and returns the 'ThreadId' of the newly created
210 thread.
211
212 The new thread will be a lightweight thread; if you want to use a foreign
213 library that uses thread-local storage, use 'forkOS' instead.
214 -}
215 forkIO :: IO () -> IO ThreadId
216 forkIO action = IO $ \ s -> 
217    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
218  where
219   action_plus = Exception.catch action childHandler
220
221 childHandler :: Exception -> IO ()
222 childHandler err = Exception.catch (real_handler err) childHandler
223
224 real_handler :: Exception -> IO ()
225 real_handler ex =
226   case ex of
227         -- ignore thread GC and killThread exceptions:
228         BlockedOnDeadMVar            -> return ()
229         AsyncException ThreadKilled  -> return ()
230
231         -- report all others:
232         AsyncException StackOverflow -> reportStackOverflow False
233         other       -> reportError False other
234
235 #endif /* __GLASGOW_HASKELL__ */
236
237 #ifndef __HUGS__
238 max_buff_size :: Int
239 max_buff_size = 1
240
241 mergeIO :: [a] -> [a] -> IO [a]
242 nmergeIO :: [[a]] -> IO [a]
243
244 -- $merge
245 -- The 'mergeIO' and 'nmergeIO' functions fork one thread for each
246 -- input list that concurrently evaluates that list; the results are
247 -- merged into a single output list.  
248 --
249 -- Note: Hugs does not provide these functions, since they require
250 -- preemptive multitasking.
251
252 mergeIO ls rs
253  = newEmptyMVar                >>= \ tail_node ->
254    newMVar tail_node           >>= \ tail_list ->
255    newQSem max_buff_size       >>= \ e ->
256    newMVar 2                   >>= \ branches_running ->
257    let
258     buff = (tail_list,e)
259    in
260     forkIO (suckIO branches_running buff ls) >>
261     forkIO (suckIO branches_running buff rs) >>
262     takeMVar tail_node  >>= \ val ->
263     signalQSem e        >>
264     return val
265
266 type Buffer a 
267  = (MVar (MVar [a]), QSem)
268
269 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
270
271 suckIO branches_running buff@(tail_list,e) vs
272  = case vs of
273         [] -> takeMVar branches_running >>= \ val ->
274               if val == 1 then
275                  takeMVar tail_list     >>= \ node ->
276                  putMVar node []        >>
277                  putMVar tail_list node
278               else      
279                  putMVar branches_running (val-1)
280         (x:xs) ->
281                 waitQSem e                       >>
282                 takeMVar tail_list               >>= \ node ->
283                 newEmptyMVar                     >>= \ next_node ->
284                 unsafeInterleaveIO (
285                         takeMVar next_node  >>= \ y ->
286                         signalQSem e        >>
287                         return y)                >>= \ next_node_val ->
288                 putMVar node (x:next_node_val)   >>
289                 putMVar tail_list next_node      >>
290                 suckIO branches_running buff xs
291
292 nmergeIO lss
293  = let
294     len = length lss
295    in
296     newEmptyMVar          >>= \ tail_node ->
297     newMVar tail_node     >>= \ tail_list ->
298     newQSem max_buff_size >>= \ e ->
299     newMVar len           >>= \ branches_running ->
300     let
301      buff = (tail_list,e)
302     in
303     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
304     takeMVar tail_node  >>= \ val ->
305     signalQSem e        >>
306     return val
307   where
308     mapIO f xs = sequence (map f xs)
309 #endif /* __HUGS__ */
310
311 #ifdef __GLASGOW_HASKELL__
312 -- ---------------------------------------------------------------------------
313 -- Bound Threads
314
315 {- $boundthreads
316
317 Support for multiple operating system threads and bound threads as described
318 below is currently only available in the GHC runtime system if you use the
319 /-threaded/ option when linking.
320
321 Other Haskell systems do not currently support multiple operating system threads.
322
323 A bound thread is a haskell thread that is /bound/ to an operating system
324 thread. While the bound thread is still scheduled by the Haskell run-time
325 system, the operating system thread takes care of all the foreign calls made
326 by the bound thread.
327
328 To a foreign library, the bound thread will look exactly like an ordinary
329 operating system thread created using OS functions like @pthread_create@
330 or @CreateThread@.
331
332 Bound threads can be created using the 'forkOS' function below. All foreign
333 exported functions are run in a bound thread (bound to the OS thread that
334 called the function). Also, the @main@ action of every Haskell program is
335 run in a bound thread.
336
337 Why do we need this? Because if a foreign library is called from a thread
338 created using 'forkIO', it won't have access to any /thread-local state/ - 
339 state variables that have specific values for each OS thread
340 (see POSIX's @pthread_key_create@ or Win32's @TlsAlloc@). Therefore, some
341 libraries (OpenGL, for example) will not work from a thread created using
342 'forkIO'. They work fine in threads created using 'forkOS' or when called
343 from @main@ or from a @foreign export@.
344 -}
345
346 -- | 'True' if bound threads are supported.
347 -- If @rtsSupportsBoundThreads@ is 'False', 'isCurrentThreadBound'
348 -- will always return 'False' and both 'forkOS' and 'runInBoundThread' will
349 -- fail.
350 foreign import ccall rtsSupportsBoundThreads :: Bool
351
352
353 {- |
354 Like 'forkIO', this sparks off a new thread to run the 'IO' computation passed as the
355 first argument, and returns the 'ThreadId' of the newly created
356 thread.
357
358 However, @forkOS@ uses operating system-supplied multithreading support to create
359 a new operating system thread. The new thread is /bound/, which means that
360 all foreign calls made by the 'IO' computation are guaranteed to be executed
361 in this new operating system thread; also, the operating system thread is not
362 used for any other foreign calls.
363
364 This means that you can use all kinds of foreign libraries from this thread 
365 (even those that rely on thread-local state), without the limitations of 'forkIO'.
366 -}
367 forkOS :: IO () -> IO ThreadId
368
369 foreign export ccall forkOS_entry
370     :: StablePtr (IO ()) -> IO ()
371
372 foreign import ccall "forkOS_entry" forkOS_entry_reimported
373     :: StablePtr (IO ()) -> IO ()
374
375 forkOS_entry stableAction = do
376         action <- deRefStablePtr stableAction
377         action
378
379 foreign import ccall forkOS_createThread
380     :: StablePtr (IO ()) -> IO CInt
381
382 failNonThreaded = fail $ "RTS doesn't support multiple OS threads "
383                        ++"(use ghc -threaded when linking)"
384     
385 forkOS action 
386     | rtsSupportsBoundThreads = do
387         mv <- newEmptyMVar
388         let action_plus = Exception.catch action childHandler
389         entry <- newStablePtr (myThreadId >>= putMVar mv >> action_plus)
390         err <- forkOS_createThread entry
391         when (err /= 0) $ fail "Cannot create OS thread."
392         tid <- takeMVar mv
393         freeStablePtr entry
394         return tid
395     | otherwise = failNonThreaded
396
397 -- | Returns 'True' if the calling thread is /bound/, that is, if it is
398 -- safe to use foreign libraries that rely on thread-local state from the
399 -- calling thread.
400 isCurrentThreadBound :: IO Bool
401 isCurrentThreadBound = IO $ \ s# -> 
402     case isCurrentThreadBound# s# of
403         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
404
405
406 {- | 
407 Run the 'IO' computation passed as the first argument. If the calling thread
408 is not /bound/, a bound thread is created temporarily. @runInBoundThread@
409 doesn't finish until the 'IO' computation finishes.
410
411 You can wrap a series of foreign function calls that rely on thread-local state
412 with @runInBoundThread@ so that you can use them without knowing whether the
413 current thread is /bound/.
414 -}
415 runInBoundThread :: IO a -> IO a
416
417 runInBoundThread action
418     | rtsSupportsBoundThreads = do
419         bound <- isCurrentThreadBound
420         if bound
421             then action
422             else do
423                 ref <- newIORef undefined
424                 let action_plus = Exception.try action >>= writeIORef ref
425                 resultOrException <- 
426                     bracket (newStablePtr action_plus)
427                             freeStablePtr
428                             (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref)
429                 case resultOrException of
430                     Left exception -> Exception.throw exception
431                     Right result -> return result
432     | otherwise = failNonThreaded
433
434 {- | 
435 Run the 'IO' computation passed as the first argument. If the calling thread
436 is /bound/, an unbound thread is created temporarily using 'forkIO'.
437 @runInBoundThread@ doesn't finish until the 'IO' computation finishes.
438
439 Use this function /only/ in the rare case that you have actually observed a
440 performance loss due to the use of bound threads. A program that
441 doesn't need it's main thread to be bound and makes /heavy/ use of concurrency
442 (e.g. a web server), might want to wrap it's @main@ action in
443 @runInUnboundThread@.
444 -}
445 runInUnboundThread :: IO a -> IO a
446
447 runInUnboundThread action = do
448     bound <- isCurrentThreadBound
449     if bound
450         then do
451             mv <- newEmptyMVar
452             forkIO (Exception.try action >>= putMVar mv)
453             takeMVar mv >>= \either -> case either of
454                 Left exception -> Exception.throw exception
455                 Right result -> return result
456         else action
457         
458 #endif /* __GLASGOW_HASKELL__ */
459
460 -- ---------------------------------------------------------------------------
461 -- More docs
462
463 {- $termination
464
465       In a standalone GHC program, only the main thread is
466       required to terminate in order for the process to terminate.
467       Thus all other forked threads will simply terminate at the same
468       time as the main thread (the terminology for this kind of
469       behaviour is \"daemonic threads\").
470
471       If you want the program to wait for child threads to
472       finish before exiting, you need to program this yourself.  A
473       simple mechanism is to have each child thread write to an
474       'MVar' when it completes, and have the main
475       thread wait on all the 'MVar's before
476       exiting:
477
478 >   myForkIO :: IO () -> IO (MVar ())
479 >   myForkIO io = do
480 >     mvar \<- newEmptyMVar
481 >     forkIO (io \`finally\` putMVar mvar ())
482 >     return mvar
483
484       Note that we use 'finally' from the
485       "Control.Exception" module to make sure that the
486       'MVar' is written to even if the thread dies or
487       is killed for some reason.
488
489       A better method is to keep a global list of all child
490       threads which we should wait for at the end of the program:
491
492 >     children :: MVar [MVar ()]
493 >     children = unsafePerformIO (newMVar [])
494 >     
495 >     waitForChildren :: IO ()
496 >     waitForChildren = do
497 >       (mvar:mvars) \<- takeMVar children
498 >       putMVar children mvars
499 >       takeMVar mvar
500 >       waitForChildren
501 >     
502 >     forkChild :: IO () -> IO ()
503 >     forkChild io = do
504 >        mvar \<- newEmptyMVar
505 >        forkIO (p \`finally\` putMVar mvar ())
506 >        childs \<- takeMVar children
507 >        putMVar children (mvar:childs)
508 >     
509 >     later = flip finally
510 >     
511 >     main =
512 >       later waitForChildren $
513 >       ...
514
515       The main thread principle also applies to calls to Haskell from
516       outside, using @foreign export@.  When the @foreign export@ed
517       function is invoked, it starts a new main thread, and it returns
518       when this main thread terminates.  If the call causes new
519       threads to be forked, they may remain in the system after the
520       @foreign export@ed function has returned.
521 -}
522
523 {- $preemption
524
525       GHC implements pre-emptive multitasking: the execution of
526       threads are interleaved in a random fashion.  More specifically,
527       a thread may be pre-empted whenever it allocates some memory,
528       which unfortunately means that tight loops which do no
529       allocation tend to lock out other threads (this only seems to
530       happen with pathological benchmark-style code, however).
531
532       The rescheduling timer runs on a 20ms granularity by
533       default, but this may be altered using the
534       @-i\<n\>@ RTS option.  After a rescheduling
535       \"tick\" the running thread is pre-empted as soon as
536       possible.
537
538       One final note: the
539       @aaaa@ @bbbb@ example may not
540       work too well on GHC (see Scheduling, above), due
541       to the locking on a 'System.IO.Handle'.  Only one thread
542       may hold the lock on a 'System.IO.Handle' at any one
543       time, so if a reschedule happens while a thread is holding the
544       lock, the other thread won't be able to run.  The upshot is that
545       the switch from @aaaa@ to
546       @bbbbb@ happens infrequently.  It can be
547       improved by lowering the reschedule tick period.  We also have a
548       patch that causes a reschedule whenever a thread waiting on a
549       lock is woken up, but haven't found it to be useful for anything
550       other than this example :-)
551 -}