001/*
002 * #%L
003 * Netarchivesuite - common
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.common.utils.arc;
024
025import java.io.File;
026import java.io.IOException;
027import java.io.OutputStream;
028import java.util.Iterator;
029import java.util.List;
030
031import org.archive.io.ArchiveRecord;
032import org.archive.io.arc.ARCReader;
033import org.archive.io.arc.ARCReaderFactory;
034import org.archive.io.arc.ARCRecord;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import dk.netarkivet.common.exceptions.ArgumentNotValid;
039import dk.netarkivet.common.exceptions.NetarkivetException;
040import dk.netarkivet.common.utils.batch.ARCBatchFilter;
041import dk.netarkivet.common.utils.batch.FileBatchJob;
042
043/**
044 * Abstract class defining a batch job to run on a set of ARC files. Each implementation is required to define
045 * initialize() , processRecord() and finish() methods. The bitarchive application then ensures that the batch job run
046 * initialize(), runs processRecord() on each record in each file in the archive, and then runs finish().
047 */
048@SuppressWarnings({"serial"})
049public abstract class ARCBatchJob extends FileBatchJob {
050
051    private static final Logger log = LoggerFactory.getLogger(ARCBatchJob.class);
052
053    /** The total number of records processed. */
054    protected int noOfRecordsProcessed = 0;
055
056    /**
057     * Initialize the job before running. This is called before the processRecord() calls start coming.
058     *
059     * @param os The OutputStream to which output data is written
060     */
061    @Override
062    public abstract void initialize(OutputStream os);
063
064    /**
065     * Exceptions should be handled with the handleException() method.
066     *
067     * @param os The OutputStream to which output data is written
068     * @param record the object to be processed.
069     */
070    public abstract void processRecord(ARCRecord record, OutputStream os);
071
072    /**
073     * Finish up the job. This is called after the last processRecord() call.
074     *
075     * @param os The OutputStream to which output data is written
076     */
077    @Override
078    public abstract void finish(OutputStream os);
079
080    /**
081     * returns a BatchFilter object which restricts the set of arcrecords in the archive on which this batch-job is
082     * performed. The default value is a neutral filter which allows all records.
083     *
084     * @return A filter telling which records should be given to processRecord().
085     */
086    public ARCBatchFilter getFilter() {
087        return ARCBatchFilter.NO_FILTER;
088    }
089
090    /**
091     * Accepts only ARC and ARCGZ files. Runs through all records and calls processRecord() on every record that is
092     * allowed by getFilter(). Does nothing on a non-arc file.
093     *
094     * @param arcFile The ARC or ARCGZ file to be processed.
095     * @param os the OutputStream to which output is to be written
096     * @return true, if file processed successful, otherwise false
097     * @throws ArgumentNotValid if either argument is null
098     */
099    @Override
100    public final boolean processFile(File arcFile, OutputStream os) throws ArgumentNotValid {
101        ArgumentNotValid.checkNotNull(arcFile, "arcFile");
102        ArgumentNotValid.checkNotNull(os, "os");
103        long arcFileIndex = 0;
104        boolean success = true;
105        log.info("Processing ARCfile: {}", arcFile.getName());
106
107        try { // This outer try-catch block catches all unexpected exceptions
108              // Create an ARCReader and retrieve its Iterator:
109            ARCReader arcReader = null;
110
111            try {
112                arcReader = ARCReaderFactory.get(arcFile);
113            } catch (IOException e) { // Some IOException
114                handleException(e, arcFile, arcFileIndex);
115
116                return false; // Can't process file after exception
117            }
118
119            try {
120                Iterator<? extends ArchiveRecord> it = arcReader.iterator();
121                /* Process all records from this Iterator: */
122                log.debug("Starting processing records in ARCfile '{}'.", arcFile.getName());
123                if (!it.hasNext()) {
124                    log.debug("No ARCRecords found in ARCfile '{}'.", arcFile.getName());
125                }
126                ARCRecord record = null;
127                while (it.hasNext()) {
128                    log.trace("At begin of processing-loop");
129                    // Get a record from the file
130                    record = (ARCRecord) it.next();
131                    // Process with the job
132                    try {
133                        if (!getFilter().accept(record)) {
134                            continue;
135                        }
136                        log.debug("Processing ARCRecord #{} in ARCfile '{}'.", noOfRecordsProcessed, arcFile.getName());
137                        processRecord(record, os);
138                        ++noOfRecordsProcessed;
139                    } catch (NetarkivetException e) {
140                        // Our exceptions don't stop us
141                        success = false;
142
143                        // With our exceptions, we assume that just the
144                        // processing of this record got stopped, and we can
145                        // easily find the next
146                        handleOurException(e, arcFile, arcFileIndex);
147                    } catch (Exception e) {
148                        success = false; // Strange exceptions do stop us
149
150                        handleException(e, arcFile, arcFileIndex);
151                        // With strange exceptions, we don't know
152                        // if we've skipped records
153                        break;
154                    }
155                    // Close the record
156                    try {
157                        long arcRecordOffset = record.getBodyOffset() + record.getMetaData().getLength();
158                        record.close();
159                        arcFileIndex = arcRecordOffset;
160                    } catch (IOException ioe) { // Couldn't close an ARCRecord
161                        success = false;
162
163                        handleException(ioe, arcFile, arcFileIndex);
164                        // If close fails, we don't know if we've skipped
165                        // records
166                        break;
167                    }
168                    log.trace("At end of processing-loop");
169                }
170            } finally {
171                try {
172                    arcReader.close();
173                } catch (IOException e) { // Some IOException
174                    // TODO Discuss whether exceptions on close cause
175                    // filesFailed addition
176                    handleException(e, arcFile, arcFileIndex);
177                }
178            }
179        } catch (Exception unexpectedException) {
180            handleException(unexpectedException, arcFile, arcFileIndex);
181            return false;
182        }
183        return success;
184    }
185
186    /**
187     * Private method that handles our exception.
188     *
189     * @param e the given exception
190     * @param arcFile The ARCFile where the exception occurred.
191     * @param index The offset in the ARCFile where the exception occurred.
192     */
193    private void handleOurException(NetarkivetException e, File arcFile, long index) {
194        handleException(e, arcFile, index);
195    }
196
197    /**
198     * When the org.archive.io.arc classes throw IOExceptions while reading, this is where they go. Subclasses are
199     * welcome to override the default functionality which simply logs and records them in a list. TODO Actually use the
200     * arcfile/index entries in the exception list
201     *
202     * @param e An Exception thrown by the org.archive.io.arc classes.
203     * @param arcfile The arcFile that was processed while the Exception was thrown
204     * @param index The index (in the ARC file) at which the Exception was thrown
205     * @throws ArgumentNotValid if e is null
206     */
207    public void handleException(Exception e, File arcfile, long index) throws ArgumentNotValid {
208        ArgumentNotValid.checkNotNull(e, "e");
209
210        log.debug("Caught exception while running batch job on file {}, position {}:\n{}", arcfile, index,
211                e.getMessage(), e);
212        addException(arcfile, index, ExceptionOccurrence.UNKNOWN_OFFSET, e);
213    }
214
215    /**
216     * Returns a representation of the list of Exceptions recorded for this ARC batch job. If called by a subclass, a
217     * method overriding handleException() should always call super.handleException().
218     *
219     * @return All Exceptions passed to handleException so far.
220     */
221    public Exception[] getExceptionArray() {
222        List<ExceptionOccurrence> exceptions = getExceptions();
223        Exception[] exceptionList = new Exception[exceptions.size()];
224        int i = 0;
225        for (ExceptionOccurrence e : exceptions) {
226            exceptionList[i++] = e.getException();
227        }
228        return exceptionList;
229    }
230
231    /**
232     * @return the number of records processed.
233     */
234    public int noOfRecordsProcessed() {
235        return noOfRecordsProcessed;
236    }
237
238}