2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
8 -- Copyright : (c) The University of Glasgow, 1994-2002
9 -- License : see libraries/base/LICENSE
11 -- Maintainer : cvs-ghc@haskell.org
12 -- Stability : internal
13 -- Portability : non-portable (GHC extensions)
15 -- Basic concurrency stuff.
17 -----------------------------------------------------------------------------
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home. Hence:
30 -- * Forking and suchlike
, forkIO -- :: IO () -> IO ThreadId
, forkOnIO -- :: Int -> IO () -> IO ThreadId
33 , numCapabilities -- :: Int
, childHandler -- :: SomeException -> IO ()
35 , myThreadId -- :: IO ThreadId
36 , killThread -- :: ThreadId -> IO ()
, throwTo -- :: Exception e => ThreadId -> e -> IO ()
38 , par -- :: a -> b -> b
39 , pseq -- :: a -> b -> b
41 , labelThread -- :: ThreadId -> String -> IO ()
43 , ThreadStatus(..), BlockReason(..)
44 , threadStatus -- :: ThreadId -> IO ThreadStatus
47 , threadDelay -- :: Int -> IO ()
48 , registerDelay -- :: Int -> IO (TVar Bool)
, threadWaitRead -- :: Fd -> IO ()
, threadWaitWrite -- :: Fd -> IO ()
54 , newMVar -- :: a -> IO (MVar a)
55 , newEmptyMVar -- :: IO (MVar a)
56 , takeMVar -- :: MVar a -> IO a
57 , putMVar -- :: MVar a -> a -> IO ()
58 , tryTakeMVar -- :: MVar a -> IO (Maybe a)
59 , tryPutMVar -- :: MVar a -> a -> IO Bool
60 , isEmptyMVar -- :: MVar a -> IO Bool
61 , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
65 , atomically -- :: STM a -> IO a
67 , orElse -- :: STM a -> STM a -> STM a
, catchSTM -- :: STM a -> (SomeException -> STM a) -> STM a
69 , alwaysSucceeds -- :: STM a -> STM ()
70 , always -- :: STM Bool -> STM ()
72 , newTVar -- :: a -> STM (TVar a)
, newTVarIO -- :: a -> IO (TVar a)
74 , readTVar -- :: TVar a -> STM a
, writeTVar -- :: TVar a -> a -> STM ()
76 , unsafeIOToSTM -- :: IO a -> STM a
79 #ifdef mingw32_HOST_OS
80 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
81 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
82 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
84 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
85 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
88 #ifndef mingw32_HOST_OS
92 , ensureIOManagerIsRunning
94 #ifdef mingw32_HOST_OS
99 , setUncaughtExceptionHandler -- :: (Exception -> IO ()) -> IO ()
100 , getUncaughtExceptionHandler -- :: IO (Exception -> IO ())
102 , reportError, reportStackOverflow
105 import System.Posix.Types
106 #ifndef mingw32_HOST_OS
107 import System.Posix.Internals
115 import {-# SOURCE #-} GHC.Handle
117 import GHC.Num ( Num(..) )
118 import GHC.Real ( fromIntegral )
119 #ifdef mingw32_HOST_OS
120 import GHC.Real ( div )
121 import GHC.Ptr ( plusPtr, FunPtr(..) )
123 #ifdef mingw32_HOST_OS
124 import GHC.Read ( Read )
125 import GHC.Enum ( Enum )
127 import GHC.Exception ( SomeException(..), throw )
128 import GHC.Pack ( packCString# )
129 import GHC.Ptr ( Ptr(..) )
131 import GHC.Show ( Show(..), showString )
135 infixr 0 `par`, `pseq`
138 %************************************************************************
140 \subsection{@ThreadId@, @par@, and @fork@}
142 %************************************************************************
145 data ThreadId = ThreadId ThreadId# deriving( Typeable )
146 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
147 -- But since ThreadId# is unlifted, the Weak type must use open
150 A 'ThreadId' is an abstract type representing a handle to a thread.
151 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
152 the 'Ord' instance implements an arbitrary total ordering over
153 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
154 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
155 useful when debugging or diagnosing the behaviour of a concurrent
158 /Note/: in GHC, if you have a 'ThreadId', you essentially have
159 a pointer to the thread itself. This means the thread itself can\'t be
160 garbage collected until you drop the 'ThreadId'.
161 This misfeature will hopefully be corrected at a later date.
163 /Note/: Hugs does not provide any operations on other threads;
164 it defines 'ThreadId' as a synonym for ().
167 instance Show ThreadId where
169 showString "ThreadId " .
170 showsPrec d (getThreadId (id2TSO t))
172 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
-- Unwrap a 'ThreadId' to expose the primitive 'ThreadId#' it carries.
id2TSO :: ThreadId -> ThreadId#
id2TSO wrapped = case wrapped of
  ThreadId raw -> raw
177 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
180 cmpThread :: ThreadId -> ThreadId -> Ordering
182 case cmp_thread (id2TSO t1) (id2TSO t2) of
187 instance Eq ThreadId where
189 case t1 `cmpThread` t2 of
193 instance Ord ThreadId where
197 Sparks off a new thread to run the 'IO' computation passed as the
198 first argument, and returns the 'ThreadId' of the newly created
201 The new thread will be a lightweight thread; if you want to use a foreign
202 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
204 GHC note: the new thread inherits the /blocked/ state of the parent
205 (see 'Control.Exception.block').
207 forkIO :: IO () -> IO ThreadId
208 forkIO action = IO $ \ s ->
209 case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
211 action_plus = catchException action childHandler
214 Like 'forkIO', but lets you specify on which CPU the thread is
215 created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
216 will stay on the same CPU for its entire lifetime (`forkIO` threads
217 can migrate between CPUs according to the scheduling policy).
218 `forkOnIO` is useful for overriding the scheduling policy when you
219 know in advance how best to distribute the threads.
221 The `Int` argument specifies the CPU number; it is interpreted modulo
222 'numCapabilities' (note that it actually specifies a capability number
223 rather than a CPU number, but to a first approximation the two are
226 forkOnIO :: Int -> IO () -> IO ThreadId
227 forkOnIO (I# cpu) action = IO $ \ s ->
228 case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
230 action_plus = catchException action childHandler
-- | The value passed to the @+RTS -N@ flag.  This is the number of
-- Haskell threads that can run truly simultaneously at any given
-- time, and is typically set to the number of physical CPU cores on
-- the machine.
numCapabilities :: Int
numCapabilities = unsafePerformIO (peek n_capabilities >>= convert)
  where
    -- 'n_capabilities' points at a 'CInt'; convert what was read to 'Int'.
    convert count = return (fromIntegral count)
241 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
-- Exception handler wrapped around the actions started by 'forkIO' and
-- 'forkOnIO'.  The exception is passed to 'real_handler'; should that
-- itself throw, the new exception is handed to 'childHandler' again.
childHandler :: SomeException -> IO ()
childHandler exn = real_handler exn `catchException` childHandler
246 real_handler :: SomeException -> IO ()
247 real_handler se@(SomeException ex) =
248 -- ignore thread GC and killThread exceptions:
250 Just BlockedOnDeadMVar -> return ()
252 Just BlockedIndefinitely -> return ()
254 Just ThreadKilled -> return ()
256 -- report all others:
257 Just StackOverflow -> reportStackOverflow
-- | 'killThread' terminates the given thread (GHC only).
-- Any work already done by the thread isn't lost: the computation is
-- suspended until required by another thread.  The memory used by the
-- thread will be garbage collected if it isn't referenced from anywhere.
--
-- 'killThread' is defined in terms of 'throwTo':
--
-- > killThread tid = throwTo tid ThreadKilled
--
-- 'killThread' is a no-op if the target thread has already completed.
killThread :: ThreadId -> IO ()
killThread victim = victim `throwTo` ThreadKilled
274 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
276 'throwTo' does not return until the exception has been raised in the
278 The calling thread can thus be certain that the target
279 thread has received the exception. This is a useful property to know
280 when dealing with race conditions: eg. if there are two threads that
281 can kill each other, it is guaranteed that only one of the threads
282 will get to kill the other.
284 If the target thread is currently making a foreign call, then the
285 exception will not be raised (and hence 'throwTo' will not return)
286 until the call has completed. This is the case regardless of whether
287 the call is inside a 'block' or not.
289 Important note: the behaviour of 'throwTo' differs from that described in
290 the paper \"Asynchronous exceptions in Haskell\"
291 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
292 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
293 a more synchronous design in which 'throwTo' does not return until the exception
294 is received by the target thread. The trade-off is discussed in Section 8 of the paper.
295 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
298 There is currently no guarantee that the exception delivered by 'throwTo' will be
delivered at the first possible opportunity. In particular, a thread may
unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
a pending 'throwTo'. This is arguably undesirable behaviour.
throwTo :: Exception e => ThreadId -> e -> IO ()
throwTo (ThreadId target) exn = IO deliver
  where
    -- Wrap the exception with 'toException' and hand it to the
    -- 'killThread#' primop, which returns only a new state token.
    deliver st = case killThread# target (toException exn) st of
      st' -> (# st', () #)
-- | Returns the 'ThreadId' of the calling thread (GHC only).
myThreadId :: IO ThreadId
myThreadId = IO wrap
  where
    -- Box the raw 'ThreadId#' produced by the 'myThreadId#' primop.
    wrap st = case myThreadId# st of
      (# st', raw #) -> (# st', ThreadId raw #)
314 -- |The 'yield' action allows (forces, in a co-operative multitasking
315 -- implementation) a context-switch to any other currently runnable
316 -- threads (if any), and is occasionally useful when implementing
317 -- concurrency abstractions.
320 case (yield# s) of s1 -> (# s1, () #)
322 {- | 'labelThread' stores a string as identifier for this thread if
323 you built a RTS with debugging support. This identifier will be used in
324 the debugging output to make distinction of different threads easier
325 (otherwise you only have the thread state object\'s address in the heap).
327 Other applications like the graphical Concurrent Haskell Debugger
328 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
329 'labelThread' for their purposes as well.
labelThread :: ThreadId -> String -> IO ()
labelThread (ThreadId tso) label = IO $ \st ->
  -- Pack the label with 'packCString#', then pass the address of the
  -- packed contents to the 'labelThread#' primop.
  case packCString# label of
    packed -> case labelThread# tso (byteArrayContents# packed) st of
      st' -> (# st', () #)
338 -- Nota Bene: 'pseq' used to be 'seq'
339 -- but 'seq' is now defined in PrelGHC
341 -- "pseq" is defined a bit weirdly (see below)
343 -- The reason for the strange "lazy" call is that
344 -- it fools the compiler into thinking that pseq and par are non-strict in
345 -- their second argument (even if it inlines pseq at the call site).
346 -- If it thinks pseq is strict in "y", then it often evaluates
347 -- "y" before "x", which is totally wrong.
351 pseq x y = x `seq` lazy y
355 par x y = case (par# x) of { _ -> lazy y }
-- ^blocked on an 'MVar'
362 -- ^blocked on a computation in progress by another thread
364 -- ^blocked in 'throwTo'
366 -- ^blocked in 'retry' in an STM transaction
367 | BlockedOnForeignCall
368 -- ^currently in a foreign call
370 -- ^blocked on some other resource. Without @-threaded@,
371 -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
372 -- they show up as 'BlockedOnMVar'.
373 deriving (Eq,Ord,Show)
375 -- | The current status of a thread
378 -- ^the thread is currently runnable or running
380 -- ^the thread has finished
381 | ThreadBlocked BlockReason
382 -- ^the thread is blocked on some resource
384 -- ^the thread received an uncaught exception
385 deriving (Eq,Ord,Show)
387 threadStatus :: ThreadId -> IO ThreadStatus
388 threadStatus (ThreadId t) = IO $ \s ->
389 case threadStatus# t s of
390 (# s', stat #) -> (# s', mk_stat (I# stat) #)
392 -- NB. keep these in sync with includes/Constants.h
393 mk_stat 0 = ThreadRunning
394 mk_stat 1 = ThreadBlocked BlockedOnMVar
395 mk_stat 2 = ThreadBlocked BlockedOnBlackHole
396 mk_stat 3 = ThreadBlocked BlockedOnException
397 mk_stat 7 = ThreadBlocked BlockedOnSTM
398 mk_stat 11 = ThreadBlocked BlockedOnForeignCall
399 mk_stat 12 = ThreadBlocked BlockedOnForeignCall
400 mk_stat 16 = ThreadFinished
401 mk_stat 17 = ThreadDied
402 mk_stat _ = ThreadBlocked BlockedOnOther
406 %************************************************************************
408 \subsection[stm]{Transactional heap operations}
410 %************************************************************************
412 TVars are shared memory locations which support atomic memory
416 -- |A monad supporting atomic memory transactions.
417 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
419 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
422 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
-- 'fmap' is expressed through the 'Monad' operations: run the
-- transaction, then return the transformed result.
instance Functor STM where
  fmap g txn = txn >>= \result -> return (g result)
427 instance Monad STM where
428 {-# INLINE return #-}
432 return x = returnSTM x
433 m >>= k = bindSTM m k
435 bindSTM :: STM a -> (a -> STM b) -> STM b
436 bindSTM (STM m) k = STM ( \s ->
438 (# new_s, a #) -> unSTM (k a) new_s
441 thenSTM :: STM a -> STM b -> STM b
442 thenSTM (STM m) k = STM ( \s ->
444 (# new_s, _ #) -> unSTM k new_s
-- Inject a pure value into 'STM', passing the state token through unchanged.
returnSTM :: a -> STM a
returnSTM val = STM $ \st -> (# st, val #)
450 -- | Unsafely performs IO in the STM monad. Beware: this is a highly
451 -- dangerous thing to do.
453 -- * The STM implementation will often run transactions multiple
454 -- times, so you need to be prepared for this if your IO has any
457 -- * The STM implementation will abort transactions that are known to
458 -- be invalid and need to be restarted. This may happen in the middle
459 -- of `unsafeIOToSTM`, so make sure you don't acquire any resources
460 -- that need releasing (exception handlers are ignored when aborting
461 -- the transaction). That includes doing any IO using Handles, for
462 -- example. Getting this wrong will probably lead to random deadlocks.
464 -- * The transaction may have seen an inconsistent view of memory when
465 -- the IO runs. Invariants that you expect to be true throughout
466 -- your program may not be true inside a transaction, due to the
467 -- way transactions are implemented. Normally this wouldn't be visible
468 -- to the programmer, but using `unsafeIOToSTM` can expose it.
unsafeIOToSTM :: IO a -> STM a
unsafeIOToSTM action = case action of
  IO run -> STM run   -- re-wrap the function carried by 'IO' as an 'STM'
473 -- |Perform a series of STM actions atomically.
475 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
476 -- Any attempt to do so will result in a runtime error. (Reason: allowing
477 -- this would effectively allow a transaction inside a transaction, depending
478 -- on exactly when the thunk is evaluated.)
480 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
481 -- and which allows top-level TVars to be allocated.
atomically :: STM a -> IO a
atomically (STM body) = IO $ \st -> atomically# body st
486 -- |Retry execution of the current memory transaction because it has seen
487 -- values in TVars which mean that it should not continue (e.g. the TVars
488 -- represent a shared buffer that is now empty). The implementation may
489 -- block the thread until one of the TVars that it has read from has been
-- updated. (GHC only)
492 retry = STM $ \s# -> retry# s#
494 -- |Compose two alternative STM actions (GHC only). If the first action
495 -- completes without retrying then it forms the result of the orElse.
496 -- Otherwise, if the first action retries, then the second action is
497 -- tried in its place. If both actions retry then the orElse as a
orElse :: STM a -> STM a -> STM a
orElse first second = STM $ \st ->
  -- Both alternatives are unwrapped and given to the 'catchRetry#' primop.
  catchRetry# (unSTM first) (unSTM second) st
-- | Exception handling within 'STM' actions.
catchSTM :: STM a -> (SomeException -> STM a) -> STM a
catchSTM (STM body) handler = STM $ \st -> catchSTM# body recover st
  where
    -- Apply the user's handler and unwrap the transaction it returns,
    -- which is the form the 'catchSTM#' primop takes.
    recover exn = unSTM (handler exn)
-- | Low-level primitive on which 'always' and 'alwaysSucceeds' are built.
-- 'checkInv' differs from these in that (i) the invariant is not
-- checked when 'checkInv' is called, only at the end of this and
-- subsequent transactions, and (ii) the invariant failure is indicated
-- by raising an exception.
checkInv :: STM a -> STM ()
checkInv (STM invariant) = STM $ \st -> check# invariant st
514 -- | alwaysSucceeds adds a new invariant that must be true when passed
515 -- to alwaysSucceeds, at the end of the current transaction, and at
516 -- the end of every subsequent transaction. If it fails at any
517 -- of those points then the transaction violating it is aborted
518 -- and the exception raised by the invariant is propagated.
519 alwaysSucceeds :: STM a -> STM ()
520 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () )
-- | 'always' is a variant of 'alwaysSucceeds' in which the invariant is
-- expressed as an @STM Bool@ action that must return 'True'.  Returning
-- 'False' or raising an exception are both treated as invariant failures.
always :: STM Bool -> STM ()
always i = alwaysSucceeds $ do
  holds <- i
  -- A 'False' result is turned into an exception, so the invariant
  -- handed to 'alwaysSucceeds' fails by raising.
  if holds
    then return ()
    else error "Transactional invariant violation"
530 -- |Shared memory locations that support atomic memory transactions.
531 data TVar a = TVar (TVar# RealWorld a)
533 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
-- Equality on 'TVar's is decided by the 'sameTVar#' primop applied to
-- the two underlying primitive variables.
instance Eq (TVar a) where
  TVar lhs == TVar rhs = sameTVar# lhs rhs
-- | Create a new 'TVar' holding the supplied value.
newTVar :: a -> STM (TVar a)
newTVar initial = STM allocate
  where
    -- Allocate with the 'newTVar#' primop and box the result.
    allocate st = case newTVar# initial st of
      (# st', raw #) -> (# st', TVar raw #)
-- | @IO@ version of 'newTVar'.  This is useful for creating top-level
-- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because
-- 'atomically' cannot be used inside 'System.IO.Unsafe.unsafePerformIO'.
newTVarIO :: a -> IO (TVar a)
newTVarIO initial = IO allocate
  where
    -- Same allocation as 'newTVar', but wrapped in 'IO' rather than 'STM'.
    allocate st = case newTVar# initial st of
      (# st', raw #) -> (# st', TVar raw #)
-- | Return the current value stored in a 'TVar'.
readTVar :: TVar a -> STM a
readTVar tv = case tv of
  TVar raw -> STM (\st -> readTVar# raw st)
557 -- |Write the supplied value into a TVar
558 writeTVar :: TVar a -> a -> STM ()
559 writeTVar (TVar tvar#) val = STM $ \s1# ->
560 case writeTVar# tvar# val s1# of
565 %************************************************************************
567 \subsection[mvars]{M-Structures}
569 %************************************************************************
571 M-Vars are rendezvous points for concurrent threads. They begin
572 empty, and any attempt to read an empty M-Var blocks. When an M-Var
573 is written, a single blocked thread may be freed. Reading an M-Var
574 toggles its state from full back to empty. Therefore, any value
575 written to an M-Var may only be read once. Multiple reads and writes
576 are allowed, but there must be at least one read between any two
580 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
582 -- |Create an 'MVar' which is initially empty.
583 newEmptyMVar :: IO (MVar a)
584 newEmptyMVar = IO $ \ s# ->
586 (# s2#, svar# #) -> (# s2#, MVar svar# #)
588 -- |Create an 'MVar' which contains the supplied value.
589 newMVar :: a -> IO (MVar a)
591 newEmptyMVar >>= \ mvar ->
592 putMVar mvar value >>
595 -- |Return the contents of the 'MVar'. If the 'MVar' is currently
596 -- empty, 'takeMVar' will wait until it is full. After a 'takeMVar',
597 -- the 'MVar' is left empty.
599 -- There are two further important properties of 'takeMVar':
601 -- * 'takeMVar' is single-wakeup. That is, if there are multiple
602 -- threads blocked in 'takeMVar', and the 'MVar' becomes full,
603 -- only one thread will be woken up. The runtime guarantees that
604 -- the woken thread completes its 'takeMVar' operation.
606 -- * When multiple threads are blocked on an 'MVar', they are
607 -- woken up in FIFO order. This is useful for providing
608 -- fairness properties of abstractions built using 'MVar's.
takeMVar :: MVar a -> IO a
takeMVar mv = case mv of
  MVar raw -> IO (\st -> takeMVar# raw st)   -- delegate to the 'takeMVar#' primop
613 -- |Put a value into an 'MVar'. If the 'MVar' is currently full,
614 -- 'putMVar' will wait until it becomes empty.
616 -- There are two further important properties of 'putMVar':
618 -- * 'putMVar' is single-wakeup. That is, if there are multiple
619 -- threads blocked in 'putMVar', and the 'MVar' becomes empty,
620 -- only one thread will be woken up. The runtime guarantees that
621 -- the woken thread completes its 'putMVar' operation.
623 -- * When multiple threads are blocked on an 'MVar', they are
624 -- woken up in FIFO order. This is useful for providing
625 -- fairness properties of abstractions built using 'MVar's.
627 putMVar :: MVar a -> a -> IO ()
628 putMVar (MVar mvar#) x = IO $ \ s# ->
629 case putMVar# mvar# x s# of
-- | A non-blocking version of 'takeMVar'.  'tryTakeMVar' returns
-- immediately: 'Nothing' if the 'MVar' was empty, or @'Just' a@ if the
-- 'MVar' was full with contents @a@.  After 'tryTakeMVar', the 'MVar'
-- is left empty.
tryTakeMVar :: MVar a -> IO (Maybe a)
tryTakeMVar (MVar raw) = IO $ \st ->
  case tryTakeMVar# raw st of
    (# st', flag, contents #) -> case flag of
      0# -> (# st', Nothing #)        -- MVar is empty
      _  -> (# st', Just contents #)  -- MVar is full
-- | A non-blocking version of 'putMVar'.  'tryPutMVar' attempts to put
-- the value @a@ into the 'MVar', returning 'True' if it was successful,
-- or 'False' otherwise.
tryPutMVar :: MVar a -> a -> IO Bool
tryPutMVar (MVar raw) val = IO $ \st ->
  case tryPutMVar# raw val st of
    (# st', stored #) -> case stored of
      0# -> (# st', False #)  -- a zero flag is reported as 'False'
      _  -> (# st', True #)
-- | Check whether a given 'MVar' is empty.
--
-- Note that the boolean returned is only a snapshot of the state of the
-- 'MVar'.  By the time you get to react on it, the 'MVar' may have been
-- filled (or emptied), so be extremely careful when using this
-- operation.  Use 'tryTakeMVar' instead if possible.
isEmptyMVar :: MVar a -> IO Bool
isEmptyMVar (MVar raw) = IO $ \st ->
  case isEmptyMVar# raw st of
    (# st', flag #) -> case flag of
      0# -> (# st', False #)  -- a zero flag is reported as "not empty"
      _  -> (# st', True #)
-- | Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
-- "System.Mem.Weak" for more about finalizers.
addMVarFinalizer :: MVar a -> IO () -> IO ()
addMVarFinalizer (MVar raw) finalizer = IO attach
  where
    -- Pass the primitive MVar, a unit value and the finalizer to
    -- 'mkWeak#'; the second component of its result is discarded.
    attach st = case mkWeak# raw () finalizer st of
      (# st', _ #) -> (# st', () #)
668 withMVar :: MVar a -> (a -> IO b) -> IO b
672 b <- catchAny (unblock (io a))
673 (\e -> do putMVar m a; throw e)
679 %************************************************************************
681 \subsection{Thread waiting}
683 %************************************************************************
686 #ifdef mingw32_HOST_OS
688 -- Note: threadWaitRead and threadWaitWrite aren't really functional
689 -- on Win32, but left in there because lib code (still) uses them (the manner
690 -- in which they're used doesn't cause problems on a Win32 platform though.)
asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) = IO issue
  where
    -- Call the 'asyncRead#' primop on the unboxed arguments and box
    -- its two 'Int#' results into a pair.
    issue st = case asyncRead# fd isSock len buf st of
      (# st', len#, err# #) -> (# st', (I# len#, I# err#) #)
-- Counterpart of 'asyncRead' built on the 'asyncWrite#' primop; the
-- two 'Int#' results are boxed into a pair in the same way.
asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) = IO issue
  where
    issue st = case asyncWrite# fd isSock len buf st of
      (# st', len#, err# #) -> (# st', (I# len#, I# err#) #)
-- Run the 'asyncDoProc#' primop on a procedure pointer and its parameter,
-- returning only the boxed error component of the result.
asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
asyncDoProc (FunPtr proc) (Ptr param) = IO run
  where
    -- The 'length' value is ignored; it simplifies the implementation of
    -- the async*# primops to have them all return the same result.
    run st = case asyncDoProc# proc param st of
      (# st', _ignored, err# #) -> (# st', I# err# #)
709 -- to aid the use of these primops by the IO Handle implementation,
710 -- provide the following convenience funs:
-- The byte array had better be pinned!
asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncReadBA fd isSock len off bufB = asyncRead fd isSock len target
  where
    -- Pointer to the array's contents, advanced by 'off' via 'plusPtr'.
    target = Ptr (byteArrayContents# (unsafeCoerce# bufB)) `plusPtr` off
-- Byte-array variant of 'asyncWrite': the buffer is the contents of
-- 'bufB', offset by 'off' via 'plusPtr'.
asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncWriteBA fd isSock len off bufB = asyncWrite fd isSock len target
  where
    target = Ptr (byteArrayContents# (unsafeCoerce# bufB)) `plusPtr` off
723 -- -----------------------------------------------------------------------------
726 -- | Block the current thread until data is available to read on the
727 -- given file descriptor (GHC only).
728 threadWaitRead :: Fd -> IO ()
730 #ifndef mingw32_HOST_OS
731 | threaded = waitForReadEvent fd
733 | otherwise = IO $ \s ->
734 case fromIntegral fd of { I# fd# ->
735 case waitRead# fd# s of { s' -> (# s', () #)
738 -- | Block the current thread until data can be written to the
739 -- given file descriptor (GHC only).
740 threadWaitWrite :: Fd -> IO ()
742 #ifndef mingw32_HOST_OS
743 | threaded = waitForWriteEvent fd
745 | otherwise = IO $ \s ->
746 case fromIntegral fd of { I# fd# ->
747 case waitWrite# fd# s of { s' -> (# s', () #)
750 -- | Suspends the current thread for a given number of microseconds
753 -- There is no guarantee that the thread will be rescheduled promptly
754 -- when the delay has expired, but the thread will never continue to
755 -- run /earlier/ than specified.
757 threadDelay :: Int -> IO ()
759 | threaded = waitForDelayEvent time
760 | otherwise = IO $ \s ->
761 case fromIntegral time of { I# time# ->
762 case delay# time# s of { s' -> (# s', () #)
766 -- | Set the value of returned TVar to True after a given number of
767 -- microseconds. The caveats associated with threadDelay also apply.
769 registerDelay :: Int -> IO (TVar Bool)
771 | threaded = waitForDelayEventSTM usecs
772 | otherwise = error "registerDelay: requires -threaded"
774 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
776 waitForDelayEvent :: Int -> IO ()
777 waitForDelayEvent usecs = do
779 target <- calculateTarget usecs
780 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
784 -- Delays for use in STM
785 waitForDelayEventSTM :: Int -> IO (TVar Bool)
786 waitForDelayEventSTM usecs = do
787 t <- atomically $ newTVar False
788 target <- calculateTarget usecs
789 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
793 calculateTarget :: Int -> IO USecs
794 calculateTarget usecs = do
796 return $ now + (fromIntegral usecs)
799 -- ----------------------------------------------------------------------------
800 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
802 -- In the threaded RTS, we employ a single IO Manager thread to wait
803 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
804 -- and delays (threadDelay).
806 -- We can do this because in the threaded RTS the IO Manager can make
807 -- a non-blocking call to select(), so we don't have to do select() in
808 -- the scheduler as we have to in the non-threaded RTS. We get performance
809 -- benefits from doing it this way, because we only have to restart the select()
810 -- when a new request arrives, rather than doing one select() each time
811 -- around the scheduler loop. Furthermore, the scheduler can be simplified
812 -- by not having to check for completed IO requests.
814 -- Issues, possible problems:
816 -- - we might want bound threads to just do the blocking
817 -- operation rather than communicating with the IO manager
-- thread. This would prevent single-threaded programs which do
819 -- IO from requiring multiple OS threads. However, it would also
820 -- prevent bound threads waiting on IO from being killed or sent
-- - Apparently exec() doesn't work on Linux in a multithreaded program.
824 -- I couldn't repeat this.
826 -- - How do we handle signal delivery in the multithreaded RTS?
828 -- - forkProcess will kill the IO manager thread. Let's just
829 -- hope we don't need to do any blocking IO between fork & exec.
831 #ifndef mingw32_HOST_OS
833 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
834 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
838 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
839 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
841 #ifndef mingw32_HOST_OS
842 pendingEvents :: IORef [IOReq]
844 pendingDelays :: IORef [DelayReq]
845 -- could use a strict list or array here
846 {-# NOINLINE pendingEvents #-}
847 {-# NOINLINE pendingDelays #-}
848 (pendingEvents,pendingDelays) = unsafePerformIO $ do
853 -- the first time we schedule an IO request, the service thread
854 -- will be created (cool, huh?)
ensureIOManagerIsRunning :: IO ()
ensureIOManagerIsRunning = case threaded of
  True  -> pendingEvents `seq` return ()   -- force 'pendingEvents' to WHNF
  False -> return ()
-- Insert a delay request in front of the first entry whose 'delayTime'
-- is not earlier than its own (or at the end if there is none), so a
-- list that is already ordered by 'delayTime' stays ordered.
insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
insertDelay new queue = case queue of
  [] -> [new]
  (next : later)
    | delayTime new <= delayTime next -> new : queue
    | otherwise                       -> next : insertDelay new later
-- Extract the time carried by either kind of delay request.
delayTime :: DelayReq -> USecs
delayTime req = case req of
  Delay    due _ -> due
  DelaySTM due _ -> due
-- XXX: move into GHC.IOBase from Data.IORef?
-- Applies the function to the reference's contents through the
-- 'atomicModifyMutVar#' primop.
atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORef ref update = case ref of
  IORef (STRef var) -> IO (\st -> atomicModifyMutVar# var update st)
877 foreign import ccall unsafe "getUSecOfDay"
878 getUSecOfDay :: IO USecs
-- Global flag, initially 'False'; set by 'prodServiceThread' and
-- cleared again by 'service_cont'.
prodding :: IORef Bool
{-# NOINLINE prodding #-}
prodding = unsafePerformIO $ newIORef False
-- Set the 'prodding' flag and, only if it was not already set, call
-- 'wakeupIOManager'.
prodServiceThread :: IO ()
prodServiceThread =
  atomicModifyIORef prodding (\previous -> (True, previous)) >>= \alreadySet ->
    case alreadySet of
      True  -> return ()
      False -> wakeupIOManager
889 #ifdef mingw32_HOST_OS
890 -- ----------------------------------------------------------------------------
891 -- Windows IO manager thread
893 startIOManagerThread :: IO ()
894 startIOManagerThread = do
895 wakeup <- c_getIOManagerEvent
896 forkIO $ service_loop wakeup []
899 service_loop :: HANDLE -- read end of pipe
900 -> [DelayReq] -- current delay requests
903 service_loop wakeup old_delays = do
904 -- pick up new delay requests
905 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
906 let delays = foldr insertDelay old_delays new_delays
909 (delays', timeout) <- getDelay now delays
911 r <- c_WaitForSingleObject wakeup timeout
913 0xffffffff -> do c_maperrno; throwErrno "service_loop"
915 r2 <- c_readIOManagerEvent
918 _ | r2 == io_MANAGER_WAKEUP -> return False
919 _ | r2 == io_MANAGER_DIE -> return True
920 0 -> return False -- spurious wakeup
921 _ -> do start_console_handler (r2 `shiftR` 1); return False
924 else service_cont wakeup delays'
926 _other -> service_cont wakeup delays' -- probably timeout
-- Clear the 'prodding' flag, then go round 'service_loop' again with
-- the same wakeup handle and delay list.
service_cont :: HANDLE -> [DelayReq] -> IO ()
service_cont wakeup delays =
  atomicModifyIORef prodding (\_ -> (False, False)) >> service_loop wakeup delays
-- These values must agree with rts/win32/ThrIOManager.c.
io_MANAGER_WAKEUP :: Word32
io_MANAGER_WAKEUP = 0xffffffff

io_MANAGER_DIE :: Word32
io_MANAGER_DIE = 0xfffffffe
942 -- these are sent to Services only.
945 deriving (Eq, Ord, Enum, Show, Read, Typeable)
947 start_console_handler :: Word32 -> IO ()
948 start_console_handler r =
949 case toWin32ConsoleEvent r of
950 Just x -> withMVar win32ConsoleHandler $ \handler -> do
-- Translate a numeric console control code into a ConsoleEvent.
-- The inline comment on each alternative names the Win32 constant the
-- literal stands for.
955 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
956 toWin32ConsoleEvent ev =
958 0 {- CTRL_C_EVENT-} -> Just ControlC
959 1 {- CTRL_BREAK_EVENT-} -> Just Break
960 2 {- CTRL_CLOSE_EVENT-} -> Just Close
961 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
962 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
-- | Global slot holding the function that 'start_console_handler' runs
-- for a console event.  It starts out containing an 'error' thunk, so a
-- real handler has to be stored before the slot's contents are applied.
--
-- The NOINLINE pragma is required: this is a top-level 'unsafePerformIO'
-- allocation, and if the right-hand side were inlined at use sites each
-- site could allocate its own 'MVar' instead of sharing this one.
{-# NOINLINE win32ConsoleHandler #-}
win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
-- XXX Is this actually needed?
-- Global cell holding a HANDLE, initialised to nullPtr.  NOINLINE keeps
-- the unsafePerformIO allocation from being duplicated by inlining.
{-# NOINLINE stick #-}
stick :: IORef HANDLE
stick = unsafePerformIO $ newIORef nullPtr
-- Wake the Windows IO manager by sending it the io_MANAGER_WAKEUP
-- command through c_sendIOManagerEvent.
973 wakeupIOManager :: IO ()
-- the value read from stick is bound to _hdl and not used by the send below
975 _hdl <- readIORef stick
976 c_sendIOManagerEvent io_MANAGER_WAKEUP
978 -- Walk the queue of pending delays, waking up any that have passed
979 -- and return the smallest delay to wait for. The queue of pending
980 -- delays is kept ordered.
-- Takes the current time and the ordered delay list; returns the delays
-- that are still pending together with the timeout for the next wait.
981 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
-- nothing pending: the timeout is iNFINITE
982 getDelay _ [] = return ([], iNFINITE)
983 getDelay now all@(d : rest)
-- a request counts as expired once now >= its wake-up time
985 Delay time m | now >= time -> do
988 DelaySTM time t | now >= time -> do
-- an expired STM delay is signalled by setting its TVar to True
989 atomically $ writeTVar t True
992 -- delay is in millisecs for WaitForSingleObject
993 let micro_seconds = delayTime d - now
-- adding 999 before dividing rounds up to whole milliseconds, so the
-- wait is never shorter than the time remaining
994 milli_seconds = (micro_seconds + 999) `div` 1000
-- NOTE(review): fromIntegral narrows to DWORD; a very long delay could
-- wrap or collide with iNFINITE -- confirm the range of USecs
995 in return (all, fromIntegral milli_seconds)
997 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
998 -- available yet. We should move some Win32 functionality down here,
999 -- maybe as part of the grand reorganisation of the base package...
-- HANDLE is represented as an untyped pointer.
1000 type HANDLE = Ptr ()
-- Timeout returned by getDelay when no delay requests are pending.
1004 iNFINITE = 0xFFFFFFFF -- urgh
-- Returns the HANDLE that the IO manager thread waits on.
1006 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
1007 c_getIOManagerEvent :: IO HANDLE
-- Reads the command word most recently sent to the IO manager.
1009 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
1010 c_readIOManagerEvent :: IO Word32
-- Sends a command word (e.g. io_MANAGER_WAKEUP) to the IO manager.
1012 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
1013 c_sendIOManagerEvent :: Word32 -> IO ()
-- Called by service_loop immediately before throwErrno.
1015 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
-- No safety level is given here, so the FFI default (safe) applies;
-- service_loop blocks in this call for up to the given timeout.
1018 foreign import stdcall "WaitForSingleObject"
1019 c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1022 -- ----------------------------------------------------------------------------
1023 -- Unix IO manager thread, using select()
-- Start the Unix IO manager: create the wakeup pipe, publish its write
-- end, and run service_loop on the read end with freshly allocated
-- fd_set and timeval buffers.
1025 startIOManagerThread :: IO ()
1026 startIOManagerThread = do
-- scratch array for the two descriptors filled in by c_pipe
1027 allocaArray 2 $ \fds -> do
1028 throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1029 rd_end <- peekElemOff fds 0
1030 wr_end <- peekElemOff fds 1
-- the write end is recorded in stick (read by wakeupIOManager) and also
-- handed to c_setIOManagerPipe
1031 writeIORef stick (fromIntegral wr_end)
1032 c_setIOManagerPipe wr_end
-- buffers reused across iterations: read set, write set, select() timeout
1034 allocaBytes sizeofFdSet $ \readfds -> do
1035 allocaBytes sizeofFdSet $ \writefds -> do
1036 allocaBytes sizeofTimeVal $ \timeval -> do
-- start with no outstanding IO requests and no delays
1037 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
-- Unix IO manager loop.  Each pass collects newly queued IO and delay
-- requests, builds the fd sets, calls select(), handles a byte arriving
-- on the wakeup descriptor, completes the requests that became ready and
-- recurses with whatever is still outstanding.
1041 :: Fd -- listen to this for wakeup calls
1048 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1050 -- pick up new IO requests
-- (the shared list is atomically replaced by [] and its old contents taken)
1051 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1052 let reqs = new_reqs ++ old_reqs
1054 -- pick up new delay requests
1055 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
-- insertDelay folds each new request into the already-ordered old list
1056 let delays0 = foldr insertDelay old_delays new_delays
1058 -- build the FDSets for select()
-- the wakeup descriptor is always added to the read set
1061 fdSet wakeup readfds
1062 maxfd <- buildFdSets 0 readfds writefds reqs
1064 -- perform the select()
-- do_select yields a pair: (wake everyone?, remaining delays)
1065 let do_select delays = do
1066 -- check the current time and wake up any thread in
1067 -- threadDelay whose timeout has expired. Also find the
1068 -- timeout value for the select() call.
1070 (delays', timeout) <- getDelay now ptimeval delays
-- first argument: one more than the largest descriptor of interest
1072 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
1078 _ | err == eINTR -> do_select delays'
1079 -- EINTR: just redo the select()
-- note: this alternative returns the list passed in (delays), not delays'
1080 _ | err == eBADF -> return (True, delays)
1081 -- EBADF: one of the file descriptors is closed or bad,
1082 -- we don't know which one, so wake everyone up.
1083 _ | otherwise -> throwErrno "select"
1084 -- otherwise (ENOMEM or EINVAL) something has gone
1085 -- wrong; report the error.
1087 return (False,delays')
1089 (wakeup_all,delays') <- do_select delays0
-- when waking everyone, the wakeup descriptor is not examined
1092 if wakeup_all then return False
1094 b <- fdIsSet wakeup readfds
-- read one byte from the wakeup descriptor; the result of c_read is discarded
1097 else alloca $ \p -> do
1098 c_read (fromIntegral wakeup) p 1; return ()
-- of the alternatives below, only io_MANAGER_DIE yields True
1101 _ | s == io_MANAGER_WAKEUP -> return False
1102 _ | s == io_MANAGER_DIE -> return True
-- any other byte indexes the signal handler table, which is read while
-- holding signalHandlerLock
1103 _ -> withMVar signalHandlerLock $ \_ -> do
1104 handler_tbl <- peek handlers
1105 sp <- peekElemOff handler_tbl (fromIntegral s)
1106 io <- deRefStablePtr sp
1110 if exit then return () else do
-- clear the prodding flag so that the next prodServiceThread call
-- sends a wakeup again
1112 atomicModifyIORef prodding (\_ -> (False,False))
-- either wake every waiter and drop all requests, or keep only the
-- requests whose descriptors were not reported ready
1114 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1115 else completeRequests reqs readfds writefds []
1117 service_loop wakeup readfds writefds ptimeval reqs' delays'
-- Single-byte commands for the IO manager: wakeupIOManager writes
-- io_MANAGER_WAKEUP to the wakeup pipe, and service_loop compares the
-- byte it reads against both values.
io_MANAGER_WAKEUP :: CChar
io_MANAGER_WAKEUP = 0xff

io_MANAGER_DIE :: CChar
io_MANAGER_DIE = 0xfe
-- Global cell for the write end of the wakeup pipe: written by
-- startIOManagerThread and read by wakeupIOManager.  Its initial value
-- is 0.  NOINLINE keeps the unsafePerformIO allocation from being
-- duplicated by inlining.
1124 {-# NOINLINE stick #-}
1125 stick = unsafePerformIO (newIORef 0)
-- | Wake the Unix IO manager by writing the single byte
-- 'io_MANAGER_WAKEUP' to the descriptor stored in 'stick'.
-- The result of the write is ignored.
wakeupIOManager :: IO ()
wakeupIOManager = do
  wakeupFd <- readIORef stick
  let sendByte bytePtr = do
        c_write (fromIntegral wakeupFd) bytePtr 1
        return ()
  with io_MANAGER_WAKEUP sendByte
-- | Lock used to protect concurrent access to signal_handlers.  Symptom
-- of this race condition is #1922; although that bug was on Windows, a
-- similar bug also exists on Unix.
--
-- The NOINLINE pragma is required: this is a top-level 'unsafePerformIO'
-- allocation, and if the right-hand side were inlined at use sites each
-- site could allocate its own 'MVar', so the "lock" would no longer
-- exclude anything.
{-# NOINLINE signalHandlerLock #-}
signalHandlerLock :: MVar ()
signalHandlerLock = unsafePerformIO (newMVar ())
-- Address of the C variable signal_handlers.  service_loop peeks it to
-- obtain a table of StablePtrs to IO actions, indexed by the byte read
-- from the wakeup pipe.
1139 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
-- Called by startIOManagerThread with the write end of the wakeup pipe.
1141 foreign import ccall "setIOManagerPipe"
1142 c_setIOManagerPipe :: CInt -> IO ()
1144 -- -----------------------------------------------------------------------------
-- Walk the outstanding IO requests, tracking the largest descriptor seen.
-- The first argument is the running maximum, which is also the result
-- once the request list is exhausted.
1147 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1148 buildFdSets maxfd _ _ [] = return maxfd
1149 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
-- a descriptor at or above fD_SETSIZE is rejected with an error call
1150 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1153 buildFdSets (max maxfd fd) readfds writefds reqs
1154 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1155 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1158 buildFdSets (max maxfd fd) readfds writefds reqs
-- Go through the IO requests after select(): a request whose completion
-- branch is taken has its MVar filled with (); the others are prepended
-- to the accumulator reqs', which is returned when the input runs out.
1160 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1162 completeRequests [] _ _ reqs' = return reqs'
1163 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
-- read requests are checked against the read set
1164 b <- fdIsSet fd readfds
1166 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1167 else completeRequests reqs readfds writefds (Read fd m : reqs')
1168 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
-- write requests are checked against the write set
1169 b <- fdIsSet fd writefds
1171 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1172 else completeRequests reqs readfds writefds (Write fd m : reqs')
-- | Fill the 'MVar' carried by every request in the list with @()@,
-- in list order, regardless of whether it is a read or a write request.
wakeupAll :: [IOReq] -> IO ()
wakeupAll pending =
  case pending of
    [] -> return ()
    Read _ waiter : rest -> do
      putMVar waiter ()
      wakeupAll rest
    Write _ waiter : rest -> do
      putMVar waiter ()
      wakeupAll rest
-- Register interest in reading from fd by pushing a Read request onto
-- the front of the shared pendingEvents list.
1179 waitForReadEvent :: Fd -> IO ()
1180 waitForReadEvent fd = do
1182 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
-- Register interest in writing to fd by pushing a Write request onto
-- the front of the shared pendingEvents list.
1186 waitForWriteEvent :: Fd -> IO ()
1187 waitForWriteEvent fd = do
1189 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1193 -- -----------------------------------------------------------------------------
1196 -- Walk the queue of pending delays, waking up any that have passed
1197 -- and return the smallest delay to wait for. The queue of pending
1198 -- delays is kept ordered.
-- Takes the current time, a timeval buffer and the ordered delay list;
-- returns the delays still pending and the timeout pointer for select().
1199 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
-- nothing pending: a null timeout pointer is returned
1200 getDelay _ _ [] = return ([],nullPtr)
1201 getDelay now ptimeval all@(d : rest)
-- a request counts as expired once now >= its wake-up time; after
-- handling it, the rest of the queue is examined
1203 Delay time m | now >= time -> do
1205 getDelay now ptimeval rest
1206 DelaySTM time t | now >= time -> do
-- an expired STM delay is signalled by setting its TVar to True
1207 atomically $ writeTVar t True
1208 getDelay now ptimeval rest
-- store the time remaining until the head request in the timeval buffer
-- and return the queue unchanged
1210 setTimevalTicks ptimeval (delayTime d - now)
1211 return (all,ptimeval)
-- Byte count that startIOManagerThread passes to allocaBytes for the
-- timeval buffer.
1215 foreign import ccall unsafe "sizeofTimeVal"
1216 sizeofTimeVal :: Int
-- Stores a USecs amount into a timeval buffer; getDelay uses it to set
-- the select() timeout.
1218 foreign import ccall unsafe "setTimevalTicks"
1219 setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1222 On Win32 we're going to have a single Pipe, and a
1223 waitForSingleObject with the delay time. For signals, we send a
1224 byte down the pipe just like on Unix.
1227 -- ----------------------------------------------------------------------------
1228 -- select() interface
1230 -- ToDo: move to System.Posix.Internals?
-- select() is imported as a safe call.
-- NOTE(review): presumably so that other Haskell threads keep running
-- while this call blocks -- confirm.
1234 foreign import ccall safe "select"
1235 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
-- The C-side FD_SETSIZE value.
1238 foreign import ccall unsafe "hsFD_SETSIZE"
1239 c_fD_SETSIZE :: CInt
-- FD_SETSIZE converted for comparison against descriptors in buildFdSets.
1242 fD_SETSIZE = fromIntegral c_fD_SETSIZE
foreign import ccall unsafe "hsFD_ISSET"
  c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt

-- | Test a descriptor against an fd_set by unwrapping the 'Fd' and
-- calling the C helper; the helper's 'CInt' result is returned as is.
fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
fdIsSet descriptor fdset =
  case descriptor of
    Fd rawFd -> c_fdIsSet rawFd fdset
foreign import ccall unsafe "hsFD_SET"
  c_fdSet :: CInt -> Ptr CFdSet -> IO ()

-- | Add a descriptor to an fd_set by unwrapping the 'Fd' and calling
-- the C helper.
fdSet :: Fd -> Ptr CFdSet -> IO ()
fdSet descriptor fdset =
  case descriptor of
    Fd rawFd -> c_fdSet rawFd fdset
-- C helper operating on a whole fd_set buffer.
-- NOTE(review): presumably wraps FD_ZERO and clears the set -- confirm.
1256 foreign import ccall unsafe "hsFD_ZERO"
1257 fdZero :: Ptr CFdSet -> IO ()
-- C helper exposing the size of an fd_set.
1259 foreign import ccall unsafe "sizeof_fd_set"
-- | Run the stack-overflow hook ('callStackOverflowHook').  If the hook
-- returns, the action's result is 'undefined', which is what lets it
-- have the fully polymorphic type @IO a@.
reportStackOverflow :: IO a
reportStackOverflow = do
  callStackOverflowHook
  return undefined
-- Report an exception: begins by fetching the currently installed
-- uncaught-exception handler.
1267 reportError :: SomeException -> IO a
1269 handler <- getUncaughtExceptionHandler
1273 -- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove
1274 -- the unsafe below.
-- Binding to the C function stackOverflow; invoked by reportStackOverflow.
1275 foreign import ccall unsafe "stackOverflow"
1276 callStackOverflowHook :: IO ()
-- | Global cell holding the handler for uncaught exceptions; it initially
-- contains 'defaultHandler'.  It is written by
-- 'setUncaughtExceptionHandler' and read by 'getUncaughtExceptionHandler'.
-- NOINLINE keeps the 'unsafePerformIO' allocation from being duplicated.
uncaughtExceptionHandler :: IORef (SomeException -> IO ())
uncaughtExceptionHandler = unsafePerformIO $ newIORef defaultHandler
{-# NOINLINE uncaughtExceptionHandler #-}
-- | The handler installed initially for uncaught exceptions.
--
-- It flushes 'stdout' (ignoring any exception raised while doing so),
-- chooses a message for the exception and passes it to 'errorBelch'
-- together with the constant format string @"%s"@.
defaultHandler :: SomeException -> IO ()
defaultHandler se@(SomeException ex) = do
  catchAny (hFlush stdout) (\_ -> return ())
  withCString "%s" $ \formatPtr ->
    withCString message $ \messagePtr ->
      errorBelch formatPtr messagePtr
  where
    -- 'Deadlock' gets a fixed explanation; everything else falls through.
    message = case cast ex of
      Just Deadlock -> "no threads to run: infinite loop or deadlock?"
      _             -> errorCallText

    -- 'ErrorCall' contributes its own text verbatim; any other exception
    -- is rendered with 'showsPrec'.
    errorCallText = case cast ex of
      Just (ErrorCall text) -> text
      _                     -> showsPrec 0 se ""
1294 -- don't use errorBelch() directly, because we cannot call varargs functions
-- Two-argument entry point errorBelch2; defaultHandler calls it with the
-- format string "%s" and the message.
1296 foreign import ccall unsafe "HsBase.h errorBelch2"
1297 errorBelch :: CString -> CString -> IO ()
-- | Install a new handler for uncaught exceptions by overwriting the
-- contents of the global 'uncaughtExceptionHandler' cell.
setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
setUncaughtExceptionHandler newHandler =
  writeIORef uncaughtExceptionHandler newHandler
-- Return the handler currently stored in the global
-- uncaughtExceptionHandler cell.
1302 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1303 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler