/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 * the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.harvesting.controller;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import javax.management.Attribute;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanServerConnection;
import javax.management.ReflectionException;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import javax.management.remote.JMXConnector;

import org.archive.crawler.framework.CrawlController;
import org.archive.util.JmxUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.exceptions.HarvestingAbort;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.IllegalState;
import dk.netarkivet.common.exceptions.NotImplementedException;
import dk.netarkivet.common.exceptions.UnknownID;
import dk.netarkivet.common.utils.JMXUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.StringUtils;
import dk.netarkivet.common.utils.SystemUtils;
import dk.netarkivet.common.utils.TimeUtils;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.harvesting.HeritrixFiles;
import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage;
import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage.CrawlServiceInfo;
import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage.CrawlServiceJobInfo;
import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage.CrawlStatus;
import dk.netarkivet.harvester.harvesting.frontier.FullFrontierReport;

/**
 * This implementation of the HeritrixController interface starts Heritrix as a separate process and uses JMX to
 * communicate with it. Each instance executes exactly one process that runs exactly one crawl job.
 */
public class BnfHeritrixController extends AbstractJMXHeritrixController {

    /** The logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(BnfHeritrixController.class);

    /**
     * The below commands and attributes are copied from the attributes and operations exhibited by the Heritrix MBeans
     * of type CrawlJob and CrawlService.Job, as they appear in JConsole.
     * <p>
     * Only operations and attributes used in NAS are listed.
     */
    private enum CrawlServiceAttribute {
        /** The number of alerts raised by Heritrix. */
        AlertCount,
        /** True if Heritrix is currently crawling, false otherwise. */
        IsCrawling,
        /** The ID of the job being currently crawled by Heritrix. */
        CurrentJob;

        /**
         * Returns the {@link CrawlServiceAttribute} enum value matching the given name. Throws {@link UnknownID} if no
         * match is found.
         *
         * @param name the attribute name
         * @return the corresponding {@link CrawlServiceAttribute} enum value.
         */
        public static CrawlServiceAttribute fromString(String name) {
            for (CrawlServiceAttribute att : values()) {
                if (att.name().equals(name)) {
                    return att;
                }
            }
            throw new UnknownID(name + " : unknown CrawlServiceAttribute !");
        }
    }

    /**
     * Enum listing the different job attributes available.
     */
    private enum CrawlServiceJobAttribute {
        /** The time in seconds elapsed since the crawl began. */
        CrawlTime,
        /** The current download rate in URI/s. */
        CurrentDocRate,
        /** The current download rate in kB/s. */
        CurrentKbRate,
        /** The number of URIs discovered by Heritrix. */
        DiscoveredCount,
        /** The average download rate in URI/s. */
        DocRate,
        /** The number of URIs downloaded by Heritrix. */
        DownloadedCount,
        /** A string summarizing the Heritrix frontier. */
        FrontierShortReport,
        /** The average download rate in kB/s. */
        KbRate,
        /** The job status (Heritrix status). */
        Status,
        /** The number of active toe threads. */
        ThreadCount;

        /**
         * Returns the {@link CrawlServiceJobAttribute} enum value matching the given name. Throws {@link UnknownID} if
         * no match is found.
         *
         * @param name the attribute name
         * @return the corresponding {@link CrawlServiceJobAttribute} enum value.
         */
        public static CrawlServiceJobAttribute fromString(String name) {
            for (CrawlServiceJobAttribute att : values()) {
                if (att.name().equals(name)) {
                    return att;
                }
            }
            throw new UnknownID(name + " : unknown CrawlServiceJobAttribute !");
        }
    }

    /**
     * Enum class defining the general operations available to the Heritrix operator.
     */
    private enum CrawlServiceOperation {
        /** Adds a new job to a Heritrix instance. */
        addJob,
        /** Fetches the identifiers of pending jobs. */
        pendingJobs,
        /** Fetches the identifiers of completed jobs. */
        completedJobs,
        /** Shuts down a Heritrix instance. */
        shutdown,
        /** Instructs a Heritrix instance to start crawling jobs. */
        startCrawling,
        /** Instructs a Heritrix instance to terminate the current job. */
        terminateCurrentJob;
    }

