2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
8 -- Copyright : (c) The University of Glasgow, 1994-2002
9 -- License : see libraries/base/LICENSE
11 -- Maintainer : cvs-ghc@haskell.org
12 -- Stability : internal
13 -- Portability : non-portable (GHC extensions)
15 -- Basic concurrency stuff.
17 -----------------------------------------------------------------------------
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home. Hence:
30 -- * Forking and suchlike
31 , forkIO -- :: IO a -> IO ThreadId
32 , forkOnIO -- :: Int -> IO a -> IO ThreadId
33 , numCapabilities -- :: Int
34 , childHandler -- :: Exception -> IO ()
35 , myThreadId -- :: IO ThreadId
36 , killThread -- :: ThreadId -> IO ()
37 , throwTo -- :: ThreadId -> Exception -> IO ()
38 , par -- :: a -> b -> b
39 , pseq -- :: a -> b -> b
42 , labelThread -- :: ThreadId -> String -> IO ()
44 , ThreadStatus(..), BlockReason(..)
45 , threadStatus -- :: ThreadId -> IO ThreadStatus
48 , threadDelay -- :: Int -> IO ()
49 , registerDelay -- :: Int -> IO (TVar Bool)
50 , threadWaitRead -- :: Int -> IO ()
51 , threadWaitWrite -- :: Int -> IO ()
55 , atomically -- :: STM a -> IO a
57 , orElse -- :: STM a -> STM a -> STM a
58 , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a
59 , alwaysSucceeds -- :: STM a -> STM ()
60 , always -- :: STM Bool -> STM ()
62 , newTVar -- :: a -> STM (TVar a)
63 , newTVarIO -- :: a -> STM (TVar a)
64 , readTVar -- :: TVar a -> STM a
65 , readTVarIO -- :: TVar a -> IO a
66 , writeTVar -- :: a -> TVar a -> STM ()
67 , unsafeIOToSTM -- :: IO a -> STM a
71 #ifdef mingw32_HOST_OS
72 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
73 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
74 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
76 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
77 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
80 #ifndef mingw32_HOST_OS
81 , Signal, HandlerFun, setHandler, runHandlers
84 , ensureIOManagerIsRunning
85 #ifndef mingw32_HOST_OS
89 #ifdef mingw32_HOST_OS
94 , setUncaughtExceptionHandler -- :: (Exception -> IO ()) -> IO ()
95 , getUncaughtExceptionHandler -- :: IO (Exception -> IO ())
97 , reportError, reportStackOverflow
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
107 #ifdef mingw32_HOST_OS
111 #ifndef mingw32_HOST_OS
118 #ifndef mingw32_HOST_OS
121 import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
122 import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
124 import GHC.IO.Exception
128 import GHC.Num ( Num(..) )
129 import GHC.Real ( fromIntegral )
130 #ifndef mingw32_HOST_OS
132 import GHC.Arr ( inRange )
134 #ifdef mingw32_HOST_OS
135 import GHC.Real ( div )
138 #ifdef mingw32_HOST_OS
139 import GHC.Read ( Read )
140 import GHC.Enum ( Enum )
142 import GHC.Pack ( packCString# )
143 import GHC.Show ( Show(..), showString )
145 infixr 0 `par`, `pseq`
148 %************************************************************************
150 \subsection{@ThreadId@, @par@, and @fork@}
152 %************************************************************************
155 data ThreadId = ThreadId ThreadId# deriving( Typeable )
156 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
157 -- But since ThreadId# is unlifted, the Weak type must use open
160 A 'ThreadId' is an abstract type representing a handle to a thread.
161 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
162 the 'Ord' instance implements an arbitrary total ordering over
163 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
164 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
165 useful when debugging or diagnosing the behaviour of a concurrent
168 /Note/: in GHC, if you have a 'ThreadId', you essentially have
169 a pointer to the thread itself. This means the thread itself can\'t be
170 garbage collected until you drop the 'ThreadId'.
171 This misfeature will hopefully be corrected at a later date.
173 /Note/: Hugs does not provide any operations on other threads;
174 it defines 'ThreadId' as a synonym for ().
177 instance Show ThreadId where
179 showString "ThreadId " .
180 showsPrec d (getThreadId (id2TSO t))
182 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
184 id2TSO :: ThreadId -> ThreadId#
185 id2TSO (ThreadId t) = t
187 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
190 cmpThread :: ThreadId -> ThreadId -> Ordering
192 case cmp_thread (id2TSO t1) (id2TSO t2) of
197 instance Eq ThreadId where
199 case t1 `cmpThread` t2 of
203 instance Ord ThreadId where
207 Sparks off a new thread to run the 'IO' computation passed as the
208 first argument, and returns the 'ThreadId' of the newly created
211 The new thread will be a lightweight thread; if you want to use a foreign
212 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
214 GHC note: the new thread inherits the /blocked/ state of the parent
215 (see 'Control.Exception.block').
217 The newly created thread has an exception handler that discards the
218 exceptions 'BlockedIndefinitelyOnMVar', 'BlockedIndefinitelyOnSTM', and
219 'ThreadKilled', and passes all other exceptions to the uncaught
220 exception handler (see 'setUncaughtExceptionHandler').
222 forkIO :: IO () -> IO ThreadId
223 forkIO action = IO $ \ s ->
224 case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
226 action_plus = catchException action childHandler
229 Like 'forkIO', but lets you specify on which CPU the thread is
230 created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
231 will stay on the same CPU for its entire lifetime (`forkIO` threads
232 can migrate between CPUs according to the scheduling policy).
233 `forkOnIO` is useful for overriding the scheduling policy when you
234 know in advance how best to distribute the threads.
236 The `Int` argument specifies the CPU number; it is interpreted modulo
237 'numCapabilities' (note that it actually specifies a capability number
238 rather than a CPU number, but to a first approximation the two are
241 forkOnIO :: Int -> IO () -> IO ThreadId
242 forkOnIO (I# cpu) action = IO $ \ s ->
243 case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
245 action_plus = catchException action childHandler
247 -- | the value passed to the @+RTS -N@ flag. This is the number of
248 -- Haskell threads that can run truly simultaneously at any given
249 -- time, and is typically set to the number of physical CPU cores on
251 numCapabilities :: Int
252 numCapabilities = unsafePerformIO $ do
253 n <- peek n_capabilities
254 return (fromIntegral n)
256 #if defined(mingw32_HOST_OS) && defined(__PIC__)
257 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
259 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
261 childHandler :: SomeException -> IO ()
262 childHandler err = catchException (real_handler err) childHandler
264 real_handler :: SomeException -> IO ()
265 real_handler se@(SomeException ex) =
266 -- ignore thread GC and killThread exceptions:
268 Just BlockedIndefinitelyOnMVar -> return ()
270 Just BlockedIndefinitelyOnSTM -> return ()
272 Just ThreadKilled -> return ()
274 -- report all others:
275 Just StackOverflow -> reportStackOverflow
278 {- | 'killThread' raises the 'ThreadKilled' exception in the given
281 > killThread tid = throwTo tid ThreadKilled
284 killThread :: ThreadId -> IO ()
285 killThread tid = throwTo tid ThreadKilled
287 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
289 'throwTo' does not return until the exception has been raised in the
291 The calling thread can thus be certain that the target
292 thread has received the exception. This is a useful property to know
293 when dealing with race conditions: eg. if there are two threads that
294 can kill each other, it is guaranteed that only one of the threads
295 will get to kill the other.
297 Whatever work the target thread was doing when the exception was
298 raised is not lost: the computation is suspended until required by
301 If the target thread is currently making a foreign call, then the
302 exception will not be raised (and hence 'throwTo' will not return)
303 until the call has completed. This is the case regardless of whether
304 the call is inside a 'block' or not.
306 Important note: the behaviour of 'throwTo' differs from that described in
307 the paper \"Asynchronous exceptions in Haskell\"
308 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
309 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
310 a more synchronous design in which 'throwTo' does not return until the exception
311 is received by the target thread. The trade-off is discussed in Section 9 of the paper.
312 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
315 There is no guarantee that the exception will be delivered promptly,
316 although the runtime will endeavour to ensure that arbitrary
317 delays don't occur. In GHC, an exception can only be raised when a
318 thread reaches a /safe point/, where a safe point is where memory
319 allocation occurs. Some loops do not perform any memory allocation
320 inside the loop and therefore cannot be interrupted by a 'throwTo'.
322 Blocked 'throwTo' is fair: if multiple threads are trying to throw an
323 exception to the same target thread, they will succeed in FIFO order.
326 throwTo :: Exception e => ThreadId -> e -> IO ()
327 throwTo (ThreadId tid) ex = IO $ \ s ->
328 case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
330 -- | Returns the 'ThreadId' of the calling thread (GHC only).
331 myThreadId :: IO ThreadId
332 myThreadId = IO $ \s ->
333 case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
336 -- |The 'yield' action allows (forces, in a co-operative multitasking
337 -- implementation) a context-switch to any other currently runnable
338 -- threads (if any), and is occasionally useful when implementing
339 -- concurrency abstractions.
342 case (yield# s) of s1 -> (# s1, () #)
344 {- | 'labelThread' stores a string as identifier for this thread if
345 you built a RTS with debugging support. This identifier will be used in
346 the debugging output to make distinction of different threads easier
347 (otherwise you only have the thread state object\'s address in the heap).
349 Other applications like the graphical Concurrent Haskell Debugger
350 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
351 'labelThread' for their purposes as well.
354 labelThread :: ThreadId -> String -> IO ()
355 labelThread (ThreadId t) str = IO $ \ s ->
356 let !ps = packCString# str
357 !adr = byteArrayContents# ps in
358 case (labelThread# t adr s) of s1 -> (# s1, () #)
360 -- Nota Bene: 'pseq' used to be 'seq'
361 -- but 'seq' is now defined in PrelGHC
363 -- "pseq" is defined a bit weirdly (see below)
365 -- The reason for the strange "lazy" call is that
366 -- it fools the compiler into thinking that pseq and par are non-strict in
367 -- their second argument (even if it inlines pseq at the call site).
368 -- If it thinks pseq is strict in "y", then it often evaluates
369 -- "y" before "x", which is totally wrong.
373 pseq x y = x `seq` lazy y
377 par x y = case (par# x) of { _ -> lazy y }
379 -- | Internal function used by the RTS to run sparks.
382 where loop s = case getSpark# s of
384 if n ==# 0# then (# s', () #)
389 -- ^blocked on on 'MVar'
391 -- ^blocked on a computation in progress by another thread
393 -- ^blocked in 'throwTo'
395 -- ^blocked in 'retry' in an STM transaction
396 | BlockedOnForeignCall
397 -- ^currently in a foreign call
399 -- ^blocked on some other resource. Without @-threaded@,
400 -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
401 -- they show up as 'BlockedOnMVar'.
402 deriving (Eq,Ord,Show)
404 -- | The current status of a thread
407 -- ^the thread is currently runnable or running
409 -- ^the thread has finished
410 | ThreadBlocked BlockReason
411 -- ^the thread is blocked on some resource
413 -- ^the thread received an uncaught exception
414 deriving (Eq,Ord,Show)
416 threadStatus :: ThreadId -> IO ThreadStatus
417 threadStatus (ThreadId t) = IO $ \s ->
418 case threadStatus# t s of
419 (# s', stat #) -> (# s', mk_stat (I# stat) #)
421 -- NB. keep these in sync with includes/Constants.h
422 mk_stat 0 = ThreadRunning
423 mk_stat 1 = ThreadBlocked BlockedOnMVar
424 mk_stat 2 = ThreadBlocked BlockedOnBlackHole
425 mk_stat 3 = ThreadBlocked BlockedOnException
426 mk_stat 7 = ThreadBlocked BlockedOnSTM
427 mk_stat 11 = ThreadBlocked BlockedOnForeignCall
428 mk_stat 12 = ThreadBlocked BlockedOnForeignCall
429 mk_stat 16 = ThreadFinished
430 mk_stat 17 = ThreadDied
431 mk_stat _ = ThreadBlocked BlockedOnOther
435 %************************************************************************
437 \subsection[stm]{Transactional heap operations}
439 %************************************************************************
441 TVars are shared memory locations which support atomic memory
445 -- |A monad supporting atomic memory transactions.
446 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
448 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
451 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
453 instance Functor STM where
454 fmap f x = x >>= (return . f)
456 instance Monad STM where
457 {-# INLINE return #-}
461 return x = returnSTM x
462 m >>= k = bindSTM m k
464 bindSTM :: STM a -> (a -> STM b) -> STM b
465 bindSTM (STM m) k = STM ( \s ->
467 (# new_s, a #) -> unSTM (k a) new_s
470 thenSTM :: STM a -> STM b -> STM b
471 thenSTM (STM m) k = STM ( \s ->
473 (# new_s, _ #) -> unSTM k new_s
476 returnSTM :: a -> STM a
477 returnSTM x = STM (\s -> (# s, x #))
479 instance MonadPlus STM where
483 -- | Unsafely performs IO in the STM monad. Beware: this is a highly
484 -- dangerous thing to do.
486 -- * The STM implementation will often run transactions multiple
487 -- times, so you need to be prepared for this if your IO has any
490 -- * The STM implementation will abort transactions that are known to
491 -- be invalid and need to be restarted. This may happen in the middle
492 -- of `unsafeIOToSTM`, so make sure you don't acquire any resources
493 -- that need releasing (exception handlers are ignored when aborting
494 -- the transaction). That includes doing any IO using Handles, for
495 -- example. Getting this wrong will probably lead to random deadlocks.
497 -- * The transaction may have seen an inconsistent view of memory when
498 -- the IO runs. Invariants that you expect to be true throughout
499 -- your program may not be true inside a transaction, due to the
500 -- way transactions are implemented. Normally this wouldn't be visible
501 -- to the programmer, but using `unsafeIOToSTM` can expose it.
503 unsafeIOToSTM :: IO a -> STM a
504 unsafeIOToSTM (IO m) = STM m
506 -- |Perform a series of STM actions atomically.
508 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
509 -- Any attempt to do so will result in a runtime error. (Reason: allowing
510 -- this would effectively allow a transaction inside a transaction, depending
511 -- on exactly when the thunk is evaluated.)
513 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
514 -- and which allows top-level TVars to be allocated.
516 atomically :: STM a -> IO a
517 atomically (STM m) = IO (\s -> (atomically# m) s )
519 -- |Retry execution of the current memory transaction because it has seen
520 -- values in TVars which mean that it should not continue (e.g. the TVars
521 -- represent a shared buffer that is now empty). The implementation may
522 -- block the thread until one of the TVars that it has read from has been
523 -- udpated. (GHC only)
525 retry = STM $ \s# -> retry# s#
527 -- |Compose two alternative STM actions (GHC only). If the first action
528 -- completes without retrying then it forms the result of the orElse.
529 -- Otherwise, if the first action retries, then the second action is
530 -- tried in its place. If both actions retry then the orElse as a
532 orElse :: STM a -> STM a -> STM a
533 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
535 -- |Exception handling within STM actions.
536 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
537 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
539 -- | Low-level primitive on which always and alwaysSucceeds are built.
540 -- checkInv differs form these in that (i) the invariant is not
541 -- checked when checkInv is called, only at the end of this and
542 -- subsequent transcations, (ii) the invariant failure is indicated
543 -- by raising an exception.
544 checkInv :: STM a -> STM ()
545 checkInv (STM m) = STM (\s -> (check# m) s)
547 -- | alwaysSucceeds adds a new invariant that must be true when passed
548 -- to alwaysSucceeds, at the end of the current transaction, and at
549 -- the end of every subsequent transaction. If it fails at any
550 -- of those points then the transaction violating it is aborted
551 -- and the exception raised by the invariant is propagated.
552 alwaysSucceeds :: STM a -> STM ()
553 alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () )
556 -- | always is a variant of alwaysSucceeds in which the invariant is
557 -- expressed as an STM Bool action that must return True. Returning
558 -- False or raising an exception are both treated as invariant failures.
559 always :: STM Bool -> STM ()
560 always i = alwaysSucceeds ( do v <- i
561 if (v) then return () else ( error "Transacional invariant violation" ) )
563 -- |Shared memory locations that support atomic memory transactions.
564 data TVar a = TVar (TVar# RealWorld a)
566 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
568 instance Eq (TVar a) where
569 (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
571 -- |Create a new TVar holding a value supplied
572 newTVar :: a -> STM (TVar a)
573 newTVar val = STM $ \s1# ->
574 case newTVar# val s1# of
575 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
577 -- |@IO@ version of 'newTVar'. This is useful for creating top-level
578 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
579 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
581 newTVarIO :: a -> IO (TVar a)
582 newTVarIO val = IO $ \s1# ->
583 case newTVar# val s1# of
584 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
586 -- |Return the current value stored in a TVar.
587 -- This is equivalent to
589 -- > readTVarIO = atomically . readTVar
591 -- but works much faster, because it doesn't perform a complete
592 -- transaction, it just reads the current value of the 'TVar'.
593 readTVarIO :: TVar a -> IO a
594 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
596 -- |Return the current value stored in a TVar
597 readTVar :: TVar a -> STM a
598 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
600 -- |Write the supplied value into a TVar
601 writeTVar :: TVar a -> a -> STM ()
602 writeTVar (TVar tvar#) val = STM $ \s1# ->
603 case writeTVar# tvar# val s1# of
611 withMVar :: MVar a -> (a -> IO b) -> IO b
615 b <- catchAny (unblock (io a))
616 (\e -> do putMVar m a; throw e)
620 modifyMVar_ :: MVar a -> (a -> IO a) -> IO ()
624 a' <- catchAny (unblock (io a))
625 (\e -> do putMVar m a; throw e)
630 %************************************************************************
632 \subsection{Thread waiting}
634 %************************************************************************
637 #ifdef mingw32_HOST_OS
639 -- Note: threadWaitRead and threadWaitWrite aren't really functional
640 -- on Win32, but left in there because lib code (still) uses them (the manner
641 -- in which they're used doesn't cause problems on a Win32 platform though.)
643 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
644 asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) =
645 IO $ \s -> case asyncRead# fd isSock len buf s of
646 (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
648 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
649 asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) =
650 IO $ \s -> case asyncWrite# fd isSock len buf s of
651 (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
653 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
654 asyncDoProc (FunPtr proc) (Ptr param) =
655 -- the 'length' value is ignored; simplifies implementation of
656 -- the async*# primops to have them all return the same result.
657 IO $ \s -> case asyncDoProc# proc param s of
658 (# s', _len#, err# #) -> (# s', I# err# #)
660 -- to aid the use of these primops by the IO Handle implementation,
661 -- provide the following convenience funs:
663 -- this better be a pinned byte array!
664 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
665 asyncReadBA fd isSock len off bufB =
666 asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
668 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
669 asyncWriteBA fd isSock len off bufB =
670 asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
674 -- -----------------------------------------------------------------------------
677 -- | Block the current thread until data is available to read on the
678 -- given file descriptor (GHC only).
679 threadWaitRead :: Fd -> IO ()
681 #ifndef mingw32_HOST_OS
682 | threaded = waitForReadEvent fd
684 | otherwise = IO $ \s ->
685 case fromIntegral fd of { I# fd# ->
686 case waitRead# fd# s of { s' -> (# s', () #)
689 -- | Block the current thread until data can be written to the
690 -- given file descriptor (GHC only).
691 threadWaitWrite :: Fd -> IO ()
693 #ifndef mingw32_HOST_OS
694 | threaded = waitForWriteEvent fd
696 | otherwise = IO $ \s ->
697 case fromIntegral fd of { I# fd# ->
698 case waitWrite# fd# s of { s' -> (# s', () #)
701 -- | Suspends the current thread for a given number of microseconds
704 -- There is no guarantee that the thread will be rescheduled promptly
705 -- when the delay has expired, but the thread will never continue to
706 -- run /earlier/ than specified.
708 threadDelay :: Int -> IO ()
710 | threaded = waitForDelayEvent time
711 | otherwise = IO $ \s ->
712 case fromIntegral time of { I# time# ->
713 case delay# time# s of { s' -> (# s', () #)
717 -- | Set the value of returned TVar to True after a given number of
718 -- microseconds. The caveats associated with threadDelay also apply.
720 registerDelay :: Int -> IO (TVar Bool)
722 | threaded = waitForDelayEventSTM usecs
723 | otherwise = error "registerDelay: requires -threaded"
725 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
727 waitForDelayEvent :: Int -> IO ()
728 waitForDelayEvent usecs = do
730 target <- calculateTarget usecs
731 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
735 -- Delays for use in STM
736 waitForDelayEventSTM :: Int -> IO (TVar Bool)
737 waitForDelayEventSTM usecs = do
738 t <- atomically $ newTVar False
739 target <- calculateTarget usecs
740 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
744 calculateTarget :: Int -> IO USecs
745 calculateTarget usecs = do
747 return $ now + (fromIntegral usecs)
750 -- ----------------------------------------------------------------------------
751 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
753 -- In the threaded RTS, we employ a single IO Manager thread to wait
754 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
755 -- and delays (threadDelay).
757 -- We can do this because in the threaded RTS the IO Manager can make
758 -- a non-blocking call to select(), so we don't have to do select() in
759 -- the scheduler as we have to in the non-threaded RTS. We get performance
760 -- benefits from doing it this way, because we only have to restart the select()
761 -- when a new request arrives, rather than doing one select() each time
762 -- around the scheduler loop. Furthermore, the scheduler can be simplified
763 -- by not having to check for completed IO requests.
765 #ifndef mingw32_HOST_OS
767 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
768 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
772 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
773 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
775 #ifndef mingw32_HOST_OS
776 {-# NOINLINE pendingEvents #-}
777 pendingEvents :: IORef [IOReq]
778 pendingEvents = unsafePerformIO $ do
780 sharedCAF m getOrSetGHCConcPendingEventsStore
782 foreign import ccall unsafe "getOrSetGHCConcPendingEventsStore"
783 getOrSetGHCConcPendingEventsStore :: Ptr a -> IO (Ptr a)
786 {-# NOINLINE pendingDelays #-}
787 pendingDelays :: IORef [DelayReq]
788 pendingDelays = unsafePerformIO $ do
790 sharedCAF m getOrSetGHCConcPendingDelaysStore
792 foreign import ccall unsafe "getOrSetGHCConcPendingDelaysStore"
793 getOrSetGHCConcPendingDelaysStore :: Ptr a -> IO (Ptr a)
795 {-# NOINLINE ioManagerThread #-}
796 ioManagerThread :: MVar (Maybe ThreadId)
797 ioManagerThread = unsafePerformIO $ do
799 sharedCAF m getOrSetGHCConcIOManagerThreadStore
801 foreign import ccall unsafe "getOrSetGHCConcIOManagerThreadStore"
802 getOrSetGHCConcIOManagerThreadStore :: Ptr a -> IO (Ptr a)
804 ensureIOManagerIsRunning :: IO ()
805 ensureIOManagerIsRunning
806 | threaded = startIOManagerThread
807 | otherwise = return ()
809 startIOManagerThread :: IO ()
810 startIOManagerThread = do
811 modifyMVar_ ioManagerThread $ \old -> do
812 let create = do t <- forkIO ioManager; return (Just t)
818 ThreadFinished -> create
820 _other -> return (Just t)
822 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
823 insertDelay d [] = [d]
824 insertDelay d1 ds@(d2 : rest)
825 | delayTime d1 <= delayTime d2 = d1 : ds
826 | otherwise = d2 : insertDelay d1 rest
828 delayTime :: DelayReq -> USecs
829 delayTime (Delay t _) = t
830 delayTime (DelaySTM t _) = t
834 foreign import ccall unsafe "getUSecOfDay"
835 getUSecOfDay :: IO USecs
837 {-# NOINLINE prodding #-}
838 prodding :: IORef Bool
839 prodding = unsafePerformIO $ do
841 sharedCAF r getOrSetGHCConcProddingStore
843 foreign import ccall unsafe "getOrSetGHCConcProddingStore"
844 getOrSetGHCConcProddingStore :: Ptr a -> IO (Ptr a)
846 prodServiceThread :: IO ()
847 prodServiceThread = do
848 -- NB. use atomicModifyIORef here, otherwise there are race
849 -- conditions in which prodding is left at True but the server is
850 -- blocked in select().
851 was_set <- atomicModifyIORef prodding $ \b -> (True,b)
852 unless was_set wakeupIOManager
854 -- Machinery needed to ensure that we only have one copy of certain
855 -- CAFs in this module even when the base package is present twice, as
856 -- it is when base is dynamically loaded into GHCi. The RTS keeps
857 -- track of the single true value of the CAF, so even when the CAFs in
858 -- the dynamically-loaded base package are reverted, nothing bad
861 sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a
862 sharedCAF a get_or_set =
864 stable_ref <- newStablePtr a
865 let ref = castPtr (castStablePtrToPtr stable_ref)
866 ref2 <- get_or_set ref
869 else do freeStablePtr stable_ref
870 deRefStablePtr (castPtrToStablePtr (castPtr ref2))
872 #ifdef mingw32_HOST_OS
873 -- ----------------------------------------------------------------------------
874 -- Windows IO manager thread
878 wakeup <- c_getIOManagerEvent
879 service_loop wakeup []
881 service_loop :: HANDLE -- read end of pipe
882 -> [DelayReq] -- current delay requests
885 service_loop wakeup old_delays = do
886 -- pick up new delay requests
887 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
888 let delays = foldr insertDelay old_delays new_delays
891 (delays', timeout) <- getDelay now delays
893 r <- c_WaitForSingleObject wakeup timeout
895 0xffffffff -> do c_maperrno; throwErrno "service_loop"
897 r2 <- c_readIOManagerEvent
900 _ | r2 == io_MANAGER_WAKEUP -> return False
901 _ | r2 == io_MANAGER_DIE -> return True
902 0 -> return False -- spurious wakeup
903 _ -> do start_console_handler (r2 `shiftR` 1); return False
904 unless exit $ service_cont wakeup delays'
906 _other -> service_cont wakeup delays' -- probably timeout
908 service_cont :: HANDLE -> [DelayReq] -> IO ()
909 service_cont wakeup delays = do
910 r <- atomicModifyIORef prodding (\_ -> (False,False))
911 r `seq` return () -- avoid space leak
912 service_loop wakeup delays
914 -- must agree with rts/win32/ThrIOManager.c
915 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
916 io_MANAGER_WAKEUP = 0xffffffff
917 io_MANAGER_DIE = 0xfffffffe
923 -- these are sent to Services only.
926 deriving (Eq, Ord, Enum, Show, Read, Typeable)
928 start_console_handler :: Word32 -> IO ()
929 start_console_handler r =
930 case toWin32ConsoleEvent r of
931 Just x -> withMVar win32ConsoleHandler $ \handler -> do
932 _ <- forkIO (handler x)
936 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
937 toWin32ConsoleEvent ev =
939 0 {- CTRL_C_EVENT-} -> Just ControlC
940 1 {- CTRL_BREAK_EVENT-} -> Just Break
941 2 {- CTRL_CLOSE_EVENT-} -> Just Close
942 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
943 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
946 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
947 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
949 wakeupIOManager :: IO ()
950 wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP
952 -- Walk the queue of pending delays, waking up any that have passed
953 -- and return the smallest delay to wait for. The queue of pending
954 -- delays is kept ordered.
955 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
956 getDelay _ [] = return ([], iNFINITE)
957 getDelay now all@(d : rest)
959 Delay time m | now >= time -> do
962 DelaySTM time t | now >= time -> do
963 atomically $ writeTVar t True
966 -- delay is in millisecs for WaitForSingleObject
967 let micro_seconds = delayTime d - now
968 milli_seconds = (micro_seconds + 999) `div` 1000
969 in return (all, fromIntegral milli_seconds)
971 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
972 -- available yet. We should move some Win32 functionality down here,
973 -- maybe as part of the grand reorganisation of the base package...
978 iNFINITE = 0xFFFFFFFF -- urgh
980 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
981 c_getIOManagerEvent :: IO HANDLE
983 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
984 c_readIOManagerEvent :: IO Word32
986 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
987 c_sendIOManagerEvent :: Word32 -> IO ()
989 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
992 foreign import stdcall "WaitForSingleObject"
993 c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
996 -- ----------------------------------------------------------------------------
997 -- Unix IO manager thread, using select()
1001 allocaArray 2 $ \fds -> do
1002 throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds)
1003 rd_end <- peekElemOff fds 0
1004 wr_end <- peekElemOff fds 1
1005 setNonBlockingFD wr_end True -- writes happen in a signal handler, we
1006 -- don't want them to block.
1007 setCloseOnExec rd_end
1008 setCloseOnExec wr_end
1009 c_setIOManagerPipe wr_end
1010 allocaBytes sizeofFdSet $ \readfds -> do
1011 allocaBytes sizeofFdSet $ \writefds -> do
1012 allocaBytes sizeofTimeVal $ \timeval -> do
1013 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1017 :: Fd -- listen to this for wakeup calls
1024 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1026 -- reset prodding before we look at the new requests. If a new
1027 -- client arrives after this point they will send a wakup which will
1028 -- cause the server to loop around again, so we can be sure to not
1029 -- miss any requests.
1031 -- NB. it's important to do this in the *first* iteration of
1032 -- service_loop, rather than after calling select(), since a client
1033 -- may have set prodding to True without sending a wakeup byte down
1034 -- the pipe, because the pipe wasn't set up.
1035 atomicModifyIORef prodding (\_ -> (False, ()))
1037 -- pick up new IO requests
1038 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1039 let reqs = new_reqs ++ old_reqs
1041 -- pick up new delay requests
1042 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1043 let delays0 = foldr insertDelay old_delays new_delays
1045 -- build the FDSets for select()
1048 fdSet wakeup readfds
1049 maxfd <- buildFdSets 0 readfds writefds reqs
1051 -- perform the select()
1052 let do_select delays = do
1053 -- check the current time and wake up any thread in
1054 -- threadDelay whose timeout has expired. Also find the
1055 -- timeout value for the select() call.
1057 (delays', timeout) <- getDelay now ptimeval delays
1059 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
1065 _ | err == eINTR -> do_select delays'
1066 -- EINTR: just redo the select()
1067 _ | err == eBADF -> return (True, delays)
1068 -- EBADF: one of the file descriptors is closed or bad,
1069 -- we don't know which one, so wake everyone up.
1070 _ | otherwise -> throwErrno "select"
1071 -- otherwise (ENOMEM or EINVAL) something has gone
1072 -- wrong; report the error.
1074 return (False,delays')
1076 (wakeup_all,delays') <- do_select delays0
1079 if wakeup_all then return False
1081 b <- fdIsSet wakeup readfds
1084 else alloca $ \p -> do
1085 warnErrnoIfMinus1_ "service_loop" $
1086 c_read (fromIntegral wakeup) p 1
1089 _ | s == io_MANAGER_WAKEUP -> return False
1090 _ | s == io_MANAGER_DIE -> return True
1091 _ | s == io_MANAGER_SYNC -> do
1092 mvars <- readIORef sync
1093 mapM_ (flip putMVar ()) mvars
1096 fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1097 withForeignPtr fp $ \p_siginfo -> do
1098 r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1100 when (r /= fromIntegral sizeof_siginfo_t) $
1101 error "failed to read siginfo_t"
1102 runHandlers' fp (fromIntegral s)
1107 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1108 else completeRequests reqs readfds writefds []
1110 service_loop wakeup readfds writefds ptimeval reqs' delays'
1112 io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8
1113 io_MANAGER_WAKEUP = 0xff
1114 io_MANAGER_DIE = 0xfe
1115 io_MANAGER_SYNC = 0xfd
1117 {-# NOINLINE sync #-}
1118 sync :: IORef [MVar ()]
1119 sync = unsafePerformIO (newIORef [])
1121 -- waits for the IO manager to drain the pipe
1122 syncIOManager :: IO ()
1125 atomicModifyIORef sync (\old -> (m:old,()))
1129 foreign import ccall unsafe "ioManagerSync" c_ioManagerSync :: IO ()
1130 foreign import ccall unsafe "ioManagerWakeup" wakeupIOManager :: IO ()
1132 -- For the non-threaded RTS
1133 runHandlers :: Ptr Word8 -> Int -> IO ()
1134 runHandlers p_info sig = do
1135 fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1136 withForeignPtr fp $ \p -> do
1137 copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1139 runHandlers' fp (fromIntegral sig)
1141 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1142 runHandlers' p_info sig = do
1143 let int = fromIntegral sig
1144 withMVar signal_handlers $ \arr ->
1145 if not (inRange (boundsIOArray arr) int)
1147 else do handler <- unsafeReadIOArray arr int
1149 Nothing -> return ()
1150 Just (f,_) -> do _ <- forkIO (f p_info)
1153 warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO ()
1154 warnErrnoIfMinus1_ what io
1158 str <- strerror errno >>= peekCString
1160 debugErrLn ("Warning: " ++ what ++ " failed: " ++ str)
1162 foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar)
1164 foreign import ccall "setIOManagerPipe"
1165 c_setIOManagerPipe :: CInt -> IO ()
1167 foreign import ccall "__hscore_sizeof_siginfo_t"
1168 sizeof_siginfo_t :: CSize
1174 type HandlerFun = ForeignPtr Word8 -> IO ()
1176 -- Lock used to protect concurrent access to signal_handlers. Symptom of
1177 -- this race condition is #1922, although that bug was on Windows a similar
1178 -- bug also exists on Unix.
1179 {-# NOINLINE signal_handlers #-}
1180 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1181 signal_handlers = unsafePerformIO $ do
1182 arr <- newIOArray (0,maxSig) Nothing
1184 sharedCAF m getOrSetGHCConcSignalHandlerStore
1186 foreign import ccall unsafe "getOrSetGHCConcSignalHandlerStore"
1187 getOrSetGHCConcSignalHandlerStore :: Ptr a -> IO (Ptr a)
1189 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1190 setHandler sig handler = do
1191 let int = fromIntegral sig
1192 withMVar signal_handlers $ \arr ->
1193 if not (inRange (boundsIOArray arr) int)
1194 then error "GHC.Conc.setHandler: signal out of range"
1195 else do old <- unsafeReadIOArray arr int
1196 unsafeWriteIOArray arr int handler
1199 -- -----------------------------------------------------------------------------
1202 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1203 buildFdSets maxfd _ _ [] = return maxfd
1204 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1205 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1208 buildFdSets (max maxfd fd) readfds writefds reqs
1209 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1210 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1213 buildFdSets (max maxfd fd) readfds writefds reqs
1215 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1217 completeRequests [] _ _ reqs' = return reqs'
1218 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1219 b <- fdIsSet fd readfds
1221 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1222 else completeRequests reqs readfds writefds (Read fd m : reqs')
1223 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1224 b <- fdIsSet fd writefds
1226 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1227 else completeRequests reqs readfds writefds (Write fd m : reqs')
1229 wakeupAll :: [IOReq] -> IO ()
1230 wakeupAll [] = return ()
1231 wakeupAll (Read _ m : reqs) = do putMVar m (); wakeupAll reqs
1232 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1234 waitForReadEvent :: Fd -> IO ()
1235 waitForReadEvent fd = do
1237 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1241 waitForWriteEvent :: Fd -> IO ()
1242 waitForWriteEvent fd = do
1244 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1248 -- -----------------------------------------------------------------------------
1251 -- Walk the queue of pending delays, waking up any that have passed
1252 -- and return the smallest delay to wait for. The queue of pending
1253 -- delays is kept ordered.
1254 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1255 getDelay _ _ [] = return ([],nullPtr)
1256 getDelay now ptimeval all@(d : rest)
1258 Delay time m | now >= time -> do
1260 getDelay now ptimeval rest
1261 DelaySTM time t | now >= time -> do
1262 atomically $ writeTVar t True
1263 getDelay now ptimeval rest
1265 setTimevalTicks ptimeval (delayTime d - now)
1266 return (all,ptimeval)
1270 foreign import ccall unsafe "sizeofTimeVal"
1271 sizeofTimeVal :: Int
1273 foreign import ccall unsafe "setTimevalTicks"
1274 setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1277 On Win32 we're going to have a single Pipe, and a
1278 waitForSingleObject with the delay time. For signals, we send a
1279 byte down the pipe just like on Unix.
1282 -- ----------------------------------------------------------------------------
1283 -- select() interface
1285 -- ToDo: move to System.Posix.Internals?
1289 foreign import ccall safe "__hscore_select"
1290 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1293 foreign import ccall unsafe "hsFD_SETSIZE"
1294 c_fD_SETSIZE :: CInt
1297 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1299 foreign import ccall unsafe "hsFD_ISSET"
1300 c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1302 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1303 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1305 foreign import ccall unsafe "hsFD_SET"
1306 c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1308 fdSet :: Fd -> Ptr CFdSet -> IO ()
1309 fdSet (Fd fd) fdset = c_fdSet fd fdset
1311 foreign import ccall unsafe "hsFD_ZERO"
1312 fdZero :: Ptr CFdSet -> IO ()
1314 foreign import ccall unsafe "sizeof_fd_set"
1319 reportStackOverflow :: IO ()
1320 reportStackOverflow = callStackOverflowHook
1322 reportError :: SomeException -> IO ()
1324 handler <- getUncaughtExceptionHandler
1327 -- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove
1328 -- the unsafe below.
1329 foreign import ccall unsafe "stackOverflow"
1330 callStackOverflowHook :: IO ()
1332 {-# NOINLINE uncaughtExceptionHandler #-}
1333 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1334 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1336 defaultHandler :: SomeException -> IO ()
1337 defaultHandler se@(SomeException ex) = do
1338 (hFlush stdout) `catchAny` (\ _ -> return ())
1339 let msg = case cast ex of
1340 Just Deadlock -> "no threads to run: infinite loop or deadlock?"
1341 _ -> case cast ex of
1342 Just (ErrorCall s) -> s
1343 _ -> showsPrec 0 se ""
1344 withCString "%s" $ \cfmt ->
1345 withCString msg $ \cmsg ->
1346 errorBelch cfmt cmsg
1348 -- don't use errorBelch() directly, because we cannot call varargs functions
1350 foreign import ccall unsafe "HsBase.h errorBelch2"
1351 errorBelch :: CString -> CString -> IO ()
1353 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1354 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1356 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1357 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler