/*
 * #%L
 * Netarchivesuite - archive
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.archive.arcrepository.distribute;

import java.io.File;
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.archive.arcrepository.bitpreservation.AdminDataMessage;
import dk.netarkivet.archive.bitarchive.distribute.BatchMessage;
import dk.netarkivet.archive.bitarchive.distribute.BatchReplyMessage;
import dk.netarkivet.archive.bitarchive.distribute.GetFileMessage;
import dk.netarkivet.archive.bitarchive.distribute.GetMessage;
import dk.netarkivet.archive.bitarchive.distribute.RemoveAndGetFileMessage;
import dk.netarkivet.archive.checksum.distribute.CorrectMessage;
import dk.netarkivet.archive.checksum.distribute.GetAllChecksumsMessage;
import dk.netarkivet.archive.checksum.distribute.GetAllFilenamesMessage;
import dk.netarkivet.archive.checksum.distribute.GetChecksumMessage;
import dk.netarkivet.common.distribute.ChannelID;
import dk.netarkivet.common.distribute.Channels;
import dk.netarkivet.common.distribute.JMSConnectionFactory;
import dk.netarkivet.common.distribute.NetarkivetMessage;
import dk.netarkivet.common.distribute.RemoteFile;
import dk.netarkivet.common.distribute.RemoteFileFactory;
import dk.netarkivet.common.distribute.Synchronizer;
import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClient;
import dk.netarkivet.common.distribute.arcrepository.BatchStatus;
import dk.netarkivet.common.distribute.arcrepository.BitarchiveRecord;
import dk.netarkivet.common.distribute.arcrepository.Replica;
import dk.netarkivet.common.distribute.arcrepository.ReplicaStoreState;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.ExceptionUtils;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.NotificationType;
import dk.netarkivet.common.utils.NotificationsFactory;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.batch.FileBatchJob;

/**
 * Client side usage of an arc repository. All requests are forwarded to the ArcRepositoryServer over the network. get
 * and store messages are retried a number of time before giving up, and will timeout after a specified time.
 */
public class JMSArcRepositoryClient extends Synchronizer implements ArcRepositoryClient {

    /** The default place in classpath where the settings file can be found. */
    private static String defaultSettingsClasspath = "dk/netarkivet/archive/arcrepository/distribute/JMSArcRepositoryClientSettings.xml";

    /*
     * The static initialiser is called when the class is loaded. It will add default values for all settings defined in
     * this class, by loading them from a settings.xml file in classpath.
     */
    static {
        Settings.addDefaultClasspathSettings(defaultSettingsClasspath);
    }

    /** The amount of milliseconds in a second. 1000. */
    private static final int MILLISECONDS_PER_SECOND = 1000;

    /** the one and only JMSArcRepositoryClient instance. */
    private static JMSArcRepositoryClient instance;

    /** Logging output place. */
    protected static final Logger log = LoggerFactory.getLogger(JMSArcRepositoryClient.class);

    /** Listens on this queue for replies. */
    private final ChannelID replyQ;

    /** The number of times to try sending a store message before giving up. */
    private long storeRetries;

    /** The length of time to wait for a store reply before giving up. */
    private long storeTimeout;

    /** The length of time to wait for a get reply before giving up. */
    private long getTimeout;

    // NOTE: The constants defining setting names below are left non-final on
    // purpose! Otherwise, the static initialiser that loads default values
    // will not run: a "static final String" initialised with a literal is a
    // compile-time constant, which the compiler inlines into referencing
    // classes, so reading it from elsewhere does not initialise this class.

    /**
     * <b>settings.common.arcrepositoryClient.getTimeout</b>: <br>
     * The setting for how many milliseconds we will wait before giving up on a lookup request to the Arcrepository.
     */
    public static String ARCREPOSITORY_GET_TIMEOUT = "settings.common.arcrepositoryClient.getTimeout";

    /**
     * <b>settings.common.arcrepositoryClient.storeRetries</b>: <br>
     * The setting for the number of times to try sending a store message before failing, including the first attempt.
     */
    public static String ARCREPOSITORY_STORE_RETRIES = "settings.common.arcrepositoryClient.storeRetries";

