Kafka, mail # user - What should do after ConsumerConnector.createMessageStreams throws exception


Re: What should do after ConsumerConnector.createMessageStreams throws exception
Jun Rao 2012-11-26, 05:17
Adam,

KAFKA-379 should be fixed in 0.7.2.

Thanks,

Jun

On Thu, Nov 22, 2012 at 9:18 PM, sun liwei <[EMAIL PROTECTED]> wrote:

> Neha,
>
> In the log I found: kafka.consumer.TopicCount$ - error parsing consumer
> json string { "DS.Input.All.Gyaolan": 1 } java.lang.NullPointerException
> So the exception is caused by the thread-safety bug in TopicCount. For more
> details, see https://issues.apache.org/jira/browse/KAFKA-379
>
> I am using Kafka 0.7.1, in which the bug is not fixed yet, and I don't
> know whether it will be fixed in this version. So I would like to know
> what I should do in my case.
>
> Many Thanks.
>
> Adam.
>
>
>
>
> On Fri, Nov 23, 2012 at 2:42 AM, Neha Narkhede <[EMAIL PROTECTED]> wrote:
>
> > Can you please send the entire consumer log leading up to the
> > ConsumerRebalanceFailedException? That exception means the consumer
> > will not be able to start up. Several problems can lead to this
> > exception; the most common is an inability to connect to ZooKeeper.
> >
> > Thanks,
> > Neha
> >
> > On Thu, Nov 22, 2012 at 9:04 AM, sun liwei <[EMAIL PROTECTED]> wrote:
> > > Hi, all
> > >
> > > The code is like this:
> > >
> > >          ConsumerConnector consumer = ...;
> > >          Map<String, Integer> topicCountMap = ...;
> > >          Map<String, List<KafkaStream<Message>>>  consumerMap = null;
> > >          KafkaStream<Message> stream = null;
> > >
> > >          try {
> > >                 topicCountMap.put(topic, new Integer(1));
> > >                 consumerMap = consumer.createMessageStreams(topicCountMap);
> > >                 stream = consumerMap.get(topic).get(0);
> > >          } catch (Exception e) {
> > >
> > >          } finally {
> > >                 // shall I shutdown the consumer here? or should I call
> > >                 //consumer.shutdown();
> > >                 //consumer = null;
> > >          }
> > >
> > > Actually the exception is:
> > > kafka.common.ConsumerRebalanceFailedException:
> > > ***-1353254404267-5541f750 can't rebalance after 4 retries
> > >
> > > Thank you very much!
> > >
> > > Adam
> >
>
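
Editor's note: the thread does not spell out what the cleanup in the original question could look like. Below is a minimal sketch of one option: shut the connector down only when stream creation fails, then rethrow. It does not depend on the Kafka jar. The Connector interface is a hypothetical stand-in for the two ConsumerConnector methods that appear in the question (createMessageStreams and shutdown), and openSingleStream is a made-up helper name. Whether shutting down is the right reaction on 0.7.1 is an assumption; the thread only confirms that the exception means the consumer cannot start up and that KAFKA-379 should be fixed in 0.7.2.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SafeStreamSetup {

    /** Hypothetical stand-in for the ConsumerConnector methods used in the question. */
    public interface Connector<S> {
        Map<String, List<S>> createMessageStreams(Map<String, Integer> topicCountMap);
        void shutdown();
    }

    /**
     * Requests a single stream for the topic. If setup fails for any reason,
     * the connector is shut down before the exception is rethrown, so the
     * caller never keeps a half-initialized consumer around.
     */
    public static <S> S openSingleStream(Connector<S> consumer, String topic) {
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, 1);
        try {
            Map<String, List<S>> consumerMap = consumer.createMessageStreams(topicCountMap);
            return consumerMap.get(topic).get(0);
        } catch (RuntimeException e) {
            try {
                consumer.shutdown();
            } catch (RuntimeException shutdownFailure) {
                // Keep the original failure as the primary exception.
                e.addSuppressed(shutdownFailure);
            }
            throw e;
        }
    }
}
```

Unlike a finally block, this leaves the connector running on the success path, so the returned stream stays usable. The caller decides whether to retry with a fresh connector or give up.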