13a2851e61e6be666cf719c3283c5be49242c282
[ghc-hetmet.git] / ghc / lib / concurrent / Concurrent.lhs
1 %
2 % (c) The AQUA Project, Glasgow University, 1994-1996
3 %
4
5 \section[Concurrent]{Concurrent Haskell constructs}
6
7 A common interface to a collection of useful concurrency abstractions.
8 Currently, the collection only contains the abstractions found in the
9 {\em Concurrent Haskell} paper (presented at the Haskell Workshop
10 1995, draft available via \tr{ftp} from
11 \tr{ftp.dcs.gla.ac.uk/pub/glasgow-fp/drafts}.)  plus a couple of
12 others. See the paper and the individual files containing the module
13 definitions for explanation on what they do.
14
15 \begin{code}
16 module Concurrent (
17         module ChannelVar,
18         module Channel,
19         module Semaphore,
20         module SampleVar
21
22         , ThreadId
23
24         -- Forking and suchlike
25         , forkIO        -- :: IO () -> IO ThreadId
26         , myThreadId    -- :: IO ThreadId
27         , killThread    -- :: ThreadId -> IO ()
28         , raiseInThread -- :: ThreadId -> Exception -> IO ()
29         , par           -- :: a -> b -> b
30         , seq           -- :: a -> b -> b
31         , fork          -- :: a -> b -> b
32         , yield         -- :: IO ()
33
34         , threadDelay           -- :: Int -> IO ()
35         , threadWaitRead        -- :: Int -> IO ()
36         , threadWaitWrite       -- :: Int -> IO ()
37
38         -- MVars
39         , MVar          -- abstract
40         , newMVar       -- :: a -> IO (MVar a)
41         , newEmptyMVar  -- :: IO (MVar a)
42         , takeMVar      -- :: MVar a -> IO a
43         , putMVar       -- :: MVar a -> a -> IO ()
44         , readMVar      -- :: MVar a -> IO a
45         , swapMVar      -- :: MVar a -> a -> IO a
46         , isEmptyMVar   -- :: MVar a -> IO Bool
47
48          -- merging of streams
49         , mergeIO       -- :: [a]   -> [a] -> IO [a]
50         , nmergeIO      -- :: [[a]] -> IO [a]
51     ) where
52
53 import Parallel
54 import ChannelVar
55 import Channel
56 import Semaphore
57 import SampleVar
58 import PrelConc
59 import PrelException
60 import PrelIOBase       ( IO(..) )
61 import IO
62 import PrelAddr         ( Addr )
63 import PrelArr          ( ByteArray )
64 import PrelPack         ( packString )
65 import PrelIOBase       ( unsafePerformIO , unsafeInterleaveIO )
66 import PrelBase         ( fork# )
67 import PrelGHC          ( Addr#, unsafeCoerce# )
68
69 infixr 0 `fork`
70 \end{code}
71
72 Thread Ids, specifically the instances of Eq and Ord for these things.
73 The ThreadId type itself is defined in std/PrelConc.lhs.
74
75 Rather than define a new primitve, we use a little helper function
76 cmp_thread in the RTS.
77
78 \begin{code}
79 foreign import ccall "cmp_thread" unsafe cmp_thread :: Addr# -> Addr# -> Int
80 -- Returns -1, 0, 1
81
82 cmpThread :: ThreadId -> ThreadId -> Ordering
83 cmpThread (ThreadId t1) (ThreadId t2) = 
84    case cmp_thread (unsafeCoerce# t1) (unsafeCoerce# t2) of
85       -1 -> LT
86       0  -> EQ
87       1  -> GT
88
89 instance Eq ThreadId where
90    t1 == t2 = 
91       case t1 `cmpThread` t2 of
92          EQ -> True
93          _  -> False
94
95 instance Ord ThreadId where
96    compare = cmpThread
97 \end{code}
98
99 \begin{code}
100 forkIO :: IO () -> IO ThreadId
101 forkIO action = IO $ \ s -> 
102    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
103  where
104   action_plus = 
105     catchException action 
106                    (topHandler False{-don't quit on exception raised-})
107
108 {-# INLINE fork #-}
109 fork :: a -> b -> b
110 fork x y = unsafePerformIO (forkIO (x `seq` return ())) `seq` y
111 \end{code}
112
113
114 \begin{code}
115 max_buff_size :: Int
116 max_buff_size = 1
117
118 mergeIO :: [a] -> [a] -> IO [a]
119 nmergeIO :: [[a]] -> IO [a]
120
121 mergeIO ls rs
122  = newEmptyMVar                >>= \ tail_node ->
123    newMVar tail_node           >>= \ tail_list ->
124    newQSem max_buff_size       >>= \ e ->
125    newMVar 2                   >>= \ branches_running ->
126    let
127     buff = (tail_list,e)
128    in
129     forkIO (suckIO branches_running buff ls) >>
130     forkIO (suckIO branches_running buff rs) >>
131     takeMVar tail_node  >>= \ val ->
132     signalQSem e        >>
133     return val
134
135 type Buffer a 
136  = (MVar (MVar [a]), QSem)
137
138 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
139
140 suckIO branches_running buff@(tail_list,e) vs
141  = case vs of
142         [] -> takeMVar branches_running >>= \ val ->
143               if val == 1 then
144                  takeMVar tail_list     >>= \ node ->
145                  putMVar node []        >>
146                  putMVar tail_list node
147               else      
148                  putMVar branches_running (val-1)
149         (x:xs) ->
150                 waitQSem e                       >>
151                 takeMVar tail_list               >>= \ node ->
152                 newEmptyMVar                     >>= \ next_node ->
153                 unsafeInterleaveIO (
154                         takeMVar next_node  >>= \ x ->
155                         signalQSem e        >>
156                         return x)                >>= \ next_node_val ->
157                 putMVar node (x:next_node_val)   >>
158                 putMVar tail_list next_node      >>
159                 suckIO branches_running buff xs
160
161 nmergeIO lss
162  = let
163     len = length lss
164    in
165     newEmptyMVar          >>= \ tail_node ->
166     newMVar tail_node     >>= \ tail_list ->
167     newQSem max_buff_size >>= \ e ->
168     newMVar len           >>= \ branches_running ->
169     let
170      buff = (tail_list,e)
171     in
172     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
173     takeMVar tail_node  >>= \ val ->
174     signalQSem e        >>
175     return val
176   where
177     mapIO f xs = sequence (map f xs)
178 \end{code}