/*
 * #%L
 * Netarchivesuite - common
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 * 
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
023package dk.netarkivet.common.utils.warc;
024
025import java.io.File;
026import java.io.IOException;
027import java.io.OutputStream;
028import java.util.Iterator;
029import java.util.List;
030
031import org.archive.io.ArchiveRecord;
032import org.archive.io.warc.WARCReader;
033import org.archive.io.warc.WARCReaderFactory;
034import org.archive.io.warc.WARCRecord;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import dk.netarkivet.common.exceptions.ArgumentNotValid;
039import dk.netarkivet.common.exceptions.NetarkivetException;
040import dk.netarkivet.common.utils.batch.FileBatchJob;
041import dk.netarkivet.common.utils.batch.WARCBatchFilter;
042
043/**
044 * Abstract class defining a batch job to run on a set of WARC files. Each implementation is required to define
045 * initialize() , processRecord() and finish() methods. The bitarchive application then ensures that the batch job run
046 * initialize(), runs processRecord() on each record in each file in the archive, and then runs finish().
047 */
048@SuppressWarnings({"serial"})
049public abstract class WARCBatchJob extends FileBatchJob {
050
051    private static final Logger log = LoggerFactory.getLogger(WARCBatchJob.class);
052
053    /** The total number of records processed. */
054    protected int noOfRecordsProcessed = 0;
055
056    /**
057     * Initialize the job before running. This is called before the processRecord() calls start coming.
058     *
059     * @param os The OutputStream to which output data is written
060     */
061    public abstract void initialize(OutputStream os);
062
063    /**
064     * Exceptions should be handled with the handleException() method.
065     *
066     * @param os The OutputStream to which output data is written
067     * @param record the object to be processed.
068     */
069    public abstract void processRecord(WARCRecord record, OutputStream os);
070
071    /**
072     * Finish up the job. This is called after the last processRecord() call.
073     *
074     * @param os The OutputStream to which output data is written
075     */
076    public abstract void finish(OutputStream os);
077
078    /**
079     * returns a BatchFilter object which restricts the set of warc records in the archive on which this batch-job is
080     * performed. The default value is a neutral filter which allows all records.
081     *
082     * @return A filter telling which records should be given to processRecord().
083     */
084    public WARCBatchFilter getFilter() {
085        return WARCBatchFilter.NO_FILTER;
086    }
087
088    /**
089     * Accepts only WARC and WARCGZ files. Runs through all records and calls processRecord() on every record that is
090     * allowed by getFilter(). Does nothing on a non-arc file.
091     *
092     * @param warcFile The WARC or WARCGZ file to be processed.
093     * @param os the OutputStream to which output is to be written
094     * @return true, if file processed successful, otherwise false
095     * @throws ArgumentNotValid if either argument is null
096     */
097    public final boolean processFile(File warcFile, OutputStream os) throws ArgumentNotValid {
098        ArgumentNotValid.checkNotNull(warcFile, "warcFile");
099        ArgumentNotValid.checkNotNull(os, "os");
100        long arcFileIndex = 0;
101        boolean success = true;
102        log.info("Processing WARCfile: {}", warcFile.getName());
103
104        try { // This outer try-catch block catches all unexpected exceptions
105              // Create an WARCReader and retrieve its Iterator:
106            WARCReader warcReader = null;
107
108            try {
109                warcReader = WARCReaderFactory.get(warcFile);
110            } catch (IOException e) { // Some IOException
111                handleException(e, warcFile, arcFileIndex);
112
113                return false; // Can't process file after exception
114            }
115
116            try {
117                Iterator<? extends ArchiveRecord> it = warcReader.iterator();
118                /* Process all records from this Iterator: */
119                log.debug("Starting processing records in WARCfile '{}'.", warcFile.getName());
120                if (!it.hasNext()) {
121                    log.debug("No WARCRecords found in WARCfile '{}'.", warcFile.getName());
122                }
123                WARCRecord record = null;
124                while (it.hasNext()) {
125                    log.trace("At begin of processing-loop");
126                    // Get a record from the file
127                    record = (WARCRecord) it.next();
128                    // Process with the job
129                    try {
130                        if (!getFilter().accept(record)) {
131                            continue;
132                        }
133                        log.debug("Processing WARCRecord #{} in WARCfile '{}'.", noOfRecordsProcessed,
134                                warcFile.getName());
135                        processRecord(record, os);
136                        ++noOfRecordsProcessed;
137                    } catch (NetarkivetException e) {
138                        // Our exceptions don't stop us
139                        success = false;
140
141                        // With our exceptions, we assume that just the
142                        // processing of this record got stopped, and we can
143                        // easily find the next
144                        handleOurException(e, warcFile, arcFileIndex);
145                    } catch (Exception e) {
146                        success = false; // Strange exceptions do stop us
147
148                        handleException(e, warcFile, arcFileIndex);
149                        // With strange exceptions, we don't know
150                        // if we've skipped records
151                        break;
152                    }
153                    // Close the record
154                    try {
155                        // TODO maybe this works, maybe not...
156                        long arcRecordOffset = record.getHeader().getContentBegin() + record.getHeader().getLength();
157                        record.close();
158                        arcFileIndex = arcRecordOffset;
159                    } catch (IOException ioe) { // Couldn't close an WARCRecord
160                        success = false;
161
162                        handleException(ioe, warcFile, arcFileIndex);
163                        // If close fails, we don't know if we've skipped
164                        // records
165                        break;
166                    }
167                    log.trace("At end of processing-loop");
168                }
169            } finally {
170                try {
171                    warcReader.close();
172                } catch (IOException e) { // Some IOException
173                    // TODO Discuss whether exceptions on close cause
174                    // filesFailed addition
175                    handleException(e, warcFile, arcFileIndex);
176                }
177            }
178        } catch (Exception unexpectedException) {
179            handleException(unexpectedException, warcFile, arcFileIndex);
180            return false;
181        }
182        return success;
183    }
184
185    /**
186     * Private method that handles our exception.
187     *
188     * @param e the given exception
189     * @param warcFile The WARC File where the exception occurred.
190     * @param index The offset in the WARC File where the exception occurred.
191     */
192    private void handleOurException(NetarkivetException e, File warcFile, long index) {
193        handleException(e, warcFile, index);
194    }
195
196    /**
197     * When the org.archive.io.arc classes throw IOExceptions while reading, this is where they go. Subclasses are
198     * welcome to override the default functionality which simply logs and records them in a list. TODO Actually use the
199     * warcfile/index entries in the exception list
200     *
201     * @param e An Exception thrown by the org.archive.io.arc classes.
202     * @param warcfile The arcFile that was processed while the Exception was thrown
203     * @param index The index (in the WARC file) at which the Exception was thrown
204     * @throws ArgumentNotValid if e is null
205     */
206    public void handleException(Exception e, File warcfile, long index) throws ArgumentNotValid {
207        ArgumentNotValid.checkNotNull(e, "e");
208
209        log.debug("Caught exception while running batch job on file {}, position {}:\n{}", warcfile, index,
210                e.getMessage(), e);
211        addException(warcfile, index, ExceptionOccurrence.UNKNOWN_OFFSET, e);
212    }
213
214    /**
215     * Returns a representation of the list of Exceptions recorded for this WARC batch job. If called by a subclass, a
216     * method overriding handleException() should always call super.handleException().
217     *
218     * @return All Exceptions passed to handleException so far.
219     */
220    public Exception[] getExceptionArray() {
221        List<ExceptionOccurrence> exceptions = getExceptions();
222        Exception[] exceptionList = new Exception[exceptions.size()];
223        int i = 0;
224        for (ExceptionOccurrence e : exceptions) {
225            exceptionList[i++] = e.getException();
226        }
227        return exceptionList;
228    }
229
230    /**
231     * @return the number of records processed.
232     */
233    public int noOfRecordsProcessed() {
234        return noOfRecordsProcessed;
235    }
236
237}