/*
 * #%L
 * Netarchivesuite - common
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 * the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.common.utils.warc;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.List;

import org.archive.io.ArchiveRecord;
import org.archive.io.warc.WARCReader;
import org.archive.io.warc.WARCReaderFactory;
import org.archive.io.warc.WARCRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.NetarkivetException;
import dk.netarkivet.common.utils.batch.FileBatchJob;
import dk.netarkivet.common.utils.batch.WARCBatchFilter;

/**
 * Abstract class defining a batch job to run on a set of WARC files. Each implementation is required to define
 * initialize(), processRecord() and finish() methods. The bitarchive application then ensures that the batch job
 * runs initialize(), runs processRecord() on each record in each file in the archive, and then runs finish().
 */
@SuppressWarnings({"serial"})
public abstract class WARCBatchJob extends FileBatchJob {

    private static final Logger log = LoggerFactory.getLogger(WARCBatchJob.class);

    /** The total number of records processed. */
    protected int noOfRecordsProcessed = 0;

    /**
     * Initialize the job before running. This is called before the processRecord() calls start coming.
     *
     * @param os The OutputStream to which output data is written
     */
    public abstract void initialize(OutputStream os);

    /**
     * Process a single WARC record. Exceptions should be handled with the handleException() method.
     *
     * @param record the object to be processed.
     * @param os The OutputStream to which output data is written
     */
    public abstract void processRecord(WARCRecord record, OutputStream os);

    /**
     * Finish up the job. This is called after the last processRecord() call.
     *
     * @param os The OutputStream to which output data is written
     */
    public abstract void finish(OutputStream os);

    /**
     * Returns a BatchFilter object which restricts the set of WARC records in the archive on which this batch job is
     * performed. The default value is a neutral filter which allows all records.
     *
     * @return A filter telling which records should be given to processRecord().
     */
    public WARCBatchFilter getFilter() {
        return WARCBatchFilter.NO_FILTER;
    }

    /**
     * Runs through all records of a WARC or WARCGZ file and calls processRecord() on every record that is allowed by
     * getFilter(). If no WARC reader can be opened for the file (e.g. because it is not a WARC file), the
     * IOException is passed to handleException() and false is returned without processing any records.
     * <p>
     * Every record, whether accepted by the filter or not, is closed after it has been handled, and the position
     * reported to handleException() for later failures is updated accordingly.
     *
     * @param warcFile The WARC or WARCGZ file to be processed.
     * @param os the OutputStream to which output is to be written
     * @return true, if the file was processed successfully, otherwise false
     * @throws ArgumentNotValid if either argument is null
     */
    public final boolean processFile(File warcFile, OutputStream os) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(warcFile, "warcFile");
        ArgumentNotValid.checkNotNull(os, "os");
        long arcFileIndex = 0;
        boolean success = true;
        log.info("Processing WARCfile: {}", warcFile.getName());

