merge GHC HEAD
[ghc-hetmet.git] / rts / Capability.c
index 5f54eca..fe5dbdc 100644 (file)
@@ -62,9 +62,8 @@ Capability * rts_unsafeGetMyCapability (void)
 STATIC_INLINE rtsBool
 globalWorkToDo (void)
 {
-    return blackholes_need_checking
-       || sched_state >= SCHED_INTERRUPTING
-       ;
+    return sched_state >= SCHED_INTERRUPTING
+        || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
 }
 #endif
 
@@ -220,13 +219,16 @@ initCapability( Capability *cap, nat i )
     initMutex(&cap->lock);
     cap->running_task      = NULL; // indicates cap is free
     cap->spare_workers     = NULL;
+    cap->n_spare_workers   = 0;
     cap->suspended_ccalls  = NULL;
     cap->returning_tasks_hd = NULL;
     cap->returning_tasks_tl = NULL;
     cap->inbox              = (Message*)END_TSO_QUEUE;
     cap->sparks_created     = 0;
+    cap->sparks_dud         = 0;
     cap->sparks_converted   = 0;
-    cap->sparks_pruned      = 0;
+    cap->sparks_gcd         = 0;
+    cap->sparks_fizzled     = 0;
 #endif
 
     cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
@@ -251,6 +253,8 @@ initCapability( Capability *cap, nat i )
     cap->transaction_tokens = 0;
     cap->context_switch = 0;
     cap->pinned_object_block = NULL;
+
+    traceCapsetAssignCap(CAPSET_OSPROCESS_DEFAULT, i);
 }
 
 /* ---------------------------------------------------------------------------
@@ -264,6 +268,10 @@ initCapability( Capability *cap, nat i )
 void
 initCapabilities( void )
 {
+    /* Declare a single capability set representing the process. 
+       Each capability will get added to this capset. */ 
+    traceCapsetCreate(CAPSET_OSPROCESS_DEFAULT, CapsetTypeOsProcess);
+
 #if defined(THREADED_RTS)
     nat i;
 
@@ -456,14 +464,33 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
 
     task = cap->running_task;
 
+    // If the Task is stopped, we shouldn't be yielding, we should
+    // be just exiting.
+    ASSERT(!task->stopped);
+
     // If the current task is a worker, save it on the spare_workers
     // list of this Capability.  A worker can mark itself as stopped,
     // in which case it is not replaced on the spare_worker queue.
     // This happens when the system is shutting down (see
     // Schedule.c:workerStart()).
-    if (!isBoundTask(task) && !task->stopped) {
-       task->next = cap->spare_workers;
-       cap->spare_workers = task;
+    if (!isBoundTask(task))
+    {
+        if (cap->n_spare_workers < MAX_SPARE_WORKERS)
+        {
+            task->next = cap->spare_workers;
+            cap->spare_workers = task;
+            cap->n_spare_workers++;
+        }
+        else
+        {
+            debugTrace(DEBUG_sched, "%d spare workers already, exiting",
+                       cap->n_spare_workers);
+            releaseCapability_(cap,rtsFalse);
+            // hold the lock until after workerTaskStop; c.f. scheduleWorker()
+            workerTaskStop(task);
+            RELEASE_LOCK(&cap->lock);
+            shutdownThread();
+        }
     }
     // Bound tasks just float around attached to their TSOs.
 
@@ -620,7 +647,8 @@ yieldCapability (Capability** pCap, Task *task)
                }
                cap->spare_workers = task->next;
                task->next = NULL;
-           }
+                cap->n_spare_workers--;
+            }
            cap->running_task = task;
            RELEASE_LOCK(&cap->lock);
            break;
@@ -637,60 +665,48 @@ yieldCapability (Capability** pCap, Task *task)
 }
 
 /* ----------------------------------------------------------------------------
- * Wake up a thread on a Capability.
+ * prodCapability
  *
- * This is used when the current Task is running on a Capability and
- * wishes to wake up a thread on a different Capability.
+ * If a Capability is currently idle, wake up a Task on it.  Used to 
+ * get every Capability into the GC.
  * ------------------------------------------------------------------------- */
 
 void
