/* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.41 2000/01/13 14:34:05 hwloidl Exp $
+ * $Id: Schedule.c,v 1.42 2000/01/14 11:45:21 hwloidl Exp $
*
* (c) The GHC Team, 1998-1999
*
/* uses more flexible routine in GranSim */
insertThread(tso, CurrentProc);
#else
- add_to_run_queue(tso);
+ /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
+ from its creation
+ */
#endif
#if defined(GRAN)
// ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE
#if defined(GRAN)
-# error FixME
+static inline void
+unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
+{
+}
#elif defined(PAR)
static inline void
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
#endif
#if defined(GRAN)
-# error FixME
+static StgBlockingQueueElement *
+unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
+{
+ StgBlockingQueueElement *next;
+ PEs node_loc, tso_loc;
+
+ node_loc = where_is(node); // should be lifted out of loop
+ tso = (StgTSO *)bqe; // wastes an assignment to get the type right
+ tso_loc = where_is(tso);
+ if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
+ /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
+ ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
+ bq_processing_time += RtsFlags.GranFlags.Costs.lunblocktime;
+ // insertThread(tso, node_loc);
+ new_event(tso_loc, tso_loc,
+ CurrentTime[CurrentProc]+bq_processing_time,
+ ResumeThread,
+ tso, node, (rtsSpark*)NULL);
+ tso->link = END_TSO_QUEUE; // overwrite link just to be sure
+ // len_local++;
+ // len++;
+ } else { // TSO is remote (actually should be FMBQ)
+ bq_processing_time += RtsFlags.GranFlags.Costs.mpacktime;
+ bq_processing_time += RtsFlags.GranFlags.Costs.gunblocktime;
+ new_event(tso_loc, CurrentProc,
+ CurrentTime[CurrentProc]+bq_processing_time+
+ RtsFlags.GranFlags.Costs.latency,
+ UnblockThread,
+ tso, node, (rtsSpark*)NULL);
+ tso->link = END_TSO_QUEUE; // overwrite link just to be sure
+ bq_processing_time += RtsFlags.GranFlags.Costs.mtidytime;
+ // len++;
+ }
+ /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
+ IF_GRAN_DEBUG(bq,
+ fprintf(stderr," %s TSO %d (%p) [PE %d] (blocked_on=%p) (next=%p) ,",
+ (node_loc==tso_loc ? "Local" : "Global"),
+ tso->id, tso, CurrentProc, tso->blocked_on, tso->link))
+ tso->blocked_on = NULL;
+ IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
+ tso->id, tso));
+ }
+
+ /* if this is the BQ of an RBH, we have to put back the info ripped out of
+ the closure to make room for the anchor of the BQ */
+ if (next!=END_BQ_QUEUE) {
+ ASSERT(get_itbl(node)->type == RBH && get_itbl(next)->type == CONSTR);
+ /*
+ ASSERT((info_ptr==&RBH_Save_0_info) ||
+ (info_ptr==&RBH_Save_1_info) ||
+ (info_ptr==&RBH_Save_2_info));
+ */
+ /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
+ ((StgRBH *)node)->blocking_queue = ((StgRBHSave *)next)->payload[0];
+ ((StgRBH *)node)->mut_link = ((StgRBHSave *)next)->payload[1];
+
+ IF_GRAN_DEBUG(bq,
+ belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
+ node, info_type(node)));
+ }
+}
#elif defined(PAR)
static StgBlockingQueueElement *
unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
#endif
#if defined(GRAN)
-# error FixME
+inline StgTSO *
+unblockOne(StgTSO *tso, StgClosure *node)
+{
+ ACQUIRE_LOCK(&sched_mutex);
+ tso = unblockOneLocked(tso, node);
+ RELEASE_LOCK(&sched_mutex);
+ return tso;
+}
#elif defined(PAR)
inline StgTSO *
unblockOne(StgTSO *tso, StgClosure *node)
#endif
#if defined(GRAN)
-# error FixME
+void
+awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
+{
+ StgBlockingQueueElement *bqe, *next;
+ StgTSO *tso;
+ PEs node_loc, tso_loc;
+ rtsTime bq_processing_time = 0;
+ nat len = 0, len_local = 0;
+
+ IF_GRAN_DEBUG(bq,
+ belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
+ node, CurrentProc, CurrentTime[CurrentProc],
+ CurrentTSO->id, CurrentTSO));
+
+ node_loc = where_is(node);
+
+ ASSERT(get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
+ get_itbl(q)->type == CONSTR); // closure (type constructor)
+ ASSERT(is_unique(node));
+
+ /* FAKE FETCH: magically copy the node to the tso's proc;
+ no Fetch necessary because in reality the node should not have been
+ moved to the other PE in the first place
+ */
+ if (CurrentProc!=node_loc) {
+ IF_GRAN_DEBUG(bq,
+ belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
+ node, node_loc, CurrentProc, CurrentTSO->id,
+ // CurrentTSO, where_is(CurrentTSO),
+ node->header.gran.procs));
+ node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
+ IF_GRAN_DEBUG(bq,
+ belch("## new bitmask of node %p is %#x",
+ node, node->header.gran.procs));
+ if (RtsFlags.GranFlags.GranSimStats.Global) {
+ globalGranStats.tot_fake_fetches++;
+ }
+ }
+
+ bqe = q;
+ // ToDo: check: ASSERT(CurrentProc==node_loc);
+ while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
+ //next = bqe->link;
+ /*
+ bqe points to the current element in the queue
+ next points to the next element in the queue
+ */
+ //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
+ //tso_loc = where_is(tso);
+ bqe = unblockOneLocked(bqe, node);
+ }
+
+ /* statistics gathering */
+ /* ToDo: fix counters
+ if (RtsFlags.GranFlags.GranSimStats.Global) {
+ globalGranStats.tot_bq_processing_time += bq_processing_time;
+ globalGranStats.tot_bq_len += len; // total length of all bqs awakened
+ globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
+ globalGranStats.tot_awbq++; // total no. of bqs awakened
+ }
+ IF_GRAN_DEBUG(bq,
+ fprintf(stderr,"## BQ Stats of %p: [%d entries, %d local] %s\n",
+ node, len, len_local, (next!=END_TSO_QUEUE) ? "RBH" : ""));
+ */
+}
#elif defined(PAR)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)