001/*
002 * #%L
003 * Netarchivesuite - archive
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.archive.checksum.distribute;
024
025import java.io.File;
026
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030import dk.netarkivet.archive.ArchiveSettings;
031import dk.netarkivet.archive.bitarchive.distribute.UploadMessage;
032import dk.netarkivet.archive.checksum.ChecksumArchive;
033import dk.netarkivet.common.CommonSettings;
034import dk.netarkivet.common.distribute.Channels;
035import dk.netarkivet.common.distribute.JMSConnectionFactory;
036import dk.netarkivet.common.distribute.RemoteFile;
037import dk.netarkivet.common.distribute.RemoteFileFactory;
038import dk.netarkivet.common.exceptions.ArgumentNotValid;
039import dk.netarkivet.common.exceptions.IllegalState;
040import dk.netarkivet.common.exceptions.UnknownID;
041import dk.netarkivet.common.utils.FileUtils;
042import dk.netarkivet.common.utils.NotificationType;
043import dk.netarkivet.common.utils.NotificationsFactory;
044import dk.netarkivet.common.utils.Settings;
045import dk.netarkivet.common.utils.SystemUtils;
046
047/**
048 * The server for the ChecksumFileApplication. Used for communication between the ArcRepository and the checksum
049 * archive.
050 */
051public class ChecksumFileServer extends ChecksumArchiveServer {
052
053    /** The logger used by this class. */
054    private static final Logger log = LoggerFactory.getLogger(ChecksumFileServer.class);
055
056    /** The instance of this server. */
057    protected static ChecksumFileServer instance;
058
059    /** The archive which contain the actual data. */
060    protected ChecksumArchive cs;
061
062    /** The character to separate the applicationInstanceId and the IP address. */
063    public static final String APPLICATION_ID_SEPARATOR = "_";
064
065        private boolean usePrecomputedChecksumDuringUpload;
066
067    /**
068     * Returns the unique instance of this class.
069     * <p>
070     * The server creates an instance of the checksum it creates access to and starts to listen to a JMS messages on the
071     * incoming JMS queue.
072     * <p>
073     * <p>
074     * Should this do the heart beats to a monitor? This would be quite odd, since Checksum does not use a monitor.
075     *
076     * @return This instance.
077     */
078    public static ChecksumFileServer getInstance() {
079        if (instance == null) {
080            instance = new ChecksumFileServer();
081        }
082        return instance;
083    }
084
085    /**
086     * Constructor.
087     */
088    private ChecksumFileServer() {
089        // log that this instance is been invoked.
090        log.info("Initialising the ChecksumFileServer.");
091
092        // get the instance of the checksum archive
093        cs = ChecksumArchiveFactory.getInstance();
094
095        // initialise the JMSConnection.
096        jmsCon = JMSConnectionFactory.getInstance();
097
098        // initialise the channel.
099        theCR = Channels.getTheCR();
100
101        // Start listening to the channel.
102        jmsCon.setListener(theCR, this);
103
104        // create the application identifier
105        checksumAppId = createAppId();
106
107        usePrecomputedChecksumDuringUpload = Settings.getBoolean(ArchiveSettings.CHECKSUM_USE_PRECOMPUTED_CHECKSUM_DURING_UPLOAD);
108        
109        // log that this instance has successfully been invoked.
110        log.info("ChecksumFileServer '{}' initialised. Using precomputedChecksums during upload: {}", checksumAppId, usePrecomputedChecksumDuringUpload);
111    }
112
113    /**
114     * Method for closing the instance.
115     */
116    public void close() {
117        log.info("ChecksumFileServer '{}' closing down.", checksumAppId);
118        cleanup();
119        if (jmsCon != null) {
120            jmsCon.removeListener(theCR, this);
121            jmsCon = null;
122        }
123        log.info("ChecksumFileServer '{}' closed down.", checksumAppId);
124    }
125
126    /**
127     * Method for cleaning up, when closing this instance down.
128     */
129    public void cleanup() {
130        instance = null;
131        cs.cleanup();
132    }
133
134    /**
135     * Method for retrieving the identification of this application.
136     *
137     * @return The id of this application.
138     */
139    public String getAppId() {
140        return checksumAppId;
141    }
142
143    /**
144     * Method for creating the identification for this application.
145     *
146     * @return The id of this application.
147     */
148    protected String createAppId() {
149        String id;
150        // Create an id with the IP address of this current host
151        id = SystemUtils.getLocalIP();
152
153        // Append an underscore and APPLICATION_INSTANCE_ID from settings
154        // to the id, if specified in settings.
155        // If no APPLICATION_INSTANCE_ID is found do nothing.
156        try {
157            String applicationInstanceId = Settings.get(CommonSettings.APPLICATION_INSTANCE_ID);
158            if (!applicationInstanceId.isEmpty()) {
159                id += APPLICATION_ID_SEPARATOR + applicationInstanceId;
160            }
161        } catch (UnknownID e) {
162            // Ignore the fact, that there is no APPLICATION_INSTANCE_ID in
163            // settings
164            log.warn("No setting APPLICATION_INSTANCE_ID found in settings: ", e);
165        }
166        return id;
167    }
168
169    /**
170     * The method for uploading arc files. Note that cleanup of the upload file embedded in the message is delegated the
171     * method {@link ChecksumArchive#upload(RemoteFile, String)}
172     *
173     * @param msg The upload message, containing the file to upload.
174     * @throws ArgumentNotValid If the UploadMessage is null.
175     */
176    public void visit(UploadMessage msg) throws ArgumentNotValid {
177        ArgumentNotValid.checkNotNull(msg, "UploadMessage msg");
178        log.debug("Receiving UploadMessage: " + msg.toString());
179        try {
180            try {
181                if (usePrecomputedChecksumDuringUpload) {
182                        cs.upload(msg.getPrecomputedChecksum(), msg.getArcfileName());
183                } else {
184                        cs.upload(msg.getRemoteFile(), msg.getArcfileName());
185                }
186            } catch (Throwable e) {
187                log.warn("Cannot process upload message '{}'", msg, e);
188                msg.setNotOk(e);
189            } finally { // check if enough space
190                if (!cs.hasEnoughSpace()) {
191                    jmsCon.removeListener(theCR, this);
192                        String errMsg = "Not enough space any more. Stopped listening to messages from " 
193                                        + theCR + ". Restart application after fixing problem";
194                        
195                    log.warn(errMsg);
196                    NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR);
197                }
198            }
199        } catch (Throwable e) {
200            log.warn("Cannnot remove listener after upload message '{}'", msg, e);
201        } finally {
202            log.debug("Replying to UploadMessage: {}", msg.toString());
203            jmsCon.reply(msg);
204        }
205    }
206
207    /**
208     * Method for correcting an entry in the archive. It start by ensuring that the file exists, then it checks the
209     * credentials. Then it is checked whether the "bad entry" does have the "bad checksum". If no problems occurred,
210     * then the bad entry will be corrected by the archive (the bad entry is removed from the archive file and put into
211     * the "wrong entry" file. Then the new entry is placed in the archive file.
212     * <p>
213     * If it fails in any of the above, then the method fails (throws an exception which is caught and use for replying
214     * NotOk to the message).
215     *
216     * @param msg The message containing the correct instance of the file to correct.
217     * @throws ArgumentNotValid If the correct message is null.
218     */
219    public void visit(CorrectMessage msg) throws ArgumentNotValid {
220        ArgumentNotValid.checkNotNull(msg, "CorrectMessage msg");
221        log.debug("Receiving correct message: {}", msg.toString());
222        // the file for containing the received file from the message.
223        File correctFile = null;
224        try {
225            String filename = msg.getArcfileName();
226            String currentCs = cs.getChecksum(filename);
227            String incorrectCs = msg.getIncorrectChecksum();
228
229            // ensure that the entry actually exists.
230            if (currentCs == null) {
231                // This exception is logged later.
232                throw new IllegalState("Cannot correct an entry for the file '" + filename
233                        + "', since it is not within the archive.");
234            }
235
236            // Check credentials
237            String credentialsReceived = msg.getCredentials();
238            if (credentialsReceived == null || credentialsReceived.isEmpty()
239                    || !credentialsReceived.equals(Settings.get(ArchiveSettings.ENVIRONMENT_THIS_CREDENTIALS))) {
240                throw new IllegalState("The received credentials '" + credentialsReceived
241                        + "' were invalid. The entry of " + "file '" + filename + "' will not be corrected.");
242            }
243
244            // check that the current checksum is incorrect as supposed.
245            if (!currentCs.equals(incorrectCs)) {
246                throw new IllegalState("Wrong checksum for the entry for file '" + filename + "' has the checksum '"
247                        + currentCs + "', " + "though it was supposed to have the checksum '" + incorrectCs + "'.");
248            }
249
250            // retrieve the data as a file.
251            correctFile = File.createTempFile("correct", filename, FileUtils.getTempDir());
252            msg.getData(correctFile);
253
254            // Log and notify
255            String warning = "The record for file '" + filename + "' is being corrected at '"
256                    + Settings.get(CommonSettings.USE_REPLICA_ID) + "'";
257            log.warn(warning);
258            NotificationsFactory.getInstance().notify(warning, NotificationType.WARNING);
259
260            // put the file into the archive.
261            File badFile = cs.correct(filename, correctFile);
262
263            // Send the file containing the removed entry back.
264            msg.setRemovedFile(RemoteFileFactory.getMovefileInstance(badFile));
265        } catch (Throwable t) {
266            // Handle errors.
267            log.warn("Cannot handle CorrectMessage: '{}'", msg, t);
268            msg.setNotOk(t);
269        } finally {
270            // log and reply at the end.
271            log.info("Replying CorrectMessage: {}", msg.toString());
272            jmsCon.reply(msg);
273
274            // cleanup the data file
275            if (correctFile != null) {
276                FileUtils.remove(correctFile);
277            }
278        }
279    }
280
281    /**
282     * Method for retrieving the checksum of a record.
283     *
284     * @param msg The GetChecksumMessage which contains the name of the record to have its checksum retrieved.
285     * @throws ArgumentNotValid If the message is null.
286     */
287    public void visit(GetChecksumMessage msg) throws ArgumentNotValid {
288        ArgumentNotValid.checkNotNull(msg, "GetChecksumMessage msg");
289
290        log.debug("Receiving GetChecksumMessage: {}", msg.toString());
291        try {
292            // get the name of the arc file
293            String filename = msg.getArcfileName();
294            // get the checksum of the arc file
295            String checksum = cs.getChecksum(filename);
296
297            // Check if the checksum was found. If not throw exception.
298            if (checksum == null || checksum.isEmpty()) {
299                // The error is logged, when the exception is caught.
300                throw new IllegalState("Cannot fetch checksum of an entry, " + filename
301                        + ", which is not within the archive.");
302            }
303
304            // send the checksum of the arc file.
305            msg.setChecksum(checksum);
306        } catch (Throwable e) {
307            // Handle errors (if the file cannot be found).
308            log.warn("Cannot handle '{}' containing the message: {}", msg.getClass().getName(), msg, e);
309            msg.setNotOk(e);
310        } finally {
311            // TODO this should be set elsewhere.
312            msg.setIsReply();
313            // log the message and reply.
314            log.info("Replying GetChecksumMessage: {}", msg.toString());
315            jmsCon.reply(msg);
316        }
317    }
318
319    /**
320     * Method for retrieving all the filenames within the archive.
321     *
322     * @param msg The GetAllFilenamesMessage.
323     * @throws ArgumentNotValid If the GetAllFilenamesMessages is null.
324     */
325    public void visit(GetAllFilenamesMessage msg) throws ArgumentNotValid {
326        ArgumentNotValid.checkNotNull(msg, "GetAllFilenamesMessage msg");
327        log.debug("Receiving GetAllFilenamesMessage: {}", msg.toString());
328
329        try {
330            // get all the file names
331            msg.setFile(cs.getAllFilenames());
332        } catch (Throwable e) {
333            log.warn("Cannot retrieve the filenames to reply on the {} : {}", msg.getClass().getName(), msg, e);
334            msg.setNotOk(e);
335        } finally {
336            // log the message and reply.
337            log.info("Replying GetAllFilenamesMessage: {}", msg.toString());
338            jmsCon.reply(msg);
339        }
340    }
341
342    /**
343     * Method for retrieving a map containing all the checksums and their corresponding filenames within the archive.
344     *
345     * @param msg The GetAllChecksumMessage.
346     * @throws ArgumentNotValid If the GetAllChecksumMessage is null.
347     */
348    public void visit(GetAllChecksumsMessage msg) throws ArgumentNotValid {
349        ArgumentNotValid.checkNotNull(msg, "GetAllChecksumsMessage msg");
350        log.debug("Receiving GetAllChecksumsMessage: {}", msg.toString());
351
352        try {
353            msg.setFile(cs.getArchiveAsFile());
354        } catch (Throwable e) {
355            log.warn("Cannot retrieve all the checksums.", e);
356            msg.setNotOk(e);
357        } finally {
358            // log the message and reply
359            log.info("Replying GetAllChecksumsMessage: {}", msg.toString());
360            jmsCon.reply(msg);
361        }
362    }
363
364}