Squirrel SQL 클라이트 툴에 

https://acadgild.com/blog/squirrel-gui-phoenix


MACOS에서 Squirrel SQL을 설치했지만 실행이 되지 않는다. 아마도 path 이슈일 것 같다.





간단히 해결한 방법은 다음과 같다.


먼저 설치 jar로 설치하고 디폴트 설치 위치로 /Applications/SQuirreLSQL.app/에 두게 한다.




alias squirrel='/Applications/SQuirreLSQL.app/Contents/MacOS/squirrel-sql.sh'


mkdir -p /Applications/SQuirreLSQL.app/Contents/MacOS/lib/


cp /Applications/SQuirreLSQL.app/Contents/Resources/Java/lib/* /Applications/SQuirreLSQL.app/Contents/MacOS/lib/


cp /Applications/SQuirreLSQL.app/Contents/Resources/Java/squirrel-sql.jar  /Applications/SQuirreLSQL.app/Contents/MacOS/




squirrel을 실행하면 클라이언트 툴이 실행된다.



Posted by 김용환 '김용환'


spark readStream()으로 읽은 DataSet을 카산드라에 저장하는 예시 코드이다. 






import com.datastax.driver.core.Session

import com.datastax.spark.connector.cql.CassandraConnector

import org.apache.spark.sql.ForeachWriter




val spark = ...


val ds = spark.readStream()

...




val connector = CassandraConnector.apply(spark.sparkContext.getConf)

val session = connector.openSession


def processRow(value: (String, String, String, String)) = {

  connector.withSessionDo { session =>

    session.execute(s"insert into test.log(ktag, ts, uuid, log) values('  ${value._1}', '${value._2}', '${value._3}', '${value._4}'   )")

  }

}


    

val writer = new ForeachWriter[(String, String, String, String)] {

  override def open(partitionId: Long, version: Long) = true


  override def process(value: (String, String, String, String)) = {

    processRow(value)

  }

  override def close(errorOrNull: Throwable) = {

    println(errorOrNull)

  }


}


val query = ds.writeStream.queryName("test").foreach(writer).start


query.awaitTermination()





build.sbt에는 spark-cassandra-connector를 추가한다.

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.2"



Posted by 김용환 '김용환'





Spark에서 원래 json 코드와 파싱된(분류된) 데이터를 한번에 보고 싶다면 아래와 같은 코드를 참조하길 바란다.



val schema = StructType(

List( StructField("year", StringType, nullable = true), StructField("month", StringType, nullable = true), StructField("day", StringType, nullable = true) ) ) val ds = spark.readStream.format("kafka") .option("kafka.bootstrap.servers", config.getString(s"kafka.$phase.brokers")) .option("startingOffsets", "latest") .option("key.deserializer", "classOf[StringDeserializer]") .option("value.deserializer", "classOf[StringDeserializer]") .option("subscribe", config.getString(s"kafka.$phase.topic.name")) .load() .selectExpr("CAST(value AS STRING)") .select(from_json($"value", schema).as("data"), col("value").cast("string")) .select("data.*", "value") .as[(String, String, String, String)]



Posted by 김용환 '김용환'



spark streaming을 처리할 때 Encoder를 잘 이해하지 못하면,  아래 에러를 많이 만나게 된다.



Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.im plicits._ Support for serializing other types will be added in future releases.




단순히 Serializable 이슈라 하기에는 좀..

spark을 더 공부할 수 있는 꺼리가 할 수 있다.


DataFrame 및 DataSet에 대한 이해도를 높일 수 있다.



https://stackoverflow.com/questions/39433419/encoder-error-while-trying-to-map-dataframe-row-to-updated-row



https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Encoder.html




https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html



Posted by 김용환 '김용환'




pip로 어떤 패키지를 설치했는지 목록을 볼 수 있다. freeze 커맨드를 사용한다.


# pip freeze

celery==3.1.7

certifi==2018.8.24

...

selenium==3.14.1

six==1.11.0

urllib3==1.22

w3lib==1.19.0

websocket-client==0.51.0

Werkzeug==0.14.1

zope.interface==4.4.3






# pip freeze | grep sel


selenium==3.14.1


Posted by 김용환 '김용환'




스파크 스트리밍 처리할 때 누산기(accumulator) 같이 처리해야 할 때가 있다. 


아래 예시는 처리해야 할 offset을 모두 더하는(누산기) 기능이다. 잘 동작한다.



  var totalLag: Long = 0


  def printLag(rdd: RDD[ConsumerRecord[String, String]]): Unit = {

    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    rdd.foreachPartition { iter =>

      val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)

      totalLag += o.count()

    }

    println(s"******************total lag : $totalLag")

    totalLag = 0

  }



Posted by 김용환 '김용환'




MacOS의 최신 파이어폭스(firefox)에 인증 정보/쿠키 정보를 lz4로 암호화되어 있다. 그러나 표준이 아니라서 파이썬으로 확인해볼 수 없으나, 툴로는 확인할 수 있다.





git clone https://github.com/andikleen/lz4json.git

cd lz4json

make

cp ~/Library/Application Support/Firefox/Profiles/*.default/sessionstore.jsonlz4 .

./lz4jsoncat sessionstore.jsonlz4



Posted by 김용환 '김용환'





카프카(Kafka) 컨슈머는 토픽(topic)에서 메시지를 읽는다. 갑작스럽게 종료되면 종료되기 전에 어딘가까지 읽었다는 위치(오프셋(offset))을 저장한다. 오프셋(offset)은 파티션에서 수신되는 각 메시지에 대해 계속 증가하는 정수 값인 메타 데이터 조각(piece)입니다. 각 메시지는 파티션에 고유한 오프셋 값을 갖는다.


카프카의 각 메시지는 고유한 오프셋을 갖고 오프셋은 특정 파티션에서 해당 메시지의 위치를 나타낸다.


컨슈머가 파티션에서 메시지를 읽으면 카프카는 마지막으로 사용한 메시지의 오프셋을 알 수 있다. 카프카 오프셋은 _consumer_offsets라는 토픽에 저장되며 컨슈머는 컨슘 메시지를 잊지 않고 중지한 부분부터 재시작할 수 있다.


어떻게 디폴트로 저장되는지 보려면 다음 값을 확인할 수 있다. 


enable.auto.commit  (기본값은 true)

auto.commit.interval.ms (기본값은 5000)

 

즉 컨슈머는 기본적으로 매 5초마다 카프카(Kafka)에 오프셋을 자동 커밋(commit)하거나 지정 토픽에서 데이터를 가져올 때마다 최신 오프셋을 커밋한다



만약 중복 처리를 최대한 하고 싶지 않다면 메시지의 오프셋을 수동으로 커밋(commit)한다. 


그리고 enable.auto.commit 속성의 값을 false로 변경해야 한다.


(자연스럽게 auto.commit.interval.ms 값은 무시된다.)


 


Posted by 김용환 '김용환'



구글 드라이브 용량이 부족하거든.. (용량 부족하면 이메일도 가지 않는다)


필요없는 첨부파일 메일, 


용량 큰 중복 파일이 있는지 확인하고 지우고..


아래  URL에 접속해서 애매하게 남아있는 데이터를 지워야 한다.


https://drive.google.com/drive/search?q=is:unorganized%20owner:me



Posted by 김용환 '김용환'


pull requst할 때 자주 발생할 수 있는 것으로


원격 리모트의 새로운 브랜치가 추가되었고 이를 기반으로 pull request를 하려고 그냥 브랜치를 받고 pr하면 history가 꼬인다.


따라서 pr할 원격 리모트와 연결한 후 git checkout 받고 자신의 저장소에 push한다.


$ git checkout -b dev real/dev

$ git push origin dev





상황에 따라 upstream을 수동으로 연동해야 할 수도 있다.


$ git branch --set-upstream-to origin/dev

Posted by 김용환 '김용환'