import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
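// Additional imports used only by the BinaryRecordReader sketch at the bottom
// of this file (the sketch itself is an assumption; see the note there).
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;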
public class BinarySort {

    // Emits each binary record as the map output key. The shuffle phase then
    // sorts the records using BytesWritable's byte-lexicographic ordering.
    public static class BinaryMapper extends Mapper<NullWritable, BytesWritable, BytesWritable, NullWritable> {
        @Override
        public void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }
    // Writes each key once per group. Note that duplicate records therefore
    // collapse to a single output line; iterate over `values`, writing the key
    // once per element, if duplicates must be preserved.
    public static class BinaryReducer extends Reducer<BytesWritable, NullWritable, BytesWritable, NullWritable> {
        @Override
        public void reduce(BytesWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Binary Sort");
        job.setJarByClass(BinarySort.class);
        job.setMapperClass(BinaryMapper.class);
        job.setReducerClass(BinaryReducer.class);
        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setInputFormatClass(BinaryInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Global ordering relies on the default single reducer; with multiple
        // reducers, each output file would be sorted only within itself.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
// Custom input format that hands whole binary records to the mapper; the
// record-boundary logic lives in the record reader it returns.
class BinaryInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new BinaryRecordReader();
    }
}
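
// The original listing references BinaryRecordReader without defining it, so
// the file would not compile as given. What follows is a minimal sketch,
// assuming fixed-length records of RECORD_LENGTH bytes; the record size and
// the boundary handling are assumptions, not part of the original. For
// genuinely fixed-length records, Hadoop's built-in FixedLengthInputFormat
// is usually the better choice in practice.
class BinaryRecordReader extends RecordReader<NullWritable, BytesWritable> {

    // Hypothetical record size; adjust to the real binary record format.
    private static final int RECORD_LENGTH = 100;

    private FSDataInputStream in;
    private long start;
    private long end;
    private long pos;
    private final BytesWritable value = new BytesWritable();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
        FileSplit fileSplit = (FileSplit) split;
        Path file = fileSplit.getPath();
        FileSystem fs = file.getFileSystem(context.getConfiguration());
        in = fs.open(file);
        // A record belongs to the split that contains its first byte, so align
        // the read position to the first record boundary at or after the
        // split's start offset.
        start = fileSplit.getStart();
        end = start + fileSplit.getLength();
        pos = ((start + RECORD_LENGTH - 1) / RECORD_LENGTH) * RECORD_LENGTH;
        in.seek(pos);
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (pos >= end) {
            return false;
        }
        // Assumes the file length is an exact multiple of RECORD_LENGTH;
        // readFully throws EOFException on a truncated final record.
        byte[] record = new byte[RECORD_LENGTH];
        in.readFully(record);
        value.set(record, 0, RECORD_LENGTH);
        pos += RECORD_LENGTH;
        return true;
    }

    @Override
    public NullWritable getCurrentKey() {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() {
        return value;
    }

    @Override
    public float getProgress() {
        // Rough progress estimate: fraction of the split's bytes consumed.
        return end == start ? 1.0f : Math.min(1.0f, (pos - start) / (float) (end - start));
    }

    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
}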