1 {-# LANGUAGE BangPatterns
3 , ExistentialQuantification
10 module GHC.Event.Manager
27 -- * Registering interest in I/O events
39 -- * Registering interest in timeout events
47 #include "EventConfig.h"
49 ------------------------------------------------------------------------
52 import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, readMVar)
53 import Control.Exception (finally)
54 import Control.Monad ((=<<), forM_, liftM, sequence_, when)
55 import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
57 import Data.Maybe (Maybe(..))
58 import Data.Monoid (mappend, mconcat, mempty)
60 import GHC.Conc.Signal (runHandlers)
61 import GHC.List (filter)
62 import GHC.Num (Num(..))
63 import GHC.Real ((/), fromIntegral )
64 import GHC.Show (Show(..))
65 import GHC.Event.Clock (getCurrentTime)
66 import GHC.Event.Control
67 import GHC.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite,
69 import GHC.Event.Unique (Unique, UniqueSource, newSource, newUnique)
70 import System.Posix.Types (Fd)
72 import qualified GHC.Event.IntMap as IM
73 import qualified GHC.Event.Internal as I
74 import qualified GHC.Event.PSQ as Q
76 #if defined(HAVE_KQUEUE)
77 import qualified GHC.Event.KQueue as KQueue
78 #elif defined(HAVE_EPOLL)
79 import qualified GHC.Event.EPoll as EPoll
80 #elif defined(HAVE_POLL)
81 import qualified GHC.Event.Poll as Poll
83 # error not implemented for this operating system
86 ------------------------------------------------------------------------
-- One registration on a file descriptor, as stored in the manager's
-- per-descriptor list.
89 data FdData = FdData {
90 fdKey :: {-# UNPACK #-} !FdKey -- ^ Cookie identifying this registration.
91 , fdEvents :: {-# UNPACK #-} !Event -- ^ Events this registration was made for.
92 , _fdCallback :: !IOCallback -- ^ Callback to run for this registration.
95 -- | A file descriptor registration cookie.
97 keyFd :: {-# UNPACK #-} !Fd
98 , keyUnique :: {-# UNPACK #-} !Unique
101 -- | Callback invoked on I/O events.
-- It receives the registration's own 'FdKey' and the events being
-- reported.
102 type IOCallback = FdKey -> Event -> IO ()
104 -- | A timeout registration cookie.
-- Wraps the 'Unique' under which the timeout is keyed in the
-- 'TimeoutQueue'.
105 newtype TimeoutKey = TK Unique
108 -- | Callback invoked on timeout events.
-- It takes no arguments; the event loop simply runs the action once the
-- timeout has expired.
109 type TimeoutCallback = IO ()
117 -- | A priority search queue, with timeouts as priorities.
-- The values stored in the queue are the 'TimeoutCallback's to run.
118 type TimeoutQueue = Q.PSQ TimeoutCallback
121 Instead of directly modifying the 'TimeoutQueue' in
122 e.g. 'registerTimeout' we keep a list of edits to perform, in the form
123 of a chain of function closures, and have the I/O manager thread
124 perform the edits later. This exists to address the following GC
127 Since e.g. 'registerTimeout' doesn't force the evaluation of the
128 thunks inside the 'emTimeouts' IORef a number of thunks build up
129 inside the IORef. If the I/O manager thread doesn't evaluate these
130 thunks soon enough they'll get promoted to the old generation and
131 become roots for all subsequent minor GCs.
133 When the thunks eventually get evaluated they will each create a new
134 intermediate 'TimeoutQueue' that immediately becomes garbage. Since
135 the thunks serve as roots until the next major GC these intermediate
136 'TimeoutQueue's will get copied unnecessarily in the next minor GC,
137 increasing GC time. This problem is known as "floating garbage".
139 Keeping a list of edits doesn't stop this from happening but makes the
140 amount of data that gets copied smaller.
142 TODO: Evaluate the content of the IORef to WHNF on each insert once
143 this bug is resolved: http://hackage.haskell.org/trac/ghc/ticket/3838
146 -- | An edit to apply to a 'TimeoutQueue'.
-- Edits are accumulated by function composition and applied in one go
-- by the event loop.
147 type TimeoutEdit = TimeoutQueue -> TimeoutQueue
149 -- | The event manager state.
150 data EventManager = EventManager
151 { emBackend :: !Backend -- ^ Backend that is polled and told about fd changes.
152 , emFds :: {-# UNPACK #-} !(MVar (IM.IntMap [FdData])) -- ^ Registrations, keyed by the descriptor converted to 'Int'.
153 , emTimeouts :: {-# UNPACK #-} !(IORef TimeoutEdit) -- ^ Pending, not yet applied edits to the timeout queue.
154 , emState :: {-# UNPACK #-} !(IORef State) -- ^ Lifecycle state (e.g. 'Created', 'Running', 'Dying', 'Finished').
155 , emUniqueSource :: {-# UNPACK #-} !UniqueSource -- ^ Supplies the 'Unique's used in registration keys.
156 , emControl :: {-# UNPACK #-} !Control -- ^ Control channel used to send wake-up and die messages.
159 ------------------------------------------------------------------------
-- Handle one message read from the manager's control channel.
-- A wakeup needs no further action, a die request marks the manager
-- 'Finished', and a signal message is passed on to 'runHandlers'.
162 handleControlEvent :: EventManager -> FdKey -> Event -> IO ()
163 handleControlEvent mgr reg _evt = do
-- The message is read from the descriptor the event was reported on.
164 msg <- readControlMessage (emControl mgr) (keyFd reg)
166 CMsgWakeup -> return ()
167 CMsgDie -> writeIORef (emState mgr) Finished
168 CMsgSignal fp s -> runHandlers fp s
-- Create the backend selected at compile time. kqueue is preferred,
-- then epoll, then poll; with none of them available the definition is
-- an 'error' call.
170 newDefaultBackend :: IO Backend
171 #if defined(HAVE_KQUEUE)
172 newDefaultBackend = KQueue.new
173 #elif defined(HAVE_EPOLL)
174 newDefaultBackend = EPoll.new
175 #elif defined(HAVE_POLL)
176 newDefaultBackend = Poll.new
178 newDefaultBackend = error "no back end for this platform"
-- | Create a new event manager, running on the default backend for
-- this platform.
new :: IO EventManager
new = do
  backend <- newDefaultBackend
  newWith backend
-- Build an event manager on top of the given backend.
185 newWith :: Backend -> IO EventManager
-- Start with no fd registrations, no pending timeout edits and the
-- 'Created' state.
187 iofds <- newMVar IM.empty
188 timeouts <- newIORef id
190 state <- newIORef Created
-- Finalizer on the state IORef: atomically mark the manager 'Finished'
-- and run the guarded action only if it was not finished already.
192 _ <- mkWeakIORef state $ do
193 st <- atomicModifyIORef state $ \s -> (Finished, s)
194 when (st /= Finished) $ do
197 let mgr = EventManager { emBackend = be
199 , emTimeouts = timeouts
201 , emUniqueSource = us
-- Listen for readability on both control descriptors so control
-- messages reach 'handleControlEvent'.
204 _ <- registerFd_ mgr (handleControlEvent mgr) (controlReadFd ctrl) evtRead
205 _ <- registerFd_ mgr (handleControlEvent mgr) (wakeupReadFd ctrl) evtRead
208 -- | Asynchronously shuts down the event manager, if running.
-- The state is switched to 'Dying' unconditionally; the die message is
-- only sent when the previous state was 'Running'.
209 shutdown :: EventManager -> IO ()
-- The swap returns the state that was in place before it.
211 state <- atomicModifyIORef (emState mgr) $ \s -> (Dying, s)
212 when (state == Running) $ sendDie (emControl mgr)
-- | Report whether the event manager is currently in the 'Finished'
-- state.
finished :: EventManager -> IO Bool
finished mgr = do
  current <- readIORef (emState mgr)
  return (current == Finished)
-- Release the manager's resources: mark it 'Finished' and close its
-- control channel.
217 cleanup :: EventManager -> IO ()
218 cleanup EventManager{..} = do
219 writeIORef emState Finished
221 closeControl emControl
223 ------------------------------------------------------------------------
226 -- | Start handling events. This function loops until told to stop,
229 -- /Note/: This loop can only be run once per 'EventManager', as it
230 -- closes all of its control resources when it finishes.
231 loop :: EventManager -> IO ()
232 loop mgr@EventManager{..} = do
-- Atomically move a 'Created' manager to 'Running'; the state that was
-- in place before the update is what gets inspected below.
233 state <- atomicModifyIORef emState $ \s -> case s of
234 Created -> (Running, s)
-- Only a freshly created manager is run. It starts from an empty
-- timeout queue, and 'cleanup' runs however the loop ends.
237 Created -> go Q.empty `finally` cleanup mgr
240 error $ "GHC.Event.Manager.loop: state is already " ++
-- Each iteration performs one 'step' and yields the updated queue.
243 go q = do (running, q') <- step mgr q
-- Run one iteration of the event loop: fire expired timeouts, poll the
-- backend for at most the time until the next timeout, and report
-- whether the manager is still 'Running' along with the updated queue.
246 step :: EventManager -> TimeoutQueue -> IO (Bool, TimeoutQueue)
247 step mgr@EventManager{..} tq = do
248 (timeout, q') <- mkTimeout tq
249 I.poll emBackend timeout (onFdEvent mgr)
250 state <- readIORef emState
251 state `seq` return (state == Running, q')
254 -- | Call all expired timer callbacks and return the time to the
256 mkTimeout :: TimeoutQueue -> IO (Timeout, TimeoutQueue)
258 now <- getCurrentTime
-- Take the accumulated edits and reset the IORef to 'id' in one atomic
-- step, so every edit is applied exactly once.
259 applyEdits <- atomicModifyIORef emTimeouts $ \f -> (id, f)
-- Force the edited queue before splitting off the entries due by 'now'.
260 let (expired, q'') = let q' = applyEdits q in q' `seq` Q.atMost now q'
-- Run the callbacks of all expired entries.
261 sequence_ $ map Q.value expired
262 let timeout = case Q.minView q'' of
264 Just (Q.E _ t _, _) ->
265 -- This value will always be positive since the call
266 -- to 'atMost' above removed any timeouts <= 'now'
267 let t' = t - now in t' `seq` Timeout t'
268 return (timeout, q'')
270 ------------------------------------------------------------------------
271 -- Registering interest in I/O events
273 -- | Register interest in the given events, without waking the event
274 -- manager thread. The 'Bool' return value indicates whether the
275 -- event manager ought to be woken.
-- The backend is only updated, and a wake-up only requested, when the
-- combined event set for the descriptor actually changes.
276 registerFd_ :: EventManager -> IOCallback -> Fd -> Event
278 registerFd_ EventManager{..} cb fd evs = do
279 u <- newUnique emUniqueSource
280 modifyMVar emFds $ \oldMap -> do
281 let fd' = fromIntegral fd
283 !fdd = FdData reg evs cb
284 (!newMap, (oldEvs, newEvs)) =
285 case IM.insertWith (++) fd' [fdd] oldMap of
-- First registration for this descriptor: nothing was registered before.
286 (Nothing, n) -> (n, (mempty, evs))
-- Descriptor already known: compare the previous registrations with
-- what the updated map now holds for it.
287 (Just prev, n) -> (n, pairEvents prev newMap fd')
288 modify = oldEvs /= newEvs
289 when modify $ I.modifyFd emBackend fd oldEvs newEvs
290 return (newMap, (reg, modify))
291 {-# INLINE registerFd_ #-}
293 -- | @registerFd mgr cb fd evs@ registers interest in the events @evs@
294 -- on the file descriptor @fd@. @cb@ is called for each event that
295 -- occurs. Returns a cookie that can be handed to 'unregisterFd'.
296 registerFd :: EventManager -> IOCallback -> Fd -> Event -> IO FdKey
297 registerFd mgr cb fd evs = do
298 (r, wake) <- registerFd_ mgr cb fd evs
-- Wake the manager only when 'registerFd_' reports that it is needed.
299 when wake $ wakeManager mgr
301 {-# INLINE registerFd #-}
-- | Wake up the event manager by sending a wake-up message over its
-- control channel.
wakeManager :: EventManager -> IO ()
wakeManager = sendWakeup . emControl
-- | Combine the events of all the given registrations into a single
-- 'Event'.
eventsOf :: [FdData] -> Event
eventsOf regs = mconcat [fdEvents r | r <- regs]
-- Compute two event sets for a descriptor: the combined events of the
-- registrations in @prev@, and the combined events of whatever the map
-- @m@ holds for @fd@.
-- NOTE(review): the lookup-miss case and the returned pair are not
-- visible here; presumably the result is (previous, current) -- confirm.
310 pairEvents :: [FdData] -> IM.IntMap [FdData] -> Int -> (Event, Event)
311 pairEvents prev m fd = let l = eventsOf prev
312 r = case IM.lookup fd m of
314 Just fds -> eventsOf fds
317 -- | Drop a previous file descriptor registration, without waking the
318 -- event manager thread. The return value indicates whether the event
319 -- manager ought to be woken.
320 unregisterFd_ :: EventManager -> FdKey -> IO Bool
321 unregisterFd_ EventManager{..} (FdKey fd u) =
322 modifyMVar emFds $ \oldMap -> do
-- Keep only the registrations whose unique differs from the one being
-- dropped.
323 let dropReg cbs = case filter ((/= u) . keyUnique . fdKey) cbs of
326 fd' = fromIntegral fd
327 (!newMap, (oldEvs, newEvs)) =
328 case IM.updateWith dropReg fd' oldMap of
-- Descriptor not present: keep the map and report no event change.
329 (Nothing, _) -> (oldMap, (mempty, mempty))
330 (Just prev, newm) -> (newm, pairEvents prev newm fd')
-- Only touch the backend when the combined event set changed.
331 modify = oldEvs /= newEvs
332 when modify $ I.modifyFd emBackend fd oldEvs newEvs
333 return (newMap, modify)
-- | Drop a previous file descriptor registration, waking the event
-- manager when the removal requires it.
unregisterFd :: EventManager -> FdKey -> IO ()
unregisterFd mgr reg = notify =<< unregisterFd_ mgr reg
  where
    -- Wake the manager only if 'unregisterFd_' asked for it.
    notify shouldWake = when shouldWake (wakeManager mgr)
341 -- | Close a file descriptor in a race-safe way.
-- All registrations for the descriptor are removed from the map inside
-- the 'modifyMVar' on 'emFds'; each removed registration's callback is
-- then invoked with its own events combined with 'evtClose'.
342 closeFd :: EventManager -> (Fd -> IO ()) -> Fd -> IO ()
343 closeFd mgr close fd = do
344 fds <- modifyMVar (emFds mgr) $ \oldMap -> do
346 case IM.delete (fromIntegral fd) oldMap of
-- Nothing registered for this descriptor: no callbacks to notify.
347 (Nothing, _) -> return (oldMap, [])
348 (Just fds, !newMap) -> do
-- Wake the manager if the removed registrations covered any events.
349 when (eventsOf fds /= mempty) $ wakeManager mgr
351 forM_ fds $ \(FdData reg ev cb) -> cb reg (ev `mappend` evtClose)
353 ------------------------------------------------------------------------
354 -- Registering interest in timeout events
356 -- | Register a timeout in the given number of microseconds. The
357 -- returned 'TimeoutKey' can be used to later unregister or update the
358 -- timeout. The timeout is automatically unregistered after the given
360 registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
361 registerTimeout mgr us cb = do
362 !key <- newUnique (emUniqueSource mgr)
365 now <- getCurrentTime
-- Scale the microsecond delay by 1e6 and add it to the current time
-- (presumably 'getCurrentTime' is in seconds -- confirm).
366 let expTime = fromIntegral us / 1000000.0 + now
368 -- We intentionally do not evaluate the modified map to WHNF here.
369 -- Instead, we leave a thunk inside the IORef and defer its
370 -- evaluation until mkTimeout in the event loop. This is a
371 -- workaround for a nasty IORef contention problem that causes the
372 -- thread-delay benchmark to take 20 seconds instead of 0.2.
-- The insertion is composed after the edits already pending in 'f'.
373 atomicModifyIORef (emTimeouts mgr) $ \f ->
374 let f' = (Q.insert key expTime cb) . f in (f', ())
378 -- | Unregister an active timeout.
-- The deletion is queued as an edit, composed after the edits already
-- pending, rather than applied to the queue directly.
379 unregisterTimeout :: EventManager -> TimeoutKey -> IO ()
380 unregisterTimeout mgr (TK key) = do
381 atomicModifyIORef (emTimeouts mgr) $ \f ->
382 let f' = (Q.delete key) . f in (f', ())
385 -- | Update an active timeout to fire in the given number of
387 updateTimeout :: EventManager -> TimeoutKey -> Int -> IO ()
388 updateTimeout mgr (TK key) us = do
389 now <- getCurrentTime
-- New expiry is measured from the current time, using the same
-- microsecond scaling as 'registerTimeout'.
390 let expTime = fromIntegral us / 1000000.0 + now
-- Queue an edit that replaces the entry's priority with the new expiry.
392 atomicModifyIORef (emTimeouts mgr) $ \f ->
393 let f' = (Q.adjust (const expTime) key) . f in (f', ())
396 ------------------------------------------------------------------------
399 -- | Call the callbacks corresponding to the given file descriptor.
-- Works on a snapshot of the registration map. A callback only runs
-- when the reported events pass the 'I.eventIs' check against its
-- registered events, and it is handed the reported events.
400 onFdEvent :: EventManager -> Fd -> Event -> IO ()
401 onFdEvent mgr fd evs = do
402 fds <- readMVar (emFds mgr)
403 case IM.lookup (fromIntegral fd) fds of
404 Just cbs -> forM_ cbs $ \(FdData reg ev cb) ->
405 when (evs `I.eventIs` ev) $ cb reg evs