+ // FEATURE: Make sure fstat info is correct
+ // FEATURE: This could be faster if we did direct copies from each process' memory
+ // FEATURE: Check this logic one more time
+ public static class Pipe {
+ private final byte[] pipebuf = new byte[PIPE_BUF*4];
+ private int readPos;
+ private int writePos;
+
+ public final FD reader = new Reader();
+ public final FD writer = new Writer();
+
+ public class Reader extends FD {
+ protected FStat _fstat() { return new FStat(); }
+ public int read(byte[] buf, int off, int len) throws ErrnoException {
+ if(len == 0) return 0;
+ synchronized(Pipe.this) {
+ while(writePos != -1 && readPos == writePos) {
+ try { Pipe.this.wait(); } catch(InterruptedException e) { /* ignore */ }
+ }
+ if(writePos == -1) return 0; // eof
+ len = Math.min(len,writePos-readPos);
+ System.arraycopy(pipebuf,readPos,buf,off,len);
+ readPos += len;
+ if(readPos == writePos) Pipe.this.notify();
+ return len;
+ }
+ }
+ public void _close() { synchronized(Pipe.this) { readPos = -1; Pipe.this.notify(); } }
+ }
+
+ public class Writer extends FD {
+ protected FStat _fstat() { return new FStat(); }
+ public int write(byte[] buf, int off, int len) throws ErrnoException {
+ if(len == 0) return 0;
+ synchronized(Pipe.this) {
+ if(readPos == -1) throw new ErrnoException(EPIPE);
+ if(pipebuf.length - writePos < Math.min(len,PIPE_BUF)) {
+ // not enough space to atomicly write the data
+ while(readPos != -1 && readPos != writePos) {
+ try { Pipe.this.wait(); } catch(InterruptedException e) { /* ignore */ }
+ }
+ if(readPos == -1) throw new ErrnoException(EPIPE);
+ readPos = writePos = 0;
+ }
+ len = Math.min(len,pipebuf.length - writePos);
+ System.arraycopy(buf,off,pipebuf,writePos,len);
+ if(readPos == writePos) Pipe.this.notify();
+ writePos += len;
+ return len;
+ }
+ }
+ public void _close() { synchronized(Pipe.this) { writePos = -1; Pipe.this.notify(); } }