/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 * the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.heritrix3;

import java.io.File;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.distribute.indexserver.Index;
import dk.netarkivet.common.distribute.indexserver.IndexClientFactory;
import dk.netarkivet.common.distribute.indexserver.JobIndexCache;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.IllegalState;
import dk.netarkivet.common.exceptions.PermissionDenied;
import dk.netarkivet.common.utils.ExceptionUtils;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.StringUtils;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.datamodel.HarvestDefinitionInfo;
import dk.netarkivet.harvester.datamodel.Job;
import dk.netarkivet.harvester.harvesting.PersistentJobData;
import dk.netarkivet.harvester.harvesting.metadata.MetadataEntry;

/**
 * Encapsulates the preparation and execution of a single Heritrix 3 harvest job:
 * creating the crawl directory, writing the harvest setup files, and launching the crawl.
 */
public class HarvestJob {

    /** The class logger. */
    private static final Logger log = LoggerFactory.getLogger(HarvestJob.class);

    /** The server that owns this harvest job. */
    private HarvestControllerServer hcs;

    /** The harvester Job in this thread. */
    private Job job;

    /** The directory in which the crawl takes place. */
    private File crawlDir;

    /** The set of files used by Heritrix 3 for this job. */
    private Heritrix3Files files;

    /** The name of this job, composed of the job ID and a timestamp. */
    private String jobName;

    /**
     * Constructor.
     * @param hcs a HarvestControllerServer instance
     */
    public HarvestJob(HarvestControllerServer hcs) {
        this.hcs = hcs;
    }

    /**
     * Initialization of the harvest job.
     * @param job A job from the jobs table in the harvest database
     * @param origHarvestInfo metadata about the harvest
     * @param metadataEntries entries for the metadata file for the harvest
     */
    public void init(Job job, HarvestDefinitionInfo origHarvestInfo, List<MetadataEntry> metadataEntries) {
        this.job = job;
        jobName = job.getJobID() + "_" + System.currentTimeMillis();
        crawlDir = createCrawlDir();
        files = writeHarvestFiles(crawlDir, job, origHarvestInfo, metadataEntries);
    }

    /**
     * @return the Heritrix3Files object initialized with the init() method.
     */
    public Heritrix3Files getHeritrix3Files() {
        return files;
    }

    /**
     * Creates the actual HeritrixLauncher instance and runs it, after the various setup files have been written.
     *
     * @throws ArgumentNotValid if an argument isn't valid.
     */
    public void runHarvest() throws ArgumentNotValid {
        log.info("Starting crawl of job: {}", job.getJobID());
        HeritrixLauncherAbstract hl = HeritrixLauncherFactory.getInstance(files, jobName);
        hl.doCrawl();
    }

    /**
     * Create the crawl dir, but make sure a message is sent if there is a problem.
     *
     * @return The directory that the crawl will take place in.
     * @throws PermissionDenied if the directory cannot be created.
     */
    public File createCrawlDir() {
        // The directory where the archive files are stored (the crawldir).
        File crawlDir = null;
        // Create the crawldir. This is done here in order to be able
        // to send a proper message if something goes wrong.
        try {
            File baseCrawlDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_SERVERDIR));
            crawlDir = new File(baseCrawlDir, jobName);
            FileUtils.createDir(crawlDir);
            log.info("Created crawl directory: '{}'", crawlDir);
            return crawlDir;
        } catch (PermissionDenied e) {
            String message = "Couldn't create the directory for job " + job.getJobID();
            log.warn(message, e);
            hcs.sendErrorMessage(job.getJobID(), message, ExceptionUtils.getStackTrace(e));
            throw e;
        }
    }

    /**
     * Writes the files needed to start a harvest.
     *
     * @param crawldir The directory that the crawl should take place in.
     * @param job The Job object containing various harvest setup data.
     * @param hdi The object encapsulating documentary information about the harvest.
     * @param metadataEntries Any metadata entries sent along with the job that should be stored for later use.
     * @return An object encapsulating where these files have been written.
     */
    public Heritrix3Files writeHarvestFiles(File crawldir, Job job, HarvestDefinitionInfo hdi,
            List<MetadataEntry> metadataEntries) {

        final Heritrix3Files files = Heritrix3Files.getH3HeritrixFiles(crawldir, job);

        // If this job is a job that tries to continue a previous job
        // using the Heritrix recover.gz log, and this feature is enabled,
        // then try to fetch the recover.log from the metadata-arc-file.
        if (job.getContinuationOf() != null && Settings.getBoolean(HarvesterSettings.RECOVERlOG_CONTINUATION_ENABLED)) {
            log.warn("Continuation of crawl from a RecoverLog is not implemented for Heritrix3!");
        }

        // Create the harvestInfo file in the crawldir
        // & create the preharvest-metadata-1.arc
        log.debug("Writing persistent job data for job {} to crawldir '{}'", job.getJobID(), crawldir);
        if (!PersistentJobData.existsIn(crawldir)) {
            // Write job data to persistent storage (harvestinfo file)
            new PersistentJobData(crawldir).write(job, hdi);
        } else {
            throw new IllegalState("We already found a harvestInfo.xml for the crawldir " + crawldir.getAbsolutePath());
        }

        // Create jobId-preharvest-metadata-1.arc for this job
        writePreharvestMetadata(job, metadataEntries, crawldir);

        files.writeSeedsTxt(job.getSeedListAsString());

        files.writeOrderXml(job.getOrderXMLdoc());

        // Only retrieve an index if deduplication is not disabled in the template.
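        // The index directory set below is passed on through Heritrix3Files, so that
        // the deduplication step can consult URLs harvested by earlier jobs.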
        if (job.getOrderXMLdoc().IsDeduplicationEnabled()) {
            log.debug("Deduplication enabled. Fetching deduplication index..");
            files.setIndexDir(fetchDeduplicateIndex(metadataEntries));
        } else {
            log.debug("Deduplication disabled.");
        }

        return files;
    }

    /**
     * Writes pre-harvest metadata to the "metadata" directory.
     *
     * @param harvestJob a given Job.
     * @param metadata the list of metadata entries to write to the metadata file.
     * @param crawlDir the directory where the metadata will be written.
     * @throws IOFailure If there are errors in writing the metadata.
     */
    private void writePreharvestMetadata(Job harvestJob, List<MetadataEntry> metadata, File crawlDir) throws IOFailure {
        if (metadata.isEmpty()) {
            // Do not generate a preharvest metadata file for an empty list
            return;
        }

        // Make sure that the metadata directory exists
        File metadataDir = new File(crawlDir, IngestableFiles.METADATA_SUB_DIR);
        metadataDir.mkdir();
        if (!(metadataDir.exists() && metadataDir.isDirectory())) {
            throw new IOFailure("Unable to write preharvest metadata for job '" + harvestJob.getJobID()
                    + "' to directory '" + metadataDir.getAbsolutePath() + "', as the directory does not exist.");
        }

        // Serialize the MetadataEntry objects to the metadataDir
        MetadataEntry.storeMetadataToDisk(metadata, metadataDir);
    }

    /**
     * Get an index for deduplication. This will make a call to the index server, requesting an index for the given
     * job IDs. The files will then be cached locally.
     * <p>
     * If we request an index for IDs that don't exist or have problems, we get a smaller set of IDs in our cache
     * files, and the next time we ask for the same index, we will call the index server again. This is handled
     * gracefully, though, because if the IDs are still missing, we will get a reply telling us to use the cached,
     * smaller index anyway.
     *
     * @param metadataEntries list of metadataEntries to get job IDs from.
     * @return a directory containing the index itself.
     * @throws IOFailure on errors retrieving the index from the client.
     * FIXME Better, more forgiving handling of no index being available; add a setting for disabling deduplication
     * if no index is available.
     */
    private File fetchDeduplicateIndex(List<MetadataEntry> metadataEntries) {
        // Get the list of jobs that should be used for duplicate reduction
        // and retrieve a Lucene index from the IndexServer,
        // based on the crawl.logs from these jobs and their CDXes.
        Set<Long> jobIDsForDuplicateReduction = new HashSet<Long>(parseJobIDsForDuplicateReduction(metadataEntries));

        // The client for requesting the job index.
        JobIndexCache jobIndexCache = IndexClientFactory.getDedupCrawllogInstance();

        // Request the index and return the index file.
        Index<Set<Long>> jobIndex = jobIndexCache.getIndex(jobIDsForDuplicateReduction);
        // Check which jobs didn't become part of the index.
        Set<Long> diffSet = new HashSet<Long>(jobIDsForDuplicateReduction);
        diffSet.removeAll(jobIndex.getIndexSet());
        if (log.isDebugEnabled()) {
            log.debug("Received deduplication index containing {} jobs. {}", jobIndex.getIndexSet().size(),
                    ((diffSet.size() > 0) ? "Missing jobs: " + StringUtils.conjoin(",", diffSet) : ""));
        }

        return jobIndex.getIndexFile();
    }

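    // Note: the payload of a duplicate reduction MetadataEntry is a comma-separated
    // list of job IDs, e.g. "1001,1002,1005" (the IDs here are illustrative only).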
    /**
     * Retrieve the list of jobs for duplicate reduction.
     * <p>
     * Runs through all metadata entries, finding duplicate reduction entries and parsing all job IDs in them,
     * only warning on errors.
     *
     * @param metadataEntries list of metadataEntries.
     * @return the list of job IDs for duplicate reduction.
     */
    private List<Long> parseJobIDsForDuplicateReduction(List<MetadataEntry> metadataEntries) {
        // Find the metadataEntries for duplicate reduction, if any.
        List<Long> result = new ArrayList<Long>();
        for (MetadataEntry me : metadataEntries) {
            if (me.isDuplicateReductionMetadataEntry()) {
                String s = new String(me.getData());
                if (s.isEmpty()) { // An empty string is possible
                    continue;
                }
                String[] longs = s.split(",");
                for (String stringLong : longs) {
                    try {
                        result.add(Long.parseLong(stringLong));
                    } catch (NumberFormatException e) {
                        log.warn("Unable to convert String '{}' in duplicate reduction jobid list metadataEntry '{}'"
                                + " to a jobID. Ignoring.", stringLong, s, e);
                    }
                }
            }
        }
        return result;
    }

}