org.apache.kafka.clients.producer.Partitioner이 0.10.1.1버전의 다음처럼 변경되었다. 


kafka 0.8


public int partition(Object key, int partitions)



kafka 0.10.1.1


public interface Partitioner extends Configurable {


    /**

     * Compute the partition for the given record.

     *

     * @param topic The topic name

     * @param key The key to partition on (or null if no key)

     * @param keyBytes The serialized key to partition on( or null if no key)

     * @param value The value to partition on or null

     * @param valueBytes The serialized value to partition on or null

     * @param cluster The current cluster metadata

     */

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);


    /**

     * This is called when partitioner is closed.

     */

    public void close();


}




0.8에서는 partitoning을 다음처럼 진행했었다. 


public int partition(Object key, int partitions) {

...

    return Integer.valueOf(key) % partitions;

}






partition 개수를 얻어오려면 Cluster를 통해서 partition을 얻도록 한다. 



@Override

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

int partitionsCount = partitions.size();


int id = (Integer) key;

if (id < 0) {

return random.nextInt(partitionsCount);

}

int partitionId = id % partitionsCount;

return partitionId;

}


Posted by '김용환'
,