Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> can high-level consumer api provide the method getting messages with non-block?


Copy link to this message
-
Re: Re: can high-level consumer api provide the method getting messages with non-block?
Neha

To elaborate a little bit more,
Countandra 0.7 architecture is depicted in this link.

https://docs.google.com/drawings/d/1E9h5NodQVaA6iqxmcEYz9EZGapiNhLVHDI67XdXBBOo/edit<https://docs.google.com/drawings/d/1E9h5NodQVaA6iqxmcEYz9EZGapiNhLVHDI67XdXBBOo>

My issue has to do with how the cascaded event counter gets (or more
appropriately does not get) the events dispatched from event counter.

Shorn of all it's glory and focusing on the kafka aspect of this in the
cascaded event counter,

        with the consumer.timeout.ms NOT set, it works fine.

        with the consumer.timeout.ms SET (say to 10000 or 10 seconds), the
timeout is thrown. But no further  messages will be processed; UNLESS I
CRTL^C and restart only the cascaded event counter.

The code is below. Please let me know if I am doing anything wrong. How
long the thread sleeps does not seem to matter. By applying Jun's
suggestion, I merely delay the inevitable.

    while (true) {
         Map<String, List<KafkaMessageStream<Message>>> consumerMap consumer.createMessageStreams(topicCountMap);

         KafkaMessageStream<Message> stream consumerMap.get(CountandraProperties.cascadedCounterTopic).get(0);

         ConsumerIterator<Message> it = stream.iterator();

         try {
           while(it.hasNext()) {
                   mesg = ExampleUtils.getMessage(it.next());
                   System.out.println(mesg);
           }

         }
         catch (kafka.consumer.ConsumerTimeoutException cte) {
              try {
                  Thread.sleep(1000);
              }
              catch(Exception ei) {
              }
         }
         catch(Exception ie) {
             System.out.println(ie);
         }
    }
Regards
Milind

On Thu, Mar 8, 2012 at 8:39 AM, Neha Narkhede <[EMAIL PROTECTED]>wrote:

> >> Therefore it appears that without the resolution of 242, the
> consumer.timeout.ms parameter would not work.
>
> Please can you elaborate on this ? Do you have some test code we can look
> at ?
>
> Thanks,
> Neha
>
> On Wed, Mar 7, 2012 at 5:19 PM, Milind Parikh <[EMAIL PROTECTED]>
> wrote:
> > I introduced a sleep to let rebalance make place to account for 242
> before
> > new createmessagestream. However to no avail.
> >
> > Therefore it appears that without the resolution of 242, the
> > consumer.timeout.ms parameter would not work.
> >
> > Thoughts/workaround?
> >
> > My current workaround is inside the while(true){}; but not ideal as I
> have
> > to make some counter to increase and check counter at every iteration of
> > while(it.next()).
> >
> > Regards
> > Milind
> >
> >
> > On Mon, Mar 5, 2012 at 9:29 AM, Neha Narkhede <[EMAIL PROTECTED]
> >wrote:
> >
> >> You are probably hitting this bug in Kafka -
> >> https://issues.apache.org/jira/browse/KAFKA-242
> >>
> >> Thanks,
> >> Neha
> >>
> >> 2012/3/4  <[EMAIL PROTECTED]>:
> >> > I modify code as beblow. When the program createmessagestream again,
> it
> >> can not get any message although there are some new messages. How could
> I
> >> resolve it? Thanks!
> >> >
> >> > ConsumerConnector consumer > >> kafka.consumer.Consumer.createJavaConsumerConnector(consumerconfig);
> >> > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >> > topicCountMap.put("topic", new Integer(1));
> >> > while(true){
> >> > Map<String, List<KafkaMessageStream<Message>>> consumerMap > >> consumer.createMessageStreams(topicCountMap);
> >> > KafkaMessageStream<Message> stream = consumerMap.get("topic").get(0);
> >> > ConsumerIterator<Message> it = stream.iterator();
> >> > try{
> >> > while(it.hasNext())
> >> > {
> >> > ByteBuffer buffer = it.next().payload();
> >> > byte [] bytes = new byte[buffer.remaining()];
> >> > buffer.get(bytes);
> >> > System.out.println(new String(bytes));
> >> > }
> >> > }
> >> > catch (ConsumerTimeoutException e){
> >> >     e.printStackTrace();
> >> >  }
> >> >  catch (Exception e){
> >> >   e.printStackTrace();
> >> >  }
> >> > }
> >> >
> >> >
> >> >
>