001/* 002 * #%L 003 * Netarchivesuite - archive 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.archive.bitarchive; 024 025import java.io.File; 026import java.io.FileOutputStream; 027import java.io.IOException; 028import java.io.OutputStream; 029import java.util.ArrayList; 030import java.util.Collection; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.List; 035import java.util.Map; 036import java.util.Observable; 037import java.util.Set; 038import java.util.Timer; 039import java.util.TimerTask; 040 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import dk.netarkivet.archive.ArchiveSettings; 045import dk.netarkivet.common.distribute.ChannelID; 046import dk.netarkivet.common.distribute.RemoteFile; 047import dk.netarkivet.common.exceptions.ArgumentNotValid; 048import dk.netarkivet.common.exceptions.IOFailure; 049import dk.netarkivet.common.utils.CleanupIF; 050import dk.netarkivet.common.utils.ExceptionUtils; 051import dk.netarkivet.common.utils.FileUtils; 052import dk.netarkivet.common.utils.Settings; 053import dk.netarkivet.common.utils.StringUtils; 054import dk.netarkivet.common.utils.batch.FileBatchJob; 055 056/** 057 * Class representing the monitor for bitarchives. The monitor is used for sending out and combining the results of 058 * executing batch jobs. 059 * <p> 060 * Registers outgoing batchjobs to bitarchives, and handles replies from bitarchives, finally notifying observers when 061 * all bitarchives have replied, or when the batch times out, after a time specified in settings. 062 * <p> 063 * We wait for replies from bitarchives that are considered live when the batch begins. A bitarchive is considered live 064 * if we have heard any activity from it within a time specified in settings. 065 */ 066public class BitarchiveMonitor extends Observable implements CleanupIF { 067 068 /** The current instance. */ 069 private static BitarchiveMonitor instance; 070 071 /** The time of the latest sign of life received from each bitarchive. */ 072 private Map<String, Long> bitarchiveSignsOfLife = Collections.synchronizedMap(new HashMap<String, Long>()); 073 074 /** The acceptable delay in milliseconds between signs of life. */ 075 private final long acceptableSignOfLifeDelay; 076 077 /** 078 * Map from the ID of batch jobs sent to bitarchives, to tuple class of status for this batch job. The Map contains 079 * all batch jobs currently running. 080 */ 081 private Map<String, BatchJobStatus> runningBatchJobs = Collections 082 .synchronizedMap(new HashMap<String, BatchJobStatus>()); 083 084 /** Whether the timer will be created as a daemon-thread. */ 085 private static final boolean IS_DAEMON = true; 086 087 /** The timer for keeping track of running batchjobs. */ 088 protected final Timer batchTimer = new Timer(IS_DAEMON); 089 090 /** Logger for this class. */ 091 private static final Logger log = LoggerFactory.getLogger(BitarchiveMonitor.class); 092 093 /** 094 * Initialises the bitarchive monitor. During this, the acceptable delay between signs of life and the timeout 095 * setting for batchjobs are read and logged. 096 */ 097 private BitarchiveMonitor() { 098 acceptableSignOfLifeDelay = Settings.getLong(ArchiveSettings.BITARCHIVE_ACCEPTABLE_HEARTBEAT_DELAY); 099 log.info("Bitarchive liveness times out after {} milliseconds.", acceptableSignOfLifeDelay); 100 } 101 102 /** 103 * Method for retrieving the current instance. If no instance has been instantiated, then a new one will be created. 104 * 105 * @return The current instance of the BitarchiveMonitor. 106 */ 107 public static synchronized BitarchiveMonitor getInstance() { 108 if (instance == null) { 109 instance = new BitarchiveMonitor(); 110 } 111 return instance; 112 } 113 114 /** 115 * Registers a sign of life from a bitarchive. This method logs when new bit archives present themselves. 116 * 117 * @param appID the ID of the bitarchive that generated the life sign 118 */ 119 public void signOfLife(String appID) { 120 ArgumentNotValid.checkNotNullOrEmpty(appID, "String appID"); 121 long now = System.currentTimeMillis(); 122 if ((!bitarchiveSignsOfLife.containsKey(appID))) { 123 log.info("Bitarchive '{}' is now known by the bitarchive monitor", appID); 124 } 125 log.trace("Received sign of life from bitarchive '{}'", appID); 126 bitarchiveSignsOfLife.put(appID, now); 127 } 128 129 /** 130 * Register a new batch sent to the bitarchives. 131 * <p> 132 * This registers a new batchstatus object, with a list of live bitarchives awaiting reply, and a timer task letting 133 * the job time out after the specified time. 134 * 135 * @param requestID The ID of the batch request. 136 * @param requestReplyTo The replyTo channel of the batch request. 137 * @param bitarchiveBatchID The ID of the batch job sent on to the bit archives. 138 * @param timeout Timeout of specific batch job. 139 * @throws ArgumentNotValid If any argument is null, or either string is empty. 140 */ 141 public void registerBatch(String requestID, ChannelID requestReplyTo, String bitarchiveBatchID, long timeout) 142 throws ArgumentNotValid { 143 ArgumentNotValid.checkNotNullOrEmpty(requestID, "String requestID"); 144 ArgumentNotValid.checkNotNull(requestReplyTo, "ChannelID requestReplyTo"); 145 ArgumentNotValid.checkNotNullOrEmpty(bitarchiveBatchID, "String bitarchiveBatchID"); 146 BatchJobStatus bjs = new BatchJobStatus(requestID, requestReplyTo, bitarchiveBatchID, 147 getRunningBitarchiveIDs(), timeout); 148 runningBatchJobs.put(bitarchiveBatchID, bjs); 149 log.info("Registered Batch job from {} with timeout {}. Number of outstanding batchjobs are now: {}", 150 requestID, timeout, runningBatchJobs.size()); 151 } 152 153 /** 154 * Generate a set of bitarchiveIDs that are considered live. 155 * 156 * @return Set of IDs of active bitarchives 157 */ 158 private Set<String> getRunningBitarchiveIDs() { 159 Map<String, Long> signsOfLifeCopy; 160 long now; 161 synchronized (bitarchiveSignsOfLife) { 162 now = System.currentTimeMillis(); 163 signsOfLifeCopy = new HashMap<String, Long>(bitarchiveSignsOfLife); 164 } 165 Set<String> runningApps = new HashSet<String>(); 166 for (Map.Entry<String, Long> baID : signsOfLifeCopy.entrySet()) { 167 if (baID.getValue() + acceptableSignOfLifeDelay > now) { 168 runningApps.add(baID.getKey()); 169 } else { 170 log.warn( 171 "Not listening for replies from the bitarchive '{}' which hasn't shown signs of life in {} milliseconds", 172 baID.getKey(), (now - baID.getValue())); 173 // Remove the bitarchive to ensure this warning is not logged 174 // more than once, and a new message is logged when it returns. 175 bitarchiveSignsOfLife.remove(baID.getKey()); 176 } 177 } 178 return runningApps; 179 } 180 181 /** 182 * Handle a reply received from a bitarchive. 183 * <p> 184 * This method registers the information from the bitarchive in the batch status for this job, if any (otherwise 185 * logs and quits). 186 * <p> 187 * If this is the last bitarchive we were missing replies from, notify observers with the batch status for this job. 188 * <p> 189 * TODO why are the 'exceptions' argument not used? 190 * 191 * @param bitarchiveBatchID The ID of the batch job sent on to the bit archives. 192 * @param bitarchiveID The ID of the replying bitarchive. 193 * @param noOfFilesProcessed The number of files the bitarchive has processed. 194 * @param filesFailed A collection of filenames of failed files in that bitarchive. Might be null if no files 195 * failed. 196 * @param remoteFile A remote pointer to a file with results from that bitarchive. Might be null if job was not OK. 197 * @param errMsg An error message, if the job was not successful on the bitarchive, or null for none. 198 * @param exceptions A list of exceptions caught during batch processing. 199 * @throws ArgumentNotValid If either ID is null. 200 */ 201 public void bitarchiveReply(String bitarchiveBatchID, String bitarchiveID, int noOfFilesProcessed, 202 Collection<File> filesFailed, RemoteFile remoteFile, String errMsg, 203 List<FileBatchJob.ExceptionOccurrence> exceptions) throws ArgumentNotValid { 204 ArgumentNotValid.checkNotNullOrEmpty(bitarchiveBatchID, "String bitarchiveBatchID"); 205 ArgumentNotValid.checkNotNullOrEmpty(bitarchiveID, "String bitarchiveID"); 206 ArgumentNotValid.checkNotNegative(noOfFilesProcessed, "int noOfFilesProcessed"); 207 208 BatchJobStatus bjs = runningBatchJobs.get(bitarchiveBatchID); 209 if (bjs == null) { 210 // If the batch ID does not correspond to any of the pending batch 211 // jobs, just log and ignore the message. 212 log.debug( 213 "The batch ID '{}' of the received reply from bitarchives does not correspond to any pending batch job. " 214 + "Ignoring and deleting RemoteFile '{}'." + "Only knows batchjob with IDs: {}", 215 bitarchiveBatchID, remoteFile, runningBatchJobs.keySet()); 216 217 if (remoteFile != null) { 218 remoteFile.cleanup(); 219 } 220 } else { 221 bjs.updateWithBitarchiveReply(bitarchiveID, noOfFilesProcessed, filesFailed, remoteFile, errMsg); 222 } 223 } 224 225 /** 226 * Notifies observers that the given batch job has ended. 227 * 228 * @param batchJobStatus The batch job that has ended. 229 */ 230 private void notifyBatchEnded(BatchJobStatus batchJobStatus) { 231 runningBatchJobs.remove(batchJobStatus.bitarchiveBatchID); 232 // Notify observers that this batch is done 233 setChanged(); 234 notifyObservers(batchJobStatus); 235 log.info("Batchjob '{}' finished. The number of outstanding batchjobs are now: {}", 236 batchJobStatus.bitarchiveBatchID, runningBatchJobs.size()); 237 } 238 239 /** 240 * Closes this BitarchiveMonitor cleanly. Currently does nothing. 241 */ 242 public void cleanup() { 243 instance = null; 244 } 245 246 /** 247 * Class handling state and updates in batch job status. 248 * <p> 249 * This class remembers information about the batchjob sent, and information from all bitarchive replies received. 250 * It also contains information about the original requester of the batchjob. 251 */ 252 public final class BatchJobStatus { 253 254 /** The timer task that handles timeout of this batch job. */ 255 private final BatchTimeoutTask batchTimeoutTask; 256 /** The ID of the job sent to the bitarchives. */ 257 private final String bitarchiveBatchID; 258 /** Have we begun replying for this batch job? */ 259 private boolean notifyInitiated; 260 261 /** the ID of the original batch request. */ 262 public final String originalRequestID; 263 264 /** The reply channel for the original request. */ 265 public final ChannelID originalRequestReplyTo; 266 267 /** set containing the bitarchives that were alive when we sent the job, but haven't answered yet. */ 268 public final Set<String> missingRespondents; 269 270 /** The accumulated number of files processed in replies received so far. */ 271 public int noOfFilesProcessed; 272 273 /** The accumulated list of files failed in replies received so far. */ 274 public final Collection<File> filesFailed; 275 276 /** A string with a concatenation of errors. This error message is null, if the job is successful. */ 277 public String errorMessages; 278 279 /** A File with a concatenation of results from replies received so far. */ 280 public final File batchResultFile; 281 282 /** A list of the exceptions that occurred during processing. */ 283 public final List<FileBatchJob.ExceptionOccurrence> exceptions; 284 285 /** The timeout for batch jobs in milliseconds. */ 286 private long batchTimeout; 287 288 /** 289 * Initialise the status on a fresh batch request. Apart from the given values, a file is created to store batch 290 * results in. <b>Sideeffect</b>: BatchTimeout is started here 291 * 292 * @param originalRequestID The ID of the originating request. 293 * @param originalRequestReplyTo The reply channel for the originating request. 294 * @param bitarchiveBatchID The ID of the job sent to bitarchives. 295 * @param missingRespondents List of all live bitarchives, used to know which bitarchives to await reply from. 296 * @param timeout Timeout for Batch job 297 * @throws IOFailure if a file for batch results cannot be made. 298 */ 299 private BatchJobStatus(String originalRequestID, ChannelID originalRequestReplyTo, String bitarchiveBatchID, 300 Set<String> missingRespondents, long timeout) throws IOFailure { 301 this.originalRequestID = originalRequestID; 302 this.originalRequestReplyTo = originalRequestReplyTo; 303 this.bitarchiveBatchID = bitarchiveBatchID; 304 this.missingRespondents = missingRespondents; 305 batchTimeoutTask = new BatchTimeoutTask(bitarchiveBatchID); 306 batchTimeout = timeout; 307 batchTimer.schedule(batchTimeoutTask, batchTimeout); 308 this.noOfFilesProcessed = 0; 309 try { 310 this.batchResultFile = File.createTempFile(bitarchiveBatchID, "batch_aggregation", 311 FileUtils.getTempDir()); 312 } catch (IOException e) { 313 log.warn("Unable to create file for batch output"); 314 throw new IOFailure("Unable to create file for batch output", e); 315 } 316 this.filesFailed = new ArrayList<File>(); 317 // Null indicates no error 318 this.errorMessages = null; 319 this.notifyInitiated = false; 320 321 exceptions = new ArrayList<FileBatchJob.ExceptionOccurrence>(); 322 } 323 324 /** 325 * Appends the given message to the current error message. 326 * 327 * @param errMsg A message describing what went wrong. 328 */ 329 public void appendError(String errMsg) { 330 if (this.errorMessages == null) { 331 this.errorMessages = errMsg; 332 } else { 333 this.errorMessages += "\n" + errMsg; 334 } 335 } 336 337 /** 338 * Updates the status with info from a bitarchive reply. 339 * <p> 340 * This will add the results given to the status, and if this was the last remaining bitarchive, also sends a 341 * notification to all observers of the bitarchive monitor. 342 * 343 * @param bitarchiveID The ID of the bitarchive that has replied 344 * @param numberOfFilesProcessed The number of files processed by that bit archive. 345 * @param failedFiles List of files failed in that bit archive. 346 * @param remoteFile A pointer to a remote file with results from the bitarchive. 347 * @param errMsg An error message with errors from that bit archive. 348 */ 349 private synchronized void updateWithBitarchiveReply(String bitarchiveID, int numberOfFilesProcessed, 350 Collection<File> failedFiles, RemoteFile remoteFile, String errMsg) { 351 if (notifyInitiated) { 352 log.debug("The reply for batch job: '{}' from bitarchive '{}' arrived after we had started replying. " 353 + "Ignoring this reply.", bitarchiveBatchID, bitarchiveID); 354 remoteFile.cleanup(); 355 return; 356 } 357 // found is set to true, if bitarchiveID was among 358 // the missingRespondents, before it was deleted. 359 boolean found = missingRespondents.remove(bitarchiveID); 360 361 // Handle the reply, even though the bitarchive was not known to be 362 // live, but log a warning. 363 if (!found) { 364 log.warn("Received a batch reply for: {} from an unexpected bit archive: '{}'", bitarchiveBatchID, 365 bitarchiveID); 366 } 367 this.noOfFilesProcessed += numberOfFilesProcessed; 368 if (failedFiles != null) { 369 this.filesFailed.addAll(failedFiles); 370 } 371 372 appendRemoteFileToAggregateFile(remoteFile); 373 this.exceptions.addAll(this.exceptions); 374 375 // In case the batch reply contains an error, the final 376 // we append this error. 377 if (errMsg != null) { 378 appendError(errMsg); 379 log.warn("Received batch reply with error: {} at BA monitor from bitarchive {}", errMsg, bitarchiveID); 380 } 381 382 // if all archives have answered then notify observers that we are 383 // done. 384 if (missingRespondents.isEmpty()) { 385 notifyBatchEnded(); 386 } 387 } 388 389 /** 390 * Append a remotefile to the batch result aggregate file. Adds info on errors while concatenating to the batch 391 * status. 392 * 393 * @param rf A remotefile to read from 394 */ 395 private void appendRemoteFileToAggregateFile(RemoteFile rf) { 396 if (rf != null) { 397 OutputStream aggregateStream = null; 398 try { 399 aggregateStream = new FileOutputStream(batchResultFile, true); 400 rf.appendTo(aggregateStream); 401 402 try { 403 rf.cleanup(); 404 } catch (IOFailure e) { 405 log.warn("Could not remove remotefile '{}'", rf, e); 406 // Harmless, though. Continue 407 } 408 } catch (IOFailure e) { 409 String errMsg = "Exception while aggregating batch output for " + rf.getName() + ": " 410 + ExceptionUtils.getStackTrace(e); 411 appendError(errMsg); 412 } catch (IOException e) { 413 String errMsg = "Exception while aggregating batch output for " + rf.getName() + ": " 414 + ExceptionUtils.getStackTrace(e); 415 appendError(errMsg); 416 } finally { 417 if (aggregateStream != null) { 418 try { 419 aggregateStream.close(); 420 } catch (IOException e) { 421 String errMsg = "Exception while aggregating batch output for " + rf.getName() + ": " 422 + ExceptionUtils.getStackTrace(e); 423 appendError(errMsg); 424 } 425 } 426 } 427 } 428 } 429 430 /** 431 * Checks whether this batch job is already being notified about. If not, it notifies observers with this batch 432 * status. 433 */ 434 private synchronized void notifyBatchEnded() { 435 if (!notifyInitiated) { 436 notifyInitiated = true; 437 batchTimeoutTask.cancel(); 438 BitarchiveMonitor.this.notifyBatchEnded(this); 439 } 440 } 441 442 } 443 444 /** 445 * A timertask that makes batch ended notifications happen after a time specified in settings has elapsed, even 446 * though not all replies have been received. 447 */ 448 private class BatchTimeoutTask extends TimerTask { 449 /** The ID of the batch job this object handles timeout for. */ 450 private final String bitarchiveBatchID; 451 452 /** 453 * Initiate a timer task for the given batch job status. 454 * 455 * @param bitarchiveBatchID The ID of the batch job to monitor timeout for. 456 */ 457 public BatchTimeoutTask(String bitarchiveBatchID) { 458 ArgumentNotValid.checkNotNullOrEmpty(bitarchiveBatchID, "String bitarchiveBatchID"); 459 this.bitarchiveBatchID = bitarchiveBatchID; 460 } 461 462 /** 463 * Send a notifications on timeout if a notification is not already initiated. 464 */ 465 public void run() { 466 // synchronize to ensure timeouts and batchreplies do not interfere 467 // with one another 468 BatchJobStatus bjs = runningBatchJobs.get(bitarchiveBatchID); 469 if (bjs != null) { 470 synchronized (bjs) { 471 if (bjs.notifyInitiated) { 472 // timeout occurred, but we are already in the process of 473 // notifying. Just ignore. 474 return; 475 } 476 try { 477 final String errMsg = "A timeout has occurred for batch job: " + bjs.bitarchiveBatchID 478 + ". Missing replies from [" + StringUtils.conjoin(", ", bjs.missingRespondents) + "]"; 479 log.warn(errMsg); 480 bjs.appendError(errMsg); 481 bjs.notifyBatchEnded(); 482 } catch (Throwable t) { 483 log.warn("An error occurred during execution of timeout task.", t); 484 } 485 } 486 } 487 } 488 } 489 490}