Kafka >> mail # dev >> Random Partitioning Issue


Re: Random Partitioning Issue
How about making UUID.randomUUID.toString() the default key in KeyedMessage,
instead of null, when no key is supplied:

def this(topic: String, message: V) = this(topic, UUID.randomUUID.toString(),
message)

If you want the random refresh behavior, pass in "*" as the key when
constructing the KeyedMessage, which we can then check for in
DefaultEventHandler:

 val partition =
      if(key =="*") {

We then throw an NPE if key == null in KeyedMessage, like we do for topic.

I believe any null-based flow control logic is something to shy away from.
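
For what it's worth, here is a minimal sketch of how the pieces above might
fit together. It is not the actual Kafka code: SketchKeyedMessage,
SketchPartitioning, RefreshKey, and choosePartition are hypothetical names,
and the list of partitions that currently have a leader is assumed to be
supplied by the caller. The sketch also leaves out the per-topic partition
cache and its periodic refresh, and simply cycles over the available
partitions.

```scala
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for KeyedMessage; not the real Kafka class.
class SketchKeyedMessage[V](val topic: String, val key: String, val message: V) {
  // Reject nulls up front instead of branching on them later.
  if (topic == null) throw new NullPointerException("topic must not be null")
  if (key == null) throw new NullPointerException("key must not be null")

  // No key supplied: default to a fresh random UUID string.
  def this(topic: String, message: V) =
    this(topic, UUID.randomUUID.toString, message)
}

object SketchPartitioning {
  // Assumed sentinel key meaning "pick among partitions that have a leader".
  val RefreshKey = "*"

  private val counter = new AtomicInteger(0)

  // availablePartitions: ids of partitions that currently have a leader
  // (assumed to be computed by the caller from topic metadata).
  def choosePartition(key: String, numPartitions: Int, availablePartitions: Seq[Int]): Int =
    if (key == RefreshKey) {
      require(availablePartitions.nonEmpty, "no partition has a leader")
      // floorMod keeps the index non-negative even after the counter wraps.
      availablePartitions(Math.floorMod(counter.getAndIncrement(), availablePartitions.size))
    } else {
      // Any other key, including a defaulted UUID, is hashed onto a partition.
      Math.floorMod(key.hashCode, numPartitions)
    }
}
```

With this shape, new SketchKeyedMessage(topic, message) spreads messages by
hashing a fresh UUID per message, while passing "*" as the key opts in to
selecting only among partitions that have a leader.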

If this is wrong, too much, or still not the best solution, we could also
hold off and just put this in the FAQ with the JIRA, letting people know
that when they run into this and want to randomize (in development / testing,
and in many production situations where the producer count is not large
enough), they have to pass in their own continuously changing random key. If
we can get consensus on what we want to do with minimal changes, then I think
it is important for 0.8; otherwise, wait.

On Sun, Sep 29, 2013 at 12:14 PM, Jun Rao <[EMAIL PROTECTED]> wrote:

> The main issue is that if we do that, when key is null, we can only select
> a random partition, but not a random and available partition, without
> changing the partitioner api. Being able to do the latter is important in
> my opinion. For example, a user may choose the replication factor of a
> topic to be 1. If a broker is down, it's much better to select partitions
> on other brokers for producing than losing messages.
>
> Thanks,
>
> Jun
>
>
>
> On Sat, Sep 28, 2013 at 9:51 PM, Guozhang Wang <[EMAIL PROTECTED]> wrote:
>
> > I think Joe's suggesting that we can remove the checking logic for
> > key==null in DefaultEventHandler, and do that in partitioner.
> >
> > One consequence of this idea is that any customized partitioner then also
> > has to handle the key == null case.
> >
> > Guozhang
> >
> >
> > On Fri, Sep 27, 2013 at 9:12 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
> >
> > > We have the following code in DefaultEventHandler:
> > >
> > >     val partition =
> > >       if(key == null) {
> > >         // If the key is null, we don't really need a partitioner
> > > >         // So we look up in the send partition cache for the topic to decide the target partition
> > >         val id = sendPartitionPerTopicCache.get(topic)
> > >         id match {
> > >           case Some(partitionId) =>
> > > >             // directly return the partitionId without checking availability of the leader,
> > > >             // since we want to postpone the failure until the send operation anyways
> > >             partitionId
> > >           case None =>
> > > >             val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
> > >             if (availablePartitions.isEmpty)
> > > >               throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
> > > >             val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
> > >             val partitionId = availablePartitions(index).partitionId
> > >             sendPartitionPerTopicCache.put(topic, partitionId)
> > >             partitionId
> > >         }
> > >       } else
> > >         partitioner.partition(key, numPartitions)
> > >
> > > So, if key is null, the partitioner is ignored.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > > On Fri, Sep 27, 2013 at 10:30 AM, Joe Stein <[EMAIL PROTECTED]> wrote:
> > >
> > > > > Hmmm, yeah, no, I don't want to do that ... if we don't have to.
> > > >
> > > > What if the DefaultPartitioner code looked like this instead =8^)
> > > >
> > > > > private class DefaultPartitioner[T](props: VerifiableProperties = null)
> > > > >     extends Partitioner[T] {
> > > >
> > > >   def partition(key: T, numPartitions: Int): Int = {
> > > >     if (key == null) {
> > > >         import java.util.UUID
> > > > >         Utils.abs(UUID.randomUUID.toString().hashCode) % numPartitions
> > > >     }
> > > >     else {