001/*
002 * #%L
003 * Netarchivesuite - wayback
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.wayback.indexer;
024
025import java.io.File;
026import java.util.Date;
027import java.util.UUID;
028
029import javax.persistence.Entity;
030import javax.persistence.Id;
031
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import dk.netarkivet.common.CommonSettings;
036import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClientFactory;
037import dk.netarkivet.common.distribute.arcrepository.BatchStatus;
038import dk.netarkivet.common.distribute.arcrepository.PreservationArcRepositoryClient;
039import dk.netarkivet.common.exceptions.IllegalState;
040import dk.netarkivet.common.utils.FileUtils;
041import dk.netarkivet.common.utils.Settings;
042import dk.netarkivet.common.utils.arc.ARCUtils;
043import dk.netarkivet.common.utils.batch.FileBatchJob;
044import dk.netarkivet.common.utils.warc.WARCUtils;
045import dk.netarkivet.wayback.WaybackSettings;
046import dk.netarkivet.wayback.batch.DeduplicationCDXExtractionBatchJob;
047import dk.netarkivet.wayback.batch.WaybackCDXExtractionARCBatchJob;
048import dk.netarkivet.wayback.batch.WaybackCDXExtractionWARCBatchJob;
049
050/**
051 * This class represents a file in the arcrepository which may be indexed by the indexer.
052 */
053@Entity
054public class ArchiveFile {
055
056    /** Logger for this class. */
057    private static final Logger log = LoggerFactory.getLogger(ArchiveFile.class);
058
059    /** The name of the file in the arcrepository. */
060    private String filename;
061
062    /** Boolean flag indicating whether the file has been indexed. */
063    private boolean isIndexed;
064
065    /** The name of the unsorted cdx index file created from the archive file. */
066    private String originalIndexFileName;
067
068    /** The number of times an attempt to index this file has failed. */
069    private int indexingFailedAttempts;
070
071    /** The date on which this file was indexed. */
072    private Date indexedDate;
073
074    /**
075     * Constructor, creates a new instance in the unindexed state.
076     */
077    public ArchiveFile() {
078        isIndexed = false;
079        indexedDate = null;
080    }
081
082    /**
083     * Gets originalIndexFileName.
084     *
085     * @return the originalIndexFileName
086     */
087    public String getOriginalIndexFileName() {
088        return originalIndexFileName;
089    }
090
091    /**
092     * Sets originalIndexFileName.
093     *
094     * @param originalIndexFileName The new original index filename
095     */
096    public void setOriginalIndexFileName(String originalIndexFileName) {
097        this.originalIndexFileName = originalIndexFileName;
098    }
099
100    /**
101     * Returns indexedDate.
102     *
103     * @return the date indexed.
104     */
105    public Date getIndexedDate() {
106        return indexedDate;
107    }
108
109    /**
110     * Sets indexedDate.
111     *
112     * @param indexedDate The new indexed date.
113     */
114    public void setIndexedDate(Date indexedDate) {
115        this.indexedDate = indexedDate;
116    }
117
118    /**
119     * The filename is used as a natural key because it is a fundamental property of the arcrepository that filenames
120     * are unique.
121     *
122     * @return the filename.
123     */
124    @Id
125    public String getFilename() {
126        return filename;
127    }
128
129    /**
130     * Sets the filename.
131     *
132     * @param filename The new filename
133     */
134    public void setFilename(String filename) {
135        this.filename = filename;
136    }
137
138    /**
139     * Returns true if the file has been indexed.
140     *
141     * @return whether the file is indexed
142     */
143    public boolean isIndexed() {
144        return isIndexed;
145    }
146
147    /**
148     * Sets whether the file has been indexed.
149     *
150     * @param indexed The new value of the isIndexed variable.
151     */
152    public void setIndexed(boolean indexed) {
153        isIndexed = indexed;
154    }
155
156    /**
157     * Gets the number of failed indexing attempts.
158     *
159     * @return the number of failed attempts
160     */
161    public int getIndexingFailedAttempts() {
162        return indexingFailedAttempts;
163    }
164
165    /**
166     * Sets the number of failed indexing attempts.
167     *
168     * @param indexingFailedAttempts The number of failed indexing attempts
169     */
170    public void setIndexingFailedAttempts(int indexingFailedAttempts) {
171        this.indexingFailedAttempts = indexingFailedAttempts;
172    }
173
174    /**
175     * Run a batch job to index this file, storing the result locally. If this method runs successfully, the isIndexed
176     * flag will be set to true and the originalIndexFileName field will be set to the (arbitrary) name of the file
177     * containing the results. The values are persisted to the datastore.
178     *
179     * @throws IllegalState If the indexing has already been done.
180     */
181    public void index() throws IllegalState {
182        log.info("Indexing {}", this.getFilename());
183        if (isIndexed) {
184            throw new IllegalState("Attempted to index file '" + filename + "' which is already indexed");
185        }
186        // TODO the following if-block could be replaced by some fancier more
187        // general class with methods for associating particular types of
188        // archived files with particular types of batch processor. e.g.
189        // something with a signature like
190        // List<FileBatchJob> getIndexers(ArchiveFile file)
191        // This more-flexible approach
192        // may be of value when we begin to add warc support.
193        FileBatchJob theJob = null;
194        if (filename.matches("(.*)" + Settings.get(CommonSettings.METADATAFILE_REGEX_SUFFIX))) {
195            theJob = new DeduplicationCDXExtractionBatchJob();
196        } else if (ARCUtils.isARC(filename)) {
197            theJob = new WaybackCDXExtractionARCBatchJob();
198        } else if (WARCUtils.isWarc(filename)) {
199            theJob = new WaybackCDXExtractionWARCBatchJob();
200        } else {
201            log.warn("Skipping indexing of file with filename '{}'", filename);
202            return;
203        }
204        theJob.processOnlyFileNamed(filename);
205        PreservationArcRepositoryClient client = ArcRepositoryClientFactory.getPreservationInstance();
206        String replicaId = Settings.get(WaybackSettings.WAYBACK_REPLICA);
207        log.info("Submitting {} for {} to {}", theJob.getClass().getName(), getFilename(), replicaId.toString());
208        BatchStatus batchStatus = client.batch(theJob, replicaId);
209        log.info("Batch job for {} returned", this.getFilename());
210        // Normally expect exactly one file per job.
211        if (!batchStatus.getFilesFailed().isEmpty() || batchStatus.getNoOfFilesProcessed() == 0
212                || !batchStatus.getExceptions().isEmpty()) {
213            logBatchError(batchStatus);
214        } else {
215            if (batchStatus.getNoOfFilesProcessed() > 1) {
216                log.warn(
217                        "Processed '{}' files for {}.\n This may indicate a doublet in the arcrepository. Proceeding with caution.",
218                        batchStatus.getNoOfFilesProcessed(), this.getFilename());
219            }
220            try {
221                collectResults(batchStatus);
222            } catch (Exception e) {
223                logBatchError(batchStatus);
224                log.error("Failed to retrieve results", e);
225            }
226        }
227    }
228
229    /**
230     * Collects the batch results from the BatchStatus, first to a file in temporary directory, after which they are
231     * renamed to the directory WAYBACK_BATCH_OUTPUTDIR. The status of this object is then updated to reflect that the
232     * object has been indexed.
233     *
234     * @param status the status of a batch job.
235     */
236    private void collectResults(BatchStatus status) {
237        // Use an arbitrary filename for the output
238        String outputFilename = UUID.randomUUID().toString();
239
240        // Read the name of the temporary output directory and create it if
241        // necessary
242        String tempBatchOutputDir = Settings.get(WaybackSettings.WAYBACK_INDEX_TEMPDIR);
243        final File outDir = new File(tempBatchOutputDir);
244        FileUtils.createDir(outDir);
245
246        // Copy the batch output to the temporary directory.
247        File batchOutputFile = new File(outDir, outputFilename);
248        log.info("Collecting index for '{}' to '{}'", this.getFilename(), batchOutputFile.getAbsolutePath());
249        status.copyResults(batchOutputFile);
250        log.info("Finished collecting index for '{}' to '{}'", this.getFilename(), batchOutputFile.getAbsolutePath());
251        // Read the name of the final batch output directory and create it if
252        // necessary
253        String finalBatchOutputDir = Settings.get(WaybackSettings.WAYBACK_BATCH_OUTPUTDIR);
254        final File finalDirectory = new File(finalBatchOutputDir);
255        FileUtils.createDir(finalDirectory);
256
257        // Move the output file from the temporary directory to the final
258        // directory
259        File finalFile = new File(finalDirectory, outputFilename);
260        batchOutputFile.renameTo(finalFile);
261
262        // Update the file status in the object store
263        originalIndexFileName = outputFilename;
264        isIndexed = true;
265        log.info("Indexed '{}' to '{}'", this.filename, finalFile.getAbsolutePath());
266        (new ArchiveFileDAO()).update(this);
267    }
268
269    /**
270     * Logs the error and increments the number of failed attempts for this ArchiveFile.
271     *
272     * @param status the status of the batch job.
273     */
274    private void logBatchError(BatchStatus status) {
275        String message = "Error indexing file '" + getFilename() + "'\n" + "Number of files processed: '"
276                + status.getNoOfFilesProcessed() + "'\n" + "Number of files failed '" + status.getFilesFailed().size()
277                + "'";
278        if (!status.getExceptions().isEmpty()) {
279            message += "\n Exceptions thrown: " + "\n";
280            for (FileBatchJob.ExceptionOccurrence e : status.getExceptions()) {
281                message += e.toString() + "\n";
282            }
283        }
284        log.error(message);
285        indexingFailedAttempts += 1;
286        (new ArchiveFileDAO()).update(this);
287    }
288
289    // Autogenerated code
290    @Override
291    public boolean equals(Object o) {
292        if (this == o) {
293            return true;
294        }
295        if (o == null || getClass() != o.getClass()) {
296            return false;
297        }
298
299        ArchiveFile that = (ArchiveFile) o;
300
301        if (indexingFailedAttempts != that.indexingFailedAttempts) {
302            return false;
303        }
304        if (isIndexed != that.isIndexed) {
305            return false;
306        }
307        if (!filename.equals(that.filename)) {
308            return false;
309        }
310
311        if (indexedDate != null ? !indexedDate.equals(that.indexedDate) : that.indexedDate != null) {
312            return false;
313        }
314        if (originalIndexFileName != null ? !originalIndexFileName.equals(that.originalIndexFileName)
315                : that.originalIndexFileName != null) {
316            return false;
317        }
318
319        return true;
320    }
321
322    // Autogenerated code
323    @Override
324    public int hashCode() {
325        int result = filename.hashCode();
326        result = 31 * result + (isIndexed ? 1 : 0);
327        result = 31 * result + (originalIndexFileName != null ? originalIndexFileName.hashCode() : 0);
328        result = 31 * result + indexingFailedAttempts;
329        result = 31 * result + (indexedDate != null ? indexedDate.hashCode() : 0);
330        return result;
331    }
332
333}