[project @ 1999-11-26 16:26:32 by simonmar]
[ghc-hetmet.git] / ghc / lib / concurrent / Concurrent.lhs
1 %
2 % (c) The AQUA Project, Glasgow University, 1994-1996
3 %
4
5 \section[Concurrent]{Concurrent Haskell constructs}
6
7 A common interface to a collection of useful concurrency abstractions.
8 Currently, the collection only contains the abstractions found in the
9 {\em Concurrent Haskell} paper (presented at the Haskell Workshop
10 1995, draft available via \tr{ftp} from
11 \tr{ftp.dcs.gla.ac.uk/pub/glasgow-fp/drafts}.)  plus a couple of
12 others. See the paper and the individual files containing the module
13 definitions for explanation on what they do.
14
15 \begin{code}
16 module Concurrent (
17         module ChannelVar,
18         module Channel,
19         module Semaphore,
20         module SampleVar
21
22         , ThreadId
23
24         -- Forking and suchlike
25         , forkIO        -- :: IO () -> IO ThreadId
26         , myThreadId    -- :: IO ThreadId
27         , killThread    -- :: ThreadId -> IO ()
28         , raiseInThread -- :: ThreadId -> Exception -> IO ()
29         , par           -- :: a -> b -> b
30         , seq           -- :: a -> b -> b
31         , fork          -- :: a -> b -> b
32         , yield         -- :: IO ()
33
34         , threadDelay           -- :: Int -> IO ()
35         , threadWaitRead        -- :: Int -> IO ()
36         , threadWaitWrite       -- :: Int -> IO ()
37
38         -- MVars
39         , MVar          -- abstract
40         , newMVar       -- :: a -> IO (MVar a)
41         , newEmptyMVar  -- :: IO (MVar a)
42         , takeMVar      -- :: MVar a -> IO a
43         , putMVar       -- :: MVar a -> a -> IO ()
44         , readMVar      -- :: MVar a -> IO a
45         , swapMVar      -- :: MVar a -> a -> IO a
46         , isEmptyMVar   -- :: MVar a -> IO Bool
47
48          -- merging of streams
49         , mergeIO       -- :: [a]   -> [a] -> IO [a]
50         , nmergeIO      -- :: [[a]] -> IO [a]
51     ) where
52
53 import Parallel
54 import ChannelVar
55 import Channel
56 import Semaphore
57 import SampleVar
58 import PrelConc
59 import PrelHandle       ( topHandler )
60 import PrelException
61 import PrelIOBase       ( IO(..) )
62 import IO
63 import PrelAddr         ( Addr )
64 import PrelArr          ( ByteArray )
65 import PrelPack         ( packString )
66 import PrelIOBase       ( unsafePerformIO , unsafeInterleaveIO )
67 import PrelBase         ( fork# )
68 import PrelGHC          ( Addr#, unsafeCoerce# )
69
70 infixr 0 `fork`
71 \end{code}
72
73 Thread Ids, specifically the instances of Eq and Ord for these things.
74 The ThreadId type itself is defined in std/PrelConc.lhs.
75
76 Rather than define a new primitve, we use a little helper function
77 cmp_thread in the RTS.
78
79 \begin{code}
80 foreign import ccall "cmp_thread" unsafe cmp_thread :: Addr# -> Addr# -> Int
81 -- Returns -1, 0, 1
82
83 cmpThread :: ThreadId -> ThreadId -> Ordering
84 cmpThread (ThreadId t1) (ThreadId t2) = 
85    case cmp_thread (unsafeCoerce# t1) (unsafeCoerce# t2) of
86       -1 -> LT
87       0  -> EQ
88       1  -> GT
89
90 instance Eq ThreadId where
91    t1 == t2 = 
92       case t1 `cmpThread` t2 of
93          EQ -> True
94          _  -> False
95
96 instance Ord ThreadId where
97    compare = cmpThread
98 \end{code}
99
100 \begin{code}
101 forkIO :: IO () -> IO ThreadId
102 forkIO action = IO $ \ s -> 
103    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
104  where
105   action_plus = 
106     catchException action 
107                    (topHandler False{-don't quit on exception raised-})
108
109 {-# INLINE fork #-}
110 fork :: a -> b -> b
111 fork x y = unsafePerformIO (forkIO (x `seq` return ())) `seq` y
112 \end{code}
113
114
115 \begin{code}
116 max_buff_size :: Int
117 max_buff_size = 1
118
119 mergeIO :: [a] -> [a] -> IO [a]
120 nmergeIO :: [[a]] -> IO [a]
121
122 mergeIO ls rs
123  = newEmptyMVar                >>= \ tail_node ->
124    newMVar tail_node           >>= \ tail_list ->
125    newQSem max_buff_size       >>= \ e ->
126    newMVar 2                   >>= \ branches_running ->
127    let
128     buff = (tail_list,e)
129    in
130     forkIO (suckIO branches_running buff ls) >>
131     forkIO (suckIO branches_running buff rs) >>
132     takeMVar tail_node  >>= \ val ->
133     signalQSem e        >>
134     return val
135
136 type Buffer a 
137  = (MVar (MVar [a]), QSem)
138
139 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
140
141 suckIO branches_running buff@(tail_list,e) vs
142  = case vs of
143         [] -> takeMVar branches_running >>= \ val ->
144               if val == 1 then
145                  takeMVar tail_list     >>= \ node ->
146                  putMVar node []        >>
147                  putMVar tail_list node
148               else      
149                  putMVar branches_running (val-1)
150         (x:xs) ->
151                 waitQSem e                       >>
152                 takeMVar tail_list               >>= \ node ->
153                 newEmptyMVar                     >>= \ next_node ->
154                 unsafeInterleaveIO (
155                         takeMVar next_node  >>= \ x ->
156                         signalQSem e        >>
157                         return x)                >>= \ next_node_val ->
158                 putMVar node (x:next_node_val)   >>
159                 putMVar tail_list next_node      >>
160                 suckIO branches_running buff xs
161
162 nmergeIO lss
163  = let
164     len = length lss
165    in
166     newEmptyMVar          >>= \ tail_node ->
167     newMVar tail_node     >>= \ tail_list ->
168     newQSem max_buff_size >>= \ e ->
169     newMVar len           >>= \ branches_running ->
170     let
171      buff = (tail_list,e)
172     in
173     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
174     takeMVar tail_node  >>= \ val ->
175     signalQSem e        >>
176     return val
177   where
178     mapIO f xs = sequence (map f xs)
179 \end{code}