spark-shell을 이용해  "랜덤 포레스트를 이용한 MNIST 데이터셋 분류" 예를 공부한다. 



이 섹션에서는 랜덤 포레스트를 사용한 분류 예를 소개할 것이다. 코드를 단계별로 분석 해결책을 쉽게 이해할 수 있다.


1단계. LIVSVM 포맷으로 MNIST 데이터셋을 로드하고 파싱한다.



import org.apache.spark.mllib.util.MLUtils

// LIBSVM 포맷의 트레이닝 데이터를 로드한다.

val data = MLUtils.loadLibSVMFile(spark.sparkContext, "data/mnist.bz2")




2단계. 트레이닝과 테스트 셋을 준비한다.

데이터를 트레이닝 셋(75%)과 테스트 셋(25%)으로 나누고 재현하기 위해 다음처럼 시드를 설정한다.



val splits = data.randomSplit(Array(0.75, 0.25), seed = 12345L)

val training = splits(0).cache()

val test = splits(1)




모델을 구축하기 위해 트레이닝 알고리즘을 실행한다.


빈 categoricalFeaturesInfo를 사용해 랜덤 포레스트 모델을 트레이닝시킨다. 모든 피쳐가 데이터셋에서 연속적이기 때문에 관련 작업이 필요하다.




import org.apache.spark.mllib.tree.RandomForest



val numClasses = 10 //MNIST 데이터 셋의 클래스의 개수

val categoricalFeaturesInfo = Map[Int, Int]()

val numTrees = 50 // 실제 상황에서는 더 큰 수를 사용한다. 값이 더 클수록 좋다.

val featureSubsetStrategy = "auto" // 알고리즘을 선택한다.

val impurity = "gini" // 이전에 설명한 랜덤 포레스트를 설명한 노트를 살펴보라.

val maxDepth = 30 // 실제 상황에서는 값이 더 클수록 좋다.

val maxBins = 32 // 실제 상황에서는 값이 더 클수록 좋다.

val model = RandomForest.trainClassifier(training, numClasses, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)





랜덤 포레스트 모델을 트레이닝하는 것은 매우 자원이 소비되는 작업이다. 따라서 더 많은 메모리가 필요하므로 OOM이 발생되지 않도록 주의해야 한다. 



이전에 언급한 성능 메트릭을 사용해 다음처럼 모델을 평가할 수 있도록 테스트 셋의 원 점수를 계산한다.


val scoreAndLabels = test.map { point =>

 val score = model.predict(point.features)

 (score, point.label)

}




평가를 위해 다중 클래스에 대한 메트릭을 초기화한다.


import org.apache.spark.mllib.evaluation.MulticlassMetrics

val metrics = new MulticlassMetrics(scoreAndLabels)



혼동 행렬을 생성한다.



println(metrics.confusionMatrix)


1498.0  0.0     3.0     2.0     0.0     2.0     4.0     0.0     12.0    0.0

0.0     1736.0  8.0     1.0     2.0     1.0     2.0     4.0     0.0     2.0

7.0     0.0     1424.0  2.0     3.0     1.0     5.0     12.0    10.0    4.0

0.0     3.0     20.0    1507.0  0.0     19.0    2.0     13.0    19.0    9.0

3.0     0.0     5.0     0.0     1416.0  0.0     2.0     4.0     4.0     29.0

10.0    2.0     1.0     21.0    4.0     1272.0  14.0    3.0     13.0    1.0

6.0     2.0     0.0     0.0     1.0     9.0     1456.0  0.0     4.0     0.0

2.0     1.0     6.0     0.0     9.0     1.0     0.0     1578.0  8.0     18.0

2.0     6.0     7.0     9.0     5.0     10.0    5.0     2.0     1398.0  10.0

7.0     3.0     0.0     22.0    16.0    4.0     1.0     15.0    13.0    1404.0





이전 코드는 분류를 위해 다음과 같은 혼동 행렬을 출력한다.




이제 모델의 성능을 판단하기 위해 전체 통계를 계산하자.


정확도, 정밀도, 회수율, 참 긍정 비율, 거짓 긍정 비율, F1 점수와 같은 성능 메트릭을 포함하는 다음 출력을 생성한다.



val accuracy = metrics.accuracy

println("Summary Statistics")

println(s"Accuracy = $accuracy")

// 레이블 당 정확도

val labels = metrics.labels

labels.foreach { l =>

 println(s"Precision($l) = " + metrics.precision(l))

}

// 레이블 당 회수율

labels.foreach { l =>

 println(s"Recall($l) = " + metrics.recall(l))

}

// 레이블 당 거짓 긍정 비율

labels.foreach { l =>

 println(s"FPR($l) = " + metrics.falsePositiveRate(l))

}

// 레이블 당 F-측정 값

labels.foreach { l =>

 println(s"F1-Score($l) = " + metrics.fMeasure(l))





실제 실행 결과는 다음과 같다.


scala> val accuracy = metrics.accuracy

accuracy: Double = 0.967591067782096


scala> println("Summary Statistics")

Summary Statistics


scala> println(s"Accuracy = $accuracy")

Accuracy = 0.967591067782096


scala> // 레이블 당 정확도


scala> val labels = metrics.labels

labels: Array[Double] = Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0)


scala> labels.foreach { l =>

     |  println(s"Precision($l) = " + metrics.precision(l))

     | }

Precision(0.0) = 0.9758957654723127

Precision(1.0) = 0.9903023388476897

Precision(2.0) = 0.966078697421981

Precision(3.0) = 0.9635549872122762

Precision(4.0) = 0.9725274725274725

Precision(5.0) = 0.9643669446550417

Precision(6.0) = 0.9765258215962441

Precision(7.0) = 0.967504598405886

Precision(8.0) = 0.9439567859554355

Precision(9.0) = 0.950575490859851


scala> // 레이블 당 회수율


scala> labels.foreach { l =>

     |  println(s"Recall($l) = " + metrics.recall(l))

     | }

Recall(0.0) = 0.9848783694937541

Recall(1.0) = 0.9886104783599089

Recall(2.0) = 0.9700272479564033

Recall(3.0) = 0.946608040201005

Recall(4.0) = 0.9678742310321258

Recall(5.0) = 0.9485458612975392

Recall(6.0) = 0.9851150202976996

Recall(7.0) = 0.9722735674676525

Recall(8.0) = 0.9614855570839065

Recall(9.0) = 0.9454545454545454


scala> // 레이블 당 거짓 긍정 비율


scala> labels.foreach { l =>

     |  println(s"FPR($l) = " + metrics.falsePositiveRate(l))

     | }

FPR(0.0) = 0.0027086383601756954

FPR(1.0) = 0.001266294227188082

FPR(2.0) = 0.003646175162254795

FPR(3.0) = 0.004194569136801825

FPR(4.0) = 0.0029158769499927103

FPR(5.0) = 0.0033959537572254336

FPR(6.0) = 0.0025541852149164415

FPR(7.0) = 0.003909131140286178

FPR(8.0) = 0.006046477744590952

FPR(9.0) = 0.005330023364485981


scala> // 레이블 당 F-측정 값


scala> labels.foreach { l =>

     |  println(s"F1-Score($l) = " + metrics.fMeasure(l))

     | }

F1-Score(0.0) = 0.9803664921465969

F1-Score(1.0) = 0.9894556853804503

F1-Score(2.0) = 0.9680489462950375

F1-Score(3.0) = 0.9550063371356146

F1-Score(4.0) = 0.9701952723535457

F1-Score(5.0) = 0.956390977443609

F1-Score(6.0) = 0.9808016167059616

F1-Score(7.0) = 0.9698832206515058

F1-Score(8.0) = 0.952640545144804

F1-Score(9.0) = 0.9480081026333558


scala>






이제 전체 통계를 다음처럼 계산하자.


println(s"Weighted precision: ${metrics.weightedPrecision}")

