Kafka, mail # user - can high-level consumer api provide the method getting messages with non-block?


Re: Re: can high-level consumer api provide the method getting messages with non-block?
Milind Parikh 2012-03-08, 01:19
I added a sleep before the new createMessageStreams call to give the
rebalance time to complete, hoping to work around KAFKA-242. It did not help.

So it appears that until KAFKA-242 is resolved, the consumer.timeout.ms
parameter will not work.

Thoughts/workaround?

My current workaround lives inside the while(true){} loop, but it is not
ideal: I have to maintain a counter and check it on every iteration of the
inner while loop that calls it.next().
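
For comparison, here is a rough sketch of a different way to avoid blocking the main loop. It is not the counter workaround above and it does not use any Kafka API: a background thread drains a possibly blocking iterator into a queue, and the main thread polls that queue with a timeout. The class NonBlockingDrain and the helper pump are hypothetical names, and a plain list iterator stands in for the stream iterator, so treat this as an illustration under those assumptions rather than a tested fix for KAFKA-242.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class NonBlockingDrain {

    // Hypothetical helper (not part of Kafka): copies items from an iterator
    // into a queue on a daemon thread, so only that thread ever blocks.
    static <T> BlockingQueue<T> pump(final Iterator<T> source) {
        final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
        Thread worker = new Thread(new Runnable() {
            public void run() {
                while (source.hasNext()) {
                    queue.add(source.next());
                }
            }
        });
        worker.setDaemon(true);
        worker.start();
        return queue;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the stream iterator; an assumption for this sketch.
        Iterator<String> it = Arrays.asList("a", "b").iterator();
        BlockingQueue<String> queue = pump(it);

        String msg;
        // poll returns null if nothing arrives within the timeout,
        // so the caller regains control instead of blocking forever.
        while ((msg = queue.poll(200, TimeUnit.MILLISECONDS)) != null) {
            System.out.println(msg);
        }
        System.out.println("no message within timeout, doing other work");
    }
}
```

The point of this design is that the timeout is enforced by the queue on the caller's side, so it does not depend on the iterator honoring a timeout of its own.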

Regards
Milind
On Mon, Mar 5, 2012 at 9:29 AM, Neha Narkhede <[EMAIL PROTECTED]> wrote:

> You are probably hitting this bug in Kafka -
> https://issues.apache.org/jira/browse/KAFKA-242
>
> Thanks,
> Neha
>
> 2012/3/4  <[EMAIL PROTECTED]>:
> > I modified the code as shown below. When the program calls
> > createMessageStreams again, it cannot get any messages, although there
> > are new messages. How can I resolve this? Thanks!
> >
> > ConsumerConnector consumer =
> >     kafka.consumer.Consumer.createJavaConsumerConnector(consumerconfig);
> > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > topicCountMap.put("topic", new Integer(1));
> > while (true) {
> >     Map<String, List<KafkaMessageStream<Message>>> consumerMap =
> >         consumer.createMessageStreams(topicCountMap);
> >     KafkaMessageStream<Message> stream = consumerMap.get("topic").get(0);
> >     ConsumerIterator<Message> it = stream.iterator();
> >     try {
> >         while (it.hasNext()) {
> >             ByteBuffer buffer = it.next().payload();
> >             byte[] bytes = new byte[buffer.remaining()];
> >             buffer.get(bytes);
> >             System.out.println(new String(bytes));
> >         }
> >     } catch (ConsumerTimeoutException e) {
> >         e.printStackTrace();
> >     } catch (Exception e) {
> >         e.printStackTrace();
> >     }
> > }