Kafka, mail # user - Re: ConsumerRebalanceFailedException


Re: ConsumerRebalanceFailedException
Hanish Bansal 2014-01-01, 04:41
Thanks for the response!

The option that looks fine to me is: check the ZooKeeper consumer
registration path and, once the node is gone (that is, after the session
timeout), restart the consumer.

The thing is, we will have to implement this ourselves, as it is currently
not handled by the high-level consumer.
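
A minimal sketch of that wait-then-restart idea, in Java. It is not part of the Kafka API: the class name RegistrationWait and the method awaitDeregistration are made up for illustration, and the ZooKeeper lookup is passed in as a BooleanSupplier so the sketch runs without a ZooKeeper client. How that supplier is implemented (for example, an exists check on the old registration node under the group's ids path) is an assumption left to the caller.

```java
import java.util.function.BooleanSupplier;

/**
 * Hypothetical helper: block until the old consumer registration node is
 * gone, or until a deadline passes. The existence check is injected, so
 * nothing here depends on a ZooKeeper client library.
 */
final class RegistrationWait {

    private RegistrationWait() {}

    /**
     * Polls nodeExists until it returns false.
     *
     * @return true if the node was seen to be gone before maxWaitMs elapsed;
     *         false if it was still present at the deadline or the thread
     *         was interrupted while waiting
     */
    static boolean awaitDeregistration(BooleanSupplier nodeExists,
                                       long maxWaitMs,
                                       long pollIntervalMs) {
        long deadline = System.nanoTime() + maxWaitMs * 1_000_000L;
        while (nodeExists.getAsBoolean()) {
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                return false; // stale registration still present at the deadline
            }
            try {
                Thread.sleep(Math.min(pollIntervalMs, remainingMs));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true; // node is gone; the connector can be created now
    }
}
```

The restart script would call awaitDeregistration with maxWaitMs set a little above the negotiated session timeout and create the consumer connector only when it returns true. Unlike an unconditional Thread.sleep, this returns immediately when the previous consumer was shut down cleanly.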
On Tue, Dec 31, 2013 at 10:20 PM, Jun Rao <[EMAIL PROTECTED]> wrote:

> Consumers typically don't need to be hard-killed. If this is not the case,
> another option is to use SimpleConsumer (the logic will be more complicated
> though), which doesn't have dependencies on ZK.
>
> Thanks,
>
> Jun
>
>
> On Mon, Dec 30, 2013 at 7:56 PM, Hanish Bansal <
> [EMAIL PROTECTED]> wrote:
>
> > The default zookeeper.session.timeout.ms is 6000, and from what I can
> > see this value is negotiated. We tried to set it to less than 4000 to
> > expire the session earlier, but ZooKeeper negotiated it up to 4000 ms.
> >
> > We have a backend script that checks every second whether the consumer
> > service is running and starts it if it is not. So the consumer service
> > is restarted within a second, without any wait.
> > *"connector.shutdown()" is a good option for this, but it will not work
> > if the consumer is killed abnormally using kill -9*.
> >
> > The other option I see is to put
> > "Thread.sleep(sessionTimeoutMilliseconds)" in the consumer service
> > before it starts, but that is also not a good option.
> >
> > *When ConsumerRebalanceFailedException occurs, the consumer stops
> > consuming data. The expected behaviour should be: if
> > ConsumerRebalanceFailedException occurs because of the ZooKeeper
> > session, the consumer should wait for the timeout interval. Once the
> > previous session has timed out, it should reconnect and start consuming
> > data.*
> >
> > Any other way to handle it?
> >
> > Also, I would like to know the suggested value for
> > zookeeper.session.timeout.ms in production.
> >
> >
> > On Mon, Dec 30, 2013 at 11:49 PM, Guozhang Wang <[EMAIL PROTECTED]>
> > wrote:
> >
> > > Yes, it applies to consumer too.
> > >
> > >
> > > On Mon, Dec 30, 2013 at 11:46 AM, Yu, Libo <[EMAIL PROTECTED]> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > zookeeper.session.timeout.ms is used in a broker's configuration and
> > > > manages brokers' registration with zk.
> > > > Does it apply to consumer as well? Thanks.
> > > >
> > > > Regards,
> > > >
> > > > Libo
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: Jun Rao [mailto:[EMAIL PROTECTED]]
> > > > Sent: Monday, December 30, 2013 11:13 AM
> > > > To: [EMAIL PROTECTED]
> > > > Subject: Re: ConsumerRebalanceFailedException
> > > >
> > > > If the consumer is not shut down properly, it will take
> > > > zookeeper.session.timeout.ms before the consumer is deregistered
> > > > from ZK. If you restart the consumer before that, rebalances may
> > > > fail.
> > > >
> > > > Make sure that you call connector.shutdown() when you shut down the
> > > > consumer.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Mon, Dec 30, 2013 at 1:58 AM, Hanish Bansal <
> > > > [EMAIL PROTECTED]> wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I am getting a ConsumerRebalanceFailedException if I restart my
> > > > > consumer within 1-3 seconds.
> > > > >
> > > > > Exception trace is:
> > > > >
> > > > > Caused by: kafka.common.ConsumerRebalanceFailedException: indexConsumerGroup1_IMPETUS-I0027C-1388416992091-ac0d82d7 can't rebalance after 4 retries
> > > > >     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown Source)
> > > > >     at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown Source)
> > > > >     at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
> > > > >     at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams

*Thanks & Regards*
*Hanish Bansal*