/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2018 The Royal Danish Library,
 * the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.heritrix3;

import java.io.File;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.distribute.ChannelID;
import dk.netarkivet.common.distribute.JMSConnection;
import dk.netarkivet.common.distribute.JMSConnectionFactory;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.PermissionDenied;
import dk.netarkivet.common.exceptions.UnknownID;
import dk.netarkivet.common.lifecycle.PeriodicTaskExecutor;
import dk.netarkivet.common.utils.ApplicationUtils;
import dk.netarkivet.common.utils.CleanupIF;
import dk.netarkivet.common.utils.DomainUtils;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.NotificationType;
import dk.netarkivet.common.utils.NotificationsFactory;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.SystemUtils;
import dk.netarkivet.common.utils.TimeUtils;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.datamodel.HarvestDefinitionInfo;
import dk.netarkivet.harvester.datamodel.Job;
import dk.netarkivet.harvester.datamodel.JobStatus;
import dk.netarkivet.harvester.distribute.HarvesterChannels;
import dk.netarkivet.harvester.distribute.HarvesterMessageHandler;
import dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage;
import dk.netarkivet.harvester.harvesting.distribute.DoOneCrawlMessage;
import dk.netarkivet.harvester.harvesting.distribute.HarvesterReadyMessage;
import dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationRequest;
import dk.netarkivet.harvester.harvesting.distribute.HarvesterRegistrationResponse;
import dk.netarkivet.harvester.harvesting.metadata.MetadataEntry;

/**
 * This class responds to JMS doOneCrawl messages from the HarvestScheduler and launches a Heritrix crawl with the
 * received job description. The generated ARC files are uploaded to the bitarchives once a harvest job has been
 * completed.
 * <p>
 * Initially, the HarvestControllerServer registers its channel with the Scheduler by sending a
 * HarvesterRegistrationRequest and waits for a positive HarvesterRegistrationResponse that its channel is
 * recognized. If not recognized by the Scheduler, the HarvestControllerServer will send a notification about this,
 * and then close down the application.
 * <p>
 * During its operation CrawlStatus messages are sent to the HarvestSchedulerMonitorServer. When starting the actual
 * harvesting a message is sent with status 'STARTED'. When the harvesting has finished a message is sent with either
 * status 'DONE' or 'FAILED'. Either a 'DONE' or 'FAILED' message with result should ALWAYS be sent if at all possible,
 * but only ever one such message per job.
 * <p>
 * While the HarvestControllerServer is not running a crawl, it sends HarvesterReadyMessages to the scheduler. The
 * interval between each HarvesterReadyMessage being sent is defined by the setting
 * 'settings.harvester.harvesting.sendReadyDelay'.
 * <p>
 * It is necessary to be able to run the Heritrix harvester on several machines and several processes on each machine.
 * Each instance of Heritrix is started and monitored by a HarvestControllerServer.
 * <p>
 * Initially, all directories under serverdir are scanned for harvestinfo files. If any are found, they are parsed for
 * information, and an upload of all remaining files to the bitarchive is attempted. It will then send back a
 * CrawlStatusMessage with status failed.
 * <p>
 * A new thread is started for each actual crawl, in which the JMS listener is removed. Threading is required since JMS
 * will not let the called thread remove the listener that's being handled.
 * <p>
 * After a harvest job has been terminated, either successfully or unsuccessfully, the serverdir is again scanned for
 * harvestInfo files to attempt upload of files not yet uploaded. Then it begins to listen again for new jobs, if
 * there is enough room available on the machine. If not, it logs an error about this, which is also sent as a
 * notification.
 */
public class HarvestControllerServer extends HarvesterMessageHandler implements CleanupIF {

    /** The logger to use. */
    private static final Logger log = LoggerFactory.getLogger(HarvestControllerServer.class);

    /** The unique instance of this class. */
    private static HarvestControllerServer instance;

    /** The JMS channel on which to listen for {@link HarvesterRegistrationResponse}s. */
    public static final ChannelID HARVEST_CHAN_VALID_RESP_ID = HarvesterChannels
            .getHarvesterRegistrationResponseChannel();

    /** The CHANNEL of jobs processed by this instance. */
    private static final String CHANNEL = Settings.get(HarvesterSettings.HARVEST_CONTROLLER_CHANNEL);

    /** The configured application instance id. @see CommonSettings#APPLICATION_INSTANCE_ID */
    private final String applicationInstanceId = Settings.get(CommonSettings.APPLICATION_INSTANCE_ID);

    /** The name of the server this <code>HarvestControllerServer</code> is running on. */
    private final String physicalServerName = DomainUtils.reduceHostname(SystemUtils.getLocalHostName());

    /** Min. space required to start a job. */
    private final long minSpaceRequired;

    /** The JMSConnection to use. */
    private JMSConnection jmsConnection;

    /** Handles upload/cleanup of finished and left-over crawl directories. */
    private final PostProcessing postProcessing;

    /** Jobs are fetched from this queue. */
    private ChannelID jobChannel;

    /** the serverdir, where the harvesting takes place. */
    private final File serverDir;

    /**
     * Tracks whether a doOneCrawl is running (no jobs are accepted while one is) and sends the periodic
     * ready-messages. May still be <code>null</code> while the constructor is executing.
     */
    private CrawlStatus status;

    /**
     * Returns or creates the unique instance of this singleton. The server creates an instance of the
     * HarvestController, uploads arc-files from unfinished harvests, and starts to listen to JMS messages on the
     * incoming jms queues.
     *
     * @return The instance
     * @throws PermissionDenied If the serverdir or oldjobsdir can't be created
     * @throws IOFailure if data from old harvests exist, but contain illegal data
     */
    public static synchronized HarvestControllerServer getInstance() throws IOFailure {
        if (instance == null) {
            instance = new HarvestControllerServer();
        }
        return instance;
    }

    /**
     * In this constructor, the server creates an instance of the HarvestController and uploads any arc-files from
     * incomplete harvests. It then registers for {@link HarvesterRegistrationResponse}s and asks the scheduler whether
     * its harvest channel is valid; listening for doOneCrawl messages only starts once a positive response arrives
     * (see {@link #visit(HarvesterRegistrationResponse)}).
     *
     * @throws PermissionDenied If the serverdir or oldjobsdir can't be created.
     * @throws IOFailure If harvestInfoFile contains invalid data.
     * @throws ArgumentNotValid If the configured minimum free space is not a positive number.
     */
    private HarvestControllerServer() throws IOFailure {
        log.info("Starting HarvestControllerServer.");
        log.info("Bound to harvest channel '{}'", CHANNEL);

        // Make sure serverdir (where active crawl-dirs live) and oldJobsDir
        // (where old crawl dirs are stored) exist.
        serverDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_SERVERDIR));
        ApplicationUtils.dirMustExist(serverDir);
        log.info("Serverdir: '{}'", serverDir);
        minSpaceRequired = Settings.getLong(HarvesterSettings.HARVEST_SERVERDIR_MINSPACE);
        if (minSpaceRequired <= 0L) {
            log.warn("Wrong setting of minSpaceLeft read from Settings: {}", minSpaceRequired);
            throw new ArgumentNotValid("Wrong setting of minSpaceLeft read from Settings: " + minSpaceRequired);
        }
        log.info("Harvesting requires at least {} bytes free.", minSpaceRequired);

        // If shutdown.txt found in serverdir, just close down the HarvestControllerApplication at once.
        // NOTE: at this point neither the static 'instance' nor 'jmsConnection'/'status' have been assigned.
        shutdownNowOrContinue();

        // Get JMS-connection
        // Channel THIS_CLIENT is only used for replies to store messages so
        // do not set as listener here. It is registered in the arcrepository
        // client.
        // Channel ANY_xxxPRIORITY_HACO is used for listening for jobs, and
        // registered later, when the channel has been validated.
        jmsConnection = JMSConnectionFactory.getInstance();
        postProcessing = PostProcessing.getInstance(jmsConnection);
        log.debug("Obtained JMS connection.");

        status = new CrawlStatus();
        log.info("SEND_READY_DELAY used by HarvestControllerServer is {}", status.getSendReadyDelay());

        // If any unprocessed jobs are left on the server, process them now
        postProcessing.processOldJobs();

        // Register for listening to harvest channel validity responses
        JMSConnectionFactory.getInstance().setListener(HARVEST_CHAN_VALID_RESP_ID, this);

        // Ask if the channel this harvester is assigned to is valid
        jmsConnection.send(new HarvesterRegistrationRequest(HarvestControllerServer.CHANNEL, applicationInstanceId));
        log.info("Requested to check the validity of harvest channel '{}'", HarvestControllerServer.CHANNEL);
    }

    /**
     * Release all jms connections. Close the Controller
     */
    public synchronized void close() {
        log.info("Closing HarvestControllerServer.");
        cleanup();
        log.info("Closed down HarvestControllerServer");
    }

    /**
     * Will be called on shutdown. Removes the JMS listeners registered by this object, stops the periodic status
     * messages and clears the singleton reference.
     * <p>
     * This method tolerates being called on a partially constructed object (it is reachable from the constructor via
     * {@link #shutdownNowOrContinue()}), so every field it touches is null-checked.
     *
     * @see CleanupIF#cleanup()
     */
    @Override
    public void cleanup() {
        if (jmsConnection != null) {
            jmsConnection.removeListener(HARVEST_CHAN_VALID_RESP_ID, this);
            if (jobChannel != null) {
                jmsConnection.removeListener(jobChannel, this);
            }
        }
        // Stop the sending of status messages (status is null if we are called during construction)
        if (status != null) {
            status.stopSending();
        }
        instance = null;
    }

    /**
     * Handles the scheduler's answer to our {@link HarvesterRegistrationRequest}.
     * <p>
     * Responses concerning another channel, or arriving after this controller has already been validated, are resent.
     * An "invalid" response triggers an error notification and {@link #close()}. A "valid" response makes the
     * controller start listening for jobs (disk space permitting) and start sending ready-messages.
     *
     * @param msg the registration response
     */
    @Override
    public void visit(HarvesterRegistrationResponse msg) {
        // If we have already started or the message notifies for another channel, resend it.
        String channelName = msg.getHarvestChannelName();
        if (status.isChannelValid() || !CHANNEL.equals(channelName)) {
            // Controller has already started
            jmsConnection.resend(msg, msg.getTo());
            log.trace("Resending harvest channel validity message for channel '{}'", channelName);
            return;
        }

        if (!msg.isValid()) {
            String errMsg = "Received message stating that channel '" + channelName + "' is invalid. Will stop. "
                    + "Probable cause: the channel is not one of the known channels stored in the channels table";
            log.error(errMsg);
            // Send a notification about this, ASAP
            NotificationsFactory.getInstance().notify(errMsg, NotificationType.ERROR);
            close();
            return;
        }

        log.info("Received message stating that channel '{}' is valid.", channelName);
        // Environment and connections are now ready for processing of messages
        jobChannel = HarvesterChannels.getHarvestJobChannelId(channelName, msg.isSnapshot());

        // Only listen for harvester jobs if enough available space
        beginListeningIfSpaceAvailable();

        // Notify the harvest dispatcher that we are ready
        startAcceptingJobs();
        status.startSending();
    }

    /**
     * Start listening for crawls, if space available. Otherwise an error is logged and sent as a notification, and no
     * listener is registered.
     */
    private void beginListeningIfSpaceAvailable() {
        long availableSpace = FileUtils.getBytesFree(serverDir);
        if (availableSpace > minSpaceRequired) {
            log.info("Starts to listen to new jobs on queue '{}'", jobChannel);
            jmsConnection.setListener(jobChannel, this);
        } else {
            String pausedMessage = "Not enough available diskspace. Only " + availableSpace + " bytes available."
                    + " Harvester is paused.";
            log.error(pausedMessage);
            NotificationsFactory.getInstance().notify(pausedMessage, NotificationType.ERROR);
        }
    }

    /**
     * Start accepting new crawl requests again. This doesn't itself re-add a listener; it only clears the "running"
     * flag so that incoming crawl messages are no longer resent to the queue.
     */
    private synchronized void startAcceptingJobs() {
        // allow this harvestControllerServer to receive messages again
        status.setRunning(false);
    }

    /**
     * Stop accepting more jobs. After this is called, all crawl messages received will be resent to the queue. A bit
     * further down, we will stop listening altogether, but that requires another thread.
     */
    private synchronized void stopAcceptingJobs() {
        status.setRunning(true);
        log.debug("No longer accepting jobs.");
    }

    /**
     * Stop listening for new crawl requests.
     */
    private void removeListener() {
        log.debug("Removing listener on CHANNEL '{}'", jobChannel);
        jmsConnection.removeListener(jobChannel, this);
    }

    /**
     * Does the operator want us to shutdown now?
     * <p>
     * We look for a "shutdown.txt" file in the HARVEST_CONTROLLER_SERVERDIR. If it exists we log that we're shutting
     * down, send a notification about this, clean up and then exit the JVM.
     * <p>
     * This method is also invoked from the constructor, before the static singleton reference has been assigned, so
     * it must operate on <code>this</code> rather than on {@link #instance}.
     * <p>
     * TODO In a later implementation, the harvestControllerServer could be notified over JMX.
     */
    private void shutdownNowOrContinue() {
        File shutdownFile = new File(serverDir, "shutdown.txt");

        if (shutdownFile.exists()) {
            String msg = "Found shutdown-file in serverdir '" + serverDir.getAbsolutePath()
                    + "'. Shutting down the application";
            log.info(msg);
            NotificationsFactory.getInstance().notify(msg, NotificationType.INFO);
            cleanup();
            System.exit(0);
        }
    }

    /**
     * Checks that we're available to do a crawl, and if so, marks us as unavailable, checks that the job message is
     * well-formed, and starts the thread that the crawl happens in. If an error occurs starting the crawl, we will
     * start accepting messages again.
     * <p>
     * The sequence of actions involved in a crawl are:
     * <ol>
     * <li>If we are already running, resend the job to the queue and return</li>
     * <li>Check the job for validity</li>
     * <li>Send a CrawlStatus message that crawl has STARTED</li>
     * </ol>
     * In a separate thread:
     * <ol start="4">
     * <li>Unregister this HACO as listener</li>
     * <li>Create a new crawldir (based on the JobID and a timestamp)</li>
     * <li>Write a harvestInfoFile (using JobID and crawldir) and metadata</li>
     * <li>Instantiate a new HeritrixLauncher</li>
     * <li>Start a crawl</li>
     * <li>Store the generated arc-files and metadata in the known bit-archives</li>
     * <li>_Always_ send CrawlStatus DONE or FAILED</li>
     * <li>Move crawldir into oldJobs dir</li>
     * </ol>
     *
     * @param msg The crawl job
     * @throws IOFailure On trouble harvesting, uploading or processing harvestInfo
     * @throws UnknownID if jobID is null in the message
     * @throws ArgumentNotValid if the status of the job is not valid - must be SUBMITTED
     * @throws PermissionDenied if the crawldir can't be created
     */
    public void visit(DoOneCrawlMessage msg) throws IOFailure, UnknownID, ArgumentNotValid, PermissionDenied {
        // Only one doOneCrawl at a time. Returning should almost never happen,
        // since we deregister the listener, but we may receive another message
        // before the listener is removed.
        synchronized (this) {
            if (status.isRunning()) {
                log.warn(
                        "Received crawl request, but sent it back to queue, as another crawl is already running: '{}'",
                        msg);
                jmsConnection.resend(msg, jobChannel);
                try {
                    // Wait a second before listening again, so the message has
                    // a chance of getting snatched by another harvester.
                    Thread.sleep(TimeUtils.SECOND_IN_MILLIS);
                } catch (InterruptedException e) {
                    // We are not waiting for anything critical, so just stop waiting,
                    // but keep the interrupt status visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
                return;
            }
            stopAcceptingJobs();
        }

        boolean crawlThreadStarted = false;

        // This 'try' matches a finally that restores running=false if we don't
        // start a crawl after all
        try {
            final Job job = msg.getJob();

            // Every job must have an ID or we can do nothing with it, not even
            // send a proper failure message back.
            Long jobID = job.getJobID();
            if (jobID == null) {
                log.warn("DoOneCrawlMessage arrived without JobID: '{}'", msg);
                throw new UnknownID("DoOneCrawlMessage arrived without JobID");
            }

            log.info("Received crawlrequest for job {}: '{}'", jobID, msg);

            // Send message to scheduler that job is started
            CrawlStatusMessage csmStarted = new CrawlStatusMessage(jobID, JobStatus.STARTED);
            jmsConnection.send(csmStarted);

            // Jobs should arrive with status "submitted". If this is not the
            // case, log the error and send a job-failed message back.
            // HarvestScheduler likes getting a STARTED message before
            // FAILED, so we oblige it here.
            if (job.getStatus() != JobStatus.SUBMITTED) {
                String message = "Message '" + msg + "' arrived with status " + job.getStatus()
                        + " for job " + jobID + ", should have been STATUS_SUBMITTED";
                log.warn(message);
                sendErrorMessage(jobID, message, message);
                throw new ArgumentNotValid(message);
            }

            final List<MetadataEntry> metadataEntries = msg.getMetadata();

            // Create the thread in which harvesting will occur. Once started it will
            // remove this listener, harvest and store the result.
            Thread harvesterThread = new HarvesterThread(job, msg.getOrigHarvestInfo(), metadataEntries);
            harvesterThread.start();
            log.info("Started harvester thread for job {}", jobID);
            // Only flag success after start() has returned. Thus, if anything above
            // fails, we will resume accepting jobs.
            crawlThreadStarted = true;
        } finally {
            // If we didn't start a thread for crawling after all, accept more
            // messages
            if (!crawlThreadStarted) {
                startAcceptingJobs();
            }
        }
        // Now return from this method letting the thread do the work.
        // This is important as it allows us to receive upload-replies from
        // THIS_CLIENT in the crawl thread.
    }

    /**
     * Sends a CrawlStatusMessage for a failed job with the given short message and detailed message.
     *
     * @param jobID ID of the job that failed
     * @param message A short message indicating what went wrong
     * @param detailedMessage A more detailed message detailing why it went wrong.
     */
    public void sendErrorMessage(long jobID, String message, String detailedMessage) {
        CrawlStatusMessage csm = new CrawlStatusMessage(jobID, JobStatus.FAILED, null);
        csm.setHarvestErrors(message);
        csm.setHarvestErrorDetails(detailedMessage);
        jmsConnection.send(csm);
    }

    /**
     * A Thread class for the actual harvesting. This is required in order to stop listening while we're busy
     * harvesting, since JMS doesn't allow the called thread to remove the listener that was called.
     */
    private class HarvesterThread extends Thread {

        /** The harvester Job in this thread. */
        private final Job job;

        /** Stores documentary information about the harvest. */
        private final HarvestDefinitionInfo origHarvestInfo;

        /** The list of metadata associated with this Job. */
        private final List<MetadataEntry> metadataEntries;

        /**
         * Constructor for the HarvesterThread class.
         *
         * @param job a harvesting job
         * @param origHarvestInfo Info about the harvestdefinition that scheduled this job
         * @param metadataEntries metadata associated with the given job
         */
        public HarvesterThread(Job job, HarvestDefinitionInfo origHarvestInfo, List<MetadataEntry> metadataEntries) {
            this.job = job;
            this.origHarvestInfo = origHarvestInfo;
            this.metadataEntries = metadataEntries;
        }

        /**
         * The thread body for the harvester thread. Removes the JMS listener, initialises a {@link HarvestJob}, runs
         * the harvest and hands the crawl directory to post-processing.
         * <p>
         * Nothing is propagated out of this method: any failure is logged and sent as an ERROR notification. In all
         * cases left-over jobs are processed afterwards, the shutdown file is checked, and the controller starts
         * accepting (and, disk space permitting, listening for) jobs again.
         */
        @Override
        public void run() {
            // Use the enclosing object rather than the static singleton reference:
            // the latter is nulled by cleanup() and would make the finally-block fail.
            final HarvestControllerServer server = HarvestControllerServer.this;
            try {
                // We must remove the listener inside a thread,
                // as JMS doesn't allow us to remove it within the
                // call it made.
                removeListener();

                HarvestJob harvestJob = new HarvestJob(server);
                harvestJob.init(job, origHarvestInfo, metadataEntries);
                Heritrix3Files files = harvestJob.getHeritrix3Files();

                Throwable crawlException = null;
                try {
                    harvestJob.runHarvest();
                } catch (Throwable e) {
                    String msg = "Error during crawling. The crawl may have been only partially completed.";
                    log.warn(msg, e);
                    crawlException = e;
                    throw new IOFailure(msg, e);
                } finally {
                    // Post-processing runs whether or not the crawl succeeded.
                    postProcessing.doPostProcessing(files.getCrawlDir(), crawlException);
                }
            } catch (Throwable t) {
                String msg = "Fatal error while operating job '" + job + "'";
                log.error(msg, t);
                NotificationsFactory.getInstance().notify(msg, NotificationType.ERROR, t);
            } finally {
                log.info("Ending crawl of job : {}", job.getJobID());
                // process serverdir for files not yet uploaded.
                postProcessing.processOldJobs();
                server.shutdownNowOrContinue();
                startAcceptingJobs();
                beginListeningIfSpaceAvailable();
            }
        }
    }

    /**
     * Used for maintaining the running status of the crawling, is it running or not. Will also take care of notifying
     * the HarvestJobManager of the status.
     */
    private class CrawlStatus implements Runnable {

        /**
         * Whether a crawl is currently running. Written by the message-handling and harvester threads and read by
         * the periodic status task, hence volatile.
         */
        private volatile boolean running = false;

        /** Set once a positive registration response has been received and status sending has started. */
        private volatile boolean channelIsValid = false;

        /** Handles the periodic sending of status messages. */
        private PeriodicTaskExecutor statusTransmitter;

        /** Value of the setting HarvesterSettings.SEND_READY_DELAY. */
        private final int sendReadyDelay = Settings.getInt(HarvesterSettings.SEND_READY_DELAY);

        /**
         * Starts the sending of status messages. Interval defined by HarvesterSettings.SEND_READY_DELAY .
         */
        public void startSending() {
            this.channelIsValid = true;
            statusTransmitter = new PeriodicTaskExecutor("HarvesterStatus", this, 0, getSendReadyDelay());
        }

        /**
         * Stops the sending of status messages. Does nothing if sending was never started.
         */
        public void stopSending() {
            if (statusTransmitter != null) {
                statusTransmitter.shutdown();
                statusTransmitter = null;
            }
        }

        /**
         * Returns <code>true</code> if a doOneCrawl is running, else <code>false</code>.
         *
         * @return Whether a crawl is running.
         */
        public boolean isRunning() {
            return running;
        }

        /**
         * Used for changing the running state in methods startAcceptingJobs and stopAcceptingJobs
         *
         * @param running The new status
         */
        public void setRunning(boolean running) {
            this.running = running;
        }

        /**
         * @return the channelIsValid
         */
        protected final boolean isChannelValid() {
            return channelIsValid;
        }

        /**
         * One round of the periodic task: waits for the configured delay and then, if no crawl is running, sends a
         * {@link HarvesterReadyMessage} for this harvester.
         */
        @Override
        public void run() {
            try {
                Thread.sleep(getSendReadyDelay());
            } catch (Exception e) {
                log.error("Unable to sleep", e);
            }
            if (!running) {
                jmsConnection.send(new HarvesterReadyMessage(applicationInstanceId + " on " + physicalServerName,
                        HarvestControllerServer.CHANNEL));
            }
        }

        /**
         * @return the delay read from HarvesterSettings.SEND_READY_DELAY
         */
        public int getSendReadyDelay() {
            return sendReadyDelay;
        }

    }

}