74b1a726b4c2f0bd05fbacc25e5c146862e637c1
[ghc-base.git] / System / Event / Manager.hs
1 {-# LANGUAGE BangPatterns, CPP, ExistentialQuantification, NoImplicitPrelude,
2     RecordWildCards, TypeSynonymInstances #-}
3 module System.Event.Manager
4     ( -- * Types
5       EventManager
6
7       -- * Creation
8     , new
9     , newWith
10     , newDefaultBackend
11
12       -- * Running
13     , finished
14     , loop
15     , step
16     , shutdown
17     , wakeManager
18
19       -- * Registering interest in I/O events
20     , Event
21     , evtRead
22     , evtWrite
23     , IOCallback
24     , FdKey(keyFd)
25     , registerFd_
26     , registerFd
27     , unregisterFd_
28     , unregisterFd
29     , closeFd
30
31       -- * Registering interest in timeout events
32     , TimeoutCallback
33     , TimeoutKey
34     , registerTimeout
35     , updateTimeout
36     , unregisterTimeout
37     ) where
38
39 #include "EventConfig.h"
40
41 ------------------------------------------------------------------------
42 -- Imports
43
44 import Control.Concurrent.MVar (MVar, modifyMVar, modifyMVar_, newMVar,
45                                 readMVar)
46 import Control.Exception (finally)
47 import Control.Monad ((=<<), forM_, liftM, sequence_, when)
48 import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
49                    writeIORef)
50 import Data.Maybe (Maybe(..))
51 import Data.Monoid (mappend, mconcat, mempty)
52 import GHC.Base
53 import GHC.Conc.Signal (runHandlers)
54 import GHC.List (filter)
55 import GHC.Num (Num(..))
56 import GHC.Real ((/), fromIntegral )
57 import GHC.Show (Show(..))
58 import System.Event.Clock (getCurrentTime)
59 import System.Event.Control
60 import System.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite,
61                               Timeout(..))
62 import System.Event.Unique (Unique, UniqueSource, newSource, newUnique)
63 import System.Posix.Types (Fd)
64
65 import qualified System.Event.IntMap as IM
66 import qualified System.Event.Internal as I
67 import qualified System.Event.PSQ as Q
68
69 #if defined(HAVE_KQUEUE)
70 import qualified System.Event.KQueue as KQueue
71 #elif defined(HAVE_EPOLL)
72 import qualified System.Event.EPoll  as EPoll
73 #elif defined(HAVE_POLL)
74 import qualified System.Event.Poll   as Poll
75 #else
76 # error not implemented for this operating system
77 #endif
78
79 ------------------------------------------------------------------------
80 -- Types
81
82 data FdData = FdData {
83       fdKey       :: {-# UNPACK #-} !FdKey
84     , fdEvents    :: {-# UNPACK #-} !Event
85     , _fdCallback :: !IOCallback
86     } deriving (Show)
87
88 -- | A file descriptor registration cookie.
89 data FdKey = FdKey {
90       keyFd     :: {-# UNPACK #-} !Fd
91     , keyUnique :: {-# UNPACK #-} !Unique
92     } deriving (Eq, Show)
93
94 -- | Callback invoked on I/O events.
95 type IOCallback = FdKey -> Event -> IO ()
96
97 instance Show IOCallback where
98     show _ = "IOCallback"
99
100 newtype TimeoutKey   = TK Unique
101     deriving (Eq)
102
103 -- | Callback invoked on timeout events.
104 type TimeoutCallback = IO ()
105
106 data State = Created
107            | Running
108            | Dying
109            | Finished
110              deriving (Eq, Show)
111
112 -- | A priority search queue, with timeouts as priorities.
113 type TimeoutQueue = Q.PSQ TimeoutCallback
114
115 {-
116 Instead of directly modifying the 'TimeoutQueue' in
117 e.g. 'registerTimeout' we keep a list of edits to perform, in the form
118 of a chain of function closures, and have the I/O manager thread
119 perform the edits later.  This exist to address the following GC
120 problem:
121
122 Since e.g. 'registerTimeout' doesn't force the evaluation of the
123 thunks inside the 'emTimeouts' IORef a number of thunks build up
124 inside the IORef.  If the I/O manager thread doesn't evaluate these
125 thunks soon enough they'll get promoted to the old generation and
126 become roots for all subsequent minor GCs.
127
128 When the thunks eventually get evaluated they will each create a new
129 intermediate 'TimeoutQueue' that immediately becomes garbage.  Since
130 the thunks serve as roots until the next major GC these intermediate
131 'TimeoutQueue's will get copied unnecesarily in the next minor GC,
132 increasing GC time.  This problem is known as "floating garbage".
133
134 Keeping a list of edits doesn't stop this from happening but makes the
135 amount of data that gets copied smaller.
136
137 TODO: Evaluate the content of the IORef to WHNF on each insert once
138 this bug is resolved: http://hackage.haskell.org/trac/ghc/ticket/3838
139 -}
140
141 -- | An edit to apply to a 'TimeoutQueue'.
142 type TimeoutEdit = TimeoutQueue -> TimeoutQueue
143
144 -- | The event manager state.
145 data EventManager = EventManager
146     { emBackend      :: !Backend
147     , emFds          :: {-# UNPACK #-} !(MVar (IM.IntMap [FdData]))
148     , emTimeouts     :: {-# UNPACK #-} !(IORef TimeoutEdit)
149     , emState        :: {-# UNPACK #-} !(IORef State)
150     , emUniqueSource :: {-# UNPACK #-} !UniqueSource
151     , emControl      :: {-# UNPACK #-} !Control
152     }
153
154 ------------------------------------------------------------------------
155 -- Creation
156
157 handleControlEvent :: EventManager -> FdKey -> Event -> IO ()
158 handleControlEvent mgr reg _evt = do
159   msg <- readControlMessage (emControl mgr) (keyFd reg)
160   case msg of
161     CMsgWakeup      -> return ()
162     CMsgDie         -> writeIORef (emState mgr) Finished
163     CMsgSignal fp s -> runHandlers fp s
164
165 newDefaultBackend :: IO Backend
166 #if defined(HAVE_KQUEUE)
167 newDefaultBackend = KQueue.new
168 #elif defined(HAVE_EPOLL)
169 newDefaultBackend = EPoll.new
170 #elif defined(HAVE_POLL)
171 newDefaultBackend = Poll.new
172 #else
173 newDefaultBackend = error "no back end for this platform"
174 #endif
175
176 -- | Create a new event manager.
177 new :: IO EventManager
178 new = newWith =<< newDefaultBackend
179
180 newWith :: Backend -> IO EventManager
181 newWith be = do
182   iofds <- newMVar IM.empty
183   timeouts <- newIORef id
184   ctrl <- newControl
185   state <- newIORef Created
186   us <- newSource
187   _ <- mkWeakIORef state $ do
188                st <- atomicModifyIORef state $ \s -> (Finished, s)
189                when (st /= Finished) $ do
190                  I.delete be
191                  closeControl ctrl
192   let mgr = EventManager { emBackend = be
193                          , emFds = iofds
194                          , emTimeouts = timeouts
195                          , emState = state
196                          , emUniqueSource = us
197                          , emControl = ctrl
198                          }
199   _ <- registerFd_ mgr (handleControlEvent mgr) (controlReadFd ctrl) evtRead
200   _ <- registerFd_ mgr (handleControlEvent mgr) (wakeupReadFd ctrl) evtRead
201   return mgr
202
203 -- | Asynchronously shuts down the event manager, if running.
204 shutdown :: EventManager -> IO ()
205 shutdown mgr = do
206   state <- atomicModifyIORef (emState mgr) $ \s -> (Dying, s)
207   when (state == Running) $ sendDie (emControl mgr)
208
209 finished :: EventManager -> IO Bool
210 finished mgr = (== Finished) `liftM` readIORef (emState mgr)
211
212 cleanup :: EventManager -> IO ()
213 cleanup EventManager{..} = do
214   writeIORef emState Finished
215   I.delete emBackend
216   closeControl emControl
217
218 ------------------------------------------------------------------------
219 -- Event loop
220
221 -- | Start handling events.  This function loops until told to stop.
222 --
223 -- /Note/: This loop can only be run once per 'EventManager', as it
224 -- closes all of its control resources when it finishes.
225 loop :: EventManager -> IO ()
226 loop mgr@EventManager{..} = do
227   state <- atomicModifyIORef emState $ \s -> case s of
228     Created -> (Running, s)
229     _       -> (s, s)
230   case state of
231     Created -> go Q.empty `finally` cleanup mgr
232     Dying   -> cleanup mgr
233     _       -> do cleanup mgr
234                   error $ "System.Event.Manager.loop: state is already " ++
235                       show state
236  where
237   go q = do (running, q') <- step mgr q
238             when running $ go q'
239
240 step :: EventManager -> TimeoutQueue -> IO (Bool, TimeoutQueue)
241 step mgr@EventManager{..} tq = do
242   (timeout, q') <- mkTimeout tq
243   I.poll emBackend timeout (onFdEvent mgr)
244   state <- readIORef emState
245   state `seq` return (state == Running, q')
246  where
247
248   -- | Call all expired timer callbacks and return the time to the
249   -- next timeout.
250   mkTimeout :: TimeoutQueue -> IO (Timeout, TimeoutQueue)
251   mkTimeout q = do
252       now <- getCurrentTime
253       applyEdits <- atomicModifyIORef emTimeouts $ \f -> (id, f)
254       let (expired, q'') = let q' = applyEdits q in q' `seq` Q.atMost now q'
255       sequence_ $ map Q.value expired
256       let timeout = case Q.minView q'' of
257             Nothing             -> Forever
258             Just (Q.E _ t _, _) ->
259                 -- This value will always be positive since the call
260                 -- to 'atMost' above removed any timeouts <= 'now'
261                 let t' = t - now in t' `seq` Timeout t'
262       return (timeout, q'')
263
264 ------------------------------------------------------------------------
265 -- Registering interest in I/O events
266
267 -- | Register interest in the given events, without waking the event
268 -- manager thread.  The 'Bool' return value indicates whether the
269 -- event manager ought to be woken.
270 registerFd_ :: EventManager -> IOCallback -> Fd -> Event
271             -> IO (FdKey, Bool)
272 registerFd_ EventManager{..} cb fd evs = do
273   u <- newUnique emUniqueSource
274   modifyMVar emFds $ \oldMap -> do
275     let fd'  = fromIntegral fd
276         reg  = FdKey fd u
277         !fdd = FdData reg evs cb
278         (!newMap, (oldEvs, newEvs)) =
279             case IM.insertWith (++) fd' [fdd] oldMap of
280               (Nothing,   n) -> (n, (mempty, evs))
281               (Just prev, n) -> (n, pairEvents prev newMap fd')
282         modify = oldEvs /= newEvs
283     when modify $ I.modifyFd emBackend fd oldEvs newEvs
284     return (newMap, (reg, modify))
285 {-# INLINE registerFd_ #-}
286
287 -- | @registerFd mgr cb fd evs@ registers interest in the events @evs@
288 -- on the file descriptor @fd@.  @cb@ is called for each event that
289 -- occurs.  Returns a cookie that can be handed to 'unregisterFd'.
290 registerFd :: EventManager -> IOCallback -> Fd -> Event -> IO FdKey
291 registerFd mgr cb fd evs = do
292   (r, wake) <- registerFd_ mgr cb fd evs
293   when wake $ wakeManager mgr
294   return r
295 {-# INLINE registerFd #-}
296
297 -- | Wake up the event manager.
298 wakeManager :: EventManager -> IO ()
299 wakeManager mgr = sendWakeup (emControl mgr)
300
301 eventsOf :: [FdData] -> Event
302 eventsOf = mconcat . map fdEvents
303
304 pairEvents :: [FdData] -> IM.IntMap [FdData] -> Int -> (Event, Event)
305 pairEvents prev m fd = let l = eventsOf prev
306                            r = case IM.lookup fd m of
307                                  Nothing  -> mempty
308                                  Just fds -> eventsOf fds
309                        in (l, r)
310
311 -- | Drop a previous file descriptor registration, without waking the
312 -- event manager thread.  The return value indicates whether the event
313 -- manager ought to be woken.
314 unregisterFd_ :: EventManager -> FdKey -> IO Bool
315 unregisterFd_ EventManager{..} (FdKey fd u) =
316   modifyMVar emFds $ \oldMap -> do
317     let dropReg cbs = case filter ((/= u) . keyUnique . fdKey) cbs of
318                           []   -> Nothing
319                           cbs' -> Just cbs'
320         fd' = fromIntegral fd
321         (!newMap, (oldEvs, newEvs)) =
322             case IM.updateWith dropReg fd' oldMap of
323               (Nothing,   _)    -> (oldMap, (mempty, mempty))
324               (Just prev, newm) -> (newm, pairEvents prev newm fd')
325         modify = oldEvs /= newEvs
326     when modify $ I.modifyFd emBackend fd oldEvs newEvs
327     return (newMap, modify)
328
329 -- | Drop a previous file descriptor registration.
330 unregisterFd :: EventManager -> FdKey -> IO ()
331 unregisterFd mgr reg = do
332   wake <- unregisterFd_ mgr reg
333   when wake $ wakeManager mgr
334
335 -- | Close a file descriptor in a race-safe way.
336 closeFd :: EventManager -> (Fd -> IO ()) -> Fd -> IO ()
337 closeFd mgr close fd = do
338   fds <- modifyMVar (emFds mgr) $ \oldMap -> do
339     close fd
340     case IM.delete (fromIntegral fd) oldMap of
341       (Nothing,  _)       -> return (oldMap, [])
342       (Just fds, !newMap) -> do
343         when (eventsOf fds /= mempty) $ wakeManager mgr
344         return (newMap, fds)
345   forM_ fds $ \(FdData reg ev cb) -> cb reg (ev `mappend` evtClose)
346
347 ------------------------------------------------------------------------
348 -- Registering interest in timeout events
349
350 -- | Register a timeout in the given number of microseconds.
351 registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
352 registerTimeout mgr us cb = do
353   !key <- newUnique (emUniqueSource mgr)
354   if us <= 0 then cb
355     else do
356       now <- getCurrentTime
357       let expTime = fromIntegral us / 1000000.0 + now
358
359       -- We intentionally do not evaluate the modified map to WHNF here.
360       -- Instead, we leave a thunk inside the IORef and defer its
361       -- evaluation until mkTimeout in the event loop.  This is a
362       -- workaround for a nasty IORef contention problem that causes the
363       -- thread-delay benchmark to take 20 seconds instead of 0.2.
364       atomicModifyIORef (emTimeouts mgr) $ \f ->
365           let f' = (Q.insert key expTime cb) . f in (f', ())
366       wakeManager mgr
367   return $ TK key
368
369 unregisterTimeout :: EventManager -> TimeoutKey -> IO ()
370 unregisterTimeout mgr (TK key) = do
371   atomicModifyIORef (emTimeouts mgr) $ \f ->
372       let f' = (Q.delete key) . f in (f', ())
373   wakeManager mgr
374
375 updateTimeout :: EventManager -> TimeoutKey -> Int -> IO ()
376 updateTimeout mgr (TK key) us = do
377   now <- getCurrentTime
378   let expTime = fromIntegral us / 1000000.0 + now
379
380   atomicModifyIORef (emTimeouts mgr) $ \f ->
381       let f' = (Q.adjust (const expTime) key) . f in (f', ())
382   wakeManager mgr
383
384 ------------------------------------------------------------------------
385 -- Utilities
386
387 -- | Call the callbacks corresponding to the given file descriptor.
388 onFdEvent :: EventManager -> Fd -> Event -> IO ()
389 onFdEvent mgr fd evs = do
390   fds <- readMVar (emFds mgr)
391   case IM.lookup (fromIntegral fd) fds of
392       Just cbs -> forM_ cbs $ \(FdData reg ev cb) ->
393                     when (evs `I.eventIs` ev) $ cb reg evs
394       Nothing  -> return ()