// Environment (printenv):
//   JAVA_HOME=/usr/java/jdk1.7.0_67-cloudera
//   PATH=/usr/java/jdk1.7.0_67-cloudera/bin
//   export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
//
// Build and run:
//   hadoop com.sun.tools.javac.Main WordCount2.java
//   jar cf WordCount2.jar WordCount2*.class
//   hadoop jar WordCount2.jar WordCount2 big.txt outWCBig
//   hadoop jar WordCount2.jar WordCount2 t8.shakespeare.txt outWCShake
//
// Remove the previous results before re-running:
//   hadoop fs -rm -r -f /user/cloudera/wordcount/output
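//
// A typical end-to-end session might look like this (the fs commands are
// illustrative assumptions, not captured from a real run):
//   $ hadoop fs -put t8.shakespeare.txt /user/cloudera/
//   $ hadoop jar WordCount2.jar WordCount2 t8.shakespeare.txt outWCShake
//   $ hadoop fs -cat outWCShake/part-r-00000   # ten "word<TAB>count" lines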
 
 
import java.io.IOException;
import java.util.TreeMap;

import org.apache.log4j.Logger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class WordCount2 extends Configured implements Tool
{
	private static final Logger LOG = Logger.getLogger(WordCount2.class);
 
	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new WordCount2(), args);
		System.exit(exitCode);
	}
 
	public static boolean isNullOrEmpty(String str) {
		return str == null || str.trim().isEmpty();
	}
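
	// For example: isNullOrEmpty(null) and isNullOrEmpty("   ") are true,
	// while isNullOrEmpty(" a ") is false.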
 
 
  public static class PunctuationMapper
      extends Mapper<Object, Text, IntWritable, Text> {

    private Text punctd = new Text();
    // Apostrophes are deliberately excluded so contractions such as "it's" survive.
    private static final String PunctuationMarks = "\"\\[\\]\\\\!$&@~#%:;`<>(){}/!|?*-+=^,.";

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      String word = value.toString();
      word = word.replaceAll("-", "");
      word = word.replaceAll("[" + PunctuationMarks + "]", " ");
      punctd.set(word);
      // The constant key IntWritable(1) just carries the cleaned line
      // to the next mapper in the chain.
      context.write(new IntWritable(1), punctd);
    }
  }
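
  // Worked example of the two replacements above (assumed, not from a test run):
  //   "Hello, world -- it's me."  ->  hyphens removed   ->  "Hello, world  it's me."
  //                               ->  punct. to spaces  ->  "Hello  world  it's me "
  // Apostrophes survive, so "it's" stays intact; TrimMapper collapses the spaces.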
 
  public static class TrimMapper
      extends Mapper<Object, Text, IntWritable, Text> {

    private Text trimd = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      String word = value.toString().trim();
      // Collapse each interior run of spaces into a single space.
      word = word.replaceAll("^ +| +$|( )+", "$1");
      trimd.set(word);
      context.write(new IntWritable(1), trimd);
    }
  }
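
  // For example, "  to   be  " first becomes "to   be" via trim(), and the
  // regex then collapses the interior run of spaces, yielding "to be".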
  public static class LowerCaseMapper
      extends Mapper<Object, Text, IntWritable, Text> {

    private Text lowercased = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      lowercased.set(value.toString().toLowerCase());
      context.write(new IntWritable(1), lowercased);
    }
  }
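
  // For example, "To BE" becomes "to be", so the final counts are case-insensitive.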
 
  public static class TokenizerMapper
      extends Mapper<IntWritable, Text, Text, IntWritable> {

    private static final java.util.regex.Pattern WORD_BOUNDARY =
        java.util.regex.Pattern.compile("\\s");

    private final static IntWritable one = new IntWritable(1);

    public void map(IntWritable key, Text lineText, Context context)
        throws IOException, InterruptedException {
      String line = lineText.toString();
      for (String word : WORD_BOUNDARY.split(line)) {
        if (WordCount2.isNullOrEmpty(word)) {
          continue;
        }
        context.write(new Text(word), one);
      }
    }
  }
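
  // For example, the line "to be or not to be" emits
  // (to,1) (be,1) (or,1) (not,1) (to,1) (be,1).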
 
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();
 
    public void reduce(Text key_word, Iterable<IntWritable> counts,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable count : counts) {
        sum += count.get();
      }
      result.set(sum);
      context.write(key_word, result);
    }
  }
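
  // Continuing the example above, the reducer receives the grouped counts
  // ("to",[1,1]) and ("be",[1,1]) and writes ("to",2) and ("be",2).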
  //////////////////////////////////////
  public static class TopTenMapper extends Mapper<Object, Text, NullWritable, Text>
    {
        // Ascending map from count to the full "word<TAB>count" line.
        private TreeMap<Integer, Text> topN = new TreeMap<Integer, Text>();

        public void map(Object key, Text value, Context context)
               throws IOException, InterruptedException {
            // Each input line is a "word<TAB>count" tuple from the first job.
            String[] words = value.toString().split("\t");
            if (words.length < 2) {
                return;
            }

            topN.put(Integer.parseInt(words[1]), new Text(value));

            if (topN.size() > 10) {
                // Drop the smallest count so only the ten largest remain.
                topN.remove(topN.firstKey());
            }
        }
 
 
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            for (Text t : topN.values()) {
                context.write(NullWritable.get(), t);
            }
        }
    }
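
  // Each map task emits only its local top ten (from cleanup()), so the reducer
  // sees at most ten lines per mapper instead of the whole word list.
  // Caveat: because the TreeMap is keyed on the count, two words with the same
  // count overwrite one another; a composite key would be needed to keep both.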
 
  public static class TopTenReducer extends
            Reducer<NullWritable, Text, NullWritable, Text> {

        private TreeMap<Integer, Text> topN = new TreeMap<Integer, Text>();

        @Override
        public void reduce(NullWritable key, Iterable<Text> values,
                           Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                String[] words = value.toString().split("\t");

                topN.put(Integer.parseInt(words[1]), new Text(value));

                if (topN.size() > 10) {
                    // Keep only the ten largest counts.
                    topN.remove(topN.firstKey());
                }
            }

            // Emit from largest count to smallest.
            for (Text word : topN.descendingMap().values()) {
                context.write(NullWritable.get(), word);
            }
        }
    }
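
  // All values arrive under the single NullWritable key, so one reduce() call
  // sees every mapper's local top ten and re-applies the same selection to
  // produce the global top ten, written largest count first.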
  //////////////////////////////////////
 
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Strip generic Hadoop options before reading positional arguments.
        args = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Intermediate output of the word-count job; recreated on every run.
        FileSystem fs = FileSystem.get(conf);
        Path tmpPath = new Path("/w1/tmp");
        fs.delete(tmpPath, true);

        Path inputPath = new Path(args[0]);
        Path outputOrder = new Path(args[1]);

        // First job: chained cleanup mappers followed by the word count.
        Job jobWordCount = Job.getInstance(conf, "wordcount");
        jobWordCount.setJarByClass(this.getClass());
        // TextInputFormat is the default unless setInputFormatClass is used.
 
        // PunctuationMapper: (Object, Text) -> (IntWritable, Text)
        Configuration punctuationMapperConf = new Configuration(false);
        ChainMapper.addMapper(jobWordCount,
            PunctuationMapper.class,
            Object.class, Text.class,
            IntWritable.class, Text.class,
            punctuationMapperConf);
 
        // TrimMapper: (Object, Text) -> (IntWritable, Text)
        Configuration trimMapperConf = new Configuration(false);
        ChainMapper.addMapper(jobWordCount,
            TrimMapper.class,
            Object.class, Text.class,
            IntWritable.class, Text.class,
            trimMapperConf);
 
        // LowerCaseMapper: (Object, Text) -> (IntWritable, Text)
        Configuration lowerCaseMapperConf = new Configuration(false);
        ChainMapper.addMapper(jobWordCount,
            LowerCaseMapper.class,
            Object.class, Text.class,
            IntWritable.class, Text.class,
            lowerCaseMapperConf);
 
        // TokenizerMapper: (IntWritable, Text) -> (Text, IntWritable)
        Configuration tokenizerConf = new Configuration(false);
        ChainMapper.addMapper(jobWordCount,
            TokenizerMapper.class,
            IntWritable.class, Text.class,
            Text.class, IntWritable.class,
            tokenizerConf);
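
        // In a ChainMapper pipeline each mapper's output types must match the
        // next mapper's declared input types: here the dummy IntWritable(1)
        // key carries the cleaned Text line from stage to stage until
        // TokenizerMapper finally emits (word, 1) pairs for the reducer.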
 
        // IntSumReducer: (Text, IntWritable) -> (Text, IntWritable)
        jobWordCount.setReducerClass(IntSumReducer.class);
        jobWordCount.setOutputKeyClass(Text.class);
        jobWordCount.setOutputValueClass(IntWritable.class);

        TextInputFormat.setInputPaths(jobWordCount, inputPath);
        // Write the intermediate (word, count) pairs to the temporary path.
        FileOutputFormat.setOutputPath(jobWordCount, tmpPath);

        int code = jobWordCount.waitForCompletion(true) ? 0 : 1;
 
        if (code == 0) {

            // Second job: select the ten most frequent words from the
            // (word, count) pairs produced by the first job.
            Job orderJob = Job.getInstance(conf, "TopWords");
            orderJob.setJarByClass(WordCount2.class);

            orderJob.setMapperClass(TopTenMapper.class);
            orderJob.setReducerClass(TopTenReducer.class);

            // TopTenMapper: (Object, Text) -> (NullWritable, Text)
            orderJob.setMapOutputKeyClass(NullWritable.class);
            orderJob.setMapOutputValueClass(Text.class);

            // A single reducer merges the per-mapper lists into one
            // global top-ten list.
            orderJob.setNumReduceTasks(1);

            // TopTenReducer: (NullWritable, Text) -> (NullWritable, Text)
            orderJob.setOutputKeyClass(NullWritable.class);
            orderJob.setOutputValueClass(Text.class);

            // TextInputFormat hands the mapper the whole "word<TAB>count"
            // line as the value, which is what TopTenMapper splits on.
            orderJob.setInputFormatClass(TextInputFormat.class);
            orderJob.setOutputFormatClass(TextOutputFormat.class);

            // Chain the jobs: the first job's output is this job's input.
            FileInputFormat.setInputPaths(orderJob, tmpPath);
            FileOutputFormat.setOutputPath(orderJob, outputOrder);

            // Submit the job
            code = orderJob.waitForCompletion(true) ? 0 : 2;
        }
 
        // Exit code of the second job, or of the first one if it failed.
        return code;
    }
}