play 2.6 새로운 기능

scala 2018. 2. 8. 10:21



play 2.6에 엄청 많은 기능이 추가/변경되었다. playframwork는 조금 불친절하다.


https://www.playframework.com/documentation/2.6.x/Highlights26





자세한 내용은 아래 블로그에 play 2.6에 대한 설명이 잘 되어 있다. 



https://nvisium.com/resources/blog/2017/10/04/play-2-6-security-analysis.html

Posted by '김용환'
,


자바 개발자가 https://www.playframework.com/documentation/2.6.x/ScalaHttpFilters를 보면서 스칼라 Play 애플리케이션을 만들 때 조금 헤맬 수 있다. 


아래와 같은 코드가 있다고 가정하자. 특별히 소스를 분석하지 않아도 적당히 문서를 읽으면서 알수도 있지만,,

head first로 개념을 이해할 수 있다. 





Filters

package filters

import javax.inject.Inject

import play.api.http.{DefaultHttpFilters, EnabledFilters}
import play.filters.gzip.GzipFilter

class Filters @Inject() (
defaultFilters: EnabledFilters,
gzip: GzipFilter,
logging: LoggingFilter
) extends DefaultHttpFilters (defaultFilters.filters :+ gzip :+ logging: _*)



LoggingFilter

package filters

import javax.inject.Inject

import akka.stream.Materializer
import play.api.Logger
import play.api.mvc._

import scala.concurrent.{ExecutionContext, Future}

class LoggingFilter @Inject() (implicit val mat: Materializer, ec: ExecutionContext) extends Filter {

def apply(nextFilter: RequestHeader => Future[Result])
(requestHeader: RequestHeader): Future[Result] = {

val startTime = System.currentTimeMillis

nextFilter(requestHeader).map { result =>

val endTime = System.currentTimeMillis
val requestTime = endTime - startTime

val log = s"${requestHeader.method} ${requestHeader.uri} took ${requestTime}ms and returned ${result.header.status}"
if (requestTime > 3000) {
Logger.warn(log)
} else {
Logger.debug(log)
}

result
}
}
}




또는 아래와 같이 사용한다.


play.filters.enabled += filters.LoggingFilter






만약 play.filters.enabled를 사용하지 않으면.. 아래와 같이 써야 한다. 



play.http.filters = filters.Filters









아래와 같이 사용하면  다음 에러가 발생한다. 


play.http.filters += filters.LoggingFilter


Configuration error: Configuration error[reference.conf @ jar:file:/Users/samuel.kim/.ivy2/cache/com.typesafe.play/play_2.12/jars/play_2.12-2.6.6.jar!/reference.conf: 69: Cannot concatenate object or list with a non-object-or-list, ConfigNull(null) and SimpleConfigList(["filters.Filters"]) are not compatible]







아래와 같이 사용하면 에러가 발생한다.


play.filters.enabled += filters.Filters



play.api.UnexpectedException: Unexpected exception[ProvisionException: Unable to provision, see the following errors:

1) Found a circular dependency involving play.api.http.EnabledFilters, and circular dependencies are disabled.
  at play.utils.Reflect$.bindingsFromConfiguration(Reflect.scala:58):
Binding(class play.api.http.EnabledFilters to self) (via modules: com.google.inject.util.Modules$OverrideModule -> play.api.inject.guice.GuiceableModuleConversions$$anon$1)
  while locating play.api.http.EnabledFilters
    for the 1st parameter of filters.Filters.<init>(Filters.scala:12)
  while locating filters.Filters
  at play.api.http.EnabledFilters.<init>(HttpFilters.scala:68)
  at play.utils.Reflect$.bindingsFromConfiguration(Reflect.scala:58):
Binding(class play.api.http.EnabledFilters to self) (via modules: com.google.inject.util.Modules$OverrideModule -> play.api.inject.guice.GuiceableModuleConversions$$anon$1)
  while locating play.api.http.EnabledFilters
  while locating play.api.http.HttpFilters
    for the 4th parameter of play.api.http.JavaCompatibleHttpRequestHandler.<init>(HttpRequestHandler.scala:222)
  while locating play.api.http.JavaCompatibleHttpRequestHandler
  while locating play.api.http.HttpRequestHandler
    for the 6th parameter of play.api.DefaultApplication.<init>(Application.scala:236)
  at play.api.DefaultApplication.class(Application.scala:235)
  while locating play.api.DefaultApplication
  while locating play.api.Application




