Document System.Event
[ghc-base.git] / System / Event / Manager.hs
1 {-# LANGUAGE BangPatterns, CPP, ExistentialQuantification, NoImplicitPrelude,
2     RecordWildCards, TypeSynonymInstances #-}
3 module System.Event.Manager
4     ( -- * Types
5       EventManager
6
7       -- * Creation
8     , new
9     , newWith
10     , newDefaultBackend
11
12       -- * Running
13     , finished
14     , loop
15     , step
16     , shutdown
17     , wakeManager
18
19       -- * Registering interest in I/O events
20     , Event
21     , evtRead
22     , evtWrite
23     , IOCallback
24     , FdKey(keyFd)
25     , registerFd_
26     , registerFd
27     , unregisterFd_
28     , unregisterFd
29     , closeFd
30
31       -- * Registering interest in timeout events
32     , TimeoutCallback
33     , TimeoutKey
34     , registerTimeout
35     , updateTimeout
36     , unregisterTimeout
37     ) where
38
39 #include "EventConfig.h"
40
41 ------------------------------------------------------------------------
42 -- Imports
43
44 import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, readMVar)
45 import Control.Exception (finally)
46 import Control.Monad ((=<<), forM_, liftM, sequence_, when)
47 import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
48                    writeIORef)
49 import Data.Maybe (Maybe(..))
50 import Data.Monoid (mappend, mconcat, mempty)
51 import GHC.Base
52 import GHC.Conc.Signal (runHandlers)
53 import GHC.List (filter)
54 import GHC.Num (Num(..))
55 import GHC.Real ((/), fromIntegral )
56 import GHC.Show (Show(..))
57 import System.Event.Clock (getCurrentTime)
58 import System.Event.Control
59 import System.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite,
60                               Timeout(..))
61 import System.Event.Unique (Unique, UniqueSource, newSource, newUnique)
62 import System.Posix.Types (Fd)
63
64 import qualified System.Event.IntMap as IM
65 import qualified System.Event.Internal as I
66 import qualified System.Event.PSQ as Q
67
68 #if defined(HAVE_KQUEUE)
69 import qualified System.Event.KQueue as KQueue
70 #elif defined(HAVE_EPOLL)
71 import qualified System.Event.EPoll  as EPoll
72 #elif defined(HAVE_POLL)
73 import qualified System.Event.Poll   as Poll
74 #else
75 # error not implemented for this operating system
76 #endif
77
78 ------------------------------------------------------------------------
79 -- Types
80
81 data FdData = FdData {
82       fdKey       :: {-# UNPACK #-} !FdKey
83     , fdEvents    :: {-# UNPACK #-} !Event
84     , _fdCallback :: !IOCallback
85     } deriving (Show)
86
87 -- | A file descriptor registration cookie.
88 data FdKey = FdKey {
89       keyFd     :: {-# UNPACK #-} !Fd
90     , keyUnique :: {-# UNPACK #-} !Unique
91     } deriving (Eq, Show)
92
93 -- | Callback invoked on I/O events.
94 type IOCallback = FdKey -> Event -> IO ()
95
96 instance Show IOCallback where
97     show _ = "IOCallback"
98
99 -- | A timeout registration cookie.
100 newtype TimeoutKey   = TK Unique
101     deriving (Eq)
102
103 -- | Callback invoked on timeout events.
104 type TimeoutCallback = IO ()
105
106 data State = Created
107            | Running
108            | Dying
109            | Finished
110              deriving (Eq, Show)
111
112 -- | A priority search queue, with timeouts as priorities.
113 type TimeoutQueue = Q.PSQ TimeoutCallback
114
115 {-
116 Instead of directly modifying the 'TimeoutQueue' in
117 e.g. 'registerTimeout' we keep a list of edits to perform, in the form
118 of a chain of function closures, and have the I/O manager thread
119 perform the edits later.  This exist to address the following GC
120 problem:
121
122 Since e.g. 'registerTimeout' doesn't force the evaluation of the
123 thunks inside the 'emTimeouts' IORef a number of thunks build up
124 inside the IORef.  If the I/O manager thread doesn't evaluate these
125 thunks soon enough they'll get promoted to the old generation and
126 become roots for all subsequent minor GCs.
127
128 When the thunks eventually get evaluated they will each create a new
129 intermediate 'TimeoutQueue' that immediately becomes garbage.  Since
130 the thunks serve as roots until the next major GC these intermediate
131 'TimeoutQueue's will get copied unnecesarily in the next minor GC,
132 increasing GC time.  This problem is known as "floating garbage".
133
134 Keeping a list of edits doesn't stop this from happening but makes the
135 amount of data that gets copied smaller.
136
137 TODO: Evaluate the content of the IORef to WHNF on each insert once
138 this bug is resolved: http://hackage.haskell.org/trac/ghc/ticket/3838
139 -}
140
141 -- | An edit to apply to a 'TimeoutQueue'.
142 type TimeoutEdit = TimeoutQueue -> TimeoutQueue
143
144 -- | The event manager state.
145 data EventManager = EventManager
146     { emBackend      :: !Backend
147     , emFds          :: {-# UNPACK #-} !(MVar (IM.IntMap [FdData]))
148     , emTimeouts     :: {-# UNPACK #-} !(IORef TimeoutEdit)
149     , emState        :: {-# UNPACK #-} !(IORef State)
150     , emUniqueSource :: {-# UNPACK #-} !UniqueSource
151     , emControl      :: {-# UNPACK #-} !Control
152     }
153
154 ------------------------------------------------------------------------
155 -- Creation
156
157 handleControlEvent :: EventManager -> FdKey -> Event -> IO ()
158 handleControlEvent mgr reg _evt = do
159   msg <- readControlMessage (emControl mgr) (keyFd reg)
160   case msg of
161     CMsgWakeup      -> return ()
162     CMsgDie         -> writeIORef (emState mgr) Finished
163     CMsgSignal fp s -> runHandlers fp s
164
165 newDefaultBackend :: IO Backend
166 #if defined(HAVE_KQUEUE)
167 newDefaultBackend = KQueue.new
168 #elif defined(HAVE_EPOLL)
169 newDefaultBackend = EPoll.new
170 #elif defined(HAVE_POLL)
171 newDefaultBackend = Poll.new
172 #else
173 newDefaultBackend = error "no back end for this platform"
174 #endif
175
176 -- | Create a new event manager.
177 new :: IO EventManager
178 new = newWith =<< newDefaultBackend
179
180 newWith :: Backend -> IO EventManager
181 newWith be = do
182   iofds <- newMVar IM.empty
183   timeouts <- newIORef id
184   ctrl <- newControl
185   state <- newIORef Created
186   us <- newSource
187   _ <- mkWeakIORef state $ do
188                st <- atomicModifyIORef state $ \s -> (Finished, s)
189                when (st /= Finished) $ do
190                  I.delete be
191                  closeControl ctrl
192   let mgr = EventManager { emBackend = be
193                          , emFds = iofds
194                          , emTimeouts = timeouts
195                          , emState = state
196                          , emUniqueSource = us
197                          , emControl = ctrl
198                          }
199   _ <- registerFd_ mgr (handleControlEvent mgr) (controlReadFd ctrl) evtRead
200   _ <- registerFd_ mgr (handleControlEvent mgr) (wakeupReadFd ctrl) evtRead
201   return mgr
202
203 -- | Asynchronously shuts down the event manager, if running.
204 shutdown :: EventManager -> IO ()
205 shutdown mgr = do
206   state <- atomicModifyIORef (emState mgr) $ \s -> (Dying, s)
207   when (state == Running) $ sendDie (emControl mgr)
208
209 finished :: EventManager -> IO Bool
210 finished mgr = (== Finished) `liftM` readIORef (emState mgr)
211
212 cleanup :: EventManager -> IO ()
213 cleanup EventManager{..} = do
214   writeIORef emState Finished
215   I.delete emBackend
216   closeControl emControl
217
218 ------------------------------------------------------------------------
219 -- Event loop
220
221 -- | Start handling events.  This function loops until told to stop,
222 -- using 'shutdown'.
223 --
224 -- /Note/: This loop can only be run once per 'EventManager', as it
225 -- closes all of its control resources when it finishes.
226 loop :: EventManager -> IO ()
227 loop mgr@EventManager{..} = do
228   state <- atomicModifyIORef emState $ \s -> case s of
229     Created -> (Running, s)
230     _       -> (s, s)
231   case state of
232     Created -> go Q.empty `finally` cleanup mgr
233     Dying   -> cleanup mgr
234     _       -> do cleanup mgr
235                   error $ "System.Event.Manager.loop: state is already " ++
236                       show state
237  where
238   go q = do (running, q') <- step mgr q
239             when running $ go q'
240
241 step :: EventManager -> TimeoutQueue -> IO (Bool, TimeoutQueue)
242 step mgr@EventManager{..} tq = do
243   (timeout, q') <- mkTimeout tq
244   I.poll emBackend timeout (onFdEvent mgr)
245   state <- readIORef emState
246   state `seq` return (state == Running, q')
247  where
248
249   -- | Call all expired timer callbacks and return the time to the
250   -- next timeout.
251   mkTimeout :: TimeoutQueue -> IO (Timeout, TimeoutQueue)
252   mkTimeout q = do
253       now <- getCurrentTime
254       applyEdits <- atomicModifyIORef emTimeouts $ \f -> (id, f)
255       let (expired, q'') = let q' = applyEdits q in q' `seq` Q.atMost now q'
256       sequence_ $ map Q.value expired
257       let timeout = case Q.minView q'' of
258             Nothing             -> Forever
259             Just (Q.E _ t _, _) ->
260                 -- This value will always be positive since the call
261                 -- to 'atMost' above removed any timeouts <= 'now'
262                 let t' = t - now in t' `seq` Timeout t'
263       return (timeout, q'')
264
265 ------------------------------------------------------------------------
266 -- Registering interest in I/O events
267
268 -- | Register interest in the given events, without waking the event
269 -- manager thread.  The 'Bool' return value indicates whether the
270 -- event manager ought to be woken.
271 registerFd_ :: EventManager -> IOCallback -> Fd -> Event
272             -> IO (FdKey, Bool)
273 registerFd_ EventManager{..} cb fd evs = do
274   u <- newUnique emUniqueSource
275   modifyMVar emFds $ \oldMap -> do
276     let fd'  = fromIntegral fd
277         reg  = FdKey fd u
278         !fdd = FdData reg evs cb
279         (!newMap, (oldEvs, newEvs)) =
280             case IM.insertWith (++) fd' [fdd] oldMap of
281               (Nothing,   n) -> (n, (mempty, evs))
282               (Just prev, n) -> (n, pairEvents prev newMap fd')
283         modify = oldEvs /= newEvs
284     when modify $ I.modifyFd emBackend fd oldEvs newEvs
285     return (newMap, (reg, modify))
286 {-# INLINE registerFd_ #-}
287
288 -- | @registerFd mgr cb fd evs@ registers interest in the events @evs@
289 -- on the file descriptor @fd@.  @cb@ is called for each event that
290 -- occurs.  Returns a cookie that can be handed to 'unregisterFd'.
291 registerFd :: EventManager -> IOCallback -> Fd -> Event -> IO FdKey
292 registerFd mgr cb fd evs = do
293   (r, wake) <- registerFd_ mgr cb fd evs
294   when wake $ wakeManager mgr
295   return r
296 {-# INLINE registerFd #-}
297
298 -- | Wake up the event manager.
299 wakeManager :: EventManager -> IO ()
300 wakeManager mgr = sendWakeup (emControl mgr)
301
302 eventsOf :: [FdData] -> Event
303 eventsOf = mconcat . map fdEvents
304
305 pairEvents :: [FdData] -> IM.IntMap [FdData] -> Int -> (Event, Event)
306 pairEvents prev m fd = let l = eventsOf prev
307                            r = case IM.lookup fd m of
308                                  Nothing  -> mempty
309                                  Just fds -> eventsOf fds
310                        in (l, r)
311
312 -- | Drop a previous file descriptor registration, without waking the
313 -- event manager thread.  The return value indicates whether the event
314 -- manager ought to be woken.
315 unregisterFd_ :: EventManager -> FdKey -> IO Bool
316 unregisterFd_ EventManager{..} (FdKey fd u) =
317   modifyMVar emFds $ \oldMap -> do
318     let dropReg cbs = case filter ((/= u) . keyUnique . fdKey) cbs of
319                           []   -> Nothing
320                           cbs' -> Just cbs'
321         fd' = fromIntegral fd
322         (!newMap, (oldEvs, newEvs)) =
323             case IM.updateWith dropReg fd' oldMap of
324               (Nothing,   _)    -> (oldMap, (mempty, mempty))
325               (Just prev, newm) -> (newm, pairEvents prev newm fd')
326         modify = oldEvs /= newEvs
327     when modify $ I.modifyFd emBackend fd oldEvs newEvs
328     return (newMap, modify)
329
330 -- | Drop a previous file descriptor registration.
331 unregisterFd :: EventManager -> FdKey -> IO ()
332 unregisterFd mgr reg = do
333   wake <- unregisterFd_ mgr reg
334   when wake $ wakeManager mgr
335
336 -- | Close a file descriptor in a race-safe way.
337 closeFd :: EventManager -> (Fd -> IO ()) -> Fd -> IO ()
338 closeFd mgr close fd = do
339   fds <- modifyMVar (emFds mgr) $ \oldMap -> do
340     close fd
341     case IM.delete (fromIntegral fd) oldMap of
342       (Nothing,  _)       -> return (oldMap, [])
343       (Just fds, !newMap) -> do
344         when (eventsOf fds /= mempty) $ wakeManager mgr
345         return (newMap, fds)
346   forM_ fds $ \(FdData reg ev cb) -> cb reg (ev `mappend` evtClose)
347
348 ------------------------------------------------------------------------
349 -- Registering interest in timeout events
350
351 -- | Register a timeout in the given number of microseconds.  The
352 -- returned 'TimeoutKey' can be used to later unregister or update the
353 -- timeout.  The timeout is automatically unregistered after the given
354 -- time has passed.
355 registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
356 registerTimeout mgr us cb = do
357   !key <- newUnique (emUniqueSource mgr)
358   if us <= 0 then cb
359     else do
360       now <- getCurrentTime
361       let expTime = fromIntegral us / 1000000.0 + now
362
363       -- We intentionally do not evaluate the modified map to WHNF here.
364       -- Instead, we leave a thunk inside the IORef and defer its
365       -- evaluation until mkTimeout in the event loop.  This is a
366       -- workaround for a nasty IORef contention problem that causes the
367       -- thread-delay benchmark to take 20 seconds instead of 0.2.
368       atomicModifyIORef (emTimeouts mgr) $ \f ->
369           let f' = (Q.insert key expTime cb) . f in (f', ())
370       wakeManager mgr
371   return $ TK key
372
373 -- | Unregister an active timeout.
374 unregisterTimeout :: EventManager -> TimeoutKey -> IO ()
375 unregisterTimeout mgr (TK key) = do
376   atomicModifyIORef (emTimeouts mgr) $ \f ->
377       let f' = (Q.delete key) . f in (f', ())
378   wakeManager mgr
379
380 -- | Update an active timeout to fire in the given number of
381 -- microseconds.
382 updateTimeout :: EventManager -> TimeoutKey -> Int -> IO ()
383 updateTimeout mgr (TK key) us = do
384   now <- getCurrentTime
385   let expTime = fromIntegral us / 1000000.0 + now
386
387   atomicModifyIORef (emTimeouts mgr) $ \f ->
388       let f' = (Q.adjust (const expTime) key) . f in (f', ())
389   wakeManager mgr
390
391 ------------------------------------------------------------------------
392 -- Utilities
393
394 -- | Call the callbacks corresponding to the given file descriptor.
395 onFdEvent :: EventManager -> Fd -> Event -> IO ()
396 onFdEvent mgr fd evs = do
397   fds <- readMVar (emFds mgr)
398   case IM.lookup (fromIntegral fd) fds of
399       Just cbs -> forM_ cbs $ \(FdData reg ev cb) ->
400                     when (evs `I.eventIs` ev) $ cb reg evs
401       Nothing  -> return ()