Kafka, mail # user - testing issue with reliable sending


Re: testing issue with reliable sending
Neha Narkhede 2013-10-05, 17:01
> Shouldn't this be part of the contract?  It should be able to make sure
> this happens before shutting down, no?

The leader writes messages to its local log, and the replicas then consume
messages from the leader and write them to their own local logs. If you set
request.required.acks=1, the ack is sent to the producer as soon as the
leader has written the messages to its local log, before any replica has
necessarily copied them. What you are asking for is part of the contract
if request.required.acks=-1.
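
As a rough illustration of that difference, here is a toy model in Java. It
is a sketch only, not Kafka's implementation: the class AckModel and its
methods are hypothetical names, and replication is modeled as a simple
synchronous copy to a single follower.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only, NOT Kafka code. All names here are hypothetical.
class AckModel {
    final List<String> leaderLog = new ArrayList<String>();
    final List<String> followerLog = new ArrayList<String>();

    // The follower copies every message it has not yet fetched from the leader.
    void replicate() {
        for (int i = followerLog.size(); i < leaderLog.size(); i++) {
            followerLog.add(leaderLog.get(i));
        }
    }

    // acks = 1:  ack as soon as the leader has appended the message.
    // acks = -1: ack only after the in-sync follower has copied it too.
    // Returns true to represent the ack reaching the producer.
    boolean send(String msg, int acks) {
        leaderLog.add(msg);
        if (acks == -1) {
            replicate();
        }
        return true;
    }

    // The leader is shut down and the follower takes over with its own log.
    List<String> failover() {
        return new ArrayList<String>(followerLog);
    }
}
```

In this model, a message acked with acks=1 is missing from the new leader's
log if failover() happens before replicate() runs, while a message acked
with acks=-1 is always present.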

> In this case, if we need to use
> request.required.acks=-1, that will pretty much prevent any successful
> message producing while any of the brokers for a partition is unavailable.
>  So, I don't think that's an option.  (Not to mention the performance
> degradation).

You can implement reliable delivery semantics while allowing rolling
restart of brokers by setting request.required.acks=-1. When one of the
replicas is shut down, the ISR shrinks to exclude that replica, and
messages are then committed against the new, smaller ISR.
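
A minimal sketch of producer properties along these lines for the 0.8
producer follows. The helper class ReliableProducerProps is hypothetical,
and the retry count and backoff values are illustrative assumptions rather
than recommendations.

```java
import java.util.Properties;

// Hypothetical helper: builds properties for a 0.8-style producer.
class ReliableProducerProps {
    static Properties build(String brokerList) {
        Properties props = new Properties();
        // Brokers used to bootstrap topic metadata, e.g. "host1:port1,host2:port2".
        props.put("metadata.broker.list", brokerList);
        // Ack only after all in-sync replicas have the message.
        props.put("request.required.acks", "-1");
        // Synchronous sends, so a failed send surfaces as an exception to the caller.
        props.put("producer.type", "sync");
        // Illustrative values: retry sends while leadership moves during a restart.
        props.put("message.send.max.retries", "5");
        props.put("retry.backoff.ms", "200");
        return props;
    }
}
```

The resulting Properties would be wrapped in a kafka.producer.ProducerConfig
and passed to the kafka.javaapi.producer.Producer constructor. Retries can
resend a message that was already written, so occasional duplicates are
possible.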

Thanks,
Neha
On Fri, Oct 4, 2013 at 11:51 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:

> Neha,
>
> I'm not sure I understand.  I would have thought that if the leader
> acknowledges receipt of a message, and is then shut down cleanly (with
> controlled shutdown enabled), that it would be able to reliably persist any
> in memory buffered messages (and replicate them), before shutting down.
>  Shouldn't this be part of the contract?  It should be able to make sure
> this happens before shutting down, no?
>
> I would understand a message dropped if it were a hard shutdown.
>
> I'm not sure then how to implement reliable delivery semantics, while
> allowing a rolling restart of the broker cluster (or even to tolerate a
> single node failure, where one node might be down for a while and need to be
> replaced or have a disk repaired).  In this case, if we need to use
> request.required.acks=-1, that will pretty much prevent any successful
> message producing while any of the brokers for a partition is unavailable.
>  So, I don't think that's an option.  (Not to mention the performance
> degradation).
>
> Is there not a way to make this work more reliably with leader only
> acknowledgment, and clean/controlled shutdown?
>
> My test does succeed, as expected, with acks = -1, at least for the 100 or
> so iterations I've let it run so far.  It does on occasion send duplicates
> (but that's ok with me).
>
> Jason
>
>
> On Fri, Oct 4, 2013 at 6:38 PM, Neha Narkhede <[EMAIL PROTECTED]> wrote:
>
> > The occasional single message loss could happen since
> > request.required.acks=1 and the leader is shut down before the follower
> > gets a chance to copy the message. Can you try your test with num acks set
> > to -1?
> >
> > Thanks,
> > Neha
> > On Oct 4, 2013 1:21 PM, "Jason Rosenberg" <[EMAIL PROTECTED]> wrote:
> >
> > > All,
> > >
> > > I'm having an issue with an integration test I've set up.  This is using
> > > 0.8-beta1.
> > >
> > > The test is to verify that no messages are dropped (or the sender gets an
> > > exception thrown back on failure), while doing a rolling restart of a
> > > cluster of 2 brokers.
> > >
> > > The producer is configured to use 'request.required.acks' = '1'.
> > >
> > > The servers are set up to run locally on localhost, on different ports, and
> > > different data dirs.  The producer connects with a metadata brokerlist
> > > like:  "localhost:2024,localhost:1025" (so no vip).   The servers are set
> > > up with a default replication factor of 2.  The servers have controlled
> > > shutdown enabled, as well.
> > >
> > > The producer code looks like this:
> > >     ...
> > >     Producer<Integer, T> producer = getProducer();
> > >     try {
> > >       KeyedMessage<Integer, T> msg =
> > >           new KeyedMessage<Integer, T>(topic, message);
> > >       producer.send(msg);
> > >       return true;
> > >     } catch (RuntimeException e) {
> > >       logger.warn("Error sending data to kafka", e);
> > >       return false;
> > >     }