Kafka, mail # user - Getting fixed amount of messages using Zookeeper based consumer


Re: Getting fixed amount of messages using Zookeeper based consumer
Vaibhav Puranik 2012-07-12, 16:34
I tried breaking the loop in the middle and here is what I got:

10158 [FetchRunnable-0] INFO  kafka.consumer.SimpleConsumer  - Reconnect in multifetch due to socket error:
java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:270)
    at kafka.utils.Utils$.read(Utils.scala:538)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
    at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
    at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
    at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
10163 [FetchRunnable-0] INFO  kafka.consumer.FetcherRunnable  - FecherRunnable Thread[FetchRunnable-0,5,main] interrupted

I get this exception once per broker, so three times in total because we have
three brokers.

I am calling KafkaStream.clear() just before breaking out of the loop.

Is there any way to break out of the stream cleanly? Or am I just supposed to
catch this exception?
(My fetch size is 1 MB and my batch size is small for now: 5 messages. But I
don't think we can ever match batch size to fetch size accurately, because
message sizes are not fixed.)

Regards,
Vaibhav
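
For readers following the thread, here is a minimal sketch of the "take a fixed batch, then stop" loop being discussed. It is an illustration under assumptions, not the Kafka API: a plain java.util.Iterator stands in for the KafkaStream iterator, and the FixedBatch class and its take method are hypothetical names. The size check comes before hasNext() because, as far as I know, hasNext() on a real stream blocks until a message arrives unless a consumer timeout is configured.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper; not part of Kafka.
public class FixedBatch {

    /**
     * Pulls at most batchSize items from the iterator and returns them.
     * Items that are never iterated stay unconsumed in the source.
     */
    public static <T> List<T> take(Iterator<T> it, int batchSize) {
        List<T> batch = new ArrayList<T>(batchSize);
        // Test the size first so hasNext() is never called once the
        // batch is already full.
        while (batch.size() < batchSize && it.hasNext()) {
            batch.add(it.next());
        }
        return batch;
    }

    public static void main(String[] args) {
        // Stand-in for a message stream: seven in-memory "messages".
        Iterator<String> stream =
            Arrays.asList("m1", "m2", "m3", "m4", "m5", "m6", "m7").iterator();

        List<String> first = take(stream, 5);
        System.out.println(first);          // [m1, m2, m3, m4, m5]

        // The sixth message was never iterated, so it is still available.
        System.out.println(stream.next());  // m6
    }
}
```

This mirrors the point made in the quoted reply below: only messages that are actually iterated over count as consumed, so stopping after a fixed count leaves the rest for a later pass.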
On Thu, Jul 12, 2012 at 7:17 AM, Jun Rao <[EMAIL PROTECTED]> wrote:

> Yes, it knows. The consumer offset is only advanced every time a message is
> iterated over.
>
> Thanks,
>
> Jun
>
> On Wed, Jul 11, 2012 at 10:03 PM, Vaibhav Puranik <[EMAIL PROTECTED]> wrote:
>
> > The inner loop keeps running. If I break out of it in the middle, will the
> > Kafka broker know that the rest of the messages in the stream were not
> > delivered?
> >
> > Regards,
> > Vaibhav
> > GumGum
> > On Jul 11, 2012 5:05 PM, "Vaibhav Puranik" <[EMAIL PROTECTED]> wrote:
> >
> > > Hi all,
> > >
> > > Is there any way to get a fixed number of messages using the
> > > Zookeeper-based consumer (ConsumerConnector)?
> > >
> > > I know that with SimpleConsumer you can pass fetchSize as an argument
> > > and limit the number of messages coming back.
> > >
> > > This sample code creates 4 threads that keep consuming forever.
> > >
> > >
> > > // consume the messages in the threads
> > > for(final KafkaStream<Message> stream: streams) {
> > >   executor.submit(new Runnable() {
> > >     public void run() {
> > >       for(MessageAndMetadata msgAndMetadata: stream) {
> > >         // process message (msgAndMetadata.message())
> > >       }
> > >     }
> > >   });
> > > }
> > >
> > > Regards,
> > > Vaibhav
> > >
> > >
> > >
> >
>