flask에서는 json encoder를 사용해서 json 응답을 보내줘야 한다.


@app.route("/getEmployeeList") def getEmployeeList(): try: # Initialize a employee list employeeList = [] # create a instances for filling up employee list for i in range(0,2): empDict = { 'firstName': 'Roy', 'lastName': 'Augustine'} employeeList.append(empDict) # convert to json data jsonStr = json.dumps(employeeList) except Exception ,e: print str(e) return jsonify(Employees=jsonStr)


https://codehandbook.org/create-json-using-python-flask/





그러나 flask에 flask-restful을 추가해 설치한후,, 아래와 같이 설정한다면..


from flask import Flask
from flask_restful import Resource, Api

app = Flask(__name__)
api = Api(app)

class HelloWorld(Resource):
    def get(self):
        return {'hello': 'world'}

api.add_resource(HelloWorld, '/')

if __name__ == '__main__':
    app.run(debug=True)



그냥 기본 타입과 collection은 자동으로 json으로 변환한다. 그 이유가 멀까?


json.dump(aaa) 이런 코드가 필요없어서 참 좋았다.







https://github.com/flask-restful/flask-restful/blob/master/flask_restful/__init__.py#L474


make_response()에서 default decorator로 json을 출력한다.





아래 코드를 보면, indent 4칸에 newline으로 예쁘게 출력하는 코드가 있다. 


https://github.com/flask-restful/flask-restful/blob/master/flask_restful/representations/json.py


from __future__ import absolute_import
from flask import make_response, current_app
from flask_restful.utils import PY3
from json import dumps


def output_json(data, code, headers=None):
    """Makes a Flask response with a JSON encoded body"""

    settings = current_app.config.get('RESTFUL_JSON', {})

    # If we're in debug mode, and the indent is not set, we set it to a
    # reasonable value here.  Note that this won't override any existing value
    # that was set.  We also set the "sort_keys" value.
    if current_app.debug:
        settings.setdefault('indent', 4)
        settings.setdefault('sort_keys', not PY3)

    # always end the json dumps with a new line
    # see https://github.com/mitsuhiko/flask/pull/1262
    dumped = dumps(data, **settings) + "\n"

    resp = make_response(dumped, code)
    resp.headers.extend(headers or {})
    return resp






Posted by '김용환'
,




okhttp3와 moshi만 있으면 자바/스칼라 http 통신이 완전 편해진다.. 



okhttp3와 moshi는 json serialization/deserialization 개발 공부를 크게 낮춘다.





https://github.com/square/okhttp/wiki/Recipes


https://github.com/square/moshi


Posted by '김용환'
,




python으로 해결하는 JSONP 파싱 예시이다.



>>> import requests

>>> url = '...'

>>> jsonp = requests.get(url % 1000)

>>> jsonp.content

b'callback({"status":{

...

})' 

>>> import json

>>> pure_json = jsonp.text[jsonp.text.index('(') + 1 : jsonp.text.rindex(')')]

>>> dealers = json.loads(pure_json)

>>> dealers.keys()

dict_keys(['status'])

>>> dealers['count']

10 



Posted by '김용환'
,




Spark Structed Streaming 코드로 읽는 부분에 대한 예시




kafka에서 json만 value로 받는다. 


json도 필드로, json에 있는 json 값의 내용도 필드로 꺼집어 내서 DataSet으로 구성하는 예이다.






즉, 


json 필드는 log 컬럼로,


json 필드의 tag 값은 tag 컬럼으로,


json 필드의 @timestamp 값은 @timestamp 컬럼으로,


json 필드의 uuid는 uuid 컬럼으로 생성해, 총 4개의 컬럼으로 구성하는 예시이다. 




    val schema = StructType(

      List(

        StructField("tag", StringType, nullable = true),

        StructField("@timestamp", StringType, nullable = true),

        StructField("uuid", StringType, nullable = true)

      )

    )


    val ds = spark.readStream.format("kafka")

      .option("kafka.bootstrap.servers", config.getString(s"kafka.$phase.brokers"))

      .option("startingOffsets", "latest")

      .option("key.deserializer", "classOf[StringDeserializer]")

      .option("value.deserializer", "classOf[StringDeserializer]")

      .option("subscribe", config.getString(s"kafka.$phase.topic.name"))

      .load()

      .selectExpr("CAST(value AS STRING)")

      .select(from_json($"value", schema).as("data"), col("value").cast("string").as("log"))

      .select("data.*", "log")

      .withColumnRenamed("tag", "tag")

      .withColumnRenamed("@timestamp", "timestamp")

      .withColumnRenamed("uuid", "uuid")

      .as[(String, String, String, String)]









기초 지식 참고

https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/structured-streaming-kafka-integration.html


Posted by '김용환'
,


spark 2.x의 spark structured streaming 예시로 다음 코드를 참고하는 것이 좋다.


아래 코드는 카프카와 카산드라 연동 내용이다.


https://github.com/ansrivas/spark-structured-streaming/blob/master/src/main/scala/com/kafkaToSparkToCass/Main.scala





class SparkJob extends Serializable {

  @transient lazy val logger = Logger.getLogger(this.getClass)


  logger.setLevel(Level.INFO)

  val sparkSession =

    SparkSession.builder

      .master("local[*]")

      .appName("kafka2Spark2Cassandra")

      .config("spark.cassandra.connection.host", "localhost")

      .getOrCreate()


  val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)


  // Create keyspace and tables here, NOT in prod

  connector.withSessionDo { session =>

    Statements.createKeySpaceAndTable(session, true)

  }


  private def processRow(value: Commons.UserEvent) = {

    connector.withSessionDo { session =>

      session.execute(Statements.cql(value.user_id, value.time, value.event))

    }

  }


  def runJob() = {


    logger.info("Execution started with following configuration")

    val cols = List("user_id", "time", "event")


    import sparkSession.implicits._

    val lines = sparkSession.readStream

      .format("kafka")

      .option("subscribe", "test.1")

      .option("kafka.bootstrap.servers", "localhost:9092")

      .option("startingOffsets", "earliest")

      .load()

      .selectExpr("CAST(value AS STRING)",

                  "CAST(topic as STRING)",

                  "CAST(partition as INTEGER)")

      .as[(String, String, Integer)]


    val df =

      lines.map { line =>

        val columns = line._1.split(";") // value being sent out as a comma separated value "userid_1;2015-05-01T00:00:00;some_value"

        (columns(0), Commons.getTimeStamp(columns(1)), columns(2))

      }.toDF(cols: _*)


    df.printSchema()


    // Run your business logic here

    val ds = df.select($"user_id", $"time", $"event").as[Commons.UserEvent]


    // This Foreach sink writer writes the output to cassandra.

    import org.apache.spark.sql.ForeachWriter

    val writer = new ForeachWriter[Commons.UserEvent] {

      override def open(partitionId: Long, version: Long) = true

      override def process(value: Commons.UserEvent) = {

        processRow(value)

      }

      override def close(errorOrNull: Throwable) = {}

    }


    val query =

      ds.writeStream.queryName("kafka2Spark2Cassandra").foreach(writer).start


    query.awaitTermination()

    sparkSession.stop()

  }


Posted by '김용환'
,



Spark - Kafka 코드를 sbt로 빌드할 때 발생했다.


 object Subscribe in package kafka010 cannot be accessed in package org.apache.spark.streaming.kafka010

[error]       Subscribe[String, String](topics, kafkaParams, emptyMap))




IntelliJ IDEA 자동 컴파일할 때는 다음 에러가 발생한다.


symbol apply is  inaccessible from this place 



원인은 Subscribe 앞에 ConsumerStrategies 클래스를 두지 않아 저런 에러가 발생했다. 




ConsumerStragies.Subscribe[String, String](topics, kafkaParams, emptyMap)) 로 변경하면 더 이상 컴파일 에러가 발생하지 않는다. 


너무 클래스를 숨기는 것 보다 차라리 클래스를 드러내는 것도 좋다. 

Posted by '김용환'
,




Phoenix에서 create schema을 실행하려면



phoenix/bin/hdfs-site.xml과 hbase/conf/hdfs-site.xml에 다음 속성을 추가해야 한다.




<property>

<name>phoenix.schema.isNamespaceMappingEnabled</name> 

<value>true</value> 

</property> 




만약 설정되어 있지 않도록 sqline,py에서 다음 에러가 발생한다.


Cannot create schema because config phoenix.schema.isNamespaceMappingEnabled for enabling name space mapping isn't enabled \


Posted by '김용환'
,

[hbase-phoenix] jdbc driver

