Merge marina project in subdirectory marina/
[fleet.git] / src / edu / berkeley / fleet / dataflow / DoneNode.java
1 package edu.berkeley.fleet.dataflow;
2 import java.util.*;
3 import edu.berkeley.fleet.loops.*;
4 import edu.berkeley.fleet.api.*;
5 import edu.berkeley.fleet.api.Instruction.Set.FlagFunction;
6
7 /**
8  *  This class contains all of the shutdown and cleanup logic.
9  */
10 public class DoneNode extends Node {
11
12     private final Ship    counter = dfg.pool.allocateShip("Counter");
13     private final Program program;
14     private       CodeBag shutdownCodeBag = null;
15
16     public  final InPort  in       = new InPort("in") {
17             private Dock dock = counter.getDock("out");
18             public void recvToken(LoopFactory lf) { }
19             public void sendWord(LoopFactory lf) { lf.sendToken(dock); }
20             public int getInflight() { return 1; }
21             public int reset(CodeBag ctx, int phase, Destination ackDestination) { return 0; }
22             public void build(CodeBag ctx) { build(ctx, ctx.loopFactory(dock, 1)); }
23             protected void build(CodeBag ctx, LoopFactory lf) {
24                 lf.recvToken();
25                 lf.literal(getShutdownCodeBag());
26                 lf.sendWord(program.getCBDDestination());
27             }
28         };
29
30     public DoneNode(DataFlowGraph dfg, Program program) {
31         super(dfg);
32         this.program = program;
33     }
34
35     public Destination getDestinationToSendNextCodeBagDescriptorTo() {
36         return counter.getDock("in1").getDataDestination();
37     }
38
39     private static final int PHASES = 4;
40
41     private CodeBag getShutdownCodeBag() {
42         if (shutdownCodeBag!=null) return shutdownCodeBag;
43
44         CodeBag[] cb = new CodeBag[PHASES];
45         for(int phase=0; phase<PHASES; phase++)
46             cb[phase] = new CodeBag(dfg.fleet, program);
47
48         for(int phase=0; phase<PHASES; phase++) {
49             // ctx_counter sets up the counter ship which will tally
50             // the acknowledgement tokens coming back from the
51             // torpedoed docks.
52             CodeBag ctx_counter = cb[phase];
53
54             // ctx_reset is the context holding the reset code (not including torpedoes) for this phase
55             CodeBag ctx_reset = new CodeBag(dfg.fleet, program);
56
57             int expected_tokens = 0;
58             for(Node node : dfg)
59                 expected_tokens += node.reset(ctx_reset, phase, counter.getDock("in2").getDataDestination());
60             if (expected_tokens==0) throw new RuntimeException("should not happen");
61
62             // The loops below use a count of zero and an
63             // unconditional abort instead of a count of 1.  This is
64             // to ensure that the loop will not start executing until
65             // all instructions are safely in the instruction ring.
66             // That in turn ensures that the "ready" tokens are not
67             // sent until the counter is truly prepared to consume a
68             // deluge of tokens.
69
70             LoopFactory lf;
71             lf = ctx_counter.loopFactory(counter.getDock("inOp"), 0);
72             lf.sendToken(counter.getDock("out")); // ready->
73             lf.literal("DROP_C1_V2");
74             lf.deliver();
75             lf.literal("PASS_C1_V1");
76             lf.deliver();
77             lf.abort();
78
79             lf = ctx_counter.loopFactory(counter.getDock("in1"), 0);
80             lf.sendToken(counter.getDock("out")); // ready->
81             lf.literal(expected_tokens);
82             lf.deliver();
83             lf.literal(1);
84             lf.deliver();
85             if (phase<PHASES-1) lf.literal(cb[phase+1]);
86             else lf.recvWord();
87             lf.deliver();
88             lf.abort();
89
90             lf = ctx_counter.loopFactory(counter.getDock("in2"), 1);
91             lf.setFlags(FlagFunction.ZERO, FlagFunction.ZERO);
92             lf = lf.makeNext(0);
93             lf.setPredicate(Predicate.FlagD);
94             lf.sendToken(counter.getDock("out"));
95             lf.setPredicate(null);
96             lf.setPredicate(Predicate.NotFlagA);
97             lf.sendToken(counter.getDock("out")); // ready->
98             lf.setPredicate(null);
99             lf.setFlags(FlagFunction.ONE, FlagFunction.ZERO);
100             lf.abortLoopIfTorpedoPresent();
101             lf.recvWord();
102             lf.deliver();
103
104             lf = ctx_counter.loopFactory(counter.getDock("out"), 1);
105             lf.setFlags(FlagFunction.ZERO, FlagFunction.ZERO);
106             lf.literal(ctx_reset);
107             lf = lf.makeNext(0);
108             lf.setPredicate(Predicate.NotFlagA);
109             lf.recvToken();
110             lf.recvToken();
111             lf.recvToken();
112             lf.sendWord(program.getCBDDestination());
113             lf.setFlags(FlagFunction.ONE, FlagFunction.ZERO);
114             lf.setPredicate(null);
115             lf.collectWord();
116             lf.sendTorpedo(counter.getDock("in2"));
117             lf.recvToken();
118             lf.sendWord(program.getCBDDestination());
119             lf.abort();
120
121             cb[phase].seal();
122             ctx_reset.seal();
123         }
124         return (shutdownCodeBag = cb[0]);
125     }
126
127     // Reset //////////////////////////////////////////////////////////////////////////////
128
129     static int doReset(CodeBag ctx,
130                        int phase,
131                        Dock dock,
132                        Port self,
133                        Port peer,
134                        Destination ackDestination,
135                        boolean peerUsed) {
136         int ret = 0;
137
138         switch(phase) {
139
140             // Phase 0: torpedo every output dock, put it in
141             // collecting mode.  Cannot combine with phase 1,
142             // because until output docks are in vacuum mode we
143             // cannot be sure that the tokens to the input docks
144             // will eventually succeed.  This may cause the
145             // instructions sent after the tokens to back up into
146             // the switch fabric.
147             case 0: {
148                 if (!dock.isInputDock()) {
149                     ctx.sendTorpedo(dock);
150                     LoopFactory lf = ctx.loopFactory(dock, 1);
151                     lf.sendToken(ackDestination);
152                     lf = lf.makeNext(0);
153                     lf.abortLoopIfTorpedoPresent();
154                     lf.collectWord();
155                     ret++;
156                 }
157                 break;
158             }
159
160                 // [wait for all output docks to confirm that they've been torpedoed]
161                 // Phase 1: torpedo every input dock (causing them to flush), put it in loopback mode
162             case 1: {
163                 if (dock.isInputDock()) {
164                     ctx.sendTorpedo(dock);
165                     LoopFactory lf = ctx.loopFactory(dock, 1);
166                     lf.sendToken(ackDestination);
167
168                     // FIXME: this won't work right for ports that
169                     // get "shared" by two senders (for example,
170                     // inAddrRead1/2)
171
172                     if (peerUsed && peer!=null) {
173                         lf = lf.makeNext(0);
174                         lf.abortLoopIfTorpedoPresent();
175                         ((OutPort)peer).recvWord(lf);
176                         ((OutPort)peer).sendToken(lf);
177                     }
178                     ret++;
179                 }
180                 break;
181             }
182
183                 // [wait for all input docks to confirm that they've been torpedoed and have flushed]
184                 // Phase 2: torpedo every output dock, have it absorb tokens
185             case 2: {
186                 if (!dock.isInputDock()) {
187                     ctx.sendTorpedo(dock);
188                     LoopFactory lf = ctx.loopFactory(dock, 1);
189                     for(int i=0; i<((OutPort)self).getTokensToAbsorb(); i++)
190                         lf.recvToken();
191                     lf.sendToken(ackDestination);
192                     ret++;
193                 }
194                 break;
195             }
196
197                 // [wait for all output docks to confirm that they've absorbed enough tokens]
198                 // Phase 3: torpedo every input dock, await confirmation, and we're done
199             case 3: {
200                 if (dock.isInputDock()) {
201                     if (peerUsed && peer!=null) {
202                         ctx.sendTorpedo(dock);
203                     }
204                     LoopFactory lf = ctx.loopFactory(dock, 1);
205                     lf.sendToken(ackDestination);
206                     ret++;
207                 }
208                 break;
209             }
210         }
211         return ret;
212     }
213 }