/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.scheduler;

import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.distribute.JMSConnection;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.ExceptionUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.datamodel.AliasInfo;
import dk.netarkivet.harvester.datamodel.HarvestChannel;
import dk.netarkivet.harvester.datamodel.HarvestDefinitionDAO;
import dk.netarkivet.harvester.datamodel.HarvestDefinitionInfo;
import dk.netarkivet.harvester.datamodel.HeritrixTemplate;
import dk.netarkivet.harvester.datamodel.Job;
import dk.netarkivet.harvester.datamodel.JobDAO;
import dk.netarkivet.harvester.datamodel.JobStatus;
import dk.netarkivet.harvester.datamodel.SparseFullHarvest;
import dk.netarkivet.harvester.datamodel.SparsePartialHarvest;
import dk.netarkivet.harvester.distribute.HarvesterChannels;
import dk.netarkivet.harvester.harvesting.distribute.DoOneCrawlMessage;
import dk.netarkivet.harvester.harvesting.metadata.MetadataEntry;

/**
 * This class handles dispatching of Harvest jobs to the Harvesters.
 */
public class JobDispatcher {

    /** The logger to use. */
    private static final Logger log = LoggerFactory.getLogger(JobDispatcher.class);

    /** Connection to JMS provider. */
    private final JMSConnection jmsConnection;

    /** DAO used to look up the harvest definition a job originates from. */
    private final HarvestDefinitionDAO harvestDefinitionDAO;

    /** DAO used to read jobs and persist their status changes. */
    private final JobDAO jobDao;

    /**
     * @param jmsConnection The JMS connection to use.
     * @param hDao The HarvestDefinitionDAO to use.
     * @param jobDao The JobDAO to use.
     * @throws ArgumentNotValid if any of the arguments is null
     */
    public JobDispatcher(JMSConnection jmsConnection, HarvestDefinitionDAO hDao, JobDAO jobDao) {
        log.info("Creating JobDispatcher");
        ArgumentNotValid.checkNotNull(jmsConnection, "jmsConnection");
        ArgumentNotValid.checkNotNull(hDao, "hDao");
        ArgumentNotValid.checkNotNull(jobDao, "jobDao");
        this.jmsConnection = jmsConnection;
        this.harvestDefinitionDAO = hDao;
        this.jobDao = jobDao;
    }

    /**
     * Submit the next new job on the given channel. If no jobs with status NEW exist for the channel, nothing is
     * done.
     * <p>
     * Any failure while preparing or sending the crawl request is logged, and the job is marked as FAILED (with the
     * error message and stack trace attached) and written back through the JobDAO.
     *
     * @param channel the Channel to use for the job.
     */
    protected void submitNextNewJob(HarvestChannel channel) {
        Job jobToSubmit = prepareNextJobForSubmission(channel);
        if (jobToSubmit == null) {
            log.trace("No {} jobs to be run at this time", channel.getName());
            return;
        }

        log.debug("Submitting new {} job {}", channel.getName(), jobToSubmit.getJobID());
        try {
            List<MetadataEntry> metadata = createMetadata(jobToSubmit);

            // Extract documentary information about the harvest
            String hName = harvestDefinitionDAO.getHarvestName(jobToSubmit.getOrigHarvestDefinitionID());

            String schedule = "";
            String hdComments = "";
            String hdAudience = "";
            SparseFullHarvest fh = harvestDefinitionDAO.getSparseFullHarvest(hName);
            if (fh != null) {
                hdComments = fh.getComments();
            } else {
                SparsePartialHarvest ph = harvestDefinitionDAO.getSparsePartialHarvest(hName);
                if (ph == null) {
                    throw new ArgumentNotValid("No harvest definition found for id '"
                            + jobToSubmit.getOrigHarvestDefinitionID() + "', named '" + hName + "'");
                }

                // The schedule name can only be documented for
                // selective crawls.
                schedule = ph.getScheduleName();

                hdComments = ph.getComments();
                hdAudience = ph.getAudience();
            }

            doOneCrawl(jobToSubmit, hName, hdComments, schedule, channel, hdAudience, metadata);

            log.info("Job #{} submitted", jobToSubmit.getJobID());
        } catch (Throwable t) {
            // jobToSubmit is known to be non-null here (guard clause above), so no null check is needed
            // before recording the failure on the job.
            String message = "Error while dispatching job " + jobToSubmit.getJobID()
                    + ". Job status changed to FAILED";
            log.warn(message, t);
            jobToSubmit.setStatus(JobStatus.FAILED);
            jobToSubmit.appendHarvestErrors(message);
            jobToSubmit.appendHarvestErrorDetails(ExceptionUtils.getStackTrace(t));
            jobDao.update(jobToSubmit);
        }
    }

