module ChannelVar,
module Channel,
module Semaphore,
- module Merge,
- module SampleVar,
- module PrelConc
+ module SampleVar
+
+ , ThreadId
+
+ -- Forking and suchlike
+ , forkIO -- :: IO () -> IO ThreadId
+ , myThreadId -- :: IO ThreadId
+ , killThread -- :: ThreadId -> IO ()
+ , raiseInThread -- :: ThreadId -> Exception -> IO ()
+ , par -- :: a -> b -> b
+ , seq -- :: a -> b -> b
+ , fork -- :: a -> b -> b
+ {-threadDelay, threadWaitRead, threadWaitWrite,-}
+
+ -- MVars
+ , MVar -- abstract
+ , newMVar -- :: a -> IO (MVar a)
+ , newEmptyMVar -- :: IO (MVar a)
+ , takeMVar -- :: MVar a -> IO a
+ , putMVar -- :: MVar a -> a -> IO ()
+ , readMVar -- :: MVar a -> IO a
+ , swapMVar -- :: MVar a -> a -> IO a
+ , isEmptyMVar -- :: MVar a -> IO Bool
+
+ -- merging of streams
+ , mergeIO -- :: [a] -> [a] -> IO [a]
+ , nmergeIO -- :: [[a]] -> IO [a]
) where
import Parallel
import ChannelVar
import Channel
import Semaphore
-import Merge
import SampleVar
import PrelConc
+import PrelHandle ( topHandler )
+import PrelException
+import PrelIOBase ( IO(..) )
+import IO
+import PrelAddr ( Addr )
+import PrelArr ( ByteArray )
+import PrelPack ( packString )
+import PrelIOBase ( unsafePerformIO , unsafeInterleaveIO )
+import PrelBase ( fork# )
+
+infixr 0 `fork`
+\end{code}
+
+\begin{code}
+forkIO :: IO () -> IO ThreadId
+forkIO action = IO $ \ s ->
+ case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
+ where
+ action_plus =
+ catchException action
+ (topHandler False{-don't quit on exception raised-})
+
+{-# INLINE fork #-}
+fork :: a -> b -> b
+fork x y = unsafePerformIO (forkIO (x `seq` return ())) `seq` y
+\end{code}
+
+
+\begin{code}
+max_buff_size :: Int
+max_buff_size = 1
+
+mergeIO :: [a] -> [a] -> IO [a]
+nmergeIO :: [[a]] -> IO [a]
+
+mergeIO ls rs
+ = newEmptyMVar >>= \ tail_node ->
+ newMVar tail_node >>= \ tail_list ->
+ newQSem max_buff_size >>= \ e ->
+ newMVar 2 >>= \ branches_running ->
+ let
+ buff = (tail_list,e)
+ in
+ forkIO (suckIO branches_running buff ls) >>
+ forkIO (suckIO branches_running buff rs) >>
+ takeMVar tail_node >>= \ val ->
+ signalQSem e >>
+ return val
+
+type Buffer a
+ = (MVar (MVar [a]), QSem)
+
+suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
+
+suckIO branches_running buff@(tail_list,e) vs
+ = case vs of
+ [] -> takeMVar branches_running >>= \ val ->
+ if val == 1 then
+ takeMVar tail_list >>= \ node ->
+ putMVar node [] >>
+ putMVar tail_list node
+ else
+ putMVar branches_running (val-1)
+ (x:xs) ->
+ waitQSem e >>
+ takeMVar tail_list >>= \ node ->
+ newEmptyMVar >>= \ next_node ->
+ unsafeInterleaveIO (
+ takeMVar next_node >>= \ x ->
+ signalQSem e >>
+ return x) >>= \ next_node_val ->
+ putMVar node (x:next_node_val) >>
+ putMVar tail_list next_node >>
+ suckIO branches_running buff xs
+
+nmergeIO lss
+ = let
+ len = length lss
+ in
+ newEmptyMVar >>= \ tail_node ->
+ newMVar tail_node >>= \ tail_list ->
+ newQSem max_buff_size >>= \ e ->
+ newMVar len >>= \ branches_running ->
+ let
+ buff = (tail_list,e)
+ in
+ mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
+ takeMVar tail_node >>= \ val ->
+ signalQSem e >>
+ return val
+ where
+ mapIO f xs = sequence (map f xs)
\end{code}
Avoiding the loss of ref. transparency by attaching the merge to the
IO monad.
+(The ops. are now defined in Concurrent to avoid module loop trouble).
+
\begin{code}
module Merge
-
(
- mergeIO, -- :: [a] -> [a] -> IO [a]
- nmergeIO -- :: [[a]] -> IO [a]
+ merge
+ , nmergeIO
) where
-import Semaphore
-import PrelConc
-import PrelIOBase
-
-max_buff_size :: Int
-max_buff_size = 1
-
-mergeIO :: [a] -> [a] -> IO [a]
-nmergeIO :: [[a]] -> IO [a]
-
-mergeIO ls rs
- = newEmptyMVar >>= \ tail_node ->
- newMVar tail_node >>= \ tail_list ->
- newQSem max_buff_size >>= \ e ->
- newMVar 2 >>= \ branches_running ->
- let
- buff = (tail_list,e)
- in
- forkIO (suckIO branches_running buff ls) >>
- forkIO (suckIO branches_running buff rs) >>
- takeMVar tail_node >>= \ val ->
- signalQSem e >>
- return val
-
-type Buffer a
- = (MVar (MVar [a]), QSem)
-
-suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
-
-suckIO branches_running buff@(tail_list,e) vs
- = case vs of
- [] -> takeMVar branches_running >>= \ val ->
- if val == 1 then
- takeMVar tail_list >>= \ node ->
- putMVar node [] >>
- putMVar tail_list node
- else
- putMVar branches_running (val-1)
- (x:xs) ->
- waitQSem e >>
- takeMVar tail_list >>= \ node ->
- newEmptyMVar >>= \ next_node ->
- unsafeInterleaveIO (
- takeMVar next_node >>= \ x ->
- signalQSem e >>
- return x) >>= \ next_node_val ->
- putMVar node (x:next_node_val) >>
- putMVar tail_list next_node >>
- suckIO branches_running buff xs
-
-nmergeIO lss
- = let
- len = length lss
- in
- newEmptyMVar >>= \ tail_node ->
- newMVar tail_node >>= \ tail_list ->
- newQSem max_buff_size >>= \ e ->
- newMVar len >>= \ branches_running ->
- let
- buff = (tail_list,e)
- in
- mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
- takeMVar tail_node >>= \ val ->
- signalQSem e >>
- return val
- where
- mapIO f xs = sequence (map f xs)
+import Concurrent
\end{code}