706f0e6dbec9bfeb517e19fadcb9453914fe2cbe
[ghc-hetmet.git] / ghc / lib / concurrent / Merge.lhs
1 %
2 % (c) The GRASP/AQUA Project, Glasgow University, 1995
3 %
4 \section[Merge]{Mergeing streams}
5
6 Avoiding the loss of ref. transparency by attaching the merge to the
7 IO monad.
8
9 \begin{code}
10 module Merge
11
12         (
13          mergeIO,       -- :: [a]   -> [a] -> IO [a]
14          nmergeIO       -- :: [[a]] -> IO [a]
15         ) where
16
17 import Semaphore
18 import PrelConc
19 import PrelIOBase
20
21 max_buff_size = 1
22
23 mergeIO :: [a] -> [a] -> IO [a]
24 nmergeIO :: [[a]] -> IO [a]
25
26 #ifndef __CONCURRENT_HASKELL__
27
28 mergeIO _ _  = return []
29 nmergeIO _   = return []
30
31 #else
32
33 mergeIO ls rs
34  = newEmptyMVar                >>= \ tail_node ->
35    newMVar tail_node           >>= \ tail_list ->
36    newQSem max_buff_size       >>= \ e ->
37    newMVar 2                   >>= \ branches_running ->
38    let
39     buff = (tail_list,e)
40    in
41     forkIO (suckIO branches_running buff ls) >>
42     forkIO (suckIO branches_running buff rs) >>
43     takeMVar tail_node  >>= \ val ->
44     signalQSem e        >>
45     return val
46
47 type Buffer a 
48  = (MVar (MVar [a]), QSem)
49
50 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
51
52 suckIO branches_running buff@(tail_list,e) vs
53  = case vs of
54         [] -> takeMVar branches_running >>= \ val ->
55               if val == 1 then
56                  takeMVar tail_list     >>= \ node ->
57                  putMVar node []        >>
58                  putMVar tail_list node
59               else      
60                  putMVar branches_running (val-1)
61         (x:xs) ->
62                 waitQSem e                       >>
63                 takeMVar tail_list               >>= \ node ->
64                 newEmptyMVar                     >>= \ next_node ->
65                 unsafeInterleaveIO (
66                         takeMVar next_node  >>= \ x ->
67                         signalQSem e        >>
68                         return x)                >>= \ next_node_val ->
69                 putMVar node (x:next_node_val)   >>
70                 putMVar tail_list next_node      >>
71                 suckIO branches_running buff xs
72
73 nmergeIO lss
74  = let
75     len = length lss
76    in
77     newEmptyMVar          >>= \ tail_node ->
78     newMVar tail_node     >>= \ tail_list ->
79     newQSem max_buff_size >>= \ e ->
80     newMVar len           >>= \ branches_running ->
81     let
82      buff = (tail_list,e)
83     in
84     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
85     takeMVar tail_node  >>= \ val ->
86     signalQSem e        >>
87     return val
88   where
89     mapIO f xs = accumulate (map f xs)
90
91 #endif {- __CONCURRENT_HASKELL__ -}
92 \end{code}