[project @ 2005-02-03 10:32:11 by ross]
[ghc-base.git] / GHC / Conc.lhs
index e3bfae2..6dbe991 100644 (file)
@@ -1,5 +1,5 @@
 \begin{code}
-{-# OPTIONS -fno-implicit-prelude #-}
+{-# OPTIONS_GHC -fno-implicit-prelude #-}
 -----------------------------------------------------------------------------
 -- |
 -- Module      :  GHC.Conc
@@ -14,7 +14,7 @@
 -- 
 -----------------------------------------------------------------------------
 
-#include "ghcconfig.h"
+-- #hide
 module GHC.Conc
        ( ThreadId(..)
 
@@ -43,7 +43,19 @@ module GHC.Conc
        , isEmptyMVar   -- :: MVar a -> IO Bool
        , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
 
-#ifdef mingw32_TARGET_OS
+       -- TVars
+       , STM           -- abstract
+       , atomically    -- :: STM a -> IO a
+       , retry         -- :: STM a
+       , orElse        -- :: STM a -> STM a -> STM a
+        , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
+       , TVar          -- abstract
+       , newTVar       -- :: a -> STM (TVar a)
+       , readTVar      -- :: TVar a -> STM a
+       , writeTVar     -- :: a -> TVar a -> STM ()
+       , unsafeIOToSTM -- :: IO a -> STM a
+
+#ifdef mingw32_HOST_OS
        , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
        , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
        , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
@@ -69,6 +81,7 @@ import GHC.Exception    ( Exception(..), AsyncException(..) )
 import GHC.Pack                ( packCString# )
 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
 import GHC.STRef
+import Data.Typeable
 
 infixr 0 `par`, `pseq`
 \end{code}
@@ -80,7 +93,7 @@ infixr 0 `par`, `pseq`
 %************************************************************************
 
 \begin{code}
-data ThreadId = ThreadId ThreadId#
+data ThreadId = ThreadId ThreadId# deriving( Typeable )
 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
 -- But since ThreadId# is unlifted, the Weak type must use open
 -- type variables.
@@ -179,6 +192,98 @@ par :: a -> b -> b
 par  x y = case (par# x) of { _ -> lazy y }
 \end{code}
 
+
+%************************************************************************
+%*                                                                     *
+\subsection[stm]{Transactional heap operations}
+%*                                                                     *
+%************************************************************************
+
+TVars are shared memory locations which support atomic memory
+transactions.
+
+\begin{code}
+newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) deriving( Typeable )
+
+unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
+unSTM (STM a) = a
+
+instance  Functor STM where
+   fmap f x = x >>= (return . f)
+
+instance  Monad STM  where
+    {-# INLINE return #-}
+    {-# INLINE (>>)   #-}
+    {-# INLINE (>>=)  #-}
+    m >> k      = thenSTM m k
+    return x   = returnSTM x
+    m >>= k     = bindSTM m k
+
+bindSTM :: STM a -> (a -> STM b) -> STM b
+bindSTM (STM m) k = STM ( \s ->
+  case m s of 
+    (# new_s, a #) -> unSTM (k a) new_s
+  )
+
+thenSTM :: STM a -> STM b -> STM b
+thenSTM (STM m) k = STM ( \s ->
+  case m s of 
+    (# new_s, a #) -> unSTM k new_s
+  )
+
+returnSTM :: a -> STM a
+returnSTM x = STM (\s -> (# s, x #))
+
+-- | Unsafely performs IO in the STM monad.
+unsafeIOToSTM :: IO a -> STM a
+unsafeIOToSTM (IO m) = STM m
+
+-- |Perform a series of STM actions atomically.
+atomically :: STM a -> IO a
+atomically (STM m) = IO (\s -> (atomically# m) s )
+
+-- |Retry execution of the current memory transaction because it has seen
+-- values in TVars which mean that it should not continue (e.g. the TVars
+-- represent a shared buffer that is now empty).  The implementation may
+-- block the thread until one of the TVars that it has read from has been
+-- udpated.
+retry :: STM a
+retry = STM $ \s# -> retry# s#
+
+-- |Compose two alternative STM actions.  If the first action completes without
+-- retrying then it forms the result of the orElse.  Otherwise, if the first
+-- action retries, then the second action is tried in its place.  If both actions
+-- retry then the orElse as a whole retries.
+orElse :: STM a -> STM a -> STM a
+orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
+
+-- |Exception handling within STM actions.
+catchSTM :: STM a -> (Exception -> STM a) -> STM a
+catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
+
+data TVar a = TVar (TVar# RealWorld a) deriving( Typeable )
+
+instance Eq (TVar a) where
+       (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
+
+-- |Create a new TVar holding a value supplied
+newTVar :: a -> STM (TVar a)
+newTVar val = STM $ \s1# ->
+    case newTVar# val s1# of
+        (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
+
+-- |Return the current value stored in a TVar
+readTVar :: TVar a -> STM a
+readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
+
+-- |Write the supplied value into a TVar
+writeTVar :: TVar a -> a -> STM ()
+writeTVar (TVar tvar#) val = STM $ \s1# ->
+    case writeTVar# tvar# val s1# of
+        s2# -> (# s2#, () #)
+  
+\end{code}
+
 %************************************************************************
 %*                                                                     *
 \subsection[mvars]{M-Structures}
@@ -273,20 +378,20 @@ addMVarFinalizer (MVar m) finalizer =
 %************************************************************************
 
 \begin{code}
-#ifdef mingw32_TARGET_OS
+#ifdef mingw32_HOST_OS
 
 -- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
 -- on Win32, but left in there because lib code (still) uses them (the manner
 -- in which they're used doesn't cause problems on a Win32 platform though.)
 
 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
-asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) = 
-  IO $ \s -> case asyncRead# fd isSock len buf s  of 
+asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
+  IO $ \s -> case asyncRead# fd isSock len buf s of 
               (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
 
 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
-asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) = 
-  IO $ \s -> case asyncWrite# fd isSock len buf s  of 
+asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
+  IO $ \s -> case asyncWrite# fd isSock len buf s of 
               (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
 
 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
@@ -317,7 +422,7 @@ asyncWriteBA fd isSock len off bufB =
 -- given file descriptor (GHC only).
 threadWaitRead :: Fd -> IO ()
 threadWaitRead fd
-#ifndef mingw32_TARGET_OS
+#ifndef mingw32_HOST_OS
   | threaded  = waitForReadEvent fd
 #endif
   | otherwise = IO $ \s -> 
@@ -329,7 +434,7 @@ threadWaitRead fd
 -- given file descriptor (GHC only).
 threadWaitWrite :: Fd -> IO ()
 threadWaitWrite fd
-#ifndef mingw32_TARGET_OS
+#ifndef mingw32_HOST_OS
   | threaded  = waitForWriteEvent fd
 #endif
   | otherwise = IO $ \s -> 
@@ -350,7 +455,7 @@ threadWaitWrite fd
 --
 threadDelay :: Int -> IO ()
 threadDelay time
-#ifndef mingw32_TARGET_OS
+#ifndef mingw32_HOST_OS
   | threaded  = waitForDelayEvent time
 #else
   | threaded  = c_Sleep (fromIntegral (time `quot` 1000))
@@ -361,7 +466,7 @@ threadDelay time
        }}
 
 -- On Windows, we just make a safe call to 'Sleep' to implement threadDelay.
-#ifdef mingw32_TARGET_OS
+#ifdef mingw32_HOST_OS
 foreign import ccall safe "Sleep" c_Sleep :: CInt -> IO ()
 #endif
 
@@ -399,7 +504,7 @@ foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
 --     - forkProcess will kill the IO manager thread.  Let's just
 --       hope we don't need to do any blocking IO between fork & exec.
 
-#ifndef mingw32_TARGET_OS
+#ifndef mingw32_HOST_OS
 
 data IOReq
   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
@@ -483,10 +588,10 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
   res <- do_select
   -- ToDo: check result
 
-  old <- atomicModifyIORef prodding (\old -> (False,old))
-  if old 
-       then alloca $ \p -> do c_read (fromIntegral wakeup) p 1; return ()
-       else return ()
+  b <- takeMVar prodding
+  if b then alloca $ \p -> do c_read (fromIntegral wakeup) p 1; return ()
+       else return ()
+  putMVar prodding False
 
   reqs' <- completeRequests reqs readfds writefds []
   service_loop wakeup readfds writefds ptimeval reqs' delays'
@@ -495,19 +600,18 @@ stick :: IORef Fd
 {-# NOINLINE stick #-}
 stick = unsafePerformIO (newIORef 0)
 
-prodding :: IORef Bool
+prodding :: MVar Bool
 {-# NOINLINE prodding #-}
-prodding = unsafePerformIO (newIORef False)
+prodding = unsafePerformIO (newMVar False)
 
 prodServiceThread :: IO ()
 prodServiceThread = do
-  b <- atomicModifyIORef prodding (\old -> (True,old)) -- compare & swap!
-  if (not b)
-       then do
-         fd <- readIORef stick
-         with 42 $ \pbuf -> do c_write (fromIntegral fd) pbuf 1; return ()
-       else
-         return ()
+  b <- takeMVar prodding
+  if (not b) 
+    then do fd <- readIORef stick
+           with 42 $ \pbuf -> do c_write (fromIntegral fd) pbuf 1; return ()
+    else return ()
+  putMVar prodding True
 
 -- -----------------------------------------------------------------------------
 -- IO requests