2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_HADDOCK not-home #-}
4 -----------------------------------------------------------------------------
7 -- Copyright : (c) The University of Glasgow, 1994-2002
8 -- License : see libraries/base/LICENSE
10 -- Maintainer : cvs-ghc@haskell.org
11 -- Stability : internal
12 -- Portability : non-portable (GHC extensions)
14 -- Basic concurrency stuff.
16 -----------------------------------------------------------------------------
18 -- No: #hide, because bits of this module are exposed by the stm package.
19 -- However, we don't want this module to be the home location for the
20 -- bits it exports, we'd rather have Control.Concurrent and the other
21 -- higher level modules be the home. Hence:
29 -- * Forking and suchlike
30 , forkIO -- :: IO () -> IO ThreadId
31 , forkOnIO -- :: Int -> IO () -> IO ThreadId
32 , numCapabilities -- :: Int
33 , childHandler -- :: Exception -> IO ()
34 , myThreadId -- :: IO ThreadId
35 , killThread -- :: ThreadId -> IO ()
36 , throwTo -- :: ThreadId -> Exception -> IO ()
37 , par -- :: a -> b -> b
38 , pseq -- :: a -> b -> b
40 , labelThread -- :: ThreadId -> String -> IO ()
42 , ThreadStatus(..), BlockReason(..)
43 , threadStatus -- :: ThreadId -> IO ThreadStatus
46 , threadDelay -- :: Int -> IO ()
47 , registerDelay -- :: Int -> IO (TVar Bool)
48 , threadWaitRead -- :: Fd -> IO ()
49 , threadWaitWrite -- :: Fd -> IO ()
53 , newMVar -- :: a -> IO (MVar a)
54 , newEmptyMVar -- :: IO (MVar a)
55 , takeMVar -- :: MVar a -> IO a
56 , putMVar -- :: MVar a -> a -> IO ()
57 , tryTakeMVar -- :: MVar a -> IO (Maybe a)
58 , tryPutMVar -- :: MVar a -> a -> IO Bool
59 , isEmptyMVar -- :: MVar a -> IO Bool
60 , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
64 , atomically -- :: STM a -> IO a
66 , orElse -- :: STM a -> STM a -> STM a
67 , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a
68 , alwaysSucceeds -- :: STM a -> STM ()
69 , always -- :: STM Bool -> STM ()
71 , newTVar -- :: a -> STM (TVar a)
72 , newTVarIO -- :: a -> IO (TVar a)
73 , readTVar -- :: TVar a -> STM a
74 , writeTVar -- :: TVar a -> a -> STM ()
75 , unsafeIOToSTM -- :: IO a -> STM a
78 #ifdef mingw32_HOST_OS
79 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
80 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
81 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
83 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
84 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
87 #ifndef mingw32_HOST_OS
91 , ensureIOManagerIsRunning
93 #ifdef mingw32_HOST_OS
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
108 import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
115 import GHC.Num ( Num(..) )
116 import GHC.Real ( fromIntegral, div )
117 #ifndef mingw32_HOST_OS
118 import GHC.Base ( Int(..) )
120 #ifdef mingw32_HOST_OS
121 import GHC.Read ( Read )
122 import GHC.Enum ( Enum )
125 import GHC.Pack ( packCString# )
126 import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) )
128 import GHC.Show ( Show(..), showString )
131 infixr 0 `par`, `pseq`
134 %************************************************************************
136 \subsection{@ThreadId@, @par@, and @fork@}
138 %************************************************************************
141 data ThreadId = ThreadId ThreadId# deriving( Typeable )
142 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
143 -- But since ThreadId# is unlifted, the Weak type must use open
146 A 'ThreadId' is an abstract type representing a handle to a thread.
147 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
148 the 'Ord' instance implements an arbitrary total ordering over
149 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
150 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
151 useful when debugging or diagnosing the behaviour of a concurrent
154 /Note/: in GHC, if you have a 'ThreadId', you essentially have
155 a pointer to the thread itself. This means the thread itself can\'t be
156 garbage collected until you drop the 'ThreadId'.
157 This misfeature will hopefully be corrected at a later date.
159 /Note/: Hugs does not provide any operations on other threads;
160 it defines 'ThreadId' as a synonym for ().
163 instance Show ThreadId where
165 showString "ThreadId " .
166 showsPrec d (getThreadId (id2TSO t))
168 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
-- Unwrap a 'ThreadId', exposing the primitive thread identifier it carries.
id2TSO :: ThreadId -> ThreadId#
id2TSO tid = case tid of
  ThreadId tso# -> tso#
