001/*
002 * #%L
003 * Netarchivesuite - archive
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.archive.arcrepository.distribute;
024
025import java.io.File;
026import java.io.IOException;
027
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import dk.netarkivet.archive.arcrepository.bitpreservation.AdminDataMessage;
032import dk.netarkivet.archive.bitarchive.distribute.BatchMessage;
033import dk.netarkivet.archive.bitarchive.distribute.BatchReplyMessage;
034import dk.netarkivet.archive.bitarchive.distribute.GetFileMessage;
035import dk.netarkivet.archive.bitarchive.distribute.GetMessage;
036import dk.netarkivet.archive.bitarchive.distribute.RemoveAndGetFileMessage;
037import dk.netarkivet.archive.checksum.distribute.CorrectMessage;
038import dk.netarkivet.archive.checksum.distribute.GetAllChecksumsMessage;
039import dk.netarkivet.archive.checksum.distribute.GetAllFilenamesMessage;
040import dk.netarkivet.archive.checksum.distribute.GetChecksumMessage;
041import dk.netarkivet.common.distribute.ChannelID;
042import dk.netarkivet.common.distribute.Channels;
043import dk.netarkivet.common.distribute.JMSConnectionFactory;
044import dk.netarkivet.common.distribute.NetarkivetMessage;
045import dk.netarkivet.common.distribute.RemoteFile;
046import dk.netarkivet.common.distribute.RemoteFileFactory;
047import dk.netarkivet.common.distribute.Synchronizer;
048import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClient;
049import dk.netarkivet.common.distribute.arcrepository.BatchStatus;
050import dk.netarkivet.common.distribute.arcrepository.BitarchiveRecord;
051import dk.netarkivet.common.distribute.arcrepository.Replica;
052import dk.netarkivet.common.distribute.arcrepository.ReplicaStoreState;
053import dk.netarkivet.common.exceptions.ArgumentNotValid;
054import dk.netarkivet.common.exceptions.IOFailure;
055import dk.netarkivet.common.utils.ExceptionUtils;
056import dk.netarkivet.common.utils.FileUtils;
057import dk.netarkivet.common.utils.NotificationType;
058import dk.netarkivet.common.utils.NotificationsFactory;
059import dk.netarkivet.common.utils.Settings;
060import dk.netarkivet.common.utils.batch.FileBatchJob;
061
062/**
063 * Client side usage of an arc repository. All requests are forwarded to the ArcRepositoryServer over the network. get
064 * and store messages are retried a number of time before giving up, and will timeout after a specified time.
065 */
066public class JMSArcRepositoryClient extends Synchronizer implements ArcRepositoryClient {
067
068    /** The default place in classpath where the settings file can be found. */
069    private static String defaultSettingsClasspath = "dk/netarkivet/archive/arcrepository/distribute/JMSArcRepositoryClientSettings.xml";
070
071    /*
072     * The static initialiser is called when the class is loaded. It will add default values for all settings defined in
073     * this class, by loading them from a settings.xml file in classpath.
074     */
075    static {
076        Settings.addDefaultClasspathSettings(defaultSettingsClasspath);
077    }
078
079    /** The amount of milliseconds in a second. 1000. */
080    private static final int MILLISECONDS_PER_SECOND = 1000;
081
082    /** the one and only JMSArcRepositoryClient instance. */
083    private static JMSArcRepositoryClient instance;
084
085    /** Logging output place. */
086    protected static final Logger log = LoggerFactory.getLogger(JMSArcRepositoryClient.class);
087
088    /** Listens on this queue for replies. */
089    private final ChannelID replyQ;
090
091    /** The number of times to try sending a store message before giving up. */
092    private long storeRetries;
093
094    /** The length of time to wait for a store reply before giving up. */
095    private long storeTimeout;
096
097    /** The length of time to wait for a get reply before giving up. */
098    private long getTimeout;
099
100    // NOTE: The constants defining setting names below are left non-final on
101    // purpose! Otherwise, the static initialiser that loads default values
102    // will not run.
103
104    /**
105     * <b>settings.common.arcrepositoryClient.getTimeout</b>: <br>
106     * The setting for how many milliseconds we will wait before giving up on a lookup request to the Arcrepository.
107     */
108    public static final String ARCREPOSITORY_GET_TIMEOUT = "settings.common.arcrepositoryClient.getTimeout";
109
110    /**
111     * <b>settings.common.arcrepositoryClient.storeRetries</b>: <br>
112     * The setting for the number of times to try sending a store message before failing, including the first attempt.
113     */
114    public static final String ARCREPOSITORY_STORE_RETRIES = "settings.common.arcrepositoryClient.storeRetries";
115
116    /**
117     * <b>settings.common.arcrepositoryClient.storeTimeout</b>: <br>
118     * the setting for the timeout in milliseconds before retrying when calling {@link ArcRepositoryClient#store(File)}.
119     */
120    public static final String ARCREPOSITORY_STORE_TIMEOUT = "settings.common.arcrepositoryClient.storeTimeout";
121
122    /** Adds this Synchronizer as listener on a jms connection. */
123    protected JMSArcRepositoryClient() {
124        storeRetries = Settings.getLong(ARCREPOSITORY_STORE_RETRIES);
125        storeTimeout = Settings.getLong(ARCREPOSITORY_STORE_TIMEOUT);
126        getTimeout = Settings.getLong(ARCREPOSITORY_GET_TIMEOUT);
127
128        log.info(
129                "JMSArcRepositoryClient will retry a store {} times and timeout on each try after {} milliseconds, and timeout on each getrequest after {} milliseconds.",
130                storeRetries, storeTimeout, getTimeout);
131        replyQ = Channels.getThisReposClient();
132        JMSConnectionFactory.getInstance().setListener(replyQ, this);
133        log.info("JMSArcRepository listens for replies on channel '{}'", replyQ);
134    }
135
136    /**
137     * Get an JMSArcRepositoryClient instance. This is guaranteed to be a singleton.
138     *
139     * @return an JMSArcRepositoryClient instance.
140     */
141    public static synchronized JMSArcRepositoryClient getInstance() {
142        if (instance == null) {
143            instance = new JMSArcRepositoryClient();
144        }
145        return instance;
146    }
147
148    /** Removes this object as a JMS listener. */
149    public void close() {
150        synchronized (JMSArcRepositoryClient.class) {
151            JMSConnectionFactory.getInstance().removeListener(replyQ, this);
152            instance = null;
153        }
154    }
155
156    /**
157     * Sends a GetMessage on the "TheArcrepos" queue and waits for a reply. This is a blocking call. Returns null if no
158     * message is returned within Settings.ARCREPOSITORY_GET_TIMEOUT
159     *
160     * @param arcfile The name of a file.
161     * @param index The offset of the wanted record in the file
162     * @return a BitarchiveRecord-object or null if request times out or object is not found.
163     * @throws ArgumentNotValid If the given arcfile is null or empty, or the given index is negative.
164     * @throws IOFailure If a wrong message is returned or the get operation failed.
165     */
166    public BitarchiveRecord get(String arcfile, long index) throws ArgumentNotValid, IOFailure {
167        ArgumentNotValid.checkNotNullOrEmpty(arcfile, "arcfile");
168        ArgumentNotValid.checkNotNegative(index, "index");
169        log.debug("Requesting get of record '{}:{}'", arcfile, index);
170        long start = System.currentTimeMillis();
171        GetMessage requestGetMsg = new GetMessage(Channels.getTheRepos(), replyQ, arcfile, index);
172        NetarkivetMessage replyNetMsg = sendAndWaitForOneReply(requestGetMsg, getTimeout);
173        long timePassed = System.currentTimeMillis() - start;
174        log.debug("Reply received after {} seconds", (timePassed / MILLISECONDS_PER_SECOND));
175        if (replyNetMsg == null) {
176            log.info("Request for record({}:{}) timed out after {} seconds. Returning null BitarchiveRecord", arcfile,
177                    index, (getTimeout / MILLISECONDS_PER_SECOND));
178            return null;
179        }
180        GetMessage replyGetMsg;
181        try {
182            replyGetMsg = (GetMessage) replyNetMsg;
183        } catch (ClassCastException e) {
184            throw new IOFailure("Received invalid argument reply: '" + replyNetMsg + "'", e);
185        }
186        if (!replyGetMsg.isOk()) {
187            throw new IOFailure("GetMessage failed: '" + replyGetMsg.getErrMsg() + "'");
188        }
189        return replyGetMsg.getRecord();
190    }
191
192    /**
193     * Synchronously retrieves a file from a bitarchive and places it in a local file. This is the interface for sending
194     * GetFileMessage on the "TheArcrepos" queue. This is a blocking call.
195     *
196     * @param arcfilename Name of the arcfile to retrieve.
197     * @param replica The bitarchive to retrieve the data from.
198     * @param toFile Filename of a place where the file fetched can be put.
199     * @throws ArgumentNotValid If the arcfilename are null or empty, or if either replica or toFile is null.
200     * @throws IOFailure if there are problems getting a reply or the file could not be found.
201     */
202    public void getFile(String arcfilename, Replica replica, File toFile) throws ArgumentNotValid, IOFailure {
203        ArgumentNotValid.checkNotNullOrEmpty(arcfilename, "arcfilename");
204        ArgumentNotValid.checkNotNull(replica, "replica");
205        ArgumentNotValid.checkNotNull(toFile, "toFile");
206
207        log.debug("Requesting get of file '{}' from '{}'", arcfilename, replica);
208        // ArgumentNotValid.checkNotNull(replyQ, "replyQ must not be null");
209        GetFileMessage gfMsg = new GetFileMessage(Channels.getTheRepos(), replyQ, arcfilename, replica.getId());
210        GetFileMessage getFileMessage = (GetFileMessage) sendAndWaitForOneReply(gfMsg, 0);
211        if (getFileMessage == null) {
212            throw new IOFailure("GetFileMessage timed out before returning." + "File not found?");
213        } else if (!getFileMessage.isOk()) {
214            throw new IOFailure("GetFileMessage failed: " + getFileMessage.getErrMsg());
215        } else {
216            getFileMessage.getData(toFile);
217        }
218    }
219
220    /**
221     * Sends a StoreMessage via the synchronized JMS connection method sendAndWaitForOneReply(). After a successful
222     * storage operation, both the local copy of the file and the copy on the ftp server are deleted.
223     *
224     * @param file A file to be stored. Must exist.
225     * @throws IOFailure thrown if store is unsuccessful, or failed to clean up files locally or on the ftp server after
226     * the store operation.
227     * @throws ArgumentNotValid if file parameter is null or file is not an existing file.
228     */
229    public void store(File file) throws IOFailure, ArgumentNotValid {
230        ArgumentNotValid.checkNotNull(file, "file");
231        ArgumentNotValid.checkTrue(file.isFile(), "The file '" + file.getPath() + "' is not an existing file.");
232
233        StringBuilder messages = new StringBuilder();
234        for (long i = 0; i < storeRetries; i++) {
235            StoreMessage outMsg = null;
236            try {
237                log.debug("Sending a StoreMessage with file '{}'", file.getPath());
238                outMsg = new StoreMessage(replyQ, file);
239                NetarkivetMessage replyMsg = sendAndWaitForOneReply(outMsg, storeTimeout);
240                if (replyMsg != null && replyMsg.isOk()) {
241                    try {
242                        FileUtils.removeRecursively(file);
243                    } catch (IOFailure e) {
244                        log.warn("Failed to clean up '{}'", file.getAbsolutePath(), e);
245                        // Not fatal
246                    }
247                    return;
248                } else if (replyMsg == null) {
249                    String msg = "Timed out" + " while waiting for reply on store of file '" + file.getPath()
250                            + "' on attempt number " + (i + 1) + " of " + storeRetries;
251                    log.warn(msg);
252                    messages.append(msg).append("\n");
253                } else {
254                    String msg = "The returned message '" + replyMsg + "' was not ok while waiting for reply on store "
255                            + "of file '" + file.getPath() + "' on attempt number " + (i + 1) + " of " + storeRetries
256                            + ". Error message was '" + replyMsg.getErrMsg() + "'";
257                    log.warn(msg);
258                    messages.append(msg).append("\n");
259                }
260            } catch (Exception e) {
261                String msg = "Client-side exception occurred while storing '" + file.getPath() + "' on attempt number "
262                        + (i + 1) + " of " + storeRetries + ".";
263                log.warn(msg, e);
264                messages.append(msg).append("\n");
265                messages.append(ExceptionUtils.getStackTrace(e));
266            } finally {
267                if (outMsg != null) {
268                    cleanUpAfterStore(outMsg);
269                }
270            }
271        }
272        String errMsg = "Could not store '" + file.getPath() + "' after " + storeRetries + " attempts. Giving up.\n"
273                + messages;
274        log.error(errMsg);
275        NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR);
276        throw new IOFailure(errMsg);
277    }
278
279    /**
280     * Tries to clean up a file on the FTP server after a store operation. Will not throw exception on error, merely log
281     * exception.
282     *
283     * @param m the StoreMessage sent back as reply
284     */
285    private void cleanUpAfterStore(StoreMessage m) {
286        RemoteFile rf;
287        try {
288            rf = m.getRemoteFile();
289        } catch (Exception e) {
290            log.warn("Could not get remote file object from message {}", m, e);
291            return;
292        }
293        try {
294            rf.cleanup();
295        } catch (Exception e) {
296            log.warn("Could not delete remote file on ftp server: {}", rf, e);
297        }
298    }
299
300    /**
301     * Runs a batch batch job on each file in the ArcRepository.
302     * <p>
303     * Note: The id for the batchjob is the empty string, which removes the possibility of terminating the batchjob
304     * remotely while it is running.
305     *
306     * @param job An object that implements the FileBatchJob interface. The initialize() method will be called before
307     * processing and the finish() method will be called afterwards. The process() method will be called with each File
308     * entry. An optional function postProcess() allows handling the combined results of the batchjob, e.g. summing the
309     * results, sorting, etc.
310     * @param replicaId The archive to execute the job on.
311     * @param args The arguments for the batchjob.
312     * @return The status of the batch job after it ended.
313     */
314    public BatchStatus batch(FileBatchJob job, String replicaId, String... args) {
315        return batch(job, replicaId, "", args);
316    }
317
318    /**
319     * Runs a batch job on each file in the ArcRepository.
320     *
321     * @param job An object that implements the FileBatchJob interface. The initialize() method will be called before
322     * processing and the finish() method will be called afterwards. The process() method will be called with each File
323     * entry. An optional function postProcess() allows handling the combined results of the batchjob, e.g. summing the
324     * results, sorting, etc.
325     * @param replicaId The archive to execute the job on.
326     * @param args The arguments for the batchjob. This is allowed to be null.
327     * @param batchId The id for the batch process.
328     * @return The status of the batch job after it ended.
329     * @throws ArgumentNotValid If the job is null or the replicaId is either null or the empty string.
330     * @throws IOFailure If no result file is returned.
331     */
332    public BatchStatus batch(FileBatchJob job, String replicaId, String batchId, String... args) throws IOFailure,
333            ArgumentNotValid {
334        ArgumentNotValid.checkNotNull(job, "FileBatchJob job");
335        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId");
336
337        log.debug("Starting batchjob '{}' running on replica '{}'", job, replicaId);
338        BatchMessage bMsg = new BatchMessage(Channels.getTheRepos(), replyQ, job, replicaId, batchId, args);
339        log.debug("Sending batchmessage to queue '{}' with replyqueue set to '{}'", Channels.getTheRepos(), replyQ);
340        BatchReplyMessage brMsg = (BatchReplyMessage) sendAndWaitForOneReply(bMsg, 0);
341        if (!brMsg.isOk()) {
342            String msg = "The batch job '" + bMsg + "' resulted in the following " + "error: " + brMsg.getErrMsg();
343            log.warn(msg);
344            if (brMsg.getResultFile() == null) {
345                // If no result is available at all, this is non-recoverable
346                throw new IOFailure(msg);
347            }
348        }
349        return new BatchStatus(brMsg.getFilesFailed(), brMsg.getNoOfFilesProcessed(), brMsg.getResultFile(),
350                job.getExceptions());
351    }
352
353    /**
354     * Request update of admin data to specific state.
355     *
356     * @param fileName The file for which admin data should be updated.
357     * @param replicaId The id if the replica that the administrative data for fileName is wrong for.
358     * @param newval The new value in admin data.
359     * @throws ArgumentNotValid If one of the arguments are invalid (null or empty string).
360     * @throws IOFailure If the reply to the request update timed out.
361     */
362    public void updateAdminData(String fileName, String replicaId, ReplicaStoreState newval) throws ArgumentNotValid,
363            IOFailure {
364        ArgumentNotValid.checkNotNullOrEmpty(fileName, "String fileName");
365        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId");
366        ArgumentNotValid.checkNotNull(newval, "ReplicaStoreState newval");
367
368        String msg = "Requesting update of admin data for file '" + fileName + "' replica '" + replicaId
369                + "' to state " + newval;
370        log.warn(msg);
371        NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING);
372        AdminDataMessage aMsg = new AdminDataMessage(fileName, replicaId, newval);
373        // We only need to know that a reply to our message has arrived.
374        // The replyMessage is thrown away, because it does not contain
375        // any more useful knowledge.
376        sendAndWaitForOneReply(aMsg, 0);
377    }
378
379    /**
380     * Request update of admin data to specific checksum.
381     *
382     * @param filename The file for which admin data should be updated.
383     * @param checksum The new checksum for the file
384     */
385    public void updateAdminChecksum(String filename, String checksum) {
386        ArgumentNotValid.checkNotNullOrEmpty(filename, "filename");
387        ArgumentNotValid.checkNotNullOrEmpty(checksum, "checksum");
388
389        String msg = "Requesting update of admin data for file '" + filename + "' to checksum '" + checksum;
390        log.warn(msg);
391        NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING);
392        AdminDataMessage aMsg = new AdminDataMessage(filename, checksum);
393        // We only need to know that a reply to our message has arrived.
394        // The replyMessage is thrown away, because it does not contain
395        // any more useful knowledge.
396        sendAndWaitForOneReply(aMsg, 0);
397    }
398
399    /**
400     * Removes a file from the bitarchives, if given credentials and checksum are correct.
401     *
402     * @param fileName The name of the file to delete
403     * @param bitarchiveId The id of the bitarchive to delete the file in
404     * @param checksum The checksum of the deleted file
405     * @param credentials The credentials used to delete the file
406     * @return The file that was removed
407     * @throws ArgumentNotValid if arguments are null or equal to the empty string
408     * @throws IOFailure if we could not delete the remote file, or there was no response to our RemoveAndGetFileMessage
409     * within the allotted time defined by the setting {@link JMSArcRepositoryClient#ARCREPOSITORY_STORE_TIMEOUT}.
410     */
411    public File removeAndGetFile(String fileName, String bitarchiveId, String checksum, String credentials)
412            throws IOFailure, ArgumentNotValid {
413        ArgumentNotValid.checkNotNullOrEmpty(fileName, "filename");
414        ArgumentNotValid.checkNotNullOrEmpty(bitarchiveId, "bitarchiveName");
415        ArgumentNotValid.checkNotNullOrEmpty(checksum, "checksum");
416        ArgumentNotValid.checkNotNullOrEmpty(credentials, "credentials");
417
418        String msg = "Requesting remove of file '" + fileName + "' with checksum '" + checksum + "' from bitarchive '"
419                + bitarchiveId + "'";
420        log.warn(msg);
421        NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING);
422        RemoveAndGetFileMessage aMsg = new RemoveAndGetFileMessage(Channels.getTheRepos(),
423                Channels.getThisReposClient(), fileName, bitarchiveId, checksum, credentials);
424        RemoveAndGetFileMessage replyMsg = (RemoveAndGetFileMessage) sendAndWaitForOneReply(aMsg, storeTimeout);
425
426        // The removed file is returned, move to temp location
427        if (replyMsg != null) {
428            if (replyMsg.isOk()) {
429                File removedFile = replyMsg.getData();
430                log.debug("Stored copy of removed file: {} as: {}", fileName, removedFile.getAbsolutePath());
431                return removedFile;
432            } else {
433                throw new IOFailure("Could not delete remote file: " + replyMsg.getErrMsg());
434            }
435        } else {
436            throw new IOFailure("Request timed out while requesting remove of " + "file '" + fileName
437                    + "' in bitarchive '" + bitarchiveId + "'");
438        }
439    }
440
441    /**
442     * Retrieves all the checksum from the replica through a GetAllChecksumMessage.
443     * <p>
444     * This is the checksum archive alternative to running a ChecksumBatchJob.
445     *
446     * @param replicaId The id of the replica from which the checksums should be retrieved.
447     * @return A file containing filename and checksum of all the files in an archive in the same format as a
448     * ChecksumJob.
449     * @throws IOFailure If the reply is not of type GetAllChecksumsMessage or if the file could not properly be
450     * retrieved from the reply message or if the message timed out.
451     * @throws ArgumentNotValid If the replicaId is null or empty.
452     * @see dk.netarkivet.archive.checksum.distribute.GetAllChecksumsMessage
453     */
454    public File getAllChecksums(String replicaId) throws IOFailure, ArgumentNotValid {
455        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId");
456        log.debug("Sending GetAllChecksumMessage to replica '{}'.", replicaId);
457        // time this.
458        long start = System.currentTimeMillis();
459        // make and send the message to the replica.
460        GetAllChecksumsMessage gacMsg = new GetAllChecksumsMessage(Channels.getTheRepos(), replyQ, replicaId);
461        NetarkivetMessage replyNetMsg = sendAndWaitForOneReply(gacMsg, 0);
462
463        // calculate and log the time spent on handling the message.
464        long timePassed = System.currentTimeMillis() - start;
465        log.debug("Reply received after {} seconds.", (timePassed / MILLISECONDS_PER_SECOND));
466        // check whether the output was valid.
467        if (replyNetMsg == null) {
468            throw new IOFailure("Request for all checksum timed out after " + (getTimeout / MILLISECONDS_PER_SECOND)
469                    + " seconds.");
470        }
471        // convert to the correct type of message.
472        GetAllChecksumsMessage replyCSMsg;
473        try {
474            replyCSMsg = (GetAllChecksumsMessage) replyNetMsg;
475        } catch (ClassCastException e) {
476            throw new IOFailure("Received invalid reply message: '" + replyNetMsg, e);
477        }
478
479        try {
480            // retrieve the data from this message and place it in tempDir.
481            File result = File.createTempFile("tmp", "tmp", FileUtils.getTempDir());
482            replyCSMsg.getData(result);
483
484            return result;
485        } catch (IOException e) {
486            throw new IOFailure("Cannot create a temporary file for retrieving "
487                    + "the data remote from checksum message: " + replyCSMsg, e);
488        }
489    }
490
491    /**
492     * Retrieves the names of all the files in the replica through a GetAllFilenamesMessage.
493     * <p>
494     * This is the checksum archive alternative to running a FilelistBatchJob.
495     *
496     * @param replicaId The id of the replica from which the list of filenames should be retrieved.
497     * @return A file with all the filenames within the archive of the given replica. A null is returned if the message
498     * timeout.
499     * @throws IOFailure If the reply is not of type GetAllFilenamesMessage or if the file could not properly be
500     * retrieved from the reply message
501     * @throws ArgumentNotValid If the replicaId is null or empty.
502     * @see dk.netarkivet.archive.checksum.distribute.GetAllFilenamesMessage
503     */
504    public File getAllFilenames(String replicaId) throws ArgumentNotValid, IOFailure {
505        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId");
506        log.debug("Sending GetAllFilenamesMessage to replica '{}'.", replicaId);
507        // time this.
508        long start = System.currentTimeMillis();
509        // make and send the message to the replica.
510        GetAllFilenamesMessage gafMsg = new GetAllFilenamesMessage(Channels.getTheRepos(), replyQ, replicaId);
511        NetarkivetMessage replyNetMsg = sendAndWaitForOneReply(gafMsg, 0);
512
513        // calculate and log the time spent on handling the message.
514        long timePassed = System.currentTimeMillis() - start;
515        log.debug("Reply received after {} seconds.", (timePassed / MILLISECONDS_PER_SECOND));
516        // check whether the output was valid.
517        if (replyNetMsg == null) {
518            throw new IOFailure("Request for all filenames timed out after " + (getTimeout / MILLISECONDS_PER_SECOND)
519                    + " seconds.");
520        }
521        // convert to the correct type of message.
522        GetAllFilenamesMessage replyCSMsg;
523        try {
524            replyCSMsg = (GetAllFilenamesMessage) replyNetMsg;
525        } catch (ClassCastException e) {
526            throw new IOFailure("Received invalid reply message: '" + replyNetMsg, e);
527        }
528
529        try {
530            // retrieve the data from this message.
531            File result = File.createTempFile("tmp", "tmp", FileUtils.getTempDir());
532            replyCSMsg.getData(result);
533
534            return result;
535        } catch (IOException e) {
536            throw new IOFailure("Cannot create a temporary file for retrieving "
537                    + " the data remote from checksum message: " + replyCSMsg, e);
538        }
539    }
540
541    /**
542     * Retrieves the checksum of a specific file.
543     * <p>
544     * This is the checksum archive alternative to running a ChecksumJob limited to a specific file.
545     *
546     * @param replicaId The ID of the replica to send the message.
547     * @param filename The name of the file for whom the checksum should be retrieved.
548     * @return The checksum of the file in the replica.
549     * @throws IOFailure If the reply is not of type GetChecksumMessage. Or if the message timed out.
550     * @throws ArgumentNotValid If either the replicaId of the filename is null or empty.
551     */
552    public String getChecksum(String replicaId, String filename) throws ArgumentNotValid, IOFailure {
553        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId");
554        ArgumentNotValid.checkNotNullOrEmpty(filename, "String filename");
555        log.debug("Sending GetChecksumMessage to replica '{}' for file '{}'.", replicaId, filename);
556        // time this.
557        long start = System.currentTimeMillis();
558        // make and send the message to the replica.
559        GetChecksumMessage gcsMsg = new GetChecksumMessage(Channels.getTheRepos(), replyQ, filename, replicaId);
560        NetarkivetMessage replyNetMsg = sendAndWaitForOneReply(gcsMsg, 0);
561        // calculate and log the time spent on handling the message.
562        long timePassed = System.currentTimeMillis() - start;
563        log.debug("Reply received after {} seconds.", (timePassed / MILLISECONDS_PER_SECOND));
564        // check whether the output was valid.
565        if (replyNetMsg == null) {
566            throw new IOFailure("Request for checksum timed out after " + (getTimeout / MILLISECONDS_PER_SECOND)
567                    + " seconds.");
568        }
569
570        // convert to the expected type of message.
571        GetChecksumMessage replyCSMsg;
572        try {
573            replyCSMsg = (GetChecksumMessage) replyNetMsg;
574        } catch (ClassCastException e) {
575            throw new IOFailure("Received invalid reply message: '" + replyNetMsg, e);
576        }
577
578        if (!replyCSMsg.isOk()) {
579            log.warn("The reply message for retrieval of checksum was not OK. Tries to extract checksum anyway. {}",
580                    replyCSMsg.getErrMsg());
581        }
582        return replyCSMsg.getChecksum();
583    }
584
585    /**
586     * Method for correcting an entry in a replica. This is done by sending a correct message to the replica.
587     * <p>
588     * The file which is removed from the replica is put into the tempDir.
589     *
590     * @param replicaId The id of the replica to send the message.
591     * @param checksum The checksum of the corrupt entry in the archive. It is important to validate that the checksum
592     * actually is wrong before correcting the entry.
593     * @param file The file to correct the entry in the archive of the replica.
594     * @param credentials A string with the password for allowing changes inside an archive. If it does not correspond
595     * to the credentials of the archive, the correction will not be allowed.
596     * @return The corrupted file from the archive.
597     * @throws IOFailure If the message is not handled properly.
598     * @throws ArgumentNotValid If the replicaId, the checksum or the credentials are either null or empty, or if file
599     * is null.
600     */
601    public File correct(String replicaId, String checksum, File file, String credentials) throws IOFailure,
602            ArgumentNotValid {
603        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId");
604        ArgumentNotValid.checkNotNullOrEmpty(checksum, "String checksum");
605        ArgumentNotValid.checkNotNull(file, "File file");
606        ArgumentNotValid.checkNotNullOrEmpty(credentials, "String credentials");
607
608        RemoteFile rm = RemoteFileFactory.getCopyfileInstance(file);
609        CorrectMessage correctMsg = new CorrectMessage(Channels.getTheRepos(), replyQ, checksum, rm, replicaId,
610                credentials);
611        CorrectMessage responseMessage = (CorrectMessage) sendAndWaitForOneReply(correctMsg, 0);
612
613        if (responseMessage == null) {
614            throw new IOFailure("Correct Message timed out before returning." + " File not found?");
615        } else if (!responseMessage.isOk()) {
616            throw new IOFailure("CorrectMessage failed: " + responseMessage.getErrMsg());
617        }
618
619        // retrieve the wrong file.
620        RemoteFile removedFile = responseMessage.getRemovedFile();
621        try {
622            File destFile = new File(FileUtils.getTempDir(), removedFile.getName());
623            removedFile.copyTo(destFile);
624            return destFile;
625        } catch (Throwable e) {
626            String errMsg = "Problems occured during retrieval of file " + "removed from archive.";
627            log.warn(errMsg, e);
628            throw new IOFailure(errMsg, e);
629        }
630    }
631
632}