[scala] Future말고 Promise

scala 2016. 11. 27. 22:30



scala에서는 Future보다는 Promise를 많이 사용한다.


Promise를 사용해 Future를 생성하고 완료할 수 있다. Promise는 Future에 포함된 값을 명시적으로 설정할 수 있는 핸들이다.



import scala.concurrent.Promise
val p = Promise[Int]
println(p.future.value)
println(p.success(42))
println(p.future.value)

결과는 다음과 같다. 


None

Success(42)

Some(Success(42))


sucess나 fail을 호출해야 future 값을 얻는 구조이다. 





그리고, Promise에서 Future를 가지고 사용할 수도 있다. 

import scala.concurrent.Promise
val promise = Promise[String]()
val future = promise.future

println(future.value)
println(promise.success("hello").isCompleted)
println(future.value.get)


결과는 다음과 같다. Promise가 계산되어야 future 값을 얻을 수 있다. 


None

true

Success(hello)





Promise는 case class에서도 사용할 수 있다. 


case class Person(number: Int)
val person = Promise[Person]
person.success(Person(10))
println(person.future.value)






참고로 play2 프레임워크에는 비동기 Promise를 사용하는 예제가 있다. 


https://playframework.com/documentation/2.5.x/ScalaAsync



import play.api.libs.concurrent.Execution.Implicits.defaultContext


val futurePIValue: Future[Double] = computePIAsynchronously()

val futureResult: Future[Result] = futurePIValue.map { pi =>

  Ok("PI value computed: " + pi)

}







'scala' 카테고리의 다른 글

[scala] 부분 적용 함수 / 커링 / 부분 함수  (0) 2016.12.05
[scala] 꼬리 재귀(tail recursion)와 @tailrec  (0) 2016.12.05
[scala] Future 2  (0) 2016.11.23
[scala] Future 1  (0) 2016.11.22
[zeppelin] spark의 변수 공유하기  (0) 2016.11.18
Posted by '김용환'
,

[cassandra] read repair

cassandra 2016. 11. 23. 23:15


카산드라에는 read repair(한국어로 굳이 번역하면 읽기 보수 정도?) 라는 것이 있다.


Eventually consistency를 제공한 것은 성능을 포기할 수 없기 때문인데..


데이터의 동기화를 하지만 완벽하지 않을 수 있다. 언제든지 데이터가 이슈가 될 수 있어서 read repair로 데이터의 이슈를 보정할 수 있다. read할 때 quorum으로 하면 데이터를 안전하게 최신 데이터의 맞춰 가져올 수 있다. 



https://docs.datastax.com/en/cassandra/2.1/cassandra/operations/opsRepairNodesReadRepair.html


https://docs.datastax.com/en/cassandra/2.1/cassandra/dml/dmlClientRequestsRead.html


http://www.datastax.com/dev/blog/common-mistakes-and-misconceptions


하지만, read repair를 한다는 것은 성능의 저하를 발생시킬 수 있는 부분이 있다. 


read_repair_chance의 기본 값이 0.1일 텐데. 0으로 바꾸면 성능 저하가 없을 수 있다. 


만약 write : 1, read: quorum으로 하면 read_repiar_chance의 값을 높일 수도 있을 것이다.







CRDT관점으로 설명된 카산드라


https://aphyr.com/posts/294-jepsen-cassandra


merge하는 부분을 잘 보면 좋을 것 같다.





* 주의할 점

DateTieredCompactionStrategy에는 read repair를 포함하기 때문에 read_repair_chance를 0으로 해야 한다.. 기본 정책은 0.1이기 때문에 cassandra.yml 파일을 수정해서 재시작을 해야 한다. 따라서 테이블에 DateTieredCompactionStrategy의 사용을 주의해서 써야 한다.



The compaction strategy DateTieredCompactionStrategy precludes using read repair, because of the way timestamps are checked for DTCS compaction. In this case, you must set read_repair_chance to zero. For other compaction strategies, read repair should be enabled with a read_repair_chancevalue of 0.2 being typical.



-----


qcon london에서 crdt 내용이 나왔다. 

https://qconlondon.com/london-2016/system/files/presentation-slides/matthiaspeter.pdf

'cassandra' 카테고리의 다른 글