    /**
     * <b>settings.common.arcrepositoryClient.storeTimeout</b>: <br>
     * the setting for the timeout in milliseconds before retrying when calling {@link ArcRepositoryClient#store(File)}.
     */
    public static String ARCREPOSITORY_STORE_TIMEOUT = "settings.common.arcrepositoryClient.storeTimeout";

    /** Reads the retry/timeout settings and adds this Synchronizer as listener on a jms connection. */
    protected JMSArcRepositoryClient() {
        storeRetries = Settings.getLong(ARCREPOSITORY_STORE_RETRIES);
        storeTimeout = Settings.getLong(ARCREPOSITORY_STORE_TIMEOUT);
        getTimeout = Settings.getLong(ARCREPOSITORY_GET_TIMEOUT);

        log.info(
                "JMSArcRepositoryClient will retry a store {} times and timeout on each try after {} milliseconds, and timeout on each getrequest after {} milliseconds.",
                storeRetries, storeTimeout, getTimeout);
        replyQ = Channels.getThisReposClient();
        JMSConnectionFactory.getInstance().setListener(replyQ, this);
        log.info("JMSArcRepository listens for replies on channel '{}'", replyQ);
    }

    /**
     * Get an JMSArcRepositoryClient instance. This is guaranteed to be a singleton.
     *
     * @return an JMSArcRepositoryClient instance.
     */
    public static synchronized JMSArcRepositoryClient getInstance() {
        if (instance == null) {
            instance = new JMSArcRepositoryClient();
        }
        return instance;
    }

    /** Removes this object as a JMS listener and forgets the singleton instance. */
    public void close() {
        // Same monitor as the static synchronized getInstance().
        synchronized (JMSArcRepositoryClient.class) {
            JMSConnectionFactory.getInstance().removeListener(replyQ, this);
            instance = null;
        }
    }

    /**
     * Sends a GetMessage on the "TheArcrepos" queue and waits for a reply. This is a blocking call. Returns null if no
     * message is returned within Settings.ARCREPOSITORY_GET_TIMEOUT
     *
     * @param arcfile The name of a file.
     * @param index The offset of the wanted record in the file
     * @return a BitarchiveRecord-object or null if request times out or object is not found.
     * @throws ArgumentNotValid If the given arcfile is null or empty, or the given index is negative.
     * @throws IOFailure If a wrong message is returned or the get operation failed.
     */
    public BitarchiveRecord get(String arcfile, long index) throws ArgumentNotValid, IOFailure {
        ArgumentNotValid.checkNotNullOrEmpty(arcfile, "arcfile");
        ArgumentNotValid.checkNotNegative(index, "index");
        log.debug("Requesting get of record '{}:{}'", arcfile, index);
        long start = System.currentTimeMillis();
        GetMessage requestGetMsg = new GetMessage(Channels.getTheRepos(), replyQ, arcfile, index);
        NetarkivetMessage replyNetMsg = sendAndWaitForOneReply(requestGetMsg, getTimeout);
        long timePassed = System.currentTimeMillis() - start;
        log.debug("Reply received after {} seconds", (timePassed / MILLISECONDS_PER_SECOND));
        if (replyNetMsg == null) {
            log.info("Request for record({}:{}) timed out after {} seconds. Returning null BitarchiveRecord", arcfile,
                    index, (getTimeout / MILLISECONDS_PER_SECOND));
            return null;
        }
        GetMessage replyGetMsg;
        try {
            replyGetMsg = (GetMessage) replyNetMsg;
        } catch (ClassCastException e) {
            throw new IOFailure("Received invalid argument reply: '" + replyNetMsg + "'", e);
        }
        if (!replyGetMsg.isOk()) {
            throw new IOFailure("GetMessage failed: '" + replyGetMsg.getErrMsg() + "'");
        }
        return replyGetMsg.getRecord();
    }

