Add support for the IO manager thread on Windows
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -fno-implicit-prelude #-}
3 -----------------------------------------------------------------------------
4 -- |
5 -- Module      :  GHC.Conc
6 -- Copyright   :  (c) The University of Glasgow, 1994-2002
7 -- License     :  see libraries/base/LICENSE
8 -- 
9 -- Maintainer  :  cvs-ghc@haskell.org
10 -- Stability   :  internal
11 -- Portability :  non-portable (GHC extensions)
12 --
13 -- Basic concurrency stuff.
14 -- 
15 -----------------------------------------------------------------------------
16
17 -- No: #hide, because bits of this module are exposed by the stm package.
18 -- However, we don't want this module to be the home location for the
19 -- bits it exports, we'd rather have Control.Concurrent and the other
20 -- higher level modules be the home.  Hence:
21
22 #include "Typeable.h"
23
24 -- #not-home
25 module GHC.Conc
26         ( ThreadId(..)
27
28         -- * Forking and suchlike
29         , forkIO        -- :: IO a -> IO ThreadId
30         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
31         , childHandler  -- :: Exception -> IO ()
32         , myThreadId    -- :: IO ThreadId
33         , killThread    -- :: ThreadId -> IO ()
34         , throwTo       -- :: ThreadId -> Exception -> IO ()
35         , par           -- :: a -> b -> b
36         , pseq          -- :: a -> b -> b
37         , yield         -- :: IO ()
38         , labelThread   -- :: ThreadId -> String -> IO ()
39
40         -- * Waiting
41         , threadDelay           -- :: Int -> IO ()
42         , registerDelay         -- :: Int -> IO (TVar Bool)
43         , threadWaitRead        -- :: Int -> IO ()
44         , threadWaitWrite       -- :: Int -> IO ()
45
46         -- * MVars
47         , MVar          -- abstract
48         , newMVar       -- :: a -> IO (MVar a)
49         , newEmptyMVar  -- :: IO (MVar a)
50         , takeMVar      -- :: MVar a -> IO a
51         , putMVar       -- :: MVar a -> a -> IO ()
52         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
53         , tryPutMVar    -- :: MVar a -> a -> IO Bool
54         , isEmptyMVar   -- :: MVar a -> IO Bool
55         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
56
57         -- * TVars
58         , STM           -- abstract
59         , atomically    -- :: STM a -> IO a
60         , retry         -- :: STM a
61         , orElse        -- :: STM a -> STM a -> STM a
62         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
63         , alwaysSucceeds -- :: STM a -> STM ()
64         , always        -- :: STM Bool -> STM ()
65         , TVar          -- abstract
66         , newTVar       -- :: a -> STM (TVar a)
67         , newTVarIO     -- :: a -> STM (TVar a)
68         , readTVar      -- :: TVar a -> STM a
69         , writeTVar     -- :: a -> TVar a -> STM ()
70         , unsafeIOToSTM -- :: IO a -> STM a
71
72         -- * Miscellaneous
73 #ifdef mingw32_HOST_OS
74         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
75         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
76         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
77
78         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
79         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
80 #endif
81
82         , ensureIOManagerIsRunning
83         ) where
84
85 import System.Posix.Types
86 #ifndef mingw32_HOST_OS
87 import System.Posix.Internals
88 #endif
89 import Foreign
90 import Foreign.C
91
92 #ifndef __HADDOCK__
93 import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
94 #endif
95
96 import Data.Maybe
97
98 import GHC.Base
99 import GHC.IOBase
100 import GHC.Num          ( Num(..) )
101 import GHC.Real         ( fromIntegral, quot )
102 #ifndef mingw32_HOST_OS
103 import GHC.Base         ( Int(..) )
104 #endif
105 import GHC.Exception    ( catchException, Exception(..), AsyncException(..) )
106 import GHC.Pack         ( packCString# )
107 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
108 import GHC.STRef
109 import GHC.Show         ( Show(..), showString )
110 import Data.Typeable
111
112 infixr 0 `par`, `pseq`
113 \end{code}
114
115 %************************************************************************
116 %*                                                                      *
117 \subsection{@ThreadId@, @par@, and @fork@}
118 %*                                                                      *
119 %************************************************************************
120
121 \begin{code}
122 data ThreadId = ThreadId ThreadId# deriving( Typeable )
123 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
124 -- But since ThreadId# is unlifted, the Weak type must use open
125 -- type variables.
126 {- ^
127 A 'ThreadId' is an abstract type representing a handle to a thread.
128 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
129 the 'Ord' instance implements an arbitrary total ordering over
130 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
131 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
132 useful when debugging or diagnosing the behaviour of a concurrent
133 program.
134
135 /Note/: in GHC, if you have a 'ThreadId', you essentially have
136 a pointer to the thread itself.  This means the thread itself can\'t be
137 garbage collected until you drop the 'ThreadId'.
138 This misfeature will hopefully be corrected at a later date.
139
140 /Note/: Hugs does not provide any operations on other threads;
141 it defines 'ThreadId' as a synonym for ().
142 -}
143
144 instance Show ThreadId where
145    showsPrec d t = 
146         showString "ThreadId " . 
147         showsPrec d (getThreadId (id2TSO t))
148
149 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> Int
150
151 id2TSO :: ThreadId -> ThreadId#
152 id2TSO (ThreadId t) = t
153
154 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
155 -- Returns -1, 0, 1
156
157 cmpThread :: ThreadId -> ThreadId -> Ordering
158 cmpThread t1 t2 = 
159    case cmp_thread (id2TSO t1) (id2TSO t2) of
160       -1 -> LT
161       0  -> EQ
162       _  -> GT -- must be 1
163
164 instance Eq ThreadId where
165    t1 == t2 = 
166       case t1 `cmpThread` t2 of
167          EQ -> True
168          _  -> False
169
170 instance Ord ThreadId where
171    compare = cmpThread
172
173 {- |
174 This sparks off a new thread to run the 'IO' computation passed as the
175 first argument, and returns the 'ThreadId' of the newly created
176 thread.
177
178 The new thread will be a lightweight thread; if you want to use a foreign
179 library that uses thread-local storage, use 'forkOS' instead.
180 -}
181 forkIO :: IO () -> IO ThreadId
182 forkIO action = IO $ \ s -> 
183    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
184  where
185   action_plus = catchException action childHandler
186
187 forkOnIO :: Int -> IO () -> IO ThreadId
188 forkOnIO (I# cpu) action = IO $ \ s -> 
189    case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
190  where
191   action_plus = catchException action childHandler
192
193 childHandler :: Exception -> IO ()
194 childHandler err = catchException (real_handler err) childHandler
195
196 real_handler :: Exception -> IO ()
197 real_handler ex =
198   case ex of
199         -- ignore thread GC and killThread exceptions:
200         BlockedOnDeadMVar            -> return ()
201         BlockedIndefinitely          -> return ()
202         AsyncException ThreadKilled  -> return ()
203
204         -- report all others:
205         AsyncException StackOverflow -> reportStackOverflow
206         other       -> reportError other
207
208 {- | 'killThread' terminates the given thread (GHC only).
209 Any work already done by the thread isn\'t
210 lost: the computation is suspended until required by another thread.
211 The memory used by the thread will be garbage collected if it isn\'t
212 referenced from anywhere.  The 'killThread' function is defined in
213 terms of 'throwTo':
214
215 > killThread tid = throwTo tid (AsyncException ThreadKilled)
216
217 -}
218 killThread :: ThreadId -> IO ()
219 killThread tid = throwTo tid (AsyncException ThreadKilled)
220
221 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
222
223 'throwTo' does not return until the exception has been raised in the
224 target thread.  The calling thread can thus be certain that the target
225 thread has received the exception.  This is a useful property to know
226 when dealing with race conditions: eg. if there are two threads that
227 can kill each other, it is guaranteed that only one of the threads
228 will get to kill the other.
229
230 If the target thread is currently making a foreign call, then the
231 exception will not be raised (and hence 'throwTo' will not return)
232 until the call has completed.  This is the case regardless of whether
233 the call is inside a 'block' or not.
234  -}
235 throwTo :: ThreadId -> Exception -> IO ()
236 throwTo (ThreadId id) ex = IO $ \ s ->
237    case (killThread# id ex s) of s1 -> (# s1, () #)
238
239 -- | Returns the 'ThreadId' of the calling thread (GHC only).
240 myThreadId :: IO ThreadId
241 myThreadId = IO $ \s ->
242    case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #)
243
244
245 -- |The 'yield' action allows (forces, in a co-operative multitasking
246 -- implementation) a context-switch to any other currently runnable
247 -- threads (if any), and is occasionally useful when implementing
248 -- concurrency abstractions.
249 yield :: IO ()
250 yield = IO $ \s -> 
251    case (yield# s) of s1 -> (# s1, () #)
252
253 {- | 'labelThread' stores a string as identifier for this thread if
254 you built a RTS with debugging support. This identifier will be used in
255 the debugging output to make distinction of different threads easier
256 (otherwise you only have the thread state object\'s address in the heap).
257
258 Other applications like the graphical Concurrent Haskell Debugger
259 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
260 'labelThread' for their purposes as well.
261 -}
262
263 labelThread :: ThreadId -> String -> IO ()
264 labelThread (ThreadId t) str = IO $ \ s ->
265    let ps  = packCString# str
266        adr = byteArrayContents# ps in
267      case (labelThread# t adr s) of s1 -> (# s1, () #)
268
269 --      Nota Bene: 'pseq' used to be 'seq'
270 --                 but 'seq' is now defined in PrelGHC
271 --
272 -- "pseq" is defined a bit weirdly (see below)
273 --
274 -- The reason for the strange "lazy" call is that
275 -- it fools the compiler into thinking that pseq  and par are non-strict in
276 -- their second argument (even if it inlines pseq at the call site).
277 -- If it thinks pseq is strict in "y", then it often evaluates
278 -- "y" before "x", which is totally wrong.  
279
280 {-# INLINE pseq  #-}
281 pseq :: a -> b -> b
282 pseq  x y = x `seq` lazy y
283
284 {-# INLINE par  #-}
285 par :: a -> b -> b
286 par  x y = case (par# x) of { _ -> lazy y }
287 \end{code}
288
289
290 %************************************************************************
291 %*                                                                      *
292 \subsection[stm]{Transactional heap operations}
293 %*                                                                      *
294 %************************************************************************
295
296 TVars are shared memory locations which support atomic memory
297 transactions.
298
299 \begin{code}
300 -- |A monad supporting atomic memory transactions.
301 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
302
303 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
304 unSTM (STM a) = a
305
306 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
307
308 instance  Functor STM where
309    fmap f x = x >>= (return . f)
310
311 instance  Monad STM  where
312     {-# INLINE return #-}
313     {-# INLINE (>>)   #-}
314     {-# INLINE (>>=)  #-}
315     m >> k      = thenSTM m k
316     return x    = returnSTM x
317     m >>= k     = bindSTM m k
318
319 bindSTM :: STM a -> (a -> STM b) -> STM b
320 bindSTM (STM m) k = STM ( \s ->
321   case m s of 
322     (# new_s, a #) -> unSTM (k a) new_s
323   )
324
325 thenSTM :: STM a -> STM b -> STM b
326 thenSTM (STM m) k = STM ( \s ->
327   case m s of 
328     (# new_s, a #) -> unSTM k new_s
329   )
330
331 returnSTM :: a -> STM a
332 returnSTM x = STM (\s -> (# s, x #))
333
334 -- | Unsafely performs IO in the STM monad.
335 unsafeIOToSTM :: IO a -> STM a
336 unsafeIOToSTM (IO m) = STM m
337
338 -- |Perform a series of STM actions atomically.
339 --
340 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
341 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
342 -- this would effectively allow a transaction inside a transaction, depending
343 -- on exactly when the thunk is evaluated.)
344 --
345 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
346 -- and which allows top-level TVars to be allocated.
347
348 atomically :: STM a -> IO a
349 atomically (STM m) = IO (\s -> (atomically# m) s )
350
351 -- |Retry execution of the current memory transaction because it has seen
352 -- values in TVars which mean that it should not continue (e.g. the TVars
353 -- represent a shared buffer that is now empty).  The implementation may
354 -- block the thread until one of the TVars that it has read from has been
355 -- udpated. (GHC only)
356 retry :: STM a
357 retry = STM $ \s# -> retry# s#
358
359 -- |Compose two alternative STM actions (GHC only).  If the first action
360 -- completes without retrying then it forms the result of the orElse.
361 -- Otherwise, if the first action retries, then the second action is
362 -- tried in its place.  If both actions retry then the orElse as a
363 -- whole retries.
364 orElse :: STM a -> STM a -> STM a
365 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
366
367 -- |Exception handling within STM actions.
368 catchSTM :: STM a -> (Exception -> STM a) -> STM a
369 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
370
371 -- | Low-level primitive on which always and alwaysSucceeds are built.
372 -- checkInv differs form these in that (i) the invariant is not 
373 -- checked when checkInv is called, only at the end of this and
374 -- subsequent transcations, (ii) the invariant failure is indicated
375 -- by raising an exception.
376 checkInv :: STM a -> STM ()
377 checkInv (STM m) = STM (\s -> (check# m) s)
378
379 -- | alwaysSucceeds adds a new invariant that must be true when passed
380 -- to alwaysSucceeds, at the end of the current transaction, and at
381 -- the end of every subsequent transaction.  If it fails at any
382 -- of those points then the transaction violating it is aborted
383 -- and the exception raised by the invariant is propagated.
384 alwaysSucceeds :: STM a -> STM ()
385 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
386                       checkInv i
387
388 -- | always is a variant of alwaysSucceeds in which the invariant is
389 -- expressed as an STM Bool action that must return True.  Returning
390 -- False or raising an exception are both treated as invariant failures.
391 always :: STM Bool -> STM ()
392 always i = alwaysSucceeds ( do v <- i
393                                if (v) then return () else ( error "Transacional invariant violation" ) )
394
395 -- |Shared memory locations that support atomic memory transactions.
396 data TVar a = TVar (TVar# RealWorld a)
397
398 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
399
400 instance Eq (TVar a) where
401         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
402
403 -- |Create a new TVar holding a value supplied
404 newTVar :: a -> STM (TVar a)
405 newTVar val = STM $ \s1# ->
406     case newTVar# val s1# of
407          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
408
409 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
410 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
411 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
412 -- possible.
413 newTVarIO :: a -> IO (TVar a)
414 newTVarIO val = IO $ \s1# ->
415     case newTVar# val s1# of
416          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
417
418 -- |Return the current value stored in a TVar
419 readTVar :: TVar a -> STM a
420 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
421
422 -- |Write the supplied value into a TVar
423 writeTVar :: TVar a -> a -> STM ()
424 writeTVar (TVar tvar#) val = STM $ \s1# ->
425     case writeTVar# tvar# val s1# of
426          s2# -> (# s2#, () #)
427   
428 \end{code}
429
430 %************************************************************************
431 %*                                                                      *
432 \subsection[mvars]{M-Structures}
433 %*                                                                      *
434 %************************************************************************
435
436 M-Vars are rendezvous points for concurrent threads.  They begin
437 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
438 is written, a single blocked thread may be freed.  Reading an M-Var
439 toggles its state from full back to empty.  Therefore, any value
440 written to an M-Var may only be read once.  Multiple reads and writes
441 are allowed, but there must be at least one read between any two
442 writes.
443
444 \begin{code}
445 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
446
447 -- |Create an 'MVar' which is initially empty.
448 newEmptyMVar  :: IO (MVar a)
449 newEmptyMVar = IO $ \ s# ->
450     case newMVar# s# of
451          (# s2#, svar# #) -> (# s2#, MVar svar# #)
452
453 -- |Create an 'MVar' which contains the supplied value.
454 newMVar :: a -> IO (MVar a)
455 newMVar value =
456     newEmptyMVar        >>= \ mvar ->
457     putMVar mvar value  >>
458     return mvar
459
460 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
461 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
462 -- the 'MVar' is left empty.
463 -- 
464 -- There are two further important properties of 'takeMVar':
465 --
466 --   * 'takeMVar' is single-wakeup.  That is, if there are multiple
467 --     threads blocked in 'takeMVar', and the 'MVar' becomes full,
468 --     only one thread will be woken up.  The runtime guarantees that
469 --     the woken thread completes its 'takeMVar' operation.
470 --
471 --   * When multiple threads are blocked on an 'MVar', they are
472 --     woken up in FIFO order.  This is useful for providing
473 --     fairness properties of abstractions built using 'MVar's.
474 --
475 takeMVar :: MVar a -> IO a
476 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
477
478 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
479 -- 'putMVar' will wait until it becomes empty.
480 --
481 -- There are two further important properties of 'putMVar':
482 --
483 --   * 'putMVar' is single-wakeup.  That is, if there are multiple
484 --     threads blocked in 'putMVar', and the 'MVar' becomes empty,
485 --     only one thread will be woken up.  The runtime guarantees that
486 --     the woken thread completes its 'putMVar' operation.
487 --
488 --   * When multiple threads are blocked on an 'MVar', they are
489 --     woken up in FIFO order.  This is useful for providing
490 --     fairness properties of abstractions built using 'MVar's.
491 --
492 putMVar  :: MVar a -> a -> IO ()
493 putMVar (MVar mvar#) x = IO $ \ s# ->
494     case putMVar# mvar# x s# of
495         s2# -> (# s2#, () #)
496
497 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
498 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
499 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
500 -- the 'MVar' is left empty.
501 tryTakeMVar :: MVar a -> IO (Maybe a)
502 tryTakeMVar (MVar m) = IO $ \ s ->
503     case tryTakeMVar# m s of
504         (# s, 0#, _ #) -> (# s, Nothing #)      -- MVar is empty
505         (# s, _,  a #) -> (# s, Just a  #)      -- MVar is full
506
507 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
508 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
509 -- it was successful, or 'False' otherwise.
510 tryPutMVar  :: MVar a -> a -> IO Bool
511 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
512     case tryPutMVar# mvar# x s# of
513         (# s, 0# #) -> (# s, False #)
514         (# s, _  #) -> (# s, True #)
515
516 -- |Check whether a given 'MVar' is empty.
517 --
518 -- Notice that the boolean value returned  is just a snapshot of
519 -- the state of the MVar. By the time you get to react on its result,
520 -- the MVar may have been filled (or emptied) - so be extremely
521 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
522 isEmptyMVar :: MVar a -> IO Bool
523 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
524     case isEmptyMVar# mv# s# of
525         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
526
527 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
528 -- "System.Mem.Weak" for more about finalizers.
529 addMVarFinalizer :: MVar a -> IO () -> IO ()
530 addMVarFinalizer (MVar m) finalizer = 
531   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
532 \end{code}
533
534
535 %************************************************************************
536 %*                                                                      *
537 \subsection{Thread waiting}
538 %*                                                                      *
539 %************************************************************************
540
541 \begin{code}
542 #ifdef mingw32_HOST_OS
543
544 -- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
545 -- on Win32, but left in there because lib code (still) uses them (the manner
546 -- in which they're used doesn't cause problems on a Win32 platform though.)
547
548 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
549 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
550   IO $ \s -> case asyncRead# fd isSock len buf s of 
551                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
552
553 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
554 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
555   IO $ \s -> case asyncWrite# fd isSock len buf s of 
556                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
557
558 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
559 asyncDoProc (FunPtr proc) (Ptr param) = 
560     -- the 'length' value is ignored; simplifies implementation of
561     -- the async*# primops to have them all return the same result.
562   IO $ \s -> case asyncDoProc# proc param s  of 
563                (# s, len#, err# #) -> (# s, I# err# #)
564
565 -- to aid the use of these primops by the IO Handle implementation,
566 -- provide the following convenience funs:
567
568 -- this better be a pinned byte array!
569 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
570 asyncReadBA fd isSock len off bufB = 
571   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
572   
573 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
574 asyncWriteBA fd isSock len off bufB = 
575   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
576
577 #endif
578
579 -- -----------------------------------------------------------------------------
580 -- Thread IO API
581
582 -- | Block the current thread until data is available to read on the
583 -- given file descriptor (GHC only).
584 threadWaitRead :: Fd -> IO ()
585 threadWaitRead fd
586 #ifndef mingw32_HOST_OS
587   | threaded  = waitForReadEvent fd
588 #endif
589   | otherwise = IO $ \s -> 
590         case fromIntegral fd of { I# fd# ->
591         case waitRead# fd# s of { s -> (# s, () #)
592         }}
593
594 -- | Block the current thread until data can be written to the
595 -- given file descriptor (GHC only).
596 threadWaitWrite :: Fd -> IO ()
597 threadWaitWrite fd
598 #ifndef mingw32_HOST_OS
599   | threaded  = waitForWriteEvent fd
600 #endif
601   | otherwise = IO $ \s -> 
602         case fromIntegral fd of { I# fd# ->
603         case waitWrite# fd# s of { s -> (# s, () #)
604         }}
605
606 -- | Suspends the current thread for a given number of microseconds
607 -- (GHC only).
608 --
609 -- Note that the resolution used by the Haskell runtime system's
610 -- internal timer is 1\/50 second, and 'threadDelay' will round its
611 -- argument up to the nearest multiple of this resolution.
612 --
613 -- There is no guarantee that the thread will be rescheduled promptly
614 -- when the delay has expired, but the thread will never continue to
615 -- run /earlier/ than specified.
616 --
617 threadDelay :: Int -> IO ()
618 threadDelay time
619   | threaded  = waitForDelayEvent time
620   | otherwise = IO $ \s -> 
621         case fromIntegral time of { I# time# ->
622         case delay# time# s of { s -> (# s, () #)
623         }}
624
625 registerDelay :: Int -> IO (TVar Bool)
626 registerDelay usecs 
627   | threaded = waitForDelayEventSTM usecs
628   | otherwise = error "registerDelay: requires -threaded"
629
630 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
631
632 waitForDelayEvent :: Int -> IO ()
633 waitForDelayEvent usecs = do
634   m <- newEmptyMVar
635   now <- getTicksOfDay
636   let target = now + usecs `quot` tick_usecs
637   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
638   prodServiceThread
639   takeMVar m
640
641 -- Delays for use in STM
642 waitForDelayEventSTM :: Int -> IO (TVar Bool)
643 waitForDelayEventSTM usecs = do
644    t <- atomically $ newTVar False
645    now <- getTicksOfDay
646    let target = now + usecs `quot` tick_usecs
647    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
648    prodServiceThread
649    return t  
650     
651 calculateTarget :: Int -> IO Int
652 calculateTarget usecs = do
653     now <- getTicksOfDay
654     let -- Convert usecs to ticks, rounding up as we must wait /at least/
655         -- as long as we are told
656         usecs' = (usecs + tick_usecs - 1) `quot` tick_usecs
657         target = now + 1 -- getTicksOfDay will have rounded down, but
658                          -- again we need to wait for /at least/ as long
659                          -- as we are told, so add 1 to it
660                + usecs'
661     return target
662
663 -- ----------------------------------------------------------------------------
664 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
665
666 -- In the threaded RTS, we employ a single IO Manager thread to wait
667 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
668 -- and delays (threadDelay).  
669 --
670 -- We can do this because in the threaded RTS the IO Manager can make
671 -- a non-blocking call to select(), so we don't have to do select() in
672 -- the scheduler as we have to in the non-threaded RTS.  We get performance
673 -- benefits from doing it this way, because we only have to restart the select()
674 -- when a new request arrives, rather than doing one select() each time
675 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
676 -- by not having to check for completed IO requests.
677
678 -- Issues, possible problems:
679 --
680 --      - we might want bound threads to just do the blocking
681 --        operation rather than communicating with the IO manager
682 --        thread.  This would prevent simgle-threaded programs which do
683 --        IO from requiring multiple OS threads.  However, it would also
684 --        prevent bound threads waiting on IO from being killed or sent
685 --        exceptions.
686 --
687 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
688 --        I couldn't repeat this.
689 --
690 --      - How do we handle signal delivery in the multithreaded RTS?
691 --
692 --      - forkProcess will kill the IO manager thread.  Let's just
693 --        hope we don't need to do any blocking IO between fork & exec.
694
695 #ifndef mingw32_HOST_OS
696 data IOReq
697   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
698   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
699 #endif
700
701 data DelayReq
702   = Delay    {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
703   | DelaySTM {-# UNPACK #-} !Int {-# UNPACK #-} !(TVar Bool)
704
705 #ifndef mingw32_HOST_OS
706 pendingEvents :: IORef [IOReq]
707 #endif
708 pendingDelays :: IORef [DelayReq]
709         -- could use a strict list or array here
710 {-# NOINLINE pendingEvents #-}
711 {-# NOINLINE pendingDelays #-}
712 (pendingEvents,pendingDelays) = unsafePerformIO $ do
713   startIOManagerThread
714   reqs <- newIORef []
715   dels <- newIORef []
716   return (reqs, dels)
717         -- the first time we schedule an IO request, the service thread
718         -- will be created (cool, huh?)
719
720 ensureIOManagerIsRunning :: IO ()
721 ensureIOManagerIsRunning 
722   | threaded  = seq pendingEvents $ return ()
723   | otherwise = return ()
724
725 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
726 insertDelay d [] = [d]
727 insertDelay d1 ds@(d2 : rest)
728   | delayTime d1 <= delayTime d2 = d1 : ds
729   | otherwise                    = d2 : insertDelay d1 rest
730
731 delayTime (Delay t _) = t
732 delayTime (DelaySTM t _) = t
733
734 type Ticks = Int
735 tick_freq  = 50 :: Ticks  -- accuracy of threadDelay (ticks per sec)
736 tick_usecs = 1000000 `quot` tick_freq :: Int
737 tick_msecs = 1000 `quot` tick_freq :: Int
738
739 -- XXX: move into GHC.IOBase from Data.IORef?
740 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
741 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
742
743 foreign import ccall unsafe "getTicksOfDay" 
744   getTicksOfDay :: IO Ticks
745
746 #ifdef mingw32_HOST_OS
747 -- ----------------------------------------------------------------------------
748 -- Windows IO manager thread
749
750 startIOManagerThread :: IO ()
751 startIOManagerThread = do
752   wakeup <- c_getIOManagerEvent
753   forkIO $ service_loop wakeup []
754   return ()
755
756 service_loop :: HANDLE          -- read end of pipe
757              -> [DelayReq]      -- current delay requests
758              -> IO ()
759
760 service_loop wakeup old_delays = do
761   -- pick up new delay requests
762   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
763   let  delays = foldr insertDelay old_delays new_delays
764
765   now <- getTicksOfDay
766   (delays', timeout) <- getDelay now delays
767
768   r <- c_WaitForSingleObject wakeup timeout
769   case r of
770     0xffffffff -> do c_maperrno; throwErrno "service_loop"
771     0 -> do
772         r <- c_readIOManagerEvent
773         exit <- 
774               case r of
775                 _ | r == io_MANAGER_WAKEUP -> return False
776                 _ | r == io_MANAGER_DIE    -> return True
777                 0 -> return False -- spurious wakeup
778                 r -> do start_console_handler (r `shiftR` 1); return False
779         if exit
780           then return ()
781           else service_cont wakeup delays'
782
783     _other -> service_cont wakeup delays' -- probably timeout        
784
785 service_cont wakeup delays = do
786   takeMVar prodding
787   putMVar prodding False
788   service_loop wakeup delays
789
790 -- must agree with rts/win32/ThrIOManager.c
791 io_MANAGER_WAKEUP = 0xffffffff :: Word32
792 io_MANAGER_DIE    = 0xfffffffe :: Word32
793
794 start_console_handler :: Word32 -> IO ()
795 start_console_handler r = do                   
796   stableptr <- peek console_handler
797   forkIO $ do io <- deRefStablePtr stableptr; io (fromIntegral r)
798   return ()
799
800 foreign import ccall "&console_handler" 
801    console_handler :: Ptr (StablePtr (CInt -> IO ()))
802
803 stick :: IORef HANDLE
804 {-# NOINLINE stick #-}
805 stick = unsafePerformIO (newIORef nullPtr)
806
807 prodding :: MVar Bool
808 {-# NOINLINE prodding #-}
809 prodding = unsafePerformIO (newMVar False)
810
811 prodServiceThread :: IO ()
812 prodServiceThread = do
813   b <- takeMVar prodding
814   if (not b) 
815     then do hdl <- readIORef stick
816             c_sendIOManagerEvent io_MANAGER_WAKEUP
817     else return ()
818   putMVar prodding True
819
820 -- Walk the queue of pending delays, waking up any that have passed
821 -- and return the smallest delay to wait for.  The queue of pending
822 -- delays is kept ordered.
823 getDelay :: Ticks -> [DelayReq] -> IO ([DelayReq], DWORD)
824 getDelay now [] = return ([], iNFINITE)
825 getDelay now all@(d : rest) 
826   = case d of
827      Delay time m | now >= time -> do
828         putMVar m ()
829         getDelay now rest
830      DelaySTM time t | now >= time -> do
831         atomically $ writeTVar t True
832         getDelay now rest
833      _otherwise ->
834         return (all, (fromIntegral (delayTime d - now) * 
835                         fromIntegral tick_msecs))
836                         -- delay is in millisecs for WaitForSingleObject
837
838 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
839 -- available yet.  We should move some Win32 functionality down here,
840 -- maybe as part of the grand reorganisation of the base package...
841 type HANDLE       = Ptr ()
842 type DWORD        = Word32
843
844 iNFINITE = 0xFFFFFFFF :: DWORD -- urgh
845
846 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
847   c_getIOManagerEvent :: IO HANDLE
848
849 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
850   c_readIOManagerEvent :: IO Word32
851
852 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
853   c_sendIOManagerEvent :: Word32 -> IO ()
854
855 foreign import ccall unsafe "maperrno"             -- in runProcess.c
856    c_maperrno :: IO ()
857
858 foreign import stdcall "WaitForSingleObject"
859    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
860
861 #else
862 -- ----------------------------------------------------------------------------
863 -- Unix IO manager thread, using select()
864
865 startIOManagerThread :: IO ()
866 startIOManagerThread = do
867         allocaArray 2 $ \fds -> do
868         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
869         rd_end <- peekElemOff fds 0
870         wr_end <- peekElemOff fds 1
871         writeIORef stick (fromIntegral wr_end)
872         c_setIOManagerPipe wr_end
873         forkIO $ do
874             allocaBytes sizeofFdSet   $ \readfds -> do
875             allocaBytes sizeofFdSet   $ \writefds -> do 
876             allocaBytes sizeofTimeVal $ \timeval -> do
877             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
878         return ()
879
880 service_loop
881    :: Fd                -- listen to this for wakeup calls
882    -> Ptr CFdSet
883    -> Ptr CFdSet
884    -> Ptr CTimeVal
885    -> [IOReq]
886    -> [DelayReq]
887    -> IO ()
888 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
889
890   -- pick up new IO requests
891   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
892   let reqs = new_reqs ++ old_reqs
893
894   -- pick up new delay requests
895   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
896   let  delays = foldr insertDelay old_delays new_delays
897
898   -- build the FDSets for select()
899   fdZero readfds
900   fdZero writefds
901   fdSet wakeup readfds
902   maxfd <- buildFdSets 0 readfds writefds reqs
903
904   -- perform the select()
905   let do_select delays = do
906           -- check the current time and wake up any thread in
907           -- threadDelay whose timeout has expired.  Also find the
908           -- timeout value for the select() call.
909           now <- getTicksOfDay
910           (delays', timeout) <- getDelay now ptimeval delays
911
912           res <- c_select ((max wakeup maxfd)+1) readfds writefds 
913                         nullPtr timeout
914           if (res == -1)
915              then do
916                 err <- getErrno
917                 case err of
918                   _ | err == eINTR ->  do_select delays'
919                         -- EINTR: just redo the select()
920                   _ | err == eBADF ->  return (True, delays)
921                         -- EBADF: one of the file descriptors is closed or bad,
922                         -- we don't know which one, so wake everyone up.
923                   _ | otherwise    ->  throwErrno "select"
924                         -- otherwise (ENOMEM or EINVAL) something has gone
925                         -- wrong; report the error.
926              else
927                 return (False,delays')
928
929   (wakeup_all,delays') <- do_select delays
930
931   exit <-
932     if wakeup_all then return False
933       else do
934         b <- fdIsSet wakeup readfds
935         if b == 0 
936           then return False
937           else alloca $ \p -> do 
938                  c_read (fromIntegral wakeup) p 1; return ()
939                  s <- peek p            
940                  case s of
941                   _ | s == io_MANAGER_WAKEUP -> return False
942                   _ | s == io_MANAGER_DIE    -> return True
943                   _ -> do handler_tbl <- peek handlers
944                           sp <- peekElemOff handler_tbl (fromIntegral s)
945                           forkIO (do io <- deRefStablePtr sp; io)
946                           return False
947
948   if exit then return () else do
949
950   takeMVar prodding
951   putMVar prodding False
952
953   reqs' <- if wakeup_all then do wakeupAll reqs; return []
954                          else completeRequests reqs readfds writefds []
955
956   service_loop wakeup readfds writefds ptimeval reqs' delays'
957
958 io_MANAGER_WAKEUP = 0xff :: CChar
959 io_MANAGER_DIE    = 0xfe :: CChar
960
961 stick :: IORef Fd
962 {-# NOINLINE stick #-}
963 stick = unsafePerformIO (newIORef 0)
964
965 prodding :: MVar Bool
966 {-# NOINLINE prodding #-}
967 prodding = unsafePerformIO (newMVar False)
968
969 prodServiceThread :: IO ()
970 prodServiceThread = do
971   b <- takeMVar prodding
972   if (not b) 
973     then do fd <- readIORef stick
974             with io_MANAGER_WAKEUP $ \pbuf -> do 
975                 c_write (fromIntegral fd) pbuf 1; return ()
976     else return ()
977   putMVar prodding True
978
979 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
980
981 foreign import ccall "setIOManagerPipe"
982   c_setIOManagerPipe :: CInt -> IO ()
983
984 -- -----------------------------------------------------------------------------
985 -- IO requests
986
987 buildFdSets maxfd readfds writefds [] = return maxfd
988 buildFdSets maxfd readfds writefds (Read fd m : reqs)
989   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
990   | otherwise        =  do
991         fdSet fd readfds
992         buildFdSets (max maxfd fd) readfds writefds reqs
993 buildFdSets maxfd readfds writefds (Write fd m : reqs)
994   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
995   | otherwise        =  do
996         fdSet fd writefds
997         buildFdSets (max maxfd fd) readfds writefds reqs
998
999 completeRequests [] _ _ reqs' = return reqs'
1000 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1001   b <- fdIsSet fd readfds
1002   if b /= 0
1003     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1004     else completeRequests reqs readfds writefds (Read fd m : reqs')
1005 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1006   b <- fdIsSet fd writefds
1007   if b /= 0
1008     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1009     else completeRequests reqs readfds writefds (Write fd m : reqs')
1010
1011 wakeupAll [] = return ()
1012 wakeupAll (Read  fd m : reqs) = do putMVar m (); wakeupAll reqs
1013 wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs
1014
1015 waitForReadEvent :: Fd -> IO ()
1016 waitForReadEvent fd = do
1017   m <- newEmptyMVar
1018   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1019   prodServiceThread
1020   takeMVar m
1021
1022 waitForWriteEvent :: Fd -> IO ()
1023 waitForWriteEvent fd = do
1024   m <- newEmptyMVar
1025   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1026   prodServiceThread
1027   takeMVar m
1028
1029 -- -----------------------------------------------------------------------------
1030 -- Delays
1031
1032 -- Walk the queue of pending delays, waking up any that have passed
1033 -- and return the smallest delay to wait for.  The queue of pending
1034 -- delays is kept ordered.
1035 getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1036 getDelay now ptimeval [] = return ([],nullPtr)
1037 getDelay now ptimeval all@(d : rest) 
1038   = case d of
1039      Delay time m | now >= time -> do
1040         putMVar m ()
1041         getDelay now ptimeval rest
1042      DelaySTM time t | now >= time -> do
1043         atomically $ writeTVar t True
1044         getDelay now ptimeval rest
1045      _otherwise -> do
1046         setTimevalTicks ptimeval (delayTime d - now)
1047         return (all,ptimeval)
1048
1049 newtype CTimeVal = CTimeVal ()
1050
1051 foreign import ccall unsafe "sizeofTimeVal"
1052   sizeofTimeVal :: Int
1053
1054 foreign import ccall unsafe "setTimevalTicks" 
1055   setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO ()
1056
1057 {- 
1058   On Win32 we're going to have a single Pipe, and a
1059   waitForSingleObject with the delay time.  For signals, we send a
1060   byte down the pipe just like on Unix.
1061 -}
1062
1063 -- ----------------------------------------------------------------------------
1064 -- select() interface
1065
1066 -- ToDo: move to System.Posix.Internals?
1067
1068 newtype CFdSet = CFdSet ()
1069
1070 foreign import ccall safe "select"
1071   c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1072            -> IO CInt
1073
1074 foreign import ccall unsafe "hsFD_SETSIZE"
1075   fD_SETSIZE :: Fd
1076
1077 foreign import ccall unsafe "hsFD_CLR"
1078   fdClr :: Fd -> Ptr CFdSet -> IO ()
1079
1080 foreign import ccall unsafe "hsFD_ISSET"
1081   fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1082
1083 foreign import ccall unsafe "hsFD_SET"
1084   fdSet :: Fd -> Ptr CFdSet -> IO ()
1085
1086 foreign import ccall unsafe "hsFD_ZERO"
1087   fdZero :: Ptr CFdSet -> IO ()
1088
1089 foreign import ccall unsafe "sizeof_fd_set"
1090   sizeofFdSet :: Int
1091
1092 #endif
1093
1094 \end{code}