Remove an unused import
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
6 -- |
7 -- Module      :  GHC.Conc
8 -- Copyright   :  (c) The University of Glasgow, 1994-2002
9 -- License     :  see libraries/base/LICENSE
10 -- 
11 -- Maintainer  :  cvs-ghc@haskell.org
12 -- Stability   :  internal
13 -- Portability :  non-portable (GHC extensions)
14 --
15 -- Basic concurrency stuff.
16 -- 
17 -----------------------------------------------------------------------------
18
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home.  Hence:
23
24 #include "Typeable.h"
25
26 -- #not-home
27 module GHC.Conc
28         ( ThreadId(..)
29
30         -- * Forking and suchlike
31         , forkIO        -- :: IO a -> IO ThreadId
32         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
33         , numCapabilities -- :: Int
34         , childHandler  -- :: Exception -> IO ()
35         , myThreadId    -- :: IO ThreadId
36         , killThread    -- :: ThreadId -> IO ()
37         , throwTo       -- :: ThreadId -> Exception -> IO ()
38         , par           -- :: a -> b -> b
39         , pseq          -- :: a -> b -> b
40         , runSparks
41         , yield         -- :: IO ()
42         , labelThread   -- :: ThreadId -> String -> IO ()
43
44         , ThreadStatus(..), BlockReason(..)
45         , threadStatus  -- :: ThreadId -> IO ThreadStatus
46
47         -- * Waiting
48         , threadDelay           -- :: Int -> IO ()
49         , registerDelay         -- :: Int -> IO (TVar Bool)
50         , threadWaitRead        -- :: Int -> IO ()
51         , threadWaitWrite       -- :: Int -> IO ()
52
53         -- * TVars
54         , STM(..)
55         , atomically    -- :: STM a -> IO a
56         , retry         -- :: STM a
57         , orElse        -- :: STM a -> STM a -> STM a
58         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
59         , alwaysSucceeds -- :: STM a -> STM ()
60         , always        -- :: STM Bool -> STM ()
61         , TVar(..)
62         , newTVar       -- :: a -> STM (TVar a)
63         , newTVarIO     -- :: a -> STM (TVar a)
64         , readTVar      -- :: TVar a -> STM a
65         , readTVarIO    -- :: TVar a -> IO a
66         , writeTVar     -- :: a -> TVar a -> STM ()
67         , unsafeIOToSTM -- :: IO a -> STM a
68
69         -- * Miscellaneous
70         , withMVar
71 #ifdef mingw32_HOST_OS
72         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
73         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
74         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
75
76         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
77         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
78 #endif
79
80 #ifndef mingw32_HOST_OS
81         , Signal, HandlerFun, setHandler, runHandlers
82 #endif
83
84         , ensureIOManagerIsRunning
85 #ifndef mingw32_HOST_OS
86         , syncIOManager
87 #endif
88
89 #ifdef mingw32_HOST_OS
90         , ConsoleEvent(..)
91         , win32ConsoleHandler
92         , toWin32ConsoleEvent
93 #endif
94         , setUncaughtExceptionHandler      -- :: (Exception -> IO ()) -> IO ()
95         , getUncaughtExceptionHandler      -- :: IO (Exception -> IO ())
96
97         , reportError, reportStackOverflow
98         ) where
99
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
103 #endif
104 import Foreign
105 import Foreign.C
106
107 #ifdef mingw32_HOST_OS
108 import Data.Typeable
109 #endif
110
111 #ifndef mingw32_HOST_OS
112 import Data.Dynamic
113 import Control.Monad
114 #endif
115 import Data.Maybe
116
117 import GHC.Base
118 import GHC.Debug
119 import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
120 import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
121 import GHC.IO
122 import GHC.IO.Exception
123 import GHC.Exception
124 import GHC.IORef
125 import GHC.MVar
126 import GHC.Num          ( Num(..) )
127 import GHC.Real         ( fromIntegral )
128 #ifndef mingw32_HOST_OS
129 import GHC.IOArray
130 import GHC.Arr          ( inRange )
131 #endif
132 #ifdef mingw32_HOST_OS
133 import GHC.Real         ( div )
134 import GHC.Ptr
135 #endif
136 #ifdef mingw32_HOST_OS
137 import GHC.Read         ( Read )
138 import GHC.Enum         ( Enum )
139 #endif
140 import GHC.Pack         ( packCString# )
141 import GHC.Show         ( Show(..), showString )
142
143 infixr 0 `par`, `pseq`
144 \end{code}
145
146 %************************************************************************
147 %*                                                                      *
148 \subsection{@ThreadId@, @par@, and @fork@}
149 %*                                                                      *
150 %************************************************************************
151
152 \begin{code}
153 data ThreadId = ThreadId ThreadId# deriving( Typeable )
154 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
155 -- But since ThreadId# is unlifted, the Weak type must use open
156 -- type variables.
157 {- ^
158 A 'ThreadId' is an abstract type representing a handle to a thread.
159 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
160 the 'Ord' instance implements an arbitrary total ordering over
161 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
162 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
163 useful when debugging or diagnosing the behaviour of a concurrent
164 program.
165
166 /Note/: in GHC, if you have a 'ThreadId', you essentially have
167 a pointer to the thread itself.  This means the thread itself can\'t be
168 garbage collected until you drop the 'ThreadId'.
169 This misfeature will hopefully be corrected at a later date.
170
171 /Note/: Hugs does not provide any operations on other threads;
172 it defines 'ThreadId' as a synonym for ().
173 -}
174
175 instance Show ThreadId where
176    showsPrec d t = 
177         showString "ThreadId " . 
178         showsPrec d (getThreadId (id2TSO t))
179
180 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
181
182 id2TSO :: ThreadId -> ThreadId#
183 id2TSO (ThreadId t) = t
184
185 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
186 -- Returns -1, 0, 1
187
188 cmpThread :: ThreadId -> ThreadId -> Ordering
189 cmpThread t1 t2 = 
190    case cmp_thread (id2TSO t1) (id2TSO t2) of
191       -1 -> LT
192       0  -> EQ
193       _  -> GT -- must be 1
194
195 instance Eq ThreadId where
196    t1 == t2 = 
197       case t1 `cmpThread` t2 of
198          EQ -> True
199          _  -> False
200
201 instance Ord ThreadId where
202    compare = cmpThread
203
204 {- |
205 Sparks off a new thread to run the 'IO' computation passed as the
206 first argument, and returns the 'ThreadId' of the newly created
207 thread.
208
209 The new thread will be a lightweight thread; if you want to use a foreign
210 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
211
212 GHC note: the new thread inherits the /blocked/ state of the parent 
213 (see 'Control.Exception.block').
214
215 The newly created thread has an exception handler that discards the
216 exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
217 'ThreadKilled', and passes all other exceptions to the uncaught
218 exception handler (see 'setUncaughtExceptionHandler').
219 -}
220 forkIO :: IO () -> IO ThreadId
221 forkIO action = IO $ \ s -> 
222    case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
223  where
224   action_plus = catchException action childHandler
225
226 {- |
227 Like 'forkIO', but lets you specify on which CPU the thread is
228 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
229 will stay on the same CPU for its entire lifetime (`forkIO` threads
230 can migrate between CPUs according to the scheduling policy).
231 `forkOnIO` is useful for overriding the scheduling policy when you
232 know in advance how best to distribute the threads.
233
234 The `Int` argument specifies the CPU number; it is interpreted modulo
235 'numCapabilities' (note that it actually specifies a capability number
236 rather than a CPU number, but to a first approximation the two are
237 equivalent).
238 -}
239 forkOnIO :: Int -> IO () -> IO ThreadId
240 forkOnIO (I# cpu) action = IO $ \ s -> 
241    case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
242  where
243   action_plus = catchException action childHandler
244
245 -- | the value passed to the @+RTS -N@ flag.  This is the number of
246 -- Haskell threads that can run truly simultaneously at any given
247 -- time, and is typically set to the number of physical CPU cores on
248 -- the machine.
249 numCapabilities :: Int
250 numCapabilities = unsafePerformIO $  do 
251                     n <- peek n_capabilities
252                     return (fromIntegral n)
253
254 #if defined(mingw32_HOST_OS) && defined(__PIC__)
255 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
256 #else
257 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
258 #endif
259 childHandler :: SomeException -> IO ()
260 childHandler err = catchException (real_handler err) childHandler
261
262 real_handler :: SomeException -> IO ()
263 real_handler se@(SomeException ex) =
264   -- ignore thread GC and killThread exceptions:
265   case cast ex of
266   Just BlockedOnDeadMVar                -> return ()
267   _ -> case cast ex of
268        Just BlockedIndefinitely         -> return ()
269        _ -> case cast ex of
270             Just ThreadKilled           -> return ()
271             _ -> case cast ex of
272                  -- report all others:
273                  Just StackOverflow     -> reportStackOverflow
274                  _                      -> reportError se
275
276 {- | 'killThread' terminates the given thread (GHC only).
277 Any work already done by the thread isn\'t
278 lost: the computation is suspended until required by another thread.
279 The memory used by the thread will be garbage collected if it isn\'t
280 referenced from anywhere.  The 'killThread' function is defined in
281 terms of 'throwTo':
282
283 > killThread tid = throwTo tid ThreadKilled
284
285 Killthread is a no-op if the target thread has already completed.
286 -}
287 killThread :: ThreadId -> IO ()
288 killThread tid = throwTo tid ThreadKilled
289
290 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
291
292 'throwTo' does not return until the exception has been raised in the
293 target thread. 
294 The calling thread can thus be certain that the target
295 thread has received the exception.  This is a useful property to know
296 when dealing with race conditions: eg. if there are two threads that
297 can kill each other, it is guaranteed that only one of the threads
298 will get to kill the other.
299
300 If the target thread is currently making a foreign call, then the
301 exception will not be raised (and hence 'throwTo' will not return)
302 until the call has completed.  This is the case regardless of whether
303 the call is inside a 'block' or not.
304
305 Important note: the behaviour of 'throwTo' differs from that described in
306 the paper \"Asynchronous exceptions in Haskell\"
307 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
308 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
309 a more synchronous design in which 'throwTo' does not return until the exception
310 is received by the target thread.  The trade-off is discussed in Section 9 of the paper.
311 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
312 the paper).
313
314 There is currently no guarantee that the exception delivered by 'throwTo' will be
315 delivered at the first possible opportunity.  In particular, a thread may 
316 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
317 a pending 'throwTo'.  This is arguably undesirable behaviour.
318
319  -}
320 throwTo :: Exception e => ThreadId -> e -> IO ()
321 throwTo (ThreadId tid) ex = IO $ \ s ->
322    case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
323
324 -- | Returns the 'ThreadId' of the calling thread (GHC only).
325 myThreadId :: IO ThreadId
326 myThreadId = IO $ \s ->
327    case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
328
329
330 -- |The 'yield' action allows (forces, in a co-operative multitasking
331 -- implementation) a context-switch to any other currently runnable
332 -- threads (if any), and is occasionally useful when implementing
333 -- concurrency abstractions.
334 yield :: IO ()
335 yield = IO $ \s -> 
336    case (yield# s) of s1 -> (# s1, () #)
337
338 {- | 'labelThread' stores a string as identifier for this thread if
339 you built a RTS with debugging support. This identifier will be used in
340 the debugging output to make distinction of different threads easier
341 (otherwise you only have the thread state object\'s address in the heap).
342
343 Other applications like the graphical Concurrent Haskell Debugger
344 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
345 'labelThread' for their purposes as well.
346 -}
347
348 labelThread :: ThreadId -> String -> IO ()
349 labelThread (ThreadId t) str = IO $ \ s ->
350    let !ps  = packCString# str
351        !adr = byteArrayContents# ps in
352      case (labelThread# t adr s) of s1 -> (# s1, () #)
353
354 --      Nota Bene: 'pseq' used to be 'seq'
355 --                 but 'seq' is now defined in PrelGHC
356 --
357 -- "pseq" is defined a bit weirdly (see below)
358 --
359 -- The reason for the strange "lazy" call is that
360 -- it fools the compiler into thinking that pseq  and par are non-strict in
361 -- their second argument (even if it inlines pseq at the call site).
362 -- If it thinks pseq is strict in "y", then it often evaluates
363 -- "y" before "x", which is totally wrong.  
364
365 {-# INLINE pseq  #-}
366 pseq :: a -> b -> b
367 pseq  x y = x `seq` lazy y
368
369 {-# INLINE par  #-}
370 par :: a -> b -> b
371 par  x y = case (par# x) of { _ -> lazy y }
372
373 -- | Internal function used by the RTS to run sparks.
374 runSparks :: IO ()
375 runSparks = IO loop
376   where loop s = case getSpark# s of
377                    (# s', n, p #) ->
378                       if n ==# 0# then (# s', () #)
379                                   else p `seq` loop s'
380
381 data BlockReason
382   = BlockedOnMVar
383         -- ^blocked on on 'MVar'
384   | BlockedOnBlackHole
385         -- ^blocked on a computation in progress by another thread
386   | BlockedOnException
387         -- ^blocked in 'throwTo'
388   | BlockedOnSTM
389         -- ^blocked in 'retry' in an STM transaction
390   | BlockedOnForeignCall
391         -- ^currently in a foreign call
392   | BlockedOnOther
393         -- ^blocked on some other resource.  Without @-threaded@,
394         -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
395         -- they show up as 'BlockedOnMVar'.
396   deriving (Eq,Ord,Show)
397
398 -- | The current status of a thread
399 data ThreadStatus
400   = ThreadRunning
401         -- ^the thread is currently runnable or running
402   | ThreadFinished
403         -- ^the thread has finished
404   | ThreadBlocked  BlockReason
405         -- ^the thread is blocked on some resource
406   | ThreadDied
407         -- ^the thread received an uncaught exception
408   deriving (Eq,Ord,Show)
409
410 threadStatus :: ThreadId -> IO ThreadStatus
411 threadStatus (ThreadId t) = IO $ \s ->
412    case threadStatus# t s of
413      (# s', stat #) -> (# s', mk_stat (I# stat) #)
414    where
415         -- NB. keep these in sync with includes/Constants.h
416      mk_stat 0  = ThreadRunning
417      mk_stat 1  = ThreadBlocked BlockedOnMVar
418      mk_stat 2  = ThreadBlocked BlockedOnBlackHole
419      mk_stat 3  = ThreadBlocked BlockedOnException
420      mk_stat 7  = ThreadBlocked BlockedOnSTM
421      mk_stat 11 = ThreadBlocked BlockedOnForeignCall
422      mk_stat 12 = ThreadBlocked BlockedOnForeignCall
423      mk_stat 16 = ThreadFinished
424      mk_stat 17 = ThreadDied
425      mk_stat _  = ThreadBlocked BlockedOnOther
426 \end{code}
427
428
429 %************************************************************************
430 %*                                                                      *
431 \subsection[stm]{Transactional heap operations}
432 %*                                                                      *
433 %************************************************************************
434
435 TVars are shared memory locations which support atomic memory
436 transactions.
437
438 \begin{code}
439 -- |A monad supporting atomic memory transactions.
440 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
441
442 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
443 unSTM (STM a) = a
444
445 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
446
447 instance  Functor STM where
448    fmap f x = x >>= (return . f)
449
450 instance  Monad STM  where
451     {-# INLINE return #-}
452     {-# INLINE (>>)   #-}
453     {-# INLINE (>>=)  #-}
454     m >> k      = thenSTM m k
455     return x    = returnSTM x
456     m >>= k     = bindSTM m k
457
458 bindSTM :: STM a -> (a -> STM b) -> STM b
459 bindSTM (STM m) k = STM ( \s ->
460   case m s of 
461     (# new_s, a #) -> unSTM (k a) new_s
462   )
463
464 thenSTM :: STM a -> STM b -> STM b
465 thenSTM (STM m) k = STM ( \s ->
466   case m s of 
467     (# new_s, _ #) -> unSTM k new_s
468   )
469
470 returnSTM :: a -> STM a
471 returnSTM x = STM (\s -> (# s, x #))
472
473 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
474 -- dangerous thing to do.  
475 --
476 --   * The STM implementation will often run transactions multiple
477 --     times, so you need to be prepared for this if your IO has any
478 --     side effects.
479 --
480 --   * The STM implementation will abort transactions that are known to
481 --     be invalid and need to be restarted.  This may happen in the middle
482 --     of `unsafeIOToSTM`, so make sure you don't acquire any resources
483 --     that need releasing (exception handlers are ignored when aborting
484 --     the transaction).  That includes doing any IO using Handles, for
485 --     example.  Getting this wrong will probably lead to random deadlocks.
486 --
487 --   * The transaction may have seen an inconsistent view of memory when
488 --     the IO runs.  Invariants that you expect to be true throughout
489 --     your program may not be true inside a transaction, due to the
490 --     way transactions are implemented.  Normally this wouldn't be visible
491 --     to the programmer, but using `unsafeIOToSTM` can expose it.
492 --
493 unsafeIOToSTM :: IO a -> STM a
494 unsafeIOToSTM (IO m) = STM m
495
496 -- |Perform a series of STM actions atomically.
497 --
498 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
499 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
500 -- this would effectively allow a transaction inside a transaction, depending
501 -- on exactly when the thunk is evaluated.)
502 --
503 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
504 -- and which allows top-level TVars to be allocated.
505
506 atomically :: STM a -> IO a
507 atomically (STM m) = IO (\s -> (atomically# m) s )
508
509 -- |Retry execution of the current memory transaction because it has seen
510 -- values in TVars which mean that it should not continue (e.g. the TVars
511 -- represent a shared buffer that is now empty).  The implementation may
512 -- block the thread until one of the TVars that it has read from has been
513 -- udpated. (GHC only)
514 retry :: STM a
515 retry = STM $ \s# -> retry# s#
516
517 -- |Compose two alternative STM actions (GHC only).  If the first action
518 -- completes without retrying then it forms the result of the orElse.
519 -- Otherwise, if the first action retries, then the second action is
520 -- tried in its place.  If both actions retry then the orElse as a
521 -- whole retries.
522 orElse :: STM a -> STM a -> STM a
523 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
524
525 -- |Exception handling within STM actions.
526 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
527 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
528
529 -- | Low-level primitive on which always and alwaysSucceeds are built.
530 -- checkInv differs form these in that (i) the invariant is not 
531 -- checked when checkInv is called, only at the end of this and
532 -- subsequent transcations, (ii) the invariant failure is indicated
533 -- by raising an exception.
534 checkInv :: STM a -> STM ()
535 checkInv (STM m) = STM (\s -> (check# m) s)
536
537 -- | alwaysSucceeds adds a new invariant that must be true when passed
538 -- to alwaysSucceeds, at the end of the current transaction, and at
539 -- the end of every subsequent transaction.  If it fails at any
540 -- of those points then the transaction violating it is aborted
541 -- and the exception raised by the invariant is propagated.
542 alwaysSucceeds :: STM a -> STM ()
543 alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () ) 
544                       checkInv i
545
546 -- | always is a variant of alwaysSucceeds in which the invariant is
547 -- expressed as an STM Bool action that must return True.  Returning
548 -- False or raising an exception are both treated as invariant failures.
549 always :: STM Bool -> STM ()
550 always i = alwaysSucceeds ( do v <- i
551                                if (v) then return () else ( error "Transacional invariant violation" ) )
552
553 -- |Shared memory locations that support atomic memory transactions.
554 data TVar a = TVar (TVar# RealWorld a)
555
556 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
557
558 instance Eq (TVar a) where
559         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
560
561 -- |Create a new TVar holding a value supplied
562 newTVar :: a -> STM (TVar a)
563 newTVar val = STM $ \s1# ->
564     case newTVar# val s1# of
565          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
566
567 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
568 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
569 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
570 -- possible.
571 newTVarIO :: a -> IO (TVar a)
572 newTVarIO val = IO $ \s1# ->
573     case newTVar# val s1# of
574          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
575
576 -- |Return the current value stored in a TVar.
577 -- This is equivalent to
578 --
579 -- >  readTVarIO = atomically . readTVar
580 --
581 -- but works much faster, because it doesn't perform a complete
582 -- transaction, it just reads the current value of the 'TVar'.
583 readTVarIO :: TVar a -> IO a
584 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
585
586 -- |Return the current value stored in a TVar
587 readTVar :: TVar a -> STM a
588 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
589
590 -- |Write the supplied value into a TVar
591 writeTVar :: TVar a -> a -> STM ()
592 writeTVar (TVar tvar#) val = STM $ \s1# ->
593     case writeTVar# tvar# val s1# of
594          s2# -> (# s2#, () #)
595   
596 \end{code}
597
598 MVar utilities
599
600 \begin{code}
601 withMVar :: MVar a -> (a -> IO b) -> IO b
602 withMVar m io = 
603   block $ do
604     a <- takeMVar m
605     b <- catchAny (unblock (io a))
606             (\e -> do putMVar m a; throw e)
607     putMVar m a
608     return b
609 \end{code}
610
611 %************************************************************************
612 %*                                                                      *
613 \subsection{Thread waiting}
614 %*                                                                      *
615 %************************************************************************
616
617 \begin{code}
618 #ifdef mingw32_HOST_OS
619
620 -- Note: threadWaitRead and threadWaitWrite aren't really functional
621 -- on Win32, but left in there because lib code (still) uses them (the manner
622 -- in which they're used doesn't cause problems on a Win32 platform though.)
623
624 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
625 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
626   IO $ \s -> case asyncRead# fd isSock len buf s of 
627                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
628
629 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
630 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
631   IO $ \s -> case asyncWrite# fd isSock len buf s of 
632                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
633
634 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
635 asyncDoProc (FunPtr proc) (Ptr param) = 
636     -- the 'length' value is ignored; simplifies implementation of
637     -- the async*# primops to have them all return the same result.
638   IO $ \s -> case asyncDoProc# proc param s  of 
639                (# s', _len#, err# #) -> (# s', I# err# #)
640
641 -- to aid the use of these primops by the IO Handle implementation,
642 -- provide the following convenience funs:
643
644 -- this better be a pinned byte array!
645 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
646 asyncReadBA fd isSock len off bufB = 
647   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
648   
649 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
650 asyncWriteBA fd isSock len off bufB = 
651   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
652
653 #endif
654
655 -- -----------------------------------------------------------------------------
656 -- Thread IO API
657
658 -- | Block the current thread until data is available to read on the
659 -- given file descriptor (GHC only).
660 threadWaitRead :: Fd -> IO ()
661 threadWaitRead fd
662 #ifndef mingw32_HOST_OS
663   | threaded  = waitForReadEvent fd
664 #endif
665   | otherwise = IO $ \s -> 
666         case fromIntegral fd of { I# fd# ->
667         case waitRead# fd# s of { s' -> (# s', () #)
668         }}
669
670 -- | Block the current thread until data can be written to the
671 -- given file descriptor (GHC only).
672 threadWaitWrite :: Fd -> IO ()
673 threadWaitWrite fd
674 #ifndef mingw32_HOST_OS
675   | threaded  = waitForWriteEvent fd
676 #endif
677   | otherwise = IO $ \s -> 
678         case fromIntegral fd of { I# fd# ->
679         case waitWrite# fd# s of { s' -> (# s', () #)
680         }}
681
682 -- | Suspends the current thread for a given number of microseconds
683 -- (GHC only).
684 --
685 -- There is no guarantee that the thread will be rescheduled promptly
686 -- when the delay has expired, but the thread will never continue to
687 -- run /earlier/ than specified.
688 --
689 threadDelay :: Int -> IO ()
690 threadDelay time
691   | threaded  = waitForDelayEvent time
692   | otherwise = IO $ \s -> 
693         case fromIntegral time of { I# time# ->
694         case delay# time# s of { s' -> (# s', () #)
695         }}
696
697
698 -- | Set the value of returned TVar to True after a given number of
699 -- microseconds. The caveats associated with threadDelay also apply.
700 --
701 registerDelay :: Int -> IO (TVar Bool)
702 registerDelay usecs 
703   | threaded = waitForDelayEventSTM usecs
704   | otherwise = error "registerDelay: requires -threaded"
705
706 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
707
708 waitForDelayEvent :: Int -> IO ()
709 waitForDelayEvent usecs = do
710   m <- newEmptyMVar
711   target <- calculateTarget usecs
712   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
713   prodServiceThread
714   takeMVar m
715
716 -- Delays for use in STM
717 waitForDelayEventSTM :: Int -> IO (TVar Bool)
718 waitForDelayEventSTM usecs = do
719    t <- atomically $ newTVar False
720    target <- calculateTarget usecs
721    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
722    prodServiceThread
723    return t  
724     
725 calculateTarget :: Int -> IO USecs
726 calculateTarget usecs = do
727     now <- getUSecOfDay
728     return $ now + (fromIntegral usecs)
729
730
731 -- ----------------------------------------------------------------------------
732 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
733
734 -- In the threaded RTS, we employ a single IO Manager thread to wait
735 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
736 -- and delays (threadDelay).  
737 --
738 -- We can do this because in the threaded RTS the IO Manager can make
739 -- a non-blocking call to select(), so we don't have to do select() in
740 -- the scheduler as we have to in the non-threaded RTS.  We get performance
741 -- benefits from doing it this way, because we only have to restart the select()
742 -- when a new request arrives, rather than doing one select() each time
743 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
744 -- by not having to check for completed IO requests.
745
746 -- Issues, possible problems:
747 --
748 --      - we might want bound threads to just do the blocking
749 --        operation rather than communicating with the IO manager
750 --        thread.  This would prevent simgle-threaded programs which do
751 --        IO from requiring multiple OS threads.  However, it would also
752 --        prevent bound threads waiting on IO from being killed or sent
753 --        exceptions.
754 --
755 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
756 --        I couldn't repeat this.
757 --
758 --      - How do we handle signal delivery in the multithreaded RTS?
759 --
760 --      - forkProcess will kill the IO manager thread.  Let's just
761 --        hope we don't need to do any blocking IO between fork & exec.
762
763 #ifndef mingw32_HOST_OS
764 data IOReq
765   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
766   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
767 #endif
768
769 data DelayReq
770   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
771   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
772
773 #ifndef mingw32_HOST_OS
774 pendingEvents :: IORef [IOReq]
775 #endif
776 pendingDelays :: IORef [DelayReq]
777         -- could use a strict list or array here
778 {-# NOINLINE pendingEvents #-}
779 {-# NOINLINE pendingDelays #-}
780 (pendingEvents,pendingDelays) = unsafePerformIO $ do
781   startIOManagerThread
782   reqs <- newIORef []
783   dels <- newIORef []
784   return (reqs, dels)
785         -- the first time we schedule an IO request, the service thread
786         -- will be created (cool, huh?)
787
788 ensureIOManagerIsRunning :: IO ()
789 ensureIOManagerIsRunning 
790   | threaded  = seq pendingEvents $ return ()
791   | otherwise = return ()
792
793 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
794 insertDelay d [] = [d]
795 insertDelay d1 ds@(d2 : rest)
796   | delayTime d1 <= delayTime d2 = d1 : ds
797   | otherwise                    = d2 : insertDelay d1 rest
798
799 delayTime :: DelayReq -> USecs
800 delayTime (Delay t _) = t
801 delayTime (DelaySTM t _) = t
802
803 type USecs = Word64
804
805 foreign import ccall unsafe "getUSecOfDay" 
806   getUSecOfDay :: IO USecs
807
808 prodding :: IORef Bool
809 {-# NOINLINE prodding #-}
810 prodding = unsafePerformIO (newIORef False)
811
812 prodServiceThread :: IO ()
813 prodServiceThread = do
814   was_set <- atomicModifyIORef prodding (\a -> (True,a))
815   if (not (was_set)) then wakeupIOManager else return ()
816
817 #ifdef mingw32_HOST_OS
818 -- ----------------------------------------------------------------------------
819 -- Windows IO manager thread
820
821 startIOManagerThread :: IO ()
822 startIOManagerThread = do
823   wakeup <- c_getIOManagerEvent
824   forkIO $ service_loop wakeup []
825   return ()
826
827 service_loop :: HANDLE          -- read end of pipe
828              -> [DelayReq]      -- current delay requests
829              -> IO ()
830
831 service_loop wakeup old_delays = do
832   -- pick up new delay requests
833   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
834   let  delays = foldr insertDelay old_delays new_delays
835
836   now <- getUSecOfDay
837   (delays', timeout) <- getDelay now delays
838
839   r <- c_WaitForSingleObject wakeup timeout
840   case r of
841     0xffffffff -> do c_maperrno; throwErrno "service_loop"
842     0 -> do
843         r2 <- c_readIOManagerEvent
844         exit <- 
845               case r2 of
846                 _ | r2 == io_MANAGER_WAKEUP -> return False
847                 _ | r2 == io_MANAGER_DIE    -> return True
848                 0 -> return False -- spurious wakeup
849                 _ -> do start_console_handler (r2 `shiftR` 1); return False
850         unless exit $ service_cont wakeup delays'
851
852     _other -> service_cont wakeup delays' -- probably timeout        
853
854 service_cont :: HANDLE -> [DelayReq] -> IO ()
855 service_cont wakeup delays = do
856   r <- atomicModifyIORef prodding (\_ -> (False,False))
857   r `seq` return () -- avoid space leak
858   service_loop wakeup delays
859
860 -- must agree with rts/win32/ThrIOManager.c
861 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
862 io_MANAGER_WAKEUP = 0xffffffff
863 io_MANAGER_DIE    = 0xfffffffe
864
865 data ConsoleEvent
866  = ControlC
867  | Break
868  | Close
869     -- these are sent to Services only.
870  | Logoff
871  | Shutdown
872  deriving (Eq, Ord, Enum, Show, Read, Typeable)
873
874 start_console_handler :: Word32 -> IO ()
875 start_console_handler r =
876   case toWin32ConsoleEvent r of
877      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
878                     forkIO (handler x)
879                     return ()
880      Nothing -> return ()
881
882 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
883 toWin32ConsoleEvent ev = 
884    case ev of
885        0 {- CTRL_C_EVENT-}        -> Just ControlC
886        1 {- CTRL_BREAK_EVENT-}    -> Just Break
887        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
888        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
889        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
890        _ -> Nothing
891
892 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
893 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
894
895 -- XXX Is this actually needed?
896 stick :: IORef HANDLE
897 {-# NOINLINE stick #-}
898 stick = unsafePerformIO (newIORef nullPtr)
899
900 wakeupIOManager :: IO ()
901 wakeupIOManager = do 
902   _hdl <- readIORef stick
903   c_sendIOManagerEvent io_MANAGER_WAKEUP
904
905 -- Walk the queue of pending delays, waking up any that have passed
906 -- and return the smallest delay to wait for.  The queue of pending
907 -- delays is kept ordered.
908 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
909 getDelay _   [] = return ([], iNFINITE)
910 getDelay now all@(d : rest) 
911   = case d of
912      Delay time m | now >= time -> do
913         putMVar m ()
914         getDelay now rest
915      DelaySTM time t | now >= time -> do
916         atomically $ writeTVar t True
917         getDelay now rest
918      _otherwise ->
919         -- delay is in millisecs for WaitForSingleObject
920         let micro_seconds = delayTime d - now
921             milli_seconds = (micro_seconds + 999) `div` 1000
922         in return (all, fromIntegral milli_seconds)
923
924 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
925 -- available yet.  We should move some Win32 functionality down here,
926 -- maybe as part of the grand reorganisation of the base package...
927 type HANDLE       = Ptr ()
928 type DWORD        = Word32
929
930 iNFINITE :: DWORD
931 iNFINITE = 0xFFFFFFFF -- urgh
932
933 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
934   c_getIOManagerEvent :: IO HANDLE
935
936 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
937   c_readIOManagerEvent :: IO Word32
938
939 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
940   c_sendIOManagerEvent :: Word32 -> IO ()
941
942 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
943    c_maperrno :: IO ()
944
945 foreign import stdcall "WaitForSingleObject"
946    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
947
948 #else
949 -- ----------------------------------------------------------------------------
950 -- Unix IO manager thread, using select()
951
952 startIOManagerThread :: IO ()
953 startIOManagerThread = do
954         allocaArray 2 $ \fds -> do
955         throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds)
956         rd_end <- peekElemOff fds 0
957         wr_end <- peekElemOff fds 1
958         setNonBlockingFD wr_end True -- writes happen in a signal handler, we
959                                      -- don't want them to block.
960         setCloseOnExec rd_end
961         setCloseOnExec wr_end
962         writeIORef stick (fromIntegral wr_end)
963         c_setIOManagerPipe wr_end
964         _ <- forkIO $ do
965             allocaBytes sizeofFdSet   $ \readfds -> do
966             allocaBytes sizeofFdSet   $ \writefds -> do 
967             allocaBytes sizeofTimeVal $ \timeval -> do
968             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
969         return ()
970
971 service_loop
972    :: Fd                -- listen to this for wakeup calls
973    -> Ptr CFdSet
974    -> Ptr CFdSet
975    -> Ptr CTimeVal
976    -> [IOReq]
977    -> [DelayReq]
978    -> IO ()
979 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
980
981   -- pick up new IO requests
982   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
983   let reqs = new_reqs ++ old_reqs
984
985   -- pick up new delay requests
986   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
987   let  delays0 = foldr insertDelay old_delays new_delays
988
989   -- build the FDSets for select()
990   fdZero readfds
991   fdZero writefds
992   fdSet wakeup readfds
993   maxfd <- buildFdSets 0 readfds writefds reqs
994
995   -- perform the select()
996   let do_select delays = do
997           -- check the current time and wake up any thread in
998           -- threadDelay whose timeout has expired.  Also find the
999           -- timeout value for the select() call.
1000           now <- getUSecOfDay
1001           (delays', timeout) <- getDelay now ptimeval delays
1002
1003           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
1004                         nullPtr timeout
1005           if (res == -1)
1006              then do
1007                 err <- getErrno
1008                 case err of
1009                   _ | err == eINTR ->  do_select delays'
1010                         -- EINTR: just redo the select()
1011                   _ | err == eBADF ->  return (True, delays)
1012                         -- EBADF: one of the file descriptors is closed or bad,
1013                         -- we don't know which one, so wake everyone up.
1014                   _ | otherwise    ->  throwErrno "select"
1015                         -- otherwise (ENOMEM or EINVAL) something has gone
1016                         -- wrong; report the error.
1017              else
1018                 return (False,delays')
1019
1020   (wakeup_all,delays') <- do_select delays0
1021
1022   exit <-
1023     if wakeup_all then return False
1024       else do
1025         b <- fdIsSet wakeup readfds
1026         if b == 0 
1027           then return False
1028           else alloca $ \p -> do 
1029                  warnErrnoIfMinus1_ "service_loop" $
1030                      c_read (fromIntegral wakeup) p 1
1031                  s <- peek p            
1032                  case s of
1033                   _ | s == io_MANAGER_WAKEUP -> return False
1034                   _ | s == io_MANAGER_DIE    -> return True
1035                   _ | s == io_MANAGER_SYNC   -> do
1036                        mvars <- readIORef sync
1037                        mapM_ (flip putMVar ()) mvars
1038                        return False
1039                   _ -> do
1040                        fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1041                        withForeignPtr fp $ \p_siginfo -> do
1042                          r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1043                                  sizeof_siginfo_t
1044                          when (r /= fromIntegral sizeof_siginfo_t) $
1045                             error "failed to read siginfo_t"
1046                        runHandlers' fp (fromIntegral s)
1047                        return False
1048
1049   unless exit $ do
1050
1051   atomicModifyIORef prodding (\_ -> (False, ()))
1052
1053   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1054                          else completeRequests reqs readfds writefds []
1055
1056   service_loop wakeup readfds writefds ptimeval reqs' delays'
1057
1058 io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8
1059 io_MANAGER_WAKEUP = 0xff
1060 io_MANAGER_DIE    = 0xfe
1061 io_MANAGER_SYNC   = 0xfd
1062
1063 -- | the stick is for poking the IO manager with
1064 stick :: IORef Fd
1065 {-# NOINLINE stick #-}
1066 stick = unsafePerformIO (newIORef 0)
1067
1068 {-# NOINLINE sync #-}
1069 sync :: IORef [MVar ()]
1070 sync = unsafePerformIO (newIORef [])
1071
1072 -- waits for the IO manager to drain the pipe
1073 syncIOManager :: IO ()
1074 syncIOManager = do
1075   m <- newEmptyMVar
1076   atomicModifyIORef sync (\old -> (m:old,()))
1077   fd <- readIORef stick
1078   with io_MANAGER_SYNC $ \pbuf -> do 
1079     warnErrnoIfMinus1_ "syncIOManager" $ c_write (fromIntegral fd) pbuf 1
1080   takeMVar m
1081
1082 wakeupIOManager :: IO ()
1083 wakeupIOManager = do
1084   fd <- readIORef stick
1085   with io_MANAGER_WAKEUP $ \pbuf -> do 
1086     warnErrnoIfMinus1_ "wakeupIOManager" $ c_write (fromIntegral fd) pbuf 1
1087
1088 -- For the non-threaded RTS
1089 runHandlers :: Ptr Word8 -> Int -> IO ()
1090 runHandlers p_info sig = do
1091   fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1092   withForeignPtr fp $ \p -> do
1093     copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1094     free p_info
1095   runHandlers' fp (fromIntegral sig)
1096
1097 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1098 runHandlers' p_info sig = do
1099   let int = fromIntegral sig
1100   withMVar signal_handlers $ \arr ->
1101       if not (inRange (boundsIOArray arr) int)
1102          then return ()
1103          else do handler <- unsafeReadIOArray arr int
1104                  case handler of
1105                     Nothing -> return ()
1106                     Just (f,_)  -> do _ <- forkIO (f p_info)
1107                                       return ()
1108
1109 foreign import ccall "setIOManagerPipe"
1110   c_setIOManagerPipe :: CInt -> IO ()
1111
1112 foreign import ccall "__hscore_sizeof_siginfo_t"
1113   sizeof_siginfo_t :: CSize
1114
1115 type Signal = CInt
1116
1117 maxSig = 64 :: Int
1118
1119 type HandlerFun = ForeignPtr Word8 -> IO ()
1120
1121 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1122 -- this race condition is #1922, although that bug was on Windows a similar
1123 -- bug also exists on Unix.
1124 {-# NOINLINE signal_handlers #-}
1125 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1126 signal_handlers = unsafePerformIO $ do
1127    arr <- newIOArray (0,maxSig) Nothing
1128    m <- newMVar arr
1129    block $ do
1130      stable_ref <- newStablePtr m
1131      let ref = castStablePtrToPtr stable_ref
1132      ref2 <- getOrSetSignalHandlerStore ref
1133      if ref==ref2
1134         then return m
1135         else do freeStablePtr stable_ref
1136                 deRefStablePtr (castPtrToStablePtr ref2)
1137
1138 foreign import ccall unsafe "getOrSetSignalHandlerStore"
1139     getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a)
1140
1141 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1142 setHandler sig handler = do
1143   let int = fromIntegral sig
1144   withMVar signal_handlers $ \arr -> 
1145      if not (inRange (boundsIOArray arr) int)
1146         then error "GHC.Conc.setHandler: signal out of range"
1147         else do old <- unsafeReadIOArray arr int
1148                 unsafeWriteIOArray arr int handler
1149                 return old
1150
1151 -- -----------------------------------------------------------------------------
1152 -- IO requests
1153
1154 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1155 buildFdSets maxfd _       _        [] = return maxfd
1156 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1157   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1158   | otherwise        =  do
1159         fdSet fd readfds
1160         buildFdSets (max maxfd fd) readfds writefds reqs
1161 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1162   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1163   | otherwise        =  do
1164         fdSet fd writefds
1165         buildFdSets (max maxfd fd) readfds writefds reqs
1166
1167 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1168                  -> IO [IOReq]
1169 completeRequests [] _ _ reqs' = return reqs'
1170 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1171   b <- fdIsSet fd readfds
1172   if b /= 0
1173     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1174     else completeRequests reqs readfds writefds (Read fd m : reqs')
1175 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1176   b <- fdIsSet fd writefds
1177   if b /= 0
1178     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1179     else completeRequests reqs readfds writefds (Write fd m : reqs')
1180
1181 wakeupAll :: [IOReq] -> IO ()
1182 wakeupAll [] = return ()
1183 wakeupAll (Read  _ m : reqs) = do putMVar m (); wakeupAll reqs
1184 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1185
1186 waitForReadEvent :: Fd -> IO ()
1187 waitForReadEvent fd = do
1188   m <- newEmptyMVar
1189   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1190   prodServiceThread
1191   takeMVar m
1192
1193 waitForWriteEvent :: Fd -> IO ()
1194 waitForWriteEvent fd = do
1195   m <- newEmptyMVar
1196   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1197   prodServiceThread
1198   takeMVar m
1199
1200 -- -----------------------------------------------------------------------------
1201 -- Delays
1202
1203 -- Walk the queue of pending delays, waking up any that have passed
1204 -- and return the smallest delay to wait for.  The queue of pending
1205 -- delays is kept ordered.
1206 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1207 getDelay _   _        [] = return ([],nullPtr)
1208 getDelay now ptimeval all@(d : rest) 
1209   = case d of
1210      Delay time m | now >= time -> do
1211         putMVar m ()
1212         getDelay now ptimeval rest
1213      DelaySTM time t | now >= time -> do
1214         atomically $ writeTVar t True
1215         getDelay now ptimeval rest
1216      _otherwise -> do
1217         setTimevalTicks ptimeval (delayTime d - now)
1218         return (all,ptimeval)
1219
1220 data CTimeVal
1221
1222 foreign import ccall unsafe "sizeofTimeVal"
1223   sizeofTimeVal :: Int
1224
1225 foreign import ccall unsafe "setTimevalTicks" 
1226   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1227
1228 {- 
1229   On Win32 we're going to have a single Pipe, and a
1230   waitForSingleObject with the delay time.  For signals, we send a
1231   byte down the pipe just like on Unix.
1232 -}
1233
1234 -- ----------------------------------------------------------------------------
1235 -- select() interface
1236
1237 -- ToDo: move to System.Posix.Internals?
1238
1239 data CFdSet
1240
1241 foreign import ccall safe "select"
1242   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1243            -> IO CInt
1244
1245 foreign import ccall unsafe "hsFD_SETSIZE"
1246   c_fD_SETSIZE :: CInt
1247
1248 fD_SETSIZE :: Fd
1249 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1250
1251 foreign import ccall unsafe "hsFD_ISSET"
1252   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1253
1254 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1255 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1256
1257 foreign import ccall unsafe "hsFD_SET"
1258   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1259
1260 fdSet :: Fd -> Ptr CFdSet -> IO ()
1261 fdSet (Fd fd) fdset = c_fdSet fd fdset
1262
1263 foreign import ccall unsafe "hsFD_ZERO"
1264   fdZero :: Ptr CFdSet -> IO ()
1265
1266 foreign import ccall unsafe "sizeof_fd_set"
1267   sizeofFdSet :: Int
1268
1269 #endif
1270
1271 reportStackOverflow :: IO ()
1272 reportStackOverflow = callStackOverflowHook
1273
1274 reportError :: SomeException -> IO ()
1275 reportError ex = do
1276    handler <- getUncaughtExceptionHandler
1277    handler ex
1278
1279 -- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
1280 -- the unsafe below.
1281 foreign import ccall unsafe "stackOverflow"
1282         callStackOverflowHook :: IO ()
1283
1284 {-# NOINLINE uncaughtExceptionHandler #-}
1285 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1286 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1287    where
1288       defaultHandler :: SomeException -> IO ()
1289       defaultHandler se@(SomeException ex) = do
1290          (hFlush stdout) `catchAny` (\ _ -> return ())
1291          let msg = case cast ex of
1292                Just Deadlock -> "no threads to run:  infinite loop or deadlock?"
1293                _ -> case cast ex of
1294                     Just (ErrorCall s) -> s
1295                     _                  -> showsPrec 0 se ""
1296          withCString "%s" $ \cfmt ->
1297           withCString msg $ \cmsg ->
1298             errorBelch cfmt cmsg
1299
1300 -- don't use errorBelch() directly, because we cannot call varargs functions
1301 -- using the FFI.
1302 foreign import ccall unsafe "HsBase.h errorBelch2"
1303    errorBelch :: CString -> CString -> IO ()
1304
1305 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1306 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1307
1308 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1309 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
1310
1311 warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO ()
1312 warnErrnoIfMinus1_ what io
1313     = do r <- io
1314          when (r == -1) $ do
1315              errno <- getErrno
1316              str <- strerror errno >>= peekCString
1317              when (r == -1) $
1318                  debugErrLn ("Warning: " ++ what ++ " failed: " ++ str)
1319
1320 foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar)
1321
1322 \end{code}