spark에서 특정 단어의 개수를 찾는 예시이다. 이 예시는 pv, uv를 뽑는 데 도움이 되는 코드이다.



예제


 https://kodejava.org/how-do-i-format-a-date-into-ddmmyyyy/를 참조했다.

$ cat  xxx.txt

     Date date = Calendar.getInstance().getTime();


        // Display a date in day, month, year format

        DateFormat formatter = new SimpleDateFormat("dd/MM/yyyy");

        String today = formatter.format(date);

        System.out.println("Today : " + today);


        // Display date with day name in a short format

        formatter = new SimpleDateFormat("EEE, dd/MM/yyyy");

        today = formatter.format(date);

        System.out.println("Today : " + today);


        // Display date with a short day and month name

        formatter = new SimpleDateFormat("EEE, dd MMM yyyy");

        today = formatter.format(date);

        System.out.println("Today : " + today);


        // Formatting date with full day and month name and show time up to

        // milliseconds with AM/PM

        formatter = new SimpleDateFormat("EEEE, dd MMMM yyyy, hh:mm:ss.SSS a");

        today = formatter.format(date);

        System.out.println("Today : " + today);




scala> val codes = sc.textFile("xxx.txt")

codes: org.apache.spark.rdd.RDD[String] = xxx.txt MapPartitionsRDD[1] at textFile at <console>:24


scala> val lower = codes.map( line =>line.toLowerCase)

lower: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:26


scala> val words = lower.flatMap(line => line.split("\\s+"))

words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at <console>:28


scala> val counts = words.map(word => (word, 1))

counts: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:30


scala> val frequency = counts.reduceByKey(_ + _)

frequency: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:32


scala> val invFrequency = frequency.map(_.swap)

invFrequency: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[6] at map at <console>:34


scala> invFrequency.top(10).foreach(println)

(23,)

(9,=)

(6,date)

(5,//)

(4,with)

(4,today);)

(4,today)

(4,system.out.println("today)

(4,new)

(4,formatter.format(date);)



이를 다음처럼 축약해서 쓸 수 있다. 



scala> val result = sc.textFile("xxx.txt").map( line =>line.toLowerCase).flatMap(line => line.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _).map(_.swap)

result: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[21] at map at <console>:24


scala> result.top(10).foreach(println)

(23,)

(9,=)

(6,date)

(5,//)

(4,with)

(4,today);)

(4,today)

(4,system.out.println("today)

(4,new)

(4,formatter.format(date);)





필요 없는 단어(토큰)는 다음처럼 stopWords를 만들어 필터링할 수 있다.


scala> val stopWords = Set("", "=", "//", ")", "(", ";", ":", "+", "-", "\"")

stopWords: scala.collection.immutable.Set[String] = Set("", =, ), ", -, ;, //, +, (, :)


scala> val result = sc.textFile("xxx.txt").map( line =>line.toLowerCase).flatMap(line => line.split("\\s+")).filter(! stopWords.contains(_)).map(word => (word, 1)).reduceByKey(_ + _).map(_.swap)

result: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[34] at map at <console>:26


scala> result.top(10).foreach(println)

(6,date)

(4,with)

(4,today);)

(4,today)

(4,system.out.println("today)

(4,new)

(4,formatter.format(date);)

(4,formatter)

(3,name)

(3,display)



Posted by '김용환'
,