Spark Structed Streaming 코드로 읽는 부분에 대한 예시




kafka에서 json만 value로 받는다. 


json도 필드로, json에 있는 json 값의 내용도 필드로 꺼집어 내서 DataSet으로 구성하는 예이다.






즉, 


json 필드는 log 컬럼로,


json 필드의 tag 값은 tag 컬럼으로,


json 필드의 @timestamp 값은 @timestamp 컬럼으로,


json 필드의 uuid는 uuid 컬럼으로 생성해, 총 4개의 컬럼으로 구성하는 예시이다. 




    val schema = StructType(

      List(

        StructField("tag", StringType, nullable = true),

        StructField("@timestamp", StringType, nullable = true),

        StructField("uuid", StringType, nullable = true)

      )

    )


    val ds = spark.readStream.format("kafka")

      .option("kafka.bootstrap.servers", config.getString(s"kafka.$phase.brokers"))

      .option("startingOffsets", "latest")

      .option("key.deserializer", "classOf[StringDeserializer]")

      .option("value.deserializer", "classOf[StringDeserializer]")

      .option("subscribe", config.getString(s"kafka.$phase.topic.name"))

      .load()

      .selectExpr("CAST(value AS STRING)")

      .select(from_json($"value", schema).as("data"), col("value").cast("string").as("log"))

      .select("data.*", "log")

      .withColumnRenamed("tag", "tag")

      .withColumnRenamed("@timestamp", "timestamp")

      .withColumnRenamed("uuid", "uuid")

      .as[(String, String, String, String)]









기초 지식 참고

https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/structured-streaming-kafka-integration.html


Posted by '김용환'
,


spark 2.x의 spark structured streaming 예시로 다음 코드를 참고하는 것이 좋다.


아래 코드는 카프카와 카산드라 연동 내용이다.


https://github.com/ansrivas/spark-structured-streaming/blob/master/src/main/scala/com/kafkaToSparkToCass/Main.scala





class SparkJob extends Serializable {

  @transient lazy val logger = Logger.getLogger(this.getClass)


  logger.setLevel(Level.INFO)

  val sparkSession =

    SparkSession.builder

      .master("local[*]")

      .appName("kafka2Spark2Cassandra")

      .config("spark.cassandra.connection.host", "localhost")

      .getOrCreate()


  val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)


  // Create keyspace and tables here, NOT in prod

  connector.withSessionDo { session =>

    Statements.createKeySpaceAndTable(session, true)

  }


  private def processRow(value: Commons.UserEvent) = {

    connector.withSessionDo { session =>

      session.execute(Statements.cql(value.user_id, value.time, value.event))

    }

  }


  def runJob() = {


    logger.info("Execution started with following configuration")

    val cols = List("user_id", "time", "event")


    import sparkSession.implicits._

    val lines = sparkSession.readStream

      .format("kafka")

      .option("subscribe", "test.1")

      .option("kafka.bootstrap.servers", "localhost:9092")

      .option("startingOffsets", "earliest")

      .load()

      .selectExpr("CAST(value AS STRING)",

                  "CAST(topic as STRING)",

                  "CAST(partition as INTEGER)")

      .as[(String, String, Integer)]


    val df =

      lines.map { line =>

        val columns = line._1.split(";") // value being sent out as a comma separated value "userid_1;2015-05-01T00:00:00;some_value"

        (columns(0), Commons.getTimeStamp(columns(1)), columns(2))

      }.toDF(cols: _*)


    df.printSchema()


    // Run your business logic here

    val ds = df.select($"user_id", $"time", $"event").as[Commons.UserEvent]


    // This Foreach sink writer writes the output to cassandra.

    import org.apache.spark.sql.ForeachWriter

    val writer = new ForeachWriter[Commons.UserEvent] {

      override def open(partitionId: Long, version: Long) = true

      override def process(value: Commons.UserEvent) = {

        processRow(value)

      }

      override def close(errorOrNull: Throwable) = {}

    }


    val query =

      ds.writeStream.queryName("kafka2Spark2Cassandra").foreach(writer).start


    query.awaitTermination()

    sparkSession.stop()

  }


Posted by '김용환'
,



Spark - Kafka 코드를 sbt로 빌드할 때 발생했다.


 object Subscribe in package kafka010 cannot be accessed in package org.apache.spark.streaming.kafka010

[error]       Subscribe[String, String](topics, kafkaParams, emptyMap))




IntelliJ IDEA 자동 컴파일할 때는 다음 에러가 발생한다.


symbol apply is  inaccessible from this place 



원인은 Subscribe 앞에 ConsumerStrategies 클래스를 두지 않아 저런 에러가 발생했다. 




ConsumerStragies.Subscribe[String, String](topics, kafkaParams, emptyMap)) 로 변경하면 더 이상 컴파일 에러가 발생하지 않는다. 


너무 클래스를 숨기는 것 보다 차라리 클래스를 드러내는 것도 좋다. 

Posted by '김용환'
,




Phoenix에서 create schema을 실행하려면



phoenix/bin/hdfs-site.xml과 hbase/conf/hdfs-site.xml에 다음 속성을 추가해야 한다.




<property>

<name>phoenix.schema.isNamespaceMappingEnabled</name> 

<value>true</value> 

</property> 




만약 설정되어 있지 않도록 sqline,py에서 다음 에러가 발생한다.


Cannot create schema because config phoenix.schema.isNamespaceMappingEnabled for enabling name space mapping isn't enabled \


Posted by '김용환'
,

[hbase-phoenix] jdbc driver

hadoop 2018. 11. 1. 19:55


Apache Phoenix driver를 사용할 때 유의해야 할 부분이 있다.



zookeeper를 이용한 분산 hbase 위에 phoenix를 사용할 때와


standalone의 hbase 위에 phoenix를 사용할 때의 jdbc url은 다르다.


(https://phoenix.apache.org/faq.html)



1. Thick driver

zookeeper를 이용한 분산 hbase 위에 phoenix를 사용할 때를 thick driver라 한다.



스키마는 다음과 같고.. lib는 phoenix-4.14.0-Hbase-1.4-client.jar를 사용한다.

jdbc:phoenix:[comma-separated ZooKeeper Quorum [:port [:hbase root znode [:kerberos_principal [:path to kerberos keytab] ] ] ]



예)


jdbc:phoenix:localhost



jdbc:phoenix:zookeeper1.domain,zookeeper2.domain,zookeeper3.domain:2181:/hbase-1:phoenix@EXAMPLE.COM:/etc/security/keytabs/phoenix.keytab






2. Thin driver



standalone의 hbase 위에 phoenix를 사용할 때의 jdbc url은 thin driver라 한다.


스키마는 다음과 같고.. lib는 phoenix-4.14.0-Hbase-1.4-thin-client.jar를 사용한다.


(라이브러리 파일이 다르고..특별히 Dirver Class는 org.apache.phoenix.querserver.client.Driver임을 잘 기억해야 한다.)



jdbc:phoenix:thin:[key=value[;key=value...]]



예)



jdbc:phoenix:thin:url=http://localhost:8765



jdbc:phoenix:thin:url=http://queryserver.domain:8765;serialization=PROTOBUF;authentication=SPENGO;principal=phoenix@EXAMPLE.COM;keytab=/etc/security/keytabs/phoenix.keytab





주의할 점은 기본 phoenix query server를 실행했다면  serialization=PROTOBUF을 넣어줘야 한다!!



json을 사용하고 싶다면. hbase-site.xml에 phoenix.queryserver.serialization=JSON을 설정해야 한다. 

Posted by '김용환'
,