클래스를 보면 Filters의 첫 번째 매개변수인 EnabledFilters의 내부를 보면.. 필터를 구성하는 개념이다. 


  private val enabledKey = "play.filters.enabled"


  private val disabledKey = "play.filters.disabled"





즉 Filters는 상위 개념인데, 처음에는 None으로 지정되어 있다.  따러서 ConfigNull이 이미 들어가 있다. 따라서 Null에 List를 추가하면 당연히 에러가 발생할 것이다.  play.http.filters는 애플리케이션에서 지정하는 filter 목록을 정의하는 것이라 할 수 있겠다. 


play.http.filters += filters.Filters
=>  (애플리케이션 정의 Filter)


play
.http.filters = filters.MyFilters



다시 이전 Filters 클래스를 살펴보면, 필터가 3개가 추가된다. EnableFilter(enable/disable 할 수 있는 filter 리스트), Gzip, LoggingFilter 매개 변수로 추가되어 Injection 되었다.

부모 클래스인 DefaultHttpFilters에서 사용하는 형태로 되어 있다. 그래서 play.filters.enable/play.filters.disable 리트스 모두와 gizp, logging을 몽땅 리스트로 묶도록 되어 있다. 이게 사용자 정의 Filter인 셈이다.

extends DefaultHttpFilters(defaultFilters.filters :+ gzip :+ logging: _*)

자바 개발자라면 황당할 수 있을 것 같다.

Inject와 extends를 이용한 간단 코드이지만.. ㄷ ㄷ ㄷ 





DefaultHttpFilters는 여러 개의 EseentailFilter를 받는다. 
class DefaultHttpFilters @Inject() (val filters: EssentialFilter*)


Filter에서는 아래 filters는 그냥 Seq인데.
defaultFilters.filters :+ gzip :+ logging)

 이를 : _*) 를 추가하면  EssentailFilter* 타입이 된다.



자바에서는 varargs인데.


스칼라에서는 : _*으로 사용하면 컴파일러에게 seq/array를 varargs로 변환하라는 신호이다.



scala> def foo(args: Int*) = args.map{_ + 1}

foo: (args: Int*)Seq[Int]


scala> foo(-1, 0, 1)

res0: Seq[Int] = ArrayBuffer(0, 1, 2)



Posted by '김용환'
,


https://www.playframework.com/documentation/2.6.x/ScalaWS를 살펴보면 WSClient를 

Injection해서 사용하고 있다.


WSClient를 Injection없이 바로 써서 테스트하고 싶다면 다음 예제를 활용한다. 



import play.api.libs.ws.ahc.AhcWSClient
import akka.stream.ActorMaterializer
import akka.actor.ActorSystem
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val ws = AhcWSClient()

val result = ws.url("https://www.google.com")
.withRequestTimeout(10000.millis)
.get()
.map{
resp => resp.body
}(system.dispatcher)

val response = Await.result(result, atMost = 5.second)

println(response)


Posted by '김용환'
,



외부 클래스에서 trait의 내부 필드에 접근할 때는 java interface처럼 접근할 수 없다. 

즉 public처럼 사용할 수 없다.


trait SeperatorTrait {

  val SEP: String = "____"

}



따라서 컴패년 오브젝트를 사용하면 외부 클래스에서 사용할 수 있다.


trait SeparatorTrait {

}


object SeparatorTrait {

 val SEP: String = "____"

}



Posted by '김용환'
,


scala 프로젝트에서 사용할만한 scala- elasticsearch 라이브러리는 sksamuel이 가장 좋은 것 같다. (아직 갈길이 멀지만.)



sksamuel의 HTTPClient을 잘 사용하고 있다. 

val result = client.execute {
val req = search(indexName).
query {
bool {
//..
}

queryStatement = client.show(req)
req
}
}


중요한 것은 비동기로 되어 있어서 쭉 지나가 버린다. 


동기(synchronous) 코드로 개발하려면 Await.result 또는 Await.ready를 사용한다. 




Await.result를 사용하는 코드는 간단하다. 내부적으로 Future[Either[]] 형태로 Right/Left를 먼저 써야 한다. 


import scala.concurrent.duration._
val response = Await.result(result, atMost = 5.second)

response match {
case Right(success) => {
// ..

}
case Left(e) => {
//..
}
}



Await.ready와 Await.result는 exception 처리방식이 조금 다르다. 나는 아직 완벽한 프로그램이 아니라서 차라리 crash가 나은 듯해서 Await.result를 사용해봤다.



exception이 발생하면 Await.result는 crash exception이 발생하고, 

Await.ready는 exception을 Failure로 감싼다. 

 


Posted by '김용환'
,

scala retry 참조 코드

scala 2018. 1. 23. 11:33

scala retry 참조 코드 



https://stackoverflow.com/questions/7930814/whats-the-scala-way-to-implement-a-retry-able-call-like-this-one





import util._

object RetryUtils {

@annotation.tailrec
def retry[T](n: Int)(fn: => T): T = {
Try { fn } match {
case Success(x) => x
case _ if n > 1 => retry(n - 1)(fn)
case Failure(e) => throw e
}
}
}



예제 코드


val retryResult = retry(3) {
if (validateObject(esClient)) {
esClient
} else {
null
}
}


Posted by '김용환'
,

Spark와 Kafka 연동

scala 2018. 1. 20. 10:01



Spark와 Kafka 연동하는 방식은 다음과 같다.



- 수신기 기반 접근 방식(Receiver-based approach)

- 다이렉트 스트림 접근 방식(Direct stream approach)

- 구조화된 스트리밍(Structured streaming)



1. 수신기 기반 접근 방식


수신기 기반 방식은 스파크와 카프카와의 첫 번째 통합 방식이었다. 수신기 접근 방식에서 드라이버는 익스큐터에서 카프카 브로커의 고급 API를 사용해 데이터를 가져올 수 있는 수신자를 실행한다. 수신자가 카프카 브로커에서 이벤트를 가져 오고 있기 때문에 수신자는 주키퍼(zookeeper)에 오프셋을 저장한다. 주키퍼는 카프카 클러스터에서도 사용된다. 주요 측면은 WAL(Write Ahead Log)의 사용이다. 수신자는 카프카에서 데이터를 소비하면서 WAL에 계속 저장한다. 따라서 문제가 발생해 익스큐터 또는 수신자가 손실되거나 재시작될 때 WAL을 사용해 이벤트를 복구하고 처리할 수 ​​있다. 따라서이 로그 기반 설계는 내구성과 일관성을 모두 제공한다.


각 수신기는 카프카 토픽(topic)에서 이벤트의 입력 DStream을 생성하고 주키퍼에 카프카 토픽, 브로커, 오프셋 등을 쿼리한다. 

사용하는 API는 KafkaUtils.createStream이다.

def createStream(
 ssc: StreamingContext, // StreamingContext 오브젝트
 zkQuorum: String, //주키퍼 쿼럼(quorum) (호스트이름:포트,호스트이름:포트,..)
 groupId: String, //컨슈머의 그룹 id
 topics: Map[String, Int], // 소비할 (토픽 이름, 파티션 개수) 맵입니다. 각 파티션은 자체 스레드에서 사용된다.
 storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
 Storage level to use for storing the received objects
 (default: StorageLevel.MEMORY_AND_DISK_SER_2)
): ReceiverInputDStream[(String, String)] //(카프카 메시지 키, 카프카 메시지 값) DStream 


예제는 다음과 같다.

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)



2. 다이렉트 스트림 접근 방식

다이렉트 스트림 접근 방식(direct stream approach)은 카프카 통합과 관련한 새로운 접근 방식이며 드라이버를 사용하여 브로커에 직접 연결하고 이벤트를 가져 오는 방식으로 동작한다. 주요 내용은 다이렉트 스트림 API를 사용하는 것이므로 스파크 태스크는 카프카 토픽/파티션 대비 스파크 파티션 비율을 볼 때 1:1 비율로 동작한다는 것이다. 다이렉트 스트림 기반 접근 방식은 HDFS 또는 WAL에 대한 의존성 때문에 유연하지 않다. 또한 이제 오프셋으로 바로 접근할 수 있기 때문에 멱등성 또는 트랜잭션 방식을 사용해 정확히 한 번만 처리할 수 있다.
수신자를 사용하지 않고 카프카 브로커에서 직접 메시지를 가져오는 입력 스트림을 생성한다. 입력 스트림은 카프카에서 가져온 각 메시지가 정확히 한 번 처리하는 트랜스포메이션에 포함되도록 보장할 수 있다.

