accrual 인식

나의 경제 2019. 3. 6. 15:01


매출채권의 개념에 대해서 잘 몰랐다.


회계 업무에 일부 참여하니. "accual 인식"이라는 용어를 사용하면서 매출 채권을 이해할 수 있었다..



참고  링크



https://coolestmoney.tistory.com/4


https://m.blog.naver.com/PostView.nhn?blogId=drakkon&logNo=140208805065&proxyReferer=https%3A%2F%2Fwww.google.com%2F

Posted by '김용환'
,



오랜만에 make를 쓰다보니 실수한 게 있다.


아래와 같은 에러를 만나면 탭(tab) 대신 스페이스(space)가 코드에 안에 있다고 알린다.


스페이스를 없애고 탭을 추가하면 더 이상 에러가 발생하지 않는다.

 


:makefile:4: *** missing separator.  Stop.




Posted by '김용환'
,



구글 논문 - 코드 저장소는 단일화(monolithic)가 좋더라는 내용이 담겨 있다.


https://people.engr.ncsu.edu/ermurph3/papers/seip18.pdf




Posted by '김용환'
,


여러 stream 데이터를 하나의 데이터로 join해 준다면 얼마나 좋을까?

이슈는 상태(state)를 관리해야 하기에 메모리 이슈가 있다.




yelp는 mjoin 알고리즘을 열심히 작업 중이다..

https://engineeringblog.yelp.com/2018/12/joinery-a-tale-of-unwindowed-joins.html





큐의 스트림처리 방식으로는 stream stream-join이라는 개념이 있다.



apache spark에서는 watermark를 활용한다.


https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html



https://dzone.com/articles/spark-stream-stream-join


https://blog.codecentric.de/en/2017/02/crossing-streams-joins-apache-kafka/






메모리 이슈가 있고 역시 타임아웃 이슈가 있어서 완벽히 진행하려면..

데이터를 스토리에 쌓고. 계속 데이터가 도착할 때마다 스토리지를 호출해 데이터가 다 들어올 때까지 쿼리를 날리는 수 밖에 없는 것 같다..



Posted by '김용환'
,



CDC는 변경 데이터 캡처(Change Data Capture)의 약자로서 다른 소프트웨어가 이러한 변경 사항에 응답 할 수 있도록 데이터의 변경 사항을 모니터링하고 캡처하는 시스템의 오래된 용어이다. 

데이터웨어 하우스에는 CDC 지원 기능이 내장되어 있다. 업스트림 OLTP 데이터베이스에서 데이터가 변경되면 데이터웨어 하우스를 최신으로 유지해야 한다.




