Kafka user mailing list: Offset committing on rebalance

Re: Offset committing on rebalance
It's a simple enough patch, but wouldn't it mean that messages still being processed when a rebalance happens could get delivered to another consumer if we end up losing the partition? For some reason, rebalances seem to happen very frequently when there are a lot of consumers, and after a rebalance a consumer doesn't seem guaranteed, or even likely, to keep the partition it was in the middle of consuming.

--  
Ian Friedman
On Thursday, August 15, 2013 at 10:53 AM, Jun Rao wrote:

> We are only patching blocker issues in 0.7. 0.8 beta1 has been released and
> most dev effort will be on 0.8 and beyond. That said, this particular case
> is easy to fix. If you can port the patch in
> https://issues.apache.org/jira/browse/KAFKA-919 to the 0.7 branch, we can
> commit that to the 0.7 branch.
>  
> Thanks,
>  
> Jun
>  
>  
> On Wed, Aug 14, 2013 at 9:30 PM, Ian Friedman <[EMAIL PROTECTED]> wrote:
>  
> > Ugh.
> >  
> > Is there any way to make this work in 0.7, or is transitioning to 0.8 the
> > only way? My operations engineers spent a lot of effort in configuring and
> > hardening our 0.7 production install, and 0.8 isn't released yet. Not to
> > mention having to integrate the new client side code.
> >  
> > Either way, thanks for all your help Jun.
> >  
> > --
> > Ian Friedman
> >  
> >  
> > On Thursday, August 15, 2013 at 12:21 AM, Jun Rao wrote:
> >  
> > > Yes, this is an issue and has been fixed in 0.8.
> > >  
> > > Thanks,
> > >  
> > > Jun
> > >  
> > >  
> > > On Wed, Aug 14, 2013 at 5:21 PM, Ian Friedman <[EMAIL PROTECTED]> wrote:
> > >  
> > > > Hey guys,
> > > >  
> > > > I designed my consumer app (running on 0.7) to run with autocommit off
> > > > and commit manually once it was done processing a record. The intent was
> > > > that if a consumer died while processing a message, the offset would not
> > > > be committed, and another box would pick up the partition and reprocess
> > > > the message. This seemed to work fine with small numbers of consumers
> > > > (~10). But now that I'm scaling it out, I'm running into a problem where
> > > > it looks like messages that consumers picked up and then errored on are
> > > > not getting processed on another machine.
> > > >  
> > > > After investigating the logs and the partition offsets in ZooKeeper, I
> > > > found that closeFetchersForQueues in ZookeeperConsumerConnector.scala,
> > > > which is called during the rebalance process, commits the offset
> > > > regardless of the autocommit setting. So it looks like even if my
> > > > consumer is in the middle of processing a message, the offset will be
> > > > committed, and even if the processing fails, the message will never be
> > > > picked up again. Now that I have a lot of consumer nodes, the rebalancer
> > > > is going off a lot more often and I'm running into this constantly.
> > > >  
> > > > Were my assumptions faulty? Did I design this wrong? After reading the
> > > > comment in the code, I understand that if it didn't commit the offset
> > > > there, the message would just get immediately consumed by whoever ended
> > > > up owning the partition, even if we were in the middle of consuming it
> > > > elsewhere, and we'd get unintentional duplicate delivery. How can I make
> > > > it work the way I've described? Is there any way?
> > > >  
> > > > Thanks in advance,
> > > >  
> > > > --
> > > > Ian Friedman
>
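The trade-off debated in this thread can be illustrated with a small toy model. It is a sketch under stated assumptions, not Kafka code: one partition, two hypothetical consumers A and B, autocommit off, and the application commits only after finishing a message. A rebalance moves the partition from A to B while A is still processing message `m0`. The flag `commit_on_rebalance` stands in for the 0.7 behaviour Ian describes in closeFetchersForQueues. Setting it to `False` is an assumption about what skipping that commit (as the KAFKA-919 port would presumably do) amounts to. The function `run` and the message names are invented for illustration.

```python
"""Toy model of the rebalance/offset race discussed above (not Kafka code).

All names are hypothetical. One partition, consumers A and B, autocommit off,
and the application commits only after it finishes processing a message.
"""


def run(commit_on_rebalance: bool, a_fails: bool) -> dict:
    messages = ["m0", "m1", "m2"]
    committed = 0  # offset stored in (simulated) ZooKeeper

    # Consumer A fetches m0; its in-memory consumed offset moves past it.
    a_consumed = 1
    in_flight = messages[0]

    # Rebalance happens before A commits. In the 0.7 behaviour described in
    # the thread, the consumed offset is committed regardless of autocommit.
    if commit_on_rebalance:
        committed = a_consumed

    # B now owns the partition and resumes from the committed offset.
    # Assumption: B processes everything it receives successfully.
    processed_by_b = messages[committed:]

    # A finishes (or fails) the message it was holding. It no longer owns the
    # partition, so in this model it does not commit anything.
    processed_by_a = [] if a_fails else [in_flight]

    processed = processed_by_a + processed_by_b
    return {
        "lost": [m for m in messages if m not in processed],
        "duplicates": sorted({m for m in processed if processed.count(m) > 1}),
    }


if __name__ == "__main__":
    for commit in (True, False):
        for fails in (True, False):
            r = run(commit, fails)
            print(f"commit_on_rebalance={commit}, a_fails={fails}: "
                  f"lost={r['lost']}, duplicates={r['duplicates']}")
```

In this model, committing on rebalance loses `m0` whenever A fails after the rebalance, which is the problem in Ian's original message. Skipping that commit never loses `m0`, but B receives it while A may still be processing it, so a successful A produces a duplicate. That is the concern Ian raises in his last reply.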