/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 * the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.heritrix3;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.distribute.JMSConnection;
import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClientFactory;
import dk.netarkivet.common.distribute.arcrepository.HarvesterArcRepositoryClient;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.ExceptionUtils;
import dk.netarkivet.common.utils.NotificationType;
import dk.netarkivet.common.utils.NotificationsFactory;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.datamodel.JobStatus;
import dk.netarkivet.harvester.harvesting.PersistentJobData;
import dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage;
import dk.netarkivet.harvester.harvesting.report.DomainStatsReport;
import dk.netarkivet.harvester.harvesting.report.HarvestReport;
import dk.netarkivet.harvester.heritrix3.report.HarvestReportFactory;
import dk.netarkivet.harvester.heritrix3.report.HarvestReportGenerator;

/**
 * Post-processing of Heritrix 3 crawl data: uploads the generated ARC/WARC and metadata files to the
 * arc repository, generates a harvest report, reports the job status (DONE/FAILED) back to the harvest
 * scheduler over JMS, and finally cleans up the crawl directory by moving it to the oldjobs directory.
 * Implemented as a singleton; see {@link #getInstance(JMSConnection)}.
 */
public class PostProcessing {

    /** The logger to use. */
    private static final Logger log = LoggerFactory.getLogger(PostProcessing.class);

    /** The max time to wait for heritrix to close last ARC or WARC files (in secs). */
    private static final int WAIT_FOR_HERITRIX_TIMEOUT_SECS = 5;

    /** The JMSConnection to use. */
    private JMSConnection jmsConnection;

    /** The ArcRepositoryClient used to communicate with the ArcRepository to store the generated arc-files. */
    private HarvesterArcRepositoryClient arcRepController;

    /** The singleton instance of this class. Calling cleanup() on the instance will null this field. */
    private static PostProcessing instance;

    /**
     * Private constructor controlled by getInstance().
     *
     * @param jmsConnection The JMS connection used to send status messages back to the scheduler.
     */
    private PostProcessing(JMSConnection jmsConnection) {
        arcRepController = ArcRepositoryClientFactory.getHarvesterInstance();
        this.jmsConnection = jmsConnection;
    }

    /**
     * Get the instance of the singleton HarvestController.
     * <p>
     * Note that the {@code jmsConnection} argument is only used the first time the singleton is created;
     * subsequent calls return the existing instance with its original connection.
     *
     * @param jmsConnection The JMS connection used when creating the instance.
     * @return The singleton instance.
     */
    public static synchronized PostProcessing getInstance(JMSConnection jmsConnection) {
        if (instance == null) {
            instance = new PostProcessing(jmsConnection);
        }
        return instance;
    }

    /**
     * Clean up this singleton, releasing the ArcRepositoryClient and removing the instance. This instance should not be
     * used after this method has been called. After this has been called, new calls to getInstance will return a new
     * instance.
     */
    public void cleanup() {
        if (arcRepController != null) {
            arcRepController.close();
        }
        resetInstance();
    }

    /**
     * Reset the singleton instance.
     */
    private static void resetInstance() {
        instance = null;
    }

