1 {-# LANGUAGE BangPatterns
3 , ExistentialQuantification
10 module GHC.Event.Manager
27 -- * Registering interest in I/O events
39 -- * Registering interest in timeout events
47 #include "EventConfig.h"
49 ------------------------------------------------------------------------
52 import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, readMVar)
53 import Control.Exception (finally)
54 import Control.Monad ((=<<), forM_, liftM, sequence_, when)
55 import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
57 import Data.Maybe (Maybe(..))
58 import Data.Monoid (mappend, mconcat, mempty)
60 import GHC.Conc.Signal (runHandlers)
61 import GHC.List (filter)
62 import GHC.Num (Num(..))
63 import GHC.Real ((/), fromIntegral )
64 import GHC.Show (Show(..))
65 import GHC.Event.Clock (getCurrentTime)
66 import GHC.Event.Control
67 import GHC.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite,
69 import GHC.Event.Unique (Unique, UniqueSource, newSource, newUnique)
70 import System.Posix.Types (Fd)
72 import qualified GHC.Event.IntMap as IM
73 import qualified GHC.Event.Internal as I
74 import qualified GHC.Event.PSQ as Q
76 #if defined(HAVE_KQUEUE)
77 import qualified GHC.Event.KQueue as KQueue
78 #elif defined(HAVE_EPOLL)
79 import qualified GHC.Event.EPoll as EPoll
80 #elif defined(HAVE_POLL)
81 import qualified GHC.Event.Poll as Poll
83 # error not implemented for this operating system
86 ------------------------------------------------------------------------
89 data FdData = FdData {
90 fdKey :: {-# UNPACK #-} !FdKey
91 , fdEvents :: {-# UNPACK #-} !Event
92 , _fdCallback :: !IOCallback
95 -- | A file descriptor registration cookie.
97 keyFd :: {-# UNPACK #-} !Fd
98 , keyUnique :: {-# UNPACK #-} !Unique
101 -- | Callback invoked on I/O events.
102 type IOCallback = FdKey -> Event -> IO ()
104 instance Show IOCallback where
105 show _ = "IOCallback"
107 -- | A timeout registration cookie.
108 newtype TimeoutKey = TK Unique
111 -- | Callback invoked on timeout events.
112 type TimeoutCallback = IO ()
120 -- | A priority search queue, with timeouts as priorities.
121 type TimeoutQueue = Q.PSQ TimeoutCallback
124 Instead of directly modifying the 'TimeoutQueue' in
125 e.g. 'registerTimeout' we keep a list of edits to perform, in the form
126 of a chain of function closures, and have the I/O manager thread
127 perform the edits later. This exist to address the following GC
130 Since e.g. 'registerTimeout' doesn't force the evaluation of the
131 thunks inside the 'emTimeouts' IORef a number of thunks build up
132 inside the IORef. If the I/O manager thread doesn't evaluate these
133 thunks soon enough they'll get promoted to the old generation and
134 become roots for all subsequent minor GCs.
136 When the thunks eventually get evaluated they will each create a new
137 intermediate 'TimeoutQueue' that immediately becomes garbage. Since
138 the thunks serve as roots until the next major GC these intermediate
139 'TimeoutQueue's will get copied unnecesarily in the next minor GC,
140 increasing GC time. This problem is known as "floating garbage".
142 Keeping a list of edits doesn't stop this from happening but makes the
143 amount of data that gets copied smaller.
145 TODO: Evaluate the content of the IORef to WHNF on each insert once
146 this bug is resolved: http://hackage.haskell.org/trac/ghc/ticket/3838
149 -- | An edit to apply to a 'TimeoutQueue'.
150 type TimeoutEdit = TimeoutQueue -> TimeoutQueue
152 -- | The event manager state.
153 data EventManager = EventManager
154 { emBackend :: !Backend
155 , emFds :: {-# UNPACK #-} !(MVar (IM.IntMap [FdData]))
156 , emTimeouts :: {-# UNPACK #-} !(IORef TimeoutEdit)
157 , emState :: {-# UNPACK #-} !(IORef State)
158 , emUniqueSource :: {-# UNPACK #-} !UniqueSource
159 , emControl :: {-# UNPACK #-} !Control
162 ------------------------------------------------------------------------
165 handleControlEvent :: EventManager -> FdKey -> Event -> IO ()
166 handleControlEvent mgr reg _evt = do
167 msg <- readControlMessage (emControl mgr) (keyFd reg)
169 CMsgWakeup -> return ()
170 CMsgDie -> writeIORef (emState mgr) Finished
171 CMsgSignal fp s -> runHandlers fp s
173 newDefaultBackend :: IO Backend
174 #if defined(HAVE_KQUEUE)
175 newDefaultBackend = KQueue.new
176 #elif defined(HAVE_EPOLL)
177 newDefaultBackend = EPoll.new
178 #elif defined(HAVE_POLL)
179 newDefaultBackend = Poll.new
181 newDefaultBackend = error "no back end for this platform"
184 -- | Create a new event manager.
185 new :: IO EventManager
186 new = newWith =<< newDefaultBackend
188 newWith :: Backend -> IO EventManager
190 iofds <- newMVar IM.empty
191 timeouts <- newIORef id
193 state <- newIORef Created
195 _ <- mkWeakIORef state $ do
196 st <- atomicModifyIORef state $ \s -> (Finished, s)
197 when (st /= Finished) $ do
200 let mgr = EventManager { emBackend = be
202 , emTimeouts = timeouts
204 , emUniqueSource = us
207 _ <- registerFd_ mgr (handleControlEvent mgr) (controlReadFd ctrl) evtRead
208 _ <- registerFd_ mgr (handleControlEvent mgr) (wakeupReadFd ctrl) evtRead
211 -- | Asynchronously shuts down the event manager, if running.
212 shutdown :: EventManager -> IO ()
214 state <- atomicModifyIORef (emState mgr) $ \s -> (Dying, s)
215 when (state == Running) $ sendDie (emControl mgr)
217 finished :: EventManager -> IO Bool
218 finished mgr = (== Finished) `liftM` readIORef (emState mgr)
220 cleanup :: EventManager -> IO ()
221 cleanup EventManager{..} = do
222 writeIORef emState Finished
224 closeControl emControl
226 ------------------------------------------------------------------------
229 -- | Start handling events. This function loops until told to stop,
232 -- /Note/: This loop can only be run once per 'EventManager', as it
233 -- closes all of its control resources when it finishes.
234 loop :: EventManager -> IO ()
235 loop mgr@EventManager{..} = do
236 state <- atomicModifyIORef emState $ \s -> case s of
237 Created -> (Running, s)
240 Created -> go Q.empty `finally` cleanup mgr
243 error $ "GHC.Event.Manager.loop: state is already " ++
246 go q = do (running, q') <- step mgr q
249 step :: EventManager -> TimeoutQueue -> IO (Bool, TimeoutQueue)
250 step mgr@EventManager{..} tq = do
251 (timeout, q') <- mkTimeout tq
252 I.poll emBackend timeout (onFdEvent mgr)
253 state <- readIORef emState
254 state `seq` return (state == Running, q')
257 -- | Call all expired timer callbacks and return the time to the
259 mkTimeout :: TimeoutQueue -> IO (Timeout, TimeoutQueue)
261 now <- getCurrentTime
262 applyEdits <- atomicModifyIORef emTimeouts $ \f -> (id, f)
263 let (expired, q'') = let q' = applyEdits q in q' `seq` Q.atMost now q'
264 sequence_ $ map Q.value expired
265 let timeout = case Q.minView q'' of
267 Just (Q.E _ t _, _) ->
268 -- This value will always be positive since the call
269 -- to 'atMost' above removed any timeouts <= 'now'
270 let t' = t - now in t' `seq` Timeout t'
271 return (timeout, q'')
273 ------------------------------------------------------------------------
274 -- Registering interest in I/O events
276 -- | Register interest in the given events, without waking the event
277 -- manager thread. The 'Bool' return value indicates whether the
278 -- event manager ought to be woken.
279 registerFd_ :: EventManager -> IOCallback -> Fd -> Event
281 registerFd_ EventManager{..} cb fd evs = do
282 u <- newUnique emUniqueSource
283 modifyMVar emFds $ \oldMap -> do
284 let fd' = fromIntegral fd
286 !fdd = FdData reg evs cb
287 (!newMap, (oldEvs, newEvs)) =
288 case IM.insertWith (++) fd' [fdd] oldMap of
289 (Nothing, n) -> (n, (mempty, evs))
290 (Just prev, n) -> (n, pairEvents prev newMap fd')
291 modify = oldEvs /= newEvs
292 when modify $ I.modifyFd emBackend fd oldEvs newEvs
293 return (newMap, (reg, modify))
294 {-# INLINE registerFd_ #-}
296 -- | @registerFd mgr cb fd evs@ registers interest in the events @evs@
297 -- on the file descriptor @fd@. @cb@ is called for each event that
298 -- occurs. Returns a cookie that can be handed to 'unregisterFd'.
299 registerFd :: EventManager -> IOCallback -> Fd -> Event -> IO FdKey
300 registerFd mgr cb fd evs = do
301 (r, wake) <- registerFd_ mgr cb fd evs
302 when wake $ wakeManager mgr
304 {-# INLINE registerFd #-}
306 -- | Wake up the event manager.
307 wakeManager :: EventManager -> IO ()
308 wakeManager mgr = sendWakeup (emControl mgr)
310 eventsOf :: [FdData] -> Event
311 eventsOf = mconcat . map fdEvents
313 pairEvents :: [FdData] -> IM.IntMap [FdData] -> Int -> (Event, Event)
314 pairEvents prev m fd = let l = eventsOf prev
315 r = case IM.lookup fd m of
317 Just fds -> eventsOf fds
320 -- | Drop a previous file descriptor registration, without waking the
321 -- event manager thread. The return value indicates whether the event
322 -- manager ought to be woken.
323 unregisterFd_ :: EventManager -> FdKey -> IO Bool
324 unregisterFd_ EventManager{..} (FdKey fd u) =
325 modifyMVar emFds $ \oldMap -> do
326 let dropReg cbs = case filter ((/= u) . keyUnique . fdKey) cbs of
329 fd' = fromIntegral fd
330 (!newMap, (oldEvs, newEvs)) =
331 case IM.updateWith dropReg fd' oldMap of
332 (Nothing, _) -> (oldMap, (mempty, mempty))
333 (Just prev, newm) -> (newm, pairEvents prev newm fd')
334 modify = oldEvs /= newEvs
335 when modify $ I.modifyFd emBackend fd oldEvs newEvs
336 return (newMap, modify)
338 -- | Drop a previous file descriptor registration.
339 unregisterFd :: EventManager -> FdKey -> IO ()
340 unregisterFd mgr reg = do
341 wake <- unregisterFd_ mgr reg
342 when wake $ wakeManager mgr
344 -- | Close a file descriptor in a race-safe way.
345 closeFd :: EventManager -> (Fd -> IO ()) -> Fd -> IO ()
346 closeFd mgr close fd = do
347 fds <- modifyMVar (emFds mgr) $ \oldMap -> do
349 case IM.delete (fromIntegral fd) oldMap of
350 (Nothing, _) -> return (oldMap, [])
351 (Just fds, !newMap) -> do
352 when (eventsOf fds /= mempty) $ wakeManager mgr
354 forM_ fds $ \(FdData reg ev cb) -> cb reg (ev `mappend` evtClose)
356 ------------------------------------------------------------------------
357 -- Registering interest in timeout events
359 -- | Register a timeout in the given number of microseconds. The
360 -- returned 'TimeoutKey' can be used to later unregister or update the
361 -- timeout. The timeout is automatically unregistered after the given
363 registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
364 registerTimeout mgr us cb = do
365 !key <- newUnique (emUniqueSource mgr)
368 now <- getCurrentTime
369 let expTime = fromIntegral us / 1000000.0 + now
371 -- We intentionally do not evaluate the modified map to WHNF here.
372 -- Instead, we leave a thunk inside the IORef and defer its
373 -- evaluation until mkTimeout in the event loop. This is a
374 -- workaround for a nasty IORef contention problem that causes the
375 -- thread-delay benchmark to take 20 seconds instead of 0.2.
376 atomicModifyIORef (emTimeouts mgr) $ \f ->
377 let f' = (Q.insert key expTime cb) . f in (f', ())
381 -- | Unregister an active timeout.
382 unregisterTimeout :: EventManager -> TimeoutKey -> IO ()
383 unregisterTimeout mgr (TK key) = do
384 atomicModifyIORef (emTimeouts mgr) $ \f ->
385 let f' = (Q.delete key) . f in (f', ())
388 -- | Update an active timeout to fire in the given number of
390 updateTimeout :: EventManager -> TimeoutKey -> Int -> IO ()
391 updateTimeout mgr (TK key) us = do
392 now <- getCurrentTime
393 let expTime = fromIntegral us / 1000000.0 + now
395 atomicModifyIORef (emTimeouts mgr) $ \f ->
396 let f' = (Q.adjust (const expTime) key) . f in (f', ())
399 ------------------------------------------------------------------------
402 -- | Call the callbacks corresponding to the given file descriptor.
403 onFdEvent :: EventManager -> Fd -> Event -> IO ()
404 onFdEvent mgr fd evs = do
405 fds <- readMVar (emFds mgr)
406 case IM.lookup (fromIntegral fd) fds of
407 Just cbs -> forM_ cbs $ \(FdData reg ev cb) ->
408 when (evs `I.eventIs` ev) $ cb reg evs