I'm looking at implementing a synchronous auto offset commit solution. People have discussed the need for this in previous threads. Basically, in my consumer loop, I want to make sure a message has actually been processed before allowing its offset to be committed. But I don't want to commit on every message, since that would be too expensive. So I want to use 'auto.commit.interval.ms' to call commitOffsets periodically, but only at a point where the last message has been processed and before the next message has been fetched via a call to 'next()' on the ConsumerIterator.
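A minimal sketch of that scheme, in Java: the loop marks an offset only after processing finishes and checks the interval between messages, so a commit never covers a message that next() has returned but the application has not finished. The class name, the LongConsumer commit hook (standing in for commitOffsets(), which commits whatever the connector has recorded as consumed and so must run between finishing one message and calling next() again), and passing the clock in explicitly are all assumptions for illustration. Whether the stored value should be the last processed offset or the one after it depends on the client's convention.

```java
import java.util.function.LongConsumer;

// Hypothetical helper: commits only offsets that the application has finished
// processing, at most once per interval (mirroring auto.commit.interval.ms).
final class ProcessedOffsetCommitter {
    private final long intervalMs;       // commit interval, like auto.commit.interval.ms
    private final LongConsumer commitFn; // stand-in for the real commit, e.g. commitOffsets()
    private long lastProcessed = -1L;    // highest offset fully processed so far
    private long lastCommitted = -1L;    // highest offset already handed to commitFn
    private long lastCommitTimeMs;

    ProcessedOffsetCommitter(long intervalMs, long startMs, LongConsumer commitFn) {
        this.intervalMs = intervalMs;
        this.lastCommitTimeMs = startMs;
        this.commitFn = commitFn;
    }

    // Call only after the message at this offset has been processed successfully.
    void markProcessed(long offset) {
        lastProcessed = Math.max(lastProcessed, offset);
    }

    // Call between messages (after processing, before the next call to next()).
    // Returns true if a commit was issued.
    boolean maybeCommit(long nowMs) {
        if (nowMs - lastCommitTimeMs < intervalMs) {
            return false;
        }
        lastCommitTimeMs = nowMs;
        return commitPending();
    }

    // Commit any processed-but-uncommitted offset regardless of the interval,
    // e.g. on shutdown. Returns true if a commit was issued.
    boolean commitPending() {
        if (lastProcessed <= lastCommitted) {
            return false;
        }
        commitFn.accept(lastProcessed);
        lastCommitted = lastProcessed;
        return true;
    }

    long committedOffset() {
        return lastCommitted;
    }
}
```

In the loop, the intended order is next(), process, markProcessed(offset), maybeCommit(now). commitPending() is the same commit without the interval check, which is what a shutdown path would call.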
The built-in 'auto.commit.enable' feature unfortunately allows a commit to cover any message that has been returned by ConsumerIterator.next(). If the consumer goes down before actually processing the message, or hangs indefinitely for some reason, that message can get committed before it has actually been consumed successfully.
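To make that window concrete, here is a toy Java model of it. AutoCommitWindow and its methods are invented for illustration and only mimic the ordering problem; they do not reproduce how ZookeeperConsumerConnector actually tracks offsets.

```java
// Toy model (not Kafka code) of the built-in behavior described above: the
// auto-commit timer commits whatever next() last returned, processed or not.
final class AutoCommitWindow {
    long returnedByNext = -1L; // what the connector treats as "consumed"
    long processed = -1L;      // what the application has actually finished
    long committed = -1L;      // what a restarted consumer would resume after

    long next(long offset) {
        returnedByNext = offset;
        return offset;
    }

    void finishProcessing(long offset) {
        processed = offset;
    }

    // A timer-driven commit: it cannot tell "returned" from "processed".
    void autoCommitTick() {
        committed = returnedByNext;
    }

    // Messages that would be skipped if the consumer died right now.
    long lostOnCrash() {
        return Math.max(0L, committed - processed);
    }
}
```

Calling next(0), finishProcessing(0), next(1) and then autoCommitTick() leaves committed at 1 and processed at 0, so lostOnCrash() reports one message: the one that was handed out but never processed.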
I think there are issues with trying to implement this on top of the high-level consumer API. First, I need to worry about multiple threads consuming in the same connector (so for now I'm limiting this to support only 1 thread).
Also, when shutting down the connector, I need to make sure any pending messages are committed before allowing the connector to shut down. That seems easy enough to handle.
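One way to get the ordering right on shutdown, sketched for the single-thread case: stop the loop, wait for the in-flight message to finish, commit, and only then shut down the connector. The OrderedShutdown class and its Runnable hooks are hypothetical stand-ins for the commit and for the connector's own shutdown. The sketch also assumes the loop checks the flag between messages and is not stuck blocking in next(); the high-level consumer's consumer.timeout.ms setting is one way to bound that wait.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical shutdown sequence for a single consumer thread.
final class OrderedShutdown {
    static void shutdown(AtomicBoolean running, Thread worker,
                         Runnable commitPending, Runnable connectorShutdown)
            throws InterruptedException {
        running.set(false);       // 1. ask the loop to stop before its next call to next()
        worker.join();            // 2. let the in-flight message finish processing
        commitPending.run();      // 3. commit everything that has been processed
        connectorShutdown.run();  // 4. only now release the connector
    }
}
```

The join in step 2 is what makes step 3 safe: once the worker has exited, every message it took from the iterator has been processed, so the commit cannot get ahead of processing.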
One thing I'm more concerned about is what happens when there's a consumer rebalance. Looking at the ZookeeperConsumerConnector code, it seems there are explicit calls to commitOffsets during the rebalance. I'm not sure how to handle that from the high-level API (or whether I need to worry about it at all).
Yes, sorry, I think that was the basis for my question. When auto commit is enabled, special care is taken to make sure offsets are committed during a rebalance. This is needed because when a topic partition moves off of a consumer thread (since it is being rebalanced to another one), it's as if it is being shut down on that connector, and any not-yet-committed messages need to be committed before letting go of it.
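If the connector offered a callback that fired just before it released partitions, processed-but-uncommitted offsets could be flushed there, which is essentially what the built-in auto-commit does internally. The sketch below assumes such a hook exists: onPartitionsReleasing and the per-partition commit function are hypothetical names, not part of the 0.8 high-level consumer API, and partitions are keyed by number only (a single topic) to keep it short. uncommitted() assumes contiguous offsets and gives the number of messages that would be redelivered if a partition moved without the flush.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical per-partition bookkeeping for the rebalance case.
final class PartitionOffsetTracker {
    private final Map<Integer, Long> processed = new HashMap<>(); // partition -> highest processed offset
    private final Map<Integer, Long> committed = new HashMap<>(); // partition -> highest committed offset
    private final BiConsumer<Integer, Long> commitFn;             // hypothetical per-partition commit

    PartitionOffsetTracker(BiConsumer<Integer, Long> commitFn) {
        this.commitFn = commitFn;
    }

    void markProcessed(int partition, long offset) {
        processed.merge(partition, offset, Long::max);
    }

    // Periodic commit of everything processed so far.
    void commitAll() {
        processed.forEach((partition, p) -> {
            if (p > committed.getOrDefault(partition, -1L)) {
                commitFn.accept(partition, p);
                committed.put(partition, p);
            }
        });
    }

    // Processed-but-uncommitted messages that would be redelivered after a move.
    long uncommitted(int partition) {
        long p = processed.getOrDefault(partition, -1L);
        long c = committed.getOrDefault(partition, -1L);
        return Math.max(0L, p - c);
    }

    // Assumed to run before the connector gives these partitions away.
    void onPartitionsReleasing(Set<Integer> released) {
        for (int partition : released) {
            Long p = processed.get(partition);
            if (p != null && p > committed.getOrDefault(partition, -1L)) {
                commitFn.accept(partition, p);
            }
            processed.remove(partition); // this consumer no longer owns the partition
            committed.remove(partition);
        }
    }
}
```

Without such a hook, the duplicates on a rebalance are bounded by uncommitted() for each moved partition, i.e. by how much was processed since the last periodic commit.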
So my question is whether there's a way I can reproduce similar functionality with my own synchronous auto-commit implementation (and I'm not sure there is). It seems that on a rebalance, the offsets of messages that were processed but not yet committed will never be committed, so there will be no way to prevent pretty massive duplicate consumption. Is that about right? Or is there some way around this that I'm not seeing?
The built-in auto-commit functionality is so close to being all that anyone would need, except for one glaring weakness: it will cause messages to be lost from time to time. So I don't know that it will meet the needs of reliable, at-least-once delivery (duplicates OK).
Jason

On Tue, Oct 15, 2013 at 9:00 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
That would be great. Additionally, in the new API, it would be awesome to augment the default auto-commit functionality to allow client code to mark a message for commit only after processing it successfully!

On Wed, Oct 16, 2013 at 7:52 AM, Jun Rao <[EMAIL PROTECTED]> wrote:
Btw, after we complete KAFKA-1000 (offset management in Kafka) it should be reasonable to commit offsets on every message as long as the optional metadata portion of the offset commit request is small/empty.
Joel

On Wed, Oct 16, 2013 at 10:35 AM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
awesome

On Fri, Oct 18, 2013 at 1:10 AM, Joel Koshy <[EMAIL PROTECTED]> wrote: