import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.jsoup.Jsoup;

public class MapReduceProgram1 extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MapReduceProgram1(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        // Job.getInstance replaces the deprecated Job(Configuration) constructor.
        Job job = Job.getInstance(getConf());
        job.setJarByClass(getClass());
        job.setJobName(getClass().getSimpleName());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // The mapper consumes <NullWritable, BytesWritable> records, so the job
        // must use WholeFileInputFormat rather than the default TextInputFormat.
        job.setInputFormatClass(WholeFileInputFormat.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    // Nested mapper/reducer classes must be static so Hadoop can instantiate
    // them by reflection without an enclosing MapReduceProgram1 instance.
    static class MyMapper extends Mapper<NullWritable, BytesWritable, Text, LongWritable> {

        private Text word = new Text();
        private LongWritable count = new LongWritable(1);

        @Override
        public void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            // BytesWritable.toString() renders a hex dump; decode the raw bytes instead.
            String html = new String(value.getBytes(), 0, value.getLength(), StandardCharsets.UTF_8);
            // Strip HTML markup, then lower-case once for case-insensitive matching.
            String text = Jsoup.parse(html).text().toLowerCase();

            String[] keywords = {"education", "politics", "sports", "agriculture"};
            for (int i = 0; i < keywords.length; i++) {
                int lastIndex = 0;
                // Emit one (keyword, 1) pair per occurrence; search for the
                // keyword, not for the text within itself.
                while ((lastIndex = text.indexOf(keywords[i], lastIndex)) != -1) {
                    word.set(keywords[i]);
                    context.write(word, count);
                    lastIndex += keywords[i].length();
                }
            }
        }
    }

    static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        private LongWritable result = new LongWritable();

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    static class NonSplittableTextInputFormat extends TextInputFormat {
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }
    }

    static class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }

        @Override
        public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split,
                TaskAttemptContext context) throws IOException, InterruptedException {
            WholeFileRecordReader reader = new WholeFileRecordReader();
            reader.initialize(split, context);
            return reader;
        }
    }

    static class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
        private FileSplit fileSplit;
        private Configuration conf;
        private BytesWritable value = new BytesWritable();
        private boolean processed = false;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            this.fileSplit = (FileSplit) split;
            this.conf = context.getConfiguration();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!processed) {
                // Read the entire file into a single BytesWritable value.
                byte[] contents = new byte[(int) fileSplit.getLength()];
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                FSDataInputStream in = null;

                try {
                    in = fs.open(file);
                    IOUtils.readFully(in, contents, 0, contents.length);
                    value.set(contents, 0, contents.length);
                } finally {
                    IOUtils.closeStream(in);
                }
                processed = true;
                return true;
            }
            return false;
        }

        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }

        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException {
            return processed ? 1.0f : 0.0f;
        }

        @Override
        public void close() throws IOException {
            // Nothing to close; the input stream is closed in nextKeyValue().
        }
    }
}
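A minimal sketch of how the job might be launched, assuming the class is packaged into a jar on a cluster (the jar name, jsoup version, and HDFS paths below are hypothetical):

hadoop jar keyword-count.jar MapReduceProgram1 -libjars jsoup-1.15.3.jar /user/hadoop/pages /user/hadoop/keyword-counts

Because the driver extends Configured and is launched through ToolRunner, generic options such as -libjars are handled by GenericOptionsParser before run() checks the remaining <input> <output> arguments.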