Database Class Loader started - derby.database.classpath=''

18/11/29 22:05:54 ERROR PoolWatchThread: Error in trying to obtain a connection. Retrying in 7000ms

java.sql.SQLException: A read-only user or a user in a read-only database is not permitted to disable read-only mode on a connection.

at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)

at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)

at org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown Source)

at org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown Source)

at org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown Source)

at org.apache.derby.impl.jdbc.EmbedConnection.setReadOnly(Unknown Source)

at com.jolbox.bonecp.ConnectionHandle.setReadOnly(ConnectionHandle.java:1324)

at com.jolbox.bonecp.ConnectionHandle.<init>(ConnectionHandle.java:262)

at com.jolbox.bonecp.PoolWatchThread.fillConnections(PoolWatchThread.java:115)

at com.jolbox.bonecp.PoolWatchThread.run(PoolWatchThread.java:82)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Caused by: ERROR 25505: A read-only user or a user in a read-only database is not permitted to disable read-only mode on a connection.

at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)

at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)

at org.apache.derby.impl.sql.conn.GenericAuthorizer.setReadOnlyConnection(Unknown Source)

at org.apache.derby.impl.sql.conn.GenericLanguageConnectionContext.setReadOnly(Unknown Source)




원인은 권한이다. spark-shell의 권한이 root 또는  metastore_db 디렉토리의 권한을 확인한다.

Posted by '김용환'
,




백프레셔(Backpressure) – 스트리밍 작업을 처리하다가 폭발적인 데이터(예, 사건/사고, 이벤트)가 발생하면 처리 시스템은 폭발적인 데이터를 우아하게 처리할 수 있어야 한다. 


처리 시간이 배치 간격보다 커지면 다음 배치 잡에서는 지연이 생기고 불안정해 진다. 따라서 불안정 상태가 지속되면 백프레셔에 의해 입력율(input rate)를 줄여 처리량과 처리 시간을 줄인다. 따라서 지연이 0이 될 것이다. 


폭발적인 데이터가 갑자기 카프카에 저장되어 스파크 스트리밍의 카프카 컨슈머에 리턴하는 배치 크기를 제한하고 싶을 수 있다. 이럴 때 스파크 스트리밍 백프레셔를 적용할 수 있다. (이는 대부분의 스트리밍 처리 플랫폼(예, storm, flink)에서 제공된다.)



spark.streaming.backpressure.enabled와 spark.streaming.backpressure.initialRate를 사용하면 된다. 

spark.streaming.backpressure.initialRate 기본값은 not set이고,  spark.streaming.backpressure.enabled 기본값은  disabled이다. 



https://spark.apache.org/docs/latest/configuration.html#spark-streaming 설정에 잘 설명되어 있다.


스파크 스트리밍은 spark.streaming.backpressure.enabled를 통해 현재 배치 스케줄링 지연과 처리 시간을 기준으로 수신 속도를 제어할 수 있기 때문에 시스템이 최대한 빠르게 처리할 수 있다. 내부적으로는 수신자의 최대 수신 속도가 동적으로 설정된다. 이 속도는 spark.streaming.receiver.maxRate와 spark.streaming.kafka.maxRatePerPartition 상한 값으로 설정된다.


첫 번째 배치를 제어하거나 좀 더 구체적으로 첫 번째 배치의 메시지 수를 설정하고 싶다면, spark.streaming.backpressure.initialRate을 사용할 수 있다. spark.streaming.backpressure.initialRate은 백프레셔 메커니즘이 활성화(spark.streaming.backpressure.enabled=true)되었을 때 각 리시버가 첫번째 배치에 대한 데이터를 수신하는 최대 수신 속도이다. 


spark.streaming.kafka.maxRatePerPartition의 기본값은 not set인고, 카프카 direct stream API를 사용할 때 각 카프카 파티션에서 데이터를 읽을 최대 속도(초당 레코드 수)으로 설정한다.



예)

spark.streaming.kafka.maxRatePerPartition = "100"

spark.streaming.backpressure.enabled = "true"






Posted by '김용환'
,




Spark Structed Streaming 코드로 읽는 부분에 대한 예시




kafka에서 json만 value로 받는다. 


json도 필드로, json에 있는 json 값의 내용도 필드로 꺼집어 내서 DataSet으로 구성하는 예이다.






즉, 


json 필드는 log 컬럼로,


json 필드의 tag 값은 tag 컬럼으로,


json 필드의 @timestamp 값은 @timestamp 컬럼으로,


json 필드의 uuid는 uuid 컬럼으로 생성해, 총 4개의 컬럼으로 구성하는 예시이다. 




    val schema = StructType(

      List(

        StructField("tag", StringType, nullable = true),

        StructField("@timestamp", StringType, nullable = true),

        StructField("uuid", StringType, nullable = true)

      )

    )


    val ds = spark.readStream.format("kafka")

      .option("kafka.bootstrap.servers", config.getString(s"kafka.$phase.brokers"))

      .option("startingOffsets", "latest")

      .option("key.deserializer", "classOf[StringDeserializer]")

      .option("value.deserializer", "classOf[StringDeserializer]")

      .option("subscribe", config.getString(s"kafka.$phase.topic.name"))

      .load()

      .selectExpr("CAST(value AS STRING)")

      .select(from_json($"value", schema).as("data"), col("value").cast("string").as("log"))

      .select("data.*", "log")

      .withColumnRenamed("tag", "tag")

      .withColumnRenamed("@timestamp", "timestamp")

      .withColumnRenamed("uuid", "uuid")

      .as[(String, String, String, String)]









기초 지식 참고

https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/structured-streaming-kafka-integration.html


Posted by '김용환'
,


spark 2.x의 spark structured streaming 예시로 다음 코드를 참고하는 것이 좋다.


아래 코드는 카프카와 카산드라 연동 내용이다.


https://github.com/ansrivas/spark-structured-streaming/blob/master/src/main/scala/com/kafkaToSparkToCass/Main.scala





class SparkJob extends Serializable {

  @transient lazy val logger = Logger.getLogger(this.getClass)


  logger.setLevel(Level.INFO)

  val sparkSession =

    SparkSession.builder

      .master("local[*]")

      .appName("kafka2Spark2Cassandra")

      .config("spark.cassandra.connection.host", "localhost")

      .getOrCreate()


  val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)


  // Create keyspace and tables here, NOT in prod

  connector.withSessionDo { session =>

    Statements.createKeySpaceAndTable(session, true)

  }


  private def processRow(value: Commons.UserEvent) = {

    connector.withSessionDo { session =>

      session.execute(Statements.cql(value.user_id, value.time, value.event))

    }

  }


  def runJob() = {


    logger.info("Execution started with following configuration")

    val cols = List("user_id", "time", "event")


    import sparkSession.implicits._

    val lines = sparkSession.readStream

      .format("kafka")

      .option("subscribe", "test.1")

      .option("kafka.bootstrap.servers", "localhost:9092")

      .option("startingOffsets", "earliest")

      .load()

      .selectExpr("CAST(value AS STRING)",

                  "CAST(topic as STRING)",

                  "CAST(partition as INTEGER)")

      .as[(String, String, Integer)]


    val df =

      lines.map { line =>

        val columns = line._1.split(";") // value being sent out as a comma separated value "userid_1;2015-05-01T00:00:00;some_value"

        (columns(0), Commons.getTimeStamp(columns(1)), columns(2))

      }.toDF(cols: _*)


    df.printSchema()


    // Run your business logic here

    val ds = df.select($"user_id", $"time", $"event").as[Commons.UserEvent]


    // This Foreach sink writer writes the output to cassandra.

    import org.apache.spark.sql.ForeachWriter

    val writer = new ForeachWriter[Commons.UserEvent] {

      override def open(partitionId: Long, version: Long) = true

      override def process(value: Commons.UserEvent) = {

        processRow(value)

      }

      override def close(errorOrNull: Throwable) = {}

    }


    val query =

      ds.writeStream.queryName("kafka2Spark2Cassandra").foreach(writer).start


    query.awaitTermination()

    sparkSession.stop()

  }


Posted by '김용환'
,



Spark - Kafka 코드를 sbt로 빌드할 때 발생했다.


 object Subscribe in package kafka010 cannot be accessed in package org.apache.spark.streaming.kafka010

[error]       Subscribe[String, String](topics, kafkaParams, emptyMap))




IntelliJ IDEA 자동 컴파일할 때는 다음 에러가 발생한다.


symbol apply is  inaccessible from this place 



원인은 Subscribe 앞에 ConsumerStrategies 클래스를 두지 않아 저런 에러가 발생했다. 




ConsumerStragies.Subscribe[String, String](topics, kafkaParams, emptyMap)) 로 변경하면 더 이상 컴파일 에러가 발생하지 않는다. 


