[project @ 2005-10-25 09:01:48 by simonmar]
[ghc-base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -fno-implicit-prelude #-}
3 -----------------------------------------------------------------------------
4 -- |
5 -- Module      :  GHC.Conc
6 -- Copyright   :  (c) The University of Glasgow, 1994-2002
7 -- License     :  see libraries/base/LICENSE
8 -- 
9 -- Maintainer  :  cvs-ghc@haskell.org
10 -- Stability   :  internal
11 -- Portability :  non-portable (GHC extensions)
12 --
13 -- Basic concurrency stuff.
14 -- 
15 -----------------------------------------------------------------------------
16
17 -- No: #hide, because bits of this module are exposed by the stm package.
18 -- However, we don't want this module to be the home location for the
19 -- bits it exports, we'd rather have Control.Concurrent and the other
20 -- higher level modules be the home.  Hence:
21
22 -- #not-home
23 module GHC.Conc
24         ( ThreadId(..)
25
26         -- Forking and suchlike
27         , myThreadId    -- :: IO ThreadId
28         , killThread    -- :: ThreadId -> IO ()
29         , throwTo       -- :: ThreadId -> Exception -> IO ()
30         , par           -- :: a -> b -> b
31         , pseq          -- :: a -> b -> b
32         , yield         -- :: IO ()
33         , labelThread   -- :: ThreadId -> String -> IO ()
34
35         -- Waiting
36         , threadDelay           -- :: Int -> IO ()
37         , threadWaitRead        -- :: Int -> IO ()
38         , threadWaitWrite       -- :: Int -> IO ()
39
40         -- MVars
41         , MVar          -- abstract
42         , newMVar       -- :: a -> IO (MVar a)
43         , newEmptyMVar  -- :: IO (MVar a)
44         , takeMVar      -- :: MVar a -> IO a
45         , putMVar       -- :: MVar a -> a -> IO ()
46         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
47         , tryPutMVar    -- :: MVar a -> a -> IO Bool
48         , isEmptyMVar   -- :: MVar a -> IO Bool
49         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
50
51         -- TVars
52         , STM           -- abstract
53         , atomically    -- :: STM a -> IO a
54         , retry         -- :: STM a
55         , orElse        -- :: STM a -> STM a -> STM a
56         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
57         , TVar          -- abstract
58         , newTVar       -- :: a -> STM (TVar a)
59         , readTVar      -- :: TVar a -> STM a
60         , writeTVar     -- :: a -> TVar a -> STM ()
61         , unsafeIOToSTM -- :: IO a -> STM a
62
63 #ifdef mingw32_HOST_OS
64         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
65         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
66         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
67
68         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
69         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
70 #endif
71
72 #ifndef mingw32_HOST_OS
73         , ensureIOManagerIsRunning
74 #endif
75         ) where
76
77 import System.Posix.Types
78 import System.Posix.Internals
79 import Foreign
80 import Foreign.C
81
82 import Data.Maybe
83
84 import GHC.Base
85 import GHC.IOBase
86 import GHC.Num          ( Num(..) )
87 import GHC.Real         ( fromIntegral, quot )
88 import GHC.Base         ( Int(..) )
89 import GHC.Exception    ( Exception(..), AsyncException(..) )
90 import GHC.Pack         ( packCString# )
91 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
92 import GHC.STRef
93 import Data.Typeable
94
95 infixr 0 `par`, `pseq`
96 \end{code}
97
98 %************************************************************************
99 %*                                                                      *
100 \subsection{@ThreadId@, @par@, and @fork@}
101 %*                                                                      *
102 %************************************************************************
103
104 \begin{code}
105 data ThreadId = ThreadId ThreadId# deriving( Typeable )
106 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
107 -- But since ThreadId# is unlifted, the Weak type must use open
108 -- type variables.
109 {- ^
110 A 'ThreadId' is an abstract type representing a handle to a thread.
111 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
112 the 'Ord' instance implements an arbitrary total ordering over
113 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
114 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
115 useful when debugging or diagnosing the behaviour of a concurrent
116 program.
117
118 /Note/: in GHC, if you have a 'ThreadId', you essentially have
119 a pointer to the thread itself.  This means the thread itself can\'t be
120 garbage collected until you drop the 'ThreadId'.
121 This misfeature will hopefully be corrected at a later date.
122
123 /Note/: Hugs does not provide any operations on other threads;
124 it defines 'ThreadId' as a synonym for ().
125 -}
126
127 --forkIO has now been hoisted out into the Concurrent library.
128
129 {- | 'killThread' terminates the given thread (GHC only).
130 Any work already done by the thread isn\'t
131 lost: the computation is suspended until required by another thread.
132 The memory used by the thread will be garbage collected if it isn\'t
133 referenced from anywhere.  The 'killThread' function is defined in
134 terms of 'throwTo':
135
136 > killThread tid = throwTo tid (AsyncException ThreadKilled)
137
138 -}
139 killThread :: ThreadId -> IO ()
140 killThread tid = throwTo tid (AsyncException ThreadKilled)
141
142 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
143
144 'throwTo' does not return until the exception has been raised in the
145 target thread.  The calling thread can thus be certain that the target
146 thread has received the exception.  This is a useful property to know
147 when dealing with race conditions: eg. if there are two threads that
148 can kill each other, it is guaranteed that only one of the threads
149 will get to kill the other.
150
151 If the target thread is currently making a foreign call, then the
152 exception will not be raised (and hence 'throwTo' will not return)
153 until the call has completed.  This is the case regardless of whether
154 the call is inside a 'block' or not.
155  -}
156 throwTo :: ThreadId -> Exception -> IO ()
157 throwTo (ThreadId id) ex = IO $ \ s ->
158    case (killThread# id ex s) of s1 -> (# s1, () #)
159
160 -- | Returns the 'ThreadId' of the calling thread (GHC only).
161 myThreadId :: IO ThreadId
162 myThreadId = IO $ \s ->
163    case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #)
164
165
166 -- |The 'yield' action allows (forces, in a co-operative multitasking
167 -- implementation) a context-switch to any other currently runnable
168 -- threads (if any), and is occasionally useful when implementing
169 -- concurrency abstractions.
170 yield :: IO ()
171 yield = IO $ \s -> 
172    case (yield# s) of s1 -> (# s1, () #)
173
174 {- | 'labelThread' stores a string as identifier for this thread if
175 you built a RTS with debugging support. This identifier will be used in
176 the debugging output to make distinction of different threads easier
177 (otherwise you only have the thread state object\'s address in the heap).
178
179 Other applications like the graphical Concurrent Haskell Debugger
180 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
181 'labelThread' for their purposes as well.
182 -}
183
184 labelThread :: ThreadId -> String -> IO ()
185 labelThread (ThreadId t) str = IO $ \ s ->
186    let ps  = packCString# str
187        adr = byteArrayContents# ps in
188      case (labelThread# t adr s) of s1 -> (# s1, () #)
189
190 --      Nota Bene: 'pseq' used to be 'seq'
191 --                 but 'seq' is now defined in PrelGHC
192 --
193 -- "pseq" is defined a bit weirdly (see below)
194 --
195 -- The reason for the strange "lazy" call is that
196 -- it fools the compiler into thinking that pseq  and par are non-strict in
197 -- their second argument (even if it inlines pseq at the call site).
198 -- If it thinks pseq is strict in "y", then it often evaluates
199 -- "y" before "x", which is totally wrong.  
200
201 {-# INLINE pseq  #-}
202 pseq :: a -> b -> b
203 pseq  x y = x `seq` lazy y
204
205 {-# INLINE par  #-}
206 par :: a -> b -> b
207 par  x y = case (par# x) of { _ -> lazy y }
208 \end{code}
209
210
211 %************************************************************************
212 %*                                                                      *
213 \subsection[stm]{Transactional heap operations}
214 %*                                                                      *
215 %************************************************************************
216
217 TVars are shared memory locations which support atomic memory
218 transactions.
219
220 \begin{code}
221 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) deriving( Typeable )
222
223 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
224 unSTM (STM a) = a
225
226 instance  Functor STM where
227    fmap f x = x >>= (return . f)
228
229 instance  Monad STM  where
230     {-# INLINE return #-}
231     {-# INLINE (>>)   #-}
232     {-# INLINE (>>=)  #-}
233     m >> k      = thenSTM m k
234     return x    = returnSTM x
235     m >>= k     = bindSTM m k
236
237 bindSTM :: STM a -> (a -> STM b) -> STM b
238 bindSTM (STM m) k = STM ( \s ->
239   case m s of 
240     (# new_s, a #) -> unSTM (k a) new_s
241   )
242
243 thenSTM :: STM a -> STM b -> STM b
244 thenSTM (STM m) k = STM ( \s ->
245   case m s of 
246     (# new_s, a #) -> unSTM k new_s
247   )
248
249 returnSTM :: a -> STM a
250 returnSTM x = STM (\s -> (# s, x #))
251
252 -- | Unsafely performs IO in the STM monad.
253 unsafeIOToSTM :: IO a -> STM a
254 unsafeIOToSTM (IO m) = STM m
255
256 -- |Perform a series of STM actions atomically.
257 atomically :: STM a -> IO a
258 atomically (STM m) = IO (\s -> (atomically# m) s )
259
260 -- |Retry execution of the current memory transaction because it has seen
261 -- values in TVars which mean that it should not continue (e.g. the TVars
262 -- represent a shared buffer that is now empty).  The implementation may
263 -- block the thread until one of the TVars that it has read from has been
264 -- udpated.
265 retry :: STM a
266 retry = STM $ \s# -> retry# s#
267
268 -- |Compose two alternative STM actions.  If the first action completes without
269 -- retrying then it forms the result of the orElse.  Otherwise, if the first
270 -- action retries, then the second action is tried in its place.  If both actions
271 -- retry then the orElse as a whole retries.
272 orElse :: STM a -> STM a -> STM a
273 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
274
275 -- |Exception handling within STM actions.
276 catchSTM :: STM a -> (Exception -> STM a) -> STM a
277 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
278
279 data TVar a = TVar (TVar# RealWorld a) deriving( Typeable )
280
281 instance Eq (TVar a) where
282         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
283
284 -- |Create a new TVar holding a value supplied
285 newTVar :: a -> STM (TVar a)
286 newTVar val = STM $ \s1# ->
287     case newTVar# val s1# of
288          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
289
290 -- |Return the current value stored in a TVar
291 readTVar :: TVar a -> STM a
292 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
293
294 -- |Write the supplied value into a TVar
295 writeTVar :: TVar a -> a -> STM ()
296 writeTVar (TVar tvar#) val = STM $ \s1# ->
297     case writeTVar# tvar# val s1# of
298          s2# -> (# s2#, () #)
299   
300 \end{code}
301
302 %************************************************************************
303 %*                                                                      *
304 \subsection[mvars]{M-Structures}
305 %*                                                                      *
306 %************************************************************************
307
308 M-Vars are rendezvous points for concurrent threads.  They begin
309 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
310 is written, a single blocked thread may be freed.  Reading an M-Var
311 toggles its state from full back to empty.  Therefore, any value
312 written to an M-Var may only be read once.  Multiple reads and writes
313 are allowed, but there must be at least one read between any two
314 writes.
315
316 \begin{code}
317 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
318
319 -- |Create an 'MVar' which is initially empty.
320 newEmptyMVar  :: IO (MVar a)
321 newEmptyMVar = IO $ \ s# ->
322     case newMVar# s# of
323          (# s2#, svar# #) -> (# s2#, MVar svar# #)
324
325 -- |Create an 'MVar' which contains the supplied value.
326 newMVar :: a -> IO (MVar a)
327 newMVar value =
328     newEmptyMVar        >>= \ mvar ->
329     putMVar mvar value  >>
330     return mvar
331
332 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
333 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
334 -- the 'MVar' is left empty.
335 -- 
336 -- If several threads are competing to take the same 'MVar', one is chosen
337 -- to continue at random when the 'MVar' becomes full.
338 takeMVar :: MVar a -> IO a
339 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
340
341 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
342 -- 'putMVar' will wait until it becomes empty.
343 --
344 -- If several threads are competing to fill the same 'MVar', one is
345 -- chosen to continue at random when the 'MVar' becomes empty.
346 putMVar  :: MVar a -> a -> IO ()
347 putMVar (MVar mvar#) x = IO $ \ s# ->
348     case putMVar# mvar# x s# of
349         s2# -> (# s2#, () #)
350
351 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
352 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
353 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
354 -- the 'MVar' is left empty.
355 tryTakeMVar :: MVar a -> IO (Maybe a)
356 tryTakeMVar (MVar m) = IO $ \ s ->
357     case tryTakeMVar# m s of
358         (# s, 0#, _ #) -> (# s, Nothing #)      -- MVar is empty
359         (# s, _,  a #) -> (# s, Just a  #)      -- MVar is full
360
361 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
362 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
363 -- it was successful, or 'False' otherwise.
364 tryPutMVar  :: MVar a -> a -> IO Bool
365 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
366     case tryPutMVar# mvar# x s# of
367         (# s, 0# #) -> (# s, False #)
368         (# s, _  #) -> (# s, True #)
369
370 -- |Check whether a given 'MVar' is empty.
371 --
372 -- Notice that the boolean value returned  is just a snapshot of
373 -- the state of the MVar. By the time you get to react on its result,
374 -- the MVar may have been filled (or emptied) - so be extremely
375 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
376 isEmptyMVar :: MVar a -> IO Bool
377 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
378     case isEmptyMVar# mv# s# of
379         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
380
381 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
382 -- "System.Mem.Weak" for more about finalizers.
383 addMVarFinalizer :: MVar a -> IO () -> IO ()
384 addMVarFinalizer (MVar m) finalizer = 
385   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
386 \end{code}
387
388
389 %************************************************************************
390 %*                                                                      *
391 \subsection{Thread waiting}
392 %*                                                                      *
393 %************************************************************************
394
395 \begin{code}
396 #ifdef mingw32_HOST_OS
397
398 -- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
399 -- on Win32, but left in there because lib code (still) uses them (the manner
400 -- in which they're used doesn't cause problems on a Win32 platform though.)
401
402 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
403 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
404   IO $ \s -> case asyncRead# fd isSock len buf s of 
405                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
406
407 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
408 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
409   IO $ \s -> case asyncWrite# fd isSock len buf s of 
410                (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
411
412 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
413 asyncDoProc (FunPtr proc) (Ptr param) = 
414     -- the 'length' value is ignored; simplifies implementation of
415     -- the async*# primops to have them all return the same result.
416   IO $ \s -> case asyncDoProc# proc param s  of 
417                (# s, len#, err# #) -> (# s, I# err# #)
418
419 -- to aid the use of these primops by the IO Handle implementation,
420 -- provide the following convenience funs:
421
422 -- this better be a pinned byte array!
423 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
424 asyncReadBA fd isSock len off bufB = 
425   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
426   
427 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
428 asyncWriteBA fd isSock len off bufB = 
429   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
430
431 #endif
432
433 -- -----------------------------------------------------------------------------
434 -- Thread IO API
435
436 -- | Block the current thread until data is available to read on the
437 -- given file descriptor (GHC only).
438 threadWaitRead :: Fd -> IO ()
439 threadWaitRead fd
440 #ifndef mingw32_HOST_OS
441   | threaded  = waitForReadEvent fd
442 #endif
443   | otherwise = IO $ \s -> 
444         case fromIntegral fd of { I# fd# ->
445         case waitRead# fd# s of { s -> (# s, () #)
446         }}
447
448 -- | Block the current thread until data can be written to the
449 -- given file descriptor (GHC only).
450 threadWaitWrite :: Fd -> IO ()
451 threadWaitWrite fd
452 #ifndef mingw32_HOST_OS
453   | threaded  = waitForWriteEvent fd
454 #endif
455   | otherwise = IO $ \s -> 
456         case fromIntegral fd of { I# fd# ->
457         case waitWrite# fd# s of { s -> (# s, () #)
458         }}
459
460 -- | Suspends the current thread for a given number of microseconds
461 -- (GHC only).
462 --
463 -- Note that the resolution used by the Haskell runtime system's
464 -- internal timer is 1\/50 second, and 'threadDelay' will round its
465 -- argument up to the nearest multiple of this resolution.
466 --
467 -- There is no guarantee that the thread will be rescheduled promptly
468 -- when the delay has expired, but the thread will never continue to
469 -- run /earlier/ than specified.
470 --
471 threadDelay :: Int -> IO ()
472 threadDelay time
473 #ifndef mingw32_HOST_OS
474   | threaded  = waitForDelayEvent time
475 #else
476   | threaded  = c_Sleep (fromIntegral (time `quot` 1000))
477 #endif
478   | otherwise = IO $ \s -> 
479         case fromIntegral time of { I# time# ->
480         case delay# time# s of { s -> (# s, () #)
481         }}
482
483 -- On Windows, we just make a safe call to 'Sleep' to implement threadDelay.
484 #ifdef mingw32_HOST_OS
485 foreign import stdcall safe "Sleep" c_Sleep :: CInt -> IO ()
486 #endif
487
488 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
489
490 -- ----------------------------------------------------------------------------
491 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
492
493 -- In the threaded RTS, we employ a single IO Manager thread to wait
494 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
495 -- and delays (threadDelay).  
496 --
497 -- We can do this because in the threaded RTS the IO Manager can make
498 -- a non-blocking call to select(), so we don't have to do select() in
499 -- the scheduler as we have to in the non-threaded RTS.  We get performance
500 -- benefits from doing it this way, because we only have to restart the select()
501 -- when a new request arrives, rather than doing one select() each time
502 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
503 -- by not having to check for completed IO requests.
504
505 -- Issues, possible problems:
506 --
507 --      - we might want bound threads to just do the blocking
508 --        operation rather than communicating with the IO manager
509 --        thread.  This would prevent simgle-threaded programs which do
510 --        IO from requiring multiple OS threads.  However, it would also
511 --        prevent bound threads waiting on IO from being killed or sent
512 --        exceptions.
513 --
514 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
515 --        I couldn't repeat this.
516 --
517 --      - How do we handle signal delivery in the multithreaded RTS?
518 --
519 --      - forkProcess will kill the IO manager thread.  Let's just
520 --        hope we don't need to do any blocking IO between fork & exec.
521
522 #ifndef mingw32_HOST_OS
523
524 data IOReq
525   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
526   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
527
528 data DelayReq
529   = Delay  {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
530
531 pendingEvents :: IORef [IOReq]
532 pendingDelays :: IORef [DelayReq]
533         -- could use a strict list or array here
534 {-# NOINLINE pendingEvents #-}
535 {-# NOINLINE pendingDelays #-}
536 (pendingEvents,pendingDelays) = unsafePerformIO $ do
537   startIOManagerThread
538   reqs <- newIORef []
539   dels <- newIORef []
540   return (reqs, dels)
541         -- the first time we schedule an IO request, the service thread
542         -- will be created (cool, huh?)
543
544 ensureIOManagerIsRunning :: IO ()
545 ensureIOManagerIsRunning 
546   | threaded  = seq pendingEvents $ return ()
547   | otherwise = return ()
548
549 startIOManagerThread :: IO ()
550 startIOManagerThread = do
551         allocaArray 2 $ \fds -> do
552         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
553         rd_end <- peekElemOff fds 0
554         wr_end <- peekElemOff fds 1
555         writeIORef stick (fromIntegral wr_end)
556         c_setIOManagerPipe wr_end
557         quickForkIO $ do
558             allocaBytes sizeofFdSet   $ \readfds -> do
559             allocaBytes sizeofFdSet   $ \writefds -> do 
560             allocaBytes sizeofTimeVal $ \timeval -> do
561             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
562         return ()
563
564 -- XXX: move real forkIO here from Control.Concurrent?
565 quickForkIO action = IO $ \s ->
566    case (fork# action s) of (# s1, id #) -> (# s1, ThreadId id #)
567
568 service_loop
569    :: Fd                -- listen to this for wakeup calls
570    -> Ptr CFdSet
571    -> Ptr CFdSet
572    -> Ptr CTimeVal
573    -> [IOReq]
574    -> [DelayReq]
575    -> IO ()
576 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
577
578   -- pick up new IO requests
579   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
580   let reqs = new_reqs ++ old_reqs
581
582   -- pick up new delay requests
583   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
584   let  delays = foldr insertDelay old_delays new_delays
585
586   -- build the FDSets for select()
587   fdZero readfds
588   fdZero writefds
589   fdSet wakeup readfds
590   maxfd <- buildFdSets 0 readfds writefds reqs
591
592   -- perform the select()
593   let do_select delays = do
594           -- check the current time and wake up any thread in
595           -- threadDelay whose timeout has expired.  Also find the
596           -- timeout value for the select() call.
597           now <- getTicksOfDay
598           (delays', timeout) <- getDelay now ptimeval delays
599
600           res <- c_select ((max wakeup maxfd)+1) readfds writefds 
601                         nullPtr timeout
602           if (res == -1)
603              then do
604                 err <- getErrno
605                 if err == eINTR
606                         then do_select delays'
607                         else return (res,delays')
608              else
609                 return (res,delays')
610
611   (res,delays') <- do_select delays
612   -- ToDo: check result
613
614   b <- fdIsSet wakeup readfds
615   if b == 0 
616     then return ()
617     else alloca $ \p -> do 
618             c_read (fromIntegral wakeup) p 1; return ()
619             s <- peek p         
620             if (s == 0xff) 
621               then return ()
622               else c_startSignalHandler (fromIntegral s)
623
624   takeMVar prodding
625   putMVar prodding False
626
627   reqs' <- completeRequests reqs readfds writefds []
628   service_loop wakeup readfds writefds ptimeval reqs' delays'
629
630 stick :: IORef Fd
631 {-# NOINLINE stick #-}
632 stick = unsafePerformIO (newIORef 0)
633
634 prodding :: MVar Bool
635 {-# NOINLINE prodding #-}
636 prodding = unsafePerformIO (newMVar False)
637
638 prodServiceThread :: IO ()
639 prodServiceThread = do
640   b <- takeMVar prodding
641   if (not b) 
642     then do fd <- readIORef stick
643             with 0xff $ \pbuf -> do c_write (fromIntegral fd) pbuf 1; return ()
644     else return ()
645   putMVar prodding True
646
647 foreign import ccall unsafe "startSignalHandler"
648   c_startSignalHandler :: CInt -> IO ()
649
650 foreign import ccall "setIOManagerPipe"
651   c_setIOManagerPipe :: CInt -> IO ()
652
653 -- -----------------------------------------------------------------------------
654 -- IO requests
655
656 buildFdSets maxfd readfds writefds [] = return maxfd
657 buildFdSets maxfd readfds writefds (Read fd m : reqs) = do
658   fdSet fd readfds
659   buildFdSets (max maxfd fd) readfds writefds reqs
660 buildFdSets maxfd readfds writefds (Write fd m : reqs) = do
661   fdSet fd writefds
662   buildFdSets (max maxfd fd) readfds writefds reqs
663
664 completeRequests [] _ _ reqs' = return reqs'
665 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
666   b <- fdIsSet fd readfds
667   if b /= 0
668     then do putMVar m (); completeRequests reqs readfds writefds reqs'
669     else completeRequests reqs readfds writefds (Read fd m : reqs')
670 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
671   b <- fdIsSet fd writefds
672   if b /= 0
673     then do putMVar m (); completeRequests reqs readfds writefds reqs'
674     else completeRequests reqs readfds writefds (Write fd m : reqs')
675
676 waitForReadEvent :: Fd -> IO ()
677 waitForReadEvent fd = do
678   m <- newEmptyMVar
679   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
680   prodServiceThread
681   takeMVar m
682
683 waitForWriteEvent :: Fd -> IO ()
684 waitForWriteEvent fd = do
685   m <- newEmptyMVar
686   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
687   prodServiceThread
688   takeMVar m
689
690 -- XXX: move into GHC.IOBase from Data.IORef?
691 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
692 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
693
694 -- -----------------------------------------------------------------------------
695 -- Delays
696
697 waitForDelayEvent :: Int -> IO ()
698 waitForDelayEvent usecs = do
699   m <- newEmptyMVar
700   now <- getTicksOfDay
701   let target = now + usecs `quot` tick_usecs
702   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
703   prodServiceThread
704   takeMVar m
705
706 -- Walk the queue of pending delays, waking up any that have passed
707 -- and return the smallest delay to wait for.  The queue of pending
708 -- delays is kept ordered.
709 getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
710 getDelay now ptimeval [] = return ([],nullPtr)
711 getDelay now ptimeval all@(Delay time m : rest)
712   | now >= time = do
713         putMVar m ()
714         getDelay now ptimeval rest
715   | otherwise = do
716         setTimevalTicks ptimeval (time - now)
717         return (all,ptimeval)
718
719 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
720 insertDelay d@(Delay time m) [] = [d]
721 insertDelay d1@(Delay time m) ds@(d2@(Delay time' m') : rest)
722   | time <= time' = d1 : ds
723   | otherwise     = d2 : insertDelay d1 rest
724
725 type Ticks = Int
726 tick_freq  = 50 :: Ticks  -- accuracy of threadDelay (ticks per sec)
727 tick_usecs = 1000000 `quot` tick_freq :: Int
728
729 newtype CTimeVal = CTimeVal ()
730
731 foreign import ccall unsafe "sizeofTimeVal"
732   sizeofTimeVal :: Int
733
734 foreign import ccall unsafe "getTicksOfDay" 
735   getTicksOfDay :: IO Ticks
736
737 foreign import ccall unsafe "setTimevalTicks" 
738   setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO ()
739
740 -- ----------------------------------------------------------------------------
741 -- select() interface
742
743 -- ToDo: move to System.Posix.Internals?
744
745 newtype CFdSet = CFdSet ()
746
747 foreign import ccall safe "select"
748   c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
749            -> IO CInt
750
751 foreign import ccall unsafe "hsFD_CLR"
752   fdClr :: Fd -> Ptr CFdSet -> IO ()
753
754 foreign import ccall unsafe "hsFD_ISSET"
755   fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
756
757 foreign import ccall unsafe "hsFD_SET"
758   fdSet :: Fd -> Ptr CFdSet -> IO ()
759
760 foreign import ccall unsafe "hsFD_ZERO"
761   fdZero :: Ptr CFdSet -> IO ()
762
763 foreign import ccall unsafe "sizeof_fd_set"
764   sizeofFdSet :: Int
765
766 #endif
767 \end{code}