( ThreadId(..)
-- Forking and suchlike
+ , forkIO -- :: IO a -> IO ThreadId
+ , childHandler -- :: Exception -> IO ()
, myThreadId -- :: IO ThreadId
, killThread -- :: ThreadId -> IO ()
, throwTo -- :: ThreadId -> Exception -> IO ()
-- Waiting
, threadDelay -- :: Int -> IO ()
+ , registerDelay -- :: Int -> IO (TVar Bool)
, threadWaitRead -- :: Int -> IO ()
, threadWaitWrite -- :: Int -> IO ()
, catchSTM -- :: STM a -> (Exception -> STM a) -> STM a
, TVar -- abstract
, newTVar -- :: a -> STM (TVar a)
+ , newTVarIO -- :: a -> STM (TVar a)
, readTVar -- :: TVar a -> STM a
, writeTVar -- :: a -> TVar a -> STM ()
, unsafeIOToSTM -- :: IO a -> STM a
import Foreign
import Foreign.C
+#ifndef __HADDOCK__
+import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
+#endif
+
import Data.Maybe
import GHC.Base
import GHC.Num ( Num(..) )
import GHC.Real ( fromIntegral, quot )
import GHC.Base ( Int(..) )
-import GHC.Exception ( Exception(..), AsyncException(..) )
+import GHC.Exception ( catchException, Exception(..), AsyncException(..) )
import GHC.Pack ( packCString# )
import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) )
import GHC.STRef
it defines 'ThreadId' as a synonym for ().
-}
---forkIO has now been hoisted out into the Concurrent library.
+{- |
+This sparks off a new thread to run the 'IO' computation passed as the
+first argument, and returns the 'ThreadId' of the newly created
+thread.
+
+The new thread will be a lightweight thread; if you want to use a foreign
+library that uses thread-local storage, use 'forkOS' instead.
+-}
+forkIO :: IO () -> IO ThreadId
+forkIO action = IO $ \ s ->
+ case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
+ where
+ action_plus = catchException action childHandler
+
+childHandler :: Exception -> IO ()
+childHandler err = catchException (real_handler err) childHandler
+
+real_handler :: Exception -> IO ()
+real_handler ex =
+ case ex of
+ -- ignore thread GC and killThread exceptions:
+ BlockedOnDeadMVar -> return ()
+ BlockedIndefinitely -> return ()
+ AsyncException ThreadKilled -> return ()
+
+ -- report all others:
+ AsyncException StackOverflow -> reportStackOverflow
+ other -> reportError other
{- | 'killThread' terminates the given thread (GHC only).
Any work already done by the thread isn\'t
thread has received the exception. This is a useful property to know
when dealing with race conditions: eg. if there are two threads that
can kill each other, it is guaranteed that only one of the threads
-will get to kill the other. -}
+will get to kill the other.
+
+If the target thread is currently making a foreign call, then the
+exception will not be raised (and hence 'throwTo' will not return)
+until the call has completed. This is the case regardless of whether
+the call is inside a 'block' or not.
+ -}
throwTo :: ThreadId -> Exception -> IO ()
throwTo (ThreadId id) ex = IO $ \ s ->
case (killThread# id ex s) of s1 -> (# s1, () #)
case newTVar# val s1# of
(# s2#, tvar# #) -> (# s2#, TVar tvar# #)
+-- |@IO@ version of 'newTVar'. This is useful for creating top-level
+-- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
+-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
+-- possible.
+newTVarIO :: a -> IO (TVar a)
+newTVarIO val = IO $ \s1# ->
+ case newTVar# val s1# of
+ (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
+
-- |Return the current value stored in a TVar
readTVar :: TVar a -> STM a
readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
case delay# time# s of { s -> (# s, () #)
}}
+registerDelay usecs
+#ifndef mingw32_HOST_OS
+ | threaded = waitForDelayEventSTM usecs
+ | otherwise = error "registerDelay: requires -threaded"
+#else
+ = error "registerDelay: not currently supported on Windows"
+#endif
+
-- On Windows, we just make a safe call to 'Sleep' to implement threadDelay.
#ifdef mingw32_HOST_OS
foreign import stdcall safe "Sleep" c_Sleep :: CInt -> IO ()
| Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
data DelayReq
- = Delay {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
+ = Delay {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
+ | DelaySTM {-# UNPACK #-} !Int {-# UNPACK #-} !(TVar Bool)
pendingEvents :: IORef [IOReq]
pendingDelays :: IORef [DelayReq]
wr_end <- peekElemOff fds 1
writeIORef stick (fromIntegral wr_end)
c_setIOManagerPipe wr_end
- quickForkIO $ do
+ forkIO $ do
allocaBytes sizeofFdSet $ \readfds -> do
allocaBytes sizeofFdSet $ \writefds -> do
allocaBytes sizeofTimeVal $ \timeval -> do
service_loop (fromIntegral rd_end) readfds writefds timeval [] []
return ()
--- XXX: move real forkIO here from Control.Concurrent?
-quickForkIO action = IO $ \s ->
- case (fork# action s) of (# s1, id #) -> (# s1, ThreadId id #)
-
service_loop
:: Fd -- listen to this for wakeup calls
-> Ptr CFdSet
s <- peek p
if (s == 0xff)
then return ()
- else c_startSignalHandler (fromIntegral s)
+ else do handler_tbl <- peek handlers
+ sp <- peekElemOff handler_tbl (fromIntegral s)
+ forkIO (do io <- deRefStablePtr sp; io)
+ return ()
takeMVar prodding
putMVar prodding False
else return ()
putMVar prodding True
-foreign import ccall unsafe "startSignalHandler"
- c_startSignalHandler :: CInt -> IO ()
+foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
foreign import ccall "setIOManagerPipe"
c_setIOManagerPipe :: CInt -> IO ()
-- IO requests
buildFdSets maxfd readfds writefds [] = return maxfd
-buildFdSets maxfd readfds writefds (Read fd m : reqs) = do
- fdSet fd readfds
- buildFdSets (max maxfd fd) readfds writefds reqs
-buildFdSets maxfd readfds writefds (Write fd m : reqs) = do
- fdSet fd writefds
- buildFdSets (max maxfd fd) readfds writefds reqs
+buildFdSets maxfd readfds writefds (Read fd m : reqs)
+ | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
+ | otherwise = do
+ fdSet fd readfds
+ buildFdSets (max maxfd fd) readfds writefds reqs
+buildFdSets maxfd readfds writefds (Write fd m : reqs)
+ | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
+ | otherwise = do
+ fdSet fd writefds
+ buildFdSets (max maxfd fd) readfds writefds reqs
completeRequests [] _ _ reqs' = return reqs'
completeRequests (Read fd m : reqs) readfds writefds reqs' = do
prodServiceThread
takeMVar m
+-- Delays for use in STM
+waitForDelayEventSTM :: Int -> IO (TVar Bool)
+waitForDelayEventSTM usecs = do
+ t <- atomically $ newTVar False
+ now <- getTicksOfDay
+ let target = now + usecs `quot` tick_usecs
+ atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
+ prodServiceThread
+ return t
+
-- Walk the queue of pending delays, waking up any that have passed
-- and return the smallest delay to wait for. The queue of pending
-- delays is kept ordered.
getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
getDelay now ptimeval [] = return ([],nullPtr)
-getDelay now ptimeval all@(Delay time m : rest)
- | now >= time = do
+getDelay now ptimeval all@(d : rest)
+ = case d of
+ Delay time m | now >= time -> do
putMVar m ()
getDelay now ptimeval rest
- | otherwise = do
- setTimevalTicks ptimeval (time - now)
+ DelaySTM time t | now >= time -> do
+ atomically $ writeTVar t True
+ getDelay now ptimeval rest
+ _otherwise -> do
+ setTimevalTicks ptimeval (delayTime d - now)
return (all,ptimeval)
insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
-insertDelay d@(Delay time m) [] = [d]
-insertDelay d1@(Delay time m) ds@(d2@(Delay time' m') : rest)
- | time <= time' = d1 : ds
- | otherwise = d2 : insertDelay d1 rest
+insertDelay d [] = [d]
+insertDelay d1 ds@(d2 : rest)
+ | delayTime d1 <= delayTime d2 = d1 : ds
+ | otherwise = d2 : insertDelay d1 rest
+
+delayTime (Delay t _) = t
+delayTime (DelaySTM t _) = t
type Ticks = Int
tick_freq = 50 :: Ticks -- accuracy of threadDelay (ticks per sec)
c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
-> IO CInt
+foreign import ccall unsafe "hsFD_SETSIZE"
+ fD_SETSIZE :: Fd
+
foreign import ccall unsafe "hsFD_CLR"
fdClr :: Fd -> Ptr CFdSet -> IO ()