Kafka dev mailing list: Random Partitioning Issue


Re: Random Partitioning Issue
This proposal still doesn't address the following fundamental issue: The
random partitioner cannot select a random and AVAILABLE partition.

So, we have the following two choices.

1. Stick with the current partitioner api.
Then, we have to pick one way to do random partitioning (when key is null).
The current behavior may not be very intuitive, but is one of the possible
behaviors in 0.7.

2. Change the partitioner api so that we can (1) be aware of available
partitions and (2) have pluggable partitioners for doing random
distribution.

Option (2) is probably the right approach. However, it's a non-trivial
change. So, I am not sure if it should be done in 0.8 or not.

Thanks,

Jun
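
To make option (2) concrete, here is a rough sketch of what an availability-aware, pluggable partitioner could look like; the trait and class names below are made up for illustration and are not an existing Kafka API:

    import java.util.concurrent.atomic.AtomicInteger

    // Hypothetical sketch only, not existing Kafka code: the partitioner is
    // handed the list of partitions that currently have a leader, so a random
    // strategy can stay within the live ones.
    trait AvailabilityAwarePartitioner[K] {
      def partition(key: K, numPartitions: Int, availablePartitions: Seq[Int]): Int
    }

    // One possible pluggable "random over available partitions" strategy.
    class RandomAvailablePartitioner[K] extends AvailabilityAwarePartitioner[K] {
      private val counter = new AtomicInteger(0)

      def partition(key: K, numPartitions: Int, availablePartitions: Seq[Int]): Int = {
        if (availablePartitions.isEmpty)
          throw new IllegalStateException("No available partition in any broker")
        // Round-robin over whatever is currently available, ignoring the key.
        // Masking with Int.MaxValue keeps the index non-negative on counter overflow.
        availablePartitions((counter.getAndIncrement() & Int.MaxValue) % availablePartitions.size)
      }
    }

With something along these lines, both "random and available" selection and pluggable random distribution would live in the partitioner rather than in DefaultEventHandler.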

On Mon, Sep 30, 2013 at 10:21 PM, Joe Stein <[EMAIL PROTECTED]> wrote:

> How about making UUID.randomUUID.toString() the default in KeyedMessage
> instead of null if not supplied
>
> def this(topic: String, message: V) = this(topic, UUID.randomUUID.toString(), message)
>
> and if you want the random refresh behavior, then pass in "*" on the
> KeyedMessage construction, which we can then later check for in
> DefaultEventHandler
>
>  val partition =
>       if (key == "*") {
>
> we then throw an NPE if key == null in KeyedMessage, like we do for topic
>
> I believe any null-based flow-control logic is something to shy away from
>
> If this is wrong, or too much, or still not the best solution, we could also
> hold it over and just put this in the FAQ with the JIRA, so that people who
> run into this and want to randomize (in development / testing, and in many
> production situations where the producer count is not large enough) know
> they have to pass in their own continuously random key... If we can get a
> consensus on what we want to do with minimal changes, then I think it is
> important for 0.8; otherwise wait.
>
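
As a rough sketch of how the pieces of this proposal might fit together (simplified to String keys; this is not committed code, and pickRandomAvailablePartition below is a made-up helper):

    import java.util.UUID

    // Hypothetical sketch of the proposal above, not actual Kafka source:
    // KeyedMessage defaults the key to a random UUID instead of null, and the
    // reserved key "*" later opts into the sticky/random partition path.
    class KeyedMessage[V](val topic: String, val key: String, val message: V) {
      if (topic == null)
        throw new IllegalArgumentException("Topic cannot be null.")
      if (key == null)
        throw new NullPointerException("Key cannot be null.")  // mirror the topic check

      // No key supplied: generate a random one rather than falling back to null.
      def this(topic: String, message: V) =
        this(topic, UUID.randomUUID.toString, message)
    }

    // In DefaultEventHandler, "*" (rather than null) would then select the
    // random/cached partition branch, and any other key would go through the
    // partitioner as before (pickRandomAvailablePartition is hypothetical):
    //
    //   val partition =
    //     if (key == "*") pickRandomAvailablePartition(topic)
    //     else partitioner.partition(key, numPartitions)
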
> On Sun, Sep 29, 2013 at 12:14 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
>
> > The main issue is that if we do that, when the key is null, we can only
> > select a random partition, but not a random and available partition,
> > without changing the partitioner API. Being able to do the latter is
> > important in my opinion. For example, a user may choose the replication
> > factor of a topic to be 1. If a broker is down, it's much better to select
> > partitions on other brokers for producing than to lose messages.
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Sat, Sep 28, 2013 at 9:51 PM, Guozhang Wang <[EMAIL PROTECTED]> wrote:
> >
> > > I think Joe's suggesting that we can remove the checking logic for
> > > key == null in DefaultEventHandler, and do that in the partitioner.
> > >
> > > One thing about this idea is that any customized partitioner then also
> > > has to consider the key == null case.
> > >
> > > Guozhang
> > >
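
For illustration only, here is what handling that case inside a custom partitioner might look like (the trait is written out here in the partition(key, numPartitions) shape; the round-robin and hash fallbacks are just examples):

    import java.util.concurrent.atomic.AtomicInteger

    // Sketch only: if the key == null check moves out of DefaultEventHandler,
    // every custom partitioner has to cover the null key itself.
    trait Partitioner[K] {
      def partition(key: K, numPartitions: Int): Int
    }

    class MyPartitioner[K] extends Partitioner[K] {
      private val counter = new AtomicInteger(0)

      def partition(key: K, numPartitions: Int): Int =
        if (key == null)
          // Null key: spread messages round-robin across all partitions.
          (counter.getAndIncrement() & Int.MaxValue) % numPartitions
        else
          // Non-null key: the usual hash-based placement.
          (key.hashCode & Int.MaxValue) % numPartitions
    }
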
> > >
> > > On Fri, Sep 27, 2013 at 9:12 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
> > >
> > > > We have the following code in DefaultEventHandler:
> > > >
> > > >     val partition =
> > > >       if(key == null) {
> > > >         // If the key is null, we don't really need a partitioner
> > > >         // So we look up in the send partition cache for the topic to decide the target partition
> > > >         val id = sendPartitionPerTopicCache.get(topic)
> > > >         id match {
> > > >           case Some(partitionId) =>
> > > >             // directly return the partitionId without checking availability of the leader,
> > > >             // since we want to postpone the failure until the send operation anyways
> > > >             partitionId
> > > >           case None =>
> > > >             val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
> > > >             if (availablePartitions.isEmpty)
> > > >               throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
> > > >             val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
> > > >             val partitionId = availablePartitions(index).partitionId

 