[project @ 1996-01-08 20:28:12 by partain]
[ghc-hetmet.git] / ghc / lib / prelude / Merge.lhs
1 %
2 % (c) The GRASP/AQUA Project, Glasgow University, 1995
3 %
4 \section[Merge]{Mergeing streams}
5
6 Avoiding the loss of ref. transparency by attaching the merge to the
7 IO monad.
8
9 \begin{code}
10 module Merge
11
12         (
13          mergeIO,       --:: [a]   -> [a] -> IO [a]
14          nmergeIO       --:: [[a]] -> IO [a]
15         ) where
16
17 import Semaphore
18
19 import PreludeGlaST
20 import Concurrent       ( forkIO )
21 import PreludePrimIO    ( newEmptyMVar, newMVar, putMVar,
22                           readMVar, takeMVar, _MVar
23                         )
24 \end{code}
25
26 \begin{code}
27
28 max_buff_size = 1
29
30 mergeIO :: [a] -> [a] -> IO [a]
31 nmergeIO :: [[a]] -> IO [a]
32
33 #ifndef __CONCURRENT_HASKELL__
34
35 mergeIO _ _  = return []
36 nmergeIO _   = return []
37
38 #else
39
40 mergeIO ls rs
41  = newEmptyMVar                >>= \ tail_node ->
42    newMVar tail_node           >>= \ tail_list ->
43    newQSem max_buff_size       >>= \ e ->
44    newMVar 2                   >>= \ branches_running ->
45    let
46     buff = (tail_list,e)
47    in
48     forkIO (suckIO branches_running buff ls) >>
49     forkIO (suckIO branches_running buff rs) >>
50     takeMVar tail_node  >>= \ val ->
51     signalQSem e        >>
52     return val
53
54 type Buffer a 
55  = (_MVar (_MVar [a]), QSem)
56
57 suckIO :: _MVar Int -> Buffer a -> [a] -> IO ()
58
59 suckIO branches_running buff@(tail_list,e) vs
60  = case vs of
61         [] -> takeMVar branches_running >>= \ val ->
62               if val == 1 then
63                  takeMVar tail_list     >>= \ node ->
64                  putMVar node []        >>
65                  putMVar tail_list node
66               else      
67                  putMVar branches_running (val-1)
68         (x:xs) ->
69                 waitQSem e                       >>
70                 takeMVar tail_list               >>= \ node ->
71                 newEmptyMVar                     >>= \ next_node ->
72                 unsafeInterleavePrimIO (
73                         takeMVar next_node       `thenPrimIO` \ (Right x) ->
74                         signalQSem e             `seqPrimIO`
75                         returnPrimIO x)          `thenPrimIO` \ next_node_val ->
76                 putMVar node (x:next_node_val)   >>
77                 putMVar tail_list next_node      >>
78                 suckIO branches_running buff xs
79
80 nmergeIO lss
81  = let
82     len = length lss
83    in
84     newEmptyMVar          >>= \ tail_node ->
85     newMVar tail_node     >>= \ tail_list ->
86     newQSem max_buff_size >>= \ e ->
87     newMVar len           >>= \ branches_running ->
88     let
89      buff = (tail_list,e)
90     in
91     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
92     takeMVar tail_node  >>= \ val ->
93     signalQSem e        >>
94     return val
95   where
96     mapIO f xs = accumulate (map f xs)
97 \end{code}
98
99 So as to avoid creating a mutual recursive module dep. with @Concurrent.lhs@,
100 the defn. of @forkIO@ is duplicated here:
101
102 \begin{code}
103 {- HAH! WDP 95/07
104
105 forkIO :: PrimIO a -> PrimIO a
106 forkIO action s
107  = let
108     (r, new_s) = action s
109    in
110     new_s `_fork_` (r, s)
111  where
112     _fork_ x y = case (fork# x) of { 0# -> parError#; _ -> y }
113 -}
114
115 #endif {- __CONCURRENT_HASKELL__ -}
116
117 \end{code}