001package dk.netarkivet.harvester.tools.dummy;
002
003import java.io.File;
004
005import org.slf4j.Logger;
006import org.slf4j.LoggerFactory;
007
008import dk.netarkivet.common.CommonSettings;
009import dk.netarkivet.common.distribute.ChannelID;
010import dk.netarkivet.common.distribute.JMSConnection;
011import dk.netarkivet.common.distribute.JMSConnectionFactory;
012import dk.netarkivet.common.exceptions.ArgumentNotValid;
013import dk.netarkivet.common.exceptions.IOFailure;
014import dk.netarkivet.common.exceptions.PermissionDenied;
015import dk.netarkivet.common.exceptions.UnknownID;
016import dk.netarkivet.common.lifecycle.PeriodicTaskExecutor;
017import dk.netarkivet.common.utils.ApplicationUtils;
018import dk.netarkivet.common.utils.CleanupIF;
019import dk.netarkivet.common.utils.DomainUtils;
020import dk.netarkivet.common.utils.FileUtils;
021import dk.netarkivet.common.utils.NotificationType;
022import dk.netarkivet.common.utils.NotificationsFactory;
023import dk.netarkivet.common.utils.Settings;
024import dk.netarkivet.common.utils.SystemUtils;
025import dk.netarkivet.common.utils.TimeUtils;
026import dk.netarkivet.harvester.HarvesterSettings;
027import dk.netarkivet.harvester.datamodel.Job;
028import dk.netarkivet.harvester.datamodel.JobStatus;
029import dk.netarkivet.harvester.distribute.HarvesterChannels;
030import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
031import dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage;
032import dk.netarkivet.harvester.harvesting.distribute.DoOneCrawlMessage;
033import dk.netarkivet.harvester.harvesting.distribute.HarvesterReadyMessage;
034import dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationRequest;
035import dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationResponse;
036
/**
 * This class responds to JMS doOneCrawl messages from the HarvestScheduler and waits 10 minutes before failing the job.
 * <p>
 * Initially, it registers its channel with the Scheduler by sending a HarvesterRegistrationRequest and waits for a
 * positive HarvesterRegistrationResponse confirming that its channel is recognized. If the channel is not recognized
 * by the Scheduler, this class sends a notification about this, and then closes itself down (it removes its JMS
 * listeners and stops sending ready messages).
 * <p>
 * During its operation CrawlStatus messages are sent to the HarvestSchedulerMonitorServer. When responding to the
 * doOneCrawl message, it sends a message with status 'STARTED'. When 10 minutes have passed, a message is sent with
 * status 'FAILED'.
 * <p>
 * Before the 10 minutes start, the JMS listener is removed to avoid handling more than one doOneCrawlMessage at a time.
 * Once the channel has been accepted, and also during the 10 minutes, each run of the periodic status task sends two
 * (instead of one) HarvesterReadyMessages to the scheduler to test issue NAS-2614.
 * The interval between sending HarvesterReadyMessages is defined by the setting 'settings.harvester.harvesting.sendReadyDelay'.
 */
