2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
8 -- Copyright : (c) The University of Glasgow, 1994-2002
9 -- License : see libraries/base/LICENSE
11 -- Maintainer : cvs-ghc@haskell.org
12 -- Stability : internal
13 -- Portability : non-portable (GHC extensions)
15 -- Basic concurrency stuff.
17 -----------------------------------------------------------------------------
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home. Hence:
30 -- * Forking and suchlike
31 , forkIO -- :: IO a -> IO ThreadId
32 , forkOnIO -- :: Int -> IO a -> IO ThreadId
33 , numCapabilities -- :: Int
34 , childHandler -- :: Exception -> IO ()
35 , myThreadId -- :: IO ThreadId
36 , killThread -- :: ThreadId -> IO ()
37 , throwTo -- :: ThreadId -> Exception -> IO ()
38 , par -- :: a -> b -> b
39 , pseq -- :: a -> b -> b
42 , labelThread -- :: ThreadId -> String -> IO ()
44 , ThreadStatus(..), BlockReason(..)
45 , threadStatus -- :: ThreadId -> IO ThreadStatus
48 , threadDelay -- :: Int -> IO ()
49 , registerDelay -- :: Int -> IO (TVar Bool)
50 , threadWaitRead -- :: Int -> IO ()
51 , threadWaitWrite -- :: Int -> IO ()
55 , atomically -- :: STM a -> IO a
57 , orElse -- :: STM a -> STM a -> STM a
58 , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a
59 , alwaysSucceeds -- :: STM a -> STM ()
60 , always -- :: STM Bool -> STM ()
62 , newTVar -- :: a -> STM (TVar a)
63 , newTVarIO -- :: a -> STM (TVar a)
64 , readTVar -- :: TVar a -> STM a
65 , readTVarIO -- :: TVar a -> IO a
66 , writeTVar -- :: a -> TVar a -> STM ()
67 , unsafeIOToSTM -- :: IO a -> STM a
71 #ifdef mingw32_HOST_OS
72 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
73 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
74 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
76 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
77 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
80 #ifndef mingw32_HOST_OS
81 , Signal, HandlerFun, setHandler, runHandlers
84 , ensureIOManagerIsRunning
85 #ifndef mingw32_HOST_OS
89 #ifdef mingw32_HOST_OS
94 , setUncaughtExceptionHandler -- :: (Exception -> IO ()) -> IO ()
95 , getUncaughtExceptionHandler -- :: IO (Exception -> IO ())
97 , reportError, reportStackOverflow
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
107 #ifdef mingw32_HOST_OS
111 #ifndef mingw32_HOST_OS
118 #ifndef mingw32_HOST_OS
121 import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
122 import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
124 import GHC.IO.Exception
128 import GHC.Num ( Num(..) )
129 import GHC.Real ( fromIntegral )
130 #ifndef mingw32_HOST_OS
132 import GHC.Arr ( inRange )
134 #ifdef mingw32_HOST_OS
135 import GHC.Real ( div )
138 #ifdef mingw32_HOST_OS
139 import GHC.Read ( Read )
140 import GHC.Enum ( Enum )
142 import GHC.Pack ( packCString# )
143 import GHC.Show ( Show(..), showString )
145 infixr 0 `par`, `pseq`
148 %************************************************************************
150 \subsection{@ThreadId@, @par@, and @fork@}
152 %************************************************************************
155 data ThreadId = ThreadId ThreadId# deriving( Typeable )
156 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
157 -- But since ThreadId# is unlifted, the Weak type must use open
160 A 'ThreadId' is an abstract type representing a handle to a thread.
161 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
162 the 'Ord' instance implements an arbitrary total ordering over
163 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
164 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
165 useful when debugging or diagnosing the behaviour of a concurrent
168 /Note/: in GHC, if you have a 'ThreadId', you essentially have
169 a pointer to the thread itself. This means the thread itself can\'t be
170 garbage collected until you drop the 'ThreadId'.
171 This misfeature will hopefully be corrected at a later date.
173 /Note/: Hugs does not provide any operations on other threads;
174 it defines 'ThreadId' as a synonym for ().
177 instance Show ThreadId where
179 showString "ThreadId " .
180 showsPrec d (getThreadId (id2TSO t))
182 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
184 id2TSO :: ThreadId -> ThreadId#
185 id2TSO (ThreadId t) = t
187 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
190 cmpThread :: ThreadId -> ThreadId -> Ordering
192 case cmp_thread (id2TSO t1) (id2TSO t2) of
197 instance Eq ThreadId where
199 case t1 `cmpThread` t2 of
203 instance Ord ThreadId where
207 Sparks off a new thread to run the 'IO' computation passed as the
208 first argument, and returns the 'ThreadId' of the newly created
211 The new thread will be a lightweight thread; if you want to use a foreign
212 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
214 GHC note: the new thread inherits the /blocked/ state of the parent
215 (see 'Control.Exception.block').
217 The newly created thread has an exception handler that discards the
218 exceptions 'BlockedIndefinitelyOnMVar', 'BlockedIndefinitelyOnSTM', and
219 'ThreadKilled', and passes all other exceptions to the uncaught
220 exception handler (see 'setUncaughtExceptionHandler').
222 forkIO :: IO () -> IO ThreadId
223 forkIO action = IO $ \ s ->
224 case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
226 action_plus = catchException action childHandler
229 Like 'forkIO', but lets you specify on which CPU the thread is
230 created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
231 will stay on the same CPU for its entire lifetime (`forkIO` threads
232 can migrate between CPUs according to the scheduling policy).
233 `forkOnIO` is useful for overriding the scheduling policy when you
234 know in advance how best to distribute the threads.
236 The `Int` argument specifies the CPU number; it is interpreted modulo
237 'numCapabilities' (note that it actually specifies a capability number
238 rather than a CPU number, but to a first approximation the two are
241 forkOnIO :: Int -> IO () -> IO ThreadId
242 forkOnIO (I# cpu) action = IO $ \ s ->
243 case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
245 action_plus = catchException action childHandler
247 -- | the value passed to the @+RTS -N@ flag. This is the number of
248 -- Haskell threads that can run truly simultaneously at any given
249 -- time, and is typically set to the number of physical CPU cores on
251 numCapabilities :: Int
252 numCapabilities = unsafePerformIO $ do
253 n <- peek n_capabilities
254 return (fromIntegral n)
256 #if defined(mingw32_HOST_OS) && defined(__PIC__)
257 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
259 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
261 childHandler :: SomeException -> IO ()
262 childHandler err = catchException (real_handler err) childHandler
264 real_handler :: SomeException -> IO ()
265 real_handler se@(SomeException ex) =
266 -- ignore thread GC and killThread exceptions:
268 Just BlockedIndefinitelyOnMVar -> return ()
270 Just BlockedIndefinitelyOnSTM -> return ()
272 Just ThreadKilled -> return ()
274 -- report all others:
275 Just StackOverflow -> reportStackOverflow
278 {- | 'killThread' raises the 'ThreadKilled' exception in the given
281 > killThread tid = throwTo tid ThreadKilled
284 killThread :: ThreadId -> IO ()
285 killThread tid = throwTo tid ThreadKilled
287 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
289 'throwTo' does not return until the exception has been raised in the
291 The calling thread can thus be certain that the target
292 thread has received the exception. This is a useful property to know
293 when dealing with race conditions: eg. if there are two threads that
294 can kill each other, it is guaranteed that only one of the threads
295 will get to kill the other.
297 Whatever work the target thread was doing when the exception was
298 raised is not lost: the computation is suspended until required by
301 If the target thread is currently making a foreign call, then the
302 exception will not be raised (and hence 'throwTo' will not return)
303 until the call has completed. This is the case regardless of whether
304 the call is inside a 'block' or not.
306 Important note: the behaviour of 'throwTo' differs from that described in
307 the paper \"Asynchronous exceptions in Haskell\"
308 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
309 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
310 a more synchronous design in which 'throwTo' does not return until the exception
311 is received by the target thread. The trade-off is discussed in Section 9 of the paper.
312 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
315 There is no guarantee that the exception will be delivered promptly,
316 although the runtime will endeavour to ensure that arbitrary
317 delays don't occur. In GHC, an exception can only be raised when a
318 thread reaches a /safe point/, where a safe point is where memory
319 allocation occurs. Some loops do not perform any memory allocation
320 inside the loop and therefore cannot be interrupted by a 'throwTo'.
322 Blocked 'throwTo' is fair: if multiple threads are trying to throw an
323 exception to the same target thread, they will succeed in FIFO order.
326 throwTo :: Exception e => ThreadId -> e -> IO ()
327 throwTo (ThreadId tid) ex = IO $ \ s ->
328 case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
330 -- | Returns the 'ThreadId' of the calling thread (GHC only).
331 myThreadId :: IO ThreadId
332 myThreadId = IO $ \s ->
333 case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
336 -- |The 'yield' action allows (forces, in a co-operative multitasking
337 -- implementation) a context-switch to any other currently runnable
338 -- threads (if any), and is occasionally useful when implementing
339 -- concurrency abstractions.
342 case (yield# s) of s1 -> (# s1, () #)
344 {- | 'labelThread' stores a string as identifier for this thread if
345 you built a RTS with debugging support. This identifier will be used in
346 the debugging output to make distinction of different threads easier
347 (otherwise you only have the thread state object\'s address in the heap).
349 Other applications like the graphical Concurrent Haskell Debugger
350 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
351 'labelThread' for their purposes as well.
354 labelThread :: ThreadId -> String -> IO ()
355 labelThread (ThreadId t) str = IO $ \ s ->
356 let !ps = packCString# str
357 !adr = byteArrayContents# ps in
358 case (labelThread# t adr s) of s1 -> (# s1, () #)
360 -- Nota Bene: 'pseq' used to be 'seq'
361 -- but 'seq' is now defined in PrelGHC
363 -- "pseq" is defined a bit weirdly (see below)
365 -- The reason for the strange "lazy" call is that
366 -- it fools the compiler into thinking that pseq and par are non-strict in
367 -- their second argument (even if it inlines pseq at the call site).
368 -- If it thinks pseq is strict in "y", then it often evaluates
369 -- "y" before "x", which is totally wrong.
373 pseq x y = x `seq` lazy y
377 par x y = case (par# x) of { _ -> lazy y }
379 -- | Internal function used by the RTS to run sparks.
382 where loop s = case getSpark# s of
384 if n ==# 0# then (# s', () #)
389 -- ^blocked on on 'MVar'
391 -- ^blocked on a computation in progress by another thread
393 -- ^blocked in 'throwTo'
395 -- ^blocked in 'retry' in an STM transaction
396 | BlockedOnForeignCall
397 -- ^currently in a foreign call
399 -- ^blocked on some other resource. Without @-threaded@,
400 -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
401 -- they show up as 'BlockedOnMVar'.
402 deriving (Eq,Ord,Show)
404 -- | The current status of a thread
407 -- ^the thread is currently runnable or running
409 -- ^the thread has finished
410 | ThreadBlocked BlockReason
411 -- ^the thread is blocked on some resource
413 -- ^the thread received an uncaught exception
414 deriving (Eq,Ord,Show)
416 threadStatus :: ThreadId -> IO ThreadStatus
417 threadStatus (ThreadId t) = IO $ \s ->
418 case threadStatus# t s of
419 (# s', stat #) -> (# s', mk_stat (I# stat) #)
421 -- NB. keep these in sync with includes/Constants.h
422 mk_stat 0 = ThreadRunning
423 mk_stat 1 = ThreadBlocked BlockedOnMVar
424 mk_stat 2 = ThreadBlocked BlockedOnBlackHole
425 mk_stat 3 = ThreadBlocked BlockedOnException
426 mk_stat 7 = ThreadBlocked BlockedOnSTM
427 mk_stat 11 = ThreadBlocked BlockedOnForeignCall
428 mk_stat 12 = ThreadBlocked BlockedOnForeignCall
429 mk_stat 16 = ThreadFinished
430 mk_stat 17 = ThreadDied
431 mk_stat _ = ThreadBlocked BlockedOnOther
435 %************************************************************************
437 \subsection[stm]{Transactional heap operations}
439 %************************************************************************
441 TVars are shared memory locations which support atomic memory
445 -- |A monad supporting atomic memory transactions.
446 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
448 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
451 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
453 instance Functor STM where
454 fmap f x = x >>= (return . f)
456 instance Monad STM where
457 {-# INLINE return #-}
461 return x = returnSTM x
462 m >>= k = bindSTM m k
464 bindSTM :: STM a -> (a -> STM b) -> STM b
465 bindSTM (STM m) k = STM ( \s ->
467 (# new_s, a #) -> unSTM (k a) new_s
470 thenSTM :: STM a -> STM b -> STM b
471 thenSTM (STM m) k = STM ( \s ->
473 (# new_s, _ #) -> unSTM k new_s
476 returnSTM :: a -> STM a
477 returnSTM x = STM (\s -> (# s, x #))
479 -- | Unsafely performs IO in the STM monad. Beware: this is a highly
480 -- dangerous thing to do.
482 -- * The STM implementation will often run transactions multiple
483 -- times, so you need to be prepared for this if your IO has any
486 -- * The STM implementation will abort transactions that are known to
487 -- be invalid and need to be restarted. This may happen in the middle
488 -- of `unsafeIOToSTM`, so make sure you don't acquire any resources
489 -- that need releasing (exception handlers are ignored when aborting
490 -- the transaction). That includes doing any IO using Handles, for
491 -- example. Getting this wrong will probably lead to random deadlocks.
493 -- * The transaction may have seen an inconsistent view of memory when
494 -- the IO runs. Invariants that you expect to be true throughout
495 -- your program may not be true inside a transaction, due to the
496 -- way transactions are implemented. Normally this wouldn't be visible
497 -- to the programmer, but using `unsafeIOToSTM` can expose it.
499 unsafeIOToSTM :: IO a -> STM a
500 unsafeIOToSTM (IO m) = STM m
502 -- |Perform a series of STM actions atomically.
504 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
505 -- Any attempt to do so will result in a runtime error. (Reason: allowing
506 -- this would effectively allow a transaction inside a transaction, depending
507 -- on exactly when the thunk is evaluated.)
509 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
510 -- and which allows top-level TVars to be allocated.
512 atomically :: STM a -> IO a
513 atomically (STM m) = IO (\s -> (atomically# m) s )
515 -- |Retry execution of the current memory transaction because it has seen
516 -- values in TVars which mean that it should not continue (e.g. the TVars
517 -- represent a shared buffer that is now empty). The implementation may
518 -- block the thread until one of the TVars that it has read from has been
519 -- udpated. (GHC only)
521 retry = STM $ \s# -> retry# s#
523 -- |Compose two alternative STM actions (GHC only). If the first action
524 -- completes without retrying then it forms the result of the orElse.
525 -- Otherwise, if the first action retries, then the second action is
526 -- tried in its place. If both actions retry then the orElse as a
528 orElse :: STM a -> STM a -> STM a
529 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
531 -- |Exception handling within STM actions.
532 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
533 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
535 -- | Low-level primitive on which always and alwaysSucceeds are built.
536 -- checkInv differs form these in that (i) the invariant is not
537 -- checked when checkInv is called, only at the end of this and
538 -- subsequent transcations, (ii) the invariant failure is indicated
539 -- by raising an exception.
540 checkInv :: STM a -> STM ()
541 checkInv (STM m) = STM (\s -> (check# m) s)
543 -- | alwaysSucceeds adds a new invariant that must be true when passed
544 -- to alwaysSucceeds, at the end of the current transaction, and at
545 -- the end of every subsequent transaction. If it fails at any
546 -- of those points then the transaction violating it is aborted
547 -- and the exception raised by the invariant is propagated.
548 alwaysSucceeds :: STM a -> STM ()
549 alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () )
552 -- | always is a variant of alwaysSucceeds in which the invariant is
553 -- expressed as an STM Bool action that must return True. Returning
554 -- False or raising an exception are both treated as invariant failures.
555 always :: STM Bool -> STM ()
556 always i = alwaysSucceeds ( do v <- i
557 if (v) then return () else ( error "Transacional invariant violation" ) )
559 -- |Shared memory locations that support atomic memory transactions.
560 data TVar a = TVar (TVar# RealWorld a)
562 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
564 instance Eq (TVar a) where
565 (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
567 -- |Create a new TVar holding a value supplied
568 newTVar :: a -> STM (TVar a)
569 newTVar val = STM $ \s1# ->
570 case newTVar# val s1# of
571 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
573 -- |@IO@ version of 'newTVar'. This is useful for creating top-level
574 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
575 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
577 newTVarIO :: a -> IO (TVar a)
578 newTVarIO val = IO $ \s1# ->
579 case newTVar# val s1# of
580 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
582 -- |Return the current value stored in a TVar.
583 -- This is equivalent to
585 -- > readTVarIO = atomically . readTVar
587 -- but works much faster, because it doesn't perform a complete
588 -- transaction, it just reads the current value of the 'TVar'.
589 readTVarIO :: TVar a -> IO a
590 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
592 -- |Return the current value stored in a TVar
593 readTVar :: TVar a -> STM a
594 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
596 -- |Write the supplied value into a TVar
597 writeTVar :: TVar a -> a -> STM ()
598 writeTVar (TVar tvar#) val = STM $ \s1# ->
599 case writeTVar# tvar# val s1# of
607 withMVar :: MVar a -> (a -> IO b) -> IO b
611 b <- catchAny (unblock (io a))
612 (\e -> do putMVar m a; throw e)
616 modifyMVar_ :: MVar a -> (a -> IO a) -> IO ()
620 a' <- catchAny (unblock (io a))
621 (\e -> do putMVar m a; throw e)
626 %************************************************************************
628 \subsection{Thread waiting}
630 %************************************************************************
633 #ifdef mingw32_HOST_OS
635 -- Note: threadWaitRead and threadWaitWrite aren't really functional
636 -- on Win32, but left in there because lib code (still) uses them (the manner
637 -- in which they're used doesn't cause problems on a Win32 platform though.)
639 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
640 asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) =
641 IO $ \s -> case asyncRead# fd isSock len buf s of
642 (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
644 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
645 asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) =
646 IO $ \s -> case asyncWrite# fd isSock len buf s of
647 (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
649 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
650 asyncDoProc (FunPtr proc) (Ptr param) =
651 -- the 'length' value is ignored; simplifies implementation of
652 -- the async*# primops to have them all return the same result.
653 IO $ \s -> case asyncDoProc# proc param s of
654 (# s', _len#, err# #) -> (# s', I# err# #)
656 -- to aid the use of these primops by the IO Handle implementation,
657 -- provide the following convenience funs:
659 -- this better be a pinned byte array!
660 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
661 asyncReadBA fd isSock len off bufB =
662 asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
664 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
665 asyncWriteBA fd isSock len off bufB =
666 asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
670 -- -----------------------------------------------------------------------------
673 -- | Block the current thread until data is available to read on the
674 -- given file descriptor (GHC only).
675 threadWaitRead :: Fd -> IO ()
677 #ifndef mingw32_HOST_OS
678 | threaded = waitForReadEvent fd
680 | otherwise = IO $ \s ->
681 case fromIntegral fd of { I# fd# ->
682 case waitRead# fd# s of { s' -> (# s', () #)
685 -- | Block the current thread until data can be written to the
686 -- given file descriptor (GHC only).
687 threadWaitWrite :: Fd -> IO ()
689 #ifndef mingw32_HOST_OS
690 | threaded = waitForWriteEvent fd
692 | otherwise = IO $ \s ->
693 case fromIntegral fd of { I# fd# ->
694 case waitWrite# fd# s of { s' -> (# s', () #)
697 -- | Suspends the current thread for a given number of microseconds
700 -- There is no guarantee that the thread will be rescheduled promptly
701 -- when the delay has expired, but the thread will never continue to
702 -- run /earlier/ than specified.
704 threadDelay :: Int -> IO ()
706 | threaded = waitForDelayEvent time
707 | otherwise = IO $ \s ->
708 case fromIntegral time of { I# time# ->
709 case delay# time# s of { s' -> (# s', () #)
713 -- | Set the value of returned TVar to True after a given number of
714 -- microseconds. The caveats associated with threadDelay also apply.
716 registerDelay :: Int -> IO (TVar Bool)
718 | threaded = waitForDelayEventSTM usecs
719 | otherwise = error "registerDelay: requires -threaded"
721 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
723 waitForDelayEvent :: Int -> IO ()
724 waitForDelayEvent usecs = do
726 target <- calculateTarget usecs
727 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
731 -- Delays for use in STM
732 waitForDelayEventSTM :: Int -> IO (TVar Bool)
733 waitForDelayEventSTM usecs = do
734 t <- atomically $ newTVar False
735 target <- calculateTarget usecs
736 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
740 calculateTarget :: Int -> IO USecs
741 calculateTarget usecs = do
743 return $ now + (fromIntegral usecs)
746 -- ----------------------------------------------------------------------------
747 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
749 -- In the threaded RTS, we employ a single IO Manager thread to wait
750 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
751 -- and delays (threadDelay).
753 -- We can do this because in the threaded RTS the IO Manager can make
754 -- a non-blocking call to select(), so we don't have to do select() in
755 -- the scheduler as we have to in the non-threaded RTS. We get performance
756 -- benefits from doing it this way, because we only have to restart the select()
757 -- when a new request arrives, rather than doing one select() each time
758 -- around the scheduler loop. Furthermore, the scheduler can be simplified
759 -- by not having to check for completed IO requests.
761 #ifndef mingw32_HOST_OS
763 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
764 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
768 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
769 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
771 #ifndef mingw32_HOST_OS
772 {-# NOINLINE pendingEvents #-}
773 pendingEvents :: IORef [IOReq]
774 pendingEvents = unsafePerformIO $ do
776 sharedCAF m getOrSetGHCConcPendingEventsStore
778 foreign import ccall unsafe "getOrSetGHCConcPendingEventsStore"
779 getOrSetGHCConcPendingEventsStore :: Ptr a -> IO (Ptr a)
782 {-# NOINLINE pendingDelays #-}
783 pendingDelays :: IORef [DelayReq]
784 pendingDelays = unsafePerformIO $ do
786 sharedCAF m getOrSetGHCConcPendingDelaysStore
788 foreign import ccall unsafe "getOrSetGHCConcPendingDelaysStore"
789 getOrSetGHCConcPendingDelaysStore :: Ptr a -> IO (Ptr a)
791 {-# NOINLINE ioManagerThread #-}
792 ioManagerThread :: MVar (Maybe ThreadId)
793 ioManagerThread = unsafePerformIO $ do
795 sharedCAF m getOrSetGHCConcIOManagerThreadStore
797 foreign import ccall unsafe "getOrSetGHCConcIOManagerThreadStore"
798 getOrSetGHCConcIOManagerThreadStore :: Ptr a -> IO (Ptr a)
800 ensureIOManagerIsRunning :: IO ()
801 ensureIOManagerIsRunning
802 | threaded = startIOManagerThread
803 | otherwise = return ()
805 startIOManagerThread :: IO ()
806 startIOManagerThread = do
807 modifyMVar_ ioManagerThread $ \old -> do
808 let create = do t <- forkIO ioManager; return (Just t)
814 ThreadFinished -> create
816 _other -> return (Just t)
818 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
819 insertDelay d [] = [d]
820 insertDelay d1 ds@(d2 : rest)
821 | delayTime d1 <= delayTime d2 = d1 : ds
822 | otherwise = d2 : insertDelay d1 rest
824 delayTime :: DelayReq -> USecs
825 delayTime (Delay t _) = t
826 delayTime (DelaySTM t _) = t
830 foreign import ccall unsafe "getUSecOfDay"
831 getUSecOfDay :: IO USecs
833 {-# NOINLINE prodding #-}
834 prodding :: IORef Bool
835 prodding = unsafePerformIO $ do
837 sharedCAF r getOrSetGHCConcProddingStore
839 foreign import ccall unsafe "getOrSetGHCConcProddingStore"
840 getOrSetGHCConcProddingStore :: Ptr a -> IO (Ptr a)
842 prodServiceThread :: IO ()
843 prodServiceThread = do
844 -- NB. use atomicModifyIORef here, otherwise there are race
845 -- conditions in which prodding is left at True but the server is
846 -- blocked in select().
847 was_set <- atomicModifyIORef prodding $ \b -> (True,b)
848 unless was_set wakeupIOManager
850 -- Machinery needed to ensure that we only have one copy of certain
851 -- CAFs in this module even when the base package is present twice, as
852 -- it is when base is dynamically loaded into GHCi. The RTS keeps
853 -- track of the single true value of the CAF, so even when the CAFs in
854 -- the dynamically-loaded base package are reverted, nothing bad
857 sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a
858 sharedCAF a get_or_set =
860 stable_ref <- newStablePtr a
861 let ref = castPtr (castStablePtrToPtr stable_ref)
862 ref2 <- get_or_set ref
865 else do freeStablePtr stable_ref
866 deRefStablePtr (castPtrToStablePtr (castPtr ref2))
868 #ifdef mingw32_HOST_OS
869 -- ----------------------------------------------------------------------------
870 -- Windows IO manager thread
874 wakeup <- c_getIOManagerEvent
875 service_loop wakeup []
877 service_loop :: HANDLE -- read end of pipe
878 -> [DelayReq] -- current delay requests
881 service_loop wakeup old_delays = do
882 -- pick up new delay requests
883 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
884 let delays = foldr insertDelay old_delays new_delays
887 (delays', timeout) <- getDelay now delays
889 r <- c_WaitForSingleObject wakeup timeout
891 0xffffffff -> do c_maperrno; throwErrno "service_loop"
893 r2 <- c_readIOManagerEvent
896 _ | r2 == io_MANAGER_WAKEUP -> return False
897 _ | r2 == io_MANAGER_DIE -> return True
898 0 -> return False -- spurious wakeup
899 _ -> do start_console_handler (r2 `shiftR` 1); return False
900 unless exit $ service_cont wakeup delays'
902 _other -> service_cont wakeup delays' -- probably timeout
904 service_cont :: HANDLE -> [DelayReq] -> IO ()
905 service_cont wakeup delays = do
906 r <- atomicModifyIORef prodding (\_ -> (False,False))
907 r `seq` return () -- avoid space leak
908 service_loop wakeup delays
910 -- must agree with rts/win32/ThrIOManager.c
911 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
912 io_MANAGER_WAKEUP = 0xffffffff
913 io_MANAGER_DIE = 0xfffffffe
919 -- these are sent to Services only.
922 deriving (Eq, Ord, Enum, Show, Read, Typeable)
924 start_console_handler :: Word32 -> IO ()
925 start_console_handler r =
926 case toWin32ConsoleEvent r of
927 Just x -> withMVar win32ConsoleHandler $ \handler -> do
928 _ <- forkIO (handler x)
932 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
933 toWin32ConsoleEvent ev =
935 0 {- CTRL_C_EVENT-} -> Just ControlC
936 1 {- CTRL_BREAK_EVENT-} -> Just Break
937 2 {- CTRL_CLOSE_EVENT-} -> Just Close
938 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
939 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
942 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
943 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
945 wakeupIOManager :: IO ()
946 wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP
948 -- Walk the queue of pending delays, waking up any that have passed
949 -- and return the smallest delay to wait for. The queue of pending
950 -- delays is kept ordered.
951 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
952 getDelay _ [] = return ([], iNFINITE)
953 getDelay now all@(d : rest)
955 Delay time m | now >= time -> do
958 DelaySTM time t | now >= time -> do
959 atomically $ writeTVar t True
962 -- delay is in millisecs for WaitForSingleObject
963 let micro_seconds = delayTime d - now
964 milli_seconds = (micro_seconds + 999) `div` 1000
965 in return (all, fromIntegral milli_seconds)
967 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
968 -- available yet. We should move some Win32 functionality down here,
969 -- maybe as part of the grand reorganisation of the base package...
974 iNFINITE = 0xFFFFFFFF -- urgh
976 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
977 c_getIOManagerEvent :: IO HANDLE
979 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
980 c_readIOManagerEvent :: IO Word32
982 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
983 c_sendIOManagerEvent :: Word32 -> IO ()
985 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
988 foreign import stdcall "WaitForSingleObject"
989 c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
992 -- ----------------------------------------------------------------------------
993 -- Unix IO manager thread, using select()
997 allocaArray 2 $ \fds -> do
998 throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds)
999 rd_end <- peekElemOff fds 0
1000 wr_end <- peekElemOff fds 1
1001 setNonBlockingFD wr_end True -- writes happen in a signal handler, we
1002 -- don't want them to block.
1003 setCloseOnExec rd_end
1004 setCloseOnExec wr_end
1005 c_setIOManagerPipe wr_end
1006 allocaBytes sizeofFdSet $ \readfds -> do
1007 allocaBytes sizeofFdSet $ \writefds -> do
1008 allocaBytes sizeofTimeVal $ \timeval -> do
1009 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1013 :: Fd -- listen to this for wakeup calls
1020 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1022 -- reset prodding before we look at the new requests. If a new
1023 -- client arrives after this point they will send a wakup which will
1024 -- cause the server to loop around again, so we can be sure to not
1025 -- miss any requests.
1027 -- NB. it's important to do this in the *first* iteration of
1028 -- service_loop, rather than after calling select(), since a client
1029 -- may have set prodding to True without sending a wakeup byte down
1030 -- the pipe, because the pipe wasn't set up.
1031 atomicModifyIORef prodding (\_ -> (False, ()))
1033 -- pick up new IO requests
1034 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1035 let reqs = new_reqs ++ old_reqs
1037 -- pick up new delay requests
1038 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1039 let delays0 = foldr insertDelay old_delays new_delays
1041 -- build the FDSets for select()
1044 fdSet wakeup readfds
1045 maxfd <- buildFdSets 0 readfds writefds reqs
1047 -- perform the select()
1048 let do_select delays = do
1049 -- check the current time and wake up any thread in
1050 -- threadDelay whose timeout has expired. Also find the
1051 -- timeout value for the select() call.
1053 (delays', timeout) <- getDelay now ptimeval delays
1055 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
1061 _ | err == eINTR -> do_select delays'
1062 -- EINTR: just redo the select()
1063 _ | err == eBADF -> return (True, delays)
1064 -- EBADF: one of the file descriptors is closed or bad,
1065 -- we don't know which one, so wake everyone up.
1066 _ | otherwise -> throwErrno "select"
1067 -- otherwise (ENOMEM or EINVAL) something has gone
1068 -- wrong; report the error.
1070 return (False,delays')
1072 (wakeup_all,delays') <- do_select delays0
1075 if wakeup_all then return False
1077 b <- fdIsSet wakeup readfds
1080 else alloca $ \p -> do
1081 warnErrnoIfMinus1_ "service_loop" $
1082 c_read (fromIntegral wakeup) p 1
1085 _ | s == io_MANAGER_WAKEUP -> return False
1086 _ | s == io_MANAGER_DIE -> return True
1087 _ | s == io_MANAGER_SYNC -> do
1088 mvars <- readIORef sync
1089 mapM_ (flip putMVar ()) mvars
1092 fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1093 withForeignPtr fp $ \p_siginfo -> do
1094 r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1096 when (r /= fromIntegral sizeof_siginfo_t) $
1097 error "failed to read siginfo_t"
1098 runHandlers' fp (fromIntegral s)
1103 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1104 else completeRequests reqs readfds writefds []
1106 service_loop wakeup readfds writefds ptimeval reqs' delays'
1108 io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8
1109 io_MANAGER_WAKEUP = 0xff
1110 io_MANAGER_DIE = 0xfe
1111 io_MANAGER_SYNC = 0xfd
1113 {-# NOINLINE sync #-}
1114 sync :: IORef [MVar ()]
1115 sync = unsafePerformIO (newIORef [])
1117 -- waits for the IO manager to drain the pipe
1118 syncIOManager :: IO ()
1121 atomicModifyIORef sync (\old -> (m:old,()))
1125 foreign import ccall unsafe "ioManagerSync" c_ioManagerSync :: IO ()
1126 foreign import ccall unsafe "ioManagerWakeup" wakeupIOManager :: IO ()
1128 -- For the non-threaded RTS
1129 runHandlers :: Ptr Word8 -> Int -> IO ()
1130 runHandlers p_info sig = do
1131 fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1132 withForeignPtr fp $ \p -> do
1133 copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1135 runHandlers' fp (fromIntegral sig)
1137 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1138 runHandlers' p_info sig = do
1139 let int = fromIntegral sig
1140 withMVar signal_handlers $ \arr ->
1141 if not (inRange (boundsIOArray arr) int)
1143 else do handler <- unsafeReadIOArray arr int
1145 Nothing -> return ()
1146 Just (f,_) -> do _ <- forkIO (f p_info)
1149 warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO ()
1150 warnErrnoIfMinus1_ what io
1154 str <- strerror errno >>= peekCString
1156 debugErrLn ("Warning: " ++ what ++ " failed: " ++ str)
1158 foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar)
1160 foreign import ccall "setIOManagerPipe"
1161 c_setIOManagerPipe :: CInt -> IO ()
1163 foreign import ccall "__hscore_sizeof_siginfo_t"
1164 sizeof_siginfo_t :: CSize
1170 type HandlerFun = ForeignPtr Word8 -> IO ()
1172 -- Lock used to protect concurrent access to signal_handlers. Symptom of
1173 -- this race condition is #1922, although that bug was on Windows a similar
1174 -- bug also exists on Unix.
1175 {-# NOINLINE signal_handlers #-}
1176 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1177 signal_handlers = unsafePerformIO $ do
1178 arr <- newIOArray (0,maxSig) Nothing
1180 sharedCAF m getOrSetGHCConcSignalHandlerStore
1182 foreign import ccall unsafe "getOrSetGHCConcSignalHandlerStore"
1183 getOrSetGHCConcSignalHandlerStore :: Ptr a -> IO (Ptr a)
1185 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1186 setHandler sig handler = do
1187 let int = fromIntegral sig
1188 withMVar signal_handlers $ \arr ->
1189 if not (inRange (boundsIOArray arr) int)
1190 then error "GHC.Conc.setHandler: signal out of range"
1191 else do old <- unsafeReadIOArray arr int
1192 unsafeWriteIOArray arr int handler
1195 -- -----------------------------------------------------------------------------
1198 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1199 buildFdSets maxfd _ _ [] = return maxfd
1200 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1201 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1204 buildFdSets (max maxfd fd) readfds writefds reqs
1205 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1206 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1209 buildFdSets (max maxfd fd) readfds writefds reqs
1211 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1213 completeRequests [] _ _ reqs' = return reqs'
1214 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1215 b <- fdIsSet fd readfds
1217 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1218 else completeRequests reqs readfds writefds (Read fd m : reqs')
1219 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1220 b <- fdIsSet fd writefds
1222 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1223 else completeRequests reqs readfds writefds (Write fd m : reqs')
1225 wakeupAll :: [IOReq] -> IO ()
1226 wakeupAll [] = return ()
1227 wakeupAll (Read _ m : reqs) = do putMVar m (); wakeupAll reqs
1228 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1230 waitForReadEvent :: Fd -> IO ()
1231 waitForReadEvent fd = do
1233 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1237 waitForWriteEvent :: Fd -> IO ()
1238 waitForWriteEvent fd = do
1240 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1244 -- -----------------------------------------------------------------------------
1247 -- Walk the queue of pending delays, waking up any that have passed
1248 -- and return the smallest delay to wait for. The queue of pending
1249 -- delays is kept ordered.
1250 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1251 getDelay _ _ [] = return ([],nullPtr)
1252 getDelay now ptimeval all@(d : rest)
1254 Delay time m | now >= time -> do
1256 getDelay now ptimeval rest
1257 DelaySTM time t | now >= time -> do
1258 atomically $ writeTVar t True
1259 getDelay now ptimeval rest
1261 setTimevalTicks ptimeval (delayTime d - now)
1262 return (all,ptimeval)
1266 foreign import ccall unsafe "sizeofTimeVal"
1267 sizeofTimeVal :: Int
1269 foreign import ccall unsafe "setTimevalTicks"
1270 setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1273 On Win32 we're going to have a single Pipe, and a
1274 waitForSingleObject with the delay time. For signals, we send a
1275 byte down the pipe just like on Unix.
1278 -- ----------------------------------------------------------------------------
1279 -- select() interface
1281 -- ToDo: move to System.Posix.Internals?
1285 foreign import ccall safe "__hscore_select"
1286 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1289 foreign import ccall unsafe "hsFD_SETSIZE"
1290 c_fD_SETSIZE :: CInt
1293 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1295 foreign import ccall unsafe "hsFD_ISSET"
1296 c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1298 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1299 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1301 foreign import ccall unsafe "hsFD_SET"
1302 c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1304 fdSet :: Fd -> Ptr CFdSet -> IO ()
1305 fdSet (Fd fd) fdset = c_fdSet fd fdset
1307 foreign import ccall unsafe "hsFD_ZERO"
1308 fdZero :: Ptr CFdSet -> IO ()
1310 foreign import ccall unsafe "sizeof_fd_set"
1315 reportStackOverflow :: IO ()
1316 reportStackOverflow = callStackOverflowHook
1318 reportError :: SomeException -> IO ()
1320 handler <- getUncaughtExceptionHandler
1323 -- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove
1324 -- the unsafe below.
1325 foreign import ccall unsafe "stackOverflow"
1326 callStackOverflowHook :: IO ()
1328 {-# NOINLINE uncaughtExceptionHandler #-}
1329 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1330 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1332 defaultHandler :: SomeException -> IO ()
1333 defaultHandler se@(SomeException ex) = do
1334 (hFlush stdout) `catchAny` (\ _ -> return ())
1335 let msg = case cast ex of
1336 Just Deadlock -> "no threads to run: infinite loop or deadlock?"
1337 _ -> case cast ex of
1338 Just (ErrorCall s) -> s
1339 _ -> showsPrec 0 se ""
1340 withCString "%s" $ \cfmt ->
1341 withCString msg $ \cmsg ->
1342 errorBelch cfmt cmsg
1344 -- don't use errorBelch() directly, because we cannot call varargs functions
1346 foreign import ccall unsafe "HsBase.h errorBelch2"
1347 errorBelch :: CString -> CString -> IO ()
1349 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1350 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1352 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1353 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler