001/*
002 * #%L
003 * Netarchivesuite - common
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.common.utils.archive;
024
025import java.io.File;
026import java.io.IOException;
027import java.io.OutputStream;
028import java.util.Iterator;
029
030import org.archive.io.ArchiveReader;
031import org.archive.io.ArchiveReaderFactory;
032import org.archive.io.ArchiveRecord;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import dk.netarkivet.common.exceptions.ArgumentNotValid;
037import dk.netarkivet.common.exceptions.NetarkivetException;
038import dk.netarkivet.common.utils.batch.ArchiveBatchFilter;
039
040/**
041 * Abstract class defining a batch job to run on a set of ARC/WARC files. Each implementation is required to define
042 * initialize() , processRecord() and finish() methods. The bitarchive application then ensures that the batch job runs
043 * initialize(), runs processRecord() on each record in each file in the archive, and then runs finish().
044 */
045@SuppressWarnings({"serial"})
046public abstract class ArchiveBatchJob extends ArchiveBatchJobBase {
047
048    private static final Logger log = LoggerFactory.getLogger(ArchiveBatchJob.class);
049
050    /**
051     * Exceptions should be handled with the handleException() method.
052     *
053     * @param os The OutputStream to which output data is written
054     * @param record the object to be processed.
055     */
056    public abstract void processRecord(ArchiveRecordBase record, OutputStream os);
057
058    /**
059     * Returns an ArchiveBatchFilter object which restricts the set of records in the archive on which this batch-job is
060     * performed. The default value is a neutral filter which allows all records.
061     *
062     * @return A filter telling which records should be given to processRecord().
063     */
064    public ArchiveBatchFilter getFilter() {
065        return ArchiveBatchFilter.NO_FILTER;
066    }
067
068    /**
069     * Accepts only arc(.gz) and warc(.gz) files. Runs through all records and calls processRecord() on every record
070     * that is allowed by getFilter(). Does nothing on a non-(w)arc file.
071     *
072     * @param archiveFile The arc(.gz) or warc(.gz) file to be processed.
073     * @param os the OutputStream to which output is to be written
074     * @return true, if file processed successful, otherwise false
075     * @throws ArgumentNotValid if either argument is null
076     */
077    public final boolean processFile(File archiveFile, OutputStream os) throws ArgumentNotValid {
078        ArgumentNotValid.checkNotNull(archiveFile, "archiveFile");
079        ArgumentNotValid.checkNotNull(os, "os");
080        long arcFileIndex = 0;
081        boolean success = true;
082        log.info("Processing archive file: {}", archiveFile.getName());
083
084        try { // This outer try-catch block catches all unexpected exceptions
085              // Create an ArchiveReader and retrieve its Iterator:
086            ArchiveReader archiveReader = null;
087
088            try {
089                archiveReader = ArchiveReaderFactory.get(archiveFile);
090            } catch (IOException e) { // Some IOException
091                handleException(e, archiveFile, arcFileIndex);
092
093                return false; // Can't process file after exception
094            }
095
096            try {
097                Iterator<? extends ArchiveRecord> it = archiveReader.iterator();
098                /* Process all records from this Iterator: */
099                log.debug("Starting processing records in archive file '{}'.", archiveFile.getName());
100                if (!it.hasNext()) {
101                    log.debug("No records found in archive file '{}'.", archiveFile.getName());
102                }
103                ArchiveRecord archiveRecord = null;
104                ArchiveRecordBase record;
105                while (it.hasNext()) {
106                    log.trace("At begin of processing-loop");
107                    // Get a record from the file
108                    archiveRecord = (ArchiveRecord) it.next();
109                    record = ArchiveRecordBase.wrapArchiveRecord(archiveRecord);
110                    // Process with the job
111                    try {
112                        if (!getFilter().accept(record)) {
113                            continue;
114                        }
115                        log.debug("Processing record #{} in archive file '{}'.", noOfRecordsProcessed,
116                                archiveFile.getName());
117                        processRecord(record, os);
118                        ++noOfRecordsProcessed;
119                    } catch (NetarkivetException e) {
120                        // Our exceptions don't stop us
121                        success = false;
122
123                        // With our exceptions, we assume that just the
124                        // processing of this record got stopped, and we can
125                        // easily find the next
126                        handleOurException(e, archiveFile, arcFileIndex);
127                    } catch (Exception e) {
128                        success = false; // Strange exceptions do stop us
129
130                        handleException(e, archiveFile, arcFileIndex);
131                        // With strange exceptions, we don't know
132                        // if we've skipped records
133                        break;
134                    }
135                    // Close the record
136                    try {
137                        /*
138                         * // FIXME: Don't know how to compute this for warc-files // computation for arc-files: long
139                         * arcRecordOffset = // record.getBodyOffset() + record.getMetaData().getLength(); //
140                         * computation for warc-files (experimental) long arcRecordOffset =
141                         * record.getHeader().getOffset();
142                         */
143                        // TODO maybe this works, maybe not...
144                        long arcRecordOffset = archiveRecord.getHeader().getContentBegin()
145                                + archiveRecord.getHeader().getLength();
146                        archiveRecord.close();
147                        arcFileIndex = arcRecordOffset;
148                    } catch (IOException ioe) { // Couldn't close an WARCRecord
149                        success = false;
150
151                        handleException(ioe, archiveFile, arcFileIndex);
152                        // If close fails, we don't know if we've skipped
153                        // records
154                        break;
155                    }
156                    log.trace("At end of processing-loop");
157                }
158            } finally {
159                try {
160                    archiveReader.close();
161                } catch (IOException e) { // Some IOException
162                    // TODO Discuss whether exceptions on close cause
163                    // filesFailed addition
164                    handleException(e, archiveFile, arcFileIndex);
165                }
166            }
167        } catch (Exception unexpectedException) {
168            handleException(unexpectedException, archiveFile, arcFileIndex);
169            return false;
170        }
171        return success;
172    }
173
174}