/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2018 The Royal Danish Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.harvester.indexserver;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Hashtable;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClientFactory;
import dk.netarkivet.common.distribute.arcrepository.BatchStatus;
import dk.netarkivet.common.distribute.arcrepository.Replica;
import dk.netarkivet.common.distribute.arcrepository.ReplicaType;
import dk.netarkivet.common.distribute.arcrepository.ViewerArcRepositoryClient;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.archive.ArchiveBatchJob;
import dk.netarkivet.common.utils.archive.GetMetadataArchiveBatchJob;
import dk.netarkivet.common.utils.hadoop.GetMetadataMapper;
import dk.netarkivet.common.utils.hadoop.HadoopJob;
import dk.netarkivet.common.utils.hadoop.HadoopJobStrategy;
import dk.netarkivet.common.utils.hadoop.HadoopJobUtils;
import dk.netarkivet.common.utils.hadoop.MetadataExtractionStrategy;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.harvesting.metadata.MetadataFile;

/**
 * This is an implementation of the RawDataCache specialized for data out of metadata files. It uses regular
 * expressions to match the URL and mime-type of (W)ARC entries for the kind of metadata we want.
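 *
 * <p>A minimal usage sketch (illustrative only; the prefix and the concrete patterns are example values, not
 * necessarily the ones used by the production caches):
 *
 * <pre>{@code
 * RawMetadataCache crawlLogCache = new RawMetadataCache(
 *         "crawllog", Pattern.compile(MetadataFile.CRAWL_LOG_PATTERN), Pattern.compile("text/plain"));
 * File cacheFile = crawlLogCache.getCacheFile(42L);
 * }</pre>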
 */
public class RawMetadataCache extends FileBasedCache<Long> implements RawDataCache {

    /** The logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(RawMetadataCache.class);

    /** A regular expression object that matches everything. */
    public static final Pattern MATCH_ALL_PATTERN = Pattern.compile(".*");

    /** The prefix (cache name) that this cache uses. */
    private final String prefix;

    /** The arc repository interface. This does not need to be closed, it is a singleton. */
    private ViewerArcRepositoryClient arcrep = ArcRepositoryClientFactory.getViewerInstance();

    /** The actual pattern to be used for matching the url in the metadata record. */
    private Pattern urlPattern;

    /** The actual pattern to be used for matching the mimetype in the metadata record. */
    private Pattern mimePattern;

    /** Try to migrate jobs with a duplicationmigration record. */
    private boolean tryToMigrateDuplicationRecords;

    /**
     * Create a new RawMetadataCache. For a given job ID, this will fetch and cache selected content from metadata
     * files (&lt;ID&gt;-metadata-[0-9]+.arc). Any entry in a metadata file that matches both patterns will be
     * returned. The returned data does not directly indicate which file it came from, though parts intrinsic to the
     * particular format might.
     *
     * @param prefix A prefix that will be used to distinguish this cache's files from other caches'. It will be used
     * for creating a directory, so it must not contain characters not legal in directory names.
     * @param urlMatcher A pattern for matching URLs of the desired entries. If null, a .* pattern will be used.
     * @param mimeMatcher A pattern for matching mime-types of the desired entries. If null, a .* pattern will be used.
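     *
     * <p>For example (illustrative values only): {@code new RawMetadataCache("cdxcache", null,
     * Pattern.compile("application/x-cdx"))} would cache every metadata record whose mime-type is
     * {@code application/x-cdx}, regardless of its URL.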
     */
    public RawMetadataCache(String prefix, Pattern urlMatcher, Pattern mimeMatcher) {
        super(prefix);
        this.prefix = prefix;
        urlPattern = (urlMatcher != null) ? urlMatcher : MATCH_ALL_PATTERN;
        mimePattern = (mimeMatcher != null) ? mimeMatcher : MATCH_ALL_PATTERN;
        // Should we try to migrate duplicate records, yes or no.
        tryToMigrateDuplicationRecords = Settings.getBoolean(
                HarvesterSettings.INDEXSERVER_INDEXING_TRY_TO_MIGRATE_DUPLICATION_RECORDS);
        log.info("Metadata cache for '{}' is fetching metadata with urls matching '{}' and mimetype matching '{}'. "
                + "Migration of duplicate records is " + (tryToMigrateDuplicationRecords ? "enabled" : "disabled")
                + ".", prefix, urlPattern, mimePattern);
    }

    /**
     * Get the file potentially containing (cached) data for a single job.
     *
     * @param id The job to find data for.
     * @return The file where cache data for the job can be stored.
     * @see FileBasedCache#getCacheFile(Object)
     */
    @Override
    public File getCacheFile(Long id) {
        ArgumentNotValid.checkNotNull(id, "job ID");
        ArgumentNotValid.checkNotNegative(id, "job ID");
        return new File(getCacheDir(), prefix + "-" + id + "-cache");
    }

    /**
     * Actually cache data for the given ID.
     *
     * @param id A job ID to cache data for.
     * @return The given ID if data was cached for it, otherwise null. The cached data itself is written to the file
     * returned by getCacheFile(id).
     * @see FileBasedCache#cacheData(Object)
     */
    @Override
    protected Long cacheData(Long id) {
        if (Settings.getBoolean(CommonSettings.USE_BITMAG_HADOOP_BACKEND)) {
            return cacheDataHadoop(id);
        } else {
            return cacheDataBatch(id);
        }
    }

    /**
     * Cache data for the given ID using Hadoop.
     *
     * @param id A job ID to cache data for.
     * @return The given ID if data was cached for it, otherwise null. The cached data itself is written to the file
     * returned by getCacheFile(id).
     * @see FileBasedCache#cacheData(Object)
     */
    private Long cacheDataHadoop(Long id) {
        final String metadataFilePatternSuffix = Settings.get(CommonSettings.METADATAFILE_REGEX_SUFFIX);
        final String specifiedPattern = "(.*-)?" + id + "(-.*)?" + metadataFilePatternSuffix;
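        // Illustrative example (the actual suffix comes from settings): with id 42 and a suffix like
        // "-metadata-[0-9]+\.(w)?arc(\.gz)?", the pattern above matches metadata filenames such as
        // "42-metadata-1.warc.gz".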
        Configuration conf = HadoopJobUtils.getConf();
        conf.setPattern(GetMetadataMapper.URL_PATTERN, urlPattern);
        conf.setPattern(GetMetadataMapper.MIME_PATTERN, mimePattern);

        try (FileSystem fileSystem = FileSystem.newInstance(conf)) {
            HadoopJobStrategy jobStrategy = new MetadataExtractionStrategy(id, fileSystem);
            HadoopJob job = new HadoopJob(id, jobStrategy);
            job.processOnlyFilesMatching(specifiedPattern);
            job.prepareJobInputOutput(fileSystem);
            job.run();
            // If no exception was thrown, the job was a success.

            List<String> metadataLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir());
            if (metadataLines.size() > 0) {
                File cacheFileName = getCacheFile(id);
                if (tryToMigrateDuplicationRecords) {
                    migrateDuplicatesHadoop(id, fileSystem, specifiedPattern, metadataLines, cacheFileName);
                } else {
                    copyResults(id, metadataLines, cacheFileName);
                }
                log.debug("Cached data for job '{}' for '{}'", id, prefix);
                return id;
            } else {
                log.info("No data found for job '{}' for '{}' in local bitarchive.", id, prefix);
            }
        } catch (IOException e) {
            log.error("Error while caching data for job {} using Hadoop.", id, e);
        }
        return null;
    }

    /**
     * If this cache represents a crawllog cache, this method will attempt to migrate any duplicate annotations in
     * the crawl log using data in the duplicationmigration metadata record. This migrates filename/offset
     * pairs from uncompressed to compressed (w)arc files. This method has the side effect of copying the index
     * cache (whether migrated or not) into the cache file whose name is generated from the id.
     *
     * @param id the id of the cache
     * @param fileSystem the filesystem on which the operations are carried out
     * @param specifiedPattern the pattern specifying the files to be found
     * @param originalJobResults the original hadoop job results, a list containing the unmigrated data.
     * @param cacheFileName the cache file for the job which the index cache is copied to.
     */
    private void migrateDuplicatesHadoop(Long id, FileSystem fileSystem, String specifiedPattern, List<String> originalJobResults, File cacheFileName) {
        log.debug("Looking for a duplicationmigration record for id {}", id);
        if (urlPattern.pattern().equals(MetadataFile.CRAWL_LOG_PATTERN)) {
            Configuration conf = fileSystem.getConf();
            conf.setPattern(GetMetadataMapper.URL_PATTERN, Pattern.compile(".*duplicationmigration.*"));
            conf.setPattern(GetMetadataMapper.MIME_PATTERN, Pattern.compile("text/plain"));
            HadoopJobStrategy jobStrategy = new MetadataExtractionStrategy(id, fileSystem);
            HadoopJob job = new HadoopJob(id, jobStrategy);
            job.processOnlyFilesMatching(specifiedPattern);
            job.prepareJobInputOutput(fileSystem);
            job.run();

            try {
                List<String> metadataLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir());
                handleMigrationHadoop(id, metadataLines, originalJobResults, cacheFileName);
            } catch (IOException e) {
                log.error("Failed getting duplicationmigration lines output from Hadoop job with ID: {}", id, e);
            }
        } else {
            copyResults(id, originalJobResults, cacheFileName);
        }
    }

    /**
     * Helper method for {@link #migrateDuplicatesHadoop}.
     * Does the actual handling of migration after the job has finished successfully.
     *
     * @param id The id of the cache.
     * @param metadataLines The resulting lines from the duplication-migration job.
     * @param originalJobResults The original hadoop job results, a list containing the unmigrated data.
     * @param cacheFileName The cache file for the job which the index cache is copied to.
     */
    private void handleMigrationHadoop(Long id, List<String> metadataLines, List<String> originalJobResults,
            File cacheFileName) {
        File migration = null;
        try {
            migration = File.createTempFile("migration", "txt");
        } catch (IOException e) {
            throw new IOFailure("Could not create temporary output file.", e);
        }
        if (metadataLines.size() > 0) {
            copyResults(id, metadataLines, migration);
        }
        boolean doMigration = migration.exists() && migration.length() > 0;
        if (doMigration) {
            log.info("Found a nonempty duplicationmigration record. Now we do the migration for job {}", id);
            Hashtable<Pair<String, Long>, Long> lookup = createLookupTableFromMigrationLines(id, migration);

            File crawllog = createTempOutputFile();
            copyResults(id, originalJobResults, crawllog);
            migrateFilenameOffsetPairs(id, cacheFileName, crawllog, lookup);
        } else {
            copyResults(id, originalJobResults, cacheFileName);
        }
    }

    /**
     * Helper method for creating a temp file to copy job results to.
     *
     * @return A new temp file to copy output to.
     */
    private File createTempOutputFile() {
        File crawllog = null;
        try {
            crawllog = File.createTempFile("dedup", "txt");
        } catch (IOException e) {
            throw new IOFailure("Could not create temporary output file.", e);
        }
        return crawllog;
    }

    /**
     * Helper method for the Hadoop-based methods.
     * Copies the resulting lines of a job to a file.
     *
     * @param id The ID of the current job.
     * @param jobResults The resulting lines output from a job.
     * @param file The file to copy the results to.
     */
    private void copyResults(Long id, List<String> jobResults, File file) {
        try {
            Files.write(Paths.get(file.getAbsolutePath()), jobResults);
        } catch (IOException e) {
            throw new IOFailure("Failed writing results of job with ID '" + id + "' to file " + file.getAbsolutePath(), e);
        }
    }

    /**
     * Cache data for the given ID using a batch job against the archive repository.
     *
     * @param id A job ID to cache data for.
     * @return The given ID if data was cached for it, otherwise null. The cached data itself is written to the file
     * returned by getCacheFile(id).
     * @see FileBasedCache#cacheData(Object)
     */
    private Long cacheDataBatch(Long id) {
        final String replicaUsed = Settings.get(CommonSettings.USE_REPLICA_ID);
        final String metadataFilePatternSuffix = Settings.get(CommonSettings.METADATAFILE_REGEX_SUFFIX);
        // Same pattern here as defined in class dk.netarkivet.viewerproxy.webinterface.Reporting
        final String specifiedPattern = "(.*-)?" + id + "(-.*)?" + metadataFilePatternSuffix;

        final ArchiveBatchJob job = new GetMetadataArchiveBatchJob(urlPattern, mimePattern);
        log.debug("Extracting cache data using a batchjob of type '{}' from files matching '{}' on replica '{}'. "
                + "Url pattern is '{}' and mimepattern is '{}'",
                job.getClass().getName(), specifiedPattern, replicaUsed, urlPattern, mimePattern);

        job.processOnlyFilesMatching(specifiedPattern);
        BatchStatus b = arcrep.batch(job, replicaUsed);
        // This check ensures that we got data from at least one file.
        // Mind you, the data may be empty, but at least one file was
        // successfully processed.
        if (b.hasResultFile() && b.getNoOfFilesProcessed() > b.getFilesFailed().size()) {
            File cacheFileName = getCacheFile(id);
            if (tryToMigrateDuplicationRecords) {
                migrateDuplicatesBatch(id, replicaUsed, specifiedPattern, b, cacheFileName);
            } else {
                b.copyResults(cacheFileName);
            }
            log.debug("Cached data for job '{}' for '{}'", id, prefix);
            return id;
        } else {
            // Look for data in other bitarchive replicas, if this option is enabled
            if (!Settings.getBoolean(HarvesterSettings.INDEXSERVER_INDEXING_LOOKFORDATAINOTHERBITARCHIVEREPLICAS)) {
                log.info("No data found for job '{}' for '{}' in local bitarchive '{}'.", id, prefix, replicaUsed);
            } else {
                log.info("No data found for job '{}' for '{}' in local bitarchive '{}'. Trying other replicas.", id,
                        prefix, replicaUsed);
                for (Replica rep : Replica.getKnown()) {
                    // Only use bitarchive replicas other than replicaUsed
                    if (rep.getType().equals(ReplicaType.BITARCHIVE) && !rep.getId().equals(replicaUsed)) {
                        log.debug("Trying to retrieve index data for job '{}' from '{}'.", id, rep.getId());
                        b = arcrep.batch(job, rep.getId());
                        // Perform the same check as for the batch results from
                        // the default replica.
                        if (b.hasResultFile() && (b.getNoOfFilesProcessed() > b.getFilesFailed().size())) {
                            File cacheFileName = getCacheFile(id);
                            if (tryToMigrateDuplicationRecords) {
                                migrateDuplicatesBatch(id, rep.getId(), specifiedPattern, b, cacheFileName);
                            } else {
                                b.copyResults(cacheFileName);
                            }
                            log.debug("Cached data for job '{}' for '{}'", id, prefix);
                            return id;
                        } else {
                            log.trace("No data found for job '{}' for '{}' in bitarchive '{}'.", id, prefix, rep);
                        }
                    }
                }
                log.info("No data found for job '{}' for '{}' in any bitarchive replica", id, prefix);
            }
            return null;
        }
    }

    /**
     * If this cache represents a crawllog cache, this method will attempt to migrate any duplicate annotations in
     * the crawl log using data in the duplicationmigration metadata record. This migrates filename/offset
     * pairs from uncompressed to compressed (w)arc files. This method has the side effect of copying the index
     * cache (whether migrated or not) into the cache file whose name is generated from the id.
     *
     * @param id the id of the cache
     * @param replicaUsed which replica to look the file up in
     * @param specifiedPattern the pattern specifying the files to be found
     * @param originalBatchJob the original batch job which returned the unmigrated data.
     * @param cacheFileName the cache file for the job which the index cache is copied to.
     */
    private void migrateDuplicatesBatch(Long id, String replicaUsed, String specifiedPattern, BatchStatus originalBatchJob, File cacheFileName) {
        log.debug("Looking for a duplicationmigration record for id {}", id);
        if (urlPattern.pattern().equals(MetadataFile.CRAWL_LOG_PATTERN)) {
            GetMetadataArchiveBatchJob job2 = new GetMetadataArchiveBatchJob(Pattern.compile(".*duplicationmigration.*"), Pattern.compile("text/plain"));
            job2.processOnlyFilesMatching(specifiedPattern);
            BatchStatus b2 = arcrep.batch(job2, replicaUsed);
            File migration = null;
            try {
                migration = File.createTempFile("migration", "txt");
            } catch (IOException e) {
                throw new IOFailure("Could not create temporary output file.", e);
            }
            if (b2.hasResultFile()) {
                b2.copyResults(migration);
            }
            boolean doMigration = migration.exists() && migration.length() > 0;
            if (doMigration) {
                log.info("Found a nonempty duplicationmigration record. Now we do the migration for job {}", id);
                Hashtable<Pair<String, Long>, Long> lookup = createLookupTableFromMigrationLines(id, migration);

                File crawllog = createTempOutputFile();
                originalBatchJob.copyResults(crawllog);
                migrateFilenameOffsetPairs(id, cacheFileName, crawllog, lookup);
            } else {
                originalBatchJob.copyResults(cacheFileName);
            }
        } else {
            originalBatchJob.copyResults(cacheFileName);
        }
    }

    /**
     * Helper method.
     * Does the actual migration of filename/offset pairs from uncompressed to compressed (w)arc files.
     * This method has the side effect of copying the index cache (whether migrated or not) into the cache file
     * whose name is generated from the ID.
     *
     * @param id The ID of the current job.
     * @param cacheFileName The cache file for the job which the index cache is copied to.
     * @param crawllog A temp file containing the resulting crawl-log lines of the original job (Hadoop or batch).
     * @param lookup A lookup table to get the filename/offset pairs from.
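     *
     * <p>Illustrative example (format inferred from the duplicate pattern below): a crawl-log line containing
     * {@code duplicate:"somefile-1.arc,12345"} is rewritten so that the filename becomes
     * {@code somefile-1.arc.gz} and the offset 12345 is replaced by the corresponding offset from the lookup table.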
     */
    private void migrateFilenameOffsetPairs(Long id, File cacheFileName, File crawllog, Hashtable<Pair<String, Long>, Long> lookup) {
        Pattern duplicatePattern = Pattern.compile(".*duplicate:\"([^,]+),([0-9]+).*");
        try {
            int matches = 0;
            int errors = 0;
            for (String line : org.apache.commons.io.FileUtils.readLines(crawllog)) {
                Matcher m = duplicatePattern.matcher(line);
                if (m.matches()) {
                    matches++;
                    Long newOffset = lookup.get(new Pair<String, Long>(m.group(1), Long.parseLong(m.group(2))));
                    if (newOffset == null) {
                        log.warn("Could not migrate duplicate in line '{}'", line);
                        FileUtils.appendToFile(cacheFileName, line);
                        errors++;
                    } else {
                        String newLine = line.substring(0, m.start(2)) + newOffset + line.substring(m.end(2));
                        newLine = newLine.replace(m.group(1), m.group(1) + ".gz");
                        FileUtils.appendToFile(cacheFileName, newLine);
                    }
                } else {
                    FileUtils.appendToFile(cacheFileName, line);
                }
            }
            log.info("Found and migrated {} duplicate lines for job {} with {} errors", matches, id, errors);
        } catch (IOException e) {
            throw new IOFailure("Could not read " + crawllog.getAbsolutePath(), e);
        } finally {
            crawllog.delete();
        }
    }

    /**
     * Helper method.
     * Generates a lookup table of filename/offset pairs from a file containing extracted metadata lines.
     *
     * @param id The ID for the current job.
     * @param migrationFile The file containing the extracted metadata lines.
     * @return A lookup table of filename/offset pairs.
     */
    private Hashtable<Pair<String, Long>, Long> createLookupTableFromMigrationLines(Long id, File migrationFile) {
        Hashtable<Pair<String, Long>, Long> lookup = new Hashtable<>();
        try {
            final List<String> migrationLines = org.apache.commons.io.FileUtils.readLines(migrationFile);
            log.info("{} migrationFile records found for job {}", migrationLines.size(), id);
            for (String line : migrationLines) {
                // duplicationmigration lines look like this: "FILENAME 496812 393343 1282069269000",
                // but only the first 3 entries are used.
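                // As far as can be inferred from the usage in migrateFilenameOffsetPairs, the fields are: filename,
                // offset in the uncompressed file, and the corresponding offset in the compressed file; the fourth
                // field is ignored here.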
                String[] splitLine = StringUtils.split(line);
                if (splitLine.length >= 3) {
                    lookup.put(new Pair<String, Long>(splitLine[0], Long.parseLong(splitLine[1])),
                            Long.parseLong(splitLine[2]));
                } else {
                    log.warn("Line '{}' has a wrong format. Ignoring line", line);
                }
            }
        } catch (IOException e) {
            throw new IOFailure("Could not read " + migrationFile.getAbsolutePath(), e);
        } finally {
            migrationFile.delete();
        }
        return lookup;
    }
}