2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_HADDOCK not-home #-}
4 -----------------------------------------------------------------------------
7 -- Copyright : (c) The University of Glasgow, 1994-2002
8 -- License : see libraries/base/LICENSE
10 -- Maintainer : cvs-ghc@haskell.org
11 -- Stability : internal
12 -- Portability : non-portable (GHC extensions)
14 -- Basic concurrency stuff.
16 -----------------------------------------------------------------------------
18 -- No: #hide, because bits of this module are exposed by the stm package.
19 -- However, we don't want this module to be the home location for the
20 -- bits it exports, we'd rather have Control.Concurrent and the other
21 -- higher level modules be the home. Hence:
29 -- * Forking and suchlike
30 , forkIO -- :: IO a -> IO ThreadId
31 , forkOnIO -- :: Int -> IO a -> IO ThreadId
32 , numCapabilities -- :: Int
33 , childHandler -- :: Exception -> IO ()
34 , myThreadId -- :: IO ThreadId
35 , killThread -- :: ThreadId -> IO ()
36 , throwTo -- :: ThreadId -> Exception -> IO ()
37 , par -- :: a -> b -> b
38 , pseq -- :: a -> b -> b
40 , labelThread -- :: ThreadId -> String -> IO ()
42 , ThreadStatus(..), BlockReason(..)
43 , threadStatus -- :: ThreadId -> IO ThreadStatus
46 , threadDelay -- :: Int -> IO ()
47 , registerDelay -- :: Int -> IO (TVar Bool)
48 , threadWaitRead -- :: Int -> IO ()
49 , threadWaitWrite -- :: Int -> IO ()
53 , newMVar -- :: a -> IO (MVar a)
54 , newEmptyMVar -- :: IO (MVar a)
55 , takeMVar -- :: MVar a -> IO a
56 , putMVar -- :: MVar a -> a -> IO ()
57 , tryTakeMVar -- :: MVar a -> IO (Maybe a)
58 , tryPutMVar -- :: MVar a -> a -> IO Bool
59 , isEmptyMVar -- :: MVar a -> IO Bool
60 , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
64 , atomically -- :: STM a -> IO a
66 , orElse -- :: STM a -> STM a -> STM a
67 , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a
68 , alwaysSucceeds -- :: STM a -> STM ()
69 , always -- :: STM Bool -> STM ()
71 , newTVar -- :: a -> STM (TVar a)
72 , newTVarIO -- :: a -> STM (TVar a)
73 , readTVar -- :: TVar a -> STM a
74 , writeTVar -- :: a -> TVar a -> STM ()
75 , unsafeIOToSTM -- :: IO a -> STM a
78 #ifdef mingw32_HOST_OS
79 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
80 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
81 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
83 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
84 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
87 #ifndef mingw32_HOST_OS
91 , ensureIOManagerIsRunning
93 #ifdef mingw32_HOST_OS
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
108 import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
115 import GHC.Num ( Num(..) )
116 import GHC.Real ( fromIntegral, div )
117 #ifndef mingw32_HOST_OS
118 import GHC.Base ( Int(..) )
120 #ifdef mingw32_HOST_OS
121 import GHC.Read ( Read )
122 import GHC.Enum ( Enum )
125 import GHC.Pack ( packCString# )
126 import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) )
128 import GHC.Show ( Show(..), showString )
131 infixr 0 `par`, `pseq`
134 %************************************************************************
136 \subsection{@ThreadId@, @par@, and @fork@}
138 %************************************************************************
141 data ThreadId = ThreadId ThreadId# deriving( Typeable )
142 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
143 -- But since ThreadId# is unlifted, the Weak type must use open
146 A 'ThreadId' is an abstract type representing a handle to a thread.
147 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
148 the 'Ord' instance implements an arbitrary total ordering over
149 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
150 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
151 useful when debugging or diagnosing the behaviour of a concurrent
154 /Note/: in GHC, if you have a 'ThreadId', you essentially have
155 a pointer to the thread itself. This means the thread itself can\'t be
156 garbage collected until you drop the 'ThreadId'.
157 This misfeature will hopefully be corrected at a later date.
159 /Note/: Hugs does not provide any operations on other threads;
160 it defines 'ThreadId' as a synonym for ().
163 instance Show ThreadId where
165 showString "ThreadId " .
166 showsPrec d (getThreadId (id2TSO t))
168 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
170 id2TSO :: ThreadId -> ThreadId#
171 id2TSO (ThreadId t) = t
173 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
176 cmpThread :: ThreadId -> ThreadId -> Ordering
178 case cmp_thread (id2TSO t1) (id2TSO t2) of
183 instance Eq ThreadId where
185 case t1 `cmpThread` t2 of
189 instance Ord ThreadId where
193 Sparks off a new thread to run the 'IO' computation passed as the
194 first argument, and returns the 'ThreadId' of the newly created
197 The new thread will be a lightweight thread; if you want to use a foreign
198 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
200 GHC note: the new thread inherits the /blocked/ state of the parent
201 (see 'Control.Exception.block').
203 forkIO :: IO () -> IO ThreadId
204 forkIO action = IO $ \ s ->
205 case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
207 action_plus = catchException action childHandler
210 Like 'forkIO', but lets you specify on which CPU the thread is
211 created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
212 will stay on the same CPU for its entire lifetime (`forkIO` threads
213 can migrate between CPUs according to the scheduling policy).
214 `forkOnIO` is useful for overriding the scheduling policy when you
215 know in advance how best to distribute the threads.
217 The `Int` argument specifies the CPU number; it is interpreted modulo
218 'numCapabilities' (note that it actually specifies a capability number
219 rather than a CPU number, but to a first approximation the two are
222 forkOnIO :: Int -> IO () -> IO ThreadId
223 forkOnIO (I# cpu) action = IO $ \ s ->
224 case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
226 action_plus = catchException action childHandler
228 -- | the value passed to the @+RTS -N@ flag. This is the number of
229 -- Haskell threads that can run truly simultaneously at any given
230 -- time, and is typically set to the number of physical CPU cores on
232 numCapabilities :: Int
233 numCapabilities = unsafePerformIO $ do
234 n <- peek n_capabilities
235 return (fromIntegral n)
237 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
239 childHandler :: Exception -> IO ()
240 childHandler err = catchException (real_handler err) childHandler
242 real_handler :: Exception -> IO ()
245 -- ignore thread GC and killThread exceptions:
246 BlockedOnDeadMVar -> return ()
247 BlockedIndefinitely -> return ()
248 AsyncException ThreadKilled -> return ()
250 -- report all others:
251 AsyncException StackOverflow -> reportStackOverflow
252 other -> reportError other
254 {- | 'killThread' terminates the given thread (GHC only).
255 Any work already done by the thread isn\'t
256 lost: the computation is suspended until required by another thread.
257 The memory used by the thread will be garbage collected if it isn\'t
258 referenced from anywhere. The 'killThread' function is defined in
261 > killThread tid = throwTo tid (AsyncException ThreadKilled)
264 killThread :: ThreadId -> IO ()
265 killThread tid = throwTo tid (AsyncException ThreadKilled)
267 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
269 'throwTo' does not return until the exception has been raised in the
271 The calling thread can thus be certain that the target
272 thread has received the exception. This is a useful property to know
273 when dealing with race conditions: eg. if there are two threads that
274 can kill each other, it is guaranteed that only one of the threads
275 will get to kill the other.
277 If the target thread is currently making a foreign call, then the
278 exception will not be raised (and hence 'throwTo' will not return)
279 until the call has completed. This is the case regardless of whether
280 the call is inside a 'block' or not.
282 Important note: the behaviour of 'throwTo' differs from that described in
283 the paper \"Asynchronous exceptions in Haskell\"
284 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
285 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
286 a more synchronous design in which 'throwTo' does not return until the exception
287 is received by the target thread. The trade-off is discussed in Section 8 of the paper.
288 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
291 There is currently no guarantee that the exception delivered by 'throwTo' will be
292 delivered at the first possible opportunity. In particular, if a thread may
293 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
294 a pending 'throwTo'. This is arguably undesirable behaviour.
297 throwTo :: ThreadId -> Exception -> IO ()
298 throwTo (ThreadId id) ex = IO $ \ s ->
299 case (killThread# id ex s) of s1 -> (# s1, () #)
301 -- | Returns the 'ThreadId' of the calling thread (GHC only).
302 myThreadId :: IO ThreadId
303 myThreadId = IO $ \s ->
304 case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #)
307 -- |The 'yield' action allows (forces, in a co-operative multitasking
308 -- implementation) a context-switch to any other currently runnable
309 -- threads (if any), and is occasionally useful when implementing
310 -- concurrency abstractions.
313 case (yield# s) of s1 -> (# s1, () #)
315 {- | 'labelThread' stores a string as identifier for this thread if
316 you built a RTS with debugging support. This identifier will be used in
317 the debugging output to make distinction of different threads easier
318 (otherwise you only have the thread state object\'s address in the heap).
320 Other applications like the graphical Concurrent Haskell Debugger
321 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
322 'labelThread' for their purposes as well.
325 labelThread :: ThreadId -> String -> IO ()
326 labelThread (ThreadId t) str = IO $ \ s ->
327 let ps = packCString# str
328 adr = byteArrayContents# ps in
329 case (labelThread# t adr s) of s1 -> (# s1, () #)
331 -- Nota Bene: 'pseq' used to be 'seq'
332 -- but 'seq' is now defined in PrelGHC
334 -- "pseq" is defined a bit weirdly (see below)
336 -- The reason for the strange "lazy" call is that
337 -- it fools the compiler into thinking that pseq and par are non-strict in
338 -- their second argument (even if it inlines pseq at the call site).
339 -- If it thinks pseq is strict in "y", then it often evaluates
340 -- "y" before "x", which is totally wrong.
344 pseq x y = x `seq` lazy y
348 par x y = case (par# x) of { _ -> lazy y }
353 -- ^blocked on on 'MVar'
355 -- ^blocked on a computation in progress by another thread
357 -- ^blocked in 'throwTo'
359 -- ^blocked in 'retry' in an STM transaction
360 | BlockedOnForeignCall
361 -- ^currently in a foreign call
363 -- ^blocked on some other resource. Without @-threaded@,
364 -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
365 -- they show up as 'BlockedOnMVar'.
366 deriving (Eq,Ord,Show)
368 -- | The current status of a thread
371 -- ^the thread is currently runnable or running
373 -- ^the thread has finished
374 | ThreadBlocked BlockReason
375 -- ^the thread is blocked on some resource
377 -- ^the thread received an uncaught exception
378 deriving (Eq,Ord,Show)
380 threadStatus :: ThreadId -> IO ThreadStatus
381 threadStatus (ThreadId t) = IO $ \s ->
382 case threadStatus# t s of
383 (# s', stat #) -> (# s', mk_stat (I# stat) #)
385 -- NB. keep these in sync with includes/Constants.h
386 mk_stat 0 = ThreadRunning
387 mk_stat 1 = ThreadBlocked BlockedOnMVar
388 mk_stat 2 = ThreadBlocked BlockedOnBlackHole
389 mk_stat 3 = ThreadBlocked BlockedOnException
390 mk_stat 7 = ThreadBlocked BlockedOnSTM
391 mk_stat 11 = ThreadBlocked BlockedOnForeignCall
392 mk_stat 12 = ThreadBlocked BlockedOnForeignCall
393 mk_stat 16 = ThreadFinished
394 mk_stat 17 = ThreadDied
395 mk_stat _ = ThreadBlocked BlockedOnOther
399 %************************************************************************
401 \subsection[stm]{Transactional heap operations}
403 %************************************************************************
405 TVars are shared memory locations which support atomic memory
409 -- |A monad supporting atomic memory transactions.
410 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
412 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
415 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
417 instance Functor STM where
418 fmap f x = x >>= (return . f)
420 instance Monad STM where
421 {-# INLINE return #-}
425 return x = returnSTM x
426 m >>= k = bindSTM m k
428 bindSTM :: STM a -> (a -> STM b) -> STM b
429 bindSTM (STM m) k = STM ( \s ->
431 (# new_s, a #) -> unSTM (k a) new_s
434 thenSTM :: STM a -> STM b -> STM b
435 thenSTM (STM m) k = STM ( \s ->
437 (# new_s, a #) -> unSTM k new_s
440 returnSTM :: a -> STM a
441 returnSTM x = STM (\s -> (# s, x #))
443 -- | Unsafely performs IO in the STM monad. Beware: this is a highly
444 -- dangerous thing to do.
446 -- * The STM implementation will often run transactions multiple
447 -- times, so you need to be prepared for this if your IO has any
450 -- * The STM implementation will abort transactions that are known to
451 -- be invalid and need to be restarted. This may happen in the middle
452 -- of `unsafeIOToSTM`, so make sure you don't acquire any resources
453 -- that need releasing (exception handlers are ignored when aborting
454 -- the transaction). That includes doing any IO using Handles, for
455 -- example. Getting this wrong will probably lead to random deadlocks.
457 -- * The transaction may have seen an inconsistent view of memory when
458 -- the IO runs. Invariants that you expect to be true throughout
459 -- your program may not be true inside a transaction, due to the
460 -- way transactions are implemented. Normally this wouldn't be visible
461 -- to the programmer, but using `unsafeIOToSTM` can expose it.
463 unsafeIOToSTM :: IO a -> STM a
464 unsafeIOToSTM (IO m) = STM m
466 -- |Perform a series of STM actions atomically.
468 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
469 -- Any attempt to do so will result in a runtime error. (Reason: allowing
470 -- this would effectively allow a transaction inside a transaction, depending
471 -- on exactly when the thunk is evaluated.)
473 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
474 -- and which allows top-level TVars to be allocated.
476 atomically :: STM a -> IO a
477 atomically (STM m) = IO (\s -> (atomically# m) s )
479 -- |Retry execution of the current memory transaction because it has seen
480 -- values in TVars which mean that it should not continue (e.g. the TVars
481 -- represent a shared buffer that is now empty). The implementation may
482 -- block the thread until one of the TVars that it has read from has been
483 -- udpated. (GHC only)
485 retry = STM $ \s# -> retry# s#
487 -- |Compose two alternative STM actions (GHC only). If the first action
488 -- completes without retrying then it forms the result of the orElse.
489 -- Otherwise, if the first action retries, then the second action is
490 -- tried in its place. If both actions retry then the orElse as a
492 orElse :: STM a -> STM a -> STM a
493 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
495 -- |Exception handling within STM actions.
496 catchSTM :: STM a -> (Exception -> STM a) -> STM a
497 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
499 -- | Low-level primitive on which always and alwaysSucceeds are built.
500 -- checkInv differs form these in that (i) the invariant is not
501 -- checked when checkInv is called, only at the end of this and
502 -- subsequent transcations, (ii) the invariant failure is indicated
503 -- by raising an exception.
504 checkInv :: STM a -> STM ()
505 checkInv (STM m) = STM (\s -> (check# m) s)
507 -- | alwaysSucceeds adds a new invariant that must be true when passed
508 -- to alwaysSucceeds, at the end of the current transaction, and at
509 -- the end of every subsequent transaction. If it fails at any
510 -- of those points then the transaction violating it is aborted
511 -- and the exception raised by the invariant is propagated.
512 alwaysSucceeds :: STM a -> STM ()
513 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () )
516 -- | always is a variant of alwaysSucceeds in which the invariant is
517 -- expressed as an STM Bool action that must return True. Returning
518 -- False or raising an exception are both treated as invariant failures.
519 always :: STM Bool -> STM ()
520 always i = alwaysSucceeds ( do v <- i
521 if (v) then return () else ( error "Transacional invariant violation" ) )
523 -- |Shared memory locations that support atomic memory transactions.
524 data TVar a = TVar (TVar# RealWorld a)
526 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
528 instance Eq (TVar a) where
529 (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
531 -- |Create a new TVar holding a value supplied
532 newTVar :: a -> STM (TVar a)
533 newTVar val = STM $ \s1# ->
534 case newTVar# val s1# of
535 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
537 -- |@IO@ version of 'newTVar'. This is useful for creating top-level
538 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
539 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
541 newTVarIO :: a -> IO (TVar a)
542 newTVarIO val = IO $ \s1# ->
543 case newTVar# val s1# of
544 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
546 -- |Return the current value stored in a TVar
547 readTVar :: TVar a -> STM a
548 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
550 -- |Write the supplied value into a TVar
551 writeTVar :: TVar a -> a -> STM ()
552 writeTVar (TVar tvar#) val = STM $ \s1# ->
553 case writeTVar# tvar# val s1# of
558 %************************************************************************
560 \subsection[mvars]{M-Structures}
562 %************************************************************************
564 M-Vars are rendezvous points for concurrent threads. They begin
565 empty, and any attempt to read an empty M-Var blocks. When an M-Var
566 is written, a single blocked thread may be freed. Reading an M-Var
567 toggles its state from full back to empty. Therefore, any value
568 written to an M-Var may only be read once. Multiple reads and writes
569 are allowed, but there must be at least one read between any two
573 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
575 -- |Create an 'MVar' which is initially empty.
576 newEmptyMVar :: IO (MVar a)
577 newEmptyMVar = IO $ \ s# ->
579 (# s2#, svar# #) -> (# s2#, MVar svar# #)
581 -- |Create an 'MVar' which contains the supplied value.
582 newMVar :: a -> IO (MVar a)
584 newEmptyMVar >>= \ mvar ->
585 putMVar mvar value >>
588 -- |Return the contents of the 'MVar'. If the 'MVar' is currently
589 -- empty, 'takeMVar' will wait until it is full. After a 'takeMVar',
590 -- the 'MVar' is left empty.
592 -- There are two further important properties of 'takeMVar':
594 -- * 'takeMVar' is single-wakeup. That is, if there are multiple
595 -- threads blocked in 'takeMVar', and the 'MVar' becomes full,
596 -- only one thread will be woken up. The runtime guarantees that
597 -- the woken thread completes its 'takeMVar' operation.
599 -- * When multiple threads are blocked on an 'MVar', they are
600 -- woken up in FIFO order. This is useful for providing
601 -- fairness properties of abstractions built using 'MVar's.
603 takeMVar :: MVar a -> IO a
604 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
606 -- |Put a value into an 'MVar'. If the 'MVar' is currently full,
607 -- 'putMVar' will wait until it becomes empty.
609 -- There are two further important properties of 'putMVar':
611 -- * 'putMVar' is single-wakeup. That is, if there are multiple
612 -- threads blocked in 'putMVar', and the 'MVar' becomes empty,
613 -- only one thread will be woken up. The runtime guarantees that
614 -- the woken thread completes its 'putMVar' operation.
616 -- * When multiple threads are blocked on an 'MVar', they are
617 -- woken up in FIFO order. This is useful for providing
618 -- fairness properties of abstractions built using 'MVar's.
620 putMVar :: MVar a -> a -> IO ()
621 putMVar (MVar mvar#) x = IO $ \ s# ->
622 case putMVar# mvar# x s# of
625 -- |A non-blocking version of 'takeMVar'. The 'tryTakeMVar' function
626 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
627 -- @'Just' a@ if the 'MVar' was full with contents @a@. After 'tryTakeMVar',
628 -- the 'MVar' is left empty.
629 tryTakeMVar :: MVar a -> IO (Maybe a)
630 tryTakeMVar (MVar m) = IO $ \ s ->
631 case tryTakeMVar# m s of
632 (# s, 0#, _ #) -> (# s, Nothing #) -- MVar is empty
633 (# s, _, a #) -> (# s, Just a #) -- MVar is full
635 -- |A non-blocking version of 'putMVar'. The 'tryPutMVar' function
636 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
637 -- it was successful, or 'False' otherwise.
638 tryPutMVar :: MVar a -> a -> IO Bool
639 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
640 case tryPutMVar# mvar# x s# of
641 (# s, 0# #) -> (# s, False #)
642 (# s, _ #) -> (# s, True #)
644 -- |Check whether a given 'MVar' is empty.
646 -- Notice that the boolean value returned is just a snapshot of
647 -- the state of the MVar. By the time you get to react on its result,
648 -- the MVar may have been filled (or emptied) - so be extremely
649 -- careful when using this operation. Use 'tryTakeMVar' instead if possible.
650 isEmptyMVar :: MVar a -> IO Bool
651 isEmptyMVar (MVar mv#) = IO $ \ s# ->
652 case isEmptyMVar# mv# s# of
653 (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
655 -- |Add a finalizer to an 'MVar' (GHC only). See "Foreign.ForeignPtr" and
656 -- "System.Mem.Weak" for more about finalizers.
657 addMVarFinalizer :: MVar a -> IO () -> IO ()
658 addMVarFinalizer (MVar m) finalizer =
659 IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
661 withMVar :: MVar a -> (a -> IO b) -> IO b
665 b <- catchException (unblock (io a))
666 (\e -> do putMVar m a; throw e)
672 %************************************************************************
674 \subsection{Thread waiting}
676 %************************************************************************
679 #ifdef mingw32_HOST_OS
681 -- Note: threadWaitRead and threadWaitWrite aren't really functional
682 -- on Win32, but left in there because lib code (still) uses them (the manner
683 -- in which they're used doesn't cause problems on a Win32 platform though.)
685 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
686 asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) =
687 IO $ \s -> case asyncRead# fd isSock len buf s of
688 (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
690 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
691 asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) =
692 IO $ \s -> case asyncWrite# fd isSock len buf s of
693 (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
695 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
696 asyncDoProc (FunPtr proc) (Ptr param) =
697 -- the 'length' value is ignored; simplifies implementation of
698 -- the async*# primops to have them all return the same result.
699 IO $ \s -> case asyncDoProc# proc param s of
700 (# s, len#, err# #) -> (# s, I# err# #)
702 -- to aid the use of these primops by the IO Handle implementation,
703 -- provide the following convenience funs:
705 -- this better be a pinned byte array!
706 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
707 asyncReadBA fd isSock len off bufB =
708 asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
710 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
711 asyncWriteBA fd isSock len off bufB =
712 asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
716 -- -----------------------------------------------------------------------------
719 -- | Block the current thread until data is available to read on the
720 -- given file descriptor (GHC only).
721 threadWaitRead :: Fd -> IO ()
723 #ifndef mingw32_HOST_OS
724 | threaded = waitForReadEvent fd
726 | otherwise = IO $ \s ->
727 case fromIntegral fd of { I# fd# ->
728 case waitRead# fd# s of { s -> (# s, () #)
731 -- | Block the current thread until data can be written to the
732 -- given file descriptor (GHC only).
733 threadWaitWrite :: Fd -> IO ()
735 #ifndef mingw32_HOST_OS
736 | threaded = waitForWriteEvent fd
738 | otherwise = IO $ \s ->
739 case fromIntegral fd of { I# fd# ->
740 case waitWrite# fd# s of { s -> (# s, () #)
743 -- | Suspends the current thread for a given number of microseconds
746 -- There is no guarantee that the thread will be rescheduled promptly
747 -- when the delay has expired, but the thread will never continue to
748 -- run /earlier/ than specified.
750 threadDelay :: Int -> IO ()
752 | threaded = waitForDelayEvent time
753 | otherwise = IO $ \s ->
754 case fromIntegral time of { I# time# ->
755 case delay# time# s of { s -> (# s, () #)
759 -- | Set the value of returned TVar to True after a given number of
760 -- microseconds. The caveats associated with threadDelay also apply.
762 registerDelay :: Int -> IO (TVar Bool)
764 | threaded = waitForDelayEventSTM usecs
765 | otherwise = error "registerDelay: requires -threaded"
767 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
769 waitForDelayEvent :: Int -> IO ()
770 waitForDelayEvent usecs = do
772 target <- calculateTarget usecs
773 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
777 -- Delays for use in STM
778 waitForDelayEventSTM :: Int -> IO (TVar Bool)
779 waitForDelayEventSTM usecs = do
780 t <- atomically $ newTVar False
781 target <- calculateTarget usecs
782 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
786 calculateTarget :: Int -> IO USecs
787 calculateTarget usecs = do
789 return $ now + (fromIntegral usecs)
792 -- ----------------------------------------------------------------------------
793 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
795 -- In the threaded RTS, we employ a single IO Manager thread to wait
796 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
797 -- and delays (threadDelay).
799 -- We can do this because in the threaded RTS the IO Manager can make
800 -- a non-blocking call to select(), so we don't have to do select() in
801 -- the scheduler as we have to in the non-threaded RTS. We get performance
802 -- benefits from doing it this way, because we only have to restart the select()
803 -- when a new request arrives, rather than doing one select() each time
804 -- around the scheduler loop. Furthermore, the scheduler can be simplified
805 -- by not having to check for completed IO requests.
807 -- Issues, possible problems:
809 -- - we might want bound threads to just do the blocking
810 -- operation rather than communicating with the IO manager
811 -- thread. This would prevent simgle-threaded programs which do
812 -- IO from requiring multiple OS threads. However, it would also
813 -- prevent bound threads waiting on IO from being killed or sent
816 -- - Apprently exec() doesn't work on Linux in a multithreaded program.
817 -- I couldn't repeat this.
819 -- - How do we handle signal delivery in the multithreaded RTS?
821 -- - forkProcess will kill the IO manager thread. Let's just
822 -- hope we don't need to do any blocking IO between fork & exec.
824 #ifndef mingw32_HOST_OS
826 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
827 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
831 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
832 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
834 #ifndef mingw32_HOST_OS
835 pendingEvents :: IORef [IOReq]
837 pendingDelays :: IORef [DelayReq]
838 -- could use a strict list or array here
839 {-# NOINLINE pendingEvents #-}
840 {-# NOINLINE pendingDelays #-}
841 (pendingEvents,pendingDelays) = unsafePerformIO $ do
846 -- the first time we schedule an IO request, the service thread
847 -- will be created (cool, huh?)
849 ensureIOManagerIsRunning :: IO ()
850 ensureIOManagerIsRunning
851 | threaded = seq pendingEvents $ return ()
852 | otherwise = return ()
854 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
855 insertDelay d [] = [d]
856 insertDelay d1 ds@(d2 : rest)
857 | delayTime d1 <= delayTime d2 = d1 : ds
858 | otherwise = d2 : insertDelay d1 rest
860 delayTime :: DelayReq -> USecs
861 delayTime (Delay t _) = t
862 delayTime (DelaySTM t _) = t
866 -- XXX: move into GHC.IOBase from Data.IORef?
867 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
868 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
870 foreign import ccall unsafe "getUSecOfDay"
871 getUSecOfDay :: IO USecs
873 prodding :: IORef Bool
874 {-# NOINLINE prodding #-}
875 prodding = unsafePerformIO (newIORef False)
877 prodServiceThread :: IO ()
878 prodServiceThread = do
879 was_set <- atomicModifyIORef prodding (\a -> (True,a))
880 if (not (was_set)) then wakeupIOManager else return ()
882 #ifdef mingw32_HOST_OS
883 -- ----------------------------------------------------------------------------
884 -- Windows IO manager thread
886 startIOManagerThread :: IO ()
887 startIOManagerThread = do
888 wakeup <- c_getIOManagerEvent
889 forkIO $ service_loop wakeup []
892 service_loop :: HANDLE -- read end of pipe
893 -> [DelayReq] -- current delay requests
896 service_loop wakeup old_delays = do
897 -- pick up new delay requests
898 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
899 let delays = foldr insertDelay old_delays new_delays
902 (delays', timeout) <- getDelay now delays
904 r <- c_WaitForSingleObject wakeup timeout
906 0xffffffff -> do c_maperrno; throwErrno "service_loop"
908 r <- c_readIOManagerEvent
911 _ | r == io_MANAGER_WAKEUP -> return False
912 _ | r == io_MANAGER_DIE -> return True
913 0 -> return False -- spurious wakeup
914 r -> do start_console_handler (r `shiftR` 1); return False
917 else service_cont wakeup delays'
919 _other -> service_cont wakeup delays' -- probably timeout
921 service_cont wakeup delays = do
922 atomicModifyIORef prodding (\_ -> (False,False))
923 service_loop wakeup delays
925 -- must agree with rts/win32/ThrIOManager.c
926 io_MANAGER_WAKEUP = 0xffffffff :: Word32
927 io_MANAGER_DIE = 0xfffffffe :: Word32
933 -- these are sent to Services only.
936 deriving (Eq, Ord, Enum, Show, Read, Typeable)
938 start_console_handler :: Word32 -> IO ()
939 start_console_handler r =
940 case toWin32ConsoleEvent r of
941 Just x -> withMVar win32ConsoleHandler $ \handler -> do
946 toWin32ConsoleEvent ev =
948 0 {- CTRL_C_EVENT-} -> Just ControlC
949 1 {- CTRL_BREAK_EVENT-} -> Just Break
950 2 {- CTRL_CLOSE_EVENT-} -> Just Close
951 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
952 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
955 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
956 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
958 stick :: IORef HANDLE
959 {-# NOINLINE stick #-}
960 stick = unsafePerformIO (newIORef nullPtr)
963 hdl <- readIORef stick
964 c_sendIOManagerEvent io_MANAGER_WAKEUP
966 -- Walk the queue of pending delays, waking up any that have passed
967 -- and return the smallest delay to wait for. The queue of pending
968 -- delays is kept ordered.
969 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
970 getDelay now [] = return ([], iNFINITE)
971 getDelay now all@(d : rest)
973 Delay time m | now >= time -> do
976 DelaySTM time t | now >= time -> do
977 atomically $ writeTVar t True
980 -- delay is in millisecs for WaitForSingleObject
981 let micro_seconds = delayTime d - now
982 milli_seconds = (micro_seconds + 999) `div` 1000
983 in return (all, fromIntegral milli_seconds)
985 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
986 -- available yet. We should move some Win32 functionality down here,
987 -- maybe as part of the grand reorganisation of the base package...
991 iNFINITE = 0xFFFFFFFF :: DWORD -- urgh
993 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
994 c_getIOManagerEvent :: IO HANDLE
996 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
997 c_readIOManagerEvent :: IO Word32
999 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
1000 c_sendIOManagerEvent :: Word32 -> IO ()
1002 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
1005 foreign import stdcall "WaitForSingleObject"
1006 c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1009 -- ----------------------------------------------------------------------------
1010 -- Unix IO manager thread, using select()
1012 startIOManagerThread :: IO ()
1013 startIOManagerThread = do
1014 allocaArray 2 $ \fds -> do
1015 throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1016 rd_end <- peekElemOff fds 0
1017 wr_end <- peekElemOff fds 1
1018 writeIORef stick (fromIntegral wr_end)
1019 c_setIOManagerPipe wr_end
1021 allocaBytes sizeofFdSet $ \readfds -> do
1022 allocaBytes sizeofFdSet $ \writefds -> do
1023 allocaBytes sizeofTimeVal $ \timeval -> do
1024 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1028 :: Fd -- listen to this for wakeup calls
1035 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1037 -- pick up new IO requests
1038 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1039 let reqs = new_reqs ++ old_reqs
1041 -- pick up new delay requests
1042 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1043 let delays = foldr insertDelay old_delays new_delays
1045 -- build the FDSets for select()
1048 fdSet wakeup readfds
1049 maxfd <- buildFdSets 0 readfds writefds reqs
1051 -- perform the select()
1052 let do_select delays = do
1053 -- check the current time and wake up any thread in
1054 -- threadDelay whose timeout has expired. Also find the
1055 -- timeout value for the select() call.
1057 (delays', timeout) <- getDelay now ptimeval delays
1059 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
1065 _ | err == eINTR -> do_select delays'
1066 -- EINTR: just redo the select()
1067 _ | err == eBADF -> return (True, delays)
1068 -- EBADF: one of the file descriptors is closed or bad,
1069 -- we don't know which one, so wake everyone up.
1070 _ | otherwise -> throwErrno "select"
1071 -- otherwise (ENOMEM or EINVAL) something has gone
1072 -- wrong; report the error.
1074 return (False,delays')
1076 (wakeup_all,delays') <- do_select delays
1079 if wakeup_all then return False
1081 b <- fdIsSet wakeup readfds
1084 else alloca $ \p -> do
1085 c_read (fromIntegral wakeup) p 1; return ()
1088 _ | s == io_MANAGER_WAKEUP -> return False
1089 _ | s == io_MANAGER_DIE -> return True
1090 _ -> withMVar signalHandlerLock $ \_ -> do
1091 handler_tbl <- peek handlers
1092 sp <- peekElemOff handler_tbl (fromIntegral s)
1093 io <- deRefStablePtr sp
1097 if exit then return () else do
1099 atomicModifyIORef prodding (\_ -> (False,False))
1101 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1102 else completeRequests reqs readfds writefds []
1104 service_loop wakeup readfds writefds ptimeval reqs' delays'
1106 io_MANAGER_WAKEUP = 0xff :: CChar
1107 io_MANAGER_DIE = 0xfe :: CChar
1110 {-# NOINLINE stick #-}
1111 stick = unsafePerformIO (newIORef 0)
1113 wakeupIOManager :: IO ()
1114 wakeupIOManager = do
1115 fd <- readIORef stick
1116 with io_MANAGER_WAKEUP $ \pbuf -> do
1117 c_write (fromIntegral fd) pbuf 1; return ()
1119 -- Lock used to protect concurrent access to signal_handlers. Symptom of
1120 -- this race condition is #1922, although that bug was on Windows a similar
1121 -- bug also exists on Unix.
1122 signalHandlerLock :: MVar ()
1123 signalHandlerLock = unsafePerformIO (newMVar ())
1125 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
1127 foreign import ccall "setIOManagerPipe"
1128 c_setIOManagerPipe :: CInt -> IO ()
1130 -- -----------------------------------------------------------------------------
1133 buildFdSets maxfd readfds writefds [] = return maxfd
1134 buildFdSets maxfd readfds writefds (Read fd m : reqs)
1135 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1138 buildFdSets (max maxfd fd) readfds writefds reqs
1139 buildFdSets maxfd readfds writefds (Write fd m : reqs)
1140 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1143 buildFdSets (max maxfd fd) readfds writefds reqs
1145 completeRequests [] _ _ reqs' = return reqs'
1146 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1147 b <- fdIsSet fd readfds
1149 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1150 else completeRequests reqs readfds writefds (Read fd m : reqs')
1151 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1152 b <- fdIsSet fd writefds
1154 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1155 else completeRequests reqs readfds writefds (Write fd m : reqs')
1157 wakeupAll [] = return ()
1158 wakeupAll (Read fd m : reqs) = do putMVar m (); wakeupAll reqs
1159 wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs
1161 waitForReadEvent :: Fd -> IO ()
1162 waitForReadEvent fd = do
1164 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1168 waitForWriteEvent :: Fd -> IO ()
1169 waitForWriteEvent fd = do
1171 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1175 -- -----------------------------------------------------------------------------
1178 -- Walk the queue of pending delays, waking up any that have passed
1179 -- and return the smallest delay to wait for. The queue of pending
1180 -- delays is kept ordered.
1181 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1182 getDelay now ptimeval [] = return ([],nullPtr)
1183 getDelay now ptimeval all@(d : rest)
1185 Delay time m | now >= time -> do
1187 getDelay now ptimeval rest
1188 DelaySTM time t | now >= time -> do
1189 atomically $ writeTVar t True
1190 getDelay now ptimeval rest
1192 setTimevalTicks ptimeval (delayTime d - now)
1193 return (all,ptimeval)
1195 newtype CTimeVal = CTimeVal ()
1197 foreign import ccall unsafe "sizeofTimeVal"
1198 sizeofTimeVal :: Int
1200 foreign import ccall unsafe "setTimevalTicks"
1201 setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1204 On Win32 we're going to have a single Pipe, and a
1205 waitForSingleObject with the delay time. For signals, we send a
1206 byte down the pipe just like on Unix.
1209 -- ----------------------------------------------------------------------------
1210 -- select() interface
1212 -- ToDo: move to System.Posix.Internals?
1214 newtype CFdSet = CFdSet ()
1216 foreign import ccall safe "select"
1217 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1220 foreign import ccall unsafe "hsFD_SETSIZE"
1221 c_fD_SETSIZE :: CInt
1224 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1226 foreign import ccall unsafe "hsFD_CLR"
1227 c_fdClr :: CInt -> Ptr CFdSet -> IO ()
1229 fdClr :: Fd -> Ptr CFdSet -> IO ()
1230 fdClr (Fd fd) fdset = c_fdClr fd fdset
1232 foreign import ccall unsafe "hsFD_ISSET"
1233 c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1235 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1236 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1238 foreign import ccall unsafe "hsFD_SET"
1239 c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1241 fdSet :: Fd -> Ptr CFdSet -> IO ()
1242 fdSet (Fd fd) fdset = c_fdSet fd fdset
1244 foreign import ccall unsafe "hsFD_ZERO"
1245 fdZero :: Ptr CFdSet -> IO ()
1247 foreign import ccall unsafe "sizeof_fd_set"