package dk.netarkivet.viewerproxy.webinterface.hadoop;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.utils.batch.FileBatchJob;
import dk.netarkivet.viewerproxy.webinterface.CrawlLogLinesMatchingRegexp;

/**
 * Hadoop Mapper for extracting crawl log lines from metadata files.
 * Expects the Configuration provided for the job to have a regex set under the key "regex", which is used to
 * filter for relevant lines. If no regex is set, an all-matching regex is used.
 */
public class CrawlLogExtractionMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private static final Logger log = LoggerFactory.getLogger(CrawlLogExtractionMapper.class);

    /**
     * Mapping method.
     *
     * @param linenumber The line number. Is ignored.
     * @param archiveFilePath The path to the archive file.
     * @param context Context used for writing output.
     * @throws IOException If the crawl log lines cannot be extracted.
     */
    @Override
    protected void map(LongWritable linenumber, Text archiveFilePath, Context context) throws IOException,
            InterruptedException {
        // Reject empty or null archive file paths.
        if (archiveFilePath == null || archiveFilePath.toString().trim().isEmpty()) {
            log.warn("Encountered empty path in job {}", context.getJobID().toString());
            return;
        }
        Path path = new Path(archiveFilePath.toString());
        Configuration conf = context.getConfiguration();
        List<String> crawlLogLines;
        Pattern crawlLogRegex = conf.getPattern("regex", Pattern.compile(".*"));

        log.info("Extracting crawl log lines matching regex: {}", crawlLogRegex);
        final FileSystem fileSystem = path.getFileSystem(conf);
        if (!(fileSystem instanceof LocalFileSystem)) {
            final String status = "Crawl log extraction only implemented for LocalFileSystem. Cannot extract from " + path;
            context.setStatus(status);
            System.err.println(status);
            crawlLogLines = new ArrayList<>();
        } else {
            LocalFileSystem localFileSystem = (LocalFileSystem) fileSystem;
            crawlLogLines = extractCrawlLogLines(localFileSystem.pathToFile(path), crawlLogRegex.pattern());
        }

        for (String crawlLog : crawlLogLines) {
            context.write(NullWritable.get(), new Text(crawlLog));
        }
    }
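    /*
     * For illustration: the extraction step below uses NetarchiveSuite's batch framework
     * directly on a local file. A minimal sketch of the same call sequence, assuming a
     * hypothetical metadata file name:
     *
     *   FileBatchJob batchJob = new CrawlLogLinesMatchingRegexp(".*www\\.example\\.org.*");
     *   ByteArrayOutputStream out = new ByteArrayOutputStream();
     *   batchJob.processFile(new File("1-metadata-1.warc"), out);
     *   // 'out' now holds the matching crawl log lines, newline-separated.
     */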
    /**
     * Extracts the crawl log lines matching the provided regex from the given file.
     *
     * @param file File to look for crawl log lines in.
     * @param regex The regex to match lines with.
     * @return A list of crawl log lines extracted from the file.
     */
    private List<String> extractCrawlLogLines(File file, String regex) {
        FileBatchJob batchJob = new CrawlLogLinesMatchingRegexp(regex);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        batchJob.processFile(file, baos);
        try {
            baos.flush();
        } catch (IOException e) {
            log.warn("Error when trying to flush batch job output stream", e);
        }
        String output = baos.toString();
        // Avoid emitting a single spurious empty line when nothing matched:
        // "".split("\\n") yields [""] rather than an empty array.
        if (output.isEmpty()) {
            return new ArrayList<>();
        }
        return Arrays.asList(output.split("\\n"));
    }
}
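/*
 * A minimal driver sketch (not part of the original class) showing how this mapper might be
 * wired into a map-only job. It assumes the job input is a text file listing one archive file
 * path per line, fed through NLineInputFormat; the paths and job name are hypothetical.
 *
 *   Configuration conf = new Configuration();
 *   conf.setPattern("regex", Pattern.compile(".*www\\.example\\.org.*"));
 *   Job job = Job.getInstance(conf, "crawl-log-extraction");
 *   job.setJarByClass(CrawlLogExtractionMapper.class);
 *   job.setMapperClass(CrawlLogExtractionMapper.class);
 *   job.setNumReduceTasks(0); // map-only: matching lines are written directly to output
 *   job.setInputFormatClass(NLineInputFormat.class);
 *   NLineInputFormat.addInputPath(job, new Path("/input/archive-file-list.txt"));
 *   job.setOutputKeyClass(NullWritable.class);
 *   job.setOutputValueClass(Text.class);
 *   FileOutputFormat.setOutputPath(job, new Path("/output/crawl-log-lines"));
 *   System.exit(job.waitForCompletion(true) ? 0 : 1);
 */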