[project @ 2001-06-28 14:15:04 by simonmar]
[ghc-base.git] / Control / Concurrent.hs
1 -----------------------------------------------------------------------------
2 -- 
3 -- Module      :  Control.Concurrent
4 -- Copyright   :  (c) The University of Glasgow 2001
5 -- License     :  BSD-style (see the file libraries/core/LICENSE)
6 -- 
7 -- Maintainer  :  libraries@haskell.org
8 -- Stability   :  experimental
9 -- Portability :  non-portable
10 --
11 -- $Id: Concurrent.hs,v 1.1 2001/06/28 14:15:01 simonmar Exp $
12 --
13 -- A common interface to a collection of useful concurrency
14 -- abstractions.
15 --
16 -----------------------------------------------------------------------------
17
18 module Control.Concurrent
19         ( module Control.Concurrent.Chan
20         , module Control.Concurrent.CVar
21         , module Control.Concurrent.MVar
22         , module Control.Concurrent.QSem
23         , module Control.Concurrent.QSemN
24         , module Control.Concurrent.SampleVar
25
26 #ifdef __HUGS__
27         , forkIO        -- :: IO () -> IO ()
28 #elif defined(__GLASGOW_HASKELL__)
29         , ThreadId
30
31         -- Forking and suchlike
32         , myThreadId    -- :: IO ThreadId
33         , killThread    -- :: ThreadId -> IO ()
34         , throwTo       -- :: ThreadId -> Exception -> IO ()
35 #endif
36         , par           -- :: a -> b -> b
37         , seq           -- :: a -> b -> b
38 #ifdef __GLASGOW_HASKELL__
39         , fork          -- :: a -> b -> b
40 #endif
41         , yield         -- :: IO ()
42
43 #ifdef __GLASGOW_HASKELL__
44         , threadDelay           -- :: Int -> IO ()
45         , threadWaitRead        -- :: Int -> IO ()
46         , threadWaitWrite       -- :: Int -> IO ()
47 #endif
48
49          -- merging of streams
50         , mergeIO       -- :: [a]   -> [a] -> IO [a]
51         , nmergeIO      -- :: [[a]] -> IO [a]
52     ) where
53
54 import Prelude
55
56 import Control.Exception as Exception
57
58 #ifdef __GLASGOW_HASKELL__
59 import GHC.Conc
60 import GHC.TopHandler   ( reportStackOverflow, reportError )
61 import GHC.IOBase       ( IO(..) )
62 import GHC.IOBase       ( unsafePerformIO , unsafeInterleaveIO )
63 import GHC.Base         ( fork# )
64 import GHC.Prim         ( Addr#, unsafeCoerce# )
65 #endif
66
67 #ifdef __HUGS__
68 import IOExts ( unsafeInterleaveIO, unsafePerformIO )
69 import ConcBase
70 #endif
71
72 import Control.Concurrent.MVar
73 import Control.Concurrent.CVar
74 import Control.Concurrent.Chan
75 import Control.Concurrent.QSem
76 import Control.Concurrent.QSemN
77 import Control.Concurrent.SampleVar
78
79 #ifdef __GLASGOW_HASKELL__
80 infixr 0 `fork`
81 #endif
82
83 -- Thread Ids, specifically the instances of Eq and Ord for these things.
84 -- The ThreadId type itself is defined in std/PrelConc.lhs.
85
86 -- Rather than define a new primitve, we use a little helper function
87 -- cmp_thread in the RTS.
88
89 #ifdef __GLASGOW_HASKELL__
90 foreign import ccall "cmp_thread" unsafe cmp_thread :: Addr# -> Addr# -> Int
91 -- Returns -1, 0, 1
92
93 cmpThread :: ThreadId -> ThreadId -> Ordering
94 cmpThread (ThreadId t1) (ThreadId t2) = 
95    case cmp_thread (unsafeCoerce# t1) (unsafeCoerce# t2) of
96       -1 -> LT
97       0  -> EQ
98       _  -> GT -- must be 1
99
100 instance Eq ThreadId where
101    t1 == t2 = 
102       case t1 `cmpThread` t2 of
103          EQ -> True
104          _  -> False
105
106 instance Ord ThreadId where
107    compare = cmpThread
108
109 forkIO :: IO () -> IO ThreadId
110 forkIO action = IO $ \ s -> 
111    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
112  where
113   action_plus = Exception.catch action childHandler
114
115 childHandler :: Exception -> IO ()
116 childHandler err = Exception.catch (real_handler err) childHandler
117
118 real_handler :: Exception -> IO ()
119 real_handler ex =
120   case ex of
121         -- ignore thread GC and killThread exceptions:
122         BlockedOnDeadMVar            -> return ()
123         AsyncException ThreadKilled  -> return ()
124
125         -- report all others:
126         AsyncException StackOverflow -> reportStackOverflow False
127         ErrorCall s -> reportError False s
128         other       -> reportError False (showsPrec 0 other "\n")
129
130 {-# INLINE fork #-}
131 fork :: a -> b -> b
132 fork x y = unsafePerformIO (forkIO (x `seq` return ())) `seq` y
133
134 #endif /* __GLASGOW_HASKELL__ */
135
136
137 max_buff_size :: Int
138 max_buff_size = 1
139
140 mergeIO :: [a] -> [a] -> IO [a]
141 nmergeIO :: [[a]] -> IO [a]
142
143 mergeIO ls rs
144  = newEmptyMVar                >>= \ tail_node ->
145    newMVar tail_node           >>= \ tail_list ->
146    newQSem max_buff_size       >>= \ e ->
147    newMVar 2                   >>= \ branches_running ->
148    let
149     buff = (tail_list,e)
150    in
151     forkIO (suckIO branches_running buff ls) >>
152     forkIO (suckIO branches_running buff rs) >>
153     takeMVar tail_node  >>= \ val ->
154     signalQSem e        >>
155     return val
156
157 type Buffer a 
158  = (MVar (MVar [a]), QSem)
159
160 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
161
162 suckIO branches_running buff@(tail_list,e) vs
163  = case vs of
164         [] -> takeMVar branches_running >>= \ val ->
165               if val == 1 then
166                  takeMVar tail_list     >>= \ node ->
167                  putMVar node []        >>
168                  putMVar tail_list node
169               else      
170                  putMVar branches_running (val-1)
171         (x:xs) ->
172                 waitQSem e                       >>
173                 takeMVar tail_list               >>= \ node ->
174                 newEmptyMVar                     >>= \ next_node ->
175                 unsafeInterleaveIO (
176                         takeMVar next_node  >>= \ y ->
177                         signalQSem e        >>
178                         return y)                >>= \ next_node_val ->
179                 putMVar node (x:next_node_val)   >>
180                 putMVar tail_list next_node      >>
181                 suckIO branches_running buff xs
182
183 nmergeIO lss
184  = let
185     len = length lss
186    in
187     newEmptyMVar          >>= \ tail_node ->
188     newMVar tail_node     >>= \ tail_list ->
189     newQSem max_buff_size >>= \ e ->
190     newMVar len           >>= \ branches_running ->
191     let
192      buff = (tail_list,e)
193     in
194     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
195     takeMVar tail_node  >>= \ val ->
196     signalQSem e        >>
197     return val
198   where
199     mapIO f xs = sequence (map f xs)