println(s"Weighted recall: ${metrics.weightedRecall}")

println(s"Weighted F1 score: ${metrics.weightedFMeasure}")

println(s"Weighted false positive rate: ${metrics.weightedFalsePositiveRate}")

val testErr = scoreAndLabels.filter(r => r._1 != r._2).count.toDouble / test.count()

println("Accuracy = " + (1-testErr) * 100 + " %")




이전 코드는 가중치 정밀도, 회수율, F1 점수, 거짓 긍정 비율을 포함하는 다음 출력을 출력한다.


Overall statistics

----------------------------

Weighted precision: 0.9676041167963592

Weighted recall: 0.9675910677820959

Weighted F1 score: 0.9675700010426889

Weighted false positive rate: 0.03240893221790396

Accuracy = 96.7591067782096 %




전체 통계에 따르면 모형의 정확도는 96%이상 로지스틱 회귀 분석보다 우수하다. 


그러나 모델을 잘 튜닝하면 더욱 개선될 수 있다.


Posted by '김용환'
,



spark-shell을 이용한 "로지스틱 회귀 분석을 이용한 멀티 클래스 분류" 예이다. 


다중 클래스 분류 문제를 트레이닝하고 예측하기 위해 이진 로지스틱 회귀를 다항 로지스틱 회귀로 일반화할 수 있다. 



예를 들어 K개의 가능한 결과에 대해 결과 중 하나를 피벗으로 선택하고 다른 K-1개의 결과는 피벗 결과에 대해 개별적으로 회귀될 수 있다. 


spark.mllib에서 첫 번째 클래스 0은 피벗(pivot) 클래스로 선택된다.


다중 클래스 분류 문제의 경우 알고리즘은 첫 번째 클래스에 대해 회귀된 k-1 이진 로지스틱 회귀 모델을 포함하는 다항 로지스틱 회귀 모델을 출력한다.


 새로운 데이터 포인트가 주어지면 k-1 모델은 실행되고 가장 큰 확률을 가진 클래스가 예측 클래스로 선택된다. 


이 섹션에서는 더 빠른 수렴을 위해 L-BFGS를 사용하는 로지스틱 회귀 분석을 사용해 분류하는 예를 보여준다.





LIVSVM 포맷으로 https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass.html에서 

MNIST 데이터셋(https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/mnist.bz2)을 로드하고 파싱한다.




import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(spark.sparkContext, "data/mnist.bz2")



다음처럼 데이터를 트레이닝 셋(75%)과 테스트 셋(25%)으로 나눈다.



val splits = data.randomSplit(Array(0.75, 0.25), seed = 12345L)

val training = splits(0).cache()

val test = splits(1)




트레이닝 알고리즘을 실행해 다중 클래스(이 데이터 셋의 경우는 10개이다)를 설정하여 모델을 구축한다. 



분류 정확도를 높이려면 다음처럼 Boolean true 값을 사용해 데이터셋에 인터셉트를 추가(setIntercept)한 후 유효성을 검사(setValidateData)해야 한다.


import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

val model = new LogisticRegressionWithLBFGS()

          .setNumClasses(10)

          .setIntercept(true)

          .setValidateData(true)

          .run(training)





알고리즘이 setIntercept를 사용하여 인터셉트를 추가해야하는 경우 인터셉트를 true로 설정한다. 



모델 구축 전에 알고리즘에 트레이닝 셋으로 유효성을 검사하려면 setValidateData 함수를 사용하여 값을 true로 설정해야한다.




다음처럼 기본 임계값을 지워 기본 설정으로 트레이닝하지 않는다.


model.clearThreshold()





테스트 셋의 원본 점수를 계산해서 이전에 언급한 성능 메트릭를 사용해 다음처럼 모델을 평가할 수 있다.



val scoreAndLabels = test.map { point =>

 val score = model.predict(point.features)

 (score, point.label)

}



평가를 할 수 있도록 여러 개의 메트릭을 초기화한다.



import org.apache.spark.mllib.evaluation.MulticlassMetrics

val metrics = new MulticlassMetrics(scoreAndLabels)




혼동 행렬를 구축한다.



println(metrics.confusionMatrix)


1466.0  1.0     4.0     2.0     3.0     11.0    18.0    1.0     11.0    4.0

0.0     1709.0  11.0    3.0     2.0     6.0     1.0     5.0     15.0    4.0

10.0    17.0    1316.0  24.0    22.0    8.0     20.0    17.0    26.0    8.0

3.0     9.0     38.0    1423.0  1.0     52.0    9.0     11.0    31.0    15.0

3.0     4.0     23.0    1.0     1363.0  4.0     10.0    7.0     5.0     43.0

19.0    7.0     11.0    50.0    12.0    1170.0  23.0    6.0     32.0    11.0

6.0     2.0     15.0    3.0     10.0    19.0    1411.0  2.0     8.0     2.0

4.0     7.0     10.0    7.0     14.0    4.0     2.0     1519.0  8.0     48.0

9.0     22.0    26.0    43.0    11.0    46.0    16.0    5.0     1268.0  8.0

6.0     3.0     5.0     23.0    39.0    8.0     0.0     60.0    14.0    1327.0






혼동 행렬에서 행렬의 각 컬럼은 예측 클래스의 인스턴스를 나타내는 반면, 


각 라인은 실제 클래스의 인스턴스를 나타낸다(또는 그 반대). 


이름은 시스템이 2개의 클래스를 혼동하고 있는지 쉽게 알 수 있게 한다는 사실에서 유래한다. 


자세한 내용은 혼동 행렬(https://en.wikipedia.org/wiki/Confusion_matrix.Confusion)를 참조한다.






이제 모델의 성능을 판단하기 위해 전체 통계를 계산해보자.



val accuracy = metrics.accuracy

println("Summary Statistics")

println(s"Accuracy = $accuracy")

// 레이블 당 정확도

val labels = metrics.labels

labels.foreach { l =>

 println(s"Precision($l) = " + metrics.precision(l))

}

// 레이블 당 회수율

labels.foreach { l =>

 println(s"Recall($l) = " + metrics.recall(l))

}

// 레이블 당 거짓 긍정 비율

labels.foreach { l =>

 println(s"FPR($l) = " + metrics.falsePositiveRate(l))

}

// 레이블 당 F-측정 값

labels.foreach { l =>

 println(s"F1-Score($l) = " + metrics.fMeasure(l))

}




이전 코드 세그먼트는 정확도, 정밀도, 회수율, 참 긍정 비율, 오 탐지율 및 F1 점수와 같은 성능 메트릭을 포함하는 다음 출력을 생성한다.


Summary Statistics

----------------------

scala> // 레이블 당 정확도


scala> val labels = metrics.labels

labels: Array[Double] = Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0)


scala> labels.foreach { l =>

     |  println(s"Precision($l) = " + metrics.precision(l))

     | }

Precision(0.0) = 0.9606815203145478

Precision(1.0) = 0.9595732734418866

Precision(2.0) = 0.9019876627827279

Precision(3.0) = 0.9012032932235592

Precision(4.0) = 0.922816519972918

Precision(5.0) = 0.8810240963855421

Precision(6.0) = 0.9344370860927153

Precision(7.0) = 0.9301898346601347

Precision(8.0) = 0.8942172073342737

Precision(9.0) = 0.9027210884353741


scala> // 레이블 당 회수율


scala> labels.foreach { l =>

     |  println(s"Recall($l) = " + metrics.recall(l))

     | }

Recall(0.0) = 0.9638395792241946

Recall(1.0) = 0.9732346241457859

Recall(2.0) = 0.896457765667575

Recall(3.0) = 0.8938442211055276

Recall(4.0) = 0.9316473000683527

Recall(5.0) = 0.87248322147651

Recall(6.0) = 0.9546684709066305

Recall(7.0) = 0.9359211337030191

