001package dk.netarkivet.common.utils.hadoop;
002
003import java.io.IOException;
004
005import org.apache.hadoop.conf.Configuration;
006import org.apache.hadoop.conf.Configured;
007import org.apache.hadoop.fs.Path;
008import org.apache.hadoop.io.LongWritable;
009import org.apache.hadoop.io.NullWritable;
010import org.apache.hadoop.io.Text;
011import org.apache.hadoop.mapreduce.Job;
012import org.apache.hadoop.mapreduce.Mapper;
013import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
014import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
015import org.apache.hadoop.util.Tool;
016
017/**
018 * A simple generic Hadoop map-only tool that runs a given mapper on the passed input file
019 * containing new-line separated file paths and outputs the job's resulting files in the passed output path
020 */
021public class HadoopJobTool extends Configured implements Tool {
022    private Mapper<LongWritable, Text, NullWritable, Text> mapper;
023
024    public HadoopJobTool(Configuration conf, Mapper<LongWritable, Text, NullWritable, Text> mapper) {
025        super(conf);
026        this.mapper = mapper;
027    }
028
029    /**
030     * Method for running the tool/job.
031     * @param args Expects two strings representing the job's in- and output paths (Tool interface dictates String[])
032     * @return An exitcode to report back if the job succeeded.
033     * @throws InterruptedException, IOException, ClassNotFoundException
034     */
035    @Override
036    public int run(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
037        Path inputPath = new Path(args[0]);
038        Path outputPath = new Path(args[1]);
039        Configuration conf = getConf();
040        Job job = Job.getInstance(conf);
041        job.setJobName("HadoopJob using " + mapper.getClass().getSimpleName());
042
043        //job.setJarByClass(this.getClass());
044        job.setInputFormatClass(NLineInputFormat.class);
045        job.setOutputFormatClass(TextOutputFormat.class);
046        NLineInputFormat.addInputPath(job, inputPath);
047        TextOutputFormat.setOutputPath(job, outputPath);
048        job.setMapperClass(mapper.getClass());
049        job.setNumReduceTasks(0); // Ensure job is map-only
050
051        // How many files should each node process at a time (how many lines are read from the input file)
052        NLineInputFormat.setNumLinesPerSplit(job, 5);
053
054        // In- and output types
055        job.setMapOutputKeyClass(NullWritable.class);
056        job.setMapOutputValueClass(Text.class);
057        job.setOutputKeyClass(NullWritable.class);
058        job.setOutputValueClass(Text.class);
059
060        return job.waitForCompletion(true) ? 0 : 1;
061    }
062}