1 {-# LANGUAGE BangPatterns, CPP, ExistentialQuantification, NoImplicitPrelude,
2 RecordWildCards, TypeSynonymInstances #-}
3 module System.Event.Manager
20 -- * Registering interest in I/O events
32 -- * Registering interest in timeout events
40 #include "EventConfig.h"
42 ------------------------------------------------------------------------
45 import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, readMVar)
46 import Control.Exception (finally)
47 import Control.Monad ((=<<), forM_, liftM, sequence_, when)
48 import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
50 import Data.Maybe (Maybe(..))
51 import Data.Monoid (mappend, mconcat, mempty)
53 import GHC.Conc.Signal (runHandlers)
54 import GHC.List (filter)
55 import GHC.Num (Num(..))
56 import GHC.Real ((/), fromIntegral )
57 import GHC.Show (Show(..))
58 import System.Event.Clock (getCurrentTime)
59 import System.Event.Control
60 import System.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite,
62 import System.Event.Unique (Unique, UniqueSource, newSource, newUnique)
63 import System.Posix.Types (Fd)
65 import qualified System.Event.IntMap as IM
66 import qualified System.Event.Internal as I
67 import qualified System.Event.PSQ as Q
69 #if defined(HAVE_KQUEUE)
70 import qualified System.Event.KQueue as KQueue
71 #elif defined(HAVE_EPOLL)
72 import qualified System.Event.EPoll as EPoll
73 #elif defined(HAVE_POLL)
74 import qualified System.Event.Poll as Poll
76 # error not implemented for this operating system
79 ------------------------------------------------------------------------
82 data FdData = FdData {
83 fdKey :: {-# UNPACK #-} !FdKey
84 , fdEvents :: {-# UNPACK #-} !Event
85 , _fdCallback :: !IOCallback
88 -- | A file descriptor registration cookie.
90 keyFd :: {-# UNPACK #-} !Fd
91 , keyUnique :: {-# UNPACK #-} !Unique
-- | Callback invoked on I/O events.  It is handed the 'FdKey' of the
-- registration it was stored with, together with an 'Event' value.
type IOCallback = FdKey -> Event -> IO ()
97 instance Show IOCallback where
-- | A timeout registration cookie.  'registerTimeout' returns one, and
-- 'unregisterTimeout' and 'updateTimeout' take one to identify the
-- timeout they act on.
newtype TimeoutKey = TK Unique
-- | Callback invoked on timeout events.  This is a plain 'IO' action
-- that receives no arguments.
type TimeoutCallback = IO ()
-- | A priority search queue, with timeouts as priorities.  The value
-- stored with each entry is the 'TimeoutCallback' to run.
type TimeoutQueue = Q.PSQ TimeoutCallback
117 Instead of directly modifying the 'TimeoutQueue' in
118 e.g. 'registerTimeout' we keep a list of edits to perform, in the form
119 of a chain of function closures, and have the I/O manager thread
perform the edits later.  This exists to address the following GC
123 Since e.g. 'registerTimeout' doesn't force the evaluation of the
124 thunks inside the 'emTimeouts' IORef a number of thunks build up
125 inside the IORef. If the I/O manager thread doesn't evaluate these
126 thunks soon enough they'll get promoted to the old generation and
127 become roots for all subsequent minor GCs.
129 When the thunks eventually get evaluated they will each create a new
130 intermediate 'TimeoutQueue' that immediately becomes garbage. Since
131 the thunks serve as roots until the next major GC these intermediate
'TimeoutQueue's will get copied unnecessarily in the next minor GC,
133 increasing GC time. This problem is known as "floating garbage".
135 Keeping a list of edits doesn't stop this from happening but makes the
136 amount of data that gets copied smaller.
138 TODO: Evaluate the content of the IORef to WHNF on each insert once
139 this bug is resolved: http://hackage.haskell.org/trac/ghc/ticket/3838
-- | An edit to apply to a 'TimeoutQueue'.  Pending edits are chained
-- together with function composition and kept in 'emTimeouts'.
type TimeoutEdit = TimeoutQueue -> TimeoutQueue
145 -- | The event manager state.
146 data EventManager = EventManager
147 { emBackend :: !Backend
148 , emFds :: {-# UNPACK #-} !(MVar (IM.IntMap [FdData]))
149 , emTimeouts :: {-# UNPACK #-} !(IORef TimeoutEdit)
150 , emState :: {-# UNPACK #-} !(IORef State)
151 , emUniqueSource :: {-# UNPACK #-} !UniqueSource
152 , emControl :: {-# UNPACK #-} !Control
155 ------------------------------------------------------------------------
158 handleControlEvent :: EventManager -> FdKey -> Event -> IO ()
159 handleControlEvent mgr reg _evt = do
160 msg <- readControlMessage (emControl mgr) (keyFd reg)
162 CMsgWakeup -> return ()
163 CMsgDie -> writeIORef (emState mgr) Finished
164 CMsgSignal fp s -> runHandlers fp s
166 newDefaultBackend :: IO Backend
167 #if defined(HAVE_KQUEUE)
168 newDefaultBackend = KQueue.new
169 #elif defined(HAVE_EPOLL)
170 newDefaultBackend = EPoll.new
171 #elif defined(HAVE_POLL)
172 newDefaultBackend = Poll.new
174 newDefaultBackend = error "no back end for this platform"
-- | Create a new event manager, built on the backend that
-- 'newDefaultBackend' returns.
new :: IO EventManager
new = do
  backend <- newDefaultBackend
  newWith backend
181 newWith :: Backend -> IO EventManager
183 iofds <- newMVar IM.empty
184 timeouts <- newIORef id
186 state <- newIORef Created
188 _ <- mkWeakIORef state $ do
189 st <- atomicModifyIORef state $ \s -> (Finished, s)
190 when (st /= Finished) $ do
193 let mgr = EventManager { emBackend = be
195 , emTimeouts = timeouts
197 , emUniqueSource = us
200 _ <- registerFd_ mgr (handleControlEvent mgr) (controlReadFd ctrl) evtRead
201 _ <- registerFd_ mgr (handleControlEvent mgr) (wakeupReadFd ctrl) evtRead
204 -- | Asynchronously shuts down the event manager, if running.
205 shutdown :: EventManager -> IO ()
207 state <- atomicModifyIORef (emState mgr) $ \s -> (Dying, s)
208 when (state == Running) $ sendDie (emControl mgr)
-- | Report whether the manager's state is currently 'Finished'.
finished :: EventManager -> IO Bool
finished mgr = do
  current <- readIORef (emState mgr)
  return (current == Finished)
213 cleanup :: EventManager -> IO ()
214 cleanup EventManager{..} = do
215 writeIORef emState Finished
217 closeControl emControl
219 ------------------------------------------------------------------------
222 -- | Start handling events. This function loops until told to stop,
225 -- /Note/: This loop can only be run once per 'EventManager', as it
226 -- closes all of its control resources when it finishes.
227 loop :: EventManager -> IO ()
228 loop mgr@EventManager{..} = do
229 state <- atomicModifyIORef emState $ \s -> case s of
230 Created -> (Running, s)
233 Created -> go Q.empty `finally` cleanup mgr
236 error $ "System.Event.Manager.loop: state is already " ++
239 go q = do (running, q') <- step mgr q
242 step :: EventManager -> TimeoutQueue -> IO (Bool, TimeoutQueue)
243 step mgr@EventManager{..} tq = do
244 (timeout, q') <- mkTimeout tq
245 I.poll emBackend timeout (onFdEvent mgr)
246 state <- readIORef emState
247 state `seq` return (state == Running, q')
250 -- | Call all expired timer callbacks and return the time to the
252 mkTimeout :: TimeoutQueue -> IO (Timeout, TimeoutQueue)
254 now <- getCurrentTime
255 applyEdits <- atomicModifyIORef emTimeouts $ \f -> (id, f)
256 let (expired, q'') = let q' = applyEdits q in q' `seq` Q.atMost now q'
257 sequence_ $ map Q.value expired
258 let timeout = case Q.minView q'' of
260 Just (Q.E _ t _, _) ->
261 -- This value will always be positive since the call
262 -- to 'atMost' above removed any timeouts <= 'now'
263 let t' = t - now in t' `seq` Timeout t'
264 return (timeout, q'')
266 ------------------------------------------------------------------------
267 -- Registering interest in I/O events
269 -- | Register interest in the given events, without waking the event
270 -- manager thread. The 'Bool' return value indicates whether the
271 -- event manager ought to be woken.
272 registerFd_ :: EventManager -> IOCallback -> Fd -> Event
274 registerFd_ EventManager{..} cb fd evs = do
275 u <- newUnique emUniqueSource
276 modifyMVar emFds $ \oldMap -> do
277 let fd' = fromIntegral fd
279 !fdd = FdData reg evs cb
280 (!newMap, (oldEvs, newEvs)) =
281 case IM.insertWith (++) fd' [fdd] oldMap of
282 (Nothing, n) -> (n, (mempty, evs))
283 (Just prev, n) -> (n, pairEvents prev newMap fd')
284 modify = oldEvs /= newEvs
285 when modify $ I.modifyFd emBackend fd oldEvs newEvs
286 return (newMap, (reg, modify))
287 {-# INLINE registerFd_ #-}
289 -- | @registerFd mgr cb fd evs@ registers interest in the events @evs@
290 -- on the file descriptor @fd@. @cb@ is called for each event that
291 -- occurs. Returns a cookie that can be handed to 'unregisterFd'.
292 registerFd :: EventManager -> IOCallback -> Fd -> Event -> IO FdKey
293 registerFd mgr cb fd evs = do
294 (r, wake) <- registerFd_ mgr cb fd evs
295 when wake $ wakeManager mgr
297 {-# INLINE registerFd #-}
-- | Wake up the event manager by calling 'sendWakeup' on its
-- 'emControl' field.
wakeManager :: EventManager -> IO ()
wakeManager = sendWakeup . emControl
-- | Combine the 'fdEvents' of every registration in the list into a
-- single 'Event'.
eventsOf :: [FdData] -> Event
eventsOf regs = mconcat [fdEvents r | r <- regs]
306 pairEvents :: [FdData] -> IM.IntMap [FdData] -> Int -> (Event, Event)
307 pairEvents prev m fd = let l = eventsOf prev
308 r = case IM.lookup fd m of
310 Just fds -> eventsOf fds
313 -- | Drop a previous file descriptor registration, without waking the
314 -- event manager thread. The return value indicates whether the event
315 -- manager ought to be woken.
316 unregisterFd_ :: EventManager -> FdKey -> IO Bool
317 unregisterFd_ EventManager{..} (FdKey fd u) =
318 modifyMVar emFds $ \oldMap -> do
319 let dropReg cbs = case filter ((/= u) . keyUnique . fdKey) cbs of
322 fd' = fromIntegral fd
323 (!newMap, (oldEvs, newEvs)) =
324 case IM.updateWith dropReg fd' oldMap of
325 (Nothing, _) -> (oldMap, (mempty, mempty))
326 (Just prev, newm) -> (newm, pairEvents prev newm fd')
327 modify = oldEvs /= newEvs
328 when modify $ I.modifyFd emBackend fd oldEvs newEvs
329 return (newMap, modify)
-- | Drop a previous file descriptor registration, waking the event
-- manager if 'unregisterFd_' reports that it ought to be woken.
unregisterFd :: EventManager -> FdKey -> IO ()
unregisterFd mgr reg = notify =<< unregisterFd_ mgr reg
  where
    -- Only signal the manager thread when asked to.
    notify mustWake = when mustWake (wakeManager mgr)
337 -- | Close a file descriptor in a race-safe way.
338 closeFd :: EventManager -> (Fd -> IO ()) -> Fd -> IO ()
339 closeFd mgr close fd = do
340 fds <- modifyMVar (emFds mgr) $ \oldMap -> do
342 case IM.delete (fromIntegral fd) oldMap of
343 (Nothing, _) -> return (oldMap, [])
344 (Just fds, !newMap) -> do
345 when (eventsOf fds /= mempty) $ wakeManager mgr
347 forM_ fds $ \(FdData reg ev cb) -> cb reg (ev `mappend` evtClose)
349 ------------------------------------------------------------------------
350 -- Registering interest in timeout events
352 -- | Register a timeout in the given number of microseconds. The
353 -- returned 'TimeoutKey' can be used to later unregister or update the
354 -- timeout. The timeout is automatically unregistered after the given
356 registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
357 registerTimeout mgr us cb = do
358 !key <- newUnique (emUniqueSource mgr)
361 now <- getCurrentTime
362 let expTime = fromIntegral us / 1000000.0 + now
364 -- We intentionally do not evaluate the modified map to WHNF here.
365 -- Instead, we leave a thunk inside the IORef and defer its
366 -- evaluation until mkTimeout in the event loop. This is a
367 -- workaround for a nasty IORef contention problem that causes the
368 -- thread-delay benchmark to take 20 seconds instead of 0.2.
369 atomicModifyIORef (emTimeouts mgr) $ \f ->
370 let f' = (Q.insert key expTime cb) . f in (f', ())
374 -- | Unregister an active timeout.
375 unregisterTimeout :: EventManager -> TimeoutKey -> IO ()
376 unregisterTimeout mgr (TK key) = do
377 atomicModifyIORef (emTimeouts mgr) $ \f ->
378 let f' = (Q.delete key) . f in (f', ())
381 -- | Update an active timeout to fire in the given number of
383 updateTimeout :: EventManager -> TimeoutKey -> Int -> IO ()
384 updateTimeout mgr (TK key) us = do
385 now <- getCurrentTime
386 let expTime = fromIntegral us / 1000000.0 + now
388 atomicModifyIORef (emTimeouts mgr) $ \f ->
389 let f' = (Q.adjust (const expTime) key) . f in (f', ())
392 ------------------------------------------------------------------------
395 -- | Call the callbacks corresponding to the given file descriptor.
396 onFdEvent :: EventManager -> Fd -> Event -> IO ()
397 onFdEvent mgr fd evs = do
398 fds <- readMVar (emFds mgr)
399 case IM.lookup (fromIntegral fd) fds of
400 Just cbs -> forM_ cbs $ \(FdData reg ev cb) ->
401 when (evs `I.eventIs` ev) $ cb reg evs