Spark에서 원래 json 코드와 파싱된(분류된) 데이터를 한번에 보고 싶다면 아래와 같은 코드를 참조하길 바란다.



val schema = StructType(

List( StructField("year", StringType, nullable = true), StructField("month", StringType, nullable = true), StructField("day", StringType, nullable = true) ) ) val ds = spark.readStream.format("kafka") .option("kafka.bootstrap.servers", config.getString(s"kafka.$phase.brokers")) .option("startingOffsets", "latest") .option("key.deserializer", "classOf[StringDeserializer]") .option("value.deserializer", "classOf[StringDeserializer]") .option("subscribe", config.getString(s"kafka.$phase.topic.name")) .load() .selectExpr("CAST(value AS STRING)") .select(from_json($"value", schema).as("data"), col("value").cast("string")) .select("data.*", "value") .as[(String, String, String, String)]



Posted by '김용환'
,



spark streaming을 처리할 때 Encoder를 잘 이해하지 못하면,  아래 에러를 많이 만나게 된다.



Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.im plicits._ Support for serializing other types will be added in future releases.




단순히 Serializable 이슈라 하기에는 좀..

spark을 더 공부할 수 있는 꺼리가 할 수 있다.


DataFrame 및 DataSet에 대한 이해도를 높일 수 있다.



https://stackoverflow.com/questions/39433419/encoder-error-while-trying-to-map-dataframe-row-to-updated-row



https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Encoder.html




https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html



Posted by '김용환'
,




pip로 어떤 패키지를 설치했는지 목록을 볼 수 있다. freeze 커맨드를 사용한다.


# pip freeze

celery==3.1.7

certifi==2018.8.24

...

selenium==3.14.1

six==1.11.0

urllib3==1.22

w3lib==1.19.0

websocket-client==0.51.0

Werkzeug==0.14.1

zope.interface==4.4.3






# pip freeze | grep sel


selenium==3.14.1


Posted by '김용환'
,




스파크 스트리밍 처리할 때 누산기(accumulator) 같이 처리해야 할 때가 있다. 


아래 예시는 처리해야 할 offset을 모두 더하는(누산기) 기능이다. 잘 동작한다.



  var totalLag: Long = 0


  def printLag(rdd: RDD[ConsumerRecord[String, String]]): Unit = {

    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    rdd.foreachPartition { iter =>

      val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)

      totalLag += o.count()

    }

    println(s"******************total lag : $totalLag")

    totalLag = 0

  }



Posted by '김용환'
,




MacOS의 최신 파이어폭스(firefox)에 인증 정보/쿠키 정보를 lz4로 암호화되어 있다. 그러나 표준이 아니라서 파이썬으로 확인해볼 수 없으나, 툴로는 확인할 수 있다.





git clone https://github.com/andikleen/lz4json.git

cd lz4json

make

