RTS tidyup sweep, first phase
[ghc-hetmet.git] / rts / RaiseAsync.c
index 10d91a7..39c973b 100644 (file)
@@ -8,16 +8,17 @@
 
 #include "PosixSource.h"
 #include "Rts.h"
+
+#include "sm/Storage.h"
 #include "Threads.h"
 #include "Trace.h"
 #include "RaiseAsync.h"
-#include "SMP.h"
 #include "Schedule.h"
-#include "LdvProfile.h"
 #include "Updates.h"
 #include "STM.h"
 #include "Sanity.h"
 #include "Profiling.h"
+#include "eventlog/EventLog.h"
 #if defined(mingw32_HOST_OS)
 #include "win32/IOManager.h"
 #endif
@@ -26,7 +27,7 @@ static void raiseAsync (Capability *cap,
                        StgTSO *tso,
                        StgClosure *exception, 
                        rtsBool stop_at_atomically,
-                       StgPtr stop_here);
+                       StgUpdateFrame *stop_here);
 
 static void removeFromQueues(Capability *cap, StgTSO *tso);
 
@@ -55,12 +56,12 @@ static void performBlockedException (Capability *cap,
 void
 throwToSingleThreaded(Capability *cap, StgTSO *tso, StgClosure *exception)
 {
-    throwToSingleThreaded_(cap, tso, exception, rtsFalse, NULL);
+    throwToSingleThreaded_(cap, tso, exception, rtsFalse);
 }
 
 void
 throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, 
-                      rtsBool stop_at_atomically, StgPtr stop_here)
+                      rtsBool stop_at_atomically)
 {
     // Thread already dead?
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
@@ -70,11 +71,11 @@ throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
     // Remove it from any blocking queues
     removeFromQueues(cap,tso);
 
-    raiseAsync(cap, tso, exception, stop_at_atomically, stop_here);
+    raiseAsync(cap, tso, exception, stop_at_atomically, NULL);
 }
 
 void
-suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
+suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
 {
     // Thread already dead?
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
@@ -264,6 +265,15 @@ check_target:
                target = target->_link;
                goto retry;
            }
+            // check again for ThreadComplete and ThreadKilled.  This
+            // cooperates with scheduleHandleThreadFinished to ensure
+            // that we never miss any threads that are throwing an
+            // exception to a thread in the process of terminating.
+            if (target->what_next == ThreadComplete
+                || target->what_next == ThreadKilled) {
+               unlockTSO(target);
+                return THROWTO_SUCCESS;
+            }
            blockedThrowTo(cap,source,target);
            *out = target;
            return THROWTO_BLOCKED;
