2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
8 -- Copyright : (c) The University of Glasgow, 1994-2002
9 -- License : see libraries/base/LICENSE
11 -- Maintainer : cvs-ghc@haskell.org
12 -- Stability : internal
13 -- Portability : non-portable (GHC extensions)
15 -- Basic concurrency stuff.
17 -----------------------------------------------------------------------------
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home. Hence:
30 -- * Forking and suchlike
31 , forkIO -- :: IO () -> IO ThreadId
32 , forkOnIO -- :: Int -> IO () -> IO ThreadId
33 , numCapabilities -- :: Int
34 , childHandler -- :: SomeException -> IO ()
35 , myThreadId -- :: IO ThreadId
36 , killThread -- :: ThreadId -> IO ()
37 , throwTo -- :: Exception e => ThreadId -> e -> IO ()
38 , par -- :: a -> b -> b
39 , pseq -- :: a -> b -> b
41 , labelThread -- :: ThreadId -> String -> IO ()
43 , ThreadStatus(..), BlockReason(..)
44 , threadStatus -- :: ThreadId -> IO ThreadStatus
47 , threadDelay -- :: Int -> IO ()
48 , registerDelay -- :: Int -> IO (TVar Bool)
49 , threadWaitRead -- :: Fd -> IO ()
50 , threadWaitWrite -- :: Fd -> IO ()
54 , newMVar -- :: a -> IO (MVar a)
55 , newEmptyMVar -- :: IO (MVar a)
56 , takeMVar -- :: MVar a -> IO a
57 , putMVar -- :: MVar a -> a -> IO ()
58 , tryTakeMVar -- :: MVar a -> IO (Maybe a)
59 , tryPutMVar -- :: MVar a -> a -> IO Bool
60 , isEmptyMVar -- :: MVar a -> IO Bool
61 , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
65 , atomically -- :: STM a -> IO a
67 , orElse -- :: STM a -> STM a -> STM a
68 , catchSTM -- :: STM a -> (SomeException -> STM a) -> STM a
69 , alwaysSucceeds -- :: STM a -> STM ()
70 , always -- :: STM Bool -> STM ()
72 , newTVar -- :: a -> STM (TVar a)
73 , newTVarIO -- :: a -> IO (TVar a)
74 , readTVar -- :: TVar a -> STM a
75 , readTVarIO -- :: TVar a -> IO a
76 , writeTVar -- :: TVar a -> a -> STM ()
77 , unsafeIOToSTM -- :: IO a -> STM a
80 #ifdef mingw32_HOST_OS
81 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
82 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
83 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
85 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
86 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
89 #ifndef mingw32_HOST_OS
93 , ensureIOManagerIsRunning
95 #ifdef mingw32_HOST_OS
100 , setUncaughtExceptionHandler -- :: (Exception -> IO ()) -> IO ()
101 , getUncaughtExceptionHandler -- :: IO (Exception -> IO ())
103 , reportError, reportStackOverflow
106 import System.Posix.Types
107 #ifndef mingw32_HOST_OS
108 import System.Posix.Internals
116 import {-# SOURCE #-} GHC.Handle
118 import GHC.Num ( Num(..) )
119 import GHC.Real ( fromIntegral )
120 #ifdef mingw32_HOST_OS
121 import GHC.Real ( div )
122 import GHC.Ptr ( plusPtr, FunPtr(..) )
124 #ifdef mingw32_HOST_OS
125 import GHC.Read ( Read )
126 import GHC.Enum ( Enum )
128 import GHC.Exception ( SomeException(..), throw )
129 import GHC.Pack ( packCString# )
130 import GHC.Ptr ( Ptr(..) )
132 import GHC.Show ( Show(..), showString )
136 infixr 0 `par`, `pseq`
139 %************************************************************************
141 \subsection{@ThreadId@, @par@, and @fork@}
143 %************************************************************************
146 data ThreadId = ThreadId ThreadId# deriving( Typeable )
147 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
148 -- But since ThreadId# is unlifted, the Weak type must use open
151 A 'ThreadId' is an abstract type representing a handle to a thread.
152 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
153 the 'Ord' instance implements an arbitrary total ordering over
154 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
155 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
156 useful when debugging or diagnosing the behaviour of a concurrent
159 /Note/: in GHC, if you have a 'ThreadId', you essentially have
160 a pointer to the thread itself. This means the thread itself can\'t be
161 garbage collected until you drop the 'ThreadId'.
162 This misfeature will hopefully be corrected at a later date.
164 /Note/: Hugs does not provide any operations on other threads;
165 it defines 'ThreadId' as a synonym for ().
168 instance Show ThreadId where
170 showString "ThreadId " .
171 showsPrec d (getThreadId (id2TSO t))
173 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
-- | Extract the raw 'ThreadId#' primitive wrapped by a 'ThreadId'.
id2TSO :: ThreadId -> ThreadId#
id2TSO wrapped = case wrapped of
  ThreadId tso# -> tso#
178 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
181 cmpThread :: ThreadId -> ThreadId -> Ordering
183 case cmp_thread (id2TSO t1) (id2TSO t2) of
188 instance Eq ThreadId where
190 case t1 `cmpThread` t2 of
194 instance Ord ThreadId where
198 Sparks off a new thread to run the 'IO' computation passed as the
199 first argument, and returns the 'ThreadId' of the newly created
202 The new thread will be a lightweight thread; if you want to use a foreign
203 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
205 GHC note: the new thread inherits the /blocked/ state of the parent
206 (see 'Control.Exception.block').
208 forkIO :: IO () -> IO ThreadId
209 forkIO action = IO $ \ s ->
210 case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
212 action_plus = catchException action childHandler
215 Like 'forkIO', but lets you specify on which CPU the thread is
216 created. Unlike a 'forkIO' thread, a thread created by 'forkOnIO'
217 will stay on the same CPU for its entire lifetime ('forkIO' threads
218 can migrate between CPUs according to the scheduling policy).
219 'forkOnIO' is useful for overriding the scheduling policy when you
220 know in advance how best to distribute the threads.
222 The 'Int' argument specifies the CPU number; it is interpreted modulo
223 'numCapabilities' (note that it actually specifies a capability number
224 rather than a CPU number, but to a first approximation the two are
227 forkOnIO :: Int -> IO () -> IO ThreadId
228 forkOnIO (I# cpu) action = IO $ \ s ->
229 case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
231 action_plus = catchException action childHandler
233 -- | the value passed to the @+RTS -N@ flag. This is the number of
234 -- Haskell threads that can run truly simultaneously at any given
235 -- time, and is typically set to the number of physical CPU cores on
numCapabilities :: Int
numCapabilities =
  -- Read the foreign @n_capabilities@ variable through its pointer and
  -- convert the C int to a Haskell 'Int'.
  unsafePerformIO (peek n_capabilities >>= \count -> return (fromIntegral count))
242 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
-- | Exception handler wrapped around the bodies of forked threads.
-- It delegates to 'real_handler'; if that handler itself raises an
-- exception, the new exception is passed back to 'childHandler'.
childHandler :: SomeException -> IO ()
childHandler exn = real_handler exn `catchException` childHandler
247 real_handler :: SomeException -> IO ()
248 real_handler se@(SomeException ex) =
249 -- ignore thread GC and killThread exceptions:
251 Just BlockedOnDeadMVar -> return ()
253 Just BlockedIndefinitely -> return ()
255 Just ThreadKilled -> return ()
257 -- report all others:
258 Just StackOverflow -> reportStackOverflow
261 {- | 'killThread' terminates the given thread (GHC only).
262 Any work already done by the thread isn\'t
263 lost: the computation is suspended until required by another thread.
264 The memory used by the thread will be garbage collected if it isn\'t
265 referenced from anywhere. The 'killThread' function is defined in
268 > killThread tid = throwTo tid ThreadKilled
270 'killThread' is a no-op if the target thread has already completed.
-- Implemented by throwing 'ThreadKilled' at the target thread.
killThread :: ThreadId -> IO ()
killThread victim = victim `throwTo` ThreadKilled
275 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
277 'throwTo' does not return until the exception has been raised in the
279 The calling thread can thus be certain that the target
280 thread has received the exception. This is a useful property to know
281 when dealing with race conditions: eg. if there are two threads that
282 can kill each other, it is guaranteed that only one of the threads
283 will get to kill the other.
285 If the target thread is currently making a foreign call, then the
286 exception will not be raised (and hence 'throwTo' will not return)
287 until the call has completed. This is the case regardless of whether
288 the call is inside a 'block' or not.
290 Important note: the behaviour of 'throwTo' differs from that described in
291 the paper \"Asynchronous exceptions in Haskell\"
292 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
293 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
294 a more synchronous design in which 'throwTo' does not return until the exception
295 is received by the target thread. The trade-off is discussed in Section 8 of the paper.
296 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
299 There is currently no guarantee that the exception delivered by 'throwTo' will be
300 delivered at the first possible opportunity. In particular, a thread may
301 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
302 a pending 'throwTo'. This is arguably undesirable behaviour.
throwTo :: Exception e => ThreadId -> e -> IO ()
throwTo (ThreadId target#) exn = IO raise
  where
    -- Convert the exception with 'toException' and hand it to the
    -- primop, threading the state token through.
    raise st0 = case killThread# target# (toException exn) st0 of
      st1 -> (# st1, () #)
-- | Returns the 'ThreadId' of the calling thread (GHC only).
myThreadId :: IO ThreadId
myThreadId = IO ask
  where
    -- Wrap the raw identifier returned by the primop in 'ThreadId'.
    ask st0 = case myThreadId# st0 of
      (# st1, self# #) -> (# st1, ThreadId self# #)
315 -- |The 'yield' action allows (forces, in a co-operative multitasking
316 -- implementation) a context-switch to any other currently runnable
317 -- threads (if any), and is occasionally useful when implementing
318 -- concurrency abstractions.
321 case (yield# s) of s1 -> (# s1, () #)
323 {- | 'labelThread' stores a string as identifier for this thread if
324 you built a RTS with debugging support. This identifier will be used in
325 the debugging output to make distinction of different threads easier
326 (otherwise you only have the thread state object\'s address in the heap).
328 Other applications like the graphical Concurrent Haskell Debugger
329 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
330 'labelThread' for their purposes as well.
labelThread :: ThreadId -> String -> IO ()
labelThread (ThreadId tid#) name = IO $ \st0 ->
  -- Pack the label with 'packCString#' and pass the address of the
  -- resulting byte array's contents to the primop.
  case labelThread# tid# (byteArrayContents# (packCString# name)) st0 of
    st1 -> (# st1, () #)
339 -- Nota Bene: 'pseq' used to be 'seq'
340 -- but 'seq' is now defined in PrelGHC
342 -- "pseq" is defined a bit weirdly (see below)
344 -- The reason for the strange "lazy" call is that
345 -- it fools the compiler into thinking that pseq and par are non-strict in
346 -- their second argument (even if it inlines pseq at the call site).
347 -- If it thinks pseq is strict in "y", then it often evaluates
348 -- "y" before "x", which is totally wrong.
352 pseq x y = x `seq` lazy y
356 par x y = case (par# x) of { _ -> lazy y }
361 -- ^blocked on an 'MVar'
363 -- ^blocked on a computation in progress by another thread
365 -- ^blocked in 'throwTo'
367 -- ^blocked in 'retry' in an STM transaction
368 | BlockedOnForeignCall
369 -- ^currently in a foreign call
371 -- ^blocked on some other resource. Without @-threaded@,
372 -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
373 -- they show up as 'BlockedOnMVar'.
374 deriving (Eq,Ord,Show)
376 -- | The current status of a thread
379 -- ^the thread is currently runnable or running
381 -- ^the thread has finished
382 | ThreadBlocked BlockReason
383 -- ^the thread is blocked on some resource
385 -- ^the thread received an uncaught exception
386 deriving (Eq,Ord,Show)
388 threadStatus :: ThreadId -> IO ThreadStatus
389 threadStatus (ThreadId t) = IO $ \s ->
390 case threadStatus# t s of
391 (# s', stat #) -> (# s', mk_stat (I# stat) #)
393 -- NB. keep these in sync with includes/Constants.h
394 mk_stat 0 = ThreadRunning
395 mk_stat 1 = ThreadBlocked BlockedOnMVar
396 mk_stat 2 = ThreadBlocked BlockedOnBlackHole
397 mk_stat 3 = ThreadBlocked BlockedOnException
398 mk_stat 7 = ThreadBlocked BlockedOnSTM
399 mk_stat 11 = ThreadBlocked BlockedOnForeignCall
400 mk_stat 12 = ThreadBlocked BlockedOnForeignCall
401 mk_stat 16 = ThreadFinished
402 mk_stat 17 = ThreadDied
403 mk_stat _ = ThreadBlocked BlockedOnOther
407 %************************************************************************
409 \subsection[stm]{Transactional heap operations}
411 %************************************************************************
413 TVars are shared memory locations which support atomic memory
417 -- |A monad supporting atomic memory transactions.
418 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
420 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
423 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
instance Functor STM where
    -- Run the transaction, then apply the function to its result.
    fmap g txn = txn >>= \result -> return (g result)
428 instance Monad STM where
429 {-# INLINE return #-}
433 return x = returnSTM x
434 m >>= k = bindSTM m k
436 bindSTM :: STM a -> (a -> STM b) -> STM b
437 bindSTM (STM m) k = STM ( \s ->
439 (# new_s, a #) -> unSTM (k a) new_s
442 thenSTM :: STM a -> STM b -> STM b
443 thenSTM (STM m) k = STM ( \s ->
445 (# new_s, _ #) -> unSTM k new_s
-- Lift a pure value into 'STM', passing the state token through unchanged.
returnSTM :: a -> STM a
returnSTM val = STM $ \st -> (# st, val #)
451 -- | Unsafely performs IO in the STM monad. Beware: this is a highly
452 -- dangerous thing to do.
454 -- * The STM implementation will often run transactions multiple
455 -- times, so you need to be prepared for this if your IO has any
458 -- * The STM implementation will abort transactions that are known to
459 -- be invalid and need to be restarted. This may happen in the middle
460 -- of `unsafeIOToSTM`, so make sure you don't acquire any resources
461 -- that need releasing (exception handlers are ignored when aborting
462 -- the transaction). That includes doing any IO using Handles, for
463 -- example. Getting this wrong will probably lead to random deadlocks.
465 -- * The transaction may have seen an inconsistent view of memory when
466 -- the IO runs. Invariants that you expect to be true throughout
467 -- your program may not be true inside a transaction, due to the
468 -- way transactions are implemented. Normally this wouldn't be visible
469 -- to the programmer, but using `unsafeIOToSTM` can expose it.
unsafeIOToSTM :: IO a -> STM a
unsafeIOToSTM io = case io of
  IO run -> STM run  -- re-wrap the same state-passing function
474 -- |Perform a series of STM actions atomically.
476 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
477 -- Any attempt to do so will result in a runtime error. (Reason: allowing
478 -- this would effectively allow a transaction inside a transaction, depending
479 -- on exactly when the thunk is evaluated.)
481 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
482 -- and which allows top-level TVars to be allocated.
atomically :: STM a -> IO a
atomically (STM txn) = IO $ \st -> atomically# txn st
487 -- |Retry execution of the current memory transaction because it has seen
488 -- values in TVars which mean that it should not continue (e.g. the TVars
489 -- represent a shared buffer that is now empty). The implementation may
490 -- block the thread until one of the TVars that it has read from has been
491 -- updated. (GHC only)
493 retry = STM $ \s# -> retry# s#
495 -- |Compose two alternative STM actions (GHC only). If the first action
496 -- completes without retrying then it forms the result of the orElse.
497 -- Otherwise, if the first action retries, then the second action is
498 -- tried in its place. If both actions retry then the orElse as a
orElse :: STM a -> STM a -> STM a
orElse (STM primary) fallback =
  STM (\st -> catchRetry# primary (unSTM fallback) st)
-- |Exception handling within STM actions.
catchSTM :: STM a -> (SomeException -> STM a) -> STM a
catchSTM (STM body) handler = STM (\st -> catchSTM# body recover st)
  where
    -- Unwrap the handler's 'STM' result so the primop can run it directly.
    recover exn = unSTM (handler exn)
-- | Low-level primitive on which 'always' and 'alwaysSucceeds' are built.
-- 'checkInv' differs from these in that (i) the invariant is not
-- checked when 'checkInv' is called, only at the end of this and
-- subsequent transactions, and (ii) the invariant failure is indicated
-- by raising an exception.
checkInv :: STM a -> STM ()
checkInv (STM invariant) = STM $ \st -> check# invariant st
515 -- | alwaysSucceeds adds a new invariant that must be true when passed
516 -- to alwaysSucceeds, at the end of the current transaction, and at
517 -- the end of every subsequent transaction. If it fails at any
518 -- of those points then the transaction violating it is aborted
519 -- and the exception raised by the invariant is propagated.
520 alwaysSucceeds :: STM a -> STM ()
521 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () )
-- | 'always' is a variant of 'alwaysSucceeds' in which the invariant is
-- expressed as an @STM Bool@ action that must return 'True'.  Returning
-- 'False' or raising an exception are both treated as invariant failures.
always :: STM Bool -> STM ()
always i = alwaysSucceeds $ do
    holds <- i
    -- A 'False' result is turned into an exception so that
    -- 'alwaysSucceeds' sees it as a failure.
    if holds
      then return ()
      else error "Transactional invariant violation"
531 -- |Shared memory locations that support atomic memory transactions.
532 data TVar a = TVar (TVar# RealWorld a)
534 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
instance Eq (TVar a) where
    -- Two 'TVar's are equal when 'sameTVar#' says their underlying
    -- primitives are the same.
    TVar lhs# == TVar rhs# = sameTVar# lhs# rhs#
-- |Create a new 'TVar' holding the supplied value.
newTVar :: a -> STM (TVar a)
newTVar initial = STM alloc
  where
    -- Allocate the primitive variable and wrap it in the 'TVar' constructor.
    alloc st0 = case newTVar# initial st0 of
      (# st1, raw# #) -> (# st1, TVar raw# #)
545 -- |@IO@ version of 'newTVar'. This is useful for creating top-level
546 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
547 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
newTVarIO :: a -> IO (TVar a)
newTVarIO initial = IO (\st0 ->
  -- Same primop as 'newTVar', but run directly in 'IO'.
  case newTVar# initial st0 of
    (# st1, raw# #) -> (# st1, TVar raw# #))
554 -- |Return the current value stored in a TVar.
555 -- This is equivalent to
557 -- > readTVarIO = atomically . readTVar
559 -- but works much faster, because it doesn't perform a complete
560 -- transaction, it just reads the current value of the 'TVar'.
readTVarIO :: TVar a -> IO a
readTVarIO (TVar var#) = IO (readTVarIO# var#)
-- |Return the current value stored in a 'TVar'.
readTVar :: TVar a -> STM a
readTVar (TVar var#) = STM (readTVar# var#)
568 -- |Write the supplied value into a TVar
569 writeTVar :: TVar a -> a -> STM ()
570 writeTVar (TVar tvar#) val = STM $ \s1# ->
571 case writeTVar# tvar# val s1# of
576 %************************************************************************
578 \subsection[mvars]{M-Structures}
580 %************************************************************************
582 M-Vars are rendezvous points for concurrent threads. They begin
583 empty, and any attempt to read an empty M-Var blocks. When an M-Var
584 is written, a single blocked thread may be freed. Reading an M-Var
585 toggles its state from full back to empty. Therefore, any value
586 written to an M-Var may only be read once. Multiple reads and writes
587 are allowed, but there must be at least one read between any two
591 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
593 -- |Create an 'MVar' which is initially empty.
594 newEmptyMVar :: IO (MVar a)
595 newEmptyMVar = IO $ \ s# ->
597 (# s2#, svar# #) -> (# s2#, MVar svar# #)
599 -- |Create an 'MVar' which contains the supplied value.
600 newMVar :: a -> IO (MVar a)
602 newEmptyMVar >>= \ mvar ->
603 putMVar mvar value >>
606 -- |Return the contents of the 'MVar'. If the 'MVar' is currently
607 -- empty, 'takeMVar' will wait until it is full. After a 'takeMVar',
608 -- the 'MVar' is left empty.
610 -- There are two further important properties of 'takeMVar':
612 -- * 'takeMVar' is single-wakeup. That is, if there are multiple
613 -- threads blocked in 'takeMVar', and the 'MVar' becomes full,
614 -- only one thread will be woken up. The runtime guarantees that
615 -- the woken thread completes its 'takeMVar' operation.
617 -- * When multiple threads are blocked on an 'MVar', they are
618 -- woken up in FIFO order. This is useful for providing
619 -- fairness properties of abstractions built using 'MVar's.
takeMVar :: MVar a -> IO a
takeMVar (MVar var#) = IO (takeMVar# var#)
624 -- |Put a value into an 'MVar'. If the 'MVar' is currently full,
625 -- 'putMVar' will wait until it becomes empty.
627 -- There are two further important properties of 'putMVar':
629 -- * 'putMVar' is single-wakeup. That is, if there are multiple
630 -- threads blocked in 'putMVar', and the 'MVar' becomes empty,
631 -- only one thread will be woken up. The runtime guarantees that
632 -- the woken thread completes its 'putMVar' operation.
634 -- * When multiple threads are blocked on an 'MVar', they are
635 -- woken up in FIFO order. This is useful for providing
636 -- fairness properties of abstractions built using 'MVar's.
638 putMVar :: MVar a -> a -> IO ()
639 putMVar (MVar mvar#) x = IO $ \ s# ->
640 case putMVar# mvar# x s# of
-- |A non-blocking version of 'takeMVar'.  'tryTakeMVar' returns
-- immediately: 'Nothing' if the 'MVar' was empty, or @'Just' a@ if the
-- 'MVar' was full with contents @a@.  After 'tryTakeMVar', the 'MVar'
-- is left empty.
tryTakeMVar :: MVar a -> IO (Maybe a)
tryTakeMVar (MVar var#) = IO attempt
  where
    attempt st0 = case tryTakeMVar# var# st0 of
      (# st1, 0#, _ #)   -> (# st1, Nothing #)   -- flag 0#: MVar was empty
      (# st1, _,  val #) -> (# st1, Just val #)  -- otherwise: MVar was full
-- |A non-blocking version of 'putMVar'.  'tryPutMVar' attempts to put
-- the value @a@ into the 'MVar', returning 'True' if it was successful,
-- or 'False' otherwise.
tryPutMVar :: MVar a -> a -> IO Bool
tryPutMVar (MVar var#) val = IO attempt
  where
    attempt st0 = case tryPutMVar# var# val st0 of
      (# st1, 0# #) -> (# st1, False #)  -- flag 0#: the put did not happen
      (# st1, _ #)  -> (# st1, True #)
662 -- |Check whether a given 'MVar' is empty.
664 -- Notice that the boolean value returned is just a snapshot of
665 -- the state of the MVar. By the time you get to react on its result,
666 -- the MVar may have been filled (or emptied) - so be extremely
667 -- careful when using this operation. Use 'tryTakeMVar' instead if possible.
isEmptyMVar :: MVar a -> IO Bool
isEmptyMVar (MVar var#) = IO $ \st0 ->
  case isEmptyMVar# var# st0 of
    (# st1, 0# #) -> (# st1, False #)  -- zero flag: reported as not empty
    (# st1, _ #)  -> (# st1, True #)   -- any non-zero flag: empty
-- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
-- "System.Mem.Weak" for more about finalizers.
addMVarFinalizer :: MVar a -> IO () -> IO ()
addMVarFinalizer (MVar var#) finalizer = IO register
  where
    -- Create a weak pointer keyed on the underlying primitive with the
    -- finalizer attached; the weak pointer itself is discarded.
    register st0 = case mkWeak# var# () finalizer st0 of
      (# st1, _ #) -> (# st1, () #)
679 withMVar :: MVar a -> (a -> IO b) -> IO b
683 b <- catchAny (unblock (io a))
684 (\e -> do putMVar m a; throw e)
690 %************************************************************************
692 \subsection{Thread waiting}
694 %************************************************************************
697 #ifdef mingw32_HOST_OS
699 -- Note: threadWaitRead and threadWaitWrite aren't really functional
700 -- on Win32, but left in there because lib code (still) uses them (the manner
701 -- in which they're used doesn't cause problems on a Win32 platform though.)
-- Unbox the arguments, call the primop, and return its two result
-- integers boxed as a pair (in the order length, error).
asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncRead (I# fd#) (I# isSock#) (I# len#) (Ptr buf#) = IO $ \st0 ->
  case asyncRead# fd# isSock# len# buf# st0 of
    (# st1, got#, err# #) -> (# st1, (I# got#, I# err#) #)
-- Unbox the arguments, call the primop, and return its two result
-- integers boxed as a pair (in the order length, error).
asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncWrite (I# fd#) (I# isSock#) (I# len#) (Ptr buf#) = IO $ \st0 ->
  case asyncWrite# fd# isSock# len# buf# st0 of
    (# st1, put#, err# #) -> (# st1, (I# put#, I# err#) #)
asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
asyncDoProc (FunPtr proc#) (Ptr param#) = IO run
  where
    -- The primop returns the same triple shape as the other async*#
    -- primops; the 'length' component is ignored and only the last
    -- component is returned.
    run st0 = case asyncDoProc# proc# param# st0 of
      (# st1, _unusedLen#, err# #) -> (# st1, I# err# #)
720 -- to aid the use of these primops by the IO Handle implementation,
721 -- provide the following convenience funs:
-- this better be a pinned byte array!
asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncReadBA fd isSock len off bufB = asyncRead fd isSock len target
  where
    -- Address of the array contents, advanced by the requested offset.
    target = Ptr (byteArrayContents# (unsafeCoerce# bufB)) `plusPtr` off
asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncWriteBA fd isSock len off bufB = asyncWrite fd isSock len source
  where
    -- Address of the array contents, advanced by the requested offset.
    source = Ptr (byteArrayContents# (unsafeCoerce# bufB)) `plusPtr` off
734 -- -----------------------------------------------------------------------------
737 -- | Block the current thread until data is available to read on the
738 -- given file descriptor (GHC only).
739 threadWaitRead :: Fd -> IO ()
741 #ifndef mingw32_HOST_OS
742 | threaded = waitForReadEvent fd
744 | otherwise = IO $ \s ->
745 case fromIntegral fd of { I# fd# ->
746 case waitRead# fd# s of { s' -> (# s', () #)
749 -- | Block the current thread until data can be written to the
750 -- given file descriptor (GHC only).
751 threadWaitWrite :: Fd -> IO ()
753 #ifndef mingw32_HOST_OS
754 | threaded = waitForWriteEvent fd
756 | otherwise = IO $ \s ->
757 case fromIntegral fd of { I# fd# ->
758 case waitWrite# fd# s of { s' -> (# s', () #)
761 -- | Suspends the current thread for a given number of microseconds
764 -- There is no guarantee that the thread will be rescheduled promptly
765 -- when the delay has expired, but the thread will never continue to
766 -- run /earlier/ than specified.
768 threadDelay :: Int -> IO ()
770 | threaded = waitForDelayEvent time
771 | otherwise = IO $ \s ->
772 case fromIntegral time of { I# time# ->
773 case delay# time# s of { s' -> (# s', () #)
777 -- | Set the value of returned TVar to True after a given number of
778 -- microseconds. The caveats associated with threadDelay also apply.
780 registerDelay :: Int -> IO (TVar Bool)
782 | threaded = waitForDelayEventSTM usecs
783 | otherwise = error "registerDelay: requires -threaded"
785 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
787 waitForDelayEvent :: Int -> IO ()
788 waitForDelayEvent usecs = do
790 target <- calculateTarget usecs
791 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
795 -- Delays for use in STM
796 waitForDelayEventSTM :: Int -> IO (TVar Bool)
797 waitForDelayEventSTM usecs = do
798 t <- atomically $ newTVar False
799 target <- calculateTarget usecs
800 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
804 calculateTarget :: Int -> IO USecs
805 calculateTarget usecs = do
807 return $ now + (fromIntegral usecs)
810 -- ----------------------------------------------------------------------------
811 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
813 -- In the threaded RTS, we employ a single IO Manager thread to wait
814 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
815 -- and delays (threadDelay).
817 -- We can do this because in the threaded RTS the IO Manager can make
818 -- a non-blocking call to select(), so we don't have to do select() in
819 -- the scheduler as we have to in the non-threaded RTS. We get performance
820 -- benefits from doing it this way, because we only have to restart the select()
821 -- when a new request arrives, rather than doing one select() each time
822 -- around the scheduler loop. Furthermore, the scheduler can be simplified
823 -- by not having to check for completed IO requests.
825 -- Issues, possible problems:
827 -- - we might want bound threads to just do the blocking
828 -- operation rather than communicating with the IO manager
829 -- thread. This would prevent single-threaded programs which do
830 -- IO from requiring multiple OS threads. However, it would also
831 -- prevent bound threads waiting on IO from being killed or sent
834 -- - Apparently exec() doesn't work on Linux in a multithreaded program.
835 -- I couldn't repeat this.
837 -- - How do we handle signal delivery in the multithreaded RTS?
839 -- - forkProcess will kill the IO manager thread. Let's just
840 -- hope we don't need to do any blocking IO between fork & exec.
842 #ifndef mingw32_HOST_OS
844 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
845 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
849 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
850 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
852 #ifndef mingw32_HOST_OS
853 pendingEvents :: IORef [IOReq]
855 pendingDelays :: IORef [DelayReq]
856 -- could use a strict list or array here
857 {-# NOINLINE pendingEvents #-}
858 {-# NOINLINE pendingDelays #-}
859 (pendingEvents,pendingDelays) = unsafePerformIO $ do
864 -- the first time we schedule an IO request, the service thread
865 -- will be created (cool, huh?)
ensureIOManagerIsRunning :: IO ()
ensureIOManagerIsRunning =
  -- Only the threaded RTS needs anything done here: forcing
  -- 'pendingEvents' evaluates its top-level initialiser.
  if threaded then pendingEvents `seq` return () else return ()
-- Insert a delay request in front of the first queued request whose
-- 'delayTime' is not earlier than its own (or at the end if none is).
insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
insertDelay new queue = case queue of
  [] -> [new]
  (next : later)
    | delayTime new <= delayTime next -> new : queue
    | otherwise                       -> next : insertDelay new later
-- The target time stored in a delay request, whichever constructor it uses.
delayTime :: DelayReq -> USecs
delayTime req = case req of
  Delay    due _ -> due
  DelaySTM due _ -> due
-- XXX: move into GHC.IOBase from Data.IORef?
-- Apply the update function to the reference's contents via the
-- 'atomicModifyMutVar#' primop and return the second component.
atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORef (IORef (STRef ref#)) update =
  IO (atomicModifyMutVar# ref# update)
888 foreign import ccall unsafe "getUSecOfDay"
889 getUSecOfDay :: IO USecs
-- Global flag, initially 'False'; set by 'prodServiceThread'.
prodding :: IORef Bool
{-# NOINLINE prodding #-}
prodding = unsafePerformIO $ newIORef False
-- Set the 'prodding' flag and wake the IO manager, but only if the flag
-- was not already set.
prodServiceThread :: IO ()
prodServiceThread =
  atomicModifyIORef prodding (\old -> (True, old)) >>= \alreadyProdded ->
    if alreadyProdded then return () else wakeupIOManager
900 #ifdef mingw32_HOST_OS
901 -- ----------------------------------------------------------------------------
902 -- Windows IO manager thread
904 startIOManagerThread :: IO ()
905 startIOManagerThread = do
906 wakeup <- c_getIOManagerEvent
907 forkIO $ service_loop wakeup []
910 service_loop :: HANDLE -- read end of pipe
911 -> [DelayReq] -- current delay requests
914 service_loop wakeup old_delays = do
915 -- pick up new delay requests
916 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
917 let delays = foldr insertDelay old_delays new_delays
920 (delays', timeout) <- getDelay now delays
922 r <- c_WaitForSingleObject wakeup timeout
924 0xffffffff -> do c_maperrno; throwErrno "service_loop"
926 r2 <- c_readIOManagerEvent
929 _ | r2 == io_MANAGER_WAKEUP -> return False
930 _ | r2 == io_MANAGER_DIE -> return True
931 0 -> return False -- spurious wakeup
932 _ -> do start_console_handler (r2 `shiftR` 1); return False
935 else service_cont wakeup delays'
937 _other -> service_cont wakeup delays' -- probably timeout
-- Clear the 'prodding' flag, then go round 'service_loop' again.
service_cont :: HANDLE -> [DelayReq] -> IO ()
service_cont wakeup delays =
  atomicModifyIORef prodding (\_ -> (False, False)) >> service_loop wakeup delays
-- must agree with rts/win32/ThrIOManager.c
io_MANAGER_WAKEUP :: Word32
io_MANAGER_WAKEUP = 0xffffffff

io_MANAGER_DIE :: Word32
io_MANAGER_DIE = 0xfffffffe
953 -- these are sent to Services only.
956 deriving (Eq, Ord, Enum, Show, Read, Typeable)
958 start_console_handler :: Word32 -> IO ()
959 start_console_handler r =
960 case toWin32ConsoleEvent r of
961 Just x -> withMVar win32ConsoleHandler $ \handler -> do
966 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
967 toWin32ConsoleEvent ev =
969 0 {- CTRL_C_EVENT-} -> Just ControlC
970 1 {- CTRL_BREAK_EVENT-} -> Just Break
971 2 {- CTRL_CLOSE_EVENT-} -> Just Close
972 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
973 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
-- | Holds the handler run for Win32 console events; 'start_console_handler'
-- consults it under 'withMVar'.  Until a handler is stored, the contents is
-- an 'error' thunk.
--
-- This is a top-level mutable variable created with 'unsafePerformIO', so
-- it must never be inlined: an inlined copy could allocate a second,
-- unrelated 'MVar'.  Hence the NOINLINE pragma, as on the other globals in
-- this module.
{-# NOINLINE win32ConsoleHandler #-}
win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
979 -- XXX Is this actually needed?
-- | Global reference to a 'HANDLE', initialised to 'nullPtr'.  On this
-- platform 'wakeupIOManager' reads it but ignores the value.
{-# NOINLINE stick #-}
stick :: IORef HANDLE
stick = unsafePerformIO $ newIORef nullPtr
984 wakeupIOManager :: IO ()
986 _hdl <- readIORef stick
987 c_sendIOManagerEvent io_MANAGER_WAKEUP
989 -- Walk the queue of pending delays, waking up any that have passed
990 -- and return the smallest delay to wait for. The queue of pending
991 -- delays is kept ordered.
992 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
993 getDelay _ [] = return ([], iNFINITE)
994 getDelay now all@(d : rest)
996 Delay time m | now >= time -> do
999 DelaySTM time t | now >= time -> do
1000 atomically $ writeTVar t True
1003 -- delay is in millisecs for WaitForSingleObject
1004 let micro_seconds = delayTime d - now
1005 milli_seconds = (micro_seconds + 999) `div` 1000
1006 in return (all, fromIntegral milli_seconds)
1008 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
1009 -- available yet. We should move some Win32 functionality down here,
1010 -- maybe as part of the grand reorganisation of the base package...
1011 type HANDLE = Ptr ()
1015 iNFINITE = 0xFFFFFFFF -- urgh
-- Entry points of the Windows IO manager; all three live in the RTS
-- (ThrIOManager.c).

-- | Obtain the event 'HANDLE' that the service loop waits on.
foreign import ccall unsafe "getIOManagerEvent" c_getIOManagerEvent :: IO HANDLE

-- | Read the code of the event that woke the manager; the service loop
-- compares it against 'io_MANAGER_WAKEUP' and 'io_MANAGER_DIE'.
foreign import ccall unsafe "readIOManagerEvent" c_readIOManagerEvent :: IO Word32

-- | Post an event code to the manager (used with 'io_MANAGER_WAKEUP').
foreign import ccall unsafe "sendIOManagerEvent" c_sendIOManagerEvent :: Word32 -> IO ()
1026 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
-- | Win32 @WaitForSingleObject@: wait on a handle with a timeout given in
-- milliseconds.  The service loop treats a result of 0xffffffff as failure.
-- No safety level is given, so this is a (default) safe call.
foreign import stdcall "WaitForSingleObject" c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1033 -- ----------------------------------------------------------------------------
1034 -- Unix IO manager thread, using select()
1036 startIOManagerThread :: IO ()
1037 startIOManagerThread = do
1038 allocaArray 2 $ \fds -> do
1039 throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1040 rd_end <- peekElemOff fds 0
1041 wr_end <- peekElemOff fds 1
1042 writeIORef stick (fromIntegral wr_end)
1043 c_setIOManagerPipe wr_end
1045 allocaBytes sizeofFdSet $ \readfds -> do
1046 allocaBytes sizeofFdSet $ \writefds -> do
1047 allocaBytes sizeofTimeVal $ \timeval -> do
1048 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1052 :: Fd -- listen to this for wakeup calls
1059 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1061 -- pick up new IO requests
1062 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1063 let reqs = new_reqs ++ old_reqs
1065 -- pick up new delay requests
1066 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1067 let delays0 = foldr insertDelay old_delays new_delays
1069 -- build the FDSets for select()
1072 fdSet wakeup readfds
1073 maxfd <- buildFdSets 0 readfds writefds reqs
1075 -- perform the select()
1076 let do_select delays = do
1077 -- check the current time and wake up any thread in
1078 -- threadDelay whose timeout has expired. Also find the
1079 -- timeout value for the select() call.
1081 (delays', timeout) <- getDelay now ptimeval delays
1083 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
1089 _ | err == eINTR -> do_select delays'
1090 -- EINTR: just redo the select()
1091 _ | err == eBADF -> return (True, delays)
1092 -- EBADF: one of the file descriptors is closed or bad,
1093 -- we don't know which one, so wake everyone up.
1094 _ | otherwise -> throwErrno "select"
1095 -- otherwise (ENOMEM or EINVAL) something has gone
1096 -- wrong; report the error.
1098 return (False,delays')
1100 (wakeup_all,delays') <- do_select delays0
1103 if wakeup_all then return False
1105 b <- fdIsSet wakeup readfds
1108 else alloca $ \p -> do
1109 c_read (fromIntegral wakeup) p 1; return ()
1112 _ | s == io_MANAGER_WAKEUP -> return False
1113 _ | s == io_MANAGER_DIE -> return True
1114 _ -> withMVar signalHandlerLock $ \_ -> do
1115 handler_tbl <- peek handlers
1116 sp <- peekElemOff handler_tbl (fromIntegral s)
1117 io <- deRefStablePtr sp
1121 if exit then return () else do
1123 atomicModifyIORef prodding (\_ -> (False,False))
1125 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1126 else completeRequests reqs readfds writefds []
1128 service_loop wakeup readfds writefds ptimeval reqs' delays'
-- | Byte written to the IO manager's pipe by 'wakeupIOManager' to make the
-- service loop go round again.
io_MANAGER_WAKEUP :: CChar
io_MANAGER_WAKEUP = 0xff

-- | Byte on which the service loop reports that it should stop.
-- NOTE(review): nothing in this module writes it; presumably the RTS does.
io_MANAGER_DIE :: CChar
io_MANAGER_DIE = 0xfe
1135 {-# NOINLINE stick #-}
1136 stick = unsafePerformIO (newIORef 0)
-- | Send a wakeup to the IO manager: write the single byte
-- 'io_MANAGER_WAKEUP' to the descriptor stored in 'stick'.  The result of
-- the write is deliberately ignored.
wakeupIOManager :: IO ()
wakeupIOManager =
  readIORef stick >>= \pipeFd ->
    with io_MANAGER_WAKEUP (\buf ->
      c_write (fromIntegral pipeFd) buf 1 >> return ())
-- | Lock used to protect concurrent access to @signal_handlers@.  A symptom
-- of the race condition it prevents is #1922; although that bug was reported
-- on Windows, a similar bug also exists on Unix.
--
-- The lock is a top-level value created with 'unsafePerformIO', so it is
-- marked NOINLINE: an inlined copy would be a separate 'MVar' and would no
-- longer exclude anything.
{-# NOINLINE signalHandlerLock #-}
signalHandlerLock :: MVar ()
signalHandlerLock = unsafePerformIO (newMVar ())
-- | Address of the C variable @signal_handlers@.  The service loop peeks
-- through it to reach a table of 'StablePtr's to handler actions.
foreign import ccall "&signal_handlers"
  handlers :: Ptr (Ptr (StablePtr (IO ())))

-- | Hand a pipe descriptor to the C function @setIOManagerPipe@; called
-- with the write end of the manager's pipe at start-up.
foreign import ccall "setIOManagerPipe" c_setIOManagerPipe :: CInt -> IO ()
1155 -- -----------------------------------------------------------------------------
1158 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1159 buildFdSets maxfd _ _ [] = return maxfd
1160 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1161 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1164 buildFdSets (max maxfd fd) readfds writefds reqs
1165 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1166 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1169 buildFdSets (max maxfd fd) readfds writefds reqs
1171 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1173 completeRequests [] _ _ reqs' = return reqs'
1174 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1175 b <- fdIsSet fd readfds
1177 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1178 else completeRequests reqs readfds writefds (Read fd m : reqs')
1179 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1180 b <- fdIsSet fd writefds
1182 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1183 else completeRequests reqs readfds writefds (Write fd m : reqs')
-- | Signal every pending IO request, in list order, by putting @()@ into
-- the 'MVar' that the request carries.
wakeupAll :: [IOReq] -> IO ()
wakeupAll pending =
  case pending of
    []         -> return ()
    req : rest -> putMVar (waiter req) () >> wakeupAll rest
  where
    -- the MVar to fill, whichever kind of request it is
    waiter (Read  _ m) = m
    waiter (Write _ m) = m
1190 waitForReadEvent :: Fd -> IO ()
1191 waitForReadEvent fd = do
1193 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1197 waitForWriteEvent :: Fd -> IO ()
1198 waitForWriteEvent fd = do
1200 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1204 -- -----------------------------------------------------------------------------
1207 -- Walk the queue of pending delays, waking up any that have passed
1208 -- and return the smallest delay to wait for. The queue of pending
1209 -- delays is kept ordered.
1210 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1211 getDelay _ _ [] = return ([],nullPtr)
1212 getDelay now ptimeval all@(d : rest)
1214 Delay time m | now >= time -> do
1216 getDelay now ptimeval rest
1217 DelaySTM time t | now >= time -> do
1218 atomically $ writeTVar t True
1219 getDelay now ptimeval rest
1221 setTimevalTicks ptimeval (delayTime d - now)
1222 return (all,ptimeval)
-- | Value of the C function @sizeofTimeVal@; used as the byte count when
-- allocating the buffer handed to the service loop as its timeval.
foreign import ccall unsafe "sizeofTimeVal" sizeofTimeVal :: Int

-- | Store a 'USecs' amount into the timeval buffer via the C function
-- @setTimevalTicks@.
foreign import ccall unsafe "setTimevalTicks" setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1233 On Win32 we're going to have a single Pipe, and a
1234 waitForSingleObject with the delay time. For signals, we send a
1235 byte down the pipe just like on Unix.
1238 -- ----------------------------------------------------------------------------
1239 -- select() interface
1241 -- ToDo: move to System.Posix.Internals?
1245 foreign import ccall safe "select"
1246 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
-- | Value of the C helper @hsFD_SETSIZE@, as a raw 'CInt'.
-- 'fD_SETSIZE' is this value converted with 'fromIntegral'.
foreign import ccall unsafe "hsFD_SETSIZE" c_fD_SETSIZE :: CInt
1253 fD_SETSIZE = fromIntegral c_fD_SETSIZE
-- | Raw binding to the C helper @hsFD_ISSET@.
foreign import ccall unsafe "hsFD_ISSET" c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt

-- | Query a descriptor's membership in an fd_set: unwraps the 'Fd' and
-- passes the underlying 'CInt' to 'c_fdIsSet', returning its result as is.
fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
fdIsSet descriptor set =
  case descriptor of
    Fd raw -> c_fdIsSet raw set
-- | Raw binding to the C helper @hsFD_SET@.
foreign import ccall unsafe "hsFD_SET" c_fdSet :: CInt -> Ptr CFdSet -> IO ()

-- | Add a descriptor to an fd_set: unwraps the 'Fd' and passes the
-- underlying 'CInt' to 'c_fdSet'.
fdSet :: Fd -> Ptr CFdSet -> IO ()
fdSet descriptor set =
  case descriptor of
    Fd raw -> c_fdSet raw set
-- | Binding to the C helper @hsFD_ZERO@, applied to an fd_set buffer.
-- NOTE(review): presumably clears the set like the FD_ZERO macro -- confirm
-- against the C helper.
foreign import ccall unsafe "hsFD_ZERO" fdZero :: Ptr CFdSet -> IO ()
1270 foreign import ccall unsafe "sizeof_fd_set"
-- | Run the RTS stack-overflow hook.  The result type is fully polymorphic,
-- so once the hook has returned the action yields 'undefined'.
reportStackOverflow :: IO a
reportStackOverflow = callStackOverflowHook >> return undefined
1278 reportError :: SomeException -> IO a
1280 handler <- getUncaughtExceptionHandler
-- | Binding to the C function @stackOverflow@, the hook run by
-- 'reportStackOverflow'.
--
-- SUP: Are the hooks allowed to re-enter Haskell land?  If so, the
-- @unsafe@ annotation on this import has to be removed.
foreign import ccall unsafe "stackOverflow" callStackOverflowHook :: IO ()
-- | Global reference holding the handler for uncaught exceptions.  It
-- starts out as 'defaultHandler'; it is written by
-- 'setUncaughtExceptionHandler' and read by 'getUncaughtExceptionHandler'.
uncaughtExceptionHandler :: IORef (SomeException -> IO ())
{-# NOINLINE uncaughtExceptionHandler #-}
uncaughtExceptionHandler = unsafePerformIO $ newIORef defaultHandler
-- | The initial uncaught-exception handler.  It flushes 'stdout' (any
-- exception raised while flushing is discarded) and then passes a message
-- describing the exception to 'errorBelch' with the format string @"%s"@.
--
-- The message is a fixed text for 'Deadlock', the carried string for an
-- 'ErrorCall', and the exception's 'showsPrec' rendering otherwise.
defaultHandler :: SomeException -> IO ()
defaultHandler se@(SomeException ex) =
    flushStdout >> withCString "%s" emit
  where
    -- best effort only: a failure to flush must not mask the real report
    flushStdout = hFlush stdout `catchAny` (\ _ -> return ())

    emit cfmt = withCString message (\cmsg -> errorBelch cfmt cmsg)

    message = case cast ex of
      Just Deadlock -> "no threads to run: infinite loop or deadlock?"
      _             -> fallback

    fallback = case cast ex of
      Just (ErrorCall s) -> s
      _                  -> showsPrec 0 se ""
1305 -- don't use errorBelch() directly, because we cannot call varargs functions
-- | Binding to the C function @errorBelch2@ declared in @HsBase.h@; takes a
-- format string and a single string argument.
foreign import ccall unsafe "HsBase.h errorBelch2" errorBelch :: CString -> CString -> IO ()
-- | Install a new handler for uncaught exceptions by overwriting the
-- contents of 'uncaughtExceptionHandler'.
setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
setUncaughtExceptionHandler handler =
  writeIORef uncaughtExceptionHandler handler

-- | Fetch the handler currently stored in 'uncaughtExceptionHandler'.
getUncaughtExceptionHandler :: IO (SomeException -> IO ())
getUncaughtExceptionHandler =
  readIORef uncaughtExceptionHandler