001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.harvester.harvesting.distribute; 024 025import java.io.File; 026import java.util.ArrayList; 027import java.util.List; 028 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032import dk.netarkivet.common.CommonSettings; 033import dk.netarkivet.common.Constants; 034import dk.netarkivet.common.distribute.ChannelID; 035import dk.netarkivet.common.distribute.JMSConnection; 036import dk.netarkivet.common.distribute.JMSConnectionFactory; 037import dk.netarkivet.common.exceptions.ArgumentNotValid; 038import dk.netarkivet.common.exceptions.IOFailure; 039import dk.netarkivet.common.exceptions.PermissionDenied; 040import dk.netarkivet.common.exceptions.UnknownID; 041import dk.netarkivet.common.lifecycle.PeriodicTaskExecutor; 042import dk.netarkivet.common.utils.ApplicationUtils; 043import dk.netarkivet.common.utils.CleanupIF; 044import dk.netarkivet.common.utils.DomainUtils; 045import dk.netarkivet.common.utils.ExceptionUtils; 046import dk.netarkivet.common.utils.FileUtils; 047import dk.netarkivet.common.utils.NotificationType; 048import dk.netarkivet.common.utils.NotificationsFactory; 049import dk.netarkivet.common.utils.Settings; 050import dk.netarkivet.common.utils.SystemUtils; 051import dk.netarkivet.common.utils.TimeUtils; 052import dk.netarkivet.harvester.HarvesterSettings; 053import dk.netarkivet.harvester.datamodel.HarvestDefinitionInfo; 054import dk.netarkivet.harvester.datamodel.Job; 055import dk.netarkivet.harvester.datamodel.JobStatus; 056import dk.netarkivet.harvester.distribute.HarvesterChannels; 057import dk.netarkivet.harvester.distribute.HarvesterMessageHandler; 058import dk.netarkivet.harvester.harvesting.DomainnameQueueAssignmentPolicy; 059import dk.netarkivet.harvester.harvesting.HarvestController; 060import dk.netarkivet.harvester.harvesting.HeritrixFiles; 061import dk.netarkivet.harvester.harvesting.PersistentJobData; 062import dk.netarkivet.harvester.harvesting.SeedUriDomainnameQueueAssignmentPolicy; 063import dk.netarkivet.harvester.harvesting.metadata.MetadataEntry; 064import dk.netarkivet.harvester.harvesting.report.HarvestReport; 065 066/** 067 * This class responds to JMS doOneCrawl messages from the HarvestScheduler and launches a Heritrix crawl with the 068 * received job description. The generated ARC files are uploaded to the bitarchives once a harvest job has been 069 * completed. 070 * <p> 071 * During its operation CrawlStatus messages are sent to the HarvestSchedulerMonitorServer. When starting the actual 072 * harvesting a message is sent with status 'STARTED'. When the harvesting has finished a message is sent with either 073 * status 'DONE' or 'FAILED'. Either a 'DONE' or 'FAILED' message with result should ALWAYS be sent if at all possible, 074 * but only ever one such message per job. 075 * <p> 076 * It is necessary to be able to run the Heritrix harvester on several machines and several processes on each machine. 077 * Each instance of Heritrix is started and monitored by a HarvestControllerServer. 078 * <p> 079 * Initially, all directories under serverdir are scanned for harvestinfo files. If any are found, they are parsed for 080 * information, and all remaining files are attempted uploaded to the bitarchive. It will then send back a 081 * crawlstatusmessage with status failed. 082 * <p> 083 * A new thread is started for each actual crawl, in which the JMS listener is removed. Threading is required since JMS 084 * will not let the called thread remove the listener that's being handled. 085 * <p> 086 * After a harvestjob has been terminated, either successfully or unsuccessfully, the serverdir is again scanned for 087 * harvestInfo files to attempt upload of files not yet uploaded. Then it begins to listen again after new jobs, if 088 * there is enough room available on the machine. If not, it logs a warning about this, which is also sent as a 089 * notification. 090 */ 091public class HarvestControllerServer extends HarvesterMessageHandler implements CleanupIF { 092 093 /** The logger to use. */ 094 private static final Logger log = LoggerFactory.getLogger(HarvestControllerServer.class); 095 096 /** The unique instance of this class. */ 097 private static HarvestControllerServer instance; 098 099 /** 100 * The configured application instance id. 101 * 102 * @see CommonSettings#APPLICATION_INSTANCE_ID 103 */ 104 private final String applicationInstanceId = Settings.get(CommonSettings.APPLICATION_INSTANCE_ID); 105 106 /** 107 * The name of the server this <code>HarvestControllerServer</code> is running on. 108 */ 109 private final String physicalServerName = DomainUtils.reduceHostname(SystemUtils.getLocalHostName()); 110 111 /** The message to write to log when starting the server. */ 112 private static final String STARTING_MESSAGE = "Starting HarvestControllerServer."; 113 /** The message to write to log when server is started. */ 114 private static final String STARTED_MESSAGE = "HarvestControllerServer started."; 115 /** The message to write to log when stopping the server. */ 116 private static final String CLOSING_MESSAGE = "Closing HarvestControllerServer."; 117 /** The message to write to log when server is stopped. */ 118 private static final String CLOSED_MESSAGE = "Closed down HarvestControllerServer"; 119 /** The message to write to log when starting a crawl. */ 120 private static final String STARTCRAWL_MESSAGE = "Starting crawl of job :"; 121 /** The message to write to log after ending a crawl. */ 122 private static final String ENDCRAWL_MESSAGE = "Ending crawl of job :"; 123 /** The max time to wait for the hosts-report.txt to be available (in secs). */ 124 static final int WAIT_FOR_HOSTS_REPORT_TIMEOUT_SECS = 30; 125 /** Heritrix version property. */ 126 private static final String HERITRIX_VERSION_PROPERTY = "heritrix.version"; 127 /** queue-assignment-policy property. */ 128 private static final String HERITRIX_QUEUE_ASSIGNMENT_POLICY_PROPERTY = "org.archive.crawler.frontier.AbstractFrontier.queue-assignment-policy"; 129 130 /** The CHANNEL of jobs processed by this instance. */ 131 private static final String CHANNEL = Settings.get(HarvesterSettings.HARVEST_CONTROLLER_CHANNEL); 132 133 /** The JMS channel on which to listen for {@link HarvesterRegistrationResponse}s. */ 134 public static final ChannelID HARVEST_CHAN_VALID_RESP_ID = HarvesterChannels 135 .getHarvesterRegistrationResponseChannel(); 136 137 /** The JMSConnection to use. */ 138 private JMSConnection jmsConnection; 139 140 /** The (singleton) HarvestController that handles the non-JMS parts of a harvest. */ 141 private final HarvestController controller; 142 143 /** Jobs are fetched from this queue. */ 144 private ChannelID jobChannel; 145 146 /** Min. space required to start a job. */ 147 private final long minSpaceRequired; 148 149 /** the serverdir, where the harvesting takes place. */ 150 private final File serverDir; 151 152 /** 153 * This is true while a doOneCrawl is running. No jobs are accepted while this is running. 154 */ 155 private CrawlStatus status; 156 157 /** 158 * In this constructor, the server creates an instance of the HarvestController, uploads any arc-files from 159 * incomplete harvests. Then it starts listening for new doOneCrawl messages, unless there is no available space. In 160 * that case, it sends a notification to the administrator and pauses. 161 * 162 * @throws PermissionDenied If the serverdir or oldjobsdir can't be created. 163 * @throws IOFailure If harvestInfoFile contains invalid data. 164 * @throws UnknownID if the settings file does not specify a valid queue priority. 165 */ 166 private HarvestControllerServer() throws IOFailure { 167 log.info(STARTING_MESSAGE); 168 169 log.info("Bound to harvest channel '{}'", CHANNEL); 170 171 // Make sure serverdir (where active crawl-dirs live) and oldJobsDir 172 // (where old crawl dirs are stored) exist. 173 serverDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_SERVERDIR)); 174 ApplicationUtils.dirMustExist(serverDir); 175 log.info("Serverdir: '{}'", serverDir); 176 minSpaceRequired = Settings.getLong(HarvesterSettings.HARVEST_SERVERDIR_MINSPACE); 177 if (minSpaceRequired <= 0L) { 178 log.warn("Wrong setting of minSpaceLeft read from Settings: {}", minSpaceRequired); 179 throw new ArgumentNotValid("Wrong setting of minSpaceLeft read from Settings: " + minSpaceRequired); 180 } 181 log.info("Harvesting requires at least {} bytes free.", minSpaceRequired); 182 183 controller = HarvestController.getInstance(); 184 185 // Set properties "heritrix.version" and 186 // "org.archive.crawler.frontier.AbstractFrontier 187 // .queue-assignment-policy" 188 System.setProperty(HERITRIX_VERSION_PROPERTY, Constants.getHeritrixVersionString()); 189 System.setProperty(HERITRIX_QUEUE_ASSIGNMENT_POLICY_PROPERTY, 190 "org.archive.crawler.frontier.HostnameQueueAssignmentPolicy," 191 + "org.archive.crawler.frontier.IPQueueAssignmentPolicy," 192 + "org.archive.crawler.frontier.BucketQueueAssignmentPolicy," + "org.archive.crawler.frontier" 193 + ".SurtAuthorityQueueAssignmentPolicy," + DomainnameQueueAssignmentPolicy.class.getName() 194 + "," + SeedUriDomainnameQueueAssignmentPolicy.class.getName()); 195 196 // Get JMS-connection 197 // Channel THIS_CLIENT is only used for replies to store messages so 198 // do not set as listener here. It is registered in the arcrepository 199 // client. 200 // Channel ANY_xxxPRIORIRY_HACO is used for listening for jobs, and 201 // registered below. 202 203 jmsConnection = JMSConnectionFactory.getInstance(); 204 log.debug("Obtained JMS connection."); 205 206 status = new CrawlStatus(); 207 208 // If any unprocessed jobs are left on the server, process them now 209 processOldJobs(); 210 211 // Register for listening to harvest channel validity responses 212 JMSConnectionFactory.getInstance().setListener(HARVEST_CHAN_VALID_RESP_ID, this); 213 214 // Ask if the channel this harvester is assigned to is valid 215 jmsConnection.send(new HarvesterRegistrationRequest(HarvestControllerServer.CHANNEL, applicationInstanceId)); 216 log.info("Requested to check the validity of harvest channel '{}'", HarvestControllerServer.CHANNEL); 217 } 218 219 /** 220 * Returns or creates the unique instance of this singleton The server creates an instance of the HarvestController, 221 * uploads arc-files from unfinished harvests, and starts to listen to JMS messages on the incoming jms queues. 222 * 223 * @return The instance 224 * @throws PermissionDenied If the serverdir or oldjobsdir can't be created 225 * @throws IOFailure if data from old harvests exist, but contain illegal data 226 */ 227 public static synchronized HarvestControllerServer getInstance() throws IOFailure { 228 if (instance == null) { 229 instance = new HarvestControllerServer(); 230 } 231 return instance; 232 } 233 234 /** 235 * Release all jms connections. Close the Controller 236 */ 237 public synchronized void close() { 238 log.info(CLOSING_MESSAGE); 239 cleanup(); 240 log.info(CLOSED_MESSAGE); 241 } 242 243 /** 244 * Will be called on shutdown. 245 * 246 * @see CleanupIF#cleanup() 247 */ 248 public void cleanup() { 249 if (controller != null) { 250 controller.cleanup(); 251 } 252 if (jmsConnection != null) { 253 jmsConnection.removeListener(HARVEST_CHAN_VALID_RESP_ID, this); 254 if (jobChannel != null) { 255 jmsConnection.removeListener(jobChannel, this); 256 } 257 } 258 259 // Stop the sending of status messages 260 status.stopSending(); 261 262 instance = null; 263 } 264 265 /** 266 * Looks for old job directories that await uploading. 267 */ 268 private void processOldJobs() { 269 // Search through all crawldirs and process PersistentJobData 270 // files in them 271 File crawlDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_SERVERDIR)); 272 File[] subdirs = crawlDir.listFiles(); 273 for (File oldCrawlDir : subdirs) { 274 if (PersistentJobData.existsIn(oldCrawlDir)) { 275 // Assume that crawl had not ended at this point so 276 // job must be marked as failed 277 final String msg = "Found old unprocessed job data in dir '" + oldCrawlDir.getAbsolutePath() 278 + "'. Crawl probably interrupted by " + "shutdown of HarvestController. " + "Processing data."; 279 log.warn(msg); 280 NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING); 281 processHarvestInfoFile(oldCrawlDir, new IOFailure("Crawl probably interrupted by " 282 + "shutdown of HarvestController")); 283 } 284 } 285 } 286 287 /** 288 * Checks that we're available to do a crawl, and if so, marks us as unavailable, checks that the job message is 289 * well-formed, and starts the thread that the crawl happens in. If an error occurs starting the crawl, we will 290 * start listening for messages again. 291 * <p> 292 * The sequence of actions involved in a crawl are:</br> 1. If we are already running, resend the job to the queue 293 * and return</br> 2. Check the job for validity</br> 3. Send a CrawlStatus message that crawl has STARTED</br> In a 294 * separate thread:</br> 4. Unregister this HACO as listener</br> 5. Create a new crawldir (based on the JobID and a 295 * timestamp)</br> 6. Write a harvestInfoFile (using JobID and crawldir) and metadata</br> 7. Instantiate a new 296 * HeritrixLauncher</br> 8. Start a crawl</br> 9. Store the generated arc-files and metadata in the known 297 * bit-archives </br>10. _Always_ send CrawlStatus DONE or FAILED</br> 11. Move crawldir into oldJobs dir</br> 298 * 299 * @param msg The crawl job 300 * @throws IOFailure On trouble harvesting, uploading or processing harvestInfo 301 * @throws UnknownID if jobID is null in the message 302 * @throws ArgumentNotValid if the status of the job is not valid - must be SUBMITTED 303 * @throws PermissionDenied 304 * @see #visit(DoOneCrawlMessage) for more details 305 */ 306 private void onDoOneCrawl(final DoOneCrawlMessage msg) throws IOFailure, UnknownID, ArgumentNotValid, 307 PermissionDenied { 308 // Only one doOneCrawl at a time. Returning should almost never happen, 309 // since we deregister the listener, but we may receive another message 310 // before the listener is removed. Also, if the job message is 311 // malformed or starting the crawl fails, we re-add the listener. 312 synchronized (this) { 313 if (status.isRunning()) { 314 log.warn( 315 "Received crawl request, but sent it back to queue, as another crawl is already running: '{}'", 316 msg); 317 jmsConnection.resend(msg, jobChannel); 318 try { 319 // Wait a second before listening again, so the message has 320 // a chance of getting snatched by another harvester. 321 Thread.sleep(TimeUtils.SECOND_IN_MILLIS); 322 } catch (InterruptedException e) { 323 // Since we're not waiting for anything critical, we can 324 // ignore this exception. 325 } 326 return; 327 } 328 stopAcceptingJobs(); 329 } 330 331 Thread t = null; 332 333 // This 'try' matches a finally that restores running=false if we don't 334 // start a crawl after all 335 try { 336 final Job job = msg.getJob(); 337 338 // Every job must have an ID or we can do nothing with it, not even 339 // send a proper failure message back. 340 Long jobID = job.getJobID(); 341 if (jobID == null) { 342 log.warn("DoOneCrawlMessage arrived without JobID: '{}'", msg.toString()); 343 throw new UnknownID("DoOneCrawlMessage arrived without JobID"); 344 } 345 346 log.info("Received crawlrequest for job {}: '{}'", jobID, msg); 347 348 // Send message to scheduler that job is started 349 CrawlStatusMessage csmStarted = new CrawlStatusMessage(jobID, JobStatus.STARTED); 350 jmsConnection.send(csmStarted); 351 352 // Jobs should arrive with status "submitted". If this is not the 353 // case, log the error and send a job-failed message back. 354 // HarvestScheduler likes getting a STARTED message before 355 // FAILED, so we oblige it here. 356 if (job.getStatus() != JobStatus.SUBMITTED) { 357 String message = "Message '" + msg.toString() + "' arrived with" + " status " + job.getStatus() 358 + " for job " + jobID + ", should have been STATUS_SUBMITTED"; 359 log.warn(message); 360 sendErrorMessage(jobID, message, message); 361 throw new ArgumentNotValid(message); 362 } 363 364 final List<MetadataEntry> metadataEntries = msg.getMetadata(); 365 366 Thread t1; 367 // Create thread in which harvesting will occur 368 t1 = new HarvesterThread(job, msg.getOrigHarvestInfo(), metadataEntries); 369 // start thread which will remove this listener, harvest, store, and 370 // exit the VM 371 t1.start(); 372 log.info("Started harvester thread for job {}", jobID); 373 // We delay assigning the thread variable until start() has 374 // succeeded. Thus, if start() fails, we will resume accepting 375 // jobs. 376 t = t1; 377 } finally { 378 // If we didn't start a thread for crawling after all, accept more 379 // messages 380 if (t == null) { 381 startAcceptingJobs(); 382 } 383 } 384 // Now return from this method letting the thread do the work. 385 // This is important as it allows us to receive upload-replies from 386 // THIS_CLIENT in the crawl thread. 387 } 388 389 /** 390 * Sends a CrawlStatusMessage for a failed job with the given short message and detailed message. 391 * 392 * @param jobID ID of the job that failed 393 * @param message A short message indicating what went wrong 394 * @param detailedMessage A more detailed message detailing why it went wrong. 395 */ 396 private void sendErrorMessage(long jobID, String message, String detailedMessage) { 397 CrawlStatusMessage csm = new CrawlStatusMessage(jobID, JobStatus.FAILED, null); 398 csm.setHarvestErrors(message); 399 csm.setHarvestErrorDetails(detailedMessage); 400 jmsConnection.send(csm); 401 } 402 403 /** 404 * Stop accepting more jobs. After this is called, all crawl messages received will be resent to the queue. A bit 405 * further down, we will stop listening altogether, but that requires another thread. 406 */ 407 private synchronized void stopAcceptingJobs() { 408 status.setRunning(true); 409 log.debug("No longer accepting jobs."); 410 } 411 412 /** 413 * Start listening for new crawl requests again. This actually doesn't re-add a listener, but the listener only gets 414 * removed when we're so far committed that we're going to exit at the end. So to start accepting jobs again, we 415 * stop resending messages we get. 416 */ 417 private synchronized void startAcceptingJobs() { 418 // allow this haco to receive messages 419 status.setRunning(false); 420 } 421 422 /** 423 * Stop listening for new crawl requests. 424 */ 425 private void removeListener() { 426 log.debug("Removing listener on CHANNEL '{}'", jobChannel); 427 jmsConnection.removeListener(jobChannel, this); 428 } 429 430 /** Start listening for crawls, if space available. */ 431 private void beginListeningIfSpaceAvailable() { 432 long availableSpace = FileUtils.getBytesFree(serverDir); 433 if (availableSpace > minSpaceRequired) { 434 log.info("Starts to listen to new jobs on queue '{}'", jobChannel); 435 jmsConnection.setListener(jobChannel, this); 436 log.info(STARTED_MESSAGE); 437 } else { 438 String pausedMessage = "Not enough available diskspace. Only " + availableSpace + " bytes available." 439 + " Harvester is paused."; 440 log.error(pausedMessage); 441 NotificationsFactory.getInstance().notify(pausedMessage, NotificationType.ERROR); 442 } 443 } 444 445 /** 446 * Adds error messages from an exception to the status message errors. 447 * 448 * @param csm The message we're setting messages on. 449 * @param crawlException The exception that got thrown from further in, possibly as far in as Heritrix. 450 * @param errorMessage Description of errors that happened during upload. 451 * @param missingHostsReport If true, no hosts report was found. 452 * @param failedFiles List of files that failed to upload. 453 */ 454 private void setErrorMessages(CrawlStatusMessage csm, Throwable crawlException, String errorMessage, 455 boolean missingHostsReport, int failedFiles) { 456 if (crawlException != null) { 457 csm.setHarvestErrors(crawlException.toString()); 458 csm.setHarvestErrorDetails(ExceptionUtils.getStackTrace(crawlException)); 459 } 460 if (errorMessage.length() > 0) { 461 String shortDesc = ""; 462 if (missingHostsReport) { 463 shortDesc = "No hosts report found"; 464 } 465 if (failedFiles > 0) { 466 if (shortDesc.length() > 0) { 467 shortDesc += ", "; 468 } 469 shortDesc += failedFiles + " files failed to upload"; 470 } 471 csm.setUploadErrors(shortDesc); 472 csm.setUploadErrorDetails(errorMessage); 473 } 474 } 475 476 /** 477 * Receives a DoOneCrawlMessage and call onDoOneCrawl. 478 * 479 * @param msg the message received 480 * @throws IOFailure if the crawl fails if unable to write to harvestInfoFile 481 * @throws UnknownID if jobID is null in the message 482 * @throws ArgumentNotValid if the status of the job is not valid - must be SUBMITTED 483 * @throws PermissionDenied if the crawldir can't be created 484 */ 485 public void visit(DoOneCrawlMessage msg) throws IOFailure, UnknownID, ArgumentNotValid, PermissionDenied { 486 onDoOneCrawl(msg); 487 } 488 489 @Override 490 public void visit(HarvesterRegistrationResponse msg) { 491 492 // If we have already started or the message notifies for another channel, 493 // resend it. 494 String channelName = msg.getHarvestChannelName(); 495 if (status.isChannelValid() || !CHANNEL.equals(channelName)) { 496 // Controller has already started 497 jmsConnection.resend(msg, msg.getTo()); 498 if (log.isDebugEnabled()) { 499 log.debug("Resending harvest channel validity message for channel '{}'", channelName); 500 } 501 return; 502 } 503 504 if (!msg.isValid()) { 505 log.error("Received message stating that channel '{}' is invalid. Will stop. " 506 + "Probable cause: the channel is not one of the known channels stored in the channels table", channelName); 507 close(); 508 return; 509 } 510 511 log.info("Received message stating that channel '{}' is valid.", channelName); 512 // Environment and connections are now ready for processing of messages 513 jobChannel = HarvesterChannels.getHarvestJobChannelId(channelName, msg.isSnapshot()); 514 515 // Only listen for harvester jobs if enough available space 516 beginListeningIfSpaceAvailable(); 517 518 // Notify the harvest dispatcher that we are ready 519 startAcceptingJobs(); 520 status.startSending(); 521 } 522 523 /** 524 * Processes an existing harvestInfoFile:</br> 525 * 1. Retrieve jobID, and crawlDir from the harvestInfoFile using class PersistentJobData</br> 526 * 2. finds JobId and arcsdir</br> 3. calls storeArcFiles</br> 4. moves harvestdir to oldjobs 527 * and deletes crawl.log and other superfluous files. 528 * 529 * @param crawlDir The location of harvest-info to be processed 530 * @param crawlException any exceptions thrown by the crawl which need to be reported back to the scheduler (may be 531 * null for success) 532 * @throws IOFailure if the file cannot be read 533 */ 534 private void processHarvestInfoFile(File crawlDir, Throwable crawlException) throws IOFailure { 535 log.debug("Post-processing files in '{}'", crawlDir.getAbsolutePath()); 536 if (!PersistentJobData.existsIn(crawlDir)) { 537 throw new IOFailure("No harvestInfo found in directory: " + crawlDir.getAbsolutePath()); 538 } 539 540 PersistentJobData harvestInfo = new PersistentJobData(crawlDir); 541 Long jobID = harvestInfo.getJobID(); 542 543 StringBuilder errorMessage = new StringBuilder(); 544 HarvestReport dhr = null; 545 List<File> failedFiles = new ArrayList<File>(); 546 HeritrixFiles files = HeritrixFiles.getH1HeritrixFilesWithDefaultJmxFiles(crawlDir, harvestInfo); 547 548 try { 549 log.info("Store files in directory '{}' " + "from jobID: {}.", crawlDir, jobID); 550 dhr = controller.storeFiles(files, errorMessage, failedFiles); 551 } catch (Exception e) { 552 String msg = "Trouble during postprocessing of files in '" + crawlDir.getAbsolutePath() + "'"; 553 log.warn(msg, e); 554 errorMessage.append(e.getMessage()).append("\n"); 555 // send a mail about this problem 556 NotificationsFactory.getInstance().notify( 557 msg + ". Errors accumulated during the postprocessing: " + errorMessage.toString(), 558 NotificationType.ERROR, e); 559 } finally { 560 // Send a done or failed message back to harvest scheduler 561 // FindBugs claims a load of known null value here, but that 562 // will not be the case if storeFiles() succeeds. 563 CrawlStatusMessage csm; 564 565 if (crawlException == null && errorMessage.length() == 0) { 566 log.info("Job with ID {} finished with status DONE", jobID); 567 csm = new CrawlStatusMessage(jobID, JobStatus.DONE, dhr); 568 } else { 569 log.warn("Job with ID {} finished with status FAILED", jobID); 570 csm = new CrawlStatusMessage(jobID, JobStatus.FAILED, dhr); 571 setErrorMessages(csm, crawlException, errorMessage.toString(), dhr == null, failedFiles.size()); 572 } 573 try { 574 jmsConnection.send(csm); 575 if (crawlException == null && errorMessage.length() == 0) { 576 files.deleteFinalLogs(); 577 } 578 } finally { 579 // Delete superfluous files and move the rest to oldjobs 580 // Cleanup is in an extra finally, because it is large amounts 581 // of data we need to remove, even on send trouble. 582 log.info("Cleanup after harvesting job with id: {}.", jobID); 583 files.cleanUpAfterHarvest(new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_OLDJOBSDIR))); 584 } 585 } 586 log.info("Done post-processing files for job {} in dir: '{}'", jobID, crawlDir.getAbsolutePath()); 587 } 588 589 /** 590 * A Thread class for the actual harvesting. This is required in order to stop listening while we're busy 591 * harvesting, since JMS doesn't allow the called thread to remove the listener that was called. 592 */ 593 private class HarvesterThread extends Thread { 594 595 /** The harvester Job in this thread. */ 596 private final Job job; 597 598 /** Stores documentary information about the harvest. */ 599 private final HarvestDefinitionInfo origHarvestInfo; 600 601 /** The list of metadata associated with this Job. */ 602 private final List<MetadataEntry> metadataEntries; 603 604 /** 605 * Constructor for the HarvesterThread class. 606 * 607 * @param job a harvesting job 608 * @param origHarvestInfo Info about the harvestdefinition that scheduled this job 609 * @param metadataEntries metadata associated with the given job 610 */ 611 public HarvesterThread(Job job, HarvestDefinitionInfo origHarvestInfo, List<MetadataEntry> metadataEntries) { 612 this.job = job; 613 this.origHarvestInfo = origHarvestInfo; 614 this.metadataEntries = metadataEntries; 615 } 616 617 /** 618 * The thread body for the harvester thread. Removes the JMS listener, sets up the files for Heritrix, then 619 * passes control to the HarvestController to perform the actual harvest. 620 * <p> 621 * TODO Get file writing into HarvestController as well (requires some rearrangement of the message sending) 622 * 623 * @throws PermissionDenied if we cannot create the crawl directory. 624 * @throws IOFailure if there are problems preparing or running the crawl. 625 */ 626 public void run() { 627 try { 628 // We must remove the listener inside a thread, 629 // as JMS doesn't allow us to remove it within the 630 // call it made. 631 removeListener(); 632 633 File crawlDir = createCrawlDir(); 634 635 final HeritrixFiles files = controller.writeHarvestFiles(crawlDir, job, origHarvestInfo, 636 metadataEntries); 637 638 log.info(STARTCRAWL_MESSAGE + " {}", job.getJobID()); 639 640 Throwable crawlException = null; 641 try { 642 controller.runHarvest(files); 643 } catch (Throwable e) { 644 String msg = "Error during crawling. The crawl may have been only partially completed."; 645 log.warn(msg, e); 646 crawlException = e; 647 throw new IOFailure(msg, e); 648 } finally { 649 // This handles some message sending, so it must live 650 // in HCS for now, but the meat of it should be in 651 // HarvestController 652 // TODO Refactor to be able to move this out. 653 // TODO This may overwrite another exception, since this 654 // may throw exceptions. 655 processHarvestInfoFile(files.getCrawlDir(), crawlException); 656 } 657 } catch (Throwable t) { 658 String msg = "Fatal error while operating job '" + job + "'"; 659 log.error(msg, t); 660 NotificationsFactory.getInstance().notify(msg, NotificationType.ERROR, t); 661 } finally { 662 log.info(ENDCRAWL_MESSAGE + " {}", job.getJobID()); 663 // process serverdir for files not yet uploaded. 664 processOldJobs(); 665 shutdownNowOrContinue(); 666 startAcceptingJobs(); 667 beginListeningIfSpaceAvailable(); 668 } 669 } 670 671 /** 672 * Does the operator want us to shutdown now. TODO In a later implementation, the harvestControllerServer could 673 * be notified over JMX. Now we just look for a "shutdown.txt" file in the HARVEST_CONTROLLER_SERVERDIR 674 */ 675 private void shutdownNowOrContinue() { 676 File shutdownFile = new File(serverDir, "shutdown.txt"); 677 if (shutdownFile.exists()) { 678 log.info("Found shutdown-file in serverdir - " + "shutting down the application"); 679 instance.cleanup(); 680 System.exit(0); 681 } 682 } 683 684 /** 685 * Create the crawl dir, but make sure a message is sent if there is a problem. 686 * 687 * @return The directory that the crawl will take place in. 688 * @throws PermissionDenied if the directory cannot be created. 689 */ 690 private File createCrawlDir() { 691 // The directory where arcfiles are stored (crawldir in the above 692 // description) 693 File crawlDir = null; 694 // Create the crawldir. This is done here in order to be able 695 // to send a proper message if something goes wrong. 696 try { 697 File baseCrawlDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_SERVERDIR)); 698 crawlDir = new File(baseCrawlDir, job.getJobID() + "_" + System.currentTimeMillis()); 699 FileUtils.createDir(crawlDir); 700 log.info("Created crawl directory: '{}'", crawlDir); 701 return crawlDir; 702 } catch (PermissionDenied e) { 703 String message = "Couldn't create the directory for job " + job.getJobID(); 704 log.warn(message, e); 705 sendErrorMessage(job.getJobID(), message, ExceptionUtils.getStackTrace(e)); 706 throw e; 707 } 708 } 709 } 710 711 /** 712 * Used for maintaining the running status of the crawling, is it running or not. Will also take care of notifying 713 * the HarvestJobManager of the status. 714 */ 715 private class CrawlStatus { 716 717 /** The status. */ 718 private boolean running = false; 719 720 private boolean channelIsValid = false; 721 722 /** Handles the periodic sending of status messages. */ 723 private PeriodicTaskExecutor statusTransmitter; 724 725 private final int SEND_READY_DELAY = Settings.getInt(HarvesterSettings.SEND_READY_DELAY); 726 727 /** 728 * Returns <code>true</code> if the a doOneCrawl is running, else <code>false</code>. 729 * 730 * @return Whether a crawl running. 731 */ 732 public boolean isRunning() { 733 return running; 734 } 735 736 /** 737 * Use for changing the running state. 738 * 739 * @param running The new status 740 */ 741 public synchronized void setRunning(boolean running) { 742 this.running = running; 743 } 744 745 /** 746 * @return the channelIsValid 747 */ 748 protected final boolean isChannelValid() { 749 return channelIsValid; 750 } 751 752 /** 753 * Starts the sending of status messages. 754 */ 755 public void startSending() { 756 this.channelIsValid = true; 757 statusTransmitter = new PeriodicTaskExecutor("HarvesterStatus", new Runnable() { 758 public void run() { 759 sendStatus(); 760 } 761 }, 0, Settings.getInt(HarvesterSettings.SEND_READY_INTERVAL)); 762 } 763 764 /** 765 * Stops the sending of status messages. 766 */ 767 public void stopSending() { 768 if (statusTransmitter != null) { 769 statusTransmitter.shutdown(); 770 statusTransmitter = null; 771 } 772 } 773 774 /** 775 * Send a HarvesterReadyMessage to the HarvestJobManager. 776 */ 777 private synchronized void sendStatus() { 778 try { 779 Thread.sleep(SEND_READY_DELAY); 780 } catch (Exception e) { 781 log.error("Unable to sleep", e); 782 } 783 if (!running) { 784 jmsConnection.send(new HarvesterReadyMessage(applicationInstanceId + " on " + physicalServerName, 785 HarvestControllerServer.CHANNEL)); 786 } 787 } 788 } 789 790}