스파크에서 파일을 읽어 첫 번째 field의 값을 long으로 변환하는 코드가 있다고 하자.


val users = sc.textFile("users.txt").map{ line =>

 val fields = line.split(",")

 (fields(0).toLong, User(fields(1), fields(2)))

}


이전 예를 실행하고 난 후 


users.collect 호출 후 EOFException:Cannot seek after EOF 또는 java.lang.NumberFormatException: For input string: "" 에러가 발생하면 fields(0).toLong 대신 fields(0).toInt로 변경하면 에러가 발생하지 않을 수 있다. 


잘 보면. 

users.take(10) 할 때는 에러가 발생하지 않는데..

users.collect를 호출할 때 에러가 발생할 수 있다.




변환 문제 또는 원시 소스 이슈이다. 



이를 해결하기 위해서는 2가지 방법이 있다.


첫번째 원시 소스 이슈를 해결한다.  


두번째 첫 번째 방법은 사실, 대용량에서 자주 발생할 수 있어서 Try를 최대한 사용해 Option을 사용하는 것이 좋다. 


scala> def parseLong(s: String): Option[Int] = Try(s.toInt).toOption

parseLong: (s: String)Option[Int]


scala> parseLong("")

res24: Option[Int] = None





원시 파일이 문제가 없는지 살펴본다. 공백 라인이 있어도 안된다.






Posted by '김용환'
,


가상 클러스터 모드 (로컬 모드)의 아키텍처



https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html




로컬 모드를 때때로 가상 클러스터(pseudocluster) 실행 모드라고 한다. 또한 해당 모드는 분산되지 않고 단일 JVM 기반 배포 모드로서 스파크는 드라이버 프로그램, 익스큐터, LocalSchedulerBackend, 마스터와 같은 모든 실행 컴포넌트를 단일 JVM에 배포한다. 로컬 모드는 드라이버 자체가 익스큐터로 사용되는 유일한 모드이다. 다음 그림은 스파크 잡 제출과 관련된 로컬 모드의 하이 레벨 아키텍처를 보여준다.








Posted by '김용환'
,


spark job을 제출할 때 가장 중요한 요소는 메모리와 core 수이다.




관련 설정은 다음과 같은데. 아래 블로그에 잘 설명되어 있다.





http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/



이를 잘 이해하고 설명한 한글 내용은 다음 블로그에 있다.  



http://kysepark.blogspot.kr/2016/04/how-to-tune-your-apache-spark-jobs-part.html





그리고 스파크 내부 아키텍처에 관련 설명은 다음 url을 살펴본다.


https://0x0fff.com/spark-architecture/

https://0x0fff.com/spark-architecture-shuffle/






Posted by '김용환'
,


스파크 잡 관련 메모리 튜닝 정보이다.



https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html




gc 옵션과 RDD 관련 내용과 spark.storage.memoryFraction을 설명한다.



String 대신 숫자 또는 enum을 사용하는 것이 좋으며,


32GB 미만인 경우 JVM 플래그 -XX:+UseCompressedOops를 설정하여 포인터가 8바이트 대신 4바이트로 생성된다는 내용이 있다. 

Posted by '김용환'
,



spark 코드에 Log4j의 logger를 직렬화를 진행할 수 없다..



import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.LogManager
import org.apache.log4j.Level
import org.apache.log4j.Logger

object MyLog {
def main(args: Array[String]):Unit= {
// 로그 레벨을 WARN으로 설정한다
val log = LogManager.getRootLogger
log.setLevel(Level.WARN)
// SparkContext를 생성한다
val conf = new SparkConf().setAppName("My App").setMaster("local[*]")
val sc = new SparkContext(conf)
//계산을 시작하고 로깅 정보를 출력한다
log.warn("Started")
val i = 0
val data = sc.parallelize(i to 100000)
data.foreach(i => log.info("My number"+ i))
log.warn("Finished")
}
}



아래와 같은 에러가 발생한다.


Exception in thread "main" org.apache.spark.SparkException: Task not serializable

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)

at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)

at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911)

at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)

at MyLog$.main(MyLog.scala:19)

at MyLog.main(MyLog.scala)

Caused by: java.io.NotSerializable






직렬화할 클래스를 만들고 extends Serializable을 추가한다. 즉, 직렬화 클래스를 하나 만들어서 내부어세 RDD를 사용하는 함수를 하나 만든다. 



import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.LogManager
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark._
import org.apache.spark.rdd.RDD


class MyMapper(n: Int) extends Serializable{
@transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")
def dosomething(rdd: RDD[Int]): RDD[String] =
rdd.map{ i =>
log.warn("Serialization of: " + i)
(i + n).toString
}
}

object MyMapper{
def apply(n: Int): MyMapper = new MyMapper(n)
}

object MyLog {
def main(args: Array[String]):Unit= {
// 로그 레벨을 WARN으로 설정한다
val log = LogManager.getRootLogger
log.setLevel(Level.WARN)
// SparkContext를 생성한다
val conf = new SparkConf().setAppName("My App").setMaster("local[*]")
val sc = new SparkContext(conf)
//계산을 시작하고 로깅 정보를 출력한다
log.warn("Started")
val data = sc.parallelize(1 to 100000)
val mapper = MyMapper(1)
val other = mapper.dosomething(data)
other.collect()
log.warn("Finished")
}
}

에러 없이 잘 동작한다. 

Posted by '김용환'
,


스파크 MLlib은 K-평균, 이분법 K-평균, 가우스 혼합 외에 PIC, LDA, 스트리밍 K-평균과 같은 세 개의 클러스터링 알고리즘의 구현을 제공한다. 


