* The public APIs for threadWaitRead and threadWaitWrite remain unchanged,
and now throw an IOError if a file descriptor is closed behind their
backs. This behaviour is documented.
* The GHC.Conc API is extended to add a closeFd function, the behaviour
of which is documented.
* Behind the scenes, we add a new evtClose event, which is used only when
one thread closes a file descriptor that other threads are blocking on.
* Both base's IO code and network use the new closeFd function.
threadDelay, -- :: Int -> IO ()
threadWaitRead, -- :: Int -> IO ()
threadWaitWrite, -- :: Int -> IO ()
threadDelay, -- :: Int -> IO ()
threadWaitRead, -- :: Int -> IO ()
threadWaitWrite, -- :: Int -> IO ()
+ closeFd, -- :: (Int -> IO ()) -> Int -> IO ()
#endif
-- * Communication abstractions
#endif
-- * Communication abstractions
-- | Block the current thread until data is available to read on the
-- given file descriptor (GHC only).
-- | Block the current thread until data is available to read on the
-- given file descriptor (GHC only).
+--
+-- This will throw an 'IOError' if the file descriptor was closed
+-- while this thread was blocked.
threadWaitRead :: Fd -> IO ()
threadWaitRead fd
#ifdef mingw32_HOST_OS
threadWaitRead :: Fd -> IO ()
threadWaitRead fd
#ifdef mingw32_HOST_OS
-- | Block the current thread until data can be written to the
-- given file descriptor (GHC only).
-- | Block the current thread until data can be written to the
-- given file descriptor (GHC only).
+--
+-- This will throw an 'IOError' if the file descriptor was closed
+-- while this thread was blocked.
threadWaitWrite :: Fd -> IO ()
threadWaitWrite fd
#ifdef mingw32_HOST_OS
threadWaitWrite :: Fd -> IO ()
threadWaitWrite fd
#ifdef mingw32_HOST_OS
= GHC.Conc.threadWaitWrite fd
#endif
= GHC.Conc.threadWaitWrite fd
#endif
+-- | Close a file descriptor in a concurrency-safe way (GHC only). If
+-- you are using 'threadWaitRead' or 'threadWaitWrite' to perform
+-- blocking I\/O, you /must/ use this function to close file
+-- descriptors, or blocked threads may not be woken.
+--
+-- Any threads that are blocked on the file descriptor via
+-- 'threadWaitRead' or 'threadWaitWrite' will be unblocked by having
+-- IO exceptions thrown.
+closeFd :: (Fd -> IO ()) -- ^ Low-level action that performs the real close.
+ -> Fd -- ^ File descriptor to close.
+ -> IO ()
+closeFd close fd
+#ifdef mingw32_HOST_OS
+ = close fd
+#else
+ = GHC.Conc.closeFd close fd
+#endif
+
#ifdef mingw32_HOST_OS
foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
#ifdef mingw32_HOST_OS
foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
, registerDelay -- :: Int -> IO (TVar Bool)
, threadWaitRead -- :: Int -> IO ()
, threadWaitWrite -- :: Int -> IO ()
, registerDelay -- :: Int -> IO (TVar Bool)
, threadWaitRead -- :: Int -> IO ()
, threadWaitWrite -- :: Int -> IO ()
+ , closeFd -- :: (Int -> IO ()) -> Int -> IO ()
, registerDelay -- :: Int -> IO (TVar Bool)
, threadWaitRead -- :: Int -> IO ()
, threadWaitWrite -- :: Int -> IO ()
, registerDelay -- :: Int -> IO (TVar Bool)
, threadWaitRead -- :: Int -> IO ()
, threadWaitWrite -- :: Int -> IO ()
+ , closeFd -- :: (Int -> IO ()) -> Int -> IO ()
#ifdef mingw32_HOST_OS
, asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
#ifdef mingw32_HOST_OS
, asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
-- | Block the current thread until data can be written to the
-- given file descriptor (GHC only).
-- | Block the current thread until data can be written to the
-- given file descriptor (GHC only).
+--
+-- This will throw an 'IOError' if the file descriptor was closed
+-- while this thread was blocked.
threadWaitWrite :: Fd -> IO ()
threadWaitWrite fd
#ifndef mingw32_HOST_OS
threadWaitWrite :: Fd -> IO ()
threadWaitWrite fd
#ifndef mingw32_HOST_OS
case waitWrite# fd# s of { s' -> (# s', () #)
}}
case waitWrite# fd# s of { s' -> (# s', () #)
}}
+-- | Close a file descriptor in a concurrency-safe way (GHC only). If
+-- you are using 'threadWaitRead' or 'threadWaitWrite' to perform
+-- blocking I\/O, you /must/ use this function to close file
+-- descriptors, or blocked threads may not be woken.
+--
+-- Any threads that are blocked on the file descriptor via
+-- 'threadWaitRead' or 'threadWaitWrite' will be unblocked by having
+-- IO exceptions thrown.
+closeFd :: (Fd -> IO ()) -- ^ Low-level action that performs the real close.
+ -> Fd -- ^ File descriptor to close.
+ -> IO ()
+closeFd close fd
+#ifndef mingw32_HOST_OS
+ | threaded = Event.closeFd close fd
+#endif
+ | otherwise = close fd
+
-- | Suspends the current thread for a given number of microseconds
-- (GHC only).
--
-- | Suspends the current thread for a given number of microseconds
-- (GHC only).
--
#ifndef mingw32_HOST_OS
(flip finally) (release fd) $ do
#endif
#ifndef mingw32_HOST_OS
(flip finally) (release fd) $ do
#endif
- throwErrnoIfMinus1Retry_ "GHC.IO.FD.close" $
+ let closer realFd =
+ throwErrnoIfMinus1Retry_ "GHC.IO.FD.close" $
- if fdIsSocket fd then
- c_closesocket (fdFD fd)
- else
+ if fdIsSocket fd then
+ c_closesocket (fromIntegral realFd)
+ else
+ c_close (fromIntegral realFd)
+ closeFd closer (fromIntegral (fdFD fd))
release :: FD -> IO ()
#ifdef mingw32_HOST_OS
release :: FD -> IO ()
#ifdef mingw32_HOST_OS
, registerFd_
, unregisterFd
, unregisterFd_
, registerFd_
, unregisterFd
, unregisterFd_
-- * Registering interest in timeout events
, TimeoutCallback
-- * Registering interest in timeout events
, TimeoutCallback
, Event
, evtRead
, evtWrite
, Event
, evtRead
, evtWrite
, eventIs
-- * Timeout type
, Timeout(..)
, eventIs
-- * Timeout type
, Timeout(..)
import GHC.Show (Show(..))
import GHC.List (filter, null)
import GHC.Show (Show(..))
import GHC.List (filter, null)
newtype Event = Event Int
deriving (Eq)
newtype Event = Event Int
deriving (Eq)
evtNothing = Event 0
{-# INLINE evtNothing #-}
evtNothing = Event 0
{-# INLINE evtNothing #-}
+-- | Data is available to be read.
evtRead :: Event
evtRead = Event 1
{-# INLINE evtRead #-}
evtRead :: Event
evtRead = Event 1
{-# INLINE evtRead #-}
+-- | The file descriptor is ready to accept a write.
evtWrite :: Event
evtWrite = Event 2
{-# INLINE evtWrite #-}
evtWrite :: Event
evtWrite = Event 2
{-# INLINE evtWrite #-}
+-- | Another thread closed the file descriptor.
+evtClose :: Event
+evtClose = Event 4
+{-# INLINE evtClose #-}
+
eventIs :: Event -> Event -> Bool
eventIs (Event a) (Event b) = a .&. b /= 0
instance Show Event where
show e = '[' : (intercalate "," . filter (not . null) $
eventIs :: Event -> Event -> Bool
eventIs (Event a) (Event b) = a .&. b /= 0
instance Show Event where
show e = '[' : (intercalate "," . filter (not . null) $
- [evtRead `so` "evtRead", evtWrite `so` "evtWrite"]) ++ "]"
+ [evtRead `so` "evtRead",
+ evtWrite `so` "evtWrite",
+ evtClose `so` "evtClose"]) ++ "]"
where ev `so` disp | e `eventIs` ev = disp
| otherwise = ""
where ev `so` disp | e `eventIs` ev = disp
| otherwise = ""
, registerFd
, unregisterFd_
, unregisterFd
, registerFd
, unregisterFd_
, unregisterFd
-- * Registering interest in timeout events
, TimeoutCallback
-- * Registering interest in timeout events
, TimeoutCallback
import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
writeIORef)
import Data.Maybe (Maybe(..))
import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
writeIORef)
import Data.Maybe (Maybe(..))
-import Data.Monoid (mconcat, mempty)
+import Data.Monoid (mappend, mconcat, mempty)
import GHC.Base
import GHC.Conc.Signal (runHandlers)
import GHC.List (filter)
import GHC.Base
import GHC.Conc.Signal (runHandlers)
import GHC.List (filter)
import GHC.Show (Show(..))
import System.Event.Clock (getCurrentTime)
import System.Event.Control
import GHC.Show (Show(..))
import System.Event.Clock (getCurrentTime)
import System.Event.Control
-import System.Event.Internal (Backend, Event, evtRead, evtWrite, Timeout(..))
+import System.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite,
+ Timeout(..))
import System.Event.Unique (Unique, UniqueSource, newSource, newUnique)
import System.Posix.Types (Fd)
import System.Event.Unique (Unique, UniqueSource, newSource, newUnique)
import System.Posix.Types (Fd)
wake <- unregisterFd_ mgr reg
when wake $ wakeManager mgr
wake <- unregisterFd_ mgr reg
when wake $ wakeManager mgr
--- | Notify the event manager that a file descriptor has been closed.
-fdWasClosed :: EventManager -> Fd -> IO ()
-fdWasClosed mgr fd =
- modifyMVar_ (emFds mgr) $ \oldMap ->
+-- | Close a file descriptor in a race-safe way.
+closeFd :: EventManager -> (Fd -> IO ()) -> Fd -> IO ()
+closeFd mgr close fd = do
+ fds <- modifyMVar (emFds mgr) $ \oldMap -> do
+ close fd
case IM.delete (fromIntegral fd) oldMap of
case IM.delete (fromIntegral fd) oldMap of
- (Nothing, _) -> return oldMap
+ (Nothing, _) -> return (oldMap, [])
(Just fds, !newMap) -> do
when (eventsOf fds /= mempty) $ wakeManager mgr
(Just fds, !newMap) -> do
when (eventsOf fds /= mempty) $ wakeManager mgr
+ return (newMap, fds)
+ forM_ fds $ \(FdData reg ev cb) -> cb reg (ev `mappend` evtClose)
------------------------------------------------------------------------
-- Registering interest in timeout events
------------------------------------------------------------------------
-- Registering interest in timeout events
ensureIOManagerIsRunning
, threadWaitRead
, threadWaitWrite
ensureIOManagerIsRunning
, threadWaitRead
, threadWaitWrite
, threadDelay
, registerDelay
) where
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Maybe (Maybe(..))
, threadDelay
, registerDelay
) where
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Maybe (Maybe(..))
+import Foreign.C.Error (eBADF, errnoToIOError)
import Foreign.Ptr (Ptr)
import GHC.Base
import GHC.Conc.Sync (TVar, ThreadId, ThreadStatus(..), atomically, forkIO,
labelThread, modifyMVar_, newTVar, sharedCAF,
threadStatus, writeTVar)
import Foreign.Ptr (Ptr)
import GHC.Base
import GHC.Conc.Sync (TVar, ThreadId, ThreadStatus(..), atomically, forkIO,
labelThread, modifyMVar_, newTVar, sharedCAF,
threadStatus, writeTVar)
+import GHC.IO.Exception (ioError)
import GHC.MVar (MVar, newEmptyMVar, newMVar, putMVar, takeMVar)
import GHC.MVar (MVar, newEmptyMVar, newMVar, putMVar, takeMVar)
+import GHC.Real (fromIntegral)
+import System.Event.Internal (eventIs, evtClose)
import System.Event.Manager (Event, EventManager, evtRead, evtWrite, loop,
new, registerFd, unregisterFd_, registerTimeout)
import System.Event.Manager (Event, EventManager, evtRead, evtWrite, loop,
new, registerFd, unregisterFd_, registerTimeout)
+import qualified System.Event.Manager as M
import System.IO.Unsafe (unsafePerformIO)
import System.Posix.Types (Fd)
import System.IO.Unsafe (unsafePerformIO)
import System.Posix.Types (Fd)
-- | Block the current thread until data is available to read from the
-- given file descriptor.
-- | Block the current thread until data is available to read from the
-- given file descriptor.
+--
+-- This will throw an 'IOError' if the file descriptor was closed
+-- while this thread is blocked.
threadWaitRead :: Fd -> IO ()
threadWaitRead = threadWait evtRead
{-# INLINE threadWaitRead #-}
-- | Block the current thread until the given file descriptor can
-- accept data to write.
threadWaitRead :: Fd -> IO ()
threadWaitRead = threadWait evtRead
{-# INLINE threadWaitRead #-}
-- | Block the current thread until the given file descriptor can
-- accept data to write.
+--
+-- This will throw an 'IOError' if the file descriptor was closed
+-- while this thread is blocked.
threadWaitWrite :: Fd -> IO ()
threadWaitWrite = threadWait evtWrite
{-# INLINE threadWaitWrite #-}
threadWaitWrite :: Fd -> IO ()
threadWaitWrite = threadWait evtWrite
{-# INLINE threadWaitWrite #-}
+-- | Close a file descriptor in a concurrency-safe way.
+--
+-- Any threads that are blocked on the file descriptor via
+-- 'threadWaitRead' or 'threadWaitWrite' will be unblocked by having
+-- IO exceptions thrown.
+closeFd :: (Fd -> IO ()) -- ^ Action that performs the close.
+ -> Fd -- ^ File descriptor to close.
+ -> IO ()
+closeFd close fd = do
+ Just mgr <- readIORef eventManager
+ M.closeFd mgr close fd
+
threadWait :: Event -> Fd -> IO ()
threadWait evt fd = do
m <- newEmptyMVar
Just mgr <- readIORef eventManager
threadWait :: Event -> Fd -> IO ()
threadWait evt fd = do
m <- newEmptyMVar
Just mgr <- readIORef eventManager
- _ <- registerFd mgr (\reg _ -> unregisterFd_ mgr reg >> putMVar m ()) fd evt
- takeMVar m
+ _ <- registerFd mgr (\reg e -> unregisterFd_ mgr reg >> putMVar m e) fd evt
+ evt' <- takeMVar m
+ if evt' `eventIs` evtClose
+ then ioError $ errnoToIOError "threadWait" eBADF Nothing Nothing
+ else return ()
foreign import ccall unsafe "getOrSetSystemEventThreadEventManagerStore"
getOrSetSystemEventThreadEventManagerStore :: Ptr a -> IO (Ptr a)
foreign import ccall unsafe "getOrSetSystemEventThreadEventManagerStore"
getOrSetSystemEventThreadEventManagerStore :: Ptr a -> IO (Ptr a)