다음과 같이 KafkaUtils.createDirectStream() API를 사용하여 다이렉트 스트림을 생성할 수 있다.


def createDirectStream[
 K: ClassTag, // 카프카 메시지 키의 K 타입
 V: ClassTag, // 카프카 메시지 값의 V 타입
 KD <: Decoder[K]: ClassTag, // 카프카 메시지 키 디코더의 KD 타입
 VD <: Decoder[V]: ClassTag, // 카프카 메시지 값 디코더의 VD 타입
 R: ClassTag // 메시지 핸들러에서 리턴하는 R 타입
](
 ssc: StreamingContext, //StreamingContext 오브젝트
 KafkaParams: Map[String, String],
 /*
카프카의 설정 매개변수(http://kafka.apache.org/documentation.html#configuration)를 참조한다. 
host1:port1,host2:port2 형식으로 지정된 카프카 브로커(주키퍼 서버는 아님)과 함께 "metadata.broker.list"또는 "bootstrap.servers" 매개 변수를 설정해야 한다.
 */
 fromOffsets: Map[TopicAndPartition, Long], // 스트림의 시작점(포함)을 정의하는 토픽/파티션 별 카프카 오프셋
 messageHandler: MessageAndMetadata[K, V] => R // 각 메시지와 메타 데이터를 원한 타입으로 변환하는 함수
): InputDStream[R] // R 타입의 DStream




다이렉트 스트림 API에 대한 예는 다음과 같다.

val topicsSet = topics.split(",").toSet
val KafkaParams : Map[String, String] =
       Map("metadata.broker.list" -> brokers,
           "group.id" -> groupid )
val rawDstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, KafkaParams, topicsSet)


다이렉트 스트림 API는 카프카에서만 사용할 수 있어서 일반적으로 사용할 수 있는 방식이 아니다.




3. 구조화된 스트리밍(Structured streaming)

구조화된 스트리밍(structured streaming)은 아파치 스파크 2.0 이상에서 새로 도입되었다.

구조화 스트리밍(structured streaming)은 스파크 SQL 엔진 위에 구축된 확장 가능하고 내결함성 스트림 처리 엔진이다. 이는 DStream 패러다임 및 스파크 스트리밍 API와 관련된 이슈가 아니라 스트림 처리와 계산이 배치 처리에 가깝다. 구조화된 스트리밍 엔진은 정확히 한 번 스트림 처리, 처리 결과에 대한 증분 업데이트, 집계 등과 같은 내용을 처리한다.

다음은 카프카 소스 스트림 또는 카프카 소스에서 읽는 예이다.

val ds1 = spark
.read
.format("Kafka")
.option("Kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()

ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]


val ds1 = spark
.readStream
.format("Kafka")
.option("Kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()

ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]



또한 구조화된 스트리밍 API는 스파크 스트리밍의 큰 이슈를 해결할 수 있는 방법을 제공한다. 즉 스파크 스트리밍은 들어오는 데이터를 마이크로 배치로 처리하고 수신 시간을 데이터를 분할하는 수단으로 사용하므로 실제 이벤트 시간을 고려하지 않는다. 구조화된 스트리밍을 사용하면 수신되는 데이터에서 이런 이벤트 시간을 지정하여 최신 데이터가 자동으로 처리되도록 할 수 있다.

구조화된 스트리밍의 핵심 아이디어는 실시간 데이터 스트림을 이벤트가 스트림에서 처리될 때 연속적으로 추가되는 무제한 테이블(unbounded table)로 처리하는 것이다. 그리고 일반적으로 배치 데이터를 갖고 처리하는 것처럼 무제한 테이블에서 계산과 SQL 쿼리를 실행할 수 있다. 

DStream은 시간이 지나면서 많은 데이터는 처리되어 결과를 생성한다. 따라서 무제한 입력 테이블은 결과 테이블을 생성하는 데 사용된다. 출력 또는 결과 테이블은 출력(output)이라고하는 외부 싱크(sink)에 저장될 수 있다.

