001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.harvester.indexserver.distribute; 024 025import java.io.ObjectInputStream; 026import java.io.ObjectOutputStream; 027import java.util.ArrayList; 028import java.util.HashSet; 029import java.util.List; 030import java.util.Set; 031 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import dk.netarkivet.common.distribute.ChannelID; 036import dk.netarkivet.common.distribute.Channels; 037import dk.netarkivet.common.distribute.RemoteFile; 038import dk.netarkivet.common.distribute.RemoteFileSettings; 039import dk.netarkivet.common.distribute.indexserver.RequestType; 040import dk.netarkivet.common.exceptions.ArgumentNotValid; 041import dk.netarkivet.common.exceptions.IOFailure; 042import dk.netarkivet.common.exceptions.IllegalState; 043import dk.netarkivet.harvester.distribute.HarvesterMessage; 044import dk.netarkivet.harvester.distribute.HarvesterMessageVisitor; 045 046/** 047 * Message for requesting and index from the index server, and for giving back the reply. 048 */ 049@SuppressWarnings({"serial"}) 050public class IndexRequestMessage extends HarvesterMessage { 051 052 /** The log. */ 053 private transient static final Logger log = LoggerFactory.getLogger(IndexRequestMessage.class); 054 055 /** List of jobs for which an index is requested. Should always be set. */ 056 private Set<Long> requestedJobs; 057 /** Type of index is requested. Should always be set. */ 058 private RequestType requestType; 059 /** 060 * List of jobs for which an index _can_ be generated. Should only be set on reply. Should always be a subset of 061 * requestedJobs. If This set is equal to the requested set, resultFile should also be set. 062 */ 063 private Set<Long> foundJobs; 064 065 /** 066 * The list of files that make up the generated index. Should only be set on reply, and only if index was generated 067 * for all files 068 * <p> 069 * if indexIsStoredInDirectory is false, this list must contain exactly one file (or not have been set yet). 070 */ 071 private List<RemoteFile> resultFiles; 072 073 /** 074 * If true, the underlying cache uses a directory to store its files (which may be zero or more files), otherwise 075 * just a single file is used. 076 */ 077 private boolean indexIsStoredInDirectory; 078 079 /** 080 * If true, return the index to the sender. If false, send IndexReadyMessage instead. 081 */ 082 private boolean shouldReturnIndex; 083 084 /** 085 * The harvestId needing this index for its jobs. 086 */ 087 private Long harvestId; 088 089 /** 090 * Optionally, the client can decide which connection settings to use for the RemoteFile. Only applicable when using 091 * FTPRemoteFile. 092 */ 093 private RemoteFileSettings optionalConnectionSettings; 094 095 /** 096 * Generate an index request message. Receiver is always the index server channel, replyTo is always this index 097 * client. 098 * 099 * @param requestType Type of index requested. 100 * @param jobSet Type of index requested. 101 * @param ftpconnectionInfo FTP connection parameters to be used (if null, we use the local settings) 102 * @throws ArgumentNotValid if any argument is null. 103 */ 104 public IndexRequestMessage(RequestType requestType, Set<Long> jobSet, RemoteFileSettings ftpconnectionInfo) 105 throws ArgumentNotValid { 106 super(Channels.getTheIndexServer(), Channels.getThisIndexClient()); 107 ArgumentNotValid.checkNotNull(requestType, "RequestType requestType"); 108 ArgumentNotValid.checkNotNull(jobSet, "Set<Long> jobSet"); 109 // Note: Copy the set, since the received set may not be serializable. 110 this.requestedJobs = new HashSet<Long>(jobSet); 111 this.requestType = requestType; 112 this.shouldReturnIndex = true; 113 this.optionalConnectionSettings = ftpconnectionInfo; 114 } 115 116 /** 117 * Generate an IndexRequestMessage that can send its reply to a specific channel. 118 * 119 * @param requestType Type of index requested. 120 * @param jobSet Type of index requested. 121 * @param replyTo The channel to send the reply to. 122 * @param returnIndex If true, include the index in the reply. 123 * @param harvestId The harvestId needing this index for its jobs 124 */ 125 public IndexRequestMessage(RequestType requestType, Set<Long> jobSet, ChannelID replyTo, boolean returnIndex, 126 Long harvestId) { 127 super(Channels.getTheIndexServer(), replyTo); 128 ArgumentNotValid.checkNotNull(requestType, "RequestType requestType"); 129 ArgumentNotValid.checkNotNull(jobSet, "Set<Long> jobSet"); 130 // Note: Copy the set, since the received set may not be serializable. 131 this.requestedJobs = new HashSet<Long>(jobSet); 132 this.requestType = requestType; 133 this.shouldReturnIndex = returnIndex; 134 this.harvestId = harvestId; 135 } 136 137 /** 138 * @return the remoteFilesettings 139 */ 140 public RemoteFileSettings getRemoteFileSettings() { 141 return this.optionalConnectionSettings; 142 } 143 144 /** 145 * @return the harvestId which will use this index, if available. 146 */ 147 public Long getHarvestId() { 148 return this.harvestId; 149 } 150 151 /** 152 * @return true, if this index requested should be returned to the caller. False, if we instead should send a 153 * IndexReadyMessage to the HarvestJobManager queue. 154 */ 155 public boolean mustReturnIndex() { 156 return this.shouldReturnIndex; 157 } 158 159 /** 160 * Calls visit on the visitor. 161 * 162 * @param v The visitor of this message. 163 * @see HarvesterMessageVisitor 164 */ 165 @Override 166 public void accept(HarvesterMessageVisitor v) { 167 v.visit(this); 168 169 } 170 171 /** 172 * Get list of requested jobs. Should never return null. 173 * 174 * @return Set of jobs for which an index is requested. 175 */ 176 public Set<Long> getRequestedJobs() { 177 return requestedJobs; 178 } 179 180 /** 181 * Get the request type. Should never be null. 182 * 183 * @return Type of index requested. 184 */ 185 public RequestType getRequestType() { 186 return requestType; 187 } 188 189 /** 190 * Get the set of jobs for which the index is found. This should always be set on replies, and should always be a 191 * subset of the jobs requested. If set of jobs found jobs is the same as the set of requested jobs, the index file 192 * should also be present. 193 * 194 * @return Set of jobs for which the index is found. 195 */ 196 public Set<Long> getFoundJobs() { 197 return foundJobs; 198 } 199 200 /** 201 * On reply, set the set of jobs for which an index is found. This should always be set on replies, and should 202 * always be a subset of the jobs requested. If set of jobs found jobs is the same as the set of requested jobs, the 203 * index file should also be set. 204 * 205 * @param foundJobs The set of jobs for which the index is found 206 * @throws ArgumentNotValid on null argument 207 */ 208 public void setFoundJobs(Set<Long> foundJobs) throws ArgumentNotValid { 209 ArgumentNotValid.checkNotNull(foundJobs, "Set<Long> foundJobs"); 210 // Note: Copy the set, since the received set may not be serializable. 211 this.foundJobs = new HashSet<Long>(foundJobs); 212 } 213 214 /** 215 * The index over the requested jobs. Only set on replies, and only if foundJobs is the same set as requestedJobs. 216 * 217 * @return index of requested jobs. 218 * @throws IllegalState if this message is a multiFile message. 219 */ 220 public RemoteFile getResultFile() throws IllegalState { 221 if (resultFiles != null) { 222 if (isIndexIsStoredInDirectory()) { 223 throw new IllegalState("This message carries multiple result files: " + resultFiles); 224 } 225 return resultFiles.get(0); 226 } else { 227 return null; 228 } 229 } 230 231 /** 232 * Returns the list of result files for the requested jobs. 233 * 234 * @return index of requested jobs in the form of several possibly co-dependent files. 235 * @throws IllegalState if this message is not a multiFile message. 236 */ 237 public List<RemoteFile> getResultFiles() throws IllegalState { 238 if (resultFiles != null) { 239 if (!isIndexIsStoredInDirectory()) { 240 throw new IllegalState("This message carries a single result file: '" + resultFiles.get(0) + "'"); 241 } 242 return resultFiles; 243 } else { 244 return null; 245 } 246 } 247 248 /** 249 * On reply, set remote file containing index of requested jobs. Should _only_ be set when an index over ALL 250 * requested jobs is present. 251 * 252 * @param resultFile RemoteFile containing index over requested jobs. 253 * @throws ArgumentNotValid on null argument. 254 * @throws IllegalState if the result file has already been set. 255 */ 256 public void setResultFile(RemoteFile resultFile) throws IllegalState, ArgumentNotValid { 257 ArgumentNotValid.checkNotNull(resultFile, "RemoteFile resultFile"); 258 if (this.resultFiles != null) { 259 throw new IllegalState(this + " already has result files " + this.resultFiles + " set."); 260 } 261 resultFiles = new ArrayList<RemoteFile>(1); 262 resultFiles.add(resultFile); 263 indexIsStoredInDirectory = false; 264 } 265 266 /** 267 * Set several result files making up an index of requested jobs. Should _only_ be set when an index over ALL 268 * requested jobs is present. 269 * 270 * @param resultFiles RemoteFiles containing index over requested jobs. 271 * @throws ArgumentNotValid on null argument or null element in list. 272 * @throws IllegalState if the result files have already been set. 273 */ 274 public void setResultFiles(List<RemoteFile> resultFiles) throws IllegalState, ArgumentNotValid { 275 ArgumentNotValid.checkNotNull(resultFiles, "List<RemoteFile> resultFiles"); 276 for (RemoteFile rf : resultFiles) { 277 if (rf == null) { 278 throw new ArgumentNotValid("List of result files contains a null element: " + resultFiles); 279 } 280 } 281 if (this.resultFiles != null) { 282 throw new IllegalState(this + " already has result files " + this.resultFiles + " set."); 283 } 284 log.debug("Sending result containing {} files", resultFiles.size()); 285 this.resultFiles = resultFiles; 286 indexIsStoredInDirectory = true; 287 } 288 289 /** 290 * If true, this message may carry multiple files that should be stored in a directory. 291 * 292 * @return True if more than one file may be transferred with this message. 293 */ 294 public boolean isIndexIsStoredInDirectory() { 295 return indexIsStoredInDirectory; 296 } 297 298 /** 299 * Invoke default method for deserializing object, and reinitialise the logger. 300 * 301 * @param s The stream the object is read from. 302 */ 303 private void readObject(ObjectInputStream s) { 304 try { 305 s.defaultReadObject(); 306 } catch (Exception e) { 307 throw new IOFailure("Unexpected error during deserialization", e); 308 } 309 } 310 311 /** 312 * Invoke default method for serializing object. 313 * 314 * @param s The stream the object is written to. 315 */ 316 private void writeObject(ObjectOutputStream s) { 317 try { 318 s.defaultWriteObject(); 319 } catch (Exception e) { 320 throw new IOFailure("Unexpected error during serialization", e); 321 } 322 } 323 324}