spark-shell을 이용한 PCA 예이다. 



스파크에서는 선형 회귀 알고리즘의 RDD 기반 구현을 제공한다.


https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala




SGD(stochastic gradient descent)를 사용해 정규화가 필요없는 선형 회귀 모델을 학습시킬 수 있다. 이는 다음과 같은 최소 제곱 회귀(least squares regression) 공식을 풀 수 있다.



f(가중치) = 1/n ||A 가중치-y||^2  

(평균 제곱 오차(MSE, mean squared error)이다) 





여기서 데이터 행렬은 n 개의 로우를 가지며 입력 RDD는 A의 로우 셋을 보유하고 각각은 해당 오른쪽 사이드 레이블 y를 가진다. 



데이터셋을 로드하고 RDD를 생성한다.


LIBSVM 포맷으로 MNIST 데이터셋을 로딩하기 위해 여기에 스파크 MLlib의 MLUtils라는 내장 API를 사용했다. 예에서 사용되는 mnist.bz2의 주소는 https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/mnist.bz2이다. 




import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(spark.sparkContext, "data/mnist.bz2") 




차원 축소를 쉽게 진행하기 위해 피쳐 개수를 계산한다. 결과는 780이 나온다.


val featureSize = data.first().features.size

println("Feature Size: " + featureSize)






이제 데이터셋에는 780개의 컬럼이 있다. 피쳐는 고차원 피쳐로 간주될 수 있다. 


이제 다음처럼 트레이닝 셋과 테스트 셋을 준비한다.


LinearRegressionwithSGD 모델을 트레이닝시킬 것이다. 먼저 피쳐의 원래 차원을 가진 일반 데이터셋을 사용하고 두 번째로 피쳐의 절반을 사용한다. 원래 데이터 셋을 이용해 트레이닝 셋과 테스트 셋을 준비한다.


75%는 트레이닝 셋, 나머지 25%는 테스트 셋이다. 



val splits = data.randomSplit(Array(0.75, 0.25), seed = 12345L)

val (training, test) = (splits(0), splits(1))




이제 PCA를 통해 축소된 피쳐를 트레이닝한다.


import org.apache.spark.mllib.feature.PCA

val pca = new PCA(featureSize/2).fit(data.map(_.features))

val training_pca = training.map(p => p.copy(features = pca.transform(p.features)))

val test_pca = test.map(p => p.copy(features = pca.transform(p.features))) 






선형 회귀 모형을 트레이닝한다.

다음처럼 LinearRegressionWithSGD를 20번 반복하고 일반 피쳐와 축소된 피쳐를 각각 트레이닝한다.


import org.apache.spark.mllib.regression.LinearRegressionWithSGD

val numIterations = 20

val stepSize = 0.0001

val model = LinearRegressionWithSGD.train(training, numIterations)

val model_pca = LinearRegressionWithSGD.train(training_pca, numIterations)




* LinearRegressionWithSGD는 NaN을 리턴할 수 있다. 이유는 다음과 같다.


1. stepSize가 큰 경우이다. 이 경우 0.0001, 0.001, 0.01, 0.03, 0.1, 0.3, 1.0 등과 같이 작은 값을 사용해야 한다.

2. 트레이닝 데이터에 NaN이 있어서 모델을 트레이닝하기 전에 null 값을 제거해야 한다.




두 모델을 평가한다.



분류 모델을 평가하기 전에, 먼저 원래 예측에 대한 차원 축소에 미치는 영향을 살펴보기 위해 일반적인 MSE를 계산한다.  모델 정확도를 정량화하고 잠재적으로 정밀도를 높여 오버 피팅을 피하는 공식적인 방법을 원한다면 분명하다. 



그럼에도 불구하고 잔차 분석(residual analysis)을 통해 수행할 수 있다. 또한 모델 구축과 평가에 사용될 트레이닝 셋과 테스트 셋의 선택을 분석하는 것은 가치가 있다. 모델 예측과 PCA 예측 코드를 작성한다.



val valuesAndPreds = test.map { point =>

                     val score = model.predict(point.features)

                     (score, point.label)

                    }


val valuesAndPreds_pca = test_pca.map { point =>

                        val score = model_pca.predict(point.features)

                        (score, point.label)

                      }




이제 MSE를 계산하고 다음처럼 각각의 경우를 출력한다.


val MSE = valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.mean()

val MSE_pca = valuesAndPreds_pca.map { case (v, p) => math.pow(v - p, 2) }.mean()


println("Mean Squared Error = " + MSE)

println("PCA Mean Squared Error = " + MSE_pca)




다음과 같은 결과가 나온다. 거의 동일하다.


Mean Squared Error = 4.809249884658854E238


PCA Mean Squared Error = 4.807999600599884E238



MSE는 실제로 다음 공식을 사용해 계산되었다.





다음처럼 모델 계수를 계산한다.



println("Model coefficients:"+ model.toString())

println("Model with PCA coefficients:"+ model_pca.toString())



Model coefficients: intercept = 0.0, numFeatures = 780

Model with PCA coefficients: intercept = 0.0, numFeatures = 390



Posted by '김용환'
,