대표적으로 Debezium(발음은 디비지움이라 함, https://debezium.io/docs/contribute/)이 요즘 뜨고 있는데.. 기본적으로 다양한 데이터베이스 시스템 모니터링을 지원 하는 현대적이고 분산 된 오픈 소스 변경 데이터 캡처 플랫폼이다.



(https://vladmihalcea.com/a-beginners-guide-to-cdc-change-data-capture/ 참고)




 


<참조 링크 모음>

https://en.wikipedia.org/wiki/Change_data_capture


https://vladmihalcea.com/a-beginners-guide-to-cdc-change-data-capture/



https://medium.com/blablacar-tech/streaming-data-out-of-the-monolith-building-a-highly-reliable-cdc-stack-d71599131acb


https://developers.redhat.com/videos/youtube/QYbXDp4Vu-8/


https://www.youtube.com/watch?v=IOZ2Um6e430&feature=youtu.be


https://techmagie.wordpress.com/2018/04/01/accelerating-data-loading-into-data-lake-using-cdc/



https://www.ridicorp.com/blog/2017/10/30/binlog-collector/



https://www.confluent.io/blog/no-more-silos-how-to-integrate-your-databases-with-apache-kafka-and-cdc


https://www.linkedin.com/pulse/change-data-capture-postgresql-via-debezium-part-1-paolo-scarpino/


https://www.slideshare.net/ceposta/the-hardest-part-of-microservices-your-data



https://wecode.wepay.com/posts/streaming-databases-in-realtime-with-mysql-debezium-kafka


https://debezium.io/blog/2018/12/05/automating-cache-invalidation-with-change-data-capture/


https://www.confluent.io/kafka-summit-sf18/change-data-streaming-patterns-for-microservices-with-debezium


https://engineering.linkedin.com/data-replication/open-sourcing-databus-linkedins-low-latency-change-data-capture-system




<고민꺼리>


CDC를 실제로 구현하기 위한 작업이 만만치 않은 것 같다.


GTID, bin 포맷, 운영이슈(MHA 등), 아키텍처





Posted by '김용환'
,


여러 컨슈머가 동일 토픽에서 메시지를 읽을 때 사용하는 주요 패턴은  다음 2가지이다.


<로드 밸런싱(load balancing)>


각 메시지는 여러 컨슈머 중 특정 컨슈머에 전달된다. 따라서 컨슈머들(컨슈머 그룹)은 해당 토픽의 메시지를 처리하는 작업을 공유한다. 브로커는 메시지를 전달할 컨슈머를 임의로 지정한다. 이 패턴은 메시지를 처리하는 비용이 비싸서 처리를 병렬화하기 위해 컨슈머를 추가하고 싶을 때 유용하다. 또한 컨슈머가 죽으면 관련 컨슈머 그룹에서 나머지 실시간  컨슈머에게 나누어진다.


IBM 문서에 친절히 설명되어 있다.

https://www.ibm.com/support/knowledgecenter/ko/SSCGGQ_1.2.0/com.ibm.ism.doc/Developing/sharedsubscriptionsinjms.html



AMQP는 같은 큐를 소비하는 클라이언트를 여러 개 둬서 로드 밸런싱을 구현할 수 있다. JMS에서는 이 방식을 shared subscription이라 한다. 카프카에서는 load share라고 한다.



<팬 아웃(fan out)>


각 메시지가 모든 컨슈머에 전달된다. 컨슈머가 브로드캐스팅된 동일한 메시지를 서로 간섭없이 들을 수 있다. 


JMS에서는 토픽 구독, AMQP에서는 바인딩 교환이라는 용어를 사용한다. 카프카에서는 팬 아웃이라고 한다.



카프카의 경우, 컨슘될 때 카프카에서 메시지가 제거되지 않아서 소비자를 여럿 추가 할 수 있고 각 컨슈머는 자체 메시지 오프셋을 유지 관리할 수 있습니다. 물론 컨슈머는 서로 다른 컨슈머 그룹에 있어야 한다.





이 두 패턴은 함께 사용할 수 있다.




Posted by '김용환'
,


Spark에서 Sqlite DB 테이블을 읽어오는 예시이다. 


spark jdbc로 데이터 프레임을 읽을 때 항상 모든 테이블 로우를 읽을 수 있을 뿐 아니라.


특정 쿼리의 데이터만 읽을 수 있다.




두가지 방법이 있는데. 


첫 번째는 option("dbtable)에  쿼리를 추가하는 방법,


두 번째는 jdbc를 읽을 때 predicate(where절 같은 형태)를 추가하는 방법이 있다.




object JDBCMain extends SparkHelper {
def main(args: Array[String]): Unit = {
val driver = "org.sqlite.JDBC"
val path = "origin-source/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:${path}"
val tablename = "flight_info"

// driver loading
import java.sql.DriverManager
Class.forName("org.sqlite.JDBC")
val connection = DriverManager.getConnection(url)
println(connection.isClosed)
println(connection.close())

val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info"""
val newDbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", pushdownQuery).option("driver", driver)
.load()
newDbDataFrame.explain()

println("predicates--")
val props = new java.util.Properties
props.setProperty("driver", "org.sqlite.JDBC")
val predicates = Array(
"DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
"DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'")
println(spark.read.jdbc(url, tablename, predicates, props).count())
println(spark.read.jdbc(url, tablename, predicates, props).rdd.getNumPartitions)

val predicates2 = Array(
"DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
"DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'")
println(spark.read.jdbc(url, tablename, predicates2, props).count())
println(spark.read.jdbc(url, tablename, predicates2, props).rdd.getNumPartitions)



이 예시는 Spark Definitive Guide에 있고 깃허브(https://github.com/knight76/spark-definitive-guide-sbt)에 저장되어 있다. 



Posted by '김용환'
,



스파크 (spark)  join은 두가지 전략이 있다.


셔플 조인(shuffle join)과 브로드 캐스트 조인(broadcast join)이 있다.


이 기반에는 wide dependency와 narrow dependency가  있다. 즉 최대한 driver와 executor 간 데이터 교환의 차이를 설명한 것으로서 개발 코드에 따라 성능이 달라진다.

(https://knight76.tistory.com/entry/spark-%ED%8E%8C%EC%A7%88-wide-dependecy-narrow-dependency)



셔플 조인은 조인된 데이터를 동일한 executor로 이동하기 위해 셔플 연산을 사용한다. 각 로우를 해시로 표현 및 생성한 후 적절한 장소에 보낸다. 따라서 데이터 이동이 많이 발생한다. 내부적으로 merge sort join보다 더 많은 비용이 드는 해싱을 사용한다.


따라서 join시 데이터  흐름이 narrow해지고 executor간의 데이터 복사 비용으로 속도가 떨어지게 된다.






브로드캐스트 조인은 데이터를  executor로 복사하지만,  executor간에 데이터복사가 일어나지 않기 때문에 속도가 빨라질 수 있다.


(사실 정확하게 말하면, 데이터의 양/join에 따라 브로드캐스트 조인이 좋을지, 셔플 조인이 좋을지는 테스트해봐야 한다. 이를 위해 spark sql  optimizer가 그걸 내부적으로 결정하기도 한다)






아래 코드는 Spark Definitve Guide의 예제 코드인데, explain으로 어떤 조인이 사용되었는지 알 수 있다. 


package spark.example.tutorial.chapter08

import spark.example.common.SparkHelper

object JoinMain extends SparkHelper {
def main(args: Array[String]): Unit = {
import spark.implicits._

val person = Seq(
(0, "Bill Chaambers", 0, Seq(100)),
(1, "Matei Zaharia", 1, Seq(500, 250, 100)),
(2, "Michael Armbrust", 1, Seq(250, 100)))
.toDF("id", "name", "graduate_program", "spark_status")

val graduateProgram = Seq(
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley"))
.toDF("id", "degree", "department", "school")

person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")

val joinExpr1 = person.col("graduate_program") === graduateProgram.col("id")
person.join(graduateProgram, joinExpr1).explain()

import org.apache.spark.sql.functions.broadcast
val joinExpr2 = person.col("graduate_program") === graduateProgram.col("id")
person.join(broadcast(graduateProgram), joinExpr2).explain()

}
}


보면 둘다 broadcast join이다. spark 내부  optimizer가 힌트를 주든 안주든 내부적으로 broadcast 조인을 할 수 있게 되었다.


== Physical Plan ==

*(1) BroadcastHashJoin [graduate_program#11], [id#26], Inner, BuildLeft

:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)))

:  +- LocalTableScan [id#9, name#10, graduate_program#11, spark_status#12]

+- LocalTableScan [id#26, degree#27, department#28, school#29]


== Physical Plan ==

*(1) BroadcastHashJoin [graduate_program#11], [id#26], Inner, BuildRight

:- LocalTableScan [id#9, name#10, graduate_program#11, spark_status#12]

+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))

   +- LocalTableScan [id#26, degree#27, department#28, school#29]





관련 내용은 참조글은 아래 글을 기반으로 한다.


https://www.waitingforcode.com/apache-spark-sql/shuffle-join-spark-sql/read


https://henning.kropponline.de/2016/12/11/broadcast-join-with-spark/






Posted by '김용환'
,




저장 데이터가 커지면 1대의 장비에 다 저장할 수 없다.


이를 위해 DB에서는 샤딩(sharding)이라는 방식을 사용해 데이터를 분산 저장한다.(사실 플러스로 복제(replica)를 둔다)


hashing, dictionary-hashing, incremental sharding 이렇게 분리한다.



DB 외에 Nosql마다 비슷한 용어를 가지고 있다. 샤딩은 사실 파티션(partition)이라는 일반적인 단어로 포함할 수 있다.



파티션은 일반DB의 샤딩(공식 용어는 아니다),  몽고 DB, 일래스틱서치, 솔라의 샤드(shard)에 해당된다.


HBase에서는 리전(Region), 


구글 Bigtable에서는 테블릿(tablet),


카산드라(Cassandra)와 Riak은 vnode,


Couchbase에서는 vBucket이라 부른다.




Posted by '김용환'
,



spark에서 UDF (UserDefinedFunction)을 사용하다 아래와 같은 에러가 발생했다.


Exception in thread "main" java.lang.InternalError: Malformed class name

at java.lang.Class.getSimpleName(Class.java:1330)

at org.apache.spark.sql.execution.aggregate.ScalaUDAF.nodeName(udaf.scala:451)



아래와 같은 UDF 코드인데. 



object UDFExampleDemo {


    def main(args: Array[String]) {

    

        class BoolAnd extends UserDefinedAggregateFunction {

...

        }

        ...

    }

}



아래와 같이 변경해야 에러가 발생하지 않는다. 



class BoolAnd extends UserDefinedAggregateFunction {

...

}

 

 

object UDFExampleDemo {


    def main(args: Array[String]) {

    

        ...

    }

}





node name을 얻을 려면 udaf의 class의 getSimpleName을 호출한다.

override def nodeName: String = udaf.getClass.getSimpleName


java의 Class코드를 보면, 

Malformed class name을 발생하는 코드 앞에 주석 설명이 잘 되어 있다.  getSimpleName은 적당한 깊이의  클래스의 simple name을 읽어 오는 것만 허락한다.  따라서 udaf를 상속한 클래스의 위치가 main 안에 두면 안된다.





 java.lang.Class의 getSimpleName 코드



public String getSimpleName() {
if (isArray())
return getComponentType().getSimpleName()+"[]";

String simpleName = getSimpleBinaryName();
if (simpleName == null) { // top level class
simpleName = getName();
return simpleName.substring(simpleName.lastIndexOf(".")+1); // strip the package name
}
// According to JLS3 "Binary Compatibility" (13.1) the binary
// name of non-package classes (not top level) is the binary
// name of the immediately enclosing class followed by a '$' followed by:
// (for nested and inner classes): the simple name.
// (for local classes): 1 or more digits followed by the simple name.
// (for anonymous classes): 1 or more digits.

// Since getSimpleBinaryName() will strip the binary name of
// the immediatly enclosing class, we are now looking at a
// string that matches the regular expression "\$[0-9]*"
// followed by a simple name (considering the simple of an
// anonymous class to be the empty string).

// Remove leading "\$[0-9]*" from the name
int length = simpleName.length();
if (length < 1 || simpleName.charAt(0) != '$')
throw new InternalError("Malformed class name");


Posted by '김용환'
,