001/* 002 * #%L 003 * Netarchivesuite - common 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.common.utils.arc; 024 025import java.io.File; 026import java.io.IOException; 027import java.io.OutputStream; 028import java.util.Iterator; 029import java.util.List; 030 031import org.archive.io.ArchiveRecord; 032import org.archive.io.arc.ARCReader; 033import org.archive.io.arc.ARCReaderFactory; 034import org.archive.io.arc.ARCRecord; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import dk.netarkivet.common.exceptions.ArgumentNotValid; 039import dk.netarkivet.common.exceptions.NetarkivetException; 040import dk.netarkivet.common.utils.batch.ARCBatchFilter; 041import dk.netarkivet.common.utils.batch.FileBatchJob; 042 043/** 044 * Abstract class defining a batch job to run on a set of ARC files. Each implementation is required to define 045 * initialize() , processRecord() and finish() methods. The bitarchive application then ensures that the batch job run 046 * initialize(), runs processRecord() on each record in each file in the archive, and then runs finish(). 
 */
@SuppressWarnings({"serial"})
public abstract class ARCBatchJob extends FileBatchJob {

    /** Class-wide logger for progress and error reporting. */
    private static final Logger log = LoggerFactory.getLogger(ARCBatchJob.class);

    /** The total number of records processed so far (across all files handled by this job instance). */
    protected int noOfRecordsProcessed = 0;

    /**
     * Initialize the job before running. This is called before the processRecord() calls start coming.
     *
     * @param os The OutputStream to which output data is written
     */
    @Override
    public abstract void initialize(OutputStream os);

    /**
     * Process a single record from an ARC file. Exceptions should be handled with the handleException() method.
     *
     * @param record the ARC record to be processed.
     * @param os The OutputStream to which output data is written
     */
    public abstract void processRecord(ARCRecord record, OutputStream os);

    /**
     * Finish up the job. This is called after the last processRecord() call.
     *
     * @param os The OutputStream to which output data is written
     */
    @Override
    public abstract void finish(OutputStream os);

    /**
     * Returns a BatchFilter object which restricts the set of arcrecords in the archive on which this batch-job is
     * performed. The default value is a neutral filter which allows all records.
     *
     * @return A filter telling which records should be given to processRecord().
     */
    public ARCBatchFilter getFilter() {
        return ARCBatchFilter.NO_FILTER;
    }

    /**
     * Accepts only ARC and ARCGZ files. Runs through all records and calls processRecord() on every record that is
     * allowed by getFilter(). Does nothing on a non-arc file.
     * <p>
     * Error-handling policy (see inline comments): {@link NetarkivetException}s abort only the current record,
     * any other exception aborts the rest of the file, and in both cases the failure is reported via
     * {@link #handleException(Exception, File, long)} and reflected in the return value.
     *
     * @param arcFile The ARC or ARCGZ file to be processed.
     * @param os the OutputStream to which output is to be written
     * @return true, if file processed successful, otherwise false
     * @throws ArgumentNotValid if either argument is null
     */
    @Override
    public final boolean processFile(File arcFile, OutputStream os) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(arcFile, "arcFile");
        ArgumentNotValid.checkNotNull(os, "os");
        // Offset (in bytes) of the start of the record currently being processed; advanced only after a
        // record is closed successfully, so it always points at the last known-good position for error reports.
        long arcFileIndex = 0;
        boolean success = true;
        log.info("Processing ARCfile: {}", arcFile.getName());

