Starting an External Hadoop MapReduce Job and Using Results in a Query

Define the map and reduce methods so that they take their input and produce their output as <key, value> pairs.

Assume you have a directory with two text files with the following contents:
  • File1.txt: Hello World Goodbye World
  • File2.txt: Goodbye World Hadoop
  1. During the mapping step, each file is processed as a separate map job, and each map emits the following <key, value> pairs:
    • Job1: <Hello, 1> <World, 1> <Goodbye, 1> <World, 1>
    • Job2: <Goodbye, 1> <World, 1> <Hadoop, 1>
  2. Call the reducer, which sums the values for each key emitted by the map step. The output from the local reducer for each job is:
    • Job1: <Hello, 1> <World, 2> <Goodbye, 1>
    • Job2: <Goodbye, 1> <World, 1> <Hadoop, 1>
  3. Combine the local results to get the final output:
    <Hello, 1> <World, 3> <Goodbye, 2> <Hadoop, 1>
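The three steps above can be checked without a cluster. The following is a minimal sketch in plain Java, not Hadoop code: the class and method names (WordCountSketch, mapAndCombine, reduce) are hypothetical, and ordinary maps stand in for the <key, value> streams.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

// Hypothetical illustration class; it does not use any Hadoop API.
class WordCountSketch {

    // Steps 1 and 2: emit <word, 1> for every token of one file
    // and sum the values locally, per key.
    static Map<String, Integer> mapAndCombine(String contents) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        StringTokenizer itr = new StringTokenizer(contents);
        while (itr.hasMoreTokens()) {
            counts.merge(itr.nextToken(), 1, Integer::sum);
        }
        return counts;
    }

    // Step 3: add the per-job counts for each key to get the final output.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((word, n) -> totals.merge(word, n, Integer::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        Map<String, Integer> job1 = mapAndCombine("Hello World Goodbye World");
        Map<String, Integer> job2 = mapAndCombine("Goodbye World Hadoop");
        System.out.println(job1);  // {Hello=1, World=2, Goodbye=1}
        System.out.println(job2);  // {Goodbye=1, World=1, Hadoop=1}
        System.out.println(reduce(Arrays.asList(job1, job2)));
        // {Hello=1, World=3, Goodbye=2, Hadoop=1}
    }
}
```

The sketch keeps the original capitalization so that its output matches the walkthrough; a mapper that lowercases each line before tokenizing would produce lowercase keys instead.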
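The following Java class runs this word count as a Hadoop MapReduce job and then reads the results:

import java.io.IOException;
import java.sql.ResultSet;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver extends Configured {
    public static final String HADOOP_ROOT_DIR = "hdfs://localhost:9000";

    static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Reused for every record to avoid allocating new objects
        private final Text word = new Text();
        private final IntWritable one = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Lowercasing makes the count case-insensitive; emitted keys are lowercase
            StringTokenizer itr = new StringTokenizer(line.toLowerCase());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum all counts emitted for this key
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void run(String input, String output, ResultSet[] rs) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/home/mymachine/hadoop/conf/core-site.xml"));
        conf.set("fs.default.name", HADOOP_ROOT_DIR);
        conf.set("mapred.job.tracker", "localhost:9000");

        // Specify output types
        Job job = new Job(conf, "Word Count");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Specify input and output locations
        FileInputFormat.addInputPath(job, new Path(HADOOP_ROOT_DIR + input));
        FileOutputFormat.setOutputPath(job, new Path(HADOOP_ROOT_DIR + output));

        // Specify a mapper
        job.setMapperClass(WordCountDriver.WordCountMapper.class);

        // Specify a reducer; the same class also serves as the local combiner
        job.setReducerClass(WordCountDriver.WordCountReducer.class);
        job.setCombinerClass(WordCountDriver.WordCountReducer.class);
        job.setJarByClass(WordCountDriver.class);

        // Wait for the MapReduce job to complete; waitForCompletion blocks until it finishes
        if (!job.waitForCompletion(true)) {
            throw new IOException("Word Count job failed");
        }

        // Read the job output into the result set.
        // HDFSclient is a user-supplied helper class that is not shown here.
        // Assumption: the job ran with a single reducer, so its output is in part-r-00000.
        String file = HADOOP_ROOT_DIR + output + "/part-r-00000";
        HDFSclient hdfsc = new HDFSclient();
        hdfsc.readFileByLine(file, rs);
    }
}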
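The HDFSclient helper called at the end of run is not shown in this section, so its behavior is unknown. As a rough sketch only, and assuming the job uses Hadoop's default text output format (one pair per line, key and value separated by a tab), the parsing part of such a helper might look like the following. The class name WordCountOutputParser and its parse method are hypothetical; how the parsed rows are placed into the ResultSet depends on the database and is left out.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; not part of Hadoop or of the original example.
class WordCountOutputParser {

    // Parses lines of the form "word<TAB>count" into a map.
    // The caller owns the reader and is responsible for closing it.
    static Map<String, Integer> parse(BufferedReader in) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        in.lines().forEach(line -> {
            int tab = line.indexOf('\t');
            if (tab > 0) {  // skip blank lines and lines without a key
                counts.put(line.substring(0, tab),
                           Integer.parseInt(line.substring(tab + 1).trim()));
            }
        });
        return counts;
    }

    public static void main(String[] args) {
        // Sample text in the assumed "key<TAB>value" layout
        String sample = "goodbye\t2\nhadoop\t1\nhello\t1\nworld\t3\n";
        System.out.println(parse(new BufferedReader(new StringReader(sample))));
    }
}
```

In a real helper, the reader would typically wrap the stream returned by FileSystem.open for the job's output file rather than a StringReader.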