[cassadra] compaction 전략  (0) 2016.12.09
[cassandra] select count(*) 구하기  (0) 2016.12.07
[cassandra] cqlsh 팁  (0) 2016.11.21
[cassandra] counter 테이블 예시 및 유의 사항  (0) 2016.11.17
[cassandra] insert는 update와 같다.  (0) 2016.11.16
Posted by '김용환'
,

[scala] Future 2

scala 2016. 11. 23. 11:49


Future에 있어서 약점은 타임아웃도 Exception이라는 점이다. 


타임아웃이 발생했을 때 Exception이 발생한다. 

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

println("Start")
val f = Future {
println("1")
Thread.sleep(2000) // 실제 코드가 들어갈 장소
println("2")
}

println("End")
val result = Await.result(f, 1 second)


결과는 예외이다. 

Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [1 second]

at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)

at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)

at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)

at scala.concurrent.BlockContext$DefaultBlockContext$

....




일반 Exception과 타임아웃 Exception을 구분하기 위해 클래스를 감쌀 수 있다. 


https://github.com/PacktPublishing/Scala-High-Performance-Programming/blob/master/chapter6/src/main/scala/highperfscala/concurrency/future/SafeAwait.scala



result 함수는 Future는 Awaitable을 상속받은 클래스와 시간을 받고 Future를 감싼다. TimeoutException시는 None으로 리턴한하게 한다. 

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await}
import scala.util.{Failure, Success, Try }

object SafeAwait {
def result[T](awaitable: Awaitable[T],
atMost: Duration): Option[T] = Try(Await.result(awaitable, atMost)) match {
case Success(t) => Some(t)
case Failure(_: TimeoutException) => None
case Failure(e) => throw e
}
}

val f1 = Future {
println("11")
5 + 10
}

println(SafeAwait.result(f1, 1 second))

val f2 = Future {
println("11")
Thread.sleep(5000)
5 + 10
}

println(SafeAwait.result(f2, 1 second))


결과는 다음과 같다. 


11

Some(15)

11

None




Future의 변환에서 Exception이 발생할 수 있는데, 이에 대한 Future에서 어떻게 대응할 수 있는 살펴본다.



문자를 숫자로 변환했으니. map 단계에서 변환할 것이다. 

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await}
import scala.util.{Failure, Success, Try }

val f = Future("stringstring").map(_.toInt).map(i => {
println("Multiplying")
i * 2
})

Await.result(f, 1 second)


<결과>


Exception in thread "main" java.lang.NumberFormatException: For input string: "not-an-integer"

at java.lang.NumberFormatException.forInputString






Future에는 recover라는 메소드가 있다. 이를 활용하면, 다운 스트림 에러를 보정할 수 있다. 



def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = {
val p = Promise[U]()
onComplete { v => p complete (v recover pf) }
p.future
}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await}
import scala.util.{Failure, Success, Try }

val f = Future("stringstring").map(_.toInt).recover {
case _: NumberFormatException => 2
}.map(i => {
println("Multiplying")
i * 2
})
println(Await.result(f, 1 second))

결과는 다음과 같다. 


Multiplying

4





recoverWith라는 메소드도 있다. 부분 함수를 사용할 수 있으며, recover와 동일하게 처리할 수 있다. 


def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])
         (implicit executor: ExecutionContext): Future[U] = {




import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await}
import scala.util.{Failure, Success, Try }

val f = Future("stringstring").map(_.toInt).recoverWith {
case _: NumberFormatException => Future(2)
}.map(i => {
println("Multiplying")
i * 2
})
println(Await.result(f, 1 second))

결과는 다음과 같다. 


Multiplying

4




Await.result 대신 onComplete를 사용할 수 있다. 



import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await}
import scala.util.{Failure, Success, Try }

val f = Future("stringstring").map(_.toInt).recover {
case _: NumberFormatException => 2
}.map(i => {
println("Multiplying")
i * 2
}).onComplete {
case Success(i) => println(s"$i")
case Failure(e) => println(s"${e.getMessage}")
}

Thread.sleep(1000)




Future는 이미 계산된 값으로 만들 수 있다. 그리고 특정 값을 래핑한 Future로 만들 수 있다. (Future를 리턴할 때)


예)

val stringFuture: Future[String] = Future.successful("yes")



