elasticsearch 4s에서 json 쿼리 요청을 보려면 다음과 같이 사용한다.

(5.2.0이후부터 됨)



val json = search("music" / "bands") query "coldplay"

println("xxxxxxxx " + client.show(json))



결과는 다음과 같다.


xxxxxxxx {"query":{"query_string":{"query":"coldplay"}}}


Posted by '김용환'
,



elasticsearch의 connection pool을 만들고 slick이나 play2처럼 자원 객체를 열고 닫는 불편함을 줄이고 싶었다.


보통 java/spring을 사용한 경우라면 val connection = connectionPool.getObject() 한 후,

코드 작업 후에 connectionPool.returnObjejct()를 호출해서 자원을 반환하는 구조를 사용하는 것이 일반적인 패턴이다. (사실 이게 귀찮으면 interceptor(asepectJ)를 사용하기도 한다)




다음은 es pool 예제이다. apache pool과 elastic4s(https://github.com/sksamuel/elastic4s)를 사용했다. 

object ElasticsearchPoolManager {
val poolConfig: ElasticsearchPoolConfig = new ElasticsearchPoolConfig()
lazy val pool: ElasticsearchPool = new ElasticsearchPool(poolConfig, "googles")

def getObject(): ElasticsearchConnectionSource = {
pool.getResource()
}

def returnObject(esSource: ElasticsearchConnectionSource) = {
pool.returnResource(esSource)
}

def withES[A](block: ElasticsearchConnectionSource => A): A = {
val source = getObject()
try { block(source) }
finally { returnObject(source)}
}
}



withES를 사용함으로서 코드 블록을 감싸게 했고.. 자동으로 getObject와 returnObject 코드를 사용하지 않도록 했다.


@Singleton
class ElasticsearchDAO {
val SIZE = 10
val INDEX_DEFULT_TYPE = "fluentd"
import com.sksamuel.elastic4s.http.ElasticDsl._

def getData(searchData: SearchData): Unit = {
import elasticsearch.pool.KemiElasticsearchPoolManager._
withES { es =>
val client = es.client
val result = client.execute {
search(searchData.serviceTag / INDEX_DEFULT_TYPE).query {
rangeQuery("@timestamp") .gte(searchData.startDateTime).lte(searchData.endDateTime)
}.size(SIZE).sortByFieldDesc("@timestamp")
}.await

result.hits.hits.foreach { println }
}
}
}



아.. 이전보다는 깔끔하다..






Posted by '김용환'
,


scala와 jodatime 예제이다. 사실 상 자바이긴 한데..



build.sbt 파일에 의존성 라이브러리를 추가한다.

libraryDependencies ++= Seq(

  "joda-time" % "joda-time" % "2.3",

  "org.joda" % "joda-convert" % "1.6"

)




간단하게 오늘 시간 정보를 출력한다.

import org.joda.time.DateTime
import org.joda.time.format._

val dateTime = new DateTime()
val dateString = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").print(dateTime)
print(dateString)

결과는 다음과 같다.


2017-11-20 19:37:02





import org.joda.time.{DateTime, DateTimeZone}
import org.joda.time.format._
val dateTime = new DateTime()
val dateString = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") .print(dateTime.withZone(DateTimeZone.UTC))
print(dateString)


출력은 다음과 같다.


2017-11-20 10:39:24





ISO8601 문자열을 테스트하는 예제이다.



import org.joda.time.{DateTime}
import org.joda.time.format._
val dt = ISODateTimeFormat.dateTimeNoMillis().print(new DateTime())
print(dt)


결과는 다음과 같다. ISO8601 스펙대로 출력한다.


2017-11-20T19:56:28+09:00





다음은 ISO8601 utc 타임이다. 

import org.joda.time.{DateTime, DateTimeZone}
import org.joda.time.format._
val dt = ISODateTimeFormat.dateTimeNoMillis().print(
    new DateTime().withZone(DateTimeZone.UTC))
print(dt)


결과는 다음과 같다. ISO8601 utc 스펙대로 출력한다.


2017-11-20T10:57:34Z








Posted by '김용환'
,


scala의 REPL에서 main 클래스/함수를 실행시킬 수 있다. 


예제는 다음과 같다. 



scala> object Main {

     |   def main(args: Array[String]) {

     |     println("Hello World")

     |   }

     | }

defined object Main




scala> Main.main(Array())

Hello World



Posted by '김용환'
,


스칼라의 접근 한정자(access modifier)는 자바와 비슷하다. 그러나 하나 더 추가되는 내용이 있다. 


수식자(qualifier)를 사용해 스칼라의 접근 한정자가 확장될 수 있다. private[X] 또는 protected[X] 타입의 한정자는 각각 접근이 X까지 private 또는 protected임을 의미한다. 여기서 X는 패키지, 클래스, 싱글턴 오브젝트를 나타낸다.


다음 예를 살펴보자.



scala> :paste

// Entering paste mode (ctrl-D to finish)



package Country {

package Professional {

  class Executive {

    private[Professional] var jobTitle = "Engineer"

    private[Country] var friend = "Andrew Ng"

    protected[this] var option = "X"


    def getInfo(another : Executive) {

      println(another.jobTitle) // 동작한다

      println(another.friend) // 동작한다

      println(another.secret) //에러가 발생한다

      println(this.option) // 동작한다

    }

  }

}

}


  • jobTitle 변수는 Professional 패키지 내의 모든 클래스에서 접근할 수 있다.

  • friend 변수는 Country 패키지내의 모든 클래스에서 접근할 수 있다.

  • secret 변수는 인스턴스 메소드(this)의 암시(implicit) 오브젝트에만 접근할 수 있다.



Posted by '김용환'
,



lazy val에 대해 잘 설명된 블로그 글이다.


https://blog.codecentric.de/en/2016/02/lazy-vals-scala-look-hood/




스칼라의 LazyCell 클래스에는 lazy val이 있다. 



final class LazyCell {
  lazy val value: Int = 42
}


자바로 디컴파일 해보면 아래와 같이 변환된다고 블로그 글에 나와 있다.


final class LazyCell {
  @volatile var bitmap_0: Boolean = false                   // (1)
  var value_0: Int = _                                      // (2)
  private def value_lzycompute(): Int = {
    this.synchronized {                                     // (3)
      if (!bitmap_0) {                                      // (4)
        value_0 = 42                                        // (5)
        bitmap_0 = true
      }
    }
    value_0
  }
  def value = if (bitmap_0) value_0 else value_lzycompute() // (6)
}


scala 2.12로 컴파일하고 실제로 jad 로 디컴파일하면 다음과 같다. 거의 동일하다.


내부적으로 volatile과 synchronized를 사용한다. 즉 multiple thread에서 동기화가 보장되도록 되어 있다! 예제2에서 설명하고 있다. 



// Decompiled by Jad v1.5.8g. Copyright 2001 Pavel Kouznetsov.

// Jad home page: http://www.kpdus.com/jad.html

// Decompiler options: packimports(3)

// Source File Name:   LazyCell.scala



public final class LazyCell

{


    private int value$lzycompute()

    {

        synchronized(this)

        {

            if(!bitmap$0)

            {

                value = 42;

                bitmap$0 = true;

            }

        }

        return value;

    }


    public int value()

    {

        return bitmap$0 ? value : value$lzycompute();

    }


    public LazyCell()

    {

    }


    private int value;

    private volatile boolean bitmap$0;

}





예제 1이다. 참조 블로그의 1번째 예제를 repl에서 실행해 본다.



scala> :paste

// Entering paste mode (ctrl-D to finish)


import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent._

import scala.concurrent.duration._


def fib(n: Int): Int = n match {

  case x if x < 0 =>

    throw new IllegalArgumentException(

      "Only positive numbers allowed")

  case 0 | 1 => 1

  case _ => fib(n-2) + fib(n-1)

}


object ValStore {

  lazy val fortyFive = fib(45)                   // (1)

  lazy val fortySix  = fib(46)                   // (2)

}


object Scenario1 {

  def run = {

    val result = Future.sequence(Seq(            // (3)

      Future {

        println(ValStore.fortyFive)

        println("done (45)")

      },

      Future {

        println(ValStore.fortySix)

        println("done (46)")

      }

    ))

    Await.result(result, 1.minute)

  }

}



// Exiting paste mode, now interpreting.


import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent._

import scala.concurrent.duration._

fib: (n: Int)Int

defined object ValStore

defined object Scenario1


scala> Scenario1.run

1836311903

done (45)

-1323752223

done (46)

res4: Seq[Unit] = List((), ())




처음 실행할 때는 속도가 걸리지만, 다음 번 실행할 때는 무척 빠르다. lazy val의 특성이 있다. 


scala> Scenario1.run

-1323752223

done (46)

1836311903

done (45)

res5: Seq[Unit] = List((), ())


scala> Scenario1.run

1836311903

done (45)

-1323752223

done (46)

res6: Seq[Unit] = List((), ())






2번째 예제는 lazy val의 내부 synchronized를 이용해 deal lock을 유발시키는 코드이다. 여러 쓰레드를 사용하면서 lazy val을 잘 못 쓴다면 dead lock이 발생할 수 있다.



scala> :paste

// Entering paste mode (ctrl-D to finish)


import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent._

import scala.concurrent.duration._


object A {

  lazy val base = 42

  lazy val start = B.step

}


object B {

  lazy val step = A.base

}


object Scenario2 {

  def run = {

    val result = Future.sequence(Seq(

      Future { A.start },                        // (1)

      Future { B.step }                          // (2)

    ))

    Await.result(result, 1.minute)

  }

}





dead lock이 발생했다.

scala> Scenario2.run
java.util.concurrent.TimeoutException: Futures timed out after [1 minute]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:255)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:259)
  at scala.concurrent.Await$.$anonfun$result$1(package.scala:215)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:142)
  at Scenario2$.run(<console>:30)
  ... 29 elided






