[project @ 1999-04-27 17:44:26 by sof]
[ghc-hetmet.git] / ghc / lib / concurrent / Concurrent.lhs
1 %
2 % (c) The AQUA Project, Glasgow University, 1994-1996
3 %
4
5 \section[Concurrent]{Concurrent Haskell constructs}
6
7 A common interface to a collection of useful concurrency abstractions.
8 Currently, the collection only contains the abstractions found in the
9 {\em Concurrent Haskell} paper (presented at the Haskell Workshop
10 1995, draft available via \tr{ftp} from
11 \tr{ftp.dcs.gla.ac.uk/pub/glasgow-fp/drafts}.)  plus a couple of
12 others. See the paper and the individual files containing the module
13 definitions for explanation on what they do.
14
15 \begin{code}
16 module Concurrent (
17         module ChannelVar,
18         module Channel,
19         module Semaphore,
20         module SampleVar
21
22         , ThreadId
23
24         -- Forking and suchlike
25         , forkIO        -- :: IO () -> IO ThreadId
26         , myThreadId    -- :: IO ThreadId
27         , killThread    -- :: ThreadId -> IO ()
28         , raiseInThread -- :: ThreadId -> Exception -> IO ()
29         , par           -- :: a -> b -> b
30         , seq           -- :: a -> b -> b
31         , fork          -- :: a -> b -> b
32         {-threadDelay, threadWaitRead, threadWaitWrite,-}
33
34         -- MVars
35         , MVar          -- abstract
36         , newMVar       -- :: a -> IO (MVar a)
37         , newEmptyMVar  -- :: IO (MVar a)
38         , takeMVar      -- :: MVar a -> IO a
39         , putMVar       -- :: MVar a -> a -> IO ()
40         , readMVar      -- :: MVar a -> IO a
41         , swapMVar      -- :: MVar a -> a -> IO a
42         , isEmptyMVar   -- :: MVar a -> IO Bool
43
44          -- merging of streams
45         , mergeIO       -- :: [a]   -> [a] -> IO [a]
46         , nmergeIO      -- :: [[a]] -> IO [a]
47     ) where
48
49 import Parallel
50 import ChannelVar
51 import Channel
52 import Semaphore
53 import SampleVar
54 import PrelConc
55 import PrelHandle       ( topHandler )
56 import PrelException
57 import PrelIOBase       ( IO(..) )
58 import IO
59 import PrelAddr         ( Addr )
60 import PrelArr          ( ByteArray )
61 import PrelPack         ( packString )
62 import PrelIOBase       ( unsafePerformIO , unsafeInterleaveIO )
63 import PrelBase         ( fork# )
64
65 infixr 0 `fork`
66 \end{code}
67
68 \begin{code}
69 forkIO :: IO () -> IO ThreadId
70 forkIO action = IO $ \ s -> 
71    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
72  where
73   action_plus = 
74     catchException action 
75                    (topHandler False{-don't quit on exception raised-})
76
77 {-# INLINE fork #-}
78 fork :: a -> b -> b
79 fork x y = unsafePerformIO (forkIO (x `seq` return ())) `seq` y
80 \end{code}
81
82
83 \begin{code}
84 max_buff_size :: Int
85 max_buff_size = 1
86
87 mergeIO :: [a] -> [a] -> IO [a]
88 nmergeIO :: [[a]] -> IO [a]
89
90 mergeIO ls rs
91  = newEmptyMVar                >>= \ tail_node ->
92    newMVar tail_node           >>= \ tail_list ->
93    newQSem max_buff_size       >>= \ e ->
94    newMVar 2                   >>= \ branches_running ->
95    let
96     buff = (tail_list,e)
97    in
98     forkIO (suckIO branches_running buff ls) >>
99     forkIO (suckIO branches_running buff rs) >>
100     takeMVar tail_node  >>= \ val ->
101     signalQSem e        >>
102     return val
103
104 type Buffer a 
105  = (MVar (MVar [a]), QSem)
106
107 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
108
109 suckIO branches_running buff@(tail_list,e) vs
110  = case vs of
111         [] -> takeMVar branches_running >>= \ val ->
112               if val == 1 then
113                  takeMVar tail_list     >>= \ node ->
114                  putMVar node []        >>
115                  putMVar tail_list node
116               else      
117                  putMVar branches_running (val-1)
118         (x:xs) ->
119                 waitQSem e                       >>
120                 takeMVar tail_list               >>= \ node ->
121                 newEmptyMVar                     >>= \ next_node ->
122                 unsafeInterleaveIO (
123                         takeMVar next_node  >>= \ x ->
124                         signalQSem e        >>
125                         return x)                >>= \ next_node_val ->
126                 putMVar node (x:next_node_val)   >>
127                 putMVar tail_list next_node      >>
128                 suckIO branches_running buff xs
129
130 nmergeIO lss
131  = let
132     len = length lss
133    in
134     newEmptyMVar          >>= \ tail_node ->
135     newMVar tail_node     >>= \ tail_list ->
136     newQSem max_buff_size >>= \ e ->
137     newMVar len           >>= \ branches_running ->
138     let
139      buff = (tail_list,e)
140     in
141     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
142     takeMVar tail_node  >>= \ val ->
143     signalQSem e        >>
144     return val
145   where
146     mapIO f xs = sequence (map f xs)
147 \end{code}