        try { // This outer try-catch block catches all unexpected exceptions
            // Create a WARCReader and retrieve its Iterator:
            WARCReader warcReader = null;

            try {
                warcReader = WARCReaderFactory.get(warcFile);
            } catch (IOException e) { // Some IOException
                handleException(e, warcFile, arcFileIndex);

                return false; // Can't process file after exception
            }

            try {
                Iterator<? extends ArchiveRecord> it = warcReader.iterator();
                /* Process all records from this Iterator: */
                log.debug("Starting processing records in WARCfile '{}'.", warcFile.getName());
                if (!it.hasNext()) {
                    log.debug("No WARCRecords found in WARCfile '{}'.", warcFile.getName());
                }
                WARCRecord record = null;
                while (it.hasNext()) {
                    log.trace("At begin of processing-loop");
                    // Get a record from the file
                    record = (WARCRecord) it.next();
                    // Process with the job. Records rejected by the filter are not processed, but they
                    // still fall through to the close/bookkeeping step below so that the reported
                    // position does not go stale.
                    try {
                        if (getFilter().accept(record)) {
                            log.debug("Processing WARCRecord #{} in WARCfile '{}'.", noOfRecordsProcessed,
                                    warcFile.getName());
                            processRecord(record, os);
                            ++noOfRecordsProcessed;
                        }
                    } catch (NetarkivetException e) {
                        // Our exceptions don't stop us
                        success = false;

                        // With our exceptions, we assume that just the
                        // processing of this record got stopped, and we can
                        // easily find the next
                        handleOurException(e, warcFile, arcFileIndex);
                    } catch (Exception e) {
                        success = false; // Strange exceptions do stop us

                        handleException(e, warcFile, arcFileIndex);
                        // With strange exceptions, we don't know
                        // if we've skipped records
                        break;
                    }
                    // Close the record
                    try {
                        // TODO maybe this works, maybe not...
                        // NOTE(review): this sums the header's content-begin and length values; whether that
                        // is an absolute file offset or just a record size should be confirmed.
                        long arcRecordOffset = record.getHeader().getContentBegin() + record.getHeader().getLength();
                        record.close();
                        arcFileIndex = arcRecordOffset;
                    } catch (IOException ioe) { // Couldn't close a WARCRecord
                        success = false;

                        handleException(ioe, warcFile, arcFileIndex);
                        // If close fails, we don't know if we've skipped
                        // records
                        break;
                    }
                    log.trace("At end of processing-loop");
                }
            } finally {
                try {
                    warcReader.close();
                } catch (IOException e) { // Some IOException
                    // TODO Discuss whether exceptions on close cause
                    // filesFailed addition
                    handleException(e, warcFile, arcFileIndex);
                }
            }
        } catch (Exception unexpectedException) {
            handleException(unexpectedException, warcFile, arcFileIndex);
            return false;
        }
        return success;
    }

    /**
     * Private method that handles our own exceptions (NetarkivetException). Currently it simply delegates to
     * handleException().
     *
     * @param e the given exception
     * @param warcFile The WARC File where the exception occurred.
     * @param index The offset in the WARC File where the exception occurred.
     */
    private void handleOurException(NetarkivetException e, File warcFile, long index) {
        handleException(e, warcFile, index);
    }

    /**
     * When the org.archive.io.warc classes throw IOExceptions while reading, this is where they go. Other exceptions
     * caught by processFile() are passed here as well. Subclasses are welcome to override the default functionality,
     * which simply logs the exception and records it in a list; an overriding method should always call
     * super.handleException() so that the exception is still recorded. TODO Actually use the warcfile/index entries
     * in the exception list
     *
     * @param e An Exception thrown while processing the WARC file.
     * @param warcfile The WARC file that was processed while the Exception was thrown
     * @param index The index (in the WARC file) at which the Exception was thrown
     * @throws ArgumentNotValid if e is null
     */
    public void handleException(Exception e, File warcfile, long index) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(e, "e");

        // The trailing exception argument has no placeholder; SLF4J logs it as the throwable.
        log.debug("Caught exception while running batch job on file {}, position {}:\n{}", warcfile, index,
                e.getMessage(), e);
        addException(warcfile, index, ExceptionOccurrence.UNKNOWN_OFFSET, e);
    }

    /**
     * Returns a representation of the list of Exceptions recorded for this WARC batch job.
     *
     * @return All Exceptions recorded for this job so far, in the order given by getExceptions().
     */
    public Exception[] getExceptionArray() {
        List<ExceptionOccurrence> exceptions = getExceptions();
        Exception[] exceptionList = new Exception[exceptions.size()];
        int i = 0;
        for (ExceptionOccurrence e : exceptions) {
            exceptionList[i++] = e.getException();
        }
        return exceptionList;
    }

    /**
     * @return the number of records processed.
     */
    public int noOfRecordsProcessed() {
        return noOfRecordsProcessed;
    }

}