[project @ 1996-07-25 20:43:49 by partain]
[ghc-hetmet.git] / ghc / lib / concurrent / Merge.hs
1 {-
2 %
3 % (c) The GRASP/AQUA Project, Glasgow University, 1995
4 %
5 \section[Merge]{Mergeing streams}
6
7 Avoiding the loss of ref. transparency by attaching the merge to the
8 IO monad.
9
10 \begin{code}
11 -}
12 module Merge
13
14         (
15          mergeIO,       --:: [a]   -> [a] -> IO [a]
16          nmergeIO       --:: [[a]] -> IO [a]
17         ) where
18
19 import Semaphore
20
21 import GHCbase
22 import GHCio            ( stThen )
23 import Concurrent       ( forkIO )
24
25 max_buff_size = 1
26
27 mergeIO :: [a] -> [a] -> IO [a]
28 nmergeIO :: [[a]] -> IO [a]
29
30 #ifndef __CONCURRENT_HASKELL__
31
32 mergeIO _ _  = return []
33 nmergeIO _   = return []
34
35 #else
36
37 mergeIO ls rs
38  = newEmptyMVar                >>= \ tail_node ->
39    newMVar tail_node           >>= \ tail_list ->
40    newQSem max_buff_size       >>= \ e ->
41    newMVar 2                   >>= \ branches_running ->
42    let
43     buff = (tail_list,e)
44    in
45     forkIO (suckIO branches_running buff ls) >>
46     forkIO (suckIO branches_running buff rs) >>
47     takeMVar tail_node  >>= \ val ->
48     signalQSem e        >>
49     return val
50
51 type Buffer a 
52  = (MVar (MVar [a]), QSem)
53
54 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
55
56 suckIO branches_running buff@(tail_list,e) vs
57  = case vs of
58         [] -> takeMVar branches_running >>= \ val ->
59               if val == 1 then
60                  takeMVar tail_list     >>= \ node ->
61                  putMVar node []        >>
62                  putMVar tail_list node
63               else      
64                  putMVar branches_running (val-1)
65         (x:xs) ->
66                 waitQSem e                       >>
67                 takeMVar tail_list               >>= \ node ->
68                 newEmptyMVar                     >>= \ next_node ->
69                 unsafeInterleavePrimIO ( ioToPrimIO $
70                         takeMVar next_node  >>= \ x ->
71                         signalQSem e        >>
72                         return x)           `stThen` \ next_node_val ->
73                 putMVar node (x:next_node_val)   >>
74                 putMVar tail_list next_node      >>
75                 suckIO branches_running buff xs
76
77 nmergeIO lss
78  = let
79     len = length lss
80    in
81     newEmptyMVar          >>= \ tail_node ->
82     newMVar tail_node     >>= \ tail_list ->
83     newQSem max_buff_size >>= \ e ->
84     newMVar len           >>= \ branches_running ->
85     let
86      buff = (tail_list,e)
87     in
88     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
89     takeMVar tail_node  >>= \ val ->
90     signalQSem e        >>
91     return val
92   where
93     mapIO f xs = accumulate (map f xs)
94
95 #endif {- __CONCURRENT_HASKELL__ -}