[spark] join 예제

scala 2017. 5. 23. 14:56


spark의 join 예제이다. 


sql의 join과 같은 개념이다. collection과 case class를 활용해 데이터 집합을 하나로 결합할 수 있다. 





scala>     val person = sc.parallelize(Array((1, "samuel"), (2, "jackson"), (3, "redis"))).toDF("number", "name")

person: org.apache.spark.sql.DataFrame = [number: int, name: string]


scala> val address = sc.parallelize(Array(("samuel", "seoul"), ("jackson", "new york"), ("juno", "iceland"))).toDF("name", "address")

address: org.apache.spark.sql.DataFrame = [name: string, address: string]


scala> person.show

+------+-------+

|number|   name|

+------+-------+

|     1| samuel|

|     2|jackson|

|     3|  redis|

+------+-------+


scala> person.join(address, "name").show

+-------+------+--------+

|   name|number| address|

+-------+------+--------+

|jackson|     2|new york|

| samuel|     1|   seoul|

+-------+------+--------+





그러나 여러 join타입(예, left_outer)을 넣으면 에러가 발생한다.

 scala> person.join(address, "name", "left_outer").show
<console>:29: error: overloaded method value join with alternatives:
  (right: org.apache.spark.sql.Dataset[_],joinExprs: org.apache.spark.sql.Column,joinType: String)org.apache.spark.sql.DataFrame <and>
  (right: org.apache.spark.sql.Dataset[_],usingColumns: Seq[String],joinType: String)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.sql.DataFrame, String, String)
       person.join(address, "name", "left_outer").show
                    ^



"name" 대신 Seq("name")을 사용한다.

 
scala> person.join(address, Seq("name"), "inner").show
+-------+------+--------+
|   name|number| address|
+-------+------+--------+
|jackson|     2|new york|
| samuel|     1|   seoul|
+-------+------+--------+



scala> person.join(address, Seq("name"), "left_outer").show
+-------+------+--------+
|   name|number| address|
+-------+------+--------+
|jackson|     2|new york|
| samuel|     1|   seoul|
|  redis|     3|    null|
+-------+------+--------+



case class를 이용할 수도 있다.



scala>     val sqlContext = new SQLContext(sc)

warning: there was one deprecation warning; re-run with -deprecation for details

sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@2418ffcc


scala>     case class Person(number: Int, name: String)

defined class Person


scala>     case class Address(name: String, address: String)

defined class Address


scala>     val person = sqlContext.createDataFrame(Person(1, "samuel") :: Person(2, "jackson") :: Person(3, "redis") :: Nil).as("person_dataframe")

person: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [number: int, name: string]


scala>     val address = sqlContext.createDataFrame(Address("samuel", "seoul") :: Address("jackson", "new york") :: Address("juno", "iceland") :: Nil).as("address_dataframe")

address: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, address: string]


scala> val joined_dataframe = person.join(address, col("person_dataframe.name") === col("address_dataframe.name"), "inner")

joined_dataframe: org.apache.spark.sql.DataFrame = [number: int, name: string ... 2 more fields]

###아래 처럼 사용할 수도 있다.

scala> val joined_dataframe = person.join(address, $"person_dataframe.name" === $"address_dataframe.name", "inner")

joined_dataframe: org.apache.spark.sql.DataFrame = [number: int, name: string ... 2 more fields]



scala> joined_dataframe.show

+------+-------+-------+--------+

|number|   name|   name| address|

+------+-------+-------+--------+

|     1| samuel| samuel|   seoul|

|     2|jackson|jackson|new york|

+------+-------+-------+--------+







만약 필드 중에 null 컬럼 데이터가 있다면, Option을 사용하는 것이 좋은 방법일 것이다.

'scala' 카테고리의 다른 글

[spark] parquet 사용 예제  (0) 2017.05.26
[spark] zipWithIndex, for-yield 예제  (0) 2017.05.25
[spark] where과 filter의 차이  (0) 2017.05.23
[spark2] spark SQL 예제  (0) 2017.05.20
[spark2] spark2 rdd 생성 -makeRDD  (0) 2017.04.29
Posted by '김용환'
,



filter는 dataframe에서 where를 spark sql에서 사용하는데, 

이 둘의 차이가 무엇일까 살펴봤더니..


where는 filter의 앨리어스라 한다.




