Getting fixed amount of messages using Zookeeper based consumer (Kafka user mailing list)


Earlier messages in this thread (collapsed):
  Vaibhav Puranik (2012-07-12, 00:05)
  Jun Rao (2012-07-12, 04:06)
  Vaibhav Puranik (2012-07-12, 05:03)
  Jun Rao (2012-07-12, 14:17)
  Felix GV (2012-09-15, 01:52)
  Jun Rao (2012-09-15, 05:39)
  Vaibhav Puranik (2012-07-12, 16:34)
  Neha Narkhede (2012-07-12, 17:26)
Re: Getting fixed amount of messages using Zookeeper based consumer
Neha,

I tried calling shutdown on ConsumerConnector. The only problem I have with
it is that it closes the connection, forcing me to reopen it every single
time I want to fetch a new set of messages.
Here is my sample code - http://pastebin.com/6NBHtteL

But it's not a big deal. I am writing a Zookeeper based KafkaSpout, so it's
possible for me to open the connection every time. I don't think it's going
to be a huge performance problem at our scale.
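
A minimal sketch of the fetch-then-shutdown cycle described above (not the
pastebin code; it assumes the Kafka 0.7-era high-level consumer Java API,
where exact property names and generic types varied across 0.7.x releases,
and the ZK address, group, and topic names are placeholders):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.ConsumerTimeoutException;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.Message;

    public class FixedBatchConsumer {
        // Consume up to batchSize messages, then shut the connector down.
        // A new connection is opened per batch, as described above.
        static void consumeBatch(String topic, int batchSize) {
            Properties props = new Properties();
            props.put("zk.connect", "localhost:2181");  // "zookeeper.connect" in later releases
            props.put("groupid", "my-group");           // "group.id" in later releases
            props.put("consumer.timeout.ms", "5000");   // hasNext() throws instead of blocking

            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, Integer> topicCount = new HashMap<String, Integer>();
            topicCount.put(topic, 1);  // one stream for this topic
            Map<String, List<KafkaStream<Message>>> streams =
                connector.createMessageStreams(topicCount);
            ConsumerIterator<Message> it = streams.get(topic).get(0).iterator();

            int consumed = 0;
            try {
                while (consumed < batchSize && it.hasNext()) {
                    Message message = it.next();
                    // process(message) ...
                    consumed++;
                }
            } catch (ConsumerTimeoutException e) {
                // no more messages arrived within consumer.timeout.ms
            } finally {
                connector.shutdown();  // commits offsets and closes the fetchers
            }
        }
    }

Setting consumer.timeout.ms keeps the iterator from blocking forever once
the batch is drained, at the cost of catching ConsumerTimeoutException.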

I am still getting those exceptions in spite of calling
ConsumerConnector.shutdown. But I noticed that the exception is caught and
handled by Kafka code, and logged by Kafka at INFO level, so it lets me
continue in spite of the exception.

Ideally I would have liked a cleaner INFO log. This puts a stack trace in
the INFO log, which should be avoided if possible. If you want, I can file a
JIRA issue to get rid of it.

Regards,
Vaibhav
On Thu, Jul 12, 2012 at 10:26 AM, Neha Narkhede <[EMAIL PROTECTED]> wrote:

> Vaibhav,
>
> The cleaner way of exiting the consumer iterator loop is by calling
> the shutdown() API on the ZookeeperConsumerConnector. But perhaps
> there is a reason you are not able to use that approach?
>
> Thanks,
> Neha
>
> > On Thu, Jul 12, 2012 at 9:34 AM, Vaibhav Puranik <[EMAIL PROTECTED]> wrote:
> > I tried breaking the loop in the middle and here is what I got:
> >
> > 10158 [FetchRunnable-0] INFO  kafka.consumer.SimpleConsumer  - Reconnect in multifetch due to socket error:
> > java.nio.channels.ClosedByInterruptException
> >     at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> >     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:270)
> >     at kafka.utils.Utils$.read(Utils.scala:538)
> >     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
> >     at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
> >     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >     at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
> >     at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
> >     at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
> >     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
> > 10163 [FetchRunnable-0] INFO  kafka.consumer.FetcherRunnable  - FecherRunnable Thread[FetchRunnable-0,5,main] interrupted
> >
> > I get this exception once per broker: three exceptions in total, because
> > we have three brokers.
> >
> > I am calling KafkaStream.clear() just before breaking the loop.
> >
> > Is there any way to break the stream cleanly? Or am I just supposed to
> > catch this exception?
> > (My fetch size is 1 MB and the batch size is small for now: 5. But I
> > don't think we can ever match batch size to fetch size exactly, because
> > message sizes are not fixed.)
> >
> > Regards,
> > Vaibhav
> >
> >
> > On Thu, Jul 12, 2012 at 7:17 AM, Jun Rao <[EMAIL PROTECTED]> wrote:
> >
> >> Yes, it knows. The consumer offset is only advanced every time a message
> >> is iterated over.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Wed, Jul 11, 2012 at 10:03 PM, Vaibhav Puranik <[EMAIL PROTECTED]> wrote:
> >>
> >> > The inner loop keeps running. If I break it in the middle, is the Kafka
> >> > broker going to know that the rest of the messages in the stream were
> >> > not delivered?
> >> >
> >> > Regards,
> >> > Vaibhav
> >> > GumGum
> >> > On Jul 11, 2012 5:05 PM, "Vaibhav Puranik" <[EMAIL PROTECTED]> wrote:
> >> >
> >> > > Hi all,
> >> > >
> >> > > Is there any way to get a fixed amount of messages using a Zookeeper
> >> > > based consumer (ConsumerConnector)?
> >> > >
> >> > > I know that with SimpleConsumer you can pass fetchSize as an argument
> >> > > and limit the number of messages coming back.
> >> > >
> >> > > This sample code creates 4 threads that keep consuming forever.
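
For contrast, a sketch of the SimpleConsumer approach mentioned in the
question, again assuming the Kafka 0.7-era javaapi (broker host, topic,
partition, and sizes are placeholders). Note that fetchSize bounds the bytes
returned per fetch, not the message count, which is why it cannot yield an
exact number of messages:

    import kafka.api.FetchRequest;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.javaapi.message.ByteBufferMessageSet;
    import kafka.message.MessageAndOffset;

    public class FetchSizeExample {
        public static void main(String[] args) {
            // host, port, socket timeout (ms), receive buffer size
            SimpleConsumer consumer =
                new SimpleConsumer("broker1", 9092, 30000, 1024 * 1024);
            long offset = 0L;
            // The last argument caps the BYTES returned per fetch, not the
            // number of messages.
            FetchRequest request =
                new FetchRequest("my-topic", 0 /* partition */, offset, 100 * 1024);
            ByteBufferMessageSet messages = consumer.fetch(request);
            for (MessageAndOffset mo : messages) {
                // process mo.message(); remember mo.offset() for the next fetch
                offset = mo.offset();
            }
            consumer.close();
        }
    }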
Later messages in this thread (collapsed):
  Jun Rao (2012-07-12, 18:33)
  Vaibhav Puranik (2012-07-12, 18:40)