2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
8 -- Copyright : (c) The University of Glasgow, 1994-2002
9 -- License : see libraries/base/LICENSE
11 -- Maintainer : cvs-ghc@haskell.org
12 -- Stability : internal
13 -- Portability : non-portable (GHC extensions)
15 -- Basic concurrency stuff.
17 -----------------------------------------------------------------------------
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home. Hence:
30 -- * Forking and suchlike
31 , forkIO -- :: IO () -> IO ThreadId
32 , forkOnIO -- :: Int -> IO () -> IO ThreadId
33 , numCapabilities -- :: Int
34 , childHandler -- :: SomeException -> IO ()
35 , myThreadId -- :: IO ThreadId
36 , killThread -- :: ThreadId -> IO ()
37 , throwTo -- :: Exception e => ThreadId -> e -> IO ()
38 , par -- :: a -> b -> b
39 , pseq -- :: a -> b -> b
42 , labelThread -- :: ThreadId -> String -> IO ()
44 , ThreadStatus(..), BlockReason(..)
45 , threadStatus -- :: ThreadId -> IO ThreadStatus
48 , threadDelay -- :: Int -> IO ()
49 , registerDelay -- :: Int -> IO (TVar Bool)
50 , threadWaitRead -- :: Fd -> IO ()
51 , threadWaitWrite -- :: Fd -> IO ()
55 , newMVar -- :: a -> IO (MVar a)
56 , newEmptyMVar -- :: IO (MVar a)
57 , takeMVar -- :: MVar a -> IO a
58 , putMVar -- :: MVar a -> a -> IO ()
59 , tryTakeMVar -- :: MVar a -> IO (Maybe a)
60 , tryPutMVar -- :: MVar a -> a -> IO Bool
61 , isEmptyMVar -- :: MVar a -> IO Bool
62 , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
66 , atomically -- :: STM a -> IO a
68 , orElse -- :: STM a -> STM a -> STM a
69 , catchSTM -- :: STM a -> (SomeException -> STM a) -> STM a
70 , alwaysSucceeds -- :: STM a -> STM ()
71 , always -- :: STM Bool -> STM ()
73 , newTVar -- :: a -> STM (TVar a)
74 , newTVarIO -- :: a -> IO (TVar a)
75 , readTVar -- :: TVar a -> STM a
76 , readTVarIO -- :: TVar a -> IO a
77 , writeTVar -- :: TVar a -> a -> STM ()
78 , unsafeIOToSTM -- :: IO a -> STM a
81 #ifdef mingw32_HOST_OS
82 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
83 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
84 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
86 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
87 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
90 #ifndef mingw32_HOST_OS
94 , ensureIOManagerIsRunning
96 #ifdef mingw32_HOST_OS
101 , setUncaughtExceptionHandler -- :: (Exception -> IO ()) -> IO ()
102 , getUncaughtExceptionHandler -- :: IO (Exception -> IO ())
104 , reportError, reportStackOverflow
107 import System.Posix.Types
108 #ifndef mingw32_HOST_OS
109 import System.Posix.Internals
117 import {-# SOURCE #-} GHC.Handle
119 import GHC.Num ( Num(..) )
120 import GHC.Real ( fromIntegral )
121 #ifdef mingw32_HOST_OS
122 import GHC.Real ( div )
123 import GHC.Ptr ( plusPtr, FunPtr(..) )
125 #ifdef mingw32_HOST_OS
126 import GHC.Read ( Read )
127 import GHC.Enum ( Enum )
129 import GHC.Exception ( SomeException(..), throw )
130 import GHC.Pack ( packCString# )
131 import GHC.Ptr ( Ptr(..) )
133 import GHC.Show ( Show(..), showString )
137 infixr 0 `par`, `pseq`
140 %************************************************************************
142 \subsection{@ThreadId@, @par@, and @fork@}
144 %************************************************************************
147 data ThreadId = ThreadId ThreadId# deriving( Typeable )
148 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
149 -- But since ThreadId# is unlifted, the Weak type must use open
152 A 'ThreadId' is an abstract type representing a handle to a thread.
153 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
154 the 'Ord' instance implements an arbitrary total ordering over
155 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
156 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
157 useful when debugging or diagnosing the behaviour of a concurrent
160 /Note/: in GHC, if you have a 'ThreadId', you essentially have
161 a pointer to the thread itself. This means the thread itself can\'t be
162 garbage collected until you drop the 'ThreadId'.
163 This misfeature will hopefully be corrected at a later date.
165 /Note/: Hugs does not provide any operations on other threads;
166 it defines 'ThreadId' as a synonym for ().
169 instance Show ThreadId where
171 showString "ThreadId " .
172 showsPrec d (getThreadId (id2TSO t))
174 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
-- Unwrap a 'ThreadId' to expose the primitive 'ThreadId#' it carries.
176 id2TSO :: ThreadId -> ThreadId#
177 id2TSO (ThreadId t) = t
179 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
182 cmpThread :: ThreadId -> ThreadId -> Ordering
184 case cmp_thread (id2TSO t1) (id2TSO t2) of
189 instance Eq ThreadId where
191 case t1 `cmpThread` t2 of
195 instance Ord ThreadId where
199 Sparks off a new thread to run the 'IO' computation passed as the
200 first argument, and returns the 'ThreadId' of the newly created
203 The new thread will be a lightweight thread; if you want to use a foreign
204 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
206 GHC note: the new thread inherits the /blocked/ state of the parent
207 (see 'Control.Exception.block').
209 The newly created thread has an exception handler that discards the
210 exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
211 'ThreadKilled', and passes all other exceptions to the uncaught
212 exception handler (see 'setUncaughtExceptionHandler').
214 forkIO :: IO () -> IO ThreadId
215 forkIO action = IO $ \ s ->
216 case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
218 action_plus = catchException action childHandler
221 Like 'forkIO', but lets you specify on which CPU the thread is
222 created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
223 will stay on the same CPU for its entire lifetime (`forkIO` threads
224 can migrate between CPUs according to the scheduling policy).
225 `forkOnIO` is useful for overriding the scheduling policy when you
226 know in advance how best to distribute the threads.
228 The `Int` argument specifies the CPU number; it is interpreted modulo
229 'numCapabilities' (note that it actually specifies a capability number
230 rather than a CPU number, but to a first approximation the two are
233 forkOnIO :: Int -> IO () -> IO ThreadId
234 forkOnIO (I# cpu) action = IO $ \ s ->
235 case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
237 action_plus = catchException action childHandler
239 -- | the value passed to the @+RTS -N@ flag. This is the number of
240 -- Haskell threads that can run truly simultaneously at any given
241 -- time, and is typically set to the number of physical CPU cores on
243 numCapabilities :: Int
-- The value is obtained by peeking the foreign 'n_capabilities' pointer
-- (a 'Ptr CInt'); 'unsafePerformIO' is what lets that read be exposed
-- as a plain 'Int', and 'fromIntegral' converts the 'CInt'.
244 numCapabilities = unsafePerformIO $ do
245 n <- peek n_capabilities
246 return (fromIntegral n)
248 #if defined(mingw32_HOST_OS) && defined(__PIC__)
249 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
251 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
253 childHandler :: SomeException -> IO ()
-- Runs 'real_handler' on the exception; if 'real_handler' itself raises
-- an exception, that new exception is passed to 'childHandler' again.
254 childHandler err = catchException (real_handler err) childHandler
256 real_handler :: SomeException -> IO ()
257 real_handler se@(SomeException ex) =
258 -- ignore thread GC and killThread exceptions:
260 Just BlockedOnDeadMVar -> return ()
262 Just BlockedIndefinitely -> return ()
264 Just ThreadKilled -> return ()
266 -- report all others:
267 Just StackOverflow -> reportStackOverflow
270 {- | 'killThread' terminates the given thread (GHC only).
271 Any work already done by the thread isn\'t
272 lost: the computation is suspended until required by another thread.
273 The memory used by the thread will be garbage collected if it isn\'t
274 referenced from anywhere. The 'killThread' function is defined in
277 > killThread tid = throwTo tid ThreadKilled
279 'killThread' is a no-op if the target thread has already completed.
281 killThread :: ThreadId -> IO ()
-- Delivers the 'ThreadKilled' exception to the target via 'throwTo'.
282 killThread tid = throwTo tid ThreadKilled
284 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
286 'throwTo' does not return until the exception has been raised in the
288 The calling thread can thus be certain that the target
289 thread has received the exception. This is a useful property to know
290 when dealing with race conditions: eg. if there are two threads that
291 can kill each other, it is guaranteed that only one of the threads
292 will get to kill the other.
294 If the target thread is currently making a foreign call, then the
295 exception will not be raised (and hence 'throwTo' will not return)
296 until the call has completed. This is the case regardless of whether
297 the call is inside a 'block' or not.
299 Important note: the behaviour of 'throwTo' differs from that described in
300 the paper \"Asynchronous exceptions in Haskell\"
301 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
302 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
303 a more synchronous design in which 'throwTo' does not return until the exception
304 is received by the target thread. The trade-off is discussed in Section 9 of the paper.
305 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
308 There is currently no guarantee that the exception delivered by 'throwTo' will be
309 delivered at the first possible opportunity. In particular, a thread may
310 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
311 a pending 'throwTo'. This is arguably undesirable behaviour.
314 throwTo :: Exception e => ThreadId -> e -> IO ()
-- The exception is wrapped with 'toException' before being handed to
-- the killThread# primop; only the new state token comes back, so the
-- action's result is ().
315 throwTo (ThreadId tid) ex = IO $ \ s ->
316 case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
318 -- | Returns the 'ThreadId' of the calling thread (GHC only).
319 myThreadId :: IO ThreadId
-- Thin wrapper: boxes the ThreadId# produced by the myThreadId# primop.
320 myThreadId = IO $ \s ->
321 case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
324 -- |The 'yield' action allows (forces, in a co-operative multitasking
325 -- implementation) a context-switch to any other currently runnable
326 -- threads (if any), and is occasionally useful when implementing
327 -- concurrency abstractions.
330 case (yield# s) of s1 -> (# s1, () #)
332 {- | 'labelThread' stores a string as identifier for this thread if
333 you built a RTS with debugging support. This identifier will be used in
334 the debugging output to make distinction of different threads easier
335 (otherwise you only have the thread state object\'s address in the heap).
337 Other applications like the graphical Concurrent Haskell Debugger
338 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
339 'labelThread' for their purposes as well.
342 labelThread :: ThreadId -> String -> IO ()
343 labelThread (ThreadId t) str = IO $ \ s ->
-- Pack the label with packCString# and pass the address of the packed
-- bytes (byteArrayContents#) to the labelThread# primop.
-- NOTE(review): presumably relies on the packed array staying put while
-- the primop reads it -- confirm against packCString#.
344 let ps = packCString# str
345 adr = byteArrayContents# ps in
346 case (labelThread# t adr s) of s1 -> (# s1, () #)
348 -- Nota Bene: 'pseq' used to be 'seq'
349 -- but 'seq' is now defined in PrelGHC
351 -- "pseq" is defined a bit weirdly (see below)
353 -- The reason for the strange "lazy" call is that
354 -- it fools the compiler into thinking that pseq and par are non-strict in
355 -- their second argument (even if it inlines pseq at the call site).
356 -- If it thinks pseq is strict in "y", then it often evaluates
357 -- "y" before "x", which is totally wrong.
361 pseq x y = x `seq` lazy y
365 par x y = case (par# x) of { _ -> lazy y }
367 -- | Internal function used by the RTS to run sparks.
370 where loop s = case getSpark# s of
372 if n ==# 0# then (# s', () #)
377 -- ^blocked on 'MVar'
379 -- ^blocked on a computation in progress by another thread
381 -- ^blocked in 'throwTo'
383 -- ^blocked in 'retry' in an STM transaction
384 | BlockedOnForeignCall
385 -- ^currently in a foreign call
387 -- ^blocked on some other resource. Without @-threaded@,
388 -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
389 -- they show up as 'BlockedOnMVar'.
390 deriving (Eq,Ord,Show)
392 -- | The current status of a thread
395 -- ^the thread is currently runnable or running
397 -- ^the thread has finished
398 | ThreadBlocked BlockReason
399 -- ^the thread is blocked on some resource
401 -- ^the thread received an uncaught exception
402 deriving (Eq,Ord,Show)
404 threadStatus :: ThreadId -> IO ThreadStatus
405 threadStatus (ThreadId t) = IO $ \s ->
406 case threadStatus# t s of
407 (# s', stat #) -> (# s', mk_stat (I# stat) #)
409 -- NB. keep these in sync with includes/Constants.h
410 mk_stat 0 = ThreadRunning
411 mk_stat 1 = ThreadBlocked BlockedOnMVar
412 mk_stat 2 = ThreadBlocked BlockedOnBlackHole
413 mk_stat 3 = ThreadBlocked BlockedOnException
414 mk_stat 7 = ThreadBlocked BlockedOnSTM
415 mk_stat 11 = ThreadBlocked BlockedOnForeignCall
416 mk_stat 12 = ThreadBlocked BlockedOnForeignCall
417 mk_stat 16 = ThreadFinished
418 mk_stat 17 = ThreadDied
419 mk_stat _ = ThreadBlocked BlockedOnOther
423 %************************************************************************
425 \subsection[stm]{Transactional heap operations}
427 %************************************************************************
429 TVars are shared memory locations which support atomic memory
433 -- |A monad supporting atomic memory transactions.
434 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
436 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
439 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
441 instance Functor STM where
442 fmap f x = x >>= (return . f)
444 instance Monad STM where
445 {-# INLINE return #-}
449 return x = returnSTM x
450 m >>= k = bindSTM m k
452 bindSTM :: STM a -> (a -> STM b) -> STM b
453 bindSTM (STM m) k = STM ( \s ->
455 (# new_s, a #) -> unSTM (k a) new_s
458 thenSTM :: STM a -> STM b -> STM b
459 thenSTM (STM m) k = STM ( \s ->
461 (# new_s, _ #) -> unSTM k new_s
464 returnSTM :: a -> STM a
-- Yields x and passes the state token through untouched.
465 returnSTM x = STM (\s -> (# s, x #))
467 -- | Unsafely performs IO in the STM monad. Beware: this is a highly
468 -- dangerous thing to do.
470 -- * The STM implementation will often run transactions multiple
471 -- times, so you need to be prepared for this if your IO has any
474 -- * The STM implementation will abort transactions that are known to
475 -- be invalid and need to be restarted. This may happen in the middle
476 -- of `unsafeIOToSTM`, so make sure you don't acquire any resources
477 -- that need releasing (exception handlers are ignored when aborting
478 -- the transaction). That includes doing any IO using Handles, for
479 -- example. Getting this wrong will probably lead to random deadlocks.
481 -- * The transaction may have seen an inconsistent view of memory when
482 -- the IO runs. Invariants that you expect to be true throughout
483 -- your program may not be true inside a transaction, due to the
484 -- way transactions are implemented. Normally this wouldn't be visible
485 -- to the programmer, but using `unsafeIOToSTM` can expose it.
487 unsafeIOToSTM :: IO a -> STM a
-- No conversion work happens here: the state-passing function inside
-- the 'IO' constructor is re-wrapped in 'STM' unchanged.
488 unsafeIOToSTM (IO m) = STM m
490 -- |Perform a series of STM actions atomically.
492 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
493 -- Any attempt to do so will result in a runtime error. (Reason: allowing
494 -- this would effectively allow a transaction inside a transaction, depending
495 -- on exactly when the thunk is evaluated.)
497 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
498 -- and which allows top-level TVars to be allocated.
500 atomically :: STM a -> IO a
-- Unwraps the STM action and runs it through the atomically# primop.
501 atomically (STM m) = IO (\s -> (atomically# m) s )
503 -- |Retry execution of the current memory transaction because it has seen
504 -- values in TVars which mean that it should not continue (e.g. the TVars
505 -- represent a shared buffer that is now empty). The implementation may
506 -- block the thread until one of the TVars that it has read from has been
507 -- updated. (GHC only)
509 retry = STM $ \s# -> retry# s#
511 -- |Compose two alternative STM actions (GHC only). If the first action
512 -- completes without retrying then it forms the result of the orElse.
513 -- Otherwise, if the first action retries, then the second action is
514 -- tried in its place. If both actions retry then the orElse as a
516 orElse :: STM a -> STM a -> STM a
-- Both alternatives are unwrapped and handed to the catchRetry# primop:
-- the first by pattern matching, the second with 'unSTM'.
517 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
519 -- |Exception handling within STM actions.
520 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
-- The handler is adapted for the catchSTM# primop by unwrapping the
-- 'STM' value it returns with 'unSTM'.
521 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
523 -- | Low-level primitive on which always and alwaysSucceeds are built.
524 -- checkInv differs from these in that (i) the invariant is not
525 -- checked when checkInv is called, only at the end of this and
526 -- subsequent transactions, (ii) the invariant failure is indicated
527 -- by raising an exception.
528 checkInv :: STM a -> STM ()
-- Thin wrapper: the unwrapped invariant action is passed to the check# primop.
529 checkInv (STM m) = STM (\s -> (check# m) s)
531 -- | alwaysSucceeds adds a new invariant that must be true when passed
532 -- to alwaysSucceeds, at the end of the current transaction, and at
533 -- the end of every subsequent transaction. If it fails at any
534 -- of those points then the transaction violating it is aborted
535 -- and the exception raised by the invariant is propagated.
536 alwaysSucceeds :: STM a -> STM ()
537 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () )
540 -- | always is a variant of alwaysSucceeds in which the invariant is
541 -- expressed as an STM Bool action that must return True. Returning
542 -- False or raising an exception are both treated as invariant failures.
543 always :: STM Bool -> STM ()
-- A False result is converted into an exception with 'error', so that
-- 'alwaysSucceeds' sees it the same way as an invariant that throws.
-- (The message previously misspelled "Transactional".)
544 always i = alwaysSucceeds ( do v <- i
545 if (v) then return () else ( error "Transactional invariant violation" ) )
547 -- |Shared memory locations that support atomic memory transactions.
548 data TVar a = TVar (TVar# RealWorld a)
550 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
-- Equality is decided by the sameTVar# primop on the underlying TVar#
-- values; the contents are not inspected here.
552 instance Eq (TVar a) where
553 (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
555 -- |Create a new TVar holding a value supplied
556 newTVar :: a -> STM (TVar a)
-- Allocates with the newTVar# primop and boxes the result in 'TVar'.
557 newTVar val = STM $ \s1# ->
558 case newTVar# val s1# of
559 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
561 -- |@IO@ version of 'newTVar'. This is useful for creating top-level
562 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
563 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
565 newTVarIO :: a -> IO (TVar a)
-- Same newTVar# call as 'newTVar', but wrapped in 'IO' rather than
-- 'STM', so no 'atomically' is involved.
566 newTVarIO val = IO $ \s1# ->
567 case newTVar# val s1# of
568 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
570 -- |Return the current value stored in a TVar.
571 -- This is equivalent to
573 -- > readTVarIO = atomically . readTVar
575 -- but works much faster, because it doesn't perform a complete
576 -- transaction, it just reads the current value of the 'TVar'.
577 readTVarIO :: TVar a -> IO a
-- Uses the dedicated readTVarIO# primop directly in 'IO'.
578 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
580 -- |Return the current value stored in a TVar
581 readTVar :: TVar a -> STM a
-- Thin wrapper around the readTVar# primop.
582 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
584 -- |Write the supplied value into a TVar
585 writeTVar :: TVar a -> a -> STM ()
586 writeTVar (TVar tvar#) val = STM $ \s1# ->
587 case writeTVar# tvar# val s1# of
592 %************************************************************************
594 \subsection[mvars]{M-Structures}
596 %************************************************************************
598 M-Vars are rendezvous points for concurrent threads. They begin
599 empty, and any attempt to read an empty M-Var blocks. When an M-Var
600 is written, a single blocked thread may be freed. Reading an M-Var
601 toggles its state from full back to empty. Therefore, any value
602 written to an M-Var may only be read once. Multiple reads and writes
603 are allowed, but there must be at least one read between any two
607 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
609 -- |Create an 'MVar' which is initially empty.
610 newEmptyMVar :: IO (MVar a)
611 newEmptyMVar = IO $ \ s# ->
613 (# s2#, svar# #) -> (# s2#, MVar svar# #)
615 -- |Create an 'MVar' which contains the supplied value.
616 newMVar :: a -> IO (MVar a)
618 newEmptyMVar >>= \ mvar ->
619 putMVar mvar value >>
622 -- |Return the contents of the 'MVar'. If the 'MVar' is currently
623 -- empty, 'takeMVar' will wait until it is full. After a 'takeMVar',
624 -- the 'MVar' is left empty.
626 -- There are two further important properties of 'takeMVar':
628 -- * 'takeMVar' is single-wakeup. That is, if there are multiple
629 -- threads blocked in 'takeMVar', and the 'MVar' becomes full,
630 -- only one thread will be woken up. The runtime guarantees that
631 -- the woken thread completes its 'takeMVar' operation.
633 -- * When multiple threads are blocked on an 'MVar', they are
634 -- woken up in FIFO order. This is useful for providing
635 -- fairness properties of abstractions built using 'MVar's.
637 takeMVar :: MVar a -> IO a
-- Thin wrapper around the takeMVar# primop.
638 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
640 -- |Put a value into an 'MVar'. If the 'MVar' is currently full,
641 -- 'putMVar' will wait until it becomes empty.
643 -- There are two further important properties of 'putMVar':
645 -- * 'putMVar' is single-wakeup. That is, if there are multiple
646 -- threads blocked in 'putMVar', and the 'MVar' becomes empty,
647 -- only one thread will be woken up. The runtime guarantees that
648 -- the woken thread completes its 'putMVar' operation.
650 -- * When multiple threads are blocked on an 'MVar', they are
651 -- woken up in FIFO order. This is useful for providing
652 -- fairness properties of abstractions built using 'MVar's.
654 putMVar :: MVar a -> a -> IO ()
655 putMVar (MVar mvar#) x = IO $ \ s# ->
656 case putMVar# mvar# x s# of
659 -- |A non-blocking version of 'takeMVar'. The 'tryTakeMVar' function
660 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
661 -- @'Just' a@ if the 'MVar' was full with contents @a@. After 'tryTakeMVar',
662 -- the 'MVar' is left empty.
663 tryTakeMVar :: MVar a -> IO (Maybe a)
-- tryTakeMVar# returns a flag alongside the value: a flag of 0# is
-- mapped to 'Nothing' (the value slot is ignored), anything else to 'Just'.
664 tryTakeMVar (MVar m) = IO $ \ s ->
665 case tryTakeMVar# m s of
666 (# s', 0#, _ #) -> (# s', Nothing #) -- MVar is empty
667 (# s', _, a #) -> (# s', Just a #) -- MVar is full
669 -- |A non-blocking version of 'putMVar'. The 'tryPutMVar' function
670 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
671 -- it was successful, or 'False' otherwise.
672 tryPutMVar :: MVar a -> a -> IO Bool
-- tryPutMVar# returns a flag: 0# is reported as 'False', any other
-- value as 'True'.
673 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
674 case tryPutMVar# mvar# x s# of
675 (# s, 0# #) -> (# s, False #)
676 (# s, _ #) -> (# s, True #)
678 -- |Check whether a given 'MVar' is empty.
680 -- Notice that the boolean value returned is just a snapshot of
681 -- the state of the MVar. By the time you get to react on its result,
682 -- the MVar may have been filled (or emptied) - so be extremely
683 -- careful when using this operation. Use 'tryTakeMVar' instead if possible.
684 isEmptyMVar :: MVar a -> IO Bool
-- isEmptyMVar# returns a flag; the result is 'True' exactly when that
-- flag is non-zero.
685 isEmptyMVar (MVar mv#) = IO $ \ s# ->
686 case isEmptyMVar# mv# s# of
687 (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
689 -- |Add a finalizer to an 'MVar' (GHC only). See "Foreign.ForeignPtr" and
690 -- "System.Mem.Weak" for more about finalizers.
691 addMVarFinalizer :: MVar a -> IO () -> IO ()
-- Calls mkWeak# with the MVar's primitive as the key, () as the value
-- and the given action as the finalizer; the weak pointer it returns
-- is discarded.
692 addMVarFinalizer (MVar m) finalizer =
693 IO $ \s -> case mkWeak# m () finalizer s of { (# s1, _ #) -> (# s1, () #) }
695 withMVar :: MVar a -> (a -> IO b) -> IO b
699 b <- catchAny (unblock (io a))
700 (\e -> do putMVar m a; throw e)
706 %************************************************************************
708 \subsection{Thread waiting}
710 %************************************************************************
713 #ifdef mingw32_HOST_OS
715 -- Note: threadWaitRead and threadWaitWrite aren't really functional
716 -- on Win32, but left in there because lib code (still) uses them (the manner
717 -- in which they're used doesn't cause problems on a Win32 platform though.)
719 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
-- Unboxes the three Int arguments (bound as fd, isSock, len) and the
-- buffer pointer, calls the asyncRead# primop, and re-boxes the two
-- results (bound as len# and err#) into a pair.
720 asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) =
721 IO $ \s -> case asyncRead# fd isSock len buf s of
722 (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
724 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
-- Write-side counterpart of 'asyncRead': same unboxing and re-boxing,
-- but around the asyncWrite# primop.
725 asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) =
726 IO $ \s -> case asyncWrite# fd isSock len buf s of
727 (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
729 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
-- Passes the unwrapped function pointer and parameter pointer to the
-- asyncDoProc# primop and returns only the component bound to err#.
730 asyncDoProc (FunPtr proc) (Ptr param) =
731 -- the 'length' value is ignored; simplifies implementation of
732 -- the async*# primops to have them all return the same result.
733 IO $ \s -> case asyncDoProc# proc param s of
734 (# s', _len#, err# #) -> (# s', I# err# #)
736 -- to aid the use of these primops by the IO Handle implementation,
737 -- provide the following convenience funs:
739 -- this better be a pinned byte array!
740 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
-- Builds a 'Ptr' from the byte array's contents address, advances it by
-- 'off' with 'plusPtr', and delegates to 'asyncRead'. The array is
-- passed through unsafeCoerce# before byteArrayContents# is applied.
741 asyncReadBA fd isSock len off bufB =
742 asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
744 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
-- Same pointer construction as 'asyncReadBA' (contents address of the
-- coerced byte array plus 'off'), delegating to 'asyncWrite'.
745 asyncWriteBA fd isSock len off bufB =
746 asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
750 -- -----------------------------------------------------------------------------
753 -- | Block the current thread until data is available to read on the
754 -- given file descriptor (GHC only).
755 threadWaitRead :: Fd -> IO ()
757 #ifndef mingw32_HOST_OS
758 | threaded = waitForReadEvent fd
760 | otherwise = IO $ \s ->
761 case fromIntegral fd of { I# fd# ->
762 case waitRead# fd# s of { s' -> (# s', () #)
765 -- | Block the current thread until data can be written to the
766 -- given file descriptor (GHC only).
767 threadWaitWrite :: Fd -> IO ()
769 #ifndef mingw32_HOST_OS
770 | threaded = waitForWriteEvent fd
772 | otherwise = IO $ \s ->
773 case fromIntegral fd of { I# fd# ->
774 case waitWrite# fd# s of { s' -> (# s', () #)
777 -- | Suspends the current thread for a given number of microseconds
780 -- There is no guarantee that the thread will be rescheduled promptly
781 -- when the delay has expired, but the thread will never continue to
782 -- run /earlier/ than specified.
784 threadDelay :: Int -> IO ()
786 | threaded = waitForDelayEvent time
787 | otherwise = IO $ \s ->
788 case fromIntegral time of { I# time# ->
789 case delay# time# s of { s' -> (# s', () #)
793 -- | Set the value of returned TVar to True after a given number of
794 -- microseconds. The caveats associated with threadDelay also apply.
796 registerDelay :: Int -> IO (TVar Bool)
798 | threaded = waitForDelayEventSTM usecs
799 | otherwise = error "registerDelay: requires -threaded"
801 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
803 waitForDelayEvent :: Int -> IO ()
804 waitForDelayEvent usecs = do
806 target <- calculateTarget usecs
807 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
811 -- Delays for use in STM
812 waitForDelayEventSTM :: Int -> IO (TVar Bool)
813 waitForDelayEventSTM usecs = do
814 t <- atomically $ newTVar False
815 target <- calculateTarget usecs
816 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
820 calculateTarget :: Int -> IO USecs
821 calculateTarget usecs = do
823 return $ now + (fromIntegral usecs)
826 -- ----------------------------------------------------------------------------
827 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
829 -- In the threaded RTS, we employ a single IO Manager thread to wait
830 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
831 -- and delays (threadDelay).
833 -- We can do this because in the threaded RTS the IO Manager can make
834 -- a non-blocking call to select(), so we don't have to do select() in
835 -- the scheduler as we have to in the non-threaded RTS. We get performance
836 -- benefits from doing it this way, because we only have to restart the select()
837 -- when a new request arrives, rather than doing one select() each time
838 -- around the scheduler loop. Furthermore, the scheduler can be simplified
839 -- by not having to check for completed IO requests.
841 -- Issues, possible problems:
843 -- - we might want bound threads to just do the blocking
844 -- operation rather than communicating with the IO manager
845 -- thread. This would prevent single-threaded programs which do
846 -- IO from requiring multiple OS threads. However, it would also
847 -- prevent bound threads waiting on IO from being killed or sent
850 -- - Apparently exec() doesn't work on Linux in a multithreaded program.
851 -- I couldn't repeat this.
853 -- - How do we handle signal delivery in the multithreaded RTS?
855 -- - forkProcess will kill the IO manager thread. Let's just
856 -- hope we don't need to do any blocking IO between fork & exec.
858 #ifndef mingw32_HOST_OS
860 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
861 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
865 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
866 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
868 #ifndef mingw32_HOST_OS
869 pendingEvents :: IORef [IOReq]
871 pendingDelays :: IORef [DelayReq]
872 -- could use a strict list or array here
873 {-# NOINLINE pendingEvents #-}
874 {-# NOINLINE pendingDelays #-}
875 (pendingEvents,pendingDelays) = unsafePerformIO $ do
880 -- the first time we schedule an IO request, the service thread
881 -- will be created (cool, huh?)
883 ensureIOManagerIsRunning :: IO ()
-- With the threaded RTS this only forces 'pendingEvents' with 'seq';
-- otherwise it does nothing.
-- NOTE(review): presumably forcing 'pendingEvents' runs its
-- unsafePerformIO initialiser, which starts the IO manager thread --
-- that initialiser's body is not visible here; confirm.
884 ensureIOManagerIsRunning
885 | threaded = seq pendingEvents $ return ()
886 | otherwise = return ()
888 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
-- Inserts a request into a list ordered by 'delayTime': the new request
-- goes in front of the first element whose time is not earlier than its
-- own, or at the end if there is no such element.
889 insertDelay d [] = [d]
890 insertDelay d1 ds@(d2 : rest)
891 | delayTime d1 <= delayTime d2 = d1 : ds
892 | otherwise = d2 : insertDelay d1 rest
894 delayTime :: DelayReq -> USecs
-- Projects the 'USecs' field, which both constructors carry first.
895 delayTime (Delay t _) = t
896 delayTime (DelaySTM t _) = t
900 -- XXX: move into GHC.IOBase from Data.IORef?
901 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
-- Unwraps the 'IORef' down to its mutable variable and applies the
-- atomicModifyMutVar# primop with the supplied function.
902 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
904 foreign import ccall unsafe "getUSecOfDay"
905 getUSecOfDay :: IO USecs
-- Global flag, initially False, created with unsafePerformIO (hence the
-- NOINLINE pragma). It is set by 'prodServiceThread' and cleared by
-- 'service_cont'.
907 prodding :: IORef Bool
908 {-# NOINLINE prodding #-}
909 prodding = unsafePerformIO (newIORef False)
911 prodServiceThread :: IO ()
-- Sets the 'prodding' flag to True and wakes the IO manager only when
-- the flag was previously False, so callers that find it already set
-- do not send another wakeup.
912 prodServiceThread = do
913 was_set <- atomicModifyIORef prodding (\a -> (True,a))
914 if (not (was_set)) then wakeupIOManager else return ()
916 #ifdef mingw32_HOST_OS
917 -- ----------------------------------------------------------------------------
918 -- Windows IO manager thread
920 startIOManagerThread :: IO ()
921 startIOManagerThread = do
922 wakeup <- c_getIOManagerEvent
923 forkIO $ service_loop wakeup []
-- Windows IO manager service loop.
--
-- Each iteration drains 'pendingDelays', merges the new requests into the
-- time-ordered queue with 'insertDelay', asks 'getDelay' for the timeout
-- to use, and then blocks in 'c_WaitForSingleObject' on the wakeup handle.
-- A result of 0xffffffff from the wait is reported through 'throwErrno'.
-- When an event is read with 'c_readIOManagerEvent', io_MANAGER_DIE yields
-- True and every other outcome yields False; values that are neither of
-- the two io_MANAGER_* codes nor 0 are shifted right by one bit and passed
-- to 'start_console_handler'.  The loop continues through 'service_cont'.
-- NOTE(review): 'now' is presumably obtained from 'getUSecOfDay', and the
-- Bool presumably decides whether the loop exits -- confirm.
926 service_loop :: HANDLE -- read end of pipe
927 -> [DelayReq] -- current delay requests
930 service_loop wakeup old_delays = do
931 -- pick up new delay requests
932 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
933 let delays = foldr insertDelay old_delays new_delays
936 (delays', timeout) <- getDelay now delays
938 r <- c_WaitForSingleObject wakeup timeout
940 0xffffffff -> do c_maperrno; throwErrno "service_loop"
942 r2 <- c_readIOManagerEvent
945 _ | r2 == io_MANAGER_WAKEUP -> return False
946 _ | r2 == io_MANAGER_DIE -> return True
947 0 -> return False -- spurious wakeup
948 _ -> do start_console_handler (r2 `shiftR` 1); return False
951 else service_cont wakeup delays'
953 _other -> service_cont wakeup delays' -- probably timeout
-- | Continue the service loop: clear the 'prodding' flag so that later
-- calls to 'prodServiceThread' send a fresh wakeup, then re-enter
-- 'service_loop' with the same handle and the given delay queue.
service_cont :: HANDLE -> [DelayReq] -> IO ()
service_cont wakeup delays =
  atomicModifyIORef prodding (\_ -> (False,False))
    >> service_loop wakeup delays
-- must agree with rts/win32/ThrIOManager.c

-- | Event code sent by 'wakeupIOManager' to wake the service loop.
io_MANAGER_WAKEUP :: Word32
io_MANAGER_WAKEUP = 0xffffffff

-- | Event code on which the service loop reports that it should stop.
io_MANAGER_DIE :: Word32
io_MANAGER_DIE = 0xfffffffe
969 -- these are sent to Services only.
972 deriving (Eq, Ord, Enum, Show, Read, Typeable)
-- Dispatch a raw console event code.  The code is translated with
-- 'toWin32ConsoleEvent'; when it maps to a 'ConsoleEvent', the handler
-- stored in 'win32ConsoleHandler' is accessed under 'withMVar'.
-- NOTE(review): what is done with the handler, and how unrecognised codes
-- are treated, is not shown here -- presumably the handler is run on the
-- event; confirm.
974 start_console_handler :: Word32 -> IO ()
975 start_console_handler r =
976 case toWin32ConsoleEvent r of
977 Just x -> withMVar win32ConsoleHandler $ \handler -> do
-- Map a numeric Win32 console control code to a 'ConsoleEvent':
-- 0 -> ControlC, 1 -> Break, 2 -> Close, 5 -> Logoff, 6 -> Shutdown.
-- NOTE(review): other codes presumably give Nothing (the result type is
-- Maybe) -- confirm.
982 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
983 toWin32ConsoleEvent ev =
985 0 {- CTRL_C_EVENT-} -> Just ControlC
986 1 {- CTRL_BREAK_EVENT-} -> Just Break
987 2 {- CTRL_CLOSE_EVENT-} -> Just Close
988 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
989 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
-- | Global slot holding the console event handler used by
-- 'start_console_handler'.  It initially contains an 'error' thunk, so
-- invoking the handler before a real one has been stored fails with
-- "win32ConsoleHandler".
--
-- NOINLINE is required: this is a top-level 'unsafePerformIO' global, and
-- without the pragma the compiler may inline the definition at use sites
-- and create several distinct MVars instead of one shared one.
{-# NOINLINE win32ConsoleHandler #-}
win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
-- XXX Is this actually needed?

-- | Global reference to a 'HANDLE', initialised to 'nullPtr'.  It is read
-- (and the value ignored) by 'wakeupIOManager'.  NOINLINE keeps the
-- 'unsafePerformIO' call from being duplicated.
{-# NOINLINE stick #-}
stick :: IORef HANDLE
stick = unsafePerformIO $ newIORef nullPtr
-- | Wake the Windows IO manager by sending it 'io_MANAGER_WAKEUP'.
-- 'stick' is read first, but its value is not used.
wakeupIOManager :: IO ()
wakeupIOManager =
  readIORef stick >>= \_hdl ->
    c_sendIOManagerEvent io_MANAGER_WAKEUP
1005 -- Walk the queue of pending delays, waking up any that have passed
1006 -- and return the smallest delay to wait for. The queue of pending
1007 -- delays is kept ordered.
--
-- Arguments: the current time and the ordered delay queue.  The result is
-- the remaining queue paired with a timeout for WaitForSingleObject.
--  * An empty queue yields the timeout 'iNFINITE'.
--  * A 'DelaySTM' request whose time has been reached sets its TVar to
--    True.  NOTE(review): an expired 'Delay' presumably fills its MVar --
--    confirm.
--  * Otherwise the wait until the head request is converted from
--    microseconds to milliseconds, rounding up (add 999, then divide by
--    1000).
1008 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
1009 getDelay _ [] = return ([], iNFINITE)
1010 getDelay now all@(d : rest)
1012 Delay time m | now >= time -> do
1015 DelaySTM time t | now >= time -> do
1016 atomically $ writeTVar t True
1019 -- delay is in millisecs for WaitForSingleObject
1020 let micro_seconds = delayTime d - now
1021 milli_seconds = (micro_seconds + 999) `div` 1000
1022 in return (all, fromIntegral milli_seconds)
1024 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
1025 -- available yet. We should move some Win32 functionality down here,
1026 -- maybe as part of the grand reorganisation of the base package...
-- HANDLE is represented as an untyped pointer; it is the type of the
-- wakeup handle passed to 'service_loop' and 'c_WaitForSingleObject'.
1027 type HANDLE = Ptr ()
-- Timeout value returned by 'getDelay' when no delay requests are queued.
-- NOTE(review): presumably mirrors the Win32 INFINITE constant (wait
-- without timeout) -- confirm.
1031 iNFINITE = 0xFFFFFFFF -- urgh
-- Fetches the HANDLE that 'startIOManagerThread' hands to 'service_loop',
-- which then waits on it with 'c_WaitForSingleObject'.
1033 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
1034 c_getIOManagerEvent :: IO HANDLE
-- Reads the next IO manager event code.  The service loop compares the
-- result against 'io_MANAGER_WAKEUP', 'io_MANAGER_DIE' and 0.
1036 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
1037 c_readIOManagerEvent :: IO Word32
-- Sends an event code to the IO manager; 'wakeupIOManager' uses it to
-- send 'io_MANAGER_WAKEUP'.
1039 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
1040 c_sendIOManagerEvent :: Word32 -> IO ()
-- NOTE(review): presumably the import behind 'c_maperrno', which the
-- Windows service loop calls immediately before 'throwErrno' -- confirm.
1042 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
-- Binding to the Win32 WaitForSingleObject call (stdcall convention).
-- The service loop passes the wakeup handle and the timeout computed by
-- 'getDelay' (in milliseconds), and treats a result of 0xffffffff as an
-- error.
1045 foreign import stdcall "WaitForSingleObject"
1046 c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1049 -- ----------------------------------------------------------------------------
1050 -- Unix IO manager thread, using select()
-- Start the Unix IO manager.
--
-- A pipe is created with 'c_pipe' (failure is raised through
-- 'throwErrnoIfMinus1').  The write end is stored in 'stick' and
-- registered with the RTS via 'c_setIOManagerPipe'; the read end becomes
-- the wakeup descriptor of 'service_loop'.  Buffers for the read fd_set,
-- the write fd_set and the timeval are allocated for the loop, which
-- starts with empty request and delay queues.
-- NOTE(review): the loop presumably runs in its own forked thread --
-- confirm.
1052 startIOManagerThread :: IO ()
1053 startIOManagerThread = do
1054 allocaArray 2 $ \fds -> do
1055 throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1056 rd_end <- peekElemOff fds 0
1057 wr_end <- peekElemOff fds 1
1058 writeIORef stick (fromIntegral wr_end)
1059 c_setIOManagerPipe wr_end
1061 allocaBytes sizeofFdSet $ \readfds -> do
1062 allocaBytes sizeofFdSet $ \writefds -> do
1063 allocaBytes sizeofTimeVal $ \timeval -> do
1064 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
-- Unix IO manager service loop (built around select()).
--
-- One iteration:
--  1. Drain 'pendingEvents' and 'pendingDelays'; new IO requests are
--     prepended to the old ones, new delays are merged into the ordered
--     delay queue with 'insertDelay'.
--  2. Add the wakeup descriptor to the read set and let 'buildFdSets'
--     add every request, returning the largest descriptor seen.
--  3. Call 'c_select' with a timeout from 'getDelay'.  EINTR retries the
--     select, EBADF requests that all waiters be woken, any other error
--     is thrown via 'throwErrno'.
--  4. If the wakeup descriptor is readable, read one byte from it:
--     'io_MANAGER_WAKEUP' and 'io_MANAGER_DIE' are control codes; any
--     other byte is used as an index into the 'handlers' table while
--     holding 'signalHandlerLock'.
--  5. Unless exiting, clear 'prodding', complete (or wake) the requests
--     and recurse with the remaining requests and delays.
-- NOTE(review): 'now', 'err', 's' and 'exit' are bound on lines not shown
-- here; 'exit' is presumably the Bool produced in step 4 -- confirm.
1068 :: Fd -- listen to this for wakeup calls
1075 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1077 -- pick up new IO requests
1078 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1079 let reqs = new_reqs ++ old_reqs
1081 -- pick up new delay requests
1082 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1083 let delays0 = foldr insertDelay old_delays new_delays
1085 -- build the FDSets for select()
1088 fdSet wakeup readfds
1089 maxfd <- buildFdSets 0 readfds writefds reqs
1091 -- perform the select()
1092 let do_select delays = do
1093 -- check the current time and wake up any thread in
1094 -- threadDelay whose timeout has expired. Also find the
1095 -- timeout value for the select() call.
1097 (delays', timeout) <- getDelay now ptimeval delays
1099 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
1105 _ | err == eINTR -> do_select delays'
1106 -- EINTR: just redo the select()
1107 _ | err == eBADF -> return (True, delays)
1108 -- EBADF: one of the file descriptors is closed or bad,
1109 -- we don't know which one, so wake everyone up.
1110 _ | otherwise -> throwErrno "select"
1111 -- otherwise (ENOMEM or EINVAL) something has gone
1112 -- wrong; report the error.
1114 return (False,delays')
1116 (wakeup_all,delays') <- do_select delays0
1119 if wakeup_all then return False
1121 b <- fdIsSet wakeup readfds
1124 else alloca $ \p -> do
1125 c_read (fromIntegral wakeup) p 1; return ()
1128 _ | s == io_MANAGER_WAKEUP -> return False
1129 _ | s == io_MANAGER_DIE -> return True
1130 _ -> withMVar signalHandlerLock $ \_ -> do
1131 handler_tbl <- peek handlers
1132 sp <- peekElemOff handler_tbl (fromIntegral s)
1133 io <- deRefStablePtr sp
1137 if exit then return () else do
1139 atomicModifyIORef prodding (\_ -> (False,False))
1141 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1142 else completeRequests reqs readfds writefds []
1144 service_loop wakeup readfds writefds ptimeval reqs' delays'
-- | Control byte written to the IO manager pipe by 'wakeupIOManager'.
io_MANAGER_WAKEUP :: CChar
io_MANAGER_WAKEUP = 0xff

-- | Control byte on which the service loop reports that it should stop.
io_MANAGER_DIE :: CChar
io_MANAGER_DIE = 0xfe
-- Global reference holding the write end of the IO manager pipe: it is
-- set by 'startIOManagerThread' and read by 'wakeupIOManager'.  It starts
-- out as 0.  NOINLINE keeps the 'unsafePerformIO' call from being
-- duplicated.
{-# NOINLINE stick #-}
stick = unsafePerformIO $ newIORef 0
-- | Wake the Unix IO manager by writing the single byte
-- 'io_MANAGER_WAKEUP' to the descriptor stored in 'stick'.  The result of
-- the write is deliberately ignored.
wakeupIOManager :: IO ()
wakeupIOManager =
  readIORef stick >>= \wr ->
    with io_MANAGER_WAKEUP (\buf -> c_write (fromIntegral wr) buf 1 >> return ())
-- Lock used to protect concurrent access to signal_handlers.  A symptom of
-- the race condition is #1922; that bug was reported on Windows, but a
-- similar bug also exists on Unix.
--
-- NOINLINE is required: this is a top-level 'unsafePerformIO' global, and
-- without the pragma the compiler may inline the definition at use sites
-- and create several distinct locks, defeating the mutual exclusion.
{-# NOINLINE signalHandlerLock #-}
signalHandlerLock :: MVar ()
signalHandlerLock = unsafePerformIO (newMVar ())
-- Address of the C variable signal_handlers.  The service loop peeks the
-- table pointer stored there and indexes the table to obtain a
-- 'StablePtr' to the IO action for an entry.
1166 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
-- Passes a file descriptor to the C function setIOManagerPipe;
-- 'startIOManagerThread' calls it with the write end of the wakeup pipe.
1168 foreign import ccall "setIOManagerPipe"
1169 c_setIOManagerPipe :: CInt -> IO ()
1171 -- -----------------------------------------------------------------------------
-- Walk the list of IO requests, threading the largest descriptor seen so
-- far (first argument) and returning it once the list is exhausted.  A
-- descriptor that is >= 'fD_SETSIZE' aborts with 'error', since it does
-- not fit in an fd_set.
-- NOTE(review): each in-range descriptor is presumably added to the read
-- set ('Read') or the write set ('Write') -- confirm.
1174 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1175 buildFdSets maxfd _ _ [] = return maxfd
1176 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1177 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1180 buildFdSets (max maxfd fd) readfds writefds reqs
1181 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1182 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1185 buildFdSets (max maxfd fd) readfds writefds reqs
-- Partition the IO requests after select() has returned.  Each request's
-- descriptor is looked up in the matching set ('Read' in readfds, 'Write'
-- in writefds) with 'fdIsSet'.  A satisfied request has its MVar filled
-- with (), releasing the waiting thread; an unsatisfied one is pushed
-- onto the accumulator (last argument), which is returned at the end.
-- NOTE(review): the test on 'b' is presumably "non-zero means set" --
-- confirm.
1187 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1189 completeRequests [] _ _ reqs' = return reqs'
1190 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1191 b <- fdIsSet fd readfds
1193 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1194 else completeRequests reqs readfds writefds (Read fd m : reqs')
1195 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1196 b <- fdIsSet fd writefds
1198 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1199 else completeRequests reqs readfds writefds (Write fd m : reqs')
-- | Release every pending IO request, in list order, by putting () into
-- its MVar.  No distinction is made between read and write requests.
wakeupAll :: [IOReq] -> IO ()
wakeupAll pending =
  case pending of
    [] -> return ()
    (req : more) -> do
      putMVar (wakeupVar req) ()
      wakeupAll more
  where
    -- The MVar that the requesting thread is blocked on.
    wakeupVar (Read _ m)  = m
    wakeupVar (Write _ m) = m
-- Register a 'Read' request for the descriptor by atomically prepending
-- it to 'pendingEvents'.
-- NOTE(review): 'm' is presumably a fresh empty MVar on which the caller
-- then blocks after prodding the service thread -- confirm.
1206 waitForReadEvent :: Fd -> IO ()
1207 waitForReadEvent fd = do
1209 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
-- Register a 'Write' request for the descriptor by atomically prepending
-- it to 'pendingEvents'.
-- NOTE(review): 'm' is presumably a fresh empty MVar on which the caller
-- then blocks after prodding the service thread -- confirm.
1213 waitForWriteEvent :: Fd -> IO ()
1214 waitForWriteEvent fd = do
1216 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1220 -- -----------------------------------------------------------------------------
1223 -- Walk the queue of pending delays, waking up any that have passed
1224 -- and return the smallest delay to wait for. The queue of pending
1225 -- delays is kept ordered.
--
-- Arguments: the current time, a timeval buffer, and the ordered delay
-- queue.  The result is the remaining queue paired with the timeout
-- pointer to hand to select().
--  * An empty queue yields 'nullPtr' as the timeout.
--  * Requests whose time has been reached are dropped from the queue; a
--    'DelaySTM' request sets its TVar to True.  NOTE(review): an expired
--    'Delay' presumably fills its MVar -- confirm.
--  * Otherwise the time remaining until the head request is stored into
--    the buffer with 'setTimevalTicks' and the buffer is returned.
1226 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1227 getDelay _ _ [] = return ([],nullPtr)
1228 getDelay now ptimeval all@(d : rest)
1230 Delay time m | now >= time -> do
1232 getDelay now ptimeval rest
1233 DelaySTM time t | now >= time -> do
1234 atomically $ writeTVar t True
1235 getDelay now ptimeval rest
1237 setTimevalTicks ptimeval (delayTime d - now)
1238 return (all,ptimeval)
-- Size passed to 'allocaBytes' by 'startIOManagerThread' when allocating
-- the timeval buffer (so presumably a byte count for struct timeval).
1242 foreign import ccall unsafe "sizeofTimeVal"
1243 sizeofTimeVal :: Int
-- Stores a 'USecs' duration into the given timeval buffer; 'getDelay'
-- uses it to set the select() timeout.
1245 foreign import ccall unsafe "setTimevalTicks"
1246 setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1249 On Win32 we're going to have a single Pipe, and a
1250 waitForSingleObject with the delay time. For signals, we send a
1251 byte down the pipe just like on Unix.
1254 -- ----------------------------------------------------------------------------
1255 -- select() interface
1257 -- ToDo: move to System.Posix.Internals?
-- Binding to select(), imported "safe".  The service loop passes the
-- highest descriptor plus one, followed by the read and write sets.
-- NOTE(review): the third set is presumably the exception set and the
-- result presumably an IO CInt -- confirm.
1261 foreign import ccall safe "select"
1262 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
-- Value of the C helper hsFD_SETSIZE (presumably the platform's
-- FD_SETSIZE), as a CInt.
1265 foreign import ccall unsafe "hsFD_SETSIZE"
1266 c_fD_SETSIZE :: CInt
-- 'c_fD_SETSIZE' converted for comparison against descriptors;
-- 'buildFdSets' rejects any descriptor that is >= this value.
1269 fD_SETSIZE = fromIntegral c_fD_SETSIZE
-- Raw binding to the C helper hsFD_ISSET (presumably wrapping the
-- FD_ISSET macro): takes a descriptor and an fd_set, returns a CInt.
1271 foreign import ccall unsafe "hsFD_ISSET"
1272 c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
-- | Query whether a descriptor is a member of an fd_set.  Unwraps the
-- 'Fd' newtype and delegates to 'c_fdIsSet', returning its raw CInt
-- result.
fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
fdIsSet wrapped fdset =
  case wrapped of
    Fd raw -> c_fdIsSet raw fdset
-- Raw binding to the C helper hsFD_SET (presumably wrapping the FD_SET
-- macro): takes a descriptor and the fd_set to update.
1277 foreign import ccall unsafe "hsFD_SET"
1278 c_fdSet :: CInt -> Ptr CFdSet -> IO ()
-- | Add a descriptor to an fd_set.  Unwraps the 'Fd' newtype and
-- delegates to 'c_fdSet'.
fdSet :: Fd -> Ptr CFdSet -> IO ()
fdSet wrapped fdset =
  case wrapped of
    Fd raw -> c_fdSet raw fdset
-- Binding to the C helper hsFD_ZERO (presumably wrapping the FD_ZERO
-- macro, i.e. clearing the given fd_set -- confirm).
1283 foreign import ccall unsafe "hsFD_ZERO"
1284 fdZero :: Ptr CFdSet -> IO ()
-- NOTE(review): presumably the import behind 'sizeofFdSet', the size that
-- 'startIOManagerThread' passes to 'allocaBytes' for each fd_set --
-- confirm.
1286 foreign import ccall unsafe "sizeof_fd_set"
-- | Report a stack overflow by invoking the C-side hook
-- ('callStackOverflowHook').  The polymorphic result is 'undefined', so
-- callers must not inspect it.
reportStackOverflow :: IO a
reportStackOverflow = callStackOverflowHook >> return undefined
-- Report an exception by fetching the currently installed handler with
-- 'getUncaughtExceptionHandler'.
-- NOTE(review): the handler is presumably then applied to the exception
-- -- confirm.
1294 reportError :: SomeException -> IO a
1296 handler <- getUncaughtExceptionHandler
1300 -- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove
1301 -- the unsafe below.
-- Binding to the C function stackOverflow, used by 'reportStackOverflow'.
-- It is imported "unsafe", so the callee must not call back into Haskell.
1302 foreign import ccall unsafe "stackOverflow"
1303 callStackOverflowHook :: IO ()
-- | Global reference holding the handler for uncaught exceptions.  It is
-- initialised with 'defaultHandler' and accessed through
-- 'setUncaughtExceptionHandler' and 'getUncaughtExceptionHandler'.
--
-- NOINLINE keeps the 'unsafePerformIO' call from being duplicated, so
-- there is exactly one shared reference.
uncaughtExceptionHandler :: IORef (SomeException -> IO ())
{-# NOINLINE uncaughtExceptionHandler #-}
uncaughtExceptionHandler = unsafePerformIO $ newIORef defaultHandler
-- | Default handler for uncaught exceptions.
--
-- Flushes 'stdout' (ignoring any exception raised by the flush), picks a
-- message for the exception and prints it through the RTS 'errorBelch'
-- wrapper using the format string "%s".
--
-- Message selection, in order:
--
--   * 'Deadlock' gets a fixed explanatory text;
--   * 'ErrorCall' contributes its own string;
--   * anything else is rendered with 'showsPrec'.
defaultHandler :: SomeException -> IO ()
defaultHandler se@(SomeException ex) = do
  hFlush stdout `catchAny` (\ _ -> return ())
  -- Both casts are lazy: the second is only evaluated when the first
  -- does not match.
  let msg = case (cast ex, cast ex) of
        (Just Deadlock, _)      -> "no threads to run: infinite loop or deadlock?"
        (_, Just (ErrorCall s)) -> s
        _                       -> showsPrec 0 se ""
  withCString "%s" $ \cfmt ->
    withCString msg $ \cmsg ->
      errorBelch cfmt cmsg
1321 -- don't use errorBelch() directly, because we cannot call varargs functions
-- through the FFI.  errorBelch2 takes exactly two C strings;
-- 'defaultHandler' passes the format "%s" and the message.
1323 foreign import ccall unsafe "HsBase.h errorBelch2"
1324 errorBelch :: CString -> CString -> IO ()
-- | Install a new handler for uncaught exceptions by overwriting the
-- contents of 'uncaughtExceptionHandler'.
setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
setUncaughtExceptionHandler handler =
  writeIORef uncaughtExceptionHandler handler
-- | Fetch the currently installed handler for uncaught exceptions from
-- 'uncaughtExceptionHandler'.
getUncaughtExceptionHandler :: IO (SomeException -> IO ())
getUncaughtExceptionHandler =
  readIORef uncaughtExceptionHandler