001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.harvester.heritrix3.controller; 024 025import java.io.File; 026import java.io.IOException; 027import java.util.Arrays; 028 029import org.netarchivesuite.heritrix3wrapper.EngineResult; 030import org.netarchivesuite.heritrix3wrapper.Heritrix3Wrapper; 031import org.netarchivesuite.heritrix3wrapper.Heritrix3Wrapper.CrawlControllerState; 032import org.netarchivesuite.heritrix3wrapper.JobResult; 033import org.netarchivesuite.heritrix3wrapper.ResultStatus; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import dk.netarkivet.common.exceptions.IOFailure; 038import dk.netarkivet.common.exceptions.NotImplementedException; 039import dk.netarkivet.common.utils.Settings; 040import dk.netarkivet.common.utils.StringUtils; 041import dk.netarkivet.common.utils.SystemUtils; 042import dk.netarkivet.common.utils.TimeUtils; 043import dk.netarkivet.harvester.HarvesterSettings; 044import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage; 045import 
dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage.CrawlServiceInfo;
import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage.CrawlServiceJobInfo;
import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage.CrawlStatus;
import dk.netarkivet.harvester.harvesting.frontier.FullFrontierReport;
import dk.netarkivet.harvester.heritrix3.Heritrix3Files;
import dk.netarkivet.harvester.heritrix3.Heritrix3Settings;

/**
 * This implementation of the HeritrixController interface starts Heritrix 3 as a separate process and uses the
 * Heritrix 3 REST API (through the heritrix3wrapper library) to communicate with it. Each instance executes
 * exactly one process that runs exactly one crawl job.
 */
public class HeritrixController extends AbstractRestHeritrixController {

    /** The logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(HeritrixController.class);

    /**
     * The name that Heritrix gives to the job we ask it to create.
     */
    private String jobName;

    /** The header line (legend) for the statistics report. */
    private String progressStatisticsLegend;

    /**
     * Create a HeritrixController object.
     *
     * @param files Files that are used to set up Heritrix.
     * @param jobName The name the job will be given in Heritrix.
     */
    public HeritrixController(Heritrix3Files files, String jobName) {
        super(files);
        this.jobName = jobName;
    }

    /**
     * Initialize the REST connection to Heritrix 3 by waiting for the engine to become reachable.
     *
     * @throws IOFailure If Heritrix dies before initialisation, or we encounter any problems during the
     * initialisation.
     * @see IHeritrixController#initialize()
     */
    @Override
    public void initialize() {

        /////////////////////////////////////////////////////
        // Initialize H3 wrapper
        /////////////////////////////////////////////////////

        h3wrapper = Heritrix3Wrapper.getInstance(getHostName(), getGuiPort(),
                null, null, getHeritrixAdminName(), getHeritrixAdminPassword());

        EngineResult engineResult;
        try {
            // TODO these numbers should be settings
            int tries = 60;
            int millisecondsBetweenTries = 1000;
            engineResult = h3wrapper.waitForEngineReady(tries, millisecondsBetweenTries);
        } catch (Throwable e) {
            // Log through the logger (not printStackTrace) and keep the original cause on the IOFailure.
            log.error("Heritrix3 engine did not become ready.", e);
            throw new IOFailure("Heritrix not started: " + e, e);
        }

        if (engineResult != null) {
            if (engineResult.status != ResultStatus.OK) {
                // The trailing throwable argument makes SLF4J log the stack trace of engineResult.t.
                log.error("Heritrix3 wrapper could not connect to Heritrix3. Resultstate = {}", engineResult.status,
                        engineResult.t);
                throw new IOFailure("Heritrix3 wrapper could not connect to Heritrix3. Resultstate = "
                        + engineResult.status);
            }
        } else {
            throw new IOFailure("Heritrix3 wrapper returned null engine result.");
        }

        // POST: Heritrix3 is up and running and responds nicely
        log.info("Heritrix3 REST interface connectable.");
    }

    /**
     * Create, build, launch and unpause the crawl job in the running Heritrix 3 instance.
     * The job configuration (crawler-beans.cxml) and the seeds file are first copied into the
     * Heritrix 3 job directory, which is then rescanned so the engine discovers the new job.
     *
     * @throws IOFailure if the job files cannot be copied or any REST call to Heritrix 3 fails.
     */
    @Override
    public void requestCrawlStart() {
        // Create a new job
        File cxmlFile = getHeritrixFiles().getOrderFile();
        File seedsFile = getHeritrixFiles().getSeedsFile();
        JobResult jobResult;

        File jobDir = files.getHeritrixJobDir();
        if (!jobDir.exists()) {
            jobDir.mkdirs();
        }

        try {
            log.info("Copying the crawler-beans.cxml file and seeds.txt to the heritrix3 jobdir '{}'", jobDir);
            Heritrix3Wrapper.copyFile(cxmlFile, jobDir);
            Heritrix3Wrapper.copyFileAs(seedsFile, jobDir, "seeds.txt");
        } catch (IOException e) {
            throw new IOFailure("Problem occurred during the copying of files to our heritrix job", e);
        }

        // PRE: h3 is running, and the job files copied to their final location?
        EngineResult engineResult = null;
        try {
            engineResult = h3wrapper.rescanJobDirectory();
            log.debug("Result of rescanJobDirectory() operation: " + new String(engineResult.response, "UTF-8"));
            jobResult = h3wrapper.job(jobName);

            jobResult = h3wrapper.buildJobConfiguration(jobName);
            log.debug("Result of buildJobConfiguration() operation: " + new String(jobResult.response, "UTF-8"));
            jobResult = h3wrapper.waitForJobState(jobName, CrawlControllerState.NASCENT, 60, 1000);
            jobResult = h3wrapper.launchJob(jobName);
            log.debug("Result of launchJob() operation: " + new String(jobResult.response, "UTF-8"));
            // A launched job starts PAUSED; unpause it to actually begin crawling.
            jobResult = h3wrapper.waitForJobState(jobName, CrawlControllerState.PAUSED, 60, 1000);
            jobResult = h3wrapper.unpauseJob(jobName);
        } catch (Throwable e) {
            throw new IOFailure("Unknown error during communication with heritrix3", e);
        }

        // POST: h3 is running, and the job with name 'jobName' is running
        log.debug("h3-State after unpausing job '{}': {}", jobName, jobResult.response);
    }

    /**
     * Ask Heritrix 3 to terminate the running crawl job.
     *
     * @param reason human-readable reason for the stop request (logged only).
     */
    @Override
    public void requestCrawlStop(String reason) {
        log.info("Terminating job {}. Reason: {}", this.jobName, reason);
        JobResult jobResult = h3wrapper.job(jobName);
        if (jobResult != null) {
            if (jobResult.job.isRunning) {
                JobResult result = h3wrapper.terminateJob(this.jobName);
                if (!result.job.isRunning) {
                    log.warn("Job '{}' terminated", this.jobName);
                } else {
                    log.warn("Job '{}' not terminated correctly", this.jobName);
                }
            } else {
                log.warn("Job '{}' not terminated, as it was not running", this.jobName);
            }
        } else {
            log.warn("Job '{}' has maybe already been terminated and/or heritrix3 is no longer running", this.jobName);
        }
    }

