/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */

package dk.netarkivet.harvester.harvesting;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClientFactory;
import dk.netarkivet.common.distribute.arcrepository.BatchStatus;
import dk.netarkivet.common.distribute.arcrepository.BitarchiveRecord;
import dk.netarkivet.common.distribute.arcrepository.HarvesterArcRepositoryClient;
import dk.netarkivet.common.distribute.indexserver.Index;
import dk.netarkivet.common.distribute.indexserver.IndexClientFactory;
import dk.netarkivet.common.distribute.indexserver.JobIndexCache;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.NotificationType;
import dk.netarkivet.common.utils.NotificationsFactory;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.StringUtils;
import dk.netarkivet.common.utils.batch.FileBatchJob;
import dk.netarkivet.common.utils.cdx.ArchiveExtractCDXJob;
import dk.netarkivet.common.utils.cdx.CDXRecord;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.datamodel.HarvestDefinitionInfo;
import dk.netarkivet.harvester.datamodel.HeritrixTemplate;
import dk.netarkivet.harvester.datamodel.Job;
import dk.netarkivet.harvester.harvesting.metadata.MetadataEntry;
import dk.netarkivet.harvester.harvesting.metadata.MetadataFile;
import dk.netarkivet.harvester.harvesting.report.DomainStatsReport;
import dk.netarkivet.harvester.harvesting.report.HarvestReport;
import dk.netarkivet.harvester.harvesting.report.HarvestReportFactory;
import dk.netarkivet.harvester.harvesting.report.HarvestReportGenerator;

/**
 * This class handles all the things in a single harvest that are not directly related either to launching Heritrix or
 * to handling JMS messages.
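 * <p>
 * Typical lifecycle, shown as a sketch (crawlDir, job, harvestInfo and metadataEntries are assumed to be supplied by
 * the caller, e.g. taken from a received harvest job):
 * <pre>{@code
 * HarvestController controller = HarvestController.getInstance();
 * HeritrixFiles files = controller.writeHarvestFiles(crawlDir, job, harvestInfo, metadataEntries);
 * controller.runHarvest(files);
 *
 * StringBuilder errorMessage = new StringBuilder();
 * List<File> failedFiles = new ArrayList<File>();
 * HarvestReport report = controller.storeFiles(files, errorMessage, failedFiles);
 * controller.cleanup();
 * }</pre>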
 */
public class HarvestController {

    /** The class logger. */
    private static final Logger log = LoggerFactory.getLogger(HarvestController.class);

    /** The singleton instance of this class. Calling cleanup() on the instance will null this field. */
    private static HarvestController instance;

    /** The maximum time to wait for Heritrix to close its last ARC or WARC files (in seconds). */
    private static final int WAIT_FOR_HERITRIX_TIMEOUT_SECS = 5;

    /** The ArcRepositoryClient used to communicate with the ArcRepository to store the generated arc-files. */
    private HarvesterArcRepositoryClient arcRepController;

    /**
     * Private constructor controlled by getInstance().
     */
    private HarvestController() {
        arcRepController = ArcRepositoryClientFactory.getHarvesterInstance();
    }

    /**
     * Get the instance of the singleton HarvestController.
     *
     * @return The singleton instance.
     */
    public static synchronized HarvestController getInstance() {
        if (instance == null) {
            instance = new HarvestController();
        }
        return instance;
    }

    /**
     * Clean up this singleton, releasing the ArcRepositoryClient and removing the instance. The instance should not be
     * used after this method has been called; subsequent calls to getInstance() will return a new instance.
     */
    public void cleanup() {
        if (arcRepController != null) {
            arcRepController.close();
        }
        resetInstance();
    }

    /**
     * Reset the singleton instance.
     */
    private static void resetInstance() {
        instance = null;
    }

    /**
     * Writes the files involved with a harvest. Creates the Heritrix arcs and warcs directories in advance to ensure
     * that they exist before Heritrix is started.
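     * <p>
     * Illustrative call, as a sketch (crawldir, job, hdi and metadataEntries are assumed to be prepared by the caller):
     * <pre>{@code
     * HeritrixFiles files = HarvestController.getInstance().writeHarvestFiles(crawldir, job, hdi, metadataEntries);
     * File crawlDirWithFiles = files.getCrawlDir(); // now contains the harvestinfo file, order.xml and seeds.txt
     * File arcsDir = files.getArcsDir();            // created in advance so Heritrix can write ARC files into it
     * }</pre>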
     *
     * @param crawldir The directory that the crawl should take place in.
     * @param job The Job object containing various harvest setup data.
     * @param hdi The object encapsulating documentary information about the harvest.
     * @param metadataEntries Any metadata entries sent along with the job that should be stored for later use.
     * @return An object encapsulating where these files have been written.
     */
    public HeritrixFiles writeHarvestFiles(File crawldir, Job job, HarvestDefinitionInfo hdi,
            List<MetadataEntry> metadataEntries) {
        // FIXME this hardwires the HeritrixFiles to H1
        final HeritrixFiles files = HeritrixFiles.getH1HeritrixFilesWithDefaultJmxFiles(crawldir, job);

        // If this job is a job that tries to continue a previous job
        // using the Heritrix recover.gz log, and this feature is enabled,
        // then try to fetch the recover.log from the metadata-arc-file.
        if (job.getContinuationOf() != null && Settings.getBoolean(HarvesterSettings.RECOVERlOG_CONTINUATION_ENABLED)) {
            tryToRetrieveRecoverLog(job, files);
        }

        // Create harvestInfo file in crawldir
        // & create preharvest-metadata-1.arc
        log.debug("Writing persistent job data for job {}", job.getJobID());
        // Check that harvestInfo does not yet exist

        // Write job data to persistent storage (harvestinfo file)
        new PersistentJobData(files.getCrawlDir()).write(job, hdi);
        // Create jobId-preharvest-metadata-1.arc for this job
        writePreharvestMetadata(job, metadataEntries, crawldir);

        files.writeSeedsTxt(job.getSeedListAsString());

        files.writeOrderXml(job.getOrderXMLdoc());
        // Only retrieve index if deduplication is not disabled in the template.
        if (job.getOrderXMLdoc().IsDeduplicationEnabled()) {
            log.debug("Deduplication enabled. Fetching deduplication index..");
            files.setIndexDir(fetchDeduplicateIndex(metadataEntries));
        } else {
            log.debug("Deduplication disabled.");
        }

        // Create Heritrix arcs directory before starting Heritrix to ensure
        // the arcs directory exists in advance.
        boolean created = files.getArcsDir().mkdir();
        if (!created) {
            log.warn("Unable to create arcsdir: {}", files.getArcsDir());
        }
        // Create Heritrix warcs directory before starting Heritrix to ensure
        // the warcs directory exists in advance.
        created = files.getWarcsDir().mkdir();
        if (!created) {
            log.warn("Unable to create warcsdir: {}", files.getWarcsDir());
        }

        return files;
    }

    /**
     * This method attempts to retrieve the Heritrix recover log from the job which this job tries to continue. If
     * successful, the Heritrix template is updated accordingly.
     *
     * @param job The harvest Job object containing various harvest setup data.
     * @param files Heritrix files related to this harvestjob.
     */
    private void tryToRetrieveRecoverLog(Job job, HeritrixFiles files) {
        Long previousJob = job.getContinuationOf();
        List<CDXRecord> metaCDXes = null;
        try {
            metaCDXes = getMetadataCDXRecordsForJob(previousJob);
        } catch (IOFailure e) {
            log.debug("Failed to retrieve CDX of metadata records. "
                    + "Maybe the metadata arcfile for job {} does not exist in the repository", previousJob, e);
        }

        CDXRecord recoverlogCDX = null;
        if (metaCDXes != null) {
            for (CDXRecord cdx : metaCDXes) {
                if (cdx.getURL().matches(MetadataFile.RECOVER_LOG_PATTERN)) {
                    recoverlogCDX = cdx;
                }
            }
            if (recoverlogCDX == null) {
                log.debug("A recover.gz log file was not found in metadata-arcfile");
            } else {
                log.debug("recover.gz log found in metadata-arcfile");
            }
        }

        BitarchiveRecord br = null;
        if (recoverlogCDX != null) { // Retrieve recover.gz from metadata.arc file
            br = ArcRepositoryClientFactory.getViewerInstance().get(recoverlogCDX.getArcfile(),
                    recoverlogCDX.getOffset());
            if (br != null) {
                log.debug("recover.gz log retrieved from metadata-arcfile");
                if (files.writeRecoverBackupfile(br.getData())) {
                    // modify order.xml, so Heritrix recover-path points
                    // to the retrieved recoverlog
                    insertHeritrixRecoverPathInOrderXML(job, files);
                } else {
                    log.warn("Failed to retrieve and write recoverlog to disk.");
                }
            } else {
                log.debug("recover.gz log not retrieved from metadata-arcfile");
            }
        }
    }

    /**
     * Insert the correct recoverpath in the order.xml for the given harvestjob.
     *
     * @param job A harvestjob
     * @param files Heritrix files related to this harvestjob.
     */
    private void insertHeritrixRecoverPathInOrderXML(Job job, HeritrixFiles files) {
        HeritrixTemplate temp = job.getOrderXMLdoc();
        temp.setRecoverlogNode(files.getRecoverBackupGzFile());
        job.setOrderXMLDoc(temp); // Update template associated with job
    }

    /**
     * Writes pre-harvest metadata to the "metadata" directory.
     *
     * @param harvestJob a given Job.
     * @param metadata the list of metadata entries to write to the metadata file.
     * @param crawlDir the directory where the metadata will be written.
     * @throws IOFailure If there are errors in writing the metadata.
     */
    private void writePreharvestMetadata(Job harvestJob, List<MetadataEntry> metadata, File crawlDir) throws IOFailure {
        if (metadata.size() == 0) {
            // Do not generate preharvest metadata file for empty list
            return;
        }

        // make sure that metadata directory exists
        File metadataDir = new File(crawlDir, IngestableFiles.METADATA_SUB_DIR);
        metadataDir.mkdir();
        if (!(metadataDir.exists() && metadataDir.isDirectory())) {
            throw new IOFailure("Unable to write preharvest metadata for job '" + harvestJob.getJobID()
                    + "' to directory '" + metadataDir.getAbsolutePath() + "', as directory does not exist.");
        }

        // Serializing the MetadataEntry objects to the metadataDir
        MetadataEntry.storeMetadataToDisk(metadata, metadataDir);
    }

    /**
     * Creates the actual HeritrixLauncher instance and runs it, after the various setup files have been written.
     *
     * @param files Description of files involved in running Heritrix. Not Null.
     * @throws ArgumentNotValid if an argument isn't valid.
     */
    public void runHarvest(HeritrixFiles files) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(files, "HeritrixFiles files");
        HeritrixLauncher hl = HeritrixLauncherFactory.getInstance(files);
        hl.doCrawl();
    }

    /**
     * Controls storing all files involved in a job. The files are 1) the actual ARC/WARC files, and 2) the metadata
     * files. The crawl.log is parsed, and information for each domain is generated and stored in an
     * AbstractHarvestReport object, which is sent along in the CrawlStatusMessage.
     * <p>
     * Additionally, any leftover open ARC files are closed and harvest documentation is extracted before upload starts.
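     * <p>
     * Illustrative call, as a sketch (the controller and the HeritrixFiles object come from the preceding crawl setup):
     * <pre>{@code
     * StringBuilder errorMessage = new StringBuilder();
     * List<File> failedFiles = new ArrayList<File>();
     * HarvestReport report = controller.storeFiles(files, errorMessage, failedFiles);
     * // errorMessage has accumulated any upload errors; failedFiles lists the ARC/WARC files that could not be stored
     * }</pre>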
     *
     * @param files The HeritrixFiles object for this crawl. Not Null.
     * @param errorMessage A place where error messages accumulate. Not Null.
     * @param failedFiles List of files that failed to upload. Not Null.
     * @return An object containing info about the domains harvested.
     * @throws ArgumentNotValid if an argument isn't valid.
     */
    public HarvestReport storeFiles(HeritrixFiles files, StringBuilder errorMessage, List<File> failedFiles)
            throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(files, "HeritrixFiles files");
        ArgumentNotValid.checkNotNull(errorMessage, "StringBuilder errorMessage");
        ArgumentNotValid.checkNotNull(failedFiles, "List<File> failedFiles");
        long jobID = files.getJobID();
        try {
            IngestableFiles inf = new IngestableFiles(files);

            inf.closeOpenFiles(WAIT_FOR_HERITRIX_TIMEOUT_SECS);
            // Create a metadata ARC file
            HarvestDocumentation.documentHarvest(inf);
            // Upload all files

            // Check if the arcsdir or warcsdir is empty.
            // Send a notification if this is the case.
            if (inf.getArcFiles().isEmpty() && inf.getWarcFiles().isEmpty()) {
                String errMsg = "Probable error in Heritrix job setup. "
                        + "No arcfiles or warcfiles generated by Heritrix for job " + jobID;
                log.warn(errMsg);
                NotificationsFactory.getInstance().notify(errMsg, NotificationType.WARNING);
            } else {
                if (!inf.getArcFiles().isEmpty()) {
                    uploadFiles(inf.getArcFiles(), errorMessage, failedFiles);
                }
                if (!inf.getWarcFiles().isEmpty()) {
                    uploadFiles(inf.getWarcFiles(), errorMessage, failedFiles);
                }
            }

            // Now that the ARC/WARC files have been uploaded,
            // we finally upload the metadata archive file.
            uploadFiles(inf.getMetadataArcFiles(), errorMessage, failedFiles);

            // Make the harvestReport ready for uploading
            HarvestReportGenerator hrg = new HarvestReportGenerator(files);
            DomainStatsReport dsr = new DomainStatsReport(hrg.getDomainStatsMap(), hrg.getDefaultStopReason());
            return HarvestReportFactory.generateHarvestReport(dsr);

        } catch (IOFailure e) {
            String errMsg = "IOFailure occurred while trying to upload files";
            log.warn(errMsg, e);
            throw new IOFailure(errMsg, e);
        }
    }

    /**
     * Upload given files to the archive repository.
     *
     * @param files List of (ARC/WARC) files to upload.
     * @param errorMessage Accumulator for error messages.
     * @param failedFiles Accumulator for failed files.
     */
    private void uploadFiles(List<File> files, StringBuilder errorMessage, List<File> failedFiles) {
        // Upload all archive files
        if (files != null) {
            for (File f : files) {
                try {
                    log.info("Uploading file '{}' to arcrepository.", f.getName());
                    arcRepController.store(f);
                    log.info("File '{}' uploaded successfully to arcrepository.", f.getName());
                } catch (Exception e) {
                    File oldJobsDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_OLDJOBSDIR));
                    String errorMsg = "Error uploading arcfile '" + f.getAbsolutePath() + "'. Will be moved to '"
                            + oldJobsDir.getAbsolutePath() + "'";
                    errorMessage.append(errorMsg).append("\n").append(e.toString()).append("\n");
                    log.warn(errorMsg, e);
                    failedFiles.add(f);
                }
            }
        }
    }

    /**
     * Retrieve the list of jobs for duplicate reduction.
     * <p>
     * Runs through all metadata entries, finding duplicate reduction entries, and parsing all jobIDs in them, warning
     * only on errors.
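     * <p>
     * As an illustration (based on the parsing below): an entry whose payload is the string {@code "2,5,7"} yields the
     * job IDs 2, 5 and 7, while a non-numeric token such as {@code "x"} is logged as a warning and skipped.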
     *
     * @param metadataEntries list of metadataEntries.
     * @return the list of jobs for duplicate reduction.
     */
    private List<Long> parseJobIDsForDuplicateReduction(List<MetadataEntry> metadataEntries) {
        // Find metadataEntry for duplicate reduction, if any.
        List<Long> result = new ArrayList<Long>();
        for (MetadataEntry me : metadataEntries) {
            if (me.isDuplicateReductionMetadataEntry()) {
                String s = new String(me.getData());
                if (s.isEmpty()) { // An empty string is now possible
                    continue;
                }
                String[] longs = s.split(",");
                for (String stringLong : longs) {
                    try {
                        result.add(Long.parseLong(stringLong));
                    } catch (NumberFormatException e) {
                        log.warn("Unable to convert String '{}' in duplicate reduction jobid list metadataEntry '{}'"
                                + " to a jobID. Ignoring.", stringLong, s, e);
                    }
                }
            }
        }
        return result;
    }

    /**
     * Get an index for deduplication. This will make a call to the index server, requesting an index for the given IDs.
     * The files will then be cached locally.
     * <p>
     * If we request an index for IDs that don't exist or have problems, we get a smaller set of IDs in our cache
     * files, and the next time we ask for the same index, we will call the index server again. This will be handled
     * well, though, because if the IDs are still missing, we will get a reply telling us to use the cached, smaller
     * index anyway.
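     * <p>
     * The essential lookup, shown as a sketch (jobIDsForDuplicateReduction is the set parsed from the metadata entries):
     * <pre>{@code
     * JobIndexCache jobIndexCache = IndexClientFactory.getDedupCrawllogInstance();
     * Index<Set<Long>> jobIndex = jobIndexCache.getIndex(jobIDsForDuplicateReduction);
     * File indexDir = jobIndex.getIndexFile();
     * }</pre>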
     *
     * @param metadataEntries list of metadataEntries to get jobIDs from.
     * @return a directory containing the index itself.
     * @throws IOFailure on errors retrieving the index from the client. FIXME: Handle a missing index more gracefully,
     * e.g. by adding a setting that disables deduplication when no index is available.
     */
    private File fetchDeduplicateIndex(List<MetadataEntry> metadataEntries) {
        // Get list of jobs, which should be used for duplicate reduction
        // and retrieve a luceneIndex from the IndexServer
        // based on the crawl.logs from these jobs and their CDX'es.
        Set<Long> jobIDsForDuplicateReduction = new HashSet<Long>(parseJobIDsForDuplicateReduction(metadataEntries));

        // The client for requesting job index.
        JobIndexCache jobIndexCache = IndexClientFactory.getDedupCrawllogInstance();

        // Request the index and return the index file.
        Index<Set<Long>> jobIndex = jobIndexCache.getIndex(jobIDsForDuplicateReduction);
        // Check which jobs didn't become part of the index.
        Set<Long> diffSet = new HashSet<Long>(jobIDsForDuplicateReduction);
        diffSet.removeAll(jobIndex.getIndexSet());
        if (log.isDebugEnabled()) {
            log.debug("Received deduplication index containing {} jobs. {}", jobIndex.getIndexSet().size(),
                    ((diffSet.size() > 0) ? "Missing jobs: " + StringUtils.conjoin(",", diffSet) : ""));
        }

        return jobIndex.getIndexFile();
    }

    /**
     * Submit a batch job to generate CDX records for all metadata files for a job, and report the result in a list.
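     * <p>
     * Illustrative use, as a sketch (the job ID is hypothetical; a matching metadata (W)ARC file must exist in the
     * archive):
     * <pre>{@code
     * List<CDXRecord> records = HarvestController.getMetadataCDXRecordsForJob(42L);
     * for (CDXRecord record : records) {
     *     System.out.println(record.getURL() + " " + record.getOffset());
     * }
     * }</pre>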
     *
     * @param jobid The job to get CDX records for.
     * @return A list of CDX records.
     * @throws ArgumentNotValid If jobid is 0 or negative.
     * @throws IOFailure On trouble generating the CDX.
     */
    public static List<CDXRecord> getMetadataCDXRecordsForJob(long jobid) {
        ArgumentNotValid.checkPositive(jobid, "jobid");
        FileBatchJob cdxJob = new ArchiveExtractCDXJob(false);
        cdxJob.processOnlyFilesMatching(jobid + "-metadata-[0-9]+\\.(w)?arc(\\.gz)?");
        File f;
        try {
            f = File.createTempFile(jobid + "-reports", ".cdx", FileUtils.getTempDir());
        } catch (IOException e) {
            throw new IOFailure("Could not create temporary file", e);
        }
        BatchStatus status = ArcRepositoryClientFactory.getViewerInstance().batch(cdxJob,
                Settings.get(CommonSettings.USE_REPLICA_ID));
        status.getResultFile().copyTo(f);
        List<CDXRecord> records;
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(f));
            records = new ArrayList<CDXRecord>();
            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                String[] parts = line.split("\\s+");
                CDXRecord record = new CDXRecord(parts);
                records.add(record);
            }
        } catch (IOException e) {
            throw new IOFailure("Unable to read results from file '" + f + "'", e);
        } finally {
            IOUtils.closeQuietly(reader);
            FileUtils.remove(f);
        }
        return records;
    }

}