@@ -415,6 +425,7 @@ check_target:
        // Unblocking BlockedOnSTM threads requires the TSO to be
        // locked; see STM.c:unpark_tso().
        if (target->why_blocked != BlockedOnSTM) {
+           unlockTSO(target);
            goto retry;
        }
        if ((target->flags & TSO_BLOCKEX) &&
@@ -436,6 +447,11 @@ check_target:
        // thread is blocking exceptions, and block on its
        // blocked_exception queue.
        lockTSO(target);
+       if (target->why_blocked != BlockedOnCCall &&
+           target->why_blocked != BlockedOnCCall_NoUnblockExc) {
+           unlockTSO(target);
+            goto retry;
+       }
        blockedThrowTo(cap,source,target);
        *out = target;
        return THROWTO_BLOCKED;
@@ -512,6 +528,15 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
 {
     StgTSO *source;
     
+    if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
+        if (tso->blocked_exceptions != END_TSO_QUEUE) {
+            awakenBlockedExceptionQueue(cap,tso);
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
     if (tso->blocked_exceptions != END_TSO_QUEUE && 
         (tso->flags & TSO_BLOCKEX) != 0) {
         debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
@@ -543,15 +568,16 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
     return 0;
 }
 
+// awakenBlockedExceptionQueue(): Just wake up the whole queue of
+// blocked exceptions and let them try again.
+
 void
 awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
 {
-    if (tso->blocked_exceptions != END_TSO_QUEUE) {
-       lockTSO(tso);
-       awakenBlockedQueue(cap, tso->blocked_exceptions);
-       tso->blocked_exceptions = END_TSO_QUEUE;
-       unlockTSO(tso);
-    }
+    lockTSO(tso);
+    awakenBlockedQueue(cap, tso->blocked_exceptions);
+    tso->blocked_exceptions = END_TSO_QUEUE;
+    unlockTSO(tso);
 }    
 
 static void
@@ -576,166 +602,10 @@ performBlockedException (Capability *cap, StgTSO *source, StgTSO *target)
    This is for use when we raise an exception in another thread, which
    may be blocked.
 
-   Precondition: we have exclusive access to the TSO, which entails
-   holding a lock on the object that owns the queue, if the TSO is
-   blocked.  e.g. if the thread is blocked on an MVar, we must hold a
-   lock on the MVar before calling removeFromQueues().
-
-   This has nothing to do with the UnblockThread event in GranSim. -- HWL
+   Precondition: we have exclusive access to the TSO, via the same set
+   of conditions as throwToSingleThreaded() (c.f.).
    -------------------------------------------------------------------------- */
 
-#if defined(GRAN) || defined(PARALLEL_HASKELL)
-/*
-  NB: only the type of the blocking queue is different in GranSim and GUM
-      the operations on the queue-elements are the same
-      long live polymorphism!
-
-  Locks: sched_mutex is held upon entry and exit.
-
-*/
-static void
-removeFromQueues(Capability *cap, StgTSO *tso)
-{
-  StgBlockingQueueElement *t, **last;
-
-  switch (tso->why_blocked) {
-
-  case NotBlocked:
-    return;  /* not blocked */
-
-  case BlockedOnSTM:
-    // Be careful: nothing to do here!  We tell the scheduler that the thread
-    // is runnable and we leave it to the stack-walking code to abort the 
-    // transaction while unwinding the stack.  We should perhaps have a debugging
-    // test to make sure that this really happens and that the 'zombie' transaction
-    // does not get committed.
-    goto done;
-
-  case BlockedOnMVar:
-    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
-    {
-      StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
-      StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
-
-      last = (StgBlockingQueueElement **)&mvar->head;
-      for (t = (StgBlockingQueueElement *)mvar->head; 
-          t != END_BQ_QUEUE; 
-          last = &t->link, last_tso = t, t = t->link) {
-       if (t == (StgBlockingQueueElement *)tso) {
-         *last = (StgBlockingQueueElement *)tso->link;
-         if (mvar->tail == tso) {
-           mvar->tail = (StgTSO *)last_tso;
-         }
-         goto done;
-       }
-      }
-      barf("removeFromQueues (MVAR): TSO not found");
-    }
-
-  case BlockedOnBlackHole:
-    ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
-    {
-      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
-
-      last = &bq->blocking_queue;
-      for (t = bq->blocking_queue; 
-          t != END_BQ_QUEUE; 
-          last = &t->link, t = t->link) {
-       if (t == (StgBlockingQueueElement *)tso) {
-         *last = (StgBlockingQueueElement *)tso->link;
-         goto done;
-       }
-      }
-      barf("removeFromQueues (BLACKHOLE): TSO not found");
-    }
-
-  case BlockedOnException:
-    {
-      StgTSO *target  = tso->block_info.tso;
-
-      ASSERT(get_itbl(target)->type == TSO);
-
-      while (target->what_next == ThreadRelocated) {
-         target = target2->link;
-         ASSERT(get_itbl(target)->type == TSO);
-      }
-
-      last = (StgBlockingQueueElement **)&target->blocked_exceptions;
-      for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
-          t != END_BQ_QUEUE; 
-          last = &t->link, t = t->link) {
-       ASSERT(get_itbl(t)->type == TSO);
-       if (t == (StgBlockingQueueElement *)tso) {
-         *last = (StgBlockingQueueElement *)tso->link;
-         goto done;
-       }
-      }
-      barf("removeFromQueues (Exception): TSO not found");
-    }
-
-  case BlockedOnRead:
-  case BlockedOnWrite:
-#if defined(mingw32_HOST_OS)
-  case BlockedOnDoProc:
-#endif
-    {
-      /* take TSO off blocked_queue */
-      StgBlockingQueueElement *prev = NULL;
-      for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
-          prev = t, t = t->link) {
-       if (t == (StgBlockingQueueElement *)tso) {
-         if (prev == NULL) {
-           blocked_queue_hd = (StgTSO *)t->link;
-           if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
-             blocked_queue_tl = END_TSO_QUEUE;
-           }
-         } else {
-           prev->link = t->link;
-           if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
-             blocked_queue_tl = (StgTSO *)prev;
-           }
-         }
-#if defined(mingw32_HOST_OS)
-         /* (Cooperatively) signal that the worker thread should abort
-          * the request.
-          */
-         abandonWorkRequest(tso->block_info.async_result->reqID);
-#endif
-         goto done;
-       }
-      }
-      barf("removeFromQueues (I/O): TSO not found");
-    }
-
-  case BlockedOnDelay:
-    {
-      /* take TSO off sleeping_queue */
-      StgBlockingQueueElement *prev = NULL;
-      for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
-          prev = t, t = t->link) {
-       if (t == (StgBlockingQueueElement *)tso) {
-         if (prev == NULL) {
-           sleeping_queue = (StgTSO *)t->link;
-         } else {
-           prev->link = t->link;
-         }
-         goto done;
-       }
-      }
-      barf("removeFromQueues (delay): TSO not found");
-    }
-
-  default:
-    barf("removeFromQueues: %d", tso->why_blocked);
-  }
-
- done:
-  tso->link = END_TSO_QUEUE;
-  tso->why_blocked = NotBlocked;
-  tso->block_info.closure = NULL;
-  pushOnRunQueue(cap,tso);
-}
-#else
 static void
 removeFromQueues(Capability *cap, StgTSO *tso)
 {
@@ -758,9 +628,6 @@ removeFromQueues(Capability *cap, StgTSO *tso)
       goto done;
 
   case BlockedOnBlackHole:
-      // we have exclusive access to this TSO, which implies that we
-      // must hold sched_mutex:
-      ASSERT_LOCK_HELD(&sched_mutex);
       removeThreadFromQueue(cap, &blackhole_queue, tso);
       goto done;
 
@@ -806,18 +673,8 @@ removeFromQueues(Capability *cap, StgTSO *tso)
   }
 
  done:
