org.apache.kafka.clients.producer.Partitioner이 0.10.1.1버전의 다음처럼 변경되었다.
kafka 0.8
public int partition(Object key, int partitions)
kafka 0.10.1.1
public interface Partitioner extends Configurable {
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes The serialized key to partition on( or null if no key)
* @param value The value to partition on or null
* @param valueBytes The serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
/**
* This is called when partitioner is closed.
*/
public void close();
}
0.8에서는 partitoning을 다음처럼 진행했었다.
public int partition(Object key, int partitions) {
...
return Integer.valueOf(key) % partitions;
}
partition 개수를 얻어오려면 Cluster를 통해서 partition을 얻도록 한다.
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int partitionsCount = partitions.size();
int id = (Integer) key;
if (id < 0) {
return random.nextInt(partitionsCount);
}
int partitionId = id % partitionsCount;
return partitionId;
}