[project @ 2005-04-27 12:52:52 by simonmar]
[ghc-hetmet.git] / ghc / rts / Task.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 2001-2005
4  *
5  * The task manager subsystem.  Tasks execute STG code, with this
6  * module providing the API which the Scheduler uses to control their
7  * creation and destruction.
8  * 
9  * -------------------------------------------------------------------------*/
10
11 #include "Rts.h"
12 #if defined(RTS_SUPPORTS_THREADS) /* to the end */
13 #include "RtsUtils.h"
14 #include "OSThreads.h"
15 #include "Task.h"
16 #include "Stats.h"
17 #include "RtsFlags.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Capability.h"
21
22 #if HAVE_SIGNAL_H
23 #include <signal.h>
24 #endif
25
26 #define INIT_TASK_TABLE_SIZE 16
27
28 TaskInfo* taskTable;
29 static nat taskTableSize;
30
31 // maps OSThreadID to TaskInfo*
32 HashTable *taskHash;
33
34 nat taskCount;
35 static nat tasksRunning;
36 static nat workerCount;
37
38 #define DEFAULT_MAX_WORKERS 64
39 nat maxWorkers; // we won't create more workers than this
40
41 void
42 initTaskManager (void)
43 {
44     static int initialized = 0;
45
46     if (!initialized) {
47 #if defined(SMP)
48         taskTableSize = stg_max(INIT_TASK_TABLE_SIZE, 
49                                 RtsFlags.ParFlags.nNodes * 2);
50 #else
51         taskTableSize = INIT_TASK_TABLE_SIZE;
52 #endif
53         taskTable = stgMallocBytes( taskTableSize * sizeof(TaskInfo),
54                                     "initTaskManager");
55     
56         taskCount = 0;
57         workerCount = 0;
58         tasksRunning = 0;
59
60         taskHash = allocHashTable();
61   
62         maxWorkers = DEFAULT_MAX_WORKERS;
63
64         initialized = 1;
65     }
66 }
67
68 static void
69 expandTaskTable (void)
70 {
71     nat i;
72
73     taskTableSize *= 2;
74     taskTable = stgReallocBytes(taskTable, taskTableSize * sizeof(TaskInfo),
75                                 "expandTaskTable");
76
77     /* Have to update the hash table now... */
78     for (i = 0; i < taskCount; i++) {
79         removeHashTable( taskHash, taskTable[i].id, NULL );
80         insertHashTable( taskHash, taskTable[i].id, &taskTable[i] );
81     }
82 }
83
84 void
85 stopTaskManager (void)
86 {
87     nat i;
88
89     IF_DEBUG(scheduler, sched_belch("stopping task manager, %d tasks still running", tasksRunning));
90     for (i = 1000; i > 0; i--) {
91         if (tasksRunning == 0) {
92             IF_DEBUG(scheduler, sched_belch("all tasks stopped"));
93             return;
94         }
95         IF_DEBUG(scheduler, sched_belch("yielding"));
96         prodWorker();
97         yieldThread();
98     }
99     IF_DEBUG(scheduler, sched_belch("%d tasks still running, exiting anyway", tasksRunning));
100
101     /* 
102        OLD CODE follows:
103     */
104 #if old_code
105     /* Send 'em all a SIGHUP.  That should shut 'em up. */
106     awaitDeath = taskCount==0 ? 0 : taskCount-1;
107     for (i = 0; i < taskCount; i++) {
108         /* don't cancel the thread running this piece of code. */
109         if ( taskTable[i].id != tid ) {
110             pthread_kill(taskTable[i].id,SIGTERM);
111         }
112     }
113     while (awaitDeath > 0) {
114         sched_yield();
115     }
116 #endif // old_code
117 }
118
119
120 rtsBool
121 startTasks (nat num, void (*taskStart)(void))
122 {
123     nat i; 
124     for (i = 0; i < num; i++) {
125         if (!startTask(taskStart)) {
126             return rtsFalse;
127         }
128     }
129     return rtsTrue;
130 }
131
132 static TaskInfo*
133 newTask (OSThreadId id, rtsBool is_worker)
134 {
135     long currentElapsedTime, currentUserTime, elapsedGCTime;
136     TaskInfo *task_info;
137
138     if (taskCount >= taskTableSize) {
139         expandTaskTable();
140     }
141     
142     insertHashTable( taskHash, id, &(taskTable[taskCount]) );
143     
144     stat_getTimes(&currentElapsedTime, &currentUserTime, &elapsedGCTime);
145     
146     task_info = &taskTable[taskCount];
147     
148     task_info->id = id;
149     task_info->is_worker = is_worker;
150     task_info->stopped = rtsFalse;
151     task_info->mut_time = 0.0;
152     task_info->mut_etime = 0.0;
153     task_info->gc_time = 0.0;
154     task_info->gc_etime = 0.0;
155     task_info->muttimestart = currentUserTime;
156     task_info->elapsedtimestart = currentElapsedTime;
157     
158     taskCount++;
159     workerCount++;
160     tasksRunning++;
161
162     IF_DEBUG(scheduler,sched_belch("startTask: new task %ld (total_count: %d; waiting: %d)\n", id, taskCount, rts_n_waiting_tasks););
163     
164     return task_info;
165 }
166
167 rtsBool
168 startTask (void (*taskStart)(void))
169 {
170   int r;
171   OSThreadId tid;
172
173   r = createOSThread(&tid,taskStart);
174   if (r != 0) {
175     barf("startTask: Can't create new task");
176   }
177   newTask (tid, rtsTrue);
178   return rtsTrue;
179 }
180
181 TaskInfo *
182 threadIsTask (OSThreadId id)
183 {
184     TaskInfo *task_info;
185     
186     task_info = lookupHashTable(taskHash, id);
187     if (task_info != NULL) {
188         if (task_info->stopped) {
189             task_info->stopped = rtsFalse;
190         }
191         return task_info;
192     }
193
194     return newTask(id, rtsFalse);
195 }
196
197 TaskInfo *
198 taskOfId (OSThreadId id)
199 {
200     return lookupHashTable(taskHash, id);
201 }
202
203 void
204 taskStop (void)
205 {
206     OSThreadId id;
207     long currentElapsedTime, currentUserTime, elapsedGCTime;
208     TaskInfo *task_info;
209
210     id = osThreadId();
211     task_info = taskOfId(id);
212     if (task_info == NULL) {
213         debugBelch("taskStop: not a task");
214         return;
215     }
216     ASSERT(task_info->id == id);
217
218     stat_getTimes(&currentElapsedTime, &currentUserTime, &elapsedGCTime);
219     
220     task_info->mut_time = 
221         currentUserTime - task_info->muttimestart - task_info->gc_time;
222     task_info->mut_etime = 
223         currentElapsedTime - task_info->elapsedtimestart - elapsedGCTime;
224
225     if (task_info->mut_time < 0.0)  { task_info->mut_time = 0.0;  }
226     if (task_info->mut_etime < 0.0) { task_info->mut_etime = 0.0; }
227
228     task_info->stopped = rtsTrue;
229     tasksRunning--;
230 }
231
232 void
233 resetTaskManagerAfterFork (void)
234 {
235     rts_n_waiting_tasks = 0;
236     taskCount = 0;
237 }
238
239 rtsBool
240 maybeStartNewWorker (void (*taskStart)(void))
241 {
242     /* 
243      * If more than one worker thread is known to be blocked waiting
244      * on thread_ready_cond, don't create a new one.
245      */
246     if ( rts_n_waiting_tasks > 0) {
247         IF_DEBUG(scheduler,sched_belch(
248                      "startTask: %d tasks waiting, not creating new one", 
249                      rts_n_waiting_tasks););
250         // the task will run as soon as a capability is available,
251         // so there's no need to wake it.
252         return rtsFalse;
253     }
254     
255     /* If the task limit has been reached, just return. */
256     if (maxWorkers > 0 && workerCount >= maxWorkers) {
257         IF_DEBUG(scheduler,sched_belch("startTask: worker limit (%d) reached, not creating new one",maxWorkers));
258         return rtsFalse;
259     }
260     
261     return startTask(taskStart);
262 }
263
264 #endif /* RTS_SUPPORTS_THREADS */