001/* 002 * #%L 003 * Netarchivesuite - wayback 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.wayback.indexer; 024 025import java.io.File; 026import java.util.Date; 027import java.util.UUID; 028 029import javax.persistence.Entity; 030import javax.persistence.Id; 031 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import dk.netarkivet.common.CommonSettings; 036import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClientFactory; 037import dk.netarkivet.common.distribute.arcrepository.BatchStatus; 038import dk.netarkivet.common.distribute.arcrepository.PreservationArcRepositoryClient; 039import dk.netarkivet.common.exceptions.IllegalState; 040import dk.netarkivet.common.utils.FileUtils; 041import dk.netarkivet.common.utils.Settings; 042import dk.netarkivet.common.utils.arc.ARCUtils; 043import dk.netarkivet.common.utils.batch.FileBatchJob; 044import dk.netarkivet.common.utils.warc.WARCUtils; 045import dk.netarkivet.wayback.WaybackSettings; 046import dk.netarkivet.wayback.batch.DeduplicationCDXExtractionBatchJob; 047import dk.netarkivet.wayback.batch.WaybackCDXExtractionARCBatchJob; 048import dk.netarkivet.wayback.batch.WaybackCDXExtractionWARCBatchJob; 049 050/** 051 * This class represents a file in the arcrepository which may be indexed by the indexer. 052 */ 053@Entity 054public class ArchiveFile { 055 056 /** Logger for this class. */ 057 private static final Logger log = LoggerFactory.getLogger(ArchiveFile.class); 058 059 /** The name of the file in the arcrepository. */ 060 private String filename; 061 062 /** Boolean flag indicating whether the file has been indexed. */ 063 private boolean isIndexed; 064 065 /** The name of the unsorted cdx index file created from the archive file. */ 066 private String originalIndexFileName; 067 068 /** The number of times an attempt to index this file has failed. */ 069 private int indexingFailedAttempts; 070 071 /** The date on which this file was indexed. */ 072 private Date indexedDate; 073 074 /** 075 * Constructor, creates a new instance in the unindexed state. 076 */ 077 public ArchiveFile() { 078 isIndexed = false; 079 indexedDate = null; 080 } 081 082 /** 083 * Gets originalIndexFileName. 084 * 085 * @return the originalIndexFileName 086 */ 087 public String getOriginalIndexFileName() { 088 return originalIndexFileName; 089 } 090 091 /** 092 * Sets originalIndexFileName. 093 * 094 * @param originalIndexFileName The new original index filename 095 */ 096 public void setOriginalIndexFileName(String originalIndexFileName) { 097 this.originalIndexFileName = originalIndexFileName; 098 } 099 100 /** 101 * Returns indexedDate. 102 * 103 * @return the date indexed. 104 */ 105 public Date getIndexedDate() { 106 return indexedDate; 107 } 108 109 /** 110 * Sets indexedDate. 111 * 112 * @param indexedDate The new indexed date. 113 */ 114 public void setIndexedDate(Date indexedDate) { 115 this.indexedDate = indexedDate; 116 } 117 118 /** 119 * The filename is used as a natural key because it is a fundamental property of the arcrepository that filenames 120 * are unique. 121 * 122 * @return the filename. 123 */ 124 @Id 125 public String getFilename() { 126 return filename; 127 } 128 129 /** 130 * Sets the filename. 131 * 132 * @param filename The new filename 133 */ 134 public void setFilename(String filename) { 135 this.filename = filename; 136 } 137 138 /** 139 * Returns true if the file has been indexed. 140 * 141 * @return whether the file is indexed 142 */ 143 public boolean isIndexed() { 144 return isIndexed; 145 } 146 147 /** 148 * Sets whether the file has been indexed. 149 * 150 * @param indexed The new value of the isIndexed variable. 151 */ 152 public void setIndexed(boolean indexed) { 153 isIndexed = indexed; 154 } 155 156 /** 157 * Gets the number of failed indexing attempts. 158 * 159 * @return the number of failed attempts 160 */ 161 public int getIndexingFailedAttempts() { 162 return indexingFailedAttempts; 163 } 164 165 /** 166 * Sets the number of failed indexing attempts. 167 * 168 * @param indexingFailedAttempts The number of failed indexing attempts 169 */ 170 public void setIndexingFailedAttempts(int indexingFailedAttempts) { 171 this.indexingFailedAttempts = indexingFailedAttempts; 172 } 173 174 /** 175 * Run a batch job to index this file, storing the result locally. If this method runs successfully, the isIndexed 176 * flag will be set to true and the originalIndexFileName field will be set to the (arbitrary) name of the file 177 * containing the results. The values are persisted to the datastore. 178 * 179 * @throws IllegalState If the indexing has already been done. 180 */ 181 public void index() throws IllegalState { 182 log.info("Indexing {}", this.getFilename()); 183 if (isIndexed) { 184 throw new IllegalState("Attempted to index file '" + filename + "' which is already indexed"); 185 } 186 // TODO the following if-block could be replaced by some fancier more 187 // general class with methods for associating particular types of 188 // archived files with particular types of batch processor. e.g. 189 // something with a signature like 190 // List<FileBatchJob> getIndexers(ArchiveFile file) 191 // This more-flexible approach 192 // may be of value when we begin to add warc support. 193 FileBatchJob theJob = null; 194 if (filename.matches("(.*)" + Settings.get(CommonSettings.METADATAFILE_REGEX_SUFFIX))) { 195 theJob = new DeduplicationCDXExtractionBatchJob(); 196 } else if (ARCUtils.isARC(filename)) { 197 theJob = new WaybackCDXExtractionARCBatchJob(); 198 } else if (WARCUtils.isWarc(filename)) { 199 theJob = new WaybackCDXExtractionWARCBatchJob(); 200 } else { 201 log.warn("Skipping indexing of file with filename '{}'", filename); 202 return; 203 } 204 theJob.processOnlyFileNamed(filename); 205 PreservationArcRepositoryClient client = ArcRepositoryClientFactory.getPreservationInstance(); 206 String replicaId = Settings.get(WaybackSettings.WAYBACK_REPLICA); 207 log.info("Submitting {} for {} to {}", theJob.getClass().getName(), getFilename(), replicaId.toString()); 208 BatchStatus batchStatus = client.batch(theJob, replicaId); 209 log.info("Batch job for {} returned", this.getFilename()); 210 // Normally expect exactly one file per job. 211 if (!batchStatus.getFilesFailed().isEmpty() || batchStatus.getNoOfFilesProcessed() == 0 212 || !batchStatus.getExceptions().isEmpty()) { 213 logBatchError(batchStatus); 214 } else { 215 if (batchStatus.getNoOfFilesProcessed() > 1) { 216 log.warn( 217 "Processed '{}' files for {}.\n This may indicate a doublet in the arcrepository. Proceeding with caution.", 218 batchStatus.getNoOfFilesProcessed(), this.getFilename()); 219 } 220 try { 221 collectResults(batchStatus); 222 } catch (Exception e) { 223 logBatchError(batchStatus); 224 log.error("Failed to retrieve results", e); 225 } 226 } 227 } 228 229 /** 230 * Collects the batch results from the BatchStatus, first to a file in temporary directory, after which they are 231 * renamed to the directory WAYBACK_BATCH_OUTPUTDIR. The status of this object is then updated to reflect that the 232 * object has been indexed. 233 * 234 * @param status the status of a batch job. 235 */ 236 private void collectResults(BatchStatus status) { 237 // Use an arbitrary filename for the output 238 String outputFilename = UUID.randomUUID().toString(); 239 240 // Read the name of the temporary output directory and create it if 241 // necessary 242 String tempBatchOutputDir = Settings.get(WaybackSettings.WAYBACK_INDEX_TEMPDIR); 243 final File outDir = new File(tempBatchOutputDir); 244 FileUtils.createDir(outDir); 245 246 // Copy the batch output to the temporary directory. 247 File batchOutputFile = new File(outDir, outputFilename); 248 log.info("Collecting index for '{}' to '{}'", this.getFilename(), batchOutputFile.getAbsolutePath()); 249 status.copyResults(batchOutputFile); 250 log.info("Finished collecting index for '{}' to '{}'", this.getFilename(), batchOutputFile.getAbsolutePath()); 251 // Read the name of the final batch output directory and create it if 252 // necessary 253 String finalBatchOutputDir = Settings.get(WaybackSettings.WAYBACK_BATCH_OUTPUTDIR); 254 final File finalDirectory = new File(finalBatchOutputDir); 255 FileUtils.createDir(finalDirectory); 256 257 // Move the output file from the temporary directory to the final 258 // directory 259 File finalFile = new File(finalDirectory, outputFilename); 260 batchOutputFile.renameTo(finalFile); 261 262 // Update the file status in the object store 263 originalIndexFileName = outputFilename; 264 isIndexed = true; 265 log.info("Indexed '{}' to '{}'", this.filename, finalFile.getAbsolutePath()); 266 (new ArchiveFileDAO()).update(this); 267 } 268 269 /** 270 * Logs the error and increments the number of failed attempts for this ArchiveFile. 271 * 272 * @param status the status of the batch job. 273 */ 274 private void logBatchError(BatchStatus status) { 275 String message = "Error indexing file '" + getFilename() + "'\n" + "Number of files processed: '" 276 + status.getNoOfFilesProcessed() + "'\n" + "Number of files failed '" + status.getFilesFailed().size() 277 + "'"; 278 if (!status.getExceptions().isEmpty()) { 279 message += "\n Exceptions thrown: " + "\n"; 280 for (FileBatchJob.ExceptionOccurrence e : status.getExceptions()) { 281 message += e.toString() + "\n"; 282 } 283 } 284 log.error(message); 285 indexingFailedAttempts += 1; 286 (new ArchiveFileDAO()).update(this); 287 } 288 289 // Autogenerated code 290 @Override 291 public boolean equals(Object o) { 292 if (this == o) { 293 return true; 294 } 295 if (o == null || getClass() != o.getClass()) { 296 return false; 297 } 298 299 ArchiveFile that = (ArchiveFile) o; 300 301 if (indexingFailedAttempts != that.indexingFailedAttempts) { 302 return false; 303 } 304 if (isIndexed != that.isIndexed) { 305 return false; 306 } 307 if (!filename.equals(that.filename)) { 308 return false; 309 } 310 311 if (indexedDate != null ? !indexedDate.equals(that.indexedDate) : that.indexedDate != null) { 312 return false; 313 } 314 if (originalIndexFileName != null ? !originalIndexFileName.equals(that.originalIndexFileName) 315 : that.originalIndexFileName != null) { 316 return false; 317 } 318 319 return true; 320 } 321 322 // Autogenerated code 323 @Override 324 public int hashCode() { 325 int result = filename.hashCode(); 326 result = 31 * result + (isIndexed ? 1 : 0); 327 result = 31 * result + (originalIndexFileName != null ? originalIndexFileName.hashCode() : 0); 328 result = 31 * result + indexingFailedAttempts; 329 result = 31 * result + (indexedDate != null ? indexedDate.hashCode() : 0); 330 return result; 331 } 332 333}