1 {-# LANGUAGE BangPatterns, CPP, ExistentialQuantification, NoImplicitPrelude,
2 RecordWildCards, TypeSynonymInstances #-}
3 module System.Event.Manager
19 -- * Registering interest in I/O events
31 -- * Registering interest in timeout events
39 #include "EventConfig.h"
41 ------------------------------------------------------------------------
44 import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, readMVar)
45 import Control.Exception (finally)
46 import Control.Monad ((=<<), forM_, liftM, sequence_, when)
47 import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
49 import Data.Maybe (Maybe(..))
50 import Data.Monoid (mappend, mconcat, mempty)
52 import GHC.Conc.Signal (runHandlers)
53 import GHC.List (filter)
54 import GHC.Num (Num(..))
55 import GHC.Real ((/), fromIntegral )
56 import GHC.Show (Show(..))
57 import System.Event.Clock (getCurrentTime)
58 import System.Event.Control
59 import System.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite,
61 import System.Event.Unique (Unique, UniqueSource, newSource, newUnique)
62 import System.Posix.Types (Fd)
64 import qualified System.Event.IntMap as IM
65 import qualified System.Event.Internal as I
66 import qualified System.Event.PSQ as Q
68 #if defined(HAVE_KQUEUE)
69 import qualified System.Event.KQueue as KQueue
70 #elif defined(HAVE_EPOLL)
71 import qualified System.Event.EPoll as EPoll
72 #elif defined(HAVE_POLL)
73 import qualified System.Event.Poll as Poll
75 # error not implemented for this operating system
78 ------------------------------------------------------------------------
81 data FdData = FdData {
82 fdKey :: {-# UNPACK #-} !FdKey
83 , fdEvents :: {-# UNPACK #-} !Event
84 , _fdCallback :: !IOCallback
87 -- | A file descriptor registration cookie.
89 keyFd :: {-# UNPACK #-} !Fd
90 , keyUnique :: {-# UNPACK #-} !Unique
93 -- | Callback invoked on I/O events.
94 type IOCallback = FdKey -> Event -> IO ()
96 instance Show IOCallback where
99 newtype TimeoutKey = TK Unique
102 -- | Callback invoked on timeout events.
103 type TimeoutCallback = IO ()
111 -- | A priority search queue, with timeouts as priorities.
112 type TimeoutQueue = Q.PSQ TimeoutCallback
115 Instead of directly modifying the 'TimeoutQueue' in
116 e.g. 'registerTimeout' we keep a list of edits to perform, in the form
117 of a chain of function closures, and have the I/O manager thread
118 perform the edits later. This exist to address the following GC
121 Since e.g. 'registerTimeout' doesn't force the evaluation of the
122 thunks inside the 'emTimeouts' IORef a number of thunks build up
123 inside the IORef. If the I/O manager thread doesn't evaluate these
124 thunks soon enough they'll get promoted to the old generation and
125 become roots for all subsequent minor GCs.
127 When the thunks eventually get evaluated they will each create a new
128 intermediate 'TimeoutQueue' that immediately becomes garbage. Since
129 the thunks serve as roots until the next major GC these intermediate
130 'TimeoutQueue's will get copied unnecesarily in the next minor GC,
131 increasing GC time. This problem is known as "floating garbage".
133 Keeping a list of edits doesn't stop this from happening but makes the
134 amount of data that gets copied smaller.
136 TODO: Evaluate the content of the IORef to WHNF on each insert once
137 this bug is resolved: http://hackage.haskell.org/trac/ghc/ticket/3838
140 -- | An edit to apply to a 'TimeoutQueue'.
141 type TimeoutEdit = TimeoutQueue -> TimeoutQueue
143 -- | The event manager state.
144 data EventManager = EventManager
145 { emBackend :: !Backend
146 , emFds :: {-# UNPACK #-} !(MVar (IM.IntMap [FdData]))
147 , emTimeouts :: {-# UNPACK #-} !(IORef TimeoutEdit)
148 , emState :: {-# UNPACK #-} !(IORef State)
149 , emUniqueSource :: {-# UNPACK #-} !UniqueSource
150 , emControl :: {-# UNPACK #-} !Control
153 ------------------------------------------------------------------------
156 handleControlEvent :: EventManager -> FdKey -> Event -> IO ()
157 handleControlEvent mgr reg _evt = do
158 msg <- readControlMessage (emControl mgr) (keyFd reg)
160 CMsgWakeup -> return ()
161 CMsgDie -> writeIORef (emState mgr) Finished
162 CMsgSignal fp s -> runHandlers fp s
164 newDefaultBackend :: IO Backend
165 #if defined(HAVE_KQUEUE)
166 newDefaultBackend = KQueue.new
167 #elif defined(HAVE_EPOLL)
168 newDefaultBackend = EPoll.new
169 #elif defined(HAVE_POLL)
170 newDefaultBackend = Poll.new
172 newDefaultBackend = error "no back end for this platform"
175 -- | Create a new event manager.
176 new :: IO EventManager
177 new = newWith =<< newDefaultBackend
179 newWith :: Backend -> IO EventManager
181 iofds <- newMVar IM.empty
182 timeouts <- newIORef id
184 state <- newIORef Created
186 _ <- mkWeakIORef state $ do
187 st <- atomicModifyIORef state $ \s -> (Finished, s)
188 when (st /= Finished) $ do
191 let mgr = EventManager { emBackend = be
193 , emTimeouts = timeouts
195 , emUniqueSource = us
198 _ <- registerFd_ mgr (handleControlEvent mgr) (controlReadFd ctrl) evtRead
199 _ <- registerFd_ mgr (handleControlEvent mgr) (wakeupReadFd ctrl) evtRead
202 -- | Asynchronously shuts down the event manager, if running.
203 shutdown :: EventManager -> IO ()
205 state <- atomicModifyIORef (emState mgr) $ \s -> (Dying, s)
206 when (state == Running) $ sendDie (emControl mgr)
208 finished :: EventManager -> IO Bool
209 finished mgr = (== Finished) `liftM` readIORef (emState mgr)
211 cleanup :: EventManager -> IO ()
212 cleanup EventManager{..} = do
213 writeIORef emState Finished
215 closeControl emControl
217 ------------------------------------------------------------------------
220 -- | Start handling events. This function loops until told to stop.
222 -- /Note/: This loop can only be run once per 'EventManager', as it
223 -- closes all of its control resources when it finishes.
224 loop :: EventManager -> IO ()
225 loop mgr@EventManager{..} = do
226 state <- atomicModifyIORef emState $ \s -> case s of
227 Created -> (Running, s)
230 Created -> go Q.empty `finally` cleanup mgr
233 error $ "System.Event.Manager.loop: state is already " ++
236 go q = do (running, q') <- step mgr q
239 step :: EventManager -> TimeoutQueue -> IO (Bool, TimeoutQueue)
240 step mgr@EventManager{..} tq = do
241 (timeout, q') <- mkTimeout tq
242 I.poll emBackend timeout (onFdEvent mgr)
243 state <- readIORef emState
244 state `seq` return (state == Running, q')
247 -- | Call all expired timer callbacks and return the time to the
249 mkTimeout :: TimeoutQueue -> IO (Timeout, TimeoutQueue)
251 now <- getCurrentTime
252 applyEdits <- atomicModifyIORef emTimeouts $ \f -> (id, f)
253 let (expired, q'') = let q' = applyEdits q in q' `seq` Q.atMost now q'
254 sequence_ $ map Q.value expired
255 let timeout = case Q.minView q'' of
257 Just (Q.E _ t _, _) ->
258 -- This value will always be positive since the call
259 -- to 'atMost' above removed any timeouts <= 'now'
260 let t' = t - now in t' `seq` Timeout t'
261 return (timeout, q'')
263 ------------------------------------------------------------------------
264 -- Registering interest in I/O events
266 -- | Register interest in the given events, without waking the event
267 -- manager thread. The 'Bool' return value indicates whether the
268 -- event manager ought to be woken.
269 registerFd_ :: EventManager -> IOCallback -> Fd -> Event
271 registerFd_ EventManager{..} cb fd evs = do
272 u <- newUnique emUniqueSource
273 modifyMVar emFds $ \oldMap -> do
274 let fd' = fromIntegral fd
276 !fdd = FdData reg evs cb
277 (!newMap, (oldEvs, newEvs)) =
278 case IM.insertWith (++) fd' [fdd] oldMap of
279 (Nothing, n) -> (n, (mempty, evs))
280 (Just prev, n) -> (n, pairEvents prev newMap fd')
281 modify = oldEvs /= newEvs
282 when modify $ I.modifyFd emBackend fd oldEvs newEvs
283 return (newMap, (reg, modify))
284 {-# INLINE registerFd_ #-}
286 -- | @registerFd mgr cb fd evs@ registers interest in the events @evs@
287 -- on the file descriptor @fd@. @cb@ is called for each event that
288 -- occurs. Returns a cookie that can be handed to 'unregisterFd'.
289 registerFd :: EventManager -> IOCallback -> Fd -> Event -> IO FdKey
290 registerFd mgr cb fd evs = do
291 (r, wake) <- registerFd_ mgr cb fd evs
292 when wake $ wakeManager mgr
294 {-# INLINE registerFd #-}
296 -- | Wake up the event manager.
297 wakeManager :: EventManager -> IO ()
298 wakeManager mgr = sendWakeup (emControl mgr)
300 eventsOf :: [FdData] -> Event
301 eventsOf = mconcat . map fdEvents
303 pairEvents :: [FdData] -> IM.IntMap [FdData] -> Int -> (Event, Event)
304 pairEvents prev m fd = let l = eventsOf prev
305 r = case IM.lookup fd m of
307 Just fds -> eventsOf fds
310 -- | Drop a previous file descriptor registration, without waking the
311 -- event manager thread. The return value indicates whether the event
312 -- manager ought to be woken.
313 unregisterFd_ :: EventManager -> FdKey -> IO Bool
314 unregisterFd_ EventManager{..} (FdKey fd u) =
315 modifyMVar emFds $ \oldMap -> do
316 let dropReg cbs = case filter ((/= u) . keyUnique . fdKey) cbs of
319 fd' = fromIntegral fd
320 (!newMap, (oldEvs, newEvs)) =
321 case IM.updateWith dropReg fd' oldMap of
322 (Nothing, _) -> (oldMap, (mempty, mempty))
323 (Just prev, newm) -> (newm, pairEvents prev newm fd')
324 modify = oldEvs /= newEvs
325 when modify $ I.modifyFd emBackend fd oldEvs newEvs
326 return (newMap, modify)
328 -- | Drop a previous file descriptor registration.
329 unregisterFd :: EventManager -> FdKey -> IO ()
330 unregisterFd mgr reg = do
331 wake <- unregisterFd_ mgr reg
332 when wake $ wakeManager mgr
334 -- | Close a file descriptor in a race-safe way.
335 closeFd :: EventManager -> (Fd -> IO ()) -> Fd -> IO ()
336 closeFd mgr close fd = do
337 fds <- modifyMVar (emFds mgr) $ \oldMap -> do
339 case IM.delete (fromIntegral fd) oldMap of
340 (Nothing, _) -> return (oldMap, [])
341 (Just fds, !newMap) -> do
342 when (eventsOf fds /= mempty) $ wakeManager mgr
344 forM_ fds $ \(FdData reg ev cb) -> cb reg (ev `mappend` evtClose)
346 ------------------------------------------------------------------------
347 -- Registering interest in timeout events
349 -- | Register a timeout in the given number of microseconds.
350 registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
351 registerTimeout mgr us cb = do
352 !key <- newUnique (emUniqueSource mgr)
355 now <- getCurrentTime
356 let expTime = fromIntegral us / 1000000.0 + now
358 -- We intentionally do not evaluate the modified map to WHNF here.
359 -- Instead, we leave a thunk inside the IORef and defer its
360 -- evaluation until mkTimeout in the event loop. This is a
361 -- workaround for a nasty IORef contention problem that causes the
362 -- thread-delay benchmark to take 20 seconds instead of 0.2.
363 atomicModifyIORef (emTimeouts mgr) $ \f ->
364 let f' = (Q.insert key expTime cb) . f in (f', ())
368 unregisterTimeout :: EventManager -> TimeoutKey -> IO ()
369 unregisterTimeout mgr (TK key) = do
370 atomicModifyIORef (emTimeouts mgr) $ \f ->
371 let f' = (Q.delete key) . f in (f', ())
374 updateTimeout :: EventManager -> TimeoutKey -> Int -> IO ()
375 updateTimeout mgr (TK key) us = do
376 now <- getCurrentTime
377 let expTime = fromIntegral us / 1000000.0 + now
379 atomicModifyIORef (emTimeouts mgr) $ \f ->
380 let f' = (Q.adjust (const expTime) key) . f in (f', ())
383 ------------------------------------------------------------------------
386 -- | Call the callbacks corresponding to the given file descriptor.
387 onFdEvent :: EventManager -> Fd -> Event -> IO ()
388 onFdEvent mgr fd evs = do
389 fds <- readMVar (emFds mgr)
390 case IM.lookup (fromIntegral fd) fds of
391 Just cbs -> forM_ cbs $ \(FdData reg ev cb) ->
392 when (evs `I.eventIs` ev) $ cb reg evs