X-Git-Url: http://git.megacz.com/?p=org.ibex.io.git;a=blobdiff_plain;f=src%2Forg%2Fibex%2Fio%2FStream.java;h=b31c9a6c88d0dcec589968b74fcf38db4224e325;hp=a1bf021786471e11089e18a3483ca6e56e8b7dbe;hb=0a8f7a78d5e62d0353b83c5fb7847d54ca285b3b;hpb=1ec9169c1ca9b270c6b94e0a8499b20eb9f59ba6 diff --git a/src/org/ibex/io/Stream.java b/src/org/ibex/io/Stream.java index a1bf021..b31c9a6 100644 --- a/src/org/ibex/io/Stream.java +++ b/src/org/ibex/io/Stream.java @@ -3,9 +3,9 @@ package org.ibex.io; import java.io.*; import java.net.*; +import java.util.*; import java.util.zip.*; import org.ibex.util.*; -import org.ibex.net.*; /** plays the role of InputStream, OutputStream, Reader and Writer, with logging and unchecked exceptions */ public class Stream { @@ -35,38 +35,98 @@ public class Stream { public static class Closed extends StreamException { public Closed(String s) { super(s); } } - public char peekc() { flush(); return in.getc(true); } - public char getc() { flush(); char ret = in.getc(false); log(ret); return ret; } - public String readln() { flush(); String s = in.readln(); log(s); log('\n'); return s; } + private static Hashtable blocker = new Hashtable(); + public static void kill(Thread thread) { + Stream block = (Stream)blocker.get(thread); + if (block == null) { + Log.warn(Stream.class, "thread " + thread + " is not blocked on a stream"); + } else { + Log.warn(Stream.class, "asynchronously closing " + block); + block.close(); + } + } + + public char peekc() { + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + flush(); return in.getc(true); + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } + } + public char getc() { + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + flush(); char ret = in.getc(false); log(ret); return ret; + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } + } + public String readln() { + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + flush(); String s = in.readln(); log(s); log('\n'); return s; + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } + } + + public void print(String s) { + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + logWrite(s); + out.write(s); + flush(); + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } + } + public void println(String s) { + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + logWrite(s); + out.write(s); + out.write(newLine); + flush(); + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } + } + public void flush() { + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + if (out != null) try { out.w.flush(); } catch(IOException e) { ioe(e); } + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } + } public int read(byte[] b, int off, int len) { - flush(); - int ret = in.readBytes(b, off, len); - if (log != null) log("\n[read " + ret + " bytes of binary data ]\n"); - nnl = false; - return ret; + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + flush(); + int ret = in.readBytes(b, off, len); + if (log != null) log("\n[read " + ret + " bytes of binary data ]\n"); + nnl = false; + return ret; + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } } public int read(char[] c, int off, int len) { - flush(); - int ret = in.read(c, off, len); - if (log != null && ret != -1) log(new String(c, off, ret)); - return ret; + Stream old = (Stream)blocker.get(Thread.currentThread()); + try { + blocker.put(Thread.currentThread(), this); + flush(); + int ret = in.read(c, off, len); + if (log != null && ret != -1) log(new String(c, off, ret)); + return ret; + } finally { if (old == null) blocker.remove(Thread.currentThread()); else blocker.put(Thread.currentThread(), old); } } public void unread(String s) { in.unread(s); } - public void println() { println(""); } - public void println(String s) { logWrite(s); out.write(s); out.write(newLine); } - - public void flush() { if (out != null) try { out.w.flush(); } catch(IOException e) { ioe(e); } } - public void close() { in.close(); out.close(); } + public void close() { try { in.close(); } finally { out.close(); } } public void setNewline(String s) { newLine = s; } /** dumps the connection log into a file */ public String dumpLog() { String ret = log.toString(); log = new StringBuffer(16 * 1024); return ret; } - private void log(String s) { if(log==null) return; if (!nnl) Log.note("\n[read ] "); Log.note(s + "\n"); nnl=false; } - private void logWrite(String s) { if(log==null) return; if (nnl) Log.note("\n"); Log.note("[write] "+s+"\n"); nnl=false; } - private void log(char c) { if(log==null) return; if (c == '\r') return; if (!nnl) Log.note("[read ] "); Log.note(c+""); nnl = c != '\n'; } + private void log(String s) { if(log==null) return; if (!nnl) Log.note("\n[read ] "); Log.note(s + "\n"); nnl=false; if (log != null) log.append(s); } + private void logWrite(String s) { if(log==null) return; if (nnl) Log.note("\n"); Log.note("[write] "+s+"\n"); nnl=false; if (log != null) log.append(s); } + private void log(char c) { if(log==null) return; if (c == '\r') return; if (!nnl) Log.note("[read ] "); Log.note(c+""); nnl = c != '\n'; if (log != null) log.append(c); } private boolean nnl = false; private static class Out extends BufferedOutputStream { @@ -115,7 +175,7 @@ public class Stream { int len = i-cstart; cstart = i+1; if (cbuf[begin] == '\r') { begin++; len--; } - if (len > 0 && cbuf[begin+len-1] == '\r') { len--; } + while (len > 0 && cbuf[begin+len-1] == '\r') { len--; } return new String(cbuf, begin, len); } ensurec(256); @@ -186,15 +246,16 @@ public class Stream { } finally { flushing = false; } } - Writer unreader = new OutputStreamWriter(new OutputStream() { - public void close() { } - public void write(int i) throws IOException { byte[] b = new byte[1]; b[0] = (byte)i; write(b, 0, 1); } - public void write(byte[] b) throws IOException { write(b, 0, b.length); } - public void write(byte[] b, int p, int l) { - ensureb2(l); - System.arraycopy(b, p, buf, start-l, l); - start -= l; - } - }); + Writer unreader = new OutputStreamWriter(new InOutputStream()); + private class InOutputStream extends OutputStream { + public void close() { } + public void write(int i) throws IOException { byte[] b = new byte[1]; b[0] = (byte)i; write(b, 0, 1); } + public void write(byte[] b) throws IOException { write(b, 0, b.length); } + public void write(byte[] b, int p, int l) { + ensureb2(l); + System.arraycopy(b, p, buf, start-l, l); + start -= l; + } + } } }