[project @ 1999-11-26 16:26:32 by simonmar]
[ghc-hetmet.git] / ghc / lib / concurrent / Concurrent.lhs
index c715a1e..132922e 100644 (file)
@@ -17,16 +17,163 @@ module Concurrent (
        module ChannelVar,
        module Channel,
        module Semaphore,
-       module Merge,
-       module SampleVar,
-       module ConcBase
+       module SampleVar
+
+        , ThreadId
+
+       -- Forking and suchlike
+       , forkIO        -- :: IO () -> IO ThreadId
+       , myThreadId    -- :: IO ThreadId
+       , killThread    -- :: ThreadId -> IO ()
+       , raiseInThread -- :: ThreadId -> Exception -> IO ()
+       , par           -- :: a -> b -> b
+       , seq           -- :: a -> b -> b
+       , fork          -- :: a -> b -> b
+       , yield         -- :: IO ()
+
+       , threadDelay           -- :: Int -> IO ()
+       , threadWaitRead        -- :: Int -> IO ()
+       , threadWaitWrite       -- :: Int -> IO ()
+
+       -- MVars
+       , MVar          -- abstract
+       , newMVar       -- :: a -> IO (MVar a)
+       , newEmptyMVar  -- :: IO (MVar a)
+       , takeMVar      -- :: MVar a -> IO a
+       , putMVar       -- :: MVar a -> a -> IO ()
+       , readMVar      -- :: MVar a -> IO a
+       , swapMVar      -- :: MVar a -> a -> IO a
+       , isEmptyMVar   -- :: MVar a -> IO Bool
+
+        -- merging of streams
+       , mergeIO       -- :: [a]   -> [a] -> IO [a]
+       , nmergeIO      -- :: [[a]] -> IO [a]
     ) where
 
 import Parallel
 import ChannelVar
 import Channel
 import Semaphore
-import Merge
 import SampleVar
-import ConcBase
+import PrelConc
+import PrelHandle       ( topHandler )
+import PrelException
+import PrelIOBase      ( IO(..) )
+import IO
+import PrelAddr                ( Addr )
+import PrelArr         ( ByteArray )
+import PrelPack                ( packString )
+import PrelIOBase      ( unsafePerformIO , unsafeInterleaveIO )
+import PrelBase                ( fork# )
+import PrelGHC         ( Addr#, unsafeCoerce# )
+
+infixr 0 `fork`
+\end{code}
+
+Thread Ids, specifically the instances of Eq and Ord for these things.
+The ThreadId type itself is defined in std/PrelConc.lhs.
+
+Rather than define a new primitve, we use a little helper function
+cmp_thread in the RTS.
+
+\begin{code}
+foreign import ccall "cmp_thread" unsafe cmp_thread :: Addr# -> Addr# -> Int
+-- Returns -1, 0, 1
+
+cmpThread :: ThreadId -> ThreadId -> Ordering
+cmpThread (ThreadId t1) (ThreadId t2) = 
+   case cmp_thread (unsafeCoerce# t1) (unsafeCoerce# t2) of
+      -1 -> LT
+      0  -> EQ
+      1  -> GT
+
+instance Eq ThreadId where
+   t1 == t2 = 
+      case t1 `cmpThread` t2 of
+         EQ -> True
+         _  -> False
+
+instance Ord ThreadId where
+   compare = cmpThread
+\end{code}
+
+\begin{code}
+forkIO :: IO () -> IO ThreadId
+forkIO action = IO $ \ s -> 
+   case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
+ where
+  action_plus = 
+    catchException action 
+                  (topHandler False{-don't quit on exception raised-})
+
+{-# INLINE fork #-}
+fork :: a -> b -> b
+fork x y = unsafePerformIO (forkIO (x `seq` return ())) `seq` y
+\end{code}
+
+
+\begin{code}
+max_buff_size :: Int
+max_buff_size = 1
+
+mergeIO :: [a] -> [a] -> IO [a]
+nmergeIO :: [[a]] -> IO [a]
+
+mergeIO ls rs
+ = newEmptyMVar                       >>= \ tail_node ->
+   newMVar tail_node          >>= \ tail_list ->
+   newQSem max_buff_size       >>= \ e ->
+   newMVar 2                   >>= \ branches_running ->
+   let
+    buff = (tail_list,e)
+   in
+    forkIO (suckIO branches_running buff ls) >>
+    forkIO (suckIO branches_running buff rs) >>
+    takeMVar tail_node >>= \ val ->
+    signalQSem e       >>
+    return val
+
+type Buffer a 
+ = (MVar (MVar [a]), QSem)
+
+suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
+
+suckIO branches_running buff@(tail_list,e) vs
+ = case vs of
+       [] -> takeMVar branches_running >>= \ val ->
+             if val == 1 then
+                takeMVar tail_list     >>= \ node ->
+                putMVar node []        >>
+                putMVar tail_list node
+             else      
+                putMVar branches_running (val-1)
+       (x:xs) ->
+               waitQSem e                       >>
+               takeMVar tail_list               >>= \ node ->
+               newEmptyMVar                     >>= \ next_node ->
+               unsafeInterleaveIO (
+                       takeMVar next_node  >>= \ x ->
+                       signalQSem e        >>
+                       return x)                >>= \ next_node_val ->
+               putMVar node (x:next_node_val)   >>
+               putMVar tail_list next_node      >>
+               suckIO branches_running buff xs
+
+nmergeIO lss
+ = let
+    len = length lss
+   in
+    newEmptyMVar         >>= \ tail_node ->
+    newMVar tail_node    >>= \ tail_list ->
+    newQSem max_buff_size >>= \ e ->
+    newMVar len                  >>= \ branches_running ->
+    let
+     buff = (tail_list,e)
+    in
+    mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
+    takeMVar tail_node >>= \ val ->
+    signalQSem e       >>
+    return val
+  where
+    mapIO f xs = sequence (map f xs)
 \end{code}