View Javadoc

1   /*
2    * #%L
3    * Bitrepository Protocol
4    * 
5    * $Id$
6    * $HeadURL$
7    * %%
8    * Copyright (C) 2010 - 2012 The State and University Library, The Royal Library and The State Archives, Denmark
9    * %%
10   * This program is free software: you can redistribute it and/or modify
11   * it under the terms of the GNU Lesser General Public License as 
12   * published by the Free Software Foundation, either version 2.1 of the 
13   * License, or (at your option) any later version.
14   * 
15   * This program is distributed in the hope that it will be useful,
16   * but WITHOUT ANY WARRANTY; without even the implied warranty of
17   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   * GNU General Lesser Public License for more details.
19   * 
20   * You should have received a copy of the GNU General Lesser Public 
21   * License along with this program.  If not, see
22   * <http://www.gnu.org/licenses/lgpl-2.1.html>.
23   * #L%
24   */
25  package org.bitrepository.client.conversation;
26  
27  import java.util.Collection;
28  import java.util.Timer;
29  import java.util.TimerTask;
30  
31  import org.bitrepository.bitrepositorymessages.Message;
32  import org.bitrepository.bitrepositorymessages.MessageRequest;
33  import org.bitrepository.bitrepositorymessages.MessageResponse;
34  import org.bitrepository.client.conversation.selector.ContributorResponseStatus;
35  import org.bitrepository.client.exceptions.UnexpectedResponseException;
36  import org.bitrepository.common.exceptions.UnableToFinishException;
37  import org.bitrepository.protocol.ProtocolVersionLoader;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  /**
42   * Implements the generic conversation state functionality, 
43   * like timeouts and the definition of the common state attributes.
44   */
45  public abstract class GeneralConversationState implements ConversationState {
46      private final Logger log = LoggerFactory.getLogger(getClass());
47      /** Defines that the timer is a daemon thread. */
48      private static final Boolean TIMER_IS_DAEMON = true;
49      /** The timer used for timeout checks. */
50      private final Timer timer;
51      /** For response bookkeeping */
52      private final ContributorResponseStatus responseStatus;
53  
54      /**
55       *
56       * @param expectedContributors The collection of components to monitor responses from. This conversation
57       *                             phase is considered finished when all contributors have responded.
58       */
59      protected GeneralConversationState(Collection<String> expectedContributors) {
60          responseStatus = new ContributorResponseStatus(expectedContributors);
61          timer = new Timer(getClass().getSimpleName() + " conversation state " + "timer",
62                  TIMER_IS_DAEMON);
63      }
64  
65      /**
66       * Startes the state by: <ol>
67       *     <li>Starting the timeout timer.</li>
68       *     <li>Sends the request which triggers the responses for this state.</li>
69       * </ol>
70       */
71      public void start() {
72          if (!responseStatus.getOutstandComponents().isEmpty()) {
73              timer.schedule(new StateTimerTask(), getTimeoutValue());
74              sendRequest();
75          } else {
76              // No contributors need to be called for the operation to finish.
77              changeState();
78          }
79      }
80  
81      /**
82       * The general message handler for this state. Will only accept <code>MessageResponses</code>.
83       * Takes care of the general message bookkeeping and delegates the specifics of the message handling to the
84       * concrete states {@link #processMessage(MessageResponse)}.
85       * @param message The message to handle.
86       */
87      public final void handleMessage(Message message) {
88          if (!(message instanceof MessageResponse)) {
89              getContext().getMonitor().warning("Unable to handle none-response type message " + message);
90              return;
91          }
92          MessageResponse response = (MessageResponse)message;
93  
94          if (!canHandleResponseType(response)) {
95              getContext().getMonitor().outOfSequenceMessage(response);
96              return;
97          }
98          if (!responseStatus.getComponentsWhichShouldRespond().contains(message.getFrom())) {
99              getContext().getMonitor().debug("Ignoring message from irrelevant component " + response.getFrom());
100             return;
101         }
102 
103         try {
104             if(processMessage(response)) {
105                 responseStatus.responseReceived(response);
106                 if (responseStatus.haveAllComponentsResponded()) {
107                     timer.cancel();
108                     changeState();
109                 }
110             }
111         } catch (UnexpectedResponseException e) {
112             getContext().getMonitor().invalidMessage(message, e);
113         } catch (UnableToFinishException e) {
114             failConversation(e.getMessage());
115         }
116     }
117 
118     /**
119      * The timer task class for the outstanding identify requests.
120      * When the time is reached the selected pillar should
121      * be called requested for the delivery of the file.
122      */
123     private class StateTimerTask extends TimerTask {
124         @Override
125         public void run() {
126             try {
127                 logStateTimeout();
128                 changeState();
129             } catch (UnableToFinishException e) {
130                 failConversation(e.getMessage());
131             } catch (Throwable throwable) {
132                 log.error("Failed to handle timeout correctly", throwable);
133             }
134         }
135     }
136 
137     /**
138      * Changes to the next state.
139      */
140     private void changeState() {
141         try {
142             GeneralConversationState nextState = completeState();
143             getContext().setState(nextState);
144             nextState.start();
145         } catch (UnableToFinishException e) {
146             failConversation(e.getMessage());
147         }
148     }
149 
150     private void failConversation(String message) {
151         timer.cancel();
152         getContext().getMonitor().operationFailed(message);
153         getContext().setState(new FinishedState(getContext()));
154     }
155 
156     protected void initializeMessage(MessageRequest msg) {
157         msg.setCollectionID(getContext().getCollectionID());
158         msg.setCorrelationID(getContext().getConversationID());
159         msg.setMinVersion(ProtocolVersionLoader.loadProtocolVersion().getMinVersion());
160         msg.setVersion(ProtocolVersionLoader.loadProtocolVersion().getVersion());
161         msg.setCollectionID(getContext().getCollectionID());
162         msg.setReplyTo(getContext().getSettings().getReceiverDestinationID());
163         msg.setAuditTrailInformation(getContext().getAuditTrailInformation());
164         msg.setFrom(getContext().getClientID());
165         if (getContext().getContributors() != null && getContext().getContributors().size() == 1) {
166             msg.setTo(getContext().getContributors().iterator().next());
167         }
168     }
169 
170     /** Returns a list of components where a identify response hasn't been received. */
171     protected Collection<String> getOutstandingComponents() {
172         return responseStatus.getOutstandComponents();
173     }
174 
175     /** Must be implemented by subclasses to log informative timeout information */
176     protected abstract void logStateTimeout() throws UnableToFinishException ;
177 
178     /**
179      * Implement by concrete states for sending the request starting this state.
180      */
181     protected abstract void sendRequest();
182 
183     /**
184      * Implement by concrete states. Only messages from the indicated contributors and with the right type
185      * will be delegate to this method.
186      * @return boolean Return true if response should be considered a final response, false if not. 
187      *      This is intended for use when a failure response results in a retry, so the component is not finished. 
188      * @throws UnexpectedResponseException The response could not be processed successfully.
189      */
190     protected abstract boolean processMessage(MessageResponse response)
191             throws UnexpectedResponseException, UnableToFinishException;
192 
193     /**
194      * @return The conversation context used for this conversation.
195      */
196     protected abstract ConversationContext getContext();
197 
198     /**
199      * Completes the state by generating any state/primitive finish events, create the following state and
200      * return it.
201      * @return An instance of the state following this state. Will be called if the moveToNextState
202      * has returned true.
203      * @exception UnableToFinishException Thrown in case that it is impossible to create a valid ned state.
204      */
205     protected abstract GeneralConversationState completeState() throws UnableToFinishException;
206 
207     /**
208      * Gives access to the concrete timeout for the state.
209      */
210     protected abstract long getTimeoutValue();
211 
212     /**
213      * Informative naming of the process this state is performing. Used for logging. Examples are 'Delete files',
214      * 'Identify contributers for Audit Trails'
215      */
216     protected abstract String getPrimitiveName();
217 
218     /**
219      * Implemented by concrete classes to indicate whether the state expects responses of this type.
220      */
221     private boolean canHandleResponseType(MessageResponse response) {
222         String responseType = response.getClass().getSimpleName();
223         return responseType.contains(getPrimitiveName());
224     }
225 }