2 {-# OPTIONS_GHC -fno-implicit-prelude #-}
3 -----------------------------------------------------------------------------
6 -- Copyright : (c) The University of Glasgow, 1994-2002
7 -- License : see libraries/base/LICENSE
9 -- Maintainer : cvs-ghc@haskell.org
10 -- Stability : internal
11 -- Portability : non-portable (GHC extensions)
13 -- Basic concurrency stuff.
15 -----------------------------------------------------------------------------
17 -- No: #hide, because bits of this module are exposed by the stm package.
18 -- However, we don't want this module to be the home location for the
19 -- bits it exports, we'd rather have Control.Concurrent and the other
20 -- higher level modules be the home. Hence:
26 -- Forking and suchlike
27 , forkIO -- :: IO () -> IO ThreadId
28 , forkOnIO -- :: Int -> IO () -> IO ThreadId
29 , childHandler -- :: Exception -> IO ()
30 , myThreadId -- :: IO ThreadId
31 , killThread -- :: ThreadId -> IO ()
32 , throwTo -- :: ThreadId -> Exception -> IO ()
33 , par -- :: a -> b -> b
34 , pseq -- :: a -> b -> b
36 , labelThread -- :: ThreadId -> String -> IO ()
39 , threadDelay -- :: Int -> IO ()
40 , registerDelay -- :: Int -> IO (TVar Bool)
41 , threadWaitRead -- :: Fd -> IO ()
42 , threadWaitWrite -- :: Fd -> IO ()
46 , newMVar -- :: a -> IO (MVar a)
47 , newEmptyMVar -- :: IO (MVar a)
48 , takeMVar -- :: MVar a -> IO a
49 , putMVar -- :: MVar a -> a -> IO ()
50 , tryTakeMVar -- :: MVar a -> IO (Maybe a)
51 , tryPutMVar -- :: MVar a -> a -> IO Bool
52 , isEmptyMVar -- :: MVar a -> IO Bool
53 , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
57 , atomically -- :: STM a -> IO a
59 , orElse -- :: STM a -> STM a -> STM a
60 , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a
62 , newTVar -- :: a -> STM (TVar a)
63 , newTVarIO -- :: a -> IO (TVar a)
64 , readTVar -- :: TVar a -> STM a
65 , writeTVar -- :: TVar a -> a -> STM ()
66 , unsafeIOToSTM -- :: IO a -> STM a
68 #ifdef mingw32_HOST_OS
69 , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
70 , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
71 , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
73 , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
74 , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
77 #ifndef mingw32_HOST_OS
78 , ensureIOManagerIsRunning
82 import System.Posix.Types
83 import System.Posix.Internals
88 import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
95 import GHC.Num ( Num(..) )
96 import GHC.Real ( fromIntegral, quot )
97 import GHC.Base ( Int(..) )
98 import GHC.Exception ( catchException, Exception(..), AsyncException(..) )
99 import GHC.Pack ( packCString# )
100 import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) )
104 infixr 0 `par`, `pseq`
107 %************************************************************************
109 \subsection{@ThreadId@, @par@, and @fork@}
111 %************************************************************************
114 data ThreadId = ThreadId ThreadId# deriving( Typeable )
115 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
116 -- But since ThreadId# is unlifted, the Weak type must use open
119 A 'ThreadId' is an abstract type representing a handle to a thread.
120 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
121 the 'Ord' instance implements an arbitrary total ordering over
122 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
123 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
124 useful when debugging or diagnosing the behaviour of a concurrent
127 /Note/: in GHC, if you have a 'ThreadId', you essentially have
128 a pointer to the thread itself. This means the thread itself can\'t be
129 garbage collected until you drop the 'ThreadId'.
130 This misfeature will hopefully be corrected at a later date.
132 /Note/: Hugs does not provide any operations on other threads;
133 it defines 'ThreadId' as a synonym for ().
137 This sparks off a new thread to run the 'IO' computation passed as the
138 first argument, and returns the 'ThreadId' of the newly created
141 The new thread will be a lightweight thread; if you want to use a foreign
142 library that uses thread-local storage, use 'forkOS' instead.
144 forkIO :: IO () -> IO ThreadId
145 forkIO action = IO $ \ s ->
146 case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
148 action_plus = catchException action childHandler
150 forkOnIO :: Int -> IO () -> IO ThreadId
151 forkOnIO (I# cpu) action = IO $ \ s ->
152 case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
154 action_plus = catchException action childHandler
-- Handler wrapped around forked computations (see 'forkIO' / 'forkOnIO').
-- It runs 'real_handler' on the exception; if that raises in turn, the new
-- exception is passed back through 'childHandler' itself.
childHandler :: Exception -> IO ()
childHandler err = real_handler err `catchException` childHandler
159 real_handler :: Exception -> IO ()
162 -- ignore thread GC and killThread exceptions:
163 BlockedOnDeadMVar -> return ()
164 BlockedIndefinitely -> return ()
165 AsyncException ThreadKilled -> return ()
167 -- report all others:
168 AsyncException StackOverflow -> reportStackOverflow
169 other -> reportError other
171 {- | 'killThread' terminates the given thread (GHC only).
172 Any work already done by the thread isn\'t
173 lost: the computation is suspended until required by another thread.
174 The memory used by the thread will be garbage collected if it isn\'t
175 referenced from anywhere. The 'killThread' function is defined in
178 > killThread tid = throwTo tid (AsyncException ThreadKilled)
killThread :: ThreadId -> IO ()
-- Implemented purely in terms of 'throwTo': the target thread is sent the
-- asynchronous 'ThreadKilled' exception.
killThread target = target `throwTo` AsyncException ThreadKilled
184 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
186 'throwTo' does not return until the exception has been raised in the
187 target thread. The calling thread can thus be certain that the target
188 thread has received the exception. This is a useful property to know
189 when dealing with race conditions: eg. if there are two threads that
190 can kill each other, it is guaranteed that only one of the threads
191 will get to kill the other.
193 If the target thread is currently making a foreign call, then the
194 exception will not be raised (and hence 'throwTo' will not return)
195 until the call has completed. This is the case regardless of whether
196 the call is inside a 'block' or not.
throwTo :: ThreadId -> Exception -> IO ()
-- Unwraps the 'ThreadId' and passes the exception to the killThread# primop,
-- threading the state token through and producing unit.
throwTo (ThreadId tid#) exn = IO $ \st ->
  case killThread# tid# exn st of
    st' -> (# st', () #)
-- | Obtain the 'ThreadId' of the thread executing this action (GHC only).
myThreadId :: IO ThreadId
myThreadId = IO $ \st ->
  case myThreadId# st of
    (# st', tid# #) -> (# st', ThreadId tid# #)
208 -- |The 'yield' action allows (forces, in a co-operative multitasking
209 -- implementation) a context-switch to any other currently runnable
210 -- threads (if any), and is occasionally useful when implementing
211 -- concurrency abstractions.
214 case (yield# s) of s1 -> (# s1, () #)
216 {- | 'labelThread' stores a string as identifier for this thread if
217 you built a RTS with debugging support. This identifier will be used in
218 the debugging output to make distinction of different threads easier
219 (otherwise you only have the thread state object\'s address in the heap).
221 Other applications like the graphical Concurrent Haskell Debugger
222 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
223 'labelThread' for their purposes as well.
labelThread :: ThreadId -> String -> IO ()
-- The label is packed with packCString#, and the address of the packed
-- bytes is what the labelThread# primop receives.
labelThread (ThreadId tid#) label = IO $ \st ->
  case labelThread# tid# (byteArrayContents# (packCString# label)) st of
    st' -> (# st', () #)
232 -- Nota Bene: 'pseq' used to be 'seq'
233 -- but 'seq' is now defined in PrelGHC
235 -- "pseq" is defined a bit weirdly (see below)
237 -- The reason for the strange "lazy" call is that
238 -- it fools the compiler into thinking that pseq and par are non-strict in
239 -- their second argument (even if it inlines pseq at the call site).
240 -- If it thinks pseq is strict in "y", then it often evaluates
241 -- "y" before "x", which is totally wrong.
245 pseq x y = x `seq` lazy y
249 par x y = case (par# x) of { _ -> lazy y }
253 %************************************************************************
255 \subsection[stm]{Transactional heap operations}
257 %************************************************************************
259 TVars are shared memory locations which support atomic memory
263 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) deriving( Typeable )
265 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
-- 'fmap' is defined through the monadic bind: run the action, then return
-- the function applied to its result.
instance Functor STM where
  fmap f action = action >>= \result -> return (f result)
271 instance Monad STM where
272 {-# INLINE return #-}
276 return x = returnSTM x
277 m >>= k = bindSTM m k
279 bindSTM :: STM a -> (a -> STM b) -> STM b
280 bindSTM (STM m) k = STM ( \s ->
282 (# new_s, a #) -> unSTM (k a) new_s
285 thenSTM :: STM a -> STM b -> STM b
286 thenSTM (STM m) k = STM ( \s ->
288 (# new_s, a #) -> unSTM k new_s
-- Lift a plain value into 'STM'; the state token is passed through unchanged.
returnSTM :: a -> STM a
returnSTM val = STM $ \st -> (# st, val #)
-- | Unsafely run an 'IO' action inside the 'STM' monad, by rewrapping the
-- state-passing function underneath the 'IO' constructor.
unsafeIOToSTM :: IO a -> STM a
unsafeIOToSTM io = case io of
  IO act -> STM act
-- |Run a sequence of 'STM' actions atomically, as a single 'IO' action.
atomically :: STM a -> IO a
atomically (STM txn) = IO $ \st -> atomically# txn st
302 -- |Retry execution of the current memory transaction because it has seen
303 -- values in TVars which mean that it should not continue (e.g. the TVars
304 -- represent a shared buffer that is now empty). The implementation may
305 -- block the thread until one of the TVars that it has read from has been
308 retry = STM $ \s# -> retry# s#
-- |Combine two alternative 'STM' actions. When the first action finishes
-- without retrying, its result is the result of the whole 'orElse'. When the
-- first action retries, the second one is attempted instead. When both of
-- them retry, the combined action retries as well.
orElse :: STM a -> STM a -> STM a
orElse first second =
  STM $ \st -> catchRetry# (unSTM first) (unSTM second) st
-- |Exception handling within 'STM' actions: the first argument is the
-- protected action, the second builds the handler action from the exception.
catchSTM :: STM a -> (Exception -> STM a) -> STM a
catchSTM (STM body) handler =
  STM $ \st -> catchSTM# body (\exn -> unSTM (handler exn)) st
321 data TVar a = TVar (TVar# RealWorld a) deriving( Typeable )
-- Equality on 'TVar's is decided by sameTVar# applied to the two wrapped
-- primitive variables.
instance Eq (TVar a) where
  TVar lhs# == TVar rhs# = sameTVar# lhs# rhs#
-- |Allocate a fresh 'TVar' that initially holds the supplied value.
newTVar :: a -> STM (TVar a)
newTVar initial = STM $ \st ->
  case newTVar# initial st of
    (# st', var# #) -> (# st', TVar var# #)
-- |@IO@ counterpart of 'newTVar'. It is handy for building top-level
-- 'TVar's with 'System.IO.Unsafe.unsafePerformIO', a setting in which
-- 'atomically' cannot be used.
newTVarIO :: a -> IO (TVar a)
newTVarIO initial = IO $ \st ->
  case newTVar# initial st of
    (# st', var# #) -> (# st', TVar var# #)
-- |Read the value currently held by a 'TVar'.
readTVar :: TVar a -> STM a
readTVar (TVar var#) = STM (readTVar# var#)
345 -- |Write the supplied value into a TVar
346 writeTVar :: TVar a -> a -> STM ()
347 writeTVar (TVar tvar#) val = STM $ \s1# ->
348 case writeTVar# tvar# val s1# of
353 %************************************************************************
355 \subsection[mvars]{M-Structures}
357 %************************************************************************
359 M-Vars are rendezvous points for concurrent threads. They begin
360 empty, and any attempt to read an empty M-Var blocks. When an M-Var
361 is written, a single blocked thread may be freed. Reading an M-Var
362 toggles its state from full back to empty. Therefore, any value
363 written to an M-Var may only be read once. Multiple reads and writes
364 are allowed, but there must be at least one read between any two
368 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
370 -- |Create an 'MVar' which is initially empty.
371 newEmptyMVar :: IO (MVar a)
372 newEmptyMVar = IO $ \ s# ->
374 (# s2#, svar# #) -> (# s2#, MVar svar# #)
376 -- |Create an 'MVar' which contains the supplied value.
377 newMVar :: a -> IO (MVar a)
379 newEmptyMVar >>= \ mvar ->
380 putMVar mvar value >>
383 -- |Return the contents of the 'MVar'. If the 'MVar' is currently
384 -- empty, 'takeMVar' will wait until it is full. After a 'takeMVar',
385 -- the 'MVar' is left empty.
387 -- There are two further important properties of 'takeMVar':
389 -- * 'takeMVar' is single-wakeup. That is, if there are multiple
390 -- threads blocked in 'takeMVar', and the 'MVar' becomes full,
391 -- only one thread will be woken up. The runtime guarantees that
392 -- the woken thread completes its 'takeMVar' operation.
394 -- * When multiple threads are blocked on an 'MVar', they are
395 -- woken up in FIFO order. This is useful for providing
396 -- fairness properties of abstractions built using 'MVar's.
takeMVar :: MVar a -> IO a
-- Thin wrapper: hands the wrapped primitive variable to the takeMVar# primop.
takeMVar (MVar mv#) = IO (takeMVar# mv#)
401 -- |Put a value into an 'MVar'. If the 'MVar' is currently full,
402 -- 'putMVar' will wait until it becomes empty.
404 -- There are two further important properties of 'putMVar':
406 -- * 'putMVar' is single-wakeup. That is, if there are multiple
407 -- threads blocked in 'putMVar', and the 'MVar' becomes empty,
408 -- only one thread will be woken up. The runtime guarantees that
409 -- the woken thread completes its 'putMVar' operation.
411 -- * When multiple threads are blocked on an 'MVar', they are
412 -- woken up in FIFO order. This is useful for providing
413 -- fairness properties of abstractions built using 'MVar's.
415 putMVar :: MVar a -> a -> IO ()
416 putMVar (MVar mvar#) x = IO $ \ s# ->
417 case putMVar# mvar# x s# of
-- |Non-blocking variant of 'takeMVar'. It returns at once: 'Nothing' when
-- the 'MVar' was empty, or @'Just' a@ when it was full and held @a@. The
-- 'MVar' is empty after 'tryTakeMVar' in either case.
tryTakeMVar :: MVar a -> IO (Maybe a)
tryTakeMVar (MVar mv#) = IO $ \st ->
  case tryTakeMVar# mv# st of
    (# st', full#, val #) -> case full# of
      0# -> (# st', Nothing #)   -- flag 0: the MVar was empty
      _  -> (# st', Just val #)  -- any other flag: the MVar was full
-- |Non-blocking variant of 'putMVar'. It tries to store the value @a@ in
-- the 'MVar' and reports 'True' when that succeeded and 'False' when it did
-- not.
tryPutMVar :: MVar a -> a -> IO Bool
tryPutMVar (MVar mv#) val = IO $ \st ->
  case tryPutMVar# mv# val st of
    (# st', stored# #) -> case stored# of
      0# -> (# st', False #)
      _  -> (# st', True #)
-- |Test whether the given 'MVar' currently holds no value.
--
-- The boolean is only a snapshot of the 'MVar' state: by the time the caller
-- reacts to it, the 'MVar' may already have been filled (or emptied). Be
-- extremely careful with this operation and prefer 'tryTakeMVar' whenever
-- possible.
isEmptyMVar :: MVar a -> IO Bool
isEmptyMVar (MVar mv#) = IO $ \st ->
  case isEmptyMVar# mv# st of
    (# st', flag# #) -> case flag# of
      0# -> (# st', False #)  -- zero flag: reported as not empty
      _  -> (# st', True #)   -- non-zero flag: reported as empty
-- |Attach a finalizer to an 'MVar' (GHC only). More about finalizers can be
-- found in "Foreign.ForeignPtr" and "System.Mem.Weak".
addMVarFinalizer :: MVar a -> IO () -> IO ()
addMVarFinalizer (MVar mv#) finalizer = IO $ \st ->
  -- The weak pointer produced by mkWeak# is not needed and is discarded.
  case mkWeak# mv# () finalizer st of
    (# st', _ #) -> (# st', () #)
458 %************************************************************************
460 \subsection{Thread waiting}
462 %************************************************************************
465 #ifdef mingw32_HOST_OS
467 -- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
468 -- on Win32, but left in there because lib code (still) uses them (the manner
469 -- in which they're used doesn't cause problems on a Win32 platform though.)
-- Wrapper around the asyncRead# primop: unboxes the arguments and returns
-- the two result integers as the pair (length, error code).
asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncRead (I# fd#) (I# isSock#) (I# count#) (Ptr addr#) = IO $ \st ->
  case asyncRead# fd# isSock# count# addr# st of
    (# st', got#, code# #) -> (# st', (I# got#, I# code#) #)
-- Wrapper around the asyncWrite# primop: unboxes the arguments and returns
-- the two result integers as the pair (length, error code).
asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncWrite (I# fd#) (I# isSock#) (I# count#) (Ptr addr#) = IO $ \st ->
  case asyncWrite# fd# isSock# count# addr# st of
    (# st', put#, code# #) -> (# st', (I# put#, I# code#) #)
-- Run a procedure through the asyncDoProc# primop and return its result code.
asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
asyncDoProc (FunPtr fn#) (Ptr arg#) = IO $ \st ->
  case asyncDoProc# fn# arg# st of
    -- The primop returns the same triple as the other async*# primops (which
    -- keeps their implementation simple); the 'length' slot is ignored here.
    (# st', _, code# #) -> (# st', I# code# #)
488 -- to aid the use of these primops by the IO Handle implementation,
489 -- provide the following convenience funs:
-- The byte array had better be pinned: its contents address is taken and
-- passed on to 'asyncRead'.
asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncReadBA fd isSock len off bufB = asyncRead fd isSock len target
  where
    -- start of the array contents, advanced by the requested offset
    target = Ptr (byteArrayContents# (unsafeCoerce# bufB)) `plusPtr` off
-- Byte-array variant of 'asyncWrite': takes the contents address of the
-- array, advances it by the offset and delegates to 'asyncWrite'.
asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncWriteBA fd isSock len off bufB = asyncWrite fd isSock len source
  where
    -- start of the array contents, advanced by the requested offset
    source = Ptr (byteArrayContents# (unsafeCoerce# bufB)) `plusPtr` off
502 -- -----------------------------------------------------------------------------
505 -- | Block the current thread until data is available to read on the
506 -- given file descriptor (GHC only).
507 threadWaitRead :: Fd -> IO ()
509 #ifndef mingw32_HOST_OS
510 | threaded = waitForReadEvent fd
512 | otherwise = IO $ \s ->
513 case fromIntegral fd of { I# fd# ->
514 case waitRead# fd# s of { s -> (# s, () #)
517 -- | Block the current thread until data can be written to the
518 -- given file descriptor (GHC only).
519 threadWaitWrite :: Fd -> IO ()
521 #ifndef mingw32_HOST_OS
522 | threaded = waitForWriteEvent fd
524 | otherwise = IO $ \s ->
525 case fromIntegral fd of { I# fd# ->
526 case waitWrite# fd# s of { s -> (# s, () #)
529 -- | Suspends the current thread for a given number of microseconds
532 -- Note that the resolution used by the Haskell runtime system's
533 -- internal timer is 1\/50 second, and 'threadDelay' will round its
534 -- argument up to the nearest multiple of this resolution.
536 -- There is no guarantee that the thread will be rescheduled promptly
537 -- when the delay has expired, but the thread will never continue to
538 -- run /earlier/ than specified.
540 threadDelay :: Int -> IO ()
542 #ifndef mingw32_HOST_OS
543 | threaded = waitForDelayEvent time
545 | threaded = c_Sleep (fromIntegral (time `quot` 1000))
547 | otherwise = IO $ \s ->
548 case fromIntegral time of { I# time# ->
549 case delay# time# s of { s -> (# s, () #)
553 #ifndef mingw32_HOST_OS
554 | threaded = waitForDelayEventSTM usecs
555 | otherwise = error "registerDelay: requires -threaded"
557 = error "registerDelay: not currently supported on Windows"
560 -- On Windows, we just make a safe call to 'Sleep' to implement threadDelay.
561 #ifdef mingw32_HOST_OS
562 foreign import stdcall safe "Sleep" c_Sleep :: CInt -> IO ()
565 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
567 -- ----------------------------------------------------------------------------
568 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
570 -- In the threaded RTS, we employ a single IO Manager thread to wait
571 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
572 -- and delays (threadDelay).
574 -- We can do this because in the threaded RTS the IO Manager can make
575 -- a non-blocking call to select(), so we don't have to do select() in
576 -- the scheduler as we have to in the non-threaded RTS. We get performance
577 -- benefits from doing it this way, because we only have to restart the select()
578 -- when a new request arrives, rather than doing one select() each time
579 -- around the scheduler loop. Furthermore, the scheduler can be simplified
580 -- by not having to check for completed IO requests.
582 -- Issues, possible problems:
584 -- - we might want bound threads to just do the blocking
585 -- operation rather than communicating with the IO manager
586 -- thread. This would prevent single-threaded programs which do
587 -- IO from requiring multiple OS threads. However, it would also
588 -- prevent bound threads waiting on IO from being killed or sent
591 -- - Apparently exec() doesn't work on Linux in a multithreaded program.
592 -- I couldn't repeat this.
594 -- - How do we handle signal delivery in the multithreaded RTS?
596 -- - forkProcess will kill the IO manager thread. Let's just
597 -- hope we don't need to do any blocking IO between fork & exec.
599 #ifndef mingw32_HOST_OS
602 = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
603 | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
606 = Delay {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
607 | DelaySTM {-# UNPACK #-} !Int {-# UNPACK #-} !(TVar Bool)
609 pendingEvents :: IORef [IOReq]
610 pendingDelays :: IORef [DelayReq]
611 -- could use a strict list or array here
612 {-# NOINLINE pendingEvents #-}
613 {-# NOINLINE pendingDelays #-}
614 (pendingEvents,pendingDelays) = unsafePerformIO $ do
619 -- the first time we schedule an IO request, the service thread
620 -- will be created (cool, huh?)
ensureIOManagerIsRunning :: IO ()
-- When the RTS reports bound-thread support ('threaded'), force the
-- 'pendingEvents' binding before returning; otherwise do nothing.
ensureIOManagerIsRunning =
  if threaded
    then pendingEvents `seq` return ()
    else return ()
627 startIOManagerThread :: IO ()
628 startIOManagerThread = do
629 allocaArray 2 $ \fds -> do
630 throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
631 rd_end <- peekElemOff fds 0
632 wr_end <- peekElemOff fds 1
633 writeIORef stick (fromIntegral wr_end)
634 c_setIOManagerPipe wr_end
636 allocaBytes sizeofFdSet $ \readfds -> do
637 allocaBytes sizeofFdSet $ \writefds -> do
638 allocaBytes sizeofTimeVal $ \timeval -> do
639 service_loop (fromIntegral rd_end) readfds writefds timeval [] []
643 :: Fd -- listen to this for wakeup calls
650 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
652 -- pick up new IO requests
653 new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
654 let reqs = new_reqs ++ old_reqs
656 -- pick up new delay requests
657 new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
658 let delays = foldr insertDelay old_delays new_delays
660 -- build the FDSets for select()
664 maxfd <- buildFdSets 0 readfds writefds reqs
666 -- perform the select()
667 let do_select delays = do
668 -- check the current time and wake up any thread in
669 -- threadDelay whose timeout has expired. Also find the
670 -- timeout value for the select() call.
672 (delays', timeout) <- getDelay now ptimeval delays
674 res <- c_select ((max wakeup maxfd)+1) readfds writefds
680 then do_select delays'
681 else return (res,delays')
685 (res,delays') <- do_select delays
686 -- ToDo: check result
688 b <- fdIsSet wakeup readfds
691 else alloca $ \p -> do
692 c_read (fromIntegral wakeup) p 1; return ()
696 else do handler_tbl <- peek handlers
697 sp <- peekElemOff handler_tbl (fromIntegral s)
698 forkIO (do io <- deRefStablePtr sp; io)
702 putMVar prodding False
704 reqs' <- completeRequests reqs readfds writefds []
705 service_loop wakeup readfds writefds ptimeval reqs' delays'
708 {-# NOINLINE stick #-}
709 stick = unsafePerformIO (newIORef 0)
-- Global 'MVar' flag, initially 'False'. It is built with unsafePerformIO,
-- so it is marked NOINLINE.
prodding :: MVar Bool
{-# NOINLINE prodding #-}
prodding = unsafePerformIO $ newMVar False
715 prodServiceThread :: IO ()
716 prodServiceThread = do
717 b <- takeMVar prodding
719 then do fd <- readIORef stick
720 with 0xff $ \pbuf -> do c_write (fromIntegral fd) pbuf 1; return ()
722 putMVar prodding True
724 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
726 foreign import ccall "setIOManagerPipe"
727 c_setIOManagerPipe :: CInt -> IO ()
729 -- -----------------------------------------------------------------------------
732 buildFdSets maxfd readfds writefds [] = return maxfd
733 buildFdSets maxfd readfds writefds (Read fd m : reqs)
734 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
737 buildFdSets (max maxfd fd) readfds writefds reqs
738 buildFdSets maxfd readfds writefds (Write fd m : reqs)
739 | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
742 buildFdSets (max maxfd fd) readfds writefds reqs
744 completeRequests [] _ _ reqs' = return reqs'
745 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
746 b <- fdIsSet fd readfds
748 then do putMVar m (); completeRequests reqs readfds writefds reqs'
749 else completeRequests reqs readfds writefds (Read fd m : reqs')
750 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
751 b <- fdIsSet fd writefds
753 then do putMVar m (); completeRequests reqs readfds writefds reqs'
754 else completeRequests reqs readfds writefds (Write fd m : reqs')
756 waitForReadEvent :: Fd -> IO ()
757 waitForReadEvent fd = do
759 atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
763 waitForWriteEvent :: Fd -> IO ()
764 waitForWriteEvent fd = do
766 atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
-- XXX: move into GHC.IOBase from Data.IORef?
-- Passes the function to atomicModifyMutVar# on the mutable variable
-- underneath the 'IORef'. The function maps the old contents to a pair whose
-- second component is the value returned in 'IO'.
atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORef ref update = case ref of
  IORef (STRef var#) -> IO $ \st -> atomicModifyMutVar# var# update st
774 -- -----------------------------------------------------------------------------
777 waitForDelayEvent :: Int -> IO ()
778 waitForDelayEvent usecs = do
781 let target = now + usecs `quot` tick_usecs
782 atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
786 -- Delays for use in STM
787 waitForDelayEventSTM :: Int -> IO (TVar Bool)
788 waitForDelayEventSTM usecs = do
789 t <- atomically $ newTVar False
791 let target = now + usecs `quot` tick_usecs
792 atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
796 -- Walk the queue of pending delays, waking up any that have passed
797 -- and return the smallest delay to wait for. The queue of pending
798 -- delays is kept ordered.
799 getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
800 getDelay now ptimeval [] = return ([],nullPtr)
801 getDelay now ptimeval all@(d : rest)
803 Delay time m | now >= time -> do
805 getDelay now ptimeval rest
806 DelaySTM time t | now >= time -> do
807 atomically $ writeTVar t True
808 getDelay now ptimeval rest
810 setTimevalTicks ptimeval (delayTime d - now)
811 return (all,ptimeval)
-- Insert a request into a list ordered by 'delayTime': the new request goes
-- directly in front of the first element whose time is not earlier than its
-- own, or at the end if there is no such element.
insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
insertDelay new queue = case queue of
  [] -> [new]
  next : later
    | delayTime new <= delayTime next -> new : queue
    | otherwise                       -> next : insertDelay new later
-- Project the time field out of either kind of delay request.
delayTime req = case req of
  Delay when _    -> when
  DelaySTM when _ -> when
-- Accuracy of threadDelay, expressed in ticks per second.
tick_freq :: Ticks
tick_freq = 50

-- Number of microseconds in one tick (one million divided by 'tick_freq').
tick_usecs :: Int
tick_usecs = 1000000 `quot` tick_freq
826 newtype CTimeVal = CTimeVal ()
828 foreign import ccall unsafe "sizeofTimeVal"
831 foreign import ccall unsafe "getTicksOfDay"
832 getTicksOfDay :: IO Ticks
834 foreign import ccall unsafe "setTimevalTicks"
835 setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO ()
837 -- ----------------------------------------------------------------------------
838 -- select() interface
840 -- ToDo: move to System.Posix.Internals?
842 newtype CFdSet = CFdSet ()
844 foreign import ccall safe "select"
845 c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
848 foreign import ccall unsafe "hsFD_SETSIZE"
851 foreign import ccall unsafe "hsFD_CLR"
852 fdClr :: Fd -> Ptr CFdSet -> IO ()
854 foreign import ccall unsafe "hsFD_ISSET"
855 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
857 foreign import ccall unsafe "hsFD_SET"
858 fdSet :: Fd -> Ptr CFdSet -> IO ()
860 foreign import ccall unsafe "hsFD_ZERO"
861 fdZero :: Ptr CFdSet -> IO ()
863 foreign import ccall unsafe "sizeof_fd_set"