add forkOnIO :: Int -> IO () -> IO ThreadId
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -fno-implicit-prelude #-}
3 -----------------------------------------------------------------------------
4 -- |
5 -- Module      :  GHC.Conc
6 -- Copyright   :  (c) The University of Glasgow, 1994-2002
7 -- License     :  see libraries/base/LICENSE
8 -- 
9 -- Maintainer  :  cvs-ghc@haskell.org
10 -- Stability   :  internal
11 -- Portability :  non-portable (GHC extensions)
12 --
13 -- Basic concurrency stuff.
14 -- 
15 -----------------------------------------------------------------------------
16
17 -- No: #hide, because bits of this module are exposed by the stm package.
18 -- However, we don't want this module to be the home location for the
19 -- bits it exports, we'd rather have Control.Concurrent and the other
20 -- higher level modules be the home.  Hence:
21
22 -- #not-home
23 module GHC.Conc
24         ( ThreadId(..)
25
26         -- Forking and suchlike
27         , forkIO        -- :: IO a -> IO ThreadId
28         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
29         , childHandler  -- :: Exception -> IO ()
30         , myThreadId    -- :: IO ThreadId
31         , killThread    -- :: ThreadId -> IO ()
32         , throwTo       -- :: ThreadId -> Exception -> IO ()
33         , par           -- :: a -> b -> b
34         , pseq          -- :: a -> b -> b
35         , yield         -- :: IO ()
36         , labelThread   -- :: ThreadId -> String -> IO ()
37
38         -- Waiting
39         , threadDelay           -- :: Int -> IO ()
40         , registerDelay         -- :: Int -> IO (TVar Bool)
41         , threadWaitRead        -- :: Int -> IO ()
42         , threadWaitWrite       -- :: Int -> IO ()
43
44         -- MVars
45         , MVar          -- abstract
46         , newMVar       -- :: a -> IO (MVar a)
47         , newEmptyMVar  -- :: IO (MVar a)
48         , takeMVar      -- :: MVar a -> IO a
49         , putMVar       -- :: MVar a -> a -> IO ()
50         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
51         , tryPutMVar    -- :: MVar a -> a -> IO Bool
52         , isEmptyMVar   -- :: MVar a -> IO Bool
53         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
54
55         -- TVars
56         , STM           -- abstract
57         , atomically    -- :: STM a -> IO a
58         , retry         -- :: STM a
59         , orElse        -- :: STM a -> STM a -> STM a
60         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
61         , TVar          -- abstract
62         , newTVar       -- :: a -> STM (TVar a)
63         , newTVarIO     -- :: a -> STM (TVar a)
64         , readTVar      -- :: TVar a -> STM a
65         , writeTVar     -- :: a -> TVar a -> STM ()
66         , unsafeIOToSTM -- :: IO a -> STM a
67
68 #ifdef mingw32_HOST_OS
69         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
70         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
71         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
72
73         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
74         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
75 #endif
76
77 #ifndef mingw32_HOST_OS
78         , ensureIOManagerIsRunning
79 #endif
80         ) where
81
82 import System.Posix.Types
83 import System.Posix.Internals
84 import Foreign
85 import Foreign.C
86
87 #ifndef __HADDOCK__
88 import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
89 #endif
90
91 import Data.Maybe
92
93 import GHC.Base
94 import GHC.IOBase
95 import GHC.Num          ( Num(..) )
96 import GHC.Real         ( fromIntegral, quot )
97 import GHC.Base         ( Int(..) )
98 import GHC.Exception    ( catchException, Exception(..), AsyncException(..) )
99 import GHC.Pack         ( packCString# )
100 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
101 import GHC.STRef
102 import Data.Typeable
103
104 infixr 0 `par`, `pseq`
105 \end{code}
106
107 %************************************************************************
108 %*                                                                      *
109 \subsection{@ThreadId@, @par@, and @fork@}
110 %*                                                                      *
111 %************************************************************************
112
113 \begin{code}
114 data ThreadId = ThreadId ThreadId# deriving( Typeable )
115 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
116 -- But since ThreadId# is unlifted, the Weak type must use open
117 -- type variables.
118 {- ^
119 A 'ThreadId' is an abstract type representing a handle to a thread.
120 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
121 the 'Ord' instance implements an arbitrary total ordering over
122 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
123 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
124 useful when debugging or diagnosing the behaviour of a concurrent
125 program.
126
127 /Note/: in GHC, if you have a 'ThreadId', you essentially have
128 a pointer to the thread itself.  This means the thread itself can\'t be
129 garbage collected until you drop the 'ThreadId'.
130 This misfeature will hopefully be corrected at a later date.
131
132 /Note/: Hugs does not provide any operations on other threads;
133 it defines 'ThreadId' as a synonym for ().
134 -}
135
136 {- |
137 This sparks off a new thread to run the 'IO' computation passed as the
138 first argument, and returns the 'ThreadId' of the newly created
139 thread.
140
141 The new thread will be a lightweight thread; if you want to use a foreign
142 library that uses thread-local storage, use 'forkOS' instead.
143 -}
144 forkIO :: IO () -> IO ThreadId
145 forkIO action = IO $ \ s -> 
146    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
147  where
148   action_plus = catchException action childHandler
149
150 forkOnIO :: Int -> IO () -> IO ThreadId
151 forkOnIO (I# cpu) action = IO $ \ s -> 
152    case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
153  where
154   action_plus = catchException action childHandler
155
156 childHandler :: Exception -> IO ()
157 childHandler err = catchException (real_handler err) childHandler
158
159 real_handler :: Exception -> IO ()
160 real_handler ex =
161   case ex of
162         -- ignore thread GC and killThread exceptions:
163         BlockedOnDeadMVar            -> return ()
164         BlockedIndefinitely          -> return ()
165         AsyncException ThreadKilled  -> return ()
166
167         -- report all others:
168         AsyncException StackOverflow -> reportStackOverflow
169         other       -> reportError other
170
171 {- | 'killThread' terminates the given thread (GHC only).
172 Any work already done by the thread isn\'t
173 lost: the computation is suspended until required by another thread.
174 The memory used by the thread will be garbage collected if it isn\'t
175 referenced from anywhere.  The 'killThread' function is defined in
176 terms of 'throwTo':
177
178 > killThread tid = throwTo tid (AsyncException ThreadKilled)
179
180 -}
181 killThread :: ThreadId -> IO ()
182 killThread tid = throwTo tid (AsyncException ThreadKilled)
183
184 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
185
186 'throwTo' does not return until the exception has been raised in the
187 target thread.  The calling thread can thus be certain that the target
188 thread has received the exception.  This is a useful property to know
189 when dealing with race conditions: eg. if there are two threads that
190 can kill each other, it is guaranteed that only one of the threads
191 will get to kill the other.
192
193 If the target thread is currently making a foreign call, then the
194 exception will not be raised (and hence 'throwTo' will not return)
195 until the call has completed.  This is the case regardless of whether
196 the call is inside a 'block' or not.
197  -}
198 throwTo :: ThreadId -> Exception -> IO ()
199 throwTo (ThreadId id) ex = IO $ \ s ->
200    case (killThread# id ex s) of s1 -> (# s1, () #)
201
202 -- | Returns the 'ThreadId' of the calling thread (GHC only).
203 myThreadId :: IO ThreadId
204 myThreadId = IO $ \s ->
205    case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #)
206
207
208 -- |The 'yield' action allows (forces, in a co-operative multitasking
209 -- implementation) a context-switch to any other currently runnable
210 -- threads (if any), and is occasionally useful when implementing
211 -- concurrency abstractions.
212 yield :: IO ()
213 yield = IO $ \s -> 
214    case (yield# s) of s1 -> (# s1, () #)
215
216 {- | 'labelThread' stores a string as identifier for this thread if
217 you built a RTS with debugging support. This identifier will be used in
218 the debugging output to make distinction of different threads easier
219 (otherwise you only have the thread state object\'s address in the heap).
220
221 Other applications like the graphical Concurrent Haskell Debugger
222 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
223 'labelThread' for their purposes as well.
224 -}
225
226 labelThread :: ThreadId -> String -> IO ()
227 labelThread (ThreadId t) str = IO $ \ s ->
228    let ps  = packCString# str
229        adr = byteArrayContents# ps in
230      case (labelThread# t adr s) of s1 -> (# s1, () #)
231
232 --      Nota Bene: 'pseq' used to be 'seq'
233 --                 but 'seq' is now defined in PrelGHC
234 --
235 -- "pseq" is defined a bit weirdly (see below)
236 --
237 -- The reason for the strange "lazy" call is that
238 -- it fools the compiler into thinking that pseq  and par are non-strict in
239 -- their second argument (even if it inlines pseq at the call site).
240 -- If it thinks pseq is strict in "y", then it often evaluates
241 -- "y" before "x", which is totally wrong.  
242
243 {-# INLINE pseq  #-}
244 pseq :: a -> b -> b
245 pseq  x y = x `seq` lazy y
246
247 {-# INLINE par  #-}
248 par :: a -> b -> b
249 par  x y = case (par# x) of { _ -> lazy y }
250 \end{code}
251
252
253 %************************************************************************
254 %*                                                                      *
255 \subsection[stm]{Transactional heap operations}
256 %*                                                                      *
257 %************************************************************************
258
259 TVars are shared memory locations which support atomic memory
260 transactions.
261
262 \begin{code}
263 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) deriving( Typeable )
264
265 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
266 unSTM (STM a) = a
267
268 instance  Functor STM where
269    fmap f x = x >>= (return . f)
270
271 instance  Monad STM  where
272     {-# INLINE return #-}
273     {-# INLINE (>>)   #-}
274     {-# INLINE (>>=)  #-}
275     m >> k      = thenSTM m k
276     return x    = returnSTM x
277     m >>= k     = bindSTM m k
278
279 bindSTM :: STM a -> (a -> STM b) -> STM b
280 bindSTM (STM m) k = STM ( \s ->
281   case m s of 
282     (# new_s, a #) -> unSTM (k a) new_s
283   )
284
285 thenSTM :: STM a -> STM b -> STM b
286 thenSTM (STM m) k = STM ( \s ->
287   case m s of 
288     (# new_s, a #) -> unSTM k new_s
289   )
290
291 returnSTM :: a -> STM a
292 returnSTM x = STM (\s -> (# s, x #))
293
294 -- | Unsafely performs IO in the STM monad.
295 unsafeIOToSTM :: IO a -> STM a
296 unsafeIOToSTM (IO m) = STM m
297
298 -- |Perform a series of STM actions atomically.
299 atomically :: STM a -> IO a
300 atomically (STM m) = IO (\s -> (atomically# m) s )
301
302 -- |Retry execution of the current memory transaction because it has seen
303 -- values in TVars which mean that it should not continue (e.g. the TVars
304 -- represent a shared buffer that is now empty).  The implementation may
305 -- block the thread until one of the TVars that it has read from has been
306 -- udpated.
307 retry :: STM a
308 retry = STM $ \s# -> retry# s#
309
310 -- |Compose two alternative STM actions.  If the first action completes without
311 -- retrying then it forms the result of the orElse.  Otherwise, if the first
312 -- action retries, then the second action is tried in its place.  If both actions
313 -- retry then the orElse as a whole retries.
314 orElse :: STM a -> STM a -> STM a
315 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
316
317 -- |Exception handling within STM actions.
318 catchSTM :: STM a -> (Exception -> STM a) -> STM a
319 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
320
321 data TVar a = TVar (TVar# RealWorld a) deriving( Typeable )
322
323 instance Eq (TVar a) where
324         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
325
326 -- |Create a new TVar holding a value supplied
327 newTVar :: a -> STM (TVar a)
328 newTVar val = STM $ \s1# ->
329     case newTVar# val s1# of
330          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
331
332 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
333 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
334 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
335 -- possible.
336 newTVarIO :: a -> IO (TVar a)
337 newTVarIO val = IO $ \s1# ->
338     case newTVar# val s1# of
339          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
340
341 -- |Return the current value stored in a TVar
342 readTVar :: TVar a -> STM a
343 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
344
345 -- |Write the supplied value into a TVar
346 writeTVar :: TVar a -> a -> STM ()
347 writeTVar (TVar tvar#) val = STM $ \s1# ->
348     case writeTVar# tvar# val s1# of
349          s2# -> (# s2#, () #)
350   
351 \end{code}
352
353 %************************************************************************
354 %*                                                                      *
355 \subsection[mvars]{M-Structures}
356 %*                                                                      *
357 %************************************************************************
358
359 M-Vars are rendezvous points for concurrent threads.  They begin
360 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
361 is written, a single blocked thread may be freed.  Reading an M-Var
362 toggles its state from full back to empty.  Therefore, any value
363 written to an M-Var may only be read once.  Multiple reads and writes
364 are allowed, but there must be at least one read between any two
365 writes.
366
367 \begin{code}
368 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
369
370 -- |Create an 'MVar' which is initially empty.
371 newEmptyMVar  :: IO (MVar a)
372 newEmptyMVar = IO $ \ s# ->
373     case newMVar# s# of
374          (# s2#, svar# #) -> (# s2#, MVar svar# #)
375
376 -- |Create an 'MVar' which contains the supplied value.
377 newMVar :: a -> IO (MVar a)
378 newMVar value =
379     newEmptyMVar        >>= \ mvar ->
380     putMVar mvar value  >>
381     return mvar
382
383 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
384 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
385 -- the 'MVar' is left empty.
386 -- 
387 -- There are two further important properties of 'takeMVar':
388 --
389 --   * 'takeMVar' is single-wakeup.  That is, if there are multiple
390 --     threads blocked in 'takeMVar', and the 'MVar' becomes full,
391 --     only one thread will be woken up.  The runtime guarantees that
392 --     the woken thread completes its 'takeMVar' operation.
393 --
394 --   * When multiple threads are blocked on an 'MVar', they are
395 --     woken up in FIFO order.  This is useful for providing
396 --     fairness properties of abstractions built using 'MVar's.
397 --
398 takeMVar :: MVar a -> IO a
399 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
400
401 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
402 -- 'putMVar' will wait until it becomes empty.
403 --
404 -- There are two further important properties of 'putMVar':
405 --
406 --   * 'putMVar' is single-wakeup.  That is, if there are multiple
407 --     threads blocked in 'putMVar', and the 'MVar' becomes empty,
408 --     only one thread will be woken up.  The runtime guarantees that
409 --     the woken thread completes its 'putMVar' operation.
410 --
411 --   * When multiple threads are blocked on an 'MVar', they are
412 --     woken up in FIFO order.  This is useful for providing
413 --     fairness properties of abstractions built using 'MVar's.
414 --
415 putMVar  :: MVar a -> a -> IO ()
416 putMVar (MVar mvar#) x = IO $ \ s# ->
417     case putMVar# mvar# x s# of
418         s2# -> (# s2#, () #)
419
420 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
421 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
422 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
423 -- the 'MVar' is left empty.
424 tryTakeMVar :: MVar a -> IO (Maybe a)
425 tryTakeMVar (MVar m) = IO $ \ s ->
426     case tryTakeMVar# m s of
427         (# s, 0#, _ #) -> (# s, Nothing #)      -- MVar is empty
428         (# s, _,  a #) -> (# s, Just a  #)      -- MVar is full
429
430 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
431 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
432 -- it was successful, or 'False' otherwise.
433 tryPutMVar  :: MVar a -> a -> IO Bool
434 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
435     case tryPutMVar# mvar# x s# of
436         (# s, 0# #) -> (# s, False #)
437         (# s, _  #) -> (# s, True #)
438
439 -- |Check whether a given 'MVar' is empty.
440 --
441 -- Notice that the boolean value returned  is just a snapshot of
442 -- the state of the MVar. By the time you get to react on its result,
443 -- the MVar may have been filled (or emptied) - so be extremely
444 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
445 isEmptyMVar :: MVar a -> IO Bool
446 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
447     case isEmptyMVar# mv# s# of
448         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
449
450 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
451 -- "System.Mem.Weak" for more about finalizers.
452 addMVarFinalizer :: MVar a -> IO () -> IO ()
453 addMVarFinalizer (MVar m) finalizer = 
454   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
455 \end{code}
456
457
458 %************************************************************************
459 %*                                                                      *
460 \subsection{Thread waiting}
461 %*                                                                      *
462 %************************************************************************
463
464 \begin{code}
465 #ifdef mingw32_HOST_OS
466
467 -- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
468 -- on Win32, but left in there because lib code (still) uses them (the manner
469 -- in which they're used doesn't cause problems on a Win32 platform though.)
470
471 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
472 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
473   IO $ \s -> case asyncRead# fd isSock len buf s of 
474                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
475
476 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
477 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
478   IO $ \s -> case asyncWrite# fd isSock len buf s of 
479                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
480
481 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
482 asyncDoProc (FunPtr proc) (Ptr param) = 
483     -- the 'length' value is ignored; simplifies implementation of
484     -- the async*# primops to have them all return the same result.
485   IO $ \s -> case asyncDoProc# proc param s  of 
486                (# s, len#, err# #) -> (# s, I# err# #)
487
488 -- to aid the use of these primops by the IO Handle implementation,
489 -- provide the following convenience funs:
490
491 -- this better be a pinned byte array!
492 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
493 asyncReadBA fd isSock len off bufB = 
494   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
495   
496 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
497 asyncWriteBA fd isSock len off bufB = 
498   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
499
500 #endif
501
502 -- -----------------------------------------------------------------------------
503 -- Thread IO API
504
505 -- | Block the current thread until data is available to read on the
506 -- given file descriptor (GHC only).
507 threadWaitRead :: Fd -> IO ()
508 threadWaitRead fd
509 #ifndef mingw32_HOST_OS
510   | threaded  = waitForReadEvent fd
511 #endif
512   | otherwise = IO $ \s -> 
513         case fromIntegral fd of { I# fd# ->
514         case waitRead# fd# s of { s -> (# s, () #)
515         }}
516
517 -- | Block the current thread until data can be written to the
518 -- given file descriptor (GHC only).
519 threadWaitWrite :: Fd -> IO ()
520 threadWaitWrite fd
521 #ifndef mingw32_HOST_OS
522   | threaded  = waitForWriteEvent fd
523 #endif
524   | otherwise = IO $ \s -> 
525         case fromIntegral fd of { I# fd# ->
526         case waitWrite# fd# s of { s -> (# s, () #)
527         }}
528
529 -- | Suspends the current thread for a given number of microseconds
530 -- (GHC only).
531 --
532 -- Note that the resolution used by the Haskell runtime system's
533 -- internal timer is 1\/50 second, and 'threadDelay' will round its
534 -- argument up to the nearest multiple of this resolution.
535 --
536 -- There is no guarantee that the thread will be rescheduled promptly
537 -- when the delay has expired, but the thread will never continue to
538 -- run /earlier/ than specified.
539 --
540 threadDelay :: Int -> IO ()
541 threadDelay time
542 #ifndef mingw32_HOST_OS
543   | threaded  = waitForDelayEvent time
544 #else
545   | threaded  = c_Sleep (fromIntegral (time `quot` 1000))
546 #endif
547   | otherwise = IO $ \s -> 
548         case fromIntegral time of { I# time# ->
549         case delay# time# s of { s -> (# s, () #)
550         }}
551
552 registerDelay usecs 
553 #ifndef mingw32_HOST_OS
554   | threaded = waitForDelayEventSTM usecs
555   | otherwise = error "registerDelay: requires -threaded"
556 #else
557   = error "registerDelay: not currently supported on Windows"
558 #endif
559
560 -- On Windows, we just make a safe call to 'Sleep' to implement threadDelay.
561 #ifdef mingw32_HOST_OS
562 foreign import stdcall safe "Sleep" c_Sleep :: CInt -> IO ()
563 #endif
564
565 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
566
567 -- ----------------------------------------------------------------------------
568 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
569
570 -- In the threaded RTS, we employ a single IO Manager thread to wait
571 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
572 -- and delays (threadDelay).  
573 --
574 -- We can do this because in the threaded RTS the IO Manager can make
575 -- a non-blocking call to select(), so we don't have to do select() in
576 -- the scheduler as we have to in the non-threaded RTS.  We get performance
577 -- benefits from doing it this way, because we only have to restart the select()
578 -- when a new request arrives, rather than doing one select() each time
579 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
580 -- by not having to check for completed IO requests.
581
582 -- Issues, possible problems:
583 --
584 --      - we might want bound threads to just do the blocking
585 --        operation rather than communicating with the IO manager
586 --        thread.  This would prevent simgle-threaded programs which do
587 --        IO from requiring multiple OS threads.  However, it would also
588 --        prevent bound threads waiting on IO from being killed or sent
589 --        exceptions.
590 --
591 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
592 --        I couldn't repeat this.
593 --
594 --      - How do we handle signal delivery in the multithreaded RTS?
595 --
596 --      - forkProcess will kill the IO manager thread.  Let's just
597 --        hope we don't need to do any blocking IO between fork & exec.
598
599 #ifndef mingw32_HOST_OS
600
601 data IOReq
602   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
603   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
604
605 data DelayReq
606   = Delay    {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
607   | DelaySTM {-# UNPACK #-} !Int {-# UNPACK #-} !(TVar Bool)
608
609 pendingEvents :: IORef [IOReq]
610 pendingDelays :: IORef [DelayReq]
611         -- could use a strict list or array here
612 {-# NOINLINE pendingEvents #-}
613 {-# NOINLINE pendingDelays #-}
614 (pendingEvents,pendingDelays) = unsafePerformIO $ do
615   startIOManagerThread
616   reqs <- newIORef []
617   dels <- newIORef []
618   return (reqs, dels)
619         -- the first time we schedule an IO request, the service thread
620         -- will be created (cool, huh?)
621
622 ensureIOManagerIsRunning :: IO ()
623 ensureIOManagerIsRunning 
624   | threaded  = seq pendingEvents $ return ()
625   | otherwise = return ()
626
627 startIOManagerThread :: IO ()
628 startIOManagerThread = do
629         allocaArray 2 $ \fds -> do
630         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
631         rd_end <- peekElemOff fds 0
632         wr_end <- peekElemOff fds 1
633         writeIORef stick (fromIntegral wr_end)
634         c_setIOManagerPipe wr_end
635         forkIO $ do
636             allocaBytes sizeofFdSet   $ \readfds -> do
637             allocaBytes sizeofFdSet   $ \writefds -> do 
638             allocaBytes sizeofTimeVal $ \timeval -> do
639             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
640         return ()
641
642 service_loop
643    :: Fd                -- listen to this for wakeup calls
644    -> Ptr CFdSet
645    -> Ptr CFdSet
646    -> Ptr CTimeVal
647    -> [IOReq]
648    -> [DelayReq]
649    -> IO ()
650 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
651
652   -- pick up new IO requests
653   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
654   let reqs = new_reqs ++ old_reqs
655
656   -- pick up new delay requests
657   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
658   let  delays = foldr insertDelay old_delays new_delays
659
660   -- build the FDSets for select()
661   fdZero readfds
662   fdZero writefds
663   fdSet wakeup readfds
664   maxfd <- buildFdSets 0 readfds writefds reqs
665
666   -- perform the select()
667   let do_select delays = do
668           -- check the current time and wake up any thread in
669           -- threadDelay whose timeout has expired.  Also find the
670           -- timeout value for the select() call.
671           now <- getTicksOfDay
672           (delays', timeout) <- getDelay now ptimeval delays
673
674           res <- c_select ((max wakeup maxfd)+1) readfds writefds 
675                         nullPtr timeout
676           if (res == -1)
677              then do
678                 err <- getErrno
679                 if err == eINTR
680                         then do_select delays'
681                         else return (res,delays')
682              else
683                 return (res,delays')
684
685   (res,delays') <- do_select delays
686   -- ToDo: check result
687
688   b <- fdIsSet wakeup readfds
689   if b == 0 
690     then return ()
691     else alloca $ \p -> do 
692             c_read (fromIntegral wakeup) p 1; return ()
693             s <- peek p         
694             if (s == 0xff) 
695               then return ()
696               else do handler_tbl <- peek handlers
697                       sp <- peekElemOff handler_tbl (fromIntegral s)
698                       forkIO (do io <- deRefStablePtr sp; io)
699                       return ()
700
701   takeMVar prodding
702   putMVar prodding False
703
704   reqs' <- completeRequests reqs readfds writefds []
705   service_loop wakeup readfds writefds ptimeval reqs' delays'
706
707 stick :: IORef Fd
708 {-# NOINLINE stick #-}
709 stick = unsafePerformIO (newIORef 0)
710
711 prodding :: MVar Bool
712 {-# NOINLINE prodding #-}
713 prodding = unsafePerformIO (newMVar False)
714
715 prodServiceThread :: IO ()
716 prodServiceThread = do
717   b <- takeMVar prodding
718   if (not b) 
719     then do fd <- readIORef stick
720             with 0xff $ \pbuf -> do c_write (fromIntegral fd) pbuf 1; return ()
721     else return ()
722   putMVar prodding True
723
724 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
725
726 foreign import ccall "setIOManagerPipe"
727   c_setIOManagerPipe :: CInt -> IO ()
728
729 -- -----------------------------------------------------------------------------
730 -- IO requests
731
732 buildFdSets maxfd readfds writefds [] = return maxfd
733 buildFdSets maxfd readfds writefds (Read fd m : reqs)
734   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
735   | otherwise        =  do
736         fdSet fd readfds
737         buildFdSets (max maxfd fd) readfds writefds reqs
738 buildFdSets maxfd readfds writefds (Write fd m : reqs)
739   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
740   | otherwise        =  do
741         fdSet fd writefds
742         buildFdSets (max maxfd fd) readfds writefds reqs
743
744 completeRequests [] _ _ reqs' = return reqs'
745 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
746   b <- fdIsSet fd readfds
747   if b /= 0
748     then do putMVar m (); completeRequests reqs readfds writefds reqs'
749     else completeRequests reqs readfds writefds (Read fd m : reqs')
750 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
751   b <- fdIsSet fd writefds
752   if b /= 0
753     then do putMVar m (); completeRequests reqs readfds writefds reqs'
754     else completeRequests reqs readfds writefds (Write fd m : reqs')
755
756 waitForReadEvent :: Fd -> IO ()
757 waitForReadEvent fd = do
758   m <- newEmptyMVar
759   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
760   prodServiceThread
761   takeMVar m
762
763 waitForWriteEvent :: Fd -> IO ()
764 waitForWriteEvent fd = do
765   m <- newEmptyMVar
766   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
767   prodServiceThread
768   takeMVar m
769
770 -- XXX: move into GHC.IOBase from Data.IORef?
771 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
772 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
773
774 -- -----------------------------------------------------------------------------
775 -- Delays
776
777 waitForDelayEvent :: Int -> IO ()
778 waitForDelayEvent usecs = do
779   m <- newEmptyMVar
780   now <- getTicksOfDay
781   let target = now + usecs `quot` tick_usecs
782   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
783   prodServiceThread
784   takeMVar m
785
786 -- Delays for use in STM
787 waitForDelayEventSTM :: Int -> IO (TVar Bool)
788 waitForDelayEventSTM usecs = do
789    t <- atomically $ newTVar False
790    now <- getTicksOfDay
791    let target = now + usecs `quot` tick_usecs
792    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
793    prodServiceThread
794    return t  
795     
796 -- Walk the queue of pending delays, waking up any that have passed
797 -- and return the smallest delay to wait for.  The queue of pending
798 -- delays is kept ordered.
799 getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
800 getDelay now ptimeval [] = return ([],nullPtr)
801 getDelay now ptimeval all@(d : rest) 
802   = case d of
803      Delay time m | now >= time -> do
804         putMVar m ()
805         getDelay now ptimeval rest
806      DelaySTM time t | now >= time -> do
807         atomically $ writeTVar t True
808         getDelay now ptimeval rest
809      _otherwise -> do
810         setTimevalTicks ptimeval (delayTime d - now)
811         return (all,ptimeval)
812
813 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
814 insertDelay d [] = [d]
815 insertDelay d1 ds@(d2 : rest)
816   | delayTime d1 <= delayTime d2 = d1 : ds
817   | otherwise                    = d2 : insertDelay d1 rest
818
819 delayTime (Delay t _) = t
820 delayTime (DelaySTM t _) = t
821
822 type Ticks = Int
823 tick_freq  = 50 :: Ticks  -- accuracy of threadDelay (ticks per sec)
824 tick_usecs = 1000000 `quot` tick_freq :: Int
825
826 newtype CTimeVal = CTimeVal ()
827
828 foreign import ccall unsafe "sizeofTimeVal"
829   sizeofTimeVal :: Int
830
831 foreign import ccall unsafe "getTicksOfDay" 
832   getTicksOfDay :: IO Ticks
833
834 foreign import ccall unsafe "setTimevalTicks" 
835   setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO ()
836
837 -- ----------------------------------------------------------------------------
838 -- select() interface
839
840 -- ToDo: move to System.Posix.Internals?
841
842 newtype CFdSet = CFdSet ()
843
844 foreign import ccall safe "select"
845   c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
846            -> IO CInt
847
848 foreign import ccall unsafe "hsFD_SETSIZE"
849   fD_SETSIZE :: Fd
850
851 foreign import ccall unsafe "hsFD_CLR"
852   fdClr :: Fd -> Ptr CFdSet -> IO ()
853
854 foreign import ccall unsafe "hsFD_ISSET"
855   fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
856
857 foreign import ccall unsafe "hsFD_SET"
858   fdSet :: Fd -> Ptr CFdSet -> IO ()
859
860 foreign import ccall unsafe "hsFD_ZERO"
861   fdZero :: Ptr CFdSet -> IO ()
862
863 foreign import ccall unsafe "sizeof_fd_set"
864   sizeofFdSet :: Int
865
866 #endif
867 \end{code}