fix a warning
[ghc-hetmet.git] / ghc / rts / Schedule.c
index 5760010..d49d4ed 100644
@@ -482,6 +482,21 @@ schedule (Capability *initialCapability, Task *task)
     // list each time around the scheduler.
     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
 
+    // Any threads that were woken up by other Capabilities get
+    // appended to our run queue.
+    if (!emptyWakeupQueue(cap)) {
+       ACQUIRE_LOCK(&cap->lock);
+       if (emptyRunQueue(cap)) {
+           cap->run_queue_hd = cap->wakeup_queue_hd;
+           cap->run_queue_tl = cap->wakeup_queue_tl;
+       } else {
+           cap->run_queue_tl->link = cap->wakeup_queue_hd;
+           cap->run_queue_tl = cap->wakeup_queue_tl;
+       }
+       cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
+       RELEASE_LOCK(&cap->lock);
+    }
+
     scheduleCheckBlockedThreads(cap);
 
     scheduleDetectDeadlock(cap,task);
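
The hunk above is the consumer side of the new per-Capability wakeup queue. The producer side is not in this hunk: a Capability that needs to wake a thread owned by another Capability hands the TSO over via wakeupThreadOnCapability() (called in the unblockOne hunk further down, and defined in Capability.c). A minimal sketch of what that hand-off has to do, using only the field names visible above; the function name and any detail beyond lock-and-append are illustrative, not the actual implementation:

    /* Sketch: append tso to another Capability's wakeup queue.
     * wakeup_queue_hd/tl and cap->lock are the fields used by the
     * hunk above; the function name is hypothetical. */
    static void
    pushOnWakeupQueue (Capability *cap, StgTSO *tso)
    {
        ACQUIRE_LOCK(&cap->lock);
        tso->link = END_TSO_QUEUE;
        if (cap->wakeup_queue_hd == END_TSO_QUEUE) {
            cap->wakeup_queue_hd = tso;
        } else {
            cap->wakeup_queue_tl->link = tso;
        }
        cap->wakeup_queue_tl = tso;
        RELEASE_LOCK(&cap->lock);
        /* The real version also has to prod the target Capability so
         * an idle worker notices the new work. */
    }

The lock discipline matches the consumer: both sides take cap->lock, so the scheduler loop can splice the whole wakeup queue onto its run queue in O(1) and reset it while producers are excluded.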
@@ -604,6 +619,7 @@ run_thread:
     // Run the current thread 
 
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+    ASSERT(t->cap == cap);
 
     prev_what_next = t->what_next;
 
@@ -674,6 +690,7 @@ run_thread:
 #endif
 
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+    ASSERT(t->cap == cap);
 
     // ----------------------------------------------------------------------
     
@@ -772,6 +789,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     Capability *free_caps[n_capabilities], *cap0;
     nat i, n_free_caps;
 
+    // migration can be turned off with +RTS -qg
+    if (!RtsFlags.ParFlags.migrate) return;
+
     // Check whether we have more threads on our run queue, or sparks
     // in our pool, that we could hand to another Capability.
     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
@@ -834,6 +854,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                    IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
                    appendToRunQueue(free_caps[i],t);
                    if (t->bound) { t->bound->cap = free_caps[i]; }
+                   t->cap = free_caps[i];
                    i++;
                }
            }
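
A short usage note: because the early return above gates all of schedulePushWork() on RtsFlags.ParFlags.migrate, thread migration between Capabilities can be disabled for a whole run from the command line, per the comment in the hunk. An illustrative invocation (the program name is a placeholder; -N2 is only there so that migration would otherwise be possible):

    ./myprog +RTS -N2 -qg -RTS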
@@ -2105,7 +2126,7 @@ isThreadBound(StgTSO* tso USED_IF_THREADS)
 
 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
 static void 
-deleteThreadImmediately(Capability *cap, StgTSO *tso);
+deleteThread_(Capability *cap, StgTSO *tso);
 #endif
 StgInt
 forkProcess(HsStablePtr *entry
@@ -2142,28 +2163,51 @@ forkProcess(HsStablePtr *entry
        
     } else { // child
        
-       // delete all threads
-       cap->run_queue_hd = END_TSO_QUEUE;
-       cap->run_queue_tl = END_TSO_QUEUE;
-       
+       // Now, all OS threads except the thread that forked are
+       // stopped.  We need to stop all Haskell threads, including
+       // those involved in foreign calls.  Also we need to delete
+       // all Tasks, because they correspond to OS threads that are
+       // now gone.
+
        for (t = all_threads; t != END_TSO_QUEUE; t = next) {
-           next = t->link;
-           
-           // don't allow threads to catch the ThreadKilled exception
-           deleteThreadImmediately(cap,t);
+           if (t->what_next == ThreadRelocated) {
+               next = t->link;
+           } else {
+               next = t->global_link;
+               // don't allow threads to catch the ThreadKilled
+               // exception, but we do want to raiseAsync() because these
+               // threads may be evaluating thunks that we need later.
+               deleteThread_(cap,t);
+           }
        }
        
-       // wipe the task list
+       // Empty the run queue.  It seems tempting to let all the
+       // killed threads stay on the run queue as zombies to be
+       // cleaned up later, but some of them correspond to bound
+       // threads for which the corresponding Task does not exist.
+       cap->run_queue_hd = END_TSO_QUEUE;
+       cap->run_queue_tl = END_TSO_QUEUE;
+
+       // Any suspended C-calling Tasks are gone; their OS threads
+       // no longer exist:
+       cap->suspended_ccalling_tasks = NULL;
+
+       // Empty the all_threads list.  Otherwise, the garbage
+       // collector may attempt to resurrect some of these threads.
+       all_threads = END_TSO_QUEUE;
+
+       // Wipe the task list, except the current Task.
        ACQUIRE_LOCK(&sched_mutex);
        for (task = all_tasks; task != NULL; task=task->all_link) {
-           if (task != cap->running_task) discardTask(task);
+           if (task != cap->running_task) {
+               discardTask(task);
+           }
        }
        RELEASE_LOCK(&sched_mutex);
 
-       cap->suspended_ccalling_tasks = NULL;
-
 #if defined(THREADED_RTS)
-       // wipe our spare workers list.
+       // Wipe our spare workers list; they no longer exist.  New
+       // workers will be created if necessary.
        cap->spare_workers = NULL;
        cap->returning_tasks_hd = NULL;
        cap->returning_tasks_tl = NULL;
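
The cleanup sequence above follows directly from POSIX fork() semantics: in the child, only the OS thread that called fork() exists, so every Task, suspended foreign call, and spare worker inherited from the parent refers to a thread that is gone. A self-contained C program demonstrating that fact (nothing here is GHC code; the names are illustrative; compile with cc demo.c -pthread):

    #include <pthread.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <sys/wait.h>

    static void *worker(void *arg) {
        (void)arg;
        pause();                 /* block forever; exists only in the parent */
        return NULL;
    }

    int main(void) {
        pthread_t t;
        pthread_create(&t, NULL, worker, NULL);

        pid_t pid = fork();
        if (pid == 0) {
            /* Child: the worker thread was not copied by fork(), so any
             * bookkeeping that refers to it would be stale, which is
             * exactly why forkProcess wipes the Task list above. */
            printf("child: only the forking thread survives\n");
            _exit(0);
        }
        waitpid(pid, NULL, 0);
        printf("parent: worker thread is still running here\n");
        return 0;                /* process exit tears the worker down */
    }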
@@ -2468,6 +2512,7 @@ createThread(Capability *cap, nat size)
     
     tso->saved_errno = 0;
     tso->bound = NULL;
+    tso->cap = cap;
     
     tso->stack_size     = stack_size;
     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
@@ -2675,6 +2720,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
     // This TSO is now a bound thread; make the Task and TSO
     // point to each other.
     tso->bound = task;
+    tso->cap = cap;
 
     task->tso = tso;
     task->ret = ret;
@@ -2882,15 +2928,21 @@ GetRoots( evac_fn evac )
 
     for (i = 0; i < n_capabilities; i++) {
        cap = &capabilities[i];
-       evac((StgClosure **)&cap->run_queue_hd);
-       evac((StgClosure **)&cap->run_queue_tl);
-       
+       evac((StgClosure **)(void *)&cap->run_queue_hd);
+       evac((StgClosure **)(void *)&cap->run_queue_tl);
+#if defined(THREADED_RTS)
+       evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
+       evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
+#endif
        for (task = cap->suspended_ccalling_tasks; task != NULL; 
             task=task->next) {
-           evac((StgClosure **)&task->suspended_tso);
+           IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
+           evac((StgClosure **)(void *)&task->suspended_tso);
        }
+
     }
     
+
 #if !defined(THREADED_RTS)
     evac((StgClosure **)(void *)&blocked_queue_hd);
     evac((StgClosure **)(void *)&blocked_queue_tl);
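
The (StgClosure **)(void *) double casts in this hunk are likely the warning referred to in the commit title: gcc (at -O2 with -Wall, where strict-aliasing analysis is active) can complain about type-punned pointers when an StgTSO ** is cast directly to StgClosure **, and routing the cast through void * silences the diagnostic without changing the generated code. A minimal standalone illustration; the type and function names are placeholders, and whether the warning actually fires depends on compiler version and flags:

    typedef struct StgClosure_ StgClosure;
    typedef struct StgTSO_ StgTSO;

    extern void evac(StgClosure **root);
    static StgTSO *queue_hd;

    void get_roots(void) {
        /* may draw "dereferencing type-punned pointer will break
         * strict-aliasing rules" from gcc */
        evac((StgClosure **)&queue_hd);
        /* the intermediate void * cast suppresses the warning */
        evac((StgClosure **)(void *)&queue_hd);
    }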
@@ -3187,21 +3239,29 @@ unblockOne(Capability *cap, StgTSO *tso)
 
   ASSERT(get_itbl(tso)->type == TSO);
   ASSERT(tso->why_blocked != NotBlocked);
+
   tso->why_blocked = NotBlocked;
   next = tso->link;
   tso->link = END_TSO_QUEUE;
 
-  // We might have just migrated this TSO to our Capability:
-  if (tso->bound) {
-      tso->bound->cap = cap;
+  if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
+      // We are waking up this thread on the current Capability, which
+      // might involve migrating it from the Capability it was last on.
+      if (tso->bound) {
+         ASSERT(tso->bound->cap == tso->cap);
+         tso->bound->cap = cap;
+      }
+      tso->cap = cap;
+      appendToRunQueue(cap,tso);
+      // We are holding a newly woken thread; make sure we context
+      // switch quickly so we can migrate it if necessary.
+      context_switch = 1;
+  } else {
+      // we'll try to wake it up on the Capability it was last on.
+      wakeupThreadOnCapability(tso->cap, tso);
   }
 
-  appendToRunQueue(cap,tso);
-
-  // we're holding a newly woken thread, make sure we context switch
-  // quickly so we can migrate it if necessary.
-  context_switch = 1;
-  IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
+  IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
   return next;
 }
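
Design note on the branch above: waking locally (tso->cap == cap, or unconditionally when RtsFlags.ParFlags.wakeupMigrate is set) puts the thread straight onto our own run queue and costs nothing beyond a context-switch request, while the else branch preserves the thread's Capability affinity by handing it back to tso->cap through the wakeup queue that the first hunk of this patch drains. The ASSERT(tso->bound->cap == tso->cap) documents the invariant that a bound thread's Task always tracks the thread's own Capability.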
 
@@ -3651,6 +3711,7 @@ unblockThread(Capability *cap, StgTSO *tso)
   if (tso->bound) {
       tso->bound->cap = cap;
   }
+  tso->cap = cap;
 }
 #endif
 
@@ -3979,20 +4040,17 @@ deleteThread (Capability *cap, StgTSO *tso)
 
 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
 static void 
-deleteThreadImmediately(Capability *cap, StgTSO *tso)
+deleteThread_(Capability *cap, StgTSO *tso)
 { // for forkProcess only:
-  // delete thread without giving it a chance to catch the KillThread exception
-
-  if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
-      return;
-  }
-
-  if (tso->why_blocked != BlockedOnCCall &&
-      tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
-      unblockThread(cap,tso);
-  }
+  // like deleteThread(), but we delete threads in foreign calls, too.
 
-  tso->what_next = ThreadKilled;
+    if (tso->why_blocked == BlockedOnCCall ||
+       tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
+       unblockOne(cap,tso);
+       tso->what_next = ThreadKilled;
+    } else {
+       deleteThread(cap,tso);
+    }
 }
 #endif
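
For contrast with the new deleteThread_(), the plain deleteThread() it delegates to leaves threads that are in foreign calls untouched; paraphrased from the surrounding Schedule.c (not part of this diff, and abridged), it is roughly:

    /* Paraphrase for comparison, not part of this patch: */
    static void
    deleteThread (Capability *cap, StgTSO *tso)
    {
        if (tso->what_next != ThreadComplete &&
            tso->what_next != ThreadKilled) {
            if (tso->why_blocked != BlockedOnCCall &&
                tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
                raiseAsync(cap, tso, NULL);
            }
        }
    }

So the only job of deleteThread_() is to widen the net to C-calling threads, marking them ThreadKilled directly, since an asynchronous exception cannot reach a thread that is off in foreign code.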
 
@@ -4150,13 +4208,8 @@ resurrectThreads (StgTSO *threads)
        all_threads = tso;
        IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
        
-       // Wake up the thread on the Capability it was last on for a
-       // bound thread, or last_free_capability otherwise.
-       if (tso->bound) {
-           cap = tso->bound->cap;
-       } else {
-           cap = last_free_capability;
-       }
+       // Wake up the thread on the Capability it was last on
+       cap = tso->cap;
        
        switch (tso->why_blocked) {
        case BlockedOnMVar: