Kafka >> mail # dev >> Re: New Producer Public API


Re: New Producer Public API
I thought a bit about it and I think the getCluster() idea was overly
simplistic. We try to maintain metadata only for the current set of topics
the producer cares about, so the Cluster might not have the partitions for
the topic the user cares about. I think what we actually need is a new
method on the producer:
  List<PartitionInfo> partitionsFor(String...topics)
The intended usage of this method would be:
  int partition = myPartitionFunction(key, producer.partitionsFor(topic));
  producer.send(new ProducerRecord(topic, partition, key, value));
That is, the producer would re-fetch the current set of partitions every
time and the partitions would refresh at whatever schedule the producer
metadata refresh was set with.

So when the number of partitions changes, producers would pick this up as
their normal metadata updates occur.
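
To make the intended usage concrete, here is a minimal, self-contained sketch
of what a user-side partition function could look like. Everything in it is an
assumption for illustration: the PartitionInfo class is a stripped-down
stand-in for the proposed type, the partitions() helper only fakes a metadata
lookup, and Arrays.hashCode is used just to keep the example short, not as a
recommended hash.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionSketch {
    // Hypothetical stand-in for the proposed PartitionInfo type; the real
    // class would presumably carry more than a topic and a partition id.
    static final class PartitionInfo {
        final String topic;
        final int partition;

        PartitionInfo(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Hypothetical partition function: hash the key bytes and map the hash
    // onto whatever partitions are currently known for the topic.
    static int myPartitionFunction(byte[] key, List<PartitionInfo> partitions) {
        if (partitions.isEmpty()) {
            throw new IllegalStateException("no partitions known for topic");
        }
        // Mask the sign bit so the index is never negative.
        // Arrays.hashCode is an assumption made for brevity only.
        int hash = Arrays.hashCode(key) & 0x7fffffff;
        return partitions.get(hash % partitions.size()).partition;
    }

    // Demo helper (not part of any proposal): pretend the topic currently
    // has `count` partitions numbered 0..count-1.
    static List<PartitionInfo> partitions(String topic, int count) {
        List<PartitionInfo> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(new PartitionInfo(topic, i));
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        // The same call works before and after a partition increase,
        // because the partition list is passed in on every send.
        System.out.println(myPartitionFunction(key, partitions("events", 4)));
        System.out.println(myPartitionFunction(key, partitions("events", 8)));
    }
}
```

Because the function receives the partition list on each call, nothing is
hard-coded: when the list grows, the modulus changes with it.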

This actually solves an important problem in the existing api, which is
that the send method will block on the first message send if we don't yet
have metadata for the destination topic. It blocks until metadata is
fetched. This is a little weird, as it occurs even in non-blocking mode.
partitionsFor() provides an escape hatch: someone who wants to avoid that small block on
the first send can initialize their producer and call
producer.partitionsFor(topics) to force metadata initialization.
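
A toy model of that warm-up pattern, under loud assumptions: ToyProducer is
hypothetical, does no networking, takes a single topic rather than varargs,
and returns plain partition ids instead of PartitionInfo. It only models the
"fetch metadata on first use" behavior described above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WarmupSketch {
    // Hypothetical toy producer: models only the metadata cache.
    static final class ToyProducer {
        private final Map<String, List<Integer>> metadata = new HashMap<>();
        int blockingFetches = 0;

        // Stands in for the blocking metadata fetch for an unseen topic.
        private List<Integer> fetchMetadata(String topic) {
            blockingFetches++;
            return Collections.singletonList(0); // pretend: one partition, id 0
        }

        // Returns cached partitions, fetching (and "blocking") only if absent.
        List<Integer> partitionsFor(String topic) {
            return metadata.computeIfAbsent(topic, this::fetchMetadata);
        }

        // Returns true if this send had to wait for a metadata fetch first.
        boolean send(String topic, byte[] value) {
            int before = blockingFetches;
            partitionsFor(topic);
            return blockingFetches > before;
        }
    }

    public static void main(String[] args) {
        ToyProducer cold = new ToyProducer();
        // No metadata yet, so the first send pays for the fetch.
        System.out.println("cold first send waited: " + cold.send("events", new byte[0]));

        ToyProducer warm = new ToyProducer();
        warm.partitionsFor("events"); // warm-up at initialization time
        // Metadata is already cached, so the first send does not wait.
        System.out.println("warm first send waited: " + warm.send("events", new byte[0]));
    }
}
```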

-Jay

On Thu, Jan 30, 2014 at 4:34 PM, Jun Rao <[EMAIL PROTECTED]> wrote:

> With option 1A, if we increase the number of partitions on a topic, how
> will the producer find out about newly created partitions? Do we expect
> the producer to periodically call getCluster()?
>
> As for the ZK dependency, one of the goals of the client rewrite is to reduce
> dependencies so that one can implement the client in languages other than
> Java. A ZK client is available in only a small number of languages.
>
> Thanks,
>
> Jun
>
>
> On Fri, Jan 24, 2014 at 5:17 PM, Jay Kreps <[EMAIL PROTECTED]> wrote:
>
> > Clark and all,
> >
> > I thought a little bit about the serialization question. Here are the
> > options I see and the pros and cons I can think of. I'd love to hear
> > people's preferences if you have a strong one.
> >
> > One important consideration is that however the producer works will also
> > need to be how the new consumer works (which we hope to write next). That
> > is, if you put objects in, you should get objects out. So we need to think
> > through both sides.
> >
> > Options:
> >
> > Option 0: What is in the existing scala code and the java code I
> > posted--Serializer and Partitioner plugin provided by the user via
> > config. Partitioner has a sane default, but Serializer needs to be
> > specified in config.
> >
> > Pros: How it works today in the scala code.
> > Cons: You have to adapt your serialization library of choice to our
> > interfaces. The reflective class loading means typos in the serializer
> > name give odd errors. Likewise there is little type safety--the
> > ProducerRecord takes Object, and any type mismatch between the object
> > provided and the serializer surfaces only at runtime.
> >
> > Option 1: No plugins
> >
> > This would mean byte[] key, byte[] value, and partitioning done by client
> > by passing in a partition *number* directly.
> >
> > The problem with this is that it is tricky to compute the partition
> > correctly and probably most people won't. We could add a getCluster()
> > method to return the Cluster instance you should use for partitioning.
> > But I suspect people would be lazy and not use that and instead
> > hard-code partitions which would break if partitions were added or they
> > hard coded it wrong. In my experience 3 partitioning strategies cover
> > like 99% of cases so not having a default implementation for this makes
> > the common case harder. Left to their own devices people will use bad
> > hash functions and get weird results.
> >
> > Option 1A: Alternatively we could partition by the key using the existing
> > default partitioning strategy which only uses the byte[] anyway but
> > instead