Intellij에서 play2를 import했지만, scala object를 추가하기 어렵고 play스럽지 않아서 개발이 불편할 수 있다.


만약 아래와 같은 문구까지 나오면 다음 팁을 따른다.


Info: SBT compilation for play framework 2.x disabled by default





1) Intellij 설정 변경


Settings -> Langauges & Frameworks -> Play2 -> Compiler -> User Play 2 compiler for this project.



Info: SBT compilation for play framework 2.x disabled by default이 나오면 Intellij를 재시작한다.



2) 프로젝트 설정 변경

프로젝트에서 마우스 오른쪽 클릭 -> Add Framework Support -> Play2  추가.






Posted by '김용환'
,

1) producer 주의 사항


producer를 개발할 때 bootstrap.servers, key.serializer를 반드시 추가해야 한다.


 org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.


org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.




* boostrarp.servers : 9092포트로 떠 있는 kafka 서버이다. 

* key.serializer : 메시지 키를 serialization할 때 사용하는 정보이다.



다음은 producer 예제이다. 


Properties properties = new Properties();

properties.put("client.id", "5");

properties.put("bootstrap.servers", "127.0.0.1:9092");

properties.put("request.timeout.ms", String.valueOf(timeout));

roperties.put("request.required.acks", String.valueOf(requestRequiredAcks));

properties.put("value.serializer", MyDocumentSerializer.class.getName());

properties.put("key.serializer", MyIntegerSerializer.class.getName());

properties.put("serializer.class", MyDocumentSerializer.class.getName());

properties.put("partitioner.class", MyPartitioner.class.getName());


KafkaProducer<String, Document> producer = new KafkaProducer<>(properties);




2) consumer 주의 사항

consumer를 개발할 때 반드시 group.id, zookeeper.connect를 추가해야 한다. 



* group.id : consumr group에서 사용하는 unique 그룹 값이다. 해당 그룹으로 읽어야 offset을 서로 공유하게 되어 중복 데이터를 읽지 않도록 한다. (테스트해보면 쉽게 이해 되는 값이다)


* zookeeper.connect : kafka zookeeper 정보이다. 



만약 두 property를 주지 않으면 다음과 같은 에러가 발생한다. 


java.lang.IllegalArgumentException: requirement failed: Missing required property 'group.id'

at scala.Predef$.require(Predef.scala:224)

at kafka.utils.VerifiableProperties.getString(VerifiableProperties.scala:177)



cosumer 예제

Properties props = new Properties();

props.put("group.id", "1");

props.put("zookeeper.connect", "localhost:2185");

props.put("zookeeper.session.timeout.ms", "4000");

props.put("zookeeper.sync.time.ms", "200");

props.put("auto.commit.interval.ms", "1000");

props.put("autocommit.enable", true);

ConsumerConfig consumerConfig = new ConsumerConfig(props);

ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);




구체적인 필드에 대한 설명은 아래 문서를 봐야 한다. 


https://kafka.apache.org/documentation/


Posted by '김용환'
,





scala의 collection에서 [error]  required: scala.collection.GenTraversableOnce[?] 에러가 나는 경우가 있다..




예)


scala> List(1, "x").flatten

<console>:12: error: No implicit view available from Any => scala.collection.GenTraversableOnce[B].

       List(1, "x").flatten




scala> List(1, "x").flatMap(a => a)

<console>:12: error: type mismatch;

 found   : Any

 required: scala.collection.GenTraversableOnce[?]

       List(1, "x").flatMap(a => a)

                                 ^



실제 api를 보면  다음과 같이 A => GenTraversableOnce라는 타입을 받는다. 


