001/* 002 * #%L 003 * Netarchivesuite - archive 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.archive.arcrepository; 024 025import java.io.File; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import dk.netarkivet.archive.ArchiveSettings; 036import dk.netarkivet.archive.arcrepository.bitpreservation.AdminDataMessage; 037import dk.netarkivet.archive.arcrepository.distribute.ArcRepositoryServer; 038import dk.netarkivet.archive.arcrepository.distribute.StoreMessage; 039import dk.netarkivet.archive.arcrepositoryadmin.Admin; 040import dk.netarkivet.archive.arcrepositoryadmin.AdminFactory; 041import dk.netarkivet.archive.bitarchive.distribute.BatchReplyMessage; 042import dk.netarkivet.archive.bitarchive.distribute.BitarchiveClient; 043import dk.netarkivet.archive.bitarchive.distribute.RemoveAndGetFileMessage; 044import dk.netarkivet.archive.bitarchive.distribute.UploadMessage; 045import 
dk.netarkivet.archive.checksum.distribute.ChecksumClient; 046import dk.netarkivet.archive.checksum.distribute.GetChecksumMessage; 047import dk.netarkivet.archive.distribute.ReplicaClient; 048import dk.netarkivet.common.distribute.ChannelID; 049import dk.netarkivet.common.distribute.Channels; 050import dk.netarkivet.common.distribute.JMSConnectionFactory; 051import dk.netarkivet.common.distribute.NetarkivetMessage; 052import dk.netarkivet.common.distribute.NullRemoteFile; 053import dk.netarkivet.common.distribute.RemoteFile; 054import dk.netarkivet.common.distribute.arcrepository.Replica; 055import dk.netarkivet.common.distribute.arcrepository.ReplicaStoreState; 056import dk.netarkivet.common.distribute.arcrepository.ReplicaType; 057import dk.netarkivet.common.exceptions.ArgumentNotValid; 058import dk.netarkivet.common.exceptions.IOFailure; 059import dk.netarkivet.common.exceptions.IllegalState; 060import dk.netarkivet.common.exceptions.UnknownID; 061import dk.netarkivet.common.utils.CleanupIF; 062import dk.netarkivet.common.utils.FileUtils; 063import dk.netarkivet.common.utils.NotificationType; 064import dk.netarkivet.common.utils.NotificationsFactory; 065import dk.netarkivet.common.utils.Settings; 066import dk.netarkivet.common.utils.batch.ChecksumJob; 067 068/** 069 * The Arcrepository handles the communication with the different replicas. This class ensures that arc files are stored 070 * in all available replica and verifies that the storage process succeeded. Retrieval of data from a replica goes 071 * through the JMSArcRepositoryClient that contacts the appropriate (typically nearest) replica and retrieves data from 072 * this archive. Batch execution is sent to the bitarchive replica(s), since batch cannot be executed on checksum 073 * replicas. Correction operations are typically only allowed on one replica. 074 */ 075@SuppressWarnings({"deprecation"}) 076public class ArcRepository implements CleanupIF { 077 078 /** The log. 
*/
    private static final Logger log = LoggerFactory.getLogger(ArcRepository.class);

    /** The unique instance (singleton) of this class. */
    private static ArcRepository instance;

    /** The administration data associated with the arcrepository. */
    private Admin ad;

    /** The class which listens to messages sent to this instance of Arcrepository or its subclasses. */
    private ArcRepositoryServer arcReposhandler;

    /** A Map of a Replica and their corresponding ReplicaClient. From this Map the relevant channels can be found. */
    private final Map<Replica, ReplicaClient> connectedReplicas = new HashMap<Replica, ReplicaClient>();

    /** Map from MessageId to arcfiles for which there are outstanding checksum jobs. */
    private final Map<String, String> outstandingChecksumFiles = new HashMap<String, String>();

    /**
     * Map from filenames to remote files. Used for retrieving a remote file reference while a store operation is in
     * process.
     */
    private final Map<String, RemoteFile> outstandingRemoteFiles = new HashMap<String, RemoteFile>();

    /**
     * Map from filenames to the precomputed checksum carried by the StoreMessage of the store operation in process.
     * Filled and cleared together with {@link #outstandingRemoteFiles}; read back when an upload is retried.
     */
    private final Map<String, String> outstandingRemoteFilesC = new HashMap<String, String>();


    /**
     * Map from bitarchive names to Map from filenames to the number of times a file has been attempted uploaded to
     * the bitarchive.
     */
    private final Map<String, Map<String, Integer>> uploadRetries = new HashMap<String, Map<String, Integer>>();

    /**
     * Constructor for the ArcRepository. Connects the ArcRepository to all BitArchives, and initialises admin data.
     *
     * @throws IOFailure if admin data cannot be read/initialised or we cannot connect to some bitarchive.
     * @throws IllegalState if inconsistent channel info is given in settings.
116 */ 117 protected ArcRepository() throws IOFailure, IllegalState { 118 // UpdateableAdminData Throws IOFailure 119 this.ad = AdminFactory.getInstance(); 120 this.arcReposhandler = new ArcRepositoryServer(this); 121 122 initialiseReplicaClients(); 123 124 log.info("Starting the ArcRepository"); 125 } 126 127 /** 128 * Returns the unique ArcRepository instance. 129 * 130 * @return the instance. 131 * @throws IOFailure if admin data cannot be read/initialised or we cannot' connect to some bitarchive. 132 * @throws IllegalState if inconsistent channel info is given in settings. 133 */ 134 public static synchronized ArcRepository getInstance() throws IllegalState, IOFailure { 135 if (instance == null) { 136 instance = new ArcRepository(); 137 } 138 return instance; 139 } 140 141 /** 142 * Method for initialising the replica clients. 143 */ 144 private void initialiseReplicaClients() { 145 // Get channels 146 ChannelID[] allBas = Channels.getAllArchives_ALL_BAs(); 147 ChannelID[] anyBas = Channels.getAllArchives_ANY_BAs(); 148 ChannelID[] theBamons = Channels.getAllArchives_BAMONs(); 149 ChannelID[] theCrs = Channels.getAllArchives_CRs(); 150 151 Replica[] replicas = Replica.getKnown().toArray(new Replica[theBamons.length]); 152 // Checks equal number of channels 153 checkChannels(allBas, anyBas, theBamons); 154 155 for (int i = 0; i < allBas.length; i++) { 156 Replica rep = replicas[i]; 157 if (rep.getType() == ReplicaType.BITARCHIVE) { 158 connectedReplicas.put(rep, BitarchiveClient.getInstance(allBas[i], anyBas[i], theBamons[i])); 159 } else { // checksum replica 160 connectedReplicas.put(rep, ChecksumClient.getInstance(theCrs[i])); 161 } 162 } 163 } 164 165 /** 166 * Sanity check for data consistency in the construction of the ArcRepository, specifically that the number of 167 * ALL_BA, ANY_BA, and THE_BAMON queues are all equal to the number of credentials. 
168 * 169 * @param allBas The topics for bitarchives 170 * @param anyBas The queues for bitarchives 171 * @param theBamons The queues for bitarchive monitors 172 * @throws IllegalState if inconsistent data is found 173 */ 174 private void checkChannels(ChannelID[] allBas, ChannelID[] anyBas, ChannelID[] theBamons) throws IllegalState { 175 if (theBamons.length != allBas.length || theBamons.length != anyBas.length) { 176 throw new IllegalState("Inconsistent data found in construction of " + "ArcRepository: \n\nALL_BAs: " 177 + Arrays.toString(allBas) + "\nANY_BAs: " + Arrays.toString(anyBas) + "\nTHE_BAMONs: " 178 + Arrays.toString(theBamons)); 179 } 180 } 181 182 /** 183 * Stores a file in all known replicas. It sends out a upload message to all replicas. 184 * 185 * @param rf The remotefile to be stored. 186 * @param replyInfo A StoreMessage used to reply with success or failure. 187 * @throws IOFailure If file couldn't be stored. 188 * @throws ArgumentNotValid If a input parameter is null. 189 */ 190 public synchronized void store(RemoteFile rf, StoreMessage replyInfo) throws IOFailure, ArgumentNotValid { 191 ArgumentNotValid.checkNotNull(rf, "rf"); 192 ArgumentNotValid.checkNotNull(replyInfo, "replyInfo"); 193 194 final String filename = rf.getName(); 195 log.info("Store started: '{}'", filename); 196 197 // Record, that store of this filename is in progress 198 // needed for retrying uploads. 199 if (outstandingRemoteFiles.containsKey(filename)) { 200 log.info("File: '{}' was outstanding from the start.", filename); 201 } 202 outstandingRemoteFiles.put(filename, rf); 203 outstandingRemoteFilesC.put(filename, replyInfo.getPrecomputedChecksum()); // Hack 204 205 if (ad.hasEntry(filename)) { 206 // Any valid entry (and all existing entries are now 207 // known to be valid) by definition has a checksum. 
208 if (!rf.getChecksum().equals(ad.getCheckSum(filename))) { 209 String msg = "Attempting to store file '" + filename + "' with a different checksum than before: " 210 + "Old checksum: " + ad.getCheckSum(filename) + ", new checksum: " + rf.getChecksum(); 211 log.warn(msg); 212 replyNotOK(filename, replyInfo); 213 return; 214 } 215 log.debug("Retrying store of already known file '{}'," + " Already completed: {}", filename, 216 isStoreCompleted(filename)); 217 ad.setReplyInfo(filename, replyInfo); 218 } else { 219 ad.addEntry(filename, replyInfo, rf.getChecksum()); 220 } 221 for (Map.Entry<Replica, ReplicaClient> entry : connectedReplicas.entrySet()) { 222 startUpload(rf, entry.getValue(), entry.getKey(), replyInfo); 223 } 224 225 // Check state and reply if needed 226 considerReplyingOnStore(filename); 227 } 228 229 /** 230 * Initiate uploading of file to a specific replica. The corresponding upload record in admin data is created. 231 * 232 * @param rf Remotefile to upload to replica. 233 * @param replicaClient The replica client to upload to. 234 * @param replica The replica where RemoteFile is to be stored. 
235 * @param replyInfo 236 */ 237 private synchronized void startUpload(RemoteFile rf, ReplicaClient replicaClient, Replica replica, StoreMessage replyInfo) { 238 final String filename = rf.getName(); 239 log.debug("Upload started of file '{}' to replica '{}'", filename, replica.getId()); 240 241 String replicaChannelId = replica.getIdentificationChannel().getName(); 242 243 if (!ad.hasState(filename, replicaChannelId)) { 244 // New upload 245 ad.setState(filename, replicaChannelId, ReplicaStoreState.UPLOAD_STARTED); 246 replicaClient.sendUploadMessage(rf, replyInfo.getPrecomputedChecksum()); // Updated to include checksum information 247 } else { 248 // Recovery from old upload 249 ReplicaStoreState storeState = ad.getState(filename, replicaChannelId); 250 switch (storeState) { 251 case UPLOAD_FAILED: 252 case UPLOAD_STARTED: 253 case DATA_UPLOADED: 254 log.debug("Recovery from old upload. StoreState: {}. Sending new Checksum request for verifying " 255 + "whether the file '{}' has been succesfully uploaded for replica: '{}'", storeState, 256 filename, replica); 257 // Unknown condition in bitarchive. Test with checksum job. 258 if (storeState == ReplicaStoreState.UPLOAD_FAILED) { 259 ad.setState(filename, replicaChannelId, ReplicaStoreState.UPLOAD_STARTED); 260 log.info("ReplicaStoreState for file '{}' on replica '{}' changed from '{}' to '{}'", filename, 261 replica, ReplicaStoreState.UPLOAD_FAILED, ReplicaStoreState.UPLOAD_STARTED); 262 } 263 sendChecksumRequestForFile(filename, replicaClient); 264 break; 265 case UPLOAD_COMPLETED: 266 log.warn("Trying to upload file '{}' that already has state UPLOAD_COMPLETED for this replica", 267 filename); 268 break; 269 default: 270 throw new UnknownID("Unknown state: '" + storeState + "'"); 271 } 272 } 273 } 274 275 /** 276 * Method for retrieving the checksum of a specific file from the archive of a specific replica. If the replica is a 277 * BitArchive, then a Batch message with the ChecksumJob is sent. 
If the replica is a ChecksumArchive, then a 278 * GetChecksumMessage is sent. 279 * 280 * @param filename The file to checksum. 281 * @param replicaClient The client to retrieve the checksum of the file from. 282 */ 283 private void sendChecksumRequestForFile(String filename, ReplicaClient replicaClient) { 284 NetarkivetMessage msg; 285 286 // Retrieve the checksum of the file. 287 msg = replicaClient.sendGetChecksumMessage(Channels.getTheRepos(), filename); 288 289 outstandingChecksumFiles.put(msg.getID(), filename); 290 log.debug("Checksum job message submitted for file '{}' with message id: '{}'", filename, msg.getID()); 291 } 292 293 /** 294 * Test whether the current state is such that we may send a reply for the file we are currently processing, and 295 * send the reply if it is. We reply only when there is an outstanding message to reply to, and a) The file is 296 * reported complete in all replicas or b) No replica has outstanding reply messages AND some replica has reported 297 * failure. 298 * 299 * @param arcFileName The arcfile we consider replying to. 300 */ 301 private synchronized void considerReplyingOnStore(String arcFileName) { 302 if (ad.hasReplyInfo(arcFileName)) { 303 if (isStoreCompleted(arcFileName)) { 304 replyOK(arcFileName, ad.removeReplyInfo(arcFileName)); 305 } else if (oneReplicaHasFailed(arcFileName) && noReplicaInStateUploadStarted(arcFileName)) { 306 replyNotOK(arcFileName, ad.removeReplyInfo(arcFileName)); 307 } 308 } 309 } 310 311 /** 312 * Reply to a store message with status Ok. 313 * 314 * @param arcFileName The file for which we are replying. 315 * @param msg The message to reply to. 
316 */ 317 private synchronized void replyOK(String arcFileName, StoreMessage msg) { 318 outstandingRemoteFiles.remove(arcFileName); 319 outstandingRemoteFilesC.remove(arcFileName); 320 clearRetries(arcFileName); 321 log.info("Store OK: '{}'", arcFileName); 322 log.debug("Sending store OK reply to message '{}'", msg); 323 JMSConnectionFactory.getInstance().reply(msg); 324 } 325 326 /** 327 * Reply to a store message with status NotOk. 328 * 329 * @param arcFileName The file for which we are replying. 330 * @param msg The message to reply to. 331 */ 332 private synchronized void replyNotOK(String arcFileName, StoreMessage msg) { 333 outstandingRemoteFiles.remove(arcFileName); 334 outstandingRemoteFilesC.remove(arcFileName); 335 clearRetries(arcFileName); 336 msg.setNotOk("Failure while trying to store ARC file: " + arcFileName); 337 log.warn("Store NOT OK: '{}'", arcFileName); 338 log.debug("Sending store NOT OK reply to message '{}'", msg); 339 JMSConnectionFactory.getInstance().reply(msg); 340 } 341 342 /** 343 * Check if all replicas have reported that storage has been successfully completed. If this is the case return true 344 * else false. 345 * 346 * @param arcfileName The file being stored. 347 * @return true only if all replicas report UPLOAD_COMPLETED. 348 */ 349 private boolean isStoreCompleted(String arcfileName) { 350 // TODO remove quadratic scaling hidden here!! 351 for (Replica rep : connectedReplicas.keySet()) { 352 try { 353 // retrieve the replica channel and check upload status. 354 if (ad.getState(arcfileName, rep.getIdentificationChannel().getName()) != ReplicaStoreState.UPLOAD_COMPLETED) { 355 return false; 356 } 357 } catch (UnknownID e) { 358 // Since no upload status exists, then it cannot be completed! 359 log.warn("Non-fatal error! 
A replica does not have a upload status for the file '{}'.", arcfileName, e); 360 return false; 361 } 362 } 363 364 // Since no replica has a storestate differing from 'UPLOAD_COMPLETED' 365 // and no errors, then the store is completed for the entire system. 366 return true; 367 } 368 369 /** 370 * Checks if there are at least one replica that has reported that storage has failed. If this is the case return 371 * true else false. 372 * 373 * @param arcFileName the name of file being stored. 374 * @return true only if at least one replica report UPLOAD_FAILED. 375 */ 376 private boolean oneReplicaHasFailed(String arcFileName) { 377 for (Replica rep : connectedReplicas.keySet()) { 378 try { 379 // retrieve the replica channel and check upload status. 380 String repChannel = rep.getIdentificationChannel().getName(); 381 if (ad.getState(arcFileName, repChannel) == ReplicaStoreState.UPLOAD_FAILED) { 382 return true; 383 } 384 } catch (UnknownID e) { 385 log.warn("Non-fatal error. One replica does not have a upload status for the file '{}'.", arcFileName, 386 e); 387 return true; 388 } 389 } 390 return false; 391 } 392 393 /** 394 * Checks if no replicas which has reported that upload is in started state. If this is the case return true else 395 * false. 396 * 397 * @param arcFileName The name of the file being stored. 398 * @return true only if no replica report UPLOAD_STARTED. 399 */ 400 private boolean noReplicaInStateUploadStarted(String arcFileName) { 401 for (Replica rep : connectedReplicas.keySet()) { 402 try { 403 // retrieve the replica channel and check upload status. 404 String repChannelName = rep.getIdentificationChannel().getName(); 405 if (ad.getState(arcFileName, repChannelName) == ReplicaStoreState.UPLOAD_STARTED) { 406 return false; 407 } 408 } catch (UnknownID e) { 409 // When no upload exists, then upload cannot have started. 410 log.warn("Non-fatal error! 
A replica does not have a upload status for tshe file '{}'.", arcFileName, e); 411 } 412 } 413 414 return true; 415 } 416 417 /** 418 * Returns a replica client based on a replica id. 419 * 420 * @param replicaId the replica id 421 * @return A replica client. 422 * @throws ArgumentNotValid if replicaId parameter is null 423 */ 424 public ReplicaClient getReplicaClientFromReplicaId(String replicaId) throws ArgumentNotValid { 425 ArgumentNotValid.checkNotNullOrEmpty(replicaId, "replicaId"); 426 427 // retrieve the replica client. 428 ReplicaClient rc = connectedReplicas.get(Replica.getReplicaFromId(replicaId)); 429 430 return rc; 431 } 432 433 /** 434 * Finds the identification channel for the replica. If the replica is a BitArchive then the channel to the 435 * BitArchiveMonitor is returned, and if the replica is a ChecksumArchive then the checksum replica channel is 436 * returned. This means that only the channels for the bitarchive should be changed into the bamon channel, e.g. 437 * replacing the ALL_BA and ANY_BA identifiers with THE_BAMON. This change does not affect the checksum channel, and 438 * is therefore also performed on it. 439 * 440 * @param channel A channel to the replica. 441 * @return The name of the channel which identifies the replica. 442 */ 443 private String resolveReplicaChannel(String channel) { 444 return channel.replaceAll("ALL_BA", "THE_BAMON").replaceAll("ANY_BA", "THE_BAMON"); 445 } 446 447 /** 448 * Event handler for upload messages reporting the upload result. Checks the success status of the upload and 449 * updates admin data accordingly. 450 * 451 * @param msg an UploadMessage. 
452 */ 453 public synchronized void onUpload(UploadMessage msg) { 454 ArgumentNotValid.checkNotNull(msg, "msg"); 455 log.debug("Received upload reply: {}", msg.toString()); 456 457 String repChannelName = resolveReplicaChannel(msg.getTo().getName()); 458 459 if (msg.isOk()) { 460 processDataUploaded(msg.getArcfileName(), repChannelName); 461 } else { 462 processUploadFailed(msg.getArcfileName(), repChannelName); 463 } 464 } 465 466 /** 467 * Process the report by a bitarchive that a file was correctly uploaded. 468 * <ol> 469 * <il>1. Update the upload, and store states appropriately.</il><br/> 470 * <il>2. Verify that data are correctly stored in the archive by running a batch job on the archived file to 471 * perform a MD5 checksum comparison.</il> <br/> 472 * <il>3. Check if store operation is completed and update admin data if so.</il><br/> 473 * </ol> 474 * 475 * @param arcfileName The arcfile that was uploaded. 476 * @param replicaChannelName The name of the identification channel for the replica that uploaded it (THE_BAMON for 477 * bitarchive and THE_CR for checksum). 478 */ 479 private synchronized void processDataUploaded(String arcfileName, String replicaChannelName) { 480 log.debug("Data uploaded '{}' ,{}", arcfileName, replicaChannelName); 481 ad.setState(arcfileName, replicaChannelName, ReplicaStoreState.DATA_UPLOADED); 482 483 // retrieve the replica 484 Replica rep = Channels.retrieveReplicaFromIdentifierChannel(replicaChannelName); 485 // Verify that the file has been correctly uploaded. 486 sendChecksumRequestForFile(arcfileName, connectedReplicas.get(rep)); 487 } 488 489 /** 490 * Update admin data with the information that upload to a replica failed. The replica record is set to 491 * UPLOAD_FAILED. 492 * 493 * @param arcfileName The file that resulted in an upload failure. 494 * @param replicaChannelName The name of the idenfiticaiton channel for the replica that could not upload the file. 
495 */ 496 private void processUploadFailed(String arcfileName, String replicaChannelName) { 497 log.warn("Upload failed for ARC file '{}' to bit archive '{}'", arcfileName, replicaChannelName); 498 499 // Update state to reflect upload failure 500 ad.setState(arcfileName, replicaChannelName, ReplicaStoreState.UPLOAD_FAILED); 501 considerReplyingOnStore(arcfileName); 502 } 503 504 /** 505 * Called when we receive replies on our checksum batch jobs. 506 * <p> 507 * This does not handle checksum replicas. 508 * 509 * @param msg a BatchReplyMessage. 510 */ 511 public synchronized void onBatchReply(BatchReplyMessage msg) { 512 ArgumentNotValid.checkNotNull(msg, "msg"); 513 log.debug("BatchReplyMessage received: '{}'", msg); 514 515 if (!outstandingChecksumFiles.containsKey(msg.getReplyOfId())) { 516 // Message was NOT expected 517 log.warn("Received batchreply message with unknown originating ID {}\n{}\n. Known IDs are: {}", 518 msg.getReplyOfId(), msg.toString(), outstandingChecksumFiles.keySet().toString()); 519 return; 520 } 521 522 String arcfileName = outstandingChecksumFiles.remove(msg.getReplyOfId()); 523 524 // Check incoming message 525 if (!msg.isOk()) { 526 // Checksum job has ended with errors, but can contain checksum 527 // anyway, therefore it is logged - but we try to go on 528 log.warn("Message '" + msg.getID() + "' is reported not okay" + "\nReported error: '" + msg.getErrMsg() 529 + "'" + "\nTrying to process anyway."); 530 } 531 532 // Parse results 533 // if legal result is found it is placed in reportedChecksum 534 // if illegal or errors occurs reportedChecksum is set to "" 535 RemoteFile checksumResFile = msg.getResultFile(); 536 String reportedChecksum = ""; 537 boolean checksumReadOk = false; 538 if (checksumResFile == null) { 539 log.debug("The results of message '{}' was null.\nNo checksum to use for file '{}'", msg.getID(), 540 arcfileName); 541 } else if (checksumResFile instanceof NullRemoteFile) { 542 log.debug( 543 "The results of the 
message '{}' was instance of NullRemoteFile\nNo checksum to use for file '{}'", 544 msg.getID(), arcfileName); 545 } else { 546 // Read checksum 547 // Copy result to a local file 548 File outputFile = new File(FileUtils.getTempDir(), msg.getReplyTo().getName() + "_" + arcfileName 549 + "_checksumOutput.txt"); 550 try { 551 checksumResFile.copyTo(outputFile); 552 553 // Read checksum from local file 554 reportedChecksum = readChecksum(outputFile, arcfileName); 555 checksumReadOk = true; 556 } catch (IOFailure e) { 557 log.warn("Couldn't read checksumjob " + "output for '{}'", arcfileName, e); 558 } catch (IllegalState e) { 559 log.warn("Couldn't read result of checksumjob " + "in '{}'", arcfileName, e); 560 } 561 562 // Clean up output file and remote file 563 // clean up does NOT result in general error, i.e. 564 // reportedChecksum is NOT set to "" in case of errors 565 try { 566 FileUtils.removeRecursively(outputFile); 567 } catch (IOFailure e) { 568 log.warn("Couldn't clean up checksumjob " + "output file '{}'", outputFile, e); 569 } 570 try { 571 checksumResFile.cleanup(); 572 } catch (IOFailure e) { 573 log.warn("Couldn't clean up checksumjob " + "remote file '{}'", checksumResFile.getName(), e); 574 } 575 } 576 577 // Process result 578 String orgCheckSum = ad.getCheckSum(arcfileName); 579 String repChannel = resolveReplicaChannel(msg.getReplyTo().getName()); 580 processCheckSum(arcfileName, repChannel, orgCheckSum, reportedChecksum, msg.isOk() && checksumReadOk); 581 } 582 583 /** 584 * The message for handling the results of the GetChecksumMessage. 585 * 586 * @param msg The message containing the checksum of a specific file. 587 */ 588 public synchronized void onChecksumReply(GetChecksumMessage msg) { 589 ArgumentNotValid.checkNotNull(msg, "msg"); 590 591 log.debug("Received the reply to a GetChecksumMessage with ID: '{}'", msg.getID()); 592 593 // handle the case when unwanted reply. 
594 if (!outstandingChecksumFiles.containsKey(msg.getID())) { 595 log.warn("Received GetChecksumMessage with unknown originating ID {}\n{}\n. Known IDs are: {}", 596 msg.getReplyOfId(), msg.toString(), outstandingChecksumFiles.keySet().toString()); 597 return; 598 } 599 600 String arcfileName = outstandingChecksumFiles.remove(msg.getID()); 601 602 // Check incoming message 603 if (!msg.isOk()) { 604 // Checksum job has ended with errors, but can contain checksum 605 // anyway, therefore it is logged - but we try to go on 606 log.warn("Message '{}' is reported not okay\nReported error: '{}'\nTrying to process anyway.", msg.getID(), 607 msg.getErrMsg()); 608 } 609 610 String reportedChecksum = msg.getChecksum(); 611 612 // check the checksum 613 if (reportedChecksum == null) { 614 // set the reported checksum to empty, like for BAs. 615 reportedChecksum = ""; 616 } 617 618 // The checksum was not read by the batchjob, therefore this variable 619 // cannot be false 620 boolean checksumReadOk = true; 621 622 // process the checksum. 623 String orgChecksum = ad.getCheckSum(arcfileName); 624 if (orgChecksum == null) { 625 throw new IllegalState("The admin checksum for file '" + arcfileName + "' is null. Should never happen."); 626 } 627 String repChannelName = resolveReplicaChannel(msg.getTo().getName()); 628 processCheckSum(arcfileName, repChannelName, orgChecksum, reportedChecksum, checksumReadOk); 629 } 630 631 /** 632 * Reads output from a checksum file. Only the first instance of the desired file will be used. If other filenames 633 * are encountered than the wanted one, they will be logged at level warning, as that is indicative of serious 634 * errors. Having more than one instance of the desired file merely means it was found in several bitarchives, which 635 * is not our problem. 636 * 637 * @param outputFile The file to read checksum from. 638 * @param arcfileName The arcfile to find checksum for. 
639 * @return The checksum, or the empty string if no checksum found for arcfilename. 640 * @throws IOFailure If any error occurs reading the file. 641 * @throws IllegalState if the read format is wrong 642 */ 643 private String readChecksum(File outputFile, String arcfileName) throws IOFailure, IllegalState { 644 // List of lines in batch (checksum job) output file 645 List<String> lines = FileUtils.readListFromFile(outputFile); 646 // List of checksums found in batch (checksum job) output file 647 List<String> checksumList = new ArrayList<String>(); 648 649 // Extract checksums for arcfile from lines 650 // If errors occurs then throw exception 651 for (String line : lines) { 652 String readFileName = ""; 653 String checksum = ""; 654 String[] tokens = line.split(ChecksumJob.STRING_FILENAME_SEPARATOR); 655 boolean ignoreLine = false; 656 657 // Check line format 658 ignoreLine = (tokens.length == 0 || line.isEmpty()); 659 if (tokens.length != 2 && !ignoreLine) { // wrong format 660 throw new IllegalState("Read checksum line had unexpected format '" + line + "'"); 661 } 662 663 // Check checksum and arc-file name in line 664 if (!ignoreLine) { 665 readFileName = tokens[0]; 666 checksum = tokens[1]; 667 if (checksum.length() == 0) { // wrong format of checksum 668 // do not exit - there may be more checksums 669 ignoreLine = true; 670 log.warn("There were an empty checksum in result for checksums to arc-file '{}' (line: '{}')", 671 arcfileName, line); 672 } else { 673 if (!readFileName.equals(arcfileName)) { // wrong arcfile 674 // do not exit - there may be more checksums 675 ignoreLine = true; 676 log.warn( 677 "There were an unexpected arc-file name in checksum result for arc-file '{}' (line: '{}')", 678 arcfileName, line); 679 } 680 } 681 } 682 683 // If previously checksum found, then check if they are different. 
684 if (checksumList.size() > 0 && !ignoreLine && !checksum.equals(checksumList.get(checksumList.size() - 1))) { 685 String errMsg = "The arc-file '" + arcfileName + "' was found with two different checksums: " 686 + checksumList.get(0) + " and " + checksum + ". Last line: '" + line + "'."; 687 log.warn(errMsg); 688 throw new IllegalState(errMsg); 689 } 690 691 // Add error free non-empty found checksum in list 692 if (!ignoreLine) { 693 checksumList.add(checksum); 694 } 695 } 696 697 // Check that checksum list contain a result, 698 // log if it has more than one result 699 if (checksumList.size() > 1) { 700 // Log and proceed - the checksums are equal 701 log.warn("Arcfile '{}' was found with {} occurrences of the checksum: {}", arcfileName, 702 checksumList.size(), checksumList.get(0)); 703 } 704 705 if (checksumList.size() == 0) { 706 log.debug("Arcfile '{}' not found in lines of checksum output file '{}': {}", arcfileName, outputFile, 707 FileUtils.readListFromFile(outputFile)); 708 return ""; 709 } else { 710 return checksumList.get(0); 711 } 712 } 713 714 /** 715 * Process reporting of a checksum from a bitarchive for a specific file as part of a store operation for the file. 716 * Verify that the checksum is correct, update the BitArchiveStoreState state. Invariant: upload-state is changed or 717 * retry count is increased. 718 * 719 * @param arcFileName The file being stored. 720 * @param replicaChannelName The id of the replica reporting a checksum. 721 * @param orgChecksum The original checksum. 722 * @param reportedChecksum The checksum calculated by the replica. This value is "", if an error has occurred 723 * (except reply NOT ok from replica). 724 * @param checksumReadOk Tells whether the checksum was read ok by batch job. 725 */ 726 private synchronized void processCheckSum(String arcFileName, String replicaChannelName, String orgChecksum, 727 String reportedChecksum, boolean checksumReadOk) { 728 log.debug("Checksum received for file '{}'... 
processing", arcFileName); 729 ArgumentNotValid.checkNotNullOrEmpty(arcFileName, "String arcfileName"); 730 ArgumentNotValid.checkNotNullOrEmpty(replicaChannelName, "String replicaChannelName"); 731 ArgumentNotValid.checkNotNullOrEmpty(orgChecksum, "String orgChecksum"); 732 ArgumentNotValid.checkNotNull(reportedChecksum, "String reportedChecksum"); 733 734 // Log if we do not find file outstanding 735 // we proceed anyway in order to be sure to update stae of file 736 if (!outstandingRemoteFiles.containsKey(arcFileName)) { 737 log.warn("Could not find arc-file as outstanding remote file: '{}'", arcFileName); 738 } 739 740 // If everything works fine complete process of this checksum 741 if (orgChecksum.equals(reportedChecksum) && !reportedChecksum.isEmpty()) { 742 743 // Checksum is valid and job matches expected results 744 ad.setState(arcFileName, replicaChannelName, ReplicaStoreState.UPLOAD_COMPLETED); 745 746 // Find out if and how to make general reply on store() 747 // remove file from outstandingRemoteFiles if a reply is given 748 considerReplyingOnStore(arcFileName); 749 log.debug("Checksum processing for file '{}'... completed.", arcFileName); 750 return; 751 } 752 753 // Log error or retry upload 754 if (reportedChecksum.isEmpty()) { // no checksum found 755 if (checksumReadOk) { // no errors in finding no checksum 756 if (retryOk(replicaChannelName, arcFileName)) { // we can retry 757 if (outstandingRemoteFiles.containsKey(arcFileName)) { 758 RemoteFile rf = outstandingRemoteFiles.get(arcFileName); 759 String preComputedChecksum = outstandingRemoteFilesC.get(arcFileName); 760 // Retry upload only if allowed and in case we are sure 761 // that the empty checksum means that the arcfile is not 762 // in the archive 763 log.debug("Retrying upload of '{}'", arcFileName); 764 ad.setState(rf.getName(), replicaChannelName, ReplicaStoreState.UPLOAD_STARTED); 765 // retrieve the replica from the name of the channel. 
766 Replica rep = Channels.retrieveReplicaFromIdentifierChannel(replicaChannelName); 767 connectedReplicas.get(rep).sendUploadMessage(rf, preComputedChecksum); 768 incRetry(replicaChannelName, arcFileName); 769 log.debug("Checksum processing for file '{}'... completed.", arcFileName); 770 return; 771 } // else logging was already done above 772 } else { // cannot retry 773 log.warn("Cannot do more retry upload of remote file: '{}' to '{}', reported checksum='{}'", 774 arcFileName, replicaChannelName, reportedChecksum); 775 } 776 } else { // error in getting checksum 777 log.warn( 778 "Cannot retry upload of remote file: '{}' to '{}', reported checksum='{}' due to earlier batchjob error.", 779 arcFileName, replicaChannelName, reportedChecksum); 780 } 781 } else { // non empty checksum 782 if (!orgChecksum.equals(reportedChecksum)) { 783 log.warn("Cannot upload (wrong checksum) '{}' to '{}', reported checksum='{}'", arcFileName, 784 replicaChannelName, reportedChecksum); 785 } else { 786 log.warn("Cannot upload (unknown reason) '{}' to '{}', reported checksum='{}'", arcFileName, 787 replicaChannelName, reportedChecksum); 788 } 789 } 790 791 // This point is reached if there is some kind of (logged) error, i.e. 792 // - the file has not been accepted as completed 793 // - the file has not been sent to retry of upload 794 ad.setState(arcFileName, replicaChannelName, ReplicaStoreState.UPLOAD_FAILED); 795 considerReplyingOnStore(arcFileName); 796 log.debug("Checksum processing for file '{}'... completed.", arcFileName); 797 } 798 799 /** 800 * Keep track of upload retries of an arcfile to an archive. 801 * 802 * @param replicaChannelName The name of a given replica. 803 * @param arcfileName The name of a given ARC file 804 * @return true if it is ok to retry an upload of arcfileName to the replica through the replicaChannelName. 
805 */ 806 private boolean retryOk(String replicaChannelName, String arcfileName) { 807 Map<String, Integer> bitarchiveRetries = uploadRetries.get(replicaChannelName); 808 if (bitarchiveRetries == null) { 809 return true; 810 } 811 Integer retryCount = bitarchiveRetries.get(arcfileName); 812 if (retryCount == null) { 813 return true; 814 } 815 816 if (retryCount >= Settings.getInt(ArchiveSettings.ARCREPOSITORY_UPLOAD_RETRIES)) { 817 return false; 818 } 819 820 return true; 821 } 822 823 /** 824 * Increment the number of upload retries. 825 * 826 * @param replicaChannelName The name of the identification channel for the replica. 827 * @param arcfileName The name of a given ARC file. 828 */ 829 private void incRetry(String replicaChannelName, String arcfileName) { 830 Map<String, Integer> replicaRetries = uploadRetries.get(replicaChannelName); 831 if (replicaRetries == null) { 832 replicaRetries = new HashMap<String, Integer>(); 833 uploadRetries.put(replicaChannelName, replicaRetries); 834 } 835 836 Integer retryCount = replicaRetries.get(arcfileName); 837 if (retryCount == null) { 838 replicaRetries.put(arcfileName, Integer.valueOf(1)); 839 return; 840 } 841 842 replicaRetries.put(arcfileName, Integer.valueOf(retryCount + 1)); 843 } 844 845 /** 846 * Remove all retry tracking information for the arcfile. 847 * 848 * @param arcfileName The name of a given ARC file 849 */ 850 private void clearRetries(String arcfileName) { 851 for (String replicaChannelName : uploadRetries.keySet()) { 852 Map<String, Integer> baretries = uploadRetries.get(replicaChannelName); 853 baretries.remove(arcfileName); 854 } 855 } 856 857 /** 858 * Change admin data entry for a given file. 
859 * <p> 860 * The following information is contained in the given AdminDataMessage: 1) The name of the given file to change the 861 * entry for, 2) the name of the bitarchive to modify the entry for, 3) a boolean that says whether or not to 862 * replace the checksum for the entry for the given file in AdminData, 4) a replacement for the case where the 863 * former value is true. 864 * 865 * @param msg an AdminDataMessage object 866 */ 867 public void updateAdminData(AdminDataMessage msg) { 868 869 if (!ad.hasEntry(msg.getFileName())) { 870 throw new ArgumentNotValid("No admin entry exists for the file '" + msg.getFileName() + "'"); 871 } 872 873 String message = "Handling request to change admin data for '" + msg.getFileName() + "'. "; 874 // add information if store-state is changed. 875 if (msg.isChangeStoreState()) { 876 message += "Change store state to " + msg.getNewvalue(); 877 } 878 // add information if checksum is changed. 879 if (msg.isChangeChecksum()) { 880 message += "Change checksum to " + msg.getChecksum(); 881 } 882 // log the message. 883 log.warn(message); 884 NotificationsFactory.getInstance().notify(message, NotificationType.WARNING); 885 886 if (msg.isChangeStoreState()) { 887 String replicaChannelName = Replica.getReplicaFromId(msg.getReplicaId()).getIdentificationChannel() 888 .getName(); 889 ad.setState(msg.getFileName(), replicaChannelName, msg.getNewvalue()); 890 } 891 892 if (msg.isChangeChecksum()) { 893 ad.setCheckSum(msg.getFileName(), msg.getChecksum()); 894 } 895 } 896 897 /** 898 * Forwards a RemoveAndGetFileMessage to the designated bitarchive. Before forwarding the message it is verified 899 * that the checksum of the file to remove differs from the registered checksum of the file to remove. If no 900 * registration exists for the file to remove the message is always forwarded. 
901 * 902 * @param msg the message to forward to a bitarchive 903 */ 904 public void removeAndGetFile(RemoveAndGetFileMessage msg) { 905 // Prevent removal of files with correct checksum 906 if (ad.hasEntry(msg.getFileName())) { 907 String refchecksum = ad.getCheckSum(msg.getFileName()); 908 if (msg.getCheckSum().equals(refchecksum)) { 909 throw new ArgumentNotValid("Attempting to remove file with correct checksum. File=" + msg.getFileName() 910 + "; with checksum:" + msg.getCheckSum() + ";"); 911 } 912 } 913 914 // checksum ok - try to remove the file 915 String errMsg = "Requesting remove of file '" + msg.getFileName() + "' with checksum '" + msg.getCheckSum() 916 + "' from: '" + msg.getReplicaId() + "'"; 917 log.warn(errMsg); 918 NotificationsFactory.getInstance().notify(errMsg, NotificationType.WARNING); 919 ReplicaClient rc = getReplicaClientFromReplicaId(msg.getReplicaId()); 920 rc.sendRemoveAndGetFileMessage(msg); 921 } 922 923 /** 924 * Close all replicas connections, open loggers, and the ArcRepository handler. 925 */ 926 public void close() { 927 log.info("Closing down ArcRepository"); 928 cleanup(); 929 log.info("Closed ArcRepository"); 930 } 931 932 /** 933 * Closes all connections and nulls the instance. The ArcRepositoryHandler, the AdminData and the ReplicaClients are 934 * closed along with all their connections. 935 */ 936 public void cleanup() { 937 if (arcReposhandler != null) { 938 arcReposhandler.close(); 939 arcReposhandler = null; 940 } 941 if (connectedReplicas != null) { 942 for (ReplicaClient rc : connectedReplicas.values()) { 943 rc.close(); 944 } 945 connectedReplicas.clear(); 946 } 947 if (ad != null) { 948 ad.close(); 949 ad = null; 950 } 951 instance = null; 952 } 953 954}