Future.successful이나 Future.failed를 이용할 수 도 있고, 더 추상화된 클래스인 Promise를 사용할 수 있다. 아래 예시 결과에서 본 것처럼 모두 계산된 (isCompleted=true) 상태이다. 

import scala.concurrent.{Future}
import scala.concurrent.Promise

val f1 = Future.successful(3)
println(f1.isCompleted)
println(f1)

val f2 = Future.failed(new IllegalArgumentException("no!"))
println(f2.isCompleted)
println(f2)


val promise = Promise[String]()
val future = promise.future
println(promise.success("hello").isCompleted)


결과


true

scala.concurrent.impl.Promise$KeptPromise@146ba0ac

true

scala.concurrent.impl.Promise$KeptPromise@4dfa3a9d

true




'scala' 카테고리의 다른 글

[scala] 꼬리 재귀(tail recursion)와 @tailrec  (0) 2016.12.05
[scala] Future말고 Promise  (0) 2016.11.27
[scala] Future 1  (0) 2016.11.22
[zeppelin] spark의 변수 공유하기  (0) 2016.11.18
spark의 mapValues/reduceByKey 예시  (0) 2016.11.14
Posted by '김용환'
,

[scala] Future 1

scala 2016. 11. 22. 09:31


scala의 Future는 다음과 같이 단순하게 개발할 수 있다. 

마치 java의 Runnable과 비슷한 느낌으로 개발할 수 있다. (ExecutionContext라는 것은 빼고.)


object Main extends App {
import scala.concurrent.Future
import scala.concurrent.ExecutionContext

val context: ExecutionContext = scala.concurrent.ExecutionContext.global

println("Start")
Future {
println("1")
Thread.sleep(1000) // 실제 코드가 들어갈 장소
println("2")
} (context)

println("End")

Thread.sleep(1000) // 결과를 얻기 위해 기다리는 시간.
}


결과


Start

End

1

2



마지막에 결과를 얻기 위해 Thread.sleep을 주기가 민망한 코드이니. 고급스러운 Await를 사용한다.



import scala.concurrent.ExecutionContext
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

val context: ExecutionContext = scala.concurrent.ExecutionContext.global

println("Start")
val f = Future {
println("1")
Thread.sleep(1000) // 실제 코드가 들어갈 장소
println("2")
} (context)

println("End")
val result = Await.result(f, 1 second) // Await로 변경.

결과는 동일하다. 



Future를 사용했다고 해서 바로 실행이 되지 않는다. 



import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global

val f = Future{ println("hello"); "world"}
println(f.value)
println(f.isCompleted)

val f1 = Future{ println("hello"); 10 + 20 }
println(f1.value)
println(f1.isCompleted)


결과는 다음과 같다.


hello

None

false


None

hello

false


Future가 실행되지 않았다. 단순한 출력은 되지만, Future의 내부 값이 실행되기 위해서는 작업이 필요하다.



onComplete를 실행해야 한다. 

import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Failure}

val f = Future{ println("hello"); "world"}
println(f.value)
println(f.isCompleted)

println
f.onComplete {
case Success(value) => println(value)
case Failure(e) => e.printStackTrace
}
println(f.value)
println(f.isCompleted)


결과는 다음과 같다. onComplete가 실행되어야 f.value를 알 수 있게 된다. 


hello

None

false


world

Some(Success(world))

true



onComplete의 실행은 onSuccess와 onFailure의 동작과 비슷한 역할을 기대할 수 있다.


import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Failure}

val f = Future{ println("hello"); "world"}
println(f.value)
println(f.isCompleted)

println
f.onSuccess {
case value => println(value)
}
f.onFailure {
case e => e.printStackTrace
}

println(f.value)
println(f.isCompleted)

결과는 동일하다.


hello

None

false


world

Some(Success(world))

true





참고로 scala EPEL은 값이 미리 계산되니.. 유의할 필요가 있다.


scala>   import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.ExecutionContext.Implicits.global


scala> import scala.concurrent.Future

import scala.concurrent.Future


scala>  val f = Future{ println("FOO"); 10 + 2}

FOO

f: scala.concurrent.Future[Int] = List()


scala>  f.value

res2: Option[scala.util.Try[Int]] = Some(Success(12))



Future의 계산이 완료되면, Future는 계산된 값에 대한 단지 래퍼이다. 다시 계산을 수행하길 원한다면, 새로운 Future 인스턴스를 생성해야 한다.




