001package dk.netarkivet.viewerproxy.webinterface.hadoop;
002
003import java.io.ByteArrayOutputStream;
004import java.io.File;
005import java.io.IOException;
006import java.util.ArrayList;
007import java.util.Arrays;
008import java.util.List;
009import java.util.regex.Pattern;
010
011import org.apache.hadoop.conf.Configuration;
012import org.apache.hadoop.fs.FileSystem;
013import org.apache.hadoop.fs.LocalFileSystem;
014import org.apache.hadoop.fs.Path;
015import org.apache.hadoop.io.LongWritable;
016import org.apache.hadoop.io.NullWritable;
017import org.apache.hadoop.io.Text;
018import org.apache.hadoop.mapreduce.Mapper;
019import org.slf4j.Logger;
020import org.slf4j.LoggerFactory;
021
022import dk.netarkivet.common.utils.batch.FileBatchJob;
023import dk.netarkivet.viewerproxy.webinterface.CrawlLogLinesMatchingRegexp;
024
025/**
026 * Hadoop Mapper for extracting crawllog lines from metadata files.
027 * Expects the Configuration provided for the job to have a regex set, which is used to filter for relevant lines.
028 * If no regex is set an all-matching regex will be used.
029 */
030public class CrawlLogExtractionMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
031    private static final Logger log = LoggerFactory.getLogger(CrawlLogExtractionMapper.class);
032
033    /**
034     * Mapping method.
035     *
036     * @param linenumber The linenumber. Is ignored.
037     * @param archiveFilePath The path to the archive file.
038     * @param context Context used for writing output.
039     * @throws IOException If it fails to generate the CDX indexes.
040     */
041    @Override
042    protected void map(LongWritable linenumber, Text archiveFilePath, Context context) throws IOException,
043            InterruptedException {
044        // reject empty or null warc paths.
045        if (archiveFilePath == null || archiveFilePath.toString().trim().isEmpty()) {
046            log.warn("Encountered empty path in job {}", context.getJobID().toString());
047            return;
048        }
049        Path path = new Path(archiveFilePath.toString());
050        Configuration conf = context.getConfiguration();
051        List<String> crawlLogLines;
052        Pattern crawlLogRegex = conf.getPattern("regex", Pattern.compile(".*"));
053
054        log.info("Extracting crawl log lines matching regex: {}", crawlLogRegex);
055        final FileSystem fileSystem = path.getFileSystem(conf);
056        if (!(fileSystem instanceof LocalFileSystem)) {
057            final String status = "Crawl log extraction only implemented for LocalFileSystem. Cannot extract from " + path;
058            context.setStatus(status);
059            System.err.println(status);
060            crawlLogLines = new ArrayList<>();
061        } else {
062            LocalFileSystem localFileSystem = ((LocalFileSystem) fileSystem);
063            crawlLogLines = extractCrawlLogLines(localFileSystem.pathToFile(path), crawlLogRegex.pattern());
064        }
065
066        for (String crawlLog : crawlLogLines) {
067            context.write(NullWritable.get(), new Text(crawlLog));
068        }
069    }
070
071    /**
072     * Extract the crawl logs from a file matching the provided regex
073     * @param file File to look for crawl logs in.
074     * @param regex The regex to match lines with.
075     * @return A list of crawl log lines extracted from the file.
076     */
077    private List<String> extractCrawlLogLines(File file, String regex) {
078        FileBatchJob batchJob = new CrawlLogLinesMatchingRegexp(regex);
079        ByteArrayOutputStream baos = new ByteArrayOutputStream();
080        batchJob.processFile(file, baos);
081        try {
082            baos.flush();
083        } catch (IOException e) {
084            log.warn("Error when trying to flush batch job output stream", e);
085        }
086        return Arrays.asList(baos.toString().split("\\n"));
087    }
088}