threadDelay, -- :: Int -> IO ()
threadWaitRead, -- :: Int -> IO ()
threadWaitWrite, -- :: Int -> IO ()
+ closeFd, -- :: (Int -> IO ()) -> Int -> IO ()
#endif
-- * Communication abstractions
-- | Block the current thread until data is available to read on the
-- given file descriptor (GHC only).
+--
+-- This will throw an 'IOError' if the file descriptor was closed
+-- while this thread was blocked.
threadWaitRead :: Fd -> IO ()
threadWaitRead fd
#ifdef mingw32_HOST_OS
-- | Block the current thread until data can be written to the
-- given file descriptor (GHC only).
+--
+-- This will throw an 'IOError' if the file descriptor was closed
+-- while this thread was blocked.
threadWaitWrite :: Fd -> IO ()
threadWaitWrite fd
#ifdef mingw32_HOST_OS
= GHC.Conc.threadWaitWrite fd
#endif
+-- | Close a file descriptor in a concurrency-safe way (GHC only). If
+-- you are using 'threadWaitRead' or 'threadWaitWrite' to perform
+-- blocking I\/O, you /must/ use this function to close file
+-- descriptors, or blocked threads may not be woken.
+--
+-- Any threads that are blocked on the file descriptor via
+-- 'threadWaitRead' or 'threadWaitWrite' will be unblocked by having
+-- IO exceptions thrown.
+closeFd :: (Fd -> IO ()) -- ^ Low-level action that performs the real close.
+ -> Fd -- ^ File descriptor to close.
+ -> IO ()
+closeFd close fd
+#ifdef mingw32_HOST_OS
+ = close fd
+#else
+ = GHC.Conc.closeFd close fd
+#endif
+
#ifdef mingw32_HOST_OS
foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
, registerDelay -- :: Int -> IO (TVar Bool)
, threadWaitRead -- :: Int -> IO ()
, threadWaitWrite -- :: Int -> IO ()
+ , closeFd -- :: (Int -> IO ()) -> Int -> IO ()
-- * TVars
, STM(..)
, registerDelay -- :: Int -> IO (TVar Bool)
, threadWaitRead -- :: Int -> IO ()
, threadWaitWrite -- :: Int -> IO ()
+ , closeFd -- :: (Int -> IO ()) -> Int -> IO ()
#ifdef mingw32_HOST_OS
, asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
-- | Block the current thread until data can be written to the
-- given file descriptor (GHC only).
+--
+-- This will throw an 'IOError' if the file descriptor was closed
+-- while this thread was blocked.
threadWaitWrite :: Fd -> IO ()
threadWaitWrite fd
#ifndef mingw32_HOST_OS
case waitWrite# fd# s of { s' -> (# s', () #)
}}
+-- | Close a file descriptor in a concurrency-safe way (GHC only). If
+-- you are using 'threadWaitRead' or 'threadWaitWrite' to perform
+-- blocking I\/O, you /must/ use this function to close file
+-- descriptors, or blocked threads may not be woken.
+--
+-- Any threads that are blocked on the file descriptor via
+-- 'threadWaitRead' or 'threadWaitWrite' will be unblocked by having
+-- IO exceptions thrown.
+closeFd :: (Fd -> IO ()) -- ^ Low-level action that performs the real close.
+ -> Fd -- ^ File descriptor to close.
+ -> IO ()
+closeFd close fd
+#ifndef mingw32_HOST_OS
+ | threaded = Event.closeFd close fd
+#endif
+ | otherwise = close fd
+
-- | Suspends the current thread for a given number of microseconds
-- (GHC only).
--
#ifndef mingw32_HOST_OS
(flip finally) (release fd) $ do
#endif
- throwErrnoIfMinus1Retry_ "GHC.IO.FD.close" $
+ let closer realFd =
+ throwErrnoIfMinus1Retry_ "GHC.IO.FD.close" $
#ifdef mingw32_HOST_OS
- if fdIsSocket fd then
- c_closesocket (fdFD fd)
- else
+ if fdIsSocket fd then
+ c_closesocket (fromIntegral realFd)
+ else
#endif
- c_close (fdFD fd)
+ c_close (fromIntegral realFd)
+ closeFd closer (fromIntegral (fdFD fd))
release :: FD -> IO ()
#ifdef mingw32_HOST_OS
, registerFd_
, unregisterFd
, unregisterFd_
- , fdWasClosed
+ , closeFd
-- * Registering interest in timeout events
, TimeoutCallback
, Event
, evtRead
, evtWrite
+ , evtClose
, eventIs
-- * Timeout type
, Timeout(..)
import GHC.Show (Show(..))
import GHC.List (filter, null)
--- | An I/O event.
+-- | An I\/O event.
newtype Event = Event Int
deriving (Eq)
evtNothing = Event 0
{-# INLINE evtNothing #-}
+-- | Data is available to be read.
evtRead :: Event
evtRead = Event 1
{-# INLINE evtRead #-}
+-- | The file descriptor is ready to accept a write.
evtWrite :: Event
evtWrite = Event 2
{-# INLINE evtWrite #-}
+-- | Another thread closed the file descriptor.
+evtClose :: Event
+evtClose = Event 4
+{-# INLINE evtClose #-}
+
eventIs :: Event -> Event -> Bool
eventIs (Event a) (Event b) = a .&. b /= 0
instance Show Event where
show e = '[' : (intercalate "," . filter (not . null) $
- [evtRead `so` "evtRead", evtWrite `so` "evtWrite"]) ++ "]"
+ [evtRead `so` "evtRead",
+ evtWrite `so` "evtWrite",
+ evtClose `so` "evtClose"]) ++ "]"
where ev `so` disp | e `eventIs` ev = disp
| otherwise = ""
, registerFd
, unregisterFd_
, unregisterFd
- , fdWasClosed
+ , closeFd
-- * Registering interest in timeout events
, TimeoutCallback
import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
writeIORef)
import Data.Maybe (Maybe(..))
-import Data.Monoid (mconcat, mempty)
+import Data.Monoid (mappend, mconcat, mempty)
import GHC.Base
import GHC.Conc.Signal (runHandlers)
import GHC.List (filter)
import GHC.Show (Show(..))
import System.Event.Clock (getCurrentTime)
import System.Event.Control
-import System.Event.Internal (Backend, Event, evtRead, evtWrite, Timeout(..))
+import System.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite,
+ Timeout(..))
import System.Event.Unique (Unique, UniqueSource, newSource, newUnique)
import System.Posix.Types (Fd)
wake <- unregisterFd_ mgr reg
when wake $ wakeManager mgr
--- | Notify the event manager that a file descriptor has been closed.
-fdWasClosed :: EventManager -> Fd -> IO ()
-fdWasClosed mgr fd =
- modifyMVar_ (emFds mgr) $ \oldMap ->
+-- | Close a file descriptor in a race-safe way.
+closeFd :: EventManager -> (Fd -> IO ()) -> Fd -> IO ()
+closeFd mgr close fd = do
+ fds <- modifyMVar (emFds mgr) $ \oldMap -> do
+ close fd
case IM.delete (fromIntegral fd) oldMap of
- (Nothing, _) -> return oldMap
+ (Nothing, _) -> return (oldMap, [])
(Just fds, !newMap) -> do
when (eventsOf fds /= mempty) $ wakeManager mgr
- return newMap
+ return (newMap, fds)
+ forM_ fds $ \(FdData reg ev cb) -> cb reg (ev `mappend` evtClose)
------------------------------------------------------------------------
-- Registering interest in timeout events
ensureIOManagerIsRunning
, threadWaitRead
, threadWaitWrite
+ , closeFd
, threadDelay
, registerDelay
) where
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Maybe (Maybe(..))
+import Foreign.C.Error (eBADF, errnoToIOError)
import Foreign.Ptr (Ptr)
import GHC.Base
import GHC.Conc.Sync (TVar, ThreadId, ThreadStatus(..), atomically, forkIO,
labelThread, modifyMVar_, newTVar, sharedCAF,
threadStatus, writeTVar)
+import GHC.IO.Exception (ioError)
import GHC.MVar (MVar, newEmptyMVar, newMVar, putMVar, takeMVar)
+import GHC.Real (fromIntegral)
+import System.Event.Internal (eventIs, evtClose)
import System.Event.Manager (Event, EventManager, evtRead, evtWrite, loop,
new, registerFd, unregisterFd_, registerTimeout)
+import qualified System.Event.Manager as M
import System.IO.Unsafe (unsafePerformIO)
import System.Posix.Types (Fd)
-- | Block the current thread until data is available to read from the
-- given file descriptor.
+--
+-- This will throw an 'IOError' if the file descriptor was closed
+-- while this thread is blocked.
threadWaitRead :: Fd -> IO ()
threadWaitRead = threadWait evtRead
{-# INLINE threadWaitRead #-}
-- | Block the current thread until the given file descriptor can
-- accept data to write.
+--
+-- This will throw an 'IOError' if the file descriptor was closed
+-- while this thread is blocked.
threadWaitWrite :: Fd -> IO ()
threadWaitWrite = threadWait evtWrite
{-# INLINE threadWaitWrite #-}
+-- | Close a file descriptor in a concurrency-safe way.
+--
+-- Any threads that are blocked on the file descriptor via
+-- 'threadWaitRead' or 'threadWaitWrite' will be unblocked by having
+-- IO exceptions thrown.
+closeFd :: (Fd -> IO ()) -- ^ Action that performs the close.
+ -> Fd -- ^ File descriptor to close.
+ -> IO ()
+closeFd close fd = do
+ Just mgr <- readIORef eventManager
+ M.closeFd mgr close fd
+
threadWait :: Event -> Fd -> IO ()
threadWait evt fd = do
m <- newEmptyMVar
Just mgr <- readIORef eventManager
- _ <- registerFd mgr (\reg _ -> unregisterFd_ mgr reg >> putMVar m ()) fd evt
- takeMVar m
+ _ <- registerFd mgr (\reg e -> unregisterFd_ mgr reg >> putMVar m e) fd evt
+ evt' <- takeMVar m
+ if evt' `eventIs` evtClose
+ then ioError $ errnoToIOError "threadWait" eBADF Nothing Nothing
+ else return ()
foreign import ccall unsafe "getOrSetSystemEventThreadEventManagerStore"
getOrSetSystemEventThreadEventManagerStore :: Ptr a -> IO (Ptr a)