https://spark.apache.org/docs/1.5.2/api/scala/index.html#org.apache.spark.sql.DataFrame


defwhere(condition: Column)DataFrame

Filters rows using the given condition. This is an alias for filter.




결국은 아래 함수의 결과는 동일하다.


employee.filter($"age" > 15)

employee.where($"age" > 15)

'scala' 카테고리의 다른 글

[spark] zipWithIndex, for-yield 예제  (0) 2017.05.25
[spark] join 예제  (0) 2017.05.23
[spark2] spark SQL 예제  (0) 2017.05.20
[spark2] spark2 rdd 생성 -makeRDD  (0) 2017.04.29
[scala] 라인 피드("\n") 관련 예시 코드  (0) 2017.04.24
Posted by '김용환'
,

[성과] OKR

scribbling 2017. 5. 23. 11:12



한글 좋은 글, 링크
https://brunch.co.kr/@jjollae/8



영문


https://www.slideshare.net/HenrikJanVanderPol/how-to-outperform-anyone-else-introduction-to-okr


https://www.slideshare.net/mustansir78/guide-to-okr-objectives-key-results


https://library.gv.com/how-google-sets-goals-okrs-a1f69b0b72c7


책으로 보려면 아래 싸이트 가서 공짜 Perdoo-OKR-eBook.pdf를 다운받는다. 

https://info.perdoo.com/how-to-write-okrs



Posted by '김용환'
,


_cat/nodes api를 통해 노드 정보를 볼 수 있다. 


$ curl -XGET localhost:9200/_cat/nodes?v


ip        heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name

127.0.0.1           45         100   6    2.03                  mdi       *      5OEGj_a



Posted by '김용환'
,


elasticsearch 5에서 elasticsearch를 실행할 때 다른 sql처럼 bootstrap 검사를 시작한다. 


관련 내용은 다음을 참고한다. 



https://www.elastic.co/guide/en/elasticsearch/reference/master/bootstrap-checks.html


https://www.elastic.co/blog/bootstrap_checks_annoying_instead_of_devastating




Posted by '김용환'
,

[펌] uber 아키텍처

scribbling 2017. 5. 22. 18:24


EVOLVING DISTRIBUTED TRACING

https://eng.uber.com/distributed-tracing/



https://eng.uber.com/tech-stack-part-one/



qcon london -  REALTIME STREAM COMPUTING &ANALYTICS 

https://qconlondon.com/london-2016/system/files/presentation-slides/sudhittonse.pdf



oscon 2017 - wishful thinking

https://cdn.oreillystatic.com/en/assets/1/event/214/Wishful%20thinking%20Presentation.pdf

Posted by '김용환'
,


nginx에 upstream을 사용하고 있는데, 에서 어느 시간부터 다음과 같은 에러가 선형적으로 늘어가기 시작했고 엄청난 양의 에러가 발생하기 시작했다.


no live upstreams while connecting to upstream, client: ip_address , server: example.com, request: "GET / HTTP/1.1", upstream: "http://example.com", host: "example.com", referrer: "http://example.com/mypages/"


upstream prematurely closed connection while reading response header from upstream, client: ip_address , server: example.com, request: "GET / HTTP/1.1", upstream: "http://example.com", host: "example.com", referrer: "http://example.com/mypages/"




문제 해결 방법을 차례로 테스트했다.


1. timeout (proxy_timeout 등등)

2. upstream keepalive 

3. backend resource thread pool


그러나 여전히 문제를 해결할 수 없었다.


2개의 backend를 보면 upstream을 보던 설정을 1개의 backend로 보게 하고 tcpdump를 뜨면서 에러가 날 때 어떠한 현상이 있는지 확인했다.


no live upstreams while connecting to   upstream, upstream prematurely closed connection while reading response header from upstream 에러 발생과  tcpdump 덤프와의 인과 관계가 없음을 확인했다.


이 문제는 nginx 내부의 문제로 생긴 것으로 생각하고 소스를 확인해봤다.



https://github.com/nginx/nginx/blob/master/src/stream/ngx_stream_proxy_module.c#L696


    if (rc == NGX_BUSY) {

        ngx_log_error(NGX_LOG_ERR, c->log, 0, "no live upstreams");

        ngx_stream_proxy_finalize(s, NGX_STREAM_BAD_GATEWAY);

        return;

    }



https://github.com/nginx/nginx/blob/beaaeb9f9e642d1d153ee65569d99499eef624e9/src/http/ngx_http_upstream.c#L3551



                if (upstream->read->eof) {

                    ngx_log_error(NGX_LOG_ERR, upstream->log, 0,

                                  "upstream prematurely closed connection");


                    ngx_http_upstream_finalize_request(r, u,

                                                       NGX_HTTP_BAD_GATEWAY);

                    return;

                }



connection 이슈인 것을 확인했다..


nginx 설정을 보니 아... 내가 못 보던 nginx 사용자 정의 모듈이 있었고 해당 모듈이 특정 서버를 바라보고 있었다.


해당 모듈의 통신이 upstream에 영향을 주는 것으로 판단하고 해당 모듈을 사용하지 않도록 하니..


더이상 에러는 발생되지 않았다..





Posted by '김용환'
,

참조

https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-shards.html



전체 인덱스에 대한 shard 정보를 얻으려면 _cat/shards를 실행한다.


$ curl localhost:9200/_cat/shards

persons             0 p STARTED       10   4.1kb 127.0.0.1 5OEGj_a

persons             0 r UNASSIGNED

..




특정 인덱스의 샤드 정보는 다음 커맨드를 사용한다.


$ curl localhost:9200/_cat/shards/wikinews

wikinews 2 p STARTED    4214   71mb 127.0.0.1 5OEGj_a

wikinews 2 r UNASSIGNED

wikinews 1 p STARTED    4112 69.2mb 127.0.0.1 5OEGj_a

wikinews 1 r UNASSIGNED

wikinews 3 p STARTED    4310 73.6mb 127.0.0.1 5OEGj_a

wikinews 3 r UNASSIGNED

wikinews 4 p STARTED    4285 72.4mb 127.0.0.1 5OEGj_a

wikinews 4 r UNASSIGNED

wikinews 0 p STARTED    4146 69.6mb 127.0.0.1 5OEGj_a

wikinews 0 r UNASSIGNED




인덱스의 정보를 상세하게 보려면 h 매개변수에 옵션을 추가할 수 있다. 만약  unassinged라면 이유도 확인할 수 있다. 


$ curl localhost:9200/_cat/shards/wikinews?h=index,shard,prirep,state,unassigned.reason

wikinews 2 p STARTED

wikinews 2 r UNASSIGNED INDEX_CREATED

wikinews 1 p STARTED

wikinews 1 r UNASSIGNED INDEX_CREATED

wikinews 3 p STARTED

wikinews 3 r UNASSIGNED INDEX_CREATED

wikinews 4 p STARTED

wikinews 4 r UNASSIGNED INDEX_CREATED

wikinews 0 p STARTED

wikinews 0 r UNASSIGNED INDEX_CREATED




document number를 샤드단위로 볼 수 있기 때문에 라우팅 테스트를 쉽게 할 수 있다.


documents           1     p      STARTED        3   9.6kb 127.0.0.1 5OEGj_a

documents           0     p      STARTED        1   3.2kb 127.0.0.1 5OEGj_a






Posted by '김용환'
,


특정 기능 여부를 실행하게 하려면

또는 A/B 테스트 여부를 실행하게 하려면

또는 트래픽을 특정 서버에 보내거나 안 보내게 하려면

Flag 같은 기능을 써서 사용한다.



보통 이 Flag는 자바로 하면 property 파일 또는 실행시 -D옵션으로 전달하는 매개 변수로 사용했었다.

요즘에는 분산 coordinator인 zookeeper를 이용해 Flag 여부를 사용해 개발한다.

이런 형태의 고급 용어를 feature toogle이라 불리는 것 같다. 아마도 이렇게들 개발은 할텐데.. 막상 IT 영어로는 잘 모를 수 있다. 


참고 자료.


https://martinfowler.com/articles/feature-toggles.html


Posted by '김용환'
,

[spark2] spark SQL 예제

scala 2017. 5. 20. 06:33


Spark sql 예제이다.




scala> val dataset = Seq("samuel", "jackson", "kin").toDF("name_string")

dataset: org.apache.spark.sql.DataFrame = [name_string: string]


scala> dataset.registerTempTable("names")

warning: there was one deprecation warning; re-run with -deprecation for details


scala> sql("""select name_string from names""").show

+-----------+

|name_string|

+-----------+

|     samuel|

|    jackson|

|        kin|

+-----------+



