[project @ 1996-12-19 18:07:39 by simonpj]
[ghc-hetmet.git] / ghc / lib / concurrent / Merge.lhs
1 %
2 % (c) The GRASP/AQUA Project, Glasgow University, 1995
3 %
4 \section[Merge]{Mergeing streams}
5
6 Avoiding the loss of ref. transparency by attaching the merge to the
7 IO monad.
8
9 \begin{code}
10 module Merge
11
12         (
13          mergeIO,       --:: [a]   -> [a] -> IO [a]
14          nmergeIO       --:: [[a]] -> IO [a]
15         ) where
16
17 import Semaphore
18
19
20 max_buff_size = 1
21
22 mergeIO :: [a] -> [a] -> IO [a]
23 nmergeIO :: [[a]] -> IO [a]
24
25 #ifndef __CONCURRENT_HASKELL__
26
27 mergeIO _ _  = return []
28 nmergeIO _   = return []
29
30 #else
31
32 mergeIO ls rs
33  = newEmptyMVar                >>= \ tail_node ->
34    newMVar tail_node           >>= \ tail_list ->
35    newQSem max_buff_size       >>= \ e ->
36    newMVar 2                   >>= \ branches_running ->
37    let
38     buff = (tail_list,e)
39    in
40     forkIO (suckIO branches_running buff ls) >>
41     forkIO (suckIO branches_running buff rs) >>
42     takeMVar tail_node  >>= \ val ->
43     signalQSem e        >>
44     return val
45
46 type Buffer a 
47  = (MVar (MVar [a]), QSem)
48
49 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
50
51 suckIO branches_running buff@(tail_list,e) vs
52  = case vs of
53         [] -> takeMVar branches_running >>= \ val ->
54               if val == 1 then
55                  takeMVar tail_list     >>= \ node ->
56                  putMVar node []        >>
57                  putMVar tail_list node
58               else      
59                  putMVar branches_running (val-1)
60         (x:xs) ->
61                 waitQSem e                       >>
62                 takeMVar tail_list               >>= \ node ->
63                 newEmptyMVar                     >>= \ next_node ->
64                 unsafeInterleavePrimIO ( ioToPrimIO $
65                         takeMVar next_node  >>= \ x ->
66                         signalQSem e        >>
67                         return x)           `stThen` \ next_node_val ->
68                 putMVar node (x:next_node_val)   >>
69                 putMVar tail_list next_node      >>
70                 suckIO branches_running buff xs
71
72 nmergeIO lss
73  = let
74     len = length lss
75    in
76     newEmptyMVar          >>= \ tail_node ->
77     newMVar tail_node     >>= \ tail_list ->
78     newQSem max_buff_size >>= \ e ->
79     newMVar len           >>= \ branches_running ->
80     let
81      buff = (tail_list,e)
82     in
83     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
84     takeMVar tail_node  >>= \ val ->
85     signalQSem e        >>
86     return val
87   where
88     mapIO f xs = accumulate (map f xs)
89
90 #endif {- __CONCURRENT_HASKELL__ -}
91 \end{code}