/*
 * #%L
 * Netarchivesuite - common
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.common.utils.archive;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;

import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveReaderFactory;
import org.archive.io.ArchiveRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.NetarkivetException;
import dk.netarkivet.common.utils.batch.ArchiveBatchFilter;

/**
 * Abstract class defining a batch job to run on a set of ARC/WARC files. Each implementation is required to define
 * initialize() , processRecord() and finish() methods. The bitarchive application then ensures that the batch job runs
 * initialize(), runs processRecord() on each record in each file in the archive, and then runs finish().
 */
@SuppressWarnings({"serial"})
public abstract class ArchiveBatchJob extends ArchiveBatchJobBase {

    private static final Logger log = LoggerFactory.getLogger(ArchiveBatchJob.class);

    /**
     * Processes a single record of an archive file. Exceptions should be handled with the handleException() method.
     *
     * @param os The OutputStream to which output data is written
     * @param record the object to be processed.
     */
    public abstract void processRecord(ArchiveRecordBase record, OutputStream os);

    /**
     * Returns an ArchiveBatchFilter object which restricts the set of records in the archive on which this batch-job is
     * performed. The default value is a neutral filter which allows all records.
     *
     * @return A filter telling which records should be given to processRecord().
     */
    public ArchiveBatchFilter getFilter() {
        return ArchiveBatchFilter.NO_FILTER;
    }

    /**
     * Accepts only arc(.gz) and warc(.gz) files. Runs through all records and calls processRecord() on every record
     * that is allowed by getFilter(). Does nothing on a non-(w)arc file.
     * <p>
     * Every record that is read is closed and the current file offset is advanced, whether or not the filter accepted
     * the record, so that the offset handed to the exception handlers always refers to the most recently completed
     * record.
     *
     * @param archiveFile The arc(.gz) or warc(.gz) file to be processed.
     * @param os the OutputStream to which output is to be written
     * @return true, if file processed successful, otherwise false. False is returned when the reader could not be
     * opened, when processing or closing a record raised an exception, or when an unexpected exception occurred.
     * @throws ArgumentNotValid if either argument is null
     */
    public final boolean processFile(File archiveFile, OutputStream os) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(archiveFile, "archiveFile");
        ArgumentNotValid.checkNotNull(os, "os");
        // Offset reported to the exception handlers; advanced after each record has been closed.
        long arcFileIndex = 0;
        boolean success = true;
        log.info("Processing archive file: {}", archiveFile.getName());

        try { // This outer try-catch block catches all unexpected exceptions
            // Create an ArchiveReader and retrieve its Iterator:
            ArchiveReader archiveReader = null;

            try {
                archiveReader = ArchiveReaderFactory.get(archiveFile);
            } catch (IOException e) { // Some IOException
                handleException(e, archiveFile, arcFileIndex);

                return false; // Can't process file after exception
            }

            try {
                Iterator<? extends ArchiveRecord> it = archiveReader.iterator();
                /* Process all records from this Iterator: */
                log.debug("Starting processing records in archive file '{}'.", archiveFile.getName());
                if (!it.hasNext()) {
                    log.debug("No records found in archive file '{}'.", archiveFile.getName());
                }
                ArchiveRecord archiveRecord = null;
                ArchiveRecordBase record;
                while (it.hasNext()) {
                    log.trace("At begin of processing-loop");
                    // Get a record from the file
                    archiveRecord = it.next();
                    record = ArchiveRecordBase.wrapArchiveRecord(archiveRecord);
                    // Process with the job
                    try {
                        // Records rejected by the filter are not processed, but they still fall through
                        // to the close step below so that arcFileIndex keeps following the file position.
                        if (getFilter().accept(record)) {
                            log.debug("Processing record #{} in archive file '{}'.", noOfRecordsProcessed,
                                    archiveFile.getName());
                            processRecord(record, os);
                            ++noOfRecordsProcessed;
                        }
                    } catch (NetarkivetException e) {
                        // Our exceptions don't stop us
                        success = false;

                        // With our exceptions, we assume that just the
                        // processing of this record got stopped, and we can
                        // easily find the next
                        handleOurException(e, archiveFile, arcFileIndex);
                    } catch (Exception e) {
                        success = false; // Strange exceptions do stop us

                        handleException(e, archiveFile, arcFileIndex);
                        // With strange exceptions, we don't know
                        // if we've skipped records
                        break;
                    }
                    // Close the record
                    try {
                        // FIXME: It is not verified that this offset computation is correct for warc-files.
                        // TODO maybe this works, maybe not...
                        long arcRecordOffset = archiveRecord.getHeader().getContentBegin()
                                + archiveRecord.getHeader().getLength();
                        archiveRecord.close();
                        arcFileIndex = arcRecordOffset;
                    } catch (IOException ioe) { // Couldn't close an WARCRecord
                        success = false;

                        handleException(ioe, archiveFile, arcFileIndex);
                        // If close fails, we don't know if we've skipped
                        // records
                        break;
                    }
                    log.trace("At end of processing-loop");
                }
            } finally {
                try {
                    archiveReader.close();
                } catch (IOException e) { // Some IOException
                    // TODO Discuss whether exceptions on close cause
                    // filesFailed addition
                    handleException(e, archiveFile, arcFileIndex);
                }
            }
        } catch (Exception unexpectedException) {
            handleException(unexpectedException, archiveFile, arcFileIndex);
            return false;
        }
        return success;
    }

}