2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
8 -- Copyright : (c) The University of Glasgow, 1994-2002
9 -- License : see libraries/base/LICENSE
11 -- Maintainer : cvs-ghc@haskell.org
12 -- Stability : internal
13 -- Portability : non-portable (GHC extensions)
15 -- Basic concurrency stuff.
17 -----------------------------------------------------------------------------
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home. Hence:
30 -- * Forking and suchlike
31 , forkIO -- :: IO a -> IO ThreadId
32 , forkOnIO -- :: Int -> IO a -> IO ThreadId
33 , numCapabilities -- :: Int
34 , childHandler -- :: Exception -> IO ()
35 , myThreadId -- :: IO ThreadId
36 , killThread -- :: ThreadId -> IO ()
37 , throwTo -- :: ThreadId -> Exception -> IO ()
38 , par -- :: a -> b -> b
39 , pseq -- :: a -> b -> b
42 , labelThread -- :: ThreadId -> String -> IO ()
44 , ThreadStatus(..), BlockReason(..)
45 , threadStatus -- :: ThreadId -> IO ThreadStatus
48 , threadDelay -- :: Int -> IO ()
49 , registerDelay -- :: Int -> IO (TVar Bool)
50 , threadWaitRead -- :: Int -> IO ()
51 , threadWaitWrite -- :: Int -> IO ()
55 , atomically -- :: STM a -> IO a
57 , orElse -- :: STM a -> STM a -> STM a
58 , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a
59 , alwaysSucceeds -- :: STM a -> STM ()
60 , always -- :: STM Bool -> STM ()
62 , newTVar -- :: a -> STM (TVar a)
63 , newTVarIO -- :: a -> STM (TVar a)
64 , readTVar -- :: TVar a -> STM a
65 , readTVarIO -- :: TVar a -> IO a
66 , writeTVar -- :: a -> TVar a -> STM ()
67 , unsafeIOToSTM -- :: IO a -> STM a
71 #ifdef mingw32_HOST_OS
72 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
73 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
74 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
76 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
77 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
80 #ifndef mingw32_HOST_OS
81 , Signal, HandlerFun, setHandler, runHandlers
84 , ensureIOManagerIsRunning
85 #ifndef mingw32_HOST_OS
89 #ifdef mingw32_HOST_OS
94 , setUncaughtExceptionHandler -- :: (Exception -> IO ()) -> IO ()
95 , getUncaughtExceptionHandler -- :: IO (Exception -> IO ())
97 , reportError, reportStackOverflow
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
107 #ifndef mingw32_HOST_OS
114 import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
115 import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
117 import GHC.IO.Exception
121 import GHC.Num ( Num(..) )
122 import GHC.Real ( fromIntegral )
123 #ifndef mingw32_HOST_OS
125 import GHC.Arr ( inRange )
127 #ifdef mingw32_HOST_OS
128 import GHC.Real ( div )
129 import GHC.Ptr ( FunPtr(..) )
131 #ifdef mingw32_HOST_OS
132 import GHC.Read ( Read )
133 import GHC.Enum ( Enum )
135 import GHC.Pack ( packCString# )
136 import GHC.Ptr ( Ptr(..) )
137 import GHC.Show ( Show(..), showString )
141 infixr 0 `par`, `pseq`
144 %************************************************************************
146 \subsection{@ThreadId@, @par@, and @fork@}
148 %************************************************************************
151 data ThreadId = ThreadId ThreadId# deriving( Typeable )
152 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
153 -- But since ThreadId# is unlifted, the Weak type must use open
156 A 'ThreadId' is an abstract type representing a handle to a thread.
157 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
158 the 'Ord' instance implements an arbitrary total ordering over
159 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
160 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
161 useful when debugging or diagnosing the behaviour of a concurrent
164 /Note/: in GHC, if you have a 'ThreadId', you essentially have
165 a pointer to the thread itself. This means the thread itself can\'t be
166 garbage collected until you drop the 'ThreadId'.
167 This misfeature will hopefully be corrected at a later date.
169 /Note/: Hugs does not provide any operations on other threads;
170 it defines 'ThreadId' as a synonym for ().
173 instance Show ThreadId where
175 showString "ThreadId " .
176 showsPrec d (getThreadId (id2TSO t))
178 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
180 id2TSO :: ThreadId -> ThreadId#
181 id2TSO (ThreadId t) = t
183 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
186 cmpThread :: ThreadId -> ThreadId -> Ordering
188 case cmp_thread (id2TSO t1) (id2TSO t2) of
193 instance Eq ThreadId where
195 case t1 `cmpThread` t2 of
199 instance Ord ThreadId where
203 Sparks off a new thread to run the 'IO' computation passed as the
204 first argument, and returns the 'ThreadId' of the newly created
207 The new thread will be a lightweight thread; if you want to use a foreign
208 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
210 GHC note: the new thread inherits the /blocked/ state of the parent
211 (see 'Control.Exception.block').
213 The newly created thread has an exception handler that discards the
214 exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
215 'ThreadKilled', and passes all other exceptions to the uncaught
216 exception handler (see 'setUncaughtExceptionHandler').
218 forkIO :: IO () -> IO ThreadId
219 forkIO action = IO $ \ s ->
220 case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
222 action_plus = catchException action childHandler
225 Like 'forkIO', but lets you specify on which CPU the thread is
226 created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
227 will stay on the same CPU for its entire lifetime (`forkIO` threads
228 can migrate between CPUs according to the scheduling policy).
229 `forkOnIO` is useful for overriding the scheduling policy when you
230 know in advance how best to distribute the threads.
232 The `Int` argument specifies the CPU number; it is interpreted modulo
233 'numCapabilities' (note that it actually specifies a capability number
234 rather than a CPU number, but to a first approximation the two are
237 forkOnIO :: Int -> IO () -> IO ThreadId
238 forkOnIO (I# cpu) action = IO $ \ s ->
239 case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
241 action_plus = catchException action childHandler
243 -- | the value passed to the @+RTS -N@ flag. This is the number of
244 -- Haskell threads that can run truly simultaneously at any given
245 -- time, and is typically set to the number of physical CPU cores on
247 numCapabilities :: Int
248 numCapabilities = unsafePerformIO $ do
249 n <- peek n_capabilities
250 return (fromIntegral n)
252 #if defined(mingw32_HOST_OS) && defined(__PIC__)
253 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
255 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
257 childHandler :: SomeException -> IO ()
258 childHandler err = catchException (real_handler err) childHandler
260 real_handler :: SomeException -> IO ()
261 real_handler se@(SomeException ex) =
262 -- ignore thread GC and killThread exceptions:
264 Just BlockedOnDeadMVar -> return ()
266 Just BlockedIndefinitely -> return ()
268 Just ThreadKilled -> return ()
270 -- report all others:
271 Just StackOverflow -> reportStackOverflow
274 {- | 'killThread' terminates the given thread (GHC only).
275 Any work already done by the thread isn\'t
276 lost: the computation is suspended until required by another thread.
277 The memory used by the thread will be garbage collected if it isn\'t
278 referenced from anywhere. The 'killThread' function is defined in
281 > killThread tid = throwTo tid ThreadKilled
283 Killthread is a no-op if the target thread has already completed.
285 killThread :: ThreadId -> IO ()
286 killThread tid = throwTo tid ThreadKilled
288 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
290 'throwTo' does not return until the exception has been raised in the
292 The calling thread can thus be certain that the target
293 thread has received the exception. This is a useful property to know
294 when dealing with race conditions: eg. if there are two threads that
295 can kill each other, it is guaranteed that only one of the threads
296 will get to kill the other.
298 If the target thread is currently making a foreign call, then the
299 exception will not be raised (and hence 'throwTo' will not return)
300 until the call has completed. This is the case regardless of whether
301 the call is inside a 'block' or not.
303 Important note: the behaviour of 'throwTo' differs from that described in
304 the paper \"Asynchronous exceptions in Haskell\"
305 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
306 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
307 a more synchronous design in which 'throwTo' does not return until the exception
308 is received by the target thread. The trade-off is discussed in Section 9 of the paper.
309 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
312 There is currently no guarantee that the exception delivered by 'throwTo' will be
313 delivered at the first possible opportunity. In particular, a thread may
314 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
315 a pending 'throwTo'. This is arguably undesirable behaviour.
318 throwTo :: Exception e => ThreadId -> e -> IO ()
319 throwTo (ThreadId tid) ex = IO $ \ s ->
320 case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
322 -- | Returns the 'ThreadId' of the calling thread (GHC only).
323 myThreadId :: IO ThreadId
324 myThreadId = IO $ \s ->
325 case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
328 -- |The 'yield' action allows (forces, in a co-operative multitasking
329 -- implementation) a context-switch to any other currently runnable
330 -- threads (if any), and is occasionally useful when implementing
331 -- concurrency abstractions.
334 case (yield# s) of s1 -> (# s1, () #)
336 {- | 'labelThread' stores a string as identifier for this thread if
337 you built a RTS with debugging support. This identifier will be used in
338 the debugging output to make distinction of different threads easier
339 (otherwise you only have the thread state object\'s address in the heap).
341 Other applications like the graphical Concurrent Haskell Debugger
342 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
343 'labelThread' for their purposes as well.
346 labelThread :: ThreadId -> String -> IO ()
347 labelThread (ThreadId t) str = IO $ \ s ->
348 let !ps = packCString# str
349 !adr = byteArrayContents# ps in
350 case (labelThread# t adr s) of s1 -> (# s1, () #)
352 -- Nota Bene: 'pseq' used to be 'seq'
353 -- but 'seq' is now defined in PrelGHC
355 -- "pseq" is defined a bit weirdly (see below)
357 -- The reason for the strange "lazy" call is that
358 -- it fools the compiler into thinking that pseq and par are non-strict in
359 -- their second argument (even if it inlines pseq at the call site).
360 -- If it thinks pseq is strict in "y", then it often evaluates
361 -- "y" before "x", which is totally wrong.
365 pseq x y = x `seq` lazy y
369 par x y = case (par# x) of { _ -> lazy y }
371 -- | Internal function used by the RTS to run sparks.
374 where loop s = case getSpark# s of
376 if n ==# 0# then (# s', () #)
381 -- ^blocked on on 'MVar'
383 -- ^blocked on a computation in progress by another thread
385 -- ^blocked in 'throwTo'
387 -- ^blocked in 'retry' in an STM transaction
388 | BlockedOnForeignCall
389 -- ^currently in a foreign call
391 -- ^blocked on some other resource. Without @-threaded@,
392 -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
393 -- they show up as 'BlockedOnMVar'.
394 deriving (Eq,Ord,Show)
396 -- | The current status of a thread
399 -- ^the thread is currently runnable or running
401 -- ^the thread has finished
402 | ThreadBlocked BlockReason
403 -- ^the thread is blocked on some resource
405 -- ^the thread received an uncaught exception
406 deriving (Eq,Ord,Show)
408 threadStatus :: ThreadId -> IO ThreadStatus
409 threadStatus (ThreadId t) = IO $ \s ->
410 case threadStatus# t s of
411 (# s', stat #) -> (# s', mk_stat (I# stat) #)
413 -- NB. keep these in sync with includes/Constants.h
414 mk_stat 0 = ThreadRunning
415 mk_stat 1 = ThreadBlocked BlockedOnMVar
416 mk_stat 2 = ThreadBlocked BlockedOnBlackHole
417 mk_stat 3 = ThreadBlocked BlockedOnException
418 mk_stat 7 = ThreadBlocked BlockedOnSTM
419 mk_stat 11 = ThreadBlocked BlockedOnForeignCall
420 mk_stat 12 = ThreadBlocked BlockedOnForeignCall
421 mk_stat 16 = ThreadFinished
422 mk_stat 17 = ThreadDied
423 mk_stat _ = ThreadBlocked BlockedOnOther
427 %************************************************************************
429 \subsection[stm]{Transactional heap operations}
431 %************************************************************************
433 TVars are shared memory locations which support atomic memory
437 -- |A monad supporting atomic memory transactions.
438 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
440 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
443 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
445 instance Functor STM where
446 fmap f x = x >>= (return . f)
448 instance Monad STM where
449 {-# INLINE return #-}
453 return x = returnSTM x
454 m >>= k = bindSTM m k
456 bindSTM :: STM a -> (a -> STM b) -> STM b
457 bindSTM (STM m) k = STM ( \s ->
459 (# new_s, a #) -> unSTM (k a) new_s
462 thenSTM :: STM a -> STM b -> STM b
463 thenSTM (STM m) k = STM ( \s ->
465 (# new_s, _ #) -> unSTM k new_s
468 returnSTM :: a -> STM a
469 returnSTM x = STM (\s -> (# s, x #))
471 -- | Unsafely performs IO in the STM monad. Beware: this is a highly
472 -- dangerous thing to do.
474 -- * The STM implementation will often run transactions multiple
475 -- times, so you need to be prepared for this if your IO has any
478 -- * The STM implementation will abort transactions that are known to
479 -- be invalid and need to be restarted. This may happen in the middle
480 -- of `unsafeIOToSTM`, so make sure you don't acquire any resources
481 -- that need releasing (exception handlers are ignored when aborting
482 -- the transaction). That includes doing any IO using Handles, for
483 -- example. Getting this wrong will probably lead to random deadlocks.
485 -- * The transaction may have seen an inconsistent view of memory when
486 -- the IO runs. Invariants that you expect to be true throughout
487 -- your program may not be true inside a transaction, due to the
488 -- way transactions are implemented. Normally this wouldn't be visible
489 -- to the programmer, but using `unsafeIOToSTM` can expose it.
491 unsafeIOToSTM :: IO a -> STM a
492 unsafeIOToSTM (IO m) = STM m
494 -- |Perform a series of STM actions atomically.
496 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
497 -- Any attempt to do so will result in a runtime error. (Reason: allowing
498 -- this would effectively allow a transaction inside a transaction, depending
499 -- on exactly when the thunk is evaluated.)
501 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
502 -- and which allows top-level TVars to be allocated.
504 atomically :: STM a -> IO a
505 atomically (STM m) = IO (\s -> (atomically# m) s )
507 -- |Retry execution of the current memory transaction because it has seen
508 -- values in TVars which mean that it should not continue (e.g. the TVars
509 -- represent a shared buffer that is now empty). The implementation may
510 -- block the thread until one of the TVars that it has read from has been
511 -- udpated. (GHC only)
513 retry = STM $ \s# -> retry# s#
515 -- |Compose two alternative STM actions (GHC only). If the first action
516 -- completes without retrying then it forms the result of the orElse.
517 -- Otherwise, if the first action retries, then the second action is
518 -- tried in its place. If both actions retry then the orElse as a
520 orElse :: STM a -> STM a -> STM a
521 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
523 -- |Exception handling within STM actions.
524 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
525 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
527 -- | Low-level primitive on which always and alwaysSucceeds are built.
528 -- checkInv differs form these in that (i) the invariant is not
529 -- checked when checkInv is called, only at the end of this and
530 -- subsequent transcations, (ii) the invariant failure is indicated
531 -- by raising an exception.
532 checkInv :: STM a -> STM ()
533 checkInv (STM m) = STM (\s -> (check# m) s)
535 -- | alwaysSucceeds adds a new invariant that must be true when passed
536 -- to alwaysSucceeds, at the end of the current transaction, and at
537 -- the end of every subsequent transaction. If it fails at any
538 -- of those points then the transaction violating it is aborted
539 -- and the exception raised by the invariant is propagated.
540 alwaysSucceeds :: STM a -> STM ()
541 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () )
544 -- | always is a variant of alwaysSucceeds in which the invariant is
545 -- expressed as an STM Bool action that must return True. Returning
546 -- False or raising an exception are both treated as invariant failures.
547 always :: STM Bool -> STM ()
548 always i = alwaysSucceeds ( do v <- i
549 if (v) then return () else ( error "Transacional invariant violation" ) )
551 -- |Shared memory locations that support atomic memory transactions.
552 data TVar a = TVar (TVar# RealWorld a)
554 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
556 instance Eq (TVar a) where
557 (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
559 -- |Create a new TVar holding a value supplied
560 newTVar :: a -> STM (TVar a)
561 newTVar val = STM $ \s1# ->
562 case newTVar# val s1# of
563 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
565 -- |@IO@ version of 'newTVar'. This is useful for creating top-level
566 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
567 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
569 newTVarIO :: a -> IO (TVar a)
570 newTVarIO val = IO $ \s1# ->
571 case newTVar# val s1# of
572 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
574 -- |Return the current value stored in a TVar.
575 -- This is equivalent to
577 -- > readTVarIO = atomically . readTVar
579 -- but works much faster, because it doesn't perform a complete
580 -- transaction, it just reads the current value of the 'TVar'.
581 readTVarIO :: TVar a -> IO a
582 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
584 -- |Return the current value stored in a TVar
585 readTVar :: TVar a -> STM a
586 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
588 -- |Write the supplied value into a TVar
589 writeTVar :: TVar a -> a -> STM ()
590 writeTVar (TVar tvar#) val = STM $ \s1# ->
591 case writeTVar# tvar# val s1# of
599 withMVar :: MVar a -> (a -> IO b) -> IO b
603 b <- catchAny (unblock (io a))
604 (\e -> do putMVar m a; throw e)
609 %************************************************************************
611 \subsection{Thread waiting}
613 %************************************************************************
616 #ifdef mingw32_HOST_OS
618 -- Note: threadWaitRead and threadWaitWrite aren't really functional
619 -- on Win32, but left in there because lib code (still) uses them (the manner
620 -- in which they're used doesn't cause problems on a Win32 platform though.)
622 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
623 asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) =
624 IO $ \s -> case asyncRead# fd isSock len buf s of
625 (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
627 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
628 asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) =
629 IO $ \s -> case asyncWrite# fd isSock len buf s of
630 (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
632 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
633 asyncDoProc (FunPtr proc) (Ptr param) =
634 -- the 'length' value is ignored; simplifies implementation of
635 -- the async*# primops to have them all return the same result.
636 IO $ \s -> case asyncDoProc# proc param s of
637 (# s', _len#, err# #) -> (# s', I# err# #)
639 -- to aid the use of these primops by the IO Handle implementation,
640 -- provide the following convenience funs:
642 -- this better be a pinned byte array!
643 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
644 asyncReadBA fd isSock len off bufB =
645 asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
647 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
648 asyncWriteBA fd isSock len off bufB =
649 asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
653 -- -----------------------------------------------------------------------------
656 -- | Block the current thread until data is available to read on the
657 -- given file descriptor (GHC only).
658 threadWaitRead :: Fd -> IO ()
660 #ifndef mingw32_HOST_OS
661 | threaded = waitForReadEvent fd
663 | otherwise = IO $ \s ->
664 case fromIntegral fd of { I# fd# ->
665 case waitRead# fd# s of { s' -> (# s', () #)
668 -- | Block the current thread until data can be written to the
669 -- given file descriptor (GHC only).
670 threadWaitWrite :: Fd -> IO ()
672 #ifndef mingw32_HOST_OS
673 | threaded = waitForWriteEvent fd
675 | otherwise = IO $ \s ->
676 case fromIntegral fd of { I# fd# ->
677 case waitWrite# fd# s of { s' -> (# s', () #)
680 -- | Suspends the current thread for a given number of microseconds
683 -- There is no guarantee that the thread will be rescheduled promptly
684 -- when the delay has expired, but the thread will never continue to
685 -- run /earlier/ than specified.
687 threadDelay :: Int -> IO ()
689 | threaded = waitForDelayEvent time
690 | otherwise = IO $ \s ->
691 case fromIntegral time of { I# time# ->
692 case delay# time# s of { s' -> (# s', () #)
696 -- | Set the value of returned TVar to True after a given number of
697 -- microseconds. The caveats associated with threadDelay also apply.
699 registerDelay :: Int -> IO (TVar Bool)
701 | threaded = waitForDelayEventSTM usecs
702 | otherwise = error "registerDelay: requires -threaded"
704 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
706 waitForDelayEvent :: Int -> IO ()
707 waitForDelayEvent usecs = do
709 target <- calculateTarget usecs
710 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
714 -- Delays for use in STM
715 waitForDelayEventSTM :: Int -> IO (TVar Bool)
716 waitForDelayEventSTM usecs = do
717 t <- atomically $ newTVar False
718 target <- calculateTarget usecs
719 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
723 calculateTarget :: Int -> IO USecs
724 calculateTarget usecs = do
726 return $ now + (fromIntegral usecs)
729 -- ----------------------------------------------------------------------------
730 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
732 -- In the threaded RTS, we employ a single IO Manager thread to wait
733 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
734 -- and delays (threadDelay).
736 -- We can do this because in the threaded RTS the IO Manager can make
737 -- a non-blocking call to select(), so we don't have to do select() in
738 -- the scheduler as we have to in the non-threaded RTS. We get performance
739 -- benefits from doing it this way, because we only have to restart the select()
740 -- when a new request arrives, rather than doing one select() each time
741 -- around the scheduler loop. Furthermore, the scheduler can be simplified
742 -- by not having to check for completed IO requests.
744 -- Issues, possible problems:
746 -- - we might want bound threads to just do the blocking
747 -- operation rather than communicating with the IO manager
748 -- thread. This would prevent simgle-threaded programs which do
749 -- IO from requiring multiple OS threads. However, it would also
750 -- prevent bound threads waiting on IO from being killed or sent
753 -- - Apprently exec() doesn't work on Linux in a multithreaded program.
754 -- I couldn't repeat this.
756 -- - How do we handle signal delivery in the multithreaded RTS?
758 -- - forkProcess will kill the IO manager thread. Let's just
759 -- hope we don't need to do any blocking IO between fork & exec.
761 #ifndef mingw32_HOST_OS
763 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
764 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
768 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
769 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
771 #ifndef mingw32_HOST_OS
772 pendingEvents :: IORef [IOReq]
774 pendingDelays :: IORef [DelayReq]
775 -- could use a strict list or array here
776 {-# NOINLINE pendingEvents #-}
777 {-# NOINLINE pendingDelays #-}
778 (pendingEvents,pendingDelays) = unsafePerformIO $ do
783 -- the first time we schedule an IO request, the service thread
784 -- will be created (cool, huh?)
786 ensureIOManagerIsRunning :: IO ()
787 ensureIOManagerIsRunning
788 | threaded = seq pendingEvents $ return ()
789 | otherwise = return ()
791 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
792 insertDelay d [] = [d]
793 insertDelay d1 ds@(d2 : rest)
794 | delayTime d1 <= delayTime d2 = d1 : ds
795 | otherwise = d2 : insertDelay d1 rest
797 delayTime :: DelayReq -> USecs
798 delayTime (Delay t _) = t
799 delayTime (DelaySTM t _) = t
803 foreign import ccall unsafe "getUSecOfDay"
804 getUSecOfDay :: IO USecs
806 prodding :: IORef Bool
807 {-# NOINLINE prodding #-}
808 prodding = unsafePerformIO (newIORef False)
810 prodServiceThread :: IO ()
811 prodServiceThread = do
812 was_set <- atomicModifyIORef prodding (\a -> (True,a))
813 if (not (was_set)) then wakeupIOManager else return ()
815 #ifdef mingw32_HOST_OS
816 -- ----------------------------------------------------------------------------
817 -- Windows IO manager thread
819 startIOManagerThread :: IO ()
820 startIOManagerThread = do
821 wakeup <- c_getIOManagerEvent
822 forkIO $ service_loop wakeup []
825 service_loop :: HANDLE -- read end of pipe
826 -> [DelayReq] -- current delay requests
829 service_loop wakeup old_delays = do
830 -- pick up new delay requests
831 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
832 let delays = foldr insertDelay old_delays new_delays
835 (delays', timeout) <- getDelay now delays
837 r <- c_WaitForSingleObject wakeup timeout
839 0xffffffff -> do c_maperrno; throwErrno "service_loop"
841 r2 <- c_readIOManagerEvent
844 _ | r2 == io_MANAGER_WAKEUP -> return False
845 _ | r2 == io_MANAGER_DIE -> return True
846 0 -> return False -- spurious wakeup
847 _ -> do start_console_handler (r2 `shiftR` 1); return False
850 else service_cont wakeup delays'
852 _other -> service_cont wakeup delays' -- probably timeout
854 service_cont :: HANDLE -> [DelayReq] -> IO ()
855 service_cont wakeup delays = do
856 r <- atomicModifyIORef prodding (\_ -> (False,False))
857 r `seq` return () -- avoid space leak
858 service_loop wakeup delays
860 -- must agree with rts/win32/ThrIOManager.c
861 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
862 io_MANAGER_WAKEUP = 0xffffffff
863 io_MANAGER_DIE = 0xfffffffe
869 -- these are sent to Services only.
872 deriving (Eq, Ord, Enum, Show, Read, Typeable)
874 start_console_handler :: Word32 -> IO ()
875 start_console_handler r =
876 case toWin32ConsoleEvent r of
877 Just x -> withMVar win32ConsoleHandler $ \handler -> do
882 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
883 toWin32ConsoleEvent ev =
885 0 {- CTRL_C_EVENT-} -> Just ControlC
886 1 {- CTRL_BREAK_EVENT-} -> Just Break
887 2 {- CTRL_CLOSE_EVENT-} -> Just Close
888 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
889 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
892 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
893 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
895 -- XXX Is this actually needed?
896 stick :: IORef HANDLE
897 {-# NOINLINE stick #-}
898 stick = unsafePerformIO (newIORef nullPtr)
900 wakeupIOManager :: IO ()
902 _hdl <- readIORef stick
903 c_sendIOManagerEvent io_MANAGER_WAKEUP
905 -- Walk the queue of pending delays, waking up any that have passed
906 -- and return the smallest delay to wait for. The queue of pending
907 -- delays is kept ordered.
908 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
909 getDelay _ [] = return ([], iNFINITE)
910 getDelay now all@(d : rest)
912 Delay time m | now >= time -> do
915 DelaySTM time t | now >= time -> do
916 atomically $ writeTVar t True
919 -- delay is in millisecs for WaitForSingleObject
920 let micro_seconds = delayTime d - now
921 milli_seconds = (micro_seconds + 999) `div` 1000
922 in return (all, fromIntegral milli_seconds)
924 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
925 -- available yet. We should move some Win32 functionality down here,
926 -- maybe as part of the grand reorganisation of the base package...
931 iNFINITE = 0xFFFFFFFF -- urgh
933 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
934 c_getIOManagerEvent :: IO HANDLE
936 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
937 c_readIOManagerEvent :: IO Word32
939 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
940 c_sendIOManagerEvent :: Word32 -> IO ()
942 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
945 foreign import stdcall "WaitForSingleObject"
946 c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
949 -- ----------------------------------------------------------------------------
950 -- Unix IO manager thread, using select()
952 startIOManagerThread :: IO ()
953 startIOManagerThread = do
954 allocaArray 2 $ \fds -> do
955 throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
956 rd_end <- peekElemOff fds 0
957 wr_end <- peekElemOff fds 1
958 setNonBlockingFD wr_end True -- writes happen in a signal handler, we
959 -- don't want them to block.
960 setCloseOnExec rd_end
961 setCloseOnExec wr_end
962 writeIORef stick (fromIntegral wr_end)
963 c_setIOManagerPipe wr_end
965 allocaBytes sizeofFdSet $ \readfds -> do
966 allocaBytes sizeofFdSet $ \writefds -> do
967 allocaBytes sizeofTimeVal $ \timeval -> do
968 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
972 :: Fd -- listen to this for wakeup calls
979 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
981 -- pick up new IO requests
982 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
983 let reqs = new_reqs ++ old_reqs
985 -- pick up new delay requests
986 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
987 let delays0 = foldr insertDelay old_delays new_delays
989 -- build the FDSets for select()
993 maxfd <- buildFdSets 0 readfds writefds reqs
995 -- perform the select()
996 let do_select delays = do
997 -- check the current time and wake up any thread in
998 -- threadDelay whose timeout has expired. Also find the
999 -- timeout value for the select() call.
1001 (delays', timeout) <- getDelay now ptimeval delays
1003 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
1009 _ | err == eINTR -> do_select delays'
1010 -- EINTR: just redo the select()
1011 _ | err == eBADF -> return (True, delays)
1012 -- EBADF: one of the file descriptors is closed or bad,
1013 -- we don't know which one, so wake everyone up.
1014 _ | otherwise -> throwErrno "select"
1015 -- otherwise (ENOMEM or EINVAL) something has gone
1016 -- wrong; report the error.
1018 return (False,delays')
1020 (wakeup_all,delays') <- do_select delays0
1023 if wakeup_all then return False
1025 b <- fdIsSet wakeup readfds
1028 else alloca $ \p -> do
1029 c_read (fromIntegral wakeup) p 1
1032 _ | s == io_MANAGER_WAKEUP -> return False
1033 _ | s == io_MANAGER_DIE -> return True
1034 _ | s == io_MANAGER_SYNC -> do
1035 mvars <- readIORef sync
1036 mapM_ (flip putMVar ()) mvars
1039 fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1040 withForeignPtr fp $ \p_siginfo -> do
1041 r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1043 when (r /= fromIntegral sizeof_siginfo_t) $
1044 error "failed to read siginfo_t"
1045 runHandlers' fp (fromIntegral s)
1048 if exit then return () else do
1050 atomicModifyIORef prodding (\_ -> (False,False))
1052 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1053 else completeRequests reqs readfds writefds []
1055 service_loop wakeup readfds writefds ptimeval reqs' delays'
1057 io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8
1058 io_MANAGER_WAKEUP = 0xff
1059 io_MANAGER_DIE = 0xfe
1060 io_MANAGER_SYNC = 0xfd
1062 -- | the stick is for poking the IO manager with
1064 {-# NOINLINE stick #-}
1065 stick = unsafePerformIO (newIORef 0)
1067 {-# NOINLINE sync #-}
1068 sync :: IORef [MVar ()]
1069 sync = unsafePerformIO (newIORef [])
1071 -- waits for the IO manager to drain the pipe
1072 syncIOManager :: IO ()
1075 atomicModifyIORef sync (\old -> (m:old,()))
1076 fd <- readIORef stick
1077 with io_MANAGER_SYNC $ \pbuf -> do
1078 c_write (fromIntegral fd) pbuf 1; return ()
1081 wakeupIOManager :: IO ()
1082 wakeupIOManager = do
1083 fd <- readIORef stick
1084 with io_MANAGER_WAKEUP $ \pbuf -> do
1085 c_write (fromIntegral fd) pbuf 1; return ()
1087 -- For the non-threaded RTS
1088 runHandlers :: Ptr Word8 -> Int -> IO ()
1089 runHandlers p_info sig = do
1090 fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1091 withForeignPtr fp $ \p -> do
1092 copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1094 runHandlers' fp (fromIntegral sig)
1096 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1097 runHandlers' p_info sig = do
1098 let int = fromIntegral sig
1099 withMVar signal_handlers $ \arr ->
1100 if not (inRange (boundsIOArray arr) int)
1102 else do handler <- unsafeReadIOArray arr int
1104 Nothing -> return ()
1105 Just (f,_) -> do forkIO (f p_info); return ()
1107 foreign import ccall "setIOManagerPipe"
1108 c_setIOManagerPipe :: CInt -> IO ()
1110 foreign import ccall "__hscore_sizeof_siginfo_t"
1111 sizeof_siginfo_t :: CSize
1117 type HandlerFun = ForeignPtr Word8 -> IO ()
1119 -- Lock used to protect concurrent access to signal_handlers. Symptom of
1120 -- this race condition is #1922, although that bug was on Windows a similar
1121 -- bug also exists on Unix.
1122 {-# NOINLINE signal_handlers #-}
1123 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1124 signal_handlers = unsafePerformIO $ do
1125 arr <- newIOArray (0,maxSig) Nothing
1128 stable_ref <- newStablePtr m
1129 let ref = castStablePtrToPtr stable_ref
1130 ref2 <- getOrSetSignalHandlerStore ref
1133 else do freeStablePtr stable_ref
1134 deRefStablePtr (castPtrToStablePtr ref2)
1136 foreign import ccall unsafe "getOrSetSignalHandlerStore"
1137 getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a)
1139 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1140 setHandler sig handler = do
1141 let int = fromIntegral sig
1142 withMVar signal_handlers $ \arr ->
1143 if not (inRange (boundsIOArray arr) int)
1144 then error "GHC.Conc.setHandler: signal out of range"
1145 else do old <- unsafeReadIOArray arr int
1146 unsafeWriteIOArray arr int handler
1149 -- -----------------------------------------------------------------------------
1152 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1153 buildFdSets maxfd _ _ [] = return maxfd
1154 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1155 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1158 buildFdSets (max maxfd fd) readfds writefds reqs
1159 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1160 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1163 buildFdSets (max maxfd fd) readfds writefds reqs
1165 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1167 completeRequests [] _ _ reqs' = return reqs'
1168 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1169 b <- fdIsSet fd readfds
1171 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1172 else completeRequests reqs readfds writefds (Read fd m : reqs')
1173 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1174 b <- fdIsSet fd writefds
1176 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1177 else completeRequests reqs readfds writefds (Write fd m : reqs')
1179 wakeupAll :: [IOReq] -> IO ()
1180 wakeupAll [] = return ()
1181 wakeupAll (Read _ m : reqs) = do putMVar m (); wakeupAll reqs
1182 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1184 waitForReadEvent :: Fd -> IO ()
1185 waitForReadEvent fd = do
1187 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1191 waitForWriteEvent :: Fd -> IO ()
1192 waitForWriteEvent fd = do
1194 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1198 -- -----------------------------------------------------------------------------
1201 -- Walk the queue of pending delays, waking up any that have passed
1202 -- and return the smallest delay to wait for. The queue of pending
1203 -- delays is kept ordered.
1204 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1205 getDelay _ _ [] = return ([],nullPtr)
1206 getDelay now ptimeval all@(d : rest)
1208 Delay time m | now >= time -> do
1210 getDelay now ptimeval rest
1211 DelaySTM time t | now >= time -> do
1212 atomically $ writeTVar t True
1213 getDelay now ptimeval rest
1215 setTimevalTicks ptimeval (delayTime d - now)
1216 return (all,ptimeval)
1220 foreign import ccall unsafe "sizeofTimeVal"
1221 sizeofTimeVal :: Int
1223 foreign import ccall unsafe "setTimevalTicks"
1224 setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1227 On Win32 we're going to have a single Pipe, and a
1228 waitForSingleObject with the delay time. For signals, we send a
1229 byte down the pipe just like on Unix.
1232 -- ----------------------------------------------------------------------------
1233 -- select() interface
1235 -- ToDo: move to System.Posix.Internals?
1239 foreign import ccall safe "select"
1240 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1243 foreign import ccall unsafe "hsFD_SETSIZE"
1244 c_fD_SETSIZE :: CInt
1247 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1249 foreign import ccall unsafe "hsFD_ISSET"
1250 c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1252 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1253 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1255 foreign import ccall unsafe "hsFD_SET"
1256 c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1258 fdSet :: Fd -> Ptr CFdSet -> IO ()
1259 fdSet (Fd fd) fdset = c_fdSet fd fdset
1261 foreign import ccall unsafe "hsFD_ZERO"
1262 fdZero :: Ptr CFdSet -> IO ()
1264 foreign import ccall unsafe "sizeof_fd_set"
1269 reportStackOverflow :: IO a
1270 reportStackOverflow = do callStackOverflowHook; return undefined
1272 reportError :: SomeException -> IO a
1274 handler <- getUncaughtExceptionHandler
1278 -- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove
1279 -- the unsafe below.
1280 foreign import ccall unsafe "stackOverflow"
1281 callStackOverflowHook :: IO ()
1283 {-# NOINLINE uncaughtExceptionHandler #-}
1284 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1285 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1287 defaultHandler :: SomeException -> IO ()
1288 defaultHandler se@(SomeException ex) = do
1289 (hFlush stdout) `catchAny` (\ _ -> return ())
1290 let msg = case cast ex of
1291 Just Deadlock -> "no threads to run: infinite loop or deadlock?"
1292 _ -> case cast ex of
1293 Just (ErrorCall s) -> s
1294 _ -> showsPrec 0 se ""
1295 withCString "%s" $ \cfmt ->
1296 withCString msg $ \cmsg ->
1297 errorBelch cfmt cmsg
1299 -- don't use errorBelch() directly, because we cannot call varargs functions
1301 foreign import ccall unsafe "HsBase.h errorBelch2"
1302 errorBelch :: CString -> CString -> IO ()
1304 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1305 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1307 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1308 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler