13511259c455309f8ecd1062419bdb62ee3d02cc
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
6 -- |
7 -- Module      :  GHC.Conc
8 -- Copyright   :  (c) The University of Glasgow, 1994-2002
9 -- License     :  see libraries/base/LICENSE
10 -- 
11 -- Maintainer  :  cvs-ghc@haskell.org
12 -- Stability   :  internal
13 -- Portability :  non-portable (GHC extensions)
14 --
15 -- Basic concurrency stuff.
16 -- 
17 -----------------------------------------------------------------------------
18
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home.  Hence:
23
24 #include "Typeable.h"
25
26 -- #not-home
27 module GHC.Conc
28         ( ThreadId(..)
29
30         -- * Forking and suchlike
31         , forkIO        -- :: IO a -> IO ThreadId
32         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
33         , numCapabilities -- :: Int
34         , childHandler  -- :: Exception -> IO ()
35         , myThreadId    -- :: IO ThreadId
36         , killThread    -- :: ThreadId -> IO ()
37         , throwTo       -- :: ThreadId -> Exception -> IO ()
38         , par           -- :: a -> b -> b
39         , pseq          -- :: a -> b -> b
40         , runSparks
41         , yield         -- :: IO ()
42         , labelThread   -- :: ThreadId -> String -> IO ()
43
44         , ThreadStatus(..), BlockReason(..)
45         , threadStatus  -- :: ThreadId -> IO ThreadStatus
46
47         -- * Waiting
48         , threadDelay           -- :: Int -> IO ()
49         , registerDelay         -- :: Int -> IO (TVar Bool)
50         , threadWaitRead        -- :: Int -> IO ()
51         , threadWaitWrite       -- :: Int -> IO ()
52
53         -- * MVars
54         , MVar(..)
55         , newMVar       -- :: a -> IO (MVar a)
56         , newEmptyMVar  -- :: IO (MVar a)
57         , takeMVar      -- :: MVar a -> IO a
58         , putMVar       -- :: MVar a -> a -> IO ()
59         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
60         , tryPutMVar    -- :: MVar a -> a -> IO Bool
61         , isEmptyMVar   -- :: MVar a -> IO Bool
62         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
63
64         -- * TVars
65         , STM(..)
66         , atomically    -- :: STM a -> IO a
67         , retry         -- :: STM a
68         , orElse        -- :: STM a -> STM a -> STM a
69         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
70         , alwaysSucceeds -- :: STM a -> STM ()
71         , always        -- :: STM Bool -> STM ()
72         , TVar(..)
73         , newTVar       -- :: a -> STM (TVar a)
74         , newTVarIO     -- :: a -> STM (TVar a)
75         , readTVar      -- :: TVar a -> STM a
76         , readTVarIO    -- :: TVar a -> IO a
77         , writeTVar     -- :: a -> TVar a -> STM ()
78         , unsafeIOToSTM -- :: IO a -> STM a
79
80         -- * Miscellaneous
81 #ifdef mingw32_HOST_OS
82         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
83         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
84         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
85
86         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
87         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
88 #endif
89
90 #ifndef mingw32_HOST_OS
91         , Signal, HandlerFun, setHandler, runHandlers
92 #endif
93
94         , ensureIOManagerIsRunning
95 #ifndef mingw32_HOST_OS
96         , syncIOManager
97 #endif
98
99 #ifdef mingw32_HOST_OS
100         , ConsoleEvent(..)
101         , win32ConsoleHandler
102         , toWin32ConsoleEvent
103 #endif
104         , setUncaughtExceptionHandler      -- :: (Exception -> IO ()) -> IO ()
105         , getUncaughtExceptionHandler      -- :: IO (Exception -> IO ())
106
107         , reportError, reportStackOverflow
108         ) where
109
110 import System.Posix.Types
111 #ifndef mingw32_HOST_OS
112 import System.Posix.Internals
113 #endif
114 import Foreign
115 import Foreign.C
116
117 #ifndef mingw32_HOST_OS
118 import Data.Dynamic
119 import Control.Monad
120 #endif
121 import Data.Maybe
122
123 import GHC.Base
124 import {-# SOURCE #-} GHC.Handle
125 import GHC.IOBase
126 import GHC.Num          ( Num(..) )
127 import GHC.Real         ( fromIntegral )
128 #ifndef mingw32_HOST_OS
129 import GHC.Arr          ( inRange )
130 #endif
131 #ifdef mingw32_HOST_OS
132 import GHC.Real         ( div )
133 import GHC.Ptr          ( plusPtr, FunPtr(..) )
134 #endif
135 #ifdef mingw32_HOST_OS
136 import GHC.Read         ( Read )
137 import GHC.Enum         ( Enum )
138 #endif
139 import GHC.Exception    ( SomeException(..), throw )
140 import GHC.Pack         ( packCString# )
141 import GHC.Ptr          ( Ptr(..) )
142 import GHC.STRef
143 import GHC.Show         ( Show(..), showString )
144 import Data.Typeable
145 import GHC.Err
146
147 infixr 0 `par`, `pseq`
148 \end{code}
149
150 %************************************************************************
151 %*                                                                      *
152 \subsection{@ThreadId@, @par@, and @fork@}
153 %*                                                                      *
154 %************************************************************************
155
156 \begin{code}
157 data ThreadId = ThreadId ThreadId# deriving( Typeable )
158 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
159 -- But since ThreadId# is unlifted, the Weak type must use open
160 -- type variables.
161 {- ^
162 A 'ThreadId' is an abstract type representing a handle to a thread.
163 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
164 the 'Ord' instance implements an arbitrary total ordering over
165 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
166 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
167 useful when debugging or diagnosing the behaviour of a concurrent
168 program.
169
170 /Note/: in GHC, if you have a 'ThreadId', you essentially have
171 a pointer to the thread itself.  This means the thread itself can\'t be
172 garbage collected until you drop the 'ThreadId'.
173 This misfeature will hopefully be corrected at a later date.
174
175 /Note/: Hugs does not provide any operations on other threads;
176 it defines 'ThreadId' as a synonym for ().
177 -}
178
179 instance Show ThreadId where
180    showsPrec d t = 
181         showString "ThreadId " . 
182         showsPrec d (getThreadId (id2TSO t))
183
184 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
185
186 id2TSO :: ThreadId -> ThreadId#
187 id2TSO (ThreadId t) = t
188
189 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
190 -- Returns -1, 0, 1
191
192 cmpThread :: ThreadId -> ThreadId -> Ordering
193 cmpThread t1 t2 = 
194    case cmp_thread (id2TSO t1) (id2TSO t2) of
195       -1 -> LT
196       0  -> EQ
197       _  -> GT -- must be 1
198
199 instance Eq ThreadId where
200    t1 == t2 = 
201       case t1 `cmpThread` t2 of
202          EQ -> True
203          _  -> False
204
205 instance Ord ThreadId where
206    compare = cmpThread
207
208 {- |
209 Sparks off a new thread to run the 'IO' computation passed as the
210 first argument, and returns the 'ThreadId' of the newly created
211 thread.
212
213 The new thread will be a lightweight thread; if you want to use a foreign
214 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
215
216 GHC note: the new thread inherits the /blocked/ state of the parent 
217 (see 'Control.Exception.block').
218
219 The newly created thread has an exception handler that discards the
220 exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
221 'ThreadKilled', and passes all other exceptions to the uncaught
222 exception handler (see 'setUncaughtExceptionHandler').
223 -}
224 forkIO :: IO () -> IO ThreadId
225 forkIO action = IO $ \ s -> 
226    case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
227  where
228   action_plus = catchException action childHandler
229
230 {- |
231 Like 'forkIO', but lets you specify on which CPU the thread is
232 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
233 will stay on the same CPU for its entire lifetime (`forkIO` threads
234 can migrate between CPUs according to the scheduling policy).
235 `forkOnIO` is useful for overriding the scheduling policy when you
236 know in advance how best to distribute the threads.
237
238 The `Int` argument specifies the CPU number; it is interpreted modulo
239 'numCapabilities' (note that it actually specifies a capability number
240 rather than a CPU number, but to a first approximation the two are
241 equivalent).
242 -}
243 forkOnIO :: Int -> IO () -> IO ThreadId
244 forkOnIO (I# cpu) action = IO $ \ s -> 
245    case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
246  where
247   action_plus = catchException action childHandler
248
249 -- | the value passed to the @+RTS -N@ flag.  This is the number of
250 -- Haskell threads that can run truly simultaneously at any given
251 -- time, and is typically set to the number of physical CPU cores on
252 -- the machine.
253 numCapabilities :: Int
254 numCapabilities = unsafePerformIO $  do 
255                     n <- peek n_capabilities
256                     return (fromIntegral n)
257
258 #if defined(mingw32_HOST_OS) && defined(__PIC__)
259 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
260 #else
261 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
262 #endif
263 childHandler :: SomeException -> IO ()
264 childHandler err = catchException (real_handler err) childHandler
265
266 real_handler :: SomeException -> IO ()
267 real_handler se@(SomeException ex) =
268   -- ignore thread GC and killThread exceptions:
269   case cast ex of
270   Just BlockedOnDeadMVar                -> return ()
271   _ -> case cast ex of
272        Just BlockedIndefinitely         -> return ()
273        _ -> case cast ex of
274             Just ThreadKilled           -> return ()
275             _ -> case cast ex of
276                  -- report all others:
277                  Just StackOverflow     -> reportStackOverflow
278                  _                      -> reportError se
279
280 {- | 'killThread' terminates the given thread (GHC only).
281 Any work already done by the thread isn\'t
282 lost: the computation is suspended until required by another thread.
283 The memory used by the thread will be garbage collected if it isn\'t
284 referenced from anywhere.  The 'killThread' function is defined in
285 terms of 'throwTo':
286
287 > killThread tid = throwTo tid ThreadKilled
288
289 Killthread is a no-op if the target thread has already completed.
290 -}
291 killThread :: ThreadId -> IO ()
292 killThread tid = throwTo tid ThreadKilled
293
294 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
295
296 'throwTo' does not return until the exception has been raised in the
297 target thread. 
298 The calling thread can thus be certain that the target
299 thread has received the exception.  This is a useful property to know
300 when dealing with race conditions: eg. if there are two threads that
301 can kill each other, it is guaranteed that only one of the threads
302 will get to kill the other.
303
304 If the target thread is currently making a foreign call, then the
305 exception will not be raised (and hence 'throwTo' will not return)
306 until the call has completed.  This is the case regardless of whether
307 the call is inside a 'block' or not.
308
309 Important note: the behaviour of 'throwTo' differs from that described in
310 the paper \"Asynchronous exceptions in Haskell\"
311 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
312 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
313 a more synchronous design in which 'throwTo' does not return until the exception
314 is received by the target thread.  The trade-off is discussed in Section 9 of the paper.
315 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
316 the paper).
317
318 There is currently no guarantee that the exception delivered by 'throwTo' will be
319 delivered at the first possible opportunity.  In particular, a thread may 
320 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
321 a pending 'throwTo'.  This is arguably undesirable behaviour.
322
323  -}
324 throwTo :: Exception e => ThreadId -> e -> IO ()
325 throwTo (ThreadId tid) ex = IO $ \ s ->
326    case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
327
328 -- | Returns the 'ThreadId' of the calling thread (GHC only).
329 myThreadId :: IO ThreadId
330 myThreadId = IO $ \s ->
331    case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
332
333
334 -- |The 'yield' action allows (forces, in a co-operative multitasking
335 -- implementation) a context-switch to any other currently runnable
336 -- threads (if any), and is occasionally useful when implementing
337 -- concurrency abstractions.
338 yield :: IO ()
339 yield = IO $ \s -> 
340    case (yield# s) of s1 -> (# s1, () #)
341
342 {- | 'labelThread' stores a string as identifier for this thread if
343 you built a RTS with debugging support. This identifier will be used in
344 the debugging output to make distinction of different threads easier
345 (otherwise you only have the thread state object\'s address in the heap).
346
347 Other applications like the graphical Concurrent Haskell Debugger
348 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
349 'labelThread' for their purposes as well.
350 -}
351
352 labelThread :: ThreadId -> String -> IO ()
353 labelThread (ThreadId t) str = IO $ \ s ->
354    let ps  = packCString# str
355        adr = byteArrayContents# ps in
356      case (labelThread# t adr s) of s1 -> (# s1, () #)
357
358 --      Nota Bene: 'pseq' used to be 'seq'
359 --                 but 'seq' is now defined in PrelGHC
360 --
361 -- "pseq" is defined a bit weirdly (see below)
362 --
363 -- The reason for the strange "lazy" call is that
364 -- it fools the compiler into thinking that pseq  and par are non-strict in
365 -- their second argument (even if it inlines pseq at the call site).
366 -- If it thinks pseq is strict in "y", then it often evaluates
367 -- "y" before "x", which is totally wrong.  
368
369 {-# INLINE pseq  #-}
370 pseq :: a -> b -> b
371 pseq  x y = x `seq` lazy y
372
373 {-# INLINE par  #-}
374 par :: a -> b -> b
375 par  x y = case (par# x) of { _ -> lazy y }
376
377 -- | Internal function used by the RTS to run sparks.
378 runSparks :: IO ()
379 runSparks = IO loop
380   where loop s = case getSpark# s of
381                    (# s', n, p #) ->
382                       if n ==# 0# then (# s', () #)
383                                   else p `seq` loop s'
384
385 data BlockReason
386   = BlockedOnMVar
387         -- ^blocked on on 'MVar'
388   | BlockedOnBlackHole
389         -- ^blocked on a computation in progress by another thread
390   | BlockedOnException
391         -- ^blocked in 'throwTo'
392   | BlockedOnSTM
393         -- ^blocked in 'retry' in an STM transaction
394   | BlockedOnForeignCall
395         -- ^currently in a foreign call
396   | BlockedOnOther
397         -- ^blocked on some other resource.  Without @-threaded@,
398         -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
399         -- they show up as 'BlockedOnMVar'.
400   deriving (Eq,Ord,Show)
401
402 -- | The current status of a thread
403 data ThreadStatus
404   = ThreadRunning
405         -- ^the thread is currently runnable or running
406   | ThreadFinished
407         -- ^the thread has finished
408   | ThreadBlocked  BlockReason
409         -- ^the thread is blocked on some resource
410   | ThreadDied
411         -- ^the thread received an uncaught exception
412   deriving (Eq,Ord,Show)
413
414 threadStatus :: ThreadId -> IO ThreadStatus
415 threadStatus (ThreadId t) = IO $ \s ->
416    case threadStatus# t s of
417      (# s', stat #) -> (# s', mk_stat (I# stat) #)
418    where
419         -- NB. keep these in sync with includes/Constants.h
420      mk_stat 0  = ThreadRunning
421      mk_stat 1  = ThreadBlocked BlockedOnMVar
422      mk_stat 2  = ThreadBlocked BlockedOnBlackHole
423      mk_stat 3  = ThreadBlocked BlockedOnException
424      mk_stat 7  = ThreadBlocked BlockedOnSTM
425      mk_stat 11 = ThreadBlocked BlockedOnForeignCall
426      mk_stat 12 = ThreadBlocked BlockedOnForeignCall
427      mk_stat 16 = ThreadFinished
428      mk_stat 17 = ThreadDied
429      mk_stat _  = ThreadBlocked BlockedOnOther
430 \end{code}
431
432
433 %************************************************************************
434 %*                                                                      *
435 \subsection[stm]{Transactional heap operations}
436 %*                                                                      *
437 %************************************************************************
438
439 TVars are shared memory locations which support atomic memory
440 transactions.
441
442 \begin{code}
443 -- |A monad supporting atomic memory transactions.
444 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
445
446 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
447 unSTM (STM a) = a
448
449 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
450
451 instance  Functor STM where
452    fmap f x = x >>= (return . f)
453
454 instance  Monad STM  where
455     {-# INLINE return #-}
456     {-# INLINE (>>)   #-}
457     {-# INLINE (>>=)  #-}
458     m >> k      = thenSTM m k
459     return x    = returnSTM x
460     m >>= k     = bindSTM m k
461
462 bindSTM :: STM a -> (a -> STM b) -> STM b
463 bindSTM (STM m) k = STM ( \s ->
464   case m s of 
465     (# new_s, a #) -> unSTM (k a) new_s
466   )
467
468 thenSTM :: STM a -> STM b -> STM b
469 thenSTM (STM m) k = STM ( \s ->
470   case m s of 
471     (# new_s, _ #) -> unSTM k new_s
472   )
473
474 returnSTM :: a -> STM a
475 returnSTM x = STM (\s -> (# s, x #))
476
477 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
478 -- dangerous thing to do.  
479 --
480 --   * The STM implementation will often run transactions multiple
481 --     times, so you need to be prepared for this if your IO has any
482 --     side effects.
483 --
484 --   * The STM implementation will abort transactions that are known to
485 --     be invalid and need to be restarted.  This may happen in the middle
486 --     of `unsafeIOToSTM`, so make sure you don't acquire any resources
487 --     that need releasing (exception handlers are ignored when aborting
488 --     the transaction).  That includes doing any IO using Handles, for
489 --     example.  Getting this wrong will probably lead to random deadlocks.
490 --
491 --   * The transaction may have seen an inconsistent view of memory when
492 --     the IO runs.  Invariants that you expect to be true throughout
493 --     your program may not be true inside a transaction, due to the
494 --     way transactions are implemented.  Normally this wouldn't be visible
495 --     to the programmer, but using `unsafeIOToSTM` can expose it.
496 --
497 unsafeIOToSTM :: IO a -> STM a
498 unsafeIOToSTM (IO m) = STM m
499
500 -- |Perform a series of STM actions atomically.
501 --
502 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
503 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
504 -- this would effectively allow a transaction inside a transaction, depending
505 -- on exactly when the thunk is evaluated.)
506 --
507 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
508 -- and which allows top-level TVars to be allocated.
509
510 atomically :: STM a -> IO a
511 atomically (STM m) = IO (\s -> (atomically# m) s )
512
513 -- |Retry execution of the current memory transaction because it has seen
514 -- values in TVars which mean that it should not continue (e.g. the TVars
515 -- represent a shared buffer that is now empty).  The implementation may
516 -- block the thread until one of the TVars that it has read from has been
517 -- udpated. (GHC only)
518 retry :: STM a
519 retry = STM $ \s# -> retry# s#
520
521 -- |Compose two alternative STM actions (GHC only).  If the first action
522 -- completes without retrying then it forms the result of the orElse.
523 -- Otherwise, if the first action retries, then the second action is
524 -- tried in its place.  If both actions retry then the orElse as a
525 -- whole retries.
526 orElse :: STM a -> STM a -> STM a
527 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
528
529 -- |Exception handling within STM actions.
530 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
531 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
532
533 -- | Low-level primitive on which always and alwaysSucceeds are built.
534 -- checkInv differs form these in that (i) the invariant is not 
535 -- checked when checkInv is called, only at the end of this and
536 -- subsequent transcations, (ii) the invariant failure is indicated
537 -- by raising an exception.
538 checkInv :: STM a -> STM ()
539 checkInv (STM m) = STM (\s -> (check# m) s)
540
541 -- | alwaysSucceeds adds a new invariant that must be true when passed
542 -- to alwaysSucceeds, at the end of the current transaction, and at
543 -- the end of every subsequent transaction.  If it fails at any
544 -- of those points then the transaction violating it is aborted
545 -- and the exception raised by the invariant is propagated.
546 alwaysSucceeds :: STM a -> STM ()
547 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
548                       checkInv i
549
550 -- | always is a variant of alwaysSucceeds in which the invariant is
551 -- expressed as an STM Bool action that must return True.  Returning
552 -- False or raising an exception are both treated as invariant failures.
553 always :: STM Bool -> STM ()
554 always i = alwaysSucceeds ( do v <- i
555                                if (v) then return () else ( error "Transacional invariant violation" ) )
556
557 -- |Shared memory locations that support atomic memory transactions.
558 data TVar a = TVar (TVar# RealWorld a)
559
560 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
561
562 instance Eq (TVar a) where
563         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
564
565 -- |Create a new TVar holding a value supplied
566 newTVar :: a -> STM (TVar a)
567 newTVar val = STM $ \s1# ->
568     case newTVar# val s1# of
569          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
570
571 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
572 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
573 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
574 -- possible.
575 newTVarIO :: a -> IO (TVar a)
576 newTVarIO val = IO $ \s1# ->
577     case newTVar# val s1# of
578          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
579
580 -- |Return the current value stored in a TVar.
581 -- This is equivalent to
582 --
583 -- >  readTVarIO = atomically . readTVar
584 --
585 -- but works much faster, because it doesn't perform a complete
586 -- transaction, it just reads the current value of the 'TVar'.
587 readTVarIO :: TVar a -> IO a
588 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
589
590 -- |Return the current value stored in a TVar
591 readTVar :: TVar a -> STM a
592 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
593
594 -- |Write the supplied value into a TVar
595 writeTVar :: TVar a -> a -> STM ()
596 writeTVar (TVar tvar#) val = STM $ \s1# ->
597     case writeTVar# tvar# val s1# of
598          s2# -> (# s2#, () #)
599   
600 \end{code}
601
602 %************************************************************************
603 %*                                                                      *
604 \subsection[mvars]{M-Structures}
605 %*                                                                      *
606 %************************************************************************
607
608 M-Vars are rendezvous points for concurrent threads.  They begin
609 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
610 is written, a single blocked thread may be freed.  Reading an M-Var
611 toggles its state from full back to empty.  Therefore, any value
612 written to an M-Var may only be read once.  Multiple reads and writes
613 are allowed, but there must be at least one read between any two
614 writes.
615
616 \begin{code}
617 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
618
619 -- |Create an 'MVar' which is initially empty.
620 newEmptyMVar  :: IO (MVar a)
621 newEmptyMVar = IO $ \ s# ->
622     case newMVar# s# of
623          (# s2#, svar# #) -> (# s2#, MVar svar# #)
624
625 -- |Create an 'MVar' which contains the supplied value.
626 newMVar :: a -> IO (MVar a)
627 newMVar value =
628     newEmptyMVar        >>= \ mvar ->
629     putMVar mvar value  >>
630     return mvar
631
632 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
633 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
634 -- the 'MVar' is left empty.
635 -- 
636 -- There are two further important properties of 'takeMVar':
637 --
638 --   * 'takeMVar' is single-wakeup.  That is, if there are multiple
639 --     threads blocked in 'takeMVar', and the 'MVar' becomes full,
640 --     only one thread will be woken up.  The runtime guarantees that
641 --     the woken thread completes its 'takeMVar' operation.
642 --
643 --   * When multiple threads are blocked on an 'MVar', they are
644 --     woken up in FIFO order.  This is useful for providing
645 --     fairness properties of abstractions built using 'MVar's.
646 --
647 takeMVar :: MVar a -> IO a
648 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
649
650 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
651 -- 'putMVar' will wait until it becomes empty.
652 --
653 -- There are two further important properties of 'putMVar':
654 --
655 --   * 'putMVar' is single-wakeup.  That is, if there are multiple
656 --     threads blocked in 'putMVar', and the 'MVar' becomes empty,
657 --     only one thread will be woken up.  The runtime guarantees that
658 --     the woken thread completes its 'putMVar' operation.
659 --
660 --   * When multiple threads are blocked on an 'MVar', they are
661 --     woken up in FIFO order.  This is useful for providing
662 --     fairness properties of abstractions built using 'MVar's.
663 --
664 putMVar  :: MVar a -> a -> IO ()
665 putMVar (MVar mvar#) x = IO $ \ s# ->
666     case putMVar# mvar# x s# of
667         s2# -> (# s2#, () #)
668
669 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
670 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
671 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
672 -- the 'MVar' is left empty.
673 tryTakeMVar :: MVar a -> IO (Maybe a)
674 tryTakeMVar (MVar m) = IO $ \ s ->
675     case tryTakeMVar# m s of
676         (# s', 0#, _ #) -> (# s', Nothing #)      -- MVar is empty
677         (# s', _,  a #) -> (# s', Just a  #)      -- MVar is full
678
679 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
680 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
681 -- it was successful, or 'False' otherwise.
682 tryPutMVar  :: MVar a -> a -> IO Bool
683 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
684     case tryPutMVar# mvar# x s# of
685         (# s, 0# #) -> (# s, False #)
686         (# s, _  #) -> (# s, True #)
687
688 -- |Check whether a given 'MVar' is empty.
689 --
690 -- Notice that the boolean value returned  is just a snapshot of
691 -- the state of the MVar. By the time you get to react on its result,
692 -- the MVar may have been filled (or emptied) - so be extremely
693 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
694 isEmptyMVar :: MVar a -> IO Bool
695 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
696     case isEmptyMVar# mv# s# of
697         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
698
699 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
700 -- "System.Mem.Weak" for more about finalizers.
701 addMVarFinalizer :: MVar a -> IO () -> IO ()
702 addMVarFinalizer (MVar m) finalizer = 
703   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, _ #) -> (# s1, () #) }
704 \end{code}
705
706
707 %************************************************************************
708 %*                                                                      *
709 \subsection{Thread waiting}
710 %*                                                                      *
711 %************************************************************************
712
713 \begin{code}
714 #ifdef mingw32_HOST_OS
715
716 -- Note: threadWaitRead and threadWaitWrite aren't really functional
717 -- on Win32, but left in there because lib code (still) uses them (the manner
718 -- in which they're used doesn't cause problems on a Win32 platform though.)
719
720 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
721 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
722   IO $ \s -> case asyncRead# fd isSock len buf s of 
723                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
724
725 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
726 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
727   IO $ \s -> case asyncWrite# fd isSock len buf s of 
728                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
729
730 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
731 asyncDoProc (FunPtr proc) (Ptr param) = 
732     -- the 'length' value is ignored; simplifies implementation of
733     -- the async*# primops to have them all return the same result.
734   IO $ \s -> case asyncDoProc# proc param s  of 
735                (# s', _len#, err# #) -> (# s', I# err# #)
736
737 -- to aid the use of these primops by the IO Handle implementation,
738 -- provide the following convenience funs:
739
740 -- this better be a pinned byte array!
741 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
742 asyncReadBA fd isSock len off bufB = 
743   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
744   
745 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
746 asyncWriteBA fd isSock len off bufB = 
747   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
748
749 #endif
750
751 -- -----------------------------------------------------------------------------
752 -- Thread IO API
753
754 -- | Block the current thread until data is available to read on the
755 -- given file descriptor (GHC only).
756 threadWaitRead :: Fd -> IO ()
757 threadWaitRead fd
758 #ifndef mingw32_HOST_OS
759   | threaded  = waitForReadEvent fd
760 #endif
761   | otherwise = IO $ \s -> 
762         case fromIntegral fd of { I# fd# ->
763         case waitRead# fd# s of { s' -> (# s', () #)
764         }}
765
766 -- | Block the current thread until data can be written to the
767 -- given file descriptor (GHC only).
768 threadWaitWrite :: Fd -> IO ()
769 threadWaitWrite fd
770 #ifndef mingw32_HOST_OS
771   | threaded  = waitForWriteEvent fd
772 #endif
773   | otherwise = IO $ \s -> 
774         case fromIntegral fd of { I# fd# ->
775         case waitWrite# fd# s of { s' -> (# s', () #)
776         }}
777
778 -- | Suspends the current thread for a given number of microseconds
779 -- (GHC only).
780 --
781 -- There is no guarantee that the thread will be rescheduled promptly
782 -- when the delay has expired, but the thread will never continue to
783 -- run /earlier/ than specified.
784 --
785 threadDelay :: Int -> IO ()
786 threadDelay time
787   | threaded  = waitForDelayEvent time
788   | otherwise = IO $ \s -> 
789         case fromIntegral time of { I# time# ->
790         case delay# time# s of { s' -> (# s', () #)
791         }}
792
793
794 -- | Set the value of returned TVar to True after a given number of
795 -- microseconds. The caveats associated with threadDelay also apply.
796 --
797 registerDelay :: Int -> IO (TVar Bool)
798 registerDelay usecs 
799   | threaded = waitForDelayEventSTM usecs
800   | otherwise = error "registerDelay: requires -threaded"
801
802 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
803
804 waitForDelayEvent :: Int -> IO ()
805 waitForDelayEvent usecs = do
806   m <- newEmptyMVar
807   target <- calculateTarget usecs
808   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
809   prodServiceThread
810   takeMVar m
811
812 -- Delays for use in STM
813 waitForDelayEventSTM :: Int -> IO (TVar Bool)
814 waitForDelayEventSTM usecs = do
815    t <- atomically $ newTVar False
816    target <- calculateTarget usecs
817    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
818    prodServiceThread
819    return t  
820     
821 calculateTarget :: Int -> IO USecs
822 calculateTarget usecs = do
823     now <- getUSecOfDay
824     return $ now + (fromIntegral usecs)
825
826
827 -- ----------------------------------------------------------------------------
828 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
829
830 -- In the threaded RTS, we employ a single IO Manager thread to wait
831 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
832 -- and delays (threadDelay).  
833 --
834 -- We can do this because in the threaded RTS the IO Manager can make
835 -- a non-blocking call to select(), so we don't have to do select() in
836 -- the scheduler as we have to in the non-threaded RTS.  We get performance
837 -- benefits from doing it this way, because we only have to restart the select()
838 -- when a new request arrives, rather than doing one select() each time
839 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
840 -- by not having to check for completed IO requests.
841
842 -- Issues, possible problems:
843 --
844 --      - we might want bound threads to just do the blocking
845 --        operation rather than communicating with the IO manager
846 --        thread.  This would prevent simgle-threaded programs which do
847 --        IO from requiring multiple OS threads.  However, it would also
848 --        prevent bound threads waiting on IO from being killed or sent
849 --        exceptions.
850 --
851 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
852 --        I couldn't repeat this.
853 --
854 --      - How do we handle signal delivery in the multithreaded RTS?
855 --
856 --      - forkProcess will kill the IO manager thread.  Let's just
857 --        hope we don't need to do any blocking IO between fork & exec.
858
859 #ifndef mingw32_HOST_OS
860 data IOReq
861   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
862   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
863 #endif
864
865 data DelayReq
866   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
867   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
868
869 #ifndef mingw32_HOST_OS
870 pendingEvents :: IORef [IOReq]
871 #endif
872 pendingDelays :: IORef [DelayReq]
873         -- could use a strict list or array here
874 {-# NOINLINE pendingEvents #-}
875 {-# NOINLINE pendingDelays #-}
876 (pendingEvents,pendingDelays) = unsafePerformIO $ do
877   startIOManagerThread
878   reqs <- newIORef []
879   dels <- newIORef []
880   return (reqs, dels)
881         -- the first time we schedule an IO request, the service thread
882         -- will be created (cool, huh?)
883
884 ensureIOManagerIsRunning :: IO ()
885 ensureIOManagerIsRunning 
886   | threaded  = seq pendingEvents $ return ()
887   | otherwise = return ()
888
889 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
890 insertDelay d [] = [d]
891 insertDelay d1 ds@(d2 : rest)
892   | delayTime d1 <= delayTime d2 = d1 : ds
893   | otherwise                    = d2 : insertDelay d1 rest
894
895 delayTime :: DelayReq -> USecs
896 delayTime (Delay t _) = t
897 delayTime (DelaySTM t _) = t
898
899 type USecs = Word64
900
901 -- XXX: move into GHC.IOBase from Data.IORef?
902 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
903 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
904
905 foreign import ccall unsafe "getUSecOfDay" 
906   getUSecOfDay :: IO USecs
907
908 prodding :: IORef Bool
909 {-# NOINLINE prodding #-}
910 prodding = unsafePerformIO (newIORef False)
911
912 prodServiceThread :: IO ()
913 prodServiceThread = do
914   was_set <- atomicModifyIORef prodding (\a -> (True,a))
915   if (not (was_set)) then wakeupIOManager else return ()
916
917 #ifdef mingw32_HOST_OS
918 -- ----------------------------------------------------------------------------
919 -- Windows IO manager thread
920
921 startIOManagerThread :: IO ()
922 startIOManagerThread = do
923   wakeup <- c_getIOManagerEvent
924   forkIO $ service_loop wakeup []
925   return ()
926
927 service_loop :: HANDLE          -- read end of pipe
928              -> [DelayReq]      -- current delay requests
929              -> IO ()
930
931 service_loop wakeup old_delays = do
932   -- pick up new delay requests
933   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
934   let  delays = foldr insertDelay old_delays new_delays
935
936   now <- getUSecOfDay
937   (delays', timeout) <- getDelay now delays
938
939   r <- c_WaitForSingleObject wakeup timeout
940   case r of
941     0xffffffff -> do c_maperrno; throwErrno "service_loop"
942     0 -> do
943         r2 <- c_readIOManagerEvent
944         exit <- 
945               case r2 of
946                 _ | r2 == io_MANAGER_WAKEUP -> return False
947                 _ | r2 == io_MANAGER_DIE    -> return True
948                 0 -> return False -- spurious wakeup
949                 _ -> do start_console_handler (r2 `shiftR` 1); return False
950         if exit
951           then return ()
952           else service_cont wakeup delays'
953
954     _other -> service_cont wakeup delays' -- probably timeout        
955
956 service_cont :: HANDLE -> [DelayReq] -> IO ()
957 service_cont wakeup delays = do
958   atomicModifyIORef prodding (\_ -> (False,False))
959   service_loop wakeup delays
960
961 -- must agree with rts/win32/ThrIOManager.c
962 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
963 io_MANAGER_WAKEUP = 0xffffffff
964 io_MANAGER_DIE    = 0xfffffffe
965
966 data ConsoleEvent
967  = ControlC
968  | Break
969  | Close
970     -- these are sent to Services only.
971  | Logoff
972  | Shutdown
973  deriving (Eq, Ord, Enum, Show, Read, Typeable)
974
975 start_console_handler :: Word32 -> IO ()
976 start_console_handler r =
977   case toWin32ConsoleEvent r of
978      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
979                     forkIO (handler x)
980                     return ()
981      Nothing -> return ()
982
983 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
984 toWin32ConsoleEvent ev = 
985    case ev of
986        0 {- CTRL_C_EVENT-}        -> Just ControlC
987        1 {- CTRL_BREAK_EVENT-}    -> Just Break
988        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
989        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
990        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
991        _ -> Nothing
992
993 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
994 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
995
996 -- XXX Is this actually needed?
997 stick :: IORef HANDLE
998 {-# NOINLINE stick #-}
999 stick = unsafePerformIO (newIORef nullPtr)
1000
1001 wakeupIOManager :: IO ()
1002 wakeupIOManager = do 
1003   _hdl <- readIORef stick
1004   c_sendIOManagerEvent io_MANAGER_WAKEUP
1005
1006 -- Walk the queue of pending delays, waking up any that have passed
1007 -- and return the smallest delay to wait for.  The queue of pending
1008 -- delays is kept ordered.
1009 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
1010 getDelay _   [] = return ([], iNFINITE)
1011 getDelay now all@(d : rest) 
1012   = case d of
1013      Delay time m | now >= time -> do
1014         putMVar m ()
1015         getDelay now rest
1016      DelaySTM time t | now >= time -> do
1017         atomically $ writeTVar t True
1018         getDelay now rest
1019      _otherwise ->
1020         -- delay is in millisecs for WaitForSingleObject
1021         let micro_seconds = delayTime d - now
1022             milli_seconds = (micro_seconds + 999) `div` 1000
1023         in return (all, fromIntegral milli_seconds)
1024
1025 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
1026 -- available yet.  We should move some Win32 functionality down here,
1027 -- maybe as part of the grand reorganisation of the base package...
1028 type HANDLE       = Ptr ()
1029 type DWORD        = Word32
1030
1031 iNFINITE :: DWORD
1032 iNFINITE = 0xFFFFFFFF -- urgh
1033
1034 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
1035   c_getIOManagerEvent :: IO HANDLE
1036
1037 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
1038   c_readIOManagerEvent :: IO Word32
1039
1040 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
1041   c_sendIOManagerEvent :: Word32 -> IO ()
1042
1043 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
1044    c_maperrno :: IO ()
1045
1046 foreign import stdcall "WaitForSingleObject"
1047    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1048
1049 #else
1050 -- ----------------------------------------------------------------------------
1051 -- Unix IO manager thread, using select()
1052
1053 startIOManagerThread :: IO ()
1054 startIOManagerThread = do
1055         allocaArray 2 $ \fds -> do
1056         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1057         rd_end <- peekElemOff fds 0
1058         wr_end <- peekElemOff fds 1
1059         setNonBlockingFD wr_end -- writes happen in a signal handler, we
1060                                 -- don't want them to block.
1061         setCloseOnExec rd_end
1062         setCloseOnExec wr_end
1063         writeIORef stick (fromIntegral wr_end)
1064         c_setIOManagerPipe wr_end
1065         forkIO $ do
1066             allocaBytes sizeofFdSet   $ \readfds -> do
1067             allocaBytes sizeofFdSet   $ \writefds -> do 
1068             allocaBytes sizeofTimeVal $ \timeval -> do
1069             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1070         return ()
1071
1072 service_loop
1073    :: Fd                -- listen to this for wakeup calls
1074    -> Ptr CFdSet
1075    -> Ptr CFdSet
1076    -> Ptr CTimeVal
1077    -> [IOReq]
1078    -> [DelayReq]
1079    -> IO ()
1080 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1081
1082   -- pick up new IO requests
1083   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1084   let reqs = new_reqs ++ old_reqs
1085
1086   -- pick up new delay requests
1087   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1088   let  delays0 = foldr insertDelay old_delays new_delays
1089
1090   -- build the FDSets for select()
1091   fdZero readfds
1092   fdZero writefds
1093   fdSet wakeup readfds
1094   maxfd <- buildFdSets 0 readfds writefds reqs
1095
1096   -- perform the select()
1097   let do_select delays = do
1098           -- check the current time and wake up any thread in
1099           -- threadDelay whose timeout has expired.  Also find the
1100           -- timeout value for the select() call.
1101           now <- getUSecOfDay
1102           (delays', timeout) <- getDelay now ptimeval delays
1103
1104           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
1105                         nullPtr timeout
1106           if (res == -1)
1107              then do
1108                 err <- getErrno
1109                 case err of
1110                   _ | err == eINTR ->  do_select delays'
1111                         -- EINTR: just redo the select()
1112                   _ | err == eBADF ->  return (True, delays)
1113                         -- EBADF: one of the file descriptors is closed or bad,
1114                         -- we don't know which one, so wake everyone up.
1115                   _ | otherwise    ->  throwErrno "select"
1116                         -- otherwise (ENOMEM or EINVAL) something has gone
1117                         -- wrong; report the error.
1118              else
1119                 return (False,delays')
1120
1121   (wakeup_all,delays') <- do_select delays0
1122
1123   exit <-
1124     if wakeup_all then return False
1125       else do
1126         b <- fdIsSet wakeup readfds
1127         if b == 0 
1128           then return False
1129           else alloca $ \p -> do 
1130                  c_read (fromIntegral wakeup) p 1
1131                  s <- peek p            
1132                  case s of
1133                   _ | s == io_MANAGER_WAKEUP -> return False
1134                   _ | s == io_MANAGER_DIE    -> return True
1135                   _ | s == io_MANAGER_SYNC   -> do
1136                        mvars <- readIORef sync
1137                        mapM_ (flip putMVar ()) mvars
1138                        return False
1139                   _ -> do
1140                        fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1141                        withForeignPtr fp $ \p_siginfo -> do
1142                          r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1143                                  sizeof_siginfo_t
1144                          when (r /= fromIntegral sizeof_siginfo_t) $
1145                             error "failed to read siginfo_t"
1146                        runHandlers' fp (fromIntegral s)
1147                        return False
1148
1149   if exit then return () else do
1150
1151   atomicModifyIORef prodding (\_ -> (False,False))
1152
1153   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1154                          else completeRequests reqs readfds writefds []
1155
1156   service_loop wakeup readfds writefds ptimeval reqs' delays'
1157
1158 io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: CChar
1159 io_MANAGER_WAKEUP = 0xff
1160 io_MANAGER_DIE    = 0xfe
1161 io_MANAGER_SYNC   = 0xfd
1162
1163 -- | the stick is for poking the IO manager with
1164 stick :: IORef Fd
1165 {-# NOINLINE stick #-}
1166 stick = unsafePerformIO (newIORef 0)
1167
1168 {-# NOINLINE sync #-}
1169 sync :: IORef [MVar ()]
1170 sync = unsafePerformIO (newIORef [])
1171
1172 -- waits for the IO manager to drain the pipe
1173 syncIOManager :: IO ()
1174 syncIOManager = do
1175   m <- newEmptyMVar
1176   atomicModifyIORef sync (\old -> (m:old,()))
1177   fd <- readIORef stick
1178   with io_MANAGER_SYNC $ \pbuf -> do 
1179     c_write (fromIntegral fd) pbuf 1; return ()
1180   takeMVar m
1181
1182 wakeupIOManager :: IO ()
1183 wakeupIOManager = do
1184   fd <- readIORef stick
1185   with io_MANAGER_WAKEUP $ \pbuf -> do 
1186     c_write (fromIntegral fd) pbuf 1; return ()
1187
1188 -- For the non-threaded RTS
1189 runHandlers :: Ptr Word8 -> Int -> IO ()
1190 runHandlers p_info sig = do
1191   fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1192   withForeignPtr fp $ \p -> do
1193     copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1194     free p_info
1195   runHandlers' fp (fromIntegral sig)
1196
1197 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1198 runHandlers' p_info sig = do
1199   let int = fromIntegral sig
1200   withMVar signal_handlers $ \arr ->
1201       if not (inRange (boundsIOArray arr) int)
1202          then return ()
1203          else do handler <- unsafeReadIOArray arr int
1204                  case handler of
1205                     Nothing -> return ()
1206                     Just (f,_)  -> do forkIO (f p_info); return ()
1207
1208 foreign import ccall "setIOManagerPipe"
1209   c_setIOManagerPipe :: CInt -> IO ()
1210
1211 foreign import ccall "__hscore_sizeof_siginfo_t"
1212   sizeof_siginfo_t :: CSize
1213
1214 type Signal = CInt
1215
1216 maxSig = 64 :: Int
1217
1218 type HandlerFun = ForeignPtr Word8 -> IO ()
1219
1220 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1221 -- this race condition is #1922, although that bug was on Windows a similar
1222 -- bug also exists on Unix.
1223 {-# NOINLINE signal_handlers #-}
1224 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1225 signal_handlers = unsafePerformIO $ do
1226    arr <- newIOArray (0,maxSig) Nothing
1227    newMVar arr
1228
1229 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1230 setHandler sig handler = do
1231   let int = fromIntegral sig
1232   withMVar signal_handlers $ \arr -> 
1233      if not (inRange (boundsIOArray arr) int)
1234         then error "GHC.Conc.setHandler: signal out of range"
1235         else do old <- unsafeReadIOArray arr int
1236                 unsafeWriteIOArray arr int handler
1237                 return old
1238
1239 -- -----------------------------------------------------------------------------
1240 -- IO requests
1241
1242 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1243 buildFdSets maxfd _       _        [] = return maxfd
1244 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1245   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1246   | otherwise        =  do
1247         fdSet fd readfds
1248         buildFdSets (max maxfd fd) readfds writefds reqs
1249 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1250   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1251   | otherwise        =  do
1252         fdSet fd writefds
1253         buildFdSets (max maxfd fd) readfds writefds reqs
1254
1255 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1256                  -> IO [IOReq]
1257 completeRequests [] _ _ reqs' = return reqs'
1258 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1259   b <- fdIsSet fd readfds
1260   if b /= 0
1261     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1262     else completeRequests reqs readfds writefds (Read fd m : reqs')
1263 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1264   b <- fdIsSet fd writefds
1265   if b /= 0
1266     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1267     else completeRequests reqs readfds writefds (Write fd m : reqs')
1268
1269 wakeupAll :: [IOReq] -> IO ()
1270 wakeupAll [] = return ()
1271 wakeupAll (Read  _ m : reqs) = do putMVar m (); wakeupAll reqs
1272 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1273
1274 waitForReadEvent :: Fd -> IO ()
1275 waitForReadEvent fd = do
1276   m <- newEmptyMVar
1277   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1278   prodServiceThread
1279   takeMVar m
1280
1281 waitForWriteEvent :: Fd -> IO ()
1282 waitForWriteEvent fd = do
1283   m <- newEmptyMVar
1284   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1285   prodServiceThread
1286   takeMVar m
1287
1288 -- -----------------------------------------------------------------------------
1289 -- Delays
1290
1291 -- Walk the queue of pending delays, waking up any that have passed
1292 -- and return the smallest delay to wait for.  The queue of pending
1293 -- delays is kept ordered.
1294 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1295 getDelay _   _        [] = return ([],nullPtr)
1296 getDelay now ptimeval all@(d : rest) 
1297   = case d of
1298      Delay time m | now >= time -> do
1299         putMVar m ()
1300         getDelay now ptimeval rest
1301      DelaySTM time t | now >= time -> do
1302         atomically $ writeTVar t True
1303         getDelay now ptimeval rest
1304      _otherwise -> do
1305         setTimevalTicks ptimeval (delayTime d - now)
1306         return (all,ptimeval)
1307
1308 data CTimeVal
1309
1310 foreign import ccall unsafe "sizeofTimeVal"
1311   sizeofTimeVal :: Int
1312
1313 foreign import ccall unsafe "setTimevalTicks" 
1314   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1315
1316 {- 
1317   On Win32 we're going to have a single Pipe, and a
1318   waitForSingleObject with the delay time.  For signals, we send a
1319   byte down the pipe just like on Unix.
1320 -}
1321
1322 -- ----------------------------------------------------------------------------
1323 -- select() interface
1324
1325 -- ToDo: move to System.Posix.Internals?
1326
1327 data CFdSet
1328
1329 foreign import ccall safe "select"
1330   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1331            -> IO CInt
1332
1333 foreign import ccall unsafe "hsFD_SETSIZE"
1334   c_fD_SETSIZE :: CInt
1335
1336 fD_SETSIZE :: Fd
1337 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1338
1339 foreign import ccall unsafe "hsFD_ISSET"
1340   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1341
1342 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1343 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1344
1345 foreign import ccall unsafe "hsFD_SET"
1346   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1347
1348 fdSet :: Fd -> Ptr CFdSet -> IO ()
1349 fdSet (Fd fd) fdset = c_fdSet fd fdset
1350
1351 foreign import ccall unsafe "hsFD_ZERO"
1352   fdZero :: Ptr CFdSet -> IO ()
1353
1354 foreign import ccall unsafe "sizeof_fd_set"
1355   sizeofFdSet :: Int
1356
1357 #endif
1358
1359 reportStackOverflow :: IO a
1360 reportStackOverflow = do callStackOverflowHook; return undefined
1361
1362 reportError :: SomeException -> IO a
1363 reportError ex = do
1364    handler <- getUncaughtExceptionHandler
1365    handler ex
1366    return undefined
1367
1368 -- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
1369 -- the unsafe below.
1370 foreign import ccall unsafe "stackOverflow"
1371         callStackOverflowHook :: IO ()
1372
1373 {-# NOINLINE uncaughtExceptionHandler #-}
1374 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1375 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1376    where
1377       defaultHandler :: SomeException -> IO ()
1378       defaultHandler se@(SomeException ex) = do
1379          (hFlush stdout) `catchAny` (\ _ -> return ())
1380          let msg = case cast ex of
1381                Just Deadlock -> "no threads to run:  infinite loop or deadlock?"
1382                _ -> case cast ex of
1383                     Just (ErrorCall s) -> s
1384                     _                  -> showsPrec 0 se ""
1385          withCString "%s" $ \cfmt ->
1386           withCString msg $ \cmsg ->
1387             errorBelch cfmt cmsg
1388
1389 -- don't use errorBelch() directly, because we cannot call varargs functions
1390 -- using the FFI.
1391 foreign import ccall unsafe "HsBase.h errorBelch2"
1392    errorBelch :: CString -> CString -> IO ()
1393
1394 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1395 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1396
1397 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1398 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
1399
1400
1401 withMVar :: MVar a -> (a -> IO b) -> IO b
1402 withMVar m io = 
1403   block $ do
1404     a <- takeMVar m
1405     b <- catchAny (unblock (io a))
1406             (\e -> do putMVar m a; throw e)
1407     putMVar m a
1408     return b
1409 \end{code}