001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.harvester.scheduler; 024 025import java.util.ArrayList; 026import java.util.Date; 027import java.util.Iterator; 028import java.util.List; 029 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033import dk.netarkivet.common.distribute.JMSConnection; 034import dk.netarkivet.common.exceptions.ArgumentNotValid; 035import dk.netarkivet.common.exceptions.IOFailure; 036import dk.netarkivet.common.utils.ExceptionUtils; 037import dk.netarkivet.common.utils.Settings; 038import dk.netarkivet.harvester.HarvesterSettings; 039import dk.netarkivet.harvester.datamodel.AliasInfo; 040import dk.netarkivet.harvester.datamodel.HarvestChannel; 041import dk.netarkivet.harvester.datamodel.HarvestDefinitionDAO; 042import dk.netarkivet.harvester.datamodel.HeritrixTemplate; 043import dk.netarkivet.harvester.datamodel.Job; 044import dk.netarkivet.harvester.datamodel.JobDAO; 045import dk.netarkivet.harvester.datamodel.JobStatus; 046import dk.netarkivet.harvester.datamodel.SparseFullHarvest; 047import dk.netarkivet.harvester.datamodel.SparsePartialHarvest; 048import dk.netarkivet.harvester.distribute.HarvesterChannels; 049import dk.netarkivet.harvester.harvesting.distribute.DoOneCrawlMessage; 050import dk.netarkivet.harvester.harvesting.metadata.MetadataEntry; 051import dk.netarkivet.harvester.datamodel.HarvestDefinitionInfo; 052 053/** 054 * This class handles dispatching of Harvest jobs to the Harvesters. 055 */ 056public class JobDispatcher { 057 058 /** The logger to use. */ 059 private static final Logger log = LoggerFactory.getLogger(JobDispatcher.class); 060 061 /** Connection to JMS provider. */ 062 private final JMSConnection jmsConnection; 063 064 private final HarvestDefinitionDAO harvestDefinitionDAO; 065 private final JobDAO jobDao; 066 067 /** 068 * @param jmsConnection The JMS connection to use. 069 * @param hDao The HarvestDefinitionDAO to use. 070 */ 071 public JobDispatcher(JMSConnection jmsConnection, HarvestDefinitionDAO hDao, JobDAO jobDao) { 072 log.info("Creating JobDispatcher"); 073 ArgumentNotValid.checkNotNull(jmsConnection, "jmsConnection"); 074 ArgumentNotValid.checkNotNull(hDao, "hDao"); 075 ArgumentNotValid.checkNotNull(jobDao, "jobDao"); 076 this.jmsConnection = jmsConnection; 077 this.harvestDefinitionDAO = hDao; 078 this.jobDao = jobDao; 079 } 080 081 /** 082 * Submit the next new job (the one with the lowest ID) with the given priority, and updates the internal counter as 083 * needed. If no jobs are ready for the given priority, nothing is done 084 * 085 * @param channel the Channel to use for the job. 086 */ 087 protected void submitNextNewJob(HarvestChannel channel) { 088 Job jobToSubmit = prepareNextJobForSubmission(channel); 089 if (jobToSubmit == null) { 090 log.trace("No {} jobs to be run at this time", channel.getName()); 091 } else { 092 log.debug("Submitting new {} job {}", channel.getName(), jobToSubmit.getJobID()); 093 try { 094 List<MetadataEntry> metadata = createMetadata(jobToSubmit); 095 096 // Extract documentary information about the harvest 097 String hName = harvestDefinitionDAO.getHarvestName(jobToSubmit.getOrigHarvestDefinitionID()); 098 099 String schedule = ""; 100 String hdComments = ""; 101 String hdAudience = ""; 102 SparseFullHarvest fh = harvestDefinitionDAO.getSparseFullHarvest(hName); 103 if (fh != null) { 104 hdComments = fh.getComments(); 105 } else { 106 SparsePartialHarvest ph = harvestDefinitionDAO.getSparsePartialHarvest(hName); 107 108 if (ph == null) { 109 throw new ArgumentNotValid("No harvest definition found for id '" 110 + jobToSubmit.getOrigHarvestDefinitionID() + "', named '" + hName + "'"); 111 } 112 113 // The schedule name can only be documented for 114 // selective crawls. 115 schedule = ph.getScheduleName(); 116 117 hdComments = ph.getComments(); 118 hdAudience = ph.getAudience(); 119 } 120 121 doOneCrawl(jobToSubmit, hName, hdComments, schedule, channel, hdAudience, metadata); 122 123 log.info("Job #{} submitted", jobToSubmit.getJobID()); 124 } catch (Throwable t) { 125 String message = "Error while dispatching job " + jobToSubmit.getJobID() 126 + ". Job status changed to FAILED"; 127 log.warn(message, t); 128 if (jobToSubmit != null) { 129 jobToSubmit.setStatus(JobStatus.FAILED); 130 jobToSubmit.appendHarvestErrors(message); 131 jobToSubmit.appendHarvestErrorDetails(ExceptionUtils.getStackTrace(t)); 132 jobDao.update(jobToSubmit); 133 } 134 } 135 } 136 } 137 138 /** 139 * Will read the next job ready to run from the db and set the job to submitted. If no jobs are ready, null will be 140 * returned. 141 * <p> 142 * Note the operation is synchronized, so only one thread may start the submission of a job. 143 * 144 * @param channel the job channel. 145 * @return A job ready to be submitted. 146 */ 147 private synchronized Job prepareNextJobForSubmission(HarvestChannel channel) { 148 Iterator<Long> jobsToSubmit = jobDao.getAllJobIds(JobStatus.NEW, channel); 149 if (!jobsToSubmit.hasNext()) { 150 return null; 151 } else { 152 final long jobID = jobsToSubmit.next(); 153 Job jobToSubmit = jobDao.read(jobID); 154 jobToSubmit.setStatus(JobStatus.SUBMITTED); 155 jobToSubmit.setSubmittedDate(new Date()); 156 jobDao.update(jobToSubmit); 157 return jobToSubmit; 158 } 159 } 160 161 /** 162 * Creates the metadata for the indicated job. This includes: 163 * <ol> 164 * <li>Alias metadata. 165 * <li>DuplicationReduction MetadataEntry, if Deduplication //is enabled. 166 * 167 * @param job The job to create meta data for. 168 */ 169 private List<MetadataEntry> createMetadata(Job job) { 170 List<MetadataEntry> metadata = new ArrayList<MetadataEntry>(); 171 List<AliasInfo> aliasInfos = jobDao.getJobAliasInfo(job); 172 MetadataEntry aliasMetadataEntry = MetadataEntry.makeAliasMetadataEntry(aliasInfos, 173 job.getOrigHarvestDefinitionID(), job.getHarvestNum(), job.getJobID()); 174 if (aliasMetadataEntry != null) { 175 // Add an entry documenting that this job 176 // contains domains that has aliases 177 metadata.add(aliasMetadataEntry); 178 log.info("Added alias metadataEntry for job {} ", job.getJobID()); 179 } 180 181 if (job.getOrderXMLdoc().IsDeduplicationEnabled()) { 182 MetadataEntry duplicateReductionMetadataEntry = MetadataEntry.makeDuplicateReductionMetadataEntry( 183 jobDao.getJobIDsForDuplicateReduction(job.getJobID()), job.getOrigHarvestDefinitionID(), 184 job.getHarvestNum(), job.getJobID()); 185 // Always add a duplicationReductionMetadataEntry when deduplication is enabled 186 // even if the list of JobIDs for deduplication is empty! 187 metadata.add(duplicateReductionMetadataEntry); 188 log.info("Added duplicateReductionMetadataEntry metadataEntry for job {} ", job.getJobID()); 189 } 190 return metadata; 191 } 192 193 /** 194 * Submit an doOneCrawl request to a HarvestControllerServer. 195 * 196 * @param job the specific job to send 197 * @param origHarvestName the harvest definition's name 198 * @param origHarvestDesc the harvest definition's description 199 * @param origHarvestSchedule the harvest definition schedule name 200 * @param channel the channel to which the job should be sent 201 * @param metadata pre-harvest metadata to store in (w)arcfile. 202 * @param origHarvestAudience the audience for the data generated by harvest definitions. 203 * @throws ArgumentNotValid one of the parameters are null 204 * @throws IOFailure if unable to send the doOneCrawl request to a harvestControllerServer 205 */ 206 public void doOneCrawl(Job job, String origHarvestName, String origHarvestDesc, String origHarvestSchedule, 207 HarvestChannel channel, String origHarvestAudience, List<MetadataEntry> metadata) throws ArgumentNotValid, 208 IOFailure { 209 ArgumentNotValid.checkNotNull(job, "job"); 210 ArgumentNotValid.checkNotNull(metadata, "metadata"); 211 212 if (origHarvestAudience != null && !origHarvestAudience.isEmpty()) { 213 job.setHarvestAudience(origHarvestAudience); 214 } 215 if (usingWarcAsArchiveFormat()) { 216 log.info("As we're using WARC as archiveFormat WarcInfoMetadata is now added to the template"); 217 HeritrixTemplate ht = job.getOrderXMLdoc(); 218 if (job.getContinuationOf() == null ) { 219 ht.insertWarcInfoMetadata(job, origHarvestName, origHarvestSchedule, 220 Settings.get(HarvesterSettings.PERFORMER)); 221 } else { 222 log.info("Job is a continuation of " + job.getContinuationOf() + " so no need to replace WarcInfoMetadata"); 223 } 224 job.setOrderXMLDoc(ht); 225 } else { 226 log.info("As we're using ARC as archiveFormat no WarcInfoMetadata was added to the template"); 227 } 228 229 DoOneCrawlMessage nMsg = new DoOneCrawlMessage(job, HarvesterChannels.getHarvestJobChannelId(channel), 230 new HarvestDefinitionInfo(origHarvestName, origHarvestDesc, origHarvestSchedule), metadata); 231 log.debug("Send crawl request: {}", nMsg); 232 jmsConnection.send(nMsg); 233 } 234 235 private boolean usingWarcAsArchiveFormat() { 236 return Settings.get(HarvesterSettings.HERITRIX_ARCHIVE_FORMAT).equalsIgnoreCase("warc"); 237 } 238 239}