Kafka >> mail # user >> Handling consumer rebalance when implementing synchronous auto-offset commit

Jason Rosenberg 2013-10-15, 23:17
Jun Rao 2013-10-16, 04:01
Re: Handling consumer rebalance when implementing synchronous auto-offset commit

Yes, sorry, I think that was the basis for my question. When auto commit
is enabled, special care is taken to make sure offsets are auto-committed
during a rebalance. This is needed because when a topic moves off of a
consumer thread (since it is being rebalanced to another one), it's as if
that topic is being shut down on that connector, and the offsets of any
not-yet-committed messages need to be committed before letting go of the
topic.

So, my question is whether there's a way I can reproduce similar
functionality using my own sync auto commit implementation (and I'm not
sure there is). It seems that with auto commit disabled, when there's a
rebalance, the offsets of all processed but not-yet-committed messages
will not be committed, and thus there will be no way to prevent pretty
massive duplicate consumption on a rebalance. Is that about right? Or is
there some way around this that I'm not seeing?
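
To make the concern concrete, here is a rough sketch of the loop I have in
mind, written as a small self-contained simulation rather than against the
real consumer API. Everything in it is an assumption for illustration:
FakeConnector, next_message, commit_offsets, consume and stop_after are
hypothetical stand-ins, and stop_after simply models a partition being
taken away with no final commit.

```python
import time


class FakeConnector:
    """Hypothetical stand-in for a consumer connector (not the Kafka API)."""

    def __init__(self, messages):
        self.messages = messages
        self.consumed = 0   # advances as soon as next_message() hands out a message
        self.committed = 0  # where a new owner of the partition would resume

    def has_next(self):
        return self.consumed < len(self.messages)

    def next_message(self):
        msg = self.messages[self.consumed]
        self.consumed += 1
        return msg

    def commit_offsets(self):
        # Commits everything handed out so far, processed or not.
        self.committed = self.consumed


def consume(connector, process, commit_interval_s, clock=time.monotonic, stop_after=None):
    """Commit only between process() and the following next_message()."""
    last_commit = clock()
    handled = 0
    while connector.has_next():
        if stop_after is not None and handled >= stop_after:
            return handled  # partition taken away, no final commit (assumed model)
        msg = connector.next_message()
        process(msg)
        handled += 1
        # Here consumed == processed, so a commit cannot cover unprocessed work.
        if clock() - last_commit >= commit_interval_s:
            connector.commit_offsets()
            last_commit = clock()
    connector.commit_offsets()  # clean shutdown: flush before releasing the connector
    return handled


# Usage with a fake clock that advances one second per processed message.
now = [0.0]
seen = []


def process(msg):
    seen.append(msg)
    now[0] += 1.0


conn = FakeConnector(list(range(10)))
consume(conn, process, commit_interval_s=4, clock=lambda: now[0], stop_after=7)
redelivered = conn.messages[conn.committed:conn.consumed]
```

In this run, redelivered holds the messages that were processed but never
committed, which is the duplicate window I am worried about. Its size is
bounded by the commit interval, not by anything the rebalance does.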

The built-in auto-commit functionality is so close to being all that
anyone would need, except it has a glaring weakness: it can commit a
message's offset before that message has actually been processed, so
messages will be lost from time to time. I don't know that it will meet
the needs of trying to have reliable delivery (with duplicates ok).
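
A toy model of that weakness, again as a simulation and not the real
consumer code: AutoCommitConsumer, auto_commit_tick and run_until_crash
are hypothetical names, and the timer is assumed to fire at the worst
possible moment, right after a message has been handed out.

```python
class AutoCommitConsumer:
    """Hypothetical model of timer-driven auto commit (not the Kafka API)."""

    def __init__(self, messages):
        self.messages = messages
        self.consumed = 0
        self.committed = 0

    def next_message(self):
        msg = self.messages[self.consumed]
        self.consumed += 1
        return msg

    def auto_commit_tick(self):
        # Fires on a timer, regardless of whether processing has finished.
        self.committed = self.consumed


def run_until_crash(consumer, crash_on):
    """Process messages until crash_on, which fails before being processed."""
    processed = []
    while consumer.consumed < len(consumer.messages):
        msg = consumer.next_message()
        consumer.auto_commit_tick()  # worst case: timer fires right after next_message()
        if msg == crash_on:
            return processed  # consumer dies with the message unprocessed
        processed.append(msg)
    return processed


consumer = AutoCommitConsumer(["a", "b", "c", "d"])
processed = run_until_crash(consumer, crash_on="c")

# A replacement consumer resumes from the committed offset.
resumed = consumer.messages[consumer.committed:]
lost = [m for m in consumer.messages if m not in processed and m not in resumed]
```

Here the message the consumer died on ends up in lost: its offset was
already committed, so nobody will ever see it again.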

On Tue, Oct 15, 2013 at 9:00 PM, Jun Rao <[EMAIL PROTECTED]> wrote:

> If auto commit is disabled, the consumer connector won't call commitOffsets
> during rebalancing.
> Thanks,
> Jun
> On Tue, Oct 15, 2013 at 4:16 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
> > I'm looking at implementing a synchronous auto offset commit solution.
> > People have discussed the need for this in previous threads. Basically,
> > in my consumer loop, I want to make sure a message has actually been
> > processed before allowing its offset to be committed. But I don't want
> > to commit on every message, since that would be too expensive. So, I
> > want to use 'auto.commit.interval.ms' to periodically call
> > commitOffsets, but only after a message has been processed and before
> > the next message has been issued via a call to 'next()' on the
> > ConsumerIterator.
> >
> > The built-in 'auto.commit.enable' feature unfortunately allows commits to
> > happen on any message that has been returned via ConsumerIterator.next().
> > But if the consumer goes down before actually processing the message, or
> > if it hangs indefinitely for some reason, then this message will get
> > committed before it has actually been consumed successfully.
> >
> > I think there are issues with trying to implement this on top of the
> > high-level consumer api.  First, I need to worry about multiple threads
> > consuming in the same connector (so for now I'm limiting this to support
> > only 1 thread).
> >
> > Also, when shutting down the connector, I need to make sure any pending
> > messages are committed before allowing the connector to shut down. So,
> > that seems easy enough to handle.
> >
> > One thing I'm more concerned with is what happens when there's a
> > consumer rebalance. Looking at the ZookeeperConsumerConnector code, it
> > seems there are explicit calls to commitOffsets during the rebalance.
> > I'm not sure how to handle that from the high-level api (and do I need
> > to worry about that?).
> >
> > Thanks for any insight.
> >
> > Jason
> >

Jun Rao 2013-10-16, 14:52
Jason Rosenberg 2013-10-16, 17:35
Joel Koshy 2013-10-16, 21:20
Jason Rosenberg 2013-10-17, 03:00
Joel Koshy 2013-10-18, 05:11
Jason Rosenberg 2013-10-18, 22:54