[project @ 2004-03-01 14:18:35 by simonmar]
authorsimonmar <unknown>
Mon, 1 Mar 2004 14:18:36 +0000 (14:18 +0000)
committersimonmar <unknown>
Mon, 1 Mar 2004 14:18:36 +0000 (14:18 +0000)
Threaded RTS improvements:

  - Make the main_threads list doubly linked.  Have threads
    remove themselves from this list when they complete, rather
    than searching for completed main threads each time around
    the scheduler loop.  This removes an O(n) loop from the
    scheduler, but adds some new constraints (basically completed
    threads must remain on the run queue until dealt with, including
    threads which have been killed by an async exception).

  - Add a pointer from the TSO to the StgMainThread struct, for
    main threads.  This avoids a number of places where we had
    to traverse the list of main threads to find the right one,
    including one place in the scheduler loop.  Adding a field to
    a TSO is cheap.

  - taskStart: we should be resetting the startingWorkerThread flag
    in here.  Not sure why we aren't; maybe this got lost at some point.

  - Use the BlockedOnCCall flags in the non-threaded RTS too.  Q: what
    should happen if a thread does a foreign call which re-enters the
    RTS, and then sends an async exception to the original thread?
    Answer: it should deadlock, which it does in the threaded RTS, and
    this commit makes it do so in the non-threaded RTS too (see
    testsuite/tests/concurrent/should_run/conc040.hs).

ghc/includes/TSO.h
ghc/rts/Exception.h
ghc/rts/Schedule.c
ghc/rts/Schedule.h

index f72d3bb..22f3e53 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: TSO.h,v 1.33 2003/11/12 17:27:05 sof Exp $
+ * $Id: TSO.h,v 1.34 2004/03/01 14:18:35 simonmar Exp $
  *
  * (c) The GHC Team, 1998-1999
  *
@@ -161,11 +161,9 @@ typedef enum {
   , BlockedOnGA  // blocked on a remote closure represented by a Global Address
   , BlockedOnGA_NoSend // same as above but without sending a Fetch message
 #endif
-#if defined(RTS_SUPPORTS_THREADS)
   , BlockedOnCCall
-  , BlockedOnCCall_NoUnblockExc // same as above but don't unblock async exceptions
-                               // in resumeThread()
-#endif
+  , BlockedOnCCall_NoUnblockExc // same as above but don't unblock
+                               // async exceptions in resumeThread()
 } StgTSOBlockReason;
 
 #if defined(mingw32_TARGET_OS)
@@ -204,12 +202,13 @@ typedef struct StgTSO_ {
   StgMutClosure *    mut_link;      /* TSO's are mutable of course! */
   struct StgTSO_*    global_link;    /* Links all threads together */
   
-  StgTSOWhatNext     what_next   : 16;
-  StgTSOBlockReason  why_blocked : 16;
-  StgTSOBlockInfo    block_info;
-  struct StgTSO_*    blocked_exceptions;
-  StgThreadID        id;
-  int                saved_errno;
+  StgTSOWhatNext         what_next   : 16;
+  StgTSOBlockReason      why_blocked : 16;
+  StgTSOBlockInfo        block_info;
+  struct StgTSO_*        blocked_exceptions;
+  StgThreadID            id;
+  int                    saved_errno;
+  struct StgMainThread_* main;
   
   MAYBE_EMPTY_STRUCT(StgTSOTickyInfo,ticky)
   MAYBE_EMPTY_STRUCT(StgTSOProfInfo,prof)
index 291ad20..ea22223 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Exception.h,v 1.7 2003/11/12 17:49:07 sof Exp $
+ * $Id: Exception.h,v 1.8 2004/03/01 14:18:35 simonmar Exp $
  *
  * (c) The GHC Team, 1998-2000
  *
@@ -27,6 +27,8 @@ interruptible(StgTSO *t)
 #endif
   case BlockedOnDelay:
     return 1;
+  // NB. Threaded blocked on foreign calls (BlockedOnCCall) are
+  // *not* interruptible.  We can't send these threads an exception.
   default:
     return 0;
   }