    /**
     * Enum class defining the Job-operations available to the Heritrix operator.
     */
    private enum CrawlServiceJobOperation {
        /** Fetches the progress statistics string from a Heritrix instance. */
        progressStatistics,
        /**
         * Fetches the progress statistics legend string from a Heritrix instance.
         */
        progressStatisticsLegend,
        /** Fetches the frontier report. */
        frontierReport;
    }

    /**
     * Shall we abort, if we lose the connection to Heritrix.
     */
    private static final boolean ABORT_IF_CONN_LOST = Settings.getBoolean(HarvesterSettings.ABORT_IF_CONNECTION_LOST);

    /**
     * The part of the Job MBean name that designates the unique id. For some reason, this is not included in the normal
     * Heritrix definitions in JmxUtils, otherwise we wouldn't have to define it. I have committed a feature request:
     * http://webteam.archive.org/jira/browse/HER-1618
     */
    private static final String UID_PROPERTY = "uid";

    /** Pause between two polls of the job status while waiting for report generation (20 seconds). */
    private static final long REPORT_POLL_INTERVAL_MILLIS = 20 * TimeUtils.SECOND_IN_MILLIS;

    /**
     * The name that Heritrix gives to the job we ask it to create. This is part of the name of the MBean for that job,
     * but we can only retrieve the name after the MBean has been created.
     */
    private String jobName;

    /** The header line (legend) for the statistics report. */
    private String progressStatisticsLegend;

    /** The connector to the Heritrix MBeanServer. */
    private JMXConnector jmxConnector;

    /** Max tries for a JMX operation. */
    private final int jmxMaxTries = JMXUtils.getMaxTries();

    /** The name of the MBean for the submitted job. */
    private String crawlServiceJobBeanName;

    /** The name of the main Heritrix MBean. */
    private String crawlServiceBeanName;

    /**
     * Create a BnfHeritrixController object.
     *
     * @param files Files that are used to set up Heritrix.
     */
    public BnfHeritrixController(HeritrixFiles files) {
        super(files);
    }

    /**
     * Initialize the JMX connection to Heritrix, verify that the instance has no old jobs, and submit our single job.
     *
     * @throws IOFailure If Heritrix dies before initialization, or we encounter any problems during the initialization.
     * @throws IllegalState If the Heritrix instance already has completed or pending jobs.
     * @see HeritrixController#initialize()
     */
    @Override
    public void initialize() {
        if (processHasExited()) {
            String errMsg = "Heritrix process of " + this + " died before initialization";
            log.warn(errMsg);
            throw new IOFailure(errMsg);
        }

        log.info("Abort, if we lose the connection to Heritrix, is {}", ABORT_IF_CONN_LOST);
        initJMXConnection();

        log.info("JMX connection initialized successfully");

        crawlServiceBeanName = "org.archive.crawler:" + JmxUtils.NAME + "=Heritrix," + JmxUtils.TYPE + "=CrawlService,"
                + JmxUtils.JMX_PORT + "=" + getJmxPort() + "," + JmxUtils.GUI_PORT + "=" + getGuiPort() + ","
                + JmxUtils.HOST + "=" + getHostName();

        // We want to be sure there are no jobs when starting, in case we got
        // an old Heritrix or somebody added jobs behind our back.
        TabularData doneJobs = (TabularData) executeMBeanOperation(CrawlServiceOperation.completedJobs);
        TabularData pendingJobs = (TabularData) executeMBeanOperation(CrawlServiceOperation.pendingJobs);
        if ((doneJobs != null && doneJobs.size() > 0) || (pendingJobs != null && pendingJobs.size() > 0)) {
            throw new IllegalState("This Heritrix instance is in a illegalState! "
                    + "This instance has either old done jobs (" + doneJobs + "), or old pending jobs (" + pendingJobs
                    + ").");
        }
        // From here on, we can assume there's only the one job we make.
        // We'll use the arc file prefix to name the job, since the prefix
        // already contains the harvest id and job id.
        HeritrixFiles files = getHeritrixFiles();
        executeMBeanOperation(CrawlServiceOperation.addJob, files.getOrderXmlFile().getAbsolutePath(),
                files.getArchiveFilePrefix(), getJobDescription(), files.getSeedsTxtFile().getAbsolutePath());

        jobName = getJobName();

        crawlServiceJobBeanName = "org.archive.crawler:" + JmxUtils.NAME + "=" + jobName + "," + JmxUtils.TYPE
                + "=CrawlService.Job," + JmxUtils.JMX_PORT + "=" + getJmxPort() + "," + JmxUtils.MOTHER + "=Heritrix,"
                + JmxUtils.HOST + "=" + getHostName();
    }

    @Override
    public void requestCrawlStart() {
        executeMBeanOperation(CrawlServiceOperation.startCrawling);
    }

    @Override
    public void requestCrawlStop(String reason) {
        // The terminate operation takes no arguments, so the reason is only recorded in our own log.
        log.info("Requesting Heritrix to terminate the current job. Reason: {}", reason);
        executeMBeanOperation(CrawlServiceOperation.terminateCurrentJob);
    }

    /**
     * Return the URL for monitoring this instance.
     *
     * @return the URL for monitoring this instance.
     */
    public String getHeritrixConsoleURL() {
        return "http://" + SystemUtils.getLocalHostName() + ":" + getGuiPort();
    }

    /**
     * Cleanup after a Heritrix process. This entails waiting for report generation, sending the shutdown command to
     * the Heritrix process, closing the JMX connection and waiting for the process to exit.
     *
     * @param crawlDir the crawldir to cleanup
     * @see HeritrixController#cleanup()
     */
    public void cleanup(File crawlDir) {
        // Before cleaning up, we need to wait for the reports to be generated
        waitForReportGeneration(crawlDir);

        try {
            executeMBeanOperation(CrawlServiceOperation.shutdown);
        } catch (IOFailure e) {
            log.error("JMX error while cleaning up Heritrix controller", e);
        }

        closeJMXConnection();

        waitForHeritrixProcessExit();
    }

    /**
     * Return the URL for monitoring this instance. Same value as {@link #getHeritrixConsoleURL()}.
     *
     * @return the URL for monitoring this instance.
     */
    public String getAdminInterfaceUrl() {
        return getHeritrixConsoleURL();
    }

    /**
     * Gets a message that stores the information summarizing the crawl progress.
     *
     * @return a message that stores the information summarizing the crawl progress.
     */
    public CrawlProgressMessage getCrawlProgress() {
        HeritrixFiles files = getHeritrixFiles();
        CrawlProgressMessage cpm = new CrawlProgressMessage(files.getHarvestID(), files.getJobID(),
                progressStatisticsLegend);

        cpm.setHostUrl(getHeritrixConsoleURL());

        getCrawlServiceAttributes(cpm);

        if (cpm.crawlIsFinished()) {
            cpm.setStatus(CrawlStatus.CRAWLING_FINISHED);
            // No need to go further, CrawlService.Job bean does not exist
            return cpm;
        }

        fetchCrawlServiceJobAttributes(cpm);

        return cpm;
    }

    /**
     * Retrieve the values of the crawl service attributes and add them to the CrawlProgressMessage being put together.
     * A null attribute value is replaced by a placeholder (-1, "" or false).
     *
     * @param cpm the crawlProgress message being prepared
     */
    private void getCrawlServiceAttributes(CrawlProgressMessage cpm) {
        List<Attribute> heritrixAtts = getMBeanAttributes(new CrawlServiceAttribute[] {
                CrawlServiceAttribute.AlertCount, CrawlServiceAttribute.IsCrawling, CrawlServiceAttribute.CurrentJob});

        CrawlServiceInfo hStatus = cpm.getHeritrixStatus();
        for (Attribute att : heritrixAtts) {
            Object value = att.getValue();
            CrawlServiceAttribute crawlServiceAttribute = CrawlServiceAttribute.fromString(att.getName());
            switch (crawlServiceAttribute) {
            case AlertCount:
                Integer alertCount = -1;
                if (value != null) {
                    alertCount = (Integer) value;
                }
                hStatus.setAlertCount(alertCount);
                break;
            case CurrentJob:
                String newCurrentJob = "";
                if (value != null) {
                    newCurrentJob = (String) value;
                }
                hStatus.setCurrentJob(newCurrentJob);
                break;
            case IsCrawling:
                Boolean newCrawling = false;
                if (value != null) {
                    newCrawling = (Boolean) value;
                }
                hStatus.setCrawling(newCrawling);
                break;
            default:
                log.debug("Unhandled attribute: {}", crawlServiceAttribute);
            }
        }
    }

