001/*
002 * #%L
003 * Netarchivesuite - harvester
004 * %%
005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023package dk.netarkivet.harvester.heritrix3;
024
025import java.io.File;
026import java.util.List;
027
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import dk.netarkivet.common.CommonSettings;
032import dk.netarkivet.common.distribute.ChannelID;
033import dk.netarkivet.common.distribute.JMSConnection;
034import dk.netarkivet.common.distribute.JMSConnectionFactory;
035import dk.netarkivet.common.exceptions.ArgumentNotValid;
036import dk.netarkivet.common.exceptions.IOFailure;
037import dk.netarkivet.common.exceptions.PermissionDenied;
038import dk.netarkivet.common.exceptions.UnknownID;
039import dk.netarkivet.common.lifecycle.PeriodicTaskExecutor;
040import dk.netarkivet.common.utils.ApplicationUtils;
041import dk.netarkivet.common.utils.CleanupIF;
042import dk.netarkivet.common.utils.DomainUtils;
043import dk.netarkivet.common.utils.FileUtils;
044import dk.netarkivet.common.utils.NotificationType;
045import dk.netarkivet.common.utils.NotificationsFactory;
046import dk.netarkivet.common.utils.Settings;
047import dk.netarkivet.common.utils.SystemUtils;
048import dk.netarkivet.common.utils.TimeUtils;
049import dk.netarkivet.harvester.HarvesterSettings;
050import dk.netarkivet.harvester.datamodel.HarvestDefinitionInfo;
051import dk.netarkivet.harvester.datamodel.Job;
052import dk.netarkivet.harvester.datamodel.JobStatus;
053import dk.netarkivet.harvester.distribute.HarvesterChannels;
054import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
055import dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage;
056import dk.netarkivet.harvester.harvesting.distribute.DoOneCrawlMessage;
057import dk.netarkivet.harvester.harvesting.distribute.HarvesterReadyMessage;
058import dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationRequest;
059import dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationResponse;
060import dk.netarkivet.harvester.harvesting.metadata.MetadataEntry;
061
062/**
063 * This class responds to JMS doOneCrawl messages from the HarvestScheduler and launches a Heritrix crawl with the
064 * received job description. The generated ARC files are uploaded to the bitarchives once a harvest job has been
065 * completed.
066 * 
067 * Initially, the HarvestControllerServer registers its channel with the Scheduler by sending a HarvesterRegistrationRequest and waits for a 
068 * positive HarvesterRegistrationResponse that its channel is recognized. 
069 * If not recognized by the Scheduler, the HarvestControllerServer will send a notification about this, 
070 * and then close down the application.
071 * <p>
072 * During its operation CrawlStatus messages are sent to the HarvestSchedulerMonitorServer. When starting the actual
073 * harvesting a message is sent with status 'STARTED'. When the harvesting has finished a message is sent with either
074 * status 'DONE' or 'FAILED'. Either a 'DONE' or 'FAILED' message with result should ALWAYS be sent if at all possible,
075 * but only ever one such message per job.
076 * While the harvestControllerServer is waiting for the harvesting to finish, it sends HarvesterReadyMessages to the scheduler.
077 * The interval between each HarvesterReadyMessage being sent is defined by the setting 'settings.harvester.harvesting.sendReadyDelay'.    
078 * 
079 * <p>
080 * It is necessary to be able to run the Heritrix harvester on several machines and several processes on each machine.
081 * Each instance of Heritrix is started and monitored by a HarvestControllerServer.
082 * <p>
 * Initially, all directories under serverdir are scanned for harvestinfo files. If any are found, they are parsed for
 * information, and an attempt is made to upload all remaining files to the bitarchive. The server then sends back a
 * CrawlStatusMessage with status failed.
086 * <p>
087 * A new thread is started for each actual crawl, in which the JMS listener is removed. Threading is required since JMS
088 * will not let the called thread remove the listener that's being handled.
089 * <p>
 * After a harvest job has terminated, either successfully or unsuccessfully, the serverdir is scanned again for
 * harvestInfo files in order to attempt upload of files not yet uploaded. The server then starts listening for new
 * jobs again, provided there is enough free space on the machine. If there is not, it logs a warning about this,
 * which is also sent as a notification.
094 */
095public class HarvestControllerServer extends HarvesterMessageHandler implements CleanupIF {
096
097    /** The logger to use. */
098    private static final Logger log = LoggerFactory.getLogger(HarvestControllerServer.class);
099
100    /** The unique instance of this class. */
101    private static HarvestControllerServer instance;
102
103    /** The configured application instance id. @see CommonSettings#APPLICATION_INSTANCE_ID */
104    private final String applicationInstanceId = Settings.get(CommonSettings.APPLICATION_INSTANCE_ID);
105
106    /** The name of the server this <code>HarvestControllerServer</code> is running on. */
107    private final String physicalServerName = DomainUtils.reduceHostname(SystemUtils.getLocalHostName());
108
109    /** Min. space required to start a job. */
110    private final long minSpaceRequired;
111
112    /** The JMSConnection to use. */
113    private JMSConnection jmsConnection;
114
115    /** The JMS channel on which to listen for {@link HarvesterRegistrationResponse}s. */
116    public static final ChannelID HARVEST_CHAN_VALID_RESP_ID = HarvesterChannels
117            .getHarvesterRegistrationResponseChannel();
118
119    private final PostProcessing postProcessing;
120
121    /** The CHANNEL of jobs processed by this instance. */
122    private static final String CHANNEL = Settings.get(HarvesterSettings.HARVEST_CONTROLLER_CHANNEL);
123
124    /** Jobs are fetched from this queue. */
125    private ChannelID jobChannel;
126
127    /** the serverdir, where the harvesting takes place. */
128    private final File serverDir;
129
    /** Tracks whether a doOneCrawl is running (no jobs are accepted while one is) and whether the harvest channel has been validated; also sends the periodic ready messages. */
131    private CrawlStatus status;
132
133    /**
134     * Returns or creates the unique instance of this singleton The server creates an instance of the HarvestController,
135     * uploads arc-files from unfinished harvests, and starts to listen to JMS messages on the incoming jms queues.
136     * @return The instance
137     * @throws PermissionDenied If the serverdir or oldjobsdir can't be created
138     * @throws IOFailure if data from old harvests exist, but contain illegal data
139     */
140    public static synchronized HarvestControllerServer getInstance() throws IOFailure {
141        if (instance == null) {
142            instance = new HarvestControllerServer();
143        }
144        return instance;
145    }
146
147    /**
148     * In this constructor, the server creates an instance of the HarvestController, uploads any arc-files from
149     * incomplete harvests. Then it starts listening for new doOneCrawl messages, unless there is no available space. In
150     * that case, it sends a notification to the administrator and pauses.
151     * @throws PermissionDenied If the serverdir or oldjobsdir can't be created.
152     * @throws IOFailure If harvestInfoFile contains invalid data.
153     * @throws UnknownID if the settings file does not specify a valid queue priority.
154     */
155    private HarvestControllerServer() throws IOFailure {
156        log.info("Starting HarvestControllerServer.");
157        log.info("Bound to harvest channel '{}'", CHANNEL);
158
159        // Make sure serverdir (where active crawl-dirs live) and oldJobsDir
160        // (where old crawl dirs are stored) exist.
161        serverDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_SERVERDIR));
162        ApplicationUtils.dirMustExist(serverDir);
163        log.info("Serverdir: '{}'", serverDir);
164        minSpaceRequired = Settings.getLong(HarvesterSettings.HARVEST_SERVERDIR_MINSPACE);
165        if (minSpaceRequired <= 0L) {
166            log.warn("Wrong setting of minSpaceLeft read from Settings: {}", minSpaceRequired);
167            throw new ArgumentNotValid("Wrong setting of minSpaceLeft read from Settings: " + minSpaceRequired);
168        }
169        log.info("Harvesting requires at least {} bytes free.", minSpaceRequired);
170        
171        // If shutdown.txt found in serverdir, just close down the HarvestControllerApplication at once.
172        shutdownNowOrContinue();
173
174        // Get JMS-connection
175        // Channel THIS_CLIENT is only used for replies to store messages so
176        // do not set as listener here. It is registered in the arcrepository
177        // client.
178        // Channel ANY_xxxPRIORIRY_HACO is used for listening for jobs, and
179        // registered below.
180
181        jmsConnection = JMSConnectionFactory.getInstance();
182        postProcessing = PostProcessing.getInstance(jmsConnection);
183        log.debug("Obtained JMS connection.");
184
185        status = new CrawlStatus();
186        log.info("SEND_READY_DELAY used by HarvestControllerServer is {}", status.getSendReadyDelay());
187
188        // If any unprocessed jobs are left on the server, process them now
189        postProcessing.processOldJobs();
190
191        // Register for listening to harvest channel validity responses
192        JMSConnectionFactory.getInstance().setListener(HARVEST_CHAN_VALID_RESP_ID, this);
193
194        // Ask if the channel this harvester is assigned to is valid
195        jmsConnection.send(new HarvesterRegistrationRequest(HarvestControllerServer.CHANNEL, applicationInstanceId));
196        log.info("Requested to check the validity of harvest channel '{}'", HarvestControllerServer.CHANNEL);
197    }
198
199    /**
200     * Release all jms connections. Close the Controller
201     */
202    public synchronized void close() {
203        log.info("Closing HarvestControllerServer.");
204        cleanup();
205        log.info("Closed down HarvestControllerServer");
206    }
207
208    /**
209     * Will be called on shutdown.
210     *
211     * @see CleanupIF#cleanup()
212     */
213    public void cleanup() {
214        if (jmsConnection != null) {
215            jmsConnection.removeListener(HARVEST_CHAN_VALID_RESP_ID, this);
216            if (jobChannel != null) {
217                jmsConnection.removeListener(jobChannel, this);
218            }
219        }
220        // Stop the sending of status messages
221        status.stopSending();
222        instance = null;
223    }
224
225    @Override
226    public void visit(HarvesterRegistrationResponse msg) {
227        // If we have already started or the message notifies for another channel, resend it.
228        String channelName = msg.getHarvestChannelName();
229        if (status.isChannelValid() || !CHANNEL.equals(channelName)) {
230            // Controller has already started
231            jmsConnection.resend(msg, msg.getTo());
232            if (log.isTraceEnabled()) {
233                log.trace("Resending harvest channel validity message for channel '{}'", channelName);
234            }
235            return;
236        }
237
238        if (!msg.isValid()) {
239                String errMsg = "Received message stating that channel '" +  channelName + "' is invalid. Will stop. "
240                        + "Probable cause: the channel is not one of the known channels stored in the channels table"; 
241            log.error(errMsg);
242            // Send a notification about this, ASAP
243            NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR);
244            close();
245            return;
246        }
247
248        log.info("Received message stating that channel '{}' is valid.", channelName);
249        // Environment and connections are now ready for processing of messages
250        jobChannel = HarvesterChannels.getHarvestJobChannelId(channelName, msg.isSnapshot());
251
252        // Only listen for harvester jobs if enough available space
253        beginListeningIfSpaceAvailable();
254
255        // Notify the harvest dispatcher that we are ready
256        startAcceptingJobs();
257        status.startSending();
258    }
259
260    /** Start listening for crawls, if space available. */
261    private void beginListeningIfSpaceAvailable() {
262        long availableSpace = FileUtils.getBytesFree(serverDir);
263        if (availableSpace > minSpaceRequired) {
264            log.info("Starts to listen to new jobs on queue '{}'", jobChannel);
265            jmsConnection.setListener(jobChannel, this);
266        } else {
267            String pausedMessage = "Not enough available diskspace. Only " + availableSpace + " bytes available."
268                    + " Harvester is paused.";
269            log.error(pausedMessage);
270            NotificationsFactory.getInstance().notify(pausedMessage, NotificationType.ERROR);
271        }
272    }
273
274    /**
275     * Start listening for new crawl requests again. This actually doesn't re-add a listener, but the listener only gets
276     * removed when we're so far committed that we're going to exit at the end. So to start accepting jobs again, we
277     * stop resending messages we get.
278     */
279    private synchronized void startAcceptingJobs() {
280        // allow this harvestControllerServer to receive messages again
281        status.setRunning(false);
282    }
283
284    /**
285     * Stop accepting more jobs. After this is called, all crawl messages received will be resent to the queue. A bit
286     * further down, we will stop listening altogether, but that requires another thread.
287     */
288    private synchronized void stopAcceptingJobs() {
289        status.setRunning(true);
290        log.debug("No longer accepting jobs.");
291    }
292
293    /**
294     * Stop listening for new crawl requests.
295     */
296    private void removeListener() {
297        log.debug("Removing listener on CHANNEL '{}'", jobChannel);
298        jmsConnection.removeListener(jobChannel, this);
299    }
300    
301    /**
302     * Does the operator want us to shutdown now.
303     * TODO In a later implementation, the harvestControllerServer could
304     * be notified over JMX. Now we just look for a "shutdown.txt" file in the HARVEST_CONTROLLER_SERVERDIR 
305     * log that we're shutting down, send a notification about this, and then shutdown.
306     */
307    private void shutdownNowOrContinue() {
308        File shutdownFile = new File(serverDir, "shutdown.txt");
309        
310        if (shutdownFile.exists()) {
311                String msg = "Found shutdown-file in serverdir '" +  serverDir.getAbsolutePath() + "'. Shutting down the application"; 
312            log.info(msg);
313            NotificationsFactory.getInstance().notify(msg, NotificationType.INFO);
314            instance.cleanup();
315            System.exit(0);
316        }
317    }
318
319    /**
320     * Checks that we're available to do a crawl, and if so, marks us as unavailable, checks that the job message is
321     * well-formed, and starts the thread that the crawl happens in. If an error occurs starting the crawl, we will
322     * start listening for messages again.
323     * <p>
324     * The sequence of actions involved in a crawl are:</br> 1. If we are already running, resend the job to the queue
325     * and return</br> 2. Check the job for validity</br> 3. Send a CrawlStatus message that crawl has STARTED</br> In a
326     * separate thread:</br> 4. Unregister this HACO as listener</br> 5. Create a new crawldir (based on the JobID and a
327     * timestamp)</br> 6. Write a harvestInfoFile (using JobID and crawldir) and metadata</br> 7. Instantiate a new
328     * HeritrixLauncher</br> 8. Start a crawl</br> 9. Store the generated arc-files and metadata in the known
329     * bit-archives </br>10. _Always_ send CrawlStatus DONE or FAILED</br> 11. Move crawldir into oldJobs dir</br>
330     *
331     * @param msg The crawl job
332     * @throws IOFailure On trouble harvesting, uploading or processing harvestInfo
333     * @throws UnknownID if jobID is null in the message
334     * @throws ArgumentNotValid if the status of the job is not valid - must be SUBMITTED
335     * @throws PermissionDenied if the crawldir can't be created
336     * @see #visit(DoOneCrawlMessage) for more details
337     */
338    public void visit(DoOneCrawlMessage msg) throws IOFailure, UnknownID, ArgumentNotValid, PermissionDenied {
339        // Only one doOneCrawl at a time. Returning should almost never happen,
340        // since we deregister the listener, but we may receive another message
341        // before the listener is removed. Also, if the job message is
342        // malformed or starting the crawl fails, we re-add the listener.
343        synchronized (this) {
344            if (status.isRunning()) {
345                log.warn(
346                        "Received crawl request, but sent it back to queue, as another crawl is already running: '{}'",
347                        msg);
348                jmsConnection.resend(msg, jobChannel);
349                try {
350                    // Wait a second before listening again, so the message has
351                    // a chance of getting snatched by another harvester.
352                    Thread.sleep(TimeUtils.SECOND_IN_MILLIS);
353                } catch (InterruptedException e) {
354                    // Since we're not waiting for anything critical, we can
355                    // ignore this exception.
356                }
357                return;
358            }
359            stopAcceptingJobs();
360        }
361
362        Thread t = null;
363
364        // This 'try' matches a finally that restores running=false if we don't
365        // start a crawl after all
366        try {
367            final Job job = msg.getJob();
368
369            // Every job must have an ID or we can do nothing with it, not even
370            // send a proper failure message back.
371            Long jobID = job.getJobID();
372            if (jobID == null) {
373                log.warn("DoOneCrawlMessage arrived without JobID: '{}'", msg.toString());
374                throw new UnknownID("DoOneCrawlMessage arrived without JobID");
375            }
376
377            log.info("Received crawlrequest for job {}: '{}'", jobID, msg);
378
379            // Send message to scheduler that job is started
380            CrawlStatusMessage csmStarted = new CrawlStatusMessage(jobID, JobStatus.STARTED);
381            jmsConnection.send(csmStarted);
382
383            // Jobs should arrive with status "submitted". If this is not the
384            // case, log the error and send a job-failed message back.
385            // HarvestScheduler likes getting a STARTED message before
386            // FAILED, so we oblige it here.
387            if (job.getStatus() != JobStatus.SUBMITTED) {
388                String message = "Message '" + msg.toString() + "' arrived with" + " status " + job.getStatus()
389                        + " for job " + jobID + ", should have been STATUS_SUBMITTED";
390                log.warn(message);
391                sendErrorMessage(jobID, message, message);
392                throw new ArgumentNotValid(message);
393            }
394
395            final List<MetadataEntry> metadataEntries = msg.getMetadata();
396
397            Thread t1;
398            // Create thread in which harvesting will occur
399            t1 = new HarvesterThread(job, msg.getOrigHarvestInfo(), metadataEntries);
400            // start thread which will remove this listener, harvest, store, and
401            // exit the VM
402            t1.start();
403            log.info("Started harvester thread for job {}", jobID);
404            // We delay assigning the thread variable until start() has
405            // succeeded. Thus, if start() fails, we will resume accepting
406            // jobs.
407            t = t1;
408        } finally {
409            // If we didn't start a thread for crawling after all, accept more
410            // messages
411            if (t == null) {
412                startAcceptingJobs();
413            }
414        }
415        // Now return from this method letting the thread do the work.
416        // This is important as it allows us to receive upload-replies from
417        // THIS_CLIENT in the crawl thread.
418    }
419
420    /**
421     * Sends a CrawlStatusMessage for a failed job with the given short message and detailed message.
422     *
423     * @param jobID ID of the job that failed
424     * @param message A short message indicating what went wrong
425     * @param detailedMessage A more detailed message detailing why it went wrong.
426     */
427    public void sendErrorMessage(long jobID, String message, String detailedMessage) {
428        CrawlStatusMessage csm = new CrawlStatusMessage(jobID, JobStatus.FAILED, null);
429        csm.setHarvestErrors(message);
430        csm.setHarvestErrorDetails(detailedMessage);
431        jmsConnection.send(csm);
432    }
433
434    /**
435     * A Thread class for the actual harvesting. This is required in order to stop listening while we're busy
436     * harvesting, since JMS doesn't allow the called thread to remove the listener that was called.
437     */
438    private class HarvesterThread extends Thread {
439
440        /** The harvester Job in this thread. */
441        private final Job job;
442
443        /** Stores documentary information about the harvest. */
444        private final HarvestDefinitionInfo origHarvestInfo;
445
446        /** The list of metadata associated with this Job. */
447        private final List<MetadataEntry> metadataEntries;
448
449        /**
450         * Constructor for the HarvesterThread class.
451         *
452         * @param job a harvesting job
453         * @param origHarvestInfo Info about the harvestdefinition that scheduled this job
454         * @param metadataEntries metadata associated with the given job
455         */
456        public HarvesterThread(Job job, HarvestDefinitionInfo origHarvestInfo, List<MetadataEntry> metadataEntries) {
457            this.job = job;
458            this.origHarvestInfo = origHarvestInfo;
459            this.metadataEntries = metadataEntries;
460        }
461
462        /**
463         * The thread body for the harvester thread. Removes the JMS listener, sets up the files for Heritrix, then
464         * passes control to the HarvestController to perform the actual harvest.
465         * <p>
466         *
467         * @throws PermissionDenied if we cannot create the crawl directory.
468         * @throws IOFailure if there are problems preparing or running the crawl.
469         */
470        public void run() {
471            try {
472                // We must remove the listener inside a thread,
473                // as JMS doesn't allow us to remove it within the
474                // call it made.
475                removeListener();
476
477                HarvestJob harvestJob = new HarvestJob(instance);
478                harvestJob.init(job, origHarvestInfo, metadataEntries);
479                Heritrix3Files files = harvestJob.getHeritrix3Files();
480
481                Throwable crawlException = null;
482                try {
483                    harvestJob.runHarvest();
484                } catch (Throwable e) {
485                    String msg = "Error during crawling. The crawl may have been only partially completed.";
486                    log.warn(msg, e);
487                    crawlException = e;
488                    throw new IOFailure(msg, e);
489                } finally {
490                        postProcessing.doPostProcessing(files.getCrawlDir(), crawlException);
491                }
492            } catch (Throwable t) {
493                String msg = "Fatal error while operating job '" + job + "'";
494                log.error(msg, t);
495                NotificationsFactory.getInstance().notify(msg, NotificationType.ERROR, t);
496            } finally {
497                log.info("Ending crawl of job : {}", job.getJobID());
498                // process serverdir for files not yet uploaded.
499                postProcessing.processOldJobs();
500                instance.shutdownNowOrContinue();
501                startAcceptingJobs();
502                beginListeningIfSpaceAvailable();
503            }
504        }
505    }
506
507    /**
508     * Used for maintaining the running status of the crawling, is it running or not. Will also take care of notifying
509     * the HarvestJobManager of the status.
510     */
511    private class CrawlStatus implements Runnable {
512
513        /** The status. */
514        private Boolean running = false;
515
516        private boolean channelIsValid = false;
517
518        /** Handles the periodic sending of status messages. */
519        private PeriodicTaskExecutor statusTransmitter;
520
521        private final int SEND_READY_DELAY = Settings.getInt(HarvesterSettings.SEND_READY_DELAY);
522
523        /**
524         * Starts the sending of status messages. Interval defined by HarvesterSettings.SEND_READY_DELAY .
525         */
526        public void startSending() {
527            this.channelIsValid = true;
528            statusTransmitter = new PeriodicTaskExecutor("HarvesterStatus", this, 0,
529                        getSendReadyDelay());
530        }
531
532        /**
533         * Stops the sending of status messages.
534         */
535        public void stopSending() {
536            if (statusTransmitter != null) {
537                statusTransmitter.shutdown();
538                statusTransmitter = null;
539            }
540        }
541
542        /**
543         * Returns <code>true</code> if the a doOneCrawl is running, else <code>false</code>.
544         * @return Whether a crawl running.
545         */
546        public boolean isRunning() {
547            return running;
548        }
549
550        /**
551         * Used for changing the running state in methods startAcceptingJobs and stopAcceptingJobs 
552         * @param running The new status
553         */
554        public void setRunning(boolean running) {
555            this.running = running;
556        }
557
558        /**
559         * @return the channelIsValid
560         */
561        protected final boolean isChannelValid() {
562            return channelIsValid;
563        }
564
565        @Override
566        public void run() {
567            try {
568                Thread.sleep(getSendReadyDelay());
569            } catch (Exception e) {
570                log.error("Unable to sleep", e);
571            }
572            if (!running) {
573                jmsConnection.send(new HarvesterReadyMessage(applicationInstanceId + " on " + physicalServerName,
574                        HarvestControllerServer.CHANNEL));
575            }
576        }
577
578        public int getSendReadyDelay() {
579                return SEND_READY_DELAY;
580        }
581        
582    }
583
584}