hadoop 2018. 11. 1. 19:55


Apache Phoenix driver를 사용할 때 유의해야 할 부분이 있다.



zookeeper를 이용한 분산 hbase 위에 phoenix를 사용할 때와


standalone의 hbase 위에 phoenix를 사용할 때의 jdbc url은 다르다.


(https://phoenix.apache.org/faq.html)



1. Thick driver

zookeeper를 이용한 분산 hbase 위에 phoenix를 사용할 때를 thick driver라 한다.



스키마는 다음과 같고.. lib는 phoenix-4.14.0-Hbase-1.4-client.jar를 사용한다.

jdbc:phoenix:[comma-separated ZooKeeper Quorum [:port [:hbase root znode [:kerberos_principal [:path to kerberos keytab] ] ] ]



예)


jdbc:phoenix:localhost



jdbc:phoenix:zookeeper1.domain,zookeeper2.domain,zookeeper3.domain:2181:/hbase-1:phoenix@EXAMPLE.COM:/etc/security/keytabs/phoenix.keytab






2. Thin driver



standalone의 hbase 위에 phoenix를 사용할 때의 jdbc url은 thin driver라 한다.


스키마는 다음과 같고.. lib는 phoenix-4.14.0-Hbase-1.4-thin-client.jar를 사용한다.


(라이브러리 파일이 다르고..특별히 Dirver Class는 org.apache.phoenix.querserver.client.Driver임을 잘 기억해야 한다.)



jdbc:phoenix:thin:[key=value[;key=value...]]



예)



jdbc:phoenix:thin:url=http://localhost:8765



jdbc:phoenix:thin:url=http://queryserver.domain:8765;serialization=PROTOBUF;authentication=SPENGO;principal=phoenix@EXAMPLE.COM;keytab=/etc/security/keytabs/phoenix.keytab





주의할 점은 기본 phoenix query server를 실행했다면  serialization=PROTOBUF을 넣어줘야 한다!!



json을 사용하고 싶다면. hbase-site.xml에 phoenix.queryserver.serialization=JSON을 설정해야 한다. 

Posted by '김용환'
,



Squirrel SQL 클라이트 툴에 

https://acadgild.com/blog/squirrel-gui-phoenix


MACOS에서 Squirrel SQL을 설치했지만 실행이 되지 않는다. 아마도 path 이슈일 것 같다.





간단히 해결한 방법은 다음과 같다.


먼저 설치 jar로 설치하고 디폴트 설치 위치로 /Applications/SQuirreLSQL.app/에 두게 한다.




alias squirrel='/Applications/SQuirreLSQL.app/Contents/MacOS/squirrel-sql.sh'


mkdir -p /Applications/SQuirreLSQL.app/Contents/MacOS/lib/


cp /Applications/SQuirreLSQL.app/Contents/Resources/Java/lib/* /Applications/SQuirreLSQL.app/Contents/MacOS/lib/


cp /Applications/SQuirreLSQL.app/Contents/Resources/Java/squirrel-sql.jar  /Applications/SQuirreLSQL.app/Contents/MacOS/




squirrel을 실행하면 클라이언트 툴이 실행된다.



Posted by '김용환'
,


spark readStream()으로 읽은 DataSet을 카산드라에 저장하는 예시 코드이다. 






import com.datastax.driver.core.Session

import com.datastax.spark.connector.cql.CassandraConnector

import org.apache.spark.sql.ForeachWriter




val spark = ...


val ds = spark.readStream()

...




val connector = CassandraConnector.apply(spark.sparkContext.getConf)

val session = connector.openSession


def processRow(value: (String, String, String, String)) = {

  connector.withSessionDo { session =>

    session.execute(s"insert into test.log(ktag, ts, uuid, log) values('  ${value._1}', '${value._2}', '${value._3}', '${value._4}'   )")

  }

}


    

val writer = new ForeachWriter[(String, String, String, String)] {

  override def open(partitionId: Long, version: Long) = true


  override def process(value: (String, String, String, String)) = {

    processRow(value)

  }

  override def close(errorOrNull: Throwable) = {

    println(errorOrNull)

  }


}


val query = ds.writeStream.queryName("test").foreach(writer).start


query.awaitTermination()





build.sbt에는 spark-cassandra-connector를 추가한다.

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.2"



Posted by '김용환'
,