    /**
     * Looks for old job directories that await uploading of data. A crawl directory is considered unprocessed
     * if it still contains a harvestInfo file (see {@link PersistentJobData#existsIn(File)}); such a crawl was
     * most likely interrupted by a shutdown of the HarvestController, so the job is post-processed as failed.
     */
    public void processOldJobs() {
        // Search through all crawldirs and process PersistentJobData files in them.
        File crawlDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_SERVERDIR));
        log.info("Looking for unprocessed crawldata in '{}'", crawlDir);
        File[] subdirs = crawlDir.listFiles();
        if (subdirs == null) {
            // File.listFiles() returns null if the path does not exist or is not a directory
            // (e.g. on the very first startup, before any crawl has run).
            log.warn("Serverdir '{}' does not exist or is not a directory; no old jobs to process.",
                    crawlDir.getAbsolutePath());
            return;
        }
        for (File oldCrawlDir : subdirs) {
            if (PersistentJobData.existsIn(oldCrawlDir)) {
                // Assume that crawl had not ended at this point so job must be marked as failed.
                final String msg = "Found old unprocessed job data in dir '" + oldCrawlDir.getAbsolutePath()
                        + "'. Crawl probably interrupted by " + "shutdown of HarvestController. " + "Processing data.";
                log.warn(msg);
                NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING);
                doPostProcessing(oldCrawlDir, new IOFailure("Crawl probably interrupted by "
                        + "shutdown of HarvestController"));
            }
        }
    }

    /**
     * Do postprocessing of data in a crawldir:
     * <ol>
     * <li>Retrieves jobID and crawlDir from the harvestInfoFile using class PersistentJobData.</li>
     * <li>Finds JobId and arcsdir.</li>
     * <li>Calls storeFiles.</li>
     * <li>Moves harvestdir to oldjobs and deletes crawl.log and other superfluous files.</li>
     * </ol>
     *
     * @param crawlDir The location of harvest-info to be processed
     * @param crawlException any exceptions thrown by the crawl which need to be reported back to the scheduler (may be
     * null for success)
     * @throws IOFailure if the file cannot be read
     */
    public void doPostProcessing(File crawlDir, Throwable crawlException) throws IOFailure {
        log.debug("Post-processing files in '{}'", crawlDir.getAbsolutePath());
        if (!PersistentJobData.existsIn(crawlDir)) {
            throw new IOFailure("No harvestInfo found in directory: " + crawlDir.getAbsolutePath());
        }

        PersistentJobData harvestInfo = new PersistentJobData(crawlDir);
        Long jobID = harvestInfo.getJobID();

        StringBuilder errorMessage = new StringBuilder();
        HarvestReport dhr = null;
        List<File> failedFiles = new ArrayList<>();

        Heritrix3Files files = Heritrix3Files.getH3HeritrixFiles(crawlDir, harvestInfo);

        try {
            log.info("Store files in directory '{}' from jobID: {}.", crawlDir, jobID);
            dhr = storeFiles(files, errorMessage, failedFiles);
        } catch (Exception e) {
            String msg = "Trouble during postprocessing of files in '" + crawlDir.getAbsolutePath() + "'";
            log.warn(msg, e);
            errorMessage.append(e.getMessage()).append("\n");
            // Send a mail about this problem.
            NotificationsFactory.getInstance().notify(
                    msg + ". Errors accumulated during the postprocessing: " + errorMessage.toString(),
                    NotificationType.ERROR, e);
        } finally {
            // Send a done or failed message back to harvest scheduler.
            // FindBugs claims a load of known null value here, but that
            // will not be the case if storeFiles() succeeds.
            CrawlStatusMessage csm;

            if (crawlException == null && errorMessage.length() == 0) {
                log.info("Job with ID {} finished with status DONE", jobID);
                csm = new CrawlStatusMessage(jobID, JobStatus.DONE, dhr);
            } else {
                log.warn("Job with ID {} finished with status FAILED", jobID);
                csm = new CrawlStatusMessage(jobID, JobStatus.FAILED, dhr);
                setErrorMessages(csm, crawlException, errorMessage.toString(), dhr == null, failedFiles.size());
            }
            try {
                if (jmsConnection != null) {
                    jmsConnection.send(csm);
                } else {
                    log.error("Message not sent, as jmsConnection variable was null!");
                }
                if (crawlException == null && errorMessage.length() == 0) {
                    log.info("Deleting final logs");
                    files.deleteFinalLogs();
                }
            } finally {
                // Delete superfluous files and move the rest to oldjobs.
                // Cleanup is in an extra finally, because it consists of large amounts
                // of data we need to remove, even on send trouble.
                File oldJobsdir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_OLDJOBSDIR));
                log.info("Cleanup after harvesting job with id '{}' and moving the rest of the job to oldjobsdir '{}' ",
                        jobID, oldJobsdir);
                files.cleanUpAfterHarvest(oldJobsdir);
            }
        }
        log.info("Done post-processing files for job {} in dir: '{}'", jobID, crawlDir.getAbsolutePath());
    }

    /**
     * Adds error messages from an exception to the status message errors.
     *
     * @param csm The message we're setting messages on.
     * @param crawlException The exception that got thrown from further in, possibly as far in as Heritrix.
     * @param errorMessage Description of errors that happened during upload.
     * @param missingHostsReport If true, no hosts report was found.
     * @param failedFiles List of files that failed to upload.
     */
    private void setErrorMessages(CrawlStatusMessage csm, Throwable crawlException, String errorMessage,
            boolean missingHostsReport, int failedFiles) {
        if (crawlException != null) {
            csm.setHarvestErrors(crawlException.toString());
            csm.setHarvestErrorDetails(ExceptionUtils.getStackTrace(crawlException));
        }
        if (errorMessage.length() > 0) {
            String shortDesc = "";
            if (missingHostsReport) {
                shortDesc = "No hosts report found";
            }
            if (failedFiles > 0) {
                if (shortDesc.length() > 0) {
                    shortDesc += ", ";
                }
                shortDesc += failedFiles + " files failed to upload";
            }
            csm.setUploadErrors(shortDesc);
            csm.setUploadErrorDetails(errorMessage);
        }
    }

    /**
     * Controls storing all files involved in a job. The files are 1) The actual ARC/WARC files, 2) The metadata files
     * The crawl.log is parsed and information for each domain is generated and stored in a AbstractHarvestReport object
     * which is sent along in the crawlstatusmessage.
     * <p>
     * Additionally, any leftover open ARC files are closed and harvest documentation is extracted before upload starts.
     *
     * @param files The HeritrixFiles object for this crawl. Not Null.
     * @param errorMessage A place where error messages accumulate. Not Null.
     * @param failedFiles List of files that failed to upload. Not Null.
     * @return An object containing info about the domains harvested.
     * @throws ArgumentNotValid if an argument isn't valid.
     */
    private HarvestReport storeFiles(Heritrix3Files files, StringBuilder errorMessage, List<File> failedFiles)
            throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(files, "Heritrix3Files files");
        ArgumentNotValid.checkNotNull(errorMessage, "StringBuilder errorMessage");
        ArgumentNotValid.checkNotNull(failedFiles, "List<File> failedFiles");
        long jobID = files.getJobID();
        log.info("Store the files from harvest in '{}'", files.getCrawlDir());
        try {
            IngestableFiles inf = new IngestableFiles(files);

            inf.closeOpenFiles(WAIT_FOR_HERITRIX_TIMEOUT_SECS);
            // Create a metadata ARC file.
            HarvestDocumentation.documentHarvest(inf);

            // Check if arcsdir or warcsdir is empty; send a notification if this is the case.
            if (inf.getArcFiles().isEmpty() && inf.getWarcFiles().isEmpty()) {
                String errMsg = "Probable error in Heritrix job setup. "
                        + "No arcfiles or warcfiles generated by Heritrix for job " + jobID;
                log.warn(errMsg);
                NotificationsFactory.getInstance().notify(errMsg, NotificationType.WARNING);
            } else {
                // Upload all files.
                if (!inf.getArcFiles().isEmpty()) {
                    uploadFiles(inf.getArcFiles(), errorMessage, failedFiles);
                }
                if (!inf.getWarcFiles().isEmpty()) {
                    uploadFiles(inf.getWarcFiles(), errorMessage, failedFiles);
                }
            }

            // Now the ARC/WARC files have been uploaded,
            // we finally upload the metadata archive file.
            uploadFiles(inf.getMetadataArcFiles(), errorMessage, failedFiles);

            // Make the harvestReport ready for uploading.
            DomainStatsReport dsr = HarvestReportGenerator.getDomainStatsReport(files);

            return HarvestReportFactory.generateHarvestReport(dsr);
        } catch (IOFailure e) {
            String errMsg = "IOFailure occurred, while trying to upload files";
            log.warn(errMsg, e);
            throw new IOFailure(errMsg, e);
        }
    }

    /**
     * Upload given files to the archive repository.
     *
     * @param files List of (ARC/WARC) files to upload.
     * @param errorMessage Accumulator for error messages.
     * @param failedFiles Accumulator for failed files.
     */
    private void uploadFiles(List<File> files, StringBuilder errorMessage, List<File> failedFiles) {
        // Upload all archive files.
        if (files != null) {
            for (File f : files) {
                try {
                    log.info("Uploading file '{}' to arcrepository.", f.getName());
                    arcRepController.store(f);
                    log.info("File '{}' uploaded successfully to arcrepository.", f.getName());
                } catch (Exception e) {
                    File oldJobsDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_OLDJOBSDIR));
                    String errorMsg = "Error uploading arcfile '" + f.getAbsolutePath() + "' Will be moved to '"
                            + oldJobsDir.getAbsolutePath() + "'";
                    errorMessage.append(errorMsg).append("\n").append(e.toString()).append("\n");
                    log.warn(errorMsg, e);
                    failedFiles.add(f);
                }
            }
        }
    }

}