docs: mention that killThread on a completed thread is a no-op
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
6 -- |
7 -- Module      :  GHC.Conc
8 -- Copyright   :  (c) The University of Glasgow, 1994-2002
9 -- License     :  see libraries/base/LICENSE
10 -- 
11 -- Maintainer  :  cvs-ghc@haskell.org
12 -- Stability   :  internal
13 -- Portability :  non-portable (GHC extensions)
14 --
15 -- Basic concurrency stuff.
16 -- 
17 -----------------------------------------------------------------------------
18
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home.  Hence:
23
24 #include "Typeable.h"
25
26 -- #not-home
27 module GHC.Conc
28         ( ThreadId(..)
29
30         -- * Forking and suchlike
31         , forkIO        -- :: IO a -> IO ThreadId
32         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
33         , numCapabilities -- :: Int
34         , childHandler  -- :: Exception -> IO ()
35         , myThreadId    -- :: IO ThreadId
36         , killThread    -- :: ThreadId -> IO ()
37         , throwTo       -- :: ThreadId -> Exception -> IO ()
38         , par           -- :: a -> b -> b
39         , pseq          -- :: a -> b -> b
40         , yield         -- :: IO ()
41         , labelThread   -- :: ThreadId -> String -> IO ()
42
43         , ThreadStatus(..), BlockReason(..)
44         , threadStatus  -- :: ThreadId -> IO ThreadStatus
45
46         -- * Waiting
47         , threadDelay           -- :: Int -> IO ()
48         , registerDelay         -- :: Int -> IO (TVar Bool)
49         , threadWaitRead        -- :: Int -> IO ()
50         , threadWaitWrite       -- :: Int -> IO ()
51
52         -- * MVars
53         , MVar(..)
54         , newMVar       -- :: a -> IO (MVar a)
55         , newEmptyMVar  -- :: IO (MVar a)
56         , takeMVar      -- :: MVar a -> IO a
57         , putMVar       -- :: MVar a -> a -> IO ()
58         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
59         , tryPutMVar    -- :: MVar a -> a -> IO Bool
60         , isEmptyMVar   -- :: MVar a -> IO Bool
61         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
62
63         -- * TVars
64         , STM(..)
65         , atomically    -- :: STM a -> IO a
66         , retry         -- :: STM a
67         , orElse        -- :: STM a -> STM a -> STM a
68         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
69         , alwaysSucceeds -- :: STM a -> STM ()
70         , always        -- :: STM Bool -> STM ()
71         , TVar(..)
72         , newTVar       -- :: a -> STM (TVar a)
73         , newTVarIO     -- :: a -> STM (TVar a)
74         , readTVar      -- :: TVar a -> STM a
75         , writeTVar     -- :: a -> TVar a -> STM ()
76         , unsafeIOToSTM -- :: IO a -> STM a
77
78         -- * Miscellaneous
79 #ifdef mingw32_HOST_OS
80         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
81         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
82         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
83
84         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
85         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
86 #endif
87
88 #ifndef mingw32_HOST_OS
89         , signalHandlerLock
90 #endif
91
92         , ensureIOManagerIsRunning
93
94 #ifdef mingw32_HOST_OS
95         , ConsoleEvent(..)
96         , win32ConsoleHandler
97         , toWin32ConsoleEvent
98 #endif
99         , setUncaughtExceptionHandler      -- :: (Exception -> IO ()) -> IO ()
100         , getUncaughtExceptionHandler      -- :: IO (Exception -> IO ())
101
102         , reportError, reportStackOverflow
103         ) where
104
105 import System.Posix.Types
106 #ifndef mingw32_HOST_OS
107 import System.Posix.Internals
108 #endif
109 import Foreign
110 import Foreign.C
111
112 import Data.Maybe
113
114 import GHC.Base
115 import {-# SOURCE #-} GHC.Handle
116 import GHC.IOBase
117 import GHC.Num          ( Num(..) )
118 import GHC.Real         ( fromIntegral )
119 #ifdef mingw32_HOST_OS
120 import GHC.Real         ( div )
121 import GHC.Ptr          ( plusPtr, FunPtr(..) )
122 #endif
123 #ifdef mingw32_HOST_OS
124 import GHC.Read         ( Read )
125 import GHC.Enum         ( Enum )
126 #endif
127 import GHC.Exception    ( SomeException(..), throw )
128 import GHC.Pack         ( packCString# )
129 import GHC.Ptr          ( Ptr(..) )
130 import GHC.STRef
131 import GHC.Show         ( Show(..), showString )
132 import Data.Typeable
133 import GHC.Err
134
135 infixr 0 `par`, `pseq`
136 \end{code}
137
138 %************************************************************************
139 %*                                                                      *
140 \subsection{@ThreadId@, @par@, and @fork@}
141 %*                                                                      *
142 %************************************************************************
143
144 \begin{code}
145 data ThreadId = ThreadId ThreadId# deriving( Typeable )
146 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
147 -- But since ThreadId# is unlifted, the Weak type must use open
148 -- type variables.
149 {- ^
150 A 'ThreadId' is an abstract type representing a handle to a thread.
151 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
152 the 'Ord' instance implements an arbitrary total ordering over
153 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
154 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
155 useful when debugging or diagnosing the behaviour of a concurrent
156 program.
157
158 /Note/: in GHC, if you have a 'ThreadId', you essentially have
159 a pointer to the thread itself.  This means the thread itself can\'t be
160 garbage collected until you drop the 'ThreadId'.
161 This misfeature will hopefully be corrected at a later date.
162
163 /Note/: Hugs does not provide any operations on other threads;
164 it defines 'ThreadId' as a synonym for ().
165 -}
166
167 instance Show ThreadId where
168    showsPrec d t = 
169         showString "ThreadId " . 
170         showsPrec d (getThreadId (id2TSO t))
171
172 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
173
174 id2TSO :: ThreadId -> ThreadId#
175 id2TSO (ThreadId t) = t
176
177 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
178 -- Returns -1, 0, 1
179
180 cmpThread :: ThreadId -> ThreadId -> Ordering
181 cmpThread t1 t2 = 
182    case cmp_thread (id2TSO t1) (id2TSO t2) of
183       -1 -> LT
184       0  -> EQ
185       _  -> GT -- must be 1
186
187 instance Eq ThreadId where
188    t1 == t2 = 
189       case t1 `cmpThread` t2 of
190          EQ -> True
191          _  -> False
192
193 instance Ord ThreadId where
194    compare = cmpThread
195
196 {- |
197 Sparks off a new thread to run the 'IO' computation passed as the
198 first argument, and returns the 'ThreadId' of the newly created
199 thread.
200
201 The new thread will be a lightweight thread; if you want to use a foreign
202 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
203
204 GHC note: the new thread inherits the /blocked/ state of the parent 
205 (see 'Control.Exception.block').
206 -}
207 forkIO :: IO () -> IO ThreadId
208 forkIO action = IO $ \ s -> 
209    case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
210  where
211   action_plus = catchException action childHandler
212
213 {- |
214 Like 'forkIO', but lets you specify on which CPU the thread is
215 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
216 will stay on the same CPU for its entire lifetime (`forkIO` threads
217 can migrate between CPUs according to the scheduling policy).
218 `forkOnIO` is useful for overriding the scheduling policy when you
219 know in advance how best to distribute the threads.
220
221 The `Int` argument specifies the CPU number; it is interpreted modulo
222 'numCapabilities' (note that it actually specifies a capability number
223 rather than a CPU number, but to a first approximation the two are
224 equivalent).
225 -}
226 forkOnIO :: Int -> IO () -> IO ThreadId
227 forkOnIO (I# cpu) action = IO $ \ s -> 
228    case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
229  where
230   action_plus = catchException action childHandler
231
232 -- | the value passed to the @+RTS -N@ flag.  This is the number of
233 -- Haskell threads that can run truly simultaneously at any given
234 -- time, and is typically set to the number of physical CPU cores on
235 -- the machine.
236 numCapabilities :: Int
237 numCapabilities = unsafePerformIO $  do 
238                     n <- peek n_capabilities
239                     return (fromIntegral n)
240
241 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
242
243 childHandler :: SomeException -> IO ()
244 childHandler err = catchException (real_handler err) childHandler
245
246 real_handler :: SomeException -> IO ()
247 real_handler se@(SomeException ex) =
248   -- ignore thread GC and killThread exceptions:
249   case cast ex of
250   Just BlockedOnDeadMVar                -> return ()
251   _ -> case cast ex of
252        Just BlockedIndefinitely         -> return ()
253        _ -> case cast ex of
254             Just ThreadKilled           -> return ()
255             _ -> case cast ex of
256                  -- report all others:
257                  Just StackOverflow     -> reportStackOverflow
258                  _                      -> reportError se
259
260 {- | 'killThread' terminates the given thread (GHC only).
261 Any work already done by the thread isn\'t
262 lost: the computation is suspended until required by another thread.
263 The memory used by the thread will be garbage collected if it isn\'t
264 referenced from anywhere.  The 'killThread' function is defined in
265 terms of 'throwTo':
266
267 > killThread tid = throwTo tid ThreadKilled
268
269 Killthread is a no-op if the target thread has already completed.
270 -}
271 killThread :: ThreadId -> IO ()
272 killThread tid = throwTo tid ThreadKilled
273
274 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
275
276 'throwTo' does not return until the exception has been raised in the
277 target thread. 
278 The calling thread can thus be certain that the target
279 thread has received the exception.  This is a useful property to know
280 when dealing with race conditions: eg. if there are two threads that
281 can kill each other, it is guaranteed that only one of the threads
282 will get to kill the other.
283
284 If the target thread is currently making a foreign call, then the
285 exception will not be raised (and hence 'throwTo' will not return)
286 until the call has completed.  This is the case regardless of whether
287 the call is inside a 'block' or not.
288
289 Important note: the behaviour of 'throwTo' differs from that described in
290 the paper \"Asynchronous exceptions in Haskell\"
291 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
292 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
293 a more synchronous design in which 'throwTo' does not return until the exception
294 is received by the target thread.  The trade-off is discussed in Section 8 of the paper.
295 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
296 the paper).
297
298 There is currently no guarantee that the exception delivered by 'throwTo' will be
299 delivered at the first possible opportunity.  In particular, if a thread may 
300 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
301 a pending 'throwTo'.  This is arguably undesirable behaviour.
302
303  -}
304 throwTo :: Exception e => ThreadId -> e -> IO ()
305 throwTo (ThreadId tid) ex = IO $ \ s ->
306    case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
307
308 -- | Returns the 'ThreadId' of the calling thread (GHC only).
309 myThreadId :: IO ThreadId
310 myThreadId = IO $ \s ->
311    case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
312
313
314 -- |The 'yield' action allows (forces, in a co-operative multitasking
315 -- implementation) a context-switch to any other currently runnable
316 -- threads (if any), and is occasionally useful when implementing
317 -- concurrency abstractions.
318 yield :: IO ()
319 yield = IO $ \s -> 
320    case (yield# s) of s1 -> (# s1, () #)
321
322 {- | 'labelThread' stores a string as identifier for this thread if
323 you built a RTS with debugging support. This identifier will be used in
324 the debugging output to make distinction of different threads easier
325 (otherwise you only have the thread state object\'s address in the heap).
326
327 Other applications like the graphical Concurrent Haskell Debugger
328 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
329 'labelThread' for their purposes as well.
330 -}
331
332 labelThread :: ThreadId -> String -> IO ()
333 labelThread (ThreadId t) str = IO $ \ s ->
334    let ps  = packCString# str
335        adr = byteArrayContents# ps in
336      case (labelThread# t adr s) of s1 -> (# s1, () #)
337
338 --      Nota Bene: 'pseq' used to be 'seq'
339 --                 but 'seq' is now defined in PrelGHC
340 --
341 -- "pseq" is defined a bit weirdly (see below)
342 --
343 -- The reason for the strange "lazy" call is that
344 -- it fools the compiler into thinking that pseq  and par are non-strict in
345 -- their second argument (even if it inlines pseq at the call site).
346 -- If it thinks pseq is strict in "y", then it often evaluates
347 -- "y" before "x", which is totally wrong.  
348
349 {-# INLINE pseq  #-}
350 pseq :: a -> b -> b
351 pseq  x y = x `seq` lazy y
352
353 {-# INLINE par  #-}
354 par :: a -> b -> b
355 par  x y = case (par# x) of { _ -> lazy y }
356
357
358 data BlockReason
359   = BlockedOnMVar
360         -- ^blocked on on 'MVar'
361   | BlockedOnBlackHole
362         -- ^blocked on a computation in progress by another thread
363   | BlockedOnException
364         -- ^blocked in 'throwTo'
365   | BlockedOnSTM
366         -- ^blocked in 'retry' in an STM transaction
367   | BlockedOnForeignCall
368         -- ^currently in a foreign call
369   | BlockedOnOther
370         -- ^blocked on some other resource.  Without @-threaded@,
371         -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
372         -- they show up as 'BlockedOnMVar'.
373   deriving (Eq,Ord,Show)
374
375 -- | The current status of a thread
376 data ThreadStatus
377   = ThreadRunning
378         -- ^the thread is currently runnable or running
379   | ThreadFinished
380         -- ^the thread has finished
381   | ThreadBlocked  BlockReason
382         -- ^the thread is blocked on some resource
383   | ThreadDied
384         -- ^the thread received an uncaught exception
385   deriving (Eq,Ord,Show)
386
387 threadStatus :: ThreadId -> IO ThreadStatus
388 threadStatus (ThreadId t) = IO $ \s ->
389    case threadStatus# t s of
390      (# s', stat #) -> (# s', mk_stat (I# stat) #)
391    where
392         -- NB. keep these in sync with includes/Constants.h
393      mk_stat 0  = ThreadRunning
394      mk_stat 1  = ThreadBlocked BlockedOnMVar
395      mk_stat 2  = ThreadBlocked BlockedOnBlackHole
396      mk_stat 3  = ThreadBlocked BlockedOnException
397      mk_stat 7  = ThreadBlocked BlockedOnSTM
398      mk_stat 11 = ThreadBlocked BlockedOnForeignCall
399      mk_stat 12 = ThreadBlocked BlockedOnForeignCall
400      mk_stat 16 = ThreadFinished
401      mk_stat 17 = ThreadDied
402      mk_stat _  = ThreadBlocked BlockedOnOther
403 \end{code}
404
405
406 %************************************************************************
407 %*                                                                      *
408 \subsection[stm]{Transactional heap operations}
409 %*                                                                      *
410 %************************************************************************
411
412 TVars are shared memory locations which support atomic memory
413 transactions.
414
415 \begin{code}
416 -- |A monad supporting atomic memory transactions.
417 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
418
419 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
420 unSTM (STM a) = a
421
422 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
423
424 instance  Functor STM where
425    fmap f x = x >>= (return . f)
426
427 instance  Monad STM  where
428     {-# INLINE return #-}
429     {-# INLINE (>>)   #-}
430     {-# INLINE (>>=)  #-}
431     m >> k      = thenSTM m k
432     return x    = returnSTM x
433     m >>= k     = bindSTM m k
434
435 bindSTM :: STM a -> (a -> STM b) -> STM b
436 bindSTM (STM m) k = STM ( \s ->
437   case m s of 
438     (# new_s, a #) -> unSTM (k a) new_s
439   )
440
441 thenSTM :: STM a -> STM b -> STM b
442 thenSTM (STM m) k = STM ( \s ->
443   case m s of 
444     (# new_s, _ #) -> unSTM k new_s
445   )
446
447 returnSTM :: a -> STM a
448 returnSTM x = STM (\s -> (# s, x #))
449
450 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
451 -- dangerous thing to do.  
452 --
453 --   * The STM implementation will often run transactions multiple
454 --     times, so you need to be prepared for this if your IO has any
455 --     side effects.
456 --
457 --   * The STM implementation will abort transactions that are known to
458 --     be invalid and need to be restarted.  This may happen in the middle
459 --     of `unsafeIOToSTM`, so make sure you don't acquire any resources
460 --     that need releasing (exception handlers are ignored when aborting
461 --     the transaction).  That includes doing any IO using Handles, for
462 --     example.  Getting this wrong will probably lead to random deadlocks.
463 --
464 --   * The transaction may have seen an inconsistent view of memory when
465 --     the IO runs.  Invariants that you expect to be true throughout
466 --     your program may not be true inside a transaction, due to the
467 --     way transactions are implemented.  Normally this wouldn't be visible
468 --     to the programmer, but using `unsafeIOToSTM` can expose it.
469 --
470 unsafeIOToSTM :: IO a -> STM a
471 unsafeIOToSTM (IO m) = STM m
472
473 -- |Perform a series of STM actions atomically.
474 --
475 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
476 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
477 -- this would effectively allow a transaction inside a transaction, depending
478 -- on exactly when the thunk is evaluated.)
479 --
480 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
481 -- and which allows top-level TVars to be allocated.
482
483 atomically :: STM a -> IO a
484 atomically (STM m) = IO (\s -> (atomically# m) s )
485
486 -- |Retry execution of the current memory transaction because it has seen
487 -- values in TVars which mean that it should not continue (e.g. the TVars
488 -- represent a shared buffer that is now empty).  The implementation may
489 -- block the thread until one of the TVars that it has read from has been
490 -- udpated. (GHC only)
491 retry :: STM a
492 retry = STM $ \s# -> retry# s#
493
494 -- |Compose two alternative STM actions (GHC only).  If the first action
495 -- completes without retrying then it forms the result of the orElse.
496 -- Otherwise, if the first action retries, then the second action is
497 -- tried in its place.  If both actions retry then the orElse as a
498 -- whole retries.
499 orElse :: STM a -> STM a -> STM a
500 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
501
502 -- |Exception handling within STM actions.
503 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
504 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
505
506 -- | Low-level primitive on which always and alwaysSucceeds are built.
507 -- checkInv differs form these in that (i) the invariant is not 
508 -- checked when checkInv is called, only at the end of this and
509 -- subsequent transcations, (ii) the invariant failure is indicated
510 -- by raising an exception.
511 checkInv :: STM a -> STM ()
512 checkInv (STM m) = STM (\s -> (check# m) s)
513
514 -- | alwaysSucceeds adds a new invariant that must be true when passed
515 -- to alwaysSucceeds, at the end of the current transaction, and at
516 -- the end of every subsequent transaction.  If it fails at any
517 -- of those points then the transaction violating it is aborted
518 -- and the exception raised by the invariant is propagated.
519 alwaysSucceeds :: STM a -> STM ()
520 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
521                       checkInv i
522
523 -- | always is a variant of alwaysSucceeds in which the invariant is
524 -- expressed as an STM Bool action that must return True.  Returning
525 -- False or raising an exception are both treated as invariant failures.
526 always :: STM Bool -> STM ()
527 always i = alwaysSucceeds ( do v <- i
528                                if (v) then return () else ( error "Transacional invariant violation" ) )
529
530 -- |Shared memory locations that support atomic memory transactions.
531 data TVar a = TVar (TVar# RealWorld a)
532
533 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
534
535 instance Eq (TVar a) where
536         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
537
538 -- |Create a new TVar holding a value supplied
539 newTVar :: a -> STM (TVar a)
540 newTVar val = STM $ \s1# ->
541     case newTVar# val s1# of
542          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
543
544 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
545 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
546 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
547 -- possible.
548 newTVarIO :: a -> IO (TVar a)
549 newTVarIO val = IO $ \s1# ->
550     case newTVar# val s1# of
551          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
552
553 -- |Return the current value stored in a TVar
554 readTVar :: TVar a -> STM a
555 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
556
557 -- |Write the supplied value into a TVar
558 writeTVar :: TVar a -> a -> STM ()
559 writeTVar (TVar tvar#) val = STM $ \s1# ->
560     case writeTVar# tvar# val s1# of
561          s2# -> (# s2#, () #)
562   
563 \end{code}
564
565 %************************************************************************
566 %*                                                                      *
567 \subsection[mvars]{M-Structures}
568 %*                                                                      *
569 %************************************************************************
570
571 M-Vars are rendezvous points for concurrent threads.  They begin
572 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
573 is written, a single blocked thread may be freed.  Reading an M-Var
574 toggles its state from full back to empty.  Therefore, any value
575 written to an M-Var may only be read once.  Multiple reads and writes
576 are allowed, but there must be at least one read between any two
577 writes.
578
579 \begin{code}
580 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
581
582 -- |Create an 'MVar' which is initially empty.
583 newEmptyMVar  :: IO (MVar a)
584 newEmptyMVar = IO $ \ s# ->
585     case newMVar# s# of
586          (# s2#, svar# #) -> (# s2#, MVar svar# #)
587
588 -- |Create an 'MVar' which contains the supplied value.
589 newMVar :: a -> IO (MVar a)
590 newMVar value =
591     newEmptyMVar        >>= \ mvar ->
592     putMVar mvar value  >>
593     return mvar
594
595 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
596 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
597 -- the 'MVar' is left empty.
598 -- 
599 -- There are two further important properties of 'takeMVar':
600 --
601 --   * 'takeMVar' is single-wakeup.  That is, if there are multiple
602 --     threads blocked in 'takeMVar', and the 'MVar' becomes full,
603 --     only one thread will be woken up.  The runtime guarantees that
604 --     the woken thread completes its 'takeMVar' operation.
605 --
606 --   * When multiple threads are blocked on an 'MVar', they are
607 --     woken up in FIFO order.  This is useful for providing
608 --     fairness properties of abstractions built using 'MVar's.
609 --
610 takeMVar :: MVar a -> IO a
611 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
612
613 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
614 -- 'putMVar' will wait until it becomes empty.
615 --
616 -- There are two further important properties of 'putMVar':
617 --
618 --   * 'putMVar' is single-wakeup.  That is, if there are multiple
619 --     threads blocked in 'putMVar', and the 'MVar' becomes empty,
620 --     only one thread will be woken up.  The runtime guarantees that
621 --     the woken thread completes its 'putMVar' operation.
622 --
623 --   * When multiple threads are blocked on an 'MVar', they are
624 --     woken up in FIFO order.  This is useful for providing
625 --     fairness properties of abstractions built using 'MVar's.
626 --
627 putMVar  :: MVar a -> a -> IO ()
628 putMVar (MVar mvar#) x = IO $ \ s# ->
629     case putMVar# mvar# x s# of
630         s2# -> (# s2#, () #)
631
632 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
633 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
634 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
635 -- the 'MVar' is left empty.
636 tryTakeMVar :: MVar a -> IO (Maybe a)
637 tryTakeMVar (MVar m) = IO $ \ s ->
638     case tryTakeMVar# m s of
639         (# s', 0#, _ #) -> (# s', Nothing #)      -- MVar is empty
640         (# s', _,  a #) -> (# s', Just a  #)      -- MVar is full
641
642 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
643 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
644 -- it was successful, or 'False' otherwise.
645 tryPutMVar  :: MVar a -> a -> IO Bool
646 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
647     case tryPutMVar# mvar# x s# of
648         (# s, 0# #) -> (# s, False #)
649         (# s, _  #) -> (# s, True #)
650
651 -- |Check whether a given 'MVar' is empty.
652 --
653 -- Notice that the boolean value returned  is just a snapshot of
654 -- the state of the MVar. By the time you get to react on its result,
655 -- the MVar may have been filled (or emptied) - so be extremely
656 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
657 isEmptyMVar :: MVar a -> IO Bool
658 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
659     case isEmptyMVar# mv# s# of
660         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
661
662 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
663 -- "System.Mem.Weak" for more about finalizers.
664 addMVarFinalizer :: MVar a -> IO () -> IO ()
665 addMVarFinalizer (MVar m) finalizer = 
666   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, _ #) -> (# s1, () #) }
667
668 withMVar :: MVar a -> (a -> IO b) -> IO b
669 withMVar m io = 
670   block $ do
671     a <- takeMVar m
672     b <- catchAny (unblock (io a))
673             (\e -> do putMVar m a; throw e)
674     putMVar m a
675     return b
676 \end{code}
677
678
679 %************************************************************************
680 %*                                                                      *
681 \subsection{Thread waiting}
682 %*                                                                      *
683 %************************************************************************
684
685 \begin{code}
686 #ifdef mingw32_HOST_OS
687
688 -- Note: threadWaitRead and threadWaitWrite aren't really functional
689 -- on Win32, but left in there because lib code (still) uses them (the manner
690 -- in which they're used doesn't cause problems on a Win32 platform though.)
691
692 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
693 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
694   IO $ \s -> case asyncRead# fd isSock len buf s of 
695                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
696
697 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
698 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
699   IO $ \s -> case asyncWrite# fd isSock len buf s of 
700                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
701
702 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
703 asyncDoProc (FunPtr proc) (Ptr param) = 
704     -- the 'length' value is ignored; simplifies implementation of
705     -- the async*# primops to have them all return the same result.
706   IO $ \s -> case asyncDoProc# proc param s  of 
707                (# s', _len#, err# #) -> (# s', I# err# #)
708
709 -- to aid the use of these primops by the IO Handle implementation,
710 -- provide the following convenience funs:
711
712 -- this better be a pinned byte array!
713 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
714 asyncReadBA fd isSock len off bufB = 
715   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
716   
717 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
718 asyncWriteBA fd isSock len off bufB = 
719   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
720
721 #endif
722
723 -- -----------------------------------------------------------------------------
724 -- Thread IO API
725
726 -- | Block the current thread until data is available to read on the
727 -- given file descriptor (GHC only).
728 threadWaitRead :: Fd -> IO ()
729 threadWaitRead fd
730 #ifndef mingw32_HOST_OS
731   | threaded  = waitForReadEvent fd
732 #endif
733   | otherwise = IO $ \s -> 
734         case fromIntegral fd of { I# fd# ->
735         case waitRead# fd# s of { s' -> (# s', () #)
736         }}
737
738 -- | Block the current thread until data can be written to the
739 -- given file descriptor (GHC only).
740 threadWaitWrite :: Fd -> IO ()
741 threadWaitWrite fd
742 #ifndef mingw32_HOST_OS
743   | threaded  = waitForWriteEvent fd
744 #endif
745   | otherwise = IO $ \s -> 
746         case fromIntegral fd of { I# fd# ->
747         case waitWrite# fd# s of { s' -> (# s', () #)
748         }}
749
750 -- | Suspends the current thread for a given number of microseconds
751 -- (GHC only).
752 --
753 -- There is no guarantee that the thread will be rescheduled promptly
754 -- when the delay has expired, but the thread will never continue to
755 -- run /earlier/ than specified.
756 --
757 threadDelay :: Int -> IO ()
758 threadDelay time
759   | threaded  = waitForDelayEvent time
760   | otherwise = IO $ \s -> 
761         case fromIntegral time of { I# time# ->
762         case delay# time# s of { s' -> (# s', () #)
763         }}
764
765
766 -- | Set the value of returned TVar to True after a given number of
767 -- microseconds. The caveats associated with threadDelay also apply.
768 --
769 registerDelay :: Int -> IO (TVar Bool)
770 registerDelay usecs 
771   | threaded = waitForDelayEventSTM usecs
772   | otherwise = error "registerDelay: requires -threaded"
773
774 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
775
776 waitForDelayEvent :: Int -> IO ()
777 waitForDelayEvent usecs = do
778   m <- newEmptyMVar
779   target <- calculateTarget usecs
780   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
781   prodServiceThread
782   takeMVar m
783
784 -- Delays for use in STM
785 waitForDelayEventSTM :: Int -> IO (TVar Bool)
786 waitForDelayEventSTM usecs = do
787    t <- atomically $ newTVar False
788    target <- calculateTarget usecs
789    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
790    prodServiceThread
791    return t  
792     
793 calculateTarget :: Int -> IO USecs
794 calculateTarget usecs = do
795     now <- getUSecOfDay
796     return $ now + (fromIntegral usecs)
797
798
799 -- ----------------------------------------------------------------------------
800 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
801
802 -- In the threaded RTS, we employ a single IO Manager thread to wait
803 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
804 -- and delays (threadDelay).  
805 --
806 -- We can do this because in the threaded RTS the IO Manager can make
807 -- a non-blocking call to select(), so we don't have to do select() in
808 -- the scheduler as we have to in the non-threaded RTS.  We get performance
809 -- benefits from doing it this way, because we only have to restart the select()
810 -- when a new request arrives, rather than doing one select() each time
811 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
812 -- by not having to check for completed IO requests.
813
814 -- Issues, possible problems:
815 --
816 --      - we might want bound threads to just do the blocking
817 --        operation rather than communicating with the IO manager
818 --        thread.  This would prevent simgle-threaded programs which do
819 --        IO from requiring multiple OS threads.  However, it would also
820 --        prevent bound threads waiting on IO from being killed or sent
821 --        exceptions.
822 --
823 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
824 --        I couldn't repeat this.
825 --
826 --      - How do we handle signal delivery in the multithreaded RTS?
827 --
828 --      - forkProcess will kill the IO manager thread.  Let's just
829 --        hope we don't need to do any blocking IO between fork & exec.
830
831 #ifndef mingw32_HOST_OS
832 data IOReq
833   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
834   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
835 #endif
836
837 data DelayReq
838   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
839   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
840
841 #ifndef mingw32_HOST_OS
842 pendingEvents :: IORef [IOReq]
843 #endif
844 pendingDelays :: IORef [DelayReq]
845         -- could use a strict list or array here
846 {-# NOINLINE pendingEvents #-}
847 {-# NOINLINE pendingDelays #-}
848 (pendingEvents,pendingDelays) = unsafePerformIO $ do
849   startIOManagerThread
850   reqs <- newIORef []
851   dels <- newIORef []
852   return (reqs, dels)
853         -- the first time we schedule an IO request, the service thread
854         -- will be created (cool, huh?)
855
856 ensureIOManagerIsRunning :: IO ()
857 ensureIOManagerIsRunning 
858   | threaded  = seq pendingEvents $ return ()
859   | otherwise = return ()
860
861 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
862 insertDelay d [] = [d]
863 insertDelay d1 ds@(d2 : rest)
864   | delayTime d1 <= delayTime d2 = d1 : ds
865   | otherwise                    = d2 : insertDelay d1 rest
866
867 delayTime :: DelayReq -> USecs
868 delayTime (Delay t _) = t
869 delayTime (DelaySTM t _) = t
870
871 type USecs = Word64
872
873 -- XXX: move into GHC.IOBase from Data.IORef?
874 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
875 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
876
877 foreign import ccall unsafe "getUSecOfDay" 
878   getUSecOfDay :: IO USecs
879
880 prodding :: IORef Bool
881 {-# NOINLINE prodding #-}
882 prodding = unsafePerformIO (newIORef False)
883
884 prodServiceThread :: IO ()
885 prodServiceThread = do
886   was_set <- atomicModifyIORef prodding (\a -> (True,a))
887   if (not (was_set)) then wakeupIOManager else return ()
888
889 #ifdef mingw32_HOST_OS
890 -- ----------------------------------------------------------------------------
891 -- Windows IO manager thread
892
893 startIOManagerThread :: IO ()
894 startIOManagerThread = do
895   wakeup <- c_getIOManagerEvent
896   forkIO $ service_loop wakeup []
897   return ()
898
899 service_loop :: HANDLE          -- read end of pipe
900              -> [DelayReq]      -- current delay requests
901              -> IO ()
902
903 service_loop wakeup old_delays = do
904   -- pick up new delay requests
905   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
906   let  delays = foldr insertDelay old_delays new_delays
907
908   now <- getUSecOfDay
909   (delays', timeout) <- getDelay now delays
910
911   r <- c_WaitForSingleObject wakeup timeout
912   case r of
913     0xffffffff -> do c_maperrno; throwErrno "service_loop"
914     0 -> do
915         r2 <- c_readIOManagerEvent
916         exit <- 
917               case r2 of
918                 _ | r2 == io_MANAGER_WAKEUP -> return False
919                 _ | r2 == io_MANAGER_DIE    -> return True
920                 0 -> return False -- spurious wakeup
921                 _ -> do start_console_handler (r2 `shiftR` 1); return False
922         if exit
923           then return ()
924           else service_cont wakeup delays'
925
926     _other -> service_cont wakeup delays' -- probably timeout        
927
928 service_cont :: HANDLE -> [DelayReq] -> IO ()
929 service_cont wakeup delays = do
930   atomicModifyIORef prodding (\_ -> (False,False))
931   service_loop wakeup delays
932
933 -- must agree with rts/win32/ThrIOManager.c
934 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
935 io_MANAGER_WAKEUP = 0xffffffff
936 io_MANAGER_DIE    = 0xfffffffe
937
938 data ConsoleEvent
939  = ControlC
940  | Break
941  | Close
942     -- these are sent to Services only.
943  | Logoff
944  | Shutdown
945  deriving (Eq, Ord, Enum, Show, Read, Typeable)
946
947 start_console_handler :: Word32 -> IO ()
948 start_console_handler r =
949   case toWin32ConsoleEvent r of
950      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
951                     forkIO (handler x)
952                     return ()
953      Nothing -> return ()
954
955 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
956 toWin32ConsoleEvent ev = 
957    case ev of
958        0 {- CTRL_C_EVENT-}        -> Just ControlC
959        1 {- CTRL_BREAK_EVENT-}    -> Just Break
960        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
961        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
962        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
963        _ -> Nothing
964
965 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
966 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
967
968 -- XXX Is this actually needed?
969 stick :: IORef HANDLE
970 {-# NOINLINE stick #-}
971 stick = unsafePerformIO (newIORef nullPtr)
972
973 wakeupIOManager :: IO ()
974 wakeupIOManager = do 
975   _hdl <- readIORef stick
976   c_sendIOManagerEvent io_MANAGER_WAKEUP
977
978 -- Walk the queue of pending delays, waking up any that have passed
979 -- and return the smallest delay to wait for.  The queue of pending
980 -- delays is kept ordered.
981 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
982 getDelay _   [] = return ([], iNFINITE)
983 getDelay now all@(d : rest) 
984   = case d of
985      Delay time m | now >= time -> do
986         putMVar m ()
987         getDelay now rest
988      DelaySTM time t | now >= time -> do
989         atomically $ writeTVar t True
990         getDelay now rest
991      _otherwise ->
992         -- delay is in millisecs for WaitForSingleObject
993         let micro_seconds = delayTime d - now
994             milli_seconds = (micro_seconds + 999) `div` 1000
995         in return (all, fromIntegral milli_seconds)
996
997 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
998 -- available yet.  We should move some Win32 functionality down here,
999 -- maybe as part of the grand reorganisation of the base package...
1000 type HANDLE       = Ptr ()
1001 type DWORD        = Word32
1002
1003 iNFINITE :: DWORD
1004 iNFINITE = 0xFFFFFFFF -- urgh
1005
1006 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
1007   c_getIOManagerEvent :: IO HANDLE
1008
1009 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
1010   c_readIOManagerEvent :: IO Word32
1011
1012 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
1013   c_sendIOManagerEvent :: Word32 -> IO ()
1014
1015 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
1016    c_maperrno :: IO ()
1017
1018 foreign import stdcall "WaitForSingleObject"
1019    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1020
1021 #else
1022 -- ----------------------------------------------------------------------------
1023 -- Unix IO manager thread, using select()
1024
1025 startIOManagerThread :: IO ()
1026 startIOManagerThread = do
1027         allocaArray 2 $ \fds -> do
1028         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1029         rd_end <- peekElemOff fds 0
1030         wr_end <- peekElemOff fds 1
1031         writeIORef stick (fromIntegral wr_end)
1032         c_setIOManagerPipe wr_end
1033         forkIO $ do
1034             allocaBytes sizeofFdSet   $ \readfds -> do
1035             allocaBytes sizeofFdSet   $ \writefds -> do 
1036             allocaBytes sizeofTimeVal $ \timeval -> do
1037             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1038         return ()
1039
1040 service_loop
1041    :: Fd                -- listen to this for wakeup calls
1042    -> Ptr CFdSet
1043    -> Ptr CFdSet
1044    -> Ptr CTimeVal
1045    -> [IOReq]
1046    -> [DelayReq]
1047    -> IO ()
1048 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1049
1050   -- pick up new IO requests
1051   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1052   let reqs = new_reqs ++ old_reqs
1053
1054   -- pick up new delay requests
1055   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1056   let  delays0 = foldr insertDelay old_delays new_delays
1057
1058   -- build the FDSets for select()
1059   fdZero readfds
1060   fdZero writefds
1061   fdSet wakeup readfds
1062   maxfd <- buildFdSets 0 readfds writefds reqs
1063
1064   -- perform the select()
1065   let do_select delays = do
1066           -- check the current time and wake up any thread in
1067           -- threadDelay whose timeout has expired.  Also find the
1068           -- timeout value for the select() call.
1069           now <- getUSecOfDay
1070           (delays', timeout) <- getDelay now ptimeval delays
1071
1072           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
1073                         nullPtr timeout
1074           if (res == -1)
1075              then do
1076                 err <- getErrno
1077                 case err of
1078                   _ | err == eINTR ->  do_select delays'
1079                         -- EINTR: just redo the select()
1080                   _ | err == eBADF ->  return (True, delays)
1081                         -- EBADF: one of the file descriptors is closed or bad,
1082                         -- we don't know which one, so wake everyone up.
1083                   _ | otherwise    ->  throwErrno "select"
1084                         -- otherwise (ENOMEM or EINVAL) something has gone
1085                         -- wrong; report the error.
1086              else
1087                 return (False,delays')
1088
1089   (wakeup_all,delays') <- do_select delays0
1090
1091   exit <-
1092     if wakeup_all then return False
1093       else do
1094         b <- fdIsSet wakeup readfds
1095         if b == 0 
1096           then return False
1097           else alloca $ \p -> do 
1098                  c_read (fromIntegral wakeup) p 1; return ()
1099                  s <- peek p            
1100                  case s of
1101                   _ | s == io_MANAGER_WAKEUP -> return False
1102                   _ | s == io_MANAGER_DIE    -> return True
1103                   _ -> withMVar signalHandlerLock $ \_ -> do
1104                           handler_tbl <- peek handlers
1105                           sp <- peekElemOff handler_tbl (fromIntegral s)
1106                           io <- deRefStablePtr sp
1107                           forkIO io
1108                           return False
1109
1110   if exit then return () else do
1111
1112   atomicModifyIORef prodding (\_ -> (False,False))
1113
1114   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1115                          else completeRequests reqs readfds writefds []
1116
1117   service_loop wakeup readfds writefds ptimeval reqs' delays'
1118
1119 io_MANAGER_WAKEUP, io_MANAGER_DIE :: CChar
1120 io_MANAGER_WAKEUP = 0xff
1121 io_MANAGER_DIE    = 0xfe
1122
1123 stick :: IORef Fd
1124 {-# NOINLINE stick #-}
1125 stick = unsafePerformIO (newIORef 0)
1126
1127 wakeupIOManager :: IO ()
1128 wakeupIOManager = do
1129   fd <- readIORef stick
1130   with io_MANAGER_WAKEUP $ \pbuf -> do 
1131     c_write (fromIntegral fd) pbuf 1; return ()
1132
1133 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1134 -- this race condition is #1922, although that bug was on Windows a similar
1135 -- bug also exists on Unix.
1136 signalHandlerLock :: MVar ()
1137 signalHandlerLock = unsafePerformIO (newMVar ())
1138
1139 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
1140
1141 foreign import ccall "setIOManagerPipe"
1142   c_setIOManagerPipe :: CInt -> IO ()
1143
1144 -- -----------------------------------------------------------------------------
1145 -- IO requests
1146
1147 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1148 buildFdSets maxfd _       _        [] = return maxfd
1149 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1150   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1151   | otherwise        =  do
1152         fdSet fd readfds
1153         buildFdSets (max maxfd fd) readfds writefds reqs
1154 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1155   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1156   | otherwise        =  do
1157         fdSet fd writefds
1158         buildFdSets (max maxfd fd) readfds writefds reqs
1159
1160 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1161                  -> IO [IOReq]
1162 completeRequests [] _ _ reqs' = return reqs'
1163 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1164   b <- fdIsSet fd readfds
1165   if b /= 0
1166     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1167     else completeRequests reqs readfds writefds (Read fd m : reqs')
1168 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1169   b <- fdIsSet fd writefds
1170   if b /= 0
1171     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1172     else completeRequests reqs readfds writefds (Write fd m : reqs')
1173
1174 wakeupAll :: [IOReq] -> IO ()
1175 wakeupAll [] = return ()
1176 wakeupAll (Read  _ m : reqs) = do putMVar m (); wakeupAll reqs
1177 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1178
1179 waitForReadEvent :: Fd -> IO ()
1180 waitForReadEvent fd = do
1181   m <- newEmptyMVar
1182   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1183   prodServiceThread
1184   takeMVar m
1185
1186 waitForWriteEvent :: Fd -> IO ()
1187 waitForWriteEvent fd = do
1188   m <- newEmptyMVar
1189   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1190   prodServiceThread
1191   takeMVar m
1192
1193 -- -----------------------------------------------------------------------------
1194 -- Delays
1195
1196 -- Walk the queue of pending delays, waking up any that have passed
1197 -- and return the smallest delay to wait for.  The queue of pending
1198 -- delays is kept ordered.
1199 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1200 getDelay _   _        [] = return ([],nullPtr)
1201 getDelay now ptimeval all@(d : rest) 
1202   = case d of
1203      Delay time m | now >= time -> do
1204         putMVar m ()
1205         getDelay now ptimeval rest
1206      DelaySTM time t | now >= time -> do
1207         atomically $ writeTVar t True
1208         getDelay now ptimeval rest
1209      _otherwise -> do
1210         setTimevalTicks ptimeval (delayTime d - now)
1211         return (all,ptimeval)
1212
1213 data CTimeVal
1214
1215 foreign import ccall unsafe "sizeofTimeVal"
1216   sizeofTimeVal :: Int
1217
1218 foreign import ccall unsafe "setTimevalTicks" 
1219   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1220
1221 {- 
1222   On Win32 we're going to have a single Pipe, and a
1223   waitForSingleObject with the delay time.  For signals, we send a
1224   byte down the pipe just like on Unix.
1225 -}
1226
1227 -- ----------------------------------------------------------------------------
1228 -- select() interface
1229
1230 -- ToDo: move to System.Posix.Internals?
1231
1232 data CFdSet
1233
1234 foreign import ccall safe "select"
1235   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1236            -> IO CInt
1237
1238 foreign import ccall unsafe "hsFD_SETSIZE"
1239   c_fD_SETSIZE :: CInt
1240
1241 fD_SETSIZE :: Fd
1242 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1243
1244 foreign import ccall unsafe "hsFD_ISSET"
1245   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1246
1247 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1248 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1249
1250 foreign import ccall unsafe "hsFD_SET"
1251   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1252
1253 fdSet :: Fd -> Ptr CFdSet -> IO ()
1254 fdSet (Fd fd) fdset = c_fdSet fd fdset
1255
1256 foreign import ccall unsafe "hsFD_ZERO"
1257   fdZero :: Ptr CFdSet -> IO ()
1258
1259 foreign import ccall unsafe "sizeof_fd_set"
1260   sizeofFdSet :: Int
1261
1262 #endif
1263
1264 reportStackOverflow :: IO a
1265 reportStackOverflow = do callStackOverflowHook; return undefined
1266
1267 reportError :: SomeException -> IO a
1268 reportError ex = do
1269    handler <- getUncaughtExceptionHandler
1270    handler ex
1271    return undefined
1272
1273 -- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
1274 -- the unsafe below.
1275 foreign import ccall unsafe "stackOverflow"
1276         callStackOverflowHook :: IO ()
1277
1278 {-# NOINLINE uncaughtExceptionHandler #-}
1279 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1280 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1281    where
1282       defaultHandler :: SomeException -> IO ()
1283       defaultHandler se@(SomeException ex) = do
1284          (hFlush stdout) `catchAny` (\ _ -> return ())
1285          let msg = case cast ex of
1286                Just Deadlock -> "no threads to run:  infinite loop or deadlock?"
1287                _ -> case cast ex of
1288                     Just (ErrorCall s) -> s
1289                     _                  -> showsPrec 0 se ""
1290          withCString "%s" $ \cfmt ->
1291           withCString msg $ \cmsg ->
1292             errorBelch cfmt cmsg
1293
1294 -- don't use errorBelch() directly, because we cannot call varargs functions
1295 -- using the FFI.
1296 foreign import ccall unsafe "HsBase.h errorBelch2"
1297    errorBelch :: CString -> CString -> IO ()
1298
1299 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1300 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1301
1302 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1303 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
1304 \end{code}