    /**
     * Retrieve the values of the crawl service job attributes and add them to the CrawlProgressMessage being put
     * together. A null attribute value is replaced by a placeholder (-1 or "?"). Also fetches the progress statistics
     * legend the first time it is called.
     *
     * @param cpm the crawlProgress message being prepared
     */
    private void fetchCrawlServiceJobAttributes(CrawlProgressMessage cpm) {
        String progressStats = (String) executeMBeanOperation(CrawlServiceJobOperation.progressStatistics);
        CrawlServiceJobInfo jStatus = cpm.getJobStatus();
        String newProgressStats = "?";
        if (progressStats != null) {
            newProgressStats = progressStats;
        }
        jStatus.setProgressStatistics(newProgressStats);

        if (progressStatisticsLegend == null) {
            progressStatisticsLegend = (String) executeMBeanOperation(CrawlServiceJobOperation.progressStatisticsLegend);
        }

        List<Attribute> jobAtts = getMBeanAttributes(CrawlServiceJobAttribute.values());

        for (Attribute att : jobAtts) {
            Object value = att.getValue();
            CrawlServiceJobAttribute aCrawlServiceJobAttribute = CrawlServiceJobAttribute.fromString(att.getName());
            switch (aCrawlServiceJobAttribute) {
            case CrawlTime:
                Long elapsedSeconds = -1L;
                if (value != null) {
                    elapsedSeconds = (Long) value;
                }
                jStatus.setElapsedSeconds(elapsedSeconds);
                break;
            case CurrentDocRate:
                Double processedDocsPerSec = Double.valueOf(-1L);
                if (value != null) {
                    processedDocsPerSec = (Double) value;
                }
                jStatus.setCurrentProcessedDocsPerSec(processedDocsPerSec);
                break;
            case CurrentKbRate:
                // NB Heritrix seems to store the average value in
                // KbRate instead of CurrentKbRate...
                // Inverse of doc rates.
                Long processedKBPerSec = -1L;
                if (value != null) {
                    processedKBPerSec = (Long) value;
                }
                jStatus.setProcessedKBPerSec(processedKBPerSec);
                break;
            case DiscoveredCount:
                Long discoveredCount = -1L;
                if (value != null) {
                    discoveredCount = (Long) value;
                }
                jStatus.setDiscoveredFilesCount(discoveredCount);
                break;
            case DocRate:
                Double docRate = Double.valueOf(-1L);
                if (value != null) {
                    docRate = (Double) value;
                }
                jStatus.setProcessedDocsPerSec(docRate);
                break;
            case DownloadedCount:
                Long downloadedCount = -1L;
                if (value != null) {
                    downloadedCount = (Long) value;
                }
                jStatus.setDownloadedFilesCount(downloadedCount);
                break;
            case FrontierShortReport:
                String frontierShortReport = "?";
                if (value != null) {
                    frontierShortReport = (String) value;
                }
                jStatus.setFrontierShortReport(frontierShortReport);
                break;
            case KbRate:
                // NB Heritrix seems to store the average value in
                // KbRate instead of CurrentKbRate...
                // Inverse of doc rates.
                Long kbRate = -1L;
                if (value != null) {
                    kbRate = (Long) value;
                }
                jStatus.setCurrentProcessedKBPerSec(kbRate);
                break;
            case Status:
                String newStatus = "?";
                if (value != null) {
                    newStatus = (String) value;
                }
                jStatus.setStatus(newStatus);
                // The overall crawl status is only updated when Heritrix actually reported a status.
                if (value != null) {
                    String status = (String) value;
                    if (CrawlController.PAUSING.equals(status)) {
                        cpm.setStatus(CrawlStatus.CRAWLER_PAUSING);
                    } else if (CrawlController.PAUSED.equals(status)) {
                        cpm.setStatus(CrawlStatus.CRAWLER_PAUSED);
                    } else {
                        cpm.setStatus(CrawlStatus.CRAWLER_ACTIVE);
                    }
                }
                break;
            case ThreadCount:
                Integer currentActiveToecount = -1;
                if (value != null) {
                    currentActiveToecount = (Integer) value;
                }
                jStatus.setActiveToeCount(currentActiveToecount);
                break;
            default:
                log.debug("Unhandled attribute: {}", aCrawlServiceJobAttribute);
            }
        }
    }

    /**
     * Generates a full frontier report. The underlying JMX operation is not retried if it fails.
     *
     * @return a Full frontier report.
     */
    public FullFrontierReport getFullFrontierReport() {
        return FullFrontierReport.parseContentsAsString(
                jobName,
                (String) executeOperationNoRetry(crawlServiceJobBeanName,
                        CrawlServiceJobOperation.frontierReport.name(), "all"));
    }

    /**
     * Get the name of the one job we let this Heritrix run. The handling of done jobs depends on Heritrix not being in
     * crawl. This call may take several seconds to finish.
     *
     * @return The name of the one job that Heritrix has.
     * @throws IOFailure if the job created failed to initialize or didn't appear in time.
     * @throws IllegalState if more than one job in done list, or more than one pending job
     */
    private String getJobName() {
        /*
         * This is called just after we've told Heritrix to create a job. It may take a while before the job is actually
         * created, so we have to wait around a bit.
         */
        TabularData pendingJobs = null;
        TabularData doneJobs;
        int retries = 0;
        final int maxJmxRetries = JMXUtils.getMaxTries();
        while (retries++ < maxJmxRetries) {
            // If the job turns up in Heritrix' pending jobs list, it's ready
            pendingJobs = (TabularData) executeMBeanOperation(CrawlServiceOperation.pendingJobs);
            if (pendingJobs != null && pendingJobs.size() > 0) {
                break; // It's ready, we can move on.
            }

            // If there's an error in the job configuration, the job will be put
            // in Heritrix' completed jobs list.
            doneJobs = (TabularData) executeMBeanOperation(CrawlServiceOperation.completedJobs);
            if (doneJobs != null && doneJobs.size() >= 1) {
                // Since we haven't allowed Heritrix to start any crawls yet,
                // the only way the job could have ended and then put into
                // the list of completed jobs is by error.
                if (doneJobs.size() > 1) {
                    throw new IllegalState("More than one job in done list: " + doneJobs);
                } else {
                    CompositeData job = JMXUtils.getOneCompositeData(doneJobs);
                    throw new IOFailure("Job " + job + " failed: " + job.get(CrawlServiceJobAttribute.Status.name()));
                }
            }
            if (retries < maxJmxRetries) {
                TimeUtils.exponentialBackoffSleep(retries);
            }
        }
        // If all went well, we now have exactly one job in the pending
        // jobs list.
        if (pendingJobs == null || pendingJobs.size() == 0) {
            throw new IOFailure("Heritrix has not created a job after "
                    + (Math.pow(2, maxJmxRetries) / TimeUtils.SECOND_IN_MILLIS) + " seconds, giving up.");
        } else if (pendingJobs.size() > 1) {
            throw new IllegalState("More than one pending job: " + pendingJobs);
        } else {
            // Note that we may actually get through to here even if the job
            // is malformed. The job will then die as soon as we tell it to
            // start crawling.
            CompositeData job = JMXUtils.getOneCompositeData(pendingJobs);
            String name = job.get(JmxUtils.NAME) + "-" + job.get(UID_PROPERTY);
            log.info("Heritrix created a job with name {}", name);
            return name;
        }
    }

