package dk.netarkivet.harvester.heritrix3;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.distribute.JMSConnection;
import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClientFactory;
import dk.netarkivet.common.distribute.arcrepository.HarvesterArcRepositoryClient;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.ExceptionUtils;
import dk.netarkivet.common.utils.NotificationType;
import dk.netarkivet.common.utils.NotificationsFactory;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.harvester.datamodel.JobStatus;
import dk.netarkivet.harvester.harvesting.PersistentJobData;
import dk.netarkivet.harvester.harvesting.distribute.CrawlStatusMessage;
import dk.netarkivet.harvester.harvesting.report.DomainStatsReport;
import dk.netarkivet.harvester.harvesting.report.HarvestReport;
import dk.netarkivet.harvester.heritrix3.report.HarvestReportFactory;
import dk.netarkivet.harvester.heritrix3.report.HarvestReportGenerator;

/**
 * Singleton responsible for post-processing of Heritrix 3 crawl directories:
 * uploading the produced ARC/WARC and metadata files to the arc repository,
 * generating a harvest report, and sending a {@link CrawlStatusMessage}
 * (DONE or FAILED) back to the harvest scheduler over JMS.
 */
public class PostProcessing {

    /** The logger to use. */
    private static final Logger log = LoggerFactory.getLogger(PostProcessing.class);

    /** The max time to wait for heritrix to close last ARC or WARC files (in secs). */
    private static final int WAIT_FOR_HERITRIX_TIMEOUT_SECS = 5;

    /** The JMSConnection to use. */
    private JMSConnection jmsConnection;

    /** The ArcRepositoryClient used to communicate with the ArcRepository to store the generated arc-files. */
    private HarvesterArcRepositoryClient arcRepController;

    /** The singleton instance of this class. Calling cleanup() on the instance will null this field. */
    private static PostProcessing instance;

    /**
     * Private constructor controlled by getInstance().
     *
     * @param jmsConnection The JMS connection used to send crawl status messages.
     */
    private PostProcessing(JMSConnection jmsConnection) {
        arcRepController = ArcRepositoryClientFactory.getHarvesterInstance();
        this.jmsConnection = jmsConnection;
    }

    /**
     * Get the instance of the singleton HarvestController.
     *
     * @param jmsConnection The JMS connection to use if a new instance must be created.
     * @return The singleton instance.
     */
    public static synchronized PostProcessing getInstance(JMSConnection jmsConnection) {
        if (instance == null) {
            instance = new PostProcessing(jmsConnection);
        }
        return instance;
    }

    /**
     * Clean up this singleton, releasing the ArcRepositoryClient and removing the instance. This instance should not be
     * used after this method has been called. After this has been called, new calls to getInstance will return a new
     * instance.
     */
    public void cleanup() {
        if (arcRepController != null) {
            arcRepController.close();
        }
        resetInstance();
    }

    /**
     * Reset the singleton instance.
     */
    private static void resetInstance() {
        instance = null;
    }

    /**
     * Looks for old job directories that await uploading of data.
     * The existence of a harvestInfo.xml file (detected via {@link PersistentJobData#existsIn})
     * in a subdirectory of the server dir indicates an unprocessed, probably interrupted, crawl.
     * Each such directory is post-processed and reported as failed.
     */
    public void processOldJobs() {
        // Search through all crawldirs and process PersistentJobData files in them
        File crawlDir = new File(Settings.get(Heritrix3Settings.HARVEST_CONTROLLER_SERVERDIR));
        log.info("Looking for unprocessed crawldata in '{}'", crawlDir);
        File[] subdirs = crawlDir.listFiles();
        // File.listFiles() returns null if the path is not an existing, readable directory;
        // guard against NPE on first startup or misconfigured serverdir.
        if (subdirs == null) {
            log.warn("Serverdir '{}' does not exist or could not be listed; no old jobs to process.",
                    crawlDir.getAbsolutePath());
            return;
        }
        for (File oldCrawlDir : subdirs) {
            if (PersistentJobData.existsIn(oldCrawlDir)) {
                // Assume that crawl had not ended at this point so job must be marked as failed
                final String msg = "Found old unprocessed job data in dir '" + oldCrawlDir.getAbsolutePath()
                        + "'. Crawl probably interrupted by shutdown of HarvestController. Processing data.";
                log.warn(msg);
                NotificationsFactory.getInstance().notify(msg, NotificationType.WARNING);
                doPostProcessing(oldCrawlDir,
                        new IOFailure("Crawl probably interrupted by shutdown of HarvestController"));
            }
        }
    }

