docs: describe the changes to forkIO, and document forkOnIO
[ghc-base.git] / GHC / Conc.lhs
index 83a4df8..498b928 100644 (file)
@@ -28,6 +28,7 @@ module GHC.Conc
        -- * Forking and suchlike
        , forkIO        -- :: IO a -> IO ThreadId
        , forkOnIO      -- :: Int -> IO a -> IO ThreadId
+        , numCapabilities -- :: Int
        , childHandler  -- :: Exception -> IO ()
        , myThreadId    -- :: IO ThreadId
        , killThread    -- :: ThreadId -> IO ()
@@ -79,7 +80,17 @@ module GHC.Conc
        , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
 #endif
 
+#ifndef mingw32_HOST_OS
+        , signalHandlerLock
+#endif
+
        , ensureIOManagerIsRunning
+
+#ifdef mingw32_HOST_OS
+        , ConsoleEvent(..)
+        , win32ConsoleHandler
+        , toWin32ConsoleEvent
+#endif
         ) where
 
 import System.Posix.Types
@@ -98,11 +109,15 @@ import Data.Maybe
 import GHC.Base
 import GHC.IOBase
 import GHC.Num         ( Num(..) )
-import GHC.Real                ( fromIntegral, quot )
+import GHC.Real                ( fromIntegral, div )
 #ifndef mingw32_HOST_OS
 import GHC.Base                ( Int(..) )
 #endif
