Better error handling in the IO manager thread
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -fno-implicit-prelude #-}
3 -----------------------------------------------------------------------------
4 -- |
5 -- Module      :  GHC.Conc
6 -- Copyright   :  (c) The University of Glasgow, 1994-2002
7 -- License     :  see libraries/base/LICENSE
8 -- 
9 -- Maintainer  :  cvs-ghc@haskell.org
10 -- Stability   :  internal
11 -- Portability :  non-portable (GHC extensions)
12 --
13 -- Basic concurrency stuff.
14 -- 
15 -----------------------------------------------------------------------------
16
17 -- No: #hide, because bits of this module are exposed by the stm package.
18 -- However, we don't want this module to be the home location for the
19 -- bits it exports, we'd rather have Control.Concurrent and the other
20 -- higher level modules be the home.  Hence:
21
22 -- #not-home
23 module GHC.Conc
24         ( ThreadId(..)
25
26         -- Forking and suchlike
27         , forkIO        -- :: IO a -> IO ThreadId
28         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
29         , childHandler  -- :: Exception -> IO ()
30         , myThreadId    -- :: IO ThreadId
31         , killThread    -- :: ThreadId -> IO ()
32         , throwTo       -- :: ThreadId -> Exception -> IO ()
33         , par           -- :: a -> b -> b
34         , pseq          -- :: a -> b -> b
35         , yield         -- :: IO ()
36         , labelThread   -- :: ThreadId -> String -> IO ()
37
38         -- Waiting
39         , threadDelay           -- :: Int -> IO ()
40         , registerDelay         -- :: Int -> IO (TVar Bool)
41         , threadWaitRead        -- :: Int -> IO ()
42         , threadWaitWrite       -- :: Int -> IO ()
43
44         -- MVars
45         , MVar          -- abstract
46         , newMVar       -- :: a -> IO (MVar a)
47         , newEmptyMVar  -- :: IO (MVar a)
48         , takeMVar      -- :: MVar a -> IO a
49         , putMVar       -- :: MVar a -> a -> IO ()
50         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
51         , tryPutMVar    -- :: MVar a -> a -> IO Bool
52         , isEmptyMVar   -- :: MVar a -> IO Bool
53         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
54
55         -- TVars
56         , STM           -- abstract
57         , atomically    -- :: STM a -> IO a
58         , retry         -- :: STM a
59         , orElse        -- :: STM a -> STM a -> STM a
60         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
61         , TVar          -- abstract
62         , newTVar       -- :: a -> STM (TVar a)
63         , newTVarIO     -- :: a -> STM (TVar a)
64         , readTVar      -- :: TVar a -> STM a
65         , writeTVar     -- :: a -> TVar a -> STM ()
66         , unsafeIOToSTM -- :: IO a -> STM a
67
68 #ifdef mingw32_HOST_OS
69         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
70         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
71         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
72
73         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
74         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
75 #endif
76
77 #ifndef mingw32_HOST_OS
78         , ensureIOManagerIsRunning
79 #endif
80         ) where
81
82 import System.Posix.Types
83 import System.Posix.Internals
84 import Foreign
85 import Foreign.C
86
87 #ifndef __HADDOCK__
88 import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
89 #endif
90
91 import Data.Maybe
92
93 import GHC.Base
94 import GHC.IOBase
95 import GHC.Num          ( Num(..) )
96 import GHC.Real         ( fromIntegral, quot )
97 import GHC.Base         ( Int(..) )
98 import GHC.Exception    ( catchException, Exception(..), AsyncException(..) )
99 import GHC.Pack         ( packCString# )
100 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
101 import GHC.STRef
102 import Data.Typeable
103
104 infixr 0 `par`, `pseq`
105 \end{code}
106
107 %************************************************************************
108 %*                                                                      *
109 \subsection{@ThreadId@, @par@, and @fork@}
110 %*                                                                      *
111 %************************************************************************
112
113 \begin{code}
114 data ThreadId = ThreadId ThreadId# deriving( Typeable )
115 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
116 -- But since ThreadId# is unlifted, the Weak type must use open
117 -- type variables.
118 {- ^
119 A 'ThreadId' is an abstract type representing a handle to a thread.
120 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
121 the 'Ord' instance implements an arbitrary total ordering over
122 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
123 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
124 useful when debugging or diagnosing the behaviour of a concurrent
125 program.
126
127 /Note/: in GHC, if you have a 'ThreadId', you essentially have
128 a pointer to the thread itself.  This means the thread itself can\'t be
129 garbage collected until you drop the 'ThreadId'.
130 This misfeature will hopefully be corrected at a later date.
131
132 /Note/: Hugs does not provide any operations on other threads;
133 it defines 'ThreadId' as a synonym for ().
134 -}
135
136 {- |
137 This sparks off a new thread to run the 'IO' computation passed as the
138 first argument, and returns the 'ThreadId' of the newly created
139 thread.
140
141 The new thread will be a lightweight thread; if you want to use a foreign
142 library that uses thread-local storage, use 'forkOS' instead.
143 -}
144 forkIO :: IO () -> IO ThreadId
145 forkIO action = IO $ \ s -> 
146    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
147  where
148   action_plus = catchException action childHandler
149
150 forkOnIO :: Int -> IO () -> IO ThreadId
151 forkOnIO (I# cpu) action = IO $ \ s -> 
152    case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
153  where
154   action_plus = catchException action childHandler
155
156 childHandler :: Exception -> IO ()
157 childHandler err = catchException (real_handler err) childHandler
158
159 real_handler :: Exception -> IO ()
160 real_handler ex =
161   case ex of
162         -- ignore thread GC and killThread exceptions:
163         BlockedOnDeadMVar            -> return ()
164         BlockedIndefinitely          -> return ()
165         AsyncException ThreadKilled  -> return ()
166
167         -- report all others:
168         AsyncException StackOverflow -> reportStackOverflow
169         other       -> reportError other
170
171 {- | 'killThread' terminates the given thread (GHC only).
172 Any work already done by the thread isn\'t
173 lost: the computation is suspended until required by another thread.
174 The memory used by the thread will be garbage collected if it isn\'t
175 referenced from anywhere.  The 'killThread' function is defined in
176 terms of 'throwTo':
177
178 > killThread tid = throwTo tid (AsyncException ThreadKilled)
179
180 -}
181 killThread :: ThreadId -> IO ()
182 killThread tid = throwTo tid (AsyncException ThreadKilled)
183
184 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
185
186 'throwTo' does not return until the exception has been raised in the
187 target thread.  The calling thread can thus be certain that the target
188 thread has received the exception.  This is a useful property to know
189 when dealing with race conditions: eg. if there are two threads that
190 can kill each other, it is guaranteed that only one of the threads
191 will get to kill the other.
192
193 If the target thread is currently making a foreign call, then the
194 exception will not be raised (and hence 'throwTo' will not return)
195 until the call has completed.  This is the case regardless of whether
196 the call is inside a 'block' or not.
197  -}
198 throwTo :: ThreadId -> Exception -> IO ()
199 throwTo (ThreadId id) ex = IO $ \ s ->
200    case (killThread# id ex s) of s1 -> (# s1, () #)
201
202 -- | Returns the 'ThreadId' of the calling thread (GHC only).
203 myThreadId :: IO ThreadId
204 myThreadId = IO $ \s ->
205    case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #)
206
207
208 -- |The 'yield' action allows (forces, in a co-operative multitasking
209 -- implementation) a context-switch to any other currently runnable
210 -- threads (if any), and is occasionally useful when implementing
211 -- concurrency abstractions.
212 yield :: IO ()
213 yield = IO $ \s -> 
214    case (yield# s) of s1 -> (# s1, () #)
215
216 {- | 'labelThread' stores a string as identifier for this thread if
217 you built a RTS with debugging support. This identifier will be used in
218 the debugging output to make distinction of different threads easier
219 (otherwise you only have the thread state object\'s address in the heap).
220
221 Other applications like the graphical Concurrent Haskell Debugger
222 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
223 'labelThread' for their purposes as well.
224 -}
225
226 labelThread :: ThreadId -> String -> IO ()
227 labelThread (ThreadId t) str = IO $ \ s ->
228    let ps  = packCString# str
229        adr = byteArrayContents# ps in
230      case (labelThread# t adr s) of s1 -> (# s1, () #)
231
232 --      Nota Bene: 'pseq' used to be 'seq'
233 --                 but 'seq' is now defined in PrelGHC
234 --
235 -- "pseq" is defined a bit weirdly (see below)
236 --
237 -- The reason for the strange "lazy" call is that
238 -- it fools the compiler into thinking that pseq  and par are non-strict in
239 -- their second argument (even if it inlines pseq at the call site).
240 -- If it thinks pseq is strict in "y", then it often evaluates
241 -- "y" before "x", which is totally wrong.  
242
243 {-# INLINE pseq  #-}
244 pseq :: a -> b -> b
245 pseq  x y = x `seq` lazy y
246
247 {-# INLINE par  #-}
248 par :: a -> b -> b
249 par  x y = case (par# x) of { _ -> lazy y }
250 \end{code}
251
252
253 %************************************************************************
254 %*                                                                      *
255 \subsection[stm]{Transactional heap operations}
256 %*                                                                      *
257 %************************************************************************
258
259 TVars are shared memory locations which support atomic memory
260 transactions.
261
262 \begin{code}
263 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) deriving( Typeable )
264
265 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
266 unSTM (STM a) = a
267
268 instance  Functor STM where
269    fmap f x = x >>= (return . f)
270
271 instance  Monad STM  where
272     {-# INLINE return #-}
273     {-# INLINE (>>)   #-}
274     {-# INLINE (>>=)  #-}
275     m >> k      = thenSTM m k
276     return x    = returnSTM x
277     m >>= k     = bindSTM m k
278
279 bindSTM :: STM a -> (a -> STM b) -> STM b
280 bindSTM (STM m) k = STM ( \s ->
281   case m s of 
282     (# new_s, a #) -> unSTM (k a) new_s
283   )
284
285 thenSTM :: STM a -> STM b -> STM b
286 thenSTM (STM m) k = STM ( \s ->
287   case m s of 
288     (# new_s, a #) -> unSTM k new_s
289   )
290
291 returnSTM :: a -> STM a
292 returnSTM x = STM (\s -> (# s, x #))
293
294 -- | Unsafely performs IO in the STM monad.
295 unsafeIOToSTM :: IO a -> STM a
296 unsafeIOToSTM (IO m) = STM m
297
298 -- |Perform a series of STM actions atomically.
299 atomically :: STM a -> IO a
300 atomically (STM m) = IO (\s -> (atomically# m) s )
301
302 -- |Retry execution of the current memory transaction because it has seen
303 -- values in TVars which mean that it should not continue (e.g. the TVars
304 -- represent a shared buffer that is now empty).  The implementation may
305 -- block the thread until one of the TVars that it has read from has been
306 -- udpated.
307 retry :: STM a
308 retry = STM $ \s# -> retry# s#
309
310 -- |Compose two alternative STM actions.  If the first action completes without
311 -- retrying then it forms the result of the orElse.  Otherwise, if the first
312 -- action retries, then the second action is tried in its place.  If both actions
313 -- retry then the orElse as a whole retries.
314 orElse :: STM a -> STM a -> STM a
315 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
316
317 -- |Exception handling within STM actions.
318 catchSTM :: STM a -> (Exception -> STM a) -> STM a
319 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
320
321 data TVar a = TVar (TVar# RealWorld a) deriving( Typeable )
322
323 instance Eq (TVar a) where
324         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
325
326 -- |Create a new TVar holding a value supplied
327 newTVar :: a -> STM (TVar a)
328 newTVar val = STM $ \s1# ->
329     case newTVar# val s1# of
330          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
331
332 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
333 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
334 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
335 -- possible.
336 newTVarIO :: a -> IO (TVar a)
337 newTVarIO val = IO $ \s1# ->
338     case newTVar# val s1# of
339          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
340
341 -- |Return the current value stored in a TVar
342 readTVar :: TVar a -> STM a
343 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
344
345 -- |Write the supplied value into a TVar
346 writeTVar :: TVar a -> a -> STM ()
347 writeTVar (TVar tvar#) val = STM $ \s1# ->
348     case writeTVar# tvar# val s1# of
349          s2# -> (# s2#, () #)
350   
351 \end{code}
352
353 %************************************************************************
354 %*                                                                      *
355 \subsection[mvars]{M-Structures}
356 %*                                                                      *
357 %************************************************************************
358
359 M-Vars are rendezvous points for concurrent threads.  They begin
360 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
361 is written, a single blocked thread may be freed.  Reading an M-Var
362 toggles its state from full back to empty.  Therefore, any value
363 written to an M-Var may only be read once.  Multiple reads and writes
364 are allowed, but there must be at least one read between any two
365 writes.
366
367 \begin{code}
368 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
369
370 -- |Create an 'MVar' which is initially empty.
371 newEmptyMVar  :: IO (MVar a)
372 newEmptyMVar = IO $ \ s# ->
373     case newMVar# s# of
374          (# s2#, svar# #) -> (# s2#, MVar svar# #)
375
376 -- |Create an 'MVar' which contains the supplied value.
377 newMVar :: a -> IO (MVar a)
378 newMVar value =
379     newEmptyMVar        >>= \ mvar ->
380     putMVar mvar value  >>
381     return mvar
382
383 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
384 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
385 -- the 'MVar' is left empty.
386 -- 
387 -- There are two further important properties of 'takeMVar':
388 --
389 --   * 'takeMVar' is single-wakeup.  That is, if there are multiple
390 --     threads blocked in 'takeMVar', and the 'MVar' becomes full,
391 --     only one thread will be woken up.  The runtime guarantees that
392 --     the woken thread completes its 'takeMVar' operation.
393 --
394 --   * When multiple threads are blocked on an 'MVar', they are
395 --     woken up in FIFO order.  This is useful for providing
396 --     fairness properties of abstractions built using 'MVar's.
397 --
398 takeMVar :: MVar a -> IO a
399 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
400
401 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
402 -- 'putMVar' will wait until it becomes empty.
403 --
404 -- There are two further important properties of 'putMVar':
405 --
406 --   * 'putMVar' is single-wakeup.  That is, if there are multiple
407 --     threads blocked in 'putMVar', and the 'MVar' becomes empty,
408 --     only one thread will be woken up.  The runtime guarantees that
409 --     the woken thread completes its 'putMVar' operation.
410 --
411 --   * When multiple threads are blocked on an 'MVar', they are
412 --     woken up in FIFO order.  This is useful for providing
413 --     fairness properties of abstractions built using 'MVar's.
414 --
415 putMVar  :: MVar a -> a -> IO ()
416 putMVar (MVar mvar#) x = IO $ \ s# ->
417     case putMVar# mvar# x s# of
418         s2# -> (# s2#, () #)
419
420 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
421 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
422 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
423 -- the 'MVar' is left empty.
424 tryTakeMVar :: MVar a -> IO (Maybe a)
425 tryTakeMVar (MVar m) = IO $ \ s ->
426     case tryTakeMVar# m s of
427         (# s, 0#, _ #) -> (# s, Nothing #)      -- MVar is empty
428         (# s, _,  a #) -> (# s, Just a  #)      -- MVar is full
429
430 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
431 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
432 -- it was successful, or 'False' otherwise.
433 tryPutMVar  :: MVar a -> a -> IO Bool
434 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
435     case tryPutMVar# mvar# x s# of
436         (# s, 0# #) -> (# s, False #)
437         (# s, _  #) -> (# s, True #)
438
439 -- |Check whether a given 'MVar' is empty.
440 --
441 -- Notice that the boolean value returned  is just a snapshot of
442 -- the state of the MVar. By the time you get to react on its result,
443 -- the MVar may have been filled (or emptied) - so be extremely
444 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
445 isEmptyMVar :: MVar a -> IO Bool
446 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
447     case isEmptyMVar# mv# s# of
448         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
449
450 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
451 -- "System.Mem.Weak" for more about finalizers.
452 addMVarFinalizer :: MVar a -> IO () -> IO ()
453 addMVarFinalizer (MVar m) finalizer = 
454   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
455 \end{code}
456
457
458 %************************************************************************
459 %*                                                                      *
460 \subsection{Thread waiting}
461 %*                                                                      *
462 %************************************************************************
463
464 \begin{code}
465 #ifdef mingw32_HOST_OS
466
467 -- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
468 -- on Win32, but left in there because lib code (still) uses them (the manner
469 -- in which they're used doesn't cause problems on a Win32 platform though.)
470
471 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
472 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
473   IO $ \s -> case asyncRead# fd isSock len buf s of 
474                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
475
476 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
477 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
478   IO $ \s -> case asyncWrite# fd isSock len buf s of 
479                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
480
481 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
482 asyncDoProc (FunPtr proc) (Ptr param) = 
483     -- the 'length' value is ignored; simplifies implementation of
484     -- the async*# primops to have them all return the same result.
485   IO $ \s -> case asyncDoProc# proc param s  of 
486                (# s, len#, err# #) -> (# s, I# err# #)
487
488 -- to aid the use of these primops by the IO Handle implementation,
489 -- provide the following convenience funs:
490
491 -- this better be a pinned byte array!
492 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
493 asyncReadBA fd isSock len off bufB = 
494   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
495   
496 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
497 asyncWriteBA fd isSock len off bufB = 
498   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
499
500 #endif
501
502 -- -----------------------------------------------------------------------------
503 -- Thread IO API
504
505 -- | Block the current thread until data is available to read on the
506 -- given file descriptor (GHC only).
507 threadWaitRead :: Fd -> IO ()
508 threadWaitRead fd
509 #ifndef mingw32_HOST_OS
510   | threaded  = waitForReadEvent fd
511 #endif
512   | otherwise = IO $ \s -> 
513         case fromIntegral fd of { I# fd# ->
514         case waitRead# fd# s of { s -> (# s, () #)
515         }}
516
517 -- | Block the current thread until data can be written to the
518 -- given file descriptor (GHC only).
519 threadWaitWrite :: Fd -> IO ()
520 threadWaitWrite fd
521 #ifndef mingw32_HOST_OS
522   | threaded  = waitForWriteEvent fd
523 #endif
524   | otherwise = IO $ \s -> 
525         case fromIntegral fd of { I# fd# ->
526         case waitWrite# fd# s of { s -> (# s, () #)
527         }}
528
529 -- | Suspends the current thread for a given number of microseconds
530 -- (GHC only).
531 --
532 -- Note that the resolution used by the Haskell runtime system's
533 -- internal timer is 1\/50 second, and 'threadDelay' will round its
534 -- argument up to the nearest multiple of this resolution.
535 --
536 -- There is no guarantee that the thread will be rescheduled promptly
537 -- when the delay has expired, but the thread will never continue to
538 -- run /earlier/ than specified.
539 --
540 threadDelay :: Int -> IO ()
541 threadDelay time
542 #ifndef mingw32_HOST_OS
543   | threaded  = waitForDelayEvent time
544 #else
545   | threaded  = c_Sleep (fromIntegral (time `quot` 1000))
546 #endif
547   | otherwise = IO $ \s -> 
548         case fromIntegral time of { I# time# ->
549         case delay# time# s of { s -> (# s, () #)
550         }}
551
552 registerDelay usecs 
553 #ifndef mingw32_HOST_OS
554   | threaded = waitForDelayEventSTM usecs
555   | otherwise = error "registerDelay: requires -threaded"
556 #else
557   = error "registerDelay: not currently supported on Windows"
558 #endif
559
560 -- On Windows, we just make a safe call to 'Sleep' to implement threadDelay.
561 #ifdef mingw32_HOST_OS
562 foreign import stdcall safe "Sleep" c_Sleep :: CInt -> IO ()
563 #endif
564
565 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
566
567 -- ----------------------------------------------------------------------------
568 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
569
570 -- In the threaded RTS, we employ a single IO Manager thread to wait
571 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
572 -- and delays (threadDelay).  
573 --
574 -- We can do this because in the threaded RTS the IO Manager can make
575 -- a non-blocking call to select(), so we don't have to do select() in
576 -- the scheduler as we have to in the non-threaded RTS.  We get performance
577 -- benefits from doing it this way, because we only have to restart the select()
578 -- when a new request arrives, rather than doing one select() each time
579 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
580 -- by not having to check for completed IO requests.
581
582 -- Issues, possible problems:
583 --
584 --      - we might want bound threads to just do the blocking
585 --        operation rather than communicating with the IO manager
586 --        thread.  This would prevent simgle-threaded programs which do
587 --        IO from requiring multiple OS threads.  However, it would also
588 --        prevent bound threads waiting on IO from being killed or sent
589 --        exceptions.
590 --
591 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
592 --        I couldn't repeat this.
593 --
594 --      - How do we handle signal delivery in the multithreaded RTS?
595 --
596 --      - forkProcess will kill the IO manager thread.  Let's just
597 --        hope we don't need to do any blocking IO between fork & exec.
598
599 #ifndef mingw32_HOST_OS
600
601 data IOReq
602   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
603   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
604
605 data DelayReq
606   = Delay    {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
607   | DelaySTM {-# UNPACK #-} !Int {-# UNPACK #-} !(TVar Bool)
608
609 pendingEvents :: IORef [IOReq]
610 pendingDelays :: IORef [DelayReq]
611         -- could use a strict list or array here
612 {-# NOINLINE pendingEvents #-}
613 {-# NOINLINE pendingDelays #-}
614 (pendingEvents,pendingDelays) = unsafePerformIO $ do
615   startIOManagerThread
616   reqs <- newIORef []
617   dels <- newIORef []
618   return (reqs, dels)
619         -- the first time we schedule an IO request, the service thread
620         -- will be created (cool, huh?)
621
622 ensureIOManagerIsRunning :: IO ()
623 ensureIOManagerIsRunning 
624   | threaded  = seq pendingEvents $ return ()
625   | otherwise = return ()
626
627 startIOManagerThread :: IO ()
628 startIOManagerThread = do
629         allocaArray 2 $ \fds -> do
630         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
631         rd_end <- peekElemOff fds 0
632         wr_end <- peekElemOff fds 1
633         writeIORef stick (fromIntegral wr_end)
634         c_setIOManagerPipe wr_end
635         forkIO $ do
636             allocaBytes sizeofFdSet   $ \readfds -> do
637             allocaBytes sizeofFdSet   $ \writefds -> do 
638             allocaBytes sizeofTimeVal $ \timeval -> do
639             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
640         return ()
641
642 service_loop
643    :: Fd                -- listen to this for wakeup calls
644    -> Ptr CFdSet
645    -> Ptr CFdSet
646    -> Ptr CTimeVal
647    -> [IOReq]
648    -> [DelayReq]
649    -> IO ()
650 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
651
652   -- pick up new IO requests
653   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
654   let reqs = new_reqs ++ old_reqs
655
656   -- pick up new delay requests
657   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
658   let  delays = foldr insertDelay old_delays new_delays
659
660   -- build the FDSets for select()
661   fdZero readfds
662   fdZero writefds
663   fdSet wakeup readfds
664   maxfd <- buildFdSets 0 readfds writefds reqs
665
666   -- perform the select()
667   let do_select delays = do
668           -- check the current time and wake up any thread in
669           -- threadDelay whose timeout has expired.  Also find the
670           -- timeout value for the select() call.
671           now <- getTicksOfDay
672           (delays', timeout) <- getDelay now ptimeval delays
673
674           res <- c_select ((max wakeup maxfd)+1) readfds writefds 
675                         nullPtr timeout
676           if (res == -1)
677              then do
678                 err <- getErrno
679                 case err of
680                   _ | err == eINTR ->  do_select delays'
681                         -- EINTR: just redo the select()
682                   _ | err == eBADF ->  return (True, delays)
683                         -- EBADF: one of the file descriptors is closed or bad,
684                         -- we don't know which one, so wake everyone up.
685                   _ | otherwise    ->  throwErrno "select"
686                         -- otherwise (ENOMEM or EINVAL) something has gone
687                         -- wrong; report the error.
688              else
689                 return (False,delays')
690
691   (wakeup_all,delays') <- do_select delays
692
693   if wakeup_all then return ()
694     else do
695       b <- fdIsSet wakeup readfds
696       if b == 0 
697         then return ()
698         else alloca $ \p -> do 
699             c_read (fromIntegral wakeup) p 1; return ()
700             s <- peek p         
701             if (s == 0xff) 
702               then return ()
703               else do handler_tbl <- peek handlers
704                       sp <- peekElemOff handler_tbl (fromIntegral s)
705                       forkIO (do io <- deRefStablePtr sp; io)
706                       return ()
707
708   takeMVar prodding
709   putMVar prodding False
710
711   reqs' <- if wakeup_all then do wakeupAll reqs; return []
712                          else completeRequests reqs readfds writefds []
713
714   service_loop wakeup readfds writefds ptimeval reqs' delays'
715
716 stick :: IORef Fd
717 {-# NOINLINE stick #-}
718 stick = unsafePerformIO (newIORef 0)
719
720 prodding :: MVar Bool
721 {-# NOINLINE prodding #-}
722 prodding = unsafePerformIO (newMVar False)
723
724 prodServiceThread :: IO ()
725 prodServiceThread = do
726   b <- takeMVar prodding
727   if (not b) 
728     then do fd <- readIORef stick
729             with 0xff $ \pbuf -> do c_write (fromIntegral fd) pbuf 1; return ()
730     else return ()
731   putMVar prodding True
732
733 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
734
735 foreign import ccall "setIOManagerPipe"
736   c_setIOManagerPipe :: CInt -> IO ()
737
738 -- -----------------------------------------------------------------------------
739 -- IO requests
740
741 buildFdSets maxfd readfds writefds [] = return maxfd
742 buildFdSets maxfd readfds writefds (Read fd m : reqs)
743   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
744   | otherwise        =  do
745         fdSet fd readfds
746         buildFdSets (max maxfd fd) readfds writefds reqs
747 buildFdSets maxfd readfds writefds (Write fd m : reqs)
748   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
749   | otherwise        =  do
750         fdSet fd writefds
751         buildFdSets (max maxfd fd) readfds writefds reqs
752
753 completeRequests [] _ _ reqs' = return reqs'
754 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
755   b <- fdIsSet fd readfds
756   if b /= 0
757     then do putMVar m (); completeRequests reqs readfds writefds reqs'
758     else completeRequests reqs readfds writefds (Read fd m : reqs')
759 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
760   b <- fdIsSet fd writefds
761   if b /= 0
762     then do putMVar m (); completeRequests reqs readfds writefds reqs'
763     else completeRequests reqs readfds writefds (Write fd m : reqs')
764
765 wakeupAll [] = return ()
766 wakeupAll (Read  fd m : reqs) = do putMVar m (); wakeupAll reqs
767 wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs
768
769 waitForReadEvent :: Fd -> IO ()
770 waitForReadEvent fd = do
771   m <- newEmptyMVar
772   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
773   prodServiceThread
774   takeMVar m
775
776 waitForWriteEvent :: Fd -> IO ()
777 waitForWriteEvent fd = do
778   m <- newEmptyMVar
779   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
780   prodServiceThread
781   takeMVar m
782
783 -- XXX: move into GHC.IOBase from Data.IORef?
784 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
785 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
786
787 -- -----------------------------------------------------------------------------
788 -- Delays
789
790 waitForDelayEvent :: Int -> IO ()
791 waitForDelayEvent usecs = do
792   m <- newEmptyMVar
793   now <- getTicksOfDay
794   let target = now + usecs `quot` tick_usecs
795   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
796   prodServiceThread
797   takeMVar m
798
799 -- Delays for use in STM
800 waitForDelayEventSTM :: Int -> IO (TVar Bool)
801 waitForDelayEventSTM usecs = do
802    t <- atomically $ newTVar False
803    now <- getTicksOfDay
804    let target = now + usecs `quot` tick_usecs
805    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
806    prodServiceThread
807    return t  
808     
809 -- Walk the queue of pending delays, waking up any that have passed
810 -- and return the smallest delay to wait for.  The queue of pending
811 -- delays is kept ordered.
812 getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
813 getDelay now ptimeval [] = return ([],nullPtr)
814 getDelay now ptimeval all@(d : rest) 
815   = case d of
816      Delay time m | now >= time -> do
817         putMVar m ()
818         getDelay now ptimeval rest
819      DelaySTM time t | now >= time -> do
820         atomically $ writeTVar t True
821         getDelay now ptimeval rest
822      _otherwise -> do
823         setTimevalTicks ptimeval (delayTime d - now)
824         return (all,ptimeval)
825
826 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
827 insertDelay d [] = [d]
828 insertDelay d1 ds@(d2 : rest)
829   | delayTime d1 <= delayTime d2 = d1 : ds
830   | otherwise                    = d2 : insertDelay d1 rest
831
832 delayTime (Delay t _) = t
833 delayTime (DelaySTM t _) = t
834
835 type Ticks = Int
836 tick_freq  = 50 :: Ticks  -- accuracy of threadDelay (ticks per sec)
837 tick_usecs = 1000000 `quot` tick_freq :: Int
838
839 newtype CTimeVal = CTimeVal ()
840
841 foreign import ccall unsafe "sizeofTimeVal"
842   sizeofTimeVal :: Int
843
844 foreign import ccall unsafe "getTicksOfDay" 
845   getTicksOfDay :: IO Ticks
846
847 foreign import ccall unsafe "setTimevalTicks" 
848   setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO ()
849
850 -- ----------------------------------------------------------------------------
851 -- select() interface
852
853 -- ToDo: move to System.Posix.Internals?
854
855 newtype CFdSet = CFdSet ()
856
857 foreign import ccall safe "select"
858   c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
859            -> IO CInt
860
861 foreign import ccall unsafe "hsFD_SETSIZE"
862   fD_SETSIZE :: Fd
863
864 foreign import ccall unsafe "hsFD_CLR"
865   fdClr :: Fd -> Ptr CFdSet -> IO ()
866
867 foreign import ccall unsafe "hsFD_ISSET"
868   fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
869
870 foreign import ccall unsafe "hsFD_SET"
871   fdSet :: Fd -> Ptr CFdSet -> IO ()
872
873 foreign import ccall unsafe "hsFD_ZERO"
874   fdZero :: Ptr CFdSet -> IO ()
875
876 foreign import ccall unsafe "sizeof_fd_set"
877   sizeofFdSet :: Int
878
879 #endif
880 \end{code}