1 {-# LANGUAGE BangPatterns, CPP, ExistentialQuantification, NoImplicitPrelude,
2 RecordWildCards, TypeSynonymInstances #-}
3 module System.Event.Manager
19 -- * Registering interest in I/O events
31 -- * Registering interest in timeout events
39 #include "EventConfig.h"
41 ------------------------------------------------------------------------
44 import Control.Concurrent.MVar (MVar, modifyMVar, modifyMVar_, newMVar,
46 import Control.Exception (finally)
47 import Control.Monad ((=<<), forM_, liftM, sequence_, when)
48 import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
50 import Data.Maybe (Maybe(..))
51 import Data.Monoid (mappend, mconcat, mempty)
53 import GHC.Conc.Signal (runHandlers)
54 import GHC.List (filter)
55 import GHC.Num (Num(..))
56 import GHC.Real ((/), fromIntegral )
57 import GHC.Show (Show(..))
58 import System.Event.Clock (getCurrentTime)
59 import System.Event.Control
60 import System.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite,
62 import System.Event.Unique (Unique, UniqueSource, newSource, newUnique)
63 import System.Posix.Types (Fd)
65 import qualified System.Event.IntMap as IM
66 import qualified System.Event.Internal as I
67 import qualified System.Event.PSQ as Q
69 #if defined(HAVE_KQUEUE)
70 import qualified System.Event.KQueue as KQueue
71 #elif defined(HAVE_EPOLL)
72 import qualified System.Event.EPoll as EPoll
73 #elif defined(HAVE_POLL)
74 import qualified System.Event.Poll as Poll
76 # error not implemented for this operating system
79 ------------------------------------------------------------------------
82 data FdData = FdData {
83 fdKey :: {-# UNPACK #-} !FdKey
84 , fdEvents :: {-# UNPACK #-} !Event
85 , _fdCallback :: !IOCallback
88 -- | A file descriptor registration cookie.
90 keyFd :: {-# UNPACK #-} !Fd
91 , keyUnique :: {-# UNPACK #-} !Unique
94 -- | Callback invoked on I/O events.
95 type IOCallback = FdKey -> Event -> IO ()
97 instance Show IOCallback where
100 newtype TimeoutKey = TK Unique
103 -- | Callback invoked on timeout events.
104 type TimeoutCallback = IO ()
112 -- | A priority search queue, with timeouts as priorities.
113 type TimeoutQueue = Q.PSQ TimeoutCallback
116 Instead of directly modifying the 'TimeoutQueue' in
117 e.g. 'registerTimeout' we keep a list of edits to perform, in the form
118 of a chain of function closures, and have the I/O manager thread
119 perform the edits later. This exist to address the following GC
122 Since e.g. 'registerTimeout' doesn't force the evaluation of the
123 thunks inside the 'emTimeouts' IORef a number of thunks build up
124 inside the IORef. If the I/O manager thread doesn't evaluate these
125 thunks soon enough they'll get promoted to the old generation and
126 become roots for all subsequent minor GCs.
128 When the thunks eventually get evaluated they will each create a new
129 intermediate 'TimeoutQueue' that immediately becomes garbage. Since
130 the thunks serve as roots until the next major GC these intermediate
131 'TimeoutQueue's will get copied unnecesarily in the next minor GC,
132 increasing GC time. This problem is known as "floating garbage".
134 Keeping a list of edits doesn't stop this from happening but makes the
135 amount of data that gets copied smaller.
137 TODO: Evaluate the content of the IORef to WHNF on each insert once
138 this bug is resolved: http://hackage.haskell.org/trac/ghc/ticket/3838
141 -- | An edit to apply to a 'TimeoutQueue'.
142 type TimeoutEdit = TimeoutQueue -> TimeoutQueue
144 -- | The event manager state.
145 data EventManager = EventManager
146 { emBackend :: !Backend
147 , emFds :: {-# UNPACK #-} !(MVar (IM.IntMap [FdData]))
148 , emTimeouts :: {-# UNPACK #-} !(IORef TimeoutEdit)
149 , emState :: {-# UNPACK #-} !(IORef State)
150 , emUniqueSource :: {-# UNPACK #-} !UniqueSource
151 , emControl :: {-# UNPACK #-} !Control
154 ------------------------------------------------------------------------
157 handleControlEvent :: EventManager -> FdKey -> Event -> IO ()
158 handleControlEvent mgr reg _evt = do
159 msg <- readControlMessage (emControl mgr) (keyFd reg)
161 CMsgWakeup -> return ()
162 CMsgDie -> writeIORef (emState mgr) Finished
163 CMsgSignal fp s -> runHandlers fp s
165 newDefaultBackend :: IO Backend
166 #if defined(HAVE_KQUEUE)
167 newDefaultBackend = KQueue.new
168 #elif defined(HAVE_EPOLL)
169 newDefaultBackend = EPoll.new
170 #elif defined(HAVE_POLL)
171 newDefaultBackend = Poll.new
173 newDefaultBackend = error "no back end for this platform"
176 -- | Create a new event manager.
177 new :: IO EventManager
178 new = newWith =<< newDefaultBackend
180 newWith :: Backend -> IO EventManager
182 iofds <- newMVar IM.empty
183 timeouts <- newIORef id
185 state <- newIORef Created
187 _ <- mkWeakIORef state $ do
188 st <- atomicModifyIORef state $ \s -> (Finished, s)
189 when (st /= Finished) $ do
192 let mgr = EventManager { emBackend = be
194 , emTimeouts = timeouts
196 , emUniqueSource = us
199 _ <- registerFd_ mgr (handleControlEvent mgr) (controlReadFd ctrl) evtRead
200 _ <- registerFd_ mgr (handleControlEvent mgr) (wakeupReadFd ctrl) evtRead
203 -- | Asynchronously shuts down the event manager, if running.
204 shutdown :: EventManager -> IO ()
206 state <- atomicModifyIORef (emState mgr) $ \s -> (Dying, s)
207 when (state == Running) $ sendDie (emControl mgr)
209 finished :: EventManager -> IO Bool
210 finished mgr = (== Finished) `liftM` readIORef (emState mgr)
212 cleanup :: EventManager -> IO ()
213 cleanup EventManager{..} = do
214 writeIORef emState Finished
216 closeControl emControl
218 ------------------------------------------------------------------------
221 -- | Start handling events. This function loops until told to stop.
223 -- /Note/: This loop can only be run once per 'EventManager', as it
224 -- closes all of its control resources when it finishes.
225 loop :: EventManager -> IO ()
226 loop mgr@EventManager{..} = do
227 state <- atomicModifyIORef emState $ \s -> case s of
228 Created -> (Running, s)
231 Created -> go Q.empty `finally` cleanup mgr
234 error $ "System.Event.Manager.loop: state is already " ++
237 go q = do (running, q') <- step mgr q
240 step :: EventManager -> TimeoutQueue -> IO (Bool, TimeoutQueue)
241 step mgr@EventManager{..} tq = do
242 (timeout, q') <- mkTimeout tq
243 I.poll emBackend timeout (onFdEvent mgr)
244 state <- readIORef emState
245 state `seq` return (state == Running, q')
248 -- | Call all expired timer callbacks and return the time to the
250 mkTimeout :: TimeoutQueue -> IO (Timeout, TimeoutQueue)
252 now <- getCurrentTime
253 applyEdits <- atomicModifyIORef emTimeouts $ \f -> (id, f)
254 let (expired, q'') = let q' = applyEdits q in q' `seq` Q.atMost now q'
255 sequence_ $ map Q.value expired
256 let timeout = case Q.minView q'' of
258 Just (Q.E _ t _, _) ->
259 -- This value will always be positive since the call
260 -- to 'atMost' above removed any timeouts <= 'now'
261 let t' = t - now in t' `seq` Timeout t'
262 return (timeout, q'')
264 ------------------------------------------------------------------------
265 -- Registering interest in I/O events
267 -- | Register interest in the given events, without waking the event
268 -- manager thread. The 'Bool' return value indicates whether the
269 -- event manager ought to be woken.
270 registerFd_ :: EventManager -> IOCallback -> Fd -> Event
272 registerFd_ EventManager{..} cb fd evs = do
273 u <- newUnique emUniqueSource
274 modifyMVar emFds $ \oldMap -> do
275 let fd' = fromIntegral fd
277 !fdd = FdData reg evs cb
278 (!newMap, (oldEvs, newEvs)) =
279 case IM.insertWith (++) fd' [fdd] oldMap of
280 (Nothing, n) -> (n, (mempty, evs))
281 (Just prev, n) -> (n, pairEvents prev newMap fd')
282 modify = oldEvs /= newEvs
283 when modify $ I.modifyFd emBackend fd oldEvs newEvs
284 return (newMap, (reg, modify))
285 {-# INLINE registerFd_ #-}
287 -- | @registerFd mgr cb fd evs@ registers interest in the events @evs@
288 -- on the file descriptor @fd@. @cb@ is called for each event that
289 -- occurs. Returns a cookie that can be handed to 'unregisterFd'.
290 registerFd :: EventManager -> IOCallback -> Fd -> Event -> IO FdKey
291 registerFd mgr cb fd evs = do
292 (r, wake) <- registerFd_ mgr cb fd evs
293 when wake $ wakeManager mgr
295 {-# INLINE registerFd #-}
297 -- | Wake up the event manager.
298 wakeManager :: EventManager -> IO ()
299 wakeManager mgr = sendWakeup (emControl mgr)
301 eventsOf :: [FdData] -> Event
302 eventsOf = mconcat . map fdEvents
304 pairEvents :: [FdData] -> IM.IntMap [FdData] -> Int -> (Event, Event)
305 pairEvents prev m fd = let l = eventsOf prev
306 r = case IM.lookup fd m of
308 Just fds -> eventsOf fds
311 -- | Drop a previous file descriptor registration, without waking the
312 -- event manager thread. The return value indicates whether the event
313 -- manager ought to be woken.
314 unregisterFd_ :: EventManager -> FdKey -> IO Bool
315 unregisterFd_ EventManager{..} (FdKey fd u) =
316 modifyMVar emFds $ \oldMap -> do
317 let dropReg cbs = case filter ((/= u) . keyUnique . fdKey) cbs of
320 fd' = fromIntegral fd
321 (!newMap, (oldEvs, newEvs)) =
322 case IM.updateWith dropReg fd' oldMap of
323 (Nothing, _) -> (oldMap, (mempty, mempty))
324 (Just prev, newm) -> (newm, pairEvents prev newm fd')
325 modify = oldEvs /= newEvs
326 when modify $ I.modifyFd emBackend fd oldEvs newEvs
327 return (newMap, modify)
329 -- | Drop a previous file descriptor registration.
330 unregisterFd :: EventManager -> FdKey -> IO ()
331 unregisterFd mgr reg = do
332 wake <- unregisterFd_ mgr reg
333 when wake $ wakeManager mgr
335 -- | Close a file descriptor in a race-safe way.
336 closeFd :: EventManager -> (Fd -> IO ()) -> Fd -> IO ()
337 closeFd mgr close fd = do
338 fds <- modifyMVar (emFds mgr) $ \oldMap -> do
340 case IM.delete (fromIntegral fd) oldMap of
341 (Nothing, _) -> return (oldMap, [])
342 (Just fds, !newMap) -> do
343 when (eventsOf fds /= mempty) $ wakeManager mgr
345 forM_ fds $ \(FdData reg ev cb) -> cb reg (ev `mappend` evtClose)
347 ------------------------------------------------------------------------
348 -- Registering interest in timeout events
350 -- | Register a timeout in the given number of microseconds.
351 registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
352 registerTimeout mgr us cb = do
353 !key <- newUnique (emUniqueSource mgr)
356 now <- getCurrentTime
357 let expTime = fromIntegral us / 1000000.0 + now
359 -- We intentionally do not evaluate the modified map to WHNF here.
360 -- Instead, we leave a thunk inside the IORef and defer its
361 -- evaluation until mkTimeout in the event loop. This is a
362 -- workaround for a nasty IORef contention problem that causes the
363 -- thread-delay benchmark to take 20 seconds instead of 0.2.
364 atomicModifyIORef (emTimeouts mgr) $ \f ->
365 let f' = (Q.insert key expTime cb) . f in (f', ())
369 unregisterTimeout :: EventManager -> TimeoutKey -> IO ()
370 unregisterTimeout mgr (TK key) = do
371 atomicModifyIORef (emTimeouts mgr) $ \f ->
372 let f' = (Q.delete key) . f in (f', ())
375 updateTimeout :: EventManager -> TimeoutKey -> Int -> IO ()
376 updateTimeout mgr (TK key) us = do
377 now <- getCurrentTime
378 let expTime = fromIntegral us / 1000000.0 + now
380 atomicModifyIORef (emTimeouts mgr) $ \f ->
381 let f' = (Q.adjust (const expTime) key) . f in (f', ())
384 ------------------------------------------------------------------------
387 -- | Call the callbacks corresponding to the given file descriptor.
388 onFdEvent :: EventManager -> Fd -> Event -> IO ()
389 onFdEvent mgr fd evs = do
390 fds <- readMVar (emFds mgr)
391 case IM.lookup (fromIntegral fd) fds of
392 Just cbs -> forM_ cbs $ \(FdData reg ev cb) ->
393 when (evs `I.eventIs` ev) $ cb reg evs