2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_HADDOCK not-home #-}
4 -----------------------------------------------------------------------------
7 -- Copyright : (c) The University of Glasgow, 1994-2002
8 -- License : see libraries/base/LICENSE
10 -- Maintainer : cvs-ghc@haskell.org
11 -- Stability : internal
12 -- Portability : non-portable (GHC extensions)
14 -- Basic concurrency stuff.
16 -----------------------------------------------------------------------------
18 -- No: #hide, because bits of this module are exposed by the stm package.
19 -- However, we don't want this module to be the home location for the
20 -- bits it exports, we'd rather have Control.Concurrent and the other
21 -- higher level modules be the home. Hence:
29 -- * Forking and suchlike
, forkIO -- :: IO () -> IO ThreadId
, forkOnIO -- :: Int -> IO () -> IO ThreadId
, numCapabilities -- :: Int
, childHandler -- :: SomeException -> IO ()
, myThreadId -- :: IO ThreadId
, killThread -- :: ThreadId -> IO ()
, throwTo -- :: ThreadId -> SomeException -> IO ()
37 , par -- :: a -> b -> b
38 , pseq -- :: a -> b -> b
40 , labelThread -- :: ThreadId -> String -> IO ()
42 , ThreadStatus(..), BlockReason(..)
43 , threadStatus -- :: ThreadId -> IO ThreadStatus
46 , threadDelay -- :: Int -> IO ()
47 , registerDelay -- :: Int -> IO (TVar Bool)
, threadWaitRead -- :: Fd -> IO ()
, threadWaitWrite -- :: Fd -> IO ()
53 , newMVar -- :: a -> IO (MVar a)
54 , newEmptyMVar -- :: IO (MVar a)
55 , takeMVar -- :: MVar a -> IO a
56 , putMVar -- :: MVar a -> a -> IO ()
57 , tryTakeMVar -- :: MVar a -> IO (Maybe a)
58 , tryPutMVar -- :: MVar a -> a -> IO Bool
59 , isEmptyMVar -- :: MVar a -> IO Bool
60 , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
64 , atomically -- :: STM a -> IO a
66 , orElse -- :: STM a -> STM a -> STM a
, catchSTM -- :: STM a -> (SomeException -> STM a) -> STM a
68 , alwaysSucceeds -- :: STM a -> STM ()
69 , always -- :: STM Bool -> STM ()
71 , newTVar -- :: a -> STM (TVar a)
, newTVarIO -- :: a -> IO (TVar a)
, readTVar -- :: TVar a -> STM a
, writeTVar -- :: TVar a -> a -> STM ()
75 , unsafeIOToSTM -- :: IO a -> STM a
78 #ifdef mingw32_HOST_OS
79 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
80 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
81 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
83 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
84 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
87 #ifndef mingw32_HOST_OS
91 , ensureIOManagerIsRunning
93 #ifdef mingw32_HOST_OS
98 , reportError, reportStackOverflow
101 import System.Posix.Types
102 #ifndef mingw32_HOST_OS
103 import System.Posix.Internals
112 import GHC.Num ( Num(..) )
113 import GHC.Real ( fromIntegral, div )
114 #ifndef mingw32_HOST_OS
115 import GHC.Base ( Int(..) )
117 #ifdef mingw32_HOST_OS
118 import GHC.Read ( Read )
119 import GHC.Enum ( Enum )
121 import GHC.Exception ( SomeException(..), throw )
122 import GHC.Pack ( packCString# )
123 import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) )
125 import GHC.Show ( Show(..), showString )
128 import Control.Exception hiding (throwTo)
130 infixr 0 `par`, `pseq`
133 %************************************************************************
135 \subsection{@ThreadId@, @par@, and @fork@}
137 %************************************************************************
140 data ThreadId = ThreadId ThreadId# deriving( Typeable )
141 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
142 -- But since ThreadId# is unlifted, the Weak type must use open
145 A 'ThreadId' is an abstract type representing a handle to a thread.
146 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
147 the 'Ord' instance implements an arbitrary total ordering over
148 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
149 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
150 useful when debugging or diagnosing the behaviour of a concurrent
153 /Note/: in GHC, if you have a 'ThreadId', you essentially have
154 a pointer to the thread itself. This means the thread itself can\'t be
155 garbage collected until you drop the 'ThreadId'.
156 This misfeature will hopefully be corrected at a later date.
158 /Note/: Hugs does not provide any operations on other threads;
159 it defines 'ThreadId' as a synonym for ().
162 instance Show ThreadId where
164 showString "ThreadId " .
165 showsPrec d (getThreadId (id2TSO t))
-- Binding to the C function @rts_getThreadId@ (unsafe ccall).  The 'Show'
-- instance for 'ThreadId' prints the 'CInt' this returns.
foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
-- Unwrap a 'ThreadId' to the primitive 'ThreadId#' it carries.
id2TSO :: ThreadId -> ThreadId#
id2TSO tid = case tid of
  ThreadId tso# -> tso#
