Merge branch 'monad-comp'
[ghc-base.git] / GHC / Event / Manager.hs
1 {-# LANGUAGE BangPatterns
2            , CPP
3            , ExistentialQuantification
4            , NoImplicitPrelude
5            , RecordWildCards
6            , TypeSynonymInstances
7            , FlexibleInstances
8   #-}
9
10 module GHC.Event.Manager
11     ( -- * Types
12       EventManager
13
14       -- * Creation
15     , new
16     , newWith
17     , newDefaultBackend
18
19       -- * Running
20     , finished
21     , loop
22     , step
23     , shutdown
24     , cleanup
25     , wakeManager
26
27       -- * Registering interest in I/O events
28     , Event
29     , evtRead
30     , evtWrite
31     , IOCallback
32     , FdKey(keyFd)
33     , registerFd_
34     , registerFd
35     , unregisterFd_
36     , unregisterFd
37     , closeFd
38
39       -- * Registering interest in timeout events
40     , TimeoutCallback
41     , TimeoutKey
42     , registerTimeout
43     , updateTimeout
44     , unregisterTimeout
45     ) where
46
47 #include "EventConfig.h"
48
49 ------------------------------------------------------------------------
50 -- Imports
51
52 import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, readMVar)
53 import Control.Exception (finally)
54 import Control.Monad ((=<<), forM_, liftM, sequence_, when)
55 import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
56                    writeIORef)
57 import Data.Maybe (Maybe(..))
58 import Data.Monoid (mappend, mconcat, mempty)
59 import GHC.Base
60 import GHC.Conc.Signal (runHandlers)
61 import GHC.List (filter)
62 import GHC.Num (Num(..))
63 import GHC.Real ((/), fromIntegral )
64 import GHC.Show (Show(..))
65 import GHC.Event.Clock (getCurrentTime)
66 import GHC.Event.Control
67 import GHC.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite,
68                            Timeout(..))
69 import GHC.Event.Unique (Unique, UniqueSource, newSource, newUnique)
70 import System.Posix.Types (Fd)
71
72 import qualified GHC.Event.IntMap as IM
73 import qualified GHC.Event.Internal as I
74 import qualified GHC.Event.PSQ as Q
75
76 #if defined(HAVE_KQUEUE)
77 import qualified GHC.Event.KQueue as KQueue
78 #elif defined(HAVE_EPOLL)
79 import qualified GHC.Event.EPoll  as EPoll
80 #elif defined(HAVE_POLL)
81 import qualified GHC.Event.Poll   as Poll
82 #else
83 # error not implemented for this operating system
84 #endif
85
86 ------------------------------------------------------------------------
87 -- Types
88
89 data FdData = FdData {
90       fdKey       :: {-# UNPACK #-} !FdKey
91     , fdEvents    :: {-# UNPACK #-} !Event
92     , _fdCallback :: !IOCallback
93     }
94
95 -- | A file descriptor registration cookie.
96 data FdKey = FdKey {
97       keyFd     :: {-# UNPACK #-} !Fd
98     , keyUnique :: {-# UNPACK #-} !Unique
99     } deriving (Eq, Show)
100
101 -- | Callback invoked on I/O events.
102 type IOCallback = FdKey -> Event -> IO ()
103
104 -- | A timeout registration cookie.
105 newtype TimeoutKey   = TK Unique
106     deriving (Eq)
107
108 -- | Callback invoked on timeout events.
109 type TimeoutCallback = IO ()
110
111 data State = Created
112            | Running
113            | Dying
114            | Finished
115              deriving (Eq, Show)
116
117 -- | A priority search queue, with timeouts as priorities.
118 type TimeoutQueue = Q.PSQ TimeoutCallback
119
120 {-
121 Instead of directly modifying the 'TimeoutQueue' in
122 e.g. 'registerTimeout' we keep a list of edits to perform, in the form
123 of a chain of function closures, and have the I/O manager thread
124 perform the edits later.  This exist to address the following GC
125 problem:
126
127 Since e.g. 'registerTimeout' doesn't force the evaluation of the
128 thunks inside the 'emTimeouts' IORef a number of thunks build up
129 inside the IORef.  If the I/O manager thread doesn't evaluate these
130 thunks soon enough they'll get promoted to the old generation and
131 become roots for all subsequent minor GCs.
132
133 When the thunks eventually get evaluated they will each create a new
134 intermediate 'TimeoutQueue' that immediately becomes garbage.  Since
135 the thunks serve as roots until the next major GC these intermediate
136 'TimeoutQueue's will get copied unnecesarily in the next minor GC,
137 increasing GC time.  This problem is known as "floating garbage".
138
139 Keeping a list of edits doesn't stop this from happening but makes the
140 amount of data that gets copied smaller.
141
142 TODO: Evaluate the content of the IORef to WHNF on each insert once
143 this bug is resolved: http://hackage.haskell.org/trac/ghc/ticket/3838
144 -}
145
146 -- | An edit to apply to a 'TimeoutQueue'.
147 type TimeoutEdit = TimeoutQueue -> TimeoutQueue
148
149 -- | The event manager state.
150 data EventManager = EventManager
151     { emBackend      :: !Backend
152     , emFds          :: {-# UNPACK #-} !(MVar (IM.IntMap [FdData]))
153     , emTimeouts     :: {-# UNPACK #-} !(IORef TimeoutEdit)
154     , emState        :: {-# UNPACK #-} !(IORef State)
155     , emUniqueSource :: {-# UNPACK #-} !UniqueSource
156     , emControl      :: {-# UNPACK #-} !Control
157     }
158
159 ------------------------------------------------------------------------
160 -- Creation
161
162 handleControlEvent :: EventManager -> FdKey -> Event -> IO ()
163 handleControlEvent mgr reg _evt = do
164   msg <- readControlMessage (emControl mgr) (keyFd reg)
165   case msg of
166     CMsgWakeup      -> return ()
167     CMsgDie         -> writeIORef (emState mgr) Finished
168     CMsgSignal fp s -> runHandlers fp s
169
170 newDefaultBackend :: IO Backend
171 #if defined(HAVE_KQUEUE)
172 newDefaultBackend = KQueue.new
173 #elif defined(HAVE_EPOLL)
174 newDefaultBackend = EPoll.new
175 #elif defined(HAVE_POLL)
176 newDefaultBackend = Poll.new
177 #else
178 newDefaultBackend = error "no back end for this platform"
179 #endif
180
181 -- | Create a new event manager.
182 new :: IO EventManager
183 new = newWith =<< newDefaultBackend
184
185 newWith :: Backend -> IO EventManager
186 newWith be = do
187   iofds <- newMVar IM.empty
188   timeouts <- newIORef id
189   ctrl <- newControl
190   state <- newIORef Created
191   us <- newSource
192   _ <- mkWeakIORef state $ do
193                st <- atomicModifyIORef state $ \s -> (Finished, s)
194                when (st /= Finished) $ do
195                  I.delete be
196                  closeControl ctrl
197   let mgr = EventManager { emBackend = be
198                          , emFds = iofds
199                          , emTimeouts = timeouts
200                          , emState = state
201                          , emUniqueSource = us
202                          , emControl = ctrl
203                          }
204   _ <- registerFd_ mgr (handleControlEvent mgr) (controlReadFd ctrl) evtRead
205   _ <- registerFd_ mgr (handleControlEvent mgr) (wakeupReadFd ctrl) evtRead
206   return mgr
207
208 -- | Asynchronously shuts down the event manager, if running.
209 shutdown :: EventManager -> IO ()
210 shutdown mgr = do
211   state <- atomicModifyIORef (emState mgr) $ \s -> (Dying, s)
212   when (state == Running) $ sendDie (emControl mgr)
213
214 finished :: EventManager -> IO Bool
215 finished mgr = (== Finished) `liftM` readIORef (emState mgr)
216
217 cleanup :: EventManager -> IO ()
218 cleanup EventManager{..} = do
219   writeIORef emState Finished
220   I.delete emBackend
221   closeControl emControl
222
223 ------------------------------------------------------------------------
224 -- Event loop
225
226 -- | Start handling events.  This function loops until told to stop,
227 -- using 'shutdown'.
228 --
229 -- /Note/: This loop can only be run once per 'EventManager', as it
230 -- closes all of its control resources when it finishes.
231 loop :: EventManager -> IO ()
232 loop mgr@EventManager{..} = do
233   state <- atomicModifyIORef emState $ \s -> case s of
234     Created -> (Running, s)
235     _       -> (s, s)
236   case state of
237     Created -> go Q.empty `finally` cleanup mgr
238     Dying   -> cleanup mgr
239     _       -> do cleanup mgr
240                   error $ "GHC.Event.Manager.loop: state is already " ++
241                       show state
242  where
243   go q = do (running, q') <- step mgr q
244             when running $ go q'
245
246 step :: EventManager -> TimeoutQueue -> IO (Bool, TimeoutQueue)
247 step mgr@EventManager{..} tq = do
248   (timeout, q') <- mkTimeout tq
249   I.poll emBackend timeout (onFdEvent mgr)
250   state <- readIORef emState
251   state `seq` return (state == Running, q')
252  where
253
254   -- | Call all expired timer callbacks and return the time to the
255   -- next timeout.
256   mkTimeout :: TimeoutQueue -> IO (Timeout, TimeoutQueue)
257   mkTimeout q = do
258       now <- getCurrentTime
259       applyEdits <- atomicModifyIORef emTimeouts $ \f -> (id, f)
260       let (expired, q'') = let q' = applyEdits q in q' `seq` Q.atMost now q'
261       sequence_ $ map Q.value expired
262       let timeout = case Q.minView q'' of
263             Nothing             -> Forever
264             Just (Q.E _ t _, _) ->
265                 -- This value will always be positive since the call
266                 -- to 'atMost' above removed any timeouts <= 'now'
267                 let t' = t - now in t' `seq` Timeout t'
268       return (timeout, q'')
269
270 ------------------------------------------------------------------------
271 -- Registering interest in I/O events
272
273 -- | Register interest in the given events, without waking the event
274 -- manager thread.  The 'Bool' return value indicates whether the
275 -- event manager ought to be woken.
276 registerFd_ :: EventManager -> IOCallback -> Fd -> Event
277             -> IO (FdKey, Bool)
278 registerFd_ EventManager{..} cb fd evs = do
279   u <- newUnique emUniqueSource
280   modifyMVar emFds $ \oldMap -> do
281     let fd'  = fromIntegral fd
282         reg  = FdKey fd u
283         !fdd = FdData reg evs cb
284         (!newMap, (oldEvs, newEvs)) =
285             case IM.insertWith (++) fd' [fdd] oldMap of
286               (Nothing,   n) -> (n, (mempty, evs))
287               (Just prev, n) -> (n, pairEvents prev newMap fd')
288         modify = oldEvs /= newEvs
289     when modify $ I.modifyFd emBackend fd oldEvs newEvs
290     return (newMap, (reg, modify))
291 {-# INLINE registerFd_ #-}
292
293 -- | @registerFd mgr cb fd evs@ registers interest in the events @evs@
294 -- on the file descriptor @fd@.  @cb@ is called for each event that
295 -- occurs.  Returns a cookie that can be handed to 'unregisterFd'.
296 registerFd :: EventManager -> IOCallback -> Fd -> Event -> IO FdKey
297 registerFd mgr cb fd evs = do
298   (r, wake) <- registerFd_ mgr cb fd evs
299   when wake $ wakeManager mgr
300   return r
301 {-# INLINE registerFd #-}
302
303 -- | Wake up the event manager.
304 wakeManager :: EventManager -> IO ()
305 wakeManager mgr = sendWakeup (emControl mgr)
306
307 eventsOf :: [FdData] -> Event
308 eventsOf = mconcat . map fdEvents
309
310 pairEvents :: [FdData] -> IM.IntMap [FdData] -> Int -> (Event, Event)
311 pairEvents prev m fd = let l = eventsOf prev
312                            r = case IM.lookup fd m of
313                                  Nothing  -> mempty
314                                  Just fds -> eventsOf fds
315                        in (l, r)
316
317 -- | Drop a previous file descriptor registration, without waking the
318 -- event manager thread.  The return value indicates whether the event
319 -- manager ought to be woken.
320 unregisterFd_ :: EventManager -> FdKey -> IO Bool
321 unregisterFd_ EventManager{..} (FdKey fd u) =
322   modifyMVar emFds $ \oldMap -> do
323     let dropReg cbs = case filter ((/= u) . keyUnique . fdKey) cbs of
324                           []   -> Nothing
325                           cbs' -> Just cbs'
326         fd' = fromIntegral fd
327         (!newMap, (oldEvs, newEvs)) =
328             case IM.updateWith dropReg fd' oldMap of
329               (Nothing,   _)    -> (oldMap, (mempty, mempty))
330               (Just prev, newm) -> (newm, pairEvents prev newm fd')
331         modify = oldEvs /= newEvs
332     when modify $ I.modifyFd emBackend fd oldEvs newEvs
333     return (newMap, modify)
334
335 -- | Drop a previous file descriptor registration.
336 unregisterFd :: EventManager -> FdKey -> IO ()
337 unregisterFd mgr reg = do
338   wake <- unregisterFd_ mgr reg
339   when wake $ wakeManager mgr
340
341 -- | Close a file descriptor in a race-safe way.
342 closeFd :: EventManager -> (Fd -> IO ()) -> Fd -> IO ()
343 closeFd mgr close fd = do
344   fds <- modifyMVar (emFds mgr) $ \oldMap -> do
345     close fd
346     case IM.delete (fromIntegral fd) oldMap of
347       (Nothing,  _)       -> return (oldMap, [])
348       (Just fds, !newMap) -> do
349         when (eventsOf fds /= mempty) $ wakeManager mgr
350         return (newMap, fds)
351   forM_ fds $ \(FdData reg ev cb) -> cb reg (ev `mappend` evtClose)
352
353 ------------------------------------------------------------------------
354 -- Registering interest in timeout events
355
356 -- | Register a timeout in the given number of microseconds.  The
357 -- returned 'TimeoutKey' can be used to later unregister or update the
358 -- timeout.  The timeout is automatically unregistered after the given
359 -- time has passed.
360 registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
361 registerTimeout mgr us cb = do
362   !key <- newUnique (emUniqueSource mgr)
363   if us <= 0 then cb
364     else do
365       now <- getCurrentTime
366       let expTime = fromIntegral us / 1000000.0 + now
367
368       -- We intentionally do not evaluate the modified map to WHNF here.
369       -- Instead, we leave a thunk inside the IORef and defer its
370       -- evaluation until mkTimeout in the event loop.  This is a
371       -- workaround for a nasty IORef contention problem that causes the
372       -- thread-delay benchmark to take 20 seconds instead of 0.2.
373       atomicModifyIORef (emTimeouts mgr) $ \f ->
374           let f' = (Q.insert key expTime cb) . f in (f', ())
375       wakeManager mgr
376   return $ TK key
377
378 -- | Unregister an active timeout.
379 unregisterTimeout :: EventManager -> TimeoutKey -> IO ()
380 unregisterTimeout mgr (TK key) = do
381   atomicModifyIORef (emTimeouts mgr) $ \f ->
382       let f' = (Q.delete key) . f in (f', ())
383   wakeManager mgr
384
385 -- | Update an active timeout to fire in the given number of
386 -- microseconds.
387 updateTimeout :: EventManager -> TimeoutKey -> Int -> IO ()
388 updateTimeout mgr (TK key) us = do
389   now <- getCurrentTime
390   let expTime = fromIntegral us / 1000000.0 + now
391
392   atomicModifyIORef (emTimeouts mgr) $ \f ->
393       let f' = (Q.adjust (const expTime) key) . f in (f', ())
394   wakeManager mgr
395
396 ------------------------------------------------------------------------
397 -- Utilities
398
399 -- | Call the callbacks corresponding to the given file descriptor.
400 onFdEvent :: EventManager -> Fd -> Event -> IO ()
401 onFdEvent mgr fd evs = do
402   fds <- readMVar (emFds mgr)
403   case IM.lookup (fromIntegral fd) fds of
404       Just cbs -> forM_ cbs $ \(FdData reg ev cb) ->
405                     when (evs `I.eventIs` ev) $ cb reg evs
406       Nothing  -> return ()