2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
8 -- Copyright : (c) The University of Glasgow, 1994-2002
9 -- License : see libraries/base/LICENSE
11 -- Maintainer : cvs-ghc@haskell.org
12 -- Stability : internal
13 -- Portability : non-portable (GHC extensions)
15 -- Basic concurrency stuff.
17 -----------------------------------------------------------------------------
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home. Hence:
30 -- * Forking and suchlike
31 , forkIO -- :: IO a -> IO ThreadId
32 , forkOnIO -- :: Int -> IO a -> IO ThreadId
33 , numCapabilities -- :: Int
34 , childHandler -- :: Exception -> IO ()
35 , myThreadId -- :: IO ThreadId
36 , killThread -- :: ThreadId -> IO ()
37 , throwTo -- :: ThreadId -> Exception -> IO ()
38 , par -- :: a -> b -> b
39 , pseq -- :: a -> b -> b
42 , labelThread -- :: ThreadId -> String -> IO ()
44 , ThreadStatus(..), BlockReason(..)
45 , threadStatus -- :: ThreadId -> IO ThreadStatus
48 , threadDelay -- :: Int -> IO ()
49 , registerDelay -- :: Int -> IO (TVar Bool)
50 , threadWaitRead -- :: Int -> IO ()
51 , threadWaitWrite -- :: Int -> IO ()
55 , atomically -- :: STM a -> IO a
57 , orElse -- :: STM a -> STM a -> STM a
58 , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a
59 , alwaysSucceeds -- :: STM a -> STM ()
60 , always -- :: STM Bool -> STM ()
62 , newTVar -- :: a -> STM (TVar a)
63 , newTVarIO -- :: a -> STM (TVar a)
64 , readTVar -- :: TVar a -> STM a
65 , readTVarIO -- :: TVar a -> IO a
66 , writeTVar -- :: a -> TVar a -> STM ()
67 , unsafeIOToSTM -- :: IO a -> STM a
71 #ifdef mingw32_HOST_OS
72 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
73 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
74 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
76 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
77 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
80 #ifndef mingw32_HOST_OS
81 , Signal, HandlerFun, setHandler, runHandlers
84 , ensureIOManagerIsRunning
85 #ifndef mingw32_HOST_OS
89 #ifdef mingw32_HOST_OS
94 , setUncaughtExceptionHandler -- :: (Exception -> IO ()) -> IO ()
95 , getUncaughtExceptionHandler -- :: IO (Exception -> IO ())
97 , reportError, reportStackOverflow
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
107 #ifdef mingw32_HOST_OS
111 #ifndef mingw32_HOST_OS
118 #ifndef mingw32_HOST_OS
121 import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
122 import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
124 import GHC.IO.Exception
128 import GHC.Num ( Num(..) )
129 import GHC.Real ( fromIntegral )
130 #ifndef mingw32_HOST_OS
132 import GHC.Arr ( inRange )
134 #ifdef mingw32_HOST_OS
135 import GHC.Real ( div )
138 #ifdef mingw32_HOST_OS
139 import GHC.Read ( Read )
140 import GHC.Enum ( Enum )
142 import GHC.Pack ( packCString# )
143 import GHC.Show ( Show(..), showString )
145 infixr 0 `par`, `pseq`
148 %************************************************************************
150 \subsection{@ThreadId@, @par@, and @fork@}
152 %************************************************************************
155 data ThreadId = ThreadId ThreadId# deriving( Typeable )
156 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
157 -- But since ThreadId# is unlifted, the Weak type must use open
160 A 'ThreadId' is an abstract type representing a handle to a thread.
161 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
162 the 'Ord' instance implements an arbitrary total ordering over
163 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
164 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
165 useful when debugging or diagnosing the behaviour of a concurrent
168 /Note/: in GHC, if you have a 'ThreadId', you essentially have
169 a pointer to the thread itself. This means the thread itself can\'t be
170 garbage collected until you drop the 'ThreadId'.
171 This misfeature will hopefully be corrected at a later date.
173 /Note/: Hugs does not provide any operations on other threads;
174 it defines 'ThreadId' as a synonym for ().
177 instance Show ThreadId where
179 showString "ThreadId " .
180 showsPrec d (getThreadId (id2TSO t))
182 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
184 id2TSO :: ThreadId -> ThreadId#
185 id2TSO (ThreadId t) = t
187 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
190 cmpThread :: ThreadId -> ThreadId -> Ordering
192 case cmp_thread (id2TSO t1) (id2TSO t2) of
197 instance Eq ThreadId where
199 case t1 `cmpThread` t2 of
203 instance Ord ThreadId where
207 Sparks off a new thread to run the 'IO' computation passed as the
208 first argument, and returns the 'ThreadId' of the newly created
211 The new thread will be a lightweight thread; if you want to use a foreign
212 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
214 GHC note: the new thread inherits the /blocked/ state of the parent
215 (see 'Control.Exception.block').
217 The newly created thread has an exception handler that discards the
218 exceptions 'BlockedIndefinitelyOnMVar', 'BlockedIndefinitelyOnSTM', and
219 'ThreadKilled', and passes all other exceptions to the uncaught
220 exception handler (see 'setUncaughtExceptionHandler').
222 forkIO :: IO () -> IO ThreadId
223 forkIO action = IO $ \ s ->
224 case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
226 action_plus = catchException action childHandler
229 Like 'forkIO', but lets you specify on which CPU the thread is
230 created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
231 will stay on the same CPU for its entire lifetime (`forkIO` threads
232 can migrate between CPUs according to the scheduling policy).
233 `forkOnIO` is useful for overriding the scheduling policy when you
234 know in advance how best to distribute the threads.
236 The `Int` argument specifies the CPU number; it is interpreted modulo
237 'numCapabilities' (note that it actually specifies a capability number
238 rather than a CPU number, but to a first approximation the two are
241 forkOnIO :: Int -> IO () -> IO ThreadId
242 forkOnIO (I# cpu) action = IO $ \ s ->
243 case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
245 action_plus = catchException action childHandler
247 -- | the value passed to the @+RTS -N@ flag. This is the number of
248 -- Haskell threads that can run truly simultaneously at any given
249 -- time, and is typically set to the number of physical CPU cores on
251 numCapabilities :: Int
252 numCapabilities = unsafePerformIO $ do
253 n <- peek n_capabilities
254 return (fromIntegral n)
256 #if defined(mingw32_HOST_OS) && defined(__PIC__)
257 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
259 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
261 childHandler :: SomeException -> IO ()
262 childHandler err = catchException (real_handler err) childHandler
264 real_handler :: SomeException -> IO ()
265 real_handler se@(SomeException ex) =
266 -- ignore thread GC and killThread exceptions:
268 Just BlockedIndefinitelyOnMVar -> return ()
270 Just BlockedIndefinitelyOnSTM -> return ()
272 Just ThreadKilled -> return ()
274 -- report all others:
275 Just StackOverflow -> reportStackOverflow
278 {- | 'killThread' raises the 'ThreadKilled' exception in the given
281 > killThread tid = throwTo tid ThreadKilled
284 killThread :: ThreadId -> IO ()
285 killThread tid = throwTo tid ThreadKilled
287 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
289 'throwTo' does not return until the exception has been raised in the
291 The calling thread can thus be certain that the target
292 thread has received the exception. This is a useful property to know
293 when dealing with race conditions: eg. if there are two threads that
294 can kill each other, it is guaranteed that only one of the threads
295 will get to kill the other.
297 Whatever work the target thread was doing when the exception was
298 raised is not lost: the computation is suspended until required by
301 If the target thread is currently making a foreign call, then the
302 exception will not be raised (and hence 'throwTo' will not return)
303 until the call has completed. This is the case regardless of whether
304 the call is inside a 'block' or not.
306 Important note: the behaviour of 'throwTo' differs from that described in
307 the paper \"Asynchronous exceptions in Haskell\"
308 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
309 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
310 a more synchronous design in which 'throwTo' does not return until the exception
311 is received by the target thread. The trade-off is discussed in Section 9 of the paper.
312 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
313 the paper). Unlike other interruptible operations, however, 'throwTo'
314 is /always/ interruptible, even if it does not actually block.
316 There is no guarantee that the exception will be delivered promptly,
317 although the runtime will endeavour to ensure that arbitrary
318 delays don't occur. In GHC, an exception can only be raised when a
319 thread reaches a /safe point/, where a safe point is where memory
320 allocation occurs. Some loops do not perform any memory allocation
321 inside the loop and therefore cannot be interrupted by a 'throwTo'.
323 Blocked 'throwTo' is fair: if multiple threads are trying to throw an
324 exception to the same target thread, they will succeed in FIFO order.
327 throwTo :: Exception e => ThreadId -> e -> IO ()
328 throwTo (ThreadId tid) ex = IO $ \ s ->
329 case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
331 -- | Returns the 'ThreadId' of the calling thread (GHC only).
332 myThreadId :: IO ThreadId
333 myThreadId = IO $ \s ->
334 case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
337 -- |The 'yield' action allows (forces, in a co-operative multitasking
338 -- implementation) a context-switch to any other currently runnable
339 -- threads (if any), and is occasionally useful when implementing
340 -- concurrency abstractions.
343 case (yield# s) of s1 -> (# s1, () #)
345 {- | 'labelThread' stores a string as identifier for this thread if
346 you built a RTS with debugging support. This identifier will be used in
347 the debugging output to make distinction of different threads easier
348 (otherwise you only have the thread state object\'s address in the heap).
350 Other applications like the graphical Concurrent Haskell Debugger
351 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
352 'labelThread' for their purposes as well.
355 labelThread :: ThreadId -> String -> IO ()
356 labelThread (ThreadId t) str = IO $ \ s ->
357 let !ps = packCString# str
358 !adr = byteArrayContents# ps in
359 case (labelThread# t adr s) of s1 -> (# s1, () #)
361 -- Nota Bene: 'pseq' used to be 'seq'
362 -- but 'seq' is now defined in PrelGHC
364 -- "pseq" is defined a bit weirdly (see below)
366 -- The reason for the strange "lazy" call is that
367 -- it fools the compiler into thinking that pseq and par are non-strict in
368 -- their second argument (even if it inlines pseq at the call site).
369 -- If it thinks pseq is strict in "y", then it often evaluates
370 -- "y" before "x", which is totally wrong.
374 pseq x y = x `seq` lazy y
378 par x y = case (par# x) of { _ -> lazy y }
380 -- | Internal function used by the RTS to run sparks.
383 where loop s = case getSpark# s of
385 if n ==# 0# then (# s', () #)
390 -- ^blocked on on 'MVar'
392 -- ^blocked on a computation in progress by another thread
394 -- ^blocked in 'throwTo'
396 -- ^blocked in 'retry' in an STM transaction
397 | BlockedOnForeignCall
398 -- ^currently in a foreign call
400 -- ^blocked on some other resource. Without @-threaded@,
401 -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
402 -- they show up as 'BlockedOnMVar'.
403 deriving (Eq,Ord,Show)
405 -- | The current status of a thread
408 -- ^the thread is currently runnable or running
410 -- ^the thread has finished
411 | ThreadBlocked BlockReason
412 -- ^the thread is blocked on some resource
414 -- ^the thread received an uncaught exception
415 deriving (Eq,Ord,Show)
417 threadStatus :: ThreadId -> IO ThreadStatus
418 threadStatus (ThreadId t) = IO $ \s ->
419 case threadStatus# t s of
420 (# s', stat #) -> (# s', mk_stat (I# stat) #)
422 -- NB. keep these in sync with includes/Constants.h
423 mk_stat 0 = ThreadRunning
424 mk_stat 1 = ThreadBlocked BlockedOnMVar
425 mk_stat 2 = ThreadBlocked BlockedOnBlackHole
426 mk_stat 3 = ThreadBlocked BlockedOnException
427 mk_stat 7 = ThreadBlocked BlockedOnSTM
428 mk_stat 11 = ThreadBlocked BlockedOnForeignCall
429 mk_stat 12 = ThreadBlocked BlockedOnForeignCall
430 mk_stat 16 = ThreadFinished
431 mk_stat 17 = ThreadDied
432 mk_stat _ = ThreadBlocked BlockedOnOther
436 %************************************************************************
438 \subsection[stm]{Transactional heap operations}
440 %************************************************************************
442 TVars are shared memory locations which support atomic memory
446 -- |A monad supporting atomic memory transactions.
447 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
449 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
452 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
454 instance Functor STM where
455 fmap f x = x >>= (return . f)
457 instance Monad STM where
458 {-# INLINE return #-}
462 return x = returnSTM x
463 m >>= k = bindSTM m k
465 bindSTM :: STM a -> (a -> STM b) -> STM b
466 bindSTM (STM m) k = STM ( \s ->
468 (# new_s, a #) -> unSTM (k a) new_s
471 thenSTM :: STM a -> STM b -> STM b
472 thenSTM (STM m) k = STM ( \s ->
474 (# new_s, _ #) -> unSTM k new_s
477 returnSTM :: a -> STM a
478 returnSTM x = STM (\s -> (# s, x #))
480 instance MonadPlus STM where
484 -- | Unsafely performs IO in the STM monad. Beware: this is a highly
485 -- dangerous thing to do.
487 -- * The STM implementation will often run transactions multiple
488 -- times, so you need to be prepared for this if your IO has any
491 -- * The STM implementation will abort transactions that are known to
492 -- be invalid and need to be restarted. This may happen in the middle
493 -- of `unsafeIOToSTM`, so make sure you don't acquire any resources
494 -- that need releasing (exception handlers are ignored when aborting
495 -- the transaction). That includes doing any IO using Handles, for
496 -- example. Getting this wrong will probably lead to random deadlocks.
498 -- * The transaction may have seen an inconsistent view of memory when
499 -- the IO runs. Invariants that you expect to be true throughout
500 -- your program may not be true inside a transaction, due to the
501 -- way transactions are implemented. Normally this wouldn't be visible
502 -- to the programmer, but using `unsafeIOToSTM` can expose it.
504 unsafeIOToSTM :: IO a -> STM a
505 unsafeIOToSTM (IO m) = STM m
507 -- |Perform a series of STM actions atomically.
509 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
510 -- Any attempt to do so will result in a runtime error. (Reason: allowing
511 -- this would effectively allow a transaction inside a transaction, depending
512 -- on exactly when the thunk is evaluated.)
514 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
515 -- and which allows top-level TVars to be allocated.
517 atomically :: STM a -> IO a
518 atomically (STM m) = IO (\s -> (atomically# m) s )
520 -- |Retry execution of the current memory transaction because it has seen
521 -- values in TVars which mean that it should not continue (e.g. the TVars
522 -- represent a shared buffer that is now empty). The implementation may
523 -- block the thread until one of the TVars that it has read from has been
524 -- udpated. (GHC only)
526 retry = STM $ \s# -> retry# s#
528 -- |Compose two alternative STM actions (GHC only). If the first action
529 -- completes without retrying then it forms the result of the orElse.
530 -- Otherwise, if the first action retries, then the second action is
531 -- tried in its place. If both actions retry then the orElse as a
533 orElse :: STM a -> STM a -> STM a
534 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
536 -- |Exception handling within STM actions.
537 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
538 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
540 -- | Low-level primitive on which always and alwaysSucceeds are built.
541 -- checkInv differs form these in that (i) the invariant is not
542 -- checked when checkInv is called, only at the end of this and
543 -- subsequent transcations, (ii) the invariant failure is indicated
544 -- by raising an exception.
545 checkInv :: STM a -> STM ()
546 checkInv (STM m) = STM (\s -> (check# m) s)
548 -- | alwaysSucceeds adds a new invariant that must be true when passed
549 -- to alwaysSucceeds, at the end of the current transaction, and at
550 -- the end of every subsequent transaction. If it fails at any
551 -- of those points then the transaction violating it is aborted
552 -- and the exception raised by the invariant is propagated.
553 alwaysSucceeds :: STM a -> STM ()
554 alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () )
557 -- | always is a variant of alwaysSucceeds in which the invariant is
558 -- expressed as an STM Bool action that must return True. Returning
559 -- False or raising an exception are both treated as invariant failures.
560 always :: STM Bool -> STM ()
561 always i = alwaysSucceeds ( do v <- i
562 if (v) then return () else ( error "Transacional invariant violation" ) )
564 -- |Shared memory locations that support atomic memory transactions.
565 data TVar a = TVar (TVar# RealWorld a)
567 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
569 instance Eq (TVar a) where
570 (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
572 -- |Create a new TVar holding a value supplied
573 newTVar :: a -> STM (TVar a)
574 newTVar val = STM $ \s1# ->
575 case newTVar# val s1# of
576 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
578 -- |@IO@ version of 'newTVar'. This is useful for creating top-level
579 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
580 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
582 newTVarIO :: a -> IO (TVar a)
583 newTVarIO val = IO $ \s1# ->
584 case newTVar# val s1# of
585 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
587 -- |Return the current value stored in a TVar.
588 -- This is equivalent to
590 -- > readTVarIO = atomically . readTVar
592 -- but works much faster, because it doesn't perform a complete
593 -- transaction, it just reads the current value of the 'TVar'.
594 readTVarIO :: TVar a -> IO a
595 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
597 -- |Return the current value stored in a TVar
598 readTVar :: TVar a -> STM a
599 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
601 -- |Write the supplied value into a TVar
602 writeTVar :: TVar a -> a -> STM ()
603 writeTVar (TVar tvar#) val = STM $ \s1# ->
604 case writeTVar# tvar# val s1# of
612 withMVar :: MVar a -> (a -> IO b) -> IO b
616 b <- catchAny (unblock (io a))
617 (\e -> do putMVar m a; throw e)
621 modifyMVar_ :: MVar a -> (a -> IO a) -> IO ()
625 a' <- catchAny (unblock (io a))
626 (\e -> do putMVar m a; throw e)
631 %************************************************************************
633 \subsection{Thread waiting}
635 %************************************************************************
638 #ifdef mingw32_HOST_OS
640 -- Note: threadWaitRead and threadWaitWrite aren't really functional
641 -- on Win32, but left in there because lib code (still) uses them (the manner
642 -- in which they're used doesn't cause problems on a Win32 platform though.)
644 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
645 asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) =
646 IO $ \s -> case asyncRead# fd isSock len buf s of
647 (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
649 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
650 asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) =
651 IO $ \s -> case asyncWrite# fd isSock len buf s of
652 (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
654 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
655 asyncDoProc (FunPtr proc) (Ptr param) =
656 -- the 'length' value is ignored; simplifies implementation of
657 -- the async*# primops to have them all return the same result.
658 IO $ \s -> case asyncDoProc# proc param s of
659 (# s', _len#, err# #) -> (# s', I# err# #)
661 -- to aid the use of these primops by the IO Handle implementation,
662 -- provide the following convenience funs:
664 -- this better be a pinned byte array!
665 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
666 asyncReadBA fd isSock len off bufB =
667 asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
669 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
670 asyncWriteBA fd isSock len off bufB =
671 asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
675 -- -----------------------------------------------------------------------------
678 -- | Block the current thread until data is available to read on the
679 -- given file descriptor (GHC only).
680 threadWaitRead :: Fd -> IO ()
682 #ifndef mingw32_HOST_OS
683 | threaded = waitForReadEvent fd
685 | otherwise = IO $ \s ->
686 case fromIntegral fd of { I# fd# ->
687 case waitRead# fd# s of { s' -> (# s', () #)
690 -- | Block the current thread until data can be written to the
691 -- given file descriptor (GHC only).
692 threadWaitWrite :: Fd -> IO ()
694 #ifndef mingw32_HOST_OS
695 | threaded = waitForWriteEvent fd
697 | otherwise = IO $ \s ->
698 case fromIntegral fd of { I# fd# ->
699 case waitWrite# fd# s of { s' -> (# s', () #)
702 -- | Suspends the current thread for a given number of microseconds
705 -- There is no guarantee that the thread will be rescheduled promptly
706 -- when the delay has expired, but the thread will never continue to
707 -- run /earlier/ than specified.
709 threadDelay :: Int -> IO ()
711 | threaded = waitForDelayEvent time
712 | otherwise = IO $ \s ->
713 case fromIntegral time of { I# time# ->
714 case delay# time# s of { s' -> (# s', () #)
718 -- | Set the value of returned TVar to True after a given number of
719 -- microseconds. The caveats associated with threadDelay also apply.
721 registerDelay :: Int -> IO (TVar Bool)
723 | threaded = waitForDelayEventSTM usecs
724 | otherwise = error "registerDelay: requires -threaded"
726 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
728 waitForDelayEvent :: Int -> IO ()
729 waitForDelayEvent usecs = do
731 target <- calculateTarget usecs
732 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
736 -- Delays for use in STM
737 waitForDelayEventSTM :: Int -> IO (TVar Bool)
738 waitForDelayEventSTM usecs = do
739 t <- atomically $ newTVar False
740 target <- calculateTarget usecs
741 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
745 calculateTarget :: Int -> IO USecs
746 calculateTarget usecs = do
748 return $ now + (fromIntegral usecs)
751 -- ----------------------------------------------------------------------------
752 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
754 -- In the threaded RTS, we employ a single IO Manager thread to wait
755 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
756 -- and delays (threadDelay).
758 -- We can do this because in the threaded RTS the IO Manager can make
759 -- a non-blocking call to select(), so we don't have to do select() in
760 -- the scheduler as we have to in the non-threaded RTS. We get performance
761 -- benefits from doing it this way, because we only have to restart the select()
762 -- when a new request arrives, rather than doing one select() each time
763 -- around the scheduler loop. Furthermore, the scheduler can be simplified
764 -- by not having to check for completed IO requests.
766 #ifndef mingw32_HOST_OS
768 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
769 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
773 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
774 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
776 #ifndef mingw32_HOST_OS
777 {-# NOINLINE pendingEvents #-}
778 pendingEvents :: IORef [IOReq]
779 pendingEvents = unsafePerformIO $ do
781 sharedCAF m getOrSetGHCConcPendingEventsStore
783 foreign import ccall unsafe "getOrSetGHCConcPendingEventsStore"
784 getOrSetGHCConcPendingEventsStore :: Ptr a -> IO (Ptr a)
787 {-# NOINLINE pendingDelays #-}
788 pendingDelays :: IORef [DelayReq]
789 pendingDelays = unsafePerformIO $ do
791 sharedCAF m getOrSetGHCConcPendingDelaysStore
793 foreign import ccall unsafe "getOrSetGHCConcPendingDelaysStore"
794 getOrSetGHCConcPendingDelaysStore :: Ptr a -> IO (Ptr a)
796 {-# NOINLINE ioManagerThread #-}
797 ioManagerThread :: MVar (Maybe ThreadId)
798 ioManagerThread = unsafePerformIO $ do
800 sharedCAF m getOrSetGHCConcIOManagerThreadStore
802 foreign import ccall unsafe "getOrSetGHCConcIOManagerThreadStore"
803 getOrSetGHCConcIOManagerThreadStore :: Ptr a -> IO (Ptr a)
805 ensureIOManagerIsRunning :: IO ()
806 ensureIOManagerIsRunning
807 | threaded = startIOManagerThread
808 | otherwise = return ()
810 startIOManagerThread :: IO ()
811 startIOManagerThread = do
812 modifyMVar_ ioManagerThread $ \old -> do
813 let create = do t <- forkIO ioManager; return (Just t)
819 ThreadFinished -> create
821 _other -> return (Just t)
823 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
824 insertDelay d [] = [d]
825 insertDelay d1 ds@(d2 : rest)
826 | delayTime d1 <= delayTime d2 = d1 : ds
827 | otherwise = d2 : insertDelay d1 rest
829 delayTime :: DelayReq -> USecs
830 delayTime (Delay t _) = t
831 delayTime (DelaySTM t _) = t
835 foreign import ccall unsafe "getUSecOfDay"
836 getUSecOfDay :: IO USecs
838 {-# NOINLINE prodding #-}
839 prodding :: IORef Bool
840 prodding = unsafePerformIO $ do
842 sharedCAF r getOrSetGHCConcProddingStore
844 foreign import ccall unsafe "getOrSetGHCConcProddingStore"
845 getOrSetGHCConcProddingStore :: Ptr a -> IO (Ptr a)
847 prodServiceThread :: IO ()
848 prodServiceThread = do
849 -- NB. use atomicModifyIORef here, otherwise there are race
850 -- conditions in which prodding is left at True but the server is
851 -- blocked in select().
852 was_set <- atomicModifyIORef prodding $ \b -> (True,b)
853 unless was_set wakeupIOManager
855 -- Machinery needed to ensure that we only have one copy of certain
856 -- CAFs in this module even when the base package is present twice, as
857 -- it is when base is dynamically loaded into GHCi. The RTS keeps
858 -- track of the single true value of the CAF, so even when the CAFs in
859 -- the dynamically-loaded base package are reverted, nothing bad
862 sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a
863 sharedCAF a get_or_set =
865 stable_ref <- newStablePtr a
866 let ref = castPtr (castStablePtrToPtr stable_ref)
867 ref2 <- get_or_set ref
870 else do freeStablePtr stable_ref
871 deRefStablePtr (castPtrToStablePtr (castPtr ref2))
873 #ifdef mingw32_HOST_OS
874 -- ----------------------------------------------------------------------------
875 -- Windows IO manager thread
879 wakeup <- c_getIOManagerEvent
880 service_loop wakeup []
882 service_loop :: HANDLE -- read end of pipe
883 -> [DelayReq] -- current delay requests
886 service_loop wakeup old_delays = do
887 -- pick up new delay requests
888 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
889 let delays = foldr insertDelay old_delays new_delays
892 (delays', timeout) <- getDelay now delays
894 r <- c_WaitForSingleObject wakeup timeout
896 0xffffffff -> do c_maperrno; throwErrno "service_loop"
898 r2 <- c_readIOManagerEvent
901 _ | r2 == io_MANAGER_WAKEUP -> return False
902 _ | r2 == io_MANAGER_DIE -> return True
903 0 -> return False -- spurious wakeup
904 _ -> do start_console_handler (r2 `shiftR` 1); return False
905 unless exit $ service_cont wakeup delays'
907 _other -> service_cont wakeup delays' -- probably timeout
909 service_cont :: HANDLE -> [DelayReq] -> IO ()
910 service_cont wakeup delays = do
911 r <- atomicModifyIORef prodding (\_ -> (False,False))
912 r `seq` return () -- avoid space leak
913 service_loop wakeup delays
915 -- must agree with rts/win32/ThrIOManager.c
916 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
917 io_MANAGER_WAKEUP = 0xffffffff
918 io_MANAGER_DIE = 0xfffffffe
924 -- these are sent to Services only.
927 deriving (Eq, Ord, Enum, Show, Read, Typeable)
929 start_console_handler :: Word32 -> IO ()
930 start_console_handler r =
931 case toWin32ConsoleEvent r of
932 Just x -> withMVar win32ConsoleHandler $ \handler -> do
933 _ <- forkIO (handler x)
937 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
938 toWin32ConsoleEvent ev =
940 0 {- CTRL_C_EVENT-} -> Just ControlC
941 1 {- CTRL_BREAK_EVENT-} -> Just Break
942 2 {- CTRL_CLOSE_EVENT-} -> Just Close
943 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
944 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
947 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
948 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
950 wakeupIOManager :: IO ()
951 wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP
953 -- Walk the queue of pending delays, waking up any that have passed
954 -- and return the smallest delay to wait for. The queue of pending
955 -- delays is kept ordered.
956 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
957 getDelay _ [] = return ([], iNFINITE)
958 getDelay now all@(d : rest)
960 Delay time m | now >= time -> do
963 DelaySTM time t | now >= time -> do
964 atomically $ writeTVar t True
967 -- delay is in millisecs for WaitForSingleObject
968 let micro_seconds = delayTime d - now
969 milli_seconds = (micro_seconds + 999) `div` 1000
970 in return (all, fromIntegral milli_seconds)
972 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
973 -- available yet. We should move some Win32 functionality down here,
974 -- maybe as part of the grand reorganisation of the base package...
979 iNFINITE = 0xFFFFFFFF -- urgh
981 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
982 c_getIOManagerEvent :: IO HANDLE
984 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
985 c_readIOManagerEvent :: IO Word32
987 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
988 c_sendIOManagerEvent :: Word32 -> IO ()
990 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
993 foreign import stdcall "WaitForSingleObject"
994 c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
997 -- ----------------------------------------------------------------------------
998 -- Unix IO manager thread, using select()
1002 allocaArray 2 $ \fds -> do
1003 throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds)
1004 rd_end <- peekElemOff fds 0
1005 wr_end <- peekElemOff fds 1
1006 setNonBlockingFD wr_end True -- writes happen in a signal handler, we
1007 -- don't want them to block.
1008 setCloseOnExec rd_end
1009 setCloseOnExec wr_end
1010 c_setIOManagerPipe wr_end
1011 allocaBytes sizeofFdSet $ \readfds -> do
1012 allocaBytes sizeofFdSet $ \writefds -> do
1013 allocaBytes sizeofTimeVal $ \timeval -> do
1014 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1018 :: Fd -- listen to this for wakeup calls
1025 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1027 -- reset prodding before we look at the new requests. If a new
1028 -- client arrives after this point they will send a wakup which will
1029 -- cause the server to loop around again, so we can be sure to not
1030 -- miss any requests.
1032 -- NB. it's important to do this in the *first* iteration of
1033 -- service_loop, rather than after calling select(), since a client
1034 -- may have set prodding to True without sending a wakeup byte down
1035 -- the pipe, because the pipe wasn't set up.
1036 atomicModifyIORef prodding (\_ -> (False, ()))
1038 -- pick up new IO requests
1039 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1040 let reqs = new_reqs ++ old_reqs
1042 -- pick up new delay requests
1043 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1044 let delays0 = foldr insertDelay old_delays new_delays
1046 -- build the FDSets for select()
1049 fdSet wakeup readfds
1050 maxfd <- buildFdSets 0 readfds writefds reqs
1052 -- perform the select()
1053 let do_select delays = do
1054 -- check the current time and wake up any thread in
1055 -- threadDelay whose timeout has expired. Also find the
1056 -- timeout value for the select() call.
1058 (delays', timeout) <- getDelay now ptimeval delays
1060 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
1066 _ | err == eINTR -> do_select delays'
1067 -- EINTR: just redo the select()
1068 _ | err == eBADF -> return (True, delays)
1069 -- EBADF: one of the file descriptors is closed or bad,
1070 -- we don't know which one, so wake everyone up.
1071 _ | otherwise -> throwErrno "select"
1072 -- otherwise (ENOMEM or EINVAL) something has gone
1073 -- wrong; report the error.
1075 return (False,delays')
1077 (wakeup_all,delays') <- do_select delays0
1080 if wakeup_all then return False
1082 b <- fdIsSet wakeup readfds
1085 else alloca $ \p -> do
1086 warnErrnoIfMinus1_ "service_loop" $
1087 c_read (fromIntegral wakeup) p 1
1090 _ | s == io_MANAGER_WAKEUP -> return False
1091 _ | s == io_MANAGER_DIE -> return True
1092 _ | s == io_MANAGER_SYNC -> do
1093 mvars <- readIORef sync
1094 mapM_ (flip putMVar ()) mvars
1097 fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1098 withForeignPtr fp $ \p_siginfo -> do
1099 r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1101 when (r /= fromIntegral sizeof_siginfo_t) $
1102 error "failed to read siginfo_t"
1103 runHandlers' fp (fromIntegral s)
1108 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1109 else completeRequests reqs readfds writefds []
1111 service_loop wakeup readfds writefds ptimeval reqs' delays'
1113 io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8
1114 io_MANAGER_WAKEUP = 0xff
1115 io_MANAGER_DIE = 0xfe
1116 io_MANAGER_SYNC = 0xfd
1118 {-# NOINLINE sync #-}
1119 sync :: IORef [MVar ()]
1120 sync = unsafePerformIO (newIORef [])
1122 -- waits for the IO manager to drain the pipe
1123 syncIOManager :: IO ()
1126 atomicModifyIORef sync (\old -> (m:old,()))
1130 foreign import ccall unsafe "ioManagerSync" c_ioManagerSync :: IO ()
1131 foreign import ccall unsafe "ioManagerWakeup" wakeupIOManager :: IO ()
1133 -- For the non-threaded RTS
1134 runHandlers :: Ptr Word8 -> Int -> IO ()
1135 runHandlers p_info sig = do
1136 fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1137 withForeignPtr fp $ \p -> do
1138 copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1140 runHandlers' fp (fromIntegral sig)
1142 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1143 runHandlers' p_info sig = do
1144 let int = fromIntegral sig
1145 withMVar signal_handlers $ \arr ->
1146 if not (inRange (boundsIOArray arr) int)
1148 else do handler <- unsafeReadIOArray arr int
1150 Nothing -> return ()
1151 Just (f,_) -> do _ <- forkIO (f p_info)
1154 warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO ()
1155 warnErrnoIfMinus1_ what io
1159 str <- strerror errno >>= peekCString
1161 debugErrLn ("Warning: " ++ what ++ " failed: " ++ str)
1163 foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar)
1165 foreign import ccall "setIOManagerPipe"
1166 c_setIOManagerPipe :: CInt -> IO ()
1168 foreign import ccall "__hscore_sizeof_siginfo_t"
1169 sizeof_siginfo_t :: CSize
1175 type HandlerFun = ForeignPtr Word8 -> IO ()
1177 -- Lock used to protect concurrent access to signal_handlers. Symptom of
1178 -- this race condition is #1922, although that bug was on Windows a similar
1179 -- bug also exists on Unix.
1180 {-# NOINLINE signal_handlers #-}
1181 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1182 signal_handlers = unsafePerformIO $ do
1183 arr <- newIOArray (0,maxSig) Nothing
1185 sharedCAF m getOrSetGHCConcSignalHandlerStore
1187 foreign import ccall unsafe "getOrSetGHCConcSignalHandlerStore"
1188 getOrSetGHCConcSignalHandlerStore :: Ptr a -> IO (Ptr a)
1190 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1191 setHandler sig handler = do
1192 let int = fromIntegral sig
1193 withMVar signal_handlers $ \arr ->
1194 if not (inRange (boundsIOArray arr) int)
1195 then error "GHC.Conc.setHandler: signal out of range"
1196 else do old <- unsafeReadIOArray arr int
1197 unsafeWriteIOArray arr int handler
1200 -- -----------------------------------------------------------------------------
1203 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1204 buildFdSets maxfd _ _ [] = return maxfd
1205 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1206 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1209 buildFdSets (max maxfd fd) readfds writefds reqs
1210 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1211 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1214 buildFdSets (max maxfd fd) readfds writefds reqs
1216 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1218 completeRequests [] _ _ reqs' = return reqs'
1219 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1220 b <- fdIsSet fd readfds
1222 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1223 else completeRequests reqs readfds writefds (Read fd m : reqs')
1224 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1225 b <- fdIsSet fd writefds
1227 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1228 else completeRequests reqs readfds writefds (Write fd m : reqs')
1230 wakeupAll :: [IOReq] -> IO ()
1231 wakeupAll [] = return ()
1232 wakeupAll (Read _ m : reqs) = do putMVar m (); wakeupAll reqs
1233 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1235 waitForReadEvent :: Fd -> IO ()
1236 waitForReadEvent fd = do
1238 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1242 waitForWriteEvent :: Fd -> IO ()
1243 waitForWriteEvent fd = do
1245 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1249 -- -----------------------------------------------------------------------------
1252 -- Walk the queue of pending delays, waking up any that have passed
1253 -- and return the smallest delay to wait for. The queue of pending
1254 -- delays is kept ordered.
1255 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1256 getDelay _ _ [] = return ([],nullPtr)
1257 getDelay now ptimeval all@(d : rest)
1259 Delay time m | now >= time -> do
1261 getDelay now ptimeval rest
1262 DelaySTM time t | now >= time -> do
1263 atomically $ writeTVar t True
1264 getDelay now ptimeval rest
1266 setTimevalTicks ptimeval (delayTime d - now)
1267 return (all,ptimeval)
1271 foreign import ccall unsafe "sizeofTimeVal"
1272 sizeofTimeVal :: Int
1274 foreign import ccall unsafe "setTimevalTicks"
1275 setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1278 On Win32 we're going to have a single Pipe, and a
1279 waitForSingleObject with the delay time. For signals, we send a
1280 byte down the pipe just like on Unix.
1283 -- ----------------------------------------------------------------------------
1284 -- select() interface
1286 -- ToDo: move to System.Posix.Internals?
1290 foreign import ccall safe "__hscore_select"
1291 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1294 foreign import ccall unsafe "hsFD_SETSIZE"
1295 c_fD_SETSIZE :: CInt
1298 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1300 foreign import ccall unsafe "hsFD_ISSET"
1301 c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1303 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1304 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1306 foreign import ccall unsafe "hsFD_SET"
1307 c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1309 fdSet :: Fd -> Ptr CFdSet -> IO ()
1310 fdSet (Fd fd) fdset = c_fdSet fd fdset
1312 foreign import ccall unsafe "hsFD_ZERO"
1313 fdZero :: Ptr CFdSet -> IO ()
1315 foreign import ccall unsafe "sizeof_fd_set"
1320 reportStackOverflow :: IO ()
1321 reportStackOverflow = callStackOverflowHook
1323 reportError :: SomeException -> IO ()
1325 handler <- getUncaughtExceptionHandler
1328 -- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove
1329 -- the unsafe below.
1330 foreign import ccall unsafe "stackOverflow"
1331 callStackOverflowHook :: IO ()
1333 {-# NOINLINE uncaughtExceptionHandler #-}
1334 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1335 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1337 defaultHandler :: SomeException -> IO ()
1338 defaultHandler se@(SomeException ex) = do
1339 (hFlush stdout) `catchAny` (\ _ -> return ())
1340 let msg = case cast ex of
1341 Just Deadlock -> "no threads to run: infinite loop or deadlock?"
1342 _ -> case cast ex of
1343 Just (ErrorCall s) -> s
1344 _ -> showsPrec 0 se ""
1345 withCString "%s" $ \cfmt ->
1346 withCString msg $ \cmsg ->
1347 errorBelch cfmt cmsg
1349 -- don't use errorBelch() directly, because we cannot call varargs functions
1351 foreign import ccall unsafe "HsBase.h errorBelch2"
1352 errorBelch :: CString -> CString -> IO ()
1354 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1355 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1357 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1358 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler