001/* 002 * #%L 003 * Netarchivesuite - harvester 004 * %% 005 * Copyright (C) 2005 - 2018 The Royal Danish Library, 006 * the National Library of France and the Austrian National Library. 007 * %% 008 * This program is free software: you can redistribute it and/or modify 009 * it under the terms of the GNU Lesser General Public License as 010 * published by the Free Software Foundation, either version 2.1 of the 011 * License, or (at your option) any later version. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Lesser Public License for more details. 017 * 018 * You should have received a copy of the GNU General Lesser Public 019 * License along with this program. If not, see 020 * <http://www.gnu.org/licenses/lgpl-2.1.html>. 021 * #L% 022 */ 023package dk.netarkivet.harvester.indexserver; 024 025import java.io.File; 026import java.io.IOException; 027import java.nio.file.Files; 028import java.nio.file.Paths; 029import java.util.Hashtable; 030import java.util.List; 031import java.util.regex.Matcher; 032import java.util.regex.Pattern; 033 034import org.apache.commons.lang.StringUtils; 035import org.apache.commons.math3.util.Pair; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FileSystem; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import dk.netarkivet.common.CommonSettings; 042import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClientFactory; 043import dk.netarkivet.common.distribute.arcrepository.BatchStatus; 044import dk.netarkivet.common.distribute.arcrepository.Replica; 045import dk.netarkivet.common.distribute.arcrepository.ReplicaType; 046import dk.netarkivet.common.distribute.arcrepository.ViewerArcRepositoryClient; 047import dk.netarkivet.common.exceptions.ArgumentNotValid; 048import dk.netarkivet.common.exceptions.IOFailure; 049import dk.netarkivet.common.utils.FileUtils; 050import dk.netarkivet.common.utils.Settings; 051import dk.netarkivet.common.utils.archive.ArchiveBatchJob; 052import dk.netarkivet.common.utils.archive.GetMetadataArchiveBatchJob; 053import dk.netarkivet.common.utils.hadoop.GetMetadataMapper; 054import dk.netarkivet.common.utils.hadoop.HadoopJob; 055import dk.netarkivet.common.utils.hadoop.HadoopJobStrategy; 056import dk.netarkivet.common.utils.hadoop.HadoopJobUtils; 057import dk.netarkivet.common.utils.hadoop.MetadataExtractionStrategy; 058import dk.netarkivet.harvester.HarvesterSettings; 059import dk.netarkivet.harvester.harvesting.metadata.MetadataFile; 060 061/** 062 * This is an implementation of the RawDataCache specialized for data out of metadata files. It uses regular expressions 063 * for matching URL and mime-type of ARC entries for the kind of metadata we want. 064 */ 065public class RawMetadataCache extends FileBasedCache<Long> implements RawDataCache { 066 067 /** The logger for this class. */ 068 private static final Logger log = LoggerFactory.getLogger(RawMetadataCache.class); 069 070 /** A regular expression object that matches everything. */ 071 public static final Pattern MATCH_ALL_PATTERN = Pattern.compile(".*"); 072 /** The prefix (cache name) that this cache uses. */ 073 private final String prefix; 074 /** 075 * The arc repository interface. This does not need to be closed, it is a singleton. 076 */ 077 private ViewerArcRepositoryClient arcrep = ArcRepositoryClientFactory.getViewerInstance(); 078 079 /** The actual pattern to be used for matching the url in the metadata record */ 080 private Pattern urlPattern; 081 082 /** The actual pattern to be used for matching the mimetype in the metadata record */ 083 private Pattern mimePattern; 084 085 /** Try to migrate jobs with a duplicationmigration record. */ 086 private boolean tryToMigrateDuplicationRecords; 087 /** 088 * Create a new RawMetadataCache. For a given job ID, this will fetch and cache selected content from metadata files 089 * (<ID>-metadata-[0-9]+.arc). Any entry in a metadata file that matches both patterns will be returned. The 090 * returned data does not directly indicate which file they were from, though parts intrinsic to the particular 091 * format might. 092 * 093 * @param prefix A prefix that will be used to distinguish this cache's files from other caches'. It will be used 094 * for creating a directory, so it must not contain characters not legal in directory names. 095 * @param urlMatcher A pattern for matching URLs of the desired entries. If null, a .* pattern will be used. 096 * @param mimeMatcher A pattern for matching mime-types of the desired entries. If null, a .* pattern will be used. 097 */ 098 public RawMetadataCache(String prefix, Pattern urlMatcher, Pattern mimeMatcher) { 099 super(prefix); 100 this.prefix = prefix; 101 Pattern urlMatcher1; 102 if (urlMatcher != null) { 103 urlMatcher1 = urlMatcher; 104 } else { 105 urlMatcher1 = MATCH_ALL_PATTERN; 106 } 107 urlPattern = urlMatcher1; 108 Pattern mimeMatcher1; 109 if (mimeMatcher != null) { 110 mimeMatcher1 = mimeMatcher; 111 } else { 112 mimeMatcher1 = MATCH_ALL_PATTERN; 113 } 114 mimePattern = mimeMatcher1; 115 // Should we try to migrate duplicaterecords, yes or no. 116 tryToMigrateDuplicationRecords = Settings.getBoolean(HarvesterSettings.INDEXSERVER_INDEXING_TRY_TO_MIGRATE_DUPLICATION_RECORDS); 117 log.info("Metadata cache for '{}' is fetching metadata with urls matching '{}' and mimetype matching '{}'. Migration of duplicate records is " 118 + (tryToMigrateDuplicationRecords? "enabled":"disabled"), 119 prefix, urlMatcher1.toString(), mimeMatcher1); 120 } 121 122 /** 123 * Get the file potentially containing (cached) data for a single job. 124 * 125 * @param id The job to find data for. 126 * @return The file where cache data for the job can be stored. 127 * @see FileBasedCache#getCacheFile(Object) 128 */ 129 @Override 130 public File getCacheFile(Long id) { 131 ArgumentNotValid.checkNotNull(id, "job ID"); 132 ArgumentNotValid.checkNotNegative(id, "job ID"); 133 return new File(getCacheDir(), prefix + "-" + id + "-cache"); 134 } 135 136 /** 137 * Actually cache data for the given ID. 138 * 139 * @param id A job ID to cache data for. 140 * @return A File containing the data. This file will be the same as getCacheFile(ID); 141 * @see FileBasedCache#cacheData(Object) 142 */ 143 protected Long cacheData(Long id) { 144 if (Settings.getBoolean(CommonSettings.USE_BITMAG_HADOOP_BACKEND)) { 145 return cacheDataHadoop(id); 146 } else { 147 return cacheDataBatch(id); 148 } 149 } 150 151 /** 152 * Cache data for the given ID using Hadoop. 153 * 154 * @param id A job ID to cache data for. 155 * @return A File containing the data. This file will be the same as getCacheFile(ID); 156 * @see FileBasedCache#cacheData(Object) 157 */ 158 private Long cacheDataHadoop(Long id) { 159 final String metadataFilePatternSuffix = Settings.get(CommonSettings.METADATAFILE_REGEX_SUFFIX); 160 final String specifiedPattern = "(.*-)?" + id + "(-.*)?" + metadataFilePatternSuffix; 161 Configuration conf = HadoopJobUtils.getConf(); 162 conf.setPattern(GetMetadataMapper.URL_PATTERN, urlPattern); 163 conf.setPattern(GetMetadataMapper.MIME_PATTERN, mimePattern); 164 165 try (FileSystem fileSystem = FileSystem.newInstance(conf)) { 166 HadoopJobStrategy jobStrategy = new MetadataExtractionStrategy(id, fileSystem); 167 HadoopJob job = new HadoopJob(id, jobStrategy); 168 job.processOnlyFilesMatching(specifiedPattern); 169 job.prepareJobInputOutput(fileSystem); 170 job.run(); 171 // If no error is thrown, job was success 172 173 List<String> metadataLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir()); 174 if (metadataLines.size() > 0) { 175 File cacheFileName = getCacheFile(id); 176 if (tryToMigrateDuplicationRecords) { 177 migrateDuplicatesHadoop(id, fileSystem, specifiedPattern, metadataLines, cacheFileName); 178 } else { 179 copyResults(id, metadataLines, cacheFileName); 180 } 181 log.debug("Cached data for job '{}' for '{}'", id, prefix); 182 return id; 183 } else { 184 log.info("No data found for job '{}' for '{}' in local bitarchive. ", id, prefix); 185 } 186 } catch (IOException e) { 187 log.error("Error instantiating Hadoop filesystem for job {}.", id, e); 188 } 189 return null; 190 } 191 192 /** 193 * If this cache represents a crawllog cache then this method will attempt to migrate any duplicate annotations in 194 * the crawl log using data in the duplicationmigration metadata record. This migrates filename/offset 195 * pairs from uncompressed to compressed (w)arc files. This method has the side effect of copying the index 196 * cache (whether migrated or not) into the cache file whose name is generated from the id. 197 * @param id the id of the cache 198 * @param fileSystem the filesystem on which the operations are carried out 199 * @param specifiedPattern the pattern specifying the files to be found 200 * @param originalJobResults the original hadoop job results which is a list containing the unmigrated data. 201 * @param cacheFileName the cache file for the job which the index cache is copied to. 202 */ 203 private void migrateDuplicatesHadoop(Long id, FileSystem fileSystem, String specifiedPattern, List<String> originalJobResults, File cacheFileName) { 204 log.debug("Looking for a duplicationmigration record for id {}", id); 205 if (urlPattern.pattern().equals(MetadataFile.CRAWL_LOG_PATTERN)) { 206 Configuration conf = fileSystem.getConf(); 207 conf.setPattern(GetMetadataMapper.URL_PATTERN, Pattern.compile(".*duplicationmigration.*")); 208 conf.setPattern(GetMetadataMapper.MIME_PATTERN, Pattern.compile("text/plain")); 209 HadoopJobStrategy jobStrategy = new MetadataExtractionStrategy(id, fileSystem); 210 HadoopJob job = new HadoopJob(id, jobStrategy); 211 job.processOnlyFilesMatching(specifiedPattern); 212 job.prepareJobInputOutput(fileSystem); 213 job.run(); 214 215 try { 216 List<String> metadataLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir()); 217 handleMigrationHadoop(id, metadataLines, originalJobResults, cacheFileName); 218 } catch (IOException e) { 219 log.error("Failed getting duplicationmigration lines output from Hadoop job with ID: {}", id); 220 } 221 } else { 222 copyResults(id, originalJobResults, cacheFileName); 223 } 224 } 225 226 /** 227 * Helper method for {@link #migrateDuplicatesHadoop}. 228 * Does the actual handling of migration after the job has finished successfully. 229 * @param id The id of the cache. 230 * @param metadataLines The resulting lines from the duplication-migration job. 231 * @param originalJobResults The original hadoop job results which is a list containing the unmigrated data. 232 * @param cacheFileName The cache file for the job which the index cache is copied to. 233 */ 234 private void handleMigrationHadoop(Long id, List<String> metadataLines, List<String> originalJobResults, 235 File cacheFileName) { 236 File migration = null; 237 try { 238 migration = File.createTempFile("migration", "txt"); 239 } catch (IOException e) { 240 throw new IOFailure("Could not create temporary output file."); 241 } 242 if (metadataLines.size() > 0) { 243 copyResults(id, metadataLines, migration); 244 } 245 boolean doMigration = migration.exists() && migration.length() > 0; 246 if (doMigration) { 247 log.info("Found a nonempty duplicationmigration record. Now we do the migration for job {}", id); 248 Hashtable<Pair<String, Long>, Long> lookup = createLookupTableFromMigrationLines(id, migration); 249 250 File crawllog = createTempOutputFile(); 251 copyResults(id, originalJobResults, crawllog); 252 migrateFilenameOffsetPairs(id, cacheFileName, crawllog, lookup); 253 } else { 254 copyResults(id, originalJobResults, cacheFileName); 255 } 256 } 257 258 /** 259 * Helper method for creating a temp file to copy job results to. 260 * @return A new temp file to copy output to. 261 */ 262 private File createTempOutputFile() { 263 File crawllog = null; 264 try { 265 crawllog = File.createTempFile("dedup", "txt"); 266 } catch (IOException e) { 267 throw new IOFailure("Could not create temporary output file."); 268 } 269 return crawllog; 270 } 271 272 /** 273 * Helper method for Hadoop methods. 274 * Copies the results of a job to a file. 275 * @param id The ID of the current job. 276 * @param jobResults The resulting lines output from a job. 277 * @param file The file to copy the results to. 278 */ 279 private void copyResults(Long id, List<String> jobResults, File file) { 280 try { 281 Files.write(Paths.get(file.getAbsolutePath()), jobResults); 282 } catch (IOException e) { 283 throw new IOFailure("Failed writing results of job with ID '" + id + "' to file " + file.getAbsolutePath()); 284 } 285 } 286 287 /** 288 * Actually cache data for the given ID. 289 * 290 * @param id A job ID to cache data for. 291 * @return A File containing the data. This file will be the same as getCacheFile(ID); 292 * @see FileBasedCache#cacheData(Object) 293 */ 294 private Long cacheDataBatch(Long id) { 295 final String replicaUsed = Settings.get(CommonSettings.USE_REPLICA_ID); 296 final String metadataFilePatternSuffix = Settings.get(CommonSettings.METADATAFILE_REGEX_SUFFIX); 297 // Same pattern here as defined in class dk.netarkivet.viewerproxy.webinterface.Reporting 298 final String specifiedPattern = "(.*-)?" + id + "(-.*)?" + metadataFilePatternSuffix; 299 300 final ArchiveBatchJob job = new GetMetadataArchiveBatchJob(urlPattern, mimePattern); 301 log.debug("Extract using a batchjob of type '{}' cachedata from files matching '{}' on replica '{}'. Url pattern is '{}' and mimepattern is '{}'", job 302 .getClass().getName(), specifiedPattern, replicaUsed, urlPattern, mimePattern); 303 304 job.processOnlyFilesMatching(specifiedPattern); 305 BatchStatus b = arcrep.batch(job, replicaUsed); 306 // This check ensures that we got data from at least one file. 307 // Mind you, the data may be empty, but at least one file was 308 // successfully processed. 309 if (b.hasResultFile() && b.getNoOfFilesProcessed() > b.getFilesFailed().size()) { 310 File cacheFileName = getCacheFile(id); 311 if (tryToMigrateDuplicationRecords) { 312 migrateDuplicatesBatch(id, replicaUsed, specifiedPattern, b, cacheFileName); 313 } else { 314 b.copyResults(cacheFileName); 315 } 316 log.debug("Cached data for job '{}' for '{}'", id, prefix); 317 return id; 318 } else { 319 // Look for data in other bitarchive replicas, if this option is enabled 320 if (!Settings.getBoolean(HarvesterSettings.INDEXSERVER_INDEXING_LOOKFORDATAINOTHERBITARCHIVEREPLICAS)) { 321 log.info("No data found for job '{}' for '{}' in local bitarchive '{}'. ", id, prefix, replicaUsed); 322 } else { 323 log.info("No data found for job '{}' for '{}' in local bitarchive '{}'. Trying other replicas.", id, 324 prefix, replicaUsed); 325 for (Replica rep : Replica.getKnown()) { 326 // Only use different bitarchive replicas than replicaUsed 327 if (rep.getType().equals(ReplicaType.BITARCHIVE) && !rep.getId().equals(replicaUsed)) { 328 log.debug("Trying to retrieve index data for job '{}' from '{}'.", id, rep.getId()); 329 b = arcrep.batch(job, rep.getId()); 330 // Perform same check as for the batchresults from 331 // the default replica. 332 if (b.hasResultFile() && (b.getNoOfFilesProcessed() > b.getFilesFailed().size())) { 333 File cacheFileName = getCacheFile(id); 334 if (tryToMigrateDuplicationRecords) { 335 migrateDuplicatesBatch(id, rep.getId(), specifiedPattern, b, cacheFileName); 336 } else { 337 b.copyResults(cacheFileName); 338 } 339 log.debug("Cached data for job '{}' for '{}'", id, prefix); 340 return id; 341 } else { 342 log.trace("No data found for job '{}' for '{}' in bitarchive '{}'. ", id, prefix, rep); 343 } 344 } 345 } 346 log.info("No data found for job '{}' for '{}' in all bitarchive replicas", id, prefix); 347 } 348 return null; 349 } 350 } 351 352 /** 353 * If this cache represents a crawllog cache then this method will attempt to migrate any duplicate annotations in 354 * the crawl log using data in the duplicationmigration metadata record. This migrates filename/offset 355 * pairs from uncompressed to compressed (w)arc files. This method has the side effect of copying the index 356 * cache (whether migrated or not) into the cache file whose name is generated from the id. 357 * @param id the id of the cache 358 * @param replicaUsed which replica to look the file up in 359 * @param specifiedPattern the pattern specifying the files to be found 360 * @param originalBatchJob the original batch job which returned the unmigrated data. 361 */ 362 private void migrateDuplicatesBatch(Long id, String replicaUsed, String specifiedPattern, BatchStatus originalBatchJob, File cacheFileName) { 363 log.debug("Looking for a duplicationmigration record for id {}", id); 364 if (urlPattern.pattern().equals(MetadataFile.CRAWL_LOG_PATTERN)) { 365 GetMetadataArchiveBatchJob job2 = new GetMetadataArchiveBatchJob(Pattern.compile(".*duplicationmigration.*"), Pattern.compile("text/plain")); 366 job2.processOnlyFilesMatching(specifiedPattern); 367 BatchStatus b2 = arcrep.batch(job2, replicaUsed); 368 File migration = null; 369 try { 370 migration = File.createTempFile("migration", "txt"); 371 } catch (IOException e) { 372 throw new IOFailure("Could not create temporary output file."); 373 } 374 if (b2.hasResultFile()) { 375 b2.copyResults(migration); 376 } 377 boolean doMigration = migration.exists() && migration.length() > 0; 378 if (doMigration) { 379 log.info("Found a nonempty duplicationmigration record. Now we do the migration for job {}", id); 380 Hashtable<Pair<String, Long>, Long> lookup = createLookupTableFromMigrationLines(id, migration); 381 382 File crawllog = createTempOutputFile(); 383 originalBatchJob.copyResults(crawllog); 384 migrateFilenameOffsetPairs(id, cacheFileName, crawllog, lookup); 385 } else { 386 originalBatchJob.copyResults(cacheFileName); 387 } 388 } else { 389 originalBatchJob.copyResults(cacheFileName); 390 } 391 } 392 393 /** 394 * Helper method. 395 * Does the actual migration of filename/offset pairs from uncompressed to compressed (w)arc files. 396 * This method has the side effect of copying the index cache (whether migrated or not) into the cache file 397 * whose name is generated from the ID 398 * @param id The ID of the current job. 399 * @param cacheFileName The cache file for the job which the index cache is copied to. 400 * @param crawllog A temp file containing the resulting lines of the Hadoop job. 401 * @param lookup A lookup table to get the filename/offset pairs from. 402 */ 403 private void migrateFilenameOffsetPairs(Long id, File cacheFileName, File crawllog, Hashtable<Pair<String, Long>, Long> lookup) { 404 Pattern duplicatePattern = Pattern.compile(".*duplicate:\"([^,]+),([0-9]+).*"); 405 try { 406 int matches = 0; 407 int errors = 0; 408 for (String line : org.apache.commons.io.FileUtils.readLines(crawllog)) { 409 Matcher m = duplicatePattern.matcher(line); 410 if (m.matches()) { 411 matches++; 412 Long newOffset = lookup.get(new Pair<String, Long>(m.group(1), Long.parseLong(m.group(2)))); 413 if (newOffset == null) { 414 log.warn("Could not migrate duplicate in " + line); 415 FileUtils.appendToFile(cacheFileName, line); 416 errors++; 417 } else { 418 String newLine = line.substring(0, m.start(2)) + newOffset + line.substring(m.end(2)); 419 newLine = newLine.replace(m.group(1), m.group(1) + ".gz"); 420 FileUtils.appendToFile(cacheFileName, newLine); 421 } 422 } else { 423 FileUtils.appendToFile(cacheFileName, line); 424 } 425 } 426 log.info("Found and migrated {} duplicate lines for job {} with {} errors", matches, id, errors); 427 } catch (IOException e) { 428 throw new IOFailure("Could not read " + crawllog.getAbsolutePath()); 429 } finally { 430 crawllog.delete(); 431 } 432 } 433 434 /** 435 * Helper method. 436 * Generates a lookup table of filename/offset pairs from a file containing extracted metadata lines. 437 * @param id The ID for the current job. 438 * @param migrationFile The file containing the extracted metadata lines. 439 * @return A lookup table of filename/offset pairs. 440 */ 441 private Hashtable<Pair<String, Long>, Long> createLookupTableFromMigrationLines(Long id, File migrationFile) { 442 Hashtable<Pair<String, Long>, Long> lookup = new Hashtable<>(); 443 try { 444 final List<String> migrationLines = org.apache.commons.io.FileUtils.readLines(migrationFile); 445 log.info("{} migrationFile records found for job {}", migrationLines.size(), id); 446 for (String line : migrationLines) { 447 // duplicationmigration lines look like this: "FILENAME 496812 393343 1282069269000" 448 // But only the first 3 entries are used. 449 String[] splitLine = StringUtils.split(line); 450 if (splitLine.length >= 3) { 451 lookup.put(new Pair<String, Long>(splitLine[0], Long.parseLong(splitLine[1])), 452 Long.parseLong(splitLine[2])); 453 } else { 454 log.warn("Line '" + line + "' has a wrong format. Ignoring line"); 455 } 456 } 457 } catch (IOException e) { 458 throw new IOFailure("Could not read " + migrationFile.getAbsolutePath()); 459 } finally { 460 migrationFile.delete(); 461 } 462 return lookup; 463 } 464}