Interruptible FFI calls with pthread_kill and CancelSynchronousIO. v4
[ghc-hetmet.git] / rts / Task.c
index 98f083c..f26785a 100644 (file)
@@ -24,7 +24,7 @@
 #endif
 
 // Task lists and global counters.
-// Locks required: sched_mutex.
+// Locks required: all_tasks_mutex.
 Task *all_tasks = NULL;
 static nat taskCount;
 static int tasksInitialized = 0;
@@ -33,6 +33,10 @@ static void   freeTask  (Task *task);
 static Task * allocTask (void);
 static Task * newTask   (rtsBool);
 
+#if defined(THREADED_RTS)
+static Mutex all_tasks_mutex;
+#endif
+
 /* -----------------------------------------------------------------------------
  * Remembering the current thread's Task
  * -------------------------------------------------------------------------- */
@@ -59,9 +63,12 @@ initTaskManager (void)
     if (!tasksInitialized) {
        taskCount = 0;
        tasksInitialized = 1;
-#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
+#if defined(THREADED_RTS)
+#if !defined(MYTASK_USE_TLV)
        newThreadLocalKey(&currentTaskKey);
 #endif
+        initMutex(&all_tasks_mutex);
+#endif
     }
 }
 
@@ -71,7 +78,7 @@ freeTaskManager (void)
     Task *task, *next;
     nat tasksRunning = 0;
 
-    ASSERT_LOCK_HELD(&sched_mutex);
+    ACQUIRE_LOCK(&all_tasks_mutex);
 
     for (task = all_tasks; task != NULL; task = next) {
         next = task->all_link;
@@ -86,7 +93,11 @@ freeTaskManager (void)
                tasksRunning);
 
     all_tasks = NULL;
+
+    RELEASE_LOCK(&all_tasks_mutex);
+
 #if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
+    closeMutex(&all_tasks_mutex); 
     freeThreadLocalKey(&currentTaskKey);
 #endif
 
@@ -154,8 +165,6 @@ newTask (rtsBool worker)
     task->worker        = worker;
     task->stopped       = rtsFalse;
     task->running_finalizers = rtsFalse;
-    task->stat          = NoStatus;
-    task->ret           = NULL;
     task->n_spare_incalls = 0;
     task->spare_incalls = NULL;
     task->incall        = NULL;
@@ -179,14 +188,14 @@ newTask (rtsBool worker)
 
     task->next = NULL;
 
-    ACQUIRE_LOCK(&sched_mutex);
+    ACQUIRE_LOCK(&all_tasks_mutex);
 
     task->all_link = all_tasks;
     all_tasks = task;
 
     taskCount++;
 
-    RELEASE_LOCK(&sched_mutex);
+    RELEASE_LOCK(&all_tasks_mutex);
 
     return task;
 }
@@ -211,6 +220,8 @@ newInCall (Task *task)
     incall->task = task;
     incall->suspended_tso = NULL;
     incall->suspended_cap = NULL;
+    incall->stat          = NoStatus;
+    incall->ret           = NULL;
     incall->next = NULL;
     incall->prev = NULL;
     incall->prev_stack = task->incall;
@@ -284,7 +295,7 @@ discardTasksExcept (Task *keep)
     Task *task, *next;
 
     // Wipe the task list, except the current Task.
-    ACQUIRE_LOCK(&sched_mutex);
+    ACQUIRE_LOCK(&all_tasks_mutex);
     for (task = all_tasks; task != NULL; task=next) {
         next = task->all_link;
         if (task != keep) {
@@ -294,7 +305,7 @@ discardTasksExcept (Task *keep)
     }
     all_tasks = keep;
     keep->all_link = NULL;
-    RELEASE_LOCK(&sched_mutex);
+    RELEASE_LOCK(&all_tasks_mutex);
 }
 
 void
@@ -398,6 +409,15 @@ startWorkerTask (Capability *cap)
   RELEASE_LOCK(&task->lock);
 }
 
+void
+interruptWorkerTask (Task *task)
+{
+  ASSERT(osThreadId() != task->id);    // seppuku not allowed
+  ASSERT(task->incall->suspended_tso); // use this only for FFI calls
+  interruptOSThread(task->id);
+  debugTrace(DEBUG_sched, "interrupted worker task %lu", task->id);
+}
+
 #endif /* THREADED_RTS */
 
 #ifdef DEBUG