/*
 * #%L
 * Netarchivesuite - archive
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 * the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.archive.bitarchive.distribute;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.archive.ArchiveSettings;
import dk.netarkivet.archive.bitarchive.BitarchiveMonitor;
import dk.netarkivet.archive.checksum.distribute.CorrectMessage;
import dk.netarkivet.archive.checksum.distribute.GetAllChecksumsMessage;
import dk.netarkivet.archive.checksum.distribute.GetAllFilenamesMessage;
import dk.netarkivet.archive.checksum.distribute.GetChecksumMessage;
import dk.netarkivet.archive.distribute.ArchiveMessageHandler;
import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.distribute.Channels;
import dk.netarkivet.common.distribute.JMSConnection;
import dk.netarkivet.common.distribute.JMSConnectionFactory;
import dk.netarkivet.common.distribute.NetarkivetMessage;
import dk.netarkivet.common.distribute.RemoteFile;
import dk.netarkivet.common.distribute.RemoteFileFactory;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.UnknownID;
import dk.netarkivet.common.utils.CleanupIF;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.KeyValuePair;
import dk.netarkivet.common.utils.NotificationType;
import dk.netarkivet.common.utils.NotificationsFactory;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.batch.ChecksumJob;
import dk.netarkivet.common.utils.batch.FileBatchJob;
import dk.netarkivet.common.utils.batch.FileListJob;

/**
 * Class representing message handling for the monitor for bitarchives. The monitor is used for sending out and
 * combining the results of executing batch jobs.
 * <p>
 * Batch jobs are received on the BAMON-channel, and resent to all bitarchives, that are considered live by the
 * bitarchive monitor.
 * <p>
 * Lets the bitarchive monitor handle batch replies from the bitarchives, and observes it for when the batch job is
 * done. Then constructs a reply from the data given, and sends it back to the originator.
 * <p>
 * Also registers signs of life from the bitarchives in the bitarchive monitor.
 * <p>
 * The bookkeeping maps of this class are accessed both by the message handling thread and by the worker threads
 * started in {@link #visit(BatchEndedMessage)} and {@link #update(Observable, Object)}, so they are concurrent maps.
 */
public class BitarchiveMonitorServer extends ArchiveMessageHandler implements Observer, CleanupIF {

    /** The unique instance of this class. Guarded by the class lock. */
    private static BitarchiveMonitorServer instance;

    /** Logger. */
    private static final Logger log = LoggerFactory.getLogger(BitarchiveMonitorServer.class);

    /** The jms connection used. */
    private final JMSConnection con = JMSConnectionFactory.getInstance();

    /** Object that handles logical operations. */
    private BitarchiveMonitor bamon;

    /** Map for managing the messages, which are made into batchjobs. The String is the ID of the message. */
    private final Map<String, NetarkivetMessage> batchConversions = new ConcurrentHashMap<String, NetarkivetMessage>();

    /**
     * Map for containing the batch-message-ids and the batchjobs, for the result files to be post-processed before
     * returned back.
     */
    private final Map<String, FileBatchJob> batchjobs = new ConcurrentHashMap<String, FileBatchJob>();

    /**
     * The map for managing the CorrectMessages. This involves three stages.
     * <p>
     * In the first, a RemoveAndGetFileMessage is sent, and then the CorrectMessage is put in the map along the ID of
     * the RemoveAndGetFileMessage.
     * <p>
     * In the second stage, the reply of the RemoveAndGetFileMessage is used to extract the CorrectMessage from the Map.
     * The CorrectMessage is then updated with the results from the RemoveAndGetFileMessage. Then an UploadMessage is
     * sent with the 'correct' file, where the ID of the UploadMessage is put into the map along the CorrectMessage.
     * <p>
     * In the third stage, the reply of the UploadMessage is used to extract the CorrectMessage from the map again, and
     * the results of the UploadMessage are used to update the CorrectMessage, which is then returned.
     */
    private final Map<String, CorrectMessage> correctMessages = new ConcurrentHashMap<String, CorrectMessage>();

    /**
     * Creates an instance of a BitarchiveMonitorServer.
     *
     * @throws IOFailure - if an error with the JMSConnection occurs
     */
    protected BitarchiveMonitorServer() throws IOFailure {
        bamon = BitarchiveMonitor.getInstance();
        bamon.addObserver(this);
        con.setListener(Channels.getTheBamon(), this);
        log.info("BitarchiveMonitorServer instantiated. Listening to queue: '{}'.", Channels.getTheBamon());
    }