index 0e14c6e..44a156b 100644 (file)
@@ -1,5 +1,5 @@
 /* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.192 2004/02/27 16:16:31 simonmar Exp $
+ * $Id: Schedule.c,v 1.193 2004/03/01 14:18:35 simonmar Exp $
  *
  * (c) The GHC Team, 1998-2003
  *
@@ -259,6 +259,7 @@ static void
 taskStart(void)
 {
   ACQUIRE_LOCK(&sched_mutex);
+  startingWorkerThread = rtsFalse;
   schedule(NULL,NULL);
   RELEASE_LOCK(&sched_mutex);
 }
@@ -431,92 +432,6 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 #endif
     }
 
-    //
-    // Go through the list of main threads and wake up any
-    // clients whose computations have finished.  ToDo: this
-    // should be done more efficiently without a linear scan
-    // of the main threads list, somehow...
-    //
-#if defined(RTS_SUPPORTS_THREADS)
-    { 
-       StgMainThread *m, **prev;
-       prev = &main_threads;
-       for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
-           if (m->tso->what_next == ThreadComplete
-               || m->tso->what_next == ThreadKilled) {
-               if (m == mainThread) {
-                   if (m->tso->what_next == ThreadComplete) {
-                       if (m->ret) {
-                           // NOTE: return val is tso->sp[1]
-                           // (see StgStartup.hc)
-                           *(m->ret) = (StgClosure *)m->tso->sp[1]; 
-                       }
-                       m->stat = Success;
-                   } else {
-                       if (m->ret) {
-                           *(m->ret) = NULL;
-                       }
-                       if (was_interrupted) {
-                           m->stat = Interrupted;
-                       } else {
-                           m->stat = Killed;
-                       }
-                   }
-                   *prev = m->link;
-#ifdef DEBUG
-                   removeThreadLabel((StgWord)m->tso->id);
-#endif
-                   releaseCapability(cap);
-                   return;
-               } else {
-                   // The current OS thread can not handle the fact that
-                   // the Haskell thread "m" has ended.  "m" is bound;
-                   // the scheduler loop in its bound OS thread has to
-                   // return, so let's pass the capability directly to
-                   // that thread.
-                   passCapability(&m->bound_thread_cond);
-                   continue;
-               }
-           }
-       }
-    }
-    
-#else /* not threaded */
-
-# if defined(PAR)
-    /* in GUM do this only on the Main PE */
-    if (IAmMainThread)
-# endif
-    /* If our main thread has finished or been killed, return.
-     */
-    {
-      StgMainThread *m = main_threads;
-      if (m->tso->what_next == ThreadComplete
-         || m->tso->what_next == ThreadKilled) {
-#ifdef DEBUG
-       removeThreadLabel((StgWord)m->tso->id);
-#endif
-       main_threads = main_threads->link;
-       if (m->tso->what_next == ThreadComplete) {
-           // We finished successfully, fill in the return value
-           // NOTE: return val is tso->sp[1] (see StgStartup.hc)
-           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
-           m->stat = Success;
-           return;
-       } else {
-         if (m->ret) { *(m->ret) = NULL; };
-         if (was_interrupted) {
-           m->stat = Interrupted;
-         } else {
-           m->stat = Killed;
-         }
-         return;
-       }
-      }
-    }
-#endif
-
-
 #if defined(RTS_USER_SIGNALS)
     // check for signals each time around the scheduler
     if (signals_pending()) {
@@ -605,11 +520,9 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
            m = main_threads;
            switch (m->tso->why_blocked) {
            case BlockedOnBlackHole:
-               raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
-               break;
            case BlockedOnException:
            case BlockedOnMVar:
-               raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
+               raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
                break;
            default:
                barf("deadlock: main thread blocked in a strange way");
@@ -915,12 +828,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 
 #ifdef THREADED_RTS
     {
-      StgMainThread *m;
-      for(m = main_threads; m; m = m->link)
-      {
-       if(m->tso == t)
-         break;
-      }
+      StgMainThread *m = t->main;
       
       if(m)
       {
@@ -1144,7 +1052,6 @@ run_thread:
        */
       threadPaused(t);
       { 
-       StgMainThread *m;
        /* enlarge the stack */
        StgTSO *new_t = threadStackOverflow(t);
        
@@ -1152,10 +1059,8 @@ run_thread:
         * main thread stack.  It better not be on any other queues...
         * (it shouldn't be).
         */
-       for (m = main_threads; m != NULL; m = m->link) {
-         if (m->tso == t) {
-           m->tso = new_t;
-         }
+       if (t->main != NULL) {
+           t->main->tso = new_t;
        }
        threadPaused(new_t);
        PUSH_ON_RUN_QUEUE(new_t);
@@ -1318,8 +1223,74 @@ run_thread:
          !RtsFlags.ParFlags.ParStats.Suppressed) 
        DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
 #endif
+
+      //
+      // Check whether the thread that just completed was a main
+      // thread, and if so return with the result.  
+      //
+      // There is an assumption here that all thread completion goes
+      // through this point; we need to make sure that if a thread
+      // ends up in the ThreadKilled state, that it stays on the run
+      // queue so it can be dealt with here.
+      //
+      if (
+#if defined(RTS_SUPPORTS_THREADS)
+         mainThread != NULL
+#else
+         mainThread->tso == t
+#endif
+         )
+      {
+         // We are a bound thread: this must be our thread that just
+         // completed.
+         ASSERT(mainThread->tso == t);
+
+         if (t->what_next == ThreadComplete) {
+             if (mainThread->ret) {
+                 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
+                 *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
+             }
+             mainThread->stat = Success;
+         } else {
+             if (mainThread->ret) {
+                 *(mainThread->ret) = NULL;
+             }
+             if (was_interrupted) {
+                 mainThread->stat = Interrupted;
+             } else {
+                 mainThread->stat = Killed;
+             }
+         }
+#ifdef DEBUG
+         removeThreadLabel((StgWord)mainThread->tso->id);
+#endif
+         if (mainThread->prev == NULL) {
+             main_threads = mainThread->link;
+         } else {
+             mainThread->prev->link = mainThread->link;
+         }
+         if (mainThread->link != NULL) {
+             mainThread->link->prev = NULL;
+         }
+         releaseCapability(cap);
+         return;
+      }
+
+#ifdef RTS_SUPPORTS_THREADS
+      ASSERT(t->main == NULL);
+#else
+      if (t->main != NULL) {
+         // Must be a main thread that is not the topmost one.  Leave
+         // it on the run queue until the stack has unwound to the
+         // point where we can deal with this.  Leaving it on the run
+         // queue also ensures that the garbage collector knows about
+         // this thread and its return value (it gets dropped from the
+         // all_threads list so there's no other way to find it).
+         APPEND_TO_RUN_QUEUE(t);
+      }
+#endif
       break;
-      
+
     default:
       barf("schedule: invalid thread return code %d", (int)ret);
     }
@@ -1400,12 +1371,7 @@ StgBool
 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
 {
 #ifdef THREADED_RTS
-  StgMainThread *m;
-  for(m = main_threads; m; m = m->link)
-  {
-    if(m->tso == tso)
-      return rtsTrue;
-  }
+  return (tso->main != NULL);
 #endif
   return rtsFalse;
 }
@@ -1550,17 +1516,12 @@ suspendThread( StgRegTable *reg,
   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
   suspended_ccalling_threads = cap->r.rCurrentTSO;
 
-#if defined(RTS_SUPPORTS_THREADS)
-  if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
-  {
+  if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
-  }
-  else
-  {
+  } else {
       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
   }
-#endif
 
   /* Use the thread ID as the token; it should be unique */
   tok = cap->r.rCurrentTSO->id;
@@ -1616,13 +1577,10 @@ resumeThread( StgInt tok,
   }
   tso->link = END_TSO_QUEUE;
   
-#if defined(RTS_SUPPORTS_THREADS)
-  if(tso->why_blocked == BlockedOnCCall)
-  {
+  if(tso->why_blocked == BlockedOnCCall) {
       awakenBlockedQueueNoLock(tso->blocked_exceptions);
       tso->blocked_exceptions = NULL;
   }
-#endif
   
   /* Reset blocking status */
   tso->why_blocked  = NotBlocked;
@@ -1751,6 +1709,7 @@ createThread(nat size)
   tso->blocked_exceptions = NULL;
 
   tso->saved_errno = 0;
+  tso->main = NULL;
   
   tso->stack_size   = stack_size;
   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
@@ -1970,8 +1929,16 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
 
     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
     m->tso = tso;
+    tso->main = m;
     m->ret = ret;
     m->stat = NoStatus;
+    m->link = main_threads;
+    m->prev = NULL;
+    if (main_threads != NULL) {
+       main_threads->prev = m;
+    }
+    main_threads = m;
+
 #if defined(RTS_SUPPORTS_THREADS)
     // Allocating a new condition for each thread is expensive, so we
     // cache one.  This is a pretty feeble hack, but it helps speed up
@@ -1995,9 +1962,6 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
     */
     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
     
-    m->link = main_threads;
-    main_threads = m;
-    
     PUSH_ON_RUN_QUEUE(tso);
     // NB. Don't call THREAD_RUNNABLE() here, because the thread is
     // bound and only runnable by *this* OS thread, so waking up other
@@ -2201,25 +2165,6 @@ GetRoots( evac_fn evac )
   // mark the signal handlers (signals should be already blocked)
   markSignalHandlers(evac);
 #endif
-
-  // main threads which have completed need to be retained until they
-  // are dealt with in the main scheduler loop.  They won't be
-  // retained any other way: the GC will drop them from the
-  // all_threads list, so we have to be careful to treat them as roots
-  // here.
-  { 
-      StgMainThread *m;
-      for (m = main_threads; m != NULL; m = m->link) {
-         switch (m->tso->what_next) {
-         case ThreadComplete:
-         case ThreadKilled:
-             evac((StgClosure **)&m->tso);
-             break;
-         default:
-             break;
-         }
-      }
-  }
 }
 
 /* -----------------------------------------------------------------------------
@@ -2635,7 +2580,6 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
 
 #else   /* !GRAN && !PAR */
 
-#ifdef RTS_SUPPORTS_THREADS
 void
 awakenBlockedQueueNoLock(StgTSO *tso)
 {
@@ -2643,7 +2587,6 @@ awakenBlockedQueueNoLock(StgTSO *tso)
     tso = unblockOneLocked(tso);
   }
 }
-#endif
 
 void
 awakenBlockedQueue(StgTSO *tso)
@@ -2996,11 +2939,12 @@ deleteThreadImmediately(StgTSO *tso)
   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
       return;
   }
-#if defined(RTS_SUPPORTS_THREADS)
-  if (tso->why_blocked != BlockedOnCCall
-      && tso->why_blocked != BlockedOnCCall_NoUnblockExc)
-#endif
+
+  if (tso->why_blocked != BlockedOnCCall &&
+      tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
     unblockThread(tso);
+  }
+
   tso->what_next = ThreadKilled;
 }
 
@@ -3334,14 +3278,12 @@ printThreadBlockage(StgTSO *tso)
            tso->block_info.closure, info_type(tso->block_info.closure));
     break;
 #endif
-#if defined(RTS_SUPPORTS_THREADS)
   case BlockedOnCCall:
     fprintf(stderr,"is blocked on an external call");
     break;
   case BlockedOnCCall_NoUnblockExc:
     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
     break;
-#endif
   default:
     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
         tso->why_blocked, tso->id, tso);
index 7e12482..dc3763d 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Schedule.h,v 1.44 2004/02/26 16:14:21 simonmar Exp $
+ * $Id: Schedule.h,v 1.45 2004/03/01 14:18:36 simonmar Exp $
  *
  * (c) The GHC Team 1998-1999
  *
@@ -33,10 +33,8 @@ void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node);
 #elif defined(PAR)
 void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node);
 #else
-void awakenBlockedQueue(StgTSO *tso);
-#if defined(RTS_SUPPORTS_THREADS)
-void awakenBlockedQueueNoLock(StgTSO *tso);
-#endif
+void awakenBlockedQueue (StgTSO *tso);
+void awakenBlockedQueueNoLock (StgTSO *tso);
 #endif
 
 /* unblockOne()
@@ -203,6 +201,7 @@ typedef struct StgMainThread_ {
   Condition        wakeup;
 #endif
 #endif
+  struct StgMainThread_ *prev;
   struct StgMainThread_ *link;
 } StgMainThread;