/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 * the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */

package dk.netarkivet.harvester.harvesting;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClientFactory;
import dk.netarkivet.common.distribute.arcrepository.BatchStatus;
import dk.netarkivet.common.distribute.arcrepository.BitarchiveRecord;
import dk.netarkivet.common.distribute.arcrepository.HarvesterArcRepositoryClient;
import dk.netarkivet.common.distribute.indexserver.Index;
import dk.netarkivet.common.distribute.indexserver.IndexClientFactory;
import dk.netarkivet.common.distribute.indexserver.JobIndexCache;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.NotificationType;
import dk.netarkivet.common.utils.NotificationsFactory;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.StringUtils;
import dk.netarkivet.common.utils.batch.FileBatchJob;
import dk.netarkivet.common.utils.cdx.ArchiveExtractCDXJob;
import dk.netarkivet.common.utils.cdx.CDXRecord;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.datamodel.HarvestDefinitionInfo;
import dk.netarkivet.harvester.datamodel.HeritrixTemplate;
import dk.netarkivet.harvester.datamodel.Job;
import dk.netarkivet.harvester.harvesting.metadata.MetadataEntry;
import dk.netarkivet.harvester.harvesting.metadata.MetadataFile;
import dk.netarkivet.harvester.harvesting.report.DomainStatsReport;
import dk.netarkivet.harvester.harvesting.report.HarvestReport;
import dk.netarkivet.harvester.harvesting.report.HarvestReportFactory;
import dk.netarkivet.harvester.harvesting.report.HarvestReportGenerator;

/**
 * This class handles all the things in a single harvest that are not directly related either to launching Heritrix
 * or to handling JMS messages.
 */
public class HarvestController {

    /** The instance logger. */
    private static final Logger log = LoggerFactory.getLogger(HarvestController.class);

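    /*
     * Illustrative lifecycle sketch (not part of the original class; the variable names crawlDir,
     * job, hdi and metadataEntries are assumptions for the example only): a typical caller goes
     * through the public methods in this order.
     *
     *   HarvestController controller = HarvestController.getInstance();
     *   HeritrixFiles files = controller.writeHarvestFiles(crawlDir, job, hdi, metadataEntries);
     *   controller.runHarvest(files);
     *   HarvestReport report = controller.storeFiles(files, new StringBuilder(), new ArrayList<File>());
     *   controller.cleanup();
     */
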
    /** The singleton instance of this class. Calling cleanup() on the instance will null this field. */
    private static HarvestController instance;

    /** The max time to wait for Heritrix to close its last ARC or WARC files (in secs). */
    private static final int WAIT_FOR_HERITRIX_TIMEOUT_SECS = 5;

    /** The ArcRepositoryClient used to communicate with the ArcRepository to store the generated arc-files. */
    private HarvesterArcRepositoryClient arcRepController;

    /**
     * Private constructor controlled by getInstance().
     */
    private HarvestController() {
        arcRepController = ArcRepositoryClientFactory.getHarvesterInstance();
    }

    /**
     * Get the instance of the singleton HarvestController.
     *
     * @return The singleton instance.
     */
    public static synchronized HarvestController getInstance() {
        if (instance == null) {
            instance = new HarvestController();
        }
        return instance;
    }

    /**
     * Clean up this singleton, releasing the ArcRepositoryClient and removing the instance. This instance should not
     * be used after this method has been called. After this has been called, new calls to getInstance will return a
     * new instance.
     */
    public void cleanup() {
        if (arcRepController != null) {
            arcRepController.close();
        }
        resetInstance();
    }

    /**
     * Reset the singleton instance.
     */
    private static void resetInstance() {
        instance = null;
    }

    /**
     * Writes the files involved with a harvest. Creates the Heritrix arcs and warcs directories to ensure that these
     * directories exist in advance.
     *
     * @param crawldir The directory that the crawl should take place in.
     * @param job The Job object containing various harvest setup data.
     * @param hdi The object encapsulating documentary information about the harvest.
     * @param metadataEntries Any metadata entries sent along with the job that should be stored for later use.
     * @return An object encapsulating where these files have been written.
     */
    public HeritrixFiles writeHarvestFiles(File crawldir, Job job, HarvestDefinitionInfo hdi,
            List<MetadataEntry> metadataEntries) {
        // FIXME this hardwires the HeritrixFiles to H1
        final HeritrixFiles files = HeritrixFiles.getH1HeritrixFilesWithDefaultJmxFiles(crawldir, job);

        // If this job is a job that tries to continue a previous job
        // using the Heritrix recover.gz log, and this feature is enabled,
        // then try to fetch the recover.log from the metadata-arc-file.
        if (job.getContinuationOf() != null && Settings.getBoolean(HarvesterSettings.RECOVERlOG_CONTINUATION_ENABLED)) {
            tryToRetrieveRecoverLog(job, files);
        }

        // Create harvestInfo file in crawldir
        // & create preharvest-metadata-1.arc
        log.debug("Writing persistent job data for job {}", job.getJobID());
        // Check that harvestInfo does not yet exist

        // Write job data to persistent storage (harvestinfo file)
        new PersistentJobData(files.getCrawlDir()).write(job, hdi);
        // Create jobId-preharvest-metadata-1.arc for this job
        writePreharvestMetadata(job, metadataEntries, crawldir);

        files.writeSeedsTxt(job.getSeedListAsString());

        files.writeOrderXml(job.getOrderXMLdoc());
        // Only retrieve index if deduplication is not disabled in the template.
        if (job.getOrderXMLdoc().IsDeduplicationEnabled()) {
            log.debug("Deduplication enabled. Fetching deduplication index..");
            files.setIndexDir(fetchDeduplicateIndex(metadataEntries));
        } else {
            log.debug("Deduplication disabled.");
        }

        // Create Heritrix arcs directory before starting Heritrix to ensure
        // the arcs directory exists in advance.
        boolean created = files.getArcsDir().mkdir();
        if (!created) {
            log.warn("Unable to create arcsdir: {}", files.getArcsDir());
        }
        // Create Heritrix warcs directory before starting Heritrix to ensure
        // the warcs directory exists in advance.
        created = files.getWarcsDir().mkdir();
        if (!created) {
            log.warn("Unable to create warcsdir: {}", files.getWarcsDir());
        }

        return files;
    }

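    /*
     * Illustrative layout of the crawl directory after writeHarvestFiles(...) has run (a sketch
     * based on the calls above; the exact file and directory names are determined by
     * PersistentJobData, MetadataEntry and HeritrixFiles, so treat them as assumptions):
     *
     *   crawldir/
     *     harvestInfo.xml   - persistent job data written by PersistentJobData
     *     metadata/         - serialized pre-harvest MetadataEntry objects, if any
     *     seeds.txt         - the seed list
     *     order.xml         - the Heritrix 1 crawl template
     *     arcs/, warcs/     - empty directories created for Heritrix in advance
     *
     * When deduplication is enabled, files.setIndexDir(...) additionally points the HeritrixFiles
     * object at the locally cached deduplication index (typically a directory in the local index
     * cache rather than inside the crawl directory).
     */
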
    /**
     * This method attempts to retrieve the Heritrix recover log from the job which this job tries to continue. If
     * successful, the Heritrix template is updated accordingly.
     *
     * @param job The harvest Job object containing various harvest setup data.
     * @param files Heritrix files related to this harvestjob.
     */
    private void tryToRetrieveRecoverLog(Job job, HeritrixFiles files) {
        Long previousJob = job.getContinuationOf();
        List<CDXRecord> metaCDXes = null;
        try {
            metaCDXes = getMetadataCDXRecordsForJob(previousJob);
        } catch (IOFailure e) {
            log.debug("Failed to retrieve CDX of metadata records. "
                    + "Maybe the metadata arcfile for job {} does not exist in repository", previousJob, e);
        }

        CDXRecord recoverlogCDX = null;
        if (metaCDXes != null) {
            for (CDXRecord cdx : metaCDXes) {
                if (cdx.getURL().matches(MetadataFile.RECOVER_LOG_PATTERN)) {
                    recoverlogCDX = cdx;
                }
            }
            if (recoverlogCDX == null) {
                log.debug("A recover.gz log file was not found in metadata-arcfile");
            } else {
                log.debug("recover.gz log found in metadata-arcfile");
            }
        }

        BitarchiveRecord br = null;
        if (recoverlogCDX != null) { // Retrieve recover.gz from metadata.arc file
            br = ArcRepositoryClientFactory.getViewerInstance().get(recoverlogCDX.getArcfile(),
                    recoverlogCDX.getOffset());
            if (br != null) {
                log.debug("recover.gz log retrieved from metadata-arcfile");
                if (files.writeRecoverBackupfile(br.getData())) {
                    // modify order.xml, so Heritrix recover-path points
                    // to the retrieved recoverlog
                    insertHeritrixRecoverPathInOrderXML(job, files);
                } else {
                    log.warn("Failed to retrieve and write recoverlog to disk.");
                }
            } else {
                log.debug("recover.gz log not retrieved from metadata-arcfile");
            }
        }
    }

    /**
     * Insert the correct recoverpath in the order.xml for the given harvestjob.
     *
     * @param job A harvestjob
     * @param files Heritrix files related to this harvestjob.
     */
    private void insertHeritrixRecoverPathInOrderXML(Job job, HeritrixFiles files) {
        HeritrixTemplate temp = job.getOrderXMLdoc();
        temp.setRecoverlogNode(files.getRecoverBackupGzFile());
        job.setOrderXMLDoc(temp); // Update template associated with job
    }

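    /*
     * Summary of the recover-log continuation flow implemented by the two methods above (a
     * descriptive note, not original source text): when a job continues a previous job and the
     * feature is enabled, the previous job's metadata (W)ARC file is searched via CDX records for
     * a recover.gz entry; if one is found, the record is fetched from the archive, written to disk
     * as a recover backup file, and the order.xml template is updated so Heritrix resumes the
     * crawl from that log.
     */
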
    /**
     * Writes pre-harvest metadata to the "metadata" directory.
     *
     * @param harvestJob a given Job.
     * @param metadata the list of metadata entries to write to the metadata file.
     * @param crawlDir the directory where the metadata will be written.
     * @throws IOFailure If there are errors in writing the metadata.
     */
    private void writePreharvestMetadata(Job harvestJob, List<MetadataEntry> metadata, File crawlDir) throws IOFailure {
        if (metadata.size() == 0) {
            // Do not generate preharvest metadata file for empty list
            return;
        }

        // make sure that metadata directory exists
        File metadataDir = new File(crawlDir, IngestableFiles.METADATA_SUB_DIR);
        metadataDir.mkdir();
        if (!(metadataDir.exists() && metadataDir.isDirectory())) {
            throw new IOFailure("Unable to write preharvest metadata for job '" + harvestJob.getJobID()
                    + "' to directory '" + metadataDir.getAbsolutePath() + "', as directory does not exist.");
        }

        // Serializing the MetadataEntry objects to the metadataDir
        MetadataEntry.storeMetadataToDisk(metadata, metadataDir);
    }

    /**
     * Creates the actual HeritrixLauncher instance and runs it, after the various setup files have been written.
     *
     * @param files Description of files involved in running Heritrix. Not Null.
     * @throws ArgumentNotValid if an argument isn't valid.
     */
    public void runHarvest(HeritrixFiles files) throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(files, "HeritrixFiles files");
        HeritrixLauncher hl = HeritrixLauncherFactory.getInstance(files);
        hl.doCrawl();
    }

    /**
     * Controls storing all files involved in a job. The files are 1) the actual ARC/WARC files, and 2) the metadata
     * files. The crawl.log is parsed, and information for each domain is generated and stored in an
     * AbstractHarvestReport object, which is sent along in the crawl status message.
     * <p>
     * Additionally, any leftover open ARC files are closed and harvest documentation is extracted before upload
     * starts.
     *
     * @param files The HeritrixFiles object for this crawl. Not Null.
     * @param errorMessage A place where error messages accumulate. Not Null.
     * @param failedFiles List of files that failed to upload. Not Null.
     * @return An object containing info about the domains harvested.
     * @throws ArgumentNotValid if an argument isn't valid.
     */
    public HarvestReport storeFiles(HeritrixFiles files, StringBuilder errorMessage, List<File> failedFiles)
            throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(files, "HeritrixFiles files");
        ArgumentNotValid.checkNotNull(errorMessage, "StringBuilder errorMessage");
        ArgumentNotValid.checkNotNull(failedFiles, "List<File> failedFiles");
        long jobID = files.getJobID();
        try {
            IngestableFiles inf = new IngestableFiles(files);

            inf.closeOpenFiles(WAIT_FOR_HERITRIX_TIMEOUT_SECS);
            // Create a metadata ARC file
            HarvestDocumentation.documentHarvest(inf);
            // Upload all files

            // Check, if arcsdir or warcsdir is empty
            // Send a notification, if this is the case
            if (inf.getArcFiles().isEmpty() && inf.getWarcFiles().isEmpty()) {
                String errMsg = "Probable error in Heritrix job setup. "
                        + "No arcfiles or warcfiles generated by Heritrix for job " + jobID;
                log.warn(errMsg);
                NotificationsFactory.getInstance().notify(errMsg, NotificationType.WARNING);
            } else {
                if (!inf.getArcFiles().isEmpty()) {
                    uploadFiles(inf.getArcFiles(), errorMessage, failedFiles);
                }
                if (!inf.getWarcFiles().isEmpty()) {
                    uploadFiles(inf.getWarcFiles(), errorMessage, failedFiles);
                }
            }

            // Now the ARC/WARC files have been uploaded,
            // we finally upload the metadata archive file.
            uploadFiles(inf.getMetadataArcFiles(), errorMessage, failedFiles);

            // Make the harvestReport ready for uploading
            HarvestReportGenerator hrg = new HarvestReportGenerator(files);
            DomainStatsReport dsr = new DomainStatsReport(hrg.getDomainStatsMap(), hrg.getDefaultStopReason());
            return HarvestReportFactory.generateHarvestReport(dsr);

        } catch (IOFailure e) {
            String errMsg = "IOFailure occurred while trying to upload files";
            log.warn(errMsg, e);
            throw new IOFailure(errMsg, e);
        }
    }

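    /*
     * Illustrative caller-side pattern for storeFiles (an assumed usage sketch, not taken from
     * this class): the two accumulator arguments are filled in as uploads fail, so a caller
     * typically inspects them afterwards.
     *
     *   StringBuilder errors = new StringBuilder();
     *   List<File> failed = new ArrayList<File>();
     *   HarvestReport report = controller.storeFiles(heritrixFiles, errors, failed);
     *   if (errors.length() > 0) {
     *       // handle/report upload errors; the files that could not be stored are in 'failed'
     *   }
     */
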
    /**
     * Upload given files to the archive repository.
     *
     * @param files List of (ARC/WARC) files to upload.
     * @param errorMessage Accumulator for error messages.
     * @param failedFiles Accumulator for failed files.
     */
    private void uploadFiles(List<File> files, StringBuilder errorMessage, List<File> failedFiles) {
        // Upload all archive files
        if (files != null) {
            for (File f : files) {
                try {
                    log.info("Uploading file '{}' to arcrepository.", f.getName());
                    arcRepController.store(f);
                    log.info("File '{}' uploaded successfully to arcrepository.", f.getName());
                } catch (Exception e) {
                    File oldJobsDir = new File(Settings.get(HarvesterSettings.HARVEST_CONTROLLER_OLDJOBSDIR));
                    String errorMsg = "Error uploading arcfile '" + f.getAbsolutePath() + "'. Will be moved to '"
                            + oldJobsDir.getAbsolutePath() + "'";
                    errorMessage.append(errorMsg).append("\n").append(e.toString()).append("\n");
                    log.warn(errorMsg, e);
                    failedFiles.add(f);
                }
            }
        }
    }

    /**
     * Retrieve the list of jobs for deduplicate reduction.
     * <p>
     * Runs through all metadata entries, finding duplicate reduction entries, and parsing all jobIDs in them, warning
     * only on errors.
     *
     * @param metadataEntries list of metadataEntries.
     * @return the list of jobs for deduplicate reduction.
     */
    private List<Long> parseJobIDsForDuplicateReduction(List<MetadataEntry> metadataEntries) {
        // find metadataEntry for duplicatereduction if any.
        List<Long> result = new ArrayList<Long>();
        for (MetadataEntry me : metadataEntries) {
            if (me.isDuplicateReductionMetadataEntry()) {
                String s = new String(me.getData());
                if (s.isEmpty()) { // An empty string is now possible
                    continue;
                }
                String[] longs = s.split(",");
                for (String stringLong : longs) {
                    try {
                        result.add(Long.parseLong(stringLong));
                    } catch (NumberFormatException e) {
                        log.warn("Unable to convert String '{}' in duplicate reduction jobid list metadataEntry '{}'"
                                + " to a jobID. Ignoring.", stringLong, s, e);
                    }
                }
            }
        }
        return result;
    }

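    /*
     * Format of the duplicate reduction payload parsed above (descriptive example; the concrete
     * IDs are made up): the MetadataEntry data is a comma-separated list of job IDs, e.g.
     * "1001,1002,1007", which yields the list [1001, 1002, 1007]. Tokens that cannot be parsed as
     * longs are logged and skipped.
     */
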
    /**
     * Get an index for deduplication. This will make a call to the index server, requesting an index for the given
     * IDs. The files will then be cached locally.
     * <p>
     * If we request an index for IDs that don't exist/have problems, we get a smaller set of IDs in our cache files,
     * and next time we ask for the same index, we will call the index server again. This will be handled well,
     * though, because if the ids are still missing, we will get a reply telling us to use the cached smaller index
     * anyway.
     *
     * @param metadataEntries list of metadataEntries to get jobIDs from.
     * @return a directory containing the index itself.
     * @throws IOFailure on errors retrieving the index from the client. FIXME Add more forgiving handling when no
     * index is available, e.g. a setting to disable deduplication if no index is available.
     */
    private File fetchDeduplicateIndex(List<MetadataEntry> metadataEntries) {
        // Get list of jobs, which should be used for duplicate reduction
        // and retrieve a luceneIndex from the IndexServer
        // based on the crawl.logs from these jobs and their CDX'es.
        Set<Long> jobIDsForDuplicateReduction = new HashSet<Long>(parseJobIDsForDuplicateReduction(metadataEntries));

        // The client for requesting job index.
        JobIndexCache jobIndexCache = IndexClientFactory.getDedupCrawllogInstance();

        // Request the index and return the index file.
        Index<Set<Long>> jobIndex = jobIndexCache.getIndex(jobIDsForDuplicateReduction);
        // Check which jobs didn't become part of the index.
        Set<Long> diffSet = new HashSet<Long>(jobIDsForDuplicateReduction);
        diffSet.removeAll(jobIndex.getIndexSet());
        if (log.isDebugEnabled()) {
            log.debug("Received deduplication index containing {} jobs. {}", jobIndex.getIndexSet().size(),
                    ((diffSet.size() > 0) ? "Missing jobs: " + StringUtils.conjoin(",", diffSet) : ""));
        }

        return jobIndex.getIndexFile();
    }

    /**
     * Submit a batch job to generate CDX for all metadata files for a job, and report the result in a list.
     *
     * @param jobid The job to get CDX for.
     * @return A list of CDX records.
     * @throws ArgumentNotValid If jobid is 0 or negative.
     * @throws IOFailure On trouble generating the CDX.
     */
    public static List<CDXRecord> getMetadataCDXRecordsForJob(long jobid) {
        ArgumentNotValid.checkPositive(jobid, "jobid");
        FileBatchJob cdxJob = new ArchiveExtractCDXJob(false);
        // Restrict the batch job to this job's metadata files, e.g. "1234-metadata-1.arc" or "1234-metadata-1.warc.gz"
        cdxJob.processOnlyFilesMatching(jobid + "-metadata-[0-9]+\\.(w)?arc(\\.gz)?");
        File f;
        try {
            f = File.createTempFile(jobid + "-reports", ".cdx", FileUtils.getTempDir());
        } catch (IOException e) {
            throw new IOFailure("Could not create temporary file", e);
        }
        BatchStatus status = ArcRepositoryClientFactory.getViewerInstance().batch(cdxJob,
                Settings.get(CommonSettings.USE_REPLICA_ID));
        status.getResultFile().copyTo(f);
        List<CDXRecord> records;
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(f));
            records = new ArrayList<CDXRecord>();
            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                String[] parts = line.split("\\s+");
                CDXRecord record = new CDXRecord(parts);
                records.add(record);
            }
        } catch (IOException e) {
            throw new IOFailure("Unable to read results from file '" + f + "'", e);
        } finally {
            IOUtils.closeQuietly(reader);
            FileUtils.remove(f);
        }
        return records;
    }

}