Kafka user mailing list

Re: Handling consumer rebalance when implementing synchronous auto-offset commit
If auto commit is disabled ('auto.commit.enable' set to false), the consumer
connector won't call commitOffsets during rebalancing.
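A minimal sketch of the consumer properties this reply implies, assuming the Kafka 0.8 high-level consumer property names 'zookeeper.connect', 'group.id' and 'auto.commit.enable'. The ZooKeeper address and group name are placeholders, and wrapping the properties in a ConsumerConfig to create the connector is left out so the snippet runs without the Kafka jar.

```java
import java.util.Properties;

class ManualCommitConfig {
    // Consumer properties with the built-in auto commit switched off, so the
    // application, not the connector, decides when offsets get committed.
    static Properties build(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        props.put("auto.commit.enable", "false");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder connection values for illustration only.
        Properties props = build("localhost:2181", "example-group");
        System.out.println(props.getProperty("auto.commit.enable")); // false
    }
}
```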


On Tue, Oct 15, 2013 at 4:16 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:

> I'm looking at implementing a synchronous auto-offset-commit solution.
> People have discussed the need for this in previous threads. Basically, in
> my consumer loop, I want to make sure a message has actually been processed
> before allowing its offset to be committed. But I don't want to commit on
> every message, since that would be too expensive. So I want to use
> 'auto.commit.interval.ms' to periodically call commitOffsets, but only
> after a message has been processed and before the next message has been
> issued via a call to 'next()' on the ConsumerIterator.
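The loop described above can be sketched in Java under a few assumptions: a single consumer thread, a plain Iterator standing in for ConsumerIterator, and a hypothetical OffsetCommitter interface standing in for the connector's commitOffsets(). The interval plays the role of 'auto.commit.interval.ms', and the clock is injectable so the behaviour can be checked deterministically. It illustrates the idea rather than replacing the connector's own auto commit.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the connector's commitOffsets() call.
interface OffsetCommitter {
    void commitOffsets();
}

class SyncCommitLoop<T> {
    private final Iterator<T> messages;      // stands in for ConsumerIterator
    private final OffsetCommitter committer;
    private final long intervalMs;           // plays the role of auto.commit.interval.ms
    private final LongSupplier clockMs;      // e.g. System::currentTimeMillis
    private long lastCommitMs;
    private boolean pending = false;         // processed but not yet committed
    int commits = 0;

    SyncCommitLoop(Iterator<T> messages, OffsetCommitter committer,
                   long intervalMs, LongSupplier clockMs) {
        this.messages = messages;
        this.committer = committer;
        this.intervalMs = intervalMs;
        this.clockMs = clockMs;
        this.lastCommitMs = clockMs.getAsLong();
    }

    // Single thread only: a commit can happen only after a message has been
    // processed and before next() hands out the following one.
    void run(Consumer<T> processor) {
        while (messages.hasNext()) {
            T msg = messages.next();
            processor.accept(msg);           // finish the work first
            pending = true;
            long now = clockMs.getAsLong();
            if (now - lastCommitMs >= intervalMs) {
                commit(now);
            }
        }
        commitPending();                     // flush the tail when the stream ends
    }

    void commitPending() {
        if (pending) {
            commit(clockMs.getAsLong());
        }
    }

    private void commit(long now) {
        committer.commitOffsets();
        commits++;
        pending = false;
        lastCommitMs = now;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        long[] fakeNow = {0};                // simulated clock in milliseconds
        SyncCommitLoop<Integer> loop = new SyncCommitLoop<>(
                List.of(1, 2, 3, 4, 5).iterator(),
                () -> events.add("commit"),
                1000, () -> fakeNow[0]);
        // Each message "takes" 400 ms to process.
        loop.run(m -> { events.add("processed " + m); fakeNow[0] += 400; });
        System.out.println(events);
    }
}
```

Running main prints [processed 1, processed 2, processed 3, commit, processed 4, processed 5, commit]: with a simulated 400 ms per message and a 1000 ms interval, the first commit fires after message 3 and the final one covers messages 4 and 5. The interval is only checked when a message arrives, so on a quiet topic processed offsets stay uncommitted until the next message or until the loop ends.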
> The built-in 'auto.commit.enable' feature unfortunately allows commits to
> happen on any message that has been returned via ConsumerIterator.next().
> But if the consumer goes down before actually processing the message, or
> hangs indefinitely for some reason, then that message will get committed
> before it has actually been consumed successfully.
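To make this failure mode concrete, here is a toy model of a single partition's offsets (hypothetical, not Kafka code). It assumes that a committed offset is the next offset a restarted consumer would read, and compares what the built-in policy and the synchronous policy would record if a commit fires after message 1 has been returned by next() but before it has been processed.

```java
// Toy model of one partition's offsets; it only shows which offset each
// commit policy would record, it is not Kafka code.
class OffsetModel {
    long returned = 0;   // next offset after the last message returned by next()
    long processed = 0;  // next offset after the last fully processed message
    long committed = 0;  // where a restarted consumer would resume

    void next() { returned++; }
    void finishProcessing() { processed = returned; }
    void autoCommit() { committed = returned; }   // built-in behaviour
    void syncCommit() { committed = processed; }  // commit only processed work

    // Message 0 is processed, message 1 is returned by next(), then a commit
    // fires and the consumer dies before processing message 1.
    static long resumeOffsetAfterCrash(boolean useAutoCommit) {
        OffsetModel m = new OffsetModel();
        m.next();
        m.finishProcessing();
        m.next();
        if (useAutoCommit) {
            m.autoCommit();
        } else {
            m.syncCommit();
        }
        return m.committed;
    }

    public static void main(String[] args) {
        System.out.println("auto commit resumes at " + resumeOffsetAfterCrash(true));  // 2
        System.out.println("sync commit resumes at " + resumeOffsetAfterCrash(false)); // 1
    }
}
```

With the built-in policy the restarted consumer resumes at offset 2 and message 1 is never processed; with the synchronous policy it resumes at offset 1 and message 1 is delivered again.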
> I think there are issues with trying to implement this on top of the
> high-level consumer API. First, I need to worry about multiple threads
> consuming in the same connector (so for now I'm limiting this to support
> only one thread).
> Also, when shutting down the connector, I need to make sure the offsets of
> any already-processed messages are committed before allowing the connector
> to shut down. That seems easy enough to handle.
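The shutdown ordering can be sketched as follows, assuming one worker thread and a hypothetical ConnectorHandle interface standing in for the connector's commitOffsets() and shutdown(). The worker checks the stop flag only between messages, so whatever next() has returned is finished before the final commit, and the caller waits for that commit before shutting the connector down. The sketch assumes the iterator does not block forever; with the real ConsumerIterator that would likely mean setting 'consumer.timeout.ms' and handling the resulting timeout exception, which is omitted here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the two connector calls involved in shutdown.
interface ConnectorHandle {
    void commitOffsets();
    void shutdown();
}

class DrainingShutdown {
    private final ConnectorHandle connector;
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final CountDownLatch workerDone = new CountDownLatch(1);

    DrainingShutdown(ConnectorHandle connector) {
        this.connector = connector;
    }

    // Worker thread: the stop flag is checked only between messages, so any
    // message already returned by next() is processed before the final commit.
    // If processing throws, the final commit is skipped.
    void workerLoop(Iterator<Runnable> work) {
        try {
            while (!stopRequested.get() && work.hasNext()) {
                work.next().run();
            }
            connector.commitOffsets();
        } finally {
            workerDone.countDown();
        }
    }

    // Caller thread: request a stop, wait for the worker's final commit,
    // then shut the connector down.
    void shutdown() throws InterruptedException {
        stopRequested.set(true);
        workerDone.await();
        connector.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        DrainingShutdown d = new DrainingShutdown(new ConnectorHandle() {
            public void commitOffsets() { events.add("commit"); }
            public void shutdown() { events.add("shutdown"); }
        });
        List<Runnable> work = List.of(() -> events.add("m1"), () -> events.add("m2"));
        Thread worker = new Thread(() -> d.workerLoop(work.iterator()));
        worker.start();
        d.shutdown();
        worker.join();
        // How many of m1/m2 ran depends on timing; "commit" always precedes "shutdown".
        System.out.println(events);
    }
}
```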
> The thing I'm more concerned about is what happens when there's a consumer
> rebalance. Looking at the ZookeeperConsumerConnector code, it seems there
> are explicit calls to commitOffsets during the rebalance. I'm not sure how
> to handle that from the high-level API (and do I need to worry about it at
> all?).
> Thanks for any insight.
> Jason