001/* 002 * #%L 003 * Netarchivesuite - archive 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.archive.checksum.distribute; 024 025import java.io.File; 026 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030import dk.netarkivet.archive.ArchiveSettings; 031import dk.netarkivet.archive.bitarchive.distribute.UploadMessage; 032import dk.netarkivet.archive.checksum.ChecksumArchive; 033import dk.netarkivet.common.CommonSettings; 034import dk.netarkivet.common.distribute.Channels; 035import dk.netarkivet.common.distribute.JMSConnectionFactory; 036import dk.netarkivet.common.distribute.RemoteFile; 037import dk.netarkivet.common.distribute.RemoteFileFactory; 038import dk.netarkivet.common.exceptions.ArgumentNotValid; 039import dk.netarkivet.common.exceptions.IllegalState; 040import dk.netarkivet.common.exceptions.UnknownID; 041import dk.netarkivet.common.utils.FileUtils; 042import dk.netarkivet.common.utils.NotificationType; 043import dk.netarkivet.common.utils.NotificationsFactory; 044import dk.netarkivet.common.utils.Settings; 045import dk.netarkivet.common.utils.SystemUtils; 046 047/** 048 * The server for the ChecksumFileApplication. Used for communication between the ArcRepository and the checksum 049 * archive. 050 */ 051public class ChecksumFileServer extends ChecksumArchiveServer { 052 053 /** The logger used by this class. */ 054 private static final Logger log = LoggerFactory.getLogger(ChecksumFileServer.class); 055 056 /** The instance of this server. */ 057 protected static ChecksumFileServer instance; 058 059 /** The archive which contain the actual data. */ 060 protected ChecksumArchive cs; 061 062 /** The character to separate the applicationInstanceId and the IP address. */ 063 public static final String APPLICATION_ID_SEPARATOR = "_"; 064 065 private boolean usePrecomputedChecksumDuringUpload; 066 067 /** 068 * Returns the unique instance of this class. 069 * <p> 070 * The server creates an instance of the checksum it creates access to and starts to listen to a JMS messages on the 071 * incoming JMS queue. 072 * <p> 073 * <p> 074 * Should this do the heart beats to a monitor? This would be quite odd, since Checksum does not use a monitor. 075 * 076 * @return This instance. 077 */ 078 public static ChecksumFileServer getInstance() { 079 if (instance == null) { 080 instance = new ChecksumFileServer(); 081 } 082 return instance; 083 } 084 085 /** 086 * Constructor. 087 */ 088 private ChecksumFileServer() { 089 // log that this instance is been invoked. 090 log.info("Initialising the ChecksumFileServer."); 091 092 // get the instance of the checksum archive 093 cs = ChecksumArchiveFactory.getInstance(); 094 095 // initialise the JMSConnection. 096 jmsCon = JMSConnectionFactory.getInstance(); 097 098 // initialise the channel. 099 theCR = Channels.getTheCR(); 100 101 // Start listening to the channel. 102 jmsCon.setListener(theCR, this); 103 104 // create the application identifier 105 checksumAppId = createAppId(); 106 107 usePrecomputedChecksumDuringUpload = Settings.getBoolean(ArchiveSettings.CHECKSUM_USE_PRECOMPUTED_CHECKSUM_DURING_UPLOAD); 108 109 // log that this instance has successfully been invoked. 110 log.info("ChecksumFileServer '{}' initialised. Using precomputedChecksums during upload: {}", checksumAppId, usePrecomputedChecksumDuringUpload); 111 } 112 113 /** 114 * Method for closing the instance. 115 */ 116 public void close() { 117 log.info("ChecksumFileServer '{}' closing down.", checksumAppId); 118 cleanup(); 119 if (jmsCon != null) { 120 jmsCon.removeListener(theCR, this); 121 jmsCon = null; 122 } 123 log.info("ChecksumFileServer '{}' closed down.", checksumAppId); 124 } 125 126 /** 127 * Method for cleaning up, when closing this instance down. 128 */ 129 public void cleanup() { 130 instance = null; 131 cs.cleanup(); 132 } 133 134 /** 135 * Method for retrieving the identification of this application. 136 * 137 * @return The id of this application. 138 */ 139 public String getAppId() { 140 return checksumAppId; 141 } 142 143 /** 144 * Method for creating the identification for this application. 145 * 146 * @return The id of this application. 147 */ 148 protected String createAppId() { 149 String id; 150 // Create an id with the IP address of this current host 151 id = SystemUtils.getLocalIP(); 152 153 // Append an underscore and APPLICATION_INSTANCE_ID from settings 154 // to the id, if specified in settings. 155 // If no APPLICATION_INSTANCE_ID is found do nothing. 156 try { 157 String applicationInstanceId = Settings.get(CommonSettings.APPLICATION_INSTANCE_ID); 158 if (!applicationInstanceId.isEmpty()) { 159 id += APPLICATION_ID_SEPARATOR + applicationInstanceId; 160 } 161 } catch (UnknownID e) { 162 // Ignore the fact, that there is no APPLICATION_INSTANCE_ID in 163 // settings 164 log.warn("No setting APPLICATION_INSTANCE_ID found in settings: ", e); 165 } 166 return id; 167 } 168 169 /** 170 * The method for uploading arc files. Note that cleanup of the upload file embedded in the message is delegated the 171 * method {@link ChecksumArchive#upload(RemoteFile, String)} 172 * 173 * @param msg The upload message, containing the file to upload. 174 * @throws ArgumentNotValid If the UploadMessage is null. 175 */ 176 public void visit(UploadMessage msg) throws ArgumentNotValid { 177 ArgumentNotValid.checkNotNull(msg, "UploadMessage msg"); 178 log.debug("Receiving UploadMessage: " + msg.toString()); 179 try { 180 try { 181 if (usePrecomputedChecksumDuringUpload) { 182 cs.upload(msg.getPrecomputedChecksum(), msg.getArcfileName()); 183 } else { 184 cs.upload(msg.getRemoteFile(), msg.getArcfileName()); 185 } 186 } catch (Throwable e) { 187 log.warn("Cannot process upload message '{}'", msg, e); 188 msg.setNotOk(e); 189 } finally { // check if enough space 190 if (!cs.hasEnoughSpace()) { 191 jmsCon.removeListener(theCR, this); 192 String errMsg = "Not enough space any more. Stopped listening to messages from " 193 + theCR + ". Restart application after fixing problem"; 194 195 log.warn(errMsg); 196 NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR); 197 } 198 } 199 } catch (Throwable e) { 200 log.warn("Cannnot remove listener after upload message '{}'", msg, e); 201 } finally { 202 log.debug("Replying to UploadMessage: {}", msg.toString()); 203 jmsCon.reply(msg); 204 } 205 } 206 207 /** 208 * Method for correcting an entry in the archive. It start by ensuring that the file exists, then it checks the 209 * credentials. Then it is checked whether the "bad entry" does have the "bad checksum". If no problems occurred, 210 * then the bad entry will be corrected by the archive (the bad entry is removed from the archive file and put into 211 * the "wrong entry" file. Then the new entry is placed in the archive file. 212 * <p> 213 * If it fails in any of the above, then the method fails (throws an exception which is caught and use for replying 214 * NotOk to the message). 215 * 216 * @param msg The message containing the correct instance of the file to correct. 217 * @throws ArgumentNotValid If the correct message is null. 218 */ 219 public void visit(CorrectMessage msg) throws ArgumentNotValid { 220 ArgumentNotValid.checkNotNull(msg, "CorrectMessage msg"); 221 log.debug("Receiving correct message: {}", msg.toString()); 222 // the file for containing the received file from the message. 223 File correctFile = null; 224 try { 225 String filename = msg.getArcfileName(); 226 String currentCs = cs.getChecksum(filename); 227 String incorrectCs = msg.getIncorrectChecksum(); 228 229 // ensure that the entry actually exists. 230 if (currentCs == null) { 231 // This exception is logged later. 232 throw new IllegalState("Cannot correct an entry for the file '" + filename 233 + "', since it is not within the archive."); 234 } 235 236 // Check credentials 237 String credentialsReceived = msg.getCredentials(); 238 if (credentialsReceived == null || credentialsReceived.isEmpty() 239 || !credentialsReceived.equals(Settings.get(ArchiveSettings.ENVIRONMENT_THIS_CREDENTIALS))) { 240 throw new IllegalState("The received credentials '" + credentialsReceived 241 + "' were invalid. The entry of " + "file '" + filename + "' will not be corrected."); 242 } 243 244 // check that the current checksum is incorrect as supposed. 245 if (!currentCs.equals(incorrectCs)) { 246 throw new IllegalState("Wrong checksum for the entry for file '" + filename + "' has the checksum '" 247 + currentCs + "', " + "though it was supposed to have the checksum '" + incorrectCs + "'."); 248 } 249 250 // retrieve the data as a file. 251 correctFile = File.createTempFile("correct", filename, FileUtils.getTempDir()); 252 msg.getData(correctFile); 253 254 // Log and notify 255 String warning = "The record for file '" + filename + "' is being corrected at '" 256 + Settings.get(CommonSettings.USE_REPLICA_ID) + "'"; 257 log.warn(warning); 258 NotificationsFactory.getInstance().notify(warning, NotificationType.WARNING); 259 260 // put the file into the archive. 261 File badFile = cs.correct(filename, correctFile); 262 263 // Send the file containing the removed entry back. 264 msg.setRemovedFile(RemoteFileFactory.getMovefileInstance(badFile)); 265 } catch (Throwable t) { 266 // Handle errors. 267 log.warn("Cannot handle CorrectMessage: '{}'", msg, t); 268 msg.setNotOk(t); 269 } finally { 270 // log and reply at the end. 271 log.info("Replying CorrectMessage: {}", msg.toString()); 272 jmsCon.reply(msg); 273 274 // cleanup the data file 275 if (correctFile != null) { 276 FileUtils.remove(correctFile); 277 } 278 } 279 } 280 281 /** 282 * Method for retrieving the checksum of a record. 283 * 284 * @param msg The GetChecksumMessage which contains the name of the record to have its checksum retrieved. 285 * @throws ArgumentNotValid If the message is null. 286 */ 287 public void visit(GetChecksumMessage msg) throws ArgumentNotValid { 288 ArgumentNotValid.checkNotNull(msg, "GetChecksumMessage msg"); 289 290 log.debug("Receiving GetChecksumMessage: {}", msg.toString()); 291 try { 292 // get the name of the arc file 293 String filename = msg.getArcfileName(); 294 // get the checksum of the arc file 295 String checksum = cs.getChecksum(filename); 296 297 // Check if the checksum was found. If not throw exception. 298 if (checksum == null || checksum.isEmpty()) { 299 // The error is logged, when the exception is caught. 300 throw new IllegalState("Cannot fetch checksum of an entry, " + filename 301 + ", which is not within the archive."); 302 } 303 304 // send the checksum of the arc file. 305 msg.setChecksum(checksum); 306 } catch (Throwable e) { 307 // Handle errors (if the file cannot be found). 308 log.warn("Cannot handle '{}' containing the message: {}", msg.getClass().getName(), msg, e); 309 msg.setNotOk(e); 310 } finally { 311 // TODO this should be set elsewhere. 312 msg.setIsReply(); 313 // log the message and reply. 314 log.info("Replying GetChecksumMessage: {}", msg.toString()); 315 jmsCon.reply(msg); 316 } 317 } 318 319 /** 320 * Method for retrieving all the filenames within the archive. 321 * 322 * @param msg The GetAllFilenamesMessage. 323 * @throws ArgumentNotValid If the GetAllFilenamesMessages is null. 324 */ 325 public void visit(GetAllFilenamesMessage msg) throws ArgumentNotValid { 326 ArgumentNotValid.checkNotNull(msg, "GetAllFilenamesMessage msg"); 327 log.debug("Receiving GetAllFilenamesMessage: {}", msg.toString()); 328 329 try { 330 // get all the file names 331 msg.setFile(cs.getAllFilenames()); 332 } catch (Throwable e) { 333 log.warn("Cannot retrieve the filenames to reply on the {} : {}", msg.getClass().getName(), msg, e); 334 msg.setNotOk(e); 335 } finally { 336 // log the message and reply. 337 log.info("Replying GetAllFilenamesMessage: {}", msg.toString()); 338 jmsCon.reply(msg); 339 } 340 } 341 342 /** 343 * Method for retrieving a map containing all the checksums and their corresponding filenames within the archive. 344 * 345 * @param msg The GetAllChecksumMessage. 346 * @throws ArgumentNotValid If the GetAllChecksumMessage is null. 347 */ 348 public void visit(GetAllChecksumsMessage msg) throws ArgumentNotValid { 349 ArgumentNotValid.checkNotNull(msg, "GetAllChecksumsMessage msg"); 350 log.debug("Receiving GetAllChecksumsMessage: {}", msg.toString()); 351 352 try { 353 msg.setFile(cs.getArchiveAsFile()); 354 } catch (Throwable e) { 355 log.warn("Cannot retrieve all the checksums.", e); 356 msg.setNotOk(e); 357 } finally { 358 // log the message and reply 359 log.info("Replying GetAllChecksumsMessage: {}", msg.toString()); 360 jmsCon.reply(msg); 361 } 362 } 363 364}