    /**
     * Do postprocessing of data in a crawldir.<br>
     * 1. Retrieve jobID, and crawlDir from the harvestInfoFile using class PersistentJobData<br>
     * 2. finds JobId and arcsdir<br>
     * 3. calls storeArcFiles<br>
     * 4. moves harvestdir to oldjobs and deletes crawl.log and other superfluous files.
     *
     * @param crawlDir The location of harvest-info to be processed
     * @param crawlException any exceptions thrown by the crawl which need to be reported back to the scheduler (may be
     * null for success)
     * @throws IOFailure if the file cannot be read
     */
    public void doPostProcessing(File crawlDir, Throwable crawlException) throws IOFailure {
        log.debug("Post-processing files in '{}'", crawlDir.getAbsolutePath());
        if (!PersistentJobData.existsIn(crawlDir)) {
            throw new IOFailure("No harvestInfo found in directory: " + crawlDir.getAbsolutePath());
        }

        PersistentJobData harvestInfo = new PersistentJobData(crawlDir);
        Long jobID = harvestInfo.getJobID();

        StringBuilder errorMessage = new StringBuilder();
        HarvestReport dhr = null;
        List<File> failedFiles = new ArrayList<File>();

        Heritrix3Files files = Heritrix3Files.getH3HeritrixFiles(crawlDir, harvestInfo);

        try {
            log.info("Store files in directory '{}' " + "from jobID: {}.", crawlDir, jobID);
            dhr = storeFiles(files, errorMessage, failedFiles);
        } catch (Exception e) {
            String msg = "Trouble during postprocessing of files in '" + crawlDir.getAbsolutePath() + "'";
            log.warn(msg, e);
            errorMessage.append(e.getMessage()).append("\n");
            // send a mail about this problem
            NotificationsFactory.getInstance().notify(
                    msg + ". Errors accumulated during the postprocessing: " + errorMessage.toString(),
                    NotificationType.ERROR, e);
        } finally {
            // Send a done or failed message back to harvest scheduler
            // FindBugs claims a load of known null value here, but that
            // will not be the case if storeFiles() succeeds.
            CrawlStatusMessage csm;

            if (crawlException == null && errorMessage.length() == 0) {
                log.info("Job with ID {} finished with status DONE", jobID);
                csm = new CrawlStatusMessage(jobID, JobStatus.DONE, dhr);
            } else {
                log.warn("Job with ID {} finished with status FAILED", jobID);
                csm = new CrawlStatusMessage(jobID, JobStatus.FAILED, dhr);
                setErrorMessages(csm, crawlException, errorMessage.toString(), dhr == null, failedFiles.size());
            }
            try {
                if (jmsConnection != null) {
                    jmsConnection.send(csm);
                } else {
                    log.error("Message not sent, as jmsConnection variable was null!");
                }
                if (crawlException == null && errorMessage.length() == 0) {
                    log.info("Deleting final logs");
                    files.deleteFinalLogs();
                }
            } finally {
                // Delete superfluous files and move the rest to oldjobs.
                // Cleanup is in an extra finally, because it consists of large amounts
                // of data we need to remove, even on send trouble.
                File oldJobsdir = new File(Settings.get(Heritrix3Settings.HARVEST_CONTROLLER_OLDJOBSDIR));
                log.info("Cleanup after harvesting job with id '{}' and moving the rest of the job to oldjobsdir '{}' ",
                        jobID, oldJobsdir);
                files.cleanUpAfterHarvest(oldJobsdir);
            }
        }
        log.info("Done post-processing files for job {} in dir: '{}'", jobID, crawlDir.getAbsolutePath());
    }

    /**
     * Adds error messages from an exception to the status message errors.
     *
     * @param csm The message we're setting messages on.
     * @param crawlException The exception that got thrown from further in, possibly as far in as Heritrix.
     * @param errorMessage Description of errors that happened during upload.
     * @param missingHostsReport If true, no hosts report was found.
     * @param failedFiles List of files that failed to upload.
     */
    private void setErrorMessages(CrawlStatusMessage csm, Throwable crawlException, String errorMessage,
            boolean missingHostsReport, int failedFiles) {
        if (crawlException != null) {
            csm.setHarvestErrors(crawlException.toString());
            csm.setHarvestErrorDetails(ExceptionUtils.getStackTrace(crawlException));
        }
        if (errorMessage.length() > 0) {
            String shortDesc = "";
            if (missingHostsReport) {
                shortDesc = "No hosts report found";
            }
            if (failedFiles > 0) {
                if (shortDesc.length() > 0) {
                    shortDesc += ", ";
                }
                shortDesc += failedFiles + " files failed to upload";
            }
            csm.setUploadErrors(shortDesc);
            csm.setUploadErrorDetails(errorMessage);
        }
    }

