add readTVarIO :: TVar a -> IO a
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
6 -- |
7 -- Module      :  GHC.Conc
8 -- Copyright   :  (c) The University of Glasgow, 1994-2002
9 -- License     :  see libraries/base/LICENSE
10 -- 
11 -- Maintainer  :  cvs-ghc@haskell.org
12 -- Stability   :  internal
13 -- Portability :  non-portable (GHC extensions)
14 --
15 -- Basic concurrency stuff.
16 -- 
17 -----------------------------------------------------------------------------
18
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home.  Hence:
23
24 #include "Typeable.h"
25
26 -- #not-home
27 module GHC.Conc
28         ( ThreadId(..)
29
30         -- * Forking and suchlike
31         , forkIO        -- :: IO a -> IO ThreadId
32         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
33         , numCapabilities -- :: Int
34         , childHandler  -- :: Exception -> IO ()
35         , myThreadId    -- :: IO ThreadId
36         , killThread    -- :: ThreadId -> IO ()
37         , throwTo       -- :: ThreadId -> Exception -> IO ()
38         , par           -- :: a -> b -> b
39         , pseq          -- :: a -> b -> b
40         , yield         -- :: IO ()
41         , labelThread   -- :: ThreadId -> String -> IO ()
42
43         , ThreadStatus(..), BlockReason(..)
44         , threadStatus  -- :: ThreadId -> IO ThreadStatus
45
46         -- * Waiting
47         , threadDelay           -- :: Int -> IO ()
48         , registerDelay         -- :: Int -> IO (TVar Bool)
49         , threadWaitRead        -- :: Int -> IO ()
50         , threadWaitWrite       -- :: Int -> IO ()
51
52         -- * MVars
53         , MVar(..)
54         , newMVar       -- :: a -> IO (MVar a)
55         , newEmptyMVar  -- :: IO (MVar a)
56         , takeMVar      -- :: MVar a -> IO a
57         , putMVar       -- :: MVar a -> a -> IO ()
58         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
59         , tryPutMVar    -- :: MVar a -> a -> IO Bool
60         , isEmptyMVar   -- :: MVar a -> IO Bool
61         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
62
63         -- * TVars
64         , STM(..)
65         , atomically    -- :: STM a -> IO a
66         , retry         -- :: STM a
67         , orElse        -- :: STM a -> STM a -> STM a
68         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
69         , alwaysSucceeds -- :: STM a -> STM ()
70         , always        -- :: STM Bool -> STM ()
71         , TVar(..)
72         , newTVar       -- :: a -> STM (TVar a)
73         , newTVarIO     -- :: a -> STM (TVar a)
74         , readTVar      -- :: TVar a -> STM a
75         , readTVarIO    -- :: TVar a -> IO a
76         , writeTVar     -- :: a -> TVar a -> STM ()
77         , unsafeIOToSTM -- :: IO a -> STM a
78
79         -- * Miscellaneous
80 #ifdef mingw32_HOST_OS
81         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
82         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
83         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
84
85         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
86         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
87 #endif
88
89 #ifndef mingw32_HOST_OS
90         , signalHandlerLock
91 #endif
92
93         , ensureIOManagerIsRunning
94
95 #ifdef mingw32_HOST_OS
96         , ConsoleEvent(..)
97         , win32ConsoleHandler
98         , toWin32ConsoleEvent
99 #endif
100         , setUncaughtExceptionHandler      -- :: (Exception -> IO ()) -> IO ()
101         , getUncaughtExceptionHandler      -- :: IO (Exception -> IO ())
102
103         , reportError, reportStackOverflow
104         ) where
105
106 import System.Posix.Types
107 #ifndef mingw32_HOST_OS
108 import System.Posix.Internals
109 #endif
110 import Foreign
111 import Foreign.C
112
113 import Data.Maybe
114
115 import GHC.Base
116 import {-# SOURCE #-} GHC.Handle
117 import GHC.IOBase
118 import GHC.Num          ( Num(..) )
119 import GHC.Real         ( fromIntegral )
120 #ifdef mingw32_HOST_OS
121 import GHC.Real         ( div )
122 import GHC.Ptr          ( plusPtr, FunPtr(..) )
123 #endif
124 #ifdef mingw32_HOST_OS
125 import GHC.Read         ( Read )
126 import GHC.Enum         ( Enum )
127 #endif
128 import GHC.Exception    ( SomeException(..), throw )
129 import GHC.Pack         ( packCString# )
130 import GHC.Ptr          ( Ptr(..) )
131 import GHC.STRef
132 import GHC.Show         ( Show(..), showString )
133 import Data.Typeable
134 import GHC.Err
135
136 infixr 0 `par`, `pseq`
137 \end{code}
138
139 %************************************************************************
140 %*                                                                      *
141 \subsection{@ThreadId@, @par@, and @fork@}
142 %*                                                                      *
143 %************************************************************************
144
145 \begin{code}
146 data ThreadId = ThreadId ThreadId# deriving( Typeable )
147 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
148 -- But since ThreadId# is unlifted, the Weak type must use open
149 -- type variables.
150 {- ^
151 A 'ThreadId' is an abstract type representing a handle to a thread.
152 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
153 the 'Ord' instance implements an arbitrary total ordering over
154 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
155 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
156 useful when debugging or diagnosing the behaviour of a concurrent
157 program.
158
159 /Note/: in GHC, if you have a 'ThreadId', you essentially have
160 a pointer to the thread itself.  This means the thread itself can\'t be
161 garbage collected until you drop the 'ThreadId'.
162 This misfeature will hopefully be corrected at a later date.
163
164 /Note/: Hugs does not provide any operations on other threads;
165 it defines 'ThreadId' as a synonym for ().
166 -}
167
168 instance Show ThreadId where
169    showsPrec d t = 
170         showString "ThreadId " . 
171         showsPrec d (getThreadId (id2TSO t))
172
173 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
174
175 id2TSO :: ThreadId -> ThreadId#
176 id2TSO (ThreadId t) = t
177
178 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
179 -- Returns -1, 0, 1
180
181 cmpThread :: ThreadId -> ThreadId -> Ordering
182 cmpThread t1 t2 = 
183    case cmp_thread (id2TSO t1) (id2TSO t2) of
184       -1 -> LT
185       0  -> EQ
186       _  -> GT -- must be 1
187
188 instance Eq ThreadId where
189    t1 == t2 = 
190       case t1 `cmpThread` t2 of
191          EQ -> True
192          _  -> False
193
194 instance Ord ThreadId where
195    compare = cmpThread
196
197 {- |
198 Sparks off a new thread to run the 'IO' computation passed as the
199 first argument, and returns the 'ThreadId' of the newly created
200 thread.
201
202 The new thread will be a lightweight thread; if you want to use a foreign
203 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
204
205 GHC note: the new thread inherits the /blocked/ state of the parent 
206 (see 'Control.Exception.block').
207 -}
208 forkIO :: IO () -> IO ThreadId
209 forkIO action = IO $ \ s -> 
210    case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
211  where
212   action_plus = catchException action childHandler
213
214 {- |
215 Like 'forkIO', but lets you specify on which CPU the thread is
216 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
217 will stay on the same CPU for its entire lifetime (`forkIO` threads
218 can migrate between CPUs according to the scheduling policy).
219 `forkOnIO` is useful for overriding the scheduling policy when you
220 know in advance how best to distribute the threads.
221
222 The `Int` argument specifies the CPU number; it is interpreted modulo
223 'numCapabilities' (note that it actually specifies a capability number
224 rather than a CPU number, but to a first approximation the two are
225 equivalent).
226 -}
227 forkOnIO :: Int -> IO () -> IO ThreadId
228 forkOnIO (I# cpu) action = IO $ \ s -> 
229    case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
230  where
231   action_plus = catchException action childHandler
232
233 -- | the value passed to the @+RTS -N@ flag.  This is the number of
234 -- Haskell threads that can run truly simultaneously at any given
235 -- time, and is typically set to the number of physical CPU cores on
236 -- the machine.
237 numCapabilities :: Int
238 numCapabilities = unsafePerformIO $  do 
239                     n <- peek n_capabilities
240                     return (fromIntegral n)
241
242 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
243
244 childHandler :: SomeException -> IO ()
245 childHandler err = catchException (real_handler err) childHandler
246
247 real_handler :: SomeException -> IO ()
248 real_handler se@(SomeException ex) =
249   -- ignore thread GC and killThread exceptions:
250   case cast ex of
251   Just BlockedOnDeadMVar                -> return ()
252   _ -> case cast ex of
253        Just BlockedIndefinitely         -> return ()
254        _ -> case cast ex of
255             Just ThreadKilled           -> return ()
256             _ -> case cast ex of
257                  -- report all others:
258                  Just StackOverflow     -> reportStackOverflow
259                  _                      -> reportError se
260
261 {- | 'killThread' terminates the given thread (GHC only).
262 Any work already done by the thread isn\'t
263 lost: the computation is suspended until required by another thread.
264 The memory used by the thread will be garbage collected if it isn\'t
265 referenced from anywhere.  The 'killThread' function is defined in
266 terms of 'throwTo':
267
268 > killThread tid = throwTo tid ThreadKilled
269
270 Killthread is a no-op if the target thread has already completed.
271 -}
272 killThread :: ThreadId -> IO ()
273 killThread tid = throwTo tid ThreadKilled
274
275 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
276
277 'throwTo' does not return until the exception has been raised in the
278 target thread. 
279 The calling thread can thus be certain that the target
280 thread has received the exception.  This is a useful property to know
281 when dealing with race conditions: eg. if there are two threads that
282 can kill each other, it is guaranteed that only one of the threads
283 will get to kill the other.
284
285 If the target thread is currently making a foreign call, then the
286 exception will not be raised (and hence 'throwTo' will not return)
287 until the call has completed.  This is the case regardless of whether
288 the call is inside a 'block' or not.
289
290 Important note: the behaviour of 'throwTo' differs from that described in
291 the paper \"Asynchronous exceptions in Haskell\"
292 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
293 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
294 a more synchronous design in which 'throwTo' does not return until the exception
295 is received by the target thread.  The trade-off is discussed in Section 8 of the paper.
296 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
297 the paper).
298
299 There is currently no guarantee that the exception delivered by 'throwTo' will be
300 delivered at the first possible opportunity.  In particular, if a thread may 
301 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
302 a pending 'throwTo'.  This is arguably undesirable behaviour.
303
304  -}
305 throwTo :: Exception e => ThreadId -> e -> IO ()
306 throwTo (ThreadId tid) ex = IO $ \ s ->
307    case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
308
309 -- | Returns the 'ThreadId' of the calling thread (GHC only).
310 myThreadId :: IO ThreadId
311 myThreadId = IO $ \s ->
312    case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
313
314
315 -- |The 'yield' action allows (forces, in a co-operative multitasking
316 -- implementation) a context-switch to any other currently runnable
317 -- threads (if any), and is occasionally useful when implementing
318 -- concurrency abstractions.
319 yield :: IO ()
320 yield = IO $ \s -> 
321    case (yield# s) of s1 -> (# s1, () #)
322
323 {- | 'labelThread' stores a string as identifier for this thread if
324 you built a RTS with debugging support. This identifier will be used in
325 the debugging output to make distinction of different threads easier
326 (otherwise you only have the thread state object\'s address in the heap).
327
328 Other applications like the graphical Concurrent Haskell Debugger
329 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
330 'labelThread' for their purposes as well.
331 -}
332
333 labelThread :: ThreadId -> String -> IO ()
334 labelThread (ThreadId t) str = IO $ \ s ->
335    let ps  = packCString# str
336        adr = byteArrayContents# ps in
337      case (labelThread# t adr s) of s1 -> (# s1, () #)
338
339 --      Nota Bene: 'pseq' used to be 'seq'
340 --                 but 'seq' is now defined in PrelGHC
341 --
342 -- "pseq" is defined a bit weirdly (see below)
343 --
344 -- The reason for the strange "lazy" call is that
345 -- it fools the compiler into thinking that pseq  and par are non-strict in
346 -- their second argument (even if it inlines pseq at the call site).
347 -- If it thinks pseq is strict in "y", then it often evaluates
348 -- "y" before "x", which is totally wrong.  
349
350 {-# INLINE pseq  #-}
351 pseq :: a -> b -> b
352 pseq  x y = x `seq` lazy y
353
354 {-# INLINE par  #-}
355 par :: a -> b -> b
356 par  x y = case (par# x) of { _ -> lazy y }
357
358
359 data BlockReason
360   = BlockedOnMVar
361         -- ^blocked on on 'MVar'
362   | BlockedOnBlackHole
363         -- ^blocked on a computation in progress by another thread
364   | BlockedOnException
365         -- ^blocked in 'throwTo'
366   | BlockedOnSTM
367         -- ^blocked in 'retry' in an STM transaction
368   | BlockedOnForeignCall
369         -- ^currently in a foreign call
370   | BlockedOnOther
371         -- ^blocked on some other resource.  Without @-threaded@,
372         -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
373         -- they show up as 'BlockedOnMVar'.
374   deriving (Eq,Ord,Show)
375
376 -- | The current status of a thread
377 data ThreadStatus
378   = ThreadRunning
379         -- ^the thread is currently runnable or running
380   | ThreadFinished
381         -- ^the thread has finished
382   | ThreadBlocked  BlockReason
383         -- ^the thread is blocked on some resource
384   | ThreadDied
385         -- ^the thread received an uncaught exception
386   deriving (Eq,Ord,Show)
387
388 threadStatus :: ThreadId -> IO ThreadStatus
389 threadStatus (ThreadId t) = IO $ \s ->
390    case threadStatus# t s of
391      (# s', stat #) -> (# s', mk_stat (I# stat) #)
392    where
393         -- NB. keep these in sync with includes/Constants.h
394      mk_stat 0  = ThreadRunning
395      mk_stat 1  = ThreadBlocked BlockedOnMVar
396      mk_stat 2  = ThreadBlocked BlockedOnBlackHole
397      mk_stat 3  = ThreadBlocked BlockedOnException
398      mk_stat 7  = ThreadBlocked BlockedOnSTM
399      mk_stat 11 = ThreadBlocked BlockedOnForeignCall
400      mk_stat 12 = ThreadBlocked BlockedOnForeignCall
401      mk_stat 16 = ThreadFinished
402      mk_stat 17 = ThreadDied
403      mk_stat _  = ThreadBlocked BlockedOnOther
404 \end{code}
405
406
407 %************************************************************************
408 %*                                                                      *
409 \subsection[stm]{Transactional heap operations}
410 %*                                                                      *
411 %************************************************************************
412
413 TVars are shared memory locations which support atomic memory
414 transactions.
415
416 \begin{code}
417 -- |A monad supporting atomic memory transactions.
418 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
419
420 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
421 unSTM (STM a) = a
422
423 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
424
425 instance  Functor STM where
426    fmap f x = x >>= (return . f)
427
428 instance  Monad STM  where
429     {-# INLINE return #-}
430     {-# INLINE (>>)   #-}
431     {-# INLINE (>>=)  #-}
432     m >> k      = thenSTM m k
433     return x    = returnSTM x
434     m >>= k     = bindSTM m k
435
436 bindSTM :: STM a -> (a -> STM b) -> STM b
437 bindSTM (STM m) k = STM ( \s ->
438   case m s of 
439     (# new_s, a #) -> unSTM (k a) new_s
440   )
441
442 thenSTM :: STM a -> STM b -> STM b
443 thenSTM (STM m) k = STM ( \s ->
444   case m s of 
445     (# new_s, _ #) -> unSTM k new_s
446   )
447
448 returnSTM :: a -> STM a
449 returnSTM x = STM (\s -> (# s, x #))
450
451 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
452 -- dangerous thing to do.  
453 --
454 --   * The STM implementation will often run transactions multiple
455 --     times, so you need to be prepared for this if your IO has any
456 --     side effects.
457 --
458 --   * The STM implementation will abort transactions that are known to
459 --     be invalid and need to be restarted.  This may happen in the middle
460 --     of `unsafeIOToSTM`, so make sure you don't acquire any resources
461 --     that need releasing (exception handlers are ignored when aborting
462 --     the transaction).  That includes doing any IO using Handles, for
463 --     example.  Getting this wrong will probably lead to random deadlocks.
464 --
465 --   * The transaction may have seen an inconsistent view of memory when
466 --     the IO runs.  Invariants that you expect to be true throughout
467 --     your program may not be true inside a transaction, due to the
468 --     way transactions are implemented.  Normally this wouldn't be visible
469 --     to the programmer, but using `unsafeIOToSTM` can expose it.
470 --
471 unsafeIOToSTM :: IO a -> STM a
472 unsafeIOToSTM (IO m) = STM m
473
474 -- |Perform a series of STM actions atomically.
475 --
476 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
477 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
478 -- this would effectively allow a transaction inside a transaction, depending
479 -- on exactly when the thunk is evaluated.)
480 --
481 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
482 -- and which allows top-level TVars to be allocated.
483
484 atomically :: STM a -> IO a
485 atomically (STM m) = IO (\s -> (atomically# m) s )
486
487 -- |Retry execution of the current memory transaction because it has seen
488 -- values in TVars which mean that it should not continue (e.g. the TVars
489 -- represent a shared buffer that is now empty).  The implementation may
490 -- block the thread until one of the TVars that it has read from has been
491 -- udpated. (GHC only)
492 retry :: STM a
493 retry = STM $ \s# -> retry# s#
494
495 -- |Compose two alternative STM actions (GHC only).  If the first action
496 -- completes without retrying then it forms the result of the orElse.
497 -- Otherwise, if the first action retries, then the second action is
498 -- tried in its place.  If both actions retry then the orElse as a
499 -- whole retries.
500 orElse :: STM a -> STM a -> STM a
501 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
502
503 -- |Exception handling within STM actions.
504 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
505 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
506
507 -- | Low-level primitive on which always and alwaysSucceeds are built.
508 -- checkInv differs form these in that (i) the invariant is not 
509 -- checked when checkInv is called, only at the end of this and
510 -- subsequent transcations, (ii) the invariant failure is indicated
511 -- by raising an exception.
512 checkInv :: STM a -> STM ()
513 checkInv (STM m) = STM (\s -> (check# m) s)
514
515 -- | alwaysSucceeds adds a new invariant that must be true when passed
516 -- to alwaysSucceeds, at the end of the current transaction, and at
517 -- the end of every subsequent transaction.  If it fails at any
518 -- of those points then the transaction violating it is aborted
519 -- and the exception raised by the invariant is propagated.
520 alwaysSucceeds :: STM a -> STM ()
521 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
522                       checkInv i
523
524 -- | always is a variant of alwaysSucceeds in which the invariant is
525 -- expressed as an STM Bool action that must return True.  Returning
526 -- False or raising an exception are both treated as invariant failures.
527 always :: STM Bool -> STM ()
528 always i = alwaysSucceeds ( do v <- i
529                                if (v) then return () else ( error "Transacional invariant violation" ) )
530
531 -- |Shared memory locations that support atomic memory transactions.
532 data TVar a = TVar (TVar# RealWorld a)
533
534 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
535
536 instance Eq (TVar a) where
537         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
538
539 -- |Create a new TVar holding a value supplied
540 newTVar :: a -> STM (TVar a)
541 newTVar val = STM $ \s1# ->
542     case newTVar# val s1# of
543          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
544
545 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
546 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
547 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
548 -- possible.
549 newTVarIO :: a -> IO (TVar a)
550 newTVarIO val = IO $ \s1# ->
551     case newTVar# val s1# of
552          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
553
554 -- |Return the current value stored in a TVar.
555 -- This is equivalent to
556 --
557 -- >  readTVarIO = atomically . readTVar
558 --
559 -- but works much faster, because it doesn't perform a complete
560 -- transaction, it just reads the current value of the 'TVar'.
561 readTVarIO :: TVar a -> IO a
562 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
563
564 -- |Return the current value stored in a TVar
565 readTVar :: TVar a -> STM a
566 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
567
568 -- |Write the supplied value into a TVar
569 writeTVar :: TVar a -> a -> STM ()
570 writeTVar (TVar tvar#) val = STM $ \s1# ->
571     case writeTVar# tvar# val s1# of
572          s2# -> (# s2#, () #)
573   
574 \end{code}
575
576 %************************************************************************
577 %*                                                                      *
578 \subsection[mvars]{M-Structures}
579 %*                                                                      *
580 %************************************************************************
581
582 M-Vars are rendezvous points for concurrent threads.  They begin
583 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
584 is written, a single blocked thread may be freed.  Reading an M-Var
585 toggles its state from full back to empty.  Therefore, any value
586 written to an M-Var may only be read once.  Multiple reads and writes
587 are allowed, but there must be at least one read between any two
588 writes.
589
590 \begin{code}
591 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
592
593 -- |Create an 'MVar' which is initially empty.
594 newEmptyMVar  :: IO (MVar a)
595 newEmptyMVar = IO $ \ s# ->
596     case newMVar# s# of
597          (# s2#, svar# #) -> (# s2#, MVar svar# #)
598
599 -- |Create an 'MVar' which contains the supplied value.
600 newMVar :: a -> IO (MVar a)
601 newMVar value =
602     newEmptyMVar        >>= \ mvar ->
603     putMVar mvar value  >>
604     return mvar
605
606 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
607 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
608 -- the 'MVar' is left empty.
609 -- 
610 -- There are two further important properties of 'takeMVar':
611 --
612 --   * 'takeMVar' is single-wakeup.  That is, if there are multiple
613 --     threads blocked in 'takeMVar', and the 'MVar' becomes full,
614 --     only one thread will be woken up.  The runtime guarantees that
615 --     the woken thread completes its 'takeMVar' operation.
616 --
617 --   * When multiple threads are blocked on an 'MVar', they are
618 --     woken up in FIFO order.  This is useful for providing
619 --     fairness properties of abstractions built using 'MVar's.
620 --
621 takeMVar :: MVar a -> IO a
622 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
623
624 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
625 -- 'putMVar' will wait until it becomes empty.
626 --
627 -- There are two further important properties of 'putMVar':
628 --
629 --   * 'putMVar' is single-wakeup.  That is, if there are multiple
630 --     threads blocked in 'putMVar', and the 'MVar' becomes empty,
631 --     only one thread will be woken up.  The runtime guarantees that
632 --     the woken thread completes its 'putMVar' operation.
633 --
634 --   * When multiple threads are blocked on an 'MVar', they are
635 --     woken up in FIFO order.  This is useful for providing
636 --     fairness properties of abstractions built using 'MVar's.
637 --
638 putMVar  :: MVar a -> a -> IO ()
639 putMVar (MVar mvar#) x = IO $ \ s# ->
640     case putMVar# mvar# x s# of
641         s2# -> (# s2#, () #)
642
643 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
644 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
645 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
646 -- the 'MVar' is left empty.
647 tryTakeMVar :: MVar a -> IO (Maybe a)
648 tryTakeMVar (MVar m) = IO $ \ s ->
649     case tryTakeMVar# m s of
650         (# s', 0#, _ #) -> (# s', Nothing #)      -- MVar is empty
651         (# s', _,  a #) -> (# s', Just a  #)      -- MVar is full
652
653 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
654 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
655 -- it was successful, or 'False' otherwise.
656 tryPutMVar  :: MVar a -> a -> IO Bool
657 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
658     case tryPutMVar# mvar# x s# of
659         (# s, 0# #) -> (# s, False #)
660         (# s, _  #) -> (# s, True #)
661
662 -- |Check whether a given 'MVar' is empty.
663 --
664 -- Notice that the boolean value returned  is just a snapshot of
665 -- the state of the MVar. By the time you get to react on its result,
666 -- the MVar may have been filled (or emptied) - so be extremely
667 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
668 isEmptyMVar :: MVar a -> IO Bool
669 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
670     case isEmptyMVar# mv# s# of
671         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
672
673 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
674 -- "System.Mem.Weak" for more about finalizers.
675 addMVarFinalizer :: MVar a -> IO () -> IO ()
676 addMVarFinalizer (MVar m) finalizer = 
677   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, _ #) -> (# s1, () #) }
678
679 withMVar :: MVar a -> (a -> IO b) -> IO b
680 withMVar m io = 
681   block $ do
682     a <- takeMVar m
683     b <- catchAny (unblock (io a))
684             (\e -> do putMVar m a; throw e)
685     putMVar m a
686     return b
687 \end{code}
688
689
690 %************************************************************************
691 %*                                                                      *
692 \subsection{Thread waiting}
693 %*                                                                      *
694 %************************************************************************
695
696 \begin{code}
697 #ifdef mingw32_HOST_OS
698
699 -- Note: threadWaitRead and threadWaitWrite aren't really functional
700 -- on Win32, but left in there because lib code (still) uses them (the manner
701 -- in which they're used doesn't cause problems on a Win32 platform though.)
702
703 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
704 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
705   IO $ \s -> case asyncRead# fd isSock len buf s of 
706                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
707
708 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
709 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
710   IO $ \s -> case asyncWrite# fd isSock len buf s of 
711                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
712
713 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
714 asyncDoProc (FunPtr proc) (Ptr param) = 
715     -- the 'length' value is ignored; simplifies implementation of
716     -- the async*# primops to have them all return the same result.
717   IO $ \s -> case asyncDoProc# proc param s  of 
718                (# s', _len#, err# #) -> (# s', I# err# #)
719
720 -- to aid the use of these primops by the IO Handle implementation,
721 -- provide the following convenience funs:
722
723 -- this better be a pinned byte array!
724 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
725 asyncReadBA fd isSock len off bufB = 
726   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
727   
728 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
729 asyncWriteBA fd isSock len off bufB = 
730   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
731
732 #endif
733
734 -- -----------------------------------------------------------------------------
735 -- Thread IO API
736
737 -- | Block the current thread until data is available to read on the
738 -- given file descriptor (GHC only).
739 threadWaitRead :: Fd -> IO ()
740 threadWaitRead fd
741 #ifndef mingw32_HOST_OS
742   | threaded  = waitForReadEvent fd
743 #endif
744   | otherwise = IO $ \s -> 
745         case fromIntegral fd of { I# fd# ->
746         case waitRead# fd# s of { s' -> (# s', () #)
747         }}
748
749 -- | Block the current thread until data can be written to the
750 -- given file descriptor (GHC only).
751 threadWaitWrite :: Fd -> IO ()
752 threadWaitWrite fd
753 #ifndef mingw32_HOST_OS
754   | threaded  = waitForWriteEvent fd
755 #endif
756   | otherwise = IO $ \s -> 
757         case fromIntegral fd of { I# fd# ->
758         case waitWrite# fd# s of { s' -> (# s', () #)
759         }}
760
761 -- | Suspends the current thread for a given number of microseconds
762 -- (GHC only).
763 --
764 -- There is no guarantee that the thread will be rescheduled promptly
765 -- when the delay has expired, but the thread will never continue to
766 -- run /earlier/ than specified.
767 --
768 threadDelay :: Int -> IO ()
769 threadDelay time
770   | threaded  = waitForDelayEvent time
771   | otherwise = IO $ \s -> 
772         case fromIntegral time of { I# time# ->
773         case delay# time# s of { s' -> (# s', () #)
774         }}
775
776
777 -- | Set the value of returned TVar to True after a given number of
778 -- microseconds. The caveats associated with threadDelay also apply.
779 --
780 registerDelay :: Int -> IO (TVar Bool)
781 registerDelay usecs 
782   | threaded = waitForDelayEventSTM usecs
783   | otherwise = error "registerDelay: requires -threaded"
784
785 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
786
787 waitForDelayEvent :: Int -> IO ()
788 waitForDelayEvent usecs = do
789   m <- newEmptyMVar
790   target <- calculateTarget usecs
791   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
792   prodServiceThread
793   takeMVar m
794
795 -- Delays for use in STM
796 waitForDelayEventSTM :: Int -> IO (TVar Bool)
797 waitForDelayEventSTM usecs = do
798    t <- atomically $ newTVar False
799    target <- calculateTarget usecs
800    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
801    prodServiceThread
802    return t  
803     
804 calculateTarget :: Int -> IO USecs
805 calculateTarget usecs = do
806     now <- getUSecOfDay
807     return $ now + (fromIntegral usecs)
808
809
810 -- ----------------------------------------------------------------------------
811 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
812
813 -- In the threaded RTS, we employ a single IO Manager thread to wait
814 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
815 -- and delays (threadDelay).  
816 --
817 -- We can do this because in the threaded RTS the IO Manager can make
818 -- a non-blocking call to select(), so we don't have to do select() in
819 -- the scheduler as we have to in the non-threaded RTS.  We get performance
820 -- benefits from doing it this way, because we only have to restart the select()
821 -- when a new request arrives, rather than doing one select() each time
822 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
823 -- by not having to check for completed IO requests.
824
825 -- Issues, possible problems:
826 --
827 --      - we might want bound threads to just do the blocking
828 --        operation rather than communicating with the IO manager
829 --        thread.  This would prevent simgle-threaded programs which do
830 --        IO from requiring multiple OS threads.  However, it would also
831 --        prevent bound threads waiting on IO from being killed or sent
832 --        exceptions.
833 --
834 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
835 --        I couldn't repeat this.
836 --
837 --      - How do we handle signal delivery in the multithreaded RTS?
838 --
839 --      - forkProcess will kill the IO manager thread.  Let's just
840 --        hope we don't need to do any blocking IO between fork & exec.
841
842 #ifndef mingw32_HOST_OS
843 data IOReq
844   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
845   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
846 #endif
847
848 data DelayReq
849   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
850   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
851
852 #ifndef mingw32_HOST_OS
853 pendingEvents :: IORef [IOReq]
854 #endif
855 pendingDelays :: IORef [DelayReq]
856         -- could use a strict list or array here
857 {-# NOINLINE pendingEvents #-}
858 {-# NOINLINE pendingDelays #-}
859 (pendingEvents,pendingDelays) = unsafePerformIO $ do
860   startIOManagerThread
861   reqs <- newIORef []
862   dels <- newIORef []
863   return (reqs, dels)
864         -- the first time we schedule an IO request, the service thread
865         -- will be created (cool, huh?)
866
867 ensureIOManagerIsRunning :: IO ()
868 ensureIOManagerIsRunning 
869   | threaded  = seq pendingEvents $ return ()
870   | otherwise = return ()
871
872 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
873 insertDelay d [] = [d]
874 insertDelay d1 ds@(d2 : rest)
875   | delayTime d1 <= delayTime d2 = d1 : ds
876   | otherwise                    = d2 : insertDelay d1 rest
877
878 delayTime :: DelayReq -> USecs
879 delayTime (Delay t _) = t
880 delayTime (DelaySTM t _) = t
881
882 type USecs = Word64
883
884 -- XXX: move into GHC.IOBase from Data.IORef?
885 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
886 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
887
888 foreign import ccall unsafe "getUSecOfDay" 
889   getUSecOfDay :: IO USecs
890
891 prodding :: IORef Bool
892 {-# NOINLINE prodding #-}
893 prodding = unsafePerformIO (newIORef False)
894
895 prodServiceThread :: IO ()
896 prodServiceThread = do
897   was_set <- atomicModifyIORef prodding (\a -> (True,a))
898   if (not (was_set)) then wakeupIOManager else return ()
899
900 #ifdef mingw32_HOST_OS
901 -- ----------------------------------------------------------------------------
902 -- Windows IO manager thread
903
904 startIOManagerThread :: IO ()
905 startIOManagerThread = do
906   wakeup <- c_getIOManagerEvent
907   forkIO $ service_loop wakeup []
908   return ()
909
910 service_loop :: HANDLE          -- read end of pipe
911              -> [DelayReq]      -- current delay requests
912              -> IO ()
913
914 service_loop wakeup old_delays = do
915   -- pick up new delay requests
916   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
917   let  delays = foldr insertDelay old_delays new_delays
918
919   now <- getUSecOfDay
920   (delays', timeout) <- getDelay now delays
921
922   r <- c_WaitForSingleObject wakeup timeout
923   case r of
924     0xffffffff -> do c_maperrno; throwErrno "service_loop"
925     0 -> do
926         r2 <- c_readIOManagerEvent
927         exit <- 
928               case r2 of
929                 _ | r2 == io_MANAGER_WAKEUP -> return False
930                 _ | r2 == io_MANAGER_DIE    -> return True
931                 0 -> return False -- spurious wakeup
932                 _ -> do start_console_handler (r2 `shiftR` 1); return False
933         if exit
934           then return ()
935           else service_cont wakeup delays'
936
937     _other -> service_cont wakeup delays' -- probably timeout        
938
939 service_cont :: HANDLE -> [DelayReq] -> IO ()
940 service_cont wakeup delays = do
941   atomicModifyIORef prodding (\_ -> (False,False))
942   service_loop wakeup delays
943
944 -- must agree with rts/win32/ThrIOManager.c
945 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
946 io_MANAGER_WAKEUP = 0xffffffff
947 io_MANAGER_DIE    = 0xfffffffe
948
949 data ConsoleEvent
950  = ControlC
951  | Break
952  | Close
953     -- these are sent to Services only.
954  | Logoff
955  | Shutdown
956  deriving (Eq, Ord, Enum, Show, Read, Typeable)
957
958 start_console_handler :: Word32 -> IO ()
959 start_console_handler r =
960   case toWin32ConsoleEvent r of
961      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
962                     forkIO (handler x)
963                     return ()
964      Nothing -> return ()
965
966 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
967 toWin32ConsoleEvent ev = 
968    case ev of
969        0 {- CTRL_C_EVENT-}        -> Just ControlC
970        1 {- CTRL_BREAK_EVENT-}    -> Just Break
971        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
972        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
973        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
974        _ -> Nothing
975
976 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
977 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
978
979 -- XXX Is this actually needed?
980 stick :: IORef HANDLE
981 {-# NOINLINE stick #-}
982 stick = unsafePerformIO (newIORef nullPtr)
983
984 wakeupIOManager :: IO ()
985 wakeupIOManager = do 
986   _hdl <- readIORef stick
987   c_sendIOManagerEvent io_MANAGER_WAKEUP
988
989 -- Walk the queue of pending delays, waking up any that have passed
990 -- and return the smallest delay to wait for.  The queue of pending
991 -- delays is kept ordered.
992 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
993 getDelay _   [] = return ([], iNFINITE)
994 getDelay now all@(d : rest) 
995   = case d of
996      Delay time m | now >= time -> do
997         putMVar m ()
998         getDelay now rest
999      DelaySTM time t | now >= time -> do
1000         atomically $ writeTVar t True
1001         getDelay now rest
1002      _otherwise ->
1003         -- delay is in millisecs for WaitForSingleObject
1004         let micro_seconds = delayTime d - now
1005             milli_seconds = (micro_seconds + 999) `div` 1000
1006         in return (all, fromIntegral milli_seconds)
1007
1008 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
1009 -- available yet.  We should move some Win32 functionality down here,
1010 -- maybe as part of the grand reorganisation of the base package...
1011 type HANDLE       = Ptr ()
1012 type DWORD        = Word32
1013
1014 iNFINITE :: DWORD
1015 iNFINITE = 0xFFFFFFFF -- urgh
1016
1017 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
1018   c_getIOManagerEvent :: IO HANDLE
1019
1020 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
1021   c_readIOManagerEvent :: IO Word32
1022
1023 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
1024   c_sendIOManagerEvent :: Word32 -> IO ()
1025
1026 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
1027    c_maperrno :: IO ()
1028
1029 foreign import stdcall "WaitForSingleObject"
1030    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1031
1032 #else
1033 -- ----------------------------------------------------------------------------
1034 -- Unix IO manager thread, using select()
1035
1036 startIOManagerThread :: IO ()
1037 startIOManagerThread = do
1038         allocaArray 2 $ \fds -> do
1039         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1040         rd_end <- peekElemOff fds 0
1041         wr_end <- peekElemOff fds 1
1042         writeIORef stick (fromIntegral wr_end)
1043         c_setIOManagerPipe wr_end
1044         forkIO $ do
1045             allocaBytes sizeofFdSet   $ \readfds -> do
1046             allocaBytes sizeofFdSet   $ \writefds -> do 
1047             allocaBytes sizeofTimeVal $ \timeval -> do
1048             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1049         return ()
1050
1051 service_loop
1052    :: Fd                -- listen to this for wakeup calls
1053    -> Ptr CFdSet
1054    -> Ptr CFdSet
1055    -> Ptr CTimeVal
1056    -> [IOReq]
1057    -> [DelayReq]
1058    -> IO ()
1059 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1060
1061   -- pick up new IO requests
1062   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1063   let reqs = new_reqs ++ old_reqs
1064
1065   -- pick up new delay requests
1066   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1067   let  delays0 = foldr insertDelay old_delays new_delays
1068
1069   -- build the FDSets for select()
1070   fdZero readfds
1071   fdZero writefds
1072   fdSet wakeup readfds
1073   maxfd <- buildFdSets 0 readfds writefds reqs
1074
1075   -- perform the select()
1076   let do_select delays = do
1077           -- check the current time and wake up any thread in
1078           -- threadDelay whose timeout has expired.  Also find the
1079           -- timeout value for the select() call.
1080           now <- getUSecOfDay
1081           (delays', timeout) <- getDelay now ptimeval delays
1082
1083           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
1084                         nullPtr timeout
1085           if (res == -1)
1086              then do
1087                 err <- getErrno
1088                 case err of
1089                   _ | err == eINTR ->  do_select delays'
1090                         -- EINTR: just redo the select()
1091                   _ | err == eBADF ->  return (True, delays)
1092                         -- EBADF: one of the file descriptors is closed or bad,
1093                         -- we don't know which one, so wake everyone up.
1094                   _ | otherwise    ->  throwErrno "select"
1095                         -- otherwise (ENOMEM or EINVAL) something has gone
1096                         -- wrong; report the error.
1097              else
1098                 return (False,delays')
1099
1100   (wakeup_all,delays') <- do_select delays0
1101
1102   exit <-
1103     if wakeup_all then return False
1104       else do
1105         b <- fdIsSet wakeup readfds
1106         if b == 0 
1107           then return False
1108           else alloca $ \p -> do 
1109                  c_read (fromIntegral wakeup) p 1; return ()
1110                  s <- peek p            
1111                  case s of
1112                   _ | s == io_MANAGER_WAKEUP -> return False
1113                   _ | s == io_MANAGER_DIE    -> return True
1114                   _ -> withMVar signalHandlerLock $ \_ -> do
1115                           handler_tbl <- peek handlers
1116                           sp <- peekElemOff handler_tbl (fromIntegral s)
1117                           io <- deRefStablePtr sp
1118                           forkIO io
1119                           return False
1120
1121   if exit then return () else do
1122
1123   atomicModifyIORef prodding (\_ -> (False,False))
1124
1125   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1126                          else completeRequests reqs readfds writefds []
1127
1128   service_loop wakeup readfds writefds ptimeval reqs' delays'
1129
1130 io_MANAGER_WAKEUP, io_MANAGER_DIE :: CChar
1131 io_MANAGER_WAKEUP = 0xff
1132 io_MANAGER_DIE    = 0xfe
1133
1134 stick :: IORef Fd
1135 {-# NOINLINE stick #-}
1136 stick = unsafePerformIO (newIORef 0)
1137
1138 wakeupIOManager :: IO ()
1139 wakeupIOManager = do
1140   fd <- readIORef stick
1141   with io_MANAGER_WAKEUP $ \pbuf -> do 
1142     c_write (fromIntegral fd) pbuf 1; return ()
1143
1144 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1145 -- this race condition is #1922, although that bug was on Windows a similar
1146 -- bug also exists on Unix.
1147 signalHandlerLock :: MVar ()
1148 signalHandlerLock = unsafePerformIO (newMVar ())
1149
1150 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
1151
1152 foreign import ccall "setIOManagerPipe"
1153   c_setIOManagerPipe :: CInt -> IO ()
1154
1155 -- -----------------------------------------------------------------------------
1156 -- IO requests
1157
1158 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1159 buildFdSets maxfd _       _        [] = return maxfd
1160 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1161   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1162   | otherwise        =  do
1163         fdSet fd readfds
1164         buildFdSets (max maxfd fd) readfds writefds reqs
1165 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1166   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1167   | otherwise        =  do
1168         fdSet fd writefds
1169         buildFdSets (max maxfd fd) readfds writefds reqs
1170
1171 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1172                  -> IO [IOReq]
1173 completeRequests [] _ _ reqs' = return reqs'
1174 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1175   b <- fdIsSet fd readfds
1176   if b /= 0
1177     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1178     else completeRequests reqs readfds writefds (Read fd m : reqs')
1179 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1180   b <- fdIsSet fd writefds
1181   if b /= 0
1182     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1183     else completeRequests reqs readfds writefds (Write fd m : reqs')
1184
1185 wakeupAll :: [IOReq] -> IO ()
1186 wakeupAll [] = return ()
1187 wakeupAll (Read  _ m : reqs) = do putMVar m (); wakeupAll reqs
1188 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1189
1190 waitForReadEvent :: Fd -> IO ()
1191 waitForReadEvent fd = do
1192   m <- newEmptyMVar
1193   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1194   prodServiceThread
1195   takeMVar m
1196
1197 waitForWriteEvent :: Fd -> IO ()
1198 waitForWriteEvent fd = do
1199   m <- newEmptyMVar
1200   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1201   prodServiceThread
1202   takeMVar m
1203
1204 -- -----------------------------------------------------------------------------
1205 -- Delays
1206
1207 -- Walk the queue of pending delays, waking up any that have passed
1208 -- and return the smallest delay to wait for.  The queue of pending
1209 -- delays is kept ordered.
1210 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1211 getDelay _   _        [] = return ([],nullPtr)
1212 getDelay now ptimeval all@(d : rest) 
1213   = case d of
1214      Delay time m | now >= time -> do
1215         putMVar m ()
1216         getDelay now ptimeval rest
1217      DelaySTM time t | now >= time -> do
1218         atomically $ writeTVar t True
1219         getDelay now ptimeval rest
1220      _otherwise -> do
1221         setTimevalTicks ptimeval (delayTime d - now)
1222         return (all,ptimeval)
1223
1224 data CTimeVal
1225
1226 foreign import ccall unsafe "sizeofTimeVal"
1227   sizeofTimeVal :: Int
1228
1229 foreign import ccall unsafe "setTimevalTicks" 
1230   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1231
1232 {- 
1233   On Win32 we're going to have a single Pipe, and a
1234   waitForSingleObject with the delay time.  For signals, we send a
1235   byte down the pipe just like on Unix.
1236 -}
1237
1238 -- ----------------------------------------------------------------------------
1239 -- select() interface
1240
1241 -- ToDo: move to System.Posix.Internals?
1242
1243 data CFdSet
1244
1245 foreign import ccall safe "select"
1246   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1247            -> IO CInt
1248
1249 foreign import ccall unsafe "hsFD_SETSIZE"
1250   c_fD_SETSIZE :: CInt
1251
1252 fD_SETSIZE :: Fd
1253 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1254
1255 foreign import ccall unsafe "hsFD_ISSET"
1256   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1257
1258 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1259 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1260
1261 foreign import ccall unsafe "hsFD_SET"
1262   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1263
1264 fdSet :: Fd -> Ptr CFdSet -> IO ()
1265 fdSet (Fd fd) fdset = c_fdSet fd fdset
1266
1267 foreign import ccall unsafe "hsFD_ZERO"
1268   fdZero :: Ptr CFdSet -> IO ()
1269
1270 foreign import ccall unsafe "sizeof_fd_set"
1271   sizeofFdSet :: Int
1272
1273 #endif
1274
1275 reportStackOverflow :: IO a
1276 reportStackOverflow = do callStackOverflowHook; return undefined
1277
1278 reportError :: SomeException -> IO a
1279 reportError ex = do
1280    handler <- getUncaughtExceptionHandler
1281    handler ex
1282    return undefined
1283
1284 -- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
1285 -- the unsafe below.
1286 foreign import ccall unsafe "stackOverflow"
1287         callStackOverflowHook :: IO ()
1288
1289 {-# NOINLINE uncaughtExceptionHandler #-}
1290 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1291 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1292    where
1293       defaultHandler :: SomeException -> IO ()
1294       defaultHandler se@(SomeException ex) = do
1295          (hFlush stdout) `catchAny` (\ _ -> return ())
1296          let msg = case cast ex of
1297                Just Deadlock -> "no threads to run:  infinite loop or deadlock?"
1298                _ -> case cast ex of
1299                     Just (ErrorCall s) -> s
1300                     _                  -> showsPrec 0 se ""
1301          withCString "%s" $ \cfmt ->
1302           withCString msg $ \cmsg ->
1303             errorBelch cfmt cmsg
1304
1305 -- don't use errorBelch() directly, because we cannot call varargs functions
1306 -- using the FFI.
1307 foreign import ccall unsafe "HsBase.h errorBelch2"
1308    errorBelch :: CString -> CString -> IO ()
1309
1310 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1311 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1312
1313 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1314 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
1315 \end{code}