    /**
     * Stop the Heritrix 3 process itself: first politely via the REST exit call, and if the process is still
     * alive afterwards, forcefully via pkill. Process presence is detected with pgrep on the job name.
     */
    @Override
    public void stopHeritrix() {
        log.debug("Stopping Heritrix");
        try {
            ProcessBuilder processBuilder = new ProcessBuilder("pgrep", "-f", jobName);
            log.info("Looking up heritrix process with " + processBuilder.command());
            // pgrep exits with 0 when at least one matching process is found.
            if (processBuilder.start().waitFor() == 0) {
                log.info("Heritrix running, requesting heritrix to stop ignoring running job " + jobName);
                h3wrapper.exitJavaProcess(Arrays.asList(new String[] {jobName}));
            } else {
                log.info("Heritrix not running.");
            }
            // Re-check: if the polite exit request did not kill the process, pkill it.
            if (processBuilder.start().waitFor() == 0) {
                log.info("Heritrix still running, pkill'ing heritrix ");
                ProcessBuilder killerProcessBuilder = new ProcessBuilder("pkill", "-f", jobName);
                // BUGFIX: wait for pkill to finish before reading its exit status. The original called
                // exitValue() immediately after start(), which throws IllegalThreadStateException while
                // the process is still running.
                int pkillExitValue = killerProcessBuilder.start().waitFor();
                if (pkillExitValue != 0) {
                    log.warn("Non-zero exit value (" + pkillExitValue + ") when trying to pkill Heritrix.");
                }
            } else {
                log.info("Heritrix stopped successfully.");
            }
        } catch (IOException e) {
            log.warn("Exception while trying to shutdown heritrix", e);
        } catch (InterruptedException e) {
            log.debug("stopHeritrix call interupted", e);
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Return the URL for monitoring this instance.
     *
     * @return the URL for monitoring this instance.
     */
    public String getHeritrixConsoleURL() {
        return "https://" + SystemUtils.getLocalHostName() + ":" + getGuiPort() + "/engine";
    }

    /**
     * Cleanup after an Heritrix process. This entails sending the shutdown command to the Heritrix process, and
     * killing it forcefully, if it is still alive after waiting the period of time specified by the
     * CommonSettings.PROCESS_TIMEOUT setting.
     *
     * @param crawlDir the crawldir to cleanup (currently unused; report-generation wait is disabled below)
     * @see IHeritrixController#cleanup()
     */
    public void cleanup(File crawlDir) {
        JobResult jobResult;
        try {
            // Before cleaning up, we need to wait for the reports to be generated
            //waitForReportGeneration(crawlDir);
            // TODO Should we teardown job as well????
            jobResult = h3wrapper.job(jobName);
            if (jobResult != null) {
                if (jobResult.status == ResultStatus.OK && jobResult.job.crawlControllerState != null) {
                    if (CrawlControllerState.FINISHED.name().equals(jobResult.job.crawlControllerState)) {
                        jobResult = h3wrapper.teardownJob(jobName);
                    } else {
                        throw new IOFailure("Heritrix3 job '" + jobName + "' not finished");
                    }
                }
            } else {
                throw new IOFailure("Unknown error during communication with heritrix3");
            }
            h3wrapper.waitForJobState(jobName, null, 10, 1000);
            EngineResult result = h3wrapper.exitJavaProcess(null);
            // RESPONSE_EXCEPTION or OFFLINE is the expected outcome of asking the engine to exit.
            if (result == null || (result.status != ResultStatus.RESPONSE_EXCEPTION && result.status != ResultStatus.OFFLINE)) {
                throw new IOFailure("Heritrix3 could not be shut down");
            }
        } catch (Throwable e) {
            throw new IOFailure("Unknown error during communication with heritrix3", e);
        }
    }

    /**
     * Return the URL for monitoring this instance.
     *
     * @return the URL for monitoring this instance.
     */
    public String getAdminInterfaceUrl() {
        return "https://" + SystemUtils.getLocalHostName() + ":" + getGuiPort() + "/engine";
    }

    /**
     * Gets a message that stores the information summarizing the crawl progress.
     *
     * @return a message that stores the information summarizing the crawl progress.
     */
    public CrawlProgressMessage getCrawlProgress() {
        Heritrix3Files files = getHeritrixFiles();
        CrawlProgressMessage cpm = new CrawlProgressMessage(files.getHarvestID(), files.getJobID(),
                progressStatisticsLegend);
        cpm.setHostUrl(getHeritrixConsoleURL());
        JobResult jobResult = h3wrapper.job(jobName);
        if (jobResult != null) {
            getCrawlServiceAttributes(cpm, jobResult);
        } else {
            log.warn("Unable to get Heritrix3 status for job '{}'", jobName);
        }
        if (cpm.crawlIsFinished()) {
            cpm.setStatus(CrawlStatus.CRAWLING_FINISHED);
            // No need to go further, CrawlService.Job bean does not exist
            return cpm;
        }
        if (jobResult != null) {
            fetchCrawlServiceJobAttributes(cpm, jobResult);
        } else {
            log.warn("Unable to get JobAttributes for job '{}'", jobName);
        }
        return cpm;
    }

    /**
     * Retrieve the values of the crawl service attributes and add them to the CrawlProgressMessage being put together.
     *
     * @param cpm the crawlProgress message being prepared
     * @param job the job status information fetched from Heritrix 3
     */
    private void getCrawlServiceAttributes(CrawlProgressMessage cpm, JobResult job) {
        // TODO check job state??
        CrawlServiceInfo hStatus = cpm.getHeritrixStatus();
        hStatus.setAlertCount(job.job.alertCount); // info taken from job information
        hStatus.setCurrentJob(this.jobName); // Note:Information not taken from H3
        hStatus.setCrawling(job.job.isRunning);// info taken from job information
    }

    /**
     * Retrieve the values of the crawl service job attributes and add them to the CrawlProgressMessage being put
     * together. Missing values (null reports from Heritrix 3) are replaced with -1 sentinels.
     *
     * @param cpm the crawlProgress message being prepared
     * @param job the job status information fetched from Heritrix 3
     */
    private void fetchCrawlServiceJobAttributes(CrawlProgressMessage cpm, JobResult job) {
        CrawlServiceJobInfo jStatus = cpm.getJobStatus();

/*
           timestamp  discovered      queued   downloaded       doc/s(avg)  KB/s(avg)   dl-failures   busy-thread   mem-use-KB  heap-size-KB   congestion   max-depth   avg-depth
2015-04-29T12:42:54Z         774         573          185        0.9(2.31)     49(41)            16             2        61249        270848            1         456         114
*/
        long totalUriCount = job.job.uriTotalsReport.totalUriCount;
        long downloadedUriCount = job.job.uriTotalsReport.downloadedUriCount;
        Double progress;
        if (totalUriCount == 0) {
            progress = 0.0;
        } else {
            progress = downloadedUriCount * 100.0 / totalUriCount;
        }
        jStatus.setProgressStatistics(progress + "%");

        // The report carries milliseconds; convert to seconds (-1 when unavailable).
        Long elapsedSeconds = job.job.elapsedReport.elapsedMilliseconds;
        if (elapsedSeconds == null) {
            elapsedSeconds = -1L;
        } else {
            elapsedSeconds = elapsedSeconds / 1000L;
        }
        jStatus.setElapsedSeconds(elapsedSeconds);

        Double currentProcessedDocsPerSec = job.job.rateReport.currentDocsPerSecond;
        if (currentProcessedDocsPerSec == null) {
            // -1.0 instead of the deprecated new Double(-1L) constructor.
            currentProcessedDocsPerSec = -1.0;
        }
        jStatus.setCurrentProcessedDocsPerSec(currentProcessedDocsPerSec);

        Double processedDocsPerSec = job.job.rateReport.averageDocsPerSecond;
        if (processedDocsPerSec == null) {
            processedDocsPerSec = -1.0;
        }
        jStatus.setProcessedDocsPerSec(processedDocsPerSec);

        Integer kbRate = job.job.rateReport.currentKiBPerSec;
        if (kbRate == null) {
            kbRate = -1;
        }
        jStatus.setCurrentProcessedKBPerSec(kbRate);

        Integer processedKBPerSec = job.job.rateReport.averageKiBPerSec;
        if (processedKBPerSec == null) {
            processedKBPerSec = -1;
        }
        jStatus.setProcessedKBPerSec(processedKBPerSec);

        Long discoveredFilesCount = job.job.uriTotalsReport.totalUriCount;
        if (discoveredFilesCount == null) {
            discoveredFilesCount = -1L;
        }
        jStatus.setDiscoveredFilesCount(discoveredFilesCount);

        Long downloadedCount = job.job.uriTotalsReport.downloadedUriCount;
        if (downloadedCount == null) {
            downloadedCount = -1L;
        }
        jStatus.setDownloadedFilesCount(downloadedCount);
/*
27 queues: 5 active (1 in-process; 0 ready; 4 snoozed); 0 inactive; 0 retired; 22 exhausted
*/
        String frontierShortReport = String.format("%d queues: %d active (%d in-process; %d ready; %d snoozed); %d inactive; %d retired; %d exhausted",
                job.job.frontierReport.totalQueues,
                job.job.frontierReport.activeQueues,
                job.job.frontierReport.inProcessQueues,
                job.job.frontierReport.readyQueues,
                job.job.frontierReport.snoozedQueues,
                job.job.frontierReport.inactiveQueues,
                job.job.frontierReport.retiredQueues,
                job.job.frontierReport.exhaustedQueues);
        jStatus.setFrontierShortReport(frontierShortReport);

        String newStatus = "?";
        String crawlControllerState = job.job.crawlControllerState;
        if (crawlControllerState != null) {
            newStatus = crawlControllerState;
        }
        jStatus.setStatus(newStatus);
        // BUGFIX: test the null-safe newStatus; the original called contains() on the raw
        // crawlControllerState, which NPEs when the state is null.
        if (newStatus.contains("PAUSE")) { // FIXME this is not correct
            cpm.setStatus(CrawlStatus.CRAWLER_PAUSED);
        } else {
            cpm.setStatus(CrawlStatus.CRAWLER_ACTIVE);
        }

        Integer currentActiveToecount = job.job.loadReport.busyThreads;
        if (currentActiveToecount == null) {
            currentActiveToecount = -1;
        }
        jStatus.setActiveToeCount(currentActiveToecount);
    }

    /**
     * Generates a full frontier report.
     *
     * @return a Full frontier report (currently always null; see FIXME).
     */
    public FullFrontierReport getFullFrontierReport() {
        //FIXME get frontier report from H3 using an appropriate REST call.
        // Is the following OK: No!!!
        //https://localhost:8444/engine/job/testjob/jobdir/20150210135411/reports/frontier-summary-report.txt

        return null;
    }

    /**
     * Periodically scans the crawl dir to see if Heritrix has finished generating the crawl reports. The time to wait
     * is bounded by {@link HarvesterSettings#WAIT_FOR_REPORT_GENERATION_TIMEOUT}.
     * Currently not used
     *
     * @param crawlDir the crawl directory to scan.
     */
    @Deprecated
    private void waitForReportGeneration(File crawlDir) {
        log.info("Started waiting for Heritrix report generation.");

        long currentTime = System.currentTimeMillis();
        long waitSeconds = Settings.getLong(Heritrix3Settings.WAIT_FOR_REPORT_GENERATION_TIMEOUT);
        long waitDeadline = currentTime + TimeUtils.SECOND_IN_MILLIS * waitSeconds;

        // While the deadline is not attained, periodically perform the
        // following checks:
        // 1) Verify that the crawl job MBean still exists. If not then
        // the job is over, no need to wait more and exit the loop.
        // 2) Read the job(s status. Since Heritrix 1.14.4, a FINISHED status
        // guarantees that all reports have been generated. In this case
        // exit the loop.
        while (currentTime <= waitDeadline) {
            currentTime = System.currentTimeMillis();

            //FIXME see if job is finished, if so, reports can be considered to ready as well?????
            try {
                // Wait 20 seconds
                Thread.sleep(20 * TimeUtils.SECOND_IN_MILLIS);
            } catch (InterruptedException e) {
                log.trace("Received InterruptedException", e);
                // Restore the interrupt flag and stop waiting instead of swallowing the interrupt.
                Thread.currentThread().interrupt();
                break;
            }
        }
        log.info("Waited {} for report generation. Will proceed with cleanup.", StringUtils.formatDuration(waitSeconds));
    }

    @Override
    public boolean atFinish() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public void beginCrawlStop() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public void cleanup() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public boolean crawlIsEnded() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public int getActiveToeCount() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public int getCurrentProcessedKBPerSec() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public String getHarvestInformation() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public String getProgressStats() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public long getQueuedUriCount() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public boolean isPaused() {
        throw new NotImplementedException("Not implemented");
    }

}