001package dk.netarkivet.common.utils.hadoop; 002 003import java.io.IOException; 004 005import org.apache.hadoop.conf.Configuration; 006import org.apache.hadoop.conf.Configured; 007import org.apache.hadoop.fs.Path; 008import org.apache.hadoop.io.LongWritable; 009import org.apache.hadoop.io.NullWritable; 010import org.apache.hadoop.io.Text; 011import org.apache.hadoop.mapreduce.Job; 012import org.apache.hadoop.mapreduce.Mapper; 013import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; 014import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 015import org.apache.hadoop.util.Tool; 016 017/** 018 * A simple generic Hadoop map-only tool that runs a given mapper on the passed input file 019 * containing new-line separated file paths and outputs the job's resulting files in the passed output path 020 */ 021public class HadoopJobTool extends Configured implements Tool { 022 private Mapper<LongWritable, Text, NullWritable, Text> mapper; 023 024 public HadoopJobTool(Configuration conf, Mapper<LongWritable, Text, NullWritable, Text> mapper) { 025 super(conf); 026 this.mapper = mapper; 027 } 028 029 /** 030 * Method for running the tool/job. 031 * @param args Expects two strings representing the job's in- and output paths (Tool interface dictates String[]) 032 * @return An exitcode to report back if the job succeeded. 033 * @throws InterruptedException, IOException, ClassNotFoundException 034 */ 035 @Override 036 public int run(String[] args) throws InterruptedException, IOException, ClassNotFoundException { 037 Path inputPath = new Path(args[0]); 038 Path outputPath = new Path(args[1]); 039 Configuration conf = getConf(); 040 Job job = Job.getInstance(conf); 041 job.setJobName("HadoopJob using " + mapper.getClass().getSimpleName()); 042 043 //job.setJarByClass(this.getClass()); 044 job.setInputFormatClass(NLineInputFormat.class); 045 job.setOutputFormatClass(TextOutputFormat.class); 046 NLineInputFormat.addInputPath(job, inputPath); 047 TextOutputFormat.setOutputPath(job, outputPath); 048 job.setMapperClass(mapper.getClass()); 049 job.setNumReduceTasks(0); // Ensure job is map-only 050 051 // How many files should each node process at a time (how many lines are read from the input file) 052 NLineInputFormat.setNumLinesPerSplit(job, 5); 053 054 // In- and output types 055 job.setMapOutputKeyClass(NullWritable.class); 056 job.setMapOutputValueClass(Text.class); 057 job.setOutputKeyClass(NullWritable.class); 058 job.setOutputValueClass(Text.class); 059 060 return job.waitForCompletion(true) ? 0 : 1; 061 } 062}