3번째 예제이다. 참조 블로그를 보면 deadlock 예제로 되어 있다. 


scala> :paste

// Entering paste mode (ctrl-D to finish)


import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent._

import scala.concurrent.duration._


trait Compute {

  def compute: Future[Int] =

    Future(this.synchronized { 21 + 21 })        // (1)

}


object Scenario3 extends Compute {

  def run: Unit = {

    lazy val someVal: Int =

      Await.result(compute, 1.minute)            // (2)

    println(someVal)

  }

}



실제로 실행해 보면 deadlock은 발생되지 않는다. 



scala> Scenario3.run

42




lazy val에 synchronized가 된다면 인스턴스는 분명 deadlock 상황에 빠져야 한다. 그러나 컴파일러가 똑똑해져서 문제가 발생하지는 않는다.



 lazy val 다음 단계는 http://docs.scala-lang.org/sips/pending/improved-lazy-val-initialization.html에 있다.

Posted by '김용환'
,


sbt를 사용하면서 hbase 연동시 만난 library 의존성 라이브러리를 만나며 부딪힌 문제를 정리했다. 



1.

"org.apache.hadoop" % "hadoop-core" % "1.2.1",


hadoop-core는 예전 버전이고, hadoop-common으로 넘어갔음. 안쓰는게 좋음.ㅠㅠ







