-- * Basic concurrency operations
ThreadId,
+#ifdef __GLASGOW_HASKELL__
myThreadId,
+#endif
forkIO,
+#ifdef __GLASGOW_HASKELL__
killThread,
throwTo,
+#endif
-- * Scheduling
module Control.Concurrent.SampleVar,
-- * Merging of streams
+#ifndef __HUGS__
mergeIO, -- :: [a] -> [a] -> IO [a]
nmergeIO, -- :: [[a]] -> IO [a]
+#endif
-- $merge
+#ifdef __GLASGOW_HASKELL__
+ -- * Bound Threads
+ -- $boundthreads
+ rtsSupportsBoundThreads,
+ forkOS,
+ isCurrentThreadBound,
+ runInBoundThread,
+ runInUnboundThread
+#endif
+
-- * GHC's implementation of concurrency
-- |This section describes features specific to GHC's
-- ** Pre-emption
-- $preemption
-
) where
import Prelude
import Control.Exception as Exception
#ifdef __GLASGOW_HASKELL__
-import GHC.Conc
+import GHC.Conc ( ThreadId(..), myThreadId, killThread, yield,
+ threadDelay, threadWaitRead, threadWaitWrite )
import GHC.TopHandler ( reportStackOverflow, reportError )
import GHC.IOBase ( IO(..) )
import GHC.IOBase ( unsafeInterleaveIO )
+import GHC.IOBase ( newIORef, readIORef, writeIORef )
import GHC.Base
-import GHC.Ptr
+
+import Foreign.StablePtr
+import Foreign.C.Types ( CInt )
+import Control.Monad ( when )
#endif
#ifdef __HUGS__
-import IOExts ( unsafeInterleaveIO )
-import ConcBase
+import Hugs.ConcBase
#endif
import Control.Concurrent.MVar
import Control.Concurrent.QSemN
import Control.Concurrent.SampleVar
+#ifdef __HUGS__
+type ThreadId = ()
+#endif
+
{- $conc_intro
The concurrency extension for Haskell is described in the paper
Haskell threads is done internally in the Haskell runtime system, and
doesn't make use of any operating system-supplied thread packages.
+However, if you want to interact with a foreign library that expects your
+program to use the operating system-supplied thread package, you can do so
+by using 'forkOS' instead of 'forkIO'.
+
Haskell threads can communicate via 'MVar's, a kind of synchronised
mutable variable (see "Control.Concurrent.MVar"). Several common
concurrency abstractions can be built from 'MVar's, and these are
-provided by the "Concurrent" library. Threads may also communicate
-via exceptions.
+provided by the "Control.Concurrent" library.
+In GHC, threads may also communicate via exceptions.
-}
{- $conc_scheduling
Scheduling may be either pre-emptive or co-operative,
depending on the implementation of Concurrent Haskell (see below
- for imformation related to specific compilers). In a co-operative
+ for information related to specific compilers). In a co-operative
system, context switches only occur when you use one of the
primitives defined in this module. This means that programs such
as:
-- cmp_thread in the RTS.
#ifdef __GLASGOW_HASKELL__
-type StgTSO = Ptr ()
+id2TSO :: ThreadId -> ThreadId#
+id2TSO (ThreadId t) = t
-id2TSO :: ThreadId -> StgTSO
-id2TSO (ThreadId t) = unsafeCoerce# t
-
-foreign import ccall unsafe "cmp_thread" cmp_thread :: StgTSO -> StgTSO -> Int
+foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> Int
-- Returns -1, 0, 1
cmpThread :: ThreadId -> ThreadId -> Ordering
instance Ord ThreadId where
compare = cmpThread
-foreign import ccall unsafe "rts_getThreadId" getThreadId :: StgTSO -> Int
+foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> Int
instance Show ThreadId where
showsPrec d t =
This sparks off a new thread to run the 'IO' computation passed as the
first argument, and returns the 'ThreadId' of the newly created
thread.
+
+The new thread will be a lightweight thread; if you want to use a foreign
+library that uses thread-local storage, use 'forkOS' instead.
-}
forkIO :: IO () -> IO ThreadId
forkIO action = IO $ \ s ->
case ex of
-- ignore thread GC and killThread exceptions:
BlockedOnDeadMVar -> return ()
+ BlockedIndefinitely -> return ()
AsyncException ThreadKilled -> return ()
-- report all others:
- AsyncException StackOverflow -> reportStackOverflow False
- ErrorCall s -> reportError False s
- other -> reportError False (showsPrec 0 other "\n")
+ AsyncException StackOverflow -> reportStackOverflow
+ other -> reportError other
#endif /* __GLASGOW_HASKELL__ */
-
+#ifndef __HUGS__
max_buff_size :: Int
max_buff_size = 1
return val
where
mapIO f xs = sequence (map f xs)
+#endif /* __HUGS__ */
+
+#ifdef __GLASGOW_HASKELL__
+-- ---------------------------------------------------------------------------
+-- Bound Threads
+
+{- $boundthreads
+
+Support for multiple operating system threads and bound threads as described
+below is currently only available in the GHC runtime system if you use the
+/-threaded/ option when linking.
+
+Other Haskell systems do not currently support multiple operating system threads.
+
+A bound thread is a haskell thread that is /bound/ to an operating system
+thread. While the bound thread is still scheduled by the Haskell run-time
+system, the operating system thread takes care of all the foreign calls made
+by the bound thread.
+
+To a foreign library, the bound thread will look exactly like an ordinary
+operating system thread created using OS functions like @pthread_create@
+or @CreateThread@.
+
+Bound threads can be created using the 'forkOS' function below. All foreign
+exported functions are run in a bound thread (bound to the OS thread that
+called the function). Also, the @main@ action of every Haskell program is
+run in a bound thread.
+
+Why do we need this? Because if a foreign library is called from a thread
+created using 'forkIO', it won't have access to any /thread-local state/ -
+state variables that have specific values for each OS thread
+(see POSIX's @pthread_key_create@ or Win32's @TlsAlloc@). Therefore, some
+libraries (OpenGL, for example) will not work from a thread created using
+'forkIO'. They work fine in threads created using 'forkOS' or when called
+from @main@ or from a @foreign export@.
+-}
+
+-- | 'True' if bound threads are supported.
+-- If @rtsSupportsBoundThreads@ is 'False', 'isCurrentThreadBound'
+-- will always return 'False' and both 'forkOS' and 'runInBoundThread' will
+-- fail.
+foreign import ccall rtsSupportsBoundThreads :: Bool
+
+
+{- |
+Like 'forkIO', this sparks off a new thread to run the 'IO' computation passed as the
+first argument, and returns the 'ThreadId' of the newly created
+thread.
+
+However, @forkOS@ uses operating system-supplied multithreading support to create
+a new operating system thread. The new thread is /bound/, which means that
+all foreign calls made by the 'IO' computation are guaranteed to be executed
+in this new operating system thread; also, the operating system thread is not
+used for any other foreign calls.
+
+This means that you can use all kinds of foreign libraries from this thread
+(even those that rely on thread-local state), without the limitations of 'forkIO'.
+-}
+forkOS :: IO () -> IO ThreadId
+
+foreign export ccall forkOS_entry
+ :: StablePtr (IO ()) -> IO ()
+
+foreign import ccall "forkOS_entry" forkOS_entry_reimported
+ :: StablePtr (IO ()) -> IO ()
+
+forkOS_entry stableAction = do
+ action <- deRefStablePtr stableAction
+ action
+
+foreign import ccall forkOS_createThread
+ :: StablePtr (IO ()) -> IO CInt
+
+failNonThreaded = fail $ "RTS doesn't support multiple OS threads "
+ ++"(use ghc -threaded when linking)"
+
+forkOS action
+ | rtsSupportsBoundThreads = do
+ mv <- newEmptyMVar
+ let action_plus = Exception.catch action childHandler
+ entry <- newStablePtr (myThreadId >>= putMVar mv >> action_plus)
+ err <- forkOS_createThread entry
+ when (err /= 0) $ fail "Cannot create OS thread."
+ tid <- takeMVar mv
+ freeStablePtr entry
+ return tid
+ | otherwise = failNonThreaded
+
+-- | Returns 'True' if the calling thread is /bound/, that is, if it is
+-- safe to use foreign libraries that rely on thread-local state from the
+-- calling thread.
+isCurrentThreadBound :: IO Bool
+isCurrentThreadBound = IO $ \ s# ->
+ case isCurrentThreadBound# s# of
+ (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
+
+
+{- |
+Run the 'IO' computation passed as the first argument. If the calling thread
+is not /bound/, a bound thread is created temporarily. @runInBoundThread@
+doesn't finish until the 'IO' computation finishes.
+
+You can wrap a series of foreign function calls that rely on thread-local state
+with @runInBoundThread@ so that you can use them without knowing whether the
+current thread is /bound/.
+-}
+runInBoundThread :: IO a -> IO a
+
+runInBoundThread action
+ | rtsSupportsBoundThreads = do
+ bound <- isCurrentThreadBound
+ if bound
+ then action
+ else do
+ ref <- newIORef undefined
+ let action_plus = Exception.try action >>= writeIORef ref
+ resultOrException <-
+ bracket (newStablePtr action_plus)
+ freeStablePtr
+ (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref)
+ case resultOrException of
+ Left exception -> Exception.throw exception
+ Right result -> return result
+ | otherwise = failNonThreaded
+
+{- |
+Run the 'IO' computation passed as the first argument. If the calling thread
+is /bound/, an unbound thread is created temporarily using 'forkIO'.
+@runInBoundThread@ doesn't finish until the 'IO' computation finishes.
+
+Use this function /only/ in the rare case that you have actually observed a
+performance loss due to the use of bound threads. A program that
+doesn't need it's main thread to be bound and makes /heavy/ use of concurrency
+(e.g. a web server), might want to wrap it's @main@ action in
+@runInUnboundThread@.
+-}
+runInUnboundThread :: IO a -> IO a
+
+runInUnboundThread action = do
+ bound <- isCurrentThreadBound
+ if bound
+ then do
+ mv <- newEmptyMVar
+ forkIO (Exception.try action >>= putMVar mv)
+ takeMVar mv >>= \either -> case either of
+ Left exception -> Exception.throw exception
+ Right result -> return result
+ else action
+
+#endif /* __GLASGOW_HASKELL__ */
-- ---------------------------------------------------------------------------
-- More docs
> myForkIO :: IO () -> IO (MVar ())
> myForkIO io = do
-> mvar \<- newEmptyMVar
-> forkIO (io \`finally\` putMVar mvar ())
+> mvar <- newEmptyMVar
+> forkIO (io `finally` putMVar mvar ())
> return mvar
Note that we use 'finally' from the
- "Exception" module to make sure that the
+ "Control.Exception" module to make sure that the
'MVar' is written to even if the thread dies or
is killed for some reason.
A better method is to keep a global list of all child
threads which we should wait for at the end of the program:
-> children :: MVar [MVar ()]
-> children = unsafePerformIO (newMVar [])
->
-> waitForChildren :: IO ()
-> waitForChildren = do
-> (mvar:mvars) \<- takeMVar children
-> putMVar children mvars
-> takeMVar mvar
-> waitForChildren
->
-> forkChild :: IO () -> IO ()
-> forkChild io = do
-> mvar \<- newEmptyMVar
-> forkIO (p \`finally\` putMVar mvar ())
-> childs \<- takeMVar children
-> putMVar children (mvar:childs)
->
-> later = flip finally
->
+> children :: MVar [MVar ()]
+> children = unsafePerformIO (newMVar [])
+>
+> waitForChildren :: IO ()
+> waitForChildren = do
+> cs <- takeMVar children
+> case cs of
+> [] -> return ()
+> m:ms -> do
+> putMVar children ms
+> takeMVar m
+> waitForChildren
+>
+> forkChild :: IO () -> IO ()
+> forkChild io = do
+> mvar <- newEmptyMVar
+> childs <- takeMVar children
+> putMVar children (mvar:childs)
+> forkIO (io `finally` putMVar mvar ())
+>
> main =
> later waitForChildren $
> ...
a thread may be pre-empted whenever it allocates some memory,
which unfortunately means that tight loops which do no
allocation tend to lock out other threads (this only seems to
- happen with pathalogical benchmark-style code, however).
+ happen with pathological benchmark-style code, however).
The rescheduling timer runs on a 20ms granularity by
default, but this may be altered using the
- @-i<n>@ RTS option. After a rescheduling
+ @-i\<n\>@ RTS option. After a rescheduling
\"tick\" the running thread is pre-empted as soon as
possible.
One final note: the
@aaaa@ @bbbb@ example may not
work too well on GHC (see Scheduling, above), due
- to the locking on a 'Handle'. Only one thread
- may hold the lock on a 'Handle' at any one
+ to the locking on a 'System.IO.Handle'. Only one thread
+ may hold the lock on a 'System.IO.Handle' at any one
time, so if a reschedule happens while a thread is holding the
lock, the other thread won't be able to run. The upshot is that
the switch from @aaaa@ to