zookeeper와 연동하는 kazoo를 python3로 업그레이드하면서 알게된 내용이다.




python2에서는 바이트 문자열(byte string)이라는 것은 무시되었다.



A prefix of 'b' or 'B' is ignored in Python 2; it indicates that the literal should become a bytes literal in Python 3 (e.g. when code is automatically converted with 2to3). A 'u' or 'b' prefix may be followed by an 'r' prefix.



그러나, python3부터는 바이트 문자열을 b또는 B로 쓰이게 되었다.


Bytes literals are always prefixed with 'b' or 'B'; they produce an instance of the bytes type instead of the str type. They may only contain ASCII characters; bytes with a numeric value of 128 or greater must be expressed with escapes.





즉 저장할 때 문자열은 encode()로,


 value.encode()


읽을 때는 decode()로 읽는다.


value.decode()






Posted by '김용환'
,



python3에서 


jinja2.exceptions.UndefinedError: 'len' is undefined 해결하려면


'|length'를 이용한다.


{% node.data|length == 0 %}


Posted by '김용환'
,


스파크 잡 관련 메모리 튜닝 정보이다.



https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html




gc 옵션과 RDD 관련 내용과 spark.storage.memoryFraction을 설명한다.



String 대신 숫자 또는 enum을 사용하는 것이 좋으며,


32GB 미만인 경우 JVM 플래그 -XX:+UseCompressedOops를 설정하여 포인터가 8바이트 대신 4바이트로 생성된다는 내용이 있다. 

Posted by '김용환'
,



build.gradle에 최신 logstash-logback-encoder를 추가했다.


compile('net.logstash.logback:logstash-logback-encoder:5.0')




https://github.com/logstash/logstash-logback-encoder


Standard Fields

These fields will appear in every LoggingEvent unless otherwise noted. The field names listed here are the default field names. The field names can be customized (see Customizing Standard Field Names).

FieldDescription
@timestampTime of the log event. (yyyy-MM-dd'T'HH:mm:ss.SSSZZ) See customizing timezone.
@versionLogstash format version (e.g. 1) See customizing version.
messageFormatted log message of the event
logger_nameName of the logger that logged the event
thread_nameName of the thread that logged the event
levelString name of the level of the event
level_valueInteger value of the level of the event
stack_trace(Only if a throwable was logged) The stacktrace of the throwable. Stackframes are separated by line endings.
tags(Only if tags are found) The names of any markers not explicitly handled. (e.g. markers from MarkerFactory.getMarker will be included as tags, but the markers from Markers will not.)




기본 포맷은 다음과 같다.


{"@timestamp":"2018-03-26T16:09:15.692+09:00","@version":"1","message":"data","logger_name":"com.kakao.sauron.api.controller.TestController","thread_name":"http-nio-8080-exec-3","level":"INFO","level_value":20000}



보통 caller 관점에서의 line_number도 필요하는데.. 


includeCallerData를 추가하면 관련 정보가 나타난다. 





<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<includeCallerData>true</includeCallerData>
</encoder>




caller 쪽 데이터를 출력할 수 있다.


{"@timestamp":"2018-03-26T16:22:03.196+09:00","@version":"1","message":"data","logger_name":"com.kakao.sauron.api.controller.TestController","thread_name":"http-nio-8080-exec-1","level":"INFO","level_value":20000,"caller_class_name":"com.kakao.sauron.api.controller.TestController","caller_method_name":"helloWorld","caller_file_name":"TestController.java","caller_line_number":28}





너무 많이 나와서.. 필드 이름을 조금 줄일 수 있다. 


<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<includeCallerData>true</includeCallerData>
<fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>
</encoder>


{"@timestamp":"2018-03-26T16:28:58.886+09:00","@version":"1","message":"afdsafasfsad","logger":"com.kakao.sauron.api.controller.TestController","thread":"http-nio-8080-exec-1","level":"INFO","levelVal":20000,"caller":{"class":"com.kakao.sauron.api.controller.TestController","method":"helloWorld","file":"TestController.java","line":28}}






