Better error handling in the IO manager thread
[haskell-directory.git] / GHC / Conc.lhs
index 233a686..11d78b8 100644 (file)
@@ -24,6 +24,9 @@ module GHC.Conc
        ( ThreadId(..)
 
        -- Forking and suchlike
+       , forkIO        -- :: IO a -> IO ThreadId
+       , forkOnIO      -- :: Int -> IO a -> IO ThreadId
+       , childHandler  -- :: Exception -> IO ()
        , myThreadId    -- :: IO ThreadId
        , killThread    -- :: ThreadId -> IO ()
        , throwTo       -- :: ThreadId -> Exception -> IO ()
@@ -57,6 +60,7 @@ module GHC.Conc
         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
        , TVar          -- abstract
        , newTVar       -- :: a -> STM (TVar a)
+       , newTVarIO     -- :: a -> STM (TVar a)
        , readTVar      -- :: TVar a -> STM a
        , writeTVar     -- :: a -> TVar a -> STM ()
        , unsafeIOToSTM -- :: IO a -> STM a
@@ -80,6 +84,10 @@ import System.Posix.Internals
 import Foreign
 import Foreign.C
 
+#ifndef __HADDOCK__
+import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
+#endif
+
 import Data.Maybe
 
 import GHC.Base
@@ -87,7 +95,7 @@ import GHC.IOBase
 import GHC.Num         ( Num(..) )
 import GHC.Real                ( fromIntegral, quot )
 import GHC.Base                ( Int(..) )
-import GHC.Exception    ( Exception(..), AsyncException(..) )
+import GHC.Exception    ( catchException, Exception(..), AsyncException(..) )
 import GHC.Pack                ( packCString# )
 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
 import GHC.STRef
@@ -125,7 +133,40 @@ This misfeature will hopefully be corrected at a later date.
 it defines 'ThreadId' as a synonym for ().
 -}
 
---forkIO has now been hoisted out into the Concurrent library.
+{- |
+This sparks off a new thread to run the 'IO' computation passed as the
+first argument, and returns the 'ThreadId' of the newly created
+thread.
+
+The new thread will be a lightweight thread; if you want to use a foreign
+library that uses thread-local storage, use 'forkOS' instead.
+-}
+forkIO :: IO () -> IO ThreadId
+forkIO action = IO $ \ s -> 
+   case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
+ where
+  action_plus = catchException action childHandler
+
+forkOnIO :: Int -> IO () -> IO ThreadId
+forkOnIO (I# cpu) action = IO $ \ s -> 
+   case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
+ where
+  action_plus = catchException action childHandler
+
+childHandler :: Exception -> IO ()
+childHandler err = catchException (real_handler err) childHandler
+
+real_handler :: Exception -> IO ()
+real_handler ex =
+  case ex of
+       -- ignore thread GC and killThread exceptions:
+       BlockedOnDeadMVar            -> return ()
+       BlockedIndefinitely          -> return ()
+       AsyncException ThreadKilled  -> return ()
+
+       -- report all others:
+       AsyncException StackOverflow -> reportStackOverflow
+       other       -> reportError other
 
 {- | 'killThread' terminates the given thread (GHC only).
 Any work already done by the thread isn\'t
@@ -288,6 +329,15 @@ newTVar val = STM $ \s1# ->
     case newTVar# val s1# of
         (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
 
+-- |@IO@ version of 'newTVar'.  This is useful for creating top-level
+-- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
+-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
+-- possible.
+newTVarIO :: a -> IO (TVar a)
+newTVarIO val = IO $ \s1# ->
+    case newTVar# val s1# of
+        (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
+
 -- |Return the current value stored in a TVar
 readTVar :: TVar a -> STM a
 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
@@ -334,16 +384,34 @@ newMVar value =
 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
 -- the 'MVar' is left empty.
 -- 
--- If several threads are competing to take the same 'MVar', one is chosen
--- to continue at random when the 'MVar' becomes full.
+-- There are two further important properties of 'takeMVar':
+--
+--   * 'takeMVar' is single-wakeup.  That is, if there are multiple
+--     threads blocked in 'takeMVar', and the 'MVar' becomes full,
+--     only one thread will be woken up.  The runtime guarantees that
+--     the woken thread completes its 'takeMVar' operation.
+--
+--   * When multiple threads are blocked on an 'MVar', they are
+--     woken up in FIFO order.  This is useful for providing
+--     fairness properties of abstractions built using 'MVar's.
+--
 takeMVar :: MVar a -> IO a
 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
 
 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
 -- 'putMVar' will wait until it becomes empty.
 --
--- If several threads are competing to fill the same 'MVar', one is
--- chosen to continue at random when the 'MVar' becomes empty.
+-- There are two further important properties of 'putMVar':
+--
+--   * 'putMVar' is single-wakeup.  That is, if there are multiple
+--     threads blocked in 'putMVar', and the 'MVar' becomes empty,
+--     only one thread will be woken up.  The runtime guarantees that
+--     the woken thread completes its 'putMVar' operation.
+--
+--   * When multiple threads are blocked on an 'MVar', they are
+--     woken up in FIFO order.  This is useful for providing
+--     fairness properties of abstractions built using 'MVar's.
+--
 putMVar  :: MVar a -> a -> IO ()
 putMVar (MVar mvar#) x = IO $ \ s# ->
     case putMVar# mvar# x s# of
@@ -564,17 +632,13 @@ startIOManagerThread = do
        wr_end <- peekElemOff fds 1
        writeIORef stick (fromIntegral wr_end)
        c_setIOManagerPipe wr_end
-       quickForkIO $ do
+       forkIO $ do
            allocaBytes sizeofFdSet   $ \readfds -> do
            allocaBytes sizeofFdSet   $ \writefds -> do 
            allocaBytes sizeofTimeVal $ \timeval -> do
            service_loop (fromIntegral rd_end) readfds writefds timeval [] []
        return ()
 
--- XXX: move real forkIO here from Control.Concurrent?
-quickForkIO action = IO $ \s ->
-   case (fork# action s) of (# s1, id #) -> (# s1, ThreadId id #)
-
 service_loop
    :: Fd               -- listen to this for wakeup calls
    -> Ptr CFdSet
@@ -612,32 +676,41 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
          if (res == -1)
             then do
                err <- getErrno
-               if err == eINTR
-                       then do_select delays'
-                       else return (res,delays')
+               case err of
+                 _ | err == eINTR ->  do_select delays'
+                       -- EINTR: just redo the select()
+                 _ | err == eBADF ->  return (True, delays)
+                       -- EBADF: one of the file descriptors is closed or bad,
+                       -- we don't know which one, so wake everyone up.
+                 _ | otherwise    ->  throwErrno "select"
+                       -- otherwise (ENOMEM or EINVAL) something has gone
+                       -- wrong; report the error.
             else
-               return (res,delays')
+               return (False,delays')
 
-  (res,delays') <- do_select delays
-  -- ToDo: check result
+  (wakeup_all,delays') <- do_select delays
 
-  b <- fdIsSet wakeup readfds
-  if b == 0 
-    then return ()
-    else alloca $ \p -> do 
+  if wakeup_all then return ()
+    else do
+      b <- fdIsSet wakeup readfds
+      if b == 0 
+        then return ()
+        else alloca $ \p -> do 
            c_read (fromIntegral wakeup) p 1; return ()
            s <- peek p         
            if (s == 0xff) 
              then return ()
              else do handler_tbl <- peek handlers
                      sp <- peekElemOff handler_tbl (fromIntegral s)
-                     quickForkIO (do io <- deRefStablePtr sp; io)
+                     forkIO (do io <- deRefStablePtr sp; io)
                      return ()
 
   takeMVar prodding
   putMVar prodding False
 
-  reqs' <- completeRequests reqs readfds writefds []
+  reqs' <- if wakeup_all then do wakeupAll reqs; return []
+                        else completeRequests reqs readfds writefds []
+
   service_loop wakeup readfds writefds ptimeval reqs' delays'
 
 stick :: IORef Fd
@@ -666,12 +739,16 @@ foreign import ccall "setIOManagerPipe"
 -- IO requests
 
 buildFdSets maxfd readfds writefds [] = return maxfd
-buildFdSets maxfd readfds writefds (Read fd m : reqs) = do
-  fdSet fd readfds
-  buildFdSets (max maxfd fd) readfds writefds reqs
-buildFdSets maxfd readfds writefds (Write fd m : reqs) = do
-  fdSet fd writefds
-  buildFdSets (max maxfd fd) readfds writefds reqs
+buildFdSets maxfd readfds writefds (Read fd m : reqs)
+  | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
+  | otherwise        =  do
+       fdSet fd readfds
+        buildFdSets (max maxfd fd) readfds writefds reqs
+buildFdSets maxfd readfds writefds (Write fd m : reqs)
+  | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
+  | otherwise        =  do
+       fdSet fd writefds
+       buildFdSets (max maxfd fd) readfds writefds reqs
 
 completeRequests [] _ _ reqs' = return reqs'
 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
@@ -685,6 +762,10 @@ completeRequests (Write fd m : reqs) readfds writefds reqs' = do
     then do putMVar m (); completeRequests reqs readfds writefds reqs'
     else completeRequests reqs readfds writefds (Write fd m : reqs')
 
+wakeupAll [] = return ()
+wakeupAll (Read  fd m : reqs) = do putMVar m (); wakeupAll reqs
+wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs
+
 waitForReadEvent :: Fd -> IO ()
 waitForReadEvent fd = do
   m <- newEmptyMVar
@@ -777,6 +858,9 @@ foreign import ccall safe "select"
   c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
            -> IO CInt
 
+foreign import ccall unsafe "hsFD_SETSIZE"
+  fD_SETSIZE :: Fd
+
 foreign import ccall unsafe "hsFD_CLR"
   fdClr :: Fd -> Ptr CFdSet -> IO ()