-wakeupThreadOnCapability (Capability *cap,
-                          Capability *other_cap, 
-                          StgTSO *tso)
+prodCapability (Capability *cap, Task *task)
 {
-    MessageWakeup *msg;
-
-    // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
-    if (tso->bound) {
-       ASSERT(tso->bound->task->cap == tso->cap);
-       tso->bound->task->cap = other_cap;
+    ACQUIRE_LOCK(&cap->lock);
+    if (!cap->running_task) {
+        cap->running_task = task;
+        releaseCapability_(cap,rtsTrue);
     }
-    tso->cap = other_cap;
-
-    ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
-           tso->block_info.closure->header.info == &stg_IND_info);
-
-    ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
-
-    msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
-    msg->header.info = &stg_MSG_WAKEUP_info;
-    msg->tso = tso;
-    tso->block_info.closure = (StgClosure *)msg;
-    dirty_TSO(cap, tso);
-    write_barrier();
-    tso->why_blocked = BlockedOnMsgWakeup;
-
-    sendMessage(other_cap, (Message*)msg);
+    RELEASE_LOCK(&cap->lock);
 }
 
 /* ----------------------------------------------------------------------------
- * prodCapability
+ * tryGrabCapability
+ *
+ * Attempt to gain control of a Capability if it is free.
  *
- * If a Capability is currently idle, wake up a Task on it.  Used to 
- * get every Capability into the GC.
  * ------------------------------------------------------------------------- */
 
-void
-prodCapability (Capability *cap, Task *task)
+rtsBool
+tryGrabCapability (Capability *cap, Task *task)
 {
+    if (cap->running_task != NULL) return rtsFalse;
     ACQUIRE_LOCK(&cap->lock);
-    if (!cap->running_task) {
-        cap->running_task = task;
-        releaseCapability_(cap,rtsTrue);
+    if (cap->running_task != NULL) {
+       RELEASE_LOCK(&cap->lock);
+       return rtsFalse;
     }
+    task->cap = cap;
+    cap->running_task = task;
     RELEASE_LOCK(&cap->lock);
+    return rtsTrue;
 }
 
+
+#endif /* THREADED_RTS */
+
 /* ----------------------------------------------------------------------------
  * shutdownCapability
  *
@@ -707,8 +723,11 @@ prodCapability (Capability *cap, Task *task)
  * ------------------------------------------------------------------------- */
 
 void
-shutdownCapability (Capability *cap, Task *task, rtsBool safe)
+shutdownCapability (Capability *cap,
+                    Task *task USED_IF_THREADS,
+                    rtsBool safe USED_IF_THREADS)
 {
+#if defined(THREADED_RTS)
     nat i;
 
     task->cap = cap;
@@ -746,12 +765,13 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe)
                 if (!osThreadIsAlive(t->id)) {
                     debugTrace(DEBUG_sched, 
                                "worker thread %p has died unexpectedly", (void *)t->id);
-                        if (!prev) {
-                            cap->spare_workers = t->next;
-                        } else {
-                            prev->next = t->next;
-                        }
-                        prev = t;
+                    cap->n_spare_workers--;
+                    if (!prev) {
+                        cap->spare_workers = t->next;
+                    } else {
+                        prev->next = t->next;
+                    }
+                    prev = t;
                 }
             }
         }
@@ -799,33 +819,23 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe)
     // threads performing foreign calls that will eventually try to 
     // return via resumeThread() and attempt to grab cap->lock.
     // closeMutex(&cap->lock);
-}
+    
+#endif /* THREADED_RTS */
 
-/* ----------------------------------------------------------------------------
- * tryGrabCapability
- *
- * Attempt to gain control of a Capability if it is free.
- *
- * ------------------------------------------------------------------------- */
+    traceCapsetRemoveCap(CAPSET_OSPROCESS_DEFAULT, cap->no);
+}
 
-rtsBool
-tryGrabCapability (Capability *cap, Task *task)
+void
+shutdownCapabilities(Task *task, rtsBool safe)
 {
-    if (cap->running_task != NULL) return rtsFalse;
-    ACQUIRE_LOCK(&cap->lock);
-    if (cap->running_task != NULL) {
-       RELEASE_LOCK(&cap->lock);
-       return rtsFalse;
+    nat i;
+    for (i=0; i < n_capabilities; i++) {
+        ASSERT(task->incall->tso == NULL);
+        shutdownCapability(&capabilities[i], task, safe);
     }
-    task->cap = cap;
-    cap->running_task = task;
-    RELEASE_LOCK(&cap->lock);
-    return rtsTrue;
+    traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT);
 }
 
-
-#endif /* THREADED_RTS */
-
 static void
 freeCapability (Capability *cap)
 {
@@ -856,11 +866,9 @@ freeCapabilities (void)
    ------------------------------------------------------------------------ */
 
 void
-markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta, 
-                      rtsBool prune_sparks USED_IF_THREADS)
+markCapability (evac_fn evac, void *user, Capability *cap,
+                rtsBool no_mark_sparks USED_IF_THREADS)
 {
-    nat i;
-    Capability *cap;
     InCall *incall;
 
     // Each GC thread is responsible for following roots from the
@@ -868,62 +876,31 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
     // or fewer Capabilities as GC threads, but just in case there
     // are more, we mark every Capability whose number is the GC
     // thread's index plus a multiple of the number of GC threads.
-    for (i = i0; i < n_capabilities; i += delta) {
-       cap = &capabilities[i];
-       evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
-       evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
+    evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
+    evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
 #if defined(THREADED_RTS)
-        evac(user, (StgClosure **)(void *)&cap->inbox);
+    evac(user, (StgClosure **)(void *)&cap->inbox);
 #endif
-       for (incall = cap->suspended_ccalls; incall != NULL; 
-            incall=incall->next) {
-           evac(user, (StgClosure **)(void *)&incall->suspended_tso);
-       }
+    for (incall = cap->suspended_ccalls; incall != NULL;
+         incall=incall->next) {
+        evac(user, (StgClosure **)(void *)&incall->suspended_tso);
+    }
 
 #if defined(THREADED_RTS)
-        if (prune_sparks) {
-            pruneSparkQueue (evac, user, cap);
-        } else {
-            traverseSparkQueue (evac, user, cap);
-        }
-#endif
+    if (!no_mark_sparks) {
+        traverseSparkQueue (evac, user, cap);
     }
+#endif
 
-#if !defined(THREADED_RTS)
-    evac(user, (StgClosure **)(void *)&blocked_queue_hd);
-    evac(user, (StgClosure **)(void *)&blocked_queue_tl);
-    evac(user, (StgClosure **)(void *)&sleeping_queue);
-#endif 
+    // Free STM structures for this Capability
+    stmPreGCHook(cap);
 }
 
 void
 markCapabilities (evac_fn evac, void *user)
 {
-    markSomeCapabilities(evac, user, 0, 1, rtsFalse);
-}
-
-/* -----------------------------------------------------------------------------
-   Messages
-   -------------------------------------------------------------------------- */
-
-#ifdef THREADED_RTS
-
-void sendMessage(Capability *cap, Message *msg)
-{
-    ACQUIRE_LOCK(&cap->lock);
-
-    msg->link = cap->inbox;
-    cap->inbox = msg;
-
-    if (cap->running_task == NULL) {
-       cap->running_task = myTask(); 
-            // precond for releaseCapability_()
-       releaseCapability_(cap,rtsFalse);
-    } else {
-        contextSwitchCapability(cap);
+    nat n;
+    for (n = 0; n < n_capabilities; n++) {
+        markCapability(evac, user, &capabilities[n], rtsFalse);
     }
-
-    RELEASE_LOCK(&cap->lock);
 }
-
-#endif // THREADED_RTS