2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
8 -- Copyright : (c) The University of Glasgow, 1994-2002
9 -- License : see libraries/base/LICENSE
11 -- Maintainer : cvs-ghc@haskell.org
12 -- Stability : internal
13 -- Portability : non-portable (GHC extensions)
15 -- Basic concurrency stuff.
17 -----------------------------------------------------------------------------
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home. Hence:
30 -- * Forking and suchlike
, forkIO -- :: IO () -> IO ThreadId
, forkOnIO -- :: Int -> IO () -> IO ThreadId
33 , numCapabilities -- :: Int
, childHandler -- :: SomeException -> IO ()
35 , myThreadId -- :: IO ThreadId
36 , killThread -- :: ThreadId -> IO ()
, throwTo -- :: Exception e => ThreadId -> e -> IO ()
38 , par -- :: a -> b -> b
39 , pseq -- :: a -> b -> b
41 , labelThread -- :: ThreadId -> String -> IO ()
43 , ThreadStatus(..), BlockReason(..)
44 , threadStatus -- :: ThreadId -> IO ThreadStatus
47 , threadDelay -- :: Int -> IO ()
48 , registerDelay -- :: Int -> IO (TVar Bool)
, threadWaitRead -- :: Fd -> IO ()
, threadWaitWrite -- :: Fd -> IO ()
54 , newMVar -- :: a -> IO (MVar a)
55 , newEmptyMVar -- :: IO (MVar a)
56 , takeMVar -- :: MVar a -> IO a
57 , putMVar -- :: MVar a -> a -> IO ()
58 , tryTakeMVar -- :: MVar a -> IO (Maybe a)
59 , tryPutMVar -- :: MVar a -> a -> IO Bool
60 , isEmptyMVar -- :: MVar a -> IO Bool
61 , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
65 , atomically -- :: STM a -> IO a
67 , orElse -- :: STM a -> STM a -> STM a
, catchSTM -- :: STM a -> (SomeException -> STM a) -> STM a
69 , alwaysSucceeds -- :: STM a -> STM ()
70 , always -- :: STM Bool -> STM ()
72 , newTVar -- :: a -> STM (TVar a)
, newTVarIO -- :: a -> IO (TVar a)
74 , readTVar -- :: TVar a -> STM a
75 , readTVarIO -- :: TVar a -> IO a
, writeTVar -- :: TVar a -> a -> STM ()
77 , unsafeIOToSTM -- :: IO a -> STM a
80 #ifdef mingw32_HOST_OS
81 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
82 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
83 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
85 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
86 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
89 #ifndef mingw32_HOST_OS
93 , ensureIOManagerIsRunning
95 #ifdef mingw32_HOST_OS
100 , setUncaughtExceptionHandler -- :: (Exception -> IO ()) -> IO ()
101 , getUncaughtExceptionHandler -- :: IO (Exception -> IO ())
103 , reportError, reportStackOverflow
106 import System.Posix.Types
107 #ifndef mingw32_HOST_OS
108 import System.Posix.Internals
116 import {-# SOURCE #-} GHC.Handle
118 import GHC.Num ( Num(..) )
119 import GHC.Real ( fromIntegral )
120 #ifdef mingw32_HOST_OS
121 import GHC.Real ( div )
122 import GHC.Ptr ( plusPtr, FunPtr(..) )
124 #ifdef mingw32_HOST_OS
125 import GHC.Read ( Read )
126 import GHC.Enum ( Enum )
128 import GHC.Exception ( SomeException(..), throw )
129 import GHC.Pack ( packCString# )
130 import GHC.Ptr ( Ptr(..) )
132 import GHC.Show ( Show(..), showString )
136 infixr 0 `par`, `pseq`
139 %************************************************************************
141 \subsection{@ThreadId@, @par@, and @fork@}
143 %************************************************************************
146 data ThreadId = ThreadId ThreadId# deriving( Typeable )
147 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
148 -- But since ThreadId# is unlifted, the Weak type must use open
151 A 'ThreadId' is an abstract type representing a handle to a thread.
152 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
153 the 'Ord' instance implements an arbitrary total ordering over
154 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
155 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
156 useful when debugging or diagnosing the behaviour of a concurrent
159 /Note/: in GHC, if you have a 'ThreadId', you essentially have
160 a pointer to the thread itself. This means the thread itself can\'t be
161 garbage collected until you drop the 'ThreadId'.
162 This misfeature will hopefully be corrected at a later date.
164 /Note/: Hugs does not provide any operations on other threads;
165 it defines 'ThreadId' as a synonym for ().
168 instance Show ThreadId where
170 showString "ThreadId " .
171 showsPrec d (getThreadId (id2TSO t))
-- RTS entry point used by the 'Show' instance to obtain the number that is
-- printed for a thread.
foreign import ccall unsafe "rts_getThreadId"
  getThreadId :: ThreadId# -> CInt
-- Unwrap a 'ThreadId', exposing the primitive thread identifier it carries.
id2TSO :: ThreadId -> ThreadId#
id2TSO tid = case tid of
  ThreadId tso# -> tso#
-- RTS comparison of two primitive thread identifiers; 'cmpThread' turns the
-- returned 'CInt' into an 'Ordering'.
foreign import ccall unsafe "cmp_thread"
  cmp_thread :: ThreadId# -> ThreadId# -> CInt
181 cmpThread :: ThreadId -> ThreadId -> Ordering
183 case cmp_thread (id2TSO t1) (id2TSO t2) of
188 instance Eq ThreadId where
190 case t1 `cmpThread` t2 of
194 instance Ord ThreadId where
198 Sparks off a new thread to run the 'IO' computation passed as the
199 first argument, and returns the 'ThreadId' of the newly created
202 The new thread will be a lightweight thread; if you want to use a foreign
203 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
205 GHC note: the new thread inherits the /blocked/ state of the parent
206 (see 'Control.Exception.block').
208 The newly created thread has an exception handler that discards the
209 exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
210 'ThreadKilled', and passes all other exceptions to the uncaught
211 exception handler (see 'setUncaughtExceptionHandler').
213 forkIO :: IO () -> IO ThreadId
214 forkIO action = IO $ \ s ->
215 case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
217 action_plus = catchException action childHandler
220 Like 'forkIO', but lets you specify on which CPU the thread is
221 created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
222 will stay on the same CPU for its entire lifetime (`forkIO` threads
223 can migrate between CPUs according to the scheduling policy).
224 `forkOnIO` is useful for overriding the scheduling policy when you
225 know in advance how best to distribute the threads.
227 The `Int` argument specifies the CPU number; it is interpreted modulo
228 'numCapabilities' (note that it actually specifies a capability number
229 rather than a CPU number, but to a first approximation the two are
232 forkOnIO :: Int -> IO () -> IO ThreadId
233 forkOnIO (I# cpu) action = IO $ \ s ->
234 case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
236 action_plus = catchException action childHandler
238 -- | the value passed to the @+RTS -N@ flag. This is the number of
239 -- Haskell threads that can run truly simultaneously at any given
240 -- time, and is typically set to the number of physical CPU cores on
numCapabilities :: Int
numCapabilities =
  -- Read the C-side counter through its address and widen it to 'Int'.
  unsafePerformIO (peek n_capabilities >>= \caps -> return (fromIntegral caps))
247 #if defined(mingw32_HOST_OS) && defined(__PIC__)
248 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
250 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
-- Exception handler wrapped around the action of every forked thread.
-- The real work is done by 'real_handler'; should that itself raise an
-- exception, the new exception is passed to 'childHandler' again.
childHandler :: SomeException -> IO ()
childHandler exn = real_handler exn `catchException` childHandler
255 real_handler :: SomeException -> IO ()
256 real_handler se@(SomeException ex) =
257 -- ignore thread GC and killThread exceptions:
259 Just BlockedOnDeadMVar -> return ()
261 Just BlockedIndefinitely -> return ()
263 Just ThreadKilled -> return ()
265 -- report all others:
266 Just StackOverflow -> reportStackOverflow
269 {- | 'killThread' terminates the given thread (GHC only).
270 Any work already done by the thread isn\'t
271 lost: the computation is suspended until required by another thread.
272 The memory used by the thread will be garbage collected if it isn\'t
273 referenced from anywhere. The 'killThread' function is defined in
276 > killThread tid = throwTo tid ThreadKilled
'killThread' is a no-op if the target thread has already completed.
killThread :: ThreadId -> IO ()
killThread victim =
  -- Killing a thread is just throwing it the 'ThreadKilled' exception.
  throwTo victim ThreadKilled
283 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
285 'throwTo' does not return until the exception has been raised in the
287 The calling thread can thus be certain that the target
288 thread has received the exception. This is a useful property to know
289 when dealing with race conditions: eg. if there are two threads that
290 can kill each other, it is guaranteed that only one of the threads
291 will get to kill the other.
293 If the target thread is currently making a foreign call, then the
294 exception will not be raised (and hence 'throwTo' will not return)
295 until the call has completed. This is the case regardless of whether
296 the call is inside a 'block' or not.
298 Important note: the behaviour of 'throwTo' differs from that described in
299 the paper \"Asynchronous exceptions in Haskell\"
300 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
301 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
302 a more synchronous design in which 'throwTo' does not return until the exception
303 is received by the target thread. The trade-off is discussed in Section 8 of the paper.
304 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
307 There is currently no guarantee that the exception delivered by 'throwTo' will be
delivered at the first possible opportunity. In particular, a thread may
309 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
310 a pending 'throwTo'. This is arguably undesirable behaviour.
throwTo :: Exception e => ThreadId -> e -> IO ()
throwTo (ThreadId target#) exn = IO raise
  where
    -- The exception is wrapped with 'toException' before being handed to
    -- the @killThread#@ primitive.
    raise st0 =
      case killThread# target# (toException exn) st0 of
        st1 -> (# st1, () #)
-- | Returns the 'ThreadId' of the calling thread (GHC only).
myThreadId :: IO ThreadId
myThreadId = IO (\st0 ->
  case myThreadId# st0 of
    (# st1, self# #) -> (# st1, ThreadId self# #))
323 -- |The 'yield' action allows (forces, in a co-operative multitasking
324 -- implementation) a context-switch to any other currently runnable
325 -- threads (if any), and is occasionally useful when implementing
326 -- concurrency abstractions.
329 case (yield# s) of s1 -> (# s1, () #)
331 {- | 'labelThread' stores a string as identifier for this thread if
332 you built a RTS with debugging support. This identifier will be used in
333 the debugging output to make distinction of different threads easier
334 (otherwise you only have the thread state object\'s address in the heap).
336 Other applications like the graphical Concurrent Haskell Debugger
337 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
338 'labelThread' for their purposes as well.
labelThread :: ThreadId -> String -> IO ()
labelThread (ThreadId tid#) str = IO $ \st0 ->
  -- Pack the label into a byte array and give the address of its contents
  -- to the @labelThread#@ primitive.
  case labelThread# tid# (byteArrayContents# (packCString# str)) st0 of
    st1 -> (# st1, () #)
347 -- Nota Bene: 'pseq' used to be 'seq'
348 -- but 'seq' is now defined in PrelGHC
350 -- "pseq" is defined a bit weirdly (see below)
352 -- The reason for the strange "lazy" call is that
353 -- it fools the compiler into thinking that pseq and par are non-strict in
354 -- their second argument (even if it inlines pseq at the call site).
355 -- If it thinks pseq is strict in "y", then it often evaluates
356 -- "y" before "x", which is totally wrong.
360 pseq x y = x `seq` lazy y
364 par x y = case (par# x) of { _ -> lazy y }
-- ^blocked on an 'MVar'
371 -- ^blocked on a computation in progress by another thread
373 -- ^blocked in 'throwTo'
375 -- ^blocked in 'retry' in an STM transaction
376 | BlockedOnForeignCall
377 -- ^currently in a foreign call
379 -- ^blocked on some other resource. Without @-threaded@,
380 -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
381 -- they show up as 'BlockedOnMVar'.
382 deriving (Eq,Ord,Show)
384 -- | The current status of a thread
387 -- ^the thread is currently runnable or running
389 -- ^the thread has finished
390 | ThreadBlocked BlockReason
391 -- ^the thread is blocked on some resource
393 -- ^the thread received an uncaught exception
394 deriving (Eq,Ord,Show)
396 threadStatus :: ThreadId -> IO ThreadStatus
397 threadStatus (ThreadId t) = IO $ \s ->
398 case threadStatus# t s of
399 (# s', stat #) -> (# s', mk_stat (I# stat) #)
401 -- NB. keep these in sync with includes/Constants.h
402 mk_stat 0 = ThreadRunning
403 mk_stat 1 = ThreadBlocked BlockedOnMVar
404 mk_stat 2 = ThreadBlocked BlockedOnBlackHole
405 mk_stat 3 = ThreadBlocked BlockedOnException
406 mk_stat 7 = ThreadBlocked BlockedOnSTM
407 mk_stat 11 = ThreadBlocked BlockedOnForeignCall
408 mk_stat 12 = ThreadBlocked BlockedOnForeignCall
409 mk_stat 16 = ThreadFinished
410 mk_stat 17 = ThreadDied
411 mk_stat _ = ThreadBlocked BlockedOnOther
415 %************************************************************************
417 \subsection[stm]{Transactional heap operations}
419 %************************************************************************
421 TVars are shared memory locations which support atomic memory
-- |A monad supporting atomic memory transactions.
newtype STM a
  = STM (State# RealWorld -> (# State# RealWorld, a #))
428 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
431 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
-- 'fmap' is expressed through the 'Monad' operations: run the transaction
-- and return the transformed result.
instance Functor STM where
  fmap g action = action >>= \result -> return (g result)
436 instance Monad STM where
437 {-# INLINE return #-}
441 return x = returnSTM x
442 m >>= k = bindSTM m k
444 bindSTM :: STM a -> (a -> STM b) -> STM b
445 bindSTM (STM m) k = STM ( \s ->
447 (# new_s, a #) -> unSTM (k a) new_s
450 thenSTM :: STM a -> STM b -> STM b
451 thenSTM (STM m) k = STM ( \s ->
453 (# new_s, _ #) -> unSTM k new_s
-- 'return' for 'STM': pass the state token through unchanged and pair it
-- with the given value.
returnSTM :: a -> STM a
returnSTM val = STM $ \st -> (# st, val #)
459 -- | Unsafely performs IO in the STM monad. Beware: this is a highly
460 -- dangerous thing to do.
462 -- * The STM implementation will often run transactions multiple
463 -- times, so you need to be prepared for this if your IO has any
466 -- * The STM implementation will abort transactions that are known to
467 -- be invalid and need to be restarted. This may happen in the middle
468 -- of `unsafeIOToSTM`, so make sure you don't acquire any resources
469 -- that need releasing (exception handlers are ignored when aborting
470 -- the transaction). That includes doing any IO using Handles, for
471 -- example. Getting this wrong will probably lead to random deadlocks.
473 -- * The transaction may have seen an inconsistent view of memory when
474 -- the IO runs. Invariants that you expect to be true throughout
475 -- your program may not be true inside a transaction, due to the
476 -- way transactions are implemented. Normally this wouldn't be visible
477 -- to the programmer, but using `unsafeIOToSTM` can expose it.
unsafeIOToSTM :: IO a -> STM a
unsafeIOToSTM io =
  -- Re-wrap the underlying state-passing function; nothing is run here.
  case io of
    IO act -> STM act
482 -- |Perform a series of STM actions atomically.
484 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
485 -- Any attempt to do so will result in a runtime error. (Reason: allowing
486 -- this would effectively allow a transaction inside a transaction, depending
487 -- on exactly when the thunk is evaluated.)
489 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
490 -- and which allows top-level TVars to be allocated.
atomically :: STM a -> IO a
atomically (STM txn) = IO $ \st -> atomically# txn st
495 -- |Retry execution of the current memory transaction because it has seen
496 -- values in TVars which mean that it should not continue (e.g. the TVars
497 -- represent a shared buffer that is now empty). The implementation may
498 -- block the thread until one of the TVars that it has read from has been
-- updated. (GHC only)
501 retry = STM $ \s# -> retry# s#
503 -- |Compose two alternative STM actions (GHC only). If the first action
504 -- completes without retrying then it forms the result of the orElse.
505 -- Otherwise, if the first action retries, then the second action is
506 -- tried in its place. If both actions retry then the orElse as a
orElse :: STM a -> STM a -> STM a
orElse (STM first) alternative =
  STM (\st -> catchRetry# first (unSTM alternative) st)
-- |Exception handling within STM actions.  The handler is unwrapped and
-- passed, together with the body, to the @catchSTM#@ primitive.
catchSTM :: STM a -> (SomeException -> STM a) -> STM a
catchSTM (STM body) handler = STM (\st -> catchSTM# body unwrapped st)
  where
    unwrapped exn = unSTM (handler exn)
-- | Low-level primitive on which 'always' and 'alwaysSucceeds' are built.
-- 'checkInv' differs from these in that (i) the invariant is not
-- checked when 'checkInv' is called, only at the end of this and
-- subsequent transactions, (ii) the invariant failure is indicated
-- by raising an exception.
checkInv :: STM a -> STM ()
checkInv (STM invariant) = STM $ \st -> check# invariant st
523 -- | alwaysSucceeds adds a new invariant that must be true when passed
524 -- to alwaysSucceeds, at the end of the current transaction, and at
525 -- the end of every subsequent transaction. If it fails at any
526 -- of those points then the transaction violating it is aborted
527 -- and the exception raised by the invariant is propagated.
528 alwaysSucceeds :: STM a -> STM ()
529 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () )
-- | 'always' is a variant of 'alwaysSucceeds' in which the invariant is
-- expressed as an @STM Bool@ action that must return 'True'.  Returning
-- 'False' or raising an exception are both treated as invariant failures.
always :: STM Bool -> STM ()
always i = alwaysSucceeds checked
  where
    -- A 'False' result is turned into an exception, so that
    -- 'alwaysSucceeds' sees it as a failed invariant.
    checked = do
      holds <- i
      if holds
        then return ()
        else error "Transactional invariant violation"
-- |Shared memory locations that support atomic memory transactions.
data TVar a
  = TVar (TVar# RealWorld a)
542 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
-- Equality of 'TVar's is decided by @sameTVar#@ on the wrapped primitive
-- variables.
instance Eq (TVar a) where
  TVar lhs# == TVar rhs# = sameTVar# lhs# rhs#
-- |Create a new 'TVar' holding the supplied value.
newTVar :: a -> STM (TVar a)
newTVar val = STM alloc
  where
    alloc st0 =
      case newTVar# val st0 of
        (# st1, ref# #) -> (# st1, TVar ref# #)
553 -- |@IO@ version of 'newTVar'. This is useful for creating top-level
554 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
555 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
newTVarIO :: a -> IO (TVar a)
newTVarIO val = IO (\st0 ->
  -- Same primitive as 'newTVar', but wrapped in 'IO' rather than 'STM'.
  case newTVar# val st0 of
    (# st1, ref# #) -> (# st1, TVar ref# #))
562 -- |Return the current value stored in a TVar.
563 -- This is equivalent to
565 -- > readTVarIO = atomically . readTVar
567 -- but works much faster, because it doesn't perform a complete
568 -- transaction, it just reads the current value of the 'TVar'.
readTVarIO :: TVar a -> IO a
readTVarIO (TVar ref#) = IO (\st -> readTVarIO# ref# st)
-- |Return the current value stored in a 'TVar'.
readTVar :: TVar a -> STM a
readTVar (TVar ref#) = STM (\st -> readTVar# ref# st)
576 -- |Write the supplied value into a TVar
577 writeTVar :: TVar a -> a -> STM ()
578 writeTVar (TVar tvar#) val = STM $ \s1# ->
579 case writeTVar# tvar# val s1# of
584 %************************************************************************
586 \subsection[mvars]{M-Structures}
588 %************************************************************************
590 M-Vars are rendezvous points for concurrent threads. They begin
591 empty, and any attempt to read an empty M-Var blocks. When an M-Var
592 is written, a single blocked thread may be freed. Reading an M-Var
593 toggles its state from full back to empty. Therefore, any value
594 written to an M-Var may only be read once. Multiple reads and writes
595 are allowed, but there must be at least one read between any two
599 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
601 -- |Create an 'MVar' which is initially empty.
602 newEmptyMVar :: IO (MVar a)
603 newEmptyMVar = IO $ \ s# ->
605 (# s2#, svar# #) -> (# s2#, MVar svar# #)
607 -- |Create an 'MVar' which contains the supplied value.
608 newMVar :: a -> IO (MVar a)
610 newEmptyMVar >>= \ mvar ->
611 putMVar mvar value >>
614 -- |Return the contents of the 'MVar'. If the 'MVar' is currently
615 -- empty, 'takeMVar' will wait until it is full. After a 'takeMVar',
616 -- the 'MVar' is left empty.
618 -- There are two further important properties of 'takeMVar':
620 -- * 'takeMVar' is single-wakeup. That is, if there are multiple
621 -- threads blocked in 'takeMVar', and the 'MVar' becomes full,
622 -- only one thread will be woken up. The runtime guarantees that
623 -- the woken thread completes its 'takeMVar' operation.
625 -- * When multiple threads are blocked on an 'MVar', they are
626 -- woken up in FIFO order. This is useful for providing
627 -- fairness properties of abstractions built using 'MVar's.
takeMVar :: MVar a -> IO a
takeMVar (MVar box#) = IO (\st -> takeMVar# box# st)
632 -- |Put a value into an 'MVar'. If the 'MVar' is currently full,
633 -- 'putMVar' will wait until it becomes empty.
635 -- There are two further important properties of 'putMVar':
637 -- * 'putMVar' is single-wakeup. That is, if there are multiple
638 -- threads blocked in 'putMVar', and the 'MVar' becomes empty,
639 -- only one thread will be woken up. The runtime guarantees that
640 -- the woken thread completes its 'putMVar' operation.
642 -- * When multiple threads are blocked on an 'MVar', they are
643 -- woken up in FIFO order. This is useful for providing
644 -- fairness properties of abstractions built using 'MVar's.
646 putMVar :: MVar a -> a -> IO ()
647 putMVar (MVar mvar#) x = IO $ \ s# ->
648 case putMVar# mvar# x s# of
-- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
-- returns immediately, with 'Nothing' if the 'MVar' was empty, or
-- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
-- the 'MVar' is left empty.
tryTakeMVar :: MVar a -> IO (Maybe a)
tryTakeMVar (MVar box#) = IO $ \st0 ->
  case tryTakeMVar# box# st0 of
    (# st1, flag#, contents #) ->
      case flag# of
        0# -> (# st1, Nothing #)        -- MVar is empty
        _  -> (# st1, Just contents #)  -- MVar is full
-- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
-- attempts to put the value @a@ into the 'MVar', returning 'True' if
-- it was successful, or 'False' otherwise.
tryPutMVar :: MVar a -> a -> IO Bool
tryPutMVar (MVar box#) val = IO $ \st0 ->
  case tryPutMVar# box# val st0 of
    (# st1, flag# #) ->
      case flag# of
        0# -> (# st1, False #)
        _  -> (# st1, True #)
670 -- |Check whether a given 'MVar' is empty.
672 -- Notice that the boolean value returned is just a snapshot of
673 -- the state of the MVar. By the time you get to react on its result,
674 -- the MVar may have been filled (or emptied) - so be extremely
675 -- careful when using this operation. Use 'tryTakeMVar' instead if possible.
isEmptyMVar :: MVar a -> IO Bool
isEmptyMVar (MVar box#) = IO probe
  where
    -- A zero flag from the primitive means "not empty".
    probe st0 =
      case isEmptyMVar# box# st0 of
        (# st1, flag# #) -> (# st1, not (flag# ==# 0#) #)
-- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
-- "System.Mem.Weak" for more about finalizers.
addMVarFinalizer :: MVar a -> IO () -> IO ()
addMVarFinalizer (MVar box#) finalizer = IO $ \st0 ->
  -- The weak pointer is created only for its finalizer; the pointer
  -- itself is discarded.
  case mkWeak# box# () finalizer st0 of
    (# st1, _ #) -> (# st1, () #)
687 withMVar :: MVar a -> (a -> IO b) -> IO b
691 b <- catchAny (unblock (io a))
692 (\e -> do putMVar m a; throw e)
698 %************************************************************************
700 \subsection{Thread waiting}
702 %************************************************************************
705 #ifdef mingw32_HOST_OS
707 -- Note: threadWaitRead and threadWaitWrite aren't really functional
708 -- on Win32, but left in there because lib code (still) uses them (the manner
709 -- in which they're used doesn't cause problems on a Win32 platform though.)
-- Thin wrapper over the @asyncRead#@ primitive: unboxes the arguments and
-- returns the two 'Int's it produces as a pair (length, error code).
asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncRead (I# fd#) (I# isSock#) (I# len#) (Ptr buf#) = IO $ \st0 ->
  case asyncRead# fd# isSock# len# buf# st0 of
    (# st1, got#, err# #) -> (# st1, (I# got#, I# err#) #)
-- Thin wrapper over the @asyncWrite#@ primitive: unboxes the arguments and
-- returns the two 'Int's it produces as a pair (length, error code).
asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncWrite (I# fd#) (I# isSock#) (I# len#) (Ptr buf#) = IO $ \st0 ->
  case asyncWrite# fd# isSock# len# buf# st0 of
    (# st1, put#, err# #) -> (# st1, (I# put#, I# err#) #)
asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
asyncDoProc (FunPtr fn#) (Ptr arg#) = IO $ \st0 ->
  -- The primitive returns a 'length' as well, which is ignored here; all
  -- async*# primops share one result shape to keep their implementation
  -- simple.
  case asyncDoProc# fn# arg# st0 of
    (# st1, _len#, err# #) -> (# st1, I# err# #)
728 -- to aid the use of these primops by the IO Handle implementation,
729 -- provide the following convenience funs:
-- 'asyncRead' into a mutable byte array, starting @off@ bytes into it.
-- This had better be a pinned byte array!
asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncReadBA fd isSock len off bufB = asyncRead fd isSock len target
  where
    -- Address of the array contents, advanced by the requested offset.
    target = Ptr (byteArrayContents# (unsafeCoerce# bufB)) `plusPtr` off
-- 'asyncWrite' from a mutable byte array, starting @off@ bytes into it.
asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncWriteBA fd isSock len off bufB = asyncWrite fd isSock len source
  where
    -- Address of the array contents, advanced by the requested offset.
    source = Ptr (byteArrayContents# (unsafeCoerce# bufB)) `plusPtr` off
742 -- -----------------------------------------------------------------------------
745 -- | Block the current thread until data is available to read on the
746 -- given file descriptor (GHC only).
747 threadWaitRead :: Fd -> IO ()
749 #ifndef mingw32_HOST_OS
750 | threaded = waitForReadEvent fd
752 | otherwise = IO $ \s ->
753 case fromIntegral fd of { I# fd# ->
754 case waitRead# fd# s of { s' -> (# s', () #)
757 -- | Block the current thread until data can be written to the
758 -- given file descriptor (GHC only).
759 threadWaitWrite :: Fd -> IO ()
761 #ifndef mingw32_HOST_OS
762 | threaded = waitForWriteEvent fd
764 | otherwise = IO $ \s ->
765 case fromIntegral fd of { I# fd# ->
766 case waitWrite# fd# s of { s' -> (# s', () #)
769 -- | Suspends the current thread for a given number of microseconds
772 -- There is no guarantee that the thread will be rescheduled promptly
773 -- when the delay has expired, but the thread will never continue to
774 -- run /earlier/ than specified.
776 threadDelay :: Int -> IO ()
778 | threaded = waitForDelayEvent time
779 | otherwise = IO $ \s ->
780 case fromIntegral time of { I# time# ->
781 case delay# time# s of { s' -> (# s', () #)
785 -- | Set the value of returned TVar to True after a given number of
786 -- microseconds. The caveats associated with threadDelay also apply.
788 registerDelay :: Int -> IO (TVar Bool)
790 | threaded = waitForDelayEventSTM usecs
791 | otherwise = error "registerDelay: requires -threaded"
-- Flag obtained from the RTS; the wait and delay functions in this module
-- branch on it to choose between the IO-manager and primop implementations.
foreign import ccall unsafe "rtsSupportsBoundThreads"
  threaded :: Bool
795 waitForDelayEvent :: Int -> IO ()
796 waitForDelayEvent usecs = do
798 target <- calculateTarget usecs
799 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
803 -- Delays for use in STM
804 waitForDelayEventSTM :: Int -> IO (TVar Bool)
805 waitForDelayEventSTM usecs = do
806 t <- atomically $ newTVar False
807 target <- calculateTarget usecs
808 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
812 calculateTarget :: Int -> IO USecs
813 calculateTarget usecs = do
815 return $ now + (fromIntegral usecs)
818 -- ----------------------------------------------------------------------------
819 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
821 -- In the threaded RTS, we employ a single IO Manager thread to wait
822 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
823 -- and delays (threadDelay).
825 -- We can do this because in the threaded RTS the IO Manager can make
826 -- a non-blocking call to select(), so we don't have to do select() in
827 -- the scheduler as we have to in the non-threaded RTS. We get performance
828 -- benefits from doing it this way, because we only have to restart the select()
829 -- when a new request arrives, rather than doing one select() each time
830 -- around the scheduler loop. Furthermore, the scheduler can be simplified
831 -- by not having to check for completed IO requests.
833 -- Issues, possible problems:
835 -- - we might want bound threads to just do the blocking
836 -- operation rather than communicating with the IO manager
-- thread. This would prevent single-threaded programs which do
838 -- IO from requiring multiple OS threads. However, it would also
839 -- prevent bound threads waiting on IO from being killed or sent
-- - Apparently exec() doesn't work on Linux in a multithreaded program.
843 -- I couldn't repeat this.
845 -- - How do we handle signal delivery in the multithreaded RTS?
847 -- - forkProcess will kill the IO manager thread. Let's just
848 -- hope we don't need to do any blocking IO between fork & exec.
850 #ifndef mingw32_HOST_OS
852 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
853 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
857 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
858 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
860 #ifndef mingw32_HOST_OS
861 pendingEvents :: IORef [IOReq]
863 pendingDelays :: IORef [DelayReq]
864 -- could use a strict list or array here
865 {-# NOINLINE pendingEvents #-}
866 {-# NOINLINE pendingDelays #-}
867 (pendingEvents,pendingDelays) = unsafePerformIO $ do
872 -- the first time we schedule an IO request, the service thread
873 -- will be created (cool, huh?)
ensureIOManagerIsRunning :: IO ()
ensureIOManagerIsRunning =
  -- In the threaded RTS, force the 'pendingEvents' binding before
  -- returning; otherwise there is nothing to do.
  if threaded
    then pendingEvents `seq` return ()
    else return ()
-- Insert a delay request into a list ordered by 'delayTime'.  The new
-- request goes in front of the first entry whose time is not earlier
-- than its own.
insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
insertDelay new queue = case queue of
  [] -> [new]
  (next : later) ->
    if delayTime new <= delayTime next
      then new : queue
      else next : insertDelay new later
-- The time stored in a delay request, whichever constructor built it.
delayTime :: DelayReq -> USecs
delayTime req = case req of
  Delay    due _ -> due
  DelaySTM due _ -> due
-- XXX: move into GHC.IOBase from Data.IORef?
-- Apply a function to the contents of an 'IORef' via the
-- @atomicModifyMutVar#@ primitive, returning the second component of the
-- function's result.
atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORef (IORef (STRef var#)) update =
  IO (\st -> atomicModifyMutVar# var# update st)
-- C routine supplying the current time as 'USecs'
-- (presumably microseconds, going by the name -- confirm against the RTS).
foreign import ccall unsafe "getUSecOfDay" getUSecOfDay :: IO USecs
-- Global flag, initially 'False'.  'prodServiceThread' sets it and only
-- wakes the IO manager when it was previously unset.
{-# NOINLINE prodding #-}
prodding :: IORef Bool
prodding = unsafePerformIO $ newIORef False
-- Set the 'prodding' flag and, if it was not already set, wake up the
-- IO manager.
prodServiceThread :: IO ()
prodServiceThread =
  atomicModifyIORef prodding (\old -> (True, old)) >>= \alreadySet ->
    if alreadySet then return () else wakeupIOManager
908 #ifdef mingw32_HOST_OS
909 -- ----------------------------------------------------------------------------
910 -- Windows IO manager thread
912 startIOManagerThread :: IO ()
913 startIOManagerThread = do
914 wakeup <- c_getIOManagerEvent
915 forkIO $ service_loop wakeup []
918 service_loop :: HANDLE -- read end of pipe
919 -> [DelayReq] -- current delay requests
922 service_loop wakeup old_delays = do
923 -- pick up new delay requests
924 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
925 let delays = foldr insertDelay old_delays new_delays
928 (delays', timeout) <- getDelay now delays
930 r <- c_WaitForSingleObject wakeup timeout
932 0xffffffff -> do c_maperrno; throwErrno "service_loop"
934 r2 <- c_readIOManagerEvent
937 _ | r2 == io_MANAGER_WAKEUP -> return False
938 _ | r2 == io_MANAGER_DIE -> return True
939 0 -> return False -- spurious wakeup
940 _ -> do start_console_handler (r2 `shiftR` 1); return False
943 else service_cont wakeup delays'
945 _other -> service_cont wakeup delays' -- probably timeout
-- | Continue servicing: clear the 'prodding' flag (so that the next
-- 'prodServiceThread' sends a fresh wakeup) and go round 'service_loop'
-- again with the given wakeup handle and outstanding delays.
service_cont :: HANDLE -> [DelayReq] -> IO ()
service_cont wakeup delays = do
  _ <- atomicModifyIORef prodding (\_ -> (False, False))
  service_loop wakeup delays
-- must agree with rts/win32/ThrIOManager.c

-- | Event value compared against what the service loop reads back; a
-- match means "just wake up and look for new work".
io_MANAGER_WAKEUP :: Word32
io_MANAGER_WAKEUP = 0xffffffff

-- | Event value telling the service loop to exit.
io_MANAGER_DIE :: Word32
io_MANAGER_DIE = 0xfffffffe
961 -- these are sent to Services only.
964 deriving (Eq, Ord, Enum, Show, Read, Typeable)
966 start_console_handler :: Word32 -> IO ()
967 start_console_handler r =
968 case toWin32ConsoleEvent r of
969 Just x -> withMVar win32ConsoleHandler $ \handler -> do
974 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
975 toWin32ConsoleEvent ev =
977 0 {- CTRL_C_EVENT-} -> Just ControlC
978 1 {- CTRL_BREAK_EVENT-} -> Just Break
979 2 {- CTRL_CLOSE_EVENT-} -> Just Close
980 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
981 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
-- | Global slot holding the handler that is run for console events.
--
-- The slot initially contains an 'error' thunk, so evaluating the
-- handler before a real one has been stored raises
-- @error "win32ConsoleHandler"@.
--
-- The NOINLINE pragma is required because this is a top-level
-- 'unsafePerformIO' allocation: if the right-hand side were inlined at
-- use sites, each site could end up with its own distinct 'MVar' and
-- the handler stored through one would be invisible through another.
{-# NOINLINE win32ConsoleHandler #-}
win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
-- XXX Is this actually needed?
--
-- | Global reference to a 'HANDLE', initialised to 'nullPtr'.  NOINLINE
-- keeps the 'unsafePerformIO' allocation from being duplicated.
{-# NOINLINE stick #-}
stick :: IORef HANDLE
stick = unsafePerformIO $ newIORef nullPtr
992 wakeupIOManager :: IO ()
994 _hdl <- readIORef stick
995 c_sendIOManagerEvent io_MANAGER_WAKEUP
-- Walk the queue of pending delays, waking up any that have expired,
-- and return the smallest delay still to wait for.  The queue of
-- pending delays is kept ordered by expiry time.
1000 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
1001 getDelay _ [] = return ([], iNFINITE)
1002 getDelay now all@(d : rest)
1004 Delay time m | now >= time -> do
1007 DelaySTM time t | now >= time -> do
1008 atomically $ writeTVar t True
1011 -- delay is in millisecs for WaitForSingleObject
1012 let micro_seconds = delayTime d - now
1013 milli_seconds = (micro_seconds + 999) `div` 1000
1014 in return (all, fromIntegral milli_seconds)
1016 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
1017 -- available yet. We should move some Win32 functionality down here,
1018 -- maybe as part of the grand reorganisation of the base package...
1019 type HANDLE = Ptr ()
1023 iNFINITE = 0xFFFFFFFF -- urgh
1025 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
1026 c_getIOManagerEvent :: IO HANDLE
1028 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
1029 c_readIOManagerEvent :: IO Word32
1031 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
1032 c_sendIOManagerEvent :: Word32 -> IO ()
1034 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
1037 foreign import stdcall "WaitForSingleObject"
1038 c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1041 -- ----------------------------------------------------------------------------
1042 -- Unix IO manager thread, using select()
1044 startIOManagerThread :: IO ()
1045 startIOManagerThread = do
1046 allocaArray 2 $ \fds -> do
1047 throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1048 rd_end <- peekElemOff fds 0
1049 wr_end <- peekElemOff fds 1
1050 writeIORef stick (fromIntegral wr_end)
1051 c_setIOManagerPipe wr_end
1053 allocaBytes sizeofFdSet $ \readfds -> do
1054 allocaBytes sizeofFdSet $ \writefds -> do
1055 allocaBytes sizeofTimeVal $ \timeval -> do
1056 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1060 :: Fd -- listen to this for wakeup calls
1067 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1069 -- pick up new IO requests
1070 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1071 let reqs = new_reqs ++ old_reqs
1073 -- pick up new delay requests
1074 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1075 let delays0 = foldr insertDelay old_delays new_delays
1077 -- build the FDSets for select()
1080 fdSet wakeup readfds
1081 maxfd <- buildFdSets 0 readfds writefds reqs
1083 -- perform the select()
1084 let do_select delays = do
1085 -- check the current time and wake up any thread in
1086 -- threadDelay whose timeout has expired. Also find the
1087 -- timeout value for the select() call.
1089 (delays', timeout) <- getDelay now ptimeval delays
1091 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
1097 _ | err == eINTR -> do_select delays'
1098 -- EINTR: just redo the select()
1099 _ | err == eBADF -> return (True, delays)
1100 -- EBADF: one of the file descriptors is closed or bad,
1101 -- we don't know which one, so wake everyone up.
1102 _ | otherwise -> throwErrno "select"
1103 -- otherwise (ENOMEM or EINVAL) something has gone
1104 -- wrong; report the error.
1106 return (False,delays')
1108 (wakeup_all,delays') <- do_select delays0
1111 if wakeup_all then return False
1113 b <- fdIsSet wakeup readfds
1116 else alloca $ \p -> do
1117 c_read (fromIntegral wakeup) p 1; return ()
1120 _ | s == io_MANAGER_WAKEUP -> return False
1121 _ | s == io_MANAGER_DIE -> return True
1122 _ -> withMVar signalHandlerLock $ \_ -> do
1123 handler_tbl <- peek handlers
1124 sp <- peekElemOff handler_tbl (fromIntegral s)
1125 io <- deRefStablePtr sp
1129 if exit then return () else do
1131 atomicModifyIORef prodding (\_ -> (False,False))
1133 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1134 else completeRequests reqs readfds writefds []
1136 service_loop wakeup readfds writefds ptimeval reqs' delays'
-- | Byte written to the IO manager's wakeup pipe to make the service
-- loop go round again and pick up new requests.
io_MANAGER_WAKEUP :: CChar
io_MANAGER_WAKEUP = 0xff

-- | Byte that tells the service loop to exit.
io_MANAGER_DIE :: CChar
io_MANAGER_DIE = 0xfe
1143 {-# NOINLINE stick #-}
1144 stick = unsafePerformIO (newIORef 0)
-- | Wake the IO manager thread by writing the single byte
-- 'io_MANAGER_WAKEUP' to the file descriptor stored in 'stick' (the
-- write end of the pipe set up by 'startIOManagerThread').  The result
-- of the write is deliberately discarded.
wakeupIOManager :: IO ()
wakeupIOManager = do
  wr_fd <- readIORef stick
  with io_MANAGER_WAKEUP $ \msg -> do
    _ <- c_write (fromIntegral wr_fd) msg 1
    return ()
-- Lock used to protect concurrent access to signal_handlers.  The symptom
-- of this race condition is #1922; although that bug was reported on
-- Windows, a similar bug also exists on Unix.
--
-- The NOINLINE pragma is required because this is a top-level
-- 'unsafePerformIO' allocation: if the right-hand side were inlined at
-- use sites, each site could get its own 'MVar' and the lock would no
-- longer provide mutual exclusion.
{-# NOINLINE signalHandlerLock #-}
signalHandlerLock :: MVar ()
signalHandlerLock = unsafePerformIO (newMVar ())
1158 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
1160 foreign import ccall "setIOManagerPipe"
1161 c_setIOManagerPipe :: CInt -> IO ()
1163 -- -----------------------------------------------------------------------------
1166 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1167 buildFdSets maxfd _ _ [] = return maxfd
1168 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1169 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1172 buildFdSets (max maxfd fd) readfds writefds reqs
1173 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1174 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1177 buildFdSets (max maxfd fd) readfds writefds reqs
1179 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1181 completeRequests [] _ _ reqs' = return reqs'
1182 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1183 b <- fdIsSet fd readfds
1185 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1186 else completeRequests reqs readfds writefds (Read fd m : reqs')
1187 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1188 b <- fdIsSet fd writefds
1190 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1191 else completeRequests reqs readfds writefds (Write fd m : reqs')
-- | Wake every waiter in the given list of IO requests, in list order,
-- by putting @()@ into the 'MVar' carried by each request.
wakeupAll :: [IOReq] -> IO ()
wakeupAll reqs = foldr wake (return ()) reqs
  where
    -- Signal one request, then carry on with the remainder.
    wake req rest = do
      putMVar (waiter req) ()
      rest

    waiter (Read  _ m) = m
    waiter (Write _ m) = m
1198 waitForReadEvent :: Fd -> IO ()
1199 waitForReadEvent fd = do
1201 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1205 waitForWriteEvent :: Fd -> IO ()
1206 waitForWriteEvent fd = do
1208 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1212 -- -----------------------------------------------------------------------------
-- Walk the queue of pending delays, waking up any that have expired,
-- and return the smallest delay still to wait for.  The queue of
-- pending delays is kept ordered by expiry time.
1218 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1219 getDelay _ _ [] = return ([],nullPtr)
1220 getDelay now ptimeval all@(d : rest)
1222 Delay time m | now >= time -> do
1224 getDelay now ptimeval rest
1225 DelaySTM time t | now >= time -> do
1226 atomically $ writeTVar t True
1227 getDelay now ptimeval rest
1229 setTimevalTicks ptimeval (delayTime d - now)
1230 return (all,ptimeval)
1234 foreign import ccall unsafe "sizeofTimeVal"
1235 sizeofTimeVal :: Int
1237 foreign import ccall unsafe "setTimevalTicks"
1238 setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1241 On Win32 we're going to have a single Pipe, and a
1242 waitForSingleObject with the delay time. For signals, we send a
1243 byte down the pipe just like on Unix.
1246 -- ----------------------------------------------------------------------------
1247 -- select() interface
1249 -- ToDo: move to System.Posix.Internals?
1253 foreign import ccall safe "select"
1254 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1257 foreign import ccall unsafe "hsFD_SETSIZE"
1258 c_fD_SETSIZE :: CInt
1261 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1263 foreign import ccall unsafe "hsFD_ISSET"
1264 c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
-- | Query whether a file descriptor is a member of an fd_set, by
-- unwrapping the 'Fd' and delegating to the C helper 'c_fdIsSet'.
-- The C result is returned unchanged.
fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
fdIsSet fd fdset =
  case fd of
    Fd raw -> c_fdIsSet raw fdset
1269 foreign import ccall unsafe "hsFD_SET"
1270 c_fdSet :: CInt -> Ptr CFdSet -> IO ()
-- | Add a file descriptor to an fd_set, by unwrapping the 'Fd' and
-- delegating to the C helper 'c_fdSet'.
fdSet :: Fd -> Ptr CFdSet -> IO ()
fdSet fd fdset =
  case fd of
    Fd raw -> c_fdSet raw fdset
1275 foreign import ccall unsafe "hsFD_ZERO"
1276 fdZero :: Ptr CFdSet -> IO ()
1278 foreign import ccall unsafe "sizeof_fd_set"
-- | Report a stack overflow by invoking the RTS hook
-- 'callStackOverflowHook'.  The result is 'undefined': callers get a
-- value of any type they ask for but must never inspect it.
reportStackOverflow :: IO a
reportStackOverflow = do
  callStackOverflowHook
  return undefined
1286 reportError :: SomeException -> IO a
1288 handler <- getUncaughtExceptionHandler
1292 -- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove
1293 -- the unsafe below.
1294 foreign import ccall unsafe "stackOverflow"
1295 callStackOverflowHook :: IO ()
-- | Global reference holding the action that is run on an uncaught
-- exception; it starts out as 'defaultHandler'.  NOINLINE keeps the
-- 'unsafePerformIO' allocation from being duplicated.
uncaughtExceptionHandler :: IORef (SomeException -> IO ())
{-# NOINLINE uncaughtExceptionHandler #-}
uncaughtExceptionHandler = unsafePerformIO $ newIORef defaultHandler
-- | The initial uncaught-exception handler.
--
-- It flushes 'stdout' (ignoring any exception raised by the flush),
-- picks a message for the exception and passes it to 'errorBelch' with
-- a @"%s"@ format string.  The message is a fixed text for 'Deadlock',
-- the carried string for 'ErrorCall', and the exception's 'Show' output
-- for everything else.
defaultHandler :: SomeException -> IO ()
defaultHandler se@(SomeException ex) = do
    (hFlush stdout) `catchAny` (\ _ -> return ())
    withCString "%s" $ \cfmt ->
      withCString message $ \cmsg ->
        errorBelch cfmt cmsg
  where
    -- First choice: the dedicated deadlock message.
    message =
      case cast ex of
        Just Deadlock -> "no threads to run: infinite loop or deadlock?"
        Nothing       -> errorCallOrShown

    -- Otherwise: the raw 'ErrorCall' text, or whatever 'Show' gives.
    errorCallOrShown =
      case cast ex of
        Just (ErrorCall s) -> s
        Nothing            -> showsPrec 0 se ""
1313 -- don't use errorBelch() directly, because we cannot call varargs functions
1315 foreign import ccall unsafe "HsBase.h errorBelch2"
1316 errorBelch :: CString -> CString -> IO ()
-- | Install a new uncaught-exception handler by overwriting the contents
-- of 'uncaughtExceptionHandler'.
setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
setUncaughtExceptionHandler handler =
  writeIORef uncaughtExceptionHandler handler
-- | Return the handler currently stored in 'uncaughtExceptionHandler'
-- (whatever was last written to that reference).
1321 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1322 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler