/*
 * #%L
 * Netarchivesuite - common
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 * the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.common.utils.batch;

import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.StringUtils;

/**
 * Abstract base class defining a batch job to run on a set of files. The job is initialized by calling
 * initialize(), executed on a file by calling processFile() and any cleanup is handled by finish().
 * <p>
 * The class also keeps track of which files the job should be applied to (a filename pattern), how many files
 * have been processed, which files failed, and a bounded list of the exceptions that occurred.
 */
@SuppressWarnings({"serial"})
public abstract class FileBatchJob implements Serializable {

    /** The class log. */
    private static final Logger log = LoggerFactory.getLogger(FileBatchJob.class);

    /** Regexp that matches everything. */
    private static final String EVERYTHING_REGEXP = ".*";

    /**
     * Regular expression for the files to process with this job. By default, all files are processed. This pattern
     * must match the entire filename, but not the path (e.g. .*foo.* for any file with foo in it).
     */
    private Pattern filesToProcess = Pattern.compile(EVERYTHING_REGEXP);

    /** The total number of files processed (including any that generated errors). */
    protected int noOfFilesProcessed = 0;

    /**
     * If non-negative, this is the timeout of this specific batch job in milliseconds. If negative (the default is
     * -1), the standard timeout from settings is used.
     */
    protected long batchJobTimeout = -1;

    /** A Set of files which generated errors. */
    protected Set<File> filesFailed = new HashSet<File>();

    /** A list with information about the exceptions thrown during the execution of the batchjob. */
    protected List<ExceptionOccurrence> exceptions = new ArrayList<ExceptionOccurrence>();

    /**
     * Initialize the job before running. This is called before the processFile() calls. If this throws an
     * exception, processFile() will not be called, but finish() will.
     *
     * @param os the OutputStream to which output should be written
     */
    public abstract void initialize(OutputStream os);

    /**
     * Process one file stored in the bit archive.
     *
     * @param file the file to be processed.
     * @param os the OutputStream to which output should be written
     * @return true if the file was successfully processed, false otherwise
     */
    public abstract boolean processFile(File file, OutputStream os);

    /**
     * Finish up the job. This is called after the last processFile() call. If the initialize() call throws an
     * exception, this will still be called so that any resources allocated can be cleaned up. Implementations
     * should make sure that this method can handle a partial initialization.
     *
     * @param os the OutputStream to which output should be written
     */
    public abstract void finish(OutputStream os);

    /**
     * Mark the job to process only the specified files. This will override any previous setting of which files to
     * process.
     *
     * @param specifiedFilenames A list of filenames to process (without paths). If null, all files will be
     * processed.
     */
    public void processOnlyFilesNamed(List<String> specifiedFilenames) {
        if (specifiedFilenames == null) {
            // No restriction given: fall back to matching every file.
            processOnlyFilesMatching(EVERYTHING_REGEXP);
            return;
        }
        // Quote each name so that regex metacharacters in filenames are matched literally.
        List<String> quoted = new ArrayList<String>();
        for (String name : specifiedFilenames) {
            quoted.add(Pattern.quote(name));
        }
        processOnlyFilesMatching(quoted);
    }

    /**
     * Helper method for only processing one file. This will override any previous setting of which files to
     * process.
     *
     * @param specifiedFilename The name of the single file that should be processed. Should not include any path
     * information. The name is matched literally, not as a regular expression.
     */
    public void processOnlyFileNamed(String specifiedFilename) {
        ArgumentNotValid.checkNotNullOrEmpty(specifiedFilename, "specificedFilename");
        processOnlyFilesMatching(Pattern.quote(specifiedFilename));
    }

    /**
     * Set this job to match only a certain set of patterns. This will override any previous setting of which files
     * to process. The patterns are combined with "|" inside a single group, so a file is processed if it matches
     * any one of them.
     *
     * @param specifiedPatterns The patterns of file names that this job will operate on. These should not include
     * any path information, but should match the entire filename (e.g. .*foo.* for any file with foo in the name).
     * Must not be null.
     */
    public void processOnlyFilesMatching(List<String> specifiedPatterns) {
        ArgumentNotValid.checkNotNull(specifiedPatterns, "specifiedPatterns");
        processOnlyFilesMatching("(" + StringUtils.conjoin("|", specifiedPatterns) + ")");
    }

    /**
     * Set this job to match only a certain pattern. This will override any previous setting of which files to
     * process.
     *
     * @param specifiedPattern Regular expression of file names that this job will operate on. This should not
     * include any path information, but should match the entire filename (e.g. .*foo.* for any file with foo in
     * the name). Must not be null or empty.
     */
    public void processOnlyFilesMatching(String specifiedPattern) {
        ArgumentNotValid.checkNotNullOrEmpty(specifiedPattern, "specificedPattern");
        filesToProcess = Pattern.compile(specifiedPattern);
    }

    /**
     * Get the pattern for files that should be processed.
     *
     * @return A pattern for files to process.
     */
    public Pattern getFilenamePattern() {
        return filesToProcess;
    }

    /**
     * Return the number of files processed in this job.
     *
     * @return the number of files processed in this job
     */
    public int getNoOfFilesProcessed() {
        return noOfFilesProcessed;
    }

    /**
     * Return the collection of files where processing failed. An empty collection is returned, if none failed.
     * The returned collection is the job's own internal set, not a copy.
     *
     * @return the possibly empty collection of files where processing failed
     */
    public Collection<File> getFilesFailed() {
        return filesFailed;
    }

    /**
     * Get the list of exceptions that have occurred during processing. The returned list is the job's own internal
     * list, not a copy.
     *
     * @return List of exceptions together with information on where they happened.
     */
    public List<ExceptionOccurrence> getExceptions() {
        return exceptions;
    }

    /**
     * Processes the concatenated result files. This is intended to be overridden by batchjobs that want a
     * different post-processing step than plain concatenation.
     * <p>
     * This base implementation does nothing, never throws, and always returns false.
     *
     * @param input The inputstream to the file containing the concatenated results.
     * @param output The outputstream where the resulting data should be written.
     * @return Whether it actually does any post processing. If false is returned then the default concatenated
     * result file is returned.
     * @throws ArgumentNotValid Overriding implementations may throw this if the concatenated file is null.
     */
    public boolean postProcess(InputStream input, OutputStream output) {
        // Do not post process. Override in inherited classes to post process.
        return false;
    }

    /**
     * Record an exception that occurred during the processFile of this job and that should be returned with the
     * result. If maxExceptionsReached() returns true, the exception is not recorded (it is only written to the
     * trace log, if enabled).
     *
     * @param currentFile The file that is currently being processed.
     * @param currentOffset The relevant offset into the file when the exception happened (e.g. the start of an ARC
     * record).
     * @param outputOffset The offset we were at in the outputstream when the exception happened. If
     * UNKNOWN_OFFSET, the offset could not be found.
     * @param e The exception thrown. This exception must be serializable.
     */
    protected void addException(File currentFile, long currentOffset, long outputOffset, Exception e) {
        if (!maxExceptionsReached()) {
            exceptions.add(new ExceptionOccurrence(currentFile, currentOffset, outputOffset, e));
            return;
        }
        if (log.isTraceEnabled()) {
            // Null-safe: dropping an exception must not itself fail just because tracing is switched on.
            String currentPath = (currentFile == null) ? null : currentFile.getAbsolutePath();
            log.trace("Exception not added, because max exceptions reached. currentFile = {},currentOffset = {},"
                    + "outputOffset = {}, exception: ", currentPath, currentOffset, outputOffset,
                    e);
        }
    }

    /**
     * Record an exception that occurred during the initialize() method of this job. If maxExceptionsReached()
     * returns true, the exception is not recorded (it is only written to the trace log).
     *
     * @param outputOffset The offset we were at in the outputstream when the exception happened. If
     * UNKNOWN_OFFSET, the offset could not be found.
     * @param e The exception thrown. This exception must be serializable.
     */
    protected void addInitializeException(long outputOffset, Exception e) {
        if (!maxExceptionsReached()) {
            exceptions.add(new ExceptionOccurrence(true, outputOffset, e));
        } else {
            log.trace("Exception not added, because max exceptions reached. outputOffset = {}, exception: ",
                    outputOffset, e);
        }
    }

    /**
     * Record an exception that occurred during the finish() method of this job. If maxExceptionsReached() returns
     * true, the exception is not recorded (it is only written to the trace log).
     *
     * @param outputOffset The offset we were at in the outputstream when the exception happened. If
     * UNKNOWN_OFFSET, the offset could not be found.
     * @param e The exception thrown. This exception must be serializable.
     */
    protected void addFinishException(long outputOffset, Exception e) {
        if (!maxExceptionsReached()) {
            exceptions.add(new ExceptionOccurrence(false, outputOffset, e));
        } else {
            log.trace("Exception not added, because max exceptions reached. outputOffset = {}, exception: ",
                    outputOffset, e);
        }
    }

    /**
     * Getter for batchJobTimeout. If the batchjob has not defined a maximum time (i.e. the value is negative, -1
     * by default) then the default value from settings is used.
     *
     * @return timeout in milliseconds.
     */
    public long getBatchJobTimeout() {
        // Any negative value means "not set", as documented on the field; previously only -1 was recognised
        // and other negative values leaked out as a (meaningless) negative timeout.
        if (batchJobTimeout >= 0) {
            return batchJobTimeout;
        }
        return Long.parseLong(Settings.get(CommonSettings.BATCH_DEFAULT_TIMEOUT));
    }

    /**
     * Returns true if we have already recorded the maximum number of exceptions. At this point, no more exceptions
     * will be recorded, and processing should be aborted.
     *
     * @return True if the maximum number of exceptions (MAX_EXCEPTIONS) has been recorded already.
     */
    protected boolean maxExceptionsReached() {
        return exceptions.size() >= ExceptionOccurrence.MAX_EXCEPTIONS;
    }

    /**
     * Override predefined timeout period for batchjob.
     *
     * @param batchJobTimeout timeout period in milliseconds; a negative value selects the default from settings.
     */
    public void setBatchJobTimeout(long batchJobTimeout) {
        this.batchJobTimeout = batchJobTimeout;
    }

    /**
     * This class holds the information about exceptions that occurred in a batchjob.
     */
    public static class ExceptionOccurrence implements Serializable {

        /**
         * The maximum number of exceptions we will accumulate before aborting processing. Read from the
         * MAX_NUM_BATCH_EXCEPTIONS setting when this class is initialized.
         */
        private static final int MAX_EXCEPTIONS = Settings.getInt(CommonSettings.MAX_NUM_BATCH_EXCEPTIONS);

        /**
         * Marker for the case when we couldn't find an offset for the outputstream.
         */
        public static final long UNKNOWN_OFFSET = -1;

        /**
         * The name of the file we were processing when the exception occurred, or null.
         */
        private final String fileName;

        /**
         * The offset in the file we were processing when the exception occurred.
         */
        private final long fileOffset;

        /**
         * How much we had written to the output stream when the exception occurred.
         */
        private final long outputOffset;

        /** The exception that was thrown. */
        private final Exception exception;

        /** True if this exception was thrown during initialize(). */
        private final boolean inInitialize;

        /** True if this exception was thrown during finish(). */
        private final boolean inFinish;

        /**
         * Standard Constructor for ExceptionOccurrence, used for exceptions that happened while processing a file.
         * Only the name of the file (without path) is stored.
         *
         * @param file The file that caused the exception. Must not be null.
         * @param fileOffset The relevant offset into the file when the exception happened (e.g. the start of an
         * ARC record). Must not be negative.
         * @param outputOffset The offset we were at in the outputstream when the exception happened. Must be
         * non-negative or UNKNOWN_OFFSET.
         * @param exception The exception thrown. This exception must be serializable. Must not be null.
         * @see FileBatchJob#addException(File, long, long, Exception) for details on the parameters.
         */
        public ExceptionOccurrence(File file, long fileOffset, long outputOffset, Exception exception) {
            ArgumentNotValid.checkNotNull(file, "File file");
            ArgumentNotValid.checkNotNegative(fileOffset, "long fileOffset");
            ArgumentNotValid.checkTrue(outputOffset >= 0 || outputOffset == UNKNOWN_OFFSET,
                    "outputOffset must be either non-negative or UNKNOWN_OFFSET");
            ArgumentNotValid.checkNotNull(exception, "Exception exception");
            this.fileName = file.getName();
            this.fileOffset = fileOffset;
            this.inFinish = false;
            this.inInitialize = false;
            this.outputOffset = outputOffset;
            this.exception = exception;
        }

        /**
         * Constructor for ExceptionOccurrence when an exception happened during initialize() or finish(). No file
         * name is recorded and the file offset is set to UNKNOWN_OFFSET.
         *
         * @param inInitialize True if the exception happened in initialize(); false means it happened in finish().
         * @param outputOffset Current offset in the output stream, or UNKNOWN_OFFSET if the offset cannot be
         * found.
         * @param exception The exception that was thrown. Must not be null.
         */
        public ExceptionOccurrence(boolean inInitialize, long outputOffset, Exception exception) {
            ArgumentNotValid.checkTrue(outputOffset >= 0 || outputOffset == UNKNOWN_OFFSET,
                    "outputOffset must be either non-negative or UNKNOWN_OFFSET");
            ArgumentNotValid.checkNotNull(exception, "Exception exception");
            this.fileName = null;
            this.fileOffset = UNKNOWN_OFFSET;
            // Exactly one of the two flags is set: the occurrence is either in initialize() or in finish().
            this.inFinish = !inInitialize;
            this.inInitialize = inInitialize;
            this.outputOffset = outputOffset;
            this.exception = exception;
        }

        /**
         * Get the name of the file that this exception occurred in.
         *
         * @return Name of the file that this exception occurred in, or null if it happened during initialize() or
         * finish().
         */
        public String getFileName() {
            return fileName;
        }

        /**
         * Get the offset into the file that this exception occurred at. This location may not be exactly where
         * the problem that caused the exception occurred, but may be e.g. at the start of a corrupt record.
         *
         * @return Offset into the file that this exception occurred at, or UNKNOWN_OFFSET if it happened during
         * initialize() or finish().
         */
        public long getFileOffset() {
            return fileOffset;
        }

        /**
         * Offset of the output stream when this exception occurred.
         *
         * @return Offset in output stream, or UNKNOWN_OFFSET if the offset could not be determined.
         */
        public long getOutputOffset() {
            return outputOffset;
        }

        /**
         * The exception that was thrown.
         *
         * @return An exception.
         */
        public Exception getException() {
            return exception;
        }

        /**
         * Returns true if the exception was thrown during initialize(). In that case, no processing has taken
         * place, but finish() has been called.
         *
         * @return true if the exception was thrown during initialize()
         */
        public boolean isInitializeException() {
            return inInitialize;
        }

        /**
         * Returns true if the exception was thrown during finish().
         *
         * @return true if the exception was thrown during finish().
         */
        public boolean isFinishException() {
            return inFinish;
        }

        /**
         * @return a Human readable representation of this ExceptionOccurrence object.
         */
        @Override
        public String toString() {
            return "ExceptionOccurrence: (filename, fileoffset, outputoffset, " + "exception, inInitialize, inFinish)"
                    + " = (" + fileName + ", " + fileOffset + ", " + outputOffset + ", " + exception + ", "
                    + inInitialize + ", " + inFinish + "). ";
        }

    }

}