한 가지 분명한 것은 클러스터링 분석을 미세하게 튜닝하려면 종종 비정상 데이터(outlier 또는  anomaly)이라고 불리는 원치 않는 데이터 오브젝트를 제거해야 한다.


스파크 MLlib으로 비정상 데이터를 찾는데 공부하기 위한 좋은 자료


https://github.com/keiraqz/anomaly-detection


https://mapr.com/ebooks/spark/08-unsupervised-anomaly-detection-apache-spark.html




Posted by '김용환'
,


간단한 스파크 잡 실행하기 예제는 다음과 같다. 



# 8코어에서 독립 실행 형 모드로 애플리케이션을 실행한다

SPARK_HOME/bin/spark-submit \  

--class org.apache.spark.examples.Demo \  

--master local[8] \  

Demo-0.1-SNAPSHOT-jar-with-dependencies.jar


# YARN 클러스터에서 실행한다

export HADOOP_CONF_DIR=XXX

SPARK_HOME/bin/spark-submit \  

--class org.apache.spark.examples.Demo \  

--master yarn \  

--deploy-mode cluster \  # 클러스터 모드로 클라이언트가 될 수 있다

--executor-memory 20G \  

--num-executors 50 \  

Demo-0.1-SNAPSHOT-jar-with-dependencies.jar


# supervise 플래그를 포함해 클러스터 배포 모드의 메소스(Mesos) 클러스터에서 실행한다

SPARK_HOME/bin/spark-submit \

--class org.apache.spark.examples.Demo \

--master mesos://207.184.161.138:7077 \ # IP 주소를 사용한다

--deploy-mode cluster \  

--supervise \  

--executor-memory 20G \  

--total-executor-cores 100 \  

Demo-0.1-SNAPSHOT-jar-with-dependencies.jar



supervise는 스탠드 얼론 모드에서 0이외의 값을 리턴, 비정상적인 종료일 때는 다시 실행하라는 의미를 가진다.






예제

https://spark.apache.org/docs/2.1.1/submitting-applications.html


# Run application locally on 8 cores
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[8] \
  /path/to/examples.jar \
  100

# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \  # can be client for client mode
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar \
  1000

# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
  --master spark://207.184.161.138:7077 \
  examples/src/main/python/pi.py \
  1000

# Run on a Mesos cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master mesos://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  http://path/to/examples.jar \
  1000


Posted by '김용환'
,

Play2 Frame에는 괜찮은 인증이 있지만.. 간단하게 구현한 인증 방식(basic authentication)이다.



application.conf


play.http.filters = filters.Filters



Filters.java

package filters

import javax.inject.Inject

import play.api.http.{DefaultHttpFilters, EnabledFilters}

class Filters @Inject() (
defaultFilters: EnabledFilters,
logging: LoggingFilter,
authFilter : AuthFilter,
) extends DefaultHttpFilters (defaultFilters.filters :+ logging :+ authFilter : _*)



AuthFilter.java

package filters

import javax.inject.Inject

import akka.stream.Materializer
import play.api.mvc._
import scala.concurrent.{ExecutionContext, Future}

class AuthFilter @Inject() (implicit val mat: Materializer, ec: ExecutionContext) extends Filter {

def apply(nextFilter: RequestHeader => Future[Result])
(requestHeader: RequestHeader): Future[Result] = {
if (!requestHeader.path.startsWith("/login") && requestHeader.headers.get("X-Auth-User").isEmpty) {
Future(Results.Forbidden)
} else { // AuthService.authenticate(requestHeader.headers.get("X-Auth-User"))
nextFilter(requestHeader).map { result =>
result
}
}
}
}


여기에 X-Auth-User 정보를 읽고 임의의 AuthService라는 클래스를 사용해 매번 호출마다 인증을 해볼 수도 있다. 









Posted by '김용환'
,




스칼라 Play 프레임워크의 인증/권한 문서 참조자료이다. 



https://www.playframework.com/documentation/2.6.x/ScalaActionsComposition#Authentication


https://www.playframework.com/documentation/2.6.x/api/scala/index.html#play.api.mvc.Security$$AuthenticatedBuilder$


https://github.com/playframework/playframework/blob/2.6.x/framework/src/play/src/main/scala/play/api/mvc/Security.scala


http://fizzylogic.nl/2016/11/27/authorize-access-to-your-play-application-using-action-builders-and-action-functions/



Posted by '김용환'
,

[play] JWT token 예제

scala 2018. 2. 9. 12:09




https://www.playframework.com/documentation/2.6.x/SettingsSession



application.conf 설정


# jwt

play.http.session.maxAge = 60 minutes




소스


Ok("Welcome!").withSession(

  "connected" -> "user@gmail.com")





실제 jwt 토큰으로 오는 값(curl 명령어로 확인할 수 있음)


Set-Cookie: PLAY_SESSION=eyJhbGciOiJIUzI1NiJ9.eyJkYXRhIjp7ImNvbm5lY3RlZCI6InVzZXJAZ21haWwuY29tIn0sImV4cCI6MTUxODA4Mzk0OSwibmJmIjoxNTE4MDgwMzQ5LCJpYXQiOjE1MTgwODAzNDl9.ZqgXH3x6gEDwXZdFcZJ3i9lDSuO2JO8q3kZdzOUsKBU; Max-Age=3600; Expires=Thu, 08 Feb 2018 20:59:09 GMT; SameSite=Lax; Path=/; HTTPOnly






Posted by '김용환'
,