다음과 같은 에러가 발생한다는 것은.. 실행시 SparkContext가 두 개 이상의 인스턴스가 있다는 의미이다. 



Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:





spark-shell을 실행할 때, 이미 SparkContext가 이미 생성되어 있다. 


scala> sc

res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@26a004ed


이 때 새로운 SparkContext를 생성할 때 해당 예외가 발생한다. 



scala> val conf = new SparkConf().setAppName("aaa")

conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@64d42d3d


scala> val ssc = new StreamingContext(conf, Seconds(1))

org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:

org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)



StreamingContext를 기존의 sc로 바인딩하면 에러가 발생하지 않는다.


scala> val ssc = new StreamingContext(sc, Seconds(1))

ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@55ff64fd




만약 두 개의 SparkContext를 유지하려면, SparkConf에 다음 설정을 추가한다.


conf.set("spark.driver.allowMultipleContexts","true");



Posted by '김용환'
,


spark streaming submit하다가 다음과 같은 에러가 발생할 때가 있었다. 


The main method in the given main class must be static



https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L723



  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)

   if (!Modifier.isStatic(mainMethod.getModifiers)) {

      throw new IllegalStateException("The main method in the given main class must be static")

   }



분명 spark은 scala여서 static이 없는데, 왜 이게 발생한 걸까?


내가 만든 코드에는 문제가 없어 보였다. 


http://docs.scala-lang.org/ko/tutorials/scala-for-java-programmers.html



  1. object HelloWorld {
  2. def main(args: Array[String]) {
  3. println("Hello, world!")
  4. }
  5. }

똑똑한 독자들은 이미 눈치챘겠지만 위의 예제에서 main 함수는 static이 아니다. Scala에는 정적 멤버(함수든 필드든)라는 개념이 아얘 존재하지 않는다. 클래스의 일부로 정적 멤버를 정의하는 대신에 Scala 프로그래머들은 정적이기 원하는 멤버들을 싱글턴 객체안에 선언한다.




아....


결국은 class로 정의한 클래스를 spark submit하다가 발생한 문제였다. 스칼라에서는 static 이라는 reserved word가 없지만, object는 내부적으로 생성한다.그래서 scala 코드에서 확인한 것이었다. 



class 를 object로 변경하니 제대로 동작한다. 


Posted by '김용환'
,


padTo 예제이다. 컬렉션의 길이만큼 없는 데이터는 디폴트 값으로 채운다. 


List("a", "b", "c").padTo(5, "-")

res0: List[String] = List(a, b, c, -, -)




리스트의 특정 데이터의 개수를 꼭 채워야 하는 경우가 있다. 로그 저장시 데이터가 없다고 해서 그냥 두기 보다 디폴트 값같은 개념을 둔다고 생각한다.


padTo를 적용한 Log case 클래스 예제이다. 




scala> case class Log(tokens: List[String]) {

     | val z1 = tokens(0)

     | val z2 = tokens(1)

     | val z3 = tokens(2)

     | val z4 = tokens(3)

     | val z5 = tokens(4)

     | }

defined class Log



scala> val l = new Log(List("a", "b", "c").padTo(5, "-"))

l: Log = Log(List(a, b, c, -, -))


scala> l.z1

res4: String = a


scala> l.z2

res5: String = b


scala> l.z3

res6: String = c


scala> l.z4

res7: String = -


scala> l.z5

res8: String = -



Posted by '김용환'
,

[scala] mkString 예제

scala 2017. 3. 13. 19:34


필드 출력을 위해  간단히 \t 단위로 하고 싶을 때 문자열에 \t를 줄 수 있다. 


scala> s"aa\tbb\tcc"

res34: String = aa bb cc



\t를 너무 많이 입력하는 것이 귀찮을 때 mkString을 사용할 수 있다. 



scala> List("aa", "bb", "cc").mkString("\t")

res32: String = aa bb cc



Posted by '김용환'
,


