[project @ 1996-07-19 18:36:04 by partain]
[ghc-hetmet.git] / ghc / lib / concurrent / Merge.hs
1 {-
2 %
3 % (c) The GRASP/AQUA Project, Glasgow University, 1995
4 %
5 \section[Merge]{Mergeing streams}
6
7 Avoiding the loss of ref. transparency by attaching the merge to the
8 IO monad.
9
10 \begin{code}
11 -}
12 module Merge
13
14         (
15          mergeIO,       --:: [a]   -> [a] -> IO [a]
16          nmergeIO       --:: [[a]] -> IO [a]
17         ) where
18
19 import Semaphore
20
21 import PreludeGlaST
22 import Concurrent       ( forkIO )
23
24 max_buff_size = 1
25
26 mergeIO :: [a] -> [a] -> IO [a]
27 nmergeIO :: [[a]] -> IO [a]
28
29 #ifndef __CONCURRENT_HASKELL__
30
31 mergeIO _ _  = return []
32 nmergeIO _   = return []
33
34 #else
35
36 mergeIO ls rs
37  = newEmptyMVar                >>= \ tail_node ->
38    newMVar tail_node           >>= \ tail_list ->
39    newQSem max_buff_size       >>= \ e ->
40    newMVar 2                   >>= \ branches_running ->
41    let
42     buff = (tail_list,e)
43    in
44     forkIO (suckIO branches_running buff ls) >>
45     forkIO (suckIO branches_running buff rs) >>
46     takeMVar tail_node  >>= \ val ->
47     signalQSem e        >>
48     return val
49
50 type Buffer a 
51  = (MVar (MVar [a]), QSem)
52
53 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
54
55 suckIO branches_running buff@(tail_list,e) vs
56  = case vs of
57         [] -> takeMVar branches_running >>= \ val ->
58               if val == 1 then
59                  takeMVar tail_list     >>= \ node ->
60                  putMVar node []        >>
61                  putMVar tail_list node
62               else      
63                  putMVar branches_running (val-1)
64         (x:xs) ->
65                 waitQSem e                       >>
66                 takeMVar tail_list               >>= \ node ->
67                 newEmptyMVar                     >>= \ next_node ->
68                 unsafeInterleavePrimIO (
69                         takeMVar next_node       `thenPrimIO` \ (Right x) ->
70                         signalQSem e             `seqPrimIO`
71                         returnPrimIO x)          `thenPrimIO` \ next_node_val ->
72                 putMVar node (x:next_node_val)   >>
73                 putMVar tail_list next_node      >>
74                 suckIO branches_running buff xs
75
76 nmergeIO lss
77  = let
78     len = length lss
79    in
80     newEmptyMVar          >>= \ tail_node ->
81     newMVar tail_node     >>= \ tail_list ->
82     newQSem max_buff_size >>= \ e ->
83     newMVar len           >>= \ branches_running ->
84     let
85      buff = (tail_list,e)
86     in
87     mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
88     takeMVar tail_node  >>= \ val ->
89     signalQSem e        >>
90     return val
91   where
92     mapIO f xs = accumulate (map f xs)
93
94 #endif {- __CONCURRENT_HASKELL__ -}