/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.scheduler;

import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.distribute.JMSConnection;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.ExceptionUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.datamodel.AliasInfo;
import dk.netarkivet.harvester.datamodel.HarvestChannel;
import dk.netarkivet.harvester.datamodel.HarvestDefinitionDAO;
import dk.netarkivet.harvester.datamodel.HarvestDefinitionInfo;
import dk.netarkivet.harvester.datamodel.HeritrixTemplate;
import dk.netarkivet.harvester.datamodel.Job;
import dk.netarkivet.harvester.datamodel.JobDAO;
import dk.netarkivet.harvester.datamodel.JobStatus;
import dk.netarkivet.harvester.datamodel.SparseFullHarvest;
import dk.netarkivet.harvester.datamodel.SparsePartialHarvest;
import dk.netarkivet.harvester.distribute.HarvesterChannels;
import dk.netarkivet.harvester.harvesting.distribute.DoOneCrawlMessage;
import dk.netarkivet.harvester.harvesting.metadata.MetadataEntry;

/**
 * This class handles dispatching of harvest jobs to the Harvesters.
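 * <p>
 * A minimal construction sketch (illustrative only; the {@code getInstance()} accessors shown are assumptions
 * about the surrounding codebase, not part of this class's contract):
 *
 * <pre>{@code
 * JobDispatcher dispatcher = new JobDispatcher(
 *         JMSConnectionFactory.getInstance(),  // assumed JMS connection accessor
 *         HarvestDefinitionDAO.getInstance(),  // assumed DAO accessor
 *         JobDAO.getInstance());               // assumed DAO accessor
 * }</pre>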
 */
public class JobDispatcher {

    /** The logger to use. */
    private static final Logger log = LoggerFactory.getLogger(JobDispatcher.class);

    /** Connection to JMS provider. */
    private final JMSConnection jmsConnection;

    /** The DAO for harvest definitions. */
    private final HarvestDefinitionDAO harvestDefinitionDAO;
    /** The DAO for jobs. */
    private final JobDAO jobDao;

    /**
     * @param jmsConnection The JMS connection to use.
     * @param hDao The HarvestDefinitionDAO to use.
     * @param jobDao The JobDAO to use.
     */
    public JobDispatcher(JMSConnection jmsConnection, HarvestDefinitionDAO hDao, JobDAO jobDao) {
        log.info("Creating JobDispatcher");
        ArgumentNotValid.checkNotNull(jmsConnection, "jmsConnection");
        ArgumentNotValid.checkNotNull(hDao, "hDao");
        ArgumentNotValid.checkNotNull(jobDao, "jobDao");
        this.jmsConnection = jmsConnection;
        this.harvestDefinitionDAO = hDao;
        this.jobDao = jobDao;
    }

    /**
     * Submits the next new job (the one with the lowest ID) for the given channel, marking it as submitted. If no
     * jobs are ready for the given channel, nothing is done.
     *
     * @param channel the channel to take the next job from.
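     * <p>
     * Illustrative call pattern from a hypothetical scheduler in the same package (how the channels are obtained is
     * an assumption, not prescribed by this method):
     *
     * <pre>{@code
     * for (HarvestChannel channel : channels) {
     *     // At most one NEW job is submitted per call; a no-op if the channel has no NEW jobs.
     *     dispatcher.submitNextNewJob(channel);
     * }
     * }</pre>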
     */
    protected void submitNextNewJob(HarvestChannel channel) {
        Job jobToSubmit = prepareNextJobForSubmission(channel);
        if (jobToSubmit == null) {
            log.trace("No {} jobs to be run at this time", channel.getName());
        } else {
            log.debug("Submitting new {} job {}", channel.getName(), jobToSubmit.getJobID());
            try {
                List<MetadataEntry> metadata = createMetadata(jobToSubmit);

                // Extract documentary information about the harvest
                String hName = harvestDefinitionDAO.getHarvestName(jobToSubmit.getOrigHarvestDefinitionID());

                String schedule = "";
                String hdComments = "";
                String hdAudience = "";
                SparseFullHarvest fh = harvestDefinitionDAO.getSparseFullHarvest(hName);
                if (fh != null) {
                    hdComments = fh.getComments();
                } else {
                    SparsePartialHarvest ph = harvestDefinitionDAO.getSparsePartialHarvest(hName);

                    if (ph == null) {
                        throw new ArgumentNotValid("No harvest definition found for id '"
                                + jobToSubmit.getOrigHarvestDefinitionID() + "', named '" + hName + "'");
                    }

                    // The schedule name can only be documented for selective crawls.
                    schedule = ph.getScheduleName();

                    hdComments = ph.getComments();
                    hdAudience = ph.getAudience();
                }

                doOneCrawl(jobToSubmit, hName, hdComments, schedule, channel, hdAudience, metadata);

                log.info("Job #{} submitted", jobToSubmit.getJobID());
            } catch (Throwable t) {
                String message = "Error while dispatching job " + jobToSubmit.getJobID()
                        + ". Job status changed to FAILED";
                log.warn(message, t);
                // jobToSubmit is known to be non-null in this branch, so mark it as failed directly.
                jobToSubmit.setStatus(JobStatus.FAILED);
                jobToSubmit.appendHarvestErrors(message);
                jobToSubmit.appendHarvestErrorDetails(ExceptionUtils.getStackTrace(t));
                jobDao.update(jobToSubmit);
            }
        }
    }

    /**
     * Reads the next job ready to run from the database and sets its status to SUBMITTED. If no jobs are ready, null
     * is returned.
     * <p>
     * Note the operation is synchronized, so only one thread may start the submission of a job.
     *
     * @param channel the job channel.
     * @return A job ready to be submitted, or null if no job is ready for the given channel.
     */
    private synchronized Job prepareNextJobForSubmission(HarvestChannel channel) {
        Iterator<Long> jobsToSubmit = jobDao.getAllJobIds(JobStatus.NEW, channel);
        if (!jobsToSubmit.hasNext()) {
            return null;
        } else {
            final long jobID = jobsToSubmit.next();
            Job jobToSubmit = jobDao.read(jobID);
            jobToSubmit.setStatus(JobStatus.SUBMITTED);
            jobToSubmit.setSubmittedDate(new Date());
            jobDao.update(jobToSubmit);
            return jobToSubmit;
        }
    }

    /**
     * Creates the metadata for the indicated job. This includes:
     * <ol>
     * <li>Alias metadata.</li>
     * <li>A duplicate reduction MetadataEntry, if deduplication is enabled.</li>
     * </ol>
     *
     * @param job The job to create metadata for.
     * @return the list of metadata entries for the job.
     */
    private List<MetadataEntry> createMetadata(Job job) {
        List<MetadataEntry> metadata = new ArrayList<MetadataEntry>();
        List<AliasInfo> aliasInfos = jobDao.getJobAliasInfo(job);
        MetadataEntry aliasMetadataEntry = MetadataEntry.makeAliasMetadataEntry(aliasInfos,
                job.getOrigHarvestDefinitionID(), job.getHarvestNum(), job.getJobID());
        if (aliasMetadataEntry != null) {
            // Add an entry documenting that this job
            // contains domains that have aliases
            metadata.add(aliasMetadataEntry);
        }

        if (job.getOrderXMLdoc().IsDeduplicationEnabled()) {
            MetadataEntry duplicateReductionMetadataEntry = MetadataEntry.makeDuplicateReductionMetadataEntry(
                    jobDao.getJobIDsForDuplicateReduction(job.getJobID()), job.getOrigHarvestDefinitionID(),
                    job.getHarvestNum(), job.getJobID());
            // Always add a duplicateReductionMetadataEntry when deduplication is enabled,
            // even if the list of JobIDs for deduplication is empty!
            metadata.add(duplicateReductionMetadataEntry);
        }
        return metadata;
    }

    /**
     * Submit a doOneCrawl request to a HarvestControllerServer.
     *
     * @param job the specific job to send
     * @param origHarvestName the harvest definition's name
     * @param origHarvestDesc the harvest definition's description
     * @param origHarvestSchedule the harvest definition's schedule name
     * @param channel the channel to which the job should be sent
     * @param origHarvestAudience the audience for the data generated by the harvest definition.
     * @param metadata pre-harvest metadata to store in the (w)arc file.
     * @throws ArgumentNotValid if one of the parameters is null
     * @throws IOFailure if unable to send the doOneCrawl request to a HarvestControllerServer
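     * <p>
     * A hedged example call (all argument values below are placeholders, not values mandated by this API):
     *
     * <pre>{@code
     * dispatcher.doOneCrawl(job, "News harvest", "Weekly harvest of news sites", "WeeklySchedule",
     *         channel, "public", metadataEntries);
     * }</pre>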
     */
    public void doOneCrawl(Job job, String origHarvestName, String origHarvestDesc, String origHarvestSchedule,
            HarvestChannel channel, String origHarvestAudience, List<MetadataEntry> metadata) throws ArgumentNotValid,
            IOFailure {
        ArgumentNotValid.checkNotNull(job, "job");
        ArgumentNotValid.checkNotNull(metadata, "metadata");

        if (origHarvestAudience != null && !origHarvestAudience.isEmpty()) {
            job.setHarvestAudience(origHarvestAudience);
        }
        if (usingWarcAsArchiveFormat()) {
            log.info("As we're using WARC as archiveFormat, WarcInfoMetadata is now added to the template");
            HeritrixTemplate ht = job.getOrderXMLdoc();
            ht.insertWarcInfoMetadata(job, origHarvestName, origHarvestSchedule, Settings.get(HarvesterSettings.PERFORMER));
            job.setOrderXMLDoc(ht);
        } else {
            log.info("As we're using ARC as archiveFormat, no WarcInfoMetadata was added to the template");
        }

        DoOneCrawlMessage nMsg = new DoOneCrawlMessage(job, HarvesterChannels.getHarvestJobChannelId(channel),
                new HarvestDefinitionInfo(origHarvestName, origHarvestDesc, origHarvestSchedule), metadata);
        log.debug("Send crawl request: {}", nMsg);
        jmsConnection.send(nMsg);
    }

    /** @return true if the configured Heritrix archive format is WARC (case-insensitive). */
    private boolean usingWarcAsArchiveFormat() {
        return Settings.get(HarvesterSettings.HERITRIX_ARCHIVE_FORMAT).equalsIgnoreCase("warc");
    }

}