Kafka >> mail # user >> Handling consumer rebalance when implementing synchronous auto-offset commit


Re: Handling consumer rebalance when implementing synchronous auto-offset commit
This looks great.   What is the time frame for this effort?

Jason
On Wed, Oct 16, 2013 at 2:19 PM, Joel Koshy <[EMAIL PROTECTED]> wrote:

> Btw, after we complete KAFKA-1000 (offset management in Kafka) it
> should be reasonable to commit offsets on every message as long as the
> optional metadata portion of the offset commit request is small/empty.
>
> Thanks,
>
> Joel
>
>
> On Wed, Oct 16, 2013 at 10:35 AM, Jason Rosenberg <[EMAIL PROTECTED]>
> wrote:
> > That would be great.  Additionally, in the new API, it would be awesome to
> > augment the default auto-commit functionality to allow client code to mark
> > a message for commit only after processing a message successfully!
> >
> >
> > On Wed, Oct 16, 2013 at 7:52 AM, Jun Rao <[EMAIL PROTECTED]> wrote:
> >
> >> For manual offset commits, it will be useful to have some kind of API that
> >> informs the client when a rebalance is going to happen. We can think about
> >> this when we do the client rewrite.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Tue, Oct 15, 2013 at 9:21 PM, Jason Rosenberg <[EMAIL PROTECTED]>
> wrote:
> >>
> >> > Jun,
> >> >
> >> > Yes, sorry, I think that was the basis for my question.  When auto commit
> >> > is enabled, special care is taken to make sure things are auto-committed
> >> > during a rebalance.  This is needed because when a topic moves off of a
> >> > consumer thread (since it is being rebalanced to another one), it's as if
> >> > that topic is being shut down on that connector, and any not-yet-committed
> >> > messages need to be committed before letting go of the topic.
> >> >
> >> > So, my question is around trying to understand if there's a way I can
> >> > reproduce similar functionality using my own sync auto commit
> >> > implementation (and I'm not sure there is).  It seems that when there's a
> >> > rebalance, all processed but not-yet-committed offsets will not be
> >> > committed, and thus there will be no way to prevent pretty massive
> >> > duplicate consumption on a rebalance.  Is that about right?  Or is there
> >> > some way around this that I'm not seeing?
> >> >
> >> > The auto-commit functionality that's built in is so close to being all that
> >> > anyone would need, except it has a glaring weakness, in that it will cause
> >> > messages to be lost from time to time, and so I don't know that it will
> >> > meet the needs of trying to have reliable delivery (with duplicates ok).
> >> >
> >> > Jason
> >> >
> >> >
> >> > On Tue, Oct 15, 2013 at 9:00 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
> >> >
> >> > > If auto commit is disabled, the consumer connector won't call
> >> > > commitOffsets during rebalancing.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Jun
> >> > >
> >> > >
> >> > > On Tue, Oct 15, 2013 at 4:16 PM, Jason Rosenberg <[EMAIL PROTECTED]>
> >> > wrote:
> >> > >
> >> > > > I'm looking at implementing a synchronous auto offset commit solution.
> >> > > > People have discussed the need for this in previous threads.
> >> > > > Basically, in my consumer loop, I want to make sure a message has
> >> > > > actually been processed before allowing its offset to be committed.
> >> > > > But I don't want to commit on every message, since that would be too
> >> > > > expensive.  So, I want to use the 'auto.commit.interval.ms' to
> >> > > > periodically call commitOffsets, but only after a message is processed,
> >> > > > and before the next message has been issued via a call to 'next()' on
> >> > > > the ConsumerIterator.
> >> > > >
> >> > > > The built-in 'auto.commit.enable' feature unfortunately allows commits to
> >> > > > happen on any message that has been returned via ConsumerIterator.next().
> >> > > > But if the consumer goes down before actually processing the message, or
> >> > > > if it hangs indefinitely for some reason, then this message can get
> >> > > > committed before it has actually been consumed successfully.
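
The commit-after-processing loop described in the original message can be
sketched roughly as follows. This is a language-neutral model in Python, not
Kafka client code: ProcessedOffsetCommitter, consume, commit_fn and the
(partition, offset, payload) message shape are all hypothetical names made up
for illustration. commit_fn stands in for whatever actually persists offsets,
and storing offset + 1 (the next position to read) is an assumption of this
sketch. The idea is that an offset only becomes eligible for commit once
process() has returned, commits happen at most once per interval, and flush()
is the hook one would want to call before giving up partitions on a rebalance
or shutdown, if the client offered such a notification.

```python
import time


class ProcessedOffsetCommitter:
    """Tracks offsets of fully processed messages and commits them periodically."""

    def __init__(self, commit_fn, interval_s, clock=time.monotonic):
        # commit_fn is a hypothetical callable that synchronously persists
        # a {partition: offset} mapping; it is not a real Kafka API.
        self._commit_fn = commit_fn
        self._interval_s = interval_s
        self._clock = clock
        self._pending = {}  # processed but not yet committed
        self._last_commit = clock()

    def mark_processed(self, partition, offset):
        # Assumption of this sketch: store the next position to read.
        self._pending[partition] = offset + 1
        if self._clock() - self._last_commit >= self._interval_s:
            self.flush()

    def flush(self):
        """Commit everything processed so far (e.g. before a rebalance)."""
        if self._pending:
            self._commit_fn(dict(self._pending))
            self._pending.clear()
        self._last_commit = self._clock()


def consume(messages, process, committer):
    """Process each message, marking its offset only after success."""
    try:
        for partition, offset, payload in messages:
            process(payload)  # if this raises, the offset is never marked
            committer.mark_processed(partition, offset)
    finally:
        # Commit what was actually processed, even on failure or shutdown.
        committer.flush()
```

With this shape, a crash between process() and the next commit leads to
re-delivery of at most one interval's worth of messages (duplicates, not
loss), which is the trade-off the thread is asking for.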