001/*
002 * #%L
003 * Netarchivesuite - common
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023
024package dk.netarkivet.common.utils.batch;
025
026import java.io.File;
027import java.io.OutputStream;
028import java.util.Date;
029import java.util.HashSet;
030
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import dk.netarkivet.common.CommonSettings;
035import dk.netarkivet.common.exceptions.ArgumentNotValid;
036import dk.netarkivet.common.exceptions.BatchTermination;
037import dk.netarkivet.common.utils.Settings;
038
039/**
040 * Class for running FileBatchJobs on a set of local files. The constructor takes an array of files to be processed and
041 * the run() method takes a FileBatchJob and applies it to each file in turn.
042 */
043public class BatchLocalFiles {
044
045    /** The class logger. */
046    private static final Logger log = LoggerFactory.getLogger(BatchLocalFiles.class);
047
048    /** The list of files to run batch jobs on. */
049    private File[] files;
050
051    /** The last time logging was performed. Initial 0 to ensure logging the first time. */
052    private long lastLoggingDate = 0;
053    /** The time when the batchjob was started. */
054    private long startTime = 0;
055
056    /**
057     * Given an array of files, constructs a BatchLocalFiles instance to be used in running a batch job over those
058     * files.
059     *
060     * @param incomingFiles The files that should be used processed by the batchjob
061     * @throws ArgumentNotValid if incomingFiles is null or contains a null entry
062     */
063    public BatchLocalFiles(File[] incomingFiles) throws ArgumentNotValid {
064        ArgumentNotValid.checkNotNull(incomingFiles, "incomingFiles");
065        for (int i = 0; i < incomingFiles.length; i++) {
066            ArgumentNotValid.checkNotNull(incomingFiles[i], "Null element at index " + i + " in file list for batch.");
067        }
068        this.files = incomingFiles;
069    }
070
071    /**
072     * Run the given job on the files associated with this object.
073     *
074     * @param job - the job to be executed
075     * @param os - the OutputStream to which output data is written
076     */
077    public void run(FileBatchJob job, OutputStream os) {
078        ArgumentNotValid.checkNotNull(job, "FileBatchJob job");
079        ArgumentNotValid.checkNotNull(os, "OutputStream os");
080        // Initialise the job:
081        job.noOfFilesProcessed = 0;
082        job.filesFailed = new HashSet<File>();
083        try {
084            job.initialize(os);
085            // count the files (used for logging).
086            int fileCount = 0;
087            // the time in milliseconds between the status logging
088            long logInterval = Settings.getLong(CommonSettings.BATCH_LOGGING_INTERVAL);
089            // get the time for starting the batchjob (used for logging).
090            startTime = new Date().getTime();
091            // Process each file:
092            for (File file : files) {
093                fileCount++;
094                if (job.getFilenamePattern().matcher(file.getName()).matches()) {
095                    long currentTime = new Date().getTime();
096                    // perform logging if necessary.
097                    if (lastLoggingDate + logInterval < currentTime) {
098                        log.info(
099                                "The batchjob '{}' has run for {} seconds and has reached file '{}', which is number {} out of {}",
100                                job.getClass(), (currentTime - startTime) / 1000, file.getName(), fileCount,
101                                files.length);
102                        // set that we have just logged.
103                        lastLoggingDate = currentTime;
104                    }
105                    processFile(job, file, os);
106                }
107
108                // check whether the batchjob should stop.
109                if (Thread.currentThread().isInterrupted()) {
110                    // log and throw an error (not exception, they are caught!)
111                    String errMsg = "The batchjob '" + job.toString() + "' has been interrupted and will terminate!";
112                    log.warn(errMsg);
113                    // TODO make new exception to thrown instead.
114                    throw new BatchTermination(errMsg);
115                }
116            }
117        } catch (Exception e) {
118            // TODO Consider adding this initialization exception to the list
119            // of exception accumulated:
120            // job.addInitializeException(outputOffset, e)
121            log.warn("Exception while initializing job {}", job, e);
122
123            // rethrow exception
124            if (e instanceof BatchTermination) {
125                throw (BatchTermination) e;
126            }
127        } finally {
128            // Finally, allow the job to finish: */
129            try {
130                job.finish(os);
131            } catch (Exception e) {
132                // TODO consider adding this finalization exception to the list
133                // of exception accumulated:
134                // job.addFinishException(outputOffset, e)
135                log.warn("Exception while finishing job {}", job, e);
136
137                // rethrow exception
138                if (e instanceof BatchTermination) {
139                    throw (BatchTermination) e;
140                }
141            }
142        }
143    }
144
145    /**
146     * Process a single file.
147     *
148     * @param job The job that does the processing
149     * @param file The file to process
150     * @param os Where to put the output.
151     */
152    private void processFile(FileBatchJob job, final File file, OutputStream os) {
153        log.trace("Started processing of file '{}'.", file.getAbsolutePath());
154        boolean success = false;
155        try {
156            success = job.processFile(file, os);
157        } catch (Exception e) {
158            // TODO consider adding this exception to the list
159            // of exception accumulated:
160            // job.addException(currentFile, currentOffset, outputOffset, e)
161            log.warn("Exception while processing file {} with job {}", file, job, e);
162        }
163        job.noOfFilesProcessed++;
164        if (!success) {
165            job.filesFailed.add(file);
166        }
167    }
168
169}