Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Plain View
Kafka >> mail # dev >> Random Partitioning Issue


+
Joe Stein 2013-09-14, 05:11
+
Joel Koshy 2013-09-14, 12:17
+
Joe Stein 2013-09-14, 18:19
+
Jun Rao 2013-09-15, 03:15
+
Jay Kreps 2013-09-15, 15:37
+
Jay Kreps 2013-09-15, 15:45
+
Joel Koshy 2013-09-17, 17:19
+
Jay Kreps 2013-09-17, 17:41
+
Joe Stein 2013-09-18, 23:23
+
Jun Rao 2013-09-23, 00:57
+
Joe Stein 2013-09-27, 16:24
+
Jun Rao 2013-09-27, 16:46
+
Joe Stein 2013-09-27, 16:54
+
Jun Rao 2013-09-27, 17:10
+
Joe Stein 2013-09-27, 17:31
+
Jun Rao 2013-09-28, 04:12
+
Guozhang Wang 2013-09-29, 04:52
Copy link to this message
-
Re: Random Partitioning Issue
The main issue is that if we do that, when key is null, we can only select
a random partition, but not a random and available partition, without
changing the partitioner api. Being able to do the latter is important in
my opinion. For example, a user may choose the replication factor of a
topic to be 1. If a broker is down, it's much better to select partitions
on other brokers for producing than losing messages.

Thanks,

Jun

On Sat, Sep 28, 2013 at 9:51 PM, Guozhang Wang <[EMAIL PROTECTED]> wrote:

> I think Joe's suggesting that we can remove the checking logic for
> key==null in DefaultEventHandler, and do that in partitioner.
>
> One thing about this idea is any customized partitioner also has to
> consider key == null case then.
>
> Guozhang
>
>
> On Fri, Sep 27, 2013 at 9:12 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
>
> > We have the following code in DefaultEventHandler:
> >
> >     val partition =
> >       if(key == null) {
> >         // If the key is null, we don't really need a partitioner
> >         // So we look up in the send partition cache for the topic to
> > decide the target partition
> >         val id = sendPartitionPerTopicCache.get(topic)
> >         id match {
> >           case Some(partitionId) =>
> >             // directly return the partitionId without checking
> > availability of the leader,
> >             // since we want to postpone the failure until the send
> > operation anyways
> >             partitionId
> >           case None =>
> >             val availablePartitions =
> > topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
> >             if (availablePartitions.isEmpty)
> >               throw new LeaderNotAvailableException("No leader for any
> > partition in topic " + topic)
> >             val index = Utils.abs(partitionCounter.getAndIncrement()) %
> > availablePartitions.size
> >             val partitionId = availablePartitions(index).partitionId
> >             sendPartitionPerTopicCache.put(topic, partitionId)
> >             partitionId
> >         }
> >       } else
> >         partitioner.partition(key, numPartitions)
> >
> > So, if key is null, the partitioner is ignored.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Sep 27, 2013 at 10:30 AM, Joe Stein <[EMAIL PROTECTED]> wrote:
> >
> > > hmmm, yeah, on I don't want todo that ... if we don't have to.
> > >
> > > What if the DefaultPartitioner code looked like this instead =8^)
> > >
> > > private class DefaultPartitioner[T](props: VerifiableProperties = null)
> > > extends Partitioner[T] {
> > >
> > >   def partition(key: T, numPartitions: Int): Int = {
> > >     if (key == null) {
> > >         import java.util.UUID
> > >         Utils.abs(UUID.randomUUID.toString()) % numPartitions
> > >     }
> > >     else {
> > >        Utils.abs(key.hashCode) % numPartitions
> > >     }
> > >   }
> > > }
> > >
> > >
> > > Again the goal here is the simple (often initial and dev side up and
> > > running out of the box) so folks don't have to randomize the keys
> > > themselves to get this effect
> > >
> > > We would still have to also have RandomMetaRefreshPartitioner class
> > right?
> > > so null keys there would wait for the time refresh for that use case,
> > > right?
> > >
> > > private class RandomMetaRefreshPartitioner[T](props:
> > VerifiableProperties =
> > > null) extends Partitioner[T] {
> > >
> > >   def partition(key: T, numPartitions: Int): Int = {
> > >     Utils.abs(key.hashCode) % numPartitions
> > >   }
> > > }
> > >
> > >
> > > On Fri, Sep 27, 2013 at 1:10 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
> > >
> > > > However, currently, if key is null, the partitioner is not even
> called.
> > > Do
> > > > you want to change DefaultEventHandler too?
> > > >
> > > > This also doesn't allow the partitioner to select a random and
> > available
> > > > partition, which in my opinion is more important than making
> partitions
> > > > perfectly evenly balanced.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Fri, Sep 27, 2013 at 9:53 AM, Joe Stein <[EMAIL PROTECTED]>

 
+
Joe Stein 2013-10-01, 05:22
+
Jun Rao 2013-10-01, 15:27
+
Joe Stein 2013-10-01, 15:35
+
Jun Rao 2013-10-01, 16:32
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB