outgoing smtp
[org.ibex.mail.git] / src / org / ibex / mail / protocol / SMTP.java
1 package org.ibex.mail.protocol;
2 import org.ibex.mail.*;
3 import org.ibex.mail.target.*;
4 import org.ibex.util.*;
5 import java.net.*;
6 import java.io.*;
7 import java.util.*;
8 import java.text.*;
9 import javax.naming.*;
10 import javax.naming.directory.*;
11
12 public class SMTP extends MessageProtocol {
13
14     static { new Thread() { public void run() { Outgoing.runq(); } }.start(); }
15     public static String convdir = null;
16     public static void main(String[] s) throws Exception {
17         String logto = System.getProperty("ibex.mail.root", File.separatorChar + "var" + File.separatorChar + "org.ibex.mail");
18         logto += File.separatorChar + "log";
19         Log.file(logto);
20         convdir = System.getProperty("ibex.mail.root", File.separatorChar + "var" + File.separatorChar + "org.ibex.mail");
21         convdir += File.separatorChar + "conversation";
22         new File(convdir).mkdirs();
23         SMTP smtp = new SMTP();
24         int port = Integer.parseInt(System.getProperty("ibex.mail.port", "25"));
25         Log.info(SMTP.class, "binding to port " + port + "...");
26         ServerSocket ss = new ServerSocket(port);
27         Log.info(SMTP.class, "listening for connections...");
28         while(true) {
29             final Socket sock = ss.accept();
30             new Thread() { public void run() { smtp.handle(sock); } }.start();
31         }
32     }
33
34     //public SMTP() { setProtocolName("SMTP"); }
35     //public ServerRequest createRequest(Connection conn) { return new Listener((TcpConnection)conn); }
36
37     public void handle(Socket s) { new Listener(s, "megacz.com").handleRequest(); }
38
39     //  FEATURE: exponential backoff on retry time?
40     public static class Outgoing {
41         private static final HashSet deadHosts = new HashSet();
42         private static final org.ibex.util.Queue queue = new org.ibex.util.Queue(100);
43         private static FileSystem store = FileSystem.root.slash("outgoing");
44
45         public static void send(Message m) {
46             if (m.traces.length >= 100) {
47                 Log.warn("Message with " + m.traces.length + " trace hops; silently dropping\n" + m.summary());
48                 return;
49             }
50             synchronized(Outgoing.class) {
51                 store.add(m);
52                 queue.append(m);
53                 Outgoing.class.notify();
54             }
55         }
56
57         private static boolean attempt(Message m) {
58             InetAddress[] mx = getMailExchangerIPs(m.envelopeTo.host);
59             if (mx.length == 0) {
60                 Log.warn("could not resolve " + m.envelopeTo.host + "; bouncing it\n" + m.summary());
61                 send(m.bounce("could not resolve " + m.envelopeTo.host));
62                 return true;
63             }
64             if (new Date().getTime() - m.arrival.getTime() > 1000 * 60 * 60 * 24 * 5) {
65                 Log.warn("could not send message after 5 days; bouncing it\n" + m.summary());
66                 send(m.bounce("could not send for 5 days"));
67                 return true;
68             }
69             for(int i=0; i<mx.length; i++) {
70                 if (deadHosts.contains(mx[i])) continue;
71                 if (attempt(m, mx[i])) { queue.remove(m); return true; }
72             }
73             return false;
74         }
75
76         private static boolean attempt(Message m, InetAddress mx) {
77             try {
78                 String conversationId = getConversation();
79                 PrintWriter logf = new PrintWriter(new OutputStreamWriter(new FileOutputStream(convdir+File.separatorChar+cid)));
80                 Log.setThreadAnnotation("[outgoing smtp: " + mx + " / " + cid + "] ");
81                 Log.info(SMTP.Outgoing.class, "connecting...");
82                 Socket s = new Socket(mx, 25);
83                 Log.info(SMTP.Outgoing.class, "connected");
84                 LineReader  r = new LoggedLineReader(new InputStreamReader(conn.getInputStream()), logf);
85                 PrintWriter w = new LoggedPrintWriter(new OutputStreamWriter(conn.getOutputStream()), logf);
86                                                                   check(r.readLine());  // banner
87                 w.print("HELO " + vhost + "\r\n");                check(r.readLine());
88                 w.print("MAIL FROM: " + m.envelopeFrom + "\r\n"); check(r.readLine());
89                 w.print("RCPT TO: " + m.envelopeTo + "\r\n");     check(r.readLine());
90                 w.print("DATA\r\n");                              check(r.readLine());
91                 w.print(m.body);
92                 w.print(".\r\n");
93                 check(r.readLine());
94                 Log.info(SMTP.Outgoing.class, "message accepted by " + mx);
95                 m.delete();
96                 s.close();
97                 return true;
98             } catch (Exception e) {
99                 Log.warn(SMTP.Outgoing.class, "unable to send; error=" + e);
100                 Log.warn(SMTP.Outgoing.class, e);
101                 return false;
102             } finally {
103                 Log.setThreadAnnotation("[outgoing smtp] ");
104             }
105         }
106
107         static void runq() {
108             Log.setThreadAnnotation("[outgoing smtp] ");
109             int[] outgoing = store.list();
110             Log.info("outgoing thread started; " + outgoing.length + " messages to send");
111             for(int i=0; i<outgoing.length; i++) queue.append(store.get(outgoing[i]));
112             while(true) {
113                 int num = queue.size();
114                 for(int i=0; i<num; i++) {
115                     Message next = queue.remove(true);
116                     if (!attempt(next)) queue.append(next);
117                 }
118                 synchronized(Outgoing.class) {
119                     Log.info("outgoing thread going to sleep");
120                     Outgoing.class.wait(10 * 60 * 1000);
121                     deadHosts.clear();
122                     Log.info("outgoing thread woke up; " + queue.size() + " messages in queue");
123                 }
124             }
125         }
126     }
127
128     private static String lastTime = null;
129     private static int lastCounter = 0;
130
131     private class Listener extends Incoming /*implements ServerRequest*/ {
132         Socket conn;
133         String vhost;
134         public void init() { }
135         public Listener(Socket conn, String vhost) { this.vhost = vhost; this.conn = conn; }
136
137         //TcpConnection conn;
138         //public Listener(TcpConnection conn) { this.conn = conn; conn.getSocket().setSoTimeout(5 * 60 * 1000); }
139         public boolean handleRequest() {
140             try {
141                 conn.setSoTimeout(5 * 60 * 1000);
142                 StringBuffer logMessage = new StringBuffer();
143                 String conversationId = getConversation();
144                 Log.setThreadAnnotation("[conversation/" + conversationId + "] ");
145                 InetSocketAddress remote = (InetSocketAddress)conn.getRemoteSocketAddress();
146                 Log.info(this, "connection from " + remote.getHostName() + ":" + remote.getPort() +
147                          " (" + remote.getAddress() + ")");
148                 PrintWriter logf =
149                     new PrintWriter(new OutputStreamWriter(new FileOutputStream(convdir + File.separatorChar + conversationId)));
150                 try {
151                     return handleRequest(new LoggedLineReader(new InputStreamReader(conn.getInputStream()), logf),
152                                          new LoggedPrintWriter(new OutputStreamWriter(conn.getOutputStream()), logf));
153                 } catch(Throwable t) {
154                     Log.warn(this, t);
155                 } finally {
156                     logf.close();
157                     Log.setThreadAnnotation("");
158                 }
159             } catch (Exception e) {
160                 Log.error(this, e);
161             }
162             return false;
163         }
164         
165         static String getConversation() {
166             String time = new SimpleDateFormat("yy.MMM.dd-hh:mm:ss").format(new Date());
167             synchronized (SMTP.class) {
168                 if (lastTime != null && lastTime.equals(time)) {
169                     time += "." + (++lastCounter);
170                 } else {
171                     lastTime = time;
172                 }
173             }
174             return time;
175         }
176
177         private class LoggedLineReader extends LineReader {
178             PrintWriter log;
179             public LoggedLineReader(Reader r, PrintWriter log) { super(r); this.log = log; }
180             public String readLine() throws IOException {
181                 String s = super.readLine();
182                 if (s != null) { log.println("C: " + s); log.flush(); }
183                 return s;
184             }
185         }
186
187         private class LoggedPrintWriter extends PrintWriter {
188             PrintWriter log;
189             public LoggedPrintWriter(Writer w, PrintWriter log) { super(w); this.log = log; }
190             public void println(String s) {
191                 log.println("S: " + s);
192                 super.println(s);
193                 flush();
194             }
195         }
196
197         public boolean handleRequest(LineReader rs, PrintWriter ws) throws IOException, MailException {
198             //ReadStream rs = conn.getReadStream();
199             //WriteStream ws = conn.getWriteStream();
200             //ws.setNewLineString("\r\n");
201             ws.println("220 " + vhost + " ESMTP " + this.getClass().getName());
202             Address from = null;
203             Vector to = new Vector();
204             while(true) {
205                 String command = rs.readLine();
206                 String c = command.toUpperCase();
207                 if (c.startsWith("HELO")) {
208                     ws.println("250 HELO " + vhost);
209                     from = null;
210                     to = new Vector();
211
212                 } else if (c.startsWith("EHLO")) {
213                     ws.println("250-" + vhost);
214                     ws.println("250-SIZE");
215                     ws.println("250 PIPELINING");
216                     from = null;
217                     to = new Vector();
218
219                 } else if (c.startsWith("RSET")) {
220                     from = null;
221                     to = new Vector();
222                     ws.println("250 reset ok");
223
224                 } else if (c.startsWith("MAIL FROM:")) {
225                     command = command.substring(10).trim();
226                     from = new Address(command);
227                     ws.println("250 " + from + " is syntactically correct");
228
229                 } else if (c.startsWith("RCPT TO:")) {
230                     if (from == null) {
231                         ws.println("503 MAIL FROM must precede RCPT TO");
232                         continue;
233                     }
234                     command = command.substring(9).trim();
235                     if(command.indexOf(' ') != -1) command = command.substring(0, command.indexOf(' '));
236                     Address addr = new Address(command);
237                     // FIXME: 551 = no, i won't forward that
238                     to.addElement(addr);
239                     ws.println("250 " + addr + " is syntactically correct");
240
241                 } else if (c.startsWith("DATA")) {
242                     if (from == null) { ws.println("503 MAIL FROM command must precede DATA"); continue; }
243                     if (to == null) { ws.println("503 RCPT TO command must precede DATA"); continue; }
244                     ws.println("354 Enter message, ending with \".\" on a line by itself");
245                     StringBuffer data = new StringBuffer();
246                     // move this into the Message class
247                     boolean good = false;
248                     try {
249                         accept(new Message(new DotTerminatedLineReader(rs)));
250                     } catch (MailException.Malformed mfe) {
251                         ws.println("501 " + mfe.toString()); break;
252                     } catch (MailException.MailboxFull mbf) {
253                         ws.println("452 " + mbf);
254                     } catch (IOException ioe) {
255                         ws.println("554 " + ioe.toString()); break;
256                     }
257                     ws.println("250 message accepted"); break;
258                     
259                 } else if (c.startsWith("HELP")) { ws.println("214 you are beyond help.  see a trained professional.");
260                 } else if (c.startsWith("VRFY")) { ws.println("252 We don't VRFY; proceed anyway");
261                 } else if (c.startsWith("EXPN")) { ws.println("550 EXPN not available");
262                 } else if (c.startsWith("NOOP")) { ws.println("250 OK");
263                 } else if (c.startsWith("QUIT")) { ws.println("221 " + vhost + " closing connection"); break;
264                 } else                           { ws.println("500 unrecognized command");
265                 }                    
266             
267             }
268             return false; // always tell resin to close the connection
269         }
270     }
271
272     private static class DotTerminatedLineReader extends LineReader {
273         private final LineReader r;
274         private boolean done = false;
275         public DotTerminatedLineReader(LineReader r) { super(null); this.r = r; }
276         public String readLine() throws IOException {
277             if (done) return null;
278             String s = r.readLine();
279             if (s.equals(".")) { done = true; return null; }
280             if (s.startsWith(".")) return s.substring(1);
281             return s;
282         }
283     }
284
285     public InetAddress[] getMailExchangerIPs(String domainName) {
286         InetAddress ret;
287         try {
288             Hashtable env = new Hashtable();
289             env.put("java.naming.factory.initial", "com.sun.jndi.dns.DnsContextFactory");
290             DirContext ictx = new InitialDirContext(env);
291             Attributes attrs = ictx.getAttributes(hostName, new String[] { "MX" });
292             Attribute attr = attrs.get("MX");
293             if (attr == null) {
294                 ret = new InetAddress[1];
295                 try {
296                     ret[0] = InetAddress.getByName(domainName);
297                     return ret;
298                 } catch (UnknownHostException uhe) {
299                     Log.warn(SMTP.class, "no MX hosts or A record for " + domainName);
300                     return new InetAddress[0];
301                 }
302             } else {
303                 ret = new InetAddress[attr.size()];
304                 NamingEnumeration ne = attr.getAll();
305                 for(int i=0; ne.hasMore(); i++) ret[i] = (InetAddress)ne.next();
306             }
307         } catch (Exception e) {
308             Log.warn(SMTP.class, "couldn't find MX host for " + domainName + " due to");
309             Log.warn(SMTP.class, e);
310             return new InetAddress[0];
311         }
312         return ret;
313     }
314
315
316 }