[project @ 1999-05-14 19:49:22 by sof]
[ghc-hetmet.git] / ghc / lib / concurrent / Concurrent.lhs
1 %
2 % (c) The AQUA Project, Glasgow University, 1994-1996
3 %
4
5 \section[Concurrent]{Concurrent Haskell constructs}
6
7 A common interface to a collection of useful concurrency abstractions.
8 Currently, the collection only contains the abstractions found in the
9 {\em Concurrent Haskell} paper (presented at the Haskell Workshop
10 1995, draft available via \tr{ftp} from
11 \tr{ftp.dcs.gla.ac.uk/pub/glasgow-fp/drafts}.)  plus a couple of
12 others. See the paper and the individual files containing the module
13 definitions for explanation on what they do.
14
15 \begin{code}
16 module Concurrent (
17         module ChannelVar,
18         module Channel,
19         module Semaphore,
20         module SampleVar
21
22         , ThreadId
23
24         -- Forking and suchlike
25         , forkIO        -- :: IO () -> IO ThreadId
26         , myThreadId    -- :: IO ThreadId
27         , killThread    -- :: ThreadId -> IO ()
28         , raiseInThread -- :: ThreadId -> Exception -> IO ()
29         , par           -- :: a -> b -> b
30         , seq           -- :: a -> b -> b
31         , fork          -- :: a -> b -> b
32         , yield         -- :: IO ()
33
34         {-threadDelay, threadWaitRead, threadWaitWrite,-}
35
36         -- MVars
37         , MVar          -- abstract
38         , newMVar       -- :: a -> IO (MVar a)
39         , newEmptyMVar  -- :: IO (MVar a)
40         , takeMVar      -- :: MVar a -> IO a
41         , putMVar       -- :: MVar a -> a -> IO ()
42         , readMVar      -- :: MVar a -> IO a
43         , swapMVar      -- :: MVar a -> a -> IO a
44         , isEmptyMVar   -- :: MVar a -> IO Bool
45
46          -- merging of streams
47         , mergeIO       -- :: [a]   -> [a] -> IO [a]
48         , nmergeIO      -- :: [[a]] -> IO [a]
49     ) where
50
51 import Parallel
52 import ChannelVar
53 import Channel
54 import Semaphore
55 import SampleVar
56 import PrelConc
57 import PrelHandle       ( topHandler )
58 import PrelException
59 import PrelIOBase       ( IO(..) )
60 import IO
61 import PrelAddr         ( Addr )
62 import PrelArr          ( ByteArray )
63 import PrelPack         ( packString )
64 import PrelIOBase       ( unsafePerformIO , unsafeInterleaveIO )
65 import PrelBase         ( fork# )
66
67 infixr 0 `fork`
68 \end{code}
69
70 \begin{code}
71 forkIO :: IO () -> IO ThreadId
72 forkIO action = IO $ \ s -> 
73    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
74  where
75   action_plus = 
76     catchException action 
77                    (topHandler False{-don't quit on exception raised-})
78
79 {-# INLINE fork #-}
80 fork :: a -> b -> b
81 fork x y = unsafePerformIO (forkIO (x `seq` return ())) `seq` y
82 \end{code}
83
84
85 \begin{code}
86 max_buff_size :: Int
87 max_buff_size = 1
88
89 mergeIO :: [a] -> [a] -> IO [a]
90 nmergeIO :: [[a]] -> IO [a]
91
92 mergeIO ls rs
93  = newEmptyMVar                >>= \ tail_node ->
94    newMVar tail_node           >>= \ tail_list ->
95    newQSem max_buff_size       >>= \ e ->
96    newMVar 2                   >>= \ branches_running ->
97    let
98     buff = (tail_list,e)
99    in
100     forkIO (suckIO branches_running buff ls) >>
101     forkIO (suckIO branches_running buff rs) >>
102     takeMVar tail_node  >>= \ val ->
103     signalQSem e        >>
104     return val
105
106 type Buffer a 
107  = (MVar (MVar [a]), QSem)
108
109 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
110
111 suckIO branches_running buff@(tail_list,e) vs
112  = case vs of
113         [] -> takeMVar branches_running >>= \ val ->
114               if val == 1 then
115                  takeMVar tail_list     >>= \ node ->
116                  putMVar node []        >>
117                  putMVar tail_list node
118               else      
119                  putMVar branches_running (val-1)
120         (x:xs) ->
121                 waitQSem e                       >>
122                 takeMVar tail_list               >>= \ node ->
123                 newEmptyMVar                     >>= \ next_node ->
124                 unsafeInterleaveIO (
125                         takeMVar next_node  >>= \ x ->
126                         signalQSem e        >>
127                         return x)                >>= \ next_node_val ->
128                 putMVar node (x:next_node_val)   >>
129                 putMVar tail_list next_node      >>
130                 suckIO branches_running buff xs
131
132 nmergeIO lss
133  = let
134     len = length lss
135    in
136     newEmptyMVar          >>= \ tail_node ->
137     newMVar tail_node     >>= \ tail_list ->
138     newQSem max_buff_size >>= \ e ->
139     newMVar len           >>= \ branches_running ->
140     let
141      buff = (tail_list,e)
142     in
143     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
144     takeMVar tail_node  >>= \ val ->
145     signalQSem e        >>
146     return val
147   where
148     mapIO f xs = sequence (map f xs)
149 \end{code}