Re: Random Partitioning Issue
hmmm, yeah, no, I don't want to do that ... if we don't have to.

What if the DefaultPartitioner code looked like this instead =8^)

private class DefaultPartitioner[T](props: VerifiableProperties = null)
    extends Partitioner[T] {

  def partition(key: T, numPartitions: Int): Int = {
    if (key == null) {
      // no key: hash a fresh random UUID so null-keyed messages
      // spread across all partitions on every send
      import java.util.UUID
      Utils.abs(UUID.randomUUID.toString.hashCode) % numPartitions
    } else {
      // keyed messages keep the usual sticky hash-based placement
      Utils.abs(key.hashCode) % numPartitions
    }
  }
}
Again, the goal here is the simple case (often the initial, dev-side,
up-and-running-out-of-the-box scenario) so folks don't have to randomize
the keys themselves to get this effect.
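
Here is a rough usage sketch of what that buys you (0.8-era producer API;
the broker address and topic name are made up, and it assumes
DefaultEventHandler is changed to route null keys through the partitioner,
which, as Jun notes below, it currently is not):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "localhost:9092") // assumed local broker
props.put("serializer.class", "kafka.serializer.StringEncoder")
// partitioner.class left unset, so the DefaultPartitioner above is used

val producer = new Producer[String, String](new ProducerConfig(props))
// null key: each send hashes a fresh UUID, landing on a random partition
for (i <- 0 until 10)
  producer.send(new KeyedMessage[String, String]("test-topic", null, "msg-" + i))
producer.close()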

We would still also have to have the RandomMetaRefreshPartitioner class, right?
So null keys there would wait for the metadata time refresh for that use case, right?

private class RandomMetaRefreshPartitioner[T](props: VerifiableProperties = null)
    extends Partitioner[T] {

  def partition(key: T, numPartitions: Int): Int = {
    // null keys never reach this method; DefaultEventHandler keeps them
    // on one random partition until the next topic metadata refresh
    Utils.abs(key.hashCode) % numPartitions
  }
}
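
For reference, a hand-wavy sketch (not the real DefaultEventHandler code) of
the "wait for the time refresh" behavior null keys would get on that path:
pick one random partition and reuse it until the refresh interval elapses.
The class and its names are invented for illustration.

import scala.collection.mutable
import scala.util.Random

class StickyRandomChooser(refreshIntervalMs: Long) {
  private val random = new Random
  // topic -> (partition picked, time it was picked)
  private val cached = mutable.Map[String, (Int, Long)]()

  def choose(topic: String, numPartitions: Int): Int = {
    val now = System.currentTimeMillis
    cached.get(topic) match {
      case Some((p, t)) if now - t < refreshIntervalMs => p // stick with it
      case _ =>
        val p = random.nextInt(numPartitions) // re-roll on refresh
        cached(topic) = (p, now)
        p
    }
  }
}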
On Fri, Sep 27, 2013 at 1:10 PM, Jun Rao <[EMAIL PROTECTED]> wrote:

> However, currently, if key is null, the partitioner is not even called. Do
> you want to change DefaultEventHandler too?
>
> This also doesn't allow the partitioner to select a random and available
> partition, which in my opinion is more important than making partitions
> perfectly evenly balanced.
>
> Thanks,
>
> Jun
>
>
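
[A quick sketch of the availability point Jun raises above. PartitionMeta is
a made-up stand-in for the broker metadata the event handler sees; the
Partitioner api itself never gets this information.]

import scala.util.Random

case class PartitionMeta(id: Int, leaderAvailable: Boolean)

object AvailableRandom {
  private val random = new Random

  def pick(partitions: Seq[PartitionMeta]): Int = {
    val available = partitions.filter(_.leaderAvailable)
    // prefer partitions whose leader is up; fall back to any if none are
    val pool = if (available.nonEmpty) available else partitions
    pool(random.nextInt(pool.size)).id
  }
}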
> On Fri, Sep 27, 2013 at 9:53 AM, Joe Stein <[EMAIL PROTECTED]> wrote:
>
> > What I was proposing was two fold
> >
> > 1) revert the DefaultPartitioner class
> >
> > then
> >
> > 2) create a new partitioner that folks could use (like at LinkedIn you
> > would use this partitioner instead) in ProducerConfig
> >
> > private class RandomRefreshTimePartitioner[T](props: VerifiableProperties = null)
> >     extends Partitioner[T] {
> >   private val random = new java.util.Random
> >
> >   def partition(key: T, numPartitions: Int): Int = {
> >     Utils.abs(key.hashCode) % numPartitions
> >   }
> > }
> >
> > /*******************************************
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > ********************************************/
> >
> >
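
[For the "folks could use this partitioner instead in ProducerConfig" part,
the wiring would look roughly like this. partitioner.class is the real 0.8
producer config key; the broker address and the partitioner's package are
assumptions.]

import java.util.Properties
import kafka.producer.ProducerConfig

val props = new Properties()
props.put("metadata.broker.list", "localhost:9092") // assumed broker
props.put("serializer.class", "kafka.serializer.StringEncoder")
// opt in to the alternate partitioner instead of the default
props.put("partitioner.class", "kafka.producer.RandomRefreshTimePartitioner")
val config = new ProducerConfig(props)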
> > On Fri, Sep 27, 2013 at 12:46 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
> >
> > > Joe,
> > >
> > > Not sure I fully understand your proposal. Do you want to put the random
> > > partitioning selection logic (for messages without a key) in the
> > > partitioner without changing the partitioner api? That's difficult. The
> > > issue is that in the current partitioner api, we don't know which
> > > partitions are available. For example, if we have replication factor 1
> > > on a topic and a broker is down, the best thing to do for the random
> > > partitioner is to select an available partition at random (assuming more
> > > than 1 partition is created for the topic).
> > >
> > > Another option is to revert the random partition selection logic in
> > > DefaultEventHandler to selecting a random partition per batch of events
> > > (instead of sticking with a random partition for some configured amount
> > > of time). This is doable, but I am not sure if it's that critical. Since
> > > this is one of the two possible behaviors in 0.7, it's hard to say
> > > whether people will be surprised by that. Preserving both behaviors in
> > > 0.7 will require changing the partitioner api. This is more work and I
> > > agree it's better to do this post 0.8.0 final.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > >
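
[To make the per-batch alternative above concrete, a tiny illustrative
sketch; the class and method names are invented, not the real handler
internals. The random partition is re-rolled once per batch rather than
held for a configured interval.]

import scala.util.Random

class PerBatchChooser {
  private val random = new Random

  // called once per batch of events, so every batch lands on a
  // freshly chosen partition instead of a time-sticky one
  def chooseForBatch(numPartitions: Int): Int =
    random.nextInt(numPartitions)
}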
> > > On Fri, Sep 27, 2013 at 9:24 AM, Joe Stein <[EMAIL PROTECTED]> wrote:
> > >
> > > > Jun, can we hold this extra change over for 0.8.1 and just go with
> > > > reverting to where we were before for the default, with a new
> > > > partitioner for meta refresh, and support both?
> > > >
> > > > I am not sure I entirely understand why someone would need the extra