1 {-# LANGUAGE BangPatterns, CPP, ExistentialQuantification, NoImplicitPrelude,
2 RecordWildCards, TypeSynonymInstances #-}
3 module System.Event.Manager
19 -- * Registering interest in I/O events
31 -- * Registering interest in timeout events
39 #include "EventConfig.h"
41 ------------------------------------------------------------------------
44 import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, readMVar)
45 import Control.Exception (finally)
46 import Control.Monad ((=<<), forM_, liftM, sequence_, when)
47 import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
49 import Data.Maybe (Maybe(..))
50 import Data.Monoid (mappend, mconcat, mempty)
52 import GHC.Conc.Signal (runHandlers)
53 import GHC.List (filter)
54 import GHC.Num (Num(..))
55 import GHC.Real ((/), fromIntegral )
56 import GHC.Show (Show(..))
57 import System.Event.Clock (getCurrentTime)
58 import System.Event.Control
59 import System.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite,
61 import System.Event.Unique (Unique, UniqueSource, newSource, newUnique)
62 import System.Posix.Types (Fd)
64 import qualified System.Event.IntMap as IM
65 import qualified System.Event.Internal as I
66 import qualified System.Event.PSQ as Q
68 #if defined(HAVE_KQUEUE)
69 import qualified System.Event.KQueue as KQueue
70 #elif defined(HAVE_EPOLL)
71 import qualified System.Event.EPoll as EPoll
72 #elif defined(HAVE_POLL)
73 import qualified System.Event.Poll as Poll
75 # error not implemented for this operating system
78 ------------------------------------------------------------------------
81 data FdData = FdData {
82 fdKey :: {-# UNPACK #-} !FdKey
83 , fdEvents :: {-# UNPACK #-} !Event
84 , _fdCallback :: !IOCallback
87 -- | A file descriptor registration cookie.
89 keyFd :: {-# UNPACK #-} !Fd
90 , keyUnique :: {-# UNPACK #-} !Unique
93 -- | Callback invoked on I/O events.
94 type IOCallback = FdKey -> Event -> IO ()
96 instance Show IOCallback where
99 -- | A timeout registration cookie.
100 newtype TimeoutKey = TK Unique
103 -- | Callback invoked on timeout events.
104 type TimeoutCallback = IO ()
112 -- | A priority search queue, with timeouts as priorities.
113 type TimeoutQueue = Q.PSQ TimeoutCallback
116 Instead of directly modifying the 'TimeoutQueue' in
117 e.g. 'registerTimeout' we keep a list of edits to perform, in the form
118 of a chain of function closures, and have the I/O manager thread
119 perform the edits later. This exist to address the following GC
122 Since e.g. 'registerTimeout' doesn't force the evaluation of the
123 thunks inside the 'emTimeouts' IORef a number of thunks build up
124 inside the IORef. If the I/O manager thread doesn't evaluate these
125 thunks soon enough they'll get promoted to the old generation and
126 become roots for all subsequent minor GCs.
128 When the thunks eventually get evaluated they will each create a new
129 intermediate 'TimeoutQueue' that immediately becomes garbage. Since
130 the thunks serve as roots until the next major GC these intermediate
131 'TimeoutQueue's will get copied unnecesarily in the next minor GC,
132 increasing GC time. This problem is known as "floating garbage".
134 Keeping a list of edits doesn't stop this from happening but makes the
135 amount of data that gets copied smaller.
137 TODO: Evaluate the content of the IORef to WHNF on each insert once
138 this bug is resolved: http://hackage.haskell.org/trac/ghc/ticket/3838
141 -- | An edit to apply to a 'TimeoutQueue'.
142 type TimeoutEdit = TimeoutQueue -> TimeoutQueue
144 -- | The event manager state.
145 data EventManager = EventManager
146 { emBackend :: !Backend
147 , emFds :: {-# UNPACK #-} !(MVar (IM.IntMap [FdData]))
148 , emTimeouts :: {-# UNPACK #-} !(IORef TimeoutEdit)
149 , emState :: {-# UNPACK #-} !(IORef State)
150 , emUniqueSource :: {-# UNPACK #-} !UniqueSource
151 , emControl :: {-# UNPACK #-} !Control
154 ------------------------------------------------------------------------
157 handleControlEvent :: EventManager -> FdKey -> Event -> IO ()
158 handleControlEvent mgr reg _evt = do
159 msg <- readControlMessage (emControl mgr) (keyFd reg)
161 CMsgWakeup -> return ()
162 CMsgDie -> writeIORef (emState mgr) Finished
163 CMsgSignal fp s -> runHandlers fp s
165 newDefaultBackend :: IO Backend
166 #if defined(HAVE_KQUEUE)
167 newDefaultBackend = KQueue.new
168 #elif defined(HAVE_EPOLL)
169 newDefaultBackend = EPoll.new
170 #elif defined(HAVE_POLL)
171 newDefaultBackend = Poll.new
173 newDefaultBackend = error "no back end for this platform"
176 -- | Create a new event manager.
177 new :: IO EventManager
178 new = newWith =<< newDefaultBackend
180 newWith :: Backend -> IO EventManager
182 iofds <- newMVar IM.empty
183 timeouts <- newIORef id
185 state <- newIORef Created
187 _ <- mkWeakIORef state $ do
188 st <- atomicModifyIORef state $ \s -> (Finished, s)
189 when (st /= Finished) $ do
192 let mgr = EventManager { emBackend = be
194 , emTimeouts = timeouts
196 , emUniqueSource = us
199 _ <- registerFd_ mgr (handleControlEvent mgr) (controlReadFd ctrl) evtRead
200 _ <- registerFd_ mgr (handleControlEvent mgr) (wakeupReadFd ctrl) evtRead
203 -- | Asynchronously shuts down the event manager, if running.
204 shutdown :: EventManager -> IO ()
206 state <- atomicModifyIORef (emState mgr) $ \s -> (Dying, s)
207 when (state == Running) $ sendDie (emControl mgr)
209 finished :: EventManager -> IO Bool
210 finished mgr = (== Finished) `liftM` readIORef (emState mgr)
212 cleanup :: EventManager -> IO ()
213 cleanup EventManager{..} = do
214 writeIORef emState Finished
216 closeControl emControl
218 ------------------------------------------------------------------------
221 -- | Start handling events. This function loops until told to stop,
224 -- /Note/: This loop can only be run once per 'EventManager', as it
225 -- closes all of its control resources when it finishes.
226 loop :: EventManager -> IO ()
227 loop mgr@EventManager{..} = do
228 state <- atomicModifyIORef emState $ \s -> case s of
229 Created -> (Running, s)
232 Created -> go Q.empty `finally` cleanup mgr
235 error $ "System.Event.Manager.loop: state is already " ++
238 go q = do (running, q') <- step mgr q
241 step :: EventManager -> TimeoutQueue -> IO (Bool, TimeoutQueue)
242 step mgr@EventManager{..} tq = do
243 (timeout, q') <- mkTimeout tq
244 I.poll emBackend timeout (onFdEvent mgr)
245 state <- readIORef emState
246 state `seq` return (state == Running, q')
249 -- | Call all expired timer callbacks and return the time to the
251 mkTimeout :: TimeoutQueue -> IO (Timeout, TimeoutQueue)
253 now <- getCurrentTime
254 applyEdits <- atomicModifyIORef emTimeouts $ \f -> (id, f)
255 let (expired, q'') = let q' = applyEdits q in q' `seq` Q.atMost now q'
256 sequence_ $ map Q.value expired
257 let timeout = case Q.minView q'' of
259 Just (Q.E _ t _, _) ->
260 -- This value will always be positive since the call
261 -- to 'atMost' above removed any timeouts <= 'now'
262 let t' = t - now in t' `seq` Timeout t'
263 return (timeout, q'')
265 ------------------------------------------------------------------------
266 -- Registering interest in I/O events
268 -- | Register interest in the given events, without waking the event
269 -- manager thread. The 'Bool' return value indicates whether the
270 -- event manager ought to be woken.
271 registerFd_ :: EventManager -> IOCallback -> Fd -> Event
273 registerFd_ EventManager{..} cb fd evs = do
274 u <- newUnique emUniqueSource
275 modifyMVar emFds $ \oldMap -> do
276 let fd' = fromIntegral fd
278 !fdd = FdData reg evs cb
279 (!newMap, (oldEvs, newEvs)) =
280 case IM.insertWith (++) fd' [fdd] oldMap of
281 (Nothing, n) -> (n, (mempty, evs))
282 (Just prev, n) -> (n, pairEvents prev newMap fd')
283 modify = oldEvs /= newEvs
284 when modify $ I.modifyFd emBackend fd oldEvs newEvs
285 return (newMap, (reg, modify))
286 {-# INLINE registerFd_ #-}
288 -- | @registerFd mgr cb fd evs@ registers interest in the events @evs@
289 -- on the file descriptor @fd@. @cb@ is called for each event that
290 -- occurs. Returns a cookie that can be handed to 'unregisterFd'.
291 registerFd :: EventManager -> IOCallback -> Fd -> Event -> IO FdKey
292 registerFd mgr cb fd evs = do
293 (r, wake) <- registerFd_ mgr cb fd evs
294 when wake $ wakeManager mgr
296 {-# INLINE registerFd #-}
298 -- | Wake up the event manager.
299 wakeManager :: EventManager -> IO ()
300 wakeManager mgr = sendWakeup (emControl mgr)
302 eventsOf :: [FdData] -> Event
303 eventsOf = mconcat . map fdEvents
305 pairEvents :: [FdData] -> IM.IntMap [FdData] -> Int -> (Event, Event)
306 pairEvents prev m fd = let l = eventsOf prev
307 r = case IM.lookup fd m of
309 Just fds -> eventsOf fds
312 -- | Drop a previous file descriptor registration, without waking the
313 -- event manager thread. The return value indicates whether the event
314 -- manager ought to be woken.
315 unregisterFd_ :: EventManager -> FdKey -> IO Bool
316 unregisterFd_ EventManager{..} (FdKey fd u) =
317 modifyMVar emFds $ \oldMap -> do
318 let dropReg cbs = case filter ((/= u) . keyUnique . fdKey) cbs of
321 fd' = fromIntegral fd
322 (!newMap, (oldEvs, newEvs)) =
323 case IM.updateWith dropReg fd' oldMap of
324 (Nothing, _) -> (oldMap, (mempty, mempty))
325 (Just prev, newm) -> (newm, pairEvents prev newm fd')
326 modify = oldEvs /= newEvs
327 when modify $ I.modifyFd emBackend fd oldEvs newEvs
328 return (newMap, modify)
330 -- | Drop a previous file descriptor registration.
331 unregisterFd :: EventManager -> FdKey -> IO ()
332 unregisterFd mgr reg = do
333 wake <- unregisterFd_ mgr reg
334 when wake $ wakeManager mgr
336 -- | Close a file descriptor in a race-safe way.
337 closeFd :: EventManager -> (Fd -> IO ()) -> Fd -> IO ()
338 closeFd mgr close fd = do
339 fds <- modifyMVar (emFds mgr) $ \oldMap -> do
341 case IM.delete (fromIntegral fd) oldMap of
342 (Nothing, _) -> return (oldMap, [])
343 (Just fds, !newMap) -> do
344 when (eventsOf fds /= mempty) $ wakeManager mgr
346 forM_ fds $ \(FdData reg ev cb) -> cb reg (ev `mappend` evtClose)
348 ------------------------------------------------------------------------
349 -- Registering interest in timeout events
351 -- | Register a timeout in the given number of microseconds. The
352 -- returned 'TimeoutKey' can be used to later unregister or update the
353 -- timeout. The timeout is automatically unregistered after the given
355 registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
356 registerTimeout mgr us cb = do
357 !key <- newUnique (emUniqueSource mgr)
360 now <- getCurrentTime
361 let expTime = fromIntegral us / 1000000.0 + now
363 -- We intentionally do not evaluate the modified map to WHNF here.
364 -- Instead, we leave a thunk inside the IORef and defer its
365 -- evaluation until mkTimeout in the event loop. This is a
366 -- workaround for a nasty IORef contention problem that causes the
367 -- thread-delay benchmark to take 20 seconds instead of 0.2.
368 atomicModifyIORef (emTimeouts mgr) $ \f ->
369 let f' = (Q.insert key expTime cb) . f in (f', ())
373 -- | Unregister an active timeout.
374 unregisterTimeout :: EventManager -> TimeoutKey -> IO ()
375 unregisterTimeout mgr (TK key) = do
376 atomicModifyIORef (emTimeouts mgr) $ \f ->
377 let f' = (Q.delete key) . f in (f', ())
380 -- | Update an active timeout to fire in the given number of
382 updateTimeout :: EventManager -> TimeoutKey -> Int -> IO ()
383 updateTimeout mgr (TK key) us = do
384 now <- getCurrentTime
385 let expTime = fromIntegral us / 1000000.0 + now
387 atomicModifyIORef (emTimeouts mgr) $ \f ->
388 let f' = (Q.adjust (const expTime) key) . f in (f', ())
391 ------------------------------------------------------------------------
394 -- | Call the callbacks corresponding to the given file descriptor.
395 onFdEvent :: EventManager -> Fd -> Event -> IO ()
396 onFdEvent mgr fd evs = do
397 fds <- readMVar (emFds mgr)
398 case IM.lookup (fromIntegral fd) fds of
399 Just cbs -> forM_ cbs $ \(FdData reg ev cb) ->
400 when (evs `I.eventIs` ev) $ cb reg evs