added multiple outbound threads
authoradam <adam@megacz.com>
Sat, 4 Feb 2006 10:09:55 +0000 (10:09 +0000)
committeradam <adam@megacz.com>
Sat, 4 Feb 2006 10:09:55 +0000 (10:09 +0000)
darcs-hash:20060204100955-5007d-eedf6905aed9f38beb14022677575e54c72cce5f.gz

src/org/ibex/mail/protocol/SMTP.java

index 18d49db..c3be6a4 100644 (file)
@@ -15,21 +15,29 @@ import java.text.*;
 import javax.naming.*;
 import javax.naming.directory.*;
 
-// FIXME: bounce messages (must go to return-path unless empty, in which case do not send
-// FIXME: if more than 100 "Received" lines, must drop message
+// FIXME: better delivery cycle attempt algorithm; current one sucks
+// FIXME: logging: current logging sucks
+// FIXME: loop prevention
+
+// graylisting?
+
 // FEATURE: infer messageid, date, if not present (?)
+// FEATURE: exponential backoff on retry time?
 // FEATURE: RFC2822, section 4.5.1: special "postmaster" address
 // FEATURE: RFC2822, section 4.5.4.1: retry strategies
 // FEATURE: RFC2822, section 5, multiple MX records, preferences, ordering
-// FEATURE: exponential backoff on retry time?
 // FEATURE: RFC2822, end of 4.1.2: backslashes in headers
 public class SMTP {
 
     public static final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss Z");
+    public static final int numOutgoingThreads = 5;
     private static final Mailbox spool =
         FileBasedMailbox.getFileBasedMailbox(Mailbox.STORAGE_ROOT,false).slash("spool",true).slash("smtp",true);
 
-    static { new Thread() { public void run() { Outgoing.runq(); } }.start(); }
+    static {
+        for(int i=0; i<numOutgoingThreads; i++)
+            new Outgoing().start();
+    }
 
     public static void accept(Message m) throws IOException {
         if (!m.envelopeTo.isLocal()) Outgoing.accept(m);
@@ -65,6 +73,7 @@ public class SMTP {
             String remotehost = null;
             for(String command = conn.readln(); ; command = conn.readln()) try {
                 if (command == null) return;
+                Log.warn(conn.getRemoteAddress()+"", command);
                 String c = command.toUpperCase();
                 if (c.startsWith("HELO"))        {
                     remotehost = c.substring(5).trim();
@@ -72,7 +81,7 @@ public class SMTP {
                     from = null; to = new Vector();
                 } else if (c.startsWith("EHLO")) {
                     remotehost = c.substring(5).trim();
-                    conn.println("250");
+                    conn.println("250 "+conn.vhost+" greets " + remotehost);
                     ehlo = true;     
                     from = null; to = new Vector();
                 } else if (c.startsWith("RSET")) { conn.println("250 reset ok");           from = null; to = new Vector();
@@ -147,19 +156,25 @@ public class SMTP {
 
     // Outgoing Mail Thread //////////////////////////////////////////////////////////////////////////////
 
-    public static class Outgoing {
+    public static class Outgoing extends Thread {
 
         private static final HashSet deadHosts = new HashSet();
         public static void accept(Message m) throws IOException {
             if (m == null) { Log.warn(Outgoing.class, "attempted to accept(null)"); return; }
-            //Log.info(SMTP.class, "queued: " + m.summary());
-            /*
-            if (m.traces.length >= 100)
-                Log.warn(SMTP.Outgoing.class, "Message with " + m.traces.length + " trace hops; dropping\n" + m.summary());
-            */
-            else synchronized(Outgoing.class) {
+            String traces = m.headers.get("Received");
+            if (traces!=null) {
+                int lines = 0;
+                for(int i=0; i<traces.length(); i++)
+                    if (traces.charAt(i)=='\n' || traces.charAt(i)=='\r')
+                        lines++;
+                if (lines > 100) { // required by rfc
+                    Log.warn(SMTP.Outgoing.class, "Message with " + lines + " trace hops; dropping\n" + m.summary());
+                    return;
+                }
+            }
+            synchronized(Outgoing.class) {
                 spool.add(m);
-                Outgoing.class.notify();
+                Outgoing.class.notifyAll();
             }
         }
 
@@ -170,17 +185,15 @@ public class SMTP {
             }
             InetAddress[] mx = getMailExchangerIPs(m.envelopeTo.host);
             if (mx.length == 0) {
-                Log.warn(SMTP.Outgoing.class, "could not resolve " + m.envelopeTo.host + "; bouncing it\n" + m.summary());
                 accept(m.bounce("could not resolve " + m.envelopeTo.host));
                 return true;
             }
             if (new Date().getTime() - m.arrival.getTime() > 1000 * 60 * 60 * 24 * 5) {
-                Log.warn(SMTP.Outgoing.class, "could not send message after 5 days; bouncing it\n" + m.summary());
                 accept(m.bounce("could not send for 5 days"));
                 return true;
             }
             for(int i=0; i<mx.length; i++) {
-                if (deadHosts.contains(mx[i])) continue;
+                //if (deadHosts.contains(mx[i])) continue;
                 if (attempt(m, mx[i])) { return true; }
             }
             return false;
@@ -223,15 +236,12 @@ public class SMTP {
                 Stream stream = head.getStream();
                 for(String s = stream.readln(); s!=null; s=stream.readln()) {
                     if (s.startsWith(".")) conn.print(".");
-                    //Log.warn("***",s);
                     conn.println(s);
                 }
-                //Log.warn("***","");
                 conn.println("");
                 stream = m.getBody().getStream();
                 for(String s = stream.readln(); s!=null; s=stream.readln()) {
                     if (s.startsWith(".")) conn.print(".");
-                    //Log.warn("***",s);
                     conn.println(s);
                 }
                 conn.println(".");
@@ -270,33 +280,54 @@ public class SMTP {
             return accepted;
         }
 
-        static void runq() {
+        private static HashSet<Outgoing> threads = new HashSet<Outgoing>();
+        private static int serials = 1;
+        private int serial = serials++;
+        private Mailbox.Iterator it;
+
+        public Outgoing() {
+            synchronized(Outgoing.class) {
+                threads.add(this);
+            }
+        }
+
+        public void wake() {
+            int count = spool.count(Query.all());
+            Log.info(SMTP.Outgoing.class, "outgoing thread #"+serial+" woke up; " + count + " messages to send");
             try {
-                Log.setThreadAnnotation("[outgoing smtp] ");
-                Log.info(SMTP.Outgoing.class, "outgoing thread started; " + spool.count(Query.all()) + " messages to send");
                 while(true) {
-                    if (Thread.currentThread().isInterrupted()) throw new InterruptedException();
-                    for(Mailbox.Iterator it = spool.iterator(); it.next(); ) {
-                        try {
-                            if (Thread.currentThread().isInterrupted()) throw new InterruptedException();
-                            if (attempt(it.cur())) it.delete();
-                        } catch (Exception e)   {
-                            if (e instanceof InterruptedException) throw e;
-                            Log.error(SMTP.Outgoing.class, e);
-                        }
-                    }
+                    boolean good = false;
                     synchronized(Outgoing.class) {
-                        if (Thread.currentThread().isInterrupted()) throw new InterruptedException();
-                        Log.info(SMTP.Outgoing.class, "outgoing thread going to sleep");
-                        Outgoing.class.wait(5 * 60 * 1000);
-                        deadHosts.clear();
-                        Log.info(SMTP.Outgoing.class,"outgoing thread woke up; "+spool.count(Query.all())+" messages in queue");
+                        it = spool.iterator();
+                        OUTER: for(; it.next(); ) {
+                            for(Outgoing o : threads)
+                                if (o!=this && o.it != null && o.it.uid()==it.uid())
+                                    continue OUTER;
+                            good = true;
+                            break;
+                        }
                     }
+                    if (!good) break;
+                    if (attempt(it.cur())) it.delete();
                 }
             } catch (Exception e) {
-                Log.error(SMTP.Outgoing.class, "outgoing thread killed by exception: " + e);
+                //if (e instanceof InterruptedException) throw e;
                 Log.error(SMTP.Outgoing.class, e);
             }
+            Log.info(SMTP.Outgoing.class, "outgoing thread #"+serial+" going back to sleep");
+            it = null;
+        }
+
+        public void run() {
+            try {
+                while(true) {
+                    Log.setThreadAnnotation("[outgoing #"+serial+"] ");
+                    wake();
+                    synchronized(Outgoing.class) {
+                        Outgoing.class.wait(5 * 60 * 1000);
+                    }
+                }
+            } catch (InterruptedException e) { Log.warn(this, e); }
         }
     }