2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
8 -- Copyright : (c) The University of Glasgow, 1994-2002
9 -- License : see libraries/base/LICENSE
11 -- Maintainer : cvs-ghc@haskell.org
12 -- Stability : internal
13 -- Portability : non-portable (GHC extensions)
15 -- Basic concurrency stuff.
17 -----------------------------------------------------------------------------
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home. Hence:
30 -- * Forking and suchlike
31 , forkIO -- :: IO () -> IO ThreadId
32 , forkOnIO -- :: Int -> IO () -> IO ThreadId
33 , numCapabilities -- :: Int
34 , childHandler -- :: Exception -> IO ()
35 , myThreadId -- :: IO ThreadId
36 , killThread -- :: ThreadId -> IO ()
37 , throwTo -- :: ThreadId -> Exception -> IO ()
38 , par -- :: a -> b -> b
39 , pseq -- :: a -> b -> b
42 , labelThread -- :: ThreadId -> String -> IO ()
44 , ThreadStatus(..), BlockReason(..)
45 , threadStatus -- :: ThreadId -> IO ThreadStatus
48 , threadDelay -- :: Int -> IO ()
49 , registerDelay -- :: Int -> IO (TVar Bool)
50 , threadWaitRead -- :: Fd -> IO ()
51 , threadWaitWrite -- :: Fd -> IO ()
55 , atomically -- :: STM a -> IO a
57 , orElse -- :: STM a -> STM a -> STM a
58 , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a
59 , alwaysSucceeds -- :: STM a -> STM ()
60 , always -- :: STM Bool -> STM ()
62 , newTVar -- :: a -> STM (TVar a)
63 , newTVarIO -- :: a -> IO (TVar a)
64 , readTVar -- :: TVar a -> STM a
65 , readTVarIO -- :: TVar a -> IO a
66 , writeTVar -- :: TVar a -> a -> STM ()
67 , unsafeIOToSTM -- :: IO a -> STM a
71 #ifdef mingw32_HOST_OS
72 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
73 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
74 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
76 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
77 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
80 #ifndef mingw32_HOST_OS
81 , Signal, HandlerFun, setHandler, runHandlers
84 , ensureIOManagerIsRunning
85 #ifndef mingw32_HOST_OS
89 #ifdef mingw32_HOST_OS
94 , setUncaughtExceptionHandler -- :: (Exception -> IO ()) -> IO ()
95 , getUncaughtExceptionHandler -- :: IO (Exception -> IO ())
97 , reportError, reportStackOverflow
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
107 #ifdef mingw32_HOST_OS
111 #ifndef mingw32_HOST_OS
118 import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
119 import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
121 import GHC.IO.Exception
125 import GHC.Num ( Num(..) )
126 import GHC.Real ( fromIntegral )
127 #ifndef mingw32_HOST_OS
129 import GHC.Arr ( inRange )
131 #ifdef mingw32_HOST_OS
132 import GHC.Real ( div )
135 #ifdef mingw32_HOST_OS
136 import GHC.Read ( Read )
137 import GHC.Enum ( Enum )
139 import GHC.Pack ( packCString# )
140 import GHC.Show ( Show(..), showString )
143 infixr 0 `par`, `pseq`
146 %************************************************************************
148 \subsection{@ThreadId@, @par@, and @fork@}
150 %************************************************************************
153 data ThreadId = ThreadId ThreadId# deriving( Typeable )
154 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
155 -- But since ThreadId# is unlifted, the Weak type must use open
158 A 'ThreadId' is an abstract type representing a handle to a thread.
159 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
160 the 'Ord' instance implements an arbitrary total ordering over
161 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
162 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
163 useful when debugging or diagnosing the behaviour of a concurrent
166 /Note/: in GHC, if you have a 'ThreadId', you essentially have
167 a pointer to the thread itself. This means the thread itself can\'t be
168 garbage collected until you drop the 'ThreadId'.
169 This misfeature will hopefully be corrected at a later date.
171 /Note/: Hugs does not provide any operations on other threads;
172 it defines 'ThreadId' as a synonym for ().
175 instance Show ThreadId where
177 showString "ThreadId " .
178 showsPrec d (getThreadId (id2TSO t))
180 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
-- | Unwrap a 'ThreadId', exposing the primitive 'ThreadId#' it carries.
id2TSO :: ThreadId -> ThreadId#
id2TSO wrapped = case wrapped of
  ThreadId tso# -> tso#
185 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
188 cmpThread :: ThreadId -> ThreadId -> Ordering
190 case cmp_thread (id2TSO t1) (id2TSO t2) of
195 instance Eq ThreadId where
197 case t1 `cmpThread` t2 of
201 instance Ord ThreadId where
205 Sparks off a new thread to run the 'IO' computation passed as the
206 first argument, and returns the 'ThreadId' of the newly created
209 The new thread will be a lightweight thread; if you want to use a foreign
210 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
212 GHC note: the new thread inherits the /blocked/ state of the parent
213 (see 'Control.Exception.block').
215 The newly created thread has an exception handler that discards the
216 exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
217 'ThreadKilled', and passes all other exceptions to the uncaught
218 exception handler (see 'setUncaughtExceptionHandler').
220 forkIO :: IO () -> IO ThreadId
221 forkIO action = IO $ \ s ->
222 case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
224 action_plus = catchException action childHandler
227 Like 'forkIO', but lets you specify on which CPU the thread is
228 created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
229 will stay on the same CPU for its entire lifetime (`forkIO` threads
230 can migrate between CPUs according to the scheduling policy).
231 `forkOnIO` is useful for overriding the scheduling policy when you
232 know in advance how best to distribute the threads.
234 The `Int` argument specifies the CPU number; it is interpreted modulo
235 'numCapabilities' (note that it actually specifies a capability number
236 rather than a CPU number, but to a first approximation the two are
239 forkOnIO :: Int -> IO () -> IO ThreadId
240 forkOnIO (I# cpu) action = IO $ \ s ->
241 case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
243 action_plus = catchException action childHandler
-- | The value passed to the @+RTS -N@ flag.  This is how many Haskell
-- threads can run truly simultaneously at any given time; it is
-- typically set to the number of physical CPU cores on the machine.
--
-- The value is obtained by reading the @n_capabilities@ variable that
-- is imported from C.
numCapabilities :: Int
numCapabilities =
  unsafePerformIO (peek n_capabilities >>= \caps -> return (fromIntegral caps))
254 #if defined(mingw32_HOST_OS) && defined(__PIC__)
255 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
257 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
-- | Exception handler wrapped around the action of a forked thread.
-- The exception is handed to 'real_handler'; if handling it raises a
-- further exception, 'childHandler' is applied again to that one.
childHandler :: SomeException -> IO ()
childHandler exn = real_handler exn `catchException` childHandler
262 real_handler :: SomeException -> IO ()
263 real_handler se@(SomeException ex) =
264 -- ignore thread GC and killThread exceptions:
266 Just BlockedOnDeadMVar -> return ()
268 Just BlockedIndefinitely -> return ()
270 Just ThreadKilled -> return ()
272 -- report all others:
273 Just StackOverflow -> reportStackOverflow
276 {- | 'killThread' terminates the given thread (GHC only).
277 Any work already done by the thread isn\'t
278 lost: the computation is suspended until required by another thread.
279 The memory used by the thread will be garbage collected if it isn\'t
280 referenced from anywhere. The 'killThread' function is defined in
283 > killThread tid = throwTo tid ThreadKilled
285 'killThread' is a no-op if the target thread has already completed.
killThread :: ThreadId -> IO ()
killThread victim =
  -- Killing a thread is nothing more than delivering 'ThreadKilled' to it.
  throwTo victim ThreadKilled
290 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
292 'throwTo' does not return until the exception has been raised in the
294 The calling thread can thus be certain that the target
295 thread has received the exception. This is a useful property to know
296 when dealing with race conditions: eg. if there are two threads that
297 can kill each other, it is guaranteed that only one of the threads
298 will get to kill the other.
300 If the target thread is currently making a foreign call, then the
301 exception will not be raised (and hence 'throwTo' will not return)
302 until the call has completed. This is the case regardless of whether
303 the call is inside a 'block' or not.
305 Important note: the behaviour of 'throwTo' differs from that described in
306 the paper \"Asynchronous exceptions in Haskell\"
307 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
308 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
309 a more synchronous design in which 'throwTo' does not return until the exception
310 is received by the target thread. The trade-off is discussed in Section 9 of the paper.
311 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
314 There is currently no guarantee that the exception delivered by 'throwTo' will be
315 delivered at the first possible opportunity. In particular, a thread may
316 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
317 a pending 'throwTo'. This is arguably undesirable behaviour.
throwTo :: Exception e => ThreadId -> e -> IO ()
throwTo (ThreadId target#) exn = IO raise
  where
    -- Convert the exception with 'toException' and pass it to the
    -- 'killThread#' primop, which yields only the new state token.
    raise st0 = case killThread# target# (toException exn) st0 of
      st1 -> (# st1, () #)
-- | Obtain the 'ThreadId' of the thread that runs this action (GHC only).
myThreadId :: IO ThreadId
myThreadId = IO self
  where
    -- Box the primitive thread handle returned by 'myThreadId#'.
    self st0 = case myThreadId# st0 of
      (# st1, tso# #) -> (# st1, ThreadId tso# #)
330 -- |The 'yield' action allows (forces, in a co-operative multitasking
331 -- implementation) a context-switch to any other currently runnable
332 -- threads (if any), and is occasionally useful when implementing
333 -- concurrency abstractions.
336 case (yield# s) of s1 -> (# s1, () #)
338 {- | 'labelThread' stores a string as identifier for this thread if
339 you built a RTS with debugging support. This identifier will be used in
340 the debugging output to make distinction of different threads easier
341 (otherwise you only have the thread state object\'s address in the heap).
343 Other applications like the graphical Concurrent Haskell Debugger
344 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
345 'labelThread' for their purposes as well.
labelThread :: ThreadId -> String -> IO ()
labelThread (ThreadId tso#) label = IO $ \st0 ->
  -- Pack the label into a byte array, take the address of its contents,
  -- and hand that address to the 'labelThread#' primop.
  case packCString# label of
    packed# -> case byteArrayContents# packed# of
      addr# -> case labelThread# tso# addr# st0 of
        st1 -> (# st1, () #)
354 -- Nota Bene: 'pseq' used to be 'seq'
355 -- but 'seq' is now defined in PrelGHC
357 -- "pseq" is defined a bit weirdly (see below)
359 -- The reason for the strange "lazy" call is that
360 -- it fools the compiler into thinking that pseq and par are non-strict in
361 -- their second argument (even if it inlines pseq at the call site).
362 -- If it thinks pseq is strict in "y", then it often evaluates
363 -- "y" before "x", which is totally wrong.
367 pseq x y = x `seq` lazy y
371 par x y = case (par# x) of { _ -> lazy y }
373 -- | Internal function used by the RTS to run sparks.
376 where loop s = case getSpark# s of
378 if n ==# 0# then (# s', () #)
383 -- ^blocked on an 'MVar'
385 -- ^blocked on a computation in progress by another thread
387 -- ^blocked in 'throwTo'
389 -- ^blocked in 'retry' in an STM transaction
390 | BlockedOnForeignCall
391 -- ^currently in a foreign call
393 -- ^blocked on some other resource. Without @-threaded@,
394 -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
395 -- they show up as 'BlockedOnMVar'.
396 deriving (Eq,Ord,Show)
398 -- | The current status of a thread
401 -- ^the thread is currently runnable or running
403 -- ^the thread has finished
404 | ThreadBlocked BlockReason
405 -- ^the thread is blocked on some resource
407 -- ^the thread received an uncaught exception
408 deriving (Eq,Ord,Show)
410 threadStatus :: ThreadId -> IO ThreadStatus
411 threadStatus (ThreadId t) = IO $ \s ->
412 case threadStatus# t s of
413 (# s', stat #) -> (# s', mk_stat (I# stat) #)
415 -- NB. keep these in sync with includes/Constants.h
416 mk_stat 0 = ThreadRunning
417 mk_stat 1 = ThreadBlocked BlockedOnMVar
418 mk_stat 2 = ThreadBlocked BlockedOnBlackHole
419 mk_stat 3 = ThreadBlocked BlockedOnException
420 mk_stat 7 = ThreadBlocked BlockedOnSTM
421 mk_stat 11 = ThreadBlocked BlockedOnForeignCall
422 mk_stat 12 = ThreadBlocked BlockedOnForeignCall
423 mk_stat 16 = ThreadFinished
424 mk_stat 17 = ThreadDied
425 mk_stat _ = ThreadBlocked BlockedOnOther
429 %************************************************************************
431 \subsection[stm]{Transactional heap operations}
433 %************************************************************************
435 TVars are shared memory locations which support atomic memory
439 -- |A monad supporting atomic memory transactions.
440 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
442 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
445 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
-- | Mapping over an 'STM' action runs the action and applies the
-- function to its result.
instance Functor STM where
  fmap g action = bindSTM action (\result -> returnSTM (g result))
450 instance Monad STM where
451 {-# INLINE return #-}
455 return x = returnSTM x
456 m >>= k = bindSTM m k
458 bindSTM :: STM a -> (a -> STM b) -> STM b
459 bindSTM (STM m) k = STM ( \s ->
461 (# new_s, a #) -> unSTM (k a) new_s
464 thenSTM :: STM a -> STM b -> STM b
465 thenSTM (STM m) k = STM ( \s ->
467 (# new_s, _ #) -> unSTM k new_s
-- | Lift a pure value into 'STM'; the state token is passed through
-- unchanged.
returnSTM :: a -> STM a
returnSTM val = STM $ \st -> (# st, val #)
473 -- | Unsafely performs IO in the STM monad. Beware: this is a highly
474 -- dangerous thing to do.
476 -- * The STM implementation will often run transactions multiple
477 -- times, so you need to be prepared for this if your IO has any
480 -- * The STM implementation will abort transactions that are known to
481 -- be invalid and need to be restarted. This may happen in the middle
482 -- of `unsafeIOToSTM`, so make sure you don't acquire any resources
483 -- that need releasing (exception handlers are ignored when aborting
484 -- the transaction). That includes doing any IO using Handles, for
485 -- example. Getting this wrong will probably lead to random deadlocks.
487 -- * The transaction may have seen an inconsistent view of memory when
488 -- the IO runs. Invariants that you expect to be true throughout
489 -- your program may not be true inside a transaction, due to the
490 -- way transactions are implemented. Normally this wouldn't be visible
491 -- to the programmer, but using `unsafeIOToSTM` can expose it.
unsafeIOToSTM :: IO a -> STM a
unsafeIOToSTM io = case io of
  -- Re-wrap the state-passing function of the 'IO' action as an 'STM' action.
  IO stateFn -> STM stateFn
496 -- |Perform a series of STM actions atomically.
498 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
499 -- Any attempt to do so will result in a runtime error. (Reason: allowing
500 -- this would effectively allow a transaction inside a transaction, depending
501 -- on exactly when the thunk is evaluated.)
503 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
504 -- and which allows top-level TVars to be allocated.
atomically :: STM a -> IO a
atomically (STM txn) = IO $ \st ->
  -- The whole transaction body is handed to the 'atomically#' primop.
  atomically# txn st
509 -- |Retry execution of the current memory transaction because it has seen
510 -- values in TVars which mean that it should not continue (e.g. the TVars
511 -- represent a shared buffer that is now empty). The implementation may
512 -- block the thread until one of the TVars that it has read from has been
513 -- updated. (GHC only)
515 retry = STM $ \s# -> retry# s#
517 -- |Compose two alternative STM actions (GHC only). If the first action
518 -- completes without retrying then it forms the result of the orElse.
519 -- Otherwise, if the first action retries, then the second action is
520 -- tried in its place. If both actions retry then the orElse as a
orElse :: STM a -> STM a -> STM a
orElse (STM preferred) fallback =
  -- 'catchRetry#' receives the first action and the unwrapped alternative.
  STM (\st -> catchRetry# preferred (unSTM fallback) st)
-- | Exception handling within 'STM' actions: run the first action with
-- the given handler installed through the 'catchSTM#' primop.
catchSTM :: STM a -> (SomeException -> STM a) -> STM a
catchSTM (STM body) handler = STM (\st -> catchSTM# body unwrapped st)
  where
    -- The primop expects the handler's raw state-passing function.
    unwrapped exn = unSTM (handler exn)
-- | Low-level primitive on which 'always' and 'alwaysSucceeds' are built.
-- 'checkInv' differs from those two in that (i) the invariant is not
-- checked at the moment 'checkInv' is called, only at the end of this
-- and subsequent transactions, and (ii) a failing invariant is
-- signalled by raising an exception.
checkInv :: STM a -> STM ()
checkInv (STM invariant) = STM $ \st -> check# invariant st
537 -- | alwaysSucceeds adds a new invariant that must be true when passed
538 -- to alwaysSucceeds, at the end of the current transaction, and at
539 -- the end of every subsequent transaction. If it fails at any
540 -- of those points then the transaction violating it is aborted
541 -- and the exception raised by the invariant is propagated.
542 alwaysSucceeds :: STM a -> STM ()
543 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () )
-- | 'always' is a variant of 'alwaysSucceeds' in which the invariant is
-- expressed as an @STM Bool@ action that must return 'True'.  Returning
-- 'False' or raising an exception are both treated as invariant failures.
--
-- A 'False' result is turned into a failure by calling 'error', so that
-- 'alwaysSucceeds' sees it as an exception raised by the invariant.
always :: STM Bool -> STM ()
always i = alwaysSucceeds ( do v <- i
                               if (v) then return () else ( error "Transactional invariant violation" ) )
553 -- |Shared memory locations that support atomic memory transactions.
554 data TVar a = TVar (TVar# RealWorld a)
556 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
-- | Equality on 'TVar's is decided by the 'sameTVar#' primop applied to
-- the two underlying primitive variables.
instance Eq (TVar a) where
  (==) (TVar lhs#) (TVar rhs#) = sameTVar# lhs# rhs#
-- | Create a new 'TVar' that initially holds the supplied value.
newTVar :: a -> STM (TVar a)
newTVar initial = STM allocate
  where
    -- Wrap the primitive variable produced by 'newTVar#'.
    allocate st0 = case newTVar# initial st0 of
      (# st1, raw# #) -> (# st1, TVar raw# #)
-- | @IO@ version of 'newTVar'.  This is useful for creating top-level
-- 'TVar's with 'System.IO.Unsafe.unsafePerformIO', since 'atomically'
-- cannot be used inside 'System.IO.Unsafe.unsafePerformIO'.
newTVarIO :: a -> IO (TVar a)
newTVarIO initial = IO allocate
  where
    -- Same primop as 'newTVar', run directly on the 'IO' state token.
    allocate st0 = case newTVar# initial st0 of
      (# st1, raw# #) -> (# st1, TVar raw# #)
-- | Return the current value stored in a 'TVar'.
-- This is equivalent to
--
-- >  readTVarIO = atomically . readTVar
--
-- but works much faster, because it does not perform a complete
-- transaction; it just reads the current value of the 'TVar'.
readTVarIO :: TVar a -> IO a
readTVarIO (TVar raw#) = IO (\st -> readTVarIO# raw# st)
-- | Return the current value stored in a 'TVar', as part of a transaction.
readTVar :: TVar a -> STM a
readTVar (TVar raw#) = STM (\st -> readTVar# raw# st)
590 -- |Write the supplied value into a TVar
591 writeTVar :: TVar a -> a -> STM ()
592 writeTVar (TVar tvar#) val = STM $ \s1# ->
593 case writeTVar# tvar# val s1# of
601 withMVar :: MVar a -> (a -> IO b) -> IO b
605 b <- catchAny (unblock (io a))
606 (\e -> do putMVar m a; throw e)
611 %************************************************************************
613 \subsection{Thread waiting}
615 %************************************************************************
618 #ifdef mingw32_HOST_OS
620 -- Note: threadWaitRead and threadWaitWrite aren't really functional
621 -- on Win32, but left in there because lib code (still) uses them (the manner
622 -- in which they're used doesn't cause problems on a Win32 platform though.)
-- | Boxed wrapper around the 'asyncRead#' primop: the arguments are
-- unboxed before the call, and the two 'Int#' results (named @len#@ and
-- @err#@ here) are returned as a pair.
asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) = IO request
  where
    request st0 = case asyncRead# fd isSock len buf st0 of
      (# st1, len#, err# #) -> (# st1, (I# len#, I# err#) #)
-- | Boxed wrapper around the 'asyncWrite#' primop: the arguments are
-- unboxed before the call, and the two 'Int#' results (named @len#@ and
-- @err#@ here) are returned as a pair.
asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) = IO request
  where
    request st0 = case asyncWrite# fd isSock len buf st0 of
      (# st1, len#, err# #) -> (# st1, (I# len#, I# err#) #)
-- | Boxed wrapper around the 'asyncDoProc#' primop; only the second
-- ('Int#') result of the primop is returned.
asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
asyncDoProc (FunPtr proc) (Ptr param) = IO request
  where
    -- The 'length' component of the result is ignored; having all the
    -- async*# primops return the same shape of result keeps their
    -- implementation simple.
    request st0 = case asyncDoProc# proc param st0 of
      (# st1, _len#, err# #) -> (# st1, I# err# #)
641 -- to aid the use of these primops by the IO Handle implementation,
642 -- provide the following convenience funs:
-- 'asyncRead' into a mutable byte array, starting @off@ bytes into it.
-- This had better be a pinned byte array!
asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncReadBA fd isSock len off bufB = asyncRead fd isSock len target
  where
    -- Address of the array contents, advanced by the requested offset.
    target = Ptr (byteArrayContents# (unsafeCoerce# bufB)) `plusPtr` off
-- 'asyncWrite' from a mutable byte array, starting @off@ bytes into it.
asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncWriteBA fd isSock len off bufB = asyncWrite fd isSock len source
  where
    -- Address of the array contents, advanced by the requested offset.
    source = Ptr (byteArrayContents# (unsafeCoerce# bufB)) `plusPtr` off
655 -- -----------------------------------------------------------------------------
658 -- | Block the current thread until data is available to read on the
659 -- given file descriptor (GHC only).
660 threadWaitRead :: Fd -> IO ()
662 #ifndef mingw32_HOST_OS
663 | threaded = waitForReadEvent fd
665 | otherwise = IO $ \s ->
666 case fromIntegral fd of { I# fd# ->
667 case waitRead# fd# s of { s' -> (# s', () #)
670 -- | Block the current thread until data can be written to the
671 -- given file descriptor (GHC only).
672 threadWaitWrite :: Fd -> IO ()
674 #ifndef mingw32_HOST_OS
675 | threaded = waitForWriteEvent fd
677 | otherwise = IO $ \s ->
678 case fromIntegral fd of { I# fd# ->
679 case waitWrite# fd# s of { s' -> (# s', () #)
682 -- | Suspends the current thread for a given number of microseconds
685 -- There is no guarantee that the thread will be rescheduled promptly
686 -- when the delay has expired, but the thread will never continue to
687 -- run /earlier/ than specified.
689 threadDelay :: Int -> IO ()
691 | threaded = waitForDelayEvent time
692 | otherwise = IO $ \s ->
693 case fromIntegral time of { I# time# ->
694 case delay# time# s of { s' -> (# s', () #)
698 -- | Set the value of returned TVar to True after a given number of
699 -- microseconds. The caveats associated with threadDelay also apply.
701 registerDelay :: Int -> IO (TVar Bool)
703 | threaded = waitForDelayEventSTM usecs
704 | otherwise = error "registerDelay: requires -threaded"
706 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
708 waitForDelayEvent :: Int -> IO ()
709 waitForDelayEvent usecs = do
711 target <- calculateTarget usecs
712 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
716 -- Delays for use in STM
717 waitForDelayEventSTM :: Int -> IO (TVar Bool)
718 waitForDelayEventSTM usecs = do
719 t <- atomically $ newTVar False
720 target <- calculateTarget usecs
721 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
725 calculateTarget :: Int -> IO USecs
726 calculateTarget usecs = do
728 return $ now + (fromIntegral usecs)
731 -- ----------------------------------------------------------------------------
732 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
734 -- In the threaded RTS, we employ a single IO Manager thread to wait
735 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
736 -- and delays (threadDelay).
738 -- We can do this because in the threaded RTS the IO Manager can make
739 -- a non-blocking call to select(), so we don't have to do select() in
740 -- the scheduler as we have to in the non-threaded RTS. We get performance
741 -- benefits from doing it this way, because we only have to restart the select()
742 -- when a new request arrives, rather than doing one select() each time
743 -- around the scheduler loop. Furthermore, the scheduler can be simplified
744 -- by not having to check for completed IO requests.
746 -- Issues, possible problems:
748 -- - we might want bound threads to just do the blocking
749 -- operation rather than communicating with the IO manager
750 -- thread. This would prevent single-threaded programs which do
751 -- IO from requiring multiple OS threads. However, it would also
752 -- prevent bound threads waiting on IO from being killed or sent
755 -- - Apparently exec() doesn't work on Linux in a multithreaded program.
756 -- I couldn't repeat this.
758 -- - How do we handle signal delivery in the multithreaded RTS?
760 -- - forkProcess will kill the IO manager thread. Let's just
761 -- hope we don't need to do any blocking IO between fork & exec.
763 #ifndef mingw32_HOST_OS
765 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
766 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
770 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
771 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
773 #ifndef mingw32_HOST_OS
774 pendingEvents :: IORef [IOReq]
776 pendingDelays :: IORef [DelayReq]
777 -- could use a strict list or array here
778 {-# NOINLINE pendingEvents #-}
779 {-# NOINLINE pendingDelays #-}
780 (pendingEvents,pendingDelays) = unsafePerformIO $ do
785 -- the first time we schedule an IO request, the service thread
786 -- will be created (cool, huh?)
-- | With the threaded RTS this forces 'pendingEvents' and returns;
-- otherwise it does nothing.
-- NOTE(review): presumably forcing 'pendingEvents' is what starts the
-- service thread -- confirm against its initialiser.
ensureIOManagerIsRunning :: IO ()
ensureIOManagerIsRunning =
  if threaded
    then pendingEvents `seq` return ()
    else return ()
-- | Insert a delay request into a list of requests, placing it in front
-- of the first entry whose 'delayTime' is not smaller than its own (or
-- at the end if there is none).
insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
insertDelay new queue = case queue of
  [] -> [new]
  (front : later)
    | delayTime new <= delayTime front -> new : queue
    | otherwise -> front : insertDelay new later
-- | The 'USecs' field of a delay request, whichever constructor built it.
delayTime :: DelayReq -> USecs
delayTime req = case req of
  Delay due _ -> due
  DelaySTM due _ -> due
805 foreign import ccall unsafe "getUSecOfDay"
806 getUSecOfDay :: IO USecs
-- | Global flag, initially 'False'.  'prodServiceThread' sets it to
-- 'True' and wakes the IO manager only when it was not already set.
{-# NOINLINE prodding #-}
prodding :: IORef Bool
prodding = unsafePerformIO $ newIORef False
-- | Mark the IO manager as prodded, and wake it up unless a previous
-- prod is still outstanding (i.e. the flag was already 'True').
prodServiceThread :: IO ()
prodServiceThread = do
  alreadyProdded <- atomicModifyIORef prodding (\old -> (True, old))
  if alreadyProdded then return () else wakeupIOManager
817 #ifdef mingw32_HOST_OS
818 -- ----------------------------------------------------------------------------
819 -- Windows IO manager thread
821 startIOManagerThread :: IO ()
822 startIOManagerThread = do
823 wakeup <- c_getIOManagerEvent
824 forkIO $ service_loop wakeup []
827 service_loop :: HANDLE -- read end of pipe
828 -> [DelayReq] -- current delay requests
831 service_loop wakeup old_delays = do
832 -- pick up new delay requests
833 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
834 let delays = foldr insertDelay old_delays new_delays
837 (delays', timeout) <- getDelay now delays
839 r <- c_WaitForSingleObject wakeup timeout
841 0xffffffff -> do c_maperrno; throwErrno "service_loop"
843 r2 <- c_readIOManagerEvent
846 _ | r2 == io_MANAGER_WAKEUP -> return False
847 _ | r2 == io_MANAGER_DIE -> return True
848 0 -> return False -- spurious wakeup
849 _ -> do start_console_handler (r2 `shiftR` 1); return False
852 else service_cont wakeup delays'
854 _other -> service_cont wakeup delays' -- probably timeout
-- | Reset the 'prodding' flag to 'False' and go round 'service_loop'
-- again with the given delay queue.
service_cont :: HANDLE -> [DelayReq] -> IO ()
service_cont wakeup delays =
  atomicModifyIORef prodding (\_ -> (False, False)) >>= \cleared ->
    -- force the result of the modification to avoid a space leak
    cleared `seq` service_loop wakeup delays
-- Command words compared against the value read from the IO manager
-- event.  These must agree with rts/win32/ThrIOManager.c
io_MANAGER_WAKEUP :: Word32
io_MANAGER_WAKEUP = 0xffffffff

io_MANAGER_DIE :: Word32
io_MANAGER_DIE = 0xfffffffe
871 -- these are sent to Services only.
874 deriving (Eq, Ord, Enum, Show, Read, Typeable)
876 start_console_handler :: Word32 -> IO ()
877 start_console_handler r =
878 case toWin32ConsoleEvent r of
879 Just x -> withMVar win32ConsoleHandler $ \handler -> do
884 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
885 toWin32ConsoleEvent ev =
887 0 {- CTRL_C_EVENT-} -> Just ControlC
888 1 {- CTRL_BREAK_EVENT-} -> Just Break
889 2 {- CTRL_CLOSE_EVENT-} -> Just Close
890 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
891 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
-- | Global 'MVar' holding the console event handler that
-- 'start_console_handler' reads.  It is created holding an 'error'
-- thunk, so a real handler must be stored before the contents are used.
--
-- NOINLINE is required: this is a top-level 'unsafePerformIO'
-- allocation, and if the definition were inlined each use site could
-- end up with its own, distinct 'MVar'.
{-# NOINLINE win32ConsoleHandler #-}
win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
-- XXX Is this actually needed?
-- Global reference to a HANDLE, initialised to the null pointer.
{-# NOINLINE stick #-}
stick :: IORef HANDLE
stick = unsafePerformIO $ newIORef nullPtr
902 wakeupIOManager :: IO ()
904 _hdl <- readIORef stick
905 c_sendIOManagerEvent io_MANAGER_WAKEUP
907 -- Walk the queue of pending delays, waking up any that have passed
908 -- and return the smallest delay to wait for. The queue of pending
909 -- delays is kept ordered.
910 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
911 getDelay _ [] = return ([], iNFINITE)
912 getDelay now all@(d : rest)
914 Delay time m | now >= time -> do
917 DelaySTM time t | now >= time -> do
918 atomically $ writeTVar t True
921 -- delay is in millisecs for WaitForSingleObject
922 let micro_seconds = delayTime d - now
923 milli_seconds = (micro_seconds + 999) `div` 1000
924 in return (all, fromIntegral milli_seconds)
926 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
927 -- available yet. We should move some Win32 functionality down here,
928 -- maybe as part of the grand reorganisation of the base package...
933 iNFINITE = 0xFFFFFFFF -- urgh
935 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
936 c_getIOManagerEvent :: IO HANDLE
938 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
939 c_readIOManagerEvent :: IO Word32
941 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
942 c_sendIOManagerEvent :: Word32 -> IO ()
944 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
947 foreign import stdcall "WaitForSingleObject"
948 c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
951 -- ----------------------------------------------------------------------------
952 -- Unix IO manager thread, using select()
954 startIOManagerThread :: IO ()
955 startIOManagerThread = do
956 allocaArray 2 $ \fds -> do
957 throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
958 rd_end <- peekElemOff fds 0
959 wr_end <- peekElemOff fds 1
960 setNonBlockingFD wr_end True -- writes happen in a signal handler, we
961 -- don't want them to block.
962 setCloseOnExec rd_end
963 setCloseOnExec wr_end
964 writeIORef stick (fromIntegral wr_end)
965 c_setIOManagerPipe wr_end
967 allocaBytes sizeofFdSet $ \readfds -> do
968 allocaBytes sizeofFdSet $ \writefds -> do
969 allocaBytes sizeofTimeVal $ \timeval -> do
970 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
-- service_loop: one iteration of the select()-based IO manager; it re-enters
-- itself through the tail call on the last line.  As far as this listing
-- shows, an iteration
--   1. drains 'pendingEvents' and 'pendingDelays' and merges them with the
--      requests/delays carried over from the previous iteration,
--   2. builds the fd_sets and calls select() on them and the wakeup fd,
--   3. reads a control byte from the wakeup fd and acts on it,
--   4. unless asked to exit, completes (or wakes) requests and loops.
--
-- NOTE(review): several names used below (@now@, @err@, @s@, @exit@) are not
-- bound anywhere in this listing, and the type signature is cut short, so
-- parts of this definition are not visible here -- confirm every comment
-- against the complete file.
974 :: Fd -- listen to this for wakeup calls
981 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
983 -- pick up new IO requests
-- (atomically swaps in [] and returns what had been queued)
984 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
985 let reqs = new_reqs ++ old_reqs
987 -- pick up new delay requests
988 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
-- each new delay is inserted into the carried-over queue via insertDelay
989 let delays0 = foldr insertDelay old_delays new_delays
991 -- build the FDSets for select()
995 maxfd <- buildFdSets 0 readfds writefds reqs
997 -- perform the select()
-- do_select yields (wake-everyone flag, remaining delays)
998 let do_select delays = do
999 -- check the current time and wake up any thread in
1000 -- threadDelay whose timeout has expired. Also find the
1001 -- timeout value for the select() call.
1003 (delays', timeout) <- getDelay now ptimeval delays
-- first argument: highest descriptor of interest (wakeup fd included) + 1
1005 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
1011 _ | err == eINTR -> do_select delays'
1012 -- EINTR: just redo the select()
1013 _ | err == eBADF -> return (True, delays)
1014 -- EBADF: one of the file descriptors is closed or bad,
1015 -- we don't know which one, so wake everyone up.
1016 _ | otherwise -> throwErrno "select"
1017 -- otherwise (ENOMEM or EINVAL) something has gone
1018 -- wrong; report the error.
1020 return (False,delays')
1022 (wakeup_all,delays') <- do_select delays0
-- in the wake-everyone case the wakeup descriptor is not consulted
1025 if wakeup_all then return False
1027 b <- fdIsSet wakeup readfds
-- read a single control byte from the wakeup descriptor
1030 else alloca $ \p -> do
1031 c_read (fromIntegral wakeup) p 1
1034 _ | s == io_MANAGER_WAKEUP -> return False
1035 _ | s == io_MANAGER_DIE -> return True
-- SYNC: fill the MVar of every thread registered in 'sync'
1036 _ | s == io_MANAGER_SYNC -> do
1037 mvars <- readIORef sync
1038 mapM_ (flip putMVar ()) mvars
-- presumably the fall-through case (its pattern line is not visible here):
-- @s@ is used as a signal number; a siginfo_t is read from the wakeup
-- descriptor into a fresh buffer and passed to runHandlers'
1041 fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1042 withForeignPtr fp $ \p_siginfo -> do
1043 r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1045 when (r /= fromIntegral sizeof_siginfo_t) $
1046 error "failed to read siginfo_t"
1047 runHandlers' fp (fromIntegral s)
1050 if exit then return () else do
-- reset the 'prodding' flag to False
1052 atomicModifyIORef prodding (\_ -> (False,False))
-- wake-everyone: release all waiters and carry nothing over; otherwise keep
-- only the requests that completeRequests hands back
1054 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1055 else completeRequests reqs readfds writefds []
1057 service_loop wakeup readfds writefds ptimeval reqs' delays'
-- Control bytes written down the IO manager's wakeup pipe and matched on
-- in 'service_loop'.

-- | Plain wakeup: written by 'wakeupIOManager'.
io_MANAGER_WAKEUP :: Word8
io_MANAGER_WAKEUP = 0xff

-- | Matched separately from the wakeup byte in 'service_loop'.
io_MANAGER_DIE :: Word8
io_MANAGER_DIE = 0xfe

-- | Written by 'syncIOManager'; 'service_loop' answers it by filling every
-- 'MVar' stored in 'sync'.
io_MANAGER_SYNC :: Word8
io_MANAGER_SYNC = 0xfd
1064 -- | the stick is for poking the IO manager with
-- It holds the write end of the wakeup pipe: 'startIOManagerThread' stores
-- the descriptor here, and 'wakeupIOManager' / 'syncIOManager' read it back
-- to write a control byte.  The initial value is 0.
-- NOTE(review): no type signature is visible for this binding in this
-- listing -- confirm against the complete file.
1066 {-# NOINLINE stick #-}
1067 stick = unsafePerformIO (newIORef 0)
{-# NOINLINE sync #-}
-- | The 'MVar's registered by 'syncIOManager'.  'service_loop' fills each
-- of them with @()@ when it sees 'io_MANAGER_SYNC'.  Starts out empty.
sync :: IORef [MVar ()]
sync = unsafePerformIO $ newIORef []
1073 -- waits for the IO manager to drain the pipe
-- Visible steps: an MVar @m@ is pushed onto 'sync', then the
-- 'io_MANAGER_SYNC' byte is written to the descriptor stored in 'stick'.
-- The result of @c_write@ is discarded.
-- NOTE(review): the equation head, the creation of @m@ and any wait on it
-- are not visible in this listing -- confirm against the complete file.
1074 syncIOManager :: IO ()
1077 atomicModifyIORef sync (\old -> (m:old,()))
1078 fd <- readIORef stick
1079 with io_MANAGER_SYNC $ \pbuf -> do
1080 c_write (fromIntegral fd) pbuf 1; return ()
-- | Poke the IO manager: write the single byte 'io_MANAGER_WAKEUP' to the
-- descriptor stored in 'stick'.  The result of the write is deliberately
-- ignored.
wakeupIOManager :: IO ()
wakeupIOManager = do
  wr_fd <- readIORef stick
  _ <- with io_MANAGER_WAKEUP $ \buf ->
         c_write (fromIntegral wr_fd) buf 1
  return ()
1089 -- For the non-threaded RTS
-- Copies @sizeof_siginfo_t@ bytes from @p_info@ into a freshly allocated
-- ForeignPtr buffer and passes that buffer, together with the signal
-- number, to runHandlers'.
-- NOTE(review): nothing visible here releases @p_info@ after the copy --
-- confirm who owns it (this listing may be missing a line).
1090 runHandlers :: Ptr Word8 -> Int -> IO ()
1091 runHandlers p_info sig = do
1092 fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1093 withForeignPtr fp $ \p -> do
1094 copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1096 runHandlers' fp (fromIntegral sig)
-- Look up the handler installed for @sig@ in 'signal_handlers' (while
-- holding its MVar) and, if there is one, run it on @p_info@ in a new
-- thread created with forkIO.  A slot containing Nothing does nothing.
-- NOTE(review): the branch taken for an out-of-range signal number and the
-- head of the @case@ are not visible in this listing -- confirm against the
-- complete file.
1098 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1099 runHandlers' p_info sig = do
1100 let int = fromIntegral sig
1101 withMVar signal_handlers $ \arr ->
-- the bounds check guards the unsafeReadIOArray below
1102 if not (inRange (boundsIOArray arr) int)
1104 else do handler <- unsafeReadIOArray arr int
1106 Nothing -> return ()
1107 Just (f,_) -> do forkIO (f p_info); return ()
-- | Binding to the C function @setIOManagerPipe@.  'startIOManagerThread'
-- calls it with the write end of the wakeup pipe.
foreign import ccall "setIOManagerPipe"
  c_setIOManagerPipe
    :: CInt
    -> IO ()
-- | Value supplied by the C helper @__hscore_sizeof_siginfo_t@; used in this
-- module as the byte size of the buffers that hold a @siginfo_t@.
foreign import ccall "__hscore_sizeof_siginfo_t"
  sizeof_siginfo_t
    :: CSize
-- | A signal handler as stored in 'signal_handlers'.  runHandlers' applies it
-- to the ForeignPtr buffer it was given (the bytes of a @siginfo_t@).
1119 type HandlerFun = ForeignPtr Word8 -> IO ()
1121 -- Lock used to protect concurrent access to signal_handlers. Symptom of
1122 -- this race condition is #1922, although that bug was on Windows a similar
1123 -- bug also exists on Unix.
--
-- The table has one slot per signal number in the range 0..maxSig, every
-- slot starting as Nothing.  A stable pointer to @m@ is offered to
-- getOrSetSignalHandlerStore; in the @else@ branch our own stable pointer is
-- freed and the value behind the returned pointer is used instead.
-- NOTE(review): the binding of @m@ and the @if@ that compares the pointers
-- are not visible in this listing -- confirm against the complete file.
1124 {-# NOINLINE signal_handlers #-}
1125 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1126 signal_handlers = unsafePerformIO $ do
1127 arr <- newIOArray (0,maxSig) Nothing
1130 stable_ref <- newStablePtr m
1131 let ref = castStablePtrToPtr stable_ref
1132 ref2 <- getOrSetSignalHandlerStore ref
1135 else do freeStablePtr stable_ref
1136 deRefStablePtr (castPtrToStablePtr ref2)
-- | Binding to the C function @getOrSetSignalHandlerStore@: takes a pointer
-- and returns a pointer.  'signal_handlers' passes it a stable pointer to
-- its table and dereferences whatever comes back.
foreign import ccall unsafe "getOrSetSignalHandlerStore"
  getOrSetSignalHandlerStore
    :: Ptr a
    -> IO (Ptr a)
-- | Store @handler@ in the 'signal_handlers' slot for @sig@ (Nothing clears
-- the slot).  The table's MVar is held for the duration.  A signal number
-- outside the table's bounds is reported with 'error'.  The previous
-- contents of the slot are read into @old@ before being overwritten.
-- NOTE(review): the declared result is the previous entry, but no statement
-- returning @old@ is visible in this listing -- confirm against the complete
-- file.
1141 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1142 setHandler sig handler = do
1143 let int = fromIntegral sig
1144 withMVar signal_handlers $ \arr ->
-- the bounds check guards the unsafe array accesses below
1145 if not (inRange (boundsIOArray arr) int)
1146 then error "GHC.Conc.setHandler: signal out of range"
1147 else do old <- unsafeReadIOArray arr int
1148 unsafeWriteIOArray arr int handler
1151 -- -----------------------------------------------------------------------------
-- Walk the request list, threading through the largest descriptor seen so
-- far (first argument) and returning it once the list is exhausted.  A
-- descriptor that is >= fD_SETSIZE is rejected with 'error'.
-- NOTE(review): the @otherwise@ branches, where the descriptor is presumably
-- added to @readfds@ / @writefds@, are not visible in this listing --
-- confirm against the complete file.
1154 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1155 buildFdSets maxfd _ _ [] = return maxfd
1156 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1157 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1160 buildFdSets (max maxfd fd) readfds writefds reqs
1161 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1162 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1165 buildFdSets (max maxfd fd) readfds writefds reqs
-- Split the request list after select(): a request whose descriptor test
-- succeeds has its MVar filled with (); the others are consed onto the
-- accumulator (last argument), which is returned once the input is
-- exhausted.  Read requests are tested against @readfds@, Write requests
-- against @writefds@.
-- NOTE(review): the end of the type signature and the @if@ on @b@ are not
-- visible in this listing -- confirm the exact test against the complete
-- file.
1167 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1169 completeRequests [] _ _ reqs' = return reqs'
1170 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1171 b <- fdIsSet fd readfds
1173 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1174 else completeRequests reqs readfds writefds (Read fd m : reqs')
1175 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1176 b <- fdIsSet fd writefds
1178 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1179 else completeRequests reqs readfds writefds (Write fd m : reqs')
-- | Release every request in the list, in order, by putting @()@ into the
-- 'MVar' it carries.  The descriptor stored in the request is ignored.
wakeupAll :: [IOReq] -> IO ()
wakeupAll = mapM_ release
  where
    release (Read  _ done) = putMVar done ()
    release (Write _ done) = putMVar done ()
-- Queue a @Read fd m@ request at the front of 'pendingEvents'.
-- NOTE(review): the creation of @m@ and whatever follows the enqueue are not
-- visible in this listing -- confirm against the complete file.
1186 waitForReadEvent :: Fd -> IO ()
1187 waitForReadEvent fd = do
1189 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
-- Queue a @Write fd m@ request at the front of 'pendingEvents'.
-- NOTE(review): the creation of @m@ and whatever follows the enqueue are not
-- visible in this listing -- confirm against the complete file.
1193 waitForWriteEvent :: Fd -> IO ()
1194 waitForWriteEvent fd = do
1196 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1200 -- -----------------------------------------------------------------------------
1203 -- Walk the queue of pending delays, waking up any that have passed
1204 -- and return the smallest delay to wait for. The queue of pending
1205 -- delays is kept ordered.
--
-- Arguments: the current time, a timeval buffer to fill in, and the delay
-- queue.  Result: the delays that are still pending, paired with the timeout
-- pointer (either @ptimeval@ or nullPtr).
-- NOTE(review): the @case@ head, the wake-up for a plain @Delay@ and the
-- pattern of the final alternative are not visible in this listing --
-- confirm against the complete file.
1206 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
-- nothing pending: hand back a null timeout pointer
1207 getDelay _ _ [] = return ([],nullPtr)
1208 getDelay now ptimeval all@(d : rest)
-- expired entries are dropped and the walk continues with the rest
1210 Delay time m | now >= time -> do
1212 getDelay now ptimeval rest
-- an expired STM delay is signalled by writing True to its TVar
1213 DelaySTM time t | now >= time -> do
1214 atomically $ writeTVar t True
1215 getDelay now ptimeval rest
-- head not yet due: store the remaining time in ptimeval and keep the whole
-- queue
1217 setTimevalTicks ptimeval (delayTime d - now)
1218 return (all,ptimeval)
-- | Value supplied by the C helper @sizeofTimeVal@; 'startIOManagerThread'
-- uses it as the allocation size of the timeval buffer.
foreign import ccall unsafe "sizeofTimeVal"
  sizeofTimeVal
    :: Int
-- | Binding to the C helper @setTimevalTicks@, which receives a timeval
-- buffer and a 'USecs' value.  'getDelay' calls it with the time remaining
-- until the next delay is due.
foreign import ccall unsafe "setTimevalTicks"
  setTimevalTicks
    :: Ptr CTimeVal
    -> USecs
    -> IO ()
1229 On Win32 we're going to have a single Pipe, and a
1230 waitForSingleObject with the delay time. For signals, we send a
1231 byte down the pipe just like on Unix.
1234 -- ----------------------------------------------------------------------------
1235 -- select() interface
1237 -- ToDo: move to System.Posix.Internals?
-- Binding to the C function @select@.  It is imported @safe@, unlike the
-- fd_set helpers in this section, which are imported @unsafe@.  The visible
-- arguments are a CInt, three fd_set pointers and a timeval pointer.
-- NOTE(review): the result type is on a line not visible in this listing --
-- confirm against the complete file.
1241 foreign import ccall safe "select"
1242 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
-- | Value supplied by the C helper @hsFD_SETSIZE@, as a 'CInt'.
foreign import ccall unsafe "hsFD_SETSIZE"
  c_fD_SETSIZE
    :: CInt
-- 'c_fD_SETSIZE' converted with fromIntegral; 'buildFdSets' compares request
-- descriptors against it and rejects any that are >= this value.
1249 fD_SETSIZE = fromIntegral c_fD_SETSIZE
-- | Binding to the C helper @hsFD_ISSET@: takes a raw descriptor and an
-- fd_set pointer and yields a 'CInt'.
foreign import ccall unsafe "hsFD_ISSET"
  c_fdIsSet
    :: CInt
    -> Ptr CFdSet
    -> IO CInt
-- | 'c_fdIsSet' for an 'Fd': unwraps the descriptor and returns the raw
-- 'CInt' result unchanged.
fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
fdIsSet descriptor set =
  case descriptor of
    Fd raw -> c_fdIsSet raw set
-- | Binding to the C helper @hsFD_SET@: takes a raw descriptor and an
-- fd_set pointer.
foreign import ccall unsafe "hsFD_SET"
  c_fdSet
    :: CInt
    -> Ptr CFdSet
    -> IO ()
-- | 'c_fdSet' for an 'Fd': unwraps the descriptor and forwards the call.
fdSet :: Fd -> Ptr CFdSet -> IO ()
fdSet descriptor set =
  case descriptor of
    Fd raw -> c_fdSet raw set
-- | Binding to the C helper @hsFD_ZERO@, which takes an fd_set pointer.
foreign import ccall unsafe "hsFD_ZERO"
  fdZero
    :: Ptr CFdSet
    -> IO ()
1266 foreign import ccall unsafe "sizeof_fd_set"
-- | Run the stack-overflow hook ('callStackOverflowHook').  The action is
-- typed @IO a@ and has no real value to give back, so its result is
-- 'undefined'.
reportStackOverflow :: IO a
reportStackOverflow = callStackOverflowHook >> return undefined
-- The only visible step fetches the currently installed uncaught-exception
-- handler.
-- NOTE(review): the equation head and the use of @handler@ are not visible
-- in this listing -- confirm against the complete file.
1274 reportError :: SomeException -> IO a
1276 handler <- getUncaughtExceptionHandler
-- | Binding to the C function @stackOverflow@.
--
-- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
-- the @unsafe@ from the import below.
foreign import ccall unsafe "stackOverflow"
  callStackOverflowHook
    :: IO ()
{-# NOINLINE uncaughtExceptionHandler #-}
-- | Global cell holding the handler for uncaught exceptions.  It starts out
-- containing 'defaultHandler'; 'setUncaughtExceptionHandler' and
-- 'getUncaughtExceptionHandler' write and read it.
uncaughtExceptionHandler :: IORef (SomeException -> IO ())
uncaughtExceptionHandler = unsafePerformIO $ newIORef defaultHandler
-- | Initial uncaught-exception handler.  It flushes 'stdout' (ignoring any
-- exception that raises) and then prints a message through 'errorBelch':
-- a fixed text for 'Deadlock', the carried string for 'ErrorCall', and the
-- exception's 'showsPrec' rendering for anything else.
defaultHandler :: SomeException -> IO ()
defaultHandler se@(SomeException ex) = do
  (hFlush stdout) `catchAny` (\ _ -> return ())
  withCString "%s" $ \cfmt ->
    withCString message $ \cmsg ->
      errorBelch cfmt cmsg
  where
    -- Each component is 'cast' at the type its pattern demands; the second
    -- is only inspected when the first does not match.
    message = case (cast ex, cast ex) of
      (Just Deadlock, _)      -> "no threads to run: infinite loop or deadlock?"
      (_, Just (ErrorCall s)) -> s
      _                       -> showsPrec 0 se ""
-- | Binding to @errorBelch2@ (declared in HsBase.h), taking two C strings.
--
-- don't use errorBelch() directly, because we cannot call varargs functions
foreign import ccall unsafe "HsBase.h errorBelch2"
  errorBelch
    :: CString
    -> CString
    -> IO ()
-- | Replace the handler stored in 'uncaughtExceptionHandler'.
setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
setUncaughtExceptionHandler newHandler =
  writeIORef uncaughtExceptionHandler newHandler
-- | Read the handler currently stored in 'uncaughtExceptionHandler'.
getUncaughtExceptionHandler
  :: IO (SomeException -> IO ())
getUncaughtExceptionHandler =
  readIORef uncaughtExceptionHandler