import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.jsoup.Jsoup;
public class MapReduceProgram1 extends Configured implements Tool{
int exitCode = ToolRunner.run(new MapReduceProgram1(), args);
}
@Override
if (args.length != 2) {
System.
err.
printf("Usage: %s [generic options] <input> <output>\n",
getClass().getSimpleName());
ToolRunner.
printGenericCommandUsage(System.
err); return -1;
}
Job job = new Job(getConf());
job.setJarByClass(getClass());
job.setJobName(getClass().getSimpleName());
//WholeFileInputFormat.addInputPath(job, new Path(args[0]));
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
class MyMapper extends Mapper<NullWritable,BytesWritable,Text,LongWritable>{
private Text word = new Text();
private LongWritable count = new LongWritable(1);
@Override
String html
= value.
toString(); String text
= Jsoup.
parse(html
).
text();
String[] keywords
= {"education",
"politics",
"sports",
"agriculture"}; for(int i=0; i<keywords.length; i++){
text = text.replaceAll("(?i)"+keywords[i], keywords[i]);
int lastIndex = 0;
do{
lastIndex = text.indexOf(text,lastIndex);
if (lastIndex != -1){
word.set(keywords[i]);
context.write(word, count);
lastIndex += keywords[i].length();
}
}
while(lastIndex != -1);
}
}
}
class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
private LongWritable result = new LongWritable();
@Override
long sum = 0;
for(LongWritable val: values){
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
class NonSplittableTextInputFormat extends TextInputFormat {
@Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
}
class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
@Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
@Override
public RecordReader
<NullWritable, BytesWritable
> createRecordReader
(InputSplit split, TaskAttemptContext context
) throws IOException,
InterruptedException { WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split, context);
return reader;
}
}
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
@Override
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
}
@Override
if (!processed) {
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
@Override
return NullWritable.get();
}
@Override
return value;
}
@Override
return processed ? 1.0f : 0.0f;
}
@Override
// do nothing
}
}
}