0c8293f5f3751b090d56735162c756d3057fcdc9
[ghc-base.git] / System / Event / Manager.hs
1 {-# LANGUAGE BangPatterns, CPP, ExistentialQuantification, NoImplicitPrelude,
2     RecordWildCards, TypeSynonymInstances #-}
3 module System.Event.Manager
4     ( -- * Types
5       EventManager
6
7       -- * Creation
8     , new
9     , newWith
10     , newDefaultBackend
11
12       -- * Running
13     , finished
14     , loop
15     , step
16     , shutdown
17     , cleanup
18     , wakeManager
19
20       -- * Registering interest in I/O events
21     , Event
22     , evtRead
23     , evtWrite
24     , IOCallback
25     , FdKey(keyFd)
26     , registerFd_
27     , registerFd
28     , unregisterFd_
29     , unregisterFd
30     , closeFd
31
32       -- * Registering interest in timeout events
33     , TimeoutCallback
34     , TimeoutKey
35     , registerTimeout
36     , updateTimeout
37     , unregisterTimeout
38     ) where
39
40 #include "EventConfig.h"
41
42 ------------------------------------------------------------------------
43 -- Imports
44
45 import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, readMVar)
46 import Control.Exception (finally)
47 import Control.Monad ((=<<), forM_, liftM, sequence_, when)
48 import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
49                    writeIORef)
50 import Data.Maybe (Maybe(..))
51 import Data.Monoid (mappend, mconcat, mempty)
52 import GHC.Base
53 import GHC.Conc.Signal (runHandlers)
54 import GHC.List (filter)
55 import GHC.Num (Num(..))
56 import GHC.Real ((/), fromIntegral )
57 import GHC.Show (Show(..))
58 import System.Event.Clock (getCurrentTime)
59 import System.Event.Control
60 import System.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite,
61                               Timeout(..))
62 import System.Event.Unique (Unique, UniqueSource, newSource, newUnique)
63 import System.Posix.Types (Fd)
64
65 import qualified System.Event.IntMap as IM
66 import qualified System.Event.Internal as I
67 import qualified System.Event.PSQ as Q
68
69 #if defined(HAVE_KQUEUE)
70 import qualified System.Event.KQueue as KQueue
71 #elif defined(HAVE_EPOLL)
72 import qualified System.Event.EPoll  as EPoll
73 #elif defined(HAVE_POLL)
74 import qualified System.Event.Poll   as Poll
75 #else
76 # error not implemented for this operating system
77 #endif
78
79 ------------------------------------------------------------------------
80 -- Types
81
82 data FdData = FdData {
83       fdKey       :: {-# UNPACK #-} !FdKey
84     , fdEvents    :: {-# UNPACK #-} !Event
85     , _fdCallback :: !IOCallback
86     } deriving (Show)
87
88 -- | A file descriptor registration cookie.
89 data FdKey = FdKey {
90       keyFd     :: {-# UNPACK #-} !Fd
91     , keyUnique :: {-# UNPACK #-} !Unique
92     } deriving (Eq, Show)
93
94 -- | Callback invoked on I/O events.
95 type IOCallback = FdKey -> Event -> IO ()
96
97 instance Show IOCallback where
98     show _ = "IOCallback"
99
100 -- | A timeout registration cookie.
101 newtype TimeoutKey   = TK Unique
102     deriving (Eq)
103
104 -- | Callback invoked on timeout events.
105 type TimeoutCallback = IO ()
106
107 data State = Created
108            | Running
109            | Dying
110            | Finished
111              deriving (Eq, Show)
112
113 -- | A priority search queue, with timeouts as priorities.
114 type TimeoutQueue = Q.PSQ TimeoutCallback
115
116 {-
117 Instead of directly modifying the 'TimeoutQueue' in
118 e.g. 'registerTimeout' we keep a list of edits to perform, in the form
119 of a chain of function closures, and have the I/O manager thread
120 perform the edits later.  This exist to address the following GC
121 problem:
122
123 Since e.g. 'registerTimeout' doesn't force the evaluation of the
124 thunks inside the 'emTimeouts' IORef a number of thunks build up
125 inside the IORef.  If the I/O manager thread doesn't evaluate these
126 thunks soon enough they'll get promoted to the old generation and
127 become roots for all subsequent minor GCs.
128
129 When the thunks eventually get evaluated they will each create a new
130 intermediate 'TimeoutQueue' that immediately becomes garbage.  Since
131 the thunks serve as roots until the next major GC these intermediate
132 'TimeoutQueue's will get copied unnecesarily in the next minor GC,
133 increasing GC time.  This problem is known as "floating garbage".
134
135 Keeping a list of edits doesn't stop this from happening but makes the
136 amount of data that gets copied smaller.
137
138 TODO: Evaluate the content of the IORef to WHNF on each insert once
139 this bug is resolved: http://hackage.haskell.org/trac/ghc/ticket/3838
140 -}
141
142 -- | An edit to apply to a 'TimeoutQueue'.
143 type TimeoutEdit = TimeoutQueue -> TimeoutQueue
144
145 -- | The event manager state.
146 data EventManager = EventManager
147     { emBackend      :: !Backend
148     , emFds          :: {-# UNPACK #-} !(MVar (IM.IntMap [FdData]))
149     , emTimeouts     :: {-# UNPACK #-} !(IORef TimeoutEdit)
150     , emState        :: {-# UNPACK #-} !(IORef State)
151     , emUniqueSource :: {-# UNPACK #-} !UniqueSource
152     , emControl      :: {-# UNPACK #-} !Control
153     }
154
155 ------------------------------------------------------------------------
156 -- Creation
157
158 handleControlEvent :: EventManager -> FdKey -> Event -> IO ()
159 handleControlEvent mgr reg _evt = do
160   msg <- readControlMessage (emControl mgr) (keyFd reg)
161   case msg of
162     CMsgWakeup      -> return ()
163     CMsgDie         -> writeIORef (emState mgr) Finished
164     CMsgSignal fp s -> runHandlers fp s
165
166 newDefaultBackend :: IO Backend
167 #if defined(HAVE_KQUEUE)
168 newDefaultBackend = KQueue.new
169 #elif defined(HAVE_EPOLL)
170 newDefaultBackend = EPoll.new
171 #elif defined(HAVE_POLL)
172 newDefaultBackend = Poll.new
173 #else
174 newDefaultBackend = error "no back end for this platform"
175 #endif
176
177 -- | Create a new event manager.
178 new :: IO EventManager
179 new = newWith =<< newDefaultBackend
180
181 newWith :: Backend -> IO EventManager
182 newWith be = do
183   iofds <- newMVar IM.empty
184   timeouts <- newIORef id
185   ctrl <- newControl
186   state <- newIORef Created
187   us <- newSource
188   _ <- mkWeakIORef state $ do
189                st <- atomicModifyIORef state $ \s -> (Finished, s)
190                when (st /= Finished) $ do
191                  I.delete be
192                  closeControl ctrl
193   let mgr = EventManager { emBackend = be
194                          , emFds = iofds
195                          , emTimeouts = timeouts
196                          , emState = state
197                          , emUniqueSource = us
198                          , emControl = ctrl
199                          }
200   _ <- registerFd_ mgr (handleControlEvent mgr) (controlReadFd ctrl) evtRead
201   _ <- registerFd_ mgr (handleControlEvent mgr) (wakeupReadFd ctrl) evtRead
202   return mgr
203
204 -- | Asynchronously shuts down the event manager, if running.
205 shutdown :: EventManager -> IO ()
206 shutdown mgr = do
207   state <- atomicModifyIORef (emState mgr) $ \s -> (Dying, s)
208   when (state == Running) $ sendDie (emControl mgr)
209
210 finished :: EventManager -> IO Bool
211 finished mgr = (== Finished) `liftM` readIORef (emState mgr)
212
213 cleanup :: EventManager -> IO ()
214 cleanup EventManager{..} = do
215   writeIORef emState Finished
216   I.delete emBackend
217   closeControl emControl
218
219 ------------------------------------------------------------------------
220 -- Event loop
221
222 -- | Start handling events.  This function loops until told to stop,
223 -- using 'shutdown'.
224 --
225 -- /Note/: This loop can only be run once per 'EventManager', as it
226 -- closes all of its control resources when it finishes.
227 loop :: EventManager -> IO ()
228 loop mgr@EventManager{..} = do
229   state <- atomicModifyIORef emState $ \s -> case s of
230     Created -> (Running, s)
231     _       -> (s, s)
232   case state of
233     Created -> go Q.empty `finally` cleanup mgr
234     Dying   -> cleanup mgr
235     _       -> do cleanup mgr
236                   error $ "System.Event.Manager.loop: state is already " ++
237                       show state
238  where
239   go q = do (running, q') <- step mgr q
240             when running $ go q'
241
242 step :: EventManager -> TimeoutQueue -> IO (Bool, TimeoutQueue)
243 step mgr@EventManager{..} tq = do
244   (timeout, q') <- mkTimeout tq
245   I.poll emBackend timeout (onFdEvent mgr)
246   state <- readIORef emState
247   state `seq` return (state == Running, q')
248  where
249
250   -- | Call all expired timer callbacks and return the time to the
251   -- next timeout.
252   mkTimeout :: TimeoutQueue -> IO (Timeout, TimeoutQueue)
253   mkTimeout q = do
254       now <- getCurrentTime
255       applyEdits <- atomicModifyIORef emTimeouts $ \f -> (id, f)
256       let (expired, q'') = let q' = applyEdits q in q' `seq` Q.atMost now q'
257       sequence_ $ map Q.value expired
258       let timeout = case Q.minView q'' of
259             Nothing             -> Forever
260             Just (Q.E _ t _, _) ->
261                 -- This value will always be positive since the call
262                 -- to 'atMost' above removed any timeouts <= 'now'
263                 let t' = t - now in t' `seq` Timeout t'
264       return (timeout, q'')
265
266 ------------------------------------------------------------------------
267 -- Registering interest in I/O events
268
269 -- | Register interest in the given events, without waking the event
270 -- manager thread.  The 'Bool' return value indicates whether the
271 -- event manager ought to be woken.
272 registerFd_ :: EventManager -> IOCallback -> Fd -> Event
273             -> IO (FdKey, Bool)
274 registerFd_ EventManager{..} cb fd evs = do
275   u <- newUnique emUniqueSource
276   modifyMVar emFds $ \oldMap -> do
277     let fd'  = fromIntegral fd
278         reg  = FdKey fd u
279         !fdd = FdData reg evs cb
280         (!newMap, (oldEvs, newEvs)) =
281             case IM.insertWith (++) fd' [fdd] oldMap of
282               (Nothing,   n) -> (n, (mempty, evs))
283               (Just prev, n) -> (n, pairEvents prev newMap fd')
284         modify = oldEvs /= newEvs
285     when modify $ I.modifyFd emBackend fd oldEvs newEvs
286     return (newMap, (reg, modify))
287 {-# INLINE registerFd_ #-}
288
289 -- | @registerFd mgr cb fd evs@ registers interest in the events @evs@
290 -- on the file descriptor @fd@.  @cb@ is called for each event that
291 -- occurs.  Returns a cookie that can be handed to 'unregisterFd'.
292 registerFd :: EventManager -> IOCallback -> Fd -> Event -> IO FdKey
293 registerFd mgr cb fd evs = do
294   (r, wake) <- registerFd_ mgr cb fd evs
295   when wake $ wakeManager mgr
296   return r
297 {-# INLINE registerFd #-}
298
299 -- | Wake up the event manager.
300 wakeManager :: EventManager -> IO ()
301 wakeManager mgr = sendWakeup (emControl mgr)
302
303 eventsOf :: [FdData] -> Event
304 eventsOf = mconcat . map fdEvents
305
306 pairEvents :: [FdData] -> IM.IntMap [FdData] -> Int -> (Event, Event)
307 pairEvents prev m fd = let l = eventsOf prev
308                            r = case IM.lookup fd m of
309                                  Nothing  -> mempty
310                                  Just fds -> eventsOf fds
311                        in (l, r)
312
313 -- | Drop a previous file descriptor registration, without waking the
314 -- event manager thread.  The return value indicates whether the event
315 -- manager ought to be woken.
316 unregisterFd_ :: EventManager -> FdKey -> IO Bool
317 unregisterFd_ EventManager{..} (FdKey fd u) =
318   modifyMVar emFds $ \oldMap -> do
319     let dropReg cbs = case filter ((/= u) . keyUnique . fdKey) cbs of
320                           []   -> Nothing
321                           cbs' -> Just cbs'
322         fd' = fromIntegral fd
323         (!newMap, (oldEvs, newEvs)) =
324             case IM.updateWith dropReg fd' oldMap of
325               (Nothing,   _)    -> (oldMap, (mempty, mempty))
326               (Just prev, newm) -> (newm, pairEvents prev newm fd')
327         modify = oldEvs /= newEvs
328     when modify $ I.modifyFd emBackend fd oldEvs newEvs
329     return (newMap, modify)
330
331 -- | Drop a previous file descriptor registration.
332 unregisterFd :: EventManager -> FdKey -> IO ()
333 unregisterFd mgr reg = do
334   wake <- unregisterFd_ mgr reg
335   when wake $ wakeManager mgr
336
337 -- | Close a file descriptor in a race-safe way.
338 closeFd :: EventManager -> (Fd -> IO ()) -> Fd -> IO ()
339 closeFd mgr close fd = do
340   fds <- modifyMVar (emFds mgr) $ \oldMap -> do
341     close fd
342     case IM.delete (fromIntegral fd) oldMap of
343       (Nothing,  _)       -> return (oldMap, [])
344       (Just fds, !newMap) -> do
345         when (eventsOf fds /= mempty) $ wakeManager mgr
346         return (newMap, fds)
347   forM_ fds $ \(FdData reg ev cb) -> cb reg (ev `mappend` evtClose)
348
349 ------------------------------------------------------------------------
350 -- Registering interest in timeout events
351
352 -- | Register a timeout in the given number of microseconds.  The
353 -- returned 'TimeoutKey' can be used to later unregister or update the
354 -- timeout.  The timeout is automatically unregistered after the given
355 -- time has passed.
356 registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
357 registerTimeout mgr us cb = do
358   !key <- newUnique (emUniqueSource mgr)
359   if us <= 0 then cb
360     else do
361       now <- getCurrentTime
362       let expTime = fromIntegral us / 1000000.0 + now
363
364       -- We intentionally do not evaluate the modified map to WHNF here.
365       -- Instead, we leave a thunk inside the IORef and defer its
366       -- evaluation until mkTimeout in the event loop.  This is a
367       -- workaround for a nasty IORef contention problem that causes the
368       -- thread-delay benchmark to take 20 seconds instead of 0.2.
369       atomicModifyIORef (emTimeouts mgr) $ \f ->
370           let f' = (Q.insert key expTime cb) . f in (f', ())
371       wakeManager mgr
372   return $ TK key
373
374 -- | Unregister an active timeout.
375 unregisterTimeout :: EventManager -> TimeoutKey -> IO ()
376 unregisterTimeout mgr (TK key) = do
377   atomicModifyIORef (emTimeouts mgr) $ \f ->
378       let f' = (Q.delete key) . f in (f', ())
379   wakeManager mgr
380
381 -- | Update an active timeout to fire in the given number of
382 -- microseconds.
383 updateTimeout :: EventManager -> TimeoutKey -> Int -> IO ()
384 updateTimeout mgr (TK key) us = do
385   now <- getCurrentTime
386   let expTime = fromIntegral us / 1000000.0 + now
387
388   atomicModifyIORef (emTimeouts mgr) $ \f ->
389       let f' = (Q.adjust (const expTime) key) . f in (f', ())
390   wakeManager mgr
391
392 ------------------------------------------------------------------------
393 -- Utilities
394
395 -- | Call the callbacks corresponding to the given file descriptor.
396 onFdEvent :: EventManager -> Fd -> Event -> IO ()
397 onFdEvent mgr fd evs = do
398   fds <- readMVar (emFds mgr)
399   case IM.lookup (fromIntegral fd) fds of
400       Just cbs -> forM_ cbs $ \(FdData reg ev cb) ->
401                     when (evs `I.eventIs` ev) $ cb reg evs
402       Nothing  -> return ()