-----------------------------------------------------------------------
--- $Id: primops.txt.pp,v 1.28 2003/07/03 15:14:56 sof Exp $
+-- $Id: primops.txt.pp,v 1.29 2003/09/21 22:20:51 wolfgang Exp $
--
-- Primitive Operations
--
with
has_side_effects = True
out_of_line = True
+
+primop IsCurrentThreadBoundOp "isCurrentThreadBound#" GenPrimOp
+ State# RealWorld -> (# State# RealWorld, Int# #)
+ with
+ out_of_line = True
------------------------------------------------------------------------
section "Weak pointers"
/* -----------------------------------------------------------------------------
- * $Id: PrimOps.h,v 1.103 2003/07/03 15:14:57 sof Exp $
+ * $Id: PrimOps.h,v 1.104 2003/09/21 22:20:52 wolfgang Exp $
*
* (c) The GHC Team, 1998-2000
*
EXTFUN_RTS(unblockAsyncExceptionszh_fast);
EXTFUN_RTS(myThreadIdzh_fast);
EXTFUN_RTS(labelThreadzh_fast);
+EXTFUN_RTS(isCurrentThreadBoundzh_fast);
extern int cmp_thread(StgPtr tso1, StgPtr tso2);
extern int rts_getThreadId(StgPtr tso);
-------------------------------------------------------------------------- */
#define ForeignObj_CLOSURE_DATA(c) (((StgForeignObj *)c)->data)
+
#endif /* PRIMOPS_H */
/* ----------------------------------------------------------------------------
- * $Id: RtsAPI.h,v 1.35 2003/08/22 22:38:02 sof Exp $
+ * $Id: RtsAPI.h,v 1.36 2003/09/21 22:20:52 wolfgang Exp $
*
* (c) The GHC Team, 1998-1999
*
SchedulerStatus
rts_evalIO ( HaskellObj p, /*out*/HaskellObj *ret );
-#if defined(COMPILING_RTS_MAIN)
-/* Used by the RTS' main() only */
-SchedulerStatus
-rts_mainLazyIO ( HaskellObj p, /*out*/HaskellObj *ret );
-#endif
-
SchedulerStatus
rts_evalStableIO ( HsStablePtr s, /*out*/HsStablePtr *ret );
SchedulerStatus
-rts_evalLazyIO ( HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret );
+rts_evalLazyIO ( HaskellObj p, /*out*/HaskellObj *ret );
+
+SchedulerStatus
+rts_evalLazyIO_ ( HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret );
void
rts_checkSchedStatus ( char* site, SchedulerStatus rc);
/* -----------------------------------------------------------------------------
- * $Id: SchedAPI.h,v 1.17 2002/12/27 12:33:21 panne Exp $
+ * $Id: SchedAPI.h,v 1.18 2003/09/21 22:20:53 wolfgang Exp $
*
* (c) The GHC Team 1998-2002
*
#define NO_PRI 0
#endif
-extern SchedulerStatus waitThread(StgTSO *main_thread, /*out*/StgClosure **ret);
+extern SchedulerStatus waitThread(StgTSO *main_thread, /*out*/StgClosure **ret,
+ Capability *initialCapability);
/*
* Creating threads
extern void taskStart(void);
#endif
extern void scheduleThread(StgTSO *tso);
-extern SchedulerStatus scheduleWaitThread(StgTSO *tso, /*out*/HaskellObj* ret);
+extern SchedulerStatus scheduleWaitThread(StgTSO *tso, /*out*/HaskellObj* ret,
+ Capability *initialCapability);
static inline void pushClosure (StgTSO *tso, StgWord c) {
tso->sp--;
/* -----------------------------------------------------------------------------
- * $Id: TSO.h,v 1.31 2003/07/03 15:14:58 sof Exp $
+ * $Id: TSO.h,v 1.32 2003/09/21 22:20:53 wolfgang Exp $
*
* (c) The GHC Team, 1998-1999
*
StgTSOBlockInfo block_info;
struct StgTSO_* blocked_exceptions;
StgThreadID id;
-
+ int saved_errno;
+
StgTSOTickyInfo ticky;
StgTSOProfInfo prof;
StgTSOParInfo par;
free_capabilities = (*cap)->link;
rts_n_free_capabilities--;
#endif
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"worker thread (%p): got capability\n",
+ osThreadId()));
}
/*
signalCondition(&thread_ready_cond);
}
#endif
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"worker thread (%p): released capability\n",
+ osThreadId()));
return;
}
grabReturnCapability(Mutex* pMutex, Capability** pCap)
{
IF_DEBUG(scheduler,
- fprintf(stderr,"worker (%ld): returning, waiting for lock.\n", osThreadId()));
+ fprintf(stderr,"worker (%p): returning, waiting for lock.\n", osThreadId()));
IF_DEBUG(scheduler,
- fprintf(stderr,"worker (%ld): returning; workers waiting: %d\n",
+ fprintf(stderr,"worker (%p): returning; workers waiting: %d\n",
osThreadId(), rts_n_waiting_workers));
if ( noCapabilities() ) {
rts_n_waiting_workers++;
-------------------------------------------------------------------------- */
/*
- * Function: yieldToReturningWorker(Mutex*,Capability*)
+ * Function: yieldToReturningWorker(Mutex*,Capability*,Condition*)
*
* Purpose: when, upon entry to the Scheduler, an OS worker thread
* spots that one or more threads are blocked waiting for
* permission to return back their result, it gives up
- * its Capability.
+ * its Capability.
+ * Immediately afterwards, it tries to reacquire the Capability
+ * using waitForWorkCapability.
+ *
*
* Pre-condition: pMutex is assumed held and the thread possesses
* a Capability.
- * Post-condition: pMutex is held and the Capability has
- * been given back.
+ * Post-condition: pMutex is held and the thread possesses
+ * a Capability.
*/
void
-yieldToReturningWorker(Mutex* pMutex, Capability** pCap)
+yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
{
if ( rts_n_waiting_workers > 0 ) {
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%p): giving up RTS token\n", osThreadId()));
releaseCapability(*pCap);
/* And wait for work */
- waitForWorkCapability(pMutex, pCap, rtsFalse);
+ waitForWorkCapability(pMutex, pCap, pThreadCond);
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%p): got back RTS token (after yieldToReturningWorker)\n",
osThreadId()));
/*
- * Function: waitForWorkCapability(Mutex*, Capability**, rtsBool)
+ * Function: waitForWorkCapability(Mutex*, Capability**, Condition*)
*
* Purpose: wait for a Capability to become available. In
* the process of doing so, updates the number
* work. That counter is used when deciding whether or
* not to create a new worker thread when an external
* call is made.
+ * If pThreadCond is not NULL, a capability can be specifically
+ * passed to this thread using passCapability.
*
* Pre-condition: pMutex is held.
* Post-condition: pMutex is held and *pCap is held by the current thread
*/
+
+static Condition *passTarget = NULL;
+
void
-waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable)
+waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
{
- while ( noCapabilities() || (runnable && EMPTY_RUN_QUEUE()) ) {
- rts_n_waiting_tasks++;
- waitCondition(&thread_ready_cond, pMutex);
- rts_n_waiting_tasks--;
+#ifdef SMP
+ #error SMP version not implemented
+#endif
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"worker thread (%p): wait for cap (cond: %p)\n",
+ osThreadId(),pThreadCond));
+ while ( noCapabilities() || (pThreadCond && passTarget != pThreadCond)
+ || (!pThreadCond && passTarget)) {
+ if(pThreadCond)
+ {
+ waitCondition(pThreadCond, pMutex);
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"worker thread (%p): get passed capability\n",
+ osThreadId()));
+ }
+ else
+ {
+ rts_n_waiting_tasks++;
+ waitCondition(&thread_ready_cond, pMutex);
+ rts_n_waiting_tasks--;
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"worker thread (%p): get normal capability\n",
+ osThreadId()));
+ }
}
+ passTarget = NULL;
grabCapability(pCap);
return;
}
+/*
+ * Function: passCapability(Mutex*, Capability*, Condition*)
+ *
+ * Purpose: Let go of the capability and make sure the thread associated
+ * with the Condition pTargetThreadCond gets it next.
+ *
+ * Pre-condition: pMutex is held and cap is held by the current thread
+ * Post-condition: pMutex is held; cap will be grabbed by the "target"
+ * thread when pMutex is released.
+ */
+
+void
+passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond)
+{
+#ifdef SMP
+ #error SMP version not implemented
+#endif
+ rts_n_free_capabilities = 1;
+ signalCondition(pTargetThreadCond);
+ passTarget = pTargetThreadCond;
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"worker thread (%p): passCapability\n",
+ osThreadId()));
+}
+
+
#endif /* RTS_SUPPORTS_THREADS */
#if defined(SMP)
extern nat rts_n_waiting_workers;
extern void grabReturnCapability(Mutex* pMutex, Capability** pCap);
-extern void yieldToReturningWorker(Mutex* pMutex, Capability** pCap);
-extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable);
-
+extern void yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition *pThreadCond);
+extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition *pThreadCond);
+extern void passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond);
static inline rtsBool needToYieldToReturningWorker(void)
{
#include "Disassembler.h"
#include "Interpreter.h"
+#include <string.h> /* for memcpy */
+#ifdef HAVE_ERRNO_H
+#include <errno.h>
+#endif
+
/* --------------------------------------------------------------------------
* The bytecode interpreter
memcpy(arguments, Sp, sizeof(W_) * stk_offset);
#endif
+ // Restore the Haskell thread's current value of errno
+ errno = cap->r.rCurrentTSO->saved_errno;
+
// There are a bunch of non-ptr words on the stack (the
// ccall args, the ccall fun address and space for the
// result), which we need to cover with an info table
LOAD_STACK_POINTERS;
Sp += ret_dyn_size;
+ // Save the Haskell thread's current value of errno
+ cap->r.rCurrentTSO->saved_errno = errno;
+
#ifdef RTS_SUPPORTS_THREADS
// Threaded RTS:
// Copy the "arguments", which might include a return value,
/* -----------------------------------------------------------------------------
- * $Id: Linker.c,v 1.129 2003/09/11 15:12:25 wolfgang Exp $
+ * $Id: Linker.c,v 1.130 2003/09/21 22:20:54 wolfgang Exp $
*
* (c) The GHC Team, 2000-2003
*
SymX(int2Integerzh_fast) \
SymX(integer2Intzh_fast) \
SymX(integer2Wordzh_fast) \
+ SymX(isCurrentThreadBoundzh_fast) \
SymX(isDoubleDenormalized) \
SymX(isDoubleInfinite) \
SymX(isDoubleNaN) \
SymX(rts_eval) \
SymX(rts_evalIO) \
SymX(rts_evalLazyIO) \
+ SymX(rts_evalStableIO) \
SymX(rts_eval_) \
SymX(rts_getBool) \
SymX(rts_getChar) \
SymX(rts_mkWord64) \
SymX(rts_mkWord8) \
SymX(rts_unlock) \
+ SymX(rtsSupportsBoundThreads) \
SymX(run_queue_hd) \
SymX(setProgArgv) \
SymX(startupHaskell) \
/* -----------------------------------------------------------------------------
- * $Id: Main.c,v 1.39 2003/07/10 08:02:29 simonpj Exp $
+ * $Id: Main.c,v 1.40 2003/09/21 22:20:55 wolfgang Exp $
*
* (c) The GHC Team 1998-2000
*
# else /* !PAR && !GRAN */
/* ToDo: want to start with a larger stack size */
- status = rts_mainLazyIO((HaskellObj)mainIO_closure, NULL);
+ rts_lock();
+ status = rts_evalLazyIO((HaskellObj)mainIO_closure, NULL);
+ rts_unlock();
# endif /* !PAR && !GRAN */
SRC_HC_OPTS += -I$$PVM_ROOT/include
endif
-# Currently, you only get 'threads support' in the normal
-# way.
+# You get 'threads support' in the normal
+# and profiling ways.
ifeq "$(GhcRtsThreaded)" "YES"
ifeq "$(way)" ""
SRC_CC_OPTS += -DTHREADED_RTS
+SRC_HC_OPTS += -optc-DTHREADED_RTS
+PACKAGE_CPP_OPTS += -DTHREADED_RTS
+endif
+ifeq "$(way)" "p"
+SRC_CC_OPTS += -DTHREADED_RTS
+SRC_HC_OPTS += -optc-DTHREADED_RTS
PACKAGE_CPP_OPTS += -DTHREADED_RTS
endif
endif
/* -----------------------------------------------------------------------------
- * $Id: PrimOps.hc,v 1.112 2003/09/12 16:32:13 sof Exp $
+ * $Id: PrimOps.hc,v 1.113 2003/09/21 22:20:55 wolfgang Exp $
*
* (c) The GHC Team, 1998-2002
*
FE_
}
+FN_(isCurrentThreadBoundzh_fast)
+{
+ /* no args */
+ I_ r;
+ FB_
+ r = (I_)(RET_STGCALL1(StgBool, isThreadBound, CurrentTSO));
+ RET_N(r);
+ FE_
+}
/* -----------------------------------------------------------------------------
* MVar primitives
FE_
}
#endif
+
/* ----------------------------------------------------------------------------
- * $Id: RtsAPI.c,v 1.45 2003/08/28 16:33:42 simonmar Exp $
+ * $Id: RtsAPI.c,v 1.46 2003/09/21 22:20:56 wolfgang Exp $
*
* (c) The GHC Team, 1998-2001
*
#include <stdlib.h>
+static Capability *rtsApiCapability = NULL;
+
/* ----------------------------------------------------------------------------
Building Haskell objects from C datatypes.
------------------------------------------------------------------------- */
StgTSO *tso;
tso = createGenThread(RtsFlags.GcFlags.initialStkSize, p);
- return scheduleWaitThread(tso,ret);
+ return scheduleWaitThread(tso,ret,rtsApiCapability);
}
SchedulerStatus
StgTSO *tso;
tso = createGenThread(stack_size, p);
- return scheduleWaitThread(tso,ret);
+ return scheduleWaitThread(tso,ret,rtsApiCapability);
}
/*
StgTSO* tso;
tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
- return scheduleWaitThread(tso,ret);
-}
-
-/*
- * Identical to rts_evalLazyIO(), but won't create a new task/OS thread
- * to evaluate the Haskell thread. Used by main() only. Hack.
- */
-
-SchedulerStatus
-rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret)
-{
- StgTSO* tso;
-
- tso = createIOThread(RtsFlags.GcFlags.initialStkSize, p);
- scheduleThread(tso);
- return waitThread(tso, ret);
+ return scheduleWaitThread(tso,ret,rtsApiCapability);
}
/*
p = (StgClosure *)deRefStablePtr(s);
tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
- stat = scheduleWaitThread(tso,&r);
+ stat = scheduleWaitThread(tso,&r,rtsApiCapability);
- if (stat == Success) {
+ if (stat == Success && ret != NULL) {
ASSERT(r != NULL);
*ret = getStablePtr((StgPtr)r);
}
* Like rts_evalIO(), but doesn't force the action's result.
*/
SchedulerStatus
-rts_evalLazyIO (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
+rts_evalLazyIO (HaskellObj p, /*out*/HaskellObj *ret)
+{
+ StgTSO *tso;
+
+ tso = createIOThread(RtsFlags.GcFlags.initialStkSize, p);
+ return scheduleWaitThread(tso,ret,rtsApiCapability);
+}
+
+SchedulerStatus
+rts_evalLazyIO_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
{
StgTSO *tso;
tso = createIOThread(stack_size, p);
- return scheduleWaitThread(tso,ret);
+ return scheduleWaitThread(tso,ret,rtsApiCapability);
}
/* Convenience function for decoding the returned status. */
rts_lock()
{
#ifdef RTS_SUPPORTS_THREADS
- Capability *cap;
ACQUIRE_LOCK(&sched_mutex);
// we request to get the capability immediately, in order to
// a) stop other threads from using allocate()
// b) wake the current worker thread from awaitEvent()
// (so that a thread started by rts_eval* will start immediately)
- grabReturnCapability(&sched_mutex,&cap);
-
- // now that we have the capability, we don't need it anymore
- // (other threads will continue to run as soon as we release the sched_mutex)
- releaseCapability(cap);
+ grabReturnCapability(&sched_mutex,&rtsApiCapability);
// In the RTS hasn't been entered yet,
// start a RTS task.
rts_unlock()
{
#ifdef RTS_SUPPORTS_THREADS
+ rtsApiCapability = NULL;
RELEASE_LOCK(&sched_mutex);
#endif
}
/* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.173 2003/08/15 12:43:57 simonmar Exp $
+ * $Id: Schedule.c,v 1.174 2003/09/21 22:20:56 wolfgang Exp $
*
* (c) The GHC Team, 1998-2000
*
#include <stdlib.h>
#include <stdarg.h>
+#ifdef HAVE_ERRNO_H
+#include <errno.h>
+#endif
+
//@node Variables and Data structures, Prototypes, Includes, Main scheduling code
//@subsection Variables and Data structures
*/
StgMainThread *main_threads = NULL;
-#ifdef THREADED_RTS
-// Pointer to the thread that executes main
-// When this thread is finished, the program terminates
-// by calling shutdownHaskellAndExit.
-// It would be better to add a call to shutdownHaskellAndExit
-// to the Main.main wrapper and to remove this hack.
-StgMainThread *main_main_thread = NULL;
-#endif
-
/* Thread queues.
* Locks required: sched_mutex.
*/
void addToBlockedQueue ( StgTSO *tso );
-static void schedule ( void );
+static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
void interruptStgRts ( void );
static void detectBlackHoles ( void );
static void
taskStart(void)
{
- schedule();
+ schedule(NULL,NULL);
}
#endif
------------------------------------------------------------------------ */
//@cindex schedule
static void
-schedule( void )
+schedule( StgMainThread *mainThread, Capability *initialCapability )
{
StgTSO *t;
- Capability *cap;
+ Capability *cap = initialCapability;
StgThreadReturnCode ret;
#if defined(GRAN)
rtsEvent *event;
ACQUIRE_LOCK(&sched_mutex);
#if defined(RTS_SUPPORTS_THREADS)
- waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
- IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId()));
+ /* in the threaded case, the capability is either passed in via the initialCapability
+ parameter, or initialized inside the scheduler loop */
+
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n",
+ osThreadId(), osThreadId()));
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### main thread: %p\n",mainThread));
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### initial cap: %p\n",initialCapability));
#else
/* simply initialise it in the non-threaded case */
grabCapability(&cap);
#if defined(RTS_SUPPORTS_THREADS)
/* Check to see whether there are any worker threads
waiting to deposit external call results. If so,
- yield our capability */
- yieldToReturningWorker(&sched_mutex, &cap);
+ yield our capability... if we have a capability, that is. */
+ if(cap)
+ yieldToReturningWorker(&sched_mutex, &cap,
+ mainThread ? &mainThread->bound_thread_cond : NULL);
+
+ /* If we do not currently hold a capability, we wait for one */
+ if(!cap)
+ {
+ waitForWorkCapability(&sched_mutex, &cap,
+ mainThread ? &mainThread->bound_thread_cond : NULL);
+ IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap",
+ osThreadId()));
+ }
#endif
/* If we're interrupted (the user pressed ^C, or some other
*/
#if defined(RTS_SUPPORTS_THREADS)
{
- StgMainThread *m, **prev;
- prev = &main_threads;
- for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
- switch (m->tso->what_next) {
- case ThreadComplete:
- if (m->ret) {
- // NOTE: return val is tso->sp[1] (see StgStartup.hc)
- *(m->ret) = (StgClosure *)m->tso->sp[1];
- }
- *prev = m->link;
- m->stat = Success;
- broadcastCondition(&m->wakeup);
-#ifdef DEBUG
- removeThreadLabel((StgWord)m->tso);
-#endif
- if(m == main_main_thread)
- {
- releaseCapability(cap);
- startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
- RELEASE_LOCK(&sched_mutex);
- shutdownHaskellAndExit(EXIT_SUCCESS);
- }
- break;
- case ThreadKilled:
- if (m->ret) *(m->ret) = NULL;
- *prev = m->link;
- if (was_interrupted) {
- m->stat = Interrupted;
- } else {
- m->stat = Killed;
- }
- broadcastCondition(&m->wakeup);
+ StgMainThread *m, **prev;
+ prev = &main_threads;
+ for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
+ if (m->tso->what_next == ThreadComplete
+ || m->tso->what_next == ThreadKilled)
+ {
+ if(m == mainThread)
+ {
+ if(m->tso->what_next == ThreadComplete)
+ {
+ if (m->ret)
+ {
+ // NOTE: return val is tso->sp[1] (see StgStartup.hc)
+ *(m->ret) = (StgClosure *)m->tso->sp[1];
+ }
+ m->stat = Success;
+ }
+ else
+ {
+ if (m->ret)
+ {
+ *(m->ret) = NULL;
+ }
+ if (was_interrupted)
+ {
+ m->stat = Interrupted;
+ }
+ else
+ {
+ m->stat = Killed;
+ }
+ }
+ *prev = m->link;
+
#ifdef DEBUG
- removeThreadLabel((StgWord)m->tso);
+ removeThreadLabel((StgWord)m->tso);
#endif
- if(m == main_main_thread)
- {
releaseCapability(cap);
- startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
RELEASE_LOCK(&sched_mutex);
- shutdownHaskellAndExit(EXIT_SUCCESS);
+ return;
+ }
+ else
+ {
+ // The current OS thread can not handle the fact that the Haskell
+ // thread "m" has ended.
+ // "m" is bound; the scheduler loop in its bound OS thread has
+ // to return, so let's pass our capability directly to that thread.
+ passCapability(&sched_mutex, cap, &m->bound_thread_cond);
+ cap = NULL;
+ }
}
- break;
- default:
- break;
}
- }
}
-
+
+ if(!cap) // If we gave our capability away,
+ continue; // go to the top to get it back
+
#else /* not threaded */
# if defined(PAR)
IF_DEBUG(sanity,checkTSO(t));
#endif
+#ifdef THREADED_RTS
+ {
+ StgMainThread *m;
+ for(m = main_threads; m; m = m->link)
+ {
+ if(m->tso == t)
+ break;
+ }
+
+ if(m)
+ {
+ if(m == mainThread)
+ {
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### Running TSO %p in bound OS thread %u\n",
+ t, osThreadId()));
+ // yes, the Haskell thread is bound to the current native thread
+ }
+ else
+ {
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### TSO %p bound to other OS thread than %u\n",
+ t, osThreadId()));
+ // no, bound to a different Haskell thread: pass to that thread
+ PUSH_ON_RUN_QUEUE(t);
+ passCapability(&sched_mutex,cap,&m->bound_thread_cond);
+ cap = NULL;
+ continue;
+ }
+ }
+ else
+ {
+ // The thread we want to run is not bound.
+ if(mainThread == NULL)
+ {
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### Running TSO %p in worker OS thread %u\n",
+ t, osThreadId()));
+ // if we are a worker thread,
+ // we may run it here
+ }
+ else
+ {
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### TSO %p is not appropriate for main thread %p in OS thread %u\n",
+ t, mainThread, osThreadId()));
+ // no, the current native thread is bound to a different
+ // Haskell thread, so pass it to any worker thread
+ PUSH_ON_RUN_QUEUE(t);
+ releaseCapability(cap);
+ cap = NULL;
+ continue;
+ }
+ }
+ }
+#endif
+
cap->r.rCurrentTSO = t;
/* context switches are now initiated by the timer signal, unless
ret = ThreadFinished;
break;
case ThreadRunGHC:
+ errno = t->saved_errno;
ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
+ t->saved_errno = errno;
break;
case ThreadInterpret:
ret = interpretBCO(cap);
ACQUIRE_LOCK(&sched_mutex);
#ifdef RTS_SUPPORTS_THREADS
- IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
+ IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %p): ", osThreadId()););
#elif !defined(GRAN) && !defined(PAR)
IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
#endif
}
/* ---------------------------------------------------------------------------
+ * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
+ * used by Control.Concurrent for error checking.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+rtsSupportsBoundThreads(void)
+{
+#ifdef THREADED_RTS
+ return rtsTrue;
+#else
+ return rtsFalse;
+#endif
+}
+
+/* ---------------------------------------------------------------------------
+ * isThreadBound(tso): check whether tso is bound to an OS thread.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+isThreadBound(StgTSO* tso)
+{
+#ifdef THREADED_RTS
+ StgMainThread *m;
+ for(m = main_threads; m; m = m->link)
+ {
+ if(m->tso == tso)
+ return rtsTrue;
+ }
+#endif
+ return rtsFalse;
+}
+
+/* ---------------------------------------------------------------------------
* Singleton fork(). Do not copy any running threads.
* ------------------------------------------------------------------------- */
+static void
+deleteThreadImmediately(StgTSO *tso);
+
StgInt
forkProcess(StgTSO* tso)
{
#ifndef mingw32_TARGET_OS
pid_t pid;
StgTSO* t,*next;
- StgMainThread *m;
- rtsBool doKill;
IF_DEBUG(scheduler,sched_belch("forking!"));
+ ACQUIRE_LOCK(&sched_mutex);
pid = fork();
if (pid) { /* parent */
/* just return the pid */
} else { /* child */
+#ifdef THREADED_RTS
+ /* wipe all other threads */
+ run_queue_hd = run_queue_tl = END_TSO_QUEUE;
+ tso->link = END_TSO_QUEUE;
+
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ next = t->link;
+
+ /* Don't kill the current thread.. */
+ if (t->id == tso->id) {
+ continue;
+ }
+
+ if (isThreadBound(t)) {
+ // If the thread is bound, the OS thread that the thread is bound to
+ // no longer exists after the fork() system call.
+ // The bound Haskell thread is therefore unable to run at all;
+ // we must not give it a chance to survive by catching the
+ // ThreadKilled exception. So we kill it "brutally" rather than
+ // using deleteThread.
+ deleteThreadImmediately(t);
+ } else {
+ deleteThread(t);
+ }
+ }
+
+ if (isThreadBound(tso)) {
+ } else {
+ // If the current thread is not bound, then we should make it so.
+ // The OS thread left over by fork() is special in that the process
+ // will terminate as soon as the thread terminates;
+ // we'd expect forkProcess to behave similarly.
+ // FIXME - we don't do this.
+ }
+#else
+ StgMainThread *m;
+ rtsBool doKill;
/* wipe all other threads */
run_queue_hd = run_queue_tl = END_TSO_QUEUE;
tso->link = END_TSO_QUEUE;
deleteThread(t);
}
}
+#endif
}
+ RELEASE_LOCK(&sched_mutex);
return pid;
#else /* mingw32 */
barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
{
nat tok;
Capability *cap;
+ int saved_errno = errno;
/* assume that *reg is a pointer to the StgRegTable part
* of a Capability.
/* Other threads _might_ be available for execution; signal this */
THREAD_RUNNABLE();
RELEASE_LOCK(&sched_mutex);
+
+ errno = saved_errno;
return tok;
}
{
StgTSO *tso, **prev;
Capability *cap;
+ int saved_errno = errno;
#if defined(RTS_SUPPORTS_THREADS)
/* Wait for permission to re-enter the RTS with the result. */
#if defined(RTS_SUPPORTS_THREADS)
RELEASE_LOCK(&sched_mutex);
#endif
+ errno = saved_errno;
return &cap->r;
}
tso->why_blocked = NotBlocked;
tso->blocked_exceptions = NULL;
+ tso->saved_errno = 0;
+
tso->stack_size = stack_size;
tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
- TSO_STRUCT_SIZEW;
}
#endif
-static SchedulerStatus waitThread_(/*out*/StgMainThread* m
-#if defined(THREADED_RTS)
- , rtsBool blockWaiting
-#endif
+static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
+ Capability *initialCapability
);
}
SchedulerStatus
-scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
+scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability)
{ // Precondition: sched_mutex must be held
StgMainThread *m;
m->stat = NoStatus;
#if defined(RTS_SUPPORTS_THREADS)
initCondition(&m->wakeup);
+#if defined(THREADED_RTS)
+ initCondition(&m->bound_thread_cond);
+#endif
#endif
/* Put the thread on the main-threads list prior to scheduling the TSO.
main_threads = m;
scheduleThread_(tso);
-#if defined(THREADED_RTS)
- return waitThread_(m, rtsTrue);
-#else
- return waitThread_(m);
-#endif
+
+ return waitThread_(m, initialCapability);
}
/* ---------------------------------------------------------------------------
{
do {
while (run_queue_hd != END_TSO_QUEUE) {
- waitThread ( run_queue_hd, NULL);
+ waitThread ( run_queue_hd, NULL, NULL );
}
while (blocked_queue_hd != END_TSO_QUEUE) {
- waitThread ( blocked_queue_hd, NULL);
+ waitThread ( blocked_queue_hd, NULL, NULL );
}
while (sleeping_queue != END_TSO_QUEUE) {
- waitThread ( blocked_queue_hd, NULL);
+ waitThread ( blocked_queue_hd, NULL, NULL );
}
} while
(blocked_queue_hd != END_TSO_QUEUE ||
}
SchedulerStatus
-waitThread(StgTSO *tso, /*out*/StgClosure **ret)
+waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability)
{
StgMainThread *m;
SchedulerStatus stat;
m->stat = NoStatus;
#if defined(RTS_SUPPORTS_THREADS)
initCondition(&m->wakeup);
+#if defined(THREADED_RTS)
+ initCondition(&m->bound_thread_cond);
+#endif
#endif
/* see scheduleWaitThread() comment */
main_threads = m;
IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
-#if defined(THREADED_RTS)
- stat = waitThread_(m, rtsFalse);
-#else
- stat = waitThread_(m);
-#endif
+
+ stat = waitThread_(m,initialCapability);
+
RELEASE_LOCK(&sched_mutex);
return stat;
}
static
SchedulerStatus
-waitThread_(StgMainThread* m
-#if defined(THREADED_RTS)
- , rtsBool blockWaiting
-#endif
- )
+waitThread_(StgMainThread* m, Capability *initialCapability)
{
SchedulerStatus stat;
// Precondition: sched_mutex must be held.
IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
-#if defined(RTS_SUPPORTS_THREADS)
-
-# if defined(THREADED_RTS)
- if (!blockWaiting) {
- /* In the threaded case, the OS thread that called main()
- * gets to enter the RTS directly without going via another
- * task/thread.
- */
- main_main_thread = m;
- RELEASE_LOCK(&sched_mutex);
- schedule();
- ACQUIRE_LOCK(&sched_mutex);
- main_main_thread = NULL;
- ASSERT(m->stat != NoStatus);
- } else
-# endif
- {
+#if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS)
+ { // FIXME: does this still make sense?
+ // It's not for the threaded rts => SMP only
do {
waitCondition(&m->wakeup, &sched_mutex);
} while (m->stat == NoStatus);
CurrentProc = MainProc; // PE to run it on
RELEASE_LOCK(&sched_mutex);
- schedule();
+ schedule(m,initialCapability);
#else
RELEASE_LOCK(&sched_mutex);
- schedule();
+ schedule(m,initialCapability);
+ ACQUIRE_LOCK(&sched_mutex);
ASSERT(m->stat != NoStatus);
#endif
#if defined(RTS_SUPPORTS_THREADS)
closeCondition(&m->wakeup);
+#if defined(THREADED_RTS)
+ closeCondition(&m->bound_thread_cond);
+#endif
#endif
IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
raiseAsync(tso,NULL);
}
+static void
+deleteThreadImmediately(StgTSO *tso)
+{ // for forkProcess only:
+ // delete thread without giving it a chance to catch the ThreadKilled exception
+
+ if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
+ return;
+ }
+ unblockThread(tso);
+ tso->what_next = ThreadKilled;
+}
+
void
raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
{
/* -----------------------------------------------------------------------------
- * $Id: Schedule.h,v 1.38 2003/03/17 14:47:48 simonmar Exp $
+ * $Id: Schedule.h,v 1.39 2003/09/21 22:20:56 wolfgang Exp $
*
* (c) The GHC Team 1998-1999
*
extern nat rts_n_waiting_tasks;
#endif
+StgBool rtsSupportsBoundThreads(void);
+StgBool isThreadBound(StgTSO *tso);
StgInt forkProcess(StgTSO *tso);
extern SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret);
*
* These are the threads which clients have requested that we run.
*
- * In a 'threaded' build, we might have several concurrent clients all
- * waiting for results, and each one will wait on a condition variable
- * until the result is available.
+ * In a 'threaded' build, each of these corresponds to one bound thread.
+ * The pointer to the StgMainThread is passed as a parameter to schedule;
+ * this invocation of schedule will always pass this main thread's
+ * bound_thread_cond to waitForWorkCapability; OS-thread-switching
+ * takes place using passCapability.
*
- * In non-SMP, clients are strictly nested: the first client calls
+ * In non-threaded builds, clients are strictly nested: the first client calls
* into the RTS, which might call out again to C with a _ccall_GC, and
* eventually re-enter the RTS.
*
#if defined(RTS_SUPPORTS_THREADS)
Condition wakeup;
#if defined(THREADED_RTS)
- rtsBool thread_bound;
Condition bound_thread_cond;
#endif
#endif
/* -----------------------------------------------------------------------------
- * $Id: Weak.c,v 1.29 2003/03/26 17:43:05 sof Exp $
+ * $Id: Weak.c,v 1.30 2003/09/21 22:20:56 wolfgang Exp $
*
* (c) The GHC Team, 1998-1999
*
{
StgWeak *w;
-#if defined(RTS_SUPPORTS_THREADS)
rts_lock();
-#endif
while ((w = weak_ptr_list)) {
weak_ptr_list = w->link;
if (w->header.info != &stg_DEAD_WEAK_info) {
w->header.info = &stg_DEAD_WEAK_info;
IF_DEBUG(weak,fprintf(stderr,"Finalising weak pointer at %p -> %p\n", w, w->key));
if (w->finalizer != &stg_NO_FINALIZER_closure) {
-#if defined(RTS_SUPPORTS_THREADS)
- rts_evalIO(w->finalizer,NULL);
+ rts_evalLazyIO(w->finalizer,NULL);
rts_unlock();
rts_lock();
-#else
- rts_mainLazyIO(w->finalizer,NULL);
-#endif
}
}
}
-#if defined(RTS_SUPPORTS_THREADS)
rts_unlock();
-#endif
}
/*