    /**
     * Will read the next job ready to run from the db and set the job to submitted. The job chosen is the first ID
     * returned by the JobDAO for status NEW on the given channel. If no jobs are ready, null will be returned.
     * <p>
     * Note the operation is synchronized, so only one thread may start the submission of a job.
     *
     * @param channel the job channel.
     * @return A job ready to be submitted, or null if there is none.
     */
    private synchronized Job prepareNextJobForSubmission(HarvestChannel channel) {
        Iterator<Long> jobsToSubmit = jobDao.getAllJobIds(JobStatus.NEW, channel);
        if (!jobsToSubmit.hasNext()) {
            return null;
        }

        final long jobID = jobsToSubmit.next();
        Job jobToSubmit = jobDao.read(jobID);
        // Persist the SUBMITTED status before returning the job to the caller.
        jobToSubmit.setStatus(JobStatus.SUBMITTED);
        jobToSubmit.setSubmittedDate(new Date());
        jobDao.update(jobToSubmit);
        return jobToSubmit;
    }

    /**
     * Creates the metadata for the indicated job. This includes:
     * <ol>
     * <li>Alias metadata, if an alias metadata entry could be made for the job.</li>
     * <li>DuplicationReduction MetadataEntry, if deduplication is enabled in the job's order template.</li>
     * </ol>
     *
     * @param job The job to create meta data for.
     * @return the list of metadata entries for the job; may be empty.
     */
    private List<MetadataEntry> createMetadata(Job job) {
        List<MetadataEntry> metadata = new ArrayList<MetadataEntry>();
        List<AliasInfo> aliasInfos = jobDao.getJobAliasInfo(job);
        MetadataEntry aliasMetadataEntry = MetadataEntry.makeAliasMetadataEntry(aliasInfos,
                job.getOrigHarvestDefinitionID(), job.getHarvestNum(), job.getJobID());
        if (aliasMetadataEntry != null) {
            // Add an entry documenting that this job
            // contains domains that has aliases
            metadata.add(aliasMetadataEntry);
        }

        if (job.getOrderXMLdoc().IsDeduplicationEnabled()) {
            MetadataEntry duplicateReductionMetadataEntry = MetadataEntry.makeDuplicateReductionMetadataEntry(
                    jobDao.getJobIDsForDuplicateReduction(job.getJobID()), job.getOrigHarvestDefinitionID(),
                    job.getHarvestNum(), job.getJobID());
            // Always add a duplicationReductionMetadataEntry when deduplication is enabled
            // even if the list of JobIDs for deduplication is empty!
            metadata.add(duplicateReductionMetadataEntry);
        }
        return metadata;
    }

    /**
     * Submit an doOneCrawl request to a HarvestControllerServer.
     * <p>
     * If a non-empty audience is given it is set on the job. When the configured archive format is WARC, warc-info
     * metadata is inserted into the job's order template before the request is sent.
     *
     * @param job the specific job to send
     * @param origHarvestName the harvest definition's name
     * @param origHarvestDesc the harvest definition's description
     * @param origHarvestSchedule the harvest definition schedule name
     * @param channel the channel to which the job should be sent
     * @param origHarvestAudience the audience for the data generated by harvest definitions.
     * @param metadata pre-harvest metadata to store in (w)arcfile.
     * @throws ArgumentNotValid if job or metadata is null
     * @throws IOFailure if unable to send the doOneCrawl request to a harvestControllerServer
     */
    public void doOneCrawl(Job job, String origHarvestName, String origHarvestDesc, String origHarvestSchedule,
            HarvestChannel channel, String origHarvestAudience, List<MetadataEntry> metadata) throws ArgumentNotValid,
            IOFailure {
        ArgumentNotValid.checkNotNull(job, "job");
        ArgumentNotValid.checkNotNull(metadata, "metadata");

        if (origHarvestAudience != null && !origHarvestAudience.isEmpty()) {
            job.setHarvestAudience(origHarvestAudience);
        }
        if (usingWarcAsArchiveFormat()) {
            log.info("As we're using WARC as archiveFormat WarcInfoMetadata is now added to the template");
            HeritrixTemplate ht = job.getOrderXMLdoc();
            ht.insertWarcInfoMetadata(job, origHarvestName, origHarvestSchedule,
                    Settings.get(HarvesterSettings.PERFORMER));
            job.setOrderXMLDoc(ht);
        } else {
            log.info("As we're using ARC as archiveFormat no WarcInfoMetadata was added to the template");
        }

        DoOneCrawlMessage nMsg = new DoOneCrawlMessage(job, HarvesterChannels.getHarvestJobChannelId(channel),
                new HarvestDefinitionInfo(origHarvestName, origHarvestDesc, origHarvestSchedule), metadata);
        log.debug("Send crawl request: {}", nMsg);
        jmsConnection.send(nMsg);
    }

    /**
     * @return true if the HERITRIX_ARCHIVE_FORMAT setting equals "warc" (ignoring case), false otherwise.
     */
    private boolean usingWarcAsArchiveFormat() {
        return Settings.get(HarvesterSettings.HERITRIX_ARCHIVE_FORMAT).equalsIgnoreCase("warc");
    }

}