From 40fe562f6d01f6076bf00a267dd24f57b45a1933 Mon Sep 17 00:00:00 2001 From: Bryan O'Sullivan Date: Fri, 26 Nov 2010 23:28:10 +0000 Subject: [PATCH] Fix #4514 - IO manager deadlock * The public APIs for threadWaitRead and threadWaitWrite remain unchanged, and now throw an IOError if a file descriptor is closed behind their backs. This behaviour is documented. * The GHC.Conc API is extended to add a closeFd function, the behaviour of which is documented. * Behind the scenes, we add a new evtClose event, which is used only when one thread closes a file descriptor that other threads are blocking on. * Both base's IO code and network use the new closeFd function. --- Control/Concurrent.hs | 25 +++++++++++++++++++++++++ GHC/Conc.lhs | 1 + GHC/Conc/IO.hs | 21 +++++++++++++++++++++ GHC/IO/FD.hs | 12 +++++++----- System/Event.hs | 2 +- System/Event/Internal.hs | 14 ++++++++++++-- System/Event/Manager.hs | 21 ++++++++++++--------- System/Event/Thread.hs | 31 +++++++++++++++++++++++++++++-- 8 files changed, 108 insertions(+), 19 deletions(-) diff --git a/Control/Concurrent.hs b/Control/Concurrent.hs index bdcb8de..b49f7db 100644 --- a/Control/Concurrent.hs +++ b/Control/Concurrent.hs @@ -47,6 +47,7 @@ module Control.Concurrent ( threadDelay, -- :: Int -> IO () threadWaitRead, -- :: Int -> IO () threadWaitWrite, -- :: Int -> IO () + closeFd, -- :: (Int -> IO ()) -> Int -> IO () #endif -- * Communication abstractions @@ -451,6 +452,9 @@ unsafeResult = either Exception.throwIO return -- | Block the current thread until data is available to read on the -- given file descriptor (GHC only). +-- +-- This will throw an 'IOError' if the file descriptor was closed +-- while this thread was blocked. threadWaitRead :: Fd -> IO () threadWaitRead fd #ifdef mingw32_HOST_OS @@ -471,6 +475,9 @@ threadWaitRead fd -- | Block the current thread until data can be written to the -- given file descriptor (GHC only). +-- +-- This will throw an 'IOError' if the file descriptor was closed +-- while this thread was blocked. threadWaitWrite :: Fd -> IO () threadWaitWrite fd #ifdef mingw32_HOST_OS @@ -480,6 +487,24 @@ threadWaitWrite fd = GHC.Conc.threadWaitWrite fd #endif +-- | Close a file descriptor in a concurrency-safe way (GHC only). If +-- you are using 'threadWaitRead' or 'threadWaitWrite' to perform +-- blocking I\/O, you /must/ use this function to close file +-- descriptors, or blocked threads may not be woken. +-- +-- Any threads that are blocked on the file descriptor via +-- 'threadWaitRead' or 'threadWaitWrite' will be unblocked by having +-- IO exceptions thrown. +closeFd :: (Fd -> IO ()) -- ^ Low-level action that performs the real close. + -> Fd -- ^ File descriptor to close. + -> IO () +closeFd close fd +#ifdef mingw32_HOST_OS + = close fd +#else + = GHC.Conc.closeFd close fd +#endif + #ifdef mingw32_HOST_OS foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool diff --git a/GHC/Conc.lhs b/GHC/Conc.lhs index 47b6ef6..a7f6902 100644 --- a/GHC/Conc.lhs +++ b/GHC/Conc.lhs @@ -52,6 +52,7 @@ module GHC.Conc , registerDelay -- :: Int -> IO (TVar Bool) , threadWaitRead -- :: Int -> IO () , threadWaitWrite -- :: Int -> IO () + , closeFd -- :: (Int -> IO ()) -> Int -> IO () -- * TVars , STM(..) diff --git a/GHC/Conc/IO.hs b/GHC/Conc/IO.hs index 590e3ab..5a5f0b2 100644 --- a/GHC/Conc/IO.hs +++ b/GHC/Conc/IO.hs @@ -31,6 +31,7 @@ module GHC.Conc.IO , registerDelay -- :: Int -> IO (TVar Bool) , threadWaitRead -- :: Int -> IO () , threadWaitWrite -- :: Int -> IO () + , closeFd -- :: (Int -> IO ()) -> Int -> IO () #ifdef mingw32_HOST_OS , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) @@ -82,6 +83,9 @@ threadWaitRead fd -- | Block the current thread until data can be written to the -- given file descriptor (GHC only). +-- +-- This will throw an 'IOError' if the file descriptor was closed +-- while this thread was blocked. threadWaitWrite :: Fd -> IO () threadWaitWrite fd #ifndef mingw32_HOST_OS @@ -92,6 +96,23 @@ threadWaitWrite fd case waitWrite# fd# s of { s' -> (# s', () #) }} +-- | Close a file descriptor in a concurrency-safe way (GHC only). If +-- you are using 'threadWaitRead' or 'threadWaitWrite' to perform +-- blocking I\/O, you /must/ use this function to close file +-- descriptors, or blocked threads may not be woken. +-- +-- Any threads that are blocked on the file descriptor via +-- 'threadWaitRead' or 'threadWaitWrite' will be unblocked by having +-- IO exceptions thrown. +closeFd :: (Fd -> IO ()) -- ^ Low-level action that performs the real close. + -> Fd -- ^ File descriptor to close. + -> IO () +closeFd close fd +#ifndef mingw32_HOST_OS + | threaded = Event.closeFd close fd +#endif + | otherwise = close fd + -- | Suspends the current thread for a given number of microseconds -- (GHC only). -- diff --git a/GHC/IO/FD.hs b/GHC/IO/FD.hs index d873a4e..17362dc 100644 --- a/GHC/IO/FD.hs +++ b/GHC/IO/FD.hs @@ -281,13 +281,15 @@ close fd = #ifndef mingw32_HOST_OS (flip finally) (release fd) $ do #endif - throwErrnoIfMinus1Retry_ "GHC.IO.FD.close" $ + let closer realFd = + throwErrnoIfMinus1Retry_ "GHC.IO.FD.close" $ #ifdef mingw32_HOST_OS - if fdIsSocket fd then - c_closesocket (fdFD fd) - else + if fdIsSocket fd then + c_closesocket (fromIntegral realFd) + else #endif - c_close (fdFD fd) + c_close (fromIntegral realFd) + closeFd closer (fromIntegral (fdFD fd)) release :: FD -> IO () #ifdef mingw32_HOST_OS diff --git a/System/Event.hs b/System/Event.hs index f8537ca..126107a 100644 --- a/System/Event.hs +++ b/System/Event.hs @@ -22,7 +22,7 @@ module System.Event , registerFd_ , unregisterFd , unregisterFd_ - , fdWasClosed + , closeFd -- * Registering interest in timeout events , TimeoutCallback diff --git a/System/Event/Internal.hs b/System/Event/Internal.hs index cbe961d..545ff6f 100644 --- a/System/Event/Internal.hs +++ b/System/Event/Internal.hs @@ -12,6 +12,7 @@ module System.Event.Internal , Event , evtRead , evtWrite + , evtClose , eventIs -- * Timeout type , Timeout(..) @@ -29,7 +30,7 @@ import GHC.Num (Num(..)) import GHC.Show (Show(..)) import GHC.List (filter, null) --- | An I/O event. +-- | An I\/O event. newtype Event = Event Int deriving (Eq) @@ -37,20 +38,29 @@ evtNothing :: Event evtNothing = Event 0 {-# INLINE evtNothing #-} +-- | Data is available to be read. evtRead :: Event evtRead = Event 1 {-# INLINE evtRead #-} +-- | The file descriptor is ready to accept a write. evtWrite :: Event evtWrite = Event 2 {-# INLINE evtWrite #-} +-- | Another thread closed the file descriptor. +evtClose :: Event +evtClose = Event 4 +{-# INLINE evtClose #-} + eventIs :: Event -> Event -> Bool eventIs (Event a) (Event b) = a .&. b /= 0 instance Show Event where show e = '[' : (intercalate "," . filter (not . null) $ - [evtRead `so` "evtRead", evtWrite `so` "evtWrite"]) ++ "]" + [evtRead `so` "evtRead", + evtWrite `so` "evtWrite", + evtClose `so` "evtClose"]) ++ "]" where ev `so` disp | e `eventIs` ev = disp | otherwise = "" diff --git a/System/Event/Manager.hs b/System/Event/Manager.hs index 46569eb..74b1a72 100644 --- a/System/Event/Manager.hs +++ b/System/Event/Manager.hs @@ -26,7 +26,7 @@ module System.Event.Manager , registerFd , unregisterFd_ , unregisterFd - , fdWasClosed + , closeFd -- * Registering interest in timeout events , TimeoutCallback @@ -48,7 +48,7 @@ import Control.Monad ((=<<), forM_, liftM, sequence_, when) import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef, writeIORef) import Data.Maybe (Maybe(..)) -import Data.Monoid (mconcat, mempty) +import Data.Monoid (mappend, mconcat, mempty) import GHC.Base import GHC.Conc.Signal (runHandlers) import GHC.List (filter) @@ -57,7 +57,8 @@ import GHC.Real ((/), fromIntegral ) import GHC.Show (Show(..)) import System.Event.Clock (getCurrentTime) import System.Event.Control -import System.Event.Internal (Backend, Event, evtRead, evtWrite, Timeout(..)) +import System.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite, + Timeout(..)) import System.Event.Unique (Unique, UniqueSource, newSource, newUnique) import System.Posix.Types (Fd) @@ -331,15 +332,17 @@ unregisterFd mgr reg = do wake <- unregisterFd_ mgr reg when wake $ wakeManager mgr --- | Notify the event manager that a file descriptor has been closed. -fdWasClosed :: EventManager -> Fd -> IO () -fdWasClosed mgr fd = - modifyMVar_ (emFds mgr) $ \oldMap -> +-- | Close a file descriptor in a race-safe way. +closeFd :: EventManager -> (Fd -> IO ()) -> Fd -> IO () +closeFd mgr close fd = do + fds <- modifyMVar (emFds mgr) $ \oldMap -> do + close fd case IM.delete (fromIntegral fd) oldMap of - (Nothing, _) -> return oldMap + (Nothing, _) -> return (oldMap, []) (Just fds, !newMap) -> do when (eventsOf fds /= mempty) $ wakeManager mgr - return newMap + return (newMap, fds) + forM_ fds $ \(FdData reg ev cb) -> cb reg (ev `mappend` evtClose) ------------------------------------------------------------------------ -- Registering interest in timeout events diff --git a/System/Event/Thread.hs b/System/Event/Thread.hs index ae3a71a..990bae3 100644 --- a/System/Event/Thread.hs +++ b/System/Event/Thread.hs @@ -5,20 +5,26 @@ module System.Event.Thread ensureIOManagerIsRunning , threadWaitRead , threadWaitWrite + , closeFd , threadDelay , registerDelay ) where import Data.IORef (IORef, newIORef, readIORef, writeIORef) import Data.Maybe (Maybe(..)) +import Foreign.C.Error (eBADF, errnoToIOError) import Foreign.Ptr (Ptr) import GHC.Base import GHC.Conc.Sync (TVar, ThreadId, ThreadStatus(..), atomically, forkIO, labelThread, modifyMVar_, newTVar, sharedCAF, threadStatus, writeTVar) +import GHC.IO.Exception (ioError) import GHC.MVar (MVar, newEmptyMVar, newMVar, putMVar, takeMVar) +import GHC.Real (fromIntegral) +import System.Event.Internal (eventIs, evtClose) import System.Event.Manager (Event, EventManager, evtRead, evtWrite, loop, new, registerFd, unregisterFd_, registerTimeout) +import qualified System.Event.Manager as M import System.IO.Unsafe (unsafePerformIO) import System.Posix.Types (Fd) @@ -47,22 +53,43 @@ registerDelay usecs = do -- | Block the current thread until data is available to read from the -- given file descriptor. +-- +-- This will throw an 'IOError' if the file descriptor was closed +-- while this thread is blocked. threadWaitRead :: Fd -> IO () threadWaitRead = threadWait evtRead {-# INLINE threadWaitRead #-} -- | Block the current thread until the given file descriptor can -- accept data to write. +-- +-- This will throw an 'IOError' if the file descriptor was closed +-- while this thread is blocked. threadWaitWrite :: Fd -> IO () threadWaitWrite = threadWait evtWrite {-# INLINE threadWaitWrite #-} +-- | Close a file descriptor in a concurrency-safe way. +-- +-- Any threads that are blocked on the file descriptor via +-- 'threadWaitRead' or 'threadWaitWrite' will be unblocked by having +-- IO exceptions thrown. +closeFd :: (Fd -> IO ()) -- ^ Action that performs the close. + -> Fd -- ^ File descriptor to close. + -> IO () +closeFd close fd = do + Just mgr <- readIORef eventManager + M.closeFd mgr close fd + threadWait :: Event -> Fd -> IO () threadWait evt fd = do m <- newEmptyMVar Just mgr <- readIORef eventManager - _ <- registerFd mgr (\reg _ -> unregisterFd_ mgr reg >> putMVar m ()) fd evt - takeMVar m + _ <- registerFd mgr (\reg e -> unregisterFd_ mgr reg >> putMVar m e) fd evt + evt' <- takeMVar m + if evt' `eventIs` evtClose + then ioError $ errnoToIOError "threadWait" eBADF Nothing Nothing + else return () foreign import ccall unsafe "getOrSetSystemEventThreadEventManagerStore" getOrSetSystemEventThreadEventManagerStore :: Ptr a -> IO (Ptr a) -- 1.7.10.4