Kafka >> mail # dev >> Proposed Changes To New Producer Public API


Re: Proposed Changes To New Producer Public API
> (1) if the replication factor is 1 and there is a broker failure, we can
> still route the message, (2) if a bunch of producers are started at the same
> time, this prevents them from picking up the same partition in a
> synchronized way.

You raised a good point, Jun. Regarding #1, shouldn't round robin mean
"round robin to the next available partition", by default?
Regarding #2, this is not a problem since each producer will shuffle the
list of brokers once on startup and then round robin, no?
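
To make that concrete, here is a rough sketch of what I mean by "shuffle once on startup, then round robin to the next available partition". The class name, the constructor arguments, and the isAvailable predicate are all made up for illustration and are not part of the proposed producer API. The sketch shuffles partition ids (the same idea applies to a list of brokers) and is not thread-safe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.IntPredicate;

// Hypothetical helper, not part of the Kafka producer API.
class ShuffledRoundRobin {
    // Partition ids in this producer's own shuffled order.
    private final List<Integer> order = new ArrayList<>();
    private int next = 0;

    ShuffledRoundRobin(int numPartitions, Random random) {
        for (int p = 0; p < numPartitions; p++) {
            order.add(p);
        }
        // Shuffling once at startup keeps producers that start together
        // from walking the partitions in lockstep.
        Collections.shuffle(order, random);
    }

    // Returns the next available partition in the shuffled order,
    // or -1 if no partition is currently available.
    int nextPartition(IntPredicate isAvailable) {
        for (int i = 0; i < order.size(); i++) {
            int candidate = order.get(next);
            next = (next + 1) % order.size();
            if (isAvailable.test(candidate)) {
                return candidate;
            }
        }
        return -1;
    }
}
```

With a predicate that rejects partitions whose leader is down, a broker failure with replication factor 1 only causes those partitions to be skipped.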

On Mon, Feb 3, 2014 at 10:45 AM, Jun Rao <[EMAIL PROTECTED]> wrote:

> Fine with most of the changes. Just a few questions/comments.
>
> 1. Would it be better to change Callback to ProducerCallback to distinguish
> it from controller callback and potential future other types of callbacks
> (e.g. consumer)?
>
> 2. If no key is present AND no partition id is present, partitions will be
> chosen in a round robin fashion.
>
> Currently, our default event handler picks a random and available
> partition. This is probably better than round robin because (1) if the
> replication factor is 1 and there is a broker failure, we can still route
> the message, (2) if a bunch of producers are started at the same time, this
> prevents them from picking up the same partition in a synchronized way.
>
> 3. I think this will still make it possible to implement any partitioning
> strategy you would want. The "challenge case" I considered was the
> partitioner that minimizes the number of TCP connections. This partitioner
> would choose the partition hosted on the node it had most recently chosen in
> hopes that it would still have a connection to that node.
>
> To support this mode, we probably need a method in the producer to close
> all existing sockets.
>
> Thanks,
>
> Jun
>
>
>
> On Fri, Jan 31, 2014 at 3:04 PM, Jay Kreps <[EMAIL PROTECTED]> wrote:
>
> > Hey folks,
> >
> > Thanks for all the excellent suggestions on the producer API; I think this
> > really made things better. We'll do a similar thing for the consumer as we
> > get a proposal together. I wanted to summarize everything I heard and the
> > proposed changes I plan to make versus ignore. I'd like to get feedback from
> > the committers or anyone else interested on this as a proposal (+1/0/-1)
> > before I go make the changes just to avoid churn at code review time.
> >
> > 1. Change send() to return a java.util.concurrent.Future:
> >   Future<RecordPosition> send(ProducerRecord record, Callback callback)
> > The cancel method will always return false and not do anything. Callback
> > will then be changed to
> >   interface Callback {
> >     public void onCompletion(RecordPosition position, Exception exception);
> >   }
> > so that the exception is available there too and will be null if no error
> > occurred. I'm not planning on changing the name callback because I haven't
> > thought of a better one.
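
For anyone trying to picture the Future described above, here is a minimal sketch of a java.util.concurrent.Future whose cancel always returns false and does nothing. The class name and the complete(...) method are invented for illustration; the proposal does not say how the producer would implement this.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch, not the producer's actual implementation.
class NonCancellableFuture<T> implements Future<T> {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile T value;
    private volatile Exception error;

    // Called once by whatever completes the request; error is null on success.
    void complete(T value, Exception error) {
        this.value = value;
        this.error = error;
        done.countDown();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false; // cancellation is never honored
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return done.getCount() == 0;
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        done.await();
        return result();
    }

    @Override
    public T get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (!done.await(timeout, unit)) {
            throw new TimeoutException();
        }
        return result();
    }

    private T result() throws ExecutionException {
        if (error != null) {
            throw new ExecutionException(error);
        }
        return value;
    }
}
```

A failure surfaces from get() wrapped in an ExecutionException, which mirrors the callback receiving a non-null exception.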
> >
> > 2. We will change the way serialization works to proposal 1A in the
> > previous discussion. That is, the Partitioner and Serializer interfaces will
> > disappear. ProducerRecord will change to:
> >   class ProducerRecord {
> >     public byte[] key() {...}
> >     public byte[] value() {...}
> >     public Integer partition() {...} // can be null
> >   }
> > So key and value are now byte[]; partitionKey will be replaced by an
> > optional partition. The behavior will be the following:
> > 1. If partition is present it will be used.
> > 2. If no partition is present but a key is present, a partition will be
> > chosen by a hash of the key
> > 3. If no key is present AND no partition id is present, partitions will be
> > chosen in a round robin fashion
> > In other words what is currently the DefaultPartitioner will now be hard
> > coded as the behavior whenever no partition is provided.
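
The three rules above can be sketched roughly as follows. The class and method names are invented, and Arrays.hashCode is only a stand-in, since the thread does not say which hash function would be used.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the proposed hard-coded partition selection.
class PartitionChooser {
    private final AtomicInteger counter = new AtomicInteger(0);

    int choose(Integer partition, byte[] key, int numPartitions) {
        // Rule 1: an explicit partition always wins.
        if (partition != null) {
            return partition;
        }
        // Rule 2: hash the key (assumed hash; masked to stay non-negative).
        if (key != null) {
            return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
        }
        // Rule 3: no key and no partition, so go round robin.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}
```

Note that this plain counter starts every producer at partition 0, which is exactly the synchronized start that the reply at the top of this message is concerned with.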
> >
> > In order to allow correctly choosing a partition the Producer interface
> > will include a new method:
> >   List<PartitionInfo> partitionsForTopic(String topic);
> > PartitionInfo will be changed to include the actual Node objects not just

 