Typeable1 instances for STM and TVar
[haskell-directory.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -fno-implicit-prelude #-}
3 -----------------------------------------------------------------------------
4 -- |
5 -- Module      :  GHC.Conc
6 -- Copyright   :  (c) The University of Glasgow, 1994-2002
7 -- License     :  see libraries/base/LICENSE
8 -- 
9 -- Maintainer  :  cvs-ghc@haskell.org
10 -- Stability   :  internal
11 -- Portability :  non-portable (GHC extensions)
12 --
13 -- Basic concurrency stuff.
14 -- 
15 -----------------------------------------------------------------------------
16
17 -- No: #hide, because bits of this module are exposed by the stm package.
18 -- However, we don't want this module to be the home location for the
19 -- bits it exports, we'd rather have Control.Concurrent and the other
20 -- higher level modules be the home.  Hence:
21
22 #include "Typeable.h"
23
24 -- #not-home
25 module GHC.Conc
26         ( ThreadId(..)
27
28         -- * Forking and suchlike
29         , forkIO        -- :: IO a -> IO ThreadId
30         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
31         , childHandler  -- :: Exception -> IO ()
32         , myThreadId    -- :: IO ThreadId
33         , killThread    -- :: ThreadId -> IO ()
34         , throwTo       -- :: ThreadId -> Exception -> IO ()
35         , par           -- :: a -> b -> b
36         , pseq          -- :: a -> b -> b
37         , yield         -- :: IO ()
38         , labelThread   -- :: ThreadId -> String -> IO ()
39
40         -- * Waiting
41         , threadDelay           -- :: Int -> IO ()
42         , registerDelay         -- :: Int -> IO (TVar Bool)
43         , threadWaitRead        -- :: Int -> IO ()
44         , threadWaitWrite       -- :: Int -> IO ()
45
46         -- * MVars
47         , MVar          -- abstract
48         , newMVar       -- :: a -> IO (MVar a)
49         , newEmptyMVar  -- :: IO (MVar a)
50         , takeMVar      -- :: MVar a -> IO a
51         , putMVar       -- :: MVar a -> a -> IO ()
52         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
53         , tryPutMVar    -- :: MVar a -> a -> IO Bool
54         , isEmptyMVar   -- :: MVar a -> IO Bool
55         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
56
57         -- * TVars
58         , STM           -- abstract
59         , atomically    -- :: STM a -> IO a
60         , retry         -- :: STM a
61         , orElse        -- :: STM a -> STM a -> STM a
62         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
63         , TVar          -- abstract
64         , newTVar       -- :: a -> STM (TVar a)
65         , newTVarIO     -- :: a -> STM (TVar a)
66         , readTVar      -- :: TVar a -> STM a
67         , writeTVar     -- :: a -> TVar a -> STM ()
68         , unsafeIOToSTM -- :: IO a -> STM a
69
70         -- * Miscellaneous
71 #ifdef mingw32_HOST_OS
72         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
73         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
74         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
75
76         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
77         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
78 #endif
79
80 #ifndef mingw32_HOST_OS
81         , ensureIOManagerIsRunning
82 #endif
83         ) where
84
85 import System.Posix.Types
86 import System.Posix.Internals
87 import Foreign
88 import Foreign.C
89
90 #ifndef __HADDOCK__
91 import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
92 #endif
93
94 import Data.Maybe
95
96 import GHC.Base
97 import GHC.IOBase
98 import GHC.Num          ( Num(..) )
99 import GHC.Real         ( fromIntegral, quot )
100 import GHC.Base         ( Int(..) )
101 import GHC.Exception    ( catchException, Exception(..), AsyncException(..) )
102 import GHC.Pack         ( packCString# )
103 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
104 import GHC.STRef
105 import GHC.Show         ( Show(..), showString )
106 import Data.Typeable
107
108 infixr 0 `par`, `pseq`
109 \end{code}
110
111 %************************************************************************
112 %*                                                                      *
113 \subsection{@ThreadId@, @par@, and @fork@}
114 %*                                                                      *
115 %************************************************************************
116
117 \begin{code}
118 data ThreadId = ThreadId ThreadId# deriving( Typeable )
119 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
120 -- But since ThreadId# is unlifted, the Weak type must use open
121 -- type variables.
122 {- ^
123 A 'ThreadId' is an abstract type representing a handle to a thread.
124 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
125 the 'Ord' instance implements an arbitrary total ordering over
126 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
127 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
128 useful when debugging or diagnosing the behaviour of a concurrent
129 program.
130
131 /Note/: in GHC, if you have a 'ThreadId', you essentially have
132 a pointer to the thread itself.  This means the thread itself can\'t be
133 garbage collected until you drop the 'ThreadId'.
134 This misfeature will hopefully be corrected at a later date.
135
136 /Note/: Hugs does not provide any operations on other threads;
137 it defines 'ThreadId' as a synonym for ().
138 -}
139
140 instance Show ThreadId where
141    showsPrec d t = 
142         showString "ThreadId " . 
143         showsPrec d (getThreadId (id2TSO t))
144
145 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> Int
146
147 id2TSO :: ThreadId -> ThreadId#
148 id2TSO (ThreadId t) = t
149
150 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
151 -- Returns -1, 0, 1
152
153 cmpThread :: ThreadId -> ThreadId -> Ordering
154 cmpThread t1 t2 = 
155    case cmp_thread (id2TSO t1) (id2TSO t2) of
156       -1 -> LT
157       0  -> EQ
158       _  -> GT -- must be 1
159
160 instance Eq ThreadId where
161    t1 == t2 = 
162       case t1 `cmpThread` t2 of
163          EQ -> True
164          _  -> False
165
166 instance Ord ThreadId where
167    compare = cmpThread
168
169 {- |
170 This sparks off a new thread to run the 'IO' computation passed as the
171 first argument, and returns the 'ThreadId' of the newly created
172 thread.
173
174 The new thread will be a lightweight thread; if you want to use a foreign
175 library that uses thread-local storage, use 'forkOS' instead.
176 -}
177 forkIO :: IO () -> IO ThreadId
178 forkIO action = IO $ \ s -> 
179    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
180  where
181   action_plus = catchException action childHandler
182
183 forkOnIO :: Int -> IO () -> IO ThreadId
184 forkOnIO (I# cpu) action = IO $ \ s -> 
185    case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
186  where
187   action_plus = catchException action childHandler
188
189 childHandler :: Exception -> IO ()
190 childHandler err = catchException (real_handler err) childHandler
191
192 real_handler :: Exception -> IO ()
193 real_handler ex =
194   case ex of
195         -- ignore thread GC and killThread exceptions:
196         BlockedOnDeadMVar            -> return ()
197         BlockedIndefinitely          -> return ()
198         AsyncException ThreadKilled  -> return ()
199
200         -- report all others:
201         AsyncException StackOverflow -> reportStackOverflow
202         other       -> reportError other
203
204 {- | 'killThread' terminates the given thread (GHC only).
205 Any work already done by the thread isn\'t
206 lost: the computation is suspended until required by another thread.
207 The memory used by the thread will be garbage collected if it isn\'t
208 referenced from anywhere.  The 'killThread' function is defined in
209 terms of 'throwTo':
210
211 > killThread tid = throwTo tid (AsyncException ThreadKilled)
212
213 -}
214 killThread :: ThreadId -> IO ()
215 killThread tid = throwTo tid (AsyncException ThreadKilled)
216
217 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
218
219 'throwTo' does not return until the exception has been raised in the
220 target thread.  The calling thread can thus be certain that the target
221 thread has received the exception.  This is a useful property to know
222 when dealing with race conditions: eg. if there are two threads that
223 can kill each other, it is guaranteed that only one of the threads
224 will get to kill the other.
225
226 If the target thread is currently making a foreign call, then the
227 exception will not be raised (and hence 'throwTo' will not return)
228 until the call has completed.  This is the case regardless of whether
229 the call is inside a 'block' or not.
230  -}
231 throwTo :: ThreadId -> Exception -> IO ()
232 throwTo (ThreadId id) ex = IO $ \ s ->
233    case (killThread# id ex s) of s1 -> (# s1, () #)
234
235 -- | Returns the 'ThreadId' of the calling thread (GHC only).
236 myThreadId :: IO ThreadId
237 myThreadId = IO $ \s ->
238    case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #)
239
240
241 -- |The 'yield' action allows (forces, in a co-operative multitasking
242 -- implementation) a context-switch to any other currently runnable
243 -- threads (if any), and is occasionally useful when implementing
244 -- concurrency abstractions.
245 yield :: IO ()
246 yield = IO $ \s -> 
247    case (yield# s) of s1 -> (# s1, () #)
248
249 {- | 'labelThread' stores a string as identifier for this thread if
250 you built a RTS with debugging support. This identifier will be used in
251 the debugging output to make distinction of different threads easier
252 (otherwise you only have the thread state object\'s address in the heap).
253
254 Other applications like the graphical Concurrent Haskell Debugger
255 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
256 'labelThread' for their purposes as well.
257 -}
258
259 labelThread :: ThreadId -> String -> IO ()
260 labelThread (ThreadId t) str = IO $ \ s ->
261    let ps  = packCString# str
262        adr = byteArrayContents# ps in
263      case (labelThread# t adr s) of s1 -> (# s1, () #)
264
265 --      Nota Bene: 'pseq' used to be 'seq'
266 --                 but 'seq' is now defined in PrelGHC
267 --
268 -- "pseq" is defined a bit weirdly (see below)
269 --
270 -- The reason for the strange "lazy" call is that
271 -- it fools the compiler into thinking that pseq  and par are non-strict in
272 -- their second argument (even if it inlines pseq at the call site).
273 -- If it thinks pseq is strict in "y", then it often evaluates
274 -- "y" before "x", which is totally wrong.  
275
276 {-# INLINE pseq  #-}
277 pseq :: a -> b -> b
278 pseq  x y = x `seq` lazy y
279
280 {-# INLINE par  #-}
281 par :: a -> b -> b
282 par  x y = case (par# x) of { _ -> lazy y }
283 \end{code}
284
285
286 %************************************************************************
287 %*                                                                      *
288 \subsection[stm]{Transactional heap operations}
289 %*                                                                      *
290 %************************************************************************
291
292 TVars are shared memory locations which support atomic memory
293 transactions.
294
295 \begin{code}
296 -- |A monad supporting atomic memory transactions.
297 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
298
299 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
300 unSTM (STM a) = a
301
302 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
303
304 instance  Functor STM where
305    fmap f x = x >>= (return . f)
306
307 instance  Monad STM  where
308     {-# INLINE return #-}
309     {-# INLINE (>>)   #-}
310     {-# INLINE (>>=)  #-}
311     m >> k      = thenSTM m k
312     return x    = returnSTM x
313     m >>= k     = bindSTM m k
314
315 bindSTM :: STM a -> (a -> STM b) -> STM b
316 bindSTM (STM m) k = STM ( \s ->
317   case m s of 
318     (# new_s, a #) -> unSTM (k a) new_s
319   )
320
321 thenSTM :: STM a -> STM b -> STM b
322 thenSTM (STM m) k = STM ( \s ->
323   case m s of 
324     (# new_s, a #) -> unSTM k new_s
325   )
326
327 returnSTM :: a -> STM a
328 returnSTM x = STM (\s -> (# s, x #))
329
330 -- | Unsafely performs IO in the STM monad.
331 unsafeIOToSTM :: IO a -> STM a
332 unsafeIOToSTM (IO m) = STM m
333
334 -- |Perform a series of STM actions atomically.
335 --
336 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
337 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
338 -- this would effectively allow a transaction inside a transaction, depending
339 -- on exactly when the thunk is evaluated.)
340 --
341 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
342 -- and which allows top-level TVars to be allocated.
343
344 atomically :: STM a -> IO a
345 atomically (STM m) = IO (\s -> (atomically# m) s )
346
347 -- |Retry execution of the current memory transaction because it has seen
348 -- values in TVars which mean that it should not continue (e.g. the TVars
349 -- represent a shared buffer that is now empty).  The implementation may
350 -- block the thread until one of the TVars that it has read from has been
351 -- udpated. (GHC only)
352 retry :: STM a
353 retry = STM $ \s# -> retry# s#
354
355 -- |Compose two alternative STM actions (GHC only).  If the first action
356 -- completes without retrying then it forms the result of the orElse.
357 -- Otherwise, if the first action retries, then the second action is
358 -- tried in its place.  If both actions retry then the orElse as a
359 -- whole retries.
360 orElse :: STM a -> STM a -> STM a
361 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
362
363 -- |Exception handling within STM actions.
364 catchSTM :: STM a -> (Exception -> STM a) -> STM a
365 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
366
367 -- |Shared memory locations that support atomic memory transactions.
368 data TVar a = TVar (TVar# RealWorld a)
369
370 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
371
372 instance Eq (TVar a) where
373         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
374
375 -- |Create a new TVar holding a value supplied
376 newTVar :: a -> STM (TVar a)
377 newTVar val = STM $ \s1# ->
378     case newTVar# val s1# of
379          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
380
381 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
382 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
383 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
384 -- possible.
385 newTVarIO :: a -> IO (TVar a)
386 newTVarIO val = IO $ \s1# ->
387     case newTVar# val s1# of
388          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
389
390 -- |Return the current value stored in a TVar
391 readTVar :: TVar a -> STM a
392 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
393
394 -- |Write the supplied value into a TVar
395 writeTVar :: TVar a -> a -> STM ()
396 writeTVar (TVar tvar#) val = STM $ \s1# ->
397     case writeTVar# tvar# val s1# of
398          s2# -> (# s2#, () #)
399   
400 \end{code}
401
402 %************************************************************************
403 %*                                                                      *
404 \subsection[mvars]{M-Structures}
405 %*                                                                      *
406 %************************************************************************
407
408 M-Vars are rendezvous points for concurrent threads.  They begin
409 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
410 is written, a single blocked thread may be freed.  Reading an M-Var
411 toggles its state from full back to empty.  Therefore, any value
412 written to an M-Var may only be read once.  Multiple reads and writes
413 are allowed, but there must be at least one read between any two
414 writes.
415
416 \begin{code}
417 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
418
419 -- |Create an 'MVar' which is initially empty.
420 newEmptyMVar  :: IO (MVar a)
421 newEmptyMVar = IO $ \ s# ->
422     case newMVar# s# of
423          (# s2#, svar# #) -> (# s2#, MVar svar# #)
424
425 -- |Create an 'MVar' which contains the supplied value.
426 newMVar :: a -> IO (MVar a)
427 newMVar value =
428     newEmptyMVar        >>= \ mvar ->
429     putMVar mvar value  >>
430     return mvar
431
432 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
433 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
434 -- the 'MVar' is left empty.
435 -- 
436 -- There are two further important properties of 'takeMVar':
437 --
438 --   * 'takeMVar' is single-wakeup.  That is, if there are multiple
439 --     threads blocked in 'takeMVar', and the 'MVar' becomes full,
440 --     only one thread will be woken up.  The runtime guarantees that
441 --     the woken thread completes its 'takeMVar' operation.
442 --
443 --   * When multiple threads are blocked on an 'MVar', they are
444 --     woken up in FIFO order.  This is useful for providing
445 --     fairness properties of abstractions built using 'MVar's.
446 --
447 takeMVar :: MVar a -> IO a
448 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
449
450 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
451 -- 'putMVar' will wait until it becomes empty.
452 --
453 -- There are two further important properties of 'putMVar':
454 --
455 --   * 'putMVar' is single-wakeup.  That is, if there are multiple
456 --     threads blocked in 'putMVar', and the 'MVar' becomes empty,
457 --     only one thread will be woken up.  The runtime guarantees that
458 --     the woken thread completes its 'putMVar' operation.
459 --
460 --   * When multiple threads are blocked on an 'MVar', they are
461 --     woken up in FIFO order.  This is useful for providing
462 --     fairness properties of abstractions built using 'MVar's.
463 --
464 putMVar  :: MVar a -> a -> IO ()
465 putMVar (MVar mvar#) x = IO $ \ s# ->
466     case putMVar# mvar# x s# of
467         s2# -> (# s2#, () #)
468
469 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
470 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
471 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
472 -- the 'MVar' is left empty.
473 tryTakeMVar :: MVar a -> IO (Maybe a)
474 tryTakeMVar (MVar m) = IO $ \ s ->
475     case tryTakeMVar# m s of
476         (# s, 0#, _ #) -> (# s, Nothing #)      -- MVar is empty
477         (# s, _,  a #) -> (# s, Just a  #)      -- MVar is full
478
479 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
480 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
481 -- it was successful, or 'False' otherwise.
482 tryPutMVar  :: MVar a -> a -> IO Bool
483 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
484     case tryPutMVar# mvar# x s# of
485         (# s, 0# #) -> (# s, False #)
486         (# s, _  #) -> (# s, True #)
487
488 -- |Check whether a given 'MVar' is empty.
489 --
490 -- Notice that the boolean value returned  is just a snapshot of
491 -- the state of the MVar. By the time you get to react on its result,
492 -- the MVar may have been filled (or emptied) - so be extremely
493 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
494 isEmptyMVar :: MVar a -> IO Bool
495 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
496     case isEmptyMVar# mv# s# of
497         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
498
499 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
500 -- "System.Mem.Weak" for more about finalizers.
501 addMVarFinalizer :: MVar a -> IO () -> IO ()
502 addMVarFinalizer (MVar m) finalizer = 
503   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
504 \end{code}
505
506
507 %************************************************************************
508 %*                                                                      *
509 \subsection{Thread waiting}
510 %*                                                                      *
511 %************************************************************************
512
513 \begin{code}
514 #ifdef mingw32_HOST_OS
515
516 -- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
517 -- on Win32, but left in there because lib code (still) uses them (the manner
518 -- in which they're used doesn't cause problems on a Win32 platform though.)
519
520 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
521 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
522   IO $ \s -> case asyncRead# fd isSock len buf s of 
523                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
524
525 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
526 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
527   IO $ \s -> case asyncWrite# fd isSock len buf s of 
528                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
529
530 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
531 asyncDoProc (FunPtr proc) (Ptr param) = 
532     -- the 'length' value is ignored; simplifies implementation of
533     -- the async*# primops to have them all return the same result.
534   IO $ \s -> case asyncDoProc# proc param s  of 
535                (# s, len#, err# #) -> (# s, I# err# #)
536
537 -- to aid the use of these primops by the IO Handle implementation,
538 -- provide the following convenience funs:
539
540 -- this better be a pinned byte array!
541 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
542 asyncReadBA fd isSock len off bufB = 
543   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
544   
545 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
546 asyncWriteBA fd isSock len off bufB = 
547   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
548
549 #endif
550
551 -- -----------------------------------------------------------------------------
552 -- Thread IO API
553
554 -- | Block the current thread until data is available to read on the
555 -- given file descriptor (GHC only).
556 threadWaitRead :: Fd -> IO ()
557 threadWaitRead fd
558 #ifndef mingw32_HOST_OS
559   | threaded  = waitForReadEvent fd
560 #endif
561   | otherwise = IO $ \s -> 
562         case fromIntegral fd of { I# fd# ->
563         case waitRead# fd# s of { s -> (# s, () #)
564         }}
565
566 -- | Block the current thread until data can be written to the
567 -- given file descriptor (GHC only).
568 threadWaitWrite :: Fd -> IO ()
569 threadWaitWrite fd
570 #ifndef mingw32_HOST_OS
571   | threaded  = waitForWriteEvent fd
572 #endif
573   | otherwise = IO $ \s -> 
574         case fromIntegral fd of { I# fd# ->
575         case waitWrite# fd# s of { s -> (# s, () #)
576         }}
577
578 -- | Suspends the current thread for a given number of microseconds
579 -- (GHC only).
580 --
581 -- Note that the resolution used by the Haskell runtime system's
582 -- internal timer is 1\/50 second, and 'threadDelay' will round its
583 -- argument up to the nearest multiple of this resolution.
584 --
585 -- There is no guarantee that the thread will be rescheduled promptly
586 -- when the delay has expired, but the thread will never continue to
587 -- run /earlier/ than specified.
588 --
589 threadDelay :: Int -> IO ()
590 threadDelay time
591 #ifndef mingw32_HOST_OS
592   | threaded  = waitForDelayEvent time
593 #else
594   | threaded  = c_Sleep (fromIntegral (time `quot` 1000))
595 #endif
596   | otherwise = IO $ \s -> 
597         case fromIntegral time of { I# time# ->
598         case delay# time# s of { s -> (# s, () #)
599         }}
600
601 registerDelay :: Int -> IO (TVar Bool)
602 registerDelay usecs 
603 #ifndef mingw32_HOST_OS
604   | threaded = waitForDelayEventSTM usecs
605   | otherwise = error "registerDelay: requires -threaded"
606 #else
607   = error "registerDelay: not currently supported on Windows"
608 #endif
609
610 -- On Windows, we just make a safe call to 'Sleep' to implement threadDelay.
611 #ifdef mingw32_HOST_OS
612 foreign import stdcall safe "Sleep" c_Sleep :: CInt -> IO ()
613 #endif
614
615 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
616
617 -- ----------------------------------------------------------------------------
618 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
619
620 -- In the threaded RTS, we employ a single IO Manager thread to wait
621 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
622 -- and delays (threadDelay).  
623 --
624 -- We can do this because in the threaded RTS the IO Manager can make
625 -- a non-blocking call to select(), so we don't have to do select() in
626 -- the scheduler as we have to in the non-threaded RTS.  We get performance
627 -- benefits from doing it this way, because we only have to restart the select()
628 -- when a new request arrives, rather than doing one select() each time
629 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
630 -- by not having to check for completed IO requests.
631
632 -- Issues, possible problems:
633 --
634 --      - we might want bound threads to just do the blocking
635 --        operation rather than communicating with the IO manager
636 --        thread.  This would prevent simgle-threaded programs which do
637 --        IO from requiring multiple OS threads.  However, it would also
638 --        prevent bound threads waiting on IO from being killed or sent
639 --        exceptions.
640 --
641 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
642 --        I couldn't repeat this.
643 --
644 --      - How do we handle signal delivery in the multithreaded RTS?
645 --
646 --      - forkProcess will kill the IO manager thread.  Let's just
647 --        hope we don't need to do any blocking IO between fork & exec.
648
649 #ifndef mingw32_HOST_OS
650
651 data IOReq
652   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
653   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
654
655 data DelayReq
656   = Delay    {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
657   | DelaySTM {-# UNPACK #-} !Int {-# UNPACK #-} !(TVar Bool)
658
659 pendingEvents :: IORef [IOReq]
660 pendingDelays :: IORef [DelayReq]
661         -- could use a strict list or array here
662 {-# NOINLINE pendingEvents #-}
663 {-# NOINLINE pendingDelays #-}
664 (pendingEvents,pendingDelays) = unsafePerformIO $ do
665   startIOManagerThread
666   reqs <- newIORef []
667   dels <- newIORef []
668   return (reqs, dels)
669         -- the first time we schedule an IO request, the service thread
670         -- will be created (cool, huh?)
671
672 ensureIOManagerIsRunning :: IO ()
673 ensureIOManagerIsRunning 
674   | threaded  = seq pendingEvents $ return ()
675   | otherwise = return ()
676
677 startIOManagerThread :: IO ()
678 startIOManagerThread = do
679         allocaArray 2 $ \fds -> do
680         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
681         rd_end <- peekElemOff fds 0
682         wr_end <- peekElemOff fds 1
683         writeIORef stick (fromIntegral wr_end)
684         c_setIOManagerPipe wr_end
685         forkIO $ do
686             allocaBytes sizeofFdSet   $ \readfds -> do
687             allocaBytes sizeofFdSet   $ \writefds -> do 
688             allocaBytes sizeofTimeVal $ \timeval -> do
689             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
690         return ()
691
692 service_loop
693    :: Fd                -- listen to this for wakeup calls
694    -> Ptr CFdSet
695    -> Ptr CFdSet
696    -> Ptr CTimeVal
697    -> [IOReq]
698    -> [DelayReq]
699    -> IO ()
700 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
701
702   -- pick up new IO requests
703   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
704   let reqs = new_reqs ++ old_reqs
705
706   -- pick up new delay requests
707   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
708   let  delays = foldr insertDelay old_delays new_delays
709
710   -- build the FDSets for select()
711   fdZero readfds
712   fdZero writefds
713   fdSet wakeup readfds
714   maxfd <- buildFdSets 0 readfds writefds reqs
715
716   -- perform the select()
717   let do_select delays = do
718           -- check the current time and wake up any thread in
719           -- threadDelay whose timeout has expired.  Also find the
720           -- timeout value for the select() call.
721           now <- getTicksOfDay
722           (delays', timeout) <- getDelay now ptimeval delays
723
724           res <- c_select ((max wakeup maxfd)+1) readfds writefds 
725                         nullPtr timeout
726           if (res == -1)
727              then do
728                 err <- getErrno
729                 case err of
730                   _ | err == eINTR ->  do_select delays'
731                         -- EINTR: just redo the select()
732                   _ | err == eBADF ->  return (True, delays)
733                         -- EBADF: one of the file descriptors is closed or bad,
734                         -- we don't know which one, so wake everyone up.
735                   _ | otherwise    ->  throwErrno "select"
736                         -- otherwise (ENOMEM or EINVAL) something has gone
737                         -- wrong; report the error.
738              else
739                 return (False,delays')
740
741   (wakeup_all,delays') <- do_select delays
742
743   exit <-
744     if wakeup_all then return False
745       else do
746         b <- fdIsSet wakeup readfds
747         if b == 0 
748           then return False
749           else alloca $ \p -> do 
750                  c_read (fromIntegral wakeup) p 1; return ()
751                  s <- peek p            
752                  case s of
753                   _ | s == io_MANAGER_WAKEUP -> return False
754                   _ | s == io_MANAGER_DIE    -> return True
755                   _ -> do handler_tbl <- peek handlers
756                           sp <- peekElemOff handler_tbl (fromIntegral s)
757                           forkIO (do io <- deRefStablePtr sp; io)
758                           return False
759
760   if exit then return () else do
761
762   takeMVar prodding
763   putMVar prodding False
764
765   reqs' <- if wakeup_all then do wakeupAll reqs; return []
766                          else completeRequests reqs readfds writefds []
767
768   service_loop wakeup readfds writefds ptimeval reqs' delays'
769
770 stick :: IORef Fd
771 {-# NOINLINE stick #-}
772 stick = unsafePerformIO (newIORef 0)
773
774 io_MANAGER_WAKEUP = 0xff :: CChar
775 io_MANAGER_DIE    = 0xfe :: CChar
776
777 prodding :: MVar Bool
778 {-# NOINLINE prodding #-}
779 prodding = unsafePerformIO (newMVar False)
780
781 prodServiceThread :: IO ()
782 prodServiceThread = do
783   b <- takeMVar prodding
784   if (not b) 
785     then do fd <- readIORef stick
786             with io_MANAGER_WAKEUP $ \pbuf -> do 
787                 c_write (fromIntegral fd) pbuf 1; return ()
788     else return ()
789   putMVar prodding True
790
791 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
792
793 foreign import ccall "setIOManagerPipe"
794   c_setIOManagerPipe :: CInt -> IO ()
795
796 -- -----------------------------------------------------------------------------
797 -- IO requests
798
799 buildFdSets maxfd readfds writefds [] = return maxfd
800 buildFdSets maxfd readfds writefds (Read fd m : reqs)
801   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
802   | otherwise        =  do
803         fdSet fd readfds
804         buildFdSets (max maxfd fd) readfds writefds reqs
805 buildFdSets maxfd readfds writefds (Write fd m : reqs)
806   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
807   | otherwise        =  do
808         fdSet fd writefds
809         buildFdSets (max maxfd fd) readfds writefds reqs
810
811 completeRequests [] _ _ reqs' = return reqs'
812 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
813   b <- fdIsSet fd readfds
814   if b /= 0
815     then do putMVar m (); completeRequests reqs readfds writefds reqs'
816     else completeRequests reqs readfds writefds (Read fd m : reqs')
817 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
818   b <- fdIsSet fd writefds
819   if b /= 0
820     then do putMVar m (); completeRequests reqs readfds writefds reqs'
821     else completeRequests reqs readfds writefds (Write fd m : reqs')
822
823 wakeupAll [] = return ()
824 wakeupAll (Read  fd m : reqs) = do putMVar m (); wakeupAll reqs
825 wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs
826
827 waitForReadEvent :: Fd -> IO ()
828 waitForReadEvent fd = do
829   m <- newEmptyMVar
830   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
831   prodServiceThread
832   takeMVar m
833
834 waitForWriteEvent :: Fd -> IO ()
835 waitForWriteEvent fd = do
836   m <- newEmptyMVar
837   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
838   prodServiceThread
839   takeMVar m
840
841 -- XXX: move into GHC.IOBase from Data.IORef?
842 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
843 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
844
845 -- -----------------------------------------------------------------------------
846 -- Delays
847
848 waitForDelayEvent :: Int -> IO ()
849 waitForDelayEvent usecs = do
850   m <- newEmptyMVar
851   now <- getTicksOfDay
852   let target = now + usecs `quot` tick_usecs
853   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
854   prodServiceThread
855   takeMVar m
856
857 -- Delays for use in STM
858 waitForDelayEventSTM :: Int -> IO (TVar Bool)
859 waitForDelayEventSTM usecs = do
860    t <- atomically $ newTVar False
861    now <- getTicksOfDay
862    let target = now + usecs `quot` tick_usecs
863    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
864    prodServiceThread
865    return t  
866     
867 -- Walk the queue of pending delays, waking up any that have passed
868 -- and return the smallest delay to wait for.  The queue of pending
869 -- delays is kept ordered.
870 getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
871 getDelay now ptimeval [] = return ([],nullPtr)
872 getDelay now ptimeval all@(d : rest) 
873   = case d of
874      Delay time m | now >= time -> do
875         putMVar m ()
876         getDelay now ptimeval rest
877      DelaySTM time t | now >= time -> do
878         atomically $ writeTVar t True
879         getDelay now ptimeval rest
880      _otherwise -> do
881         setTimevalTicks ptimeval (delayTime d - now)
882         return (all,ptimeval)
883
884 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
885 insertDelay d [] = [d]
886 insertDelay d1 ds@(d2 : rest)
887   | delayTime d1 <= delayTime d2 = d1 : ds
888   | otherwise                    = d2 : insertDelay d1 rest
889
890 delayTime (Delay t _) = t
891 delayTime (DelaySTM t _) = t
892
893 type Ticks = Int
894 tick_freq  = 50 :: Ticks  -- accuracy of threadDelay (ticks per sec)
895 tick_usecs = 1000000 `quot` tick_freq :: Int
896
897 newtype CTimeVal = CTimeVal ()
898
899 foreign import ccall unsafe "sizeofTimeVal"
900   sizeofTimeVal :: Int
901
902 foreign import ccall unsafe "getTicksOfDay" 
903   getTicksOfDay :: IO Ticks
904
905 foreign import ccall unsafe "setTimevalTicks" 
906   setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO ()
907
908 -- ----------------------------------------------------------------------------
909 -- select() interface
910
911 -- ToDo: move to System.Posix.Internals?
912
913 newtype CFdSet = CFdSet ()
914
915 foreign import ccall safe "select"
916   c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
917            -> IO CInt
918
919 foreign import ccall unsafe "hsFD_SETSIZE"
920   fD_SETSIZE :: Fd
921
922 foreign import ccall unsafe "hsFD_CLR"
923   fdClr :: Fd -> Ptr CFdSet -> IO ()
924
925 foreign import ccall unsafe "hsFD_ISSET"
926   fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
927
928 foreign import ccall unsafe "hsFD_SET"
929   fdSet :: Fd -> Ptr CFdSet -> IO ()
930
931 foreign import ccall unsafe "hsFD_ZERO"
932   fdZero :: Ptr CFdSet -> IO ()
933
934 foreign import ccall unsafe "sizeof_fd_set"
935   sizeofFdSet :: Int
936
937 #endif
938 \end{code}