    /**
     * Synchronously retrieves a file from a bitarchive and places it in a local file. This is the interface for sending
     * GetFileMessage on the "TheArcrepos" queue. This is a blocking call.
     *
     * @param arcfilename Name of the arcfile to retrieve.
     * @param replica The bitarchive to retrieve the data from.
     * @param toFile Filename of a place where the file fetched can be put.
     * @throws ArgumentNotValid If the arcfilename are null or empty, or if either replica or toFile is null.
     * @throws IOFailure if there are problems getting a reply or the file could not be found.
     */
    public void getFile(String arcfilename, Replica replica, File toFile) throws ArgumentNotValid, IOFailure {
        ArgumentNotValid.checkNotNullOrEmpty(arcfilename, "arcfilename");
        ArgumentNotValid.checkNotNull(replica, "replica");
        ArgumentNotValid.checkNotNull(toFile, "toFile");

        log.debug("Requesting get of file '{}' from '{}'", arcfilename, replica);
        GetFileMessage gfMsg = new GetFileMessage(Channels.getTheRepos(), replyQ, arcfilename, replica.getId());
        GetFileMessage getFileMessage = (GetFileMessage) sendAndWaitForOneReply(gfMsg, 0);
        if (getFileMessage == null) {
            throw new IOFailure("GetFileMessage timed out before returning. File not found?");
        } else if (!getFileMessage.isOk()) {
            throw new IOFailure("GetFileMessage failed: " + getFileMessage.getErrMsg());
        } else {
            getFileMessage.getData(toFile);
        }
    }