이정도가 제일 무난한 정도인 듯 하다.

<appender name="consoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<includeCallerData>true</includeCallerData>
<fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>
</encoder>
</appender>
<logger name="jsonLogger" additivity="false" level="DEBUG">
<appender-ref ref="consoleAppender"/>
</logger>
<root level="INFO">
<appender-ref ref="consoleAppender"/>
</root>




물론 여기에 더 해야할 점은 로그 파일의 용량, rotation을 적용해야 한다.

상용에서 사용하려면 더 신경쎠야 한다. 

Posted by '김용환'
,



기존 로컬 브랜치를 리모트를 새로운 브랜치로 생성하고 푸시된 기존 브랜치는 삭제하는 예이다. 




$ git branch -m features/bug_fix GOOGLE-539_bug_fix



$ git push --set-upstream origin GOOGLE-539_bug_fix

Total 0 (delta 0), reused 0 (delta 0)

To https://github.com/samuel-kim/google-search.git

 * [new branch]      GOOGLE-539_bug_fix -> GOOGLE-539_bug_fix

Branch GOOGLE-539_bug_fixset up to track remote branch GOOGLE-539_bug_fix from origin.



삭제할 때는 push origin  다음에 삭제할 브랜치 앞에 :을 추가해야 한다.


$ git push origin :features/bug_fix

To https://github.com/samuel-kim/google-search.git

 - [deleted]         bug_fix



Posted by '김용환'
,



spark 코드에 Log4j의 logger를 직렬화를 진행할 수 없다..



import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.LogManager
import org.apache.log4j.Level
import org.apache.log4j.Logger

object MyLog {
def main(args: Array[String]):Unit= {
// 로그 레벨을 WARN으로 설정한다
val log = LogManager.getRootLogger
log.setLevel(Level.WARN)
// SparkContext를 생성한다
val conf = new SparkConf().setAppName("My App").setMaster("local[*]")
val sc = new SparkContext(conf)
//계산을 시작하고 로깅 정보를 출력한다
log.warn("Started")
val i = 0
val data = sc.parallelize(i to 100000)
data.foreach(i => log.info("My number"+ i))
log.warn("Finished")
}
}



아래와 같은 에러가 발생한다.


Exception in thread "main" org.apache.spark.SparkException: Task not serializable

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)

at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)

at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911)

at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)

at MyLog$.main(MyLog.scala:19)

at MyLog.main(MyLog.scala)

Caused by: java.io.NotSerializable






직렬화할 클래스를 만들고 extends Serializable을 추가한다. 즉, 직렬화 클래스를 하나 만들어서 내부어세 RDD를 사용하는 함수를 하나 만든다. 



import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.LogManager
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark._
import org.apache.spark.rdd.RDD


class MyMapper(n: Int) extends Serializable{
@transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")
def dosomething(rdd: RDD[Int]): RDD[String] =
rdd.map{ i =>
log.warn("Serialization of: " + i)
(i + n).toString
}
}

object MyMapper{
def apply(n: Int): MyMapper = new MyMapper(n)
}

object MyLog {
def main(args: Array[String]):Unit= {
// 로그 레벨을 WARN으로 설정한다
val log = LogManager.getRootLogger
log.setLevel(Level.WARN)
// SparkContext를 생성한다
val conf = new SparkConf().setAppName("My App").setMaster("local[*]")
val sc = new SparkContext(conf)
//계산을 시작하고 로깅 정보를 출력한다
log.warn("Started")
val data = sc.parallelize(1 to 100000)
val mapper = MyMapper(1)
val other = mapper.dosomething(data)
other.collect()
log.warn("Finished")
}
}

에러 없이 잘 동작한다. 

Posted by '김용환'
,



gradle에서 특정 라이브러리는 사용하고 싶지 않다면.


configuration-> compile.exclude를 사용한다.




주의할 점은 group, module이 분류되어 있다는 점이다.



