1 {-# LANGUAGE BangPatterns, CPP, ExistentialQuantification, NoImplicitPrelude,
2 RecordWildCards, TypeSynonymInstances #-}
3 module System.Event.Manager
19 -- * Registering interest in I/O events
31 -- * Registering interest in timeout events
39 #include "EventConfig.h"
41 ------------------------------------------------------------------------
44 import Control.Concurrent.MVar (MVar, modifyMVar, modifyMVar_, newMVar,
46 import Control.Exception (finally)
47 import Control.Monad ((=<<), forM_, liftM, sequence_, when)
48 import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
50 import Data.Maybe (Maybe(..))
51 import Data.Monoid (mconcat, mempty)
53 import GHC.Conc.Signal (runHandlers)
54 import GHC.List (filter)
55 import GHC.Num (Num(..))
56 import GHC.Real ((/), fromIntegral, fromRational)
57 import GHC.Show (Show(..))
58 import System.Event.Clock (getCurrentTime)
59 import System.Event.Control
60 import System.Event.Internal (Backend, Event, evtRead, evtWrite, Timeout(..))
61 import System.Event.Unique (Unique, UniqueSource, newSource, newUnique)
62 import System.Posix.Types (Fd)
64 import qualified System.Event.IntMap as IM
65 import qualified System.Event.Internal as I
66 import qualified System.Event.PSQ as Q
68 #if defined(HAVE_KQUEUE)
69 import qualified System.Event.KQueue as KQueue
70 #elif defined(HAVE_EPOLL)
71 import qualified System.Event.EPoll as EPoll
72 #elif defined(HAVE_POLL)
73 import qualified System.Event.Poll as Poll
75 # error not implemented for this operating system
78 ------------------------------------------------------------------------
81 data FdData = FdData {
82 fdKey :: {-# UNPACK #-} !FdKey
83 , fdEvents :: {-# UNPACK #-} !Event
84 , _fdCallback :: !IOCallback
87 -- | A file descriptor registration cookie.
89 keyFd :: {-# UNPACK #-} !Fd
90 , keyUnique :: {-# UNPACK #-} !Unique
93 -- | Callback invoked on I/O events.
94 type IOCallback = FdKey -> Event -> IO ()
96 instance Show IOCallback where
99 newtype TimeoutKey = TK Unique
102 -- | Callback invoked on timeout events.
103 type TimeoutCallback = IO ()
111 -- | A priority search queue, with timeouts as priorities.
112 type TimeoutQueue = Q.PSQ TimeoutCallback
115 Instead of directly modifying the 'TimeoutQueue' in
116 e.g. 'registerTimeout' we keep a list of edits to perform, in the form
117 of a chain of function closures, and have the I/O manager thread
118 perform the edits later. This exist to address the following GC
121 Since e.g. 'registerTimeout' doesn't force the evaluation of the
122 thunks inside the 'emTimeouts' IORef a number of thunks build up
123 inside the IORef. If the I/O manager thread doesn't evaluate these
124 thunks soon enough they'll get promoted to the old generation and
125 become roots for all subsequent minor GCs.
127 When the thunks eventually get evaluated they will each create a new
128 intermediate 'TimeoutQueue' that immediately becomes garbage. Since
129 the thunks serve as roots until the next major GC these intermediate
130 'TimeoutQueue's will get copied unnecesarily in the next minor GC,
131 increasing GC time. This problem is known as "floating garbage".
133 Keeping a list of edits doesn't stop this from happening but makes the
134 amount of data that gets copied smaller.
136 TODO: Evaluate the content of the IORef to WHNF on each insert once
137 this bug is resolved: http://hackage.haskell.org/trac/ghc/ticket/3838
140 -- | An edit to apply to a 'TimeoutQueue'.
141 type TimeoutEdit = TimeoutQueue -> TimeoutQueue
143 -- | The event manager state.
144 data EventManager = EventManager
145 { emBackend :: !Backend
146 , emFds :: {-# UNPACK #-} !(MVar (IM.IntMap [FdData]))
147 , emTimeouts :: {-# UNPACK #-} !(IORef TimeoutEdit)
148 , emState :: {-# UNPACK #-} !(IORef State)
149 , emUniqueSource :: {-# UNPACK #-} !UniqueSource
150 , emControl :: {-# UNPACK #-} !Control
153 ------------------------------------------------------------------------
156 handleControlEvent :: EventManager -> FdKey -> Event -> IO ()
157 handleControlEvent mgr reg _evt = do
158 msg <- readControlMessage (emControl mgr) (keyFd reg)
160 CMsgWakeup -> return ()
161 CMsgDie -> writeIORef (emState mgr) Finished
162 CMsgSignal fp s -> runHandlers fp s
164 newDefaultBackend :: IO Backend
165 #if defined(HAVE_KQUEUE)
166 newDefaultBackend = KQueue.new
167 #elif defined(HAVE_EPOLL)
168 newDefaultBackend = EPoll.new
169 #elif defined(HAVE_POLL)
170 newDefaultBackend = Poll.new
172 newDefaultBackend = error "no back end for this platform"
175 -- | Create a new event manager.
176 new :: IO EventManager
177 new = newWith =<< newDefaultBackend
179 newWith :: Backend -> IO EventManager
181 iofds <- newMVar IM.empty
182 timeouts <- newIORef id
184 state <- newIORef Created
186 _ <- mkWeakIORef state $ do
187 st <- atomicModifyIORef state $ \s -> (Finished, s)
188 when (st /= Finished) $ do
191 let mgr = EventManager { emBackend = be
193 , emTimeouts = timeouts
195 , emUniqueSource = us
198 _ <- registerFd_ mgr (handleControlEvent mgr) (controlReadFd ctrl) evtRead
199 _ <- registerFd_ mgr (handleControlEvent mgr) (wakeupReadFd ctrl) evtRead
202 -- | Asynchronously shuts down the event manager, if running.
203 shutdown :: EventManager -> IO ()
205 state <- atomicModifyIORef (emState mgr) $ \s -> (Dying, s)
206 when (state == Running) $ sendDie (emControl mgr)
208 finished :: EventManager -> IO Bool
209 finished mgr = (== Finished) `liftM` readIORef (emState mgr)
211 cleanup :: EventManager -> IO ()
212 cleanup EventManager{..} = do
213 writeIORef emState Finished
215 closeControl emControl
217 ------------------------------------------------------------------------
220 -- | Start handling events. This function loops until told to stop.
222 -- /Note/: This loop can only be run once per 'EventManager', as it
223 -- closes all of its control resources when it finishes.
224 loop :: EventManager -> IO ()
225 loop mgr@EventManager{..} = do
226 state <- atomicModifyIORef emState $ \s -> case s of
227 Created -> (Running, s)
230 Created -> go Q.empty `finally` cleanup mgr
233 error $ "System.Event.Manager.loop: state is already " ++
236 go q = do (running, q') <- step mgr q
239 step :: EventManager -> TimeoutQueue -> IO (Bool, TimeoutQueue)
240 step mgr@EventManager{..} tq = do
241 (timeout, q') <- mkTimeout tq
242 I.poll emBackend timeout (onFdEvent mgr)
243 state <- readIORef emState
244 state `seq` return (state == Running, q')
247 -- | Call all expired timer callbacks and return the time to the
249 mkTimeout :: TimeoutQueue -> IO (Timeout, TimeoutQueue)
251 now <- getCurrentTime
252 applyEdits <- atomicModifyIORef emTimeouts $ \f -> (id, f)
253 let (expired, q'') = let q' = applyEdits q in q' `seq` Q.atMost now q'
254 sequence_ $ map Q.value expired
255 let timeout = case Q.minView q'' of
257 Just (Q.E _ t _, _) ->
258 -- This value will always be positive since the call
259 -- to 'atMost' above removed any timeouts <= 'now'
260 let t' = t - now in t' `seq` Timeout t'
261 return (timeout, q'')
263 ------------------------------------------------------------------------
264 -- Registering interest in I/O events
266 -- | Register interest in the given events, without waking the event
267 -- manager thread. The 'Bool' return value indicates whether the
268 -- event manager ought to be woken.
269 registerFd_ :: EventManager -> IOCallback -> Fd -> Event
271 registerFd_ EventManager{..} cb fd evs = do
272 u <- newUnique emUniqueSource
273 modifyMVar emFds $ \oldMap -> do
274 let fd' = fromIntegral fd
276 !fdd = FdData reg evs cb
277 (!newMap, (oldEvs, newEvs)) =
278 case IM.insertWith (++) fd' [fdd] oldMap of
279 (Nothing, n) -> (n, (mempty, evs))
280 (Just prev, n) -> (n, pairEvents prev newMap fd')
281 modify = oldEvs /= newEvs
282 when modify $ I.modifyFd emBackend fd oldEvs newEvs
283 return (newMap, (reg, modify))
284 {-# INLINE registerFd_ #-}
286 -- | @registerFd mgr cb fd evs@ registers interest in the events @evs@
287 -- on the file descriptor @fd@. @cb@ is called for each event that
288 -- occurs. Returns a cookie that can be handed to 'unregisterFd'.
289 registerFd :: EventManager -> IOCallback -> Fd -> Event -> IO FdKey
290 registerFd mgr cb fd evs = do
291 (r, wake) <- registerFd_ mgr cb fd evs
292 when wake $ wakeManager mgr
294 {-# INLINE registerFd #-}
296 -- | Wake up the event manager.
297 wakeManager :: EventManager -> IO ()
298 wakeManager mgr = sendWakeup (emControl mgr)
300 eventsOf :: [FdData] -> Event
301 eventsOf = mconcat . map fdEvents
303 pairEvents :: [FdData] -> IM.IntMap [FdData] -> Int -> (Event, Event)
304 pairEvents prev m fd = let l = eventsOf prev
305 r = case IM.lookup fd m of
307 Just fds -> eventsOf fds
310 -- | Drop a previous file descriptor registration, without waking the
311 -- event manager thread. The return value indicates whether the event
312 -- manager ought to be woken.
313 unregisterFd_ :: EventManager -> FdKey -> IO Bool
314 unregisterFd_ EventManager{..} (FdKey fd u) =
315 modifyMVar emFds $ \oldMap -> do
316 let dropReg cbs = case filter ((/= u) . keyUnique . fdKey) cbs of
319 fd' = fromIntegral fd
320 (!newMap, (oldEvs, newEvs)) =
321 case IM.updateWith dropReg fd' oldMap of
322 (Nothing, _) -> (oldMap, (mempty, mempty))
323 (Just prev, newm) -> (newm, pairEvents prev newm fd')
324 modify = oldEvs /= newEvs
325 when modify $ I.modifyFd emBackend fd oldEvs newEvs
326 return (newMap, modify)
328 -- | Drop a previous file descriptor registration.
329 unregisterFd :: EventManager -> FdKey -> IO ()
330 unregisterFd mgr reg = do
331 wake <- unregisterFd_ mgr reg
332 when wake $ wakeManager mgr
334 -- | Notify the event manager that a file descriptor has been closed.
335 fdWasClosed :: EventManager -> Fd -> IO ()
337 modifyMVar_ (emFds mgr) $ \oldMap ->
338 case IM.delete (fromIntegral fd) oldMap of
339 (Nothing, _) -> return oldMap
340 (Just fds, !newMap) -> do
341 when (eventsOf fds /= mempty) $ wakeManager mgr
344 ------------------------------------------------------------------------
345 -- Registering interest in timeout events
347 -- | Register a timeout in the given number of microseconds.
348 registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
349 registerTimeout mgr us cb = do
350 !key <- newUnique (emUniqueSource mgr)
353 now <- getCurrentTime
354 let expTime = fromIntegral us / 1000000.0 + now
356 -- We intentionally do not evaluate the modified map to WHNF here.
357 -- Instead, we leave a thunk inside the IORef and defer its
358 -- evaluation until mkTimeout in the event loop. This is a
359 -- workaround for a nasty IORef contention problem that causes the
360 -- thread-delay benchmark to take 20 seconds instead of 0.2.
361 atomicModifyIORef (emTimeouts mgr) $ \f ->
362 let f' = (Q.insert key expTime cb) . f in (f', ())
366 unregisterTimeout :: EventManager -> TimeoutKey -> IO ()
367 unregisterTimeout mgr (TK key) = do
368 atomicModifyIORef (emTimeouts mgr) $ \f ->
369 let f' = (Q.delete key) . f in (f', ())
372 updateTimeout :: EventManager -> TimeoutKey -> Int -> IO ()
373 updateTimeout mgr (TK key) us = do
374 now <- getCurrentTime
375 let expTime = fromIntegral us / 1000000.0 + now
377 atomicModifyIORef (emTimeouts mgr) $ \f ->
378 let f' = (Q.adjust (const expTime) key) . f in (f', ())
381 ------------------------------------------------------------------------
384 -- | Call the callbacks corresponding to the given file descriptor.
385 onFdEvent :: EventManager -> Fd -> Event -> IO ()
386 onFdEvent mgr fd evs = do
387 fds <- readMVar (emFds mgr)
388 case IM.lookup (fromIntegral fd) fds of
389 Just cbs -> forM_ cbs $ \(FdData reg ev cb) ->
390 when (evs `I.eventIs` ev) $ cb reg evs