/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.harvesting.distribute;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.Constants;
import dk.netarkivet.common.distribute.ChannelID;
import dk.netarkivet.common.distribute.JMSConnection;
import dk.netarkivet.common.distribute.JMSConnectionFactory;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.PermissionDenied;
import dk.netarkivet.common.exceptions.UnknownID;
import dk.netarkivet.common.lifecycle.PeriodicTaskExecutor;
import dk.netarkivet.common.utils.ApplicationUtils;
import dk.netarkivet.common.utils.CleanupIF;
import dk.netarkivet.common.utils.DomainUtils;
import dk.netarkivet.common.utils.ExceptionUtils;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.NotificationType;
import dk.netarkivet.common.utils.NotificationsFactory;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.SystemUtils;
import dk.netarkivet.common.utils.TimeUtils;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.datamodel.HarvestDefinitionInfo;
import dk.netarkivet.harvester.datamodel.Job;
import dk.netarkivet.harvester.datamodel.JobStatus;
import dk.netarkivet.harvester.distribute.HarvesterChannels;
import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
import dk.netarkivet.harvester.harvesting.DomainnameQueueAssignmentPolicy;
import dk.netarkivet.harvester.harvesting.HarvestController;
import dk.netarkivet.harvester.harvesting.HeritrixFiles;
import dk.netarkivet.harvester.harvesting.PersistentJobData;
import dk.netarkivet.harvester.harvesting.SeedUriDomainnameQueueAssignmentPolicy;
import dk.netarkivet.harvester.harvesting.metadata.MetadataEntry;
import dk.netarkivet.harvester.harvesting.report.HarvestReport;
/**
 * This class responds to JMS doOneCrawl messages from the HarvestScheduler and launches a Heritrix crawl with the
 * received job description. The generated ARC files are uploaded to the bitarchives once a harvest job has been
 * completed.
 * <p>
 * During its operation, CrawlStatus messages are sent to the HarvestSchedulerMonitorServer. When the actual
 * harvesting starts, a message is sent with status 'STARTED'. When the harvesting has finished, a message is sent
 * with either status 'DONE' or 'FAILED'. A 'DONE' or 'FAILED' message with a result should ALWAYS be sent if at all
 * possible, but only ever one such message per job.
 * <p>
 * It must be possible to run the Heritrix harvester on several machines, and in several processes on each machine.
 * Each instance of Heritrix is started and monitored by a HarvestControllerServer.
 * <p>
 * Initially, all directories under serverdir are scanned for harvestinfo files. If any are found, they are parsed for
 * information, and an upload to the bitarchive is attempted for all remaining files. A CrawlStatusMessage with status
 * FAILED is then sent back for each such job.
 * <p>
 * A new thread is started for each actual crawl, in which the JMS listener is removed. Threading is required since JMS
 * will not let the called thread remove the listener that's being handled.
 * <p>
 * After a harvest job has terminated, either successfully or unsuccessfully, the serverdir is scanned again for
 * harvestInfo files, and an upload of the files not yet uploaded is attempted. The server then begins listening for
 * new jobs again, if there is enough room available on the machine. If not, it logs a warning about this, which is
 * also sent as a notification.
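 * <p>
 * A minimal lifecycle sketch (for illustration only; in production the application framework drives this):
 *
 * <pre>{@code
 * HarvestControllerServer hcs = HarvestControllerServer.getInstance();
 * // ... the server registers on its harvest channel and processes doOneCrawl messages ...
 * hcs.close(); // removes JMS listeners and cleans up
 * }</pre>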
 */
public class HarvestControllerServer extends HarvesterMessageHandler implements CleanupIF {

    /** The logger to use. */
    private static final Logger log = LoggerFactory.getLogger(HarvestControllerServer.class);

    /** The unique instance of this class. */
    private static HarvestControllerServer instance;

    /**
     * The configured application instance id.
     *
     * @see CommonSettings#APPLICATION_INSTANCE_ID
     */
    private final String applicationInstanceId = Settings.get(CommonSettings.APPLICATION_INSTANCE_ID);

    /**
     * The name of the server this <code>HarvestControllerServer</code> is running on.
     */
    private final String physicalServerName = DomainUtils.reduceHostname(SystemUtils.getLocalHostName());

    /** The message to write to log when starting the server. */
    private static final String STARTING_MESSAGE = "Starting HarvestControllerServer.";
    /** The message to write to log when server is started. */
    private static final String STARTED_MESSAGE = "HarvestControllerServer started.";
    /** The message to write to log when stopping the server. */
    private static final String CLOSING_MESSAGE = "Closing HarvestControllerServer.";
    /** The message to write to log when server is stopped. */
    private static final String CLOSED_MESSAGE = "Closed down HarvestControllerServer";
    /** The message to write to log when starting a crawl. */
    private static final String STARTCRAWL_MESSAGE = "Starting crawl of job:";
    /** The message to write to log after ending a crawl. */
    private static final String ENDCRAWL_MESSAGE = "Ending crawl of job:";
    /** The max time to wait for the hosts-report.txt to be available (in secs). */
    static final int WAIT_FOR_HOSTS_REPORT_TIMEOUT_SECS = 30;
    /** Heritrix version property. */
    private static final String HERITRIX_VERSION_PROPERTY = "heritrix.version";
    /** queue-assignment-policy property. */
    private static final String HERITRIX_QUEUE_ASSIGNMENT_POLICY_PROPERTY = "org.archive.crawler.frontier.AbstractFrontier.queue-assignment-policy";

    /** The CHANNEL of jobs processed by this instance. */
    private static final String CHANNEL = Settings.get(HarvesterSettings.HARVEST_CONTROLLER_CHANNEL);

    /** The JMS channel on which to listen for {@link HarvesterRegistrationResponse}s. */
    public static final ChannelID HARVEST_CHAN_VALID_RESP_ID = HarvesterChannels
            .getHarvesterRegistrationResponseChannel();

    /** The JMSConnection to use. */
    private JMSConnection jmsConnection;

    /** The (singleton) HarvestController that handles the non-JMS parts of a harvest. */
    private final HarvestController controller;

    /** Jobs are fetched from this queue. */
    private ChannelID jobChannel;

    /** Min. space required to start a job. */
    private final long minSpaceRequired;
    /** The serverdir, where the harvesting takes place. */
    private final File serverDir;

    /**
     * Tracks whether a doOneCrawl is currently running. No new jobs are accepted while a crawl is running.
     */
    private CrawlStatus status;

    /**
     * In this constructor, the server creates an instance of the HarvestController and uploads any arc-files from
     * incomplete harvests. It then starts listening for new doOneCrawl messages, unless there is no available space.
     * In that case, it sends a notification to the administrator and pauses.
     *
     * @throws PermissionDenied If the serverdir or oldjobsdir can't be created.
     * @throws IOFailure If harvestInfoFile contains invalid data.
     * @throws UnknownID if the settings file does not specify a valid queue priority.
     */
    private HarvestControllerServer() throws IOFailure {
        log.info(STARTING_MESSAGE);

        log.info("Bound to harvest channel '{}'", CHANNEL);

        // Make sure serverdir (where active crawl-dirs live) and oldJobsDir
        // (where old crawl dirs are stored) exist.
        serverDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_SERVERDIR));
        ApplicationUtils.dirMustExist(serverDir);
        log.info("Serverdir: '{}'", serverDir);
        minSpaceRequired = Settings.getLong(HarvesterSettings.HARVEST_SERVERDIR_MINSPACE);
        if (minSpaceRequired <= 0L) {
            log.warn("Wrong setting of minSpaceLeft read from Settings: {}", minSpaceRequired);
            throw new ArgumentNotValid("Wrong setting of minSpaceLeft read from Settings: " + minSpaceRequired);
        }
        log.info("Harvesting requires at least {} bytes free.", minSpaceRequired);

        controller = HarvestController.getInstance();

        // Set the properties "heritrix.version" and
        // "org.archive.crawler.frontier.AbstractFrontier.queue-assignment-policy"
        System.setProperty(HERITRIX_VERSION_PROPERTY, Constants.getHeritrixVersionString());
        System.setProperty(HERITRIX_QUEUE_ASSIGNMENT_POLICY_PROPERTY,
                "org.archive.crawler.frontier.HostnameQueueAssignmentPolicy,"
                        + "org.archive.crawler.frontier.IPQueueAssignmentPolicy,"
                        + "org.archive.crawler.frontier.BucketQueueAssignmentPolicy,"
                        + "org.archive.crawler.frontier.SurtAuthorityQueueAssignmentPolicy,"
                        + DomainnameQueueAssignmentPolicy.class.getName() + ","
                        + SeedUriDomainnameQueueAssignmentPolicy.class.getName());

        // Get JMS-connection
        // Channel THIS_CLIENT is only used for replies to store messages so
        // do not set as listener here. It is registered in the arcrepository
        // client.
        // Channel ANY_xxxPRIORITY_HACO is used for listening for jobs, and
        // registered below.

        jmsConnection = JMSConnectionFactory.getInstance();
        log.debug("Obtained JMS connection.");

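        // The CrawlStatus both guards against concurrent crawls and, once the harvest channel
        // has been validated, periodically announces readiness (see the CrawlStatus class below).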
        status = new CrawlStatus();

        // If any unprocessed jobs are left on the server, process them now
        processOldJobs();

        // Register for listening to harvest channel validity responses
        JMSConnectionFactory.getInstance().setListener(HARVEST_CHAN_VALID_RESP_ID, this);

        // Ask if the channel this harvester is assigned to is valid
        jmsConnection.send(new HarvesterRegistrationRequest(HarvestControllerServer.CHANNEL, applicationInstanceId));
        log.info("Requested a validity check of harvest channel '{}'", HarvestControllerServer.CHANNEL);
    }

    /**
     * Returns or creates the unique instance of this singleton. The server creates an instance of the
     * HarvestController, uploads arc-files from unfinished harvests, and starts listening for JMS messages on the
     * incoming jms queues.
     *
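     * <p>
     * For illustration only (the real application wiring may differ), a caller could also register cleanup on JVM
     * shutdown:
     *
     * <pre>{@code
     * final HarvestControllerServer server = HarvestControllerServer.getInstance();
     * Runtime.getRuntime().addShutdownHook(new Thread() {
     *     public void run() {
     *         server.cleanup();
     *     }
     * });
     * }</pre>
     *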
     * @return The instance
     * @throws PermissionDenied If the serverdir or oldjobsdir can't be created
     * @throws IOFailure if data from old harvests exist, but contain illegal data
     */
    public static synchronized HarvestControllerServer getInstance() throws IOFailure {
        if (instance == null) {
            instance = new HarvestControllerServer();
        }
        return instance;
    }

    /**
     * Releases all JMS connections and closes the Controller.
     */
    public synchronized void close() {
        log.info(CLOSING_MESSAGE);
        cleanup();
        log.info(CLOSED_MESSAGE);
    }

    /**
     * Will be called on shutdown.
     *
     * @see CleanupIF#cleanup()
     */
    public void cleanup() {
        if (controller != null) {
            controller.cleanup();
        }
        if (jmsConnection != null) {
            jmsConnection.removeListener(HARVEST_CHAN_VALID_RESP_ID, this);
            if (jobChannel != null) {
                jmsConnection.removeListener(jobChannel, this);
            }
        }

        // Stop the sending of status messages
        status.stopSending();

        instance = null;
    }

    /**
     * Looks for old job directories that await uploading.
     */
    private void processOldJobs() {
        // Search through all crawldirs and process PersistentJobData files in them
        File crawlDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_SERVERDIR));
        File[] subdirs = crawlDir.listFiles();
        if (subdirs == null) {
            return; // Not a directory, or an I/O error occurred; nothing to process.
        }
        for (File oldCrawlDir : subdirs) {
            if (PersistentJobData.existsIn(oldCrawlDir)) {
                // Assume that the crawl had not ended at this point, so the job must be marked as failed
                final String msg = "Found old unprocessed job data in dir '" + oldCrawlDir.getAbsolutePath()
                        + "'. Crawl probably interrupted by shutdown of HarvestController. Processing data.";
                log.warn(msg);
                NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING);
                processHarvestInfoFile(oldCrawlDir,
                        new IOFailure("Crawl probably interrupted by shutdown of HarvestController"));
            }
        }
    }

    /**
     * Checks that we're available to do a crawl, and if so, marks us as unavailable, checks that the job message is
     * well-formed, and starts the thread that the crawl happens in. If an error occurs starting the crawl, we will
     * start listening for messages again.
     * <p>
     * The sequence of actions involved in a crawl is:
     * <ol>
     * <li>If we are already running, resend the job to the queue and return</li>
     * <li>Check the job for validity</li>
     * <li>Send a CrawlStatus message that the crawl has STARTED</li>
     * </ol>
     * In a separate thread:
     * <ol start="4">
     * <li>Unregister this HACO as listener</li>
     * <li>Create a new crawldir (based on the JobID and a timestamp)</li>
     * <li>Write a harvestInfoFile (using JobID and crawldir) and metadata</li>
     * <li>Instantiate a new HeritrixLauncher</li>
     * <li>Start a crawl</li>
     * <li>Store the generated arc-files and metadata in the known bit-archives</li>
     * <li><em>Always</em> send CrawlStatus DONE or FAILED</li>
     * <li>Move the crawldir into the oldJobs dir</li>
     * </ol>
     *
     * @param msg The crawl job
     * @throws IOFailure On trouble harvesting, uploading or processing harvestInfo
     * @throws UnknownID if jobID is null in the message
     * @throws ArgumentNotValid if the status of the job is not valid - must be SUBMITTED
     * @throws PermissionDenied if the crawldir can't be created
     * @see #visit(DoOneCrawlMessage) for more details
     */
    private void onDoOneCrawl(final DoOneCrawlMessage msg) throws IOFailure, UnknownID, ArgumentNotValid,
            PermissionDenied {
        // Only one doOneCrawl at a time. Returning should almost never happen,
        // since we deregister the listener, but we may receive another message
        // before the listener is removed. Also, if the job message is
        // malformed or starting the crawl fails, we re-add the listener.
        synchronized (this) {
            if (status.isRunning()) {
                log.warn(
                        "Received crawl request, but sent it back to queue, as another crawl is already running: '{}'",
                        msg);
                jmsConnection.resend(msg, jobChannel);
                try {
                    // Wait a second before listening again, so the message has
                    // a chance of getting snatched by another harvester.
                    Thread.sleep(TimeUtils.SECOND_IN_MILLIS);
                } catch (InterruptedException e) {
                    // Since we're not waiting for anything critical, we can
                    // ignore this exception.
                }
                return;
            }
            stopAcceptingJobs();
        }

        Thread t = null;

        // This 'try' matches a finally that restores running=false if we don't
        // start a crawl after all
        try {
            final Job job = msg.getJob();

            // Every job must have an ID or we can do nothing with it, not even
            // send a proper failure message back.
            Long jobID = job.getJobID();
            if (jobID == null) {
                log.warn("DoOneCrawlMessage arrived without JobID: '{}'", msg.toString());
                throw new UnknownID("DoOneCrawlMessage arrived without JobID");
            }

            log.info("Received crawlrequest for job {}: '{}'", jobID, msg);

            // Send message to scheduler that job is started
            CrawlStatusMessage csmStarted = new CrawlStatusMessage(jobID, JobStatus.STARTED);
            jmsConnection.send(csmStarted);

            // Jobs should arrive with status "submitted". If this is not the
            // case, log the error and send a job-failed message back.
            // HarvestScheduler likes getting a STARTED message before
            // FAILED, so we oblige it here.
            if (job.getStatus() != JobStatus.SUBMITTED) {
                String message = "Message '" + msg.toString() + "' arrived with status " + job.getStatus()
                        + " for job " + jobID + ", should have been SUBMITTED";
                log.warn(message);
                sendErrorMessage(jobID, message, message);
                throw new ArgumentNotValid(message);
            }

            final List<MetadataEntry> metadataEntries = msg.getMetadata();

            // Create the thread in which harvesting will occur
            Thread t1 = new HarvesterThread(job, msg.getOrigHarvestInfo(), metadataEntries);
            // Start the thread, which will remove this listener, harvest, store, and
            // exit the VM
            t1.start();
            log.info("Started harvester thread for job {}", jobID);
            // We delay assigning the thread variable until start() has
            // succeeded. Thus, if start() fails, we will resume accepting
            // jobs.
            t = t1;
        } finally {
            // If we didn't start a thread for crawling after all, accept more
            // messages
            if (t == null) {
                startAcceptingJobs();
            }
        }
        // Now return from this method letting the thread do the work.
        // This is important as it allows us to receive upload-replies from
        // THIS_CLIENT in the crawl thread.
    }

    /**
     * Sends a CrawlStatusMessage for a failed job with the given short message and detailed message.
     *
     * @param jobID ID of the job that failed
     * @param message A short message indicating what went wrong
     * @param detailedMessage A more detailed description of why it went wrong.
     */
    private void sendErrorMessage(long jobID, String message, String detailedMessage) {
        CrawlStatusMessage csm = new CrawlStatusMessage(jobID, JobStatus.FAILED, null);
        csm.setHarvestErrors(message);
        csm.setHarvestErrorDetails(detailedMessage);
        jmsConnection.send(csm);
    }

    /**
     * Stop accepting more jobs. After this is called, all crawl messages received will be resent to the queue. A bit
     * further down, we will stop listening altogether, but that requires another thread.
     */
    private synchronized void stopAcceptingJobs() {
        status.setRunning(true);
        log.debug("No longer accepting jobs.");
    }

    /**
     * Start accepting new crawl requests again. This doesn't actually re-add a listener; the listener only gets
     * removed when we're so far committed that we're going to exit at the end. So to start accepting jobs again, we
     * simply stop resending the messages we get.
     */
    private synchronized void startAcceptingJobs() {
        // Allow this haco to receive messages
        status.setRunning(false);
    }

    /**
     * Stop listening for new crawl requests.
     */
    private void removeListener() {
        log.debug("Removing listener on CHANNEL '{}'", jobChannel);
        jmsConnection.removeListener(jobChannel, this);
    }

    /** Start listening for crawls, if space is available. */
    private void beginListeningIfSpaceAvailable() {
        long availableSpace = FileUtils.getBytesFree(serverDir);
        if (availableSpace > minSpaceRequired) {
            log.info("Starting to listen for new jobs on queue '{}'", jobChannel);
            jmsConnection.setListener(jobChannel, this);
            log.info(STARTED_MESSAGE);
        } else {
            String pausedMessage = "Not enough available diskspace. Only " + availableSpace + " bytes available."
                    + " Harvester is paused.";
            log.error(pausedMessage);
            NotificationsFactory.getInstance().notify(pausedMessage, NotificationType.ERROR);
        }
    }

    /**
     * Adds error messages from an exception to the status message errors.
     *
     * @param csm The message we're setting messages on.
     * @param crawlException The exception that got thrown from further in, possibly as far in as Heritrix.
     * @param errorMessage Description of errors that happened during upload.
     * @param missingHostsReport If true, no hosts report was found.
     * @param failedFiles The number of files that failed to upload.
     */
    private void setErrorMessages(CrawlStatusMessage csm, Throwable crawlException, String errorMessage,
            boolean missingHostsReport, int failedFiles) {
        if (crawlException != null) {
            csm.setHarvestErrors(crawlException.toString());
            csm.setHarvestErrorDetails(ExceptionUtils.getStackTrace(crawlException));
        }
        if (errorMessage.length() > 0) {
            String shortDesc = "";
            if (missingHostsReport) {
                shortDesc = "No hosts report found";
            }
            if (failedFiles > 0) {
                if (shortDesc.length() > 0) {
                    shortDesc += ", ";
                }
                shortDesc += failedFiles + " files failed to upload";
            }
            csm.setUploadErrors(shortDesc);
            csm.setUploadErrorDetails(errorMessage);
        }
    }

    /**
     * Receives a DoOneCrawlMessage and calls onDoOneCrawl.
     *
     * @param msg the message received
     * @throws IOFailure if the crawl fails, or if we are unable to write to the harvestInfoFile
     * @throws UnknownID if jobID is null in the message
     * @throws ArgumentNotValid if the status of the job is not valid - must be SUBMITTED
     * @throws PermissionDenied if the crawldir can't be created
     */
    @Override
    public void visit(DoOneCrawlMessage msg) throws IOFailure, UnknownID, ArgumentNotValid, PermissionDenied {
        onDoOneCrawl(msg);
    }

    @Override
    public void visit(HarvesterRegistrationResponse msg) {

        // If we have already started, or the message concerns another channel,
        // resend it.
        String channelName = msg.getHarvestChannelName();
        if (status.isChannelValid() || !CHANNEL.equals(channelName)) {
            // Controller has already started
            jmsConnection.resend(msg, msg.getTo());
            if (log.isDebugEnabled()) {
                log.debug("Resending harvest channel validity message for channel '{}'", channelName);
            }
            return;
        }

        if (!msg.isValid()) {
            log.error("Received message stating that channel '{}' is invalid. Will stop. "
                    + "Probable cause: the channel is not one of the known channels stored in the channels table",
                    channelName);
            close();
            return;
        }

        log.info("Received message stating that channel '{}' is valid.", channelName);
        // Environment and connections are now ready for processing of messages
        jobChannel = HarvesterChannels.getHarvestJobChannelId(channelName, msg.isSnapshot());

        // Only listen for harvester jobs if enough space is available
        beginListeningIfSpaceAvailable();

        // Notify the harvest dispatcher that we are ready
        startAcceptingJobs();
        status.startSending();
    }

    /**
     * Processes an existing harvestInfoFile:
     * <ol>
     * <li>Retrieves the jobID and crawlDir from the harvestInfoFile using class PersistentJobData</li>
     * <li>Finds the JobId and arcsdir</li>
     * <li>Calls storeArcFiles</li>
     * <li>Moves the harvestdir to oldjobs and deletes crawl.log and other superfluous files</li>
     * </ol>
     *
     * @param crawlDir The location of harvest-info to be processed
     * @param crawlException any exceptions thrown by the crawl which need to be reported back to the scheduler (may be
     * null for success)
     * @throws IOFailure if the file cannot be read
     */
    private void processHarvestInfoFile(File crawlDir, Throwable crawlException) throws IOFailure {
        log.debug("Post-processing files in '{}'", crawlDir.getAbsolutePath());
        if (!PersistentJobData.existsIn(crawlDir)) {
            throw new IOFailure("No harvestInfo found in directory: " + crawlDir.getAbsolutePath());
        }

        PersistentJobData harvestInfo = new PersistentJobData(crawlDir);
        Long jobID = harvestInfo.getJobID();

        StringBuilder errorMessage = new StringBuilder();
        HarvestReport dhr = null;
        List<File> failedFiles = new ArrayList<File>();
        HeritrixFiles files = HeritrixFiles.getH1HeritrixFilesWithDefaultJmxFiles(crawlDir, harvestInfo);

        try {
            log.info("Store files in directory '{}' from jobID: {}.", crawlDir, jobID);
            dhr = controller.storeFiles(files, errorMessage, failedFiles);
        } catch (Exception e) {
            String msg = "Trouble during postprocessing of files in '" + crawlDir.getAbsolutePath() + "'";
            log.warn(msg, e);
            errorMessage.append(e.getMessage()).append("\n");
            // Send a mail about this problem
            NotificationsFactory.getInstance().notify(
                    msg + ". Errors accumulated during the postprocessing: " + errorMessage.toString(),
                    NotificationType.ERROR, e);
        } finally {
            // Send a done or failed message back to harvest scheduler.
            // FindBugs claims a load of known null value here, but that
            // will not be the case if storeFiles() succeeds.
            CrawlStatusMessage csm;

            if (crawlException == null && errorMessage.length() == 0) {
                log.info("Job with ID {} finished with status DONE", jobID);
                csm = new CrawlStatusMessage(jobID, JobStatus.DONE, dhr);
            } else {
                log.warn("Job with ID {} finished with status FAILED", jobID);
                csm = new CrawlStatusMessage(jobID, JobStatus.FAILED, dhr);
                setErrorMessages(csm, crawlException, errorMessage.toString(), dhr == null, failedFiles.size());
            }
            try {
                jmsConnection.send(csm);
                if (crawlException == null && errorMessage.length() == 0) {
                    files.deleteFinalLogs();
                }
            } finally {
                // Delete superfluous files and move the rest to oldjobs.
                // Cleanup is in an extra finally, because it is large amounts
                // of data we need to remove, even on send trouble.
                log.info("Cleanup after harvesting job with id: {}.", jobID);
                files.cleanUpAfterHarvest(new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_OLDJOBSDIR)));
            }
        }
        log.info("Done post-processing files for job {} in dir: '{}'", jobID, crawlDir.getAbsolutePath());
    }

    /**
     * A Thread class for the actual harvesting. This is required in order to stop listening while we're busy
     * harvesting, since JMS doesn't allow the called thread to remove the listener that was called.
     */
    private class HarvesterThread extends Thread {

        /** The harvester Job in this thread. */
        private final Job job;

        /** Stores documentary information about the harvest. */
        private final HarvestDefinitionInfo origHarvestInfo;

        /** The list of metadata associated with this Job. */
        private final List<MetadataEntry> metadataEntries;

        /**
         * Constructor for the HarvesterThread class.
         *
         * @param job a harvesting job
         * @param origHarvestInfo Info about the harvestdefinition that scheduled this job
         * @param metadataEntries metadata associated with the given job
         */
        public HarvesterThread(Job job, HarvestDefinitionInfo origHarvestInfo, List<MetadataEntry> metadataEntries) {
            this.job = job;
            this.origHarvestInfo = origHarvestInfo;
            this.metadataEntries = metadataEntries;
        }

        /**
         * The thread body for the harvester thread. Removes the JMS listener, sets up the files for Heritrix, then
         * passes control to the HarvestController to perform the actual harvest.
         * <p>
         * TODO Get file writing into HarvestController as well (requires some rearrangement of the message sending)
         *
         * @throws PermissionDenied if we cannot create the crawl directory.
         * @throws IOFailure if there are problems preparing or running the crawl.
         */
        public void run() {
            try {
                // We must remove the listener inside a thread,
                // as JMS doesn't allow us to remove it within the
                // call it made.
                removeListener();

                File crawlDir = createCrawlDir();

                final HeritrixFiles files = controller.writeHarvestFiles(crawlDir, job, origHarvestInfo,
                        metadataEntries);

                log.info(STARTCRAWL_MESSAGE + " {}", job.getJobID());

                Throwable crawlException = null;
                try {
                    controller.runHarvest(files);
                } catch (Throwable e) {
                    String msg = "Error during crawling. The crawl may have been only partially completed.";
                    log.warn(msg, e);
                    crawlException = e;
                    throw new IOFailure(msg, e);
                } finally {
                    // This handles some message sending, so it must live
                    // in HCS for now, but the meat of it should be in
                    // HarvestController
                    // TODO Refactor to be able to move this out.
                    // TODO This may overwrite another exception, since this
                    // may throw exceptions.
                    processHarvestInfoFile(files.getCrawlDir(), crawlException);
                }
            } catch (Throwable t) {
                String msg = "Fatal error while operating job '" + job + "'";
                log.error(msg, t);
                NotificationsFactory.getInstance().notify(msg, NotificationType.ERROR, t);
            } finally {
                log.info(ENDCRAWL_MESSAGE + " {}", job.getJobID());
                // Process serverdir for files not yet uploaded.
                processOldJobs();
                shutdownNowOrContinue();
                startAcceptingJobs();
                beginListeningIfSpaceAvailable();
            }
        }

        /**
         * Checks whether the operator wants us to shut down now, by looking for a "shutdown.txt" file in the
         * HARVEST_CONTROLLER_SERVERDIR. TODO In a later implementation, the harvestControllerServer could be
         * notified over JMX instead.
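         * <p>
         * For example (illustrative only; the actual serverdir path comes from the settings):
         *
         * <pre>
         * touch /path/to/serverdir/shutdown.txt
         * </pre>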
         */
        private void shutdownNowOrContinue() {
            File shutdownFile = new File(serverDir, "shutdown.txt");
            if (shutdownFile.exists()) {
                log.info("Found shutdown-file in serverdir - shutting down the application");
                instance.cleanup();
                System.exit(0);
            }
        }

        /**
         * Create the crawl dir, but make sure a message is sent if there is a problem.
         *
         * @return The directory that the crawl will take place in.
         * @throws PermissionDenied if the directory cannot be created.
         */
        private File createCrawlDir() {
            // The directory where arcfiles are stored (crawldir in the above
            // description)
            File crawlDir = null;
            // Create the crawldir. This is done here in order to be able
            // to send a proper message if something goes wrong.
            try {
                File baseCrawlDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_SERVERDIR));
                crawlDir = new File(baseCrawlDir, job.getJobID() + "_" + System.currentTimeMillis());
                FileUtils.createDir(crawlDir);
                log.info("Created crawl directory: '{}'", crawlDir);
                return crawlDir;
            } catch (PermissionDenied e) {
                String message = "Couldn't create the directory for job " + job.getJobID();
                log.warn(message, e);
                sendErrorMessage(job.getJobID(), message, ExceptionUtils.getStackTrace(e));
                throw e;
            }
        }
    }

    /**
     * Used for maintaining the running status of the crawling: is it running or not? Also takes care of notifying
     * the HarvestJobManager of the status.
     */
    private class CrawlStatus {

        /** The status. */
        private boolean running = false;

        /** Whether the assigned harvest channel has been validated. */
        private boolean channelIsValid = false;

        /** Handles the periodic sending of status messages. */
        private PeriodicTaskExecutor statusTransmitter;

        /** The delay in milliseconds before each ready message is sent. */
        private final int SEND_READY_DELAY = Settings.getInt(HarvesterSettings.SEND_READY_DELAY);

        /**
         * Returns <code>true</code> if a doOneCrawl is running, else <code>false</code>.
         *
         * @return Whether a crawl is running.
         */
        public boolean isRunning() {
            return running;
        }

        /**
         * Used for changing the running state.
         *
         * @param running The new status
         */
        public synchronized void setRunning(boolean running) {
            this.running = running;
        }

        /**
         * @return the channelIsValid
         */
        protected final boolean isChannelValid() {
            return channelIsValid;
        }

        /**
         * Starts the sending of status messages.
         */
        public void startSending() {
            this.channelIsValid = true;
            statusTransmitter = new PeriodicTaskExecutor("HarvesterStatus", new Runnable() {
                public void run() {
                    sendStatus();
                }
            }, 0, Settings.getInt(HarvesterSettings.SEND_READY_INTERVAL));
        }

        /**
         * Stops the sending of status messages.
         */
        public void stopSending() {
            if (statusTransmitter != null) {
                statusTransmitter.shutdown();
                statusTransmitter = null;
            }
        }

        /**
         * Send a HarvesterReadyMessage to the HarvestJobManager.
         */
        private synchronized void sendStatus() {
            try {
                Thread.sleep(SEND_READY_DELAY);
            } catch (Exception e) {
                log.error("Unable to sleep", e);
            }
            if (!running) {
                jmsConnection.send(new HarvesterReadyMessage(applicationInstanceId + " on " + physicalServerName,
                        HarvestControllerServer.CHANNEL));
            }
        }
    }

}