001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.harvester.heritrix3; 024 025import java.io.File; 026import java.util.ArrayList; 027import java.util.List; 028 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032import dk.netarkivet.common.distribute.JMSConnection; 033import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClientFactory; 034import dk.netarkivet.common.distribute.arcrepository.HarvesterArcRepositoryClient; 035import dk.netarkivet.common.exceptions.ArgumentNotValid; 036import dk.netarkivet.common.exceptions.IOFailure; 037import dk.netarkivet.common.utils.ExceptionUtils; 038import dk.netarkivet.common.utils.NotificationType; 039import dk.netarkivet.common.utils.NotificationsFactory; 040import dk.netarkivet.common.utils.Settings; 041import dk.netarkivet.harvester.HarvesterSettings; 042import dk.netarkivet.harvester.datamodel.JobStatus; 043import dk.netarkivet.harvester.harvesting.PersistentJobData; 044import dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage; 045import dk.netarkivet.harvester.harvesting.report.DomainStatsReport; 046import dk.netarkivet.harvester.harvesting.report.HarvestReport; 047import dk.netarkivet.harvester.heritrix3.report.HarvestReportFactory; 048import dk.netarkivet.harvester.heritrix3.report.HarvestReportGenerator; 049 050public class PostProcessing { 051 052 /** The logger to use. */ 053 private static final Logger log = LoggerFactory.getLogger(PostProcessing.class); 054 055 /** The max time to wait for heritrix to close last ARC or WARC files (in secs). */ 056 private static final int WAIT_FOR_HERITRIX_TIMEOUT_SECS = 5; 057 058 /** The JMSConnection to use. */ 059 private JMSConnection jmsConnection; 060 061 /** The ArcRepositoryClient used to communicate with the ArcRepository to store the generated arc-files. */ 062 private HarvesterArcRepositoryClient arcRepController; 063 064 /** The singleton instance of this class. Calling cleanup() on the instance will null this field. */ 065 private static PostProcessing instance; 066 067 /** 068 * Private constructor controlled by getInstance(). 069 */ 070 private PostProcessing(JMSConnection jmsConnection) { 071 arcRepController = ArcRepositoryClientFactory.getHarvesterInstance(); 072 this.jmsConnection = jmsConnection; 073 } 074 075 /** 076 * Get the instance of the singleton HarvestController. 077 * 078 * @return The singleton instance. 079 */ 080 public static synchronized PostProcessing getInstance(JMSConnection jmsConnection) { 081 if (instance == null) { 082 instance = new PostProcessing(jmsConnection); 083 } 084 return instance; 085 } 086 087 /** 088 * Clean up this singleton, releasing the ArcRepositoryClient and removing the instance. This instance should not be 089 * used after this method has been called. After this has been called, new calls to getInstance will return a new 090 * instance. 091 */ 092 public void cleanup() { 093 if (arcRepController != null) { 094 arcRepController.close(); 095 } 096 resetInstance(); 097 } 098 099 /** 100 * Reset the singleton instance. 101 */ 102 private static void resetInstance() { 103 instance = null; 104 } 105 106 /** 107 * Looks for old job directories that await uploading of data. 108 * The existence of the harvestInfo.xml in the 109 */ 110 public void processOldJobs() { 111 // Search through all crawldirs and process PersistentJobData 112 // files in them 113 File crawlDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_SERVERDIR)); 114 log.info("Looking for unprocessed crawldata in '{}'",crawlDir ); 115 File[] subdirs = crawlDir.listFiles(); 116 for (File oldCrawlDir : subdirs) { 117 if (PersistentJobData.existsIn(oldCrawlDir)) { 118 // Assume that crawl had not ended at this point so 119 // job must be marked as failed 120 final String msg = "Found old unprocessed job data in dir '" + oldCrawlDir.getAbsolutePath() 121 + "'. Crawl probably interrupted by " + "shutdown of HarvestController. " + "Processing data."; 122 log.warn(msg); 123 NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING); 124 doPostProcessing(oldCrawlDir, new IOFailure("Crawl probably interrupted by " 125 + "shutdown of HarvestController")); 126 } 127 } 128 } 129 130 /** 131 * Do postprocessing of data in a crawldir.</br> 132 * 1. Retrieve jobID, and crawlDir from the harvestInfoFile using class PersistentJobData</br> 133 * 2. finds JobId and arcsdir</br> 134 * 3. calls storeArcFiles</br> 135 * 4. moves harvestdir to oldjobs and deletes crawl.log and other superfluous files. 136 * 137 * @param crawlDir The location of harvest-info to be processed 138 * @param crawlException any exceptions thrown by the crawl which need to be reported back to the scheduler (may be 139 * null for success) 140 * @throws IOFailure if the harvestInfo.xml file cannot be read 141 */ 142 public void doPostProcessing(File crawlDir, Throwable crawlException) throws IOFailure { 143 File harvestInfoFile = PersistentJobData.getHarvestInfoFile(crawlDir); 144 log.debug("Post-processing files in directory '{}' based on the harvestInfofile '{}'", crawlDir.getAbsolutePath(), harvestInfoFile); 145 146 if (!harvestInfoFile.exists()) { 147 throw new IOFailure("Critical error: No '" + harvestInfoFile.getName() + "' found in directory: '" + crawlDir.getAbsolutePath() + "'"); 148 } 149 150 PersistentJobData harvestInfo = new PersistentJobData(crawlDir); 151 Long jobID = harvestInfo.getJobID(); 152 153 StringBuilder errorMessage = new StringBuilder(); 154 HarvestReport dhr = null; 155 List<File> failedFiles = new ArrayList<File>(); 156 157 Heritrix3Files files = Heritrix3Files.getH3HeritrixFiles(crawlDir, harvestInfo); 158 159 try { 160 log.info("Store files in directory '{}' " + "from jobID: {}.", crawlDir, jobID); 161 dhr = storeFiles(files, errorMessage, failedFiles); 162 } catch (Exception e) { 163 String msg = "Trouble occurred during postprocessing (including upload of files) in '" + crawlDir.getAbsolutePath() + "'"; 164 log.warn(msg, e); 165 errorMessage.append(e.getMessage()).append("\n"); 166 // send a mail about this problem 167 NotificationsFactory.getInstance().notify( 168 msg + ". Errors accumulated during the postprocessing: " + errorMessage.toString(), 169 NotificationType.ERROR, e); 170 } finally { 171 // Send a done or failed message back to harvest scheduler 172 // FindBugs claims a load of known null value here, but that 173 // will not be the case if storeFiles() succeeds. 174 CrawlStatusMessage csm; 175 176 if (crawlException == null && errorMessage.length() == 0) { 177 log.info("Job with ID {} finished with status DONE", jobID); 178 csm = new CrawlStatusMessage(jobID, JobStatus.DONE, dhr); 179 } else { 180 log.warn("Job with ID {} finished with status FAILED", jobID); 181 csm = new CrawlStatusMessage(jobID, JobStatus.FAILED, dhr); 182 setErrorMessages(csm, crawlException, errorMessage.toString(), dhr == null, failedFiles.size()); 183 } 184 185 try { // TODO What kind of errors are we actually catching here if any 186 if (jmsConnection != null) { 187 jmsConnection.send(csm); 188 } else { 189 log.error("CrawlStatusMessage was not sent, as jmsConnection variable was null!"); 190 } 191 if (crawlException == null && errorMessage.length() == 0) { // and the message is sent without throwing an exception 192 log.info("Deleting crawl.log and progressstatistics.log for job {} ", jobID); 193 files.deleteFinalLogs(); 194 } 195 } finally { 196 // Delete superfluous files and move the rest to oldjobs. 197 // Cleanup is in an extra finally, because it consists of large amounts 198 // of data we need to remove, even on send trouble. 199 File oldJobsdir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_OLDJOBSDIR)); 200 log.info("Now doing cleanup after harvesting job with id '{}' and moving the rest of the job to oldjobsdir '{}' ", jobID, oldJobsdir); 201 files.cleanUpAfterHarvest(oldJobsdir); 202 } 203 } 204 log.info("Done post-processing files for job {} in dir: '{}'", jobID, crawlDir.getAbsolutePath()); 205 } 206 207 /** 208 * Adds error messages from an exception to the status message errors. 209 * 210 * @param csm The message we're setting messages on. 211 * @param crawlException The exception that got thrown from further in, possibly as far in as Heritrix. 212 * @param errorMessage Description of errors that happened during upload. 213 * @param missingHostsReport If true, no hosts report was found. 214 * @param failedFiles List of files that failed to upload. 215 */ 216 private void setErrorMessages(CrawlStatusMessage csm, Throwable crawlException, String errorMessage, 217 boolean missingHostsReport, int failedFiles) { 218 if (crawlException != null) { 219 csm.setHarvestErrors(crawlException.toString()); 220 csm.setHarvestErrorDetails(ExceptionUtils.getStackTrace(crawlException)); 221 } 222 if (errorMessage.length() > 0) { 223 String shortDesc = ""; 224 if (missingHostsReport) { 225 shortDesc = "No hosts report found"; 226 } 227 if (failedFiles > 0) { 228 if (shortDesc.length() > 0) { 229 shortDesc += ", "; 230 } 231 shortDesc += failedFiles + " files failed to upload"; 232 } 233 csm.setUploadErrors(shortDesc); 234 csm.setUploadErrorDetails(errorMessage); 235 } 236 } 237 238 /** 239 * Controls storing all files involved in a job. The files are 1) The actual ARC/WARC files, 2) The metadata files 240 * The crawl.log is parsed and information for each domain is generated and stored in a AbstractHarvestReport object 241 * which is sent along in the crawlstatusmessage. 242 * <p> 243 * Additionally, any leftover open ARC files are closed and harvest documentation is extracted before upload starts. 244 * 245 * @param files The HeritrixFiles object for this crawl. Not Null. 246 * @param errorMessage A place where error messages accumulate. Not Null. 247 * @param failedFiles List of files that failed to upload. Not Null. 248 * @return An object containing info about the domains harvested. 249 * @throws ArgumentNotValid if an argument isn't valid. 250 */ 251 private HarvestReport storeFiles(Heritrix3Files files, StringBuilder errorMessage, List<File> failedFiles) 252 throws ArgumentNotValid { 253 ArgumentNotValid.checkNotNull(files, "Heritrix3Files files"); 254 ArgumentNotValid.checkNotNull(errorMessage, "StringBuilder errorMessage"); 255 ArgumentNotValid.checkNotNull(failedFiles, "List<File> failedFiles"); 256 long jobID = files.getJobID(); 257 log.info("Store the files from harvest in '{}'", files.getCrawlDir()); 258 try { 259 IngestableFiles inf = new IngestableFiles(files); 260 261 inf.closeOpenFiles(WAIT_FOR_HERITRIX_TIMEOUT_SECS); 262 // Create a metadata archive file 263 HarvestDocumentation.documentHarvest(inf); 264 // Upload all files 265 266 // Check, if arcsdir or warcsdir is empty 267 // Send a notification, if this is the case 268 if (inf.getArcFiles().isEmpty() && inf.getWarcFiles().isEmpty()) { 269 String errMsg = "Probable error in Heritrix job setup. " 270 + "No arcfiles or warcfiles generated by Heritrix for job " + jobID; 271 log.warn(errMsg); 272 NotificationsFactory.getInstance().notify(errMsg, NotificationType.WARNING); 273 } else { 274 if (!inf.getArcFiles().isEmpty()) { 275 log.info("Beginning upload of {} ARC files", inf.getArcFiles().size()); 276 uploadFiles(inf.getArcFiles(), errorMessage, failedFiles); 277 } 278 if (!inf.getWarcFiles().isEmpty()) { 279 log.info("Beginning upload of {} WARC files", inf.getWarcFiles().size()); 280 uploadFiles(inf.getWarcFiles(), errorMessage, failedFiles); 281 } 282 } 283 284 // Now the ARC/WARC files have been uploaded, 285 // we finally upload the metadata archive file. 286 log.info("Beginning upload of the {} metadafile(s) ", inf.getMetadataArcFiles().size()); 287 uploadFiles(inf.getMetadataArcFiles(), errorMessage, failedFiles); 288 289 // Make the harvestReport ready for transfer back to the scheduler 290 DomainStatsReport dsr = HarvestReportGenerator.getDomainStatsReport(files); 291 292 return HarvestReportFactory.generateHarvestReport(dsr); 293 } catch (IOFailure e) { 294 String errMsg = "IOFailure occurred, while trying to upload files"; 295 log.warn(errMsg, e); 296 throw new IOFailure(errMsg, e); 297 } 298 } 299 300 /** 301 * Upload given files to the archive repository. 302 * 303 * @param files List of (ARC/WARC) files to upload. 304 * @param errorMessage Accumulator for error messages. 305 * @param failedFiles Accumulator for failed files. 306 */ 307 private void uploadFiles(List<File> files, StringBuilder errorMessage, List<File> failedFiles) { 308 // Upload all archive files 309 if (files != null) { 310 int count=0; 311 for (File f : files) { 312 count++; 313 try { 314 log.info("Uploading file #{} - '{}' to arcrepository.", count, f.getName()); 315 arcRepController.store(f); 316 log.info("File '{}' uploaded successfully to the arcrepository.", f.getName()); 317 } catch (Exception e) { 318 File oldJobsDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_OLDJOBSDIR)); 319 String errorMsg = "Error uploading file '" + f.getAbsolutePath() + "' Will be moved to the oldjobs directory '" 320 + oldJobsDir.getAbsolutePath() + "'"; 321 errorMessage.append(errorMsg).append("\n").append(e.toString()).append("\n"); 322 log.warn(errorMsg, e); 323 failedFiles.add(f); 324 } 325 } 326 } 327 } 328 329}