[project @ 2005-05-05 13:17:47 by simonmar]
[ghc-hetmet.git] ghc/rts/Schedule.c
index d078df2..036c5b0 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -174,14 +174,12 @@ static StgTSO *suspended_ccalling_threads;
 /* flag set by signal handler to precipitate a context switch */
 int context_switch = 0;
 
+/* flag that tracks whether we have done any execution in this time slice. */
+nat recent_activity = ACTIVITY_YES;
+
 /* if this flag is set as well, give up execution */
 rtsBool interrupted = rtsFalse;
 
-/* If this flag is set, we are running Haskell code.  Used to detect
- * uses of 'foreign import unsafe' that should be 'safe'.
- */
-static rtsBool in_haskell = rtsFalse;
-
 /* Next thread ID to allocate.
  * Locks required: thread_id_mutex
  */
@@ -318,13 +316,13 @@ StgTSO * activateSpark (rtsSpark spark);
  * ------------------------------------------------------------------------- */
 
 #if defined(RTS_SUPPORTS_THREADS)
-static rtsBool startingWorkerThread = rtsFalse;
+static nat startingWorkerThread = 0;
 
 static void
 taskStart(void)
 {
   ACQUIRE_LOCK(&sched_mutex);
-  startingWorkerThread = rtsFalse;
+  startingWorkerThread--;
   schedule(NULL,NULL);
   taskStop();
   RELEASE_LOCK(&sched_mutex);
@@ -335,14 +333,14 @@ startSchedulerTaskIfNecessary(void)
 {
     if ( !EMPTY_RUN_QUEUE()
         && !shutting_down_scheduler // not if we're shutting down
-        && !startingWorkerThread)
+        && startingWorkerThread==0)
     {
        // we don't want to start another worker thread
        // just because the last one hasn't yet reached the
        // "waiting for capability" state
-       startingWorkerThread = rtsTrue;
+       startingWorkerThread++;
        if (!maybeStartNewWorker(taskStart)) {
-           startingWorkerThread = rtsFalse;
+           startingWorkerThread--;
        }
     }
 }
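
Turning startingWorkerThread from a boolean into a counter allows several workers to be "in flight" at once (initScheduler later seeds it with RtsFlags.ParFlags.nNodes), while still stopping startSchedulerTaskIfNecessary from piling up extra workers before the first one reaches the capability queue. A standalone sketch of the counter discipline, with stand-in names; in the real scheduler all of this runs under sched_mutex.

    /* Simplified model of the counter discipline, not the RTS code itself.
     * start_worker() stands in for maybeStartNewWorker(). */
    #include <stdio.h>

    static unsigned int startingWorkerThread = 0;   /* workers started but not yet scheduling */

    static int start_worker(void) { return 1; }     /* stub: pretend the OS thread was created */

    static void startSchedulerTaskIfNecessary_model(int have_work)
    {
        if (have_work && startingWorkerThread == 0) {
            startingWorkerThread++;                 /* claim a pending start... */
            if (!start_worker())
                startingWorkerThread--;             /* ...and undo it on failure */
        }
    }

    static void taskStart_model(void)
    {
        startingWorkerThread--;                     /* a pending worker has arrived */
    }

    int main(void)
    {
        startSchedulerTaskIfNecessary_model(1);
        startSchedulerTaskIfNecessary_model(1);     /* no-op: one start already pending */
        taskStart_model();
        printf("pending starts: %u\n", startingWorkerThread);
        return 0;
    }
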
@@ -478,11 +476,20 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 
       // We now have a capability...
 #endif
+      
+#if 0 /* extra sanity checking */
+      { 
+         StgMainThread *m;
+         for (m = main_threads; m != NULL; m = m->link) {
+             ASSERT(get_itbl(m->tso)->type == TSO);
+         }
+      }
+#endif
 
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
     // call).
-    if (in_haskell) {
+    if (cap->r.rInHaskell) {
          errorBelch("schedule: re-entered unsafely.\n"
                     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
          stg_exit(1);
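
Moving the in_haskell flag from a file-static variable into the capability's register table (cap->r.rInHaskell) gives each capability its own notion of "currently executing Haskell code", which is what the unsafe re-entry check needs once more than one capability can be running mutator code. A toy model of the check; the Capability struct here is invented for the example and only the rInHaskell field corresponds to anything in the patch.

    /* Toy model: one flag per capability instead of one global flag. */
    #include <stdio.h>
    #include <stdlib.h>

    typedef struct { int rInHaskell; } Capability;

    static void enter_haskell(Capability *cap)
    {
        if (cap->rInHaskell) {
            /* Mirrors the check in schedule(): a 'foreign import unsafe'
             * call has re-entered the RTS without suspendThread(). */
            fprintf(stderr, "schedule: re-entered unsafely.\n");
            exit(1);
        }
        cap->rInHaskell = 1;
    }

    static void leave_haskell(Capability *cap) { cap->rInHaskell = 0; }

    int main(void)
    {
        Capability cap = { 0 };
        enter_haskell(&cap);
        leave_haskell(&cap);
        printf("ok\n");
        return 0;
    }
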
@@ -671,7 +678,9 @@ run_thread:
     prev_what_next = t->what_next;
 
     errno = t->saved_errno;
-    in_haskell = rtsTrue;
+    cap->r.rInHaskell = rtsTrue;
+
+    recent_activity = ACTIVITY_YES;
 
     switch (prev_what_next) {
 
@@ -699,7 +708,7 @@ run_thread:
        blackholes_need_checking = rtsTrue;
     }
 
-    in_haskell = rtsFalse;
+    cap->r.rInHaskell = rtsFalse;
 
     // The TSO might have moved, eg. if it re-entered the RTS and a GC
     // happened.  So find the new location:
@@ -855,6 +864,12 @@ scheduleCheckBlackHoles( void )
 static void
 scheduleDetectDeadlock(void)
 {
+
+#if defined(PARALLEL_HASKELL)
+    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
+    return;
+#endif
+
     /* 
      * Detect deadlock: when we have no threads to run, there are no
      * threads blocked, waiting for I/O, or sleeping, and all the
@@ -863,7 +878,16 @@ scheduleDetectDeadlock(void)
      */
     if ( EMPTY_THREAD_QUEUES() )
     {
-#if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS)
+#if defined(RTS_SUPPORTS_THREADS)
+       /* 
+        * In the threaded RTS, we only check for deadlock if there
+        * has been no activity in a complete timeslice.  This means
+        * we won't eagerly start a full GC just because we don't have
+        * any threads to run currently.
+        */
+       if (recent_activity != ACTIVITY_INACTIVE) return;
+#endif
+
        IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
 
        // Garbage collection can release some new threads due to
@@ -872,9 +896,10 @@ scheduleDetectDeadlock(void)
        // exception.  Any threads thus released will be immediately
        // runnable.
        GarbageCollect(GetRoots,rtsTrue);
+       recent_activity = ACTIVITY_DONE_GC;
        if ( !EMPTY_RUN_QUEUE() ) return;
 
-#if defined(RTS_USER_SIGNALS)
+#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
        /* If we have user-installed signal handlers, then wait
        * for signals to arrive rather than bombing out with a
         * deadlock.
@@ -896,6 +921,7 @@ scheduleDetectDeadlock(void)
        }
 #endif
 
+#if !defined(RTS_SUPPORTS_THREADS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception (or in the SMP build, send *all* main
         * threads the deadlock exception, since none of them can make
@@ -905,6 +931,7 @@ scheduleDetectDeadlock(void)
            StgMainThread *m;
            m = main_threads;
            switch (m->tso->why_blocked) {
+           case BlockedOnSTM:
            case BlockedOnBlackHole:
            case BlockedOnException:
            case BlockedOnMVar:
@@ -914,11 +941,6 @@ scheduleDetectDeadlock(void)
                barf("deadlock: main thread blocked in a strange way");
            }
        }
-
-#elif defined(RTS_SUPPORTS_THREADS)
-    // ToDo: add deadlock detection in threaded RTS
-#elif defined(PARALLEL_HASKELL)
-    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
 #endif
     }
 }
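
In the threaded RTS, deadlock is now suspected only after a whole timeslice with no activity, and the forced major GC records ACTIVITY_DONE_GC so the same collection is not retriggered on the next pass. A condensed control-flow model of the new check; the _model suffixed names are invented and the GC, signal handling and exception delivery are reduced to prints.

    /* Condensed control-flow model of the new deadlock check. */
    #include <stdio.h>

    enum { ACTIVITY_YES, ACTIVITY_INACTIVE, ACTIVITY_DONE_GC };
    static int recent_activity = ACTIVITY_INACTIVE;
    static int run_queue_len   = 0;

    static void scheduleDetectDeadlock_model(void)
    {
        if (run_queue_len > 0) return;               /* EMPTY_THREAD_QUEUES() is false */

        /* Threaded RTS: only act after a full idle timeslice. */
        if (recent_activity != ACTIVITY_INACTIVE) return;

        printf("deadlocked, forcing major GC...\n"); /* GarbageCollect(GetRoots, rtsTrue) */
        recent_activity = ACTIVITY_DONE_GC;          /* GC itself is not mutator activity */

        /* If the GC (e.g. via finalizers) made threads runnable, resume;
         * otherwise the non-threaded RTS would now raise Deadlock. */
        if (run_queue_len > 0) return;
    }

    int main(void)
    {
        scheduleDetectDeadlock_model();   /* triggers the GC path */
        scheduleDetectDeadlock_model();   /* no-op: ACTIVITY_DONE_GC, not INACTIVE */
        return 0;
    }
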
@@ -1468,10 +1490,10 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
            } else {
 #if !defined(SMP)
                ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
-                      g0s0->blocks == cap->r.rNursery);
+                      g0s0 == cap->r.rNursery);
                g0s0->blocks = bd;
 #endif
-               cap->r.rNursery = bd;
+               cap->r.rNursery->blocks = bd;
            }             
            cap->r.rCurrentNursery->u.back = bd;
            
@@ -1806,12 +1828,13 @@ scheduleHandleThreadFinished( StgMainThread *mainThread
          removeThreadLabel((StgWord)mainThread->tso->id);
 #endif
          if (mainThread->prev == NULL) {
+             ASSERT(mainThread == main_threads);
              main_threads = mainThread->link;
          } else {
              mainThread->prev->link = mainThread->link;
          }
          if (mainThread->link != NULL) {
-             mainThread->link->prev = NULL;
+             mainThread->link->prev = mainThread->prev;
          }
          releaseCapability(cap);
          return rtsTrue; // tells schedule() to return
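
The hunk above fixes a classic doubly-linked-list unlink bug: when removing a node from the middle of main_threads, the successor's prev pointer must be redirected to the removed node's predecessor, not cleared. A self-contained illustration with a generic node type (Node is invented for the example, not an RTS type).

    /* Generic illustration of the unlink fix. */
    #include <assert.h>
    #include <stddef.h>

    typedef struct Node { struct Node *prev, *link; } Node;

    static void unlink_node(Node **head, Node *n)
    {
        if (n->prev == NULL) {
            assert(*head == n);          /* mirrors ASSERT(mainThread == main_threads) */
            *head = n->link;
        } else {
            n->prev->link = n->link;
        }
        if (n->link != NULL) {
            n->link->prev = n->prev;     /* the fix: this used to be set to NULL */
        }
    }

    int main(void)
    {
        Node a = {0,0}, b = {0,0}, c = {0,0};
        Node *head = &a;
        a.link = &b; b.prev = &a; b.link = &c; c.prev = &b;
        unlink_node(&head, &b);
        assert(a.link == &c && c.prev == &a);   /* list stays consistent in both directions */
        return 0;
    }
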
@@ -1865,6 +1888,7 @@ scheduleDoGC( Capability *cap STG_UNUSED )
 {
     StgTSO *t;
 #ifdef SMP
+    static rtsBool waiting_for_gc;
     int n_capabilities = RtsFlags.ParFlags.nNodes - 1; 
            // subtract one because we're already holding one.
     Capability *caps[n_capabilities];
@@ -1882,6 +1906,10 @@ scheduleDoGC( Capability *cap STG_UNUSED )
     // the other tasks to sleep and stay asleep.
     //
        
+    // Someone else is already trying to GC
+    if (waiting_for_gc) return;
+    waiting_for_gc = rtsTrue;
+
     caps[n_capabilities] = cap;
     while (n_capabilities > 0) {
        IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilities (%d left)", n_capabilities));
@@ -1889,6 +1917,8 @@ scheduleDoGC( Capability *cap STG_UNUSED )
        n_capabilities--;
        caps[n_capabilities] = cap;
     }
+
+    waiting_for_gc = rtsFalse;
 #endif
 
     /* Kick any transactions which are invalid back to their
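
Under SMP, scheduleDoGC has to collect every other capability before it can run the GC, so two capabilities attempting this at the same time could each hold some capabilities and wait forever for the rest; the static waiting_for_gc flag makes the second one back off. A sketch of that guard only; the capability-grabbing loop is reduced to a single call and all locking is omitted.

    /* Sketch of the "only one GC leader" guard; grab_all_capabilities()
     * stands in for the waitForReturnCapability() loop. */
    #include <stdio.h>

    static int waiting_for_gc = 0;
    static int n_free_capabilities = 3;

    static void grab_all_capabilities(void) { n_free_capabilities = 0; }

    static void scheduleDoGC_model(void)
    {
        if (waiting_for_gc) return;      /* someone else is already herding capabilities */
        waiting_for_gc = 1;

        grab_all_capabilities();
        waiting_for_gc = 0;

        printf("all capabilities held: %d\n", n_free_capabilities == 0);
    }

    int main(void) { scheduleDoGC_model(); return 0; }
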
@@ -1955,7 +1985,7 @@ scheduleDoGC( Capability *cap STG_UNUSED )
 StgBool
 rtsSupportsBoundThreads(void)
 {
-#ifdef THREADED_RTS
+#if defined(RTS_SUPPORTS_THREADS)
   return rtsTrue;
 #else
   return rtsFalse;
@@ -1969,7 +1999,7 @@ rtsSupportsBoundThreads(void)
 StgBool
 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
 {
-#ifdef THREADED_RTS
+#if defined(RTS_SUPPORTS_THREADS)
   return (tso->main != NULL);
 #endif
   return rtsFalse;
@@ -2131,6 +2161,7 @@ suspendThread( StgRegTable *reg )
   tok = cap->r.rCurrentTSO->id;
 
   /* Hand back capability */
+  cap->r.rInHaskell = rtsFalse;
   releaseCapability(cap);
   
 #if defined(RTS_SUPPORTS_THREADS)
@@ -2140,7 +2171,6 @@ suspendThread( StgRegTable *reg )
   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
 #endif
 
-  in_haskell = rtsFalse;
   RELEASE_LOCK(&sched_mutex);
   
   errno = saved_errno;
@@ -2188,7 +2218,7 @@ resumeThread( StgInt tok )
   tso->why_blocked  = NotBlocked;
 
   cap->r.rCurrentTSO = tso;
-  in_haskell = rtsTrue;
+  cap->r.rInHaskell = rtsTrue;
   RELEASE_LOCK(&sched_mutex);
   errno = saved_errno;
   return &cap->r;
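
The suspend/resume hunks keep cap->r.rInHaskell in step with the safe foreign call protocol: suspendThread clears the flag and releases the capability before the C call runs, and resumeThread reacquires a capability and sets it again, so the re-entry check in schedule() only fires for genuinely unsafe calls. A minimal model of that pairing; the token is simplified to a thread id and locking is omitted.

    /* Minimal model of the safe-call protocol around a foreign call;
     * Capability and the token handling are heavily simplified. */
    #include <stdio.h>

    typedef struct { int rInHaskell; int rCurrentTSO_id; } Capability;

    static int suspendThread_model(Capability *cap)
    {
        int tok = cap->rCurrentTSO_id;   /* remember which thread is making the call */
        cap->rInHaskell = 0;             /* we are about to run foreign (C) code */
        /* releaseCapability(cap) would happen here */
        return tok;
    }

    static void resumeThread_model(Capability *cap, int tok)
    {
        /* waitForReturnCapability(...) would happen here */
        cap->rCurrentTSO_id = tok;
        cap->rInHaskell = 1;             /* back to running Haskell code */
    }

    int main(void)
    {
        Capability cap = { 1, 42 };
        int tok = suspendThread_model(&cap);
        /* ... the actual foreign call runs here ... */
        resumeThread_model(&cap, tok);
        printf("rInHaskell=%d tso=%d\n", cap.rInHaskell, cap.rCurrentTSO_id);
        return 0;
    }
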
@@ -2630,6 +2660,7 @@ initScheduler(void)
 
 #if defined(SMP)
   /* eagerly start some extra workers */
+  startingWorkerThread = RtsFlags.ParFlags.nNodes;
   startTasks(RtsFlags.ParFlags.nNodes, taskStart);
 #endif
 
@@ -3216,6 +3247,11 @@ interruptStgRts(void)
 {
     interrupted    = 1;
     context_switch = 1;
+    threadRunnable();
+    /* ToDo: if invoked from a signal handler, this threadRunnable
+     * only works if there's another thread (not this one) waiting to
+     * be woken up.
+     */
 }
 
 /* -----------------------------------------------------------------------------
@@ -3339,6 +3375,12 @@ unblockThread(StgTSO *tso)
              blocked_queue_tl = (StgTSO *)prev;
            }
          }
+#if defined(mingw32_HOST_OS)
+         /* (Cooperatively) signal that the worker thread should abort
+          * the request.
+          */
+         abandonWorkRequest(tso->block_info.async_result->reqID);
+#endif
          goto done;
        }
       }
@@ -3473,6 +3515,12 @@ unblockThread(StgTSO *tso)
              blocked_queue_tl = prev;
            }
          }
+#if defined(mingw32_HOST_OS)
+         /* (Cooperatively) signal that the worker thread should abort
+          * the request.
+          */
+         abandonWorkRequest(tso->block_info.async_result->reqID);
+#endif
          goto done;
        }
       }
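
On mingw32, blocked I/O is delegated to a worker pool, so unblocking a thread that receives an async exception must also tell the worker servicing its request to abandon it, which abandonWorkRequest does by request id. A sketch of the cooperative-cancellation idea only; the request table and names below are invented, and just the "signal by reqID" shape comes from the patch.

    /* Toy cooperative cancellation by request id; not the real
     * IOManager structures. */
    #include <stdio.h>

    #define MAX_REQ 8
    static int req_abandoned[MAX_REQ];

    static void abandonWorkRequest_model(int reqID)
    {
        req_abandoned[reqID] = 1;        /* worker checks this when its blocking call returns */
    }

    static void worker_finish_model(int reqID)
    {
        if (req_abandoned[reqID]) {
            printf("request %d: result discarded, no thread to wake\n", reqID);
        } else {
            printf("request %d: wake the blocked Haskell thread\n", reqID);
        }
    }

    int main(void)
    {
        abandonWorkRequest_model(3);     /* done while unblocking the TSO */
        worker_finish_model(3);
        return 0;
    }
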
@@ -3735,12 +3783,12 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
 #ifdef PROFILING
            StgCatchFrame *cf = (StgCatchFrame *)frame;
 #endif
-           StgClosure *raise;
+           StgThunk *raise;
            
            // we've got an exception to raise, so let's pass it to the
            // handler in this frame.
            //
-           raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
+           raise = (StgThunk *)allocate(sizeofW(StgThunk)+1);
            TICK_ALLOC_SE_THK(1,0);
            SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
            raise->payload[0] = exception;
@@ -3778,7 +3826,7 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
            // fun field.
            //
            words = frame - sp - 1;
-           ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
+           ap = (StgAP_STACK *)allocate(AP_STACK_sizeW(words));
            
            ap->size = words;
            ap->fun  = (StgClosure *)sp[0];
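
The raise closure is now allocated and typed as an StgThunk rather than a bare StgClosure, so its size has to be computed from the thunk header (which can be larger than a plain closure header), and the stack chunk uses AP_STACK_sizeW rather than the PAP sizing macro. A small illustration of word-based "header plus payload" sizing; the struct layouts below are stand-ins, not the real StgThunk or StgAP_STACK definitions.

    /* Illustration of "size in words = header words + payload words". */
    #include <stdio.h>

    typedef unsigned long StgWord;
    #define sizeofW(t) (sizeof(t) / sizeof(StgWord))

    typedef struct { StgWord info; } FakeClosure;             /* plain header */
    typedef struct { StgWord info; StgWord extra; } FakeThunk; /* header + extra word (assumed) */

    int main(void)
    {
        /* Sizing a 1-payload thunk with the plain closure's size would
         * under-allocate whenever the thunk header is bigger. */
        printf("closure+1 = %lu words, thunk+1 = %lu words\n",
               (unsigned long)(sizeofW(FakeClosure) + 1),
               (unsigned long)(sizeofW(FakeThunk) + 1));
        return 0;
    }
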
@@ -3845,7 +3893,7 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
 StgWord
 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
 {
-    StgClosure *raise_closure = NULL;
+    StgThunk *raise_closure = NULL;
     StgPtr p, next;
     StgRetInfoTable *info;
     //
@@ -3882,11 +3930,11 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
            // Only create raise_closure if we need to.
            if (raise_closure == NULL) {
                raise_closure = 
-                   (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
+                   (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE);
                SET_HDR(raise_closure, &stg_raise_info, CCCS);
                raise_closure->payload[0] = exception;
            }
-           UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
+           UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
            p = next;
            continue;