Add threadStatus :: ThreadId -> IO ThreadStatus
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_HADDOCK not-home #-}
4 -----------------------------------------------------------------------------
5 -- |
6 -- Module      :  GHC.Conc
7 -- Copyright   :  (c) The University of Glasgow, 1994-2002
8 -- License     :  see libraries/base/LICENSE
9 -- 
10 -- Maintainer  :  cvs-ghc@haskell.org
11 -- Stability   :  internal
12 -- Portability :  non-portable (GHC extensions)
13 --
14 -- Basic concurrency stuff.
15 -- 
16 -----------------------------------------------------------------------------
17
18 -- No: #hide, because bits of this module are exposed by the stm package.
19 -- However, we don't want this module to be the home location for the
20 -- bits it exports, we'd rather have Control.Concurrent and the other
21 -- higher level modules be the home.  Hence:
22
23 #include "Typeable.h"
24
25 -- #not-home
26 module GHC.Conc
27         ( ThreadId(..)
28
29         -- * Forking and suchlike
30         , forkIO        -- :: IO a -> IO ThreadId
31         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
32         , numCapabilities -- :: Int
33         , childHandler  -- :: Exception -> IO ()
34         , myThreadId    -- :: IO ThreadId
35         , killThread    -- :: ThreadId -> IO ()
36         , throwTo       -- :: ThreadId -> Exception -> IO ()
37         , par           -- :: a -> b -> b
38         , pseq          -- :: a -> b -> b
39         , yield         -- :: IO ()
40         , labelThread   -- :: ThreadId -> String -> IO ()
41
42         , ThreadStatus(..), BlockReason(..)
43         , threadStatus  -- :: ThreadId -> IO ThreadStatus
44
45         -- * Waiting
46         , threadDelay           -- :: Int -> IO ()
47         , registerDelay         -- :: Int -> IO (TVar Bool)
48         , threadWaitRead        -- :: Int -> IO ()
49         , threadWaitWrite       -- :: Int -> IO ()
50
51         -- * MVars
52         , MVar(..)
53         , newMVar       -- :: a -> IO (MVar a)
54         , newEmptyMVar  -- :: IO (MVar a)
55         , takeMVar      -- :: MVar a -> IO a
56         , putMVar       -- :: MVar a -> a -> IO ()
57         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
58         , tryPutMVar    -- :: MVar a -> a -> IO Bool
59         , isEmptyMVar   -- :: MVar a -> IO Bool
60         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
61
62         -- * TVars
63         , STM(..)
64         , atomically    -- :: STM a -> IO a
65         , retry         -- :: STM a
66         , orElse        -- :: STM a -> STM a -> STM a
67         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
68         , alwaysSucceeds -- :: STM a -> STM ()
69         , always        -- :: STM Bool -> STM ()
70         , TVar(..)
71         , newTVar       -- :: a -> STM (TVar a)
72         , newTVarIO     -- :: a -> STM (TVar a)
73         , readTVar      -- :: TVar a -> STM a
74         , writeTVar     -- :: a -> TVar a -> STM ()
75         , unsafeIOToSTM -- :: IO a -> STM a
76
77         -- * Miscellaneous
78 #ifdef mingw32_HOST_OS
79         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
80         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
81         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
82
83         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
84         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
85 #endif
86
87 #ifndef mingw32_HOST_OS
88         , signalHandlerLock
89 #endif
90
91         , ensureIOManagerIsRunning
92
93 #ifdef mingw32_HOST_OS
94         , ConsoleEvent(..)
95         , win32ConsoleHandler
96         , toWin32ConsoleEvent
97 #endif
98         ) where
99
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
103 #endif
104 import Foreign
105 import Foreign.C
106
107 #ifndef __HADDOCK__
108 import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
109 #endif
110
111 import Data.Maybe
112
113 import GHC.Base
114 import GHC.IOBase
115 import GHC.Num          ( Num(..) )
116 import GHC.Real         ( fromIntegral, div )
117 #ifndef mingw32_HOST_OS
118 import GHC.Base         ( Int(..) )
119 #endif
120 #ifdef mingw32_HOST_OS
121 import GHC.Read         ( Read )
122 import GHC.Enum         ( Enum )
123 #endif
124 import GHC.Exception
125 import GHC.Pack         ( packCString# )
126 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
127 import GHC.STRef
128 import GHC.Show         ( Show(..), showString )
129 import Data.Typeable
130
131 infixr 0 `par`, `pseq`
132 \end{code}
133
134 %************************************************************************
135 %*                                                                      *
136 \subsection{@ThreadId@, @par@, and @fork@}
137 %*                                                                      *
138 %************************************************************************
139
140 \begin{code}
141 data ThreadId = ThreadId ThreadId# deriving( Typeable )
142 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
143 -- But since ThreadId# is unlifted, the Weak type must use open
144 -- type variables.
145 {- ^
146 A 'ThreadId' is an abstract type representing a handle to a thread.
147 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
148 the 'Ord' instance implements an arbitrary total ordering over
149 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
150 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
151 useful when debugging or diagnosing the behaviour of a concurrent
152 program.
153
154 /Note/: in GHC, if you have a 'ThreadId', you essentially have
155 a pointer to the thread itself.  This means the thread itself can\'t be
156 garbage collected until you drop the 'ThreadId'.
157 This misfeature will hopefully be corrected at a later date.
158
159 /Note/: Hugs does not provide any operations on other threads;
160 it defines 'ThreadId' as a synonym for ().
161 -}
162
163 instance Show ThreadId where
164    showsPrec d t = 
165         showString "ThreadId " . 
166         showsPrec d (getThreadId (id2TSO t))
167
168 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
169
170 id2TSO :: ThreadId -> ThreadId#
171 id2TSO (ThreadId t) = t
172
173 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
174 -- Returns -1, 0, 1
175
176 cmpThread :: ThreadId -> ThreadId -> Ordering
177 cmpThread t1 t2 = 
178    case cmp_thread (id2TSO t1) (id2TSO t2) of
179       -1 -> LT
180       0  -> EQ
181       _  -> GT -- must be 1
182
183 instance Eq ThreadId where
184    t1 == t2 = 
185       case t1 `cmpThread` t2 of
186          EQ -> True
187          _  -> False
188
189 instance Ord ThreadId where
190    compare = cmpThread
191
192 {- |
193 Sparks off a new thread to run the 'IO' computation passed as the
194 first argument, and returns the 'ThreadId' of the newly created
195 thread.
196
197 The new thread will be a lightweight thread; if you want to use a foreign
198 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
199
200 GHC note: the new thread inherits the /blocked/ state of the parent 
201 (see 'Control.Exception.block').
202 -}
203 forkIO :: IO () -> IO ThreadId
204 forkIO action = IO $ \ s -> 
205    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
206  where
207   action_plus = catchException action childHandler
208
209 {- |
210 Like 'forkIO', but lets you specify on which CPU the thread is
211 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
212 will stay on the same CPU for its entire lifetime (`forkIO` threads
213 can migrate between CPUs according to the scheduling policy).
214 `forkOnIO` is useful for overriding the scheduling policy when you
215 know in advance how best to distribute the threads.
216
217 The `Int` argument specifies the CPU number; it is interpreted modulo
218 'numCapabilities' (note that it actually specifies a capability number
219 rather than a CPU number, but to a first approximation the two are
220 equivalent).
221 -}
222 forkOnIO :: Int -> IO () -> IO ThreadId
223 forkOnIO (I# cpu) action = IO $ \ s -> 
224    case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
225  where
226   action_plus = catchException action childHandler
227
228 -- | the value passed to the @+RTS -N@ flag.  This is the number of
229 -- Haskell threads that can run truly simultaneously at any given
230 -- time, and is typically set to the number of physical CPU cores on
231 -- the machine.
232 numCapabilities :: Int
233 numCapabilities = unsafePerformIO $  do 
234                     n <- peek n_capabilities
235                     return (fromIntegral n)
236
237 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
238
239 childHandler :: Exception -> IO ()
240 childHandler err = catchException (real_handler err) childHandler
241
242 real_handler :: Exception -> IO ()
243 real_handler ex =
244   case ex of
245         -- ignore thread GC and killThread exceptions:
246         BlockedOnDeadMVar            -> return ()
247         BlockedIndefinitely          -> return ()
248         AsyncException ThreadKilled  -> return ()
249
250         -- report all others:
251         AsyncException StackOverflow -> reportStackOverflow
252         other       -> reportError other
253
254 {- | 'killThread' terminates the given thread (GHC only).
255 Any work already done by the thread isn\'t
256 lost: the computation is suspended until required by another thread.
257 The memory used by the thread will be garbage collected if it isn\'t
258 referenced from anywhere.  The 'killThread' function is defined in
259 terms of 'throwTo':
260
261 > killThread tid = throwTo tid (AsyncException ThreadKilled)
262
263 -}
264 killThread :: ThreadId -> IO ()
265 killThread tid = throwTo tid (AsyncException ThreadKilled)
266
267 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
268
269 'throwTo' does not return until the exception has been raised in the
270 target thread. 
271 The calling thread can thus be certain that the target
272 thread has received the exception.  This is a useful property to know
273 when dealing with race conditions: eg. if there are two threads that
274 can kill each other, it is guaranteed that only one of the threads
275 will get to kill the other.
276
277 If the target thread is currently making a foreign call, then the
278 exception will not be raised (and hence 'throwTo' will not return)
279 until the call has completed.  This is the case regardless of whether
280 the call is inside a 'block' or not.
281
282 Important note: the behaviour of 'throwTo' differs from that described in
283 the paper \"Asynchronous exceptions in Haskell\"
284 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
285 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
286 a more synchronous design in which 'throwTo' does not return until the exception
287 is received by the target thread.  The trade-off is discussed in Section 8 of the paper.
288 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
289 the paper).
290
291 There is currently no guarantee that the exception delivered by 'throwTo' will be
292 delivered at the first possible opportunity.  In particular, if a thread may 
293 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
294 a pending 'throwTo'.  This is arguably undesirable behaviour.
295
296  -}
297 throwTo :: ThreadId -> Exception -> IO ()
298 throwTo (ThreadId id) ex = IO $ \ s ->
299    case (killThread# id ex s) of s1 -> (# s1, () #)
300
301 -- | Returns the 'ThreadId' of the calling thread (GHC only).
302 myThreadId :: IO ThreadId
303 myThreadId = IO $ \s ->
304    case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #)
305
306
307 -- |The 'yield' action allows (forces, in a co-operative multitasking
308 -- implementation) a context-switch to any other currently runnable
309 -- threads (if any), and is occasionally useful when implementing
310 -- concurrency abstractions.
311 yield :: IO ()
312 yield = IO $ \s -> 
313    case (yield# s) of s1 -> (# s1, () #)
314
315 {- | 'labelThread' stores a string as identifier for this thread if
316 you built a RTS with debugging support. This identifier will be used in
317 the debugging output to make distinction of different threads easier
318 (otherwise you only have the thread state object\'s address in the heap).
319
320 Other applications like the graphical Concurrent Haskell Debugger
321 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
322 'labelThread' for their purposes as well.
323 -}
324
325 labelThread :: ThreadId -> String -> IO ()
326 labelThread (ThreadId t) str = IO $ \ s ->
327    let ps  = packCString# str
328        adr = byteArrayContents# ps in
329      case (labelThread# t adr s) of s1 -> (# s1, () #)
330
331 --      Nota Bene: 'pseq' used to be 'seq'
332 --                 but 'seq' is now defined in PrelGHC
333 --
334 -- "pseq" is defined a bit weirdly (see below)
335 --
336 -- The reason for the strange "lazy" call is that
337 -- it fools the compiler into thinking that pseq  and par are non-strict in
338 -- their second argument (even if it inlines pseq at the call site).
339 -- If it thinks pseq is strict in "y", then it often evaluates
340 -- "y" before "x", which is totally wrong.  
341
342 {-# INLINE pseq  #-}
343 pseq :: a -> b -> b
344 pseq  x y = x `seq` lazy y
345
346 {-# INLINE par  #-}
347 par :: a -> b -> b
348 par  x y = case (par# x) of { _ -> lazy y }
349
350
351 data BlockReason
352   = BlockedOnMVar
353         -- ^blocked on on 'MVar'
354   | BlockedOnBlackHole
355         -- ^blocked on a computation in progress by another thread
356   | BlockedOnException
357         -- ^blocked in 'throwTo'
358   | BlockedOnSTM
359         -- ^blocked in 'retry' in an STM transaction
360   | BlockedOnForeignCall
361         -- ^currently in a foreign call
362   | BlockedOnOther
363         -- ^blocked on some other resource.  Without @-threaded@,
364         -- I/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
365         -- they show up as 'BlockedOnMVar'.
366   deriving (Eq,Ord,Show)
367
368 -- | The current status of a thread
369 data ThreadStatus
370   = ThreadRunning
371         -- ^the thread is currently runnable or running
372   | ThreadFinished
373         -- ^the thread has finished
374   | ThreadBlocked  BlockReason
375         -- ^the thread is blocked on some resource
376   | ThreadDied
377         -- ^the thread received an uncaught exception
378   deriving (Eq,Ord,Show)
379
380 threadStatus :: ThreadId -> IO ThreadStatus
381 threadStatus (ThreadId t) = IO $ \s ->
382    case threadStatus# t s of
383      (# s', stat #) -> (# s', mk_stat (I# stat) #)
384    where
385         -- NB. keep these in sync with includes/Constants.h
386      mk_stat 0  = ThreadRunning
387      mk_stat 1  = ThreadBlocked BlockedOnMVar
388      mk_stat 2  = ThreadBlocked BlockedOnBlackHole
389      mk_stat 3  = ThreadBlocked BlockedOnException
390      mk_stat 7  = ThreadBlocked BlockedOnSTM
391      mk_stat 11 = ThreadBlocked BlockedOnForeignCall
392      mk_stat 12 = ThreadBlocked BlockedOnForeignCall
393      mk_stat 16 = ThreadFinished
394      mk_stat 17 = ThreadDied
395      mk_stat _  = ThreadBlocked BlockedOnOther
396 \end{code}
397
398
399 %************************************************************************
400 %*                                                                      *
401 \subsection[stm]{Transactional heap operations}
402 %*                                                                      *
403 %************************************************************************
404
405 TVars are shared memory locations which support atomic memory
406 transactions.
407
408 \begin{code}
409 -- |A monad supporting atomic memory transactions.
410 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
411
412 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
413 unSTM (STM a) = a
414
415 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
416
417 instance  Functor STM where
418    fmap f x = x >>= (return . f)
419
420 instance  Monad STM  where
421     {-# INLINE return #-}
422     {-# INLINE (>>)   #-}
423     {-# INLINE (>>=)  #-}
424     m >> k      = thenSTM m k
425     return x    = returnSTM x
426     m >>= k     = bindSTM m k
427
428 bindSTM :: STM a -> (a -> STM b) -> STM b
429 bindSTM (STM m) k = STM ( \s ->
430   case m s of 
431     (# new_s, a #) -> unSTM (k a) new_s
432   )
433
434 thenSTM :: STM a -> STM b -> STM b
435 thenSTM (STM m) k = STM ( \s ->
436   case m s of 
437     (# new_s, a #) -> unSTM k new_s
438   )
439
440 returnSTM :: a -> STM a
441 returnSTM x = STM (\s -> (# s, x #))
442
443 -- | Unsafely performs IO in the STM monad.
444 unsafeIOToSTM :: IO a -> STM a
445 unsafeIOToSTM (IO m) = STM m
446
447 -- |Perform a series of STM actions atomically.
448 --
449 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
450 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
451 -- this would effectively allow a transaction inside a transaction, depending
452 -- on exactly when the thunk is evaluated.)
453 --
454 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
455 -- and which allows top-level TVars to be allocated.
456
457 atomically :: STM a -> IO a
458 atomically (STM m) = IO (\s -> (atomically# m) s )
459
460 -- |Retry execution of the current memory transaction because it has seen
461 -- values in TVars which mean that it should not continue (e.g. the TVars
462 -- represent a shared buffer that is now empty).  The implementation may
463 -- block the thread until one of the TVars that it has read from has been
464 -- udpated. (GHC only)
465 retry :: STM a
466 retry = STM $ \s# -> retry# s#
467
468 -- |Compose two alternative STM actions (GHC only).  If the first action
469 -- completes without retrying then it forms the result of the orElse.
470 -- Otherwise, if the first action retries, then the second action is
471 -- tried in its place.  If both actions retry then the orElse as a
472 -- whole retries.
473 orElse :: STM a -> STM a -> STM a
474 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
475
476 -- |Exception handling within STM actions.
477 catchSTM :: STM a -> (Exception -> STM a) -> STM a
478 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
479
480 -- | Low-level primitive on which always and alwaysSucceeds are built.
481 -- checkInv differs form these in that (i) the invariant is not 
482 -- checked when checkInv is called, only at the end of this and
483 -- subsequent transcations, (ii) the invariant failure is indicated
484 -- by raising an exception.
485 checkInv :: STM a -> STM ()
486 checkInv (STM m) = STM (\s -> (check# m) s)
487
488 -- | alwaysSucceeds adds a new invariant that must be true when passed
489 -- to alwaysSucceeds, at the end of the current transaction, and at
490 -- the end of every subsequent transaction.  If it fails at any
491 -- of those points then the transaction violating it is aborted
492 -- and the exception raised by the invariant is propagated.
493 alwaysSucceeds :: STM a -> STM ()
494 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
495                       checkInv i
496
497 -- | always is a variant of alwaysSucceeds in which the invariant is
498 -- expressed as an STM Bool action that must return True.  Returning
499 -- False or raising an exception are both treated as invariant failures.
500 always :: STM Bool -> STM ()
501 always i = alwaysSucceeds ( do v <- i
502                                if (v) then return () else ( error "Transacional invariant violation" ) )
503
504 -- |Shared memory locations that support atomic memory transactions.
505 data TVar a = TVar (TVar# RealWorld a)
506
507 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
508
509 instance Eq (TVar a) where
510         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
511
512 -- |Create a new TVar holding a value supplied
513 newTVar :: a -> STM (TVar a)
514 newTVar val = STM $ \s1# ->
515     case newTVar# val s1# of
516          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
517
518 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
519 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
520 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
521 -- possible.
522 newTVarIO :: a -> IO (TVar a)
523 newTVarIO val = IO $ \s1# ->
524     case newTVar# val s1# of
525          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
526
527 -- |Return the current value stored in a TVar
528 readTVar :: TVar a -> STM a
529 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
530
531 -- |Write the supplied value into a TVar
532 writeTVar :: TVar a -> a -> STM ()
533 writeTVar (TVar tvar#) val = STM $ \s1# ->
534     case writeTVar# tvar# val s1# of
535          s2# -> (# s2#, () #)
536   
537 \end{code}
538
539 %************************************************************************
540 %*                                                                      *
541 \subsection[mvars]{M-Structures}
542 %*                                                                      *
543 %************************************************************************
544
545 M-Vars are rendezvous points for concurrent threads.  They begin
546 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
547 is written, a single blocked thread may be freed.  Reading an M-Var
548 toggles its state from full back to empty.  Therefore, any value
549 written to an M-Var may only be read once.  Multiple reads and writes
550 are allowed, but there must be at least one read between any two
551 writes.
552
553 \begin{code}
554 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
555
556 -- |Create an 'MVar' which is initially empty.
557 newEmptyMVar  :: IO (MVar a)
558 newEmptyMVar = IO $ \ s# ->
559     case newMVar# s# of
560          (# s2#, svar# #) -> (# s2#, MVar svar# #)
561
562 -- |Create an 'MVar' which contains the supplied value.
563 newMVar :: a -> IO (MVar a)
564 newMVar value =
565     newEmptyMVar        >>= \ mvar ->
566     putMVar mvar value  >>
567     return mvar
568
569 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
570 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
571 -- the 'MVar' is left empty.
572 -- 
573 -- There are two further important properties of 'takeMVar':
574 --
575 --   * 'takeMVar' is single-wakeup.  That is, if there are multiple
576 --     threads blocked in 'takeMVar', and the 'MVar' becomes full,
577 --     only one thread will be woken up.  The runtime guarantees that
578 --     the woken thread completes its 'takeMVar' operation.
579 --
580 --   * When multiple threads are blocked on an 'MVar', they are
581 --     woken up in FIFO order.  This is useful for providing
582 --     fairness properties of abstractions built using 'MVar's.
583 --
584 takeMVar :: MVar a -> IO a
585 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
586
587 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
588 -- 'putMVar' will wait until it becomes empty.
589 --
590 -- There are two further important properties of 'putMVar':
591 --
592 --   * 'putMVar' is single-wakeup.  That is, if there are multiple
593 --     threads blocked in 'putMVar', and the 'MVar' becomes empty,
594 --     only one thread will be woken up.  The runtime guarantees that
595 --     the woken thread completes its 'putMVar' operation.
596 --
597 --   * When multiple threads are blocked on an 'MVar', they are
598 --     woken up in FIFO order.  This is useful for providing
599 --     fairness properties of abstractions built using 'MVar's.
600 --
601 putMVar  :: MVar a -> a -> IO ()
602 putMVar (MVar mvar#) x = IO $ \ s# ->
603     case putMVar# mvar# x s# of
604         s2# -> (# s2#, () #)
605
606 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
607 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
608 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
609 -- the 'MVar' is left empty.
610 tryTakeMVar :: MVar a -> IO (Maybe a)
611 tryTakeMVar (MVar m) = IO $ \ s ->
612     case tryTakeMVar# m s of
613         (# s, 0#, _ #) -> (# s, Nothing #)      -- MVar is empty
614         (# s, _,  a #) -> (# s, Just a  #)      -- MVar is full
615
616 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
617 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
618 -- it was successful, or 'False' otherwise.
619 tryPutMVar  :: MVar a -> a -> IO Bool
620 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
621     case tryPutMVar# mvar# x s# of
622         (# s, 0# #) -> (# s, False #)
623         (# s, _  #) -> (# s, True #)
624
625 -- |Check whether a given 'MVar' is empty.
626 --
627 -- Notice that the boolean value returned  is just a snapshot of
628 -- the state of the MVar. By the time you get to react on its result,
629 -- the MVar may have been filled (or emptied) - so be extremely
630 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
631 isEmptyMVar :: MVar a -> IO Bool
632 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
633     case isEmptyMVar# mv# s# of
634         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
635
636 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
637 -- "System.Mem.Weak" for more about finalizers.
638 addMVarFinalizer :: MVar a -> IO () -> IO ()
639 addMVarFinalizer (MVar m) finalizer = 
640   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
641
642 withMVar :: MVar a -> (a -> IO b) -> IO b
643 withMVar m io = 
644   block $ do
645     a <- takeMVar m
646     b <- catchException (unblock (io a))
647             (\e -> do putMVar m a; throw e)
648     putMVar m a
649     return b
650 \end{code}
651
652
653 %************************************************************************
654 %*                                                                      *
655 \subsection{Thread waiting}
656 %*                                                                      *
657 %************************************************************************
658
659 \begin{code}
660 #ifdef mingw32_HOST_OS
661
662 -- Note: threadWaitRead and threadWaitWrite aren't really functional
663 -- on Win32, but left in there because lib code (still) uses them (the manner
664 -- in which they're used doesn't cause problems on a Win32 platform though.)
665
666 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
667 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
668   IO $ \s -> case asyncRead# fd isSock len buf s of 
669                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
670
671 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
672 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
673   IO $ \s -> case asyncWrite# fd isSock len buf s of 
674                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
675
676 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
677 asyncDoProc (FunPtr proc) (Ptr param) = 
678     -- the 'length' value is ignored; simplifies implementation of
679     -- the async*# primops to have them all return the same result.
680   IO $ \s -> case asyncDoProc# proc param s  of 
681                (# s, len#, err# #) -> (# s, I# err# #)
682
683 -- to aid the use of these primops by the IO Handle implementation,
684 -- provide the following convenience funs:
685
686 -- this better be a pinned byte array!
687 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
688 asyncReadBA fd isSock len off bufB = 
689   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
690   
691 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
692 asyncWriteBA fd isSock len off bufB = 
693   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
694
695 #endif
696
697 -- -----------------------------------------------------------------------------
698 -- Thread IO API
699
700 -- | Block the current thread until data is available to read on the
701 -- given file descriptor (GHC only).
702 threadWaitRead :: Fd -> IO ()
703 threadWaitRead fd
704 #ifndef mingw32_HOST_OS
705   | threaded  = waitForReadEvent fd
706 #endif
707   | otherwise = IO $ \s -> 
708         case fromIntegral fd of { I# fd# ->
709         case waitRead# fd# s of { s -> (# s, () #)
710         }}
711
712 -- | Block the current thread until data can be written to the
713 -- given file descriptor (GHC only).
714 threadWaitWrite :: Fd -> IO ()
715 threadWaitWrite fd
716 #ifndef mingw32_HOST_OS
717   | threaded  = waitForWriteEvent fd
718 #endif
719   | otherwise = IO $ \s -> 
720         case fromIntegral fd of { I# fd# ->
721         case waitWrite# fd# s of { s -> (# s, () #)
722         }}
723
724 -- | Suspends the current thread for a given number of microseconds
725 -- (GHC only).
726 --
727 -- There is no guarantee that the thread will be rescheduled promptly
728 -- when the delay has expired, but the thread will never continue to
729 -- run /earlier/ than specified.
730 --
731 threadDelay :: Int -> IO ()
732 threadDelay time
733   | threaded  = waitForDelayEvent time
734   | otherwise = IO $ \s -> 
735         case fromIntegral time of { I# time# ->
736         case delay# time# s of { s -> (# s, () #)
737         }}
738
739
740 -- | Set the value of returned TVar to True after a given number of
741 -- microseconds. The caveats associated with threadDelay also apply.
742 --
743 registerDelay :: Int -> IO (TVar Bool)
744 registerDelay usecs 
745   | threaded = waitForDelayEventSTM usecs
746   | otherwise = error "registerDelay: requires -threaded"
747
748 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
749
750 waitForDelayEvent :: Int -> IO ()
751 waitForDelayEvent usecs = do
752   m <- newEmptyMVar
753   target <- calculateTarget usecs
754   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
755   prodServiceThread
756   takeMVar m
757
758 -- Delays for use in STM
759 waitForDelayEventSTM :: Int -> IO (TVar Bool)
760 waitForDelayEventSTM usecs = do
761    t <- atomically $ newTVar False
762    target <- calculateTarget usecs
763    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
764    prodServiceThread
765    return t  
766     
767 calculateTarget :: Int -> IO USecs
768 calculateTarget usecs = do
769     now <- getUSecOfDay
770     return $ now + (fromIntegral usecs)
771
772
773 -- ----------------------------------------------------------------------------
774 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
775
776 -- In the threaded RTS, we employ a single IO Manager thread to wait
777 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
778 -- and delays (threadDelay).  
779 --
780 -- We can do this because in the threaded RTS the IO Manager can make
781 -- a non-blocking call to select(), so we don't have to do select() in
782 -- the scheduler as we have to in the non-threaded RTS.  We get performance
783 -- benefits from doing it this way, because we only have to restart the select()
784 -- when a new request arrives, rather than doing one select() each time
785 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
786 -- by not having to check for completed IO requests.
787
788 -- Issues, possible problems:
789 --
790 --      - we might want bound threads to just do the blocking
791 --        operation rather than communicating with the IO manager
792 --        thread.  This would prevent simgle-threaded programs which do
793 --        IO from requiring multiple OS threads.  However, it would also
794 --        prevent bound threads waiting on IO from being killed or sent
795 --        exceptions.
796 --
797 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
798 --        I couldn't repeat this.
799 --
800 --      - How do we handle signal delivery in the multithreaded RTS?
801 --
802 --      - forkProcess will kill the IO manager thread.  Let's just
803 --        hope we don't need to do any blocking IO between fork & exec.
804
805 #ifndef mingw32_HOST_OS
806 data IOReq
807   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
808   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
809 #endif
810
811 data DelayReq
812   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
813   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
814
815 #ifndef mingw32_HOST_OS
816 pendingEvents :: IORef [IOReq]
817 #endif
818 pendingDelays :: IORef [DelayReq]
819         -- could use a strict list or array here
820 {-# NOINLINE pendingEvents #-}
821 {-# NOINLINE pendingDelays #-}
822 (pendingEvents,pendingDelays) = unsafePerformIO $ do
823   startIOManagerThread
824   reqs <- newIORef []
825   dels <- newIORef []
826   return (reqs, dels)
827         -- the first time we schedule an IO request, the service thread
828         -- will be created (cool, huh?)
829
830 ensureIOManagerIsRunning :: IO ()
831 ensureIOManagerIsRunning 
832   | threaded  = seq pendingEvents $ return ()
833   | otherwise = return ()
834
835 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
836 insertDelay d [] = [d]
837 insertDelay d1 ds@(d2 : rest)
838   | delayTime d1 <= delayTime d2 = d1 : ds
839   | otherwise                    = d2 : insertDelay d1 rest
840
841 delayTime :: DelayReq -> USecs
842 delayTime (Delay t _) = t
843 delayTime (DelaySTM t _) = t
844
845 type USecs = Word64
846
847 -- XXX: move into GHC.IOBase from Data.IORef?
848 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
849 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
850
851 foreign import ccall unsafe "getUSecOfDay" 
852   getUSecOfDay :: IO USecs
853
854 prodding :: IORef Bool
855 {-# NOINLINE prodding #-}
856 prodding = unsafePerformIO (newIORef False)
857
858 prodServiceThread :: IO ()
859 prodServiceThread = do
860   was_set <- atomicModifyIORef prodding (\a -> (True,a))
861   if (not (was_set)) then wakeupIOManager else return ()
862
863 #ifdef mingw32_HOST_OS
864 -- ----------------------------------------------------------------------------
865 -- Windows IO manager thread
866
867 startIOManagerThread :: IO ()
868 startIOManagerThread = do
869   wakeup <- c_getIOManagerEvent
870   forkIO $ service_loop wakeup []
871   return ()
872
873 service_loop :: HANDLE          -- read end of pipe
874              -> [DelayReq]      -- current delay requests
875              -> IO ()
876
877 service_loop wakeup old_delays = do
878   -- pick up new delay requests
879   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
880   let  delays = foldr insertDelay old_delays new_delays
881
882   now <- getUSecOfDay
883   (delays', timeout) <- getDelay now delays
884
885   r <- c_WaitForSingleObject wakeup timeout
886   case r of
887     0xffffffff -> do c_maperrno; throwErrno "service_loop"
888     0 -> do
889         r <- c_readIOManagerEvent
890         exit <- 
891               case r of
892                 _ | r == io_MANAGER_WAKEUP -> return False
893                 _ | r == io_MANAGER_DIE    -> return True
894                 0 -> return False -- spurious wakeup
895                 r -> do start_console_handler (r `shiftR` 1); return False
896         if exit
897           then return ()
898           else service_cont wakeup delays'
899
900     _other -> service_cont wakeup delays' -- probably timeout        
901
902 service_cont wakeup delays = do
903   atomicModifyIORef prodding (\_ -> (False,False))
904   service_loop wakeup delays
905
906 -- must agree with rts/win32/ThrIOManager.c
907 io_MANAGER_WAKEUP = 0xffffffff :: Word32
908 io_MANAGER_DIE    = 0xfffffffe :: Word32
909
910 data ConsoleEvent
911  = ControlC
912  | Break
913  | Close
914     -- these are sent to Services only.
915  | Logoff
916  | Shutdown
917  deriving (Eq, Ord, Enum, Show, Read, Typeable)
918
919 start_console_handler :: Word32 -> IO ()
920 start_console_handler r =
921   case toWin32ConsoleEvent r of
922      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
923                     forkIO (handler x)
924                     return ()
925      Nothing -> return ()
926
927 toWin32ConsoleEvent ev = 
928    case ev of
929        0 {- CTRL_C_EVENT-}        -> Just ControlC
930        1 {- CTRL_BREAK_EVENT-}    -> Just Break
931        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
932        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
933        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
934        _ -> Nothing
935
936 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
937 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
938
939 stick :: IORef HANDLE
940 {-# NOINLINE stick #-}
941 stick = unsafePerformIO (newIORef nullPtr)
942
943 wakeupIOManager = do 
944   hdl <- readIORef stick
945   c_sendIOManagerEvent io_MANAGER_WAKEUP
946
947 -- Walk the queue of pending delays, waking up any that have passed
948 -- and return the smallest delay to wait for.  The queue of pending
949 -- delays is kept ordered.
950 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
951 getDelay now [] = return ([], iNFINITE)
952 getDelay now all@(d : rest) 
953   = case d of
954      Delay time m | now >= time -> do
955         putMVar m ()
956         getDelay now rest
957      DelaySTM time t | now >= time -> do
958         atomically $ writeTVar t True
959         getDelay now rest
960      _otherwise ->
961         -- delay is in millisecs for WaitForSingleObject
962         let micro_seconds = delayTime d - now
963             milli_seconds = (micro_seconds + 999) `div` 1000
964         in return (all, fromIntegral milli_seconds)
965
966 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
967 -- available yet.  We should move some Win32 functionality down here,
968 -- maybe as part of the grand reorganisation of the base package...
969 type HANDLE       = Ptr ()
970 type DWORD        = Word32
971
972 iNFINITE = 0xFFFFFFFF :: DWORD -- urgh
973
974 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
975   c_getIOManagerEvent :: IO HANDLE
976
977 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
978   c_readIOManagerEvent :: IO Word32
979
980 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
981   c_sendIOManagerEvent :: Word32 -> IO ()
982
983 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
984    c_maperrno :: IO ()
985
986 foreign import stdcall "WaitForSingleObject"
987    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
988
989 #else
990 -- ----------------------------------------------------------------------------
991 -- Unix IO manager thread, using select()
992
993 startIOManagerThread :: IO ()
994 startIOManagerThread = do
995         allocaArray 2 $ \fds -> do
996         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
997         rd_end <- peekElemOff fds 0
998         wr_end <- peekElemOff fds 1
999         writeIORef stick (fromIntegral wr_end)
1000         c_setIOManagerPipe wr_end
1001         forkIO $ do
1002             allocaBytes sizeofFdSet   $ \readfds -> do
1003             allocaBytes sizeofFdSet   $ \writefds -> do 
1004             allocaBytes sizeofTimeVal $ \timeval -> do
1005             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1006         return ()
1007
1008 service_loop
1009    :: Fd                -- listen to this for wakeup calls
1010    -> Ptr CFdSet
1011    -> Ptr CFdSet
1012    -> Ptr CTimeVal
1013    -> [IOReq]
1014    -> [DelayReq]
1015    -> IO ()
1016 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1017
1018   -- pick up new IO requests
1019   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1020   let reqs = new_reqs ++ old_reqs
1021
1022   -- pick up new delay requests
1023   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1024   let  delays = foldr insertDelay old_delays new_delays
1025
1026   -- build the FDSets for select()
1027   fdZero readfds
1028   fdZero writefds
1029   fdSet wakeup readfds
1030   maxfd <- buildFdSets 0 readfds writefds reqs
1031
1032   -- perform the select()
1033   let do_select delays = do
1034           -- check the current time and wake up any thread in
1035           -- threadDelay whose timeout has expired.  Also find the
1036           -- timeout value for the select() call.
1037           now <- getUSecOfDay
1038           (delays', timeout) <- getDelay now ptimeval delays
1039
1040           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
1041                         nullPtr timeout
1042           if (res == -1)
1043              then do
1044                 err <- getErrno
1045                 case err of
1046                   _ | err == eINTR ->  do_select delays'
1047                         -- EINTR: just redo the select()
1048                   _ | err == eBADF ->  return (True, delays)
1049                         -- EBADF: one of the file descriptors is closed or bad,
1050                         -- we don't know which one, so wake everyone up.
1051                   _ | otherwise    ->  throwErrno "select"
1052                         -- otherwise (ENOMEM or EINVAL) something has gone
1053                         -- wrong; report the error.
1054              else
1055                 return (False,delays')
1056
1057   (wakeup_all,delays') <- do_select delays
1058
1059   exit <-
1060     if wakeup_all then return False
1061       else do
1062         b <- fdIsSet wakeup readfds
1063         if b == 0 
1064           then return False
1065           else alloca $ \p -> do 
1066                  c_read (fromIntegral wakeup) p 1; return ()
1067                  s <- peek p            
1068                  case s of
1069                   _ | s == io_MANAGER_WAKEUP -> return False
1070                   _ | s == io_MANAGER_DIE    -> return True
1071                   _ -> withMVar signalHandlerLock $ \_ -> do
1072                           handler_tbl <- peek handlers
1073                           sp <- peekElemOff handler_tbl (fromIntegral s)
1074                           io <- deRefStablePtr sp
1075                           forkIO io
1076                           return False
1077
1078   if exit then return () else do
1079
1080   atomicModifyIORef prodding (\_ -> (False,False))
1081
1082   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1083                          else completeRequests reqs readfds writefds []
1084
1085   service_loop wakeup readfds writefds ptimeval reqs' delays'
1086
1087 io_MANAGER_WAKEUP = 0xff :: CChar
1088 io_MANAGER_DIE    = 0xfe :: CChar
1089
1090 stick :: IORef Fd
1091 {-# NOINLINE stick #-}
1092 stick = unsafePerformIO (newIORef 0)
1093
1094 wakeupIOManager :: IO ()
1095 wakeupIOManager = do
1096   fd <- readIORef stick
1097   with io_MANAGER_WAKEUP $ \pbuf -> do 
1098     c_write (fromIntegral fd) pbuf 1; return ()
1099
1100 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1101 -- this race condition is #1922, although that bug was on Windows a similar
1102 -- bug also exists on Unix.
1103 signalHandlerLock :: MVar ()
1104 signalHandlerLock = unsafePerformIO (newMVar ())
1105
1106 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
1107
1108 foreign import ccall "setIOManagerPipe"
1109   c_setIOManagerPipe :: CInt -> IO ()
1110
1111 -- -----------------------------------------------------------------------------
1112 -- IO requests
1113
1114 buildFdSets maxfd readfds writefds [] = return maxfd
1115 buildFdSets maxfd readfds writefds (Read fd m : reqs)
1116   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1117   | otherwise        =  do
1118         fdSet fd readfds
1119         buildFdSets (max maxfd fd) readfds writefds reqs
1120 buildFdSets maxfd readfds writefds (Write fd m : reqs)
1121   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1122   | otherwise        =  do
1123         fdSet fd writefds
1124         buildFdSets (max maxfd fd) readfds writefds reqs
1125
1126 completeRequests [] _ _ reqs' = return reqs'
1127 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1128   b <- fdIsSet fd readfds
1129   if b /= 0
1130     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1131     else completeRequests reqs readfds writefds (Read fd m : reqs')
1132 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1133   b <- fdIsSet fd writefds
1134   if b /= 0
1135     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1136     else completeRequests reqs readfds writefds (Write fd m : reqs')
1137
1138 wakeupAll [] = return ()
1139 wakeupAll (Read  fd m : reqs) = do putMVar m (); wakeupAll reqs
1140 wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs
1141
1142 waitForReadEvent :: Fd -> IO ()
1143 waitForReadEvent fd = do
1144   m <- newEmptyMVar
1145   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1146   prodServiceThread
1147   takeMVar m
1148
1149 waitForWriteEvent :: Fd -> IO ()
1150 waitForWriteEvent fd = do
1151   m <- newEmptyMVar
1152   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1153   prodServiceThread
1154   takeMVar m
1155
1156 -- -----------------------------------------------------------------------------
1157 -- Delays
1158
1159 -- Walk the queue of pending delays, waking up any that have passed
1160 -- and return the smallest delay to wait for.  The queue of pending
1161 -- delays is kept ordered.
1162 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1163 getDelay now ptimeval [] = return ([],nullPtr)
1164 getDelay now ptimeval all@(d : rest) 
1165   = case d of
1166      Delay time m | now >= time -> do
1167         putMVar m ()
1168         getDelay now ptimeval rest
1169      DelaySTM time t | now >= time -> do
1170         atomically $ writeTVar t True
1171         getDelay now ptimeval rest
1172      _otherwise -> do
1173         setTimevalTicks ptimeval (delayTime d - now)
1174         return (all,ptimeval)
1175
1176 newtype CTimeVal = CTimeVal ()
1177
1178 foreign import ccall unsafe "sizeofTimeVal"
1179   sizeofTimeVal :: Int
1180
1181 foreign import ccall unsafe "setTimevalTicks" 
1182   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1183
1184 {- 
1185   On Win32 we're going to have a single Pipe, and a
1186   waitForSingleObject with the delay time.  For signals, we send a
1187   byte down the pipe just like on Unix.
1188 -}
1189
1190 -- ----------------------------------------------------------------------------
1191 -- select() interface
1192
1193 -- ToDo: move to System.Posix.Internals?
1194
1195 newtype CFdSet = CFdSet ()
1196
1197 foreign import ccall safe "select"
1198   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1199            -> IO CInt
1200
1201 foreign import ccall unsafe "hsFD_SETSIZE"
1202   c_fD_SETSIZE :: CInt
1203
1204 fD_SETSIZE :: Fd
1205 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1206
1207 foreign import ccall unsafe "hsFD_CLR"
1208   c_fdClr :: CInt -> Ptr CFdSet -> IO ()
1209
1210 fdClr :: Fd -> Ptr CFdSet -> IO ()
1211 fdClr (Fd fd) fdset = c_fdClr fd fdset
1212
1213 foreign import ccall unsafe "hsFD_ISSET"
1214   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1215
1216 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1217 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1218
1219 foreign import ccall unsafe "hsFD_SET"
1220   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1221
1222 fdSet :: Fd -> Ptr CFdSet -> IO ()
1223 fdSet (Fd fd) fdset = c_fdSet fd fdset
1224
1225 foreign import ccall unsafe "hsFD_ZERO"
1226   fdZero :: Ptr CFdSet -> IO ()
1227
1228 foreign import ccall unsafe "sizeof_fd_set"
1229   sizeofFdSet :: Int
1230
1231 #endif
1232
1233 \end{code}