-  tso->_link = END_TSO_QUEUE; // no write barrier reqd
-  tso->why_blocked = NotBlocked;
-  tso->block_info.closure = NULL;
-  appendToRunQueue(cap,tso);
-
-  // We might have just migrated this TSO to our Capability:
-  if (tso->bound) {
-      tso->bound->cap = cap;
-  }
-  tso->cap = cap;
+  unblockOne(cap, tso);
 }
-#endif
 
 /* -----------------------------------------------------------------------------
  * raiseAsync()
@@ -858,10 +715,11 @@ removeFromQueues(Capability *cap, StgTSO *tso)
 
 static void
 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, 
-          rtsBool stop_at_atomically, StgPtr stop_here)
+          rtsBool stop_at_atomically, StgUpdateFrame *stop_here)
 {
     StgRetInfoTable *info;
     StgPtr sp, frame;
+    StgClosure *updatee;
     nat i;
 
     debugTrace(DEBUG_sched,
@@ -888,6 +746,12 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
     // layers should deal with that.
     ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled);
 
+    if (stop_here != NULL) {
+        updatee = stop_here->updatee;
+    } else {
+        updatee = NULL;
+    }
+
     // The stack freezing code assumes there's a closure pointer on
     // the top of the stack, so we have to arrange that this is the case...
     //
@@ -899,7 +763,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
     }
 
     frame = sp + 1;
-    while (stop_here == NULL || frame < stop_here) {
+    while (stop_here == NULL || frame < (StgPtr)stop_here) {
 
        // 1. Let the top of the stack be the "current closure"
        //
@@ -953,11 +817,19 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
            //       printObj((StgClosure *)ap);
            //  );
 
-            // Perform the update
-            // TODO: this may waste some work, if the thunk has
-            // already been updated by another thread.
-            UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
-                           (StgClosure *)ap);
+            if (((StgUpdateFrame *)frame)->updatee == updatee) {
+                // If this update frame points to the same closure as
+                // the update frame further down the stack
+                // (stop_here), then don't perform the update.  We
+                // want to keep the blackhole in this case, so we can
+                // detect and report the loop (#2783).
+                ap = (StgAP_STACK*)updatee;
+            } else {
+                // Perform the update
+                // TODO: this may waste some work, if the thunk has
+                // already been updated by another thread.
+                UPD_IND(((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
+            }
 
            sp += sizeofW(StgUpdateFrame) - 1;
            sp[0] = (W_)ap; // push onto stack