    /**
     * Returns the unique instance of a BitarchiveMonitorServer.
     *
     * @return the instance
     * @throws IOFailure - if an error with the JMSConnection occurs
     */
    public static synchronized BitarchiveMonitorServer getInstance() throws IOFailure {
        if (instance == null) {
            instance = new BitarchiveMonitorServer();
        }
        return instance;
    }

    /**
     * Returns the timeout to use for a batch job: the job's own timeout if it is positive, otherwise the configured
     * {@link ArchiveSettings#BITARCHIVE_BATCH_JOB_TIMEOUT}.
     *
     * @param job The batch job.
     * @return The timeout to register the batch with.
     */
    private static long resolveBatchTimeout(FileBatchJob job) {
        long batchTimeout = job.getBatchJobTimeout();
        // if batch time out is not a positive number, then use settings.
        if (batchTimeout <= 0) {
            batchTimeout = Settings.getLong(ArchiveSettings.BITARCHIVE_BATCH_JOB_TIMEOUT);
        }
        return batchTimeout;
    }

    /**
     * This is the message handling method for BatchMessages.
     * <p>
     * A new BatchMessage is created with the same Job as the incoming BatchMessage and sent off to all live
     * bitarchives.
     * <p>
     * The incoming and outgoing batch messages are then registered at the bitarchive monitor.
     *
     * @param inbMsg The message received
     * @throws ArgumentNotValid If the BatchMessage is null.
     */
    public void visit(BatchMessage inbMsg) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(inbMsg, "BatchMessage inbMsg");

