TopHandler now uses the new extensible exceptions
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_HADDOCK not-home #-}
4 -----------------------------------------------------------------------------
5 -- |
6 -- Module      :  GHC.Conc
7 -- Copyright   :  (c) The University of Glasgow, 1994-2002
8 -- License     :  see libraries/base/LICENSE
9 -- 
10 -- Maintainer  :  cvs-ghc@haskell.org
11 -- Stability   :  internal
12 -- Portability :  non-portable (GHC extensions)
13 --
14 -- Basic concurrency stuff.
15 -- 
16 -----------------------------------------------------------------------------
17
18 -- No: #hide, because bits of this module are exposed by the stm package.
19 -- However, we don't want this module to be the home location for the
20 -- bits it exports, we'd rather have Control.Concurrent and the other
21 -- higher level modules be the home.  Hence:
22
23 #include "Typeable.h"
24
25 -- #not-home
26 module GHC.Conc
27         ( ThreadId(..)
28
29         -- * Forking and suchlike
30         , forkIO        -- :: IO a -> IO ThreadId
31         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
32         , numCapabilities -- :: Int
33         , childHandler  -- :: Exception -> IO ()
34         , myThreadId    -- :: IO ThreadId
35         , killThread    -- :: ThreadId -> IO ()
36         , throwTo       -- :: ThreadId -> Exception -> IO ()
37         , par           -- :: a -> b -> b
38         , pseq          -- :: a -> b -> b
39         , yield         -- :: IO ()
40         , labelThread   -- :: ThreadId -> String -> IO ()
41
42         , ThreadStatus(..), BlockReason(..)
43         , threadStatus  -- :: ThreadId -> IO ThreadStatus
44
45         -- * Waiting
46         , threadDelay           -- :: Int -> IO ()
47         , registerDelay         -- :: Int -> IO (TVar Bool)
48         , threadWaitRead        -- :: Int -> IO ()
49         , threadWaitWrite       -- :: Int -> IO ()
50
51         -- * MVars
52         , MVar(..)
53         , newMVar       -- :: a -> IO (MVar a)
54         , newEmptyMVar  -- :: IO (MVar a)
55         , takeMVar      -- :: MVar a -> IO a
56         , putMVar       -- :: MVar a -> a -> IO ()
57         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
58         , tryPutMVar    -- :: MVar a -> a -> IO Bool
59         , isEmptyMVar   -- :: MVar a -> IO Bool
60         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
61
62         -- * TVars
63         , STM(..)
64         , atomically    -- :: STM a -> IO a
65         , retry         -- :: STM a
66         , orElse        -- :: STM a -> STM a -> STM a
67         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
68         , alwaysSucceeds -- :: STM a -> STM ()
69         , always        -- :: STM Bool -> STM ()
70         , TVar(..)
71         , newTVar       -- :: a -> STM (TVar a)
72         , newTVarIO     -- :: a -> STM (TVar a)
73         , readTVar      -- :: TVar a -> STM a
74         , writeTVar     -- :: a -> TVar a -> STM ()
75         , unsafeIOToSTM -- :: IO a -> STM a
76
77         -- * Miscellaneous
78 #ifdef mingw32_HOST_OS
79         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
80         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
81         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
82
83         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
84         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
85 #endif
86
87 #ifndef mingw32_HOST_OS
88         , signalHandlerLock
89 #endif
90
91         , ensureIOManagerIsRunning
92
93 #ifdef mingw32_HOST_OS
94         , ConsoleEvent(..)
95         , win32ConsoleHandler
96         , toWin32ConsoleEvent
97 #endif
98         ) where
99
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
103 #endif
104 import Foreign
105 import Foreign.C
106
107 #ifndef __HADDOCK__
108 import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
109 #endif
110
111 import Data.Maybe
112
113 import GHC.Base
114 import GHC.IOBase
115 import GHC.Num          ( Num(..) )
116 import GHC.Real         ( fromIntegral, div )
117 #ifndef mingw32_HOST_OS
118 import GHC.Base         ( Int(..) )
119 #endif
120 #ifdef mingw32_HOST_OS
121 import GHC.Read         ( Read )
122 import GHC.Enum         ( Enum )
123 #endif
124 import GHC.Exception    ( SomeException(..), throw )
125 import GHC.Pack         ( packCString# )
126 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
127 import GHC.STRef
128 import GHC.Show         ( Show(..), showString )
129 import Data.Typeable
130
131 infixr 0 `par`, `pseq`
132 \end{code}
133
134 %************************************************************************
135 %*                                                                      *
136 \subsection{@ThreadId@, @par@, and @fork@}
137 %*                                                                      *
138 %************************************************************************
139
140 \begin{code}
141 data ThreadId = ThreadId ThreadId# deriving( Typeable )
142 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
143 -- But since ThreadId# is unlifted, the Weak type must use open
144 -- type variables.
145 {- ^
146 A 'ThreadId' is an abstract type representing a handle to a thread.
147 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
148 the 'Ord' instance implements an arbitrary total ordering over
149 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
150 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
151 useful when debugging or diagnosing the behaviour of a concurrent
152 program.
153
154 /Note/: in GHC, if you have a 'ThreadId', you essentially have
155 a pointer to the thread itself.  This means the thread itself can\'t be
156 garbage collected until you drop the 'ThreadId'.
157 This misfeature will hopefully be corrected at a later date.
158
159 /Note/: Hugs does not provide any operations on other threads;
160 it defines 'ThreadId' as a synonym for ().
161 -}
162
163 instance Show ThreadId where
164    showsPrec d t = 
165         showString "ThreadId " . 
166         showsPrec d (getThreadId (id2TSO t))
167
168 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
169
170 id2TSO :: ThreadId -> ThreadId#
171 id2TSO (ThreadId t) = t
172
173 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
174 -- Returns -1, 0, 1
175
176 cmpThread :: ThreadId -> ThreadId -> Ordering
177 cmpThread t1 t2 = 
178    case cmp_thread (id2TSO t1) (id2TSO t2) of
179       -1 -> LT
180       0  -> EQ
181       _  -> GT -- must be 1
182
183 instance Eq ThreadId where
184    t1 == t2 = 
185       case t1 `cmpThread` t2 of
186          EQ -> True
187          _  -> False
188
189 instance Ord ThreadId where
190    compare = cmpThread
191
192 {- |
193 Sparks off a new thread to run the 'IO' computation passed as the
194 first argument, and returns the 'ThreadId' of the newly created
195 thread.
196
197 The new thread will be a lightweight thread; if you want to use a foreign
198 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
199
200 GHC note: the new thread inherits the /blocked/ state of the parent 
201 (see 'Control.Exception.block').
202 -}
203 forkIO :: IO () -> IO ThreadId
204 forkIO action = IO $ \ s -> 
205    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
206  where
207   action_plus = catchException action childHandler
208
209 {- |
210 Like 'forkIO', but lets you specify on which CPU the thread is
211 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
212 will stay on the same CPU for its entire lifetime (`forkIO` threads
213 can migrate between CPUs according to the scheduling policy).
214 `forkOnIO` is useful for overriding the scheduling policy when you
215 know in advance how best to distribute the threads.
216
217 The `Int` argument specifies the CPU number; it is interpreted modulo
218 'numCapabilities' (note that it actually specifies a capability number
219 rather than a CPU number, but to a first approximation the two are
220 equivalent).
221 -}
222 forkOnIO :: Int -> IO () -> IO ThreadId
223 forkOnIO (I# cpu) action = IO $ \ s -> 
224    case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
225  where
226   action_plus = catchException action childHandler
227
228 -- | the value passed to the @+RTS -N@ flag.  This is the number of
229 -- Haskell threads that can run truly simultaneously at any given
230 -- time, and is typically set to the number of physical CPU cores on
231 -- the machine.
232 numCapabilities :: Int
233 numCapabilities = unsafePerformIO $  do 
234                     n <- peek n_capabilities
235                     return (fromIntegral n)
236
237 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
238
239 childHandler :: SomeException -> IO ()
240 childHandler err = catchException (real_handler err) childHandler
241
242 real_handler :: SomeException -> IO ()
243 real_handler se@(SomeException ex) =
244   -- ignore thread GC and killThread exceptions:
245   case cast ex of
246   Just BlockedOnDeadMVar                -> return ()
247   _ -> case cast ex of
248        Just BlockedIndefinitely         -> return ()
249        _ -> case cast ex of
250             Just ThreadKilled           -> return ()
251             _ -> case cast ex of
252                  -- report all others:
253                  Just StackOverflow     -> reportStackOverflow
254                  _                      -> reportError se
255
256 {- | 'killThread' terminates the given thread (GHC only).
257 Any work already done by the thread isn\'t
258 lost: the computation is suspended until required by another thread.
259 The memory used by the thread will be garbage collected if it isn\'t
260 referenced from anywhere.  The 'killThread' function is defined in
261 terms of 'throwTo':
262
263 > killThread tid = throwTo tid (AsyncException ThreadKilled)
264
265 -}
266 killThread :: ThreadId -> IO ()
267 killThread tid = throwTo tid (toException ThreadKilled)
268
269 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
270
271 'throwTo' does not return until the exception has been raised in the
272 target thread. 
273 The calling thread can thus be certain that the target
274 thread has received the exception.  This is a useful property to know
275 when dealing with race conditions: eg. if there are two threads that
276 can kill each other, it is guaranteed that only one of the threads
277 will get to kill the other.
278
279 If the target thread is currently making a foreign call, then the
280 exception will not be raised (and hence 'throwTo' will not return)
281 until the call has completed.  This is the case regardless of whether
282 the call is inside a 'block' or not.
283
284 Important note: the behaviour of 'throwTo' differs from that described in
285 the paper \"Asynchronous exceptions in Haskell\"
286 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
287 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
288 a more synchronous design in which 'throwTo' does not return until the exception
289 is received by the target thread.  The trade-off is discussed in Section 8 of the paper.
290 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
291 the paper).
292
293 There is currently no guarantee that the exception delivered by 'throwTo' will be
294 delivered at the first possible opportunity.  In particular, if a thread may 
295 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
296 a pending 'throwTo'.  This is arguably undesirable behaviour.
297
298  -}
299 -- XXX This is duplicated in Control.{Old,}Exception
300 throwTo :: ThreadId -> SomeException -> IO ()
301 throwTo (ThreadId id) ex = IO $ \ s ->
302    case (killThread# id ex s) of s1 -> (# s1, () #)
303
304 -- | Returns the 'ThreadId' of the calling thread (GHC only).
305 myThreadId :: IO ThreadId
306 myThreadId = IO $ \s ->
307    case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #)
308
309
310 -- |The 'yield' action allows (forces, in a co-operative multitasking
311 -- implementation) a context-switch to any other currently runnable
312 -- threads (if any), and is occasionally useful when implementing
313 -- concurrency abstractions.
314 yield :: IO ()
315 yield = IO $ \s -> 
316    case (yield# s) of s1 -> (# s1, () #)
317
318 {- | 'labelThread' stores a string as identifier for this thread if
319 you built a RTS with debugging support. This identifier will be used in
320 the debugging output to make distinction of different threads easier
321 (otherwise you only have the thread state object\'s address in the heap).
322
323 Other applications like the graphical Concurrent Haskell Debugger
324 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
325 'labelThread' for their purposes as well.
326 -}
327
328 labelThread :: ThreadId -> String -> IO ()
329 labelThread (ThreadId t) str = IO $ \ s ->
330    let ps  = packCString# str
331        adr = byteArrayContents# ps in
332      case (labelThread# t adr s) of s1 -> (# s1, () #)
333
334 --      Nota Bene: 'pseq' used to be 'seq'
335 --                 but 'seq' is now defined in PrelGHC
336 --
337 -- "pseq" is defined a bit weirdly (see below)
338 --
339 -- The reason for the strange "lazy" call is that
340 -- it fools the compiler into thinking that pseq  and par are non-strict in
341 -- their second argument (even if it inlines pseq at the call site).
342 -- If it thinks pseq is strict in "y", then it often evaluates
343 -- "y" before "x", which is totally wrong.  
344
345 {-# INLINE pseq  #-}
346 pseq :: a -> b -> b
347 pseq  x y = x `seq` lazy y
348
349 {-# INLINE par  #-}
350 par :: a -> b -> b
351 par  x y = case (par# x) of { _ -> lazy y }
352
353
354 data BlockReason
355   = BlockedOnMVar
356         -- ^blocked on on 'MVar'
357   | BlockedOnBlackHole
358         -- ^blocked on a computation in progress by another thread
359   | BlockedOnException
360         -- ^blocked in 'throwTo'
361   | BlockedOnSTM
362         -- ^blocked in 'retry' in an STM transaction
363   | BlockedOnForeignCall
364         -- ^currently in a foreign call
365   | BlockedOnOther
366         -- ^blocked on some other resource.  Without @-threaded@,
367         -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
368         -- they show up as 'BlockedOnMVar'.
369   deriving (Eq,Ord,Show)
370
371 -- | The current status of a thread
372 data ThreadStatus
373   = ThreadRunning
374         -- ^the thread is currently runnable or running
375   | ThreadFinished
376         -- ^the thread has finished
377   | ThreadBlocked  BlockReason
378         -- ^the thread is blocked on some resource
379   | ThreadDied
380         -- ^the thread received an uncaught exception
381   deriving (Eq,Ord,Show)
382
383 threadStatus :: ThreadId -> IO ThreadStatus
384 threadStatus (ThreadId t) = IO $ \s ->
385    case threadStatus# t s of
386      (# s', stat #) -> (# s', mk_stat (I# stat) #)
387    where
388         -- NB. keep these in sync with includes/Constants.h
389      mk_stat 0  = ThreadRunning
390      mk_stat 1  = ThreadBlocked BlockedOnMVar
391      mk_stat 2  = ThreadBlocked BlockedOnBlackHole
392      mk_stat 3  = ThreadBlocked BlockedOnException
393      mk_stat 7  = ThreadBlocked BlockedOnSTM
394      mk_stat 11 = ThreadBlocked BlockedOnForeignCall
395      mk_stat 12 = ThreadBlocked BlockedOnForeignCall
396      mk_stat 16 = ThreadFinished
397      mk_stat 17 = ThreadDied
398      mk_stat _  = ThreadBlocked BlockedOnOther
399 \end{code}
400
401
402 %************************************************************************
403 %*                                                                      *
404 \subsection[stm]{Transactional heap operations}
405 %*                                                                      *
406 %************************************************************************
407
408 TVars are shared memory locations which support atomic memory
409 transactions.
410
411 \begin{code}
412 -- |A monad supporting atomic memory transactions.
413 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
414
415 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
416 unSTM (STM a) = a
417
418 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
419
420 instance  Functor STM where
421    fmap f x = x >>= (return . f)
422
423 instance  Monad STM  where
424     {-# INLINE return #-}
425     {-# INLINE (>>)   #-}
426     {-# INLINE (>>=)  #-}
427     m >> k      = thenSTM m k
428     return x    = returnSTM x
429     m >>= k     = bindSTM m k
430
431 bindSTM :: STM a -> (a -> STM b) -> STM b
432 bindSTM (STM m) k = STM ( \s ->
433   case m s of 
434     (# new_s, a #) -> unSTM (k a) new_s
435   )
436
437 thenSTM :: STM a -> STM b -> STM b
438 thenSTM (STM m) k = STM ( \s ->
439   case m s of 
440     (# new_s, a #) -> unSTM k new_s
441   )
442
443 returnSTM :: a -> STM a
444 returnSTM x = STM (\s -> (# s, x #))
445
446 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
447 -- dangerous thing to do.  
448 --
449 --   * The STM implementation will often run transactions multiple
450 --     times, so you need to be prepared for this if your IO has any
451 --     side effects.
452 --
453 --   * The STM implementation will abort transactions that are known to
454 --     be invalid and need to be restarted.  This may happen in the middle
455 --     of `unsafeIOToSTM`, so make sure you don't acquire any resources
456 --     that need releasing (exception handlers are ignored when aborting
457 --     the transaction).  That includes doing any IO using Handles, for
458 --     example.  Getting this wrong will probably lead to random deadlocks.
459 --
460 --   * The transaction may have seen an inconsistent view of memory when
461 --     the IO runs.  Invariants that you expect to be true throughout
462 --     your program may not be true inside a transaction, due to the
463 --     way transactions are implemented.  Normally this wouldn't be visible
464 --     to the programmer, but using `unsafeIOToSTM` can expose it.
465 --
466 unsafeIOToSTM :: IO a -> STM a
467 unsafeIOToSTM (IO m) = STM m
468
469 -- |Perform a series of STM actions atomically.
470 --
471 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
472 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
473 -- this would effectively allow a transaction inside a transaction, depending
474 -- on exactly when the thunk is evaluated.)
475 --
476 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
477 -- and which allows top-level TVars to be allocated.
478
479 atomically :: STM a -> IO a
480 atomically (STM m) = IO (\s -> (atomically# m) s )
481
482 -- |Retry execution of the current memory transaction because it has seen
483 -- values in TVars which mean that it should not continue (e.g. the TVars
484 -- represent a shared buffer that is now empty).  The implementation may
485 -- block the thread until one of the TVars that it has read from has been
486 -- udpated. (GHC only)
487 retry :: STM a
488 retry = STM $ \s# -> retry# s#
489
490 -- |Compose two alternative STM actions (GHC only).  If the first action
491 -- completes without retrying then it forms the result of the orElse.
492 -- Otherwise, if the first action retries, then the second action is
493 -- tried in its place.  If both actions retry then the orElse as a
494 -- whole retries.
495 orElse :: STM a -> STM a -> STM a
496 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
497
498 -- |Exception handling within STM actions.
499 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
500 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
501
502 -- | Low-level primitive on which always and alwaysSucceeds are built.
503 -- checkInv differs form these in that (i) the invariant is not 
504 -- checked when checkInv is called, only at the end of this and
505 -- subsequent transcations, (ii) the invariant failure is indicated
506 -- by raising an exception.
507 checkInv :: STM a -> STM ()
508 checkInv (STM m) = STM (\s -> (check# m) s)
509
510 -- | alwaysSucceeds adds a new invariant that must be true when passed
511 -- to alwaysSucceeds, at the end of the current transaction, and at
512 -- the end of every subsequent transaction.  If it fails at any
513 -- of those points then the transaction violating it is aborted
514 -- and the exception raised by the invariant is propagated.
515 alwaysSucceeds :: STM a -> STM ()
516 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
517                       checkInv i
518
519 -- | always is a variant of alwaysSucceeds in which the invariant is
520 -- expressed as an STM Bool action that must return True.  Returning
521 -- False or raising an exception are both treated as invariant failures.
522 always :: STM Bool -> STM ()
523 always i = alwaysSucceeds ( do v <- i
524                                if (v) then return () else ( error "Transacional invariant violation" ) )
525
526 -- |Shared memory locations that support atomic memory transactions.
527 data TVar a = TVar (TVar# RealWorld a)
528
529 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
530
531 instance Eq (TVar a) where
532         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
533
534 -- |Create a new TVar holding a value supplied
535 newTVar :: a -> STM (TVar a)
536 newTVar val = STM $ \s1# ->
537     case newTVar# val s1# of
538          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
539
540 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
541 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
542 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
543 -- possible.
544 newTVarIO :: a -> IO (TVar a)
545 newTVarIO val = IO $ \s1# ->
546     case newTVar# val s1# of
547          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
548
549 -- |Return the current value stored in a TVar
550 readTVar :: TVar a -> STM a
551 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
552
553 -- |Write the supplied value into a TVar
554 writeTVar :: TVar a -> a -> STM ()
555 writeTVar (TVar tvar#) val = STM $ \s1# ->
556     case writeTVar# tvar# val s1# of
557          s2# -> (# s2#, () #)
558   
559 \end{code}
560
561 %************************************************************************
562 %*                                                                      *
563 \subsection[mvars]{M-Structures}
564 %*                                                                      *
565 %************************************************************************
566
567 M-Vars are rendezvous points for concurrent threads.  They begin
568 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
569 is written, a single blocked thread may be freed.  Reading an M-Var
570 toggles its state from full back to empty.  Therefore, any value
571 written to an M-Var may only be read once.  Multiple reads and writes
572 are allowed, but there must be at least one read between any two
573 writes.
574
575 \begin{code}
576 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
577
578 -- |Create an 'MVar' which is initially empty.
579 newEmptyMVar  :: IO (MVar a)
580 newEmptyMVar = IO $ \ s# ->
581     case newMVar# s# of
582          (# s2#, svar# #) -> (# s2#, MVar svar# #)
583
584 -- |Create an 'MVar' which contains the supplied value.
585 newMVar :: a -> IO (MVar a)
586 newMVar value =
587     newEmptyMVar        >>= \ mvar ->
588     putMVar mvar value  >>
589     return mvar
590
591 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
592 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
593 -- the 'MVar' is left empty.
594 -- 
595 -- There are two further important properties of 'takeMVar':
596 --
597 --   * 'takeMVar' is single-wakeup.  That is, if there are multiple
598 --     threads blocked in 'takeMVar', and the 'MVar' becomes full,
599 --     only one thread will be woken up.  The runtime guarantees that
600 --     the woken thread completes its 'takeMVar' operation.
601 --
602 --   * When multiple threads are blocked on an 'MVar', they are
603 --     woken up in FIFO order.  This is useful for providing
604 --     fairness properties of abstractions built using 'MVar's.
605 --
606 takeMVar :: MVar a -> IO a
607 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
608
609 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
610 -- 'putMVar' will wait until it becomes empty.
611 --
612 -- There are two further important properties of 'putMVar':
613 --
614 --   * 'putMVar' is single-wakeup.  That is, if there are multiple
615 --     threads blocked in 'putMVar', and the 'MVar' becomes empty,
616 --     only one thread will be woken up.  The runtime guarantees that
617 --     the woken thread completes its 'putMVar' operation.
618 --
619 --   * When multiple threads are blocked on an 'MVar', they are
620 --     woken up in FIFO order.  This is useful for providing
621 --     fairness properties of abstractions built using 'MVar's.
622 --
623 putMVar  :: MVar a -> a -> IO ()
624 putMVar (MVar mvar#) x = IO $ \ s# ->
625     case putMVar# mvar# x s# of
626         s2# -> (# s2#, () #)
627
628 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
629 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
630 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
631 -- the 'MVar' is left empty.
632 tryTakeMVar :: MVar a -> IO (Maybe a)
633 tryTakeMVar (MVar m) = IO $ \ s ->
634     case tryTakeMVar# m s of
635         (# s, 0#, _ #) -> (# s, Nothing #)      -- MVar is empty
636         (# s, _,  a #) -> (# s, Just a  #)      -- MVar is full
637
638 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
639 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
640 -- it was successful, or 'False' otherwise.
641 tryPutMVar  :: MVar a -> a -> IO Bool
642 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
643     case tryPutMVar# mvar# x s# of
644         (# s, 0# #) -> (# s, False #)
645         (# s, _  #) -> (# s, True #)
646
647 -- |Check whether a given 'MVar' is empty.
648 --
649 -- Notice that the boolean value returned  is just a snapshot of
650 -- the state of the MVar. By the time you get to react on its result,
651 -- the MVar may have been filled (or emptied) - so be extremely
652 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
653 isEmptyMVar :: MVar a -> IO Bool
654 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
655     case isEmptyMVar# mv# s# of
656         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
657
658 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
659 -- "System.Mem.Weak" for more about finalizers.
660 addMVarFinalizer :: MVar a -> IO () -> IO ()
661 addMVarFinalizer (MVar m) finalizer = 
662   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
663
664 withMVar :: MVar a -> (a -> IO b) -> IO b
665 withMVar m io = 
666   block $ do
667     a <- takeMVar m
668     b <- catchAny (unblock (io a))
669             (\e -> do putMVar m a; throw e)
670     putMVar m a
671     return b
672 \end{code}
673
674
675 %************************************************************************
676 %*                                                                      *
677 \subsection{Thread waiting}
678 %*                                                                      *
679 %************************************************************************
680
681 \begin{code}
682 #ifdef mingw32_HOST_OS
683
684 -- Note: threadWaitRead and threadWaitWrite aren't really functional
685 -- on Win32, but left in there because lib code (still) uses them (the manner
686 -- in which they're used doesn't cause problems on a Win32 platform though.)
687
688 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
689 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
690   IO $ \s -> case asyncRead# fd isSock len buf s of 
691                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
692
693 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
694 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
695   IO $ \s -> case asyncWrite# fd isSock len buf s of 
696                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
697
698 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
699 asyncDoProc (FunPtr proc) (Ptr param) = 
700     -- the 'length' value is ignored; simplifies implementation of
701     -- the async*# primops to have them all return the same result.
702   IO $ \s -> case asyncDoProc# proc param s  of 
703                (# s, len#, err# #) -> (# s, I# err# #)
704
705 -- to aid the use of these primops by the IO Handle implementation,
706 -- provide the following convenience funs:
707
708 -- this better be a pinned byte array!
709 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
710 asyncReadBA fd isSock len off bufB = 
711   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
712   
713 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
714 asyncWriteBA fd isSock len off bufB = 
715   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
716
717 #endif
718
719 -- -----------------------------------------------------------------------------
720 -- Thread IO API
721
722 -- | Block the current thread until data is available to read on the
723 -- given file descriptor (GHC only).
724 threadWaitRead :: Fd -> IO ()
725 threadWaitRead fd
726 #ifndef mingw32_HOST_OS
727   | threaded  = waitForReadEvent fd
728 #endif
729   | otherwise = IO $ \s -> 
730         case fromIntegral fd of { I# fd# ->
731         case waitRead# fd# s of { s -> (# s, () #)
732         }}
733
734 -- | Block the current thread until data can be written to the
735 -- given file descriptor (GHC only).
736 threadWaitWrite :: Fd -> IO ()
737 threadWaitWrite fd
738 #ifndef mingw32_HOST_OS
739   | threaded  = waitForWriteEvent fd
740 #endif
741   | otherwise = IO $ \s -> 
742         case fromIntegral fd of { I# fd# ->
743         case waitWrite# fd# s of { s -> (# s, () #)
744         }}
745
746 -- | Suspends the current thread for a given number of microseconds
747 -- (GHC only).
748 --
749 -- There is no guarantee that the thread will be rescheduled promptly
750 -- when the delay has expired, but the thread will never continue to
751 -- run /earlier/ than specified.
752 --
753 threadDelay :: Int -> IO ()
754 threadDelay time
755   | threaded  = waitForDelayEvent time
756   | otherwise = IO $ \s -> 
757         case fromIntegral time of { I# time# ->
758         case delay# time# s of { s -> (# s, () #)
759         }}
760
761
762 -- | Set the value of returned TVar to True after a given number of
763 -- microseconds. The caveats associated with threadDelay also apply.
764 --
765 registerDelay :: Int -> IO (TVar Bool)
766 registerDelay usecs 
767   | threaded = waitForDelayEventSTM usecs
768   | otherwise = error "registerDelay: requires -threaded"
769
770 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
771
772 waitForDelayEvent :: Int -> IO ()
773 waitForDelayEvent usecs = do
774   m <- newEmptyMVar
775   target <- calculateTarget usecs
776   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
777   prodServiceThread
778   takeMVar m
779
780 -- Delays for use in STM
781 waitForDelayEventSTM :: Int -> IO (TVar Bool)
782 waitForDelayEventSTM usecs = do
783    t <- atomically $ newTVar False
784    target <- calculateTarget usecs
785    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
786    prodServiceThread
787    return t  
788     
789 calculateTarget :: Int -> IO USecs
790 calculateTarget usecs = do
791     now <- getUSecOfDay
792     return $ now + (fromIntegral usecs)
793
794
795 -- ----------------------------------------------------------------------------
796 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
797
798 -- In the threaded RTS, we employ a single IO Manager thread to wait
799 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
800 -- and delays (threadDelay).  
801 --
802 -- We can do this because in the threaded RTS the IO Manager can make
803 -- a non-blocking call to select(), so we don't have to do select() in
804 -- the scheduler as we have to in the non-threaded RTS.  We get performance
805 -- benefits from doing it this way, because we only have to restart the select()
806 -- when a new request arrives, rather than doing one select() each time
807 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
808 -- by not having to check for completed IO requests.
809
810 -- Issues, possible problems:
811 --
812 --      - we might want bound threads to just do the blocking
813 --        operation rather than communicating with the IO manager
814 --        thread.  This would prevent simgle-threaded programs which do
815 --        IO from requiring multiple OS threads.  However, it would also
816 --        prevent bound threads waiting on IO from being killed or sent
817 --        exceptions.
818 --
819 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
820 --        I couldn't repeat this.
821 --
822 --      - How do we handle signal delivery in the multithreaded RTS?
823 --
824 --      - forkProcess will kill the IO manager thread.  Let's just
825 --        hope we don't need to do any blocking IO between fork & exec.
826
827 #ifndef mingw32_HOST_OS
828 data IOReq
829   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
830   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
831 #endif
832
833 data DelayReq
834   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
835   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
836
837 #ifndef mingw32_HOST_OS
838 pendingEvents :: IORef [IOReq]
839 #endif
840 pendingDelays :: IORef [DelayReq]
841         -- could use a strict list or array here
842 {-# NOINLINE pendingEvents #-}
843 {-# NOINLINE pendingDelays #-}
844 (pendingEvents,pendingDelays) = unsafePerformIO $ do
845   startIOManagerThread
846   reqs <- newIORef []
847   dels <- newIORef []
848   return (reqs, dels)
849         -- the first time we schedule an IO request, the service thread
850         -- will be created (cool, huh?)
851
852 ensureIOManagerIsRunning :: IO ()
853 ensureIOManagerIsRunning 
854   | threaded  = seq pendingEvents $ return ()
855   | otherwise = return ()
856
857 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
858 insertDelay d [] = [d]
859 insertDelay d1 ds@(d2 : rest)
860   | delayTime d1 <= delayTime d2 = d1 : ds
861   | otherwise                    = d2 : insertDelay d1 rest
862
863 delayTime :: DelayReq -> USecs
864 delayTime (Delay t _) = t
865 delayTime (DelaySTM t _) = t
866
867 type USecs = Word64
868
869 -- XXX: move into GHC.IOBase from Data.IORef?
870 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
871 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
872
873 foreign import ccall unsafe "getUSecOfDay" 
874   getUSecOfDay :: IO USecs
875
876 prodding :: IORef Bool
877 {-# NOINLINE prodding #-}
878 prodding = unsafePerformIO (newIORef False)
879
880 prodServiceThread :: IO ()
881 prodServiceThread = do
882   was_set <- atomicModifyIORef prodding (\a -> (True,a))
883   if (not (was_set)) then wakeupIOManager else return ()
884
885 #ifdef mingw32_HOST_OS
886 -- ----------------------------------------------------------------------------
887 -- Windows IO manager thread
888
889 startIOManagerThread :: IO ()
890 startIOManagerThread = do
891   wakeup <- c_getIOManagerEvent
892   forkIO $ service_loop wakeup []
893   return ()
894
895 service_loop :: HANDLE          -- read end of pipe
896              -> [DelayReq]      -- current delay requests
897              -> IO ()
898
899 service_loop wakeup old_delays = do
900   -- pick up new delay requests
901   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
902   let  delays = foldr insertDelay old_delays new_delays
903
904   now <- getUSecOfDay
905   (delays', timeout) <- getDelay now delays
906
907   r <- c_WaitForSingleObject wakeup timeout
908   case r of
909     0xffffffff -> do c_maperrno; throwErrno "service_loop"
910     0 -> do
911         r <- c_readIOManagerEvent
912         exit <- 
913               case r of
914                 _ | r == io_MANAGER_WAKEUP -> return False
915                 _ | r == io_MANAGER_DIE    -> return True
916                 0 -> return False -- spurious wakeup
917                 r -> do start_console_handler (r `shiftR` 1); return False
918         if exit
919           then return ()
920           else service_cont wakeup delays'
921
922     _other -> service_cont wakeup delays' -- probably timeout        
923
924 service_cont wakeup delays = do
925   atomicModifyIORef prodding (\_ -> (False,False))
926   service_loop wakeup delays
927
928 -- must agree with rts/win32/ThrIOManager.c
929 io_MANAGER_WAKEUP = 0xffffffff :: Word32
930 io_MANAGER_DIE    = 0xfffffffe :: Word32
931
932 data ConsoleEvent
933  = ControlC
934  | Break
935  | Close
936     -- these are sent to Services only.
937  | Logoff
938  | Shutdown
939  deriving (Eq, Ord, Enum, Show, Read, Typeable)
940
941 start_console_handler :: Word32 -> IO ()
942 start_console_handler r =
943   case toWin32ConsoleEvent r of
944      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
945                     forkIO (handler x)
946                     return ()
947      Nothing -> return ()
948
949 toWin32ConsoleEvent ev = 
950    case ev of
951        0 {- CTRL_C_EVENT-}        -> Just ControlC
952        1 {- CTRL_BREAK_EVENT-}    -> Just Break
953        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
954        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
955        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
956        _ -> Nothing
957
958 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
959 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
960
961 stick :: IORef HANDLE
962 {-# NOINLINE stick #-}
963 stick = unsafePerformIO (newIORef nullPtr)
964
965 wakeupIOManager = do 
966   hdl <- readIORef stick
967   c_sendIOManagerEvent io_MANAGER_WAKEUP
968
969 -- Walk the queue of pending delays, waking up any that have passed
970 -- and return the smallest delay to wait for.  The queue of pending
971 -- delays is kept ordered.
972 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
973 getDelay now [] = return ([], iNFINITE)
974 getDelay now all@(d : rest) 
975   = case d of
976      Delay time m | now >= time -> do
977         putMVar m ()
978         getDelay now rest
979      DelaySTM time t | now >= time -> do
980         atomically $ writeTVar t True
981         getDelay now rest
982      _otherwise ->
983         -- delay is in millisecs for WaitForSingleObject
984         let micro_seconds = delayTime d - now
985             milli_seconds = (micro_seconds + 999) `div` 1000
986         in return (all, fromIntegral milli_seconds)
987
988 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
989 -- available yet.  We should move some Win32 functionality down here,
990 -- maybe as part of the grand reorganisation of the base package...
991 type HANDLE       = Ptr ()
992 type DWORD        = Word32
993
994 iNFINITE = 0xFFFFFFFF :: DWORD -- urgh
995
996 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
997   c_getIOManagerEvent :: IO HANDLE
998
999 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
1000   c_readIOManagerEvent :: IO Word32
1001
1002 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
1003   c_sendIOManagerEvent :: Word32 -> IO ()
1004
1005 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
1006    c_maperrno :: IO ()
1007
1008 foreign import stdcall "WaitForSingleObject"
1009    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1010
1011 #else
1012 -- ----------------------------------------------------------------------------
1013 -- Unix IO manager thread, using select()
1014
1015 startIOManagerThread :: IO ()
1016 startIOManagerThread = do
1017         allocaArray 2 $ \fds -> do
1018         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1019         rd_end <- peekElemOff fds 0
1020         wr_end <- peekElemOff fds 1
1021         writeIORef stick (fromIntegral wr_end)
1022         c_setIOManagerPipe wr_end
1023         forkIO $ do
1024             allocaBytes sizeofFdSet   $ \readfds -> do
1025             allocaBytes sizeofFdSet   $ \writefds -> do 
1026             allocaBytes sizeofTimeVal $ \timeval -> do
1027             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1028         return ()
1029
1030 service_loop
1031    :: Fd                -- listen to this for wakeup calls
1032    -> Ptr CFdSet
1033    -> Ptr CFdSet
1034    -> Ptr CTimeVal
1035    -> [IOReq]
1036    -> [DelayReq]
1037    -> IO ()
1038 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1039
1040   -- pick up new IO requests
1041   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1042   let reqs = new_reqs ++ old_reqs
1043
1044   -- pick up new delay requests
1045   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1046   let  delays = foldr insertDelay old_delays new_delays
1047
1048   -- build the FDSets for select()
1049   fdZero readfds
1050   fdZero writefds
1051   fdSet wakeup readfds
1052   maxfd <- buildFdSets 0 readfds writefds reqs
1053
1054   -- perform the select()
1055   let do_select delays = do
1056           -- check the current time and wake up any thread in
1057           -- threadDelay whose timeout has expired.  Also find the
1058           -- timeout value for the select() call.
1059           now <- getUSecOfDay
1060           (delays', timeout) <- getDelay now ptimeval delays
1061
1062           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
1063                         nullPtr timeout
1064           if (res == -1)
1065              then do
1066                 err <- getErrno
1067                 case err of
1068                   _ | err == eINTR ->  do_select delays'
1069                         -- EINTR: just redo the select()
1070                   _ | err == eBADF ->  return (True, delays)
1071                         -- EBADF: one of the file descriptors is closed or bad,
1072                         -- we don't know which one, so wake everyone up.
1073                   _ | otherwise    ->  throwErrno "select"
1074                         -- otherwise (ENOMEM or EINVAL) something has gone
1075                         -- wrong; report the error.
1076              else
1077                 return (False,delays')
1078
1079   (wakeup_all,delays') <- do_select delays
1080
1081   exit <-
1082     if wakeup_all then return False
1083       else do
1084         b <- fdIsSet wakeup readfds
1085         if b == 0 
1086           then return False
1087           else alloca $ \p -> do 
1088                  c_read (fromIntegral wakeup) p 1; return ()
1089                  s <- peek p            
1090                  case s of
1091                   _ | s == io_MANAGER_WAKEUP -> return False
1092                   _ | s == io_MANAGER_DIE    -> return True
1093                   _ -> withMVar signalHandlerLock $ \_ -> do
1094                           handler_tbl <- peek handlers
1095                           sp <- peekElemOff handler_tbl (fromIntegral s)
1096                           io <- deRefStablePtr sp
1097                           forkIO io
1098                           return False
1099
1100   if exit then return () else do
1101
1102   atomicModifyIORef prodding (\_ -> (False,False))
1103
1104   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1105                          else completeRequests reqs readfds writefds []
1106
1107   service_loop wakeup readfds writefds ptimeval reqs' delays'
1108
1109 io_MANAGER_WAKEUP = 0xff :: CChar
1110 io_MANAGER_DIE    = 0xfe :: CChar
1111
1112 stick :: IORef Fd
1113 {-# NOINLINE stick #-}
1114 stick = unsafePerformIO (newIORef 0)
1115
1116 wakeupIOManager :: IO ()
1117 wakeupIOManager = do
1118   fd <- readIORef stick
1119   with io_MANAGER_WAKEUP $ \pbuf -> do 
1120     c_write (fromIntegral fd) pbuf 1; return ()
1121
1122 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1123 -- this race condition is #1922, although that bug was on Windows a similar
1124 -- bug also exists on Unix.
1125 signalHandlerLock :: MVar ()
1126 signalHandlerLock = unsafePerformIO (newMVar ())
1127
1128 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
1129
1130 foreign import ccall "setIOManagerPipe"
1131   c_setIOManagerPipe :: CInt -> IO ()
1132
1133 -- -----------------------------------------------------------------------------
1134 -- IO requests
1135
1136 buildFdSets maxfd readfds writefds [] = return maxfd
1137 buildFdSets maxfd readfds writefds (Read fd m : reqs)
1138   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1139   | otherwise        =  do
1140         fdSet fd readfds
1141         buildFdSets (max maxfd fd) readfds writefds reqs
1142 buildFdSets maxfd readfds writefds (Write fd m : reqs)
1143   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1144   | otherwise        =  do
1145         fdSet fd writefds
1146         buildFdSets (max maxfd fd) readfds writefds reqs
1147
1148 completeRequests [] _ _ reqs' = return reqs'
1149 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1150   b <- fdIsSet fd readfds
1151   if b /= 0
1152     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1153     else completeRequests reqs readfds writefds (Read fd m : reqs')
1154 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1155   b <- fdIsSet fd writefds
1156   if b /= 0
1157     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1158     else completeRequests reqs readfds writefds (Write fd m : reqs')
1159
1160 wakeupAll [] = return ()
1161 wakeupAll (Read  fd m : reqs) = do putMVar m (); wakeupAll reqs
1162 wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs
1163
1164 waitForReadEvent :: Fd -> IO ()
1165 waitForReadEvent fd = do
1166   m <- newEmptyMVar
1167   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1168   prodServiceThread
1169   takeMVar m
1170
1171 waitForWriteEvent :: Fd -> IO ()
1172 waitForWriteEvent fd = do
1173   m <- newEmptyMVar
1174   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1175   prodServiceThread
1176   takeMVar m
1177
1178 -- -----------------------------------------------------------------------------
1179 -- Delays
1180
1181 -- Walk the queue of pending delays, waking up any that have passed
1182 -- and return the smallest delay to wait for.  The queue of pending
1183 -- delays is kept ordered.
1184 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1185 getDelay now ptimeval [] = return ([],nullPtr)
1186 getDelay now ptimeval all@(d : rest) 
1187   = case d of
1188      Delay time m | now >= time -> do
1189         putMVar m ()
1190         getDelay now ptimeval rest
1191      DelaySTM time t | now >= time -> do
1192         atomically $ writeTVar t True
1193         getDelay now ptimeval rest
1194      _otherwise -> do
1195         setTimevalTicks ptimeval (delayTime d - now)
1196         return (all,ptimeval)
1197
1198 newtype CTimeVal = CTimeVal ()
1199
1200 foreign import ccall unsafe "sizeofTimeVal"
1201   sizeofTimeVal :: Int
1202
1203 foreign import ccall unsafe "setTimevalTicks" 
1204   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1205
1206 {- 
1207   On Win32 we're going to have a single Pipe, and a
1208   waitForSingleObject with the delay time.  For signals, we send a
1209   byte down the pipe just like on Unix.
1210 -}
1211
1212 -- ----------------------------------------------------------------------------
1213 -- select() interface
1214
1215 -- ToDo: move to System.Posix.Internals?
1216
1217 newtype CFdSet = CFdSet ()
1218
1219 foreign import ccall safe "select"
1220   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1221            -> IO CInt
1222
1223 foreign import ccall unsafe "hsFD_SETSIZE"
1224   c_fD_SETSIZE :: CInt
1225
1226 fD_SETSIZE :: Fd
1227 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1228
1229 foreign import ccall unsafe "hsFD_CLR"
1230   c_fdClr :: CInt -> Ptr CFdSet -> IO ()
1231
1232 fdClr :: Fd -> Ptr CFdSet -> IO ()
1233 fdClr (Fd fd) fdset = c_fdClr fd fdset
1234
1235 foreign import ccall unsafe "hsFD_ISSET"
1236   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1237
1238 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1239 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1240
1241 foreign import ccall unsafe "hsFD_SET"
1242   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1243
1244 fdSet :: Fd -> Ptr CFdSet -> IO ()
1245 fdSet (Fd fd) fdset = c_fdSet fd fdset
1246
1247 foreign import ccall unsafe "hsFD_ZERO"
1248   fdZero :: Ptr CFdSet -> IO ()
1249
1250 foreign import ccall unsafe "sizeof_fd_set"
1251   sizeofFdSet :: Int
1252
1253 #endif
1254
1255 \end{code}