ifdef out the syncIOManager export on Windows; fixes the build
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
6 -- |
7 -- Module      :  GHC.Conc
8 -- Copyright   :  (c) The University of Glasgow, 1994-2002
9 -- License     :  see libraries/base/LICENSE
10 -- 
11 -- Maintainer  :  cvs-ghc@haskell.org
12 -- Stability   :  internal
13 -- Portability :  non-portable (GHC extensions)
14 --
15 -- Basic concurrency stuff.
16 -- 
17 -----------------------------------------------------------------------------
18
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home.  Hence:
23
24 #include "Typeable.h"
25
26 -- #not-home
27 module GHC.Conc
28         ( ThreadId(..)
29
30         -- * Forking and suchlike
31         , forkIO        -- :: IO a -> IO ThreadId
32         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
33         , numCapabilities -- :: Int
34         , childHandler  -- :: Exception -> IO ()
35         , myThreadId    -- :: IO ThreadId
36         , killThread    -- :: ThreadId -> IO ()
37         , throwTo       -- :: ThreadId -> Exception -> IO ()
38         , par           -- :: a -> b -> b
39         , pseq          -- :: a -> b -> b
40         , runSparks
41         , yield         -- :: IO ()
42         , labelThread   -- :: ThreadId -> String -> IO ()
43
44         , ThreadStatus(..), BlockReason(..)
45         , threadStatus  -- :: ThreadId -> IO ThreadStatus
46
47         -- * Waiting
48         , threadDelay           -- :: Int -> IO ()
49         , registerDelay         -- :: Int -> IO (TVar Bool)
50         , threadWaitRead        -- :: Int -> IO ()
51         , threadWaitWrite       -- :: Int -> IO ()
52
53         -- * MVars
54         , MVar(..)
55         , newMVar       -- :: a -> IO (MVar a)
56         , newEmptyMVar  -- :: IO (MVar a)
57         , takeMVar      -- :: MVar a -> IO a
58         , putMVar       -- :: MVar a -> a -> IO ()
59         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
60         , tryPutMVar    -- :: MVar a -> a -> IO Bool
61         , isEmptyMVar   -- :: MVar a -> IO Bool
62         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
63
64         -- * TVars
65         , STM(..)
66         , atomically    -- :: STM a -> IO a
67         , retry         -- :: STM a
68         , orElse        -- :: STM a -> STM a -> STM a
69         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
70         , alwaysSucceeds -- :: STM a -> STM ()
71         , always        -- :: STM Bool -> STM ()
72         , TVar(..)
73         , newTVar       -- :: a -> STM (TVar a)
74         , newTVarIO     -- :: a -> STM (TVar a)
75         , readTVar      -- :: TVar a -> STM a
76         , readTVarIO    -- :: TVar a -> IO a
77         , writeTVar     -- :: a -> TVar a -> STM ()
78         , unsafeIOToSTM -- :: IO a -> STM a
79
80         -- * Miscellaneous
81 #ifdef mingw32_HOST_OS
82         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
83         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
84         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
85
86         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
87         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
88 #endif
89
90 #ifndef mingw32_HOST_OS
91         , Signal, HandlerFun, setHandler, runHandlers
92 #endif
93
94         , ensureIOManagerIsRunning
95 #ifndef mingw32_HOST_OS
96         , syncIOManager
97 #endif
98
99 #ifdef mingw32_HOST_OS
100         , ConsoleEvent(..)
101         , win32ConsoleHandler
102         , toWin32ConsoleEvent
103 #endif
104         , setUncaughtExceptionHandler      -- :: (Exception -> IO ()) -> IO ()
105         , getUncaughtExceptionHandler      -- :: IO (Exception -> IO ())
106
107         , reportError, reportStackOverflow
108         ) where
109
110 import System.Posix.Types
111 #ifndef mingw32_HOST_OS
112 import System.Posix.Internals
113 #endif
114 import Foreign
115 import Foreign.C
116
117 import Data.Dynamic
118 import Data.Maybe
119 import Control.Monad
120
121 import GHC.Base
122 import {-# SOURCE #-} GHC.Handle
123 import GHC.IOBase
124 import GHC.Num          ( Num(..) )
125 import GHC.Real         ( fromIntegral )
126 import GHC.Arr          ( inRange )
127 #ifdef mingw32_HOST_OS
128 import GHC.Real         ( div )
129 import GHC.Ptr          ( plusPtr, FunPtr(..) )
130 #endif
131 #ifdef mingw32_HOST_OS
132 import GHC.Read         ( Read )
133 import GHC.Enum         ( Enum )
134 #endif
135 import GHC.Exception    ( SomeException(..), throw )
136 import GHC.Pack         ( packCString# )
137 import GHC.Ptr          ( Ptr(..) )
138 import GHC.STRef
139 import GHC.Show         ( Show(..), showString )
140 import Data.Typeable
141 import GHC.Err
142
143 infixr 0 `par`, `pseq`
144 \end{code}
145
146 %************************************************************************
147 %*                                                                      *
148 \subsection{@ThreadId@, @par@, and @fork@}
149 %*                                                                      *
150 %************************************************************************
151
152 \begin{code}
153 data ThreadId = ThreadId ThreadId# deriving( Typeable )
154 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
155 -- But since ThreadId# is unlifted, the Weak type must use open
156 -- type variables.
157 {- ^
158 A 'ThreadId' is an abstract type representing a handle to a thread.
159 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
160 the 'Ord' instance implements an arbitrary total ordering over
161 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
162 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
163 useful when debugging or diagnosing the behaviour of a concurrent
164 program.
165
166 /Note/: in GHC, if you have a 'ThreadId', you essentially have
167 a pointer to the thread itself.  This means the thread itself can\'t be
168 garbage collected until you drop the 'ThreadId'.
169 This misfeature will hopefully be corrected at a later date.
170
171 /Note/: Hugs does not provide any operations on other threads;
172 it defines 'ThreadId' as a synonym for ().
173 -}
174
175 instance Show ThreadId where
176    showsPrec d t = 
177         showString "ThreadId " . 
178         showsPrec d (getThreadId (id2TSO t))
179
180 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
181
182 id2TSO :: ThreadId -> ThreadId#
183 id2TSO (ThreadId t) = t
184
185 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
186 -- Returns -1, 0, 1
187
188 cmpThread :: ThreadId -> ThreadId -> Ordering
189 cmpThread t1 t2 = 
190    case cmp_thread (id2TSO t1) (id2TSO t2) of
191       -1 -> LT
192       0  -> EQ
193       _  -> GT -- must be 1
194
195 instance Eq ThreadId where
196    t1 == t2 = 
197       case t1 `cmpThread` t2 of
198          EQ -> True
199          _  -> False
200
201 instance Ord ThreadId where
202    compare = cmpThread
203
204 {- |
205 Sparks off a new thread to run the 'IO' computation passed as the
206 first argument, and returns the 'ThreadId' of the newly created
207 thread.
208
209 The new thread will be a lightweight thread; if you want to use a foreign
210 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
211
212 GHC note: the new thread inherits the /blocked/ state of the parent 
213 (see 'Control.Exception.block').
214
215 The newly created thread has an exception handler that discards the
216 exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
217 'ThreadKilled', and passes all other exceptions to the uncaught
218 exception handler (see 'setUncaughtExceptionHandler').
219 -}
220 forkIO :: IO () -> IO ThreadId
221 forkIO action = IO $ \ s -> 
222    case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
223  where
224   action_plus = catchException action childHandler
225
226 {- |
227 Like 'forkIO', but lets you specify on which CPU the thread is
228 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
229 will stay on the same CPU for its entire lifetime (`forkIO` threads
230 can migrate between CPUs according to the scheduling policy).
231 `forkOnIO` is useful for overriding the scheduling policy when you
232 know in advance how best to distribute the threads.
233
234 The `Int` argument specifies the CPU number; it is interpreted modulo
235 'numCapabilities' (note that it actually specifies a capability number
236 rather than a CPU number, but to a first approximation the two are
237 equivalent).
238 -}
239 forkOnIO :: Int -> IO () -> IO ThreadId
240 forkOnIO (I# cpu) action = IO $ \ s -> 
241    case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
242  where
243   action_plus = catchException action childHandler
244
245 -- | the value passed to the @+RTS -N@ flag.  This is the number of
246 -- Haskell threads that can run truly simultaneously at any given
247 -- time, and is typically set to the number of physical CPU cores on
248 -- the machine.
249 numCapabilities :: Int
250 numCapabilities = unsafePerformIO $  do 
251                     n <- peek n_capabilities
252                     return (fromIntegral n)
253
254 #if defined(mingw32_HOST_OS) && defined(__PIC__)
255 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
256 #else
257 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
258 #endif
259 childHandler :: SomeException -> IO ()
260 childHandler err = catchException (real_handler err) childHandler
261
262 real_handler :: SomeException -> IO ()
263 real_handler se@(SomeException ex) =
264   -- ignore thread GC and killThread exceptions:
265   case cast ex of
266   Just BlockedOnDeadMVar                -> return ()
267   _ -> case cast ex of
268        Just BlockedIndefinitely         -> return ()
269        _ -> case cast ex of
270             Just ThreadKilled           -> return ()
271             _ -> case cast ex of
272                  -- report all others:
273                  Just StackOverflow     -> reportStackOverflow
274                  _                      -> reportError se
275
276 {- | 'killThread' terminates the given thread (GHC only).
277 Any work already done by the thread isn\'t
278 lost: the computation is suspended until required by another thread.
279 The memory used by the thread will be garbage collected if it isn\'t
280 referenced from anywhere.  The 'killThread' function is defined in
281 terms of 'throwTo':
282
283 > killThread tid = throwTo tid ThreadKilled
284
285 Killthread is a no-op if the target thread has already completed.
286 -}
287 killThread :: ThreadId -> IO ()
288 killThread tid = throwTo tid ThreadKilled
289
290 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
291
292 'throwTo' does not return until the exception has been raised in the
293 target thread. 
294 The calling thread can thus be certain that the target
295 thread has received the exception.  This is a useful property to know
296 when dealing with race conditions: eg. if there are two threads that
297 can kill each other, it is guaranteed that only one of the threads
298 will get to kill the other.
299
300 If the target thread is currently making a foreign call, then the
301 exception will not be raised (and hence 'throwTo' will not return)
302 until the call has completed.  This is the case regardless of whether
303 the call is inside a 'block' or not.
304
305 Important note: the behaviour of 'throwTo' differs from that described in
306 the paper \"Asynchronous exceptions in Haskell\"
307 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
308 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
309 a more synchronous design in which 'throwTo' does not return until the exception
310 is received by the target thread.  The trade-off is discussed in Section 9 of the paper.
311 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
312 the paper).
313
314 There is currently no guarantee that the exception delivered by 'throwTo' will be
315 delivered at the first possible opportunity.  In particular, a thread may 
316 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
317 a pending 'throwTo'.  This is arguably undesirable behaviour.
318
319  -}
320 throwTo :: Exception e => ThreadId -> e -> IO ()
321 throwTo (ThreadId tid) ex = IO $ \ s ->
322    case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
323
324 -- | Returns the 'ThreadId' of the calling thread (GHC only).
325 myThreadId :: IO ThreadId
326 myThreadId = IO $ \s ->
327    case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
328
329
330 -- |The 'yield' action allows (forces, in a co-operative multitasking
331 -- implementation) a context-switch to any other currently runnable
332 -- threads (if any), and is occasionally useful when implementing
333 -- concurrency abstractions.
334 yield :: IO ()
335 yield = IO $ \s -> 
336    case (yield# s) of s1 -> (# s1, () #)
337
338 {- | 'labelThread' stores a string as identifier for this thread if
339 you built a RTS with debugging support. This identifier will be used in
340 the debugging output to make distinction of different threads easier
341 (otherwise you only have the thread state object\'s address in the heap).
342
343 Other applications like the graphical Concurrent Haskell Debugger
344 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
345 'labelThread' for their purposes as well.
346 -}
347
348 labelThread :: ThreadId -> String -> IO ()
349 labelThread (ThreadId t) str = IO $ \ s ->
350    let ps  = packCString# str
351        adr = byteArrayContents# ps in
352      case (labelThread# t adr s) of s1 -> (# s1, () #)
353
354 --      Nota Bene: 'pseq' used to be 'seq'
355 --                 but 'seq' is now defined in PrelGHC
356 --
357 -- "pseq" is defined a bit weirdly (see below)
358 --
359 -- The reason for the strange "lazy" call is that
360 -- it fools the compiler into thinking that pseq  and par are non-strict in
361 -- their second argument (even if it inlines pseq at the call site).
362 -- If it thinks pseq is strict in "y", then it often evaluates
363 -- "y" before "x", which is totally wrong.  
364
365 {-# INLINE pseq  #-}
366 pseq :: a -> b -> b
367 pseq  x y = x `seq` lazy y
368
369 {-# INLINE par  #-}
370 par :: a -> b -> b
371 par  x y = case (par# x) of { _ -> lazy y }
372
373 -- | Internal function used by the RTS to run sparks.
374 runSparks :: IO ()
375 runSparks = IO loop
376   where loop s = case getSpark# s of
377                    (# s', n, p #) ->
378                       if n ==# 0# then (# s', () #)
379                                   else p `seq` loop s'
380
381 data BlockReason
382   = BlockedOnMVar
383         -- ^blocked on on 'MVar'
384   | BlockedOnBlackHole
385         -- ^blocked on a computation in progress by another thread
386   | BlockedOnException
387         -- ^blocked in 'throwTo'
388   | BlockedOnSTM
389         -- ^blocked in 'retry' in an STM transaction
390   | BlockedOnForeignCall
391         -- ^currently in a foreign call
392   | BlockedOnOther
393         -- ^blocked on some other resource.  Without @-threaded@,
394         -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
395         -- they show up as 'BlockedOnMVar'.
396   deriving (Eq,Ord,Show)
397
398 -- | The current status of a thread
399 data ThreadStatus
400   = ThreadRunning
401         -- ^the thread is currently runnable or running
402   | ThreadFinished
403         -- ^the thread has finished
404   | ThreadBlocked  BlockReason
405         -- ^the thread is blocked on some resource
406   | ThreadDied
407         -- ^the thread received an uncaught exception
408   deriving (Eq,Ord,Show)
409
410 threadStatus :: ThreadId -> IO ThreadStatus
411 threadStatus (ThreadId t) = IO $ \s ->
412    case threadStatus# t s of
413      (# s', stat #) -> (# s', mk_stat (I# stat) #)
414    where
415         -- NB. keep these in sync with includes/Constants.h
416      mk_stat 0  = ThreadRunning
417      mk_stat 1  = ThreadBlocked BlockedOnMVar
418      mk_stat 2  = ThreadBlocked BlockedOnBlackHole
419      mk_stat 3  = ThreadBlocked BlockedOnException
420      mk_stat 7  = ThreadBlocked BlockedOnSTM
421      mk_stat 11 = ThreadBlocked BlockedOnForeignCall
422      mk_stat 12 = ThreadBlocked BlockedOnForeignCall
423      mk_stat 16 = ThreadFinished
424      mk_stat 17 = ThreadDied
425      mk_stat _  = ThreadBlocked BlockedOnOther
426 \end{code}
427
428
429 %************************************************************************
430 %*                                                                      *
431 \subsection[stm]{Transactional heap operations}
432 %*                                                                      *
433 %************************************************************************
434
435 TVars are shared memory locations which support atomic memory
436 transactions.
437
438 \begin{code}
439 -- |A monad supporting atomic memory transactions.
440 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
441
442 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
443 unSTM (STM a) = a
444
445 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
446
447 instance  Functor STM where
448    fmap f x = x >>= (return . f)
449
450 instance  Monad STM  where
451     {-# INLINE return #-}
452     {-# INLINE (>>)   #-}
453     {-# INLINE (>>=)  #-}
454     m >> k      = thenSTM m k
455     return x    = returnSTM x
456     m >>= k     = bindSTM m k
457
458 bindSTM :: STM a -> (a -> STM b) -> STM b
459 bindSTM (STM m) k = STM ( \s ->
460   case m s of 
461     (# new_s, a #) -> unSTM (k a) new_s
462   )
463
464 thenSTM :: STM a -> STM b -> STM b
465 thenSTM (STM m) k = STM ( \s ->
466   case m s of 
467     (# new_s, _ #) -> unSTM k new_s
468   )
469
470 returnSTM :: a -> STM a
471 returnSTM x = STM (\s -> (# s, x #))
472
473 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
474 -- dangerous thing to do.  
475 --
476 --   * The STM implementation will often run transactions multiple
477 --     times, so you need to be prepared for this if your IO has any
478 --     side effects.
479 --
480 --   * The STM implementation will abort transactions that are known to
481 --     be invalid and need to be restarted.  This may happen in the middle
482 --     of `unsafeIOToSTM`, so make sure you don't acquire any resources
483 --     that need releasing (exception handlers are ignored when aborting
484 --     the transaction).  That includes doing any IO using Handles, for
485 --     example.  Getting this wrong will probably lead to random deadlocks.
486 --
487 --   * The transaction may have seen an inconsistent view of memory when
488 --     the IO runs.  Invariants that you expect to be true throughout
489 --     your program may not be true inside a transaction, due to the
490 --     way transactions are implemented.  Normally this wouldn't be visible
491 --     to the programmer, but using `unsafeIOToSTM` can expose it.
492 --
493 unsafeIOToSTM :: IO a -> STM a
494 unsafeIOToSTM (IO m) = STM m
495
496 -- |Perform a series of STM actions atomically.
497 --
498 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
499 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
500 -- this would effectively allow a transaction inside a transaction, depending
501 -- on exactly when the thunk is evaluated.)
502 --
503 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
504 -- and which allows top-level TVars to be allocated.
505
506 atomically :: STM a -> IO a
507 atomically (STM m) = IO (\s -> (atomically# m) s )
508
509 -- |Retry execution of the current memory transaction because it has seen
510 -- values in TVars which mean that it should not continue (e.g. the TVars
511 -- represent a shared buffer that is now empty).  The implementation may
512 -- block the thread until one of the TVars that it has read from has been
513 -- udpated. (GHC only)
514 retry :: STM a
515 retry = STM $ \s# -> retry# s#
516
517 -- |Compose two alternative STM actions (GHC only).  If the first action
518 -- completes without retrying then it forms the result of the orElse.
519 -- Otherwise, if the first action retries, then the second action is
520 -- tried in its place.  If both actions retry then the orElse as a
521 -- whole retries.
522 orElse :: STM a -> STM a -> STM a
523 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
524
525 -- |Exception handling within STM actions.
526 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
527 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
528
529 -- | Low-level primitive on which always and alwaysSucceeds are built.
530 -- checkInv differs form these in that (i) the invariant is not 
531 -- checked when checkInv is called, only at the end of this and
532 -- subsequent transcations, (ii) the invariant failure is indicated
533 -- by raising an exception.
534 checkInv :: STM a -> STM ()
535 checkInv (STM m) = STM (\s -> (check# m) s)
536
537 -- | alwaysSucceeds adds a new invariant that must be true when passed
538 -- to alwaysSucceeds, at the end of the current transaction, and at
539 -- the end of every subsequent transaction.  If it fails at any
540 -- of those points then the transaction violating it is aborted
541 -- and the exception raised by the invariant is propagated.
542 alwaysSucceeds :: STM a -> STM ()
543 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
544                       checkInv i
545
546 -- | always is a variant of alwaysSucceeds in which the invariant is
547 -- expressed as an STM Bool action that must return True.  Returning
548 -- False or raising an exception are both treated as invariant failures.
549 always :: STM Bool -> STM ()
550 always i = alwaysSucceeds ( do v <- i
551                                if (v) then return () else ( error "Transacional invariant violation" ) )
552
553 -- |Shared memory locations that support atomic memory transactions.
554 data TVar a = TVar (TVar# RealWorld a)
555
556 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
557
558 instance Eq (TVar a) where
559         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
560
561 -- |Create a new TVar holding a value supplied
562 newTVar :: a -> STM (TVar a)
563 newTVar val = STM $ \s1# ->
564     case newTVar# val s1# of
565          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
566
567 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
568 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
569 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
570 -- possible.
571 newTVarIO :: a -> IO (TVar a)
572 newTVarIO val = IO $ \s1# ->
573     case newTVar# val s1# of
574          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
575
576 -- |Return the current value stored in a TVar.
577 -- This is equivalent to
578 --
579 -- >  readTVarIO = atomically . readTVar
580 --
581 -- but works much faster, because it doesn't perform a complete
582 -- transaction, it just reads the current value of the 'TVar'.
583 readTVarIO :: TVar a -> IO a
584 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
585
586 -- |Return the current value stored in a TVar
587 readTVar :: TVar a -> STM a
588 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
589
590 -- |Write the supplied value into a TVar
591 writeTVar :: TVar a -> a -> STM ()
592 writeTVar (TVar tvar#) val = STM $ \s1# ->
593     case writeTVar# tvar# val s1# of
594          s2# -> (# s2#, () #)
595   
596 \end{code}
597
598 %************************************************************************
599 %*                                                                      *
600 \subsection[mvars]{M-Structures}
601 %*                                                                      *
602 %************************************************************************
603
604 M-Vars are rendezvous points for concurrent threads.  They begin
605 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
606 is written, a single blocked thread may be freed.  Reading an M-Var
607 toggles its state from full back to empty.  Therefore, any value
608 written to an M-Var may only be read once.  Multiple reads and writes
609 are allowed, but there must be at least one read between any two
610 writes.
611
612 \begin{code}
613 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
614
615 -- |Create an 'MVar' which is initially empty.
616 newEmptyMVar  :: IO (MVar a)
617 newEmptyMVar = IO $ \ s# ->
618     case newMVar# s# of
619          (# s2#, svar# #) -> (# s2#, MVar svar# #)
620
621 -- |Create an 'MVar' which contains the supplied value.
622 newMVar :: a -> IO (MVar a)
623 newMVar value =
624     newEmptyMVar        >>= \ mvar ->
625     putMVar mvar value  >>
626     return mvar
627
628 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
629 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
630 -- the 'MVar' is left empty.
631 -- 
632 -- There are two further important properties of 'takeMVar':
633 --
634 --   * 'takeMVar' is single-wakeup.  That is, if there are multiple
635 --     threads blocked in 'takeMVar', and the 'MVar' becomes full,
636 --     only one thread will be woken up.  The runtime guarantees that
637 --     the woken thread completes its 'takeMVar' operation.
638 --
639 --   * When multiple threads are blocked on an 'MVar', they are
640 --     woken up in FIFO order.  This is useful for providing
641 --     fairness properties of abstractions built using 'MVar's.
642 --
643 takeMVar :: MVar a -> IO a
644 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
645
646 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
647 -- 'putMVar' will wait until it becomes empty.
648 --
649 -- There are two further important properties of 'putMVar':
650 --
651 --   * 'putMVar' is single-wakeup.  That is, if there are multiple
652 --     threads blocked in 'putMVar', and the 'MVar' becomes empty,
653 --     only one thread will be woken up.  The runtime guarantees that
654 --     the woken thread completes its 'putMVar' operation.
655 --
656 --   * When multiple threads are blocked on an 'MVar', they are
657 --     woken up in FIFO order.  This is useful for providing
658 --     fairness properties of abstractions built using 'MVar's.
659 --
660 putMVar  :: MVar a -> a -> IO ()
661 putMVar (MVar mvar#) x = IO $ \ s# ->
662     case putMVar# mvar# x s# of
663         s2# -> (# s2#, () #)
664
665 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
666 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
667 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
668 -- the 'MVar' is left empty.
669 tryTakeMVar :: MVar a -> IO (Maybe a)
670 tryTakeMVar (MVar m) = IO $ \ s ->
671     case tryTakeMVar# m s of
672         (# s', 0#, _ #) -> (# s', Nothing #)      -- MVar is empty
673         (# s', _,  a #) -> (# s', Just a  #)      -- MVar is full
674
675 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
676 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
677 -- it was successful, or 'False' otherwise.
678 tryPutMVar  :: MVar a -> a -> IO Bool
679 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
680     case tryPutMVar# mvar# x s# of
681         (# s, 0# #) -> (# s, False #)
682         (# s, _  #) -> (# s, True #)
683
684 -- |Check whether a given 'MVar' is empty.
685 --
686 -- Notice that the boolean value returned  is just a snapshot of
687 -- the state of the MVar. By the time you get to react on its result,
688 -- the MVar may have been filled (or emptied) - so be extremely
689 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
690 isEmptyMVar :: MVar a -> IO Bool
691 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
692     case isEmptyMVar# mv# s# of
693         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
694
695 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
696 -- "System.Mem.Weak" for more about finalizers.
697 addMVarFinalizer :: MVar a -> IO () -> IO ()
698 addMVarFinalizer (MVar m) finalizer = 
699   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, _ #) -> (# s1, () #) }
700 \end{code}
701
702
703 %************************************************************************
704 %*                                                                      *
705 \subsection{Thread waiting}
706 %*                                                                      *
707 %************************************************************************
708
709 \begin{code}
710 #ifdef mingw32_HOST_OS
711
712 -- Note: threadWaitRead and threadWaitWrite aren't really functional
713 -- on Win32, but left in there because lib code (still) uses them (the manner
714 -- in which they're used doesn't cause problems on a Win32 platform though.)
715
716 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
717 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
718   IO $ \s -> case asyncRead# fd isSock len buf s of 
719                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
720
721 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
722 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
723   IO $ \s -> case asyncWrite# fd isSock len buf s of 
724                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
725
726 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
727 asyncDoProc (FunPtr proc) (Ptr param) = 
728     -- the 'length' value is ignored; simplifies implementation of
729     -- the async*# primops to have them all return the same result.
730   IO $ \s -> case asyncDoProc# proc param s  of 
731                (# s', _len#, err# #) -> (# s', I# err# #)
732
733 -- to aid the use of these primops by the IO Handle implementation,
734 -- provide the following convenience funs:
735
736 -- this better be a pinned byte array!
737 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
738 asyncReadBA fd isSock len off bufB = 
739   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
740   
741 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
742 asyncWriteBA fd isSock len off bufB = 
743   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
744
745 #endif
746
747 -- -----------------------------------------------------------------------------
748 -- Thread IO API
749
750 -- | Block the current thread until data is available to read on the
751 -- given file descriptor (GHC only).
752 threadWaitRead :: Fd -> IO ()
753 threadWaitRead fd
754 #ifndef mingw32_HOST_OS
755   | threaded  = waitForReadEvent fd
756 #endif
757   | otherwise = IO $ \s -> 
758         case fromIntegral fd of { I# fd# ->
759         case waitRead# fd# s of { s' -> (# s', () #)
760         }}
761
762 -- | Block the current thread until data can be written to the
763 -- given file descriptor (GHC only).
764 threadWaitWrite :: Fd -> IO ()
765 threadWaitWrite fd
766 #ifndef mingw32_HOST_OS
767   | threaded  = waitForWriteEvent fd
768 #endif
769   | otherwise = IO $ \s -> 
770         case fromIntegral fd of { I# fd# ->
771         case waitWrite# fd# s of { s' -> (# s', () #)
772         }}
773
774 -- | Suspends the current thread for a given number of microseconds
775 -- (GHC only).
776 --
777 -- There is no guarantee that the thread will be rescheduled promptly
778 -- when the delay has expired, but the thread will never continue to
779 -- run /earlier/ than specified.
780 --
781 threadDelay :: Int -> IO ()
782 threadDelay time
783   | threaded  = waitForDelayEvent time
784   | otherwise = IO $ \s -> 
785         case fromIntegral time of { I# time# ->
786         case delay# time# s of { s' -> (# s', () #)
787         }}
788
789
790 -- | Set the value of returned TVar to True after a given number of
791 -- microseconds. The caveats associated with threadDelay also apply.
792 --
793 registerDelay :: Int -> IO (TVar Bool)
794 registerDelay usecs 
795   | threaded = waitForDelayEventSTM usecs
796   | otherwise = error "registerDelay: requires -threaded"
797
798 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
799
800 waitForDelayEvent :: Int -> IO ()
801 waitForDelayEvent usecs = do
802   m <- newEmptyMVar
803   target <- calculateTarget usecs
804   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
805   prodServiceThread
806   takeMVar m
807
808 -- Delays for use in STM
809 waitForDelayEventSTM :: Int -> IO (TVar Bool)
810 waitForDelayEventSTM usecs = do
811    t <- atomically $ newTVar False
812    target <- calculateTarget usecs
813    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
814    prodServiceThread
815    return t  
816     
817 calculateTarget :: Int -> IO USecs
818 calculateTarget usecs = do
819     now <- getUSecOfDay
820     return $ now + (fromIntegral usecs)
821
822
823 -- ----------------------------------------------------------------------------
824 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
825
826 -- In the threaded RTS, we employ a single IO Manager thread to wait
827 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
828 -- and delays (threadDelay).  
829 --
830 -- We can do this because in the threaded RTS the IO Manager can make
831 -- a non-blocking call to select(), so we don't have to do select() in
832 -- the scheduler as we have to in the non-threaded RTS.  We get performance
833 -- benefits from doing it this way, because we only have to restart the select()
834 -- when a new request arrives, rather than doing one select() each time
835 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
836 -- by not having to check for completed IO requests.
837
838 -- Issues, possible problems:
839 --
840 --      - we might want bound threads to just do the blocking
841 --        operation rather than communicating with the IO manager
842 --        thread.  This would prevent simgle-threaded programs which do
843 --        IO from requiring multiple OS threads.  However, it would also
844 --        prevent bound threads waiting on IO from being killed or sent
845 --        exceptions.
846 --
847 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
848 --        I couldn't repeat this.
849 --
850 --      - How do we handle signal delivery in the multithreaded RTS?
851 --
852 --      - forkProcess will kill the IO manager thread.  Let's just
853 --        hope we don't need to do any blocking IO between fork & exec.
854
855 #ifndef mingw32_HOST_OS
856 data IOReq
857   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
858   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
859 #endif
860
861 data DelayReq
862   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
863   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
864
865 #ifndef mingw32_HOST_OS
866 pendingEvents :: IORef [IOReq]
867 #endif
868 pendingDelays :: IORef [DelayReq]
869         -- could use a strict list or array here
870 {-# NOINLINE pendingEvents #-}
871 {-# NOINLINE pendingDelays #-}
872 (pendingEvents,pendingDelays) = unsafePerformIO $ do
873   startIOManagerThread
874   reqs <- newIORef []
875   dels <- newIORef []
876   return (reqs, dels)
877         -- the first time we schedule an IO request, the service thread
878         -- will be created (cool, huh?)
879
880 ensureIOManagerIsRunning :: IO ()
881 ensureIOManagerIsRunning 
882   | threaded  = seq pendingEvents $ return ()
883   | otherwise = return ()
884
885 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
886 insertDelay d [] = [d]
887 insertDelay d1 ds@(d2 : rest)
888   | delayTime d1 <= delayTime d2 = d1 : ds
889   | otherwise                    = d2 : insertDelay d1 rest
890
891 delayTime :: DelayReq -> USecs
892 delayTime (Delay t _) = t
893 delayTime (DelaySTM t _) = t
894
895 type USecs = Word64
896
897 -- XXX: move into GHC.IOBase from Data.IORef?
898 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
899 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
900
901 foreign import ccall unsafe "getUSecOfDay" 
902   getUSecOfDay :: IO USecs
903
904 prodding :: IORef Bool
905 {-# NOINLINE prodding #-}
906 prodding = unsafePerformIO (newIORef False)
907
908 prodServiceThread :: IO ()
909 prodServiceThread = do
910   was_set <- atomicModifyIORef prodding (\a -> (True,a))
911   if (not (was_set)) then wakeupIOManager else return ()
912
913 #ifdef mingw32_HOST_OS
914 -- ----------------------------------------------------------------------------
915 -- Windows IO manager thread
916
917 startIOManagerThread :: IO ()
918 startIOManagerThread = do
919   wakeup <- c_getIOManagerEvent
920   forkIO $ service_loop wakeup []
921   return ()
922
923 service_loop :: HANDLE          -- read end of pipe
924              -> [DelayReq]      -- current delay requests
925              -> IO ()
926
927 service_loop wakeup old_delays = do
928   -- pick up new delay requests
929   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
930   let  delays = foldr insertDelay old_delays new_delays
931
932   now <- getUSecOfDay
933   (delays', timeout) <- getDelay now delays
934
935   r <- c_WaitForSingleObject wakeup timeout
936   case r of
937     0xffffffff -> do c_maperrno; throwErrno "service_loop"
938     0 -> do
939         r2 <- c_readIOManagerEvent
940         exit <- 
941               case r2 of
942                 _ | r2 == io_MANAGER_WAKEUP -> return False
943                 _ | r2 == io_MANAGER_DIE    -> return True
944                 0 -> return False -- spurious wakeup
945                 _ -> do start_console_handler (r2 `shiftR` 1); return False
946         if exit
947           then return ()
948           else service_cont wakeup delays'
949
950     _other -> service_cont wakeup delays' -- probably timeout        
951
952 service_cont :: HANDLE -> [DelayReq] -> IO ()
953 service_cont wakeup delays = do
954   atomicModifyIORef prodding (\_ -> (False,False))
955   service_loop wakeup delays
956
957 -- must agree with rts/win32/ThrIOManager.c
958 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
959 io_MANAGER_WAKEUP = 0xffffffff
960 io_MANAGER_DIE    = 0xfffffffe
961
962 data ConsoleEvent
963  = ControlC
964  | Break
965  | Close
966     -- these are sent to Services only.
967  | Logoff
968  | Shutdown
969  deriving (Eq, Ord, Enum, Show, Read, Typeable)
970
971 start_console_handler :: Word32 -> IO ()
972 start_console_handler r =
973   case toWin32ConsoleEvent r of
974      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
975                     forkIO (handler x)
976                     return ()
977      Nothing -> return ()
978
979 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
980 toWin32ConsoleEvent ev = 
981    case ev of
982        0 {- CTRL_C_EVENT-}        -> Just ControlC
983        1 {- CTRL_BREAK_EVENT-}    -> Just Break
984        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
985        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
986        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
987        _ -> Nothing
988
989 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
990 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
991
992 -- XXX Is this actually needed?
993 stick :: IORef HANDLE
994 {-# NOINLINE stick #-}
995 stick = unsafePerformIO (newIORef nullPtr)
996
997 wakeupIOManager :: IO ()
998 wakeupIOManager = do 
999   _hdl <- readIORef stick
1000   c_sendIOManagerEvent io_MANAGER_WAKEUP
1001
1002 -- Walk the queue of pending delays, waking up any that have passed
1003 -- and return the smallest delay to wait for.  The queue of pending
1004 -- delays is kept ordered.
1005 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
1006 getDelay _   [] = return ([], iNFINITE)
1007 getDelay now all@(d : rest) 
1008   = case d of
1009      Delay time m | now >= time -> do
1010         putMVar m ()
1011         getDelay now rest
1012      DelaySTM time t | now >= time -> do
1013         atomically $ writeTVar t True
1014         getDelay now rest
1015      _otherwise ->
1016         -- delay is in millisecs for WaitForSingleObject
1017         let micro_seconds = delayTime d - now
1018             milli_seconds = (micro_seconds + 999) `div` 1000
1019         in return (all, fromIntegral milli_seconds)
1020
1021 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
1022 -- available yet.  We should move some Win32 functionality down here,
1023 -- maybe as part of the grand reorganisation of the base package...
1024 type HANDLE       = Ptr ()
1025 type DWORD        = Word32
1026
1027 iNFINITE :: DWORD
1028 iNFINITE = 0xFFFFFFFF -- urgh
1029
1030 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
1031   c_getIOManagerEvent :: IO HANDLE
1032
1033 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
1034   c_readIOManagerEvent :: IO Word32
1035
1036 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
1037   c_sendIOManagerEvent :: Word32 -> IO ()
1038
1039 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
1040    c_maperrno :: IO ()
1041
1042 foreign import stdcall "WaitForSingleObject"
1043    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1044
1045 #else
1046 -- ----------------------------------------------------------------------------
1047 -- Unix IO manager thread, using select()
1048
1049 startIOManagerThread :: IO ()
1050 startIOManagerThread = do
1051         allocaArray 2 $ \fds -> do
1052         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1053         rd_end <- peekElemOff fds 0
1054         wr_end <- peekElemOff fds 1
1055         setNonBlockingFD wr_end -- writes happen in a signal handler, we
1056                                 -- don't want them to block.
1057         setCloseOnExec rd_end
1058         setCloseOnExec wr_end
1059         writeIORef stick (fromIntegral wr_end)
1060         c_setIOManagerPipe wr_end
1061         forkIO $ do
1062             allocaBytes sizeofFdSet   $ \readfds -> do
1063             allocaBytes sizeofFdSet   $ \writefds -> do 
1064             allocaBytes sizeofTimeVal $ \timeval -> do
1065             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1066         return ()
1067
1068 service_loop
1069    :: Fd                -- listen to this for wakeup calls
1070    -> Ptr CFdSet
1071    -> Ptr CFdSet
1072    -> Ptr CTimeVal
1073    -> [IOReq]
1074    -> [DelayReq]
1075    -> IO ()
1076 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1077
1078   -- pick up new IO requests
1079   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1080   let reqs = new_reqs ++ old_reqs
1081
1082   -- pick up new delay requests
1083   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1084   let  delays0 = foldr insertDelay old_delays new_delays
1085
1086   -- build the FDSets for select()
1087   fdZero readfds
1088   fdZero writefds
1089   fdSet wakeup readfds
1090   maxfd <- buildFdSets 0 readfds writefds reqs
1091
1092   -- perform the select()
1093   let do_select delays = do
1094           -- check the current time and wake up any thread in
1095           -- threadDelay whose timeout has expired.  Also find the
1096           -- timeout value for the select() call.
1097           now <- getUSecOfDay
1098           (delays', timeout) <- getDelay now ptimeval delays
1099
1100           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
1101                         nullPtr timeout
1102           if (res == -1)
1103              then do
1104                 err <- getErrno
1105                 case err of
1106                   _ | err == eINTR ->  do_select delays'
1107                         -- EINTR: just redo the select()
1108                   _ | err == eBADF ->  return (True, delays)
1109                         -- EBADF: one of the file descriptors is closed or bad,
1110                         -- we don't know which one, so wake everyone up.
1111                   _ | otherwise    ->  throwErrno "select"
1112                         -- otherwise (ENOMEM or EINVAL) something has gone
1113                         -- wrong; report the error.
1114              else
1115                 return (False,delays')
1116
1117   (wakeup_all,delays') <- do_select delays0
1118
1119   exit <-
1120     if wakeup_all then return False
1121       else do
1122         b <- fdIsSet wakeup readfds
1123         if b == 0 
1124           then return False
1125           else alloca $ \p -> do 
1126                  c_read (fromIntegral wakeup) p 1
1127                  s <- peek p            
1128                  case s of
1129                   _ | s == io_MANAGER_WAKEUP -> return False
1130                   _ | s == io_MANAGER_DIE    -> return True
1131                   _ | s == io_MANAGER_SYNC   -> do
1132                        mvars <- readIORef sync
1133                        mapM_ (flip putMVar ()) mvars
1134                        return False
1135                   _ -> do
1136                        fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1137                        withForeignPtr fp $ \p_siginfo -> do
1138                          r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1139                                  sizeof_siginfo_t
1140                          when (r /= fromIntegral sizeof_siginfo_t) $
1141                             error "failed to read siginfo_t"
1142                        runHandlers' fp (fromIntegral s)
1143                        return False
1144
1145   if exit then return () else do
1146
1147   atomicModifyIORef prodding (\_ -> (False,False))
1148
1149   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1150                          else completeRequests reqs readfds writefds []
1151
1152   service_loop wakeup readfds writefds ptimeval reqs' delays'
1153
1154 io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: CChar
1155 io_MANAGER_WAKEUP = 0xff
1156 io_MANAGER_DIE    = 0xfe
1157 io_MANAGER_SYNC   = 0xfd
1158
1159 -- | the stick is for poking the IO manager with
1160 stick :: IORef Fd
1161 {-# NOINLINE stick #-}
1162 stick = unsafePerformIO (newIORef 0)
1163
1164 {-# NOINLINE sync #-}
1165 sync :: IORef [MVar ()]
1166 sync = unsafePerformIO (newIORef [])
1167
1168 -- waits for the IO manager to drain the pipe
1169 syncIOManager :: IO ()
1170 syncIOManager = do
1171   m <- newEmptyMVar
1172   atomicModifyIORef sync (\old -> (m:old,()))
1173   fd <- readIORef stick
1174   with io_MANAGER_SYNC $ \pbuf -> do 
1175     c_write (fromIntegral fd) pbuf 1; return ()
1176   takeMVar m
1177
1178 wakeupIOManager :: IO ()
1179 wakeupIOManager = do
1180   fd <- readIORef stick
1181   with io_MANAGER_WAKEUP $ \pbuf -> do 
1182     c_write (fromIntegral fd) pbuf 1; return ()
1183
1184 -- For the non-threaded RTS
1185 runHandlers :: Ptr Word8 -> Int -> IO ()
1186 runHandlers p_info sig = do
1187   fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1188   withForeignPtr fp $ \p -> do
1189     copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1190     free p_info
1191   runHandlers' fp (fromIntegral sig)
1192
1193 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1194 runHandlers' p_info sig = do
1195   let int = fromIntegral sig
1196   withMVar signal_handlers $ \arr ->
1197       if not (inRange (boundsIOArray arr) int)
1198          then return ()
1199          else do handler <- unsafeReadIOArray arr int
1200                  case handler of
1201                     Nothing -> return ()
1202                     Just (f,_)  -> do forkIO (f p_info); return ()
1203
1204 foreign import ccall "setIOManagerPipe"
1205   c_setIOManagerPipe :: CInt -> IO ()
1206
1207 foreign import ccall "__hscore_sizeof_siginfo_t"
1208   sizeof_siginfo_t :: CSize
1209
1210 type Signal = CInt
1211
1212 maxSig = 64 :: Int
1213
1214 type HandlerFun = ForeignPtr Word8 -> IO ()
1215
1216 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1217 -- this race condition is #1922, although that bug was on Windows a similar
1218 -- bug also exists on Unix.
1219 {-# NOINLINE signal_handlers #-}
1220 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1221 signal_handlers = unsafePerformIO $ do
1222    arr <- newIOArray (0,maxSig) Nothing
1223    newMVar arr
1224
1225 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1226 setHandler sig handler = do
1227   let int = fromIntegral sig
1228   withMVar signal_handlers $ \arr -> 
1229      if not (inRange (boundsIOArray arr) int)
1230         then error "GHC.Conc.setHandler: signal out of range"
1231         else do old <- unsafeReadIOArray arr int
1232                 unsafeWriteIOArray arr int handler
1233                 return old
1234
1235 -- -----------------------------------------------------------------------------
1236 -- IO requests
1237
1238 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1239 buildFdSets maxfd _       _        [] = return maxfd
1240 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1241   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1242   | otherwise        =  do
1243         fdSet fd readfds
1244         buildFdSets (max maxfd fd) readfds writefds reqs
1245 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1246   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1247   | otherwise        =  do
1248         fdSet fd writefds
1249         buildFdSets (max maxfd fd) readfds writefds reqs
1250
1251 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1252                  -> IO [IOReq]
1253 completeRequests [] _ _ reqs' = return reqs'
1254 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1255   b <- fdIsSet fd readfds
1256   if b /= 0
1257     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1258     else completeRequests reqs readfds writefds (Read fd m : reqs')
1259 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1260   b <- fdIsSet fd writefds
1261   if b /= 0
1262     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1263     else completeRequests reqs readfds writefds (Write fd m : reqs')
1264
1265 wakeupAll :: [IOReq] -> IO ()
1266 wakeupAll [] = return ()
1267 wakeupAll (Read  _ m : reqs) = do putMVar m (); wakeupAll reqs
1268 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1269
1270 waitForReadEvent :: Fd -> IO ()
1271 waitForReadEvent fd = do
1272   m <- newEmptyMVar
1273   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1274   prodServiceThread
1275   takeMVar m
1276
1277 waitForWriteEvent :: Fd -> IO ()
1278 waitForWriteEvent fd = do
1279   m <- newEmptyMVar
1280   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1281   prodServiceThread
1282   takeMVar m
1283
1284 -- -----------------------------------------------------------------------------
1285 -- Delays
1286
1287 -- Walk the queue of pending delays, waking up any that have passed
1288 -- and return the smallest delay to wait for.  The queue of pending
1289 -- delays is kept ordered.
1290 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1291 getDelay _   _        [] = return ([],nullPtr)
1292 getDelay now ptimeval all@(d : rest) 
1293   = case d of
1294      Delay time m | now >= time -> do
1295         putMVar m ()
1296         getDelay now ptimeval rest
1297      DelaySTM time t | now >= time -> do
1298         atomically $ writeTVar t True
1299         getDelay now ptimeval rest
1300      _otherwise -> do
1301         setTimevalTicks ptimeval (delayTime d - now)
1302         return (all,ptimeval)
1303
1304 data CTimeVal
1305
1306 foreign import ccall unsafe "sizeofTimeVal"
1307   sizeofTimeVal :: Int
1308
1309 foreign import ccall unsafe "setTimevalTicks" 
1310   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1311
1312 {- 
1313   On Win32 we're going to have a single Pipe, and a
1314   waitForSingleObject with the delay time.  For signals, we send a
1315   byte down the pipe just like on Unix.
1316 -}
1317
1318 -- ----------------------------------------------------------------------------
1319 -- select() interface
1320
1321 -- ToDo: move to System.Posix.Internals?
1322
1323 data CFdSet
1324
1325 foreign import ccall safe "select"
1326   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1327            -> IO CInt
1328
1329 foreign import ccall unsafe "hsFD_SETSIZE"
1330   c_fD_SETSIZE :: CInt
1331
1332 fD_SETSIZE :: Fd
1333 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1334
1335 foreign import ccall unsafe "hsFD_ISSET"
1336   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1337
1338 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1339 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1340
1341 foreign import ccall unsafe "hsFD_SET"
1342   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1343
1344 fdSet :: Fd -> Ptr CFdSet -> IO ()
1345 fdSet (Fd fd) fdset = c_fdSet fd fdset
1346
1347 foreign import ccall unsafe "hsFD_ZERO"
1348   fdZero :: Ptr CFdSet -> IO ()
1349
1350 foreign import ccall unsafe "sizeof_fd_set"
1351   sizeofFdSet :: Int
1352
1353 #endif
1354
1355 reportStackOverflow :: IO a
1356 reportStackOverflow = do callStackOverflowHook; return undefined
1357
1358 reportError :: SomeException -> IO a
1359 reportError ex = do
1360    handler <- getUncaughtExceptionHandler
1361    handler ex
1362    return undefined
1363
1364 -- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
1365 -- the unsafe below.
1366 foreign import ccall unsafe "stackOverflow"
1367         callStackOverflowHook :: IO ()
1368
1369 {-# NOINLINE uncaughtExceptionHandler #-}
1370 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1371 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1372    where
1373       defaultHandler :: SomeException -> IO ()
1374       defaultHandler se@(SomeException ex) = do
1375          (hFlush stdout) `catchAny` (\ _ -> return ())
1376          let msg = case cast ex of
1377                Just Deadlock -> "no threads to run:  infinite loop or deadlock?"
1378                _ -> case cast ex of
1379                     Just (ErrorCall s) -> s
1380                     _                  -> showsPrec 0 se ""
1381          withCString "%s" $ \cfmt ->
1382           withCString msg $ \cmsg ->
1383             errorBelch cfmt cmsg
1384
1385 -- don't use errorBelch() directly, because we cannot call varargs functions
1386 -- using the FFI.
1387 foreign import ccall unsafe "HsBase.h errorBelch2"
1388    errorBelch :: CString -> CString -> IO ()
1389
1390 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1391 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1392
1393 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1394 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
1395
1396
1397 withMVar :: MVar a -> (a -> IO b) -> IO b
1398 withMVar m io = 
1399   block $ do
1400     a <- takeMVar m
1401     b <- catchAny (unblock (io a))
1402             (\e -> do putMVar m a; throw e)
1403     putMVar m a
1404     return b
1405 \end{code}