2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_HADDOCK not-home #-}
4 -----------------------------------------------------------------------------
7 -- Copyright : (c) The University of Glasgow, 1994-2002
8 -- License : see libraries/base/LICENSE
10 -- Maintainer : cvs-ghc@haskell.org
11 -- Stability : internal
12 -- Portability : non-portable (GHC extensions)
14 -- Basic concurrency stuff.
16 -----------------------------------------------------------------------------
18 -- No: #hide, because bits of this module are exposed by the stm package.
19 -- However, we don't want this module to be the home location for the
20 -- bits it exports, we'd rather have Control.Concurrent and the other
21 -- higher level modules be the home. Hence:
29 -- * Forking and suchlike
, forkIO -- :: IO () -> IO ThreadId
, forkOnIO -- :: Int -> IO () -> IO ThreadId
32 , numCapabilities -- :: Int
, childHandler -- :: SomeException -> IO ()
34 , myThreadId -- :: IO ThreadId
35 , killThread -- :: ThreadId -> IO ()
, throwTo -- :: ThreadId -> SomeException -> IO ()
37 , par -- :: a -> b -> b
38 , pseq -- :: a -> b -> b
40 , labelThread -- :: ThreadId -> String -> IO ()
42 , ThreadStatus(..), BlockReason(..)
43 , threadStatus -- :: ThreadId -> IO ThreadStatus
46 , threadDelay -- :: Int -> IO ()
47 , registerDelay -- :: Int -> IO (TVar Bool)
, threadWaitRead -- :: Fd -> IO ()
, threadWaitWrite -- :: Fd -> IO ()
53 , newMVar -- :: a -> IO (MVar a)
54 , newEmptyMVar -- :: IO (MVar a)
55 , takeMVar -- :: MVar a -> IO a
56 , putMVar -- :: MVar a -> a -> IO ()
57 , tryTakeMVar -- :: MVar a -> IO (Maybe a)
58 , tryPutMVar -- :: MVar a -> a -> IO Bool
59 , isEmptyMVar -- :: MVar a -> IO Bool
60 , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
64 , atomically -- :: STM a -> IO a
66 , orElse -- :: STM a -> STM a -> STM a
, catchSTM -- :: STM a -> (SomeException -> STM a) -> STM a
68 , alwaysSucceeds -- :: STM a -> STM ()
69 , always -- :: STM Bool -> STM ()
71 , newTVar -- :: a -> STM (TVar a)
, newTVarIO -- :: a -> IO (TVar a)
73 , readTVar -- :: TVar a -> STM a
, writeTVar -- :: TVar a -> a -> STM ()
75 , unsafeIOToSTM -- :: IO a -> STM a
78 #ifdef mingw32_HOST_OS
79 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
80 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
81 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
83 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
84 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
87 #ifndef mingw32_HOST_OS
91 , ensureIOManagerIsRunning
93 #ifdef mingw32_HOST_OS
98 , setUncaughtExceptionHandler -- :: (Exception -> IO ()) -> IO ()
99 , getUncaughtExceptionHandler -- :: IO (Exception -> IO ())
101 , reportError, reportStackOverflow
104 import System.Posix.Types
105 #ifndef mingw32_HOST_OS
106 import System.Posix.Internals
114 import {-# SOURCE #-} GHC.Handle
116 import GHC.Num ( Num(..) )
117 import GHC.Real ( fromIntegral )
118 #ifdef mingw32_HOST_OS
119 import GHC.Real ( div )
120 import GHC.Ptr ( plusPtr, FunPtr(..) )
122 #ifdef mingw32_HOST_OS
123 import GHC.Read ( Read )
124 import GHC.Enum ( Enum )
126 import GHC.Exception ( SomeException(..), throw )
127 import GHC.Pack ( packCString# )
128 import GHC.Ptr ( Ptr(..) )
130 import GHC.Show ( Show(..), showString )
134 infixr 0 `par`, `pseq`
137 %************************************************************************
139 \subsection{@ThreadId@, @par@, and @fork@}
141 %************************************************************************
144 data ThreadId = ThreadId ThreadId# deriving( Typeable )
145 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
146 -- But since ThreadId# is unlifted, the Weak type must use open
149 A 'ThreadId' is an abstract type representing a handle to a thread.
150 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
151 the 'Ord' instance implements an arbitrary total ordering over
152 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
153 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
154 useful when debugging or diagnosing the behaviour of a concurrent
157 /Note/: in GHC, if you have a 'ThreadId', you essentially have
158 a pointer to the thread itself. This means the thread itself can\'t be
159 garbage collected until you drop the 'ThreadId'.
160 This misfeature will hopefully be corrected at a later date.
162 /Note/: Hugs does not provide any operations on other threads;
163 it defines 'ThreadId' as a synonym for ().
166 instance Show ThreadId where
168 showString "ThreadId " .
169 showsPrec d (getThreadId (id2TSO t))
171 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
-- | Project the primitive 'ThreadId#' out of its boxed 'ThreadId' wrapper.
id2TSO :: ThreadId -> ThreadId#
id2TSO tid = case tid of
  ThreadId tso -> tso
176 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
179 cmpThread :: ThreadId -> ThreadId -> Ordering
181 case cmp_thread (id2TSO t1) (id2TSO t2) of
186 instance Eq ThreadId where
188 case t1 `cmpThread` t2 of
192 instance Ord ThreadId where
196 Sparks off a new thread to run the 'IO' computation passed as the
197 first argument, and returns the 'ThreadId' of the newly created
200 The new thread will be a lightweight thread; if you want to use a foreign
201 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
203 GHC note: the new thread inherits the /blocked/ state of the parent
204 (see 'Control.Exception.block').
206 forkIO :: IO () -> IO ThreadId
207 forkIO action = IO $ \ s ->
208 case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
210 action_plus = catchException action childHandler
213 Like 'forkIO', but lets you specify on which CPU the thread is
214 created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
215 will stay on the same CPU for its entire lifetime (`forkIO` threads
216 can migrate between CPUs according to the scheduling policy).
217 `forkOnIO` is useful for overriding the scheduling policy when you
218 know in advance how best to distribute the threads.
220 The `Int` argument specifies the CPU number; it is interpreted modulo
221 'numCapabilities' (note that it actually specifies a capability number
222 rather than a CPU number, but to a first approximation the two are
225 forkOnIO :: Int -> IO () -> IO ThreadId
226 forkOnIO (I# cpu) action = IO $ \ s ->
227 case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
229 action_plus = catchException action childHandler
-- | The value passed to the @+RTS -N@ flag.  This is the number of
-- Haskell threads that can run truly simultaneously at any given
-- time, and is typically set to the number of physical CPU cores on
-- the machine.
numCapabilities :: Int
numCapabilities =
  -- Read the RTS-side @n_capabilities@ counter once and widen it to 'Int'.
  unsafePerformIO (peek n_capabilities >>= \count -> return (fromIntegral count))
240 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
-- | Exception handler that 'forkIO' and 'forkOnIO' wrap around the forked
-- action.  The exception is passed to 'real_handler'; if that handler
-- itself throws, the new exception is fed back into 'childHandler'.
childHandler :: SomeException -> IO ()
childHandler err = real_handler err `catchException` childHandler
245 real_handler :: SomeException -> IO ()
246 real_handler se@(SomeException ex) =
247 -- ignore thread GC and killThread exceptions:
249 Just BlockedOnDeadMVar -> return ()
251 Just BlockedIndefinitely -> return ()
253 Just ThreadKilled -> return ()
255 -- report all others:
256 Just StackOverflow -> reportStackOverflow
259 {- | 'killThread' terminates the given thread (GHC only).
260 Any work already done by the thread isn\'t
261 lost: the computation is suspended until required by another thread.
262 The memory used by the thread will be garbage collected if it isn\'t
263 referenced from anywhere. The 'killThread' function is defined in
> killThread tid = throwTo tid (toException ThreadKilled)
-- Implemented as 'throwTo' carrying the 'ThreadKilled' exception.
killThread :: ThreadId -> IO ()
killThread victim = victim `throwTo` toException ThreadKilled
272 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
274 'throwTo' does not return until the exception has been raised in the
276 The calling thread can thus be certain that the target
277 thread has received the exception. This is a useful property to know
278 when dealing with race conditions: eg. if there are two threads that
279 can kill each other, it is guaranteed that only one of the threads
280 will get to kill the other.
282 If the target thread is currently making a foreign call, then the
283 exception will not be raised (and hence 'throwTo' will not return)
284 until the call has completed. This is the case regardless of whether
285 the call is inside a 'block' or not.
287 Important note: the behaviour of 'throwTo' differs from that described in
288 the paper \"Asynchronous exceptions in Haskell\"
289 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
290 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
291 a more synchronous design in which 'throwTo' does not return until the exception
292 is received by the target thread. The trade-off is discussed in Section 8 of the paper.
293 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
296 There is currently no guarantee that the exception delivered by 'throwTo' will be
delivered at the first possible opportunity. In particular, a thread may
298 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
299 a pending 'throwTo'. This is arguably undesirable behaviour.
302 -- XXX This is duplicated in Control.{Old,}Exception
throwTo :: ThreadId -> SomeException -> IO ()
-- Thin wrapper over the killThread# primop: only the state token comes back,
-- so the unit result is supplied here.
throwTo (ThreadId target) exn = IO $ \st0 ->
  case killThread# target exn st0 of
    st1 -> (# st1, () #)
-- | Yields the 'ThreadId' of the thread that runs this action (GHC only).
myThreadId :: IO ThreadId
myThreadId = IO $ \st0 ->
  case myThreadId# st0 of
    -- Box the primitive identifier before handing it back.
    (# st1, tid #) -> (# st1, ThreadId tid #)
313 -- |The 'yield' action allows (forces, in a co-operative multitasking
314 -- implementation) a context-switch to any other currently runnable
315 -- threads (if any), and is occasionally useful when implementing
316 -- concurrency abstractions.
319 case (yield# s) of s1 -> (# s1, () #)
321 {- | 'labelThread' stores a string as identifier for this thread if
322 you built a RTS with debugging support. This identifier will be used in
323 the debugging output to make distinction of different threads easier
324 (otherwise you only have the thread state object\'s address in the heap).
326 Other applications like the graphical Concurrent Haskell Debugger
327 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
328 'labelThread' for their purposes as well.
-- Packs the label with packCString# and passes the address of the packed
-- bytes to the labelThread# primop.
labelThread :: ThreadId -> String -> IO ()
labelThread (ThreadId tid) label = IO $ \st0 ->
  case labelThread# tid (byteArrayContents# (packCString# label)) st0 of
    st1 -> (# st1, () #)
337 -- Nota Bene: 'pseq' used to be 'seq'
338 -- but 'seq' is now defined in PrelGHC
340 -- "pseq" is defined a bit weirdly (see below)
342 -- The reason for the strange "lazy" call is that
343 -- it fools the compiler into thinking that pseq and par are non-strict in
344 -- their second argument (even if it inlines pseq at the call site).
345 -- If it thinks pseq is strict in "y", then it often evaluates
346 -- "y" before "x", which is totally wrong.
pseq a b = seq a (lazy b)  -- 'lazy' keeps the second argument from looking strict
par a b = case par# a of
  _ -> lazy b  -- result of par# is ignored; 'lazy' hides strictness in b
-- ^blocked on an 'MVar'
361 -- ^blocked on a computation in progress by another thread
363 -- ^blocked in 'throwTo'
365 -- ^blocked in 'retry' in an STM transaction
366 | BlockedOnForeignCall
367 -- ^currently in a foreign call
369 -- ^blocked on some other resource. Without @-threaded@,
370 -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
371 -- they show up as 'BlockedOnMVar'.
372 deriving (Eq,Ord,Show)
374 -- | The current status of a thread
377 -- ^the thread is currently runnable or running
379 -- ^the thread has finished
380 | ThreadBlocked BlockReason
381 -- ^the thread is blocked on some resource
383 -- ^the thread received an uncaught exception
384 deriving (Eq,Ord,Show)
386 threadStatus :: ThreadId -> IO ThreadStatus
387 threadStatus (ThreadId t) = IO $ \s ->
388 case threadStatus# t s of
389 (# s', stat #) -> (# s', mk_stat (I# stat) #)
391 -- NB. keep these in sync with includes/Constants.h
392 mk_stat 0 = ThreadRunning
393 mk_stat 1 = ThreadBlocked BlockedOnMVar
394 mk_stat 2 = ThreadBlocked BlockedOnBlackHole
395 mk_stat 3 = ThreadBlocked BlockedOnException
396 mk_stat 7 = ThreadBlocked BlockedOnSTM
397 mk_stat 11 = ThreadBlocked BlockedOnForeignCall
398 mk_stat 12 = ThreadBlocked BlockedOnForeignCall
399 mk_stat 16 = ThreadFinished
400 mk_stat 17 = ThreadDied
401 mk_stat _ = ThreadBlocked BlockedOnOther
405 %************************************************************************
407 \subsection[stm]{Transactional heap operations}
409 %************************************************************************
411 TVars are shared memory locations which support atomic memory
415 -- |A monad supporting atomic memory transactions.
416 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
418 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
421 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
-- | Mapping over an 'STM' action is expressed through the monadic bind:
-- run the action, then return the transformed result.
instance Functor STM where
  fmap f action = action >>= \result -> return (f result)
426 instance Monad STM where
427 {-# INLINE return #-}
431 return x = returnSTM x
432 m >>= k = bindSTM m k
434 bindSTM :: STM a -> (a -> STM b) -> STM b
435 bindSTM (STM m) k = STM ( \s ->
437 (# new_s, a #) -> unSTM (k a) new_s
440 thenSTM :: STM a -> STM b -> STM b
441 thenSTM (STM m) k = STM ( \s ->
443 (# new_s, a #) -> unSTM k new_s
-- | Inject a pure value into 'STM'; the state token is passed through unchanged.
returnSTM :: a -> STM a
returnSTM val = STM $ \st -> (# st, val #)
449 -- | Unsafely performs IO in the STM monad. Beware: this is a highly
450 -- dangerous thing to do.
452 -- * The STM implementation will often run transactions multiple
453 -- times, so you need to be prepared for this if your IO has any
456 -- * The STM implementation will abort transactions that are known to
457 -- be invalid and need to be restarted. This may happen in the middle
458 -- of `unsafeIOToSTM`, so make sure you don't acquire any resources
459 -- that need releasing (exception handlers are ignored when aborting
460 -- the transaction). That includes doing any IO using Handles, for
461 -- example. Getting this wrong will probably lead to random deadlocks.
463 -- * The transaction may have seen an inconsistent view of memory when
464 -- the IO runs. Invariants that you expect to be true throughout
465 -- your program may not be true inside a transaction, due to the
466 -- way transactions are implemented. Normally this wouldn't be visible
467 -- to the programmer, but using `unsafeIOToSTM` can expose it.
unsafeIOToSTM :: IO a -> STM a
unsafeIOToSTM io = case io of
  -- The wrapped state-passing function is reused as-is under the STM constructor.
  IO act -> STM act
472 -- |Perform a series of STM actions atomically.
474 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
475 -- Any attempt to do so will result in a runtime error. (Reason: allowing
476 -- this would effectively allow a transaction inside a transaction, depending
477 -- on exactly when the thunk is evaluated.)
479 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
480 -- and which allows top-level TVars to be allocated.
atomically :: STM a -> IO a
-- Unwrap the transaction and run it under the atomically# primop.
atomically (STM txn) = IO $ \st -> atomically# txn st
485 -- |Retry execution of the current memory transaction because it has seen
486 -- values in TVars which mean that it should not continue (e.g. the TVars
487 -- represent a shared buffer that is now empty). The implementation may
488 -- block the thread until one of the TVars that it has read from has been
-- updated. (GHC only)
retry = STM (\st -> retry# st)  -- delegates directly to the retry# primop
-- | Combine two alternative STM actions (GHC only).  When the first
-- action finishes without retrying, its result is the result of the
-- 'orElse'.  When the first action retries, the second action is run
-- in its place.  If both actions retry then the 'orElse' as a whole
-- retries.
orElse :: STM a -> STM a -> STM a
orElse (STM first) second = STM $ \st ->
  catchRetry# first (unSTM second) st
-- | Run an STM action with an exception handler attached; the handler
-- receives the exception and supplies the replacement STM action.
catchSTM :: STM a -> (SomeException -> STM a) -> STM a
catchSTM (STM body) handler = STM $ \st ->
  catchSTM# body (\exn -> unSTM (handler exn)) st
-- | Low-level primitive on which 'always' and 'alwaysSucceeds' are built.
-- 'checkInv' differs from these in that (i) the invariant is not
-- checked when 'checkInv' is called, only at the end of this and
-- subsequent transactions, and (ii) an invariant failure is indicated
-- by raising an exception.
checkInv :: STM a -> STM ()
checkInv (STM inv) = STM $ \st -> check# inv st
513 -- | alwaysSucceeds adds a new invariant that must be true when passed
514 -- to alwaysSucceeds, at the end of the current transaction, and at
515 -- the end of every subsequent transaction. If it fails at any
516 -- of those points then the transaction violating it is aborted
517 -- and the exception raised by the invariant is propagated.
518 alwaysSucceeds :: STM a -> STM ()
519 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () )
-- | 'always' is a variant of 'alwaysSucceeds' in which the invariant is
-- expressed as an @STM Bool@ action that must return 'True'.  Returning
-- 'False' or raising an exception are both treated as invariant failures.
always :: STM Bool -> STM ()
always i = alwaysSucceeds $ do
  v <- i
  -- A 'False' result is converted into an exception so that
  -- 'alwaysSucceeds' sees it as a failure of the invariant.
  if v then return () else error "Transactional invariant violation"
529 -- |Shared memory locations that support atomic memory transactions.
530 data TVar a = TVar (TVar# RealWorld a)
532 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
-- | Equality on 'TVar's is decided by 'sameTVar#' on the underlying
-- primitive variables.
instance Eq (TVar a) where
  TVar lhs == TVar rhs = sameTVar# lhs rhs
-- | Allocate a fresh 'TVar' inside a transaction, initialised with the
-- supplied value.
newTVar :: a -> STM (TVar a)
newTVar val = STM $ \st0 ->
  case newTVar# val st0 of
    (# st1, var #) -> (# st1, TVar var #)
-- | @IO@ counterpart of 'newTVar'.  It exists so that top-level 'TVar's
-- can be created with 'System.IO.Unsafe.unsafePerformIO', since
-- 'atomically' cannot be used inside 'System.IO.Unsafe.unsafePerformIO'.
newTVarIO :: a -> IO (TVar a)
newTVarIO val = IO $ \st0 ->
  case newTVar# val st0 of
    (# st1, var #) -> (# st1, TVar var #)
-- | Read the value currently held in a 'TVar'.
readTVar :: TVar a -> STM a
readTVar (TVar var) = STM (\st -> readTVar# var st)
556 -- |Write the supplied value into a TVar
557 writeTVar :: TVar a -> a -> STM ()
558 writeTVar (TVar tvar#) val = STM $ \s1# ->
559 case writeTVar# tvar# val s1# of
564 %************************************************************************
566 \subsection[mvars]{M-Structures}
568 %************************************************************************
570 M-Vars are rendezvous points for concurrent threads. They begin
571 empty, and any attempt to read an empty M-Var blocks. When an M-Var
572 is written, a single blocked thread may be freed. Reading an M-Var
573 toggles its state from full back to empty. Therefore, any value
574 written to an M-Var may only be read once. Multiple reads and writes
575 are allowed, but there must be at least one read between any two
579 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
581 -- |Create an 'MVar' which is initially empty.
582 newEmptyMVar :: IO (MVar a)
583 newEmptyMVar = IO $ \ s# ->
585 (# s2#, svar# #) -> (# s2#, MVar svar# #)
587 -- |Create an 'MVar' which contains the supplied value.
588 newMVar :: a -> IO (MVar a)
590 newEmptyMVar >>= \ mvar ->
591 putMVar mvar value >>
594 -- |Return the contents of the 'MVar'. If the 'MVar' is currently
595 -- empty, 'takeMVar' will wait until it is full. After a 'takeMVar',
596 -- the 'MVar' is left empty.
598 -- There are two further important properties of 'takeMVar':
600 -- * 'takeMVar' is single-wakeup. That is, if there are multiple
601 -- threads blocked in 'takeMVar', and the 'MVar' becomes full,
602 -- only one thread will be woken up. The runtime guarantees that
603 -- the woken thread completes its 'takeMVar' operation.
605 -- * When multiple threads are blocked on an 'MVar', they are
606 -- woken up in FIFO order. This is useful for providing
607 -- fairness properties of abstractions built using 'MVar's.
takeMVar :: MVar a -> IO a
-- Direct wrapper: takeMVar# already returns the (state, value) pair IO needs.
takeMVar (MVar mv) = IO (\st -> takeMVar# mv st)
612 -- |Put a value into an 'MVar'. If the 'MVar' is currently full,
613 -- 'putMVar' will wait until it becomes empty.
615 -- There are two further important properties of 'putMVar':
617 -- * 'putMVar' is single-wakeup. That is, if there are multiple
618 -- threads blocked in 'putMVar', and the 'MVar' becomes empty,
619 -- only one thread will be woken up. The runtime guarantees that
620 -- the woken thread completes its 'putMVar' operation.
622 -- * When multiple threads are blocked on an 'MVar', they are
623 -- woken up in FIFO order. This is useful for providing
624 -- fairness properties of abstractions built using 'MVar's.
626 putMVar :: MVar a -> a -> IO ()
627 putMVar (MVar mvar#) x = IO $ \ s# ->
628 case putMVar# mvar# x s# of
-- | Non-blocking variant of 'takeMVar'.  Returns at once: 'Nothing' when
-- the 'MVar' was empty, or @'Just' a@ when it held the value @a@.  After
-- 'tryTakeMVar' the 'MVar' is left empty.
tryTakeMVar :: MVar a -> IO (Maybe a)
tryTakeMVar (MVar mv) = IO $ \st0 ->
  case tryTakeMVar# mv st0 of
    (# st1, flag, contents #) -> case flag of
      0# -> (# st1, Nothing #)        -- MVar is empty
      _  -> (# st1, Just contents #)  -- MVar is full
-- | Non-blocking variant of 'putMVar'.  Tries to store the value in the
-- 'MVar' and reports 'True' on success, 'False' otherwise.
tryPutMVar :: MVar a -> a -> IO Bool
tryPutMVar (MVar mv) val = IO $ \st0 ->
  case tryPutMVar# mv val st0 of
    (# st1, flag #) -> case flag of
      0# -> (# st1, False #)
      _  -> (# st1, True #)
-- | Test whether the given 'MVar' is empty.
--
-- The returned boolean is only a snapshot of the 'MVar' state.  By the
-- time the caller acts on it, the 'MVar' may have been filled (or
-- emptied), so use this operation with great care.  Prefer
-- 'tryTakeMVar' where possible.
isEmptyMVar :: MVar a -> IO Bool
isEmptyMVar (MVar mv) = IO $ \st0 ->
  case isEmptyMVar# mv st0 of
    -- Any non-zero flag from the primop counts as "empty".
    (# st1, flag #) -> (# st1, not (flag ==# 0#) #)
-- | Attach a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr"
-- and "System.Mem.Weak" for more about finalizers.
addMVarFinalizer :: MVar a -> IO () -> IO ()
addMVarFinalizer (MVar mv) finalizer = IO $ \st0 ->
  -- A weak pointer keyed on the primitive MVar (with a unit value) carries
  -- the finalizer; the weak pointer itself is not kept.
  case mkWeak# mv () finalizer st0 of
    (# st1, _weak #) -> (# st1, () #)
667 withMVar :: MVar a -> (a -> IO b) -> IO b
671 b <- catchAny (unblock (io a))
672 (\e -> do putMVar m a; throw e)
678 %************************************************************************
680 \subsection{Thread waiting}
682 %************************************************************************
685 #ifdef mingw32_HOST_OS
687 -- Note: threadWaitRead and threadWaitWrite aren't really functional
688 -- on Win32, but left in there because lib code (still) uses them (the manner
689 -- in which they're used doesn't cause problems on a Win32 platform though.)
-- Unboxes the arguments, calls the asyncRead# primop, and boxes the two
-- Int# results (named len# and err# below) into a pair.
asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) = IO $ \st0 ->
  case asyncRead# fd isSock len buf st0 of
    (# st1, len#, err# #) -> (# st1, (I# len#, I# err#) #)
-- Unboxes the arguments, calls the asyncWrite# primop, and boxes the two
-- Int# results (named len# and err# below) into a pair.
asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) = IO $ \st0 ->
  case asyncWrite# fd isSock len buf st0 of
    (# st1, len#, err# #) -> (# st1, (I# len#, I# err#) #)
-- Calls the asyncDoProc# primop on the given procedure and parameter and
-- returns only the second of its two Int# results.
asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
asyncDoProc (FunPtr proc) (Ptr param) = IO $ \st0 ->
  case asyncDoProc# proc param st0 of
    -- The 'length' value is ignored; having all the async*# primops
    -- return the same result shape simplifies their implementation.
    (# st1, _len#, err# #) -> (# st1, I# err# #)
708 -- to aid the use of these primops by the IO Handle implementation,
709 -- provide the following convenience funs:
-- Byte-array front end for 'asyncRead': the read targets the array's
-- contents at byte offset @off@.  The array had better be pinned!
asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncReadBA fd isSock len off bufB = asyncRead fd isSock len (base `plusPtr` off)
  where
    -- Address of the array payload, obtained via byteArrayContents#.
    base = Ptr (byteArrayContents# (unsafeCoerce# bufB))
-- Byte-array front end for 'asyncWrite': the write is sourced from the
-- array's contents at byte offset @off@.
asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncWriteBA fd isSock len off bufB = asyncWrite fd isSock len (base `plusPtr` off)
  where
    -- Address of the array payload, obtained via byteArrayContents#.
    base = Ptr (byteArrayContents# (unsafeCoerce# bufB))
722 -- -----------------------------------------------------------------------------
725 -- | Block the current thread until data is available to read on the
726 -- given file descriptor (GHC only).
727 threadWaitRead :: Fd -> IO ()
729 #ifndef mingw32_HOST_OS
730 | threaded = waitForReadEvent fd
732 | otherwise = IO $ \s ->
733 case fromIntegral fd of { I# fd# ->
734 case waitRead# fd# s of { s -> (# s, () #)
737 -- | Block the current thread until data can be written to the
738 -- given file descriptor (GHC only).
739 threadWaitWrite :: Fd -> IO ()
741 #ifndef mingw32_HOST_OS
742 | threaded = waitForWriteEvent fd
744 | otherwise = IO $ \s ->
745 case fromIntegral fd of { I# fd# ->
746 case waitWrite# fd# s of { s -> (# s, () #)
749 -- | Suspends the current thread for a given number of microseconds
752 -- There is no guarantee that the thread will be rescheduled promptly
753 -- when the delay has expired, but the thread will never continue to
754 -- run /earlier/ than specified.
756 threadDelay :: Int -> IO ()
758 | threaded = waitForDelayEvent time
759 | otherwise = IO $ \s ->
760 case fromIntegral time of { I# time# ->
761 case delay# time# s of { s -> (# s, () #)
765 -- | Set the value of returned TVar to True after a given number of
766 -- microseconds. The caveats associated with threadDelay also apply.
768 registerDelay :: Int -> IO (TVar Bool)
770 | threaded = waitForDelayEventSTM usecs
771 | otherwise = error "registerDelay: requires -threaded"
773 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
775 waitForDelayEvent :: Int -> IO ()
776 waitForDelayEvent usecs = do
778 target <- calculateTarget usecs
779 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
783 -- Delays for use in STM
784 waitForDelayEventSTM :: Int -> IO (TVar Bool)
785 waitForDelayEventSTM usecs = do
786 t <- atomically $ newTVar False
787 target <- calculateTarget usecs
788 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
792 calculateTarget :: Int -> IO USecs
793 calculateTarget usecs = do
795 return $ now + (fromIntegral usecs)
798 -- ----------------------------------------------------------------------------
799 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
801 -- In the threaded RTS, we employ a single IO Manager thread to wait
802 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
803 -- and delays (threadDelay).
805 -- We can do this because in the threaded RTS the IO Manager can make
806 -- a non-blocking call to select(), so we don't have to do select() in
807 -- the scheduler as we have to in the non-threaded RTS. We get performance
808 -- benefits from doing it this way, because we only have to restart the select()
809 -- when a new request arrives, rather than doing one select() each time
810 -- around the scheduler loop. Furthermore, the scheduler can be simplified
811 -- by not having to check for completed IO requests.
813 -- Issues, possible problems:
815 -- - we might want bound threads to just do the blocking
816 -- operation rather than communicating with the IO manager
-- thread. This would prevent single-threaded programs which do
818 -- IO from requiring multiple OS threads. However, it would also
819 -- prevent bound threads waiting on IO from being killed or sent
-- - Apparently exec() doesn't work on Linux in a multithreaded program.
823 -- I couldn't repeat this.
825 -- - How do we handle signal delivery in the multithreaded RTS?
827 -- - forkProcess will kill the IO manager thread. Let's just
828 -- hope we don't need to do any blocking IO between fork & exec.
830 #ifndef mingw32_HOST_OS
832 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
833 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
837 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
838 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
840 #ifndef mingw32_HOST_OS
841 pendingEvents :: IORef [IOReq]
843 pendingDelays :: IORef [DelayReq]
844 -- could use a strict list or array here
845 {-# NOINLINE pendingEvents #-}
846 {-# NOINLINE pendingDelays #-}
847 (pendingEvents,pendingDelays) = unsafePerformIO $ do
-- The first time we schedule an IO request, the service thread will be
-- created.  When 'threaded' is True this forces the top-level
-- 'pendingEvents' binding; otherwise it does nothing.
ensureIOManagerIsRunning :: IO ()
ensureIOManagerIsRunning =
  if threaded
    then pendingEvents `seq` return ()
    else return ()
-- Insert a delay request into a list ordered by 'delayTime'.  The new
-- request goes in front of the first entry whose time is not earlier
-- than its own, or at the end if there is no such entry.
insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
insertDelay new queue = case queue of
  [] -> [new]
  (front : rest)
    | delayTime new <= delayTime front -> new : queue
    | otherwise                        -> front : insertDelay new rest
-- Extract the target time stored in either kind of delay request.
delayTime :: DelayReq -> USecs
delayTime req = case req of
  Delay    t _ -> t
  DelaySTM t _ -> t
-- XXX: move into GHC.IOBase from Data.IORef?
-- Unwraps the 'IORef' down to its primitive mutable variable and
-- delegates to the atomicModifyMutVar# primop.
atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORef (IORef (STRef var)) update =
  IO (\st -> atomicModifyMutVar# var update st)
876 foreign import ccall unsafe "getUSecOfDay"
877 getUSecOfDay :: IO USecs
-- Global flag, initially False.  'prodServiceThread' sets it to True and
-- only wakes the IO manager when it was previously False.
prodding :: IORef Bool
{-# NOINLINE prodding #-}
prodding = unsafePerformIO $ newIORef False
-- Mark the service thread as prodded and wake the IO manager, unless the
-- 'prodding' flag was already set.
prodServiceThread :: IO ()
prodServiceThread =
  atomicModifyIORef prodding (\old -> (True, old)) >>= \alreadyProdded ->
    if alreadyProdded then return () else wakeupIOManager
888 #ifdef mingw32_HOST_OS
889 -- ----------------------------------------------------------------------------
890 -- Windows IO manager thread
892 startIOManagerThread :: IO ()
893 startIOManagerThread = do
894 wakeup <- c_getIOManagerEvent
895 forkIO $ service_loop wakeup []
898 service_loop :: HANDLE -- read end of pipe
899 -> [DelayReq] -- current delay requests
902 service_loop wakeup old_delays = do
903 -- pick up new delay requests
904 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
905 let delays = foldr insertDelay old_delays new_delays
908 (delays', timeout) <- getDelay now delays
910 r <- c_WaitForSingleObject wakeup timeout
912 0xffffffff -> do c_maperrno; throwErrno "service_loop"
914 r <- c_readIOManagerEvent
917 _ | r == io_MANAGER_WAKEUP -> return False
918 _ | r == io_MANAGER_DIE -> return True
919 0 -> return False -- spurious wakeup
920 r -> do start_console_handler (r `shiftR` 1); return False
923 else service_cont wakeup delays'
925 _other -> service_cont wakeup delays' -- probably timeout
-- Reset the 'prodding' flag to False, then go round 'service_loop' again
-- with the same wakeup handle and the given delay queue.
service_cont wakeup delays =
  atomicModifyIORef prodding (\_ -> (False,False)) >>
  service_loop wakeup delays
-- Command codes for the IO manager event;
-- these must agree with rts/win32/ThrIOManager.c
io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
io_MANAGER_WAKEUP = 0xffffffff
io_MANAGER_DIE    = 0xfffffffe
939 -- these are sent to Services only.
942 deriving (Eq, Ord, Enum, Show, Read, Typeable)
944 start_console_handler :: Word32 -> IO ()
945 start_console_handler r =
946 case toWin32ConsoleEvent r of
947 Just x -> withMVar win32ConsoleHandler $ \handler -> do
952 toWin32ConsoleEvent ev =
954 0 {- CTRL_C_EVENT-} -> Just ControlC
955 1 {- CTRL_BREAK_EVENT-} -> Just Break
956 2 {- CTRL_CLOSE_EVENT-} -> Just Close
957 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
958 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
-- Global slot holding the console event handler.  It starts out holding
-- a placeholder that raises an error if it is ever called.
--
-- NOINLINE is needed because this is a top-level 'unsafePerformIO'
-- allocation: if the binding were inlined, each use site could create
-- its own 'MVar' instead of sharing one.
win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
{-# NOINLINE win32ConsoleHandler #-}
win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
-- Global reference holding a HANDLE, initialised to the null pointer.
-- Kept NOINLINE so that the 'unsafePerformIO' allocation is shared by
-- every use of 'stick'.
{-# NOINLINE stick #-}
stick :: IORef HANDLE
stick = unsafePerformIO $ newIORef nullPtr
969 hdl <- readIORef stick
970 c_sendIOManagerEvent io_MANAGER_WAKEUP
972 -- Walk the queue of pending delays, waking up any that have passed
973 -- and return the smallest delay to wait for. The queue of pending
974 -- delays is kept ordered.
-- The result pairs the delays that are still pending with the timeout to
-- hand to c_WaitForSingleObject.
975 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
-- nothing pending: use the iNFINITE timeout
976 getDelay now [] = return ([], iNFINITE)
977 getDelay now all@(d : rest)
979 Delay time m | now >= time -> do
982 DelaySTM time t | now >= time -> do
-- an expired STM delay is signalled by setting its TVar to True
983 atomically $ writeTVar t True
986 -- delay is in millisecs for WaitForSingleObject
987 let micro_seconds = delayTime d - now
-- adding 999 before dividing rounds up to whole milliseconds
988 milli_seconds = (micro_seconds + 999) `div` 1000
989 in return (all, fromIntegral milli_seconds)
991 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
992 -- available yet. We should move some Win32 functionality down here,
993 -- maybe as part of the grand reorganisation of the base package...
-- Timeout value that 'getDelay' returns when no delay is pending.
iNFINITE :: DWORD
iNFINITE = 0xFFFFFFFF -- urgh
-- Returns the HANDLE that the IO manager thread waits on.
999 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
1000 c_getIOManagerEvent :: IO HANDLE
-- Reads the message word that 'service_loop' dispatches on.
1002 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
1003 c_readIOManagerEvent :: IO Word32
-- Posts a message word to the IO manager.
1005 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
1006 c_sendIOManagerEvent :: Word32 -> IO ()
1008 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
-- Imported without an `unsafe` annotation, i.e. as a safe call (the FFI
-- default).
1011 foreign import stdcall "WaitForSingleObject"
1012 c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1015 -- ----------------------------------------------------------------------------
1016 -- Unix IO manager thread, using select()
-- Start the Unix IO manager: create the wakeup pipe, publish its write
-- end, and run 'service_loop' on the read end with freshly allocated
-- fd_set and timeval buffers and no pending requests or delays.
1018 startIOManagerThread :: IO ()
1019 startIOManagerThread = do
-- two-element array that c_pipe fills in with the pipe's descriptors
1020 allocaArray 2 $ \fds -> do
1021 throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1022 rd_end <- peekElemOff fds 0
1023 wr_end <- peekElemOff fds 1
-- the write end is stored in 'stick' and also handed to the RTS
1024 writeIORef stick (fromIntegral wr_end)
1025 c_setIOManagerPipe wr_end
-- scratch buffers that 'service_loop' passes around on every iteration
1027 allocaBytes sizeofFdSet $ \readfds -> do
1028 allocaBytes sizeofFdSet $ \writefds -> do
1029 allocaBytes sizeofTimeVal $ \timeval -> do
1030 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1034 :: Fd -- listen to this for wakeup calls
-- Main loop of the Unix IO manager thread.  Each iteration collects the
-- newly queued IO and delay requests, builds the fd_sets, runs select(),
-- handles any message that arrived on the wakeup descriptor, completes
-- the requests that became ready, and then loops with the remainder.
1041 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1043 -- pick up new IO requests
1044 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1045 let reqs = new_reqs ++ old_reqs
1047 -- pick up new delay requests
1048 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1049 let delays = foldr insertDelay old_delays new_delays
1051 -- build the FDSets for select()
-- the wakeup descriptor is always part of the read set
1054 fdSet wakeup readfds
1055 maxfd <- buildFdSets 0 readfds writefds reqs
1057 -- perform the select()
-- do_select returns (wakeup_all, remaining delays); wakeup_all is True
-- only on the EBADF path below
1058 let do_select delays = do
1059 -- check the current time and wake up any thread in
1060 -- threadDelay whose timeout has expired. Also find the
1061 -- timeout value for the select() call.
1063 (delays', timeout) <- getDelay now ptimeval delays
1065 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
1071 _ | err == eINTR -> do_select delays'
1072 -- EINTR: just redo the select()
1073 _ | err == eBADF -> return (True, delays)
1074 -- EBADF: one of the file descriptors is closed or bad,
1075 -- we don't know which one, so wake everyone up.
1076 _ | otherwise -> throwErrno "select"
1077 -- otherwise (ENOMEM or EINVAL) something has gone
1078 -- wrong; report the error.
1080 return (False,delays')
1082 (wakeup_all,delays') <- do_select delays
1085 if wakeup_all then return False
1087 b <- fdIsSet wakeup readfds
1090 else alloca $ \p -> do
-- read a single byte from the wakeup descriptor; the read's result is discarded
1091 c_read (fromIntegral wakeup) p 1; return ()
1094 _ | s == io_MANAGER_WAKEUP -> return False
1095 _ | s == io_MANAGER_DIE -> return True
-- any other byte is used as an index into the signal_handlers table,
-- which is read while holding signalHandlerLock
1096 _ -> withMVar signalHandlerLock $ \_ -> do
1097 handler_tbl <- peek handlers
1098 sp <- peekElemOff handler_tbl (fromIntegral s)
1099 io <- deRefStablePtr sp
1103 if exit then return () else do
-- reset the prodding flag before going round again
1105 atomicModifyIORef prodding (\_ -> (False,False))
-- on the EBADF path every waiter is woken and all requests are dropped;
-- otherwise completeRequests returns the requests still to be served
1107 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1108 else completeRequests reqs readfds writefds []
1110 service_loop wakeup readfds writefds ptimeval reqs' delays'
-- Message bytes recognised by the IO manager's 'service_loop'.
io_MANAGER_WAKEUP, io_MANAGER_DIE :: CChar
io_MANAGER_WAKEUP = 0xff
io_MANAGER_DIE    = 0xfe
-- Global reference initialised to 0; 'startIOManagerThread' later stores
-- the write end of the wakeup pipe in it.  Kept NOINLINE so the
-- 'unsafePerformIO' allocation is shared by every use.
{-# NOINLINE stick #-}
stick = unsafePerformIO $ newIORef 0
-- Wake the IO manager thread by writing the one-byte io_MANAGER_WAKEUP
-- message to the descriptor stored in 'stick'.  The result of the write
-- is discarded.
wakeupIOManager :: IO ()
wakeupIOManager =
  readIORef stick >>= \wakeupFd ->
    with io_MANAGER_WAKEUP $ \msgPtr ->
      c_write (fromIntegral wakeupFd) msgPtr 1 >> return ()
-- Lock used to protect concurrent access to signal_handlers.  A symptom
-- of the race condition it guards against is #1922; that bug was reported
-- on Windows, but a similar bug also exists on Unix.
--
-- NOINLINE is required here: the MVar is created with 'unsafePerformIO',
-- and if this binding were inlined each use site could end up with its
-- own lock, which would defeat the mutual exclusion.
signalHandlerLock :: MVar ()
{-# NOINLINE signalHandlerLock #-}
signalHandlerLock = unsafePerformIO (newMVar ())
-- Address of the C variable signal_handlers; 'service_loop' peeks through
-- it to find the StablePtr of the IO action to run for a given message.
1131 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
-- Called by 'startIOManagerThread' with the write end of the wakeup pipe.
1133 foreign import ccall "setIOManagerPipe"
1134 c_setIOManagerPipe :: CInt -> IO ()
1136 -- -----------------------------------------------------------------------------
-- Walk the list of pending IO requests, tracking the largest descriptor
-- seen so far in the first argument, and return that maximum once the
-- list is exhausted.  A descriptor that is >= fD_SETSIZE is rejected with
-- 'error' instead of being processed.
1139 buildFdSets maxfd readfds writefds [] = return maxfd
1140 buildFdSets maxfd readfds writefds (Read fd m : reqs)
1141 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1144 buildFdSets (max maxfd fd) readfds writefds reqs
1145 buildFdSets maxfd readfds writefds (Write fd m : reqs)
1146 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1149 buildFdSets (max maxfd fd) readfds writefds reqs
-- Go through the pending requests after select() has returned.  A Read
-- request is tested against readfds and a Write request against writefds.
-- A request that is completed has its MVar filled with () and is dropped;
-- every other request is consed onto the accumulator reqs', which is
-- returned at the end (so the survivors come back in reverse order).
1151 completeRequests [] _ _ reqs' = return reqs'
1152 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1153 b <- fdIsSet fd readfds
1155 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1156 else completeRequests reqs readfds writefds (Read fd m : reqs')
1157 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1158 b <- fdIsSet fd writefds
1160 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1161 else completeRequests reqs readfds writefds (Write fd m : reqs')
-- Fill the MVar of every pending request with (), in list order,
-- whether it is a Read or a Write request.
wakeupAll pending =
  case pending of
    []                  -> return ()
    Read  _ done : rest -> putMVar done () >> wakeupAll rest
    Write _ done : rest -> putMVar done () >> wakeupAll rest
-- Register interest in reading from the given descriptor by pushing a
-- Read request (carrying the MVar m) onto the front of pendingEvents.
1167 waitForReadEvent :: Fd -> IO ()
1168 waitForReadEvent fd = do
1170 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
-- Register interest in writing to the given descriptor by pushing a
-- Write request (carrying the MVar m) onto the front of pendingEvents.
1174 waitForWriteEvent :: Fd -> IO ()
1175 waitForWriteEvent fd = do
1177 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1181 -- -----------------------------------------------------------------------------
1184 -- Walk the queue of pending delays, waking up any that have passed
1185 -- and return the smallest delay to wait for. The queue of pending
1186 -- delays is kept ordered.
-- The result pairs the delays that are still pending with the timeval
-- pointer to pass to select().
1187 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
-- nothing pending: return a null timeval pointer
1188 getDelay now ptimeval [] = return ([],nullPtr)
1189 getDelay now ptimeval all@(d : rest)
1191 Delay time m | now >= time -> do
1193 getDelay now ptimeval rest
1194 DelaySTM time t | now >= time -> do
-- an expired STM delay is signalled by setting its TVar to True
1195 atomically $ writeTVar t True
1196 getDelay now ptimeval rest
-- the head has not expired: store the time remaining until it does in
-- the caller's timeval buffer
1198 setTimevalTicks ptimeval (delayTime d - now)
1199 return (all,ptimeval)
-- Opaque tag type: only ever used behind a Ptr.
1201 newtype CTimeVal = CTimeVal ()
-- Number of bytes to allocate for a timeval buffer (see the allocaBytes
-- call in 'startIOManagerThread').
1203 foreign import ccall unsafe "sizeofTimeVal"
1204 sizeofTimeVal :: Int
-- Fill in a timeval buffer from a USecs value.
1206 foreign import ccall unsafe "setTimevalTicks"
1207 setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1210 On Win32 we're going to have a single Pipe, and a
1211 waitForSingleObject with the delay time. For signals, we send a
1212 byte down the pipe just like on Unix.
1215 -- ----------------------------------------------------------------------------
1216 -- select() interface
1218 -- ToDo: move to System.Posix.Internals?
-- Opaque tag type: only ever used behind a Ptr.
1220 newtype CFdSet = CFdSet ()
-- select() is imported as a safe call, unlike the other imports in this section.
1222 foreign import ccall safe "select"
1223 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
-- The fd_set capacity as exported by the C helper hsFD_SETSIZE.
1226 foreign import ccall unsafe "hsFD_SETSIZE"
1227 c_fD_SETSIZE :: CInt
-- The same limit converted with fromIntegral; 'buildFdSets' compares
-- descriptors against it.
1230 fD_SETSIZE = fromIntegral c_fD_SETSIZE
foreign import ccall unsafe "hsFD_CLR"
  c_fdClr :: CInt -> Ptr CFdSet -> IO ()

-- Unwrap the 'Fd' and hand the raw descriptor, together with the fd_set
-- pointer, to the C helper hsFD_CLR.
fdClr :: Fd -> Ptr CFdSet -> IO ()
fdClr (Fd rawFd) = c_fdClr rawFd
foreign import ccall unsafe "hsFD_ISSET"
  c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt

-- Unwrap the 'Fd' and ask the C helper hsFD_ISSET about the raw
-- descriptor; the helper's CInt result is returned unchanged.
fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
fdIsSet (Fd rawFd) = c_fdIsSet rawFd
foreign import ccall unsafe "hsFD_SET"
  c_fdSet :: CInt -> Ptr CFdSet -> IO ()

-- Unwrap the 'Fd' and hand the raw descriptor, together with the fd_set
-- pointer, to the C helper hsFD_SET.
fdSet :: Fd -> Ptr CFdSet -> IO ()
fdSet (Fd rawFd) = c_fdSet rawFd
-- C helper hsFD_ZERO, applied directly to an fd_set pointer (no Fd
-- argument, so no Haskell wrapper is needed).
1250 foreign import ccall unsafe "hsFD_ZERO"
1251 fdZero :: Ptr CFdSet -> IO ()
-- C helper reporting the size of an fd_set.
1253 foreign import ccall unsafe "sizeof_fd_set"
-- Run the RTS stack-overflow hook.  The value handed back afterwards is
-- 'undefined', which lets the result type stay fully polymorphic.
reportStackOverflow :: IO a
reportStackOverflow = callStackOverflowHook >> return undefined
-- Report an exception through the currently installed uncaught-exception
-- handler, which is looked up afresh on every call.
1261 reportError :: SomeException -> IO a
1263 handler <- getUncaughtExceptionHandler
-- C entry point "stackOverflow", used by 'reportStackOverflow'.
1267 -- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove
1268 -- the unsafe below.
1269 foreign import ccall unsafe "stackOverflow"
1270 callStackOverflowHook :: IO ()
-- Global reference holding the handler for uncaught exceptions; it starts
-- out as 'defaultHandler'.  NOINLINE keeps the 'unsafePerformIO'
-- allocation shared by every use.
1272 {-# NOINLINE uncaughtExceptionHandler #-}
1273 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1274 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
-- Default handler: flush stdout, build a message for the exception and
-- pass it to errorBelch.
1276 defaultHandler :: SomeException -> IO ()
1277 defaultHandler se@(SomeException ex) = do
-- flush stdout first; any exception raised by the flush is ignored
1278 (hFlush stdout) `catchAny` (\ _ -> return ())
-- message: a fixed text for Deadlock, the carried string for ErrorCall,
-- and the shown exception for anything else
1279 let msg = case cast ex of
1280 Just Deadlock -> "no threads to run: infinite loop or deadlock?"
1281 _ -> case cast ex of
1282 Just (ErrorCall s) -> s
1283 _ -> showsPrec 0 se ""
-- the message is passed as an argument to a constant "%s" format string
1284 withCString "%s" $ \cfmt ->
1285 withCString msg $ \cmsg ->
1286 errorBelch cfmt cmsg
1288 -- don't use errorBelch() directly, because we cannot call varargs functions
-- errorBelch2 is therefore imported with a fixed two-string signature
-- (format string, message).
1290 foreign import ccall unsafe "HsBase.h errorBelch2"
1291 errorBelch :: CString -> CString -> IO ()
-- Install a new handler for uncaught exceptions by overwriting the
-- contents of 'uncaughtExceptionHandler'.
setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
setUncaughtExceptionHandler newHandler =
  writeIORef uncaughtExceptionHandler newHandler
-- Fetch the handler currently stored in 'uncaughtExceptionHandler'.
getUncaughtExceptionHandler :: IO (SomeException -> IO ())
getUncaughtExceptionHandler =
  readIORef uncaughtExceptionHandler