2 % (c) The GRASP/AQUA Project, Glasgow University, 1995
4 \section[Merge]{Mergeing streams}
6 Avoiding the loss of ref. transparency by attaching the merge to the
13 mergeIO, --:: [a] -> [a] -> IO [a]
14 nmergeIO --:: [[a]] -> IO [a]
20 import Concurrent ( forkIO )
21 import PreludePrimIO ( newEmptyMVar, newMVar, putMVar,
22 readMVar, takeMVar, _MVar
30 mergeIO :: [a] -> [a] -> IO [a]
31 nmergeIO :: [[a]] -> IO [a]
33 #ifndef __CONCURRENT_HASKELL__
35 mergeIO _ _ = return []
36 nmergeIO _ = return []
41 = newEmptyMVar >>= \ tail_node ->
42 newMVar tail_node >>= \ tail_list ->
43 newQSem max_buff_size >>= \ e ->
44 newMVar 2 >>= \ branches_running ->
48 forkIO (suckIO branches_running buff ls) >>
49 forkIO (suckIO branches_running buff rs) >>
50 takeMVar tail_node >>= \ val ->
55 = (_MVar (_MVar [a]), QSem)
57 suckIO :: _MVar Int -> Buffer a -> [a] -> IO ()
59 suckIO branches_running buff@(tail_list,e) vs
61 [] -> takeMVar branches_running >>= \ val ->
63 takeMVar tail_list >>= \ node ->
65 putMVar tail_list node
67 putMVar branches_running (val-1)
70 takeMVar tail_list >>= \ node ->
71 newEmptyMVar >>= \ next_node ->
72 unsafeInterleavePrimIO (
73 takeMVar next_node `thenPrimIO` \ (Right x) ->
74 signalQSem e `seqPrimIO`
75 returnPrimIO x) `thenPrimIO` \ next_node_val ->
76 putMVar node (x:next_node_val) >>
77 putMVar tail_list next_node >>
78 suckIO branches_running buff xs
84 newEmptyMVar >>= \ tail_node ->
85 newMVar tail_node >>= \ tail_list ->
86 newQSem max_buff_size >>= \ e ->
87 newMVar len >>= \ branches_running ->
91 mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
92 takeMVar tail_node >>= \ val ->
96 mapIO f xs = accumulate (map f xs)
99 So as to avoid creating a mutual recursive module dep. with @Concurrent.lhs@,
100 the defn. of @forkIO@ is duplicated here:
105 forkIO :: PrimIO a -> PrimIO a
108 (r, new_s) = action s
110 new_s `_fork_` (r, s)
112 _fork_ x y = case (fork# x) of { 0# -> parError#; _ -> y }
115 #endif {- __CONCURRENT_HASKELL__ -}