/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.heritrix3.controller;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.netarchivesuite.heritrix3wrapper.EngineResult;
import org.netarchivesuite.heritrix3wrapper.Heritrix3Wrapper;
import org.netarchivesuite.heritrix3wrapper.Heritrix3Wrapper.CrawlControllerState;
import org.netarchivesuite.heritrix3wrapper.JobResult;
import org.netarchivesuite.heritrix3wrapper.ResultStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.exceptions.NotImplementedException;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.StringUtils;
import dk.netarkivet.common.utils.SystemUtils;
import dk.netarkivet.common.utils.TimeUtils;
import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage;
import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage.CrawlServiceInfo;
import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage.CrawlServiceJobInfo;
import dk.netarkivet.harvester.harvesting.distribute.CrawlProgressMessage.CrawlStatus;
import dk.netarkivet.harvester.harvesting.frontier.FullFrontierReport;
import dk.netarkivet.harvester.heritrix3.Heritrix3Files;
import dk.netarkivet.harvester.heritrix3.Heritrix3Settings;

/**
 * This implementation of the HeritrixController interface starts Heritrix 3 as a separate process and uses the
 * Heritrix 3 REST API to communicate with it. Each instance executes exactly one process that runs exactly one
 * crawl job.
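 *
 * <p>A typical lifecycle, as a sketch (error handling omitted; {@code files}, {@code jobName} and
 * {@code crawlDir} are assumed to be supplied by the caller):
 * <pre>{@code
 * HeritrixController controller = new HeritrixController(files, jobName);
 * controller.initialize();                // wait until the H3 engine answers REST calls
 * controller.requestCrawlStart();         // copy job files, then build, launch and unpause the job
 * CrawlProgressMessage progress = controller.getCrawlProgress();
 * controller.requestCrawlStop("harvest finished");
 * controller.cleanup(crawlDir);           // teardown the finished job and shut down H3
 * }</pre>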
 */
public class HeritrixController extends AbstractRestHeritrixController {

    /** The logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(HeritrixController.class);

    /**
     * The name that Heritrix gives to the job we ask it to create.
     */
    private String jobName;

    /** The header line (legend) for the statistics report. */
    private String progressStatisticsLegend;

    /**
     * Create a HeritrixController object.
     *
     * @param files Files that are used to set up Heritrix.
     * @param jobName The name of the Heritrix3 job created from these files.
     */
    public HeritrixController(Heritrix3Files files, String jobName) {
        super(files);
        this.jobName = jobName;
    }

    /**
     * Initialize the REST connection to the running Heritrix3 instance.
     *
     * @throws IOFailure If Heritrix3 dies before initialisation, or we encounter any problems during the initialisation.
     * @see IHeritrixController#initialize()
     */
    @Override
    public void initialize() {

        /////////////////////////////////////////////////////
        // Initialize H3 wrapper
        /////////////////////////////////////////////////////

        h3wrapper = Heritrix3Wrapper.getInstance(getHostName(), getGuiPort(),
                null, null, getHeritrixAdminName(), getHeritrixAdminPassword());

        EngineResult engineResult;
        try {
            // TODO these numbers should be settings (see the sketch after this method)
            int tries = 60;
            int millisecondsBetweenTries = 1000;
            engineResult = h3wrapper.waitForEngineReady(tries, millisecondsBetweenTries);
        } catch (Throwable e) {
            throw new IOFailure("Heritrix3 not started", e);
        }

        if (engineResult != null) {
            if (engineResult.status != ResultStatus.OK) {
                log.error("Heritrix3 wrapper could not connect to Heritrix3. Resultstate = {}", engineResult.status, engineResult.t);
                throw new IOFailure("Heritrix3 wrapper could not connect to Heritrix3. Resultstate = " + engineResult.status);
            }
        } else {
            throw new IOFailure("Heritrix3 wrapper returned null engine result.");
        }

        // POST: Heritrix3 is up and running and responds nicely
        log.info("Heritrix3 REST interface connectable.");
    }

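    /*
     * A minimal sketch of how the engine-ready polling in initialize() could be driven by settings instead of
     * hard-coded values. The two setting keys below are hypothetical; no such keys are defined in
     * Heritrix3Settings at the time of writing:
     *
     *   int tries = Settings.getInt("settings.harvester.heritrix3.engineReadyTries");
     *   int intervalMillis = Settings.getInt("settings.harvester.heritrix3.engineReadyIntervalMillis");
     *   engineResult = h3wrapper.waitForEngineReady(tries, intervalMillis);
     */
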
    /**
     * Request the start of the crawl: copies the job configuration (crawler-beans.cxml and seeds.txt) to the
     * Heritrix3 job directory, then asks Heritrix3 to rescan its job directory, build and launch the job, and
     * finally unpause it.
     *
     * @throws IOFailure If the job files cannot be copied, or if the REST communication with Heritrix3 fails.
     */
    @Override
    public void requestCrawlStart() {
        // Create a new job
        File cxmlFile = getHeritrixFiles().getOrderFile();
        File seedsFile = getHeritrixFiles().getSeedsFile();
        JobResult jobResult;

        File jobDir = files.getHeritrixJobDir();
        if (!jobDir.exists()) {
            jobDir.mkdirs();
        }

        try {
            log.info("Copying the crawler-beans.cxml file and seeds.txt to the heritrix3 jobdir '{}'", jobDir);
            Heritrix3Wrapper.copyFile(cxmlFile, jobDir);
            Heritrix3Wrapper.copyFileAs(seedsFile, jobDir, "seeds.txt");
        } catch (IOException e) {
            throw new IOFailure("Problem occurred during the copying of files to our heritrix3 job", e);
        }

        // PRE: h3 is running, and the job files have been copied to their final location
        EngineResult engineResult = null;
        try {
            engineResult = h3wrapper.rescanJobDirectory();
            log.debug("Result of rescanJobDirectory() operation: {}", new String(engineResult.response, StandardCharsets.UTF_8));
            jobResult = h3wrapper.job(jobName);

            jobResult = h3wrapper.buildJobConfiguration(jobName);
            log.debug("Result of buildJobConfiguration() operation: {}", new String(jobResult.response, StandardCharsets.UTF_8));
            jobResult = h3wrapper.waitForJobState(jobName, CrawlControllerState.NASCENT, 60, 1000);
            jobResult = h3wrapper.launchJob(jobName);
            log.debug("Result of launchJob() operation: {}", new String(jobResult.response, StandardCharsets.UTF_8));
            jobResult = h3wrapper.waitForJobState(jobName, CrawlControllerState.PAUSED, 60, 1000);
            jobResult = h3wrapper.unpauseJob(jobName);
        } catch (Throwable e) {
            throw new IOFailure("Unknown error during communication with heritrix3", e);
        }

        // POST: h3 is running, and the job with name 'jobName' is running
        log.debug("h3-State after unpausing job '{}': {}", jobName, new String(jobResult.response, StandardCharsets.UTF_8));
    }

    /**
     * Request the running crawl job to terminate, using the Heritrix3 REST API, and log the outcome.
     *
     * @param reason The reason for terminating the job (used for logging only).
     */
    @Override
    public void requestCrawlStop(String reason) {
        log.info("Terminating job {}. Reason: {}", this.jobName, reason);
        JobResult jobResult = h3wrapper.job(jobName);
        if (jobResult != null) {
            if (jobResult.job.isRunning) {
                JobResult result = h3wrapper.terminateJob(this.jobName);
                if (!result.job.isRunning) {
                    log.info("Job '{}' terminated", this.jobName);
                } else {
                    log.warn("Job '{}' not terminated correctly", this.jobName);
                }
            } else {
                log.warn("Job '{}' not terminated, as it was not running", this.jobName);
            }
        } else {
            log.warn("Job '{}' has maybe already been terminated and/or heritrix3 is no longer running", this.jobName);
        }
    }

    /**
     * Stop the Heritrix3 process itself: first ask the engine (via REST) to exit its Java process; if the
     * process is still alive afterwards, kill it with pkill. Relies on the pgrep/pkill commands being available.
     */
    @Override
    public void stopHeritrix() {
        log.debug("Stopping Heritrix");
        try {
            // pgrep exits with 0 if a matching process was found, non-zero otherwise
            ProcessBuilder processBuilder = new ProcessBuilder("pgrep", "-f", jobName);
            log.info("Looking up heritrix process with: {}", processBuilder.command());
            if (processBuilder.start().waitFor() == 0) {
                log.info("Heritrix running, requesting heritrix to stop, ignoring running job {}", jobName);
                h3wrapper.exitJavaProcess(Arrays.asList(new String[] {jobName}));
            } else {
                log.info("Heritrix not running.");
            }
            if (processBuilder.start().waitFor() == 0) {
                log.info("Heritrix still running, pkill'ing heritrix");
                ProcessBuilder killerProcessBuilder = new ProcessBuilder("pkill", "-f", jobName);
                int pkillExitValue = killerProcessBuilder.start().waitFor();
                if (pkillExitValue != 0) {
                    log.warn("Non-zero exit value ({}) when trying to pkill Heritrix.", pkillExitValue);
                }
            } else {
                log.info("Heritrix stopped successfully.");
            }
        } catch (IOException e) {
            log.warn("Exception while trying to shutdown heritrix", e);
        } catch (InterruptedException e) {
            log.debug("stopHeritrix call interrupted", e);
        }
    }

    /**
     * Return the URL for monitoring this instance.
     *
     * @return the URL for monitoring this instance.
     */
    public String getHeritrixConsoleURL() {
        return "https://" + SystemUtils.getLocalHostName() + ":" + getGuiPort() + "/engine";
    }

    /**
     * Cleanup after a Heritrix3 process. This entails tearing down the crawl job in Heritrix3 (if it has
     * finished) and then asking the Heritrix3 engine to exit its Java process.
     *
     * @param crawlDir the crawldir to cleanup (currently unused by this method)
     * @see IHeritrixController#cleanup()
     */
    public void cleanup(File crawlDir) {
        JobResult jobResult;
        try {
            // Before cleaning up, we need to wait for the reports to be generated
            //waitForReportGeneration(crawlDir);
            // TODO Should we teardown job as well????
            jobResult = h3wrapper.job(jobName);
            if (jobResult != null) {
                if (jobResult.status == ResultStatus.OK && jobResult.job.crawlControllerState != null) {
                    if (CrawlControllerState.FINISHED.name().equals(jobResult.job.crawlControllerState)) {
                        jobResult = h3wrapper.teardownJob(jobName);
                    } else {
                        throw new IOFailure("Heritrix3 job '" + jobName + "' not finished");
                    }
                }
            } else {
                throw new IOFailure("Unknown error during communication with heritrix3");
            }
            h3wrapper.waitForJobState(jobName, null, 10, 1000);
            EngineResult result = h3wrapper.exitJavaProcess(null);
            if (result == null || (result.status != ResultStatus.RESPONSE_EXCEPTION && result.status != ResultStatus.OFFLINE)) {
                throw new IOFailure("Heritrix3 could not be shut down");
            }
        } catch (Throwable e) {
            throw new IOFailure("Unknown error during communication with heritrix3", e);
        }
    }

    /**
     * Return the URL of the Heritrix3 admin interface (same as the console URL).
     *
     * @return the URL for monitoring this instance.
     */
    public String getAdminInterfaceUrl() {
        return "https://" + SystemUtils.getLocalHostName() + ":" + getGuiPort() + "/engine";
    }

    /**
     * Gets a message that stores the information summarizing the crawl progress.
     *
     * @return a message that stores the information summarizing the crawl progress.
     */
    public CrawlProgressMessage getCrawlProgress() {
        Heritrix3Files files = getHeritrixFiles();
        CrawlProgressMessage cpm = new CrawlProgressMessage(files.getHarvestID(), files.getJobID(),
                progressStatisticsLegend);
        cpm.setHostUrl(getHeritrixConsoleURL());
        JobResult jobResult = h3wrapper.job(jobName);
        if (jobResult != null) {
            getCrawlServiceAttributes(cpm, jobResult);
        } else {
            log.warn("Unable to get Heritrix3 status for job '{}'", jobName);
        }
        if (cpm.crawlIsFinished()) {
            cpm.setStatus(CrawlStatus.CRAWLING_FINISHED);
            // No need to go further, CrawlService.Job bean does not exist
            return cpm;
        }
        if (jobResult != null) {
            fetchCrawlServiceJobAttributes(cpm, jobResult);
        } else {
            log.warn("Unable to get JobAttributes for job '{}'", jobName);
        }
        return cpm;
    }

    /**
     * Retrieve the values of the crawl service attributes and add them to the CrawlProgressMessage being put together.
     *
     * @param cpm the crawlProgress message being prepared
     * @param job the result of the Heritrix3 job status REST call
     */
    private void getCrawlServiceAttributes(CrawlProgressMessage cpm, JobResult job) {
        // TODO check job state??
        CrawlServiceInfo hStatus = cpm.getHeritrixStatus();
        hStatus.setAlertCount(job.job.alertCount); // info taken from job information
        hStatus.setCurrentJob(this.jobName); // Note: this information is not taken from H3
        hStatus.setCrawling(job.job.isRunning); // info taken from job information
    }

    /**
     * Retrieve the values of the crawl service job attributes and add them to the CrawlProgressMessage being put
     * together.
     *
     * @param cpm the crawlProgress message being prepared
     * @param job the result of the Heritrix3 job status REST call
     */
    private void fetchCrawlServiceJobAttributes(CrawlProgressMessage cpm, JobResult job) {
        CrawlServiceJobInfo jStatus = cpm.getJobStatus();

        /*
         * Example of a Heritrix3 progress-statistics line and its legend:
         *            timestamp  discovered      queued   downloaded       doc/s(avg)  KB/s(avg)   dl-failures   busy-thread   mem-use-KB  heap-size-KB   congestion   max-depth   avg-depth
         * 2015-04-29T12:42:54Z         774         573          185        0.9(2.31)     49(41)            16             2        61249        270848            1         456         114
         */
        /*
        jStatus.setProgressStatistics(newProgressStats);
        if (progressStatisticsLegend == null) {
            progressStatisticsLegend = (String) executeMBeanOperation(CrawlServiceJobOperation.progressStatisticsLegend);
        }
        */

        long totalUriCount = job.job.uriTotalsReport.totalUriCount;
        long downloadedUriCount = job.job.uriTotalsReport.downloadedUriCount;
        double progress;
        if (totalUriCount == 0) {
            progress = 0.0;
        } else {
            progress = downloadedUriCount * 100.0 / totalUriCount;
        }
        jStatus.setProgressStatistics(progress + "%");
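        // Worked example: downloadedUriCount = 185 and totalUriCount = 740 gives 185 * 100.0 / 740 = 25.0, i.e. "25.0%".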

        Long elapsedMillis = job.job.elapsedReport.elapsedMilliseconds;
        Long elapsedSeconds;
        if (elapsedMillis == null) {
            elapsedSeconds = -1L;
        } else {
            elapsedSeconds = elapsedMillis / 1000L;
        }
        jStatus.setElapsedSeconds(elapsedSeconds);

        Double currentProcessedDocsPerSec = job.job.rateReport.currentDocsPerSecond;
        if (currentProcessedDocsPerSec == null) {
            currentProcessedDocsPerSec = -1.0;
        }
        jStatus.setCurrentProcessedDocsPerSec(currentProcessedDocsPerSec);

        Double processedDocsPerSec = job.job.rateReport.averageDocsPerSecond;
        if (processedDocsPerSec == null) {
            processedDocsPerSec = -1.0;
        }
        jStatus.setProcessedDocsPerSec(processedDocsPerSec);

        Integer kbRate = job.job.rateReport.currentKiBPerSec;
        if (kbRate == null) {
            kbRate = -1;
        }
        jStatus.setCurrentProcessedKBPerSec(kbRate);

        Integer processedKBPerSec = job.job.rateReport.averageKiBPerSec;
        if (processedKBPerSec == null) {
            processedKBPerSec = -1;
        }
        jStatus.setProcessedKBPerSec(processedKBPerSec);

        Long discoveredFilesCount = job.job.uriTotalsReport.totalUriCount;
        if (discoveredFilesCount == null) {
            discoveredFilesCount = -1L;
        }
        jStatus.setDiscoveredFilesCount(discoveredFilesCount);

        Long downloadedCount = job.job.uriTotalsReport.downloadedUriCount;
        if (downloadedCount == null) {
            downloadedCount = -1L;
        }
        jStatus.setDownloadedFilesCount(downloadedCount);
        /*
         * Example of a frontier short report:
         * 27 queues: 5 active (1 in-process; 0 ready; 4 snoozed); 0 inactive; 0 retired; 22 exhausted
         */
        String frontierShortReport = String.format(
                "%d queues: %d active (%d in-process; %d ready; %d snoozed); %d inactive; %d retired; %d exhausted",
                job.job.frontierReport.totalQueues,
                job.job.frontierReport.activeQueues,
                job.job.frontierReport.inProcessQueues,
                job.job.frontierReport.readyQueues,
                job.job.frontierReport.snoozedQueues,
                job.job.frontierReport.inactiveQueues,
                job.job.frontierReport.retiredQueues,
                job.job.frontierReport.exhaustedQueues);
        jStatus.setFrontierShortReport(frontierShortReport);

        String newStatus = "?";
        String controllerState = job.job.crawlControllerState;
        if (controllerState != null) {
            newStatus = controllerState;
        }
        jStatus.setStatus(newStatus);
        // FIXME looking for "PAUSE" in the state string is not a correct way to detect a paused crawl
        if (newStatus.contains("PAUSE")) {
            cpm.setStatus(CrawlStatus.CRAWLER_PAUSED);
        } else {
            cpm.setStatus(CrawlStatus.CRAWLER_ACTIVE);
        }

        //Integer currentActiveToecount = job.job.threadReport.toeCount;
        /*
        Integer currentActiveToecount = null;
        Iterator<String> iter = job.job.threadReport.processors.iterator();
        String tmpStr;
        int idx;
        while (currentActiveToecount == null && iter.hasNext()) {
            tmpStr = iter.next();
            idx = tmpStr.indexOf(" noActiveProcessor");
            if (idx != -1) {
                currentActiveToecount = Integer.parseInt(tmpStr.substring(0, idx).trim());
            }
        }
        */
        Integer currentActiveToecount = job.job.loadReport.busyThreads;
        if (currentActiveToecount == null) {
            currentActiveToecount = -1;
        }
        jStatus.setActiveToeCount(currentActiveToecount);
    }

    /**
     * Generates a full frontier report.
     *
     * @return a full frontier report (currently always null; see the FIXME below).
     */
    public FullFrontierReport getFullFrontierReport() {
        // FIXME get the frontier report from H3 using an appropriate REST call.
        // Reading the report file directly out of the jobdir, as in the URL below, is NOT an acceptable solution:
        // https://localhost:8444/engine/job/testjob/jobdir/20150210135411/reports/frontier-summary-report.txt
        return null;
        /*
        return FullFrontierReport.parseContentsAsString(
                jobName,
                (String) executeOperationNoRetry(crawlServiceJobBeanName,
                        CrawlServiceJobOperation.frontierReport.name(), "all"));
        */
    }
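
    /*
     * A possible shape for getFullFrontierReport() once the report text can be fetched over the H3 REST API,
     * mirroring the old JMX-based code commented out above (a sketch only; fetchFrontierReportAsString() is a
     * hypothetical helper, not an existing method):
     *
     *   String reportAsString = fetchFrontierReportAsString(jobName);
     *   return FullFrontierReport.parseContentsAsString(jobName, reportAsString);
     */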

    /**
     * Periodically scans the crawl dir to see if Heritrix3 has finished generating the crawl reports. The time to wait
     * is bounded by {@link Heritrix3Settings#WAIT_FOR_REPORT_GENERATION_TIMEOUT}.
     * Currently not used.
     *
     * @param crawlDir the crawl directory to scan.
     */
    @Deprecated
    private void waitForReportGeneration(File crawlDir) {
        log.info("Started waiting for Heritrix report generation.");

        long currentTime = System.currentTimeMillis();
        long waitSeconds = Settings.getLong(Heritrix3Settings.WAIT_FOR_REPORT_GENERATION_TIMEOUT);
        long waitDeadline = currentTime + TimeUtils.SECOND_IN_MILLIS * waitSeconds;

        // While the deadline has not been reached, periodically perform the
        // following checks:
        // 1) Verify that the crawl job MBean still exists. If not, the job
        //    is over; no need to wait more, so exit the loop.
        // 2) Read the job's status. Since Heritrix 1.14.4, a FINISHED status
        //    guarantees that all reports have been generated. In this case
        //    exit the loop.
        while (currentTime <= waitDeadline) {
            currentTime = System.currentTimeMillis();

            //FIXME see if job is finished; if so, reports can be considered to be ready as well?????
            try {
                // Wait 20 seconds
                Thread.sleep(20 * TimeUtils.SECOND_IN_MILLIS);
            } catch (InterruptedException e) {
                log.trace("Received InterruptedException", e);
            }
        }
        log.info("Waited {} for report generation. Will proceed with cleanup.", StringUtils.formatDuration(waitSeconds));
    }

    @Override
    public boolean atFinish() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public void beginCrawlStop() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public void cleanup() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public boolean crawlIsEnded() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public int getActiveToeCount() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public int getCurrentProcessedKBPerSec() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public String getHarvestInformation() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public String getProgressStats() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public long getQueuedUriCount() {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public boolean isPaused() {
        throw new NotImplementedException("Not implemented");
    }

}