fork download
  1. import org.apache.spark.SparkContext
  2. import org.apache.spark.SparkConf
  3. import SparkContext._
  4. import java.io.File
  5. import java.net.URI
  6. import java.util.regex.Matcher;
  7. import java.util.regex.Pattern;
  8. import java.util.Scanner;
  9.  
  10. object Main {
  11.  
  12. def main(args: Array[String]) {
  13. val appName = "SparkSpec"
  14. val jars = List(SparkContext.jarOfObject(this).get)
  15. println(jars)
  16. val conf = new SparkConf().setAppName(appName).setJars(jars)
  17. val sc = new SparkContext(conf)
  18. if (args.length < 2) {
  19. println(args.mkString(","))
  20. println("ERROR. Please, specify input and output directories.")
  21. } else {
  22. val inputDir = args(0)
  23. val outputDir = args(1)
  24. println("Input directory: " + inputDir)
  25. println("Output directory: " + outputDir)
  26. run(sc, inputDir, outputDir)
  27. }
  28. }
  29.  
  30. val GLUSTERFS_MOUNT_PATH = "/mnt/root"
  31. val FILENAME_PATTERN = "([0-9]{5})([dijkw])([0-9]{4})\\.txt\\.gz".r
  32. val LINE_PATTERN = "([0-9]{4} [0-9]{2} [0-9]{2} [0-9]{2} [0-9]{2})(.*)".r
  33. val VARIABLE_NAMES = Array("d", "i", "j", "k", "w")
  34.  
  35. def toGlusterfsPath(path: String): String = path.replace(GLUSTERFS_MOUNT_PATH, "")
  36. def toGlusterfsPath(file: File): String = toGlusterfsPath(file.getAbsolutePath())
  37.  
  38. def makeLines( fileNum: String, s: String, array: Array[String] ) = {
  39. var result: Array[(String, String, String)] = new Array[(String, String, String)](array.length);
  40. for (i <- 0 to array.length-1 ){
  41. result(i)= (fileNum, s, array(i) );
  42. }
  43. result
  44. }
  45.  
  46.  
  47. def parseString(s: String) = {
  48. var scanner : Scanner = new Scanner(s);
  49. var string : String = new String("[");
  50. if ( scanner.hasNextFloat() ){
  51. string+=scanner.nextFloat();
  52. }
  53. while(scanner.hasNextFloat()){
  54. string+= ", " + scanner.nextFloat();
  55. }
  56. string+="]"
  57. string
  58. }
  59.  
  60. def run(sc: SparkContext, inputDir: String, outputDir: String) {
  61.  
  62. sc
  63. .wholeTextFiles(toGlusterfsPath(inputDir + "/*.gz"))
  64. //glusterfs:/datasets/ndbc-small/41013i2012.txt.gz textfromfile
  65. .map(it => ((new File(new File(it._1).toURI().getPath()).getName()), it._2) )
  66. //45005w2012.txt.gz textfromfile
  67. .map( it => (FILENAME_PATTERN.pattern.matcher(it._1), it) )
  68. .filter(it => it._1.matches())
  69. .map( it => ( (it._1.group(1) , it._1.group(2)), it._2._2) )
  70. // (45005, w) textfromfile
  71. .map( it => (it._1, it._2.split("\\n")) )
  72. // (45005, w) lines
  73. .flatMap( it => makeLines(it._1._1, it._1._2, it._2) )
  74. // 45005 w line
  75. .map( it => (LINE_PATTERN.pattern.matcher(it._3), it) )
  76. // matcher ( 45005 w line )
  77. .filter( it => it._1.matches() )
  78. .map( it => (( it._2._1, it._1.group(1), it._2._2 ),( it._1.group(2))))
  79. // ( 45005, date, w) ( 38473 9 8453 2 98)
  80. .groupByKey()
  81. // ( 45005, date ,w) [ (38473 9 8453 2 98)
  82. //(38473 9 8453 2 98)(38473 9 8453 2 98) ...
  83. .map(it => ( (it._1._1, it._1._2), (it._1._3, it._2.iterator.next())) )
  84. //(45005, date)( w, 495 34957 349857 )
  85. .groupByKey()
  86. // ( 45005, date) [ (i, 38473 9 8453 2 98) (j, 38473 9 8453 2 98) (k, 38473 9 8453 2 98)
  87. //(w, 38473 9 8453 2 98) (d, 38473 9 8453 2 98) ]
  88. .filter( it => it._2.size == 5 )
  89. // ( 45005, date) [ (i, 38473 9 8453 2 98) (j, 38473 9 8453 2 98) (k, 38473 9 8453 2 98)
  90. //(w, 38473 9 8453 2 98) (d, 38473 9 8453 2 98) ]
  91.  
  92. .map( it => (it._1._2, it._2.toMap) )
  93. // date Map: [ (w, 38473 9 8453 2 98) (w, 38473 9 8453 2 98) (w, 38473 9 8453 2 98)
  94. //(w, 38473 9 8453 2 98) (w, 38473 9 8453 2 98) ]
  95.  
  96. .sortByKey()
  97. //sorted by date
  98.  
  99. .map( it => (it._1, it._2.get("i"), it._2.get("j"), it._2.get("k"), it._2.get("w"), it._2.get("d") ) )
  100. //упорядочены по ijkwd'
  101. .map(set => (set._1, set._2.getOrElse("[]"), set._3.getOrElse("[]"), set._4.getOrElse("[]"),
  102. set._5.getOrElse("[]"),set._6.getOrElse("[]")) )
  103. .map( set => (set._1, parseString(set._2), parseString(set._3), parseString(set._4), parseString(set._5), parseString(set._6)) )
  104. .map( set => set._1 + "\t[" + "i=" + set._2 + "," + "j=" + set._3 + "," + "k=" + set._4 + "," +
  105. "w=" + set._5 + "," + "d=" + set._6 + "]")
  106.  
  107. .saveAsTextFile(outputDir)
  108.  
  109. }
  110.  
  111. }
  112.  
Compilation failed — the paste site fed this Scala source to javac as Main.java, which produces the "';' expected" errors below; the code was never compiled with scalac. #stdin #stdout 0s 0KB
stdin
Standard input is empty
compilation info
Main.java:1: error: ';' expected
import org.apache.spark.SparkContext
                                    ^
Main.java:2: error: ';' expected
import org.apache.spark.SparkConf
                                 ^
Main.java:3: warning: '_' used as an identifier
import SparkContext._
                    ^
  (use of '_' as an identifier might not be supported in releases after Java SE 8)
Main.java:3: error: ';' expected
import SparkContext._
                     ^
Main.java:4: error: ';' expected
import java.io.File
                   ^
Main.java:5: error: ';' expected
import java.net.URI
                   ^
Main.java:10: error: class, interface, or enum expected
object Main {
^
Main.java:40: error: class, interface, or enum expected
		for (i <- 0 to array.length-1 ){ 
		^
Main.java:42: error: class, interface, or enum expected
		}
		^
Main.java:49: error: class, interface, or enum expected
		var string : String = new String("[");
		^
Main.java:50: error: class, interface, or enum expected
		if ( scanner.hasNextFloat() ){
		^
Main.java:52: error: class, interface, or enum expected
		}				
		^
Main.java:55: error: class, interface, or enum expected
		}
		^
12 errors
1 warning
stdout
Standard output is empty