Add back imports needed on Windows
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
6 -- |
7 -- Module      :  GHC.Conc
8 -- Copyright   :  (c) The University of Glasgow, 1994-2002
9 -- License     :  see libraries/base/LICENSE
10 -- 
11 -- Maintainer  :  cvs-ghc@haskell.org
12 -- Stability   :  internal
13 -- Portability :  non-portable (GHC extensions)
14 --
15 -- Basic concurrency stuff.
16 -- 
17 -----------------------------------------------------------------------------
18
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home.  Hence:
23
24 #include "Typeable.h"
25
26 -- #not-home
27 module GHC.Conc
28         ( ThreadId(..)
29
30         -- * Forking and suchlike
31         , forkIO        -- :: IO a -> IO ThreadId
32         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
33         , numCapabilities -- :: Int
34         , childHandler  -- :: Exception -> IO ()
35         , myThreadId    -- :: IO ThreadId
36         , killThread    -- :: ThreadId -> IO ()
37         , throwTo       -- :: ThreadId -> Exception -> IO ()
38         , par           -- :: a -> b -> b
39         , pseq          -- :: a -> b -> b
40         , runSparks
41         , yield         -- :: IO ()
42         , labelThread   -- :: ThreadId -> String -> IO ()
43
44         , ThreadStatus(..), BlockReason(..)
45         , threadStatus  -- :: ThreadId -> IO ThreadStatus
46
47         -- * Waiting
48         , threadDelay           -- :: Int -> IO ()
49         , registerDelay         -- :: Int -> IO (TVar Bool)
50         , threadWaitRead        -- :: Int -> IO ()
51         , threadWaitWrite       -- :: Int -> IO ()
52
53         -- * TVars
54         , STM(..)
55         , atomically    -- :: STM a -> IO a
56         , retry         -- :: STM a
57         , orElse        -- :: STM a -> STM a -> STM a
58         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
59         , alwaysSucceeds -- :: STM a -> STM ()
60         , always        -- :: STM Bool -> STM ()
61         , TVar(..)
62         , newTVar       -- :: a -> STM (TVar a)
63         , newTVarIO     -- :: a -> STM (TVar a)
64         , readTVar      -- :: TVar a -> STM a
65         , readTVarIO    -- :: TVar a -> IO a
66         , writeTVar     -- :: a -> TVar a -> STM ()
67         , unsafeIOToSTM -- :: IO a -> STM a
68
69         -- * Miscellaneous
70         , withMVar
71 #ifdef mingw32_HOST_OS
72         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
73         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
74         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
75
76         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
77         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
78 #endif
79
80 #ifndef mingw32_HOST_OS
81         , Signal, HandlerFun, setHandler, runHandlers
82 #endif
83
84         , ensureIOManagerIsRunning
85 #ifndef mingw32_HOST_OS
86         , syncIOManager
87 #endif
88
89 #ifdef mingw32_HOST_OS
90         , ConsoleEvent(..)
91         , win32ConsoleHandler
92         , toWin32ConsoleEvent
93 #endif
94         , setUncaughtExceptionHandler      -- :: (Exception -> IO ()) -> IO ()
95         , getUncaughtExceptionHandler      -- :: IO (Exception -> IO ())
96
97         , reportError, reportStackOverflow
98         ) where
99
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
103 #endif
104 import Foreign
105 import Foreign.C
106
107 #ifdef mingw32_HOST_OS
108 import Data.Typeable
109 #endif
110
111 #ifndef mingw32_HOST_OS
112 import Data.Dynamic
113 import Control.Monad
114 #endif
115 import Data.Maybe
116
117 import GHC.Base
118 import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
119 import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
120 import GHC.IO
121 import GHC.IO.Exception
122 import GHC.Exception
123 import GHC.IORef
124 import GHC.MVar
125 import GHC.Num          ( Num(..) )
126 import GHC.Real         ( fromIntegral )
127 #ifndef mingw32_HOST_OS
128 import GHC.IOArray
129 import GHC.Arr          ( inRange )
130 #endif
131 #ifdef mingw32_HOST_OS
132 import GHC.Real         ( div )
133 import GHC.Ptr
134 #endif
135 #ifdef mingw32_HOST_OS
136 import GHC.Read         ( Read )
137 import GHC.Enum         ( Enum )
138 #endif
139 import GHC.Pack         ( packCString# )
140 import GHC.Show         ( Show(..), showString )
141 import GHC.Err
142
143 infixr 0 `par`, `pseq`
144 \end{code}
145
146 %************************************************************************
147 %*                                                                      *
148 \subsection{@ThreadId@, @par@, and @fork@}
149 %*                                                                      *
150 %************************************************************************
151
152 \begin{code}
153 data ThreadId = ThreadId ThreadId# deriving( Typeable )
154 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
155 -- But since ThreadId# is unlifted, the Weak type must use open
156 -- type variables.
157 {- ^
158 A 'ThreadId' is an abstract type representing a handle to a thread.
159 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
160 the 'Ord' instance implements an arbitrary total ordering over
161 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
162 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
163 useful when debugging or diagnosing the behaviour of a concurrent
164 program.
165
166 /Note/: in GHC, if you have a 'ThreadId', you essentially have
167 a pointer to the thread itself.  This means the thread itself can\'t be
168 garbage collected until you drop the 'ThreadId'.
169 This misfeature will hopefully be corrected at a later date.
170
171 /Note/: Hugs does not provide any operations on other threads;
172 it defines 'ThreadId' as a synonym for ().
173 -}
174
175 instance Show ThreadId where
176    showsPrec d t = 
177         showString "ThreadId " . 
178         showsPrec d (getThreadId (id2TSO t))
179
180 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
181
182 id2TSO :: ThreadId -> ThreadId#
183 id2TSO (ThreadId t) = t
184
185 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
186 -- Returns -1, 0, 1
187
188 cmpThread :: ThreadId -> ThreadId -> Ordering
189 cmpThread t1 t2 = 
190    case cmp_thread (id2TSO t1) (id2TSO t2) of
191       -1 -> LT
192       0  -> EQ
193       _  -> GT -- must be 1
194
195 instance Eq ThreadId where
196    t1 == t2 = 
197       case t1 `cmpThread` t2 of
198          EQ -> True
199          _  -> False
200
201 instance Ord ThreadId where
202    compare = cmpThread
203
204 {- |
205 Sparks off a new thread to run the 'IO' computation passed as the
206 first argument, and returns the 'ThreadId' of the newly created
207 thread.
208
209 The new thread will be a lightweight thread; if you want to use a foreign
210 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
211
212 GHC note: the new thread inherits the /blocked/ state of the parent 
213 (see 'Control.Exception.block').
214
215 The newly created thread has an exception handler that discards the
216 exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
217 'ThreadKilled', and passes all other exceptions to the uncaught
218 exception handler (see 'setUncaughtExceptionHandler').
219 -}
220 forkIO :: IO () -> IO ThreadId
221 forkIO action = IO $ \ s -> 
222    case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
223  where
224   action_plus = catchException action childHandler
225
226 {- |
227 Like 'forkIO', but lets you specify on which CPU the thread is
228 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
229 will stay on the same CPU for its entire lifetime (`forkIO` threads
230 can migrate between CPUs according to the scheduling policy).
231 `forkOnIO` is useful for overriding the scheduling policy when you
232 know in advance how best to distribute the threads.
233
234 The `Int` argument specifies the CPU number; it is interpreted modulo
235 'numCapabilities' (note that it actually specifies a capability number
236 rather than a CPU number, but to a first approximation the two are
237 equivalent).
238 -}
239 forkOnIO :: Int -> IO () -> IO ThreadId
240 forkOnIO (I# cpu) action = IO $ \ s -> 
241    case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
242  where
243   action_plus = catchException action childHandler
244
245 -- | the value passed to the @+RTS -N@ flag.  This is the number of
246 -- Haskell threads that can run truly simultaneously at any given
247 -- time, and is typically set to the number of physical CPU cores on
248 -- the machine.
249 numCapabilities :: Int
250 numCapabilities = unsafePerformIO $  do 
251                     n <- peek n_capabilities
252                     return (fromIntegral n)
253
254 #if defined(mingw32_HOST_OS) && defined(__PIC__)
255 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
256 #else
257 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
258 #endif
259 childHandler :: SomeException -> IO ()
260 childHandler err = catchException (real_handler err) childHandler
261
262 real_handler :: SomeException -> IO ()
263 real_handler se@(SomeException ex) =
264   -- ignore thread GC and killThread exceptions:
265   case cast ex of
266   Just BlockedOnDeadMVar                -> return ()
267   _ -> case cast ex of
268        Just BlockedIndefinitely         -> return ()
269        _ -> case cast ex of
270             Just ThreadKilled           -> return ()
271             _ -> case cast ex of
272                  -- report all others:
273                  Just StackOverflow     -> reportStackOverflow
274                  _                      -> reportError se
275
276 {- | 'killThread' terminates the given thread (GHC only).
277 Any work already done by the thread isn\'t
278 lost: the computation is suspended until required by another thread.
279 The memory used by the thread will be garbage collected if it isn\'t
280 referenced from anywhere.  The 'killThread' function is defined in
281 terms of 'throwTo':
282
283 > killThread tid = throwTo tid ThreadKilled
284
285 Killthread is a no-op if the target thread has already completed.
286 -}
287 killThread :: ThreadId -> IO ()
288 killThread tid = throwTo tid ThreadKilled
289
290 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
291
292 'throwTo' does not return until the exception has been raised in the
293 target thread. 
294 The calling thread can thus be certain that the target
295 thread has received the exception.  This is a useful property to know
296 when dealing with race conditions: eg. if there are two threads that
297 can kill each other, it is guaranteed that only one of the threads
298 will get to kill the other.
299
300 If the target thread is currently making a foreign call, then the
301 exception will not be raised (and hence 'throwTo' will not return)
302 until the call has completed.  This is the case regardless of whether
303 the call is inside a 'block' or not.
304
305 Important note: the behaviour of 'throwTo' differs from that described in
306 the paper \"Asynchronous exceptions in Haskell\"
307 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
308 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
309 a more synchronous design in which 'throwTo' does not return until the exception
310 is received by the target thread.  The trade-off is discussed in Section 9 of the paper.
311 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
312 the paper).
313
314 There is currently no guarantee that the exception delivered by 'throwTo' will be
315 delivered at the first possible opportunity.  In particular, a thread may 
316 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
317 a pending 'throwTo'.  This is arguably undesirable behaviour.
318
319  -}
320 throwTo :: Exception e => ThreadId -> e -> IO ()
321 throwTo (ThreadId tid) ex = IO $ \ s ->
322    case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
323
324 -- | Returns the 'ThreadId' of the calling thread (GHC only).
325 myThreadId :: IO ThreadId
326 myThreadId = IO $ \s ->
327    case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
328
329
330 -- |The 'yield' action allows (forces, in a co-operative multitasking
331 -- implementation) a context-switch to any other currently runnable
332 -- threads (if any), and is occasionally useful when implementing
333 -- concurrency abstractions.
334 yield :: IO ()
335 yield = IO $ \s -> 
336    case (yield# s) of s1 -> (# s1, () #)
337
338 {- | 'labelThread' stores a string as identifier for this thread if
339 you built a RTS with debugging support. This identifier will be used in
340 the debugging output to make distinction of different threads easier
341 (otherwise you only have the thread state object\'s address in the heap).
342
343 Other applications like the graphical Concurrent Haskell Debugger
344 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
345 'labelThread' for their purposes as well.
346 -}
347
348 labelThread :: ThreadId -> String -> IO ()
349 labelThread (ThreadId t) str = IO $ \ s ->
350    let !ps  = packCString# str
351        !adr = byteArrayContents# ps in
352      case (labelThread# t adr s) of s1 -> (# s1, () #)
353
354 --      Nota Bene: 'pseq' used to be 'seq'
355 --                 but 'seq' is now defined in PrelGHC
356 --
357 -- "pseq" is defined a bit weirdly (see below)
358 --
359 -- The reason for the strange "lazy" call is that
360 -- it fools the compiler into thinking that pseq  and par are non-strict in
361 -- their second argument (even if it inlines pseq at the call site).
362 -- If it thinks pseq is strict in "y", then it often evaluates
363 -- "y" before "x", which is totally wrong.  
364
365 {-# INLINE pseq  #-}
366 pseq :: a -> b -> b
367 pseq  x y = x `seq` lazy y
368
369 {-# INLINE par  #-}
370 par :: a -> b -> b
371 par  x y = case (par# x) of { _ -> lazy y }
372
373 -- | Internal function used by the RTS to run sparks.
374 runSparks :: IO ()
375 runSparks = IO loop
376   where loop s = case getSpark# s of
377                    (# s', n, p #) ->
378                       if n ==# 0# then (# s', () #)
379                                   else p `seq` loop s'
380
381 data BlockReason
382   = BlockedOnMVar
383         -- ^blocked on on 'MVar'
384   | BlockedOnBlackHole
385         -- ^blocked on a computation in progress by another thread
386   | BlockedOnException
387         -- ^blocked in 'throwTo'
388   | BlockedOnSTM
389         -- ^blocked in 'retry' in an STM transaction
390   | BlockedOnForeignCall
391         -- ^currently in a foreign call
392   | BlockedOnOther
393         -- ^blocked on some other resource.  Without @-threaded@,
394         -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
395         -- they show up as 'BlockedOnMVar'.
396   deriving (Eq,Ord,Show)
397
398 -- | The current status of a thread
399 data ThreadStatus
400   = ThreadRunning
401         -- ^the thread is currently runnable or running
402   | ThreadFinished
403         -- ^the thread has finished
404   | ThreadBlocked  BlockReason
405         -- ^the thread is blocked on some resource
406   | ThreadDied
407         -- ^the thread received an uncaught exception
408   deriving (Eq,Ord,Show)
409
410 threadStatus :: ThreadId -> IO ThreadStatus
411 threadStatus (ThreadId t) = IO $ \s ->
412    case threadStatus# t s of
413      (# s', stat #) -> (# s', mk_stat (I# stat) #)
414    where
415         -- NB. keep these in sync with includes/Constants.h
416      mk_stat 0  = ThreadRunning
417      mk_stat 1  = ThreadBlocked BlockedOnMVar
418      mk_stat 2  = ThreadBlocked BlockedOnBlackHole
419      mk_stat 3  = ThreadBlocked BlockedOnException
420      mk_stat 7  = ThreadBlocked BlockedOnSTM
421      mk_stat 11 = ThreadBlocked BlockedOnForeignCall
422      mk_stat 12 = ThreadBlocked BlockedOnForeignCall
423      mk_stat 16 = ThreadFinished
424      mk_stat 17 = ThreadDied
425      mk_stat _  = ThreadBlocked BlockedOnOther
426 \end{code}
427
428
429 %************************************************************************
430 %*                                                                      *
431 \subsection[stm]{Transactional heap operations}
432 %*                                                                      *
433 %************************************************************************
434
435 TVars are shared memory locations which support atomic memory
436 transactions.
437
438 \begin{code}
439 -- |A monad supporting atomic memory transactions.
440 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
441
442 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
443 unSTM (STM a) = a
444
445 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
446
447 instance  Functor STM where
448    fmap f x = x >>= (return . f)
449
450 instance  Monad STM  where
451     {-# INLINE return #-}
452     {-# INLINE (>>)   #-}
453     {-# INLINE (>>=)  #-}
454     m >> k      = thenSTM m k
455     return x    = returnSTM x
456     m >>= k     = bindSTM m k
457
458 bindSTM :: STM a -> (a -> STM b) -> STM b
459 bindSTM (STM m) k = STM ( \s ->
460   case m s of 
461     (# new_s, a #) -> unSTM (k a) new_s
462   )
463
464 thenSTM :: STM a -> STM b -> STM b
465 thenSTM (STM m) k = STM ( \s ->
466   case m s of 
467     (# new_s, _ #) -> unSTM k new_s
468   )
469
470 returnSTM :: a -> STM a
471 returnSTM x = STM (\s -> (# s, x #))
472
473 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
474 -- dangerous thing to do.  
475 --
476 --   * The STM implementation will often run transactions multiple
477 --     times, so you need to be prepared for this if your IO has any
478 --     side effects.
479 --
480 --   * The STM implementation will abort transactions that are known to
481 --     be invalid and need to be restarted.  This may happen in the middle
482 --     of `unsafeIOToSTM`, so make sure you don't acquire any resources
483 --     that need releasing (exception handlers are ignored when aborting
484 --     the transaction).  That includes doing any IO using Handles, for
485 --     example.  Getting this wrong will probably lead to random deadlocks.
486 --
487 --   * The transaction may have seen an inconsistent view of memory when
488 --     the IO runs.  Invariants that you expect to be true throughout
489 --     your program may not be true inside a transaction, due to the
490 --     way transactions are implemented.  Normally this wouldn't be visible
491 --     to the programmer, but using `unsafeIOToSTM` can expose it.
492 --
493 unsafeIOToSTM :: IO a -> STM a
494 unsafeIOToSTM (IO m) = STM m
495
496 -- |Perform a series of STM actions atomically.
497 --
498 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
499 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
500 -- this would effectively allow a transaction inside a transaction, depending
501 -- on exactly when the thunk is evaluated.)
502 --
503 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
504 -- and which allows top-level TVars to be allocated.
505
506 atomically :: STM a -> IO a
507 atomically (STM m) = IO (\s -> (atomically# m) s )
508
509 -- |Retry execution of the current memory transaction because it has seen
510 -- values in TVars which mean that it should not continue (e.g. the TVars
511 -- represent a shared buffer that is now empty).  The implementation may
512 -- block the thread until one of the TVars that it has read from has been
513 -- udpated. (GHC only)
514 retry :: STM a
515 retry = STM $ \s# -> retry# s#
516
517 -- |Compose two alternative STM actions (GHC only).  If the first action
518 -- completes without retrying then it forms the result of the orElse.
519 -- Otherwise, if the first action retries, then the second action is
520 -- tried in its place.  If both actions retry then the orElse as a
521 -- whole retries.
522 orElse :: STM a -> STM a -> STM a
523 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
524
525 -- |Exception handling within STM actions.
526 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
527 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
528
529 -- | Low-level primitive on which always and alwaysSucceeds are built.
530 -- checkInv differs form these in that (i) the invariant is not 
531 -- checked when checkInv is called, only at the end of this and
532 -- subsequent transcations, (ii) the invariant failure is indicated
533 -- by raising an exception.
534 checkInv :: STM a -> STM ()
535 checkInv (STM m) = STM (\s -> (check# m) s)
536
537 -- | alwaysSucceeds adds a new invariant that must be true when passed
538 -- to alwaysSucceeds, at the end of the current transaction, and at
539 -- the end of every subsequent transaction.  If it fails at any
540 -- of those points then the transaction violating it is aborted
541 -- and the exception raised by the invariant is propagated.
542 alwaysSucceeds :: STM a -> STM ()
543 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
544                       checkInv i
545
546 -- | always is a variant of alwaysSucceeds in which the invariant is
547 -- expressed as an STM Bool action that must return True.  Returning
548 -- False or raising an exception are both treated as invariant failures.
549 always :: STM Bool -> STM ()
550 always i = alwaysSucceeds ( do v <- i
551                                if (v) then return () else ( error "Transacional invariant violation" ) )
552
553 -- |Shared memory locations that support atomic memory transactions.
554 data TVar a = TVar (TVar# RealWorld a)
555
556 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
557
558 instance Eq (TVar a) where
559         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
560
561 -- |Create a new TVar holding a value supplied
562 newTVar :: a -> STM (TVar a)
563 newTVar val = STM $ \s1# ->
564     case newTVar# val s1# of
565          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
566
567 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
568 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
569 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
570 -- possible.
571 newTVarIO :: a -> IO (TVar a)
572 newTVarIO val = IO $ \s1# ->
573     case newTVar# val s1# of
574          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
575
576 -- |Return the current value stored in a TVar.
577 -- This is equivalent to
578 --
579 -- >  readTVarIO = atomically . readTVar
580 --
581 -- but works much faster, because it doesn't perform a complete
582 -- transaction, it just reads the current value of the 'TVar'.
583 readTVarIO :: TVar a -> IO a
584 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
585
586 -- |Return the current value stored in a TVar
587 readTVar :: TVar a -> STM a
588 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
589
590 -- |Write the supplied value into a TVar
591 writeTVar :: TVar a -> a -> STM ()
592 writeTVar (TVar tvar#) val = STM $ \s1# ->
593     case writeTVar# tvar# val s1# of
594          s2# -> (# s2#, () #)
595   
596 \end{code}
597
598 MVar utilities
599
600 \begin{code}
601 withMVar :: MVar a -> (a -> IO b) -> IO b
602 withMVar m io = 
603   block $ do
604     a <- takeMVar m
605     b <- catchAny (unblock (io a))
606             (\e -> do putMVar m a; throw e)
607     putMVar m a
608     return b
609 \end{code}
610
611 %************************************************************************
612 %*                                                                      *
613 \subsection{Thread waiting}
614 %*                                                                      *
615 %************************************************************************
616
617 \begin{code}
618 #ifdef mingw32_HOST_OS
619
620 -- Note: threadWaitRead and threadWaitWrite aren't really functional
621 -- on Win32, but left in there because lib code (still) uses them (the manner
622 -- in which they're used doesn't cause problems on a Win32 platform though.)
623
624 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
625 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
626   IO $ \s -> case asyncRead# fd isSock len buf s of 
627                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
628
629 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
630 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
631   IO $ \s -> case asyncWrite# fd isSock len buf s of 
632                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
633
634 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
635 asyncDoProc (FunPtr proc) (Ptr param) = 
636     -- the 'length' value is ignored; simplifies implementation of
637     -- the async*# primops to have them all return the same result.
638   IO $ \s -> case asyncDoProc# proc param s  of 
639                (# s', _len#, err# #) -> (# s', I# err# #)
640
641 -- to aid the use of these primops by the IO Handle implementation,
642 -- provide the following convenience funs:
643
644 -- this better be a pinned byte array!
645 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
646 asyncReadBA fd isSock len off bufB = 
647   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
648   
649 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
650 asyncWriteBA fd isSock len off bufB = 
651   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
652
653 #endif
654
655 -- -----------------------------------------------------------------------------
656 -- Thread IO API
657
658 -- | Block the current thread until data is available to read on the
659 -- given file descriptor (GHC only).
660 threadWaitRead :: Fd -> IO ()
661 threadWaitRead fd
662 #ifndef mingw32_HOST_OS
663   | threaded  = waitForReadEvent fd
664 #endif
665   | otherwise = IO $ \s -> 
666         case fromIntegral fd of { I# fd# ->
667         case waitRead# fd# s of { s' -> (# s', () #)
668         }}
669
670 -- | Block the current thread until data can be written to the
671 -- given file descriptor (GHC only).
672 threadWaitWrite :: Fd -> IO ()
673 threadWaitWrite fd
674 #ifndef mingw32_HOST_OS
675   | threaded  = waitForWriteEvent fd
676 #endif
677   | otherwise = IO $ \s -> 
678         case fromIntegral fd of { I# fd# ->
679         case waitWrite# fd# s of { s' -> (# s', () #)
680         }}
681
682 -- | Suspends the current thread for a given number of microseconds
683 -- (GHC only).
684 --
685 -- There is no guarantee that the thread will be rescheduled promptly
686 -- when the delay has expired, but the thread will never continue to
687 -- run /earlier/ than specified.
688 --
689 threadDelay :: Int -> IO ()
690 threadDelay time
691   | threaded  = waitForDelayEvent time
692   | otherwise = IO $ \s -> 
693         case fromIntegral time of { I# time# ->
694         case delay# time# s of { s' -> (# s', () #)
695         }}
696
697
698 -- | Set the value of returned TVar to True after a given number of
699 -- microseconds. The caveats associated with threadDelay also apply.
700 --
701 registerDelay :: Int -> IO (TVar Bool)
702 registerDelay usecs 
703   | threaded = waitForDelayEventSTM usecs
704   | otherwise = error "registerDelay: requires -threaded"
705
706 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
707
708 waitForDelayEvent :: Int -> IO ()
709 waitForDelayEvent usecs = do
710   m <- newEmptyMVar
711   target <- calculateTarget usecs
712   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
713   prodServiceThread
714   takeMVar m
715
716 -- Delays for use in STM
717 waitForDelayEventSTM :: Int -> IO (TVar Bool)
718 waitForDelayEventSTM usecs = do
719    t <- atomically $ newTVar False
720    target <- calculateTarget usecs
721    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
722    prodServiceThread
723    return t  
724     
725 calculateTarget :: Int -> IO USecs
726 calculateTarget usecs = do
727     now <- getUSecOfDay
728     return $ now + (fromIntegral usecs)
729
730
731 -- ----------------------------------------------------------------------------
732 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
733
734 -- In the threaded RTS, we employ a single IO Manager thread to wait
735 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
736 -- and delays (threadDelay).  
737 --
738 -- We can do this because in the threaded RTS the IO Manager can make
739 -- a non-blocking call to select(), so we don't have to do select() in
740 -- the scheduler as we have to in the non-threaded RTS.  We get performance
741 -- benefits from doing it this way, because we only have to restart the select()
742 -- when a new request arrives, rather than doing one select() each time
743 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
744 -- by not having to check for completed IO requests.
745
746 -- Issues, possible problems:
747 --
748 --      - we might want bound threads to just do the blocking
749 --        operation rather than communicating with the IO manager
750 --        thread.  This would prevent simgle-threaded programs which do
751 --        IO from requiring multiple OS threads.  However, it would also
752 --        prevent bound threads waiting on IO from being killed or sent
753 --        exceptions.
754 --
755 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
756 --        I couldn't repeat this.
757 --
758 --      - How do we handle signal delivery in the multithreaded RTS?
759 --
760 --      - forkProcess will kill the IO manager thread.  Let's just
761 --        hope we don't need to do any blocking IO between fork & exec.
762
763 #ifndef mingw32_HOST_OS
764 data IOReq
765   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
766   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
767 #endif
768
769 data DelayReq
770   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
771   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
772
773 #ifndef mingw32_HOST_OS
774 pendingEvents :: IORef [IOReq]
775 #endif
776 pendingDelays :: IORef [DelayReq]
777         -- could use a strict list or array here
778 {-# NOINLINE pendingEvents #-}
779 {-# NOINLINE pendingDelays #-}
780 (pendingEvents,pendingDelays) = unsafePerformIO $ do
781   startIOManagerThread
782   reqs <- newIORef []
783   dels <- newIORef []
784   return (reqs, dels)
785         -- the first time we schedule an IO request, the service thread
786         -- will be created (cool, huh?)
787
788 ensureIOManagerIsRunning :: IO ()
789 ensureIOManagerIsRunning 
790   | threaded  = seq pendingEvents $ return ()
791   | otherwise = return ()
792
793 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
794 insertDelay d [] = [d]
795 insertDelay d1 ds@(d2 : rest)
796   | delayTime d1 <= delayTime d2 = d1 : ds
797   | otherwise                    = d2 : insertDelay d1 rest
798
799 delayTime :: DelayReq -> USecs
800 delayTime (Delay t _) = t
801 delayTime (DelaySTM t _) = t
802
803 type USecs = Word64
804
805 foreign import ccall unsafe "getUSecOfDay" 
806   getUSecOfDay :: IO USecs
807
808 prodding :: IORef Bool
809 {-# NOINLINE prodding #-}
810 prodding = unsafePerformIO (newIORef False)
811
812 prodServiceThread :: IO ()
813 prodServiceThread = do
814   was_set <- atomicModifyIORef prodding (\a -> (True,a))
815   if (not (was_set)) then wakeupIOManager else return ()
816
817 #ifdef mingw32_HOST_OS
818 -- ----------------------------------------------------------------------------
819 -- Windows IO manager thread
820
821 startIOManagerThread :: IO ()
822 startIOManagerThread = do
823   wakeup <- c_getIOManagerEvent
824   forkIO $ service_loop wakeup []
825   return ()
826
827 service_loop :: HANDLE          -- read end of pipe
828              -> [DelayReq]      -- current delay requests
829              -> IO ()
830
831 service_loop wakeup old_delays = do
832   -- pick up new delay requests
833   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
834   let  delays = foldr insertDelay old_delays new_delays
835
836   now <- getUSecOfDay
837   (delays', timeout) <- getDelay now delays
838
839   r <- c_WaitForSingleObject wakeup timeout
840   case r of
841     0xffffffff -> do c_maperrno; throwErrno "service_loop"
842     0 -> do
843         r2 <- c_readIOManagerEvent
844         exit <- 
845               case r2 of
846                 _ | r2 == io_MANAGER_WAKEUP -> return False
847                 _ | r2 == io_MANAGER_DIE    -> return True
848                 0 -> return False -- spurious wakeup
849                 _ -> do start_console_handler (r2 `shiftR` 1); return False
850         if exit
851           then return ()
852           else service_cont wakeup delays'
853
854     _other -> service_cont wakeup delays' -- probably timeout        
855
856 service_cont :: HANDLE -> [DelayReq] -> IO ()
857 service_cont wakeup delays = do
858   r <- atomicModifyIORef prodding (\_ -> (False,False))
859   r `seq` return () -- avoid space leak
860   service_loop wakeup delays
861
862 -- must agree with rts/win32/ThrIOManager.c
863 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
864 io_MANAGER_WAKEUP = 0xffffffff
865 io_MANAGER_DIE    = 0xfffffffe
866
867 data ConsoleEvent
868  = ControlC
869  | Break
870  | Close
871     -- these are sent to Services only.
872  | Logoff
873  | Shutdown
874  deriving (Eq, Ord, Enum, Show, Read, Typeable)
875
876 start_console_handler :: Word32 -> IO ()
877 start_console_handler r =
878   case toWin32ConsoleEvent r of
879      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
880                     forkIO (handler x)
881                     return ()
882      Nothing -> return ()
883
884 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
885 toWin32ConsoleEvent ev = 
886    case ev of
887        0 {- CTRL_C_EVENT-}        -> Just ControlC
888        1 {- CTRL_BREAK_EVENT-}    -> Just Break
889        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
890        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
891        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
892        _ -> Nothing
893
894 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
895 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
896
897 -- XXX Is this actually needed?
898 stick :: IORef HANDLE
899 {-# NOINLINE stick #-}
900 stick = unsafePerformIO (newIORef nullPtr)
901
902 wakeupIOManager :: IO ()
903 wakeupIOManager = do 
904   _hdl <- readIORef stick
905   c_sendIOManagerEvent io_MANAGER_WAKEUP
906
907 -- Walk the queue of pending delays, waking up any that have passed
908 -- and return the smallest delay to wait for.  The queue of pending
909 -- delays is kept ordered.
910 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
911 getDelay _   [] = return ([], iNFINITE)
912 getDelay now all@(d : rest) 
913   = case d of
914      Delay time m | now >= time -> do
915         putMVar m ()
916         getDelay now rest
917      DelaySTM time t | now >= time -> do
918         atomically $ writeTVar t True
919         getDelay now rest
920      _otherwise ->
921         -- delay is in millisecs for WaitForSingleObject
922         let micro_seconds = delayTime d - now
923             milli_seconds = (micro_seconds + 999) `div` 1000
924         in return (all, fromIntegral milli_seconds)
925
926 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
927 -- available yet.  We should move some Win32 functionality down here,
928 -- maybe as part of the grand reorganisation of the base package...
929 type HANDLE       = Ptr ()
930 type DWORD        = Word32
931
932 iNFINITE :: DWORD
933 iNFINITE = 0xFFFFFFFF -- urgh
934
935 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
936   c_getIOManagerEvent :: IO HANDLE
937
938 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
939   c_readIOManagerEvent :: IO Word32
940
941 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
942   c_sendIOManagerEvent :: Word32 -> IO ()
943
944 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
945    c_maperrno :: IO ()
946
947 foreign import stdcall "WaitForSingleObject"
948    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
949
950 #else
951 -- ----------------------------------------------------------------------------
952 -- Unix IO manager thread, using select()
953
954 startIOManagerThread :: IO ()
955 startIOManagerThread = do
956         allocaArray 2 $ \fds -> do
957         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
958         rd_end <- peekElemOff fds 0
959         wr_end <- peekElemOff fds 1
960         setNonBlockingFD wr_end True -- writes happen in a signal handler, we
961                                      -- don't want them to block.
962         setCloseOnExec rd_end
963         setCloseOnExec wr_end
964         writeIORef stick (fromIntegral wr_end)
965         c_setIOManagerPipe wr_end
966         forkIO $ do
967             allocaBytes sizeofFdSet   $ \readfds -> do
968             allocaBytes sizeofFdSet   $ \writefds -> do 
969             allocaBytes sizeofTimeVal $ \timeval -> do
970             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
971         return ()
972
973 service_loop
974    :: Fd                -- listen to this for wakeup calls
975    -> Ptr CFdSet
976    -> Ptr CFdSet
977    -> Ptr CTimeVal
978    -> [IOReq]
979    -> [DelayReq]
980    -> IO ()
981 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
982
983   -- pick up new IO requests
984   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
985   let reqs = new_reqs ++ old_reqs
986
987   -- pick up new delay requests
988   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
989   let  delays0 = foldr insertDelay old_delays new_delays
990
991   -- build the FDSets for select()
992   fdZero readfds
993   fdZero writefds
994   fdSet wakeup readfds
995   maxfd <- buildFdSets 0 readfds writefds reqs
996
997   -- perform the select()
998   let do_select delays = do
999           -- check the current time and wake up any thread in
1000           -- threadDelay whose timeout has expired.  Also find the
1001           -- timeout value for the select() call.
1002           now <- getUSecOfDay
1003           (delays', timeout) <- getDelay now ptimeval delays
1004
1005           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
1006                         nullPtr timeout
1007           if (res == -1)
1008              then do
1009                 err <- getErrno
1010                 case err of
1011                   _ | err == eINTR ->  do_select delays'
1012                         -- EINTR: just redo the select()
1013                   _ | err == eBADF ->  return (True, delays)
1014                         -- EBADF: one of the file descriptors is closed or bad,
1015                         -- we don't know which one, so wake everyone up.
1016                   _ | otherwise    ->  throwErrno "select"
1017                         -- otherwise (ENOMEM or EINVAL) something has gone
1018                         -- wrong; report the error.
1019              else
1020                 return (False,delays')
1021
1022   (wakeup_all,delays') <- do_select delays0
1023
1024   exit <-
1025     if wakeup_all then return False
1026       else do
1027         b <- fdIsSet wakeup readfds
1028         if b == 0 
1029           then return False
1030           else alloca $ \p -> do 
1031                  c_read (fromIntegral wakeup) p 1
1032                  s <- peek p            
1033                  case s of
1034                   _ | s == io_MANAGER_WAKEUP -> return False
1035                   _ | s == io_MANAGER_DIE    -> return True
1036                   _ | s == io_MANAGER_SYNC   -> do
1037                        mvars <- readIORef sync
1038                        mapM_ (flip putMVar ()) mvars
1039                        return False
1040                   _ -> do
1041                        fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1042                        withForeignPtr fp $ \p_siginfo -> do
1043                          r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1044                                  sizeof_siginfo_t
1045                          when (r /= fromIntegral sizeof_siginfo_t) $
1046                             error "failed to read siginfo_t"
1047                        runHandlers' fp (fromIntegral s)
1048                        return False
1049
1050   if exit then return () else do
1051
1052   atomicModifyIORef prodding (\_ -> (False,False))
1053
1054   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1055                          else completeRequests reqs readfds writefds []
1056
1057   service_loop wakeup readfds writefds ptimeval reqs' delays'
1058
1059 io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8
1060 io_MANAGER_WAKEUP = 0xff
1061 io_MANAGER_DIE    = 0xfe
1062 io_MANAGER_SYNC   = 0xfd
1063
1064 -- | the stick is for poking the IO manager with
1065 stick :: IORef Fd
1066 {-# NOINLINE stick #-}
1067 stick = unsafePerformIO (newIORef 0)
1068
1069 {-# NOINLINE sync #-}
1070 sync :: IORef [MVar ()]
1071 sync = unsafePerformIO (newIORef [])
1072
1073 -- waits for the IO manager to drain the pipe
1074 syncIOManager :: IO ()
1075 syncIOManager = do
1076   m <- newEmptyMVar
1077   atomicModifyIORef sync (\old -> (m:old,()))
1078   fd <- readIORef stick
1079   with io_MANAGER_SYNC $ \pbuf -> do 
1080     c_write (fromIntegral fd) pbuf 1; return ()
1081   takeMVar m
1082
1083 wakeupIOManager :: IO ()
1084 wakeupIOManager = do
1085   fd <- readIORef stick
1086   with io_MANAGER_WAKEUP $ \pbuf -> do 
1087     c_write (fromIntegral fd) pbuf 1; return ()
1088
1089 -- For the non-threaded RTS
1090 runHandlers :: Ptr Word8 -> Int -> IO ()
1091 runHandlers p_info sig = do
1092   fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1093   withForeignPtr fp $ \p -> do
1094     copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1095     free p_info
1096   runHandlers' fp (fromIntegral sig)
1097
1098 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1099 runHandlers' p_info sig = do
1100   let int = fromIntegral sig
1101   withMVar signal_handlers $ \arr ->
1102       if not (inRange (boundsIOArray arr) int)
1103          then return ()
1104          else do handler <- unsafeReadIOArray arr int
1105                  case handler of
1106                     Nothing -> return ()
1107                     Just (f,_)  -> do forkIO (f p_info); return ()
1108
1109 foreign import ccall "setIOManagerPipe"
1110   c_setIOManagerPipe :: CInt -> IO ()
1111
1112 foreign import ccall "__hscore_sizeof_siginfo_t"
1113   sizeof_siginfo_t :: CSize
1114
1115 type Signal = CInt
1116
1117 maxSig = 64 :: Int
1118
1119 type HandlerFun = ForeignPtr Word8 -> IO ()
1120
1121 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1122 -- this race condition is #1922, although that bug was on Windows a similar
1123 -- bug also exists on Unix.
1124 {-# NOINLINE signal_handlers #-}
1125 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1126 signal_handlers = unsafePerformIO $ do
1127    arr <- newIOArray (0,maxSig) Nothing
1128    m <- newMVar arr
1129    block $ do
1130      stable_ref <- newStablePtr m
1131      let ref = castStablePtrToPtr stable_ref
1132      ref2 <- getOrSetSignalHandlerStore ref
1133      if ref==ref2
1134         then return m
1135         else do freeStablePtr stable_ref
1136                 deRefStablePtr (castPtrToStablePtr ref2)
1137
1138 foreign import ccall unsafe "getOrSetSignalHandlerStore"
1139     getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a)
1140
1141 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1142 setHandler sig handler = do
1143   let int = fromIntegral sig
1144   withMVar signal_handlers $ \arr -> 
1145      if not (inRange (boundsIOArray arr) int)
1146         then error "GHC.Conc.setHandler: signal out of range"
1147         else do old <- unsafeReadIOArray arr int
1148                 unsafeWriteIOArray arr int handler
1149                 return old
1150
1151 -- -----------------------------------------------------------------------------
1152 -- IO requests
1153
1154 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1155 buildFdSets maxfd _       _        [] = return maxfd
1156 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1157   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1158   | otherwise        =  do
1159         fdSet fd readfds
1160         buildFdSets (max maxfd fd) readfds writefds reqs
1161 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1162   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1163   | otherwise        =  do
1164         fdSet fd writefds
1165         buildFdSets (max maxfd fd) readfds writefds reqs
1166
1167 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1168                  -> IO [IOReq]
1169 completeRequests [] _ _ reqs' = return reqs'
1170 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1171   b <- fdIsSet fd readfds
1172   if b /= 0
1173     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1174     else completeRequests reqs readfds writefds (Read fd m : reqs')
1175 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1176   b <- fdIsSet fd writefds
1177   if b /= 0
1178     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1179     else completeRequests reqs readfds writefds (Write fd m : reqs')
1180
1181 wakeupAll :: [IOReq] -> IO ()
1182 wakeupAll [] = return ()
1183 wakeupAll (Read  _ m : reqs) = do putMVar m (); wakeupAll reqs
1184 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1185
1186 waitForReadEvent :: Fd -> IO ()
1187 waitForReadEvent fd = do
1188   m <- newEmptyMVar
1189   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1190   prodServiceThread
1191   takeMVar m
1192
1193 waitForWriteEvent :: Fd -> IO ()
1194 waitForWriteEvent fd = do
1195   m <- newEmptyMVar
1196   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1197   prodServiceThread
1198   takeMVar m
1199
1200 -- -----------------------------------------------------------------------------
1201 -- Delays
1202
1203 -- Walk the queue of pending delays, waking up any that have passed
1204 -- and return the smallest delay to wait for.  The queue of pending
1205 -- delays is kept ordered.
1206 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1207 getDelay _   _        [] = return ([],nullPtr)
1208 getDelay now ptimeval all@(d : rest) 
1209   = case d of
1210      Delay time m | now >= time -> do
1211         putMVar m ()
1212         getDelay now ptimeval rest
1213      DelaySTM time t | now >= time -> do
1214         atomically $ writeTVar t True
1215         getDelay now ptimeval rest
1216      _otherwise -> do
1217         setTimevalTicks ptimeval (delayTime d - now)
1218         return (all,ptimeval)
1219
1220 data CTimeVal
1221
1222 foreign import ccall unsafe "sizeofTimeVal"
1223   sizeofTimeVal :: Int
1224
1225 foreign import ccall unsafe "setTimevalTicks" 
1226   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1227
1228 {- 
1229   On Win32 we're going to have a single Pipe, and a
1230   waitForSingleObject with the delay time.  For signals, we send a
1231   byte down the pipe just like on Unix.
1232 -}
1233
1234 -- ----------------------------------------------------------------------------
1235 -- select() interface
1236
1237 -- ToDo: move to System.Posix.Internals?
1238
1239 data CFdSet
1240
1241 foreign import ccall safe "select"
1242   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1243            -> IO CInt
1244
1245 foreign import ccall unsafe "hsFD_SETSIZE"
1246   c_fD_SETSIZE :: CInt
1247
1248 fD_SETSIZE :: Fd
1249 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1250
1251 foreign import ccall unsafe "hsFD_ISSET"
1252   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1253
1254 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1255 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1256
1257 foreign import ccall unsafe "hsFD_SET"
1258   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1259
1260 fdSet :: Fd -> Ptr CFdSet -> IO ()
1261 fdSet (Fd fd) fdset = c_fdSet fd fdset
1262
1263 foreign import ccall unsafe "hsFD_ZERO"
1264   fdZero :: Ptr CFdSet -> IO ()
1265
1266 foreign import ccall unsafe "sizeof_fd_set"
1267   sizeofFdSet :: Int
1268
1269 #endif
1270
1271 reportStackOverflow :: IO a
1272 reportStackOverflow = do callStackOverflowHook; return undefined
1273
1274 reportError :: SomeException -> IO a
1275 reportError ex = do
1276    handler <- getUncaughtExceptionHandler
1277    handler ex
1278    return undefined
1279
1280 -- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
1281 -- the unsafe below.
1282 foreign import ccall unsafe "stackOverflow"
1283         callStackOverflowHook :: IO ()
1284
1285 {-# NOINLINE uncaughtExceptionHandler #-}
1286 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1287 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1288    where
1289       defaultHandler :: SomeException -> IO ()
1290       defaultHandler se@(SomeException ex) = do
1291          (hFlush stdout) `catchAny` (\ _ -> return ())
1292          let msg = case cast ex of
1293                Just Deadlock -> "no threads to run:  infinite loop or deadlock?"
1294                _ -> case cast ex of
1295                     Just (ErrorCall s) -> s
1296                     _                  -> showsPrec 0 se ""
1297          withCString "%s" $ \cfmt ->
1298           withCString msg $ \cmsg ->
1299             errorBelch cfmt cmsg
1300
1301 -- don't use errorBelch() directly, because we cannot call varargs functions
1302 -- using the FFI.
1303 foreign import ccall unsafe "HsBase.h errorBelch2"
1304    errorBelch :: CString -> CString -> IO ()
1305
1306 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1307 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1308
1309 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1310 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
1311 \end{code}