1 package org.ibex.mail.protocol;
2 import org.ibex.mail.*;
3 import org.ibex.mail.target.*;
4 import org.ibex.util.*;
10 import javax.naming.directory.*;
12 public class SMTP extends MessageProtocol {
14 static { new Thread() { public void run() { Outgoing.runq(); } }.start(); }
15 public static String convdir = null;
16 public static void main(String[] s) throws Exception {
17 String logto = System.getProperty("ibex.mail.root", File.separatorChar + "var" + File.separatorChar + "org.ibex.mail");
18 logto += File.separatorChar + "log";
20 convdir = System.getProperty("ibex.mail.root", File.separatorChar + "var" + File.separatorChar + "org.ibex.mail");
21 convdir += File.separatorChar + "conversation";
22 new File(convdir).mkdirs();
23 final SMTP smtp = new SMTP();
24 int port = Integer.parseInt(System.getProperty("ibex.mail.port", "25"));
25 Log.info(SMTP.class, "binding to port " + port + "...");
26 ServerSocket ss = new ServerSocket(port);
27 Log.info(SMTP.class, "listening for connections...");
29 final Socket sock = ss.accept();
30 new Thread() { public void run() { smtp.handle(sock); } }.start();
34 //public SMTP() { setProtocolName("SMTP"); }
35 //public ServerRequest createRequest(Connection conn) { return new Listener((TcpConnection)conn); }
37 public void handle(Socket s) { new Listener(s, "megacz.com").handleRequest(); }
39 // FEATURE: exponential backoff on retry time?
40 public static class Outgoing {
41 private static final HashSet deadHosts = new HashSet();
42 private static final org.ibex.util.Queue queue = new org.ibex.util.Queue(100);
44 public static void send(Message m) throws IOException {
45 if (m.traces.length >= 100) {
46 Log.warn(SMTP.Outgoing.class,
47 "Message with " + m.traces.length + " trace hops; silently dropping\n" + m.summary());
50 synchronized(Outgoing.class) {
51 Mailbox.root.slash("outgoing").add(m);
53 Outgoing.class.notify();
57 // FIXME!!! ignores more than one destination envelope!!!!
58 private static boolean attempt(Message m) throws IOException {
59 InetAddress[] mx = getMailExchangerIPs(m.envelopeTo[0].host);
61 Log.warn(SMTP.Outgoing.class, "could not resolve " + m.envelopeTo[0].host + "; bouncing it\n" + m.summary());
62 send(m.bounce("could not resolve " + m.envelopeTo[0].host));
65 if (new Date().getTime() - m.arrival.getTime() > 1000 * 60 * 60 * 24 * 5) {
66 Log.warn(SMTP.Outgoing.class, "could not send message after 5 days; bouncing it\n" + m.summary());
67 send(m.bounce("could not send for 5 days"));
70 for(int i=0; i<mx.length; i++) {
71 if (deadHosts.contains(mx[i])) continue;
72 if (attempt(m, mx[i])) { return true; }
77 private static void check(String s) throws IOException {
78 if (s.startsWith("4") || s.startsWith("5")) throw new IOException("SMTP Error: " + s); }
79 private static boolean attempt(Message m, InetAddress mx) {
81 String vhost = InetAddress.getLocalHost().getHostName();
82 String cid = getConversation();
83 PrintWriter logf = new PrintWriter(new OutputStreamWriter(new FileOutputStream(convdir+File.separatorChar+cid)));
84 Log.setThreadAnnotation("[outgoing smtp: " + mx + " / " + cid + "] ");
85 Log.info(SMTP.Outgoing.class, "connecting...");
86 Socket s = new Socket(mx, 25);
87 Log.info(SMTP.Outgoing.class, "connected");
88 LineReader r = new LoggedLineReader(new InputStreamReader(s.getInputStream()), logf);
89 PrintWriter w = new LoggedPrintWriter(new OutputStreamWriter(s.getOutputStream()), logf);
90 check(r.readLine()); // banner
91 w.print("HELO " + vhost + "\r\n"); check(r.readLine());
92 w.print("MAIL FROM: " + m.envelopeFrom + "\r\n"); check(r.readLine());
93 w.print("RCPT TO: " + m.envelopeTo + "\r\n"); check(r.readLine());
94 w.print("DATA\r\n"); check(r.readLine());
98 Log.info(SMTP.Outgoing.class, "message accepted by " + mx);
99 Mailbox.root.slash("outgoing").delete(m);
102 } catch (Exception e) {
103 Log.warn(SMTP.Outgoing.class, "unable to send; error=" + e);
104 Log.warn(SMTP.Outgoing.class, e);
107 Log.setThreadAnnotation("[outgoing smtp] ");
113 Log.setThreadAnnotation("[outgoing smtp] ");
114 int[] outgoing = Mailbox.root.slash("outgoing").list();
115 Log.info(SMTP.Outgoing.class, "outgoing thread started; " + outgoing.length + " messages to send");
116 for(int i=0; i<outgoing.length; i++) queue.append(Mailbox.root.slash("outgoing").get(outgoing[i]));
118 int num = queue.size();
119 for(int i=0; i<num; i++) {
120 Message next = (Message)queue.remove(true);
121 boolean good = false;
123 good = attempt(next);
124 } catch (IOException e) {
125 Log.error(SMTP.Outgoing.class, e);
127 if (!good) queue.append(next);
130 synchronized(Outgoing.class) {
131 Log.info(SMTP.Outgoing.class, "outgoing thread going to sleep");
132 Outgoing.class.wait(10 * 60 * 1000);
134 Log.info(SMTP.Outgoing.class, "outgoing thread woke up; " + queue.size() + " messages in queue");
137 } catch (Exception e) {
138 Log.error(SMTP.Outgoing.class, e);
143 private static String lastTime = null;
144 private static int lastCounter = 0;
146 private class Listener extends Incoming /*implements ServerRequest*/ {
149 public void init() { }
150 public Listener(Socket conn, String vhost) { this.vhost = vhost; this.conn = conn; }
152 //TcpConnection conn;
153 //public Listener(TcpConnection conn) { this.conn = conn; conn.getSocket().setSoTimeout(5 * 60 * 1000); }
154 public boolean handleRequest() {
156 conn.setSoTimeout(5 * 60 * 1000);
157 StringBuffer logMessage = new StringBuffer();
158 String cid = getConversation();
159 Log.setThreadAnnotation("[conversation " + cid + "] ");
160 InetSocketAddress remote = (InetSocketAddress)conn.getRemoteSocketAddress();
161 Log.info(this, "connection from "+remote.getHostName()+":"+remote.getPort()+" ("+remote.getAddress()+")");
162 PrintWriter logf = new PrintWriter(new OutputStreamWriter(new FileOutputStream(convdir+File.separatorChar+cid)));
164 return handleRequest(new LoggedLineReader(new InputStreamReader(conn.getInputStream()), logf),
165 new LoggedPrintWriter(new OutputStreamWriter(conn.getOutputStream()), logf));
166 } catch(Throwable t) { Log.warn(this, t);
167 } finally { logf.close(); Log.setThreadAnnotation("");
169 } catch (Exception e) { Log.error(this, e); }
173 public boolean handleRequest(LineReader rs, PrintWriter ws) throws IOException, MailException {
174 //ReadStream rs = conn.getReadStream();
175 //WriteStream ws = conn.getWriteStream();
176 //ws.setNewLineString("\r\n");
177 ws.println("220 " + vhost + " ESMTP " + this.getClass().getName());
179 Vector to = new Vector();
181 String command = rs.readLine();
182 String c = command.toUpperCase();
183 if (c.startsWith("HELO")) {
184 ws.println("250 HELO " + vhost);
188 } else if (c.startsWith("EHLO")) {
189 ws.println("250-" + vhost);
190 ws.println("250-SIZE");
191 ws.println("250 PIPELINING");
195 } else if (c.startsWith("RSET")) {
198 ws.println("250 reset ok");
200 } else if (c.startsWith("MAIL FROM:")) {
201 command = command.substring(10).trim();
202 from = new Address(command);
203 ws.println("250 " + from + " is syntactically correct");
205 } else if (c.startsWith("RCPT TO:")) {
207 ws.println("503 MAIL FROM must precede RCPT TO");
210 command = command.substring(9).trim();
211 if(command.indexOf(' ') != -1) command = command.substring(0, command.indexOf(' '));
212 Address addr = new Address(command);
213 InetAddress[] mx = getMailExchangerIPs(addr.host);
215 if (((InetSocketAddress)conn.getRemoteSocketAddress()).getAddress().isLoopbackAddress()) {
216 ws.println("250 you are connected locally, so I will let you send");
218 boolean good = false;
219 for(int i=0; !good && i<mx.length; i++)
220 if (NetworkInterface.getByInetAddress(mx[i]) != null)
223 ws.println("551 sorry, " + addr + " is not on this machine");
226 ws.println("250 " + addr + " is on this machine; I will deliver it");
229 } else if (c.startsWith("DATA")) {
230 if (from == null) { ws.println("503 MAIL FROM command must precede DATA"); continue; }
231 if (to == null) { ws.println("503 RCPT TO command must precede DATA"); continue; }
232 ws.println("354 Enter message, ending with \".\" on a line by itself");
234 Address[] toArr = new Address[to.size()];
236 accept(new Message(from, toArr, new DotTerminatedLineReader(rs)));
237 ws.println("250 message accepted");
238 } catch (MailException.Malformed mfe) { ws.println("501 " + mfe.toString());
239 } catch (MailException.MailboxFull mbf) { ws.println("452 " + mbf);
240 } catch (IOException ioe) { ws.println("554 " + ioe.toString());
244 } else if (c.startsWith("HELP")) { ws.println("214 you are beyond help. see a trained professional.");
245 } else if (c.startsWith("VRFY")) { ws.println("252 We don't VRFY; proceed anyway");
246 } else if (c.startsWith("EXPN")) { ws.println("550 EXPN not available");
247 } else if (c.startsWith("NOOP")) { ws.println("250 OK");
248 } else if (c.startsWith("QUIT")) { ws.println("221 " + vhost + " closing connection"); break;
249 } else { ws.println("500 unrecognized command");
253 return false; // always tell resin to close the connection
257 private static class DotTerminatedLineReader extends LineReader {
258 private final LineReader r;
259 private boolean done = false;
260 public DotTerminatedLineReader(LineReader r) { super(null); this.r = r; }
261 public String readLine() throws IOException {
262 if (done) return null;
263 String s = r.readLine();
264 if (s.equals(".")) { done = true; return null; }
265 if (s.startsWith(".")) return s.substring(1);
270 public static InetAddress[] getMailExchangerIPs(String hostName) {
273 Hashtable env = new Hashtable();
274 env.put("java.naming.factory.initial", "com.sun.jndi.dns.DnsContextFactory");
275 DirContext ictx = new InitialDirContext(env);
276 Attributes attrs = ictx.getAttributes(hostName, new String[] { "MX" });
277 Attribute attr = attrs.get("MX");
279 ret = new InetAddress[1];
281 ret[0] = InetAddress.getByName(hostName);
283 } catch (UnknownHostException uhe) {
284 Log.warn(SMTP.class, "no MX hosts or A record for " + hostName);
285 return new InetAddress[0];
288 ret = new InetAddress[attr.size()];
289 NamingEnumeration ne = attr.getAll();
290 for(int i=0; ne.hasMore(); i++) {
291 String mx = (String)ne.next();
292 // FIXME we should be sorting here
293 mx = mx.substring(mx.indexOf(" ") + 1);
294 if (mx.charAt(mx.length() - 1) == '.') mx = mx.substring(0, mx.length() - 1);
295 ret[i] = InetAddress.getByName(mx);
298 } catch (Exception e) {
299 Log.warn(SMTP.class, "couldn't find MX host for " + hostName + " due to");
300 Log.warn(SMTP.class, e);
301 return new InetAddress[0];
306 private static class LoggedLineReader extends LineReader {
308 public LoggedLineReader(Reader r, PrintWriter log) { super(r); this.log = log; }
309 public String readLine() throws IOException {
310 String s = super.readLine();
311 if (s != null) { log.println("C: " + s); log.flush(); }
316 private static class LoggedPrintWriter extends PrintWriter {
318 public LoggedPrintWriter(Writer w, PrintWriter log) { super(w); this.log = log; }
319 public void println(String s) {
320 log.println("S: " + s);
326 static String getConversation() {
327 String time = new SimpleDateFormat("yy.MMM.dd-hh:mm:ss").format(new Date());
328 synchronized (SMTP.class) {
329 if (lastTime != null && lastTime.equals(time)) {
330 time += "." + (++lastCounter);