2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
8 -- Copyright : (c) The University of Glasgow, 1994-2002
9 -- License : see libraries/base/LICENSE
11 -- Maintainer : cvs-ghc@haskell.org
12 -- Stability : internal
13 -- Portability : non-portable (GHC extensions)
15 -- Basic concurrency stuff.
17 -----------------------------------------------------------------------------
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home. Hence:
30 -- * Forking and suchlike
31 , forkIO -- :: IO a -> IO ThreadId
32 , forkOnIO -- :: Int -> IO a -> IO ThreadId
33 , numCapabilities -- :: Int
34 , childHandler -- :: Exception -> IO ()
35 , myThreadId -- :: IO ThreadId
36 , killThread -- :: ThreadId -> IO ()
37 , throwTo -- :: ThreadId -> Exception -> IO ()
38 , par -- :: a -> b -> b
39 , pseq -- :: a -> b -> b
42 , labelThread -- :: ThreadId -> String -> IO ()
44 , ThreadStatus(..), BlockReason(..)
45 , threadStatus -- :: ThreadId -> IO ThreadStatus
48 , threadDelay -- :: Int -> IO ()
49 , registerDelay -- :: Int -> IO (TVar Bool)
50 , threadWaitRead -- :: Int -> IO ()
51 , threadWaitWrite -- :: Int -> IO ()
55 , atomically -- :: STM a -> IO a
57 , orElse -- :: STM a -> STM a -> STM a
58 , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a
59 , alwaysSucceeds -- :: STM a -> STM ()
60 , always -- :: STM Bool -> STM ()
62 , newTVar -- :: a -> STM (TVar a)
63 , newTVarIO -- :: a -> STM (TVar a)
64 , readTVar -- :: TVar a -> STM a
65 , readTVarIO -- :: TVar a -> IO a
66 , writeTVar -- :: a -> TVar a -> STM ()
67 , unsafeIOToSTM -- :: IO a -> STM a
71 #ifdef mingw32_HOST_OS
72 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
73 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
74 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
76 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
77 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
80 #ifndef mingw32_HOST_OS
81 , Signal, HandlerFun, setHandler, runHandlers
84 , ensureIOManagerIsRunning
85 #ifndef mingw32_HOST_OS
89 #ifdef mingw32_HOST_OS
94 , setUncaughtExceptionHandler -- :: (Exception -> IO ()) -> IO ()
95 , getUncaughtExceptionHandler -- :: IO (Exception -> IO ())
97 , reportError, reportStackOverflow
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
107 #ifdef mingw32_HOST_OS
111 #ifndef mingw32_HOST_OS
118 #ifndef mingw32_HOST_OS
121 import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
122 import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
124 import GHC.IO.Exception
128 import GHC.Num ( Num(..) )
129 import GHC.Real ( fromIntegral )
130 #ifndef mingw32_HOST_OS
132 import GHC.Arr ( inRange )
134 #ifdef mingw32_HOST_OS
135 import GHC.Real ( div )
138 #ifdef mingw32_HOST_OS
139 import GHC.Read ( Read )
140 import GHC.Enum ( Enum )
142 import GHC.Pack ( packCString# )
143 import GHC.Show ( Show(..), showString )
145 infixr 0 `par`, `pseq`
148 %************************************************************************
150 \subsection{@ThreadId@, @par@, and @fork@}
152 %************************************************************************
155 data ThreadId = ThreadId ThreadId# deriving( Typeable )
156 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
157 -- But since ThreadId# is unlifted, the Weak type must use open
160 A 'ThreadId' is an abstract type representing a handle to a thread.
161 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
162 the 'Ord' instance implements an arbitrary total ordering over
163 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
164 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
165 useful when debugging or diagnosing the behaviour of a concurrent
168 /Note/: in GHC, if you have a 'ThreadId', you essentially have
169 a pointer to the thread itself. This means the thread itself can\'t be
170 garbage collected until you drop the 'ThreadId'.
171 This misfeature will hopefully be corrected at a later date.
173 /Note/: Hugs does not provide any operations on other threads;
174 it defines 'ThreadId' as a synonym for ().
177 instance Show ThreadId where
179 showString "ThreadId " .
180 showsPrec d (getThreadId (id2TSO t))
182 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
184 id2TSO :: ThreadId -> ThreadId#
185 id2TSO (ThreadId t) = t
187 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
190 cmpThread :: ThreadId -> ThreadId -> Ordering
192 case cmp_thread (id2TSO t1) (id2TSO t2) of
197 instance Eq ThreadId where
199 case t1 `cmpThread` t2 of
203 instance Ord ThreadId where
207 Sparks off a new thread to run the 'IO' computation passed as the
208 first argument, and returns the 'ThreadId' of the newly created
211 The new thread will be a lightweight thread; if you want to use a foreign
212 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
214 GHC note: the new thread inherits the /blocked/ state of the parent
215 (see 'Control.Exception.block').
217 The newly created thread has an exception handler that discards the
218 exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
219 'ThreadKilled', and passes all other exceptions to the uncaught
220 exception handler (see 'setUncaughtExceptionHandler').
222 forkIO :: IO () -> IO ThreadId
223 forkIO action = IO $ \ s ->
224 case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
226 action_plus = catchException action childHandler
229 Like 'forkIO', but lets you specify on which CPU the thread is
230 created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
231 will stay on the same CPU for its entire lifetime (`forkIO` threads
232 can migrate between CPUs according to the scheduling policy).
233 `forkOnIO` is useful for overriding the scheduling policy when you
234 know in advance how best to distribute the threads.
236 The `Int` argument specifies the CPU number; it is interpreted modulo
237 'numCapabilities' (note that it actually specifies a capability number
238 rather than a CPU number, but to a first approximation the two are
241 forkOnIO :: Int -> IO () -> IO ThreadId
242 forkOnIO (I# cpu) action = IO $ \ s ->
243 case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
245 action_plus = catchException action childHandler
247 -- | the value passed to the @+RTS -N@ flag. This is the number of
248 -- Haskell threads that can run truly simultaneously at any given
249 -- time, and is typically set to the number of physical CPU cores on
251 numCapabilities :: Int
252 numCapabilities = unsafePerformIO $ do
253 n <- peek n_capabilities
254 return (fromIntegral n)
256 #if defined(mingw32_HOST_OS) && defined(__PIC__)
257 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
259 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
261 childHandler :: SomeException -> IO ()
262 childHandler err = catchException (real_handler err) childHandler
264 real_handler :: SomeException -> IO ()
265 real_handler se@(SomeException ex) =
266 -- ignore thread GC and killThread exceptions:
268 Just BlockedIndefinitelyOnMVar -> return ()
270 Just BlockedIndefinitelyOnSTM -> return ()
272 Just ThreadKilled -> return ()
274 -- report all others:
275 Just StackOverflow -> reportStackOverflow
278 {- | 'killThread' terminates the given thread (GHC only).
279 Any work already done by the thread isn\'t
280 lost: the computation is suspended until required by another thread.
281 The memory used by the thread will be garbage collected if it isn\'t
282 referenced from anywhere. The 'killThread' function is defined in
285 > killThread tid = throwTo tid ThreadKilled
287 Killthread is a no-op if the target thread has already completed.
289 killThread :: ThreadId -> IO ()
290 killThread tid = throwTo tid ThreadKilled
292 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
294 'throwTo' does not return until the exception has been raised in the
296 The calling thread can thus be certain that the target
297 thread has received the exception. This is a useful property to know
298 when dealing with race conditions: eg. if there are two threads that
299 can kill each other, it is guaranteed that only one of the threads
300 will get to kill the other.
302 If the target thread is currently making a foreign call, then the
303 exception will not be raised (and hence 'throwTo' will not return)
304 until the call has completed. This is the case regardless of whether
305 the call is inside a 'block' or not.
307 Important note: the behaviour of 'throwTo' differs from that described in
308 the paper \"Asynchronous exceptions in Haskell\"
309 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
310 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
311 a more synchronous design in which 'throwTo' does not return until the exception
312 is received by the target thread. The trade-off is discussed in Section 9 of the paper.
313 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
316 There is currently no guarantee that the exception delivered by 'throwTo' will be
317 delivered at the first possible opportunity. In particular, a thread may
318 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
319 a pending 'throwTo'. This is arguably undesirable behaviour.
322 throwTo :: Exception e => ThreadId -> e -> IO ()
323 throwTo (ThreadId tid) ex = IO $ \ s ->
324 case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
326 -- | Returns the 'ThreadId' of the calling thread (GHC only).
327 myThreadId :: IO ThreadId
328 myThreadId = IO $ \s ->
329 case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
332 -- |The 'yield' action allows (forces, in a co-operative multitasking
333 -- implementation) a context-switch to any other currently runnable
334 -- threads (if any), and is occasionally useful when implementing
335 -- concurrency abstractions.
338 case (yield# s) of s1 -> (# s1, () #)
340 {- | 'labelThread' stores a string as identifier for this thread if
341 you built a RTS with debugging support. This identifier will be used in
342 the debugging output to make distinction of different threads easier
343 (otherwise you only have the thread state object\'s address in the heap).
345 Other applications like the graphical Concurrent Haskell Debugger
346 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
347 'labelThread' for their purposes as well.
350 labelThread :: ThreadId -> String -> IO ()
351 labelThread (ThreadId t) str = IO $ \ s ->
352 let !ps = packCString# str
353 !adr = byteArrayContents# ps in
354 case (labelThread# t adr s) of s1 -> (# s1, () #)
356 -- Nota Bene: 'pseq' used to be 'seq'
357 -- but 'seq' is now defined in PrelGHC
359 -- "pseq" is defined a bit weirdly (see below)
361 -- The reason for the strange "lazy" call is that
362 -- it fools the compiler into thinking that pseq and par are non-strict in
363 -- their second argument (even if it inlines pseq at the call site).
364 -- If it thinks pseq is strict in "y", then it often evaluates
365 -- "y" before "x", which is totally wrong.
369 pseq x y = x `seq` lazy y
373 par x y = case (par# x) of { _ -> lazy y }
375 -- | Internal function used by the RTS to run sparks.
378 where loop s = case getSpark# s of
380 if n ==# 0# then (# s', () #)
385 -- ^blocked on on 'MVar'
387 -- ^blocked on a computation in progress by another thread
389 -- ^blocked in 'throwTo'
391 -- ^blocked in 'retry' in an STM transaction
392 | BlockedOnForeignCall
393 -- ^currently in a foreign call
395 -- ^blocked on some other resource. Without @-threaded@,
396 -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
397 -- they show up as 'BlockedOnMVar'.
398 deriving (Eq,Ord,Show)
400 -- | The current status of a thread
403 -- ^the thread is currently runnable or running
405 -- ^the thread has finished
406 | ThreadBlocked BlockReason
407 -- ^the thread is blocked on some resource
409 -- ^the thread received an uncaught exception
410 deriving (Eq,Ord,Show)
412 threadStatus :: ThreadId -> IO ThreadStatus
413 threadStatus (ThreadId t) = IO $ \s ->
414 case threadStatus# t s of
415 (# s', stat #) -> (# s', mk_stat (I# stat) #)
417 -- NB. keep these in sync with includes/Constants.h
418 mk_stat 0 = ThreadRunning
419 mk_stat 1 = ThreadBlocked BlockedOnMVar
420 mk_stat 2 = ThreadBlocked BlockedOnBlackHole
421 mk_stat 3 = ThreadBlocked BlockedOnException
422 mk_stat 7 = ThreadBlocked BlockedOnSTM
423 mk_stat 11 = ThreadBlocked BlockedOnForeignCall
424 mk_stat 12 = ThreadBlocked BlockedOnForeignCall
425 mk_stat 16 = ThreadFinished
426 mk_stat 17 = ThreadDied
427 mk_stat _ = ThreadBlocked BlockedOnOther
431 %************************************************************************
433 \subsection[stm]{Transactional heap operations}
435 %************************************************************************
437 TVars are shared memory locations which support atomic memory
441 -- |A monad supporting atomic memory transactions.
442 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
444 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
447 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
449 instance Functor STM where
450 fmap f x = x >>= (return . f)
452 instance Monad STM where
453 {-# INLINE return #-}
457 return x = returnSTM x
458 m >>= k = bindSTM m k
460 bindSTM :: STM a -> (a -> STM b) -> STM b
461 bindSTM (STM m) k = STM ( \s ->
463 (# new_s, a #) -> unSTM (k a) new_s
466 thenSTM :: STM a -> STM b -> STM b
467 thenSTM (STM m) k = STM ( \s ->
469 (# new_s, _ #) -> unSTM k new_s
472 returnSTM :: a -> STM a
473 returnSTM x = STM (\s -> (# s, x #))
475 -- | Unsafely performs IO in the STM monad. Beware: this is a highly
476 -- dangerous thing to do.
478 -- * The STM implementation will often run transactions multiple
479 -- times, so you need to be prepared for this if your IO has any
482 -- * The STM implementation will abort transactions that are known to
483 -- be invalid and need to be restarted. This may happen in the middle
484 -- of `unsafeIOToSTM`, so make sure you don't acquire any resources
485 -- that need releasing (exception handlers are ignored when aborting
486 -- the transaction). That includes doing any IO using Handles, for
487 -- example. Getting this wrong will probably lead to random deadlocks.
489 -- * The transaction may have seen an inconsistent view of memory when
490 -- the IO runs. Invariants that you expect to be true throughout
491 -- your program may not be true inside a transaction, due to the
492 -- way transactions are implemented. Normally this wouldn't be visible
493 -- to the programmer, but using `unsafeIOToSTM` can expose it.
495 unsafeIOToSTM :: IO a -> STM a
496 unsafeIOToSTM (IO m) = STM m
498 -- |Perform a series of STM actions atomically.
500 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
501 -- Any attempt to do so will result in a runtime error. (Reason: allowing
502 -- this would effectively allow a transaction inside a transaction, depending
503 -- on exactly when the thunk is evaluated.)
505 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
506 -- and which allows top-level TVars to be allocated.
508 atomically :: STM a -> IO a
509 atomically (STM m) = IO (\s -> (atomically# m) s )
511 -- |Retry execution of the current memory transaction because it has seen
512 -- values in TVars which mean that it should not continue (e.g. the TVars
513 -- represent a shared buffer that is now empty). The implementation may
514 -- block the thread until one of the TVars that it has read from has been
515 -- udpated. (GHC only)
517 retry = STM $ \s# -> retry# s#
519 -- |Compose two alternative STM actions (GHC only). If the first action
520 -- completes without retrying then it forms the result of the orElse.
521 -- Otherwise, if the first action retries, then the second action is
522 -- tried in its place. If both actions retry then the orElse as a
524 orElse :: STM a -> STM a -> STM a
525 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
527 -- |Exception handling within STM actions.
528 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
529 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
531 -- | Low-level primitive on which always and alwaysSucceeds are built.
532 -- checkInv differs form these in that (i) the invariant is not
533 -- checked when checkInv is called, only at the end of this and
534 -- subsequent transcations, (ii) the invariant failure is indicated
535 -- by raising an exception.
536 checkInv :: STM a -> STM ()
537 checkInv (STM m) = STM (\s -> (check# m) s)
539 -- | alwaysSucceeds adds a new invariant that must be true when passed
540 -- to alwaysSucceeds, at the end of the current transaction, and at
541 -- the end of every subsequent transaction. If it fails at any
542 -- of those points then the transaction violating it is aborted
543 -- and the exception raised by the invariant is propagated.
544 alwaysSucceeds :: STM a -> STM ()
545 alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () )
548 -- | always is a variant of alwaysSucceeds in which the invariant is
549 -- expressed as an STM Bool action that must return True. Returning
550 -- False or raising an exception are both treated as invariant failures.
551 always :: STM Bool -> STM ()
552 always i = alwaysSucceeds ( do v <- i
553 if (v) then return () else ( error "Transacional invariant violation" ) )
555 -- |Shared memory locations that support atomic memory transactions.
556 data TVar a = TVar (TVar# RealWorld a)
558 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
560 instance Eq (TVar a) where
561 (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
563 -- |Create a new TVar holding a value supplied
564 newTVar :: a -> STM (TVar a)
565 newTVar val = STM $ \s1# ->
566 case newTVar# val s1# of
567 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
569 -- |@IO@ version of 'newTVar'. This is useful for creating top-level
570 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
571 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
573 newTVarIO :: a -> IO (TVar a)
574 newTVarIO val = IO $ \s1# ->
575 case newTVar# val s1# of
576 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
578 -- |Return the current value stored in a TVar.
579 -- This is equivalent to
581 -- > readTVarIO = atomically . readTVar
583 -- but works much faster, because it doesn't perform a complete
584 -- transaction, it just reads the current value of the 'TVar'.
585 readTVarIO :: TVar a -> IO a
586 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
588 -- |Return the current value stored in a TVar
589 readTVar :: TVar a -> STM a
590 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
592 -- |Write the supplied value into a TVar
593 writeTVar :: TVar a -> a -> STM ()
594 writeTVar (TVar tvar#) val = STM $ \s1# ->
595 case writeTVar# tvar# val s1# of
603 withMVar :: MVar a -> (a -> IO b) -> IO b
607 b <- catchAny (unblock (io a))
608 (\e -> do putMVar m a; throw e)
612 modifyMVar_ :: MVar a -> (a -> IO a) -> IO ()
616 a' <- catchAny (unblock (io a))
617 (\e -> do putMVar m a; throw e)
622 %************************************************************************
624 \subsection{Thread waiting}
626 %************************************************************************
629 #ifdef mingw32_HOST_OS
631 -- Note: threadWaitRead and threadWaitWrite aren't really functional
632 -- on Win32, but left in there because lib code (still) uses them (the manner
633 -- in which they're used doesn't cause problems on a Win32 platform though.)
635 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
636 asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) =
637 IO $ \s -> case asyncRead# fd isSock len buf s of
638 (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
640 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
641 asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) =
642 IO $ \s -> case asyncWrite# fd isSock len buf s of
643 (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
645 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
646 asyncDoProc (FunPtr proc) (Ptr param) =
647 -- the 'length' value is ignored; simplifies implementation of
648 -- the async*# primops to have them all return the same result.
649 IO $ \s -> case asyncDoProc# proc param s of
650 (# s', _len#, err# #) -> (# s', I# err# #)
652 -- to aid the use of these primops by the IO Handle implementation,
653 -- provide the following convenience funs:
655 -- this better be a pinned byte array!
656 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
657 asyncReadBA fd isSock len off bufB =
658 asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
660 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
661 asyncWriteBA fd isSock len off bufB =
662 asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
666 -- -----------------------------------------------------------------------------
669 -- | Block the current thread until data is available to read on the
670 -- given file descriptor (GHC only).
671 threadWaitRead :: Fd -> IO ()
673 #ifndef mingw32_HOST_OS
674 | threaded = waitForReadEvent fd
676 | otherwise = IO $ \s ->
677 case fromIntegral fd of { I# fd# ->
678 case waitRead# fd# s of { s' -> (# s', () #)
681 -- | Block the current thread until data can be written to the
682 -- given file descriptor (GHC only).
683 threadWaitWrite :: Fd -> IO ()
685 #ifndef mingw32_HOST_OS
686 | threaded = waitForWriteEvent fd
688 | otherwise = IO $ \s ->
689 case fromIntegral fd of { I# fd# ->
690 case waitWrite# fd# s of { s' -> (# s', () #)
693 -- | Suspends the current thread for a given number of microseconds
696 -- There is no guarantee that the thread will be rescheduled promptly
697 -- when the delay has expired, but the thread will never continue to
698 -- run /earlier/ than specified.
700 threadDelay :: Int -> IO ()
702 | threaded = waitForDelayEvent time
703 | otherwise = IO $ \s ->
704 case fromIntegral time of { I# time# ->
705 case delay# time# s of { s' -> (# s', () #)
709 -- | Set the value of returned TVar to True after a given number of
710 -- microseconds. The caveats associated with threadDelay also apply.
712 registerDelay :: Int -> IO (TVar Bool)
714 | threaded = waitForDelayEventSTM usecs
715 | otherwise = error "registerDelay: requires -threaded"
717 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
719 waitForDelayEvent :: Int -> IO ()
720 waitForDelayEvent usecs = do
722 target <- calculateTarget usecs
723 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
727 -- Delays for use in STM
728 waitForDelayEventSTM :: Int -> IO (TVar Bool)
729 waitForDelayEventSTM usecs = do
730 t <- atomically $ newTVar False
731 target <- calculateTarget usecs
732 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
736 calculateTarget :: Int -> IO USecs
737 calculateTarget usecs = do
739 return $ now + (fromIntegral usecs)
742 -- ----------------------------------------------------------------------------
743 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
745 -- In the threaded RTS, we employ a single IO Manager thread to wait
746 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
747 -- and delays (threadDelay).
749 -- We can do this because in the threaded RTS the IO Manager can make
750 -- a non-blocking call to select(), so we don't have to do select() in
751 -- the scheduler as we have to in the non-threaded RTS. We get performance
752 -- benefits from doing it this way, because we only have to restart the select()
753 -- when a new request arrives, rather than doing one select() each time
754 -- around the scheduler loop. Furthermore, the scheduler can be simplified
755 -- by not having to check for completed IO requests.
757 #ifndef mingw32_HOST_OS
759 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
760 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
764 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
765 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
767 #ifndef mingw32_HOST_OS
768 pendingEvents :: IORef [IOReq]
770 pendingDelays :: IORef [DelayReq]
771 {-# NOINLINE pendingEvents #-}
772 {-# NOINLINE pendingDelays #-}
773 (pendingEvents,pendingDelays) = unsafePerformIO $ do
778 {-# NOINLINE ioManagerThread #-}
779 ioManagerThread :: MVar (Maybe ThreadId)
780 ioManagerThread = unsafePerformIO $ newMVar Nothing
782 ensureIOManagerIsRunning :: IO ()
783 ensureIOManagerIsRunning
784 | threaded = startIOManagerThread
785 | otherwise = return ()
787 startIOManagerThread :: IO ()
788 startIOManagerThread = do
789 modifyMVar_ ioManagerThread $ \old -> do
790 let create = do t <- forkIO ioManager; return (Just t)
796 ThreadFinished -> create
798 _other -> return (Just t)
800 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
801 insertDelay d [] = [d]
802 insertDelay d1 ds@(d2 : rest)
803 | delayTime d1 <= delayTime d2 = d1 : ds
804 | otherwise = d2 : insertDelay d1 rest
806 delayTime :: DelayReq -> USecs
807 delayTime (Delay t _) = t
808 delayTime (DelaySTM t _) = t
812 foreign import ccall unsafe "getUSecOfDay"
813 getUSecOfDay :: IO USecs
815 prodding :: IORef Bool
816 {-# NOINLINE prodding #-}
817 prodding = unsafePerformIO (newIORef False)
819 prodServiceThread :: IO ()
820 prodServiceThread = do
821 was_set <- atomicModifyIORef prodding (\a -> (True,a))
822 if (not (was_set)) then wakeupIOManager else return ()
824 #ifdef mingw32_HOST_OS
825 -- ----------------------------------------------------------------------------
826 -- Windows IO manager thread
830 wakeup <- c_getIOManagerEvent
831 service_loop wakeup []
833 service_loop :: HANDLE -- read end of pipe
834 -> [DelayReq] -- current delay requests
837 service_loop wakeup old_delays = do
838 -- pick up new delay requests
839 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
840 let delays = foldr insertDelay old_delays new_delays
843 (delays', timeout) <- getDelay now delays
845 r <- c_WaitForSingleObject wakeup timeout
847 0xffffffff -> do c_maperrno; throwErrno "service_loop"
849 r2 <- c_readIOManagerEvent
852 _ | r2 == io_MANAGER_WAKEUP -> return False
853 _ | r2 == io_MANAGER_DIE -> return True
854 0 -> return False -- spurious wakeup
855 _ -> do start_console_handler (r2 `shiftR` 1); return False
856 unless exit $ service_cont wakeup delays'
858 _other -> service_cont wakeup delays' -- probably timeout
860 service_cont :: HANDLE -> [DelayReq] -> IO ()
861 service_cont wakeup delays = do
862 r <- atomicModifyIORef prodding (\_ -> (False,False))
863 r `seq` return () -- avoid space leak
864 service_loop wakeup delays
866 -- must agree with rts/win32/ThrIOManager.c
867 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
868 io_MANAGER_WAKEUP = 0xffffffff
869 io_MANAGER_DIE = 0xfffffffe
875 -- these are sent to Services only.
878 deriving (Eq, Ord, Enum, Show, Read, Typeable)
880 start_console_handler :: Word32 -> IO ()
881 start_console_handler r =
882 case toWin32ConsoleEvent r of
883 Just x -> withMVar win32ConsoleHandler $ \handler -> do
884 _ <- forkIO (handler x)
888 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
889 toWin32ConsoleEvent ev =
891 0 {- CTRL_C_EVENT-} -> Just ControlC
892 1 {- CTRL_BREAK_EVENT-} -> Just Break
893 2 {- CTRL_CLOSE_EVENT-} -> Just Close
894 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
895 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
898 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
899 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
901 wakeupIOManager :: IO ()
902 wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP
904 -- Walk the queue of pending delays, waking up any that have passed
905 -- and return the smallest delay to wait for. The queue of pending
906 -- delays is kept ordered.
907 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
908 getDelay _ [] = return ([], iNFINITE)
909 getDelay now all@(d : rest)
911 Delay time m | now >= time -> do
914 DelaySTM time t | now >= time -> do
915 atomically $ writeTVar t True
918 -- delay is in millisecs for WaitForSingleObject
919 let micro_seconds = delayTime d - now
920 milli_seconds = (micro_seconds + 999) `div` 1000
921 in return (all, fromIntegral milli_seconds)
923 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
924 -- available yet. We should move some Win32 functionality down here,
925 -- maybe as part of the grand reorganisation of the base package...
930 iNFINITE = 0xFFFFFFFF -- urgh
932 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
933 c_getIOManagerEvent :: IO HANDLE
935 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
936 c_readIOManagerEvent :: IO Word32
938 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
939 c_sendIOManagerEvent :: Word32 -> IO ()
941 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
944 foreign import stdcall "WaitForSingleObject"
945 c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
948 -- ----------------------------------------------------------------------------
949 -- Unix IO manager thread, using select()
953 allocaArray 2 $ \fds -> do
954 throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds)
955 rd_end <- peekElemOff fds 0
956 wr_end <- peekElemOff fds 1
957 setNonBlockingFD wr_end True -- writes happen in a signal handler, we
958 -- don't want them to block.
959 setCloseOnExec rd_end
960 setCloseOnExec wr_end
961 writeIORef stick (fromIntegral wr_end)
962 c_setIOManagerPipe wr_end
963 allocaBytes sizeofFdSet $ \readfds -> do
964 allocaBytes sizeofFdSet $ \writefds -> do
965 allocaBytes sizeofTimeVal $ \timeval -> do
966 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
970 :: Fd -- listen to this for wakeup calls
977 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
979 -- pick up new IO requests
980 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
981 let reqs = new_reqs ++ old_reqs
983 -- pick up new delay requests
984 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
985 let delays0 = foldr insertDelay old_delays new_delays
987 -- build the FDSets for select()
991 maxfd <- buildFdSets 0 readfds writefds reqs
993 -- perform the select()
994 let do_select delays = do
995 -- check the current time and wake up any thread in
996 -- threadDelay whose timeout has expired. Also find the
997 -- timeout value for the select() call.
999 (delays', timeout) <- getDelay now ptimeval delays
1001 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
1007 _ | err == eINTR -> do_select delays'
1008 -- EINTR: just redo the select()
1009 _ | err == eBADF -> return (True, delays)
1010 -- EBADF: one of the file descriptors is closed or bad,
1011 -- we don't know which one, so wake everyone up.
1012 _ | otherwise -> throwErrno "select"
1013 -- otherwise (ENOMEM or EINVAL) something has gone
1014 -- wrong; report the error.
1016 return (False,delays')
1018 (wakeup_all,delays') <- do_select delays0
1021 if wakeup_all then return False
1023 b <- fdIsSet wakeup readfds
1026 else alloca $ \p -> do
1027 warnErrnoIfMinus1_ "service_loop" $
1028 c_read (fromIntegral wakeup) p 1
1031 _ | s == io_MANAGER_WAKEUP -> return False
1032 _ | s == io_MANAGER_DIE -> return True
1033 _ | s == io_MANAGER_SYNC -> do
1034 mvars <- readIORef sync
1035 mapM_ (flip putMVar ()) mvars
1038 fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1039 withForeignPtr fp $ \p_siginfo -> do
1040 r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1042 when (r /= fromIntegral sizeof_siginfo_t) $
1043 error "failed to read siginfo_t"
1044 runHandlers' fp (fromIntegral s)
1049 atomicModifyIORef prodding (\_ -> (False, ()))
1051 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1052 else completeRequests reqs readfds writefds []
1054 service_loop wakeup readfds writefds ptimeval reqs' delays'
1056 io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8
1057 io_MANAGER_WAKEUP = 0xff
1058 io_MANAGER_DIE = 0xfe
1059 io_MANAGER_SYNC = 0xfd
1061 -- | the stick is for poking the IO manager with
1063 {-# NOINLINE stick #-}
1064 stick = unsafePerformIO $ newIORef (-1)
1066 {-# NOINLINE sync #-}
1067 sync :: IORef [MVar ()]
1068 sync = unsafePerformIO (newIORef [])
1070 -- waits for the IO manager to drain the pipe
1071 syncIOManager :: IO ()
1074 atomicModifyIORef sync (\old -> (m:old,()))
1075 fd <- readIORef stick
1077 with io_MANAGER_SYNC $ \pbuf -> do
1078 warnErrnoIfMinus1_ "syncIOManager" $ c_write (fromIntegral fd) pbuf 1
1081 wakeupIOManager :: IO ()
1082 wakeupIOManager = do
1083 fd <- readIORef stick
1085 with io_MANAGER_WAKEUP $ \pbuf -> do
1086 warnErrnoIfMinus1_ "wakeupIOManager" $ c_write (fromIntegral fd) pbuf 1
1088 -- For the non-threaded RTS
1089 runHandlers :: Ptr Word8 -> Int -> IO ()
1090 runHandlers p_info sig = do
1091 fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1092 withForeignPtr fp $ \p -> do
1093 copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1095 runHandlers' fp (fromIntegral sig)
1097 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1098 runHandlers' p_info sig = do
1099 let int = fromIntegral sig
1100 withMVar signal_handlers $ \arr ->
1101 if not (inRange (boundsIOArray arr) int)
1103 else do handler <- unsafeReadIOArray arr int
1105 Nothing -> return ()
1106 Just (f,_) -> do _ <- forkIO (f p_info)
1109 warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO ()
1110 warnErrnoIfMinus1_ what io
1114 str <- strerror errno >>= peekCString
1116 debugErrLn ("Warning: " ++ what ++ " failed: " ++ str)
1118 foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar)
1120 foreign import ccall "setIOManagerPipe"
1121 c_setIOManagerPipe :: CInt -> IO ()
1123 foreign import ccall "__hscore_sizeof_siginfo_t"
1124 sizeof_siginfo_t :: CSize
1130 type HandlerFun = ForeignPtr Word8 -> IO ()
1132 -- Lock used to protect concurrent access to signal_handlers. Symptom of
1133 -- this race condition is #1922, although that bug was on Windows a similar
1134 -- bug also exists on Unix.
1135 {-# NOINLINE signal_handlers #-}
1136 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1137 signal_handlers = unsafePerformIO $ do
1138 arr <- newIOArray (0,maxSig) Nothing
1141 stable_ref <- newStablePtr m
1142 let ref = castStablePtrToPtr stable_ref
1143 ref2 <- getOrSetSignalHandlerStore ref
1146 else do freeStablePtr stable_ref
1147 deRefStablePtr (castPtrToStablePtr ref2)
1149 foreign import ccall unsafe "getOrSetSignalHandlerStore"
1150 getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a)
1152 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1153 setHandler sig handler = do
1154 let int = fromIntegral sig
1155 withMVar signal_handlers $ \arr ->
1156 if not (inRange (boundsIOArray arr) int)
1157 then error "GHC.Conc.setHandler: signal out of range"
1158 else do old <- unsafeReadIOArray arr int
1159 unsafeWriteIOArray arr int handler
1162 -- -----------------------------------------------------------------------------
1165 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1166 buildFdSets maxfd _ _ [] = return maxfd
1167 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1168 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1171 buildFdSets (max maxfd fd) readfds writefds reqs
1172 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1173 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1176 buildFdSets (max maxfd fd) readfds writefds reqs
1178 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1180 completeRequests [] _ _ reqs' = return reqs'
1181 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1182 b <- fdIsSet fd readfds
1184 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1185 else completeRequests reqs readfds writefds (Read fd m : reqs')
1186 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1187 b <- fdIsSet fd writefds
1189 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1190 else completeRequests reqs readfds writefds (Write fd m : reqs')
1192 wakeupAll :: [IOReq] -> IO ()
1193 wakeupAll [] = return ()
1194 wakeupAll (Read _ m : reqs) = do putMVar m (); wakeupAll reqs
1195 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1197 waitForReadEvent :: Fd -> IO ()
1198 waitForReadEvent fd = do
1200 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1204 waitForWriteEvent :: Fd -> IO ()
1205 waitForWriteEvent fd = do
1207 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1211 -- -----------------------------------------------------------------------------
1214 -- Walk the queue of pending delays, waking up any that have passed
1215 -- and return the smallest delay to wait for. The queue of pending
1216 -- delays is kept ordered.
1217 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1218 getDelay _ _ [] = return ([],nullPtr)
1219 getDelay now ptimeval all@(d : rest)
1221 Delay time m | now >= time -> do
1223 getDelay now ptimeval rest
1224 DelaySTM time t | now >= time -> do
1225 atomically $ writeTVar t True
1226 getDelay now ptimeval rest
1228 setTimevalTicks ptimeval (delayTime d - now)
1229 return (all,ptimeval)
1233 foreign import ccall unsafe "sizeofTimeVal"
1234 sizeofTimeVal :: Int
1236 foreign import ccall unsafe "setTimevalTicks"
1237 setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1240 On Win32 we're going to have a single Pipe, and a
1241 waitForSingleObject with the delay time. For signals, we send a
1242 byte down the pipe just like on Unix.
1245 -- ----------------------------------------------------------------------------
1246 -- select() interface
1248 -- ToDo: move to System.Posix.Internals?
1252 foreign import ccall safe "__hscore_select"
1253 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1256 foreign import ccall unsafe "hsFD_SETSIZE"
1257 c_fD_SETSIZE :: CInt
1260 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1262 foreign import ccall unsafe "hsFD_ISSET"
1263 c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1265 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1266 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1268 foreign import ccall unsafe "hsFD_SET"
1269 c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1271 fdSet :: Fd -> Ptr CFdSet -> IO ()
1272 fdSet (Fd fd) fdset = c_fdSet fd fdset
1274 foreign import ccall unsafe "hsFD_ZERO"
1275 fdZero :: Ptr CFdSet -> IO ()
1277 foreign import ccall unsafe "sizeof_fd_set"
1282 reportStackOverflow :: IO ()
1283 reportStackOverflow = callStackOverflowHook
1285 reportError :: SomeException -> IO ()
1287 handler <- getUncaughtExceptionHandler
1290 -- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove
1291 -- the unsafe below.
1292 foreign import ccall unsafe "stackOverflow"
1293 callStackOverflowHook :: IO ()
1295 {-# NOINLINE uncaughtExceptionHandler #-}
1296 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1297 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1299 defaultHandler :: SomeException -> IO ()
1300 defaultHandler se@(SomeException ex) = do
1301 (hFlush stdout) `catchAny` (\ _ -> return ())
1302 let msg = case cast ex of
1303 Just Deadlock -> "no threads to run: infinite loop or deadlock?"
1304 _ -> case cast ex of
1305 Just (ErrorCall s) -> s
1306 _ -> showsPrec 0 se ""
1307 withCString "%s" $ \cfmt ->
1308 withCString msg $ \cmsg ->
1309 errorBelch cfmt cmsg
1311 -- don't use errorBelch() directly, because we cannot call varargs functions
1313 foreign import ccall unsafe "HsBase.h errorBelch2"
1314 errorBelch :: CString -> CString -> IO ()
1316 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1317 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1319 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1320 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler