2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
8 -- Copyright : (c) The University of Glasgow, 1994-2002
9 -- License : see libraries/base/LICENSE
11 -- Maintainer : cvs-ghc@haskell.org
12 -- Stability : internal
13 -- Portability : non-portable (GHC extensions)
15 -- Basic concurrency stuff.
17 -----------------------------------------------------------------------------
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home. Hence:
30 -- * Forking and suchlike
31 , forkIO -- :: IO a -> IO ThreadId
32 , forkOnIO -- :: Int -> IO a -> IO ThreadId
33 , numCapabilities -- :: Int
34 , childHandler -- :: Exception -> IO ()
35 , myThreadId -- :: IO ThreadId
36 , killThread -- :: ThreadId -> IO ()
37 , throwTo -- :: ThreadId -> Exception -> IO ()
38 , par -- :: a -> b -> b
39 , pseq -- :: a -> b -> b
42 , labelThread -- :: ThreadId -> String -> IO ()
44 , ThreadStatus(..), BlockReason(..)
45 , threadStatus -- :: ThreadId -> IO ThreadStatus
48 , threadDelay -- :: Int -> IO ()
49 , registerDelay -- :: Int -> IO (TVar Bool)
50 , threadWaitRead -- :: Int -> IO ()
51 , threadWaitWrite -- :: Int -> IO ()
55 , atomically -- :: STM a -> IO a
57 , orElse -- :: STM a -> STM a -> STM a
58 , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a
59 , alwaysSucceeds -- :: STM a -> STM ()
60 , always -- :: STM Bool -> STM ()
62 , newTVar -- :: a -> STM (TVar a)
63 , newTVarIO -- :: a -> STM (TVar a)
64 , readTVar -- :: TVar a -> STM a
65 , readTVarIO -- :: TVar a -> IO a
66 , writeTVar -- :: a -> TVar a -> STM ()
67 , unsafeIOToSTM -- :: IO a -> STM a
71 #ifdef mingw32_HOST_OS
72 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
73 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
74 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
76 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
77 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
80 #ifndef mingw32_HOST_OS
81 , Signal, HandlerFun, setHandler, runHandlers
84 , ensureIOManagerIsRunning
85 #ifndef mingw32_HOST_OS
89 #ifdef mingw32_HOST_OS
94 , setUncaughtExceptionHandler -- :: (Exception -> IO ()) -> IO ()
95 , getUncaughtExceptionHandler -- :: IO (Exception -> IO ())
97 , reportError, reportStackOverflow
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
107 #ifdef mingw32_HOST_OS
111 #ifndef mingw32_HOST_OS
119 import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
120 import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
122 import GHC.IO.Exception
126 import GHC.Num ( Num(..) )
127 import GHC.Real ( fromIntegral )
128 #ifndef mingw32_HOST_OS
130 import GHC.Arr ( inRange )
132 #ifdef mingw32_HOST_OS
133 import GHC.Real ( div )
136 #ifdef mingw32_HOST_OS
137 import GHC.Read ( Read )
138 import GHC.Enum ( Enum )
140 import GHC.Pack ( packCString# )
141 import GHC.Show ( Show(..), showString )
144 infixr 0 `par`, `pseq`
147 %************************************************************************
149 \subsection{@ThreadId@, @par@, and @fork@}
151 %************************************************************************
154 data ThreadId = ThreadId ThreadId# deriving( Typeable )
155 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
156 -- But since ThreadId# is unlifted, the Weak type must use open
159 A 'ThreadId' is an abstract type representing a handle to a thread.
160 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
161 the 'Ord' instance implements an arbitrary total ordering over
162 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
163 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
164 useful when debugging or diagnosing the behaviour of a concurrent
167 /Note/: in GHC, if you have a 'ThreadId', you essentially have
168 a pointer to the thread itself. This means the thread itself can\'t be
169 garbage collected until you drop the 'ThreadId'.
170 This misfeature will hopefully be corrected at a later date.
172 /Note/: Hugs does not provide any operations on other threads;
173 it defines 'ThreadId' as a synonym for ().
176 instance Show ThreadId where
178 showString "ThreadId " .
179 showsPrec d (getThreadId (id2TSO t))
181 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
183 id2TSO :: ThreadId -> ThreadId#
184 id2TSO (ThreadId t) = t
186 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
189 cmpThread :: ThreadId -> ThreadId -> Ordering
191 case cmp_thread (id2TSO t1) (id2TSO t2) of
196 instance Eq ThreadId where
198 case t1 `cmpThread` t2 of
202 instance Ord ThreadId where
206 Sparks off a new thread to run the 'IO' computation passed as the
207 first argument, and returns the 'ThreadId' of the newly created
210 The new thread will be a lightweight thread; if you want to use a foreign
211 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
213 GHC note: the new thread inherits the /blocked/ state of the parent
214 (see 'Control.Exception.block').
216 The newly created thread has an exception handler that discards the
217 exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
218 'ThreadKilled', and passes all other exceptions to the uncaught
219 exception handler (see 'setUncaughtExceptionHandler').
221 forkIO :: IO () -> IO ThreadId
222 forkIO action = IO $ \ s ->
223 case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
225 action_plus = catchException action childHandler
228 Like 'forkIO', but lets you specify on which CPU the thread is
229 created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
230 will stay on the same CPU for its entire lifetime (`forkIO` threads
231 can migrate between CPUs according to the scheduling policy).
232 `forkOnIO` is useful for overriding the scheduling policy when you
233 know in advance how best to distribute the threads.
235 The `Int` argument specifies the CPU number; it is interpreted modulo
236 'numCapabilities' (note that it actually specifies a capability number
237 rather than a CPU number, but to a first approximation the two are
240 forkOnIO :: Int -> IO () -> IO ThreadId
241 forkOnIO (I# cpu) action = IO $ \ s ->
242 case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
244 action_plus = catchException action childHandler
246 -- | the value passed to the @+RTS -N@ flag. This is the number of
247 -- Haskell threads that can run truly simultaneously at any given
248 -- time, and is typically set to the number of physical CPU cores on
250 numCapabilities :: Int
251 numCapabilities = unsafePerformIO $ do
252 n <- peek n_capabilities
253 return (fromIntegral n)
255 #if defined(mingw32_HOST_OS) && defined(__PIC__)
256 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
258 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
260 childHandler :: SomeException -> IO ()
261 childHandler err = catchException (real_handler err) childHandler
263 real_handler :: SomeException -> IO ()
264 real_handler se@(SomeException ex) =
265 -- ignore thread GC and killThread exceptions:
267 Just BlockedOnDeadMVar -> return ()
269 Just BlockedIndefinitely -> return ()
271 Just ThreadKilled -> return ()
273 -- report all others:
274 Just StackOverflow -> reportStackOverflow
277 {- | 'killThread' terminates the given thread (GHC only).
278 Any work already done by the thread isn\'t
279 lost: the computation is suspended until required by another thread.
280 The memory used by the thread will be garbage collected if it isn\'t
281 referenced from anywhere. The 'killThread' function is defined in
284 > killThread tid = throwTo tid ThreadKilled
286 Killthread is a no-op if the target thread has already completed.
288 killThread :: ThreadId -> IO ()
289 killThread tid = throwTo tid ThreadKilled
291 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
293 'throwTo' does not return until the exception has been raised in the
295 The calling thread can thus be certain that the target
296 thread has received the exception. This is a useful property to know
297 when dealing with race conditions: eg. if there are two threads that
298 can kill each other, it is guaranteed that only one of the threads
299 will get to kill the other.
301 If the target thread is currently making a foreign call, then the
302 exception will not be raised (and hence 'throwTo' will not return)
303 until the call has completed. This is the case regardless of whether
304 the call is inside a 'block' or not.
306 Important note: the behaviour of 'throwTo' differs from that described in
307 the paper \"Asynchronous exceptions in Haskell\"
308 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
309 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
310 a more synchronous design in which 'throwTo' does not return until the exception
311 is received by the target thread. The trade-off is discussed in Section 9 of the paper.
312 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
315 There is currently no guarantee that the exception delivered by 'throwTo' will be
316 delivered at the first possible opportunity. In particular, a thread may
317 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
318 a pending 'throwTo'. This is arguably undesirable behaviour.
321 throwTo :: Exception e => ThreadId -> e -> IO ()
322 throwTo (ThreadId tid) ex = IO $ \ s ->
323 case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
325 -- | Returns the 'ThreadId' of the calling thread (GHC only).
326 myThreadId :: IO ThreadId
327 myThreadId = IO $ \s ->
328 case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
331 -- |The 'yield' action allows (forces, in a co-operative multitasking
332 -- implementation) a context-switch to any other currently runnable
333 -- threads (if any), and is occasionally useful when implementing
334 -- concurrency abstractions.
337 case (yield# s) of s1 -> (# s1, () #)
339 {- | 'labelThread' stores a string as identifier for this thread if
340 you built a RTS with debugging support. This identifier will be used in
341 the debugging output to make distinction of different threads easier
342 (otherwise you only have the thread state object\'s address in the heap).
344 Other applications like the graphical Concurrent Haskell Debugger
345 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
346 'labelThread' for their purposes as well.
349 labelThread :: ThreadId -> String -> IO ()
350 labelThread (ThreadId t) str = IO $ \ s ->
351 let !ps = packCString# str
352 !adr = byteArrayContents# ps in
353 case (labelThread# t adr s) of s1 -> (# s1, () #)
355 -- Nota Bene: 'pseq' used to be 'seq'
356 -- but 'seq' is now defined in PrelGHC
358 -- "pseq" is defined a bit weirdly (see below)
360 -- The reason for the strange "lazy" call is that
361 -- it fools the compiler into thinking that pseq and par are non-strict in
362 -- their second argument (even if it inlines pseq at the call site).
363 -- If it thinks pseq is strict in "y", then it often evaluates
364 -- "y" before "x", which is totally wrong.
368 pseq x y = x `seq` lazy y
372 par x y = case (par# x) of { _ -> lazy y }
374 -- | Internal function used by the RTS to run sparks.
377 where loop s = case getSpark# s of
379 if n ==# 0# then (# s', () #)
384 -- ^blocked on on 'MVar'
386 -- ^blocked on a computation in progress by another thread
388 -- ^blocked in 'throwTo'
390 -- ^blocked in 'retry' in an STM transaction
391 | BlockedOnForeignCall
392 -- ^currently in a foreign call
394 -- ^blocked on some other resource. Without @-threaded@,
395 -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
396 -- they show up as 'BlockedOnMVar'.
397 deriving (Eq,Ord,Show)
399 -- | The current status of a thread
402 -- ^the thread is currently runnable or running
404 -- ^the thread has finished
405 | ThreadBlocked BlockReason
406 -- ^the thread is blocked on some resource
408 -- ^the thread received an uncaught exception
409 deriving (Eq,Ord,Show)
411 threadStatus :: ThreadId -> IO ThreadStatus
412 threadStatus (ThreadId t) = IO $ \s ->
413 case threadStatus# t s of
414 (# s', stat #) -> (# s', mk_stat (I# stat) #)
416 -- NB. keep these in sync with includes/Constants.h
417 mk_stat 0 = ThreadRunning
418 mk_stat 1 = ThreadBlocked BlockedOnMVar
419 mk_stat 2 = ThreadBlocked BlockedOnBlackHole
420 mk_stat 3 = ThreadBlocked BlockedOnException
421 mk_stat 7 = ThreadBlocked BlockedOnSTM
422 mk_stat 11 = ThreadBlocked BlockedOnForeignCall
423 mk_stat 12 = ThreadBlocked BlockedOnForeignCall
424 mk_stat 16 = ThreadFinished
425 mk_stat 17 = ThreadDied
426 mk_stat _ = ThreadBlocked BlockedOnOther
430 %************************************************************************
432 \subsection[stm]{Transactional heap operations}
434 %************************************************************************
436 TVars are shared memory locations which support atomic memory
440 -- |A monad supporting atomic memory transactions.
441 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
443 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
446 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
448 instance Functor STM where
449 fmap f x = x >>= (return . f)
451 instance Monad STM where
452 {-# INLINE return #-}
456 return x = returnSTM x
457 m >>= k = bindSTM m k
459 bindSTM :: STM a -> (a -> STM b) -> STM b
460 bindSTM (STM m) k = STM ( \s ->
462 (# new_s, a #) -> unSTM (k a) new_s
465 thenSTM :: STM a -> STM b -> STM b
466 thenSTM (STM m) k = STM ( \s ->
468 (# new_s, _ #) -> unSTM k new_s
471 returnSTM :: a -> STM a
472 returnSTM x = STM (\s -> (# s, x #))
474 -- | Unsafely performs IO in the STM monad. Beware: this is a highly
475 -- dangerous thing to do.
477 -- * The STM implementation will often run transactions multiple
478 -- times, so you need to be prepared for this if your IO has any
481 -- * The STM implementation will abort transactions that are known to
482 -- be invalid and need to be restarted. This may happen in the middle
483 -- of `unsafeIOToSTM`, so make sure you don't acquire any resources
484 -- that need releasing (exception handlers are ignored when aborting
485 -- the transaction). That includes doing any IO using Handles, for
486 -- example. Getting this wrong will probably lead to random deadlocks.
488 -- * The transaction may have seen an inconsistent view of memory when
489 -- the IO runs. Invariants that you expect to be true throughout
490 -- your program may not be true inside a transaction, due to the
491 -- way transactions are implemented. Normally this wouldn't be visible
492 -- to the programmer, but using `unsafeIOToSTM` can expose it.
494 unsafeIOToSTM :: IO a -> STM a
495 unsafeIOToSTM (IO m) = STM m
497 -- |Perform a series of STM actions atomically.
499 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
500 -- Any attempt to do so will result in a runtime error. (Reason: allowing
501 -- this would effectively allow a transaction inside a transaction, depending
502 -- on exactly when the thunk is evaluated.)
504 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
505 -- and which allows top-level TVars to be allocated.
507 atomically :: STM a -> IO a
508 atomically (STM m) = IO (\s -> (atomically# m) s )
510 -- |Retry execution of the current memory transaction because it has seen
511 -- values in TVars which mean that it should not continue (e.g. the TVars
512 -- represent a shared buffer that is now empty). The implementation may
513 -- block the thread until one of the TVars that it has read from has been
514 -- udpated. (GHC only)
516 retry = STM $ \s# -> retry# s#
518 -- |Compose two alternative STM actions (GHC only). If the first action
519 -- completes without retrying then it forms the result of the orElse.
520 -- Otherwise, if the first action retries, then the second action is
521 -- tried in its place. If both actions retry then the orElse as a
523 orElse :: STM a -> STM a -> STM a
524 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
526 -- |Exception handling within STM actions.
527 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
528 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
530 -- | Low-level primitive on which always and alwaysSucceeds are built.
531 -- checkInv differs form these in that (i) the invariant is not
532 -- checked when checkInv is called, only at the end of this and
533 -- subsequent transcations, (ii) the invariant failure is indicated
534 -- by raising an exception.
535 checkInv :: STM a -> STM ()
536 checkInv (STM m) = STM (\s -> (check# m) s)
538 -- | alwaysSucceeds adds a new invariant that must be true when passed
539 -- to alwaysSucceeds, at the end of the current transaction, and at
540 -- the end of every subsequent transaction. If it fails at any
541 -- of those points then the transaction violating it is aborted
542 -- and the exception raised by the invariant is propagated.
543 alwaysSucceeds :: STM a -> STM ()
544 alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () )
547 -- | always is a variant of alwaysSucceeds in which the invariant is
548 -- expressed as an STM Bool action that must return True. Returning
549 -- False or raising an exception are both treated as invariant failures.
550 always :: STM Bool -> STM ()
551 always i = alwaysSucceeds ( do v <- i
552 if (v) then return () else ( error "Transacional invariant violation" ) )
554 -- |Shared memory locations that support atomic memory transactions.
555 data TVar a = TVar (TVar# RealWorld a)
557 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
559 instance Eq (TVar a) where
560 (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
562 -- |Create a new TVar holding a value supplied
563 newTVar :: a -> STM (TVar a)
564 newTVar val = STM $ \s1# ->
565 case newTVar# val s1# of
566 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
568 -- |@IO@ version of 'newTVar'. This is useful for creating top-level
569 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
570 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
572 newTVarIO :: a -> IO (TVar a)
573 newTVarIO val = IO $ \s1# ->
574 case newTVar# val s1# of
575 (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
577 -- |Return the current value stored in a TVar.
578 -- This is equivalent to
580 -- > readTVarIO = atomically . readTVar
582 -- but works much faster, because it doesn't perform a complete
583 -- transaction, it just reads the current value of the 'TVar'.
584 readTVarIO :: TVar a -> IO a
585 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
587 -- |Return the current value stored in a TVar
588 readTVar :: TVar a -> STM a
589 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
591 -- |Write the supplied value into a TVar
592 writeTVar :: TVar a -> a -> STM ()
593 writeTVar (TVar tvar#) val = STM $ \s1# ->
594 case writeTVar# tvar# val s1# of
602 withMVar :: MVar a -> (a -> IO b) -> IO b
606 b <- catchAny (unblock (io a))
607 (\e -> do putMVar m a; throw e)
612 %************************************************************************
614 \subsection{Thread waiting}
616 %************************************************************************
619 #ifdef mingw32_HOST_OS
621 -- Note: threadWaitRead and threadWaitWrite aren't really functional
622 -- on Win32, but left in there because lib code (still) uses them (the manner
623 -- in which they're used doesn't cause problems on a Win32 platform though.)
625 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
626 asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) =
627 IO $ \s -> case asyncRead# fd isSock len buf s of
628 (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
630 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
631 asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) =
632 IO $ \s -> case asyncWrite# fd isSock len buf s of
633 (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
635 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
636 asyncDoProc (FunPtr proc) (Ptr param) =
637 -- the 'length' value is ignored; simplifies implementation of
638 -- the async*# primops to have them all return the same result.
639 IO $ \s -> case asyncDoProc# proc param s of
640 (# s', _len#, err# #) -> (# s', I# err# #)
642 -- to aid the use of these primops by the IO Handle implementation,
643 -- provide the following convenience funs:
645 -- this better be a pinned byte array!
646 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
647 asyncReadBA fd isSock len off bufB =
648 asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
650 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
651 asyncWriteBA fd isSock len off bufB =
652 asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
656 -- -----------------------------------------------------------------------------
659 -- | Block the current thread until data is available to read on the
660 -- given file descriptor (GHC only).
661 threadWaitRead :: Fd -> IO ()
663 #ifndef mingw32_HOST_OS
664 | threaded = waitForReadEvent fd
666 | otherwise = IO $ \s ->
667 case fromIntegral fd of { I# fd# ->
668 case waitRead# fd# s of { s' -> (# s', () #)
671 -- | Block the current thread until data can be written to the
672 -- given file descriptor (GHC only).
673 threadWaitWrite :: Fd -> IO ()
675 #ifndef mingw32_HOST_OS
676 | threaded = waitForWriteEvent fd
678 | otherwise = IO $ \s ->
679 case fromIntegral fd of { I# fd# ->
680 case waitWrite# fd# s of { s' -> (# s', () #)
683 -- | Suspends the current thread for a given number of microseconds
686 -- There is no guarantee that the thread will be rescheduled promptly
687 -- when the delay has expired, but the thread will never continue to
688 -- run /earlier/ than specified.
690 threadDelay :: Int -> IO ()
692 | threaded = waitForDelayEvent time
693 | otherwise = IO $ \s ->
694 case fromIntegral time of { I# time# ->
695 case delay# time# s of { s' -> (# s', () #)
699 -- | Set the value of returned TVar to True after a given number of
700 -- microseconds. The caveats associated with threadDelay also apply.
702 registerDelay :: Int -> IO (TVar Bool)
704 | threaded = waitForDelayEventSTM usecs
705 | otherwise = error "registerDelay: requires -threaded"
707 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
709 waitForDelayEvent :: Int -> IO ()
710 waitForDelayEvent usecs = do
712 target <- calculateTarget usecs
713 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
717 -- Delays for use in STM
718 waitForDelayEventSTM :: Int -> IO (TVar Bool)
719 waitForDelayEventSTM usecs = do
720 t <- atomically $ newTVar False
721 target <- calculateTarget usecs
722 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
726 calculateTarget :: Int -> IO USecs
727 calculateTarget usecs = do
729 return $ now + (fromIntegral usecs)
732 -- ----------------------------------------------------------------------------
733 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
735 -- In the threaded RTS, we employ a single IO Manager thread to wait
736 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
737 -- and delays (threadDelay).
739 -- We can do this because in the threaded RTS the IO Manager can make
740 -- a non-blocking call to select(), so we don't have to do select() in
741 -- the scheduler as we have to in the non-threaded RTS. We get performance
742 -- benefits from doing it this way, because we only have to restart the select()
743 -- when a new request arrives, rather than doing one select() each time
744 -- around the scheduler loop. Furthermore, the scheduler can be simplified
745 -- by not having to check for completed IO requests.
747 -- Issues, possible problems:
749 -- - we might want bound threads to just do the blocking
750 -- operation rather than communicating with the IO manager
751 -- thread. This would prevent simgle-threaded programs which do
752 -- IO from requiring multiple OS threads. However, it would also
753 -- prevent bound threads waiting on IO from being killed or sent
756 -- - Apprently exec() doesn't work on Linux in a multithreaded program.
757 -- I couldn't repeat this.
759 -- - How do we handle signal delivery in the multithreaded RTS?
761 -- - forkProcess will kill the IO manager thread. Let's just
762 -- hope we don't need to do any blocking IO between fork & exec.
764 #ifndef mingw32_HOST_OS
766 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
767 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
771 = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
772 | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
774 #ifndef mingw32_HOST_OS
775 pendingEvents :: IORef [IOReq]
777 pendingDelays :: IORef [DelayReq]
778 -- could use a strict list or array here
779 {-# NOINLINE pendingEvents #-}
780 {-# NOINLINE pendingDelays #-}
781 (pendingEvents,pendingDelays) = unsafePerformIO $ do
786 -- the first time we schedule an IO request, the service thread
787 -- will be created (cool, huh?)
789 ensureIOManagerIsRunning :: IO ()
790 ensureIOManagerIsRunning
791 | threaded = seq pendingEvents $ return ()
792 | otherwise = return ()
794 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
795 insertDelay d [] = [d]
796 insertDelay d1 ds@(d2 : rest)
797 | delayTime d1 <= delayTime d2 = d1 : ds
798 | otherwise = d2 : insertDelay d1 rest
800 delayTime :: DelayReq -> USecs
801 delayTime (Delay t _) = t
802 delayTime (DelaySTM t _) = t
806 foreign import ccall unsafe "getUSecOfDay"
807 getUSecOfDay :: IO USecs
809 prodding :: IORef Bool
810 {-# NOINLINE prodding #-}
811 prodding = unsafePerformIO (newIORef False)
813 prodServiceThread :: IO ()
814 prodServiceThread = do
815 was_set <- atomicModifyIORef prodding (\a -> (True,a))
816 if (not (was_set)) then wakeupIOManager else return ()
818 #ifdef mingw32_HOST_OS
819 -- ----------------------------------------------------------------------------
820 -- Windows IO manager thread
822 startIOManagerThread :: IO ()
823 startIOManagerThread = do
824 wakeup <- c_getIOManagerEvent
825 forkIO $ service_loop wakeup []
828 service_loop :: HANDLE -- read end of pipe
829 -> [DelayReq] -- current delay requests
832 service_loop wakeup old_delays = do
833 -- pick up new delay requests
834 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
835 let delays = foldr insertDelay old_delays new_delays
838 (delays', timeout) <- getDelay now delays
840 r <- c_WaitForSingleObject wakeup timeout
842 0xffffffff -> do c_maperrno; throwErrno "service_loop"
844 r2 <- c_readIOManagerEvent
847 _ | r2 == io_MANAGER_WAKEUP -> return False
848 _ | r2 == io_MANAGER_DIE -> return True
849 0 -> return False -- spurious wakeup
850 _ -> do start_console_handler (r2 `shiftR` 1); return False
851 unless exit $ service_cont wakeup delays'
853 _other -> service_cont wakeup delays' -- probably timeout
855 service_cont :: HANDLE -> [DelayReq] -> IO ()
856 service_cont wakeup delays = do
857 r <- atomicModifyIORef prodding (\_ -> (False,False))
858 r `seq` return () -- avoid space leak
859 service_loop wakeup delays
861 -- must agree with rts/win32/ThrIOManager.c
862 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
863 io_MANAGER_WAKEUP = 0xffffffff
864 io_MANAGER_DIE = 0xfffffffe
870 -- these are sent to Services only.
873 deriving (Eq, Ord, Enum, Show, Read, Typeable)
875 start_console_handler :: Word32 -> IO ()
876 start_console_handler r =
877 case toWin32ConsoleEvent r of
878 Just x -> withMVar win32ConsoleHandler $ \handler -> do
883 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
884 toWin32ConsoleEvent ev =
886 0 {- CTRL_C_EVENT-} -> Just ControlC
887 1 {- CTRL_BREAK_EVENT-} -> Just Break
888 2 {- CTRL_CLOSE_EVENT-} -> Just Close
889 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
890 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
893 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
894 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
896 -- XXX Is this actually needed?
897 stick :: IORef HANDLE
898 {-# NOINLINE stick #-}
899 stick = unsafePerformIO (newIORef nullPtr)
901 wakeupIOManager :: IO ()
903 _hdl <- readIORef stick
904 c_sendIOManagerEvent io_MANAGER_WAKEUP
906 -- Walk the queue of pending delays, waking up any that have passed
907 -- and return the smallest delay to wait for. The queue of pending
908 -- delays is kept ordered.
909 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
910 getDelay _ [] = return ([], iNFINITE)
911 getDelay now all@(d : rest)
913 Delay time m | now >= time -> do
916 DelaySTM time t | now >= time -> do
917 atomically $ writeTVar t True
920 -- delay is in millisecs for WaitForSingleObject
921 let micro_seconds = delayTime d - now
922 milli_seconds = (micro_seconds + 999) `div` 1000
923 in return (all, fromIntegral milli_seconds)
925 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
926 -- available yet. We should move some Win32 functionality down here,
927 -- maybe as part of the grand reorganisation of the base package...
932 iNFINITE = 0xFFFFFFFF -- urgh
934 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
935 c_getIOManagerEvent :: IO HANDLE
937 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
938 c_readIOManagerEvent :: IO Word32
940 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
941 c_sendIOManagerEvent :: Word32 -> IO ()
943 foreign import ccall unsafe "maperrno" -- in Win32Utils.c
946 foreign import stdcall "WaitForSingleObject"
947 c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
950 -- ----------------------------------------------------------------------------
951 -- Unix IO manager thread, using select()
953 startIOManagerThread :: IO ()
954 startIOManagerThread = do
955 allocaArray 2 $ \fds -> do
956 throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds)
957 rd_end <- peekElemOff fds 0
958 wr_end <- peekElemOff fds 1
959 setNonBlockingFD wr_end True -- writes happen in a signal handler, we
960 -- don't want them to block.
961 setCloseOnExec rd_end
962 setCloseOnExec wr_end
963 writeIORef stick (fromIntegral wr_end)
964 c_setIOManagerPipe wr_end
966 allocaBytes sizeofFdSet $ \readfds -> do
967 allocaBytes sizeofFdSet $ \writefds -> do
968 allocaBytes sizeofTimeVal $ \timeval -> do
969 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
973 :: Fd -- listen to this for wakeup calls
980 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
982 -- pick up new IO requests
983 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
984 let reqs = new_reqs ++ old_reqs
986 -- pick up new delay requests
987 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
988 let delays0 = foldr insertDelay old_delays new_delays
990 -- build the FDSets for select()
994 maxfd <- buildFdSets 0 readfds writefds reqs
996 -- perform the select()
997 let do_select delays = do
998 -- check the current time and wake up any thread in
999 -- threadDelay whose timeout has expired. Also find the
1000 -- timeout value for the select() call.
1002 (delays', timeout) <- getDelay now ptimeval delays
1004 res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
1010 _ | err == eINTR -> do_select delays'
1011 -- EINTR: just redo the select()
1012 _ | err == eBADF -> return (True, delays)
1013 -- EBADF: one of the file descriptors is closed or bad,
1014 -- we don't know which one, so wake everyone up.
1015 _ | otherwise -> throwErrno "select"
1016 -- otherwise (ENOMEM or EINVAL) something has gone
1017 -- wrong; report the error.
1019 return (False,delays')
1021 (wakeup_all,delays') <- do_select delays0
1024 if wakeup_all then return False
1026 b <- fdIsSet wakeup readfds
1029 else alloca $ \p -> do
1030 warnErrnoIfMinus1_ "service_loop" $
1031 c_read (fromIntegral wakeup) p 1
1034 _ | s == io_MANAGER_WAKEUP -> return False
1035 _ | s == io_MANAGER_DIE -> return True
1036 _ | s == io_MANAGER_SYNC -> do
1037 mvars <- readIORef sync
1038 mapM_ (flip putMVar ()) mvars
1041 fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1042 withForeignPtr fp $ \p_siginfo -> do
1043 r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1045 when (r /= fromIntegral sizeof_siginfo_t) $
1046 error "failed to read siginfo_t"
1047 runHandlers' fp (fromIntegral s)
1052 atomicModifyIORef prodding (\_ -> (False, ()))
1054 reqs' <- if wakeup_all then do wakeupAll reqs; return []
1055 else completeRequests reqs readfds writefds []
1057 service_loop wakeup readfds writefds ptimeval reqs' delays'
1059 io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8
1060 io_MANAGER_WAKEUP = 0xff
1061 io_MANAGER_DIE = 0xfe
1062 io_MANAGER_SYNC = 0xfd
1064 -- | the stick is for poking the IO manager with
1066 {-# NOINLINE stick #-}
1067 stick = unsafePerformIO (newIORef 0)
1069 {-# NOINLINE sync #-}
1070 sync :: IORef [MVar ()]
1071 sync = unsafePerformIO (newIORef [])
1073 -- waits for the IO manager to drain the pipe
1074 syncIOManager :: IO ()
1077 atomicModifyIORef sync (\old -> (m:old,()))
1078 fd <- readIORef stick
1079 with io_MANAGER_SYNC $ \pbuf -> do
1080 warnErrnoIfMinus1_ "syncIOManager" $ c_write (fromIntegral fd) pbuf 1
1083 wakeupIOManager :: IO ()
1084 wakeupIOManager = do
1085 fd <- readIORef stick
1086 with io_MANAGER_WAKEUP $ \pbuf -> do
1087 warnErrnoIfMinus1_ "wakeupIOManager" $ c_write (fromIntegral fd) pbuf 1
1089 -- For the non-threaded RTS
1090 runHandlers :: Ptr Word8 -> Int -> IO ()
1091 runHandlers p_info sig = do
1092 fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1093 withForeignPtr fp $ \p -> do
1094 copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1096 runHandlers' fp (fromIntegral sig)
1098 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1099 runHandlers' p_info sig = do
1100 let int = fromIntegral sig
1101 withMVar signal_handlers $ \arr ->
1102 if not (inRange (boundsIOArray arr) int)
1104 else do handler <- unsafeReadIOArray arr int
1106 Nothing -> return ()
1107 Just (f,_) -> do _ <- forkIO (f p_info)
1110 foreign import ccall "setIOManagerPipe"
1111 c_setIOManagerPipe :: CInt -> IO ()
1113 foreign import ccall "__hscore_sizeof_siginfo_t"
1114 sizeof_siginfo_t :: CSize
1120 type HandlerFun = ForeignPtr Word8 -> IO ()
1122 -- Lock used to protect concurrent access to signal_handlers. Symptom of
1123 -- this race condition is #1922, although that bug was on Windows a similar
1124 -- bug also exists on Unix.
1125 {-# NOINLINE signal_handlers #-}
1126 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1127 signal_handlers = unsafePerformIO $ do
1128 arr <- newIOArray (0,maxSig) Nothing
1131 stable_ref <- newStablePtr m
1132 let ref = castStablePtrToPtr stable_ref
1133 ref2 <- getOrSetSignalHandlerStore ref
1136 else do freeStablePtr stable_ref
1137 deRefStablePtr (castPtrToStablePtr ref2)
1139 foreign import ccall unsafe "getOrSetSignalHandlerStore"
1140 getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a)
1142 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1143 setHandler sig handler = do
1144 let int = fromIntegral sig
1145 withMVar signal_handlers $ \arr ->
1146 if not (inRange (boundsIOArray arr) int)
1147 then error "GHC.Conc.setHandler: signal out of range"
1148 else do old <- unsafeReadIOArray arr int
1149 unsafeWriteIOArray arr int handler
1152 -- -----------------------------------------------------------------------------
1155 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1156 buildFdSets maxfd _ _ [] = return maxfd
1157 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1158 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1161 buildFdSets (max maxfd fd) readfds writefds reqs
1162 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1163 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
1166 buildFdSets (max maxfd fd) readfds writefds reqs
1168 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1170 completeRequests [] _ _ reqs' = return reqs'
1171 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1172 b <- fdIsSet fd readfds
1174 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1175 else completeRequests reqs readfds writefds (Read fd m : reqs')
1176 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1177 b <- fdIsSet fd writefds
1179 then do putMVar m (); completeRequests reqs readfds writefds reqs'
1180 else completeRequests reqs readfds writefds (Write fd m : reqs')
1182 wakeupAll :: [IOReq] -> IO ()
1183 wakeupAll [] = return ()
1184 wakeupAll (Read _ m : reqs) = do putMVar m (); wakeupAll reqs
1185 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1187 waitForReadEvent :: Fd -> IO ()
1188 waitForReadEvent fd = do
1190 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1194 waitForWriteEvent :: Fd -> IO ()
1195 waitForWriteEvent fd = do
1197 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1201 -- -----------------------------------------------------------------------------
1204 -- Walk the queue of pending delays, waking up any that have passed
1205 -- and return the smallest delay to wait for. The queue of pending
1206 -- delays is kept ordered.
1207 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1208 getDelay _ _ [] = return ([],nullPtr)
1209 getDelay now ptimeval all@(d : rest)
1211 Delay time m | now >= time -> do
1213 getDelay now ptimeval rest
1214 DelaySTM time t | now >= time -> do
1215 atomically $ writeTVar t True
1216 getDelay now ptimeval rest
1218 setTimevalTicks ptimeval (delayTime d - now)
1219 return (all,ptimeval)
1223 foreign import ccall unsafe "sizeofTimeVal"
1224 sizeofTimeVal :: Int
1226 foreign import ccall unsafe "setTimevalTicks"
1227 setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1230 On Win32 we're going to have a single Pipe, and a
1231 waitForSingleObject with the delay time. For signals, we send a
1232 byte down the pipe just like on Unix.
1235 -- ----------------------------------------------------------------------------
1236 -- select() interface
1238 -- ToDo: move to System.Posix.Internals?
1242 foreign import ccall safe "select"
1243 c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1246 foreign import ccall unsafe "hsFD_SETSIZE"
1247 c_fD_SETSIZE :: CInt
1250 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1252 foreign import ccall unsafe "hsFD_ISSET"
1253 c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1255 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1256 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1258 foreign import ccall unsafe "hsFD_SET"
1259 c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1261 fdSet :: Fd -> Ptr CFdSet -> IO ()
1262 fdSet (Fd fd) fdset = c_fdSet fd fdset
1264 foreign import ccall unsafe "hsFD_ZERO"
1265 fdZero :: Ptr CFdSet -> IO ()
1267 foreign import ccall unsafe "sizeof_fd_set"
1272 reportStackOverflow :: IO a
1273 reportStackOverflow = do callStackOverflowHook; return undefined
1275 reportError :: SomeException -> IO a
1277 handler <- getUncaughtExceptionHandler
1281 -- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove
1282 -- the unsafe below.
1283 foreign import ccall unsafe "stackOverflow"
1284 callStackOverflowHook :: IO ()
1286 {-# NOINLINE uncaughtExceptionHandler #-}
1287 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1288 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1290 defaultHandler :: SomeException -> IO ()
1291 defaultHandler se@(SomeException ex) = do
1292 (hFlush stdout) `catchAny` (\ _ -> return ())
1293 let msg = case cast ex of
1294 Just Deadlock -> "no threads to run: infinite loop or deadlock?"
1295 _ -> case cast ex of
1296 Just (ErrorCall s) -> s
1297 _ -> showsPrec 0 se ""
1298 withCString "%s" $ \cfmt ->
1299 withCString msg $ \cmsg ->
1300 errorBelch cfmt cmsg
1302 -- don't use errorBelch() directly, because we cannot call varargs functions
1304 foreign import ccall unsafe "HsBase.h errorBelch2"
1305 errorBelch :: CString -> CString -> IO ()
1307 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1308 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1310 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1311 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
1313 warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO ()
1314 warnErrnoIfMinus1_ what io
1318 str <- strerror errno >>= peekCString
1320 debugErrLn ("Warning: " ++ what ++ " failed: " ++ str)
1322 foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar)