[project @ 2005-12-01 12:32:24 by simonmar]
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -fno-implicit-prelude #-}
3 -----------------------------------------------------------------------------
4 -- |
5 -- Module      :  GHC.Conc
6 -- Copyright   :  (c) The University of Glasgow, 1994-2002
7 -- License     :  see libraries/base/LICENSE
8 -- 
9 -- Maintainer  :  cvs-ghc@haskell.org
10 -- Stability   :  internal
11 -- Portability :  non-portable (GHC extensions)
12 --
13 -- Basic concurrency stuff.
14 -- 
15 -----------------------------------------------------------------------------
16
17 -- No: #hide, because bits of this module are exposed by the stm package.
18 -- However, we don't want this module to be the home location for the
19 -- bits it exports, we'd rather have Control.Concurrent and the other
20 -- higher level modules be the home.  Hence:
21
22 -- #not-home
23 module GHC.Conc
24         ( ThreadId(..)
25
26         -- Forking and suchlike
27         , forkIO        -- :: IO a -> IO ThreadId
28         , childHandler  -- :: Exception -> IO ()
29         , myThreadId    -- :: IO ThreadId
30         , killThread    -- :: ThreadId -> IO ()
31         , throwTo       -- :: ThreadId -> Exception -> IO ()
32         , par           -- :: a -> b -> b
33         , pseq          -- :: a -> b -> b
34         , yield         -- :: IO ()
35         , labelThread   -- :: ThreadId -> String -> IO ()
36
37         -- Waiting
38         , threadDelay           -- :: Int -> IO ()
39         , registerDelay         -- :: Int -> IO (TVar Bool)
40         , threadWaitRead        -- :: Int -> IO ()
41         , threadWaitWrite       -- :: Int -> IO ()
42
43         -- MVars
44         , MVar          -- abstract
45         , newMVar       -- :: a -> IO (MVar a)
46         , newEmptyMVar  -- :: IO (MVar a)
47         , takeMVar      -- :: MVar a -> IO a
48         , putMVar       -- :: MVar a -> a -> IO ()
49         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
50         , tryPutMVar    -- :: MVar a -> a -> IO Bool
51         , isEmptyMVar   -- :: MVar a -> IO Bool
52         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
53
54         -- TVars
55         , STM           -- abstract
56         , atomically    -- :: STM a -> IO a
57         , retry         -- :: STM a
58         , orElse        -- :: STM a -> STM a -> STM a
59         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
60         , TVar          -- abstract
61         , newTVar       -- :: a -> STM (TVar a)
62         , readTVar      -- :: TVar a -> STM a
63         , writeTVar     -- :: a -> TVar a -> STM ()
64         , unsafeIOToSTM -- :: IO a -> STM a
65
66 #ifdef mingw32_HOST_OS
67         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
68         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
69         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
70
71         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
72         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
73 #endif
74
75 #ifndef mingw32_HOST_OS
76         , ensureIOManagerIsRunning
77 #endif
78         ) where
79
80 import System.Posix.Types
81 import System.Posix.Internals
82 import Foreign
83 import Foreign.C
84
85 import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
86
87 import Data.Maybe
88
89 import GHC.Base
90 import GHC.IOBase
91 import GHC.Num          ( Num(..) )
92 import GHC.Real         ( fromIntegral, quot )
93 import GHC.Base         ( Int(..) )
94 import GHC.Exception    ( catchException, Exception(..), AsyncException(..) )
95 import GHC.Pack         ( packCString# )
96 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
97 import GHC.STRef
98 import Data.Typeable
99
100 infixr 0 `par`, `pseq`
101 \end{code}
102
103 %************************************************************************
104 %*                                                                      *
105 \subsection{@ThreadId@, @par@, and @fork@}
106 %*                                                                      *
107 %************************************************************************
108
109 \begin{code}
110 data ThreadId = ThreadId ThreadId# deriving( Typeable )
111 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
112 -- But since ThreadId# is unlifted, the Weak type must use open
113 -- type variables.
114 {- ^
115 A 'ThreadId' is an abstract type representing a handle to a thread.
116 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
117 the 'Ord' instance implements an arbitrary total ordering over
118 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
119 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
120 useful when debugging or diagnosing the behaviour of a concurrent
121 program.
122
123 /Note/: in GHC, if you have a 'ThreadId', you essentially have
124 a pointer to the thread itself.  This means the thread itself can\'t be
125 garbage collected until you drop the 'ThreadId'.
126 This misfeature will hopefully be corrected at a later date.
127
128 /Note/: Hugs does not provide any operations on other threads;
129 it defines 'ThreadId' as a synonym for ().
130 -}
131
132 {- |
133 This sparks off a new thread to run the 'IO' computation passed as the
134 first argument, and returns the 'ThreadId' of the newly created
135 thread.
136
137 The new thread will be a lightweight thread; if you want to use a foreign
138 library that uses thread-local storage, use 'forkOS' instead.
139 -}
140 forkIO :: IO () -> IO ThreadId
141 forkIO action = IO $ \ s -> 
142    case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
143  where
144   action_plus = catchException action childHandler
145
146 childHandler :: Exception -> IO ()
147 childHandler err = catchException (real_handler err) childHandler
148
149 real_handler :: Exception -> IO ()
150 real_handler ex =
151   case ex of
152         -- ignore thread GC and killThread exceptions:
153         BlockedOnDeadMVar            -> return ()
154         BlockedIndefinitely          -> return ()
155         AsyncException ThreadKilled  -> return ()
156
157         -- report all others:
158         AsyncException StackOverflow -> reportStackOverflow
159         other       -> reportError other
160
161 {- | 'killThread' terminates the given thread (GHC only).
162 Any work already done by the thread isn\'t
163 lost: the computation is suspended until required by another thread.
164 The memory used by the thread will be garbage collected if it isn\'t
165 referenced from anywhere.  The 'killThread' function is defined in
166 terms of 'throwTo':
167
168 > killThread tid = throwTo tid (AsyncException ThreadKilled)
169
170 -}
171 killThread :: ThreadId -> IO ()
172 killThread tid = throwTo tid (AsyncException ThreadKilled)
173
174 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
175
176 'throwTo' does not return until the exception has been raised in the
177 target thread.  The calling thread can thus be certain that the target
178 thread has received the exception.  This is a useful property to know
179 when dealing with race conditions: eg. if there are two threads that
180 can kill each other, it is guaranteed that only one of the threads
181 will get to kill the other.
182
183 If the target thread is currently making a foreign call, then the
184 exception will not be raised (and hence 'throwTo' will not return)
185 until the call has completed.  This is the case regardless of whether
186 the call is inside a 'block' or not.
187  -}
188 throwTo :: ThreadId -> Exception -> IO ()
189 throwTo (ThreadId id) ex = IO $ \ s ->
190    case (killThread# id ex s) of s1 -> (# s1, () #)
191
192 -- | Returns the 'ThreadId' of the calling thread (GHC only).
193 myThreadId :: IO ThreadId
194 myThreadId = IO $ \s ->
195    case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #)
196
197
198 -- |The 'yield' action allows (forces, in a co-operative multitasking
199 -- implementation) a context-switch to any other currently runnable
200 -- threads (if any), and is occasionally useful when implementing
201 -- concurrency abstractions.
202 yield :: IO ()
203 yield = IO $ \s -> 
204    case (yield# s) of s1 -> (# s1, () #)
205
206 {- | 'labelThread' stores a string as identifier for this thread if
207 you built a RTS with debugging support. This identifier will be used in
208 the debugging output to make distinction of different threads easier
209 (otherwise you only have the thread state object\'s address in the heap).
210
211 Other applications like the graphical Concurrent Haskell Debugger
212 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
213 'labelThread' for their purposes as well.
214 -}
215
216 labelThread :: ThreadId -> String -> IO ()
217 labelThread (ThreadId t) str = IO $ \ s ->
218    let ps  = packCString# str
219        adr = byteArrayContents# ps in
220      case (labelThread# t adr s) of s1 -> (# s1, () #)
221
222 --      Nota Bene: 'pseq' used to be 'seq'
223 --                 but 'seq' is now defined in PrelGHC
224 --
225 -- "pseq" is defined a bit weirdly (see below)
226 --
227 -- The reason for the strange "lazy" call is that
228 -- it fools the compiler into thinking that pseq  and par are non-strict in
229 -- their second argument (even if it inlines pseq at the call site).
230 -- If it thinks pseq is strict in "y", then it often evaluates
231 -- "y" before "x", which is totally wrong.  
232
233 {-# INLINE pseq  #-}
234 pseq :: a -> b -> b
235 pseq  x y = x `seq` lazy y
236
237 {-# INLINE par  #-}
238 par :: a -> b -> b
239 par  x y = case (par# x) of { _ -> lazy y }
240 \end{code}
241
242
243 %************************************************************************
244 %*                                                                      *
245 \subsection[stm]{Transactional heap operations}
246 %*                                                                      *
247 %************************************************************************
248
249 TVars are shared memory locations which support atomic memory
250 transactions.
251
252 \begin{code}
253 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) deriving( Typeable )
254
255 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
256 unSTM (STM a) = a
257
258 instance  Functor STM where
259    fmap f x = x >>= (return . f)
260
261 instance  Monad STM  where
262     {-# INLINE return #-}
263     {-# INLINE (>>)   #-}
264     {-# INLINE (>>=)  #-}
265     m >> k      = thenSTM m k
266     return x    = returnSTM x
267     m >>= k     = bindSTM m k
268
269 bindSTM :: STM a -> (a -> STM b) -> STM b
270 bindSTM (STM m) k = STM ( \s ->
271   case m s of 
272     (# new_s, a #) -> unSTM (k a) new_s
273   )
274
275 thenSTM :: STM a -> STM b -> STM b
276 thenSTM (STM m) k = STM ( \s ->
277   case m s of 
278     (# new_s, a #) -> unSTM k new_s
279   )
280
281 returnSTM :: a -> STM a
282 returnSTM x = STM (\s -> (# s, x #))
283
284 -- | Unsafely performs IO in the STM monad.
285 unsafeIOToSTM :: IO a -> STM a
286 unsafeIOToSTM (IO m) = STM m
287
288 -- |Perform a series of STM actions atomically.
289 atomically :: STM a -> IO a
290 atomically (STM m) = IO (\s -> (atomically# m) s )
291
292 -- |Retry execution of the current memory transaction because it has seen
293 -- values in TVars which mean that it should not continue (e.g. the TVars
294 -- represent a shared buffer that is now empty).  The implementation may
295 -- block the thread until one of the TVars that it has read from has been
296 -- udpated.
297 retry :: STM a
298 retry = STM $ \s# -> retry# s#
299
300 -- |Compose two alternative STM actions.  If the first action completes without
301 -- retrying then it forms the result of the orElse.  Otherwise, if the first
302 -- action retries, then the second action is tried in its place.  If both actions
303 -- retry then the orElse as a whole retries.
304 orElse :: STM a -> STM a -> STM a
305 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
306
307 -- |Exception handling within STM actions.
308 catchSTM :: STM a -> (Exception -> STM a) -> STM a
309 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
310
311 data TVar a = TVar (TVar# RealWorld a) deriving( Typeable )
312
313 instance Eq (TVar a) where
314         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
315
316 -- |Create a new TVar holding a value supplied
317 newTVar :: a -> STM (TVar a)
318 newTVar val = STM $ \s1# ->
319     case newTVar# val s1# of
320          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
321
322 -- |Return the current value stored in a TVar
323 readTVar :: TVar a -> STM a
324 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
325
326 -- |Write the supplied value into a TVar
327 writeTVar :: TVar a -> a -> STM ()
328 writeTVar (TVar tvar#) val = STM $ \s1# ->
329     case writeTVar# tvar# val s1# of
330          s2# -> (# s2#, () #)
331   
332 \end{code}
333
334 %************************************************************************
335 %*                                                                      *
336 \subsection[mvars]{M-Structures}
337 %*                                                                      *
338 %************************************************************************
339
340 M-Vars are rendezvous points for concurrent threads.  They begin
341 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
342 is written, a single blocked thread may be freed.  Reading an M-Var
343 toggles its state from full back to empty.  Therefore, any value
344 written to an M-Var may only be read once.  Multiple reads and writes
345 are allowed, but there must be at least one read between any two
346 writes.
347
348 \begin{code}
349 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
350
351 -- |Create an 'MVar' which is initially empty.
352 newEmptyMVar  :: IO (MVar a)
353 newEmptyMVar = IO $ \ s# ->
354     case newMVar# s# of
355          (# s2#, svar# #) -> (# s2#, MVar svar# #)
356
357 -- |Create an 'MVar' which contains the supplied value.
358 newMVar :: a -> IO (MVar a)
359 newMVar value =
360     newEmptyMVar        >>= \ mvar ->
361     putMVar mvar value  >>
362     return mvar
363
364 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
365 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
366 -- the 'MVar' is left empty.
367 -- 
368 -- If several threads are competing to take the same 'MVar', one is chosen
369 -- to continue at random when the 'MVar' becomes full.
370 takeMVar :: MVar a -> IO a
371 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
372
373 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
374 -- 'putMVar' will wait until it becomes empty.
375 --
376 -- If several threads are competing to fill the same 'MVar', one is
377 -- chosen to continue at random when the 'MVar' becomes empty.
378 putMVar  :: MVar a -> a -> IO ()
379 putMVar (MVar mvar#) x = IO $ \ s# ->
380     case putMVar# mvar# x s# of
381         s2# -> (# s2#, () #)
382
383 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
384 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
385 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
386 -- the 'MVar' is left empty.
387 tryTakeMVar :: MVar a -> IO (Maybe a)
388 tryTakeMVar (MVar m) = IO $ \ s ->
389     case tryTakeMVar# m s of
390         (# s, 0#, _ #) -> (# s, Nothing #)      -- MVar is empty
391         (# s, _,  a #) -> (# s, Just a  #)      -- MVar is full
392
393 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
394 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
395 -- it was successful, or 'False' otherwise.
396 tryPutMVar  :: MVar a -> a -> IO Bool
397 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
398     case tryPutMVar# mvar# x s# of
399         (# s, 0# #) -> (# s, False #)
400         (# s, _  #) -> (# s, True #)
401
402 -- |Check whether a given 'MVar' is empty.
403 --
404 -- Notice that the boolean value returned  is just a snapshot of
405 -- the state of the MVar. By the time you get to react on its result,
406 -- the MVar may have been filled (or emptied) - so be extremely
407 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
408 isEmptyMVar :: MVar a -> IO Bool
409 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
410     case isEmptyMVar# mv# s# of
411         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
412
413 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
414 -- "System.Mem.Weak" for more about finalizers.
415 addMVarFinalizer :: MVar a -> IO () -> IO ()
416 addMVarFinalizer (MVar m) finalizer = 
417   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
418 \end{code}
419
420
421 %************************************************************************
422 %*                                                                      *
423 \subsection{Thread waiting}
424 %*                                                                      *
425 %************************************************************************
426
427 \begin{code}
428 #ifdef mingw32_HOST_OS
429
430 -- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
431 -- on Win32, but left in there because lib code (still) uses them (the manner
432 -- in which they're used doesn't cause problems on a Win32 platform though.)
433
434 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
435 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
436   IO $ \s -> case asyncRead# fd isSock len buf s of 
437                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
438
439 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
440 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
441   IO $ \s -> case asyncWrite# fd isSock len buf s of 
442                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
443
444 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
445 asyncDoProc (FunPtr proc) (Ptr param) = 
446     -- the 'length' value is ignored; simplifies implementation of
447     -- the async*# primops to have them all return the same result.
448   IO $ \s -> case asyncDoProc# proc param s  of 
449                (# s, len#, err# #) -> (# s, I# err# #)
450
451 -- to aid the use of these primops by the IO Handle implementation,
452 -- provide the following convenience funs:
453
454 -- this better be a pinned byte array!
455 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
456 asyncReadBA fd isSock len off bufB = 
457   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
458   
459 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
460 asyncWriteBA fd isSock len off bufB = 
461   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
462
463 #endif
464
465 -- -----------------------------------------------------------------------------
466 -- Thread IO API
467
468 -- | Block the current thread until data is available to read on the
469 -- given file descriptor (GHC only).
470 threadWaitRead :: Fd -> IO ()
471 threadWaitRead fd
472 #ifndef mingw32_HOST_OS
473   | threaded  = waitForReadEvent fd
474 #endif
475   | otherwise = IO $ \s -> 
476         case fromIntegral fd of { I# fd# ->
477         case waitRead# fd# s of { s -> (# s, () #)
478         }}
479
480 -- | Block the current thread until data can be written to the
481 -- given file descriptor (GHC only).
482 threadWaitWrite :: Fd -> IO ()
483 threadWaitWrite fd
484 #ifndef mingw32_HOST_OS
485   | threaded  = waitForWriteEvent fd
486 #endif
487   | otherwise = IO $ \s -> 
488         case fromIntegral fd of { I# fd# ->
489         case waitWrite# fd# s of { s -> (# s, () #)
490         }}
491
492 -- | Suspends the current thread for a given number of microseconds
493 -- (GHC only).
494 --
495 -- Note that the resolution used by the Haskell runtime system's
496 -- internal timer is 1\/50 second, and 'threadDelay' will round its
497 -- argument up to the nearest multiple of this resolution.
498 --
499 -- There is no guarantee that the thread will be rescheduled promptly
500 -- when the delay has expired, but the thread will never continue to
501 -- run /earlier/ than specified.
502 --
503 threadDelay :: Int -> IO ()
504 threadDelay time
505 #ifndef mingw32_HOST_OS
506   | threaded  = waitForDelayEvent time
507 #else
508   | threaded  = c_Sleep (fromIntegral (time `quot` 1000))
509 #endif
510   | otherwise = IO $ \s -> 
511         case fromIntegral time of { I# time# ->
512         case delay# time# s of { s -> (# s, () #)
513         }}
514
515 registerDelay usecs 
516 #ifndef mingw32_HOST_OS
517   | threaded = waitForDelayEventSTM usecs
518   | otherwise = error "registerDelay: requires -threaded"
519 #else
520   = error "registerDelay: not currently supported on Windows"
521 #endif
522
523 -- On Windows, we just make a safe call to 'Sleep' to implement threadDelay.
524 #ifdef mingw32_HOST_OS
525 foreign import stdcall safe "Sleep" c_Sleep :: CInt -> IO ()
526 #endif
527
528 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
529
530 -- ----------------------------------------------------------------------------
531 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
532
533 -- In the threaded RTS, we employ a single IO Manager thread to wait
534 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
535 -- and delays (threadDelay).  
536 --
537 -- We can do this because in the threaded RTS the IO Manager can make
538 -- a non-blocking call to select(), so we don't have to do select() in
539 -- the scheduler as we have to in the non-threaded RTS.  We get performance
540 -- benefits from doing it this way, because we only have to restart the select()
541 -- when a new request arrives, rather than doing one select() each time
542 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
543 -- by not having to check for completed IO requests.
544
545 -- Issues, possible problems:
546 --
547 --      - we might want bound threads to just do the blocking
548 --        operation rather than communicating with the IO manager
549 --        thread.  This would prevent simgle-threaded programs which do
550 --        IO from requiring multiple OS threads.  However, it would also
551 --        prevent bound threads waiting on IO from being killed or sent
552 --        exceptions.
553 --
554 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
555 --        I couldn't repeat this.
556 --
557 --      - How do we handle signal delivery in the multithreaded RTS?
558 --
559 --      - forkProcess will kill the IO manager thread.  Let's just
560 --        hope we don't need to do any blocking IO between fork & exec.
561
562 #ifndef mingw32_HOST_OS
563
564 data IOReq
565   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
566   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
567
568 data DelayReq
569   = Delay    {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
570   | DelaySTM {-# UNPACK #-} !Int {-# UNPACK #-} !(TVar Bool)
571
572 pendingEvents :: IORef [IOReq]
573 pendingDelays :: IORef [DelayReq]
574         -- could use a strict list or array here
575 {-# NOINLINE pendingEvents #-}
576 {-# NOINLINE pendingDelays #-}
577 (pendingEvents,pendingDelays) = unsafePerformIO $ do
578   startIOManagerThread
579   reqs <- newIORef []
580   dels <- newIORef []
581   return (reqs, dels)
582         -- the first time we schedule an IO request, the service thread
583         -- will be created (cool, huh?)
584
585 ensureIOManagerIsRunning :: IO ()
586 ensureIOManagerIsRunning 
587   | threaded  = seq pendingEvents $ return ()
588   | otherwise = return ()
589
590 startIOManagerThread :: IO ()
591 startIOManagerThread = do
592         allocaArray 2 $ \fds -> do
593         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
594         rd_end <- peekElemOff fds 0
595         wr_end <- peekElemOff fds 1
596         writeIORef stick (fromIntegral wr_end)
597         c_setIOManagerPipe wr_end
598         forkIO $ do
599             allocaBytes sizeofFdSet   $ \readfds -> do
600             allocaBytes sizeofFdSet   $ \writefds -> do 
601             allocaBytes sizeofTimeVal $ \timeval -> do
602             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
603         return ()
604
605 service_loop
606    :: Fd                -- listen to this for wakeup calls
607    -> Ptr CFdSet
608    -> Ptr CFdSet
609    -> Ptr CTimeVal
610    -> [IOReq]
611    -> [DelayReq]
612    -> IO ()
613 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
614
615   -- pick up new IO requests
616   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
617   let reqs = new_reqs ++ old_reqs
618
619   -- pick up new delay requests
620   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
621   let  delays = foldr insertDelay old_delays new_delays
622
623   -- build the FDSets for select()
624   fdZero readfds
625   fdZero writefds
626   fdSet wakeup readfds
627   maxfd <- buildFdSets 0 readfds writefds reqs
628
629   -- perform the select()
630   let do_select delays = do
631           -- check the current time and wake up any thread in
632           -- threadDelay whose timeout has expired.  Also find the
633           -- timeout value for the select() call.
634           now <- getTicksOfDay
635           (delays', timeout) <- getDelay now ptimeval delays
636
637           res <- c_select ((max wakeup maxfd)+1) readfds writefds 
638                         nullPtr timeout
639           if (res == -1)
640              then do
641                 err <- getErrno
642                 if err == eINTR
643                         then do_select delays'
644                         else return (res,delays')
645              else
646                 return (res,delays')
647
648   (res,delays') <- do_select delays
649   -- ToDo: check result
650
651   b <- fdIsSet wakeup readfds
652   if b == 0 
653     then return ()
654     else alloca $ \p -> do 
655             c_read (fromIntegral wakeup) p 1; return ()
656             s <- peek p         
657             if (s == 0xff) 
658               then return ()
659               else do handler_tbl <- peek handlers
660                       sp <- peekElemOff handler_tbl (fromIntegral s)
661                       forkIO (do io <- deRefStablePtr sp; io)
662                       return ()
663
664   takeMVar prodding
665   putMVar prodding False
666
667   reqs' <- completeRequests reqs readfds writefds []
668   service_loop wakeup readfds writefds ptimeval reqs' delays'
669
670 stick :: IORef Fd
671 {-# NOINLINE stick #-}
672 stick = unsafePerformIO (newIORef 0)
673
674 prodding :: MVar Bool
675 {-# NOINLINE prodding #-}
676 prodding = unsafePerformIO (newMVar False)
677
678 prodServiceThread :: IO ()
679 prodServiceThread = do
680   b <- takeMVar prodding
681   if (not b) 
682     then do fd <- readIORef stick
683             with 0xff $ \pbuf -> do c_write (fromIntegral fd) pbuf 1; return ()
684     else return ()
685   putMVar prodding True
686
687 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
688
689 foreign import ccall "setIOManagerPipe"
690   c_setIOManagerPipe :: CInt -> IO ()
691
692 -- -----------------------------------------------------------------------------
693 -- IO requests
694
695 buildFdSets maxfd readfds writefds [] = return maxfd
696 buildFdSets maxfd readfds writefds (Read fd m : reqs)
697   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
698   | otherwise        =  do
699         fdSet fd readfds
700         buildFdSets (max maxfd fd) readfds writefds reqs
701 buildFdSets maxfd readfds writefds (Write fd m : reqs)
702   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
703   | otherwise        =  do
704         fdSet fd writefds
705         buildFdSets (max maxfd fd) readfds writefds reqs
706
707 completeRequests [] _ _ reqs' = return reqs'
708 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
709   b <- fdIsSet fd readfds
710   if b /= 0
711     then do putMVar m (); completeRequests reqs readfds writefds reqs'
712     else completeRequests reqs readfds writefds (Read fd m : reqs')
713 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
714   b <- fdIsSet fd writefds
715   if b /= 0
716     then do putMVar m (); completeRequests reqs readfds writefds reqs'
717     else completeRequests reqs readfds writefds (Write fd m : reqs')
718
719 waitForReadEvent :: Fd -> IO ()
720 waitForReadEvent fd = do
721   m <- newEmptyMVar
722   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
723   prodServiceThread
724   takeMVar m
725
726 waitForWriteEvent :: Fd -> IO ()
727 waitForWriteEvent fd = do
728   m <- newEmptyMVar
729   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
730   prodServiceThread
731   takeMVar m
732
733 -- XXX: move into GHC.IOBase from Data.IORef?
734 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
735 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
736
737 -- -----------------------------------------------------------------------------
738 -- Delays
739
740 waitForDelayEvent :: Int -> IO ()
741 waitForDelayEvent usecs = do
742   m <- newEmptyMVar
743   now <- getTicksOfDay
744   let target = now + usecs `quot` tick_usecs
745   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
746   prodServiceThread
747   takeMVar m
748
749 -- Delays for use in STM
750 waitForDelayEventSTM :: Int -> IO (TVar Bool)
751 waitForDelayEventSTM usecs = do
752    t <- atomically $ newTVar False
753    now <- getTicksOfDay
754    let target = now + usecs `quot` tick_usecs
755    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
756    prodServiceThread
757    return t  
758     
759 -- Walk the queue of pending delays, waking up any that have passed
760 -- and return the smallest delay to wait for.  The queue of pending
761 -- delays is kept ordered.
762 getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
763 getDelay now ptimeval [] = return ([],nullPtr)
764 getDelay now ptimeval all@(d : rest) 
765   = case d of
766      Delay time m | now >= time -> do
767         putMVar m ()
768         getDelay now ptimeval rest
769      DelaySTM time t | now >= time -> do
770         atomically $ writeTVar t True
771         getDelay now ptimeval rest
772      _otherwise -> do
773         setTimevalTicks ptimeval (delayTime d - now)
774         return (all,ptimeval)
775
776 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
777 insertDelay d [] = [d]
778 insertDelay d1 ds@(d2 : rest)
779   | delayTime d1 <= delayTime d2 = d1 : ds
780   | otherwise                    = d2 : insertDelay d1 rest
781
782 delayTime (Delay t _) = t
783 delayTime (DelaySTM t _) = t
784
785 type Ticks = Int
786 tick_freq  = 50 :: Ticks  -- accuracy of threadDelay (ticks per sec)
787 tick_usecs = 1000000 `quot` tick_freq :: Int
788
789 newtype CTimeVal = CTimeVal ()
790
791 foreign import ccall unsafe "sizeofTimeVal"
792   sizeofTimeVal :: Int
793
794 foreign import ccall unsafe "getTicksOfDay" 
795   getTicksOfDay :: IO Ticks
796
797 foreign import ccall unsafe "setTimevalTicks" 
798   setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO ()
799
800 -- ----------------------------------------------------------------------------
801 -- select() interface
802
803 -- ToDo: move to System.Posix.Internals?
804
805 newtype CFdSet = CFdSet ()
806
807 foreign import ccall safe "select"
808   c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
809            -> IO CInt
810
811 foreign import ccall unsafe "hsFD_SETSIZE"
812   fD_SETSIZE :: Fd
813
814 foreign import ccall unsafe "hsFD_CLR"
815   fdClr :: Fd -> Ptr CFdSet -> IO ()
816
817 foreign import ccall unsafe "hsFD_ISSET"
818   fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
819
820 foreign import ccall unsafe "hsFD_SET"
821   fdSet :: Fd -> Ptr CFdSet -> IO ()
822
823 foreign import ccall unsafe "hsFD_ZERO"
824   fdZero :: Ptr CFdSet -> IO ()
825
826 foreign import ccall unsafe "sizeof_fd_set"
827   sizeofFdSet :: Int
828
829 #endif
830 \end{code}