[project @ 1999-10-04 16:13:18 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
index 09aeb15..8f3f7e3 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.20 1999/04/27 10:59:31 sewardj Exp $
+ * $Id: Schedule.c,v 1.26 1999/10/04 16:13:18 simonmar Exp $
  *
  * (c) The GHC Team, 1998-1999
  *
@@ -76,6 +76,23 @@ StgTSO   *MainTSO;
 static void unblockThread(StgTSO *tso);
 
 /* -----------------------------------------------------------------------------
+ * Comparing Thread ids.
+ *
+ * This is used from STG land in the implementation of the
+ * instances of Eq/Ord for ThreadIds.
+ * -------------------------------------------------------------------------- */
+
+int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
+{ 
+  StgThreadID id1 = tso1->id; 
+  StgThreadID id2 = tso2->id;
+  if (id1 < id2) return (-1);
+  if (id1 > id2) return 1;
+  return 0;
+}
+
+/* -----------------------------------------------------------------------------
    Create a new thread.
 
    The new thread starts with the given stack size.  Before the
@@ -110,7 +127,7 @@ initThread(StgTSO *tso, nat stack_size)
   SET_INFO(tso,&TSO_info);
   tso->whatNext     = ThreadEnterGHC;
   tso->id           = next_thread_id++;
-  tso->blocked_on   = NULL;
+  tso->why_blocked  = NotBlocked;
 
   tso->splim        = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
   tso->stack_size   = stack_size;
@@ -243,7 +260,7 @@ SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val)
     /* If we have more threads on the run queue, set up a context
      * switch at some point in the future.
      */
-    if (run_queue_hd != END_TSO_QUEUE) {
+    if (run_queue_hd != END_TSO_QUEUE || blocked_queue_hd != END_TSO_QUEUE) {
       context_switch = 1;
     } else {
       context_switch = 0;
@@ -375,7 +392,10 @@ SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val)
       break;
 
     case ThreadBlocked:
-      IF_DEBUG(scheduler,belch("Thread %ld stopped, blocking\n", t->id));
+      IF_DEBUG(scheduler,
+              fprintf(stderr, "Thread %d stopped, ", t->id);
+              printThreadBlockage(t);
+              fprintf(stderr, "\n"));
       threadPaused(t);
       /* assume the thread has put itself on some blocked queue
        * somewhere.
@@ -421,6 +441,14 @@ SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val)
     }
 
   next_thread:
+    /* Checked whether any waiting threads need to be woken up.
+     * If the run queue is empty, we can wait indefinitely for
+     * something to happen.
+     */
+    if (blocked_queue_hd != END_TSO_QUEUE) {
+      awaitEvent(run_queue_hd == END_TSO_QUEUE);
+    }
+
     t = run_queue_hd;
     if (t != END_TSO_QUEUE) {
       run_queue_hd = t->link;
@@ -431,12 +459,42 @@ SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val)
     }
   }
 
-  if (blocked_queue_hd != END_TSO_QUEUE) {
-    return AllBlocked;
-  } else {
-    return Deadlock;
+  /* If we got to here, then we ran out of threads to run, but the
+   * main thread hasn't finished yet.  It must be blocked on an MVar
+   * or a black hole somewhere, so we return deadlock.
+   */
+  return Deadlock;
+}
+
+/* -----------------------------------------------------------------------------
+   Debugging: why is a thread blocked
+   -------------------------------------------------------------------------- */
+
+#ifdef DEBUG
+void printThreadBlockage(StgTSO *tso)
+{
+  switch (tso->why_blocked) {
+  case BlockedOnRead:
+    fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
+    break;
+  case BlockedOnWrite:
+    fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
+    break;
+  case BlockedOnDelay:
+    fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
+    break;
+  case BlockedOnMVar:
+    fprintf(stderr,"blocked on an MVar");
+    break;
+  case BlockedOnBlackHole:
+    fprintf(stderr,"blocked on a black hole");
+    break;
+  case NotBlocked:
+    fprintf(stderr,"not blocked");
+    break;
   }
 }
+#endif
 
 /* -----------------------------------------------------------------------------
    Where are the roots that we know about?
@@ -571,7 +629,7 @@ threadStackOverflow(StgTSO *tso)
   tso->whatNext = ThreadKilled;
   tso->sp = (P_)&(tso->stack[tso->stack_size]);
   tso->su = (StgUpdateFrame *)tso->sp;
-  tso->blocked_on = NULL;
+  tso->why_blocked = NotBlocked;
   dest->mut_link = NULL;
 
   IF_DEBUG(sanity,checkTSO(tso));
@@ -585,21 +643,27 @@ threadStackOverflow(StgTSO *tso)
 }
 
 /* -----------------------------------------------------------------------------
-   Wake up a queue that was blocked on some resource (usually a
-   computation in progress).
+   Wake up a queue that was blocked on some resource.
    -------------------------------------------------------------------------- */
 
