From 64cd1d56deb8a25285b8edd51b2f78b54fae66d3 Mon Sep 17 00:00:00 2001 From: adam Date: Sat, 30 Oct 2004 22:52:34 +0000 Subject: [PATCH] experimental thread-killing capabilities in Stream darcs-hash:20041030225234-5007d-7f2796ced341a0fa70199bd9dbf3452397311999.gz --- src/org/ibex/io/Stream.java | 100 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 80 insertions(+), 20 deletions(-) diff --git a/src/org/ibex/io/Stream.java b/src/org/ibex/io/Stream.java index 51fe9f6..83a8193 100644 --- a/src/org/ibex/io/Stream.java +++ b/src/org/ibex/io/Stream.java @@ -3,6 +3,7 @@ package org.ibex.io; import java.io.*; import java.net.*; +import java.util.*; import java.util.zip.*; import org.ibex.util.*; @@ -34,39 +35,98 @@ public class Stream { public static class Closed extends StreamException { public Closed(String s) { super(s); } } - public char peekc() { flush(); return in.getc(true); } - public char getc() { flush(); char ret = in.getc(false); log(ret); return ret; } - public String readln() { flush(); String s = in.readln(); log(s); log('\n'); return s; } + private static Hashtable blocker = new Hashtable(); + public static void kill(Thread thread) { + Stream block = (Stream)blocker.get(thread); + if (block == null) { + Log.warn(Stream.class, "thread " + thread + " is not blocked on a stream"); + } else { + Log.warn(Stream.class, "asynchronously closing " + block); + block.close(); + } + } + + public char peekc() { + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + flush(); return in.getc(true); + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } + } + public char getc() { + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + flush(); char ret = in.getc(false); log(ret); return ret; + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } + } + public String readln() { + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + flush(); String s = in.readln(); log(s); log('\n'); return s; + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } + } + + public void print(String s) { + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + logWrite(s); + out.write(s); + flush(); + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } + } + public void println(String s) { + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + logWrite(s); + out.write(s); + out.write(newLine); + flush(); + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } + } + public void flush() { + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + if (out != null) try { out.w.flush(); } catch(IOException e) { ioe(e); } + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } + } public int read(byte[] b, int off, int len) { - flush(); - int ret = in.readBytes(b, off, len); - if (log != null) log("\n[read " + ret + " bytes of binary data ]\n"); - nnl = false; - return ret; + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + flush(); + int ret = in.readBytes(b, off, len); + if (log != null) log("\n[read " + ret + " bytes of binary data ]\n"); + nnl = false; + return ret; + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } } public int read(char[] c, int off, int len) { - flush(); - int ret = in.read(c, off, len); - if (log != null && ret != -1) log(new String(c, off, ret)); - return ret; + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + flush(); + int ret = in.read(c, off, len); + if (log != null && ret != -1) log(new String(c, off, ret)); + return ret; + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } } public void unread(String s) { in.unread(s); } - public void println() { println(""); } - public void print(String s) { logWrite(s); out.write(s); flush(); } - public void println(String s) { logWrite(s); out.write(s); out.write(newLine); flush(); } - - public void flush() { if (out != null) try { out.w.flush(); } catch(IOException e) { ioe(e); } } public void close() { try { in.close(); } finally { out.close(); } } public void setNewline(String s) { newLine = s; } /** dumps the connection log into a file */ public String dumpLog() { String ret = log.toString(); log = new StringBuffer(16 * 1024); return ret; } - private void log(String s) { if(log==null) return; if (!nnl) Log.note("\n[read ] "); Log.note(s + "\n"); nnl=false; } - private void logWrite(String s) { if(log==null) return; if (nnl) Log.note("\n"); Log.note("[write] "+s+"\n"); nnl=false; } - private void log(char c) { if(log==null) return; if (c == '\r') return; if (!nnl) Log.note("[read ] "); Log.note(c+""); nnl = c != '\n'; } + private void log(String s) { if(log==null) return; if (!nnl) Log.note("\n[read ] "); Log.note(s + "\n"); nnl=false; if (log != null) log.append(s); } + private void logWrite(String s) { if(log==null) return; if (nnl) Log.note("\n"); Log.note("[write] "+s+"\n"); nnl=false; if (log != null) log.append(s); } + private void log(char c) { if(log==null) return; if (c == '\r') return; if (!nnl) Log.note("[read ] "); Log.note(c+""); nnl = c != '\n'; if (log != null) log.append(c); } private boolean nnl = false; private static class Out extends BufferedOutputStream { -- 1.7.10.4