Recall(8.0) = 0.8720770288858322

Recall(9.0) = 0.8936026936026936


scala> // 레이블 당 거짓 긍정 비율


scala> labels.foreach { l =>

     |  println(s"FPR($l) = " + metrics.falsePositiveRate(l))

     | }

FPR(0.0) = 0.004392386530014641

FPR(1.0) = 0.005363128491620112

FPR(2.0) = 0.010428060964048714

FPR(3.0) = 0.011479873427036574

FPR(4.0) = 0.008310249307479225

FPR(5.0) = 0.011416184971098265

FPR(6.0) = 0.0072246953221922205

FPR(7.0) = 0.00840831981118159

FPR(8.0) = 0.010927369417935456

FPR(9.0) = 0.010441004672897197


scala> // 레이블 당 F-측정 값


scala> labels.foreach { l =>

     |  println(s"F1-Score($l) = " + metrics.fMeasure(l))

     | }

F1-Score(0.0) = 0.9622579586478502

F1-Score(1.0) = 0.966355668645745

F1-Score(2.0) = 0.8992142125042706

F1-Score(3.0) = 0.8975086723431095

F1-Score(4.0) = 0.9272108843537414

F1-Score(5.0) = 0.876732858748595

F1-Score(6.0) = 0.9444444444444444

F1-Score(7.0) = 0.933046683046683

F1-Score(8.0) = 0.883008356545961

F1-Score(9.0) = 0.8981387478849409




이제 전체 통계, 즉 요약 통계를 계산하자.


println(s"Weighted precision: ${metrics.weightedPrecision}")

println(s"Weighted recall: ${metrics.weightedRecall}")

println(s"Weighted F1 score: ${metrics.weightedFMeasure}")

println(s"Weighted false positive rate: ${metrics.weightedFalsePositiveRate}") 




이전 코드는 가중치 정밀도, 회수율, F1 점수, 거짓 긍정 비율을 다음처럼 출력한다.


Weighted precision: 0.920104303076327

Weighted recall: 0.9203609775377117

Weighted F1 score: 0.9201934861645358

Weighted false positive rate: 0.008752250453215607




전체 통계에 따르면 모델의 정확도는 92%이상이다. 


그러나 랜덤 포레스트(RF)와 같은 더 좋은 알고리즘을 사용하면 성능이 향상된다.



Posted by '김용환'
,

spark-shell에서 RandomForest와 같은 알고리즘을 트레이닝할 때 메모리가 한참 부족해서 spakr이 crash나는 경우가 있다. 이럴 때는 메모리와 cpu를 넉넉히 설정하는 것이 좋다. 


예는 다음과 같다. 


spark-shell --driver-memory 16G --executor-memory 16G --executor-cores 8


Posted by '김용환'
,





spark-shell을 이용해 유방암 가능성을 예측하는 방법을 소개한다. 


https://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/breast-cancer-wisconsin.data에서 데이터를 다운로드받고 로드한 다음 파싱하는 예를 소개한다.




breast-cancer-wisconsin.data를 읽고 이를 케이스 클래스로 저장하고  파싱하는 코드를 추가한다.



import org.apache.spark.rdd.RDD


case class Cancer(cancer_class: Double, thickness: Double, size: Double, shape: Double, madh: Double, epsize: Double, bnuc: Double, bchrom: Double, nNuc: Double, mit: Double)


def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {

 rdd.map(_.split(",")).filter(_(6) != "?").map(_.drop(1)).map(_.map(_.toDouble))


def parseCancer(line: Array[Double]): Cancer = {

 Cancer(if (line(9) == 4.0) 1 else 0, line(0), line(1), line(2), line(3), line(4), line(5), line(6), line(7), line(8))

}


val rdd = spark.sparkContext.textFile("data/breast-cancer-wisconsin.data")

val cancerRDD = parseRDD(rdd).map(parseCancer) 




머신러닝 파이프 라인을 위해 RDD를 데이터 프레임으로 변환한다.


import spark.sqlContext.implicits._

val cancerDF = cancerRDD.toDF().cache()

cancerDF.show() 


+------------+---------+----+-----+----+------+----+------+----+---+

|cancer_class|thickness|size|shape|madh|epsize|bnuc|bchrom|nNuc|mit|

+------------+---------+----+-----+----+------+----+------+----+---+

|         0.0|      5.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|

|         0.0|      5.0| 4.0|  4.0| 5.0|   7.0|10.0|   3.0| 2.0|1.0|

|         0.0|      3.0| 1.0|  1.0| 1.0|   2.0| 2.0|   3.0| 1.0|1.0|

|         0.0|      6.0| 8.0|  8.0| 1.0|   3.0| 4.0|   3.0| 7.0|1.0|

|         0.0|      4.0| 1.0|  1.0| 3.0|   2.0| 1.0|   3.0| 1.0|1.0|

|         1.0|      8.0|10.0| 10.0| 8.0|   7.0|10.0|   9.0| 7.0|1.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0|10.0|   3.0| 1.0|1.0|

|         0.0|      2.0| 1.0|  2.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|

|         0.0|      2.0| 1.0|  1.0| 1.0|   2.0| 1.0|   1.0| 1.0|5.0|

|         0.0|      4.0| 2.0|  1.0| 1.0|   2.0| 1.0|   2.0| 1.0|1.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   1.0| 1.0|   3.0| 1.0|1.0|

|         0.0|      2.0| 1.0|  1.0| 1.0|   2.0| 1.0|   2.0| 1.0|1.0|

|         1.0|      5.0| 3.0|  3.0| 3.0|   2.0| 3.0|   4.0| 4.0|1.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 3.0|   3.0| 1.0|1.0|

|         1.0|      8.0| 7.0|  5.0|10.0|   7.0| 9.0|   5.0| 5.0|4.0|

|         1.0|      7.0| 4.0|  6.0| 4.0|   6.0| 1.0|   4.0| 3.0|1.0|

|         0.0|      4.0| 1.0|  1.0| 1.0|   2.0| 1.0|   2.0| 1.0|1.0|

|         0.0|      4.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|

|         1.0|     10.0| 7.0|  7.0| 6.0|   4.0|10.0|   4.0| 1.0|2.0|

|         0.0|      6.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|

+------------+---------+----+-----+----+------+----+------+----+---+

only showing top 20 rows




피쳐를 추출하고 트랜스포메이션을 적용해보자.


먼저 다음처럼 피쳐 컬럼을 선택한다.


val featureCols = Array("thickness", "size", "shape", "madh", "epsize", "bnuc", "bchrom", "nNuc", "mit") 




이제 다음처럼 피쳐 컬럼(featureCols)을 피쳐 벡터(features)로 생성한다.


import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features") 




이제 다음처럼 피쳐 벡터를 데이터 프레임으로 변환한다.



val df2 = assembler.transform(cancerDF) 

df2.show() 


+------------+---------+----+-----+----+------+----+------+----+---+--------------------+

|cancer_class|thickness|size|shape|madh|epsize|bnuc|bchrom|nNuc|mit|            features|

+------------+---------+----+-----+----+------+----+------+----+---+--------------------+

|         0.0|      5.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[5.0,1.0,1.0,1.0,...|

|         0.0|      5.0| 4.0|  4.0| 5.0|   7.0|10.0|   3.0| 2.0|1.0|[5.0,4.0,4.0,5.0,...|

|         0.0|      3.0| 1.0|  1.0| 1.0|   2.0| 2.0|   3.0| 1.0|1.0|[3.0,1.0,1.0,1.0,...|

|         0.0|      6.0| 8.0|  8.0| 1.0|   3.0| 4.0|   3.0| 7.0|1.0|[6.0,8.0,8.0,1.0,...|

|         0.0|      4.0| 1.0|  1.0| 3.0|   2.0| 1.0|   3.0| 1.0|1.0|[4.0,1.0,1.0,3.0,...|

|         1.0|      8.0|10.0| 10.0| 8.0|   7.0|10.0|   9.0| 7.0|1.0|[8.0,10.0,10.0,8....|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0|10.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|

|         0.0|      2.0| 1.0|  2.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[2.0,1.0,2.0,1.0,...|

|         0.0|      2.0| 1.0|  1.0| 1.0|   2.0| 1.0|   1.0| 1.0|5.0|[2.0,1.0,1.0,1.0,...|

|         0.0|      4.0| 2.0|  1.0| 1.0|   2.0| 1.0|   2.0| 1.0|1.0|[4.0,2.0,1.0,1.0,...|

|         0.0|      1.0| 1.0|  1.0| 1.0|   1.0| 1.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|

|         0.0|      2.0| 1.0|  1.0| 1.0|   2.0| 1.0|   2.0| 1.0|1.0|[2.0,1.0,1.0,1.0,...|

|         1.0|      5.0| 3.0|  3.0| 3.0|   2.0| 3.0|   4.0| 4.0|1.0|[5.0,3.0,3.0,3.0,...|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 3.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|

|         1.0|      8.0| 7.0|  5.0|10.0|   7.0| 9.0|   5.0| 5.0|4.0|[8.0,7.0,5.0,10.0...|

|         1.0|      7.0| 4.0|  6.0| 4.0|   6.0| 1.0|   4.0| 3.0|1.0|[7.0,4.0,6.0,4.0,...|

|         0.0|      4.0| 1.0|  1.0| 1.0|   2.0| 1.0|   2.0| 1.0|1.0|[4.0,1.0,1.0,1.0,...|

|         0.0|      4.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[4.0,1.0,1.0,1.0,...|

|         1.0|     10.0| 7.0|  7.0| 6.0|   4.0|10.0|   4.0| 1.0|2.0|[10.0,7.0,7.0,6.0...|

|         0.0|      6.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[6.0,1.0,1.0,1.0,...|

+------------+---------+----+-----+----+------+----+------+----+---+--------------------+




이제 왼쪽 컬럼을 기반으로 계산된 피쳐가 포함된 데이터 프레임을 관찰해야 한다.




마지막으로 StringIndexer를 사용하고 다음처럼 트레이닝 데이터셋 레이블을 생성한다.


import org.apache.spark.ml.feature.StringIndexer

val labelIndexer = new StringIndexer().setInputCol("cancer_class").setOutputCol("label")

val df3 = labelIndexer.fit(df2).transform(df2)

df3.show() 


+------------+---------+----+-----+----+------+----+------+----+---+--------------------+-----+

|cancer_class|thickness|size|shape|madh|epsize|bnuc|bchrom|nNuc|mit|            features|label|

+------------+---------+----+-----+----+------+----+------+----+---+--------------------+-----+

|         0.0|      5.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[5.0,1.0,1.0,1.0,...|  0.0|

|         0.0|      5.0| 4.0|  4.0| 5.0|   7.0|10.0|   3.0| 2.0|1.0|[5.0,4.0,4.0,5.0,...|  0.0|

|         0.0|      3.0| 1.0|  1.0| 1.0|   2.0| 2.0|   3.0| 1.0|1.0|[3.0,1.0,1.0,1.0,...|  0.0|

|         0.0|      6.0| 8.0|  8.0| 1.0|   3.0| 4.0|   3.0| 7.0|1.0|[6.0,8.0,8.0,1.0,...|  0.0|

|         0.0|      4.0| 1.0|  1.0| 3.0|   2.0| 1.0|   3.0| 1.0|1.0|[4.0,1.0,1.0,3.0,...|  0.0|

|         1.0|      8.0|10.0| 10.0| 8.0|   7.0|10.0|   9.0| 7.0|1.0|[8.0,10.0,10.0,8....|  1.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0|10.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|

|         0.0|      2.0| 1.0|  2.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[2.0,1.0,2.0,1.0,...|  0.0|

|         0.0|      2.0| 1.0|  1.0| 1.0|   2.0| 1.0|   1.0| 1.0|5.0|[2.0,1.0,1.0,1.0,...|  0.0|

|         0.0|      4.0| 2.0|  1.0| 1.0|   2.0| 1.0|   2.0| 1.0|1.0|[4.0,2.0,1.0,1.0,...|  0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   1.0| 1.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|

|         0.0|      2.0| 1.0|  1.0| 1.0|   2.0| 1.0|   2.0| 1.0|1.0|[2.0,1.0,1.0,1.0,...|  0.0|

|         1.0|      5.0| 3.0|  3.0| 3.0|   2.0| 3.0|   4.0| 4.0|1.0|[5.0,3.0,3.0,3.0,...|  1.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 3.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|

|         1.0|      8.0| 7.0|  5.0|10.0|   7.0| 9.0|   5.0| 5.0|4.0|[8.0,7.0,5.0,10.0...|  1.0|

|         1.0|      7.0| 4.0|  6.0| 4.0|   6.0| 1.0|   4.0| 3.0|1.0|[7.0,4.0,6.0,4.0,...|  1.0|

|         0.0|      4.0| 1.0|  1.0| 1.0|   2.0| 1.0|   2.0| 1.0|1.0|[4.0,1.0,1.0,1.0,...|  0.0|

|         0.0|      4.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[4.0,1.0,1.0,1.0,...|  0.0|

|         1.0|     10.0| 7.0|  7.0| 6.0|   4.0|10.0|   4.0| 1.0|2.0|[10.0,7.0,7.0,6.0...|  1.0|

|         0.0|      6.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[6.0,1.0,1.0,1.0,...|  0.0|

+------------+---------+----+-----+----+------+----+------+----+---+--------------------+-----+




이제 왼쪽 컬럼을 기반으로 계산된 피쳐와 레이블이 포함된 데이터 프레임을 살펴보자.



머신 러닝 모델을 트레이닝하기 위한 피쳐와 레이블이 포함된 새로운 데이터 프레임을 생성할 것이다. 


먼저 테스트 셋과 트레이닝 셋(3:7)을 생성한다. (지도 학습이라..)



val splitSeed = 1234567

val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)




이제 트레이닝 셋을 사용하여 에스티메이터 생성한다. elasticNetParam을 이용한 로지스틱 회귀 분석을 사용해 파이프 라인에 대한 에스티메이터를 생성하자. 다음처럼 최대 반복과 회귀 매개 변수도 지정한다.



import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression().setMaxIter(50).setRegParam(0.01).setElasticNetParam(0.01)

val model = lr.fit(trainingData)  




테스트 셋으로 원본 예측, 확률, 예측을 얻기 위해 테스트 셋을 사용해 모델을 변환한다. 



val predictions = model.transform(testData)

predictions.show() 


+------------+---------+----+-----+----+------+----+------+----+---+--------------------+-----+--------------------+--------------------+----------+

|cancer_class|thickness|size|shape|madh|epsize|bnuc|bchrom|nNuc|mit|            features|label|       rawPrediction|         probability|prediction|

+------------+---------+----+-----+----+------+----+------+----+---+--------------------+-----+--------------------+--------------------+----------+

|         0.0|      1.0| 1.0|  1.0| 1.0|   1.0| 1.0|   2.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[5.15956430979038...|[0.99428860556932...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   1.0| 1.0|   2.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[5.15956430979038...|[0.99428860556932...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   1.0| 1.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[4.88229871718381...|[0.99247744702488...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   1.0| 1.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[4.88229871718381...|[0.99247744702488...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   1.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[5.26929960916807...|[0.99487914377217...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   1.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[5.26929960916807...|[0.99487914377217...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   1.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[5.26929960916807...|[0.99487914377217...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   1.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[5.26929960916807...|[0.99487914377217...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   1.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[5.26929960916807...|[0.99487914377217...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   1.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[5.26929960916807...|[0.99487914377217...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   2.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[4.99203401656150...|[0.99325398211858...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   2.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[4.99203401656150...|[0.99325398211858...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   2.0| 3.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[4.74802132478210...|[0.99140567173413...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[4.71476842395493...|[0.99111766179519...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[4.71476842395493...|[0.99111766179519...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[4.71476842395493...|[0.99111766179519...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[4.71476842395493...|[0.99111766179519...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 2.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[4.59276207806523...|[0.98997663106901...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 5.0|   1.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[4.10129026316119...|[0.98371817931939...|       0.0|

|         0.0|      1.0| 1.0|  1.0| 1.0|   4.0| 3.0|   1.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[4.35023434970686...|[0.98726059831436...|       0.0|

+------------+---------+----+-----+----+------+----+------+----+---+--------------------+-----+--------------------+--------------------+----------+


이제 원본 예측과 각 로우에 대한 실제 예측이 포함된 새로운 데이터 프레임을 볼 수 있다.



다음처럼 트레이닝의 객관적인 기록 생성을 위해 각 반복마다 모델의 목표 이력을 생성하자.


val trainingSummary = model.summary

val objectiveHistory = trainingSummary.objectiveHistory

objectiveHistory.foreach(loss => println(loss))




이전 코드는 트레이닝 손실 측면에서 다음과 같은 출력을 생성한다.


   0.6562291876496595

   0.6087867761081431

   0.538972588904556

   0.4928455913405332

   0.46269258074999386

   0.3527914819973198

   0.20206901337404978

   0.16459454874996993

   0.13783437051276512

   0.11478053164710095

   0.11420433621438157

   0.11138884788059378

   0.11041889032338036

   0.10849477236373875

   0.10818880537879513

   0.10682868640074723

   0.10641395229253267

   0.10555411704574749

   0.10505186414044905

   0.10470425580130915

   0.10376219754747162

   0.10331139609033112

   0.10276173290225406

   0.10245982201904923

   0.10198833366394071

   0.10168248313103552

   0.10163242551955443

   0.10162826209311404

   0.10162119367292953

   0.10161235376791203

   0.1016114803209495

   0.10161090505556039

   0.1016107261254795

   0.10161056082112738

   0.10161050381332608

   0.10161048515341387

   0.10161043900301985

   0.10161042057436288

   0.10161040971267737

   0.10161040846923354

   0.10161040625542347

   0.10161040595207525

   0.10161040575664354

   0.10161040565870835

   0.10161040519559975

   0.10161040489834573

   0.10161040445215266

   0.1016104043469577

   0.1016104042793553

   0.1016104042606048

   0.10161040423579716 




이전 결과를 살펴보면 반복하면거 결과 값이 점차 줄어드는 것을 볼 수 있다.



이제 이진 로직 회귀 요약에서 사용한 분류자인지 확인해야 한다.


import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary

val binarySummary = trainingSummary.asInstanceOf[BinaryLogisticRegressionSummary]




이제 ROC를 데이터 프레임으로 얻고 areaUnderROC로 가져온다. 근사값이 1.0이면 더 좋다.


val roc = binarySummary.roc

roc.show()


+---+--------------------+

|FPR|                 TPR|

+---+--------------------+

|0.0|                 0.0|

|0.0|0.017341040462427744|

|0.0| 0.03468208092485549|

|0.0| 0.05202312138728324|

|0.0| 0.06936416184971098|

|0.0| 0.08670520231213873|

|0.0| 0.10404624277456648|

|0.0| 0.12138728323699421|

|0.0| 0.13872832369942195|

|0.0| 0.15606936416184972|

|0.0| 0.17341040462427745|

|0.0|  0.1907514450867052|

|0.0| 0.20809248554913296|

|0.0|  0.2254335260115607|

|0.0| 0.24277456647398843|

|0.0| 0.26011560693641617|

|0.0|  0.2774566473988439|

|0.0|  0.2947976878612717|

|0.0| 0.31213872832369943|

|0.0| 0.32947976878612717|

+---+--------------------+





areaUnderROC의 값을 출력한다.


println("binarySummary.areaUnderROC)

0.9960056075125305




이제 참 긍정 비율, 거짓 긍정 비율, 거짓 부정 비율, 전체 개수와 같은 메트릭과 다음처럼 제대로 예측하고 잘못 예측한 경우의 수를 계산하자. 


import org.apache.spark.sql.functions._


// 성능 메트릭을 계산한다

val lp = predictions.select("label", "prediction")

val counttotal = predictions.count()

val correct = lp.filter($"label" === $"prediction").count()

val wrong = lp.filter(not($"label" === $"prediction")).count()

val truep = lp.filter($"prediction" === 0.0).filter($"label" === $"prediction").count()

val falseN = lp.filter($"prediction" === 0.0).filter(not($"label" === $"prediction")).count()

val falseP = lp.filter($"prediction" === 1.0).filter(not($"label" === $"prediction")).count()

val ratioWrong = wrong.toDouble / counttotal.toDouble

val ratioCorrect = correct.toDouble / counttotal.toDouble


println("Total Count: " + counttotal)

println("Correctly Predicted: " + correct)

println("Wrongly Identified: " + wrong)

println("True Positive: " + truep)

println("False Negative: " + falseN)

println("False Positive: " + falseP)

println("ratioWrong: " + ratioWrong)

println("ratioCorrect: " + ratioCorrect) 



결과는 다음과 같다. 


Total Count: 209

Correctly Predicted: 202

Wrongly Identified: 7

True Positive: 140

False Negative: 4

False Positive: 3

ratioWrong: 0.03349282296650718

ratioCorrect: 0.9665071770334929





마지막으로 모델의 정확도를 판단하자. 그러나 먼저 fMeasure를 최대화하기 위해 모델 임계 값을 설정해야 한다.


val fMeasure = binarySummary.fMeasureByThreshold

val fm = fMeasure.col("F-Measure")

val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)

val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure).select("threshold").head().getDouble(0)

model.setThreshold(bestThreshold) 




이제 다음처럼 정확도를 계산하자.


import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val evaluator = new BinaryClassificationEvaluator().setLabelCol("label")

val accuracy = evaluator.evaluate(predictions)

println("Accuracy: " + accuracy)     



정확도는 다음처럼 거의 99.64%이다. 


Accuracy: 0.9963975418520874



Posted by '김용환'
,


spark-shell을 이용한 PCA 예이다. 



스파크에서는 선형 회귀 알고리즘의 RDD 기반 구현을 제공한다.


https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala




SGD(stochastic gradient descent)를 사용해 정규화가 필요없는 선형 회귀 모델을 학습시킬 수 있다. 이는 다음과 같은 최소 제곱 회귀(least squares regression) 공식을 풀 수 있다.



f(가중치) = 1/n ||A 가중치-y||^2  

(평균 제곱 오차(MSE, mean squared error)이다) 





여기서 데이터 행렬은 n 개의 로우를 가지며 입력 RDD는 A의 로우 셋을 보유하고 각각은 해당 오른쪽 사이드 레이블 y를 가진다. 



데이터셋을 로드하고 RDD를 생성한다.


LIBSVM 포맷으로 MNIST 데이터셋을 로딩하기 위해 여기에 스파크 MLlib의 MLUtils라는 내장 API를 사용했다. 예에서 사용되는 mnist.bz2의 주소는 https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/mnist.bz2이다. 




import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(spark.sparkContext, "data/mnist.bz2") 




차원 축소를 쉽게 진행하기 위해 피쳐 개수를 계산한다. 결과는 780이 나온다.


val featureSize = data.first().features.size

println("Feature Size: " + featureSize)






이제 데이터셋에는 780개의 컬럼이 있다. 피쳐는 고차원 피쳐로 간주될 수 있다. 


이제 다음처럼 트레이닝 셋과 테스트 셋을 준비한다.


LinearRegressionwithSGD 모델을 트레이닝시킬 것이다. 먼저 피쳐의 원래 차원을 가진 일반 데이터셋을 사용하고 두 번째로 피쳐의 절반을 사용한다. 원래 데이터 셋을 이용해 트레이닝 셋과 테스트 셋을 준비한다.


75%는 트레이닝 셋, 나머지 25%는 테스트 셋이다. 



val splits = data.randomSplit(Array(0.75, 0.25), seed = 12345L)

val (training, test) = (splits(0), splits(1))




이제 PCA를 통해 축소된 피쳐를 트레이닝한다.


import org.apache.spark.mllib.feature.PCA

val pca = new PCA(featureSize/2).fit(data.map(_.features))

val training_pca = training.map(p => p.copy(features = pca.transform(p.features)))

val test_pca = test.map(p => p.copy(features = pca.transform(p.features))) 






선형 회귀 모형을 트레이닝한다.

다음처럼 LinearRegressionWithSGD를 20번 반복하고 일반 피쳐와 축소된 피쳐를 각각 트레이닝한다.


import org.apache.spark.mllib.regression.LinearRegressionWithSGD

val numIterations = 20

val stepSize = 0.0001

val model = LinearRegressionWithSGD.train(training, numIterations)

val model_pca = LinearRegressionWithSGD.train(training_pca, numIterations)




* LinearRegressionWithSGD는 NaN을 리턴할 수 있다. 이유는 다음과 같다.


1. stepSize가 큰 경우이다. 이 경우 0.0001, 0.001, 0.01, 0.03, 0.1, 0.3, 1.0 등과 같이 작은 값을 사용해야 한다.

2. 트레이닝 데이터에 NaN이 있어서 모델을 트레이닝하기 전에 null 값을 제거해야 한다.




두 모델을 평가한다.



분류 모델을 평가하기 전에, 먼저 원래 예측에 대한 차원 축소에 미치는 영향을 살펴보기 위해 일반적인 MSE를 계산한다.  모델 정확도를 정량화하고 잠재적으로 정밀도를 높여 오버 피팅을 피하는 공식적인 방법을 원한다면 분명하다. 



그럼에도 불구하고 잔차 분석(residual analysis)을 통해 수행할 수 있다. 또한 모델 구축과 평가에 사용될 트레이닝 셋과 테스트 셋의 선택을 분석하는 것은 가치가 있다. 모델 예측과 PCA 예측 코드를 작성한다.



val valuesAndPreds = test.map { point =>

                     val score = model.predict(point.features)

                     (score, point.label)

                    }


val valuesAndPreds_pca = test_pca.map { point =>

                        val score = model_pca.predict(point.features)

                        (score, point.label)

                      }




이제 MSE를 계산하고 다음처럼 각각의 경우를 출력한다.


val MSE = valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.mean()

val MSE_pca = valuesAndPreds_pca.map { case (v, p) => math.pow(v - p, 2) }.mean()


println("Mean Squared Error = " + MSE)

println("PCA Mean Squared Error = " + MSE_pca)




다음과 같은 결과가 나온다. 거의 동일하다.


Mean Squared Error = 4.809249884658854E238


PCA Mean Squared Error = 4.807999600599884E238



MSE는 실제로 다음 공식을 사용해 계산되었다.





다음처럼 모델 계수를 계산한다.



println("Model coefficients:"+ model.toString())

println("Model with PCA coefficients:"+ model_pca.toString())



Model coefficients: intercept = 0.0, numFeatures = 780

Model with PCA coefficients: intercept = 0.0, numFeatures = 390



Posted by '김용환'
,


spark-shell에서 PCA를 공부한다. (차원 축소)


차원 축소는 고려하는 변수의 수를 줄이는 과정이다. 원본과 잡음이 많은 피쳐에서 잠재 피쳐를 추출하거나 구조를 유지하면서 데이터를 압축하는 데 사용할 수 있다. 스파크 MLlib는 RowMatrix 클래스의 차원 축소를 지원한다. 데이터의 차원을 줄이기 위해 가장 일반적으로 사용되는 알고리즘은 PCA와 SVD이다. 


PCA를 살펴본다.



PCA는 가능한 상관 변수의 관찰 셋을 주성분(Principal component)라고 부르는 선형 무상관 변수 집합으로 변환하기 위해 직교(orthogonal) 트랜스포메이션을 사용하는 통계적 과정이다.



PCA 알고리즘은 PCA를 사용해 저차원 공간에 벡터를 투영하는 데 사용할 수 있다. 



그런 다음, 감소 된 피쳐 벡터에 기초해 ML 모델을 트레이닝할 수 있다. 


다음 예는 6차원 피쳐 벡터를 4 차원 주성분에 투영하는 방법을 보여준다. 다음과 같은 피쳐 벡터가 있다고 가정한다.


scala>
import org.apache.spark.ml.linalg.Vectors


val data = Array(

Vectors.dense(3.5, 2.0, 5.0, 6.3, 5.60, 2.4),

Vectors.dense(4.40, 0.10, 3.0, 9.0, 7.0, 8.75),

Vectors.dense(3.20, 2.40, 0.0, 6.0, 7.4, 3.34))





* 여기서 주의할 점은 Vectors의 패키지를 잘 확인해야 한다.


import org.apache.spark.mllib.linalg.Vectors이 아니라 import org.apache.spark.ml.linalg.Vectors을 사용해야 한다.




이제 데이터 프레임을 생성하자.


scala>


val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

df.show(false)


+--------------------------+

|features                  |

+--------------------------+

|[3.5,2.0,5.0,6.3,5.6,2.4] |

|[4.4,0.1,3.0,9.0,7.0,8.75]|

|[3.2,2.4,0.0,6.0,7.4,3.34]|

+--------------------------+




이제 PCA에 대한 6차원 피쳐 벡터를 갖는 피쳐 데이터 프레임을 생성한다.


이제 다음과 같은 매개 변수를 설정하여 PCA 모델을 초기화한다.



import org.apache.spark.ml.feature.PCA


val pca = new PCA()

.setInputCol("features")

.setOutputCol("pcaFeatures")

.setK(4)

.fit(df)






차이를 주기 위해 setOutputCol 메소드를 사용하여 출력 컬럼을 pcaFeatures로 설정했다. 

그리고 PCA의 차원을 설정했다. 

마지막으로 트랜스포메이션을 생성하기 위해 데이터 프레임을 피팅했다. 



PCA 모델에는 explain variance(분산도)이 포함된다. 



val result = pca.transform(df).select("pcaFeatures")

result.show(false)


+-------------------------------------------------------------------------------+

|pcaFeatures                                                                    |

+-------------------------------------------------------------------------------+

|[-5.149253129088708,3.2157431427730376,-5.390271710168745,-1.0528214606325355] |

|[-12.372614091904456,0.8041966678176822,-5.390271710168748,-1.0528214606325337]|

|[-5.649682494292663,-2.1891778048858255,-5.390271710168744,-1.0528214606325337]|

+-------------------------------------------------------------------------------+



이전 코드는 PCA를 사용해 주성분으로서 4차원의 피쳐 벡터를 갖는 피쳐 데이터 프레임을 생성한다. 



Posted by '김용환'
,

spark-shell을 이용해 Spark의 StringIndexer 예이다.




StringIndexer는 레이블의 문자열 컬럼을 레이블 인덱스의 컬럼으로 인코딩한다. 


인덱스는 [0, numLabels]안에 있고 레이블 빈도에 따라 정렬되므로 


가장 빈번한 레이블은 0번째 인덱스를 갖는다. 



입력 컬럼이 숫자라면 숫자를 문자열로 변경하고 문자열 값을 인덱싱한다. 


에스티메이터( 또는 트랜스포머와 같은 다운 스트림 파이프 라인 컴포넌트가 문자열로 구성된 인덱스 레이블을 사용하는 경우 컴포넌트의 입력 컬럼을 문자열로 구성된 인덱스 컬럼 이름으로 설정해야 한다. 



대부분의 경우 setInputCol을 사용해 입력 컬럼을 설정할 수 있다. 다음처럼 포맷의 여러 범주 데이터가 있다고 가정하자.



이제 가장 빈번한 이름(이 경우 Jason)이 0번째 인덱스을 갖도록 name 컬럼을 인덱싱하고 싶다고 가정하자. 간단한 데이터 프레임을 생성한다.


scala>


val df = spark.createDataFrame(

Seq((0, "Jason", "Germany"),

(1, "David", "France"),

(2, "Martin", "Spain"),

(3, "Jason", "USA"),

(4, "Daiel", "UK"),

(5, "Moahmed", "Bangladesh"),

(6, "David", "Ireland"),

(7, "Jason", "Netherlands"))).toDF("id", "name", "address")



이제 다음과 같이 name 컬럼을 인덱싱하자.


import org.apache.spark.ml.feature.StringIndexer


val indexer = new StringIndexer()

.setInputCol("name")

.setOutputCol("label")

.fit(df)





이제 다음과 같이 StringIndexer 인스턴스에 트랜스포머를 사용한다.


val indexed = indexer.transform(df)




이제 올바로 동작하는지 살펴보자.


scala>

indexed.show(false)


+---+-------+-----------+-----+

|id |name   |address    |label|

+---+-------+-----------+-----+

|0  |Jason  |Germany    |0.0  |

|1  |David  |France     |1.0  |

|2  |Martin |Spain      |3.0  |

|3  |Jason  |USA        |0.0  |

|4  |Daiel  |UK         |4.0  |

|5  |Moahmed|Bangladesh |2.0  |

|6  |David  |Ireland    |1.0  |

|7  |Jason  |Netherlands|0.0  |

+---+-------+-----------+-----+


그림 13 : StringIndexer를 사용한 레이블 생성



결과를 보면 name을 기반으로 label의 값이 같다.




원-핫(one-hot) 인코딩은 레이블 인덱스 컬럼을 이진 벡터 컬럼으로 매핑하며 최대 값은 단일 값이다. 원-핫 인코딩을 사용하면 분류 피쳐를 사용하기 위해 로지스틱 회귀(Logistic Regression)와 같은 연속적인 피쳐를 기대하는 알고리즘을 사용할 수 있다



val df = spark.createDataFrame(

Seq((0, Array("Jason", "David")),

(1, Array("David", "Martin")),

(2, Array("Martin", "Jason")),

(3, Array("Jason", "Daiel")),

(4, Array("Daiel", "Martin")),

(5, Array("Moahmed", "Jason")),

(6, Array("David", "David")),

(7, Array("Jason", "Martin")))).toDF("id", "name")

df.show(false)


이제 데이터셋에서 가장 빈번한 이름(이 경우는 Jason이다)이 0번째 인덱스를 갖도록 name 컬럼을 인덱싱할 것이다. 


그러나 name 컬럼을 인덱스를 사용하는 것은 무엇일까? 



즉 name 컬럼을 추가로 벡터화할 수 있다면 데이터 프레임을 모든 머신 러닝 모델로 쉽게 제공할 수 있다.



StringIndexer로 피팅한 것을 기반으로 데이터 프레임 예를 변환한다. 


import org.apache.spark.ml.feature.StringIndexer

import org.apache.spark.ml.feature.OneHotEncoder


val indexer = new StringIndexer()

                 .setInputCol("name")

                 .setOutputCol("categoryIndex")

                 .fit(df)


val indexed = indexer.transform(df)




결과는 다음과 같다. 


scala> 


indexed.show(false)

+---+-------+-----------+-------------+

|id |name   |address    |categoryIndex|

+---+-------+-----------+-------------+

|0  |Jason  |Germany    |0.0          |

|1  |David  |France         |1.0          |

|2  |Martin |Spain          |3.0          |

|3  |Jason  |USA            |0.0          |

|4  |Daiel  |UK                |4.0          |

|5  |Moahmed|Bangladesh |2.0          |

|6  |David  |Ireland          |1.0          |

|7  |Jason  |Netherlands |0.0          |

+---+-------+-----------+-------------+





OneHotEncoder 인스턴스를 생성한다.


import org.apache.spark.ml.feature.OneHotEncoder

val encoder = new OneHotEncoder()

                 .setInputCol("categoryIndex")

                 .setOutputCol("categoryVec")





이제 트랜스포머를 사용해 벡터로 변환한다.


scala>

val encoded = encoder.transform(indexed)

encoded.show()


+---+-------+-----------+-------------+-------------+

| id|   name|    address|categoryIndex|  categoryVec|

+---+-------+-----------+-------------+-------------+

|  0|  Jason|    Germany|          0.0|(4,[0],[1.0])|

|  1|  David|     France|              1.0|(4,[1],[1.0])|

|  2| Martin|      Spain|              3.0|(4,[3],[1.0])|

|  3|  Jason|        USA|              0.0|(4,[0],[1.0])|

|  4|  Daiel|         UK|                4.0|(4,[],[])|

|  5|Moahmed| Bangladesh|     2.0|(4,[2],[1.0])|

|  6|  David|    Ireland|              1.0|(4,[1],[1.0])|

|  7|  Jason|Netherlands|          0.0|(4,[0],[1.0])|

+---+-------+-----------+-------------+-------------+




이제 결과 데이터 프레임에 피쳐 벡터가 포함된 새로운 컬럼이 추가된 것을 볼 수 있다.

Posted by '김용환'
,


스파크에서 제공하는 spark-shell을 이용한  Tokenizer 예이다.



 데이터 프레임을 생성한다.



scala>

val sentence = spark.createDataFrame(Seq(

(0, "Tokenization,is the process of enchanting words,from the raw text"),

(1, " If you want,to have more advance tokenization,RegexTokenizer,is a good option"),

(2, " Here,will provide a sample example on how to tockenize sentences"),

(3, "This way,you can find all matching occurrences"))).toDF("id","sentence")



이제 다음과 같이 Tokenizer API를 인스턴스화해서 토큰나이저를 생성한다.


import org.apache.spark.ml.feature.Tokenizer


val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words") 





이제 다음과 같이 UDF를 사용해 각 문장의 토큰 개수를 계산한다.




import org.apache.spark.sql.functions._


val countTokens = udf { (words: Seq[String]) => words.length } 




이제 각 문장에서 토큰을 얻는다.



val tokenized = tokenizer.transform(sentence) 







tokenized.show(false) 



+---+------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+

|id |sentence                                                                      |words                                                                                     |

+---+------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+

|0  |Tokenization,is the process of enchanting words,from the raw text             |[tokenization,is, the, process, of, enchanting, words,from, the, raw, text]               |

|1  | If you want,to have more advance tokenization,RegexTokenizer,is a good option|[, if, you, want,to, have, more, advance, tokenization,regextokenizer,is, a, good, option]|

|2  | Here,will provide a sample example on how to tockenize sentences             |[, here,will, provide, a, sample, example, on, how, to, tockenize, sentences]             |

|3  |This way,you can find all matching occurrences                                |[this, way,you, can, find, all, matching, occurrences]                                    |

+---+------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+




마지막으로 각 토큰을 다음과 같이 각 원본 문장에 대해 표시한다. tokens 개수가 있다. 





scala> 

tokenized.select("sentence", "words").withColumn("tokens", countTokens(col("words"))).show(false)
+------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+------+
|sentence                                                                      |words                                                                                     |tokens|
+------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+------+
|Tokenization,is the process of enchanting words,from the raw text             |[tokenization,is, the, process, of, enchanting, words,from, the, raw, text]               |9     |
| If you want,to have more advance tokenization,RegexTokenizer,is a good option|[, if, you, want,to, have, more, advance, tokenization,regextokenizer,is, a, good, option]|11    |
| Here,will provide a sample example on how to tockenize sentences             |[, here,will, provide, a, sample, example, on, how, to, tockenize, sentences]             |11    |
|This way,you can find all matching occurrences                                |[this, way,you, can, find, all, matching, occurrences]                                    |7     |
+------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+------+



원본 문장, 단어 모음, 토큰 수를 포함하는 토큰 데이터 프레임 정보를 출력한다.




그러나 RegexTokenizer API를 사용하면 더 좋은 결과를 얻을 수 있다. 다음과 같이 RegexTokenizer API를 인스턴스화하여 정규식 토큰나이저를 생성한다.




import org.apache.spark.ml.feature.RegexTokenizer



val regexTokenizer = new RegexTokenizer()

                    .setInputCol("sentence")

                    .setOutputCol("words")

                    .setPattern("\\W+")

                    .setGaps(true)




이제 다음처럼 각 문장에서 토큰을 얻는다.



val regexTokenized = regexTokenizer.transform(sentence)


regexTokenized.select("sentence", "words")

             .withColumn("tokens", countTokens(col("words")))

             .show(false)





+------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------+------+

|sentence                                                                      |words                                                                                      |tokens|

+------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------+------+

|Tokenization,is the process of enchanting words,from the raw text             |[tokenization, is, the, process, of, enchanting, words, from, the, raw, text]              |11    |

| If you want,to have more advance tokenization,RegexTokenizer,is a good option|[if, you, want, to, have, more, advance, tokenization, regextokenizer, is, a, good, option]|13    |

| Here,will provide a sample example on how to tockenize sentences             |[here, will, provide, a, sample, example, on, how, to, tockenize, sentences]               |11    |

|This way,you can find all matching occurrences                                |[this, way, you, can, find, all, matching, occurrences]                                    |8     |

+------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------+------+






이전 코드 라인은 원본 문장, 단어 모음, 토큰 수를 포함하는 RegexTokenizer를 사용해 토큰화된 데이터 프레임 정보를 출력한다.  RegexTokenizer를 사용해 토큰화가 훨씬 좋아졌다.





이제 여기서 정지 (stop word)를 삭제해 보자.



스톱 워드(stop word)는 일반적으로 단어가 자주 나타나지만 많은 의미를 지니지 않아서 일반적으로 입력에서 제외되야 하는 단어이다. 


스파크의 StopWordsRemover는 입력으로 일련의 문자열을 받는다. 


일련의 문자열은 Tokenizer 또는 RegexTokenizer에 의해 토큰화된다. 


그리고 입력 문자열에서 모든 중지 단어를 제거한다. 중지 단어 목록은 stopWords 매개 변수에 의해 지정된다. 




StopWordsRemover API의 현재 구현은 덴마크어, 네덜란드어, 핀란드어, 프랑스어, 독일어, 헝가리어, 이탈리아어, 노르웨이어, 포르투갈어, 러시아어, 스페인어, 스웨덴어, 터키어, 영어에 대한 옵션을 제공한다. (한국어는 없다.)



예를 제공하기 위해 이전 Tokenizer 예를 이미 토큰화되어 있기 때문에 간단하게 확장할 수 있다. 그러나 이 예에서는 RegexTokenizer API를 사용한다.



먼저 StopWordsRemover API에서 다음과 같이 StopWordsRemover 인스턴스를 생성한다.


import org.apache.spark.ml.feature.StopWordsRemover


val remover = new StopWordsRemover()

            .setInputCol("words")

            .setOutputCol("filtered")




이제 다음과 같이 모든 중지 단어를 제거하고 결과를 출력하자.


scala>

val newDF = remover.transform(regexTokenized)


newDF.select("id", "filtered").show(false)


+---+-----------------------------------------------------------+

|id |filtered                                                   |

+---+-----------------------------------------------------------+

|0  |[tokenization, process, enchanting, words, raw, text]      |

|1  |[want, advance, tokenization, regextokenizer, good, option]|

|2  |[provide, sample, example, tockenize, sentences]           |

|3  |[way, find, matching, occurrences]                         |

+---+-----------------------------------------------------------+




이전 코드 라인은 필터링된 데이터 프레임에서 중지 단어를 제외하고 출력한다.








Posted by '김용환'
,



CountVectorizer와 CountVectorizerModel은 텍스트 문서를 토큰 개수의 벡터로 변환할 수 있다. 


CountVectorizer는 어휘 추출을 위한 에스터메이터(estimator)로 사용되어 CountVectorizerModel을 생성한다. 


해당 모델은 어휘를 통해 문서에 대한 희소한 표현을 생성한 다음 LDA와 같은 알고리즘으로 전달할 수 있다.


다음은 피쳐를 만드는 간단한 데이터 프레임을 생성하는 예이다.



scala>

val df = spark.createDataFrame(

Seq((0, Array("Jason", "David")),

(1, Array("David", "Martin")),

(2, Array("Martin", "Jason")),

(3, Array("Jason", "Daiel")),

(4, Array("Daiel", "Martin")),

(5, Array("Moahmed", "Jason")),

(6, Array("David", "David")),

(7, Array("Jason", "Martin")))).toDF("id", "name")



scala>

df.show(false)


// 결과


+---+----------------+

|id |name            |

+---+----------------+

|0  |[Jason, David]  |

|1  |[David, Martin] |

|2  |[Martin, Jason] |

|3  |[Jason, Daiel]  |

|4  |[Daiel, Martin] |

|5  |[Moahmed, Jason]|

|6  |[David, David]  |

|7  |[Jason, Martin] |

+---+----------------+





대부분의 경우 setInputCol을 사용해 입력 컬럼을 설정할 수 있다. 다음과 같이 코퍼스의 CountVectorizerModel 오브젝트를 피팅한다.



scala>


import org.apache.spark.ml.feature.CountVectorizer

import org.apache.spark.ml.feature.CountVectorizerModel


val cvModel: CountVectorizerModel = new CountVectorizer()

                          .setInputCol("name")

                          .setOutputCol("features")

                          .setVocabSize(3)

                          .setMinDF(2)

                          .fit(df)



이제 추출기를 사용해 다음과 같이 토큰 개수에 대한 벡터를 얻는다.



scala>

val feature = cvModel.transform(df)


scala> 


feature.show(false)

+---+----------------+-------------------+

|id |name            |features           |

+---+----------------+-------------------+

|0  |[Jason, David]  |(3,[0,1],[1.0,1.0])|

|1  |[David, Martin] |(3,[1,2],[1.0,1.0])|

|2  |[Martin, Jason] |(3,[0,2],[1.0,1.0])|

|3  |[Jason, Daiel]  |(3,[0],[1.0])      |

|4  |[Daiel, Martin] |(3,[2],[1.0])      |

|5  |[Moahmed, Jason]|(3,[0],[1.0])      |

|6  |[David, David]  |(3,[1],[2.0])      |

|7  |[Jason, Martin] |(3,[0,2],[1.0,1.0])|

+---+----------------+-------------------+





이름에 대한 피쳐가 생성되었다.

Posted by '김용환'
,



spark-shell에서 Failed to start database 'metastore_db' with class loader 에러가 발생하면..


Caused by: org.apache.derby.iapi.error.StandardException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@6d4a82, see the next exception for details.

  at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)

  at org.apache.derby.impl.jdbc.SQLExceptionFactory.wrapArgsForTransportAcrossDRDA(Unknown Source)

  ... 153 more

Caused by: org.apache.derby.iapi.error.StandardException: Another instance of Derby may have already booted the database /usr/local/spark-2.1.0-bin-hadoop2.7/metastore_db.




다음 metastore lock 파일을 삭제하고 다시 spark-shell을 재기동한다.

$ rm metastore_db/dbex.lck




Posted by '김용환'
,