[project @ 1999-08-25 16:11:43 by simonmar]
[ghc-hetmet.git] / ghc / lib / concurrent / Concurrent.lhs
1 %
2 % (c) The AQUA Project, Glasgow University, 1994-1996
3 %
4
5 \section[Concurrent]{Concurrent Haskell constructs}
6
7 A common interface to a collection of useful concurrency abstractions.
8 Currently, the collection only contains the abstractions found in the
9 {\em Concurrent Haskell} paper (presented at the Haskell Workshop
10 1995, draft available via \tr{ftp} from
11 \tr{ftp.dcs.gla.ac.uk/pub/glasgow-fp/drafts}.)  plus a couple of
12 others. See the paper and the individual files containing the module
13 definitions for explanation on what they do.
14
15 \begin{code}
16 module Concurrent (
17         module ChannelVar,
18         module Channel,
19         module Semaphore,
20         module SampleVar
21
22         , ThreadId
23
24         -- Forking and suchlike
25         , forkIO        -- :: IO () -> IO ThreadId
26         , myThreadId    -- :: IO ThreadId
27         , killThread    -- :: ThreadId -> IO ()
28         , raiseInThread -- :: ThreadId -> Exception -> IO ()
29         , par           -- :: a -> b -> b
30         , seq           -- :: a -> b -> b
31         , fork          -- :: a -> b -> b
32         , yield         -- :: IO ()
33
34         , threadDelay           -- :: Int -> IO ()
35         , threadWaitRead        -- :: Int -> IO ()
36         , threadWaitWrite       -- :: Int -> IO ()
37
38         -- MVars
39         , MVar          -- abstract
40         , newMVar       -- :: a -> IO (MVar a)
41         , newEmptyMVar  -- :: IO (MVar a)
42         , takeMVar      -- :: MVar a -> IO a
43         , putMVar       -- :: MVar a -> a -> IO ()
44         , readMVar      -- :: MVar a -> IO a
45         , swapMVar      -- :: MVar a -> a -> IO a
46         , isEmptyMVar   -- :: MVar a -> IO Bool
47
48          -- merging of streams
49         , mergeIO       -- :: [a]   -> [a] -> IO [a]
50         , nmergeIO      -- :: [[a]] -> IO [a]
51     ) where
52
53 import Parallel
54 import ChannelVar
55 import Channel
56 import Semaphore
57 import SampleVar
58 import PrelConc
59 import PrelHandle       ( topHandler, threadDelay, 
60                           threadWaitRead, threadWaitWrite )
61 import PrelException
62 import PrelIOBase       ( IO(..) )
63 import IO
64 import PrelAddr         ( Addr )
65 import PrelArr          ( ByteArray )
66 import PrelPack         ( packString )
67 import PrelIOBase       ( unsafePerformIO , unsafeInterleaveIO )
68 import PrelBase         ( fork# )
69 import PrelGHC          ( Addr#, unsafeCoerce# )
70
71 infixr 0 `fork`
72 \end{code}
73
74 Thread Ids, specifically the instances of Eq and Ord for these things.
75 The ThreadId type itself is defined in std/PrelConc.lhs.
76
77 Rather than define a new primitve, we use a little helper function
78 cmp_thread in the RTS.
79
80 \begin{code}
81 foreign import ccall "cmp_thread" unsafe cmp_thread :: Addr# -> Addr# -> Int
82 -- Returns -1, 0, 1
83
84 cmpThread :: ThreadId -> ThreadId -> Ordering
85 cmpThread (ThreadId t1) (ThreadId t2) = 
86    case cmp_thread (unsafeCoerce# t1) (unsafeCoerce# t2) of
87       -1 -> LT
88       0  -> EQ
89       1  -> GT
90
91 instance Eq ThreadId where
92    t1 == t2 = 
93       case t1 `cmpThread` t2 of
94          EQ -> True
95          _  -> False
96
97 instance Ord ThreadId where
98    compare = cmpThread
99 \end{code}
100
101 \begin{code}
102 forkIO :: IO () -> IO ThreadId
103 forkIO action = IO $ \ s -> 
104    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
105  where
106   action_plus = 
107     catchException action 
108                    (topHandler False{-don't quit on exception raised-})
109
110 {-# INLINE fork #-}
111 fork :: a -> b -> b
112 fork x y = unsafePerformIO (forkIO (x `seq` return ())) `seq` y
113 \end{code}
114
115
116 \begin{code}
117 max_buff_size :: Int
118 max_buff_size = 1
119
120 mergeIO :: [a] -> [a] -> IO [a]
121 nmergeIO :: [[a]] -> IO [a]
122
123 mergeIO ls rs
124  = newEmptyMVar                >>= \ tail_node ->
125    newMVar tail_node           >>= \ tail_list ->
126    newQSem max_buff_size       >>= \ e ->
127    newMVar 2                   >>= \ branches_running ->
128    let
129     buff = (tail_list,e)
130    in
131     forkIO (suckIO branches_running buff ls) >>
132     forkIO (suckIO branches_running buff rs) >>
133     takeMVar tail_node  >>= \ val ->
134     signalQSem e        >>
135     return val
136
137 type Buffer a 
138  = (MVar (MVar [a]), QSem)
139
140 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
141
142 suckIO branches_running buff@(tail_list,e) vs
143  = case vs of
144         [] -> takeMVar branches_running >>= \ val ->
145               if val == 1 then
146                  takeMVar tail_list     >>= \ node ->
147                  putMVar node []        >>
148                  putMVar tail_list node
149               else      
150                  putMVar branches_running (val-1)
151         (x:xs) ->
152                 waitQSem e                       >>
153                 takeMVar tail_list               >>= \ node ->
154                 newEmptyMVar                     >>= \ next_node ->
155                 unsafeInterleaveIO (
156                         takeMVar next_node  >>= \ x ->
157                         signalQSem e        >>
158                         return x)                >>= \ next_node_val ->
159                 putMVar node (x:next_node_val)   >>
160                 putMVar tail_list next_node      >>
161                 suckIO branches_running buff xs
162
163 nmergeIO lss
164  = let
165     len = length lss
166    in
167     newEmptyMVar          >>= \ tail_node ->
168     newMVar tail_node     >>= \ tail_list ->
169     newQSem max_buff_size >>= \ e ->
170     newMVar len           >>= \ branches_running ->
171     let
172      buff = (tail_list,e)
173     in
174     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
175     takeMVar tail_node  >>= \ val ->
176     signalQSem e        >>
177     return val
178   where
179     mapIO f xs = sequence (map f xs)
180 \end{code}