카프카 서버는 파티셔닝 키를 주어지지 않더라도 키를 기반으로 분산처리되도록 되어 있다. 



https://cwiki.apache.org/confluence/display/KAFKA/FAQ

Why is data not evenly distributed among partitions when a partitioning key is not specified?

In Kafka producer, a partition key can be specified to indicate the destination partition of the message. By default, a hashing-based partitioner is used to determine the partition id given the key, and people can use customized partitioners also.





그러나, 언어별 클라이언트에 따라서는 key 값을 잘 선택을 못한다면 이상 징후가 발생할 수 있다. 


scala의 경우 DefaultPartitioner는 다음과 같은 코드로 되어 있다. 



import kafka.utils._

import org.apache.kafka.common.utils.Utils

@deprecated("This class has been deprecated and will be removed in a future release. " +

            "It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0")

class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {

  private val random = new java.util.Random

  

  def partition(key: Any, numPartitions: Int): Int = {

    Utils.abs(key.hashCode) % numPartitions

  }

}



scala의 kafka.producer.KeyedMessage를 살펴보면, key없이 KeyedMessage를 생성하면 key와 partKey는 null이 된다. 

case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
if(topic == null)
throw new IllegalArgumentException("Topic cannot be null.")

def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

def this(topic: String, key: K, message: V) = this(topic, key, key, message)

def partitionKey = {
if(partKey != null)
partKey
else if(hasKey)
key
else
null
}

def hasKey = key != null
}


결국 null 이 되어 같은 위치만 계속 파게 된다. 


이 클래스는 0.10.x까지 사용되고 0.11.0에서는 사라진다고 한다.. 


Posted by '김용환'
,