7e8b9161c0d7827afc3e1f25efa5f89716c0df9f
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
6 -- |
7 -- Module      :  GHC.Conc
8 -- Copyright   :  (c) The University of Glasgow, 1994-2002
9 -- License     :  see libraries/base/LICENSE
10 -- 
11 -- Maintainer  :  cvs-ghc@haskell.org
12 -- Stability   :  internal
13 -- Portability :  non-portable (GHC extensions)
14 --
15 -- Basic concurrency stuff.
16 -- 
17 -----------------------------------------------------------------------------
18
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home.  Hence:
23
24 #include "Typeable.h"
25
26 -- #not-home
27 module GHC.Conc
28         ( ThreadId(..)
29
30         -- * Forking and suchlike
31         , forkIO        -- :: IO a -> IO ThreadId
32         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
33         , numCapabilities -- :: Int
34         , childHandler  -- :: Exception -> IO ()
35         , myThreadId    -- :: IO ThreadId
36         , killThread    -- :: ThreadId -> IO ()
37         , throwTo       -- :: ThreadId -> Exception -> IO ()
38         , par           -- :: a -> b -> b
39         , pseq          -- :: a -> b -> b
40         , runSparks
41         , yield         -- :: IO ()
42         , labelThread   -- :: ThreadId -> String -> IO ()
43
44         , ThreadStatus(..), BlockReason(..)
45         , threadStatus  -- :: ThreadId -> IO ThreadStatus
46
47         -- * Waiting
48         , threadDelay           -- :: Int -> IO ()
49         , registerDelay         -- :: Int -> IO (TVar Bool)
50         , threadWaitRead        -- :: Int -> IO ()
51         , threadWaitWrite       -- :: Int -> IO ()
52
53         -- * MVars
54         , MVar(..)
55         , newMVar       -- :: a -> IO (MVar a)
56         , newEmptyMVar  -- :: IO (MVar a)
57         , takeMVar      -- :: MVar a -> IO a
58         , putMVar       -- :: MVar a -> a -> IO ()
59         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
60         , tryPutMVar    -- :: MVar a -> a -> IO Bool
61         , isEmptyMVar   -- :: MVar a -> IO Bool
62         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
63
64         -- * TVars
65         , STM(..)
66         , atomically    -- :: STM a -> IO a
67         , retry         -- :: STM a
68         , orElse        -- :: STM a -> STM a -> STM a
69         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
70         , alwaysSucceeds -- :: STM a -> STM ()
71         , always        -- :: STM Bool -> STM ()
72         , TVar(..)
73         , newTVar       -- :: a -> STM (TVar a)
74         , newTVarIO     -- :: a -> STM (TVar a)
75         , readTVar      -- :: TVar a -> STM a
76         , readTVarIO    -- :: TVar a -> IO a
77         , writeTVar     -- :: a -> TVar a -> STM ()
78         , unsafeIOToSTM -- :: IO a -> STM a
79
80         -- * Miscellaneous
81 #ifdef mingw32_HOST_OS
82         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
83         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
84         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
85
86         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
87         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
88 #endif
89
90 #ifndef mingw32_HOST_OS
91         , Signal, HandlerFun, setHandler, runHandlers
92 #endif
93
94         , ensureIOManagerIsRunning
95         , syncIOManager
96
97 #ifdef mingw32_HOST_OS
98         , ConsoleEvent(..)
99         , win32ConsoleHandler
100         , toWin32ConsoleEvent
101 #endif
102         , setUncaughtExceptionHandler      -- :: (Exception -> IO ()) -> IO ()
103         , getUncaughtExceptionHandler      -- :: IO (Exception -> IO ())
104
105         , reportError, reportStackOverflow
106         ) where
107
108 import System.Posix.Types
109 #ifndef mingw32_HOST_OS
110 import System.Posix.Internals
111 #endif
112 import Foreign
113 import Foreign.C
114
115 import Data.Dynamic
116 import Data.Maybe
117 import Control.Monad
118
119 import GHC.Base
120 import {-# SOURCE #-} GHC.Handle
121 import GHC.IOBase
122 import GHC.Num          ( Num(..) )
123 import GHC.Real         ( fromIntegral )
124 import GHC.Arr          ( inRange )
125 #ifdef mingw32_HOST_OS
126 import GHC.Real         ( div )
127 import GHC.Ptr          ( plusPtr, FunPtr(..) )
128 #endif
129 #ifdef mingw32_HOST_OS
130 import GHC.Read         ( Read )
131 import GHC.Enum         ( Enum )
132 #endif
133 import GHC.Exception    ( SomeException(..), throw )
134 import GHC.Pack         ( packCString# )
135 import GHC.Ptr          ( Ptr(..) )
136 import GHC.STRef
137 import GHC.Show         ( Show(..), showString )
138 import Data.Typeable
139 import GHC.Err
140
141 infixr 0 `par`, `pseq`
142 \end{code}
143
144 %************************************************************************
145 %*                                                                      *
146 \subsection{@ThreadId@, @par@, and @fork@}
147 %*                                                                      *
148 %************************************************************************
149
150 \begin{code}
151 data ThreadId = ThreadId ThreadId# deriving( Typeable )
152 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
153 -- But since ThreadId# is unlifted, the Weak type must use open
154 -- type variables.
155 {- ^
156 A 'ThreadId' is an abstract type representing a handle to a thread.
157 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
158 the 'Ord' instance implements an arbitrary total ordering over
159 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
160 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
161 useful when debugging or diagnosing the behaviour of a concurrent
162 program.
163
164 /Note/: in GHC, if you have a 'ThreadId', you essentially have
165 a pointer to the thread itself.  This means the thread itself can\'t be
166 garbage collected until you drop the 'ThreadId'.
167 This misfeature will hopefully be corrected at a later date.
168
169 /Note/: Hugs does not provide any operations on other threads;
170 it defines 'ThreadId' as a synonym for ().
171 -}
172
173 instance Show ThreadId where
174    showsPrec d t = 
175         showString "ThreadId " . 
176         showsPrec d (getThreadId (id2TSO t))
177
178 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
179
180 id2TSO :: ThreadId -> ThreadId#
181 id2TSO (ThreadId t) = t
182
183 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
184 -- Returns -1, 0, 1
185
186 cmpThread :: ThreadId -> ThreadId -> Ordering
187 cmpThread t1 t2 = 
188    case cmp_thread (id2TSO t1) (id2TSO t2) of
189       -1 -> LT
190       0  -> EQ
191       _  -> GT -- must be 1
192
193 instance Eq ThreadId where
194    t1 == t2 = 
195       case t1 `cmpThread` t2 of
196          EQ -> True
197          _  -> False
198
199 instance Ord ThreadId where
200    compare = cmpThread
201
202 {- |
203 Sparks off a new thread to run the 'IO' computation passed as the
204 first argument, and returns the 'ThreadId' of the newly created
205 thread.
206
207 The new thread will be a lightweight thread; if you want to use a foreign
208 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
209
210 GHC note: the new thread inherits the /blocked/ state of the parent 
211 (see 'Control.Exception.block').
212
213 The newly created thread has an exception handler that discards the
214 exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
215 'ThreadKilled', and passes all other exceptions to the uncaught
216 exception handler (see 'setUncaughtExceptionHandler').
217 -}
218 forkIO :: IO () -> IO ThreadId
219 forkIO action = IO $ \ s -> 
220    case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
221  where
222   action_plus = catchException action childHandler
223
224 {- |
225 Like 'forkIO', but lets you specify on which CPU the thread is
226 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
227 will stay on the same CPU for its entire lifetime (`forkIO` threads
228 can migrate between CPUs according to the scheduling policy).
229 `forkOnIO` is useful for overriding the scheduling policy when you
230 know in advance how best to distribute the threads.
231
232 The `Int` argument specifies the CPU number; it is interpreted modulo
233 'numCapabilities' (note that it actually specifies a capability number
234 rather than a CPU number, but to a first approximation the two are
235 equivalent).
236 -}
237 forkOnIO :: Int -> IO () -> IO ThreadId
238 forkOnIO (I# cpu) action = IO $ \ s -> 
239    case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
240  where
241   action_plus = catchException action childHandler
242
243 -- | the value passed to the @+RTS -N@ flag.  This is the number of
244 -- Haskell threads that can run truly simultaneously at any given
245 -- time, and is typically set to the number of physical CPU cores on
246 -- the machine.
247 numCapabilities :: Int
248 numCapabilities = unsafePerformIO $  do 
249                     n <- peek n_capabilities
250                     return (fromIntegral n)
251
252 #if defined(mingw32_HOST_OS) && defined(__PIC__)
253 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
254 #else
255 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
256 #endif
257 childHandler :: SomeException -> IO ()
258 childHandler err = catchException (real_handler err) childHandler
259
260 real_handler :: SomeException -> IO ()
261 real_handler se@(SomeException ex) =
262   -- ignore thread GC and killThread exceptions:
263   case cast ex of
264   Just BlockedOnDeadMVar                -> return ()
265   _ -> case cast ex of
266        Just BlockedIndefinitely         -> return ()
267        _ -> case cast ex of
268             Just ThreadKilled           -> return ()
269             _ -> case cast ex of
270                  -- report all others:
271                  Just StackOverflow     -> reportStackOverflow
272                  _                      -> reportError se
273
274 {- | 'killThread' terminates the given thread (GHC only).
275 Any work already done by the thread isn\'t
276 lost: the computation is suspended until required by another thread.
277 The memory used by the thread will be garbage collected if it isn\'t
278 referenced from anywhere.  The 'killThread' function is defined in
279 terms of 'throwTo':
280
281 > killThread tid = throwTo tid ThreadKilled
282
283 Killthread is a no-op if the target thread has already completed.
284 -}
285 killThread :: ThreadId -> IO ()
286 killThread tid = throwTo tid ThreadKilled
287
288 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
289
290 'throwTo' does not return until the exception has been raised in the
291 target thread. 
292 The calling thread can thus be certain that the target
293 thread has received the exception.  This is a useful property to know
294 when dealing with race conditions: eg. if there are two threads that
295 can kill each other, it is guaranteed that only one of the threads
296 will get to kill the other.
297
298 If the target thread is currently making a foreign call, then the
299 exception will not be raised (and hence 'throwTo' will not return)
300 until the call has completed.  This is the case regardless of whether
301 the call is inside a 'block' or not.
302
303 Important note: the behaviour of 'throwTo' differs from that described in
304 the paper \"Asynchronous exceptions in Haskell\"
305 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
306 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
307 a more synchronous design in which 'throwTo' does not return until the exception
308 is received by the target thread.  The trade-off is discussed in Section 9 of the paper.
309 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
310 the paper).
311
312 There is currently no guarantee that the exception delivered by 'throwTo' will be
313 delivered at the first possible opportunity.  In particular, a thread may 
314 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
315 a pending 'throwTo'.  This is arguably undesirable behaviour.
316
317  -}
318 throwTo :: Exception e => ThreadId -> e -> IO ()
319 throwTo (ThreadId tid) ex = IO $ \ s ->
320    case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
321
322 -- | Returns the 'ThreadId' of the calling thread (GHC only).
323 myThreadId :: IO ThreadId
324 myThreadId = IO $ \s ->
325    case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
326
327
328 -- |The 'yield' action allows (forces, in a co-operative multitasking
329 -- implementation) a context-switch to any other currently runnable
330 -- threads (if any), and is occasionally useful when implementing
331 -- concurrency abstractions.
332 yield :: IO ()
333 yield = IO $ \s -> 
334    case (yield# s) of s1 -> (# s1, () #)
335
336 {- | 'labelThread' stores a string as identifier for this thread if
337 you built a RTS with debugging support. This identifier will be used in
338 the debugging output to make distinction of different threads easier
339 (otherwise you only have the thread state object\'s address in the heap).
340
341 Other applications like the graphical Concurrent Haskell Debugger
342 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
343 'labelThread' for their purposes as well.
344 -}
345
346 labelThread :: ThreadId -> String -> IO ()
347 labelThread (ThreadId t) str = IO $ \ s ->
348    let ps  = packCString# str
349        adr = byteArrayContents# ps in
350      case (labelThread# t adr s) of s1 -> (# s1, () #)
351
352 --      Nota Bene: 'pseq' used to be 'seq'
353 --                 but 'seq' is now defined in PrelGHC
354 --
355 -- "pseq" is defined a bit weirdly (see below)
356 --
357 -- The reason for the strange "lazy" call is that
358 -- it fools the compiler into thinking that pseq  and par are non-strict in
359 -- their second argument (even if it inlines pseq at the call site).
360 -- If it thinks pseq is strict in "y", then it often evaluates
361 -- "y" before "x", which is totally wrong.  
362
363 {-# INLINE pseq  #-}
364 pseq :: a -> b -> b
365 pseq  x y = x `seq` lazy y
366
367 {-# INLINE par  #-}
368 par :: a -> b -> b
369 par  x y = case (par# x) of { _ -> lazy y }
370
371 -- | Internal function used by the RTS to run sparks.
372 runSparks :: IO ()
373 runSparks = IO loop
374   where loop s = case getSpark# s of
375                    (# s', n, p #) ->
376                       if n ==# 0# then (# s', () #)
377                                   else p `seq` loop s'
378
379 data BlockReason
380   = BlockedOnMVar
381         -- ^blocked on on 'MVar'
382   | BlockedOnBlackHole
383         -- ^blocked on a computation in progress by another thread
384   | BlockedOnException
385         -- ^blocked in 'throwTo'
386   | BlockedOnSTM
387         -- ^blocked in 'retry' in an STM transaction
388   | BlockedOnForeignCall
389         -- ^currently in a foreign call
390   | BlockedOnOther
391         -- ^blocked on some other resource.  Without @-threaded@,
392         -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
393         -- they show up as 'BlockedOnMVar'.
394   deriving (Eq,Ord,Show)
395
396 -- | The current status of a thread
397 data ThreadStatus
398   = ThreadRunning
399         -- ^the thread is currently runnable or running
400   | ThreadFinished
401         -- ^the thread has finished
402   | ThreadBlocked  BlockReason
403         -- ^the thread is blocked on some resource
404   | ThreadDied
405         -- ^the thread received an uncaught exception
406   deriving (Eq,Ord,Show)
407
408 threadStatus :: ThreadId -> IO ThreadStatus
409 threadStatus (ThreadId t) = IO $ \s ->
410    case threadStatus# t s of
411      (# s', stat #) -> (# s', mk_stat (I# stat) #)
412    where
413         -- NB. keep these in sync with includes/Constants.h
414      mk_stat 0  = ThreadRunning
415      mk_stat 1  = ThreadBlocked BlockedOnMVar
416      mk_stat 2  = ThreadBlocked BlockedOnBlackHole
417      mk_stat 3  = ThreadBlocked BlockedOnException
418      mk_stat 7  = ThreadBlocked BlockedOnSTM
419      mk_stat 11 = ThreadBlocked BlockedOnForeignCall
420      mk_stat 12 = ThreadBlocked BlockedOnForeignCall
421      mk_stat 16 = ThreadFinished
422      mk_stat 17 = ThreadDied
423      mk_stat _  = ThreadBlocked BlockedOnOther
424 \end{code}
425
426
427 %************************************************************************
428 %*                                                                      *
429 \subsection[stm]{Transactional heap operations}
430 %*                                                                      *
431 %************************************************************************
432
433 TVars are shared memory locations which support atomic memory
434 transactions.
435
436 \begin{code}
437 -- |A monad supporting atomic memory transactions.
438 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
439
440 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
441 unSTM (STM a) = a
442
443 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
444
445 instance  Functor STM where
446    fmap f x = x >>= (return . f)
447
448 instance  Monad STM  where
449     {-# INLINE return #-}
450     {-# INLINE (>>)   #-}
451     {-# INLINE (>>=)  #-}
452     m >> k      = thenSTM m k
453     return x    = returnSTM x
454     m >>= k     = bindSTM m k
455
456 bindSTM :: STM a -> (a -> STM b) -> STM b
457 bindSTM (STM m) k = STM ( \s ->
458   case m s of 
459     (# new_s, a #) -> unSTM (k a) new_s
460   )
461
462 thenSTM :: STM a -> STM b -> STM b
463 thenSTM (STM m) k = STM ( \s ->
464   case m s of 
465     (# new_s, _ #) -> unSTM k new_s
466   )
467
468 returnSTM :: a -> STM a
469 returnSTM x = STM (\s -> (# s, x #))
470
471 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
472 -- dangerous thing to do.  
473 --
474 --   * The STM implementation will often run transactions multiple
475 --     times, so you need to be prepared for this if your IO has any
476 --     side effects.
477 --
478 --   * The STM implementation will abort transactions that are known to
479 --     be invalid and need to be restarted.  This may happen in the middle
480 --     of `unsafeIOToSTM`, so make sure you don't acquire any resources
481 --     that need releasing (exception handlers are ignored when aborting
482 --     the transaction).  That includes doing any IO using Handles, for
483 --     example.  Getting this wrong will probably lead to random deadlocks.
484 --
485 --   * The transaction may have seen an inconsistent view of memory when
486 --     the IO runs.  Invariants that you expect to be true throughout
487 --     your program may not be true inside a transaction, due to the
488 --     way transactions are implemented.  Normally this wouldn't be visible
489 --     to the programmer, but using `unsafeIOToSTM` can expose it.
490 --
491 unsafeIOToSTM :: IO a -> STM a
492 unsafeIOToSTM (IO m) = STM m
493
494 -- |Perform a series of STM actions atomically.
495 --
496 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
497 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
498 -- this would effectively allow a transaction inside a transaction, depending
499 -- on exactly when the thunk is evaluated.)
500 --
501 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
502 -- and which allows top-level TVars to be allocated.
503
504 atomically :: STM a -> IO a
505 atomically (STM m) = IO (\s -> (atomically# m) s )
506
507 -- |Retry execution of the current memory transaction because it has seen
508 -- values in TVars which mean that it should not continue (e.g. the TVars
509 -- represent a shared buffer that is now empty).  The implementation may
510 -- block the thread until one of the TVars that it has read from has been
511 -- udpated. (GHC only)
512 retry :: STM a
513 retry = STM $ \s# -> retry# s#
514
515 -- |Compose two alternative STM actions (GHC only).  If the first action
516 -- completes without retrying then it forms the result of the orElse.
517 -- Otherwise, if the first action retries, then the second action is
518 -- tried in its place.  If both actions retry then the orElse as a
519 -- whole retries.
520 orElse :: STM a -> STM a -> STM a
521 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
522
523 -- |Exception handling within STM actions.
524 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
525 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
526
527 -- | Low-level primitive on which always and alwaysSucceeds are built.
528 -- checkInv differs form these in that (i) the invariant is not 
529 -- checked when checkInv is called, only at the end of this and
530 -- subsequent transcations, (ii) the invariant failure is indicated
531 -- by raising an exception.
532 checkInv :: STM a -> STM ()
533 checkInv (STM m) = STM (\s -> (check# m) s)
534
535 -- | alwaysSucceeds adds a new invariant that must be true when passed
536 -- to alwaysSucceeds, at the end of the current transaction, and at
537 -- the end of every subsequent transaction.  If it fails at any
538 -- of those points then the transaction violating it is aborted
539 -- and the exception raised by the invariant is propagated.
540 alwaysSucceeds :: STM a -> STM ()
541 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
542                       checkInv i
543
544 -- | always is a variant of alwaysSucceeds in which the invariant is
545 -- expressed as an STM Bool action that must return True.  Returning
546 -- False or raising an exception are both treated as invariant failures.
547 always :: STM Bool -> STM ()
548 always i = alwaysSucceeds ( do v <- i
549                                if (v) then return () else ( error "Transacional invariant violation" ) )
550
551 -- |Shared memory locations that support atomic memory transactions.
552 data TVar a = TVar (TVar# RealWorld a)
553
554 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
555
556 instance Eq (TVar a) where
557         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
558
559 -- |Create a new TVar holding a value supplied
560 newTVar :: a -> STM (TVar a)
561 newTVar val = STM $ \s1# ->
562     case newTVar# val s1# of
563          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
564
565 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
566 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
567 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
568 -- possible.
569 newTVarIO :: a -> IO (TVar a)
570 newTVarIO val = IO $ \s1# ->
571     case newTVar# val s1# of
572          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
573
574 -- |Return the current value stored in a TVar.
575 -- This is equivalent to
576 --
577 -- >  readTVarIO = atomically . readTVar
578 --
579 -- but works much faster, because it doesn't perform a complete
580 -- transaction, it just reads the current value of the 'TVar'.
581 readTVarIO :: TVar a -> IO a
582 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
583
584 -- |Return the current value stored in a TVar
585 readTVar :: TVar a -> STM a
586 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
587
588 -- |Write the supplied value into a TVar
589 writeTVar :: TVar a -> a -> STM ()
590 writeTVar (TVar tvar#) val = STM $ \s1# ->
591     case writeTVar# tvar# val s1# of
592          s2# -> (# s2#, () #)
593   
594 \end{code}
595
596 %************************************************************************
597 %*                                                                      *
598 \subsection[mvars]{M-Structures}
599 %*                                                                      *
600 %************************************************************************
601
602 M-Vars are rendezvous points for concurrent threads.  They begin
603 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
604 is written, a single blocked thread may be freed.  Reading an M-Var
605 toggles its state from full back to empty.  Therefore, any value
606 written to an M-Var may only be read once.  Multiple reads and writes
607 are allowed, but there must be at least one read between any two
608 writes.
609
610 \begin{code}
611 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
612
613 -- |Create an 'MVar' which is initially empty.
614 newEmptyMVar  :: IO (MVar a)
615 newEmptyMVar = IO $ \ s# ->
616     case newMVar# s# of
617          (# s2#, svar# #) -> (# s2#, MVar svar# #)
618
619 -- |Create an 'MVar' which contains the supplied value.
620 newMVar :: a -> IO (MVar a)
621 newMVar value =
622     newEmptyMVar        >>= \ mvar ->
623     putMVar mvar value  >>
624     return mvar
625
626 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
627 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
628 -- the 'MVar' is left empty.
629 -- 
630 -- There are two further important properties of 'takeMVar':
631 --
632 --   * 'takeMVar' is single-wakeup.  That is, if there are multiple
633 --     threads blocked in 'takeMVar', and the 'MVar' becomes full,
634 --     only one thread will be woken up.  The runtime guarantees that
635 --     the woken thread completes its 'takeMVar' operation.
636 --
637 --   * When multiple threads are blocked on an 'MVar', they are
638 --     woken up in FIFO order.  This is useful for providing
639 --     fairness properties of abstractions built using 'MVar's.
640 --
641 takeMVar :: MVar a -> IO a
642 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
643
644 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
645 -- 'putMVar' will wait until it becomes empty.
646 --
647 -- There are two further important properties of 'putMVar':
648 --
649 --   * 'putMVar' is single-wakeup.  That is, if there are multiple
650 --     threads blocked in 'putMVar', and the 'MVar' becomes empty,
651 --     only one thread will be woken up.  The runtime guarantees that
652 --     the woken thread completes its 'putMVar' operation.
653 --
654 --   * When multiple threads are blocked on an 'MVar', they are
655 --     woken up in FIFO order.  This is useful for providing
656 --     fairness properties of abstractions built using 'MVar's.
657 --
658 putMVar  :: MVar a -> a -> IO ()
659 putMVar (MVar mvar#) x = IO $ \ s# ->
660     case putMVar# mvar# x s# of
661         s2# -> (# s2#, () #)
662
663 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
664 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
665 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
666 -- the 'MVar' is left empty.
667 tryTakeMVar :: MVar a -> IO (Maybe a)
668 tryTakeMVar (MVar m) = IO $ \ s ->
669     case tryTakeMVar# m s of
670         (# s', 0#, _ #) -> (# s', Nothing #)      -- MVar is empty
671         (# s', _,  a #) -> (# s', Just a  #)      -- MVar is full
672
673 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
674 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
675 -- it was successful, or 'False' otherwise.
676 tryPutMVar  :: MVar a -> a -> IO Bool
677 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
678     case tryPutMVar# mvar# x s# of
679         (# s, 0# #) -> (# s, False #)
680         (# s, _  #) -> (# s, True #)
681
682 -- |Check whether a given 'MVar' is empty.
683 --
684 -- Notice that the boolean value returned  is just a snapshot of
685 -- the state of the MVar. By the time you get to react on its result,
686 -- the MVar may have been filled (or emptied) - so be extremely
687 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
688 isEmptyMVar :: MVar a -> IO Bool
689 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
690     case isEmptyMVar# mv# s# of
691         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
692
693 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
694 -- "System.Mem.Weak" for more about finalizers.
695 addMVarFinalizer :: MVar a -> IO () -> IO ()
696 addMVarFinalizer (MVar m) finalizer = 
697   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, _ #) -> (# s1, () #) }
698 \end{code}
699
700
701 %************************************************************************
702 %*                                                                      *
703 \subsection{Thread waiting}
704 %*                                                                      *
705 %************************************************************************
706
707 \begin{code}
708 #ifdef mingw32_HOST_OS
709
710 -- Note: threadWaitRead and threadWaitWrite aren't really functional
711 -- on Win32, but left in there because lib code (still) uses them (the manner
712 -- in which they're used doesn't cause problems on a Win32 platform though.)
713
714 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
715 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
716   IO $ \s -> case asyncRead# fd isSock len buf s of 
717                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
718
719 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
720 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
721   IO $ \s -> case asyncWrite# fd isSock len buf s of 
722                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
723
724 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
725 asyncDoProc (FunPtr proc) (Ptr param) = 
726     -- the 'length' value is ignored; simplifies implementation of
727     -- the async*# primops to have them all return the same result.
728   IO $ \s -> case asyncDoProc# proc param s  of 
729                (# s', _len#, err# #) -> (# s', I# err# #)
730
731 -- to aid the use of these primops by the IO Handle implementation,
732 -- provide the following convenience funs:
733
734 -- this better be a pinned byte array!
735 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
736 asyncReadBA fd isSock len off bufB = 
737   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
738   
739 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
740 asyncWriteBA fd isSock len off bufB = 
741   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
742
743 #endif
744
745 -- -----------------------------------------------------------------------------
746 -- Thread IO API
747
748 -- | Block the current thread until data is available to read on the
749 -- given file descriptor (GHC only).
750 threadWaitRead :: Fd -> IO ()
751 threadWaitRead fd
752 #ifndef mingw32_HOST_OS
753   | threaded  = waitForReadEvent fd
754 #endif
755   | otherwise = IO $ \s -> 
756         case fromIntegral fd of { I# fd# ->
757         case waitRead# fd# s of { s' -> (# s', () #)
758         }}
759
760 -- | Block the current thread until data can be written to the
761 -- given file descriptor (GHC only).
762 threadWaitWrite :: Fd -> IO ()
763 threadWaitWrite fd
764 #ifndef mingw32_HOST_OS
765   | threaded  = waitForWriteEvent fd
766 #endif
767   | otherwise = IO $ \s -> 
768         case fromIntegral fd of { I# fd# ->
769         case waitWrite# fd# s of { s' -> (# s', () #)
770         }}
771
772 -- | Suspends the current thread for a given number of microseconds
773 -- (GHC only).
774 --
775 -- There is no guarantee that the thread will be rescheduled promptly
776 -- when the delay has expired, but the thread will never continue to
777 -- run /earlier/ than specified.
778 --
779 threadDelay :: Int -> IO ()
780 threadDelay time
781   | threaded  = waitForDelayEvent time
782   | otherwise = IO $ \s -> 
783         case fromIntegral time of { I# time# ->
784         case delay# time# s of { s' -> (# s', () #)
785         }}
786
787
788 -- | Set the value of returned TVar to True after a given number of
789 -- microseconds. The caveats associated with threadDelay also apply.
790 --
791 registerDelay :: Int -> IO (TVar Bool)
792 registerDelay usecs 
793   | threaded = waitForDelayEventSTM usecs
794   | otherwise = error "registerDelay: requires -threaded"
795
796 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
797
798 waitForDelayEvent :: Int -> IO ()
799 waitForDelayEvent usecs = do
800   m <- newEmptyMVar
801   target <- calculateTarget usecs
802   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
803   prodServiceThread
804   takeMVar m
805
806 -- Delays for use in STM
807 waitForDelayEventSTM :: Int -> IO (TVar Bool)
808 waitForDelayEventSTM usecs = do
809    t <- atomically $ newTVar False
810    target <- calculateTarget usecs
811    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
812    prodServiceThread
813    return t  
814     
815 calculateTarget :: Int -> IO USecs
816 calculateTarget usecs = do
817     now <- getUSecOfDay
818     return $ now + (fromIntegral usecs)
819
820
821 -- ----------------------------------------------------------------------------
822 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
823
824 -- In the threaded RTS, we employ a single IO Manager thread to wait
825 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
826 -- and delays (threadDelay).  
827 --
828 -- We can do this because in the threaded RTS the IO Manager can make
829 -- a non-blocking call to select(), so we don't have to do select() in
830 -- the scheduler as we have to in the non-threaded RTS.  We get performance
831 -- benefits from doing it this way, because we only have to restart the select()
832 -- when a new request arrives, rather than doing one select() each time
833 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
834 -- by not having to check for completed IO requests.
835
836 -- Issues, possible problems:
837 --
838 --      - we might want bound threads to just do the blocking
839 --        operation rather than communicating with the IO manager
840 --        thread.  This would prevent simgle-threaded programs which do
841 --        IO from requiring multiple OS threads.  However, it would also
842 --        prevent bound threads waiting on IO from being killed or sent
843 --        exceptions.
844 --
845 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
846 --        I couldn't repeat this.
847 --
848 --      - How do we handle signal delivery in the multithreaded RTS?
849 --
850 --      - forkProcess will kill the IO manager thread.  Let's just
851 --        hope we don't need to do any blocking IO between fork & exec.
852
853 #ifndef mingw32_HOST_OS
854 data IOReq
855   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
856   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
857 #endif
858
859 data DelayReq
860   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
861   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
862
863 #ifndef mingw32_HOST_OS
864 pendingEvents :: IORef [IOReq]
865 #endif
866 pendingDelays :: IORef [DelayReq]
867         -- could use a strict list or array here
868 {-# NOINLINE pendingEvents #-}
869 {-# NOINLINE pendingDelays #-}
870 (pendingEvents,pendingDelays) = unsafePerformIO $ do
871   startIOManagerThread
872   reqs <- newIORef []
873   dels <- newIORef []
874   return (reqs, dels)
875         -- the first time we schedule an IO request, the service thread
876         -- will be created (cool, huh?)
877
878 ensureIOManagerIsRunning :: IO ()
879 ensureIOManagerIsRunning 
880   | threaded  = seq pendingEvents $ return ()
881   | otherwise = return ()
882
883 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
884 insertDelay d [] = [d]
885 insertDelay d1 ds@(d2 : rest)
886   | delayTime d1 <= delayTime d2 = d1 : ds
887   | otherwise                    = d2 : insertDelay d1 rest
888
889 delayTime :: DelayReq -> USecs
890 delayTime (Delay t _) = t
891 delayTime (DelaySTM t _) = t
892
893 type USecs = Word64
894
895 -- XXX: move into GHC.IOBase from Data.IORef?
896 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
897 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
898
899 foreign import ccall unsafe "getUSecOfDay" 
900   getUSecOfDay :: IO USecs
901
902 prodding :: IORef Bool
903 {-# NOINLINE prodding #-}
904 prodding = unsafePerformIO (newIORef False)
905
906 prodServiceThread :: IO ()
907 prodServiceThread = do
908   was_set <- atomicModifyIORef prodding (\a -> (True,a))
909   if (not (was_set)) then wakeupIOManager else return ()
910
911 #ifdef mingw32_HOST_OS
912 -- ----------------------------------------------------------------------------
913 -- Windows IO manager thread
914
915 startIOManagerThread :: IO ()
916 startIOManagerThread = do
917   wakeup <- c_getIOManagerEvent
918   forkIO $ service_loop wakeup []
919   return ()
920
921 service_loop :: HANDLE          -- read end of pipe
922              -> [DelayReq]      -- current delay requests
923              -> IO ()
924
925 service_loop wakeup old_delays = do
926   -- pick up new delay requests
927   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
928   let  delays = foldr insertDelay old_delays new_delays
929
930   now <- getUSecOfDay
931   (delays', timeout) <- getDelay now delays
932
933   r <- c_WaitForSingleObject wakeup timeout
934   case r of
935     0xffffffff -> do c_maperrno; throwErrno "service_loop"
936     0 -> do
937         r2 <- c_readIOManagerEvent
938         exit <- 
939               case r2 of
940                 _ | r2 == io_MANAGER_WAKEUP -> return False
941                 _ | r2 == io_MANAGER_DIE    -> return True
942                 0 -> return False -- spurious wakeup
943                 _ -> do start_console_handler (r2 `shiftR` 1); return False
944         if exit
945           then return ()
946           else service_cont wakeup delays'
947
948     _other -> service_cont wakeup delays' -- probably timeout        
949
950 service_cont :: HANDLE -> [DelayReq] -> IO ()
951 service_cont wakeup delays = do
952   atomicModifyIORef prodding (\_ -> (False,False))
953   service_loop wakeup delays
954
955 -- must agree with rts/win32/ThrIOManager.c
956 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
957 io_MANAGER_WAKEUP = 0xffffffff
958 io_MANAGER_DIE    = 0xfffffffe
959
960 data ConsoleEvent
961  = ControlC
962  | Break
963  | Close
964     -- these are sent to Services only.
965  | Logoff
966  | Shutdown
967  deriving (Eq, Ord, Enum, Show, Read, Typeable)
968
969 start_console_handler :: Word32 -> IO ()
970 start_console_handler r =
971   case toWin32ConsoleEvent r of
972      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
973                     forkIO (handler x)
974                     return ()
975      Nothing -> return ()
976
977 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
978 toWin32ConsoleEvent ev = 
979    case ev of
980        0 {- CTRL_C_EVENT-}        -> Just ControlC
981        1 {- CTRL_BREAK_EVENT-}    -> Just Break
982        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
983        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
984        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
985        _ -> Nothing
986
987 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
988 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
989
990 -- XXX Is this actually needed?
991 stick :: IORef HANDLE
992 {-# NOINLINE stick #-}
993 stick = unsafePerformIO (newIORef nullPtr)
994
995 wakeupIOManager :: IO ()
996 wakeupIOManager = do 
997   _hdl <- readIORef stick
998   c_sendIOManagerEvent io_MANAGER_WAKEUP
999
1000 -- Walk the queue of pending delays, waking up any that have passed
1001 -- and return the smallest delay to wait for.  The queue of pending
1002 -- delays is kept ordered.
1003 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
1004 getDelay _   [] = return ([], iNFINITE)
1005 getDelay now all@(d : rest) 
1006   = case d of
1007      Delay time m | now >= time -> do
1008         putMVar m ()
1009         getDelay now rest
1010      DelaySTM time t | now >= time -> do
1011         atomically $ writeTVar t True
1012         getDelay now rest
1013      _otherwise ->
1014         -- delay is in millisecs for WaitForSingleObject
1015         let micro_seconds = delayTime d - now
1016             milli_seconds = (micro_seconds + 999) `div` 1000
1017         in return (all, fromIntegral milli_seconds)
1018
1019 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
1020 -- available yet.  We should move some Win32 functionality down here,
1021 -- maybe as part of the grand reorganisation of the base package...
1022 type HANDLE       = Ptr ()
1023 type DWORD        = Word32
1024
1025 iNFINITE :: DWORD
1026 iNFINITE = 0xFFFFFFFF -- urgh
1027
1028 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
1029   c_getIOManagerEvent :: IO HANDLE
1030
1031 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
1032   c_readIOManagerEvent :: IO Word32
1033
1034 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
1035   c_sendIOManagerEvent :: Word32 -> IO ()
1036
1037 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
1038    c_maperrno :: IO ()
1039
1040 foreign import stdcall "WaitForSingleObject"
1041    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1042
1043 #else
1044 -- ----------------------------------------------------------------------------
1045 -- Unix IO manager thread, using select()
1046
1047 startIOManagerThread :: IO ()
1048 startIOManagerThread = do
1049         allocaArray 2 $ \fds -> do
1050         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1051         rd_end <- peekElemOff fds 0
1052         wr_end <- peekElemOff fds 1
1053         setNonBlockingFD wr_end -- writes happen in a signal handler, we
1054                                 -- don't want them to block.
1055         setCloseOnExec rd_end
1056         setCloseOnExec wr_end
1057         writeIORef stick (fromIntegral wr_end)
1058         c_setIOManagerPipe wr_end
1059         forkIO $ do
1060             allocaBytes sizeofFdSet   $ \readfds -> do
1061             allocaBytes sizeofFdSet   $ \writefds -> do 
1062             allocaBytes sizeofTimeVal $ \timeval -> do
1063             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1064         return ()
1065
1066 service_loop
1067    :: Fd                -- listen to this for wakeup calls
1068    -> Ptr CFdSet
1069    -> Ptr CFdSet
1070    -> Ptr CTimeVal
1071    -> [IOReq]
1072    -> [DelayReq]
1073    -> IO ()
1074 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1075
1076   -- pick up new IO requests
1077   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1078   let reqs = new_reqs ++ old_reqs
1079
1080   -- pick up new delay requests
1081   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1082   let  delays0 = foldr insertDelay old_delays new_delays
1083
1084   -- build the FDSets for select()
1085   fdZero readfds
1086   fdZero writefds
1087   fdSet wakeup readfds
1088   maxfd <- buildFdSets 0 readfds writefds reqs
1089
1090   -- perform the select()
1091   let do_select delays = do
1092           -- check the current time and wake up any thread in
1093           -- threadDelay whose timeout has expired.  Also find the
1094           -- timeout value for the select() call.
1095           now <- getUSecOfDay
1096           (delays', timeout) <- getDelay now ptimeval delays
1097
1098           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
1099                         nullPtr timeout
1100           if (res == -1)
1101              then do
1102                 err <- getErrno
1103                 case err of
1104                   _ | err == eINTR ->  do_select delays'
1105                         -- EINTR: just redo the select()
1106                   _ | err == eBADF ->  return (True, delays)
1107                         -- EBADF: one of the file descriptors is closed or bad,
1108                         -- we don't know which one, so wake everyone up.
1109                   _ | otherwise    ->  throwErrno "select"
1110                         -- otherwise (ENOMEM or EINVAL) something has gone
1111                         -- wrong; report the error.
1112              else
1113                 return (False,delays')
1114
1115   (wakeup_all,delays') <- do_select delays0
1116
1117   exit <-
1118     if wakeup_all then return False
1119       else do
1120         b <- fdIsSet wakeup readfds
1121         if b == 0 
1122           then return False
1123           else alloca $ \p -> do 
1124                  c_read (fromIntegral wakeup) p 1
1125                  s <- peek p            
1126                  case s of
1127                   _ | s == io_MANAGER_WAKEUP -> return False
1128                   _ | s == io_MANAGER_DIE    -> return True
1129                   _ | s == io_MANAGER_SYNC   -> do
1130                        mvars <- readIORef sync
1131                        mapM_ (flip putMVar ()) mvars
1132                        return False
1133                   _ -> do
1134                        fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1135                        withForeignPtr fp $ \p_siginfo -> do
1136                          r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1137                                  sizeof_siginfo_t
1138                          when (r /= fromIntegral sizeof_siginfo_t) $
1139                             error "failed to read siginfo_t"
1140                        runHandlers' fp (fromIntegral s)
1141                        return False
1142
1143   if exit then return () else do
1144
1145   atomicModifyIORef prodding (\_ -> (False,False))
1146
1147   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1148                          else completeRequests reqs readfds writefds []
1149
1150   service_loop wakeup readfds writefds ptimeval reqs' delays'
1151
1152 io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: CChar
1153 io_MANAGER_WAKEUP = 0xff
1154 io_MANAGER_DIE    = 0xfe
1155 io_MANAGER_SYNC   = 0xfd
1156
1157 -- | the stick is for poking the IO manager with
1158 stick :: IORef Fd
1159 {-# NOINLINE stick #-}
1160 stick = unsafePerformIO (newIORef 0)
1161
1162 {-# NOINLINE sync #-}
1163 sync :: IORef [MVar ()]
1164 sync = unsafePerformIO (newIORef [])
1165
1166 -- waits for the IO manager to drain the pipe
1167 syncIOManager :: IO ()
1168 syncIOManager = do
1169   m <- newEmptyMVar
1170   atomicModifyIORef sync (\old -> (m:old,()))
1171   fd <- readIORef stick
1172   with io_MANAGER_SYNC $ \pbuf -> do 
1173     c_write (fromIntegral fd) pbuf 1; return ()
1174   takeMVar m
1175
1176 wakeupIOManager :: IO ()
1177 wakeupIOManager = do
1178   fd <- readIORef stick
1179   with io_MANAGER_WAKEUP $ \pbuf -> do 
1180     c_write (fromIntegral fd) pbuf 1; return ()
1181
1182 -- For the non-threaded RTS
1183 runHandlers :: Ptr Word8 -> Int -> IO ()
1184 runHandlers p_info sig = do
1185   fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1186   withForeignPtr fp $ \p -> do
1187     copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1188     free p_info
1189   runHandlers' fp (fromIntegral sig)
1190
1191 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1192 runHandlers' p_info sig = do
1193   let int = fromIntegral sig
1194   withMVar signal_handlers $ \arr ->
1195       if not (inRange (boundsIOArray arr) int)
1196          then return ()
1197          else do handler <- unsafeReadIOArray arr int
1198                  case handler of
1199                     Nothing -> return ()
1200                     Just (f,_)  -> do forkIO (f p_info); return ()
1201
1202 foreign import ccall "setIOManagerPipe"
1203   c_setIOManagerPipe :: CInt -> IO ()
1204
1205 foreign import ccall "__hscore_sizeof_siginfo_t"
1206   sizeof_siginfo_t :: CSize
1207
1208 type Signal = CInt
1209
1210 maxSig = 64 :: Int
1211
1212 type HandlerFun = ForeignPtr Word8 -> IO ()
1213
1214 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1215 -- this race condition is #1922, although that bug was on Windows a similar
1216 -- bug also exists on Unix.
1217 {-# NOINLINE signal_handlers #-}
1218 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1219 signal_handlers = unsafePerformIO $ do
1220    arr <- newIOArray (0,maxSig) Nothing
1221    newMVar arr
1222
1223 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1224 setHandler sig handler = do
1225   let int = fromIntegral sig
1226   withMVar signal_handlers $ \arr -> 
1227      if not (inRange (boundsIOArray arr) int)
1228         then error "GHC.Conc.setHandler: signal out of range"
1229         else do old <- unsafeReadIOArray arr int
1230                 unsafeWriteIOArray arr int handler
1231                 return old
1232
1233 -- -----------------------------------------------------------------------------
1234 -- IO requests
1235
1236 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1237 buildFdSets maxfd _       _        [] = return maxfd
1238 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1239   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1240   | otherwise        =  do
1241         fdSet fd readfds
1242         buildFdSets (max maxfd fd) readfds writefds reqs
1243 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1244   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1245   | otherwise        =  do
1246         fdSet fd writefds
1247         buildFdSets (max maxfd fd) readfds writefds reqs
1248
1249 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1250                  -> IO [IOReq]
1251 completeRequests [] _ _ reqs' = return reqs'
1252 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1253   b <- fdIsSet fd readfds
1254   if b /= 0
1255     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1256     else completeRequests reqs readfds writefds (Read fd m : reqs')
1257 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1258   b <- fdIsSet fd writefds
1259   if b /= 0
1260     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1261     else completeRequests reqs readfds writefds (Write fd m : reqs')
1262
1263 wakeupAll :: [IOReq] -> IO ()
1264 wakeupAll [] = return ()
1265 wakeupAll (Read  _ m : reqs) = do putMVar m (); wakeupAll reqs
1266 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1267
1268 waitForReadEvent :: Fd -> IO ()
1269 waitForReadEvent fd = do
1270   m <- newEmptyMVar
1271   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1272   prodServiceThread
1273   takeMVar m
1274
1275 waitForWriteEvent :: Fd -> IO ()
1276 waitForWriteEvent fd = do
1277   m <- newEmptyMVar
1278   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1279   prodServiceThread
1280   takeMVar m
1281
1282 -- -----------------------------------------------------------------------------
1283 -- Delays
1284
1285 -- Walk the queue of pending delays, waking up any that have passed
1286 -- and return the smallest delay to wait for.  The queue of pending
1287 -- delays is kept ordered.
1288 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1289 getDelay _   _        [] = return ([],nullPtr)
1290 getDelay now ptimeval all@(d : rest) 
1291   = case d of
1292      Delay time m | now >= time -> do
1293         putMVar m ()
1294         getDelay now ptimeval rest
1295      DelaySTM time t | now >= time -> do
1296         atomically $ writeTVar t True
1297         getDelay now ptimeval rest
1298      _otherwise -> do
1299         setTimevalTicks ptimeval (delayTime d - now)
1300         return (all,ptimeval)
1301
1302 data CTimeVal
1303
1304 foreign import ccall unsafe "sizeofTimeVal"
1305   sizeofTimeVal :: Int
1306
1307 foreign import ccall unsafe "setTimevalTicks" 
1308   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1309
1310 {- 
1311   On Win32 we're going to have a single Pipe, and a
1312   waitForSingleObject with the delay time.  For signals, we send a
1313   byte down the pipe just like on Unix.
1314 -}
1315
1316 -- ----------------------------------------------------------------------------
1317 -- select() interface
1318
1319 -- ToDo: move to System.Posix.Internals?
1320
1321 data CFdSet
1322
1323 foreign import ccall safe "select"
1324   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1325            -> IO CInt
1326
1327 foreign import ccall unsafe "hsFD_SETSIZE"
1328   c_fD_SETSIZE :: CInt
1329
1330 fD_SETSIZE :: Fd
1331 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1332
1333 foreign import ccall unsafe "hsFD_ISSET"
1334   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1335
1336 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1337 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1338
1339 foreign import ccall unsafe "hsFD_SET"
1340   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1341
1342 fdSet :: Fd -> Ptr CFdSet -> IO ()
1343 fdSet (Fd fd) fdset = c_fdSet fd fdset
1344
1345 foreign import ccall unsafe "hsFD_ZERO"
1346   fdZero :: Ptr CFdSet -> IO ()
1347
1348 foreign import ccall unsafe "sizeof_fd_set"
1349   sizeofFdSet :: Int
1350
1351 #endif
1352
1353 reportStackOverflow :: IO a
1354 reportStackOverflow = do callStackOverflowHook; return undefined
1355
1356 reportError :: SomeException -> IO a
1357 reportError ex = do
1358    handler <- getUncaughtExceptionHandler
1359    handler ex
1360    return undefined
1361
1362 -- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
1363 -- the unsafe below.
1364 foreign import ccall unsafe "stackOverflow"
1365         callStackOverflowHook :: IO ()
1366
1367 {-# NOINLINE uncaughtExceptionHandler #-}
1368 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1369 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1370    where
1371       defaultHandler :: SomeException -> IO ()
1372       defaultHandler se@(SomeException ex) = do
1373          (hFlush stdout) `catchAny` (\ _ -> return ())
1374          let msg = case cast ex of
1375                Just Deadlock -> "no threads to run:  infinite loop or deadlock?"
1376                _ -> case cast ex of
1377                     Just (ErrorCall s) -> s
1378                     _                  -> showsPrec 0 se ""
1379          withCString "%s" $ \cfmt ->
1380           withCString msg $ \cmsg ->
1381             errorBelch cfmt cmsg
1382
1383 -- don't use errorBelch() directly, because we cannot call varargs functions
1384 -- using the FFI.
1385 foreign import ccall unsafe "HsBase.h errorBelch2"
1386    errorBelch :: CString -> CString -> IO ()
1387
1388 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1389 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1390
1391 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1392 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
1393
1394
1395 withMVar :: MVar a -> (a -> IO b) -> IO b
1396 withMVar m io = 
1397   block $ do
1398     a <- takeMVar m
1399     b <- catchAny (unblock (io a))
1400             (\e -> do putMVar m a; throw e)
1401     putMVar m a
1402     return b
1403 \end{code}