package dk.netarkivet.harvester.heritrix3;

import java.io.File;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.distribute.indexserver.Index;
import dk.netarkivet.common.distribute.indexserver.IndexClientFactory;
import dk.netarkivet.common.distribute.indexserver.JobIndexCache;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.PermissionDenied;
import dk.netarkivet.common.utils.ExceptionUtils;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.StringUtils;
import dk.netarkivet.harvester.datamodel.HarvestDefinitionInfo;
import dk.netarkivet.harvester.datamodel.Job;
import dk.netarkivet.harvester.harvesting.PersistentJobData;
import dk.netarkivet.harvester.harvesting.metadata.MetadataEntry;

/**
 * Runs a single harvest job: creates the crawl directory, writes the Heritrix 3 setup files
 * (harvestinfo, seeds, order template, pre-harvest metadata, and optionally a deduplication index),
 * and launches the crawl.
 */
public class HarvestJob {

    /** The class logger. */
    private static final Logger log = LoggerFactory.getLogger(HarvestJob.class);

    /** The HarvestControllerServer on whose behalf this job runs; used to report errors. */
    private HarvestControllerServer hcs;

    /** The harvester Job in this thread. */
    private Job job;

    /** Stores documentary information about the harvest. */
    //private HarvestDefinitionInfo origHarvestInfo;

    /** The list of metadata associated with this Job. */
    //private List<MetadataEntry> metadataEntries;

    /**
     * Constructor.
     *
     * @param hcs the HarvestControllerServer that errors are reported to.
     */
    public HarvestJob(HarvestControllerServer hcs) {
        this.hcs = hcs;
    }

    /** The directory in which the crawl takes place. */
    private File crawlDir;

    /** The Heritrix 3 files (seeds, order template, etc.) written for this job. */
    private Heritrix3Files files;

    /** The name of this job: the jobID combined with a timestamp. */
    private String jobName;

    /**
     * Initializes the job: creates the crawl directory and writes the files needed to start the harvest.
     *
     * @param job The Job object containing the harvest setup data.
     * @param origHarvestInfo Documentary information about the harvest.
     * @param metadataEntries The list of metadata entries associated with the job.
     */
    public void init(Job job, HarvestDefinitionInfo origHarvestInfo, List<MetadataEntry> metadataEntries) {
        this.job = job;
        //this.origHarvestInfo = origHarvestInfo;
        //this.metadataEntries = metadataEntries;
        jobName = job.getJobID() + "_" + System.currentTimeMillis();
        crawlDir = createCrawlDir();
        files = writeHarvestFiles(crawlDir, job, origHarvestInfo, metadataEntries);
    }

    /** @return the Heritrix3Files object describing the files written for this job. */
    public Heritrix3Files getHeritrix3Files() {
        return files;
    }

    /**
     * Creates the actual HeritrixLauncher instance and runs it, after the various setup files have been written.
     *
     * @throws ArgumentNotValid if an argument isn't valid.
     */
    public void runHarvest() throws ArgumentNotValid {
        log.info("Starting crawl of job: {}", job.getJobID());
        HeritrixLauncherAbstract hl = HeritrixLauncherFactory.getInstance(files, jobName);
        hl.doCrawl();
    }

    /**
     * Create the crawl dir, but make sure a message is sent if there is a problem.
     *
     * @return The directory that the crawl will take place in.
     * @throws PermissionDenied if the directory cannot be created.
     */
    public File createCrawlDir() {
        // The directory where arcfiles are stored (the crawldir)
        File crawlDir = null;
        // Create the crawldir. This is done here in order to be able
        // to send a proper message if something goes wrong.
        try {
            File baseCrawlDir = new File(Settings.get(Heritrix3Settings.HARVEST_CONTROLLER_SERVERDIR));
            crawlDir = new File(baseCrawlDir, jobName);
            FileUtils.createDir(crawlDir);
            log.info("Created crawl directory: '{}'", crawlDir);
            return crawlDir;
        } catch (PermissionDenied e) {
            String message = "Couldn't create the directory for job " + job.getJobID();
            log.warn(message, e);
            hcs.sendErrorMessage(job.getJobID(), message, ExceptionUtils.getStackTrace(e));
            throw e;
        }
    }

    /**
     * Writes the files needed to start a harvest.
     *
     * @param crawldir The directory that the crawl should take place in.
     * @param job The Job object containing various harvest setup data.
     * @param hdi The object encapsulating documentary information about the harvest.
     * @param metadataEntries Any metadata entries sent along with the job that should be stored for later use.
     * @return An object encapsulating where these files have been written.
     */
    public Heritrix3Files writeHarvestFiles(File crawldir, Job job, HarvestDefinitionInfo hdi,
            List<MetadataEntry> metadataEntries) {

        final Heritrix3Files files = Heritrix3Files.getH3HeritrixFiles(crawldir, job);

        // If this job is a job that tries to continue a previous job
        // using the Heritrix recover.gz log, and this feature is enabled,
        // then try to fetch the recover.log from the metadata-arc-file.
        if (job.getContinuationOf() != null && Settings.getBoolean(Heritrix3Settings.RECOVERlOG_CONTINUATION_ENABLED)) {
            //tryToRetrieveRecoverLog(job, files);
            log.warn("Continuation from a recover log is disabled for now!");
        }

        // Create harvestInfo file in crawldir
        // & create preharvest-metadata-1.arc
        log.debug("Writing persistent job data for job {} to crawldir '{}'", job.getJobID(), crawldir);
        // TODO Check that harvestInfo does not yet exist

        // Write job data to persistent storage (harvestinfo file)
        new PersistentJobData(files.getCrawlDir()).write(job, hdi);

        // Create jobId-preharvest-metadata-1.arc for this job
        writePreharvestMetadata(job, metadataEntries, crawldir);

        files.writeSeedsTxt(job.getSeedListAsString());

        files.writeOrderXml(job.getOrderXMLdoc());
        // Only retrieve an index if deduplication is not disabled in the template.
        if (job.getOrderXMLdoc().IsDeduplicationEnabled()) {
            log.debug("Deduplication enabled. Fetching deduplication index.");
            files.setIndexDir(fetchDeduplicateIndex(metadataEntries));
        } else {
            log.debug("Deduplication disabled.");
        }

        return files;
    }

    /**
     * Writes pre-harvest metadata to the "metadata" directory.
     *
     * @param harvestJob a given Job.
     * @param metadata the list of metadata entries to write to the metadata file.
     * @param crawlDir the directory where the metadata will be written.
     * @throws IOFailure If there are errors in writing the metadata.
     */
    private void writePreharvestMetadata(Job harvestJob, List<MetadataEntry> metadata, File crawlDir) throws IOFailure {
        if (metadata.size() == 0) {
            // Do not generate a preharvest metadata file for an empty list
            return;
        }

        // Make sure that the metadata directory exists
        File metadataDir = new File(crawlDir, IngestableFiles.METADATA_SUB_DIR);
        metadataDir.mkdir();
        if (!(metadataDir.exists() && metadataDir.isDirectory())) {
            throw new IOFailure("Unable to write preharvest metadata for job '" + harvestJob.getJobID()
                    + "' to directory '" + metadataDir.getAbsolutePath() + "', as the directory does not exist.");
        }

        // Serialize the MetadataEntry objects to the metadataDir
        MetadataEntry.storeMetadataToDisk(metadata, metadataDir);
    }

    /**
     * Get an index for deduplication. This will make a call to the index server, requesting an index for the given
     * job IDs. The files will then be cached locally.
     * <p>
     * If we request an index for IDs that don't exist or have problems, we get a smaller set of IDs in our cache
     * files, and next time we ask for the same index, we will call the index server again. This is handled well,
     * though, because if the IDs are still missing, we will get a reply telling us to use the cached smaller index
     * anyway.
     *
     * @param metadataEntries list of metadataEntries to get jobIDs from.
     * @return a directory containing the index itself.
     * @throws IOFailure on errors retrieving the index from the client. FIXME Handle a missing index more
     *             forgivingly; add a setting to disable deduplication if no index is available.
     */
    private File fetchDeduplicateIndex(List<MetadataEntry> metadataEntries) {
        // Get the list of jobs which should be used for duplicate reduction,
        // and retrieve a Lucene index from the IndexServer
        // based on the crawl.logs from these jobs and their CDXes.
        Set<Long> jobIDsForDuplicateReduction = new HashSet<Long>(parseJobIDsForDuplicateReduction(metadataEntries));

        // The client for requesting the job index.
        JobIndexCache jobIndexCache = IndexClientFactory.getDedupCrawllogInstance();

        // Request the index and return the index file.
        Index<Set<Long>> jobIndex = jobIndexCache.getIndex(jobIDsForDuplicateReduction);
        // Check which jobs didn't become part of the index.
        Set<Long> diffSet = new HashSet<Long>(jobIDsForDuplicateReduction);
        diffSet.removeAll(jobIndex.getIndexSet());
        if (log.isDebugEnabled()) {
            log.debug("Received deduplication index containing {} jobs. {}", jobIndex.getIndexSet().size(),
                    ((diffSet.size() > 0) ? "Missing jobs: " + StringUtils.conjoin(",", diffSet) : ""));
        }

        return jobIndex.getIndexFile();
    }

    /**
     * Retrieve the list of jobs for duplicate reduction.
     * <p>
     * Runs through all metadata entries, finding duplicate reduction entries, and parsing all jobIDs in them,
     * warning only on errors.
     *
     * @param metadataEntries list of metadataEntries.
     * @return the list of jobs for duplicate reduction.
     */
    private List<Long> parseJobIDsForDuplicateReduction(List<MetadataEntry> metadataEntries) {
        // Find the metadataEntry for duplicate reduction, if any.
        List<Long> result = new ArrayList<Long>();
        for (MetadataEntry me : metadataEntries) {
            if (me.isDuplicateReductionMetadataEntry()) {
                String s = new String(me.getData());
                if (s.isEmpty()) { // An empty string is now possible
                    continue;
                }
                String[] longs = s.split(",");
                for (String stringLong : longs) {
                    try {
                        result.add(Long.parseLong(stringLong));
                    } catch (NumberFormatException e) {
                        log.warn("Unable to convert String '{}' in duplicate reduction jobid list metadataEntry '{}'"
                                + " to a jobID. Ignoring.", stringLong, s, e);
                    }
                }
            }
        }
        return result;
    }

}
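/*
 * Usage sketch (illustrative only, assuming a configured HarvestControllerServer 'hcs' and a Job 'job'
 * with its HarvestDefinitionInfo 'hdi' and MetadataEntry list 'metadataEntries' are at hand):
 *
 *   HarvestJob harvestJob = new HarvestJob(hcs);
 *   harvestJob.init(job, hdi, metadataEntries);             // creates the crawl dir and writes the harvest files
 *   Heritrix3Files files = harvestJob.getHeritrix3Files();  // seeds, order xml, crawl dir locations
 *   harvestJob.runHarvest();                                // launches Heritrix 3 via HeritrixLauncherFactory
 */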