[project @ 2002-04-24 16:31:37 by simonmar]
[ghc-base.git] / Control / Concurrent.hs
1 -----------------------------------------------------------------------------
2 -- |
3 -- Module      :  Control.Concurrent
4 -- Copyright   :  (c) The University of Glasgow 2001
5 -- License     :  BSD-style (see the file libraries/core/LICENSE)
6 -- 
7 -- Maintainer  :  libraries@haskell.org
8 -- Stability   :  experimental
9 -- Portability :  non-portable
10 --
11 -- $Id: Concurrent.hs,v 1.6 2002/04/24 16:31:37 simonmar Exp $
12 --
13 -- A common interface to a collection of useful concurrency
14 -- abstractions.
15 --
16 -----------------------------------------------------------------------------
17
18 module Control.Concurrent (
19         module Control.Concurrent.Chan,
20         module Control.Concurrent.CVar,
21         module Control.Concurrent.MVar,
22         module Control.Concurrent.QSem,
23         module Control.Concurrent.QSemN,
24         module Control.Concurrent.SampleVar,
25
26         forkIO,                 -- :: IO () -> IO ()
27         yield,                  -- :: IO ()
28
29 #ifdef __GLASGOW_HASKELL__
30         ThreadId,
31
32         -- Forking and suchlike
33         myThreadId,             -- :: IO ThreadId
34         killThread,             -- :: ThreadId -> IO ()
35         throwTo,                -- :: ThreadId -> Exception -> IO ()
36
37         threadDelay,            -- :: Int -> IO ()
38         threadWaitRead,         -- :: Int -> IO ()
39         threadWaitWrite,        -- :: Int -> IO ()
40 #endif
41
42          -- merging of streams
43         mergeIO,                -- :: [a]   -> [a] -> IO [a]
44         nmergeIO                -- :: [[a]] -> IO [a]
45     ) where
46
47 import Prelude
48
49 import Control.Exception as Exception
50
51 #ifdef __GLASGOW_HASKELL__
52 import GHC.Conc
53 import GHC.TopHandler   ( reportStackOverflow, reportError )
54 import GHC.IOBase       ( IO(..) )
55 import GHC.IOBase       ( unsafeInterleaveIO )
56 import GHC.Base
57 #endif
58
59 #ifdef __HUGS__
60 import IOExts ( unsafeInterleaveIO )
61 import ConcBase
62 #endif
63
64 import Control.Concurrent.MVar
65 import Control.Concurrent.CVar
66 import Control.Concurrent.Chan
67 import Control.Concurrent.QSem
68 import Control.Concurrent.QSemN
69 import Control.Concurrent.SampleVar
70
71 -- Thread Ids, specifically the instances of Eq and Ord for these things.
72 -- The ThreadId type itself is defined in std/PrelConc.lhs.
73
74 -- Rather than define a new primitve, we use a little helper function
75 -- cmp_thread in the RTS.
76
77 #ifdef __GLASGOW_HASKELL__
78 foreign import ccall unsafe "cmp_thread" cmp_thread :: Addr# -> Addr# -> Int
79 -- Returns -1, 0, 1
80
81 cmpThread :: ThreadId -> ThreadId -> Ordering
82 cmpThread (ThreadId t1) (ThreadId t2) = 
83    case cmp_thread (unsafeCoerce# t1) (unsafeCoerce# t2) of
84       -1 -> LT
85       0  -> EQ
86       _  -> GT -- must be 1
87
88 instance Eq ThreadId where
89    t1 == t2 = 
90       case t1 `cmpThread` t2 of
91          EQ -> True
92          _  -> False
93
94 instance Ord ThreadId where
95    compare = cmpThread
96
97 foreign import ccall unsafe "rts_getThreadId" getThreadId :: Addr# -> Int
98
99 instance Show ThreadId where
100    showsPrec d (ThreadId t) = 
101         showString "ThreadId " . 
102         showsPrec d (getThreadId (unsafeCoerce# t))
103
104 forkIO :: IO () -> IO ThreadId
105 forkIO action = IO $ \ s -> 
106    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
107  where
108   action_plus = Exception.catch action childHandler
109
110 childHandler :: Exception -> IO ()
111 childHandler err = Exception.catch (real_handler err) childHandler
112
113 real_handler :: Exception -> IO ()
114 real_handler ex =
115   case ex of
116         -- ignore thread GC and killThread exceptions:
117         BlockedOnDeadMVar            -> return ()
118         AsyncException ThreadKilled  -> return ()
119
120         -- report all others:
121         AsyncException StackOverflow -> reportStackOverflow False
122         ErrorCall s -> reportError False s
123         other       -> reportError False (showsPrec 0 other "\n")
124
125 #endif /* __GLASGOW_HASKELL__ */
126
127
128 max_buff_size :: Int
129 max_buff_size = 1
130
131 mergeIO :: [a] -> [a] -> IO [a]
132 nmergeIO :: [[a]] -> IO [a]
133
134 mergeIO ls rs
135  = newEmptyMVar                >>= \ tail_node ->
136    newMVar tail_node           >>= \ tail_list ->
137    newQSem max_buff_size       >>= \ e ->
138    newMVar 2                   >>= \ branches_running ->
139    let
140     buff = (tail_list,e)
141    in
142     forkIO (suckIO branches_running buff ls) >>
143     forkIO (suckIO branches_running buff rs) >>
144     takeMVar tail_node  >>= \ val ->
145     signalQSem e        >>
146     return val
147
148 type Buffer a 
149  = (MVar (MVar [a]), QSem)
150
151 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
152
153 suckIO branches_running buff@(tail_list,e) vs
154  = case vs of
155         [] -> takeMVar branches_running >>= \ val ->
156               if val == 1 then
157                  takeMVar tail_list     >>= \ node ->
158                  putMVar node []        >>
159                  putMVar tail_list node
160               else      
161                  putMVar branches_running (val-1)
162         (x:xs) ->
163                 waitQSem e                       >>
164                 takeMVar tail_list               >>= \ node ->
165                 newEmptyMVar                     >>= \ next_node ->
166                 unsafeInterleaveIO (
167                         takeMVar next_node  >>= \ y ->
168                         signalQSem e        >>
169                         return y)                >>= \ next_node_val ->
170                 putMVar node (x:next_node_val)   >>
171                 putMVar tail_list next_node      >>
172                 suckIO branches_running buff xs
173
174 nmergeIO lss
175  = let
176     len = length lss
177    in
178     newEmptyMVar          >>= \ tail_node ->
179     newMVar tail_node     >>= \ tail_list ->
180     newQSem max_buff_size >>= \ e ->
181     newMVar len           >>= \ branches_running ->
182     let
183      buff = (tail_list,e)
184     in
185     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
186     takeMVar tail_node  >>= \ val ->
187     signalQSem e        >>
188     return val
189   where
190     mapIO f xs = sequence (map f xs)