46569ebbe9b7f3829156688f31c738579750c435
[ghc-base.git] / System / Event / Manager.hs
1 {-# LANGUAGE BangPatterns, CPP, ExistentialQuantification, NoImplicitPrelude,
2     RecordWildCards, TypeSynonymInstances #-}
3 module System.Event.Manager
4     ( -- * Types
5       EventManager
6
7       -- * Creation
8     , new
9     , newWith
10     , newDefaultBackend
11
12       -- * Running
13     , finished
14     , loop
15     , step
16     , shutdown
17     , wakeManager
18
19       -- * Registering interest in I/O events
20     , Event
21     , evtRead
22     , evtWrite
23     , IOCallback
24     , FdKey(keyFd)
25     , registerFd_
26     , registerFd
27     , unregisterFd_
28     , unregisterFd
29     , fdWasClosed
30
31       -- * Registering interest in timeout events
32     , TimeoutCallback
33     , TimeoutKey
34     , registerTimeout
35     , updateTimeout
36     , unregisterTimeout
37     ) where
38
39 #include "EventConfig.h"
40
41 ------------------------------------------------------------------------
42 -- Imports
43
44 import Control.Concurrent.MVar (MVar, modifyMVar, modifyMVar_, newMVar,
45                                 readMVar)
46 import Control.Exception (finally)
47 import Control.Monad ((=<<), forM_, liftM, sequence_, when)
48 import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
49                    writeIORef)
50 import Data.Maybe (Maybe(..))
51 import Data.Monoid (mconcat, mempty)
52 import GHC.Base
53 import GHC.Conc.Signal (runHandlers)
54 import GHC.List (filter)
55 import GHC.Num (Num(..))
56 import GHC.Real ((/), fromIntegral )
57 import GHC.Show (Show(..))
58 import System.Event.Clock (getCurrentTime)
59 import System.Event.Control
60 import System.Event.Internal (Backend, Event, evtRead, evtWrite, Timeout(..))
61 import System.Event.Unique (Unique, UniqueSource, newSource, newUnique)
62 import System.Posix.Types (Fd)
63
64 import qualified System.Event.IntMap as IM
65 import qualified System.Event.Internal as I
66 import qualified System.Event.PSQ as Q
67
68 #if defined(HAVE_KQUEUE)
69 import qualified System.Event.KQueue as KQueue
70 #elif defined(HAVE_EPOLL)
71 import qualified System.Event.EPoll  as EPoll
72 #elif defined(HAVE_POLL)
73 import qualified System.Event.Poll   as Poll
74 #else
75 # error not implemented for this operating system
76 #endif
77
78 ------------------------------------------------------------------------
79 -- Types
80
81 data FdData = FdData {
82       fdKey       :: {-# UNPACK #-} !FdKey
83     , fdEvents    :: {-# UNPACK #-} !Event
84     , _fdCallback :: !IOCallback
85     } deriving (Show)
86
87 -- | A file descriptor registration cookie.
88 data FdKey = FdKey {
89       keyFd     :: {-# UNPACK #-} !Fd
90     , keyUnique :: {-# UNPACK #-} !Unique
91     } deriving (Eq, Show)
92
93 -- | Callback invoked on I/O events.
94 type IOCallback = FdKey -> Event -> IO ()
95
96 instance Show IOCallback where
97     show _ = "IOCallback"
98
99 newtype TimeoutKey   = TK Unique
100     deriving (Eq)
101
102 -- | Callback invoked on timeout events.
103 type TimeoutCallback = IO ()
104
105 data State = Created
106            | Running
107            | Dying
108            | Finished
109              deriving (Eq, Show)
110
111 -- | A priority search queue, with timeouts as priorities.
112 type TimeoutQueue = Q.PSQ TimeoutCallback
113
114 {-
115 Instead of directly modifying the 'TimeoutQueue' in
116 e.g. 'registerTimeout' we keep a list of edits to perform, in the form
117 of a chain of function closures, and have the I/O manager thread
118 perform the edits later.  This exist to address the following GC
119 problem:
120
121 Since e.g. 'registerTimeout' doesn't force the evaluation of the
122 thunks inside the 'emTimeouts' IORef a number of thunks build up
123 inside the IORef.  If the I/O manager thread doesn't evaluate these
124 thunks soon enough they'll get promoted to the old generation and
125 become roots for all subsequent minor GCs.
126
127 When the thunks eventually get evaluated they will each create a new
128 intermediate 'TimeoutQueue' that immediately becomes garbage.  Since
129 the thunks serve as roots until the next major GC these intermediate
130 'TimeoutQueue's will get copied unnecesarily in the next minor GC,
131 increasing GC time.  This problem is known as "floating garbage".
132
133 Keeping a list of edits doesn't stop this from happening but makes the
134 amount of data that gets copied smaller.
135
136 TODO: Evaluate the content of the IORef to WHNF on each insert once
137 this bug is resolved: http://hackage.haskell.org/trac/ghc/ticket/3838
138 -}
139
140 -- | An edit to apply to a 'TimeoutQueue'.
141 type TimeoutEdit = TimeoutQueue -> TimeoutQueue
142
143 -- | The event manager state.
144 data EventManager = EventManager
145     { emBackend      :: !Backend
146     , emFds          :: {-# UNPACK #-} !(MVar (IM.IntMap [FdData]))
147     , emTimeouts     :: {-# UNPACK #-} !(IORef TimeoutEdit)
148     , emState        :: {-# UNPACK #-} !(IORef State)
149     , emUniqueSource :: {-# UNPACK #-} !UniqueSource
150     , emControl      :: {-# UNPACK #-} !Control
151     }
152
153 ------------------------------------------------------------------------
154 -- Creation
155
156 handleControlEvent :: EventManager -> FdKey -> Event -> IO ()
157 handleControlEvent mgr reg _evt = do
158   msg <- readControlMessage (emControl mgr) (keyFd reg)
159   case msg of
160     CMsgWakeup      -> return ()
161     CMsgDie         -> writeIORef (emState mgr) Finished
162     CMsgSignal fp s -> runHandlers fp s
163
164 newDefaultBackend :: IO Backend
165 #if defined(HAVE_KQUEUE)
166 newDefaultBackend = KQueue.new
167 #elif defined(HAVE_EPOLL)
168 newDefaultBackend = EPoll.new
169 #elif defined(HAVE_POLL)
170 newDefaultBackend = Poll.new
171 #else
172 newDefaultBackend = error "no back end for this platform"
173 #endif
174
175 -- | Create a new event manager.
176 new :: IO EventManager
177 new = newWith =<< newDefaultBackend
178
179 newWith :: Backend -> IO EventManager
180 newWith be = do
181   iofds <- newMVar IM.empty
182   timeouts <- newIORef id
183   ctrl <- newControl
184   state <- newIORef Created
185   us <- newSource
186   _ <- mkWeakIORef state $ do
187                st <- atomicModifyIORef state $ \s -> (Finished, s)
188                when (st /= Finished) $ do
189                  I.delete be
190                  closeControl ctrl
191   let mgr = EventManager { emBackend = be
192                          , emFds = iofds
193                          , emTimeouts = timeouts
194                          , emState = state
195                          , emUniqueSource = us
196                          , emControl = ctrl
197                          }
198   _ <- registerFd_ mgr (handleControlEvent mgr) (controlReadFd ctrl) evtRead
199   _ <- registerFd_ mgr (handleControlEvent mgr) (wakeupReadFd ctrl) evtRead
200   return mgr
201
202 -- | Asynchronously shuts down the event manager, if running.
203 shutdown :: EventManager -> IO ()
204 shutdown mgr = do
205   state <- atomicModifyIORef (emState mgr) $ \s -> (Dying, s)
206   when (state == Running) $ sendDie (emControl mgr)
207
208 finished :: EventManager -> IO Bool
209 finished mgr = (== Finished) `liftM` readIORef (emState mgr)
210
211 cleanup :: EventManager -> IO ()
212 cleanup EventManager{..} = do
213   writeIORef emState Finished
214   I.delete emBackend
215   closeControl emControl
216
217 ------------------------------------------------------------------------
218 -- Event loop
219
220 -- | Start handling events.  This function loops until told to stop.
221 --
222 -- /Note/: This loop can only be run once per 'EventManager', as it
223 -- closes all of its control resources when it finishes.
224 loop :: EventManager -> IO ()
225 loop mgr@EventManager{..} = do
226   state <- atomicModifyIORef emState $ \s -> case s of
227     Created -> (Running, s)
228     _       -> (s, s)
229   case state of
230     Created -> go Q.empty `finally` cleanup mgr
231     Dying   -> cleanup mgr
232     _       -> do cleanup mgr
233                   error $ "System.Event.Manager.loop: state is already " ++
234                       show state
235  where
236   go q = do (running, q') <- step mgr q
237             when running $ go q'
238
239 step :: EventManager -> TimeoutQueue -> IO (Bool, TimeoutQueue)
240 step mgr@EventManager{..} tq = do
241   (timeout, q') <- mkTimeout tq
242   I.poll emBackend timeout (onFdEvent mgr)
243   state <- readIORef emState
244   state `seq` return (state == Running, q')
245  where
246
247   -- | Call all expired timer callbacks and return the time to the
248   -- next timeout.
249   mkTimeout :: TimeoutQueue -> IO (Timeout, TimeoutQueue)
250   mkTimeout q = do
251       now <- getCurrentTime
252       applyEdits <- atomicModifyIORef emTimeouts $ \f -> (id, f)
253       let (expired, q'') = let q' = applyEdits q in q' `seq` Q.atMost now q'
254       sequence_ $ map Q.value expired
255       let timeout = case Q.minView q'' of
256             Nothing             -> Forever
257             Just (Q.E _ t _, _) ->
258                 -- This value will always be positive since the call
259                 -- to 'atMost' above removed any timeouts <= 'now'
260                 let t' = t - now in t' `seq` Timeout t'
261       return (timeout, q'')
262
263 ------------------------------------------------------------------------
264 -- Registering interest in I/O events
265
266 -- | Register interest in the given events, without waking the event
267 -- manager thread.  The 'Bool' return value indicates whether the
268 -- event manager ought to be woken.
269 registerFd_ :: EventManager -> IOCallback -> Fd -> Event
270             -> IO (FdKey, Bool)
271 registerFd_ EventManager{..} cb fd evs = do
272   u <- newUnique emUniqueSource
273   modifyMVar emFds $ \oldMap -> do
274     let fd'  = fromIntegral fd
275         reg  = FdKey fd u
276         !fdd = FdData reg evs cb
277         (!newMap, (oldEvs, newEvs)) =
278             case IM.insertWith (++) fd' [fdd] oldMap of
279               (Nothing,   n) -> (n, (mempty, evs))
280               (Just prev, n) -> (n, pairEvents prev newMap fd')
281         modify = oldEvs /= newEvs
282     when modify $ I.modifyFd emBackend fd oldEvs newEvs
283     return (newMap, (reg, modify))
284 {-# INLINE registerFd_ #-}
285
286 -- | @registerFd mgr cb fd evs@ registers interest in the events @evs@
287 -- on the file descriptor @fd@.  @cb@ is called for each event that
288 -- occurs.  Returns a cookie that can be handed to 'unregisterFd'.
289 registerFd :: EventManager -> IOCallback -> Fd -> Event -> IO FdKey
290 registerFd mgr cb fd evs = do
291   (r, wake) <- registerFd_ mgr cb fd evs
292   when wake $ wakeManager mgr
293   return r
294 {-# INLINE registerFd #-}
295
296 -- | Wake up the event manager.
297 wakeManager :: EventManager -> IO ()
298 wakeManager mgr = sendWakeup (emControl mgr)
299
300 eventsOf :: [FdData] -> Event
301 eventsOf = mconcat . map fdEvents
302
303 pairEvents :: [FdData] -> IM.IntMap [FdData] -> Int -> (Event, Event)
304 pairEvents prev m fd = let l = eventsOf prev
305                            r = case IM.lookup fd m of
306                                  Nothing  -> mempty
307                                  Just fds -> eventsOf fds
308                        in (l, r)
309
310 -- | Drop a previous file descriptor registration, without waking the
311 -- event manager thread.  The return value indicates whether the event
312 -- manager ought to be woken.
313 unregisterFd_ :: EventManager -> FdKey -> IO Bool
314 unregisterFd_ EventManager{..} (FdKey fd u) =
315   modifyMVar emFds $ \oldMap -> do
316     let dropReg cbs = case filter ((/= u) . keyUnique . fdKey) cbs of
317                           []   -> Nothing
318                           cbs' -> Just cbs'
319         fd' = fromIntegral fd
320         (!newMap, (oldEvs, newEvs)) =
321             case IM.updateWith dropReg fd' oldMap of
322               (Nothing,   _)    -> (oldMap, (mempty, mempty))
323               (Just prev, newm) -> (newm, pairEvents prev newm fd')
324         modify = oldEvs /= newEvs
325     when modify $ I.modifyFd emBackend fd oldEvs newEvs
326     return (newMap, modify)
327
328 -- | Drop a previous file descriptor registration.
329 unregisterFd :: EventManager -> FdKey -> IO ()
330 unregisterFd mgr reg = do
331   wake <- unregisterFd_ mgr reg
332   when wake $ wakeManager mgr
333
334 -- | Notify the event manager that a file descriptor has been closed.
335 fdWasClosed :: EventManager -> Fd -> IO ()
336 fdWasClosed mgr fd =
337   modifyMVar_ (emFds mgr) $ \oldMap ->
338     case IM.delete (fromIntegral fd) oldMap of
339       (Nothing,  _)       -> return oldMap
340       (Just fds, !newMap) -> do
341         when (eventsOf fds /= mempty) $ wakeManager mgr
342         return newMap
343
344 ------------------------------------------------------------------------
345 -- Registering interest in timeout events
346
347 -- | Register a timeout in the given number of microseconds.
348 registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
349 registerTimeout mgr us cb = do
350   !key <- newUnique (emUniqueSource mgr)
351   if us <= 0 then cb
352     else do
353       now <- getCurrentTime
354       let expTime = fromIntegral us / 1000000.0 + now
355
356       -- We intentionally do not evaluate the modified map to WHNF here.
357       -- Instead, we leave a thunk inside the IORef and defer its
358       -- evaluation until mkTimeout in the event loop.  This is a
359       -- workaround for a nasty IORef contention problem that causes the
360       -- thread-delay benchmark to take 20 seconds instead of 0.2.
361       atomicModifyIORef (emTimeouts mgr) $ \f ->
362           let f' = (Q.insert key expTime cb) . f in (f', ())
363       wakeManager mgr
364   return $ TK key
365
366 unregisterTimeout :: EventManager -> TimeoutKey -> IO ()
367 unregisterTimeout mgr (TK key) = do
368   atomicModifyIORef (emTimeouts mgr) $ \f ->
369       let f' = (Q.delete key) . f in (f', ())
370   wakeManager mgr
371
372 updateTimeout :: EventManager -> TimeoutKey -> Int -> IO ()
373 updateTimeout mgr (TK key) us = do
374   now <- getCurrentTime
375   let expTime = fromIntegral us / 1000000.0 + now
376
377   atomicModifyIORef (emTimeouts mgr) $ \f ->
378       let f' = (Q.adjust (const expTime) key) . f in (f', ())
379   wakeManager mgr
380
381 ------------------------------------------------------------------------
382 -- Utilities
383
384 -- | Call the callbacks corresponding to the given file descriptor.
385 onFdEvent :: EventManager -> Fd -> Event -> IO ()
386 onFdEvent mgr fd evs = do
387   fds <- readMVar (emFds mgr)
388   case IM.lookup (fromIntegral fd) fds of
389       Just cbs -> forM_ cbs $ \(FdData reg ev cb) ->
390                     when (evs `I.eventIs` ev) $ cb reg evs
391       Nothing  -> return ()