Future는 상태를 가지고 있다.


Future 내부 객체를 살펴보면, 다음과 같이 계산 결과 여부와 값을 가지고 있음을 알 수 있다. 

def isCompleted: Boolean
def value: Option[Try[T]]




Future는 변환 메소드를 사용할 수 있다. map, filter, foreach을 사용하는 예시이다.


import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

Future(1).map(_ + 11).filter(_ % 2 == 0).foreach(println)

Thread.sleep(1000)

결과는 12이다.



기존에 선언했던 context를 내부적으로 global로 import해서 간단하게 적용할 수 있다. global은 암시 객체인데, map의 시그내처에서 사용할 수 있도록 선언되어 있다. 

(컬렉션의 map과 다르다)


implicit lazy val global: ExecutionContextExecutor = 
                 impl.ExecutionContextImpl.fromExecutor(null: Executor)
def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] 
         = { // transform(f, identity)
....
}



ExecutionContext는 런타임 비동기를 제공하는 Future 뒤의 시스템으로 생각될 수 있다. ExecutionContext는 트레이트로서 java의 Runnable을 실행하는 execute 메소드를 가지고 있다. 


trait ExecutionContext {

/** Runs a block of code on this execution context.
*
* @param runnable the task to execute
*/
def execute(runnable: Runnable): Unit

/** Reports that an asynchronous computation failed.
*
* @param cause the cause of the failure
*/
def reportFailure(@deprecatedName('t) cause: Throwable): Unit

/** Prepares for the execution of a task. Returns the prepared execution context.
*
* `prepare` should be called at the site where an `ExecutionContext` is received (for
* example, through an implicit method parameter). The returned execution context may
* then be used to execute tasks. The role of `prepare` is to save any context relevant
* to an execution's ''call site'', so that this context may be restored at the
* ''execution site''. (These are often different: for example, execution may be
* suspended through a `Promise`'s future until the `Promise` is completed, which may
* be done in another thread, on another stack.)
*
* Note: a valid implementation of `prepare` is one that simply returns `this`.
*
* @return the prepared execution context
*/
def prepare(): ExecutionContext = this

}




ExecutionContext 트레이트의 동반자 객체를 살펴보면 java의 ExecutorService와 Executor에서 ExecutionContext를 얻을 수 있다. 


ExecutorService를 통해 ExecutionContextExecutorService와 ExecutionContextExecutor를 생성하는 팩토리 메소드이다. 

object ExecutionContext {

/** Creates an `ExecutionContext` from the given `ExecutorService`.
*
* @param e the `ExecutorService` to use. If `null`, a new `ExecutorService` is created with [[http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext$@global:scala.concurrent.ExecutionContextExecutor default configuration]].
* @param reporter a function for error reporting
* @return the `ExecutionContext` using the given `ExecutorService`
*/
def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit) : ExecutionContextExecutorService =
impl.ExecutionContextImpl.fromExecutorService(e, reporter)

/** Creates an `ExecutionContext` from the given `ExecutorService` with the [[scala.concurrent.ExecutionContext$.defaultReporter default reporter]].
*
* If it is guaranteed that none of the executed tasks are blocking, a single-threaded `ExecutorService`
* can be used to create an `ExecutionContext` as follows:
*
* {{{
* import java.util.concurrent.Executors
* val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
* }}}
*
* @param e the `ExecutorService` to use. If `null`, a new `ExecutorService` is created with [[http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext$@global:scala.concurrent.ExecutionContextExecutor default configuration]].
* @return the `ExecutionContext` using the given `ExecutorService`
*/
def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter)

/** Creates an `ExecutionContext` from the given `Executor`.
*
* @param e the `Executor` to use. If `null`, a new `Executor` is created with [[http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext$@global:scala.concurrent.ExecutionContextExecutor default configuration]].
* @param reporter a function for error reporting
* @return the `ExecutionContext` using the given `Executor`
*/
def fromExecutor(e: Executor, reporter: Throwable => Unit) : ExecutionContextExecutor =
impl.ExecutionContextImpl.fromExecutor(e, reporter)

/** Creates an `ExecutionContext` from the given `Executor` with the [[scala.concurrent.ExecutionContext$.defaultReporter default reporter]].
*
* @param e the `Executor` to use. If `null`, a new `Executor` is created with [[http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext$@global:scala.concurrent.ExecutionContextExecutor default configuration]].
* @return the `ExecutionContext` using the given `Executor`
*/
def fromExecutor(e: Executor): ExecutionContextExecutor = fromExecutor(e, defaultReporter)


impl.ExecutionContextImpl의 구현은 다음과 같다. ForkJoinTask를 구현한 내부 내부 클래스와 fromExecutor로 구성되어 있다. 

private[concurrent] object ExecutionContextImpl {

final class AdaptedForkJoinTask(runnable: Runnable) extends ForkJoinTask[Unit] {
final override def setRawResult(u: Unit): Unit = ()
final override def getRawResult(): Unit = ()
final override def exec(): Boolean = try { runnable.run(); true } catch {
case anything: Throwable
val t = Thread.currentThread
t.getUncaughtExceptionHandler match {
case null
case some ⇒ some.uncaughtException(t, anything)
}
throw anything
}
}

def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter)
def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService =
new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService {
final def asExecutorService: ExecutorService = executor.asInstanceOf[ExecutorService]
override def execute(command: Runnable) = executor.execute(command)
override def shutdown() { asExecutorService.shutdown() }
override def shutdownNow() = asExecutorService.shutdownNow()
override def isShutdown = asExecutorService.isShutdown
override def isTerminated = asExecutorService.isTerminated
override def awaitTermination(l: Long, timeUnit: TimeUnit) = asExecutorService.awaitTermination(l, timeUnit)
override def submit[T](callable: Callable[T]) = asExecutorService.submit(callable)
override def submit[T](runnable: Runnable, t: T) = asExecutorService.submit(runnable, t)
override def submit(runnable: Runnable) = asExecutorService.submit(runnable)
override def invokeAll[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAll(callables)
override def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAll(callables, l, timeUnit)
override def invokeAny[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAny(callables)
override def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAny(callables, l, timeUnit)
}
}



리턴 타입인 ExeuctionContextExecutor와 ExecutionContextExecutorService는  java의 Excecution를 implement한다. 




/**
* An [[ExecutionContext]] that is also a
* Java [[http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html Executor]].
*/
trait ExecutionContextExecutor extends ExecutionContext with Executor


/**
* An [[ExecutionContext]] that is also a
* Java [[http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html ExecutorService]].
*/
trait ExecutionContextExecutorService extends ExecutionContextExecutor with ExecutorService




이제 아래 코드가 눈이 들어오기 시작한다. 


implicit lazy val
global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
def global: ExecutionContextExecutor = Implicits.global





Future의 apply 함수를 보면,다음과 같다.



def apply[T](body: =>T)
              (implicit @deprecatedName('execctx) executor: ExecutionContext)
               : Future[T] = impl.Future(body)


'scala' 카테고리의 다른 글

[scala] Future말고 Promise  (0) 2016.11.27
[scala] Future 2  (0) 2016.11.23
[zeppelin] spark의 변수 공유하기  (0) 2016.11.18
spark의 mapValues/reduceByKey 예시  (0) 2016.11.14
[scala] List.map(function) 예시  (0) 2016.11.08
Posted by '김용환'
,

[cassandra] cqlsh 팁

cassandra 2016. 11. 21. 13:16



cassandra의 cqlsh이 로컬호스트에 접속을 못할 수 있다면, 간단하게 alias로 작업할 수 있다. 



$ cqlsh

Connection error: ('Unable to connect to any servers', {'127.0.0.1': error(111, "Tried connecting to [('127.0.0.1', 9042)]. Last error: Connection refused")})




$ which cqlsh

/usr/bin/cqlsh



$ vi ~/.bashrc 

// 추가

alias cqlsh='/usr/bin/cqlsh 1.1.1.1'



$ source ~/.bashrc



이제 잘 된다. 


$ cqlsh

Connected to Story Common Cluster at ..

Use HELP for help.

cqlsh>




Posted by '김용환'
,



zeppelin에서 공유 변수를 사용하는 방식이 있다.


디폴트(isolated)는 같은 노트에서도 변수를 공유하지 않는다. 



하나의 노트에서만 변수를 공유하려면,  다음과 같이 진행한다. 



zeppelin -> interpreter 설정 -> spark interpreter 설정에서 edit 실행 -> interpreter mode를 scope 변경 -> spark interpreter 설정에서 restart 실행



만약 전체 노트에서 공유하고 싶다면, shared mode로 변경한다. 




테스트를 다음과 같이 진행할 수 있다. 







참고로, 


- 변수 공유는 interpreter가 재시작되면 다시 노트를 실행해 결과를 캐싱하게 해야 한다.

- registerTempTable에 저장하면 모든 노트에서 사용할 수 있다. 

'scala' 카테고리의 다른 글

[scala] Future 2  (0) 2016.11.23
[scala] Future 1  (0) 2016.11.22
spark의 mapValues/reduceByKey 예시  (0) 2016.11.14
[scala] List.map(function) 예시  (0) 2016.11.08
[zeppelin] zeppelin으로 spark 연동 시 팁 (또는 주의 사항)  (0) 2016.11.07
Posted by '김용환'
,



cassandra에서 counter 테이블로 딱 사용하기 좋다. 


cassandra에서는 default 값으로 null을 사용하지만, counter는 예외다. 0이 디폴트 값이다. 



간단한 컬럼패밀리로 테스트를 진행해보면, null이 저장됨을 볼 수 있다. 



cqlsh> create table test.test(test int, x int, primary key(test));

cqlsh> insert into test.test(test) values(1);

cqlsh> select * from test.test;


 test | x

------+------

    1 | null




그러나, counter 필드는 초기값이 0이 되기 때문에 null + 1로 되어 에러가 발생하지 않는다. 




cqlsh> DROP TABLE IF EXISTS googleplus.user_followee_recent_relation


cqlsh> CREATE TABLE googleplus.user_followee_recent_relation(

   ...   followee_id int,

   ...   profile_id int,

   ...   counter_value counter,

   ...   PRIMARY KEY (followee_id, profile_id)

   ... );



cqlsh> update googleplus.user_followee_recent_relation set counter_value = counter_value + 1 where followee_id = 1 and profile_id = 1;



cqlsh> select * from googleplus.user_followee_recent_relation;


 followee_id | profile_id | counter_value

-------------+------------+---------------

           1 |          1 |             1





하지만, 제약사항이 있다.


1. insert를 사용할 수 없다.


 cqlsh> insert into googleplus.user_followee_recent_relation(followee_id, profile_id, counter_value) values (1,1,0) using ttl 2;

InvalidRequest: code=2200 [Invalid query] message="INSERT statement are not allowed on counter tables, use UPDATE instead"




2. TTL 레코드에서 적용할 수 없다. 



cqlsh> update googleplus.user_followee_recent_relation set counter_value = counter_value + 1 where followee_id = 2 and profile_id = 2 using ttl 3 ;

SyntaxException: <ErrorMessage code=2000 [Syntax error in CQL query] message="line 1:122 missing EOF at 'using' (...2 and profile_id = 2 [using] ttl...)">



cqlsh> update using ttl 10 googleplus.user_followee_recent_relation set counter_value = counter_value + 1 where followee_id = 2 and profile_id = 2;

SyntaxException: <ErrorMessage code=2000 [Syntax error in CQL query] message="line 1:25 no viable alternative at input '.' (update using ttl 10 googleplus[.]...)">




스펙에 따르면, TIMESTAMP와 TTL을 카운터 필드가 포함된 테이블에서는 쓸 수 없도록 명시되어 있다.


https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html


To load data into a counter column, or to increase or decrease the value of the counter, use the UPDATE command. Cassandra rejects USING TIMESTAMP or USING TTL in the command to update a counter column.


 


count 테이블과 TTL을 동시에 사용할 수 없는 속성이다. 따라서 다른 구조체를 고민해봐야 한다.

만약 count가 아주 작은 값으로만 쓴다면, 필드를 여러 개를 만드는 것도 괜찮을 것 같다. 


(key1, key2, counter1, counter2, counter3) 



'cassandra' 카테고리의 다른 글

[cassadra] compaction 전략  (0) 2016.12.09
[cassandra] select count(*) 구하기  (0) 2016.12.07
[cassandra] read repair  (0) 2016.11.23
[cassandra] cqlsh 팁  (0) 2016.11.21
[cassandra] insert는 update와 같다.  (0) 2016.11.16
Posted by '김용환'
,

cassandra는 insert는 update이다.



예시는 다음과 같다. 


cqlsh> insert into google.relation(id, profile_id) values (1, 1) 


 followee_id | profile_id

-------------+------------

           1 |          1


cqlsh> insert into google.relation(id, profile_id) values (1, 1) using ttl 20;






20초 뒤에는..확인하면, 다음과 같다.


cqlsh> select * from story.user_followee_recent_relation;


 followee_id | profile_id

-------------+------------









'cassandra' 카테고리의 다른 글

[cassadra] compaction 전략  (0) 2016.12.09
[cassandra] select count(*) 구하기  (0) 2016.12.07
[cassandra] read repair  (0) 2016.11.23
[cassandra] cqlsh 팁  (0) 2016.11.21
[cassandra] counter 테이블 예시 및 유의 사항  (0) 2016.11.17
Posted by '김용환'
,

[mysql] alter table after 필드

DB 2016. 11. 16. 10:48

테이블에서 새로운 필드를 추가하려면 아래와 비슷한 타입으로 쓴다. 


alter table tableName add column columnName not null


이 때, 맨 마지막 컬럼에 위치하게 되므로 원하는 모델이 되지 않을 수 있다.


순서를 특정 컬럼 다음에 위치하려면, after를 사용한다. 



alter table tableName add column columnName not null after preColumnName


뿐만 아니라 first도 사용할 수 있다.





자세한 내용은 http://dev.mysql.com/doc/refman/5.7/en/alter-table.html를 참고한다.


To add a column at a specific position within a table row, use FIRST or AFTER col_name. The default is to add the column last. You can also use FIRST and AFTER in CHANGE or MODIFY operations to reorder columns within a table.


'DB' 카테고리의 다른 글

[derby] validation query  (0) 2017.04.10
[mysql] auto increment 이슈  (0) 2016.12.19
[mysql] SELECT .. INTO OUTFILE  (0) 2016.04.16
[mysql] INSERT INTO .. VALUES ON DUPLICATE KEY UPDATE.. 응답 값  (0) 2016.03.31
[MySQL] GROUP_CONCAT  (0) 2016.02.15
Posted by '김용환'
,



sparkContext에 mapValues와 reduceByKey 예시를 설명한다.



코드로 간단히 설명하면 다음과 같다. 


val inputrdd = sc.parallelize(Seq(("arth",10), ("arth", 20), ("samuel", 60), ("jack", 65)))


val mapped = inputrdd.mapValues(x => 1);

mapped.collect.foreach(println)


val reduced = mapped.reduceByKey(_ + _)

reduced.collect.foreach(println)


mapValues는 map의 값을 1로 변경한다.

reduceByKey는 key의 값으로 키의 값이 동일한 개수를 얻는다 .


<결과>


inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[275] at parallelize at <console>:56
mapped: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[276] at mapValues at <console>:58
(arth,1)
(arth,1)
(samuel,1)
(jack,1)
reduced: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[277] at reduceByKey at <console>:60
(arth,2)
(samuel,1)
(jack,1)



아래 코드에 대한 설명한다 .

mapValues를 활용해 튜플을 추가하면, map의 값에 tuple을 추가한다.

reduceKey를 활용해서 키의 값을 근거로 값과 개수를 튜플로 추가할 수 있다.

reduceKey를 활용해서 얻은 결과값을 map을 이용해 평균 값을 구한다.

val inputrdd = sc.parallelize(Seq(("arth",10), ("arth", 20), ("samuel", 60), ("jack", 65))) val mapped = inputrdd.mapValues(x => (x, 1)); mapped.collect.foreach(println)
val reduced = mapped.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) reduced.collect.foreach(println) val average = reduced.map { x => val temp = x._2 val total = temp._1 val count = temp._2 (x._1, total / count) } average.collect.foreach(println)


<결과>
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[281] at parallelize at <console>:56
mapped: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[282] at mapValues at <console>:58
(arth,(10,1))
(arth,(20,1))
(samuel,(60,1))
(jack,(65,1))
reduced: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[283] at reduceByKey at <console>:60
(arth,(30,2))
(samuel,(60,1))
(jack,(65,1))
average: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[284] at map at <console>:62
(arth,15)
(samuel,60)
(jack,65)



Posted by '김용환'
,