import GHC.Base
import GHC.IOBase
import GHC.Num ( Num(..) )
-import GHC.Real ( fromIntegral, quot )
+import GHC.Real ( fromIntegral, div )
#ifndef mingw32_HOST_OS
import GHC.Base ( Int(..) )
#endif
showString "ThreadId " .
showsPrec d (getThreadId (id2TSO t))
-foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> Int
+foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
id2TSO :: ThreadId -> ThreadId#
id2TSO (ThreadId t) = t
{- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
'throwTo' does not return until the exception has been raised in the
-target thread. The calling thread can thus be certain that the target
+target thread.
+The calling thread can thus be certain that the target
thread has received the exception. This is a useful property to know
when dealing with race conditions: eg. if there are two threads that
can kill each other, it is guaranteed that only one of the threads
exception will not be raised (and hence 'throwTo' will not return)
until the call has completed. This is the case regardless of whether
the call is inside a 'block' or not.
+
+Important note: the behaviour of 'throwTo' differs from that described in
+the paper "Asynchronous exceptions in Haskell"
+(<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
+In the paper, 'throwTo' is non-blocking; but the library implementation adopts
+a more synchronous design in which 'throwTo' does not return until the exception
+is received by the target thread. The trade-off is discussed in Section 8 of the paper.
+Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
+the paper).
+
+There is currently no guarantee that the exception delivered by 'throwTo' will be
+delivered at the first possible opportunity. In particular, if a thread may
+unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
+a pending 'throwTo'. This is arguably undesirable behaviour.
+
-}
throwTo :: ThreadId -> Exception -> IO ()
throwTo (ThreadId id) ex = IO $ \ s ->
-- | Suspends the current thread for a given number of microseconds
-- (GHC only).
--
--- Note that the resolution used by the Haskell runtime system's
--- internal timer is 1\/50 second, and 'threadDelay' will round its
--- argument up to the nearest multiple of this resolution.
---
-- There is no guarantee that the thread will be rescheduled promptly
-- when the delay has expired, but the thread will never continue to
-- run /earlier/ than specified.
case delay# time# s of { s -> (# s, () #)
}}
+
+-- | Set the value of returned TVar to True after a given number of
+-- microseconds. The caveats associated with threadDelay also apply.
+--
registerDelay :: Int -> IO (TVar Bool)
registerDelay usecs
| threaded = waitForDelayEventSTM usecs
waitForDelayEvent :: Int -> IO ()
waitForDelayEvent usecs = do
m <- newEmptyMVar
- now <- getTicksOfDay
- let target = now + usecs `quot` tick_usecs
+ target <- calculateTarget usecs
atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
prodServiceThread
takeMVar m
waitForDelayEventSTM :: Int -> IO (TVar Bool)
waitForDelayEventSTM usecs = do
t <- atomically $ newTVar False
- now <- getTicksOfDay
- let target = now + usecs `quot` tick_usecs
+ target <- calculateTarget usecs
atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
prodServiceThread
return t
-calculateTarget :: Int -> IO Int
+calculateTarget :: Int -> IO USecs
calculateTarget usecs = do
- now <- getTicksOfDay
- let -- Convert usecs to ticks, rounding up as we must wait /at least/
- -- as long as we are told
- usecs' = (usecs + tick_usecs - 1) `quot` tick_usecs
- target = now + 1 -- getTicksOfDay will have rounded down, but
- -- again we need to wait for /at least/ as long
- -- as we are told, so add 1 to it
- + usecs'
- return target
+ now <- getUSecOfDay
+ return $ now + (fromIntegral usecs)
+
-- ----------------------------------------------------------------------------
-- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
#endif
data DelayReq
- = Delay {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
- | DelaySTM {-# UNPACK #-} !Int {-# UNPACK #-} !(TVar Bool)
+ = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
+ | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
#ifndef mingw32_HOST_OS
pendingEvents :: IORef [IOReq]
| delayTime d1 <= delayTime d2 = d1 : ds
| otherwise = d2 : insertDelay d1 rest
+delayTime :: DelayReq -> USecs
delayTime (Delay t _) = t
delayTime (DelaySTM t _) = t
-type Ticks = Int
-tick_freq = 50 :: Ticks -- accuracy of threadDelay (ticks per sec)
-tick_usecs = 1000000 `quot` tick_freq :: Int
-tick_msecs = 1000 `quot` tick_freq :: Int
+type USecs = Word64
-- XXX: move into GHC.IOBase from Data.IORef?
atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
-foreign import ccall unsafe "getTicksOfDay"
- getTicksOfDay :: IO Ticks
+foreign import ccall unsafe "getUSecOfDay"
+ getUSecOfDay :: IO USecs
+
+prodding :: IORef Bool
+{-# NOINLINE prodding #-}
+prodding = unsafePerformIO (newIORef False)
+
+prodServiceThread :: IO ()
+prodServiceThread = do
+ was_set <- atomicModifyIORef prodding (\a -> (True,a))
+ if (not (was_set)) then wakeupIOManager else return ()
#ifdef mingw32_HOST_OS
-- ----------------------------------------------------------------------------
new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
let delays = foldr insertDelay old_delays new_delays
- now <- getTicksOfDay
+ now <- getUSecOfDay
(delays', timeout) <- getDelay now delays
r <- c_WaitForSingleObject wakeup timeout
_other -> service_cont wakeup delays' -- probably timeout
service_cont wakeup delays = do
- takeMVar prodding
- putMVar prodding False
+ atomicModifyIORef prodding (\_ -> (False,False))
service_loop wakeup delays
-- must agree with rts/win32/ThrIOManager.c
{-# NOINLINE stick #-}
stick = unsafePerformIO (newIORef nullPtr)
-prodding :: MVar Bool
-{-# NOINLINE prodding #-}
-prodding = unsafePerformIO (newMVar False)
-
-prodServiceThread :: IO ()
-prodServiceThread = do
- b <- takeMVar prodding
- if (not b)
- then do hdl <- readIORef stick
- c_sendIOManagerEvent io_MANAGER_WAKEUP
- else return ()
- putMVar prodding True
+wakeupIOManager = do
+ hdl <- readIORef stick
+ c_sendIOManagerEvent io_MANAGER_WAKEUP
-- Walk the queue of pending delays, waking up any that have passed
-- and return the smallest delay to wait for. The queue of pending
-- delays is kept ordered.
-getDelay :: Ticks -> [DelayReq] -> IO ([DelayReq], DWORD)
+getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
getDelay now [] = return ([], iNFINITE)
getDelay now all@(d : rest)
= case d of
atomically $ writeTVar t True
getDelay now rest
_otherwise ->
- return (all, (fromIntegral (delayTime d - now) *
- fromIntegral tick_msecs))
- -- delay is in millisecs for WaitForSingleObject
+ -- delay is in millisecs for WaitForSingleObject
+ let micro_seconds = delayTime d - now
+ milli_seconds = (micro_seconds + 999) `div` 1000
+ in return (all, fromIntegral milli_seconds)
-- ToDo: this just duplicates part of System.Win32.Types, which isn't
-- available yet. We should move some Win32 functionality down here,
-- check the current time and wake up any thread in
-- threadDelay whose timeout has expired. Also find the
-- timeout value for the select() call.
- now <- getTicksOfDay
+ now <- getUSecOfDay
(delays', timeout) <- getDelay now ptimeval delays
- res <- c_select ((max wakeup maxfd)+1) readfds writefds
+ res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
nullPtr timeout
if (res == -1)
then do
if exit then return () else do
- takeMVar prodding
- putMVar prodding False
+ atomicModifyIORef prodding (\_ -> (False,False))
reqs' <- if wakeup_all then do wakeupAll reqs; return []
else completeRequests reqs readfds writefds []
{-# NOINLINE stick #-}
stick = unsafePerformIO (newIORef 0)
-prodding :: MVar Bool
-{-# NOINLINE prodding #-}
-prodding = unsafePerformIO (newMVar False)
-
-prodServiceThread :: IO ()
-prodServiceThread = do
- b <- takeMVar prodding
- if (not b)
- then do fd <- readIORef stick
- with io_MANAGER_WAKEUP $ \pbuf -> do
- c_write (fromIntegral fd) pbuf 1; return ()
- else return ()
- putMVar prodding True
+wakeupIOManager :: IO ()
+wakeupIOManager = do
+ fd <- readIORef stick
+ with io_MANAGER_WAKEUP $ \pbuf -> do
+ c_write (fromIntegral fd) pbuf 1; return ()
foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
-- Walk the queue of pending delays, waking up any that have passed
-- and return the smallest delay to wait for. The queue of pending
-- delays is kept ordered.
-getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
+getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
getDelay now ptimeval [] = return ([],nullPtr)
getDelay now ptimeval all@(d : rest)
= case d of
sizeofTimeVal :: Int
foreign import ccall unsafe "setTimevalTicks"
- setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO ()
+ setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
{-
On Win32 we're going to have a single Pipe, and a
newtype CFdSet = CFdSet ()
foreign import ccall safe "select"
- c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
+ c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
-> IO CInt
foreign import ccall unsafe "hsFD_SETSIZE"
- fD_SETSIZE :: Fd
+ c_fD_SETSIZE :: CInt
+
+fD_SETSIZE :: Fd
+fD_SETSIZE = fromIntegral c_fD_SETSIZE
foreign import ccall unsafe "hsFD_CLR"
- fdClr :: Fd -> Ptr CFdSet -> IO ()
+ c_fdClr :: CInt -> Ptr CFdSet -> IO ()
+
+fdClr :: Fd -> Ptr CFdSet -> IO ()
+fdClr (Fd fd) fdset = c_fdClr fd fdset
foreign import ccall unsafe "hsFD_ISSET"
- fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
+ c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
+
+fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
+fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
foreign import ccall unsafe "hsFD_SET"
- fdSet :: Fd -> Ptr CFdSet -> IO ()
+ c_fdSet :: CInt -> Ptr CFdSet -> IO ()
+
+fdSet :: Fd -> Ptr CFdSet -> IO ()
+fdSet (Fd fd) fdset = c_fdSet fd fdset
foreign import ccall unsafe "hsFD_ZERO"
fdZero :: Ptr CFdSet -> IO ()