    /**
     * Sends a StoreMessage via the synchronized JMS connection method sendAndWaitForOneReply(). After a successful
     * storage operation, both the local copy of the file and the copy on the ftp server are deleted.
     *
     * @param file A file to be stored. Must exist.
     * @throws IOFailure thrown if store is unsuccessful, or failed to clean up files locally or on the ftp server after
     * the store operation.
     * @throws ArgumentNotValid if file parameter is null or file is not an existing file.
     */
    public void store(File file) throws IOFailure, ArgumentNotValid {
        ArgumentNotValid.checkNotNull(file, "file");
        ArgumentNotValid.checkTrue(file.isFile(), "The file '" + file.getPath() + "' is not an existing file.");

        // Collects the failure description of every attempt for the final error report.
        StringBuilder messages = new StringBuilder();
        for (long i = 0; i < storeRetries; i++) {
            StoreMessage outMsg = null;
            try {
                log.debug("Sending a StoreMessage with file '{}'", file.getPath());
                outMsg = new StoreMessage(replyQ, file);
                NetarkivetMessage replyMsg = sendAndWaitForOneReply(outMsg, storeTimeout);
                if (replyMsg != null && replyMsg.isOk()) {
                    try {
                        FileUtils.removeRecursively(file);
                    } catch (IOFailure e) {
                        log.warn("Failed to clean up '{}'", file.getAbsolutePath(), e);
                        // Not fatal
                    }
                    return;
                } else if (replyMsg == null) {
                    String msg = "Timed out" + " while waiting for reply on store of file '" + file.getPath()
                            + "' on attempt number " + (i + 1) + " of " + storeRetries;
                    log.warn(msg);
                    messages.append(msg).append("\n");
                } else {
                    String msg = "The returned message '" + replyMsg + "' was not ok while waiting for reply on store "
                            + "of file '" + file.getPath() + "' on attempt number " + (i + 1) + " of " + storeRetries
                            + ". Error message was '" + replyMsg.getErrMsg() + "'";
                    log.warn(msg);
                    messages.append(msg).append("\n");
                }
            } catch (Exception e) {
                String msg = "Client-side exception occurred while storing '" + file.getPath() + "' on attempt number "
                        + (i + 1) + " of " + storeRetries + ".";
                log.warn(msg, e);
                messages.append(msg).append("\n");
                messages.append(ExceptionUtils.getStackTrace(e));
            } finally {
                // Runs after every attempt, including the successful one that returns above.
                if (outMsg != null) {
                    cleanUpAfterStore(outMsg);
                }
            }
        }
        String errMsg = "Could not store '" + file.getPath() + "' after " + storeRetries + " attempts. Giving up.\n"
                + messages;
        log.error(errMsg);
        NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR);
        throw new IOFailure(errMsg);
    }

    /**
     * Tries to clean up a file on the FTP server after a store operation. Will not throw exception on error, merely log
     * exception.
     *
     * @param m the StoreMessage whose remote file should be cleaned up
     */
    private void cleanUpAfterStore(StoreMessage m) {
        RemoteFile rf;
        try {
            rf = m.getRemoteFile();
        } catch (Exception e) {
            log.warn("Could not get remote file object from message {}", m, e);
            return;
        }
        try {
            rf.cleanup();
        } catch (Exception e) {
            log.warn("Could not delete remote file on ftp server: {}", rf, e);
        }
    }

    /**
     * Runs a batch job on each file in the ArcRepository.
     * <p>
     * Note: The id for the batchjob is the empty string, which removes the possibility of terminating the batchjob
     * remotely while it is running.
     *
     * @param job An object that implements the FileBatchJob interface. The initialize() method will be called before
     * processing and the finish() method will be called afterwards. The process() method will be called with each File
     * entry. An optional function postProcess() allows handling the combined results of the batchjob, e.g. summing the
     * results, sorting, etc.
     * @param replicaId The archive to execute the job on.
     * @param args The arguments for the batchjob.
     * @return The status of the batch job after it ended.
     */
    public BatchStatus batch(FileBatchJob job, String replicaId, String... args) {
        return batch(job, replicaId, "", args);
    }

    /**
     * Runs a batch job on each file in the ArcRepository.
     *
     * @param job An object that implements the FileBatchJob interface. The initialize() method will be called before
     * processing and the finish() method will be called afterwards. The process() method will be called with each File
     * entry. An optional function postProcess() allows handling the combined results of the batchjob, e.g. summing the
     * results, sorting, etc.
     * @param replicaId The archive to execute the job on.
     * @param batchId The id for the batch process.
     * @param args The arguments for the batchjob. This is allowed to be null.
     * @return The status of the batch job after it ended.
     * @throws ArgumentNotValid If the job is null or the replicaId is either null or the empty string.
     * @throws IOFailure If no reply is received, or if the job failed and no result file is returned.
     */
    public BatchStatus batch(FileBatchJob job, String replicaId, String batchId, String... args) throws IOFailure,
            ArgumentNotValid {
        ArgumentNotValid.checkNotNull(job, "FileBatchJob job");
        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId");

        log.debug("Starting batchjob '{}' running on replica '{}'", job, replicaId);
        BatchMessage bMsg = new BatchMessage(Channels.getTheRepos(), replyQ, job, replicaId, batchId, args);
        log.debug("Sending batchmessage to queue '{}' with replyqueue set to '{}'", Channels.getTheRepos(), replyQ);
        BatchReplyMessage brMsg = (BatchReplyMessage) sendAndWaitForOneReply(bMsg, 0);
        if (brMsg == null) {
            // Guard against a missing reply, as getFile() and correct() do, instead of failing with a
            // NullPointerException below.
            throw new IOFailure("No reply was received for the batch job '" + bMsg + "'");
        }
        if (!brMsg.isOk()) {
            String msg = "The batch job '" + bMsg + "' resulted in the following " + "error: " + brMsg.getErrMsg();
            log.warn(msg);
            if (brMsg.getResultFile() == null) {
                // If no result is available at all, this is non-recoverable
                throw new IOFailure(msg);
            }
        }
        return new BatchStatus(brMsg.getFilesFailed(), brMsg.getNoOfFilesProcessed(), brMsg.getResultFile(),
                job.getExceptions());
    }

    /**
     * Request update of admin data to specific state.
     *
     * @param fileName The file for which admin data should be updated.
     * @param replicaId The id of the replica that the administrative data for fileName is wrong for.
     * @param newval The new value in admin data.
     * @throws ArgumentNotValid If one of the arguments are invalid (null or empty string).
     * @throws IOFailure If the reply to the request update timed out.
     */
    public void updateAdminData(String fileName, String replicaId, ReplicaStoreState newval) throws ArgumentNotValid,
            IOFailure {
        ArgumentNotValid.checkNotNullOrEmpty(fileName, "String fileName");
        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId");
        ArgumentNotValid.checkNotNull(newval, "ReplicaStoreState newval");

        String msg = "Requesting update of admin data for file '" + fileName + "' replica '" + replicaId
                + "' to state " + newval;
        log.warn(msg);
        NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING);
        AdminDataMessage aMsg = new AdminDataMessage(fileName, replicaId, newval);
        // We only need to know that a reply to our message has arrived.
        // The replyMessage is thrown away, because it does not contain
        // any more useful knowledge.
        sendAndWaitForOneReply(aMsg, 0);
    }

    /**
     * Request update of admin data to specific checksum.
     *
     * @param filename The file for which admin data should be updated.
     * @param checksum The new checksum for the file
     * @throws ArgumentNotValid If the filename or the checksum is null or the empty string.
     */
    public void updateAdminChecksum(String filename, String checksum) {
        ArgumentNotValid.checkNotNullOrEmpty(filename, "filename");
        ArgumentNotValid.checkNotNullOrEmpty(checksum, "checksum");

        String msg = "Requesting update of admin data for file '" + filename + "' to checksum '" + checksum + "'";
        log.warn(msg);
        NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING);
        AdminDataMessage aMsg = new AdminDataMessage(filename, checksum);
        // We only need to know that a reply to our message has arrived.
        // The replyMessage is thrown away, because it does not contain
        // any more useful knowledge.
        sendAndWaitForOneReply(aMsg, 0);
    }

    /**
     * Removes a file from the bitarchives, if given credentials and checksum are correct.
     *
     * @param fileName The name of the file to delete
     * @param bitarchiveId The id of the bitarchive to delete the file in
     * @param checksum The checksum of the deleted file
     * @param credentials The credentials used to delete the file
     * @return The file that was removed
     * @throws ArgumentNotValid if arguments are null or equal to the empty string
     * @throws IOFailure if we could not delete the remote file, or there was no response to our RemoveAndGetFileMessage
     * within the allotted time defined by the setting {@link JMSArcRepositoryClient#ARCREPOSITORY_STORE_TIMEOUT}.
     */
    public File removeAndGetFile(String fileName, String bitarchiveId, String checksum, String credentials)
            throws IOFailure, ArgumentNotValid {
        ArgumentNotValid.checkNotNullOrEmpty(fileName, "filename");
        ArgumentNotValid.checkNotNullOrEmpty(bitarchiveId, "bitarchiveName");
        ArgumentNotValid.checkNotNullOrEmpty(checksum, "checksum");
        ArgumentNotValid.checkNotNullOrEmpty(credentials, "credentials");

        String msg = "Requesting remove of file '" + fileName + "' with checksum '" + checksum + "' from bitarchive '"
                + bitarchiveId + "'";
        log.warn(msg);
        NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING);
        RemoveAndGetFileMessage aMsg = new RemoveAndGetFileMessage(Channels.getTheRepos(),
                Channels.getThisReposClient(), fileName, bitarchiveId, checksum, credentials);
        RemoveAndGetFileMessage replyMsg = (RemoveAndGetFileMessage) sendAndWaitForOneReply(aMsg, storeTimeout);

        // The removed file is returned, move to temp location
        if (replyMsg != null) {
            if (replyMsg.isOk()) {
                File removedFile = replyMsg.getData();
                log.debug("Stored copy of removed file: {} as: {}", fileName, removedFile.getAbsolutePath());
                return removedFile;
            } else {
                throw new IOFailure("Could not delete remote file: " + replyMsg.getErrMsg());
            }
        } else {
            throw new IOFailure("Request timed out while requesting remove of " + "file '" + fileName
                    + "' in bitarchive '" + bitarchiveId + "'");
        }
    }

    /**
     * Retrieves all the checksum from the replica through a GetAllChecksumMessage.
     * <p>
     * This is the checksum archive alternative to running a ChecksumBatchJob.
     *
     * @param replicaId The id of the replica from which the checksums should be retrieved.
     * @return A file containing filename and checksum of all the files in an archive in the same format as a
     * ChecksumJob.
     * @throws IOFailure If the reply is not of type GetAllChecksumsMessage or if the file could not properly be
     * retrieved from the reply message or if no reply was received.
     * @throws ArgumentNotValid If the replicaId is null or empty.
     * @see dk.netarkivet.archive.checksum.distribute.GetAllChecksumsMessage
     */
    public File getAllChecksums(String replicaId) throws IOFailure, ArgumentNotValid {
        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId");
        log.debug("Sending GetAllChecksumMessage to replica '{}'.", replicaId);
        // time this.
        long start = System.currentTimeMillis();
        // make and send the message to the replica.
        GetAllChecksumsMessage gacMsg = new GetAllChecksumsMessage(Channels.getTheRepos(), replyQ, replicaId);
        NetarkivetMessage replyNetMsg = sendAndWaitForOneReply(gacMsg, 0);

        // calculate and log the time spent on handling the message.
        long timePassed = System.currentTimeMillis() - start;
        log.debug("Reply received after {} seconds.", (timePassed / MILLISECONDS_PER_SECOND));
        // check whether the output was valid.
        if (replyNetMsg == null) {
            // The request is sent without the get timeout, so report the time actually waited.
            throw new IOFailure("Request for all checksum timed out after " + (timePassed / MILLISECONDS_PER_SECOND)
                    + " seconds.");
        }
        // convert to the correct type of message.
        GetAllChecksumsMessage replyCSMsg;
        try {
            replyCSMsg = (GetAllChecksumsMessage) replyNetMsg;
        } catch (ClassCastException e) {
            throw new IOFailure("Received invalid reply message: '" + replyNetMsg + "'", e);
        }

        try {
            // retrieve the data from this message and place it in tempDir.
            File result = File.createTempFile("tmp", "tmp", FileUtils.getTempDir());
            replyCSMsg.getData(result);

            return result;
        } catch (IOException e) {
            throw new IOFailure("Cannot create a temporary file for retrieving "
                    + "the data remote from checksum message: " + replyCSMsg, e);
        }
    }

    /**
     * Retrieves the names of all the files in the replica through a GetAllFilenamesMessage.
     * <p>
     * This is the checksum archive alternative to running a FilelistBatchJob.
     *
     * @param replicaId The id of the replica from which the list of filenames should be retrieved.
     * @return A file with all the filenames within the archive of the given replica.
     * @throws IOFailure If the reply is not of type GetAllFilenamesMessage or if the file could not properly be
     * retrieved from the reply message or if no reply was received.
     * @throws ArgumentNotValid If the replicaId is null or empty.
     * @see dk.netarkivet.archive.checksum.distribute.GetAllFilenamesMessage
     */
    public File getAllFilenames(String replicaId) throws ArgumentNotValid, IOFailure {
        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId");
        log.debug("Sending GetAllFilenamesMessage to replica '{}'.", replicaId);
        // time this.
        long start = System.currentTimeMillis();
        // make and send the message to the replica.
        GetAllFilenamesMessage gafMsg = new GetAllFilenamesMessage(Channels.getTheRepos(), replyQ, replicaId);
        NetarkivetMessage replyNetMsg = sendAndWaitForOneReply(gafMsg, 0);

        // calculate and log the time spent on handling the message.
        long timePassed = System.currentTimeMillis() - start;
        log.debug("Reply received after {} seconds.", (timePassed / MILLISECONDS_PER_SECOND));
        // check whether the output was valid.
        if (replyNetMsg == null) {
            // The request is sent without the get timeout, so report the time actually waited.
            throw new IOFailure("Request for all filenames timed out after " + (timePassed / MILLISECONDS_PER_SECOND)
                    + " seconds.");
        }
        // convert to the correct type of message.
        GetAllFilenamesMessage replyCSMsg;
        try {
            replyCSMsg = (GetAllFilenamesMessage) replyNetMsg;
        } catch (ClassCastException e) {
            throw new IOFailure("Received invalid reply message: '" + replyNetMsg + "'", e);
        }

        try {
            // retrieve the data from this message.
            File result = File.createTempFile("tmp", "tmp", FileUtils.getTempDir());
            replyCSMsg.getData(result);

            return result;
        } catch (IOException e) {
            throw new IOFailure("Cannot create a temporary file for retrieving "
                    + " the data remote from checksum message: " + replyCSMsg, e);
        }
    }

    /**
     * Retrieves the checksum of a specific file.
     * <p>
     * This is the checksum archive alternative to running a ChecksumJob limited to a specific file.
     *
     * @param replicaId The ID of the replica to send the message.
     * @param filename The name of the file for whom the checksum should be retrieved.
     * @return The checksum of the file in the replica.
     * @throws IOFailure If the reply is not of type GetChecksumMessage. Or if no reply was received.
     * @throws ArgumentNotValid If either the replicaId of the filename is null or empty.
     */
    public String getChecksum(String replicaId, String filename) throws ArgumentNotValid, IOFailure {
        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId");
        ArgumentNotValid.checkNotNullOrEmpty(filename, "String filename");
        log.debug("Sending GetChecksumMessage to replica '{}' for file '{}'.", replicaId, filename);
        // time this.
        long start = System.currentTimeMillis();
        // make and send the message to the replica.
        GetChecksumMessage gcsMsg = new GetChecksumMessage(Channels.getTheRepos(), replyQ, filename, replicaId);
        NetarkivetMessage replyNetMsg = sendAndWaitForOneReply(gcsMsg, 0);
        // calculate and log the time spent on handling the message.
        long timePassed = System.currentTimeMillis() - start;
        log.debug("Reply received after {} seconds.", (timePassed / MILLISECONDS_PER_SECOND));
        // check whether the output was valid.
        if (replyNetMsg == null) {
            // The request is sent without the get timeout, so report the time actually waited.
            throw new IOFailure("Request for checksum timed out after " + (timePassed / MILLISECONDS_PER_SECOND)
                    + " seconds.");
        }

        // convert to the expected type of message.
        GetChecksumMessage replyCSMsg;
        try {
            replyCSMsg = (GetChecksumMessage) replyNetMsg;
        } catch (ClassCastException e) {
            throw new IOFailure("Received invalid reply message: '" + replyNetMsg + "'", e);
        }

        if (!replyCSMsg.isOk()) {
            log.warn("The reply message for retrieval of checksum was not OK. Tries to extract checksum anyway. {}",
                    replyCSMsg.getErrMsg());
        }
        return replyCSMsg.getChecksum();
    }

    /**
     * Method for correcting an entry in a replica. This is done by sending a correct message to the replica.
     * <p>
     * The file which is removed from the replica is put into the tempDir.
     *
     * @param replicaId The id of the replica to send the message.
     * @param checksum The checksum of the corrupt entry in the archive. It is important to validate that the checksum
     * actually is wrong before correcting the entry.
     * @param file The file to correct the entry in the archive of the replica.
     * @param credentials A string with the password for allowing changes inside an archive. If it does not correspond
     * to the credentials of the archive, the correction will not be allowed.
     * @return The corrupted file from the archive.
     * @throws IOFailure If the message is not handled properly.
     * @throws ArgumentNotValid If the replicaId, the checksum or the credentials are either null or empty, or if file
     * is null.
     */
    public File correct(String replicaId, String checksum, File file, String credentials) throws IOFailure,
            ArgumentNotValid {
        ArgumentNotValid.checkNotNullOrEmpty(replicaId, "String replicaId");
        ArgumentNotValid.checkNotNullOrEmpty(checksum, "String checksum");
        ArgumentNotValid.checkNotNull(file, "File file");
        ArgumentNotValid.checkNotNullOrEmpty(credentials, "String credentials");

        RemoteFile rm = RemoteFileFactory.getCopyfileInstance(file);
        CorrectMessage correctMsg = new CorrectMessage(Channels.getTheRepos(), replyQ, checksum, rm, replicaId,
                credentials);
        CorrectMessage responseMessage = (CorrectMessage) sendAndWaitForOneReply(correctMsg, 0);

        if (responseMessage == null) {
            throw new IOFailure("Correct Message timed out before returning." + " File not found?");
        } else if (!responseMessage.isOk()) {
            throw new IOFailure("CorrectMessage failed: " + responseMessage.getErrMsg());
        }

        // retrieve the wrong file.
        RemoteFile removedFile = responseMessage.getRemovedFile();
        try {
            File destFile = new File(FileUtils.getTempDir(), removedFile.getName());
            removedFile.copyTo(destFile);
            return destFile;
        } catch (Throwable e) {
            String errMsg = "Problems occured during retrieval of file " + "removed from archive.";
            log.warn(errMsg, e);
            throw new IOFailure(errMsg, e);
        }
    }

}