Improvements to the threading model.
- asynchronous exceptions supported.
- killThread# can now raise an exception in the specified
thread. It's new type is
killThread# :: ThreadId# -> Exception -> IO ()
High-level versions:
killThread :: ThreadId -> IO ()
raiseInThread :: ThreadId -> Exception -> IO ()
(killThread raises a 'ThreadKilled' exception in the
specified thread).
If the thread has no exception handler, it is killed
as before. Otherwise, the exception is passed to
the innermost CATCH_FRAME and the thread is woken up
if it was blocked. The current computation is
suspended, instead of being replaced by the exception
(as is the case with throw).
Sending an exception to the current thread works too.
- new primitive: myThreadId# :: IO ThreadId# and corresponding
high-level version myThreadId :: IO ThreadId.
- new primitive: yield# :: IO (), and yield :: IO ().
- the TSO now contains a pointer to the resource currently blocked
on (MVAR or BLACKHOLE_BQ).
- Add a giant comment to TSO.h about what the various link fields
are supposed to do, and invariants etc.
-- concurrency
| ForkOp
| KillThreadOp
+ | YieldOp
+ | MyThreadIdOp
| DelayOp
| WaitReadOp
| WaitWriteOp
tagOf_PrimOp ParOp = ILIT(221)
tagOf_PrimOp ForkOp = ILIT(222)
tagOf_PrimOp KillThreadOp = ILIT(223)
-tagOf_PrimOp DelayOp = ILIT(224)
-tagOf_PrimOp WaitReadOp = ILIT(225)
-tagOf_PrimOp WaitWriteOp = ILIT(226)
-tagOf_PrimOp ParGlobalOp = ILIT(227)
-tagOf_PrimOp ParLocalOp = ILIT(228)
-tagOf_PrimOp ParAtOp = ILIT(229)
-tagOf_PrimOp ParAtAbsOp = ILIT(230)
-tagOf_PrimOp ParAtRelOp = ILIT(231)
-tagOf_PrimOp ParAtForNowOp = ILIT(232)
-tagOf_PrimOp CopyableOp = ILIT(233)
-tagOf_PrimOp NoFollowOp = ILIT(234)
-tagOf_PrimOp NewMutVarOp = ILIT(235)
-tagOf_PrimOp ReadMutVarOp = ILIT(236)
-tagOf_PrimOp WriteMutVarOp = ILIT(237)
-tagOf_PrimOp SameMutVarOp = ILIT(238)
-tagOf_PrimOp CatchOp = ILIT(239)
-tagOf_PrimOp RaiseOp = ILIT(240)
+tagOf_PrimOp YieldOp = ILIT(224)
+tagOf_PrimOp MyThreadIdOp = ILIT(225)
+tagOf_PrimOp DelayOp = ILIT(226)
+tagOf_PrimOp WaitReadOp = ILIT(227)
+tagOf_PrimOp WaitWriteOp = ILIT(228)
+tagOf_PrimOp ParGlobalOp = ILIT(229)
+tagOf_PrimOp ParLocalOp = ILIT(230)
+tagOf_PrimOp ParAtOp = ILIT(231)
+tagOf_PrimOp ParAtAbsOp = ILIT(232)
+tagOf_PrimOp ParAtRelOp = ILIT(233)
+tagOf_PrimOp ParAtForNowOp = ILIT(234)
+tagOf_PrimOp CopyableOp = ILIT(235)
+tagOf_PrimOp NoFollowOp = ILIT(236)
+tagOf_PrimOp NewMutVarOp = ILIT(237)
+tagOf_PrimOp ReadMutVarOp = ILIT(238)
+tagOf_PrimOp WriteMutVarOp = ILIT(239)
+tagOf_PrimOp SameMutVarOp = ILIT(240)
+tagOf_PrimOp CatchOp = ILIT(241)
+tagOf_PrimOp RaiseOp = ILIT(242)
tagOf_PrimOp op = pprPanic# "tagOf_PrimOp: pattern-match" (ppr op)
--panic# "tagOf_PrimOp: pattern-match"
ParOp,
ForkOp,
KillThreadOp,
+ YieldOp,
+ MyThreadIdOp,
DelayOp,
WaitReadOp,
WaitWriteOp
-- killThread# :: ThreadId# -> State# RealWorld -> State# RealWorld
primOpInfo KillThreadOp
- = mkGenPrimOp SLIT("killThread#") []
- [threadIdPrimTy, realWorldStatePrimTy]
+ = mkGenPrimOp SLIT("killThread#") [alphaTyVar]
+ [threadIdPrimTy, alphaTy, realWorldStatePrimTy]
realWorldStatePrimTy
+
+-- yield# :: State# RealWorld -> State# RealWorld
+primOpInfo YieldOp
+ = mkGenPrimOp SLIT("yield#") []
+ [realWorldStatePrimTy]
+ realWorldStatePrimTy
+
+-- myThreadId# :: State# RealWorld -> (# State# RealWorld, ThreadId# #)
+primOpInfo MyThreadIdOp
+ = mkGenPrimOp SLIT("myThreadId#") []
+ [realWorldStatePrimTy]
+ (unboxedPair [realWorldStatePrimTy, threadIdPrimTy])
\end{code}
************************************************************************
NewMVarOp -> True
ForkOp -> True
KillThreadOp -> True
+ YieldOp -> True
CCallOp _ _ may_gc@True _ -> True -- _ccall_GC_
-- the next one doesn't perform any heap checks,
-- but it is of such an esoteric nature that
primOpHasSideEffects ParOp = True
primOpHasSideEffects ForkOp = True
primOpHasSideEffects KillThreadOp = True
+primOpHasSideEffects YieldOp = True
primOpHasSideEffects SeqOp = True
primOpHasSideEffects MakeForeignObjOp = True
/* -----------------------------------------------------------------------------
- * $Id: PrimOps.h,v 1.23 1999/03/05 10:21:29 sof Exp $
+ * $Id: PrimOps.h,v 1.24 1999/03/16 13:20:09 simonm Exp $
*
* (c) The GHC Team, 1998-1999
*
-------------------------------------------------------------------------- */
EF_(forkzh_fast);
+EF_(yieldzh_fast);
EF_(killThreadzh_fast);
EF_(seqzh_fast);
+#define myThreadIdzh(t) (t = CurrentTSO)
+
/* Hmm, I'll think about these later. */
/* -----------------------------------------------------------------------------
Pointer equality
/* -----------------------------------------------------------------------------
- * $Id: TSO.h,v 1.5 1999/03/02 19:44:22 sof Exp $
+ * $Id: TSO.h,v 1.6 1999/03/16 13:20:10 simonm Exp $
*
* (c) The GHC Team, 1998-1999
*
struct StgTSO_* link;
StgMutClosure * mut_link; /* TSO's are mutable of course! */
StgTSOWhatNext whatNext;
- StgTSOState state; /* necessary? */
+ StgClosure * blocked_on;
StgThreadID id;
- /* Exception Handlers */
StgTSOTickyInfo ticky;
StgTSOProfInfo prof;
StgTSOParInfo par;
extern DLL_IMPORT_RTS StgTSO *CurrentTSO;
+/* -----------------------------------------------------------------------------
+ Invariants:
+
+ An active thread has the following properties:
+
+ tso->stack < tso->sp < tso->stack+tso->stack_size
+ tso->stack_size <= tso->max_stack_size
+ tso->splim == tso->stack + RESERVED_STACK_WORDS;
+
+ RESERVED_STACK_WORDS is large enough for any heap-check or
+ stack-check failure.
+
+ The size of the TSO struct plus the stack is either
+ (a) smaller than a block, or
+ (b) a multiple of BLOCK_SIZE
+
+ tso->link
+ == END_TSO_QUEUE , iff the thread is currently running.
+ == (StgTSO *) , otherwise, and it is linked onto either:
+
+ - the runnable_queue tso->blocked_on == END_TSO_QUEUE
+ - the blocked_queue tso->blocked_on == END_TSO_QUEUE
+ - a BLACKHOLE_BQ, tso->blocked_on == the BLACKHOLE_BQ
+ - an MVAR, tso->blocked_on == the MVAR
+
+ A zombie thread has the following properties:
+
+ tso->whatNext == ThreadComplete or ThreadKilled
+ tso->link == (could be on some queue somewhere)
+ tso->su == tso->stack + tso->stack_size
+ tso->sp == tso->stack + tso->stack_size - 1 (i.e. top stack word)
+ tso->sp[0] == return value of thread, if whatNext == ThreadComplete,
+ exception , if whatNext == ThreadKilled
+
+ (tso->sp is left pointing at the top word on the stack so that
+ the return value or exception will be retained by a GC).
+
+ ---------------------------------------------------------------------------- */
/* Workaround for a bug/quirk in gcc on certain architectures.
* symptom is that (&tso->stack - &tso->header) /= sizeof(StgTSO)
% -----------------------------------------------------------------------------
-% $Id: Exception.lhs,v 1.5 1999/01/19 09:57:12 sof Exp $
+% $Id: Exception.lhs,v 1.6 1999/03/16 13:20:11 simonm Exp $
%
% (c) The GRAP/AQUA Project, Glasgow University, 1998
%
-- Throwing exceptions
throw, -- :: Exception -> a
+ raiseInThread, -- :: ThreadId -> Exception -> a
-- Dynamic exceptions
import Prelude hiding (catch)
import PrelGHC (catch#)
import PrelException hiding (catch)
+import PrelConc ( raiseInThread )
#endif
import Dynamic
\begin{code}
{-# OPTIONS -fno-implicit-prelude #-}
-module PrelConc (
-
- -- Thread Ids
- ThreadId,
-
- -- Forking and suchlike
- forkIO,
- killThread,
- par, fork, seq,
+module PrelConc
+
+ -- Thread Ids
+ ( ThreadId -- abstract
+
+ -- Forking and suchlike
+ , forkIO -- :: IO () -> IO ThreadId
+ , myThreadId -- :: IO ThreadId
+ , killThread -- :: ThreadId -> IO ()
+ , raiseInThread -- :: ThreadId -> Exception -> IO ()
+ , par -- :: a -> b -> b
+ , fork -- :: a -> b -> b
+ , seq -- :: a -> b -> b
{-threadDelay, threadWaitRead, threadWaitWrite,-}
- -- MVars
- MVar
- , newMVar
- , newEmptyMVar
- , takeMVar
- , putMVar
- , readMVar
- , swapMVar
- -- use with care (see comment.)
- , isEmptyMVar
+ -- MVars
+ , MVar -- abstract
+ , newMVar -- :: a -> IO (MVar a)
+ , newEmptyMVar -- :: IO (MVar a)
+ , takeMVar -- :: MVar a -> IO a
+ , putMVar -- :: MVar a -> a -> IO ()
+ , readMVar -- :: MVar a -> IO a
+ , swapMVar -- :: MVar a -> a -> IO a
+ , isEmptyMVar -- :: MVar a -> IO Bool
+
) where
import PrelBase
import PrelST ( liftST )
import PrelIOBase ( IO(..), MVar(..), unsafePerformIO )
import PrelBase ( Int(..) )
+import PrelException ( Exception(..), AsyncException(..) )
infixr 0 `par`, `fork`
\end{code}
\begin{code}
data ThreadId = ThreadId ThreadId#
--- ToDo: data ThreadId = ThreadId (WeakPair ThreadId# ())
--- But since ThreadId# is unlifted, the WeakPair type must use open
+-- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
+-- But since ThreadId# is unlifted, the Weak type must use open
-- type variables.
forkIO :: IO () -> IO ThreadId
killThread :: ThreadId -> IO ()
killThread (ThreadId id) = IO $ \ s ->
- case (killThread# id s) of s1 -> (# s1, () #)
+ case (killThread# id (AsyncException ThreadKilled) s) of s1 -> (# s1, () #)
+
+raiseInThread :: ThreadId -> Exception -> IO ()
+raiseInThread (ThreadId id) ex = IO $ \ s ->
+ case (killThread# id ex s) of s1 -> (# s1, () #)
+
+myThreadId :: IO ThreadId
+myThreadId = IO $ \s ->
+ case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #)
-- "seq" is defined a bit wierdly (see below)
--
-- Concurrency primitives
ThreadIdzh
+ myThreadIdzh
forkzh
+ yieldzh
killThreadzh
delayzh
waitReadzh
/* -----------------------------------------------------------------------------
- * $Id: GC.c,v 1.52 1999/03/15 16:53:11 simonm Exp $
+ * $Id: GC.c,v 1.53 1999/03/16 13:20:13 simonm Exp $
*
* (c) The GHC Team 1998-1999
*
evac_gen = 0;
/* chase the link field for any TSOs on the same queue */
(StgClosure *)tso->link = evacuate((StgClosure *)tso->link);
+ if (tso->blocked_on) {
+ tso->blocked_on = evacuate(tso->blocked_on);
+ }
/* scavenge this thread's stack */
scavenge_stack(tso->sp, &(tso->stack[tso->stack_size]));
evac_gen = saved_evac_gen;
}
case TSO:
- /* follow ptrs and remove this from the mutable list */
{
StgTSO *tso = (StgTSO *)p;
- /* Don't bother scavenging if this thread is dead
- */
- if (!(tso->whatNext == ThreadComplete ||
- tso->whatNext == ThreadKilled)) {
- /* Don't need to chase the link field for any TSOs on the
- * same queue. Just scavenge this thread's stack
- */
- scavenge_stack(tso->sp, &(tso->stack[tso->stack_size]));
+ (StgClosure *)tso->link = evacuate((StgClosure *)tso->link);
+ if (tso->blocked_on) {
+ tso->blocked_on = evacuate(tso->blocked_on);
}
+ scavenge_stack(tso->sp, &(tso->stack[tso->stack_size]));
/* Don't take this TSO off the mutable list - it might still
* point to some younger objects (because we set evac_gen to 0
tso = (StgTSO *)p;
/* chase the link field for any TSOs on the same queue */
(StgClosure *)tso->link = evacuate((StgClosure *)tso->link);
+ if (tso->blocked_on) {
+ tso->blocked_on = evacuate(tso->blocked_on);
+ }
/* scavenge this thread's stack */
scavenge_stack(tso->sp, &(tso->stack[tso->stack_size]));
continue;
EXTFUN(stg_gen_chk);
EXTFUN(stg_gen_hp);
EXTFUN(stg_gen_yield);
+EXTFUN(stg_yield_noregs);
EXTFUN(stg_yield_to_Hugs);
EXTFUN(stg_gen_block);
EXTFUN(stg_block_1);
/* -----------------------------------------------------------------------------
- * $Id: HeapStackCheck.hc,v 1.3 1999/02/05 16:02:43 simonm Exp $
+ * $Id: HeapStackCheck.hc,v 1.4 1999/03/16 13:20:15 simonm Exp $
*
* (c) The GHC Team, 1998-1999
*
FE_
}
+FN_(stg_yield_noregs)
+{
+ FB_
+ YIELD_GENERIC
+ FE_
+}
+
FN_(stg_yield_to_Hugs)
{
FB_
/* -----------------------------------------------------------------------------
- * $Id: PrimOps.hc,v 1.21 1999/03/05 10:21:27 sof Exp $
+ * $Id: PrimOps.hc,v 1.22 1999/03/16 13:20:15 simonm Exp $
*
* (c) The GHC Team, 1998-1999
*
#include "Storage.h"
#include "BlockAlloc.h" /* tmp */
#include "StablePriv.h"
+#include "HeapStackCheck.h"
+#include "StgRun.h"
/* ** temporary **
TICK_ALLOC_PRIM(sizeofW(StgArrWords),1,0);
CCS_ALLOC(CCCS,sizeofW(StgArrWords)+1); /* ccs prof */
- p = stgCast(StgArrWords*,Hp)-1;
+ p = (StgArrWords *)Hp - 1;
SET_ARR_HDR(p, &ARR_WORDS_info, CCCS, 1);
/* mpz_set_si is inlined here, makes things simpler */
TICK_ALLOC_PRIM(sizeofW(StgArrWords),1,0);
CCS_ALLOC(CCCS,sizeofW(StgArrWords)+1); /* ccs prof */
- p = stgCast(StgArrWords*,Hp)-1;
+ p = (StgArrWords *)Hp - 1;
SET_ARR_HDR(p, &ARR_WORDS_info, CCCS, 1);
if (val != 0) {
TICK_ALLOC_PRIM(sizeofW(StgArrWords),words_needed,0);
CCS_ALLOC(CCCS,sizeofW(StgArrWords)+words_needed); /* ccs prof */
- p = stgCast(StgArrWords*,(Hp-words_needed+1))-1;
+ p = (StgArrWords *)(Hp-words_needed+1) - 1;
SET_ARR_HDR(p, &ARR_WORDS_info, CCCS, words_needed);
if ( val < 0LL ) {
TICK_ALLOC_PRIM(sizeofW(StgArrWords),words_needed,0);
CCS_ALLOC(CCCS,sizeofW(StgArrWords)+words_needed); /* ccs prof */
- p = stgCast(StgArrWords*,(Hp-words_needed+1))-1;
+ p = (StgArrWords *)(Hp-words_needed+1) - 1;
SET_ARR_HDR(p, &ARR_WORDS_info, CCCS, words_needed);
hi = (W_)((LW_)val / 0x100000000ULL);
/* Be prepared to tell Lennart-coded __decodeFloat */
/* where mantissa._mp_d can be put (it does not care about the rest) */
- p = stgCast(StgArrWords*,Hp)-1;
+ p = (StgArrWords *)Hp - 1;
SET_ARR_HDR(p,&ARR_WORDS_info,CCCS,1)
mantissa._mp_d = (void *)BYTE_ARR_CTS(p);
/* Be prepared to tell Lennart-coded __decodeDouble */
/* where mantissa.d can be put (it does not care about the rest) */
- p = stgCast(StgArrWords*,Hp-ARR_SIZE+1);
+ p = (StgArrWords *)(Hp-ARR_SIZE+1);
SET_ARR_HDR(p, &ARR_WORDS_info, CCCS, DOUBLE_MANTISSA_SIZE);
mantissa._mp_d = (void *)BYTE_ARR_CTS(p);
FB_
/* args: R1 = closure to spark */
- if (closure_SHOULD_SPARK(stgCast(StgClosure*,R1.p))) {
+ if (closure_SHOULD_SPARK(R1.cl)) {
MAYBE_GC(R1_PTR, forkzh_fast);
FE_
}
+FN_(yieldzh_fast)
+{
+ FB_
+ JMP_(stg_yield_noregs)
+ FE_
+}
+
FN_(killThreadzh_fast)
{
FB_
- /* args: R1.p = TSO to kill */
+ /* args: R1.p = TSO to kill, R2.p = Exception */
/* The thread is dead, but the TSO sticks around for a while. That's why
* we don't have to explicitly remove it from any queues it might be on.
*/
- STGCALL1(deleteThread, (StgTSO *)R1.p);
- /* We might have killed ourselves. In which case, better return to the
- * scheduler...
+ /* We might have killed ourselves. In which case, better be *very*
+ * careful. If the exception killed us, then return to the scheduler.
+ * If the exception went to a catch frame, we'll just continue from
+ * the handler.
*/
- if ((StgTSO *)R1.p == CurrentTSO) {
- JMP_(stg_stop_thread_entry); /* leave semi-gracefully */
+ if (R1.t == CurrentTSO) {
+ SaveThreadState(); /* inline! */
+ STGCALL2(raiseAsync, R1.t, R2.cl);
+ if (CurrentTSO->whatNext == ThreadKilled) {
+ R1.w = ThreadYielding;
+ JMP_(StgReturn);
+ }
+ LoadThreadState();
+ if (CurrentTSO->whatNext == ThreadEnterGHC) {
+ R1.w = Sp[0];
+ Sp++;
+ JMP_(GET_ENTRY(R1.cl));
+ } else {
+ barf("killThreadzh_fast");
+ }
+ } else {
+ STGCALL2(raiseAsync, R1.t, R2.cl);
}
JMP_(ENTRY_CODE(Sp[0]));
/* -----------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.15 1999/03/08 16:41:42 sof Exp $
+ * $Id: Schedule.c,v 1.16 1999/03/16 13:20:16 simonm Exp $
*
* (c) The GHC Team, 1998-1999
*
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
/* -----------------------------------------------------------------------------
+ * Static functions
+ * -------------------------------------------------------------------------- */
+static void unblockThread(StgTSO *tso);
+
+/* -----------------------------------------------------------------------------
Create a new thread.
The new thread starts with the given stack size. Before the
{
SET_INFO(tso,&TSO_info);
tso->whatNext = ThreadEnterGHC;
- tso->state = tso_state_runnable;
tso->id = next_thread_id++;
+ tso->blocked_on = NULL;
tso->splim = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
tso->stack_size = stack_size;
}
/* -----------------------------------------------------------------------------
- Delete a thread - reverting all blackholes to (something
- equivalent to) their former state.
-
- We create an AP_UPD for every UpdateFrame on the stack.
- Entering one of these AP_UPDs pushes everything from the corresponding
- update frame upwards onto the stack. (Actually, it pushes everything
- up to the next update frame plus a pointer to the next AP_UPD
- object. Entering the next AP_UPD object pushes more onto the
- stack until we reach the last AP_UPD object - at which point
- the stack should look exactly as it did when we killed the TSO
- and we can continue execution by entering the closure on top of
- the stack.
- -------------------------------------------------------------------------- */
-
-void deleteThread(StgTSO *tso)
-{
- StgUpdateFrame* su = tso->su;
- StgPtr sp = tso->sp;
-
- /* Thread already dead? */
- if (tso->whatNext == ThreadComplete || tso->whatNext == ThreadKilled) {
- return;
- }
-
- IF_DEBUG(scheduler, belch("Killing thread %ld.", tso->id));
-
- tso->whatNext = ThreadKilled; /* changed to ThreadComplete in schedule() */
- tso->link = END_TSO_QUEUE; /* Just to be on the safe side... */
-
- /* Threads that finish normally leave Su pointing to the word
- * beyond the top of the stack, and Sp pointing to the last word
- * on the stack, which is the return value of the thread.
- */
- if ((P_)tso->su >= tso->stack + tso->stack_size
- || get_itbl(tso->su)->type == STOP_FRAME) {
- return;
- }
-
- IF_DEBUG(scheduler,
- fprintf(stderr, "Freezing TSO stack\n");
- printTSO(tso);
- );
-
- /* The stack freezing code assumes there's a closure pointer on
- * the top of the stack. This isn't always the case with compiled
- * code, so we have to push a dummy closure on the top which just
- * returns to the next return address on the stack.
- */
- if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
- *(--sp) = (W_)&dummy_ret_closure;
- }
-
- while (1) {
- int words = (stgCast(StgPtr,su) - stgCast(StgPtr,sp)) - 1;
- nat i;
- StgAP_UPD* ap = stgCast(StgAP_UPD*,allocate(AP_sizeW(words)));
- TICK_ALLOC_THK(words+1,0);
-
- /* First build an AP_UPD consisting of the stack chunk above the
- * current update frame, with the top word on the stack as the
- * fun field.
- */
- ASSERT(words >= 0);
-
- /* if (words == 0) { -- optimisation
- ap = stgCast(StgAP_UPD*,*stgCast(StgPtr*,sp)++);
- } else */ {
- ap->n_args = words;
- ap->fun = stgCast(StgClosure*,*stgCast(StgPtr*,sp)++);
- for(i=0; i < (nat)words; ++i) {
- payloadWord(ap,i) = *sp++;
- }
- }
-
- switch (get_itbl(su)->type) {
-
- case UPDATE_FRAME:
- {
- SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */);
-
- IF_DEBUG(scheduler,
- fprintf(stderr, "Updating ");
- printPtr(stgCast(StgPtr,su->updatee));
- fprintf(stderr, " with ");
- printObj(stgCast(StgClosure*,ap));
- );
-
- /* Replace the updatee with an indirection - happily
- * this will also wake up any threads currently
- * waiting on the result.
- */
- UPD_IND(su->updatee,ap); /* revert the black hole */
- su = su->link;
- sp += sizeofW(StgUpdateFrame) -1;
- sp[0] = stgCast(StgWord,ap); /* push onto stack */
- break;
- }
-
- case CATCH_FRAME:
- {
- StgCatchFrame *cf = (StgCatchFrame *)su;
- StgClosure* o;
-
- /* We want a PAP, not an AP_UPD. Fortunately, the
- * layout's the same.
- */
- SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
-
- /* now build o = FUN(catch,ap,handler) */
- o = stgCast(StgClosure*, allocate(sizeofW(StgClosure)+2));
- TICK_ALLOC_THK(2,0);
- SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
- payloadCPtr(o,0) = stgCast(StgClosure*,ap);
- payloadCPtr(o,1) = cf->handler;
-
- IF_DEBUG(scheduler,
- fprintf(stderr, "Built ");
- printObj(stgCast(StgClosure*,o));
- );
-
- /* pop the old handler and put o on the stack */
- su = cf->link;
- sp += sizeofW(StgCatchFrame) - 1;
- sp[0] = (W_)o;
- break;
- }
-
- case SEQ_FRAME:
- {
- StgSeqFrame *sf = (StgSeqFrame *)su;
- StgClosure* o;
-
- SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
-
- /* now build o = FUN(seq,ap) */
- o = stgCast(StgClosure*, allocate(sizeofW(StgClosure)+1));
- TICK_ALLOC_THK(1,0);
- SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
- payloadCPtr(o,0) = stgCast(StgClosure*,ap);
-
- IF_DEBUG(scheduler,
- fprintf(stderr, "Built ");
- printObj(stgCast(StgClosure*,o));
- );
-
- /* pop the old handler and put o on the stack */
- su = sf->link;
- sp += sizeofW(StgSeqFrame) - 1;
- sp[0] = (W_)o;
- break;
- }
-
- case STOP_FRAME:
- return;
-
- default:
- barf("freezeTSO");
- }
- }
-}
+ * initScheduler()
+ *
+ * Initialise the scheduler. This resets all the queues - if the
+ * queues contained any threads, they'll be garbage collected at the
+ * next pass.
+ * -------------------------------------------------------------------------- */
void initScheduler(void)
{
LoadThreadState();
/* CHECK_SENSIBLE_REGS(); */
{
- StgClosure* c = stgCast(StgClosure*,*Sp);
+ StgClosure* c = (StgClosure *)Sp[0];
Sp += 1;
ret = enter(c);
}
case ThreadFinished:
IF_DEBUG(scheduler,belch("Thread %ld finished\n", t->id));
- deleteThread(t);
t->whatNext = ThreadComplete;
break;
tso = q;
q = tso->link;
PUSH_ON_RUN_QUEUE(tso);
+ tso->blocked_on = NULL;
IF_DEBUG(scheduler,belch("Waking up thread %ld", tso->id));
}
}
- usually called inside a signal handler so it mustn't do anything fancy.
-------------------------------------------------------------------------- */
-void interruptStgRts(void)
+void
+interruptStgRts(void)
{
interrupted = 1;
context_switch = 1;
}
+/* -----------------------------------------------------------------------------
+ Unblock a thread
+
+ This is for use when we raise an exception in another thread, which
+ may be blocked.
+ -------------------------------------------------------------------------- */
+
+static void
+unblockThread(StgTSO *tso)
+{
+ StgTSO *t, **last;
+
+ if (tso->blocked_on == NULL) {
+ return; /* not blocked */
+ }
+
+ switch (get_itbl(tso->blocked_on)->type) {
+
+ case MVAR:
+ {
+ StgTSO *last_tso = END_TSO_QUEUE;
+ StgMVar *mvar = (StgMVar *)(tso->blocked_on);
+
+ last = &mvar->head;
+ for (t = mvar->head; t != END_TSO_QUEUE;
+ last = &t->link, last_tso = t, t = t->link) {
+ if (t == tso) {
+ *last = tso->link;
+ if (mvar->tail == tso) {
+ mvar->tail = last_tso;
+ }
+ break;
+ }
+ }
+ }
+
+ case BLACKHOLE_BQ:
+ {
+ StgBlockingQueue *bq = (StgBlockingQueue *)(tso->blocked_on);
+
+ last = &bq->blocking_queue;
+ for (t = bq->blocking_queue; t != END_TSO_QUEUE;
+ last = &t->link, t = t->link) {
+ if (t == tso) {
+ *last = tso->link;
+ break;
+ }
+ }
+ }
+
+ default:
+ barf("unblockThread");
+ }
+
+ tso->link = END_TSO_QUEUE;
+ tso->blocked_on = NULL;
+}
+
+/* -----------------------------------------------------------------------------
+ * raiseAsync()
+ *
+ * The following function implements the magic for raising an
+ * asynchronous exception in an existing thread.
+ *
+ * We first remove the thread from any queue on which it might be
+ * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
+ *
+ * We strip the stack down to the innermost CATCH_FRAME, building
+ * thunks in the heap for all the active computations, so they can
+ * be restarted if necessary. When we reach a CATCH_FRAME, we build
+ * an application of the handler to the exception, and push it on
+ * the top of the stack.
+ *
+ * How exactly do we save all the active computations? We create an
+ * AP_UPD for every UpdateFrame on the stack. Entering one of these
+ * AP_UPDs pushes everything from the corresponding update frame
+ * upwards onto the stack. (Actually, it pushes everything up to the
+ * next update frame plus a pointer to the next AP_UPD object.
+ * Entering the next AP_UPD object pushes more onto the stack until we
+ * reach the last AP_UPD object - at which point the stack should look
+ * exactly as it did when we killed the TSO and we can continue
+ * execution by entering the closure on top of the stack.
+ *
+ * We can also kill a thread entirely - this happens if either (a) the
+ * exception passed to raiseAsync is NULL, or (b) there's no
+ * CATCH_FRAME on the stack. In either case, we strip the entire
+ * stack and replace the thread with a zombie.
+ *
+ * -------------------------------------------------------------------------- */
+
+void
+deleteThread(StgTSO *tso)
+{
+ raiseAsync(tso,NULL);
+}
+
+void
+raiseAsync(StgTSO *tso, StgClosure *exception)
+{
+ StgUpdateFrame* su = tso->su;
+ StgPtr sp = tso->sp;
+
+ /* Thread already dead? */
+ if (tso->whatNext == ThreadComplete || tso->whatNext == ThreadKilled) {
+ return;
+ }
+
+ IF_DEBUG(scheduler, belch("Raising exception in thread %ld.", tso->id));
+
+ /* Remove it from any blocking queues */
+ unblockThread(tso);
+
+ /* The stack freezing code assumes there's a closure pointer on
+ * the top of the stack. This isn't always the case with compiled
+ * code, so we have to push a dummy closure on the top which just
+ * returns to the next return address on the stack.
+ */
+ if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
+ *(--sp) = (W_)&dummy_ret_closure;
+ }
+
+ while (1) {
+ int words = ((P_)su - (P_)sp) - 1;
+ nat i;
+ StgAP_UPD * ap;
+
+ /* If we find a CATCH_FRAME, and we've got an exception to raise,
+ * then build PAP(handler,exception), and leave it on top of
+ * the stack ready to enter.
+ */
+ if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
+ StgCatchFrame *cf = (StgCatchFrame *)su;
+ /* we've got an exception to raise, so let's pass it to the
+ * handler in this frame.
+ */
+ ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 1);
+ TICK_ALLOC_THK(2,0);
+ SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
+
+ ap->n_args = 1;
+ ap->fun = cf->handler;
+ ap->payload[0] = (P_)exception;
+
+ /* sp currently points to the word above the CATCH_FRAME on the
+ * stack. Replace the CATCH_FRAME with a pointer to the new handler
+ * application.
+ */
+ sp += sizeofW(StgCatchFrame);
+ sp[0] = (W_)ap;
+ tso->su = cf->link;
+ tso->sp = sp;
+ tso->whatNext = ThreadEnterGHC;
+ /* wake up the thread */
+ if (tso->link == END_TSO_QUEUE) {
+ PUSH_ON_RUN_QUEUE(tso);
+ }
+ return;
+ }
+
+ /* First build an AP_UPD consisting of the stack chunk above the
+ * current update frame, with the top word on the stack as the
+ * fun field.
+ */
+ ap = (StgAP_UPD *)allocate(AP_sizeW(words));
+ TICK_ALLOC_THK(words+1,0);
+
+ ASSERT(words >= 0);
+
+ ap->n_args = words;
+ ap->fun = (StgClosure *)sp[0];
+ sp++;
+ for(i=0; i < (nat)words; ++i) {
+ ap->payload[i] = (P_)*sp++;
+ }
+
+ switch (get_itbl(su)->type) {
+
+ case UPDATE_FRAME:
+ {
+ SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */);
+
+ IF_DEBUG(scheduler,
+ fprintf(stderr, "Updating ");
+ printPtr((P_)su->updatee);
+ fprintf(stderr, " with ");
+ printObj((StgClosure *)ap);
+ );
+
+ /* Replace the updatee with an indirection - happily
+ * this will also wake up any threads currently
+ * waiting on the result.
+ */
+ UPD_IND(su->updatee,ap); /* revert the black hole */
+ su = su->link;
+ sp += sizeofW(StgUpdateFrame) -1;
+ sp[0] = (W_)ap; /* push onto stack */
+ break;
+ }
+
+ case CATCH_FRAME:
+ {
+ StgCatchFrame *cf = (StgCatchFrame *)su;
+ StgClosure* o;
+
+ /* We want a PAP, not an AP_UPD. Fortunately, the
+ * layout's the same.
+ */
+ SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
+
+ /* now build o = FUN(catch,ap,handler) */
+ o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
+ TICK_ALLOC_THK(2,0);
+ SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
+ o->payload[0] = (StgClosure *)ap;
+ o->payload[1] = cf->handler;
+
+ IF_DEBUG(scheduler,
+ fprintf(stderr, "Built ");
+ printObj((StgClosure *)o);
+ );
+
+ /* pop the old handler and put o on the stack */
+ su = cf->link;
+ sp += sizeofW(StgCatchFrame) - 1;
+ sp[0] = (W_)o;
+ break;
+ }
+
+ case SEQ_FRAME:
+ {
+ StgSeqFrame *sf = (StgSeqFrame *)su;
+ StgClosure* o;
+
+ SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
+
+ /* now build o = FUN(seq,ap) */
+ o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
+ TICK_ALLOC_THK(1,0);
+ SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
+ payloadCPtr(o,0) = (StgClosure *)ap;
+
+ IF_DEBUG(scheduler,
+ fprintf(stderr, "Built ");
+ printObj((StgClosure *)o);
+ );
+
+ /* pop the old handler and put o on the stack */
+ su = sf->link;
+ sp += sizeofW(StgSeqFrame) - 1;
+ sp[0] = (W_)o;
+ break;
+ }
+
+ case STOP_FRAME:
+ /* We've stripped the entire stack, the thread is now dead. */
+ sp += sizeofW(StgStopFrame) - 1;
+ sp[0] = (W_)exception; /* save the exception */
+ tso->whatNext = ThreadKilled;
+ tso->su = (StgUpdateFrame *)(sp+1);
+ tso->sp = sp;
+ return;
+
+ default:
+ barf("raiseAsync");
+ }
+ }
+ barf("raiseAsync");
+}
/* -----------------------------------------------------------------------------
- * $Id: Schedule.h,v 1.4 1999/03/02 20:04:04 sof Exp $
+ * $Id: Schedule.h,v 1.5 1999/03/16 13:20:17 simonm Exp $
*
* (c) The GHC Team 1998-1999
*
*/
void awaken_blocked_queue(StgTSO *tso);
-
void initThread(StgTSO *tso, nat stack_size);
-
void interruptStgRts(void);
+void raiseAsync(StgTSO *tso, StgClosure *exception);
extern nat context_switch;
/* -----------------------------------------------------------------------------
- * $Id: StgMiscClosures.hc,v 1.17 1999/03/15 16:30:29 simonm Exp $
+ * $Id: StgMiscClosures.hc,v 1.18 1999/03/16 13:20:17 simonm Exp $
*
* (c) The GHC Team, 1998-1999
*
/* Put ourselves on the blocking queue for this black hole */
CurrentTSO->link = (StgTSO *)&END_TSO_QUEUE_closure;
((StgBlockingQueue *)R1.p)->blocking_queue = CurrentTSO;
+ CurrentTSO->blocked_on = R1.cl;
recordMutable((StgMutClosure *)R1.cl);
/* stg_gen_block is too heavyweight, use a specialised one */
TICK_ENT_BH();
/* Put ourselves on the blocking queue for this black hole */
+ CurrentTSO->blocked_on = R1.cl;
CurrentTSO->link = ((StgBlockingQueue *)R1.p)->blocking_queue;
((StgBlockingQueue *)R1.p)->blocking_queue = CurrentTSO;
/* Put ourselves on the blocking queue for this black hole */
CurrentTSO->link = (StgTSO *)&END_TSO_QUEUE_closure;
((StgBlockingQueue *)R1.p)->blocking_queue = CurrentTSO;
+ CurrentTSO->blocked_on = R1.cl;
recordMutable((StgMutClosure *)R1.cl);
/* stg_gen_block is too heavyweight, use a specialised one */
/* -----------------------------------------------------------------------------
- * $Id: Storage.c,v 1.16 1999/03/02 19:50:12 sof Exp $
+ * $Id: Storage.c,v 1.17 1999/03/16 13:20:18 simonm Exp $
*
* (c) The GHC Team, 1998-1999
*
info = get_itbl(caf);
ASSERT(info->type == IND_STATIC);
+#if 0
STATIC_LINK2(info,caf) = caf_list;
caf_list = caf;
+#endif
}
#endif
}
array \
ccall \
codeGen \
+ concurrent \
deSugar \
deriving \
lib \
#-----------------------------------------------------------------------------
-# $Id: Makefile,v 1.1 1998/07/27 12:37:57 simonm Exp $
+# $Id: Makefile,v 1.2 1999/03/16 13:20:21 simonm Exp $
TOP = ../..
include $(TOP)/mk/boilerplate.mk
include $(TOP)/mk/should_run.mk
-SRC_HC_OPTS += -dcore-lint -syslib concurrent
+conc009_RUNTEST_OPTS = -x 1
+
+SRC_HC_OPTS += -dcore-lint -syslib concurrent -fglasgow-exts
include $(TOP)/mk/target.mk
--- /dev/null
+{-# OPTIONS -fglasgow-exts #-}
+
+module Main where
+
+import Concurrent
+import Exception
+
+-- Send ourselves a KillThread signal, catch it and recover.
+
+main = do
+ id <- myThreadId
+ catchAllIO (killThread id) (\e -> putStr (show e))
--- /dev/null
+thread killed
\ No newline at end of file
--- /dev/null
+{-# OPTIONS -fglasgow-exts #-}
+
+module Main where
+
+import Concurrent
+import Exception
+
+main = do
+ id <- myThreadId
+ raiseInThread id (ErrorCall "hello")
--- /dev/null
+
+Fail: hello
--- /dev/null
+{-# OPTIONS -fglasgow-exts #-}
+
+module Main where
+
+import Concurrent
+import Exception
+
+-- Raise an exception in another thread. We need a lot of synchronisation here:
+
+-- - an MVar for the second thread to block on which it waits for the
+-- signal (block)
+
+-- - an MVar to signal the main thread that the second thread is ready to
+-- accept the signal (ready)
+
+-- - an MVar to signal the main thread that the second thread has received
+-- the signal (ready2). If we don't have this MVar, then the main
+-- thread could exit before the second thread has time to print
+-- the result.
+
+main = do
+ block <- newEmptyMVar
+ ready <- newEmptyMVar
+ ready2 <- newEmptyMVar
+ id <- forkIO (catchAllIO (putMVar ready () >> takeMVar block)
+ (\e -> putStr (show e) >> putMVar ready2 ()))
+ takeMVar ready
+ raiseInThread id (ErrorCall "hello")
+ takeMVar ready2
--- /dev/null
+hello
\ No newline at end of file