I'm looking at implementing a synchronous auto offset commit solution.
People have discussed the need for this in previous
threads......Basically, in my consumer loop, I want to make sure a message
has been actually processed before allowing it's offset to be committed.
But I don't want to commit on every message, since that would be too
expensive. So, I want to use the 'auto.commit.interval.ms' to periodically
call commitOffsets, but only after a message is processed, but not after
the next message has been issued via a call to 'next()' on the
The builtin 'auto.commit.enable' feature unfortunately allows commits to
happen on any message that has been returned via ConsumerIterator.next().
But if the consumer goes down before actually processing the message, or
if it hangs indefinitely for some reason, then this message will get
committed before it has actually been consumed successfully.
I think there are issues with trying to implement this on top of the
high-level consumer api. First, I need to worry about multiple threads
consuming in the same connector (so for now I'm limiting this to support
only 1 thread).
Also, when shutting down the connector, I need to make sure any pending
messages are committed before allowing the connector to shutdown. So, that
seems easy enough to handle.
One thing I'm more concerned with, is what happens when there's a consumer
rebalance. Looking at the ZookeeperConsumerConnector code, it seems there
are explicit calls to commitOffsets during the rebalance. I'm not sure how
to handle that from the high-level api (and do I need to worry about that?).
Thanks for any insight.