scala/특히 spark에서 작업하다가 혼동되는 게 flatmap이다.


먼저 map을 살펴보면 컬렉션이 Opion 타입이라면 List(Option) 작업을 강제로 진행해야 한다. 


scala> List(Some(1),Some(2),None).map{case Some(x) => x*2  case None => 0 }

res21: List[Int] = List(2, 4, 0)




컬렉션 flatMap을 사용하기만 하면 Option을 모두 정리한다.



scala> List(Some(1),Some(2),None).flatMap(x => x)

res27: List[Int] = List(1, 2)




Posted by '김용환'
,



forall은 술어함수 p에 대해서 전체가 참이면 true를 리턴한다. 아니면 false를 리턴한다.


scala> List(1,2,3,4,5).forall(x => x>2)

res11: Boolean = false



비슷함 함수로는 exists로서 술어라면 하나라도 참이면 true를 리턴하고 아니면 false를 리턴한다.



scala> List(1,2,3,4,5).exists(x => x>2)

res12: Boolean = true




Posted by '김용환'
,



Akka에서 ActorSystem을 정의할 때, 영어 대소문자, 숫자와 -,_만 된다.(-와 _는 앞에 있으면 안된다)



Exception 나는 경우

val system = ActorSystem("MonitoringTest ")
val system = ActorSystem("-MonitoringTest-")


Exception 내용이다. 


java.lang.IllegalArgumentException: invalid ActorSystem name [MonitoringTest ], must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-' or '_')




문제가 발생하지 않는 경우의 예

val system = ActorSystem("MonitoringTest")


val system = ActorSystem("MonitoringTest1-")



Posted by '김용환'
,


Play2(scala) + Kafka producer/consumer 예제이다. 


kafka는 이제 분산 파일 큐이고 여러 consumer가 동시에 동일한 데이터를 받을 수 있다. 


관련 예제를 소개한다. 





1) Producer


topic은 "aa"로 결정한다.

package worker

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.{Date, Properties}

import scala.util.Random

class KafkaProducerWorker(brokerList: String, topic: String) {
private val rnd = new Random()
private val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[Integer, String](props)

def send(events: Int) {
val runtime = new Date().getTime()
val key = rnd.nextInt(100)
val msg = runtime.toString
val data = new ProducerRecord[Integer, String](topic, key, msg)
producer.send(data)
}
}




2) consumer

package worker

import java.util
import java.util.Properties
import java.util.concurrent.{ExecutorService, Executors}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

class KafkaConsumerWorker (val brokers: String,
val groupId: String,
val topic: String) {

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
var executor: ExecutorService = null
consumer.subscribe(util.Collections.singleton(this.topic))

def run(): Unit= {
Executors.newSingleThreadExecutor.execute(new Runnable {
override def run(): Unit = {
while(true) {
val consumerRecords = consumer.poll(Integer.MAX_VALUE)

println("size : " + consumerRecords.count())
while(consumerRecords.iterator.hasNext) {
val record = consumerRecords.iterator.next() println(s"topic = ${record.topic()}, partition = ${record.partition()}, " + s"offset = ${record.offset()}, key = ${record.key()}, value = ${record.value()}\n")

}
}
}
})
}

def shutdown() = {
if (consumer != null) consumer.close();
if (executor != null) executor.shutdown();
}
}




3) conumer 추가 - ApplicationTimer

@Singleton
class ApplicationTimer @Inject() (clock: Clock, appLifecycle: ApplicationLifecycle) {

private val start: Instant = clock.instant
Logger.info(s"ApplicationTimer demo: Starting application at $start.")

Logger.info(s"Starting Kafka Consume Worker")
private val kafkaConsumerWorker = new KafkaConsumerWorker("localhost:9092", "1", "aa")
kafkaConsumerWorker.run()

appLifecycle.addStopHook { () =>
kafkaConsumerWorker.shutdown()
Logger.info(s"Stopping kafkaConsumerWorker.")

val stop: Instant = clock.instant
val runningTime: Long = stop.getEpochSecond - start.getEpochSecond
Logger.info(s"Stopping application at ${clock.instant} after ${runningTime}s.")
Future.successful(())
}
}



