1 <sect> <idx/Concurrent/
2 <label id="sec:Concurrent">
5 <sect1> <idx/Concurrent Haskell/
6 <label id="sec:Concurrent Haskell">
9 GHC and Hugs both provide concurrency extensions, as described in
10 <url name="Concurrent Haskell"
11 url="http://research.microsoft.com/Users/simonpj/Papers/concurrent-haskell.ps.gz">.
13 Concurrency in GHC and Hugs is "lightweight", which means that both
14 thread creation and context switching overheads are extremely low.
15 Scheduling of Haskell threads is done internally in the Haskell
16 runtime system, and doesn't make use of any operating system-supplied
19 Haskell threads can communicate via <tt/MVar/s, a kind of synchronised
20 mutable variable. Several common concurrency abstractions can be
21 built from <tt/MVar/s, and these are provided by the <tt/Concurrent/
22 library, which is described in the later sections. Threads may also
23 communicate via exceptions.
25 <sect1> <idx/Concurrency Basics/
26 <label id="sec:Concurrency Basics">
29 To gain access to the concurrency primitives, just <tt/import Concurrent/
30 in your Haskell module. In GHC, you also need to add the <tt/-syslib
31 concurrent/ option to the command line.
33 To create a new thread, use <tt/forkIO/:
36 forkIO :: IO () -> IO ThreadId
39 This sparks off a new thread to run the <tt/IO/ computation passed as the
42 The returned <tt/ThreadId/ is an abstract type representing a handle
43 to the newly created thread. The <tt/ThreadId/ type is an instance of
44 both <tt/Eq/ and <tt/Ord/, where the <tt/Ord/ instance implements an
45 arbitrary total ordering over <tt/ThreadId/s.
47 Threads may also be killed via the <tt/ThreadId/:
50 killThread :: ThreadId -> IO ()
53 this terminates the given thread (Note: <tt/killThread/ is not
54 implemented in Hugs yet). Any work already done by the thread isn't
55 lost: the computation is suspended until required by another thread.
56 The memory used by the thread will be garbage collected if it isn't
57 referenced from anywhere else.
59 More generally, an arbitrary exception (see Section <ref
60 id="sec:Exception" name="Exceptions">) may be raised in any thread for
61 which we have a <tt/ThreadId/, with <tt/raiseInThread/:
64 raiseInThread :: ThreadId -> Exception -> IO ()
67 Actually <tt/killThread/ just raises the <tt/ThreadKilled/ exception
68 in the target thread, the normal action of which is to just terminate
69 the thread. The target thread will stop whatever it was doing (even
70 if it was blocked on an <tt/MVar/ or other computation) and handle the
73 One important property of <tt/raiseInThread/ (and therefore
74 <tt/killThread/) is that they are <em/synchronous/. This means that
75 after performing a <tt/raiseInThread/ operation, the calling thread
76 can be certain that the target thread has received the exception. In
77 other words, the target thread cannot perform any more processing
78 unless it handles the exception that has just been raised in it. This
79 is a useful property to know when dealing with race conditions: eg. if
80 there are two threads that can kill each other, it is guaranteed that
81 only one of the threads will get to kill the other.
83 The <tt/ThreadId/ for the current thread can be obtained with
87 myThreadId :: IO ThreadId
90 NOTE: if you have a <tt/ThreadId/, you essentially have a pointer to the
91 thread itself. This means the thread itself can't be garbage
92 collected until you drop the <tt/ThreadId/. This misfeature will
93 hopefully be corrected at a later date.
98 GHC uses <em>preemptive multitasking</em>: context switches can occur
99 at any time. At present, Hugs uses <em>cooperative multitasking</em>:
100 context switches only occur when you use one of the primitives defined
101 in this module. This means that programs such as:
104 main = forkIO (write 'a') >> write 'b'
105 where write c = putChar c >> write c
108 will print either <tt/aaaaaaaaaaaaaa.../ or <tt/bbbbbbbbbbbb.../,
109 instead of some random interleaving of <tt/a/s and <tt/b/s.
110 In practice, cooperative multitasking is sufficient for writing simple
111 graphical user interfaces.
113 The <tt>yield</tt> action forces a context-switch to any other
114 currently runnable threads (if any), and is occasionally useful when
115 implementing concurrency abstractions:
121 <sect2> <idx/Thread Waiting/
124 Finally, there are operations to delay a concurrent thread, and to
125 make one wait:<nidx>delay a concurrent thread</nidx>
126 <nidx>wait for a file descriptor</nidx>
129 threadDelay :: Int -> IO () -- delay rescheduling for N microseconds
130 threadWaitRead :: Int -> IO () -- wait for input on specified file descriptor
131 threadWaitWrite :: Int -> IO () -- (read and write, respectively).
134 The <tt/threadDelay/ operation will cause the current thread to
135 suspend for a given number of microseconds. Note that the resolution
136 used by the Haskell runtime system's internal timer together with the
137 fact that the thread may take some time to be rescheduled after the
138 time has expired, means that the accuracy is more like 1/50 second.
140 <tt/threadWaitRead/ and <tt/threadWaitWrite/ can be used to block a
141 thread until I/O is available on a given file descriptor. These
142 primitives are used by the I/O subsystem to ensure that a thread
143 waiting on I/O doesn't hang the entire system.
145 <sect2> <idx/Blocking/
147 Calling a foreign C procedure (such as <tt/getchar/) that blocks
148 waiting for input will block <em>all</em> threads, in both
149 GHC and Hugs. The GHC I/O system uses non-blocking I/O internally to implement
150 thread-friendly I/O, so calling standard Haskell I/O functions blocks
151 only the thead making the call.
154 <sect1> <idx/Concurrency abstractions/
155 <label id="sec:Concurrency-abstractions">
159 <label id="sec:MVars">
162 The <tt/Concurrent/ interface provides access to ``M-Vars'', which are
163 <em>synchronising variables</em>.
165 <nidx>synchronising variables (Glasgow extension)</nidx>
166 <nidx>concurrency -- synchronising variables</nidx>
168 <tt/MVars/<nidx>MVars (Glasgow extension)</nidx> are rendezvous points,
169 mostly for concurrent threads. They begin either empty or full, and
170 any attempt to read an empty <tt/MVar/ blocks. When an <tt/MVar/ is
171 written, a single blocked thread may be freed. Reading an <tt/MVar/
172 toggles its state from full back to empty. Therefore, any value
173 written to an <tt/MVar/ may only be read once. Multiple reads and writes
174 are allowed, but there must be at least one read between any two
178 data MVar a -- abstract
181 newEmptyMVar :: IO (MVar a)
182 newMVar :: a -> IO (MVar a)
183 takeMVar :: MVar a -> IO a
184 putMVar :: MVar a -> a -> IO ()
185 readMVar :: MVar a -> IO a
186 swapMVar :: MVar a -> a -> IO a
187 isEmptyMVar :: MVar a -> IO Bool
190 The operation <tt/isEmptyMVar/ returns a flag indicating
191 whether the <tt/MVar/ is currently empty or filled in, i.e.,
192 will a thread block when performing a <tt/takeMVar/ on that
195 Please notice that the Boolean value returned from <tt/isEmptyMVar/
196 represent just a snapshot of the state of the <tt/MVar/. By the
197 time a thread gets to inspect the result and act upon it, other
198 threads may have accessed the <tt/MVar/ and changed the 'filled-in'
199 status of the variable.
201 The same proviso applies to <tt/isEmptyChan/ (next sub-section).
203 These two predicates are currently only supported by GHC.
205 <sect2> <idx/Channel Variables/
206 <label id="sec:CVars">
209 A <em>channel variable</em> (<tt/CVar/) is a one-element channel, as
210 described in the paper:
214 newCVar :: IO (CVar a)
215 putCVar :: CVar a -> a -> IO ()
216 getCVar :: CVar a -> IO a
219 <sect2> <idx/Channels/
220 <label id="sec:Channels">
223 A <tt/Channel/ is an unbounded channel:
227 newChan :: IO (Chan a)
228 putChan :: Chan a -> a -> IO ()
229 getChan :: Chan a -> IO a
230 dupChan :: Chan a -> IO (Chan a)
231 unGetChan :: Chan a -> a -> IO ()
232 getChanContents :: Chan a -> IO [a]
235 <sect2> <idx/Semaphores/
236 <label id="sec:Semaphores">
239 General and quantity semaphores:
243 newQSem :: Int -> IO QSem
244 waitQSem :: QSem -> IO ()
245 signalQSem :: QSem -> IO ()
248 newQSemN :: Int -> IO QSemN
249 signalQSemN :: QSemN -> Int -> IO ()
250 waitQSemN :: QSemN -> Int -> IO ()
253 <sect2> <idx/Merging Streams/
254 <label id="sec:Merging Streams">
257 Merging streams---binary and n-ary:
260 mergeIO :: [a] -> [a] -> IO [a]
261 nmergeIO :: [[a]] -> IO [a]
264 These actions fork one thread for each input list that concurrently
265 evaluates that list; the results are merged into a single output list.
267 Note: Hugs does not provide the functions <tt/mergeIO/ or
268 <tt/nmergeIO/ since these require preemptive multitasking.
270 <sect2> <idx/Sample Variables/
271 <label id="sec:Sample-Variables">
274 A <em>Sample variable</em> (<tt/SampleVar/) is slightly different from a
278 <item> Reading an empty <tt/SampleVar/ causes the reader to block
279 (same as <tt/takeMVar/ on empty <tt/MVar/).
280 <item> Reading a filled <tt/SampleVar/ empties it and returns value.
281 (same as <tt/takeMVar/)
282 <item> Writing to an empty <tt/SampleVar/ fills it with a value, and
283 potentially, wakes up a blocked reader (same as for <tt/putMVar/ on empty <tt/MVar/).
284 <item> Writing to a filled <tt/SampleVar/ overwrites the current value.
285 (different from <tt/putMVar/ on full <tt/MVar/.)
289 type SampleVar a = MVar (Int, MVar a)
291 emptySampleVar :: SampleVar a -> IO ()
292 newSampleVar :: IO (SampleVar a)
293 readSample :: SampleVar a -> IO a
294 writeSample :: SampleVar a -> a -> IO ()
297 <sect1> The <tt/Concurrent/ library interface
300 The full interface for the <tt/Concurrent/ library is given below for
304 module Concurrent where
306 data ThreadId -- thread identifiers
308 instance Ord ThreadId
310 forkIO :: IO () -> IO ThreadId
311 myThreadId :: IO ThreadId
312 killThread :: ThreadId -> IO ()
315 data MVar a -- Synchronisation variables
317 newEmptyMVar :: IO (MVar a)
318 newMVar :: a -> IO (MVar a)
319 takeMVar :: MVar a -> IO a
320 putMVar :: MVar a -> a -> IO ()
321 swapMVar :: MVar a -> a -> IO a
322 readMVar :: MVar a -> IO a
323 isEmptyMVar :: MVar a -> IO Bool
326 data Chan a -- channels
327 newChan :: IO (Chan a)
328 writeChan :: Chan a -> a -> IO ()
329 readChan :: Chan a -> IO a
330 dupChan :: Chan a -> IO (Chan a)
331 unReadChan :: Chan a -> a -> IO ()
332 isEmptyChan :: Chan a -> IO Bool
333 getChanContents :: Chan a -> IO [a]
334 writeList2Chan :: Chan a -> [a] -> IO ()
336 data CVar a -- one element channels
337 newCVar :: IO (CVar a)
338 putCVar :: CVar a -> a -> IO ()
339 getCVar :: CVar a -> IO a
341 data QSem -- General/quantity semaphores
342 newQSem :: Int -> IO QSem
343 waitQSem :: QSem -> IO ()
344 signalQSem :: QSem -> IO ()
346 data QSemN -- General/quantity semaphores
347 newQSemN :: Int -> IO QSemN
348 waitQSemN :: QSemN -> Int -> IO ()
349 signalQSemN :: QSemN -> Int -> IO ()
351 type SampleVar a -- Sample variables
352 newEmptySampleVar:: IO (SampleVar a)
353 newSampleVar :: a -> IO (SampleVar a)
354 emptySampleVar :: SampleVar a -> IO ()
355 readSampleVar :: SampleVar a -> IO a
356 writeSampleVar :: SampleVar a -> a -> IO ()
358 threadDelay :: Int -> IO ()
359 threadWaitRead :: Int -> IO ()
360 threadWaitWrite :: Int -> IO ()
363 <sect1> GHC-specific concurrency issues
366 In a standalone GHC program, only the main thread is required to
367 terminate in order for the process to terminate. Thus all other
368 forked threads will simply terminate at the same time as the main
369 thread (the terminology for this kind of behaviour is ``daemonic
372 If you want the program to wait for child threads to finish before
373 exiting, you need to program this yourself. A simple mechanism is to
374 have each child thread write to an <tt/MVar/ when it completes, and
375 have the main thread wait on all the <tt/MVar/s before exiting:
378 myForkIO :: IO () -> IO (MVar ())
381 forkIO (io `finally` putMVar mvar ())
385 Note that we use <tt/finally/ from the <tt/Exception/ module to make
386 sure that the <tt/MVar/ is written to even if the thread dies or is
387 killed for some reason.
389 A better method is to keep a global list of all child threads which we
390 should wait for at the end of the program:
393 children :: MVar [MVar ()]
394 children = unsafePerformIO (newMVar [])
396 waitForChildren :: IO ()
398 (mvar:mvars) <- takeMVar children
399 putMVar children mvars
403 forkChild :: IO () -> IO ()
406 forkIO (p `finally` putMVar mvar ())
407 childs <- takeMVar children
408 putMVar children (mvar:childs)
413 later waitForChildren $