Re: Getting fixed amount of messages using Zookeeper based consumer
I tried breaking the loop in the middle and here is what I got:

10158 [FetchRunnable-0] INFO  kafka.consumer.SimpleConsumer  - Reconnect in
multifetch due to socket error:
java.nio.channels.ClosedByInterruptException
    at
java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:270)
    at kafka.utils.Utils$.read(Utils.scala:538)
    at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
    at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
    at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
    at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
    at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
10163 [FetchRunnable-0] INFO  kafka.consumer.FetcherRunnable  -
FecherRunnable Thread[FetchRunnable-0,5,main] interrupted

I get this exception once per broker: three exceptions in total, because we
have three brokers.

I am calling KafkaStream.clear() just before breaking the loop.

Is there any way to break out of the stream cleanly? Or am I just supposed
to catch this exception?
(My fetch size is 1 MB and the batch size is small for now: 5. But I don't
think we can ever match the batch size to the fetch size exactly, because
message sizes are not fixed.)
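
A minimal sketch of one way to bound consumption with the high-level
consumer, assuming the 0.7-era API: with consumer.timeout.ms set, the
stream iterator throws ConsumerTimeoutException instead of blocking
forever, so the loop can stop after a fixed count and shut down cleanly.
The topic name, group id, ZooKeeper address, and property names below are
placeholder assumptions, not taken from this thread (untested):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

public class BoundedConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("zk.connect", "localhost:2181");   // placeholder ZK address
    props.put("groupid", "my-group");            // placeholder group id
    // Without a timeout the iterator blocks forever; with one it throws
    // ConsumerTimeoutException when no message arrives within 1 second.
    props.put("consumer.timeout.ms", "1000");

    ConsumerConnector connector =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    topicCount.put("my-topic", 1);               // placeholder topic, 1 stream
    List<KafkaStream<Message>> streams =
        connector.createMessageStreams(topicCount).get("my-topic");

    int consumed = 0;
    try {
      for (MessageAndMetadata<Message> msgAndMetadata : streams.get(0)) {
        // process msgAndMetadata.message()
        // The offset advances only for messages actually iterated over,
        // so breaking here does not skip the undelivered remainder.
        if (++consumed >= 5) {                   // batch size of 5, as above
          break;
        }
      }
    } catch (ConsumerTimeoutException e) {
      // No message within consumer.timeout.ms -- treat as end of batch.
    } finally {
      // Shutting down interrupts the fetcher threads, which is what logs
      // the ClosedByInterruptException INFO lines above.
      connector.shutdown();
    }
  }
}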

Regards,
Vaibhav
On Thu, Jul 12, 2012 at 7:17 AM, Jun Rao <[EMAIL PROTECTED]> wrote:

> Yes, it knows. The consumer offset is only advanced every time a message is
> iterated over.
>
> Thanks,
>
> Jun
>
> On Wed, Jul 11, 2012 at 10:03 PM, Vaibhav Puranik <[EMAIL PROTECTED]>
> wrote:
>
> > The inner loop keeps running. If I break it in the middle, is the Kafka
> > broker going to know that the rest of the messages in the stream were
> > not delivered?
> >
> > Regards,
> > Vaibhav
> > GumGum
> > On Jul 11, 2012 5:05 PM, "Vaibhav Puranik" <[EMAIL PROTECTED]> wrote:
> >
> > > Hi all,
> > >
> > > Is there any way to get a fixed number of messages using the
> > > ZooKeeper-based consumer (ConsumerConnector)?
> > >
> > > I know that with SimpleConsumer you can pass fetchSize as an argument
> > > and limit the number of messages coming back.
> > >
> > > This sample code creates 4 threads that keep consuming forever (the
> > > setup it assumes for streams and executor is sketched after this
> > > thread).
> > >
> > >
> > > // consume the messages in the threads
> > > for(final KafkaStream<Message> stream: streams) {
> > >   executor.submit(new Runnable() {
> > >     public void run() {
> > >       for(MessageAndMetadata msgAndMetadata: stream) {
> > >         // process message (msgAndMetadata.message())
> > >       }
> > >     }
> > >   });
> > > }
> > >
> > > Regards,
> > > Vaibhav
> > >
> > >
> > >
> >
>
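
For reference, a minimal sketch of the setup the quoted sample assumes
(creating the connector, the 4 streams, and the executor), again assuming
the 0.7-era API; the topic name, group id, and ZooKeeper address are
placeholders, not taken from this thread (untested):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;

public class StreamSetup {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("zk.connect", "localhost:2181");   // placeholder ZK address
    props.put("groupid", "my-group");            // placeholder group id
    ConsumerConnector connector =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    // Ask for 4 streams on one topic -- one per consuming thread.
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    topicCount.put("my-topic", 4);               // placeholder topic
    List<KafkaStream<Message>> streams =
        connector.createMessageStreams(topicCount).get("my-topic");
    ExecutorService executor = Executors.newFixedThreadPool(streams.size());

    // ... submit one Runnable per stream, as in the sample above ...
  }
}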