Spark Streaming Job(Kafka-Consumer)의 성능을 높일 수 있다.


* Kafka topic의 파티션 개수 * 설정 파일의 spark.streaming.kafka.maxRatePerPartition 이다.


* 파티션 개수가 10개 * spark.streaming.kafka.maxRatePerPartition이 5 개이면 => 50 event /duration 성능이 나온다.


그러나 maxRatePerPartition이 너무 크면 OOM이 발생하거나 스트리밍 시스템(sink)에 영향을 줄 수 있다.




스트리밍이 제대로 동작되는지 확인하려면 2가지를 확인할 수 있다.


1) 52 completed batches, 653 records (동작안되면 0 completed batches 라고 뜬다)


2) Processing Time과 Total Delay 처리 Avg 시간을 확인한다. -로 나온다면 처리하는 것이 아니다. 




Posted by '김용환'
,

sbt 동작 이상

scala 2019. 10. 21. 15:21


아무리 수정해도 sbt가 잘 동작하지 않으면, sbt의 로컬 디렉토리인  ~/.sbt 을 다시 지우고 시작하자!!


Posted by '김용환'
,


sbt 에서 컴파일 속도가 나지 않는다고 계속 아래 커맨드를 사용하라고 로그가 나와서 

[warn] Getting the hostname Alvins-MacBook-Pro.local was slow (5003.850955 ms).
This is likely because the computer's hostname is not set.
You can set the hostname with the command:
  scutil --set HostName $(scutil --get LocalHostName).

아래 커맨드를 사용하니 잘 동작한다. 

$ scutil —set HostName $(scutil —get LocalHostName)




참고할 내용


실제 코드 

https://github.com/sbt/sbt/pull/3766/files


질문 & 답

https://apple.stackexchange.com/questions/175320/why-is-my-hostname-resolution-taking-so-long


\

Posted by '김용환'
,



scala circe 라이브러리를 사용하다 아래와 같은 에러를 만났다.

could not find implicit value for parameter encoder: io.circe.Encoder[Message]



대충 코드는 이렇다. Message case class를 json으로 변경하는 것이다.

object MessageType extends Enumeration {
type MessageType = Value
val DEL = Value
val INSERT = Value
}

case class Message(
val version: String,
val pipelineType: MessageType.Value,
val headers: Map[String, Object],
val createdAt: String
)


scala enumeration은 아래와 같이 처리해 주었는데.. 역시 동일한 에러이다. 

implicit val decoder: Decoder[MessageType.Value] = Decoder.enumDecoder(MessageType)
implicit val encoder: Encoder[MessageType.Value] = Encoder.enumEncoder(MessageType)



혹시나 circe는  Map의 value type을 중요하게 본다. 역시 에러다

case class Message(

val version: String,
val pipelineType: MessageType.Value,
val headers: Map[String, Any],
val createdAt: String
)



아래와 같이 value type을 String으로 변경하니 동작한다

case class Message(
val version: String,
val pipelineType: MessageType.Value,
val headers: Map[String, String],
val createdAt: String
)



경험해보니 최대한 간결한 패턴의 case class를 써야 circe가 잘 동작한다. 




Posted by '김용환'
,

[scala] jackson, ujson

scala 2019. 10. 4. 11:58


scala 에서 json을 사용할 때 jackson을 바인딩하는 것다..

import java.util.TimeZone

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.databind.util.StdDateFormat
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

object JasksonJsonUtil {
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

val stdDateFormat = new StdDateFormat()
stdDateFormat.setTimeZone(TimeZone.getDefault)
mapper.setDateFormat(stdDateFormat)

def toJson(value: Object): String = {
mapper.writeValueAsString(value)
}
}


ujson이 좀 나은 것 같다.  


ujson에서는 scala 계에서 유명한 lihaoyi 라이브러리(com.lihaoyi:ujson)를 활용한다.




import org.scalatest.{FunSuite, Matchers}
import java.text.SimpleDateFormat
import java.util.Date

import ujson.Value

class JasksonJsonUtilTest extends FunSuite with Matchers {
test("JacksonJsonUtil.toJson") {
case class Model(name: String, date: Date)
val actual = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ") .parse("2019-10-04T10:14:27.783+0900")
val json: String = JasksonJsonUtil.toJson(Model("expire", actual))
UJson(json).get("date").str should be("2019-10-04T10:14:27.783+0900")
}

case class UJson(jsonString: String) {
val value: Value = ujson.read(jsonString)

def get(keys: String): Value.Value = {
var x = value
keys.split("[.]").foreach { key =>
x = x(key)
}
x
}
}
}


Posted by '김용환'
,
Posted by '김용환'
,

spark / sbt 1.3.0-RC1를 사용 중이다. 


sbt test를 실행해

테스트 코드를 모두 완료하고 종료할 때 Java의 ShutdownHookManager에서 Can not access 또는 Can not load 예외가 발생할 수 있다. 

이는 jvm shutdown하면서 shutdownhook에서 정리할 내용보다 먼저 자원이 정리되면서 발생하는 문제이다. 



$ sbt test 

...

INFO: Illegal access: this web application instance has been stopped already.  Could not load org.apache.hadoop.util.ShutdownHookManager$2.  The eventual following stack trace is caused by an error thrown for debugging purposes as well as to attempt to terminate the thread which caused the illegal access, and has no functional impact.

java.lang.IllegalStateException

  at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1600)

  at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1559)

  at org.apache.hadoop.util.ShutdownHookManager.getShutdownHooksInOrder(ShutdownHookManager.java:124)

  at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:52




sbt 실행시 아래와 같은 설정을 추가한다. 더 이상 해당 에러는 발생하지 않는다. 

-Dsbt.classloader.close=false


'scala' 카테고리의 다른 글

[scala] jackson, ujson  (0) 2019.10.04
[펌] [spark] spark graceful하게 종료하는 방법  (0) 2019.10.02
[scala] 문자열의 값에 해당하는 enum 타입 얻어오기  (0) 2019.09.20
[sbt] 1.3.0  (0) 2019.09.06
scala cats 공부 자료.  (1) 2019.06.18
Posted by '김용환'
,


스칼라에서 enum 타입에 맞는 문자열 값을 찾아 enum을 리턴하는 메소드가 필요하다.

이럴 때는 사용할 만한 메소드로 filter와 find가 적당하다.

filter는 List 배열을 리턴하기에 마땅치 않고 find가 적당한 것 같다.


public enum TestEnum {
A("a"),
B("b");

private String value;

private TestEnum(String value) {
this.value = value;
}

public String getValue() {
return value;
}
}





object Test {

def main(args: Array[String]): Unit = {

val wrongEnumType : String = "xxx"

val wrong = TestEnum.values().find(e => wrongEnumType.equals(e.getValue))

if (wrong.isDefined) {
println(wrong)
} else {
println("wrong enum type")
}

}
}


Posted by '김용환'
,

[sbt] 1.3.0

scala 2019. 9. 6. 10:22



https://github.com/sbt/sbt/releases/tag/v1.3.0


sbt 1.3.0에 병렬 라이브러리 다운로드 cousier를 포함하게 되었다.

super shell도 있고..

Posted by '김용환'
,

scala cats 공부 자료.

scala 2019. 6. 18. 20:53
Posted by '김용환'
,