de342c6e4328e555dce6def635a7ff0649e326c2
[ghc-hetmet.git] / ghc / lib / concurrent / Concurrent.lhs
1 %
2 % (c) The AQUA Project, Glasgow University, 1994-1996
3 %
4
5 \section[Concurrent]{Concurrent Haskell constructs}
6
7 A common interface to a collection of useful concurrency abstractions.
8 Currently, the collection only contains the abstractions found in the
9 {\em Concurrent Haskell} paper (presented at the Haskell Workshop
10 1995, draft available via \tr{ftp} from
11 \tr{ftp.dcs.gla.ac.uk/pub/glasgow-fp/drafts}.)  plus a couple of
12 others. See the paper and the individual files containing the module
13 definitions for explanation on what they do.
14
15 \begin{code}
16 module Concurrent (
17         module ChannelVar,
18         module Channel,
19         module Semaphore,
20         module SampleVar
21
22         , ThreadId
23
24         -- Forking and suchlike
25         , forkIO        -- :: IO () -> IO ThreadId
26         , myThreadId    -- :: IO ThreadId
27         , killThread    -- :: ThreadId -> IO ()
28         , raiseInThread -- :: ThreadId -> Exception -> IO ()
29         , par           -- :: a -> b -> b
30         , seq           -- :: a -> b -> b
31         , fork          -- :: a -> b -> b
32         , yield         -- :: IO ()
33
34         {-threadDelay, threadWaitRead, threadWaitWrite,-}
35
36         -- MVars
37         , MVar          -- abstract
38         , newMVar       -- :: a -> IO (MVar a)
39         , newEmptyMVar  -- :: IO (MVar a)
40         , takeMVar      -- :: MVar a -> IO a
41         , putMVar       -- :: MVar a -> a -> IO ()
42         , readMVar      -- :: MVar a -> IO a
43         , swapMVar      -- :: MVar a -> a -> IO a
44         , isEmptyMVar   -- :: MVar a -> IO Bool
45
46          -- merging of streams
47         , mergeIO       -- :: [a]   -> [a] -> IO [a]
48         , nmergeIO      -- :: [[a]] -> IO [a]
49     ) where
50
51 import Parallel
52 import ChannelVar
53 import Channel
54 import Semaphore
55 import SampleVar
56 import PrelConc
57 import PrelHandle       ( topHandler )
58 import PrelException
59 import PrelIOBase       ( IO(..) )
60 import IO
61 import PrelAddr         ( Addr )
62 import PrelArr          ( ByteArray )
63 import PrelPack         ( packString )
64 import PrelIOBase       ( unsafePerformIO , unsafeInterleaveIO )
65 import PrelBase         ( fork# )
66 import PrelGHC          ( Addr#, unsafeCoerce# )
67
68 infixr 0 `fork`
69 \end{code}
70
71 Thread Ids, specifically the instances of Eq and Ord for these things.
72 The ThreadId type itself is defined in std/PrelConc.lhs.
73
74 Rather than define a new primitve, we use a little helper function
75 cmp_thread in the RTS.
76
77 \begin{code}
78 foreign import ccall "cmp_thread" unsafe cmp_thread :: Addr# -> Addr# -> Int
79 -- Returns -1, 0, 1
80
81 cmpThread :: ThreadId -> ThreadId -> Ordering
82 cmpThread (ThreadId t1) (ThreadId t2) = 
83    case cmp_thread (unsafeCoerce# t1) (unsafeCoerce# t2) of
84       -1 -> LT
85       0  -> EQ
86       1  -> GT
87
88 instance Eq ThreadId where
89    t1 == t2 = 
90       case t1 `cmpThread` t2 of
91          EQ -> True
92          _  -> False
93
94 instance Ord ThreadId where
95    compare = cmpThread
96 \end{code}
97
98 \begin{code}
99 forkIO :: IO () -> IO ThreadId
100 forkIO action = IO $ \ s -> 
101    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
102  where
103   action_plus = 
104     catchException action 
105                    (topHandler False{-don't quit on exception raised-})
106
107 {-# INLINE fork #-}
108 fork :: a -> b -> b
109 fork x y = unsafePerformIO (forkIO (x `seq` return ())) `seq` y
110 \end{code}
111
112
113 \begin{code}
114 max_buff_size :: Int
115 max_buff_size = 1
116
117 mergeIO :: [a] -> [a] -> IO [a]
118 nmergeIO :: [[a]] -> IO [a]
119
120 mergeIO ls rs
121  = newEmptyMVar                >>= \ tail_node ->
122    newMVar tail_node           >>= \ tail_list ->
123    newQSem max_buff_size       >>= \ e ->
124    newMVar 2                   >>= \ branches_running ->
125    let
126     buff = (tail_list,e)
127    in
128     forkIO (suckIO branches_running buff ls) >>
129     forkIO (suckIO branches_running buff rs) >>
130     takeMVar tail_node  >>= \ val ->
131     signalQSem e        >>
132     return val
133
134 type Buffer a 
135  = (MVar (MVar [a]), QSem)
136
137 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
138
139 suckIO branches_running buff@(tail_list,e) vs
140  = case vs of
141         [] -> takeMVar branches_running >>= \ val ->
142               if val == 1 then
143                  takeMVar tail_list     >>= \ node ->
144                  putMVar node []        >>
145                  putMVar tail_list node
146               else      
147                  putMVar branches_running (val-1)
148         (x:xs) ->
149                 waitQSem e                       >>
150                 takeMVar tail_list               >>= \ node ->
151                 newEmptyMVar                     >>= \ next_node ->
152                 unsafeInterleaveIO (
153                         takeMVar next_node  >>= \ x ->
154                         signalQSem e        >>
155                         return x)                >>= \ next_node_val ->
156                 putMVar node (x:next_node_val)   >>
157                 putMVar tail_list next_node      >>
158                 suckIO branches_running buff xs
159
160 nmergeIO lss
161  = let
162     len = length lss
163    in
164     newEmptyMVar          >>= \ tail_node ->
165     newMVar tail_node     >>= \ tail_list ->
166     newQSem max_buff_size >>= \ e ->
167     newMVar len           >>= \ branches_running ->
168     let
169      buff = (tail_list,e)
170     in
171     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
172     takeMVar tail_node  >>= \ val ->
173     signalQSem e        >>
174     return val
175   where
176     mapIO f xs = sequence (map f xs)
177 \end{code}