        try { // This outer try-catch block catches all unexpected exceptions
            // Create an ARCReader and retrieve its Iterator:
            ARCReader arcReader = null;

            try {
                arcReader = ARCReaderFactory.get(arcFile);
            } catch (IOException e) { // Some IOException
                handleException(e, arcFile, arcFileIndex);

                return false; // Can't process file after exception
            }

            try {
                Iterator<? extends ArchiveRecord> it = arcReader.iterator();
                /* Process all records from this Iterator: */
                log.debug("Starting processing records in ARCfile '{}'.", arcFile.getName());
                if (!it.hasNext()) {
                    log.debug("No ARCRecords found in ARCfile '{}'.", arcFile.getName());
                }
                ARCRecord record = null;
                while (it.hasNext()) {
                    log.trace("At begin of processing-loop");
                    // Get a record from the file
                    record = (ARCRecord) it.next();
                    // Process with the job
                    try {
                        if (!getFilter().accept(record)) {
                            // NOTE(review): filtered-out records skip the close/offset-update step below;
                            // presumably the ARCReader iterator handles the record lifecycle — confirm.
                            continue;
                        }
                        log.debug("Processing ARCRecord #{} in ARCfile '{}'.", noOfRecordsProcessed, arcFile.getName());
                        processRecord(record, os);
                        ++noOfRecordsProcessed;
                    } catch (NetarkivetException e) {
                        // Our exceptions don't stop us
                        success = false;

                        // With our exceptions, we assume that just the
                        // processing of this record got stopped, and we can
                        // easily find the next
                        handleOurException(e, arcFile, arcFileIndex);
                    } catch (Exception e) {
                        success = false; // Strange exceptions do stop us

                        handleException(e, arcFile, arcFileIndex);
                        // With strange exceptions, we don't know
                        // if we've skipped records
                        break;
                    }
                    // Close the record
                    try {
                        // Compute the offset of the *next* record before closing; only commit it to
                        // arcFileIndex once close() has succeeded.
                        long arcRecordOffset = record.getBodyOffset() + record.getMetaData().getLength();
                        record.close();
                        arcFileIndex = arcRecordOffset;
                    } catch (IOException ioe) { // Couldn't close an ARCRecord
                        success = false;

                        handleException(ioe, arcFile, arcFileIndex);
                        // If close fails, we don't know if we've skipped
                        // records
                        break;
                    }
                    log.trace("At end of processing-loop");
                }
            } finally {
                // arcReader is non-null here: a factory failure returned above before this block was entered.
                try {
                    arcReader.close();
                } catch (IOException e) { // Some IOException
                    // TODO Discuss whether exceptions on close cause
                    // filesFailed addition
                    handleException(e, arcFile, arcFileIndex);
                }
            }
        } catch (Exception unexpectedException) {
            handleException(unexpectedException, arcFile, arcFileIndex);
            return false;
        }
        return success;
    }

    /**
     * Private method that handles our exception. Currently just delegates to
     * {@link #handleException(Exception, File, long)}; kept separate so NetarkivetExceptions can be
     * treated differently in the future.
     *
     * @param e the given exception
     * @param arcFile The ARCFile where the exception occurred.
     * @param index The offset in the ARCFile where the exception occurred.
     */
    private void handleOurException(NetarkivetException e, File arcFile, long index) {
        handleException(e, arcFile, index);
    }

    /**
     * When the org.archive.io.arc classes throw IOExceptions while reading, this is where they go. Subclasses are
     * welcome to override the default functionality which simply logs and records them in a list. TODO Actually use the
     * arcfile/index entries in the exception list
     *
     * @param e An Exception thrown by the org.archive.io.arc classes.
     * @param arcfile The arcFile that was processed while the Exception was thrown
     * @param index The index (in the ARC file) at which the Exception was thrown
     * @throws ArgumentNotValid if e is null
     */
    public void handleException(Exception e, File arcfile, long index) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(e, "e");

        log.debug("Caught exception while running batch job on file {}, position {}:\n{}", arcfile, index,
                e.getMessage(), e);
        // Note: the outputOffset argument is recorded as UNKNOWN_OFFSET; see the TODO in the javadoc above.
        addException(arcfile, index, ExceptionOccurrence.UNKNOWN_OFFSET, e);
    }

    /**
     * Returns a representation of the list of Exceptions recorded for this ARC batch job. If called by a subclass, a
     * method overriding handleException() should always call super.handleException().
     *
     * @return All Exceptions passed to handleException so far.
     */
    public Exception[] getExceptionArray() {
        List<ExceptionOccurrence> exceptions = getExceptions();
        Exception[] exceptionList = new Exception[exceptions.size()];
        int i = 0;
        for (ExceptionOccurrence e : exceptions) {
            exceptionList[i++] = e.getException();
        }
        return exceptionList;
    }

    /**
     * @return the number of records processed.
     */
    public int noOfRecordsProcessed() {
        return noOfRecordsProcessed;
    }

}