Kafka, mail # dev - Random Partitioning Issue

Re: Random Partitioning Issue
Jun Rao 2013-09-28, 04:12
We have the following code in DefaultEventHandler:

    val partition =
      if(key == null) {
        // If the key is null, we don't really need a partitioner
        // So we look up in the send partition cache for the topic to decide the target partition
        val id = sendPartitionPerTopicCache.get(topic)
        id match {
          case Some(partitionId) =>
            // directly return the partitionId without checking availability of the leader,
            // since we want to postpone the failure until the send operation anyways
            partitionId
          case None =>
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId)
            partitionId
        }
      } else
        partitioner.partition(key, numPartitions)

So, if key is null, the partitioner is ignored.
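A minimal, self-contained, single-threaded Scala sketch of the null-key branch above, runnable outside Kafka. `PartitionAndLeader`, `NullKeyPartitionSelector`, and `clearCache` are hypothetical names, `IllegalStateException` stands in for Kafka's `LeaderNotAvailableException`, and the cache is assumed to be cleared whenever producer metadata is refreshed. It shows the behavior under discussion: the first null-key send to a topic picks an available partition round-robin, and later sends reuse that choice until the cache is cleared.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical stand-in for the producer's per-partition metadata.
final case class PartitionAndLeader(partitionId: Int, leaderBrokerIdOpt: Option[Int])

// Models only the null-key branch of the DefaultEventHandler code quoted above.
final class NullKeyPartitionSelector {
  private val partitionCounter = new AtomicInteger(0)
  // Not thread-safe; fine for a single-threaded sketch.
  private val sendPartitionPerTopicCache = mutable.Map.empty[String, Int]

  def select(topic: String, partitions: Seq[PartitionAndLeader]): Int =
    sendPartitionPerTopicCache.get(topic) match {
      // Cached choice: reuse it without checking leader availability.
      case Some(partitionId) => partitionId
      case None =>
        val available = partitions.filter(_.leaderBrokerIdOpt.isDefined)
        if (available.isEmpty)
          throw new IllegalStateException(s"No leader for any partition in topic $topic")
        // Clear the sign bit so the index stays non-negative after counter overflow.
        val index = (partitionCounter.getAndIncrement() & Int.MaxValue) % available.size
        val partitionId = available(index).partitionId
        sendPartitionPerTopicCache.put(topic, partitionId)
        partitionId
    }

  // Hypothetical hook, assumed to run on each periodic metadata refresh.
  def clearCache(): Unit = sendPartitionPerTopicCache.clear()
}
```

Because the cached partition is reused until the cache is cleared, null-key messages for a topic all go to the same partition between refreshes rather than being spread message by message.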

Thanks,

Jun
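The DefaultPartitioner proposal quoted below can be sketched as a standalone class: hash non-null keys, and pick a uniformly random partition for null keys. `SimplePartitioner` and `RandomForNullKeyPartitioner` are hypothetical names, the trait is a simplified stand-in for Kafka's `Partitioner` (whose exact signature may differ by version), and `ThreadLocalRandom` replaces hashing a random UUID string, since both only need a uniformly distributed index. Note the limitation Jun raises in the thread: with this signature the partitioner sees only `numPartitions`, so it cannot restrict the random choice to partitions that currently have a leader.

```scala
import java.util.concurrent.ThreadLocalRandom

// Hypothetical simplified stand-in for Kafka's Partitioner trait.
trait SimplePartitioner[T] {
  def partition(key: T, numPartitions: Int): Int
}

// Hashes non-null keys; chooses a uniformly random partition for null keys.
final class RandomForNullKeyPartitioner[T] extends SimplePartitioner[T] {
  // Clear the sign bit: math.abs(Int.MinValue) is still negative, this is not.
  private def nonNegative(n: Int): Int = n & Int.MaxValue

  def partition(key: T, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    if (key == null) ThreadLocalRandom.current().nextInt(numPartitions)
    else nonNegative(key.hashCode) % numPartitions
  }
}
```

For example, `new RandomForNullKeyPartitioner[String].partition(null, 8)` returns some index in `[0, 8)`, while a non-null key always maps to the same partition for a fixed partition count.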
On Fri, Sep 27, 2013 at 10:30 AM, Joe Stein <[EMAIL PROTECTED]> wrote:

> hmmm, yeah, no, I don't want to do that ... if we don't have to.
>
> What if the DefaultPartitioner code looked like this instead =8^)
>
> private class DefaultPartitioner[T](props: VerifiableProperties = null)
> extends Partitioner[T] {
>
>   def partition(key: T, numPartitions: Int): Int = {
>     if (key == null) {
>         import java.util.UUID
>         Utils.abs(UUID.randomUUID.toString.hashCode) % numPartitions
>     }
>     else {
>        Utils.abs(key.hashCode) % numPartitions
>     }
>   }
> }
>
>
> Again, the goal here is simplicity (often the initial, dev-side, up-and-running
> out-of-the-box case) so folks don't have to randomize the keys
> themselves to get this effect.
>
> We would still also need the RandomMetaRefreshPartitioner class, right?
> So null keys there would wait for the timed metadata refresh for that use case,
> right?
>
> private class RandomMetaRefreshPartitioner[T](props: VerifiableProperties = null)
> extends Partitioner[T] {
>
>   def partition(key: T, numPartitions: Int): Int = {
>     Utils.abs(key.hashCode) % numPartitions
>   }
> }
>
>
> On Fri, Sep 27, 2013 at 1:10 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
>
> > However, currently, if key is null, the partitioner is not even called. Do
> > you want to change DefaultEventHandler too?
> >
> > This also doesn't allow the partitioner to select a random and available
> > partition, which in my opinion is more important than making partitions
> > perfectly evenly balanced.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Sep 27, 2013 at 9:53 AM, Joe Stein <[EMAIL PROTECTED]> wrote:
> >
> > > What I was proposing was twofold:
> > >
> > > 1) revert the DefaultPartitioner class
> > >
> > > then
> > >
> > > 2) create a new partitioner that folks could select in ProducerConfig
> > > (at LinkedIn, for example, you would use this partitioner instead)
> > >
> > > private class RandomRefreshTimPartitioner[T](props: VerifiableProperties =
> > > null) extends Partitioner[T] {
> > >   private val random = new java.util.Random
> > >
> > >   def partition(key: T, numPartitions: Int): Int = {
> > >     Utils.abs(key.hashCode) % numPartitions
> > >   }
> > > }
> > >
> > > /*******************************************
> > >  Joe Stein
> > >  Founder, Principal Consultant
> > >  Big Data Open Source Security LLC
> > >  http://www.stealth.ly
> > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > ********************************************/
> > >
> > >
> > > On Fri, Sep 27, 2013 at 12:46 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
> > >
> > > > Joe,
> > > >
> > > > Not sure I fully understand your proposal. Do you want to put the
> > > > random partitioning selection logic (for messages without a key) in the