Kafka >> mail # dev >> Random Partitioning Issue


Joe Stein 2013-09-14, 05:11
Joel Koshy 2013-09-14, 12:17
Joe Stein 2013-09-14, 18:19
Jun Rao 2013-09-15, 03:15
Jay Kreps 2013-09-15, 15:37
Jay Kreps 2013-09-15, 15:45
Joel Koshy 2013-09-17, 17:19
Jay Kreps 2013-09-17, 17:41
Joe Stein 2013-09-18, 23:23
Jun Rao 2013-09-23, 00:57
Joe Stein 2013-09-27, 16:24
Jun Rao 2013-09-27, 16:46
Re: Random Partitioning Issue
What I was proposing was twofold:

1) revert the DefaultPartitioner class

then

2) create a new partitioner that folks could select in ProducerConfig (at
LinkedIn, for example, you would configure this partitioner instead)

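import kafka.producer.Partitioner
import kafka.utils.{Utils, VerifiableProperties}

// Proposed alternative partitioner, selected through ProducerConfig.
private class RandomRefreshTimPartitioner[T](props: VerifiableProperties = null)
    extends Partitioner[T] {
  // Not used yet by partition() below.
  private val random = new java.util.Random

  // Hash the key onto a partition; Utils.abs keeps the modulo non-negative.
  def partition(key: T, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
  }
}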
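As written, partition() hashes the key and never touches `random`, so the class does not yet show the behavior Jun describes in the quoted reply: picking a random partition and sticking with it for a configured amount of time. A minimal sketch of that behavior, assuming a stand-in `Partitioner[T]` trait with the same `partition(key, numPartitions)` signature so it compiles without Kafka on the classpath; `StickyRandomPartitioner`, `refreshIntervalMs` and the injectable `clock` are hypothetical names, not Kafka APIs.

```scala
import scala.util.Random

// Stand-in for the generic partitioner trait used in the message above,
// so this sketch compiles on its own (hypothetical, not the Kafka import).
trait Partitioner[T] {
  def partition(key: T, numPartitions: Int): Int
}

// Hypothetical: ignores the key, picks a random partition and keeps
// returning it until refreshIntervalMs has elapsed (or the count changes).
class StickyRandomPartitioner[T](
    refreshIntervalMs: Long,
    clock: () => Long = () => System.currentTimeMillis(),
    random: Random = new Random
) extends Partitioner[T] {
  private var current = -1           // partition picked most recently
  private var chosenAtMs = 0L        // time at which it was picked
  private var lastNumPartitions = -1 // partition count seen at that time

  def partition(key: T, numPartitions: Int): Int = synchronized {
    require(numPartitions > 0, "numPartitions must be positive")
    val now = clock()
    val expired = now - chosenAtMs >= refreshIntervalMs
    if (current < 0 || expired || numPartitions != lastNumPartitions) {
      current = random.nextInt(numPartitions)
      chosenAtMs = now
      lastNumPartitions = numPartitions
    }
    current
  }
}
```

Injecting the clock keeps the refresh rule testable; in a producer it would simply default to System.currentTimeMillis.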

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/
On Fri, Sep 27, 2013 at 12:46 PM, Jun Rao <[EMAIL PROTECTED]> wrote:

> Joe,
>
> Not sure I fully understand your proposal. Do you want to put the random
> partitioning selection logic (for messages without a key) in the
> partitioner without changing the partitioner api? That's difficult. The
> issue is that in the current partitioner api, we don't know which
> partitions are available. For example, if we have replication factor 1 on a
> topic and a broker is down, the best thing to do for the random partitioner
> is to select an available partition at random (assuming more than 1
> partition is created for the topic).
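Jun's point about choosing only among available partitions can be illustrated with a small sketch, assuming the caller knows which partitions currently have a live leader (the existing partition(key, numPartitions) signature does not provide this). `availablePartitions` and `choosePartitionForNullKey` are hypothetical helpers, and returning None when nothing is available is an assumption, not Kafka's behavior.

```scala
import scala.util.Random

// Hypothetical helper: ids of partitions whose leader is currently known.
// `leaderOf` maps partition id -> Some(leader broker id), or None when the
// leader is down (e.g. replication factor 1 and that broker is offline).
def availablePartitions(leaderOf: Map[Int, Option[Int]]): Vector[Int] =
  leaderOf.collect { case (partition, Some(_)) => partition }.toVector.sorted

// Hypothetical helper: for a message without a key, pick uniformly among
// available partitions only; None means no partition can take the message.
def choosePartitionForNullKey(available: IndexedSeq[Int], random: Random): Option[Int] =
  if (available.isEmpty) None
  else Some(available(random.nextInt(available.size)))
```

With four partitions and the broker leading partition 1 down, only partitions 0, 2 and 3 are ever chosen.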
>
> Another option is to revert the random partitioning selection logic in
> DefaultEventHandler to select a random partition per batch of events
> (instead of sticking with a random partition for some configured amount of
> time). This is doable, but I am not sure if it's that critical. Since this
> is one of the two possible behaviors in 0.7, it's hard to say whether
> people will be surprised by it. Preserving both 0.7 behaviors will require
> changing the partitioner api. This is more work and I agree it's better to
> do this post 0.8.0 final.
>
> Thanks,
>
> Jun
>
>
>
> On Fri, Sep 27, 2013 at 9:24 AM, Joe Stein <[EMAIL PROTECTED]> wrote:
>
> > Jun, can we hold this extra change over for 0.8.1 and just go with
> > reverting the default to where we were before, adding a new partitioner
> > for the metadata-refresh behavior, and supporting both?
> >
> > I am not sure I entirely understand why someone would need the extra
> > functionality you are talking about, which sounds cool though... adding it
> > to the API (especially now) without people using it may just make folks ask
> > more questions and maybe not use it ... IDK ... but in any case we can work
> > on buttoning up 0.8 and shipping just the change for two partitioners
> > https://issues.apache.org/jira/browse/KAFKA-1067 and circling back to this
> > extra item (including the discussion) in 0.8.1 or later if we want to?
> >  I am always of the mind to reduce complexity unless that complexity is in
> > fact better than not having it.
> >
> > On Sun, Sep 22, 2013 at 8:56 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
> >
> > > It's reasonable to make the behavior of random producers customizable
> > > through a pluggable partitioner. So, if one doesn't care about the number
> > > of socket connections, one can choose to select a random partition on
> > > every send. If one does have many producers, one can choose to
> > > periodically select a random partition. To support this, the partitioner
> > > api needs to be changed, though.
> > >
> > > Instead of
> > >   def partition(key: T, numPartitions: Int): Int
> > >
> > > we probably need something like the following:
> > >   def partition(key: T, numPartitions: Int, availablePartitionList: List[Int],
> > >                 isNewBatch: Boolean, isRefreshMetadata: Boolean): Int
> > >
> > > availablePartitionList: allows us to select only partitions that are
> > > available.
> > > isNewBatch: allows us to select the same partition for all messages in a
> > > given batch in the async mode.
> > > isRefreshMetadata: allows us to implement the policy of switching to a
> > > random partition periodically.
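A minimal sketch of a keyless-message policy written against the extended signature above, assuming it were adopted as proposed (it is not a shipped Kafka API). `ExtendedPartitioner`, `RandomKeylessPartitioner` and the `switchEveryBatch` flag are hypothetical. Keyed messages are hashed as before; messages without a key stick with one random available partition and switch on a metadata refresh, when that partition is no longer available, or, if `switchEveryBatch` is set, at the start of every new batch.

```scala
import scala.util.Random

// Hypothetical trait mirroring the extended signature proposed above;
// this is not a shipped Kafka API.
trait ExtendedPartitioner[T] {
  def partition(key: T, numPartitions: Int, availablePartitionList: List[Int],
                isNewBatch: Boolean, isRefreshMetadata: Boolean): Int
}

// Hypothetical policy: keyed messages are hashed; keyless messages stick to one
// random available partition, switching on a metadata refresh, when that
// partition becomes unavailable, or (if switchEveryBatch) on every new batch.
class RandomKeylessPartitioner[T](switchEveryBatch: Boolean, random: Random = new Random)
    extends ExtendedPartitioner[T] {
  private var current: Option[Int] = None

  def partition(key: T, numPartitions: Int, availablePartitionList: List[Int],
                isNewBatch: Boolean, isRefreshMetadata: Boolean): Int = synchronized {
    if (key != null) {
      require(numPartitions > 0, "numPartitions must be positive")
      // Mask the sign bit so the modulo result is never negative.
      (key.hashCode & 0x7fffffff) % numPartitions
    } else {
      require(availablePartitionList.nonEmpty, "no partition is available")
      val stillAvailable = current.exists(p => availablePartitionList.contains(p))
      val mustSwitch = !stillAvailable || isRefreshMetadata || (switchEveryBatch && isNewBatch)
      if (mustSwitch)
        current = Some(availablePartitionList(random.nextInt(availablePartitionList.size)))
      current.get
    }
  }
}
```

With `switchEveryBatch = true` this gives the per-batch behavior; with `false` the partition only moves on a metadata refresh, which keeps the number of socket connections per producer low.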
> > >
> > > This will make the partitioner api a bit more complicated. However, it

 
Jun Rao 2013-09-27, 17:10
Joe Stein 2013-09-27, 17:31
Jun Rao 2013-09-28, 04:12
Guozhang Wang 2013-09-29, 04:52
Jun Rao 2013-09-29, 16:15
Joe Stein 2013-10-01, 05:22
Jun Rao 2013-10-01, 15:27
Joe Stein 2013-10-01, 15:35
Jun Rao 2013-10-01, 16:32