001/*
002 * #%L
003 * Netarchivesuite - common
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.common.utils.batch;
024
025import java.io.File;
026import java.io.InputStream;
027import java.io.OutputStream;
028import java.io.Serializable;
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.HashSet;
032import java.util.List;
033import java.util.Set;
034import java.util.regex.Pattern;
035
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import dk.netarkivet.common.CommonSettings;
040import dk.netarkivet.common.exceptions.ArgumentNotValid;
041import dk.netarkivet.common.utils.Settings;
042import dk.netarkivet.common.utils.StringUtils;
043
/**
 * Abstract base class defining a batch job to run on a set of files. The job is initialized by calling initialize(),
 * executed on a file by calling processFile(), and any cleanup is handled by finish().
 */
048@SuppressWarnings({"serial"})
049public abstract class FileBatchJob implements Serializable {
050
051    /** The class log. */
052    private static final Logger log = LoggerFactory.getLogger(FileBatchJob.class);
053
054    /** Regexp that matches everything. */
055    private static final String EVERYTHING_REGEXP = ".*";
056
057    /**
058     * Regular expression for the files to process with this job. By default, all files are processed. This pattern must
059     * match the entire filename, but not the path (e.g. .*foo.* for any file with foo in it).
060     */
061    private Pattern filesToProcess = Pattern.compile(EVERYTHING_REGEXP);
062
063    /** The total number of files processed (including any that generated errors). */
064    protected int noOfFilesProcessed = 0;
065
    /**
     * If positive, this is the timeout of this specific batch job in milliseconds. If negative, the standard timeout
     * from settings is used.
     */
070    protected long batchJobTimeout = -1;
071
072    /** A Set of files which generated errors. */
073    protected Set<File> filesFailed = new HashSet<File>();
074
075    /** A list with information about the exceptions thrown during the execution of the batchjob. */
076    protected List<ExceptionOccurrence> exceptions = new ArrayList<ExceptionOccurrence>();
077
    /**
     * Initialize the job before running. This is called before the processFile() calls. If this throws an exception,
     * processFile() will not be called, but finish() will.
     *
     * @param os the OutputStream to which output should be written
     */
084    public abstract void initialize(OutputStream os);
085
086    /**
087     * Process one file stored in the bit archive.
088     *
089     * @param file the file to be processed.
090     * @param os the OutputStream to which output should be written
091     * @return true if the file was successfully processed, false otherwise
092     */
093    public abstract boolean processFile(File file, OutputStream os);
094
    /**
     * Finish up the job. This is called after the last processFile() call. If the initialize() call throws an
     * exception, this will still be called so that any resources allocated can be cleaned up. Implementations should
     * make sure that this method can handle a partial initialization.
     *
     * @param os the OutputStream to which output should be written
     */
102    public abstract void finish(OutputStream os);
103
104    /**
105     * Mark the job to process only the specified files. This will override any previous setting of which files to
106     * process.
107     *
108     * @param specifiedFilenames A list of filenamess to process (without paths). If null, all files will be processed.
109     */
110    public void processOnlyFilesNamed(List<String> specifiedFilenames) {
111        if (specifiedFilenames != null) {
112            List<String> quoted = new ArrayList<String>();
113            for (String name : specifiedFilenames) {
114                quoted.add(Pattern.quote(name));
115            }
116            processOnlyFilesMatching(quoted);
117        } else {
118            processOnlyFilesMatching(EVERYTHING_REGEXP);
119        }
120    }
121
122    /**
123     * Helper method for only processing one file. This will override any previous setting of which files to process.
124     *
125     * @param specifiedFilename The name of the single file that should be processed. Should not include any path
126     * information.
127     */
128    public void processOnlyFileNamed(String specifiedFilename) {
129        ArgumentNotValid.checkNotNullOrEmpty(specifiedFilename, "specificedFilename");
130        processOnlyFilesMatching(Pattern.quote(specifiedFilename));
131    }
132
133    /**
134     * Set this job to match only a certain set of patterns. This will override any previous setting of which files to
135     * process.
136     *
137     * @param specifiedPatterns The patterns of file names that this job will operate on. These should not include any
138     * path information, but should match the entire filename (e.g. .*foo.* for any file with foo in the name).
139     */
140    public void processOnlyFilesMatching(List<String> specifiedPatterns) {
141        ArgumentNotValid.checkNotNull(specifiedPatterns, "specifiedPatterns");
142        processOnlyFilesMatching("(" + StringUtils.conjoin("|", specifiedPatterns) + ")");
143    }
144
145    /**
146     * Set this job to match only a certain pattern. This will override any previous setting of which files to process.
147     *
148     * @param specifiedPattern Regular expression of file names that this job will operate on. This should not include
149     * any path information, but should match the entire filename (e.g. .*foo.* for any file with foo in the name).
150     */
151    public void processOnlyFilesMatching(String specifiedPattern) {
152        ArgumentNotValid.checkNotNullOrEmpty(specifiedPattern, "specificedPattern");
153        filesToProcess = Pattern.compile(specifiedPattern);
154    }
155
156    /**
157     * Get the pattern for files that should be processed.
158     *
159     * @return A pattern for files to process.
160     */
161    public Pattern getFilenamePattern() {
162        return filesToProcess;
163    }
164
165    /**
166     * Return the number of files processed in this job.
167     *
168     * @return the number of files processed in this job
169     */
170    public int getNoOfFilesProcessed() {
171        return noOfFilesProcessed;
172    }
173
174    /**
175     * Return the list of names of files where processing failed. An empty list is returned, if none failed.
176     *
177     * @return the possibly empty list of names of files where processing failed
178     */
179    public Collection<File> getFilesFailed() {
180        return filesFailed;
181    }
182
183    /**
184     * Get the list of exceptions that have occurred during processing.
185     *
186     * @return List of exceptions together with information on where they happened.
187     */
188    public List<ExceptionOccurrence> getExceptions() {
189        return exceptions;
190    }
191
192    /**
193     * Processes the concatenated result files. This is intended to be overridden by batchjobs, who they wants a
194     * different post-processing process than concatenation.
195     *
196     * @param input The inputstream to the file containing the concatenated results.
197     * @param output The outputstream where the resulting data should be written.
198     * @return Whether it actually does any post processing. If false is returned then the default concatenated result
199     * file is returned.
200     * @throws ArgumentNotValid If the concatenated file is null.
201     */
202    public boolean postProcess(InputStream input, OutputStream output) {
203        // Do not post process. Override in inherited classes to post process.
204        return false;
205    }
206
207    /**
208     * Record an exception that occurred during the processFile of this job and that should be returned with the result.
209     * If maxExceptionsReached() returns true, this method silently does nothing.
210     *
211     * @param currentFile The file that is currently being processed.
212     * @param currentOffset The relevant offset into the file when the exception happened (e.g. the start of an ARC
213     * record).
214     * @param outputOffset The offset we were at in the outputstream when the exception happened. If UNKNOWN_OFFSET, the
215     * offset could not be found.
216     * @param e The exception thrown. This exception must be serializable.
217     */
218    protected void addException(File currentFile, long currentOffset, long outputOffset, Exception e) {
219        if (!maxExceptionsReached()) {
220            exceptions.add(new ExceptionOccurrence(currentFile, currentOffset, outputOffset, e));
221        } else {
222            if (log.isTraceEnabled()) {
223                log.trace("Exception not added, because max exceptions reached. currentFile = {},currentOffset = {},"
224                        + "outputOffset = {}, exception: ", currentFile.getAbsolutePath(), currentOffset, outputOffset,
225                        e);
226            }
227        }
228    }
229
230    /**
231     * Record an exception that occurred during the initialize() method of this job.
232     *
233     * @param outputOffset The offset we were at in the outputstream when the exception happened. If UNKNOWN_OFFSET, the
234     * offset could not be found.
235     * @param e The exception thrown. This exception must be serializable.
236     */
237    protected void addInitializeException(long outputOffset, Exception e) {
238        if (!maxExceptionsReached()) {
239            exceptions.add(new ExceptionOccurrence(true, outputOffset, e));
240        } else {
241            log.trace("Exception not added, because max exceptions reached. outputOffset = {}, exception: ",
242                    outputOffset, e);
243        }
244    }
245
246    /**
247     * Record an exception that occurred during the finish() method of this job.
248     *
249     * @param outputOffset The offset we were at in the outputstream when the exception happened. If UNKNOWN_OFFSET, the
250     * offset could not be found.
251     * @param e The exception thrown. This exception must be serializable.
252     */
253    protected void addFinishException(long outputOffset, Exception e) {
254        if (!maxExceptionsReached()) {
255            exceptions.add(new ExceptionOccurrence(false, outputOffset, e));
256        } else {
257            log.trace("Exception not added, because max exceptions reached. outputOffset = {}, exception: ",
258                    outputOffset, e);
259        }
260    }
261
262    /**
263     * Getter for batchJobTimeout. If the batchjob has not defined a maximum time (thus set the value to -1) then the
264     * default value from settings are used.
265     *
266     * @return timeout in miliseconds.
267     */
268    public long getBatchJobTimeout() {
269        if (batchJobTimeout != -1) {
270            return batchJobTimeout;
271        } else {
272            return Long.parseLong(Settings.get(CommonSettings.BATCH_DEFAULT_TIMEOUT));
273        }
274    }
275
276    /**
277     * Returns true if we have already recorded the maximum number of exceptions. At this point, no more exceptions will
278     * be recorded, and processing should be aborted.
279     *
280     * @return True if the maximum number of exceptions (MAX_EXCEPTIONS) has been recorded already.
281     */
282    protected boolean maxExceptionsReached() {
283        return exceptions.size() >= ExceptionOccurrence.MAX_EXCEPTIONS;
284    }
285
286    /**
287     * Override predefined timeout period for batchjob.
288     *
289     * @param batchJobTimeout timout period
290     */
291    public void setBatchJobTimeout(long batchJobTimeout) {
292        this.batchJobTimeout = batchJobTimeout;
293    }
294
295    /**
296     * This class holds the information about exceptions that occurred in a batchjob.
297     */
298    public static class ExceptionOccurrence implements Serializable {
299
300        /**
301         * The maximum number of exceptions we will accumulate before aborting processing.
302         */
303        private static final int MAX_EXCEPTIONS = Settings.getInt(CommonSettings.MAX_NUM_BATCH_EXCEPTIONS);
304
305        /**
306         * Marker for the case when we couldn't find an offset for the outputstream.
307         */
308        public static final long UNKNOWN_OFFSET = -1;
309
310        /**
311         * The name of the file we were processing when the exception occurred, or null.
312         */
313        private final String fileName;
314
315        /**
316         * The offset in the file we were processing when the exception occurred.
317         */
318        private final long fileOffset;
319        /**
320         * How much we had written to the output stream when the exception occurred.
321         */
322        private final long outputOffset;
323        /** The exception that was thrown. */
324        private final Exception exception;
325        /** True if this exception was thrown during initialize(). */
326        private final boolean inInitialize;
327        /** True if this exception was thrown during finish(). */
328        private final boolean inFinish;
329
330        /**
331         * Standard Constructor for ExceptionOccurrence.
332         *
333         * @param file The file that caused the exception.
334         * @param fileOffset The relevant offset into the file when the exception happened (e.g. the start of an ARC
335         * record).
336         * @param outputOffset The offset we were at in the outputstream when the exception happened.
337         * @param exception The exception thrown. This exception must be serializable.
338         * @see FileBatchJob#addException(File, long, long, Exception) for details on the parameters.
339         */
340        public ExceptionOccurrence(File file, long fileOffset, long outputOffset, Exception exception) {
341            ArgumentNotValid.checkNotNull(file, "File file");
342            ArgumentNotValid.checkNotNegative(fileOffset, "long fileOffset");
343            ArgumentNotValid.checkTrue(outputOffset >= 0 || outputOffset == UNKNOWN_OFFSET,
344                    "outputOffset must be either non-negative or UNKNOWN_OFFSET");
345            ArgumentNotValid.checkNotNull(exception, "Exception exception");
346            this.fileName = file.getName();
347            this.fileOffset = fileOffset;
348            this.inFinish = false;
349            this.inInitialize = false;
350            this.outputOffset = outputOffset;
351            this.exception = exception;
352        }
353
354        /**
355         * Constructor for ExceptionOccurrence when an exception happened during initialize() or finish().
356         *
357         * @param inInitialize True if the exception happened in initialize()
358         * @param outputOffset Current offset in the output stream, or UNKNOWN_OFFSET if the offset cannot be found.
359         * @param exception The exception that was thrown.
360         */
361        public ExceptionOccurrence(boolean inInitialize, long outputOffset, Exception exception) {
362            ArgumentNotValid.checkTrue(outputOffset >= 0 || outputOffset == UNKNOWN_OFFSET,
363                    "outputOffset must be either non-negative or UNKNOWN_OFFSET");
364            ArgumentNotValid.checkNotNull(exception, "Exception exception");
365            this.fileName = null;
366            this.fileOffset = UNKNOWN_OFFSET;
367            this.inFinish = !inInitialize;
368            this.inInitialize = inInitialize;
369            this.outputOffset = outputOffset;
370            this.exception = exception;
371        }
372
373        /**
374         * Get the name of the file that this exception occurred in.
375         *
376         * @return Name of the file that this exception occurred in, or null if it happened during initialize() or
377         * finish().
378         */
379        public String getFileName() {
380            return fileName;
381        }
382
383        /**
384         * Get the offset into the file that this exception occurred at. This location may not be exactly where the
385         * problem that caused the exception occurred, but may be e.g. at the start of a corrupt record.
386         *
387         * @return Offset into the file that this exception occurred at, or UNKNOWN_OFFSET if it happened during
388         * initialize() or finish().
389         */
390        public long getFileOffset() {
391            return fileOffset;
392        }
393
394        /**
395         * Offset of the output stream when this exception occurred.
396         *
397         * @return Offset in output stream, or UNKNOWN_OFFSET if the offset could not be determined.
398         */
399        public long getOutputOffset() {
400            return outputOffset;
401        }
402
403        /**
404         * The exception that was thrown.
405         *
406         * @return An exception.
407         */
408        public Exception getException() {
409            return exception;
410        }
411
412        /**
413         * Returns true if the exception was thrown during initialize(). In that case, no processing has taken place,
414         * but finish() has been called.
415         *
416         * @return true if the exception was thrown during initialize()
417         */
418        public boolean isInitializeException() {
419            return inInitialize;
420        }
421
422        /**
423         * Returns true if the exception was thrown during finish().
424         *
425         * @return true if the exception was thrown during finish().
426         */
427        public boolean isFinishException() {
428            return inFinish;
429        }
430
431        /**
432         * @return a Human readable representation of this ExceptionOccurence object.
433         */
434        public String toString() {
435            return "ExceptionOccurrence: (filename, fileoffset, outputoffset, " + "exception, inInitialize, inFinish)"
436                    + " = (" + fileName + ", " + fileOffset + ", " + outputOffset + ", " + exception + ", "
437                    + inInitialize + ", " + inFinish + "). ";
438        }
439
440    }
441
442}