        log.info("Received BatchMessage\n{}", inbMsg.toString());
        try {
            BatchMessage outbMsg = new BatchMessage(Channels.getAllBa(), inbMsg.getJob(),
                    Settings.get(CommonSettings.USE_REPLICA_ID));
            con.send(outbMsg);
            long batchTimeout = resolveBatchTimeout(inbMsg.getJob());
            // Store the job before registering the batch, so it is available for post processing even if the
            // monitor reports the batch as finished right after registration.
            batchjobs.put(inbMsg.getID(), inbMsg.getJob());
            bamon.registerBatch(inbMsg.getID(), inbMsg.getReplyTo(), outbMsg.getID(), batchTimeout);
        } catch (Exception e) {
            // Do not keep a job around for a batch that was never registered.
            batchjobs.remove(inbMsg.getID());
            log.warn("Trouble while handling batch request '{}'", inbMsg, e);
        }
    }

    /**
     * This is the message handling method for BatchEndedMessages.
     * <p>
     * This delegates the handling of the reply to the bitarchive monitor, which will notify us if the batch job is now
     * done. The delegation happens in a separate thread.
     *
     * @param beMsg The BatchEndedMessage to be handled.
     * @throws ArgumentNotValid If the BatchEndedMessage is null.
     */
    public void visit(final BatchEndedMessage beMsg) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(beMsg, "BatchEndedMessage beMsg");

        log.debug("Received batch ended from bitarchive '{}': {}", beMsg.getBitarchiveID(), beMsg);
        bamon.signOfLife(beMsg.getBitarchiveID());
        try {
            new Thread() {
                @Override
                public void run() {
                    // Exceptions thrown here would otherwise only reach the default uncaught-exception handler.
                    try {
                        // retrieve the error messages.
                        String errorMessages = null;
                        if (!beMsg.isOk()) {
                            errorMessages = beMsg.getErrMsg();
                        }
                        // send reply to the bitarchive.
                        bamon.bitarchiveReply(beMsg.getOriginatingBatchMsgID(), beMsg.getBitarchiveID(),
                                beMsg.getNoOfFilesProcessed(), beMsg.getFilesFailed(), beMsg.getRemoteFile(),
                                errorMessages, beMsg.getExceptions());
                    } catch (Exception e) {
                        log.warn("Trouble while handling bitarchive reply '{}'", beMsg, e);
                    }
                }
            }.start();
        } catch (Exception e) {
            log.warn("Trouble while handling bitarchive reply '{}'", beMsg, e);
        }
    }

    /**
     * This is the message handling method for HeartBeatMessages.
     * <p>
     * Registers a sign of life from a bitarchive.
     *
     * @param hbMsg the message that represents the sign of life
     * @throws ArgumentNotValid If the HeartBeatMessage is null.
     */
    public void visit(HeartBeatMessage hbMsg) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(hbMsg, "HeartBeatMessage hbMsg");

        try {
            bamon.signOfLife(hbMsg.getBitarchiveID());
        } catch (Exception e) {
            log.warn("Trouble while handling bitarchive sign of life '{}'", hbMsg, e);
        }
    }

    /**
     * This is the first step in correcting a bad entry.
     * <p>
     * In the first stage, a RemoveAndGetFileMessage is sent, and then the CorrectMessage is put in the map along the ID
     * of the RemoveAndGetFileMessage.
     * <p>
     * See the correctMessages Map.
     *
     * @param cm The CorrectMessage to handle.
     * @throws ArgumentNotValid If the CorrectMessage is null.
     */
    public void visit(CorrectMessage cm) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(cm, "CorrectMessage cm");
        log.info("Receiving CorrectMessage: {}", cm);

        try {
            // Create the RemoveAndGetFileMessage for removing the file.
            RemoveAndGetFileMessage ragfm = new RemoveAndGetFileMessage(Channels.getAllBa(), Channels.getTheBamon(),
                    cm.getArcfileName(), cm.getReplicaId(), cm.getIncorrectChecksum(), cm.getCredentials());

            // Send the message. The message ID is only read after sending.
            con.send(ragfm);

            log.info("Step 1 of handling CorrectMessage. Sending RemoveAndGetFileMessage: {}", ragfm);

            // Put the CorrectMessage into the map along the id of the
            // RemoveAndGetFileMessage
            correctMessages.put(ragfm.getID(), cm);
        } catch (Exception e) {
            String errMsg = "An error occurred during step 1 of handling "
                    + " the CorrectMessage: sending RemoveAndGetFileMessage";
            log.warn(errMsg, e);
            cm.setNotOk(e);
            con.reply(cm);
        }
    }

    /**
     * This is the second step in correcting a bad entry.
     * <p>
     * In the second stage, the reply of the RemoveAndGetFileMessage is used to extract the CorrectMessage from the Map.
     * The CorrectMessage is then updated with the results from the RemoveAndGetFileMessage. Then an UploadMessage is
     * sent with the 'correct' file, where the ID of the UploadMessage is put into the map along the CorrectMessage.
     * <p>
     * A reply that does not match a pending CorrectMessage (e.g. a duplicate reply) is logged and ignored.
     * <p>
     * See the correctMessages Map.
     *
     * @param msg The RemoveAndGetFileMessage.
     * @throws ArgumentNotValid If the RemoveAndGetFileMessage is null.
     */
    public void visit(RemoveAndGetFileMessage msg) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(msg, "RemoveAndGetFileMessage msg");
        log.info("Receiving RemoveAndGetFileMessage (presumably reply): {}", msg);

        // Retrieve the correct message
        CorrectMessage cm = correctMessages.remove(msg.getID());
        if (cm == null) {
            log.warn("Received a RemoveAndGetFileMessage which does not match any pending CorrectMessage. "
                    + "Ignoring it: {}", msg);
            return;
        }

        // If the RemoveAndGetFileMessage has failed, then the CorrectMessage
        // has also failed, and should be returned as a fail.
        if (!msg.isOk()) {
            String errMsg = "The RemoveAndGetFileMessage has returned the " + "error: '" + msg.getErrMsg()
                    + "'. Reply to CorrectMessage " + "with the same error.";
            log.warn(errMsg);
            cm.setNotOk(errMsg);
            con.reply(cm);
            return;
        }

        try {
            // update the correct message.
            cm.setRemovedFile(msg.getRemoteFile());

            // Create the upload message, send it.
            UploadMessage um = new UploadMessage(Channels.getAllBa(), Channels.getTheBamon(), cm.getCorrectFile());
            con.send(um);
            log.info("Step 2 of handling CorrectMessage. Sending UploadMessage: {}", um);

            // Store the CorrectMessage along with the ID of the UploadMessage.
            correctMessages.put(um.getID(), cm);
        } catch (Exception e) {
            String errMsg = "An error occurred during step 2 of handling "
                    + " the CorrectMessage: sending UploadMessage";
            log.warn(errMsg, e);
            cm.setNotOk(e);
            con.reply(cm);
        }
    }

    /**
     * This is the third step in correcting a bad entry.
     * <p>
     * In the third stage, the reply of the UploadMessage is used to extract the CorrectMessage from the map again, and
     * the results of the UploadMessage are used to update the CorrectMessage, which is then returned.
     * <p>
     * A reply that does not match a pending CorrectMessage (e.g. a duplicate reply) is logged and ignored.
     * <p>
     * See the correctMessages Map.
     *
     * @param msg The reply of the UploadMessage.
     * @throws ArgumentNotValid If the UploadMessage is null.
     */
    public void visit(UploadMessage msg) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(msg, "UploadMessage msg");
        log.info("Receiving a reply to an UploadMessage: {}", msg);

        // retrieve the CorrectMessage.
        CorrectMessage cm = correctMessages.remove(msg.getID());
        if (cm == null) {
            log.warn("Received an UploadMessage which does not match any pending CorrectMessage. Ignoring it: {}",
                    msg);
            return;
        }

        // handle potential errors.
        if (!msg.isOk()) {
            cm.setNotOk(msg.getErrMsg());
        }

        // reply to the correct message.
        con.reply(cm);
        log.info("Step 3 of handling CorrectMessage. Sending reply for CorrectMessage: '{}'", cm);
    }

    /**
     * Method for handling the GetAllChecksumsMessage. This message will be made into a batchjob, which will be
     * executed on the bitarchives. The reply to the batchjob will be handled and used as reply to the
     * GetAllChecksumsMessage.
     *
     * @param msg The GetAllChecksumsMessage, which will be made into a batchjob and sent to the bitarchives.
     * @throws ArgumentNotValid If the GetAllChecksumsMessage is null.
     */
    public void visit(GetAllChecksumsMessage msg) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(msg, "GetAllChecksumsMessage msg");

        log.info("Receiving GetAllChecksumsMessage '{}'", msg);

        // Create batchjob for the GetAllChecksumsMessage.
        ChecksumJob cj = new ChecksumJob();

        // Execute the batchjob.
        executeConvertedBatch(cj, msg);
    }

    /**
     * Method for handling the GetAllFilenamesMessage. The GetAllFilenamesMessage will be made into a filelist batchjob,
     * which will be sent to the bitarchives. The reply to the batchjob will then be used as reply to the
     * GetAllFilenamesMessage.
     *
     * @param msg The GetAllFilenamesMessage, which will be made into a batchjob and sent to the bitarchives.
     * @throws ArgumentNotValid If the GetAllFilenamesMessage is null.
     */
    public void visit(GetAllFilenamesMessage msg) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(msg, "GetAllFilenamesMessage msg");

        log.info("Receiving GetAllFilenamesMessage '{}'", msg);

        // Create batchjob for the GetAllFilenamesMessage.
        FileListJob flj = new FileListJob();

        // Execute the batchjob.
        executeConvertedBatch(flj, msg);
    }

    /**
     * Method for handling the GetChecksumMessage. This is made into the batchjob ChecksumJob which will be limited to
     * the specific filename. The batchjob will be executed on the bitarchives and the reply to the batchjob will be
     * used as reply to the GetChecksumMessage.
     *
     * @param msg The GetChecksumMessage, which will be made into a batchjob and sent to the bitarchives.
     * @throws ArgumentNotValid If the GetChecksumMessage is null.
     */
    public void visit(GetChecksumMessage msg) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(msg, "GetChecksumMessage msg");

        log.info("Receiving GetChecksumsMessage '{}'", msg);

        // Create batchjob for the GetChecksumMessage, restricted to the requested file.
        ChecksumJob cj = new ChecksumJob();
        cj.processOnlyFileNamed(msg.getArcfileName());
        cj.setBatchJobTimeout(Settings.getLong(ArchiveSettings.SINGLE_CHECKSUM_TIMEOUT));

        // Execute the batchjob.
        executeConvertedBatch(cj, msg);
    }

    /**
     * Method for executing messages converted into batchjobs.
     * <p>
     * The message is remembered as a batch conversion before the batch is registered at the bitarchive monitor, so the
     * completion notification is always routed to {@link #replyConvertedBatch(BitarchiveMonitor.BatchJobStatus)}. On
     * failure, the bookkeeping is rolled back and the message is replied to as not OK.
     *
     * @param job The job to execute.
     * @param msg The message which is converted into the batchjob.
     */
    private void executeConvertedBatch(FileBatchJob job, NetarkivetMessage msg) {
        try {
            BatchMessage outbMsg = new BatchMessage(Channels.getAllBa(), job,
                    Settings.get(CommonSettings.USE_REPLICA_ID));
            con.send(outbMsg);
            log.info("{}", outbMsg);

            long batchTimeout = resolveBatchTimeout(job);
            // Remember that the message is a batch conversion. The job itself is not stored in 'batchjobs': converted
            // batches are never post processed, and nothing would ever remove the entry again.
            batchConversions.put(msg.getID(), msg);
            bamon.registerBatch(msg.getID(), msg.getReplyTo(), outbMsg.getID(), batchTimeout);
        } catch (Throwable t) {
            // Do not keep bookkeeping for a batch that was never registered.
            batchConversions.remove(msg.getID());
            log.warn("Unable to handle batch '{}' request due to unexpected exception", msg, t);
            msg.setNotOk(t);
            con.reply(msg);
        }
    }

    /**
     * Handles notifications from the bitarchive monitor, that a batch job is complete.
     * <p>
     * Spawns a new thread in which all the results are wrapped and sent back in a reply to the originator of this batch
     * request.
     *
     * @param o the observable object. Should always be the bitarchive monitor. If it isn't, this notification will be
     * logged and ignored.
     * @param arg an argument passed from the bitarchive monitor. This should always be a batch status object indicating
     * the end of that batchjob. If it isn't, this notification will be logged and ignored.
     */
    @Override
    public void update(Observable o, final Object arg) {
        if (o != bamon) {
            log.warn("Received unexpected notification from '{}'", o);
            return;
        }
        if (arg == null) {
            log.warn("Received unexpected notification with no argument");
            return;
        }
        if (!(arg instanceof BitarchiveMonitor.BatchJobStatus)) {
            log.warn("Received notification with incorrect argument type {}: '{}'", arg.getClass(), arg);
            return;
        }
        final BitarchiveMonitor.BatchJobStatus bjs = (BitarchiveMonitor.BatchJobStatus) arg;
        new Thread() {
            @Override
            public void run() {
                // Exceptions thrown here would otherwise only reach the default uncaught-exception handler.
                try {
                    // Check whether converted message or actual batchjob.
                    if (batchConversions.containsKey(bjs.originalRequestID)) {
                        replyConvertedBatch(bjs);
                    } else {
                        doBatchReply(bjs);
                    }
                } catch (Exception e) {
                    log.warn("Trouble while replying to batch request '{}'", bjs.originalRequestID, e);
                }
            }
        }.start();
    }

    /**
     * This method sends a reply based on the information from bitarchives received and stored in the given batch job
     * status.
     * <p>
     * It will concatenate the results from all the bitarchives in one file, and construct a reply to the originating
     * requester with all information. If the job's post processing fails or is not supported, the concatenated
     * file is used as the result.
     *
     * @param bjs Status of received messages from bitarchives.
     */
    private void doBatchReply(BitarchiveMonitor.BatchJobStatus bjs) {
        RemoteFile resultsFile = null;
        try {
            // Post process the file.
            File postFile = File.createTempFile("post", "batch", FileUtils.getTempDir());
            try {
                // retrieve the batchjob
                FileBatchJob bj = batchjobs.remove(bjs.originalRequestID);
                if (bj == null) {
                    throw new UnknownID("Only knows: " + batchjobs.keySet());
                }
                log.info("Post processing batchjob results for '{}' with id '{}'", bj.getClass().getName(),
                        bjs.originalRequestID);
                // perform the post process, and handle whether it succeeded.
                if (postProcess(bj, bjs.batchResultFile, postFile)) {
                    log.debug("Post processing finished.");
                } else {
                    log.debug("No post processing. Using concatenated file.");
                    tryAndDeleteTemporaryFile(postFile);
                    postFile = bjs.batchResultFile;
                }
            } catch (Exception e) {
                log.warn("Exception caught during post processing batchjob. Concatenated file used instead.", e);
                tryAndDeleteTemporaryFile(postFile);
                postFile = bjs.batchResultFile;
            }

            // Get remote file for batch result
            resultsFile = RemoteFileFactory.getMovefileInstance(postFile);
        } catch (Exception e) {
            log.warn("Make remote file from {}", bjs.batchResultFile, e);
            bjs.appendError("Could not append batch results: " + e);
        }

        // Make batch reply message
        BatchReplyMessage brMsg = new BatchReplyMessage(bjs.originalRequestReplyTo, Channels.getTheBamon(),
                bjs.originalRequestID, bjs.noOfFilesProcessed, bjs.filesFailed, resultsFile);
        if (bjs.errorMessages != null) {
            brMsg.setNotOk(bjs.errorMessages);
        }

        // Send the batch reply message.
        con.send(brMsg);

        log.info("BatchReplyMessage: '{}' sent from BA monitor to queue: '{}'", brMsg, brMsg.getTo());
    }

    /**
     * Runs the post processing of a batch job from one file into another, and always closes both streams afterwards
     * (an open output stream would otherwise leak a file handle and can prevent the file from being deleted).
     *
     * @param job The batch job whose post processing is run.
     * @param input The file with the concatenated batch results.
     * @param output The file to write the post processed results into.
     * @return The result of {@link FileBatchJob#postProcess}, i.e. whether post processing was performed.
     * @throws IOException If one of the files could not be opened or closed.
     */
    private static boolean postProcess(FileBatchJob job, File input, File output) throws IOException {
        FileInputStream in = new FileInputStream(input);
        try {
            FileOutputStream out = new FileOutputStream(output);
            try {
                return job.postProcess(in, out);
            } finally {
                out.close();
            }
        } finally {
            in.close();
        }
    }

    /**
     * Helper method to delete temporary files. Logs at level debug, if it couldn't delete the file.
     *
     * @param tmpFile the tmpFile that needs to be deleted.
     */
    private void tryAndDeleteTemporaryFile(File tmpFile) {
        boolean deleted = tmpFile.delete();
        if (!deleted) {
            log.debug("Failed to delete temporary file '{}'", tmpFile.getAbsolutePath());
        } else {
            log.trace("Deleted temporary file '{}' successfully", tmpFile.getAbsolutePath());
        }
    }

    /**
     * Uses the batchjobstatus on the message converted batchjob to reply on the original message.
     *
     * @param bjs The status of the batchjob.
     */
    private void replyConvertedBatch(BitarchiveMonitor.BatchJobStatus bjs) {
        // Retrieve the message corresponding to the converted batchjob.
        NetarkivetMessage msg = batchConversions.remove(bjs.originalRequestID);
        if (msg == null) {
            log.warn("No converted message is registered for batch request '{}'. Cannot reply.",
                    bjs.originalRequestID);
            return;
        }
        log.info("replying to converted batchjob message : {}", msg);
        if (msg instanceof GetAllChecksumsMessage) {
            replyToGetAllChecksumsMessage(bjs, (GetAllChecksumsMessage) msg);
        } else if (msg instanceof GetAllFilenamesMessage) {
            replyToGetAllFilenamesMessage(bjs, (GetAllFilenamesMessage) msg);
        } else if (msg instanceof GetChecksumMessage) {
            replyToGetChecksumMessage(bjs, (GetChecksumMessage) msg);
        } else /* unhandled message type. */{
            String errMsg = "The message cannot be handled '" + msg + "'";
            log.error(errMsg);
            msg.setNotOk(errMsg);
            con.reply(msg);
        }
    }

    /**
     * Method for replying to a GetAllChecksumsMessage. It uses the reply from the batchjob to make a proper reply to
     * the GetAllChecksumsMessage.
     *
     * @param bjs The BatchJobStatus used to reply to the GetAllChecksumsMessage.
     * @param msg The GetAllChecksumsMessage to reply to.
     */
    private void replyToGetAllChecksumsMessage(BitarchiveMonitor.BatchJobStatus bjs, GetAllChecksumsMessage msg) {
        try {
            // Set the resulting file.
            msg.setFile(bjs.batchResultFile);

            // record any errors.
            if (bjs.errorMessages != null) {
                msg.setNotOk(bjs.errorMessages);
            }
        } catch (Throwable t) {
            msg.setNotOk(t);
            log.warn("An error occurred during the handling of the GetAllChecksumsMessage", t);
        } finally {
            // reply
            log.info("Replying to GetAllChecksumsMessage '{}'", msg);
            con.reply(msg);
        }
    }

    /**
     * Method for replying to a GetAllFilenamesMessage. It uses the reply from the batchjob to make a proper reply to
     * the GetAllFilenamesMessage.
     *
     * @param bjs The BatchJobStatus used to reply to the GetAllFilenamesMessage.
     * @param msg The GetAllFilenamesMessage to reply to.
     */
    private void replyToGetAllFilenamesMessage(BitarchiveMonitor.BatchJobStatus bjs, GetAllFilenamesMessage msg) {
        try {
            // Set the resulting file.
            msg.setFile(bjs.batchResultFile);

            // record any errors.
            if (bjs.errorMessages != null) {
                msg.setNotOk(bjs.errorMessages);
            }
        } catch (Throwable t) {
            msg.setNotOk(t);
            log.warn("An error occurred during the handling of the GetAllFilenamesMessage", t);
        } finally {
            // reply
            log.info("Replying to GetAllFilenamesMessage '{}'", msg);
            con.reply(msg);
        }
    }

    /**
     * Method for replying to a GetChecksumMessage. It uses the reply from the batchjob to make a proper reply to the
     * GetChecksumMessage.
     * <p>
     * If the file is found more than once, the first result is used. Differing duplicates are reported as a
     * warning notification.
     *
     * @param bjs The BatchJobStatus to be used for the reply.
     * @param msg The GetChecksumMessage to reply to.
     */
    private void replyToGetChecksumMessage(BitarchiveMonitor.BatchJobStatus bjs, GetChecksumMessage msg) {
        try {
            // Fetch the content of the batchresultfile.
            List<String> output = FileUtils.readListFromFile(bjs.batchResultFile);

            if (output.size() < 1) {
                String errMsg = "The batchjob did not find the file '" + msg.getArcfileName() + "' within the "
                        + "archive.";
                log.warn(errMsg);

                throw new IOFailure(errMsg);
            }
            if (output.size() > 1) {
                // Log that duplicates have been found.
                log.warn("The file '{}' was found {} times in the archive. Using the first found '{}' out of '{}'",
                        msg.getArcfileName(), output.size(), output.get(0), output);

                // check if any different values.
                String firstVal = output.get(0);
                for (int i = 1; i < output.size(); i++) {
                    if (!output.get(i).equals(firstVal)) {
                        String errorString = "Replica '" + msg.getReplicaId() + "' has unidentical duplicates: '"
                                + firstVal + "' and '" + output.get(i) + "'.";
                        log.warn(errorString);
                        NotificationsFactory.getInstance().notify(errorString, NotificationType.WARNING);
                    } else {
                        log.debug("Replica '{}' has identical duplicates: '{}'.", msg.getReplicaId(), firstVal);
                    }
                }
            }

            // Extract the filename and checksum of the first result.
            KeyValuePair<String, String> firstResult = ChecksumJob.parseLine(output.get(0));

            // Check that the filename has the expected value (the name of
            // the requested file).
            if (!msg.getArcfileName().equals(firstResult.getKey())) {
                String errMsg = "The first result found the file '" + firstResult.getKey()
                        + "' but should have found '" + msg.getArcfileName() + "'.";
                log.error(errMsg);
                throw new IOFailure(errMsg);
            }

            // Put the checksum into the reply message, and reply.
            msg.setChecksum(firstResult.getValue());

            // cleanup batchjob file
            FileUtils.remove(bjs.batchResultFile);
        } catch (Throwable t) {
            msg.setNotOk(t);
            log.warn("An error occurred during the handling of the GetChecksumMessage", t);
        } finally {
            log.info("Replying GetChecksumMessage: '{}'.", msg.toString());

            // Reply to the original message (set 'isReply').
            msg.setIsReply();
            con.reply(msg);
        }
    }

    /**
     * Close down this BitarchiveMonitor.
     */
    public void close() {
        log.info("BitarchiveMonitorServer closing down.");
        cleanup();
        log.info("BitarchiveMonitorServer closed down");
    }

    /**
     * Closes this BitarchiveMonitorServer cleanly: stops listening on the BAMON channel, forgets all pending
     * bookkeeping, resets the singleton and cleans up the bitarchive monitor.
     */
    @Override
    public synchronized void cleanup() {
        // 'instance' is guarded by the class lock in getInstance(), so it must be read and reset under that lock too.
        synchronized (BitarchiveMonitorServer.class) {
            if (instance != null) {
                con.removeListener(Channels.getTheBamon(), this);
                batchConversions.clear();
                batchjobs.clear();
                correctMessages.clear();
                instance = null;
                if (bamon != null) {
                    bamon.cleanup();
                    bamon = null;
                }
            }
        }
    }

}