+ }
+ }
+ if (tso == END_TSO_QUEUE) {
+ barf("resumeThread: thread not found");
+ }
+
+#ifdef SMP
+ while (free_capabilities == NULL) {
+ IF_DEBUG(scheduler, sched_belch("waiting to resume"));
+ pthread_cond_wait(&thread_ready_cond, &sched_mutex);
+ IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
+ }
+ cap = free_capabilities;
+ free_capabilities = cap->link;
+ n_free_capabilities--;
+#else
+ cap = &MainRegTable;
+#endif
+
+ cap->rCurrentTSO = tso;
+
+ RELEASE_LOCK(&sched_mutex);
+ return cap;
+}
+
+
+/* ---------------------------------------------------------------------------
+ * Static functions
+ * ------------------------------------------------------------------------ */
+static void unblockThread(StgTSO *tso);
+
+/* ---------------------------------------------------------------------------
+ * Comparing Thread ids.
+ *
+ * This is used from STG land in the implementation of the
+ * instances of Eq/Ord for ThreadIds.
+ * ------------------------------------------------------------------------ */
+
+int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
+{
+ StgThreadID id1 = tso1->id;
+ StgThreadID id2 = tso2->id;
+
+ if (id1 < id2) return (-1);
+ if (id1 > id2) return 1;
+ return 0;
+}
+
+/* ---------------------------------------------------------------------------
+ Create a new thread.
+
+ The new thread starts with the given stack size. Before the
+ scheduler can run, however, this thread needs to have a closure
+ (and possibly some arguments) pushed on its stack. See
+ pushClosure() in Schedule.h.
+
+ createGenThread() and createIOThread() (in SchedAPI.h) are
+ convenient packaged versions of this function.
+ ------------------------------------------------------------------------ */
+//@cindex createThread
+#if defined(GRAN)
+/* currently pri (priority) is only used in a GRAN setup -- HWL */
+StgTSO *
+createThread(nat stack_size, StgInt pri)
+{
+ return createThread_(stack_size, rtsFalse, pri);
+}
+
+static StgTSO *
+createThread_(nat size, rtsBool have_lock, StgInt pri)
+{
+#else
+StgTSO *
+createThread(nat stack_size)
+{
+ return createThread_(stack_size, rtsFalse);
+}
+
+static StgTSO *
+createThread_(nat size, rtsBool have_lock)
+{
+#endif
+ StgTSO *tso;
+ nat stack_size;
+
+ /* First check whether we should create a thread at all */
+#if defined(PAR)
+ /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
+ if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
+ threadsIgnored++;
+ belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
+ RtsFlags.ParFlags.maxThreads, advisory_thread_count);
+ return END_TSO_QUEUE;
+ }
+ threadsCreated++;
+#endif
+
+#if defined(GRAN)
+ ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
+#endif
+
+ // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
+
+ /* catch ridiculously small stack sizes */
+ if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
+ size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
+ }
+
+ tso = (StgTSO *)allocate(size);
+ TICK_ALLOC_TSO(size-sizeofW(StgTSO),0);
+
+ stack_size = size - TSO_STRUCT_SIZEW;
+
+ // Hmm, this CCS_MAIN is not protected by a PROFILING cpp var;
+ SET_HDR(tso, &TSO_info, CCS_MAIN);
+#if defined(GRAN)
+ SET_GRAN_HDR(tso, ThisPE);
+#endif
+ tso->whatNext = ThreadEnterGHC;
+
+ /* tso->id needs to be unique. For now we use a heavyweight mutex to
+ protect the increment operation on next_thread_id.
+ In future, we could use an atomic increment instead.
+ */
+
+ if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
+ tso->id = next_thread_id++;
+ if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
+
+ tso->why_blocked = NotBlocked;
+ tso->blocked_exceptions = NULL;
+
+ tso->splim = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
+ tso->stack_size = stack_size;
+ tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
+ - TSO_STRUCT_SIZEW;
+ tso->sp = (P_)&(tso->stack) + stack_size;
+
+#ifdef PROFILING
+ tso->prof.CCCS = CCS_MAIN;
+#endif
+
+ /* put a stop frame on the stack */
+ tso->sp -= sizeofW(StgStopFrame);
+ SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
+ tso->su = (StgUpdateFrame*)tso->sp;
+
+ IF_DEBUG(scheduler,belch("---- Initialised TSO %ld (%p), stack size = %lx words",
+ tso->id, tso, tso->stack_size));
+
+ // ToDo: check this
+#if defined(GRAN)
+ tso->link = END_TSO_QUEUE;
+ /* uses more flexible routine in GranSim */
+ insertThread(tso, CurrentProc);
+#else
+ /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
+ from its creation
+ */
+#endif
+
+#if defined(GRAN)
+ tso->gran.pri = pri;
+ tso->gran.magic = TSO_MAGIC; // debugging only
+ tso->gran.sparkname = 0;
+ tso->gran.startedat = CURRENT_TIME;
+ tso->gran.exported = 0;
+ tso->gran.basicblocks = 0;
+ tso->gran.allocs = 0;
+ tso->gran.exectime = 0;
+ tso->gran.fetchtime = 0;
+ tso->gran.fetchcount = 0;
+ tso->gran.blocktime = 0;
+ tso->gran.blockcount = 0;
+ tso->gran.blockedat = 0;
+ tso->gran.globalsparks = 0;
+ tso->gran.localsparks = 0;
+ if (RtsFlags.GranFlags.Light)
+ tso->gran.clock = Now; /* local clock */
+ else
+ tso->gran.clock = 0;
+
+ IF_DEBUG(gran,printTSO(tso));
+#elif defined(PAR)
+ tso->par.sparkname = 0;
+ tso->par.startedat = CURRENT_TIME;
+ tso->par.exported = 0;
+ tso->par.basicblocks = 0;
+ tso->par.allocs = 0;
+ tso->par.exectime = 0;
+ tso->par.fetchtime = 0;
+ tso->par.fetchcount = 0;
+ tso->par.blocktime = 0;
+ tso->par.blockcount = 0;
+ tso->par.blockedat = 0;
+ tso->par.globalsparks = 0;
+ tso->par.localsparks = 0;
+#endif
+
+#if defined(GRAN)
+ globalGranStats.tot_threads_created++;
+ globalGranStats.threads_created_on_PE[CurrentProc]++;
+ globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
+ globalGranStats.tot_sq_probes++;
+#endif
+
+ IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
+ tso->id, tso->stack_size));
+ return tso;
+}
+
+/* ---------------------------------------------------------------------------
+ * scheduleThread()
+ *
+ * scheduleThread puts a thread on the head of the runnable queue.
+ * This will usually be done immediately after a thread is created.
+ * The caller of scheduleThread must create the thread using e.g.
+ * createThread and push an appropriate closure
+ * on this thread's stack before the scheduler is invoked.
+ * ------------------------------------------------------------------------ */
+
+void
+scheduleThread(StgTSO *tso)
+{
+ ACQUIRE_LOCK(&sched_mutex);
+
+ /* Put the new thread on the head of the runnable queue. The caller
+ * better push an appropriate closure on this thread's stack
+ * beforehand. In the SMP case, the thread may start running as
+ * soon as we release the scheduler lock below.
+ */
+ PUSH_ON_RUN_QUEUE(tso);
+ THREAD_RUNNABLE();
+
+ IF_DEBUG(scheduler,printTSO(tso));
+ RELEASE_LOCK(&sched_mutex);
+}
+
+/* ---------------------------------------------------------------------------
+ * startTasks()
+ *
+ * Start up Posix threads to run each of the scheduler tasks.
+ * I believe the task ids are not needed in the system as defined.
+ * KH @ 25/10/99
+ * ------------------------------------------------------------------------ */
+
+#ifdef SMP
+static void *
+taskStart( void *arg STG_UNUSED )
+{
+ schedule();
+ return NULL;
+}
+#endif
+
+/* ---------------------------------------------------------------------------
+ * initScheduler()
+ *
+ * Initialise the scheduler. This resets all the queues - if the
+ * queues contained any threads, they'll be garbage collected at the
+ * next pass.
+ *
+ * This now calls startTasks(), so should only be called once! KH @ 25/10/99
+ * ------------------------------------------------------------------------ */
+
+#ifdef SMP
+static void
+term_handler(int sig STG_UNUSED)
+{
+ stat_workerStop();
+ ACQUIRE_LOCK(&term_mutex);
+ await_death--;
+ RELEASE_LOCK(&term_mutex);
+ pthread_exit(NULL);
+}
+#endif
+
+//@cindex initScheduler
+void
+initScheduler(void)
+{
+#if defined(GRAN)
+ nat i;
+
+ for (i=0; i<=MAX_PROC; i++) {
+ run_queue_hds[i] = END_TSO_QUEUE;
+ run_queue_tls[i] = END_TSO_QUEUE;
+ blocked_queue_hds[i] = END_TSO_QUEUE;
+ blocked_queue_tls[i] = END_TSO_QUEUE;
+ ccalling_threadss[i] = END_TSO_QUEUE;
+ }
+#else
+ run_queue_hd = END_TSO_QUEUE;
+ run_queue_tl = END_TSO_QUEUE;
+ blocked_queue_hd = END_TSO_QUEUE;
+ blocked_queue_tl = END_TSO_QUEUE;
+#endif
+
+ suspended_ccalling_threads = END_TSO_QUEUE;
+
+ main_threads = NULL;
+
+ context_switch = 0;
+ interrupted = 0;
+
+ enteredCAFs = END_CAF_LIST;
+
+ /* Install the SIGHUP handler */
+#ifdef SMP
+ {
+ struct sigaction action,oact;
+
+ action.sa_handler = term_handler;
+ sigemptyset(&action.sa_mask);
+ action.sa_flags = 0;
+ if (sigaction(SIGTERM, &action, &oact) != 0) {
+ barf("can't install TERM handler");
+ }
+ }
+#endif
+
+#ifdef SMP
+ /* Allocate N Capabilities */
+ {
+ nat i;
+ Capability *cap, *prev;
+ cap = NULL;
+ prev = NULL;
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
+ cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
+ cap->link = prev;
+ prev = cap;
+ }
+ free_capabilities = cap;
+ n_free_capabilities = RtsFlags.ParFlags.nNodes;
+ }
+ IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
+ n_free_capabilities););
+#endif
+
+#if defined(SMP) || defined(PAR)
+ initSparkPools();
+#endif
+}
+
+#ifdef SMP
+void
+startTasks( void )
+{
+ nat i;
+ int r;
+ pthread_t tid;
+
+ /* make some space for saving all the thread ids */
+ task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
+ "initScheduler:task_ids");
+
+ /* and create all the threads */
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
+ r = pthread_create(&tid,NULL,taskStart,NULL);
+ if (r != 0) {
+ barf("startTasks: Can't create new Posix thread");
+ }
+ task_ids[i].id = tid;
+ task_ids[i].mut_time = 0.0;
+ task_ids[i].mut_etime = 0.0;
+ task_ids[i].gc_time = 0.0;
+ task_ids[i].gc_etime = 0.0;
+ task_ids[i].elapsedtimestart = elapsedtime();
+ IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
+ }
+}
+#endif
+
+void
+exitScheduler( void )
+{
+#ifdef SMP
+ nat i;
+
+ /* Don't want to use pthread_cancel, since we'd have to install
+ * these silly exception handlers (pthread_cleanup_{push,pop}) around
+ * all our locks.
+ */
+#if 0
+ /* Cancel all our tasks */
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
+ pthread_cancel(task_ids[i].id);
+ }
+
+ /* Wait for all the tasks to terminate */
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
+ IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n",
+ task_ids[i].id));
+ pthread_join(task_ids[i].id, NULL);
+ }
+#endif
+
+ /* Send 'em all a SIGHUP. That should shut 'em up.
+ */
+ await_death = RtsFlags.ParFlags.nNodes;
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
+ pthread_kill(task_ids[i].id,SIGTERM);
+ }
+ while (await_death > 0) {
+ sched_yield();
+ }
+#endif
+}
+
+/* -----------------------------------------------------------------------------
+ Managing the per-task allocation areas.
+
+ Each capability comes with an allocation area. These are
+ fixed-length block lists into which allocation can be done.
+
+ ToDo: no support for two-space collection at the moment???
+ -------------------------------------------------------------------------- */
+
+/* -----------------------------------------------------------------------------
+ * waitThread is the external interface for running a new computataion
+ * and waiting for the result.
+ *
+ * In the non-SMP case, we create a new main thread, push it on the
+ * main-thread stack, and invoke the scheduler to run it. The
+ * scheduler will return when the top main thread on the stack has
+ * completed or died, and fill in the necessary fields of the
+ * main_thread structure.
+ *
+ * In the SMP case, we create a main thread as before, but we then
+ * create a new condition variable and sleep on it. When our new
+ * main thread has completed, we'll be woken up and the status/result
+ * will be in the main_thread struct.
+ * -------------------------------------------------------------------------- */
+
+SchedulerStatus
+waitThread(StgTSO *tso, /*out*/StgClosure **ret)
+{
+ StgMainThread *m;
+ SchedulerStatus stat;
+
+ ACQUIRE_LOCK(&sched_mutex);
+
+ m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
+
+ m->tso = tso;
+ m->ret = ret;
+ m->stat = NoStatus;
+#ifdef SMP
+ pthread_cond_init(&m->wakeup, NULL);
+#endif
+
+ m->link = main_threads;
+ main_threads = m;
+
+ IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n",
+ m->tso->id));
+
+#ifdef SMP
+ do {
+ pthread_cond_wait(&m->wakeup, &sched_mutex);
+ } while (m->stat == NoStatus);
+#else
+ schedule();
+ ASSERT(m->stat != NoStatus);
+#endif
+
+ stat = m->stat;
+
+#ifdef SMP
+ pthread_cond_destroy(&m->wakeup);
+#endif
+
+ IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n",
+ m->tso->id));
+ free(m);
+
+ RELEASE_LOCK(&sched_mutex);
+
+ return stat;
+}
+
+//@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
+//@subsection Run queue code
+
+#if 0
+/*
+ NB: In GranSim we have many run queues; run_queue_hd is actually a macro
+ unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
+ implicit global variable that has to be correct when calling these
+ fcts -- HWL
+*/
+
+/* Put the new thread on the head of the runnable queue.
+ * The caller of createThread better push an appropriate closure
+ * on this thread's stack before the scheduler is invoked.
+ */
+static /* inline */ void
+add_to_run_queue(tso)
+StgTSO* tso;
+{
+ ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
+ tso->link = run_queue_hd;
+ run_queue_hd = tso;
+ if (run_queue_tl == END_TSO_QUEUE) {
+ run_queue_tl = tso;
+ }
+}