[project @ 2002-10-22 10:59:40 by simonmar]
[ghc-base.git] / Control / Concurrent.hs
1 -----------------------------------------------------------------------------
2 -- |
3 -- Module      :  Control.Concurrent
4 -- Copyright   :  (c) The University of Glasgow 2001
5 -- License     :  BSD-style (see the file libraries/base/LICENSE)
6 -- 
7 -- Maintainer  :  libraries@haskell.org
8 -- Stability   :  experimental
9 -- Portability :  non-portable (concurrency)
10 --
11 -- A common interface to a collection of useful concurrency
12 -- abstractions.
13 --
14 -----------------------------------------------------------------------------
15
16 module Control.Concurrent (
17         -- * Concurrent Haskell
18
19         -- $conc_intro
20
21         -- * Basic concurrency operations
22
23         ThreadId,
24         myThreadId,
25
26         forkIO,
27         killThread,
28         throwTo,
29
30         -- * Scheduling
31
32         -- $conc_scheduling     
33         yield,                  -- :: IO ()
34
35         -- ** Blocking
36         
37         -- $blocking
38
39 #ifdef __GLASGOW_HASKELL__
40         -- ** Waiting
41         threadDelay,            -- :: Int -> IO ()
42         threadWaitRead,         -- :: Int -> IO ()
43         threadWaitWrite,        -- :: Int -> IO ()
44 #endif
45
46         -- * Communication abstractions
47
48         module Control.Concurrent.MVar,
49         module Control.Concurrent.Chan,
50         module Control.Concurrent.QSem,
51         module Control.Concurrent.QSemN,
52         module Control.Concurrent.SampleVar,
53
54         -- * Merging of streams
55         mergeIO,                -- :: [a]   -> [a] -> IO [a]
56         nmergeIO,               -- :: [[a]] -> IO [a]
57         -- $merge
58
59         -- * GHC's implementation of concurrency
60
61         -- |This section describes features specific to GHC's
62         -- implementation of Concurrent Haskell.
63         
64         -- ** Terminating the program
65
66         -- $termination
67
68         -- ** Pre-emption
69
70         -- $preemption
71
72     ) where
73
74 import Prelude
75
76 import Control.Exception as Exception
77
78 #ifdef __GLASGOW_HASKELL__
79 import GHC.Conc
80 import GHC.TopHandler   ( reportStackOverflow, reportError )
81 import GHC.IOBase       ( IO(..) )
82 import GHC.IOBase       ( unsafeInterleaveIO )
83 import GHC.Base
84 #endif
85
86 #ifdef __HUGS__
87 import IOExts ( unsafeInterleaveIO )
88 import ConcBase
89 #endif
90
91 import Control.Concurrent.MVar
92 import Control.Concurrent.Chan
93 import Control.Concurrent.QSem
94 import Control.Concurrent.QSemN
95 import Control.Concurrent.SampleVar
96
97 {- $conc_intro
98
99 The concurrency extension for Haskell is described in the paper
100 /Concurrent Haskell/
101 <http://www.haskell.org/ghc/docs/papers/concurrent-haskell.ps.gz>.
102
103 Concurrency is \"lightweight\", which means that both thread creation
104 and context switching overheads are extremely low.  Scheduling of
105 Haskell threads is done internally in the Haskell runtime system, and
106 doesn't make use of any operating system-supplied thread packages.
107
108 Haskell threads can communicate via 'MVar's, a kind of synchronised
109 mutable variable (see "Control.Concurrent.MVar").  Several common
110 concurrency abstractions can be built from 'MVar's, and these are
111 provided by the "Concurrent" library.  Threads may also communicate
112 via exceptions. 
113 -}
114
115 {- $conc_scheduling
116
117     Scheduling may be either pre-emptive or co-operative,
118     depending on the implementation of Concurrent Haskell (see below
119     for imformation related to specific compilers).  In a co-operative
120     system, context switches only occur when you use one of the
121     primitives defined in this module.  This means that programs such
122     as:
123
124
125 >   main = forkIO (write 'a') >> write 'b'
126 >     where write c = putChar c >> write c
127
128     will print either @aaaaaaaaaaaaaa...@ or @bbbbbbbbbbbb...@,
129     instead of some random interleaving of @a@s and @b@s.  In
130     practice, cooperative multitasking is sufficient for writing
131     simple graphical user interfaces.  
132 -}
133
134 {- $blocking
135 Calling a foreign C procedure (such as @getchar@) that blocks waiting
136 for input will block /all/ threads, unless the @threadsafe@ attribute
137 is used on the foreign call (and your compiler \/ operating system
138 supports it).  GHC's I\/O system uses non-blocking I\/O internally to
139 implement thread-friendly I\/O, so calling standard Haskell I\/O
140 functions blocks only the thread making the call.
141 -}
142
143 -- Thread Ids, specifically the instances of Eq and Ord for these things.
144 -- The ThreadId type itself is defined in std/PrelConc.lhs.
145
146 -- Rather than define a new primitve, we use a little helper function
147 -- cmp_thread in the RTS.
148
149 #ifdef __GLASGOW_HASKELL__
150 id2TSO :: ThreadId -> ThreadId#
151 id2TSO (ThreadId t) = t
152
153 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> Int
154 -- Returns -1, 0, 1
155
156 cmpThread :: ThreadId -> ThreadId -> Ordering
157 cmpThread t1 t2 = 
158    case cmp_thread (id2TSO t1) (id2TSO t2) of
159       -1 -> LT
160       0  -> EQ
161       _  -> GT -- must be 1
162
163 instance Eq ThreadId where
164    t1 == t2 = 
165       case t1 `cmpThread` t2 of
166          EQ -> True
167          _  -> False
168
169 instance Ord ThreadId where
170    compare = cmpThread
171
172 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> Int
173
174 instance Show ThreadId where
175    showsPrec d t = 
176         showString "ThreadId " . 
177         showsPrec d (getThreadId (id2TSO t))
178
179 {- |
180 This sparks off a new thread to run the 'IO' computation passed as the
181 first argument, and returns the 'ThreadId' of the newly created
182 thread.
183 -}
184 forkIO :: IO () -> IO ThreadId
185 forkIO action = IO $ \ s -> 
186    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
187  where
188   action_plus = Exception.catch action childHandler
189
190 childHandler :: Exception -> IO ()
191 childHandler err = Exception.catch (real_handler err) childHandler
192
193 real_handler :: Exception -> IO ()
194 real_handler ex =
195   case ex of
196         -- ignore thread GC and killThread exceptions:
197         BlockedOnDeadMVar            -> return ()
198         AsyncException ThreadKilled  -> return ()
199
200         -- report all others:
201         AsyncException StackOverflow -> reportStackOverflow False
202         ErrorCall s -> reportError False s
203         other       -> reportError False (showsPrec 0 other "\n")
204
205 #endif /* __GLASGOW_HASKELL__ */
206
207
208 max_buff_size :: Int
209 max_buff_size = 1
210
211 mergeIO :: [a] -> [a] -> IO [a]
212 nmergeIO :: [[a]] -> IO [a]
213
214 -- $merge
215 -- The 'mergeIO' and 'nmergeIO' functions fork one thread for each
216 -- input list that concurrently evaluates that list; the results are
217 -- merged into a single output list.  
218 --
219 -- Note: Hugs does not provide these functions, since they require
220 -- preemptive multitasking.
221
222 mergeIO ls rs
223  = newEmptyMVar                >>= \ tail_node ->
224    newMVar tail_node           >>= \ tail_list ->
225    newQSem max_buff_size       >>= \ e ->
226    newMVar 2                   >>= \ branches_running ->
227    let
228     buff = (tail_list,e)
229    in
230     forkIO (suckIO branches_running buff ls) >>
231     forkIO (suckIO branches_running buff rs) >>
232     takeMVar tail_node  >>= \ val ->
233     signalQSem e        >>
234     return val
235
236 type Buffer a 
237  = (MVar (MVar [a]), QSem)
238
239 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
240
241 suckIO branches_running buff@(tail_list,e) vs
242  = case vs of
243         [] -> takeMVar branches_running >>= \ val ->
244               if val == 1 then
245                  takeMVar tail_list     >>= \ node ->
246                  putMVar node []        >>
247                  putMVar tail_list node
248               else      
249                  putMVar branches_running (val-1)
250         (x:xs) ->
251                 waitQSem e                       >>
252                 takeMVar tail_list               >>= \ node ->
253                 newEmptyMVar                     >>= \ next_node ->
254                 unsafeInterleaveIO (
255                         takeMVar next_node  >>= \ y ->
256                         signalQSem e        >>
257                         return y)                >>= \ next_node_val ->
258                 putMVar node (x:next_node_val)   >>
259                 putMVar tail_list next_node      >>
260                 suckIO branches_running buff xs
261
262 nmergeIO lss
263  = let
264     len = length lss
265    in
266     newEmptyMVar          >>= \ tail_node ->
267     newMVar tail_node     >>= \ tail_list ->
268     newQSem max_buff_size >>= \ e ->
269     newMVar len           >>= \ branches_running ->
270     let
271      buff = (tail_list,e)
272     in
273     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
274     takeMVar tail_node  >>= \ val ->
275     signalQSem e        >>
276     return val
277   where
278     mapIO f xs = sequence (map f xs)
279
280 -- ---------------------------------------------------------------------------
281 -- More docs
282
283 {- $termination
284
285       In a standalone GHC program, only the main thread is
286       required to terminate in order for the process to terminate.
287       Thus all other forked threads will simply terminate at the same
288       time as the main thread (the terminology for this kind of
289       behaviour is \"daemonic threads\").
290
291       If you want the program to wait for child threads to
292       finish before exiting, you need to program this yourself.  A
293       simple mechanism is to have each child thread write to an
294       'MVar' when it completes, and have the main
295       thread wait on all the 'MVar's before
296       exiting:
297
298 >   myForkIO :: IO () -> IO (MVar ())
299 >   myForkIO io = do
300 >     mvar \<- newEmptyMVar
301 >     forkIO (io \`finally\` putMVar mvar ())
302 >     return mvar
303
304       Note that we use 'finally' from the
305       "Exception" module to make sure that the
306       'MVar' is written to even if the thread dies or
307       is killed for some reason.
308
309       A better method is to keep a global list of all child
310       threads which we should wait for at the end of the program:
311
312 >     children :: MVar [MVar ()]
313 >     children = unsafePerformIO (newMVar [])
314 >     
315 >     waitForChildren :: IO ()
316 >     waitForChildren = do
317 >       (mvar:mvars) \<- takeMVar children
318 >       putMVar children mvars
319 >       takeMVar mvar
320 >       waitForChildren
321 >     
322 >     forkChild :: IO () -> IO ()
323 >     forkChild io = do
324 >        mvar \<- newEmptyMVar
325 >        forkIO (p \`finally\` putMVar mvar ())
326 >        childs \<- takeMVar children
327 >        putMVar children (mvar:childs)
328 >     
329 >     later = flip finally
330 >     
331 >     main =
332 >       later waitForChildren $
333 >       ...
334
335       The main thread principle also applies to calls to Haskell from
336       outside, using @foreign export@.  When the @foreign export@ed
337       function is invoked, it starts a new main thread, and it returns
338       when this main thread terminates.  If the call causes new
339       threads to be forked, they may remain in the system after the
340       @foreign export@ed function has returned.
341 -}
342
343 {- $preemption
344
345       GHC implements pre-emptive multitasking: the execution of
346       threads are interleaved in a random fashion.  More specifically,
347       a thread may be pre-empted whenever it allocates some memory,
348       which unfortunately means that tight loops which do no
349       allocation tend to lock out other threads (this only seems to
350       happen with pathalogical benchmark-style code, however).
351
352       The rescheduling timer runs on a 20ms granularity by
353       default, but this may be altered using the
354       @-i<n>@ RTS option.  After a rescheduling
355       \"tick\" the running thread is pre-empted as soon as
356       possible.
357
358       One final note: the
359       @aaaa@ @bbbb@ example may not
360       work too well on GHC (see Scheduling, above), due
361       to the locking on a 'Handle'.  Only one thread
362       may hold the lock on a 'Handle' at any one
363       time, so if a reschedule happens while a thread is holding the
364       lock, the other thread won't be able to run.  The upshot is that
365       the switch from @aaaa@ to
366       @bbbbb@ happens infrequently.  It can be
367       improved by lowering the reschedule tick period.  We also have a
368       patch that causes a reschedule whenever a thread waiting on a
369       lock is woken up, but haven't found it to be useful for anything
370       other than this example :-)
371 -}