protect console handler against concurrent access (#1922)
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -fno-implicit-prelude #-}
3 -----------------------------------------------------------------------------
4 -- |
5 -- Module      :  GHC.Conc
6 -- Copyright   :  (c) The University of Glasgow, 1994-2002
7 -- License     :  see libraries/base/LICENSE
8 -- 
9 -- Maintainer  :  cvs-ghc@haskell.org
10 -- Stability   :  internal
11 -- Portability :  non-portable (GHC extensions)
12 --
13 -- Basic concurrency stuff.
14 -- 
15 -----------------------------------------------------------------------------
16
17 -- No: #hide, because bits of this module are exposed by the stm package.
18 -- However, we don't want this module to be the home location for the
19 -- bits it exports, we'd rather have Control.Concurrent and the other
20 -- higher level modules be the home.  Hence:
21
22 #include "Typeable.h"
23
24 -- #not-home
25 module GHC.Conc
26         ( ThreadId(..)
27
28         -- * Forking and suchlike
29         , forkIO        -- :: IO a -> IO ThreadId
30         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
31         , numCapabilities -- :: Int
32         , childHandler  -- :: Exception -> IO ()
33         , myThreadId    -- :: IO ThreadId
34         , killThread    -- :: ThreadId -> IO ()
35         , throwTo       -- :: ThreadId -> Exception -> IO ()
36         , par           -- :: a -> b -> b
37         , pseq          -- :: a -> b -> b
38         , yield         -- :: IO ()
39         , labelThread   -- :: ThreadId -> String -> IO ()
40
41         -- * Waiting
42         , threadDelay           -- :: Int -> IO ()
43         , registerDelay         -- :: Int -> IO (TVar Bool)
44         , threadWaitRead        -- :: Int -> IO ()
45         , threadWaitWrite       -- :: Int -> IO ()
46
47         -- * MVars
48         , MVar          -- abstract
49         , newMVar       -- :: a -> IO (MVar a)
50         , newEmptyMVar  -- :: IO (MVar a)
51         , takeMVar      -- :: MVar a -> IO a
52         , putMVar       -- :: MVar a -> a -> IO ()
53         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
54         , tryPutMVar    -- :: MVar a -> a -> IO Bool
55         , isEmptyMVar   -- :: MVar a -> IO Bool
56         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
57
58         -- * TVars
59         , STM           -- abstract
60         , atomically    -- :: STM a -> IO a
61         , retry         -- :: STM a
62         , orElse        -- :: STM a -> STM a -> STM a
63         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
64         , alwaysSucceeds -- :: STM a -> STM ()
65         , always        -- :: STM Bool -> STM ()
66         , TVar          -- abstract
67         , newTVar       -- :: a -> STM (TVar a)
68         , newTVarIO     -- :: a -> STM (TVar a)
69         , readTVar      -- :: TVar a -> STM a
70         , writeTVar     -- :: a -> TVar a -> STM ()
71         , unsafeIOToSTM -- :: IO a -> STM a
72
73         -- * Miscellaneous
74 #ifdef mingw32_HOST_OS
75         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
76         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
77         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
78
79         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
80         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
81 #endif
82
83 #ifndef mingw32_HOST_OS
84         , signalHandlerLock
85 #endif
86
87         , ensureIOManagerIsRunning
88
89 #ifdef mingw32_HOST_OS
90         , ConsoleEvent(..)
91         , win32ConsoleHandler
92         , toWin32ConsoleEvent
93 #endif
94         ) where
95
96 import System.Posix.Types
97 #ifndef mingw32_HOST_OS
98 import System.Posix.Internals
99 #endif
100 import Foreign
101 import Foreign.C
102
103 #ifndef __HADDOCK__
104 import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
105 #endif
106
107 import Data.Maybe
108
109 import GHC.Base
110 import GHC.IOBase
111 import GHC.Num          ( Num(..) )
112 import GHC.Real         ( fromIntegral, div )
113 #ifndef mingw32_HOST_OS
114 import GHC.Base         ( Int(..) )
115 #endif
116 #ifdef mingw32_HOST_OS
117 import GHC.Read         ( Read )
118 import GHC.Enum         ( Enum )
119 #endif
120 import GHC.Exception
121 import GHC.Pack         ( packCString# )
122 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
123 import GHC.STRef
124 import GHC.Show         ( Show(..), showString )
125 import Data.Typeable
126
127 infixr 0 `par`, `pseq`
128 \end{code}
129
130 %************************************************************************
131 %*                                                                      *
132 \subsection{@ThreadId@, @par@, and @fork@}
133 %*                                                                      *
134 %************************************************************************
135
136 \begin{code}
137 data ThreadId = ThreadId ThreadId# deriving( Typeable )
138 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
139 -- But since ThreadId# is unlifted, the Weak type must use open
140 -- type variables.
141 {- ^
142 A 'ThreadId' is an abstract type representing a handle to a thread.
143 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
144 the 'Ord' instance implements an arbitrary total ordering over
145 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
146 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
147 useful when debugging or diagnosing the behaviour of a concurrent
148 program.
149
150 /Note/: in GHC, if you have a 'ThreadId', you essentially have
151 a pointer to the thread itself.  This means the thread itself can\'t be
152 garbage collected until you drop the 'ThreadId'.
153 This misfeature will hopefully be corrected at a later date.
154
155 /Note/: Hugs does not provide any operations on other threads;
156 it defines 'ThreadId' as a synonym for ().
157 -}
158
159 instance Show ThreadId where
160    showsPrec d t = 
161         showString "ThreadId " . 
162         showsPrec d (getThreadId (id2TSO t))
163
164 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
165
166 id2TSO :: ThreadId -> ThreadId#
167 id2TSO (ThreadId t) = t
168
169 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
170 -- Returns -1, 0, 1
171
172 cmpThread :: ThreadId -> ThreadId -> Ordering
173 cmpThread t1 t2 = 
174    case cmp_thread (id2TSO t1) (id2TSO t2) of
175       -1 -> LT
176       0  -> EQ
177       _  -> GT -- must be 1
178
179 instance Eq ThreadId where
180    t1 == t2 = 
181       case t1 `cmpThread` t2 of
182          EQ -> True
183          _  -> False
184
185 instance Ord ThreadId where
186    compare = cmpThread
187
188 {- |
189 This sparks off a new thread to run the 'IO' computation passed as the
190 first argument, and returns the 'ThreadId' of the newly created
191 thread.
192
193 The new thread will be a lightweight thread; if you want to use a foreign
194 library that uses thread-local storage, use 'forkOS' instead.
195 -}
196 forkIO :: IO () -> IO ThreadId
197 forkIO action = IO $ \ s -> 
198    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
199  where
200   action_plus = catchException action childHandler
201
202 forkOnIO :: Int -> IO () -> IO ThreadId
203 forkOnIO (I# cpu) action = IO $ \ s -> 
204    case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
205  where
206   action_plus = catchException action childHandler
207
208 -- | the value passed to the @+RTS -N@ flag.  This is the number of
209 -- Haskell threads that can run truly simultaneously at any given
210 -- time, and is typically set to the number of physical CPU cores on
211 -- the machine.
212 numCapabilities :: Int
213 numCapabilities = unsafePerformIO $  do 
214                     n <- peek n_capabilities
215                     return (fromIntegral n)
216
217 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
218
219 childHandler :: Exception -> IO ()
220 childHandler err = catchException (real_handler err) childHandler
221
222 real_handler :: Exception -> IO ()
223 real_handler ex =
224   case ex of
225         -- ignore thread GC and killThread exceptions:
226         BlockedOnDeadMVar            -> return ()
227         BlockedIndefinitely          -> return ()
228         AsyncException ThreadKilled  -> return ()
229
230         -- report all others:
231         AsyncException StackOverflow -> reportStackOverflow
232         other       -> reportError other
233
234 {- | 'killThread' terminates the given thread (GHC only).
235 Any work already done by the thread isn\'t
236 lost: the computation is suspended until required by another thread.
237 The memory used by the thread will be garbage collected if it isn\'t
238 referenced from anywhere.  The 'killThread' function is defined in
239 terms of 'throwTo':
240
241 > killThread tid = throwTo tid (AsyncException ThreadKilled)
242
243 -}
244 killThread :: ThreadId -> IO ()
245 killThread tid = throwTo tid (AsyncException ThreadKilled)
246
247 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
248
249 'throwTo' does not return until the exception has been raised in the
250 target thread. 
251 The calling thread can thus be certain that the target
252 thread has received the exception.  This is a useful property to know
253 when dealing with race conditions: eg. if there are two threads that
254 can kill each other, it is guaranteed that only one of the threads
255 will get to kill the other.
256
257 If the target thread is currently making a foreign call, then the
258 exception will not be raised (and hence 'throwTo' will not return)
259 until the call has completed.  This is the case regardless of whether
260 the call is inside a 'block' or not.
261
262 Important note: the behaviour of 'throwTo' differs from that described in
263 the paper \"Asynchronous exceptions in Haskell\"
264 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
265 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
266 a more synchronous design in which 'throwTo' does not return until the exception
267 is received by the target thread.  The trade-off is discussed in Section 8 of the paper.
268 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
269 the paper).
270
271 There is currently no guarantee that the exception delivered by 'throwTo' will be
272 delivered at the first possible opportunity.  In particular, if a thread may 
273 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
274 a pending 'throwTo'.  This is arguably undesirable behaviour.
275
276  -}
277 throwTo :: ThreadId -> Exception -> IO ()
278 throwTo (ThreadId id) ex = IO $ \ s ->
279    case (killThread# id ex s) of s1 -> (# s1, () #)
280
281 -- | Returns the 'ThreadId' of the calling thread (GHC only).
282 myThreadId :: IO ThreadId
283 myThreadId = IO $ \s ->
284    case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #)
285
286
287 -- |The 'yield' action allows (forces, in a co-operative multitasking
288 -- implementation) a context-switch to any other currently runnable
289 -- threads (if any), and is occasionally useful when implementing
290 -- concurrency abstractions.
291 yield :: IO ()
292 yield = IO $ \s -> 
293    case (yield# s) of s1 -> (# s1, () #)
294
295 {- | 'labelThread' stores a string as identifier for this thread if
296 you built a RTS with debugging support. This identifier will be used in
297 the debugging output to make distinction of different threads easier
298 (otherwise you only have the thread state object\'s address in the heap).
299
300 Other applications like the graphical Concurrent Haskell Debugger
301 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
302 'labelThread' for their purposes as well.
303 -}
304
305 labelThread :: ThreadId -> String -> IO ()
306 labelThread (ThreadId t) str = IO $ \ s ->
307    let ps  = packCString# str
308        adr = byteArrayContents# ps in
309      case (labelThread# t adr s) of s1 -> (# s1, () #)
310
311 --      Nota Bene: 'pseq' used to be 'seq'
312 --                 but 'seq' is now defined in PrelGHC
313 --
314 -- "pseq" is defined a bit weirdly (see below)
315 --
316 -- The reason for the strange "lazy" call is that
317 -- it fools the compiler into thinking that pseq  and par are non-strict in
318 -- their second argument (even if it inlines pseq at the call site).
319 -- If it thinks pseq is strict in "y", then it often evaluates
320 -- "y" before "x", which is totally wrong.  
321
322 {-# INLINE pseq  #-}
323 pseq :: a -> b -> b
324 pseq  x y = x `seq` lazy y
325
326 {-# INLINE par  #-}
327 par :: a -> b -> b
328 par  x y = case (par# x) of { _ -> lazy y }
329 \end{code}
330
331
332 %************************************************************************
333 %*                                                                      *
334 \subsection[stm]{Transactional heap operations}
335 %*                                                                      *
336 %************************************************************************
337
338 TVars are shared memory locations which support atomic memory
339 transactions.
340
341 \begin{code}
342 -- |A monad supporting atomic memory transactions.
343 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
344
345 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
346 unSTM (STM a) = a
347
348 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
349
350 instance  Functor STM where
351    fmap f x = x >>= (return . f)
352
353 instance  Monad STM  where
354     {-# INLINE return #-}
355     {-# INLINE (>>)   #-}
356     {-# INLINE (>>=)  #-}
357     m >> k      = thenSTM m k
358     return x    = returnSTM x
359     m >>= k     = bindSTM m k
360
361 bindSTM :: STM a -> (a -> STM b) -> STM b
362 bindSTM (STM m) k = STM ( \s ->
363   case m s of 
364     (# new_s, a #) -> unSTM (k a) new_s
365   )
366
367 thenSTM :: STM a -> STM b -> STM b
368 thenSTM (STM m) k = STM ( \s ->
369   case m s of 
370     (# new_s, a #) -> unSTM k new_s
371   )
372
373 returnSTM :: a -> STM a
374 returnSTM x = STM (\s -> (# s, x #))
375
376 -- | Unsafely performs IO in the STM monad.
377 unsafeIOToSTM :: IO a -> STM a
378 unsafeIOToSTM (IO m) = STM m
379
380 -- |Perform a series of STM actions atomically.
381 --
382 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
383 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
384 -- this would effectively allow a transaction inside a transaction, depending
385 -- on exactly when the thunk is evaluated.)
386 --
387 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
388 -- and which allows top-level TVars to be allocated.
389
390 atomically :: STM a -> IO a
391 atomically (STM m) = IO (\s -> (atomically# m) s )
392
393 -- |Retry execution of the current memory transaction because it has seen
394 -- values in TVars which mean that it should not continue (e.g. the TVars
395 -- represent a shared buffer that is now empty).  The implementation may
396 -- block the thread until one of the TVars that it has read from has been
397 -- udpated. (GHC only)
398 retry :: STM a
399 retry = STM $ \s# -> retry# s#
400
401 -- |Compose two alternative STM actions (GHC only).  If the first action
402 -- completes without retrying then it forms the result of the orElse.
403 -- Otherwise, if the first action retries, then the second action is
404 -- tried in its place.  If both actions retry then the orElse as a
405 -- whole retries.
406 orElse :: STM a -> STM a -> STM a
407 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
408
409 -- |Exception handling within STM actions.
410 catchSTM :: STM a -> (Exception -> STM a) -> STM a
411 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
412
413 -- | Low-level primitive on which always and alwaysSucceeds are built.
414 -- checkInv differs form these in that (i) the invariant is not 
415 -- checked when checkInv is called, only at the end of this and
416 -- subsequent transcations, (ii) the invariant failure is indicated
417 -- by raising an exception.
418 checkInv :: STM a -> STM ()
419 checkInv (STM m) = STM (\s -> (check# m) s)
420
421 -- | alwaysSucceeds adds a new invariant that must be true when passed
422 -- to alwaysSucceeds, at the end of the current transaction, and at
423 -- the end of every subsequent transaction.  If it fails at any
424 -- of those points then the transaction violating it is aborted
425 -- and the exception raised by the invariant is propagated.
426 alwaysSucceeds :: STM a -> STM ()
427 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
428                       checkInv i
429
430 -- | always is a variant of alwaysSucceeds in which the invariant is
431 -- expressed as an STM Bool action that must return True.  Returning
432 -- False or raising an exception are both treated as invariant failures.
433 always :: STM Bool -> STM ()
434 always i = alwaysSucceeds ( do v <- i
435                                if (v) then return () else ( error "Transacional invariant violation" ) )
436
437 -- |Shared memory locations that support atomic memory transactions.
438 data TVar a = TVar (TVar# RealWorld a)
439
440 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
441
442 instance Eq (TVar a) where
443         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
444
445 -- |Create a new TVar holding a value supplied
446 newTVar :: a -> STM (TVar a)
447 newTVar val = STM $ \s1# ->
448     case newTVar# val s1# of
449          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
450
451 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
452 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
453 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
454 -- possible.
455 newTVarIO :: a -> IO (TVar a)
456 newTVarIO val = IO $ \s1# ->
457     case newTVar# val s1# of
458          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
459
460 -- |Return the current value stored in a TVar
461 readTVar :: TVar a -> STM a
462 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
463
464 -- |Write the supplied value into a TVar
465 writeTVar :: TVar a -> a -> STM ()
466 writeTVar (TVar tvar#) val = STM $ \s1# ->
467     case writeTVar# tvar# val s1# of
468          s2# -> (# s2#, () #)
469   
470 \end{code}
471
472 %************************************************************************
473 %*                                                                      *
474 \subsection[mvars]{M-Structures}
475 %*                                                                      *
476 %************************************************************************
477
478 M-Vars are rendezvous points for concurrent threads.  They begin
479 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
480 is written, a single blocked thread may be freed.  Reading an M-Var
481 toggles its state from full back to empty.  Therefore, any value
482 written to an M-Var may only be read once.  Multiple reads and writes
483 are allowed, but there must be at least one read between any two
484 writes.
485
486 \begin{code}
487 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
488
489 -- |Create an 'MVar' which is initially empty.
490 newEmptyMVar  :: IO (MVar a)
491 newEmptyMVar = IO $ \ s# ->
492     case newMVar# s# of
493          (# s2#, svar# #) -> (# s2#, MVar svar# #)
494
495 -- |Create an 'MVar' which contains the supplied value.
496 newMVar :: a -> IO (MVar a)
497 newMVar value =
498     newEmptyMVar        >>= \ mvar ->
499     putMVar mvar value  >>
500     return mvar
501
502 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
503 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
504 -- the 'MVar' is left empty.
505 -- 
506 -- There are two further important properties of 'takeMVar':
507 --
508 --   * 'takeMVar' is single-wakeup.  That is, if there are multiple
509 --     threads blocked in 'takeMVar', and the 'MVar' becomes full,
510 --     only one thread will be woken up.  The runtime guarantees that
511 --     the woken thread completes its 'takeMVar' operation.
512 --
513 --   * When multiple threads are blocked on an 'MVar', they are
514 --     woken up in FIFO order.  This is useful for providing
515 --     fairness properties of abstractions built using 'MVar's.
516 --
517 takeMVar :: MVar a -> IO a
518 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
519
520 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
521 -- 'putMVar' will wait until it becomes empty.
522 --
523 -- There are two further important properties of 'putMVar':
524 --
525 --   * 'putMVar' is single-wakeup.  That is, if there are multiple
526 --     threads blocked in 'putMVar', and the 'MVar' becomes empty,
527 --     only one thread will be woken up.  The runtime guarantees that
528 --     the woken thread completes its 'putMVar' operation.
529 --
530 --   * When multiple threads are blocked on an 'MVar', they are
531 --     woken up in FIFO order.  This is useful for providing
532 --     fairness properties of abstractions built using 'MVar's.
533 --
534 putMVar  :: MVar a -> a -> IO ()
535 putMVar (MVar mvar#) x = IO $ \ s# ->
536     case putMVar# mvar# x s# of
537         s2# -> (# s2#, () #)
538
539 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
540 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
541 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
542 -- the 'MVar' is left empty.
543 tryTakeMVar :: MVar a -> IO (Maybe a)
544 tryTakeMVar (MVar m) = IO $ \ s ->
545     case tryTakeMVar# m s of
546         (# s, 0#, _ #) -> (# s, Nothing #)      -- MVar is empty
547         (# s, _,  a #) -> (# s, Just a  #)      -- MVar is full
548
549 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
550 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
551 -- it was successful, or 'False' otherwise.
552 tryPutMVar  :: MVar a -> a -> IO Bool
553 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
554     case tryPutMVar# mvar# x s# of
555         (# s, 0# #) -> (# s, False #)
556         (# s, _  #) -> (# s, True #)
557
558 -- |Check whether a given 'MVar' is empty.
559 --
560 -- Notice that the boolean value returned  is just a snapshot of
561 -- the state of the MVar. By the time you get to react on its result,
562 -- the MVar may have been filled (or emptied) - so be extremely
563 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
564 isEmptyMVar :: MVar a -> IO Bool
565 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
566     case isEmptyMVar# mv# s# of
567         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
568
569 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
570 -- "System.Mem.Weak" for more about finalizers.
571 addMVarFinalizer :: MVar a -> IO () -> IO ()
572 addMVarFinalizer (MVar m) finalizer = 
573   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
574
575 withMVar :: MVar a -> (a -> IO b) -> IO b
576 withMVar m io = 
577   block $ do
578     a <- takeMVar m
579     b <- catchException (unblock (io a))
580             (\e -> do putMVar m a; throw e)
581     putMVar m a
582     return b
583 \end{code}
584
585
586 %************************************************************************
587 %*                                                                      *
588 \subsection{Thread waiting}
589 %*                                                                      *
590 %************************************************************************
591
592 \begin{code}
593 #ifdef mingw32_HOST_OS
594
595 -- Note: threadWaitRead and threadWaitWrite aren't really functional
596 -- on Win32, but left in there because lib code (still) uses them (the manner
597 -- in which they're used doesn't cause problems on a Win32 platform though.)
598
599 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
600 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
601   IO $ \s -> case asyncRead# fd isSock len buf s of 
602                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
603
604 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
605 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
606   IO $ \s -> case asyncWrite# fd isSock len buf s of 
607                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
608
609 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
610 asyncDoProc (FunPtr proc) (Ptr param) = 
611     -- the 'length' value is ignored; simplifies implementation of
612     -- the async*# primops to have them all return the same result.
613   IO $ \s -> case asyncDoProc# proc param s  of 
614                (# s, len#, err# #) -> (# s, I# err# #)
615
616 -- to aid the use of these primops by the IO Handle implementation,
617 -- provide the following convenience funs:
618
619 -- this better be a pinned byte array!
620 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
621 asyncReadBA fd isSock len off bufB = 
622   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
623   
624 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
625 asyncWriteBA fd isSock len off bufB = 
626   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
627
628 #endif
629
630 -- -----------------------------------------------------------------------------
631 -- Thread IO API
632
633 -- | Block the current thread until data is available to read on the
634 -- given file descriptor (GHC only).
635 threadWaitRead :: Fd -> IO ()
636 threadWaitRead fd
637 #ifndef mingw32_HOST_OS
638   | threaded  = waitForReadEvent fd
639 #endif
640   | otherwise = IO $ \s -> 
641         case fromIntegral fd of { I# fd# ->
642         case waitRead# fd# s of { s -> (# s, () #)
643         }}
644
645 -- | Block the current thread until data can be written to the
646 -- given file descriptor (GHC only).
647 threadWaitWrite :: Fd -> IO ()
648 threadWaitWrite fd
649 #ifndef mingw32_HOST_OS
650   | threaded  = waitForWriteEvent fd
651 #endif
652   | otherwise = IO $ \s -> 
653         case fromIntegral fd of { I# fd# ->
654         case waitWrite# fd# s of { s -> (# s, () #)
655         }}
656
657 -- | Suspends the current thread for a given number of microseconds
658 -- (GHC only).
659 --
660 -- There is no guarantee that the thread will be rescheduled promptly
661 -- when the delay has expired, but the thread will never continue to
662 -- run /earlier/ than specified.
663 --
664 threadDelay :: Int -> IO ()
665 threadDelay time
666   | threaded  = waitForDelayEvent time
667   | otherwise = IO $ \s -> 
668         case fromIntegral time of { I# time# ->
669         case delay# time# s of { s -> (# s, () #)
670         }}
671
672
673 -- | Set the value of returned TVar to True after a given number of
674 -- microseconds. The caveats associated with threadDelay also apply.
675 --
676 registerDelay :: Int -> IO (TVar Bool)
677 registerDelay usecs 
678   | threaded = waitForDelayEventSTM usecs
679   | otherwise = error "registerDelay: requires -threaded"
680
681 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
682
683 waitForDelayEvent :: Int -> IO ()
684 waitForDelayEvent usecs = do
685   m <- newEmptyMVar
686   target <- calculateTarget usecs
687   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
688   prodServiceThread
689   takeMVar m
690
691 -- Delays for use in STM
692 waitForDelayEventSTM :: Int -> IO (TVar Bool)
693 waitForDelayEventSTM usecs = do
694    t <- atomically $ newTVar False
695    target <- calculateTarget usecs
696    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
697    prodServiceThread
698    return t  
699     
700 calculateTarget :: Int -> IO USecs
701 calculateTarget usecs = do
702     now <- getUSecOfDay
703     return $ now + (fromIntegral usecs)
704
705
706 -- ----------------------------------------------------------------------------
707 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
708
709 -- In the threaded RTS, we employ a single IO Manager thread to wait
710 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
711 -- and delays (threadDelay).  
712 --
713 -- We can do this because in the threaded RTS the IO Manager can make
714 -- a non-blocking call to select(), so we don't have to do select() in
715 -- the scheduler as we have to in the non-threaded RTS.  We get performance
716 -- benefits from doing it this way, because we only have to restart the select()
717 -- when a new request arrives, rather than doing one select() each time
718 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
719 -- by not having to check for completed IO requests.
720
721 -- Issues, possible problems:
722 --
723 --      - we might want bound threads to just do the blocking
724 --        operation rather than communicating with the IO manager
725 --        thread.  This would prevent simgle-threaded programs which do
726 --        IO from requiring multiple OS threads.  However, it would also
727 --        prevent bound threads waiting on IO from being killed or sent
728 --        exceptions.
729 --
730 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
731 --        I couldn't repeat this.
732 --
733 --      - How do we handle signal delivery in the multithreaded RTS?
734 --
735 --      - forkProcess will kill the IO manager thread.  Let's just
736 --        hope we don't need to do any blocking IO between fork & exec.
737
738 #ifndef mingw32_HOST_OS
739 data IOReq
740   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
741   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
742 #endif
743
744 data DelayReq
745   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
746   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
747
748 #ifndef mingw32_HOST_OS
749 pendingEvents :: IORef [IOReq]
750 #endif
751 pendingDelays :: IORef [DelayReq]
752         -- could use a strict list or array here
753 {-# NOINLINE pendingEvents #-}
754 {-# NOINLINE pendingDelays #-}
755 (pendingEvents,pendingDelays) = unsafePerformIO $ do
756   startIOManagerThread
757   reqs <- newIORef []
758   dels <- newIORef []
759   return (reqs, dels)
760         -- the first time we schedule an IO request, the service thread
761         -- will be created (cool, huh?)
762
763 ensureIOManagerIsRunning :: IO ()
764 ensureIOManagerIsRunning 
765   | threaded  = seq pendingEvents $ return ()
766   | otherwise = return ()
767
768 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
769 insertDelay d [] = [d]
770 insertDelay d1 ds@(d2 : rest)
771   | delayTime d1 <= delayTime d2 = d1 : ds
772   | otherwise                    = d2 : insertDelay d1 rest
773
774 delayTime :: DelayReq -> USecs
775 delayTime (Delay t _) = t
776 delayTime (DelaySTM t _) = t
777
778 type USecs = Word64
779
780 -- XXX: move into GHC.IOBase from Data.IORef?
781 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
782 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
783
784 foreign import ccall unsafe "getUSecOfDay" 
785   getUSecOfDay :: IO USecs
786
787 prodding :: IORef Bool
788 {-# NOINLINE prodding #-}
789 prodding = unsafePerformIO (newIORef False)
790
791 prodServiceThread :: IO ()
792 prodServiceThread = do
793   was_set <- atomicModifyIORef prodding (\a -> (True,a))
794   if (not (was_set)) then wakeupIOManager else return ()
795
796 #ifdef mingw32_HOST_OS
797 -- ----------------------------------------------------------------------------
798 -- Windows IO manager thread
799
800 startIOManagerThread :: IO ()
801 startIOManagerThread = do
802   wakeup <- c_getIOManagerEvent
803   forkIO $ service_loop wakeup []
804   return ()
805
806 service_loop :: HANDLE          -- read end of pipe
807              -> [DelayReq]      -- current delay requests
808              -> IO ()
809
810 service_loop wakeup old_delays = do
811   -- pick up new delay requests
812   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
813   let  delays = foldr insertDelay old_delays new_delays
814
815   now <- getUSecOfDay
816   (delays', timeout) <- getDelay now delays
817
818   r <- c_WaitForSingleObject wakeup timeout
819   case r of
820     0xffffffff -> do c_maperrno; throwErrno "service_loop"
821     0 -> do
822         r <- c_readIOManagerEvent
823         exit <- 
824               case r of
825                 _ | r == io_MANAGER_WAKEUP -> return False
826                 _ | r == io_MANAGER_DIE    -> return True
827                 0 -> return False -- spurious wakeup
828                 r -> do start_console_handler (r `shiftR` 1); return False
829         if exit
830           then return ()
831           else service_cont wakeup delays'
832
833     _other -> service_cont wakeup delays' -- probably timeout        
834
835 service_cont wakeup delays = do
836   atomicModifyIORef prodding (\_ -> (False,False))
837   service_loop wakeup delays
838
839 -- must agree with rts/win32/ThrIOManager.c
840 io_MANAGER_WAKEUP = 0xffffffff :: Word32
841 io_MANAGER_DIE    = 0xfffffffe :: Word32
842
843 data ConsoleEvent
844  = ControlC
845  | Break
846  | Close
847     -- these are sent to Services only.
848  | Logoff
849  | Shutdown
850  deriving (Eq, Ord, Enum, Show, Read, Typeable)
851
852 start_console_handler :: Word32 -> IO ()
853 start_console_handler r =
854   case toWin32ConsoleEvent r of
855      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
856                     forkIO (handler x)
857                     return ()
858      Nothing -> return ()
859
860 toWin32ConsoleEvent ev = 
861    case ev of
862        0 {- CTRL_C_EVENT-}        -> Just ControlC
863        1 {- CTRL_BREAK_EVENT-}    -> Just Break
864        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
865        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
866        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
867        _ -> Nothing
868
869 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
870 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
871
872 stick :: IORef HANDLE
873 {-# NOINLINE stick #-}
874 stick = unsafePerformIO (newIORef nullPtr)
875
876 wakeupIOManager = do 
877   hdl <- readIORef stick
878   c_sendIOManagerEvent io_MANAGER_WAKEUP
879
880 -- Walk the queue of pending delays, waking up any that have passed
881 -- and return the smallest delay to wait for.  The queue of pending
882 -- delays is kept ordered.
883 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
884 getDelay now [] = return ([], iNFINITE)
885 getDelay now all@(d : rest) 
886   = case d of
887      Delay time m | now >= time -> do
888         putMVar m ()
889         getDelay now rest
890      DelaySTM time t | now >= time -> do
891         atomically $ writeTVar t True
892         getDelay now rest
893      _otherwise ->
894         -- delay is in millisecs for WaitForSingleObject
895         let micro_seconds = delayTime d - now
896             milli_seconds = (micro_seconds + 999) `div` 1000
897         in return (all, fromIntegral milli_seconds)
898
899 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
900 -- available yet.  We should move some Win32 functionality down here,
901 -- maybe as part of the grand reorganisation of the base package...
902 type HANDLE       = Ptr ()
903 type DWORD        = Word32
904
905 iNFINITE = 0xFFFFFFFF :: DWORD -- urgh
906
907 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
908   c_getIOManagerEvent :: IO HANDLE
909
910 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
911   c_readIOManagerEvent :: IO Word32
912
913 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
914   c_sendIOManagerEvent :: Word32 -> IO ()
915
916 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
917    c_maperrno :: IO ()
918
919 foreign import stdcall "WaitForSingleObject"
920    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
921
922 #else
923 -- ----------------------------------------------------------------------------
924 -- Unix IO manager thread, using select()
925
926 startIOManagerThread :: IO ()
927 startIOManagerThread = do
928         allocaArray 2 $ \fds -> do
929         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
930         rd_end <- peekElemOff fds 0
931         wr_end <- peekElemOff fds 1
932         writeIORef stick (fromIntegral wr_end)
933         c_setIOManagerPipe wr_end
934         forkIO $ do
935             allocaBytes sizeofFdSet   $ \readfds -> do
936             allocaBytes sizeofFdSet   $ \writefds -> do 
937             allocaBytes sizeofTimeVal $ \timeval -> do
938             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
939         return ()
940
941 service_loop
942    :: Fd                -- listen to this for wakeup calls
943    -> Ptr CFdSet
944    -> Ptr CFdSet
945    -> Ptr CTimeVal
946    -> [IOReq]
947    -> [DelayReq]
948    -> IO ()
949 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
950
951   -- pick up new IO requests
952   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
953   let reqs = new_reqs ++ old_reqs
954
955   -- pick up new delay requests
956   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
957   let  delays = foldr insertDelay old_delays new_delays
958
959   -- build the FDSets for select()
960   fdZero readfds
961   fdZero writefds
962   fdSet wakeup readfds
963   maxfd <- buildFdSets 0 readfds writefds reqs
964
965   -- perform the select()
966   let do_select delays = do
967           -- check the current time and wake up any thread in
968           -- threadDelay whose timeout has expired.  Also find the
969           -- timeout value for the select() call.
970           now <- getUSecOfDay
971           (delays', timeout) <- getDelay now ptimeval delays
972
973           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
974                         nullPtr timeout
975           if (res == -1)
976              then do
977                 err <- getErrno
978                 case err of
979                   _ | err == eINTR ->  do_select delays'
980                         -- EINTR: just redo the select()
981                   _ | err == eBADF ->  return (True, delays)
982                         -- EBADF: one of the file descriptors is closed or bad,
983                         -- we don't know which one, so wake everyone up.
984                   _ | otherwise    ->  throwErrno "select"
985                         -- otherwise (ENOMEM or EINVAL) something has gone
986                         -- wrong; report the error.
987              else
988                 return (False,delays')
989
990   (wakeup_all,delays') <- do_select delays
991
992   exit <-
993     if wakeup_all then return False
994       else do
995         b <- fdIsSet wakeup readfds
996         if b == 0 
997           then return False
998           else alloca $ \p -> do 
999                  c_read (fromIntegral wakeup) p 1; return ()
1000                  s <- peek p            
1001                  case s of
1002                   _ | s == io_MANAGER_WAKEUP -> return False
1003                   _ | s == io_MANAGER_DIE    -> return True
1004                   _ -> withMVar signalHandlerLock $ \_ -> do
1005                           handler_tbl <- peek handlers
1006                           sp <- peekElemOff handler_tbl (fromIntegral s)
1007                           io <- deRefStablePtr sp
1008                           forkIO io
1009                           return False
1010
1011   if exit then return () else do
1012
1013   atomicModifyIORef prodding (\_ -> (False,False))
1014
1015   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1016                          else completeRequests reqs readfds writefds []
1017
1018   service_loop wakeup readfds writefds ptimeval reqs' delays'
1019
1020 io_MANAGER_WAKEUP = 0xff :: CChar
1021 io_MANAGER_DIE    = 0xfe :: CChar
1022
1023 stick :: IORef Fd
1024 {-# NOINLINE stick #-}
1025 stick = unsafePerformIO (newIORef 0)
1026
1027 wakeupIOManager :: IO ()
1028 wakeupIOManager = do
1029   fd <- readIORef stick
1030   with io_MANAGER_WAKEUP $ \pbuf -> do 
1031     c_write (fromIntegral fd) pbuf 1; return ()
1032
1033 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1034 -- this race condition is #1922, although that bug was on Windows a similar
1035 -- bug also exists on Unix.
1036 signalHandlerLock :: MVar ()
1037 signalHandlerLock = unsafePerformIO (newMVar ())
1038
1039 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
1040
1041 foreign import ccall "setIOManagerPipe"
1042   c_setIOManagerPipe :: CInt -> IO ()
1043
1044 -- -----------------------------------------------------------------------------
1045 -- IO requests
1046
1047 buildFdSets maxfd readfds writefds [] = return maxfd
1048 buildFdSets maxfd readfds writefds (Read fd m : reqs)
1049   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1050   | otherwise        =  do
1051         fdSet fd readfds
1052         buildFdSets (max maxfd fd) readfds writefds reqs
1053 buildFdSets maxfd readfds writefds (Write fd m : reqs)
1054   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1055   | otherwise        =  do
1056         fdSet fd writefds
1057         buildFdSets (max maxfd fd) readfds writefds reqs
1058
1059 completeRequests [] _ _ reqs' = return reqs'
1060 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1061   b <- fdIsSet fd readfds
1062   if b /= 0
1063     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1064     else completeRequests reqs readfds writefds (Read fd m : reqs')
1065 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1066   b <- fdIsSet fd writefds
1067   if b /= 0
1068     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1069     else completeRequests reqs readfds writefds (Write fd m : reqs')
1070
1071 wakeupAll [] = return ()
1072 wakeupAll (Read  fd m : reqs) = do putMVar m (); wakeupAll reqs
1073 wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs
1074
1075 waitForReadEvent :: Fd -> IO ()
1076 waitForReadEvent fd = do
1077   m <- newEmptyMVar
1078   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1079   prodServiceThread
1080   takeMVar m
1081
1082 waitForWriteEvent :: Fd -> IO ()
1083 waitForWriteEvent fd = do
1084   m <- newEmptyMVar
1085   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1086   prodServiceThread
1087   takeMVar m
1088
1089 -- -----------------------------------------------------------------------------
1090 -- Delays
1091
1092 -- Walk the queue of pending delays, waking up any that have passed
1093 -- and return the smallest delay to wait for.  The queue of pending
1094 -- delays is kept ordered.
1095 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1096 getDelay now ptimeval [] = return ([],nullPtr)
1097 getDelay now ptimeval all@(d : rest) 
1098   = case d of
1099      Delay time m | now >= time -> do
1100         putMVar m ()
1101         getDelay now ptimeval rest
1102      DelaySTM time t | now >= time -> do
1103         atomically $ writeTVar t True
1104         getDelay now ptimeval rest
1105      _otherwise -> do
1106         setTimevalTicks ptimeval (delayTime d - now)
1107         return (all,ptimeval)
1108
1109 newtype CTimeVal = CTimeVal ()
1110
1111 foreign import ccall unsafe "sizeofTimeVal"
1112   sizeofTimeVal :: Int
1113
1114 foreign import ccall unsafe "setTimevalTicks" 
1115   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1116
1117 {- 
1118   On Win32 we're going to have a single Pipe, and a
1119   waitForSingleObject with the delay time.  For signals, we send a
1120   byte down the pipe just like on Unix.
1121 -}
1122
1123 -- ----------------------------------------------------------------------------
1124 -- select() interface
1125
1126 -- ToDo: move to System.Posix.Internals?
1127
1128 newtype CFdSet = CFdSet ()
1129
1130 foreign import ccall safe "select"
1131   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1132            -> IO CInt
1133
1134 foreign import ccall unsafe "hsFD_SETSIZE"
1135   c_fD_SETSIZE :: CInt
1136
1137 fD_SETSIZE :: Fd
1138 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1139
1140 foreign import ccall unsafe "hsFD_CLR"
1141   c_fdClr :: CInt -> Ptr CFdSet -> IO ()
1142
1143 fdClr :: Fd -> Ptr CFdSet -> IO ()
1144 fdClr (Fd fd) fdset = c_fdClr fd fdset
1145
1146 foreign import ccall unsafe "hsFD_ISSET"
1147   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1148
1149 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1150 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1151
1152 foreign import ccall unsafe "hsFD_SET"
1153   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1154
1155 fdSet :: Fd -> Ptr CFdSet -> IO ()
1156 fdSet (Fd fd) fdset = c_fdSet fd fdset
1157
1158 foreign import ccall unsafe "hsFD_ZERO"
1159   fdZero :: Ptr CFdSet -> IO ()
1160
1161 foreign import ccall unsafe "sizeof_fd_set"
1162   sizeofFdSet :: Int
1163
1164 #endif
1165
1166 \end{code}