package dk.netarkivet.harvester.heritrix3;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.distribute.JMSConnection;
import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClientFactory;
import dk.netarkivet.common.distribute.arcrepository.HarvesterArcRepositoryClient;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.ExceptionUtils;
import dk.netarkivet.common.utils.NotificationType;
import dk.netarkivet.common.utils.NotificationsFactory;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.harvester.datamodel.JobStatus;
import dk.netarkivet.harvester.harvesting.PersistentJobData;
import dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage;
import dk.netarkivet.harvester.harvesting.report.DomainStatsReport;
import dk.netarkivet.harvester.harvesting.report.HarvestReport;
import dk.netarkivet.harvester.heritrix3.report.HarvestReportFactory;
import dk.netarkivet.harvester.heritrix3.report.HarvestReportGenerator;

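/**
 * Post-processing of data from a completed Heritrix 3 crawl: uploads the harvested ARC/WARC files
 * and the generated metadata file to the archive repository, generates a harvest report from the
 * crawl data, and reports the job outcome back to the harvest scheduler as a
 * {@link CrawlStatusMessage}. The class is a singleton; obtain it through
 * {@link #getInstance(JMSConnection)}.
 */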
public class PostProcessing {

    /** The logger to use. */
    private static final Logger log = LoggerFactory.getLogger(PostProcessing.class);

    /** The max time to wait for Heritrix to close its last ARC or WARC files (in seconds). */
    private static final int WAIT_FOR_HERITRIX_TIMEOUT_SECS = 5;

    /** The JMSConnection to use. */
    private JMSConnection jmsConnection;

    /** The ArcRepositoryClient used to communicate with the ArcRepository to store the generated archive files. */
    private HarvesterArcRepositoryClient arcRepController;

    /** The singleton instance of this class. Calling cleanup() on the instance will null this field. */
    private static PostProcessing instance;
    /**
     * Private constructor controlled by getInstance().
     *
     * @param jmsConnection The JMSConnection to use for sending messages to the harvest scheduler.
     */
    private PostProcessing(JMSConnection jmsConnection) {
        arcRepController = ArcRepositoryClientFactory.getHarvesterInstance();
        this.jmsConnection = jmsConnection;
    }

    /**
     * Get the singleton instance of this class, creating it on the first call.
     *
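     * A minimal usage sketch of the intended life cycle (the JMSConnection is assumed to be
     * obtained from the application's usual connection setup):
     * <pre>{@code
     * PostProcessing postProcessing = PostProcessing.getInstance(jmsConnection);
     * postProcessing.processOldJobs(); // upload leftover data from interrupted crawls
     * postProcessing.cleanup();        // release the ArcRepositoryClient when done
     * }</pre>
     *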
     * @param jmsConnection The JMSConnection used for sending messages to the harvest scheduler.
     * @return The singleton instance.
     */
    public static synchronized PostProcessing getInstance(JMSConnection jmsConnection) {
        if (instance == null) {
            instance = new PostProcessing(jmsConnection);
        }
        return instance;
    }

    /**
     * Clean up this singleton, releasing the ArcRepositoryClient and removing the instance. This instance should
     * not be used after this method has been called. After this has been called, new calls to getInstance will
     * return a new instance.
     */
    public void cleanup() {
        if (arcRepController != null) {
            arcRepController.close();
        }
        resetInstance();
    }

    /**
     * Reset the singleton instance.
     */
    private static void resetInstance() {
        instance = null;
    }
    /**
     * Looks for old job directories that await uploading of data. The existence of a harvestInfo.xml file in a
     * subdirectory indicates that the job there has not yet been processed.
     */
    public void processOldJobs() {
        // Search through all crawldirs and process the PersistentJobData files in them
        File crawlDir = new File(Settings.get(Heritrix3Settings.HARVEST_CONTROLLER_SERVERDIR));
        log.info("Looking for unprocessed crawldata in '{}'", crawlDir);
        File[] subdirs = crawlDir.listFiles();
        if (subdirs == null) {
            // listFiles() returns null if the serverdir does not exist or is not a directory
            log.warn("Server directory '{}' does not exist or is not a directory. Nothing to process.", crawlDir);
            return;
        }
        for (File oldCrawlDir : subdirs) {
            if (PersistentJobData.existsIn(oldCrawlDir)) {
                // Assume that the crawl had not ended at this point, so the job must be marked as failed
                final String msg = "Found old unprocessed job data in dir '" + oldCrawlDir.getAbsolutePath()
                        + "'. Crawl probably interrupted by shutdown of HarvestController. Processing data.";
                log.warn(msg);
                NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING);
                doPostProcessing(oldCrawlDir, new IOFailure("Crawl probably interrupted by "
                        + "shutdown of HarvestController"));
            }
        }
    }

    /**
     * Do postprocessing of data in a crawldir:
     * <ol>
     * <li>Retrieves the jobID and the crawl data from the harvestInfo file using class PersistentJobData</li>
     * <li>Stores the ARC/WARC files and the metadata file by calling storeFiles()</li>
     * <li>Sends a CrawlStatusMessage (DONE or FAILED) back to the harvest scheduler</li>
     * <li>Moves the harvestdir to oldjobs and deletes crawl.log and other superfluous files</li>
     * </ol>
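     * <p>
     * A minimal calling sketch (hypothetical names; the crawl directory is assumed to contain a
     * harvestInfo file):
     * <pre>{@code
     * // Crawl ended normally; a DONE status is reported to the scheduler:
     * postProcessing.doPostProcessing(crawlDir, null);
     * // Crawl was interrupted; a FAILED status is reported along with the cause:
     * postProcessing.doPostProcessing(crawlDir, new IOFailure("Crawl interrupted"));
     * }</pre>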
     *
     * @param crawlDir The location of the harvest-info to be processed.
     * @param crawlException Any exception thrown by the crawl which needs to be reported back to the scheduler
     * (may be null for success).
     * @throws IOFailure if the harvestInfo file cannot be read.
     */
    public void doPostProcessing(File crawlDir, Throwable crawlException) throws IOFailure {
        log.debug("Post-processing files in '{}'", crawlDir.getAbsolutePath());
        if (!PersistentJobData.existsIn(crawlDir)) {
            throw new IOFailure("No harvestInfo found in directory: " + crawlDir.getAbsolutePath());
        }

        PersistentJobData harvestInfo = new PersistentJobData(crawlDir);
        Long jobID = harvestInfo.getJobID();

        StringBuilder errorMessage = new StringBuilder();
        HarvestReport dhr = null;
        List<File> failedFiles = new ArrayList<File>();

        // Locate the files produced by this Heritrix 3 crawl (logs, reports, and archive files).
        Heritrix3Files files = Heritrix3Files.getH3HeritrixFiles(crawlDir, harvestInfo);

        try {
            log.info("Storing files in directory '{}' from job {}.", crawlDir, jobID);
            dhr = storeFiles(files, errorMessage, failedFiles);
        } catch (Exception e) {
            String msg = "Trouble during postprocessing of files in '" + crawlDir.getAbsolutePath() + "'";
            log.warn(msg, e);
            errorMessage.append(e.getMessage()).append("\n");
            // Send a mail about this problem
            NotificationsFactory.getInstance().notify(
                    msg + ". Errors accumulated during the postprocessing: " + errorMessage.toString(),
                    NotificationType.ERROR, e);
        } finally {
            // Send a done or failed message back to the harvest scheduler.
            // FindBugs claims a load of known null value here, but that
            // will not be the case if storeFiles() succeeds.
            CrawlStatusMessage csm;

            if (crawlException == null && errorMessage.length() == 0) {
                log.info("Job with ID {} finished with status DONE", jobID);
                csm = new CrawlStatusMessage(jobID, JobStatus.DONE, dhr);
            } else {
                log.warn("Job with ID {} finished with status FAILED", jobID);
                csm = new CrawlStatusMessage(jobID, JobStatus.FAILED, dhr);
                setErrorMessages(csm, crawlException, errorMessage.toString(), dhr == null, failedFiles.size());
            }
            try {
                if (jmsConnection != null) {
                    jmsConnection.send(csm);
                } else {
                    log.error("Message not sent, as the jmsConnection variable was null!");
                }
                if (crawlException == null && errorMessage.length() == 0) {
                    log.info("Deleting final logs");
                    files.deleteFinalLogs();
                }
            } finally {
                // Delete superfluous files and move the rest to oldjobs.
                // Cleanup is in an extra finally, because it consists of large amounts
                // of data we need to remove, even on send trouble.
                File oldJobsdir = new File(Settings.get(Heritrix3Settings.HARVEST_CONTROLLER_OLDJOBSDIR));
                log.info("Cleaning up after harvesting job with id '{}' and moving the rest of the job to oldjobsdir '{}'", jobID, oldJobsdir);
                files.cleanUpAfterHarvest(oldJobsdir);
            }
        }
        log.info("Done post-processing files for job {} in dir: '{}'", jobID, crawlDir.getAbsolutePath());
    }

    /**
     * Adds error messages from an exception to the status message errors.
     *
     * @param csm The message we're setting messages on.
     * @param crawlException The exception that got thrown from further in, possibly as far in as Heritrix.
     * @param errorMessage Description of errors that happened during upload.
     * @param missingHostsReport If true, no hosts report was found.
     * @param failedFiles The number of files that failed to upload.
     */
    private void setErrorMessages(CrawlStatusMessage csm, Throwable crawlException, String errorMessage,
            boolean missingHostsReport, int failedFiles) {
        if (crawlException != null) {
            csm.setHarvestErrors(crawlException.toString());
            csm.setHarvestErrorDetails(ExceptionUtils.getStackTrace(crawlException));
        }
        if (errorMessage.length() > 0) {
            String shortDesc = "";
            if (missingHostsReport) {
                shortDesc = "No hosts report found";
            }
            if (failedFiles > 0) {
                if (shortDesc.length() > 0) {
                    shortDesc += ", ";
                }
                shortDesc += failedFiles + " files failed to upload";
            }
            csm.setUploadErrors(shortDesc);
            csm.setUploadErrorDetails(errorMessage);
        }
    }

    /**
     * Controls storing all files involved in a job. The files are 1) the actual ARC/WARC files, and 2) the metadata
     * files. The crawl.log is parsed, and information for each domain is generated and stored in an
     * AbstractHarvestReport object, which is sent along in the CrawlStatusMessage.
     * <p>
     * Additionally, any leftover open archive files are closed and harvest documentation is extracted before the
     * upload starts.
     *
     * @param files The Heritrix3Files object for this crawl. Not null.
     * @param errorMessage A place where error messages accumulate. Not null.
     * @param failedFiles List of files that failed to upload. Not null.
     * @return An object containing info about the domains harvested.
     * @throws ArgumentNotValid if an argument isn't valid.
     */
    private HarvestReport storeFiles(Heritrix3Files files, StringBuilder errorMessage, List<File> failedFiles)
            throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(files, "Heritrix3Files files");
        ArgumentNotValid.checkNotNull(errorMessage, "StringBuilder errorMessage");
        ArgumentNotValid.checkNotNull(failedFiles, "List<File> failedFiles");
        long jobID = files.getJobID();
        log.info("Storing the files from the harvest in '{}'", files.getCrawlDir());
        try {
            IngestableFiles inf = new IngestableFiles(files);

            inf.closeOpenFiles(WAIT_FOR_HERITRIX_TIMEOUT_SECS);
            // Create the metadata archive file
            HarvestDocumentation.documentHarvest(inf);

            // Check if both the arcsdir and the warcsdir are empty, and send a notification
            // if this is the case; otherwise upload all the archive files.
            if (inf.getArcFiles().isEmpty() && inf.getWarcFiles().isEmpty()) {
                String errMsg = "Probable error in Heritrix job setup. "
                        + "No arcfiles or warcfiles generated by Heritrix for job " + jobID;
                log.warn(errMsg);
                NotificationsFactory.getInstance().notify(errMsg, NotificationType.WARNING);
            } else {
                if (!inf.getArcFiles().isEmpty()) {
                    uploadFiles(inf.getArcFiles(), errorMessage, failedFiles);
                }
                if (!inf.getWarcFiles().isEmpty()) {
                    uploadFiles(inf.getWarcFiles(), errorMessage, failedFiles);
                }
            }

            // Now that the ARC/WARC files have been uploaded,
            // we finally upload the metadata archive file.
            uploadFiles(inf.getMetadataArcFiles(), errorMessage, failedFiles);

            // Make the harvest report ready for uploading
            DomainStatsReport dsr = HarvestReportGenerator.getDomainStatsReport(files);
            return HarvestReportFactory.generateHarvestReport(dsr);
        } catch (IOFailure e) {
            String errMsg = "IOFailure occurred while trying to upload files";
            log.warn(errMsg, e);
            throw new IOFailure(errMsg, e);
        }
    }

    /**
     * Upload the given files to the archive repository.
     *
     * @param files List of (ARC/WARC) files to upload.
     * @param errorMessage Accumulator for error messages.
     * @param failedFiles Accumulator for failed files.
     */
    private void uploadFiles(List<File> files, StringBuilder errorMessage, List<File> failedFiles) {
        // Upload all archive files
        if (files != null) {
            for (File f : files) {
                try {
                    log.info("Uploading file '{}' to arcrepository.", f.getName());
                    arcRepController.store(f);
                    log.info("File '{}' uploaded successfully to arcrepository.", f.getName());
                } catch (Exception e) {
                    File oldJobsDir = new File(Settings.get(Heritrix3Settings.HARVEST_CONTROLLER_OLDJOBSDIR));
                    String errorMsg = "Error uploading arcfile '" + f.getAbsolutePath() + "'. Will be moved to '"
                            + oldJobsDir.getAbsolutePath() + "'";
                    errorMessage.append(errorMsg).append("\n").append(e.toString()).append("\n");
                    log.warn(errorMsg, e);
                    failedFiles.add(f);
                }
            }
        }
    }

}