    /**
     * Periodically polls Heritrix over JMX to see if it has finished generating the crawl reports. The time to wait
     * is bounded by {@link HarvesterSettings#WAIT_FOR_REPORT_GENERATION_TIMEOUT}.
     * <p>
     * Every path that goes round the loop again, including the error paths, pauses first, so that a broken JMX
     * connection cannot turn the loop into a busy spin.
     *
     * @param crawlDir the crawl directory. Currently not used by this method.
     */
    private void waitForReportGeneration(File crawlDir) {
        log.info("Started waiting for Heritrix report generation.");

        long waitSeconds = Settings.getLong(HarvesterSettings.WAIT_FOR_REPORT_GENERATION_TIMEOUT);
        long waitDeadline = System.currentTimeMillis() + TimeUtils.SECOND_IN_MILLIS * waitSeconds;

        // While the deadline is not attained, periodically perform the
        // following checks:
        // 1) Verify that the crawl job MBean still exists. If not then
        // the job is over, no need to wait more and exit the loop.
        // 2) Read the job's status. Since Heritrix 1.14.4, a FINISHED status
        // guarantees that all reports have been generated. In this case
        // exit the loop.
        while (System.currentTimeMillis() <= waitDeadline) {
            boolean crawlServiceJobExists = false;
            try {
                if (crawlServiceJobBeanName != null) {
                    crawlServiceJobExists = getMBeanServerConnection().isRegistered(
                            JMXUtils.getBeanName(crawlServiceJobBeanName));
                } else {
                    // An error occurred when initializing the controller
                    // Simply log a warning for the record
                    log.warn("crawlServiceJobBeanName is null, earlier initialization of controller did not complete.");
                }
            } catch (IOException e) {
                log.warn("IOException", e);
                pauseBeforeNextReportCheck();
                continue;
            }

            if (!crawlServiceJobExists) {
                log.info("{} MBean not found, report generation is finished. Exiting wait loop.",
                        crawlServiceJobBeanName);
                return;
            }

            String status = "";
            try {
                List<Attribute> atts = getAttributesNoRetry(crawlServiceJobBeanName,
                        new String[] {CrawlServiceJobAttribute.Status.name()});
                status = (String) atts.get(0).getValue();
            } catch (IOFailure e) {
                log.warn("IOFailure", e);
                pauseBeforeNextReportCheck();
                continue;
            } catch (IndexOutOfBoundsException e) {
                // sometimes the array is empty TODO find out why
                log.warn("IndexOutOfBoundsException", e);
                pauseBeforeNextReportCheck();
                continue;
            }

            if (CrawlController.FINISHED.equals(status)) {
                log.info("{} status is FINISHED, report generation is complete. Exiting wait loop.",
                        crawlServiceJobBeanName);
                return;
            }

            pauseBeforeNextReportCheck();
        }
        // Only reached when the deadline has passed without seeing a finished job.
        log.info("Waited {} for report generation. Will proceed with cleanup.", StringUtils.formatDuration(waitSeconds));
    }

    /**
     * Sleeps for one polling interval between two report generation checks.
     */
    private void pauseBeforeNextReportCheck() {
        try {
            // Wait 20 seconds
            Thread.sleep(REPORT_POLL_INTERVAL_MILLIS);
        } catch (InterruptedException e) {
            // An interrupt only cuts this pause short; the polling loop keeps running.
            log.trace("Received InterruptedException", e);
        }
    }

    /**
     * Execute a single CrawlServiceOperation on the main Heritrix MBean.
     *
     * @param operation the operation to execute
     * @param arguments any arguments needed by the operation
     * @return Whatever the command returned.
     */
    private Object executeMBeanOperation(CrawlServiceOperation operation, String... arguments) {
        return executeOperation(crawlServiceBeanName, operation.name(), arguments);
    }