def flatten[B](implicit asTraversable: A => /*<:<!!!*/ GenTraversableOnce[B]): CC[B] = {


final override def flatMap[B, That](f: A => GenTraversableOnce[B])
                       (implicit bf: CanBuildFrom[List[A], B, That]): That = {



재미있는 것은 Option은 GenTraversableOnce으로 implicit으로 변환할 수 있다. 




이전에 에러를 수정하려면 다음처럼 수정하면 될 것이다.



scala> List(Some(1), Some("x"), None).flatten

res9: List[Any] = List(1, x)



scala> List(1, "x").flatMap(a => Some(a))

res8: List[Any] = List(1, x)



Posted by '김용환'
,


kafka를 재시작하더라도 kafka의 토픽과 메시지는 모두 남아 있는데, 


테스트를 위해 kafka를 재시작할 때 모든 토픽과 메시지를 삭제하고 싶을 수 있다. 


이 때, kafka를 재시작 하기 전에 config/server.properties의 log.dirs의 파일들을 모두 지우고 재시작한다. 


# A comma seperated list of directories under which to store log files

log.dirs=/tmp/kafka-logs



재시작 스크립트는 아래와 같이 사용할 것이다. 


// stop

 rm -rf /tmp/kafka-logs

// start


Posted by '김용환'
,


fine grained transformation와 coarse grained transformation의 의미를 잘설명한 쿼라가 있어서 공유한다.



https://www.quora.com/What-is-the-difference-between-fine-grained-and-coarse-grained-transformation-in-context-of-Spark


If there is a dataset with a billion rows, A fine grained transaction is one applied on smaller set, may be a single row.

A coarse grained one is an operation applied on an entire dataset. 




코오스 그레인드 기법은 전체 데이터에 특정 오퍼레이션을 적용한 기법이며, time based 에서 자주 사용된다.


파인 그레인드 기법은 작은 집합에 적용되는 기법이며 한 엔티티당 기준으로 자주 사용된다.


'데이터 분석' 카테고리의 다른 글

apache zepplin 0.6.2 설치 - python, hive 연동  (0) 2016.10.22
Posted by '김용환'
,



json을 정보를 저장하는 테이블에 view를 생성해서 json없는 테이블처럼 사용할 수 있다. 

(더 정확히 말하면 general하게 로그 수집을 위한 모델링할 수 있는 로그에 json를 사용할 수 있으면서 

쉽게 정보를 파악할 수 있는 방법이다)




아래와 같은 row를 가진 hive 테이블이 있다. 


1787821114291 service 15 birthday {"result":true,"birth":"0224","id":"15","action":"add"}  2011-02-23


이런 hive테이블을 조회하려면 아래와 같이 실행해야 한다.


select * from googleplus_reqlog where date_id='2011-02-23' and label='birthday' and get_json_object(props, '$.id')='15';





hive 질의 시 get_json_object를 사용하는 것이 불편하기 때문에 새로운 view를 만든다.


CREATE VIEW `stat`.`birthday` COMMENT 'google plus birthday' AS SELECT

`birthday_temp`.`ts` AS `ts`,

`birthday_temp`.`date_id` AS `date_id`,

`birthday_temp`.`result` AS `result`,

`birthday_temp`.`birth` AS `birth`,

`birthday_temp`.`id` AS `id`,

`birthday_temp`.`action` AS `action`,

`birthday_temp`.`from` AS `from`

FROM (

  select

    ts,

    date_id,

    get_json_object(`googleplus_reqlog`.`props`, '$.result') as result,

    get_json_object(`googleplus_reqlog`.`props`, '$.birth') as birth,

    get_json_object(`googleplus_reqlog`.`props`, '$.id') as id,

    get_json_object(`googleplus_reqlog`.`props`, '$.action') as action,

  from `src`.`googleplus_reqlog`

  where label='birthday') `birthday_temp`;

  

  

  

view를 확인한다. 


hive> desc birthday;

OK

ts                   bigint

date_id             string

result               string

birth               string

id           string

action               string




이제 get_json_object 없이 간단하게 hive 질의를 할 수 있다. 

  

 select * from birthday where date_id='2011-02-23' and account_id='15';

  


처음에 소개한 hive 질의와 동일한 결과를 내지만, 훨씬 편하다.


select * from googleplus_reqlog where date_id='2011-02-23' and label='birthday' and get_json_object(props, '$.id')='15';



Posted by '김용환'
,


mysql의 경우 reserved word를 table의 컬럼명으로 쓰인다면 back quote(`)을 사용해야 한다.


https://dev.mysql.com/doc/refman/5.7/en/identifiers.html



The identifier quote character is the backtick (`):

mysql> SELECT * FROM `select` WHERE `select`.id > 100;




마찬가지로 hive에서도 reserved word를 table의 컬럼명으로 쓰인다면 back quote(`)을 사용해야 한다.



https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select

 Any column name that is specified within backticks (`) is treated literally. 


hive> select * from googleplus_log where `date`='2037-08-23' and label='birthday' and `to`='11935385';

Posted by '김용환'
,


과거에는 netstat -anp을 이용해 특정 포트를 리스닝(Listen)하는 특정 프로세스를 찾았다.



$ netstat -anp | grep LISTEN


tcp        0      0 172.17.64.41:7077           0.0.0.0:*                   LISTEN      26464/java

tcp        0      0 0.0.0.0:51047               0.0.0.0:*                   LISTEN      13816/java

...



lsof에도 비슷한 옵션이 있다. 조금 더 깔끔하게 출력된다. 그리고 컬럼 크기도 정할 수 있어서 유용하다.



$ lsof +c10  -iTCP -sTCP:LISTEN


COMMAND      PID   USER   FD   TYPE    DEVICE SIZE/OFF NODE NAME

java        1627 www   15u  IPv4 605880628      0t0  TCP *:41384 (LISTEN)

java        1627 www   30u  IPv4 605880676      0t0  TCP *:36194 (LISTEN)


Posted by '김용환'
,


스칼라의 리스트(또는 배열)에서 특정 index의 값을 얻을 수 있지만 없는 Index에 접근하면 에러가 발생한다. 


scala> List(8, 9, 10)(0)

res5: Int = 8


scala> List(8, 9, 10)(3)

java.lang.IndexOutOfBoundsException: 3

  at scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:65)

  at scala.collection.immutable.List.apply(List.scala:84)

  ... 48 elided




리스트(또는 배열)에서는 IndexOutOfBoundsException이 발생하지 않도록 Option을 리턴하는 lift를 제공한다. 


scala> List(8, 9, 10).lift

res0: Int => Option[Int] = <function1>


scala> List(8, 9, 10).lift(1)

res1: Option[Int] = Some(9)


scala> List(8, 9, 10).lift(2)

res2: Option[Int] = Some(10)


scala> List(8, 9, 10).lift(3)

res3: Option[Int] = None


scala> List(8, 9, 10).lift(4)

res4: Option[Int] = None






만약 None이라도 getOrElse를 통해 값을 얻을 수 있다.


scala> List(8, 9, 10).lift(10).getOrElse(0).toInt

res10: Int = 0


scala> List(8, 9, 10).lift(1).getOrElse(0).toInt

res11: Int = 9





리스트(또는 배열)의 lift를 따로 올라가면 다음과 같다. PartialFunction이다. 


trait PartialFunction[-A, +B] extends (A => B) {
...

/** Turns this partial function into a plain function returning an `Option` result.
* @see Function.unlift
* @return a function that takes an argument `x` to `Some(this(x))` if `this`
* is defined for `x`, and to `None` otherwise.
*/
def lift: A => Option[B] = new Lifted(this)


PartialFunction[A, B]라 가정하면 PartialFunction에서 A타입을 받고 B 타입으로 올리는(lift)한다는 의미가 있다.



scala> val test: PartialFunction[Int, Int] = { case i if i == 0 => 0 ; case _ => -1}

test: PartialFunction[Int,Int] = <function1>


scala> test.lift(0)

res15: Option[Int] = Some(0)


scala> test.lift(11)

res16: Option[Int] = Some(-1)



Posted by '김용환'
,




타입에 대한 byte[]를 쉽게 만드는 방법이이다. 



ByteBuffer.allocate(4).putInt(data).array();




또는 apache commons의 SerializationUtils.serialize 메소드를 사용한다.



import org.apache.commons.lang.SerializationUtils;


SerializationUtils.serialize(data);

    public static byte[] serialize(Serializable obj) {

        ByteArrayOutputStream baos = new ByteArrayOutputStream(512);

        serialize(obj, baos);

        return baos.toByteArray();

    }



Posted by '김용환'
,