[project @ 2005-10-25 11:13:53 by simonmar]
[haskell-directory.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -fno-implicit-prelude #-}
3 -----------------------------------------------------------------------------
4 -- |
5 -- Module      :  GHC.Conc
6 -- Copyright   :  (c) The University of Glasgow, 1994-2002
7 -- License     :  see libraries/base/LICENSE
8 -- 
9 -- Maintainer  :  cvs-ghc@haskell.org
10 -- Stability   :  internal
11 -- Portability :  non-portable (GHC extensions)
12 --
13 -- Basic concurrency stuff.
14 -- 
15 -----------------------------------------------------------------------------
16
17 -- No: #hide, because bits of this module are exposed by the stm package.
18 -- However, we don't want this module to be the home location for the
19 -- bits it exports, we'd rather have Control.Concurrent and the other
20 -- higher level modules be the home.  Hence:
21
22 -- #not-home
23 module GHC.Conc
24         ( ThreadId(..)
25
26         -- Forking and suchlike
27         , myThreadId    -- :: IO ThreadId
28         , killThread    -- :: ThreadId -> IO ()
29         , throwTo       -- :: ThreadId -> Exception -> IO ()
30         , par           -- :: a -> b -> b
31         , pseq          -- :: a -> b -> b
32         , yield         -- :: IO ()
33         , labelThread   -- :: ThreadId -> String -> IO ()
34
35         -- Waiting
36         , threadDelay           -- :: Int -> IO ()
37         , threadWaitRead        -- :: Int -> IO ()
38         , threadWaitWrite       -- :: Int -> IO ()
39
40         -- MVars
41         , MVar          -- abstract
42         , newMVar       -- :: a -> IO (MVar a)
43         , newEmptyMVar  -- :: IO (MVar a)
44         , takeMVar      -- :: MVar a -> IO a
45         , putMVar       -- :: MVar a -> a -> IO ()
46         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
47         , tryPutMVar    -- :: MVar a -> a -> IO Bool
48         , isEmptyMVar   -- :: MVar a -> IO Bool
49         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
50
51         -- TVars
52         , STM           -- abstract
53         , atomically    -- :: STM a -> IO a
54         , retry         -- :: STM a
55         , orElse        -- :: STM a -> STM a -> STM a
56         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
57         , TVar          -- abstract
58         , newTVar       -- :: a -> STM (TVar a)
59         , readTVar      -- :: TVar a -> STM a
60         , writeTVar     -- :: a -> TVar a -> STM ()
61         , unsafeIOToSTM -- :: IO a -> STM a
62
63 #ifdef mingw32_HOST_OS
64         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
65         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
66         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
67
68         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
69         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
70 #endif
71
72 #ifndef mingw32_HOST_OS
73         , ensureIOManagerIsRunning
74 #endif
75         ) where
76
77 import System.Posix.Types
78 import System.Posix.Internals
79 import Foreign
80 import Foreign.C
81
82 import Data.Maybe
83
84 import GHC.Base
85 import GHC.IOBase
86 import GHC.Num          ( Num(..) )
87 import GHC.Real         ( fromIntegral, quot )
88 import GHC.Base         ( Int(..) )
89 import GHC.Exception    ( Exception(..), AsyncException(..) )
90 import GHC.Pack         ( packCString# )
91 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
92 import GHC.STRef
93 import Data.Typeable
94
95 infixr 0 `par`, `pseq`
96 \end{code}
97
98 %************************************************************************
99 %*                                                                      *
100 \subsection{@ThreadId@, @par@, and @fork@}
101 %*                                                                      *
102 %************************************************************************
103
104 \begin{code}
105 data ThreadId = ThreadId ThreadId# deriving( Typeable )
106 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
107 -- But since ThreadId# is unlifted, the Weak type must use open
108 -- type variables.
109 {- ^
110 A 'ThreadId' is an abstract type representing a handle to a thread.
111 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
112 the 'Ord' instance implements an arbitrary total ordering over
113 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
114 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
115 useful when debugging or diagnosing the behaviour of a concurrent
116 program.
117
118 /Note/: in GHC, if you have a 'ThreadId', you essentially have
119 a pointer to the thread itself.  This means the thread itself can\'t be
120 garbage collected until you drop the 'ThreadId'.
121 This misfeature will hopefully be corrected at a later date.
122
123 /Note/: Hugs does not provide any operations on other threads;
124 it defines 'ThreadId' as a synonym for ().
125 -}
126
127 --forkIO has now been hoisted out into the Concurrent library.
128
129 {- | 'killThread' terminates the given thread (GHC only).
130 Any work already done by the thread isn\'t
131 lost: the computation is suspended until required by another thread.
132 The memory used by the thread will be garbage collected if it isn\'t
133 referenced from anywhere.  The 'killThread' function is defined in
134 terms of 'throwTo':
135
136 > killThread tid = throwTo tid (AsyncException ThreadKilled)
137
138 -}
139 killThread :: ThreadId -> IO ()
140 killThread tid = throwTo tid (AsyncException ThreadKilled)
141
142 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
143
144 'throwTo' does not return until the exception has been raised in the
145 target thread.  The calling thread can thus be certain that the target
146 thread has received the exception.  This is a useful property to know
147 when dealing with race conditions: eg. if there are two threads that
148 can kill each other, it is guaranteed that only one of the threads
149 will get to kill the other.
150
151 If the target thread is currently making a foreign call, then the
152 exception will not be raised (and hence 'throwTo' will not return)
153 until the call has completed.  This is the case regardless of whether
154 the call is inside a 'block' or not.
155  -}
156 throwTo :: ThreadId -> Exception -> IO ()
157 throwTo (ThreadId id) ex = IO $ \ s ->
158    case (killThread# id ex s) of s1 -> (# s1, () #)
159
160 -- | Returns the 'ThreadId' of the calling thread (GHC only).
161 myThreadId :: IO ThreadId
162 myThreadId = IO $ \s ->
163    case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #)
164
165
166 -- |The 'yield' action allows (forces, in a co-operative multitasking
167 -- implementation) a context-switch to any other currently runnable
168 -- threads (if any), and is occasionally useful when implementing
169 -- concurrency abstractions.
170 yield :: IO ()
171 yield = IO $ \s -> 
172    case (yield# s) of s1 -> (# s1, () #)
173
174 {- | 'labelThread' stores a string as identifier for this thread if
175 you built a RTS with debugging support. This identifier will be used in
176 the debugging output to make distinction of different threads easier
177 (otherwise you only have the thread state object\'s address in the heap).
178
179 Other applications like the graphical Concurrent Haskell Debugger
180 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
181 'labelThread' for their purposes as well.
182 -}
183
184 labelThread :: ThreadId -> String -> IO ()
185 labelThread (ThreadId t) str = IO $ \ s ->
186    let ps  = packCString# str
187        adr = byteArrayContents# ps in
188      case (labelThread# t adr s) of s1 -> (# s1, () #)
189
190 --      Nota Bene: 'pseq' used to be 'seq'
191 --                 but 'seq' is now defined in PrelGHC
192 --
193 -- "pseq" is defined a bit weirdly (see below)
194 --
195 -- The reason for the strange "lazy" call is that
196 -- it fools the compiler into thinking that pseq  and par are non-strict in
197 -- their second argument (even if it inlines pseq at the call site).
198 -- If it thinks pseq is strict in "y", then it often evaluates
199 -- "y" before "x", which is totally wrong.  
200
201 {-# INLINE pseq  #-}
202 pseq :: a -> b -> b
203 pseq  x y = x `seq` lazy y
204
205 {-# INLINE par  #-}
206 par :: a -> b -> b
207 par  x y = case (par# x) of { _ -> lazy y }
208 \end{code}
209
210
211 %************************************************************************
212 %*                                                                      *
213 \subsection[stm]{Transactional heap operations}
214 %*                                                                      *
215 %************************************************************************
216
217 TVars are shared memory locations which support atomic memory
218 transactions.
219
220 \begin{code}
221 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) deriving( Typeable )
222
223 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
224 unSTM (STM a) = a
225
226 instance  Functor STM where
227    fmap f x = x >>= (return . f)
228
229 instance  Monad STM  where
230     {-# INLINE return #-}
231     {-# INLINE (>>)   #-}
232     {-# INLINE (>>=)  #-}
233     m >> k      = thenSTM m k
234     return x    = returnSTM x
235     m >>= k     = bindSTM m k
236
237 bindSTM :: STM a -> (a -> STM b) -> STM b
238 bindSTM (STM m) k = STM ( \s ->
239   case m s of 
240     (# new_s, a #) -> unSTM (k a) new_s
241   )
242
243 thenSTM :: STM a -> STM b -> STM b
244 thenSTM (STM m) k = STM ( \s ->
245   case m s of 
246     (# new_s, a #) -> unSTM k new_s
247   )
248
249 returnSTM :: a -> STM a
250 returnSTM x = STM (\s -> (# s, x #))
251
252 -- | Unsafely performs IO in the STM monad.
253 unsafeIOToSTM :: IO a -> STM a
254 unsafeIOToSTM (IO m) = STM m
255
256 -- |Perform a series of STM actions atomically.
257 atomically :: STM a -> IO a
258 atomically (STM m) = IO (\s -> (atomically# m) s )
259
260 -- |Retry execution of the current memory transaction because it has seen
261 -- values in TVars which mean that it should not continue (e.g. the TVars
262 -- represent a shared buffer that is now empty).  The implementation may
263 -- block the thread until one of the TVars that it has read from has been
264 -- udpated.
265 retry :: STM a
266 retry = STM $ \s# -> retry# s#
267
268 -- |Compose two alternative STM actions.  If the first action completes without
269 -- retrying then it forms the result of the orElse.  Otherwise, if the first
270 -- action retries, then the second action is tried in its place.  If both actions
271 -- retry then the orElse as a whole retries.
272 orElse :: STM a -> STM a -> STM a
273 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
274
275 -- |Exception handling within STM actions.
276 catchSTM :: STM a -> (Exception -> STM a) -> STM a
277 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
278
279 data TVar a = TVar (TVar# RealWorld a) deriving( Typeable )
280
281 instance Eq (TVar a) where
282         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
283
284 -- |Create a new TVar holding a value supplied
285 newTVar :: a -> STM (TVar a)
286 newTVar val = STM $ \s1# ->
287     case newTVar# val s1# of
288          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
289
290 -- |Return the current value stored in a TVar
291 readTVar :: TVar a -> STM a
292 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
293
294 -- |Write the supplied value into a TVar
295 writeTVar :: TVar a -> a -> STM ()
296 writeTVar (TVar tvar#) val = STM $ \s1# ->
297     case writeTVar# tvar# val s1# of
298          s2# -> (# s2#, () #)
299   
300 \end{code}
301
302 %************************************************************************
303 %*                                                                      *
304 \subsection[mvars]{M-Structures}
305 %*                                                                      *
306 %************************************************************************
307
308 M-Vars are rendezvous points for concurrent threads.  They begin
309 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
310 is written, a single blocked thread may be freed.  Reading an M-Var
311 toggles its state from full back to empty.  Therefore, any value
312 written to an M-Var may only be read once.  Multiple reads and writes
313 are allowed, but there must be at least one read between any two
314 writes.
315
316 \begin{code}
317 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
318
319 -- |Create an 'MVar' which is initially empty.
320 newEmptyMVar  :: IO (MVar a)
321 newEmptyMVar = IO $ \ s# ->
322     case newMVar# s# of
323          (# s2#, svar# #) -> (# s2#, MVar svar# #)
324
325 -- |Create an 'MVar' which contains the supplied value.
326 newMVar :: a -> IO (MVar a)
327 newMVar value =
328     newEmptyMVar        >>= \ mvar ->
329     putMVar mvar value  >>
330     return mvar
331
332 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
333 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
334 -- the 'MVar' is left empty.
335 -- 
336 -- If several threads are competing to take the same 'MVar', one is chosen
337 -- to continue at random when the 'MVar' becomes full.
338 takeMVar :: MVar a -> IO a
339 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
340
341 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
342 -- 'putMVar' will wait until it becomes empty.
343 --
344 -- If several threads are competing to fill the same 'MVar', one is
345 -- chosen to continue at random when the 'MVar' becomes empty.
346 putMVar  :: MVar a -> a -> IO ()
347 putMVar (MVar mvar#) x = IO $ \ s# ->
348     case putMVar# mvar# x s# of
349         s2# -> (# s2#, () #)
350
351 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
352 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
353 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
354 -- the 'MVar' is left empty.
355 tryTakeMVar :: MVar a -> IO (Maybe a)
356 tryTakeMVar (MVar m) = IO $ \ s ->
357     case tryTakeMVar# m s of
358         (# s, 0#, _ #) -> (# s, Nothing #)      -- MVar is empty
359         (# s, _,  a #) -> (# s, Just a  #)      -- MVar is full
360
361 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
362 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
363 -- it was successful, or 'False' otherwise.
364 tryPutMVar  :: MVar a -> a -> IO Bool
365 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
366     case tryPutMVar# mvar# x s# of
367         (# s, 0# #) -> (# s, False #)
368         (# s, _  #) -> (# s, True #)
369
370 -- |Check whether a given 'MVar' is empty.
371 --
372 -- Notice that the boolean value returned  is just a snapshot of
373 -- the state of the MVar. By the time you get to react on its result,
374 -- the MVar may have been filled (or emptied) - so be extremely
375 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
376 isEmptyMVar :: MVar a -> IO Bool
377 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
378     case isEmptyMVar# mv# s# of
379         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
380
381 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
382 -- "System.Mem.Weak" for more about finalizers.
383 addMVarFinalizer :: MVar a -> IO () -> IO ()
384 addMVarFinalizer (MVar m) finalizer = 
385   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
386 \end{code}
387
388
389 %************************************************************************
390 %*                                                                      *
391 \subsection{Thread waiting}
392 %*                                                                      *
393 %************************************************************************
394
395 \begin{code}
396 #ifdef mingw32_HOST_OS
397
398 -- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
399 -- on Win32, but left in there because lib code (still) uses them (the manner
400 -- in which they're used doesn't cause problems on a Win32 platform though.)
401
402 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
403 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
404   IO $ \s -> case asyncRead# fd isSock len buf s of 
405                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
406
407 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
408 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
409   IO $ \s -> case asyncWrite# fd isSock len buf s of 
410                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
411
412 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
413 asyncDoProc (FunPtr proc) (Ptr param) = 
414     -- the 'length' value is ignored; simplifies implementation of
415     -- the async*# primops to have them all return the same result.
416   IO $ \s -> case asyncDoProc# proc param s  of 
417                (# s, len#, err# #) -> (# s, I# err# #)
418
419 -- to aid the use of these primops by the IO Handle implementation,
420 -- provide the following convenience funs:
421
422 -- this better be a pinned byte array!
423 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
424 asyncReadBA fd isSock len off bufB = 
425   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
426   
427 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
428 asyncWriteBA fd isSock len off bufB = 
429   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
430
431 #endif
432
433 -- -----------------------------------------------------------------------------
434 -- Thread IO API
435
436 -- | Block the current thread until data is available to read on the
437 -- given file descriptor (GHC only).
438 threadWaitRead :: Fd -> IO ()
439 threadWaitRead fd
440 #ifndef mingw32_HOST_OS
441   | threaded  = waitForReadEvent fd
442 #endif
443   | otherwise = IO $ \s -> 
444         case fromIntegral fd of { I# fd# ->
445         case waitRead# fd# s of { s -> (# s, () #)
446         }}
447
448 -- | Block the current thread until data can be written to the
449 -- given file descriptor (GHC only).
450 threadWaitWrite :: Fd -> IO ()
451 threadWaitWrite fd
452 #ifndef mingw32_HOST_OS
453   | threaded  = waitForWriteEvent fd
454 #endif
455   | otherwise = IO $ \s -> 
456         case fromIntegral fd of { I# fd# ->
457         case waitWrite# fd# s of { s -> (# s, () #)
458         }}
459
460 -- | Suspends the current thread for a given number of microseconds
461 -- (GHC only).
462 --
463 -- Note that the resolution used by the Haskell runtime system's
464 -- internal timer is 1\/50 second, and 'threadDelay' will round its
465 -- argument up to the nearest multiple of this resolution.
466 --
467 -- There is no guarantee that the thread will be rescheduled promptly
468 -- when the delay has expired, but the thread will never continue to
469 -- run /earlier/ than specified.
470 --
471 threadDelay :: Int -> IO ()
472 threadDelay time
473 #ifndef mingw32_HOST_OS
474   | threaded  = waitForDelayEvent time
475 #else
476   | threaded  = c_Sleep (fromIntegral (time `quot` 1000))
477 #endif
478   | otherwise = IO $ \s -> 
479         case fromIntegral time of { I# time# ->
480         case delay# time# s of { s -> (# s, () #)
481         }}
482
483 -- On Windows, we just make a safe call to 'Sleep' to implement threadDelay.
484 #ifdef mingw32_HOST_OS
485 foreign import stdcall safe "Sleep" c_Sleep :: CInt -> IO ()
486 #endif
487
488 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
489
490 -- ----------------------------------------------------------------------------
491 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
492
493 -- In the threaded RTS, we employ a single IO Manager thread to wait
494 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
495 -- and delays (threadDelay).  
496 --
497 -- We can do this because in the threaded RTS the IO Manager can make
498 -- a non-blocking call to select(), so we don't have to do select() in
499 -- the scheduler as we have to in the non-threaded RTS.  We get performance
500 -- benefits from doing it this way, because we only have to restart the select()
501 -- when a new request arrives, rather than doing one select() each time
502 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
503 -- by not having to check for completed IO requests.
504
505 -- Issues, possible problems:
506 --
507 --      - we might want bound threads to just do the blocking
508 --        operation rather than communicating with the IO manager
509 --        thread.  This would prevent simgle-threaded programs which do
510 --        IO from requiring multiple OS threads.  However, it would also
511 --        prevent bound threads waiting on IO from being killed or sent
512 --        exceptions.
513 --
514 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
515 --        I couldn't repeat this.
516 --
517 --      - How do we handle signal delivery in the multithreaded RTS?
518 --
519 --      - forkProcess will kill the IO manager thread.  Let's just
520 --        hope we don't need to do any blocking IO between fork & exec.
521
522 #ifndef mingw32_HOST_OS
523
524 data IOReq
525   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
526   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
527
528 data DelayReq
529   = Delay  {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
530
531 pendingEvents :: IORef [IOReq]
532 pendingDelays :: IORef [DelayReq]
533         -- could use a strict list or array here
534 {-# NOINLINE pendingEvents #-}
535 {-# NOINLINE pendingDelays #-}
536 (pendingEvents,pendingDelays) = unsafePerformIO $ do
537   startIOManagerThread
538   reqs <- newIORef []
539   dels <- newIORef []
540   return (reqs, dels)
541         -- the first time we schedule an IO request, the service thread
542         -- will be created (cool, huh?)
543
544 ensureIOManagerIsRunning :: IO ()
545 ensureIOManagerIsRunning 
546   | threaded  = seq pendingEvents $ return ()
547   | otherwise = return ()
548
549 startIOManagerThread :: IO ()
550 startIOManagerThread = do
551         allocaArray 2 $ \fds -> do
552         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
553         rd_end <- peekElemOff fds 0
554         wr_end <- peekElemOff fds 1
555         writeIORef stick (fromIntegral wr_end)
556         c_setIOManagerPipe wr_end
557         quickForkIO $ do
558             allocaBytes sizeofFdSet   $ \readfds -> do
559             allocaBytes sizeofFdSet   $ \writefds -> do 
560             allocaBytes sizeofTimeVal $ \timeval -> do
561             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
562         return ()
563
564 -- XXX: move real forkIO here from Control.Concurrent?
565 quickForkIO action = IO $ \s ->
566    case (fork# action s) of (# s1, id #) -> (# s1, ThreadId id #)
567
568 service_loop
569    :: Fd                -- listen to this for wakeup calls
570    -> Ptr CFdSet
571    -> Ptr CFdSet
572    -> Ptr CTimeVal
573    -> [IOReq]
574    -> [DelayReq]
575    -> IO ()
576 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
577
578   -- pick up new IO requests
579   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
580   let reqs = new_reqs ++ old_reqs
581
582   -- pick up new delay requests
583   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
584   let  delays = foldr insertDelay old_delays new_delays
585
586   -- build the FDSets for select()
587   fdZero readfds
588   fdZero writefds
589   fdSet wakeup readfds
590   maxfd <- buildFdSets 0 readfds writefds reqs
591
592   -- perform the select()
593   let do_select delays = do
594           -- check the current time and wake up any thread in
595           -- threadDelay whose timeout has expired.  Also find the
596           -- timeout value for the select() call.
597           now <- getTicksOfDay
598           (delays', timeout) <- getDelay now ptimeval delays
599
600           res <- c_select ((max wakeup maxfd)+1) readfds writefds 
601                         nullPtr timeout
602           if (res == -1)
603              then do
604                 err <- getErrno
605                 if err == eINTR
606                         then do_select delays'
607                         else return (res,delays')
608              else
609                 return (res,delays')
610
611   (res,delays') <- do_select delays
612   -- ToDo: check result
613
614   b <- fdIsSet wakeup readfds
615   if b == 0 
616     then return ()
617     else alloca $ \p -> do 
618             c_read (fromIntegral wakeup) p 1; return ()
619             s <- peek p         
620             if (s == 0xff) 
621               then return ()
622               else do sp <- peekElemOff handlers (fromIntegral s)
623                       quickForkIO (deRefStablePtr sp)
624                       return ()
625
626   takeMVar prodding
627   putMVar prodding False
628
629   reqs' <- completeRequests reqs readfds writefds []
630   service_loop wakeup readfds writefds ptimeval reqs' delays'
631
632 stick :: IORef Fd
633 {-# NOINLINE stick #-}
634 stick = unsafePerformIO (newIORef 0)
635
636 prodding :: MVar Bool
637 {-# NOINLINE prodding #-}
638 prodding = unsafePerformIO (newMVar False)
639
640 prodServiceThread :: IO ()
641 prodServiceThread = do
642   b <- takeMVar prodding
643   if (not b) 
644     then do fd <- readIORef stick
645             with 0xff $ \pbuf -> do c_write (fromIntegral fd) pbuf 1; return ()
646     else return ()
647   putMVar prodding True
648
649 foreign import ccall "&signal_handlers" handlers :: Ptr (StablePtr (IO ()))
650
651 foreign import ccall "setIOManagerPipe"
652   c_setIOManagerPipe :: CInt -> IO ()
653
654 -- -----------------------------------------------------------------------------
655 -- IO requests
656
657 buildFdSets maxfd readfds writefds [] = return maxfd
658 buildFdSets maxfd readfds writefds (Read fd m : reqs) = do
659   fdSet fd readfds
660   buildFdSets (max maxfd fd) readfds writefds reqs
661 buildFdSets maxfd readfds writefds (Write fd m : reqs) = do
662   fdSet fd writefds
663   buildFdSets (max maxfd fd) readfds writefds reqs
664
665 completeRequests [] _ _ reqs' = return reqs'
666 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
667   b <- fdIsSet fd readfds
668   if b /= 0
669     then do putMVar m (); completeRequests reqs readfds writefds reqs'
670     else completeRequests reqs readfds writefds (Read fd m : reqs')
671 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
672   b <- fdIsSet fd writefds
673   if b /= 0
674     then do putMVar m (); completeRequests reqs readfds writefds reqs'
675     else completeRequests reqs readfds writefds (Write fd m : reqs')
676
677 waitForReadEvent :: Fd -> IO ()
678 waitForReadEvent fd = do
679   m <- newEmptyMVar
680   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
681   prodServiceThread
682   takeMVar m
683
684 waitForWriteEvent :: Fd -> IO ()
685 waitForWriteEvent fd = do
686   m <- newEmptyMVar
687   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
688   prodServiceThread
689   takeMVar m
690
691 -- XXX: move into GHC.IOBase from Data.IORef?
692 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
693 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
694
695 -- -----------------------------------------------------------------------------
696 -- Delays
697
698 waitForDelayEvent :: Int -> IO ()
699 waitForDelayEvent usecs = do
700   m <- newEmptyMVar
701   now <- getTicksOfDay
702   let target = now + usecs `quot` tick_usecs
703   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
704   prodServiceThread
705   takeMVar m
706
707 -- Walk the queue of pending delays, waking up any that have passed
708 -- and return the smallest delay to wait for.  The queue of pending
709 -- delays is kept ordered.
710 getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
711 getDelay now ptimeval [] = return ([],nullPtr)
712 getDelay now ptimeval all@(Delay time m : rest)
713   | now >= time = do
714         putMVar m ()
715         getDelay now ptimeval rest
716   | otherwise = do
717         setTimevalTicks ptimeval (time - now)
718         return (all,ptimeval)
719
720 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
721 insertDelay d@(Delay time m) [] = [d]
722 insertDelay d1@(Delay time m) ds@(d2@(Delay time' m') : rest)
723   | time <= time' = d1 : ds
724   | otherwise     = d2 : insertDelay d1 rest
725
726 type Ticks = Int
727 tick_freq  = 50 :: Ticks  -- accuracy of threadDelay (ticks per sec)
728 tick_usecs = 1000000 `quot` tick_freq :: Int
729
730 newtype CTimeVal = CTimeVal ()
731
732 foreign import ccall unsafe "sizeofTimeVal"
733   sizeofTimeVal :: Int
734
735 foreign import ccall unsafe "getTicksOfDay" 
736   getTicksOfDay :: IO Ticks
737
738 foreign import ccall unsafe "setTimevalTicks" 
739   setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO ()
740
741 -- ----------------------------------------------------------------------------
742 -- select() interface
743
744 -- ToDo: move to System.Posix.Internals?
745
746 newtype CFdSet = CFdSet ()
747
748 foreign import ccall safe "select"
749   c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
750            -> IO CInt
751
752 foreign import ccall unsafe "hsFD_CLR"
753   fdClr :: Fd -> Ptr CFdSet -> IO ()
754
755 foreign import ccall unsafe "hsFD_ISSET"
756   fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
757
758 foreign import ccall unsafe "hsFD_SET"
759   fdSet :: Fd -> Ptr CFdSet -> IO ()
760
761 foreign import ccall unsafe "hsFD_ZERO"
762   fdZero :: Ptr CFdSet -> IO ()
763
764 foreign import ccall unsafe "sizeof_fd_set"
765   sizeofFdSet :: Int
766
767 #endif
768 \end{code}