Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, cassandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
Kafka >> mail # dev >> Random Partitioning Issue


Re: Random Partitioning Issue
We have the following code in DefaultEventHandler:

    val partition =
      if(key == null) {
        // If the key is null, we don't really need a partitioner
        // So we look up in the send partition cache for the topic to decide the target partition
        val id = sendPartitionPerTopicCache.get(topic)
        id match {
          case Some(partitionId) =>
            // directly return the partitionId without checking availability of the leader,
            // since we want to postpone the failure until the send operation anyways
            partitionId
          case None =>
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId)
            partitionId
        }
      } else
        partitioner.partition(key, numPartitions)

So, if key is null, the partitioner is ignored.
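To make the null-key path above concrete, here is a small self-contained Scala model of it. It is a sketch, not Kafka's code: `PartitionInfo`, `KeyPartitioner`, `NullKeyAwareSelector` and `clearCache` are hypothetical names, the key is modeled as an `Option` instead of a nullable value, and `IllegalStateException` stands in for `LeaderNotAvailableException`. It shows that a keyless send reuses the partition cached for its topic and never calls the partitioner.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical stand-in for partition metadata: an id plus the leader's broker id, if known.
final case class PartitionInfo(partitionId: Int, leaderBrokerIdOpt: Option[Int])

// Hypothetical stand-in for the pluggable partitioner, consulted only for keyed messages.
trait KeyPartitioner[K] {
  def partition(key: K, numPartitions: Int): Int
}

// Simplified model of the partition-selection branch quoted above.
class NullKeyAwareSelector[K](partitioner: KeyPartitioner[K]) {
  private val partitionCounter = new AtomicInteger(0)
  private val sendPartitionPerTopicCache = mutable.HashMap.empty[String, Int]

  // Clear the sign bit so the index is never negative (same intent as Utils.abs).
  private def nonNegative(n: Int): Int = n & 0x7fffffff

  def selectPartition(topic: String, key: Option[K], partitions: Seq[PartitionInfo]): Int =
    key match {
      case Some(k) =>
        // Keyed messages go through the partitioner.
        partitioner.partition(k, partitions.size)
      case None =>
        sendPartitionPerTopicCache.get(topic) match {
          // Reuse the cached ("sticky") choice without re-checking its leader.
          case Some(partitionId) => partitionId
          case None =>
            // Only partitions that currently have a leader are candidates.
            val available = partitions.filter(_.leaderBrokerIdOpt.isDefined)
            if (available.isEmpty)
              throw new IllegalStateException("No leader for any partition in topic " + topic)
            val index = nonNegative(partitionCounter.getAndIncrement()) % available.size
            val partitionId = available(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId)
            partitionId
        }
    }

  // Forgetting the cached choice lets the next keyless send pick a new partition.
  def clearCache(): Unit = sendPartitionPerTopicCache.clear()
}
```

Because the cached choice is sticky, every keyless message for a topic lands on the same partition until something clears the cache; in this model that is `clearCache()`.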

Thanks,

Jun
On Fri, Sep 27, 2013 at 10:30 AM, Joe Stein <[EMAIL PROTECTED]> wrote:

> Hmm, yeah, no, I don't want to do that ... if we don't have to.
>
> What if the DefaultPartitioner code looked like this instead =8^)
>
> private class DefaultPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
>
>   def partition(key: T, numPartitions: Int): Int = {
>     if (key == null) {
>         import java.util.UUID
>         // null key: derive a pseudo-random Int from the hash of a fresh UUID
>         Utils.abs(UUID.randomUUID.hashCode) % numPartitions
>     }
>     else {
>        Utils.abs(key.hashCode) % numPartitions
>     }
>   }
> }
>
>
> Again, the goal here is the simple case (often the initial, dev-side,
> up-and-running-out-of-the-box setup), so folks don't have to randomize the
> keys themselves to get this effect.
>
> We would still also need a RandomMetaRefreshPartitioner class, right? So
> null keys there would wait for the timed metadata refresh in that use case,
> right?
>
> private class RandomMetaRefreshPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
>
>   def partition(key: T, numPartitions: Int): Int = {
>     Utils.abs(key.hashCode) % numPartitions
>   }
> }
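One way to read "null keys would wait for the time refresh" is a per-topic sticky choice that is discarded once an interval has elapsed. This is a minimal sketch under that assumption; `TimedStickyPartitionCache`, `refreshIntervalMs` and the injectable clock are hypothetical, chosen so the behavior can be checked without sleeping.

```scala
import scala.collection.mutable

// Sketch: keyless sends stick to one partition per topic until a periodic refresh.
// refreshIntervalMs and the injectable clock `now` are hypothetical parameters.
class TimedStickyPartitionCache(refreshIntervalMs: Long, now: () => Long) {
  private val cache = mutable.HashMap.empty[String, Int]
  private var lastRefreshMs = now()

  // `choose` is evaluated only when the topic has no cached partition.
  def partitionFor(topic: String)(choose: => Int): Int = {
    val t = now()
    if (t - lastRefreshMs >= refreshIntervalMs) {
      // Interval elapsed: forget every sticky choice so the next send re-picks.
      cache.clear()
      lastRefreshMs = t
    }
    cache.getOrElseUpdate(topic, choose)
  }
}
```

The trade-off is the one debated here: between refreshes all keyless traffic for a topic goes to a single partition, so balance depends on how often the refresh fires.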
>
>
> On Fri, Sep 27, 2013 at 1:10 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
>
> > However, currently, if key is null, the partitioner is not even called. Do
> > you want to change DefaultEventHandler too?
> >
> > This also doesn't allow the partitioner to select a random and available
> > partition, which in my opinion is more important than making partitions
> > perfectly evenly balanced.
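The point here is that a partitioner that only receives `numPartitions` cannot know which partitions currently have a leader. A hypothetical alternative that is handed partition metadata could choose randomly among available partitions, as in this sketch (`PartitionState` and `AvailablePartitionChooser` are illustrative names, not Kafka APIs).

```scala
import java.util.concurrent.ThreadLocalRandom

// Hypothetical partition metadata: an id plus the current leader, if any.
final case class PartitionState(id: Int, leader: Option[Int])

object AvailablePartitionChooser {
  // Pick uniformly among partitions that currently have a leader.
  // Returns None when nothing is available, leaving the caller to fail fast
  // or retry after a metadata refresh.
  def chooseRandomAvailable(partitions: Seq[PartitionState]): Option[Int] = {
    val available = partitions.filter(_.leader.isDefined)
    if (available.isEmpty) None
    else Some(available(ThreadLocalRandom.current().nextInt(available.size)).id)
  }
}
```

Returning `Option` leaves the no-leader case to the caller, much like the `LeaderNotAvailableException` branch in the `DefaultEventHandler` excerpt earlier in the thread.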
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Sep 27, 2013 at 9:53 AM, Joe Stein <[EMAIL PROTECTED]> wrote:
> >
> > > What I was proposing was twofold:
> > >
> > > 1) revert the DefaultPartitioner class
> > >
> > > then
> > >
> > > 2) create a new partitioner that folks could use (like at LinkedIn you
> > > would use this partitioner instead) in ProducerConfig
> > >
> > > private class RandomRefreshTimPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
> > >   private val random = new java.util.Random
> > >
> > >   def partition(key: T, numPartitions: Int): Int = {
> > >     Utils.abs(key.hashCode) % numPartitions
> > >   }
> > > }
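Step 2 relies on the producer selecting its partitioner by class name. Assuming the 0.8-era producer's `partitioner.class` property, opting in could look like the sketch below; the broker address and `com.example.RandomRefreshTimPartitioner` are placeholders, not real artifacts.

```scala
import java.util.Properties

object ProducerPropsSketch {
  // Builds producer properties that select a custom partitioner by class name.
  // The broker address is a placeholder; the class name is supplied by the caller.
  def build(partitionerClass: String): Properties = {
    val props = new Properties()
    props.setProperty("metadata.broker.list", "localhost:9092")
    props.setProperty("partitioner.class", partitionerClass)
    props
  }
}
```

With this split, users who want the refresh-based behavior opt in through configuration, while everyone else keeps the default partitioner unchanged.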
> > >
> > > /*******************************************
> > >  Joe Stein
> > >  Founder, Principal Consultant
> > >  Big Data Open Source Security LLC
> > >  http://www.stealth.ly
> > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > ********************************************/
> > >
> > >
> > > On Fri, Sep 27, 2013 at 12:46 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
> > >
> > > > Joe,
> > > >
> > > > Not sure I fully understand your proposal. Do you want to put the random
> > > > partitioning selection logic (for messages without a key) in the

 