스트림을 받는 예제는 다음과 같다. 

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.functions._


val inputLines = spark.readStream

 .format("socket")

 .option("host", "localhost")

 .option("port", 9999)

 .load()


val words = inputLines.as[String].flatMap(_.split(" "))


val wordCounts = words.groupBy("value").count()


val query = wordCounts.writeStream

 .outputMode("complete")

 .format("console")


query.start()




지연을 처리하기 위해 watermark를 사용할 수 있다. 다음은 그 예이다. 




import java.sql.Timestamp

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.functions._


val inputLines = spark.readStream

.format("socket")

.option("host", "localhost")

.option("port", 9999)

.option("includeTimestamp", true)

.load()


val words = inputLines.as[(String, Timestamp)].flatMap(line =>

line._1.split(" ").map(word => (word, line._2))).toDF("word", "timestamp")


val windowedCounts = words.withWatermark("timestamp", "10 seconds")

.groupBy(window($"timestamp", "10 seconds", "10 seconds"), $"word").count().orderBy("window")


val query = windowedCounts.writeStream

.outputMode("complete")

.format("console")

.option("truncate", "false")


query.start()








Posted by '김용환'
,



배열에서 맨 앞 또는 뒤의 엘리먼트를 제거하고 싶을 때가 있다. slice를 사용하면 좋다.


배열의 마지막 엘리먼트를 제거하고 싶다면 다음과 같다. 


scala> val a = Array(1,2,3)

a: Array[Int] = Array(1, 2, 3)


scala> a.slice(0, a.size - 1)

res15: Array[Int] = Array(1, 2)


scala> a.slice(1, a.size - 1)

res17: Array[Int] = Array(2)





또는 dropRight 또는 drop을 사용할 수 있다. 




scala> a.dropRight(1)

res21: Array[Int] = Array(1, 2)


scala> a.drop(1)

res24: Array[Int] = Array(2, 3)




주의할 점은 0을 사용하면 의미 없다.


scala> a.dropRight(0)

res20: Array[Int] = Array(1, 2, 3)


scala> a.drop(0)

res23: Array[Int] = Array(1, 2, 3)



Posted by '김용환'
,

스칼라 빈 값 정의

scala 2018. 1. 11. 19:22



전역 변수는 아래와 같이 _를 사용해서 정의할 수 있다.

var i: Int = _

var s: String = _



로컬 변수는 아래와 같이 

var response: LogResponse = _



Posted by '김용환'
,



Play2-React를 개발하던 중에 아래와 같은 에러가 크롬에서 발생했다. 


Response to preflight request doesn't pass access control check: No 'Access-Control-Allow-Origin' header is present on the requested resource. Origin 'http://localhost:3000' is therefore not allowed access. The response had HTTP status code 404.




https://www.playframework.com/documentation/2.6.x/CorsFilter를 참고로 설정을 수정했다.


play.filters.enabled += "play.filters.cors.CORSFilter"


play.filters.cors {

  pathPrefixes = ["/", ...]

  allowedOrigins = ["http://localhost", ...]

  allowedHttpMethods = ["GET", "POST"]

  allowedHttpHeaders = ["Accept"]

  preflightMaxAge = 3 days

}




처음에는 404, 정책을 잘 지정하지 못하면 403이 나타난다.


allowedOrigins = ["http://localhost:9000", ...]



다음과 같이 모두 개방하면 테스트는 편해진다. 상용에서는 보안을 위해서 잘 수정하는 것이 좋다. 


(테스트용)

play.filters.hosts {

  allowed = ["."]

}


(상용)

play.filters.hosts {

    allowed = ["localhost", ".kakao.com", "localhost:9000", "local.google.com:9000", "local.google.com:4200"]

}




클라이언트(js)에서도 다음과 같이 수정한다. 



superagent

                .get('http://localhost:9000/log')

                .set('http.cors.enabled', true)

                .set('http.cors.allow-origin', "\"*\"")

                .query({...})

                .retry(2)

                .end((err, res) => {

                 }



이제 요청하면 정상적으로 동작하는 지 확인할 수 있다. 

Posted by '김용환'
,