Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB

Kafka >> mail # dev >> Random Partitioning Issue


Re: Random Partitioning Issue
I think Joe's suggesting that we can remove the checking logic for
key == null in DefaultEventHandler, and do that in the partitioner.

One thing about this idea is that any customized partitioner would then
also have to handle the key == null case.

Guozhang
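Guozhang's caveat can be made concrete with a minimal sketch. The `Partitioner` trait below is a local stand-in that only copies the `partition(key, numPartitions)` signature quoted in this thread; `PartitionUtils`, `NaiveHashPartitioner` and `NullSafeHashPartitioner` are hypothetical names, and the `abs` helper is assumed (not confirmed here) to behave like Kafka's `Utils.abs` by clearing the sign bit.

```scala
import scala.util.Random

// Local stand-in for a partitioner interface; only the
// partition(key, numPartitions) signature is taken from the thread.
trait Partitioner[T] {
  def partition(key: T, numPartitions: Int): Int
}

object PartitionUtils {
  // Non-negative Int by clearing the sign bit (assumed to match Kafka's
  // Utils.abs). Unlike math.abs, Int.MinValue maps to 0 instead of staying negative.
  def abs(n: Int): Int = n & 0x7fffffff
}

// Hypothetical partitioner written only with keyed messages in mind:
// a null key throws a NullPointerException at key.hashCode.
class NaiveHashPartitioner[T] extends Partitioner[T] {
  def partition(key: T, numPartitions: Int): Int =
    PartitionUtils.abs(key.hashCode) % numPartitions
}

// Hypothetical partitioner that handles both cases itself, as every
// partitioner would have to if the key == null check left the event handler.
class NullSafeHashPartitioner[T](random: Random = new Random) extends Partitioner[T] {
  def partition(key: T, numPartitions: Int): Int =
    if (key == null) random.nextInt(numPartitions)        // no key: any partition
    else PartitionUtils.abs(key.hashCode) % numPartitions // key: stable hash
}
```

The seeded `Random` parameter only exists to make the sketch testable; the point is that the null branch now has to be repeated in every custom partitioner.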
On Fri, Sep 27, 2013 at 9:12 PM, Jun Rao <[EMAIL PROTECTED]> wrote:

> We have the following code in DefaultEventHandler:
>
>     val partition =
>       if(key == null) {
>         // If the key is null, we don't really need a partitioner
>         // So we look up in the send partition cache for the topic to decide the target partition
>         val id = sendPartitionPerTopicCache.get(topic)
>         id match {
>           case Some(partitionId) =>
>             // directly return the partitionId without checking availability of the leader,
>             // since we want to postpone the failure until the send operation anyways
>             partitionId
>           case None =>
>             val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
>             if (availablePartitions.isEmpty)
>               throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
>             val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
>             val partitionId = availablePartitions(index).partitionId
>             sendPartitionPerTopicCache.put(topic, partitionId)
>             partitionId
>         }
>       } else
>         partitioner.partition(key, numPartitions)
>
> So, if key is null, the partitioner is ignored.
>
> Thanks,
>
> Jun
>
>
> On Fri, Sep 27, 2013 at 10:30 AM, Joe Stein <[EMAIL PROTECTED]> wrote:
>
> > hmmm, yeah, no, I don't want to do that ... if we don't have to.
> >
> > What if the DefaultPartitioner code looked like this instead =8^)
> >
> > private class DefaultPartitioner[T](props: VerifiableProperties = null)
> > extends Partitioner[T] {
> >
> >   def partition(key: T, numPartitions: Int): Int = {
> >     if (key == null) {
> >       import java.util.UUID
> >       Utils.abs(UUID.randomUUID.toString().hashCode) % numPartitions
> >     }
> >     else {
> >       Utils.abs(key.hashCode) % numPartitions
> >     }
> >   }
> > }
> >
> >
> > Again, the goal here is the simple case (often the initial, dev-side,
> > up-and-running-out-of-the-box setup), so folks don't have to randomize
> > the keys themselves to get this effect.
> >
> > We would still also need the RandomMetaRefreshPartitioner class, right?
> > So null keys there would wait for the timed metadata refresh in that use
> > case, right?
> >
> > private class RandomMetaRefreshPartitioner[T](props: VerifiableProperties = null)
> > extends Partitioner[T] {
> >
> >   def partition(key: T, numPartitions: Int): Int = {
> >     Utils.abs(key.hashCode) % numPartitions
> >   }
> > }
> >
> >
> > On Fri, Sep 27, 2013 at 1:10 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
> >
> > > However, currently, if key is null, the partitioner is not even called.
> > > Do you want to change DefaultEventHandler too?
> > >
> > > This also doesn't allow the partitioner to select a random and available
> > > partition, which in my opinion is more important than making partitions
> > > perfectly evenly balanced.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Fri, Sep 27, 2013 at 9:53 AM, Joe Stein <[EMAIL PROTECTED]> wrote:
> > >
> > > > What I was proposing was twofold:
> > > >
> > > > 1) revert the DefaultPartitioner class
> > > >
> > > > then
> > > >
> > > > 2) create a new partitioner that folks could set in ProducerConfig
> > > > (at LinkedIn, for example, you would use this partitioner instead)
> > > >
> > > > private class RandomRefreshTimPartitioner[T](props: VerifiableProperties = null)
> > > > extends Partitioner[T] {
> > > >   private val random = new java.util.Random
> > > >
> > > >   def partition(key: T, numPartitions: Int): Int = {
> > > >     Utils.abs(key.hashCode) % numPartitions
> > > >   }
> > > > }
> > > >
> > > > /*******************************************
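The null-key path Jun quotes from DefaultEventHandler can be sketched in isolation. This is a minimal, self-contained sketch rather than Kafka's code: `PartitionMeta`, `NullKeyPartitionChooser` and `clearCache` are hypothetical names, `IllegalStateException` stands in for `LeaderNotAvailableException`, and the cache is assumed to be cleared on a metadata refresh, which is the timed refresh Joe refers to.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical stand-in for the per-partition metadata the quoted code filters on.
final case class PartitionMeta(partitionId: Int, leaderBrokerIdOpt: Option[Int])

class NullKeyPartitionChooser {
  private val sendPartitionPerTopicCache = mutable.Map.empty[String, Int]
  private val partitionCounter = new AtomicInteger(0)

  def choose(topic: String, partitions: Seq[PartitionMeta]): Int =
    sendPartitionPerTopicCache.get(topic) match {
      // Cached: reuse it without rechecking the leader; a failure surfaces at send time.
      case Some(partitionId) => partitionId
      case None =>
        // Only partitions that currently have a leader are candidates.
        val available = partitions.filter(_.leaderBrokerIdOpt.isDefined)
        if (available.isEmpty)
          throw new IllegalStateException("No leader for any partition in topic " + topic)
        // Clear the sign bit so the index stays non-negative after counter overflow.
        val index = (partitionCounter.getAndIncrement() & 0x7fffffff) % available.size
        val partitionId = available(index).partitionId
        sendPartitionPerTopicCache.put(topic, partitionId)
        partitionId
    }

  // Hypothetical hook: forgetting cached choices lets the next null-key send pick again.
  def clearCache(): Unit = sendPartitionPerTopicCache.clear()
}
```

Because a topic sticks to one partition until the cache is cleared, null-key messages are spread across partitions only over time, not per message. That trade-off matches Jun's priority of choosing an available partition over perfect balance.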
 