Import n_capabilities via import symbol when linking dynamically
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
6 -- |
7 -- Module      :  GHC.Conc
8 -- Copyright   :  (c) The University of Glasgow, 1994-2002
9 -- License     :  see libraries/base/LICENSE
10 -- 
11 -- Maintainer  :  cvs-ghc@haskell.org
12 -- Stability   :  internal
13 -- Portability :  non-portable (GHC extensions)
14 --
15 -- Basic concurrency stuff.
16 -- 
17 -----------------------------------------------------------------------------
18
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home.  Hence:
23
24 #include "Typeable.h"
25
26 -- #not-home
27 module GHC.Conc
28         ( ThreadId(..)
29
30         -- * Forking and suchlike
31         , forkIO        -- :: IO a -> IO ThreadId
32         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
33         , numCapabilities -- :: Int
34         , childHandler  -- :: Exception -> IO ()
35         , myThreadId    -- :: IO ThreadId
36         , killThread    -- :: ThreadId -> IO ()
37         , throwTo       -- :: ThreadId -> Exception -> IO ()
38         , par           -- :: a -> b -> b
39         , pseq          -- :: a -> b -> b
40         , yield         -- :: IO ()
41         , labelThread   -- :: ThreadId -> String -> IO ()
42
43         , ThreadStatus(..), BlockReason(..)
44         , threadStatus  -- :: ThreadId -> IO ThreadStatus
45
46         -- * Waiting
47         , threadDelay           -- :: Int -> IO ()
48         , registerDelay         -- :: Int -> IO (TVar Bool)
49         , threadWaitRead        -- :: Int -> IO ()
50         , threadWaitWrite       -- :: Int -> IO ()
51
52         -- * MVars
53         , MVar(..)
54         , newMVar       -- :: a -> IO (MVar a)
55         , newEmptyMVar  -- :: IO (MVar a)
56         , takeMVar      -- :: MVar a -> IO a
57         , putMVar       -- :: MVar a -> a -> IO ()
58         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
59         , tryPutMVar    -- :: MVar a -> a -> IO Bool
60         , isEmptyMVar   -- :: MVar a -> IO Bool
61         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
62
63         -- * TVars
64         , STM(..)
65         , atomically    -- :: STM a -> IO a
66         , retry         -- :: STM a
67         , orElse        -- :: STM a -> STM a -> STM a
68         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
69         , alwaysSucceeds -- :: STM a -> STM ()
70         , always        -- :: STM Bool -> STM ()
71         , TVar(..)
72         , newTVar       -- :: a -> STM (TVar a)
73         , newTVarIO     -- :: a -> STM (TVar a)
74         , readTVar      -- :: TVar a -> STM a
75         , readTVarIO    -- :: TVar a -> IO a
76         , writeTVar     -- :: a -> TVar a -> STM ()
77         , unsafeIOToSTM -- :: IO a -> STM a
78
79         -- * Miscellaneous
80 #ifdef mingw32_HOST_OS
81         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
82         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
83         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
84
85         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
86         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
87 #endif
88
89 #ifndef mingw32_HOST_OS
90         , signalHandlerLock
91 #endif
92
93         , ensureIOManagerIsRunning
94
95 #ifdef mingw32_HOST_OS
96         , ConsoleEvent(..)
97         , win32ConsoleHandler
98         , toWin32ConsoleEvent
99 #endif
100         , setUncaughtExceptionHandler      -- :: (Exception -> IO ()) -> IO ()
101         , getUncaughtExceptionHandler      -- :: IO (Exception -> IO ())
102
103         , reportError, reportStackOverflow
104         ) where
105
106 import System.Posix.Types
107 #ifndef mingw32_HOST_OS
108 import System.Posix.Internals
109 #endif
110 import Foreign
111 import Foreign.C
112
113 import Data.Maybe
114
115 import GHC.Base
116 import {-# SOURCE #-} GHC.Handle
117 import GHC.IOBase
118 import GHC.Num          ( Num(..) )
119 import GHC.Real         ( fromIntegral )
120 #ifdef mingw32_HOST_OS
121 import GHC.Real         ( div )
122 import GHC.Ptr          ( plusPtr, FunPtr(..) )
123 #endif
124 #ifdef mingw32_HOST_OS
125 import GHC.Read         ( Read )
126 import GHC.Enum         ( Enum )
127 #endif
128 import GHC.Exception    ( SomeException(..), throw )
129 import GHC.Pack         ( packCString# )
130 import GHC.Ptr          ( Ptr(..) )
131 import GHC.STRef
132 import GHC.Show         ( Show(..), showString )
133 import Data.Typeable
134 import GHC.Err
135
136 infixr 0 `par`, `pseq`
137 \end{code}
138
139 %************************************************************************
140 %*                                                                      *
141 \subsection{@ThreadId@, @par@, and @fork@}
142 %*                                                                      *
143 %************************************************************************
144
145 \begin{code}
146 data ThreadId = ThreadId ThreadId# deriving( Typeable )
147 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
148 -- But since ThreadId# is unlifted, the Weak type must use open
149 -- type variables.
150 {- ^
151 A 'ThreadId' is an abstract type representing a handle to a thread.
152 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
153 the 'Ord' instance implements an arbitrary total ordering over
154 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
155 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
156 useful when debugging or diagnosing the behaviour of a concurrent
157 program.
158
159 /Note/: in GHC, if you have a 'ThreadId', you essentially have
160 a pointer to the thread itself.  This means the thread itself can\'t be
161 garbage collected until you drop the 'ThreadId'.
162 This misfeature will hopefully be corrected at a later date.
163
164 /Note/: Hugs does not provide any operations on other threads;
165 it defines 'ThreadId' as a synonym for ().
166 -}
167
168 instance Show ThreadId where
169    showsPrec d t = 
170         showString "ThreadId " . 
171         showsPrec d (getThreadId (id2TSO t))
172
173 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
174
175 id2TSO :: ThreadId -> ThreadId#
176 id2TSO (ThreadId t) = t
177
178 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
179 -- Returns -1, 0, 1
180
181 cmpThread :: ThreadId -> ThreadId -> Ordering
182 cmpThread t1 t2 = 
183    case cmp_thread (id2TSO t1) (id2TSO t2) of
184       -1 -> LT
185       0  -> EQ
186       _  -> GT -- must be 1
187
188 instance Eq ThreadId where
189    t1 == t2 = 
190       case t1 `cmpThread` t2 of
191          EQ -> True
192          _  -> False
193
194 instance Ord ThreadId where
195    compare = cmpThread
196
197 {- |
198 Sparks off a new thread to run the 'IO' computation passed as the
199 first argument, and returns the 'ThreadId' of the newly created
200 thread.
201
202 The new thread will be a lightweight thread; if you want to use a foreign
203 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
204
205 GHC note: the new thread inherits the /blocked/ state of the parent 
206 (see 'Control.Exception.block').
207 -}
208 forkIO :: IO () -> IO ThreadId
209 forkIO action = IO $ \ s -> 
210    case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
211  where
212   action_plus = catchException action childHandler
213
214 {- |
215 Like 'forkIO', but lets you specify on which CPU the thread is
216 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
217 will stay on the same CPU for its entire lifetime (`forkIO` threads
218 can migrate between CPUs according to the scheduling policy).
219 `forkOnIO` is useful for overriding the scheduling policy when you
220 know in advance how best to distribute the threads.
221
222 The `Int` argument specifies the CPU number; it is interpreted modulo
223 'numCapabilities' (note that it actually specifies a capability number
224 rather than a CPU number, but to a first approximation the two are
225 equivalent).
226 -}
227 forkOnIO :: Int -> IO () -> IO ThreadId
228 forkOnIO (I# cpu) action = IO $ \ s -> 
229    case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
230  where
231   action_plus = catchException action childHandler
232
233 -- | the value passed to the @+RTS -N@ flag.  This is the number of
234 -- Haskell threads that can run truly simultaneously at any given
235 -- time, and is typically set to the number of physical CPU cores on
236 -- the machine.
237 numCapabilities :: Int
238 numCapabilities = unsafePerformIO $  do 
239                     n <- peek n_capabilities
240                     return (fromIntegral n)
241
242 #if defined(mingw32_HOST_OS) && defined(__PIC__)
243 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
244 #else
245 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
246 #endif
247 childHandler :: SomeException -> IO ()
248 childHandler err = catchException (real_handler err) childHandler
249
250 real_handler :: SomeException -> IO ()
251 real_handler se@(SomeException ex) =
252   -- ignore thread GC and killThread exceptions:
253   case cast ex of
254   Just BlockedOnDeadMVar                -> return ()
255   _ -> case cast ex of
256        Just BlockedIndefinitely         -> return ()
257        _ -> case cast ex of
258             Just ThreadKilled           -> return ()
259             _ -> case cast ex of
260                  -- report all others:
261                  Just StackOverflow     -> reportStackOverflow
262                  _                      -> reportError se
263
264 {- | 'killThread' terminates the given thread (GHC only).
265 Any work already done by the thread isn\'t
266 lost: the computation is suspended until required by another thread.
267 The memory used by the thread will be garbage collected if it isn\'t
268 referenced from anywhere.  The 'killThread' function is defined in
269 terms of 'throwTo':
270
271 > killThread tid = throwTo tid ThreadKilled
272
273 Killthread is a no-op if the target thread has already completed.
274 -}
275 killThread :: ThreadId -> IO ()
276 killThread tid = throwTo tid ThreadKilled
277
278 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
279
280 'throwTo' does not return until the exception has been raised in the
281 target thread. 
282 The calling thread can thus be certain that the target
283 thread has received the exception.  This is a useful property to know
284 when dealing with race conditions: eg. if there are two threads that
285 can kill each other, it is guaranteed that only one of the threads
286 will get to kill the other.
287
288 If the target thread is currently making a foreign call, then the
289 exception will not be raised (and hence 'throwTo' will not return)
290 until the call has completed.  This is the case regardless of whether
291 the call is inside a 'block' or not.
292
293 Important note: the behaviour of 'throwTo' differs from that described in
294 the paper \"Asynchronous exceptions in Haskell\"
295 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
296 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
297 a more synchronous design in which 'throwTo' does not return until the exception
298 is received by the target thread.  The trade-off is discussed in Section 8 of the paper.
299 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
300 the paper).
301
302 There is currently no guarantee that the exception delivered by 'throwTo' will be
303 delivered at the first possible opportunity.  In particular, if a thread may 
304 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
305 a pending 'throwTo'.  This is arguably undesirable behaviour.
306
307  -}
308 throwTo :: Exception e => ThreadId -> e -> IO ()
309 throwTo (ThreadId tid) ex = IO $ \ s ->
310    case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
311
312 -- | Returns the 'ThreadId' of the calling thread (GHC only).
313 myThreadId :: IO ThreadId
314 myThreadId = IO $ \s ->
315    case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
316
317
318 -- |The 'yield' action allows (forces, in a co-operative multitasking
319 -- implementation) a context-switch to any other currently runnable
320 -- threads (if any), and is occasionally useful when implementing
321 -- concurrency abstractions.
322 yield :: IO ()
323 yield = IO $ \s -> 
324    case (yield# s) of s1 -> (# s1, () #)
325
326 {- | 'labelThread' stores a string as identifier for this thread if
327 you built a RTS with debugging support. This identifier will be used in
328 the debugging output to make distinction of different threads easier
329 (otherwise you only have the thread state object\'s address in the heap).
330
331 Other applications like the graphical Concurrent Haskell Debugger
332 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
333 'labelThread' for their purposes as well.
334 -}
335
336 labelThread :: ThreadId -> String -> IO ()
337 labelThread (ThreadId t) str = IO $ \ s ->
338    let ps  = packCString# str
339        adr = byteArrayContents# ps in
340      case (labelThread# t adr s) of s1 -> (# s1, () #)
341
342 --      Nota Bene: 'pseq' used to be 'seq'
343 --                 but 'seq' is now defined in PrelGHC
344 --
345 -- "pseq" is defined a bit weirdly (see below)
346 --
347 -- The reason for the strange "lazy" call is that
348 -- it fools the compiler into thinking that pseq  and par are non-strict in
349 -- their second argument (even if it inlines pseq at the call site).
350 -- If it thinks pseq is strict in "y", then it often evaluates
351 -- "y" before "x", which is totally wrong.  
352
353 {-# INLINE pseq  #-}
354 pseq :: a -> b -> b
355 pseq  x y = x `seq` lazy y
356
357 {-# INLINE par  #-}
358 par :: a -> b -> b
359 par  x y = case (par# x) of { _ -> lazy y }
360
361
362 data BlockReason
363   = BlockedOnMVar
364         -- ^blocked on on 'MVar'
365   | BlockedOnBlackHole
366         -- ^blocked on a computation in progress by another thread
367   | BlockedOnException
368         -- ^blocked in 'throwTo'
369   | BlockedOnSTM
370         -- ^blocked in 'retry' in an STM transaction
371   | BlockedOnForeignCall
372         -- ^currently in a foreign call
373   | BlockedOnOther
374         -- ^blocked on some other resource.  Without @-threaded@,
375         -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
376         -- they show up as 'BlockedOnMVar'.
377   deriving (Eq,Ord,Show)
378
379 -- | The current status of a thread
380 data ThreadStatus
381   = ThreadRunning
382         -- ^the thread is currently runnable or running
383   | ThreadFinished
384         -- ^the thread has finished
385   | ThreadBlocked  BlockReason
386         -- ^the thread is blocked on some resource
387   | ThreadDied
388         -- ^the thread received an uncaught exception
389   deriving (Eq,Ord,Show)
390
391 threadStatus :: ThreadId -> IO ThreadStatus
392 threadStatus (ThreadId t) = IO $ \s ->
393    case threadStatus# t s of
394      (# s', stat #) -> (# s', mk_stat (I# stat) #)
395    where
396         -- NB. keep these in sync with includes/Constants.h
397      mk_stat 0  = ThreadRunning
398      mk_stat 1  = ThreadBlocked BlockedOnMVar
399      mk_stat 2  = ThreadBlocked BlockedOnBlackHole
400      mk_stat 3  = ThreadBlocked BlockedOnException
401      mk_stat 7  = ThreadBlocked BlockedOnSTM
402      mk_stat 11 = ThreadBlocked BlockedOnForeignCall
403      mk_stat 12 = ThreadBlocked BlockedOnForeignCall
404      mk_stat 16 = ThreadFinished
405      mk_stat 17 = ThreadDied
406      mk_stat _  = ThreadBlocked BlockedOnOther
407 \end{code}
408
409
410 %************************************************************************
411 %*                                                                      *
412 \subsection[stm]{Transactional heap operations}
413 %*                                                                      *
414 %************************************************************************
415
416 TVars are shared memory locations which support atomic memory
417 transactions.
418
419 \begin{code}
420 -- |A monad supporting atomic memory transactions.
421 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
422
423 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
424 unSTM (STM a) = a
425
426 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
427
428 instance  Functor STM where
429    fmap f x = x >>= (return . f)
430
431 instance  Monad STM  where
432     {-# INLINE return #-}
433     {-# INLINE (>>)   #-}
434     {-# INLINE (>>=)  #-}
435     m >> k      = thenSTM m k
436     return x    = returnSTM x
437     m >>= k     = bindSTM m k
438
439 bindSTM :: STM a -> (a -> STM b) -> STM b
440 bindSTM (STM m) k = STM ( \s ->
441   case m s of 
442     (# new_s, a #) -> unSTM (k a) new_s
443   )
444
445 thenSTM :: STM a -> STM b -> STM b
446 thenSTM (STM m) k = STM ( \s ->
447   case m s of 
448     (# new_s, _ #) -> unSTM k new_s
449   )
450
451 returnSTM :: a -> STM a
452 returnSTM x = STM (\s -> (# s, x #))
453
454 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
455 -- dangerous thing to do.  
456 --
457 --   * The STM implementation will often run transactions multiple
458 --     times, so you need to be prepared for this if your IO has any
459 --     side effects.
460 --
461 --   * The STM implementation will abort transactions that are known to
462 --     be invalid and need to be restarted.  This may happen in the middle
463 --     of `unsafeIOToSTM`, so make sure you don't acquire any resources
464 --     that need releasing (exception handlers are ignored when aborting
465 --     the transaction).  That includes doing any IO using Handles, for
466 --     example.  Getting this wrong will probably lead to random deadlocks.
467 --
468 --   * The transaction may have seen an inconsistent view of memory when
469 --     the IO runs.  Invariants that you expect to be true throughout
470 --     your program may not be true inside a transaction, due to the
471 --     way transactions are implemented.  Normally this wouldn't be visible
472 --     to the programmer, but using `unsafeIOToSTM` can expose it.
473 --
474 unsafeIOToSTM :: IO a -> STM a
475 unsafeIOToSTM (IO m) = STM m
476
477 -- |Perform a series of STM actions atomically.
478 --
479 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
480 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
481 -- this would effectively allow a transaction inside a transaction, depending
482 -- on exactly when the thunk is evaluated.)
483 --
484 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
485 -- and which allows top-level TVars to be allocated.
486
487 atomically :: STM a -> IO a
488 atomically (STM m) = IO (\s -> (atomically# m) s )
489
490 -- |Retry execution of the current memory transaction because it has seen
491 -- values in TVars which mean that it should not continue (e.g. the TVars
492 -- represent a shared buffer that is now empty).  The implementation may
493 -- block the thread until one of the TVars that it has read from has been
494 -- udpated. (GHC only)
495 retry :: STM a
496 retry = STM $ \s# -> retry# s#
497
498 -- |Compose two alternative STM actions (GHC only).  If the first action
499 -- completes without retrying then it forms the result of the orElse.
500 -- Otherwise, if the first action retries, then the second action is
501 -- tried in its place.  If both actions retry then the orElse as a
502 -- whole retries.
503 orElse :: STM a -> STM a -> STM a
504 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
505
506 -- |Exception handling within STM actions.
507 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
508 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
509
510 -- | Low-level primitive on which always and alwaysSucceeds are built.
511 -- checkInv differs form these in that (i) the invariant is not 
512 -- checked when checkInv is called, only at the end of this and
513 -- subsequent transcations, (ii) the invariant failure is indicated
514 -- by raising an exception.
515 checkInv :: STM a -> STM ()
516 checkInv (STM m) = STM (\s -> (check# m) s)
517
518 -- | alwaysSucceeds adds a new invariant that must be true when passed
519 -- to alwaysSucceeds, at the end of the current transaction, and at
520 -- the end of every subsequent transaction.  If it fails at any
521 -- of those points then the transaction violating it is aborted
522 -- and the exception raised by the invariant is propagated.
523 alwaysSucceeds :: STM a -> STM ()
524 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
525                       checkInv i
526
527 -- | always is a variant of alwaysSucceeds in which the invariant is
528 -- expressed as an STM Bool action that must return True.  Returning
529 -- False or raising an exception are both treated as invariant failures.
530 always :: STM Bool -> STM ()
531 always i = alwaysSucceeds ( do v <- i
532                                if (v) then return () else ( error "Transacional invariant violation" ) )
533
534 -- |Shared memory locations that support atomic memory transactions.
535 data TVar a = TVar (TVar# RealWorld a)
536
537 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
538
539 instance Eq (TVar a) where
540         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
541
542 -- |Create a new TVar holding a value supplied
543 newTVar :: a -> STM (TVar a)
544 newTVar val = STM $ \s1# ->
545     case newTVar# val s1# of
546          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
547
548 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
549 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
550 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
551 -- possible.
552 newTVarIO :: a -> IO (TVar a)
553 newTVarIO val = IO $ \s1# ->
554     case newTVar# val s1# of
555          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
556
557 -- |Return the current value stored in a TVar.
558 -- This is equivalent to
559 --
560 -- >  readTVarIO = atomically . readTVar
561 --
562 -- but works much faster, because it doesn't perform a complete
563 -- transaction, it just reads the current value of the 'TVar'.
564 readTVarIO :: TVar a -> IO a
565 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
566
567 -- |Return the current value stored in a TVar
568 readTVar :: TVar a -> STM a
569 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
570
571 -- |Write the supplied value into a TVar
572 writeTVar :: TVar a -> a -> STM ()
573 writeTVar (TVar tvar#) val = STM $ \s1# ->
574     case writeTVar# tvar# val s1# of
575          s2# -> (# s2#, () #)
576   
577 \end{code}
578
579 %************************************************************************
580 %*                                                                      *
581 \subsection[mvars]{M-Structures}
582 %*                                                                      *
583 %************************************************************************
584
585 M-Vars are rendezvous points for concurrent threads.  They begin
586 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
587 is written, a single blocked thread may be freed.  Reading an M-Var
588 toggles its state from full back to empty.  Therefore, any value
589 written to an M-Var may only be read once.  Multiple reads and writes
590 are allowed, but there must be at least one read between any two
591 writes.
592
593 \begin{code}
594 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
595
596 -- |Create an 'MVar' which is initially empty.
597 newEmptyMVar  :: IO (MVar a)
598 newEmptyMVar = IO $ \ s# ->
599     case newMVar# s# of
600          (# s2#, svar# #) -> (# s2#, MVar svar# #)
601
602 -- |Create an 'MVar' which contains the supplied value.
603 newMVar :: a -> IO (MVar a)
604 newMVar value =
605     newEmptyMVar        >>= \ mvar ->
606     putMVar mvar value  >>
607     return mvar
608
609 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
610 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
611 -- the 'MVar' is left empty.
612 -- 
613 -- There are two further important properties of 'takeMVar':
614 --
615 --   * 'takeMVar' is single-wakeup.  That is, if there are multiple
616 --     threads blocked in 'takeMVar', and the 'MVar' becomes full,
617 --     only one thread will be woken up.  The runtime guarantees that
618 --     the woken thread completes its 'takeMVar' operation.
619 --
620 --   * When multiple threads are blocked on an 'MVar', they are
621 --     woken up in FIFO order.  This is useful for providing
622 --     fairness properties of abstractions built using 'MVar's.
623 --
624 takeMVar :: MVar a -> IO a
625 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
626
627 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
628 -- 'putMVar' will wait until it becomes empty.
629 --
630 -- There are two further important properties of 'putMVar':
631 --
632 --   * 'putMVar' is single-wakeup.  That is, if there are multiple
633 --     threads blocked in 'putMVar', and the 'MVar' becomes empty,
634 --     only one thread will be woken up.  The runtime guarantees that
635 --     the woken thread completes its 'putMVar' operation.
636 --
637 --   * When multiple threads are blocked on an 'MVar', they are
638 --     woken up in FIFO order.  This is useful for providing
639 --     fairness properties of abstractions built using 'MVar's.
640 --
641 putMVar  :: MVar a -> a -> IO ()
642 putMVar (MVar mvar#) x = IO $ \ s# ->
643     case putMVar# mvar# x s# of
644         s2# -> (# s2#, () #)
645
646 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
647 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
648 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
649 -- the 'MVar' is left empty.
650 tryTakeMVar :: MVar a -> IO (Maybe a)
651 tryTakeMVar (MVar m) = IO $ \ s ->
652     case tryTakeMVar# m s of
653         (# s', 0#, _ #) -> (# s', Nothing #)      -- MVar is empty
654         (# s', _,  a #) -> (# s', Just a  #)      -- MVar is full
655
656 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
657 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
658 -- it was successful, or 'False' otherwise.
659 tryPutMVar  :: MVar a -> a -> IO Bool
660 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
661     case tryPutMVar# mvar# x s# of
662         (# s, 0# #) -> (# s, False #)
663         (# s, _  #) -> (# s, True #)
664
665 -- |Check whether a given 'MVar' is empty.
666 --
667 -- Notice that the boolean value returned  is just a snapshot of
668 -- the state of the MVar. By the time you get to react on its result,
669 -- the MVar may have been filled (or emptied) - so be extremely
670 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
671 isEmptyMVar :: MVar a -> IO Bool
672 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
673     case isEmptyMVar# mv# s# of
674         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
675
676 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
677 -- "System.Mem.Weak" for more about finalizers.
678 addMVarFinalizer :: MVar a -> IO () -> IO ()
679 addMVarFinalizer (MVar m) finalizer = 
680   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, _ #) -> (# s1, () #) }
681
682 withMVar :: MVar a -> (a -> IO b) -> IO b
683 withMVar m io = 
684   block $ do
685     a <- takeMVar m
686     b <- catchAny (unblock (io a))
687             (\e -> do putMVar m a; throw e)
688     putMVar m a
689     return b
690 \end{code}
691
692
693 %************************************************************************
694 %*                                                                      *
695 \subsection{Thread waiting}
696 %*                                                                      *
697 %************************************************************************
698
699 \begin{code}
700 #ifdef mingw32_HOST_OS
701
702 -- Note: threadWaitRead and threadWaitWrite aren't really functional
703 -- on Win32, but left in there because lib code (still) uses them (the manner
704 -- in which they're used doesn't cause problems on a Win32 platform though.)
705
706 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
707 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
708   IO $ \s -> case asyncRead# fd isSock len buf s of 
709                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
710
711 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
712 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
713   IO $ \s -> case asyncWrite# fd isSock len buf s of 
714                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
715
716 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
717 asyncDoProc (FunPtr proc) (Ptr param) = 
718     -- the 'length' value is ignored; simplifies implementation of
719     -- the async*# primops to have them all return the same result.
720   IO $ \s -> case asyncDoProc# proc param s  of 
721                (# s', _len#, err# #) -> (# s', I# err# #)
722
723 -- to aid the use of these primops by the IO Handle implementation,
724 -- provide the following convenience funs:
725
726 -- this better be a pinned byte array!
727 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
728 asyncReadBA fd isSock len off bufB = 
729   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
730   
731 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
732 asyncWriteBA fd isSock len off bufB = 
733   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
734
735 #endif
736
737 -- -----------------------------------------------------------------------------
738 -- Thread IO API
739
740 -- | Block the current thread until data is available to read on the
741 -- given file descriptor (GHC only).
742 threadWaitRead :: Fd -> IO ()
743 threadWaitRead fd
744 #ifndef mingw32_HOST_OS
745   | threaded  = waitForReadEvent fd
746 #endif
747   | otherwise = IO $ \s -> 
748         case fromIntegral fd of { I# fd# ->
749         case waitRead# fd# s of { s' -> (# s', () #)
750         }}
751
752 -- | Block the current thread until data can be written to the
753 -- given file descriptor (GHC only).
754 threadWaitWrite :: Fd -> IO ()
755 threadWaitWrite fd
756 #ifndef mingw32_HOST_OS
757   | threaded  = waitForWriteEvent fd
758 #endif
759   | otherwise = IO $ \s -> 
760         case fromIntegral fd of { I# fd# ->
761         case waitWrite# fd# s of { s' -> (# s', () #)
762         }}
763
764 -- | Suspends the current thread for a given number of microseconds
765 -- (GHC only).
766 --
767 -- There is no guarantee that the thread will be rescheduled promptly
768 -- when the delay has expired, but the thread will never continue to
769 -- run /earlier/ than specified.
770 --
771 threadDelay :: Int -> IO ()
772 threadDelay time
773   | threaded  = waitForDelayEvent time
774   | otherwise = IO $ \s -> 
775         case fromIntegral time of { I# time# ->
776         case delay# time# s of { s' -> (# s', () #)
777         }}
778
779
780 -- | Set the value of returned TVar to True after a given number of
781 -- microseconds. The caveats associated with threadDelay also apply.
782 --
783 registerDelay :: Int -> IO (TVar Bool)
784 registerDelay usecs 
785   | threaded = waitForDelayEventSTM usecs
786   | otherwise = error "registerDelay: requires -threaded"
787
788 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
789
790 waitForDelayEvent :: Int -> IO ()
791 waitForDelayEvent usecs = do
792   m <- newEmptyMVar
793   target <- calculateTarget usecs
794   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
795   prodServiceThread
796   takeMVar m
797
798 -- Delays for use in STM
799 waitForDelayEventSTM :: Int -> IO (TVar Bool)
800 waitForDelayEventSTM usecs = do
801    t <- atomically $ newTVar False
802    target <- calculateTarget usecs
803    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
804    prodServiceThread
805    return t  
806     
807 calculateTarget :: Int -> IO USecs
808 calculateTarget usecs = do
809     now <- getUSecOfDay
810     return $ now + (fromIntegral usecs)
811
812
813 -- ----------------------------------------------------------------------------
814 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
815
816 -- In the threaded RTS, we employ a single IO Manager thread to wait
817 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
818 -- and delays (threadDelay).  
819 --
820 -- We can do this because in the threaded RTS the IO Manager can make
821 -- a non-blocking call to select(), so we don't have to do select() in
822 -- the scheduler as we have to in the non-threaded RTS.  We get performance
823 -- benefits from doing it this way, because we only have to restart the select()
824 -- when a new request arrives, rather than doing one select() each time
825 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
826 -- by not having to check for completed IO requests.
827
828 -- Issues, possible problems:
829 --
830 --      - we might want bound threads to just do the blocking
831 --        operation rather than communicating with the IO manager
832 --        thread.  This would prevent simgle-threaded programs which do
833 --        IO from requiring multiple OS threads.  However, it would also
834 --        prevent bound threads waiting on IO from being killed or sent
835 --        exceptions.
836 --
837 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
838 --        I couldn't repeat this.
839 --
840 --      - How do we handle signal delivery in the multithreaded RTS?
841 --
842 --      - forkProcess will kill the IO manager thread.  Let's just
843 --        hope we don't need to do any blocking IO between fork & exec.
844
845 #ifndef mingw32_HOST_OS
846 data IOReq
847   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
848   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
849 #endif
850
851 data DelayReq
852   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
853   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
854
855 #ifndef mingw32_HOST_OS
856 pendingEvents :: IORef [IOReq]
857 #endif
858 pendingDelays :: IORef [DelayReq]
859         -- could use a strict list or array here
860 {-# NOINLINE pendingEvents #-}
861 {-# NOINLINE pendingDelays #-}
862 (pendingEvents,pendingDelays) = unsafePerformIO $ do
863   startIOManagerThread
864   reqs <- newIORef []
865   dels <- newIORef []
866   return (reqs, dels)
867         -- the first time we schedule an IO request, the service thread
868         -- will be created (cool, huh?)
869
870 ensureIOManagerIsRunning :: IO ()
871 ensureIOManagerIsRunning 
872   | threaded  = seq pendingEvents $ return ()
873   | otherwise = return ()
874
875 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
876 insertDelay d [] = [d]
877 insertDelay d1 ds@(d2 : rest)
878   | delayTime d1 <= delayTime d2 = d1 : ds
879   | otherwise                    = d2 : insertDelay d1 rest
880
881 delayTime :: DelayReq -> USecs
882 delayTime (Delay t _) = t
883 delayTime (DelaySTM t _) = t
884
885 type USecs = Word64
886
887 -- XXX: move into GHC.IOBase from Data.IORef?
888 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
889 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
890
891 foreign import ccall unsafe "getUSecOfDay" 
892   getUSecOfDay :: IO USecs
893
894 prodding :: IORef Bool
895 {-# NOINLINE prodding #-}
896 prodding = unsafePerformIO (newIORef False)
897
898 prodServiceThread :: IO ()
899 prodServiceThread = do
900   was_set <- atomicModifyIORef prodding (\a -> (True,a))
901   if (not (was_set)) then wakeupIOManager else return ()
902
903 #ifdef mingw32_HOST_OS
904 -- ----------------------------------------------------------------------------
905 -- Windows IO manager thread
906
907 startIOManagerThread :: IO ()
908 startIOManagerThread = do
909   wakeup <- c_getIOManagerEvent
910   forkIO $ service_loop wakeup []
911   return ()
912
913 service_loop :: HANDLE          -- read end of pipe
914              -> [DelayReq]      -- current delay requests
915              -> IO ()
916
917 service_loop wakeup old_delays = do
918   -- pick up new delay requests
919   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
920   let  delays = foldr insertDelay old_delays new_delays
921
922   now <- getUSecOfDay
923   (delays', timeout) <- getDelay now delays
924
925   r <- c_WaitForSingleObject wakeup timeout
926   case r of
927     0xffffffff -> do c_maperrno; throwErrno "service_loop"
928     0 -> do
929         r2 <- c_readIOManagerEvent
930         exit <- 
931               case r2 of
932                 _ | r2 == io_MANAGER_WAKEUP -> return False
933                 _ | r2 == io_MANAGER_DIE    -> return True
934                 0 -> return False -- spurious wakeup
935                 _ -> do start_console_handler (r2 `shiftR` 1); return False
936         if exit
937           then return ()
938           else service_cont wakeup delays'
939
940     _other -> service_cont wakeup delays' -- probably timeout        
941
942 service_cont :: HANDLE -> [DelayReq] -> IO ()
943 service_cont wakeup delays = do
944   atomicModifyIORef prodding (\_ -> (False,False))
945   service_loop wakeup delays
946
947 -- must agree with rts/win32/ThrIOManager.c
948 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
949 io_MANAGER_WAKEUP = 0xffffffff
950 io_MANAGER_DIE    = 0xfffffffe
951
952 data ConsoleEvent
953  = ControlC
954  | Break
955  | Close
956     -- these are sent to Services only.
957  | Logoff
958  | Shutdown
959  deriving (Eq, Ord, Enum, Show, Read, Typeable)
960
961 start_console_handler :: Word32 -> IO ()
962 start_console_handler r =
963   case toWin32ConsoleEvent r of
964      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
965                     forkIO (handler x)
966                     return ()
967      Nothing -> return ()
968
969 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
970 toWin32ConsoleEvent ev = 
971    case ev of
972        0 {- CTRL_C_EVENT-}        -> Just ControlC
973        1 {- CTRL_BREAK_EVENT-}    -> Just Break
974        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
975        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
976        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
977        _ -> Nothing
978
979 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
980 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
981
982 -- XXX Is this actually needed?
983 stick :: IORef HANDLE
984 {-# NOINLINE stick #-}
985 stick = unsafePerformIO (newIORef nullPtr)
986
987 wakeupIOManager :: IO ()
988 wakeupIOManager = do 
989   _hdl <- readIORef stick
990   c_sendIOManagerEvent io_MANAGER_WAKEUP
991
992 -- Walk the queue of pending delays, waking up any that have passed
993 -- and return the smallest delay to wait for.  The queue of pending
994 -- delays is kept ordered.
995 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
996 getDelay _   [] = return ([], iNFINITE)
997 getDelay now all@(d : rest) 
998   = case d of
999      Delay time m | now >= time -> do
1000         putMVar m ()
1001         getDelay now rest
1002      DelaySTM time t | now >= time -> do
1003         atomically $ writeTVar t True
1004         getDelay now rest
1005      _otherwise ->
1006         -- delay is in millisecs for WaitForSingleObject
1007         let micro_seconds = delayTime d - now
1008             milli_seconds = (micro_seconds + 999) `div` 1000
1009         in return (all, fromIntegral milli_seconds)
1010
1011 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
1012 -- available yet.  We should move some Win32 functionality down here,
1013 -- maybe as part of the grand reorganisation of the base package...
1014 type HANDLE       = Ptr ()
1015 type DWORD        = Word32
1016
1017 iNFINITE :: DWORD
1018 iNFINITE = 0xFFFFFFFF -- urgh
1019
1020 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
1021   c_getIOManagerEvent :: IO HANDLE
1022
1023 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
1024   c_readIOManagerEvent :: IO Word32
1025
1026 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
1027   c_sendIOManagerEvent :: Word32 -> IO ()
1028
1029 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
1030    c_maperrno :: IO ()
1031
1032 foreign import stdcall "WaitForSingleObject"
1033    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1034
1035 #else
1036 -- ----------------------------------------------------------------------------
1037 -- Unix IO manager thread, using select()
1038
1039 startIOManagerThread :: IO ()
1040 startIOManagerThread = do
1041         allocaArray 2 $ \fds -> do
1042         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1043         rd_end <- peekElemOff fds 0
1044         wr_end <- peekElemOff fds 1
1045         writeIORef stick (fromIntegral wr_end)
1046         c_setIOManagerPipe wr_end
1047         forkIO $ do
1048             allocaBytes sizeofFdSet   $ \readfds -> do
1049             allocaBytes sizeofFdSet   $ \writefds -> do 
1050             allocaBytes sizeofTimeVal $ \timeval -> do
1051             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1052         return ()
1053
1054 service_loop
1055    :: Fd                -- listen to this for wakeup calls
1056    -> Ptr CFdSet
1057    -> Ptr CFdSet
1058    -> Ptr CTimeVal
1059    -> [IOReq]
1060    -> [DelayReq]
1061    -> IO ()
1062 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1063
1064   -- pick up new IO requests
1065   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1066   let reqs = new_reqs ++ old_reqs
1067
1068   -- pick up new delay requests
1069   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1070   let  delays0 = foldr insertDelay old_delays new_delays
1071
1072   -- build the FDSets for select()
1073   fdZero readfds
1074   fdZero writefds
1075   fdSet wakeup readfds
1076   maxfd <- buildFdSets 0 readfds writefds reqs
1077
1078   -- perform the select()
1079   let do_select delays = do
1080           -- check the current time and wake up any thread in
1081           -- threadDelay whose timeout has expired.  Also find the
1082           -- timeout value for the select() call.
1083           now <- getUSecOfDay
1084           (delays', timeout) <- getDelay now ptimeval delays
1085
1086           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
1087                         nullPtr timeout
1088           if (res == -1)
1089              then do
1090                 err <- getErrno
1091                 case err of
1092                   _ | err == eINTR ->  do_select delays'
1093                         -- EINTR: just redo the select()
1094                   _ | err == eBADF ->  return (True, delays)
1095                         -- EBADF: one of the file descriptors is closed or bad,
1096                         -- we don't know which one, so wake everyone up.
1097                   _ | otherwise    ->  throwErrno "select"
1098                         -- otherwise (ENOMEM or EINVAL) something has gone
1099                         -- wrong; report the error.
1100              else
1101                 return (False,delays')
1102
1103   (wakeup_all,delays') <- do_select delays0
1104
1105   exit <-
1106     if wakeup_all then return False
1107       else do
1108         b <- fdIsSet wakeup readfds
1109         if b == 0 
1110           then return False
1111           else alloca $ \p -> do 
1112                  c_read (fromIntegral wakeup) p 1; return ()
1113                  s <- peek p            
1114                  case s of
1115                   _ | s == io_MANAGER_WAKEUP -> return False
1116                   _ | s == io_MANAGER_DIE    -> return True
1117                   _ -> withMVar signalHandlerLock $ \_ -> do
1118                           handler_tbl <- peek handlers
1119                           sp <- peekElemOff handler_tbl (fromIntegral s)
1120                           io <- deRefStablePtr sp
1121                           forkIO io
1122                           return False
1123
1124   if exit then return () else do
1125
1126   atomicModifyIORef prodding (\_ -> (False,False))
1127
1128   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1129                          else completeRequests reqs readfds writefds []
1130
1131   service_loop wakeup readfds writefds ptimeval reqs' delays'
1132
1133 io_MANAGER_WAKEUP, io_MANAGER_DIE :: CChar
1134 io_MANAGER_WAKEUP = 0xff
1135 io_MANAGER_DIE    = 0xfe
1136
1137 stick :: IORef Fd
1138 {-# NOINLINE stick #-}
1139 stick = unsafePerformIO (newIORef 0)
1140
1141 wakeupIOManager :: IO ()
1142 wakeupIOManager = do
1143   fd <- readIORef stick
1144   with io_MANAGER_WAKEUP $ \pbuf -> do 
1145     c_write (fromIntegral fd) pbuf 1; return ()
1146
1147 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1148 -- this race condition is #1922, although that bug was on Windows a similar
1149 -- bug also exists on Unix.
1150 signalHandlerLock :: MVar ()
1151 signalHandlerLock = unsafePerformIO (newMVar ())
1152
1153 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
1154
1155 foreign import ccall "setIOManagerPipe"
1156   c_setIOManagerPipe :: CInt -> IO ()
1157
1158 -- -----------------------------------------------------------------------------
1159 -- IO requests
1160
1161 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1162 buildFdSets maxfd _       _        [] = return maxfd
1163 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1164   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1165   | otherwise        =  do
1166         fdSet fd readfds
1167         buildFdSets (max maxfd fd) readfds writefds reqs
1168 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1169   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1170   | otherwise        =  do
1171         fdSet fd writefds
1172         buildFdSets (max maxfd fd) readfds writefds reqs
1173
1174 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1175                  -> IO [IOReq]
1176 completeRequests [] _ _ reqs' = return reqs'
1177 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1178   b <- fdIsSet fd readfds
1179   if b /= 0
1180     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1181     else completeRequests reqs readfds writefds (Read fd m : reqs')
1182 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1183   b <- fdIsSet fd writefds
1184   if b /= 0
1185     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1186     else completeRequests reqs readfds writefds (Write fd m : reqs')
1187
1188 wakeupAll :: [IOReq] -> IO ()
1189 wakeupAll [] = return ()
1190 wakeupAll (Read  _ m : reqs) = do putMVar m (); wakeupAll reqs
1191 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1192
1193 waitForReadEvent :: Fd -> IO ()
1194 waitForReadEvent fd = do
1195   m <- newEmptyMVar
1196   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1197   prodServiceThread
1198   takeMVar m
1199
1200 waitForWriteEvent :: Fd -> IO ()
1201 waitForWriteEvent fd = do
1202   m <- newEmptyMVar
1203   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1204   prodServiceThread
1205   takeMVar m
1206
1207 -- -----------------------------------------------------------------------------
1208 -- Delays
1209
1210 -- Walk the queue of pending delays, waking up any that have passed
1211 -- and return the smallest delay to wait for.  The queue of pending
1212 -- delays is kept ordered.
1213 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1214 getDelay _   _        [] = return ([],nullPtr)
1215 getDelay now ptimeval all@(d : rest) 
1216   = case d of
1217      Delay time m | now >= time -> do
1218         putMVar m ()
1219         getDelay now ptimeval rest
1220      DelaySTM time t | now >= time -> do
1221         atomically $ writeTVar t True
1222         getDelay now ptimeval rest
1223      _otherwise -> do
1224         setTimevalTicks ptimeval (delayTime d - now)
1225         return (all,ptimeval)
1226
1227 data CTimeVal
1228
1229 foreign import ccall unsafe "sizeofTimeVal"
1230   sizeofTimeVal :: Int
1231
1232 foreign import ccall unsafe "setTimevalTicks" 
1233   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1234
1235 {- 
1236   On Win32 we're going to have a single Pipe, and a
1237   waitForSingleObject with the delay time.  For signals, we send a
1238   byte down the pipe just like on Unix.
1239 -}
1240
1241 -- ----------------------------------------------------------------------------
1242 -- select() interface
1243
1244 -- ToDo: move to System.Posix.Internals?
1245
1246 data CFdSet
1247
1248 foreign import ccall safe "select"
1249   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1250            -> IO CInt
1251
1252 foreign import ccall unsafe "hsFD_SETSIZE"
1253   c_fD_SETSIZE :: CInt
1254
1255 fD_SETSIZE :: Fd
1256 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1257
1258 foreign import ccall unsafe "hsFD_ISSET"
1259   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1260
1261 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1262 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1263
1264 foreign import ccall unsafe "hsFD_SET"
1265   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1266
1267 fdSet :: Fd -> Ptr CFdSet -> IO ()
1268 fdSet (Fd fd) fdset = c_fdSet fd fdset
1269
1270 foreign import ccall unsafe "hsFD_ZERO"
1271   fdZero :: Ptr CFdSet -> IO ()
1272
1273 foreign import ccall unsafe "sizeof_fd_set"
1274   sizeofFdSet :: Int
1275
1276 #endif
1277
1278 reportStackOverflow :: IO a
1279 reportStackOverflow = do callStackOverflowHook; return undefined
1280
1281 reportError :: SomeException -> IO a
1282 reportError ex = do
1283    handler <- getUncaughtExceptionHandler
1284    handler ex
1285    return undefined
1286
1287 -- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
1288 -- the unsafe below.
1289 foreign import ccall unsafe "stackOverflow"
1290         callStackOverflowHook :: IO ()
1291
1292 {-# NOINLINE uncaughtExceptionHandler #-}
1293 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1294 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1295    where
1296       defaultHandler :: SomeException -> IO ()
1297       defaultHandler se@(SomeException ex) = do
1298          (hFlush stdout) `catchAny` (\ _ -> return ())
1299          let msg = case cast ex of
1300                Just Deadlock -> "no threads to run:  infinite loop or deadlock?"
1301                _ -> case cast ex of
1302                     Just (ErrorCall s) -> s
1303                     _                  -> showsPrec 0 se ""
1304          withCString "%s" $ \cfmt ->
1305           withCString msg $ \cmsg ->
1306             errorBelch cfmt cmsg
1307
1308 -- don't use errorBelch() directly, because we cannot call varargs functions
1309 -- using the FFI.
1310 foreign import ccall unsafe "HsBase.h errorBelch2"
1311    errorBelch :: CString -> CString -> IO ()
1312
1313 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1314 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1315
1316 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1317 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
1318 \end{code}