Remove unused imports
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
6 -- |
7 -- Module      :  GHC.Conc
8 -- Copyright   :  (c) The University of Glasgow, 1994-2002
9 -- License     :  see libraries/base/LICENSE
10 -- 
11 -- Maintainer  :  cvs-ghc@haskell.org
12 -- Stability   :  internal
13 -- Portability :  non-portable (GHC extensions)
14 --
15 -- Basic concurrency stuff.
16 -- 
17 -----------------------------------------------------------------------------
18
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home.  Hence:
23
24 #include "Typeable.h"
25
26 -- #not-home
27 module GHC.Conc
28         ( ThreadId(..)
29
30         -- * Forking and suchlike
31         , forkIO        -- :: IO a -> IO ThreadId
32         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
33         , numCapabilities -- :: Int
34         , childHandler  -- :: Exception -> IO ()
35         , myThreadId    -- :: IO ThreadId
36         , killThread    -- :: ThreadId -> IO ()
37         , throwTo       -- :: ThreadId -> Exception -> IO ()
38         , par           -- :: a -> b -> b
39         , pseq          -- :: a -> b -> b
40         , runSparks
41         , yield         -- :: IO ()
42         , labelThread   -- :: ThreadId -> String -> IO ()
43
44         , ThreadStatus(..), BlockReason(..)
45         , threadStatus  -- :: ThreadId -> IO ThreadStatus
46
47         -- * Waiting
48         , threadDelay           -- :: Int -> IO ()
49         , registerDelay         -- :: Int -> IO (TVar Bool)
50         , threadWaitRead        -- :: Int -> IO ()
51         , threadWaitWrite       -- :: Int -> IO ()
52
53         -- * TVars
54         , STM(..)
55         , atomically    -- :: STM a -> IO a
56         , retry         -- :: STM a
57         , orElse        -- :: STM a -> STM a -> STM a
58         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
59         , alwaysSucceeds -- :: STM a -> STM ()
60         , always        -- :: STM Bool -> STM ()
61         , TVar(..)
62         , newTVar       -- :: a -> STM (TVar a)
63         , newTVarIO     -- :: a -> STM (TVar a)
64         , readTVar      -- :: TVar a -> STM a
65         , readTVarIO    -- :: TVar a -> IO a
66         , writeTVar     -- :: a -> TVar a -> STM ()
67         , unsafeIOToSTM -- :: IO a -> STM a
68
69         -- * Miscellaneous
70         , withMVar
71 #ifdef mingw32_HOST_OS
72         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
73         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
74         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
75
76         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
77         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
78 #endif
79
80 #ifndef mingw32_HOST_OS
81         , Signal, HandlerFun, setHandler, runHandlers
82 #endif
83
84         , ensureIOManagerIsRunning
85 #ifndef mingw32_HOST_OS
86         , syncIOManager
87 #endif
88
89 #ifdef mingw32_HOST_OS
90         , ConsoleEvent(..)
91         , win32ConsoleHandler
92         , toWin32ConsoleEvent
93 #endif
94         , setUncaughtExceptionHandler      -- :: (Exception -> IO ()) -> IO ()
95         , getUncaughtExceptionHandler      -- :: IO (Exception -> IO ())
96
97         , reportError, reportStackOverflow
98         ) where
99
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
103 #endif
104 import Foreign
105 import Foreign.C
106
107 #ifndef mingw32_HOST_OS
108 import Data.Dynamic
109 import Control.Monad
110 #endif
111 import Data.Maybe
112
113 import GHC.Base
114 import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
115 import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
116 import GHC.IO
117 import GHC.IO.Exception
118 import GHC.Exception
119 import GHC.IORef
120 import GHC.MVar
121 import GHC.Num          ( Num(..) )
122 import GHC.Real         ( fromIntegral )
123 #ifndef mingw32_HOST_OS
124 import GHC.IOArray
125 import GHC.Arr          ( inRange )
126 #endif
127 #ifdef mingw32_HOST_OS
128 import GHC.Real         ( div )
129 import GHC.Ptr          ( FunPtr(..) )
130 #endif
131 #ifdef mingw32_HOST_OS
132 import GHC.Read         ( Read )
133 import GHC.Enum         ( Enum )
134 #endif
135 import GHC.Pack         ( packCString# )
136 import GHC.Show         ( Show(..), showString )
137 import GHC.Err
138
139 infixr 0 `par`, `pseq`
140 \end{code}
141
142 %************************************************************************
143 %*                                                                      *
144 \subsection{@ThreadId@, @par@, and @fork@}
145 %*                                                                      *
146 %************************************************************************
147
148 \begin{code}
149 data ThreadId = ThreadId ThreadId# deriving( Typeable )
150 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
151 -- But since ThreadId# is unlifted, the Weak type must use open
152 -- type variables.
153 {- ^
154 A 'ThreadId' is an abstract type representing a handle to a thread.
155 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
156 the 'Ord' instance implements an arbitrary total ordering over
157 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
158 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
159 useful when debugging or diagnosing the behaviour of a concurrent
160 program.
161
162 /Note/: in GHC, if you have a 'ThreadId', you essentially have
163 a pointer to the thread itself.  This means the thread itself can\'t be
164 garbage collected until you drop the 'ThreadId'.
165 This misfeature will hopefully be corrected at a later date.
166
167 /Note/: Hugs does not provide any operations on other threads;
168 it defines 'ThreadId' as a synonym for ().
169 -}
170
171 instance Show ThreadId where
172    showsPrec d t = 
173         showString "ThreadId " . 
174         showsPrec d (getThreadId (id2TSO t))
175
176 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
177
178 id2TSO :: ThreadId -> ThreadId#
179 id2TSO (ThreadId t) = t
180
181 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
182 -- Returns -1, 0, 1
183
184 cmpThread :: ThreadId -> ThreadId -> Ordering
185 cmpThread t1 t2 = 
186    case cmp_thread (id2TSO t1) (id2TSO t2) of
187       -1 -> LT
188       0  -> EQ
189       _  -> GT -- must be 1
190
191 instance Eq ThreadId where
192    t1 == t2 = 
193       case t1 `cmpThread` t2 of
194          EQ -> True
195          _  -> False
196
197 instance Ord ThreadId where
198    compare = cmpThread
199
200 {- |
201 Sparks off a new thread to run the 'IO' computation passed as the
202 first argument, and returns the 'ThreadId' of the newly created
203 thread.
204
205 The new thread will be a lightweight thread; if you want to use a foreign
206 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
207
208 GHC note: the new thread inherits the /blocked/ state of the parent 
209 (see 'Control.Exception.block').
210
211 The newly created thread has an exception handler that discards the
212 exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
213 'ThreadKilled', and passes all other exceptions to the uncaught
214 exception handler (see 'setUncaughtExceptionHandler').
215 -}
216 forkIO :: IO () -> IO ThreadId
217 forkIO action = IO $ \ s -> 
218    case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
219  where
220   action_plus = catchException action childHandler
221
222 {- |
223 Like 'forkIO', but lets you specify on which CPU the thread is
224 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
225 will stay on the same CPU for its entire lifetime (`forkIO` threads
226 can migrate between CPUs according to the scheduling policy).
227 `forkOnIO` is useful for overriding the scheduling policy when you
228 know in advance how best to distribute the threads.
229
230 The `Int` argument specifies the CPU number; it is interpreted modulo
231 'numCapabilities' (note that it actually specifies a capability number
232 rather than a CPU number, but to a first approximation the two are
233 equivalent).
234 -}
235 forkOnIO :: Int -> IO () -> IO ThreadId
236 forkOnIO (I# cpu) action = IO $ \ s -> 
237    case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
238  where
239   action_plus = catchException action childHandler
240
241 -- | the value passed to the @+RTS -N@ flag.  This is the number of
242 -- Haskell threads that can run truly simultaneously at any given
243 -- time, and is typically set to the number of physical CPU cores on
244 -- the machine.
245 numCapabilities :: Int
246 numCapabilities = unsafePerformIO $  do 
247                     n <- peek n_capabilities
248                     return (fromIntegral n)
249
250 #if defined(mingw32_HOST_OS) && defined(__PIC__)
251 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
252 #else
253 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
254 #endif
255 childHandler :: SomeException -> IO ()
256 childHandler err = catchException (real_handler err) childHandler
257
258 real_handler :: SomeException -> IO ()
259 real_handler se@(SomeException ex) =
260   -- ignore thread GC and killThread exceptions:
261   case cast ex of
262   Just BlockedOnDeadMVar                -> return ()
263   _ -> case cast ex of
264        Just BlockedIndefinitely         -> return ()
265        _ -> case cast ex of
266             Just ThreadKilled           -> return ()
267             _ -> case cast ex of
268                  -- report all others:
269                  Just StackOverflow     -> reportStackOverflow
270                  _                      -> reportError se
271
272 {- | 'killThread' terminates the given thread (GHC only).
273 Any work already done by the thread isn\'t
274 lost: the computation is suspended until required by another thread.
275 The memory used by the thread will be garbage collected if it isn\'t
276 referenced from anywhere.  The 'killThread' function is defined in
277 terms of 'throwTo':
278
279 > killThread tid = throwTo tid ThreadKilled
280
281 Killthread is a no-op if the target thread has already completed.
282 -}
283 killThread :: ThreadId -> IO ()
284 killThread tid = throwTo tid ThreadKilled
285
286 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
287
288 'throwTo' does not return until the exception has been raised in the
289 target thread. 
290 The calling thread can thus be certain that the target
291 thread has received the exception.  This is a useful property to know
292 when dealing with race conditions: eg. if there are two threads that
293 can kill each other, it is guaranteed that only one of the threads
294 will get to kill the other.
295
296 If the target thread is currently making a foreign call, then the
297 exception will not be raised (and hence 'throwTo' will not return)
298 until the call has completed.  This is the case regardless of whether
299 the call is inside a 'block' or not.
300
301 Important note: the behaviour of 'throwTo' differs from that described in
302 the paper \"Asynchronous exceptions in Haskell\"
303 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
304 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
305 a more synchronous design in which 'throwTo' does not return until the exception
306 is received by the target thread.  The trade-off is discussed in Section 9 of the paper.
307 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
308 the paper).
309
310 There is currently no guarantee that the exception delivered by 'throwTo' will be
311 delivered at the first possible opportunity.  In particular, a thread may 
312 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
313 a pending 'throwTo'.  This is arguably undesirable behaviour.
314
315  -}
316 throwTo :: Exception e => ThreadId -> e -> IO ()
317 throwTo (ThreadId tid) ex = IO $ \ s ->
318    case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
319
320 -- | Returns the 'ThreadId' of the calling thread (GHC only).
321 myThreadId :: IO ThreadId
322 myThreadId = IO $ \s ->
323    case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
324
325
326 -- |The 'yield' action allows (forces, in a co-operative multitasking
327 -- implementation) a context-switch to any other currently runnable
328 -- threads (if any), and is occasionally useful when implementing
329 -- concurrency abstractions.
330 yield :: IO ()
331 yield = IO $ \s -> 
332    case (yield# s) of s1 -> (# s1, () #)
333
334 {- | 'labelThread' stores a string as identifier for this thread if
335 you built a RTS with debugging support. This identifier will be used in
336 the debugging output to make distinction of different threads easier
337 (otherwise you only have the thread state object\'s address in the heap).
338
339 Other applications like the graphical Concurrent Haskell Debugger
340 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
341 'labelThread' for their purposes as well.
342 -}
343
344 labelThread :: ThreadId -> String -> IO ()
345 labelThread (ThreadId t) str = IO $ \ s ->
346    let !ps  = packCString# str
347        !adr = byteArrayContents# ps in
348      case (labelThread# t adr s) of s1 -> (# s1, () #)
349
350 --      Nota Bene: 'pseq' used to be 'seq'
351 --                 but 'seq' is now defined in PrelGHC
352 --
353 -- "pseq" is defined a bit weirdly (see below)
354 --
355 -- The reason for the strange "lazy" call is that
356 -- it fools the compiler into thinking that pseq  and par are non-strict in
357 -- their second argument (even if it inlines pseq at the call site).
358 -- If it thinks pseq is strict in "y", then it often evaluates
359 -- "y" before "x", which is totally wrong.  
360
361 {-# INLINE pseq  #-}
362 pseq :: a -> b -> b
363 pseq  x y = x `seq` lazy y
364
365 {-# INLINE par  #-}
366 par :: a -> b -> b
367 par  x y = case (par# x) of { _ -> lazy y }
368
369 -- | Internal function used by the RTS to run sparks.
370 runSparks :: IO ()
371 runSparks = IO loop
372   where loop s = case getSpark# s of
373                    (# s', n, p #) ->
374                       if n ==# 0# then (# s', () #)
375                                   else p `seq` loop s'
376
377 data BlockReason
378   = BlockedOnMVar
379         -- ^blocked on on 'MVar'
380   | BlockedOnBlackHole
381         -- ^blocked on a computation in progress by another thread
382   | BlockedOnException
383         -- ^blocked in 'throwTo'
384   | BlockedOnSTM
385         -- ^blocked in 'retry' in an STM transaction
386   | BlockedOnForeignCall
387         -- ^currently in a foreign call
388   | BlockedOnOther
389         -- ^blocked on some other resource.  Without @-threaded@,
390         -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
391         -- they show up as 'BlockedOnMVar'.
392   deriving (Eq,Ord,Show)
393
394 -- | The current status of a thread
395 data ThreadStatus
396   = ThreadRunning
397         -- ^the thread is currently runnable or running
398   | ThreadFinished
399         -- ^the thread has finished
400   | ThreadBlocked  BlockReason
401         -- ^the thread is blocked on some resource
402   | ThreadDied
403         -- ^the thread received an uncaught exception
404   deriving (Eq,Ord,Show)
405
406 threadStatus :: ThreadId -> IO ThreadStatus
407 threadStatus (ThreadId t) = IO $ \s ->
408    case threadStatus# t s of
409      (# s', stat #) -> (# s', mk_stat (I# stat) #)
410    where
411         -- NB. keep these in sync with includes/Constants.h
412      mk_stat 0  = ThreadRunning
413      mk_stat 1  = ThreadBlocked BlockedOnMVar
414      mk_stat 2  = ThreadBlocked BlockedOnBlackHole
415      mk_stat 3  = ThreadBlocked BlockedOnException
416      mk_stat 7  = ThreadBlocked BlockedOnSTM
417      mk_stat 11 = ThreadBlocked BlockedOnForeignCall
418      mk_stat 12 = ThreadBlocked BlockedOnForeignCall
419      mk_stat 16 = ThreadFinished
420      mk_stat 17 = ThreadDied
421      mk_stat _  = ThreadBlocked BlockedOnOther
422 \end{code}
423
424
425 %************************************************************************
426 %*                                                                      *
427 \subsection[stm]{Transactional heap operations}
428 %*                                                                      *
429 %************************************************************************
430
431 TVars are shared memory locations which support atomic memory
432 transactions.
433
434 \begin{code}
435 -- |A monad supporting atomic memory transactions.
436 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
437
438 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
439 unSTM (STM a) = a
440
441 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
442
443 instance  Functor STM where
444    fmap f x = x >>= (return . f)
445
446 instance  Monad STM  where
447     {-# INLINE return #-}
448     {-# INLINE (>>)   #-}
449     {-# INLINE (>>=)  #-}
450     m >> k      = thenSTM m k
451     return x    = returnSTM x
452     m >>= k     = bindSTM m k
453
454 bindSTM :: STM a -> (a -> STM b) -> STM b
455 bindSTM (STM m) k = STM ( \s ->
456   case m s of 
457     (# new_s, a #) -> unSTM (k a) new_s
458   )
459
460 thenSTM :: STM a -> STM b -> STM b
461 thenSTM (STM m) k = STM ( \s ->
462   case m s of 
463     (# new_s, _ #) -> unSTM k new_s
464   )
465
466 returnSTM :: a -> STM a
467 returnSTM x = STM (\s -> (# s, x #))
468
469 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
470 -- dangerous thing to do.  
471 --
472 --   * The STM implementation will often run transactions multiple
473 --     times, so you need to be prepared for this if your IO has any
474 --     side effects.
475 --
476 --   * The STM implementation will abort transactions that are known to
477 --     be invalid and need to be restarted.  This may happen in the middle
478 --     of `unsafeIOToSTM`, so make sure you don't acquire any resources
479 --     that need releasing (exception handlers are ignored when aborting
480 --     the transaction).  That includes doing any IO using Handles, for
481 --     example.  Getting this wrong will probably lead to random deadlocks.
482 --
483 --   * The transaction may have seen an inconsistent view of memory when
484 --     the IO runs.  Invariants that you expect to be true throughout
485 --     your program may not be true inside a transaction, due to the
486 --     way transactions are implemented.  Normally this wouldn't be visible
487 --     to the programmer, but using `unsafeIOToSTM` can expose it.
488 --
489 unsafeIOToSTM :: IO a -> STM a
490 unsafeIOToSTM (IO m) = STM m
491
492 -- |Perform a series of STM actions atomically.
493 --
494 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
495 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
496 -- this would effectively allow a transaction inside a transaction, depending
497 -- on exactly when the thunk is evaluated.)
498 --
499 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
500 -- and which allows top-level TVars to be allocated.
501
502 atomically :: STM a -> IO a
503 atomically (STM m) = IO (\s -> (atomically# m) s )
504
505 -- |Retry execution of the current memory transaction because it has seen
506 -- values in TVars which mean that it should not continue (e.g. the TVars
507 -- represent a shared buffer that is now empty).  The implementation may
508 -- block the thread until one of the TVars that it has read from has been
509 -- udpated. (GHC only)
510 retry :: STM a
511 retry = STM $ \s# -> retry# s#
512
513 -- |Compose two alternative STM actions (GHC only).  If the first action
514 -- completes without retrying then it forms the result of the orElse.
515 -- Otherwise, if the first action retries, then the second action is
516 -- tried in its place.  If both actions retry then the orElse as a
517 -- whole retries.
518 orElse :: STM a -> STM a -> STM a
519 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
520
521 -- |Exception handling within STM actions.
522 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
523 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
524
525 -- | Low-level primitive on which always and alwaysSucceeds are built.
526 -- checkInv differs form these in that (i) the invariant is not 
527 -- checked when checkInv is called, only at the end of this and
528 -- subsequent transcations, (ii) the invariant failure is indicated
529 -- by raising an exception.
530 checkInv :: STM a -> STM ()
531 checkInv (STM m) = STM (\s -> (check# m) s)
532
533 -- | alwaysSucceeds adds a new invariant that must be true when passed
534 -- to alwaysSucceeds, at the end of the current transaction, and at
535 -- the end of every subsequent transaction.  If it fails at any
536 -- of those points then the transaction violating it is aborted
537 -- and the exception raised by the invariant is propagated.
538 alwaysSucceeds :: STM a -> STM ()
539 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
540                       checkInv i
541
542 -- | always is a variant of alwaysSucceeds in which the invariant is
543 -- expressed as an STM Bool action that must return True.  Returning
544 -- False or raising an exception are both treated as invariant failures.
545 always :: STM Bool -> STM ()
546 always i = alwaysSucceeds ( do v <- i
547                                if (v) then return () else ( error "Transacional invariant violation" ) )
548
549 -- |Shared memory locations that support atomic memory transactions.
550 data TVar a = TVar (TVar# RealWorld a)
551
552 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
553
554 instance Eq (TVar a) where
555         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
556
557 -- |Create a new TVar holding a value supplied
558 newTVar :: a -> STM (TVar a)
559 newTVar val = STM $ \s1# ->
560     case newTVar# val s1# of
561          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
562
563 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
564 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
565 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
566 -- possible.
567 newTVarIO :: a -> IO (TVar a)
568 newTVarIO val = IO $ \s1# ->
569     case newTVar# val s1# of
570          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
571
572 -- |Return the current value stored in a TVar.
573 -- This is equivalent to
574 --
575 -- >  readTVarIO = atomically . readTVar
576 --
577 -- but works much faster, because it doesn't perform a complete
578 -- transaction, it just reads the current value of the 'TVar'.
579 readTVarIO :: TVar a -> IO a
580 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
581
582 -- |Return the current value stored in a TVar
583 readTVar :: TVar a -> STM a
584 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
585
586 -- |Write the supplied value into a TVar
587 writeTVar :: TVar a -> a -> STM ()
588 writeTVar (TVar tvar#) val = STM $ \s1# ->
589     case writeTVar# tvar# val s1# of
590          s2# -> (# s2#, () #)
591   
592 \end{code}
593
594 MVar utilities
595
596 \begin{code}
597 withMVar :: MVar a -> (a -> IO b) -> IO b
598 withMVar m io = 
599   block $ do
600     a <- takeMVar m
601     b <- catchAny (unblock (io a))
602             (\e -> do putMVar m a; throw e)
603     putMVar m a
604     return b
605 \end{code}
606
607 %************************************************************************
608 %*                                                                      *
609 \subsection{Thread waiting}
610 %*                                                                      *
611 %************************************************************************
612
613 \begin{code}
614 #ifdef mingw32_HOST_OS
615
616 -- Note: threadWaitRead and threadWaitWrite aren't really functional
617 -- on Win32, but left in there because lib code (still) uses them (the manner
618 -- in which they're used doesn't cause problems on a Win32 platform though.)
619
620 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
621 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
622   IO $ \s -> case asyncRead# fd isSock len buf s of 
623                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
624
625 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
626 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
627   IO $ \s -> case asyncWrite# fd isSock len buf s of 
628                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
629
630 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
631 asyncDoProc (FunPtr proc) (Ptr param) = 
632     -- the 'length' value is ignored; simplifies implementation of
633     -- the async*# primops to have them all return the same result.
634   IO $ \s -> case asyncDoProc# proc param s  of 
635                (# s', _len#, err# #) -> (# s', I# err# #)
636
637 -- to aid the use of these primops by the IO Handle implementation,
638 -- provide the following convenience funs:
639
640 -- this better be a pinned byte array!
641 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
642 asyncReadBA fd isSock len off bufB = 
643   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
644   
645 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
646 asyncWriteBA fd isSock len off bufB = 
647   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
648
649 #endif
650
651 -- -----------------------------------------------------------------------------
652 -- Thread IO API
653
654 -- | Block the current thread until data is available to read on the
655 -- given file descriptor (GHC only).
656 threadWaitRead :: Fd -> IO ()
657 threadWaitRead fd
658 #ifndef mingw32_HOST_OS
659   | threaded  = waitForReadEvent fd
660 #endif
661   | otherwise = IO $ \s -> 
662         case fromIntegral fd of { I# fd# ->
663         case waitRead# fd# s of { s' -> (# s', () #)
664         }}
665
666 -- | Block the current thread until data can be written to the
667 -- given file descriptor (GHC only).
668 threadWaitWrite :: Fd -> IO ()
669 threadWaitWrite fd
670 #ifndef mingw32_HOST_OS
671   | threaded  = waitForWriteEvent fd
672 #endif
673   | otherwise = IO $ \s -> 
674         case fromIntegral fd of { I# fd# ->
675         case waitWrite# fd# s of { s' -> (# s', () #)
676         }}
677
678 -- | Suspends the current thread for a given number of microseconds
679 -- (GHC only).
680 --
681 -- There is no guarantee that the thread will be rescheduled promptly
682 -- when the delay has expired, but the thread will never continue to
683 -- run /earlier/ than specified.
684 --
685 threadDelay :: Int -> IO ()
686 threadDelay time
687   | threaded  = waitForDelayEvent time
688   | otherwise = IO $ \s -> 
689         case fromIntegral time of { I# time# ->
690         case delay# time# s of { s' -> (# s', () #)
691         }}
692
693
694 -- | Set the value of returned TVar to True after a given number of
695 -- microseconds. The caveats associated with threadDelay also apply.
696 --
697 registerDelay :: Int -> IO (TVar Bool)
698 registerDelay usecs 
699   | threaded = waitForDelayEventSTM usecs
700   | otherwise = error "registerDelay: requires -threaded"
701
702 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
703
704 waitForDelayEvent :: Int -> IO ()
705 waitForDelayEvent usecs = do
706   m <- newEmptyMVar
707   target <- calculateTarget usecs
708   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
709   prodServiceThread
710   takeMVar m
711
712 -- Delays for use in STM
713 waitForDelayEventSTM :: Int -> IO (TVar Bool)
714 waitForDelayEventSTM usecs = do
715    t <- atomically $ newTVar False
716    target <- calculateTarget usecs
717    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
718    prodServiceThread
719    return t  
720     
721 calculateTarget :: Int -> IO USecs
722 calculateTarget usecs = do
723     now <- getUSecOfDay
724     return $ now + (fromIntegral usecs)
725
726
727 -- ----------------------------------------------------------------------------
728 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
729
730 -- In the threaded RTS, we employ a single IO Manager thread to wait
731 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
732 -- and delays (threadDelay).  
733 --
734 -- We can do this because in the threaded RTS the IO Manager can make
735 -- a non-blocking call to select(), so we don't have to do select() in
736 -- the scheduler as we have to in the non-threaded RTS.  We get performance
737 -- benefits from doing it this way, because we only have to restart the select()
738 -- when a new request arrives, rather than doing one select() each time
739 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
740 -- by not having to check for completed IO requests.
741
742 -- Issues, possible problems:
743 --
744 --      - we might want bound threads to just do the blocking
745 --        operation rather than communicating with the IO manager
746 --        thread.  This would prevent simgle-threaded programs which do
747 --        IO from requiring multiple OS threads.  However, it would also
748 --        prevent bound threads waiting on IO from being killed or sent
749 --        exceptions.
750 --
751 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
752 --        I couldn't repeat this.
753 --
754 --      - How do we handle signal delivery in the multithreaded RTS?
755 --
756 --      - forkProcess will kill the IO manager thread.  Let's just
757 --        hope we don't need to do any blocking IO between fork & exec.
758
759 #ifndef mingw32_HOST_OS
760 data IOReq
761   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
762   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
763 #endif
764
765 data DelayReq
766   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
767   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
768
769 #ifndef mingw32_HOST_OS
770 pendingEvents :: IORef [IOReq]
771 #endif
772 pendingDelays :: IORef [DelayReq]
773         -- could use a strict list or array here
774 {-# NOINLINE pendingEvents #-}
775 {-# NOINLINE pendingDelays #-}
776 (pendingEvents,pendingDelays) = unsafePerformIO $ do
777   startIOManagerThread
778   reqs <- newIORef []
779   dels <- newIORef []
780   return (reqs, dels)
781         -- the first time we schedule an IO request, the service thread
782         -- will be created (cool, huh?)
783
784 ensureIOManagerIsRunning :: IO ()
785 ensureIOManagerIsRunning 
786   | threaded  = seq pendingEvents $ return ()
787   | otherwise = return ()
788
789 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
790 insertDelay d [] = [d]
791 insertDelay d1 ds@(d2 : rest)
792   | delayTime d1 <= delayTime d2 = d1 : ds
793   | otherwise                    = d2 : insertDelay d1 rest
794
795 delayTime :: DelayReq -> USecs
796 delayTime (Delay t _) = t
797 delayTime (DelaySTM t _) = t
798
799 type USecs = Word64
800
801 foreign import ccall unsafe "getUSecOfDay" 
802   getUSecOfDay :: IO USecs
803
804 prodding :: IORef Bool
805 {-# NOINLINE prodding #-}
806 prodding = unsafePerformIO (newIORef False)
807
808 prodServiceThread :: IO ()
809 prodServiceThread = do
810   was_set <- atomicModifyIORef prodding (\a -> (True,a))
811   if (not (was_set)) then wakeupIOManager else return ()
812
813 #ifdef mingw32_HOST_OS
814 -- ----------------------------------------------------------------------------
815 -- Windows IO manager thread
816
817 startIOManagerThread :: IO ()
818 startIOManagerThread = do
819   wakeup <- c_getIOManagerEvent
820   forkIO $ service_loop wakeup []
821   return ()
822
823 service_loop :: HANDLE          -- read end of pipe
824              -> [DelayReq]      -- current delay requests
825              -> IO ()
826
827 service_loop wakeup old_delays = do
828   -- pick up new delay requests
829   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
830   let  delays = foldr insertDelay old_delays new_delays
831
832   now <- getUSecOfDay
833   (delays', timeout) <- getDelay now delays
834
835   r <- c_WaitForSingleObject wakeup timeout
836   case r of
837     0xffffffff -> do c_maperrno; throwErrno "service_loop"
838     0 -> do
839         r2 <- c_readIOManagerEvent
840         exit <- 
841               case r2 of
842                 _ | r2 == io_MANAGER_WAKEUP -> return False
843                 _ | r2 == io_MANAGER_DIE    -> return True
844                 0 -> return False -- spurious wakeup
845                 _ -> do start_console_handler (r2 `shiftR` 1); return False
846         if exit
847           then return ()
848           else service_cont wakeup delays'
849
850     _other -> service_cont wakeup delays' -- probably timeout        
851
852 service_cont :: HANDLE -> [DelayReq] -> IO ()
853 service_cont wakeup delays = do
854   r <- atomicModifyIORef prodding (\_ -> (False,False))
855   r `seq` return () -- avoid space leak
856   service_loop wakeup delays
857
858 -- must agree with rts/win32/ThrIOManager.c
859 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
860 io_MANAGER_WAKEUP = 0xffffffff
861 io_MANAGER_DIE    = 0xfffffffe
862
863 data ConsoleEvent
864  = ControlC
865  | Break
866  | Close
867     -- these are sent to Services only.
868  | Logoff
869  | Shutdown
870  deriving (Eq, Ord, Enum, Show, Read, Typeable)
871
872 start_console_handler :: Word32 -> IO ()
873 start_console_handler r =
874   case toWin32ConsoleEvent r of
875      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
876                     forkIO (handler x)
877                     return ()
878      Nothing -> return ()
879
880 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
881 toWin32ConsoleEvent ev = 
882    case ev of
883        0 {- CTRL_C_EVENT-}        -> Just ControlC
884        1 {- CTRL_BREAK_EVENT-}    -> Just Break
885        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
886        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
887        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
888        _ -> Nothing
889
890 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
891 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
892
893 -- XXX Is this actually needed?
894 stick :: IORef HANDLE
895 {-# NOINLINE stick #-}
896 stick = unsafePerformIO (newIORef nullPtr)
897
898 wakeupIOManager :: IO ()
899 wakeupIOManager = do 
900   _hdl <- readIORef stick
901   c_sendIOManagerEvent io_MANAGER_WAKEUP
902
903 -- Walk the queue of pending delays, waking up any that have passed
904 -- and return the smallest delay to wait for.  The queue of pending
905 -- delays is kept ordered.
906 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
907 getDelay _   [] = return ([], iNFINITE)
908 getDelay now all@(d : rest) 
909   = case d of
910      Delay time m | now >= time -> do
911         putMVar m ()
912         getDelay now rest
913      DelaySTM time t | now >= time -> do
914         atomically $ writeTVar t True
915         getDelay now rest
916      _otherwise ->
917         -- delay is in millisecs for WaitForSingleObject
918         let micro_seconds = delayTime d - now
919             milli_seconds = (micro_seconds + 999) `div` 1000
920         in return (all, fromIntegral milli_seconds)
921
922 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
923 -- available yet.  We should move some Win32 functionality down here,
924 -- maybe as part of the grand reorganisation of the base package...
925 type HANDLE       = Ptr ()
926 type DWORD        = Word32
927
928 iNFINITE :: DWORD
929 iNFINITE = 0xFFFFFFFF -- urgh
930
931 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
932   c_getIOManagerEvent :: IO HANDLE
933
934 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
935   c_readIOManagerEvent :: IO Word32
936
937 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
938   c_sendIOManagerEvent :: Word32 -> IO ()
939
940 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
941    c_maperrno :: IO ()
942
943 foreign import stdcall "WaitForSingleObject"
944    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
945
946 #else
947 -- ----------------------------------------------------------------------------
948 -- Unix IO manager thread, using select()
949
950 startIOManagerThread :: IO ()
951 startIOManagerThread = do
952         allocaArray 2 $ \fds -> do
953         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
954         rd_end <- peekElemOff fds 0
955         wr_end <- peekElemOff fds 1
956         setNonBlockingFD wr_end True -- writes happen in a signal handler, we
957                                      -- don't want them to block.
958         setCloseOnExec rd_end
959         setCloseOnExec wr_end
960         writeIORef stick (fromIntegral wr_end)
961         c_setIOManagerPipe wr_end
962         forkIO $ do
963             allocaBytes sizeofFdSet   $ \readfds -> do
964             allocaBytes sizeofFdSet   $ \writefds -> do 
965             allocaBytes sizeofTimeVal $ \timeval -> do
966             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
967         return ()
968
969 service_loop
970    :: Fd                -- listen to this for wakeup calls
971    -> Ptr CFdSet
972    -> Ptr CFdSet
973    -> Ptr CTimeVal
974    -> [IOReq]
975    -> [DelayReq]
976    -> IO ()
977 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
978
979   -- pick up new IO requests
980   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
981   let reqs = new_reqs ++ old_reqs
982
983   -- pick up new delay requests
984   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
985   let  delays0 = foldr insertDelay old_delays new_delays
986
987   -- build the FDSets for select()
988   fdZero readfds
989   fdZero writefds
990   fdSet wakeup readfds
991   maxfd <- buildFdSets 0 readfds writefds reqs
992
993   -- perform the select()
994   let do_select delays = do
995           -- check the current time and wake up any thread in
996           -- threadDelay whose timeout has expired.  Also find the
997           -- timeout value for the select() call.
998           now <- getUSecOfDay
999           (delays', timeout) <- getDelay now ptimeval delays
1000
1001           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
1002                         nullPtr timeout
1003           if (res == -1)
1004              then do
1005                 err <- getErrno
1006                 case err of
1007                   _ | err == eINTR ->  do_select delays'
1008                         -- EINTR: just redo the select()
1009                   _ | err == eBADF ->  return (True, delays)
1010                         -- EBADF: one of the file descriptors is closed or bad,
1011                         -- we don't know which one, so wake everyone up.
1012                   _ | otherwise    ->  throwErrno "select"
1013                         -- otherwise (ENOMEM or EINVAL) something has gone
1014                         -- wrong; report the error.
1015              else
1016                 return (False,delays')
1017
1018   (wakeup_all,delays') <- do_select delays0
1019
1020   exit <-
1021     if wakeup_all then return False
1022       else do
1023         b <- fdIsSet wakeup readfds
1024         if b == 0 
1025           then return False
1026           else alloca $ \p -> do 
1027                  c_read (fromIntegral wakeup) p 1
1028                  s <- peek p            
1029                  case s of
1030                   _ | s == io_MANAGER_WAKEUP -> return False
1031                   _ | s == io_MANAGER_DIE    -> return True
1032                   _ | s == io_MANAGER_SYNC   -> do
1033                        mvars <- readIORef sync
1034                        mapM_ (flip putMVar ()) mvars
1035                        return False
1036                   _ -> do
1037                        fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1038                        withForeignPtr fp $ \p_siginfo -> do
1039                          r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1040                                  sizeof_siginfo_t
1041                          when (r /= fromIntegral sizeof_siginfo_t) $
1042                             error "failed to read siginfo_t"
1043                        runHandlers' fp (fromIntegral s)
1044                        return False
1045
1046   if exit then return () else do
1047
1048   atomicModifyIORef prodding (\_ -> (False,False))
1049
1050   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1051                          else completeRequests reqs readfds writefds []
1052
1053   service_loop wakeup readfds writefds ptimeval reqs' delays'
1054
1055 io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8
1056 io_MANAGER_WAKEUP = 0xff
1057 io_MANAGER_DIE    = 0xfe
1058 io_MANAGER_SYNC   = 0xfd
1059
1060 -- | the stick is for poking the IO manager with
1061 stick :: IORef Fd
1062 {-# NOINLINE stick #-}
1063 stick = unsafePerformIO (newIORef 0)
1064
1065 {-# NOINLINE sync #-}
1066 sync :: IORef [MVar ()]
1067 sync = unsafePerformIO (newIORef [])
1068
1069 -- waits for the IO manager to drain the pipe
1070 syncIOManager :: IO ()
1071 syncIOManager = do
1072   m <- newEmptyMVar
1073   atomicModifyIORef sync (\old -> (m:old,()))
1074   fd <- readIORef stick
1075   with io_MANAGER_SYNC $ \pbuf -> do 
1076     c_write (fromIntegral fd) pbuf 1; return ()
1077   takeMVar m
1078
1079 wakeupIOManager :: IO ()
1080 wakeupIOManager = do
1081   fd <- readIORef stick
1082   with io_MANAGER_WAKEUP $ \pbuf -> do 
1083     c_write (fromIntegral fd) pbuf 1; return ()
1084
1085 -- For the non-threaded RTS
1086 runHandlers :: Ptr Word8 -> Int -> IO ()
1087 runHandlers p_info sig = do
1088   fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1089   withForeignPtr fp $ \p -> do
1090     copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1091     free p_info
1092   runHandlers' fp (fromIntegral sig)
1093
1094 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1095 runHandlers' p_info sig = do
1096   let int = fromIntegral sig
1097   withMVar signal_handlers $ \arr ->
1098       if not (inRange (boundsIOArray arr) int)
1099          then return ()
1100          else do handler <- unsafeReadIOArray arr int
1101                  case handler of
1102                     Nothing -> return ()
1103                     Just (f,_)  -> do forkIO (f p_info); return ()
1104
1105 foreign import ccall "setIOManagerPipe"
1106   c_setIOManagerPipe :: CInt -> IO ()
1107
1108 foreign import ccall "__hscore_sizeof_siginfo_t"
1109   sizeof_siginfo_t :: CSize
1110
1111 type Signal = CInt
1112
1113 maxSig = 64 :: Int
1114
1115 type HandlerFun = ForeignPtr Word8 -> IO ()
1116
1117 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1118 -- this race condition is #1922, although that bug was on Windows a similar
1119 -- bug also exists on Unix.
1120 {-# NOINLINE signal_handlers #-}
1121 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1122 signal_handlers = unsafePerformIO $ do
1123    arr <- newIOArray (0,maxSig) Nothing
1124    m <- newMVar arr
1125    block $ do
1126      stable_ref <- newStablePtr m
1127      let ref = castStablePtrToPtr stable_ref
1128      ref2 <- getOrSetSignalHandlerStore ref
1129      if ref==ref2
1130         then return m
1131         else do freeStablePtr stable_ref
1132                 deRefStablePtr (castPtrToStablePtr ref2)
1133
1134 foreign import ccall unsafe "getOrSetSignalHandlerStore"
1135     getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a)
1136
1137 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1138 setHandler sig handler = do
1139   let int = fromIntegral sig
1140   withMVar signal_handlers $ \arr -> 
1141      if not (inRange (boundsIOArray arr) int)
1142         then error "GHC.Conc.setHandler: signal out of range"
1143         else do old <- unsafeReadIOArray arr int
1144                 unsafeWriteIOArray arr int handler
1145                 return old
1146
1147 -- -----------------------------------------------------------------------------
1148 -- IO requests
1149
1150 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1151 buildFdSets maxfd _       _        [] = return maxfd
1152 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1153   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1154   | otherwise        =  do
1155         fdSet fd readfds
1156         buildFdSets (max maxfd fd) readfds writefds reqs
1157 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1158   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1159   | otherwise        =  do
1160         fdSet fd writefds
1161         buildFdSets (max maxfd fd) readfds writefds reqs
1162
1163 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1164                  -> IO [IOReq]
1165 completeRequests [] _ _ reqs' = return reqs'
1166 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1167   b <- fdIsSet fd readfds
1168   if b /= 0
1169     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1170     else completeRequests reqs readfds writefds (Read fd m : reqs')
1171 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1172   b <- fdIsSet fd writefds
1173   if b /= 0
1174     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1175     else completeRequests reqs readfds writefds (Write fd m : reqs')
1176
1177 wakeupAll :: [IOReq] -> IO ()
1178 wakeupAll [] = return ()
1179 wakeupAll (Read  _ m : reqs) = do putMVar m (); wakeupAll reqs
1180 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1181
1182 waitForReadEvent :: Fd -> IO ()
1183 waitForReadEvent fd = do
1184   m <- newEmptyMVar
1185   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1186   prodServiceThread
1187   takeMVar m
1188
1189 waitForWriteEvent :: Fd -> IO ()
1190 waitForWriteEvent fd = do
1191   m <- newEmptyMVar
1192   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1193   prodServiceThread
1194   takeMVar m
1195
1196 -- -----------------------------------------------------------------------------
1197 -- Delays
1198
1199 -- Walk the queue of pending delays, waking up any that have passed
1200 -- and return the smallest delay to wait for.  The queue of pending
1201 -- delays is kept ordered.
1202 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1203 getDelay _   _        [] = return ([],nullPtr)
1204 getDelay now ptimeval all@(d : rest) 
1205   = case d of
1206      Delay time m | now >= time -> do
1207         putMVar m ()
1208         getDelay now ptimeval rest
1209      DelaySTM time t | now >= time -> do
1210         atomically $ writeTVar t True
1211         getDelay now ptimeval rest
1212      _otherwise -> do
1213         setTimevalTicks ptimeval (delayTime d - now)
1214         return (all,ptimeval)
1215
1216 data CTimeVal
1217
1218 foreign import ccall unsafe "sizeofTimeVal"
1219   sizeofTimeVal :: Int
1220
1221 foreign import ccall unsafe "setTimevalTicks" 
1222   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1223
1224 {- 
1225   On Win32 we're going to have a single Pipe, and a
1226   waitForSingleObject with the delay time.  For signals, we send a
1227   byte down the pipe just like on Unix.
1228 -}
1229
1230 -- ----------------------------------------------------------------------------
1231 -- select() interface
1232
1233 -- ToDo: move to System.Posix.Internals?
1234
1235 data CFdSet
1236
1237 foreign import ccall safe "select"
1238   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1239            -> IO CInt
1240
1241 foreign import ccall unsafe "hsFD_SETSIZE"
1242   c_fD_SETSIZE :: CInt
1243
1244 fD_SETSIZE :: Fd
1245 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1246
1247 foreign import ccall unsafe "hsFD_ISSET"
1248   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1249
1250 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1251 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1252
1253 foreign import ccall unsafe "hsFD_SET"
1254   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1255
1256 fdSet :: Fd -> Ptr CFdSet -> IO ()
1257 fdSet (Fd fd) fdset = c_fdSet fd fdset
1258
1259 foreign import ccall unsafe "hsFD_ZERO"
1260   fdZero :: Ptr CFdSet -> IO ()
1261
1262 foreign import ccall unsafe "sizeof_fd_set"
1263   sizeofFdSet :: Int
1264
1265 #endif
1266
1267 reportStackOverflow :: IO a
1268 reportStackOverflow = do callStackOverflowHook; return undefined
1269
1270 reportError :: SomeException -> IO a
1271 reportError ex = do
1272    handler <- getUncaughtExceptionHandler
1273    handler ex
1274    return undefined
1275
1276 -- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
1277 -- the unsafe below.
1278 foreign import ccall unsafe "stackOverflow"
1279         callStackOverflowHook :: IO ()
1280
1281 {-# NOINLINE uncaughtExceptionHandler #-}
1282 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1283 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1284    where
1285       defaultHandler :: SomeException -> IO ()
1286       defaultHandler se@(SomeException ex) = do
1287          (hFlush stdout) `catchAny` (\ _ -> return ())
1288          let msg = case cast ex of
1289                Just Deadlock -> "no threads to run:  infinite loop or deadlock?"
1290                _ -> case cast ex of
1291                     Just (ErrorCall s) -> s
1292                     _                  -> showsPrec 0 se ""
1293          withCString "%s" $ \cfmt ->
1294           withCString msg $ \cmsg ->
1295             errorBelch cfmt cmsg
1296
1297 -- don't use errorBelch() directly, because we cannot call varargs functions
1298 -- using the FFI.
1299 foreign import ccall unsafe "HsBase.h errorBelch2"
1300    errorBelch :: CString -> CString -> IO ()
1301
1302 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1303 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1304
1305 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1306 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
1307 \end{code}