configurations {
compile.exclude group: "org.slf4j", module : "slf4j-log4j12"
compile.exclude group: "javax.servlet", module : "servlet-api"
}


Posted by '김용환'
,




다음 hbase 커맨드는 다음 의사 자바 코드와 같다.


<hbase 커맨드>

scan 'table', { COLUMNS => 'colFam:colQualifier', LIMIT => 3 }



<java psudeo code>

    Scan scan = new Scan();

    scan.addFamily(Bytes.toBytes(familyName));

    filters.addFilter(pageFilter);

    scan.setFilter(filters);






다음 hbase 커맨드는 다음 의사 자바 코드와 같다.


<hbase 커맨드>
scan 'table', { COLUMNS => 'colFam:colQualifier', LIMIT => 3, FILTER => "ValueFilter( =, 'value1' )" }


<java psudeo code>
        Scan scan = new Scan();
        FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        Filter pageFilter = new PageFilter(fetchSize);
        
        SingleColumnValueFilter qualifierFilter = new SingleColumnValueFilter(
                COLUMN_FAMILY,
                COLUMN_QUALIFIER,
                CompareFilter.CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes("value1"))
        );

        filters.addFilter(pageFilter);
        filters.addFilter(qualifierFilter);

        scan.setFilter(filters);


Posted by '김용환'
,



Hbase Scan 결과에 대해 디버깅을 하려 했다.


예전 쿼리를 Result에 대해서 keyvalue를 얻어오는 부분을 날 쿼리를 이용해 사용했다.


                    for (KeyValue kv : result.raw()) {

                        System.out.printf("row: %d, qualifier: %s, value: %s\n",

                                Bytes.toLong(Bytes.padHead(kv.getRow(), 8)),

                                new String(kv.getQualifier()),

                                new String(kv.getValue()));

                    }




그러나, CellUtil을 사용하면 좀 고급스럽게 디버깅할 수 있다.




import org.apache.hadoop.hbase.Cell;

import org.apache.hadoop.hbase.CellUtil;



                    List<Cell> cells = result.listCells();

                    final String TABS = "\t";

                    for (Cell cell : cells) {

                        System.out.printf("%s%scolumn=%s:%s, timestamp=%d, value=%s\n",

                                Bytes.toStringBinary(CellUtil.cloneRow(cell)),

                                TABS,

                                Bytes.toString(CellUtil.cloneFamily(cell)),

                                Bytes.toString(CellUtil.cloneQualifier(cell)),

                                cell.getTimestamp(),

                                Bytes.toString(CellUtil.cloneValue(cell)));

                    }

                    



Posted by '김용환'
,



마라톤(marathon)에는 fault tolerance와 locality를 극복할 수 있는 방법으로 constraints를 제공한다. 동일한 기능을 하는 데몬을 한 장비에 두면, 한 장비가 죽으면 더 이상 서비스를 진행할 수 없다.


그래서 가상 장비(virutal machine)에서는 anti affinity 라는 단어를 사용해 동일한 기능을 가진 서비스를 여러 장비에 분산시키면 fault tolerance와 locality를 지원한다.



<fault tolerance와 locality 소개>

https://www.bayt.com/en/specialties/q/215179/what-is-affinity-and-anti-affinity-rule-in-vmware/


Affinity rule, ensure that multiple virtual machines are always running on the same host.  As such, if one of the virtual machines is vMotioned to a different host, the associated virtual machines must be moved as well.






http://mesosphere.github.io/marathon/docs/constraints.html


CLUSTER allows you to run all of your app’s tasks on agent nodes that share a certain attribute. This is useful for example if you have apps with special hardware needs, or if you want to run them on the same rack for low latency.

A field of "hostname" tells Marathon that launched tasks of the app/pod have affinity for each other and should be launched together on the same agent:

....




하나의 장비에 동일한 서비스(데몬)을 실행시키지 않으려면 

marathon json에 다음을 추가한다.



 "constraints": [["hostname", "UNIQUE"]]



Posted by '김용환'
,