-void awaken_blocked_queue(StgTSO *q)
+StgTSO *unblockOne(StgTSO *tso)
 {
-  StgTSO *tso;
+  StgTSO *next;
+
+  ASSERT(get_itbl(tso)->type == TSO);
+  ASSERT(tso->why_blocked != NotBlocked);
+  tso->why_blocked = NotBlocked;
+  next = tso->link;
+  tso->link = END_TSO_QUEUE;
+  PUSH_ON_RUN_QUEUE(tso);
+  IF_DEBUG(scheduler,belch("Waking up thread %ld", tso->id));
+  return next;
+}
 
-  while (q != END_TSO_QUEUE) {
-    ASSERT(get_itbl(q)->type == TSO);
-    tso = q;
-    q = tso->link;
-    PUSH_ON_RUN_QUEUE(tso);
-    tso->blocked_on = NULL;
-    IF_DEBUG(scheduler,belch("Waking up thread %ld", tso->id));
+void awakenBlockedQueue(StgTSO *tso)
+{
+  while (tso != END_TSO_QUEUE) {
+    tso = unblockOne(tso);
   }
 }
 
@@ -627,16 +691,16 @@ unblockThread(StgTSO *tso)
 {
   StgTSO *t, **last;
 
-  if (tso->blocked_on == NULL) {
-    return;  /* not blocked */
-  }
+  switch (tso->why_blocked) {
 
-  switch (get_itbl(tso->blocked_on)->type) {
+  case NotBlocked:
+    return;  /* not blocked */
 
-  case MVAR:
+  case BlockedOnMVar:
+    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
     {
       StgTSO *last_tso = END_TSO_QUEUE;
-      StgMVar *mvar = (StgMVar *)(tso->blocked_on);
+      StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
 
       last = &mvar->head;
       for (t = mvar->head; t != END_TSO_QUEUE; 
@@ -652,9 +716,10 @@ unblockThread(StgTSO *tso)
       barf("unblockThread (MVAR): TSO not found");
     }
 
-  case BLACKHOLE_BQ:
+  case BlockedOnBlackHole:
+    ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
     {
-      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->blocked_on);
+      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
 
       last = &bq->blocking_queue;
       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
@@ -667,13 +732,32 @@ unblockThread(StgTSO *tso)
       barf("unblockThread (BLACKHOLE): TSO not found");
     }
 
+  case BlockedOnDelay:
+  case BlockedOnRead:
+  case BlockedOnWrite:
+    {
+      last = &blocked_queue_hd;
+      for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
+          last = &t->link, t = t->link) {
+       if (t == tso) {
+         *last = tso->link;
+         if (blocked_queue_tl == t) {
+           blocked_queue_tl = tso->link;
+         }
+         goto done;
+       }
+      }
+      barf("unblockThread (I/O): TSO not found");
+    }
+
   default:
     barf("unblockThread");
   }
 
  done:
   tso->link = END_TSO_QUEUE;
-  tso->blocked_on = NULL;
+  tso->why_blocked = NotBlocked;
+  tso->block_info.closure = NULL;
   PUSH_ON_RUN_QUEUE(tso);
 }
 
@@ -755,7 +839,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
        * handler in this frame.
        */
       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 1);
-      TICK_ALLOC_THK(2,0);
+      TICK_ALLOC_UPD_PAP(2,0);
       SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
              
       ap->n_args = 1;
@@ -779,7 +863,6 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
      * fun field.
      */
     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
-    TICK_ALLOC_THK(words+1,0);
     
     ASSERT(words >= 0);
     
@@ -795,6 +878,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
     case UPDATE_FRAME:
       {
        SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */); 
+       TICK_ALLOC_UP_THK(words+1,0);
        
        IF_DEBUG(scheduler,
                 fprintf(stderr,  "Updating ");
@@ -823,10 +907,11 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
         * layout's the same.
         */
        SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
+       TICK_ALLOC_UPD_PAP(words+1,0);
        
        /* now build o = FUN(catch,ap,handler) */
        o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
-       TICK_ALLOC_THK(2,0);
+       TICK_ALLOC_FUN(2,0);
        SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
        o->payload[0] = (StgClosure *)ap;
        o->payload[1] = cf->handler;
@@ -849,10 +934,11 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
        StgClosure* o;
        
        SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
+       TICK_ALLOC_UPD_PAP(words+1,0);
        
        /* now build o = FUN(seq,ap) */
        o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
-       TICK_ALLOC_THK(1,0);
+       TICK_ALLOC_SE_THK(1,0);
        SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
        payloadCPtr(o,0) = (StgClosure *)ap;