너무 클래스를 숨기는 것 보다 차라리 클래스를 드러내는 것도 좋다. 

Posted by '김용환'
,


spark readStream()으로 읽은 DataSet을 카산드라에 저장하는 예시 코드이다. 






import com.datastax.driver.core.Session

import com.datastax.spark.connector.cql.CassandraConnector

import org.apache.spark.sql.ForeachWriter




val spark = ...


val ds = spark.readStream()

...




val connector = CassandraConnector.apply(spark.sparkContext.getConf)

val session = connector.openSession


def processRow(value: (String, String, String, String)) = {

  connector.withSessionDo { session =>

    session.execute(s"insert into test.log(ktag, ts, uuid, log) values('  ${value._1}', '${value._2}', '${value._3}', '${value._4}'   )")

  }

}


    

val writer = new ForeachWriter[(String, String, String, String)] {

  override def open(partitionId: Long, version: Long) = true


  override def process(value: (String, String, String, String)) = {

    processRow(value)

  }

  override def close(errorOrNull: Throwable) = {

    println(errorOrNull)

  }


}


val query = ds.writeStream.queryName("test").foreach(writer).start


query.awaitTermination()





build.sbt에는 spark-cassandra-connector를 추가한다.

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.2"



Posted by '김용환'
,





Spark에서 원래 json 코드와 파싱된(분류된) 데이터를 한번에 보고 싶다면 아래와 같은 코드를 참조하길 바란다.



val schema = StructType(

List( StructField("year", StringType, nullable = true), StructField("month", StringType, nullable = true), StructField("day", StringType, nullable = true) ) ) val ds = spark.readStream.format("kafka") .option("kafka.bootstrap.servers", config.getString(s"kafka.$phase.brokers")) .option("startingOffsets", "latest") .option("key.deserializer", "classOf[StringDeserializer]") .option("value.deserializer", "classOf[StringDeserializer]") .option("subscribe", config.getString(s"kafka.$phase.topic.name")) .load() .selectExpr("CAST(value AS STRING)") .select(from_json($"value", schema).as("data"), col("value").cast("string")) .select("data.*", "value") .as[(String, String, String, String)]



Posted by '김용환'
,



spark streaming을 처리할 때 Encoder를 잘 이해하지 못하면,  아래 에러를 많이 만나게 된다.



Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.im plicits._ Support for serializing other types will be added in future releases.




단순히 Serializable 이슈라 하기에는 좀..

spark을 더 공부할 수 있는 꺼리가 할 수 있다.


DataFrame 및 DataSet에 대한 이해도를 높일 수 있다.



https://stackoverflow.com/questions/39433419/encoder-error-while-trying-to-map-dataframe-row-to-updated-row



https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Encoder.html




https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html



Posted by '김용환'
,




스파크 스트리밍 처리할 때 누산기(accumulator) 같이 처리해야 할 때가 있다. 


아래 예시는 처리해야 할 offset을 모두 더하는(누산기) 기능이다. 잘 동작한다.



  var totalLag: Long = 0


  def printLag(rdd: RDD[ConsumerRecord[String, String]]): Unit = {

    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    rdd.foreachPartition { iter =>

      val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)

      totalLag += o.count()

    }

    println(s"******************total lag : $totalLag")

    totalLag = 0

  }



Posted by '김용환'
,


scala> val rdd_one = sc.parallelize(Seq(1,2,3))

rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24


scala> rdd_one.getNumPartitions

res0: Int = 12


scala>


scala> sc.defaultParallelism

res1: Int = 12






기본 파티션 개수는 cpu 개수를 기반으로 만들어지거나 (장비별로 다른 값이 나옴, defaultParallelism으로 확인할 수 있다.) 스파크 설정 매개 변수 spark.default.parallelism 또는 클러스터의 코어 개수 중 하나이다


그외에 큰 자원을 읽을 때 기본 파티션 개수가 변경될 수 있다.


 

파티션 개수는 RDD 트랜스포메이션을 실행할 태스크 수에 직접적인 영향을 주기 때문에 파티션 개수가 중요하다. 


파티션 개수가 너무 적으면 많은 데이터에서 아주 일부의 CPU/코어만 사용하기 때문에 성능이 저하되고 클러스터를 제대로 활용하지 못하게 된다. 


반면에 파티션 개수가 너무 많으면 실제로 필요한 것보다 많은 자원을 사용하기 때문에 멀티 테넌트 환경에서는 자원 부족 현상이 발생할 수 있다.

Posted by '김용환'
,