001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.harvester.indexserver.distribute;
024
025import java.io.ObjectInputStream;
026import java.io.ObjectOutputStream;
027import java.util.ArrayList;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Set;
031
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import dk.netarkivet.common.distribute.ChannelID;
036import dk.netarkivet.common.distribute.Channels;
037import dk.netarkivet.common.distribute.RemoteFile;
038import dk.netarkivet.common.distribute.RemoteFileSettings;
039import dk.netarkivet.common.distribute.indexserver.RequestType;
040import dk.netarkivet.common.exceptions.ArgumentNotValid;
041import dk.netarkivet.common.exceptions.IOFailure;
042import dk.netarkivet.common.exceptions.IllegalState;
043import dk.netarkivet.harvester.distribute.HarvesterMessage;
044import dk.netarkivet.harvester.distribute.HarvesterMessageVisitor;
045
046/**
047 * Message for requesting and index from the index server, and for giving back the reply.
048 */
049@SuppressWarnings({"serial"})
050public class IndexRequestMessage extends HarvesterMessage {
051
052    /** The log. */
053    private transient static final Logger log = LoggerFactory.getLogger(IndexRequestMessage.class);
054
055    /** List of jobs for which an index is requested. Should always be set. */
056    private Set<Long> requestedJobs;
057    /** Type of index is requested. Should always be set. */
058    private RequestType requestType;
059    /**
060     * List of jobs for which an index _can_ be generated. Should only be set on reply. Should always be a subset of
061     * requestedJobs. If This set is equal to the requested set, resultFile should also be set.
062     */
063    private Set<Long> foundJobs;
064
065    /**
066     * The list of files that make up the generated index. Should only be set on reply, and only if index was generated
067     * for all files
068     * <p>
069     * if indexIsStoredInDirectory is false, this list must contain exactly one file (or not have been set yet).
070     */
071    private List<RemoteFile> resultFiles;
072
073    /**
074     * If true, the underlying cache uses a directory to store its files (which may be zero or more files), otherwise
075     * just a single file is used.
076     */
077    private boolean indexIsStoredInDirectory;
078
079    /**
080     * If true, return the index to the sender. If false, send IndexReadyMessage instead.
081     */
082    private boolean shouldReturnIndex;
083
084    /**
085     * The harvestId needing this index for its jobs.
086     */
087    private Long harvestId;
088
089    /**
090     * Optionally, the client can decide which connection settings to use for the RemoteFile. Only applicable when using
091     * FTPRemoteFile.
092     */
093    private RemoteFileSettings optionalConnectionSettings;
094
095    /**
096     * Generate an index request message. Receiver is always the index server channel, replyTo is always this index
097     * client.
098     *
099     * @param requestType Type of index requested.
100     * @param jobSet Type of index requested.
101     * @param ftpconnectionInfo FTP connection parameters to be used (if null, we use the local settings)
102     * @throws ArgumentNotValid if any argument is null.
103     */
104    public IndexRequestMessage(RequestType requestType, Set<Long> jobSet, RemoteFileSettings ftpconnectionInfo)
105            throws ArgumentNotValid {
106        super(Channels.getTheIndexServer(), Channels.getThisIndexClient());
107        ArgumentNotValid.checkNotNull(requestType, "RequestType requestType");
108        ArgumentNotValid.checkNotNull(jobSet, "Set<Long> jobSet");
109        // Note: Copy the set, since the received set may not be serializable.
110        this.requestedJobs = new HashSet<Long>(jobSet);
111        this.requestType = requestType;
112        this.shouldReturnIndex = true;
113        this.optionalConnectionSettings = ftpconnectionInfo;
114    }
115
116    /**
117     * Generate an IndexRequestMessage that can send its reply to a specific channel.
118     *
119     * @param requestType Type of index requested.
120     * @param jobSet Type of index requested.
121     * @param replyTo The channel to send the reply to.
122     * @param returnIndex If true, include the index in the reply.
123     * @param harvestId The harvestId needing this index for its jobs
124     */
125    public IndexRequestMessage(RequestType requestType, Set<Long> jobSet, ChannelID replyTo, boolean returnIndex,
126            Long harvestId) {
127        super(Channels.getTheIndexServer(), replyTo);
128        ArgumentNotValid.checkNotNull(requestType, "RequestType requestType");
129        ArgumentNotValid.checkNotNull(jobSet, "Set<Long> jobSet");
130        // Note: Copy the set, since the received set may not be serializable.
131        this.requestedJobs = new HashSet<Long>(jobSet);
132        this.requestType = requestType;
133        this.shouldReturnIndex = returnIndex;
134        this.harvestId = harvestId;
135    }
136
137    /**
138     * @return the remoteFilesettings
139     */
140    public RemoteFileSettings getRemoteFileSettings() {
141        return this.optionalConnectionSettings;
142    }
143
144    /**
145     * @return the harvestId which will use this index, if available.
146     */
147    public Long getHarvestId() {
148        return this.harvestId;
149    }
150
151    /**
152     * @return true, if this index requested should be returned to the caller. False, if we instead should send a
153     * IndexReadyMessage to the HarvestJobManager queue.
154     */
155    public boolean mustReturnIndex() {
156        return this.shouldReturnIndex;
157    }
158
159    /**
160     * Calls visit on the visitor.
161     *
162     * @param v The visitor of this message.
163     * @see HarvesterMessageVisitor
164     */
165    @Override
166    public void accept(HarvesterMessageVisitor v) {
167        v.visit(this);
168
169    }
170
171    /**
172     * Get list of requested jobs. Should never return null.
173     *
174     * @return Set of jobs for which an index is requested.
175     */
176    public Set<Long> getRequestedJobs() {
177        return requestedJobs;
178    }
179
180    /**
181     * Get the request type. Should never be null.
182     *
183     * @return Type of index requested.
184     */
185    public RequestType getRequestType() {
186        return requestType;
187    }
188
189    /**
190     * Get the set of jobs for which the index is found. This should always be set on replies, and should always be a
191     * subset of the jobs requested. If set of jobs found jobs is the same as the set of requested jobs, the index file
192     * should also be present.
193     *
194     * @return Set of jobs for which the index is found.
195     */
196    public Set<Long> getFoundJobs() {
197        return foundJobs;
198    }
199
200    /**
201     * On reply, set the set of jobs for which an index is found. This should always be set on replies, and should
202     * always be a subset of the jobs requested. If set of jobs found jobs is the same as the set of requested jobs, the
203     * index file should also be set.
204     *
205     * @param foundJobs The set of jobs for which the index is found
206     * @throws ArgumentNotValid on null argument
207     */
208    public void setFoundJobs(Set<Long> foundJobs) throws ArgumentNotValid {
209        ArgumentNotValid.checkNotNull(foundJobs, "Set<Long> foundJobs");
210        // Note: Copy the set, since the received set may not be serializable.
211        this.foundJobs = new HashSet<Long>(foundJobs);
212    }
213
214    /**
215     * The index over the requested jobs. Only set on replies, and only if foundJobs is the same set as requestedJobs.
216     *
217     * @return index of requested jobs.
218     * @throws IllegalState if this message is a multiFile message.
219     */
220    public RemoteFile getResultFile() throws IllegalState {
221        if (resultFiles != null) {
222            if (isIndexIsStoredInDirectory()) {
223                throw new IllegalState("This message carries multiple result files: " + resultFiles);
224            }
225            return resultFiles.get(0);
226        } else {
227            return null;
228        }
229    }
230
231    /**
232     * Returns the list of result files for the requested jobs.
233     *
234     * @return index of requested jobs in the form of several possibly co-dependent files.
235     * @throws IllegalState if this message is not a multiFile message.
236     */
237    public List<RemoteFile> getResultFiles() throws IllegalState {
238        if (resultFiles != null) {
239            if (!isIndexIsStoredInDirectory()) {
240                throw new IllegalState("This message carries a single result file: '" + resultFiles.get(0) + "'");
241            }
242            return resultFiles;
243        } else {
244            return null;
245        }
246    }
247
248    /**
249     * On reply, set remote file containing index of requested jobs. Should _only_ be set when an index over ALL
250     * requested jobs is present.
251     *
252     * @param resultFile RemoteFile containing index over requested jobs.
253     * @throws ArgumentNotValid on null argument.
254     * @throws IllegalState if the result file has already been set.
255     */
256    public void setResultFile(RemoteFile resultFile) throws IllegalState, ArgumentNotValid {
257        ArgumentNotValid.checkNotNull(resultFile, "RemoteFile resultFile");
258        if (this.resultFiles != null) {
259            throw new IllegalState(this + " already has result files " + this.resultFiles + " set.");
260        }
261        resultFiles = new ArrayList<RemoteFile>(1);
262        resultFiles.add(resultFile);
263        indexIsStoredInDirectory = false;
264    }
265
266    /**
267     * Set several result files making up an index of requested jobs. Should _only_ be set when an index over ALL
268     * requested jobs is present.
269     *
270     * @param resultFiles RemoteFiles containing index over requested jobs.
271     * @throws ArgumentNotValid on null argument or null element in list.
272     * @throws IllegalState if the result files have already been set.
273     */
274    public void setResultFiles(List<RemoteFile> resultFiles) throws IllegalState, ArgumentNotValid {
275        ArgumentNotValid.checkNotNull(resultFiles, "List<RemoteFile> resultFiles");
276        for (RemoteFile rf : resultFiles) {
277            if (rf == null) {
278                throw new ArgumentNotValid("List of result files contains a null element: " + resultFiles);
279            }
280        }
281        if (this.resultFiles != null) {
282            throw new IllegalState(this + " already has result files " + this.resultFiles + " set.");
283        }
284        log.debug("Sending result containing {} files", resultFiles.size());
285        this.resultFiles = resultFiles;
286        indexIsStoredInDirectory = true;
287    }
288
289    /**
290     * If true, this message may carry multiple files that should be stored in a directory.
291     *
292     * @return True if more than one file may be transferred with this message.
293     */
294    public boolean isIndexIsStoredInDirectory() {
295        return indexIsStoredInDirectory;
296    }
297
298    /**
299     * Invoke default method for deserializing object, and reinitialise the logger.
300     *
301     * @param s The stream the object is read from.
302     */
303    private void readObject(ObjectInputStream s) {
304        try {
305            s.defaultReadObject();
306        } catch (Exception e) {
307            throw new IOFailure("Unexpected error during deserialization", e);
308        }
309    }
310
311    /**
312     * Invoke default method for serializing object.
313     *
314     * @param s The stream the object is written to.
315     */
316    private void writeObject(ObjectOutputStream s) {
317        try {
318            s.defaultWriteObject();
319        } catch (Exception e) {
320            throw new IOFailure("Unexpected error during serialization", e);
321        }
322    }
323
324}