1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 package org.bitrepository.client.conversation;
26
27 import java.util.Collection;
28 import java.util.Timer;
29 import java.util.TimerTask;
30
31 import org.bitrepository.bitrepositorymessages.Message;
32 import org.bitrepository.bitrepositorymessages.MessageRequest;
33 import org.bitrepository.bitrepositorymessages.MessageResponse;
34 import org.bitrepository.client.conversation.selector.ContributorResponseStatus;
35 import org.bitrepository.client.exceptions.UnexpectedResponseException;
36 import org.bitrepository.common.exceptions.UnableToFinishException;
37 import org.bitrepository.protocol.ProtocolVersionLoader;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41
42
43
44
45 public abstract class GeneralConversationState implements ConversationState {
46 private final Logger log = LoggerFactory.getLogger(getClass());
47
48 private static final Boolean TIMER_IS_DAEMON = true;
49
50 private final Timer timer;
51
52 private final ContributorResponseStatus responseStatus;
53
54
55
56
57
58
59 protected GeneralConversationState(Collection<String> expectedContributors) {
60 responseStatus = new ContributorResponseStatus(expectedContributors);
61 timer = new Timer(getClass().getSimpleName() + " conversation state " + "timer",
62 TIMER_IS_DAEMON);
63 }
64
65
66
67
68
69
70
71 public void start() {
72 if (!responseStatus.getOutstandComponents().isEmpty()) {
73 timer.schedule(new StateTimerTask(), getTimeoutValue());
74 sendRequest();
75 } else {
76
77 changeState();
78 }
79 }
80
81
82
83
84
85
86
87 public final void handleMessage(Message message) {
88 if (!(message instanceof MessageResponse)) {
89 getContext().getMonitor().warning("Unable to handle none-response type message " + message);
90 return;
91 }
92 MessageResponse response = (MessageResponse)message;
93
94 if (!canHandleResponseType(response)) {
95 getContext().getMonitor().outOfSequenceMessage(response);
96 return;
97 }
98 if (!responseStatus.getComponentsWhichShouldRespond().contains(message.getFrom())) {
99 getContext().getMonitor().debug("Ignoring message from irrelevant component " + response.getFrom());
100 return;
101 }
102
103 try {
104 if(processMessage(response)) {
105 responseStatus.responseReceived(response);
106 if (responseStatus.haveAllComponentsResponded()) {
107 timer.cancel();
108 changeState();
109 }
110 }
111 } catch (UnexpectedResponseException e) {
112 getContext().getMonitor().invalidMessage(message, e);
113 } catch (UnableToFinishException e) {
114 failConversation(e.getMessage());
115 }
116 }
117
118
119
120
121
122
123 private class StateTimerTask extends TimerTask {
124 @Override
125 public void run() {
126 try {
127 logStateTimeout();
128 changeState();
129 } catch (UnableToFinishException e) {
130 failConversation(e.getMessage());
131 } catch (Throwable throwable) {
132 log.error("Failed to handle timeout correctly", throwable);
133 }
134 }
135 }
136
137
138
139
140 private void changeState() {
141 try {
142 GeneralConversationState nextState = completeState();
143 getContext().setState(nextState);
144 nextState.start();
145 } catch (UnableToFinishException e) {
146 failConversation(e.getMessage());
147 }
148 }
149
150 private void failConversation(String message) {
151 timer.cancel();
152 getContext().getMonitor().operationFailed(message);
153 getContext().setState(new FinishedState(getContext()));
154 }
155
156 protected void initializeMessage(MessageRequest msg) {
157 msg.setCollectionID(getContext().getCollectionID());
158 msg.setCorrelationID(getContext().getConversationID());
159 msg.setMinVersion(ProtocolVersionLoader.loadProtocolVersion().getMinVersion());
160 msg.setVersion(ProtocolVersionLoader.loadProtocolVersion().getVersion());
161 msg.setCollectionID(getContext().getCollectionID());
162 msg.setReplyTo(getContext().getSettings().getReceiverDestinationID());
163 msg.setAuditTrailInformation(getContext().getAuditTrailInformation());
164 msg.setFrom(getContext().getClientID());
165 if (getContext().getContributors() != null && getContext().getContributors().size() == 1) {
166 msg.setTo(getContext().getContributors().iterator().next());
167 }
168 }
169
170
171 protected Collection<String> getOutstandingComponents() {
172 return responseStatus.getOutstandComponents();
173 }
174
175
176 protected abstract void logStateTimeout() throws UnableToFinishException ;
177
178
179
180
181 protected abstract void sendRequest();
182
183
184
185
186
187
188
189
190 protected abstract boolean processMessage(MessageResponse response)
191 throws UnexpectedResponseException, UnableToFinishException;
192
193
194
195
196 protected abstract ConversationContext getContext();
197
198
199
200
201
202
203
204
205 protected abstract GeneralConversationState completeState() throws UnableToFinishException;
206
207
208
209
210 protected abstract long getTimeoutValue();
211
212
213
214
215
216 protected abstract String getPrimitiveName();
217
218
219
220
221 private boolean canHandleResponseType(MessageResponse response) {
222 String responseType = response.getClass().getSimpleName();
223 return responseType.contains(getPrimitiveName());
224 }
225 }