173 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
176 cmpThread :: ThreadId -> ThreadId -> Ordering
178 case cmp_thread (id2TSO t1) (id2TSO t2) of
183 instance Eq ThreadId where
185 case t1 `cmpThread` t2 of
189 instance Ord ThreadId where
193 Sparks off a new thread to run the 'IO' computation passed as the
194 first argument, and returns the 'ThreadId' of the newly created
197 The new thread will be a lightweight thread; if you want to use a foreign
198 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
200 GHC note: the new thread inherits the /blocked/ state of the parent
201 (see 'Control.Exception.block').
203 forkIO :: IO () -> IO ThreadId
204 forkIO action = IO $ \ s ->
205 case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
207 action_plus = catchException action childHandler
210 Like 'forkIO', but lets you specify on which CPU the thread is
211 created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
212 will stay on the same CPU for its entire lifetime (`forkIO` threads
213 can migrate between CPUs according to the scheduling policy).
214 `forkOnIO` is useful for overriding the scheduling policy when you
215 know in advance how best to distribute the threads.
217 The `Int` argument specifies the CPU number; it is interpreted modulo
218 'numCapabilities' (note that it actually specifies a capability number
219 rather than a CPU number, but to a first approximation the two are
222 forkOnIO :: Int -> IO () -> IO ThreadId
223 forkOnIO (I# cpu) action = IO $ \ s ->
224 case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
226 action_plus = catchException action childHandler
-- | The value passed to the @+RTS -N@ flag.  This is the number of
-- Haskell threads that can run truly simultaneously at any given
-- time, and is typically set to the number of physical CPU cores on
-- the machine.
--
-- The value is read from the C variable @n_capabilities@ and converted
-- to an 'Int'.
numCapabilities :: Int
numCapabilities =
  unsafePerformIO (peek n_capabilities >>= \count -> return (fromIntegral count))
237 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
-- Exception handler wrapped around forked computations.  The real work is
-- delegated to 'real_handler'; should that raise an exception in turn, the
-- new exception is passed to 'childHandler' again.
childHandler :: Exception -> IO ()
childHandler exn = real_handler exn `catchException` childHandler
242 real_handler :: Exception -> IO ()
245 -- ignore thread GC and killThread exceptions:
246 BlockedOnDeadMVar -> return ()
247 BlockedIndefinitely -> return ()
248 AsyncException ThreadKilled -> return ()
250 -- report all others:
251 AsyncException StackOverflow -> reportStackOverflow
252 other -> reportError other
254 {- | 'killThread' terminates the given thread (GHC only).
255 Any work already done by the thread isn\'t
256 lost: the computation is suspended until required by another thread.
257 The memory used by the thread will be garbage collected if it isn\'t
258 referenced from anywhere. The 'killThread' function is defined in
261 > killThread tid = throwTo tid (AsyncException ThreadKilled)
killThread :: ThreadId -> IO ()
killThread target = throwTo target killed
  where
    -- the asynchronous exception delivered to the target thread
    killed = AsyncException ThreadKilled
267 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
269 'throwTo' does not return until the exception has been raised in the
271 The calling thread can thus be certain that the target
272 thread has received the exception. This is a useful property to know
273 when dealing with race conditions: eg. if there are two threads that
274 can kill each other, it is guaranteed that only one of the threads
275 will get to kill the other.
277 If the target thread is currently making a foreign call, then the
278 exception will not be raised (and hence 'throwTo' will not return)
279 until the call has completed. This is the case regardless of whether
280 the call is inside a 'block' or not.
282 Important note: the behaviour of 'throwTo' differs from that described in
283 the paper \"Asynchronous exceptions in Haskell\"
284 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
285 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
286 a more synchronous design in which 'throwTo' does not return until the exception
287 is received by the target thread. The trade-off is discussed in Section 8 of the paper.
288 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
291 There is currently no guarantee that the exception delivered by 'throwTo' will be
292 delivered at the first possible opportunity. In particular, a thread may
293 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
294 a pending 'throwTo'. This is arguably undesirable behaviour.
throwTo :: ThreadId -> Exception -> IO ()
throwTo (ThreadId target#) exn = IO (\s0 ->
  -- killThread# yields only the new state token, so pair it with ().
  case killThread# target# exn s0 of
    s1 -> (# s1, () #))
-- | Returns the 'ThreadId' of the calling thread (GHC only).
myThreadId :: IO ThreadId
myThreadId = IO (\s0 ->
  case myThreadId# s0 of
    -- wrap the primitive identifier in the boxed 'ThreadId' type
    (# s1, self# #) -> (# s1, ThreadId self# #))
307 -- |The 'yield' action allows (forces, in a co-operative multitasking
308 -- implementation) a context-switch to any other currently runnable
309 -- threads (if any), and is occasionally useful when implementing
310 -- concurrency abstractions.
313 case (yield# s) of s1 -> (# s1, () #)
315 {- | 'labelThread' stores a string as identifier for this thread if
316 you built a RTS with debugging support. This identifier will be used in
317 the debugging output to make distinction of different threads easier
318 (otherwise you only have the thread state object\'s address in the heap).
320 Other applications like the graphical Concurrent Haskell Debugger
321 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
322 'labelThread' for their purposes as well.
labelThread :: ThreadId -> String -> IO ()
labelThread (ThreadId tid#) label = IO (\s0 ->
  -- Pack the label into a byte array and pass the address of its contents
  -- to the primitive.
  let addr# = byteArrayContents# (packCString# label)
  in case labelThread# tid# addr# s0 of
       s1 -> (# s1, () #))
331 -- Nota Bene: 'pseq' used to be 'seq'
332 -- but 'seq' is now defined in PrelGHC
334 -- "pseq" is defined a bit weirdly (see below)
336 -- The reason for the strange "lazy" call is that
337 -- it fools the compiler into thinking that pseq and par are non-strict in
338 -- their second argument (even if it inlines pseq at the call site).
339 -- If it thinks pseq is strict in "y", then it often evaluates
340 -- "y" before "x", which is totally wrong.
-- Force the first argument, then return the second wrapped in 'lazy'.
pseq a b = seq a (lazy b)
-- Apply par# to the first argument, then return the second wrapped in 'lazy'.
par a b = case par# a of
  _ -> lazy b
353 -- ^blocked on an 'MVar'
355 -- ^blocked on a computation in progress by another thread
357 -- ^blocked in 'throwTo'
359 -- ^blocked in 'retry' in an STM transaction
360 | BlockedOnForeignCall
361 -- ^currently in a foreign call
363 -- ^blocked on some other resource. Without @-threaded@,
364 -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
365 -- they show up as 'BlockedOnMVar'.
366 deriving (Eq,Ord,Show)
368 -- | The current status of a thread
371 -- ^the thread is currently runnable or running
373 -- ^the thread has finished
374 | ThreadBlocked BlockReason
375 -- ^the thread is blocked on some resource
377 -- ^the thread received an uncaught exception
378 deriving (Eq,Ord,Show)
380 threadStatus :: ThreadId -> IO ThreadStatus
381 threadStatus (ThreadId t) = IO $ \s ->
382 case threadStatus# t s of
383 (# s', stat #) -> (# s', mk_stat (I# stat) #)
385 -- NB. keep these in sync with includes/Constants.h
386 mk_stat 0 = ThreadRunning
387 mk_stat 1 = ThreadBlocked BlockedOnMVar
388 mk_stat 2 = ThreadBlocked BlockedOnBlackHole
389 mk_stat 3 = ThreadBlocked BlockedOnException
390 mk_stat 7 = ThreadBlocked BlockedOnSTM
391 mk_stat 11 = ThreadBlocked BlockedOnForeignCall
392 mk_stat 12 = ThreadBlocked BlockedOnForeignCall
393 mk_stat 16 = ThreadFinished
394 mk_stat 17 = ThreadDied
395 mk_stat _ = ThreadBlocked BlockedOnOther
399 %************************************************************************
401 \subsection[stm]{Transactional heap operations}
403 %************************************************************************
405 TVars are shared memory locations which support atomic memory
409 -- |A monad supporting atomic memory transactions.
410 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
412 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
415 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
-- 'fmap' is expressed through the monadic bind of 'STM': run the action,
-- then return the function applied to its result.
instance Functor STM where
  fmap g action = action >>= \v -> return (g v)
420 instance Monad STM where
421 {-# INLINE return #-}
425 return x = returnSTM x
426 m >>= k = bindSTM m k
428 bindSTM :: STM a -> (a -> STM b) -> STM b
429 bindSTM (STM m) k = STM ( \s ->
431 (# new_s, a #) -> unSTM (k a) new_s
434 thenSTM :: STM a -> STM b -> STM b
435 thenSTM (STM m) k = STM ( \s ->
437 (# new_s, a #) -> unSTM k new_s
-- Lift a value into 'STM': the state token is passed through untouched.
returnSTM :: a -> STM a
returnSTM result = STM $ \st -> (# st, result #)
-- | Unsafely performs IO in the STM monad.  The state-transformer function
-- inside the 'IO' action is re-wrapped as an 'STM' action without change.
unsafeIOToSTM :: IO a -> STM a
unsafeIOToSTM io = case io of
  IO act -> STM act
-- |Perform a series of STM actions atomically.
--
-- You cannot use 'atomically' inside an 'unsafePerformIO' or
-- 'unsafeInterleaveIO'.  Any attempt to do so will result in a runtime
-- error.  (Reason: allowing this would effectively allow a transaction
-- inside a transaction, depending on exactly when the thunk is evaluated.)
--
-- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
-- and which allows top-level TVars to be allocated.
atomically :: STM a -> IO a
atomically (STM txn) = IO $ \s0 -> atomically# txn s0
-- |Retry execution of the current memory transaction because it has seen
-- values in TVars which mean that it should not continue (e.g. the TVars
-- represent a shared buffer that is now empty).  The implementation may
-- block the thread until one of the TVars that it has read from has been
-- updated. (GHC only)
retry = STM (\st -> retry# st)
-- |Compose two alternative STM actions (GHC only).  If the first action
-- completes without retrying then it forms the result of the orElse.
-- Otherwise, if the first action retries, then the second action is
-- tried in its place.  If both actions retry then the orElse as a
-- whole retries.
orElse :: STM a -> STM a -> STM a
orElse (STM primary) alternative =
  STM (\s0 -> catchRetry# primary (unSTM alternative) s0)
-- |Exception handling within STM actions.  The handler is applied to the
-- exception and the resulting 'STM' action is unwrapped for the primitive.
catchSTM :: STM a -> (Exception -> STM a) -> STM a
catchSTM (STM body) handler =
  STM (\s0 -> catchSTM# body (\exn -> unSTM (handler exn)) s0)
-- | Low-level primitive on which always and alwaysSucceeds are built.
-- checkInv differs from these in that (i) the invariant is not
-- checked when checkInv is called, only at the end of this and
-- subsequent transactions, (ii) the invariant failure is indicated
-- by raising an exception.
checkInv :: STM a -> STM ()
checkInv (STM invariant) = STM $ \s0 -> check# invariant s0
488 -- | alwaysSucceeds adds a new invariant that must be true when passed
489 -- to alwaysSucceeds, at the end of the current transaction, and at
490 -- the end of every subsequent transaction. If it fails at any
491 -- of those points then the transaction violating it is aborted
492 -- and the exception raised by the invariant is propagated.
493 alwaysSucceeds :: STM a -> STM ()
494 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () )
-- | always is a variant of alwaysSucceeds in which the invariant is
-- expressed as an STM Bool action that must return True.  Returning
-- False or raising an exception are both treated as invariant failures.
--
-- A 'False' result is turned into a failure by calling 'error', so that
-- 'alwaysSucceeds' sees it as an exception raised by the invariant.
always :: STM Bool -> STM ()
always i = alwaysSucceeds $ do
  holds <- i
  if holds
    then return ()
    else error "Transactional invariant violation"
504 -- |Shared memory locations that support atomic memory transactions.
505 data TVar a = TVar (TVar# RealWorld a)
507 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
-- Two 'TVar's are equal when the primitive comparison sameTVar# says so;
-- the values stored in them are not inspected.
instance Eq (TVar a) where
  TVar lhs# == TVar rhs# = sameTVar# lhs# rhs#
-- |Create a new TVar holding the supplied value.
newTVar :: a -> STM (TVar a)
newTVar initial = STM (\s0 ->
  case newTVar# initial s0 of
    -- box the primitive variable in the 'TVar' constructor
    (# s1, var# #) -> (# s1, TVar var# #))
-- |@IO@ version of 'newTVar'.  This is useful for creating top-level
-- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
-- possible.
newTVarIO :: a -> IO (TVar a)
newTVarIO initial = IO (\s0 ->
  case newTVar# initial s0 of
    -- same primitive as 'newTVar', but run directly as an 'IO' action
    (# s1, var# #) -> (# s1, TVar var# #))
-- |Return the current value stored in a TVar.
readTVar :: TVar a -> STM a
readTVar (TVar var#) = STM (\st -> readTVar# var# st)
531 -- |Write the supplied value into a TVar
532 writeTVar :: TVar a -> a -> STM ()
533 writeTVar (TVar tvar#) val = STM $ \s1# ->
534 case writeTVar# tvar# val s1# of
539 %************************************************************************
541 \subsection[mvars]{M-Structures}
543 %************************************************************************
545 M-Vars are rendezvous points for concurrent threads. They begin
546 empty, and any attempt to read an empty M-Var blocks. When an M-Var
547 is written, a single blocked thread may be freed. Reading an M-Var
548 toggles its state from full back to empty. Therefore, any value
549 written to an M-Var may only be read once. Multiple reads and writes
550 are allowed, but there must be at least one read between any two
554 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
556 -- |Create an 'MVar' which is initially empty.
557 newEmptyMVar :: IO (MVar a)
558 newEmptyMVar = IO $ \ s# ->
560 (# s2#, svar# #) -> (# s2#, MVar svar# #)
562 -- |Create an 'MVar' which contains the supplied value.
563 newMVar :: a -> IO (MVar a)
565 newEmptyMVar >>= \ mvar ->
566 putMVar mvar value >>
569 -- |Return the contents of the 'MVar'. If the 'MVar' is currently
570 -- empty, 'takeMVar' will wait until it is full. After a 'takeMVar',
571 -- the 'MVar' is left empty.
573 -- There are two further important properties of 'takeMVar':
575 -- * 'takeMVar' is single-wakeup. That is, if there are multiple
576 -- threads blocked in 'takeMVar', and the 'MVar' becomes full,
577 -- only one thread will be woken up. The runtime guarantees that
578 -- the woken thread completes its 'takeMVar' operation.
580 -- * When multiple threads are blocked on an 'MVar', they are
581 -- woken up in FIFO order. This is useful for providing
582 -- fairness properties of abstractions built using 'MVar's.
takeMVar :: MVar a -> IO a
takeMVar (MVar var#) = IO (\st -> takeMVar# var# st)  -- thin wrapper over the primitive
587 -- |Put a value into an 'MVar'. If the 'MVar' is currently full,
588 -- 'putMVar' will wait until it becomes empty.
590 -- There are two further important properties of 'putMVar':
592 -- * 'putMVar' is single-wakeup. That is, if there are multiple
593 -- threads blocked in 'putMVar', and the 'MVar' becomes empty,
594 -- only one thread will be woken up. The runtime guarantees that
595 -- the woken thread completes its 'putMVar' operation.
597 -- * When multiple threads are blocked on an 'MVar', they are
598 -- woken up in FIFO order. This is useful for providing
599 -- fairness properties of abstractions built using 'MVar's.
601 putMVar :: MVar a -> a -> IO ()
602 putMVar (MVar mvar#) x = IO $ \ s# ->
603 case putMVar# mvar# x s# of
-- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
-- returns immediately, with 'Nothing' if the 'MVar' was empty, or
-- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
-- the 'MVar' is left empty.
tryTakeMVar :: MVar a -> IO (Maybe a)
tryTakeMVar (MVar mvar#) = IO (\s0 ->
  case tryTakeMVar# mvar# s0 of
    (# s1, taken#, contents #) -> case taken# of
      0# -> (# s1, Nothing #)        -- MVar is empty
      _  -> (# s1, Just contents #)  -- MVar is full
  )
-- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
-- attempts to put the value @a@ into the 'MVar', returning 'True' if
-- it was successful, or 'False' otherwise.
tryPutMVar :: MVar a -> a -> IO Bool
tryPutMVar (MVar mvar#) val = IO (\s0 ->
  case tryPutMVar# mvar# val s0 of
    (# s1, stored# #) -> case stored# of
      0# -> (# s1, False #)  -- primitive reported zero: nothing was stored
      _  -> (# s1, True #)
  )
-- |Check whether a given 'MVar' is empty.
--
-- Notice that the boolean value returned is just a snapshot of
-- the state of the MVar.  By the time you get to react on its result,
-- the MVar may have been filled (or emptied) - so be extremely
-- careful when using this operation.  Use 'tryTakeMVar' instead if possible.
isEmptyMVar :: MVar a -> IO Bool
isEmptyMVar (MVar mvar#) = IO (\s0 ->
  case isEmptyMVar# mvar# s0 of
    (# s1, flag# #) -> case flag# of
      0# -> (# s1, False #)  -- a zero flag means "not empty"
      _  -> (# s1, True #)
  )
-- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
-- "System.Mem.Weak" for more about finalizers.
addMVarFinalizer :: MVar a -> IO () -> IO ()
addMVarFinalizer (MVar mvar#) finalizer = IO (\s0 ->
  -- The weak pointer created by mkWeak# is not needed afterwards and is
  -- discarded; only the new state token is kept.
  case mkWeak# mvar# () finalizer s0 of
    (# s1, _ #) -> (# s1, () #))
642 withMVar :: MVar a -> (a -> IO b) -> IO b
646 b <- catchException (unblock (io a))
647 (\e -> do putMVar m a; throw e)
653 %************************************************************************
655 \subsection{Thread waiting}
657 %************************************************************************
660 #ifdef mingw32_HOST_OS
662 -- Note: threadWaitRead and threadWaitWrite aren't really functional
663 -- on Win32, but left in there because lib code (still) uses them (the manner
664 -- in which they're used doesn't cause problems on a Win32 platform though.)
-- Unbox the arguments, call the asyncRead# primitive and re-box the two
-- integer results (length, error code) as a pair.
asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncRead (I# fd#) (I# isSock#) (I# count#) (Ptr buf#) = IO (\s0 ->
  case asyncRead# fd# isSock# count# buf# s0 of
    (# s1, len#, err# #) -> (# s1, (I# len#, I# err#) #))
-- Unbox the arguments, call the asyncWrite# primitive and re-box the two
-- integer results (length, error code) as a pair.
asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncWrite (I# fd#) (I# isSock#) (I# count#) (Ptr buf#) = IO (\s0 ->
  case asyncWrite# fd# isSock# count# buf# s0 of
    (# s1, len#, err# #) -> (# s1, (I# len#, I# err#) #))
-- Run a procedure through the asyncDoProc# primitive and return its
-- error/result code.
asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
asyncDoProc (FunPtr proc#) (Ptr param#) = IO (\s0 ->
  case asyncDoProc# proc# param# s0 of
    -- the 'length' value is ignored; simplifies implementation of
    -- the async*# primops to have them all return the same result.
    (# s1, _, err# #) -> (# s1, I# err# #))
683 -- to aid the use of these primops by the IO Handle implementation,
684 -- provide the following convenience funs:
-- Variant of 'asyncRead' that reads into a mutable byte array, starting
-- @off@ bytes past the start of its contents.
-- this better be a pinned byte array!
asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncReadBA fd isSock len off bufB = asyncRead fd isSock len target
  where
    -- address of the array contents, then advanced by the byte offset
    base   = Ptr (byteArrayContents# (unsafeCoerce# bufB))
    target = base `plusPtr` off
-- Variant of 'asyncWrite' that writes from a mutable byte array, starting
-- @off@ bytes past the start of its contents.
asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncWriteBA fd isSock len off bufB = asyncWrite fd isSock len source
  where
    -- address of the array contents, then advanced by the byte offset
    base   = Ptr (byteArrayContents# (unsafeCoerce# bufB))
    source = base `plusPtr` off
697 -- -----------------------------------------------------------------------------
700 -- | Block the current thread until data is available to read on the
701 -- given file descriptor (GHC only).
702 threadWaitRead :: Fd -> IO ()
704 #ifndef mingw32_HOST_OS
705 | threaded = waitForReadEvent fd
707 | otherwise = IO $ \s ->
708 case fromIntegral fd of { I# fd# ->
709 case waitRead# fd# s of { s -> (# s, () #)
712 -- | Block the current thread until data can be written to the
713 -- given file descriptor (GHC only).
714 threadWaitWrite :: Fd -> IO ()
716 #ifndef mingw32_HOST_OS
717 | threaded = waitForWriteEvent fd
719 | otherwise = IO $ \s ->
720 case fromIntegral fd of { I# fd# ->
721 case waitWrite# fd# s of { s -> (# s, () #)
724 -- | Suspends the current thread for a given number of microseconds
727 -- There is no guarantee that the thread will be rescheduled promptly
728 -- when the delay has expired, but the thread will never continue to
729 -- run /earlier/ than specified.
731 threadDelay :: Int -> IO ()
733 | threaded = waitForDelayEvent time
734 | otherwise = IO $ \s ->
735 case fromIntegral time of { I# time# ->
736 case delay# time# s of { s -> (# s, () #)
740 -- | Set the value of returned TVar to True after a given number of
741 -- microseconds. The caveats associated with threadDelay also apply.
743 registerDelay :: Int -> IO (TVar Bool)
745 | threaded = waitForDelayEventSTM usecs
746 | otherwise = error "registerDelay: requires -threaded"
748 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
750 waitForDelayEvent :: Int -> IO ()
751 waitForDelayEvent usecs = do
753 target <- calculateTarget usecs
754 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
758 -- Delays for use in STM
759 waitForDelayEventSTM :: Int -> IO (TVar Bool)
760 waitForDelayEventSTM usecs = do
761 t <- atomically $ newTVar False
762 target <- calculateTarget usecs
763 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
767 calculateTarget :: Int -> IO USecs
768 calculateTarget usecs = do
770 return $ now + (fromIntegral usecs)
773 -- ----------------------------------------------------------------------------
774 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
776 -- In the threaded RTS, we employ a single IO Manager thread to wait
777 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
778 -- and delays (threadDelay).
780 -- We can do this because in the threaded RTS the IO Manager can make
781 -- a non-blocking call to select(), so we don't have to do select() in
782 -- the scheduler as we have to in the non-threaded RTS. We get performance
783 -- benefits from doing it this way, because we only have to restart the select()
784 -- when a new request arrives, rather than doing one select() each time
785 -- around the scheduler loop. Furthermore, the scheduler can be simplified
786 -- by not having to check for completed IO requests.
788 -- Issues, possible problems:
790 -- - we might want bound threads to just do the blocking
791 -- operation rather than communicating with the IO manager
792 -- thread. This would prevent single-threaded programs which do
793 -- IO from requiring multiple OS threads. However, it would also
794 -- prevent bound threads waiting on IO from being killed or sent
797 -- - Apparently exec() doesn't work on Linux in a multithreaded program.
798 -- I couldn't repeat this.
800 -- - How do we handle signal delivery in the multithreaded RTS?
802 -- - forkProcess will kill the IO manager thread. Let's just
803 -- hope we don't need to do any blocking IO between fork & exec.
805 #ifndef mingw32_HOST_OS
807 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
808 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
812 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
813 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
815 #ifndef mingw32_HOST_OS
816 pendingEvents :: IORef [IOReq]
818 pendingDelays :: IORef [DelayReq]
819 -- could use a strict list or array here
820 {-# NOINLINE pendingEvents #-}
821 {-# NOINLINE pendingDelays #-}
822 (pendingEvents,pendingDelays) = unsafePerformIO $ do
827 -- the first time we schedule an IO request, the service thread
828 -- will be created (cool, huh?)
-- When the RTS reports support for bound threads ('threaded'), force
-- 'pendingEvents' before returning; otherwise do nothing.
ensureIOManagerIsRunning :: IO ()
ensureIOManagerIsRunning =
  if threaded
    then pendingEvents `seq` return ()
    else return ()
-- Insert a delay request into a list ordered by 'delayTime'.  The new
-- request is placed in front of the first entry whose time is not earlier
-- than its own.
insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
insertDelay new queue = case queue of
  [] -> [new]
  (next : later)
    | delayTime new <= delayTime next -> new : queue
    | otherwise                       -> next : insertDelay new later
-- Extract the time stored in a delay request, whichever constructor it uses.
delayTime :: DelayReq -> USecs
delayTime req = case req of
  Delay    due _ -> due
  DelaySTM due _ -> due
-- XXX: move into GHC.IOBase from Data.IORef?
-- Unwraps the 'IORef' down to its primitive mutable variable and applies
-- the function with atomicModifyMutVar#.
atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORef (IORef (STRef var#)) update =
  IO (\st -> atomicModifyMutVar# var# update st)
851 foreign import ccall unsafe "getUSecOfDay"
852 getUSecOfDay :: IO USecs
-- Global flag, initially False.  'prodServiceThread' sets it to True and
-- 'service_cont' resets it to False.
{-# NOINLINE prodding #-}
prodding :: IORef Bool
prodding = unsafePerformIO $ newIORef False
-- Atomically set the 'prodding' flag, and call 'wakeupIOManager' only if
-- the flag was not already set.
prodServiceThread :: IO ()
prodServiceThread =
  atomicModifyIORef prodding (\previous -> (True, previous)) >>= \alreadySet ->
    if alreadySet
      then return ()
      else wakeupIOManager
863 #ifdef mingw32_HOST_OS
864 -- ----------------------------------------------------------------------------
865 -- Windows IO manager thread
867 startIOManagerThread :: IO ()
868 startIOManagerThread = do
869 wakeup <- c_getIOManagerEvent
870 forkIO $ service_loop wakeup []
873 service_loop :: HANDLE -- read end of pipe
874 -> [DelayReq] -- current delay requests
877 service_loop wakeup old_delays = do
878 -- pick up new delay requests
879 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
880 let delays = foldr insertDelay old_delays new_delays
883 (delays', timeout) <- getDelay now delays
885 r <- c_WaitForSingleObject wakeup timeout
887 0xffffffff -> do c_maperrno; throwErrno "service_loop"
889 r <- c_readIOManagerEvent
892 _ | r == io_MANAGER_WAKEUP -> return False
893 _ | r == io_MANAGER_DIE -> return True
894 0 -> return False -- spurious wakeup
895 r -> do start_console_handler (r `shiftR` 1); return False
898 else service_cont wakeup delays'
900 _other -> service_cont wakeup delays' -- probably timeout
-- Reset the 'prodding' flag to False, then re-enter 'service_loop' with the
-- given delay list.
service_cont wakeup delays =
  atomicModifyIORef prodding (\_ -> (False,False)) >>
  service_loop wakeup delays
-- Event codes compared against the value read from the IO manager event.
-- must agree with rts/win32/ThrIOManager.c
io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
io_MANAGER_WAKEUP = 0xffffffff
io_MANAGER_DIE    = 0xfffffffe
914 -- these are sent to Services only.
917 deriving (Eq, Ord, Enum, Show, Read, Typeable)
919 start_console_handler :: Word32 -> IO ()
920 start_console_handler r =
921 case toWin32ConsoleEvent r of
922 Just x -> withMVar win32ConsoleHandler $ \handler -> do
927 toWin32ConsoleEvent ev =
929 0 {- CTRL_C_EVENT-} -> Just ControlC
930 1 {- CTRL_BREAK_EVENT-} -> Just Break
931 2 {- CTRL_CLOSE_EVENT-} -> Just Close
932 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
933 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
-- Global 'MVar' holding the current console-event handler.  Its initial
-- content is an 'error' thunk.
--
-- NOINLINE is required because this is top-level mutable state created
-- with 'unsafePerformIO': if the definition were inlined, each use site
-- could allocate its own 'MVar'.
{-# NOINLINE win32ConsoleHandler #-}
win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
-- Global reference to a HANDLE, initialised to 'nullPtr'.
{-# NOINLINE stick #-}
stick :: IORef HANDLE
stick = unsafePerformIO $ newIORef nullPtr
944 hdl <- readIORef stick
945 c_sendIOManagerEvent io_MANAGER_WAKEUP
947 -- Walk the queue of pending delays, waking up any that have passed
948 -- and return the smallest delay to wait for. The queue of pending
949 -- delays is kept ordered.
950 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
951 getDelay now [] = return ([], iNFINITE)
952 getDelay now all@(d : rest)
954 Delay time m | now >= time -> do
957 DelaySTM time t | now >= time -> do
958 atomically $ writeTVar t True
961 -- delay is in millisecs for WaitForSingleObject
962 let micro_seconds = delayTime d - now
963 milli_seconds = (micro_seconds + 999) `div` 1000
964 in return (all, fromIntegral milli_seconds)
966 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
967 -- available yet. We should move some Win32 functionality down here,
968 -- maybe as part of the grand reorganisation of the base package...
-- Timeout value returned by 'getDelay' when there are no pending delays.
iNFINITE :: DWORD
iNFINITE = 0xFFFFFFFF -- urgh
974 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
975 c_getIOManagerEvent :: IO HANDLE
-- Unsafe C call to readIOManagerEvent; yields a Word32
-- (presumably one of the io_MANAGER_* codes or an event number -- TODO confirm).
977 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
978 c_readIOManagerEvent :: IO Word32
-- Unsafe C call to sendIOManagerEvent, taking the Word32 code to send;
-- the wakeup code calls it with io_MANAGER_WAKEUP.
980 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
981 c_sendIOManagerEvent :: Word32 -> IO ()
983 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
-- Binding to WaitForSingleObject using the stdcall convention: takes a
-- HANDLE and a DWORD timeout (getDelay computes it in milliseconds) and
-- yields a DWORD result.
986 foreign import stdcall "WaitForSingleObject"
987 c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
990 -- ----------------------------------------------------------------------------
991 -- Unix IO manager thread, using select()
-- Create the wakeup pipe, publish its write end, and run service_loop
-- on its read end.
993 startIOManagerThread :: IO ()
994 startIOManagerThread = do
995 allocaArray 2 $ \fds -> do
-- c_pipe fills fds with the two pipe descriptors; -1 raises an errno error
996 throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
997 rd_end <- peekElemOff fds 0
998 wr_end <- peekElemOff fds 1
-- remember the write end in stick (wakeupIOManager writes to it) and
-- also pass it to c_setIOManagerPipe
999 writeIORef stick (fromIntegral wr_end)
1000 c_setIOManagerPipe wr_end
-- scratch buffers for the loop: two fd sets and one timeval
1002 allocaBytes sizeofFdSet $ \readfds -> do
1003 allocaBytes sizeofFdSet $ \writefds -> do
1004 allocaBytes sizeofTimeVal $ \timeval -> do
-- start with no carried-over IO requests and no carried-over delays
1005 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1009 :: Fd -- listen to this for wakeup calls
-- One pass of the IO manager loop.  It merges newly submitted IO and
-- delay requests into the ones carried over (old_reqs, old_delays),
-- waits in select() on the wakeup descriptor plus the requested
-- descriptors, completes whatever became ready, and then calls itself
-- with the requests and delays that are still outstanding.
1016 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1018 -- pick up new IO requests
-- (the swap empties pendingEvents and hands back what was queued)
1019 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1020 let reqs = new_reqs ++ old_reqs
1022 -- pick up new delay requests
1023 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
-- each new delay is placed into the carried-over queue with insertDelay
1024 let delays = foldr insertDelay old_delays new_delays
1026 -- build the FDSets for select()
-- the wakeup descriptor is always part of the read set
1029 fdSet wakeup readfds
1030 maxfd <- buildFdSets 0 readfds writefds reqs
1032 -- perform the select()
1033 let do_select delays = do
1034 -- check the current time and wake up any thread in
1035 -- threadDelay whose timeout has expired. Also find the
1036 -- timeout value for the select() call.
1038 (delays', timeout) <- getDelay now ptimeval delays
-- first argument: one more than the largest descriptor of interest
1040 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
1046 _ | err == eINTR -> do_select delays'
1047 -- EINTR: just redo the select()
-- NOTE(review): the EBADF branch hands back delays as passed to
-- do_select rather than delays' from getDelay -- confirm that is intended
1048 _ | err == eBADF -> return (True, delays)
1049 -- EBADF: one of the file descriptors is closed or bad,
1050 -- we don't know which one, so wake everyone up.
1051 _ | otherwise -> throwErrno "select"
1052 -- otherwise (ENOMEM or EINVAL) something has gone
1053 -- wrong; report the error.
1055 return (False,delays')
1057 (wakeup_all,delays') <- do_select delays
-- on the wake-everyone path this yields False without looking at the
-- wakeup descriptor (presumably this value is `exit` below -- TODO confirm)
1060 if wakeup_all then return False
1062 b <- fdIsSet wakeup readfds
1065 else alloca $ \p -> do
1066 c_read (fromIntegral wakeup) p 1; return ()
-- dispatch on s: the two control codes decide whether to stop; any
-- other value is used as an index into the signal handler table
1069 _ | s == io_MANAGER_WAKEUP -> return False
1070 _ | s == io_MANAGER_DIE -> return True
1071 _ -> withMVar signalHandlerLock $ \_ -> do
1072 handler_tbl <- peek handlers
1073 sp <- peekElemOff handler_tbl (fromIntegral s)
1074 io <- deRefStablePtr sp
1078 if exit then return () else do
-- reset the prodding flag to False before going round again
1080 atomicModifyIORef prodding (\_ -> (False,False))
-- wake-everyone path: release every request and carry none over;
-- otherwise complete only the requests whose descriptor is ready
1082 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1083 else completeRequests reqs readfds writefds []
1085 service_loop wakeup readfds writefds ptimeval reqs' delays'
-- Control codes recognised by service_loop; wakeupIOManager writes
-- io_MANAGER_WAKEUP to the wakeup pipe.
io_MANAGER_WAKEUP, io_MANAGER_DIE :: CChar
io_MANAGER_WAKEUP = 0xff
io_MANAGER_DIE    = 0xfe
-- Global cell for the descriptor that wakeupIOManager writes to.  It
-- starts out as 0; startIOManagerThread stores the pipe's write end.
{-# NOINLINE stick #-}
stick = unsafePerformIO $ newIORef 0
-- | Prod the IO manager: write the single byte 'io_MANAGER_WAKEUP' to
-- the descriptor currently stored in 'stick'.  The result of the write
-- is discarded.
wakeupIOManager :: IO ()
wakeupIOManager = do
  wrEnd <- readIORef stick
  _ <- with io_MANAGER_WAKEUP $ \msg -> c_write (fromIntegral wrEnd) msg 1
  return ()
-- | Lock used to protect concurrent access to @signal_handlers@.  One
-- symptom of the race it guards against is #1922; that bug was reported
-- on Windows, but a similar bug also exists on Unix.
--
-- The NOINLINE pragma is needed because this is a top-level value built
-- with 'unsafePerformIO': if the definition were inlined, different use
-- sites could each get their own 'MVar' and the lock would protect
-- nothing.
signalHandlerLock :: MVar ()
{-# NOINLINE signalHandlerLock #-}
signalHandlerLock = unsafePerformIO (newMVar ())
-- Address of the C variable signal_handlers.  service_loop peeks it to
-- get a table of StablePtrs to IO actions and indexes into that table.
1106 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
-- C function setIOManagerPipe; startIOManagerThread calls it with the
-- write end of the wakeup pipe.
1108 foreign import ccall "setIOManagerPipe"
1109 c_setIOManagerPipe :: CInt -> IO ()
1111 -- -----------------------------------------------------------------------------
-- Walk the request list keeping a running maximum descriptor in maxfd;
-- that maximum is the result once the list is empty.  A request whose
-- descriptor is >= fD_SETSIZE aborts with an error.
1114 buildFdSets maxfd readfds writefds [] = return maxfd
1115 buildFdSets maxfd readfds writefds (Read fd m : reqs)
1116 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1119 buildFdSets (max maxfd fd) readfds writefds reqs
1120 buildFdSets maxfd readfds writefds (Write fd m : reqs)
1121 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1124 buildFdSets (max maxfd fd) readfds writefds reqs
-- Go through the requests after select() has returned, consulting
-- fdIsSet for each one (readfds for Read, writefds for Write).  In the
-- `then` branch the request's MVar is filled and the request dropped;
-- in the `else` branch the request is kept in the accumulator reqs',
-- which is returned when the input list runs out.  Because kept requests
-- are consed onto the accumulator, they come back in reverse order.
1126 completeRequests [] _ _ reqs' = return reqs'
1127 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1128 b <- fdIsSet fd readfds
1130 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1131 else completeRequests reqs readfds writefds (Read fd m : reqs')
1132 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1133 b <- fdIsSet fd writefds
1135 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1136 else completeRequests reqs readfds writefds (Write fd m : reqs')
-- Fill the MVar of every request in the list, read and write requests
-- alike, working from the front of the list to the back.
wakeupAll requests = case requests of
  []          -> return ()
  req : later -> putMVar (waiter req) () >> wakeupAll later
  where
    -- the MVar carried by a request, whichever kind it is
    waiter (Read  _ m) = m
    waiter (Write _ m) = m
-- Register interest in fd becoming readable: a Read request for fd is
-- pushed onto pendingEvents, which service_loop drains on each pass.
1142 waitForReadEvent :: Fd -> IO ()
1143 waitForReadEvent fd = do
1145 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
-- Register interest in fd becoming writable: a Write request for fd is
-- pushed onto pendingEvents, which service_loop drains on each pass.
1149 waitForWriteEvent :: Fd -> IO ()
1150 waitForWriteEvent fd = do
1152 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1156 -- -----------------------------------------------------------------------------
1159 -- Walk the queue of pending delays, waking up any that have passed
1160 -- and return the smallest delay to wait for. The queue of pending
1161 -- delays is kept ordered.
-- Arguments: the current time, the timeval buffer to fill in, and the
-- ordered queue of pending delays.  Result: the delays still pending,
-- paired with the timeout pointer to use.
1162 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
-- nothing pending: the timeout pointer returned is nullPtr
1163 getDelay now ptimeval [] = return ([],nullPtr)
1164 getDelay now ptimeval all@(d : rest)
1166 Delay time m | now >= time -> do
-- an expired entry is dropped and the rest of the queue is examined
1168 getDelay now ptimeval rest
1169 DelaySTM time t | now >= time -> do
-- an expired STM delay is signalled by setting its TVar to True
1170 atomically $ writeTVar t True
1171 getDelay now ptimeval rest
-- head of the queue is not yet due: store the time remaining in ptimeval
1173 setTimevalTicks ptimeval (delayTime d - now)
1174 return (all,ptimeval)
-- Tag type for the timeval buffer.  It wraps () and is only handled
-- through Ptr CTimeVal, with storage from allocaBytes sizeofTimeVal.
1176 newtype CTimeVal = CTimeVal ()
-- Byte count that startIOManagerThread passes to allocaBytes for the
-- timeval buffer.  Imported as a pure Int rather than an IO action.
1178 foreign import ccall unsafe "sizeofTimeVal"
1179 sizeofTimeVal :: Int
-- Store a time span of type USecs into the timeval buffer; getDelay uses
-- it to record the time remaining until the next delay is due.
1181 foreign import ccall unsafe "setTimevalTicks"
1182 setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1185 On Win32 we're going to have a single Pipe, and a
1186 waitForSingleObject with the delay time. For signals, we send a
1187 byte down the pipe just like on Unix.
1190 -- ----------------------------------------------------------------------------
1191 -- select() interface
1193 -- ToDo: move to System.Posix.Internals?
-- Tag type for the descriptor-set buffers handed to select().  It wraps
-- () and is only handled through Ptr CFdSet, with storage from
-- allocaBytes sizeofFdSet.
1195 newtype CFdSet = CFdSet ()
-- Binding to the C function select, imported as a safe call.
-- service_loop passes it a descriptor count, readfds, writefds, and
-- the timeout obtained from getDelay.
1197 foreign import ccall safe "select"
1198 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
-- Value exported by the C helper hsFD_SETSIZE
-- (presumably C's FD_SETSIZE -- TODO confirm against the helper).
1201 foreign import ccall unsafe "hsFD_SETSIZE"
1202 c_fD_SETSIZE :: CInt
-- The same value converted with fromIntegral; buildFdSets rejects any
-- descriptor that is >= this limit.
1205 fD_SETSIZE = fromIntegral c_fD_SETSIZE
-- | C helper @hsFD_CLR@ (presumably a wrapper for the FD_CLR macro),
-- taking a raw descriptor number and a descriptor-set pointer.
foreign import ccall unsafe "hsFD_CLR"
  c_fdClr :: CInt -> Ptr CFdSet -> IO ()

-- | 'c_fdClr' for an 'Fd': unwraps the descriptor and passes both
-- arguments straight through.
fdClr :: Fd -> Ptr CFdSet -> IO ()
fdClr (Fd descriptor) set = c_fdClr descriptor set
-- | C helper @hsFD_ISSET@ (presumably a wrapper for the FD_ISSET macro),
-- taking a raw descriptor number and a descriptor-set pointer and
-- yielding a C int.
foreign import ccall unsafe "hsFD_ISSET"
  c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt

-- | 'c_fdIsSet' for an 'Fd': unwraps the descriptor and returns the C
-- result unchanged (a 'CInt', not a 'Bool').
fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
fdIsSet (Fd descriptor) set = c_fdIsSet descriptor set
-- | C helper @hsFD_SET@ (presumably a wrapper for the FD_SET macro),
-- taking a raw descriptor number and a descriptor-set pointer.
foreign import ccall unsafe "hsFD_SET"
  c_fdSet :: CInt -> Ptr CFdSet -> IO ()

-- | 'c_fdSet' for an 'Fd': unwraps the descriptor and passes both
-- arguments straight through.
fdSet :: Fd -> Ptr CFdSet -> IO ()
fdSet (Fd descriptor) set = c_fdSet descriptor set
-- C helper hsFD_ZERO applied to a descriptor-set pointer
-- (presumably a wrapper for the FD_ZERO macro, i.e. it empties the set -- TODO confirm).
1225 foreign import ccall unsafe "hsFD_ZERO"
1226 fdZero :: Ptr CFdSet -> IO ()
1228 foreign import ccall unsafe "sizeof_fd_set"