package dk.netarkivet.harvester.heritrix3;

import java.io.File;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.distribute.indexserver.Index;
import dk.netarkivet.common.distribute.indexserver.IndexClientFactory;
import dk.netarkivet.common.distribute.indexserver.JobIndexCache;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.PermissionDenied;
import dk.netarkivet.common.utils.ExceptionUtils;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.StringUtils;
import dk.netarkivet.harvester.datamodel.HarvestDefinitionInfo;
import dk.netarkivet.harvester.datamodel.Job;
import dk.netarkivet.harvester.harvesting.PersistentJobData;
import dk.netarkivet.harvester.harvesting.metadata.MetadataEntry;

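/**
 * Sets up and runs a single Heritrix 3 harvest job: creates the crawl directory, writes the files needed by the
 * crawl (harvestinfo, seeds, order template, preharvest metadata and, if deduplication is enabled, a deduplication
 * index), and launches the crawl through a HeritrixLauncher.
 */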
public class HarvestJob {

    /** The class logger. */
    private static final Logger log = LoggerFactory.getLogger(HarvestJob.class);

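    /** The HarvestControllerServer used to send an error message if the crawl directory cannot be created. */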
    private HarvestControllerServer hcs;

    /** The harvester Job in this thread. */
    private Job job;

    /** Stores documentary information about the harvest. */
    //private HarvestDefinitionInfo origHarvestInfo;

    /** The list of metadata associated with this Job. */
    //private List<MetadataEntry> metadataEntries;

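    /**
     * Constructor.
     *
     * @param hcs The HarvestControllerServer that this job reports back to.
     */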
    public HarvestJob(HarvestControllerServer hcs) {
        this.hcs = hcs;
    }

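    /** The directory in which this crawl takes place (created by createCrawlDir()). */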
    private File crawlDir;

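    /** The Heritrix3Files object encapsulating the file locations used by this crawl. */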
    private Heritrix3Files files;

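    /** The name of this job: the job ID followed by a timestamp. */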
    private String jobName;

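    /**
     * Initializes this HarvestJob: names the job, creates the crawl directory and writes the harvest files to it.
     *
     * @param job The Job to run.
     * @param origHarvestInfo Documentary information about the originating harvest definition.
     * @param metadataEntries Metadata entries sent along with the job.
     */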
    public void init(Job job, HarvestDefinitionInfo origHarvestInfo, List<MetadataEntry> metadataEntries) {
        this.job = job;
        //this.origHarvestInfo = origHarvestInfo;
        //this.metadataEntries = metadataEntries;
        jobName = job.getJobID() + "_" + System.currentTimeMillis();
        crawlDir = createCrawlDir();
        files = writeHarvestFiles(crawlDir, job, origHarvestInfo, metadataEntries);
    }

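    /**
     * @return the Heritrix3Files object created during init(), encapsulating the files of this crawl.
     */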
    public Heritrix3Files getHeritrix3Files() {
        return files;
    }

    /**
     * Creates the actual HeritrixLauncher instance and runs it, after the various setup files have been written.
     *
     * @throws ArgumentNotValid if an argument isn't valid.
     */
    public void runHarvest() throws ArgumentNotValid {
        log.info("Starting crawl of job {}", job.getJobID());
        HeritrixLauncherAbstract hl = HeritrixLauncherFactory.getInstance(files, jobName);
        hl.doCrawl();
    }

    /**
     * Create the crawl dir, but make sure a message is sent if there is a problem.
     *
     * @return The directory that the crawl will take place in.
     * @throws PermissionDenied if the directory cannot be created.
     */
    public File createCrawlDir() {
        // The directory where arcfiles are stored (the crawldir).
        File crawlDir = null;
        // Create the crawldir. This is done here in order to be able
        // to send a proper message if something goes wrong.
        try {
            File baseCrawlDir = new File(Settings.get(Heritrix3Settings.HARVEST_CONTROLLER_SERVERDIR));
            crawlDir = new File(baseCrawlDir, jobName);
            FileUtils.createDir(crawlDir);
            log.info("Created crawl directory: '{}'", crawlDir);
            return crawlDir;
        } catch (PermissionDenied e) {
            String message = "Couldn't create the directory for job " + job.getJobID();
            log.warn(message, e);
            hcs.sendErrorMessage(job.getJobID(), message, ExceptionUtils.getStackTrace(e));
            throw e;
        }
    }

    /**
     * Writes the files needed to start a harvest.
     *
     * @param crawldir The directory that the crawl should take place in.
     * @param job The Job object containing various harvest setup data.
     * @param hdi The object encapsulating documentary information about the harvest.
     * @param metadataEntries Any metadata entries sent along with the job that should be stored for later use.
     * @return An object encapsulating where these files have been written.
     */
    public Heritrix3Files writeHarvestFiles(File crawldir, Job job, HarvestDefinitionInfo hdi,
            List<MetadataEntry> metadataEntries) {

        final Heritrix3Files files = Heritrix3Files.getH3HeritrixFiles(crawldir, job);

        // If this job is a job that tries to continue a previous job
        // using the Heritrix recover.gz log, and this feature is enabled,
        // then try to fetch the recover.log from the metadata-arc-file.
        if (job.getContinuationOf() != null && Settings.getBoolean(Heritrix3Settings.RECOVERlOG_CONTINUATION_ENABLED)) {
            //tryToRetrieveRecoverLog(job, files);
            log.warn("Continuation from a recover log is disabled for now!");
        }

        // Create harvestInfo file in crawldir
        // & create preharvest-metadata-1.arc
        log.debug("Writing persistent job data for job {} to crawldir '{}'", job.getJobID(), crawldir);
        // TODO Check that harvestInfo does not yet exist

        // Write job data to persistent storage (harvestinfo file)
        new PersistentJobData(files.getCrawlDir()).write(job, hdi);

        // Create jobId-preharvest-metadata-1.arc for this job
        writePreharvestMetadata(job, metadataEntries, crawldir);

        files.writeSeedsTxt(job.getSeedListAsString());

        files.writeOrderXml(job.getOrderXMLdoc());
        // Only retrieve index if deduplication is not disabled in the template.
        if (job.getOrderXMLdoc().IsDeduplicationEnabled()) {
            log.debug("Deduplication enabled. Fetching deduplication index...");
            files.setIndexDir(fetchDeduplicateIndex(metadataEntries));
        } else {
            log.debug("Deduplication disabled.");
        }

        return files;
    }


    /**
     * Writes pre-harvest metadata to the "metadata" directory.
     *
     * @param harvestJob a given Job.
     * @param metadata the list of metadata entries to write to the metadata file.
     * @param crawlDir the directory where the metadata will be written.
     * @throws IOFailure If there are errors in writing the metadata.
     */
    private void writePreharvestMetadata(Job harvestJob, List<MetadataEntry> metadata, File crawlDir) throws IOFailure {
        if (metadata.isEmpty()) {
            // Do not generate a preharvest metadata file for an empty list
            return;
        }

        // Make sure that the metadata directory exists
        File metadataDir = new File(crawlDir, IngestableFiles.METADATA_SUB_DIR);
        metadataDir.mkdir();
        if (!(metadataDir.exists() && metadataDir.isDirectory())) {
            throw new IOFailure("Unable to write preharvest metadata for job '" + harvestJob.getJobID()
                    + "' to directory '" + metadataDir.getAbsolutePath() + "', as directory does not exist.");
        }

        // Serialize the MetadataEntry objects to the metadataDir
        MetadataEntry.storeMetadataToDisk(metadata, metadataDir);
    }


    /**
     * Get an index for deduplication. This will make a call to the index server, requesting an index for the given
     * IDs. The files will then be cached locally.
     * <p>
     * If we request an index for IDs that don't exist or have problems, we get a smaller set of IDs in our cache
     * files, and next time we ask for the same index, we will call the index server again. This will be handled well,
     * though, because if the IDs are still missing, we will get a reply telling us to use the cached smaller index
     * anyway.
     *
     * @param metadataEntries list of metadataEntries to get jobIDs from.
     * @return a directory containing the index itself.
     * @throws IOFailure on errors retrieving the index from the client. FIXME Handle a missing index more gracefully,
     * and add a setting for disabling deduplication if no index is available.
     */
    private File fetchDeduplicateIndex(List<MetadataEntry> metadataEntries) {
        // Get the list of jobs which should be used for duplicate reduction,
        // and retrieve a Lucene index from the IndexServer
        // based on the crawl.logs and CDXes from these jobs.
        Set<Long> jobIDsForDuplicateReduction = new HashSet<>(parseJobIDsForDuplicateReduction(metadataEntries));

        // The client for requesting the job index.
        JobIndexCache jobIndexCache = IndexClientFactory.getDedupCrawllogInstance();

        // Request the index and return the index file.
        Index<Set<Long>> jobIndex = jobIndexCache.getIndex(jobIDsForDuplicateReduction);
        // Check which jobs didn't become part of the index.
        Set<Long> diffSet = new HashSet<>(jobIDsForDuplicateReduction);
        diffSet.removeAll(jobIndex.getIndexSet());
        if (log.isDebugEnabled()) {
            log.debug("Received deduplication index containing {} jobs. {}", jobIndex.getIndexSet().size(),
                    (!diffSet.isEmpty() ? "Missing jobs: " + StringUtils.conjoin(",", diffSet) : ""));
        }

        return jobIndex.getIndexFile();
    }


    /**
     * Retrieve the list of jobs for duplicate reduction.
     * <p>
     * Runs through all metadata entries, finding duplicate reduction entries, and parsing all jobIDs in them, warning
     * only on errors.
     *
     * @param metadataEntries list of metadataEntries.
     * @return the list of jobs for duplicate reduction.
     */
    private List<Long> parseJobIDsForDuplicateReduction(List<MetadataEntry> metadataEntries) {
        // Find the metadata entries for duplicate reduction, if any.
        List<Long> result = new ArrayList<>();
        for (MetadataEntry me : metadataEntries) {
            if (me.isDuplicateReductionMetadataEntry()) {
                String s = new String(me.getData());
                if (s.isEmpty()) { // An empty string is now possible
                    continue;
                }
                String[] longs = s.split(",");
                for (String stringLong : longs) {
                    try {
                        result.add(Long.parseLong(stringLong));
                    } catch (NumberFormatException e) {
                        log.warn("Unable to convert String '{}' in duplicate reduction jobid list metadataEntry '{}'"
                                + " to a jobID. Ignoring.", stringLong, s, e);
                    }
                }
            }
        }
        return result;
    }

}