    /**
     * Controls storing all files involved in a job. The files are 1) The actual ARC/WARC files, 2) The metadata files
     * The crawl.log is parsed and information for each domain is generated and stored in a AbstractHarvestReport object
     * which is sent along in the crawlstatusmessage.
     * <p>
     * Additionally, any leftover open ARC files are closed and harvest documentation is extracted before upload starts.
     *
     * @param files The HeritrixFiles object for this crawl. Not Null.
     * @param errorMessage A place where error messages accumulate. Not Null.
     * @param failedFiles List of files that failed to upload. Not Null.
     * @return An object containing info about the domains harvested.
     * @throws ArgumentNotValid if an argument isn't valid.
     */
    private HarvestReport storeFiles(Heritrix3Files files, StringBuilder errorMessage, List<File> failedFiles)
            throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(files, "Heritrix3Files files");
        ArgumentNotValid.checkNotNull(errorMessage, "StringBuilder errorMessage");
        ArgumentNotValid.checkNotNull(failedFiles, "List<File> failedFiles");
        long jobID = files.getJobID();
        log.info("Store the files from harvest in '{}'", files.getCrawlDir());
        try {
            IngestableFiles inf = new IngestableFiles(files);

            inf.closeOpenFiles(WAIT_FOR_HERITRIX_TIMEOUT_SECS);
            // Create a metadata ARC file
            HarvestDocumentation.documentHarvest(inf);
            // Upload all files

            // Check, if arcsdir or warcsdir is empty
            // Send a notification, if this is the case
            if (inf.getArcFiles().isEmpty() && inf.getWarcFiles().isEmpty()) {
                String errMsg = "Probable error in Heritrix job setup. "
                        + "No arcfiles or warcfiles generated by Heritrix for job " + jobID;
                log.warn(errMsg);
                NotificationsFactory.getInstance().notify(errMsg, NotificationType.WARNING);
            } else {
                if (!inf.getArcFiles().isEmpty()) {
                    uploadFiles(inf.getArcFiles(), errorMessage, failedFiles);
                }
                if (!inf.getWarcFiles().isEmpty()) {
                    uploadFiles(inf.getWarcFiles(), errorMessage, failedFiles);
                }
            }

            // Now the ARC/WARC files have been uploaded,
            // we finally upload the metadata archive file.
            uploadFiles(inf.getMetadataArcFiles(), errorMessage, failedFiles);

            // Make the harvestReport ready for uploading
            DomainStatsReport dsr = HarvestReportGenerator.getDomainStatsReport(files);

            return HarvestReportFactory.generateHarvestReport(dsr);
        } catch (IOFailure e) {
            String errMsg = "IOFailure occurred, while trying to upload files";
            log.warn(errMsg, e);
            throw new IOFailure(errMsg, e);
        }
    }

    /**
     * Upload given files to the archive repository.
     *
     * @param files List of (ARC/WARC) files to upload.
     * @param errorMessage Accumulator for error messages.
     * @param failedFiles Accumulator for failed files.
     */
    private void uploadFiles(List<File> files, StringBuilder errorMessage, List<File> failedFiles) {
        // Upload all archive files
        if (files != null) {
            for (File f : files) {
                try {
                    log.info("Uploading file '{}' to arcrepository.", f.getName());
                    arcRepController.store(f);
                    log.info("File '{}' uploaded successfully to arcrepository.", f.getName());
                } catch (Exception e) {
                    File oldJobsDir = new File(Settings.get(Heritrix3Settings.HARVEST_CONTROLLER_OLDJOBSDIR));
                    String errorMsg = "Error uploading arcfile '" + f.getAbsolutePath() + "' Will be moved to '"
                            + oldJobsDir.getAbsolutePath() + "'";
                    errorMessage.append(errorMsg).append("\n").append(e.toString()).append("\n");
                    log.warn(errorMsg, e);
                    failedFiles.add(f);
                }
            }
        }
    }

}