Use extensible exceptions at the lowest level
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_HADDOCK not-home #-}
4 -----------------------------------------------------------------------------
5 -- |
6 -- Module      :  GHC.Conc
7 -- Copyright   :  (c) The University of Glasgow, 1994-2002
8 -- License     :  see libraries/base/LICENSE
9 -- 
10 -- Maintainer  :  cvs-ghc@haskell.org
11 -- Stability   :  internal
12 -- Portability :  non-portable (GHC extensions)
13 --
14 -- Basic concurrency stuff.
15 -- 
16 -----------------------------------------------------------------------------
17
18 -- No: #hide, because bits of this module are exposed by the stm package.
19 -- However, we don't want this module to be the home location for the
20 -- bits it exports, we'd rather have Control.Concurrent and the other
21 -- higher level modules be the home.  Hence:
22
23 #include "Typeable.h"
24
25 -- #not-home
26 module GHC.Conc
27         ( ThreadId(..)
28
29         -- * Forking and suchlike
30         , forkIO        -- :: IO a -> IO ThreadId
31         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
32         , numCapabilities -- :: Int
33         , childHandler  -- :: Exception -> IO ()
34         , myThreadId    -- :: IO ThreadId
35         , killThread    -- :: ThreadId -> IO ()
36         , throwTo       -- :: ThreadId -> Exception -> IO ()
37         , par           -- :: a -> b -> b
38         , pseq          -- :: a -> b -> b
39         , yield         -- :: IO ()
40         , labelThread   -- :: ThreadId -> String -> IO ()
41
42         , ThreadStatus(..), BlockReason(..)
43         , threadStatus  -- :: ThreadId -> IO ThreadStatus
44
45         -- * Waiting
46         , threadDelay           -- :: Int -> IO ()
47         , registerDelay         -- :: Int -> IO (TVar Bool)
48         , threadWaitRead        -- :: Int -> IO ()
49         , threadWaitWrite       -- :: Int -> IO ()
50
51         -- * MVars
52         , MVar(..)
53         , newMVar       -- :: a -> IO (MVar a)
54         , newEmptyMVar  -- :: IO (MVar a)
55         , takeMVar      -- :: MVar a -> IO a
56         , putMVar       -- :: MVar a -> a -> IO ()
57         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
58         , tryPutMVar    -- :: MVar a -> a -> IO Bool
59         , isEmptyMVar   -- :: MVar a -> IO Bool
60         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
61
62         -- * TVars
63         , STM(..)
64         , atomically    -- :: STM a -> IO a
65         , retry         -- :: STM a
66         , orElse        -- :: STM a -> STM a -> STM a
67         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
68         , alwaysSucceeds -- :: STM a -> STM ()
69         , always        -- :: STM Bool -> STM ()
70         , TVar(..)
71         , newTVar       -- :: a -> STM (TVar a)
72         , newTVarIO     -- :: a -> STM (TVar a)
73         , readTVar      -- :: TVar a -> STM a
74         , writeTVar     -- :: a -> TVar a -> STM ()
75         , unsafeIOToSTM -- :: IO a -> STM a
76
77         -- * Miscellaneous
78 #ifdef mingw32_HOST_OS
79         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
80         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
81         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
82
83         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
84         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
85 #endif
86
87 #ifndef mingw32_HOST_OS
88         , signalHandlerLock
89 #endif
90
91         , ensureIOManagerIsRunning
92
93 #ifdef mingw32_HOST_OS
94         , ConsoleEvent(..)
95         , win32ConsoleHandler
96         , toWin32ConsoleEvent
97 #endif
98         ) where
99
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
103 #endif
104 import Foreign
105 import Foreign.C
106
107 #ifndef __HADDOCK__
108 import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
109 #endif
110
111 import Data.Maybe
112
113 import GHC.Base
114 import GHC.IOBase
115 import GHC.Num          ( Num(..) )
116 import GHC.Real         ( fromIntegral, div )
117 #ifndef mingw32_HOST_OS
118 import GHC.Base         ( Int(..) )
119 #endif
120 #ifdef mingw32_HOST_OS
121 import GHC.Read         ( Read )
122 import GHC.Enum         ( Enum )
123 #endif
124 import GHC.Exception    ( catchException, catchAny, throw, block, unblock )
125 import GHC.Pack         ( packCString# )
126 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
127 import GHC.STRef
128 import GHC.Show         ( Show(..), showString )
129 import Data.Typeable
130
131 infixr 0 `par`, `pseq`
132 \end{code}
133
134 %************************************************************************
135 %*                                                                      *
136 \subsection{@ThreadId@, @par@, and @fork@}
137 %*                                                                      *
138 %************************************************************************
139
140 \begin{code}
141 data ThreadId = ThreadId ThreadId# deriving( Typeable )
142 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
143 -- But since ThreadId# is unlifted, the Weak type must use open
144 -- type variables.
145 {- ^
146 A 'ThreadId' is an abstract type representing a handle to a thread.
147 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
148 the 'Ord' instance implements an arbitrary total ordering over
149 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
150 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
151 useful when debugging or diagnosing the behaviour of a concurrent
152 program.
153
154 /Note/: in GHC, if you have a 'ThreadId', you essentially have
155 a pointer to the thread itself.  This means the thread itself can\'t be
156 garbage collected until you drop the 'ThreadId'.
157 This misfeature will hopefully be corrected at a later date.
158
159 /Note/: Hugs does not provide any operations on other threads;
160 it defines 'ThreadId' as a synonym for ().
161 -}
162
163 instance Show ThreadId where
164    showsPrec d t = 
165         showString "ThreadId " . 
166         showsPrec d (getThreadId (id2TSO t))
167
168 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
169
170 id2TSO :: ThreadId -> ThreadId#
171 id2TSO (ThreadId t) = t
172
173 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
174 -- Returns -1, 0, 1
175
176 cmpThread :: ThreadId -> ThreadId -> Ordering
177 cmpThread t1 t2 = 
178    case cmp_thread (id2TSO t1) (id2TSO t2) of
179       -1 -> LT
180       0  -> EQ
181       _  -> GT -- must be 1
182
183 instance Eq ThreadId where
184    t1 == t2 = 
185       case t1 `cmpThread` t2 of
186          EQ -> True
187          _  -> False
188
189 instance Ord ThreadId where
190    compare = cmpThread
191
192 {- |
193 Sparks off a new thread to run the 'IO' computation passed as the
194 first argument, and returns the 'ThreadId' of the newly created
195 thread.
196
197 The new thread will be a lightweight thread; if you want to use a foreign
198 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
199
200 GHC note: the new thread inherits the /blocked/ state of the parent 
201 (see 'Control.Exception.block').
202 -}
203 forkIO :: IO () -> IO ThreadId
204 forkIO action = IO $ \ s -> 
205    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
206  where
207   action_plus = catchException action childHandler
208
209 {- |
210 Like 'forkIO', but lets you specify on which CPU the thread is
211 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
212 will stay on the same CPU for its entire lifetime (`forkIO` threads
213 can migrate between CPUs according to the scheduling policy).
214 `forkOnIO` is useful for overriding the scheduling policy when you
215 know in advance how best to distribute the threads.
216
217 The `Int` argument specifies the CPU number; it is interpreted modulo
218 'numCapabilities' (note that it actually specifies a capability number
219 rather than a CPU number, but to a first approximation the two are
220 equivalent).
221 -}
222 forkOnIO :: Int -> IO () -> IO ThreadId
223 forkOnIO (I# cpu) action = IO $ \ s -> 
224    case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
225  where
226   action_plus = catchException action childHandler
227
228 -- | the value passed to the @+RTS -N@ flag.  This is the number of
229 -- Haskell threads that can run truly simultaneously at any given
230 -- time, and is typically set to the number of physical CPU cores on
231 -- the machine.
232 numCapabilities :: Int
233 numCapabilities = unsafePerformIO $  do 
234                     n <- peek n_capabilities
235                     return (fromIntegral n)
236
237 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
238
239 childHandler :: Exception -> IO ()
240 childHandler err = catchException (real_handler err) childHandler
241
242 real_handler :: Exception -> IO ()
243 real_handler ex =
244   case ex of
245         -- ignore thread GC and killThread exceptions:
246         BlockedOnDeadMVar            -> return ()
247         BlockedIndefinitely          -> return ()
248         AsyncException ThreadKilled  -> return ()
249
250         -- report all others:
251         AsyncException StackOverflow -> reportStackOverflow
252         other       -> reportError other
253
254 {- | 'killThread' terminates the given thread (GHC only).
255 Any work already done by the thread isn\'t
256 lost: the computation is suspended until required by another thread.
257 The memory used by the thread will be garbage collected if it isn\'t
258 referenced from anywhere.  The 'killThread' function is defined in
259 terms of 'throwTo':
260
261 > killThread tid = throwTo tid (AsyncException ThreadKilled)
262
263 -}
264 killThread :: ThreadId -> IO ()
265 killThread tid = throwTo tid (AsyncException ThreadKilled)
266
267 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
268
269 'throwTo' does not return until the exception has been raised in the
270 target thread. 
271 The calling thread can thus be certain that the target
272 thread has received the exception.  This is a useful property to know
273 when dealing with race conditions: eg. if there are two threads that
274 can kill each other, it is guaranteed that only one of the threads
275 will get to kill the other.
276
277 If the target thread is currently making a foreign call, then the
278 exception will not be raised (and hence 'throwTo' will not return)
279 until the call has completed.  This is the case regardless of whether
280 the call is inside a 'block' or not.
281
282 Important note: the behaviour of 'throwTo' differs from that described in
283 the paper \"Asynchronous exceptions in Haskell\"
284 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
285 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
286 a more synchronous design in which 'throwTo' does not return until the exception
287 is received by the target thread.  The trade-off is discussed in Section 8 of the paper.
288 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
289 the paper).
290
291 There is currently no guarantee that the exception delivered by 'throwTo' will be
292 delivered at the first possible opportunity.  In particular, if a thread may 
293 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
294 a pending 'throwTo'.  This is arguably undesirable behaviour.
295
296  -}
297 throwTo :: ThreadId -> Exception -> IO ()
298 throwTo (ThreadId id) ex = IO $ \ s ->
299    case (killThread# id ex s) of s1 -> (# s1, () #)
300
301 -- | Returns the 'ThreadId' of the calling thread (GHC only).
302 myThreadId :: IO ThreadId
303 myThreadId = IO $ \s ->
304    case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #)
305
306
307 -- |The 'yield' action allows (forces, in a co-operative multitasking
308 -- implementation) a context-switch to any other currently runnable
309 -- threads (if any), and is occasionally useful when implementing
310 -- concurrency abstractions.
311 yield :: IO ()
312 yield = IO $ \s -> 
313    case (yield# s) of s1 -> (# s1, () #)
314
315 {- | 'labelThread' stores a string as identifier for this thread if
316 you built a RTS with debugging support. This identifier will be used in
317 the debugging output to make distinction of different threads easier
318 (otherwise you only have the thread state object\'s address in the heap).
319
320 Other applications like the graphical Concurrent Haskell Debugger
321 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
322 'labelThread' for their purposes as well.
323 -}
324
325 labelThread :: ThreadId -> String -> IO ()
326 labelThread (ThreadId t) str = IO $ \ s ->
327    let ps  = packCString# str
328        adr = byteArrayContents# ps in
329      case (labelThread# t adr s) of s1 -> (# s1, () #)
330
331 --      Nota Bene: 'pseq' used to be 'seq'
332 --                 but 'seq' is now defined in PrelGHC
333 --
334 -- "pseq" is defined a bit weirdly (see below)
335 --
336 -- The reason for the strange "lazy" call is that
337 -- it fools the compiler into thinking that pseq  and par are non-strict in
338 -- their second argument (even if it inlines pseq at the call site).
339 -- If it thinks pseq is strict in "y", then it often evaluates
340 -- "y" before "x", which is totally wrong.  
341
342 {-# INLINE pseq  #-}
343 pseq :: a -> b -> b
344 pseq  x y = x `seq` lazy y
345
346 {-# INLINE par  #-}
347 par :: a -> b -> b
348 par  x y = case (par# x) of { _ -> lazy y }
349
350
351 data BlockReason
352   = BlockedOnMVar
353         -- ^blocked on on 'MVar'
354   | BlockedOnBlackHole
355         -- ^blocked on a computation in progress by another thread
356   | BlockedOnException
357         -- ^blocked in 'throwTo'
358   | BlockedOnSTM
359         -- ^blocked in 'retry' in an STM transaction
360   | BlockedOnForeignCall
361         -- ^currently in a foreign call
362   | BlockedOnOther
363         -- ^blocked on some other resource.  Without @-threaded@,
364         -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
365         -- they show up as 'BlockedOnMVar'.
366   deriving (Eq,Ord,Show)
367
368 -- | The current status of a thread
369 data ThreadStatus
370   = ThreadRunning
371         -- ^the thread is currently runnable or running
372   | ThreadFinished
373         -- ^the thread has finished
374   | ThreadBlocked  BlockReason
375         -- ^the thread is blocked on some resource
376   | ThreadDied
377         -- ^the thread received an uncaught exception
378   deriving (Eq,Ord,Show)
379
380 threadStatus :: ThreadId -> IO ThreadStatus
381 threadStatus (ThreadId t) = IO $ \s ->
382    case threadStatus# t s of
383      (# s', stat #) -> (# s', mk_stat (I# stat) #)
384    where
385         -- NB. keep these in sync with includes/Constants.h
386      mk_stat 0  = ThreadRunning
387      mk_stat 1  = ThreadBlocked BlockedOnMVar
388      mk_stat 2  = ThreadBlocked BlockedOnBlackHole
389      mk_stat 3  = ThreadBlocked BlockedOnException
390      mk_stat 7  = ThreadBlocked BlockedOnSTM
391      mk_stat 11 = ThreadBlocked BlockedOnForeignCall
392      mk_stat 12 = ThreadBlocked BlockedOnForeignCall
393      mk_stat 16 = ThreadFinished
394      mk_stat 17 = ThreadDied
395      mk_stat _  = ThreadBlocked BlockedOnOther
396 \end{code}
397
398
399 %************************************************************************
400 %*                                                                      *
401 \subsection[stm]{Transactional heap operations}
402 %*                                                                      *
403 %************************************************************************
404
405 TVars are shared memory locations which support atomic memory
406 transactions.
407
408 \begin{code}
409 -- |A monad supporting atomic memory transactions.
410 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
411
412 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
413 unSTM (STM a) = a
414
415 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
416
417 instance  Functor STM where
418    fmap f x = x >>= (return . f)
419
420 instance  Monad STM  where
421     {-# INLINE return #-}
422     {-# INLINE (>>)   #-}
423     {-# INLINE (>>=)  #-}
424     m >> k      = thenSTM m k
425     return x    = returnSTM x
426     m >>= k     = bindSTM m k
427
428 bindSTM :: STM a -> (a -> STM b) -> STM b
429 bindSTM (STM m) k = STM ( \s ->
430   case m s of 
431     (# new_s, a #) -> unSTM (k a) new_s
432   )
433
434 thenSTM :: STM a -> STM b -> STM b
435 thenSTM (STM m) k = STM ( \s ->
436   case m s of 
437     (# new_s, a #) -> unSTM k new_s
438   )
439
440 returnSTM :: a -> STM a
441 returnSTM x = STM (\s -> (# s, x #))
442
443 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
444 -- dangerous thing to do.  
445 --
446 --   * The STM implementation will often run transactions multiple
447 --     times, so you need to be prepared for this if your IO has any
448 --     side effects.
449 --
450 --   * The STM implementation will abort transactions that are known to
451 --     be invalid and need to be restarted.  This may happen in the middle
452 --     of `unsafeIOToSTM`, so make sure you don't acquire any resources
453 --     that need releasing (exception handlers are ignored when aborting
454 --     the transaction).  That includes doing any IO using Handles, for
455 --     example.  Getting this wrong will probably lead to random deadlocks.
456 --
457 --   * The transaction may have seen an inconsistent view of memory when
458 --     the IO runs.  Invariants that you expect to be true throughout
459 --     your program may not be true inside a transaction, due to the
460 --     way transactions are implemented.  Normally this wouldn't be visible
461 --     to the programmer, but using `unsafeIOToSTM` can expose it.
462 --
463 unsafeIOToSTM :: IO a -> STM a
464 unsafeIOToSTM (IO m) = STM m
465
466 -- |Perform a series of STM actions atomically.
467 --
468 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
469 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
470 -- this would effectively allow a transaction inside a transaction, depending
471 -- on exactly when the thunk is evaluated.)
472 --
473 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
474 -- and which allows top-level TVars to be allocated.
475
476 atomically :: STM a -> IO a
477 atomically (STM m) = IO (\s -> (atomically# m) s )
478
479 -- |Retry execution of the current memory transaction because it has seen
480 -- values in TVars which mean that it should not continue (e.g. the TVars
481 -- represent a shared buffer that is now empty).  The implementation may
482 -- block the thread until one of the TVars that it has read from has been
483 -- udpated. (GHC only)
484 retry :: STM a
485 retry = STM $ \s# -> retry# s#
486
487 -- |Compose two alternative STM actions (GHC only).  If the first action
488 -- completes without retrying then it forms the result of the orElse.
489 -- Otherwise, if the first action retries, then the second action is
490 -- tried in its place.  If both actions retry then the orElse as a
491 -- whole retries.
492 orElse :: STM a -> STM a -> STM a
493 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
494
495 -- |Exception handling within STM actions.
496 catchSTM :: STM a -> (Exception -> STM a) -> STM a
497 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
498
499 -- | Low-level primitive on which always and alwaysSucceeds are built.
500 -- checkInv differs form these in that (i) the invariant is not 
501 -- checked when checkInv is called, only at the end of this and
502 -- subsequent transcations, (ii) the invariant failure is indicated
503 -- by raising an exception.
504 checkInv :: STM a -> STM ()
505 checkInv (STM m) = STM (\s -> (check# m) s)
506
507 -- | alwaysSucceeds adds a new invariant that must be true when passed
508 -- to alwaysSucceeds, at the end of the current transaction, and at
509 -- the end of every subsequent transaction.  If it fails at any
510 -- of those points then the transaction violating it is aborted
511 -- and the exception raised by the invariant is propagated.
512 alwaysSucceeds :: STM a -> STM ()
513 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
514                       checkInv i
515
516 -- | always is a variant of alwaysSucceeds in which the invariant is
517 -- expressed as an STM Bool action that must return True.  Returning
518 -- False or raising an exception are both treated as invariant failures.
519 always :: STM Bool -> STM ()
520 always i = alwaysSucceeds ( do v <- i
521                                if (v) then return () else ( error "Transacional invariant violation" ) )
522
523 -- |Shared memory locations that support atomic memory transactions.
524 data TVar a = TVar (TVar# RealWorld a)
525
526 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
527
528 instance Eq (TVar a) where
529         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
530
531 -- |Create a new TVar holding a value supplied
532 newTVar :: a -> STM (TVar a)
533 newTVar val = STM $ \s1# ->
534     case newTVar# val s1# of
535          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
536
537 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
538 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
539 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
540 -- possible.
541 newTVarIO :: a -> IO (TVar a)
542 newTVarIO val = IO $ \s1# ->
543     case newTVar# val s1# of
544          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
545
546 -- |Return the current value stored in a TVar
547 readTVar :: TVar a -> STM a
548 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
549
550 -- |Write the supplied value into a TVar
551 writeTVar :: TVar a -> a -> STM ()
552 writeTVar (TVar tvar#) val = STM $ \s1# ->
553     case writeTVar# tvar# val s1# of
554          s2# -> (# s2#, () #)
555   
556 \end{code}
557
558 %************************************************************************
559 %*                                                                      *
560 \subsection[mvars]{M-Structures}
561 %*                                                                      *
562 %************************************************************************
563
564 M-Vars are rendezvous points for concurrent threads.  They begin
565 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
566 is written, a single blocked thread may be freed.  Reading an M-Var
567 toggles its state from full back to empty.  Therefore, any value
568 written to an M-Var may only be read once.  Multiple reads and writes
569 are allowed, but there must be at least one read between any two
570 writes.
571
572 \begin{code}
573 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
574
575 -- |Create an 'MVar' which is initially empty.
576 newEmptyMVar  :: IO (MVar a)
577 newEmptyMVar = IO $ \ s# ->
578     case newMVar# s# of
579          (# s2#, svar# #) -> (# s2#, MVar svar# #)
580
581 -- |Create an 'MVar' which contains the supplied value.
582 newMVar :: a -> IO (MVar a)
583 newMVar value =
584     newEmptyMVar        >>= \ mvar ->
585     putMVar mvar value  >>
586     return mvar
587
588 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
589 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
590 -- the 'MVar' is left empty.
591 -- 
592 -- There are two further important properties of 'takeMVar':
593 --
594 --   * 'takeMVar' is single-wakeup.  That is, if there are multiple
595 --     threads blocked in 'takeMVar', and the 'MVar' becomes full,
596 --     only one thread will be woken up.  The runtime guarantees that
597 --     the woken thread completes its 'takeMVar' operation.
598 --
599 --   * When multiple threads are blocked on an 'MVar', they are
600 --     woken up in FIFO order.  This is useful for providing
601 --     fairness properties of abstractions built using 'MVar's.
602 --
603 takeMVar :: MVar a -> IO a
604 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
605
606 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
607 -- 'putMVar' will wait until it becomes empty.
608 --
609 -- There are two further important properties of 'putMVar':
610 --
611 --   * 'putMVar' is single-wakeup.  That is, if there are multiple
612 --     threads blocked in 'putMVar', and the 'MVar' becomes empty,
613 --     only one thread will be woken up.  The runtime guarantees that
614 --     the woken thread completes its 'putMVar' operation.
615 --
616 --   * When multiple threads are blocked on an 'MVar', they are
617 --     woken up in FIFO order.  This is useful for providing
618 --     fairness properties of abstractions built using 'MVar's.
619 --
620 putMVar  :: MVar a -> a -> IO ()
621 putMVar (MVar mvar#) x = IO $ \ s# ->
622     case putMVar# mvar# x s# of
623         s2# -> (# s2#, () #)
624
625 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
626 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
627 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
628 -- the 'MVar' is left empty.
629 tryTakeMVar :: MVar a -> IO (Maybe a)
630 tryTakeMVar (MVar m) = IO $ \ s ->
631     case tryTakeMVar# m s of
632         (# s, 0#, _ #) -> (# s, Nothing #)      -- MVar is empty
633         (# s, _,  a #) -> (# s, Just a  #)      -- MVar is full
634
635 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
636 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
637 -- it was successful, or 'False' otherwise.
638 tryPutMVar  :: MVar a -> a -> IO Bool
639 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
640     case tryPutMVar# mvar# x s# of
641         (# s, 0# #) -> (# s, False #)
642         (# s, _  #) -> (# s, True #)
643
644 -- |Check whether a given 'MVar' is empty.
645 --
646 -- Notice that the boolean value returned  is just a snapshot of
647 -- the state of the MVar. By the time you get to react on its result,
648 -- the MVar may have been filled (or emptied) - so be extremely
649 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
650 isEmptyMVar :: MVar a -> IO Bool
651 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
652     case isEmptyMVar# mv# s# of
653         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
654
655 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
656 -- "System.Mem.Weak" for more about finalizers.
657 addMVarFinalizer :: MVar a -> IO () -> IO ()
658 addMVarFinalizer (MVar m) finalizer = 
659   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
660
661 withMVar :: MVar a -> (a -> IO b) -> IO b
662 withMVar m io = 
663   block $ do
664     a <- takeMVar m
665     b <- catchAny (unblock (io a))
666             (\e -> do putMVar m a; throw e)
667     putMVar m a
668     return b
669 \end{code}
670
671
672 %************************************************************************
673 %*                                                                      *
674 \subsection{Thread waiting}
675 %*                                                                      *
676 %************************************************************************
677
678 \begin{code}
679 #ifdef mingw32_HOST_OS
680
681 -- Note: threadWaitRead and threadWaitWrite aren't really functional
682 -- on Win32, but left in there because lib code (still) uses them (the manner
683 -- in which they're used doesn't cause problems on a Win32 platform though.)
684
685 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
686 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
687   IO $ \s -> case asyncRead# fd isSock len buf s of 
688                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
689
690 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
691 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
692   IO $ \s -> case asyncWrite# fd isSock len buf s of 
693                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
694
695 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
696 asyncDoProc (FunPtr proc) (Ptr param) = 
697     -- the 'length' value is ignored; simplifies implementation of
698     -- the async*# primops to have them all return the same result.
699   IO $ \s -> case asyncDoProc# proc param s  of 
700                (# s, len#, err# #) -> (# s, I# err# #)
701
702 -- to aid the use of these primops by the IO Handle implementation,
703 -- provide the following convenience funs:
704
705 -- this better be a pinned byte array!
706 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
707 asyncReadBA fd isSock len off bufB = 
708   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
709   
710 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
711 asyncWriteBA fd isSock len off bufB = 
712   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
713
714 #endif
715
716 -- -----------------------------------------------------------------------------
717 -- Thread IO API
718
719 -- | Block the current thread until data is available to read on the
720 -- given file descriptor (GHC only).
721 threadWaitRead :: Fd -> IO ()
722 threadWaitRead fd
723 #ifndef mingw32_HOST_OS
724   | threaded  = waitForReadEvent fd
725 #endif
726   | otherwise = IO $ \s -> 
727         case fromIntegral fd of { I# fd# ->
728         case waitRead# fd# s of { s -> (# s, () #)
729         }}
730
731 -- | Block the current thread until data can be written to the
732 -- given file descriptor (GHC only).
733 threadWaitWrite :: Fd -> IO ()
734 threadWaitWrite fd
735 #ifndef mingw32_HOST_OS
736   | threaded  = waitForWriteEvent fd
737 #endif
738   | otherwise = IO $ \s -> 
739         case fromIntegral fd of { I# fd# ->
740         case waitWrite# fd# s of { s -> (# s, () #)
741         }}
742
743 -- | Suspends the current thread for a given number of microseconds
744 -- (GHC only).
745 --
746 -- There is no guarantee that the thread will be rescheduled promptly
747 -- when the delay has expired, but the thread will never continue to
748 -- run /earlier/ than specified.
749 --
750 threadDelay :: Int -> IO ()
751 threadDelay time
752   | threaded  = waitForDelayEvent time
753   | otherwise = IO $ \s -> 
754         case fromIntegral time of { I# time# ->
755         case delay# time# s of { s -> (# s, () #)
756         }}
757
758
759 -- | Set the value of returned TVar to True after a given number of
760 -- microseconds. The caveats associated with threadDelay also apply.
761 --
762 registerDelay :: Int -> IO (TVar Bool)
763 registerDelay usecs 
764   | threaded = waitForDelayEventSTM usecs
765   | otherwise = error "registerDelay: requires -threaded"
766
767 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
768
769 waitForDelayEvent :: Int -> IO ()
770 waitForDelayEvent usecs = do
771   m <- newEmptyMVar
772   target <- calculateTarget usecs
773   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
774   prodServiceThread
775   takeMVar m
776
777 -- Delays for use in STM
778 waitForDelayEventSTM :: Int -> IO (TVar Bool)
779 waitForDelayEventSTM usecs = do
780    t <- atomically $ newTVar False
781    target <- calculateTarget usecs
782    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
783    prodServiceThread
784    return t  
785     
786 calculateTarget :: Int -> IO USecs
787 calculateTarget usecs = do
788     now <- getUSecOfDay
789     return $ now + (fromIntegral usecs)
790
791
792 -- ----------------------------------------------------------------------------
793 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
794
795 -- In the threaded RTS, we employ a single IO Manager thread to wait
796 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
797 -- and delays (threadDelay).  
798 --
799 -- We can do this because in the threaded RTS the IO Manager can make
800 -- a non-blocking call to select(), so we don't have to do select() in
801 -- the scheduler as we have to in the non-threaded RTS.  We get performance
802 -- benefits from doing it this way, because we only have to restart the select()
803 -- when a new request arrives, rather than doing one select() each time
804 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
805 -- by not having to check for completed IO requests.
806
807 -- Issues, possible problems:
808 --
809 --      - we might want bound threads to just do the blocking
810 --        operation rather than communicating with the IO manager
811 --        thread.  This would prevent simgle-threaded programs which do
812 --        IO from requiring multiple OS threads.  However, it would also
813 --        prevent bound threads waiting on IO from being killed or sent
814 --        exceptions.
815 --
816 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
817 --        I couldn't repeat this.
818 --
819 --      - How do we handle signal delivery in the multithreaded RTS?
820 --
821 --      - forkProcess will kill the IO manager thread.  Let's just
822 --        hope we don't need to do any blocking IO between fork & exec.
823
824 #ifndef mingw32_HOST_OS
825 data IOReq
826   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
827   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
828 #endif
829
830 data DelayReq
831   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
832   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
833
834 #ifndef mingw32_HOST_OS
835 pendingEvents :: IORef [IOReq]
836 #endif
837 pendingDelays :: IORef [DelayReq]
838         -- could use a strict list or array here
839 {-# NOINLINE pendingEvents #-}
840 {-# NOINLINE pendingDelays #-}
841 (pendingEvents,pendingDelays) = unsafePerformIO $ do
842   startIOManagerThread
843   reqs <- newIORef []
844   dels <- newIORef []
845   return (reqs, dels)
846         -- the first time we schedule an IO request, the service thread
847         -- will be created (cool, huh?)
848
849 ensureIOManagerIsRunning :: IO ()
850 ensureIOManagerIsRunning 
851   | threaded  = seq pendingEvents $ return ()
852   | otherwise = return ()
853
854 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
855 insertDelay d [] = [d]
856 insertDelay d1 ds@(d2 : rest)
857   | delayTime d1 <= delayTime d2 = d1 : ds
858   | otherwise                    = d2 : insertDelay d1 rest
859
860 delayTime :: DelayReq -> USecs
861 delayTime (Delay t _) = t
862 delayTime (DelaySTM t _) = t
863
864 type USecs = Word64
865
866 -- XXX: move into GHC.IOBase from Data.IORef?
867 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
868 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
869
870 foreign import ccall unsafe "getUSecOfDay" 
871   getUSecOfDay :: IO USecs
872
873 prodding :: IORef Bool
874 {-# NOINLINE prodding #-}
875 prodding = unsafePerformIO (newIORef False)
876
877 prodServiceThread :: IO ()
878 prodServiceThread = do
879   was_set <- atomicModifyIORef prodding (\a -> (True,a))
880   if (not (was_set)) then wakeupIOManager else return ()
881
882 #ifdef mingw32_HOST_OS
883 -- ----------------------------------------------------------------------------
884 -- Windows IO manager thread
885
886 startIOManagerThread :: IO ()
887 startIOManagerThread = do
888   wakeup <- c_getIOManagerEvent
889   forkIO $ service_loop wakeup []
890   return ()
891
892 service_loop :: HANDLE          -- read end of pipe
893              -> [DelayReq]      -- current delay requests
894              -> IO ()
895
896 service_loop wakeup old_delays = do
897   -- pick up new delay requests
898   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
899   let  delays = foldr insertDelay old_delays new_delays
900
901   now <- getUSecOfDay
902   (delays', timeout) <- getDelay now delays
903
904   r <- c_WaitForSingleObject wakeup timeout
905   case r of
906     0xffffffff -> do c_maperrno; throwErrno "service_loop"
907     0 -> do
908         r <- c_readIOManagerEvent
909         exit <- 
910               case r of
911                 _ | r == io_MANAGER_WAKEUP -> return False
912                 _ | r == io_MANAGER_DIE    -> return True
913                 0 -> return False -- spurious wakeup
914                 r -> do start_console_handler (r `shiftR` 1); return False
915         if exit
916           then return ()
917           else service_cont wakeup delays'
918
919     _other -> service_cont wakeup delays' -- probably timeout        
920
921 service_cont wakeup delays = do
922   atomicModifyIORef prodding (\_ -> (False,False))
923   service_loop wakeup delays
924
925 -- must agree with rts/win32/ThrIOManager.c
926 io_MANAGER_WAKEUP = 0xffffffff :: Word32
927 io_MANAGER_DIE    = 0xfffffffe :: Word32
928
929 data ConsoleEvent
930  = ControlC
931  | Break
932  | Close
933     -- these are sent to Services only.
934  | Logoff
935  | Shutdown
936  deriving (Eq, Ord, Enum, Show, Read, Typeable)
937
938 start_console_handler :: Word32 -> IO ()
939 start_console_handler r =
940   case toWin32ConsoleEvent r of
941      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
942                     forkIO (handler x)
943                     return ()
944      Nothing -> return ()
945
946 toWin32ConsoleEvent ev = 
947    case ev of
948        0 {- CTRL_C_EVENT-}        -> Just ControlC
949        1 {- CTRL_BREAK_EVENT-}    -> Just Break
950        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
951        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
952        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
953        _ -> Nothing
954
955 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
956 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
957
958 stick :: IORef HANDLE
959 {-# NOINLINE stick #-}
960 stick = unsafePerformIO (newIORef nullPtr)
961
962 wakeupIOManager = do 
963   hdl <- readIORef stick
964   c_sendIOManagerEvent io_MANAGER_WAKEUP
965
966 -- Walk the queue of pending delays, waking up any that have passed
967 -- and return the smallest delay to wait for.  The queue of pending
968 -- delays is kept ordered.
969 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
970 getDelay now [] = return ([], iNFINITE)
971 getDelay now all@(d : rest) 
972   = case d of
973      Delay time m | now >= time -> do
974         putMVar m ()
975         getDelay now rest
976      DelaySTM time t | now >= time -> do
977         atomically $ writeTVar t True
978         getDelay now rest
979      _otherwise ->
980         -- delay is in millisecs for WaitForSingleObject
981         let micro_seconds = delayTime d - now
982             milli_seconds = (micro_seconds + 999) `div` 1000
983         in return (all, fromIntegral milli_seconds)
984
985 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
986 -- available yet.  We should move some Win32 functionality down here,
987 -- maybe as part of the grand reorganisation of the base package...
988 type HANDLE       = Ptr ()
989 type DWORD        = Word32
990
991 iNFINITE = 0xFFFFFFFF :: DWORD -- urgh
992
993 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
994   c_getIOManagerEvent :: IO HANDLE
995
996 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
997   c_readIOManagerEvent :: IO Word32
998
999 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
1000   c_sendIOManagerEvent :: Word32 -> IO ()
1001
1002 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
1003    c_maperrno :: IO ()
1004
1005 foreign import stdcall "WaitForSingleObject"
1006    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1007
1008 #else
1009 -- ----------------------------------------------------------------------------
1010 -- Unix IO manager thread, using select()
1011
1012 startIOManagerThread :: IO ()
1013 startIOManagerThread = do
1014         allocaArray 2 $ \fds -> do
1015         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1016         rd_end <- peekElemOff fds 0
1017         wr_end <- peekElemOff fds 1
1018         writeIORef stick (fromIntegral wr_end)
1019         c_setIOManagerPipe wr_end
1020         forkIO $ do
1021             allocaBytes sizeofFdSet   $ \readfds -> do
1022             allocaBytes sizeofFdSet   $ \writefds -> do 
1023             allocaBytes sizeofTimeVal $ \timeval -> do
1024             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1025         return ()
1026
1027 service_loop
1028    :: Fd                -- listen to this for wakeup calls
1029    -> Ptr CFdSet
1030    -> Ptr CFdSet
1031    -> Ptr CTimeVal
1032    -> [IOReq]
1033    -> [DelayReq]
1034    -> IO ()
1035 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1036
1037   -- pick up new IO requests
1038   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1039   let reqs = new_reqs ++ old_reqs
1040
1041   -- pick up new delay requests
1042   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1043   let  delays = foldr insertDelay old_delays new_delays
1044
1045   -- build the FDSets for select()
1046   fdZero readfds
1047   fdZero writefds
1048   fdSet wakeup readfds
1049   maxfd <- buildFdSets 0 readfds writefds reqs
1050
1051   -- perform the select()
1052   let do_select delays = do
1053           -- check the current time and wake up any thread in
1054           -- threadDelay whose timeout has expired.  Also find the
1055           -- timeout value for the select() call.
1056           now <- getUSecOfDay
1057           (delays', timeout) <- getDelay now ptimeval delays
1058
1059           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
1060                         nullPtr timeout
1061           if (res == -1)
1062              then do
1063                 err <- getErrno
1064                 case err of
1065                   _ | err == eINTR ->  do_select delays'
1066                         -- EINTR: just redo the select()
1067                   _ | err == eBADF ->  return (True, delays)
1068                         -- EBADF: one of the file descriptors is closed or bad,
1069                         -- we don't know which one, so wake everyone up.
1070                   _ | otherwise    ->  throwErrno "select"
1071                         -- otherwise (ENOMEM or EINVAL) something has gone
1072                         -- wrong; report the error.
1073              else
1074                 return (False,delays')
1075
1076   (wakeup_all,delays') <- do_select delays
1077
1078   exit <-
1079     if wakeup_all then return False
1080       else do
1081         b <- fdIsSet wakeup readfds
1082         if b == 0 
1083           then return False
1084           else alloca $ \p -> do 
1085                  c_read (fromIntegral wakeup) p 1; return ()
1086                  s <- peek p            
1087                  case s of
1088                   _ | s == io_MANAGER_WAKEUP -> return False
1089                   _ | s == io_MANAGER_DIE    -> return True
1090                   _ -> withMVar signalHandlerLock $ \_ -> do
1091                           handler_tbl <- peek handlers
1092                           sp <- peekElemOff handler_tbl (fromIntegral s)
1093                           io <- deRefStablePtr sp
1094                           forkIO io
1095                           return False
1096
1097   if exit then return () else do
1098
1099   atomicModifyIORef prodding (\_ -> (False,False))
1100
1101   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1102                          else completeRequests reqs readfds writefds []
1103
1104   service_loop wakeup readfds writefds ptimeval reqs' delays'
1105
1106 io_MANAGER_WAKEUP = 0xff :: CChar
1107 io_MANAGER_DIE    = 0xfe :: CChar
1108
1109 stick :: IORef Fd
1110 {-# NOINLINE stick #-}
1111 stick = unsafePerformIO (newIORef 0)
1112
1113 wakeupIOManager :: IO ()
1114 wakeupIOManager = do
1115   fd <- readIORef stick
1116   with io_MANAGER_WAKEUP $ \pbuf -> do 
1117     c_write (fromIntegral fd) pbuf 1; return ()
1118
1119 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1120 -- this race condition is #1922, although that bug was on Windows a similar
1121 -- bug also exists on Unix.
1122 signalHandlerLock :: MVar ()
1123 signalHandlerLock = unsafePerformIO (newMVar ())
1124
1125 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
1126
1127 foreign import ccall "setIOManagerPipe"
1128   c_setIOManagerPipe :: CInt -> IO ()
1129
1130 -- -----------------------------------------------------------------------------
1131 -- IO requests
1132
1133 buildFdSets maxfd readfds writefds [] = return maxfd
1134 buildFdSets maxfd readfds writefds (Read fd m : reqs)
1135   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1136   | otherwise        =  do
1137         fdSet fd readfds
1138         buildFdSets (max maxfd fd) readfds writefds reqs
1139 buildFdSets maxfd readfds writefds (Write fd m : reqs)
1140   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1141   | otherwise        =  do
1142         fdSet fd writefds
1143         buildFdSets (max maxfd fd) readfds writefds reqs
1144
1145 completeRequests [] _ _ reqs' = return reqs'
1146 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1147   b <- fdIsSet fd readfds
1148   if b /= 0
1149     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1150     else completeRequests reqs readfds writefds (Read fd m : reqs')
1151 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1152   b <- fdIsSet fd writefds
1153   if b /= 0
1154     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1155     else completeRequests reqs readfds writefds (Write fd m : reqs')
1156
1157 wakeupAll [] = return ()
1158 wakeupAll (Read  fd m : reqs) = do putMVar m (); wakeupAll reqs
1159 wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs
1160
1161 waitForReadEvent :: Fd -> IO ()
1162 waitForReadEvent fd = do
1163   m <- newEmptyMVar
1164   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1165   prodServiceThread
1166   takeMVar m
1167
1168 waitForWriteEvent :: Fd -> IO ()
1169 waitForWriteEvent fd = do
1170   m <- newEmptyMVar
1171   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1172   prodServiceThread
1173   takeMVar m
1174
1175 -- -----------------------------------------------------------------------------
1176 -- Delays
1177
1178 -- Walk the queue of pending delays, waking up any that have passed
1179 -- and return the smallest delay to wait for.  The queue of pending
1180 -- delays is kept ordered.
1181 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1182 getDelay now ptimeval [] = return ([],nullPtr)
1183 getDelay now ptimeval all@(d : rest) 
1184   = case d of
1185      Delay time m | now >= time -> do
1186         putMVar m ()
1187         getDelay now ptimeval rest
1188      DelaySTM time t | now >= time -> do
1189         atomically $ writeTVar t True
1190         getDelay now ptimeval rest
1191      _otherwise -> do
1192         setTimevalTicks ptimeval (delayTime d - now)
1193         return (all,ptimeval)
1194
1195 newtype CTimeVal = CTimeVal ()
1196
1197 foreign import ccall unsafe "sizeofTimeVal"
1198   sizeofTimeVal :: Int
1199
1200 foreign import ccall unsafe "setTimevalTicks" 
1201   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1202
1203 {- 
1204   On Win32 we're going to have a single Pipe, and a
1205   waitForSingleObject with the delay time.  For signals, we send a
1206   byte down the pipe just like on Unix.
1207 -}
1208
1209 -- ----------------------------------------------------------------------------
1210 -- select() interface
1211
1212 -- ToDo: move to System.Posix.Internals?
1213
1214 newtype CFdSet = CFdSet ()
1215
1216 foreign import ccall safe "select"
1217   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1218            -> IO CInt
1219
1220 foreign import ccall unsafe "hsFD_SETSIZE"
1221   c_fD_SETSIZE :: CInt
1222
1223 fD_SETSIZE :: Fd
1224 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1225
1226 foreign import ccall unsafe "hsFD_CLR"
1227   c_fdClr :: CInt -> Ptr CFdSet -> IO ()
1228
1229 fdClr :: Fd -> Ptr CFdSet -> IO ()
1230 fdClr (Fd fd) fdset = c_fdClr fd fdset
1231
1232 foreign import ccall unsafe "hsFD_ISSET"
1233   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1234
1235 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1236 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1237
1238 foreign import ccall unsafe "hsFD_SET"
1239   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1240
1241 fdSet :: Fd -> Ptr CFdSet -> IO ()
1242 fdSet (Fd fd) fdset = c_fdSet fd fdset
1243
1244 foreign import ccall unsafe "hsFD_ZERO"
1245   fdZero :: Ptr CFdSet -> IO ()
1246
1247 foreign import ccall unsafe "sizeof_fd_set"
1248   sizeofFdSet :: Int
1249
1250 #endif
1251
1252 \end{code}