4) producer web call

@Singleton
class AsyncController @Inject()(counter: Counter, actorSystem: ActorSystem) (implicit exec: ExecutionContext) extends Controller {

def index = Action.async {
getFutureMessage(1.second).map { msg => Ok(counter.nextCount().toString + " " + msg) }
}

val kafkaProducerWorker = new KafkaProducerWorker("localhost:9092", "aa")

private def getFutureMessage(delayTime: FiniteDuration): Future[String] = {
val promise: Promise[String] = Promise[String]()
actorSystem.scheduler.scheduleOnce(delayTime) {
kafkaProducerWorker.send(1)
promise.success("Hi!")
}
promise.future
}
}



결과를 보려면 $ curl http://localhost:9000으로 테스트한다.



size : 1

topic = aa, partition = 0, offset = 1680, key = 3, value = 1489127790823


size : 1

topic = aa, partition = 0, offset = 1681, key = 71, value = 1489127833413


size : 1

topic = aa, partition = 0, offset = 1682, key = 17, value = 1489127836895

Posted by '김용환'
,


// 맨 처음(머리) 엘리먼트

scala> val h = myArray.head 

h: Int = -5 

 

// 맨 처음(머리) 엘리먼트를 Option으로 생성

scala> val ho = myArray.headOption 

ho: Option[Int] = Some(-5) 

 

// 마지막 엘리먼트를 제외한 모든 엘리먼트를 포함

scala> val h = myArray.init 

h: Array[Int] = Array(-5, -4, -3, -2, -1, 0, 1, 2, 3, 4) 

 

// 마지막 엘리먼트

scala> val la = myArray.last 

la: Int = 5 

 

// 마지막 엘리먼트를 옵션으로 생성

scala> val la = myArray.lastOption 

la: Option[Int] = Some(5) 

 

// 맨 처음 엘리먼트를 제외한 모든 엘리먼트를 포함

scala> val t = myArray.tail 

t: Array[Int] = Array(-4, -3, -2, -1, 0, 1, 2, 3, 4, 5) 

Posted by '김용환'
,



// -5부터 5까지 정수 타입의 범위를 선언한다

scala> val myArray = (-5 to 5).toArray 

myArray: Array[Int] = Array(-5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5) 

 


// N개의 엘리먼트를 삭제한다

scala> val d = myArray.drop(3) 

d: Array[Int] = Array(-2, -1, 0, 1, 2, 3, 4, 5) 


 

// 술어 함수가 true인 모두 엘리먼트를 삭제한다

scala> val dw = myArray.dropWhile(_<2) 

dw: Array[Int] = Array(2, 3, 4, 5) 

 

// 마지막 N개의 엘리먼트를 삭제한다

scala> val dr = myArray.dropRight(4) 

dr: Array[Int] = Array(-5, -4, -3, -2, -1, 0, 1) 

 

// 처음 N개의 엘리먼트를 얻는다

scala> val t = myArray.take(3) 

t: Array[Int] = Array(-5, -4, -3) 

 

// 술어 함수가 true인 모든 엘리먼트를 얻는다.

scala> val tw = myArray.takeWhile(_<2) 

tw: Array[Int] = Array(-5, -4, -3, -2, -1, 0, 1) 

 

// 마지막 N개의 엘리먼트만 얻는다

scala> val tr = myArray.takeRight(3) 

tr: Array[Int] = Array(3, 4, 5) 

 

 

// A번째 인덱스부터 B번째 인덱스까지의 하위 시퀀스

scala> val sl = myArray.slice(1,3) 

sl: Array[Int] = Array(-4, -3) 

Posted by '김용환'
,