001/* 002 * #%L 003 * Netarchivesuite - archive 004 * %% 005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.archive.arcrepository; 024 025import java.io.File; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import dk.netarkivet.archive.ArchiveSettings; 036import dk.netarkivet.archive.arcrepository.bitpreservation.AdminDataMessage; 037import dk.netarkivet.archive.arcrepository.distribute.ArcRepositoryServer; 038import dk.netarkivet.archive.arcrepository.distribute.StoreMessage; 039import dk.netarkivet.archive.arcrepositoryadmin.Admin; 040import dk.netarkivet.archive.arcrepositoryadmin.AdminFactory; 041import dk.netarkivet.archive.bitarchive.distribute.BatchReplyMessage; 042import dk.netarkivet.archive.bitarchive.distribute.BitarchiveClient; 043import dk.netarkivet.archive.bitarchive.distribute.RemoveAndGetFileMessage; 044import dk.netarkivet.archive.bitarchive.distribute.UploadMessage; 045import dk.netarkivet.archive.checksum.distribute.ChecksumClient; 046import dk.netarkivet.archive.checksum.distribute.GetChecksumMessage; 047import dk.netarkivet.archive.distribute.ReplicaClient; 048import dk.netarkivet.common.distribute.ChannelID; 049import dk.netarkivet.common.distribute.Channels; 050import dk.netarkivet.common.distribute.JMSConnectionFactory; 051import dk.netarkivet.common.distribute.NetarkivetMessage; 052import dk.netarkivet.common.distribute.NullRemoteFile; 053import dk.netarkivet.common.distribute.RemoteFile; 054import dk.netarkivet.common.distribute.arcrepository.Replica; 055import dk.netarkivet.common.distribute.arcrepository.ReplicaStoreState; 056import dk.netarkivet.common.distribute.arcrepository.ReplicaType; 057import dk.netarkivet.common.exceptions.ArgumentNotValid; 058import dk.netarkivet.common.exceptions.IOFailure; 059import dk.netarkivet.common.exceptions.IllegalState; 060import dk.netarkivet.common.exceptions.UnknownID; 061import dk.netarkivet.common.utils.CleanupIF; 062import dk.netarkivet.common.utils.FileUtils; 063import dk.netarkivet.common.utils.NotificationType; 064import dk.netarkivet.common.utils.NotificationsFactory; 065import dk.netarkivet.common.utils.Settings; 066import dk.netarkivet.common.utils.batch.ChecksumJob; 067 068/** 069 * The Arcrepository handles the communication with the different replicas. This class ensures that arc files are stored 070 * in all available replica and verifies that the storage process succeeded. Retrieval of data from a replica goes 071 * through the JMSArcRepositoryClient that contacts the appropriate (typically nearest) replica and retrieves data from 072 * this archive. Batch execution is sent to the bitarchive replica(s), since batch cannot be executed on checksum 073 * replicas. Correction operations are typically only allowed on one replica. 074 */ 075@SuppressWarnings({"deprecation"}) 076public class ArcRepository implements CleanupIF { 077 078 /** The log. */ 079 private static final Logger log = LoggerFactory.getLogger(ArcRepository.class); 080 081 /** The unique instance (singleton) of this class. */ 082 private static ArcRepository instance; 083 084 /** The administration data associated with the arcrepository. */ 085 private Admin ad; 086 087 /** The class which listens to messages sent to this instance of Arcrepository or its subclasses. */ 088 private ArcRepositoryServer arcReposhandler; 089 090 /** A Map of a Replica and their corresponding ReplicaClient. From this Map the relevant channels can be found. */ 091 private final Map<Replica, ReplicaClient> connectedReplicas = new HashMap<Replica, ReplicaClient>(); 092 093 /** Map from MessageId to arcfiles for which there are outstanding checksum jobs. */ 094 private final Map<String, String> outstandingChecksumFiles = new HashMap<String, String>(); 095 096 /** 097 * Map from filenames to remote files. Used for retrieving a remote file reference while a store operation is in 098 * process. 099 */ 100 private final Map<String, RemoteFile> outstandingRemoteFiles = new HashMap<String, RemoteFile>(); 101 102 private final Map<String, String> outstandingRemoteFilesC = new HashMap<String, String>(); 103 104 105 /** 106 * Map from bitarchive names to Map from filenames to the number of times a file has been attempted uploaded to the 107 * the bitarchive. 108 */ 109 private final Map<String, Map<String, Integer>> uploadRetries = new HashMap<String, Map<String, Integer>>(); 110 111 /** 112 * Constructor for the ArcRepository. Connects the ArcRepository to all BitArchives, and initialises admin data. 113 * 114 * @throws IOFailure if admin data cannot be read/initialised or we cannot' connect to some bitarchive. 115 * @throws IllegalState if inconsistent channel info is given in settings. 116 */ 117 protected ArcRepository() throws IOFailure, IllegalState { 118 // UpdateableAdminData Throws IOFailure 119 this.ad = AdminFactory.getInstance(); 120 this.arcReposhandler = new ArcRepositoryServer(this); 121 initialiseReplicaClients(); 122 123 log.info("Starting the ArcRepository"); 124 } 125 126 /** 127 * Returns the unique ArcRepository instance. 128 * 129 * @return the instance. 130 * @throws IOFailure if admin data cannot be read/initialised or we cannot' connect to some bitarchive. 131 * @throws IllegalState if inconsistent channel info is given in settings. 132 */ 133 public static synchronized ArcRepository getInstance() throws IllegalState, IOFailure { 134 if (instance == null) { 135 instance = new ArcRepository(); 136 } 137 return instance; 138 } 139 140 /** 141 * Method for initialising the replica clients. 142 */ 143 private void initialiseReplicaClients() { 144 // Get channels 145 ChannelID[] allBas = Channels.getAllArchives_ALL_BAs(); 146 ChannelID[] anyBas = Channels.getAllArchives_ANY_BAs(); 147 ChannelID[] theBamons = Channels.getAllArchives_BAMONs(); 148 ChannelID[] theCrs = Channels.getAllArchives_CRs(); 149 150 Replica[] replicas = Replica.getKnown().toArray(new Replica[theBamons.length]); 151 // Checks equal number of channels 152 checkChannels(allBas, anyBas, theBamons); 153 154 for (int i = 0; i < allBas.length; i++) { 155 Replica rep = replicas[i]; 156 if (rep.getType() == ReplicaType.BITARCHIVE) { 157 connectedReplicas.put(rep, BitarchiveClient.getInstance(allBas[i], anyBas[i], theBamons[i])); 158 } else { // checksum replica 159 connectedReplicas.put(rep, ChecksumClient.getInstance(theCrs[i])); 160 } 161 } 162 } 163 164 /** 165 * Sanity check for data consistency in the construction of the ArcRepository, specifically that the number of 166 * ALL_BA, ANY_BA, and THE_BAMON queues are all equal to the number of credentials. 167 * 168 * @param allBas The topics for bitarchives 169 * @param anyBas The queues for bitarchives 170 * @param theBamons The queues for bitarchive monitors 171 * @throws IllegalState if inconsistent data is found 172 */ 173 private void checkChannels(ChannelID[] allBas, ChannelID[] anyBas, ChannelID[] theBamons) throws IllegalState { 174 if (theBamons.length != allBas.length || theBamons.length != anyBas.length) { 175 throw new IllegalState("Inconsistent data found in construction of " + "ArcRepository: \n\nALL_BAs: " 176 + Arrays.toString(allBas) + "\nANY_BAs: " + Arrays.toString(anyBas) + "\nTHE_BAMONs: " 177 + Arrays.toString(theBamons)); 178 } 179 } 180 181 /** 182 * Stores a file in all known replicas. It sends out a upload message to all replicas. 183 * 184 * @param rf The remotefile to be stored. 185 * @param replyInfo A StoreMessage used to reply with success or failure. 186 * @throws IOFailure If file couldn't be stored. 187 * @throws ArgumentNotValid If a input parameter is null. 188 */ 189 public synchronized void store(RemoteFile rf, StoreMessage replyInfo) throws IOFailure, ArgumentNotValid { 190 ArgumentNotValid.checkNotNull(rf, "rf"); 191 ArgumentNotValid.checkNotNull(replyInfo, "replyInfo"); 192 193 final String filename = rf.getName(); 194 log.info("Store started: '{}'", filename); 195 196 // Record, that store of this filename is in progress 197 // needed for retrying uploads. 198 if (outstandingRemoteFiles.containsKey(filename)) { 199 log.info("File: '{}' was outstanding from the start.", filename); 200 } 201 outstandingRemoteFiles.put(filename, rf); 202 outstandingRemoteFilesC.put(filename, replyInfo.getPrecomputedChecksum()); // Hack 203 204 if (ad.hasEntry(filename)) { 205 // Any valid entry (and all existing entries are now 206 // known to be valid) by definition has a checksum. 207 if (!rf.getChecksum().equals(ad.getCheckSum(filename))) { 208 String msg = "Attempting to store file '" + filename + "' with a different checksum than before: " 209 + "Old checksum: " + ad.getCheckSum(filename) + ", new checksum: " + rf.getChecksum(); 210 log.warn(msg); 211 replyNotOK(filename, replyInfo); 212 return; 213 } 214 log.debug("Retrying store of already known file '{}'," + " Already completed: {}", filename, 215 isStoreCompleted(filename)); 216 ad.setReplyInfo(filename, replyInfo); 217 } else { 218 ad.addEntry(filename, replyInfo, rf.getChecksum()); 219 } 220 for (Map.Entry<Replica, ReplicaClient> entry : connectedReplicas.entrySet()) { 221 startUpload(rf, entry.getValue(), entry.getKey(), replyInfo); 222 } 223 224 // Check state and reply if needed 225 considerReplyingOnStore(filename); 226 } 227 228 /** 229 * Initiate uploading of file to a specific replica. The corresponding upload record in admin data is created. 230 * 231 * @param rf Remotefile to upload to replica. 232 * @param replicaClient The replica client to upload to. 233 * @param replica The replica where RemoteFile is to be stored. 234 * @param replyInfo 235 */ 236 private synchronized void startUpload(RemoteFile rf, ReplicaClient replicaClient, Replica replica, StoreMessage replyInfo) { 237 final String filename = rf.getName(); 238 log.debug("Upload started of file '{}' to replica '{}'", filename, replica.getId()); 239 240 String replicaChannelId = replica.getIdentificationChannel().getName(); 241 242 if (!ad.hasState(filename, replicaChannelId)) { 243 // New upload 244 ad.setState(filename, replicaChannelId, ReplicaStoreState.UPLOAD_STARTED); 245 replicaClient.sendUploadMessage(rf, replyInfo.getPrecomputedChecksum()); // Updated to include checksum information 246 } else { 247 // Recovery from old upload 248 ReplicaStoreState storeState = ad.getState(filename, replicaChannelId); 249 switch (storeState) { 250 case UPLOAD_FAILED: 251 case UPLOAD_STARTED: 252 case DATA_UPLOADED: 253 log.debug("Recovery from old upload. StoreState: {}. Sending new Checksum request for verifying " 254 + "whether the file '{}' has been succesfully uploaded for replica: '{}'", storeState, 255 filename, replica); 256 // Unknown condition in bitarchive. Test with checksum job. 257 if (storeState == ReplicaStoreState.UPLOAD_FAILED) { 258 ad.setState(filename, replicaChannelId, ReplicaStoreState.UPLOAD_STARTED); 259 log.info("ReplicaStoreState for file '{}' on replica '{}' changed from '{}' to '{}'", filename, 260 replica, ReplicaStoreState.UPLOAD_FAILED, ReplicaStoreState.UPLOAD_STARTED); 261 } 262 sendChecksumRequestForFile(filename, replicaClient); 263 break; 264 case UPLOAD_COMPLETED: 265 log.warn("Trying to upload file '{}' that already has state UPLOAD_COMPLETED for this replica", 266 filename); 267 break; 268 default: 269 throw new UnknownID("Unknown state: '" + storeState + "'"); 270 } 271 } 272 } 273 274 /** 275 * Method for retrieving the checksum of a specific file from the archive of a specific replica. If the replica is a 276 * BitArchive, then a Batch message with the ChecksumJob is sent. If the replica is a ChecksumArchive, then a 277 * GetChecksumMessage is sent. 278 * 279 * @param filename The file to checksum. 280 * @param replicaClient The client to retrieve the checksum of the file from. 281 */ 282 private void sendChecksumRequestForFile(String filename, ReplicaClient replicaClient) { 283 NetarkivetMessage msg; 284 285 // Retrieve the checksum of the file. 286 msg = replicaClient.sendGetChecksumMessage(Channels.getTheRepos(), filename); 287 288 outstandingChecksumFiles.put(msg.getID(), filename); 289 log.debug("Checksum job message submitted for file '{}' with message id: '{}'", filename, msg.getID()); 290 } 291 292 /** 293 * Test whether the current state is such that we may send a reply for the file we are currently processing, and 294 * send the reply if it is. We reply only when there is an outstanding message to reply to, and a) The file is 295 * reported complete in all replicas or b) No replica has outstanding reply messages AND some replica has reported 296 * failure. 297 * 298 * @param arcFileName The arcfile we consider replying to. 299 */ 300 private synchronized void considerReplyingOnStore(String arcFileName) { 301 if (ad.hasReplyInfo(arcFileName)) { 302 if (isStoreCompleted(arcFileName)) { 303 replyOK(arcFileName, ad.removeReplyInfo(arcFileName)); 304 } else if (oneReplicaHasFailed(arcFileName) && noReplicaInStateUploadStarted(arcFileName)) { 305 replyNotOK(arcFileName, ad.removeReplyInfo(arcFileName)); 306 } 307 } 308 } 309 310 /** 311 * Reply to a store message with status Ok. 312 * 313 * @param arcFileName The file for which we are replying. 314 * @param msg The message to reply to. 315 */ 316 private synchronized void replyOK(String arcFileName, StoreMessage msg) { 317 outstandingRemoteFiles.remove(arcFileName); 318 outstandingRemoteFilesC.remove(arcFileName); 319 clearRetries(arcFileName); 320 log.info("Store OK: '{}'", arcFileName); 321 log.debug("Sending store OK reply to message '{}'", msg); 322 JMSConnectionFactory.getInstance().reply(msg); 323 } 324 325 /** 326 * Reply to a store message with status NotOk. 327 * 328 * @param arcFileName The file for which we are replying. 329 * @param msg The message to reply to. 330 */ 331 private synchronized void replyNotOK(String arcFileName, StoreMessage msg) { 332 outstandingRemoteFiles.remove(arcFileName); 333 outstandingRemoteFilesC.remove(arcFileName); 334 clearRetries(arcFileName); 335 msg.setNotOk("Failure while trying to store ARC file: " + arcFileName); 336 log.warn("Store NOT OK: '{}'", arcFileName); 337 log.debug("Sending store NOT OK reply to message '{}'", msg); 338 JMSConnectionFactory.getInstance().reply(msg); 339 } 340 341 /** 342 * Check if all replicas have reported that storage has been successfully completed. If this is the case return true 343 * else false. 344 * 345 * @param arcfileName The file being stored. 346 * @return true only if all replicas report UPLOAD_COMPLETED. 347 */ 348 private boolean isStoreCompleted(String arcfileName) { 349 // TODO remove quadratic scaling hidden here!! 350 for (Replica rep : connectedReplicas.keySet()) { 351 try { 352 // retrieve the replica channel and check upload status. 353 if (ad.getState(arcfileName, rep.getIdentificationChannel().getName()) != ReplicaStoreState.UPLOAD_COMPLETED) { 354 return false; 355 } 356 } catch (UnknownID e) { 357 // Since no upload status exists, then it cannot be completed! 358 log.warn("Non-fatal error! A replica does not have a upload status for the file '{}'.", arcfileName, e); 359 return false; 360 } 361 } 362 363 // Since no replica has a storestate differing from 'UPLOAD_COMPLETED' 364 // and no errors, then the store is completed for the entire system. 365 return true; 366 } 367 368 /** 369 * Checks if there are at least one replica that has reported that storage has failed. If this is the case return 370 * true else false. 371 * 372 * @param arcFileName the name of file being stored. 373 * @return true only if at least one replica report UPLOAD_FAILED. 374 */ 375 private boolean oneReplicaHasFailed(String arcFileName) { 376 for (Replica rep : connectedReplicas.keySet()) { 377 try { 378 // retrieve the replica channel and check upload status. 379 String repChannel = rep.getIdentificationChannel().getName(); 380 if (ad.getState(arcFileName, repChannel) == ReplicaStoreState.UPLOAD_FAILED) { 381 return true; 382 } 383 } catch (UnknownID e) { 384 log.warn("Non-fatal error. One replica does not have a upload status for the file '{}'.", arcFileName, 385 e); 386 return true; 387 } 388 } 389 return false; 390 } 391 392 /** 393 * Checks if no replicas which has reported that upload is in started state. If this is the case return true else 394 * false. 395 * 396 * @param arcFileName The name of the file being stored. 397 * @return true only if no replica report UPLOAD_STARTED. 398 */ 399 private boolean noReplicaInStateUploadStarted(String arcFileName) { 400 for (Replica rep : connectedReplicas.keySet()) { 401 try { 402 // retrieve the replica channel and check upload status. 403 String repChannelName = rep.getIdentificationChannel().getName(); 404 if (ad.getState(arcFileName, repChannelName) == ReplicaStoreState.UPLOAD_STARTED) { 405 return false; 406 } 407 } catch (UnknownID e) { 408 // When no upload exists, then upload cannot have started. 409 log.warn("Non-fatal error! A replica does not have a upload status for tshe file '{}'.", arcFileName, e); 410 } 411 } 412 413 return true; 414 } 415 416 /** 417 * Returns a replica client based on a replica id. 418 * 419 * @param replicaId the replica id 420 * @return A replica client. 421 * @throws ArgumentNotValid if replicaId parameter is null 422 */ 423 public ReplicaClient getReplicaClientFromReplicaId(String replicaId) throws ArgumentNotValid { 424 ArgumentNotValid.checkNotNullOrEmpty(replicaId, "replicaId"); 425 426 // retrieve the replica client. 427 ReplicaClient rc = connectedReplicas.get(Replica.getReplicaFromId(replicaId)); 428 429 return rc; 430 } 431 432 /** 433 * Finds the identification channel for the replica. If the replica is a BitArchive then the channel to the 434 * BitArchiveMonitor is returned, and if the replica is a ChecksumArchive then the checksum replica channel is 435 * returned. This means that only the channels for the bitarchive should be changed into the bamon channel, e.g. 436 * replacing the ALL_BA and ANY_BA identifiers with THE_BAMON. This change does not affect the checksum channel, and 437 * is therefore also performed on it. 438 * 439 * @param channel A channel to the replica. 440 * @return The name of the channel which identifies the replica. 441 */ 442 private String resolveReplicaChannel(String channel) { 443 return channel.replaceAll("ALL_BA", "THE_BAMON").replaceAll("ANY_BA", "THE_BAMON"); 444 } 445 446 /** 447 * Event handler for upload messages reporting the upload result. Checks the success status of the upload and 448 * updates admin data accordingly. 449 * 450 * @param msg an UploadMessage. 451 */ 452 public synchronized void onUpload(UploadMessage msg) { 453 ArgumentNotValid.checkNotNull(msg, "msg"); 454 log.debug("Received upload reply: {}", msg.toString()); 455 456 String repChannelName = resolveReplicaChannel(msg.getTo().getName()); 457 458 if (msg.isOk()) { 459 processDataUploaded(msg.getArcfileName(), repChannelName); 460 } else { 461 processUploadFailed(msg.getArcfileName(), repChannelName); 462 } 463 } 464 465 /** 466 * Process the report by a bitarchive that a file was correctly uploaded. 467 * <ol> 468 * <il>1. Update the upload, and store states appropriately.</il><br/> 469 * <il>2. Verify that data are correctly stored in the archive by running a batch job on the archived file to 470 * perform a MD5 checksum comparison.</il> <br/> 471 * <il>3. Check if store operation is completed and update admin data if so.</il><br/> 472 * </ol> 473 * 474 * @param arcfileName The arcfile that was uploaded. 475 * @param replicaChannelName The name of the identification channel for the replica that uploaded it (THE_BAMON for 476 * bitarchive and THE_CR for checksum). 477 */ 478 private synchronized void processDataUploaded(String arcfileName, String replicaChannelName) { 479 log.debug("Data uploaded '{}' ,{}", arcfileName, replicaChannelName); 480 ad.setState(arcfileName, replicaChannelName, ReplicaStoreState.DATA_UPLOADED); 481 482 // retrieve the replica 483 Replica rep = Channels.retrieveReplicaFromIdentifierChannel(replicaChannelName); 484 // Verify that the file has been correctly uploaded. 485 sendChecksumRequestForFile(arcfileName, connectedReplicas.get(rep)); 486 } 487 488 /** 489 * Update admin data with the information that upload to a replica failed. The replica record is set to 490 * UPLOAD_FAILED. 491 * 492 * @param arcfileName The file that resulted in an upload failure. 493 * @param replicaChannelName The name of the idenfiticaiton channel for the replica that could not upload the file. 494 */ 495 private void processUploadFailed(String arcfileName, String replicaChannelName) { 496 log.warn("Upload failed for ARC file '{}' to bit archive '{}'", arcfileName, replicaChannelName); 497 498 // Update state to reflect upload failure 499 ad.setState(arcfileName, replicaChannelName, ReplicaStoreState.UPLOAD_FAILED); 500 considerReplyingOnStore(arcfileName); 501 } 502 503 /** 504 * Called when we receive replies on our checksum batch jobs. 505 * <p> 506 * This does not handle checksum replicas. 507 * 508 * @param msg a BatchReplyMessage. 509 */ 510 public synchronized void onBatchReply(BatchReplyMessage msg) { 511 ArgumentNotValid.checkNotNull(msg, "msg"); 512 log.debug("BatchReplyMessage received: '{}'", msg); 513 514 if (!outstandingChecksumFiles.containsKey(msg.getReplyOfId())) { 515 // Message was NOT expected 516 log.warn("Received batchreply message with unknown originating ID {}\n{}\n. Known IDs are: {}", 517 msg.getReplyOfId(), msg.toString(), outstandingChecksumFiles.keySet().toString()); 518 return; 519 } 520 521 String arcfileName = outstandingChecksumFiles.remove(msg.getReplyOfId()); 522 523 // Check incoming message 524 if (!msg.isOk()) { 525 // Checksum job has ended with errors, but can contain checksum 526 // anyway, therefore it is logged - but we try to go on 527 log.warn("Message '" + msg.getID() + "' is reported not okay" + "\nReported error: '" + msg.getErrMsg() 528 + "'" + "\nTrying to process anyway."); 529 } 530 531 // Parse results 532 // if legal result is found it is placed in reportedChecksum 533 // if illegal or errors occurs reportedChecksum is set to "" 534 RemoteFile checksumResFile = msg.getResultFile(); 535 String reportedChecksum = ""; 536 boolean checksumReadOk = false; 537 if (checksumResFile == null) { 538 log.debug("The results of message '{}' was null.\nNo checksum to use for file '{}'", msg.getID(), 539 arcfileName); 540 } else if (checksumResFile instanceof NullRemoteFile) { 541 log.debug( 542 "The results of the message '{}' was instance of NullRemoteFile\nNo checksum to use for file '{}'", 543 msg.getID(), arcfileName); 544 } else { 545 // Read checksum 546 // Copy result to a local file 547 File outputFile = new File(FileUtils.getTempDir(), msg.getReplyTo().getName() + "_" + arcfileName 548 + "_checksumOutput.txt"); 549 try { 550 checksumResFile.copyTo(outputFile); 551 552 // Read checksum from local file 553 reportedChecksum = readChecksum(outputFile, arcfileName); 554 checksumReadOk = true; 555 } catch (IOFailure e) { 556 log.warn("Couldn't read checksumjob " + "output for '{}'", arcfileName, e); 557 } catch (IllegalState e) { 558 log.warn("Couldn't read result of checksumjob " + "in '{}'", arcfileName, e); 559 } 560 561 // Clean up output file and remote file 562 // clean up does NOT result in general error, i.e. 563 // reportedChecksum is NOT set to "" in case of errors 564 try { 565 FileUtils.removeRecursively(outputFile); 566 } catch (IOFailure e) { 567 log.warn("Couldn't clean up checksumjob " + "output file '{}'", outputFile, e); 568 } 569 try { 570 checksumResFile.cleanup(); 571 } catch (IOFailure e) { 572 log.warn("Couldn't clean up checksumjob " + "remote file '{}'", checksumResFile.getName(), e); 573 } 574 } 575 576 // Process result 577 String orgCheckSum = ad.getCheckSum(arcfileName); 578 String repChannel = resolveReplicaChannel(msg.getReplyTo().getName()); 579 processCheckSum(arcfileName, repChannel, orgCheckSum, reportedChecksum, msg.isOk() && checksumReadOk); 580 } 581 582 /** 583 * The message for handling the results of the GetChecksumMessage. 584 * 585 * @param msg The message containing the checksum of a specific file. 586 */ 587 public synchronized void onChecksumReply(GetChecksumMessage msg) { 588 ArgumentNotValid.checkNotNull(msg, "msg"); 589 590 log.debug("Received the reply to a GetChecksumMessage with ID: '{}'", msg.getID()); 591 592 // handle the case when unwanted reply. 593 if (!outstandingChecksumFiles.containsKey(msg.getID())) { 594 log.warn("Received GetChecksumMessage with unknown originating ID {}\n{}\n. Known IDs are: {}", 595 msg.getReplyOfId(), msg.toString(), outstandingChecksumFiles.keySet().toString()); 596 return; 597 } 598 599 String arcfileName = outstandingChecksumFiles.remove(msg.getID()); 600 601 // Check incoming message 602 if (!msg.isOk()) { 603 // Checksum job has ended with errors, but can contain checksum 604 // anyway, therefore it is logged - but we try to go on 605 log.warn("Message '{}' is reported not okay\nReported error: '{}'\nTrying to process anyway.", msg.getID(), 606 msg.getErrMsg()); 607 } 608 609 String reportedChecksum = msg.getChecksum(); 610 611 // check the checksum 612 if (reportedChecksum == null) { 613 // set the reported checksum to empty, like for BAs. 614 reportedChecksum = ""; 615 } 616 617 // The checksum was not read by the batchjob, therefore this variable 618 // cannot be false 619 boolean checksumReadOk = true; 620 621 // process the checksum. 622 String orgChecksum = ad.getCheckSum(arcfileName); 623 if (orgChecksum == null) { 624 throw new IllegalState("The admin checksum for file '" + arcfileName + "' is null. Should never happen."); 625 } 626 String repChannelName = resolveReplicaChannel(msg.getTo().getName()); 627 processCheckSum(arcfileName, repChannelName, orgChecksum, reportedChecksum, checksumReadOk); 628 } 629 630 /** 631 * Reads output from a checksum file. Only the first instance of the desired file will be used. If other filenames 632 * are encountered than the wanted one, they will be logged at level warning, as that is indicative of serious 633 * errors. Having more than one instance of the desired file merely means it was found in several bitarchives, which 634 * is not our problem. 635 * 636 * @param outputFile The file to read checksum from. 637 * @param arcfileName The arcfile to find checksum for. 638 * @return The checksum, or the empty string if no checksum found for arcfilename. 639 * @throws IOFailure If any error occurs reading the file. 640 * @throws IllegalState if the read format is wrong 641 */ 642 private String readChecksum(File outputFile, String arcfileName) throws IOFailure, IllegalState { 643 // List of lines in batch (checksum job) output file 644 List<String> lines = FileUtils.readListFromFile(outputFile); 645 // List of checksums found in batch (checksum job) output file 646 List<String> checksumList = new ArrayList<String>(); 647 648 // Extract checksums for arcfile from lines 649 // If errors occurs then throw exception 650 for (String line : lines) { 651 String readFileName = ""; 652 String checksum = ""; 653 String[] tokens = line.split(ChecksumJob.STRING_FILENAME_SEPARATOR); 654 boolean ignoreLine = false; 655 656 // Check line format 657 ignoreLine = (tokens.length == 0 || line.isEmpty()); 658 if (tokens.length != 2 && !ignoreLine) { // wrong format 659 throw new IllegalState("Read checksum line had unexpected format '" + line + "'"); 660 } 661 662 // Check checksum and arc-file name in line 663 if (!ignoreLine) { 664 readFileName = tokens[0]; 665 checksum = tokens[1]; 666 if (checksum.length() == 0) { // wrong format of checksum 667 // do not exit - there may be more checksums 668 ignoreLine = true; 669 log.warn("There were an empty checksum in result for checksums to arc-file '{}' (line: '{}')", 670 arcfileName, line); 671 } else { 672 if (!readFileName.equals(arcfileName)) { // wrong arcfile 673 // do not exit - there may be more checksums 674 ignoreLine = true; 675 log.warn( 676 "There were an unexpected arc-file name in checksum result for arc-file '{}' (line: '{}')", 677 arcfileName, line); 678 } 679 } 680 } 681 682 // If previously checksum found, then check if they are different. 683 if (checksumList.size() > 0 && !ignoreLine && !checksum.equals(checksumList.get(checksumList.size() - 1))) { 684 String errMsg = "The arc-file '" + arcfileName + "' was found with two different checksums: " 685 + checksumList.get(0) + " and " + checksum + ". Last line: '" + line + "'."; 686 log.warn(errMsg); 687 throw new IllegalState(errMsg); 688 } 689 690 // Add error free non-empty found checksum in list 691 if (!ignoreLine) { 692 checksumList.add(checksum); 693 } 694 } 695 696 // Check that checksum list contain a result, 697 // log if it has more than one result 698 if (checksumList.size() > 1) { 699 // Log and proceed - the checksums are equal 700 log.warn("Arcfile '{}' was found with {} occurrences of the checksum: {}", arcfileName, 701 checksumList.size(), checksumList.get(0)); 702 } 703 704 if (checksumList.size() == 0) { 705 log.debug("Arcfile '{}' not found in lines of checksum output file '{}': {}", arcfileName, outputFile, 706 FileUtils.readListFromFile(outputFile)); 707 return ""; 708 } else { 709 return checksumList.get(0); 710 } 711 } 712 713 /** 714 * Process reporting of a checksum from a bitarchive for a specific file as part of a store operation for the file. 715 * Verify that the checksum is correct, update the BitArchiveStoreState state. Invariant: upload-state is changed or 716 * retry count is increased. 717 * 718 * @param arcFileName The file being stored. 719 * @param replicaChannelName The id of the replica reporting a checksum. 720 * @param orgChecksum The original checksum. 721 * @param reportedChecksum The checksum calculated by the replica. This value is "", if an error has occurred 722 * (except reply NOT ok from replica). 723 * @param checksumReadOk Tells whether the checksum was read ok by batch job. 724 */ 725 private synchronized void processCheckSum(String arcFileName, String replicaChannelName, String orgChecksum, 726 String reportedChecksum, boolean checksumReadOk) { 727 log.debug("Checksum received for file '{}'... processing", arcFileName); 728 ArgumentNotValid.checkNotNullOrEmpty(arcFileName, "String arcfileName"); 729 ArgumentNotValid.checkNotNullOrEmpty(replicaChannelName, "String replicaChannelName"); 730 ArgumentNotValid.checkNotNullOrEmpty(orgChecksum, "String orgChecksum"); 731 ArgumentNotValid.checkNotNull(reportedChecksum, "String reportedChecksum"); 732 733 // Log if we do not find file outstanding 734 // we proceed anyway in order to be sure to update stae of file 735 if (!outstandingRemoteFiles.containsKey(arcFileName)) { 736 log.warn("Could not find arc-file as outstanding remote file: '{}'", arcFileName); 737 } 738 739 // If everything works fine complete process of this checksum 740 if (orgChecksum.equals(reportedChecksum) && !reportedChecksum.isEmpty()) { 741 742 // Checksum is valid and job matches expected results 743 ad.setState(arcFileName, replicaChannelName, ReplicaStoreState.UPLOAD_COMPLETED); 744 745 // Find out if and how to make general reply on store() 746 // remove file from outstandingRemoteFiles if a reply is given 747 considerReplyingOnStore(arcFileName); 748 log.debug("Checksum processing for file '{}'... completed.", arcFileName); 749 return; 750 } 751 752 // Log error or retry upload 753 if (reportedChecksum.isEmpty()) { // no checksum found 754 if (checksumReadOk) { // no errors in finding no checksum 755 if (retryOk(replicaChannelName, arcFileName)) { // we can retry 756 if (outstandingRemoteFiles.containsKey(arcFileName)) { 757 RemoteFile rf = outstandingRemoteFiles.get(arcFileName); 758 String preComputedChecksum = outstandingRemoteFilesC.get(arcFileName); 759 // Retry upload only if allowed and in case we are sure 760 // that the empty checksum means that the arcfile is not 761 // in the archive 762 log.debug("Retrying upload of '{}'", arcFileName); 763 ad.setState(rf.getName(), replicaChannelName, ReplicaStoreState.UPLOAD_STARTED); 764 // retrieve the replica from the name of the channel. 765 Replica rep = Channels.retrieveReplicaFromIdentifierChannel(replicaChannelName); 766 connectedReplicas.get(rep).sendUploadMessage(rf, preComputedChecksum); 767 incRetry(replicaChannelName, arcFileName); 768 log.debug("Checksum processing for file '{}'... completed.", arcFileName); 769 return; 770 } // else logging was already done above 771 } else { // cannot retry 772 log.warn("Cannot do more retry upload of remote file: '{}' to '{}', reported checksum='{}'", 773 arcFileName, replicaChannelName, reportedChecksum); 774 } 775 } else { // error in getting checksum 776 log.warn( 777 "Cannot retry upload of remote file: '{}' to '{}', reported checksum='{}' due to earlier batchjob error.", 778 arcFileName, replicaChannelName, reportedChecksum); 779 } 780 } else { // non empty checksum 781 if (!orgChecksum.equals(reportedChecksum)) { 782 log.warn("Cannot upload (wrong checksum) '{}' to '{}', reported checksum='{}'", arcFileName, 783 replicaChannelName, reportedChecksum); 784 } else { 785 log.warn("Cannot upload (unknown reason) '{}' to '{}', reported checksum='{}'", arcFileName, 786 replicaChannelName, reportedChecksum); 787 } 788 } 789 790 // This point is reached if there is some kind of (logged) error, i.e. 791 // - the file has not been accepted as completed 792 // - the file has not been sent to retry of upload 793 ad.setState(arcFileName, replicaChannelName, ReplicaStoreState.UPLOAD_FAILED); 794 considerReplyingOnStore(arcFileName); 795 log.debug("Checksum processing for file '{}'... completed.", arcFileName); 796 } 797 798 /** 799 * Keep track of upload retries of an arcfile to an archive. 800 * 801 * @param replicaChannelName The name of a given replica. 802 * @param arcfileName The name of a given ARC file 803 * @return true if it is ok to retry an upload of arcfileName to the replica through the replicaChannelName. 804 */ 805 private boolean retryOk(String replicaChannelName, String arcfileName) { 806 Map<String, Integer> bitarchiveRetries = uploadRetries.get(replicaChannelName); 807 if (bitarchiveRetries == null) { 808 return true; 809 } 810 Integer retryCount = bitarchiveRetries.get(arcfileName); 811 if (retryCount == null) { 812 return true; 813 } 814 815 if (retryCount >= Settings.getInt(ArchiveSettings.ARCREPOSITORY_UPLOAD_RETRIES)) { 816 return false; 817 } 818 819 return true; 820 } 821 822 /** 823 * Increment the number of upload retries. 824 * 825 * @param replicaChannelName The name of the identification channel for the replica. 826 * @param arcfileName The name of a given ARC file. 827 */ 828 private void incRetry(String replicaChannelName, String arcfileName) { 829 Map<String, Integer> replicaRetries = uploadRetries.get(replicaChannelName); 830 if (replicaRetries == null) { 831 replicaRetries = new HashMap<String, Integer>(); 832 uploadRetries.put(replicaChannelName, replicaRetries); 833 } 834 835 Integer retryCount = replicaRetries.get(arcfileName); 836 if (retryCount == null) { 837 replicaRetries.put(arcfileName, Integer.valueOf(1)); 838 return; 839 } 840 841 replicaRetries.put(arcfileName, Integer.valueOf(retryCount + 1)); 842 } 843 844 /** 845 * Remove all retry tracking information for the arcfile. 846 * 847 * @param arcfileName The name of a given ARC file 848 */ 849 private void clearRetries(String arcfileName) { 850 for (String replicaChannelName : uploadRetries.keySet()) { 851 Map<String, Integer> baretries = uploadRetries.get(replicaChannelName); 852 baretries.remove(arcfileName); 853 } 854 } 855 856 /** 857 * Change admin data entry for a given file. 858 * <p> 859 * The following information is contained in the given AdminDataMessage: 1) The name of the given file to change the 860 * entry for, 2) the name of the bitarchive to modify the entry for, 3) a boolean that says whether or not to 861 * replace the checksum for the entry for the given file in AdminData, 4) a replacement for the case where the 862 * former value is true. 863 * 864 * @param msg an AdminDataMessage object 865 */ 866 public void updateAdminData(AdminDataMessage msg) { 867 868 if (!ad.hasEntry(msg.getFileName())) { 869 throw new ArgumentNotValid("No admin entry exists for the file '" + msg.getFileName() + "'"); 870 } 871 872 String message = "Handling request to change admin data for '" + msg.getFileName() + "'. "; 873 // add information if store-state is changed. 874 if (msg.isChangeStoreState()) { 875 message += "Change store state to " + msg.getNewvalue(); 876 } 877 // add information if checksum is changed. 878 if (msg.isChangeChecksum()) { 879 message += "Change checksum to " + msg.getChecksum(); 880 } 881 // log the message. 882 log.warn(message); 883 NotificationsFactory.getInstance().notify(message, NotificationType.WARNING); 884 885 if (msg.isChangeStoreState()) { 886 String replicaChannelName = Replica.getReplicaFromId(msg.getReplicaId()).getIdentificationChannel() 887 .getName(); 888 ad.setState(msg.getFileName(), replicaChannelName, msg.getNewvalue()); 889 } 890 891 if (msg.isChangeChecksum()) { 892 ad.setCheckSum(msg.getFileName(), msg.getChecksum()); 893 } 894 } 895 896 /** 897 * Forwards a RemoveAndGetFileMessage to the designated bitarchive. Before forwarding the message it is verified 898 * that the checksum of the file to remove differs from the registered checksum of the file to remove. If no 899 * registration exists for the file to remove the message is always forwarded. 900 * 901 * @param msg the message to forward to a bitarchive 902 */ 903 public void removeAndGetFile(RemoveAndGetFileMessage msg) { 904 // Prevent removal of files with correct checksum 905 if (ad.hasEntry(msg.getFileName())) { 906 String refchecksum = ad.getCheckSum(msg.getFileName()); 907 if (msg.getCheckSum().equals(refchecksum)) { 908 throw new ArgumentNotValid("Attempting to remove file with correct checksum. File=" + msg.getFileName() 909 + "; with checksum:" + msg.getCheckSum() + ";"); 910 } 911 } 912 913 // checksum ok - try to remove the file 914 String errMsg = "Requesting remove of file '" + msg.getFileName() + "' with checksum '" + msg.getCheckSum() 915 + "' from: '" + msg.getReplicaId() + "'"; 916 log.warn(errMsg); 917 NotificationsFactory.getInstance().notify(errMsg, NotificationType.WARNING); 918 ReplicaClient rc = getReplicaClientFromReplicaId(msg.getReplicaId()); 919 rc.sendRemoveAndGetFileMessage(msg); 920 } 921 922 /** 923 * Close all replicas connections, open loggers, and the ArcRepository handler. 924 */ 925 public void close() { 926 log.info("Closing down ArcRepository"); 927 cleanup(); 928 log.info("Closed ArcRepository"); 929 } 930 931 /** 932 * Closes all connections and nulls the instance. The ArcRepositoryHandler, the AdminData and the ReplicaClients are 933 * closed along with all their connections. 934 */ 935 public void cleanup() { 936 if (arcReposhandler != null && arcReposhandler instanceof ArcRepositoryServer) { 937 ((ArcRepositoryServer) arcReposhandler).close(); 938 arcReposhandler = null; 939 } 940 if (connectedReplicas != null) { 941 for (ReplicaClient rc : connectedReplicas.values()) { 942 rc.close(); 943 } 944 connectedReplicas.clear(); 945 } 946 if (ad != null) { 947 ad.close(); 948 ad = null; 949 } 950 instance = null; 951 } 952 953}