update CachedInputStream
authorcrawshaw <crawshaw@ibex.org>
Thu, 6 Jan 2005 16:05:21 +0000 (16:05 +0000)
committercrawshaw <crawshaw@ibex.org>
Thu, 6 Jan 2005 16:05:21 +0000 (16:05 +0000)
darcs-hash:20050106160521-2eb37-18d2965fb67912b78fb9d3ebbed3f9314a12a076.gz

src/org/ibex/util/CachedInputStream.java

index d1affa1..ac3ab97 100644 (file)
 // You may not use this file except in compliance with the License.
 
 package org.ibex.util;
+
 import java.io.*;
 
-// FEATURE: don't use a byte[] if we have a diskCache file
-/**
- *  Wraps around an InputStream, caching the stream in a byte[] as it
- *  is read and permitting multiple simultaneous readers
+/** Wraps around an InputStream, caching the stream in a byte[] as it
+ *  is read and permitting multiple simultaneous readers.
+ *
+ *  @author adam@ibex.org, crawshaw@ibex.org
  */
 public class CachedInputStream {
 
-    boolean filling = false;               ///< true iff some thread is blocked on us waiting for input
-    boolean eof = false;                   ///< true iff end of stream has been reached
-    byte[] cache = new byte[1024 * 128];
-    int size = 0;
-    final InputStream is;
-    File diskCache;
+    private final InputStream is;
+
+    private byte[] cache = new byte[128];
+    private File cacheFile;
+    private FileOutputStream out;
+
+    /** True iff end of stream has been reached. */
+    private boolean eof = false;
 
-    public CachedInputStream(InputStream is) { this(is, null); }
-    public CachedInputStream(InputStream is, File diskCache) {
+    /** Number of bytes currently cached. */
+    private long size = 0;
+
+    public CachedInputStream(InputStream is) {
         this.is = is;
-        this.diskCache = diskCache;
+        this.cacheFile = null;
+        this.out = null;
     }
-    public InputStream getInputStream() throws IOException {
-        if (diskCache != null && diskCache.exists()) return new FileInputStream(diskCache);
-        return new SubStream();
+    public CachedInputStream(InputStream is, File cacheFile) throws IOException {
+        this.is = is;
+        this.cacheFile = cacheFile;
+        this.out = new FileOutputStream(cacheFile);
     }
 
-    public void grow(int newLength) {
-        if (newLength < cache.length) return;
-        byte[] newCache = new byte[cache.length + 2 * (newLength - cache.length)];
-        System.arraycopy(cache, 0, newCache, 0, size);
-        cache = newCache;
-    }
+    public InputStream getInputStream() throws IOException { return new SubStream(); }
+
+    private synchronized int fill(int need) throws IOException {
+        int ret;
 
-    synchronized void fillCache(int howMuch) throws IOException {
-        if (filling) { try { wait(); } catch (InterruptedException e) { }; return; }
-        filling = true;
-        grow(size + howMuch);
-        int ret = is.read(cache, size, howMuch);
-        if (ret == -1) {
-            eof = true;
-            // FIXME: probably a race here
-            if (diskCache != null && !diskCache.exists())
-                try {
-                    File cacheFile = new File(diskCache + ".incomplete");
-                    FileOutputStream cacheFileStream = new FileOutputStream(cacheFile);
-                    cacheFileStream.write(cache, 0, size);
-                    cacheFileStream.close();
-                    cacheFile.renameTo(diskCache);
-                } catch (IOException e) {
-                    Log.info(this, "exception thrown while writing disk cache");
-                    Log.info(this, e);
-                }
+        if (out == null) {
+            if (size + need > cache.length) {
+                byte[] newCache = new byte[Math.max(cache.length * 2, (int)(size + need))];
+                System.arraycopy(cache, 0, newCache, 0, (int)size);
+                cache = newCache;
+            }
+            ret = is.read(cache, (int)size, need);
+        } else {
+            ret = is.read(cache, 0, Math.min(cache.length, need));
+            if (ret > 0) out.write(cache, 0, ret);
         }
+
+        if (ret == -1) { eof = true; if (out != null) { out.close(); out = null; } }
         else size += ret;
-        filling = false;
-        notifyAll();
+
+        return ret;
     }
 
-    private class SubStream extends InputStream implements KnownLength {
-        int pos = 0;
-        public int available() { return Math.max(0, size - pos); }
-        public long skip(long n) throws IOException { pos += (int)n; return n; }     // FEATURE: don't skip past EOF
-        public int getLength() { return eof ? size : is instanceof KnownLength ? ((KnownLength)is).getLength() : 0; }
-        public int read() throws IOException {                                       // FEATURE: be smarter here
-            byte[] b = new byte[1];
-            int ret = read(b, 0, 1);
-            return ret == -1 ? -1 : b[0]&0xff;
+    private final class SubStream extends InputStream {
+        private final InputStream fis;
+        private long pos = 0;
+        
+        private SubStream() throws IOException {
+            fis = cacheFile != null ? new FileInputStream(cacheFile) : null;
         }
+
+        public int available() { synchronized(CachedInputStream.this) { return (int)(size - pos); } }
+        public long skip(long n) throws IOException {
+            synchronized(CachedInputStream.this) {
+                int need = (int)(pos + n - size);
+                while (need > 0 && !eof) need -= fill(need);
+                n -= need;
+                if (fis != null) { long fisn = n; while(fisn > 0) fisn -= fis.skip(fisn); }
+                pos += n;
+                return n;
+            }
+        }
+        public long length() { synchronized(CachedInputStream.this) {
+                return eof ? size : is instanceof SubStream ? ((SubStream)is).length() : 0;
+        } }
+
+        private final byte[] single = new byte[1];
+        public int read() throws IOException {
+            int ret = read(single, 0, 1);
+            return ret == -1 ? -1 : single[0]&0xff;
+        }
+
         public int read(byte[] b, int off, int len) throws IOException {
             synchronized(CachedInputStream.this) {
-                while (pos >= size && !eof) fillCache(pos + len - size);
-                if (eof && pos == size) return -1;
-                int count = Math.min(size - pos, len);
-                System.arraycopy(cache, pos, b, off, count);
-                pos += count;
-                return count;
+                int i = (int)(pos - size) + len; while (i > 0 && !eof) i -= (int)fill(i);
+                if (i > 0) len -= i;
+
+                if (fis != null) {
+                    int fislen = len;
+                    while (fislen > 0) {
+                        int j = fis.read(b, off, fislen);
+                        if (j == -1) throw new IOException("less data retrieved from disk than written to it");
+                        off += j; fislen -= j;
+                    }
+                } else System.arraycopy(cache, (int)pos, b, off, len);
+
+                pos += len;
+                return len;
             }
         }
     }