scala> sql("""select name_string from names where name_string ='kin' """).show

+-----------+

|name_string|

+-----------+

|        kin|

+-----------+





like 검색은 조금 신경써서 해야 한다.

일반적인 like 검색일 때는 결과가 나타나지 않는다.


scala> sql("""select name_string from names where name_string like '*' """).show

+-----------+

|name_string|

+-----------+

+-----------+




like concat을 사용하면 like 검색을 할 수 있다.


scala> sql("""select name_string from names where name_string like concat('%','sam','%')  """).show

+-----------+

|name_string|

+-----------+

|     samuel|

+-----------+



오래전 부터 SQL문이 아닌 ETL 파이프라인 방식으로 사용할 수도 있었다.

(ETL 파이프은 조금 쓰기 불편하다.. )



scala> dataset.groupBy("name_string").count().filter($"count" >= 1).show()

+-----------+-----+

|name_string|count|

+-----------+-----+

|    jackson|    1|

|        kin|    1|

|     samuel|    1|

+-----------+-----+



scala> dataset.groupBy("name_string").count().filter($"count" >= 2).show()

+-----------+-----+

|name_string|count|

+-----------+-----+

+-----------+-----+



scala> dataset.select("name_string").show()

+-----------+

|name_string|

+-----------+

|     samuel|

|    jackson|

|        kin|

+-----------+



scala> dataset.select("name_string").where($"name_string".equalTo("samuel")).show()

+-----------+

|name_string|

+-----------+

|     samuel|

+-----------+


scala> dataset.select("name_string").where($"name_string".contains("sam")).show()
+-----------+
|name_string|
+-----------+
|     samuel|
+-----------+


scala> dataset.select("name_string").groupBy($"name_string").count().show()
+-----------+-----+
|name_string|count|
+-----------+-----+
|    jackson|    1|
|        kin|    1|
|     samuel|    1|
+-----------+-----+




그리고 Spark SQL에는 다양한 UDF 함수를 지원한다. 다음은 관련 예제이다.



scala> val dataset2 = Seq(("samuel", "01/05/2017"), ("noah", "01/05/2018"))

dataset2: Seq[(String, String)] = List((samuel,01/05/2017), (noah,01/05/2018))


scala> val dataset2 = Seq(("samuel", "01/05/2017"), ("noah", "01/05/2018")).toDF("name", "create_date")

dataset2: org.apache.spark.sql.DataFrame = [name: string, create_date: string]


scala> dataset2.registerTempTable("reservation")

warning: there was one deprecation warning; re-run with -deprecation for details


scala> sql("""SELECT * from reservation""").show

+------+-----------+

|  name|create_date|

+------+-----------+

|samuel| 01/05/2017|

|  noah| 01/05/2018|

+------+-----------+



몇 요일인지 확인하려면 다음과 같다. 



scala> sql("""SELECT name,create_date,from_unixtime(unix_timestamp(create_date, 'MM/dd/yyyy'), 'EEEE') as day from reservation where name='samuel' """).show

+------+-----------+--------+

|  name|create_date|     day|

+------+-----------+--------+

|samuel| 01/05/2017|Thursday|

+------+-----------+--------+






이번에는 cass class를 이용한 sql 작업이다.



scala> case class Num(x:Int)

defined class Num


scala> val rdd=sc.parallelize(List(Num(1), Num(2), Num(3)))

rdd: org.apache.spark.rdd.RDD[Num] = ParallelCollectionRDD[12] at parallelize at <console>:34


scala> spark.createDataFrame(rdd).show

+---+

|  x|

+---+

|  1|

|  2|

|  3|

+---+


scala> val df = spark.createDataFrame(rdd)

df: org.apache.spark.sql.DataFrame = [x: int]


scala> df.registerTempTable("num")

warning: there was one deprecation warning; re-run with -deprecation for details


scala> sql("""select * from num where x=2""").show

+---+

|  x|

+---+

|  2|

+---+




'scala' 카테고리의 다른 글

[spark] join 예제  (0) 2017.05.23
[spark] where과 filter의 차이  (0) 2017.05.23
[spark2] spark2 rdd 생성 -makeRDD  (0) 2017.04.29
[scala] 라인 피드("\n") 관련 예시 코드  (0) 2017.04.24
[scala] Iterator의 continually함수  (0) 2017.04.24
Posted by '김용환'
,