    /**
     * Execute a single CrawlServiceJobOperation on the MBean of the submitted job.
     *
     * @param operation the operation to execute
     * @param arguments any arguments needed by the operation
     * @return Whatever the command returned.
     */
    private Object executeMBeanOperation(CrawlServiceJobOperation operation, String... arguments) {
        return executeOperation(crawlServiceJobBeanName, operation.name(), arguments);
    }

    /**
     * Get the value of several attributes of the MBean of the submitted job.
     *
     * @param attributes The attributes to get.
     * @return Whatever the command returned.
     */
    private List<Attribute> getMBeanAttributes(CrawlServiceJobAttribute[] attributes) {
        String[] attNames = new String[attributes.length];
        for (int i = 0; i < attributes.length; i++) {
            attNames[i] = attributes[i].name();
        }

        return getAttributes(crawlServiceJobBeanName, attNames);
    }

    /**
     * Get the value of several attributes of the main Heritrix MBean.
     *
     * @param attributes The attributes to get.
     * @return Whatever the command returned.
     */
    private List<Attribute> getMBeanAttributes(CrawlServiceAttribute[] attributes) {
        String[] attNames = new String[attributes.length];
        for (int i = 0; i < attributes.length; i++) {
            attNames[i] = attributes[i].name();
        }

        return getAttributes(crawlServiceBeanName, attNames);
    }

    /**
     * Execute a command on a bean.
     *
     * @param beanName Name of the bean.
     * @param operation Command to execute.
     * @param args Arguments to the command. Only string arguments are possible at the moment.
     * @return The return value of the executed command.
     */
    private Object executeOperation(String beanName, String operation, String... args) {
        return jmxCall(beanName, true, true, new String[] {operation}, args);
    }

    /**
     * Execute a command on a bean, does not retry if fails.
     *
     * @param beanName Name of the bean.
     * @param operation Command to execute.
     * @param args Arguments to the command. Only string arguments are possible at the moment.
     * @return The return value of the executed command.
     */
    private Object executeOperationNoRetry(String beanName, String operation, String... args) {
        return jmxCall(beanName, false, true, new String[] {operation}, args);
    }

    /**
     * Get the value of several attributes from a bean.
     *
     * @param beanName Name of the bean to get an attribute for.
     * @param attributes Name of the attributes to get.
     * @return Value of the attribute.
     */
    @SuppressWarnings("unchecked")
    private List<Attribute> getAttributes(String beanName, String[] attributes) {
        return (List<Attribute>) jmxCall(beanName, true, false, attributes);
    }

    /**
     * Get the value of several attributes from a bean, but does not retry if the fetch fails.
     *
     * @param beanName Name of the bean to get an attribute for.
     * @param attributes Name of the attributes to get.
     * @return Value of the attribute.
     */
    @SuppressWarnings("unchecked")
    private List<Attribute> getAttributesNoRetry(String beanName, String[] attributes) {
        return (List<Attribute>) jmxCall(beanName, false, false, attributes);
    }

    /**
     * Executes a JMX call (attribute read or single operation) on a given bean.
     *
     * @param beanName the MBean name.
     * @param retry if true, will retry a number of times if the operation fails.
     * @param isOperation true if the call is an operation, false if it's an attribute read.
     * @param names name of operation or name of attributes
     * @param args optional arguments for operations
     * @return the object returned by the distant MBean
     * @throws IOFailure if the call still fails after the last attempt; the last exception is attached as cause.
     */
    private Object jmxCall(String beanName, boolean retry, boolean isOperation, String[] names, String... args) {
        MBeanServerConnection connection = getMBeanServerConnection();

        int maxTries = 1;
        if (retry) {
            maxTries = jmxMaxTries;
        }
        int tries = 0;
        Throwable lastException;
        do {
            tries++;
            try {
                if (isOperation) {
                    // All operation arguments are strings, so the signature is String repeated args.length times.
                    final String[] signature = new String[args.length];
                    Arrays.fill(signature, String.class.getName());
                    return connection.invoke(JMXUtils.getBeanName(beanName), names[0], args, signature);
                } else {
                    return connection.getAttributes(JMXUtils.getBeanName(beanName), names).asList();
                }
            } catch (IOException e) {
                lastException = e;
            } catch (ReflectionException e) {
                lastException = e;
            } catch (InstanceNotFoundException e) {
                lastException = e;
            } catch (MBeanException e) {
                lastException = e;
            }
            log.debug("Attempt {} out of {} attempts to make this jmxCall failed ", tries, maxTries);
            if (tries < maxTries) {
                TimeUtils.exponentialBackoffSleep(tries);
            }

        } while (tries < maxTries);

        String msg;
        if (isOperation) {
            msg = "Failed to execute " + names[0] + " with args " + Arrays.toString(args) + " on " + beanName;
        } else {
            msg = "Failed to read attributes " + Arrays.toString(names) + " of " + beanName;
        }

        // We only get here after at least one failed attempt, so lastException is always set.
        msg += ", last exception was " + lastException.getClass().getName();
        msg += " after " + tries + " attempts";
        throw new IOFailure(msg, lastException);
    }

