Kafka, mail # user - Getting fixed amount of messages using Zookeeper based consumer


Vaibhav Puranik 2012-07-12, 00:05
Jun Rao 2012-07-12, 04:06
Vaibhav Puranik 2012-07-12, 05:03
Jun Rao 2012-07-12, 14:17
Felix GV 2012-09-15, 01:52
Jun Rao 2012-09-15, 05:39
Vaibhav Puranik 2012-07-12, 16:34
Neha Narkhede 2012-07-12, 17:26
Vaibhav Puranik 2012-07-12, 18:24
Jun Rao 2012-07-12, 18:33
Re: Getting fixed amount of messages using Zookeeper based consumer
Vaibhav Puranik 2012-07-12, 18:40
Jun,

The messages are always available. My requirement is to get them in batches
(without worrying about offsets) so that I can do batch aggregation.
I want to mark each batch with a batch id so that my bolts can commit the
aggregated results to the database.

Thanks for your quick reply,
Vaibhav
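
Below is a minimal sketch of the pattern discussed in this thread: keep one
connector open, set consumer.timeout.ms as Jun suggests in the quoted reply so
that hasNext() throws ConsumerTimeoutException when a batch cannot be filled,
then aggregate the partial batch, persist it with a batch id, and commit
offsets. This is a sketch only, not the code from the pastebin link below: the
topic name, group id, and batch size are invented for illustration, the
property names follow the later 0.8-style high-level consumer API (the 0.7-era
API uses zk.connect / groupid and Message-typed streams), and
aggregateAndStore() is a hypothetical placeholder.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.javaapi.consumer.ConsumerConnector;

public class BatchingConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // "zk.connect" in the 0.7 consumer
        props.put("group.id", "batch-aggregator");        // "groupid" in the 0.7 consumer
        props.put("auto.commit.enable", "false");         // commit only after a batch is persisted
        props.put("consumer.timeout.ms", "5000");         // hasNext() throws ConsumerTimeoutException when idle

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        ConsumerIterator<byte[], byte[]> it = connector
                .createMessageStreams(Collections.singletonMap("events", 1))
                .get("events").get(0).iterator();

        int batchSize = 5;   // small batch, as in the discussion below
        long batchId = 0;

        while (true) {
            List<byte[]> batch = new ArrayList<byte[]>(batchSize);
            try {
                while (batch.size() < batchSize && it.hasNext()) {
                    batch.add(it.next().message());
                }
            } catch (ConsumerTimeoutException e) {
                // No new message arrived within consumer.timeout.ms. The iterator stays
                // usable ("re-enterable"), so flush whatever was collected and keep going.
            }
            if (!batch.isEmpty()) {
                aggregateAndStore(batchId++, batch); // hypothetical: write aggregates + batch id to the DB
                connector.commitOffsets();           // only after the batch is safely persisted
            }
        }
    }

    private static void aggregateAndStore(long batchId, List<byte[]> batch) {
        // placeholder for the batch aggregation / database commit described above
    }
}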

On Thu, Jul 12, 2012 at 11:33 AM, Jun Rao <[EMAIL PROTECTED]> wrote:

> If you don't want to shut down the connector each time, you can also set
> consumer.timeout.ms. This way, the iterator's hasNext() will throw an
> exception if no new messages are available after the timeout. The iterator
> is re-enterable.
>
> Thanks,
>
> Jun
>
> On Thu, Jul 12, 2012 at 11:24 AM, Vaibhav Puranik <[EMAIL PROTECTED]>
> wrote:
>
> > Neha,
> >
> > I tried calling shutdown on ConsumerConnector. The only problem I have
> > with it is that it forces me to close the connection, thereby forcing me
> > to reopen it every single time I want to fetch a new set of messages.
> > Here is my sample code - http://pastebin.com/6NBHtteL
> >
> > But it's not a big deal. I am writing a Zookeeper-based KafkaSpout. It's
> > possible for me to open the connection every time. I don't think it's
> > going to be a huge performance problem at our scale.
> >
> > I am still getting those exceptions in spite of calling
> > ConsumerConnector.shutdown. *But I noticed that it's caught and handled
> > by Kafka code. It's being logged by Kafka with INFO log level.* It lets
> > me continue in spite of the exception.
> >
> > Ideally I would have liked a cleaner INFO log. This puts a stack trace
> > in the INFO log, which should be avoided if possible. If you want, I can
> > file a JIRA issue to get rid of it.
> >
> > Regards,
> > Vaibhav
> >
> >
> >
> >
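
For contrast, here is a rough sketch of the reconnect-per-batch pattern
described in the quoted message above (it is not the pastebin code, and
fetchOneBatch() is an invented name). Shutting the connector down interrupts
its fetcher threads, which is what produces the INFO-level
ClosedByInterruptException entries shown in the stack trace further down.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.javaapi.consumer.ConsumerConnector;

public class ReconnectPerBatchSketch {

    // Opens a connector, drains up to batchSize messages, then shuts the connector
    // down, so every batch pays the cost of rebuilding the Zookeeper/broker connections.
    static List<byte[]> fetchOneBatch(Properties props, String topic, int batchSize) {
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        List<byte[]> batch = new ArrayList<byte[]>(batchSize);
        try {
            ConsumerIterator<byte[], byte[]> it = connector
                    .createMessageStreams(Collections.singletonMap(topic, 1))
                    .get(topic).get(0).iterator();
            while (batch.size() < batchSize && it.hasNext()) {
                batch.add(it.next().message());
            }
        } catch (ConsumerTimeoutException e) {
            // Only thrown if consumer.timeout.ms is set in props; otherwise hasNext() blocks.
        } finally {
            // shutdown() interrupts the fetcher threads; Kafka catches the resulting
            // ClosedByInterruptException and logs it at INFO level.
            connector.shutdown();
        }
        return batch;
    }
}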
> > On Thu, Jul 12, 2012 at 10:26 AM, Neha Narkhede <[EMAIL PROTECTED]>
> > wrote:
> >
> > > Vaibhav,
> > >
> > > The cleaner way of exiting the consumer iterator loop is by calling
> > > the shutdown() API on the ZookeeperConsumerConnector. But presumably
> > > there is a reason you are not able to use that approach?
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Thu, Jul 12, 2012 at 9:34 AM, Vaibhav Puranik <[EMAIL PROTECTED]>
> > > wrote:
> > > > I tried breaking the loop in the middle and here is what I got:
> > > >
> > > > 10158 [FetchRunnable-0] INFO  kafka.consumer.SimpleConsumer  - Reconnect in multifetch due to socket error:
> > > > java.nio.channels.ClosedByInterruptException
> > > >     at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> > > >     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:270)
> > > >     at kafka.utils.Utils$.read(Utils.scala:538)
> > > >     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
> > > >     at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
> > > >     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > >     at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
> > > >     at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
> > > >     at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
> > > >     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
> > > > 10163 [FetchRunnable-0] INFO  kafka.consumer.FetcherRunnable  - FecherRunnable Thread[FetchRunnable-0,5,main] interrupted
> > > >
> > > > I get this exception once per broker. I get three exceptions because
> > > > we have three brokers.
> > > >
> > > > I am calling KafkaStream.clear() just before breaking the loop.
> > > >
> > > > Is there any way to break the stream cleanly? Or am I just supposed to
> > > > catch this exception?
> > > > (My fetch size is 1 MB and the batch size is small for now - 5. But I
> > > > don't think we can ever match batch size with fetch size accurately because