2. 



Caused by: java.lang.UnsupportedOperationException: Not implemented by the DistributedFileSystem FileSystem implementation


다음 라이브러리를 읽으면 에러가 발생되지 않는다.

"org.apache.hadoop" % "hadoop-hdfs" % "2.7.1",



3. 


Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.net.NetUtils.getInputStream(Ljava/net/Socket;)Lorg/apache/hadoop/net/SocketInputWrapper;


다음 라이브러리를 읽으면 에러가 발생되지 않는다.

"org.apache.hadoop" % "hadoop-client" % "2.7.1",




4.


Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.GlobalStorageStatistics$StorageStatisticsProvider


다음 라이브러리를 읽으면 에러가 발생되지 않는다.

"org.apache.hadoop" % "hadoop-common" % "2.8.0"


'scala' 카테고리의 다른 글

스칼라의 접근 한정자  (0) 2017.11.07
lazy val의 내부(volatile/synchronized)  (0) 2017.11.06
[play2] globalsetting  (0) 2017.11.02
play2에서 apache phoenix 드라이버 사용 이슈.  (0) 2017.10.31
[play2] import play.db.Database 에러  (0) 2017.10.30
Posted by '김용환'
,

[play2] globalsetting

scala 2017. 11. 2. 12:02



play2 2.4까지 잘 사용했던 globalsetting는 deprecated되었다.