    /**
     * Initializes the JMX connection.
     */
    private void initJMXConnection() {
        // Initialize the connection to Heritrix' MBeanServer
        this.jmxConnector = JMXUtils.getJMXConnector(SystemUtils.LOCALHOST, getJmxPort(),
                Settings.get(HarvesterSettings.HERITRIX_JMX_USERNAME),
                Settings.get(HarvesterSettings.HERITRIX_JMX_PASSWORD));
    }

    /**
     * Closes the JMX connection. An IOException raised while closing is logged, not rethrown.
     */
    private void closeJMXConnection() {
        // Close the connection to the MBean Server
        try {
            jmxConnector.close();
        } catch (IOException e) {
            log.error("JMX error while closing connection to Heritrix", e);
        }
    }

    /**
     * Gets a connection to the Heritrix MBeanServer, renewing the JMX connector and retrying if that fails.
     *
     * @return a MBeanServerConnection to Heritrix
     * @throws HarvestingAbort if no connection could be obtained and we are configured to abort on connection loss.
     * @throws IOFailure if no connection could be obtained and we are not configured to abort.
     */
    private MBeanServerConnection getMBeanServerConnection() {
        IOException ioe = null;
        for (int tries = 1; tries <= jmxMaxTries; tries++) {
            try {
                MBeanServerConnection connection = jmxConnector.getMBeanServerConnection();
                log.debug("Got a MBeanserverconnection at attempt #{}", tries);
                return connection;
            } catch (IOException e) {
                ioe = e;
                log.info("IOException while getting MBeanServerConnection. Attempt {} out of {}. "
                        + "Will try to renew the JMX connection to Heritrix", tries, jmxMaxTries);
                // When an IOException is raised in RMIConnector, a terminated
                // flag is set to true, even if the underlying connection is
                // not closed. This seems to be part of a mechanism to prevent
                // deadlocks, but can cause trouble for us.
                // So if this happens, we close and reinitialize
                // the JMX connector itself.
                closeJMXConnection();
                try {
                    initJMXConnection();
                    log.info("Successfully renewed JMX connection");
                } catch (IOFailure e1) {
                    log.debug("Renewal of JMXConnection failed at retry #{} with exception: ", tries, e1);
                }
            }
            if (tries < jmxMaxTries) {
                TimeUtils.exponentialBackoffSleep(tries);
            }
        }

        if (ABORT_IF_CONN_LOST) {
            log.debug("Connection to Heritrix seems to be lost. Trying to abort ...");
            throw new HarvestingAbort("Failed to connect to MBeanServer", ioe);
        } else {
            throw new IOFailure("Failed to connect to MBeanServer", ioe);
        }
    }

    @Override
    public boolean atFinish() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public void beginCrawlStop() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public void cleanup() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public boolean crawlIsEnded() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public int getActiveToeCount() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public int getCurrentProcessedKBPerSec() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public String getHarvestInformation() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public String getProgressStats() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public long getQueuedUriCount() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public boolean isPaused() {
        throw new NotImplementedException("Not implemented");
    }

}