Kafka >> mail # user >> Re: Client improvement discussion

Xavier Stevens 2013-07-26, 22:41
Hey Chris,

Great questions.

1. Sync vs async. Actually I am saying the client will (1) always be async
but (2) always return a response. So
   val resp = client.send(k, v)  // returns immediately
   resp.await()   // waits for the request to complete
   resp.offset()  // waits for the request to complete (if needed) and returns the offset
Basically the response object acts as a Future for the offset/error. So if
you like the blocking behavior you can just do client.send(k, v).await(),
which would behave as it does now.
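Sketched in Java with java.util.concurrent (all class and method names below are illustrative, not the actual client API), the future-style response could behave like this:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the future-style response described above;
// SendResponse and its methods are illustrative names, not Kafka's API.
class SendResponse {
    private final CompletableFuture<Long> offsetFuture;

    SendResponse(CompletableFuture<Long> offsetFuture) {
        this.offsetFuture = offsetFuture;
    }

    // Block until the request completes.
    void await() {
        offsetFuture.join();
    }

    // Block if needed, then return the assigned offset.
    long offset() {
        return offsetFuture.join();
    }
}

public class AsyncSendSketch {
    public static void main(String[] args) {
        // Simulate a broker completing the request on another thread;
        // a real send() would return the SendResponse immediately.
        CompletableFuture<Long> pending = new CompletableFuture<>();
        SendResponse resp = new SendResponse(pending);
        new Thread(() -> pending.complete(42L)).start();

        resp.await();                      // blocking behavior, as today
        System.out.println(resp.offset()); // prints the offset completed above: 42
    }
}
```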

2. Yeah this is in the protocol. I should add it to the wiki.

3. onCompletion registers a callback that will either fire immediately (if
the request is already completed) or will fire when the request does
complete. The purpose is to allow handling multiple responses without
blocking (as await does), but at the cost of a slightly more complicated
programming model.
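The fire-immediately-or-fire-later behavior of onCompletion can be sketched with CompletableFuture (again purely illustrative; the real callback type and names are not settled here):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: a callback registered on an already-completed request
// fires immediately; on an in-flight request it fires at completion.
public class CallbackSketch {
    public static void main(String[] args) {
        // Case 1: request already completed -> callback runs right away.
        AtomicBoolean fired = new AtomicBoolean(false);
        CompletableFuture<Long> done = CompletableFuture.completedFuture(10L);
        done.thenRun(() -> fired.set(true));   // analogous to resp.onCompletion(...)
        System.out.println(fired.get());       // true

        // Case 2: request still in flight -> callback deferred until completion.
        AtomicBoolean firedLater = new AtomicBoolean(false);
        CompletableFuture<Long> pending = new CompletableFuture<>();
        pending.thenRun(() -> firedLater.set(true));
        System.out.println(firedLater.get());  // false: nothing has completed yet
        pending.complete(11L);
        System.out.println(firedLater.get());  // true: fired at completion
    }
}
```

Either way the caller never blocks, which is the point of preferring onCompletion over await when juggling many outstanding sends.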

4. Consuming from just some topics. I see what you are saying. I don't
think I can actually have this work the way you want though because the
fetch responses are by broker not by topic. So you could ignore a broker
but it would just ignore all topics on that broker. What we probably do
need is a way to deregister a topic which would let you stop fetching for
that topic.

5. A lot of your questions are on the group membership bit. Let me just
clean it up, I think the writing is sloppy. The proposal was to implement a
generic group membership facility where each group member heartbeats to
remain a member. I was proposing to omit any partition assignment logic
from the server as that can be done client side once all group members
agree on who is in the group. So a simple approach would be that clients
know their topics and each take ~P/N partitions where P is the total
partition count and N the number of group members. P is available from the
topic metadata request and N is filled in by the group membership API. As
you say the client will likely want to commit its offsets before leaving a
group to ensure a clean hand-off.
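A minimal sketch of that client-side split, with P = 8 partitions and N = 3 members (helper names are mine, not from the proposal):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative client-side assignment: once all members agree on the
// group membership (and hence on each member's index), member i of N
// takes roughly P/N of the P partitions with no server-side logic.
public class AssignSketch {
    static List<Integer> partitionsFor(int memberIndex, int numMembers, int numPartitions) {
        List<Integer> mine = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % numMembers == memberIndex) { // simple round-robin split
                mine.add(p);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // P = 8, N = 3: each member ends up with ~P/N = 2-3 partitions.
        for (int i = 0; i < 3; i++) {
            System.out.println("member " + i + " -> " + partitionsFor(i, 3, 8));
        }
        // member 0 -> [0, 3, 6]
        // member 1 -> [1, 4, 7]
        // member 2 -> [2, 5]
    }
}
```

Since every member runs the same deterministic function over the same agreed membership list, they reach the same assignment independently.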

6. Jars. Actually our incompatibility in 0.8 was not due to jars but due
to not implementing compatible protocol changes (partially due to a major
refactoring of the protocol). This will be even easier with the
serialization layer, which would effectively carry every protocol version
ever shipped and always use the writer's version for decoding.
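One way to picture "keep every protocol version, decode with the writer's version" is a decoder table keyed by the version stamped on the message; everything below is an illustrative sketch, not the actual serialization layer:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: the request header carries a version id, and the reader
// dispatches to the schema matching the writer's version, so old and
// new clients interoperate. All names here are illustrative.
interface Decoder {
    String decode(byte[] payload);
}

public class VersionedDecodeSketch {
    private static final Map<Integer, Decoder> DECODERS = new HashMap<>();
    static {
        // One entry per protocol version ever shipped.
        DECODERS.put(0, payload -> "v0:" + payload.length + " bytes");
        DECODERS.put(1, payload -> "v1:" + payload.length + " bytes");
    }

    // Decode with the version the writer stamped on the message.
    static String decode(int writerVersion, byte[] payload) {
        Decoder d = DECODERS.get(writerVersion);
        if (d == null) {
            throw new IllegalArgumentException("unknown version " + writerVersion);
        }
        return d.decode(payload);
    }

    public static void main(String[] args) {
        System.out.println(decode(0, new byte[4])); // v0:4 bytes
        System.out.println(decode(1, new byte[8])); // v1:8 bytes
    }
}
```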

7. Serialization and compression. Again I think this is just confusion due
to sloppy writing. The compression will definitely be in the client. The
question is just whether it happens in the user thread when they are
enqueuing the request or in the network thread. Since machines now have
like 48 cores anything cpu-intensive in the network thread is a little
suspicious. Serialization though I think might be better left out entirely.

On Sat, Jul 27, 2013 at 8:12 PM, Chris Riccomini <[EMAIL PROTECTED]> wrote:

> Hey Jay,
> Reading over the wiki (and email thread). Here are some questions/comments:
> "Make the producer fully async to allow issuing sends to all brokers
> simultaneously and having multiple in-flight requests simultaneously. This
> will dramatically reduce the impact of latency on throughput (which is
> important with replication)."
> Can you say a bit more about this? You're only talking about the async
> case, right? If I set a producer to sync, acks=-1, producer.send() will
> still block as expected, right?
> "Move to server-side offset management will allow us to scale this
> facility which is currently a big scalability problem for high-commit rate
> consumers due to zk non scalability."
> Just confirming that the proposal still allows us to store a K/V map (or
> metadata), in addition to just offsets, right? This was in the older
> proposal that I saw, but I just wanted to make sure. The consumer APIs
> don't seem to reflect this.
> """
> SendResponse r = producer.send(new KafkaMessage(topic, key, message));
> r.onCompletion(new Runnable() { public void run() { System.out.println("All done"); } });
> r.getOffset()