eager bindings로 변경해야 한다. 


https://www.playframework.com/documentation/2.6.x/ScalaDependencyInjection#Eager-bindings

Posted by '김용환'
,


Apache Phoenix 드라이버를 사용할 때 유의할 사항이 있다. 

play2의 db에서는 jdbc url에 매개변수(또는 configuration)을 추가할 수 없다. 즉 DriverManager.getConnection("..", props)에서 props에 관련 설정을 추가해야 한다. 



아래와 같은 방식은 사용할 수 없다. 

db {
default.driver = org.apache.phoenix.jdbc.PhoenixDriver
default.url = "jdbc:phoenix:hbase-md15b1-031.dakao.io:8765/kemi;phoenix.schema.isNamespaceMappingEnable=true;phoenix.functions.allowUserDefinedFunctions=true"
default.logSql = true
default.username = ""
default.password = ""
}


문서를 잘 보면. 덜 개발되었다..




참고 https://phoenix.apache.org


Use JDBC to get a connection to an HBase cluster like this:

Connection conn = DriverManager.getConnection("jdbc:phoenix:server1,server2:3333",props);

where props are optional properties which may include Phoenix and HBase configuration properties, and the connection string which is composed of:

jdbc:phoenix [ :<zookeeper quorum> [ :<port number> [ :<root node> [ :<principal> [ :<keytab file> ] ] ] ] ] 





아래와 같이 사용해야 한다. 


Properties props = new Properties();

props.setProperty("phoenix.schema.isNamespaceMappingEnabled", "true");

props.setProperty("phoenix.functions.allowUserDefinedFunctions", "true");

connection = DriverManager.getConnection("jdbc:phoenix:...", props);





또한 sbt 또한 문제가 있다.  sbt에서 apache phoenix 드라이버를 사용하기 위해 lib, guava 이슈를 해결해야 한다. 얼추 해결되더라도 추후 runtime 이슈가 생길 수 있다. 




사례 1

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.HBaseConfiguration

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

at org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl$1.call(ConfigurationFactory.java:49)

at org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl$1.call(ConfigurationFactory.java:46)

at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:76)

at org.apache.phoenix.util.PhoenixContextExecutor.callWithoutPropagation(PhoenixContextExecutor.java:91)

at org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl.getConfiguration(ConfigurationFactory.java:46)

at org.apache.phoenix.jdbc.PhoenixDriver.initializeConnectionCache(PhoenixDriver.java:151)

at org.apache.phoenix.jdbc.PhoenixDriver.<init>(PhoenixDriver.java:142)




resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"


libraryDependencies ++= Seq(
"org.apache.hbase" % "hbase" % "1.2.6",
"org.apache.hbase" % "hbase-common" % "1.2.6",
"org.apache.hbase" % "hbase-client" % "1.2.6",
"org.apache.hbase" % "hbase-server" % "1.2.6", "org.apache.phoenix" % "phoenix-core" % "4.11.0-HBase-1.2",

..)



사례 2



 org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.IllegalAccessError: tried to access method com.google.common.base.Stopwatch.<init>()V from class org.apache.hadoop.hbase.zookeeper.MetaTableLocator



libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "13.0.1" force(),
"org.apache.hbase" % "hbase" % "1.2.6",
"org.apache.hbase" % "hbase-common" % "1.2.6",
"org.apache.hbase" % "hbase-client" % "1.2.6",
"org.apache.hbase" % "hbase-server" % "1.2.6",
"org.apache.phoenix" % "phoenix-core" % "4.11.0-HBase-1.2")



결론은 apache phoenix를 연동하기 위해 play2-sbt를 사용하는 것은 조금 귀찮을 수 있다..


scala를 쓰려면 무거운 play2보다는 경량 프레임워크를 사용하고 내가 db 모듈을 만드는 게 나을 수도.. 

Posted by '김용환'
,


https://www.playframework.com/documentation/2.6.x/api/java/play/db/Database.html 문서에는 있는데.


play.db.Database를 import하려고 하려면 에러가 난다.


import play.db._



libraryDependencies += jdbc 를 추가하면 된다. 기본적으로 추가가 되어 있지 않다.. 

Posted by '김용환'
,