This is an example of counting occurrences of specific words in Spark. The same map/reduce pattern is useful when computing PV (page views) and UV (unique visitors).
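As a hedged aside on the PV/UV claim, here is a minimal sketch of how the same shape applies; the file name access.log and the "userId&lt;TAB&gt;url" record layout are assumptions for illustration, not part of the original example.

val logs = sc.textFile("access.log")
  .map(_.split("\t"))
  .filter(_.length == 2)                          // keep well-formed "userId<TAB>url" records

val pv = logs.count()                             // PV: total number of requests
val uv = logs.map { case Array(user, _) => user }
  .distinct()
  .count()                                        // UV: number of distinct users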
Example
The input file is based on the date-formatting examples at https://kodejava.org/how-do-i-format-a-date-into-ddmmyyyy/.
$ cat xxx.txt
Date date = Calendar.getInstance().getTime();
// Display a date in day, month, year format
DateFormat formatter = new SimpleDateFormat("dd/MM/yyyy");
String today = formatter.format(date);
System.out.println("Today : " + today);
// Display date with day name in a short format
formatter = new SimpleDateFormat("EEE, dd/MM/yyyy");
today = formatter.format(date);
System.out.println("Today : " + today);
// Display date with a short day and month name
formatter = new SimpleDateFormat("EEE, dd MMM yyyy");
today = formatter.format(date);
System.out.println("Today : " + today);
// Formatting date with full day and month name and show time up to
// milliseconds with AM/PM
formatter = new SimpleDateFormat("EEEE, dd MMMM yyyy, hh:mm:ss.SSS a");
today = formatter.format(date);
System.out.println("Today : " + today);
scala> val codes = sc.textFile("xxx.txt")
codes: org.apache.spark.rdd.RDD[String] = xxx.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> val lower = codes.map(line => line.toLowerCase)
lower: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:26
scala> val words = lower.flatMap(line => line.split("\\s+"))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at <console>:28
scala> val counts = words.map(word => (word, 1))
counts: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:30
scala> val frequency = counts.reduceByKey(_ + _)
frequency: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:32
scala> val invFrequency = frequency.map(_.swap)
invFrequency: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[6] at map at <console>:34
scala> invFrequency.top(10).foreach(println)
(23,)
(9,=)
(6,date)
(5,//)
(4,with)
(4,today);)
(4,today)
(4,system.out.println("today)
(4,new)
(4,formatter.format(date);)
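Note that top(10) on the swapped (Int, String) pairs relies on the default tuple ordering, which compares the count first. The swap can be skipped by sorting the (word, count) pairs directly; the following should give the same counts, printed as (word, count) rather than the swapped form, though tie-breaking order may differ:

frequency.sortBy(_._2, ascending = false).take(10).foreach(println)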
The pipeline above can be written more compactly as a single chain:
scala> val result = sc.textFile("xxx.txt").map(line => line.toLowerCase).flatMap(line => line.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _).map(_.swap)
result: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[21] at map at <console>:24
scala> result.top(10).foreach(println)
(23,)
(9,=)
(6,date)
(5,//)
(4,with)
(4,today);)
(4,today)
(4,system.out.println("today)
(4,new)
(4,formatter.format(date);)
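Since RDD transformations are lazy, each action on result recomputes the whole chain from xxx.txt. If the result will be inspected more than once, caching it first avoids the recomputation (a small sketch using the standard cache API):

result.cache()                    // materialized on the first action, reused afterwards
result.top(10).foreach(println)   // triggers the computation and fills the cache
result.count()                    // served from the cached partitions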
Uninteresting tokens can be filtered out by defining a stopWords set:
scala> val stopWords = Set("", "=", "//", ")", "(", ";", ":", "+", "-", "\"")
stopWords: scala.collection.immutable.Set[String] = Set("", =, ), ", -, ;, //, +, (, :)
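The filter below references stopWords directly, so the set is serialized into every task closure. That is fine for a dozen tokens; for a large stop list, a broadcast variable would ship it to each executor once instead (a sketch, not part of the original session):

val stopWordsBC = sc.broadcast(stopWords)         // sent to each executor once
val filtered = sc.textFile("xxx.txt")
  .map(_.toLowerCase)
  .flatMap(_.split("\\s+"))
  .filter(word => !stopWordsBC.value.contains(word))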
scala> val result = sc.textFile("xxx.txt").map(line => line.toLowerCase).flatMap(line => line.split("\\s+")).filter(!stopWords.contains(_)).map(word => (word, 1)).reduceByKey(_ + _).map(_.swap)
result: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[34] at map at <console>:26
scala> result.top(10).foreach(println)
(6,date)
(4,with)
(4,today);)
(4,today)
(4,system.out.println("today)
(4,new)
(4,formatter.format(date);)
(4,formatter)
(3,name)
(3,display)
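Even with the stop list, punctuation still clings to tokens such as today); and system.out.println("today. Splitting on non-word characters (\W+) instead of whitespace strips most punctuation without any stop list; here is a sketch against the same xxx.txt (the resulting counts will differ from the output above, since punctuation is removed rather than kept attached):

val cleaned = sc.textFile("xxx.txt")
  .map(_.toLowerCase)
  .flatMap(_.split("\\W+"))        // split on any non-word character
  .filter(_.nonEmpty)              // drop empty tokens from leading separators
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .map(_.swap)
cleaned.top(10).foreach(println)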