/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 * 
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.heritrix3;

import java.io.File;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.distribute.indexserver.Index;
import dk.netarkivet.common.distribute.indexserver.IndexClientFactory;
import dk.netarkivet.common.distribute.indexserver.JobIndexCache;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.IllegalState;
import dk.netarkivet.common.exceptions.PermissionDenied;
import dk.netarkivet.common.utils.ExceptionUtils;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.StringUtils;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.datamodel.HarvestDefinitionInfo;
import dk.netarkivet.harvester.datamodel.Job;
import dk.netarkivet.harvester.harvesting.PersistentJobData;
import dk.netarkivet.harvester.harvesting.metadata.MetadataEntry;

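/**
 * Runs a single Heritrix 3 harvest job: creates the crawl directory, writes the harvest setup files
 * (harvestinfo, seeds, order template, preharvest metadata and, if enabled, a deduplication index)
 * and launches Heritrix through a HeritrixLauncher.
 * <p>
 * A minimal usage sketch (the HarvestControllerServer instance, the Job and the metadata entries are
 * assumed to be supplied by the caller):
 * <pre>{@code
 * HarvestJob harvestJob = new HarvestJob(hcs);
 * harvestJob.init(job, origHarvestInfo, metadataEntries);
 * Heritrix3Files files = harvestJob.getHeritrix3Files();
 * harvestJob.runHarvest();
 * }</pre>
 */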
public class HarvestJob {

    /** The class logger. */
    private static final Logger log = LoggerFactory.getLogger(HarvestJob.class);

    /** The HarvestControllerServer used to send error messages. */
    private HarvestControllerServer hcs;

    /** The harvester Job in this thread. */
    private Job job;

    /**
     * Constructor.
     * @param hcs a HarvestControllerServer instance
     */
    public HarvestJob(HarvestControllerServer hcs) {
        this.hcs = hcs;
    }

    /** The directory that the crawl takes place in. */
    private File crawlDir;

    /** The Heritrix3Files object for this crawl, initialized by init(). */
    private Heritrix3Files files;

    /** The name of this job: the job ID combined with a creation timestamp. */
    private String jobName;

    /**
     * Initialization of the harvest job.
     * @param job A job from the jobs table in the harvest database
     * @param origHarvestInfo metadata about the harvest
     * @param metadataEntries entries for the metadata file for the harvest
     */
    public void init(Job job, HarvestDefinitionInfo origHarvestInfo, List<MetadataEntry> metadataEntries) {
        this.job = job;
        jobName = job.getJobID() + "_" + System.currentTimeMillis();
        crawlDir = createCrawlDir();
        files = writeHarvestFiles(crawlDir, job, origHarvestInfo, metadataEntries);
    }

    /**
     * @return the Heritrix3Files object initialized with the init() method.
     */
    public Heritrix3Files getHeritrix3Files() {
        return files;
    }

    /**
     * Creates the actual HeritrixLauncher instance and runs it, after the various setup files have been written.
     *
     * @throws ArgumentNotValid if an argument isn't valid.
     */
    public void runHarvest() throws ArgumentNotValid {
        log.info("Starting crawl of job: {}", job.getJobID());
        HeritrixLauncherAbstract hl = HeritrixLauncherFactory.getInstance(files, jobName);
        hl.doCrawl();
    }

    /**
     * Create the crawl dir, but make sure a message is sent if there is a problem.
     *
     * @return The directory that the crawl will take place in.
     * @throws PermissionDenied if the directory cannot be created.
     */
    public File createCrawlDir() {
        // The directory where the harvested files are stored (the crawldir).
        File crawlDir = null;
        // Create the crawldir. This is done here in order to be able
        // to send a proper message if something goes wrong.
        try {
            File baseCrawlDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_SERVERDIR));
            crawlDir = new File(baseCrawlDir, jobName);
            FileUtils.createDir(crawlDir);
            log.info("Created crawl directory: '{}'", crawlDir);
            return crawlDir;
        } catch (PermissionDenied e) {
            String message = "Couldn't create the directory for job " + job.getJobID();
            log.warn(message, e);
            hcs.sendErrorMessage(job.getJobID(), message, ExceptionUtils.getStackTrace(e));
            throw e;
        }
    }

    /**
     * Writes the files needed to start a harvest.
     *
     * @param crawldir The directory that the crawl should take place in.
     * @param job The Job object containing various harvest setup data.
     * @param hdi The object encapsulating documentary information about the harvest.
     * @param metadataEntries Any metadata entries sent along with the job that should be stored for later use.
     * @return An object encapsulating where these files have been written.
     */
    public Heritrix3Files writeHarvestFiles(File crawldir, Job job, HarvestDefinitionInfo hdi,
            List<MetadataEntry> metadataEntries) {

        final Heritrix3Files files = Heritrix3Files.getH3HeritrixFiles(crawldir, job);

        // If this job tries to continue a previous job using the Heritrix recover.gz log,
        // and this feature is enabled, warn that recover-log continuation is not
        // supported for Heritrix 3.
        if (job.getContinuationOf() != null && Settings.getBoolean(HarvesterSettings.RECOVERlOG_CONTINUATION_ENABLED)) {
            log.warn("Continuation of crawl from a RecoverLog is not implemented for Heritrix3!");
        }

        // Create the harvestInfo file and the preharvest metadata file in the crawldir.
        log.debug("Writing persistent job data for job {} to crawldir '{}'", job.getJobID(), crawldir);
        if (!PersistentJobData.existsIn(crawldir)) {
            // Write job data to persistent storage (harvestinfo file)
            new PersistentJobData(crawldir).write(job, hdi);
        } else {
            throw new IllegalState("We already found a harvestInfo.xml for the crawldir " + crawldir.getAbsolutePath());
        }

        // Create jobId-preharvest-metadata-1.arc for this job
        writePreharvestMetadata(job, metadataEntries, crawldir);

        files.writeSeedsTxt(job.getSeedListAsString());

        files.writeOrderXml(job.getOrderXMLdoc());
        // Only retrieve the index if deduplication is enabled in the template.
        if (job.getOrderXMLdoc().IsDeduplicationEnabled()) {
            log.debug("Deduplication enabled. Fetching deduplication index...");
            files.setIndexDir(fetchDeduplicateIndex(metadataEntries));
        } else {
            log.debug("Deduplication disabled.");
        }

        return files;
    }

    /**
     * Writes pre-harvest metadata to the "metadata" directory.
     *
     * @param harvestJob a given Job.
     * @param metadata the list of metadata entries to write to the metadata file.
     * @param crawlDir the directory where the metadata will be written.
     * @throws IOFailure If there are errors in writing the metadata.
     */
    private void writePreharvestMetadata(Job harvestJob, List<MetadataEntry> metadata, File crawlDir) throws IOFailure {
        if (metadata.isEmpty()) {
            // Do not generate a preharvest metadata file for an empty list
            return;
        }

        // Make sure that the metadata directory exists
        File metadataDir = new File(crawlDir, IngestableFiles.METADATA_SUB_DIR);
        metadataDir.mkdir();
        if (!(metadataDir.exists() && metadataDir.isDirectory())) {
            throw new IOFailure("Unable to write preharvest metadata for job '" + harvestJob.getJobID()
                    + "' to directory '" + metadataDir.getAbsolutePath() + "', as the directory does not exist.");
        }

        // Serialize the MetadataEntry objects to the metadataDir
        MetadataEntry.storeMetadataToDisk(metadata, metadataDir);
    }

    /**
     * Get an index for deduplication. This will make a call to the index server, requesting an index for the
     * given job IDs. The resulting index files are then cached locally.
     * <p>
     * If we request an index for IDs that don't exist or have problems, we get a smaller set of IDs in our cache
     * files, and the next time we ask for the same index, we will call the index server again. This is handled
     * gracefully, though, because if the IDs are still missing, we will get a reply telling us to use the cached,
     * smaller index anyway.
     *
     * @param metadataEntries list of metadataEntries to get job IDs from.
     * @return a directory containing the index itself.
     * @throws IOFailure on errors retrieving the index from the client.
     * FIXME Handle a missing index more gracefully, e.g. add a setting to disable deduplication if no index is available.
     */
    private File fetchDeduplicateIndex(List<MetadataEntry> metadataEntries) {
        // Get the list of jobs that should be used for duplicate reduction
        // and retrieve a Lucene index from the IndexServer
        // based on the crawl.logs and CDX'es from these jobs.
        Set<Long> jobIDsForDuplicateReduction = new HashSet<Long>(parseJobIDsForDuplicateReduction(metadataEntries));

        // The client for requesting the job index.
        JobIndexCache jobIndexCache = IndexClientFactory.getDedupCrawllogInstance();

        // Request the index and return the index file.
        Index<Set<Long>> jobIndex = jobIndexCache.getIndex(jobIDsForDuplicateReduction);
        // Check which jobs didn't become part of the index.
        Set<Long> diffSet = new HashSet<Long>(jobIDsForDuplicateReduction);
        diffSet.removeAll(jobIndex.getIndexSet());
        if (log.isDebugEnabled()) {
            log.debug("Received deduplication index containing {} jobs. {}", jobIndex.getIndexSet().size(),
                    ((diffSet.size() > 0) ? "Missing jobs: " + StringUtils.conjoin(",", diffSet) : ""));
        }

        return jobIndex.getIndexFile();
    }

    /**
     * Retrieve the list of jobs to use for duplicate reduction.
     * <p>
     * Runs through all metadata entries, finding duplicate reduction entries, and parsing all job IDs in them,
     * warning only on errors. Each duplicate reduction entry is expected to contain a comma-separated list of
     * job IDs, e.g. "2,5,7".
     *
     * @param metadataEntries list of metadataEntries.
     * @return the list of jobs to use for duplicate reduction.
     */
    private List<Long> parseJobIDsForDuplicateReduction(List<MetadataEntry> metadataEntries) {
        // Find the metadata entries for duplicate reduction, if any.
        List<Long> result = new ArrayList<Long>();
        for (MetadataEntry me : metadataEntries) {
            if (me.isDuplicateReductionMetadataEntry()) {
                String s = new String(me.getData());
                if (s.isEmpty()) { // An empty string is now possible
                    continue;
                }
                String[] longs = s.split(",");
                for (String stringLong : longs) {
                    try {
                        result.add(Long.parseLong(stringLong));
                    } catch (NumberFormatException e) {
                        log.warn("Unable to convert String '{}' in duplicate reduction jobid list metadataEntry '{}'"
                                + " to a jobID. Ignoring.", stringLong, s, e);
                    }
                }
            }
        }
        return result;
    }

}