Hadoop works best when processing a small number of large files rather than a large number of small files. It splits large input files into so-called input splits, and each split is processed in parallel by a different mapper. For example, on a Hadoop cluster with a 256 MB block size, a 1 GB input file is split into 4 parts and 4 mappers process them in parallel.
But what if the files are small, say 10 MB each? In that case each file is processed by a separate mapper, resulting in a large number of mappers and leaving the cluster underutilized.
The solution is to combine several small files into the input of a single mapper using a combine file input format, which has to be implemented as a custom class.
The combine file input format relies on a simple idea: the key passed to the map function is a combination of the file name and the byte offset. The custom key class FileLineWritable below implements exactly that.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
// Composite key: file name plus the byte offset of the line within that file
public class FileLineWritable implements WritableComparable<FileLineWritable>
{
public long offset;
public String fileName;
public void readFields(DataInput in) throws IOException
{
this.offset = in.readLong();
this.fileName = Text.readString(in);
}
public void write(DataOutput out) throws IOException
{
out.writeLong(offset);
Text.writeString(out, fileName);
}
// Compare by file name first, then by byte offset
public int compareTo(FileLineWritable that)
{
int cmp = this.fileName.compareTo(that.fileName);
if (cmp != 0)
{
return cmp;
}
return (int) Math.signum((double) (this.offset - that.offset));
}
@Override
public int hashCode()
{ // generated hashCode()
final int prime = 31;
int result = 1;
result = prime * result + ((fileName == null) ? 0 : fileName.hashCode());
result = prime * result + (int) (offset ^ (offset >>> 32));
return result;
}
@Override
public boolean equals(Object obj)
{ // generated equals()
if (this == obj)
{
return true;
}
if (obj == null)
{
return false;
}
if (getClass() != obj.getClass())
{
return false;
}
FileLineWritable other = (FileLineWritable) obj;
if (fileName == null)
{
if (other.fileName != null)
{
return false;
}
}
else if (!fileName.equals(other.fileName))
{
return false;
}
if (offset != other.offset)
{
return false;
}
return true;
}
}
Next, a customized combine file input format needs to be implemented. It uses FileLineWritable as the key and extends CombineFileInputFormat, as shown below:
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
public class CFInputFormat extends CombineFileInputFormat<FileLineWritable, Text>
{
public CFInputFormat()
{
super();
setMaxSplitSize(67108864); // 64 MB: a combined split will not grow beyond this size
}
@Override
public RecordReader<FileLineWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException
{
return new CombineFileRecordReader<FileLineWritable, Text>((CombineFileSplit) split, context, CFRecordReader.class);
}
@Override
protected boolean isSplitable(JobContext context, Path file)
{
return false;
}
}
Also note that isSplitable returns false to prevent the input files from being split any further; by default it returns true.
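To use this input format, the job driver only needs to register it on the Job. The driver below is a minimal, hypothetical sketch: the class names SmallFilesDriver and SmallFilesMapper (the mapper itself is sketched near the end of this post) and the argument-based input/output paths are placeholders, not part of the original code.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// Hypothetical driver sketch showing how CFInputFormat is wired into a job
public class SmallFilesDriver
{
  public static void main(String[] args) throws Exception
  {
    Job job = Job.getInstance(new Configuration(), "combine-small-files");
    job.setJarByClass(SmallFilesDriver.class);
    // Replace the default TextInputFormat with the custom combine input format
    job.setInputFormatClass(CFInputFormat.class);
    job.setMapperClass(SmallFilesMapper.class); // placeholder mapper, sketched below
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0])); // directory full of small files
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}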
The next step is to implement a custom record reader that reads the records line by line.
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;
public class CFRecordReader extends RecordReader<FileLineWritable, Text>
{
private long startOffset;
private long end;
private long pos;
private FileSystem fs;
private Path path;
private FileLineWritable key;
private Text value;
private FSDataInputStream fileIn;
private LineReader reader;
// CombineFileRecordReader creates one CFRecordReader per file in the split via this
// constructor; the Integer is the index of the file within the CombineFileSplit
public CFRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
throws IOException
{
this.path = split.getPath(index);
fs = this.path.getFileSystem(context.getConfiguration());
this.startOffset = split.getOffset(index);
this.end = startOffset + split.getLength(index);
fileIn = fs.open(path);
reader = new LineReader(fileIn);
this.pos = startOffset;
}
@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1)
throws IOException,
InterruptedException
{
// Won't be called, use custom Constructor
// `CFRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)`
// instead
}
@Override
public void close() throws IOException
{
// Close the line reader (and its underlying stream) to avoid leaking file handles
if (reader != null)
{
reader.close();
reader = null;
}
}
@Override
public float getProgress() throws IOException
{
if (startOffset == end)
{
return 0;
}
return Math.min(1.0f, (pos - startOffset) / (float) (end - startOffset));
}
@Override
public FileLineWritable getCurrentKey() throws IOException,
InterruptedException
{
return key;
}
@Override
public Text getCurrentValue() throws IOException,
InterruptedException
{
return value;
}
@Override
public boolean nextKeyValue() throws IOException
{
if (key == null)
{
key = new FileLineWritable();
key.fileName = path.getName();
}
key.offset = pos;
if (value == null)
{
value = new Text();
}
int newSize = 0;
if (pos < end)
{
newSize = reader.readLine(value);
pos += newSize;
}
if (newSize == 0)
{
key = null;
value = null;
return false;
}
else
{
return true;
}
}
}
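With FileLineWritable, CFInputFormat and CFRecordReader in place, every map() call receives a FileLineWritable key (file name plus byte offset) and one line of text as the value. The SmallFilesMapper below is a minimal, hypothetical word-count style mapper matching the driver sketch above; it only illustrates how the key and value arrive and is not the MaxTemperature logic from the repo.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Hypothetical mapper showing the (FileLineWritable, Text) input produced by CFInputFormat
public class SmallFilesMapper extends Mapper<FileLineWritable, Text, Text, IntWritable>
{
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();
  @Override
  protected void map(FileLineWritable key, Text value, Context context)
      throws IOException, InterruptedException
  {
    // key.fileName: name of the small file this line came from
    // key.offset: byte offset of the line within that file
    // value: one line of text read by CFRecordReader
    for (String token : value.toString().split("\\s+"))
    {
      if (!token.isEmpty())
      {
        word.set(token);
        context.write(word, ONE);
      }
    }
  }
}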
The complete code for the MaxTemperature problem can be downloaded from my GitHub repo.
:) learning goes on forever