2 {-# OPTIONS_GHC -fno-implicit-prelude #-}
3 -----------------------------------------------------------------------------
6 -- Copyright : (c) The University of Glasgow, 1994-2002
7 -- License : see libraries/base/LICENSE
9 -- Maintainer : cvs-ghc@haskell.org
10 -- Stability : internal
11 -- Portability : non-portable (GHC extensions)
13 -- Basic concurrency stuff.
15 -----------------------------------------------------------------------------
17 -- No: #hide, because bits of this module are exposed by the stm package.
18 -- However, we don't want this module to be the home location for the
19 -- bits it exports, we'd rather have Control.Concurrent and the other
20 -- higher level modules be the home. Hence:
28 -- * Forking and suchlike
29 , forkIO -- :: IO a -> IO ThreadId
30 , forkOnIO -- :: Int -> IO a -> IO ThreadId
31 , numCapabilities -- :: Int
32 , childHandler -- :: Exception -> IO ()
33 , myThreadId -- :: IO ThreadId
34 , killThread -- :: ThreadId -> IO ()
35 , throwTo -- :: ThreadId -> Exception -> IO ()
36 , par -- :: a -> b -> b
37 , pseq -- :: a -> b -> b
39 , labelThread -- :: ThreadId -> String -> IO ()
42 , threadDelay -- :: Int -> IO ()
43 , registerDelay -- :: Int -> IO (TVar Bool)
44 , threadWaitRead -- :: Int -> IO ()
45 , threadWaitWrite -- :: Int -> IO ()
49 , newMVar -- :: a -> IO (MVar a)
50 , newEmptyMVar -- :: IO (MVar a)
51 , takeMVar -- :: MVar a -> IO a
52 , putMVar -- :: MVar a -> a -> IO ()
53 , tryTakeMVar -- :: MVar a -> IO (Maybe a)
54 , tryPutMVar -- :: MVar a -> a -> IO Bool
55 , isEmptyMVar -- :: MVar a -> IO Bool
56 , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
60 , atomically -- :: STM a -> IO a
62 , orElse -- :: STM a -> STM a -> STM a
63 , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a
64 , alwaysSucceeds -- :: STM a -> STM ()
65 , always -- :: STM Bool -> STM ()
67 , newTVar -- :: a -> STM (TVar a)
68 , newTVarIO -- :: a -> STM (TVar a)
69 , readTVar -- :: TVar a -> STM a
70 , writeTVar -- :: a -> TVar a -> STM ()
71 , unsafeIOToSTM -- :: IO a -> STM a
74 #ifdef mingw32_HOST_OS
75 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
76 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
77 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
79 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
80 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
83 #ifndef mingw32_HOST_OS
87 , ensureIOManagerIsRunning
89 #ifdef mingw32_HOST_OS
96 import System.Posix.Types
97 #ifndef mingw32_HOST_OS
98 import System.Posix.Internals
104 import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
111 import GHC.Num ( Num(..) )
112 import GHC.Real ( fromIntegral, div )
113 #ifndef mingw32_HOST_OS
114 import GHC.Base ( Int(..) )
116 #ifdef mingw32_HOST_OS
117 import GHC.Read ( Read )
118 import GHC.Enum ( Enum )
121 import GHC.Pack ( packCString# )
122 import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) )
124 import GHC.Show ( Show(..), showString )
127 infixr 0 `par`, `pseq`
130 %************************************************************************
132 \subsection{@ThreadId@, @par@, and @fork@}
134 %************************************************************************
137 data ThreadId = ThreadId ThreadId# deriving( Typeable )
138 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
139 -- But since ThreadId# is unlifted, the Weak type must use open
142 A 'ThreadId' is an abstract type representing a handle to a thread.
143 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
144 the 'Ord' instance implements an arbitrary total ordering over
145 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
146 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
147 useful when debugging or diagnosing the behaviour of a concurrent
150 /Note/: in GHC, if you have a 'ThreadId', you essentially have
151 a pointer to the thread itself. This means the thread itself can\'t be
152 garbage collected until you drop the 'ThreadId'.
153 This misfeature will hopefully be corrected at a later date.
155 /Note/: Hugs does not provide any operations on other threads;
156 it defines 'ThreadId' as a synonym for ().
159 instance Show ThreadId where
161 showString "ThreadId " .
162 showsPrec d (getThreadId (id2TSO t))
164 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
166 id2TSO :: ThreadId -> ThreadId#
167 id2TSO (ThreadId t) = t
169 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
172 cmpThread :: ThreadId -> ThreadId -> Ordering
174 case cmp_thread (id2TSO t1) (id2TSO t2) of
179 instance Eq ThreadId where
181 case t1 `cmpThread` t2 of
185 instance Ord ThreadId where
189 This sparks off a new thread to run the 'IO' computation passed as the
190 first argument, and returns the 'ThreadId' of the newly created
193 The new thread will be a lightweight thread; if you want to use a foreign
194 library that uses thread-local storage, use 'forkOS' instead.
196 forkIO :: IO () -> IO ThreadId
197 forkIO action = IO $ \ s ->
198 case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
200 action_plus = catchException action childHandler
202 forkOnIO :: Int -> IO () -> IO ThreadId
203 forkOnIO (I# cpu) action = IO $ \ s ->
204 case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
206 action_plus = catchException action childHandler
208 -- | the value passed to the @+RTS -N@ flag. This is the number of
209 -- Haskell threads that can run truly simultaneously at any given
210 -- time, and is typically set to the number of physical CPU cores on
212 numCapabilities :: Int
213 numCapabilities = unsafePerformIO $ do
214 n <- peek n_capabilities
215 return (fromIntegral n)
217 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
219 childHandler :: Exception -> IO ()
220 childHandler err = catchException (real_handler err) childHandler
222 real_handler :: Exception -> IO ()
225 -- ignore thread GC and killThread exceptions:
226 BlockedOnDeadMVar -> return ()
227 BlockedIndefinitely -> return ()
228 AsyncException ThreadKilled -> return ()
230 -- report all others:
231 AsyncException StackOverflow -> reportStackOverflow
232 other -> reportError other
234 {- | 'killThread' terminates the given thread (GHC only).
235 Any work already done by the thread isn\'t
236 lost: the computation is suspended until required by another thread.
237 The memory used by the thread will be garbage collected if it isn\'t
238 referenced from anywhere. The 'killThread' function is defined in
241 > killThread tid = throwTo tid (AsyncException ThreadKilled)
244 killThread :: ThreadId -> IO ()
245 killThread tid = throwTo tid (AsyncException ThreadKilled)
247 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
249 'throwTo' does not return until the exception has been raised in the
251 The calling thread can thus be certain that the target
252 thread has received the exception. This is a useful property to know
253 when dealing with race conditions: eg. if there are two threads that
254 can kill each other, it is guaranteed that only one of the threads
255 will get to kill the other.
257 If the target thread is currently making a foreign call, then the
258 exception will not be raised (and hence 'throwTo' will not return)
259 until the call has completed. This is the case regardless of whether
260 the call is inside a 'block' or not.
262 Important note: the behaviour of 'throwTo' differs from that described in
263 the paper \"Asynchronous exceptions in Haskell\"
264 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
265 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
266 a more synchronous design in which 'throwTo' does not return until the exception
267 is received by the target thread. The trade-off is discussed in Section 8 of the paper.
268 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
271 There is currently no guarantee that the exception delivered by 'throwTo' will be
272 delivered at the first possible opportunity. In particular, if a thread may
273 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
274 a pending 'throwTo'. This is arguably undesirable behaviour.
277 throwTo :: ThreadId -> Exception -> IO ()
278 throwTo (ThreadId id) ex = IO $ \ s ->
279 case (killThread# id ex s) of s1 -> (# s1, () #)
281 -- | Returns the 'ThreadId' of the calling thread (GHC only).
282 myThreadId :: IO ThreadId
283 myThreadId = IO $ \s ->
284 case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #)
287 -- |The 'yield' action allows (forces, in a co-operative multitasking
288 -- implementation) a context-switch to any other currently runnable
289 -- threads (if any), and is occasionally useful when implementing
290 -- concurrency abstractions.
293 case (yield# s) of s1 -> (# s1, () #)
295 {- | 'labelThread' stores a string as identifier for this thread if
296 you built a RTS with debugging support. This identifier will be used in
297 the debugging output to make distinction of different threads easier
298 (otherwise you only have the thread state object\'s address in the heap).
300 Other applications like the graphical Concurrent Haskell Debugger
301 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
302 'labelThread' for their purposes as well.
305 labelThread :: ThreadId -> String -> IO ()
306 labelThread (ThreadId t) str = IO $ \ s ->
307 let ps = packCString# str
308 adr = byteArrayContents# ps in
309 case (labelThread# t adr s) of s1 -> (# s1, () #)
311 -- Nota Bene: 'pseq' used to be 'seq'
312 -- but 'seq' is now defined in PrelGHC
314 -- "pseq" is defined a bit weirdly (see below)
316 -- The reason for the strange "lazy" call is that
317 -- it fools the compiler into thinking that pseq and par are non-strict in
318 -- their second argument (even if it inlines pseq at the call site).
319 -- If it thinks pseq is strict in "y", then it often evaluates
320 -- "y" before "x", which is totally wrong.
324 pseq x y = x `seq` lazy y
328 par x y = case (par# x) of { _ -> lazy y }
332 %************************************************************************
334 \subsection[stm]{Transactional heap operations}
336 %************************************************************************
338 TVars are shared memory locations which support atomic memory
342 -- |A monad supporting atomic memory transactions.
343 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
345 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
348 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
350 instance Functor STM where
351 fmap f x = x >>= (return . f)
353 instance Monad STM where
354 {-# INLINE return #-}
358 return x = returnSTM x
359 m >>= k = bindSTM m k
361 bindSTM :: STM a -> (a -> STM b) -> STM b
362 bindSTM (STM m) k = STM ( \s ->
364 (# new_s, a #) -> unSTM (k a) new_s
367 thenSTM :: STM a -> STM b -> STM b
368 thenSTM (STM m) k = STM ( \s ->
370 (# new_s, a #) -> unSTM k new_s
373 returnSTM :: a -> STM a
374 returnSTM x = STM (\s -> (# s, x #))
376 -- | Unsafely performs IO in the STM monad.
377 unsafeIOToSTM :: IO a -> STM a
378 unsafeIOToSTM (IO m) = STM m
380 -- |Perform a series of STM actions atomically.
382 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
383 -- Any attempt to do so will result in a runtime error. (Reason: allowing
384 -- this would effectively allow a transaction inside a transaction, depending
385 -- on exactly when the thunk is evaluated.)
387 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
388 -- and which allows top-level TVars to be allocated.
390 atomically :: STM a -> IO a
391 atomically (STM m) = IO (\s -> (atomically# m) s )
393 -- |Retry execution of the current memory transaction because it has seen
394 -- values in TVars which mean that it should not continue (e.g. the TVars
395 -- represent a shared buffer that is now empty). The implementation may
396 -- block the thread until one of the TVars that it has read from has been
397 -- udpated. (GHC only)
399 retry = STM $ \s# -> retry# s#
401 -- |Compose two alternative STM actions (GHC only). If the first action
402 -- completes without retrying then it forms the result of the orElse.
403 -- Otherwise, if the first action retries, then the second action is
404 -- tried in its place. If both actions retry then the orElse as a
406 orElse :: STM a -> STM a -> STM a
407 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
409 -- |Exception handling within STM actions.
410 catchSTM :: STM a -> (Exception -> STM a) -> STM a
411 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
413 -- | Low-level primitive on which always and alwaysSucceeds are built.
414 -- checkInv differs form these in that (i) the invariant is not
415 -- checked when checkInv is called, only at the end of this and
416 -- subsequent transcations, (ii) the invariant failure is indicated
417 -- by raising an exception.
418 checkInv :: STM a -> STM ()
419 checkInv (STM m) = STM (\s -> (check# m) s)
421 -- | alwaysSucceeds adds a new invariant that must be true when passed
422 -- to alwaysSucceeds, at the end of the current transaction, and at
423 -- the end of every subsequent transaction. If it fails at any
424 -- of those points then the transaction violating it is aborted
425 -- and the exception raised by the invariant is propagated.
426 alwaysSucceeds :: STM a -> STM ()
427 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () )
430 -- | always is a variant of alwaysSucceeds in which the invariant is
431 -- expressed as an STM Bool action that must return True. Returning
432 -- False or raising an exception are both treated as invariant failures.
433 always :: STM Bool -> STM ()
434 always i = alwaysSucceeds ( do v <- i
435 if (v) then return () else ( error "Transacional invariant violation" ) )
437 -- |Shared memory locations that support atomic memory transactions.
438 data TVar a = TVar (TVar# RealWorld a)
440 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
442 instance Eq (TVar a) where
443 (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
445 -- |Create a new TVar holding a value supplied
446 newTVar :: a -> STM (TVar a)
447 newTVar val = STM $ \s1# ->
448 case newTVar# val s1# of
449 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
451 -- |@IO@ version of 'newTVar'. This is useful for creating top-level
452 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
453 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
455 newTVarIO :: a -> IO (TVar a)
456 newTVarIO val = IO $ \s1# ->
457 case newTVar# val s1# of
458 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
460 -- |Return the current value stored in a TVar
461 readTVar :: TVar a -> STM a
462 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
464 -- |Write the supplied value into a TVar
465 writeTVar :: TVar a -> a -> STM ()
466 writeTVar (TVar tvar#) val = STM $ \s1# ->
467 case writeTVar# tvar# val s1# of
472 %************************************************************************
474 \subsection[mvars]{M-Structures}
476 %************************************************************************
478 M-Vars are rendezvous points for concurrent threads. They begin
479 empty, and any attempt to read an empty M-Var blocks. When an M-Var
480 is written, a single blocked thread may be freed. Reading an M-Var
481 toggles its state from full back to empty. Therefore, any value
482 written to an M-Var may only be read once. Multiple reads and writes
483 are allowed, but there must be at least one read between any two
487 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
489 -- |Create an 'MVar' which is initially empty.
490 newEmptyMVar :: IO (MVar a)
491 newEmptyMVar = IO $ \ s# ->
493 (# s2#, svar# #) -> (# s2#, MVar svar# #)
495 -- |Create an 'MVar' which contains the supplied value.
496 newMVar :: a -> IO (MVar a)
498 newEmptyMVar >>= \ mvar ->
499 putMVar mvar value >>
502 -- |Return the contents of the 'MVar'. If the 'MVar' is currently
503 -- empty, 'takeMVar' will wait until it is full. After a 'takeMVar',
504 -- the 'MVar' is left empty.
506 -- There are two further important properties of 'takeMVar':
508 -- * 'takeMVar' is single-wakeup. That is, if there are multiple
509 -- threads blocked in 'takeMVar', and the 'MVar' becomes full,
510 -- only one thread will be woken up. The runtime guarantees that
511 -- the woken thread completes its 'takeMVar' operation.
513 -- * When multiple threads are blocked on an 'MVar', they are
514 -- woken up in FIFO order. This is useful for providing
515 -- fairness properties of abstractions built using 'MVar's.
517 takeMVar :: MVar a -> IO a
518 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
520 -- |Put a value into an 'MVar'. If the 'MVar' is currently full,
521 -- 'putMVar' will wait until it becomes empty.
523 -- There are two further important properties of 'putMVar':
525 -- * 'putMVar' is single-wakeup. That is, if there are multiple
526 -- threads blocked in 'putMVar', and the 'MVar' becomes empty,
527 -- only one thread will be woken up. The runtime guarantees that
528 -- the woken thread completes its 'putMVar' operation.
530 -- * When multiple threads are blocked on an 'MVar', they are
531 -- woken up in FIFO order. This is useful for providing
532 -- fairness properties of abstractions built using 'MVar's.
534 putMVar :: MVar a -> a -> IO ()
535 putMVar (MVar mvar#) x = IO $ \ s# ->
536 case putMVar# mvar# x s# of
539 -- |A non-blocking version of 'takeMVar'. The 'tryTakeMVar' function
540 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
541 -- @'Just' a@ if the 'MVar' was full with contents @a@. After 'tryTakeMVar',
542 -- the 'MVar' is left empty.
543 tryTakeMVar :: MVar a -> IO (Maybe a)
544 tryTakeMVar (MVar m) = IO $ \ s ->
545 case tryTakeMVar# m s of
546 (# s, 0#, _ #) -> (# s, Nothing #) -- MVar is empty
547 (# s, _, a #) -> (# s, Just a #) -- MVar is full
549 -- |A non-blocking version of 'putMVar'. The 'tryPutMVar' function
550 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
551 -- it was successful, or 'False' otherwise.
552 tryPutMVar :: MVar a -> a -> IO Bool
553 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
554 case tryPutMVar# mvar# x s# of
555 (# s, 0# #) -> (# s, False #)
556 (# s, _ #) -> (# s, True #)
558 -- |Check whether a given 'MVar' is empty.
560 -- Notice that the boolean value returned is just a snapshot of
561 -- the state of the MVar. By the time you get to react on its result,
562 -- the MVar may have been filled (or emptied) - so be extremely
563 -- careful when using this operation. Use 'tryTakeMVar' instead if possible.
564 isEmptyMVar :: MVar a -> IO Bool
565 isEmptyMVar (MVar mv#) = IO $ \ s# ->
566 case isEmptyMVar# mv# s# of
567 (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
569 -- |Add a finalizer to an 'MVar' (GHC only). See "Foreign.ForeignPtr" and
570 -- "System.Mem.Weak" for more about finalizers.
571 addMVarFinalizer :: MVar a -> IO () -> IO ()
572 addMVarFinalizer (MVar m) finalizer =
573 IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
575 withMVar :: MVar a -> (a -> IO b) -> IO b
579 b <- catchException (unblock (io a))
580 (\e -> do putMVar m a; throw e)
586 %************************************************************************
588 \subsection{Thread waiting}
590 %************************************************************************
593 #ifdef mingw32_HOST_OS
595 -- Note: threadWaitRead and threadWaitWrite aren't really functional
596 -- on Win32, but left in there because lib code (still) uses them (the manner
597 -- in which they're used doesn't cause problems on a Win32 platform though.)
599 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
600 asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) =
601 IO $ \s -> case asyncRead# fd isSock len buf s of
602 (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
604 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
605 asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) =
606 IO $ \s -> case asyncWrite# fd isSock len buf s of
607 (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
609 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
610 asyncDoProc (FunPtr proc) (Ptr param) =
611 -- the 'length' value is ignored; simplifies implementation of
612 -- the async*# primops to have them all return the same result.
613 IO $ \s -> case asyncDoProc# proc param s of
614 (# s, len#, err# #) -> (# s, I# err# #)
616 -- to aid the use of these primops by the IO Handle implementation,
617 -- provide the following convenience funs:
619 -- this better be a pinned byte array!
620 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
621 asyncReadBA fd isSock len off bufB =
622 asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
624 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
625 asyncWriteBA fd isSock len off bufB =
626 asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
630 -- -----------------------------------------------------------------------------
633 -- | Block the current thread until data is available to read on the
634 -- given file descriptor (GHC only).
635 threadWaitRead :: Fd -> IO ()
637 #ifndef mingw32_HOST_OS
638 | threaded = waitForReadEvent fd
640 | otherwise = IO $ \s ->
641 case fromIntegral fd of { I# fd# ->
642 case waitRead# fd# s of { s -> (# s, () #)
645 -- | Block the current thread until data can be written to the
646 -- given file descriptor (GHC only).
647 threadWaitWrite :: Fd -> IO ()
649 #ifndef mingw32_HOST_OS
650 | threaded = waitForWriteEvent fd
652 | otherwise = IO $ \s ->
653 case fromIntegral fd of { I# fd# ->
654 case waitWrite# fd# s of { s -> (# s, () #)
657 -- | Suspends the current thread for a given number of microseconds
660 -- There is no guarantee that the thread will be rescheduled promptly
661 -- when the delay has expired, but the thread will never continue to
662 -- run /earlier/ than specified.
664 threadDelay :: Int -> IO ()
666 | threaded = waitForDelayEvent time
667 | otherwise = IO $ \s ->
668 case fromIntegral time of { I# time# ->
669 case delay# time# s of { s -> (# s, () #)
673 -- | Set the value of returned TVar to True after a given number of
674 -- microseconds. The caveats associated with threadDelay also apply.
676 registerDelay :: Int -> IO (TVar Bool)
678 | threaded = waitForDelayEventSTM usecs
679 | otherwise = error "registerDelay: requires -threaded"
681 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
683 waitForDelayEvent :: Int -> IO ()
684 waitForDelayEvent usecs = do
686 target <- calculateTarget usecs
687 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
691 -- Delays for use in STM
692 waitForDelayEventSTM :: Int -> IO (TVar Bool)
693 waitForDelayEventSTM usecs = do
694 t <- atomically $ newTVar False
695 target <- calculateTarget usecs
696 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
700 calculateTarget :: Int -> IO USecs
701 calculateTarget usecs = do
703 return $ now + (fromIntegral usecs)
706 -- ----------------------------------------------------------------------------
707 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
709 -- In the threaded RTS, we employ a single IO Manager thread to wait
710 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
711 -- and delays (threadDelay).
713 -- We can do this because in the threaded RTS the IO Manager can make
714 -- a non-blocking call to select(), so we don't have to do select() in
715 -- the scheduler as we have to in the non-threaded RTS. We get performance
716 -- benefits from doing it this way, because we only have to restart the select()
717 -- when a new request arrives, rather than doing one select() each time
718 -- around the scheduler loop. Furthermore, the scheduler can be simplified
719 -- by not having to check for completed IO requests.
721 -- Issues, possible problems:
723 -- - we might want bound threads to just do the blocking
724 -- operation rather than communicating with the IO manager
725 -- thread. This would prevent simgle-threaded programs which do
726 -- IO from requiring multiple OS threads. However, it would also
727 -- prevent bound threads waiting on IO from being killed or sent
730 -- - Apprently exec() doesn't work on Linux in a multithreaded program.
731 -- I couldn't repeat this.
733 -- - How do we handle signal delivery in the multithreaded RTS?
735 -- - forkProcess will kill the IO manager thread. Let's just
736 -- hope we don't need to do any blocking IO between fork & exec.
738 #ifndef mingw32_HOST_OS
740 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
741 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
745 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
746 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
748 #ifndef mingw32_HOST_OS
749 pendingEvents :: IORef [IOReq]
751 pendingDelays :: IORef [DelayReq]
752 -- could use a strict list or array here
753 {-# NOINLINE pendingEvents #-}
754 {-# NOINLINE pendingDelays #-}
755 (pendingEvents,pendingDelays) = unsafePerformIO $ do
760 -- the first time we schedule an IO request, the service thread
761 -- will be created (cool, huh?)
763 ensureIOManagerIsRunning :: IO ()
764 ensureIOManagerIsRunning
765 | threaded = seq pendingEvents $ return ()
766 | otherwise = return ()
768 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
769 insertDelay d [] = [d]
770 insertDelay d1 ds@(d2 : rest)
771 | delayTime d1 <= delayTime d2 = d1 : ds
772 | otherwise = d2 : insertDelay d1 rest
774 delayTime :: DelayReq -> USecs
775 delayTime (Delay t _) = t
776 delayTime (DelaySTM t _) = t
780 -- XXX: move into GHC.IOBase from Data.IORef?
781 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
782 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
784 foreign import ccall unsafe "getUSecOfDay"
785 getUSecOfDay :: IO USecs
787 prodding :: IORef Bool
788 {-# NOINLINE prodding #-}
789 prodding = unsafePerformIO (newIORef False)
791 prodServiceThread :: IO ()
792 prodServiceThread = do
793 was_set <- atomicModifyIORef prodding (\a -> (True,a))
794 if (not (was_set)) then wakeupIOManager else return ()
796 #ifdef mingw32_HOST_OS
797 -- ----------------------------------------------------------------------------
798 -- Windows IO manager thread
800 startIOManagerThread :: IO ()
801 startIOManagerThread = do
802 wakeup <- c_getIOManagerEvent
803 forkIO $ service_loop wakeup []
806 service_loop :: HANDLE -- read end of pipe
807 -> [DelayReq] -- current delay requests
810 service_loop wakeup old_delays = do
811 -- pick up new delay requests
812 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
813 let delays = foldr insertDelay old_delays new_delays
816 (delays', timeout) <- getDelay now delays
818 r <- c_WaitForSingleObject wakeup timeout
820 0xffffffff -> do c_maperrno; throwErrno "service_loop"
822 r <- c_readIOManagerEvent
825 _ | r == io_MANAGER_WAKEUP -> return False
826 _ | r == io_MANAGER_DIE -> return True
827 0 -> return False -- spurious wakeup
828 r -> do start_console_handler (r `shiftR` 1); return False
831 else service_cont wakeup delays'
833 _other -> service_cont wakeup delays' -- probably timeout
835 service_cont wakeup delays = do
836 atomicModifyIORef prodding (\_ -> (False,False))
837 service_loop wakeup delays
839 -- must agree with rts/win32/ThrIOManager.c
840 io_MANAGER_WAKEUP = 0xffffffff :: Word32
841 io_MANAGER_DIE = 0xfffffffe :: Word32
847 -- these are sent to Services only.
850 deriving (Eq, Ord, Enum, Show, Read, Typeable)
852 start_console_handler :: Word32 -> IO ()
853 start_console_handler r =
854 case toWin32ConsoleEvent r of
855 Just x -> withMVar win32ConsoleHandler $ \handler -> do
860 toWin32ConsoleEvent ev =
862 0 {- CTRL_C_EVENT-} -> Just ControlC
863 1 {- CTRL_BREAK_EVENT-} -> Just Break
864 2 {- CTRL_CLOSE_EVENT-} -> Just Close
865 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
866 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
869 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
870 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
872 stick :: IORef HANDLE
873 {-# NOINLINE stick #-}
874 stick = unsafePerformIO (newIORef nullPtr)
877 hdl <- readIORef stick
878 c_sendIOManagerEvent io_MANAGER_WAKEUP
880 -- Walk the queue of pending delays, waking up any that have passed
881 -- and return the smallest delay to wait for. The queue of pending
882 -- delays is kept ordered.
883 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
884 getDelay now [] = return ([], iNFINITE)
885 getDelay now all@(d : rest)
887 Delay time m | now >= time -> do
890 DelaySTM time t | now >= time -> do
891 atomically $ writeTVar t True
894 -- delay is in millisecs for WaitForSingleObject
895 let micro_seconds = delayTime d - now
896 milli_seconds = (micro_seconds + 999) `div` 1000
897 in return (all, fromIntegral milli_seconds)
899 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
900 -- available yet. We should move some Win32 functionality down here,
901 -- maybe as part of the grand reorganisation of the base package...
905 iNFINITE = 0xFFFFFFFF :: DWORD -- urgh
907 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
908 c_getIOManagerEvent :: IO HANDLE
910 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
911 c_readIOManagerEvent :: IO Word32
913 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
914 c_sendIOManagerEvent :: Word32 -> IO ()
916 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
919 foreign import stdcall "WaitForSingleObject"
920 c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
923 -- ----------------------------------------------------------------------------
924 -- Unix IO manager thread, using select()
926 startIOManagerThread :: IO ()
927 startIOManagerThread = do
928 allocaArray 2 $ \fds -> do
929 throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
930 rd_end <- peekElemOff fds 0
931 wr_end <- peekElemOff fds 1
932 writeIORef stick (fromIntegral wr_end)
933 c_setIOManagerPipe wr_end
935 allocaBytes sizeofFdSet $ \readfds -> do
936 allocaBytes sizeofFdSet $ \writefds -> do
937 allocaBytes sizeofTimeVal $ \timeval -> do
938 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
942 :: Fd -- listen to this for wakeup calls
949 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
951 -- pick up new IO requests
952 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
953 let reqs = new_reqs ++ old_reqs
955 -- pick up new delay requests
956 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
957 let delays = foldr insertDelay old_delays new_delays
959 -- build the FDSets for select()
963 maxfd <- buildFdSets 0 readfds writefds reqs
965 -- perform the select()
966 let do_select delays = do
967 -- check the current time and wake up any thread in
968 -- threadDelay whose timeout has expired. Also find the
969 -- timeout value for the select() call.
971 (delays', timeout) <- getDelay now ptimeval delays
973 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
979 _ | err == eINTR -> do_select delays'
980 -- EINTR: just redo the select()
981 _ | err == eBADF -> return (True, delays)
982 -- EBADF: one of the file descriptors is closed or bad,
983 -- we don't know which one, so wake everyone up.
984 _ | otherwise -> throwErrno "select"
985 -- otherwise (ENOMEM or EINVAL) something has gone
986 -- wrong; report the error.
988 return (False,delays')
990 (wakeup_all,delays') <- do_select delays
993 if wakeup_all then return False
995 b <- fdIsSet wakeup readfds
998 else alloca $ \p -> do
999 c_read (fromIntegral wakeup) p 1; return ()
1002 _ | s == io_MANAGER_WAKEUP -> return False
1003 _ | s == io_MANAGER_DIE -> return True
1004 _ -> withMVar signalHandlerLock $ \_ -> do
1005 handler_tbl <- peek handlers
1006 sp <- peekElemOff handler_tbl (fromIntegral s)
1007 io <- deRefStablePtr sp
1011 if exit then return () else do
1013 atomicModifyIORef prodding (\_ -> (False,False))
1015 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1016 else completeRequests reqs readfds writefds []
1018 service_loop wakeup readfds writefds ptimeval reqs' delays'
1020 io_MANAGER_WAKEUP = 0xff :: CChar
1021 io_MANAGER_DIE = 0xfe :: CChar
1024 {-# NOINLINE stick #-}
1025 stick = unsafePerformIO (newIORef 0)
1027 wakeupIOManager :: IO ()
1028 wakeupIOManager = do
1029 fd <- readIORef stick
1030 with io_MANAGER_WAKEUP $ \pbuf -> do
1031 c_write (fromIntegral fd) pbuf 1; return ()
1033 -- Lock used to protect concurrent access to signal_handlers. Symptom of
1034 -- this race condition is #1922, although that bug was on Windows a similar
1035 -- bug also exists on Unix.
1036 signalHandlerLock :: MVar ()
1037 signalHandlerLock = unsafePerformIO (newMVar ())
1039 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
1041 foreign import ccall "setIOManagerPipe"
1042 c_setIOManagerPipe :: CInt -> IO ()
1044 -- -----------------------------------------------------------------------------
1047 buildFdSets maxfd readfds writefds [] = return maxfd
1048 buildFdSets maxfd readfds writefds (Read fd m : reqs)
1049 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1052 buildFdSets (max maxfd fd) readfds writefds reqs
1053 buildFdSets maxfd readfds writefds (Write fd m : reqs)
1054 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1057 buildFdSets (max maxfd fd) readfds writefds reqs
1059 completeRequests [] _ _ reqs' = return reqs'
1060 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1061 b <- fdIsSet fd readfds
1063 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1064 else completeRequests reqs readfds writefds (Read fd m : reqs')
1065 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1066 b <- fdIsSet fd writefds
1068 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1069 else completeRequests reqs readfds writefds (Write fd m : reqs')
1071 wakeupAll [] = return ()
1072 wakeupAll (Read fd m : reqs) = do putMVar m (); wakeupAll reqs
1073 wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs
1075 waitForReadEvent :: Fd -> IO ()
1076 waitForReadEvent fd = do
1078 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1082 waitForWriteEvent :: Fd -> IO ()
1083 waitForWriteEvent fd = do
1085 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1089 -- -----------------------------------------------------------------------------
1092 -- Walk the queue of pending delays, waking up any that have passed
1093 -- and return the smallest delay to wait for. The queue of pending
1094 -- delays is kept ordered.
1095 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1096 getDelay now ptimeval [] = return ([],nullPtr)
1097 getDelay now ptimeval all@(d : rest)
1099 Delay time m | now >= time -> do
1101 getDelay now ptimeval rest
1102 DelaySTM time t | now >= time -> do
1103 atomically $ writeTVar t True
1104 getDelay now ptimeval rest
1106 setTimevalTicks ptimeval (delayTime d - now)
1107 return (all,ptimeval)
1109 newtype CTimeVal = CTimeVal ()
1111 foreign import ccall unsafe "sizeofTimeVal"
1112 sizeofTimeVal :: Int
1114 foreign import ccall unsafe "setTimevalTicks"
1115 setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1118 On Win32 we're going to have a single Pipe, and a
1119 waitForSingleObject with the delay time. For signals, we send a
1120 byte down the pipe just like on Unix.
1123 -- ----------------------------------------------------------------------------
1124 -- select() interface
1126 -- ToDo: move to System.Posix.Internals?
1128 newtype CFdSet = CFdSet ()
1130 foreign import ccall safe "select"
1131 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1134 foreign import ccall unsafe "hsFD_SETSIZE"
1135 c_fD_SETSIZE :: CInt
1138 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1140 foreign import ccall unsafe "hsFD_CLR"
1141 c_fdClr :: CInt -> Ptr CFdSet -> IO ()
1143 fdClr :: Fd -> Ptr CFdSet -> IO ()
1144 fdClr (Fd fd) fdset = c_fdClr fd fdset
1146 foreign import ccall unsafe "hsFD_ISSET"
1147 c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1149 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1150 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1152 foreign import ccall unsafe "hsFD_SET"
1153 c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1155 fdSet :: Fd -> Ptr CFdSet -> IO ()
1156 fdSet (Fd fd) fdset = c_fdSet fd fdset
1158 foreign import ccall unsafe "hsFD_ZERO"
1159 fdZero :: Ptr CFdSet -> IO ()
1161 foreign import ccall unsafe "sizeof_fd_set"