9ebb15a958ed0ce8fbef54e218848d8486478e31
[org.ibex.mail.git] / src / org / ibex / mail / protocol / SMTP.java
1 package org.ibex.mail.protocol;
2 import org.ibex.mail.*;
3 import org.ibex.mail.target.*;
4 import org.ibex.jinetd.Worker;
5 import org.ibex.util.*;
6 import org.ibex.io.*;
7 import org.ibex.net.*;
8 import java.net.*;
9 import java.io.*;
10 import java.util.*;
11 import java.text.*;
12 import javax.naming.*;
13 import javax.naming.directory.*;
14
15 // FEATURE: exponential backoff on retry time?
16 public class SMTP {
17
18     private static final Mailbox spool =
19         FileBasedMailbox.getFileBasedMailbox(Mailbox.STORAGE_ROOT,false).slash("spool",true).slash("smtp",true);
20
21     static { new Thread() { public void run() { Outgoing.runq(); } }.start(); }
22
23     // Server //////////////////////////////////////////////////////////////////////////////
24
25     public static class Server implements Worker {
26         public void handleRequest(Connection conn) {
27             Log.error(this, "accepted...");
28             conn.setTimeout(5 * 60 * 1000);
29             conn.println("220 " + conn.vhost + " SMTP " + this.getClass().getName());
30             Address from = null;
31             Vector to = new Vector();
32             for(String command = conn.readln(); ; command = conn.readln()) try {
33                 if (command == null) return;
34                 String c = command.toUpperCase();
35                 if (c.startsWith("HELO"))        { conn.println("250 HELO " + conn.vhost); from = null; to = new Vector();
36                 } else if (c.startsWith("EHLO")) { conn.println("250");                    from = null; to = new Vector();
37                 } else if (c.startsWith("RSET")) { conn.println("250 reset ok");           from = null; to = new Vector();
38                 } else if (c.startsWith("HELP")) { conn.println("214 you are beyond help.  see a trained professional.");
39                 } else if (c.startsWith("VRFY")) { conn.println("252 We don't VRFY; proceed anyway");
40                 } else if (c.startsWith("EXPN")) { conn.println("550 EXPN not available");
41                 } else if (c.startsWith("NOOP")) { conn.println("250 OK");
42                 } else if (c.startsWith("QUIT")) { conn.println("221 " + conn.vhost + " closing connection"); return;
43                 } else if (c.startsWith("MAIL FROM:")) {
44                     conn.println("250 " + (from = new Address(command.substring(10).trim())) + " is syntactically correct");
45                 } else if (c.startsWith("RCPT TO:")) {
46                     if (from == null) { conn.println("503 MAIL FROM must precede RCPT TO"); continue; }
47                     command = command.substring(8).trim();
48                     if(command.indexOf(' ') != -1) command = command.substring(0, command.indexOf(' '));
49                     Address addr = new Address(command);
50                     if (addr.isLocal()) conn.println("250 " + addr + " is on this machine; I will deliver it");
51                     else if (conn.getRemoteAddress().isLoopbackAddress())
52                         conn.println("250 you are connected locally, so I will let you send");
53                     else { conn.println("551 sorry, " + addr + " is not on this machine"); }
54                     to.addElement(addr);
55                 } else if (c.startsWith("DATA")) {
56                     if (from == null) { conn.println("503 MAIL FROM command must precede DATA"); continue; }
57                     if (to == null) { conn.println("503 RCPT TO command must precede DATA"); continue; }
58                     conn.println("354 Enter message, ending with \".\" on a line by itself");
59                     conn.flush();
60                     try {
61                         StringBuffer buf = new StringBuffer();
62                         while(true) {
63                             String s = conn.readln();
64                             if (s == null) throw new RuntimeException("connection closed");
65                             if (s.equals(".")) break;
66                             if (s.startsWith(".")) s = s.substring(1);
67                             buf.append(s + "\r\n");
68                         }
69                         String body = buf.toString();
70                         Message m = null;
71                         for(int i=0; i<to.size(); i++) {
72                             m = new Message(from, (Address)to.elementAt(i), new Stream(body));
73                             if (!m.envelopeTo.isLocal()) Outgoing.accept(m);
74                             else                         Target.root.accept(m);
75                         }
76                         if (m != null) Log.info(SMTP.class, "accepted message: " + m.summary());
77                         conn.println("250 message accepted");
78                         conn.flush();
79                         from = null; to = new Vector();
80                     } catch (MailException.Malformed mfe) {   conn.println("501 " + mfe.toString());
81                     } catch (MailException.MailboxFull mbf) { conn.println("452 " + mbf);
82                     } catch (IOException ioe) {               conn.println("554 " + ioe.toString());
83                     }
84                 } else                           { conn.println("500 unrecognized command"); }                    
85             } catch (Message.Malformed e) { conn.println("501 " + e.toString()); /* FIXME could be wrong code */ }
86         }
87     }
88
89
90     // Outgoing Mail Thread //////////////////////////////////////////////////////////////////////////////
91
92     public static class Outgoing {
93
94         private static final HashSet deadHosts = new HashSet();
95         public static void accept(Message m) throws IOException {
96             Log.info(SMTP.class, "queued:\n" + m.summary());
97             if (m.traces.length >= 100)
98                 Log.warn(SMTP.Outgoing.class, "Message with " + m.traces.length + " trace hops; dropping\n" + m.summary());
99             else synchronized(Outgoing.class) {
100                 spool.add(m);
101                 Outgoing.class.notify();
102             }
103         }
104
105         public static boolean attempt(Message m) throws IOException {
106             InetAddress[] mx = getMailExchangerIPs(m.envelopeTo.host);
107             if (mx.length == 0) {
108                 Log.warn(SMTP.Outgoing.class, "could not resolve " + m.envelopeTo.host + "; bouncing it\n" + m.summary());
109                 accept(m.bounce("could not resolve " + m.envelopeTo.host));
110                 return true;
111             }
112             if (new Date().getTime() - m.arrival.getTime() > 1000 * 60 * 60 * 24 * 5) {
113                 Log.warn(SMTP.Outgoing.class, "could not send message after 5 days; bouncing it\n" + m.summary());
114                 accept(m.bounce("could not send for 5 days"));
115                 return true;
116             }
117             for(int i=0; i<mx.length; i++) {
118                 if (deadHosts.contains(mx[i])) continue;
119                 if (attempt(m, mx[i])) { return true; }
120             }
121             return false;
122         }
123
124         private static void check(String s, Connection conn) {
125             while (s.charAt(3) == '-') s = conn.readln();
126             if (s.startsWith("4")||s.startsWith("5")) throw new MailException(s);
127         }
128         private static boolean attempt(final Message m, final InetAddress mx) {
129             boolean accepted = false;
130             Connection conn = null;
131             try {
132                 Log.info(SMTP.Outgoing.class, "connecting to " + mx + "...");
133                 conn = new Connection(new Socket(mx, 25), InetAddress.getLocalHost().getHostName());
134                 conn.setTimeout(60 * 1000);
135                 Log.info(SMTP.Outgoing.class, "connected");
136                 check(conn.readln(), conn);  // banner
137                 conn.println("HELO " + conn.vhost);            check(conn.readln(), conn);
138                 conn.println("MAIL FROM:<" + m.envelopeFrom.user + "@" + m.envelopeFrom.host+">");  check(conn.readln(), conn);
139                 conn.println("RCPT TO:<"   + m.envelopeTo.user + "@" + m.envelopeTo.host+">");      check(conn.readln(), conn);
140                 conn.println("DATA");                          check(conn.readln(), conn);
141                 conn.println(m.toString());
142                 conn.println(".");
143                 check(conn.readln(), conn);
144                 Log.info(SMTP.Outgoing.class, "success: message accepted by " + mx);
145                 accepted = true;
146                 conn.close();
147             } catch (Exception e) {
148                 if (accepted) return true;
149                 Log.warn(SMTP.Outgoing.class, "unable to send; error=" + e);
150                 Log.warn(SMTP.Outgoing.class, e);
151                 return false;
152             } finally {
153                 if (conn != null) conn.close();
154             }
155             return accepted;
156         }
157
158         static void runq() {
159             try {
160                 Log.setThreadAnnotation("[outgoing smtp] ");
161                 Log.info(SMTP.Outgoing.class, "outgoing thread started; " + spool.count(Query.all()) + " messages to send");
162                 while(true) {
163                     for(Mailbox.Iterator it = spool.iterator(); it.next(); ) {
164                         try                   { if (attempt(it.cur())) it.delete(); }
165                         catch (Exception e)   { Log.error(SMTP.Outgoing.class, e); }
166                     }
167                     synchronized(Outgoing.class) {
168                         Log.info(SMTP.Outgoing.class, "outgoing thread going to sleep");
169                         Outgoing.class.wait(5 * 60 * 1000);
170                         deadHosts.clear();
171                         Log.info(SMTP.Outgoing.class,"outgoing thread woke up; "+spool.count(Query.all())+" messages in queue");
172                     }
173                 }
174             } catch (Exception e) { Log.error(SMTP.Outgoing.class, e); }
175         }
176     }
177
178     public static InetAddress[] getMailExchangerIPs(String hostName) {
179         InetAddress[] ret;
180         try {
181             Hashtable env = new Hashtable();
182             env.put("java.naming.factory.initial", "com.sun.jndi.dns.DnsContextFactory");
183             DirContext ictx = new InitialDirContext(env);
184             Attributes attrs = ictx.getAttributes(hostName, new String[] { "MX" });
185             Attribute attr = attrs.get("MX");
186             if (attr == null) {
187                 ret = new InetAddress[1];
188                 try {
189                     ret[0] = InetAddress.getByName(hostName);
190                     return ret;
191                 } catch (UnknownHostException uhe) {
192                     Log.warn(SMTP.class, "no MX hosts or A record for " + hostName);
193                     return new InetAddress[0];
194                 }
195             } else {
196                 ret = new InetAddress[attr.size()];
197                 NamingEnumeration ne = attr.getAll();
198                 for(int i=0; ne.hasMore(); i++) {
199                     String mx = (String)ne.next();
200                     // FIXME we should be sorting here
201                     mx = mx.substring(mx.indexOf(" ") + 1);
202                     if (mx.charAt(mx.length() - 1) == '.') mx = mx.substring(0, mx.length() - 1);
203                     ret[i] = InetAddress.getByName(mx);
204                 }
205             }
206         } catch (Exception e) {
207             Log.warn(SMTP.class, "couldn't find MX host for " + hostName + " due to");
208             Log.warn(SMTP.class, e);
209             return new InetAddress[0];
210         }
211         return ret;
212     }
213 }