-import GHC.Exception    ( catchException, Exception(..), AsyncException(..) )
+#ifdef mingw32_HOST_OS
+import GHC.Read         ( Read )
+import GHC.Enum         ( Enum )
+#endif
+import GHC.Exception
 import GHC.Pack                ( packCString# )
 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
 import GHC.STRef
@@ -146,7 +161,7 @@ instance Show ThreadId where
        showString "ThreadId " . 
         showsPrec d (getThreadId (id2TSO t))
 
-foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> Int
+foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
 
 id2TSO :: ThreadId -> ThreadId#
 id2TSO (ThreadId t) = t
@@ -171,12 +186,15 @@ instance Ord ThreadId where
    compare = cmpThread
 
 {- |
-This sparks off a new thread to run the 'IO' computation passed as the
+Sparks off a new thread to run the 'IO' computation passed as the
 first argument, and returns the 'ThreadId' of the newly created
 thread.
 
 The new thread will be a lightweight thread; if you want to use a foreign
-library that uses thread-local storage, use 'forkOS' instead.
+library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
+
+GHC note: the new thread inherits the /blocked/ state of the parent 
+(see 'Control.Exception.block').
 -}
 forkIO :: IO () -> IO ThreadId
 forkIO action = IO $ \ s -> 
@@ -184,12 +202,36 @@ forkIO action = IO $ \ s ->
  where
   action_plus = catchException action childHandler
 
+{- |
+Like 'forkIO', but lets you specify on which CPU the thread is
+created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
+will stay on the same CPU for its entire lifetime (`forkIO` threads
+can migrate between CPUs according to the scheduling policy).
+`forkOnIO` is useful for overriding the scheduling policy when you
+know in advance how best to distribute the threads.
+
+The `Int` argument specifies the CPU number; it is interpreted modulo
+'numCapabilities' (note that it actually specifies a capability number
+rather than a CPU number, but to a first approximation the two are
+equivalent).
+-}
 forkOnIO :: Int -> IO () -> IO ThreadId
 forkOnIO (I# cpu) action = IO $ \ s -> 
    case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
  where
   action_plus = catchException action childHandler
 
+-- | the value passed to the @+RTS -N@ flag.  This is the number of
+-- Haskell threads that can run truly simultaneously at any given
+-- time, and is typically set to the number of physical CPU cores on
+-- the machine.
+numCapabilities :: Int
+numCapabilities = unsafePerformIO $  do 
+                    n <- peek n_capabilities
+                    return (fromIntegral n)
+
+foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
+
 childHandler :: Exception -> IO ()
 childHandler err = catchException (real_handler err) childHandler
 
@@ -234,7 +276,7 @@ until the call has completed.  This is the case regardless of whether
 the call is inside a 'block' or not.
 
 Important note: the behaviour of 'throwTo' differs from that described in
-the paper "Asynchronous exceptions in Haskell" 
+the paper \"Asynchronous exceptions in Haskell\"
 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
 a more synchronous design in which 'throwTo' does not return until the exception
@@ -545,6 +587,15 @@ isEmptyMVar (MVar mv#) = IO $ \ s# ->
 addMVarFinalizer :: MVar a -> IO () -> IO ()
 addMVarFinalizer (MVar m) finalizer = 
   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
+
+withMVar :: MVar a -> (a -> IO b) -> IO b
+withMVar m io = 
+  block $ do
+    a <- takeMVar m
+    b <- catchException (unblock (io a))
+           (\e -> do putMVar m a; throw e)
+    putMVar m a
+    return b
 \end{code}
 
 
@@ -557,7 +608,7 @@ addMVarFinalizer (MVar m) finalizer =
 \begin{code}
 #ifdef mingw32_HOST_OS
 
--- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
+-- Note: threadWaitRead and threadWaitWrite aren't really functional
 -- on Win32, but left in there because lib code (still) uses them (the manner
 -- in which they're used doesn't cause problems on a Win32 platform though.)
 
@@ -622,10 +673,6 @@ threadWaitWrite fd
 -- | Suspends the current thread for a given number of microseconds
 -- (GHC only).
 --
--- Note that the resolution used by the Haskell runtime system's
--- internal timer is 1\/50 second, and 'threadDelay' will round its
--- argument up to the nearest multiple of this resolution.
---
 -- There is no guarantee that the thread will be rescheduled promptly
 -- when the delay has expired, but the thread will never continue to
 -- run /earlier/ than specified.
@@ -638,6 +685,10 @@ threadDelay time
        case delay# time# s of { s -> (# s, () #)
        }}
 
+
+-- | Set the value of returned TVar to True after a given number of
+-- microseconds. The caveats associated with threadDelay also apply.
+--
 registerDelay :: Int -> IO (TVar Bool)
 registerDelay usecs 
   | threaded = waitForDelayEventSTM usecs
@@ -648,7 +699,6 @@ foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
 waitForDelayEvent :: Int -> IO ()
 waitForDelayEvent usecs = do
   m <- newEmptyMVar
-  now <- getTicksOfDay
   target <- calculateTarget usecs
   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
   prodServiceThread
@@ -658,23 +708,16 @@ waitForDelayEvent usecs = do
 waitForDelayEventSTM :: Int -> IO (TVar Bool)
 waitForDelayEventSTM usecs = do
    t <- atomically $ newTVar False
-   now <- getTicksOfDay
    target <- calculateTarget usecs
    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
    prodServiceThread
    return t  
     
-calculateTarget :: Int -> IO Int
+calculateTarget :: Int -> IO USecs
 calculateTarget usecs = do
-    now <- getTicksOfDay
-    let -- Convert usecs to ticks, rounding up as we must wait /at least/
-        -- as long as we are told
-        usecs' = (usecs + tick_usecs - 1) `quot` tick_usecs
-        target = now + 1 -- getTicksOfDay will have rounded down, but
-                         -- again we need to wait for /at least/ as long
-                         -- as we are told, so add 1 to it
-               + usecs'
-    return target
+    now <- getUSecOfDay
+    return $ now + (fromIntegral usecs)
+
 
 -- ----------------------------------------------------------------------------
 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
@@ -715,8 +758,8 @@ data IOReq
 #endif
 
 data DelayReq
-  = Delay    {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
-  | DelaySTM {-# UNPACK #-} !Int {-# UNPACK #-} !(TVar Bool)
+  = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
+  | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
 
 #ifndef mingw32_HOST_OS
 pendingEvents :: IORef [IOReq]
@@ -744,20 +787,27 @@ insertDelay d1 ds@(d2 : rest)
   | delayTime d1 <= delayTime d2 = d1 : ds
   | otherwise                    = d2 : insertDelay d1 rest
 
+delayTime :: DelayReq -> USecs
 delayTime (Delay t _) = t
 delayTime (DelaySTM t _) = t
 
-type Ticks = Int
-tick_freq  = 50 :: Ticks  -- accuracy of threadDelay (ticks per sec)
-tick_usecs = 1000000 `quot` tick_freq :: Int
-tick_msecs = 1000 `quot` tick_freq :: Int
+type USecs = Word64
 
 -- XXX: move into GHC.IOBase from Data.IORef?
 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
 
-foreign import ccall unsafe "getTicksOfDay" 
-  getTicksOfDay :: IO Ticks
+foreign import ccall unsafe "getUSecOfDay" 
+  getUSecOfDay :: IO USecs
+
+prodding :: IORef Bool
+{-# NOINLINE prodding #-}
+prodding = unsafePerformIO (newIORef False)
+
+prodServiceThread :: IO ()
+prodServiceThread = do
+  was_set <- atomicModifyIORef prodding (\a -> (True,a))
+  if (not (was_set)) then wakeupIOManager else return ()
 
 #ifdef mingw32_HOST_OS
 -- ----------------------------------------------------------------------------
@@ -778,7 +828,7 @@ service_loop wakeup old_delays = do
   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
   let  delays = foldr insertDelay old_delays new_delays
 
-  now <- getTicksOfDay
+  now <- getUSecOfDay
   (delays', timeout) <- getDelay now delays
 
   r <- c_WaitForSingleObject wakeup timeout
@@ -799,44 +849,54 @@ service_loop wakeup old_delays = do
     _other -> service_cont wakeup delays' -- probably timeout        
 
 service_cont wakeup delays = do
-  takeMVar prodding
-  putMVar prodding False
+  atomicModifyIORef prodding (\_ -> (False,False))
   service_loop wakeup delays
 
 -- must agree with rts/win32/ThrIOManager.c
 io_MANAGER_WAKEUP = 0xffffffff :: Word32
 io_MANAGER_DIE    = 0xfffffffe :: Word32
 
-start_console_handler :: Word32 -> IO ()
-start_console_handler r = do                   
-  stableptr <- peek console_handler
-  forkIO $ do io <- deRefStablePtr stableptr; io (fromIntegral r)
-  return ()
+data ConsoleEvent
+ = ControlC
+ | Break
+ | Close
+    -- these are sent to Services only.
+ | Logoff
+ | Shutdown
+ deriving (Eq, Ord, Enum, Show, Read, Typeable)
 
-foreign import ccall "&console_handler" 
-   console_handler :: Ptr (StablePtr (CInt -> IO ()))
+start_console_handler :: Word32 -> IO ()
+start_console_handler r =
+  case toWin32ConsoleEvent r of
+     Just x  -> withMVar win32ConsoleHandler $ \handler -> do
+                    forkIO (handler x)
+                    return ()
+     Nothing -> return ()
+
+toWin32ConsoleEvent ev = 
+   case ev of
+       0 {- CTRL_C_EVENT-}        -> Just ControlC
+       1 {- CTRL_BREAK_EVENT-}    -> Just Break
+       2 {- CTRL_CLOSE_EVENT-}    -> Just Close
+       5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
+       6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
+       _ -> Nothing
+
+win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
+win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
 
 stick :: IORef HANDLE
 {-# NOINLINE stick #-}
 stick = unsafePerformIO (newIORef nullPtr)
 
-prodding :: MVar Bool
-{-# NOINLINE prodding #-}
-prodding = unsafePerformIO (newMVar False)
-
-prodServiceThread :: IO ()
-prodServiceThread = do
-  b <- takeMVar prodding
-  if (not b) 
-    then do hdl <- readIORef stick
-            c_sendIOManagerEvent io_MANAGER_WAKEUP
-    else return ()
-  putMVar prodding True
+wakeupIOManager = do 
+  hdl <- readIORef stick
+  c_sendIOManagerEvent io_MANAGER_WAKEUP
 
 -- Walk the queue of pending delays, waking up any that have passed
 -- and return the smallest delay to wait for.  The queue of pending
 -- delays is kept ordered.
-getDelay :: Ticks -> [DelayReq] -> IO ([DelayReq], DWORD)
+getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
 getDelay now [] = return ([], iNFINITE)
 getDelay now all@(d : rest) 
   = case d of
@@ -847,9 +907,10 @@ getDelay now all@(d : rest)
        atomically $ writeTVar t True
        getDelay now rest
      _otherwise ->
-        return (all, (fromIntegral (delayTime d - now) * 
-                        fromIntegral tick_msecs))
-                        -- delay is in millisecs for WaitForSingleObject
+        -- delay is in millisecs for WaitForSingleObject
+        let micro_seconds = delayTime d - now
+            milli_seconds = (micro_seconds + 999) `div` 1000
+        in return (all, fromIntegral milli_seconds)
 
 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
 -- available yet.  We should move some Win32 functionality down here,
@@ -868,7 +929,7 @@ foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
   c_sendIOManagerEvent :: Word32 -> IO ()
 
-foreign import ccall unsafe "maperrno"             -- in runProcess.c
+foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
    c_maperrno :: IO ()
 
 foreign import stdcall "WaitForSingleObject"
@@ -922,10 +983,10 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
          -- check the current time and wake up any thread in
          -- threadDelay whose timeout has expired.  Also find the
          -- timeout value for the select() call.
-         now <- getTicksOfDay
+         now <- getUSecOfDay
          (delays', timeout) <- getDelay now ptimeval delays
 
-         res <- c_select ((max wakeup maxfd)+1) readfds writefds 
+         res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
                        nullPtr timeout
          if (res == -1)
             then do
@@ -956,15 +1017,16 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
                 case s of
                  _ | s == io_MANAGER_WAKEUP -> return False
                  _ | s == io_MANAGER_DIE    -> return True
-                 _ -> do handler_tbl <- peek handlers
+                 _ -> withMVar signalHandlerLock $ \_ -> do
+                          handler_tbl <- peek handlers
                          sp <- peekElemOff handler_tbl (fromIntegral s)
-                         forkIO (do io <- deRefStablePtr sp; io)
+                          io <- deRefStablePtr sp
+                         forkIO io
                          return False
 
   if exit then return () else do
 
-  takeMVar prodding
-  putMVar prodding False
+  atomicModifyIORef prodding (\_ -> (False,False))
 
   reqs' <- if wakeup_all then do wakeupAll reqs; return []
                         else completeRequests reqs readfds writefds []
@@ -978,19 +1040,17 @@ stick :: IORef Fd
 {-# NOINLINE stick #-}
 stick = unsafePerformIO (newIORef 0)
 
-prodding :: MVar Bool
-{-# NOINLINE prodding #-}
-prodding = unsafePerformIO (newMVar False)
+wakeupIOManager :: IO ()
+wakeupIOManager = do
+  fd <- readIORef stick
+  with io_MANAGER_WAKEUP $ \pbuf -> do 
+    c_write (fromIntegral fd) pbuf 1; return ()
 
-prodServiceThread :: IO ()
-prodServiceThread = do
-  b <- takeMVar prodding
-  if (not b) 
-    then do fd <- readIORef stick
-           with io_MANAGER_WAKEUP $ \pbuf -> do 
-               c_write (fromIntegral fd) pbuf 1; return ()
-    else return ()
-  putMVar prodding True
+-- Lock used to protect concurrent access to signal_handlers.  Symptom of
+-- this race condition is #1922, although that bug was on Windows a similar
+-- bug also exists on Unix.
+signalHandlerLock :: MVar ()
+signalHandlerLock = unsafePerformIO (newMVar ())
 
 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
 
@@ -1048,7 +1108,7 @@ waitForWriteEvent fd = do
 -- Walk the queue of pending delays, waking up any that have passed
 -- and return the smallest delay to wait for.  The queue of pending
 -- delays is kept ordered.
-getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
+getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
 getDelay now ptimeval [] = return ([],nullPtr)
 getDelay now ptimeval all@(d : rest) 
   = case d of
@@ -1068,7 +1128,7 @@ foreign import ccall unsafe "sizeofTimeVal"
   sizeofTimeVal :: Int
 
 foreign import ccall unsafe "setTimevalTicks" 
-  setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO ()
+  setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
 
 {- 
   On Win32 we're going to have a single Pipe, and a
@@ -1084,20 +1144,32 @@ foreign import ccall unsafe "setTimevalTicks"
 newtype CFdSet = CFdSet ()
 
 foreign import ccall safe "select"
-  c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
+  c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
            -> IO CInt
 
 foreign import ccall unsafe "hsFD_SETSIZE"
-  fD_SETSIZE :: Fd
+  c_fD_SETSIZE :: CInt
+
+fD_SETSIZE :: Fd
+fD_SETSIZE = fromIntegral c_fD_SETSIZE
 
 foreign import ccall unsafe "hsFD_CLR"
-  fdClr :: Fd -> Ptr CFdSet -> IO ()
+  c_fdClr :: CInt -> Ptr CFdSet -> IO ()
+
+fdClr :: Fd -> Ptr CFdSet -> IO ()
+fdClr (Fd fd) fdset = c_fdClr fd fdset
 
 foreign import ccall unsafe "hsFD_ISSET"
-  fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
+  c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
+
+fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
+fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
 
 foreign import ccall unsafe "hsFD_SET"
-  fdSet :: Fd -> Ptr CFdSet -> IO ()
+  c_fdSet :: CInt -> Ptr CFdSet -> IO ()
+
+fdSet :: Fd -> Ptr CFdSet -> IO ()
+fdSet (Fd fd) fdset = c_fdSet fd fdset
 
 foreign import ccall unsafe "hsFD_ZERO"
   fdZero :: Ptr CFdSet -> IO ()