Kafka, mail # user - Handling consumer rebalance when implementing synchronous auto-offset commit


Re: Handling consumer rebalance when implementing synchronous auto-offset commit
Joel Koshy 2013-10-16, 21:20
Btw, after we complete KAFKA-1000 (offset management in Kafka) it
should be reasonable to commit offsets on every message as long as the
optional metadata portion of the offset commit request is small/empty.
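Joel's point is that once commits are cheap, the consumer loop can commit after each message it finishes processing. The ordering is the important part: process first, then commit. The minimal sketch below, written in Python for brevity, rests on several assumptions. `Message`, `InMemoryOffsetStore`, and `consume_commit_each` are hypothetical stand-ins, not Kafka APIs. Committing "the next offset to read" is an assumed convention.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable


@dataclass(frozen=True)
class Message:
    # Hypothetical message record; a real consumer would also carry the topic.
    partition: int
    offset: int
    payload: str


class InMemoryOffsetStore:
    """Hypothetical stand-in for broker-side offset storage (not a Kafka API)."""

    def __init__(self) -> None:
        self.committed: Dict[int, int] = {}  # partition -> next offset to read
        self.commit_calls = 0

    def commit(self, offsets: Dict[int, int]) -> None:
        self.commit_calls += 1
        self.committed.update(offsets)


def consume_commit_each(messages: Iterable[Message],
                        process: Callable[[Message], None],
                        store: InMemoryOffsetStore) -> None:
    for msg in messages:
        # If process() raises, the commit below is skipped, so this message
        # is still uncommitted and would be read again after a restart.
        process(msg)
        # Commit only after processing succeeded (assumed convention:
        # store the offset of the next message to read).
        store.commit({msg.partition: msg.offset + 1})
```

If processing fails on some message, its offset is never committed, so a consumer that restarts from the stored offsets resumes at that message. Duplicates are possible, but losses are not.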

Thanks,

Joel
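Two ideas from the quoted thread below can be combined. Jason describes a periodic commit that only ever covers fully processed messages, and Jun suggests some notification that a rebalance is coming. The minimal sketch below shows one way to put them together. `SyncAutoCommitter` and its `on_partitions_revoked` hook are hypothetical. Going by Jun's reply, the consumer under discussion offers no such callback. Offsets are keyed by partition number only, for simplicity; a real consumer would key them by topic and partition.

```python
import time
from typing import Callable, Dict


class SyncAutoCommitter:
    """Hypothetical sketch: commits only offsets of fully processed messages,
    at most once per interval, from the consuming thread itself."""

    def __init__(self,
                 commit: Callable[[Dict[int, int]], None],
                 interval_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._commit = commit
        self._interval_s = interval_s
        self._clock = clock
        self._pending: Dict[int, int] = {}  # partition -> next offset to read
        self._last_commit = clock()

    def mark_processed(self, partition: int, offset: int) -> None:
        # Call after a message is fully processed and before fetching the
        # next one; commits only if the interval has elapsed.
        self._pending[partition] = offset + 1
        if self._clock() - self._last_commit >= self._interval_s:
            self.flush()

    def flush(self) -> None:
        # Commit a copy of the pending offsets, then reset the interval.
        if self._pending:
            self._commit(dict(self._pending))
            self._pending.clear()
        self._last_commit = self._clock()

    def on_partitions_revoked(self) -> None:
        # Hypothetical pre-rebalance hook (models Jun's suggested API):
        # commit processed-but-uncommitted offsets before letting go.
        self.flush()
```

In this sketch, `mark_processed` and `flush` run on the consuming thread, so every commit happens between processing one message and fetching the next. That is the invariant Jason wants. A real rebalance hook would also need to fire at such a point, or coordinate with the consuming thread. Otherwise it could commit an offset for a message that is still being processed.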
On Wed, Oct 16, 2013 at 10:35 AM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
> That would be great. Additionally, in the new API, it would be awesome to
> augment the default auto-commit functionality to allow client code to mark
> a message for commit only after it has been processed successfully!
>
>
> On Wed, Oct 16, 2013 at 7:52 AM, Jun Rao <[EMAIL PROTECTED]> wrote:
>
>> For manual offset commits, it will be useful to have some kind of API that
>> informs the client when a rebalance is going to happen. We can think about
>> this when we do the client rewrite.
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Tue, Oct 15, 2013 at 9:21 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
>>
>> > Jun,
>> >
>> > Yes, sorry, I think that was the basis for my question. When auto
>> > commit is enabled, special care is taken to make sure things are
>> > auto-committed during a rebalance. This is needed because when a topic
>> > moves off a consumer thread (since it is being rebalanced to another
>> > one), it's as if that topic is being shut down on that connector, and any
>> > not-yet-committed messages need to be committed before letting go of the
>> > topic.
>> >
>> > So, my question is around trying to understand if there's a way I can
>> > reproduce similar functionality using my own sync auto commit
>> > implementation (and I'm not sure there is). It seems that when there's a
>> > rebalance, all processed but not-yet-committed offsets will not be
>> > committed, and thus there will be no way to prevent pretty massive
>> > duplicate consumption on a rebalance. Is that about right? Or is there
>> > some way around this that I'm not seeing?
>> >
>> > The built-in auto-commit functionality is so close to being all that
>> > anyone would need, except that it has a glaring weakness: it can cause
>> > messages to be lost from time to time (committed before they have been
>> > processed), so I don't know that it will meet the needs of reliable
>> > delivery (with duplicates OK).
>> >
>> > Jason
>> >
>> >
>> > On Tue, Oct 15, 2013 at 9:00 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
>> >
>> > > If auto commit is disabled, the consumer connector won't call
>> > > commitOffsets during rebalancing.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Tue, Oct 15, 2013 at 4:16 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
>> > >
>> > > > I'm looking at implementing a synchronous auto offset commit solution.
>> > > > People have discussed the need for this in previous threads. Basically,
>> > > > in my consumer loop, I want to make sure a message has actually been
>> > > > processed before allowing its offset to be committed. But I don't want
>> > > > to commit on every message, since that would be too expensive. So, I
>> > > > want to use 'auto.commit.interval.ms' to periodically call
>> > > > commitOffsets, but only at a point where the last message returned has
>> > > > been fully processed and before the next message is fetched via a call
>> > > > to 'next()' on the ConsumerIterator.
>> > > >
>> > > > The built-in 'auto.commit.enable' feature unfortunately allows commits
>> > > > to happen on any message that has been returned via
>> > > > ConsumerIterator.next(). But if the consumer goes down before actually
>> > > > processing the message, or if it hangs indefinitely for some reason,
>> > > > then this message can end up committed before it has actually been
>> > > > consumed successfully.
>> > > >
>> > > > I think there are issues with trying to implement this on top of the
>> > > > high-level consumer API. First, I need to worry about multiple threads
>> > > > consuming in the same connector (so for now I'm limiting this to
>> > > > support only 1 thread).
>> > > >
>> > > > Also, when shutting down the connector, I need to make sure any