Fix #1185: restart the IO manager after fork()
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
6 -- |
7 -- Module      :  GHC.Conc
8 -- Copyright   :  (c) The University of Glasgow, 1994-2002
9 -- License     :  see libraries/base/LICENSE
10 -- 
11 -- Maintainer  :  cvs-ghc@haskell.org
12 -- Stability   :  internal
13 -- Portability :  non-portable (GHC extensions)
14 --
15 -- Basic concurrency stuff.
16 -- 
17 -----------------------------------------------------------------------------
18
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home.  Hence:
23
24 #include "Typeable.h"
25
26 -- #not-home
27 module GHC.Conc
28         ( ThreadId(..)
29
30         -- * Forking and suchlike
31         , forkIO        -- :: IO a -> IO ThreadId
32         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
33         , numCapabilities -- :: Int
34         , childHandler  -- :: Exception -> IO ()
35         , myThreadId    -- :: IO ThreadId
36         , killThread    -- :: ThreadId -> IO ()
37         , throwTo       -- :: ThreadId -> Exception -> IO ()
38         , par           -- :: a -> b -> b
39         , pseq          -- :: a -> b -> b
40         , runSparks
41         , yield         -- :: IO ()
42         , labelThread   -- :: ThreadId -> String -> IO ()
43
44         , ThreadStatus(..), BlockReason(..)
45         , threadStatus  -- :: ThreadId -> IO ThreadStatus
46
47         -- * Waiting
48         , threadDelay           -- :: Int -> IO ()
49         , registerDelay         -- :: Int -> IO (TVar Bool)
50         , threadWaitRead        -- :: Int -> IO ()
51         , threadWaitWrite       -- :: Int -> IO ()
52
53         -- * TVars
54         , STM(..)
55         , atomically    -- :: STM a -> IO a
56         , retry         -- :: STM a
57         , orElse        -- :: STM a -> STM a -> STM a
58         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
59         , alwaysSucceeds -- :: STM a -> STM ()
60         , always        -- :: STM Bool -> STM ()
61         , TVar(..)
62         , newTVar       -- :: a -> STM (TVar a)
63         , newTVarIO     -- :: a -> STM (TVar a)
64         , readTVar      -- :: TVar a -> STM a
65         , readTVarIO    -- :: TVar a -> IO a
66         , writeTVar     -- :: a -> TVar a -> STM ()
67         , unsafeIOToSTM -- :: IO a -> STM a
68
69         -- * Miscellaneous
70         , withMVar
71 #ifdef mingw32_HOST_OS
72         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
73         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
74         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
75
76         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
77         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
78 #endif
79
80 #ifndef mingw32_HOST_OS
81         , Signal, HandlerFun, setHandler, runHandlers
82 #endif
83
84         , ensureIOManagerIsRunning
85 #ifndef mingw32_HOST_OS
86         , syncIOManager
87 #endif
88
89 #ifdef mingw32_HOST_OS
90         , ConsoleEvent(..)
91         , win32ConsoleHandler
92         , toWin32ConsoleEvent
93 #endif
94         , setUncaughtExceptionHandler      -- :: (Exception -> IO ()) -> IO ()
95         , getUncaughtExceptionHandler      -- :: IO (Exception -> IO ())
96
97         , reportError, reportStackOverflow
98         ) where
99
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
103 #endif
104 import Foreign
105 import Foreign.C
106
107 #ifdef mingw32_HOST_OS
108 import Data.Typeable
109 #endif
110
111 #ifndef mingw32_HOST_OS
112 import Data.Dynamic
113 #endif
114 import Control.Monad
115 import Data.Maybe
116
117 import GHC.Base
118 #ifndef mingw32_HOST_OS
119 import GHC.Debug
120 #endif
121 import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
122 import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
123 import GHC.IO
124 import GHC.IO.Exception
125 import GHC.Exception
126 import GHC.IORef
127 import GHC.MVar
128 import GHC.Num          ( Num(..) )
129 import GHC.Real         ( fromIntegral )
130 #ifndef mingw32_HOST_OS
131 import GHC.IOArray
132 import GHC.Arr          ( inRange )
133 #endif
134 #ifdef mingw32_HOST_OS
135 import GHC.Real         ( div )
136 import GHC.Ptr
137 #endif
138 #ifdef mingw32_HOST_OS
139 import GHC.Read         ( Read )
140 import GHC.Enum         ( Enum )
141 #endif
142 import GHC.Pack         ( packCString# )
143 import GHC.Show         ( Show(..), showString )
144
145 infixr 0 `par`, `pseq`
146 \end{code}
147
148 %************************************************************************
149 %*                                                                      *
150 \subsection{@ThreadId@, @par@, and @fork@}
151 %*                                                                      *
152 %************************************************************************
153
154 \begin{code}
155 data ThreadId = ThreadId ThreadId# deriving( Typeable )
156 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
157 -- But since ThreadId# is unlifted, the Weak type must use open
158 -- type variables.
159 {- ^
160 A 'ThreadId' is an abstract type representing a handle to a thread.
161 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
162 the 'Ord' instance implements an arbitrary total ordering over
163 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
164 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
165 useful when debugging or diagnosing the behaviour of a concurrent
166 program.
167
168 /Note/: in GHC, if you have a 'ThreadId', you essentially have
169 a pointer to the thread itself.  This means the thread itself can\'t be
170 garbage collected until you drop the 'ThreadId'.
171 This misfeature will hopefully be corrected at a later date.
172
173 /Note/: Hugs does not provide any operations on other threads;
174 it defines 'ThreadId' as a synonym for ().
175 -}
176
177 instance Show ThreadId where
178    showsPrec d t = 
179         showString "ThreadId " . 
180         showsPrec d (getThreadId (id2TSO t))
181
182 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
183
184 id2TSO :: ThreadId -> ThreadId#
185 id2TSO (ThreadId t) = t
186
187 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
188 -- Returns -1, 0, 1
189
190 cmpThread :: ThreadId -> ThreadId -> Ordering
191 cmpThread t1 t2 = 
192    case cmp_thread (id2TSO t1) (id2TSO t2) of
193       -1 -> LT
194       0  -> EQ
195       _  -> GT -- must be 1
196
197 instance Eq ThreadId where
198    t1 == t2 = 
199       case t1 `cmpThread` t2 of
200          EQ -> True
201          _  -> False
202
203 instance Ord ThreadId where
204    compare = cmpThread
205
206 {- |
207 Sparks off a new thread to run the 'IO' computation passed as the
208 first argument, and returns the 'ThreadId' of the newly created
209 thread.
210
211 The new thread will be a lightweight thread; if you want to use a foreign
212 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
213
214 GHC note: the new thread inherits the /blocked/ state of the parent 
215 (see 'Control.Exception.block').
216
217 The newly created thread has an exception handler that discards the
218 exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
219 'ThreadKilled', and passes all other exceptions to the uncaught
220 exception handler (see 'setUncaughtExceptionHandler').
221 -}
222 forkIO :: IO () -> IO ThreadId
223 forkIO action = IO $ \ s -> 
224    case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
225  where
226   action_plus = catchException action childHandler
227
228 {- |
229 Like 'forkIO', but lets you specify on which CPU the thread is
230 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
231 will stay on the same CPU for its entire lifetime (`forkIO` threads
232 can migrate between CPUs according to the scheduling policy).
233 `forkOnIO` is useful for overriding the scheduling policy when you
234 know in advance how best to distribute the threads.
235
236 The `Int` argument specifies the CPU number; it is interpreted modulo
237 'numCapabilities' (note that it actually specifies a capability number
238 rather than a CPU number, but to a first approximation the two are
239 equivalent).
240 -}
241 forkOnIO :: Int -> IO () -> IO ThreadId
242 forkOnIO (I# cpu) action = IO $ \ s -> 
243    case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
244  where
245   action_plus = catchException action childHandler
246
247 -- | the value passed to the @+RTS -N@ flag.  This is the number of
248 -- Haskell threads that can run truly simultaneously at any given
249 -- time, and is typically set to the number of physical CPU cores on
250 -- the machine.
251 numCapabilities :: Int
252 numCapabilities = unsafePerformIO $  do 
253                     n <- peek n_capabilities
254                     return (fromIntegral n)
255
256 #if defined(mingw32_HOST_OS) && defined(__PIC__)
257 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
258 #else
259 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
260 #endif
261 childHandler :: SomeException -> IO ()
262 childHandler err = catchException (real_handler err) childHandler
263
264 real_handler :: SomeException -> IO ()
265 real_handler se@(SomeException ex) =
266   -- ignore thread GC and killThread exceptions:
267   case cast ex of
268   Just BlockedIndefinitelyOnMVar        -> return ()
269   _ -> case cast ex of
270        Just BlockedIndefinitelyOnSTM    -> return ()
271        _ -> case cast ex of
272             Just ThreadKilled           -> return ()
273             _ -> case cast ex of
274                  -- report all others:
275                  Just StackOverflow     -> reportStackOverflow
276                  _                      -> reportError se
277
278 {- | 'killThread' terminates the given thread (GHC only).
279 Any work already done by the thread isn\'t
280 lost: the computation is suspended until required by another thread.
281 The memory used by the thread will be garbage collected if it isn\'t
282 referenced from anywhere.  The 'killThread' function is defined in
283 terms of 'throwTo':
284
285 > killThread tid = throwTo tid ThreadKilled
286
287 Killthread is a no-op if the target thread has already completed.
288 -}
289 killThread :: ThreadId -> IO ()
290 killThread tid = throwTo tid ThreadKilled
291
292 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
293
294 'throwTo' does not return until the exception has been raised in the
295 target thread. 
296 The calling thread can thus be certain that the target
297 thread has received the exception.  This is a useful property to know
298 when dealing with race conditions: eg. if there are two threads that
299 can kill each other, it is guaranteed that only one of the threads
300 will get to kill the other.
301
302 If the target thread is currently making a foreign call, then the
303 exception will not be raised (and hence 'throwTo' will not return)
304 until the call has completed.  This is the case regardless of whether
305 the call is inside a 'block' or not.
306
307 Important note: the behaviour of 'throwTo' differs from that described in
308 the paper \"Asynchronous exceptions in Haskell\"
309 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
310 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
311 a more synchronous design in which 'throwTo' does not return until the exception
312 is received by the target thread.  The trade-off is discussed in Section 9 of the paper.
313 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
314 the paper).
315
316 There is currently no guarantee that the exception delivered by 'throwTo' will be
317 delivered at the first possible opportunity.  In particular, a thread may 
318 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
319 a pending 'throwTo'.  This is arguably undesirable behaviour.
320
321  -}
322 throwTo :: Exception e => ThreadId -> e -> IO ()
323 throwTo (ThreadId tid) ex = IO $ \ s ->
324    case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
325
326 -- | Returns the 'ThreadId' of the calling thread (GHC only).
327 myThreadId :: IO ThreadId
328 myThreadId = IO $ \s ->
329    case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
330
331
332 -- |The 'yield' action allows (forces, in a co-operative multitasking
333 -- implementation) a context-switch to any other currently runnable
334 -- threads (if any), and is occasionally useful when implementing
335 -- concurrency abstractions.
336 yield :: IO ()
337 yield = IO $ \s -> 
338    case (yield# s) of s1 -> (# s1, () #)
339
340 {- | 'labelThread' stores a string as identifier for this thread if
341 you built a RTS with debugging support. This identifier will be used in
342 the debugging output to make distinction of different threads easier
343 (otherwise you only have the thread state object\'s address in the heap).
344
345 Other applications like the graphical Concurrent Haskell Debugger
346 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
347 'labelThread' for their purposes as well.
348 -}
349
350 labelThread :: ThreadId -> String -> IO ()
351 labelThread (ThreadId t) str = IO $ \ s ->
352    let !ps  = packCString# str
353        !adr = byteArrayContents# ps in
354      case (labelThread# t adr s) of s1 -> (# s1, () #)
355
356 --      Nota Bene: 'pseq' used to be 'seq'
357 --                 but 'seq' is now defined in PrelGHC
358 --
359 -- "pseq" is defined a bit weirdly (see below)
360 --
361 -- The reason for the strange "lazy" call is that
362 -- it fools the compiler into thinking that pseq  and par are non-strict in
363 -- their second argument (even if it inlines pseq at the call site).
364 -- If it thinks pseq is strict in "y", then it often evaluates
365 -- "y" before "x", which is totally wrong.  
366
367 {-# INLINE pseq  #-}
368 pseq :: a -> b -> b
369 pseq  x y = x `seq` lazy y
370
371 {-# INLINE par  #-}
372 par :: a -> b -> b
373 par  x y = case (par# x) of { _ -> lazy y }
374
375 -- | Internal function used by the RTS to run sparks.
376 runSparks :: IO ()
377 runSparks = IO loop
378   where loop s = case getSpark# s of
379                    (# s', n, p #) ->
380                       if n ==# 0# then (# s', () #)
381                                   else p `seq` loop s'
382
383 data BlockReason
384   = BlockedOnMVar
385         -- ^blocked on on 'MVar'
386   | BlockedOnBlackHole
387         -- ^blocked on a computation in progress by another thread
388   | BlockedOnException
389         -- ^blocked in 'throwTo'
390   | BlockedOnSTM
391         -- ^blocked in 'retry' in an STM transaction
392   | BlockedOnForeignCall
393         -- ^currently in a foreign call
394   | BlockedOnOther
395         -- ^blocked on some other resource.  Without @-threaded@,
396         -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
397         -- they show up as 'BlockedOnMVar'.
398   deriving (Eq,Ord,Show)
399
400 -- | The current status of a thread
401 data ThreadStatus
402   = ThreadRunning
403         -- ^the thread is currently runnable or running
404   | ThreadFinished
405         -- ^the thread has finished
406   | ThreadBlocked  BlockReason
407         -- ^the thread is blocked on some resource
408   | ThreadDied
409         -- ^the thread received an uncaught exception
410   deriving (Eq,Ord,Show)
411
412 threadStatus :: ThreadId -> IO ThreadStatus
413 threadStatus (ThreadId t) = IO $ \s ->
414    case threadStatus# t s of
415      (# s', stat #) -> (# s', mk_stat (I# stat) #)
416    where
417         -- NB. keep these in sync with includes/Constants.h
418      mk_stat 0  = ThreadRunning
419      mk_stat 1  = ThreadBlocked BlockedOnMVar
420      mk_stat 2  = ThreadBlocked BlockedOnBlackHole
421      mk_stat 3  = ThreadBlocked BlockedOnException
422      mk_stat 7  = ThreadBlocked BlockedOnSTM
423      mk_stat 11 = ThreadBlocked BlockedOnForeignCall
424      mk_stat 12 = ThreadBlocked BlockedOnForeignCall
425      mk_stat 16 = ThreadFinished
426      mk_stat 17 = ThreadDied
427      mk_stat _  = ThreadBlocked BlockedOnOther
428 \end{code}
429
430
431 %************************************************************************
432 %*                                                                      *
433 \subsection[stm]{Transactional heap operations}
434 %*                                                                      *
435 %************************************************************************
436
437 TVars are shared memory locations which support atomic memory
438 transactions.
439
440 \begin{code}
441 -- |A monad supporting atomic memory transactions.
442 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
443
444 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
445 unSTM (STM a) = a
446
447 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
448
449 instance  Functor STM where
450    fmap f x = x >>= (return . f)
451
452 instance  Monad STM  where
453     {-# INLINE return #-}
454     {-# INLINE (>>)   #-}
455     {-# INLINE (>>=)  #-}
456     m >> k      = thenSTM m k
457     return x    = returnSTM x
458     m >>= k     = bindSTM m k
459
460 bindSTM :: STM a -> (a -> STM b) -> STM b
461 bindSTM (STM m) k = STM ( \s ->
462   case m s of 
463     (# new_s, a #) -> unSTM (k a) new_s
464   )
465
466 thenSTM :: STM a -> STM b -> STM b
467 thenSTM (STM m) k = STM ( \s ->
468   case m s of 
469     (# new_s, _ #) -> unSTM k new_s
470   )
471
472 returnSTM :: a -> STM a
473 returnSTM x = STM (\s -> (# s, x #))
474
475 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
476 -- dangerous thing to do.  
477 --
478 --   * The STM implementation will often run transactions multiple
479 --     times, so you need to be prepared for this if your IO has any
480 --     side effects.
481 --
482 --   * The STM implementation will abort transactions that are known to
483 --     be invalid and need to be restarted.  This may happen in the middle
484 --     of `unsafeIOToSTM`, so make sure you don't acquire any resources
485 --     that need releasing (exception handlers are ignored when aborting
486 --     the transaction).  That includes doing any IO using Handles, for
487 --     example.  Getting this wrong will probably lead to random deadlocks.
488 --
489 --   * The transaction may have seen an inconsistent view of memory when
490 --     the IO runs.  Invariants that you expect to be true throughout
491 --     your program may not be true inside a transaction, due to the
492 --     way transactions are implemented.  Normally this wouldn't be visible
493 --     to the programmer, but using `unsafeIOToSTM` can expose it.
494 --
495 unsafeIOToSTM :: IO a -> STM a
496 unsafeIOToSTM (IO m) = STM m
497
498 -- |Perform a series of STM actions atomically.
499 --
500 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
501 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
502 -- this would effectively allow a transaction inside a transaction, depending
503 -- on exactly when the thunk is evaluated.)
504 --
505 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
506 -- and which allows top-level TVars to be allocated.
507
508 atomically :: STM a -> IO a
509 atomically (STM m) = IO (\s -> (atomically# m) s )
510
511 -- |Retry execution of the current memory transaction because it has seen
512 -- values in TVars which mean that it should not continue (e.g. the TVars
513 -- represent a shared buffer that is now empty).  The implementation may
514 -- block the thread until one of the TVars that it has read from has been
515 -- udpated. (GHC only)
516 retry :: STM a
517 retry = STM $ \s# -> retry# s#
518
519 -- |Compose two alternative STM actions (GHC only).  If the first action
520 -- completes without retrying then it forms the result of the orElse.
521 -- Otherwise, if the first action retries, then the second action is
522 -- tried in its place.  If both actions retry then the orElse as a
523 -- whole retries.
524 orElse :: STM a -> STM a -> STM a
525 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
526
527 -- |Exception handling within STM actions.
528 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
529 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
530
531 -- | Low-level primitive on which always and alwaysSucceeds are built.
532 -- checkInv differs form these in that (i) the invariant is not 
533 -- checked when checkInv is called, only at the end of this and
534 -- subsequent transcations, (ii) the invariant failure is indicated
535 -- by raising an exception.
536 checkInv :: STM a -> STM ()
537 checkInv (STM m) = STM (\s -> (check# m) s)
538
539 -- | alwaysSucceeds adds a new invariant that must be true when passed
540 -- to alwaysSucceeds, at the end of the current transaction, and at
541 -- the end of every subsequent transaction.  If it fails at any
542 -- of those points then the transaction violating it is aborted
543 -- and the exception raised by the invariant is propagated.
544 alwaysSucceeds :: STM a -> STM ()
545 alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () ) 
546                       checkInv i
547
548 -- | always is a variant of alwaysSucceeds in which the invariant is
549 -- expressed as an STM Bool action that must return True.  Returning
550 -- False or raising an exception are both treated as invariant failures.
551 always :: STM Bool -> STM ()
552 always i = alwaysSucceeds ( do v <- i
553                                if (v) then return () else ( error "Transacional invariant violation" ) )
554
555 -- |Shared memory locations that support atomic memory transactions.
556 data TVar a = TVar (TVar# RealWorld a)
557
558 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
559
560 instance Eq (TVar a) where
561         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
562
563 -- |Create a new TVar holding a value supplied
564 newTVar :: a -> STM (TVar a)
565 newTVar val = STM $ \s1# ->
566     case newTVar# val s1# of
567          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
568
569 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
570 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
571 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
572 -- possible.
573 newTVarIO :: a -> IO (TVar a)
574 newTVarIO val = IO $ \s1# ->
575     case newTVar# val s1# of
576          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
577
578 -- |Return the current value stored in a TVar.
579 -- This is equivalent to
580 --
581 -- >  readTVarIO = atomically . readTVar
582 --
583 -- but works much faster, because it doesn't perform a complete
584 -- transaction, it just reads the current value of the 'TVar'.
585 readTVarIO :: TVar a -> IO a
586 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
587
588 -- |Return the current value stored in a TVar
589 readTVar :: TVar a -> STM a
590 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
591
592 -- |Write the supplied value into a TVar
593 writeTVar :: TVar a -> a -> STM ()
594 writeTVar (TVar tvar#) val = STM $ \s1# ->
595     case writeTVar# tvar# val s1# of
596          s2# -> (# s2#, () #)
597   
598 \end{code}
599
600 MVar utilities
601
602 \begin{code}
603 withMVar :: MVar a -> (a -> IO b) -> IO b
604 withMVar m io = 
605   block $ do
606     a <- takeMVar m
607     b <- catchAny (unblock (io a))
608             (\e -> do putMVar m a; throw e)
609     putMVar m a
610     return b
611
612 modifyMVar_ :: MVar a -> (a -> IO a) -> IO ()
613 modifyMVar_ m io =
614   block $ do
615     a <- takeMVar m
616     a' <- catchAny (unblock (io a))
617             (\e -> do putMVar m a; throw e)
618     putMVar m a'
619     return ()
620 \end{code}
621
622 %************************************************************************
623 %*                                                                      *
624 \subsection{Thread waiting}
625 %*                                                                      *
626 %************************************************************************
627
628 \begin{code}
629 #ifdef mingw32_HOST_OS
630
631 -- Note: threadWaitRead and threadWaitWrite aren't really functional
632 -- on Win32, but left in there because lib code (still) uses them (the manner
633 -- in which they're used doesn't cause problems on a Win32 platform though.)
634
635 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
636 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
637   IO $ \s -> case asyncRead# fd isSock len buf s of 
638                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
639
640 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
641 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
642   IO $ \s -> case asyncWrite# fd isSock len buf s of 
643                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
644
645 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
646 asyncDoProc (FunPtr proc) (Ptr param) = 
647     -- the 'length' value is ignored; simplifies implementation of
648     -- the async*# primops to have them all return the same result.
649   IO $ \s -> case asyncDoProc# proc param s  of 
650                (# s', _len#, err# #) -> (# s', I# err# #)
651
652 -- to aid the use of these primops by the IO Handle implementation,
653 -- provide the following convenience funs:
654
655 -- this better be a pinned byte array!
656 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
657 asyncReadBA fd isSock len off bufB = 
658   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
659   
660 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
661 asyncWriteBA fd isSock len off bufB = 
662   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
663
664 #endif
665
666 -- -----------------------------------------------------------------------------
667 -- Thread IO API
668
669 -- | Block the current thread until data is available to read on the
670 -- given file descriptor (GHC only).
671 threadWaitRead :: Fd -> IO ()
672 threadWaitRead fd
673 #ifndef mingw32_HOST_OS
674   | threaded  = waitForReadEvent fd
675 #endif
676   | otherwise = IO $ \s -> 
677         case fromIntegral fd of { I# fd# ->
678         case waitRead# fd# s of { s' -> (# s', () #)
679         }}
680
681 -- | Block the current thread until data can be written to the
682 -- given file descriptor (GHC only).
683 threadWaitWrite :: Fd -> IO ()
684 threadWaitWrite fd
685 #ifndef mingw32_HOST_OS
686   | threaded  = waitForWriteEvent fd
687 #endif
688   | otherwise = IO $ \s -> 
689         case fromIntegral fd of { I# fd# ->
690         case waitWrite# fd# s of { s' -> (# s', () #)
691         }}
692
693 -- | Suspends the current thread for a given number of microseconds
694 -- (GHC only).
695 --
696 -- There is no guarantee that the thread will be rescheduled promptly
697 -- when the delay has expired, but the thread will never continue to
698 -- run /earlier/ than specified.
699 --
700 threadDelay :: Int -> IO ()
701 threadDelay time
702   | threaded  = waitForDelayEvent time
703   | otherwise = IO $ \s -> 
704         case fromIntegral time of { I# time# ->
705         case delay# time# s of { s' -> (# s', () #)
706         }}
707
708
709 -- | Set the value of returned TVar to True after a given number of
710 -- microseconds. The caveats associated with threadDelay also apply.
711 --
712 registerDelay :: Int -> IO (TVar Bool)
713 registerDelay usecs 
714   | threaded = waitForDelayEventSTM usecs
715   | otherwise = error "registerDelay: requires -threaded"
716
717 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
718
719 waitForDelayEvent :: Int -> IO ()
720 waitForDelayEvent usecs = do
721   m <- newEmptyMVar
722   target <- calculateTarget usecs
723   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
724   prodServiceThread
725   takeMVar m
726
727 -- Delays for use in STM
728 waitForDelayEventSTM :: Int -> IO (TVar Bool)
729 waitForDelayEventSTM usecs = do
730    t <- atomically $ newTVar False
731    target <- calculateTarget usecs
732    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
733    prodServiceThread
734    return t  
735     
736 calculateTarget :: Int -> IO USecs
737 calculateTarget usecs = do
738     now <- getUSecOfDay
739     return $ now + (fromIntegral usecs)
740
741
742 -- ----------------------------------------------------------------------------
743 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
744
745 -- In the threaded RTS, we employ a single IO Manager thread to wait
746 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
747 -- and delays (threadDelay).  
748 --
749 -- We can do this because in the threaded RTS the IO Manager can make
750 -- a non-blocking call to select(), so we don't have to do select() in
751 -- the scheduler as we have to in the non-threaded RTS.  We get performance
752 -- benefits from doing it this way, because we only have to restart the select()
753 -- when a new request arrives, rather than doing one select() each time
754 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
755 -- by not having to check for completed IO requests.
756
757 #ifndef mingw32_HOST_OS
758 data IOReq
759   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
760   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
761 #endif
762
763 data DelayReq
764   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
765   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
766
767 #ifndef mingw32_HOST_OS
768 pendingEvents :: IORef [IOReq]
769 #endif
770 pendingDelays :: IORef [DelayReq]
771 {-# NOINLINE pendingEvents #-}
772 {-# NOINLINE pendingDelays #-}
773 (pendingEvents,pendingDelays) = unsafePerformIO $ do
774   reqs <- newIORef []
775   dels <- newIORef []
776   return (reqs, dels)
777
778 {-# NOINLINE ioManagerThread #-}
779 ioManagerThread :: MVar (Maybe ThreadId)
780 ioManagerThread = unsafePerformIO $ newMVar Nothing
781
782 ensureIOManagerIsRunning :: IO ()
783 ensureIOManagerIsRunning 
784   | threaded  = startIOManagerThread
785   | otherwise = return ()
786
787 startIOManagerThread :: IO ()
788 startIOManagerThread = do
789   modifyMVar_ ioManagerThread $ \old -> do
790     let create = do t <- forkIO ioManager; return (Just t)
791     case old of
792       Nothing -> create
793       Just t  -> do
794         s <- threadStatus t
795         case s of
796           ThreadFinished -> create
797           ThreadDied     -> create
798           _other         -> return (Just t)
799
800 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
801 insertDelay d [] = [d]
802 insertDelay d1 ds@(d2 : rest)
803   | delayTime d1 <= delayTime d2 = d1 : ds
804   | otherwise                    = d2 : insertDelay d1 rest
805
806 delayTime :: DelayReq -> USecs
807 delayTime (Delay t _) = t
808 delayTime (DelaySTM t _) = t
809
810 type USecs = Word64
811
812 foreign import ccall unsafe "getUSecOfDay" 
813   getUSecOfDay :: IO USecs
814
815 prodding :: IORef Bool
816 {-# NOINLINE prodding #-}
817 prodding = unsafePerformIO (newIORef False)
818
819 prodServiceThread :: IO ()
820 prodServiceThread = do
821   was_set <- atomicModifyIORef prodding (\a -> (True,a))
822   if (not (was_set)) then wakeupIOManager else return ()
823
824 #ifdef mingw32_HOST_OS
825 -- ----------------------------------------------------------------------------
826 -- Windows IO manager thread
827
828 ioManager :: IO ()
829 ioManager = do
830   wakeup <- c_getIOManagerEvent
831   service_loop wakeup []
832
833 service_loop :: HANDLE          -- read end of pipe
834              -> [DelayReq]      -- current delay requests
835              -> IO ()
836
837 service_loop wakeup old_delays = do
838   -- pick up new delay requests
839   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
840   let  delays = foldr insertDelay old_delays new_delays
841
842   now <- getUSecOfDay
843   (delays', timeout) <- getDelay now delays
844
845   r <- c_WaitForSingleObject wakeup timeout
846   case r of
847     0xffffffff -> do c_maperrno; throwErrno "service_loop"
848     0 -> do
849         r2 <- c_readIOManagerEvent
850         exit <- 
851               case r2 of
852                 _ | r2 == io_MANAGER_WAKEUP -> return False
853                 _ | r2 == io_MANAGER_DIE    -> return True
854                 0 -> return False -- spurious wakeup
855                 _ -> do start_console_handler (r2 `shiftR` 1); return False
856         unless exit $ service_cont wakeup delays'
857
858     _other -> service_cont wakeup delays' -- probably timeout        
859
860 service_cont :: HANDLE -> [DelayReq] -> IO ()
861 service_cont wakeup delays = do
862   r <- atomicModifyIORef prodding (\_ -> (False,False))
863   r `seq` return () -- avoid space leak
864   service_loop wakeup delays
865
866 -- must agree with rts/win32/ThrIOManager.c
867 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
868 io_MANAGER_WAKEUP = 0xffffffff
869 io_MANAGER_DIE    = 0xfffffffe
870
871 data ConsoleEvent
872  = ControlC
873  | Break
874  | Close
875     -- these are sent to Services only.
876  | Logoff
877  | Shutdown
878  deriving (Eq, Ord, Enum, Show, Read, Typeable)
879
880 start_console_handler :: Word32 -> IO ()
881 start_console_handler r =
882   case toWin32ConsoleEvent r of
883      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
884                     _ <- forkIO (handler x)
885                     return ()
886      Nothing -> return ()
887
888 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
889 toWin32ConsoleEvent ev = 
890    case ev of
891        0 {- CTRL_C_EVENT-}        -> Just ControlC
892        1 {- CTRL_BREAK_EVENT-}    -> Just Break
893        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
894        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
895        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
896        _ -> Nothing
897
898 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
899 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
900
901 wakeupIOManager :: IO ()
902 wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP
903
904 -- Walk the queue of pending delays, waking up any that have passed
905 -- and return the smallest delay to wait for.  The queue of pending
906 -- delays is kept ordered.
907 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
908 getDelay _   [] = return ([], iNFINITE)
909 getDelay now all@(d : rest) 
910   = case d of
911      Delay time m | now >= time -> do
912         putMVar m ()
913         getDelay now rest
914      DelaySTM time t | now >= time -> do
915         atomically $ writeTVar t True
916         getDelay now rest
917      _otherwise ->
918         -- delay is in millisecs for WaitForSingleObject
919         let micro_seconds = delayTime d - now
920             milli_seconds = (micro_seconds + 999) `div` 1000
921         in return (all, fromIntegral milli_seconds)
922
923 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
924 -- available yet.  We should move some Win32 functionality down here,
925 -- maybe as part of the grand reorganisation of the base package...
926 type HANDLE       = Ptr ()
927 type DWORD        = Word32
928
929 iNFINITE :: DWORD
930 iNFINITE = 0xFFFFFFFF -- urgh
931
932 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
933   c_getIOManagerEvent :: IO HANDLE
934
935 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
936   c_readIOManagerEvent :: IO Word32
937
938 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
939   c_sendIOManagerEvent :: Word32 -> IO ()
940
941 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
942    c_maperrno :: IO ()
943
944 foreign import stdcall "WaitForSingleObject"
945    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
946
947 #else
948 -- ----------------------------------------------------------------------------
949 -- Unix IO manager thread, using select()
950
951 ioManager :: IO ()
952 ioManager = do
953         allocaArray 2 $ \fds -> do
954         throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds)
955         rd_end <- peekElemOff fds 0
956         wr_end <- peekElemOff fds 1
957         setNonBlockingFD wr_end True -- writes happen in a signal handler, we
958                                      -- don't want them to block.
959         setCloseOnExec rd_end
960         setCloseOnExec wr_end
961         writeIORef stick (fromIntegral wr_end)
962         c_setIOManagerPipe wr_end
963         allocaBytes sizeofFdSet   $ \readfds -> do
964         allocaBytes sizeofFdSet   $ \writefds -> do 
965         allocaBytes sizeofTimeVal $ \timeval -> do
966         service_loop (fromIntegral rd_end) readfds writefds timeval [] []
967         return ()
968
969 service_loop
970    :: Fd                -- listen to this for wakeup calls
971    -> Ptr CFdSet
972    -> Ptr CFdSet
973    -> Ptr CTimeVal
974    -> [IOReq]
975    -> [DelayReq]
976    -> IO ()
977 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
978
979   -- pick up new IO requests
980   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
981   let reqs = new_reqs ++ old_reqs
982
983   -- pick up new delay requests
984   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
985   let  delays0 = foldr insertDelay old_delays new_delays
986
987   -- build the FDSets for select()
988   fdZero readfds
989   fdZero writefds
990   fdSet wakeup readfds
991   maxfd <- buildFdSets 0 readfds writefds reqs
992
993   -- perform the select()
994   let do_select delays = do
995           -- check the current time and wake up any thread in
996           -- threadDelay whose timeout has expired.  Also find the
997           -- timeout value for the select() call.
998           now <- getUSecOfDay
999           (delays', timeout) <- getDelay now ptimeval delays
1000
1001           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
1002                         nullPtr timeout
1003           if (res == -1)
1004              then do
1005                 err <- getErrno
1006                 case err of
1007                   _ | err == eINTR ->  do_select delays'
1008                         -- EINTR: just redo the select()
1009                   _ | err == eBADF ->  return (True, delays)
1010                         -- EBADF: one of the file descriptors is closed or bad,
1011                         -- we don't know which one, so wake everyone up.
1012                   _ | otherwise    ->  throwErrno "select"
1013                         -- otherwise (ENOMEM or EINVAL) something has gone
1014                         -- wrong; report the error.
1015              else
1016                 return (False,delays')
1017
1018   (wakeup_all,delays') <- do_select delays0
1019
1020   exit <-
1021     if wakeup_all then return False
1022       else do
1023         b <- fdIsSet wakeup readfds
1024         if b == 0 
1025           then return False
1026           else alloca $ \p -> do 
1027                  warnErrnoIfMinus1_ "service_loop" $
1028                      c_read (fromIntegral wakeup) p 1
1029                  s <- peek p            
1030                  case s of
1031                   _ | s == io_MANAGER_WAKEUP -> return False
1032                   _ | s == io_MANAGER_DIE    -> return True
1033                   _ | s == io_MANAGER_SYNC   -> do
1034                        mvars <- readIORef sync
1035                        mapM_ (flip putMVar ()) mvars
1036                        return False
1037                   _ -> do
1038                        fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1039                        withForeignPtr fp $ \p_siginfo -> do
1040                          r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1041                                  sizeof_siginfo_t
1042                          when (r /= fromIntegral sizeof_siginfo_t) $
1043                             error "failed to read siginfo_t"
1044                        runHandlers' fp (fromIntegral s)
1045                        return False
1046
1047   unless exit $ do
1048
1049   atomicModifyIORef prodding (\_ -> (False, ()))
1050
1051   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1052                          else completeRequests reqs readfds writefds []
1053
1054   service_loop wakeup readfds writefds ptimeval reqs' delays'
1055
1056 io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8
1057 io_MANAGER_WAKEUP = 0xff
1058 io_MANAGER_DIE    = 0xfe
1059 io_MANAGER_SYNC   = 0xfd
1060
1061 -- | the stick is for poking the IO manager with
1062 stick :: IORef Fd
1063 {-# NOINLINE stick #-}
1064 stick = unsafePerformIO $ newIORef (-1)
1065
1066 {-# NOINLINE sync #-}
1067 sync :: IORef [MVar ()]
1068 sync = unsafePerformIO (newIORef [])
1069
1070 -- waits for the IO manager to drain the pipe
1071 syncIOManager :: IO ()
1072 syncIOManager = do
1073   m <- newEmptyMVar
1074   atomicModifyIORef sync (\old -> (m:old,()))
1075   fd <- readIORef stick
1076   when (fd /= (-1)) $
1077     with io_MANAGER_SYNC $ \pbuf -> do
1078       warnErrnoIfMinus1_ "syncIOManager" $ c_write (fromIntegral fd) pbuf 1
1079   takeMVar m
1080
1081 wakeupIOManager :: IO ()
1082 wakeupIOManager = do
1083   fd <- readIORef stick
1084   when (fd /= (-1)) $
1085     with io_MANAGER_WAKEUP $ \pbuf -> do
1086       warnErrnoIfMinus1_ "wakeupIOManager" $ c_write (fromIntegral fd) pbuf 1
1087
1088 -- For the non-threaded RTS
1089 runHandlers :: Ptr Word8 -> Int -> IO ()
1090 runHandlers p_info sig = do
1091   fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1092   withForeignPtr fp $ \p -> do
1093     copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1094     free p_info
1095   runHandlers' fp (fromIntegral sig)
1096
1097 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1098 runHandlers' p_info sig = do
1099   let int = fromIntegral sig
1100   withMVar signal_handlers $ \arr ->
1101       if not (inRange (boundsIOArray arr) int)
1102          then return ()
1103          else do handler <- unsafeReadIOArray arr int
1104                  case handler of
1105                     Nothing -> return ()
1106                     Just (f,_)  -> do _ <- forkIO (f p_info)
1107                                       return ()
1108
1109 warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO ()
1110 warnErrnoIfMinus1_ what io
1111     = do r <- io
1112          when (r == -1) $ do
1113              errno <- getErrno
1114              str <- strerror errno >>= peekCString
1115              when (r == -1) $
1116                  debugErrLn ("Warning: " ++ what ++ " failed: " ++ str)
1117
1118 foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar)
1119
1120 foreign import ccall "setIOManagerPipe"
1121   c_setIOManagerPipe :: CInt -> IO ()
1122
1123 foreign import ccall "__hscore_sizeof_siginfo_t"
1124   sizeof_siginfo_t :: CSize
1125
1126 type Signal = CInt
1127
1128 maxSig = 64 :: Int
1129
1130 type HandlerFun = ForeignPtr Word8 -> IO ()
1131
1132 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1133 -- this race condition is #1922, although that bug was on Windows a similar
1134 -- bug also exists on Unix.
1135 {-# NOINLINE signal_handlers #-}
1136 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1137 signal_handlers = unsafePerformIO $ do
1138    arr <- newIOArray (0,maxSig) Nothing
1139    m <- newMVar arr
1140    block $ do
1141      stable_ref <- newStablePtr m
1142      let ref = castStablePtrToPtr stable_ref
1143      ref2 <- getOrSetSignalHandlerStore ref
1144      if ref==ref2
1145         then return m
1146         else do freeStablePtr stable_ref
1147                 deRefStablePtr (castPtrToStablePtr ref2)
1148
1149 foreign import ccall unsafe "getOrSetSignalHandlerStore"
1150     getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a)
1151
1152 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1153 setHandler sig handler = do
1154   let int = fromIntegral sig
1155   withMVar signal_handlers $ \arr -> 
1156      if not (inRange (boundsIOArray arr) int)
1157         then error "GHC.Conc.setHandler: signal out of range"
1158         else do old <- unsafeReadIOArray arr int
1159                 unsafeWriteIOArray arr int handler
1160                 return old
1161
1162 -- -----------------------------------------------------------------------------
1163 -- IO requests
1164
1165 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1166 buildFdSets maxfd _       _        [] = return maxfd
1167 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1168   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1169   | otherwise        =  do
1170         fdSet fd readfds
1171         buildFdSets (max maxfd fd) readfds writefds reqs
1172 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1173   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1174   | otherwise        =  do
1175         fdSet fd writefds
1176         buildFdSets (max maxfd fd) readfds writefds reqs
1177
1178 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1179                  -> IO [IOReq]
1180 completeRequests [] _ _ reqs' = return reqs'
1181 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1182   b <- fdIsSet fd readfds
1183   if b /= 0
1184     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1185     else completeRequests reqs readfds writefds (Read fd m : reqs')
1186 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1187   b <- fdIsSet fd writefds
1188   if b /= 0
1189     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1190     else completeRequests reqs readfds writefds (Write fd m : reqs')
1191
1192 wakeupAll :: [IOReq] -> IO ()
1193 wakeupAll [] = return ()
1194 wakeupAll (Read  _ m : reqs) = do putMVar m (); wakeupAll reqs
1195 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1196
1197 waitForReadEvent :: Fd -> IO ()
1198 waitForReadEvent fd = do
1199   m <- newEmptyMVar
1200   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1201   prodServiceThread
1202   takeMVar m
1203
1204 waitForWriteEvent :: Fd -> IO ()
1205 waitForWriteEvent fd = do
1206   m <- newEmptyMVar
1207   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1208   prodServiceThread
1209   takeMVar m
1210
1211 -- -----------------------------------------------------------------------------
1212 -- Delays
1213
1214 -- Walk the queue of pending delays, waking up any that have passed
1215 -- and return the smallest delay to wait for.  The queue of pending
1216 -- delays is kept ordered.
1217 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1218 getDelay _   _        [] = return ([],nullPtr)
1219 getDelay now ptimeval all@(d : rest) 
1220   = case d of
1221      Delay time m | now >= time -> do
1222         putMVar m ()
1223         getDelay now ptimeval rest
1224      DelaySTM time t | now >= time -> do
1225         atomically $ writeTVar t True
1226         getDelay now ptimeval rest
1227      _otherwise -> do
1228         setTimevalTicks ptimeval (delayTime d - now)
1229         return (all,ptimeval)
1230
1231 data CTimeVal
1232
1233 foreign import ccall unsafe "sizeofTimeVal"
1234   sizeofTimeVal :: Int
1235
1236 foreign import ccall unsafe "setTimevalTicks" 
1237   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1238
1239 {- 
1240   On Win32 we're going to have a single Pipe, and a
1241   waitForSingleObject with the delay time.  For signals, we send a
1242   byte down the pipe just like on Unix.
1243 -}
1244
1245 -- ----------------------------------------------------------------------------
1246 -- select() interface
1247
1248 -- ToDo: move to System.Posix.Internals?
1249
1250 data CFdSet
1251
1252 foreign import ccall safe "__hscore_select"
1253   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1254            -> IO CInt
1255
1256 foreign import ccall unsafe "hsFD_SETSIZE"
1257   c_fD_SETSIZE :: CInt
1258
1259 fD_SETSIZE :: Fd
1260 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1261
1262 foreign import ccall unsafe "hsFD_ISSET"
1263   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1264
1265 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1266 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1267
1268 foreign import ccall unsafe "hsFD_SET"
1269   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1270
1271 fdSet :: Fd -> Ptr CFdSet -> IO ()
1272 fdSet (Fd fd) fdset = c_fdSet fd fdset
1273
1274 foreign import ccall unsafe "hsFD_ZERO"
1275   fdZero :: Ptr CFdSet -> IO ()
1276
1277 foreign import ccall unsafe "sizeof_fd_set"
1278   sizeofFdSet :: Int
1279
1280 #endif
1281
1282 reportStackOverflow :: IO ()
1283 reportStackOverflow = callStackOverflowHook
1284
1285 reportError :: SomeException -> IO ()
1286 reportError ex = do
1287    handler <- getUncaughtExceptionHandler
1288    handler ex
1289
1290 -- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
1291 -- the unsafe below.
1292 foreign import ccall unsafe "stackOverflow"
1293         callStackOverflowHook :: IO ()
1294
1295 {-# NOINLINE uncaughtExceptionHandler #-}
1296 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1297 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1298    where
1299       defaultHandler :: SomeException -> IO ()
1300       defaultHandler se@(SomeException ex) = do
1301          (hFlush stdout) `catchAny` (\ _ -> return ())
1302          let msg = case cast ex of
1303                Just Deadlock -> "no threads to run:  infinite loop or deadlock?"
1304                _ -> case cast ex of
1305                     Just (ErrorCall s) -> s
1306                     _                  -> showsPrec 0 se ""
1307          withCString "%s" $ \cfmt ->
1308           withCString msg $ \cmsg ->
1309             errorBelch cfmt cmsg
1310
1311 -- don't use errorBelch() directly, because we cannot call varargs functions
1312 -- using the FFI.
1313 foreign import ccall unsafe "HsBase.h errorBelch2"
1314    errorBelch :: CString -> CString -> IO ()
1315
1316 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1317 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1318
1319 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1320 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
1321
1322 \end{code}