cp ~/Library/Application Support/Firefox/Profiles/*.default/sessionstore.jsonlz4 .

./lz4jsoncat sessionstore.jsonlz4



'Web service' 카테고리의 다른 글

공용 IP 얻기  (0) 2019.09.14
[sentry] nginx, PG 매개 변수 튜닝  (0) 2019.03.21
크롬 브라우저의 쿠기 확인하기 - sqlite  (0) 2018.10.20
[jquery] file upload 예제  (0) 2017.05.30
구글 place api : request_denied  (0) 2016.06.28
Posted by '김용환'
,





카프카(Kafka) 컨슈머는 토픽(topic)에서 메시지를 읽는다. 갑작스럽게 종료되면 종료되기 전에 어딘가까지 읽었다는 위치(오프셋(offset))을 저장한다. 오프셋(offset)은 파티션에서 수신되는 각 메시지에 대해 계속 증가하는 정수 값인 메타 데이터 조각(piece)입니다. 각 메시지는 파티션에 고유한 오프셋 값을 갖는다.


카프카의 각 메시지는 고유한 오프셋을 갖고 오프셋은 특정 파티션에서 해당 메시지의 위치를 나타낸다.


컨슈머가 파티션에서 메시지를 읽으면 카프카는 마지막으로 사용한 메시지의 오프셋을 알 수 있다. 카프카 오프셋은 _consumer_offsets라는 토픽에 저장되며 컨슈머는 컨슘 메시지를 잊지 않고 중지한 부분부터 재시작할 수 있다.


어떻게 디폴트로 저장되는지 보려면 다음 값을 확인할 수 있다. 


enable.auto.commit  (기본값은 true)

auto.commit.interval.ms (기본값은 5000)

 

즉 컨슈머는 기본적으로 매 5초마다 카프카(Kafka)에 오프셋을 자동 커밋(commit)하거나 지정 토픽에서 데이터를 가져올 때마다 최신 오프셋을 커밋한다



만약 중복 처리를 최대한 하고 싶지 않다면 메시지의 오프셋을 수동으로 커밋(commit)한다. 


그리고 enable.auto.commit 속성의 값을 false로 변경해야 한다.


(자연스럽게 auto.commit.interval.ms 값은 무시된다.)


 


Posted by '김용환'
,



구글 드라이브 용량이 부족하거든.. (용량 부족하면 이메일도 가지 않는다)


필요없는 첨부파일 메일, 


용량 큰 중복 파일이 있는지 확인하고 지우고..


아래  URL에 접속해서 애매하게 남아있는 데이터를 지워야 한다.


https://drive.google.com/drive/search?q=is:unorganized%20owner:me



Posted by '김용환'
,


pull requst할 때 자주 발생할 수 있는 것으로


원격 리모트의 새로운 브랜치가 추가되었고 이를 기반으로 pull request를 하려고 그냥 브랜치를 받고 pr하면 history가 꼬인다.


따라서 pr할 원격 리모트와 연결한 후 git checkout 받고 자신의 저장소에 push한다.


$ git checkout -b dev real/dev

$ git push origin dev





상황에 따라 upstream을 수동으로 연동해야 할 수도 있다.


$ git branch --set-upstream-to origin/dev

Posted by '김용환'
,


맥 High Sierra OS 버전의 크롬 브라우저의 쿠키를 확인하려면 sqlite를 확인하면 된다.


table을 확인할 수 있고


schema를 사용하면 스키마 정보를 볼 수 있다.


$ cd /Users/sameul/Library/Application Support/Google/Chrome/Default

$ sqlite3 Cookies

SQLite version 3.19.3 2017-06-27 16:48:08

Enter ".help" for usage hints.


sqlite> .tables

cookies  meta


sqlite> .schema

CREATE TABLE meta(key LONGVARCHAR NOT NULL UNIQUE PRIMARY KEY, value LONGVARCHAR);

CREATE TABLE cookies (creation_utc INTEGER NOT NULL,host_key TEXT NOT NULL,name TEXT NOT NULL,value TEXT NOT NULL,path TEXT NOT NULL,expires_utc INTEGER NOT NULL,is_secure INTEGER NOT NULL,is_httponly INTEGER NOT NULL,last_access_utc INTEGER NOT NULL, has_expires INTEGER NOT NULL DEFAULT 1, is_persistent INTEGER NOT NULL DEFAULT 1,priority INTEGER NOT NULL DEFAULT 1,encrypted_value BLOB DEFAULT '',firstpartyonly INTEGER NOT NULL DEFAULT 0,UNIQUE (host_key, name, path));




sqlite3에 읽을 데이터 파일 이름을 주지 않아도 .open 커맨드를 사용해 파일을 읽을 수 있다.


$ sqlite3

SQLite version 3.19.3 2017-06-27 16:48:08

Enter ".help" for usage hints.

Connected to a transient in-memory database.

Use ".open FILENAME" to reopen on a persistent database.

sqlite> .open Cookies



cookies 테이블 정보를 인덴트를 주어 보고 싶다면 다음 커맨드를 사용한다.


sqlite> .schema --indent cookies

CREATE TABLE cookies(

  creation_utc INTEGER NOT NULL,

  host_key TEXT NOT NULL,

  name TEXT NOT NULL,

  value TEXT NOT NULL,

  path TEXT NOT NULL,

  expires_utc INTEGER NOT NULL,

  is_secure INTEGER NOT NULL,

  is_httponly INTEGER NOT NULL,

  last_access_utc INTEGER NOT NULL,

  has_expires INTEGER NOT NULL DEFAULT 1,

  is_persistent INTEGER NOT NULL DEFAULT 1,

  priority INTEGER NOT NULL DEFAULT 1,

  encrypted_value BLOB DEFAULT '',

  firstpartyonly INTEGER NOT NULL DEFAULT 0,

  UNIQUE(host_key, name, path)

);



encrypted_value를 보려면 파이썬을 활용할 수 있다.

Posted by '김용환'
,




파이썬에서 모듈 프로그래밍(디렉토리 , ___init__.py)를 진행할 때,


ModuleNotFoundError를 부딪힐 일이 있다.



$ python util/scrapers.py

Traceback (most recent call last):

  File "util/scrapers.py", line 3, in <module>

    from util.all_scrapers import re_scraper, bs_scraper, \

ModuleNotFoundError: No module named 'util'



이 이유는 파이썬 패스를 못찾다 보니 모듈을 찾지 못한 것이다. 

PYTHONPATH를 bash 설정 파일(예, bash_profile)에 지정하면 된다.



$ cat ~/bash_profile

PYTHONPATH=$PYTHONPATH:/~/dev/python/scraping/code

export PYTHONPATH


Posted by '김용환'
,