-- Binding to the C function @cmp_thread@ (unsafe ccall).  'cmpThread'
-- inspects the 'CInt' result to produce an 'Ordering'.
foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
175 cmpThread :: ThreadId -> ThreadId -> Ordering
177 case cmp_thread (id2TSO t1) (id2TSO t2) of
182 instance Eq ThreadId where
184 case t1 `cmpThread` t2 of
188 instance Ord ThreadId where
192 Sparks off a new thread to run the 'IO' computation passed as the
193 first argument, and returns the 'ThreadId' of the newly created
196 The new thread will be a lightweight thread; if you want to use a foreign
197 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
199 GHC note: the new thread inherits the /blocked/ state of the parent
200 (see 'Control.Exception.block').
forkIO :: IO () -> IO ThreadId
forkIO action = IO $ \ s0 ->
   -- The child runs the action wrapped so that any exception escaping it
   -- is passed to 'childHandler'.
   case fork# (catchException action childHandler) s0 of
     (# s1, tid# #) -> (# s1, ThreadId tid# #)
209 Like 'forkIO', but lets you specify on which CPU the thread is
210 created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
211 will stay on the same CPU for its entire lifetime (`forkIO` threads
212 can migrate between CPUs according to the scheduling policy).
213 `forkOnIO` is useful for overriding the scheduling policy when you
214 know in advance how best to distribute the threads.
216 The `Int` argument specifies the CPU number; it is interpreted modulo
217 'numCapabilities' (note that it actually specifies a capability number
218 rather than a CPU number, but to a first approximation the two are
forkOnIO :: Int -> IO () -> IO ThreadId
forkOnIO (I# cap#) action = IO $ \ s0 ->
   -- Same exception wrapping as 'forkIO'; the unboxed capability number is
   -- handed straight to the forkOn# primitive.
   case forkOn# cap# (catchException action childHandler) s0 of
     (# s1, tid# #) -> (# s1, ThreadId tid# #)
227 -- | the value passed to the @+RTS -N@ flag. This is the number of
228 -- Haskell threads that can run truly simultaneously at any given
229 -- time, and is typically set to the number of physical CPU cores on
numCapabilities :: Int
numCapabilities =
  -- Reads the C variable behind 'n_capabilities' and converts the 'CInt'
  -- to an 'Int'.
  unsafePerformIO (peek n_capabilities >>= \count -> return (fromIntegral count))
-- Address of the C variable @n_capabilities@; 'numCapabilities' peeks it.
foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
-- Handler installed around every forked action.  It delegates to
-- 'real_handler'; an exception raised while handling is fed back through
-- 'childHandler' itself.
childHandler :: SomeException -> IO ()
childHandler err = real_handler err `catchException` childHandler
241 real_handler :: SomeException -> IO ()
242 real_handler se@(SomeException ex) =
243 -- ignore thread GC and killThread exceptions:
245 Just BlockedOnDeadMVar -> return ()
247 Just BlockedIndefinitely -> return ()
249 Just ThreadKilled -> return ()
251 -- report all others:
252 Just StackOverflow -> reportStackOverflow
255 {- | 'killThread' terminates the given thread (GHC only).
256 Any work already done by the thread isn\'t
257 lost: the computation is suspended until required by another thread.
258 The memory used by the thread will be garbage collected if it isn\'t
259 referenced from anywhere. The 'killThread' function is defined in
> killThread tid = throwTo tid (toException ThreadKilled)
killThread :: ThreadId -> IO ()
killThread tid = throwTo tid killed
  where
    -- 'ThreadKilled' wrapped as a 'SomeException', as this module's
    -- 'throwTo' requires.
    killed = toException ThreadKilled
268 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
270 'throwTo' does not return until the exception has been raised in the
272 The calling thread can thus be certain that the target
273 thread has received the exception. This is a useful property to know
274 when dealing with race conditions: eg. if there are two threads that
275 can kill each other, it is guaranteed that only one of the threads
276 will get to kill the other.
278 If the target thread is currently making a foreign call, then the
279 exception will not be raised (and hence 'throwTo' will not return)
280 until the call has completed. This is the case regardless of whether
281 the call is inside a 'block' or not.
283 Important note: the behaviour of 'throwTo' differs from that described in
284 the paper \"Asynchronous exceptions in Haskell\"
285 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
286 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
287 a more synchronous design in which 'throwTo' does not return until the exception
288 is received by the target thread. The trade-off is discussed in Section 8 of the paper.
289 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
There is currently no guarantee that the exception delivered by 'throwTo' will be
delivered at the first possible opportunity. In particular, a thread may
unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
a pending 'throwTo'. This is arguably undesirable behaviour.
-- XXX This is duplicated in Control.{Old,}Exception
throwTo :: ThreadId -> SomeException -> IO ()
throwTo (ThreadId tid#) exn = IO $ \ s0 ->
   -- killThread# yields only the new state token, so the unit result is
   -- supplied here.
   case killThread# tid# exn s0 of
     s1 -> (# s1, () #)
-- | Returns the 'ThreadId' of the calling thread (GHC only).
myThreadId :: IO ThreadId
myThreadId = IO $ \s0 ->
   case myThreadId# s0 of
     (# s1, tid# #) -> (# s1, ThreadId tid# #)
309 -- |The 'yield' action allows (forces, in a co-operative multitasking
310 -- implementation) a context-switch to any other currently runnable
311 -- threads (if any), and is occasionally useful when implementing
312 -- concurrency abstractions.
315 case (yield# s) of s1 -> (# s1, () #)
317 {- | 'labelThread' stores a string as identifier for this thread if
318 you built a RTS with debugging support. This identifier will be used in
319 the debugging output to make distinction of different threads easier
320 (otherwise you only have the thread state object\'s address in the heap).
322 Other applications like the graphical Concurrent Haskell Debugger
323 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
324 'labelThread' for their purposes as well.
labelThread :: ThreadId -> String -> IO ()
labelThread (ThreadId tid#) label = IO $ \ s0 ->
   -- The label is packed into a byte array and passed to the primitive by
   -- address.
   case labelThread# tid# (byteArrayContents# (packCString# label)) s0 of
     s1 -> (# s1, () #)
333 -- Nota Bene: 'pseq' used to be 'seq'
334 -- but 'seq' is now defined in PrelGHC
336 -- "pseq" is defined a bit weirdly (see below)
338 -- The reason for the strange "lazy" call is that
339 -- it fools the compiler into thinking that pseq and par are non-strict in
340 -- their second argument (even if it inlines pseq at the call site).
341 -- If it thinks pseq is strict in "y", then it often evaluates
342 -- "y" before "x", which is totally wrong.
-- The 'lazy' wrapper keeps the second argument from looking strict.
pseq a b = seq a (lazy b)
-- Apply par# to the first argument, then return the second via 'lazy'.
par a b = case par# a of
  _ -> lazy b
-- ^blocked on an 'MVar'
357 -- ^blocked on a computation in progress by another thread
359 -- ^blocked in 'throwTo'
361 -- ^blocked in 'retry' in an STM transaction
362 | BlockedOnForeignCall
363 -- ^currently in a foreign call
365 -- ^blocked on some other resource. Without @-threaded@,
366 -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
367 -- they show up as 'BlockedOnMVar'.
368 deriving (Eq,Ord,Show)
370 -- | The current status of a thread
373 -- ^the thread is currently runnable or running
375 -- ^the thread has finished
376 | ThreadBlocked BlockReason
377 -- ^the thread is blocked on some resource
379 -- ^the thread received an uncaught exception
380 deriving (Eq,Ord,Show)
threadStatus :: ThreadId -> IO ThreadStatus
threadStatus (ThreadId tid#) = IO $ \s0 ->
   case threadStatus# tid# s0 of
     (# s1, code# #) -> (# s1, decode (I# code#) #)
   where
        -- NB. keep these in sync with includes/Constants.h
     decode :: Int -> ThreadStatus
     decode code = case code of
        0  -> ThreadRunning
        1  -> ThreadBlocked BlockedOnMVar
        2  -> ThreadBlocked BlockedOnBlackHole
        3  -> ThreadBlocked BlockedOnException
        7  -> ThreadBlocked BlockedOnSTM
        -- codes 11 and 12 are both reported as a foreign call
        11 -> ThreadBlocked BlockedOnForeignCall
        12 -> ThreadBlocked BlockedOnForeignCall
        16 -> ThreadFinished
        17 -> ThreadDied
        -- every other code is lumped together
        _  -> ThreadBlocked BlockedOnOther
401 %************************************************************************
403 \subsection[stm]{Transactional heap operations}
405 %************************************************************************
407 TVars are shared memory locations which support atomic memory
411 -- |A monad supporting atomic memory transactions.
412 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
414 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
417 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
-- 'fmap' is defined through the monad operations: run the action, then
-- return the transformed result.
instance Functor STM where
   fmap f action = action >>= \a -> return (f a)
422 instance Monad STM where
423 {-# INLINE return #-}
427 return x = returnSTM x
428 m >>= k = bindSTM m k
430 bindSTM :: STM a -> (a -> STM b) -> STM b
431 bindSTM (STM m) k = STM ( \s ->
433 (# new_s, a #) -> unSTM (k a) new_s
436 thenSTM :: STM a -> STM b -> STM b
437 thenSTM (STM m) k = STM ( \s ->
439 (# new_s, a #) -> unSTM k new_s
-- Lift a value into 'STM' without touching the state token.
returnSTM :: a -> STM a
returnSTM val = STM $ \s -> (# s, val #)
445 -- | Unsafely performs IO in the STM monad. Beware: this is a highly
446 -- dangerous thing to do.
448 -- * The STM implementation will often run transactions multiple
449 -- times, so you need to be prepared for this if your IO has any
452 -- * The STM implementation will abort transactions that are known to
453 -- be invalid and need to be restarted. This may happen in the middle
454 -- of `unsafeIOToSTM`, so make sure you don't acquire any resources
455 -- that need releasing (exception handlers are ignored when aborting
456 -- the transaction). That includes doing any IO using Handles, for
457 -- example. Getting this wrong will probably lead to random deadlocks.
459 -- * The transaction may have seen an inconsistent view of memory when
460 -- the IO runs. Invariants that you expect to be true throughout
461 -- your program may not be true inside a transaction, due to the
462 -- way transactions are implemented. Normally this wouldn't be visible
463 -- to the programmer, but using `unsafeIOToSTM` can expose it.
unsafeIOToSTM :: IO a -> STM a
unsafeIOToSTM io = case io of
  -- Re-wrap the same state-passing function under the 'STM' constructor.
  IO m -> STM m
468 -- |Perform a series of STM actions atomically.
470 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
471 -- Any attempt to do so will result in a runtime error. (Reason: allowing
472 -- this would effectively allow a transaction inside a transaction, depending
473 -- on exactly when the thunk is evaluated.)
475 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
476 -- and which allows top-level TVars to be allocated.
atomically :: STM a -> IO a
atomically (STM txn) = IO $ \s -> atomically# txn s
481 -- |Retry execution of the current memory transaction because it has seen
482 -- values in TVars which mean that it should not continue (e.g. the TVars
483 -- represent a shared buffer that is now empty). The implementation may
484 -- block the thread until one of the TVars that it has read from has been
-- updated. (GHC only)
retry = STM (\s -> retry# s)
489 -- |Compose two alternative STM actions (GHC only). If the first action
490 -- completes without retrying then it forms the result of the orElse.
491 -- Otherwise, if the first action retries, then the second action is
492 -- tried in its place. If both actions retry then the orElse as a
orElse :: STM a -> STM a -> STM a
orElse (STM first) alternative = STM (\s -> catchRetry# first second s)
  where
    -- the fallback action, unwrapped to its state-passing function
    second = unSTM alternative
-- |Exception handling within STM actions.
catchSTM :: STM a -> (SomeException -> STM a) -> STM a
catchSTM (STM body) handler = STM $ \s -> catchSTM# body runHandler s
  where
    -- the handler, unwrapped to the state-passing form catchSTM# takes
    runHandler exn = unSTM (handler exn)
-- | Low-level primitive on which always and alwaysSucceeds are built.
-- checkInv differs from these in that (i) the invariant is not
-- checked when checkInv is called, only at the end of this and
-- subsequent transactions, (ii) the invariant failure is indicated
-- by raising an exception.
checkInv :: STM a -> STM ()
checkInv (STM inv) = STM $ \s -> check# inv s
509 -- | alwaysSucceeds adds a new invariant that must be true when passed
510 -- to alwaysSucceeds, at the end of the current transaction, and at
511 -- the end of every subsequent transaction. If it fails at any
512 -- of those points then the transaction violating it is aborted
513 -- and the exception raised by the invariant is propagated.
514 alwaysSucceeds :: STM a -> STM ()
515 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () )
-- | always is a variant of alwaysSucceeds in which the invariant is
-- expressed as an STM Bool action that must return True.  Returning
-- False or raising an exception are both treated as invariant failures.
--
-- A False result is turned into an 'error' call inside the invariant, so
-- both failure modes reach 'alwaysSucceeds' as an exception.
always :: STM Bool -> STM ()
always i = alwaysSucceeds ( do v <- i
                               if (v) then return () else ( error "Transactional invariant violation" ) )
525 -- |Shared memory locations that support atomic memory transactions.
526 data TVar a = TVar (TVar# RealWorld a)
528 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
-- Two 'TVar's are equal exactly when sameTVar# says so.
instance Eq (TVar a) where
        TVar lhs# == TVar rhs# = sameTVar# lhs# rhs#
-- |Create a new TVar holding a value supplied
newTVar :: a -> STM (TVar a)
newTVar initial = STM $ \s0 ->
    case newTVar# initial s0 of
         (# s1, tv# #) -> (# s1, TVar tv# #)
539 -- |@IO@ version of 'newTVar'. This is useful for creating top-level
540 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
541 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
newTVarIO :: a -> IO (TVar a)
newTVarIO initial = IO $ \s0 ->
    -- Same primitive as 'newTVar', but run directly in 'IO'.
    case newTVar# initial s0 of
         (# s1, tv# #) -> (# s1, TVar tv# #)
-- |Return the current value stored in a TVar
readTVar :: TVar a -> STM a
readTVar (TVar tv#) = STM (\s -> readTVar# tv# s)
552 -- |Write the supplied value into a TVar
553 writeTVar :: TVar a -> a -> STM ()
554 writeTVar (TVar tvar#) val = STM $ \s1# ->
555 case writeTVar# tvar# val s1# of
560 %************************************************************************
562 \subsection[mvars]{M-Structures}
564 %************************************************************************
566 M-Vars are rendezvous points for concurrent threads. They begin
567 empty, and any attempt to read an empty M-Var blocks. When an M-Var
568 is written, a single blocked thread may be freed. Reading an M-Var
569 toggles its state from full back to empty. Therefore, any value
570 written to an M-Var may only be read once. Multiple reads and writes
571 are allowed, but there must be at least one read between any two
575 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
577 -- |Create an 'MVar' which is initially empty.
578 newEmptyMVar :: IO (MVar a)
579 newEmptyMVar = IO $ \ s# ->
581 (# s2#, svar# #) -> (# s2#, MVar svar# #)
583 -- |Create an 'MVar' which contains the supplied value.
584 newMVar :: a -> IO (MVar a)
586 newEmptyMVar >>= \ mvar ->
587 putMVar mvar value >>
590 -- |Return the contents of the 'MVar'. If the 'MVar' is currently
591 -- empty, 'takeMVar' will wait until it is full. After a 'takeMVar',
592 -- the 'MVar' is left empty.
594 -- There are two further important properties of 'takeMVar':
596 -- * 'takeMVar' is single-wakeup. That is, if there are multiple
597 -- threads blocked in 'takeMVar', and the 'MVar' becomes full,
598 -- only one thread will be woken up. The runtime guarantees that
599 -- the woken thread completes its 'takeMVar' operation.
601 -- * When multiple threads are blocked on an 'MVar', they are
602 -- woken up in FIFO order. This is useful for providing
603 -- fairness properties of abstractions built using 'MVar's.
takeMVar :: MVar a -> IO a
takeMVar (MVar mv#) = IO (\s -> takeMVar# mv# s)
608 -- |Put a value into an 'MVar'. If the 'MVar' is currently full,
609 -- 'putMVar' will wait until it becomes empty.
611 -- There are two further important properties of 'putMVar':
613 -- * 'putMVar' is single-wakeup. That is, if there are multiple
614 -- threads blocked in 'putMVar', and the 'MVar' becomes empty,
615 -- only one thread will be woken up. The runtime guarantees that
616 -- the woken thread completes its 'putMVar' operation.
618 -- * When multiple threads are blocked on an 'MVar', they are
619 -- woken up in FIFO order. This is useful for providing
620 -- fairness properties of abstractions built using 'MVar's.
622 putMVar :: MVar a -> a -> IO ()
623 putMVar (MVar mvar#) x = IO $ \ s# ->
624 case putMVar# mvar# x s# of
-- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
-- returns immediately, with 'Nothing' if the 'MVar' was empty, or
-- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
-- the 'MVar' is left empty.
tryTakeMVar :: MVar a -> IO (Maybe a)
tryTakeMVar (MVar mv#) = IO $ \ s0 ->
    case tryTakeMVar# mv# s0 of
        (# s1, 0#, _   #) -> (# s1, Nothing  #)      -- MVar is empty
        (# s1, _,  val #) -> (# s1, Just val #)      -- MVar is full
-- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
-- attempts to put the value @a@ into the 'MVar', returning 'True' if
-- it was successful, or 'False' otherwise.
tryPutMVar :: MVar a -> a -> IO Bool
tryPutMVar (MVar mv#) val = IO $ \ s0 ->
    case tryPutMVar# mv# val s0 of
        (# s1, 0# #) -> (# s1, False #)   -- primitive returned 0
        (# s1, _  #) -> (# s1, True  #)   -- any other result
646 -- |Check whether a given 'MVar' is empty.
648 -- Notice that the boolean value returned is just a snapshot of
649 -- the state of the MVar. By the time you get to react on its result,
650 -- the MVar may have been filled (or emptied) - so be extremely
651 -- careful when using this operation. Use 'tryTakeMVar' instead if possible.
isEmptyMVar :: MVar a -> IO Bool
isEmptyMVar (MVar mv#) = IO $ \ s0 ->
    case isEmptyMVar# mv# s0 of
        -- a non-zero flag from the primitive means "empty"
        (# s1, isEmpty# #) -> (# s1, not (isEmpty# ==# 0#) #)
-- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
-- "System.Mem.Weak" for more about finalizers.
addMVarFinalizer :: MVar a -> IO () -> IO ()
addMVarFinalizer (MVar mv#) finalizer = IO $ \s0 ->
  -- The weak pointer returned by mkWeak# is not needed and is discarded.
  case mkWeak# mv# () finalizer s0 of
    (# s1, _ #) -> (# s1, () #)
663 withMVar :: MVar a -> (a -> IO b) -> IO b
667 b <- catchAny (unblock (io a))
668 (\e -> do putMVar m a; throw e)
674 %************************************************************************
676 \subsection{Thread waiting}
678 %************************************************************************
681 #ifdef mingw32_HOST_OS
683 -- Note: threadWaitRead and threadWaitWrite aren't really functional
684 -- on Win32, but left in there because lib code (still) uses them (the manner
685 -- in which they're used doesn't cause problems on a Win32 platform though.)
asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncRead (I# fd#) (I# isSock#) (I# len#) (Ptr buf#) = IO $ \s0 ->
    -- Box the two Int# results of the primitive into an ordinary pair.
    case asyncRead# fd# isSock# len# buf# s0 of
      (# s1, got#, errCode# #) -> (# s1, (I# got#, I# errCode#) #)
asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncWrite (I# fd#) (I# isSock#) (I# len#) (Ptr buf#) = IO $ \s0 ->
    -- Box the two Int# results of the primitive into an ordinary pair.
    case asyncWrite# fd# isSock# len# buf# s0 of
      (# s1, put#, errCode# #) -> (# s1, (I# put#, I# errCode#) #)
asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
asyncDoProc (FunPtr proc#) (Ptr param#) = IO $ \s0 ->
    -- the 'length' value is ignored; simplifies implementation of
    -- the async*# primops to have them all return the same result.
    case asyncDoProc# proc# param# s0 of
      (# s1, _, errCode# #) -> (# s1, I# errCode# #)
704 -- to aid the use of these primops by the IO Handle implementation,
705 -- provide the following convenience funs:
-- this better be a pinned byte array!
asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncReadBA fd isSock len off bufB = asyncRead fd isSock len target
  where
    -- address of the array payload, advanced by the byte offset
    base   = Ptr (byteArrayContents# (unsafeCoerce# bufB))
    target = base `plusPtr` off
asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncWriteBA fd isSock len off bufB = asyncWrite fd isSock len target
  where
    -- address of the array payload, advanced by the byte offset
    base   = Ptr (byteArrayContents# (unsafeCoerce# bufB))
    target = base `plusPtr` off
718 -- -----------------------------------------------------------------------------
721 -- | Block the current thread until data is available to read on the
722 -- given file descriptor (GHC only).
723 threadWaitRead :: Fd -> IO ()
725 #ifndef mingw32_HOST_OS
726 | threaded = waitForReadEvent fd
728 | otherwise = IO $ \s ->
729 case fromIntegral fd of { I# fd# ->
730 case waitRead# fd# s of { s -> (# s, () #)
733 -- | Block the current thread until data can be written to the
734 -- given file descriptor (GHC only).
735 threadWaitWrite :: Fd -> IO ()
737 #ifndef mingw32_HOST_OS
738 | threaded = waitForWriteEvent fd
740 | otherwise = IO $ \s ->
741 case fromIntegral fd of { I# fd# ->
742 case waitWrite# fd# s of { s -> (# s, () #)
745 -- | Suspends the current thread for a given number of microseconds
748 -- There is no guarantee that the thread will be rescheduled promptly
749 -- when the delay has expired, but the thread will never continue to
750 -- run /earlier/ than specified.
752 threadDelay :: Int -> IO ()
754 | threaded = waitForDelayEvent time
755 | otherwise = IO $ \s ->
756 case fromIntegral time of { I# time# ->
757 case delay# time# s of { s -> (# s, () #)
761 -- | Set the value of returned TVar to True after a given number of
762 -- microseconds. The caveats associated with threadDelay also apply.
764 registerDelay :: Int -> IO (TVar Bool)
766 | threaded = waitForDelayEventSTM usecs
767 | otherwise = error "registerDelay: requires -threaded"
-- The C function @rtsSupportsBoundThreads@, imported as a pure 'Bool'.
-- Used in this module as the guard that selects the threaded code paths.
foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
771 waitForDelayEvent :: Int -> IO ()
772 waitForDelayEvent usecs = do
774 target <- calculateTarget usecs
775 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
779 -- Delays for use in STM
780 waitForDelayEventSTM :: Int -> IO (TVar Bool)
781 waitForDelayEventSTM usecs = do
782 t <- atomically $ newTVar False
783 target <- calculateTarget usecs
784 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
788 calculateTarget :: Int -> IO USecs
789 calculateTarget usecs = do
791 return $ now + (fromIntegral usecs)
794 -- ----------------------------------------------------------------------------
795 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
797 -- In the threaded RTS, we employ a single IO Manager thread to wait
798 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
799 -- and delays (threadDelay).
801 -- We can do this because in the threaded RTS the IO Manager can make
802 -- a non-blocking call to select(), so we don't have to do select() in
803 -- the scheduler as we have to in the non-threaded RTS. We get performance
804 -- benefits from doing it this way, because we only have to restart the select()
805 -- when a new request arrives, rather than doing one select() each time
806 -- around the scheduler loop. Furthermore, the scheduler can be simplified
807 -- by not having to check for completed IO requests.
809 -- Issues, possible problems:
811 -- - we might want bound threads to just do the blocking
812 -- operation rather than communicating with the IO manager
-- thread. This would prevent single-threaded programs which do
814 -- IO from requiring multiple OS threads. However, it would also
815 -- prevent bound threads waiting on IO from being killed or sent
-- - Apparently exec() doesn't work on Linux in a multithreaded program.
819 -- I couldn't repeat this.
821 -- - How do we handle signal delivery in the multithreaded RTS?
823 -- - forkProcess will kill the IO manager thread. Let's just
824 -- hope we don't need to do any blocking IO between fork & exec.
826 #ifndef mingw32_HOST_OS
828 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
829 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
833 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
834 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
836 #ifndef mingw32_HOST_OS
837 pendingEvents :: IORef [IOReq]
839 pendingDelays :: IORef [DelayReq]
840 -- could use a strict list or array here
841 {-# NOINLINE pendingEvents #-}
842 {-# NOINLINE pendingDelays #-}
843 (pendingEvents,pendingDelays) = unsafePerformIO $ do
848 -- the first time we schedule an IO request, the service thread
849 -- will be created (cool, huh?)
ensureIOManagerIsRunning :: IO ()
ensureIOManagerIsRunning =
  if threaded
    -- evaluating 'pendingEvents' triggers the unsafePerformIO block that defines it
    then pendingEvents `seq` return ()
    else return ()
-- Insert a delay request in front of the first queued request whose
-- target time is not earlier than its own.
insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
insertDelay new queue = case queue of
  [] -> [new]
  (q : qs)
    | delayTime new <= delayTime q -> new : queue
    | otherwise                    -> q : insertDelay new qs
-- Target time stored in a delay request, whichever constructor built it.
delayTime :: DelayReq -> USecs
delayTime req = case req of
  Delay    t _ -> t
  DelaySTM t _ -> t
-- XXX: move into GHC.IOBase from Data.IORef?
atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORef (IORef (STRef var#)) update =
  -- thin wrapper: unwrap the reference and call the primitive
  IO (\s -> atomicModifyMutVar# var# update s)
872 foreign import ccall unsafe "getUSecOfDay"
873 getUSecOfDay :: IO USecs
-- Flag set by 'prodServiceThread'; 'service_cont' resets it to False.
prodding :: IORef Bool
{-# NOINLINE prodding #-}
prodding = unsafePerformIO $ newIORef False
-- Set the 'prodding' flag; if it was not already set, also wake the IO
-- manager.
prodServiceThread :: IO ()
prodServiceThread = do
  alreadyProdded <- atomicModifyIORef prodding (\old -> (True, old))
  if alreadyProdded then return () else wakeupIOManager
884 #ifdef mingw32_HOST_OS
885 -- ----------------------------------------------------------------------------
886 -- Windows IO manager thread
888 startIOManagerThread :: IO ()
889 startIOManagerThread = do
890 wakeup <- c_getIOManagerEvent
891 forkIO $ service_loop wakeup []
894 service_loop :: HANDLE -- read end of pipe
895 -> [DelayReq] -- current delay requests
898 service_loop wakeup old_delays = do
899 -- pick up new delay requests
900 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
901 let delays = foldr insertDelay old_delays new_delays
904 (delays', timeout) <- getDelay now delays
906 r <- c_WaitForSingleObject wakeup timeout
908 0xffffffff -> do c_maperrno; throwErrno "service_loop"
910 r <- c_readIOManagerEvent
913 _ | r == io_MANAGER_WAKEUP -> return False
914 _ | r == io_MANAGER_DIE -> return True
915 0 -> return False -- spurious wakeup
916 r -> do start_console_handler (r `shiftR` 1); return False
919 else service_cont wakeup delays'
921 _other -> service_cont wakeup delays' -- probably timeout
-- Clear the 'prodding' flag, then go round 'service_loop' again.
service_cont wakeup delays =
  atomicModifyIORef prodding (\_ -> (False,False)) >> service_loop wakeup delays
927 -- must agree with rts/win32/ThrIOManager.c
928 io_MANAGER_WAKEUP = 0xffffffff :: Word32
929 io_MANAGER_DIE = 0xfffffffe :: Word32
935 -- these are sent to Services only.
938 deriving (Eq, Ord, Enum, Show, Read, Typeable)
940 start_console_handler :: Word32 -> IO ()
941 start_console_handler r =
942 case toWin32ConsoleEvent r of
943 Just x -> withMVar win32ConsoleHandler $ \handler -> do
948 toWin32ConsoleEvent ev =
950 0 {- CTRL_C_EVENT-} -> Just ControlC
951 1 {- CTRL_BREAK_EVENT-} -> Just Break
952 2 {- CTRL_CLOSE_EVENT-} -> Just Close
953 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
954 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
-- 'MVar' holding the console-event handler function;
-- 'start_console_handler' reads it with 'withMVar'.  Its initial contents
-- are an 'error' thunk.
--
-- NOINLINE is needed because this is a top-level 'unsafePerformIO'
-- allocation: if it were inlined, different use sites could end up with
-- different 'MVar's.  The other top-level references in this module
-- ('stick', 'prodding', 'pendingEvents', 'pendingDelays') carry the same
-- pragma.
win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
{-# NOINLINE win32ConsoleHandler #-}
win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
-- Top-level mutable cell holding a HANDLE, initially 'nullPtr'.
stick :: IORef HANDLE
{-# NOINLINE stick #-}
stick = unsafePerformIO $ newIORef nullPtr
965 hdl <- readIORef stick
966 c_sendIOManagerEvent io_MANAGER_WAKEUP
968 -- Walk the queue of pending delays, waking up any that have passed
969 -- and return the smallest delay to wait for. The queue of pending
970 -- delays is kept ordered.
-- The result pairs the delays that are still pending with the timeout
-- (a DWORD, in milliseconds) to pass to WaitForSingleObject.
971 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
-- nothing pending: use the iNFINITE timeout
972 getDelay now [] = return ([], iNFINITE)
973 getDelay now all@(d : rest)
975 Delay time m | now >= time -> do
-- STM-style delay that has expired: signal it by setting its TVar to True
978 DelaySTM time t | now >= time -> do
979 atomically $ writeTVar t True
982 -- delay is in millisecs for WaitForSingleObject
983 let micro_seconds = delayTime d - now
-- round up to whole milliseconds so the wait is never shorter than the
-- remaining delay
984 milli_seconds = (micro_seconds + 999) `div` 1000
985 in return (all, fromIntegral milli_seconds)
987 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
988 -- available yet. We should move some Win32 functionality down here,
989 -- maybe as part of the grand reorganisation of the base package...
-- | Timeout value that 'getDelay' returns for WaitForSingleObject when
-- there are no pending delays.
iNFINITE :: DWORD
iNFINITE = 0xFFFFFFFF -- urgh
-- Returns the handle the IO manager thread waits on; 'startIOManagerThread'
-- passes it to 'service_loop', which hands it to WaitForSingleObject.
995 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
996 c_getIOManagerEvent :: IO HANDLE
-- Fetches a Word32 message code; 'service_loop' compares the result against
-- 'io_MANAGER_WAKEUP' and 'io_MANAGER_DIE'.
998 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
999 c_readIOManagerEvent :: IO Word32
-- Sends a Word32 message code (for example 'io_MANAGER_WAKEUP') to the IO
-- manager.
1001 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
1002 c_sendIOManagerEvent :: Word32 -> IO ()
1004 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
-- Win32 WaitForSingleObject: wait on a handle with a timeout given as a
-- DWORD (milliseconds, per the comment in 'getDelay').  Imported with the
-- stdcall convention and without an 'unsafe' annotation.
1007 foreign import stdcall "WaitForSingleObject"
1008 c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1011 -- ----------------------------------------------------------------------------
1012 -- Unix IO manager thread, using select()
-- Set up the Unix IO manager: create the wakeup pipe, publish its write
-- end, allocate the buffers used for select(), and enter 'service_loop'.
1014 startIOManagerThread :: IO ()
1015 startIOManagerThread = do
-- temporary two-element array that receives the pipe's file descriptors
1016 allocaArray 2 $ \fds -> do
1017 throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1018 rd_end <- peekElemOff fds 0
1019 wr_end <- peekElemOff fds 1
-- remember the write end in 'stick' (read back by 'wakeupIOManager') and
-- also hand it to the C side via setIOManagerPipe
1020 writeIORef stick (fromIntegral wr_end)
1021 c_setIOManagerPipe wr_end
-- scratch buffers for the two fd sets and the timeval
1023 allocaBytes sizeofFdSet $ \readfds -> do
1024 allocaBytes sizeofFdSet $ \writefds -> do
1025 allocaBytes sizeofTimeVal $ \timeval -> do
-- the loop listens on the read end; no IO requests or delays to begin with
1026 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1030 :: Fd -- listen to this for wakeup calls
-- One iteration of the Unix IO manager: collect newly queued IO and delay
-- requests, build the fd sets, select() on them together with the wakeup
-- fd, act on any byte received on the wakeup fd, complete the requests
-- that became ready, and loop with whatever is still outstanding.
1037 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1039 -- pick up new IO requests
1040 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
-- new requests go in front of the ones still outstanding
1041 let reqs = new_reqs ++ old_reqs
1043 -- pick up new delay requests
1044 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1045 let delays = foldr insertDelay old_delays new_delays
1047 -- build the FDSets for select()
-- the wakeup fd is always part of the read set
1050 fdSet wakeup readfds
1051 maxfd <- buildFdSets 0 readfds writefds reqs
1053 -- perform the select()
1054 let do_select delays = do
1055 -- check the current time and wake up any thread in
1056 -- threadDelay whose timeout has expired. Also find the
1057 -- timeout value for the select() call.
1059 (delays', timeout) <- getDelay now ptimeval delays
-- first argument: highest fd of interest plus one
1061 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
1067 _ | err == eINTR -> do_select delays'
1068 -- EINTR: just redo the select()
-- NOTE(review): this returns the list as it was before 'getDelay' ran
-- ('delays', not delays'), so already-expired entries stay in it --
-- confirm this is intended.
1069 _ | err == eBADF -> return (True, delays)
1070 -- EBADF: one of the file descriptors is closed or bad,
1071 -- we don't know which one, so wake everyone up.
1072 _ | otherwise -> throwErrno "select"
1073 -- otherwise (ENOMEM or EINVAL) something has gone
1074 -- wrong; report the error.
1076 return (False,delays')
1078 (wakeup_all,delays') <- do_select delays
1081 if wakeup_all then return False
1083 b <- fdIsSet wakeup readfds
1086 else alloca $ \p -> do
-- read one byte from the wakeup fd
1087 c_read (fromIntegral wakeup) p 1; return ()
1090 _ | s == io_MANAGER_WAKEUP -> return False
1091 _ | s == io_MANAGER_DIE -> return True
-- any other byte is used as an index into the signal handler table, which
-- is read while holding 'signalHandlerLock'
1092 _ -> withMVar signalHandlerLock $ \_ -> do
1093 handler_tbl <- peek handlers
1094 sp <- peekElemOff handler_tbl (fromIntegral s)
1095 io <- deRefStablePtr sp
1099 if exit then return () else do
-- reset the 'prodding' flag to False
1101 atomicModifyIORef prodding (\_ -> (False,False))
-- after EBADF every waiter is woken and no request is carried over;
-- otherwise only the requests whose fd is ready are completed
1103 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1104 else completeRequests reqs readfds writefds []
1106 service_loop wakeup readfds writefds ptimeval reqs' delays'
-- Single-byte message codes written to the IO manager's wakeup pipe.
io_MANAGER_WAKEUP, io_MANAGER_DIE :: CChar
io_MANAGER_WAKEUP = 0xff
io_MANAGER_DIE    = 0xfe
-- Global cell for the write end of the IO manager's wakeup pipe: it starts
-- at 0, is filled in by 'startIOManagerThread', and is read by
-- 'wakeupIOManager'.  NOINLINE keeps the 'unsafePerformIO' call from being
-- duplicated.
1112 {-# NOINLINE stick #-}
1113 stick = unsafePerformIO (newIORef 0)
-- | Wake the IO manager thread by writing the single byte
-- 'io_MANAGER_WAKEUP' to the file descriptor stored in 'stick'.  The
-- result of the write is discarded.
wakeupIOManager :: IO ()
wakeupIOManager = do
  pipe_wr <- readIORef stick
  with io_MANAGER_WAKEUP $ \msg_ptr -> do
    _ <- c_write (fromIntegral pipe_wr) msg_ptr 1
    return ()
-- | Lock used to protect concurrent access to signal_handlers.  The symptom
-- of this race condition is #1922; that bug was reported on Windows, but a
-- similar bug also exists on Unix.
--
-- The NOINLINE pragma is required: this is a top-level MVar built with
-- 'unsafePerformIO', and if the right-hand side were inlined each use site
-- could get its own MVar, which would defeat the lock.  'stick' in this
-- module is protected in the same way.
{-# NOINLINE signalHandlerLock #-}
signalHandlerLock :: MVar ()
signalHandlerLock = unsafePerformIO (newMVar ())
-- Address of the C variable @signal_handlers@, which points to an array of
-- stable pointers to IO actions; 'service_loop' indexes that array with
-- the byte it reads from the wakeup pipe.
1127 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
-- Passes a file descriptor to the C function @setIOManagerPipe@;
-- 'startIOManagerThread' calls it with the write end of the wakeup pipe.
1129 foreign import ccall "setIOManagerPipe"
1130 c_setIOManagerPipe :: CInt -> IO ()
1132 -- -----------------------------------------------------------------------------
-- Walk the list of IO requests, threading through the largest file
-- descriptor seen so far, and return that maximum once the list is
-- exhausted.  A request whose fd is not below 'fD_SETSIZE' is rejected
-- with 'error'.
1135 buildFdSets maxfd readfds writefds [] = return maxfd
1136 buildFdSets maxfd readfds writefds (Read fd m : reqs)
1137 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1140 buildFdSets (max maxfd fd) readfds writefds reqs
1141 buildFdSets maxfd readfds writefds (Write fd m : reqs)
1142 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1145 buildFdSets (max maxfd fd) readfds writefds reqs
-- Go through the IO requests, testing each one's fd against the matching
-- fd set (readfds for Read, writefds for Write).  Depending on the result
-- the request is either completed by filling its MVar or pushed onto the
-- accumulator reqs', which is returned at the end.  Because it is built by
-- consing, the returned list is in reverse order relative to the input.
1147 completeRequests [] _ _ reqs' = return reqs'
1148 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1149 b <- fdIsSet fd readfds
1151 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1152 else completeRequests reqs readfds writefds (Read fd m : reqs')
1153 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1154 b <- fdIsSet fd writefds
1156 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1157 else completeRequests reqs readfds writefds (Write fd m : reqs')
-- Wake every thread blocked on one of the given IO requests by filling the
-- request's MVar, working through the list front to back.
wakeupAll reqs = case reqs of
  []          -> return ()
  (req : etc) -> do
    putMVar (waiter req) ()
    wakeupAll etc
  where
    -- the MVar carried by a request, whichever kind it is
    waiter (Read  _ mv) = mv
    waiter (Write _ mv) = mv
-- Register interest in reading from @fd@: a 'Read' request carrying the
-- MVar @m@ is pushed onto the front of 'pendingEvents', where the IO
-- manager's 'service_loop' picks it up.
1163 waitForReadEvent :: Fd -> IO ()
1164 waitForReadEvent fd = do
1166 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
-- Register interest in writing to @fd@: a 'Write' request carrying the
-- MVar @m@ is pushed onto the front of 'pendingEvents', where the IO
-- manager's 'service_loop' picks it up.
1170 waitForWriteEvent :: Fd -> IO ()
1171 waitForWriteEvent fd = do
1173 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1177 -- -----------------------------------------------------------------------------
1180 -- Walk the queue of pending delays, waking up any that have passed
1181 -- and return the smallest delay to wait for. The queue of pending
1182 -- delays is kept ordered.
-- The result pairs the delays still pending with the timeout pointer to
-- hand to select().
1183 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
-- nothing pending: return nullPtr as the timeout
1184 getDelay now ptimeval [] = return ([],nullPtr)
1185 getDelay now ptimeval all@(d : rest)
1187 Delay time m | now >= time -> do
1189 getDelay now ptimeval rest
-- STM-style delay that has expired: signal it by setting its TVar to True,
-- then carry on with the rest of the queue
1190 DelaySTM time t | now >= time -> do
1191 atomically $ writeTVar t True
1192 getDelay now ptimeval rest
-- store the time remaining until delay d is due into ptimeval and return
-- the queue unchanged
1194 setTimevalTicks ptimeval (delayTime d - now)
1195 return (all,ptimeval)
-- Opaque stand-in for the C timeval structure; it is only ever handled
-- through a 'Ptr' into memory of size 'sizeofTimeVal'.
1197 newtype CTimeVal = CTimeVal ()
-- Size of the buffer 'startIOManagerThread' allocates for a 'CTimeVal'.
1199 foreign import ccall unsafe "sizeofTimeVal"
1200 sizeofTimeVal :: Int
-- Fill in the timeval behind the pointer from a 'USecs' value; 'getDelay'
-- passes the time remaining until the next delay is due.
1202 foreign import ccall unsafe "setTimevalTicks"
1203 setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1206 On Win32 we're going to have a single pipe, and a
1207 WaitForSingleObject call with the delay time. For signals, we send a
1208 byte down the pipe just like on Unix.
1211 -- ----------------------------------------------------------------------------
1212 -- select() interface
1214 -- ToDo: move to System.Posix.Internals?
-- Opaque stand-in for the C fd_set structure; it is only ever handled
-- through a 'Ptr' and manipulated by the hsFD_* helpers imported below.
1216 newtype CFdSet = CFdSet ()
-- The C select() call.  Unlike the other imports in this section it is
-- declared 'safe' -- presumably because the call can block for a long
-- time; confirm.
1218 foreign import ccall safe "select"
1219 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
-- The value exported by the C helper @hsFD_SETSIZE@ (presumably the
-- platform's FD_SETSIZE -- confirm).
1222 foreign import ccall unsafe "hsFD_SETSIZE"
1223 c_fD_SETSIZE :: CInt
-- The same limit converted with 'fromIntegral'; 'buildFdSets' rejects any
-- fd that is not below it.
1226 fD_SETSIZE = fromIntegral c_fD_SETSIZE
foreign import ccall unsafe "hsFD_CLR" c_fdClr :: CInt -> Ptr CFdSet -> IO ()

-- | Apply the C helper @hsFD_CLR@ (presumably the FD_CLR macro) to the
-- descriptor wrapped in the 'Fd' and the given fd set.
fdClr :: Fd -> Ptr CFdSet -> IO ()
fdClr wrapped set = case wrapped of
  Fd raw -> c_fdClr raw set
foreign import ccall unsafe "hsFD_ISSET" c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt

-- | Apply the C helper @hsFD_ISSET@ (presumably the FD_ISSET macro) to the
-- descriptor wrapped in the 'Fd' and the given fd set, returning the raw
-- 'CInt' result.
fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
fdIsSet wrapped set = case wrapped of
  Fd raw -> c_fdIsSet raw set
foreign import ccall unsafe "hsFD_SET" c_fdSet :: CInt -> Ptr CFdSet -> IO ()

-- | Apply the C helper @hsFD_SET@ (presumably the FD_SET macro) to the
-- descriptor wrapped in the 'Fd' and the given fd set.
fdSet :: Fd -> Ptr CFdSet -> IO ()
fdSet wrapped set = case wrapped of
  Fd raw -> c_fdSet raw set
-- Direct import of the C helper @hsFD_ZERO@ (presumably the FD_ZERO macro,
-- which empties an fd set -- confirm).
1246 foreign import ccall unsafe "hsFD_ZERO"
1247 fdZero :: Ptr CFdSet -> IO ()
1249 foreign import ccall unsafe "sizeof_fd_set"
-- | Run the stack-overflow hook ('callStackOverflowHook') and then yield
-- 'undefined' as the result.
reportStackOverflow :: IO a
reportStackOverflow = do
  callStackOverflowHook
  return undefined
-- Report an exception: the currently installed handler is fetched with
-- 'getUncaughtExceptionHandler' (presumably it is then applied to the
-- exception -- confirm).
1257 reportError :: SomeException -> IO a
1259 handler <- getUncaughtExceptionHandler
-- Calls the C function @stackOverflow@; used by 'reportStackOverflow'.
1263 -- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove
1264 -- the unsafe below.
1265 foreign import ccall unsafe "stackOverflow"
1266 callStackOverflowHook :: IO ()