The org.apache.kafka.clients.producer.Partitioner interface changed in version 0.10.1.1 as follows.


kafka 0.8


public int partition(Object key, int partitions)



kafka 0.10.1.1


public interface Partitioner extends Configurable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();

}




In 0.8, partitioning was done like this:


public int partition(Object key, int partitions) {
    ...
    // key is assumed to be an Integer (as in the 0.10 example below)
    return ((Integer) key) % partitions;
}






In the new API, the partition count is obtained from the Cluster passed into partition():



@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // the number of partitions now comes from the cluster metadata
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int partitionsCount = partitions.size();

    // negative ids fall back to a random partition ('random' is a java.util.Random field on the partitioner)
    int id = (Integer) key;
    if (id < 0) {
        return random.nextInt(partitionsCount);
    }
    int partitionId = id % partitionsCount;
    return partitionId;
}
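
For reference, a minimal sketch of a complete 0.10.x custom partitioner looks like the following. The class name MyIdPartitioner is made up for illustration.

import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class MyIdPartitioner implements Partitioner {

    private final Random random = new Random();

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this example
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int partitionsCount = partitions.size();
        int id = (Integer) key;
        if (id < 0) {
            return random.nextInt(partitionsCount);
        }
        return id % partitionsCount;
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}

The partitioner is then registered on the producer with the partitioner.class property (broker address below is hypothetical):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyIdPartitioner.class.getName());
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);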


Posted by '김용환'


Spring scheduling normally uses cron expressions, but you can keep tasks from all firing at exactly the same instant (second 0) by adding a random offset to the schedule.



Spring 3.x does not support SpEL in the scheduling annotations, so this has to be configured in XML.



<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <task:executor id="googleExecutor" pool-size="5-30" queue-capacity="10000"/>
    <task:annotation-driven scheduler="googleScheduler" executor="googleExecutor"/>

    <task:scheduler id="googleScheduler" pool-size="30"/>

    <task:scheduled-tasks scheduler="googleScheduler">
        <task:scheduled ref="googleFeedService" method="reloadCache"
                        fixed-delay="#{new Double(T(java.lang.Math).random()*3000).intValue() + 600000}"/>
    </task:scheduled-tasks>

</beans>
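
The googleFeedService referenced above is an ordinary Spring bean. A minimal sketch might look like this (class name and body are hypothetical):

import org.springframework.stereotype.Service;

@Service("googleFeedService")
public class GoogleFeedService {

    // Called by the <task:scheduled> definition above roughly every 10 minutes,
    // with a random 0-3 second jitter added to the fixed delay.
    public void reloadCache() {
        // reload the feed cache here
    }
}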





In Spring 4.3, on the other hand, the same thing can be done easily with the annotation alone:


@Scheduled(fixedRate = 600000, initialDelayString = "#{new Double(T(java.lang.Math).random() * 3000).intValue()}")
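
As a rough sketch, the annotation would sit on a scheduled bean like the following (class and method names are made up, and @EnableScheduling is assumed on a configuration class):

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Configuration
@EnableScheduling
class SchedulingConfig {
}

@Component
public class FeedCacheReloader {

    // Runs every 10 minutes; the first run is delayed by a random 0-3 seconds
    // so that multiple instances do not all fire at exactly the same moment.
    @Scheduled(fixedRate = 600000,
               initialDelayString = "#{new Double(T(java.lang.Math).random() * 3000).intValue()}")
    public void reloadCache() {
        // reload work here
    }
}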




Posted by '김용환'


https://databricks.com/blog/2016/05/11/apache-spark-2-0-technical-preview-easier-faster-and-smarter.html


Databricks has published a good introduction to Spark 2.0.



Performance has improved dramatically.







primitive                Spark 1.6    Spark 2.0
filter                   15ns         1.1ns
sum w/o group            14ns         0.9ns
sum w/ group             79ns         10.7ns
hash join                115ns        4.0ns
sort (8-bit entropy)     620ns        5.3ns
sort (64-bit entropy)    620ns        40ns
sort-merge join          750ns        700ns






The API has also improved.

DataFrame is easier to use, and a new SparkSession entry point was added to replace HiveContext (SQLContext).
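
As a rough sketch (assuming a local Spark 2.0 installation; the app name, master, and input path are made up for illustration), creating a SparkSession from Java looks like this:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSessionExample {
    public static void main(String[] args) {
        // SparkSession replaces SQLContext/HiveContext as the single entry point
        SparkSession spark = SparkSession.builder()
                .appName("spark-2.0-intro")
                .master("local[*]")              // hypothetical: run locally for the example
                .getOrCreate();

        // In 2.0, DataFrame is simply Dataset<Row>
        Dataset<Row> df = spark.read().json("/tmp/people.json");   // hypothetical input path
        df.printSchema();
        df.show();

        spark.stop();
    }
}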




  • Unifying DataFrames and Datasets in Scala/Java: Starting in Spark 2.0, DataFrame is just a type alias for Dataset of Row. Both the typed methods (e.g. map, filter, groupByKey) and the untyped methods (e.g. select, groupBy) are available on the Dataset class (see the sketch after this list). Also, this new combined Dataset interface is the abstraction used for Structured Streaming. Since compile-time type-safety in Python and R is not a language feature, the concept of Dataset does not apply to these languages’ APIs. Instead, DataFrame remains the primary programming abstraction, which is analogous to the single-node data frame notion in these languages. Get a peek from a Dataset API notebook.
  • SparkSession: a new entry point that replaces the old SQLContext and HiveContext. For users of the DataFrame API, a common source of confusion for Spark is which “context” to use. Now you can use SparkSession, which subsumes both, as a single entry point, as demonstrated in this notebook. Note that the old SQLContext and HiveContext are still kept for backward compatibility.
  • Simpler, more performant Accumulator API: We have designed a new Accumulator API that has a simpler type hierarchy and supports specialization for primitive types. The old Accumulator API has been deprecated but retained for backward compatibility.
  • DataFrame-based Machine Learning API emerges as the primary ML API: With Spark 2.0, the spark.ml package, with its “pipeline” APIs, will emerge as the primary machine learning API. While the original spark.mllib package is preserved, future development will focus on the DataFrame-based API.
  • Machine learning pipeline persistence: Users can now save and load machine learning pipelines and models across all programming languages supported by Spark.
  • Distributed algorithms in R: Added support for Generalized Linear Models (GLM), Naive Bayes, Survival Regression, and K-Means in R.
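
A rough Java sketch of the unified Dataset API described in the first bullet (the local master and the toy data are made up for illustration):

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DatasetApiExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("dataset-api")
                .master("local[*]")
                .getOrCreate();

        // Untyped API: DataFrame is Dataset<Row>, so select/groupBy work directly
        Dataset<Row> df = spark.range(100).toDF("id");
        df.groupBy("id").count().show();

        // Typed API: map/filter with encoders, on the same Dataset abstraction
        Dataset<Long> doubled = spark.range(100)
                .map((MapFunction<Long, Long>) x -> x * 2, Encoders.LONG());
        Dataset<Long> filtered = doubled.filter((FilterFunction<Long>) x -> x > 100);
        System.out.println(filtered.count());

        spark.stop();
    }
}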


Posted by '김용환'