[project @ 1998-02-02 17:27:26 by simonm]
[ghc-hetmet.git] / ghc / lib / concurrent / Merge.lhs
1 %
2 % (c) The GRASP/AQUA Project, Glasgow University, 1995
3 %
4 \section[Merge]{Mergeing streams}
5
6 Avoiding the loss of ref. transparency by attaching the merge to the
7 IO monad.
8
9 \begin{code}
10 module Merge
11
12         (
13          mergeIO,       --:: [a]   -> [a] -> IO [a]
14          nmergeIO       --:: [[a]] -> IO [a]
15         ) where
16
17 import Semaphore
18 import PrelConc
19 import PrelUnsafe  ( unsafeInterleaveIO )
20 import PrelIOBase
21
22 max_buff_size = 1
23
24 mergeIO :: [a] -> [a] -> IO [a]
25 nmergeIO :: [[a]] -> IO [a]
26
27 #ifndef __CONCURRENT_HASKELL__
28
29 mergeIO _ _  = return []
30 nmergeIO _   = return []
31
32 #else
33
34 mergeIO ls rs
35  = newEmptyMVar                >>= \ tail_node ->
36    newMVar tail_node           >>= \ tail_list ->
37    newQSem max_buff_size       >>= \ e ->
38    newMVar 2                   >>= \ branches_running ->
39    let
40     buff = (tail_list,e)
41    in
42     forkIO (suckIO branches_running buff ls) >>
43     forkIO (suckIO branches_running buff rs) >>
44     takeMVar tail_node  >>= \ val ->
45     signalQSem e        >>
46     return val
47
48 type Buffer a 
49  = (MVar (MVar [a]), QSem)
50
51 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
52
53 suckIO branches_running buff@(tail_list,e) vs
54  = case vs of
55         [] -> takeMVar branches_running >>= \ val ->
56               if val == 1 then
57                  takeMVar tail_list     >>= \ node ->
58                  putMVar node []        >>
59                  putMVar tail_list node
60               else      
61                  putMVar branches_running (val-1)
62         (x:xs) ->
63                 waitQSem e                       >>
64                 takeMVar tail_list               >>= \ node ->
65                 newEmptyMVar                     >>= \ next_node ->
66                 unsafeInterleaveIO (
67                         takeMVar next_node  >>= \ x ->
68                         signalQSem e        >>
69                         return x)                >>= \ next_node_val ->
70                 putMVar node (x:next_node_val)   >>
71                 putMVar tail_list next_node      >>
72                 suckIO branches_running buff xs
73
74 nmergeIO lss
75  = let
76     len = length lss
77    in
78     newEmptyMVar          >>= \ tail_node ->
79     newMVar tail_node     >>= \ tail_list ->
80     newQSem max_buff_size >>= \ e ->
81     newMVar len           >>= \ branches_running ->
82     let
83      buff = (tail_list,e)
84     in
85     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
86     takeMVar tail_node  >>= \ val ->
87     signalQSem e        >>
88     return val
89   where
90     mapIO f xs = accumulate (map f xs)
91
92 #endif {- __CONCURRENT_HASKELL__ -}
93 \end{code}