052public class FaultyHarvestControllerServer extends HarvesterMessageHandler implements CleanupIF {
053    
054    
055            /** The logger to use. */
056            private static final Logger log = LoggerFactory.getLogger(FaultyHarvestControllerServer.class);
057
058            /** The unique instance of this class. */
059            private static FaultyHarvestControllerServer instance;
060
061            /** The configured application instance id. @see CommonSettings#APPLICATION_INSTANCE_ID */
062            private final String applicationInstanceId = Settings.get(CommonSettings.APPLICATION_INSTANCE_ID);
063
064            /** The name of the server this <code>HarvestControllerServer</code> is running on. */
065            private final String physicalServerName = DomainUtils.reduceHostname(SystemUtils.getLocalHostName());
066
067            /** Min. space required to start a job. */
068            private final long minSpaceRequired;
069
070            /** The JMSConnection to use. */
071            private JMSConnection jmsConnection;
072
073            /** The JMS channel on which to listen for {@link HarvesterRegistrationResponse}s. */
074            public static final ChannelID HARVEST_CHAN_VALID_RESP_ID = HarvesterChannels
075                    .getHarvesterRegistrationResponseChannel();
076
077            /** The CHANNEL of jobs processed by this instance. */
078            private static final String CHANNEL = Settings.get(HarvesterSettings.HARVEST_CONTROLLER_CHANNEL);
079
080            /** Jobs are fetched from this queue. */
081            private ChannelID jobChannel;
082
083            /** the serverdir, where the harvesting takes place. */
084            private final File serverDir;
085
086            /** This is true while a doOneCrawl is running. No jobs are accepted while this is running. */
087            private CrawlStatus status;
088
089            /**
090             * Returns or creates the unique instance of this singleton.
091             * @return The instance
092             * @throws PermissionDenied If the serverdir or oldjobsdir can't be created
093             * @throws IOFailure if data from old harvests exist, but contain illegal data
094             */
095            public static synchronized FaultyHarvestControllerServer getInstance() throws IOFailure {
096                if (instance == null) {
097                    instance = new FaultyHarvestControllerServer();
098                }
099                return instance;
100            }
101
102            /**
103             * In this constructor, the server creates an instance of the HarvestController, uploads any arc-files from
104             * incomplete harvests. Then it starts listening for new doOneCrawl messages, unless there is no available space. In
105             * that case, it sends a notification to the administrator and pauses.
106             * @throws PermissionDenied If the serverdir or oldjobsdir can't be created.
107             * @throws IOFailure If harvestInfoFile contains invalid data.
108             * @throws UnknownID if the settings file does not specify a valid queue priority.
109             */
110            private FaultyHarvestControllerServer() throws IOFailure {
111                log.info("Starting {}.", this.getClass());
112                log.info("Bound to harvest channel '{}'", CHANNEL);
113
114                // Make sure serverdir (where active crawl-dirs live) and oldJobsDir
115                // (where old crawl dirs are stored) exist.
116                serverDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_SERVERDIR));
117                ApplicationUtils.dirMustExist(serverDir);
118                log.info("Serverdir: '{}'", serverDir);
119                minSpaceRequired = Settings.getLong(HarvesterSettings.HARVEST_SERVERDIR_MINSPACE);
120                if (minSpaceRequired <= 0L) {
121                    log.warn("Wrong setting of minSpaceLeft read from Settings: {}", minSpaceRequired);
122                    throw new ArgumentNotValid("Wrong setting of minSpaceLeft read from Settings: " + minSpaceRequired);
123                }
124                log.info("Harvesting requires at least {} bytes free.", minSpaceRequired);
125
126                // Get JMS-connection
127                // Channel THIS_CLIENT is only used for replies to store messages so
128                // do not set as listener here. It is registered in the arcrepository
129                // client.
130                // Channel ANY_xxxPRIORIRY_HACO is used for listening for jobs, and
131                // registered below.
132
133                jmsConnection = JMSConnectionFactory.getInstance();
134                
135                status = new CrawlStatus();
136                log.info("SEND_READY_DELAY used by {} is {}", this.getClass().getName(), status.getSendReadyDelay());
137
138               
139                // Register for listening to harvest channel validity responses
140                JMSConnectionFactory.getInstance().setListener(HARVEST_CHAN_VALID_RESP_ID, this);
141
142                // Ask if the channel this harvester is assigned to is valid
143                jmsConnection.send(new HarvesterRegistrationRequest(FaultyHarvestControllerServer.CHANNEL, applicationInstanceId));
144                log.info("Requested to check the validity of harvest channel '{}'", FaultyHarvestControllerServer.CHANNEL);
145            }
146
147            /**
148             * Release all jms connections. Close the Controller
149             */
150            public synchronized void close() {
151                log.info("Closing {}.", this.getClass().getName());
152                cleanup();
153                log.info("Closed down {}", this.getClass().getName());
154            }
155
156            /**
157             * Will be called on shutdown.
158             *
159             * @see CleanupIF#cleanup()
160             */
161            public void cleanup() {
162                if (jmsConnection != null) {
163                    jmsConnection.removeListener(HARVEST_CHAN_VALID_RESP_ID, this);
164                    if (jobChannel != null) {
165                        jmsConnection.removeListener(jobChannel, this);
166                    }
167                }
168                // Stop the sending of status messages
169                status.stopSending();
170                instance = null;
171            }
172
173            @Override
174            public void visit(HarvesterRegistrationResponse msg) {
175                // If we have already started or the message notifies for another channel, resend it.
176                String channelName = msg.getHarvestChannelName();
177                if (status.isChannelValid() || !CHANNEL.equals(channelName)) {
178                    // Controller has already started
179                    jmsConnection.resend(msg, msg.getTo());
180                    if (log.isTraceEnabled()) {
181                        log.trace("Resending harvest channel validity message for channel '{}'", channelName);
182                    }
183                    return;
184                }
185
186                if (!msg.isValid()) {
187                        String errMsg = "Received message stating that channel '" +  channelName + "' is invalid. Will stop. "
188                                + "Probable cause: the channel is not one of the known channels stored in the channels table"; 
189                    log.error(errMsg);
190                    // Send a notification about this, ASAP
191                    NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR);
192                    close();
193                    return;
194                }
195
196                log.info("Received message stating that channel '{}' is valid.", channelName);
197                // Environment and connections are now ready for processing of messages
198                jobChannel = HarvesterChannels.getHarvestJobChannelId(channelName, msg.isSnapshot());
199
200                // Only listen for harvester jobs if enough available space
201                beginListeningIfSpaceAvailable();
202
203                // Notify the harvest dispatcher that we are ready
204                startAcceptingJobs();
205                status.startSending();
206            }
207
208            /** Start listening for crawls, if space available. */
209            private void beginListeningIfSpaceAvailable() {
210                long availableSpace = FileUtils.getBytesFree(serverDir);
211                if (availableSpace > minSpaceRequired) {
212                    log.info("Starts to listen to new jobs on queue '{}'", jobChannel);
213                    jmsConnection.setListener(jobChannel, this);
214                } else {
215                    String pausedMessage = "Not enough available diskspace. Only " + availableSpace + " bytes available."
216                            + " Harvester is paused.";
217                    log.error(pausedMessage);
218                    NotificationsFactory.getInstance().notify(pausedMessage, NotificationType.ERROR);
219                }
220            }
221
222            /**
223             * Start listening for new crawl requests again. This actually doesn't re-add a listener, but the listener only gets
224             * removed when we're so far committed that we're going to exit at the end. So to start accepting jobs again, we
225             * stop resending messages we get.
226             */
227            private synchronized void startAcceptingJobs() {
228                // allow this harvestControllerServer to receive messages again
229                status.setRunning(false);
230            }
231
232            /**
233             * Stop accepting more jobs. After this is called, all crawl messages received will be resent to the queue. A bit
234             * further down, we will stop listening altogether, but that requires another thread.
235             */
236            private synchronized void stopAcceptingJobs() {
237                status.setRunning(true);
238                log.debug("No longer accepting jobs.");
239            }
240
241            /**
242             * Stop listening for new crawl requests.
243             */
244            private void removeListener() {
245                log.debug("Removing listener on CHANNEL '{}'", jobChannel);
246                jmsConnection.removeListener(jobChannel, this);
247            }
248
249            /**
250             * Here we receives a DoOneCrawlMessage, waits 10 minutes, and then fails the job.
251             *
252             * @param msg The crawl job
253             * @throws IOFailure On trouble harvesting, uploading or processing harvestInfo
254             * @throws UnknownID if jobID is null in the message
255             * @throws ArgumentNotValid if the status of the job is not valid - must be SUBMITTED
256             * @throws PermissionDenied if the crawldir can't be created
257             * @see #visit(DoOneCrawlMessage) for more details
258             */
259            public void visit(DoOneCrawlMessage msg) throws IOFailure, UnknownID, ArgumentNotValid, PermissionDenied {
260                // Only one doOneCrawl at a time. Returning should almost never happen,
261                // since we deregister the listener, but we may receive another message
262                // before the listener is removed. Also, if the job message is
263                // malformed or starting the crawl fails, we re-add the listener.
264                synchronized (this) {
265                    if (status.isRunning()) {
266                        log.warn(
267                                "Received crawl request, but sent it back to queue, as another crawl is already running: '{}'",
268                                msg);
269                        jmsConnection.resend(msg, jobChannel);
270                        try {
271                            // Wait a second before listening again, so the message has
272                            // a chance of getting snatched by another harvester.
273                            Thread.sleep(TimeUtils.SECOND_IN_MILLIS);
274                        } catch (InterruptedException e) {
275                            // Since we're not waiting for anything critical, we can
276                            // ignore this exception.
277                        }
278                        return;
279                    }
280                    stopAcceptingJobs();
281                }
282
283                final Job job = msg.getJob();
284
285            // Every job must have an ID or we can do nothing with it, not even
286            // send a proper failure message back.
287            Long jobID = job.getJobID();
288
289            log.info("Received crawlrequest for job {}: '{}'", jobID, msg);
290            
291            // Send message to scheduler that job is started
292            CrawlStatusMessage csmStarted = new CrawlStatusMessage(jobID, JobStatus.STARTED);
293            jmsConnection.send(csmStarted);
294
295            removeListener();
296            
297            log.info("Waiting 10 minutes to illustrate a real harvest");
298            // wait 10 minutes (should this be in a thread of its own???
299            try {
300                                Thread.sleep(10 *// minutes to sleep
301                                        60 *   // seconds to a minute
302                                        1000);
303                        } catch (InterruptedException e) {
304                                e.printStackTrace();
305                        }
306            
307            csmStarted = new CrawlStatusMessage(jobID, JobStatus.FAILED);
308            jmsConnection.send(csmStarted);
309            
310            
311            startAcceptingJobs();
312            }
313            
314
315            /**
316             * Sends a CrawlStatusMessage for a failed job with the given short message and detailed message.
317             *
318             * @param jobID ID of the job that failed
319             * @param message A short message indicating what went wrong
320             * @param detailedMessage A more detailed message detailing why it went wrong.
321             */
322            public void sendErrorMessage(long jobID, String message, String detailedMessage) {
323                CrawlStatusMessage csm = new CrawlStatusMessage(jobID, JobStatus.FAILED, null);
324                csm.setHarvestErrors(message);
325                csm.setHarvestErrorDetails(detailedMessage);
326                jmsConnection.send(csm);
327            }    
328            
329            /**
330             * Used for maintaining the running status of the crawling, is it running or not. Will also take care of notifying
331             * the HarvestJobManager of the status.
332             */
333            private class CrawlStatus implements Runnable {
334
335                /** The status. */
336                private Boolean running = false;
337
338                private boolean channelIsValid = false;
339
340                /** Handles the periodic sending of status messages. */
341                private PeriodicTaskExecutor statusTransmitter;
342
343                private final int SEND_READY_DELAY = Settings.getInt(HarvesterSettings.SEND_READY_DELAY);
344
345                /**
346                 * Starts the sending of status messages. Interval defined by HarvesterSettings.SEND_READY_DELAY .
347                 */
348                public void startSending() {
349                    this.channelIsValid = true;
350                    statusTransmitter = new PeriodicTaskExecutor("HarvesterStatus", this, 0,
351                                getSendReadyDelay());
352                }
353
354                /**
355                 * Stops the sending of status messages.
356                 */
357                public void stopSending() {
358                    if (statusTransmitter != null) {
359                        statusTransmitter.shutdown();
360                        statusTransmitter = null;
361                    }
362                }
363
364                /**
365                 * Returns <code>true</code> if the a doOneCrawl is running, else <code>false</code>.
366                 * @return Whether a crawl running.
367                 */
368                public boolean isRunning() {
369                    return running;
370                }
371
372                /**
373                 * Used for changing the running state in methods startAcceptingJobs and stopAcceptingJobs 
374                 * @param running The new status
375                 */
376                public void setRunning(boolean running) {
377                    this.running = running;
378                }
379
380                /**
381                 * @return the channelIsValid
382                 */
383                protected final boolean isChannelValid() {
384                    return channelIsValid;
385                }
386
387                @Override
388                public void run() {
389                    try {
390                        Thread.sleep(getSendReadyDelay());
391                    } catch (Exception e) {
392                        log.error("Unable to sleep", e);
393                    }
394                    log.info("Sending ready message #1 from {}", this.getClass());
395                    jmsConnection.send(new HarvesterReadyMessage(applicationInstanceId + " on " + physicalServerName,
396                            FaultyHarvestControllerServer.CHANNEL));
397                    log.info("Sending ready message #2 from {}", this.getClass());
398                    jmsConnection.send(new HarvesterReadyMessage(applicationInstanceId + " on " + physicalServerName,
399                            FaultyHarvestControllerServer.CHANNEL));
400                }
401
402                public int getSendReadyDelay() {
403                        return SEND_READY_DELAY;
404                }
405                
406            }
407}