Kafka, mail # dev - Random Partitioning Issue


Re: Random Partitioning Issue
Guozhang Wang 2013-09-29, 04:52
I think Joe's suggesting that we can remove the checking logic for
key==null in DefaultEventHandler, and do that in partitioner.

One consequence of this idea is that any customized partitioner would then
also have to handle the key == null case.
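
To make that concrete, here is a minimal sketch of what a customized partitioner might look like if the null check moved out of DefaultEventHandler. The Partitioner trait is redeclared locally as a hypothetical stand-in so the snippet compiles on its own, and NullAwarePartitioner is an illustrative name, not a class from the Kafka code base.

```scala
import java.util.concurrent.ThreadLocalRandom

// Hypothetical stand-in for the producer's Partitioner trait, declared here
// only so that this sketch is self-contained.
trait Partitioner[T] {
  def partition(key: T, numPartitions: Int): Int
}

// Hypothetical custom partitioner that has to deal with key == null itself.
class NullAwarePartitioner[T] extends Partitioner[T] {
  def partition(key: T, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    if (key == null)
      // No key: pick a partition uniformly at random.
      ThreadLocalRandom.current().nextInt(numPartitions)
    else
      // Mask the sign bit so the index is non-negative even for Int.MinValue.
      (key.hashCode & 0x7fffffff) % numPartitions
  }
}
```

With only numPartitions passed in, a partitioner like this cannot restrict itself to partitions that currently have a leader, which is the concern Jun raises in the quoted thread below.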

Guozhang
On Fri, Sep 27, 2013 at 9:12 PM, Jun Rao <[EMAIL PROTECTED]> wrote:

> We have the following code in DefaultEventHandler:
>
>     val partition =
>       if(key == null) {
>         // If the key is null, we don't really need a partitioner
>         // So we look up in the send partition cache for the topic to decide the target partition
>         val id = sendPartitionPerTopicCache.get(topic)
>         id match {
>           case Some(partitionId) =>
>             // directly return the partitionId without checking availability of the leader,
>             // since we want to postpone the failure until the send operation anyways
>             partitionId
>           case None =>
>             val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
>             if (availablePartitions.isEmpty)
>               throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
>             val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
>             val partitionId = availablePartitions(index).partitionId
>             sendPartitionPerTopicCache.put(topic, partitionId)
>             partitionId
>         }
>       } else
>         partitioner.partition(key, numPartitions)
>
> So, if key is null, the partitioner is ignored.
>
> Thanks,
>
> Jun
>
>
> On Fri, Sep 27, 2013 at 10:30 AM, Joe Stein <[EMAIL PROTECTED]> wrote:
>
> > hmmm, yeah, no, I don't want to do that ... if we don't have to.
> >
> > What if the DefaultPartitioner code looked like this instead =8^)
> >
> > private class DefaultPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
> >
> >   def partition(key: T, numPartitions: Int): Int = {
> >     if (key == null) {
> >       import java.util.UUID
> >       // hash the random UUID to get an Int, as Utils.abs is applied to Ints
> >       Utils.abs(UUID.randomUUID.hashCode) % numPartitions
> >     }
> >     else {
> >       Utils.abs(key.hashCode) % numPartitions
> >     }
> >   }
> > }
> >
> >
> > Again, the goal here is the simple case (often the initial, dev-side,
> > up-and-running-out-of-the-box setup), so folks don't have to randomize
> > the keys themselves to get this effect.
> >
> > We would still also need the RandomMetaRefreshPartitioner class, right?
> > Null keys there would then wait for the timed refresh for that use case,
> > right?
> >
> > private class RandomMetaRefreshPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
> >
> >   def partition(key: T, numPartitions: Int): Int = {
> >     Utils.abs(key.hashCode) % numPartitions
> >   }
> > }
> >
> >
> > On Fri, Sep 27, 2013 at 1:10 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
> >
> > > However, currently, if key is null, the partitioner is not even called.
> > > Do you want to change DefaultEventHandler too?
> > >
> > > This also doesn't allow the partitioner to select a random and available
> > > partition, which in my opinion is more important than making partitions
> > > perfectly evenly balanced.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Fri, Sep 27, 2013 at 9:53 AM, Joe Stein <[EMAIL PROTECTED]> wrote:
> > >
> > > > What I was proposing was twofold:
> > > >
> > > > 1) revert the DefaultPartitioner class
> > > >
> > > > then
> > > >
> > > > 2) create a new partitioner that folks could use (like at LinkedIn you
> > > > would use this partitioner instead) in ProducerConfig
> > > >
> > > > private class RandomRefreshTimPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
> > > >   private val random = new java.util.Random
> > > >
> > > >   def partition(key: T, numPartitions: Int): Int = {
> > > >     Utils.abs(key.hashCode) % numPartitions
> > > >   }
> > > > }
> > > >
> > > > /*******************************************