Re: async producer behavior if zk and/or kafka cluster goes away...
Starting in 0.8 there is no direct connection from the producer to zk. The
goal here is to make it easy to implement clients in non-java languages and
avoid painful zk upgrades. ZK is replaced by a "get_metadata" api (which in
terms of implementation, of course still just reads from zk--but now the
read is done by the server).
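As a concrete illustration of the client-side change, a 0.8-style producer is bootstrapped from a broker list rather than a ZooKeeper connect string. This is a sketch only: the property names below ("broker.list", "producer.type") match the 0.8-era config as of this thread, but the exact names varied across 0.8 revisions.

```java
import java.util.Properties;

// Sketch of 0.8-style producer configuration: the client is given a list
// of brokers to fetch metadata from, instead of a ZooKeeper connect string.
// Property names are era-approximate and varied across 0.8 revisions.
public class ProducerConfigSketch {
    public static Properties newStyleProps() {
        Properties props = new Properties();
        // replaces the old "zk.connect" setting from 0.7
        props.put("broker.list", "broker1:9092,broker2:9092");
        props.put("producer.type", "async"); // use the async queueing path
        return props;
    }
}
```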

I think the intended behavior for the async producer is the following:
1. Messages are immediately added to a queue of messages to be sent
2. The number of messages that can be in the queue is bounded by
queue.size. If the queue is full, the parameter queue.enqueueTimeout.ms
determines how long the producer will wait for the queue length to decrease
before dropping the message
3. Messages are sent by a background thread. If this thread falls behind
due to throughput or because the kafka cluster is down messages will pile
up in the queue.
4. If the send operation fails there are two options (1) retry, (2) give
up. If you retry you may get a duplicate (this is the semantics of any
mutation RPC in any system--e.g. if you get a network error you do not know
if the mutation has occurred or not). If you give up you will lose those
messages. This decision is controlled by producer.num.retries.
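The queueing semantics in steps 1-2 can be sketched with a bounded blocking queue. This is an illustration of the described behavior, not the actual producer internals, and all names here are made up:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of steps 1-2 above: a bounded queue where a full
// queue blocks the caller for enqueueTimeoutMs and then drops the message.
// In the real producer a background thread (step 3) drains this queue.
public class AsyncQueueSketch {
    private final BlockingQueue<String> queue;
    private final long enqueueTimeoutMs;
    private long dropped = 0;

    public AsyncQueueSketch(int queueSize, long enqueueTimeoutMs) {
        this.queue = new ArrayBlockingQueue<>(queueSize);
        this.enqueueTimeoutMs = enqueueTimeoutMs;
    }

    // Enqueue immediately if there is room; if full, wait up to the
    // timeout for space, then drop. Returns false when the message is lost.
    public boolean send(String message) throws InterruptedException {
        boolean enqueued = queue.offer(message, enqueueTimeoutMs, TimeUnit.MILLISECONDS);
        if (!enqueued) {
            dropped++; // message silently lost, as in step 2
        }
        return enqueued;
    }

    public long droppedCount() {
        return dropped;
    }
}
```

With no background sender draining the queue (e.g. the cluster is down), the queue fills and subsequent sends are dropped after the timeout, which is the pile-up behavior described in step 3.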

It is desirable that the client not die if the zk connection is lost, if
possible, since zk sometimes can have gc pauses or disk latency or whatever
other transient issue.

Because the send is async and does not have a 1-1 correspondence to the
network communication, it is not possible to immediately throw an exception
in the send() call.

This is not ideal, since how do you know if your send succeeded? We think
the fix is to have the send call return a future representing the result of
the request that will eventually be made, as well as the offset of
your message if you care. This is a bit of a large refactoring of the
producer code (which could definitely use some refactoring) so our
tentative plan was to address it in 0.9.
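A rough sketch of the future-returning send being described (hypothetical names and a simulated broker round trip; the placeholder offset stands in for whatever the broker would actually assign):

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a future-returning send(). The broker round trip
// is simulated; a real implementation would complete the future when the
// network response (or error) arrives, carrying the assigned offset.
public class FutureSendSketch {
    public CompletableFuture<Long> send(String message) {
        return CompletableFuture.supplyAsync(() -> {
            // ...serialize, write to the socket, read the response...
            return 42L; // placeholder for the offset the broker would assign
        });
    }

    public static void main(String[] args) {
        FutureSendSketch p = new FutureSendSketch();
        // caller can block for the result, attach callbacks, or ignore it
        Long offset = p.send("hello").join();
        System.out.println("offset = " + offset);
    }
}
```

The point of this shape is that enqueueing stays non-blocking while the caller still has a handle on the eventual success, failure, or offset of each message.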

I think what you are saying is slightly different, though, which is that
the behavior between the various cases should be consistent. What do you
think would be the right behavior?


On Mon, Nov 19, 2012 at 1:27 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:

> I forgot to mention that I'm working with a recent version of the 0.8 code
> (Last changed rev: 1396425).
> Jason
> On Mon, Nov 19, 2012 at 1:23 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
> > I've been doing some testing, with an async producer.
> >
> > It seems, if I start up the producer with no zk cluster present, it does
> > what I expect, that is, it waits for a limited time looking for the zk
> > cluster, and then gives up after the zk.connectiontimeout.ms setting
> > (6000ms, by default), and fails to send a message.  However, if after
> > starting up zk and having a good connection to zk and kafka, I then
> > shutdown the zk cluster, the producer never seems to stop accepting
> > messages to send.
> >
> > As long as kafka stays up and running, even without zk still available,
> > my producer sends messages and my consumer can consume them.
> >
> > However, if I then stop kafka also, my producer happily keeps on
> > accepting messages without failing in a call to producer.send().  It's
> > clearly no longer able to send any messages at this point.  So, I assume
> > it eventually will just start dropping messages on the floor?
> >
> > I would have expected that once both zk/kafka are not available, things
> > should revert to the initial startup case, where it tries for 6000ms and
> > then throws an exception on send.
> >
> > Thoughts?
> >
> > What's the expected behavior for async producers, when the async buffered
> > messages can't be sent?  I think it's fine if they are just lost, but
> > should it be possible to block further accepting of messages once the
> > system has detected a problem communicating with zk/kafka?
> >
> > Also, if I cleanly shutdown an async producer (e.g. call
> > producer.close()), should it make a best effort to send out any buffered