A MapReduce program consists of :
- Map Class : Map Function
- Reduce Class : Reduce Function
- Driver Class : Main Method
Map Function :
Takes a text file as input (TextInputFormat);
for each line of the input file it emits the year and temperature as an output key/value pair.
I/P key -> Byte offset ( type : LongWritable)
I/P value -> Line ( type : Text)
O/P key -> Year ( type : Text)
O/P value -> Temperature ( type : FloatWritable)
Map Class :
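public static class Map extends Mapper<LongWritable, Text, Text, FloatWritable>
{
    // Called once per input line: key is the byte offset, value is the line text.
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        // The year and the temperature are read from fixed column offsets of the line.
        String year = line.substring(4,9);
        float temp = Float.parseFloat(line.substring(38,43));
        // Emit (year, temperature) as the intermediate key/value pair.
        context.write(new Text(year), new FloatWritable(temp));
    }
}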
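The parsing step in map() can be tried without Hadoop. The sketch below is only an illustration: the column offsets are the ones used in the mapper, but the sample line is a hypothetical fixed-width record built for the demo, and the length check, the trim() calls and the names ParseSketch, Reading and sampleLine are assumptions added here, not part of the original program. It shows that a line shorter than 43 characters, or a non-numeric temperature field, would make the mapper throw unless it is guarded.

```java
import java.util.Arrays;

// Plain-Java sketch of the parsing step inside map(); no Hadoop classes involved.
class ParseSketch {
    // Column offsets copied from the mapper; everything else here is illustrative.
    static final int YEAR_START = 4, YEAR_END = 9;
    static final int TEMP_START = 38, TEMP_END = 43;

    /** Simple holder for one parsed record. */
    static class Reading {
        final String year;
        final float temp;
        Reading(String year, float temp) { this.year = year; this.temp = temp; }
    }

    /** Returns the parsed reading, or null when the line is too short or not numeric. */
    static Reading parse(String line) {
        if (line.length() < TEMP_END) {
            return null; // substring(38, 43) would throw StringIndexOutOfBoundsException
        }
        String year = line.substring(YEAR_START, YEAR_END).trim();
        try {
            float temp = Float.parseFloat(line.substring(TEMP_START, TEMP_END).trim());
            return new Reading(year, temp);
        } catch (NumberFormatException e) {
            return null; // the temperature columns did not hold a number
        }
    }

    /** Builds a HYPOTHETICAL fixed-width line with both fields at the mapper's offsets. */
    static String sampleLine(String year, String temp) {
        char[] cols = new char[TEMP_END];
        Arrays.fill(cols, ' ');
        year.getChars(0, year.length(), cols, YEAR_START);
        temp.getChars(0, temp.length(), cols, TEMP_START);
        return new String(cols);
    }

    public static void main(String[] args) {
        Reading r = parse(sampleLine("1901", "-7.8"));
        System.out.println(r.year + " " + r.temp);        // prints: 1901 -7.8
        System.out.println(parse("short line") == null);  // prints: true
    }
}
```

In the real mapper, a line rejected this way could simply be skipped (no context.write call) instead of failing the whole task.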
Reduce Function :
Takes the Map output as input and emits the maximum temperature of every year as a year/temperature (key/value) pair.
I/P key -> Year ( type : Text)
I/P value -> Temperature list ( type : Iterable of FloatWritable)
O/P key -> Year ( type : Text)
O/P value -> Maximum temperature ( type : FloatWritable)
Reduce Class :
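public static class Reduce extends Reducer<Text, FloatWritable, Text, FloatWritable>
{
    // Called once per year with all temperatures emitted for that year.
    @Override
    public void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException
    {
        // Start below every possible reading. Float.MIN_VALUE is the smallest
        // positive float, so it would hide the maximum of an all-negative year.
        float maxValue = Float.NEGATIVE_INFINITY;
        for (FloatWritable value : values)
        {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new FloatWritable(maxValue));
    }
}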
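The starting value of the running maximum matters when a year contains only sub-zero readings. The plain-Java sketch below (no Hadoop types; the class name MaxSeedSketch and the readings are made up for illustration) compares two seeds: Float.MIN_VALUE, which is the smallest positive float rather than the most negative one, and Float.NEGATIVE_INFINITY.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the reducer's max loop with a configurable starting value.
class MaxSeedSketch {
    /** Maximum of the values, starting the running maximum from the given seed. */
    static float maxFrom(float seed, List<Float> values) {
        float max = seed;
        for (float v : values) {
            max = Math.max(max, v);
        }
        return max;
    }

    public static void main(String[] args) {
        // Hypothetical readings for one very cold year: every value is below zero.
        List<Float> coldYear = Arrays.asList(-12.5f, -3.0f, -7.25f);

        // Float.MIN_VALUE is a tiny POSITIVE number, so it beats every reading
        // and the result is the seed itself, not a real temperature.
        System.out.println(maxFrom(Float.MIN_VALUE, coldYear));

        // Negative infinity is below every float, so the true maximum survives.
        System.out.println(maxFrom(Float.NEGATIVE_INFINITY, coldYear)); // prints: -3.0
    }
}
```

With data that always contains at least one positive reading per year both seeds give the same answer, which is why the problem is easy to miss.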
Driver (Main Method) :
public static void main(String[] args) throws Exception
{
    Configuration conf = new Configuration();
    Job job = new Job(conf, "MaxTemp");
    // Types of the final (reducer) output key and value.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FloatWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    // Read plain text lines and write "key<TAB>value" text lines.
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    // args[0] is the input path, args[1] is the output path.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setJarByClass(MaxTemp.class);
    // Wait for the job to finish and report success or failure as the exit code.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
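To see what the job computes once the driver has wired the Map and Reduce classes together, the three phases can be imitated in memory. This is a rough sketch in plain Java, not how Hadoop executes the job: the class name LocalMaxTempSketch is made up, the records use a simplified hypothetical "year,temperature" format instead of the fixed-width lines, and a TreeMap stands in for the shuffle step that groups values by key and presents the keys in sorted order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory imitation of map -> group by key -> reduce for the max-temperature job.
class LocalMaxTempSketch {
    static Map<String, Float> run(List<String> records) {
        // "Map" + "shuffle": parse each record and collect temperatures per year.
        Map<String, List<Float>> grouped = new TreeMap<>();
        for (String record : records) {
            String[] parts = record.split(",");   // hypothetical "year,temperature" format
            String year = parts[0];
            float temp = Float.parseFloat(parts[1]);
            grouped.computeIfAbsent(year, k -> new ArrayList<>()).add(temp);
        }

        // "Reduce": one call per year, keeping the largest temperature seen.
        Map<String, Float> result = new TreeMap<>();
        for (Map.Entry<String, List<Float>> entry : grouped.entrySet()) {
            float max = Float.NEGATIVE_INFINITY;
            for (float t : entry.getValue()) {
                max = Math.max(max, t);
            }
            result.put(entry.getKey(), max);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("1901,12.5", "1900,-4.0", "1901,30.1", "1900,-9.5");
        // One line per year, in key order, mirroring the layout of the job's text output.
        run(records).forEach((year, max) -> System.out.println(year + "\t" + max));
    }
}
```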
Execution Steps in Eclipse :
Step 1 :
Open Eclipse : File -> New -> Java Project
Enter project name : MaxTemp
Press Finish
Step 2 :
Under the Package Explorer window
Right click on MaxTemp -> New -> Class
Enter MaxTemp under Name and check public static void main(String[] args)
Press Finish
Step 3 :
Paste the program code below into MaxTemp.java
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class MaxTemp
{
    public static class Map extends Mapper<LongWritable, Text, Text, FloatWritable>
    {
        // Called once per input line: key is the byte offset, value is the line text.
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            // The year and the temperature are read from fixed column offsets of the line.
            String year = line.substring(4,9);
            float temp = Float.parseFloat(line.substring(38,43));
            context.write(new Text(year), new FloatWritable(temp));
        }
    }

    public static class Reduce extends Reducer<Text, FloatWritable, Text, FloatWritable>
    {
        // Called once per year with all temperatures emitted for that year.
        @Override
        public void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException
        {
            // Start below every possible reading. Float.MIN_VALUE is the smallest
            // positive float, so it would hide the maximum of an all-negative year.
            float maxValue = Float.NEGATIVE_INFINITY;
            for (FloatWritable value : values)
            {
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(key, new FloatWritable(maxValue));
        }
    }

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "MaxTemp");
        // Types of the final (reducer) output key and value.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // args[0] is the input path, args[1] is the output path.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setJarByClass(MaxTemp.class);
        // Wait for the job to finish and report success or failure as the exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Step 4 :
Package Explorer -> MaxTemp -> Build Path -> Configure Build Path -> Java Build Path -> Libraries -> Add Library
Add the following libraries and press OK
Step 5 :
Under Package Explorer
MaxTemp -> Run As -> Run Configurations
Under the Main tab, set
Name : MaxTemp
Project : MaxTemp
Main Class : MaxTemp
Under the Arguments tab, set
Program Arguments :
input output
Apply -> Close
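The two program arguments reach the driver as args[0] (input path) and args[1] (output path). Hadoop refuses to start a job whose output directory already exists, so a second run with the same arguments fails until the old output folder is removed. The sketch below is a hypothetical pre-flight check, not part of the original program: the name ArgsCheckSketch is made up, and it looks only at the local file system, which is an assumption that fits a local run from Eclipse.

```java
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical pre-flight check for the two program arguments (local paths only).
class ArgsCheckSketch {
    /** Returns null when the arguments look usable, otherwise a short problem description. */
    static String problemWith(String[] args) {
        if (args.length != 2) {
            return "expected two arguments: <input path> <output path>";
        }
        if (!Files.exists(Paths.get(args[0]))) {
            return "input path does not exist: " + args[0];
        }
        if (Files.exists(Paths.get(args[1]))) {
            return "output path already exists, delete it before running again: " + args[1];
        }
        return null;
    }

    public static void main(String[] args) {
        String problem = problemWith(args);
        System.out.println(problem == null ? "arguments look fine" : problem);
    }
}
```

A check like this could be called at the top of main() so that a wrong run configuration produces a readable message instead of a stack trace.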
Step 6 :
Create a new folder named input under the MaxTemp directory in your workspace folder (in my case it is home/workspace/MaxTemp)
Step 7 :
Copy the sample data (provided below) and store it as a text file under the input folder.
Step 8 :
Run the program and check the result under the output folder (../MaxTemp/output).
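TextOutputFormat writes one line per key, with the key and the value separated by a tab by default, and with a single reducer the result file is typically named part-r-00000. The following sketch reads such a file back into a map; the class name ReadOutputSketch and the default path output/part-r-00000 are assumptions for a local run, so adjust the path to whatever the output folder actually contains.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: parse "year<TAB>maxTemperature" lines as written by TextOutputFormat.
class ReadOutputSketch {
    static Map<String, Float> parse(List<String> lines) {
        Map<String, Float> maxByYear = new TreeMap<>();
        for (String line : lines) {
            if (line.trim().isEmpty()) {
                continue; // ignore blank lines
            }
            String[] parts = line.split("\t");
            maxByYear.put(parts[0].trim(), Float.parseFloat(parts[1].trim()));
        }
        return maxByYear;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default location; pass another path as the first argument if needed.
        Path part = Paths.get(args.length > 0 ? args[0] : "output/part-r-00000");
        if (!Files.exists(part)) {
            System.out.println("no result file at " + part);
            return;
        }
        parse(Files.readAllLines(part, StandardCharsets.UTF_8))
                .forEach((year, max) -> System.out.println(year + " -> " + max));
    }
}
```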